He estado intentando que una versión dinámica de org.apache.spark.sql.explode
funcione sin suerte: tengo un conjunto de datos con una columna de fecha llamada event_date
y otra columna llamada no_of_days_gap
. Quiero usar no_of_days_gap
para crear clones de la fila usando la función explode
. Uno de mis primeros intentos fue usar esto:
myDataset.withColumn("clone", explode(array((0 until col("no_of_days_gap")).map(lit): _*)))
Sin embargo, col("no_of_days_gap")
es del tipo Column
y se espera un Int
. También probé varios otros enfoques. Entonces, ¿cómo puedo hacer que esto funcione?
P.S .: logré obtener una solución alternativa que funcionara utilizando una función map
seguida de llamar a flatMap
, sin embargo, estoy realmente interesado en comprender cómo hacer que funcione el enfoque withColumn
.
2 respuestas
¿Qué pasa con lo siguiente?
scala> val diddy = Seq(
| ("2017/03/07", 4),
| ("2016/12/09", 2)).toDF("event_date", "no_of_days_gap")
diddy: org.apache.spark.sql.DataFrame = [event_date: string, no_of_days_gap: int]
scala> diddy.flatMap(r => Seq.fill(r.getInt(1))(r.getString(0))).show
+----------+
| value|
+----------+
|2017/03/07|
|2017/03/07|
|2017/03/07|
|2017/03/07|
|2016/12/09|
|2016/12/09|
+----------+
// use explode instead
scala> diddy.explode("no_of_days_gap", "events") { n: Int => 0 until n }.show
warning: there was one deprecation warning; re-run with -deprecation for details
+----------+--------------+------+
|event_date|no_of_days_gap|events|
+----------+--------------+------+
|2017/03/07| 4| 0|
|2017/03/07| 4| 1|
|2017/03/07| 4| 2|
|2017/03/07| 4| 3|
|2016/12/09| 2| 0|
|2016/12/09| 2| 1|
+----------+--------------+------+
Sin embargo, si insiste en withColumn
, entonces ... ¡sea ... eso! ¡Abróchate el cinturón!
diddy
.withColumn("concat", concat($"event_date", lit(",")))
.withColumn("repeat", expr("repeat(concat, no_of_days_gap)"))
.withColumn("split", split($"repeat", ","))
.withColumn("explode", explode($"split"))
Tienes que usar udf:
val range = udf((i: Integer) => (0 until i).toSeq)
df
.withColumn("clone", range($"no_of_days_gap")) // Add range
.withColumn("clone", explode($"clone")) // Explode
Preguntas relacionadas
Nuevas preguntas
apache-spark
Apache Spark es un motor de procesamiento de datos distribuidos de código abierto escrito en Scala que proporciona una API unificada y conjuntos de datos distribuidos a los usuarios para el procesamiento por lotes y de transmisión. Los casos de uso de Apache Spark a menudo están relacionados con el aprendizaje profundo / máquina y el procesamiento de gráficos.