티스토리 뷰

들어가는 말

개요

카프카는 클러스터 관리를 위해 CLI 도구를 제공합니다.

메시지 읽기와 쓰기 도구

  • kafka-console.consumer.sh
  • kafka-console.producer.sh
  • 주의
    • 콘솔 컨슈머 사용 시 메시지 유실 가능
    • 콘솔 프로듀서는 카프카 프로듀서의 모든 기능을 사용할 수 없음
    • 메시지 전송이 까다로움
    • 어플리케이션에서는 자바 클라이언트 라이브러리나, 다른 언어의 서드파티 클라이언트 사용

콘솔 프로듀서

kafka-console-producer.sh --broker-list kafka-test-broker.example.com:9092 --topic test-xep
  • 프로듀서 구성 설정
    • --producer.config 설정파일
    • --producer-property key=value
  • 콘솔 프로듀서 옵션
    • --key-serializer 클래스 이름
      • kafka.serializer.DefaultEncoder
    • --value-serializer 클래스 이름
      • kafka.serializer.DefaultEncoder
    • --compression-code 문자열
      • none, gzip, snappy, lz4
    • --sync
      • 동기화 방식 설정

메시지 라인을 읽는 리더 클래스 옵션

  • kafka.tools.LineMessageReader
    • stdin 으로 부터 메시지 읽어서 프로듀서 레코드를 생성하는 클래스
    • --property
      • ignore.error
        • parse.key 가 'true' 이며 키의 구분자가 없을때 'false' 로 되어 있으면 예외 발생시킴
      • parse.key
        • 'false' 로 설정시 항상 key를 nul로 지정
      • key.seperator
        • 메시지를 읽을때 키와 값을 구분하는 문자 지정, 기본 \t

콘솔 컨슈머

  • kafka-console-consumer.sh
    • DefaultFormatter를 사용하여 그대로 출력
  • 주의
    • 콘솔 컨슈머는 카프카와 같은 버전을 사용
    • 구버전의 콘솔 컨슈머는 잘못된 방법으로 주키퍼와 상호작용하므로 클러스터에 손상
  • 구버전
    • --zookeeper 옵션
kafka-console-consumer.sh --zookeeper zoo-test1.example.com:2181 --topic test
  • 신버전
    • --new-consumer, --bootstrap-server 옵션
 kafka-console-consumer.sh --bootstrap-server kafka-test-broker.example.com:9092 --topic test-xep --from-beginning
  • 대상 토픽
    • --topic
      • 하나의 토픽
    • --whitelist
      • 정규식과 일치하는 모든 토픽
    • --blacklist
      • 정규식과 일치하지 않는 모든 토픽
  • 컨슈머 구성 설정
    • --consumer.config 설정파일
    • --consumer-property key=value
  • 주의
    • --property 옵션의 경우 메시지 포맷터에 구성 옵션을 전달 할 때에만 사용
  • 콘솔 컨슈머 옵션
    • --formatter 클래스 이름
      • kafka.tools.DefaultFormatter
    • --from-beginning
    • --max-message #num
    • --partition #num
      • id 가 num 인 파티션에서만 메시지를 읽음

메시지 포맷터 옵션

  • kafka.tools.LoggingMessageFormatter
    • 표준 출력이 아닌 로거를 사용해서 출력
  • kafka.tools.LoggingMessageFormatter
    • 메시지 체크섬만 출력
  • kafka.tools.NoOpMessageFormatter
    • 메시지 출력하지 않음
  • kafka.tools.DefaultMessageFormatter 의 옵션
    • print.timestamp
      • 'true' 일 경우 각 메시지의 타임 스탬프 표시
    • print.key
      • 'true' 일때 메시지에 키 표시
    • key.seperator
      • 메시지를 출력할 때 키와 값을 구분하는 문자 지정
    • line.seperator
      • 구분자
    • key.deserializer
    • value.deseiralizer

오프셋 토픽 읽기

  • 클러스터 컨슈머 그룹에 어떤 오프셋이 커밋되어 있는지 확인
    * 특정 컨슈머 그룹이 커밋하는 오프셋 확인
    * 오프셋이 얼마나 자주 커밋되어 있는지 확인
  • __consumer_offsets
    • 모든 컨슈머들의 오프셋들이 메시지로 저장
    • kafka.coordinator.GroupMetadataManager$OffstsMessageFormatter
    • kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter
kafka-console-consumer.sh  --bootstrap-server kafka-test-broker.example.com:9092 --topic __consumer_offsets --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'

토픽작업

  • kafka-topics.sh
    • 대부분의 토픽 작업 가능
    • 생성
    • 변경
    • 삭제
    • 정보 조회
  • --zookeeper 인자로 주키퍼 연결 문자열 지정 필요
  • 주의
    • 카프카의 CLI 도구의 경우 브로커를 통하지 않고 주키퍼에 직접 접근하므로 브로커와 동일한 버전의 도구 사용 필요

토픽 생성

  • 필요한 인자
    • 토픽 이름
    • 복제 팩터
    • 파티션
  • 참고
    • kafka-topics.sh의 인자로 --config 으로 구성을 override 할 수 있음
    • __ 이름을 가진 토픽의 경우 클러스터 내부적으로 사용하는 토픽으로 간주
    • 토픽 이름에 . 이 들어가 경우 메트릭에서는 _ 로 대체
      • topic.1 -> topic_1
  • 실행
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --create --topic topic-name --replication-factor 1 --partitions 1
  • 랙 인식을 원하지 않을때
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --create --topic topic-name --replication-factor 1 --partitions 1 --disable-rack-aware
  • 토픽 이름이 없을때에만 생성
    • 해당 옵션이 없으면 동일한 이름이 있을 경우 에러 발생
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --create --topic topic-name --replication-factor 1 --partitions 1 --disable-rack-aware --if-not-exists

파티션 추가하기

  • 이미 존재하는 토픽의 파티션 개수를 증가
  • --alter 인자 사용
  • 목적
    • 토픽의 크기를 확장
    • 각 파티션의 처리량을 감소
    • 하나의 컨슈머 그룹에서 컨슈머를 증가시키기 위해
  • 주의
    • 메시지에 키가 있는 토픽에 파티션을 추가할 경우 메시지 키에 매핑되는 파티션이 바뀔 수 있음
  • 정보
    • --if-exists 와 --alter 를 같이 사용하는 것은 권장한지 않음
      • 토픽이 없을때 에러가 반환되어야 함
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --create --topic topic-name --partitions 1 --if-not-exists
  • 경고
    • 파티션의 개수는 줄일 수 없음
      • 파티션을 삭제할 경우에는 데이터도 삭제되므로
      • 데이터 순서도 변경되므로 다른 파티션에 재분배 하기도 어려움
    • 필요할 경우 토픽 삭제 후 재생성

토픽 삭제하기

kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --delete --topic topic-name
  • 번외
    • 바로 삭제되는 것은 아니며, 컨슈머, 프로듀서에서 처리가 안되고 이후 재시작 시 삭제됨
    • 브로커 옵션으로 즉시 삭제 가능
      • delete.topic.enable=true

토픽 조회

kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --list

토픽 상세 조회

  • --describe
  • --topic 옵션을 덧붙일 경우 특정 토픽 상세 정보 조회
  • 출력되는 정보
    • 파티션 개수
    • 토픽 구성 오버라이드
    • 각 파티션과 할당된 레플리카 내역
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --describe
  • --describe 와 조합하여 사용
    • 클러스터 트러블 슈팅을 위해
    • --topic 으로 특정 토픽을 지정하지 않음
    • --list 명령과는 함께 동작하지 않음
  • --topic-with-overrides
    • 브로커 설정의 기본값을 재설정한 토픽을 찾을때
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --describe --topics-with-overrides
  • --underreplicated-partitions
    • 리더 레플리카가 있으면서 하나 이상의 레플리카가 동기화 되지 않은 모든 파티션
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --describe --under-replicated-partitions
  • --unavailable-partitions
    • 리더가 없는 모든 파티션 조회
kafka-topics.sh --zookeeper zoo-test1.example.com:2181 --describe --unavailable-partitions

