Spring/Spring for Apache Kafka
[Spring for Apache Kafka] 1. Basic
noahkim_
2025. 5. 6. 15:00
1. Connecting to Kafka
Factory
구성 요소 | 역할 |
ProducerFactory | 메시지 전송을 위한 팩토리 생성 |
ConsumerFactory | 메시지 수신을 위한 팩토리 생성 |
FactoryListener
- Factory의 Listener는 프로듀서/컨슈머가 만들어지거나 닫힐 때 호출하는 메서드를 정의한 인터페이스
- 생성된 프로듀서/컨슈머의 bean id는 factory의 beanName + kafka client id 패턴으로 정의됩니다.
예제
더보기
public class MyProducerFactoryListener implements ProducerFactory.Listener<String, String> {
@Override
public void producerAdded(String id, Producer<String, String> producer) {
System.out.println("Producer added: " + id);
}
@Override
public void producerRemoved(String id, Producer<String, String> producer) {
System.out.println("Producer removed: " + id);
}
}
KafkaListenerEndpointRegistry
- @KafkaListener로 등록된 리스너 컨테이너들을 관리하는 클래스 (생명주기, 시작, 중지 등)
- 각 리스너는 MessageListenerContainer로 감싸져서 관리됨
예제
더보기
@KafkaListener(topics = "my_topic", groupId = "my_group")
public void listen(String message) {
System.out.println("Received: " + message);
}
수동 관리
List<MessageListenerContainer> containers = kafkaListenerEndpointRegistry.getListenerContainers();
kafkaListenerEndpointRegistry.start(); // 모든 리스너 시작
kafkaListenerEndpointRegistry.stop(); // 모든 리스너 중지
MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer("listenerId");
kafkaListenerEndpointRegistry.destroy();
동적 커넥션 생성
- 런타임에서 커넥션 생성 가능
- 기존 연결을 유지함.
- 클러스터 자체가 바뀐 경우, 기존 연결들을 삭제해주어야 함
예시) 커넥션 생성
더보기
ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs);
producerFactory.setBootstrapServersSupplier(() -> "new-bootstrap-server:9092"});
예시) 기존 연결 종료
더보기
프로듀서 연결 종료
@Autowired
private DefaultKafkaProducerFactory<String, String> producerFactory;
public void resetProducer() {
producerFactory.reset();
}
컨슈머 연결 종료
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
public void restartConsumer() {
// 모든 리스너 컨테이너를 가져옴
kafkaListenerEndpointRegistry.getListenerContainers().forEach(container -> {
container.stop(); // 먼저 끊고
container.start(); // 다시 연결
});
}
ABSwitchCluster
- 두 개의 bootstrap server 셋을 미리 구성해놓고, 하나를 활성화하는 방식.
예시
더보기
@Component
public class ABSwitchCluster {
@Autowired
private DefaultKafkaProducerFactory<String, String> producerFactory;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
// A/B 클러스터 주소를 미리 저장
private final String primaryBootstrapServers = "primary1:9092,primary2:9092";
private final String secondaryBootstrapServers = "secondary1:9092,secondary2:9092";
// 현재 active 서버
private Supplier<String> bootstrapServersSupplier = () -> primaryBootstrapServers;
public ABSwitchCluster() {}
// Primary로 전환
public void primary() {
this.bootstrapServersSupplier = () -> primaryBootstrapServers;
applyBootstrapServers();
}
// Secondary로 전환
public void secondary() {
this.bootstrapServersSupplier = () -> secondaryBootstrapServers;
applyBootstrapServers();
}
private void applyBootstrapServers() {
producerFactory.setBootstrapServersSupplier(() -> bootstrapServersSupplier.get());
producerFactory.reset(); // Producer 연결 끊고 새로 연결
kafkaListenerEndpointRegistry.getListenerContainers().forEach(container -> {
container.stop();
container.start(); // Consumer 연결 끊고 새로 연결
});
}
}
- primary() 또는 secondary() 메서드를 호출해서 전환 가능.
- 전환 후에는 프로듀서와 컨슈머를 초기화(Reset/Restart) 해야 연결이 새로 맺어짐.
2. Configuring Topics
기능 | 설명 |
NewTopic |
Kafka에서 토픽을 정의하는 클래스.
|
KafkaAdmin |
Spring Kafka에서 제공하는 고수준 관리 클래스
- Spring 컨텍스트에서 Kafka 클러스터 관리 작업을 처리. |
AdminClient |
Kafka의 원시 Java 클라이언트 API
- Kafka 클러스터와 직접 통신하여 더 세밀한 관리 작업을 수행. |
예시) NewTopic
더보기
NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
설정) KafkaAdmin, AdminClient
더보기
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new KafkaAdmin(configs);
}
@Bean
public AdminClient adminClient(KafkaAdmin kafkaAdmin) {
return AdminClient.create(kafkaAdmin.getConfigurationProperties());
}
}
예제) AdminClient
더보기
@Service
@RequiredArgsConstructor
public class KafkaTopicService {
private final AdminClient adminClient;
// 토픽 생성
public void createTopic(String topicName, int numPartitions, short replicationFactor) {
NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
try {
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
System.out.println("토픽 생성 완료: " + topicName);
} catch (ExecutionException | InterruptedException e) {
// 예외 처리
}
}
// 토픽 삭제
public void deleteTopic(String topicName) {
try {
adminClient.deleteTopics(Collections.singletonList(topicName)).all().get();
System.out.println("토픽 삭제 완료: " + topicName);
} catch (ExecutionException | InterruptedException e) {
// 예외 처리
}
}
// 토픽 설정 변경
public void alterTopicConfig(String topicName, Map<String, String> newConfigs) {
ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
Collection<AlterConfigOp> alterConfigOps = new ArrayList<>();
for (Map.Entry<String, String> entry : newConfigs.entrySet()) {
alterConfigOps.add(new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET));
}
try {
adminClient.incrementalAlterConfigs(Collections.singletonMap(topicResource, alterConfigOps)).all().get();
System.out.println("토픽 설정 변경 완료: " + topicName);
} catch (ExecutionException | InterruptedException e) {
// 예외 처리
}
}
// 토픽의 현재 설정 조회
public void describeTopicConfig(String topicName) {
ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
try {
DescribeConfigsResult result = adminClient.describeConfigs(Collections.singletonList(topicResource));
Map<ConfigResource, Config> configs = result.all().get();
Config config = configs.get(topicResource);
System.out.println("토픽 설정 정보: " + topicName);
for (ConfigEntry entry : config.entries()) {
System.out.println(entry.name() + " = " + entry.value());
}
} catch (ExecutionException | InterruptedException e) {
// 예외 처리
}
}
}
출처