Me encontré con la clase Histogram en la jerarquía de Flink, pero no hay documentación del tipo "así es como se puede usar este". Quería hacer algo como:

dataStream
    .countWindowAll(100)
    .fold(new Histogram(), (histogram,data) -> {histogram.add(data.getValue()); return histogram;})
    .flatmap((h, out) -> h.getLocalValue().navigableKeySet.iterator().forEachRemaining(key -> out.collect(key.toString()+","+h.get(key).toString()))
    .print()

Pero lamentablemente el Histogram no se puede serializar a través de Flink. Tal vez haya un "aquí es cómo se puede usar esto" o hay otra forma de histograma a través de flink.

Claramente estoy haciendo algo mal.

4
Cubs Fan Ron 30 dic. 2016 a las 02:35
Estoy enfrentando el mismo problema, ¿tiene la solución? La API de Flink no está bien documentada ya que no tiene suficientes ejemplos
 – 
Amarjit Dhillon
13 nov. 2017 a las 07:58

1 respuesta

La mejor respuesta

Los acumuladores de Flink no deben usarse como tipos de datos para DataStream o DataSet.

En su lugar, se registran a través de RuntimeContext, que está disponible en RichFunction.getRuntimeContext(). This is usually done in the open () method of a RichFunction`:

class MyFunc extends RichFlatMapFunction[Int, Int] {

  val hist: Histogram = new Histogram()

  override def open(conf: Configuration): Unit = {
    getRuntimeContext.addAccumulator("myHist", hist)
  }

  override def flatMap(value: Int, out: Collector[Int]): Unit = {
    hist.add(value)
  }
}

Todas las instancias paralelas de un acumulador se envían periódicamente al JobManager (el proceso maestro) y se combinan. Se puede acceder a sus valores desde el JobExecutionResult devuelto por StreamExecutionEnvironment.execute().

Creo que los acumuladores de Flink no pueden abordar su caso de uso. Debe crear un tipo de datos de histograma personalizado.

7
Fabian Hueske 30 dic. 2016 a las 03:08