여러 도커간의 디펜던시를 설정해서 빌드하고 띄우는 일을 할 수 있다.

 

용어

서비스 (Service)

Docker Compose 내에서 쓰이는 개념으로, 도커 컨테이너의 구성 및 실행 방법을 정의한 것.
서비스는 하나의 컨테이너일 수도 있고, 동일한 이미지를 기반으로 하는 여러 컨테이너의 집합일 수도 있다.

예를 들어, docker-compose.yml 파일에 다음과 같은 정의가 있다면:

services:
  web:
    image: nginx:latest
    ports:
      - "80:80"
  database:
    image: mysql:5.7
    environment:
      - MYSQL_ROOT_PASSWORD=my-password

여기에서 web과 database는 각각 하나의 서비스.
이 경우, 각 서비스는 각각 하나의 컨테이너를 생성하지만, 필요에 따라 한 서비스에서 여러 컨테이너를 생성할 수도 있습니다 (예: 스케일링 목적으로).


결국, "서비스"는 Docker Compose에서 응용 프로그램의 한 부분 (예: 웹 서버, 데이터베이스)을 나타내며, 이 서비스를 구성하는 데 필요한 설정과 함께 도커 이미지 정보를 포함합니다.

 

사용방법

docker-compose build:

이 명령어는 docker-compose.yml 파일에 정의된 모든 서비스의 이미지를 빌드합니다.
docker-compose.yml 파일에서 build: 섹션을 사용하여 Dockerfile의 위치나 빌드 컨텍스트 등을 지정한 서비스에 대해서만 이 명령어를 사용해야 합니다.


docker-compose up -d:

위에서 빌드한 이미지들의 실행 인스턴스인 컨테이너들을 만들어 실행
-d 옵션은 "detached mode"를 의미하며, 백그라운드에서 컨테이너를 실행하라는 것을 의미
이미지가 없다면 빌드도 시도하기 때문에 docker-compose build 없이 이것만 실행해도 되긴 하나,

코드 변경시 재빌드해주진 않기 때문에 코드 변경이 있었다면 build후 up해야 한다.

 

docker-compose down -v:

up과 반대로 컨테이너들 중단하고 없애준다.

-v하면 관련 데이터 볼륨도 제거해준다.

여기서 데이터 볼륨(Data Volume)은 Docker 컨테이너 파일 시스템의 일부를 호스트 시스템에 저장하는데 사용되는 매커니즘으로, 컨테이너간 데이터를 공유하는 공유폴더 개념으로 쓰이거나 데이터를 영구적으로 저장할수 있게 해준다.

그 예시로는 (DB, 로그, config file등이 있다)

 

 

docker-compose ps

docker-compose로 관리되는 컨테이너들의 현재 상태와 관련된 정보를 조회

 

다음내용 확인가능

  • 서비스 상태 확인: docker-compose up 명령을 사용하여 여러 서비스를 시작한 후, 각 서비스의 컨테이너가 올바르게 실행 중인지, 혹은 중지된 상태인지를 확인하기 위해 사용됩니다.
  • 문제 진단: 서비스에 문제가 발생했을 때, 어떤 컨테이너가 문제를 일으키고 있는지 확인하기 위해 사용될 수 있습니다. 예를 들어, 컨테이너가 계속 재시작되는 경우, docker-compose ps를 통해 해당 컨테이너의 상태를 확인할 수 있습니다.
  • 포트 정보 확인: 각 컨테이너에서 어떤 포트가 호스트와 연결되었는지 확인하려 할 때 사용됩니다.
  • 스케일링 확인: docker-compose up --scale 명령을 사용하여 서비스의 인스턴스 수를 조정한 경우, 현재 실행 중인 인스턴스 수를 확인하기 위해 사용됩니다.
  • 컨테이너 이름 확인: docker-compose로 시작된 컨테이너들은 특정한 네이밍 규칙을 가지고 있습니다. 이 이름을 확인하기 위해 docker-compose ps를 사용할 수 있습니다.

아래는 예시

               Name                             Command               State                                  Ports
