1. 커스텀 소스 커넥터
오픈소스 소스 커넥터를 사용하면서 발생할 수 있는 라이선스 문제 혹은 로직 변경을 이유로 직접 개발할 수도 있음
카프카 커넥트 라이브러리에서 SourceConnector와 SourceTask 클래스를 사용하여 직접 소스 커넥터를 구현 가능
connect-api 라이브러리 추가
build.gradle
dependencies {
implementation 'org.apache.kafka:connect-api:3.5.1'
}
- SourceConnector
태스크를 실행하기 전 커넥터 설정파일을 초기화하고 어떤 태스크 클래스를 사용할 것인지 정의
실질적인 데이터를 다루는 부분이 없음
public class TestSourceConnector extends SourceConnector {
/**
* 커넥터의 버전을 반환합니다.
* @return 현재 커넥터 버전 정보
*/
@Override
public String version() {
// TODO: 커넥터의 버전을 문자열로 반환
return "1.0"; // 예: "1.0"으로 버전을 하드코딩하거나 다른 방식으로 설정할 수 있습니다.
}
/**
* 커넥터 시작 시 호출됩니다.
* 설정 값을 받아와 검증 및 초기화 작업을 수행합니다.
* @param props 커넥터 설정 값들이 포함된 맵(Map)입니다.
*/
@Override
public void start(Map<String, String> props) {
// TODO: 설정 값(props)을 이용한 커넥터 초기화 로직 추가
// 예: 설정 값 검증 및 구성 객체 생성, 필요한 자원 할당 등
}
/**
* 데이터를 Kafka에 전송할 작업(Task) 클래스를 지정합니다.
* 각 Task는 커넥터에 의해 실행되며, 데이터를 읽어오는 실제 작업을 수행합니다.
* @return Task 클래스를 반환합니다.
*/
@Override
public Class<? extends Task> taskClass() {
// TODO: 데이터를 읽어오는 작업을 수행할 Task 클래스 반환
// 예: return TestSourceTask.class;
return null; // 실제 Task 클래스 사용 시 null 대신 클래스명을 반환해야 합니다.
}
/**
* 각 Task 인스턴스에 전달할 설정을 정의합니다.
* maxTasks는 최대 실행 가능한 Task 수이며, 각 Task의 설정을 리스트로 반환합니다.
* @param maxTasks 최대 실행 가능한 Task 수
* @return 각 Task 인스턴스에 전달할 설정 리스트
*/
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
// TODO: 각 Task에 필요한 설정을 정의하고 반환
// 예: 설정 맵 생성 후, maxTasks 개수만큼 반복하여 리스트에 추가
return null; // 실제 설정 리스트 사용 시 null 대신 설정 리스트 반환해야 합니다.
}
/**
* 커넥터의 설정 항목들을 정의합니다.
* 설정 항목들은 커넥터가 시작되기 전에 사용자 또는 관리자에 의해 지정됩니다.
* @return 커넥터의 설정 정의(ConfigDef 객체)
*/
@Override
public ConfigDef config() {
// TODO: 커넥터의 설정 정의 반환
// 예: return new ConfigDef().define(...); // 설정 정의 추가
return null; // 실제 설정 정의 사용 시 null 대신 ConfigDef 객체 반환해야 합니다.
}
/**
* 커넥터 중지 시 호출됩니다.
* 커넥터가 종료될 때 실행되며, 사용한 자원을 해제하고 연결을 닫는 등 정리 작업을 수행합니다.
*/
@Override
public void stop() {
// TODO: 커넥터 종료 시 필요한 작업 수행
// 예: 파일이나 데이터베이스 연결 닫기, 사용한 자원 반환 등
}
}
- SourceTask
소스 애플리케이션으로부터 데이터를 가져와서 토픽으로 데이터를 보내는 역할
토픽에서 사용하는 오프셋이 아닌 자체적으로 사용하는 오프셋을 사용함
이를 오프셋 스토리지에 저장함
public class TestSourceTask extends SourceTask {
/**
* Task의 버전을 반환합니다.
* @return 현재 Task의 버전 정보
*/
@Override
public String version() {
// TODO: Task의 버전을 문자열로 반환
return "1.0"; // 예: "1.0"으로 버전을 하드코딩하거나 다른 방식으로 설정할 수 있습니다.
}
/**
* Task가 시작될 때 호출됩니다.
* Task가 데이터를 읽어올 소스와 필요한 초기화 작업을 수행합니다.
* @param props Task 설정 값들이 포함된 맵(Map)입니다.
*/
@Override
public void start(Map<String, String> props) {
// TODO: 설정 값(props)을 이용한 Task 초기화 로직 추가
// 예: 파일 경로나 데이터 소스 연결 설정 등을 초기화합니다.
}
/**
* Kafka로 전송할 데이터를 생성하고 반환합니다.
* 이 메서드는 Kafka Connect에 의해 주기적으로 호출되며, 데이터를 읽어와 SourceRecord 객체로 반환해야 합니다.
* @return Kafka로 전송할 데이터의 리스트(SourceRecord 객체 리스트)
*/
@Override
public List<SourceRecord> poll() {
// TODO: 데이터 소스에서 데이터를 읽어와 SourceRecord 리스트로 반환
// 예: 파일이나 데이터베이스에서 데이터를 읽고, SourceRecord 객체에 담아 Kafka로 전송합니다.
return null; // 실제 데이터 반환 시 null 대신 SourceRecord 리스트 반환
}
/**
* Task가 중지될 때 호출됩니다.
* 데이터를 읽어오는 연결을 닫거나 사용한 자원을 해제하는 등 정리 작업을 수행합니다.
*/
@Override
public void stop() {
// TODO: Task 종료 시 필요한 정리 작업 수행
// 예: 파일이나 데이터베이스 연결 닫기 등
}
}
2. 파일 소스 커넥터 구현 예제
connect-api 라이브러리와 함께 빌드하여 jar로 압축하기 위해 build.gradle에 다음과 같이 설정 추가(kafka 2.5.0 버전 기준)
같이 빌드하지 않으면, 참조하는 클래스를 찾지 못하여 ClassNotFoundException 발생할 수 있음
build.gradle ( kafka 2.5.0 기준)
dependencies {
implementation 'org.apache.kafka:connect-api:3.5.1'
implementation 'org.slf4j:slf4j-simple:1.7.30'
}
jar {
from {
configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}
}
강의대로 해봤는데 난 안 됨...카프카 3.5.1 버전이기 때문인가
그래서 build.gradle을 다음과 같이 변경하였음
jar파일로 압축할 때 shadowJar을 이용하도록 함
build.gradle( kafka 3.5.1)
dependencies {
implementation 'org.apache.kafka:connect-api:3.5.1'
implementation 'org.slf4j:slf4j-simple:1.7.30'
}
shadowJar {
archiveFileName = 'my-custom-connector-all.jar'
}
파일 구조
SingleFileSourceConnectorConfig
public class SingleFileSourceConnectorConfig extends AbstractConfig {
/**
* 읽을 파일 경로와 이름을 나타내는 설정 키
*/
public static final String DIR_FILE_NAME = "file";
/**
* DIR_FILE_NAME의 기본값으로 "/tmp/kafka.txt"로 설정되어 있습니다.
*/
private static final String DIR_FILE_NAME_DEFAULT_VALUE = "/tmp/kafka.txt";
/**
* DIR_FILE_NAME 설정에 대한 설명
*/
private static final String DIR_FILE_NAME_DOC = "읽을 파일 경로와 이름";
/**
* 데이터를 보낼 Kafka 토픽을 나타내는 설정 키
*/
public static final String TOPIC_NAME = "topic";
/**
* TOPIC_NAME의 기본값으로 "test"로 설정되어 있습니다.
*/
private static final String TOPIC_DEFAULT_VALUE = "test";
/**
* TOPIC_NAME 설정에 대한 설명
*/
private static final String TOPIC_DOC = "보낼 토픽명";
/**
* 커넥터의 설정을 정의하는 ConfigDef 객체입니다.
* 각 설정 항목의 이름, 타입, 기본값, 중요도, 설명을 설정합니다.
*/
public static ConfigDef CONFIG = new ConfigDef()
.define(DIR_FILE_NAME, // 설정 키 (파일 경로)
Type.STRING, // 설정 타입 (문자열)
DIR_FILE_NAME_DEFAULT_VALUE, // 기본값 ("/tmp/kafka.txt")
Importance.HIGH, // 중요도 (HIGH)
DIR_FILE_NAME_DOC) // 설정에 대한 설명
.define(TOPIC_NAME, // 설정 키 (토픽명)
Type.STRING, // 설정 타입 (문자열)
TOPIC_DEFAULT_VALUE, // 기본값 ("test")
Importance.HIGH, // 중요도 (HIGH)
TOPIC_DOC); // 설정에 대한 설명
/**
* 생성자: AbstractConfig를 상속받아 설정 값을 초기화합니다.
* @param props 커넥터에 전달된 설정 값들이 포함된 맵(Map)입니다.
*/
public SingleFileSourceConnectorConfig(Map<String, String> props) {
super(CONFIG, props);
}
}
커넥터 옵션값 설정시 중요도(Importance) 지정 기준
HIGH, MEDIUM, LOW 3가지가 있지만 명확한 기준은 없다.
HIGH: 커넥터에서 반드시 사용자가 입력한 설정이 필요한 값
MEDIUM: 사용자 입력값이 없더라도 상관없고 기본값이 있는 옵션
LOW: 입력값이 없어도 되는 옵션
SingleFileSourceTask
public class SingleFileSourceTask extends SourceTask {
private Logger logger = LoggerFactory.getLogger(SingleFileSourceTask.class);
public final String FILENAME_FIELD = "filename";
public final String POSITION_FIELD = "position";
private Map<String, String> fileNamePartition;
private Map<String, Object> offset;
private String topic;
private String file;
private long position = -1;
@Override
public String version() {
return "1.0";
}
//리소스 초기화
@Override
public void start(Map<String, String> props) {
try {
// Init variables
SingleFileSourceConnectorConfig config = new SingleFileSourceConnectorConfig(props);
topic = config.getString(SingleFileSourceConnectorConfig.TOPIC_NAME);
file = config.getString(SingleFileSourceConnectorConfig.DIR_FILE_NAME);
fileNamePartition = Collections.singletonMap(FILENAME_FIELD, file);
// 소스 커넥터에서 관리하는 내부 번호를 기록하는 용도
offset = context.offsetStorageReader().offset(fileNamePartition);
// Get file offset from offsetStorageReader
if (offset != null) {
Object lastReadFileOffset = offset.get(POSITION_FIELD);
if (lastReadFileOffset != null) {
// 만약 기존에 저장된 내부 번호가 있다면 해당 번호부터 시작
position = (Long) lastReadFileOffset;
}
} else { // 만약 기존에 저장된 내부 번호가 없다면 0부터 시작
position = 0;
}
} catch (Exception e) {
throw new ConnectException(e.getMessage(), e);
}
}
@Override
public List<SourceRecord> poll() {
List<SourceRecord> results = new ArrayList<>();
try {
Thread.sleep(1000);
// 가져가고 싶은 position(내부 번호)부터 데이터를 읽어감
List<String> lines = getLines(position);
if (lines.size() > 0) {
lines.forEach(line -> {
Map<String, Long> sourceOffset = Collections.singletonMap(POSITION_FIELD, ++position);
SourceRecord sourceRecord = new SourceRecord(fileNamePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line);
results.add(sourceRecord); // 토픽으로 보내고 싶은 데이터는 List<SourceRecord>에 element 추가
});
}
return results; // 최종적으로 토픽으로 전송되는 List
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new ConnectException(e.getMessage(), e);
}
}
private List<String> getLines(long readLine) throws Exception {
BufferedReader reader = Files.newBufferedReader(Paths.get(file));
return reader.lines().skip(readLine).collect(Collectors.toList());
}
@Override
public void stop() {
}
}
'Kafka' 카테고리의 다른 글
[Amazon MSK] Amazon MSK 클러스터 생성하기 (1) | 2024.11.09 |
---|---|
[Apache Kafka] 9. 카프카 커넥트(3) - 커스텀 싱크 커넥터 (0) | 2024.11.03 |
[Apache Kafka] 9. 카프카 커넥트(1) - 카프카 커넥트 (5) | 2024.11.01 |
[Apache Kafka] 8. 카프카 스트림즈(2) - 윈도우 프로세싱 (0) | 2024.10.30 |
[Apache Kafka] 8. 카프카 스트림즈(1) - KStream,KTable,GlobalKTable (0) | 2024.10.29 |