null처리를 하다보면 코드가 지저분해지고, 실수하기가 쉬우며, 실수했을때는 NullPointException이 발생하게 된다. 

java8부터는 이것을 개선하기위해 null이 될 수 있는 객체를 Optional에 넣어서 명시적으로 메소드를 통해 다룰수 있도록 만들었다. 이를 통해 체이닝이 가능해지며 코딩오류 가능성이 줄게 되었다.

(단 람다함수 및 method reference등으로 인해 다소 이해하기 복잡해보일 순 있다)

 

코드예시는 여기 참조

내가 만든 코드 예시는 아래 참조

https://github.com/sevity/problem_solving/blob/main/java/OptionalExample.java

반응형

'Programming > JAVA' 카테고리의 다른 글

reactor #1  (1) 2024.01.01
BiFunction, Function을 사용한 함수형 프로그래밍  (0) 2024.01.01
java enum  (0) 2023.11.03
IntelliJ 팁  (0) 2023.11.03
java공부  (0) 2023.08.12

c++ enum과 다르게 java의 enum은 좀 더 발전된 형태로 상수의 집합이 아닌 타입 자체가 객체이다.

따라서 enum type 마다 속성과 메서드를 가질 수 있으며 생성자로 초기화 한다.

public enum CoffeeSize {
    SMALL(200, 3000),
    MEDIUM(400, 5000),
    LARGE(600, 7000),
    XLARGE(1000, 9000);

    private final int volumeInMl;
    private final long priceInWon;

    CoffeeSize(int volumeInMl, long priceInWon) {
        this.volumeInMl = volumeInMl;
        this.priceInWon = priceInWon;
    }
}

위 예시를 보면 커피 사이즈를 enum으로 SMAL, MEDIUM, LARGE, XLARGE로 나누는데, 각각 볼륨과 가격의 속성을 추가로 가진다.

반응형

'Programming > JAVA' 카테고리의 다른 글

BiFunction, Function을 사용한 함수형 프로그래밍  (0) 2024.01.01
java Optional  (1) 2023.11.04
IntelliJ 팁  (0) 2023.11.03
java공부  (0) 2023.08.12
react/next.js로 만드는 online-judge 사이트 #4 문제관리 프론트엔드  (0) 2023.08.05

 

 

Call Hierachy

Source Insight 처럼 런타임이 아닌데도 콜스택 그래프를 볼 수 있도록 해준다.

오픈소스 분석시 매우 유용하다.

우측 callers of countByProblemId 주목

접근: 메뉴 > 탐색 > 호출계층구조 (Ctrl+Alt+H)

런타임이 아니기 때문에 디버깅시 나오는 콜스택이 외길인데 반해서 콜러가 여러군데로 나오는점이 있으니 분석시 유의.

반응형

브리지 네트워크 (Bridge Network):

Docker에서 기본적으로 제공되는 네트워크 모드입니다.
컨테이너가 독립적인 IP 주소를 가지며, 호스트와는 브리지 네트워크를 통해 통신합니다.

 

bridge는 Docker의 기본 네트워크 드라이버 중 하나입니다. Docker가 설치되면 기본적으로 생성되는 네트워크 모드로, 개별 컨테이너들이 독립된 네트워크 네임스페이스에서 동작하게 합니다. 이렇게 하면 컨테이너 간의 네트워크 통신이 보안적으로 격리됩니다.

bridge 네트워크에 대한 주요 특징은 다음과 같습니다:

  • 격리: 각 컨테이너는 독립된 네트워크 네임스페이스를 가지기 때문에 서로 격리됩니다.
  • 자동 IP 할당: Docker의 내장 DHCP 서버에 의해 자동으로 IP 주소가 할당됩니다. 보통 172.17.0.x 범위의 주소가 기본으로 사용됩니다.
  • 포트 매핑: 호스트 시스템의 특정 포트를 컨테이너의 특정 포트에 매핑(포워딩)할 수 있습니다. 예를 들어, 호스트의 8080 포트를 컨테이너의 80 포트에 매핑할 수 있습니다.
  • 컨테이너 간 통신: 같은 bridge 네트워크에 있는 컨테이너끼리는 IP 주소를 통해 서로 통신할 수 있습니다.

