여기 내용을 실습
설치
https://github.com/apache/flink-playgrounds 여기 git clone하고 intelliJ로 열었다.
그 안의 table-walkthrough 폴더가 실습하려는 대상 폴더 이다.
동작방식
/flink-playgrounds/table-walkthrough에서 docker-compose build를 수행하면 필요한 이미지들이 만들어지고
docker-compose up -d를 하면 컨테이너가 만들어지면서 마법처럼 모든것들이 수행된다.
단 처음에는 report()함수를 수정하기 전에는 의도적으로 exception이 발생하도록 되어 있으니 주의
약간특이한점은 일반적으로 Flink에서는 jobmanager와 taskmanager를 먼저 시작하여 클러스터를 구성한 다음, 나중에 사용자 애플리케이션을 제출하여 실행되나.
이 예제의 경우 제시된 docker-compose.yml 파일과 Dockerfile 설정에서는 약간 다르게 작동.
이 구성에서는 jobmanager 서비스가 시작될 때 standalone-job 명령어가 지정되어 있으며, 이를 통해 standalone 모드에서 작업을 실행하도록 지시. 또한 Dockerfile에서 spend-report.jar 파일을 usrlib 디렉토리에 복사하는 부분이 있다. Flink는 이 디렉토리의 JAR 파일을 자동으로 인식하고 로드하여, jobmanager 서비스가 시작될 때 spend-report.jar 파일에 포함된 Flink 애플리케이션을 자동으로 실행.
(이것때문에 많이 헷갈림)
flink 코딩에는 다음과 같은 개념들이 사용된다.
실행환경: 배치인지 스트림인지, 소스는 무엇인지
소스설정: 파일인지 DB인지 등
- 소스 설정 과정에서 catalog라는 용어가 등장하는데..
데이터 자체가 아닌 데이터의 구조(메타데이터)를 나타내는 개념으로,
flink에서만 쓰이는 용어는 아니고 작게는 스키마와 비슷한 의미를 가진다. 스키마와 카탈로그의 차이는 다음을 참조
"스카마"와 "카탈로그"는 서로 다른 단계의 메타데이터 구조를 나타내기 때문에 두 용어가 모두 존재
스카마 (Schema):스카마는 특정 데이터베이스나 애플리케이션에서 사용되는 테이블, 뷰, 인덱스, 저장 프로시저 등의 객체와 그 구조를 정의합니다.예를 들어, 어떤 테이블이 어떤 컬럼들로 구성되어 있고, 각 컬럼의 데이터 타입은 무엇인지를 정의하는 것이 스카마의 역할
카탈로그 (Catalog):카탈로그는 데이터베이스 시스템 전체에서 사용되는 여러 스카마와 그 객체들에 대한 정보를 포함하는 중앙 메타데이터 저장소.
대규모 데이터베이스 환경에서는 여러 데이터베이스와 애플리케이션이 있을 수 있으며, 이러한 다양한 데이터 소스와 그 구조를 중앙에서 통합적으로 관리하기 위해 카탈로그가 필요.예를 들어, 하나의 데이터베이스 관리 시스템 내에 여러 데이터베이스가 있을 수 있고, 각 데이터베이스에는 여러 스카마가 있을 수 있다. 이런 구조를 관리하기 위해 카탈로그가 사용됨.
간단히 말하면, 스카마는 "무엇"을 정의하고, 카탈로그는 "어디에" 그것이 있는지를 정의.
대형 데이터 시스템에서는 다양한 데이터 소스와 데이터베이스가 있을 수 있으므로, 이런 복잡한 환경에서 메타데이터를 효과적으로 관리하기 위해 카탈로그 개념이 필요.
프로세스: 소스로 부터 읽어온 데이터를 처리하는 과정
싱크: 처리한 데이터를 어떻게 결과적으로 사용할지 정하는 과정. 리포팅이 될수도 있고 Alert이 될수도 있고 등.
코드레벨로 살펴보자.
실행환경
소스설정
tEnv.executeSql("CREATE TABLE transactions (\n" +
" account_id BIGINT,\n" +
" amount BIGINT,\n" +
" transaction_time TIMESTAMP(3),\n" +
" WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'transactions',\n" +
" 'properties.bootstrap.servers' = 'kafka:9092',\n" +
" 'format' = 'csv'\n" +
")");
위처럼 카탈로그 설정을 하는데, 마치 SQL DDL처럼 보이지만 RDBMS에서의 테이블이 실제 생성되는 것은 아니고 Flink의 Table API내에서 가상의 테이블을 정의하는 것이며, WATERMARK, WITH등 flink전용 키워드가 들어가 있음에 주의.
- WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND:
이 부분은 워터마크를 정의하는 것입니다. 워터마크는 Flink에서 스트림 처리 시간의 개념을 다룰 때 사용되는 것으로, 이 경우 transaction_time을 기준으로 5초의 지연을 허용한다는 의미입니다.
- WITH ('connector' = 'kafka', ...):
이 부분은 테이블이 어떤 외부 시스템과 연결되어 있는지, 그리고 그 연결을 위한 설정 정보를 제공하는 부분입니다.
connector는 'kafka'로 설정되어 있어 Kafka와의 연동을 의미합니다.
topic은 Kafka의 transactions 토픽에서 데이터를 읽겠다는 것을 나타냅니다.
properties.bootstrap.servers는 Kafka 브로커의 주소를 나타냅니다.
format은 데이터의 형식을 나타내는데, 이 경우 CSV 형식을 사용한다는 것을 의미합니다.
프로세스
싱크설정
CREATE TABLE spend_report (
account_id BIGINT, -- account_id라는 이름의 BIGINT 타입의 칼럼
log_ts TIMESTAMP(3), -- log_ts라는 이름의 TIMESTAMP 타입의 칼럼 (밀리초 정밀도)
amount BIGINT, -- amount라는 이름의 BIGINT 타입의 칼럼
PRIMARY KEY (account_id, log_ts) NOT ENFORCED -- account_id와 log_ts를 기반으로 하는 기본 키를 정의. NOT ENFORCED는 키 제약 조건을 강제하지 않는다는 의미
)
WITH (
'connector' = 'jdbc', -- JDBC 커넥터를 사용
'url' = 'jdbc:mysql://mysql:3306/sql-demo', -- MySQL 데이터베이스에 연결하기 위한 JDBC URL
'table-name' = 'spend_report', -- 실제 RDBMS에서의 테이블 이름은 'spend_report'
'driver' = 'com.mysql.jdbc.Driver', -- MySQL을 위한 JDBC 드라이버
'username' = 'sql-demo', -- 데이터베이스 연결에 사용할 사용자 이름
'password' = 'demo-sql' -- 데이터베이스 연결에 사용할 비밀번호
);
이 코드는 Flink의 테이블 환경(tEnv) 내에서 spend_report라는 가상 테이블을 정의합니다. 이 테이블은 MySQL 데이터베이스의 sql-demo 스키마 내의 spend_report 테이블과 연결됩니다. 따라서, Flink에서 이 테이블에 대해 실행되는 쿼리는 실제로 해당 MySQL 테이블에 반영될 수 있습니다.
소스-싱크연결
Table transactions = tEnv.from("transactions");
report(transactions).executeInsert("spend_report");
트러블슈팅
github에서 받은 다음 그냥은 잘 수행되는데, report()함수 구현을 넣은 순간부터 여러가지 문제가 발생했다.
문제1.
아래 flink-connector-kafka 의존성이 필요했는데, 프로젝트의 flink버전인 1.16.0이 지원이 안되고 1.14.6까지밖에 없어서 한참을 헤맸다 ㅠ 결국은 프로젝트 자체를 과거버전을 받아서 flink버전을 낮춰서 겨우 해결됨
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
문제2.
사이트에는 안나오는데 아래 의존성들이 모두 필요했고, 하나하나 exception뜨는거 보면서 해결해 나가야 했다 ㄷ
이걸 pom.xml에 넣어주면 얼마나 좋아 ㅠ
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.41</version>
</dependency>
그런데 조금 더 파악해보니 위의 내용을 직접 pom.xml에 써줄 필요가 없었다.
아래 내용이 Dockerfile에 있음을 나중에 확인
FROM maven:3.8-eclipse-temurin-17 AS builder
COPY ./pom.xml /opt/pom.xml
COPY ./src /opt/src
RUN cd /opt; mvn clean verify -Dmaven.test.skip
FROM apache/flink:1.16.0-scala_2.12-java11
# Download connector libraries
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.16.0/flink-sql-connector-kafka-1.16.0.jar; \
wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.16.0/flink-connector-jdbc-1.16.0.jar; \
wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.16.0/flink-csv-1.16.0.jar; \
wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar;
COPY --from=builder /opt/target/spend-report-*.jar /opt/flink/usrlib/spend-report.jar
RUN echo "execution.checkpointing.interval: 10s" >> /opt/flink/conf/flink-conf.yaml; \
echo "pipeline.object-reuse: true" >> /opt/flink/conf/flink-conf.yaml; \
echo "pipeline.time-characteristic: EventTime" >> /opt/flink/conf/flink-conf.yaml; \
echo "taskmanager.memory.jvm-metaspace.size: 256m" >> /opt/flink/conf/flink-conf.yaml;