본 포스팅은 인프런 데브원영님의 [아파치 카프카 애플리케이션 프로그래밍]의 강의를 수강 후 정리하는 글입니다.
[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
데브원영 DVWY | 실전 환경에서 사용하는 아파치 카프카 애플리케이션 프로그래밍 지식들을 모았습니다! 데이터 파이프라인을 구축하는데 핵심이 되는 아파치 카프카의 각종 기능들을 살펴보고
www.inflearn.com
1. 카프카 스트림즈 소개
1-1. 카프카 스트림즈
카프카 스트림즈: 토픽에 적재된 데이터를 실시간으로 변환하여 다른 토픽에 적재하는 라이브러리
장애가 발생하더라도 정확히 한 번 적재할 수 있도록 장애 허용 시스템을 가지고 있어 데이터 처리 안정성이 매우 뛰어남
1-2. 프로듀서와 컨슈머를 조합하지 않고 스트림즈를 사용해야 하는 이유
스트림 데이터 처리에 있어 필요한 다양한 기능을 스트림즈DSL로 제공
프로세서 API를 사용하여 기능을 확장
단 한번의 데이터 처리, 장애 허용 시스템
단, 스트림즈가 제공하지 못하는 기능을 구현할 때는 프로듀서와 컨슈머를 조합
ex) 소스 토픽(사용하는 토픽)과 싱크 토픽(저장하는 토픽)의 카프카 클러스터가 서로 다른 경우
1-3. 스트림즈 내부 구조
스트림즈 애플리케이션은 내부적으로 스레드를 1개 이상 생성할 수 있으며, 스레드는 1개 이상의 태스크를 가짐
태스크: 스트림즈 애플리케이션을 실행하면 생기는 데이터 처리 최소 단위
병렬 처리를 위해 파티션과 스트림즈 스레드(또는 프로세스) 개수를 늘림으로써 처리량을 늘릴 수 있음
1-4. 토폴로지
토폴로지: 2개 이상의 노드들과 선으로 이루어진 집합
토폴로지를 이루는 노드: 프로세서
노드와 노드를 이은 선: 스트림
스트림은 토픽의 데이터를 뜻하는 데 프로듀서와 컨슈머에서 활용했던 레코드와 동일
1-5. 소스 프로세서, 스트림 프로세서, 싱크 프로세서
소스 프로세서: 하나 이상의 토픽에서 데이터를 가져오고 데이터를 처리하기 위해 최초로 선언해야 함
스트림 프로세서: 다른 프로세서가 반환한 데이터를 처리(변환,분기)
싱크 프로세서: 특정 카프카 토픽으로 저장하며 스트림즈로 처리된 데이터의 최종 종착지
1-6. 스트림즈DSL과 프로세서API
- 스트림즈DSL(Domain Specific Language)
스트림 프로세싱에 쓰일 만한 다양한 기능들을 자체 API로 만들어 놓았음
ex)
- 메시지 값을 기반으로 토픽 분기 처리
- 지난 10분간 들어온 데이터의 개수 집계
- 프로세서API
스트림즈DSL에서 제공하지 않는 일부 기능들은 프로세서 API를 사용하여 구현 가능
ex)
- 메시지 값의 종류에 따라 토픽을 가변적으로 전송
- 일정한 시간 간격으로 데이터 처리
2. KStream, KTable, GlobalKTable
2-1. KStream
KStream: 레코드의 흐름을 표현한 것으로 메시지 키와 메시지 값으로 구성되어 있음
KStream으로 데이터를 조회하면 토픽에 존재하는 모든 레코드가 출력
2-2. KTable
KTable: 메시지 키를 기준으로 묶어서 유니크한 메시지 키를 기준으로 가장 최신 레코드를 사용
새로 데이터를 적재할 때 동일한 메시지 키가 있을 경우 데이터가 업데이트 되었다고 볼 수 있음
2-3. 코파티셔닝(co-partitioning)
코파티셔닝: 조인을 하는 2개 데이터의 파티션 개수가 동일하고, 파티셔닝 전략을 동일하게 맞추는 작업
파티션 개수가 동일하고 파티셔닝 전략이 같은 경우, 동일한 메시지 키가 가진 데이터가 동일한 태스크에 들어가는 것을 보장
코파티셔닝이 되지 않은 토픽들을 조인하는 로직이 담긴 스트림즈 애플리케이션을 실행하면 TopologyException 발생
2-4. GlobalKTable
코파티셔닝되지 않은 KStream과 데이터 조인 가능
KTable과 다르게 GlobalKTable에 정의된 데이터는 스트림즈 애플리케이션의 모든 태스크에 동일하게 공유되어 사용
3. 스트림즈 주요 옵션 소개
3-1. 스트림즈DSL 필수 옵션
- bootstrap.servers: 프로듀서가 데이터를 전송할 대상 카프카 클러스에 속한 브로커의 호스트 이름:포트 1개 이상
- application.id: 스트림즈 애플리케이션을 구분하기 위한 고유한 아이디
3-2. 스트림즈DSL 선택 옵션
- default.key.serde: 레코드의 메시지 키를 직렬화, 역직렬화하는 클래스를 지정
- default.value.serde: 레코드의 메시지 값을 직렬화, 역직렬화하는 클래스를 지정
- num.stream.threads: 스트림 프로세싱 실행 시 실행될 스레드 개수이며 기본 값은 1
- state.dir: 상태 기반 데이터를 처리 할 때 데이터를 저장할 디렉토리를 지정하며 기본 값은 /tmp/kafka-streams
4. 스트림즈 애플리케이션 개발
4-1. 필터링 스트림즈 애플리케이션
filter() 메서드는 스트림즈DSL에서 사용 가능한 필터링 스트림 프로세서
build.gradle
plugins {
id 'java'
}
group 'com.example'
version '1.0'
sourceCompatibility = 17
targetCompatibility = 17
repositories {
mavenCentral()
}
dependencies {
implementation 'org.apache.kafka:kafka-streams:3.5.1'
implementation 'org.slf4j:slf4j-simple:1.7.30'
}
StreamsFilter.java
public class StreamsFilter {
//중략...
private static String STREAM_LOG = "stream_log";
private static String STREAM_LOG_FILTER = "stream_log_filter";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> streamLog = builder.stream(STREAM_LOG);
// KStream<String, String> filteredStream = streamLog.filter(
// (key, value) -> value.length() > 5);
// filteredStream.to(STREAM_LOG_FILTER);
streamLog.filter((key, value) -> value.length() > 5).to(STREAM_LOG_FILTER);
KafkaStreams streams;
streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
Producer
Consumer
글자 수가 5 초과인 것만 stream_log_filter 토픽에 들어온 것을 확인 할 수 있음
4-2. KStream, KTable 조인 스트림즈 애플리케이션
KTable과 KStream은 메시지 키를 기준으로 조인할 수 있음
카프카에서는 실시간으로 들어오는 데이터들을 조인할 수 있어서, 사용자의 이벤트 데이터를 데이터베이스에 저장하지 않고도 조인하여 스트리밍 처리할 수 있음
만약에 사용자의 주소가 변경이 된다면, KTable은 동일한 메시지 키가 들어올 경우 가장 최신의 키의 값이 유요한 데이터로 조인을 수행할 것임
parse.key=true를 넣어야지 메시지 키와 메시지 값을 지정할 수 있음
public class KStreamJoinKTable {
private static String APPLICATION_NAME = "order-join-application";
private static String BOOTSTRAP_SERVERS = "172.20.218.223:9092";
private static String ADDRESS_TABLE = "address";
private static String ORDER_STREAM = "order";
private static String ORDER_JOIN_STREAM = "order_join";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
//소스 프로세서 두 개 생성
KTable<String, String> addressTable = builder.table(ADDRESS_TABLE);
KStream<String, String> orderStream = builder.stream(ORDER_STREAM);
// 메시지 키가 동일할 경우 조인
orderStream.join(addressTable, (order, address) -> order + " send to " + address).to(ORDER_JOIN_STREAM);
KafkaStreams streams;
streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
address 토픽 생성
bin/kafka-topics.sh --bootstrap-server 172.20.218.223:9092 --partitions 3 --topic address --create
order 토픽 생성
bin/kafka-topics.sh --bootstrap-server 172.20.218.223:9092 --partitions 3 --topic order --create
order_join 토픽 생성
bin/kafka-topics.sh --bootstrap-server 172.20.218.223:9092 --partitions 3 --topic order_join --create
address 토픽에 프로듀스
bin/kafka-console-producer.sh --bootstrap-server 172.20.218.223:9092 --topic address --property "parse.key=true" --property "key.separator=:"
>wonyoung:Seoul
>somin:Busan
KStreamJoinKTable 파일 실행
order 토픽에 프로듀스
bin/kafka-console-producer.sh --bootstrap-server 172.20.218.223:9092 --topic order --property "parse.key=true" --property "key.separator=:"
>somin:iPhone
>wonyoung:Galaxy
order_join 토픽 컨슘
bin/kafka-console-consumer.sh --bootstrap-server 172.20.218.223:9092 --topic order_join --property print.key=true --p
roperty key.separator=":" --from-beginning
이제 KTable에 같은 키로 다른 값을 주면 어떻게 되는지 확인을 해보자
address 토픽에 프로듀스
bin/kafka-console-producer.sh --bootstrap-server 172.20.218.223:9092 --topic address --property "parse.key=true" --property "key.separator=:"
>wonyoung:Jeju
order 토픽에 프로듀스
bin/kafka-console-producer.sh --bootstrap-server 172.20.218.223:9092 --topic order --property "parse.key=true" --property "key.separator=:"
>wonyoung:Tesla
order_join 토픽 컨슘
bin/kafka-console-consumer.sh --bootstrap-server 172.20.218.223:9092 --topic order_join --property print.key=true --property key.separator=":" --from-beginning
KTable에서 최신 레코드의 메시지 값으로 조인을 수행했음을 확인할 수 있음
4-3. KStream, GlobalKTable 조인 스트림즈 애플리케이션
코파티셔닝이 되어있지 않은 토픽을 조인하는 방법임
public class KStreamJoinGlobalKTable {
private static String APPLICATION_NAME = "global-table-join-application";
private static String BOOTSTRAP_SERVERS = "172.20.218.223:9092";
private static String ADDRESS_GLOBAL_TABLE = "address_v2";
private static String ORDER_STREAM = "order";
private static String ORDER_JOIN_STREAM = "order_join";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
GlobalKTable<String, String> addressGlobalTable = builder.globalTable(ADDRESS_GLOBAL_TABLE);
KStream<String, String> orderStream = builder.stream(ORDER_STREAM);
orderStream.join(addressGlobalTable,
(orderKey, orderValue) -> orderKey,
(order, address) -> order + " send to " + address)
.to(ORDER_JOIN_STREAM);
KafkaStreams streams;
streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
파티션 개수가 2인 새로운 주소 토픽 생성
bin/kafka-topics.sh --bootstrap-server 172.20.218.223:9092 --partitions 2 --topic address_v2 --create
address_v2 토픽(파티션 2개)에 프로듀스
bin/kafka-console-producer.sh --bootstrap-server 172.20.218.223:9092 --topic address_v2 --property "parse.key=true" --property "key.separator=:"
>wonyoung:Busan
KStreamJoinGlobalKTable 파일 실행
order 토픽(파티션 3개)에 프로듀스
bin/kafka-console-producer.sh --bootstrap-server 172.20.218.223:9092 --topic order --property "parse.key=true" --proper
ty "key.separator=:"
>wonyoung:Tesla
order_join 토픽 컨슘
bin/kafka-console-consumer.sh --bootstrap-server 172.20.218.223:9092 --topic order_join --property print.key=true --property key.separator=":" --from-beginning
파티션 개수가 다르더라도 조인이 수행됨을 확인할 수 있음
'Kafka' 카테고리의 다른 글
[Apache Kafka] 9. 카프카 커넥트(1) - 카프카 커넥트 (5) | 2024.11.01 |
---|---|
[Apache Kafka] 8. 카프카 스트림즈(2) - 윈도우 프로세싱 (0) | 2024.10.30 |
[Apache Kafka] 7. 멱등성 프로듀서, 트랜잭션 프로듀서와 컨슈머 (0) | 2024.10.28 |
[Apache Kafka] 6. 카프카 컨슈머 애플리케이션 개발(2) (0) | 2024.10.26 |
[Apache Kafka] 6. 카프카 컨슈머 애플리케이션 개발(1) (3) | 2024.10.25 |