그러나 bridge 네트워크 모드에는 몇 가지 제한사항도 있습니다

  • 외부 네트워크와의 통신은 NAT(Network Address Translation)를 통해 이루어지기 때문에 성능 오버헤드가 있을 수 있습니다.
  • 호스트와 컨테이너 간의 통신을 위해서는 포트 매핑이 필요합니다.
  • 다른 호스트에 있는 컨테이너와의 통신은 기본적으로 지원되지 않습니다.
  • 이러한 제한사항 때문에 복잡한 배포나 여러 호스트 간의 통신이 필요한 경우에는 overlay, macvlan 등의 다른 네트워크 드라이버를 사용하기도 합니다.

 

호스트 네트워크 (Host Network):

컨테이너가 호스트의 네트워크 네임스페이스를 사용합니다.
컨테이너는 호스트의 IP 주소와 포트를 직접 사용할 수 있습니다.

반응형

'Programming > Linux' 카테고리의 다른 글

docker-compose  (0) 2023.10.28
Nginx  (0) 2023.08.16
Redis  (0) 2023.08.10
kafka 설치(우분투 기존)  (0) 2023.07.31
ELK연습  (0) 2023.07.30

여기 참조

Windows는 무한 스트림 처리의 핵심입니다. 

Windows는 스트림을 유한한 크기의 "버킷"으로 분할하여 계산을 적용할 수 있습니다.

 

주요개념 요약

윈도우 크기

  • tumbling window: 비중첩(non-overlapping) window
  • sliding window: 중첩 window

시간

  • 발생 시간 (Event Time): 이벤트가 실제로 발생한 시간을 의미합니다. 예를 들어, 센서가 데이터를 생성하거나 사용자가 클릭한 시간 등 실제 사건이 일어난 시간을 나타냅니다.
  • 수집 시간 (Ingestion Time): 이벤트가 시스템 또는 스트리밍 플랫폼에 처음 도착한 시간을 나타냅니다. 발생 시간과 수집 시간 사이에는 네트워크 지연, 메시지 큐 지연 등 다양한 요인으로 인해 차이가 발생할 수 있습니다.
  • 처리 시간 (Processing Time): 이벤트가 실제로 처리되기 시작한 시간을 의미합니다. 처리 시간은 이벤트가 시스템에 도착한 순서대로 처리되므로, 발생 시간이나 수집 시간과는 다를 수 있습니다.

지연(latency)

  • 이벤트 시간 지연 (Event Time Latency):  이벤트가 실제로 발생한 시간 (Event Time)과 이벤트가 시스템에서 처리되는 시간 (Processing Time) 사이의 차이를 나타냅니다.

워터마크

  • 워터마크는 "지금까지 도착한 데이터 중에서 가장 늦게 발생한 이벤트의 시간"을 나타냅니다. 예를 들어, "12:06"의 워터마크가 발행된다면, 12:06 이전에 발생한 모든 이벤트들은 시스템에 이미 도착했거나 곧 도착할 것임을 의미합니다. 즉, 12:06 이후에 12:05의 이벤트가 도착할 확률은 없다는 것을 보장합니다.
  • 이렇게 워터마크를 사용하면, 스트리밍 데이터에서의 시간 관련 처리를 더 정확하게 할 수 있습니다. 워터마크가 "12:06"으로 설정되면, 시스템은 12:06 이전의 모든 이벤트를 안정적으로 처리할 수 있음을 알게 됩니다.

 

window의 생명주기

간단히 말해서, window에 속해야 하는 첫 번째 요소가 도착하자마자 window가 생성되고(고정 윈도우 방식이 아니네)

