먼저 Spark 설치는 여기를 보고오자. Spark설치시 Streaming관련 설치도 진행된다.

 

Kafka에서 메시지를 실시간으로 10초 단위로 읽어서 HDFS에 저장

토픽은 iis_log로 가정.

다음과 같은 python코드를 작성

$ vi spark_streaming_to_hdfs.py

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "iis_log") \
  .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# HDFS뿐 아니라 모니터링을 위해 화면에도 표시
query = df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query2 = df \
  .writeStream \
  .outputMode("append") \
  .format("csv") \
  .option("path", "hdfs://localhost:9000/user/hadoop/iis_log") \
  .option("checkpointLocation", "hdfs://localhost:9000/user/hadoop/checkpoint") \
  .trigger(processingTime='10 seconds') \
  .start()

spark.streams.awaitAnyTermination()

화면에 다음처럼 표시되고, HDFS에는 csv로 컬럼 분리되서 저장된다.

+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"@timestamp":"20...|
+----+--------------------+

 

HDFS가 아닌 RDB(Postgres)에 저장

 

먼저 RDB에 테이블을 알맞게 만들어 줘야 한다. 

CREATE TABLE iis_log (
    client_ip VARCHAR(15),
    url TEXT,
    timestamp character varying
);

python 코드를 아래처럼 작성한다.

$ cat spark_streaming_to_db.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, split
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import unix_timestamp


# Spark 세션 생성
spark = SparkSession.builder.getOrCreate()

# 스키마 수동 지정. RDB에 넣을때 필요
schema1 = StructType([
    StructField("client_ip", StringType()),
    StructField("url", StringType()),
    StructField("timestamp", StringType())
])

# Kafka에서 데이터 읽기
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "iis_log") \
    .load()

# JSON 형식으로 변환
df = df.selectExpr("CAST(value AS STRING) as json")

# 스키마 정의. JSON파싱할때 필요
schema2 = StructType([
    StructField("message", StringType()),
    StructField("@timestamp", StringType())
])

# 데이터 추출
df = df.select(from_json(df.json, schema2).alias("data")).select("data.*")

# 메시지 분리
df = df.withColumn("message_split", split(df["message"], " "))
df = df.withColumn("client_ip", df["message_split"].getItem(2))
df = df.withColumn("url", df["message_split"].getItem(4))
df = df.withColumn("timestamp", df["@timestamp"])

# 필요한 컬럼 선택
df = df.select("client_ip", "url", "timestamp")

# 화면에 출력
query = df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# 중간 저장소로 Parquet 형식으로 쓰기
query2 = df \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "hdfs://localhost:9000/user/hadoop/iis_log_temp") \
    .option("checkpointLocation", "hdfs://localhost:9000/user/hadoop/checkpoint_temp") \
    .trigger(processingTime='10 seconds') \
    .start()

# 배치 작업으로 데이터를 PostgreSQL에 쓰기. PostgreSQL이 스트림을 지원하지 않아서 배치로.
def write_to_postgres():
    df_temp = spark.read.schema(schema1).parquet("hdfs://localhost:9000/user/hadoop/iis_log_temp")
    df_temp.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/online_judge") \
        .option("dbtable", "iis_log") \
        .option("user", "online_judge_admin") \
        .option("password", "abcd123$") \
        .mode("append") \
        .save()

# 배치 작업 실행
while True:
    write_to_postgres()

# 스트리밍 작업 중지
query.stop()

# 스트리밍 작업 대기
spark.streams.awaitAnyTermination()

아래 명령어로 실행하면

$SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 spark_streaming_to_db.py

아래처럼 dbeaver로 RDB에 들어간것을 확인 가능

 

비슷한 기능을 하는 flink코드

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, StreamingFileSink

def kafka_to_hdfs():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    kafka_props = {"bootstrap.servers": "localhost:9092", "group.id": "test_group"}

    kafka_source = FlinkKafkaConsumer("iis_log", SimpleStringSchema(), kafka_props)

    ds = env.add_source(kafka_source)

    ds.add_sink(StreamingFileSink
                .for_row_format("hdfs://localhost:9000/user/hadoop/iis_log", SimpleStringSchema())
                .build())

    env.execute("kafka_to_hdfs")

if __name__ == '__main__':
    kafka_to_hdfs()

Spark Session이 flink에서 Environment에 해당.

지연실행은 둘다 동일하게 지원.

 

 

Spark vs Flink 차이

  • Spark는 먼저 전체 데이터 세트를 메모리에 로드한 다음 변환을 수행하는 반면, Flink는 데이터를 스트리밍하며 변환을 수행
    • 따라서, spark은 true streaming architecture가 아니라 마이크로 배치 구조라 지연시간이 약간 높을 수 있다.

 

 

 

반응형

'Data Engineering' 카테고리의 다른 글

flink Table API를 사용한 실시간 Reporting샘플  (0) 2023.10.28
flink  (1) 2023.10.28
Apache Flink 설치  (0) 2023.08.02
kafka Consume  (0) 2023.08.02
filebeat  (0) 2023.07.31

+ Recent posts