--------------------------------------------------------------------------------------------------------------------------------------------------
table-walkthrough_data-generator_1   /docker-entrypoint.sh            Up
table-walkthrough_grafana_1          /run.sh                          Up      0.0.0.0:3000->3000/tcp,:::3000->3000/tcp
table-walkthrough_jobmanager_1       /docker-entrypoint.sh stan ...   Up      6123/tcp, 0.0.0.0:8082->8081/tcp,:::8082->8081/tcp
table-walkthrough_kafka_1            start-kafka.sh                   Up      0.0.0.0:9092->9092/tcp,:::9092->9092/tcp
table-walkthrough_mysql_1            docker-entrypoint.sh --def ...   Up      3306/tcp, 33060/tcp
table-walkthrough_taskmanager_1      /docker-entrypoint.sh task ...   Up      6121/tcp, 6122/tcp, 6123/tcp, 8081/tcp
table-walkthrough_zookeeper_1        /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:2181->2181/tcp,:::2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
sevity@sevityubuntu:~/workspace/flink-playgrounds/table-walkthrough$
반응형

'Programming > Linux' 카테고리의 다른 글

docker network  (0) 2023.10.29
Nginx  (0) 2023.08.16
Redis  (0) 2023.08.10
kafka 설치(우분투 기존)  (0) 2023.07.31
ELK연습  (0) 2023.07.30

여기 내용을 실습

 

설치

https://github.com/apache/flink-playgrounds 여기 git clone하고 intelliJ로 열었다.

그 안의 table-walkthrough 폴더가 실습하려는 대상 폴더 이다.

 

동작방식

/flink-playgrounds/table-walkthrough에서 docker-compose build를 수행하면 필요한 이미지들이 만들어지고

docker-compose up -d를 하면 컨테이너가 만들어지면서 마법처럼 모든것들이 수행된다.

단 처음에는 report()함수를 수정하기 전에는 의도적으로 exception이 발생하도록 되어 있으니 주의

약간특이한점은 일반적으로 Flink에서는 jobmanager와 taskmanager를 먼저 시작하여 클러스터를 구성한 다음, 나중에 사용자 애플리케이션을 제출하여 실행되나.

이 예제의 경우 제시된 docker-compose.yml 파일과 Dockerfile 설정에서는 약간 다르게 작동.

이 구성에서는 jobmanager 서비스가 시작될 때 standalone-job 명령어가 지정되어 있으며, 이를 통해 standalone 모드에서 작업을 실행하도록 지시. 또한 Dockerfile에서 spend-report.jar 파일을 usrlib 디렉토리에 복사하는 부분이 있다. Flink는 이 디렉토리의 JAR 파일을 자동으로 인식하고 로드하여, jobmanager 서비스가 시작될 때 spend-report.jar 파일에 포함된 Flink 애플리케이션을 자동으로 실행​.
(이것때문에 많이 헷갈림)

 

 

flink 코딩에는 다음과 같은 개념들이 사용된다.

실행환경: 배치인지 스트림인지, 소스는 무엇인지

소스설정: 파일인지 DB인지 등

  • 소스 설정 과정에서 catalog라는 용어가 등장하는데..
    데이터 자체가 아닌 데이터의 구조(메타데이터)를 나타내는 개념으로,
    flink에서만 쓰이는 용어는 아니고 작게는 스키마와 비슷한 의미를 가진다. 스키마와 카탈로그의 차이는 다음을 참조
"스카마"와 "카탈로그"는 서로 다른 단계의 메타데이터 구조를 나타내기 때문에 두 용어가 모두 존재

스카마 (Schema):스카마는 특정 데이터베이스나 애플리케이션에서 사용되는 테이블, 뷰, 인덱스, 저장 프로시저 등의 객체와 그 구조를 정의합니다.예를 들어, 어떤 테이블이 어떤 컬럼들로 구성되어 있고, 각 컬럼의 데이터 타입은 무엇인지를 정의하는 것이 스카마의 역할

카탈로그 (Catalog):카탈로그는 데이터베이스 시스템 전체에서 사용되는 여러 스카마와 그 객체들에 대한 정보를 포함하는 중앙 메타데이터 저장소.
대규모 데이터베이스 환경에서는 여러 데이터베이스와 애플리케이션이 있을 수 있으며, 이러한 다양한 데이터 소스와 그 구조를 중앙에서 통합적으로 관리하기 위해 카탈로그가 필요.예를 들어, 하나의 데이터베이스 관리 시스템 내에 여러 데이터베이스가 있을 수 있고, 각 데이터베이스에는 여러 스카마가 있을 수 있다. 이런 구조를 관리하기 위해 카탈로그가 사용됨.

간단히 말하면, 스카마는 "무엇"을 정의하고, 카탈로그는 "어디에" 그것이 있는지를 정의.
대형 데이터 시스템에서는 다양한 데이터 소스와 데이터베이스가 있을 수 있으므로, 이런 복잡한 환경에서 메타데이터를 효과적으로 관리하기 위해 카탈로그 개념이 필요.