시간(이벤트 또는 처리 시간)이 종료 타임스탬프 더하기 사용자가 허용한 지연 시간을 지나면 창이 완전히 제거됩니다.

 

Flink는 시간 기반 창에 대해서만 제거를 보장하며 다른 유형( 예: 전역 창)에 대해서는 제거를 보장하지 않습니다(window 할당자 참조 ). 예를 들어, 5분마다 비중첩(또는 텀블링) 윈도우를 생성하고 1분의 허용 지연 시간을 가진 '이벤트 시간 기반' 윈도우 전략의 경우, Flink는 이 간격에 속하는 타임스탬프를 가진 첫 번째 요소가 도착하면 12:00과 12:05 사이의 새 윈도우를 생성하고, 워터마크가 12:06 타임스탬프를 지나가면 제거합니다.

 

또한 각 윈도우에는 Trigger 및 함수( ProcessWindowFunction, ReduceFunction또는 AggregateFunction)( window 함수 참조 )가 연결되어 있습니다

함수는 윈도우 내용에 적용될 계산을 포함하며, 트리거는 윈도우가 함수가 적용될 준비가 되었는지를 나타내는 조건을 지정합니다. 트리거링 정책은 "윈도우 내의 요소 수가 4개 이상일 때"나 "워터마크가 윈도우의 끝을 지날 때"와 같은 것일 수 있습니다. 트리거는 생성과 제거 사이의 어느 시점이든 윈도우의 내용을 삭제하기로 결정할 수도 있습니다.

 

또는 추방자(Evictor)를 사용해서 윈도우 내의 요소들을 제거할수도 있다. 예를 들어 1분 윈도우에 100개가 들어왔는데 10개만 사용하고 나머지 90개는 추방자로 제거.

 

반응형

'Data Engineering' 카테고리의 다른 글

flink Table API를 사용한 실시간 Reporting샘플  (0) 2023.10.28
flink  (1) 2023.10.28
Spark, Flink를 사용한 실시간 스트림 분석  (0) 2023.08.02
Apache Flink 설치  (0) 2023.08.02
kafka Consume  (0) 2023.08.02

"응용 프로그램"은 사용자가 특정 작업을 수행하기 위해 실행하는 소프트웨어를 가리킵니다.

이 응용 프로그램은 전통적인 모놀리식 구조일 수도 있고, MSA(Microservices Architecture)와 같은 구조일 수도 있습니다.

따라서 Application은  MSA보다 더 넓은 범위를 가집니다.


응용 프로그램(Application)

사용자가 특정 작업을 수행하기 위해 실행하는 소프트웨어의 집합.

전통적인 모놀리식 구조일 수도 있고, MSA(Microservices Architecture)와 같은 구조일 수도 있습니다.

 

MSA (Microservices Architecture)

응용 프로그램을 작은 서비스들로 분리하는 아키텍처 스타일.
각 서비스는 특정 기능에 초점을 맞추며 독립적으로 동작합니다.

반응형

'System Architect' 카테고리의 다른 글

위임(delegate) 패턴  (0) 2024.02.17
graphQL  (0) 2023.10.12
gRPC  (0) 2023.10.11
시스템설계 Q&A 2  (0) 2023.09.20
데이터 분석 관련 정리  (0) 2023.08.19

여러 도커간의 디펜던시를 설정해서 빌드하고 띄우는 일을 할 수 있다.

 

용어

서비스 (Service)

Docker Compose 내에서 쓰이는 개념으로, 도커 컨테이너의 구성 및 실행 방법을 정의한 것.
서비스는 하나의 컨테이너일 수도 있고, 동일한 이미지를 기반으로 하는 여러 컨테이너의 집합일 수도 있다.

예를 들어, docker-compose.yml 파일에 다음과 같은 정의가 있다면:

services:
  web:
    image: nginx:latest
    ports:
      - "80:80"
  database:
    image: mysql:5.7
    environment:
      - MYSQL_ROOT_PASSWORD=my-password

