Spring/Spring for Apache Kafka

[Spring for Apache kafka] Apache Kafka Streams Support

noahkim_ 2025. 5. 9. 13:25

1. Basics

주요 구성 요소

구성 요소 설명 예시
StreamsBuilder 스트림 처리 로직을 정의하는 객체 생성 KStream, KTable 등
KafkaStreams 스트림들을 실제로 실행하는 객체 - kafka 클러스터와 연결
- 스트림 처리 시작 (토폴로지 실행)
- 스트림 처리 종료
- KStream 연속된 이벤트 스트림 
- insert (append-only 로그 형식)
카드 결제 기록, 서버 로그 등
- KTable 상태 변경 기록
- update
집계 테이블 등

 

동작 흐름

  1. StreamsBuilder: Stream 처리 로직 정의
  2. StreamsConfig: 설정 (Kafka 클러스터 정보, 기본 직렬화/역직렬화 설정, 보안 설정 등)
  3. KafkaStreams: 스트림 처리 시작
  4. KafkaStreams: 스트림 처리 종료

 

2. Spring Management

구성 요소 설명 주요 용도 및 특징
StreamsBuilderFactoryBean StreamsBuilder를 스프링 빈으로 등록
KafkaStreams 인스턴스 생성 및 생명주기 관리
- Spring 컨텍스트와 통합
- SmartLifecycle로 자동 시작/종료
KafkaStreamsInfrastructureCustomizer StreamsBuilder 커스터마이징 인터페이스 - State Store 수동 추가
- Topology 확장 및 설정 조정

 

설정) StreamsBuilderFactoryBean

더보기
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream("topic-name");
    return stream;
}

 

설정) KafkaStreamsInfrastructureCustomizer

더보기
@Component
public class MyStreamsCustomizer implements KafkaStreamsInfrastructureCustomizer {

    @Override
    public void configureBuilder(StreamsBuilder builder) {
        // 예시: In-memory Key-Value State Store 추가
        StoreBuilder<?> myStore = Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("my-state-store"),
                org.apache.kafka.common.serialization.Serdes.String(),
                org.apache.kafka.common.serialization.Serdes.String()
        );
        builder.addStateStore(myStore);
    }

    @Override
    public void configureTopology(Topology topology) {
        // 여기서는 Topology 직접 조작할 수 있는데, 보통 configureBuilder만 많이 씀
    }
}

 

3. KafkaStreams Micrometer Support

  • Kafka Streams 인스턴스의 metrics 수집 (Micrometer 사용)
  • 모니터링 도구와 통합 가능 (Prometheus, Datadog, New Relic 등)

 

설정

더보기
streamsBuilderFactoryBean.addListener(
    new KafkaStreamsMicrometerListener(
        meterRegistry, // Micrometer의 MeterRegistry 인스턴스
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue")) // 커스텀 태그
    )
);
  • Micrometer는 Kafka Streams의 KafkaStreamsMetrics를 사용해서 Kafka Streams의 모니터링 지표를 가져옴

 

4. Streams JSON Serialization and Deserialization

JsonSerde

  • 직렬화/역직렬화를 담당하는 객체
  • Kafka 토픽이나 state store에서 JSON 형식으로 데이터를 읽고 쓸 때 필요
  • Serde<T> 구현체 사용 (내부적으로 JsonSerializer + JsonDeserializer를 사용)

 

예제) TradeSerde

더보기
public class TradeSerde extends Serdes.WrapperSerde<Trade>{

    public TradeSerde(Serializer<Trade> serializer, Deserializer<Trade> deserializer) {
        super(serializer, deserializer);
    }