프로세스: 소스로 부터 읽어온 데이터를 처리하는 과정

싱크: 처리한 데이터를 어떻게 결과적으로 사용할지 정하는 과정. 리포팅이 될수도 있고 Alert이 될수도 있고 등.

 

코드레벨로 살펴보자.

실행환경

소스설정

tEnv.executeSql("CREATE TABLE transactions (\n" +
     "    account_id  BIGINT,\n" +
     "    amount      BIGINT,\n" +
     "    transaction_time TIMESTAMP(3),\n" +
     "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
     ") WITH (\n" +
     "    'connector' = 'kafka',\n" +
     "    'topic'     = 'transactions',\n" +
     "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
     "    'format'    = 'csv'\n" +
     ")");

위처럼 카탈로그 설정을 하는데, 마치 SQL DDL처럼 보이지만 RDBMS에서의 테이블이 실제 생성되는 것은 아니고 Flink의 Table API내에서 가상의 테이블을 정의하는 것이며, WATERMARK, WITH등 flink전용 키워드가 들어가 있음에 주의. 

  • WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND:
    이 부분은 워터마크를 정의하는 것입니다. 워터마크는 Flink에서 스트림 처리 시간의 개념을 다룰 때 사용되는 것으로, 이 경우 transaction_time을 기준으로 5초의 지연을 허용한다는 의미입니다.
  • WITH ('connector' = 'kafka', ...):
    이 부분은 테이블이 어떤 외부 시스템과 연결되어 있는지, 그리고 그 연결을 위한 설정 정보를 제공하는 부분입니다.
    connector는 'kafka'로 설정되어 있어 Kafka와의 연동을 의미합니다.
    topic은 Kafka의 transactions 토픽에서 데이터를 읽겠다는 것을 나타냅니다.
    properties.bootstrap.servers는 Kafka 브로커의 주소를 나타냅니다.
    format은 데이터의 형식을 나타내는데, 이 경우 CSV 형식을 사용한다는 것을 의미합니다.

프로세스

싱크설정

CREATE TABLE spend_report (
    account_id BIGINT,              -- account_id라는 이름의 BIGINT 타입의 칼럼
    log_ts     TIMESTAMP(3),       -- log_ts라는 이름의 TIMESTAMP 타입의 칼럼 (밀리초 정밀도)
    amount     BIGINT,             -- amount라는 이름의 BIGINT 타입의 칼럼
    PRIMARY KEY (account_id, log_ts) NOT ENFORCED -- account_id와 log_ts를 기반으로 하는 기본 키를 정의. NOT ENFORCED는 키 제약 조건을 강제하지 않는다는 의미
) 
WITH (
   'connector'  = 'jdbc',                  -- JDBC 커넥터를 사용
   'url'        = 'jdbc:mysql://mysql:3306/sql-demo', -- MySQL 데이터베이스에 연결하기 위한 JDBC URL
   'table-name' = 'spend_report',          -- 실제 RDBMS에서의 테이블 이름은 'spend_report'
   'driver'     = 'com.mysql.jdbc.Driver', -- MySQL을 위한 JDBC 드라이버
   'username'   = 'sql-demo',              -- 데이터베이스 연결에 사용할 사용자 이름
   'password'   = 'demo-sql'               -- 데이터베이스 연결에 사용할 비밀번호
);

이 코드는 Flink의 테이블 환경(tEnv) 내에서 spend_report라는 가상 테이블을 정의합니다. 이 테이블은 MySQL 데이터베이스의 sql-demo 스키마 내의 spend_report 테이블과 연결됩니다. 따라서, Flink에서 이 테이블에 대해 실행되는 쿼리는 실제로 해당 MySQL 테이블에 반영될 수 있습니다.

 

소스-싱크연결

Table transactions = tEnv.from("transactions");
report(transactions).executeInsert("spend_report");

 

 

트러블슈팅

github에서 받은 다음 그냥은 잘 수행되는데, report()함수 구현을 넣은 순간부터 여러가지 문제가 발생했다.

문제1.

아래 flink-connector-kafka 의존성이 필요했는데, 프로젝트의 flink버전인 1.16.0이 지원이 안되고 1.14.6까지밖에 없어서 한참을 헤맸다 ㅠ 결국은 프로젝트 자체를 과거버전을 받아서 flink버전을 낮춰서 겨우 해결됨

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

