Estoy usando Flink 1.8.0 y estoy tratando de consultar mi estado de trabajo.

val descriptor = new ValueStateDescriptor("myState", Types.CASE_CLASS[Foo])
    descriptor.setQueryable("my-queryable-State")

Utilicé el puerto 9067, que es el puerto predeterminado de acuerdo con este, mi cliente:

val client = new QueryableStateClient("127.0.0.1", 9067)
val jobId = JobID.fromHexString("d48a6c980d1a147e0622565700158d9e")

      val execConfig = new ExecutionConfig
       val descriptor = new ValueStateDescriptor("my-queryable-State", Types.CASE_CLASS[Foo])
      val res: Future[ValueState[Foo]] = client.getKvState(jobId, "my-queryable-State","a", BasicTypeInfo.STRING_TYPE_INFO, descriptor)
      res.map(_.toString).pipeTo(sender)

Pero estoy obteniendo:

[ERROR] [06/25/2019 20:37:05.499] [bvAkkaHttpServer-akka.actor.default-dispatcher-5] [akka.actor.ActorSystemImpl(bvAkkaHttpServer)] Error during processing of request: 'org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:9067'. Completing with 500 Internal Server Error response. To change default exception handling behavior, provide a custom ExceptionHandler.
java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:9067
  1. Qué estoy haciendo mal ?
  2. ¿Cómo y dónde debo definir QueryableStateOptions
0
igx 25 jun. 2019 a las 20:49

1 respuesta

La mejor respuesta

Entonces, si desea utilizar el QueryableState, debe agregar el frasco adecuado a su Flink. El frasco es flink-queryable-state-runtime, se puede encontrar en la carpeta opt en su distribución de Flink y debe moverlo a la carpeta lib.

En cuanto a la segunda pregunta, el QueryableStateOption es solo una clase que se usa para crear definiciones estáticas ConfigOption. Luego se usan las definiciones para leer las configuraciones desde el archivo flink-conf.yaml. Así que actualmente la única opción para configurar el QueryableState es usar el archivo FLINK-CONF en la distribución de Flink.

Editar: TAMBIÉN, intente leer esto] 1 Proporciona más información sobre cómo funciona el estado consultable. Realmente no debe conectarse directamente al puerto del servidor, sino que debe usar el puerto proxy que de forma predeterminada es 9069.

0
Dominik Wosiński 26 jun. 2019 a las 08:44