He estado intentando que una versión dinámica de org.apache.spark.sql.explode funcione sin suerte: tengo un conjunto de datos con una columna de fecha llamada event_date y otra columna llamada no_of_days_gap. Quiero usar no_of_days_gap para crear clones de la fila usando la función explode. Uno de mis primeros intentos fue usar esto:

myDataset.withColumn("clone", explode(array((0 until col("no_of_days_gap")).map(lit): _*)))

Sin embargo, col("no_of_days_gap") es del tipo Column y se espera un Int. También probé varios otros enfoques. Entonces, ¿cómo puedo hacer que esto funcione?

P.S .: logré obtener una solución alternativa que funcionara utilizando una función map seguida de llamar a flatMap, sin embargo, estoy realmente interesado en comprender cómo hacer que funcione el enfoque withColumn.

1
Diddy 7 mar. 2017 a las 00:46

2 respuestas

La mejor respuesta

¿Qué pasa con lo siguiente?

scala> val diddy = Seq(
     |   ("2017/03/07", 4),
     |   ("2016/12/09", 2)).toDF("event_date", "no_of_days_gap")
diddy: org.apache.spark.sql.DataFrame = [event_date: string, no_of_days_gap: int]

scala> diddy.flatMap(r => Seq.fill(r.getInt(1))(r.getString(0))).show
+----------+
|     value|
+----------+
|2017/03/07|
|2017/03/07|
|2017/03/07|
|2017/03/07|
|2016/12/09|
|2016/12/09|
+----------+

// use explode instead

scala> diddy.explode("no_of_days_gap", "events") { n: Int => 0 until n }.show
warning: there was one deprecation warning; re-run with -deprecation for details
+----------+--------------+------+
|event_date|no_of_days_gap|events|
+----------+--------------+------+
|2017/03/07|             4|     0|
|2017/03/07|             4|     1|
|2017/03/07|             4|     2|
|2017/03/07|             4|     3|
|2016/12/09|             2|     0|
|2016/12/09|             2|     1|
+----------+--------------+------+

Sin embargo, si insiste en withColumn, entonces ... ¡sea ... eso! ¡Abróchate el cinturón!

diddy
  .withColumn("concat", concat($"event_date", lit(",")))
  .withColumn("repeat", expr("repeat(concat, no_of_days_gap)"))
  .withColumn("split", split($"repeat", ","))
  .withColumn("explode", explode($"split"))
0
Jacek Laskowski 7 mar. 2017 a las 21:10

Tienes que usar udf:

val range = udf((i: Integer) => (0 until i).toSeq)

df
  .withColumn("clone", range($"no_of_days_gap"))  // Add range
  .withColumn("clone", explode($"clone"))  // Explode
0
1d210d2d0 6 mar. 2017 a las 22:25