Tengo 2 DF, como a continuación.

+---+---+---+
|  M| c2| c3|
+---+---+---+
|  1|  2|  3|
|  2|  3|  4|
+---+---+---+

+---+---+---+
|  M| c2| c3|
+---+---+---+
|  1| 20| 30|
|  2| 30| 40|
+---+---+---+

¿Cuál debería ser el mejor enfoque para obtener un nuevo marco de datos como el siguiente? Esto significa que el nuevo Df tiene nombres de columna c2 y c3 pero el valor es concat( df1("c1"),df1("c2") ) pero con el mismo nombre de columna. Puedo hacer esto con df3.withColumn("c2_new",concat( df1("c2"),df2("c2") )) y luego renombrar la nueva columna a C2. Pero el problema es que tengo más de 150 columnas en mi DF. ¿Cuál debería ser el mejor enfoque aquí?

+---+------+-----+
|  M| c2  |   c3 |
+---+-----+------+
|  1| 2_20|  3_30|
|  2| 3_30|  4_40|
+---+------+-----+
0
abc_spark 11 dic. 2019 a las 15:08

2 respuestas

La mejor respuesta

Puedes hacer esto con una combinación:

val selectExpr = df1.columns.filterNot(_=="M").map(c => concat_ws("_",df1(c),df2(c)).as(c))

df1.join(df2,"M")
  .select((col("M") +: selectExpr):_*)
  .show()

Da:

---+----+----+
|  M|  c2|  c3|
+---+----+----+
|  1|2_20|3_30|
|  2|3_30|4_40|
+---+----+----+
2
Raphael Roth 12 dic. 2019 a las 04:30

Si tiene columnas anchas, puede iterar sobre columnas y aplicarle las mismas transformaciones. En su caso, debe fusionar marcos de datos y columnas agregadas como esta:

import org.apache.spark.sql.types.StringType

val commonColumns = (df1.columns.toSet & df2.columns.toSet).filter(_ != "M").toSeq
commonColumns

df1.union(df2)
    .groupBy("M")
    .agg(count(lit(1)) as "cnt", 
        commonColumns.map(c => concat_ws("_", collect_set(col(c).cast(StringType))) as c):_*)
    .select("M", commonColumns:_*)
        .show

Aquí está la salida:

+---+----+----+
|  M|  c2|  c3|
+---+----+----+
|  1|20_2|3_30|
|  2|3_30|40_4|
+---+----+----+

Si tiene un requisito para ordenar (es decir, el valor de df1 debe estar en el lado izquierdo, el valor de df2 debe estar a la derecha) puede usar este truco:

  1. Agregue el número de marco de fecha (1 y 2) antes de union como una nueva columna
  2. Crear estructura a partir del número de marco de datos y el valor de la columna
  3. Durante la agregación, tome min y max de esta estructura
  4. Extraer el valor de la estructura.
  5. Valores de concat con un guión bajo

Código:

df1
    .withColumn("src", lit(1))
    .union(df2.withColumn("src", lit(2)))
    .groupBy("M")
    .agg(count(lit(1)) as "cnt", 
        commonColumns.map(c => concat(
            min(struct($"src", col(c)))(c),
            lit("_"),
            max(struct($"src", col(c)))(c)) as c):_*)
    .select("M", commonColumns:_*)
    .show

El resultado final se ordena:

+---+----+----+
|  M|  c2|  c3|
+---+----+----+
|  1|2_20|3_30|
|  2|3_30|4_40|
+---+----+----+
2
shuvalov 11 dic. 2019 a las 13:19