티스토리 뷰
들어가는 말
- 과거 kafka study 했던 자료를 한번 복기 하면서 정리합니다.
개요
카프카는 클러스터 관리를 위해 CLI 도구를 제공합니다.
메시지 읽기와 쓰기 도구
- kafka-console.consumer.sh
- kafka-console.producer.sh
- 주의
- 콘솔 컨슈머 사용 시 메시지 유실 가능
- 콘솔 프로듀서는 카프카 프로듀서의 모든 기능을 사용할 수 없음
- 메시지 전송이 까다로움
- 어플리케이션에서는 자바 클라이언트 라이브러리나, 다른 언어의 서드파티 클라이언트 사용
콘솔 프로듀서
- kafka-console.producer.sh
- 인자
- --broker-list
- --topic
kafka-console-producer.sh --broker-list kafka-test-broker.example.com:9092 --topic test-xep
- 프로듀서 구성 설정
- --producer.config 설정파일
- --producer-property key=value
- 콘솔 프로듀서 옵션
- --key-serializer 클래스 이름
- kafka.serializer.DefaultEncoder
- --value-serializer 클래스 이름
- kafka.serializer.DefaultEncoder
- --compression-code 문자열
- none, gzip, snappy, lz4
- --sync
- 동기화 방식 설정
- --key-serializer 클래스 이름
메시지 라인을 읽는 리더 클래스 옵션
- kafka.tools.LineMessageReader
- stdin 으로 부터 메시지 읽어서 프로듀서 레코드를 생성하는 클래스
- --property
- ignore.error
- parse.key 가 'true' 이며 키의 구분자가 없을때 'false' 로 되어 있으면 예외 발생시킴
- parse.key
- 'false' 로 설정시 항상 key를 nul로 지정
- key.seperator
- 메시지를 읽을때 키와 값을 구분하는 문자 지정, 기본 \t
- ignore.error
콘솔 컨슈머
- kafka-console-consumer.sh
- DefaultFormatter를 사용하여 그대로 출력
- 주의
- 콘솔 컨슈머는 카프카와 같은 버전을 사용
- 구버전의 콘솔 컨슈머는 잘못된 방법으로 주키퍼와 상호작용하므로 클러스터에 손상
- 구버전
- --zookeeper 옵션
kafka-console-consumer.sh --zookeeper zoo-test1.example.com:2181 --topic test
- 신버전
- --new-consumer, --bootstrap-server 옵션
kafka-console-consumer.sh --bootstrap-server kafka-test-broker.example.com:9092 --topic test-xep --from-beginning
- 대상 토픽
- --topic
- 하나의 토픽
- --whitelist
- 정규식과 일치하는 모든 토픽
- --blacklist
- 정규식과 일치하지 않는 모든 토픽
- --topic
- 컨슈머 구성 설정
- --consumer.config 설정파일
- --consumer-property key=value
- 주의
- --property 옵션의 경우 메시지 포맷터에 구성 옵션을 전달 할 때에만 사용
- 콘솔 컨슈머 옵션
- --formatter 클래스 이름
- kafka.tools.DefaultFormatter
- --from-beginning
- --max-message #num
- --partition #num
- id 가 num 인 파티션에서만 메시지를 읽음
- --formatter 클래스 이름
메시지 포맷터 옵션
- kafka.tools.LoggingMessageFormatter
- 표준 출력이 아닌 로거를 사용해서 출력
- kafka.tools.LoggingMessageFormatter
- 메시지 체크섬만 출력
- kafka.tools.NoOpMessageFormatter
- 메시지 출력하지 않음
- kafka.tools.DefaultMessageFormatter 의 옵션
- print.timestamp
- 'true' 일 경우 각 메시지의 타임 스탬프 표시
- print.key
- 'true' 일때 메시지에 키 표시
- key.seperator
- 메시지를 출력할 때 키와 값을 구분하는 문자 지정
- line.seperator
- 구분자
- key.deserializer
- value.deseiralizer
- print.timestamp
오프셋 토픽 읽기
- 클러스터 컨슈머 그룹에 어떤 오프셋이 커밋되어 있는지 확인
* 특정 컨슈머 그룹이 커밋하는 오프셋 확인
* 오프셋이 얼마나 자주 커밋되어 있는지 확인 - __consumer_offsets
- 모든 컨슈머들의 오프셋들이 메시지로 저장
- kafka.coordinator.GroupMetadataManager$OffstsMessageFormatter
- kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter
kafka-console-consumer.sh --bootstrap-server kafka-test-broker.example.com:9092 --topic __consumer_offsets --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'
토픽작업
- kafka-topics.sh
- 대부분의 토픽 작업 가능
- 구성 변경 kafka-configs.sh
- 생성
- 변경
- 삭제
- 정보 조회
- 대부분의 토픽 작업 가능
- --zookeeper 인자로 주키퍼 연결 문자열 지정 필요
- 주의
- 카프카의 CLI 도구의 경우 브로커를 통하지 않고 주키퍼에 직접 접근하므로 브로커와 동일한 버전의 도구 사용 필요
토픽 생성
- 필요한 인자
- 토픽 이름
- 복제 팩터
- 파티션
- 참고
- kafka-topics.sh의 인자로 --config 으로 구성을 override 할 수 있음
- __ 이름을 가진 토픽의 경우 클러스터 내부적으로 사용하는 토픽으로 간주
- 토픽 이름에 . 이 들어가 경우 메트릭에서는 _ 로 대체
- topic.1 -> topic_1
- 실행
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --create --topic topic-name --replication-factor 1 --partitions 1
- 랙 인식을 원하지 않을때
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --create --topic topic-name --replication-factor 1 --partitions 1 --disable-rack-aware
- 토픽 이름이 없을때에만 생성
- 해당 옵션이 없으면 동일한 이름이 있을 경우 에러 발생
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --create --topic topic-name --replication-factor 1 --partitions 1 --disable-rack-aware --if-not-exists
파티션 추가하기
- 이미 존재하는 토픽의 파티션 개수를 증가
- --alter 인자 사용
- 목적
- 토픽의 크기를 확장
- 각 파티션의 처리량을 감소
- 하나의 컨슈머 그룹에서 컨슈머를 증가시키기 위해
- 주의
- 메시지에 키가 있는 토픽에 파티션을 추가할 경우 메시지 키에 매핑되는 파티션이 바뀔 수 있음
- 정보
- --if-exists 와 --alter 를 같이 사용하는 것은 권장한지 않음
- 토픽이 없을때 에러가 반환되어야 함
- --if-exists 와 --alter 를 같이 사용하는 것은 권장한지 않음
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --create --topic topic-name --partitions 1 --if-not-exists
- 경고
- 파티션의 개수는 줄일 수 없음
- 파티션을 삭제할 경우에는 데이터도 삭제되므로
- 데이터 순서도 변경되므로 다른 파티션에 재분배 하기도 어려움
- 필요할 경우 토픽 삭제 후 재생성
- 파티션의 개수는 줄일 수 없음
토픽 삭제하기
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --delete --topic topic-name
- 번외
- 바로 삭제되는 것은 아니며, 컨슈머, 프로듀서에서 처리가 안되고 이후 재시작 시 삭제됨
- 브로커 옵션으로 즉시 삭제 가능
- delete.topic.enable=true
토픽 조회
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --list
토픽 상세 조회
- --describe
- --topic 옵션을 덧붙일 경우 특정 토픽 상세 정보 조회
- 출력되는 정보
- 파티션 개수
- 토픽 구성 오버라이드
- 각 파티션과 할당된 레플리카 내역
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --describe
- --describe 와 조합하여 사용
- 클러스터 트러블 슈팅을 위해
- --topic 으로 특정 토픽을 지정하지 않음
- --list 명령과는 함께 동작하지 않음
- --topic-with-overrides
- 브로커 설정의 기본값을 재설정한 토픽을 찾을때
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --describe --topics-with-overrides
- --underreplicated-partitions
- 리더 레플리카가 있으면서 하나 이상의 레플리카가 동기화 되지 않은 모든 파티션
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --describe --under-replicated-partitions
- --unavailable-partitions
- 리더가 없는 모든 파티션 조회
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --describe --unavailable-partitions
컨슈머 그룹
- 컨슈머 그룹 정보
- 구버전 (0.9.0 이전)
- 주키퍼에서 관리
- 신버전 (0.9.0 이후)
- 카프카 브로커 내부에서 관리
- 구버전 (0.9.0 이전)
- kafka-consumer-groups.sh
- 모든 버전의 컨슈머 그룹 리스트와 오프셋 정보를 포함한 상세 내역 조회
- 구버전에서만 가능한 기능
- 컨슈머 그룹과 오프셋 정보를 삭제
- 구버전 컨슈머 클라이언트 정보 조회
- --zookeeper 인자 사용
- 신버전 컨슈머 클라이언트 정보 조회
- --bootstrap-server 인자 사용
컨슈머 그룹의 리스트와 상세정보 조회
- 구버전 컨슈머 클라이언트 목록 조회
- 테스트 환경에서는 사용 불가
- 로그앤크래시 알파 환경 카프카
./kafka-consumer-groups.sh --zookeeper logncrash-alpha-zk:10013/NHN/NELO2/kafka --list
Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).
nelo2-notifier-group
nelo2-legacy-notifier-group
logger-decryptor-group
- 신버전 컨슈머 클라이언트 목록 조회
kafka-consumer-groups.sh --bootstrap-server kafka-test-broker.example.com:9092 --list
- 구버전 컨슈머 클라이언트 컨슈머 그룹 상세 조회
- 로그앤크래시 알파환경 카프카
./kafka-consumer-groups.sh --zookeeper logncrash-alpha-zk:10013/NHN/NELO2/kafka --describe --group nelo-percoalt or-notifier-group
Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).
Error: Executing consumer group command failed due to org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/nelo-percoaltor-notifier-group/ids
- 신버전 컨슈머 클라이언트 컨슈머 그룹 상세 조회
- 파티션
- 현재오프셋
- 마지막오프셋
- 렉
- 컨슈머아이디
- 호스트
- 클라이언트아이디
kafka-consumer-groups.sh --bootstrap-server kafka-test-broker.example.com:9092 --describe --group consumer-group-name
컨슈머 그룹 삭제하기
- 정보
- 구버전 컨슈머 클라이언트에만 지원
- 주의
- 해당 그룹의 모든 컨슈머 그룹을 셧다운 해야함
kafka-consumer-groups.sh --zookeeper zoo-test1.example.com:2181 --delete --group consumer-group-name
- 특정 토픽의 오프셋 정보만 삭제하는것도 가능
kafka-consumer-groups.sh --zookeeper zoo-test1.example.com:2181 --delete --group consumer-group-name --topic topic-name
오프셋관리 (컨슈머그룹)
- 구버전의 컨슈머 클라이언트 이용
- 기능
- 컨슈머 그룹의 오프셋 조회
- 컨슈머 그룹의 오프셋 삭제
- 컨슈머 그룹의 오프셋 내보내기
- 새로운 오프셋 저장
- 오프셋을 돌릴때 사용
- 오프셋을 넘길떄 사용
오프셋 내보내기
- 자바 클래스 직접 호출
- (테스트 환경에서는 지원 불가, 클래스 존재하지 않음)
kafka-run-class.sh kafka.tools.ExportZkOffsets
--zkconnect zoo1.example.com:2181/kafka-cluster --group testgroup
--output-file offsets
cat offsets
/consumers/testgroup/offsets/my-topic/0:8905
/consumers/testgroup/offsets/my-topic/1:8915
/consumers/testgroup/offsets/my-topic/2:9845
/consumers/testgroup/offsets/my-topic/3:8072
/consumers/testgroup/offsets/my-topic/4:8008
/consumers/testgroup/offsets/my-topic/5:8319
/consumers/testgroup/offsets/my-topic/6:8102
/consumers/testgroup/offsets/my-topic/7:12739
오프셋 가져오기
- 자바 클래스 직접 호출
- 오프셋 내보내기로 생성한 파일을 기준으로 재설정
- 주의
- 해당 그룹의 모든 컨슈머들을 중단해야함
kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect
zoo1.example.com:2181/kafka-cluster --input-file offsets
동적 구성 변경 (클러스터)
- 클러스터 실행 중 토픽과 클라이언트의 구성 변경 가능
- kafka-configs.sh사용
- 설정된 구성은 클러스터에 영구적으로 적용
- 주키퍼에 저장되며 브로커 시작 시 읽어서 사용
- 동적 구성 종류
- per-topic
- per-client
토픽 기본 구성 재설정
- 하나의 클러스터에서 토픽 별 설정 변경
- 기본 토픽 설정은 브로커 구성에서 지정된 기본값 적용
kafka-configs.sh --zookeeper zoo-test1.example.com:2181
--alter --entity-type topics --entity-name
--add-config =[,=...]
- 설정 종류
- cleanup.policy
- compaction일 경우 가장 최근 메시지만 남기고 모두 삭제
- compression.type
- 디스크에 쓸 떄 브로커가 사용하는 압축
- gzip, snappy, lz4
- retention.ms
- min.insync..replicas
- unclean.leader.election.enable
- 이하 책 221 페이지
- https://docs.confluent.io/current/kafka/dynamic-config.html
- cleanup.policy
kafka-configs.sh --zookeeper zoo-test1.example.com:2181 --alter --entity-type topics --entity-name test --add-config retention.ms=3600000
Completed Updating config for entity: topic 'test'.
클라이언트 기본 구성 재설정
- 카프카 클라이언트 (프로듀서, 컨슈머의) 할당량(quota) 조정
- 클라이언트 ID의 모든 클라이언트에게 허용되는 초당 바이트 비율
- 하나의 브로커를 기준으로 프로듀서가 메시지를 쓰거나 읽을때 허용된 한도
- e.g. 클러스터에 5개 브로커, 클라이언트의 프로듀서 할당량이 10MB/s 일 경우, 전체 합계는 50MB/s
- 클라이언트 ID의 모든 클라이언트에게 허용되는 초당 바이트 비율
- 종류
- producer_bytes_rate
- consumer_bytes_rate
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type clients --entity-name
구성 오버라이드 조회하기
kafka-configs.sh --zookeeper zoo-test1.example.com:2181 --describe --entity-type topics --entity-name test
Configs for topic 'test' are retention.ms=3600000
구성 오버라이드 삭제하기
- 동적 구성 오버라이드는 삭제 가능
- 이후 기본값으로 원상 복귀
kafka-configs.sh --zookeeper zoo-test1.example.com:2181 --alter --entity-type topics --entity-name test --delete-config retent ion.ms
Completed Updating config for entity: topic 'test'.
파티션 관리
- 종류
- 리더 레플리카 재선출
- 브로커에게 파티션 할당 (저수준)
선호 레플리카 선출
- 토픽은 다수의 파티션으로 구성, 파티션은 다수의 레플리카로 복제 가능
- 하나의 레플리카 브로커만이 파티션 리더
- 모든 프로듀서와 컨슈머의 작업은 파티션 리더의 브로커에서 수행
- 카프카 내부에서 해당 브로커가 첫번째 ISR로 정의
- 해당 브로커가 재시작 할떄 어떤 파티션의 리더도 맡지 않게 됨
- 주의
- 파티션 리더 자동 조정
- 브로커 구성을 통해 자동으로 파티션 리더 조정 가능
- 프로덕션 환경에서는 성능 이슈로 사용이 권장되지 않음
- 기능 동작 중 프로듀서 컨슈머 기능이 일시정지
- 파티션 리더 자동 조정
- kafka-preffered-replica-election.sh
- 운영자가 원하는 시점에서 각 파티션의 리더 선출 가능
kafka-preferred-replica-election.sh --zookeeper zoo-test1.example.com:2181
- 파티션 개수가 많은 클러스터에서 한번의 선호 레플리카 선출로 실행되지 않음
- 선호 레플리카 선출 요청은 클러스터 메타데이터의 znode(주키퍼)에 저장
- 요청 내용 크기가 주키퍼 기본 노드(기본 1MB) 보다 크면 요청 실패
- 요청을 여러번 나누어서 실행
{
"partitions": [
{
"partition": 1,
"topic": "foo"
},
{
"partition": 2,
"topic": "foobar"
}
]
}
kafka-preferred-replica-election.sh --zookeeper zoo-test1.example.com:2181 --path-to-json-file
partitions.json
파티션의 레플리카 변경
- 파티션의 레플리카 할당을 수동으로 변경
- 토픽의 파티션들이 브로커들에게 고르게 분배되지 않았을 때
- 브로커가 오프라인이 될 때
- 새로운 브로커가 추가 되었을 때
- kafka-reassign-partitions.sh
- 작업 단계
- 브로커 리스트와 토픽 리스트를 사용하여 파티션 재할당 리스트 생성
- 리스트대로 파티션 재할당 실행
- (생략가능) 재할당으로 생성된 리스트를 사용하여 파티션 재할당의 진행도 확인
- 파티션 재할당 리스트 생성을 위한 기본 정보
- json 파일
- version 은 1로 고정
{
"topics": [
{
"topic": "test-topic2"
},
{
"topic": "test-topic1"
}
],
"version": 1
}
- 파티션 재할당 리스트 생성
- 0,1 브로커에서 2,3 브로커로 재할당
kafka-reassign-partitions.sh --zookeeper zoo-test1.example.com:2181 --generate --topics-to-move-json-file topics.json --broker-list 2,3
- 파티션 재할당 리스트 생성 샘플
- 첫번째 결과 현재 할당 상태
- 두번째 결과 kafka-reassign-partition.sh수행 시 필요한 파라메터 파일
# kafka-reassign-partitions.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --generate
--topics-to-move-json-file topics.json --broker-list 2,3
Current partition replica assignment
{"version":1,"partitions":[{"topic":"my-topic","partition":5,"replicas":[0,1]},{"topic":"my-topic","partition":10,"replicas":[1,0]},{"topic":"my-topic","partition":1,"replicas":[0,1]},{"topic":"my-topic","partition":4,"replicas":[1,0]},{"topic":"my-topic","partition":7,"replicas":[0,1]},{"topic":"my-topic","partition":6,"replicas":[1,0]},{"topic":"my-topic","partition":3,"replicas":[0,1]},{"topic":"my-topic","partition":15,"replicas":[0,1]},{"topic":"my-topic","partition":0,"replicas":[1,0]},{"topic":"my-topic","partition":11,"replicas":[0,1]},{"topic":"my-topic","partition":8,"replicas":[1,0]},{"topic":"my-topic","partition":12,"replicas":[1,0]},{"topic":"my-topic","partition":2,"replicas":[1,0]},{"topic":"my-topic","partition":13,"replicas":[0,1]},{"topic":"my-topic","partition":14,"replicas":[1,0]},{"topic":"my-topic","partition":9,"replicas":[0,1]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"my-topic","partition":5,"replicas":[2,3]},{"topic":"my-topic","partition":10,"replicas":[3,2]},{"topic":"my-topic","partition":1,"replicas":[2,3]},{"topic":"my-topic","partition":4,"replicas":[3,2]},{"topic":"my-topic","partition":7,"replicas":[2,3]},{"topic":"my-topic","partition":6,"replicas":[3,2]},{"topic":"my-topic","partition":15,"replicas":[2,3]},{"topic":"my-topic","partition":0,"replicas":[3,2]},{"topic":"my-topic","partition":3,"replicas":[2,3]},{"topic":"my-topic","partition":11,"replicas":[2,3]},{"topic":"my-topic","partition":8,"replicas":[3,2]},{"topic":"my-topic","partition":12,"replicas":[3,2]},{"topic":"my-topic","partition":13,"replicas":[2,3]},{"topic":"my-topic","partition":2,"replicas":[3,2]},{"topic":"my-topic","partition":14,"replicas":[3,2]},{"topic":"my-topic","partition":9,"replicas":[2,3]}]}
#
- 실행 결과를 별도의 파일로 저장 한뒤 명령 수행
kafka-reassign-partitions.sh --zookeeper zoo-test1.example.com:2181 --execute
--reassignment-json-file reassign.json
- 동작
- 클러스터 컨트롤러에서 각 파티션의 레플리카 리스트에 새로운 레플리카 추가
- 레플리케이션 팩터 증가
- 새로운 레플리카 브로커들에서 현재 리더 레플리카로부터 파티션 복제
- 시간 소요
- 복제 완료 시 컨트롤러가 레플리카 리스트로부터 이전 레플리카 제거
- 레플리케이션 팩터 감소
- 클러스터 컨트롤러에서 각 파티션의 레플리카 리스트에 새로운 레플리카 추가
- 고려사항
- 브로커로부터 많은 파티션이 제거 될 때, 다시 시작 한 뒤 재할당을 시작
- 해당 브로커가 리더로 할당되었던 파티션들이 새로운 리더로 클러스터의 다른 브로커들이 지정
- 검증
- 파티션 재할당이 실행중이거나 완료된 이후 재할당 (진행 상태 혹은 에러 발생) 확인
- 작업에 사용되었던 JSON 객체를 가진 파일을 인자로 사용
- 샘플
kafka-reassign-partitions.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --verify
--reassignment-json-file reassign.json
Status of partition reassignment:
Reassignment of partition [my-topic,5] completed successfully
Reassignment of partition [my-topic,0] completed successfully
Reassignment of partition [my-topic,7] completed successfully
Reassignment of partition [my-topic,13] completed successfully
Reassignment of partition [my-topic,4] completed successfully
Reassignment of partition [my-topic,12] completed successfully
Reassignment of partition [my-topic,6] completed successfully
Reassignment of partition [my-topic,11] completed successfully
Reassignment of partition [my-topic,10] completed successfully
Reassignment of partition [my-topic,9] completed successfully
Reassignment of partition [my-topic,2] completed successfully
Reassignment of partition [my-topic,14] completed successfully
Reassignment of partition [my-topic,3] completed successfully
Reassignment of partition [my-topic,1] completed successfully
Reassignment of partition [my-topic,15] completed successfully
Reassignment of partition [my-topic,8] completed successfully
- 정보
- 재할당 작업의 배치 처리
- 파티션 재할당 작업은 클러스터에 큰 영향
- 메모리 페이지 캐시 변경
- 네트워크 사용
- 디스크 IO
- 재할당 작업 진행시 작은 단위 (토픽별 등)으로 진행
- 파티션 재할당 작업은 클러스터에 큰 영향
- 재할당 작업의 배치 처리
복제 팩터 변경하기
- 파티션의 복제 팩터 값을 증가 또는 감소 하는 기능
- 문서화 되지 않은 기능
- 토픽 생성 시 복제 팩터에 필요한 만큼 브로커가 없을때 감소 필요
- 파티션 재할당 단계의 실행 단계에서 사용되는 JSON에 복제 팩터 추가
- 현재 파티션 할당
{
"partitions": [
{
"topic": "my-topic",
"partition": 0,
"replicas": [
1
]
}
],
"version": 1
}
- 복제 팩터 증가 시
{
"partitions": [
{
"partition": 0,
"replicas": [
1,
2
],
"topic": "my-topic"
}
],
"version": 1
}
로그 세그먼트 내용 확인
- 메시지의 특정 내용을 확인
- 파티션의 로그 세그먼트 파일의 내용 해독
- 로그 세그먼트 파일들을 쉼표로 구분하여 인자로 전달
******** 접속
[******** ~]$ docker exec -it 48ee47b4d949 /bin/bash
/usr/local/kafka/config
server.properties
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
/tmp/kafka-logs/test-topic1-0
[admin@kafka-broker3 test-topic-2]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log
Dumping 00000000000000000000.log
Starting offset: 0
[admin@kafka-broker3 test-topic-2]$
[admin@kafka-broker3 test-topic-2]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log -- print-data-log
Dumping 00000000000000000000.log
Starting offset: 0
- 로그 세그먼트의 인덱스 파일 확인
- 해당 파일이 손상된 경우 컨슈머가 메시지 읽을 때 에러 발생
- --index-sanity-check
- 인덱스가 사용 가능한 상태인지 검사, 모든 인덱스 항목 출력
- --verify-index-only
- 메시지 데이터와 일치하지 않는 인덱스 항목 검사
kafka-run-class.sh kafka.tools.DumpLogSegments --files
00000000000052368601.index,00000000000052368601.log
--index-sanity-check
Dumping 00000000000052368601.index
00000000000052368601.index passed sanity check.
Dumping 00000000000052368601.log
Starting offset: 52368601
offset: 52368601 position: 0 NoTimestampType: -1 isvalid: true
payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc:
1194341321
offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true
payloadsize: 895 magic: 0 compresscodec: GZIPCompressionCodec crc:
278946641
offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true
payloadsize: 665 magic: 0 compresscodec: GZIPCompressionCodec crc:
3767466431
...
레플리카 확인
- 파티션 복제 작업
- 일반 카프카 컨슈머 클라이언트가 하는 일과 유사
- 팔로어 레플리카가 현재 오프셋 까지 주기적으로 메시지 복제
- 일반 카프카 컨슈머 클라이언트가 하는 일과 유사
- 클러스터 전체의 파티션 레플리카들이 메시지 차이 없이 복제를 하고 있는지 확인하기 위한 도구
- kafka-replica-verification.sh
- 지정된 토픽 파티션들과 모든 메시지가 모든 레플리카에 있는지 읽어서 검사
- 주의
- 클러스터에 주는 영향
- 레플리카를 확인하기 위해 가장 오래된 오프셋부터 모든 메시지를 읽으므로 클러스터에 영향
- 각 파티션의 모든 레플리카로부터 병렬로 메시지를 읽음
- 샘플
kafka-replica-verification.sh --broker-list kafka-test-broker.example.com:9092 --topic-white-list 'test*'
2019-03-23 23:40:59,871: verification process is started.
2019-03-23 23:41:30,007: max lag is 0 for partition test-0 at offset 47 among 1 partitions
2019-03-23 23:42:00,099: max lag is 0 for partition test-0 at offset 47 among 1 partitions
2019-03-23 23:42:30,189: max lag is 0 for partition test-0 at offset 47 among 1 partitions
2019-03-23 23:43:00,314: max lag is 0 for partition test-0 at offset 47 among 1 partitions
2019-03-23 23:43:30,418: max lag is 0 for partition test-0 at offset 47 among 1 partitions
2019-03-23 23:44:00,521: max lag is 0 for partition test-0 at offset 47 among 1 partitions
2019-03-23 23:44:30,613: max lag is 0 for partition test-0 at offset 47 among 1 partitions
2019-03-23 23:45:00,699: max lag is 0 for partition test-0 at offset 47 among 1 partitions
2019-03-23 23:45:30,798: max lag is 0 for partition test-0 at offset 47 among 1 partitions
클라이언트 ACL
안전하지 않은 작업
- 기술적으로는 가능하지만 심각한 상황이 아니면 하지 말하야 하는 관리 작업
클러스터 컨트롤러 변경
- 모든 카프카 클러스터에는 컨트롤러 존재
- 여러 브로커 중 하나에서 실행되는 쓰레드
- 클러스터 작업을 감독하는 책임
- 컨트롤러에서 예외가 발생하거나 실행은 되고 있지만 정상적으로 동작하지 못하는 경우 변경
- zookeeper 최상위 노드 중 /controller 에 등록
- 노드를 삭제할 경우 새로운 컨트롤러 자동 선출
파티션 재할당 작업 없애기
- 파티션 재할당의 정상적인 작업 처리 요청
- 재할당 요청 (주키퍼 노드 생성)
- 클러스터 컨트롤러가 새로운 브로커들에게 파티션 할당
- 데이터가 동기화 될때까지 새로운 브로커들이 각 파티션을 복제하기 시작
- 파티션 레플리카 리스트에 등록된 이전 브로커들을 클러스터 컨트롤러가 삭제
- 모든 재할당 작업은 병렬로 시작되므로 재할당 작업이 취소되는 경우는 없음
- 재할당 도중 브로커에 장애가 생겨 곧바로 다시 시작할 수 없는 경우, 어떤 추가적인 재할당도 시작되지 않음
- 진행중인 파티션 재할당 작업을 제거 하는 과정
- 주키퍼 노드의 카프카 클러스터 경로에서 /admin/reassign_partitions 삭제
- 강제로 컨트롤러 변경
- 주의
- 진행중인 재할당 작업 제거 시 복제 팩터가 설정값보다 더 큰값이 되어 있을 수 있으므로 작업 완료 시 올바르게 설정되어 있는지 확인
토픽 삭제하기
- 토픽 삭제 요청 시 삭제를 요청하는 주키퍼 노드 생성
- /admin/delete_topic
수동으로 토픽 삭제하기
- 주의
- 클러스터가 온라인상태일떄 주키퍼 메타데이터를 변경하는것음 위험
- 과정
- 주키퍼의 모든 브로커 셧다운
- 주키퍼에서 /brokers/topics/토픽이름 삭제
- 각 브로커의 로그 디렉토리에서 해당 파티션 디렉터리들을 삭제
- 디렉터리 이름 토픽이름-#num
- 브로커 재시작
'개발관련 > 오픈소스(들?)' 카테고리의 다른 글
Opensearch (aws elasticsearch) (0) | 2021.08.08 |
---|---|
elasticsearch percolate search POC (0) | 2021.07.07 |
로컬 환경에 kakfa 설치 및 구성 (0) | 2021.06.23 |
elasticsearch 의 저장 과정 (0) | 2021.05.24 |
kafka 2.8.0 (0) | 2021.05.16 |
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- Generic
- Java
- kafka 2.8.0
- 에픽테토스
- Spring
- 만들면서 배우는 클린 아키텍처
- completablefuture
- pecs
- opensearch
- 기술블로그
- 기술사이트
- Dangling
- 말의품격
- AWS
- jhipster
- Async
- meta character
- fsync
- 클린 아키텍처
- 기술센싱
- Kafka
- flush
- COMMIT
- 개발자
- elasticsearch
- percolate
- 전설로떠나는월가의영웅
- WebSocket
- PatternSyntaxException
- 사기꾼증후군
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
글 보관함