본 포스팅은 인프런 데브원영님의 [아파치 카프카 애플리케이션 프로그래밍]의 강의를 수강 후 정리하는 글입니다.
[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
데브원영 DVWY | 실전 환경에서 사용하는 아파치 카프카 애플리케이션 프로그래밍 지식들을 모았습니다! 데이터 파이프라인을 구축하는데 핵심이 되는 아파치 카프카의 각종 기능들을 살펴보고
www.inflearn.com
1. kafka-console-consumer.sh
1-1. 토픽의 데이터 조회(메시지 value만)
bin/kafka-console-consumer.sh \
--bootstrap-server my-kafka:9092 \
--topic hello.kafka --from-beginning
필수: 카프카 클러스터 정보, 토픽 이름
옵션: 필터링
--from-beginning: 토픽에 저장된 가장 처음 데이터부터 출력
1-2. 토픽의 데이터 조회(메시지 key & value)
레코드의 메시지 키와 메시지 값을 확인하고 싶다면 --property 옵션 사용
bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic hello.kafka \
--property print.key=true \
--property key.separator="-" \
--from-beginning
1-3. 최대 컨슘 메시지 개수 설정
bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic hello.kafka --from-beginning --max-messages 1
1-4. 특정 파티션만 컨슘
bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic hello.kafka \
--partition 0 \
--from-beginning
1-5. 컨슈머 그룹을 기반으로 동작
특정 목적을 가진 컨슈머들을 묶음으로 사용하여, 토픽의 레코드를 가져갈 경우 어느 레코드까지 읽었는지에 대한 데이터카 카프카 브로커에 저장
bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic hello.kafka \
--group hello-group \
--from-beginning
컨슈머 그룹 이름 지정하면 __consumer_offsets 토픽 생성됨
__consumer_offsets: 컨슈머 그룹을 사용하면 어느 데이터까지 읽었는지의 데이터를 여기에 저장
bin/kafka-topics.sh --bootstrap-server \
my-kafka:9092 \
--list
2. kafka-consumer-groups.sh
컨슈머 그룹은 따로 생성하는 명령이 없고, 컨슈머를 동작할 때 컨슈머 그룹 이름을 지정하면 새로 생성됨
2-1. 컨슈머 그룹 목록 확인
bin/kafka-consumer-groups.sh \
--bootstrap-server my-kafka:9092 \
--list
2-2. 컨슈머 그룹 상세 확인
bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 \
--group hello-group --describe
어떤 토픽을 대상으로 어떤 오프셋의 레코드를 가져갔는지 상태를 확인할 수 있음
- 파티션 번호
- 현재까지 가져간 레코드의 오프셋
- 파티션 마지막 레코드의 오프셋
- 컨슈머 랙(파티션의 마지막 레코드의 오프셋 - 현재까지 가져간 레코드의 오프셋)
- 컨슈머 ID
- 호스트
2-3. 오프셋 리셋
bin/kafka-consumer-groups.sh \
--bootstrap-server my-kafka:9092 \
--group hello-group \
--topic hello.kafka \
--reset-offsets --to-earliest --execute
- --to-earliest: 가장 처음 오프셋(작은 번호)로 리셋
- --to-latest: 가장 마지막 오프셋(큰 번호)로 리셋
- --to-current: 현 시점 기준 오프셋으로 리셋
- --to-datetime {YYYY-MM-DDTHH:mmSS.sss}: 특정 일시로 오프셋 리셋(레코드 타임스탬프 기준)
- --to-offset {long}: 특정 오프셋으로 리셋
- --shift-by {+/- long}: 현재 컨슈머 오프셋에서 앞뒤로 옮겨서 리셋
3. 그 외 커맨드 라인 툴
3-1. kafka-producer-perf-test.sh
카프카 프로듀서로 퍼포먼스를 측정할 때 사용
bin/kafka-producer-perf-test.sh \
--producer-props bootstrap.servers=my-kafka:9092 \
--topic hello.kafka \
--num-records 10 \
--throughput 1 \
--record-size 100 \
--print-metric
3-2. kafka-consumer-perf-test.sh
카프카 컨슈머의 퍼포먼스를 측정할 때 사용
카프카 브로커와 컨슈머(해당 스크립트를 돌리는 호스트)간의 네트워크를 체크
bin/kafka-consumer-perf-test.sh \
--bootstrap-server my-kafka:9092 \
--topic hello.kafka \
--messages 10 \
--show-detailed-stats
3-3. kafka-reassign-partitions.sh
리더 파티션에 파티션이 몰릴 경우 재분배
리더 파티션과 팔로워 파티션의 위치가 변경될 수 있음
auto.leader.rebalance.enable: 클러스터 단위에서 리더를 자동으로 리밸런싱(디폴트: true)
브로커의 백그라운드 스레드가 일정한 간격으로 리더의 위치를 파악하고 필요시 리더 리밸런싱
3-4. kafka-delete-record.sh
레코드 각각을 삭제하는 것이 아님
오프셋까지의 레코드를 지우는 것
3-5. kafka-dump-log.sh
특정 파일에 대해 상세 로그를 확인할 수 있음
bin/kafka-dump-log.sh \
--files data/hello.kafka-0/00000000000000000000.log \
--deep-iteration
'Kafka' 카테고리의 다른 글
[Apache Kafka] 5. 카프카 프로듀서 애플리케이션 개발 (0) | 2024.10.23 |
---|---|
[Apache Kafka] 4. 아파치 카프카 CLI 활용(4) - CLI 추가 참고사항 (0) | 2024.10.11 |
[Apache Kafka] 4. 아파치 카프카 CLI 활용(2) - 토픽, 프로듀서 (2) | 2024.10.10 |
[Apache Kafka] 4. 아파치 카프카 CLI 활용(1) - 브로커, 주키퍼 생성 (3) | 2024.10.10 |
[Apache Kafka] 3. 카프카 클러스터 운영 (0) | 2024.10.09 |