여기에서 web과 database는 각각 하나의 서비스.
이 경우, 각 서비스는 각각 하나의 컨테이너를 생성하지만, 필요에 따라 한 서비스에서 여러 컨테이너를 생성할 수도 있습니다 (예: 스케일링 목적으로).


결국, "서비스"는 Docker Compose에서 응용 프로그램의 한 부분 (예: 웹 서버, 데이터베이스)을 나타내며, 이 서비스를 구성하는 데 필요한 설정과 함께 도커 이미지 정보를 포함합니다.

 

사용방법

docker-compose build:

이 명령어는 docker-compose.yml 파일에 정의된 모든 서비스의 이미지를 빌드합니다.
docker-compose.yml 파일에서 build: 섹션을 사용하여 Dockerfile의 위치나 빌드 컨텍스트 등을 지정한 서비스에 대해서만 이 명령어를 사용해야 합니다.


docker-compose up -d:

위에서 빌드한 이미지들의 실행 인스턴스인 컨테이너들을 만들어 실행
-d 옵션은 "detached mode"를 의미하며, 백그라운드에서 컨테이너를 실행하라는 것을 의미
이미지가 없다면 빌드도 시도하기 때문에 docker-compose build 없이 이것만 실행해도 되긴 하나,

코드 변경시 재빌드해주진 않기 때문에 코드 변경이 있었다면 build후 up해야 한다.

 

docker-compose down -v:

up과 반대로 컨테이너들 중단하고 없애준다.

-v하면 관련 데이터 볼륨도 제거해준다.

여기서 데이터 볼륨(Data Volume)은 Docker 컨테이너 파일 시스템의 일부를 호스트 시스템에 저장하는데 사용되는 매커니즘으로, 컨테이너간 데이터를 공유하는 공유폴더 개념으로 쓰이거나 데이터를 영구적으로 저장할수 있게 해준다.

그 예시로는 (DB, 로그, config file등이 있다)

 

 

docker-compose ps

docker-compose로 관리되는 컨테이너들의 현재 상태와 관련된 정보를 조회

 

다음내용 확인가능

  • 서비스 상태 확인: docker-compose up 명령을 사용하여 여러 서비스를 시작한 후, 각 서비스의 컨테이너가 올바르게 실행 중인지, 혹은 중지된 상태인지를 확인하기 위해 사용됩니다.
  • 문제 진단: 서비스에 문제가 발생했을 때, 어떤 컨테이너가 문제를 일으키고 있는지 확인하기 위해 사용될 수 있습니다. 예를 들어, 컨테이너가 계속 재시작되는 경우, docker-compose ps를 통해 해당 컨테이너의 상태를 확인할 수 있습니다.
  • 포트 정보 확인: 각 컨테이너에서 어떤 포트가 호스트와 연결되었는지 확인하려 할 때 사용됩니다.
  • 스케일링 확인: docker-compose up --scale 명령을 사용하여 서비스의 인스턴스 수를 조정한 경우, 현재 실행 중인 인스턴스 수를 확인하기 위해 사용됩니다.
  • 컨테이너 이름 확인: docker-compose로 시작된 컨테이너들은 특정한 네이밍 규칙을 가지고 있습니다. 이 이름을 확인하기 위해 docker-compose ps를 사용할 수 있습니다.

아래는 예시

               Name                             Command               State                                  Ports
