반응형

Docker network는 컨테이너끼리 통신하거나 호스트와 포트를 연결하기 위해 Docker가 제공하는 네트워크 구성이다.

컨테이너 네트워크를 볼 때는 bridge, host, overlay 같은 driver 차이와 포트 매핑, 컨테이너 이름 기반 통신을 함께 확인해야 한다.

 

핵심 정리

Docker 기본 bridge 네트워크에서는 컨테이너가 내부 IP를 받고 같은 네트워크 안의 컨테이너끼리 통신할 수 있다. 외부에서 접근하려면 호스트 포트와 컨테이너 포트를 매핑해야 하며, Compose를 쓰면 service들이 같은 네트워크에서 이름으로 서로를 찾을 수 있다.

  • bridge는 단일 호스트에서 가장 기본으로 쓰는 Docker 네트워크 driver다.
  • 포트 매핑은 호스트의 포트를 컨테이너 내부 포트로 연결한다.
  • 같은 사용자 정의 bridge 네트워크 안에서는 컨테이너 이름으로 통신할 수 있다.
  • host network는 컨테이너가 호스트 네트워크를 직접 쓰게 하므로 격리가 줄어든다.
  • Compose는 service들을 위한 기본 네트워크를 만들고 service 이름을 DNS처럼 사용할 수 있게 한다.

Docker 네트워크 오류는 컨테이너 내부 포트, 호스트 포트, 같은 네트워크 소속 여부를 순서대로 확인하면 대부분 범위를 좁힐 수 있다.

이어서 볼 글

 

브리지 네트워크 (Bridge Network):

Docker에서 기본적으로 제공되는 네트워크 모드입니다.
컨테이너가 독립적인 IP 주소를 가지며, 호스트와는 브리지 네트워크를 통해 통신합니다.

 

bridge는 Docker의 기본 네트워크 드라이버 중 하나입니다. Docker가 설치되면 기본적으로 생성되는 네트워크 모드로, 개별 컨테이너들이 독립된 네트워크 네임스페이스에서 동작하게 합니다. 이렇게 하면 컨테이너 간의 네트워크 통신이 보안적으로 격리됩니다.

bridge 네트워크에 대한 주요 특징은 다음과 같습니다:

  • 격리: 각 컨테이너는 독립된 네트워크 네임스페이스를 가지기 때문에 서로 격리됩니다.
  • 자동 IP 할당: Docker의 내장 DHCP 서버에 의해 자동으로 IP 주소가 할당됩니다. 보통 172.17.0.x 범위의 주소가 기본으로 사용됩니다.
  • 포트 매핑: 호스트 시스템의 특정 포트를 컨테이너의 특정 포트에 매핑(포워딩)할 수 있습니다. 예를 들어, 호스트의 8080 포트를 컨테이너의 80 포트에 매핑할 수 있습니다.
  • 컨테이너 간 통신: 같은 bridge 네트워크에 있는 컨테이너끼리는 IP 주소를 통해 서로 통신할 수 있습니다.

그러나 bridge 네트워크 모드에는 몇 가지 제한사항도 있습니다

  • 외부 네트워크와의 통신은 NAT(Network Address Translation)를 통해 이루어지기 때문에 성능 오버헤드가 있을 수 있습니다.
  • 호스트와 컨테이너 간의 통신을 위해서는 포트 매핑이 필요합니다.
  • 다른 호스트에 있는 컨테이너와의 통신은 기본적으로 지원되지 않습니다.
  • 이러한 제한사항 때문에 복잡한 배포나 여러 호스트 간의 통신이 필요한 경우에는 overlay, macvlan 등의 다른 네트워크 드라이버를 사용하기도 합니다.

 

호스트 네트워크 (Host Network):

컨테이너가 호스트의 네트워크 네임스페이스를 사용합니다.
컨테이너는 호스트의 IP 주소와 포트를 직접 사용할 수 있습니다.

반응형
반응형

docker-compose는 여러 컨테이너 서비스를 하나의 YAML 파일에 정의하고 함께 빌드, 실행, 중지할 수 있게 해주는 Docker 도구다.

Compose를 쓸 때는 service, image, build, ports, volumes, networks가 각각 어떤 컨테이너 설정으로 이어지는지 구분해야 한다.

 

핵심 정리

docker-compose.yml은 여러 컨테이너를 하나의 애플리케이션 묶음처럼 다루기 위한 설정 파일이다. web, database 같은 service를 정의하고 docker compose up으로 필요한 컨테이너와 네트워크, 볼륨을 함께 생성해 실행한다.

  • service는 Compose 안에서 관리하는 컨테이너 구성 단위다.
  • image는 사용할 기존 이미지를 지정하고 build는 Dockerfile로 이미지를 만들 때 쓴다.
  • ports는 호스트 포트와 컨테이너 포트를 연결한다.
  • volumes는 컨테이너가 삭제되어도 유지할 데이터나 설정 파일을 연결한다.
  • networks는 여러 service가 서로 통신할 Docker 네트워크를 정한다.

Compose 문제를 볼 때는 먼저 어떤 service가 어떤 이미지와 포트, 볼륨, 네트워크를 쓰는지 표처럼 나누어 보면 실행 오류를 좁히기 쉽다.

이어서 볼 글

 

여러 도커간의 디펜던시를 설정해서 빌드하고 띄우는 일을 할 수 있다.

용어

서비스 (Service)

Docker Compose 내에서 쓰이는 개념으로, 도커 컨테이너의 구성 및 실행 방법을 정의한 것.
서비스는 하나의 컨테이너일 수도 있고, 동일한 이미지를 기반으로 하는 여러 컨테이너의 집합일 수도 있다.

예를 들어, docker-compose.yml 파일에 다음과 같은 정의가 있다면:

services:
  web:
    image: nginx:latest
    ports:
      - "80:80"
  database:
    image: mysql:5.7
    environment:
      - MYSQL_ROOT_PASSWORD=my-password

여기에서 web과 database는 각각 하나의 서비스.
이 경우, 각 서비스는 각각 하나의 컨테이너를 생성하지만, 필요에 따라 한 서비스에서 여러 컨테이너를 생성할 수도 있습니다 (예: 스케일링 목적으로).


결국, "서비스"는 Docker Compose에서 응용 프로그램의 한 부분 (예: 웹 서버, 데이터베이스)을 나타내며, 이 서비스를 구성하는 데 필요한 설정과 함께 도커 이미지 정보를 포함합니다.

사용방법

docker-compose build:

이 명령어는 docker-compose.yml 파일에 정의된 모든 서비스의 이미지를 빌드합니다.
docker-compose.yml 파일에서 build: 섹션을 사용하여 Dockerfile의 위치나 빌드 컨텍스트 등을 지정한 서비스에 대해서만 이 명령어를 사용해야 합니다.


docker-compose up -d:

위에서 빌드한 이미지들의 실행 인스턴스인 컨테이너들을 만들어 실행
-d 옵션은 "detached mode"를 의미하며, 백그라운드에서 컨테이너를 실행하라는 것을 의미
이미지가 없다면 빌드도 시도하기 때문에 docker-compose build 없이 이것만 실행해도 되긴 하나,

코드 변경시 재빌드해주진 않기 때문에 코드 변경이 있었다면 build후 up해야 한다.

docker-compose down -v:

up과 반대로 컨테이너들 중단하고 없애준다.

-v하면 관련 데이터 볼륨도 제거해준다.

여기서 데이터 볼륨(Data Volume)은 Docker 컨테이너 파일 시스템의 일부를 호스트 시스템에 저장하는데 사용되는 매커니즘으로, 컨테이너간 데이터를 공유하는 공유폴더 개념으로 쓰이거나 데이터를 영구적으로 저장할수 있게 해준다.

그 예시로는 (DB, 로그, config file등이 있다)

docker-compose ps

docker-compose로 관리되는 컨테이너들의 현재 상태와 관련된 정보를 조회

다음내용 확인가능

  • 서비스 상태 확인: docker-compose up 명령을 사용하여 여러 서비스를 시작한 후, 각 서비스의 컨테이너가 올바르게 실행 중인지, 혹은 중지된 상태인지를 확인하기 위해 사용됩니다.
  • 문제 진단: 서비스에 문제가 발생했을 때, 어떤 컨테이너가 문제를 일으키고 있는지 확인하기 위해 사용될 수 있습니다. 예를 들어, 컨테이너가 계속 재시작되는 경우, docker-compose ps를 통해 해당 컨테이너의 상태를 확인할 수 있습니다.
  • 포트 정보 확인: 각 컨테이너에서 어떤 포트가 호스트와 연결되었는지 확인하려 할 때 사용됩니다.
  • 스케일링 확인: docker-compose up --scale 명령을 사용하여 서비스의 인스턴스 수를 조정한 경우, 현재 실행 중인 인스턴스 수를 확인하기 위해 사용됩니다.
  • 컨테이너 이름 확인: docker-compose로 시작된 컨테이너들은 특정한 네이밍 규칙을 가지고 있습니다. 이 이름을 확인하기 위해 docker-compose ps를 사용할 수 있습니다.

아래는 예시

               Name                             Command               State                                  Ports
--------------------------------------------------------------------------------------------------------------------------------------------------
table-walkthrough_data-generator_1   /docker-entrypoint.sh            Up
table-walkthrough_grafana_1          /run.sh                          Up      0.0.0.0:3000->3000/tcp,:::3000->3000/tcp
table-walkthrough_jobmanager_1       /docker-entrypoint.sh stan ...   Up      6123/tcp, 0.0.0.0:8082->8081/tcp,:::8082->8081/tcp
table-walkthrough_kafka_1            start-kafka.sh                   Up      0.0.0.0:9092->9092/tcp,:::9092->9092/tcp
table-walkthrough_mysql_1            docker-entrypoint.sh --def ...   Up      3306/tcp, 33060/tcp
table-walkthrough_taskmanager_1      /docker-entrypoint.sh task ...   Up      6121/tcp, 6122/tcp, 6123/tcp, 8081/tcp
table-walkthrough_zookeeper_1        /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:2181->2181/tcp,:::2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
sevity@sevityubuntu:~/workspace/flink-playgrounds/table-walkthrough$
반응형
반응형

Nginx는 정적 파일을 직접 제공하는 웹 서버로도 쓰이고, 뒤쪽 애플리케이션 서버로 요청을 넘기는 리버스 프록시로도 자주 쓰인다.

