Tengo una tabla en SQL Server que me gustaría transmitir al tema de Kafka, la estructura es la siguiente:

(UserID, ReportID)

Esta tabla se cambiará continuamente (registros agregados, insertados, sin actualizaciones)

Me gustaría transformar esto en este tipo de estructura y ponerlo en Elasticsearch:

{
  "UserID": 1,
  "Reports": [1, 2, 3, 4, 5, 6]
}

Los ejemplos que he visto hasta ahora son registros o flujo de clics que no funcionan en mi caso.

¿Es posible este tipo de caso de uso? Siempre podría mirar los cambios de UserID y consultar la base de datos, pero eso parece ingenuo y no es el mejor enfoque.

Actualizar

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.ArrayList;
import java.util.Properties;

public class MyDemo {
  public static void main(String... args) {
    System.out.println("Hello KTable!");

    final Serde<Long> longSerde = Serdes.Long();

    KStreamBuilder builder = new KStreamBuilder();

    KStream<Long, Long> reportPermission = builder.stream(TOPIC);

    KTable<Long, ArrayList<Long>> result = reportPermission
        .groupByKey()
        .aggregate(
            new Initializer<ArrayList<Long>>() {
              @Override
              public ArrayList<Long> apply() {
                return null;
              }
            },
            new Aggregator<Long, Long, ArrayList<Long>>() {
              @Override
              public ArrayList<Long> apply(Long key, Long value, ArrayList<Long> aggregate) {
                aggregate.add(value);
                return aggregate;
              }
            },
            new Serde<ArrayList<Long>>() {
              @Override
              public void configure(Map<String, ?> configs, boolean isKey) {}

              @Override
              public void close() {}

              @Override
              public Serializer<ArrayList<Long>> serializer() {
                return null;
              }

              @Override
              public Deserializer<ArrayList<Long>> deserializer() {
                return null;
              }
            });

    result.to("report-aggregated-topic");

    KafkaStreams streams = new KafkaStreams(builder, createStreamProperties());
    streams.cleanUp();
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }

  private static final String TOPIC = "report-permission";

  private static final Properties createStreamProperties() {
    Properties props = new Properties();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "report-permission-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

    return props;
  }
}

De hecho, me quedo atascado en la etapa agregada porque no puedo escribir una SerDe adecuada para ArrayList<Long> (aún no tengo suficientes habilidades), las lambdas parecen no funcionar en el agregador; no sabe cuál es el tipo de {{X1} }:

KTable<Long, ArrayList<Long>> sample = builder.stream(TOPIC)
    .groupByKey()
    .aggregate(
        () -> new ArrayList<Long>(),
        (key, val, agg) -> agg.add(val),
        longSerde
    );
1
Evaldas Buinauskas 19 sep. 2017 a las 18:10

2 respuestas

La mejor respuesta

Puede utilizar la API Connect de Kafka para obtener los datos de SQL Server en Kafka. No conozco ningún conector específico para SQL Server, pero puede usar cualquier conector genérico basado en JDBC: https: //www.confluent.io/product/connectors/

Para procesar datos, puede utilizar la API Streams de Kafka. Simplemente puede aggregate() todos los informes por usuario. Algo como esto:

KTable<UserId, List<Reports>> result =
    builder.stream("topic-name")
           .groupByKey()
           // init a new empty list and
           // `add` the items to the list in the actual aggregation
           .aggregate(...);

result.to("result-topic");

Consulte los documentos para obtener más detalles sobre la API de Streams: https://docs.confluent.io /current/streams/index.html

Tenga en cuenta que debe asegurarse de que la lista de informes no crezca sin límites. Kafka tiene un tamaño de mensaje máximo (configurable) y la lista completa estará contenida en un solo mensaje. Por lo tanto, debe estimar el tamaño máximo del mensaje y aplicar la configuración correspondiente (-> max.message.bytes) antes de entrar en producción. Consulte las configuraciones en la página web: http://kafka.apache.org/documentation/#brokerconfigs

Finalmente, usa Connect API para enviar los datos a Elastic Search. Hay varios conectores diferentes disponibles (por supuesto, recomendaría el Confluent). Más detalles sobre Connect API: https://docs.confluent.io/current/connect/ userguide.html

3
Matthias J. Sax 19 sep. 2017 a las 16:47

Directamente, este tipo de enfoque no está permitido en SQL y Kafka Streams, sin embargo, el caso de uso es posible y se puede implementar de la siguiente manera:

1) Escriba una aplicación personalizada sobre el servidor SQL utilizando las API de SOLRJ que llegarán a la instancia de Solr cada vez que se realice una operación DML (Insertar, Actualizar, Eliminar, etc.) en SQL. https://wiki.apache.org/solr/Solrj

2) Utilice Solr SQL Data Import Handler usándolo SQL Server informará automáticamente a solr cada vez que ocurra una operación DML (Insertar, Actualizar, Eliminar, etc.) en SQL. https://wiki.apache.org/solr/DataImportHandler

-4
hagarwal 19 sep. 2017 a las 15:21