
Topic partition with Spring Cloud Stream: cannot consume from topic


I am trying to produce messages to Kafka and consume them again using Spring Cloud Stream.

Producing messages works without problems: using kafbat I can see that the message has been created correctly in my topic (it shows up encoded as a JWT, but that is not a problem for me).
The problem is on the consuming side: when the service starts, it cannot connect to the topic or find its partitions for some reason.

My yml is:

spring:
  cloud:
    function:
      definition: chents
    stream:
      bindings:
        chents-in-0:
          destination: segnnel 
          group: console-consumer-20965  
          consumer:
            max-attempts: 2
        chents-out-0:
          destination: segnnel   
      kafka:
        bootstrap-servers: localhost:19092
        binder:
          brokers: localhost:19092
        bindings:
          chents-in-0:
            consumer:
              enableDlq: false
              ack-mode: record
              key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
              value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
              properties:
                spring.json.trusted.packages: "*"
      binders:
        local-kafka:
          type: kafka

My code to produce the messages with StreamBridge (using it is a requirement) is:

private void sendToKafka(EventData<Channel> event) {
    Message<EventData<Channel>> message = MessageBuilder.withPayload(event)
            .setHeader("eventType", "Channel")
            .build();
    streamBridge.send("segnnel", message);
}

And this is my subscriber class:

@Configuration 
public class ChannelSubscriber {

    private Repository repository;

    private static final Mapper mapper ...;

    public ChannelSubscriber(Repository repository) {
        this.repository = repository;
    }

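    @Bean
    Consumer<EventData<Channel>> chents() {
        // subscribe() triggers the pipeline; without it the Mono is assembled but never executed
        return e -> Mono.just(e).map(mapper::mapToEntity).flatMap(repository::save)
                .doOnError(ex -> log.error("Error processing event: ", ex))
                .subscribe();
    }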

}

The service starts correctly and is able to send messages, but from time to time it gives me this error when trying to consume them:

Caused by: java.lang.RuntimeException: Failed to obtain partition information for the topic segnnel
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.lambda$getPartitionsForTopic$9(KafkaTopicProvisioner.java:658)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:344)


 o.s.cloud.stream.binding.BindingService - Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder checking the topic (segnnel):
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:685)

With a plain Kafka listener I managed to receive messages from the topic, but I have to use Spring Cloud Stream to keep the service flexible.

I also tried using different topics for producing and consuming, in case sharing the same topic was the problem, but that is not the cause.
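
Since the `Failed to obtain partition information` error above only shows up from time to time, one thing worth ruling out is whether the broker address from the configuration (localhost:19092) is reachable at the moment the consumer binding is created. The following is a minimal sketch using only the JDK; `BrokerProbe` and `isReachable` are hypothetical names made up for this check and are not part of Spring Cloud Stream or the Kafka client.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical diagnostic helper; not part of Spring Cloud Stream or the Kafka client. */
final class BrokerProbe {

    private BrokerProbe() {
    }

    /**
     * Returns true if a TCP connection to host:port can be opened within timeoutMillis.
     * This only shows that the port accepts connections; it does not verify
     * topic metadata or the addresses the broker advertises to clients.
     */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException | IllegalArgumentException ex) {
            // Refused, timed out, unresolved host, or invalid port/timeout
            return false;
        }
    }

    public static void main(String[] args) {
        // localhost:19092 is the broker address taken from the YAML in the question
        boolean up = isReachable("localhost", 19092, 2000);
        System.out.println("localhost:19092 reachable: " + up);
    }
}
```

A successful probe does not prove the consumer can fetch partition metadata: after the initial bootstrap connection, Kafka clients talk to whatever addresses the brokers advertise, so an advertised listener that the service cannot resolve or reach could still cause failures even when the bootstrap port is open. A failed probe, on the other hand, points at the broker or the network rather than at the binding configuration.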


