๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ

๐Ÿ“”๊ฐ•์˜ ์ •๋ฆฌ ๋…ธํŠธ

[Spring Cloud๋กœ ๊ตฌํ˜„ํ•˜๋Š” ๋งˆ์ดํฌ๋กœ์„œ๋น„์Šค] Section. 11 ๋ฐ์ดํ„ฐ ๋™๊ธฐํ™”๋ฅผ ์œ„ํ•œ Kafka ํ™œ์šฉ

Apache Kafka

- ์˜คํ”ˆ์†Œ์Šค ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค ํ”„๋กœ์ ํŠธ

- ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ํ”ผ๋“œ๋ฅผ ๊ด€๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด ํ†ต์ผ๋œ ๋†’์€ ์ฒ˜๋ฆฌ๋Ÿ‰, ๋‚ฎ์€ ์ง€์—ฐ ์‹œ๊ฐ„์„ ์ง€๋‹Œ ํ”Œ๋žซํผ ์ œ๊ณต

 

Kafka๊ฐ€ ๋‚˜์˜ค๊ฒŒ ๋œ ๋ฐฐ๊ฒฝ

ํ™•์žฅ์ด ์–ด๋ ค์šด ๋ณต์žกํ•œ ์‹œ์Šคํ…œ ๊ตฌ์กฐ

 

๋ฐ์ดํ„ฐ๊ฐ€ ๋งŽ์•„๋„ ํ™•์žฅ์ด ์šฉ์ดํ•œ ์‹œ์Šคํ…œ

 

Kafka Broker

  • ์‹คํ–‰๋œ kafka ์„œ๋ฒ„
  • 3๋Œ€ ์ด์ƒ์˜ ๋ธŒ๋กœ์ปค ํด๋Ÿฌ์Šคํ„ฐ ๊ตฌ์„ฑ
  • ์„ค์ • ์ฝ”๋””๋„ค์ดํ„ฐ๋กœ 'Zookeeper' ์—ฐ๋™
    • ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ (Broker ID, Controller ID ์ €์žฅ)
    • ๋ฆฌ๋” ๋ธŒ๋กœ์ปค ๋ฌธ์ œ ๋ฐœ์ƒ ์‹œ ์žฅ์•  ๋Œ€์‘ํ•˜๋Š” ์—ญํ• 

 

๋ˆ„๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ์†Œ๋น„ํ•˜๋Š”์ง€ ์‹ ๊ฒฝ์“ฐ์ง€์•Š๊ณ  ๋ฉ”์‹œ์ง€๋ฅผ ๊ตํ™˜ํ•œ๋‹ค.

- kafka client

JAVA, Python, Go, C++ ๋“ฑ ๋งŽ์€ ์–ธ์–ด๋ฅผ ์ง€์›ํ•œ๋‹ค.

 

Kafka ์‹คํ–‰ํ•ด๋ณด๊ธฐ

Zookeeper ์‹คํ–‰

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

 

Kafka ์„œ๋ฒ„ ๊ตฌ๋™

$KAFKA_HOME/bin/kafka-server-start.sh ./config/server.properties

 

ํ† ํ”ฝ ์ƒ์„ฑ -> ํ† ๋ฏน ๋ชฉ๋ก ํ™•์ธ -> ํ† ํ”ฝ ์ •๋ณด ํ™•์ธ

*ํ† ํ”ฝ ์ƒ์„ฑ

$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic quickstart-events --partitions 1

 

*ํ† ํ”ฝ ๋ชฉ๋ก ํ™•์ธ

$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:9092

 

๋ฉ”์‹œ์ง€ ์ƒ์‚ฐ&์†Œ๋น„

*๋ฉ”์‹œ์ง€ ์ƒ์‚ฐ (producer)

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9093 --topic quickstart-events

 

*๋ฉ”์‹œ์ง€ ์†Œ๋น„ (consumer)

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic quickstart-events --from-beginning

 

producer์—์„œ ๋ฐœํ–‰ํ•œ ๋ฉ”์‹œ์ง€๊ฐ€ consumer์—์„œ ์†Œ๋น„๋˜๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

 

