먼저 Spark 설치는 여기를 보고오자. Spark설치시 Streaming관련 설치도 진행된다.

 

Kafka에서 메시지를 실시간으로 10초 단위로 읽어서 HDFS에 저장

토픽은 iis_log로 가정.

다음과 같은 python코드를 작성

$ vi spark_streaming_to_hdfs.py

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "iis_log") \
  .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# HDFS뿐 아니라 모니터링을 위해 화면에도 표시
query = df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query2 = df \
  .writeStream \
  .outputMode("append") \
  .format("csv") \
  .option("path", "hdfs://localhost:9000/user/hadoop/iis_log") \
  .option("checkpointLocation", "hdfs://localhost:9000/user/hadoop/checkpoint") \
  .trigger(processingTime='10 seconds') \
  .start()

spark.streams.awaitAnyTermination()

화면에 다음처럼 표시되고, HDFS에는 csv로 컬럼 분리되서 저장된다.

+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"@timestamp":"20...|
+----+--------------------+

 

HDFS가 아닌 RDB(Postgres)에 저장

 

먼저 RDB에 테이블을 알맞게 만들어 줘야 한다. 

CREATE TABLE iis_log (
    client_ip VARCHAR(15),
    url TEXT,
    timestamp character varying
);

python 코드를 아래처럼 작성한다.

$ cat spark_streaming_to_db.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, split
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import unix_timestamp


# Spark 세션 생성
spark = SparkSession.builder.getOrCreate()

# 스키마 수동 지정. RDB에 넣을때 필요
schema1 = StructType([
    StructField("client_ip", StringType()),
    StructField("url", StringType()),
    StructField("timestamp", StringType())
])

# Kafka에서 데이터 읽기
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "iis_log") \
    .load()

# JSON 형식으로 변환
df = df.selectExpr("CAST(value AS STRING) as json")

# 스키마 정의. JSON파싱할때 필요
schema2 = StructType([
    StructField("message", StringType()),
    StructField("@timestamp", StringType())
])

# 데이터 추출
df = df.select(from_json(df.json, schema2).alias("data")).select("data.*")

# 메시지 분리
df = df.withColumn("message_split", split(df["message"], " "))
df = df.withColumn("client_ip", df["message_split"].getItem(2))
df = df.withColumn("url", df["message_split"].getItem(4))
df = df.withColumn("timestamp", df["@timestamp"])

# 필요한 컬럼 선택
df = df.select("client_ip", "url", "timestamp")

# 화면에 출력
query = df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# 중간 저장소로 Parquet 형식으로 쓰기
query2 = df \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "hdfs://localhost:9000/user/hadoop/iis_log_temp") \
    .option("checkpointLocation", "hdfs://localhost:9000/user/hadoop/checkpoint_temp") \
    .trigger(processingTime='10 seconds') \
    .start()

# 배치 작업으로 데이터를 PostgreSQL에 쓰기. PostgreSQL이 스트림을 지원하지 않아서 배치로.
def write_to_postgres():
    df_temp = spark.read.schema(schema1).parquet("hdfs://localhost:9000/user/hadoop/iis_log_temp")
    df_temp.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/online_judge") \
        .option("dbtable", "iis_log") \
        .option("user", "online_judge_admin") \
        .option("password", "abcd123$") \
        .mode("append") \
        .save()

# 배치 작업 실행
while True:
    write_to_postgres()

# 스트리밍 작업 중지
query.stop()

# 스트리밍 작업 대기
spark.streams.awaitAnyTermination()

아래 명령어로 실행하면

$SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 spark_streaming_to_db.py

아래처럼 dbeaver로 RDB에 들어간것을 확인 가능

 

비슷한 기능을 하는 flink코드

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, StreamingFileSink

def kafka_to_hdfs():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    kafka_props = {"bootstrap.servers": "localhost:9092", "group.id": "test_group"}

    kafka_source = FlinkKafkaConsumer("iis_log", SimpleStringSchema(), kafka_props)

    ds = env.add_source(kafka_source)

    ds.add_sink(StreamingFileSink
                .for_row_format("hdfs://localhost:9000/user/hadoop/iis_log", SimpleStringSchema())
                .build())

    env.execute("kafka_to_hdfs")

if __name__ == '__main__':
    kafka_to_hdfs()

Spark Session이 flink에서 Environment에 해당.

지연실행은 둘다 동일하게 지원.

 

 

Spark vs Flink 차이

  • Spark는 먼저 전체 데이터 세트를 메모리에 로드한 다음 변환을 수행하는 반면, Flink는 데이터를 스트리밍하며 변환을 수행
    • 따라서, spark은 true streaming architecture가 아니라 마이크로 배치 구조라 지연시간이 약간 높을 수 있다.

 

 

 

반응형

'Data Engineering' 카테고리의 다른 글

flink Table API를 사용한 실시간 Reporting샘플  (0) 2023.10.28
flink  (1) 2023.10.28
Apache Flink 설치  (0) 2023.08.02
kafka Consume  (0) 2023.08.02
filebeat  (0) 2023.07.31

Flink란

주로 실시간 스트림에 대한 처리, 분석을 위해 쓰이며 높은 처리량과 낮은 지연시간을 제공.

장애복구 및 일관성을 보장하는 기능을 제공

Spark이 배치처리에 강점을 갖는다면, Flink는 실시간 스트림 처리에 강점을 가짐

주로 java와 scala로 작성됨. python API도 제공.

 

설치

프로덕션환경에서는 opt/flink에 설치하고 user계정도 추가하는걸 추천하지만,

개인적으로 실습할때는 ~/flink 정도에 설치해도 충분하다.