--------------------------------------------------------------------------------------------------------------------------------------------------
table-walkthrough_data-generator_1   /docker-entrypoint.sh            Up
table-walkthrough_grafana_1          /run.sh                          Up      0.0.0.0:3000->3000/tcp,:::3000->3000/tcp
table-walkthrough_jobmanager_1       /docker-entrypoint.sh stan ...   Up      6123/tcp, 0.0.0.0:8082->8081/tcp,:::8082->8081/tcp
table-walkthrough_kafka_1            start-kafka.sh                   Up      0.0.0.0:9092->9092/tcp,:::9092->9092/tcp
table-walkthrough_mysql_1            docker-entrypoint.sh --def ...   Up      3306/tcp, 33060/tcp
table-walkthrough_taskmanager_1      /docker-entrypoint.sh task ...   Up      6121/tcp, 6122/tcp, 6123/tcp, 8081/tcp
table-walkthrough_zookeeper_1        /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:2181->2181/tcp,:::2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
sevity@sevityubuntu:~/workspace/flink-playgrounds/table-walkthrough$
반응형

'Programming > Linux' 카테고리의 다른 글

docker network  (0) 2023.10.29
Nginx  (0) 2023.08.16
Redis  (0) 2023.08.10
kafka 설치(우분투 기존)  (0) 2023.07.31
ELK연습  (0) 2023.07.30

여기 내용을 실습

 

설치

https://github.com/apache/flink-playgrounds 여기 git clone하고 intelliJ로 열었다.

그 안의 table-walkthrough 폴더가 실습하려는 대상 폴더 이다.

 

동작방식

/flink-playgrounds/table-walkthrough에서 docker-compose build를 수행하면 필요한 이미지들이 만들어지고

docker-compose up -d를 하면 컨테이너가 만들어지면서 마법처럼 모든것들이 수행된다.

단 처음에는 report()함수를 수정하기 전에는 의도적으로 exception이 발생하도록 되어 있으니 주의

약간특이한점은 일반적으로 Flink에서는 jobmanager와 taskmanager를 먼저 시작하여 클러스터를 구성한 다음, 나중에 사용자 애플리케이션을 제출하여 실행되나.

이 예제의 경우 제시된 docker-compose.yml 파일과 Dockerfile 설정에서는 약간 다르게 작동.

이 구성에서는 jobmanager 서비스가 시작될 때 standalone-job 명령어가 지정되어 있으며, 이를 통해 standalone 모드에서 작업을 실행하도록 지시. 또한 Dockerfile에서 spend-report.jar 파일을 usrlib 디렉토리에 복사하는 부분이 있다. Flink는 이 디렉토리의 JAR 파일을 자동으로 인식하고 로드하여, jobmanager 서비스가 시작될 때 spend-report.jar 파일에 포함된 Flink 애플리케이션을 자동으로 실행​.
(이것때문에 많이 헷갈림)

 

 

flink 코딩에는 다음과 같은 개념들이 사용된다.

실행환경: 배치인지 스트림인지, 소스는 무엇인지

소스설정: 파일인지 DB인지 등

  • 소스 설정 과정에서 catalog라는 용어가 등장하는데..
    데이터 자체가 아닌 데이터의 구조(메타데이터)를 나타내는 개념으로,
    flink에서만 쓰이는 용어는 아니고 작게는 스키마와 비슷한 의미를 가진다. 스키마와 카탈로그의 차이는 다음을 참조
"스카마"와 "카탈로그"는 서로 다른 단계의 메타데이터 구조를 나타내기 때문에 두 용어가 모두 존재

스카마 (Schema):스카마는 특정 데이터베이스나 애플리케이션에서 사용되는 테이블, 뷰, 인덱스, 저장 프로시저 등의 객체와 그 구조를 정의합니다.예를 들어, 어떤 테이블이 어떤 컬럼들로 구성되어 있고, 각 컬럼의 데이터 타입은 무엇인지를 정의하는 것이 스카마의 역할

카탈로그 (Catalog):카탈로그는 데이터베이스 시스템 전체에서 사용되는 여러 스카마와 그 객체들에 대한 정보를 포함하는 중앙 메타데이터 저장소.
대규모 데이터베이스 환경에서는 여러 데이터베이스와 애플리케이션이 있을 수 있으며, 이러한 다양한 데이터 소스와 그 구조를 중앙에서 통합적으로 관리하기 위해 카탈로그가 필요.예를 들어, 하나의 데이터베이스 관리 시스템 내에 여러 데이터베이스가 있을 수 있고, 각 데이터베이스에는 여러 스카마가 있을 수 있다. 이런 구조를 관리하기 위해 카탈로그가 사용됨.

