Estoy tratando de usar kafka streams con scala a continuación, mi código en Java funciona perfectamente bien:

KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> textLines = builder.stream("TextLinesTopic");
    textLines.foreach((key,values) -> {
        System.out.println(values);
    });

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

Mi scala tiene el siguiente código:

  val builder = new KStreamBuilder
  val textLines:KStream[String, String]  = builder.stream("TextLinesTopic")
  textLines.foreach((key,value)-> {
   println(key)
  })

  val streams = new KafkaStreams(builder, config)
  streams.start()

El código scala arroja un error de compilación. Se esperaba una falta de coincidencia de tipos: ForEachAction [> String, > String], Real ((any, any), Unit) no encontrado: clave de valor no encontrado: valor valor

¿Alguien sabe cómo usar la API de streams en Scala?

1
user1733735 5 sep. 2017 a las 00:00

2 respuestas

La mejor respuesta

Tu sintaxis es incorrecta :). -> es solo un operador para crear pares, por lo que la expresión

(key,value)-> {
  println(key)
}

Tiene tipo ((Cualquiera, Cualquiera), Unidad) porque el compilador no puede inferir ningún tipo de información (y faltan key y value)

Si está usando scala 2.12 reemplazando -> con => debería resolver el problema, pero si está usando una versión anterior de scala, tendrá que implementar la bifunción de Java explícitamente:

 textLines.foreach(new BiFunction[T1, T2] { ... })
5
L.Lampart 4 sep. 2017 a las 22:48

Puede imprimir directamente kafkastream utilizando el método de impresión.

textlines.print

Imprimirá el flujo de kafka. Incluso puede imprimir la clave o los valores pasando un argumento a la función de impresión.

1
Mahesh Chand 5 sep. 2017 a las 02:29