Kafka Connect

  • ์ฝ”๋“œ ์—†์ด Configuration์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ด๋™
    • RESTful API๋ฅผ ํ†ตํ•ด ์ง€์›
    • Stream ๋˜๋Š” Batchํ˜•ํƒœ๋กœ ๋ฐ์ดํ„ฐ ์ „์†ก ๊ฐ€๋Šฅ
    • ์ปค์Šคํ…€ Connertor๋ฅผ ํ†ตํ•œ ๋‹ค์–‘ํ•œ Plugin ์ œ๊ณต
  • ์„œ๋กœ ๋‹ค๋ฅธ ์‹œ์Šคํ…œ์˜ ๋ฐ์ดํ„ฐ๋ฅผ kafka connect, cluster, sink๋ฅผ ํ†ตํ•ด ์ด๋™๊ฐ€๋Šฅํ•˜๋‹ค.

 

confluent์—์„œ kafka-connect ์„ค์น˜ํ•˜์—ฌ ์‹คํ–‰.

 

connectํ”„๋กœํผํ‹ฐ ํŒŒ์ผ ์„ค์ •์—

plugin.path=$KAFKA_JDBC_HOME/lib ์„ค์ •ํ•˜์—ฌ jdbc ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋„๋ก ์„ค์ • (jdbc)

$KAFKA_CONNECT_HOME/share/java/kafka ๋””๋ ‰ํ† ๋ฆฌ์— mysql-connector-java-version.jar ํŒŒ์ผ์„ ๋ณต์‚ฌํ•ด๋‘”๋‹ค. (mysql)

 

Kafka Source Connect ์‚ฌ์šฉ

echo '

{

    "name" : "my-source-connect",

    "config" : {

        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",

        "connection.url":"jdbc:mysql://localhost:3306/mydb",

        "connection.user":"root",

        "connection.password":"test1357",

        "mode": "incrementing",

        "incrementing.column.name" : "id",

        "table.whitelist":"users",

        "topic.prefix" : "my_topic_",

        "tasks.max" : "1"

	}

}

' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

localhost:8083 (์นดํ”„์นด ์ปค๋„ฅํ„ฐ)๊ฐ€ ์ œ๊ณตํ•˜๋Š” ์—”๋“œํฌ์ธํŠธ API๋ฅผ ํ™œ์šฉํ•˜์—ฌ ์ƒˆ๋กœ์šด source connector๋ฅผ ๋“ฑ๋กํ•  ์ˆ˜ ์žˆ๋‹ค.

์ปค๋„ฅํŠธ ์ด๋ฆ„, ์ปค๋„ฅํ„ฐํด๋ž˜์Šค, url์ •๋ณด, ์ฆ๊ฐ€์ปฌ๋Ÿผ ๋“ฑ์ด ์ œ๊ณต๋œ๋‹ค. 

whitelist -> ๊ฐ์ง€ ๋Œ€์ƒ ํ…Œ์ด๋ธ”

MySql ๋ฐ์ดํ„ฐ ์ถ”๊ฐ€

  • users ์ถ”๊ฐ€ ๋ฐ์ดํ„ฐ

insert into users(user_id, pwd, name) value('user1', 'test1111', 'User name');

 

users ํ…Œ์ด๋ธ”์—์„œ ๋ณ€๊ฒฝ์‚ฌํ•ญ์ด ์ƒ๊ธฐ๋ฉด ์ตœ์ดˆ์—๋Š” topic์ด ์ƒ๊ธฐ๋ฉด์„œ ๋ฐ์ดํ„ฐ๋ฅผ publishํ•œ๋‹ค. (๋ฐ์ดํ„ฐ๋ฅผ ํ† ํ”ฝ์— ์Œ“์Œ)

consumer์—์„œ๋Š” ์ด ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›์•„๋ณผ ์ˆ˜ ์žˆ๋‹ค.

 

  •  consumer์—์„œ ๋ฐ›๋Š” ๋ฐ์ดํ„ฐ
{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "user_id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "pwd"
            },
            {
                "type": "string",
                "optional": true,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": true,
                "name": "org.apache.kafka.connect.data.Timestamp",
                "version": 1,
                "field": "created_at"
            }
        ],
        "optional": false,
        "name": "users"
    },
    "payload": {
        "id": 1,
        "user_id": "user1",
        "pwd": "test1111",
        "name": "User name",
        "created_at": 1646540630000
    }
}

Kafka Sink Connect ์‚ฌ์šฉ

ํ† ํ”ฝ์— ์ „๋‹ฌ๋œ ๋ฐ์ดํ„ฐ๋ฅผ sink๋กœ ๊ฐ€์ ธ์™€์„œ ์‚ฌ์šฉํ•œ๋‹ค.

* ์—ฐ๊ฒฐํ•  ์ •๋ณด๋ฅผ ์„ค์ •ํ•œ๋‹ค.