Nginx 설정을 볼 때는 server 블록, location 블록, proxy_pass, upstream 연결, SSL termination 위치를 나누어 확인해야 한다.

 

핵심 정리

Nginx는 클라이언트 요청을 받아 정적 파일을 제공하거나 백엔드 서버로 전달한다. HTTPS 인증서 처리를 Nginx에서 끝내고 내부 애플리케이션과는 HTTP로 통신하는 구성을 SSL termination이라고 하며, 실제 운영에서 자주 쓰인다.

  • server 블록은 도메인, 포트, 인증서 같은 요청 처리 단위를 정의한다.
  • location 블록은 URL 경로별 처리 규칙을 나눈다.
  • proxy_pass는 요청을 뒤쪽 애플리케이션 서버로 전달한다.
  • SSL termination은 TLS 처리를 Nginx나 로드밸런서에서 끝내는 구성이다.
  • 502 오류는 Nginx 설정뿐 아니라 upstream 서버 상태와 포트 연결도 함께 확인해야 한다.

Nginx 문제를 볼 때는 외부 HTTPS 연결, Nginx listen 설정, upstream 애플리케이션 포트, 방화벽을 순서대로 나누어 확인하면 원인 범위가 좁아진다.

이어서 볼 글

 

주 역할

웹서버/캐시/로드밸런서/리버스프록시

 

SSL Termination

클라이언트와는 https통신을 하면서 백단 서버들과는 http(gRPC포함)통신을 하여, 복호화 부담을 줄이는 기능

 

실전꿀팁

여기가면 실제로 많이 사용되는 Nginx설정을 보고 적용할 수 있다.

반응형
반응형

Redis는 메모리에 데이터를 저장해 빠르게 읽고 쓰는 key-value 저장소이며, 캐시뿐 아니라 여러 서버 설계 패턴에 함께 쓰입니다.

String, Hash, List, Set, Sorted Set 같은 자료구조를 제공하고 TTL, persistence, replication 설정에 따라 캐시와 저장소 사이의 역할이 달라집니다.

 

핵심 정리

Redis를 캐시로 사용할 때는 데이터가 언제 만료되는지, 캐시 미스가 났을 때 누가 DB를 조회하는지, 쓰기 시점에 캐시와 DB를 어떻게 맞출지 정해야 합니다. Cache-Aside는 애플리케이션이 직접 캐시 조회와 DB 조회를 제어하고, Read-Through와 Write-Through는 캐시 계층이나 라이브러리가 일부 흐름을 맡는 방식입니다.

  • Redis는 메모리 기반이라 읽기와 쓰기가 빠르지만 메모리 용량을 설계에 반영해야 합니다.
  • 캐시 데이터에는 TTL과 eviction 정책을 함께 고려해야 합니다.
  • String, Hash, List, Set, Sorted Set 중 어떤 자료구조를 쓸지에 따라 구현이 달라집니다.
  • Cache-Aside는 애플리케이션 코드가 cache miss, DB 조회, cache set을 직접 처리합니다.
  • Read-Through는 캐시 get 호출 뒤 miss 처리와 채우기를 캐시 계층이 맡는 방식입니다.
  • 운영 환경에서는 persistence, replication, backup, 장애 복구 전략을 함께 봐야 합니다.

Redis를 단순히 빠른 임시 저장소로만 보면 캐시 무효화와 데이터 일관성 문제가 늦게 드러납니다. 어떤 데이터가 원본이고, Redis에는 어느 시점에 어떤 형태로 복사되는지 먼저 정하는 것이 중요합니다.

이어서 볼 글

 

설치 운영

우분투에서 설치

sudo apt-get update
sudo apt-get install redis-server

트러블슈팅

docker안에서 host의 redis에 접근하도록 하기

실행할때 다음처럼 --network host만 붙여주면 된다.

#!/bin/bash

PORT=$1

docker run \
  --network host \
  -e SPRING_APPLICATION_JSON='{"server":{"port":'"$PORT"'}}' \
  -p $PORT:$PORT \
  auth-service

설계관점

유사한 목적(“캐시-미스 시 DB에서 데이터 가져와 캐시에 채우기”)을 갖고 있지만, 구현 주체흐름 제어가 다릅니다.

 
Read-Through vs Write Through
둘다 캐시라이브러리를 쓰는 형태
항목 Read-Through Write-Through
읽기(Read) 애플리케이션이 cache.get(key)만 호출
– miss 시 캐시 라이브러리가 DB에서 가져와 자동으로 채워 줌
애플리케이션이 cache.get(key)만 호출
– miss 시 캐시→DB 채우기(읽기 방식은 동일)
쓰기(Write) 애플리케이션이 DB에 직접 쓰고,
캐시는 별도 무효화 로직(예: Cache-Aside) 필요
애플리케이션이 cache.put(key, value) 호출
– 캐시에 먼저 쓰고, 라이브러리가 동기적으로 DB에도 써 줌
앱 코드 관점 • Read: cache.get(key)
• Write: db.save(...) + 별도 cache.del(key)
• Read: cache.get(key)
• Write: cache.put(key, value) (내부에서 DB 쓰기까지 수행)
 
  • Read-Through는 애플리케이션이 캐시에 get()만 호출하면 되고,
    쓰기는 보통 DB에 직접 쓰고 캐시 무효화를 따로 처리
  • Write-Through는 애플리케이션이 읽을 땐 get(), 쓸 땐 put()을 호출하며,
    put()이 캐시와 DB 쓰기를 동기적으로 처리해 줌
 
Cache-Aside vs Read-Through
항목 Cache-Aside Read-Through
핵심 아이디어 앱이 직접 “캐시 꺼내→DB 가져와→캐시 세팅” 캐시 라이브러리가 “get() 호출만으로 내부에서 DB 조회→캐시 세팅”
누가 제어하나 애플리케이션 코드 캐시 미들웨어 / 프레임워크
코드 호출 방식 cache.get → miss? db.query → cache.set cache.get 만 호출
쓰기(무효화) db.update 후 애플리케이션이 cache.del 보통 Write-Through와 결합해 cache.put→db.write
자동화 수준 수동 로직(직접 구현) 캐시 솔루션에 내장 (투명하게 작동)
 

결론:

  • 목적은 같지만,
  • Cache-Aside는 “앱이 직접 관리”
  • Read-Through는 “라이브러리가 대신 관리”

Write-Through vs Write-Back

항목 Write-Through Write-Back
핵심 아이디어 캐시에 쓰면 즉시 동기적으로 DB에도 쓰기 캐시에 먼저 쓰고, 나중에 배치나 트리거로 DB에 반영
쓰기 흐름 1. cache.put(key, value) 호출
2. 캐시 저장 완료 후
3. 즉시 DB 쓰기
1. cache.put(key, value) 호출
2. 캐시만 업데이트
3. 일정 시점에 Dirty-entry를 모아서 DB 쓰기
DB 쓰기 타이밍 동기적, 쓰기 호출 시점에 바로 반영 비동기적, 지연(예: TTL, 배치, evict 시점)에 반영
데이터 일관성 보장 강력(쓰기 직후 DB와 캐시 일치) 약함(아직 DB에 반영되지 않은 쓰기 데이터가 캐시에만 존재할 수 있음)
쓰기 레이턴시 상대적으로 높음(캐시+DB 두 번 쓰기) 상대적으로 낮음(한 번만 캐시에 쓰기)
장애 복구 & 내구성 쓰기 실패 시 즉시 예외 처리 가능 캐시 장애나 장애 복구 시 Dirty 데이터 유실 위험
캐시 오염(Dirty) 관리 없음 “Dirty bit” 관리 필요 → evict, 주기적 flush 로직 구현
구현 복잡도 낮음(동기 쓰기만 구현) 높음(Dirty 식별·스케줄·재시도 로직 등 추가)
적합한 사용 사례 • 데이터 일관성이 최우선일 때
• 쓰기 빈도가 낮거나 허용 가능한 레이턴시일 때
• 쓰기 빈도가 매우 높고, 약간의 지연 반영을 허용할 때
• DB 부하 완화가 급선무일 때
반응형
반응형

Kafka는 대량의 이벤트를 토픽에 기록하고, 프로듀서와 컨슈머가 이를 비동기적으로 주고받게 하는 분산 스트리밍 플랫폼입니다.

이 글은 Kafka를 이해할 때 먼저 잡아야 하는 토픽, 파티션, 브로커, 리더와 팔로워, 컨슈머 그룹 개념을 정리합니다.

 

핵심 정리

Kafka에서 프로듀서는 메시지를 토픽에 쓰고, 컨슈머는 토픽에서 메시지를 읽습니다. 토픽은 다시 파티션으로 나뉘며, 파티션은 메시지 순서를 보존하는 로그 단위입니다. 브로커는 파티션 데이터를 저장하고 처리하는 서버이고, 복제된 파티션 중 하나는 리더가 되어 읽기와 쓰기의 기준점이 됩니다. 컨슈머 그룹은 여러 컨슈머가 같은 토픽을 나누어 읽게 해 병렬 처리와 장애 대응을 가능하게 합니다.

  • 토픽은 이벤트를 분류하는 논리적인 이름입니다.
  • 파티션은 토픽 내부에서 데이터를 나누어 저장하고 병렬 처리 능력을 결정합니다.
  • 브로커는 Kafka 클러스터 안에서 파티션 데이터를 저장하는 서버입니다.
  • 리더 파티션은 실제 읽기와 쓰기의 기준이 되고 팔로워는 복제본 역할을 합니다.
  • 컨슈머 그룹을 쓰면 여러 컨슈머가 파티션을 나누어 처리할 수 있습니다.
  • ZooKeeper 설명은 오래된 Kafka 운영 방식과 연결되므로, 새 설치에서는 KRaft 기반 공식 문서를 확인해야 합니다.

원문은 Kafka의 기본 구조를 넓게 적어 둔 글입니다. 이번 보강은 처음 읽는 사람이 토픽과 파티션을 헷갈리지 않게 하고, ZooKeeper 중심 설명이 최신 설치 기준과 다를 수 있다는 점을 앞에서 표시했습니다.

이어서 볼 글

 

kafka란

RabbitMQ와 같은 메시지 미들웨어.

