티스토리 뷰
들어가는 말
- 과거 kafka study 했던 자료를 한번 복기 하면서 정리합니다.
- https://www.aladin.co.kr/shop/wproduct.aspx?ItemId=171000274
- docker 환경을 상정하고 있긴하지만 작업 내용을 곰곰히 살펴보면 로컬 환경에 설치하는것을 상정하고 있습니다.
- 테스트 환경이므로 HA를 위해 앙상블을 구성하지는 않습니다.
- 애초에 kakfa 가 jvm 기반이지만 메모리를 많이 사용하지는 않으므로 디스크만 적절히 선택하면 된다고 생각합니다.
- 컨테이너 환경에다가 kakfa 를 구성하고 외부에서 접속하게 하면 port 문제 이외에도 advertised host 쪽에서 이슈가 발생할 수 있습니다.
준비물
- 번외이긴 하지만 docker 환경을 상정하고 있습니다.
- docker 가 실행 가능한 환경이 필요합니다.
- (주키퍼와 카프카를 띄우는것이 주 목적이므로 현 시점에서 도커가 꼭 필요하진 않습니다.)
제일먼저 할 일
운영체제 선택하기
- 자바 어플리케이션이므로 운영체제에 무관
- 윈도우, 맥, 리눅스 등
- 리눅스 환경을 권장
- (정확히는 스칼라로 짜여져 있습니다.)
번외 docker 로 환경 설정
- docker 로 centos 컨테이너 이미지를 받은 뒤, 그 안에 수동으로 카프카를 설치하는 과정에 해당합니다.
- 최초 환경 설정부터 진행할 예정으로 순수 centos 이미지 위에서 작업 진행합니다.
- 다만 이후에는 kafka 가 패키징 된 docker image를 사용하는것을 권장드립니다.
- 도커 이미지
- wurstmeister/kafka:latest
- 도커 이미지
- 다만 이후에는 kafka 가 패키징 된 docker image를 사용하는것을 권장드립니다.
- https://docs.docker.com/engine/reference/commandline/run/
- entry point 를 따로 두는 법을 확인할 수 없어 별도로 아래와 같이 실행시켜 둡니다
docker pull centos:7
docker images
docker run -dit --name="kafka-test-1" centos:7 /bin/bash
# docker run -d --name="kafka-test-1" centos:7
# (or -p port:port)
# docker exec -it container_id /bin/bash
docker start container_id
docker attach container_id
번외2 docker 를 통한 kafka image 설치
- 간단하게 카프카를 이용한 POC를 위해서라면 다음 설치 방법을 추천드립니다.
- 호스트(도커를 돌리는)의 /etc/hosts 에 127.0.0.1 kafka 등록이 필요하며 호스트(도커를 돌리는) 개발 환경에서는 kafka:9092 로 접속 가능합니다
docker network create docker-network
docker run -d --net=docker-network --name=zookeeper -p 2181:2181 -e ZOOKEEPER_CLIENT_PORT=2181 confluentinc/cp-zookeeper:latest
docker run -d --net=docker-network --name=kafka -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka:latest
docker run --net=docker-network --rm confluentinc/cp-kafka:latest kafka-topics --create --topic test --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181
docker run -d --network=docker-network --name=schema-registry -p 8081:8081 -e SCHEMA_REGISTRY_HOST_NAME=127.0.0.1 -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 confluentinc/cp-schema-registry:latest
docker run --net=docker-network --rm confluentinc/cp-kafka:latest kafka-topics --create --topic test --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181
cd $KAFKA_HOME; bin/kafka-console-producer.sh --broker-list kafka:9092 --topic test
cd $KAFKA_HOME; bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning
java 설치하기
- java 1.8 설치 소개
- oracle java 대신 openjdk 를 선택합니다.
zookeeper 설치하기
wget http://apache.mirror.cdnetworks.com/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz
tar xvf zookeeper-3.4.2.tar.gz
mv zookeeper-3.4.12 /usr/local/zookeeper
mkdir -p /var/lib/zookeeper
cat > /usr/local/zookeeper/conf/zoo.cfg << EOF
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
EOF
/usr/local/zookeeper/bin/zkServer.sh start
- 주키퍼 실행 확인
telnet localhost 2181
srvr
- 주키퍼 앙상블 설정
- (설정은 생략합니다, 테스트 환경에서 HA를 구성할 필요는 없습니다.)
- (이에따라 카프카에서 주키퍼 앙상블을 설정으로 추가하는 부분도 생략됩니다.)
- 주키퍼는 카프카와 같은 분산처리 시스템의 서버들의 메타데이터(환경설정 등)을 통합 관리하는데 사용
- 주키퍼의 클러스터를 앙상블로 지칭
- 하나의 앙상블은 여러개의 노드로 구성
- 하나의 서버에만 서비스가 집중되지 않게 분산 하여 동시에 처리
- 한서버에서 처리한 결과를 다른 서버와 동기화
- 클라이언트와 연결되어 동작중인 서버에서 문제가 발생할 경우 대기중인 서버중에서 자동 선정하여 이어서 서비스 처리
- 리더, 팔로워 구조
- 주키퍼 앙상블은 홀수개의 노드로 구성할 필요가 있습니다.
- (split brain 방지)
- (설정은 생략합니다, 테스트 환경에서 HA를 구성할 필요는 없습니다.)
kafka 브로커 설치하기
- 설치 및 실행
wget http://apache.mirror.cdnetworks.com/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar xvzf kafka_2.12-2.1.0.tgz
mv kafka_2.12-2.1.0 /usr/local/kafka
mkdir /tmp/kafka-logs
/usr/local/kafka/bin/kafka-server-start.sh \
-daemon /usr/local/kafka/config/server.properties
- test 토픽 생성
/usr/local/kafka/bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic test
- test 토픽 생성 확인
/usr/local/kafka/bin/kafka-topics.sh \
--zookeeper localhost:2181 \
--describe \
--topic test
- test 토픽에 메시지 작성
/usr/local/kafka/bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test
AAA
BBB
- test 토픽에 메시지 읽기
/usr/local/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test \
--from-beginning
브로커 구성
- 많은 옵션들이 기본 값으로 설정되어 있습니다.
- 특정 상황에 맞게 카프카를 조정할 때에는 해당 설정을 변경해야합니다.
- broker.id
- 모든 카프카 브로커는 정수로 된 고유한 broker.id를 가져야 합니다.
- port
- 브로커 실행에 필요한 TCP 포트를 나타냅니다.
- 1024 이하의 값으로 설정이 가능하지만 root 권한이 필요하므로 권장하지 않습니다
- zoopeeker.connect
- 브로커의 메타데이터를 저장하기 위한 주키퍼를 지정
- host:port/zookeeper_path(chroot path)
- log.dirs
- 로그 저장위치가 아닙니다.
- 실제 카프카의 메시지(파티션) 데이터 저장소
- 쉽표로 구분되어 여러개의 경로를 지정 가능
- 브로커는 모든 경로에 데이터를 저장
- 이후 사용빈도가 가장 적은 데이터를 교체하는 방식으로 한 파티션의 모든 로그를 같은 경로에 저장
- 브로커가 새로운 파티션을 저장할 때는 가장 적은 수의 파티션을 저장한 경로를 사용
- num.recovery.thread.per.data.dir
- 카프카는 쓰레드풀을 사용하여 로그 세그먼트 처리 (실제 데이터)
- 스레드풀이 사용되는 경우
- 브로커가 정상적으로 시작될떄, 각 파티션의 로그 세그먼트를 열기 위해
- 장애 발생 이후, 각 파티션의 로그 세그먼트들을 검사하고 불필요한 부분 삭제하기 위해
- 종료될 떄, 로그 세그먼트 파일을 정상적으로 닫기 위해
- 로그 디렉터리 하나당 하나의 쓰레드만 사용됨
- 이 쓰레드들은 브로커와 시작시에만 사용되므로 많은 수의 스레드를 설정하는것이 좋음
- log.dirs * num.recovery.t.p.d.d 수 만큼의 쓰레드가 생성됩니다.
- 스레드풀이 사용되는 경우
- 카프카는 쓰레드풀을 사용하여 로그 세그먼트 처리 (실제 데이터)
- auto.create.topics.enable
- 자동 토픽 생성 허용 여부
- 메시지가 들어온 시점, 컨슈머, 클라이언트가 요청했을때 존재하지 않는 토픽을 자동으로 생성할지 여부
- 자동 생성이 적합하지 않은 경우가 있으므로 제어
- 자동 토픽 생성 허용 여부
토픽의 기본 설정
- 브로커의 설정 이외에 각 토픽 별 설정이 존재합니다.
- 파티션 개수나 메시지 보존 설정등에 해당됩니다.
- 과거 버전의 경우 브로커 설정파일에서 .per.topic 이라는 접미사로 토픽에 설정을 적용할 수도 있었지만 현재 버전에서는 가능하지 않습니다.
- log.retention.hours.per.topic
- log.retention.bytes.per.topic
- log.segment.bytes.per.topic
- num.partition
- 한 토픽을 몇개의 파티션으로 구성할지
- topic auto create 시 적용됩니다
- 파티션 개수는 증가만 됩니다. (감소는 불가능합니다.)
- 브로커의 수를 고려하여 모든 브로커가 균일하게 파티션을 나뉘어 가질수 있도록 숫자 조절이 필요합니다.
- 한 토픽을 몇개의 파티션으로 구성할지
- log.retention.ms
- 로그 보관 시간
- 해당 시간이 지나면 읽혔던, 읽히지 않았던 데이터는 삭제됩니다.
- 파일의 마지막 수정시간을 기준으로 하며, 브로커 간 파티션이 이동되거나 하는 경우에는 부정확할수도 있습니다.
- 구성 파일에 설정할 떄에는 log.retention.hours 를 주로 사용하며 기본값은 168시간
- log.retention.minuate 도 있으며 모두 같은 맥락으로서 같이 설정될 경우 단위가 가장 작은 설정을 ㅣㅇ용
- 로그 보관 시간
- log.retention.bytes
- 전체 크기를 기준으로 보존 처리
- 모든 파티션에 각각 적용
- 총 8개의 파티션이 있고, log.retention.bytes가 1GB 일 경우 해당 토픽에 보존되는 메시지들의 전체 크기는 최대 8GB
- 파티션이 증가함에 따라 영향을 받음
- 시간과 크기 같이 적용하여 사용 가능
- log.segment.bytes
- 로그 세그먼트 파일을 대상으로 크기 단위 처리
- 메시지들이 쌓여서 log.segment.bytes 를 넘을 경우 사용중인 로그 세그먼트를 닫고, 새로운 파일을 생성
- 너무 작은 수치를 설정할 경우 파일 연산 요청이 빈번
- 너무 큰 수치를 설정할 경우 하나의 로그 세그먼트를 채우는데 오랜 시간이 걸림
- log.retention.ms는 로그 세그먼트가 닫힌 이후부터 적용
- timestamp 로 offset 을 찾을때에도 영향을 줌
- log.segment.ms
- 로그 세그먼트 파일을 대상으로 시간 단위 처리
- 지정된 시간이 지나면 로그 세그먼트 파일을 닫음
- message.max.byte
- 한 메시지의 크기 제한
- 기본값으로 1MB 로 설정
- 최대 메시지 크기를 넘을경우 프로듀서에게 에러를 보내고 메시지를 받지 않음
- 압축된 메시지 크기에 적용
- 메시지 크기가 클 수록 IO에 영향을 받으므로 브로커 성능에 직결됩니다.
- 컨슈머 클라이언트의 fetch.message.max.bytes 와 맞춰야 합니다.
파티션 개수의 산정 방법
- 단위 시간당 토픽의 데이터 쓰루풋
- 한 파티션의 데이터를 읽을 떄 목표로 하는 최대 처리량
- 한 파티션에는 반드시 한 컨슈머가 할당됩니다.
- (한 컨슈머가 여러개의 파티션을 담당할 수 있습니다.)
- 프로듀서당 최대 처리량도 컨슈머와 같은 방법으로 산정할수 있습니다.
- 키를 이용하여 파티션에 메시지를 쓰는 경우에 파티션 추가시 개수 산정이 어려울 수 있습니다.
- 브로커마다 파티션 개수와 디스크 용량 네트워크 처리 속도를 고려해야 합니다
- 파티션 개수가 너무 많을 경우 각 파티션의 리더 브로커 선정에 시간이 많이 소요될 수도 있습니다.
하드웨어 선택
- 어느 환경에서도 실행에는 지장이 없습니다
- 성능이 중요할 경우 다음과 같은 사항을 고려해야 합니다
디스크 처리량
- 프로듀서 성능은 로그 세그먼트 저장에 사용되는 브로커의 디스크 처리량에 비례
- 프로듀서가 생성한 카프카 메시지는 서버의 로컬 스토리지에 커밋 되었다는것을 최소 한 브로커가 확인해 줄떄까지 대기
용량
- 로그 세그먼트의 보관주기, 보관크기, 파티션의 복제 수 에 따라서 선택
메모리
- 컨슈머는 브로커의 시스템 메모리의 페이지 캐시에 저장된 메시지를 읽습니다.
- 페이지폴트가 나는 경우 그만큼 컨슈머가 메시지를 처리하는데 시간이 걸립니다.
네트워크
- 클러스터 크기 조정에 주된 요소
- 카프카에서는 다수의 컨슈머를 지원하므로 inbound 와 outbound 간에 불균형이 생깁니다
- 클러스터 복제 같은 경우에는 추가로 고려해야할 사항이 있습니다
CPU
- 클라이언트는 메시지를 압축하여 전송하므로, 브로커에서 메시지의 체크섬을 검사하고, 오프셋을 할당하기 위해 모든 메시지의 압축을 풀어야 합니다.
- 이후 다시 압축한뒤 디스크에 저장하므로 CPU 성능에 영향을 받지만 주된 요소는 아닙니다.
클라우드에서 카프카 사용하기
- AWS 같은 환경에서도 설치되어 사용됩니다.
- 가장 먼저 고려해야 할 사항이 데이터 보존량이고 그 다음이 프로듀서가 필요로 하는 성능입니다.
- 실제로 AWS 에서는 m4, r3 이 사용되며 예산이 허용된다면 i2, d2 를 고려할 수 있습니다.
카프카 클러스터
- 다수의 브로커를 이용하여 클러스터를 구축할 경우 다음과 같은 장점이 있습니다.
- 다수의 서버로 처리량을 분산시켜 확장
- 서버 장애에 따른 데이터 유실을 막기 위해 복제
- 카프카 시스템을 중단시키지 않고 유지보수 수행 가능
브로커 개수
- 클러스터의 적합한 크기는 몇가지 요소로 결정
- 메시지를 보존하는데 필요한 디스크 용량과 하나의 브로커에 사용 가능한 스토리지 크기
- 요청을 처리하기 위한 클러스터 용량
- 네트워크 인터페이스의 트래픽
- 디스크 처리량, 시스템 메모리 등
브로커 구성
- 클러스터 구성 시 고려사항
- 모든 브로커의 구성파일에 있는 zookeeper.connect 설정이 동일
- broker.id는 unique 하도록
- 이외 브로커 간 복제를 제어하는 매개변수
운영체제 조정하기
- 가상메모리
- 기존에는 vm.swappiness 를 항상 0으로 설정할 것을 권장, 리눅스 커널 3.5 이상에서는 1로 지정
- 가급적 스와핑을 하지 않고 페이지 캐시의 크기를 줄이는것이 좋음
- 더티 페이지의 커널 처리 방법을 조정
- =vm.dirty_background_ratio 를 10보다 낮게 (5 수준) 설정
- 시스템의 전체 메모리 크기에 대한 백그라운드 프로세스의 더티 페이지 비율
- 0으로 지정해서는 안됨
- 백그라운드 프로세스가 디스크에 써야하는 더티 페이지의 수를 줄임
- vm.dirty_ratio 를 기본값이 20보다 증가 (60~80 수준)하여 설정
- 시스템의 전체 메모리에 대한 비율
- 너무 큰 값을 설정할 경우 시스템 장애 상황에서 디스크에 아직 안쓴 더티 페이지들이 유실될 수 있음
- 너무 작은 값을 설정할 경우 빈번하게 더티 페이지들을 디스크에 쓰므로
- /proc/vmstat 파일을 확인하여 현재 더티 페이지 개수를 확인
- =vm.dirty_background_ratio 를 10보다 낮게 (5 수준) 설정
- 디스크
- 파일시스템은 EXT4, XFS 등을 선택
- EXT4 는 지연 블록 할당 알고리즘을 사용하므로 위험할수 있음
- 마운트 옵션은 noatime 으로 설정
- atime 의 경우 파일 읽을 때마다 빈번하게 수정시간이 갱신되므로 디스크 쓰기 발생
- 파일시스템은 EXT4, XFS 등을 선택
- 네트워크
- 각 소켓의 송수신 버퍼로 할당되는 기본 메모리와 최대 메모리 변경
- net.core.wmem.default, net.core.wmem.max
- 바람직한 값은 131072 (128KiB)
- net.core.rmem_default, net.core.rmem.max
- 바람직한 값은 2097152 (2MiB)
- net.core.wmem.default, net.core.wmem.max
- TCP 소켓의 송수신 버퍼 크기도 설정
- net.ipv4.tcp_wmem, net.ipv4.tcp_rmem
- 3개의 정수로 구분되어 있으며 최소, 기본, 최대 순서
- net.core.wmem.max, net.core.rmem.max 보다 클수 없음
- net.ipv4.tcp_window_scaling 을 1로 설정
- TCP의 윈도우 크기 조정 활성화
- net.ipv4.tcp_max_syn_backlog 를 기본 1024보다 크게 설정
- 많은 수의 동시 연결 허용
- net.core.netdev_max_backlog 를 기본 1000보다 크게 설정
- 커널이 더 많은 패킷을 처리 할수 있음
- 각 소켓의 송수신 버퍼로 할당되는 기본 메모리와 최대 메모리 변경
실제 업무 시 고려사항
- 가비지 컬렉션 옵션
- G1GC 권장
- JDK 버전이 변경되면서 확인이 필요합니다.
- MaxGCPauseMillis
- 기존 200밀리초
- InitiatingHeapOccupancyPercent
- 사용중인 전체 힙의 비율, 기본 45%, 45% 넘을 경우 GC동작
- G1GC 권장
- 데이터센터 레이아웃
- 카프카 0.10 시점으로 브로커에서 랙-인식 방법으로 다른 브로커들에게 새로운 파티션 할당 가능
- 한 파티션의 레플리카가 랙을 공유하지 않도록
- broker.rack 설정을 한 경우
- 새로운 파티션에만 적용되며, 같은랙에 파티션 레플리카가 존재하더라도 정정하지 않음
- 카프카 0.10 시점으로 브로커에서 랙-인식 방법으로 다른 브로커들에게 새로운 파티션 할당 가능
- 주키퍼 공동 사용
- 카프카는 주키퍼에 브로커, 토픽, 파티션에 관한 메타정보 저장
- 컨슈머 그룹의 맴버십 변경사항이나 클러스터 자체의 변경사항이 생길떄에만 주키퍼에 쓰기 수행
- 단일 카프카 클러스터에 주키퍼 앙상블을 사용하는것은 바람직하지 않으며, 다수의 클러스터를 하나의 주키퍼 앙상블을 이용
- 이경우 chroot 주키퍼 경로 이용
- 0.9.0.0 이전 에서는 카프카에서 직접적으로 주키퍼를 이용하였음
- 컨슈머 그룹의 구성 정보와 소비된 토픽에 과한 정보를 저장
- 각 파티션의 오프셋을 주기적으로 커밋하기 위해 사용
- 이후 버전에서는 카프카 브로커를 통해 동작하도록 새로운 컨슈머 인터페이스 도입
- 컨슈머는 오프셋을 커밋하기 위해 주키퍼나 카프카 중 하나를 선택하도록 구성
- 주키퍼에서 오프셋을 관리하는 경우 너무 자주 커밋하면 트래픽 유발
- 다른 종류의 어플리케이션이 하나의 주키퍼 앙상블을 공유하는것은 지양
- 카프카는 주키퍼의 지연과 타임아웃에 민감
- 카프카는 주키퍼에 브로커, 토픽, 파티션에 관한 메타정보 저장
작업 메모
#cd $KAFKA_HOME
#bin/kafka-topics.sh --zookeeper zookeeper:2181/ --partitions 1 --replication-factor 1 --create --topic test3
bin/kafka-topics.sh --zookeeper zookeeper:2181/ --list
bin/kafka-topics.sh --zookeeper zookeeper:2181/ --describe
cd $KAFKA_HOME; bin/kafka-console-producer.sh --broker-list kafka:9092 --topic test
cd $KAFKA_HOME; bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning
hosts
127.0.0.1 kafka
#window 환경일 경우 windows system32 위치에 설정 할 필요가 있음
docker network create docker-network
docker run -d --net=docker-network --name=zookeeper -p 2181:2181 -e ZOOKEEPER_CLIENT_PORT=2181 confluentinc/cp-zookeeper:latest
docker run -d --net=docker-network --name=kafka -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka:latest
docker run --net=confluent --rm confluentinc/cp-kafka:latest kafka-topics --create --topic test --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181
docker run -d --network=docker-network --name=schema-registry -p 8081:8081 -e SCHEMA_REGISTRY_HOST_NAME=127.0.0.1 -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 confluentinc/cp-schema-registry:latest
#docker run -d --network=docker-network --name=schema-registry -p 8081:8081 -e SCHEMA_REGISTRY_HOST_NAME=127.0.0.1 -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092 confluentinc/cp-schema-registry:latest
'개발관련 > 오픈소스(들?)' 카테고리의 다른 글
Opensearch (aws elasticsearch) (0) | 2021.08.08 |
---|---|
kafka 카프카 관리하기 (0) | 2021.07.07 |
elasticsearch percolate search POC (0) | 2021.07.07 |
elasticsearch 의 저장 과정 (0) | 2021.05.24 |
kafka 2.8.0 (0) | 2021.05.16 |
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- Spring
- percolate
- Async
- AWS
- elasticsearch
- kafka 2.8.0
- 말의품격
- flush
- 사기꾼증후군
- Kafka
- fsync
- opensearch
- 기술센싱
- Dangling
- 클린 아키텍처
- Java
- 에픽테토스
- 만들면서 배우는 클린 아키텍처
- Generic
- WebSocket
- PatternSyntaxException
- COMMIT
- pecs
- 기술블로그
- meta character
- jhipster
- 기술사이트
- 개발자
- completablefuture
- 전설로떠나는월가의영웅
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
글 보관함