Tengo un marco de datos Apache PySpark similar al siguiente (la fecha es yyyy-mm-dd)

| ID |  is_test  |    date    |
|----|-----------|------------|
| 10 |     0     | 2019-01-01 |
| 10 |     0     | 2019-01-05 |
| 10 |     1     | 2019-01-08 | <= Most recent
| 11 |     0     | 2019-03-02 |
| 11 |     0     | 2019-03-04 |
| 11 |     0     | 2019-03-04 |
| 11 |     1     | 2019-03-08 | <= Most recent
| 12 |     0     | 2019-06-08 |
| 12 |     0     | 2019-06-12 | <= Most recent

El objetivo es crear un nuevo valor y asignar un mismo valor para cada ID dependiendo de la última fila (más reciente) de cada ID

Entonces algo como

| ID |  is_test  |    date    |    is_ok   |
|----|-----------|------------|------------|
| 10 |     0     | 2019-01-01 |      1     |
| 10 |     0     | 2019-01-05 |      1     |
| 10 |     1     | 2019-01-08 |      1     | 
| 11 |     0     | 2019-03-02 |      1     |
| 11 |     0     | 2019-03-04 |      1     |
| 11 |     0     | 2019-03-04 |      1     |
| 11 |     1     | 2019-03-08 |      1     | 
| 12 |     0     | 2019-06-08 |      0     |
| 12 |     0     | 2019-06-12 |      0     |

Básicamente, para una ID dada, cuando la última es 1, todos antes y la misma ID también son 1 y si la última es 0, todos antes (la misma ID también) también son cero.

Intenté algo con:

partition = Window.partitionBy("ID").orderBy(F.col("date")) y estoy bastante seguro de que este es un buen comienzo, pero también puede ser el final.

Gracias

0
LaSul 11 oct. 2019 a las 17:39

3 respuestas

La mejor respuesta

puedes usar la última función para lograr esto. aqui esta el codigo

>>> df = spark.createDataFrame([[10,0,"2019-01-01"],[10,0,"2019-01-05"],[10,1,"2019-01-08"],[11,0,"2019-03-02"],[11,0,"2019-03-04"],[11,0,"2019-03-04"],[11,1,"2019-03-08"],[12,0,"2019-06-08"],[12,0,"2019-06-12"]],["ID","is_test","_date"])
>>> df.show()
+---+-------+----------+
| ID|is_test|     _date|
+---+-------+----------+
| 10|      0|2019-01-01|
| 10|      0|2019-01-05|
| 10|      1|2019-01-08|
| 11|      0|2019-03-02|
| 11|      0|2019-03-04|
| 11|      0|2019-03-04|
| 11|      1|2019-03-08|
| 12|      0|2019-06-08|
| 12|      0|2019-06-12|
+---+-------+----------+

>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as func
>>> win = Window.partitionBy(df['ID']) .orderBy(df['date'].desc()) 
>>> df.withColumn("test",func.last('is_test').over(win)).show()
+---+-------+----------+----+
| ID|is_test|     _date|test|
+---+-------+----------+----+
| 10|      0|2019-01-01|   1|
| 10|      0|2019-01-05|   1|
| 10|      1|2019-01-08|   1|
| 12|      0|2019-06-08|   0|
| 12|      0|2019-06-12|   0|
| 11|      0|2019-03-02|   1|
| 11|      0|2019-03-04|   1|
| 11|      0|2019-03-04|   1|
| 11|      1|2019-03-08|   1|
+---+-------+----------+----+

Creo que eso es lo que está buscando. Avíseme si tiene alguna pregunta relacionada con el mismo.

1
LaSul 14 oct. 2019 a las 05:24

Ya casi estás allí, solo necesitas definir rowsBetween con Window.unboundedFollowing como abajo y estarás listo.

win_spec = Window.partitionBy("ID").orderBy("date").rowsBetween(0, Window.unboundedFollowing)
import pyspark.sql.functions as f
from pyspark.sql.window import Window

df.withColumn("is_ok", f.last('is_test').over(win_spec)).orderBy('ID').show()
+---+-------+----------+-----+
| ID|is_test|      date|is_ok|
+---+-------+----------+-----+
| 10|      1|2019-01-08|    1|
| 10|      0|2019-01-01|    1|
| 10|      0|2019-01-05|    1|
| 11|      0|2019-03-02|    1|
| 11|      0|2019-03-04|    1|
| 11|      0|2019-03-04|    1|
| 11|      1|2019-03-08|    1|
| 12|      0|2019-06-08|    0|
| 12|      0|2019-06-12|    0|
+---+-------+----------+-----+

1
SMaZ 11 oct. 2019 a las 15:37

Solución usando row_number y dos uniones

  val df1 = df.select('*, row_number().over(Window.partitionBy('id).orderBy('date)).as("rn"))
  val df2 = df1.groupBy('id).agg(max('rn).as("rn"))
  val df3 = df1.join(df2,Seq("id","rn"),"inner").select('id,'is_test.as("is_ok"))
  val df4 = df.join(df3,Seq("id"),"left")
  df4.show()

Salida:

+---+-------+----------+-----+
| id|is_test|      date|is_ok|
+---+-------+----------+-----+
| 10|      0|2019-01-01|    1|
| 10|      0|2019-01-05|    1|
| 10|      1|2019-01-08|    1|
| 11|      0|2019-03-02|    1|
| 11|      0|2019-03-04|    1|
| 11|      0|2019-03-04|    1|
| 11|      1|2019-03-08|    1|
| 12|      0|2019-06-08|    0|
| 12|      0|2019-06-12|    0|
+---+-------+----------+-----+
1
chlebek 11 oct. 2019 a las 15:29
58343576