* ํ† ํ”ฝ์„ ์ง€์ •ํ•ด ์‚ฌ์šฉํ•˜๋Š” ํ† ํ”ฝ๊ณผ ๊ฐ™์€ ์Šคํ‚ค๋งˆ์™€ ๋ฐ์ดํ„ฐ์˜ ํ…Œ์ด๋ธ”์„ ์ž๋™ ์ƒ์„ฑํ•œ๋‹ค.

{
    "name": "my-sink-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://localhost:3306/mydb",
        "connection.user": "root",
        "connection.password": "test1357",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "false",
        "tasks.max": "1",
        "topics": "my_topic_users"
    }
}

endpoint์š”์ฒญ์œผ๋กœ ์ƒˆ๋กœ ์ƒ๊ธด my_topic_users ํ…Œ์ด๋ธ”

usersํ…Œ์ด๋ธ”์— ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ๋ฅผ ์ถ”๊ฐ€ํ•ด๋ณด์ž.

insert into users(user_id, pwd, name) value('admin1234', 'admin1234', 'SUPER_ADMIN');

 

ํ† ํ”ฝ์— ๋ฐ์ดํ„ฐ๊ฐ€ ๋ณด๋‚ด์ง€๊ณ  ์ด๋ฅผ ์ธ์ง€ํ•œ sink๊ฐ€ my_topic_usersํ…Œ์ด๋ธ”๋„ ์—…๋ฐ์ดํŠธํ•œ๋‹ค.

 

Console๋กœ ๋ฐ์ดํ„ฐ ๋ฐœํ–‰

์ฝ˜์†”ํ™”๋ฉด

ํ† ํ”ฝ ๋ฐœํ–‰ ์‹œ์—๋Š” sink์™€ ์—ฐ๊ฒฐ๋œ my_topic_users์—๋งŒ ๋ฐ์ดํ„ฐ๊ฐ€ ์ถ”๊ฐ€๋˜๊ณ  users์—๋Š” ์ถ”๊ฐ€๋˜์ง€ ์•Š๋Š”๋‹ค.

Source๋Š” userํ…Œ์ด๋ธ” ๋ณ€๊ฒฝ ๋ฐœ์ƒ ์‹œ ํ† ํ”ฝ์— โ€‹๋ฐ์ดํ„ฐ๋ฅผ ๋ฐœํ–‰ํ•˜๋Š” ์—ญํ• ์ด๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.

์‹ค์ œ ์„œ๋น„์Šค ๋ฐ์ดํ„ฐ ๋™๊ธฐํ™” ์ ์šฉํ•˜๊ธฐ

๋ฐ์ดํ„ฐ ๋™๊ธฐํ™” Order -> Catalogs

- Orders Service์— ์š”์ฒญ ๋œ ์ฃผ๋ฌธ ์ˆ˜๋Ÿ‰ ์ •๋ณด๋ฅผ Catalogs Service์— ๋ฐ˜์˜

- Order์„œ๋น„์Šค์—์„œ kafka Topic์œผ๋กœ ๋ฉ”์‹œ์ง€ ์ „์†ก (Producer)

- Catalog์„œ๋น„์Šค์—์„œ kafka Topic์œผ๋กœ ๋ฉ”์‹œ์ง€ ์ทจ๋“ (Consumer)

 

๊ฐ๊ฐ์˜ ์„œ๋น„์Šค๊ฐ€  ์„œ๋กœ ๋‹ค๋ฅธ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค๋ฅผ ๊ฐ€์ง€๊ณ  ์žˆ์œผ๋ฏ€๋กœ ์ฃผ๋ฌธ์ด ๋ฐœ์ƒํ•˜๋ฉด ์ƒํ’ˆ์˜ ์ˆ˜๋Ÿ‰์ •๋ณด๋ฅผ ๋™๊ธฐํ™”ํ•ด์ค˜์•ผํ•œ๋‹ค.

