여기 참조

Windows는 무한 스트림 처리의 핵심입니다. 

Windows는 스트림을 유한한 크기의 "버킷"으로 분할하여 계산을 적용할 수 있습니다.

 

주요개념 요약

윈도우 크기

  • tumbling window: 비중첩(non-overlapping) window
  • sliding window: 중첩 window

시간

  • 발생 시간 (Event Time): 이벤트가 실제로 발생한 시간을 의미합니다. 예를 들어, 센서가 데이터를 생성하거나 사용자가 클릭한 시간 등 실제 사건이 일어난 시간을 나타냅니다.
  • 수집 시간 (Ingestion Time): 이벤트가 시스템 또는 스트리밍 플랫폼에 처음 도착한 시간을 나타냅니다. 발생 시간과 수집 시간 사이에는 네트워크 지연, 메시지 큐 지연 등 다양한 요인으로 인해 차이가 발생할 수 있습니다.
  • 처리 시간 (Processing Time): 이벤트가 실제로 처리되기 시작한 시간을 의미합니다. 처리 시간은 이벤트가 시스템에 도착한 순서대로 처리되므로, 발생 시간이나 수집 시간과는 다를 수 있습니다.

지연(latency)

  • 이벤트 시간 지연 (Event Time Latency):  이벤트가 실제로 발생한 시간 (Event Time)과 이벤트가 시스템에서 처리되는 시간 (Processing Time) 사이의 차이를 나타냅니다.

워터마크

  • 워터마크는 "지금까지 도착한 데이터 중에서 가장 늦게 발생한 이벤트의 시간"을 나타냅니다. 예를 들어, "12:06"의 워터마크가 발행된다면, 12:06 이전에 발생한 모든 이벤트들은 시스템에 이미 도착했거나 곧 도착할 것임을 의미합니다. 즉, 12:06 이후에 12:05의 이벤트가 도착할 확률은 없다는 것을 보장합니다.
  • 이렇게 워터마크를 사용하면, 스트리밍 데이터에서의 시간 관련 처리를 더 정확하게 할 수 있습니다. 워터마크가 "12:06"으로 설정되면, 시스템은 12:06 이전의 모든 이벤트를 안정적으로 처리할 수 있음을 알게 됩니다.

 

window의 생명주기

간단히 말해서, window에 속해야 하는 첫 번째 요소가 도착하자마자 window가 생성되고(고정 윈도우 방식이 아니네)

시간(이벤트 또는 처리 시간)이 종료 타임스탬프 더하기 사용자가 허용한 지연 시간을 지나면 창이 완전히 제거됩니다.

 

Flink는 시간 기반 창에 대해서만 제거를 보장하며 다른 유형( 예: 전역 창)에 대해서는 제거를 보장하지 않습니다(window 할당자 참조 ). 예를 들어, 5분마다 비중첩(또는 텀블링) 윈도우를 생성하고 1분의 허용 지연 시간을 가진 '이벤트 시간 기반' 윈도우 전략의 경우, Flink는 이 간격에 속하는 타임스탬프를 가진 첫 번째 요소가 도착하면 12:00과 12:05 사이의 새 윈도우를 생성하고, 워터마크가 12:06 타임스탬프를 지나가면 제거합니다.

 

또한 각 윈도우에는 Trigger 및 함수( ProcessWindowFunction, ReduceFunction또는 AggregateFunction)( window 함수 참조 )가 연결되어 있습니다

함수는 윈도우 내용에 적용될 계산을 포함하며, 트리거는 윈도우가 함수가 적용될 준비가 되었는지를 나타내는 조건을 지정합니다. 트리거링 정책은 "윈도우 내의 요소 수가 4개 이상일 때"나 "워터마크가 윈도우의 끝을 지날 때"와 같은 것일 수 있습니다. 트리거는 생성과 제거 사이의 어느 시점이든 윈도우의 내용을 삭제하기로 결정할 수도 있습니다.

 

또는 추방자(Evictor)를 사용해서 윈도우 내의 요소들을 제거할수도 있다. 예를 들어 1분 윈도우에 100개가 들어왔는데 10개만 사용하고 나머지 90개는 추방자로 제거.

 

