본 포스팅은 인프런 데브원영님의 [아파치 카프카 애플리케이션 프로그래밍]의 강의를 수강 후 정리하는 글입니다.
1. 카프카 컨슈머 소개
1-1. 컨슈머
컨슈머는 브로커로부터, 프로듀서가 브로커로 전송한 데이터를 가져와서 필요한 처리를 함
1-2. 컨슈머 내부 구조
Fetcher: 리더 파티션으로부터 레코드들을 미리 가져와서 대기
poll(): Fetcher에 있는 레코드들을 리턴
ConsumerRecords: 처리하고자 하는 레코드들의 모음(오프셋 포함)
2. 컨슈머 그룹
2-1. 컨슈머 그룹의 소개
컨슈머 그룹: 특정 토픽에 대해서 어떤 목적에 따라 데이터를 처리하는 컨슈머를 묶은 그룹
기본적으로 한 컨슈머 그룹 내 컨슈머들은 동일한 로직을 가지고 있음
컨슈머 그룹의 컨슈머 개수는 가져가고자 하는 토픽의 파티션 개수보다 같거다 작아야 함
컨슈머 그룹의 컨슈머가 파티션 개수보다 많을 경우
컨슈머가 파티션의 개수보다 많을 경우, 1개의 컨슈머는 파티션을 할당받지 못하고 유휴(idle) 상태가 됨
스레드만 차지하고 실질적인 데이터 처리를 못하므로, 불필요한 스레드가 됨
2-2. 컨슈머 그룹을 활용하는 이유
서로 다른 목적을 가진 컨슈머 그룹을 생성하여, 최종 적재되는 저장소의 장애에 유연하게 대응 가능
또한 각 그룹 별로 리소스 관리를 할 수 있음
3. 리밸런싱
컨슈머가 추가되거나, 컨슈머가 제외하는 상황에서 리밸런싱 필요
리밸런싱이 자주 일어나지 않도록 운영하는 것이 좋음
4. 커밋
컨슈머는 카프카 브로커로부터 데이터를 어디까지 가져갔는지 커밋을 통해 기록
특정 토픅의 파티션을 어떤 컨슈머 그룹이 몇 번째 가져갔는지 내부 토픽(__consumer_offsets)에 기록
5. Assignor
컨슈머의 어싸이너를 통해 컨슈머와 파티션 할당 정책이 결정됨
- RangeAssignor: 각 토픽에서 파티션을 숫자로 정렬, 컨슈머를 사전 순서로 정렬하여 할당
- RoundRobinAssignor: 모든 파티션을 컨슈머에서 번갈아가면서 할당
- StickyAssignor: 최대한 파티션을 균등하게 배분하며 할당
Kafka 2.5.0 은 RangeAssignor가 기본값으로 설정
Kafka 3.5.1 은 RangeAssignor가 기본값으로 CooperativeStickyAssignor로 업그레이드 가능
6. 컨슈머 주요 옵션 소개
6-1. 컨슈머 필수 옵션
- bootstrap.servers: 프로듀서가 데이터를 전송할 대상 카프카 클러스에 속한 브로커의 호스트 이름:포트 1개 이상
- key.deserializer: 레코드의 메시지 키를 역직렬화하는 클래스를 지정
- value.deserializer: 레코드의 메시지 값을 역직렬화하는 클래스를 지정
6-2. 컨슈머 선택 옵션
- group.id: 컨슈머 그룹 아이디를 지정. subscribe() 메서드로 토픽을 구독하여 사용할 때는 필수이며 기본값은 null
- auto.offset.reset: 컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우, 어느 오프셋부터 읽을지 선택하는 옵션이며 기본값은 latest. 이미 컨슈머 오프셋이 있다면 이 옵션값은 무시
- latest: 설정하면 가장 높은 오프셋부터 읽기 시작
- earliest: 설정하면 가장 낮은 오프셋부터 읽기 시작
- none: 컨슈머 그룹이 커밋한 기록이 있는지 찾아보고, 있으면 기존 커밋 기록 이후 오프셋, 없다면 오류를 반환
- enable.auto.commit: 자동 커밋, 수동 커밋 여부이며 기본값은 true
- auto.commit.interval.ms: 자동 커밋일 경우 오프셋 커밋 간격을 지정하며 기본값은 5000(5초)
- max.poll.records: poll() 메서드를 통해 반환되는 레코드 개수를 지정하며 기본값은 500
- session.timeout.ms: 컨슈머가 브로커와 연결이 끊기는 최대 시간이며 기본값은 10000(10초)
- heartbeat.interval.ms: 하트비트를 전송하는 시간 간격이며 기본값은 3000(3초)
- max.poll.interval.ms: poll() 메서드를 호출하는 간격의 최대 시간이며 기본값은 300000(5분)
- isolation.level: 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용
7. 컨슈머 애플리케이션 개발하기
7-1. 자동 커밋
package com.example;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerWithAutoCommit {
private final static Logger logger = LoggerFactory.getLogger(ConsumerWithAutoCommit.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "172.20.218.223:9092";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// subscribe를 직접하기 때문에 GROUP_ID 필수
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// ENABLE_AUTO_COMMIT_CONFIG의 기본값은 true지만 보여주기 위해 넣음
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("record:{}", record);
}
}
}
}
실행시킨 결과는 다음과 같다
7-2. 수동 커밋(동기 오프셋 커밋 컨슈머)
commitSync()는 poll() 메서드로 받은 가장 마지막 레코드의 오프셋을 기준으로 커밋
동기 오프셋 커밋을 사용할 경우, 커밋 응답을 기다리는 동안 데이터 처리가 일시적으로 중단
public class ConsumerWithSyncCommit {
private final static Logger logger = LoggerFactory.getLogger(ConsumerWithSyncCommit.class);
// 중략...
public static void main(String[] args) {
// 중략...
// subscribe()를 이용하는 경우 GROUP_ID 필요
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// ENABLE_AUTO_COMMIT_CONFIG 값을 false로 설정하면 수동 커밋
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("record:{}", record);
}
// 수동 커밋
consumer.commitSync();
}
}
}
7-3. 수동 커밋(비동기 오프셋 커밋 컨슈머)
더 많은 데이터를 처리하기 위해서 commitAsync() 호출
비동기 오프셋 커밋 콜백
커밋이 완료되었는지에 대해서 콜백으로 로그를 남길 수 있음
public class ConsumerWithASyncCommit {
private final static Logger logger = LoggerFactory.getLogger(ConsumerWithASyncCommit.class);
// 중략...
public static void main(String[] args) {
// 중략...
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("record:{}", record);
}
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null)
System.err.println("Commit failed");
else
System.out.println("Commit succeeded");
if (e != null)
logger.error("Commit failed for offsets {}", offsets, e);
}
});
}
}
}
7-4. 리밸런스 리스너를 가진 컨슈머
ConsumerRebalanceListener 인터페이스로 구현된 클래스는 onPartitionAssigned() 메서드와 onPartitionRevoked() 메서드로 이루어짐
- onPartitionAssigned(): 리밸런스가 끝난 뒤에 파티션이 할당 완료되면 호출되는 메서드
- onPartitionRevoked(): 리밸런스가 시작되기 직전에 호출되는 메서드. 마지막으로 처리한 레코드를 기준으로 커밋을 하기 위해서는 리밸런스가 시작하기 직전에 커밋을 하면 되므로 onPartitionRevoked() 메서드에 커밋을 구현하여 처리
public class RebalanceListener implements ConsumerRebalanceListener {
private final static Logger logger = LoggerFactory.getLogger(RebalanceListener.class);
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
logger.warn("Partitions are assigned : " + partitions.toString());
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
logger.warn("Partitions are revoked : " + partitions.toString());
}
}
public class ConsumerWithRebalanceListener {
private final static Logger logger = LoggerFactory.getLogger(ConsumerWithRebalanceListener.class);
// 중략...
private static KafkaConsumer<String, String> consumer;
public static void main(String[] args) {
// 중략...
consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME), new RebalanceListener());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("{}", record);
}
}
}
}
다중 인스턴스 허용으로 인스턴스를 하나 더 띄우면 리밸런싱이 일어난다.
첫번째 인스턴스를 중지하면 두번째 인스턴스에서 리밸런싱이 다음과 같이 일어남을 확인할 수 있음
[main] WARN com.example.RebalanceListener - Partitions are assigned : [test-5, test-6, test-7, test-8, test-9]
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Setting offset for partition test-5 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[172.20.218.223:9092 (id: 0 rack: null)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Setting offset for partition test-7 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[172.20.218.223:9092 (id: 0 rack: null)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Setting offset for partition test-6 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[172.20.218.223:9092 (id: 0 rack: null)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Setting offset for partition test-9 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[172.20.218.223:9092 (id: 0 rack: null)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Setting offset for partition test-8 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[172.20.218.223:9092 (id: 0 rack: null)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Request joining group due to: group is already rebalancing
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Revoke previously assigned partitions test-5, test-6, test-7, test-8, test-9
[main] WARN com.example.RebalanceListener - Partitions are revoked : [test-5, test-6, test-7, test-8, test-9]
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Successfully joined group with generation Generation{generationId=7, memberId='consumer-test-group-1-fe5a8367-6d36-499b-bed9-03af702bddfe', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Finished assignment for group at generation 7: {consumer-test-group-1-fe5a8367-6d36-499b-bed9-03af702bddfe=Assignment(partitions=[test-0, test-1, test-2, test-3, test-4, test-5, test-6, test-7, test-8, test-9])}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Successfully synced group in generation Generation{generationId=7, memberId='consumer-test-group-1-fe5a8367-6d36-499b-bed9-03af702bddfe', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Notifying assignor about the new Assignment(partitions=[test-0, test-1, test-2, test-3, test-4, test-5, test-6, test-7, test-8, test-9])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Adding newly assigned partitions: test-0, test-1, test-2, test-3, test-4, test-5, test-6, test-7, test-8, test-9
[main] WARN com.example.RebalanceListener - Partitions are assigned : [test-0, test-1, test-2, test-3, test-4, test-5, test-6, test-7, test-8, test-9]
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Setting offset for partition test-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[172.20.218.223:9092 (id: 0 rack: null)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Setting offset for partition test-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[172.20.218.223:9092 (id: 0 rack: null)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Setting offset for partition test-3 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[172.20.218.223:9092 (id: 0 rack: null)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Setting offset for partition test-2 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[172.20.218.223:9092 (id: 0 rack: null)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Setting offset for partition test-5 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[172.20.218.223:9092 (id: 0 rack: null)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Setting offset for partition test-4 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[172.20.218.223:9092 (id: 0 rack: null)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Setting offset for partition test-7 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[172.20.218.223:9092 (id: 0 rack: null)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Setting offset for partition test-6 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[172.20.218.223:9092 (id: 0 rack: null)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Setting offset for partition test-9 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[172.20.218.223:9092 (id: 0 rack: null)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-1, groupId=test-group] Setting offset for partition test-8 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[172.20.218.223:9092 (id: 0 rack: null)], epoch=0}}
7-5. 파티션 할당 컨슈머
public class ConsumerWithExactPartition {
// 중략...
private final static int PARTITION_NUMBER = 0;
public static void main(String[] args) {
// 중략...
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER)));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("record:{}", record);
}
}
}
}
8. 컨슈머의 안전한 종료
정상적으로 종료되지 않은 컨슈머는 세션 타임아웃이 발생할때까지 컨슈머 그룹에 남음
wakeup() 메서드를 통해 KafkaConsumer 인스턴스를 안전하게 종료할 수 있음
public class ConsumerWithSyncOffsetCommit {
// 중략...
public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(new ShutdownThread());
// ... 중략
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("{}", record);
}
consumer.commitSync();
}
} catch (WakeupException e) {
logger.warn("Wakeup consumer");
} finally {
logger.warn("Consumer close");
consumer.close();
}
}
static class ShutdownThread extends Thread {
public void run() {
logger.info("Shutdown hook");
consumer.wakeup();
}
}
}
Mac에서는 다음과 같은 명령어로 pid 조회
ps -ef | grep ConsumerWithSyncOffsetCommit
Windows에서는 다음과 같은 명령어로 pid조회(인텔리제이 pid 같음)
tasklist /v | findstr ConsumerWithSyncOffset
조회한 pid로 taskkill
Mac에서는 다음과 같은 명령어로
kill -term [조회한pid]
Windows에선 다음과 같은 명령어로 하면 꺼지기는 꺼지는데 인텔리제이 pid라 인텔리제이가 꺼진다
인텔리제이를 끄지않고 구동중인 애플리케이션만 종료하고 싶은데 아시는 분은 댓글로 부탁드립니다!
taskkill /PID 27528
'Kafka' 카테고리의 다른 글
[Apache Kafka] 7. 멱등성 프로듀서, 트랜잭션 프로듀서와 컨슈머 (0) | 2024.10.28 |
---|---|
[Apache Kafka] 6. 카프카 컨슈머 애플리케이션 개발(2) (0) | 2024.10.26 |
[Apache Kafka] 5. 카프카 프로듀서 애플리케이션 개발 (0) | 2024.10.23 |
[Apache Kafka] 4. 아파치 카프카 CLI 활용(4) - CLI 추가 참고사항 (0) | 2024.10.11 |
[Apache Kafka] 4. 아파치 카프카 CLI 활용(3) - 컨슈머, 컨슈머 그룹, 성능테스트, 로그덤프 (1) | 2024.10.11 |