I am trying to produce and consume messages to and from Kafka using Spring Cloud Stream.
Producing works without problems: using kafbat I can see that the message is created correctly in my topic (it appears encoded as a JWT, but that is not a problem for me).
The problem is on the consuming side: when the service starts, it is not able to connect to or find the topic partition for some reason.
My yml is:
spring:
  cloud:
    function:
      definition: chents
    stream:
      bindings:
        chents-in-0:
          destination: segnnel
          group: console-consumer-20965
          consumer:
            max-attempts: 2
        chents-out-0:
          destination: segnnel
      kafka:
        bootstrap-servers: localhost:19092
        binder:
          brokers: localhost:19092
        bindings:
          chents-in-0:
            consumer:
              enableDlq: false
              ack-mode: record
              key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
              value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
              properties:
                spring.json.trusted.packages: "*"
      binders:
        local-kafka:
          type: kafka
My code to produce the messages with StreamBridge (using it is a requirement) is:
private void sendToKafka(EventData<Channel> event) {
    Message<EventData<Channel>> message = MessageBuilder.withPayload(event)
            .setHeader("eventType", "Channel")
            .build();
    streamBridge.send("segnnel", message);
}
And this is my subscriber class:
@Configuration
public class ChannelSubscriber {

    private Repository repository;
    private static final Mapper mapper ...;

    public ChannelSubscriber(Repository repository) {
        this.repository = repository;
    }

    @Bean
    Consumer<EventData<Channel>> chents() {
        return e -> Mono.just(e).map(mapper::mapToEntity).flatMap(repository::save)
                .doOnError(ex -> log.error("Error processing event: ", ex));
    }
}
The service starts correctly and is able to send messages, but from time to time it gives me this error when trying to consume them:
Caused by: java.lang.RuntimeException: Failed to obtain partition information for the topic segnnel
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.lambda$getPartitionsForTopic$9(KafkaTopicProvisioner.java:658)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:344)
o.s.cloud.stream.binding.BindingService - Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder checking the topic (segnnel):
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:685)
I tried a Kafka listener and managed to receive messages from the topic, but I have to use Spring Cloud Stream to keep this flexible.
I also tried using different topics for producing and consuming, in case sharing the same topic was the problem, but that made no difference.
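To rule out a plain connectivity problem, a check along these lines could be run from the same host as the service. This is only a sketch: the class name BrokerReachability and the method canConnect are hypothetical, it uses nothing beyond the JDK, and it only tests whether a TCP connection to the bootstrap address from the yml above can be opened. It does not query Kafka metadata, so it says nothing about partitions or advertised listeners.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Spring Cloud Stream or the Kafka client.
class BrokerReachability {

    // Returns true if a TCP connection to host:port can be opened within timeoutMillis.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, unknown host and connect timeout.
            return false;
        }
    }

    public static void main(String[] args) {
        // localhost:19092 is the bootstrap address taken from the yml above.
        System.out.println("reachable: " + canConnect("localhost", 19092, 2000));
    }
}
```

If this prints false at the moments when the binder error shows up, the issue is likely at the network or broker level rather than in the binding configuration; if it prints true, the check is inconclusive.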