Catalog - ์นดํ”„์นด ์ปจ์Šˆ๋จธ ๋นˆ ๋“ฑ๋ก

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        HashMap<String, Object> properties = new HashMap<>();
        // kafka container host
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1.9093"); // ์นดํ”„์นด ์„œ๋ฒ„์˜ ์ฃผ์†Œ
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // key
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // value - Deserializer๋กœ ํ’€์–ด์„œ ์‚ฌ์šฉ
        return new DefaultKafkaConsumerFactory<>(properties);
    }

    /**
     * ํ† ํ”ฝ์˜ ๋ณ€๊ฒฝ์‚ฌํ•ญ์ด ์ƒ๊ฒผ์„ ๋•Œ ํ•ด๋‹น ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
        return kafkaListenerContainerFactory;
    }
}

์นดํ”„์นด ์„œ๋ฒ„์™€ ์ปจ์Šˆ๋จธ๊ทธ๋ฃน ์ •๋ณด key, value์ฒ˜๋ฆฌ ๋ฐฉ์‹์„ ์ •์˜ํ•œ ConsumerFactory๋ฅผ ๋นˆ์— ๋“ฑ๋กํ•˜๊ณ  ์นดํ”„์นด๋ฆฌ์Šค๋„ˆ์— ์ด ์ •๋ณด๋ฅผ ํฌํ•จํ•˜์—ฌ ๋นˆ์— ๋“ฑ๋กํ•œ๋‹ค.

 

Catalog - KafkaConsumer

@Service
@Slf4j
public class KafkaConsumer {
    CatalogRepository repository;

    @Autowired
    public KafkaConsumer(CatalogRepository repository) {
        this.repository = repository;
    }

    /**
     * ํ† ํ”ฝ์— ๊ฐ’์ด ์ „๋‹ฌ์ด ๋˜๋ฉด ๊ทธ๊ฐ’์„ ๊ฐ€์ ธ์™€์„œ ์‹คํ–‰
     * @param kafkaMessage
     */
    @KafkaListener(topics = "example-catalog-topic")
    public void updateQty(String kafkaMessage) {
        log.info("Kafka Message : {}", kafkaMessage);
        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        CatalogEntity entity = repository.findByProductId((String) map.get("productId")); //id๊ฐ’์œผ๋กœ catalogEntity์ •๋ณด ๊ฐ€์ ธ์˜ด
        if (entity != null) {
            // kafka๋กœ ์ „๋‹ฌ๋œ ์ฃผ๋ฌธ ์ˆ˜๋Ÿ‰ ์ •๋ณด๋ฅผ ์ž”์—ฌ์ˆ˜๋Ÿ‰์ •๋ณด์—์„œ ๋นผ์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๋™๊ธฐํ™” ์‹œํ‚ด
            entity.setStock(entity.getStock() - (Integer) map.get("qty"));
            repository.save(entity);
        }

    }
}

'example-catalog-topic'์ด๋ฆ„์˜ ํ† ํ”ฝ์˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›์•„์„œ ์ˆ˜๋Ÿ‰์„ ์—…๋ฐ์ดํŠธ ํ•ด์ฃผ๋Š” ์—ญํ• ์„ ๋‹ด๋‹นํ•œ๋‹ค.

 

์ด์ œ order-service์— ์ฃผ๋ฌธ์„ ํ•˜๋ฉด ์ฃผ๋ฌธํ•œ ์ˆ˜๋Ÿ‰๋งŒํผ catalog-service์— ์ž”์—ฌ ์ˆ˜๋Ÿ‰์ด kafka๋ฅผ ํ†ตํ•ด ์—…๋ฐ์ดํŠธ ๋˜์–ด ๋ฐ์ดํ„ฐ ๋™๊ธฐํ™” ๋œ๋‹ค.

 

Multiple Orders Service์—์„œ์˜ ๋ฐ์ดํ„ฐ ๋™๊ธฐํ™”

์ธ์Šคํ„ด์Šค๊ฐ€ ์—ฌ๋Ÿฌ๊ฐœ ์ผ๋•Œ Orders๋ฐ์ดํ„ฐ ๋™๊ธฐํ™” ๋ฌธ์ œ

์ธ์Šคํ„ด์Šค๊ฐ€ ๊ฐ๊ฐ์˜ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค๋ฅผ ๊ฐ€์ง€๊ธฐ ๋•Œ๋ฌธ์— ์ฃผ๋ฌธ ๋ฐ์ดํ„ฐ๊ฐ€ ์„œ๋กœ ๋‹ค๋ฅธ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์ €์žฅ๋  ์ˆ˜ ์žˆ๋‹ค.

๊ทธ๋ ‡๊ธฐ ๋•Œ๋ฌธ์— ํŠน์ • ์œ ์ €์˜ ์ฃผ๋ฌธ ์กฐํšŒ์‹œ ์ „์ฒด ๋ฐ์ดํ„ฐ๊ฐ€ ์กฐํšŒ๋˜์ง€ ์•Š์„ ์ˆ˜ ์žˆ๋‹ค (ํ•œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค๋งŒ ์กฐํšŒ ์‹œ)

 