    public static TradeSerde create(ObjectMapper objectMapper) {
        return new TradeSerde(
                new Serializer<Trade>() {
                    @Override
                    public byte[] serialize(String topic, Trade data) {
                        try {
                            return objectMapper.writeValueAsBytes(data);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                },
                new Deserializer<Trade>() {
                    @Override
                    public Trade deserialize(String topic, byte[] data) {
                        try {
                            return objectMapper.readValue(data, Trade.class);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
        );
    }
}
streamsBuilder.stream("trade-input", Consumed.with(Serdes.String(), tradeSerde));
  • key: String으로 직렬화/역직렬화
  • value: Trade로 직렬화/역직렬화 (tradeSerde 사용)

 

5. KafkaStreamBrancher

  • 조건 분기(branching)를 더 깔끔하고 가독성 좋게 처리할 수 있도록 해주는 클래스
  • 각 조건과 후속 동작을 함수형으로 직접 연결 (method chaining 형태)

 

예제

더보기
KStream<String, String> sourceStream = builder.stream("source");

new KafkaStreamBrancher<String, String>()
    .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
    .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
    .defaultBranch(ks -> ks.to("C"))
    .onTopOf(sourceStream); // ★ 여기서 stream에 분기 적용!
  • onTopOf: sourceStream 지정
  • branch: 조건에 맞으면 수행
  • defaultBranch: 위의 브랜치에서 어떤 조건에도 해당하지 않을 경우 수행

 

6. Configuration

기본 설정

 

설정

더보기
@Configuration
@EnableKafkaStreams
public class KafkaStreamConfig {

    @Bean(name = "defaultKafkaStreamsConfig")
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
        return new KafkaStreamsConfiguration(props);
    }
}

@EnableKafkaStreams

  • StreamsBuilderFactoryBean 자동 생성 및 세팅

KafkaStreamsConfiguration

  • 이름 고정 필수: defaultKafkaStreamsConfig (스프링이 사용하도록 명시적으로 지정해줘야 함)

 

고급 설정

설정 항목 설명 기본값
closeTimeout() .close(timeout) 호출 시 대기 시간 10초
leaveGroupOnClose() 종료 시 consumer group에서 명시적으로 leave() 호출 여부 FALSE
cleanUp() 인스턴스 중지 시, 상태와 토픽 정리 여부 never (2.7 이후)

 

예제) 고급 설정

더보기
@Configuration
public class KafkaStreamsCustomizer {

    @Bean
    public StreamsBuilderFactoryBeanConfigurer customConfigurer() {
        return factoryBean -> {
            // 1. 종료 시 최대 대기 시간 설정 (기본 10초 → 여기서는 20초)
            factoryBean.setCloseTimeout(Duration.ofSeconds(20));

            // 2. Kafka consumer group에서 명시적으로 leave() 호출하도록 설정
            factoryBean.setLeaveGroupOnClose(true);

            // 3. 상태 저장 디렉토리 정리 여부 설정
            // - 시작 시에는 cleanUp() 실행
            // - 종료 시에는 cleanUp() 실행 안 함
            factoryBean.setCleanupConfig(
                new CleanupConfig(CleanupConfig.ON_START, CleanupConfig.NOT_ON_STOP)
            );
        };
    }
}

 

7. Header Enricher

일단 넘어감

 

8. MessagingProcessor

Spring Messaging

  • 스프링 프레임워크 내에서 메시지 기반 프로그래밍을 위한 공통 추상화 계층
  • 다양한 메시지 기반 시스템을 일관된 방식으로 처리할 수 있게 해줌

 

MessagingProcessor

  • Kafka Streams에서 레코드를 Spring Messaging 기반 처리 흐름에 위임할 수 있게 해주는 Processor
  • 메시지 기반 처리 로직을 Kafka Streams에서 직접 구현하지 않고, Spring Integration 플로우에 안전하게 위탁할 수 있음

 

구성 요소

구성 요소 설명
MessagingFunction
Kafka 레코드를 Spring Message로 변환 및 처리하는 일을 담당하는 함수형 인터페이스
MessagingMessageConverter
Kafka 레코드 ↔ Spring Message 간 데이터 변환 처리
(key, value, headers, metadata 포함)
GatewayProxyFactoryBean 스프링 게이트웨이 구성 도구
- MessagingFunction 인터페이스만 정의하여 해당 게이트웨이 구현체를 자동 생성해줌
- 생성된 게이트웨이 구현체를 Spring Integration flow와 연결해줌
Spring Integration Flow 실제 메시지 처리 로직을 정의하는 구성 요소

 

 

예제

  1. Kafka Streams가 "input-topic"에서 레코드 수신
  2. MessagingMessageConverter를 통해 Kafka 레코드를 Spring Message<?>로 변환
  3. MessagingProcessor가 MessagingFunction.exchange() 실행
  4. Gateway가 Spring Integration Flow로 메시지 전송
  5. Flow가 메시지를 처리하고, 필요 시 응답 생성
  6. 다시 Kafka 레코드로 변환

 

예제

더보기

MessagingFunction 인터페이스 정의

public interface StompPushGateway {
    Message<?> push(Message<?> message);
}

 

GatewayProxyFactoryBean 설정

@Bean
public GatewayProxyFactoryBean stompGateway() {
    GatewayProxyFactoryBean factory = new GatewayProxyFactoryBean(StompPushGateway.class);
    factory.setDefaultRequestChannelName("stomp.push.channel");
    return factory;
}
  • 메시지 수신 기본 채널 지정
    • MessagingFunction 호출 시, Spring Integration Flow에 보내짐
    • Spring Integration 도착 채널을 지정함

 

Spring Integration Flow 정의

@Bean
public IntegrationFlow stompPushFlow(SimpMessagingTemplate messagingTemplate) {
    return IntegrationFlows.from("stomp.push.channel")
        .handle(message -> {
            String payload = (String) message.getPayload();
            messagingTemplate.convertAndSend("/topic/out", payload); // STOMP 발송
        })
        .get();
}
  • 해당 채널의 수신 메시지를 처리하는 로직을 정의함

 

Kafka Stream (MessagingProcessor)

@Bean
public KStream<String, String> kafkaStream(
    StreamsBuilder builder,
    ApplicationContext context
) {
    StompPushGateway gateway = context.getBean(StompPushGateway.class);
    MessagingFunction function = gateway::push;
    MessagingMessageConverter converter = new MessagingMessageConverter();

    builder.stream("input-topic")
        .process(() -> new MessagingProcessor<>(function, converter)); // STOMP 전송 흐름 실행

    return null;
}

 

9. Recovery from Deserialization Exceptions

일단 넘어감

 

10. Interactive Query Support

  • stateful 애플리케이션에서 상태 저장소를 애플리케이션 실행 중에 외부에서 직접 조회할 수 있는 기능
  • 활용 예: 현재 집계 상태, 특정 키의 값, 윈도우 집계 결과 등을 REST API로 외부에 노출.

 

KafkaStreamsInteractiveQueryService

  • Spring Kafka에서 Interactive Query를 지원하는 핵심 API.
  • 내부적으로 Kafka Streams의 Interactive Query API를 감싸는 Facade 역할.
  • 이 Bean을 생성해두면 애플리케이션 내에서 state store 조회, 호스트 위치 조회 등을 할 수 있음.

 

예제

더보기
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(
        StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    return new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
}
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

ReadOnlyKeyValueStore<Object, Object> appStore =
    interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());
  • QueryableStoreTypes: 조회할 저장소의 타입
  • 반환값: 읽기 전용 상태 저장소

 

 

조회 실패 대비: 재시도 설정

  • 상태 저장소 조회 시 일시적인 이유로 실패할 수 있음 (예: 초기화 지연).
  • Spring Kafka는 RetryTemplate을 통해 재시도 정책 설정 가능.

 

예제

더보기
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
    RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
    retryTemplate.setRetryPolicy(retryPolicy);
    kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
    return kafkaStreamsInteractiveQueryService;
}
  • 최대 10회 재시도하며, 1초 간격으로 재시도.

 

분산 환경에서의 주의점