반응형

'Data Engineering' 카테고리의 다른 글

flink Table API를 사용한 실시간 Reporting샘플  (0) 2023.10.28
flink  (1) 2023.10.28
Spark, Flink를 사용한 실시간 스트림 분석  (0) 2023.08.02
Apache Flink 설치  (0) 2023.08.02
kafka Consume  (0) 2023.08.02

여기 내용을 실습

 

설치

https://github.com/apache/flink-playgrounds 여기 git clone하고 intelliJ로 열었다.

그 안의 table-walkthrough 폴더가 실습하려는 대상 폴더 이다.

 

동작방식

/flink-playgrounds/table-walkthrough에서 docker-compose build를 수행하면 필요한 이미지들이 만들어지고

docker-compose up -d를 하면 컨테이너가 만들어지면서 마법처럼 모든것들이 수행된다.

단 처음에는 report()함수를 수정하기 전에는 의도적으로 exception이 발생하도록 되어 있으니 주의

약간특이한점은 일반적으로 Flink에서는 jobmanager와 taskmanager를 먼저 시작하여 클러스터를 구성한 다음, 나중에 사용자 애플리케이션을 제출하여 실행되나.

이 예제의 경우 제시된 docker-compose.yml 파일과 Dockerfile 설정에서는 약간 다르게 작동.

이 구성에서는 jobmanager 서비스가 시작될 때 standalone-job 명령어가 지정되어 있으며, 이를 통해 standalone 모드에서 작업을 실행하도록 지시. 또한 Dockerfile에서 spend-report.jar 파일을 usrlib 디렉토리에 복사하는 부분이 있다. Flink는 이 디렉토리의 JAR 파일을 자동으로 인식하고 로드하여, jobmanager 서비스가 시작될 때 spend-report.jar 파일에 포함된 Flink 애플리케이션을 자동으로 실행​.
(이것때문에 많이 헷갈림)

 

 

flink 코딩에는 다음과 같은 개념들이 사용된다.

실행환경: 배치인지 스트림인지, 소스는 무엇인지

소스설정: 파일인지 DB인지 등

  • 소스 설정 과정에서 catalog라는 용어가 등장하는데..
    데이터 자체가 아닌 데이터의 구조(메타데이터)를 나타내는 개념으로,
    flink에서만 쓰이는 용어는 아니고 작게는 스키마와 비슷한 의미를 가진다. 스키마와 카탈로그의 차이는 다음을 참조
"스카마"와 "카탈로그"는 서로 다른 단계의 메타데이터 구조를 나타내기 때문에 두 용어가 모두 존재

스카마 (Schema):스카마는 특정 데이터베이스나 애플리케이션에서 사용되는 테이블, 뷰, 인덱스, 저장 프로시저 등의 객체와 그 구조를 정의합니다.예를 들어, 어떤 테이블이 어떤 컬럼들로 구성되어 있고, 각 컬럼의 데이터 타입은 무엇인지를 정의하는 것이 스카마의 역할

카탈로그 (Catalog):카탈로그는 데이터베이스 시스템 전체에서 사용되는 여러 스카마와 그 객체들에 대한 정보를 포함하는 중앙 메타데이터 저장소.
대규모 데이터베이스 환경에서는 여러 데이터베이스와 애플리케이션이 있을 수 있으며, 이러한 다양한 데이터 소스와 그 구조를 중앙에서 통합적으로 관리하기 위해 카탈로그가 필요.예를 들어, 하나의 데이터베이스 관리 시스템 내에 여러 데이터베이스가 있을 수 있고, 각 데이터베이스에는 여러 스카마가 있을 수 있다. 이런 구조를 관리하기 위해 카탈로그가 사용됨.

간단히 말하면, 스카마는 "무엇"을 정의하고, 카탈로그는 "어디에" 그것이 있는지를 정의.
대형 데이터 시스템에서는 다양한 데이터 소스와 데이터베이스가 있을 수 있으므로, 이런 복잡한 환경에서 메타데이터를 효과적으로 관리하기 위해 카탈로그 개념이 필요.

