Kafka Broker์ 'coffee' ํ ํฝ์ ๋ฉ์์ง๋ฅผ ๊ตฌ๋ ํ์ฌ ์ฒ๋ฆฌํ๋ Consumer ์๋ฒ ์ ๋๋ค.
ํด๋น ๋ฉ์์ง(๋คํ์ฉ๊ธฐ์ ์์น ๋ฐ ์ํ ์ ๋ณด)๋ฅผ WebSocket ํต์ ์ ํตํด Client๋ก ์ค์๊ฐ์ผ๋ก ์ ๋ฌํฉ๋๋ค.
์๋์ ํ๊ฒฝ์ ๊ถ์ฅํฉ๋๋ค.
service | version |
---|---|
SpringBoot | v3.1.x |
Java | v17 |
kafka_2.13 | v3.6.0 |
spring-boot-starter-websocket | v3.1.0 |
@ConfigurationProperties("kafka")
@Data
public class KafkaProperties {
public static final String CONSUMER_GROUP = "coffee-group";
public static final String TOPIC = "coffee";
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
}
CONSUMER_GROUP
: Kafka Consumer์ ๊ทธ๋ฃน ์์ด๋๋ฅผ ๋ํ๋ด๋ ์์TOPIC
: Kafka Broker์์ ๊ตฌ๋ ํ๋ ํ ํฝbootstrapServers
: Kafka Broker์ ๋ถํธ์คํธ๋ฉ ์๋ฒ ์ฃผ์
@Configuration
@EnableKafka
@RequiredArgsConstructor
public class ConsumerConfiguration {
private final KafkaProperties kafkaProperties;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaProperties.CONSUMER_GROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
kafkaListenerContainerFactory()
: Kafka Listener Container ์์ฑ- Kafka Consumer๋
ConcurrentMessageListenerContainer
๋ฅผ ์ฌ์ฉํ์ฌ ๋์์ ์ฌ๋ฌ ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ ์ ์๋๋ก ์ค์
- Kafka Consumer๋
consumerFactory()
: Kafka Consumer ์์ฑconsumerConfigs()
: Kafka Consumer์ ๊ตฌ์ฑ ์์ฑ ์ค์
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/container-status")
.setAllowedOrigins("https://coffee-tree-front.web.app/", "http://localhost:3000")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic");
registry.setApplicationDestinationPrefixes("/app");
}
}
registerStompEndpoints()
: WebSocket ์๋ํฌ์ธํธ๋ฅผ ๋ฑ๋กaddEndpoint("/container-status")
: "/container-status" ๊ฒฝ๋ก๋ฅผ WebSocket ์๋ํฌ์ธํธ๋ก ๋ฑ๋กsetAllowedOrigins
: WebSocket ์ฐ๊ฒฐ์ ํ์ฉํ ์๋ณธ(Origin)์ ์ค์ withSockJS
: SockJS๋ฅผ ์ฌ์ฉํ๋๋ก ์ค์
- SockJS๋ WebSocket์ด ์ง์๋์ง ์๋ ํ๊ฒฝ์์ ๋์ฒด ์๋จ์ ์ ๊ณตํ๋ JavaScript ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ๋๋ค.
configureMessageBroker()
: ๋ฉ์์ง ๋ธ๋ก์ปค๋ฅผ ๊ตฌ์ฑํ๋ ์ญํenableSimpleBroker("/topic")
: "/topic"์ ๊ตฌ๋ ํ๋ ํด๋ผ์ด์ธํธ์๊ฒ ๋ฉ์์ง๋ฅผ ์ ์กํ๋ ๊ฐ๋จํ ๋ฉ์์ง ๋ธ๋ก์ปค๋ฅผ ํ์ฑํsetApplicationDestinationPrefixes("/app")
: ํด๋ผ์ด์ธํธ์์ ๋ฉ์์ง๋ฅผ ๋ณด๋ผ ๋ ์ฌ์ฉํ ์ ํ๋ฆฌ์ผ์ด์ ํ๋ฆฌํฝ์ค๋ฅผ ์ค์ - ํด๋ผ์ด์ธํธ๋ "/app"์ ์ ๋์ฌ๋ก ์ฌ์ฉํ์ฌ ๋ฉ์์ง๋ฅผ ์๋ฒ๋ก ๋ณด๋ผ ์ ์์ต๋๋ค.
- ํ์ฌ ํ๋ก์ ํธ์์๋ ์ฌ์ฉ๋์ง ์๋ ์ค์ ์ ๋๋ค.