1. Basics
주요 구성 요소
구성 요소 | 설명 | 예시 |
StreamsBuilder | 스트림 처리 로직을 정의하는 객체 생성 | KStream, KTable 등 |
KafkaStreams | 스트림들을 실제로 실행하는 객체 | - kafka 클러스터와 연결 - 스트림 처리 시작 (토폴로지 실행) - 스트림 처리 종료 |
- KStream | 연속된 이벤트 스트림 - insert (append-only 로그 형식) |
카드 결제 기록, 서버 로그 등 |
- KTable | 상태 변경 기록 - update |
집계 테이블 등 |
동작 흐름
- StreamsBuilder: Stream 처리 로직 정의
- StreamsConfig: 설정 (Kafka 클러스터 정보, 기본 직렬화/역직렬화 설정, 보안 설정 등)
- KafkaStreams: 스트림 처리 시작
- KafkaStreams: 스트림 처리 종료
2. Spring Management
구성 요소 | 설명 | 주요 용도 및 특징 |
StreamsBuilderFactoryBean | StreamsBuilder를 스프링 빈으로 등록 KafkaStreams 인스턴스 생성 및 생명주기 관리 |
- Spring 컨텍스트와 통합 - SmartLifecycle로 자동 시작/종료 |
KafkaStreamsInfrastructureCustomizer | StreamsBuilder 커스터마이징 인터페이스 | - State Store 수동 추가 - Topology 확장 및 설정 조정 |
설정) StreamsBuilderFactoryBean
더보기
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("topic-name");
return stream;
}
설정) KafkaStreamsInfrastructureCustomizer
더보기
@Component
public class MyStreamsCustomizer implements KafkaStreamsInfrastructureCustomizer {
@Override
public void configureBuilder(StreamsBuilder builder) {
// 예시: In-memory Key-Value State Store 추가
StoreBuilder<?> myStore = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("my-state-store"),
org.apache.kafka.common.serialization.Serdes.String(),
org.apache.kafka.common.serialization.Serdes.String()
);
builder.addStateStore(myStore);
}
@Override
public void configureTopology(Topology topology) {
// 여기서는 Topology 직접 조작할 수 있는데, 보통 configureBuilder만 많이 씀
}
}
3. KafkaStreams Micrometer Support
- Kafka Streams 인스턴스의 metrics 수집 (Micrometer 사용)
- 모니터링 도구와 통합 가능 (Prometheus, Datadog, New Relic 등)
설정
더보기
streamsBuilderFactoryBean.addListener(
new KafkaStreamsMicrometerListener(
meterRegistry, // Micrometer의 MeterRegistry 인스턴스
Collections.singletonList(new ImmutableTag("customTag", "customTagValue")) // 커스텀 태그
)
);
- Micrometer는 Kafka Streams의 KafkaStreamsMetrics를 사용해서 Kafka Streams의 모니터링 지표를 가져옴
4. Streams JSON Serialization and Deserialization
JsonSerde
- 직렬화/역직렬화를 담당하는 객체
- Kafka 토픽이나 state store에서 JSON 형식으로 데이터를 읽고 쓸 때 필요
- Serde<T> 구현체 사용 (내부적으로 JsonSerializer + JsonDeserializer를 사용)
예제) TradeSerde
더보기
public class TradeSerde extends Serdes.WrapperSerde<Trade>{
public TradeSerde(Serializer<Trade> serializer, Deserializer<Trade> deserializer) {
super(serializer, deserializer);
}
public static TradeSerde create(ObjectMapper objectMapper) {
return new TradeSerde(
new Serializer<Trade>() {
@Override
public byte[] serialize(String topic, Trade data) {
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
},
new Deserializer<Trade>() {
@Override
public Trade deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(data, Trade.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
);
}
}
streamsBuilder.stream("trade-input", Consumed.with(Serdes.String(), tradeSerde));
- key: String으로 직렬화/역직렬화
- value: Trade로 직렬화/역직렬화 (tradeSerde 사용)
5. KafkaStreamBrancher
- 조건 분기(branching)를 더 깔끔하고 가독성 좋게 처리할 수 있도록 해주는 클래스
- 각 조건과 후속 동작을 함수형으로 직접 연결 (method chaining 형태)
예제
더보기
KStream<String, String> sourceStream = builder.stream("source");
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
.defaultBranch(ks -> ks.to("C"))
.onTopOf(sourceStream); // ★ 여기서 stream에 분기 적용!
- onTopOf: sourceStream 지정
- branch: 조건에 맞으면 수행
- defaultBranch: 위의 브랜치에서 어떤 조건에도 해당하지 않을 경우 수행
6. Configuration
기본 설정
설정
더보기
@Configuration
@EnableKafkaStreams
public class KafkaStreamConfig {
@Bean(name = "defaultKafkaStreamsConfig")
public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
return new KafkaStreamsConfiguration(props);
}
}
@EnableKafkaStreams
- StreamsBuilderFactoryBean 자동 생성 및 세팅
KafkaStreamsConfiguration
- 이름 고정 필수: defaultKafkaStreamsConfig (스프링이 사용하도록 명시적으로 지정해줘야 함)
고급 설정
설정 항목 | 설명 | 기본값 |
closeTimeout() | .close(timeout) 호출 시 대기 시간 | 10초 |
leaveGroupOnClose() | 종료 시 consumer group에서 명시적으로 leave() 호출 여부 | FALSE |
cleanUp() | 인스턴스 중지 시, 상태와 토픽 정리 여부 | never (2.7 이후) |
예제) 고급 설정
더보기
@Configuration
public class KafkaStreamsCustomizer {
@Bean
public StreamsBuilderFactoryBeanConfigurer customConfigurer() {
return factoryBean -> {
// 1. 종료 시 최대 대기 시간 설정 (기본 10초 → 여기서는 20초)
factoryBean.setCloseTimeout(Duration.ofSeconds(20));
// 2. Kafka consumer group에서 명시적으로 leave() 호출하도록 설정
factoryBean.setLeaveGroupOnClose(true);
// 3. 상태 저장 디렉토리 정리 여부 설정
// - 시작 시에는 cleanUp() 실행
// - 종료 시에는 cleanUp() 실행 안 함
factoryBean.setCleanupConfig(
new CleanupConfig(CleanupConfig.ON_START, CleanupConfig.NOT_ON_STOP)
);
};
}
}
7. Header Enricher
일단 넘어감
8. MessagingProcessor
Spring Messaging
- 스프링 프레임워크 내에서 메시지 기반 프로그래밍을 위한 공통 추상화 계층
- 다양한 메시지 기반 시스템을 일관된 방식으로 처리할 수 있게 해줌
MessagingProcessor
- Kafka Streams에서 레코드를 Spring Messaging 기반 처리 흐름에 위임할 수 있게 해주는 Processor
- 메시지 기반 처리 로직을 Kafka Streams에서 직접 구현하지 않고, Spring Integration 플로우에 안전하게 위탁할 수 있음
구성 요소
구성 요소 | 설명 |
MessagingFunction |
Kafka 레코드를 Spring Message로 변환 및 처리하는 일을 담당하는 함수형 인터페이스
|
MessagingMessageConverter |
Kafka 레코드 ↔ Spring Message 간 데이터 변환 처리
(key, value, headers, metadata 포함) |
GatewayProxyFactoryBean | 스프링 게이트웨이 구성 도구 - MessagingFunction 인터페이스만 정의하여 해당 게이트웨이 구현체를 자동 생성해줌 - 생성된 게이트웨이 구현체를 Spring Integration flow와 연결해줌 |
Spring Integration Flow | 실제 메시지 처리 로직을 정의하는 구성 요소 |
예제
- Kafka Streams가 "input-topic"에서 레코드 수신
- MessagingMessageConverter를 통해 Kafka 레코드를 Spring Message<?>로 변환
- MessagingProcessor가 MessagingFunction.exchange() 실행
- Gateway가 Spring Integration Flow로 메시지 전송
- Flow가 메시지를 처리하고, 필요 시 응답 생성
- 다시 Kafka 레코드로 변환
예제
더보기
MessagingFunction 인터페이스 정의
public interface StompPushGateway {
Message<?> push(Message<?> message);
}
GatewayProxyFactoryBean 설정
@Bean
public GatewayProxyFactoryBean stompGateway() {
GatewayProxyFactoryBean factory = new GatewayProxyFactoryBean(StompPushGateway.class);
factory.setDefaultRequestChannelName("stomp.push.channel");
return factory;
}
- 메시지 수신 기본 채널 지정
- MessagingFunction 호출 시, Spring Integration Flow에 보내짐
- Spring Integration 도착 채널을 지정함
Spring Integration Flow 정의
@Bean
public IntegrationFlow stompPushFlow(SimpMessagingTemplate messagingTemplate) {
return IntegrationFlows.from("stomp.push.channel")
.handle(message -> {
String payload = (String) message.getPayload();
messagingTemplate.convertAndSend("/topic/out", payload); // STOMP 발송
})
.get();
}
- 해당 채널의 수신 메시지를 처리하는 로직을 정의함
Kafka Stream (MessagingProcessor)
@Bean
public KStream<String, String> kafkaStream(
StreamsBuilder builder,
ApplicationContext context
) {
StompPushGateway gateway = context.getBean(StompPushGateway.class);
MessagingFunction function = gateway::push;
MessagingMessageConverter converter = new MessagingMessageConverter();
builder.stream("input-topic")
.process(() -> new MessagingProcessor<>(function, converter)); // STOMP 전송 흐름 실행
return null;
}
9. Recovery from Deserialization Exceptions
일단 넘어감
10. Interactive Query Support
- stateful 애플리케이션에서 상태 저장소를 애플리케이션 실행 중에 외부에서 직접 조회할 수 있는 기능
- 활용 예: 현재 집계 상태, 특정 키의 값, 윈도우 집계 결과 등을 REST API로 외부에 노출.
KafkaStreamsInteractiveQueryService
- Spring Kafka에서 Interactive Query를 지원하는 핵심 API.
- 내부적으로 Kafka Streams의 Interactive Query API를 감싸는 Facade 역할.
- 이 Bean을 생성해두면 애플리케이션 내에서 state store 조회, 호스트 위치 조회 등을 할 수 있음.
예제
더보기
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(
StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
return new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
}
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
ReadOnlyKeyValueStore<Object, Object> appStore =
interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());
- QueryableStoreTypes: 조회할 저장소의 타입
- 반환값: 읽기 전용 상태 저장소
조회 실패 대비: 재시도 설정
- 상태 저장소 조회 시 일시적인 이유로 실패할 수 있음 (예: 초기화 지연).
- Spring Kafka는 RetryTemplate을 통해 재시도 정책 설정 가능.
예제
더보기
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
retryTemplate.setRetryPolicy(retryPolicy);
kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
return kafkaStreamsInteractiveQueryService;
}
- 최대 10회 재시도하며, 1초 간격으로 재시도.
분산 환경에서의 주의점
- Kafka Streams는 키-파티션 매핑을 기반으로 상태 저장소를 분산 관리함.
- 상태 저장소는 해당 키를 처리하는 인스턴스에만 존재함.
- 잘못된 인스턴스에서 retrieveQueryableStore()를 호출하면 해당 키에 대한 데이터는 조회되지 않음.
호스트 정보 조회
- 분산 환경에서 특정 키를 어느 인스턴스가 처리 중인지 확인하기
예제
더보기
HostInfo hostInfo = interactiveQueryService.getKafkaStreamsApplicationHostInfo(
"app-store", 12345, new IntegerSerializer());
- "app-store": 상태 저장소 이름
- 12345: 조회하려는 키
- IntegerSerializer: 키 직렬화 방식
→ 반환값: HostInfo (host:port 정보) → 그 인스턴스에 RPC로 요청 전송 가능
현재 인스턴스 확인
- 내가 요청하는 인스턴스가 그 키의 실제 담당자인지 확인하기
예제
더보기
HostInfo current = interactiveQueryService.getCurrentKafkaStreamsApplicationHostInfo();
- 현재 인스턴스의 HostInfo와 비교하여, 로컬에서 처리할지 원격으로 요청할지 결정할 수 있음.
'Spring > Spring for Apache Kafka' 카테고리의 다른 글
[Spring for Apache Kafka] Exactly Once Semantics (0) | 2025.05.08 |
---|---|
[Spring for Apache Kafka] Transactions (0) | 2025.05.08 |
[Spring for Apache Kafka] 3-3. Receiving Messages: Post-Processing (0) | 2025.05.07 |
[Spring for Apache Kafka] 3-2. Receiving Messages: @KafkaListener (0) | 2025.05.07 |
[Spring for Apache Kafka] 4. Application Events (0) | 2025.05.06 |