LinkedIn에서 개발되어 현재는 Apache Software Foundation의 일부인 오픈 소스 메시지 스트리밍 플랫폼.
대량의 실시간 데이터 스트리밍을 처리하는 데 초점을 맞추고 있고, 높은 처리량, 데이터 복제, 분산 처리 등을 지원

이를 통해 대규모 데이터를 실시간으로 처리할 수 있다.

MSA간 통신에도 자주 쓰임

브로커: Kafka에서 메시지를 처리하는 서버

  • 카프카에서는 이벤트를 구분하기 위한 단위로 '토픽'을 사용하며 파일시스템의 디렉토리와 비슷한 개념으로 이해 가능
    • 토픽은 게시/구독 메시징 시스템에서 흔하게 볼 수 있다.
      • 카프카 외적으로는 비슷한 개념에 대해 채널, 큐, 스트림이라는 용어를 사용하기도 함
        • 개인적으로는 채널이 더 직관적이네
    • 프로듀서는 데이터를 '토픽'에 넣고, 컨슈머는 '토픽'에서 데이터를 읽는다.
    • 이러한 토픽은 카프카 클러스터내에서 데이터를 분류하고 구독자가 관심있는 메시지만 구독할 수 있도록 해주는 중요한 역할
    • 카프카 클러스터 전체가 하나의 토픽만 사용한다면 생략도 가능한가? > NO.
  • 토픽의 데이터는 파티션에 분산 저장되며 각 파티션은 순서 유지가 되는 메시지의 로그로 구성
  • 하나의 브로커는 여러 파티션의 데이터를 저장할 수 있으며, 반대로 하나의 파티션도 여러 브로커에 복제될 수 있음에 주의
    • 단 leader, follower구조를 가지면 복제는 읽기성능개선용이 아니라 장애 대비용이다(active-standby)
  • 파티션의 개수는 토픽의 병렬처리 능력을 결정

리더 브로커: 특정 메시지그룹(파티션)의 처리를 담당하는 서버. Kafka에서 각 데이터 파티션에는 하나의 리더 브로커가 있음

  • 리더 브로커는 파티션과 1:1의 관계를 가지며 하나의 파티션을 여러 브로커가 처리할경우 하나만 리더이고 나머지는 팔로워로 동작

ZooKeeper

  • 일종의 비서역할로 서버들의 상태관리나 역할분배를 Kafka를 위해서 수행함. Kakfa설치시 같이 설치됨.
  • 또는 일종의 '중앙 데이터 저장소' 역할. Kafka 시스템 내의 여러 서버들이 ZooKeeper를 통해 필요한 정보 공유하고 동기화.
  • 특정 서버에 문제가 생기면 이 정보가 ZooKeeper에 기록되고 다른 서버들이 이를 확인하여 적절히 대응

심화

토픽, 키, 밸류가 있을때 키별로 파티션으로 분배되고(특정 키는 특정파티션으로 들어감) 데이터 전송순서는 키별로만 보장됨

그룹id개념

  • 컨슈머 관점에서 소비하는 팀이 여러개임을 가정하고 group id를 통해서 별도 offset관리가 가능함

MSA간 통신에 API(Rest, gRPC)보다 kafka가 스루풋에 유리한 이유

  • Kafka는 데이터를 디스크에 순차적으로 추가(write)만 합니다.
  • Zero Copy + Sendfile() 사용(유저공간 메모리에 올리지 않고 커널 버퍼만 탐)
  • Kafka는 메시지를 1건씩 처리하지 않고, batch로 묶어 전송하고 압축까지함
  • 파티셔닝으로 병렬 처리

TPS비교항목

  API 서버 (REST/gRPC 등) Kafka (Broker 기준)
TPS 수준 수천~수만 TPS (10K~50K) 수십만~수백만 TPS (100K~1M 이상)
지연 시간 수 ms ~ 수백 ms 수 ms ~ 수십 ms

하이브리드도 많이 씀

  • 주문 API → Kafka enqueue → 매칭엔진

Producer > Broker > Consumer

  • Producer > Broker 구간은 push (1-5ms)
  • Broker > Consumer 구간은 pull방식 (5-20ms)

Kafka에서 “Exactly-Once”를 보장하려면 트랜잭션이 필요

Kafka는 기본적으로 At-Least-Once(최소 1번) 처리 모델.
Producer가 실패 시 재시도하면 중복 메시지가 발생할 수 있기 때문.

그걸 방지하려면:

  1. 중복 없이 보내야 하고 (Producer → Kafka)
    1. 아래와 같은 부분성공 시나리오때문에 트랜색션처리 필요
      1. A 토픽엔 성공, B 토픽엔 실패 → 시스템 불일치
      2. 배치 중 네트워크 끊김 → 일부 메시지만 브로커에 기록
    2. 구현방법
        1. enable.idempotence=true → 중복 방지
        2. 트랜잭션을 쓰려면
          • transactional.id 를 설정하고
          • initTransactions() 호출 후
          • beginTransaction()/commitTransaction() (또는 abortTransaction()) 을 직접 사용해야 합니다.
        즉, 두 기능은 별개지만, 트랜잭션을 쓰려면 반드시 Idempotent Producer 가 활성화된 상태여야 합니다.
  2. 커밋된 메시지만 읽고 (Kafka → Consumer)
    1. isolation.level=read_committed 로 설정하면, 트랜잭션 커밋된 메시지(=commitTransaction())만 컨슈머가 보도록 함
  3. 소비한 메시지는 한 번만 처리하고 커밋해야 함 (Consumer → DB)
    1. "enable.auto.commit=false"  자동 commit끔
    2. 외부api호출과 연계한다면 호출성공후 outbox OK처리
      1. 이것도 외부api호출후 시스템이 죽으면 완전한 exactly-once는 안되는데, 멱등성 로직으로 보완(외부 api호출시 멱등성처리)
      2. 또는 saga패턴을 통한 보상 매커니즘 작동

장애대응

로깅이 아닌 핵심컴포넌트로 사용시 

1. 고가용성 세팅: 브로커 3대이상 운영, 하나 죽어도 자동리더선출로 단일 브로커 장애 정도는 무중단으로 흡수할 수 있어야 함

2. Producer acks=all설정: 모든 브로커가 받아야 다음 진행

3. Producer auto.offset.commit → false : 오프셋 관리 꼼꼼하게

4. Producer 반복된 전송실패시 임시저장소에 적재하고 replay worker가 주기적으로 꺼내서 재전송시도(3회정도 실패하면 실패로그 기록해서 스레드를 계속차지하지 않게 함)

Kafka 전송 실패 → 3회 retry (100ms 간격) → 그래도 실패 → Redis fallback 저장 → @Scheduled replay worker가 Kafka 재전송 시도 → 여전히 실패 시 운영 Slack 알림 or DLT 적재

5. Consumer DLT

  • 소비중 Json 파싱에러등으로 오류가 발생하면 DLT로 보내고 메인 서비스는 정상적으로 계속처리
  • DLT를 나중에 따로 모아서 분석/재처리/알림 등 가능
  • 주 토픽 (orders)
       └── Consumer 처리 중 에러 발생
            └── Dead Letter Topic (orders.DLT)로 해당 메시지 저장, replay consumer가 소비, 도는 운영이슈로.

6. 고가용성세팅에도 불구하고 모든 브로커가 죽었을때: 단일 장애지점(SPOF)가 되지 않도록 별도 로깅시스템(역시 카프카지만)으로 S3 > hiveTable구조를 만들고 복원에 사용

고객관점

방법1. react로 500ms등 타임박스를 설정하고, 비동기로 몇차리 전송시도후 안되면 실패로 판정하고 고객에게 안내(다시해보세요 등)

방법2. 완전비동기로 설계하여 고객에게는 주문이전송되었습니다 정도로하고 성공은 비동기로 고객에게 전달

다만, 현재 페이지가 유지될경우 websocket을 통해 상태메시지 갱신. 이때 api서버와 push서버는 분리해주는 게 좋음(영향을 줄 수 있으므로)

방법3. 완전동기화. 모래시계로 외부api가 timeout날때까지 기다리거나 하는 방식인데 권장되지는 않음. 정말 중요한 프로세스면 선택할수도

kafka설치과정

Java설치

카프카는 Java기반이므로 java jdk가 안깔렸으면 깔아준다.

sudo apt update
sudo apt install default-jdk

사용자계정에 종속된 설치보다는 여러명이 쓸 수 있도록 아래 처럼 전용 유저를 생성해주는게 좋다.

sudo adduser kafka
sudo adduser kafka sudo

카프카는 자바 기반이므로 jar파일을 받아서 설정해준다. apt등의 패키지 매니저를 사용하면 예전버전일수 있으므로 다운로드해서 해보자.

mkdir ~/Downloads
curl "https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz" -o ~/Downloads/kafka.tgz
mkdir /opt/kafka && cd /opt/kafka
tar -xvzf ~/Downloads/kafka.tgz --strip 1

#kafka디렉토리 소유권을 kafka사용자로 변경
sudo useradd kafka -m
sudo chown -R kafka:kafka /opt/kafka

/opt/kafka에 설치했으며, /opt 디렉토리는 선택적인(add-on) 애플리케이션 SW패키지를 위한 곳으로 여러사용자가 이용하는 라이브러리인 경우 선호되는 경로다.

zookeeper설정

zookeeper는 kafka와 독립적으로 실행되며, 비슷한 설정 매커니즘을 갖는다.

zookeeper설정파일 수정

sevity@sevityubuntu:/opt/kafka/config$ cat zookeeper.properties
# the directory where the snapshot is stored.
dataDir=/var/cache/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

다른 부분은 그냥 두었고, dataDir은 /var/cache 밑에 두어 휘발되지 않도록 설정했다. (기본은 /tmp 밑에 있었던듯)

이를 위해 다음과 같은 권한 설정을 했다.

sudo mkdir /var/cache/zookeeper
sudo chown kafka:kafka /var/cache/zookeeper
sudo chmod 700 /var/cache/zookeeper

zookeeper에 대해 systemctl에 등록해서 부팅시 마다 실행되도록 하기위해 다음 파일 작성

sudo vi /etc/systemd/system/zookeeper.service