간단히 말하면, 스카마는 "무엇"을 정의하고, 카탈로그는 "어디에" 그것이 있는지를 정의.
대형 데이터 시스템에서는 다양한 데이터 소스와 데이터베이스가 있을 수 있으므로, 이런 복잡한 환경에서 메타데이터를 효과적으로 관리하기 위해 카탈로그 개념이 필요.

프로세스: 소스로 부터 읽어온 데이터를 처리하는 과정

싱크: 처리한 데이터를 어떻게 결과적으로 사용할지 정하는 과정. 리포팅이 될수도 있고 Alert이 될수도 있고 등.

 

코드레벨로 살펴보자.

실행환경

소스설정

tEnv.executeSql("CREATE TABLE transactions (\n" +
     "    account_id  BIGINT,\n" +
     "    amount      BIGINT,\n" +
     "    transaction_time TIMESTAMP(3),\n" +
     "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
     ") WITH (\n" +
     "    'connector' = 'kafka',\n" +
     "    'topic'     = 'transactions',\n" +
     "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
     "    'format'    = 'csv'\n" +
     ")");

위처럼 카탈로그 설정을 하는데, 마치 SQL DDL처럼 보이지만 RDBMS에서의 테이블이 실제 생성되는 것은 아니고 Flink의 Table API내에서 가상의 테이블을 정의하는 것이며, WATERMARK, WITH등 flink전용 키워드가 들어가 있음에 주의. 

  • WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND:
    이 부분은 워터마크를 정의하는 것입니다. 워터마크는 Flink에서 스트림 처리 시간의 개념을 다룰 때 사용되는 것으로, 이 경우 transaction_time을 기준으로 5초의 지연을 허용한다는 의미입니다.
  • WITH ('connector' = 'kafka', ...):
    이 부분은 테이블이 어떤 외부 시스템과 연결되어 있는지, 그리고 그 연결을 위한 설정 정보를 제공하는 부분입니다.
    connector는 'kafka'로 설정되어 있어 Kafka와의 연동을 의미합니다.
    topic은 Kafka의 transactions 토픽에서 데이터를 읽겠다는 것을 나타냅니다.
    properties.bootstrap.servers는 Kafka 브로커의 주소를 나타냅니다.
    format은 데이터의 형식을 나타내는데, 이 경우 CSV 형식을 사용한다는 것을 의미합니다.

프로세스

싱크설정

CREATE TABLE spend_report (
    account_id BIGINT,              -- account_id라는 이름의 BIGINT 타입의 칼럼
    log_ts     TIMESTAMP(3),       -- log_ts라는 이름의 TIMESTAMP 타입의 칼럼 (밀리초 정밀도)
    amount     BIGINT,             -- amount라는 이름의 BIGINT 타입의 칼럼
    PRIMARY KEY (account_id, log_ts) NOT ENFORCED -- account_id와 log_ts를 기반으로 하는 기본 키를 정의. NOT ENFORCED는 키 제약 조건을 강제하지 않는다는 의미
) 
WITH (
   'connector'  = 'jdbc',                  -- JDBC 커넥터를 사용
   'url'        = 'jdbc:mysql://mysql:3306/sql-demo', -- MySQL 데이터베이스에 연결하기 위한 JDBC URL
   'table-name' = 'spend_report',          -- 실제 RDBMS에서의 테이블 이름은 'spend_report'
   'driver'     = 'com.mysql.jdbc.Driver', -- MySQL을 위한 JDBC 드라이버
   'username'   = 'sql-demo',              -- 데이터베이스 연결에 사용할 사용자 이름
   'password'   = 'demo-sql'               -- 데이터베이스 연결에 사용할 비밀번호
);