프로세스: 소스로 부터 읽어온 데이터를 처리하는 과정

싱크: 처리한 데이터를 어떻게 결과적으로 사용할지 정하는 과정. 리포팅이 될수도 있고 Alert이 될수도 있고 등.

 

코드레벨로 살펴보자.

실행환경

소스설정

tEnv.executeSql("CREATE TABLE transactions (\n" +
     "    account_id  BIGINT,\n" +
     "    amount      BIGINT,\n" +
     "    transaction_time TIMESTAMP(3),\n" +
     "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
     ") WITH (\n" +
     "    'connector' = 'kafka',\n" +
     "    'topic'     = 'transactions',\n" +
     "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
     "    'format'    = 'csv'\n" +
     ")");

위처럼 카탈로그 설정을 하는데, 마치 SQL DDL처럼 보이지만 RDBMS에서의 테이블이 실제 생성되는 것은 아니고 Flink의 Table API내에서 가상의 테이블을 정의하는 것이며, WATERMARK, WITH등 flink전용 키워드가 들어가 있음에 주의. 

  • WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND:
    이 부분은 워터마크를 정의하는 것입니다. 워터마크는 Flink에서 스트림 처리 시간의 개념을 다룰 때 사용되는 것으로, 이 경우 transaction_time을 기준으로 5초의 지연을 허용한다는 의미입니다.
  • WITH ('connector' = 'kafka', ...):
    이 부분은 테이블이 어떤 외부 시스템과 연결되어 있는지, 그리고 그 연결을 위한 설정 정보를 제공하는 부분입니다.
    connector는 'kafka'로 설정되어 있어 Kafka와의 연동을 의미합니다.
    topic은 Kafka의 transactions 토픽에서 데이터를 읽겠다는 것을 나타냅니다.
    properties.bootstrap.servers는 Kafka 브로커의 주소를 나타냅니다.
    format은 데이터의 형식을 나타내는데, 이 경우 CSV 형식을 사용한다는 것을 의미합니다.

프로세스

싱크설정

CREATE TABLE spend_report (
    account_id BIGINT,              -- account_id라는 이름의 BIGINT 타입의 칼럼
    log_ts     TIMESTAMP(3),       -- log_ts라는 이름의 TIMESTAMP 타입의 칼럼 (밀리초 정밀도)
    amount     BIGINT,             -- amount라는 이름의 BIGINT 타입의 칼럼
    PRIMARY KEY (account_id, log_ts) NOT ENFORCED -- account_id와 log_ts를 기반으로 하는 기본 키를 정의. NOT ENFORCED는 키 제약 조건을 강제하지 않는다는 의미
) 
WITH (
   'connector'  = 'jdbc',                  -- JDBC 커넥터를 사용
   'url'        = 'jdbc:mysql://mysql:3306/sql-demo', -- MySQL 데이터베이스에 연결하기 위한 JDBC URL
   'table-name' = 'spend_report',          -- 실제 RDBMS에서의 테이블 이름은 'spend_report'
   'driver'     = 'com.mysql.jdbc.Driver', -- MySQL을 위한 JDBC 드라이버
   'username'   = 'sql-demo',              -- 데이터베이스 연결에 사용할 사용자 이름
   'password'   = 'demo-sql'               -- 데이터베이스 연결에 사용할 비밀번호
);

이 코드는 Flink의 테이블 환경(tEnv) 내에서 spend_report라는 가상 테이블을 정의합니다. 이 테이블은 MySQL 데이터베이스의 sql-demo 스키마 내의 spend_report 테이블과 연결됩니다. 따라서, Flink에서 이 테이블에 대해 실행되는 쿼리는 실제로 해당 MySQL 테이블에 반영될 수 있습니다.

 

소스-싱크연결

Table transactions = tEnv.from("transactions");
report(transactions).executeInsert("spend_report");

 

 

트러블슈팅

github에서 받은 다음 그냥은 잘 수행되는데, report()함수 구현을 넣은 순간부터 여러가지 문제가 발생했다.