컨슈머 그룹

  • 컨슈머 그룹 정보
    • 구버전 (0.9.0 이전)
      • 주키퍼에서 관리
    • 신버전 (0.9.0 이후)
      • 카프카 브로커 내부에서 관리
  • kafka-consumer-groups.sh
    • 모든 버전의 컨슈머 그룹 리스트와 오프셋 정보를 포함한 상세 내역 조회
    • 구버전에서만 가능한 기능
      • 컨슈머 그룹과 오프셋 정보를 삭제
    • 구버전 컨슈머 클라이언트 정보 조회
      • --zookeeper 인자 사용
    • 신버전 컨슈머 클라이언트 정보 조회
      • --bootstrap-server 인자 사용

컨슈머 그룹의 리스트와 상세정보 조회

  • 구버전 컨슈머 클라이언트 목록 조회
    • 테스트 환경에서는 사용 불가
    • 로그앤크래시 알파 환경 카프카
./kafka-consumer-groups.sh --zookeeper logncrash-alpha-zk:10013/NHN/NELO2/kafka --list
Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).

nelo2-notifier-group
nelo2-legacy-notifier-group
logger-decryptor-group
  • 신버전 컨슈머 클라이언트 목록 조회
kafka-consumer-groups.sh --bootstrap-server kafka-test-broker.example.com:9092 --list
  • 구버전 컨슈머 클라이언트 컨슈머 그룹 상세 조회
    • 로그앤크래시 알파환경 카프카
./kafka-consumer-groups.sh --zookeeper logncrash-alpha-zk:10013/NHN/NELO2/kafka --describe --group nelo-percoalt or-notifier-group
Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).

Error: Executing consumer group command failed due to org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/nelo-percoaltor-notifier-group/ids
  • 신버전 컨슈머 클라이언트 컨슈머 그룹 상세 조회
    • 파티션
    • 현재오프셋
    • 마지막오프셋
    • 컨슈머아이디
    • 호스트
    • 클라이언트아이디
kafka-consumer-groups.sh --bootstrap-server kafka-test-broker.example.com:9092 --describe --group consumer-group-name

컨슈머 그룹 삭제하기

  • 정보
    • 구버전 컨슈머 클라이언트에만 지원
  • 주의
    • 해당 그룹의 모든 컨슈머 그룹을 셧다운 해야함
kafka-consumer-groups.sh --zookeeper zoo-test1.example.com:2181 --delete --group consumer-group-name
  • 특정 토픽의 오프셋 정보만 삭제하는것도 가능
kafka-consumer-groups.sh --zookeeper zoo-test1.example.com:2181 --delete --group consumer-group-name --topic topic-name

오프셋관리 (컨슈머그룹)

  • 구버전의 컨슈머 클라이언트 이용
  • 기능
    • 컨슈머 그룹의 오프셋 조회
    • 컨슈머 그룹의 오프셋 삭제
    • 컨슈머 그룹의 오프셋 내보내기
    • 새로운 오프셋 저장
      • 오프셋을 돌릴때 사용
      • 오프셋을 넘길떄 사용

오프셋 내보내기

  • 자바 클래스 직접 호출
    • (테스트 환경에서는 지원 불가, 클래스 존재하지 않음)
kafka-run-class.sh kafka.tools.ExportZkOffsets
--zkconnect zoo1.example.com:2181/kafka-cluster --group testgroup
--output-file offsets
cat offsets
/consumers/testgroup/offsets/my-topic/0:8905
/consumers/testgroup/offsets/my-topic/1:8915
/consumers/testgroup/offsets/my-topic/2:9845
/consumers/testgroup/offsets/my-topic/3:8072
/consumers/testgroup/offsets/my-topic/4:8008
/consumers/testgroup/offsets/my-topic/5:8319
/consumers/testgroup/offsets/my-topic/6:8102
/consumers/testgroup/offsets/my-topic/7:12739

오프셋 가져오기

  • 자바 클래스 직접 호출
    • 오프셋 내보내기로 생성한 파일을 기준으로 재설정
  • 주의
    • 해당 그룹의 모든 컨슈머들을 중단해야함
kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect
zoo1.example.com:2181/kafka-cluster --input-file offsets

동적 구성 변경 (클러스터)

  • 클러스터 실행 중 토픽과 클라이언트의 구성 변경 가능
  • kafka-configs.sh사용
  • 설정된 구성은 클러스터에 영구적으로 적용
    • 주키퍼에 저장되며 브로커 시작 시 읽어서 사용
  • 동적 구성 종류
    • per-topic
    • per-client

