
Kafka ProducerFencedException When Using Transactions with Spring Kafka and Database Synchronization


I’m encountering a ProducerFencedException while synchronizing events between Kafka and a PostgreSQL database using Spring Kafka. The events are sent to Kafka, the database updates succeed, and I can see the events in the topic when I use the read_uncommitted isolation level. However, the consumer doesn’t consume any messages with the read_committed isolation level.

Producer Configuration:

@Configuration
public class KafkaProducerConfig {
    @Value("${bootstrap.servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfig() {
        Map<String, Object> producerConfig = new HashMap<>();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-event-id");
        return producerConfig;
    }

    @Bean
    public <T> ProducerFactory<String, T> producerFactory() {
        DefaultKafkaProducerFactory<String, T> defaultKafkaProducerFactory =
                new DefaultKafkaProducerFactory<>(producerConfig());
        defaultKafkaProducerFactory.setTransactionIdPrefix("transaction-event-id");
        return defaultKafkaProducerFactory;
    }

    @Bean
    public <T> KafkaTemplate<String, T> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public KafkaTransactionManager<String, Object> transactionEventProducerKafkaTransactionManager() {
        return new KafkaTransactionManager<>(producerFactory());
    }
}
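
As I understand it, Spring’s DefaultKafkaProducerFactory appends an incrementing numeric suffix to the configured prefix when it creates transactional producers, which would explain the 'transaction-event-id0' seen in the log further down. A toy illustration of that naming scheme (this is not the actual factory code, just a sketch of the behavior):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class TxIdSuffixDemo {
    // Toy illustration of "prefix + incrementing suffix" transactional-id
    // naming; NOT the real DefaultKafkaProducerFactory implementation.
    private final String prefix;
    private final AtomicInteger suffix = new AtomicInteger();

    public TxIdSuffixDemo(String prefix) {
        this.prefix = prefix;
    }

    public String nextTransactionalId() {
        // Each new transactional producer gets the next suffix.
        return prefix + suffix.getAndIncrement();
    }

    public static void main(String[] args) {
        TxIdSuffixDemo demo = new TxIdSuffixDemo("transaction-event-id");
        System.out.println(demo.nextTransactionalId()); // transaction-event-id0
        System.out.println(demo.nextTransactionalId()); // transaction-event-id1
    }
}
```

If two application instances (or a restarted instance) end up with the same prefix, they produce colliding transactional ids like 'transaction-event-id0', which is exactly the id that appears in the fencing error.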

Consumer Configuration:

isolation.level=read_committed
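
The rest of the consumer setup is unremarkable; a minimal sketch of the properties map (group id and servers are placeholders, and I’m using the raw Kafka property names here rather than the ConsumerConfig constants):

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerProps {
    // Builds the consumer properties. Key names are the standard Kafka
    // property strings; group id and bootstrap servers are placeholders.
    public static Map<String, Object> consumerConfig(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "campaign-sms-consumer"); // placeholder group id
        // Only deliver records from committed transactions:
        props.put("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerConfig("localhost:9092").get("isolation.level"));
    }
}
```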

Producer Event and Database Update Logic:

@Transactional
public void sendMessageToTopics(Map<Long, String> duplicateCampaignId,
                                List<CampaignSMS> campaignSMSList, SmsType smsType) {

    if (smsType == SmsType.BULK) {
        campaignSMSList.forEach(campaignSMS -> {
            sendEvent(smsBulkTopicName, campaignSMS);
        });
    }
    if (smsType == SmsType.PROFILE) {
        campaignSMSList.forEach(campaignSMS -> sendEvent(smsProfileTopicName, campaignSMS));
    }

    duplicateCampaignId.forEach((campaignId, errorMessage) ->
            customerCampaignRepository.updateCampaignSmsStatusById(
                    campaignId, CustomerCampaignStatus.ERROR, errorMessage));
    customerCampaignRepository.batchUpdateCampaignSms(campaignSMSList);
}

private void sendEvent(String topicName, CampaignSMS campaignSMS) {
    try {
        kafkaTemplate.send(topicName, campaignSMS).get(3L, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        log.error(e.getMessage());
        Thread.currentThread().interrupt();
        throw new InternalException(e.getMessage());
    } catch (ExecutionException | TimeoutException e) {
        log.error(e.getMessage());
        throw new InternalException(e.getMessage());
    }
}

The error I receive in the logs is:

ERROR o.s.k.c.DefaultKafkaProducerFactory - commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@39ec10a3]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
org.apache.kafka.common.errors.ProducerFencedException: Producer with transactionalId 'transaction-event-id0' and ProducerIdAndEpoch(producerId=120332, epoch=0) has been fenced by another producer with the same transactionalId
TransactionSynchronization.afterCompletion threw exception
org.apache.kafka.common.errors.ProducerFencedException: Producer with transactionalId 'transaction-event-id0' and ProducerIdAndEpoch(producerId=120332, epoch=0) has been fenced by another producer with the same transactionalId

This error occurs when the transaction is committed in my scheduled task. It seems that multiple producers are using the same transactionalId, which leads to the fencing issue. I also tried dynamically generating the transactional.id with UUID.randomUUID() for each producer (both ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-event-id" + UUID.randomUUID() and defaultKafkaProducerFactory.setTransactionIdPrefix("transaction-event-id" + UUID.randomUUID())), but that approach didn’t work either, and the same error appears.
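
For reference, the UUID attempt amounted to deriving a per-instance prefix roughly like this (the "transaction-event-id" base is from my config; the hostname/UUID suffix scheme is just one variant I tried):

```java
import java.util.UUID;

public class TxIdPrefix {
    // Builds a transactional-id prefix intended to be unique per application
    // instance. The base string comes from my config; the hostname/UUID
    // suffix is just one possible uniqueness scheme.
    public static String uniquePrefix(String base) {
        String host;
        try {
            host = java.net.InetAddress.getLocalHost().getHostName();
        } catch (java.net.UnknownHostException e) {
            host = "unknown-host"; // fall back if the hostname can't be resolved
        }
        return base + "-" + host + "-" + UUID.randomUUID() + "-";
    }

    public static void main(String[] args) {
        // e.g. factory.setTransactionIdPrefix(uniquePrefix("transaction-event-id"));
        System.out.println(uniquePrefix("transaction-event-id"));
    }
}
```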

Here are my questions:

  1. How can I ensure that each producer instance has a unique
    transactionalId?
  2. What are best practices for managing Kafka transactions when
    synchronizing with a database?
  3. Is there a way to handle ProducerFencedException gracefully in such
    a scenario?

Any help would be greatly appreciated! Thank you!
