티스토리 뷰

들어가는 말

  • 과거 kafka study 했던 자료를 한번 복기 하면서 정리합니다.
    • https://www.aladin.co.kr/shop/wproduct.aspx?ItemId=171000274
  • docker 환경을 상정하고 있긴하지만 작업 내용을 곰곰히 살펴보면 로컬 환경에 설치하는것을 상정하고 있습니다.
  • 테스트 환경이므로 HA를 위해 앙상블을 구성하지는 않습니다.
    • 애초에 kakfa 가 jvm 기반이지만 메모리를 많이 사용하지는 않으므로 디스크만 적절히 선택하면 된다고 생각합니다.
  • 컨테이너 환경에다가 kakfa 를 구성하고 외부에서 접속하게 하면 port 문제 이외에도 advertised host 쪽에서 이슈가 발생할 수 있습니다.

준비물

  • 번외이긴 하지만 docker 환경을 상정하고 있습니다.
    • docker 가 실행 가능한 환경이 필요합니다.
  • (주키퍼와 카프카를 띄우는것이 주 목적이므로 현 시점에서 도커가 꼭 필요하진 않습니다.)

제일먼저 할 일

운영체제 선택하기

  • 자바 어플리케이션이므로 운영체제에 무관
    • 윈도우, 맥, 리눅스 등
    • 리눅스 환경을 권장
    • (정확히는 스칼라로 짜여져 있습니다.)

번외 docker 로 환경 설정

  • docker 로 centos 컨테이너 이미지를 받은 뒤, 그 안에 수동으로 카프카를 설치하는 과정에 해당합니다.
  • 최초 환경 설정부터 진행할 예정으로 순수 centos 이미지 위에서 작업 진행합니다.
    • 다만 이후에는 kafka 가 패키징 된 docker image를 사용하는것을 권장드립니다.
      • 도커 이미지
        • wurstmeister/kafka:latest
  • https://docs.docker.com/engine/reference/commandline/run/
  • entry point 를 따로 두는 법을 확인할 수 없어 별도로 아래와 같이 실행시켜 둡니다
docker pull centos:7
docker images
docker run -dit --name="kafka-test-1" centos:7 /bin/bash
# docker run -d --name="kafka-test-1" centos:7
# (or -p port:port)

# docker exec -it container_id /bin/bash
docker start container_id
docker attach container_id

번외2 docker 를 통한 kafka image 설치

  • 간단하게 카프카를 이용한 POC를 위해서라면 다음 설치 방법을 추천드립니다.
    • 호스트(도커를 돌리는)의 /etc/hosts 에 127.0.0.1 kafka 등록이 필요하며 호스트(도커를 돌리는) 개발 환경에서는 kafka:9092 로 접속 가능합니다
docker network create docker-network

docker run -d --net=docker-network --name=zookeeper -p 2181:2181 -e ZOOKEEPER_CLIENT_PORT=2181 confluentinc/cp-zookeeper:latest
docker run -d --net=docker-network --name=kafka -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka:latest

docker run --net=docker-network --rm confluentinc/cp-kafka:latest kafka-topics --create --topic test --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181

docker run -d --network=docker-network --name=schema-registry -p 8081:8081 -e SCHEMA_REGISTRY_HOST_NAME=127.0.0.1 -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181  confluentinc/cp-schema-registry:latest

docker run --net=docker-network --rm confluentinc/cp-kafka:latest kafka-topics --create --topic test --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181


cd $KAFKA_HOME; bin/kafka-console-producer.sh --broker-list kafka:9092 --topic test

cd $KAFKA_HOME; bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning

java 설치하기

  • java 1.8 설치 소개
    • oracle java 대신 openjdk 를 선택합니다.

zookeeper 설치하기

wget http://apache.mirror.cdnetworks.com/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz
tar xvf zookeeper-3.4.2.tar.gz
mv zookeeper-3.4.12 /usr/local/zookeeper
mkdir -p /var/lib/zookeeper
cat > /usr/local/zookeeper/conf/zoo.cfg << EOF
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
EOF
/usr/local/zookeeper/bin/zkServer.sh start
  • 주키퍼 실행 확인
