Estoy trabajando en un pequeño lote con Spring batch y Kafka que lee datos json de un tema de Kafka, lo convierte en un objeto Student, cambia un valor y lo envía de vuelta a un tema de Kafka. Todo está funcionando bien, pero mi único problema es que mi consumidor SIEMPRE está leyendo desde el principio del tema. Lo necesito para leer desde el último mensaje no consumido. Ya agregué esas propiedades:

ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false
ConsumerConfig.GROUP_ID_CONFIG to a random value

Pero esto no parece funcionar, en la puesta en marcha del consumidor, procesa todos los mensajes. ¿Alguien tiene una idea de cómo hacerlo con Spring Batch y Kafka, por favor? Este es mi codigo:

BatchStudent.java :

@SpringBootApplication
@EnableBatchProcessing
@RequiredArgsConstructor
public class BatchStudent {
    public static void main(String[] args) {
        SpringApplication.run(BatchStudent.class, args);
    }

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final KafkaTemplate<Integer, Student> template;
    private final KafkaProperties properties;

    @Value("${kafka.topic.consumer}")
    private String topic;

    @Bean
    public ItemProcessor<Student, Student> customItemProcessor() {
        return new CustomProcessor();
    }

    @Bean
    Job job() {
        return this.jobBuilderFactory.get("job")
                .start(start())
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    KafkaItemWriter<Integer, Student> writer() {
        return new KafkaItemWriterBuilder<Integer, Student>()
                .kafkaTemplate(template)
                .itemKeyMapper(Student::getId)
                .build();
    }

    @Bean
    public KafkaItemReader<Integer, Student> reader() {
        Properties props = new Properties();
        props.putAll(this.properties.buildConsumerProperties());

        return new KafkaItemReaderBuilder<Integer, Student>()
                .partitions(0)
                .consumerProperties(props)
                .name("students-consumer-reader")
                .saveState(true)
                .topic(topic)
                .build();
    }

    @Bean
    Step start() {
        return this.stepBuilderFactory
                .get("step")
                .<Student, Student>chunk(10)
                .writer(writer())
                .processor(customItemProcessor())
                .reader(reader())
                .build();
    }
}

app.yml

spring.batch.initialize-schema: always

#Conf Kafka Consumer
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
#spring.kafka.consumer.group-id: student-group
spring.kafka.consumer.properties.spring.json.trusted.packages: '*'
spring.kafka.consumer.properties.spring.json.value.default.type: com.org.model.Student

#Conf Kafka Producer
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.bootstrap-servers: localhost:9092

#Conf topics
spring.kafka.template.default-topic: producer.student
kafka.topic.consumer: consumer.student

Student.java

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {
    Integer id;
    Integer count;
}

CustomProcessor.java

@NoArgsConstructor
public class CustomProcessor implements ItemProcessor<Student, Student> {

    @Override
    public Student process(Student studentRecieved) {
        final Student studentSent = new Student();
        studentSent.setId(studentRecieved.getId());
        studentSent.setCount(200);
        return studentSent;
    }
}

Gracias por su ayuda !

0
help me code 22 ene. 2021 a las 00:21

1 respuesta

La mejor respuesta

Todo está funcionando bien, pero mi único problema es que mi consumidor SIEMPRE está leyendo desde el principio del tema. Lo necesito para leer desde el último mensaje no consumido.

Spring Batch 4.3 introdujo una forma de consumir registros del desplazamiento almacenado en Kafka. Hablé sobre esta función en mi charla en Spring One el año pasado: ¿Qué hay de nuevo en Spring Batch 4.3? . Puede configurar el lector kafka con un desplazamiento de inicio personalizado en cada partición utilizando setPartitionOffsets:

Setter for partition offsets. This mapping tells the reader the offset to start reading
from in each partition. This is optional, defaults to starting from offset 0 in each
partition. Passing an empty map makes the reader start from the offset stored in Kafka
for the consumer group ID.

Puede encontrar un ejemplo completo en este caso de prueba.

2
Mahmoud Ben Hassine 25 ene. 2021 a las 09:57