1. 트랜잭션 활성화 방법
DefaultKafkaProducerFactory의 transactionIdPrefix 설정
- 트랜잭션이 활성화됨
- 트랜잭션 전용 Producer 캐시를 유지함
- 각 프로듀서의 transactional.id는 transactionIdPrefix + 번호(n) 형식으로 생성됨
- spring boot는 spring.kafka.producer.transaction-id-prefix 만 설정하면 됨
2. KafkaTransactionManager
- Spring의 PlatformTransactionManager를 구현한 클래스
- Spring 트랜잭션 지원 방식과 함께 사용이 가능함 (@Transactional, TransactionTemplate 등)
- KafkaTemplate의 모든 작업은 트랜잭션 범위 안에서 같은 Producer를 사용함
3. Transaction Synchronization
- Kafka 트랜잭션과 DB 트랜잭션을 동기화 할 수 있음
- 즉, Kafka 전송과 DB 업데이트를 함께 트랜잭션으로 묶을 수 있음
커밋 순서
- DB 트랜잭션 -> Kafka 트랜잭션 순으로 커밋됨
- 순서를 바꾸려면 중첩된 @Transactional 사용 (내부 메서드를 KafkaTransactionManager를 사용하여 구성)
트랜잭션 실패시
- 전체 롤백 안됨 (DB 성공, Kafka 실패 시 DB는 롤백 안함)
- 애플리케이션에서 보상 처리 해주어야 함
- Kafka -> DB 순으로 수행하면 좀 더 처리하기 수월함
순서 | 결과 | 위험도 |
DB 먼저 커밋 → Kafka 커밋 | Kafka 실패하면 DB 롤백 못 함 | ❌ 위험 큼 |
Kafka 먼저 커밋 → DB 커밋 | Kafka 실패하면 DB 시도조차 안 함 Kafka 롤백하면 됨 |
✅ 위험 작음 |
예시
더보기
@Service
public class KafkaDbFacadeService {
private final KafkaService kafkaService;
private final DbService dbService;
public KafkaDbFacadeService(KafkaService kafkaService, DbService dbService) {
this.kafkaService = kafkaService;
this.dbService = dbService;
}
// 외부에서 호출할 메서드 (DB 트랜잭션 담당)
@Transactional(transactionManager = "dbTransactionManager")
public void process(String data) {
kafkaService.sendKafka(data); // Kafka 먼저 처리 (Kafka 트랜잭션)
dbService.saveDb(data); // 그 다음 DB 저장
}
}
@Service
public class KafkaService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Transactional(transactionManager = "kafkaTransactionManager")
public void sendKafka(String data) {
kafkaTemplate.send("my-topic", data);
}
}
4. KafkaTemplate Local Transactions
- KafkaTemplate를 이용해 로컬 트랜잭션을 수행할 수 있음
예제
더보기
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
- 콜백 안에서 KafkaTemplate을 이용해 여러 메시지를 보낼 수 있음 (새로운 중첩 트랜잭션 생성)
- 콜백이 정상 종료되면 → 트랜잭션 커밋
콜백 안에서 예외 발생하면 → 트랜잭션 롤백
5. TransactionIdPrefix
- 트랜잭션 인스턴스별로 서로 다른 transactional.id를 사용
- 각 애플리케이션 인스턴스는 서로 다른 transactional.id를 사용
출처