kafka란

RabbitMQ와 같은 메시지 미들웨어.

LinkedIn에서 개발되어 현재는 Apache Software Foundation의 일부인 오픈 소스 메시지 스트리밍 플랫폼.
대량의 실시간 데이터 스트리밍을 처리하는 데 초점을 맞추고 있고, 높은 처리량, 데이터 복제, 분산 처리 등을 지원

이를 통해 대규모 데이터를 실시간으로 처리할 수 있다.

MSA간 통신에도 자주 쓰임

 

브로커: Kafka에서 메시지를 처리하는 서버

  • 카프카에서는 이벤트를 구분하기 위한 단위로 '토픽'을 사용하며 파일시스템의 디렉토리와 비슷한 개념으로 이해 가능
    • 토픽은 게시/구독 메시징 시스템에서 흔하게 볼 수 있다.
      • 카프카 외적으로는 비슷한 개념에 대해 채널, 큐, 스트림이라는 용어를 사용하기도 함
        • 개인적으로는 채널이 더 직관적이네
    • 프로듀서는 데이터를 '토픽'에 넣고, 컨슈머는 '토픽'에서 데이터를 읽는다.
    • 이러한 토픽은 카프카 클러스터내에서 데이터를 분류하고 구독자가 관심있는 메시지만 구독할 수 있도록 해주는 중요한 역할
    • 카프카 클러스터 전체가 하나의 토픽만 사용한다면 생략도 가능한가? > NO.
  • 토픽의 데이터는 파티션에 분산 저장되며 각 파티션은 순서 유지가 되는 메시지의 로그로 구성
  • 하나의 브로커는 여러 파티션의 데이터를 저장할 수 있으며, 반대로 하나의 파티션도 여러 브로커에 복제될 수 있음에 주의
    • 단 leader, follower구조를 가지면 복제는 읽기성능개선용이 아니라 장애 대비용이다(active-standby)
  • 파티션의 개수는 토픽의 병렬처리 능력을 결정

리더 브로커: 특정 메시지그룹(파티션)의 처리를 담당하는 서버. Kafka에서 각 데이터 파티션에는 하나의 리더 브로커가 있음

  • 리더 브로커는 파티션과 1:1의 관계를 가지며 하나의 파티션을 여러 브로커가 처리할경우 하나만 리더이고 나머지는 팔로워로 동작

 

ZooKeeper

  • 일종의 비서역할로 서버들의 상태관리나 역할분배를 Kafka를 위해서 수행함. Kakfa설치시 같이 설치됨.
  • 또는 일종의 '중앙 데이터 저장소' 역할. Kafka 시스템 내의 여러 서버들이 ZooKeeper를 통해 필요한 정보 공유하고 동기화.
  • 특정 서버에 문제가 생기면 이 정보가 ZooKeeper에 기록되고 다른 서버들이 이를 확인하여 적절히 대응

 

심화

토픽, 키, 밸류가 있을때 키별로 파티션으로 분배되고(특정 키는 특정파티션으로 들어감) 데이터 전송순서는 키별로만 보장됨

그룹id개념

  • 컨슈머 관점에서 소비하는 팀이 여러개임을 가정하고 group id를 통해서 별도 offset관리가 가능함

MSA간 통신에 API(Rest, gRPC)보다 kafka가 스루풋에 유리한 이유

  • Kafka는 데이터를 디스크에 순차적으로 추가(write)만 합니다.
  • Zero Copy + Sendfile() 사용(유저공간 메모리에 올리지 않고 커널 버퍼만 탐)
  • Kafka는 메시지를 1건씩 처리하지 않고, batch로 묶어 전송하고 압축까지함
  • 파티셔닝으로 병렬 처리

TPS비교항목

  API 서버 (REST/gRPC 등) Kafka (Broker 기준)
TPS 수준 수천~수만 TPS (10K~50K) 수십만~수백만 TPS (100K~1M 이상)
지연 시간 수 ms ~ 수백 ms 수 ms ~ 수십 ms

하이브리드도 많이 씀

  • 주문 API → Kafka enqueue → 매칭엔진

Producer > Broker > Consumer

  • Producer > Broker 구간은 push (1-5ms)
  • Broker > Consumer 구간은 pull방식 (5-20ms)

 

Kafka에서 “Exactly-Once”를 보장하려면 트랜잭션이 필요

Kafka는 기본적으로 At-Least-Once(최소 1번) 처리 모델.
Producer가 실패 시 재시도하면 중복 메시지가 발생할 수 있기 때문.

