이 문서는 해당 섹션의 일부분만 정리되었음.
1. Message Listeners
- Kafka에서 메시지를 처리하려면 리스너를 제공해야 함
인터페이스 | 설명 |
MessageListener | 개별 메시지 처리 |
AcknowledgingMessageListener | 메시지 처리 후 수동으로 오프셋을 커밋 |
ConsumerAwareMessageListener | 메시지 처리 시 Consumer 객체 접근 |
BatchMessageListener | 배치 메시지 처리 |
BatchAcknowledgingMessageListener | 배치 메시지 처리 후 수동 커밋 |
예제
더보기
MessageListener
@KafkaListener(topics = "topic1")
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println(record.value());
}
AcknowledgingMessageListener
@KafkaListener(topics = "topic1")
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
System.out.println(record.value());
ack.acknowledge(); // 메시지를 처리한 후 수동으로 오프셋을 커밋
}
ConsumerAwareMessageListener
@KafkaListener(topics = "topic1")
public void onMessage(ConsumerRecord<String, String> record, Consumer<?, ?> consumer) {
System.out.println(record.value());
// Consumer 객체에 접근하여 추가 작업 가능
}
BatchMessageListener
@KafkaListener(topics = "topic1")
public void onMessage(List<ConsumerRecord<String, String>> records) {
records.forEach(record -> System.out.println(record.value()));
}
BatchAcknowledgingMessageListener
@KafkaListener(topics = "topic1")
public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
records.forEach(record -> System.out.println(record.value()));
ack.acknowledge(); // 모든 메시지를 처리한 후 수동으로 오프셋을 커밋
}
2. Message Listener Containers
- Kafka Consumer 관리 컨테이너
- 네트워크에서 메시지를 poll()하고, 해당 메시지를 리스너에게 전달
- 하나의 컨슈머당 하나의 컨테이너로 관리함
항목 | 설명 |
트랜잭션 관리 |
Kafka Consumer가 메시지를 처리하는 동안 트랜잭션을 관리
- 메시지의 처리 상태에 따라 커밋 또는 롤백 수행 |
스레드 관리 |
여러 스레드를 활용하여 병렬로 메시지 처리
|
오류 처리 | 메시지 처리 중 오류가 발생하면 예외 처리로 변환함 - @MessageExceptionHandler: 리스너에서 발생한 예외를 처리합니다. |
재시도 |
메시지 처리 중 실패가 발생한 경우, 일정 횟수만큼 재시도를 시도합니다.
- 재시도 로직을 설정할 수 있습니다. |
Interceptor
- 트랜잭션이 시작되기 전, 호출됨 (interceptBeforeTx 설정 시, 트랜잭션 시작 후 호출)
구현체
종류 | 설명 |
RecordInterceptor |
단일 레코드를 처리하는 리스너용 인터셉터
- 리스너 호출 전, null 반환 시 리스너 호출 안 됨 - 리스너 호출 후, 예외 발생해도 호출 가능 |
BatchInterceptor | 배치 리스너용 인터셉터 |
CompositeRecordInterceptor | 여러 개의 interceptor를 체인처럼 연결할 수 있음. |
CompositeBatchInterceptor | 여러 개의 interceptor를 체인처럼 연결할 수 있음. |
유의 사항
- interceptor 안에서는 consumer의 position이나 커밋된 offset을 변경하는 작업을 하면 안 됨.
- 레코드를 변조할 경우에도 topic, partition, offset은 그대로 유지해야 함 (안 그러면 레코드 유실 가능).
DefaultKafkaConsumerFactory
- KafkaConsumer 객체를 생성하는 클래스
- KafkaListenerContainer나 @KafkaListener를 사용할 때 내부적으로 KafkaConsumer 인스턴스를 만들어줌
설정
더보기
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.RoundRobinAssignor");
return new DefaultKafkaConsumerFactory<>(props);
}
KafkaMessageListenerContainer
- 단일 스레드로 모든 토픽/파티션의 메시지를 수신
옵션
기능 | 설명 |
logContainerConfig |
컨테이너가 시작될 때 설정 요약을 INFO 레벨로 로깅.
|
commitLogLevel |
offset 커밋 관련 로그 레벨 지정 가능
|
missingTopicsFatal | 설정한 토픽이 존재하지 않으면 컨테이너가 시작 실패. |
authExceptionRetryInterval |
인증/인가 오류 발생 시 재시도 대기 시간 설정 가능
|
예제) 생성자
더보기
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
- ConsumerFactory와 ContainerProperties를 전달받아 생성
예제) 설정
더보기
@Bean
public KafkaMessageListenerContainer<String, String> messageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("test-topic");
containerProps.setCommitLogLevel(LogIfLevelEnabled.Level.INFO); // Offset 커밋 로그 레벨
containerProps.setLogContainerConfig(true); // 컨테이너 설정 요약 로그
containerProps.setMissingTopicsFatal(true); // 설정한 토픽이 없으면 시작 실패
containerProps.setAuthExceptionRetryInterval(5000); // 인증 오류 발생 시 5초 대기 후 재시도
return new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
}
ConcurrentMessageListenerContainer
- 메시지를 '동시에 여러 쓰레드(컨슈머)로' 받게 해주는 컴포넌트
- 다수의 KafkaMessageListenerContainer에 위임받음
항목 | 설명 |
Client ID 규칙 |
client.id가 설정되면 컨테이너 번호 추가됨
→ JMX 모니터링 시 Consumer MBean 이름이 유일해야 하기 때문. |
Metrics 제공 |
MessageListenerContainer는 KafkaConsumer.metrics() 기반 모니터링 지표 제공.
→ ConcurrentMessageListenerContainer는 여러 하위 컨테이너의 metrics를 Map으로 묶어서 제공. |
파티션 할당 전략 |
컨슈머 그룹 안에서 파티션을 자동 분배.
- 컨슈머 수 ≥ 파티션 수: 1:1 매핑 - 컨슈머 수 < 파티션 수: 일부 컨슈머가 여러 파티션을 담당. 주요 전략 - RangeAssignor: 연속 파티션 할당 - RoundRobinAssignor: 균등 분배 |
idleBetweenPolls |
컨슈머가 poll() 호출하고 다음 poll()까지 쉬는 시간
- poll 간격 = idleBetweenPolls 설정값과 max.poll.interval.ms 중 더 작은 값 으로 결정됨. |
TopicPartitionOffsets |
특정 토픽-파티션-오프셋을 수동 지정할 때 사용
ConcurrentMessageListenerContainer는 지정된 TopicPartitionOffset 리스트를 하위 컨테이너에 균등하게 분배함. |
설정
더보기
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(3);
// 여기서 ContainerProperties의 기본 속성들 설정
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // 수동 커밋 모드
factory.getContainerProperties().setIdleBetweenPolls(5000L); // 폴링 사이 대기 시간 5초
factory.getContainerProperties().setPollTimeout(3000L); // poll timeout
// 모든 컨테이너가 동일한 리스너 사용
factory.getContainerProperties().setMessageListener((MessageListener<String, String>) record -> {
System.out.println("Received: " + record.value());
});
return factory;
}
설정) TopicPartitionOffsets
더보기
List<TopicPartitionOffset> offsets = Arrays.asList(
new TopicPartitionOffset("topic-name", 0, 1000L),
new TopicPartitionOffset("topic-name", 1, 2000L)
);
ContainerProperties containerProps = new ContainerProperties(offsets.toArray(new TopicPartitionOffset[0]));
containerProps.setMessageListener((MessageListener<String, String>) record -> {
System.out.println("Received: " + record.value());
});
ConcurrentMessageListenerContainer<String, String> container =
new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
container.start();
설정) 파티션 분배
더보기
범위 분배
spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RangeAssignor
균등 분배
spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor
예외) 파티션 분배: 범위 분배
더보기
여러 토픽에 대한 파티션 분배
- 컨슈머 그룹이 여러 토픽을 구독하고, 각 토픽에 여러 개의 파티션이 있을 경우
- RangeAssignor는 파티션을 연속적인 범위로 묶어서 분배하려 함
- 이때, 파티션 수가 많은 토픽부터 소비자에게 할당되고 적은 토픽은 나중에 할당될 수 있음
Committing Offsets
enable.auto.commit
- true: kafka가 자동으로 오프셋 커밋
- false: 컨테이너가 직접 오프셋 커밋 제어 (AckMode에 따라 동작됨)
AckMode
AckMode | 설명 |
RECORD |
레코드 하나를 처리한 후 즉시 오프셋 커밋
|
BATCH |
poll()로 가져온 모든 레코드를 처리 완료 후 오프셋 커밋
|
TIME |
모든 레코드 처리 후, 마지막 커밋 이후 ackTime 시간이 지나면 커밋
|
COUNT |
모든 레코드 처리 후, 마지막 커밋 이후 ackCount개 레코드가 처리되었으면 커밋
|
COUNT_TIME |
TIME 또는 COUNT 둘 중 하나 조건 만족하면 커밋
|
MANUAL | 리스너가 acknowledge()를 호출해야 함 (호출 후 BATCH처럼 커밋) |
MANUAL_IMMEDIATE |
리스너가 acknowledge() 호출 즉시 커밋
|
syncCommits
- 컨테이너 옵션
- 오프셋 커밋을 동기 혹은 비동기 요청할 지 결정함
예제
더보기
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
// 동기 커밋 사용
factory.getContainerProperties().setSyncCommits(true);
// 비동기 커밋 사용하려면
// factory.getContainerProperties().setSyncCommits(false);
Acknowledgment 인터페이스
- 리스너가 직접 오프셋 커밋 제어를 하도록 하는 인터페이스
메서드 | 설명 |
acknowledge() |
성공한 레코드까지 정상 커밋
|
nack() |
지금까지 성공한 레코드까지만 커밋
실패한 레코드들은 다시 poll()로 재시도 (반드시 AckMode.MANUAL 설정 필요) |
예제
더보기
@KafkaListener(topics = "test-topic", containerFactory = "kafkaListenerContainerFactory")
public void listen(String message, Acknowledgment acknowledgment) {
try {
System.out.println("처리할 메시지: " + message);
// 메시지 처리 성공
acknowledgment.acknowledge(); // 오프셋 수동 커밋
} catch (Exception e) {
// 메시지 처리 실패
System.err.println("메시지 처리 실패: " + e.getMessage());
// nack() 등을 사용할 수도 있음 (AckMode.MANUAL일 때)
}
}
출처