[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

# 그 다음 systemctl을 통해 시작하고 재부팅 시에도 항상 실행되도록 설정
sudo systemctl start zookeeper
sudo systemctl enable zookeeper

kafka에 대해서도 마찬가지로 작성

sudo vi /etc/systemd/system/kafka.service

[Unit]
Description=Apache Kafka server
Documentation=http://kafka.apache.org
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

# 그 다음 systemctl을 통해 시작하고 재부팅 시에도 항상 실행되도록 설정
sudo systemctl start kafka
sudo systemctl enable kafka

트러블 슈팅

서버로그 실시간 모니터링

tail -f /opt/kafka/logs/server.log

jps로 kafka가 잘 실행되고 있는지 확인(QuorumPeerMain과 Kafka가 떠있으면OK)

su - kafka
jps
109092 Jps
108186 QuorumPeerMain(Zookeeper의 시작점)
108573 Kafka

토픽 메시지가 잘 수신되는지 확인하는 방법

가장 기본적으로는 9092포트를 리슨하고 있는지 확인

sevity@sevityubuntu:/opt/kafka/config$ lsof -i :9092
COMMAND    PID   USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
java    168958 sevity  116u  IPv6 1324110      0t0  TCP sevityubuntu:46330->sevityubuntu:9092 (ESTABLISHED)
java    168958 sevity  117u  IPv6 1320714      0t0  TCP sevityubuntu:46336->sevityubuntu:9092 (ESTABLISHED)

GetOffsetSell을 통해 특정 토픽에 게시된 메지시수를 확인

sevity@sevityubuntu:/opt/kafka$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic iis_log --time -1
iis_log:0:4043038

kafka-console-consumer.sh를 사용하여 특정 토픽의 메시지 수신상황을 실시간으로 확인

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic iis_log --from-beginning
{"@timestamp":"2023-08-01T01:18:34.407Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.9.0"},"agent":{"version":"8.9.0","ephemeral_id":"98df9137-96e3-4653-b353-2821eebde875","id":"cf839823-29c8-4619-b9c4-85e90804e50e","name":"SEVITY-PC","type":"filebeat"},"ecs":{"version":"8.0.0"},"log":{"file":{"path":"c:\\inetpub\\logs\\LogFiles\\W3SVC1\\u_ex230801.log"},"offset":16076},"message":"2023-08-01 01:17:24 192.168.0.6 GET /wiki/doku.php id=Advice%20To%20Relieve%20Your%20Acid%20Reflux%20Signs%20and%20symptoms 80 - 196.245.181.117 Mozilla/5.0+(X11;+Ubuntu;+Linux+x86_64;+rv:114.0)+Gecko/20100101+Firefox/114.0 http://sevity.com/ 200 0 0 1182","input":{"type":"log"},"host":{"mac":["00-15-83-EA-2C-EE","00-23-24-63-E7-E3","02-50-41-00-00-01","88-36-6C-F7-D9-04","88-36-6C-F7-D9-06","88-36-6C-F7-D9-07"],"hostname":"sevity-pc","name":"sevity-pc","architecture":"x86_64","os":{"family":"windows","name":"Windows 10 Enterprise","kernel":"10.0.19041.3208 (WinBuild.160101.0800)","build":"19045.3208","type":"windows","platform":"windows","version":"10.0"},"id":"dbb18b3d-fb42-49ec-b731-f430dd2f3fd5","ip":["fe80::a30b:d99a:f7ed:2bac","192.168.100.15","fe80::a4ce:c09a:ea91:8a9","169.254.142.88","fe80::9c93:77c8:66bd:b6c5","169.254.55.217","fe80::dd77:ec13:69c4:6922","169.254.56.104","fe80::310a:1b3d:e9a3:431c","192.168.0.6","fe80::2c27:d03a:a322:765b","169.254.236.36"]}}
반응형
반응형

ELK Stack은 로그를 수집하고 저장한 뒤 Kibana에서 검색과 시각화를 할 수 있게 묶은 로그 분석 구성입니다.

Filebeat는 로그 파일을 읽어 보내고, Kafka는 중간 버퍼 역할을 하며, Logstash는 데이터를 가공해 Elasticsearch에 넣는 흐름으로 이해하면 됩니다.

 

핵심 정리

ELK 로그 파이프라인을 볼 때는 각 컴포넌트의 책임을 분리해서 보는 것이 중요합니다. Filebeat는 서버에 가볍게 붙어 파일 변경을 읽고 이벤트를 전달합니다. Kafka를 사이에 두면 로그가 몰릴 때도 생산자와 소비자를 분리하고 버퍼링할 수 있습니다. Logstash는 필터로 필드를 파싱하거나 변환한 뒤 Elasticsearch에 저장합니다. Kibana는 저장된 로그를 검색하고 대시보드로 시각화하는 역할을 맡습니다.

  • Filebeat는 애플리케이션 서버의 로그 파일을 읽어 전달합니다.
  • Kafka는 로그 이벤트가 몰릴 때 중간 큐와 버퍼 역할을 할 수 있습니다.
  • Logstash는 grok, mutate 같은 필터로 로그 필드를 가공합니다.
  • Elasticsearch는 가공된 로그를 검색 가능한 인덱스로 저장합니다.
  • Kibana는 Elasticsearch에 저장된 로그를 검색하고 시각화합니다.
  • 운영 환경에서는 파싱 실패, 인덱스 이름, 타임스탬프, 보존 기간을 함께 봐야 합니다.
  • 로그 파이프라인 문제는 수집, 전송, 가공, 저장, 조회 단계별로 끊어서 확인하는 것이 빠릅니다.

원문은 IIS 로그를 분석 서버로 보내는 실습 흐름이므로, Filebeat, Kafka, Logstash, Elasticsearch, Kibana의 역할을 먼저 나누었습니다. 전체 구조를 역할별로 보면 어느 구간에서 로그가 끊겼는지 추적하기가 쉬워집니다.

이어서 볼 글

 

 

먼저 IIS서버에 Filebeat를 깔아서 우분투 분석 서버에 kafka전송을 한다.

반응형
반응형

Spark 웹서버 세션분석 실습은 IIS 로그를 읽어 IP별 접근 시간과 URL을 묶고, 연속된 방문 기록을 세션 단위로 정리하는 데이터 처리 흐름을 보여 준다.

이 글은 Hadoop 방식의 map-reduce 실습과 비슷한 문제를 Spark로 처리하며, 원본 IIS 로그에서 IP, 시간, URL을 추출해 사용자별 방문 흐름을 만드는 메모다.

 

핵심 정리

웹서버 세션분석의 기본 목표는 원본 로그를 사람이 읽기 쉬운 방문 단위로 바꾸는 것이다. IIS 로그에는 날짜, 시간, 서버 IP, 요청 메서드, URI, 포트, 클라이언트 IP, User-Agent, Referer, 상태 코드, 처리 시간 같은 필드가 들어 있다. 원문 실습은 이런 로그를 읽어 클라이언트 IP별로 접근 시간과 URL을 모으고, 시간 순서에 따라 묶인 방문 흐름을 출력한다. Spark를 쓰면 파일 여러 개를 분산 처리하고, map 단계에서 필요한 필드를 뽑고, reduce나 groupBy 계열 처리로 IP별 이벤트를 모을 수 있다. 세션분석에서는 정렬 기준과 세션을 끊는 시간 간격을 어떻게 정할지가 결과 품질에 큰 영향을 준다.

  • IIS 로그에서 분석에 필요한 필드를 먼저 골라낸다.
  • 클라이언트 IP는 방문자를 묶는 기본 키로 사용할 수 있다.
  • 요청 시간과 URL을 함께 남기면 방문 순서를 복원할 수 있다.
  • map 단계에서는 원본 로그 한 줄을 구조화된 이벤트로 바꾼다.
  • reduce나 groupBy 단계에서는 IP별 이벤트를 모은다.
  • 시간 순 정렬을 해야 실제 방문 흐름을 읽을 수 있다.
  • 세션 구분 기준은 일정 시간 이상 간격이 벌어졌는지로 정할 수 있다.
  • 봇, 정적 파일, 상태 코드 필터링 여부에 따라 분석 결과가 달라질 수 있다.

원문은 IIS 로그 샘플과 Spark 처리 결과 예시를 보여 주는 실습 글입니다. 보강문에서는 로그 필드 추출, IP별 그룹화, 시간 정렬, 세션 분리 기준이라는 분석 흐름을 먼저 정리했습니다. 세션분석은 코드보다 기준 정의가 중요하므로, 어떤 요청을 방문으로 볼지와 언제 새 세션으로 나눌지를 명확히 정해야 합니다.

 

먼저 하둡버전을 보고 오길 권한다.

스팍 설치에 관한 부분은 여기 참조.

우리는 하둡버전과 비슷하게 IIS웹서버를 분석해서 map - reduce과정을 통해 IP별 접근 시간및 URL을 정리해서 볼 것이다. 오리지널 웹서버 로그는 대략 다음과 같으며,