telnet localhost 2181 
srvr
  • 주키퍼 앙상블 설정
    • (설정은 생략합니다, 테스트 환경에서 HA를 구성할 필요는 없습니다.)
      • (이에따라 카프카에서 주키퍼 앙상블을 설정으로 추가하는 부분도 생략됩니다.)
    • 주키퍼는 카프카와 같은 분산처리 시스템의 서버들의 메타데이터(환경설정 등)을 통합 관리하는데 사용
    • 주키퍼의 클러스터를 앙상블로 지칭
    • 하나의 앙상블은 여러개의 노드로 구성
      • 하나의 서버에만 서비스가 집중되지 않게 분산 하여 동시에 처리
      • 한서버에서 처리한 결과를 다른 서버와 동기화
      • 클라이언트와 연결되어 동작중인 서버에서 문제가 발생할 경우 대기중인 서버중에서 자동 선정하여 이어서 서비스 처리
        • 리더, 팔로워 구조
    • 주키퍼 앙상블은 홀수개의 노드로 구성할 필요가 있습니다.
      • (split brain 방지)

kafka 브로커 설치하기

  • 설치 및 실행
wget http://apache.mirror.cdnetworks.com/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar xvzf kafka_2.12-2.1.0.tgz
mv kafka_2.12-2.1.0 /usr/local/kafka
mkdir /tmp/kafka-logs
/usr/local/kafka/bin/kafka-server-start.sh \
-daemon /usr/local/kafka/config/server.properties
  • test 토픽 생성
 /usr/local/kafka/bin/kafka-topics.sh --create \
 --zookeeper localhost:2181 \
 --replication-factor 1 \
 --partitions 1 \
 --topic test
  • test 토픽 생성 확인
 /usr/local/kafka/bin/kafka-topics.sh  \
 --zookeeper localhost:2181 \
 --describe \
 --topic test
  • test 토픽에 메시지 작성
 /usr/local/kafka/bin/kafka-console-producer.sh  \
 --broker-list localhost:9092 \
 --topic test
 AAA
 BBB
  • test 토픽에 메시지 읽기
 /usr/local/kafka/bin/kafka-console-consumer.sh  \
 --bootstrap-server localhost:9092 \
 --topic test \
 --from-beginning

브로커 구성

  • 많은 옵션들이 기본 값으로 설정되어 있습니다.
  • 특정 상황에 맞게 카프카를 조정할 때에는 해당 설정을 변경해야합니다.
  • broker.id
    • 모든 카프카 브로커는 정수로 된 고유한 broker.id를 가져야 합니다.
  • port
    • 브로커 실행에 필요한 TCP 포트를 나타냅니다.
    • 1024 이하의 값으로 설정이 가능하지만 root 권한이 필요하므로 권장하지 않습니다
  • zoopeeker.connect
    • 브로커의 메타데이터를 저장하기 위한 주키퍼를 지정
    • host:port/zookeeper_path(chroot path)
  • log.dirs
    • 로그 저장위치가 아닙니다.
    • 실제 카프카의 메시지(파티션) 데이터 저장소
    • 쉽표로 구분되어 여러개의 경로를 지정 가능
      • 브로커는 모든 경로에 데이터를 저장
      • 이후 사용빈도가 가장 적은 데이터를 교체하는 방식으로 한 파티션의 모든 로그를 같은 경로에 저장
      • 브로커가 새로운 파티션을 저장할 때는 가장 적은 수의 파티션을 저장한 경로를 사용
  • num.recovery.thread.per.data.dir
    • 카프카는 쓰레드풀을 사용하여 로그 세그먼트 처리 (실제 데이터)
      • 스레드풀이 사용되는 경우
        • 브로커가 정상적으로 시작될떄, 각 파티션의 로그 세그먼트를 열기 위해
        • 장애 발생 이후, 각 파티션의 로그 세그먼트들을 검사하고 불필요한 부분 삭제하기 위해
        • 종료될 떄, 로그 세그먼트 파일을 정상적으로 닫기 위해
      • 로그 디렉터리 하나당 하나의 쓰레드만 사용됨
      • 이 쓰레드들은 브로커와 시작시에만 사용되므로 많은 수의 스레드를 설정하는것이 좋음
      • log.dirs * num.recovery.t.p.d.d 수 만큼의 쓰레드가 생성됩니다.
  • auto.create.topics.enable
    • 자동 토픽 생성 허용 여부
      • 메시지가 들어온 시점, 컨슈머, 클라이언트가 요청했을때 존재하지 않는 토픽을 자동으로 생성할지 여부
    • 자동 생성이 적합하지 않은 경우가 있으므로 제어

