Tengo un marco de datos Apache PySpark similar al siguiente (la fecha es yyyy-mm-dd
)
| ID | is_test | date |
|----|-----------|------------|
| 10 | 0 | 2019-01-01 |
| 10 | 0 | 2019-01-05 |
| 10 | 1 | 2019-01-08 | <= Most recent
| 11 | 0 | 2019-03-02 |
| 11 | 0 | 2019-03-04 |
| 11 | 0 | 2019-03-04 |
| 11 | 1 | 2019-03-08 | <= Most recent
| 12 | 0 | 2019-06-08 |
| 12 | 0 | 2019-06-12 | <= Most recent
El objetivo es crear un nuevo valor y asignar un mismo valor para cada ID dependiendo de la última fila (más reciente) de cada ID
Entonces algo como
| ID | is_test | date | is_ok |
|----|-----------|------------|------------|
| 10 | 0 | 2019-01-01 | 1 |
| 10 | 0 | 2019-01-05 | 1 |
| 10 | 1 | 2019-01-08 | 1 |
| 11 | 0 | 2019-03-02 | 1 |
| 11 | 0 | 2019-03-04 | 1 |
| 11 | 0 | 2019-03-04 | 1 |
| 11 | 1 | 2019-03-08 | 1 |
| 12 | 0 | 2019-06-08 | 0 |
| 12 | 0 | 2019-06-12 | 0 |
Básicamente, para una ID dada, cuando la última es 1, todos antes y la misma ID también son 1 y si la última es 0, todos antes (la misma ID también) también son cero.
Intenté algo con:
partition = Window.partitionBy("ID").orderBy(F.col("date"))
y estoy bastante seguro de que este es un buen comienzo, pero también puede ser el final.
Gracias
3 respuestas
puedes usar la última función para lograr esto. aqui esta el codigo
>>> df = spark.createDataFrame([[10,0,"2019-01-01"],[10,0,"2019-01-05"],[10,1,"2019-01-08"],[11,0,"2019-03-02"],[11,0,"2019-03-04"],[11,0,"2019-03-04"],[11,1,"2019-03-08"],[12,0,"2019-06-08"],[12,0,"2019-06-12"]],["ID","is_test","_date"])
>>> df.show()
+---+-------+----------+
| ID|is_test| _date|
+---+-------+----------+
| 10| 0|2019-01-01|
| 10| 0|2019-01-05|
| 10| 1|2019-01-08|
| 11| 0|2019-03-02|
| 11| 0|2019-03-04|
| 11| 0|2019-03-04|
| 11| 1|2019-03-08|
| 12| 0|2019-06-08|
| 12| 0|2019-06-12|
+---+-------+----------+
>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as func
>>> win = Window.partitionBy(df['ID']) .orderBy(df['date'].desc())
>>> df.withColumn("test",func.last('is_test').over(win)).show()
+---+-------+----------+----+
| ID|is_test| _date|test|
+---+-------+----------+----+
| 10| 0|2019-01-01| 1|
| 10| 0|2019-01-05| 1|
| 10| 1|2019-01-08| 1|
| 12| 0|2019-06-08| 0|
| 12| 0|2019-06-12| 0|
| 11| 0|2019-03-02| 1|
| 11| 0|2019-03-04| 1|
| 11| 0|2019-03-04| 1|
| 11| 1|2019-03-08| 1|
+---+-------+----------+----+
Creo que eso es lo que está buscando. Avíseme si tiene alguna pregunta relacionada con el mismo.
Ya casi estás allí, solo necesitas definir rowsBetween
con Window.unboundedFollowing
como abajo y estarás listo.
win_spec = Window.partitionBy("ID").orderBy("date").rowsBetween(0, Window.unboundedFollowing)
import pyspark.sql.functions as f
from pyspark.sql.window import Window
df.withColumn("is_ok", f.last('is_test').over(win_spec)).orderBy('ID').show()
+---+-------+----------+-----+
| ID|is_test| date|is_ok|
+---+-------+----------+-----+
| 10| 1|2019-01-08| 1|
| 10| 0|2019-01-01| 1|
| 10| 0|2019-01-05| 1|
| 11| 0|2019-03-02| 1|
| 11| 0|2019-03-04| 1|
| 11| 0|2019-03-04| 1|
| 11| 1|2019-03-08| 1|
| 12| 0|2019-06-08| 0|
| 12| 0|2019-06-12| 0|
+---+-------+----------+-----+
Solución usando row_number
y dos uniones
val df1 = df.select('*, row_number().over(Window.partitionBy('id).orderBy('date)).as("rn"))
val df2 = df1.groupBy('id).agg(max('rn).as("rn"))
val df3 = df1.join(df2,Seq("id","rn"),"inner").select('id,'is_test.as("is_ok"))
val df4 = df.join(df3,Seq("id"),"left")
df4.show()
Salida:
+---+-------+----------+-----+
| id|is_test| date|is_ok|
+---+-------+----------+-----+
| 10| 0|2019-01-01| 1|
| 10| 0|2019-01-05| 1|
| 10| 1|2019-01-08| 1|
| 11| 0|2019-03-02| 1|
| 11| 0|2019-03-04| 1|
| 11| 0|2019-03-04| 1|
| 11| 1|2019-03-08| 1|
| 12| 0|2019-06-08| 0|
| 12| 0|2019-06-12| 0|
+---+-------+----------+-----+
Preguntas relacionadas
Nuevas preguntas
python
Python es un lenguaje de programación multipropósito, de tipificación dinámica y de múltiples paradigmas. Está diseñado para ser rápido de aprender, comprender y usar, y hacer cumplir una sintaxis limpia y uniforme. Tenga en cuenta que Python 2 está oficialmente fuera de soporte a partir del 01-01-2020. Aún así, para preguntas de Python específicas de la versión, agregue la etiqueta [python-2.7] o [python-3.x]. Cuando utilice una variante de Python (por ejemplo, Jython, PyPy) o una biblioteca (por ejemplo, Pandas y NumPy), inclúyala en las etiquetas.