sevity@sevityubuntu:~/workspace/hadoop_sevity.com/session_analysis$ cat ../iis_logs/* | head
#Software: Microsoft Internet Information Services 10.0
#Version: 1.0
#Date: 2015-09-06 08:32:43
#Fields: date time s-ip cs-method cs-uri-stem cs-uri-query s-port cs-username c-ip cs(User-Agent) cs(Referer) sc-status sc-substatus sc-win32-status time-taken
2015-09-06 08:32:43 127.0.0.1 GET / - 80 - 127.0.0.1 Mozilla/5.0+(Windows+NT+10.0;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/45.0.2454.85+Safari/537.36 - 200 0 0 211
2015-09-06 08:32:43 127.0.0.1 GET /iisstart.png - 80 - 127.0.0.1 Mozilla/5.0+(Windows+NT+10.0;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/45.0.2454.85+Safari/537.36 http://127.0.0.1/ 200 0 0 2
2015-09-06 08:32:43 127.0.0.1 GET /favicon.ico - 80 - 127.0.0.1 Mozilla/5.0+(Windows+NT+10.0;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/45.0.2454.85+Safari/537.36 http://127.0.0.1/ 404 0 2 2

map-reduce를 거친 결과는 대략 다음과 같을 것이다.

sevity@sevityubuntu:~/workspace/hadoop_spark/session_analysis$ cat session_output_merged.txt | head
157.55.39.90
[[(datetime.datetime(2016, 6, 7, 20, 44, 21), '/')], [(datetime.datetime(2016, 6, 8, 4, 20, 31), '/menu_wonilnet.asp'), (datetime.datetime(2016, 6, 8, 4, 20, 43), '/menu_friend.asp')], [(datetime.datetime(2019, 9, 16, 23, 58, 6), '/menu_friend.asp')], [(datetime.datetime(2021, 8, 17, 15, 8, 50), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 17, 17, 36, 23), '/menu_wonilnet.asp'), (datetime.datetime(2021, 8, 17, 17, 36, 24), '/menu_friend.asp')], [(datetime.datetime(2021, 8, 18, 1, 53, 52), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 19, 2, 40, 48), '/wiki/doku.php')], [(datetime.datetime(2021, 8, 19, 6, 45, 32), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 20, 10, 0, 31), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 21, 3, 55, 36), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 21, 4, 55, 13), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 23, 6, 49, 26), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 24, 1, 45, 47), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 24, 9, 58, 34), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 25, 11, 55, 24), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 26, 8, 22, 7), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 26, 13, 56, 19), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 27, 21, 59, 24), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 28, 20, 3, 46), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 28, 23, 51, 21), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 29, 18, 40, 40), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 29, 20, 6, 8), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 31, 1, 38, 52), '/wiki/doku.php')], [(datetime.datetime(2021, 8, 31, 5, 29, 5), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 31, 6, 28, 20), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 9, 2, 0, 32, 17), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 9, 2, 16, 19, 20), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 9, 9, 3, 2, 43), '/menu_wonilnet.asp'), (datetime.datetime(2021, 9, 9, 3, 2, 45), '/menu_friend.asp')], [(datetime.datetime(2021, 9, 16, 7, 52, 4), '/wiki/doku.php')]]
138.197.96.197
[[(datetime.datetime(2017, 2, 15, 17, 21, 24), '/'), (datetime.datetime(2017, 2, 15, 17, 21, 26), '/menu_wonilnet.asp'), (datetime.datetime(2017, 2, 15, 17, 21, 26), '/board/dhtml_logo.js')]]
104.236.164.185
[[(datetime.datetime(2017, 11, 28, 11, 45, 56), '/')]]
181.213.93.231
[[(datetime.datetime(2018, 1, 30, 13, 1, 24), '/hndUnblock.cgi'), (datetime.datetime(2018, 1, 30, 13, 1, 28), '/tmUnblock.cgi'), (datetime.datetime(2018, 1, 30, 13, 1, 32), '/moo'), (datetime.datetime(2018, 1, 30, 13, 1, 36), '/'), (datetime.datetime(2018, 1, 30, 13, 1, 42), '/getcfg.php'), (datetime.datetime(2018, 1, 30, 13, 1, 49), '/getcfg.php')]]
117.80.147.63
[[(datetime.datetime(2018, 2, 14, 8, 13, 12), '/')]]

실습을 위한 디렉토리 구조는 대략 다음과 같다.

sevity@sevityubuntu:~/workspace/hadoop_spark$ tree -L 3
.
├── iis_logs (여기에 분석하려는 웹로그 파일들이 있습니다. 실제로는 수없이 많음)
│   ├── u_ex230101.log
│   ├── u_ex230102.log
│   └── u_ex230103.log
├── session_analysis
│   ├── session_output
│   │   ├── part-00001
│   │   ├── part-00002
│   │   ├── part-00003
│   │   └── _SUCCESS
│   ├── session_output_merged.txt
│   └── spark_session_analysis.py(우리가 작성할 python 코드의 위치)
└── spark_test

다음과 같이 spark_session_analysis.py를 작성하자.

여기서 중요한 점은 다음과 같다.

  • 스팍은 하둡과 다르게 하나의 파일에서 map과 reduce를 모두 수행한다.
  • 각 동작은 함수호출 형태로 진행되며,
  • 아래에서 reduce에 해당하는 함수는 groupByKey()이다.
from pyspark import SparkConf, SparkContext
from datetime import datetime, timedelta
from pyspark.sql import SparkSession

def parse_log_line(line):
    try:
        parts = line.strip().split()
        if len(parts) != 15:
            return None

        date_time = parts[0] + " " + parts[1]
        date_time = datetime.strptime(date_time, '%Y-%m-%d %H:%M:%S')
        url = parts[4]
        ip = parts[8]  # changed from parts[2] to parts[8] to use the client IP

        return (ip, (date_time, url))
    except:
        return None

def calculate_sessions(group):
    ip, data = group
    data = list(data)
    data.sort(key = lambda x: x[0])
    sessions = []
    session = [data[0]]
    for i in range(1, len(data)):
        if data[i][0] - session[-1][0] > timedelta(seconds=1800):
            sessions.append(session)
            session = [data[i]]
        else:
            session.append(data[i])
    sessions.append(session)
    return (ip, sessions)


# Spark configuration
# [*]하면 모든 코어 사용
conf = SparkConf().setMaster('local[*]').setAppName('Log Analysis')
sc = SparkContext(conf=conf)

log_lines = sc.textFile('hdfs://localhost:9000/logs/u_ex*')
parsed_lines = log_lines.map(parse_log_line).filter(lambda x: x is not None)
grouped_by_ip = parsed_lines.groupByKey()
sessions = grouped_by_ip.flatMap(calculate_sessions)
sessions.saveAsTextFile('hdfs://localhost:9000/session_output')

다음 명령을 통해 실행하고 결과를 확인한다.

# hdfs에 reduce결과를 쌓을 곳을 clean-up해준다. 
hdfs dfs -rm -r /session_output

# 로컬파일시스템에 복사해올 경로도 clean-up해준다.
rm -rf session_output
rm -f session_output_merged.txt

# python 파일을 실행한다. spark-submit을 통해 실행하면 4040포트를 통해 웹에서 진행상황 확인가능
$SPARK_HOME/bin/spark-submit spark_session_analysis.py 2>&1 | tee r.txt 

# HDFS에서 로컬파일시스템으로 결과 폴더 복사
hadoop fs -get /session_output .

# 위의 폴더구조에서 볼 수 있듯 병렬처리 때문에 결과파일이 여러개로 분리돼 있는데 합쳐준다.
# 합치고 로컬파일시스템으로 복사하는 것까지.
hadoop fs -getmerge hdfs://localhost:9000/session_output ./session_output_merged.txt

# 위의 방법대신 python코드내에서 다음 방법으로 머지해도 된다.
# 단 스팍을 써서 머지하는 방법인 만큼 메모리가 터질수도 있으니 주의
sessions.coalesce(1).saveAsTextFile('hdfs://localhost:9000/session_output')
반응형
반응형

Spark 설치는 단순히 패키지를 받는 일보다 Java와 Hadoop 연동, SPARK_HOME, PATH, PYSPARK_PYTHON, spark-env.sh, spark-shell, spark-submit 확인까지 이어지는 환경 구성 작업으로 보는 것이 좋다.

이 글은 PySpark만 간단히 설치하는 방법과 Hadoop 연동을 위해 pre-built Spark 패키지를 내려받아 환경변수와 설정 파일을 맞추는 흐름을 함께 정리한 메모다.

 

핵심 정리

Spark는 JVM 기반 빅데이터 처리 프레임워크이며 Scala, Java, Python, R 같은 여러 언어에서 사용할 수 있다. 간단한 Python 실습만 필요하다면 pyspark 설치로 시작할 수 있지만, 기존 Hadoop과 HDFS를 함께 쓰려면 Spark 배포판, Hadoop 설정 경로, 환경변수를 맞춰야 한다. 원문 예시는 Spark 압축 파일을 풀고 SPARK_HOME과 PATH를 지정한 뒤 spark-env.sh에 HADOOP_CONF_DIR과 SPARK_HOME을 연결하는 흐름을 보여 준다. 설치 확인은 spark-shell 프롬프트가 뜨는지 보는 단계와 spark-submit으로 HDFS의 test.txt를 읽는 단계로 나눌 수 있다. 버전과 다운로드 경로는 시간이 지나면 바뀔 수 있으므로 실제 설치 시에는 공식 배포 페이지의 현재 버전과 Hadoop 호환 패키지를 확인해야 한다.

  • Spark는 JVM 위에서 동작하며 여러 언어 인터페이스를 제공한다.
  • 간단한 PySpark 실습만 필요하면 pyspark 설치로 시작할 수 있다.
  • Hadoop과 연동하려면 Hadoop용으로 빌드된 Spark 배포판을 고르는 편이 좋다.
  • SPARK_HOME은 Spark 설치 경로를 가리키도록 설정한다.
  • PATH에 Spark 실행 파일 경로를 추가하면 spark-shell과 spark-submit을 쉽게 실행할 수 있다.
  • HADOOP_CONF_DIR은 Spark가 Hadoop 설정을 찾는 데 필요하다.
  • spark-shell 실행은 기본 설치 확인에 좋다.
  • spark-submit으로 HDFS 파일을 읽어 보면 Spark와 Hadoop 연동을 함께 확인할 수 있다.

원문은 특정 Spark 버전과 Hadoop 연동 환경을 기준으로 설치 과정을 기록한 글입니다. 보강문에서는 버전 번호보다 설치 흐름과 확인 지점을 중심으로 정리했습니다. 실제 환경에서는 Spark, Java, Hadoop, Python 버전 조합이 중요하므로, 명령어를 그대로 복사하기 전에 현재 배포판과 호환성을 먼저 확인하는 편이 안전합니다.

기본개념

스파크(Spark)는 하둡과 마찬가지로 자바(Java) 기반의 빅 데이터 처리 프레임워크입니다. 
하지만 스파크는 자바 외에도 스칼라(Scala), 파이썬(Python), R 등 여러 언어를 지원합니다.

스파크의 핵심 코드는 스칼라로 작성되어 있습니다. 스칼라는 함수형 프로그래밍과 객체지향 프로그래밍을 모두 지원하는 현대적인 다중 패러다임 프로그래밍 언어입니다. 스칼라는 JVM(Java Virtual Machine) 위에서 동작하므로 자바와 호환성이 좋습니다.

스파크는 또한 파이스파크(PySpark)라는 인터페이스를 통해 파이썬에서도 사용할 수 있습니다. 이는 데이터 과학자들이 파이썬으로 빅 데이터 처리를 수행할 수 있게 해주는 큰 장점입니다. 

설치과정

가장 간단하게는 그냥 pip install pyspark 하면 깔린다.

하지만, 먼저 설치한 하둡하고 연결하려면 아래 과정을 거치면 된다.

먼저 공식홈페이지에 가서 Pre-built for Apache Hadoop을 선택하고 다운로드를 받는다.

아래는 해당 다운로드 링크를 wget으로 받는 과정을 설명한다.

아래처럼 wget으로 받고 압축을 풀어준다.

cd /opt/spark
sudo wget https://dlcdn.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
sudo tar -xvzf spark-3.4.1-bin-hadoop3.tgz

그다음 ~/.bashrc파일에 아래 3줄을 추가하고 source ~/.bashrc로 적용해준다.

export SPARK_HOME=/opt/spark/spark-3.4.1-bin-hadoop3
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PYSPARK_PYTHON=/usr/bin/python3

그다음 설정파일에 대한 설정을 해준다.

cd /opt/spark/spark-3.4.1-bin-hadoop3/conf
cp spark-env.sh.template spark-env.sh

#spark-env.sh에 아래 두 줄 추가
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_HOME=$SPARK_HOME

#추가한 내용 반영
source spark-env.sh

그 다음 pyspark을 설치해준다.

pip3 install pyspark

설치가 잘 됐는지 테스트

다음 명령어를 통해 spark shell이 잘 뜨는지 확인(scala> 프롬프트가 뜨면 성공)

$SPARK_HOME/bin/spark-shell

다음 파이선 스크립트를 통해 하둡 HDFS와 잘 연동되는지 확인

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.text("hdfs://localhost:9000/test.txt")
df.show()
# 먼저 hdfs로 test.txt를 복사해준다.
$ cat test.txt
hello spark
$ hdfs dfs -put test.txt /

# 그다음 다음명령으로 test.py 수행
$ $SPARK_HOME/bin/spark-submit test.py
+-----------+
|      value|
+-----------+
|hello spark|
+-----------+

위의 spark-submit을 통한 실행시 test.py가 끝나기 전에 4040포트로 웹접속을 하면 다음과 같이 웹으로 경과확인이 가능하다!

반응형
반응형

Hadoop으로 IIS 웹서버 로그를 분석할 때는 HDFS에 로그를 올린 뒤, IP와 시간 기준으로 방문 세션을 묶는 MapReduce 흐름을 먼저 잡아야 합니다.

이 글은 Hadoop 초기화, HDFS와 YARN 시작, IIS 로그 컬럼 확인, client IP별 30분 단위 세션 묶기를 실습한 기록입니다.

 

핵심 정리

웹서버 세션 분석은 로그 한 줄을 그대로 세는 작업이 아니라 같은 사용자의 연속 요청을 하나의 방문 흐름으로 묶는 작업입니다. 원문에서는 Hadoop을 재초기화하고 HDFS와 YARN을 시작한 뒤 IIS 로그에서 date, time, c-ip, cs-uri-stem 컬럼을 뽑아 사용합니다. 이후 mapper 단계에서는 IP와 요청 정보를 정리하고, reducer 단계에서는 같은 IP의 요청을 시간순으로 정렬해 30분 이상 끊기면 새 세션으로 나누는 방식으로 접근합니다.

  • HDFS는 로그 파일을 분산 파일 시스템에 저장하는 역할을 맡습니다.
  • YARN은 MapReduce 같은 작업이 클러스터 자원을 사용하도록 관리합니다.
  • IIS 로그에서는 date, time, c-ip, cs-uri-stem 같은 컬럼이 세션 분석의 핵심 입력입니다.
  • 세션 기준은 보통 같은 client IP에서 일정 시간 안에 이어진 요청 묶음으로 잡습니다.
  • 원문에서는 30분 단위로 요청 간격을 끊어 새 세션 여부를 판단하는 흐름을 사용했습니다.
  • 결과는 IP별 start_time, end_time, duration, url_list 형태로 보면 디버깅이 쉽습니다.

원문은 Hadoop 명령, IIS 로그 샘플, MapReduce로 IP별 세션을 묶는 출력 예시가 길게 이어진 실습 글이었습니다. 보강 블록은 실습의 목표와 데이터 컬럼, 세션 분리 기준을 먼저 드러내도록 정리했습니다.

이어서 볼 글

 

먼저 필요한경우 하둡을 포맷하고 초기화 해준다.

stop-dfs.sh  # 분산 파일 시스템(HDFS) 중지
stop-yarn.sh # 리소스 관리자(YARN)중지
hdfs namenode -format
start-dfs.sh
# YARN은 Hadoop의 주요 컴포넌트 중 하나로, 클러스터 리소스 관리 및 job scheduling을 담당하고 있습니다.
# 따라서, 만약 MapReduce 작업을 수행할 예정이라면, 이 명령어를 통해 YARN을 시작해야 합니다.
start-yarn.sh

# 나같은 경우 아래를 해야하는 경우가 있었다.
rm -rf /hadoop/data/*

# 헬스체크
hadoop fsck /

내 웹서버 로그 샘플은 다음과 같음

evity@sevityubuntu:~/workspace/hadoop_sevity.com/session_analysis$ cat ../iis_logs/* | head
#Software: Microsoft Internet Information Services 10.0
#Version: 1.0
#Date: 2015-09-06 08:32:43
#Fields: date time s-ip cs-method cs-uri-stem cs-uri-query s-port cs-username c-ip cs(User-Agent) cs(Referer) sc-status sc-substatus sc-win32-status time-taken
2015-09-06 08:32:43 127.0.0.1 GET / - 80 - 127.0.0.1 Mozilla/5.0+(Windows+NT+10.0;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/45.0.2454.85+Safari/537.36 - 200 0 0 211
2015-09-06 08:32:43 127.0.0.1 GET /iisstart.png - 80 - 127.0.0.1 Mozilla/5.0+(Windows+NT+10.0;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/45.0.2454.85+Safari/537.36 http://127.0.0.1/ 200 0 0 2
2015-09-06 08:32:43 127.0.0.1 GET /favicon.ico - 80 - 127.0.0.1 Mozilla/5.0+(Windows+NT+10.0;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/45.0.2454.85+Safari/537.36 http://127.0.0.1/ 404 0 2 2

이중에서 다음 컬럼을 뽑아서 map-reduce를 해보도록 하자.

날짜와 시간 (2015-09-06 08:32:43)
클라이언트 IP (c-ip)
요청된 URL (cs-uri-stem)

먼저 매핑과정을 통해 다음처럼 클라이언트 IP별로 30분 단위로 세션으로 묶어서 URL들을 뽑는다.

119.207.72.161  {"start_time": "2020-10-03 00:14:47", "end_time": "2020-10-03 00:14:48", "duration": 1.0, "url_list": ["/favicon.ico", "/board/dhtml_logo.js", "/menu_wonilnet.asp", "/", "/css/common.css"]}
66.249.65.54    {"start_time": "2020-10-03 00:27:31", "end_time": "2020-10-03 00:27:31", "duration": 0.0, "url_list": ["/wiki/doku.php"]}
80.82.70.187    {"start_time": "2020-10-03 01:59:09", "end_time": "2020-10-03 01:59:09", "duration": 0.0, "url_list": ["/cache/global/img/gs.gif"]}
139.59.95.139   {"start_time": "2020-10-03 02:11:31", "end_time": "2020-10-03 02:11:31", "duration": 0.0, "url_list": ["/"]}

그다음 리듀스 과정을 통해 IP별로 총 체류시간과 url_list를 묶어준다.

66.249.64.128   {"total_duration": 1694.0, "url_list": ["/menu_wonilnet.asp", "/menu_friend.asp", "/robots.txt", "/", "/menu_wonilnet_newImage.asp", "/favicon.ico"], "start_time": "2017-09-03 04:19:21", "end_time": "2018-07-29 20:43:11"}
66.249.70.30    {"total_duration": 1694.0, "url_list": ["/menu_wonilnet.asp", "/menu_friend.asp", "/robots.txt", "/wiki/doku.php", "/wiki/lib/tpl/dokuwiki/images/favicon.ico", "/", "/wiki/feed.php", "/wiki/lib/exe/js.php", "/wiki/lib/exe/css.php", "/css/common.css", "/wiki/lib/exe/detail.php", "/favicon.ico", "/wiki/lib/exe/fetch.php"], "start_time": "2017-07-26 01:31:14", "end_time": "2022-02-01 09:49:49"}
2.37.132.159    {"total_duration": 1689.0, "url_list": ["/"], "start_time": "2018-03-23 22:13:24", "end_time": "2018-03-23 22:43:34"}

폴더구조는 다음과 같으며, 이번 프로젝트의 현재 디렉토리는 session_analysis이다.

sevity@sevityubuntu:~/workspace/hadoop_sevity.com$ tree -L 1
.
├── iis_logs
├── ip_cnt
└── session_analysis

mapper.py를 다음과 같이 작성한다.

sevity@sevityubuntu:~/workspace/hadoop_sevity.com/session_analysis$ cat mapper.py
#!/usr/bin/env python3

import sys
import json
import codecs
from datetime import datetime, timedelta
import logging

logging.basicConfig(stream=sys.stderr, level=logging.ERROR)

previous_ip = None
url_list = []
start_time = None
end_time = None
session_timeout = timedelta(seconds=1800)  # 1800 seconds = 30 minutes

def reset_variables():
    global url_list, start_time, end_time
    url_list = []
    start_time = None
    end_time = None

# Using codecs to get stdin with a fallback in case of a UTF-8 decoding error
stdin = codecs.getreader('utf-8')(sys.stdin.buffer, errors='ignore')

for line in stdin:
    try:
        parts = line.strip().split()
        if len(parts) != 15:
            continue

        date_time = parts[0] + " " + parts[1]
        date_time = datetime.strptime(date_time, '%Y-%m-%d %H:%M:%S')

        ip = parts[8]  # changed from parts[2] to parts[8] to use the client IP
        url = parts[4]

        # If there is a previous ip and the current ip is different or a session timeout has occurred
        if previous_ip and (previous_ip != ip or (date_time - end_time) > session_timeout):
            print('%s\t%s' % (previous_ip, json.dumps({"start_time": str(start_time), "end_time": str(end_time), "duration": (end_time - start_time).total_seconds(), "url_list": list(set(url_list))})))
            reset_variables()

        if not start_time or date_time < start_time:
            start_time = date_time
        if not end_time or date_time > end_time:
            end_time = date_time

        url_list.append(url)
        previous_ip = ip

    except Exception as e:
        logging.error(f"Error processing line: {line.strip()}, Error: {e}")

# Print the last session
if previous_ip:
    print('%s\t%s' % (previous_ip, json.dumps({"start_time": str(start_time), "end_time": str(end_time), "duration": (end_time - start_time).total_seconds(), "url_list": list(set(url_list))})))

reducer.py를 다음과 같이 작성한다.

sevity@sevityubuntu:~/workspace/hadoop_sevity.com/session_analysis$ cat reducer.py
#!/usr/bin/env python3

import sys
import json

previous_ip = None
total_duration = 0.0
url_list = []
start_time = None
end_time = None

for line in sys.stdin:
    ip, session_info = line.strip().split('\t')
    session_info = json.loads(session_info)

    if previous_ip and previous_ip != ip:
        print('%s\t%s' % (previous_ip, json.dumps({"total_duration": total_duration, "url_list": list(set(url_list)), "start_time": start_time, "end_time": end_time})))

        total_duration = 0.0
        url_list = []
        start_time = None
        end_time = None

    if not start_time or session_info["start_time"] < start_time:
        start_time = session_info["start_time"]
    if not end_time or session_info["end_time"] > end_time:
        end_time = session_info["end_time"]

    total_duration += float(session_info["duration"])
    url_list.extend(session_info["url_list"])

    previous_ip = ip

if previous_ip:
    print('%s\t%s' % (previous_ip, json.dumps({"total_duration": total_duration, "url_list": list(set(url_list)), "start_time": start_time, "end_time": end_time})))

다음명령을 통해 map-reduce과정을 병렬로 진행한다.

hdfs dfs -rm -r /output && \
	hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar    \
    -file ~/workspace/hadoop_sevity.com/session_analysis/mapper.py     \
    -mapper 'python3 mapper.py'     \
    -file ~/workspace/hadoop_sevity.com/session_analysis/reducer.py     \
    -reducer 'python3 reducer.py'     \
    -input /logs/* -output /output \
    2>&1 | tee r.txt

결과를 HDFS에서 로컬파일시스템으로 가져온다.

rm -rf output && hadoop fs -get /output .

다음 python파일을 통해 duration역순으로 정렬한다.

import json
import sys

def get_duration(json_str):
    data = json.loads(json_str)
    return data.get('total_duration', 0)

lines = sys.stdin.readlines()

# 각 줄을 total_duration 값에 따라 정렬
lines.sort(key=lambda line: get_duration(line.split('\t', 1)[1]), reverse=True)

for line in lines:
    print(line, end='')
python3 sort.py < output/part-00000 > result.txt

result.txt를 보면 다음과 같다.

45.61.187.81    {"total_duration": 186353.0, "url_list": ["/"], "start_time": "2022-07-23 09:13:06", "end_time": "2022-08-22 20:57:09"}
127.0.0.1       {"total_duration": 161933.0, "url_list": ["/w/lib/tpl/dokuwiki/images/favicon.ico", "/wonilnet_pub/2010-06-09_192759_resize.png", "/w/lib/exe/css.php", "/image/top_menu_icon.gif", "/upgsvr/service.exe", "/favicon.ico", "/index.asp", "/w/lib/tpl/dokuwiki/images/pagetools-sprite.png", "/scrape", "/w/lib/tpl/dokuwiki/images/search.png", "/wiki/doku.php", "/w/lib/tpl/dokuwiki/images/usertools.png", "/", "/images/next5.gif", "/wonilnet_pub/2010-06-09_192656_resize.png", "/w/lib/tpl/dokuwiki/images/button-php.gif", "/image/rightconer_shadow.gif", "/menu_wonilnet_dbwork.asp", "/iisstart.png", "/w/lib/images/external-link.png", "/tc.js", "/w", "/css/common.css", "/a.php", "/images/num1_on_a.gif", "/w/lib/tpl/dokuwiki/images/button-donate.gif", "/w/lib/tpl/dokuwiki/images/button-html5.png", "/menu_friend.asp", "/w/lib/tpl/dokuwiki/images/button-dw.png", "/announce.php", "/w/doku.php", "/w/lib/tpl/dokuwiki/images/logo.png", "/announce", "/images/nonext.gif", "/images/num5_off.gif", "/image/garo_shadow.gif", "/scrape.php", "/image/sero_shadow.gif", "/w/lib/tpl/dokuwiki/images/button-css.png", "/images/num2_off.gif", "/menu_wonilnet.asp", "/favicon.png", "/image/leftconer_shadow.gif", "/images/num4_off.gif", "/images/num3_off.gif", "/image/line02.gif", "/board/dhtml_logo.js", "/w/lib/tpl/dokuwiki/images/page-gradient.png", "/w/lib/images/license/button/cc-by-sa.png", "/w/lib/exe/js.php", "/pub/02111401.jpg", "/w/lib/exe/indexer.php"], "start_time": "2015-09-06 08:32:43", "end_time": "2021-10-13 12:56:47"}
45.61.188.237   {"total_duration": 121316.0, "url_list": ["/"], "start_time": "2022-10-03 20:31:14", "end_time": "2022-10-26 20:10:54"}
64.62.252.174   {"total_duration": 102775.0, "url_list": ["/stock/test1/main.cpp", "/menu_wonilnet.asp", "/menu_friend.asp", "/stock/test1/id.cpp", "/robots.txt", "/wiki/lib/exe/css.php", "/wiki/lib/exe/opensearch.php", "/wiki/feed.php", "/wiki/doku.php", "/wiki/lib/exe/", "/wiki/lib/exe/indexer.php", "/wiki/lib/exe/js.php", "/wiki/lib/exe/mediamanager.php", "/wiki/", "/wiki/lib/exe/detail.php", "/wiki/lib/exe/fetch.php"], "start_time": "2022-08-11 04:54:40", "end_time": "2023-07-06 16:23:29"}

이 과정을 통해 다음과 같이 해킹을 시도하는 서버접근을 발견할 수 있었다.

124.173.69.66   {"total_duration": 1685.0, "url_list": ["/xw1.php", "/9510.php", "/link.php", "/sbkc.php", "/index.php", "/phpmyadmin/scripts/setup.php", "/xmlrpc.php", "/jsc.php.php", "/phpmyadmin3333/index.php", "/phpNyAdmin/index.php", "/hm.php", "/411.php", "/hhhhhh.php", "/key.php", "/92.php", "/pma/scripts/db___.init.php", "/aojiao.php", "/1/index.php", "/3.php", "/slider.php", "/plus/tou.php", "/g.php", "/ssaa.php", "/sss.php", "/lala-dpr.php", "/605.php", "/admin/index.php", "/plus/yunjitan.php", "/d.php", "/boots.php", "/win1.php", "/hell.php", "/uploader.php", "/images/vuln.php", "/error.php", "/que.php", "/099.php", "/aa.php", "/12345.php", "/666.php", "/api.php", "/plus/90sec.php", "/xiao.php", "/fusheng.php", "/xiaoxi.php", "/plus/ma.php", "/paylog.php", "/321/index.php", "/qwqw.php", "/qiangkezhi.php", "/liangchen.php", "/mysql.php", "/ganshiqiang.php", "/images/stories/cmd.php", "/xiong.php", "/datas.php", "/xixi.php", "/aaa.php", "/awstatstotals/awstatstotals.php", "/conflg.php", "/php2MyAdmin/index.php", "/think.php",
반응형
반응형

IIS 웹서버 로그를 Hadoop에서 분석하려면 먼저 로그 파일을 수집하고 HDFS에 적재한 뒤, 각 컬럼이 의미하는 값을 해석해야 합니다.

원문은 Windows IIS 로그를 리눅스 분석 환경으로 복사하고, hdfs dfs put으로 HDFS에 올린 뒤, date, time, c-ip, URL, status 컬럼을 확인하는 실습입니다.

 

핵심 정리

Hadoop 로그 분석의 첫 단계는 MapReduce 코드를 작성하기 전에 원본 로그가 어떤 형태인지 확인하는 것입니다. 원문에서는 scp로 IIS 로그 파일을 가져오고, HDFS의 로그 디렉터리에 적재한 뒤, head로 샘플 라인을 읽어 필드 목록을 확인합니다. IIS 로그는 주석 라인에 필드 순서가 적혀 있으므로 date, time, s-ip, cs-method, cs-uri-stem, c-ip, sc-status, time-taken 같은 컬럼을 먼저 구분해야 이후 세션 분석이나 접속 IP 집계를 안정적으로 할 수 있습니다.

  • scp 단계는 웹서버의 IIS 로그 파일을 분석 서버로 가져오는 과정입니다.
  • hdfs dfs put 단계는 로컬 로그 파일을 HDFS 분석 입력 경로로 올리는 과정입니다.
  • Hadoop 작업이 오래 걸릴 때는 로그 디렉터리의 최신 파일을 tail 방식으로 확인하면 진행 상태를 보기 쉽습니다.
  • IIS 로그에서 #Fields 라인은 뒤에 나오는 데이터 컬럼의 순서를 알려줍니다.
  • 접속 분석에는 date, time, c-ip, cs-uri-stem, sc-status, time-taken 컬럼이 자주 쓰입니다.
  • 원본 컬럼을 먼저 해석해두면 MapReduce mapper에서 어떤 값을 key와 value로 보낼지 정하기 쉽습니다.

원문은 IIS 로그를 복사하고 HDFS에 넣은 뒤 컬럼 의미를 해석하는 준비 단계가 중심이었습니다. 보강 블록은 이 글이 세션 분석 전처리 글이라는 점을 분명히 보이게 정리했습니다.

이어서 볼 글

 

어떤 IP에서 가장많이 접속했는지 확인

필자는 개인 IIS 웹서버를 운영하고 있음

다음을 통해 웹서버에서 하둡이 설치된 우분투 리눅스로 가져오고 HDFS로 복사(파일 복사가 RDB에서는 insert에 해당한다)

# 웹서버에서 하둡이설치된 리눅스로 로그파일들 복사
scp linowmik@secsm.org@192.168.0.6:/inetpub/logs/LogFiles/W3SVC1/* ./iis_logs

# HDFS로 복사
hdfs dfs -put ./iis_logs/* /logs/

# 위 복사가 너무 오래걸리면 아래 명령을 통해 진행상황 모니터링 가능
cd $HADOOP_HOME/logs
sudo tail -f $(ls -Art $HADOOP_HOME/logs | tail -n 1)
# 위명령어 설명은 다음과 같음
# ls -Art $HADOOP_HOME/logs: $HADOOP_HOME/logs 디렉토리 내의 파일을 날짜별로 오름차순 정렬합니다. -Art 옵션은 -A (숨겨진 파일 포함), -r (역순), -t (수정 시간별 정렬) 옵션을 동시에 사용하는 것입니다.
# tail -n 1: 파일 리스트 중에서 가장 최신의 파일 (리스트의 마지막 항목)을 선택합니다.
# tail -f: 선택한 파일의 내용을 실시간으로 출력합니다. -f 옵션은 'follow'의 약자로, 파일의 내용이 변경될 때마다 이를 반영하여 출력합니다.

로그가 어떤 형식인지 잠시 확인한다

sevity@sevityubuntu:~/workspace/hadoop_sevity.com$ cat iis_logs/*.log | head
#Software: Microsoft Internet Information Services 10.0
#Version: 1.0
#Date: 2015-09-06 08:32:43
#Fields: date time s-ip cs-method cs-uri-stem cs-uri-query s-port cs-username c-ip cs(User-Agent) cs(Referer) sc-status sc-substatus sc-win32-status time-taken
2015-09-06 08:32:43 127.0.0.1 GET / - 80 - 127.0.0.1 Mozilla/5.0+(Windows+NT+10.0;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/45.0.2454.85+Safari/537.36 - 200 0 0 211
2015-09-06 08:32:43 127.0.0.1 GET /iisstart.png - 80 - 127.0.0.1 Mozilla/5.0+(Windows+NT+10.0;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/45.0.2454.85+Safari/537.36 http://127.0.0.1/ 200 0 0 2
2015-09-06 08:32:43 127.0.0.1 GET /favicon.ico - 80 - 127.0.0.1 Mozilla/5.0+(Windows+NT+10.0;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/45.0.2454.85+Safari/537.36 http://127.0.0.1/ 404 0 2 2

#으로 시작하는 라인을 제외하면 날짜 시간 IP 등의 순임을 알 수 있다.

  • 2023-03-23 00:20:47 : 요청이 발생한 날짜와 시간.
  • 192.168.0.6 : 서버의 IP 주소 또는 호스트 이름.
  • GET : HTTP 요청 방법. 이 경우, 클라이언트는 서버로부터 정보를 요청하고 있습니다.
  • /wiki/doku.php : 클라이언트가 요청한 리소스의 경로.
  • id=learning_rate&do=login&sectok=bac058c83457f1bfade853577f509baf : 요청에 전달된 파라미터입니다.
  • 80 : 사용된 포트 번호. HTTP의 기본 포트는 80입니다.
  • - : 이 필드는 일반적으로 RFC 1413 ident 프로토콜을 통해 확인된 원격 사용자 이름을 나타냅니다. 여기서 "-"는 이 정보가 사용 불가능하다는 것을 의미합니다.
  • 216.244.66.237 : 클라이언트의 IP 주소.
  • Mozilla/5.0+(compatible;+DotBot/1.2;++https://opensiteexplorer.org/dotbot;+help@moz.com) : 사용자 에이전트. 이는 클라이언트가 사용하는 웹 브라우저 또는 봇을 설명합니다. 이 경우에는 DotBot이라는 봇이 서버에 요청을 보냈습니다.
  • - : 이 필드는 일반적으로 "referrer" URL을 나타냅니다. 여기서 "-"는 이 정보가 사용 불가능하다는 것을 의미합니다.
  • 200 : HTTP 상태 코드. 200은 성공을 의미합니다.
  • 0 : 서브 상태 코드. 이는 서버에 따라 다르게 해석될 수 있습니다.
  • 0 : Win32 상태 코드. 이는 서버에 따라 다르게 해석될 수 있습니다.
  • 1068 : 서버가 클라이언트에게 보낸 바이트 .

이를위한 mapper.py를 다음처럼 작성한다.

import sys
import logging

# Set up logging
# logging.basicConfig(filename="mapper.log", level=logging.DEBUG)
# 아래처럼 하면 하둡로그랑 섞여서 나와서 디버깅할때 좋다.
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

# Ignore bad characters
for line in sys.stdin.buffer:
    try:
        line = line.decode('utf-8', 'ignore')
    except UnicodeDecodeError as e:
        logging.error(f"Error decoding line: {line.strip()}, Error: {e}")
        continue

    try:
        data = line.strip().split(" ")
        if len(data) == 15:
            date, time, s_ip, cs_method, cs_uri_stem, cs_uri_query, s_port, cs_username, c_ip, cs_user_agent, cs_referer, sc_status, sc_substatus, sc_win32_status, time_taken = data
            logging.debug(f"Client IP: {c_ip}, Count: 1")
            print("{0}\t{1}".format(c_ip, 1))
    except Exception as e:
        # 예외 발생 시 로그 기록
        logging.error(f"Error processing line: {line.strip()}, Error: {e}")

관심있는 IP와 카운트1을 결과로 출력함을 알 수 있다. (이러면 하둡이 스트리밍하여 Reduce쪽으로 넘긴다)

이제 아래와 같이 reducer.py를 작성하여 IP별 카운트를 집계한다.

#!/usr/bin/env python3

import sys
import logging

# Set up logging
logging.basicConfig(filename="reducer.log", level=logging.DEBUG)

last_key = None
running_total = 0
key = None

try:
    for input in sys.stdin:
        key, value = input.strip().split("\t", 1)
        if last_key == key:
            running_total += int(value)
        else:
            if last_key:
                logging.debug(f"Key: {last_key}, Running Total: {running_total}")
                print("{0}\t{1}".format(last_key, running_total))
            running_total = int(value)
            last_key = key

    if last_key == key and key is not None:
        logging.debug(f"Key: {last_key}, Running Total: {running_total}")
        print("{0}\t{1}".format(last_key, running_total))

except Exception as e:
    # 예외 발생 시 로그 기록
    logging.error(f"Error processing input: {e}")

다음명령을 통해 map-reduce를 수행(RDB에서 SELECT에 해당한다)

# 이전실행 클린업
hdfs dfs -rm -r /output

# 실행
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
	-file ~/workspace/hadoop_sevity.com/mapper.py \
    -mapper 'python3 mapper.py' \
    -file ~/workspace/hadoop_sevity.com/reducer.py \
    -reducer 'python3 reducer.py' \
    -input /logs/* -output /output 2>&1 | tee r.txt

그다음 결과를 hdfs에서 로컬로 가져와서 결과 보기

# 로컬로 가져오기
hadoop fs -get /output .

# IP카운트(2번째컬럼) 역순으로 정렬
sort -k2,2nr -t $'\t' output/* > sorted_output.txt

# 결과보기(상위 10개)
cat sorted_output.txt | head
216.244.66.237	69027
192.168.0.1	27738
211.189.165.105	20114
64.62.252.174	13246
119.207.72.84	8959
211.53.177.20	6565
66.249.82.88	5636
66.249.82.89	5632
66.249.82.90	5312
194.110.203.7	5157

# 가장 많이 접속한 IP에 대한 정보를 역추적해서 보기
grep -m 10 216.244.66.237 iis_logs/* | head
iis_logs/u_ex180725.log:2018-07-25 05:40:56 192.168.0.2 GET /robots.txt - 80 - 216.244.66.237 Mozilla/5.0+(compatible;+DotBot/1.1;+http://www.opensiteexplorer.org/dotbot,+help@moz.com) - 404 0 2 499
iis_logs/u_ex180725.log:2018-07-25 05:44:26 192.168.0.2 GET / - 80 - 216.244.66.237 Mozilla/5.0+(compatible;+DotBot/1.1;+http://www.opensiteexplorer.org/dotbot,+help@moz.com) - 200 0 0 641
iis_logs/u_ex180805.log:2018-08-05 02:39:00 192.168.0.2 GET /robots.txt - 80 - 216.244.66.237 Mozilla/5.0+(compatible;+DotBot/1.1;+http://www.opensiteexplorer.org/dotbot,+help@moz.com) - 404 0 2 194
iis_logs/u_ex180805.log:2018-08-05 02:42:02 192.168.0.2 GET / - 80 - 216.244.66.237 Mozilla/5.0+(compatible;+DotBot/1.1;+http://www.opensiteexplorer.org/dotbot,+help@moz.com) - 200 0 0 181
iis_logs/u_ex181027.log:2018-10-27 06:25:55 192.168.0.2 GET /robots.txt - 80 - 216.244.66.237 Mozilla/5.0+(compatible;+DotBot/1.1;+http://www.opensiteexplorer.org/dotbot,+help@moz.com) - 404 0 2 183
iis_logs/u_ex181027.log:2018-10-27 06:35:36 192.168.0.2 GET / - 80 - 216.244.66.237 Mozilla/5.0+(compatible;+DotBot/1.1;+http://www.opensiteexplorer.org/dotbot,+help@moz.com) - 200 0 0 176
iis_logs/u_ex181107.log:2018-11-07 01:04:26 192.168.0.2 GET /robots.txt - 80 - 216.244.66.237 Mozilla/5.0+(compatible;+DotBot/1.1;+http://www.opensiteexplorer.org/dotbot,+help@moz.com) - 404 0 2 448
iis_logs/u_ex181107.log:2018-11-07 01:13:17 192.168.0.2 GET / - 80 - 216.244.66.237 Mozilla/5.0+(compatible;+DotBot/1.1;+http://www.opensiteexplorer.org/dotbot,+help@moz.com) - 200 0 0 668
iis_logs/u_ex190201.log:2019-02-01 21:31:17 192.168.0.2 GET /robots.txt - 80 - 216.244.66.237 Mozilla/5.0+(compatible;+DotBot/1.1;+http://www.opensiteexplorer.org/dotbot,+help@moz.com) - 404 0 2 118
iis_logs/u_ex190201.log:2019-02-01 21:45:12 192.168.0.2 GET / - 80 - 216.244.66.237 Mozilla/5.0+(compatible;+DotBot/1.1;+http://www.opensiteexplorer.org/dotbot,+help@moz.com) - 200 0 0 118
sevity@sevityubuntu:~/workspace/hadoop_sevity.com$
반응형

+ Recent posts