토픽 기본 구성 재설정

  • 하나의 클러스터에서 토픽 별 설정 변경
  • 기본 토픽 설정은 브로커 구성에서 지정된 기본값 적용
kafka-configs.sh --zookeeper zoo-test1.example.com:2181
--alter --entity-type topics --entity-name 
--add-config =[,=...]
kafka-configs.sh --zookeeper zoo-test1.example.com:2181 --alter --entity-type topics --entity-name test --add-config retention.ms=3600000
Completed Updating config for entity: topic 'test'.

클라이언트 기본 구성 재설정

  • 카프카 클라이언트 (프로듀서, 컨슈머의) 할당량(quota) 조정
    • 클라이언트 ID의 모든 클라이언트에게 허용되는 초당 바이트 비율
      • 하나의 브로커를 기준으로 프로듀서가 메시지를 쓰거나 읽을때 허용된 한도
      • e.g. 클러스터에 5개 브로커, 클라이언트의 프로듀서 할당량이 10MB/s 일 경우, 전체 합계는 50MB/s
  • 종류
    • producer_bytes_rate
    • consumer_bytes_rate
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type clients --entity-name

구성 오버라이드 조회하기

kafka-configs.sh --zookeeper zoo-test1.example.com:2181 --describe --entity-type topics --entity-name test
Configs for topic 'test' are retention.ms=3600000

구성 오버라이드 삭제하기

  • 동적 구성 오버라이드는 삭제 가능
    • 이후 기본값으로 원상 복귀
kafka-configs.sh --zookeeper zoo-test1.example.com:2181 --alter --entity-type topics --entity-name test --delete-config retent ion.ms
Completed Updating config for entity: topic 'test'.

파티션 관리

  • 종류
    • 리더 레플리카 재선출
    • 브로커에게 파티션 할당 (저수준)

선호 레플리카 선출

  • 토픽은 다수의 파티션으로 구성, 파티션은 다수의 레플리카로 복제 가능
  • 하나의 레플리카 브로커만이 파티션 리더
  • 모든 프로듀서와 컨슈머의 작업은 파티션 리더의 브로커에서 수행
  • 카프카 내부에서 해당 브로커가 첫번째 ISR로 정의
    • 해당 브로커가 재시작 할떄 어떤 파티션의 리더도 맡지 않게 됨
  • 주의
    • 파티션 리더 자동 조정
      • 브로커 구성을 통해 자동으로 파티션 리더 조정 가능
      • 프로덕션 환경에서는 성능 이슈로 사용이 권장되지 않음
        • 기능 동작 중 프로듀서 컨슈머 기능이 일시정지
  • kafka-preffered-replica-election.sh
    • 운영자가 원하는 시점에서 각 파티션의 리더 선출 가능
 kafka-preferred-replica-election.sh --zookeeper zoo-test1.example.com:2181
  • 파티션 개수가 많은 클러스터에서 한번의 선호 레플리카 선출로 실행되지 않음
    • 선호 레플리카 선출 요청은 클러스터 메타데이터의 znode(주키퍼)에 저장
    • 요청 내용 크기가 주키퍼 기본 노드(기본 1MB) 보다 크면 요청 실패
    • 요청을 여러번 나누어서 실행
{
    "partitions": [
        {
            "partition": 1,
            "topic": "foo"
        },
        {
            "partition": 2,
            "topic": "foobar"
        }
    ]
}
 kafka-preferred-replica-election.sh --zookeeper zoo-test1.example.com:2181  --path-to-json-file
partitions.json

파티션의 레플리카 변경

  • 파티션의 레플리카 할당을 수동으로 변경
    • 토픽의 파티션들이 브로커들에게 고르게 분배되지 않았을 때
    • 브로커가 오프라인이 될 때
    • 새로운 브로커가 추가 되었을 때
  • kafka-reassign-partitions.sh
  • 작업 단계
    • 브로커 리스트와 토픽 리스트를 사용하여 파티션 재할당 리스트 생성
    • 리스트대로 파티션 재할당 실행
    • (생략가능) 재할당으로 생성된 리스트를 사용하여 파티션 재할당의 진행도 확인
  • 파티션 재할당 리스트 생성을 위한 기본 정보
    • json 파일
    • version 은 1로 고정
{
    "topics": [
        {
            "topic": "test-topic2"
        },
        {
            "topic": "test-topic1"
        }
    ],
    "version": 1
}
  • 파티션 재할당 리스트 생성
    • 0,1 브로커에서 2,3 브로커로 재할당
