LanguagePreferenceProducer.java

package com.maybeitssquid.kafkaguaranteeslab.producer;

import com.maybeitssquid.kafkaguaranteeslab.model.LanguagePreference;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.retry.annotation.Retry;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

@Service
public class LanguagePreferenceProducer {

  private static final Logger log = LoggerFactory.getLogger(LanguagePreferenceProducer.class);
  public static final String TOPIC = "language-preferences";

  private final KafkaTemplate<String, LanguagePreference> kafkaTemplate;

  public LanguagePreferenceProducer(KafkaTemplate<String, LanguagePreference> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  @Retry(name = "languagePreferenceProducer")
  @CircuitBreaker(name = "languagePreferenceProducer", fallbackMethod = "publishFallback")
  public CompletableFuture<SendResult<String, LanguagePreference>> publish(
      LanguagePreference event) {
    log.info("Publishing event: customerId={}", event.customerId());
    return kafkaTemplate
        .send(TOPIC, event.customerId(), event)
        .whenComplete(
            (result, ex) -> {
              if (ex != null) {
                log.error("Failed to publish event: customerId={}", event.customerId(), ex);
              } else {
                log.info(
                    "Published event: customerId={} partition={} offset={}",
                    event.customerId(),
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset());
              }
            });
  }

  // Called by Resilience4j when the circuit is open
  public CompletableFuture<SendResult<String, LanguagePreference>> publishFallback(
      LanguagePreference event, Throwable t) {
    log.warn(
        "Circuit open — dropping event to dead-letter store: customerId={} reason={}",
        event.customerId(),
        t.getMessage());
    return CompletableFuture.failedFuture(t);
  }
}