Estoy usando Avro Producer en Python 2.7. Necesito enviar un mensaje con una clave y un valor, el valor tiene Avro-Schema en el tema, pero no hay Avro-Schema para la clave (no puedo agregar Schema para la clave - razones heredadas).

Este es mi código:

def main():
    kafkaBrokers = os.environ.get('KAFKA_BROKERS')
    schemaRegistry = os.environ.get('SCHEMA_REGISTRY')
    topic = os.environ.get('KAFKA_TOPIC')

    subject = '${}-value'.format(topic)
    sr = CachedSchemaRegistryClient(schemaRegistry)

    schema = sr.get_latest_schema(subject).schema

    value_schema = avro.loads(str(schema))

    url = 'test.com'

    value = {'url': u'test.com', 'priority': 10}

    avroProducer = AvroProducer({
        'bootstrap.servers': kafkaBrokers,
        'schema.registry.url': schemaRegistry
    }, default_value_schema=value_schema)


    key = 1638895406382020875
    
    avroProducer.produce(topic=topic, value=value, key=key)
    avroProducer.flush()

Aparece el siguiente error:

raise KeySerializerError("Avro schema required for key")
confluent_kafka.avro.serializer.KeySerializerError: Avro schema required for key

Si elimino la clave de la función de producción:

avroProducer.produce(topic=topic, value=value)

Funciona.

¿Cómo es posible enviar la clave sin tener esquema?

0
daniel the man 25 ago. 2020 a las 17:04

2 respuestas

La mejor respuesta

Deberá utilizar Producer normal y ejecutar las funciones de serialización usted mismo

from confluent_kafka import avro
from confluent_kafka.avro import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer as AvroSerializer

avro_serializer = AvroSerializer(schema_registry)
serialize_avro = avro_serializer.encode_record_with_schema  # extract function definition 

value_schema = avro.load('avro_schemas/value.avsc')  # TODO: Create avro_schemas folder 

p = Producer({'bootstrap.servers': bootstrap_servers})

value_payload = serialize_avro(topic, value_schema, value, is_key=False)
p.produce(topic, key=key, value=value_payload, callback=delivery_report)
2
OneCricketeer 25 ago. 2020 a las 17:53

AvroProducer asume que tanto las claves como los valores están codificados con el registro del esquema, anteponiendo un byte mágico y la identificación del esquema a la carga útil tanto de la clave como del valor.

Si desea utilizar una serialización personalizada para la clave, puede utilizar un Producer en lugar de un AvroProducer. Pero será su responsabilidad serializar la clave (usando el formato que desee) y los valores (lo que significa codificar el valor y anteponer el byte mágico y la identificación del esquema). Para saber cómo se hace esto, puede mirar el código AvroProducer.

Pero también significa que tendrás que escribir tu propio AvroConsumer y no podrás usar kafka-avro-console-consumer.

1
Arthur 25 ago. 2020 a las 15:45