본 포스팅은 인프런 데브원영님의 [아파치 카프카 애플리케이션 프로그래밍]의 강의를 수강 후 정리하는 글입니다.
1. 카프카 커넥트 소개
1-1. 카프카 커넥트
카프카 커넥트: 데이터 파이프라인 생성 시 반복 작업을 줄이고 효율적인 전송을 이루기 위한 애플리케이션
1-2. 커넥트 내부 구조
카프카 스트림과 유사한 구조로, 커넥터와 커넥터 내부에 태스크가 있음
태스크가 실질적인 데이터 처리를 함
2. 커넥트
2-1. 소스 커넥터, 싱크 커넥터
소스 커넥터: 프로듀서 역할
싱크 커넥터: 컨슈머 역할
2-2. 커넥터 플러그인
직접 커넥터 플러그인을 만들거나 오픈 소스 커넥터 플러그인을 가져다가 추가 가능
2-3. 오픈소스 커넥터
직접 커넥터를 만들 필요가 없고, 커넥터 jar파일을 다운로드하여 사용할 수 있음
https://www.confluent.io/hub/ 에서 필요한 커넥터를 다운할 수 있음
반드시 라이선스를 참고하여 사용 범위를 확인해야함
2-4. 컨버터, 트랜스폼
- 컨버터: 데이터 처리를 하기 전, 스키마를 변경하도록 도와줌
- JsonConverter, StringConverter, ByteArrayConverter
- 트랜스폼: 데이터 처리 시 각 메시지 단위로 데이터를 간단하게 변환하기 위한 용도
ex) JSON 데이터에서 특정 키를 삭제하거나 추가
- Cast, Drop, ExtractField
3. 커넥트 배포 및 운영
3-1. 단일 모드 커넥트
단일 프로세스로 실행이기 때문에 단일 장애점(SPOF)이 될 수 있음
개발환경이나 중요도가 낮은 파이프라인을 운영할 때 사용됨
connect-standalone.properties(커넥트 설정 파일)
bootstrap.servers=localhost:9092
# converters는 Kafka에서 데이터를 저장하거나 가져올 때 데이터의 형식을 지정하고
# Connect 데이터로 변환하는 방식을 정의합니다. 모든 Connect 사용자는
# Kafka에서 데이터를 로드하거나 저장할 때 원하는 형식으로 구성해야 합니다.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter에 특정 설정을 전달하려면 설정 앞에 적용할 converter 이름을 접두사로 붙여서 사용합니다.
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# 테스트/디버깅에 유용하게 일반보다 더 자주 플러시하도록 설정
offset.flush.interval.ms=10000
# 플러그인(커넥터, 컨버터, 변환 등)의 클래스 로딩 격리를 활성화하려면 파일 시스템 경로 목록을 쉼표(,)로 구분하여 설정합니다.
# 이 목록은 다음을 포함할 수 있는 최상위 디렉토리로 구성되어야 합니다.
# a) 플러그인 및 해당 종속성을 포함하는 jar 파일이 바로 포함된 디렉토리
# b) 플러그인 및 해당 종속성을 포함하는 uber-jar 파일
# c) 플러그인 및 해당 종속성의 패키지 디렉토리 구조를 직접 포함하는 디렉토리
# 참고: 플러그인 또는 종속성을 찾기 위해 심볼릭 링크(symlinks)는 따라가게 됩니다.
# 예시:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=
connect-file-source.properties(커넥터 설정)
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
단일 모드 커넥트 실행(REST API가 아닌 커넥트 설정 파일과 커넥터 설정 파일을 차례로 넣어 실행)
bin/connect-standalone.sh config/connect-standalone.properties \
config/connect-file-source.properties
3-2. 분산 모드 커넥트
동일한 group.id으로 묶어야 한다
3개 중 하나라도 REST API가 받게 되면, 모두 공유하게 됨
2개 이상의 커넥트가 있으면 1개의 커넥트가 이슈 발생하더라도 나머지 중 1개의 커넥트가 파이프라인을 지속적으로 처리 가능
bootstrap.servers=localhost:9092
group.id=connect-cluster
# converters는 Kafka에서 데이터를 저장하거나 가져올 때 데이터의 형식을 정의하고,
# Connect 데이터로 변환하는 방식을 설정합니다.
# 모든 Connect 사용자는 데이터를 Kafka에서 로드하거나 저장할 때 원하는 형식으로 설정해야 합니다.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 특정 converter 설정을 전달하려면 적용할 converter의 이름을 설정 앞에 접두사로 추가합니다.
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# 오프셋을 저장할 Topic 이름입니다. 이 Topic은 여러 파티션을 가져야 하며, 복제 및 압축되어야 합니다.
# Kafka Connect는 필요 시 자동으로 Topic을 생성하지만, 필요한 경우 Kafka Connect를 시작하기 전에
# 특정 Topic 구성을 수동으로 생성할 수 있습니다.
# 대부분의 사용자는 기본 복제 계수인 3을 사용하거나 더 큰 값을 지정합니다.
# 이 설정을 단일 브로커 클러스터에서도 실행 가능하게 하려면, 여기서 복제 계수를 1로 설정합니다.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25
# 커넥터 및 작업 설정을 저장할 Topic 이름입니다. 이 Topic은 단일 파티션이어야 하며, 고도로 복제되고 압축되어야 합니다.
# Kafka Connect는 필요 시 자동으로 Topic을 생성하지만, 필요한 경우 Kafka Connect를 시작하기 전에
# 특정 Topic 구성을 수동으로 생성할 수 있습니다.
# 대부분의 사용자는 기본 복제 계수인 3을 사용하거나 더 큰 값을 지정합니다.
# 이 설정을 단일 브로커 클러스터에서도 실행 가능하게 하려면, 여기서 복제 계수를 1로 설정합니다.
config.storage.topic=connect-configs
config.storage.replication.factor=1
# 상태를 저장할 Topic 이름입니다. 이 Topic은 여러 파티션을 가질 수 있으며, 복제 및 압축되어야 합니다.
# Kafka Connect는 필요 시 자동으로 Topic을 생성하지만, 필요한 경우 Kafka Connect를 시작하기 전에
# 특정 Topic 구성을 수동으로 생성할 수 있습니다.
# 대부분의 사용자는 기본 복제 계수인 3을 사용하거나 더 큰 값을 지정합니다.
# 이 설정을 단일 브로커 클러스터에서도 실행 가능하게 하려면, 여기서 복제 계수를 1로 설정합니다.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5
# 일반적으로보다 자주 플러시하도록 설정, 테스트 및 디버깅에 유용
offset.flush.interval.ms=10000
# REST API가 수신 대기할 URI 목록을 쉼표로 구분하여 나열합니다.
# 지원되는 프로토콜은 HTTP와 HTTPS입니다.
# 모든 인터페이스에 바인딩하려면 호스트 이름을 0.0.0.0으로 지정합니다.
# 기본 인터페이스에 바인딩하려면 호스트 이름을 비워 둡니다.
# 허용 가능한 예시: HTTP://myhost:8083, HTTPS://myhost:8084
#listeners=HTTP://:8083
# 다른 워커들이 연결할 때 사용할 호스트 이름과 포트를 설정합니다.
# 설정하지 않으면 "listeners" 설정 값을 사용합니다.
#rest.advertised.host.name=
#rest.advertised.port=
#rest.advertised.listener=
# 플러그인(커넥터, 컨버터, 변환 등)의 클래스 로딩 격리를 활성화하려면 파일 시스템 경로 목록을 쉼표(,)로 구분하여 설정합니다.
# 이 목록은 다음을 포함할 수 있는 최상위 디렉토리로 구성되어야 합니다.
# a) 플러그인 및 해당 종속성을 포함하는 jar 파일이 바로 포함된 디렉토리
# b) 플러그인 및 해당 종속성을 포함하는 uber-jar 파일
# c) 플러그인 및 해당 종속성의 패키지 디렉토리 구조를 직접 포함하는 디렉토리
# 예시:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=
3-3. 커넥트 REST API 인터페이스
요청 메서드 | 호출 경로 | 설명 |
GET | / | 실행 중인 커넥트 정보 확인 |
GET | /connectors | 실행 중인 커넥터 이름 확인 |
POST | /connectors | 새로운 커넥터 생성 요청 |
GET | /connectors/{커넥터 이름} | 실행 중인 커넥터 정보 확인 |
GET | /connectors/{커넥터 이름}/config | 실행 중인 커넥터의 설정값 확인 |
PUT | /connectors/{커넥터 이름}/config | 실행 중인 커넥터 설정값 변경 요청 |
GET | /connectors/{커넥터 이름}/status | 실행 중인 커넥터 상태 확인 |
POST | /connectors/{커넥터 이름}/restart | 실행 중인 커넥터 재시작 요청 |
커넥트는 8083포트로 호출할 수 있으며, HTTP 메서드 기반 API를 제공함
'Kafka' 카테고리의 다른 글
[Apache Kafka] 9. 카프카 커넥트(3) - 커스텀 싱크 커넥터 (0) | 2024.11.03 |
---|---|
[Apache Kafka] 9. 카프카 커넥트(2) - 커스텀 소스 커넥터 (1) | 2024.11.02 |
[Apache Kafka] 8. 카프카 스트림즈(2) - 윈도우 프로세싱 (0) | 2024.10.30 |
[Apache Kafka] 8. 카프카 스트림즈(1) - KStream,KTable,GlobalKTable (0) | 2024.10.29 |
[Apache Kafka] 7. 멱등성 프로듀서, 트랜잭션 프로듀서와 컨슈머 (0) | 2024.10.28 |