Spring/Spring for Apache Kafka

[Spring for Apache Kafka] 1. Basic

noahkim_ 2025. 5. 6. 15:00

1. Connecting to Kafka

Factory

구성 요소 역할
ProducerFactory 메시지 전송을 위한 팩토리 생성
ConsumerFactory 메시지 수신을 위한 팩토리 생성

 

FactoryListener

  • Factory의 Listener는 프로듀서/컨슈머가 만들어지거나 닫힐 때 호출하는 메서드를 정의한 인터페이스
  • 생성된 프로듀서/컨슈머의 bean id는 factory의 beanName + kafka client id 패턴으로 정의됩니다.

 

예제

더보기
public class MyProducerFactoryListener implements ProducerFactory.Listener<String, String> {
    @Override
    public void producerAdded(String id, Producer<String, String> producer) {
        System.out.println("Producer added: " + id);
    }

    @Override
    public void producerRemoved(String id, Producer<String, String> producer) {
        System.out.println("Producer removed: " + id);
    }
}

 

KafkaListenerEndpointRegistry

  • @KafkaListener로 등록된 리스너 컨테이너들을 관리하는 클래스 (생명주기, 시작, 중지 등)
  • 각 리스너는 MessageListenerContainer로 감싸져서 관리됨

 

예제

더보기
@KafkaListener(topics = "my_topic", groupId = "my_group")
public void listen(String message) {
    System.out.println("Received: " + message);
}

 

수동 관리

List<MessageListenerContainer> containers = kafkaListenerEndpointRegistry.getListenerContainers();
kafkaListenerEndpointRegistry.start(); // 모든 리스너 시작
kafkaListenerEndpointRegistry.stop(); // 모든 리스너 중지
MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer("listenerId");
kafkaListenerEndpointRegistry.destroy();

 

동적 커넥션 생성

  • 런타임에서 커넥션 생성 가능
  • 기존 연결을 유지함.
    • 클러스터 자체가 바뀐 경우, 기존 연결들을 삭제해주어야 함

 

예시) 커넥션 생성

더보기
ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs);
producerFactory.setBootstrapServersSupplier(() -> "new-bootstrap-server:9092"});

 

예시) 기존 연결 종료

더보기

프로듀서 연결 종료

@Autowired
private DefaultKafkaProducerFactory<String, String> producerFactory;

public void resetProducer() {
    producerFactory.reset();
}

 

컨슈머 연결 종료

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

public void restartConsumer() {
    // 모든 리스너 컨테이너를 가져옴
    kafkaListenerEndpointRegistry.getListenerContainers().forEach(container -> {
        container.stop();   // 먼저 끊고
        container.start();  // 다시 연결
    });
}

 

ABSwitchCluster

  • 두 개의 bootstrap server 셋을 미리 구성해놓고, 하나를 활성화하는 방식.
 
예시
더보기
@Component
public class ABSwitchCluster {

    @Autowired
    private DefaultKafkaProducerFactory<String, String> producerFactory;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    // A/B 클러스터 주소를 미리 저장
    private final String primaryBootstrapServers = "primary1:9092,primary2:9092";
    private final String secondaryBootstrapServers = "secondary1:9092,secondary2:9092";

    // 현재 active 서버
    private Supplier<String> bootstrapServersSupplier = () -> primaryBootstrapServers;

    public ABSwitchCluster() {}

    // Primary로 전환
    public void primary() {
        this.bootstrapServersSupplier = () -> primaryBootstrapServers;
        applyBootstrapServers();
    }

    // Secondary로 전환
    public void secondary() {
        this.bootstrapServersSupplier = () -> secondaryBootstrapServers;
        applyBootstrapServers();
    }

    private void applyBootstrapServers() {
        producerFactory.setBootstrapServersSupplier(() -> bootstrapServersSupplier.get());
        producerFactory.reset(); // Producer 연결 끊고 새로 연결

        kafkaListenerEndpointRegistry.getListenerContainers().forEach(container -> {
            container.stop();
            container.start();   // Consumer 연결 끊고 새로 연결
        });
    }
}
  • primary() 또는 secondary() 메서드를 호출해서 전환 가능.
  • 전환 후에는 프로듀서와 컨슈머를 초기화(Reset/Restart) 해야 연결이 새로 맺어짐.

 

2. Configuring Topics

기능 설명
NewTopic
Kafka에서 토픽을 정의하는 클래스.
KafkaAdmin
Spring Kafka에서 제공하는 고수준 관리 클래스
- Spring 컨텍스트에서 Kafka 클러스터 관리 작업을 처리.
AdminClient
Kafka의 원시 Java 클라이언트 API
- Kafka 클러스터와 직접 통신하여 더 세밀한 관리 작업을 수행.

 

예시) NewTopic

더보기
NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);

 

설정) KafkaAdmin, AdminClient

더보기
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        return new KafkaAdmin(configs);
    }

    @Bean
    public AdminClient adminClient(KafkaAdmin kafkaAdmin) {
        return AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }
}

  

예제) AdminClient

더보기
@Service
@RequiredArgsConstructor
public class KafkaTopicService {

    private final AdminClient adminClient;

    // 토픽 생성
    public void createTopic(String topicName, int numPartitions, short replicationFactor) {
        NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
        try {
            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
            System.out.println("토픽 생성 완료: " + topicName);
        } catch (ExecutionException | InterruptedException e) {
            // 예외 처리
        }
    }

    // 토픽 삭제
    public void deleteTopic(String topicName) {
        try {
            adminClient.deleteTopics(Collections.singletonList(topicName)).all().get();
            System.out.println("토픽 삭제 완료: " + topicName);
        } catch (ExecutionException | InterruptedException e) {
            // 예외 처리
        }
    }

    // 토픽 설정 변경
    public void alterTopicConfig(String topicName, Map<String, String> newConfigs) {
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
        Collection<AlterConfigOp> alterConfigOps = new ArrayList<>();

        for (Map.Entry<String, String> entry : newConfigs.entrySet()) {
            alterConfigOps.add(new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET));
        }

        try {
            adminClient.incrementalAlterConfigs(Collections.singletonMap(topicResource, alterConfigOps)).all().get();
            System.out.println("토픽 설정 변경 완료: " + topicName);
        } catch (ExecutionException | InterruptedException e) {
            // 예외 처리
        }
    }

    // 토픽의 현재 설정 조회
    public void describeTopicConfig(String topicName) {
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);

        try {
            DescribeConfigsResult result = adminClient.describeConfigs(Collections.singletonList(topicResource));
            Map<ConfigResource, Config> configs = result.all().get();
            Config config = configs.get(topicResource);

            System.out.println("토픽 설정 정보: " + topicName);
            for (ConfigEntry entry : config.entries()) {
                System.out.println(entry.name() + " = " + entry.value());
            }
        } catch (ExecutionException | InterruptedException e) {
            // 예외 처리
        }
    }
}

 

출처