문제1.

아래 flink-connector-kafka 의존성이 필요했는데, 프로젝트의 flink버전인 1.16.0이 지원이 안되고 1.14.6까지밖에 없어서 한참을 헤맸다 ㅠ 결국은 프로젝트 자체를 과거버전을 받아서 flink버전을 낮춰서 겨우 해결됨

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

문제2.

사이트에는 안나오는데 아래 의존성들이 모두 필요했고, 하나하나 exception뜨는거 보면서 해결해 나가야 했다 ㄷ

이걸 pom.xml에 넣어주면 얼마나 좋아 ㅠ

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.41</version>
        </dependency>

그런데 조금 더 파악해보니 위의 내용을 직접 pom.xml에 써줄 필요가 없었다.

아래 내용이 Dockerfile에 있음을 나중에 확인

FROM maven:3.8-eclipse-temurin-17 AS builder

COPY ./pom.xml /opt/pom.xml
COPY ./src /opt/src
RUN cd /opt; mvn clean verify -Dmaven.test.skip

FROM apache/flink:1.16.0-scala_2.12-java11

# Download connector libraries
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.16.0/flink-sql-connector-kafka-1.16.0.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.16.0/flink-connector-jdbc-1.16.0.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.16.0/flink-csv-1.16.0.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar;

COPY --from=builder /opt/target/spend-report-*.jar /opt/flink/usrlib/spend-report.jar

RUN echo "execution.checkpointing.interval: 10s" >> /opt/flink/conf/flink-conf.yaml; \
    echo "pipeline.object-reuse: true" >> /opt/flink/conf/flink-conf.yaml; \
    echo "pipeline.time-characteristic: EventTime" >> /opt/flink/conf/flink-conf.yaml; \
    echo "taskmanager.memory.jvm-metaspace.size: 256m" >> /opt/flink/conf/flink-conf.yaml;
반응형

'Data Engineering' 카테고리의 다른 글

flink window  (0) 2023.10.29
flink  (1) 2023.10.28
Spark, Flink를 사용한 실시간 스트림 분석  (0) 2023.08.02
Apache Flink 설치  (0) 2023.08.02
kafka Consume  (0) 2023.08.02

 

cluster

JobManager:

역할: Flink 클러스터의 마스터 노드 역할을 합니다.
책임:
사용자의 프로그램을 시작하고, 중지하며, 장애 복구를 관리합니다.
프로그램이 Flink 클러스터에서 실행되기 위한 작업 분해 및 스케줄링을 담당합니다.
모든 TaskManager들과 통신하며, 각 TaskManager의 상태와 작업 진행 상황을 모니터링합니다.
Checkpointing 및 Savepoint를 관리하여 상태 지속성을 보장합니다.


TaskManager:

역할: Flink 클러스터의 워커 노드 역할을 합니다.
책임:
실제로 데이터 스트림 처리 작업(예: 맵핑, 리듀싱, 필터링 등)을 수행하는 곳입니다.
각 TaskManager는 여러 스레드에서 동시에 여러 태스크를 실행할 수 있습니다.
데이터 처리와 상태 관리를 위한 로컬 저장소를 제공합니다.
작업이 완료되면 결과를 JobManager에 보고합니다.

 

 

ValueState

key(flink 스트림 데이터에서 개발자가 지정한 특정컬럼을 의미)별로 한번씩 초기화 되며, key-value쌍을 저장하는데 사용된다.

일반 map 자료구조형을 쓰는것 대비해서 스레드 안정성 및 일관성 내결함성 Atomic등을 보장

Boolean으로 하더라도 null, true, false의 tri-state를 가지므로 코딩시 주의해야한다.

(아래 코드에서는 그래서 true, null 만사용하고 false란 값은 일부러 사용하지 않았음에 주의. clear()하면 null된다. )

//멤버변수선언은 아래처럼
private transient ValueState<Boolean> flagState;

---

//아래는key별로 한번씩만 초기화(AbstractRichFunction의 open()을 override해서 사용)
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);

---

