Apache Kafka
- ์คํ์์ค ๋ฉ์์ง ๋ธ๋ก์ปค ํ๋ก์ ํธ
- ์ค์๊ฐ ๋ฐ์ดํฐ ํผ๋๋ฅผ ๊ด๋ฆฌํ๊ธฐ ์ํด ํต์ผ๋ ๋์ ์ฒ๋ฆฌ๋, ๋ฎ์ ์ง์ฐ ์๊ฐ์ ์ง๋ ํ๋ซํผ ์ ๊ณต
Kafka๊ฐ ๋์ค๊ฒ ๋ ๋ฐฐ๊ฒฝ
๋ฐ์ดํฐ๊ฐ ๋ง์๋ ํ์ฅ์ด ์ฉ์ดํ ์์คํ
Kafka Broker
- ์คํ๋ kafka ์๋ฒ
- 3๋ ์ด์์ ๋ธ๋ก์ปค ํด๋ฌ์คํฐ ๊ตฌ์ฑ
- ์ค์ ์ฝ๋๋ค์ดํฐ๋ก 'Zookeeper' ์ฐ๋
- ๋ฉํ๋ฐ์ดํฐ (Broker ID, Controller ID ์ ์ฅ)
- ๋ฆฌ๋ ๋ธ๋ก์ปค ๋ฌธ์ ๋ฐ์ ์ ์ฅ์ ๋์ํ๋ ์ญํ
๋๊ฐ ๋ฉ์์ง๋ฅผ ์์ฑํ๊ณ ์๋นํ๋์ง ์ ๊ฒฝ์ฐ์ง์๊ณ ๋ฉ์์ง๋ฅผ ๊ตํํ๋ค.
- kafka client
JAVA, Python, Go, C++ ๋ฑ ๋ง์ ์ธ์ด๋ฅผ ์ง์ํ๋ค.
Kafka ์คํํด๋ณด๊ธฐ
Zookeeper ์คํ
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
Kafka ์๋ฒ ๊ตฌ๋
$KAFKA_HOME/bin/kafka-server-start.sh ./config/server.properties
ํ ํฝ ์์ฑ -> ํ ๋ฏน ๋ชฉ๋ก ํ์ธ -> ํ ํฝ ์ ๋ณด ํ์ธ
*ํ ํฝ ์์ฑ
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic quickstart-events --partitions 1
*ํ ํฝ ๋ชฉ๋ก ํ์ธ
$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:9092
๋ฉ์์ง ์์ฐ&์๋น
*๋ฉ์์ง ์์ฐ (producer)
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9093 --topic quickstart-events
*๋ฉ์์ง ์๋น (consumer)
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic quickstart-events --from-beginning
producer์์ ๋ฐํํ ๋ฉ์์ง๊ฐ consumer์์ ์๋น๋๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
Kafka Connect
- ์ฝ๋ ์์ด Configuration์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ด๋
- RESTful API๋ฅผ ํตํด ์ง์
- Stream ๋๋ Batchํํ๋ก ๋ฐ์ดํฐ ์ ์ก ๊ฐ๋ฅ
- ์ปค์คํ Connertor๋ฅผ ํตํ ๋ค์ํ Plugin ์ ๊ณต
- ์๋ก ๋ค๋ฅธ ์์คํ ์ ๋ฐ์ดํฐ๋ฅผ kafka connect, cluster, sink๋ฅผ ํตํด ์ด๋๊ฐ๋ฅํ๋ค.
confluent์์ kafka-connect ์ค์นํ์ฌ ์คํ.
connectํ๋กํผํฐ ํ์ผ ์ค์ ์
plugin.path=$KAFKA_JDBC_HOME/lib ์ค์ ํ์ฌ jdbc ์ฌ์ฉํ ์ ์๋๋ก ์ค์ (jdbc)
$KAFKA_CONNECT_HOME/share/java/kafka ๋๋ ํ ๋ฆฌ์ mysql-connector-java-version.jar ํ์ผ์ ๋ณต์ฌํด๋๋ค. (mysql)
Kafka Source Connect ์ฌ์ฉ
echo '
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"test1357",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
localhost:8083 (์นดํ์นด ์ปค๋ฅํฐ)๊ฐ ์ ๊ณตํ๋ ์๋ํฌ์ธํธ API๋ฅผ ํ์ฉํ์ฌ ์๋ก์ด source connector๋ฅผ ๋ฑ๋กํ ์ ์๋ค.
์ปค๋ฅํธ ์ด๋ฆ, ์ปค๋ฅํฐํด๋์ค, url์ ๋ณด, ์ฆ๊ฐ์ปฌ๋ผ ๋ฑ์ด ์ ๊ณต๋๋ค.
whitelist -> ๊ฐ์ง ๋์ ํ ์ด๋ธ
MySql ๋ฐ์ดํฐ ์ถ๊ฐ
- users ์ถ๊ฐ ๋ฐ์ดํฐ
insert into users(user_id, pwd, name) value('user1', 'test1111', 'User name');
users ํ ์ด๋ธ์์ ๋ณ๊ฒฝ์ฌํญ์ด ์๊ธฐ๋ฉด ์ต์ด์๋ topic์ด ์๊ธฐ๋ฉด์ ๋ฐ์ดํฐ๋ฅผ publishํ๋ค. (๋ฐ์ดํฐ๋ฅผ ํ ํฝ์ ์์)
consumer์์๋ ์ด ๋ฐ์ดํฐ๋ฅผ ๋ฐ์๋ณผ ์ ์๋ค.
- consumer์์ ๋ฐ๋ ๋ฐ์ดํฐ
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "user_id"
},
{
"type": "string",
"optional": true,
"field": "pwd"
},
{
"type": "string",
"optional": true,
"field": "name"
},
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "created_at"
}
],
"optional": false,
"name": "users"
},
"payload": {
"id": 1,
"user_id": "user1",
"pwd": "test1111",
"name": "User name",
"created_at": 1646540630000
}
}
Kafka Sink Connect ์ฌ์ฉ
ํ ํฝ์ ์ ๋ฌ๋ ๋ฐ์ดํฐ๋ฅผ sink๋ก ๊ฐ์ ธ์์ ์ฌ์ฉํ๋ค.
* ์ฐ๊ฒฐํ ์ ๋ณด๋ฅผ ์ค์ ํ๋ค.
* ํ ํฝ์ ์ง์ ํด ์ฌ์ฉํ๋ ํ ํฝ๊ณผ ๊ฐ์ ์คํค๋ง์ ๋ฐ์ดํฐ์ ํ ์ด๋ธ์ ์๋ ์์ฑํ๋ค.
{
"name": "my-sink-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"connection.user": "root",
"connection.password": "test1357",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "false",
"tasks.max": "1",
"topics": "my_topic_users"
}
}
usersํ ์ด๋ธ์ ์๋ก์ด ๋ฐ์ดํฐ๋ฅผ ์ถ๊ฐํด๋ณด์.
insert into users(user_id, pwd, name) value('admin1234', 'admin1234', 'SUPER_ADMIN');
ํ ํฝ์ ๋ฐ์ดํฐ๊ฐ ๋ณด๋ด์ง๊ณ ์ด๋ฅผ ์ธ์งํ sink๊ฐ my_topic_usersํ ์ด๋ธ๋ ์ ๋ฐ์ดํธํ๋ค.
Console๋ก ๋ฐ์ดํฐ ๋ฐํ
ํ ํฝ ๋ฐํ ์์๋ sink์ ์ฐ๊ฒฐ๋ my_topic_users์๋ง ๋ฐ์ดํฐ๊ฐ ์ถ๊ฐ๋๊ณ users์๋ ์ถ๊ฐ๋์ง ์๋๋ค.
Source๋ userํ ์ด๋ธ ๋ณ๊ฒฝ ๋ฐ์ ์ ํ ํฝ์ โ๋ฐ์ดํฐ๋ฅผ ๋ฐํํ๋ ์ญํ ์ด๊ธฐ ๋๋ฌธ์ด๋ค.
์ค์ ์๋น์ค ๋ฐ์ดํฐ ๋๊ธฐํ ์ ์ฉํ๊ธฐ
๋ฐ์ดํฐ ๋๊ธฐํ Order -> Catalogs
- Orders Service์ ์์ฒญ ๋ ์ฃผ๋ฌธ ์๋ ์ ๋ณด๋ฅผ Catalogs Service์ ๋ฐ์
- Order์๋น์ค์์ kafka Topic์ผ๋ก ๋ฉ์์ง ์ ์ก (Producer)
- Catalog์๋น์ค์์ kafka Topic์ผ๋ก ๋ฉ์์ง ์ทจ๋ (Consumer)
๊ฐ๊ฐ์ ์๋น์ค๊ฐ ์๋ก ๋ค๋ฅธ ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ฅผ ๊ฐ์ง๊ณ ์์ผ๋ฏ๋ก ์ฃผ๋ฌธ์ด ๋ฐ์ํ๋ฉด ์ํ์ ์๋์ ๋ณด๋ฅผ ๋๊ธฐํํด์ค์ผํ๋ค.
Catalog - ์นดํ์นด ์ปจ์๋จธ ๋น ๋ฑ๋ก
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
HashMap<String, Object> properties = new HashMap<>();
// kafka container host
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1.9093"); // ์นดํ์นด ์๋ฒ์ ์ฃผ์
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // key
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // value - Deserializer๋ก ํ์ด์ ์ฌ์ฉ
return new DefaultKafkaConsumerFactory<>(properties);
}
/**
* ํ ํฝ์ ๋ณ๊ฒฝ์ฌํญ์ด ์๊ฒผ์ ๋ ํด๋น ๋ฐ์ดํฐ ์ฒ๋ฆฌ
* @return
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
์นดํ์นด ์๋ฒ์ ์ปจ์๋จธ๊ทธ๋ฃน ์ ๋ณด key, value์ฒ๋ฆฌ ๋ฐฉ์์ ์ ์ํ ConsumerFactory๋ฅผ ๋น์ ๋ฑ๋กํ๊ณ ์นดํ์นด๋ฆฌ์ค๋์ ์ด ์ ๋ณด๋ฅผ ํฌํจํ์ฌ ๋น์ ๋ฑ๋กํ๋ค.
Catalog - KafkaConsumer
@Service
@Slf4j
public class KafkaConsumer {
CatalogRepository repository;
@Autowired
public KafkaConsumer(CatalogRepository repository) {
this.repository = repository;
}
/**
* ํ ํฝ์ ๊ฐ์ด ์ ๋ฌ์ด ๋๋ฉด ๊ทธ๊ฐ์ ๊ฐ์ ธ์์ ์คํ
* @param kafkaMessage
*/
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage) {
log.info("Kafka Message : {}", kafkaMessage);
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
CatalogEntity entity = repository.findByProductId((String) map.get("productId")); //id๊ฐ์ผ๋ก catalogEntity์ ๋ณด ๊ฐ์ ธ์ด
if (entity != null) {
// kafka๋ก ์ ๋ฌ๋ ์ฃผ๋ฌธ ์๋ ์ ๋ณด๋ฅผ ์์ฌ์๋์ ๋ณด์์ ๋นผ์ ๋ฐ์ดํฐ๋ฅผ ๋๊ธฐํ ์ํด
entity.setStock(entity.getStock() - (Integer) map.get("qty"));
repository.save(entity);
}
}
}
'example-catalog-topic'์ด๋ฆ์ ํ ํฝ์ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์์ ์๋์ ์ ๋ฐ์ดํธ ํด์ฃผ๋ ์ญํ ์ ๋ด๋นํ๋ค.
์ด์ order-service์ ์ฃผ๋ฌธ์ ํ๋ฉด ์ฃผ๋ฌธํ ์๋๋งํผ catalog-service์ ์์ฌ ์๋์ด kafka๋ฅผ ํตํด ์ ๋ฐ์ดํธ ๋์ด ๋ฐ์ดํฐ ๋๊ธฐํ ๋๋ค.
Multiple Orders Service์์์ ๋ฐ์ดํฐ ๋๊ธฐํ
์ธ์คํด์ค๊ฐ ๊ฐ๊ฐ์ ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ฅผ ๊ฐ์ง๊ธฐ ๋๋ฌธ์ ์ฃผ๋ฌธ ๋ฐ์ดํฐ๊ฐ ์๋ก ๋ค๋ฅธ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ ์ฅ๋ ์ ์๋ค.
๊ทธ๋ ๊ธฐ ๋๋ฌธ์ ํน์ ์ ์ ์ ์ฃผ๋ฌธ ์กฐํ์ ์ ์ฒด ๋ฐ์ดํฐ๊ฐ ์กฐํ๋์ง ์์ ์ ์๋ค (ํ ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ง ์กฐํ ์)
Kafka๋ฅผ ์ด์ฉํ์ฌ ๋จ์ผ ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ก ๋๊ธฐํ
- ์ฃผ๋ฌธ ์ ๋ณด๋ฅผ DB๊ฐ ์๋ Kafka Topic์ผ๋ก ์ ์ก
- Kafka Topic์ ์ค์ ๋ Kafka Sink Connect๋ฅผ ์ฌ์ฉํด ๋จ์ผ DB์ ์ ์ฅ (๋ฐ์ดํฐ ๋๊ธฐํ)
ํ ํฝ์ ์ ๋ฌํ ๋ฉ์์ง ๋ฑ๋ก
Topic์ ๋ณด๋ผ ๋ฐ์ดํฐ๋ฅผ KafkaOrderDto๊ฐ์ฒด๋ก ์์ฑํ๋๋ก ํ๋ค.
@Service
@Slf4j
public class OrderProducer {
private KafkaTemplate<String, String> kafkaTemplate;
List<Field> fields = Arrays.asList(
new Field("string", true, "order_id"),
new Field("string", true, "user_id"),
new Field("string", true, "product_id"),
new Field("int32", true, "qty"),
new Field("int32", true, "unit_price"),
new Field("int32", true, "total_price"));
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();
@Autowired
public OrderProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
/**
* ์ฃผ๋ฌธ ๋ค์ด์ฌ ์ ํธ์ถํ์ฌ ํ ํฝ์ ๋ฐํํ๋ค. (๋ฐ์ดํฐ ๋๊ธฐํ์ฉ)
* @param topic
* @param orderDto
* @return
*/
public OrderDto send(String topic, OrderDto orderDto) {
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
//์คํค๋ง์ ํ์ด๋ก๋๋ก ๊ตฌ์ฑ๋ ์นดํ์นด์ค๋dto
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(kafkaOrderDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Order Producer sent data from Order Microservice" + orderDto);
return orderDto;
}
}
ํ๋์ ์คํค๋ง๋ ์ ํด์ ธ์์ผ๋ฏ๋ก ๋ฏธ๋ฆฌ ๋ง๋ค์ด์ ์ฌ์ฉํ๊ณ payload๋ ์์ฒญ์ผ๋ก ๋ค์ด์จ ๋ฐ์ดํฐ๋ฅผ ์ด์ฉํ์ฌ kafka์ ๋ณด๋ผ ๊ฐ์ฒด๋ฅผ ๋ง๋ค์ด์ค๋ค. ์ดํ ๊ฐ์ฒด๋ฅผ jsonํ์ ์ผ๋ก ๋ณํํ์ฌ ์ ํด์ง topic์ผ๋ก send()ํด์ฃผ๋ฉด order์๋น์ค ์ธ์คํด์ค์ ์๊ด์์ด kafka ํ ํฝ์ ๋ฉ์์ง๋ฅผ ๋ฐํํ๋ค.
์ด ํ์ ์์ธ ๋ฐ์ดํฐ๋ ์ค์ ํด๋ order-sink-connect์ ์ํด ํ๋์ DB์ ์ ์ฅ๋๊ฒ ๋๋ค.
cqrs ํจํด?