kafka-reassign-partitions.sh --zookeeper zoo-test1.example.com:2181 --generate --topics-to-move-json-file topics.json --broker-list 2,3
  • 파티션 재할당 리스트 생성 샘플
# kafka-reassign-partitions.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --generate
--topics-to-move-json-file topics.json --broker-list 2,3
Current partition replica assignment

{"version":1,"partitions":[{"topic":"my-topic","partition":5,"replicas":[0,1]},{"topic":"my-topic","partition":10,"replicas":[1,0]},{"topic":"my-topic","partition":1,"replicas":[0,1]},{"topic":"my-topic","partition":4,"replicas":[1,0]},{"topic":"my-topic","partition":7,"replicas":[0,1]},{"topic":"my-topic","partition":6,"replicas":[1,0]},{"topic":"my-topic","partition":3,"replicas":[0,1]},{"topic":"my-topic","partition":15,"replicas":[0,1]},{"topic":"my-topic","partition":0,"replicas":[1,0]},{"topic":"my-topic","partition":11,"replicas":[0,1]},{"topic":"my-topic","partition":8,"replicas":[1,0]},{"topic":"my-topic","partition":12,"replicas":[1,0]},{"topic":"my-topic","partition":2,"replicas":[1,0]},{"topic":"my-topic","partition":13,"replicas":[0,1]},{"topic":"my-topic","partition":14,"replicas":[1,0]},{"topic":"my-topic","partition":9,"replicas":[0,1]}]}
Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"my-topic","partition":5,"replicas":[2,3]},{"topic":"my-topic","partition":10,"replicas":[3,2]},{"topic":"my-topic","partition":1,"replicas":[2,3]},{"topic":"my-topic","partition":4,"replicas":[3,2]},{"topic":"my-topic","partition":7,"replicas":[2,3]},{"topic":"my-topic","partition":6,"replicas":[3,2]},{"topic":"my-topic","partition":15,"replicas":[2,3]},{"topic":"my-topic","partition":0,"replicas":[3,2]},{"topic":"my-topic","partition":3,"replicas":[2,3]},{"topic":"my-topic","partition":11,"replicas":[2,3]},{"topic":"my-topic","partition":8,"replicas":[3,2]},{"topic":"my-topic","partition":12,"replicas":[3,2]},{"topic":"my-topic","partition":13,"replicas":[2,3]},{"topic":"my-topic","partition":2,"replicas":[3,2]},{"topic":"my-topic","partition":14,"replicas":[3,2]},{"topic":"my-topic","partition":9,"replicas":[2,3]}]}
#
  • 실행 결과를 별도의 파일로 저장 한뒤 명령 수행
kafka-reassign-partitions.sh --zookeeper zoo-test1.example.com:2181 --execute
--reassignment-json-file reassign.json
  • 동작
    • 클러스터 컨트롤러에서 각 파티션의 레플리카 리스트에 새로운 레플리카 추가
      • 레플리케이션 팩터 증가
    • 새로운 레플리카 브로커들에서 현재 리더 레플리카로부터 파티션 복제
      • 시간 소요
    • 복제 완료 시 컨트롤러가 레플리카 리스트로부터 이전 레플리카 제거
      • 레플리케이션 팩터 감소
  • 고려사항
    • 브로커로부터 많은 파티션이 제거 될 때, 다시 시작 한 뒤 재할당을 시작
    • 해당 브로커가 리더로 할당되었던 파티션들이 새로운 리더로 클러스터의 다른 브로커들이 지정
  • 검증
    • 파티션 재할당이 실행중이거나 완료된 이후 재할당 (진행 상태 혹은 에러 발생) 확인
    • 작업에 사용되었던 JSON 객체를 가진 파일을 인자로 사용
    • 샘플
 kafka-reassign-partitions.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --verify