이 코드는 Flink의 테이블 환경(tEnv) 내에서 spend_report라는 가상 테이블을 정의합니다. 이 테이블은 MySQL 데이터베이스의 sql-demo 스키마 내의 spend_report 테이블과 연결됩니다. 따라서, Flink에서 이 테이블에 대해 실행되는 쿼리는 실제로 해당 MySQL 테이블에 반영될 수 있습니다.

 

소스-싱크연결

Table transactions = tEnv.from("transactions");
report(transactions).executeInsert("spend_report");

 

 

트러블슈팅

github에서 받은 다음 그냥은 잘 수행되는데, report()함수 구현을 넣은 순간부터 여러가지 문제가 발생했다.

문제1.

아래 flink-connector-kafka 의존성이 필요했는데, 프로젝트의 flink버전인 1.16.0이 지원이 안되고 1.14.6까지밖에 없어서 한참을 헤맸다 ㅠ 결국은 프로젝트 자체를 과거버전을 받아서 flink버전을 낮춰서 겨우 해결됨

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

문제2.

사이트에는 안나오는데 아래 의존성들이 모두 필요했고, 하나하나 exception뜨는거 보면서 해결해 나가야 했다 ㄷ

이걸 pom.xml에 넣어주면 얼마나 좋아 ㅠ

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.41</version>
        </dependency>

그런데 조금 더 파악해보니 위의 내용을 직접 pom.xml에 써줄 필요가 없었다.

아래 내용이 Dockerfile에 있음을 나중에 확인

FROM maven:3.8-eclipse-temurin-17 AS builder

COPY ./pom.xml /opt/pom.xml
COPY ./src /opt/src
RUN cd /opt; mvn clean verify -Dmaven.test.skip

FROM apache/flink:1.16.0-scala_2.12-java11

# Download connector libraries
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.16.0/flink-sql-connector-kafka-1.16.0.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.16.0/flink-connector-jdbc-1.16.0.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.16.0/flink-csv-1.16.0.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar;

COPY --from=builder /opt/target/spend-report-*.jar /opt/flink/usrlib/spend-report.jar

RUN echo "execution.checkpointing.interval: 10s" >> /opt/flink/conf/flink-conf.yaml; \
    echo "pipeline.object-reuse: true" >> /opt/flink/conf/flink-conf.yaml; \
    echo "pipeline.time-characteristic: EventTime" >> /opt/flink/conf/flink-conf.yaml; \
    echo "taskmanager.memory.jvm-metaspace.size: 256m" >> /opt/flink/conf/flink-conf.yaml;
반응형

'Data Engineering' 카테고리의 다른 글

flink window  (0) 2023.10.29
flink  (1) 2023.10.28
Spark, Flink를 사용한 실시간 스트림 분석  (0) 2023.08.02
Apache Flink 설치  (0) 2023.08.02
kafka Consume  (0) 2023.08.02

 

cluster

JobManager:

역할: Flink 클러스터의 마스터 노드 역할을 합니다.
책임:
사용자의 프로그램을 시작하고, 중지하며, 장애 복구를 관리합니다.
프로그램이 Flink 클러스터에서 실행되기 위한 작업 분해 및 스케줄링을 담당합니다.
모든 TaskManager들과 통신하며, 각 TaskManager의 상태와 작업 진행 상황을 모니터링합니다.
Checkpointing 및 Savepoint를 관리하여 상태 지속성을 보장합니다.


TaskManager:

역할: Flink 클러스터의 워커 노드 역할을 합니다.
책임:
실제로 데이터 스트림 처리 작업(예: 맵핑, 리듀싱, 필터링 등)을 수행하는 곳입니다.
각 TaskManager는 여러 스레드에서 동시에 여러 태스크를 실행할 수 있습니다.
데이터 처리와 상태 관리를 위한 로컬 저장소를 제공합니다.
작업이 완료되면 결과를 JobManager에 보고합니다.

 

 

