Quiero filtrar las entradas en un DataFrame de eventos de mensajes en función de cuándo fueron editados. Tengo un DataFrame que tiene los eventos del mensaje y otro DataFrame que representa cuándo / si fueron editados. La eliminación debería eliminar filas en la tabla de mensajes si tienen un índice coincidente en la tabla editada Y si la marca de tiempo en la tabla de mensajes está debajo del evento de edición correspondiente.

El DataFrame editado es:

+----------+-------------------+
| timestamp|index              |
+----------+-------------------+
|1556247980|                 78|
|1558144430|                 87|
|1549964820|                 99|
+----------+-------------------+

El mensaje DataFrame es:

+-------------------+--------------------+------------------+--------------------+
|index              |  commonResponseText|publishedTimestamp|  commonResponseText|
+-------------------+--------------------+------------------+--------------------+
|                 78|Voluptatem enim a...|        1556247974|Voluptatem enim a...|
|                 87|Ut enim enim sunt...|        1558144420|Ut enim enim sunt...|
|                 99|Et est perferendi...|        1549964815|Et est perferendi...|
|                 78|Voluptatem porro ...|        1556248000|Voluptatem porro ...|
|                 87|Atque quod est au...|        1549965000|Atque quod est au...|
+-------------------+--------------------+------------------+--------------------+

Quiero que el resultado sea:

+-------------------+--------------------+------------------+--------------------+
|commonResponseIndex|  index             |publishedTimestamp|  commonResponseText|
+-------------------+--------------------+------------------+--------------------+
|                 78|Voluptatem porro ...|        1556248000|Voluptatem porro ...|
|                 87|Atque quod est au...|        1549965000|Atque quod est au...|
+-------------------+--------------------+------------------+--------------------+

¡Gracias por la ayuda!

0
tlanigan 27 jun. 2020 a las 02:37

2 respuestas

La mejor respuesta

Esto es lo que terminé haciendo:

val editedDF = Seq(("A",3),("B",3)).toDF("id","timestamp")
val messageDF = Seq(("A",2),("B",2),("A",2),("A",3),("B",4),("A",2),("B",2),("c",9)).toDF("id","timestamp")

Finalmente usé esta unión:

    // Filter out the edited meesages.
    val editedFilteredDF  = messageDF.join(editedDF,
(editedDF("id") === messageDF("id")) && (editedDF("timestamp") > messageDF("timestamp")),
joinType="left_anti")

El resultado:

 editedFilteredDF.show()
+---+---------+
| id|timestamp|
+---+---------+
|  A|        3|
|  B|        4|
|  c|        9|
+---+---------+
0
tlanigan 29 jun. 2020 a las 16:10

Puede agregar su tabla de mensajes, unirla con la tabla editada y filtrar

import pyspark.sql.functions as F
# Test dataframe
tst=sqlContext.createDataFrame([('A',2),('B',2),('A',2),('A',3),('B',4),('A',2),('B',2),('c',9)],schema=("id","count"))
tst1 = sqlContext.createDataFrame([('A',4),('B',1)],schema=("id","val"))
# Aggregate and join
tst_g=tst.groupby('id').agg(F.max('count').alias('count'))
tst_j= tst_g.join(tst1,tst_g.id==tst1.id,'left')
# Filter result
tst_f = tst_j.where((F.col('count')>=F.col('val'))|(F.col('val').isNull()))

El resultado es :

tst_j.show()

+---+-----+----+----+
| id|count|  id| val|
+---+-----+----+----+
|  c|    9|null|null|
|  B|    4|   B|   1|
|  A|    3|   A|   4|
+---+-----+----+----+
 tst_f.show()
+---+-----+----+----+
| id|count|  id| val|
+---+-----+----+----+
|  c|    9|null|null|
|  B|    4|   B|   1|
+---+-----+----+----+

Finalmente, puede soltar las columnas irrelevantes.

Si necesita los datos completos, puede unirse a la tabla de actualización con la tabla de mensajes y hacer lo mismo. Si la tabla de actualización es pequeña, considere una unión de difusión por razones de rendimiento.

# Approach to join with full table
# Test dataframe
tst=sqlContext.createDataFrame([('A',2),('B',2),('A',2),('A',3),('B',4),('A',2),('B',2),('c',9)],schema=("id","count"))
tst1 = sqlContext.createDataFrame([('A',4),('B',1)],schema=("id","val"))
#%%
# join with the full table
tst_j= tst.join(tst1,tst.id==tst1.id,'left')
# Filter result
tst_f = tst_j.where((F.col('count')>=F.col('val'))|(F.col('val').isNull()))

Sugerencia: si no desea dos columnas de identificación en su resultado, puede cambiar la sintaxis de unión como tst.join (tst1, on = "id", how = 'left')

0
Raghu 29 jun. 2020 a las 15:59