토픽의 기본 설정

  • 브로커의 설정 이외에 각 토픽 별 설정이 존재합니다.
  • 파티션 개수나 메시지 보존 설정등에 해당됩니다.
  • 과거 버전의 경우 브로커 설정파일에서 .per.topic 이라는 접미사로 토픽에 설정을 적용할 수도 있었지만 현재 버전에서는 가능하지 않습니다.
    • log.retention.hours.per.topic
    • log.retention.bytes.per.topic
    • log.segment.bytes.per.topic
  • num.partition
    • 한 토픽을 몇개의 파티션으로 구성할지
      • topic auto create 시 적용됩니다
    • 파티션 개수는 증가만 됩니다. (감소는 불가능합니다.)
    • 브로커의 수를 고려하여 모든 브로커가 균일하게 파티션을 나뉘어 가질수 있도록 숫자 조절이 필요합니다.
  • log.retention.ms
    • 로그 보관 시간
      • 해당 시간이 지나면 읽혔던, 읽히지 않았던 데이터는 삭제됩니다.
      • 파일의 마지막 수정시간을 기준으로 하며, 브로커 간 파티션이 이동되거나 하는 경우에는 부정확할수도 있습니다.
    • 구성 파일에 설정할 떄에는 log.retention.hours 를 주로 사용하며 기본값은 168시간
    • log.retention.minuate 도 있으며 모두 같은 맥락으로서 같이 설정될 경우 단위가 가장 작은 설정을 ㅣㅇ용
  • log.retention.bytes
    • 전체 크기를 기준으로 보존 처리
    • 모든 파티션에 각각 적용
      • 총 8개의 파티션이 있고, log.retention.bytes가 1GB 일 경우 해당 토픽에 보존되는 메시지들의 전체 크기는 최대 8GB
      • 파티션이 증가함에 따라 영향을 받음
    • 시간과 크기 같이 적용하여 사용 가능
  • log.segment.bytes
    • 로그 세그먼트 파일을 대상으로 크기 단위 처리
    • 메시지들이 쌓여서 log.segment.bytes 를 넘을 경우 사용중인 로그 세그먼트를 닫고, 새로운 파일을 생성
      • 너무 작은 수치를 설정할 경우 파일 연산 요청이 빈번
      • 너무 큰 수치를 설정할 경우 하나의 로그 세그먼트를 채우는데 오랜 시간이 걸림
        • log.retention.ms는 로그 세그먼트가 닫힌 이후부터 적용
        • timestamp 로 offset 을 찾을때에도 영향을 줌
  • log.segment.ms
    • 로그 세그먼트 파일을 대상으로 시간 단위 처리
    • 지정된 시간이 지나면 로그 세그먼트 파일을 닫음
  • message.max.byte
    • 한 메시지의 크기 제한
    • 기본값으로 1MB 로 설정
      • 최대 메시지 크기를 넘을경우 프로듀서에게 에러를 보내고 메시지를 받지 않음
    • 압축된 메시지 크기에 적용
    • 메시지 크기가 클 수록 IO에 영향을 받으므로 브로커 성능에 직결됩니다.
    • 컨슈머 클라이언트의 fetch.message.max.bytes 와 맞춰야 합니다.

파티션 개수의 산정 방법

  • 단위 시간당 토픽의 데이터 쓰루풋
  • 한 파티션의 데이터를 읽을 떄 목표로 하는 최대 처리량
    • 한 파티션에는 반드시 한 컨슈머가 할당됩니다.
    • (한 컨슈머가 여러개의 파티션을 담당할 수 있습니다.)
  • 프로듀서당 최대 처리량도 컨슈머와 같은 방법으로 산정할수 있습니다.
  • 키를 이용하여 파티션에 메시지를 쓰는 경우에 파티션 추가시 개수 산정이 어려울 수 있습니다.
  • 브로커마다 파티션 개수와 디스크 용량 네트워크 처리 속도를 고려해야 합니다
  • 파티션 개수가 너무 많을 경우 각 파티션의 리더 브로커 선정에 시간이 많이 소요될 수도 있습니다.

하드웨어 선택

  • 어느 환경에서도 실행에는 지장이 없습니다
  • 성능이 중요할 경우 다음과 같은 사항을 고려해야 합니다

디스크 처리량

  • 프로듀서 성능은 로그 세그먼트 저장에 사용되는 브로커의 디스크 처리량에 비례
    • 프로듀서가 생성한 카프카 메시지는 서버의 로컬 스토리지에 커밋 되었다는것을 최소 한 브로커가 확인해 줄떄까지 대기

용량

  • 로그 세그먼트의 보관주기, 보관크기, 파티션의 복제 수 에 따라서 선택

