Tengo dos marcos de datos, uno con intervalos de 15 minutos y otro con un starttime, un endtime y un valor. Me gustaría encontrar el máximo value del segundo marco de datos que cae dentro del intervalo del primer marco de datos.

Esquema de ambos DataFrames:

DF1 Schema
 |-- start: timestamp (nullable = false)
 |-- end: timestamp (nullable = false)

 DF2 Schema
 |-- starttime: timestamp (nullable = false)
 |-- endtime: timestamp (nullable = false)
 |-- value: Long(nullable = false)

He creado esta solución, aunque me preocupa su rendimiento. Me pregunto si hay una mejor manera de lograr esto sin bucles. Pensé en unirme, pero como necesito encontrar el máximo de df2 dentro de un intervalo de df1, no estoy seguro de a qué me uniría.

case class maxCaseClass(starttime:ZonedDateTime, endtime:ZonedDateTime, max: Long)
var maxInInterval = Seq.newBuilder[maxCaseClass]
val distinctIntervals = df1.select("start", "end").distinct().collect()
distinctIntervals.foreach(row => {
  val starttime = row.getAs("start").asInstanceOf[Timestamp]
  val endtime = row.getAs("end").asInstanceOf[Timestamp]
  val maxDF = df2.filter(col("endtime") >= lit(starttime).cast(TimestampType) && col("starttime") <= lit(endtime).cast(TimestampType)).agg(max("value").as("max"))
  maxInInterval += maxCaseClass(
                   LocalDateTime.parse(starttime.toString).atZone(ZoneOffset.UTC), 
                   LocalDateTime.parse(endtime.toString).atZone(ZoneOffset.UTC), 
                   maxDF.head().getAs("max").asInstanceOf[Long]
                   )
})

En lugar de terminar con una secuencia, solo quiero agregar una nueva columna a df1 con maxValue, pero no estoy seguro de cómo lograrlo.

1
sgallagher 3 mar. 2021 a las 23:18

1 respuesta

La mejor respuesta

Puede unir df1 con df2 en esa condición de intervalo y luego agregar:

val result = DF1.join(
  DF2,
  (col("end") >= col("endtime")) && (col("starttime") >= col("start"))
).groupBy("start", "end")
.agg(max("value").as("max_value"))

O usando SQL con subconsulta correlacionada:

DF1.createOrReplaceTempView("df1")
DF2.createOrReplaceTempView("df2")


val result = spark.sql("""
select  *,
        (select max(value) from df2 where end >= endtime and starttime >= start) as max_value
from  df1
""")
1
blackbishop 4 mar. 2021 a las 06:43