Kafka๋ฅผ ์ด์šฉํ•˜์—ฌ ๋‹จ์ผ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค๋กœ ๋™๊ธฐํ™”

  • ์ฃผ๋ฌธ ์ •๋ณด๋ฅผ DB๊ฐ€ ์•„๋‹Œ Kafka Topic์œผ๋กœ ์ „์†ก
  • Kafka Topic์— ์„ค์ • ๋œ Kafka Sink Connect๋ฅผ ์‚ฌ์šฉํ•ด ๋‹จ์ผ DB์— ์ €์žฅ (๋ฐ์ดํ„ฐ ๋™๊ธฐํ™”)

 

ํ† ํ”ฝ์— ์ „๋‹ฌํ•  ๋ฉ”์‹œ์ง€ ๋“ฑ๋ก

Topic์œผ๋กœ ๋ณด๋‚ผ ๋ฐ์ดํ„ฐ์˜ ๊ตฌ์„ฑ

Topic์— ๋ณด๋‚ผ ๋ฐ์ดํ„ฐ๋ฅผ KafkaOrderDto๊ฐ์ฒด๋กœ ์ƒ์„ฑํ•˜๋„๋ก ํ•œ๋‹ค.

@Service
@Slf4j
public class OrderProducer {
    private KafkaTemplate<String, String> kafkaTemplate;

    List<Field> fields = Arrays.asList(
            new Field("string", true, "order_id"),
            new Field("string", true, "user_id"),
            new Field("string", true, "product_id"),
            new Field("int32", true, "qty"),
            new Field("int32", true, "unit_price"),
            new Field("int32", true, "total_price"));

    Schema schema = Schema.builder()
            .type("struct")
            .fields(fields)
            .optional(false)
            .name("orders")
            .build();

    @Autowired
    public OrderProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * ์ฃผ๋ฌธ ๋“ค์–ด์˜ฌ ์‹œ ํ˜ธ์ถœํ•˜์—ฌ ํ† ํ”ฝ์— ๋ฐœํ–‰ํ•œ๋‹ค. (๋ฐ์ดํ„ฐ ๋™๊ธฐํ™”์šฉ)
     * @param topic
     * @param orderDto
     * @return
     */
    public OrderDto send(String topic, OrderDto orderDto) {
        Payload payload = Payload.builder()
                .order_id(orderDto.getOrderId())
                .user_id(orderDto.getUserId())
                .product_id(orderDto.getProductId())
                .qty(orderDto.getQty())
                .unit_price(orderDto.getUnitPrice())
                .total_price(orderDto.getTotalPrice())
                .build();

        //์Šคํ‚ค๋งˆ์™€ ํŽ˜์ด๋กœ๋“œ๋กœ ๊ตฌ์„ฑ๋œ ์นดํ”„์นด์˜ค๋”dto
        KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);

        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            jsonInString = mapper.writeValueAsString(kafkaOrderDto);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("Order Producer sent data from Order Microservice" + orderDto);
        return orderDto;
    }
}

ํ•„๋“œ์™€ ์Šคํ‚ค๋งˆ๋Š” ์ •ํ•ด์ ธ์žˆ์œผ๋ฏ€๋กœ ๋ฏธ๋ฆฌ ๋งŒ๋“ค์–ด์„œ ์‚ฌ์šฉํ•˜๊ณ  payload๋Š” ์š”์ฒญ์œผ๋กœ ๋“ค์–ด์˜จ ๋ฐ์ดํ„ฐ๋ฅผ ์ด์šฉํ•˜์—ฌ kafka์— ๋ณด๋‚ผ ๊ฐ์ฒด๋ฅผ ๋งŒ๋“ค์–ด์ค€๋‹ค. ์ดํ›„ ๊ฐ์ฒด๋ฅผ jsonํƒ€์ž…์œผ๋กœ ๋ณ€ํ™˜ํ•˜์—ฌ ์ •ํ•ด์ง„ topic์œผ๋กœ send()ํ•ด์ฃผ๋ฉด order์„œ๋น„์Šค ์ธ์Šคํ„ด์Šค์— ์ƒ๊ด€์—†์ด kafka ํ† ํ”ฝ์— ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•œ๋‹ค.

 

์ด ํ์— ์Œ“์ธ ๋ฐ์ดํ„ฐ๋Š” ์„ค์ •ํ•ด๋‘” order-sink-connect์— ์˜ํ•ด ํ•˜๋‚˜์˜ DB์— ์ €์žฅ๋˜๊ฒŒ ๋œ๋‹ค.

 

cqrs ํŒจํ„ด?