//아래는 processsElement() 안에서 사용
Boolean bFlag = flagState.value();  // 주의할점은 bFlag를 직접변경해봤자 아래 update()하기전에는 반영안된다는것. 따라서 모든 변경은 update()를 호출하는 방식으로 코딩해야한다.
flagState.update(true);
flagState.clear();

자세한 사용법은 여기참조

반응형

'Data Engineering' 카테고리의 다른 글

flink window  (0) 2023.10.29
flink Table API를 사용한 실시간 Reporting샘플  (0) 2023.10.28
Spark, Flink를 사용한 실시간 스트림 분석  (0) 2023.08.02
Apache Flink 설치  (0) 2023.08.02
kafka Consume  (0) 2023.08.02

먼저 Spark 설치는 여기를 보고오자. Spark설치시 Streaming관련 설치도 진행된다.

 

Kafka에서 메시지를 실시간으로 10초 단위로 읽어서 HDFS에 저장

토픽은 iis_log로 가정.

다음과 같은 python코드를 작성

$ vi spark_streaming_to_hdfs.py

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "iis_log") \
  .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# HDFS뿐 아니라 모니터링을 위해 화면에도 표시
query = df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query2 = df \
  .writeStream \
  .outputMode("append") \
  .format("csv") \
  .option("path", "hdfs://localhost:9000/user/hadoop/iis_log") \
  .option("checkpointLocation", "hdfs://localhost:9000/user/hadoop/checkpoint") \
  .trigger(processingTime='10 seconds') \
  .start()

spark.streams.awaitAnyTermination()

화면에 다음처럼 표시되고, HDFS에는 csv로 컬럼 분리되서 저장된다.

+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"@timestamp":"20...|
+----+--------------------+

 

HDFS가 아닌 RDB(Postgres)에 저장

 

먼저 RDB에 테이블을 알맞게 만들어 줘야 한다. 

CREATE TABLE iis_log (
    client_ip VARCHAR(15),
    url TEXT,
    timestamp character varying
);

python 코드를 아래처럼 작성한다.

$ cat spark_streaming_to_db.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, split
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import unix_timestamp


# Spark 세션 생성
spark = SparkSession.builder.getOrCreate()

# 스키마 수동 지정. RDB에 넣을때 필요
schema1 = StructType([
    StructField("client_ip", StringType()),
    StructField("url", StringType()),
    StructField("timestamp", StringType())
])

# Kafka에서 데이터 읽기
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "iis_log") \
    .load()

# JSON 형식으로 변환
df = df.selectExpr("CAST(value AS STRING) as json")

# 스키마 정의. JSON파싱할때 필요
schema2 = StructType([
    StructField("message", StringType()),
    StructField("@timestamp", StringType())
])

# 데이터 추출
df = df.select(from_json(df.json, schema2).alias("data")).select("data.*")

# 메시지 분리
df = df.withColumn("message_split", split(df["message"], " "))
df = df.withColumn("client_ip", df["message_split"].getItem(2))
df = df.withColumn("url", df["message_split"].getItem(4))
df = df.withColumn("timestamp", df["@timestamp"])

# 필요한 컬럼 선택
df = df.select("client_ip", "url", "timestamp")

# 화면에 출력
query = df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# 중간 저장소로 Parquet 형식으로 쓰기
query2 = df \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "hdfs://localhost:9000/user/hadoop/iis_log_temp") \
    .option("checkpointLocation", "hdfs://localhost:9000/user/hadoop/checkpoint_temp") \
    .trigger(processingTime='10 seconds') \
    .start()

# 배치 작업으로 데이터를 PostgreSQL에 쓰기. PostgreSQL이 스트림을 지원하지 않아서 배치로.
def write_to_postgres():
    df_temp = spark.read.schema(schema1).parquet("hdfs://localhost:9000/user/hadoop/iis_log_temp")
    df_temp.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/online_judge") \
        .option("dbtable", "iis_log") \
        .option("user", "online_judge_admin") \
        .option("password", "abcd123$") \
        .mode("append") \
        .save()

# 배치 작업 실행
while True:
    write_to_postgres()

