카테고리 없음

[Spring for Apache Kafka] Transactions

noahkim_ 2025. 5. 8. 14:03

1. 트랜잭션 활성화 방법

DefaultKafkaProducerFactory의 transactionIdPrefix 설정

  • 트랜잭션이 활성화됨
  • 트랜잭션 전용 Producer 캐시를 유지함
  • 각 프로듀서의 transactional.id는 transactionIdPrefix + 번호(n) 형식으로 생성됨
  • spring boot는 spring.kafka.producer.transaction-id-prefix 만 설정하면 됨

 

2. KafkaTransactionManager

  • Spring의 PlatformTransactionManager를 구현한 클래스
  • Spring 트랜잭션 지원 방식과 함께 사용이 가능함 (@Transactional, TransactionTemplate 등)
  • KafkaTemplate의 모든 작업은 트랜잭션 범위 안에서 같은 Producer를 사용함

 

3. Transaction Synchronization

  • Kafka 트랜잭션과 DB 트랜잭션을 동기화 할 수 있음
  • 즉, Kafka 전송과 DB 업데이트를 함께 트랜잭션으로 묶을 수 있음

 

커밋 순서

  • DB 트랜잭션 -> Kafka 트랜잭션 순으로 커밋됨
  • 순서를 바꾸려면 중첩된 @Transactional 사용 (내부 메서드를 KafkaTransactionManager를 사용하여 구성)

 

트랜잭션 실패시 

  • 전체 롤백 안됨 (DB 성공, Kafka 실패 시 DB는 롤백 안함)
  • 애플리케이션에서 보상 처리 해주어야 함
  • Kafka -> DB 순으로 수행하면 좀 더 처리하기 수월함
순서 결과 위험도
DB 먼저 커밋 → Kafka 커밋 Kafka 실패하면 DB 롤백 못 함 ❌ 위험 큼
Kafka 먼저 커밋 → DB 커밋 Kafka 실패하면 DB 시도조차 안 함
Kafka 롤백하면 됨
✅ 위험 작음

 

예시

더보기
@Service
public class KafkaDbFacadeService {

    private final KafkaService kafkaService;
    private final DbService dbService;

    public KafkaDbFacadeService(KafkaService kafkaService, DbService dbService) {
        this.kafkaService = kafkaService;
        this.dbService = dbService;
    }

    // 외부에서 호출할 메서드 (DB 트랜잭션 담당)
    @Transactional(transactionManager = "dbTransactionManager")
    public void process(String data) {
        kafkaService.sendKafka(data);  // Kafka 먼저 처리 (Kafka 트랜잭션)
        dbService.saveDb(data);        // 그 다음 DB 저장
    }
}
@Service
public class KafkaService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Transactional(transactionManager = "kafkaTransactionManager")
    public void sendKafka(String data) {
        kafkaTemplate.send("my-topic", data);
    }
}

 

4. KafkaTemplate Local Transactions

  • KafkaTemplate를 이용해 로컬 트랜잭션을 수행할 수 있음

예제

더보기
boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
});
  • 콜백 안에서 KafkaTemplate을 이용해 여러 메시지를 보낼 수 있음 (새로운 중첩 트랜잭션 생성)
  • 콜백이 정상 종료되면 → 트랜잭션 커밋
    콜백 안에서 예외 발생하면 → 트랜잭션 롤백

 

5. TransactionIdPrefix

  • 트랜잭션 인스턴스별로 서로 다른 transactional.id를 사용
  • 각 애플리케이션 인스턴스는 서로 다른 transactional.id를 사용

 

 

 

출처