Estoy usando Spark / Scala para procesar una tabla Hive que contiene datos de transacciones para cada miembro. Necesito obtener el registro máximo para cada miembro. Hice esta tarea usando el siguiente código y funciona con éxito, pero no se obtiene el rendimiento.

¿Tengo que preguntar si hay alguna otra forma de mejorar el rendimiento de este código? Encontré algunas formas de hacerlo usando spark-sql pero prefiero Spark Dataframe o Dataset.

El siguiente ejemplo reproducirá mi código y mis datos.

  val mamberData = Seq(
    Row("1234", "CX", java.sql.Timestamp.valueOf("2018-09-09 00:00:00")),
    Row("1234", "CX", java.sql.Timestamp.valueOf("2018-03-02 00:00:00")),
    Row("5678", "NY", java.sql.Timestamp.valueOf("2019-01-01 00:00:00")),
    Row("5678", "NY", java.sql.Timestamp.valueOf("2018-01-01 00:00:00")),
    Row("7088", "SF", java.sql.Timestamp.valueOf("2018-09-01 00:00:00"))
  )



  val MemberDataSchema = List(
    StructField("member_id", StringType, nullable = true),
    StructField("member_state", StringType, nullable = true),
    StructField("activation_date", TimestampType, nullable = true)
  )

  import spark.implicits._

  val memberDF =spark.createDataFrame(
    spark.sparkContext.parallelize(mamberData),
    StructType(MemberDataSchema)
  )

  val memberDfMaxDate = memberDF.groupBy('member_id).agg(max('activation_date).as("activation_date"))

  val memberDFMaxOnly = memberDF.join(memberDfMaxDate,Seq("member_id","activation_date"))

La salida está por debajo

+---------+------------+-------------------+
|member_id|member_state|activation_date    |
+---------+------------+-------------------+
|1234     |CX          |2018-09-09 00:00:00|
|1234     |CX          |2018-03-02 00:00:00|
|5678     |NY          |2019-01-01 00:00:00|
|5678     |NY          |2018-01-01 00:00:00|
|7088     |SF          |2018-09-01 00:00:00|
+---------+------------+-------------------+

+---------+-------------------+------------+
|member_id|    activation_date|member_state|
+---------+-------------------+------------+
|     7088|2018-09-01 00:00:00|          SF|
|     1234|2018-09-09 00:00:00|          CX|
|     5678|2019-01-01 00:00:00|          NY|
+---------+-------------------+------------+
3
khaled 9 may. 2019 a las 20:41

3 respuestas

La mejor respuesta

Puede usar muchas técnicas, por ejemplo Ranking o Dataset. Prefiero usar reduceGroups ya que es una forma de estilo funcional y fácil de interpretar.

  case class MemberDetails(member_id: String, member_state: String, activation_date: FileStreamSource.Timestamp)

  val dataDS: Dataset[MemberDetails] = spark.createDataFrame(
    spark.sparkContext.parallelize(mamberData),
    StructType(MemberDataSchema)
  ).as[MemberDetails]
    .groupByKey(_.member_id)
    .reduceGroups((r1, r2) ⇒ if (r1.activation_date > r2.activation_date) r1 else r2)
    .map { case (key, row) ⇒ row }


  dataDS.show(truncate = false)
2
Moustafa Mahmoud 9 may. 2019 a las 17:43

Utilice funciones de ventana para asigne un rango y filtre el primero en cada grupo.

import org.apache.spark.sql.expressions.Window

// Partition by member_id order by activation_date
val byMemberId = Window.partitionBy($"member_id").orderBy($"activation_date" desc)

// Get the new DF applying window function
val memberDFMaxOnly = memberDF.select('*, rank().over(byMemberId) as 'rank).where($"rank" === 1).drop("rank")

// View the results
memberDFMaxOnly.show()
+---------+------------+-------------------+
|member_id|member_state|    activation_date|
+---------+------------+-------------------+
|     1234|          CX|2018-09-09 00:00:00|
|     5678|          NY|2019-01-01 00:00:00|
|     7088|          SF|2018-09-01 00:00:00|
+---------+------------+-------------------+
1
gasparms 9 may. 2019 a las 18:05

El groupBy de DataFrame es tan eficiente como se pone (más eficiente que las Funciones de ventana debido a la agregación parcial).

Pero puede evitar la unión utilizando un struct dentro de la cláusula de agregación:

val memberDfMaxOnly = memberDF.groupBy('member_id).agg(max(struct('activation_date, 'member_state)).as("row_selection"))
  .select(
    $"member_id",
    $"row_selection.activation_date",
    $"row_selection.member_state"
  )
1
Raphael Roth 9 may. 2019 a las 18:29
56064848