메모리

  • 컨슈머는 브로커의 시스템 메모리의 페이지 캐시에 저장된 메시지를 읽습니다.
    • 페이지폴트가 나는 경우 그만큼 컨슈머가 메시지를 처리하는데 시간이 걸립니다.

네트워크

  • 클러스터 크기 조정에 주된 요소
  • 카프카에서는 다수의 컨슈머를 지원하므로 inbound 와 outbound 간에 불균형이 생깁니다
  • 클러스터 복제 같은 경우에는 추가로 고려해야할 사항이 있습니다

CPU

  • 클라이언트는 메시지를 압축하여 전송하므로, 브로커에서 메시지의 체크섬을 검사하고, 오프셋을 할당하기 위해 모든 메시지의 압축을 풀어야 합니다.
  • 이후 다시 압축한뒤 디스크에 저장하므로 CPU 성능에 영향을 받지만 주된 요소는 아닙니다.

클라우드에서 카프카 사용하기

  • AWS 같은 환경에서도 설치되어 사용됩니다.
  • 가장 먼저 고려해야 할 사항이 데이터 보존량이고 그 다음이 프로듀서가 필요로 하는 성능입니다.
  • 실제로 AWS 에서는 m4, r3 이 사용되며 예산이 허용된다면 i2, d2 를 고려할 수 있습니다.

카프카 클러스터

  • 다수의 브로커를 이용하여 클러스터를 구축할 경우 다음과 같은 장점이 있습니다.
    • 다수의 서버로 처리량을 분산시켜 확장
    • 서버 장애에 따른 데이터 유실을 막기 위해 복제
    • 카프카 시스템을 중단시키지 않고 유지보수 수행 가능

브로커 개수

  • 클러스터의 적합한 크기는 몇가지 요소로 결정
    • 메시지를 보존하는데 필요한 디스크 용량과 하나의 브로커에 사용 가능한 스토리지 크기
    • 요청을 처리하기 위한 클러스터 용량
      • 네트워크 인터페이스의 트래픽
      • 디스크 처리량, 시스템 메모리 등

브로커 구성

  • 클러스터 구성 시 고려사항
    • 모든 브로커의 구성파일에 있는 zookeeper.connect 설정이 동일
    • broker.id는 unique 하도록
    • 이외 브로커 간 복제를 제어하는 매개변수

운영체제 조정하기

  • 가상메모리
    • 기존에는 vm.swappiness 를 항상 0으로 설정할 것을 권장, 리눅스 커널 3.5 이상에서는 1로 지정
    • 가급적 스와핑을 하지 않고 페이지 캐시의 크기를 줄이는것이 좋음
    • 더티 페이지의 커널 처리 방법을 조정
      • =vm.dirty_background_ratio 를 10보다 낮게 (5 수준) 설정
        • 시스템의 전체 메모리 크기에 대한 백그라운드 프로세스의 더티 페이지 비율
        • 0으로 지정해서는 안됨
        • 백그라운드 프로세스가 디스크에 써야하는 더티 페이지의 수를 줄임
      • vm.dirty_ratio 를 기본값이 20보다 증가 (60~80 수준)하여 설정
        • 시스템의 전체 메모리에 대한 비율
        • 너무 큰 값을 설정할 경우 시스템 장애 상황에서 디스크에 아직 안쓴 더티 페이지들이 유실될 수 있음
        • 너무 작은 값을 설정할 경우 빈번하게 더티 페이지들을 디스크에 쓰므로
      • /proc/vmstat 파일을 확인하여 현재 더티 페이지 개수를 확인
  • 디스크
    • 파일시스템은 EXT4, XFS 등을 선택
      • EXT4 는 지연 블록 할당 알고리즘을 사용하므로 위험할수 있음
    • 마운트 옵션은 noatime 으로 설정
      • atime 의 경우 파일 읽을 때마다 빈번하게 수정시간이 갱신되므로 디스크 쓰기 발생
  • 네트워크
    • 각 소켓의 송수신 버퍼로 할당되는 기본 메모리와 최대 메모리 변경
      • net.core.wmem.default, net.core.wmem.max
        • 바람직한 값은 131072 (128KiB)
      • net.core.rmem_default, net.core.rmem.max
        • 바람직한 값은 2097152 (2MiB)
    • TCP 소켓의 송수신 버퍼 크기도 설정
      • net.ipv4.tcp_wmem, net.ipv4.tcp_rmem
      • 3개의 정수로 구분되어 있으며 최소, 기본, 최대 순서
      • net.core.wmem.max, net.core.rmem.max 보다 클수 없음
    • net.ipv4.tcp_window_scaling 을 1로 설정
      • TCP의 윈도우 크기 조정 활성화
    • net.ipv4.tcp_max_syn_backlog 를 기본 1024보다 크게 설정
      • 많은 수의 동시 연결 허용
    • net.core.netdev_max_backlog 를 기본 1000보다 크게 설정
      • 커널이 더 많은 패킷을 처리 할수 있음

