I’m encountering a ProducerFencedException while trying to synchronize events between Kafka and a PostgreSQL database using Spring Kafka. The events are sent to Kafka and the database updates successfully, and I can see the events in the Kafka topic when I use the read_uncommitted isolation level. However, the consumer doesn’t seem to consume messages with the read_committed isolation level.
Producer Configuration:
@Configuration
public class KafkaProducerConfig {
@Value("${bootstrap.servers}")
private String boostrapServers;
public Map<String, Object> producerConfig() {
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-event-id");
return producerConfig;
}
@Bean
public <T> ProducerFactory<String, T> producerFactory() {
DefaultKafkaProducerFactory<String, T> defaultKafkaProducerFactory =
new DefaultKafkaProducerFactory<>(producerConfig());
defaultKafkaProducerFactory.setTransactionIdPrefix("transaction-event-id");
return defaultKafkaProducerFactory;
}
@Bean
public <T> KafkaTemplate<String, T> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public KafkaTransactionManager<String, Object> transactionEventProducerKafkaTransactionManager() {
return new KafkaTransactionManager<>(producerFactory());
}
}
Consumer Configuration:
isolation.level=read_committed
Producer Event and Database Update Logic:
@Transactional
public void sendMessageToTopics(Map<Long, String> duplicateCampaignId,
List<CampaignSMS> campaignSMSList, SmsType smsType) {
if (smsType == SmsType.BULK) {
campaignSMSList.forEach(campaignSMS -> {
sendEvent(smsBulkTopicName, campaignSMS);
});
}
if (smsType == SmsType.PROFILE) {
campaignSMSList.forEach(campaignSMS -> sendEvent(smsProfileTopicName, campaignSMS));
}
duplicateCampaignId.forEach((campaignId, errorMessage) -> customerCampaignRepository.
updateCampaignSmsStatusById(campaignId, CustomerCampaignStatus.ERROR, errorMessage));
customerCampaignRepository.batchUpdateCampaignSms(campaignSMSList);
}
private void sendEvent(String topicName, CampaignSMS campaignSMS) {
try {
kafkaTemplate.send(topicName, campaignSMS).get(3L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error(e.getMessage());
Thread.currentThread().interrupt();
throw new InternalException(e.getMessage());
} catch (ExecutionException | TimeoutException e) {
log.error(e.getMessage());
throw new InternalException(e.getMessage());
}
}
The error I receive in the logs is:
ERROR o.s.k.c.DefaultKafkaProducerFactory - commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@39ec10a3]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one. ERROR o.s.k.c.DefaultKafkaProducerFactory - commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@39ec10a3]
org.apache.kafka.common.errors.ProducerFencedException: Producer with transactionalId 'transaction-event-id0' and ProducerIdAndEpoch(producerId=120332, epoch=0) has been fenced by another producer with the same transactionalIdTransactionSynchronization.afterCompletion threw exception
org.apache.kafka.common.errors.ProducerFencedException: Producer with transactionalId 'transaction-event-id0' and ProducerIdAndEpoch(producerId=120332, epoch=0) has been fenced by another producer with the same transactionalId
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
This error occurs when trying to commit the transaction in my scheduled task. It seems like multiple producers are using the same transactionalId, which leads to this fencing issue.I also tried dynamically generating the transactional.id using UUID.randomUUID() for each producer, but that approach didn’t work either, and the same error appears.(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-event-id"+UUID.randomUUID() and defaultKafkaProducerFactory.setTransactionIdPrefix("transaction-event-id"+UUID.randomUUID());)
Here are my questions:
- How can I ensure that each producer instance has a unique
transactionalId? - What are best practices for managing Kafka transactions when
synchronizing with a database? - Is there a way to handle ProducerFencedException gracefully in such
a scenario?
Any help would be greatly appreciated! Thank you!
You need to sign in to view this answers
Leave feedback about this