I'm working through the quickstart in the Kafka documentation (https://kafka.apache.org/quickstart). I deployed 3 brokers and created a topic.

➜  kafka_2.10-0.10.1.0 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 2,0,1 Isr: 2,1,0

Then I test the producer with

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

and the consumer with

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

Both the producer and the consumer work fine. If I kill broker 1 or broker 2, the producer and the consumer still work correctly.

But if I shut down broker 0 and type a message in the producer terminal, the consumer cannot read any new messages. When I kill broker 0, the consumer prints this log:

[2017-06-23 17:29:52,750] WARN Auto offset commit failed for group console-consumer-97540: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2017-06-23 17:29:52,974] WARN Auto offset commit failed for group console-consumer-97540: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
...
[2017-06-23 17:29:53,409] WARN Auto offset commit failed for group console-consumer-97540: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

Then I restart broker 0, and the consumer prints my messages along with a few more warnings:

hhhh
hello
[2017-06-23 17:32:32,795] WARN Auto offset commit failed for group console-consumer-97540: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2017-06-23 17:32:32,902] WARN Auto offset commit failed for group console-consumer-97540: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

This confuses me. Why is broker 0 so special, when broker 0 isn't even the leader?

I also noticed that the server log printed by broker 0 contains a lot of entries like the following:

[2017-06-23 17:32:33,640] INFO [Group Metadata Manager on Broker 0]: Finished loading offsets from [__consumer_offsets,23] in 38 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2017-06-23 17:32:33,641] INFO [Group Metadata Manager on Broker 0]: Loading offsets and group metadata from [__consumer_offsets,26] (kafka.coordinator.GroupMetadataManager)
[2017-06-23 17:32:33,646] INFO [Group Metadata Manager on Broker 0]: Finished loading offsets from [__consumer_offsets,26] in 4 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2017-06-23 17:32:33,646] INFO [Group Metadata Manager on Broker 0]: Loading offsets and group metadata from [__consumer_offsets,29] (kafka.coordinator.GroupMetadataManager)

But the logs of broker 1 and broker 2 have no such content.

Can anyone explain this to me? Many thanks!

Solved: the replication factor of the __consumer_offsets topic is the root cause. There is an issue for it: issues.apache.org/jira/browse/KAFKA-3959
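
For reference, the broker setting that governs this is offsets.topic.replication.factor in server.properties. It only takes effect before __consumer_offsets is first auto-created, so on a fresh cluster it has to be set before any consumer connects:

    # server.properties on each broker; has no effect once the internal
    # offsets topic already exists
    offsets.topic.replication.factor=3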

-1
yun.xia Jun 23, 2017 at 12:53

3 Answers

Best answer

kafka-console-producer defaults to acks=1, so it is not fault tolerant at all. Add the flag or config parameter to set acks=all; then, provided your topic and the __consumer_offsets topic were both created with a replication factor of 3, your test will work.
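
A minimal sketch of passing that setting to the console producer (--request-required-acks -1 is the older spelling of acks=all; newer clients also accept --producer-property acks=all):

    # -1 means "all in-sync replicas must acknowledge each write"
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic --request-required-acks -1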

1
Hans Jespersen Jun 24, 2017 at 22:49

The brokers share the load of managing consumer groups.

Usually, each independent consumer has a unique consumer group ID, and you use the same group ID when you want to split the consumption work across multiple consumers, as in the sketch below.
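
As a sketch, running the same command in two terminals puts both consumers in one group, so they divide the topic's partitions between them (--consumer-property is available on newer console consumers; older clients take a properties file via --consumer.config):

    # run in two terminals; both consumers join the same group
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-replicated-topic --consumer-property group.id=my-shared-group

Note that my-replicated-topic has a single partition, so one of the two consumers would sit idle until a rebalance.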

That said: being the lead broker, for a Kafka server within the cluster, is only about coordinating the other brokers. The leader has nothing to do (directly) with the broker that currently manages the group ID and offset commits for a specific consumer!

So each time you subscribe, a broker is designated to handle the offset commits for your group, and this has nothing to do with leader election.

Shut down that broker and you may have trouble with your group's consumption until the Kafka cluster stabilizes again (reassign your consumer to move group management to other brokers, or wait for the nodes to come back... I'm not expert enough beyond that to tell you exactly how the failover happens).
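
One way to inspect that group state is the consumer-groups tool (a sketch; the group name is the auto-generated one from the logs above, and very old clients may additionally need the --new-consumer flag):

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-97540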

1
Fabien Jun 23, 2017 at 10:00

Most likely, the __consumer_offsets topic has its "Replicas" set to broker 0 only. To confirm this, describe the __consumer_offsets topic:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic __consumer_offsets

Topic: __consumer_offsets   PartitionCount: 50  ReplicationFactor: 1    Configs: compression.type=producer,cleanup.policy=compact,segment.bytes=104857600
Topic: __consumer_offsets   Partition: 0    Leader: 0   Replicas: 0 Isr: 0
Topic: __consumer_offsets   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
Topic: __consumer_offsets   Partition: 2    Leader: 0   Replicas: 0 Isr: 0
Topic: __consumer_offsets   Partition: 3    Leader: 0   Replicas: 0 Isr: 0
Topic: __consumer_offsets   Partition: 4    Leader: 0   Replicas: 0 Isr: 0
...
Topic: __consumer_offsets   Partition: 49   Leader: 0   Replicas: 0 Isr: 0

Note the "Replicas: 0 Isr: 0". This is why, when you stop broker 0, the consumer no longer receives messages.

To fix this, you need to change the "Replicas" of the __consumer_offsets topic so that they include the other brokers.

  1. Create a json file like this (config/inc-replication-factor-consumer_offsets.json); the sketch after the JSON shows one way to generate it:
{"version":1,
 "partitions":[
   {"topic":"__consumer_offsets", "partition":0,  "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":1,  "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":2,  "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":3,  "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":4,  "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":5,  "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":6,  "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":7,  "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":8,  "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":9,  "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":10, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":11, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":12, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":13, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":14, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":15, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":16, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":17, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":18, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":19, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":20, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":21, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":22, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":23, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":24, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":25, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":26, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":27, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":28, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":29, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":30, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":31, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":32, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":33, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":34, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":35, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":36, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":37, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":38, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":39, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":40, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":41, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":42, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":43, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":44, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":45, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":46, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":47, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":48, "replicas":[0, 1, 2]},
   {"topic":"__consumer_offsets", "partition":49, "replicas":[0, 1, 2]}
 ]
}
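
Rather than typing all 50 entries by hand, a small shell loop can generate the same file (a sketch; assumes seq is available):

# emit one reassignment entry per partition of __consumer_offsets
{
  echo '{"version":1,"partitions":['
  for p in $(seq 0 49); do
    sep=','; [ "$p" -eq 49 ] && sep=''
    echo "  {\"topic\":\"__consumer_offsets\",\"partition\":$p,\"replicas\":[0,1,2]}$sep"
  done
  echo ']}'
} > config/inc-replication-factor-consumer_offsets.json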
  2. Run the following command (on older, ZooKeeper-based versions of the tool, pass --zookeeper localhost:2181 instead of --bootstrap-server):

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file config/inc-replication-factor-consumer_offsets.json --execute
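
The same tool reports when the reassignment has completed (a sketch using its --verify mode):

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file config/inc-replication-factor-consumer_offsets.json --verify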

  3. Confirm the "Replicas":

bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic __consumer_offsets

Topic: __consumer_offsets   PartitionCount: 50  ReplicationFactor: 3    Configs: compression.type=producer,cleanup.policy=compact,segment.bytes=104857600
Topic: __consumer_offsets   Partition: 0    Leader: 0   Replicas: 0,1,2 Isr: 0,2,1
Topic: __consumer_offsets   Partition: 1    Leader: 0   Replicas: 0,1,2 Isr: 0,2,1
Topic: __consumer_offsets   Partition: 2    Leader: 0   Replicas: 0,1,2 Isr: 0,2,1
Topic: __consumer_offsets   Partition: 3    Leader: 0   Replicas: 0,1,2 Isr: 0,2,1
...
Topic: __consumer_offsets   Partition: 49   Leader: 0   Replicas: 0,1,2 Isr: 0,2,1
  4. Now you can stop just broker 0, produce some messages, and see the output in the consumer.
0
matthias_h Mar 30, 2020 at 22:44