Estoy usando Spark / Scala para procesar una tabla Hive
que contiene datos de transacciones para cada miembro. Necesito obtener el registro máximo para cada miembro. Hice esta tarea usando el siguiente código y funciona con éxito, pero no se obtiene el rendimiento.
¿Tengo que preguntar si hay alguna otra forma de mejorar el rendimiento de este código? Encontré algunas formas de hacerlo usando spark-sql pero prefiero Spark
Dataframe o Dataset.
El siguiente ejemplo reproducirá mi código y mis datos.
val mamberData = Seq(
Row("1234", "CX", java.sql.Timestamp.valueOf("2018-09-09 00:00:00")),
Row("1234", "CX", java.sql.Timestamp.valueOf("2018-03-02 00:00:00")),
Row("5678", "NY", java.sql.Timestamp.valueOf("2019-01-01 00:00:00")),
Row("5678", "NY", java.sql.Timestamp.valueOf("2018-01-01 00:00:00")),
Row("7088", "SF", java.sql.Timestamp.valueOf("2018-09-01 00:00:00"))
)
val MemberDataSchema = List(
StructField("member_id", StringType, nullable = true),
StructField("member_state", StringType, nullable = true),
StructField("activation_date", TimestampType, nullable = true)
)
import spark.implicits._
val memberDF =spark.createDataFrame(
spark.sparkContext.parallelize(mamberData),
StructType(MemberDataSchema)
)
val memberDfMaxDate = memberDF.groupBy('member_id).agg(max('activation_date).as("activation_date"))
val memberDFMaxOnly = memberDF.join(memberDfMaxDate,Seq("member_id","activation_date"))
La salida está por debajo
+---------+------------+-------------------+
|member_id|member_state|activation_date |
+---------+------------+-------------------+
|1234 |CX |2018-09-09 00:00:00|
|1234 |CX |2018-03-02 00:00:00|
|5678 |NY |2019-01-01 00:00:00|
|5678 |NY |2018-01-01 00:00:00|
|7088 |SF |2018-09-01 00:00:00|
+---------+------------+-------------------+
+---------+-------------------+------------+
|member_id| activation_date|member_state|
+---------+-------------------+------------+
| 7088|2018-09-01 00:00:00| SF|
| 1234|2018-09-09 00:00:00| CX|
| 5678|2019-01-01 00:00:00| NY|
+---------+-------------------+------------+
3 respuestas
Puede usar muchas técnicas, por ejemplo Ranking
o Dataset
. Prefiero usar reduceGroups
ya que es una forma de estilo funcional y fácil de interpretar.
case class MemberDetails(member_id: String, member_state: String, activation_date: FileStreamSource.Timestamp)
val dataDS: Dataset[MemberDetails] = spark.createDataFrame(
spark.sparkContext.parallelize(mamberData),
StructType(MemberDataSchema)
).as[MemberDetails]
.groupByKey(_.member_id)
.reduceGroups((r1, r2) ⇒ if (r1.activation_date > r2.activation_date) r1 else r2)
.map { case (key, row) ⇒ row }
dataDS.show(truncate = false)
Utilice funciones de ventana para asigne un rango y filtre el primero en cada grupo.
import org.apache.spark.sql.expressions.Window
// Partition by member_id order by activation_date
val byMemberId = Window.partitionBy($"member_id").orderBy($"activation_date" desc)
// Get the new DF applying window function
val memberDFMaxOnly = memberDF.select('*, rank().over(byMemberId) as 'rank).where($"rank" === 1).drop("rank")
// View the results
memberDFMaxOnly.show()
+---------+------------+-------------------+
|member_id|member_state| activation_date|
+---------+------------+-------------------+
| 1234| CX|2018-09-09 00:00:00|
| 5678| NY|2019-01-01 00:00:00|
| 7088| SF|2018-09-01 00:00:00|
+---------+------------+-------------------+
El groupBy
de DataFrame es tan eficiente como se pone (más eficiente que las Funciones de ventana debido a la agregación parcial).
Pero puede evitar la unión utilizando un struct
dentro de la cláusula de agregación:
val memberDfMaxOnly = memberDF.groupBy('member_id).agg(max(struct('activation_date, 'member_state)).as("row_selection"))
.select(
$"member_id",
$"row_selection.activation_date",
$"row_selection.member_state"
)
Nuevas preguntas
scala
Scala es un lenguaje de programación de propósito general dirigido principalmente a la máquina virtual Java. Diseñado para expresar patrones de programación comunes de una manera concisa, elegante y segura de tipos, fusiona estilos de programación imperativos y funcionales. Sus características clave son: un sistema de tipo estático avanzado con inferencia de tipo; tipos de funciones; la coincidencia de patrones; parámetros implícitos y conversiones; sobrecarga del operador; interoperabilidad total con Java; concurrencia