# 스트리밍 작업 중지
query.stop()

# 스트리밍 작업 대기
spark.streams.awaitAnyTermination()

아래 명령어로 실행하면

$SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 spark_streaming_to_db.py

아래처럼 dbeaver로 RDB에 들어간것을 확인 가능

 

비슷한 기능을 하는 flink코드

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, StreamingFileSink

def kafka_to_hdfs():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    kafka_props = {"bootstrap.servers": "localhost:9092", "group.id": "test_group"}

    kafka_source = FlinkKafkaConsumer("iis_log", SimpleStringSchema(), kafka_props)

    ds = env.add_source(kafka_source)

    ds.add_sink(StreamingFileSink
                .for_row_format("hdfs://localhost:9000/user/hadoop/iis_log", SimpleStringSchema())
                .build())

    env.execute("kafka_to_hdfs")

if __name__ == '__main__':
    kafka_to_hdfs()

Spark Session이 flink에서 Environment에 해당.

지연실행은 둘다 동일하게 지원.

 

 

Spark vs Flink 차이

  • Spark는 먼저 전체 데이터 세트를 메모리에 로드한 다음 변환을 수행하는 반면, Flink는 데이터를 스트리밍하며 변환을 수행
    • 따라서, spark은 true streaming architecture가 아니라 마이크로 배치 구조라 지연시간이 약간 높을 수 있다.

 

 

 

반응형

'Data Engineering' 카테고리의 다른 글

flink Table API를 사용한 실시간 Reporting샘플  (0) 2023.10.28
flink  (1) 2023.10.28
Apache Flink 설치  (0) 2023.08.02
kafka Consume  (0) 2023.08.02
filebeat  (0) 2023.07.31

Flink란

주로 실시간 스트림에 대한 처리, 분석을 위해 쓰이며 높은 처리량과 낮은 지연시간을 제공.

장애복구 및 일관성을 보장하는 기능을 제공

Spark이 배치처리에 강점을 갖는다면, Flink는 실시간 스트림 처리에 강점을 가짐

주로 java와 scala로 작성됨. python API도 제공.

 

설치

프로덕션환경에서는 opt/flink에 설치하고 user계정도 추가하는걸 추천하지만,

개인적으로 실습할때는 ~/flink 정도에 설치해도 충분하다.

# flink 홈페이지에서 최신 빌드 다운로드
wget https://dlcdn.apache.org/flink/flink-1.16.2/flink-1.16.2-bin-scala_2.12.tgz

# ~/flink에 압축해재
mkdir ~/flink
tar -xvzf flink-1.16.2-bin-scala_2.12.tgz -C ~/flink --strip-components 1

그 다음에 cluster 시작이라는 과정을 거쳐야 하는데, 이는 Kafka, Spark, Flink공히 대부분의 분산시스템에서 존재.

이는 여러 노드(컴퓨터)가 함께 작업을 수행하도록 구성하는 과정을 의미하며,

일반적으로 하나 이상의 마스터 노드와 여러 워커 노드로 구성되고 마스토 노드는 작업을 관장하고 워커 노드는 실제 작업을 수행

실제 클러스터를 구성하는 방법은 오픈소스 프로젝트 마다 다른데,

Flink에서는 클러스터는 명시적으로 start-cluster.sh를 수행해서 다음 2가지를 데몬으로 띄우는 과정을 의미한다.

  • JobManager(마스터노드)
  • TaskManager(워커노드)

다음 명령어로 클러스터를 구동하고

cd ~/flink
./bin/start-cluster.sh

jps해서 아래와 같이 보이면 성공

$ ~/flink $ jps
27505 StandaloneSessionClusterEntrypoint # 이게 JobManager를 의미
28553 Jps
27965 TaskManagerRunner

설치확인

아래 example돌려서 잘 설치됐나 확인(localhost:8081을 통해 브라우저에서도 상황을 볼 수 있다)