--reassignment-json-file reassign.json
Status of partition reassignment:
Reassignment of partition [my-topic,5] completed successfully
Reassignment of partition [my-topic,0] completed successfully
Reassignment of partition [my-topic,7] completed successfully
Reassignment of partition [my-topic,13] completed successfully
Reassignment of partition [my-topic,4] completed successfully
Reassignment of partition [my-topic,12] completed successfully
Reassignment of partition [my-topic,6] completed successfully
Reassignment of partition [my-topic,11] completed successfully
Reassignment of partition [my-topic,10] completed successfully
Reassignment of partition [my-topic,9] completed successfully
Reassignment of partition [my-topic,2] completed successfully
Reassignment of partition [my-topic,14] completed successfully
Reassignment of partition [my-topic,3] completed successfully
Reassignment of partition [my-topic,1] completed successfully
Reassignment of partition [my-topic,15] completed successfully
Reassignment of partition [my-topic,8] completed successfully
  • 정보
    • 재할당 작업의 배치 처리
      • 파티션 재할당 작업은 클러스터에 큰 영향
        • 메모리 페이지 캐시 변경
        • 네트워크 사용
        • 디스크 IO
      • 재할당 작업 진행시 작은 단위 (토픽별 등)으로 진행

복제 팩터 변경하기

  • 파티션의 복제 팩터 값을 증가 또는 감소 하는 기능
    • 문서화 되지 않은 기능
    • 토픽 생성 시 복제 팩터에 필요한 만큼 브로커가 없을때 감소 필요
    • 파티션 재할당 단계의 실행 단계에서 사용되는 JSON에 복제 팩터 추가
  • 현재 파티션 할당
{
    "partitions": [
        {
            "topic": "my-topic",
            "partition": 0,
            "replicas": [
                1
            ]
        }
    ],
    "version": 1
}
  • 복제 팩터 증가 시
{
    "partitions": [
        {
            "partition": 0,
            "replicas": [
                1,
                2
            ],
            "topic": "my-topic"
        }
    ],
    "version": 1
}

로그 세그먼트 내용 확인

  • 메시지의 특정 내용을 확인
  • 파티션의 로그 세그먼트 파일의 내용 해독
    • 로그 세그먼트 파일들을 쉼표로 구분하여 인자로 전달
********    접속
[******** ~]$ docker exec -it 48ee47b4d949 /bin/bash

/usr/local/kafka/config 
server.properties
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

/tmp/kafka-logs/test-topic1-0


[admin@kafka-broker3 test-topic-2]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log
Dumping 00000000000000000000.log
Starting offset: 0
[admin@kafka-broker3 test-topic-2]$

[admin@kafka-broker3 test-topic-2]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log  -- print-data-log
Dumping 00000000000000000000.log
Starting offset: 0
  • 로그 세그먼트의 인덱스 파일 확인
    • 해당 파일이 손상된 경우 컨슈머가 메시지 읽을 때 에러 발생
    • --index-sanity-check
      • 인덱스가 사용 가능한 상태인지 검사, 모든 인덱스 항목 출력
    • --verify-index-only
      • 메시지 데이터와 일치하지 않는 인덱스 항목 검사
kafka-run-class.sh kafka.tools.DumpLogSegments --files
00000000000052368601.index,00000000000052368601.log
--index-sanity-check
Dumping 00000000000052368601.index
00000000000052368601.index passed sanity check.
Dumping 00000000000052368601.log
Starting offset: 52368601
offset: 52368601 position: 0 NoTimestampType: -1 isvalid: true
payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc:
1194341321
offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true
payloadsize: 895 magic: 0 compresscodec: GZIPCompressionCodec crc:
278946641
offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true
payloadsize: 665 magic: 0 compresscodec: GZIPCompressionCodec crc:
3767466431
...

레플리카 확인

  • 파티션 복제 작업
    • 일반 카프카 컨슈머 클라이언트가 하는 일과 유사
      • 팔로어 레플리카가 현재 오프셋 까지 주기적으로 메시지 복제
  • 클러스터 전체의 파티션 레플리카들이 메시지 차이 없이 복제를 하고 있는지 확인하기 위한 도구
  • kafka-replica-verification.sh
    • 지정된 토픽 파티션들과 모든 메시지가 모든 레플리카에 있는지 읽어서 검사
  • 주의
    • 클러스터에 주는 영향
    • 레플리카를 확인하기 위해 가장 오래된 오프셋부터 모든 메시지를 읽으므로 클러스터에 영향
    • 각 파티션의 모든 레플리카로부터 병렬로 메시지를 읽음
  • 샘플