# flink 홈페이지에서 최신 빌드 다운로드
wget https://dlcdn.apache.org/flink/flink-1.16.2/flink-1.16.2-bin-scala_2.12.tgz

# ~/flink에 압축해재
mkdir ~/flink
tar -xvzf flink-1.16.2-bin-scala_2.12.tgz -C ~/flink --strip-components 1

그 다음에 cluster 시작이라는 과정을 거쳐야 하는데, 이는 Kafka, Spark, Flink공히 대부분의 분산시스템에서 존재.

이는 여러 노드(컴퓨터)가 함께 작업을 수행하도록 구성하는 과정을 의미하며,

일반적으로 하나 이상의 마스터 노드와 여러 워커 노드로 구성되고 마스토 노드는 작업을 관장하고 워커 노드는 실제 작업을 수행

실제 클러스터를 구성하는 방법은 오픈소스 프로젝트 마다 다른데,

Flink에서는 클러스터는 명시적으로 start-cluster.sh를 수행해서 다음 2가지를 데몬으로 띄우는 과정을 의미한다.

  • JobManager(마스터노드)
  • TaskManager(워커노드)

다음 명령어로 클러스터를 구동하고

cd ~/flink
./bin/start-cluster.sh

jps해서 아래와 같이 보이면 성공

$ ~/flink $ jps
27505 StandaloneSessionClusterEntrypoint # 이게 JobManager를 의미
28553 Jps
27965 TaskManagerRunner

설치확인

아래 example돌려서 잘 설치됐나 확인(localhost:8081을 통해 브라우저에서도 상황을 볼 수 있다)

$ cd ~/flink
$ ./bin/flink run examples/streaming/WordCount.jar --output sample_output
Executing example with default input data.
Use --input to specify file input.
Job has been submitted with JobID dc4552090d31d4e51084899a41d6568a
Program execution finished
Job with JobID dc4552090d31d4e51084899a41d6568a has finished.
Job Runtime: 196 ms

$ cat sample_output/2023-08-02--10/part*
(to,1)
(be,1)
(or,1)
(not,1)
(to,2)
(be,2)
(that,1)
(is,1)
(the,1)
(question,1)
(whether,1)
...

위 예제는 아래 내장 데이터를 사용해 단어별 빈도를 구하는 것으로 Flink의 기본 데이터 처리 기능을 보여준다.

To be, or not to be,--that is the question:--
Whether 'tis nobler in the mind to suffer
The slings and arrows of outrageous fortune
Or to take arms against a sea of troubles,
And by opposing end them?--To die,--to sleep,--
No more; and by a sleep to say we end
The heartache, and the thousand natural shocks
That flesh is heir to,--'tis a consummation
Devoutly to be wish'd. To die,--to sleep;--
To sleep! perchance to dream:--ay, there's the rub;
For in that sleep of death what dreams may come,
When we have shuffled off this mortal coil,
Must give us pause: there's the respect
That makes calamity of so long life;

위의 단어빈도수 계산 작업은 배치로도 할 수 있고 스트림으로도 할 수 있는데 스트림으로 한다는 의미는,

스트림 처리에서의 WordCount는 일반적으로 시간 윈도우를 설정하여 해당 윈도우 내에서의 단어 출현 횟수를 계산
예를 들어, 10초 동안의 데이터 스트림에서 'apple'이라는 단어가 얼마나 많이 나타났는지를 계산.

이러한 윈도우는 특정 시간 간격(예: 10초마다)으로 이동하며, 각 윈도우는 독립적으로 단어의 출현 횟수를 계산.

이렇게 하면 시간에 따른 단어의 사용 빈도를 실시간으로 추적할 수 있다(Flink를 사용하는 이유)
이러한 윈도우 기반의 처리는 스트리밍 데이터에서 흔히 볼 수 있는 패턴이며, Flink는 이러한 유형의 계산을 쉽게 처리할 수 있는 API를 제공.

 

 

트러블슈팅

자바버전 충돌

flink 17.0.7버전의 경우 java17과 충돌나는 현상이 발견되어 java11로 다운그레이드하니 해결됐다.

아래는 다운그레이드 방법

sudo apt update
sudo apt install openjdk-11-jdk
sudo update-alternatives --config java
# 콘솔ui에서 11버전 선택

 

 

반응형

'Data Engineering' 카테고리의 다른 글

flink Table API를 사용한 실시간 Reporting샘플  (0) 2023.10.28
flink  (1) 2023.10.28
Spark, Flink를 사용한 실시간 스트림 분석  (0) 2023.08.02
kafka Consume  (0) 2023.08.02
filebeat  (0) 2023.07.31

기본적으로 특정 토픽에 대한 메시지를 컨슘하려면 다음과 같이 파이선으로 할 수 있다.

from kafka import KafkaConsumer

# Kafka 서버 주소 설정
consumer = KafkaConsumer('iis_log',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',  # 이 옵션을 주면 쌓여있는 메시지중 가장 처음 부터 읽음
                         enable_auto_commit=True  # 이 옵션을 주면 마지막 읽은 위치 다음위치 부터 읽게해줌
                         )

for message in consumer:
    # 메시지 출력
    print (message)

이를 위해 필요하면 다음을 설치한다.

pip install kafka-python

 

반응형

'Data Engineering' 카테고리의 다른 글

flink Table API를 사용한 실시간 Reporting샘플  (0) 2023.10.28
flink  (1) 2023.10.28
Spark, Flink를 사용한 실시간 스트림 분석  (0) 2023.08.02
Apache Flink 설치  (0) 2023.08.02
filebeat  (0) 2023.07.31

+ Recent posts