$ cd ~/flink
$ ./bin/flink run examples/streaming/WordCount.jar --output sample_output
Executing example with default input data.
Use --input to specify file input.
Job has been submitted with JobID dc4552090d31d4e51084899a41d6568a
Program execution finished
Job with JobID dc4552090d31d4e51084899a41d6568a has finished.
Job Runtime: 196 ms

$ cat sample_output/2023-08-02--10/part*
(to,1)
(be,1)
(or,1)
(not,1)
(to,2)
(be,2)
(that,1)
(is,1)
(the,1)
(question,1)
(whether,1)
...

위 예제는 아래 내장 데이터를 사용해 단어별 빈도를 구하는 것으로 Flink의 기본 데이터 처리 기능을 보여준다.

To be, or not to be,--that is the question:--
Whether 'tis nobler in the mind to suffer
The slings and arrows of outrageous fortune
Or to take arms against a sea of troubles,
And by opposing end them?--To die,--to sleep,--
No more; and by a sleep to say we end
The heartache, and the thousand natural shocks
That flesh is heir to,--'tis a consummation
Devoutly to be wish'd. To die,--to sleep;--
To sleep! perchance to dream:--ay, there's the rub;
For in that sleep of death what dreams may come,
When we have shuffled off this mortal coil,
Must give us pause: there's the respect
That makes calamity of so long life;

위의 단어빈도수 계산 작업은 배치로도 할 수 있고 스트림으로도 할 수 있는데 스트림으로 한다는 의미는,

스트림 처리에서의 WordCount는 일반적으로 시간 윈도우를 설정하여 해당 윈도우 내에서의 단어 출현 횟수를 계산
예를 들어, 10초 동안의 데이터 스트림에서 'apple'이라는 단어가 얼마나 많이 나타났는지를 계산.

이러한 윈도우는 특정 시간 간격(예: 10초마다)으로 이동하며, 각 윈도우는 독립적으로 단어의 출현 횟수를 계산.

이렇게 하면 시간에 따른 단어의 사용 빈도를 실시간으로 추적할 수 있다(Flink를 사용하는 이유)
이러한 윈도우 기반의 처리는 스트리밍 데이터에서 흔히 볼 수 있는 패턴이며, Flink는 이러한 유형의 계산을 쉽게 처리할 수 있는 API를 제공.

 

 

트러블슈팅

자바버전 충돌

flink 17.0.7버전의 경우 java17과 충돌나는 현상이 발견되어 java11로 다운그레이드하니 해결됐다.

아래는 다운그레이드 방법

sudo apt update
sudo apt install openjdk-11-jdk
sudo update-alternatives --config java
# 콘솔ui에서 11버전 선택

 

 

반응형

'Data Engineering' 카테고리의 다른 글

flink Table API를 사용한 실시간 Reporting샘플  (0) 2023.10.28
flink  (1) 2023.10.28
Spark, Flink를 사용한 실시간 스트림 분석  (0) 2023.08.02
kafka Consume  (0) 2023.08.02
filebeat  (0) 2023.07.31

기본적으로 특정 토픽에 대한 메시지를 컨슘하려면 다음과 같이 파이선으로 할 수 있다.

from kafka import KafkaConsumer

# Kafka 서버 주소 설정
consumer = KafkaConsumer('iis_log',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',  # 이 옵션을 주면 쌓여있는 메시지중 가장 처음 부터 읽음
                         enable_auto_commit=True  # 이 옵션을 주면 마지막 읽은 위치 다음위치 부터 읽게해줌
                         )

for message in consumer:
    # 메시지 출력
    print (message)

이를 위해 필요하면 다음을 설치한다.

pip install kafka-python

 

반응형

'Data Engineering' 카테고리의 다른 글

flink Table API를 사용한 실시간 Reporting샘플  (0) 2023.10.28
flink  (1) 2023.10.28
Spark, Flink를 사용한 실시간 스트림 분석  (0) 2023.08.02
Apache Flink 설치  (0) 2023.08.02
filebeat  (0) 2023.07.31

실습내용

IIS에서 웹서버 로그를 읽어서 분석서버(같은 랜 네트워크 우분투기반)의 kafka로 전송한다.

 

filebeat는?