kafka-replica-verification.sh --broker-list kafka-test-broker.example.com:9092 --topic-white-list 'test*'
2019-03-23 23:40:59,871: verification process is started.
2019-03-23 23:41:30,007: max lag is 0 for partition test-0 at offset 47 among 1 partitions
2019-03-23 23:42:00,099: max lag is 0 for partition test-0 at offset 47 among 1 partitions
2019-03-23 23:42:30,189: max lag is 0 for partition test-0 at offset 47 among 1 partitions
2019-03-23 23:43:00,314: max lag is 0 for partition test-0 at offset 47 among 1 partitions
2019-03-23 23:43:30,418: max lag is 0 for partition test-0 at offset 47 among 1 partitions
2019-03-23 23:44:00,521: max lag is 0 for partition test-0 at offset 47 among 1 partitions
2019-03-23 23:44:30,613: max lag is 0 for partition test-0 at offset 47 among 1 partitions
2019-03-23 23:45:00,699: max lag is 0 for partition test-0 at offset 47 among 1 partitions
2019-03-23 23:45:30,798: max lag is 0 for partition test-0 at offset 47 among 1 partitions

클라이언트 ACL

안전하지 않은 작업

  • 기술적으로는 가능하지만 심각한 상황이 아니면 하지 말하야 하는 관리 작업

클러스터 컨트롤러 변경

  • 모든 카프카 클러스터에는 컨트롤러 존재
    • 여러 브로커 중 하나에서 실행되는 쓰레드
    • 클러스터 작업을 감독하는 책임
  • 컨트롤러에서 예외가 발생하거나 실행은 되고 있지만 정상적으로 동작하지 못하는 경우 변경
  • zookeeper 최상위 노드 중 /controller 에 등록
    • 노드를 삭제할 경우 새로운 컨트롤러 자동 선출

파티션 재할당 작업 없애기

  • 파티션 재할당의 정상적인 작업 처리 요청
    • 재할당 요청 (주키퍼 노드 생성)
    • 클러스터 컨트롤러가 새로운 브로커들에게 파티션 할당
    • 데이터가 동기화 될때까지 새로운 브로커들이 각 파티션을 복제하기 시작
    • 파티션 레플리카 리스트에 등록된 이전 브로커들을 클러스터 컨트롤러가 삭제
  • 모든 재할당 작업은 병렬로 시작되므로 재할당 작업이 취소되는 경우는 없음
    • 재할당 도중 브로커에 장애가 생겨 곧바로 다시 시작할 수 없는 경우, 어떤 추가적인 재할당도 시작되지 않음
  • 진행중인 파티션 재할당 작업을 제거 하는 과정
    • 주키퍼 노드의 카프카 클러스터 경로에서 /admin/reassign_partitions 삭제
    • 강제로 컨트롤러 변경
  • 주의
    • 진행중인 재할당 작업 제거 시 복제 팩터가 설정값보다 더 큰값이 되어 있을 수 있으므로 작업 완료 시 올바르게 설정되어 있는지 확인

토픽 삭제하기

  • 토픽 삭제 요청 시 삭제를 요청하는 주키퍼 노드 생성
    • /admin/delete_topic

수동으로 토픽 삭제하기

  • 주의
    • 클러스터가 온라인상태일떄 주키퍼 메타데이터를 변경하는것음 위험
  • 과정
    • 주키퍼의 모든 브로커 셧다운
    • 주키퍼에서 /brokers/topics/토픽이름 삭제
    • 각 브로커의 로그 디렉토리에서 해당 파티션 디렉터리들을 삭제
      • 디렉터리 이름 토픽이름-#num
    • 브로커 재시작

'개발관련 > 오픈소스(들?)' 카테고리의 다른 글

Opensearch (aws elasticsearch)  (0) 2021.08.08
elasticsearch percolate search POC  (0) 2021.07.07
로컬 환경에 kakfa 설치 및 구성  (0) 2021.06.23
elasticsearch 의 저장 과정  (0) 2021.05.24
kafka 2.8.0  (0) 2021.05.16
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함