문제2.

사이트에는 안나오는데 아래 의존성들이 모두 필요했고, 하나하나 exception뜨는거 보면서 해결해 나가야 했다 ㄷ

이걸 pom.xml에 넣어주면 얼마나 좋아 ㅠ

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.41</version>
        </dependency>

그런데 조금 더 파악해보니 위의 내용을 직접 pom.xml에 써줄 필요가 없었다.

아래 내용이 Dockerfile에 있음을 나중에 확인

FROM maven:3.8-eclipse-temurin-17 AS builder

COPY ./pom.xml /opt/pom.xml
COPY ./src /opt/src
RUN cd /opt; mvn clean verify -Dmaven.test.skip

FROM apache/flink:1.16.0-scala_2.12-java11

# Download connector libraries
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.16.0/flink-sql-connector-kafka-1.16.0.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.16.0/flink-connector-jdbc-1.16.0.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.16.0/flink-csv-1.16.0.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar;

COPY --from=builder /opt/target/spend-report-*.jar /opt/flink/usrlib/spend-report.jar

RUN echo "execution.checkpointing.interval: 10s" >> /opt/flink/conf/flink-conf.yaml; \
    echo "pipeline.object-reuse: true" >> /opt/flink/conf/flink-conf.yaml; \
    echo "pipeline.time-characteristic: EventTime" >> /opt/flink/conf/flink-conf.yaml; \
    echo "taskmanager.memory.jvm-metaspace.size: 256m" >> /opt/flink/conf/flink-conf.yaml;
반응형

'Data Engineering' 카테고리의 다른 글

flink window  (0) 2023.10.29
flink  (1) 2023.10.28
Spark, Flink를 사용한 실시간 스트림 분석  (0) 2023.08.02
Apache Flink 설치  (0) 2023.08.02
kafka Consume  (0) 2023.08.02

 

cluster

JobManager:

역할: Flink 클러스터의 마스터 노드 역할을 합니다.
책임:
사용자의 프로그램을 시작하고, 중지하며, 장애 복구를 관리합니다.
프로그램이 Flink 클러스터에서 실행되기 위한 작업 분해 및 스케줄링을 담당합니다.
모든 TaskManager들과 통신하며, 각 TaskManager의 상태와 작업 진행 상황을 모니터링합니다.
Checkpointing 및 Savepoint를 관리하여 상태 지속성을 보장합니다.


TaskManager:

역할: Flink 클러스터의 워커 노드 역할을 합니다.
책임:
실제로 데이터 스트림 처리 작업(예: 맵핑, 리듀싱, 필터링 등)을 수행하는 곳입니다.
각 TaskManager는 여러 스레드에서 동시에 여러 태스크를 실행할 수 있습니다.
데이터 처리와 상태 관리를 위한 로컬 저장소를 제공합니다.
작업이 완료되면 결과를 JobManager에 보고합니다.

 

 

ValueState

key(flink 스트림 데이터에서 개발자가 지정한 특정컬럼을 의미)별로 한번씩 초기화 되며, key-value쌍을 저장하는데 사용된다.

일반 map 자료구조형을 쓰는것 대비해서 스레드 안정성 및 일관성 내결함성 Atomic등을 보장

Boolean으로 하더라도 null, true, false의 tri-state를 가지므로 코딩시 주의해야한다.

(아래 코드에서는 그래서 true, null 만사용하고 false란 값은 일부러 사용하지 않았음에 주의. clear()하면 null된다. )

//멤버변수선언은 아래처럼
private transient ValueState<Boolean> flagState;

---

//아래는key별로 한번씩만 초기화(AbstractRichFunction의 open()을 override해서 사용)
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);

---

//아래는 processsElement() 안에서 사용
Boolean bFlag = flagState.value();  // 주의할점은 bFlag를 직접변경해봤자 아래 update()하기전에는 반영안된다는것. 따라서 모든 변경은 update()를 호출하는 방식으로 코딩해야한다.
flagState.update(true);
flagState.clear();

자세한 사용법은 여기참조

반응형

'Data Engineering' 카테고리의 다른 글

flink window  (0) 2023.10.29
flink Table API를 사용한 실시간 Reporting샘플  (0) 2023.10.28
Spark, Flink를 사용한 실시간 스트림 분석  (0) 2023.08.02
Apache Flink 설치  (0) 2023.08.02
kafka Consume  (0) 2023.08.02

+ Recent posts