본 포스팅은 인프런 데브원영님의 [아파치 카프카 애플리케이션 프로그래밍]의 강의를 수강 후 정리하는 글입니다.
[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
데브원영 DVWY | 실전 환경에서 사용하는 아파치 카프카 애플리케이션 프로그래밍 지식들을 모았습니다! 데이터 파이프라인을 구축하는데 핵심이 되는 아파치 카프카의 각종 기능들을 살펴보고
www.inflearn.com
1. 커스텀 싱크 커넥터
1-1. 커스텀 싱크 커넥터
카프카 커넥트 라이브러리에서 제공하는 SinkConnector와 SinkTask 클래스를 사용하면 직접 싱크 커넥터를 구현 가능
TestSinkConnector
public class TestSinkConnector extends SinkConnector {
/**
* 커넥터의 버전을 반환합니다.
* @return 현재 커넥터 버전 정보
*/
@Override
public String version() {
// TODO: 커넥터의 버전을 문자열로 반환
return "1.0"; // 예: "1.0"으로 버전을 하드코딩하거나 다른 방식으로 설정할 수 있습니다.
}
/**
* 커넥터가 시작될 때 호출됩니다.
* 커넥터의 설정 값을 받아와 검증 및 초기화 작업을 수행합니다.
* @param props 커넥터 설정 값들이 포함된 맵(Map)입니다.
*/
@Override
public void start(Map<String, String> props) {
// TODO: 설정 값(props)을 이용한 커넥터 초기화 로직 추가
// 예: 외부 시스템과의 연결 설정 또는 초기화 작업 수행
}
/**
* Sink 작업(Task) 클래스를 지정합니다.
* SinkConnector는 데이터를 처리할 Task 클래스를 여러 개 생성할 수 있습니다.
* @return 데이터를 처리할 Task 클래스 반환
*/
@Override
public Class<? extends Task> taskClass() {
// TODO: 데이터를 처리할 Task 클래스 지정
// 예: return TestSinkTask.class;
return null; // 실제 Task 클래스 사용 시 null 대신 Task 클래스명 반환
}
/**
* 각 Task 인스턴스에 전달할 설정을 정의합니다.
* maxTasks는 최대 실행 가능한 Task 수이며, 각 Task의 설정을 리스트로 반환합니다.
* @param maxTasks 최대 실행 가능한 Task 수
* @return 각 Task 인스턴스에 전달할 설정 리스트
*/
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
// TODO: 각 Task에 필요한 설정을 정의하고 반환
// 예: 설정 맵을 생성하고, maxTasks 수만큼 리스트에 추가
return null; // 실제 설정 리스트 사용 시 null 대신 설정 리스트 반환
}
/**
* 커넥터의 설정 항목들을 정의합니다.
* 설정 항목들은 커넥터가 시작되기 전에 사용자 또는 관리자에 의해 지정됩니다.
* @return 커넥터의 설정 정의(ConfigDef 객체)
*/
@Override
public ConfigDef config() {
// TODO: 커넥터의 설정 정의 반환
// 예: return new ConfigDef().define(...); // 설정 정의 추가
return null; // 실제 설정 정의 사용 시 null 대신 ConfigDef 객체 반환해야 합니다.
}
/**
* 커넥터가 중지될 때 호출됩니다.
* 종료 시 외부 시스템 연결을 닫거나 사용한 자원을 해제하는 등 정리 작업을 수행합니다.
*/
@Override
public void stop() {
// TODO: 커넥터 종료 시 필요한 작업 수행
// 예: 외부 시스템과의 연결 해제 및 자원 반환
}
}
TestSinkTask
public class SingleFileSinkTask extends SinkTask {
/**
* Task의 버전을 반환합니다.
* @return 현재 Task의 버전 정보
*/
@Override
public String version() {
// TODO: Task의 버전을 문자열로 반환
return "1.0"; // 예: "1.0"으로 버전을 하드코딩하거나 다른 방식으로 설정할 수 있습니다.
}
/**
* Task가 시작될 때 호출됩니다.
* 설정 값을 받아와 초기화 작업을 수행하며, 파일에 쓰기 위한 준비를 합니다.
* @param props Task 설정 값들이 포함된 맵(Map)입니다.
*/
@Override
public void start(Map<String, String> props) {
// TODO: 설정 값(props)을 통해 파일 객체 및 FileWriter 초기화
// 예: 파일 경로를 설정하고, 파일 쓰기를 위한 FileWriter 객체를 생성
}
/**
* Kafka에서 읽은 데이터를 파일에 기록합니다.
* Kafka에서 전송된 SinkRecord 컬렉션을 받아 각 레코드를 파일에 저장합니다.
* @param records Kafka에서 전달된 SinkRecord의 컬렉션
*/
@Override
public void put(Collection<SinkRecord> records) {
// TODO: SinkRecord에서 데이터를 읽어와 파일에 기록
// 예: fileWriter를 이용하여 각 SinkRecord의 값을 파일에 씁니다.
}
/**
* 특정 오프셋(offset)을 기준으로 데이터를 flush(즉시 기록)합니다.
* @param offsets 커밋된 오프셋 정보가 담긴 TopicPartition과 OffsetAndMetadata 맵
*/
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
// TODO: 파일에 기록된 내용을 flush하여 디스크에 저장
// 예: fileWriter.flush() 호출로 파일에 기록된 내용을 강제로 저장
}
/**
* Task가 중지될 때 호출됩니다.
* 파일에 기록을 완료하고 파일을 닫는 등 필요한 정리 작업을 수행합니다.
*/
@Override
public void stop() {
// TODO: Task 종료 시 FileWriter를 닫는 등 정리 작업 수행
// 예: fileWriter.close() 호출로 파일 쓰기 작업을 종료
}
}
2. 파일 싱크 커넥터 구현 예제
파일 디렉토리 구조
SingleFileSinkConnectorConfig
public class SingleFileSinkConnectorConfig extends AbstractConfig {
// 저장할 파일의 디렉토리 경로와 파일 이름을 지정하는 설정 키
public static final String DIR_FILE_NAME = "file";
// DIR_FILE_NAME의 기본값으로 "/tmp/kafka.txt"가 설정되어 있습니다.
private static final String DIR_FILE_NAME_DEFAULT_VALUE = "/tmp/kafka.txt";
// DIR_FILE_NAME 설정에 대한 설명
private static final String DIR_FILE_NAME_DOC = "저장할 디렉토리와 파일 이름";
/**
* Sink Connector의 설정을 정의하는 ConfigDef 객체입니다.
* DIR_FILE_NAME 항목의 이름, 타입, 기본값, 중요도, 설명을 설정합니다.
*/
public static ConfigDef CONFIG = new ConfigDef()
.define(DIR_FILE_NAME, // 설정 키 (파일 경로와 이름)
Type.STRING, // 설정 타입 (문자열)
DIR_FILE_NAME_DEFAULT_VALUE, // 기본값 ("/tmp/kafka.txt")
Importance.HIGH, // 중요도 (HIGH)
DIR_FILE_NAME_DOC); // 설정에 대한 설명
/**
* 생성자: AbstractConfig를 상속받아 설정값을 초기화합니다.
* @param props 커넥터에 전달된 설정값들이 포함된 맵(Map)입니다.
*/
public SingleFileSinkConnectorConfig(Map<String, String> props) {
super(CONFIG, props);
}
}
SingleFileSinkConnector
public class SingleFileSinkConnector extends SinkConnector {
// Connector 설정값을 저장하는 맵
private Map<String, String> configProperties;
/**
* 커넥터의 버전을 반환합니다.
* @return 현재 커넥터 버전 정보
*/
@Override
public String version() {
return "1.0"; // 버전을 하드코딩하여 반환
}
/**
* 커넥터 시작 시 호출되는 메서드로, 전달받은 설정 값을 통해 초기화합니다.
* @param props 커넥터의 설정 값이 포함된 맵
*/
@Override
public void start(Map<String, String> props) {
this.configProperties = props;
try {
// 설정을 검증하고, 유효하지 않으면 예외를 발생시킵니다.
new SingleFileSinkConnectorConfig(props);
} catch (ConfigException e) {
// 설정 검증 실패 시 ConnectException 예외를 발생
throw new ConnectException(e.getMessage(), e);
}
}
/**
* 데이터를 처리할 Task 클래스를 반환합니다.
* @return Sink 작업을 수행하는 Task 클래스
*/
@Override
public Class<? extends Task> taskClass() {
// 데이터를 파일에 저장하는 Task 클래스 지정
return SingleFileSinkTask.class;
}
/**
* 각 Task 인스턴스에 전달할 설정을 정의합니다.
* @param maxTasks 최대 실행 가능한 Task 수
* @return 각 Task에 전달할 설정을 담은 맵의 리스트
*/
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();
Map<String, String> taskProps = new HashMap<>();
taskProps.putAll(configProperties);
// 최대 maxTasks 수만큼 동일한 설정 맵을 Task 설정 리스트에 추가
for (int i = 0; i < maxTasks; i++) {
taskConfigs.add(taskProps);
}
return taskConfigs;
}
/**
* 커넥터의 설정을 정의하는 ConfigDef 객체를 반환합니다.
* @return 설정 정의를 담은 ConfigDef 객체
*/
@Override
public ConfigDef config() {
// SingleFileSinkConnectorConfig에 정의된 설정을 반환
return SingleFileSinkConnectorConfig.CONFIG;
}
/**
* 커넥터가 종료될 때 호출되는 메서드로, 필요한 정리 작업을 수행합니다.
*/
@Override
public void stop() {
// 필요한 경우 자원 해제 작업 수행
}
}
SingleFileSinkTask
public class SingleFileSinkTask extends SinkTask {
// Sink Connector 설정 객체
private SingleFileSinkConnectorConfig config;
// 데이터가 저장될 파일
private File file;
// 파일 쓰기 객체
private FileWriter fileWriter;
/**
* 커넥터의 버전을 반환합니다.
* @return 현재 버전 정보
*/
@Override
public String version() {
return "1.0"; // 하드코딩된 버전을 반환
}
/**
* Task가 시작될 때 호출되는 메서드로, 설정을 받아 파일을 초기화합니다.
* @param props 커넥터 설정 값이 담긴 맵
*/
@Override
public void start(Map<String, String> props) {
try {
// 설정 객체를 생성하고, 파일을 설정된 경로에 생성
config = new SingleFileSinkConnectorConfig(props);
file = new File(config.getString(config.DIR_FILE_NAME));
fileWriter = new FileWriter(file, true); // 파일에 내용을 추가하는 모드로 FileWriter 초기화
} catch (Exception e) {
// 설정 오류나 파일 초기화 실패 시 ConnectException 예외 발생
throw new ConnectException(e.getMessage(), e);
}
}
/**
* Kafka에서 수신한 레코드를 처리하여 파일에 기록합니다.
* @param records Kafka에서 수신한 SinkRecord의 모음
*/
@Override
public void put(Collection<SinkRecord> records) {
try {
// 각 SinkRecord의 값을 파일에 기록
for (SinkRecord record : records) {
fileWriter.write(record.value().toString() + "\n");
}
} catch (IOException e) {
// 파일 쓰기 실패 시 ConnectException 예외 발생
throw new ConnectException(e.getMessage(), e);
}
}
/**
* 파일 내용을 플러시하여 모든 변경 내용을 저장합니다.
* @param offsets 마지막으로 처리한 오프셋 정보를 포함하는 맵
*/
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
try {
// 파일에 기록된 내용을 저장
fileWriter.flush();
} catch (IOException e) {
// 플러시 실패 시 ConnectException 예외 발생
throw new ConnectException(e.getMessage(), e);
}
}
/**
* Task가 종료될 때 호출되며, 파일 쓰기 객체를 닫아 리소스를 해제합니다.
*/
@Override
public void stop() {
try {
// 파일 쓰기 객체를 닫아 자원을 해제
fileWriter.close();
} catch (IOException e) {
// 파일 닫기 실패 시 ConnectException 예외 발생
throw new ConnectException(e.getMessage(), e);
}
}
}
3. 분산 모드 커넥터 실습
커스텀 커넥터 jar파일을 넣을 디렉토리 생성
mkdir connect
connect 디렉토리에 커스텀 커넥터 shadowJar 하여 jar파일 옮겨주기
vi config/connect-distributed.properties
bootstrap.servers=localhost:9092 # unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# ... 중략 ...
# 커스텀 커넥터 jar파일 위치할 곳
plugin.path=/home/iglu123/kafka_2.12-3.5.1/connect
카프카 커넥트 실행
bin/connect-distributed.sh config/connect-distributed.properties
커넥터 플러그인 조회
curl -X GET http://localhost:8083/connector-plugins
커스터마이즈 커넥터인 SingleFileSinkConnector가 조회됨을 확인할 수 있음
새로운 토픽 생성
bin/kafka-topics.sh --create --bootstrap-server 172.20.218.223:9092 --topic custom-test --partitions 10
커스터마이즈 커넥터 생성
curl -X POST -H "Content-Type: application/json" \
--data '{
"name": "single-file-sink-connector",
"config": {
"connector.class": "com.example.SingleFileSinkConnector",
"tasks.max": "1",
"file": "/tmp/custom-sink-test-data.txt",
"topics": "custom-test"
}
}' \
http://localhost:8083/connectors
생성된 커넥터 목록 조회
curl -X GET http://localhost:8083/connectors
커넥터 상태 조회
curl -X GET http://localhost:8083/connectors/single-file-sink-connector/status
프로듀서로 레코드 생성
bin/kafka-console-producer.sh --bootstrap-server 172.20.218.223:9092 --topic custom-test
싱크된 데이터 확인
cat /tmp/custom-sink-test-data.txt
성공적으로 데이터가 불러와졌음을 확인할 수 있음
커넥터를 지워보기
curl -X DELETE http://localhost:8083/connectors/single-file-sink-connector
'Kafka' 카테고리의 다른 글
[Amazon MSK] Amazon MSK 클러스터 생성하기 (1) | 2024.11.09 |
---|---|
[Apache Kafka] 9. 카프카 커넥트(2) - 커스텀 소스 커넥터 (1) | 2024.11.02 |
[Apache Kafka] 9. 카프카 커넥트(1) - 카프카 커넥트 (5) | 2024.11.01 |
[Apache Kafka] 8. 카프카 스트림즈(2) - 윈도우 프로세싱 (0) | 2024.10.30 |
[Apache Kafka] 8. 카프카 스트림즈(1) - KStream,KTable,GlobalKTable (0) | 2024.10.29 |