그걸 방지하려면:

  1. 중복 없이 보내야 하고 (Producer → Kafka)
    1. 아래와 같은 부분성공 시나리오때문에 트랜색션처리 필요
      1. A 토픽엔 성공, B 토픽엔 실패 → 시스템 불일치
      2. 배치 중 네트워크 끊김 → 일부 메시지만 브로커에 기록
    2. 구현방법
        1. enable.idempotence=true → 중복 방지
        2. 트랜잭션을 쓰려면
          • transactional.id 를 설정하고
          • initTransactions() 호출 후
          • beginTransaction()/commitTransaction() (또는 abortTransaction()) 을 직접 사용해야 합니다.
        즉, 두 기능은 별개지만, 트랜잭션을 쓰려면 반드시 Idempotent Producer 가 활성화된 상태여야 합니다.
  2. 커밋된 메시지만 읽고 (Kafka → Consumer)
    1. isolation.level=read_committed 로 설정하면, 트랜잭션 커밋된 메시지(=commitTransaction())만 컨슈머가 보도록 함
  3. 소비한 메시지는 한 번만 처리하고 커밋해야 함 (Consumer → DB)
    1. "enable.auto.commit=false"  자동 commit끔
    2. 외부api호출과 연계한다면 호출성공후 outbox OK처리
      1. 이것도 외부api호출후 시스템이 죽으면 완전한 exactly-once는 안되는데, 멱등성 로직으로 보완(외부 api호출시 멱등성처리)
      2. 또는 saga패턴을 통한 보상 매커니즘 작동

 

장애대응

로깅이 아닌 핵심컴포넌트로 사용시 

1. 고가용성 세팅: 브로커 3대이상 운영, 하나 죽어도 자동리더선출로 단일 브로커 장애 정도는 무중단으로 흡수할 수 있어야 함

2. Producer acks=all설정: 모든 브로커가 받아야 다음 진행

3. Producer auto.offset.commit → false : 오프셋 관리 꼼꼼하게

4. Producer 반복된 전송실패시 임시저장소에 적재하고 replay worker가 주기적으로 꺼내서 재전송시도(3회정도 실패하면 실패로그 기록해서 스레드를 계속차지하지 않게 함)

Kafka 전송 실패 → 3회 retry (100ms 간격) → 그래도 실패 → Redis fallback 저장 → @Scheduled replay worker가 Kafka 재전송 시도 → 여전히 실패 시 운영 Slack 알림 or DLT 적재

5. Consumer DLT

  • 소비중 Json 파싱에러등으로 오류가 발생하면 DLT로 보내고 메인 서비스는 정상적으로 계속처리
  • DLT를 나중에 따로 모아서 분석/재처리/알림 등 가능
  • 주 토픽 (orders)
       └── Consumer 처리 중 에러 발생
            └── Dead Letter Topic (orders.DLT)로 해당 메시지 저장, replay consumer가 소비, 도는 운영이슈로.

6. 고가용성세팅에도 불구하고 모든 브로커가 죽었을때: 단일 장애지점(SPOF)가 되지 않도록 별도 로깅시스템(역시 카프카지만)으로 S3 > hiveTable구조를 만들고 복원에 사용

 

고객관점

방법1. react로 500ms등 타임박스를 설정하고, 비동기로 몇차리 전송시도후 안되면 실패로 판정하고 고객에게 안내(다시해보세요 등)

방법2. 완전비동기로 설계하여 고객에게는 주문이전송되었습니다 정도로하고 성공은 비동기로 고객에게 전달

다만, 현재 페이지가 유지될경우 websocket을 통해 상태메시지 갱신. 이때 api서버와 push서버는 분리해주는 게 좋음(영향을 줄 수 있으므로)

방법3. 완전동기화. 모래시계로 외부api가 timeout날때까지 기다리거나 하는 방식인데 권장되지는 않음. 정말 중요한 프로세스면 선택할수도

 

kafka설치과정

Java설치

카프카는 Java기반이므로 java jdk가 안깔렸으면 깔아준다.

sudo apt update
sudo apt install default-jdk

사용자계정에 종속된 설치보다는 여러명이 쓸 수 있도록 아래 처럼 전용 유저를 생성해주는게 좋다.

sudo adduser kafka
sudo adduser kafka sudo

카프카는 자바 기반이므로 jar파일을 받아서 설정해준다. apt등의 패키지 매니저를 사용하면 예전버전일수 있으므로 다운로드해서 해보자.

mkdir ~/Downloads
curl "https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz" -o ~/Downloads/kafka.tgz
mkdir /opt/kafka && cd /opt/kafka
tar -xvzf ~/Downloads/kafka.tgz --strip 1


#kafka디렉토리 소유권을 kafka사용자로 변경
sudo useradd kafka -m
sudo chown -R kafka:kafka /opt/kafka

/opt/kafka에 설치했으며, /opt 디렉토리는 선택적인(add-on) 애플리케이션 SW패키지를 위한 곳으로 여러사용자가 이용하는 라이브러리인 경우 선호되는 경로다.

 

zookeeper설정

zookeeper는 kafka와 독립적으로 실행되며, 비슷한 설정 매커니즘을 갖는다.

zookeeper설정파일 수정