ValueState

key(flink 스트림 데이터에서 개발자가 지정한 특정컬럼을 의미)별로 한번씩 초기화 되며, key-value쌍을 저장하는데 사용된다.

일반 map 자료구조형을 쓰는것 대비해서 스레드 안정성 및 일관성 내결함성 Atomic등을 보장

Boolean으로 하더라도 null, true, false의 tri-state를 가지므로 코딩시 주의해야한다.

(아래 코드에서는 그래서 true, null 만사용하고 false란 값은 일부러 사용하지 않았음에 주의. clear()하면 null된다. )

//멤버변수선언은 아래처럼
private transient ValueState<Boolean> flagState;

---

//아래는key별로 한번씩만 초기화(AbstractRichFunction의 open()을 override해서 사용)
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);

---

//아래는 processsElement() 안에서 사용
Boolean bFlag = flagState.value();  // 주의할점은 bFlag를 직접변경해봤자 아래 update()하기전에는 반영안된다는것. 따라서 모든 변경은 update()를 호출하는 방식으로 코딩해야한다.
flagState.update(true);
flagState.clear();

자세한 사용법은 여기참조

반응형

'Data Engineering' 카테고리의 다른 글

flink window  (0) 2023.10.29
flink Table API를 사용한 실시간 Reporting샘플  (0) 2023.10.28
Spark, Flink를 사용한 실시간 스트림 분석  (0) 2023.08.02
Apache Flink 설치  (0) 2023.08.02
kafka Consume  (0) 2023.08.02

branch생성시점의 master가 너무 오래돼서, 최신 master의 branch로 업데이트 하고 싶을때

방법1. merge

방법2. rebase

 

타인이 올린 PR의 Files changed를 터미널에서 확인하기

현재 master의 최신이 아닌 PR을 올린사람이 작업을 시작할때의 master가 기준이 됨에 유의

git fetch origin
git diff $(git merge-base origin/master origin/pr-branch-name)..origin/pr-branch-name

 

 

 

타인이 올린 PR을 내 로컬로 가져와서 검토하기

 

예를들어 위와 같은 PR이 올라왔다고 하자.

git fetch origin pull/51/head:bump-twisted 라고 치면 로컬에 "bump-twisted"라고 하는 브랜치가 생성된다(이것은 임의로 지정한 이름이며 생략하면 "pull/51/head"가 된다)

git branch를 해보면 아래처럼 bump-twisted라는 브랜치가 로컬 저장소에 추가된 것을 볼 수 있다.

$ git branch
* master
  bump-twisted

아직은 master를 바라보고 있어 PR의 내용이 로컬저장소에 반영되지 않았다.

git checkout bump-twisted 라고 치면 반영된다.

만약 PR을 올린사람이 merge전에 PR의 내용을 변경했다면

git pull origin pull/51/head:bump-twisted 다시 이렇게 치면 된다. 

 

위 방법대신

git branch -r을 해보면 다음처럼 이미 해당 PR들의 branch 들이 생성되어 있는 것을 볼 수 있는데,

(coin) sevity@raspberrypi:~/workspace/temp/coin_strategy $ git branch -r
  origin/HEAD -> origin/master
  origin/dependabot/pip/tornado-6.3.3
  origin/dependabot/pip/twisted-23.8.0
  origin/dependabot/pip/urllib3-1.26.18
  origin/master

여기서 해당 PR을 확인하고 git pull origin dependabot/pip/twisted-23.8.0:bump-twisted 이렇게 하는 방법도 있다.

 

 

반응형

'Programming > Git' 카테고리의 다른 글

git 초기설정  (0) 2023.06.06
git log  (0) 2019.12.09
git 자주 쓰는 명령어 모음  (0) 2019.09.27
git branch 관련  (0) 2019.04.17
github  (0) 2018.11.07

+ Recent posts