실제 업무 시 고려사항

  • 가비지 컬렉션 옵션
    • G1GC 권장
      • JDK 버전이 변경되면서 확인이 필요합니다.
      • MaxGCPauseMillis
        • 기존 200밀리초
      • InitiatingHeapOccupancyPercent
        • 사용중인 전체 힙의 비율, 기본 45%, 45% 넘을 경우 GC동작
  • 데이터센터 레이아웃
    • 카프카 0.10 시점으로 브로커에서 랙-인식 방법으로 다른 브로커들에게 새로운 파티션 할당 가능
      • 한 파티션의 레플리카가 랙을 공유하지 않도록
      • broker.rack 설정을 한 경우
      • 새로운 파티션에만 적용되며, 같은랙에 파티션 레플리카가 존재하더라도 정정하지 않음
  • 주키퍼 공동 사용
    • 카프카는 주키퍼에 브로커, 토픽, 파티션에 관한 메타정보 저장
      • 컨슈머 그룹의 맴버십 변경사항이나 클러스터 자체의 변경사항이 생길떄에만 주키퍼에 쓰기 수행
    • 단일 카프카 클러스터에 주키퍼 앙상블을 사용하는것은 바람직하지 않으며, 다수의 클러스터를 하나의 주키퍼 앙상블을 이용
      • 이경우 chroot 주키퍼 경로 이용
    • 0.9.0.0 이전 에서는 카프카에서 직접적으로 주키퍼를 이용하였음
      • 컨슈머 그룹의 구성 정보와 소비된 토픽에 과한 정보를 저장
      • 각 파티션의 오프셋을 주기적으로 커밋하기 위해 사용
      • 이후 버전에서는 카프카 브로커를 통해 동작하도록 새로운 컨슈머 인터페이스 도입
    • 컨슈머는 오프셋을 커밋하기 위해 주키퍼나 카프카 중 하나를 선택하도록 구성
      • 주키퍼에서 오프셋을 관리하는 경우 너무 자주 커밋하면 트래픽 유발
    • 다른 종류의 어플리케이션이 하나의 주키퍼 앙상블을 공유하는것은 지양
      • 카프카는 주키퍼의 지연과 타임아웃에 민감

작업 메모

#cd $KAFKA_HOME
#bin/kafka-topics.sh --zookeeper zookeeper:2181/ --partitions 1 --replication-factor 1 --create --topic test3

bin/kafka-topics.sh --zookeeper zookeeper:2181/ --list
bin/kafka-topics.sh --zookeeper zookeeper:2181/ --describe

cd $KAFKA_HOME; bin/kafka-console-producer.sh --broker-list kafka:9092 --topic test

cd $KAFKA_HOME; bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning


hosts
127.0.0.1 kafka
#window 환경일 경우 windows system32 위치에 설정 할 필요가 있음

docker network create docker-network

docker run -d --net=docker-network --name=zookeeper -p 2181:2181 -e ZOOKEEPER_CLIENT_PORT=2181 confluentinc/cp-zookeeper:latest
docker run -d --net=docker-network --name=kafka -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka:latest

docker run --net=confluent --rm confluentinc/cp-kafka:latest kafka-topics --create --topic test --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181

docker run -d --network=docker-network --name=schema-registry -p 8081:8081 -e SCHEMA_REGISTRY_HOST_NAME=127.0.0.1 -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181  confluentinc/cp-schema-registry:latest

#docker run -d --network=docker-network --name=schema-registry -p 8081:8081 -e SCHEMA_REGISTRY_HOST_NAME=127.0.0.1 -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092 confluentinc/cp-schema-registry:latest

'개발관련 > 오픈소스(들?)' 카테고리의 다른 글

Opensearch (aws elasticsearch)  (0) 2021.08.08
kafka 카프카 관리하기  (0) 2021.07.07
elasticsearch percolate search POC  (0) 2021.07.07
elasticsearch 의 저장 과정  (0) 2021.05.24
kafka 2.8.0  (0) 2021.05.16
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함