sevity@sevityubuntu:/opt/kafka/config$ cat zookeeper.properties
# the directory where the snapshot is stored.
dataDir=/var/cache/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

다른 부분은 그냥 두었고, dataDir은 /var/cache 밑에 두어 휘발되지 않도록 설정했다. (기본은 /tmp 밑에 있었던듯)

이를 위해 다음과 같은 권한 설정을 했다.

sudo mkdir /var/cache/zookeeper
sudo chown kafka:kafka /var/cache/zookeeper
sudo chmod 700 /var/cache/zookeeper

 

zookeeper에 대해 systemctl에 등록해서 부팅시 마다 실행되도록 하기위해 다음 파일 작성

sudo vi /etc/systemd/system/zookeeper.service

[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

# 그 다음 systemctl을 통해 시작하고 재부팅 시에도 항상 실행되도록 설정
sudo systemctl start zookeeper
sudo systemctl enable zookeeper

kafka에 대해서도 마찬가지로 작성

sudo vi /etc/systemd/system/kafka.service

[Unit]
Description=Apache Kafka server
Documentation=http://kafka.apache.org
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

# 그 다음 systemctl을 통해 시작하고 재부팅 시에도 항상 실행되도록 설정
sudo systemctl start kafka
sudo systemctl enable kafka

 

트러블 슈팅

서버로그 실시간 모니터링

tail -f /opt/kafka/logs/server.log

 

jps로 kafka가 잘 실행되고 있는지 확인(QuorumPeerMain과 Kafka가 떠있으면OK)

su - kafka
jps
109092 Jps
108186 QuorumPeerMain(Zookeeper의 시작점)
108573 Kafka

 

 

토픽 메시지가 잘 수신되는지 확인하는 방법

가장 기본적으로는 9092포트를 리슨하고 있는지 확인

sevity@sevityubuntu:/opt/kafka/config$ lsof -i :9092
COMMAND    PID   USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
java    168958 sevity  116u  IPv6 1324110      0t0  TCP sevityubuntu:46330->sevityubuntu:9092 (ESTABLISHED)
java    168958 sevity  117u  IPv6 1320714      0t0  TCP sevityubuntu:46336->sevityubuntu:9092 (ESTABLISHED)

 

GetOffsetSell을 통해 특정 토픽에 게시된 메지시수를 확인

sevity@sevityubuntu:/opt/kafka$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic iis_log --time -1
iis_log:0:4043038

kafka-console-consumer.sh를 사용하여 특정 토픽의 메시지 수신상황을 실시간으로 확인

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic iis_log --from-beginning
{"@timestamp":"2023-08-01T01:18:34.407Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.9.0"},"agent":{"version":"8.9.0","ephemeral_id":"98df9137-96e3-4653-b353-2821eebde875","id":"cf839823-29c8-4619-b9c4-85e90804e50e","name":"SEVITY-PC","type":"filebeat"},"ecs":{"version":"8.0.0"},"log":{"file":{"path":"c:\\inetpub\\logs\\LogFiles\\W3SVC1\\u_ex230801.log"},"offset":16076},"message":"2023-08-01 01:17:24 192.168.0.6 GET /wiki/doku.php id=Advice%20To%20Relieve%20Your%20Acid%20Reflux%20Signs%20and%20symptoms 80 - 196.245.181.117 Mozilla/5.0+(X11;+Ubuntu;+Linux+x86_64;+rv:114.0)+Gecko/20100101+Firefox/114.0 http://sevity.com/ 200 0 0 1182","input":{"type":"log"},"host":{"mac":["00-15-83-EA-2C-EE","00-23-24-63-E7-E3","02-50-41-00-00-01","88-36-6C-F7-D9-04","88-36-6C-F7-D9-06","88-36-6C-F7-D9-07"],"hostname":"sevity-pc","name":"sevity-pc","architecture":"x86_64","os":{"family":"windows","name":"Windows 10 Enterprise","kernel":"10.0.19041.3208 (WinBuild.160101.0800)","build":"19045.3208","type":"windows","platform":"windows","version":"10.0"},"id":"dbb18b3d-fb42-49ec-b731-f430dd2f3fd5","ip":["fe80::a30b:d99a:f7ed:2bac","192.168.100.15","fe80::a4ce:c09a:ea91:8a9","169.254.142.88","fe80::9c93:77c8:66bd:b6c5","169.254.55.217","fe80::dd77:ec13:69c4:6922","169.254.56.104","fe80::310a:1b3d:e9a3:431c","192.168.0.6","fe80::2c27:d03a:a322:765b","169.254.236.36"]}}

 

반응형

'Programming > Linux' 카테고리의 다른 글

Nginx  (1) 2023.08.16
Redis  (0) 2023.08.10
ELK연습  (0) 2023.07.30
스팍 - 실습 - 웹서버 세션분석  (0) 2023.07.30
스팍(Spark) 설치  (0) 2023.07.30

+ Recent posts