본 포스팅은 인프런 데브원영님의 [아파치 카프카 애플리케이션 프로그래밍]의 강의를 수강 후 정리하는 글입니다.
[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
데브원영 DVWY | 실전 환경에서 사용하는 아파치 카프카 애플리케이션 프로그래밍 지식들을 모았습니다! 데이터 파이프라인을 구축하는데 핵심이 되는 아파치 카프카의 각종 기능들을 살펴보고
www.inflearn.com
1. 카프카 프로듀서 소개
1-1. 프로듀서
프로듀서는 카프카에서 데이터의 시작점
프로듀서는 데이터를 전송할 때 리더 파티션을 가지고 있는 카프카 브로커와 직접 통신
브로커로 데이터를 전송할 때 내부적으로 파티셔너, 배치 생성 단계를 거침
1-2. 프로듀서 내부 구조
ProducerRecord: 프로듀서에서 생성하는 레코드(오프셋 미포함)
send(): 레코드를 전송 요청 메서드
Partitioner: 어느 파티션으로 전송할지 지정하는 파티셔너
Accumulator: 배치로 묶어 전송할 데이터를 모으는 버퍼
2. 파티셔너(Partitioner)
2-1. 프로듀서의 기본 파티셔너
프로듀서API를 사용하면 2개의 파티셔너를 제공
UniformStickyPartitioner
RoundRobinPartitioner
2.5.0버전에서는 파티셔너를 지정하지 않은 경우 UniformStickyPartitioner가 파티셔너로 기본 설정 됨
메시지 키가 있을 경우
UniformStickyPartitioner와 RoundRobinPartitioner 둘 다 메시지 키의 해시값과 파티션을 매칭하여 레코드 전송
=> 동일한 메시지 키가 존재하는 레코드는 동일한 파티션 번호에 저장이 됨
만약 파티션 개수가 변경이 될 경우 메시지 키와 파티션 번호 매칭은 깨지게 됨
메시지 키가 없을 경우
파티션에 최대한 동일하게 분배하는 로직이 들어있음
UniformStickyPartioner는 RoundRobinPartitioner의 단점을 개선
- RoundRobinPartitioner
프로듀서 레코드가 들어오는 대로 파티션을 순회하면서 전송
Accumulator에서 묶이는 정도가 적기 때문에 전송 성능이 낮음
- UniformStickyPartioner
Accumulator에서 레코드들이 배치로 묶일 때까지 기다렸다가 전송
배치로 묶을 뿐 결국 파티션을 순회하면서 보내기 때문에 모든 파티션에 분배됨
2-2. 프로듀서의 커스텀 파티셔너
카프카 클라이언트 라이브러리에서는 사용자 지정 파티셔너 생성을 위한 Partitioner Interface를 제공
메시지 키 또는 메시지 값에 따른 파티션 지정 로직을 적용 가능
3. 프로듀서의 주요 옵션 소개
3-1. 프로듀서 필수 옵션
bootstrap.servers: 프로듀서가 데이터를 전송할 대상 카프카 클러스에 속한 브로커의 호스트 이름:포트 1개 이상
key.serializer: 레코드의 메시지 키를 직렬화하는 클래스를 지정
value.serializer: 레코드의 메시지 값을 직렬화하는 클래스를 지정
String으로 직렬화하는 경우 kafka-console-consumer로 디버깅이 가능하지만, 다른 포맷인 경우 확인 불가
3-2. 프로듀서 선택 옵션
acks: 프로듀서가 전송한 데이터가 브로커들에 정상적으로 저장되었는지 전송 성공 여부를 확인하는 데에 사용하는 옵션
1, 0, -1(all) 세가지 옵션이 있으며 기본 값은 1임
linger.ms: 배치를 전송하기 전까지 기다리는 최소 시간이며 기본 값은 0
retries: 브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수이며 기본 값은 2147483647
max.in.flight.requests.per.connection: 한 번에 요청하는 최대 커넥션 개수. 설정된 값만큼 동시에 전달 요청을 수행하며 기본 값은 5
partitioner.class: 레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스를 지정함
enable.idempotence: 멱등성 프로듀서로 동작할지 여부를 설정, 기본값은 false(version 2.5.0이하)
transactional.id: 프로듀서가 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶을지 여부를 설정
4. ISR(In-Sync-Replicas)와 acks 옵션
4-1. ISR(In-Sync-Replicas)
ISR은 리더 파티션과 팔로워 파티션이 모두 싱크가 된 상태
ISR이라는 용어가 나온 이유는 팔로워가 리더로부터 데이터를 복제하는데 시간이 걸리기 때문임
리더 파티션과 팔로워 파티션 간에 오프셋 차이가 발생함
4-2. acks(프로듀서 옵션)
카프카 프로듀서의 acks옵션은 0, 1, -1(all) 값을 가질 수 있음
acks 옵션에 따라 신뢰도와 성능의 tradeoff
- acks = 0
프로듀서가 리더 파티션으로 데이터를 전송했을 때 리더 파티션으로 데이터가 저장되었는지 확인하지 않음
데이터가 일부 유실이 발생하더라도 전송 속도가 중요한 경우에 이 옵션값을 사용
- acks = 1
기본값
프로듀서는 보낸 데이터가 리더 파티션에만 정상적으로 적재되었는지 확인
정상적으로 적재되지 않았다면 리더 파티션에 적재될 때까지 재시도할 수 있음
하지만 데이터 유실 가능
리더 파티션에 적재가 완료되어도 팔로워 파티션에는 아직 데이터가 동기화되지 않을 수 있는데, 팔로워 파티션이 복제하기 이전에 리더 파티션이 있는 브로커에서 장애가 발생하면 데이터 유실 가능
- acks = -1(all)
프로듀서는 보낸 데이터가 리더 파티션과 팔로워 파티션에 모두 정상적으로 적재되었는지 확인
팔로우 파티션에 데이터가 정상 적재되었는지 기다리기 때문에 일부 브로커에 장애가 발생하더라도 프로듀서는 안전하게 데이터를 전송하고 저장할 수 있음
acks를 all로 설정한 경우에, 토픽 단위로 설정 가능한 min.insync.replicas 옵션값에 따라 데이터의 안정성이 달라짐
min.insync.replicas의 값이 2이상일 때부터 의미가 있음
4-3. min.insync.replicas(토픽 옵션)
min.insync.replicas 는 프로듀서가 리더 파티션과 팔로워 파티션에 데이터가 적재되었는지 확인하기 위한 최소 ISR 그룹의 파티션 개수
min.insync.replicas의 값을 1로 한다면, ISR 중 최소 1개 이상의 파티션에 데이터가 적재되었음을 확인
acks = 1과 동일한 효과(ISR 중 가장 처음 적재가 완료되는 파티션은 리더 파티션이기 때문)
5. 프로듀서 애플리케이션 개발하기
메시지 키와 값을 확인할 수 있는 콘솔을 먼저 터미널에 띄워두기
bin/kafka-console-consumer.sh --bootstrap-server 172.23.8.51:9092 \
--topic test \
--property print.key=true \
--property key.separator="-" \
--from-beginning
5-1.메시지 값만 가진 레코드를 전송하는 프로듀서
강의와 다르게 Java17을 이용하였음.
build.gradle
plugins {
id 'java'
}
group 'com.example'
version '1.0'
// 강의와 다르게 Java 17이용
sourceCompatibility = 17
targetCompatibility = 17
repositories {
mavenCentral()
}
dependencies {
// 강의와 다르게 카프카 3.5.1을 사용하였음
implementation 'org.apache.kafka:kafka-clients:3.5.1'
implementation 'org.slf4j:slf4j-simple:1.7.30'
}
SimpleProducer.java
package com.example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class SimpleProducer {
private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
private final static String TOPIC_NAME = "test";
// 필자는 WSL 환경에서 카프카 브로커를 실행시켜서 localhost 대신 WSL의 IP 호스트를 입력하였음
private final static String BOOTSTRAP_SERVERS = "172.23.8.51:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
String messageValue = "testMessage";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
producer.send(record);
logger.info("{}", record);
producer.flush();
producer.close();
}
}
5-2. 메시지 키를 가진 레코드를 전송하는 프로듀서
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Pangyo", "Pangyo");
producer.send(record);
5-3. 레코드에 파티션 번호를 지정하여 전송하는 프로듀서
파티션을 직접 지정하고 싶다면 토픽 이름, 파티션 번호, 메시지 키, 메시지 값 순서대로
파티션 번호는 토픽에 존재하는 파티션 번호로 지정해야 함
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
int partitionNo = 0;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, partitionNo, "Pangyo", "Pangyo");
producer.send(record);
5-4. 커스텀 파티셔너를 가지는 프로듀서
특정 데이터를 가지는 레코드를 특정 파티션으로 보내야하는 경우, Partitioner 인터페이스를 사용하여 사용자 정의 파티셔너를 생성
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Pangyo", "Pangyo");
producer.send(record);
CustomPartitioner.java
package com.example;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
Cluster cluster) {
if (keyBytes == null) {
throw new InvalidRecordException("Need message key");
}
if (((String)key).equals("Pangyo"))
return 0;
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override
public void configure(Map<String, ?> configs) {}
@Override
public void close() {}
}
6. 레코드 전송 결과를 확인하는 프로듀서 애플리케이션
KafkaProducer의 send() 메서드는 Future 객체를 반환
get() 메서드를 사용하면, RecordMetadata의 비동기 결과를 표현하여 어떤 토픽의 몇 번 파티션에 몇 번 오프셋으로 저장되었는지 동기적으로 정보를 가져올 수 있음
ex) test-0@3
토픽: test
파티션: 0
오프셋: 3
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Pangyo", "Pangyo");
try {
RecordMetadata metadata = producer.send(record).get();
logger.info(metadata.toString());
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
단 acks를 0으로 지정했을 시 metadata에서 오프셋은 -1로 나옴
즉, 응답을 받지 않았기 때문에, 오프셋은 알 수가 없다.
7. 프로듀서의 안전한 종료
프로듀서를 안전하게 종료를 하기 위해서는 close() 메서드를 사용
producer.close();
'Kafka' 카테고리의 다른 글
[Apache Kafka] 6. 카프카 컨슈머 애플리케이션 개발(2) (0) | 2024.10.26 |
---|---|
[Apache Kafka] 6. 카프카 컨슈머 애플리케이션 개발(1) (3) | 2024.10.25 |
[Apache Kafka] 4. 아파치 카프카 CLI 활용(4) - CLI 추가 참고사항 (0) | 2024.10.11 |
[Apache Kafka] 4. 아파치 카프카 CLI 활용(3) - 컨슈머, 컨슈머 그룹, 성능테스트, 로그덤프 (1) | 2024.10.11 |
[Apache Kafka] 4. 아파치 카프카 CLI 활용(2) - 토픽, 프로듀서 (2) | 2024.10.10 |