  • Kafka Streams는 키-파티션 매핑을 기반으로 상태 저장소를 분산 관리함.
  • 상태 저장소는 해당 키를 처리하는 인스턴스에만 존재함.
  • 잘못된 인스턴스에서 retrieveQueryableStore()를 호출하면 해당 키에 대한 데이터는 조회되지 않음.

 

호스트 정보 조회

  • 분산 환경에서 특정 키를 어느 인스턴스가 처리 중인지 확인하기

 

예제

더보기
HostInfo hostInfo = interactiveQueryService.getKafkaStreamsApplicationHostInfo(
    "app-store", 12345, new IntegerSerializer());
  • "app-store": 상태 저장소 이름
  • 12345: 조회하려는 키
  • IntegerSerializer: 키 직렬화 방식

→ 반환값: HostInfo (host:port 정보) → 그 인스턴스에 RPC로 요청 전송 가능

 

현재 인스턴스 확인

  • 내가 요청하는 인스턴스가 그 키의 실제 담당자인지 확인하기

 

예제

더보기
HostInfo current = interactiveQueryService.getCurrentKafkaStreamsApplicationHostInfo();
  • 현재 인스턴스의 HostInfo와 비교하여, 로컬에서 처리할지 원격으로 요청할지 결정할 수 있음.