먼저 하둡버전을 보고 오길 권한다.

스팍 설치에 관한 부분은 여기 참조.

우리는 하둡버전과 비슷하게 IIS웹서버를 분석해서 map - reduce과정을 통해 IP별 접근 시간및 URL을 정리해서 볼 것이다. 오리지널 웹서버 로그는 대략 다음과 같으며,

sevity@sevityubuntu:~/workspace/hadoop_sevity.com/session_analysis$ cat ../iis_logs/* | head
#Software: Microsoft Internet Information Services 10.0
#Version: 1.0
#Date: 2015-09-06 08:32:43
#Fields: date time s-ip cs-method cs-uri-stem cs-uri-query s-port cs-username c-ip cs(User-Agent) cs(Referer) sc-status sc-substatus sc-win32-status time-taken
2015-09-06 08:32:43 127.0.0.1 GET / - 80 - 127.0.0.1 Mozilla/5.0+(Windows+NT+10.0;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/45.0.2454.85+Safari/537.36 - 200 0 0 211
2015-09-06 08:32:43 127.0.0.1 GET /iisstart.png - 80 - 127.0.0.1 Mozilla/5.0+(Windows+NT+10.0;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/45.0.2454.85+Safari/537.36 http://127.0.0.1/ 200 0 0 2
2015-09-06 08:32:43 127.0.0.1 GET /favicon.ico - 80 - 127.0.0.1 Mozilla/5.0+(Windows+NT+10.0;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/45.0.2454.85+Safari/537.36 http://127.0.0.1/ 404 0 2 2

map-reduce를 거친 결과는 대략 다음과 같을 것이다.

sevity@sevityubuntu:~/workspace/hadoop_spark/session_analysis$ cat session_output_merged.txt | head
157.55.39.90
[[(datetime.datetime(2016, 6, 7, 20, 44, 21), '/')], [(datetime.datetime(2016, 6, 8, 4, 20, 31), '/menu_wonilnet.asp'), (datetime.datetime(2016, 6, 8, 4, 20, 43), '/menu_friend.asp')], [(datetime.datetime(2019, 9, 16, 23, 58, 6), '/menu_friend.asp')], [(datetime.datetime(2021, 8, 17, 15, 8, 50), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 17, 17, 36, 23), '/menu_wonilnet.asp'), (datetime.datetime(2021, 8, 17, 17, 36, 24), '/menu_friend.asp')], [(datetime.datetime(2021, 8, 18, 1, 53, 52), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 19, 2, 40, 48), '/wiki/doku.php')], [(datetime.datetime(2021, 8, 19, 6, 45, 32), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 20, 10, 0, 31), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 21, 3, 55, 36), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 21, 4, 55, 13), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 23, 6, 49, 26), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 24, 1, 45, 47), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 24, 9, 58, 34), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 25, 11, 55, 24), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 26, 8, 22, 7), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 26, 13, 56, 19), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 27, 21, 59, 24), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 28, 20, 3, 46), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 28, 23, 51, 21), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 29, 18, 40, 40), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 29, 20, 6, 8), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 31, 1, 38, 52), '/wiki/doku.php')], [(datetime.datetime(2021, 8, 31, 5, 29, 5), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 8, 31, 6, 28, 20), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 9, 2, 0, 32, 17), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 9, 2, 16, 19, 20), '/wiki/lib/exe/detail.php')], [(datetime.datetime(2021, 9, 9, 3, 2, 43), '/menu_wonilnet.asp'), (datetime.datetime(2021, 9, 9, 3, 2, 45), '/menu_friend.asp')], [(datetime.datetime(2021, 9, 16, 7, 52, 4), '/wiki/doku.php')]]
138.197.96.197
[[(datetime.datetime(2017, 2, 15, 17, 21, 24), '/'), (datetime.datetime(2017, 2, 15, 17, 21, 26), '/menu_wonilnet.asp'), (datetime.datetime(2017, 2, 15, 17, 21, 26), '/board/dhtml_logo.js')]]
104.236.164.185
[[(datetime.datetime(2017, 11, 28, 11, 45, 56), '/')]]
181.213.93.231
[[(datetime.datetime(2018, 1, 30, 13, 1, 24), '/hndUnblock.cgi'), (datetime.datetime(2018, 1, 30, 13, 1, 28), '/tmUnblock.cgi'), (datetime.datetime(2018, 1, 30, 13, 1, 32), '/moo'), (datetime.datetime(2018, 1, 30, 13, 1, 36), '/'), (datetime.datetime(2018, 1, 30, 13, 1, 42), '/getcfg.php'), (datetime.datetime(2018, 1, 30, 13, 1, 49), '/getcfg.php')]]
117.80.147.63
[[(datetime.datetime(2018, 2, 14, 8, 13, 12), '/')]]

실습을 위한 디렉토리 구조는 대략 다음과 같다.

sevity@sevityubuntu:~/workspace/hadoop_spark$ tree -L 3
.
├── iis_logs (여기에 분석하려는 웹로그 파일들이 있습니다. 실제로는 수없이 많음)
│   ├── u_ex230101.log
│   ├── u_ex230102.log
│   └── u_ex230103.log
├── session_analysis
│   ├── session_output
│   │   ├── part-00001
│   │   ├── part-00002
│   │   ├── part-00003
│   │   └── _SUCCESS
│   ├── session_output_merged.txt
│   └── spark_session_analysis.py(우리가 작성할 python 코드의 위치)
└── spark_test

다음과 같이 spark_session_analysis.py를 작성하자.

여기서 중요한 점은 다음과 같다.

  • 스팍은 하둡과 다르게 하나의 파일에서 map과 reduce를 모두 수행한다.
  • 각 동작은 함수호출 형태로 진행되며,
  • 아래에서 reduce에 해당하는 함수는 groupByKey()이다.
from pyspark import SparkConf, SparkContext
from datetime import datetime, timedelta
from pyspark.sql import SparkSession

def parse_log_line(line):
    try:
        parts = line.strip().split()
        if len(parts) != 15:
            return None

        date_time = parts[0] + " " + parts[1]
        date_time = datetime.strptime(date_time, '%Y-%m-%d %H:%M:%S')
        url = parts[4]
        ip = parts[8]  # changed from parts[2] to parts[8] to use the client IP

        return (ip, (date_time, url))
    except:
        return None

def calculate_sessions(group):
    ip, data = group
    data = list(data)
    data.sort(key = lambda x: x[0])
    sessions = []
    session = [data[0]]
    for i in range(1, len(data)):
        if data[i][0] - session[-1][0] > timedelta(seconds=1800):
            sessions.append(session)
            session = [data[i]]
        else:
            session.append(data[i])
    sessions.append(session)
    return (ip, sessions)


# Spark configuration
# [*]하면 모든 코어 사용
conf = SparkConf().setMaster('local[*]').setAppName('Log Analysis')
sc = SparkContext(conf=conf)

log_lines = sc.textFile('hdfs://localhost:9000/logs/u_ex*')
parsed_lines = log_lines.map(parse_log_line).filter(lambda x: x is not None)
grouped_by_ip = parsed_lines.groupByKey()
sessions = grouped_by_ip.flatMap(calculate_sessions)
sessions.saveAsTextFile('hdfs://localhost:9000/session_output')

다음 명령을 통해 실행하고 결과를 확인한다.

# hdfs에 reduce결과를 쌓을 곳을 clean-up해준다. 
hdfs dfs -rm -r /session_output

# 로컬파일시스템에 복사해올 경로도 clean-up해준다.
rm -rf session_output
rm -f session_output_merged.txt

# python 파일을 실행한다. spark-submit을 통해 실행하면 4040포트를 통해 웹에서 진행상황 확인가능
$SPARK_HOME/bin/spark-submit spark_session_analysis.py 2>&1 | tee r.txt 

# HDFS에서 로컬파일시스템으로 결과 폴더 복사
hadoop fs -get /session_output .

# 위의 폴더구조에서 볼 수 있듯 병렬처리 때문에 결과파일이 여러개로 분리돼 있는데 합쳐준다.
# 합치고 로컬파일시스템으로 복사하는 것까지.
hadoop fs -getmerge hdfs://localhost:9000/session_output ./session_output_merged.txt

# 위의 방법대신 python코드내에서 다음 방법으로 머지해도 된다.
# 단 스팍을 써서 머지하는 방법인 만큼 메모리가 터질수도 있으니 주의
sessions.coalesce(1).saveAsTextFile('hdfs://localhost:9000/session_output')
반응형

'Programming > Linux' 카테고리의 다른 글

kafka 설치(우분투 기존)  (0) 2023.07.31
ELK연습  (0) 2023.07.30
스팍(Spark) 설치  (0) 2023.07.30
하둡 - 실습 - 웹서버 세션분석  (0) 2023.07.29
하둡 - 실습 - 웹서버로그분석  (0) 2023.07.29

+ Recent posts