본 포스팅은 인프런 데브원영님의 [아파치 카프카 애플리케이션 프로그래밍]의 강의를 수강 후 정리하는 글입니다.
1. 스트림즈DSL의 윈도우 프로세싱
1-1. 윈도우 프로세싱 소개
윈도우 연산: 특정 시간에 대응하여 취합 연산을 처리할 때 활용
카프카 스트림즈에서 제공하는 윈도우 프로세싱은 4가지를 지원함
- 텀블링 윈도우
- 호핑 윈도우
- 슬라이딩 윈도우
- 세션 윈도우
1-2. 텀블링 윈도우(Tumbling Window)
텀블링 윈도우: 서로 겹치지 않은 윈도우를 특정 간격으로 지속적으로 처리할 때 사용
윈도우 최대 사이즈에 도달하면 해당 시점에 데이터를 취합하여 결과를 도출
ex) 매 5분간 접속한 고객의 수를 측정하여 방문자 추이를 실시간 취합
1-3. 호핑 윈도우(Hopping Window)
호핑 윈도우: 일정 시간 간격으로 겹치는 윈도우가 존재하는 윈도우 연산을 처리
윈도우 사이즈: 연산을 수행할 최대 윈도우 사이즈
윈도우 간격: 서로 다른 윈도우 간 간격
1-4. 슬라이딩 윈도우
슬라이딩 윈도우: 데이터의 정확한 시간을 바탕으로 윈도우 사이즈에 포함되는 데이터를 모두 연산에 포함
1-5. 세션 윈도우
세션 윈도우: 동일 메시지 키의 데이터를 한 세션에 묶어 연산
세션의 최대 만료시간에 따라 윈도우 사이즈가 달라짐. 세션 만료 시간이 자니면 세션 윈도우가 종료되고 해당 윈도우의 모든 데이터를 취합하여 연산함.
세션 윈도우의 윈도우 사이즈는 가변적
1-6. 윈도우 연산시 주의해야할 사항
카프카 스트림즈는 커밋을 수행할 때 윈도우 사이즈가 종료되지 않아도 중간 정산 데이터를 출력함
커밋 시점마다 윈도우의 연산 데이터를 출력하기 때문에 동일 윈도우 사이즈의 데이터는 2개 이상 출력될 수 있음
=> 최종적으로 각 윈도우에 맞는 데이터를 출력하고 싶다면 Window를 기준으로 동일 윈도우 시간 데이터는 겹쳐쓰기(upsert)하는 방식으로 처리
2. 스트림즈DSL의 Queryable store
카프카 스트림즈에서 KTable은 카프카 토픽의 데이터를 로컬의 rocksDB에 Materialized View로 만들어두고 사용하여 레코드의 메시지 키, 메시지 값을 기반으로 keyValueStore로 사용할 수 있음
특정 토픽을 KTable로 사용하고 ReadOnlyKeyValueStore로 뷰를 가져오면 토픽 데이터를 조회할 수 있음
3. 프로세서API
스트림즈DSL에서 사용했던 KStream, KTable, GlobalKTable 개념이 없음. 하지만 스트림즈DSL과 프로세서API는 함께 구현하여 사용할 때는 활용 가능
프로세서 API를 구현하기 위해서는 Processor 또는 Transformer 인터페이스로 구현한 클래스가 필요
- Processor 인터페이스: 일정 로직이 이루어진 뒤 다음 프로세서로 데이터가 넘어가지 않을 때 사용
- Transformer 인터페이스: 일정 로직이 이루어진 뒤 다음 프로세서로 데이터를 넘길 때 사용
FilterProcessor
public class FilterProcessor implements Processor<String,String> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {this.context = context;}
@Override
public void process(String key, String value) {
if(value.length() > 5) {
context.forward(key,value);
}
context.commit();
}
@Override
public void close() {
}
}
SimpleKafkaProcessor
public class SimpleKafkaProcessor {
private static String APPLICATION_NAME = "processor-application";
private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
private static String STREAM_LOG = "stream_log";
private static String STREAM_LOG_FILTER = "stream_log_filter";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
Topology topology = new Topology();
topology.addSource("Source",
STREAM_LOG)
.addProcessor("Process",
() -> new FilterProcessor(),
"Source")
.addSink("Sink",
STREAM_LOG_FILTER,
"Process");
KafkaStreams streaming = new KafkaStreams(topology, props);
streaming.start();
}
}
//https://kafka.apache.org/documentation/streams/developer-guide/processor-api.html
4. 카프카 스트림즈 vs 스파크 스트리밍
'Kafka' 카테고리의 다른 글
[Apache Kafka] 9. 카프카 커넥트(2) - 커스텀 소스 커넥터 (1) | 2024.11.02 |
---|---|
[Apache Kafka] 9. 카프카 커넥트(1) - 카프카 커넥트 (5) | 2024.11.01 |
[Apache Kafka] 8. 카프카 스트림즈(1) - KStream,KTable,GlobalKTable (0) | 2024.10.29 |
[Apache Kafka] 7. 멱등성 프로듀서, 트랜잭션 프로듀서와 컨슈머 (0) | 2024.10.28 |
[Apache Kafka] 6. 카프카 컨슈머 애플리케이션 개발(2) (0) | 2024.10.26 |