먼저 Spark 설치는 여기를 보고오자. Spark설치시 Streaming관련 설치도 진행된다.
Kafka에서 메시지를 실시간으로 10초 단위로 읽어서 HDFS에 저장
토픽은 iis_log로 가정.
다음과 같은 python코드를 작성
$ vi spark_streaming_to_hdfs.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "iis_log") \
.load()
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# HDFS뿐 아니라 모니터링을 위해 화면에도 표시
query = df \
.writeStream \
.outputMode("append") \
.format("console") \
.start()
query2 = df \
.writeStream \
.outputMode("append") \
.format("csv") \
.option("path", "hdfs://localhost:9000/user/hadoop/iis_log") \
.option("checkpointLocation", "hdfs://localhost:9000/user/hadoop/checkpoint") \
.trigger(processingTime='10 seconds') \
.start()
spark.streams.awaitAnyTermination()
화면에 다음처럼 표시되고, HDFS에는 csv로 컬럼 분리되서 저장된다.
+----+--------------------+
| key| value|
+----+--------------------+
|null|{"@timestamp":"20...|
+----+--------------------+
HDFS가 아닌 RDB(Postgres)에 저장
먼저 RDB에 테이블을 알맞게 만들어 줘야 한다.
CREATE TABLE iis_log (
client_ip VARCHAR(15),
url TEXT,
timestamp character varying
);
python 코드를 아래처럼 작성한다.
$ cat spark_streaming_to_db.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, split
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import unix_timestamp
# Spark 세션 생성
spark = SparkSession.builder.getOrCreate()
# 스키마 수동 지정. RDB에 넣을때 필요
schema1 = StructType([
StructField("client_ip", StringType()),
StructField("url", StringType()),
StructField("timestamp", StringType())
])
# Kafka에서 데이터 읽기
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "iis_log") \
.load()
# JSON 형식으로 변환
df = df.selectExpr("CAST(value AS STRING) as json")
# 스키마 정의. JSON파싱할때 필요
schema2 = StructType([
StructField("message", StringType()),
StructField("@timestamp", StringType())
])
# 데이터 추출
df = df.select(from_json(df.json, schema2).alias("data")).select("data.*")
# 메시지 분리
df = df.withColumn("message_split", split(df["message"], " "))
df = df.withColumn("client_ip", df["message_split"].getItem(2))
df = df.withColumn("url", df["message_split"].getItem(4))
df = df.withColumn("timestamp", df["@timestamp"])
# 필요한 컬럼 선택
df = df.select("client_ip", "url", "timestamp")
# 화면에 출력
query = df \
.writeStream \
.outputMode("append") \
.format("console") \
.start()
# 중간 저장소로 Parquet 형식으로 쓰기
query2 = df \
.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "hdfs://localhost:9000/user/hadoop/iis_log_temp") \
.option("checkpointLocation", "hdfs://localhost:9000/user/hadoop/checkpoint_temp") \
.trigger(processingTime='10 seconds') \
.start()
# 배치 작업으로 데이터를 PostgreSQL에 쓰기. PostgreSQL이 스트림을 지원하지 않아서 배치로.
def write_to_postgres():
df_temp = spark.read.schema(schema1).parquet("hdfs://localhost:9000/user/hadoop/iis_log_temp")
df_temp.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/online_judge") \
.option("dbtable", "iis_log") \
.option("user", "online_judge_admin") \
.option("password", "abcd123$") \
.mode("append") \
.save()
# 배치 작업 실행
while True:
write_to_postgres()
# 스트리밍 작업 중지
query.stop()
# 스트리밍 작업 대기
spark.streams.awaitAnyTermination()
아래 명령어로 실행하면
$SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 spark_streaming_to_db.py
아래처럼 dbeaver로 RDB에 들어간것을 확인 가능
비슷한 기능을 하는 flink코드
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, StreamingFileSink
def kafka_to_hdfs():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
kafka_props = {"bootstrap.servers": "localhost:9092", "group.id": "test_group"}
kafka_source = FlinkKafkaConsumer("iis_log", SimpleStringSchema(), kafka_props)
ds = env.add_source(kafka_source)
ds.add_sink(StreamingFileSink
.for_row_format("hdfs://localhost:9000/user/hadoop/iis_log", SimpleStringSchema())
.build())
env.execute("kafka_to_hdfs")
if __name__ == '__main__':
kafka_to_hdfs()
Spark Session이 flink에서 Environment에 해당.
지연실행은 둘다 동일하게 지원.
Spark vs Flink 차이
- Spark는 먼저 전체 데이터 세트를 메모리에 로드한 다음 변환을 수행하는 반면, Flink는 데이터를 스트리밍하며 변환을 수행
- 따라서, spark은 true streaming architecture가 아니라 마이크로 배치 구조라 지연시간이 약간 높을 수 있다.
반응형
'Data Engineering' 카테고리의 다른 글
flink Table API를 사용한 실시간 Reporting샘플 (0) | 2023.10.28 |
---|---|
flink (1) | 2023.10.28 |
Apache Flink 설치 (0) | 2023.08.02 |
kafka Consume (0) | 2023.08.02 |
filebeat (0) | 2023.07.31 |