filebeat는 ELK로 유명한 Elastic에서 제공하는 경량 로그 수집기이다.

같은 회사 Logstash와의 차이는 후자는 수집뿐 아니라 처리와 변환을 수행가능하다.

filebeat는 그러한 처리 변환 기능을 Flink등 별도의 실시간 로그처리 시스템에 맡기는 구조.

Java가 아닌 Go언어 기반으로 작성됨(Logstash는 Java)

 

로그수집방식은?

기본적으로 모니터링할 폴더를 지정하고, 그안에 생기는 로그들을 라인단위로 읽어서 kafka나 ElasticSearch쪽에 보내는 구조.

 

inputs방식 vs modules 방식

filebeat에는 2가지 설정법이 존재한다.

inputs방식: 일반적인 로그에 대해서 지정하는 방식. 어떤 로그도 가능하여 범용성이 좋다.

modules방식: Apache, Nginx, MySQL등 널리 알려진 특정한 로그에 대해서 수집하는 방식. 해당 로그형식에 대한 사전 구성된 설정을 제공하여 좀더 적합도가 높다.

이번 프로젝트에서는 전자를 사용했다. (후자는 시도했는데 뭔가 잘 안됨)

 

IIS가 돌 고 있는 윈도우 서버에 filebeat 설치하기

여기에 가서 window zip을 고른후 c:\program files\filebeat폴더에 풀어준다.

filebeat.yml을 아래와 같이 수정하여, IIS로 부터 로그를 분석하고, kafaka로 전송하도록 설정한다.

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - c:\inetpub\logs\LogFiles\W3SVC1\u_ex23*
  harvester_limit: 1
  
output.kafka:
  enabled: true
  hosts: ["192.168.0.20:9092"]
  topic: "iis_log"

관리자권한으로 PowerShell 프롬프트를 열고, 다음 명령어를 실행해서 윈도우 서비스로 등록하여 재부팅시에도 실행되도록 만들자.

PS > cd 'C:\Program Files\Filebeat'
PS C:\Program Files\Filebeat> .\install-service-filebeat.ps1

# 다음 커맨드까지 해야 서비스에 등록된다(리눅스에서 systemctl start 와 비슷한 역할)
Start-Service filebeat

# 상태 확인은 다음으로..
Get-Service filebeat

 

 

 

트러블슈팅

PowerShell에서 스크립트 실행이 안되는 경우 아래처럼 정책설정을 해준다.

Set-ExecutionPolicy UnRestricted -File .\install-service-filebeat.ps1
# 또는
Set-ExecutionPolicy Unrestricted -Scope Process
# 후자는 모든 프로세스를 허용하는 구조로 돌려놓으려면 아래 커맨드 실행

Set-ExecutionPolicy Default -Scope Process

 

kafka로 로그전달이 안될때,

가장 먼저 확인할 것은 아래처럼 telnet등으로 kafka쪽 서버 9092포트로 접속되는지 확인

telnet 192.168.0.20 9092

 

다음처럼 host등록이 필요한 경우가 있었다.

(설정파일에 IP로 설정했지만 어느순간 hostname으로 변경된 다음 못찾는다고 나옴;)

Windows에서의 호스트 파일 경로: C:\Windows\System32\drivers\etc\hosts
Linux에서의 호스트 파일 경로: /etc/hosts

127.0.0.1   localhost
::1         localhost

# 추가된 매핑 정보
192.168.0.20 sevityubuntu

위처럼 192.168.0.20과 sevityubuntu라는 호스트명을 매핑

 

filebeat를 콘솔에서 실행하고 로그를 보려면 다음처럼 하면 된다.

PS C:\Program Files\Filebeat> .\filebeat -e

 

반응형

'Data Engineering' 카테고리의 다른 글

flink Table API를 사용한 실시간 Reporting샘플  (0) 2023.10.28
flink  (1) 2023.10.28
Spark, Flink를 사용한 실시간 스트림 분석  (0) 2023.08.02
Apache Flink 설치  (0) 2023.08.02
kafka Consume  (0) 2023.08.02

+ Recent posts