LanguagePreferenceConsumer.java

package com.maybeitssquid.kafkaguaranteeslab.consumer;

import com.maybeitssquid.kafkaguaranteeslab.model.LanguagePreference;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.retry.annotation.Retry;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class LanguagePreferenceConsumer {

  private static final Logger log = LoggerFactory.getLogger(LanguagePreferenceConsumer.class);

  /**
   * Consumes language preference events with manual acknowledgment. The offset is committed only
   * after process() succeeds, guaranteeing at-least-once delivery. Resilience4j retries wrap the
   * downstream call.
   */
  @KafkaListener(
      topics = "language-preferences",
      groupId = "${spring.kafka.consumer.group-id}",
      containerFactory = "kafkaListenerContainerFactory")
  public void onMessage(ConsumerRecord<String, LanguagePreference> record, Acknowledgment ack) {
    LanguagePreference event = record.value();
    log.info(
        "Received event: customerId={} partition={} offset={}",
        event.customerId(),
        record.partition(),
        record.offset());
    try {
      process(event);
      ack.acknowledge(); // commit offset only on success
    } catch (Exception ex) {
      log.error(
          "Processing failed — will be retried by error handler: customerId={}",
          event.customerId(),
          ex);
      // Do NOT ack; DefaultErrorHandler in KafkaConfig will retry then route to DLT
      throw ex;
    }
  }

  @Retry(name = "languagePreferenceConsumer")
  @CircuitBreaker(name = "languagePreferenceConsumer")
  public void process(LanguagePreference event) {
    // TODO: replace with real downstream call (DB write, HTTP call, etc.)
    log.info(
        "Processing language preference: customerId={} locale={}",
        event.customerId(),
        event.preferredLanguage());
  }
}