time

long currentTime = System.currentTimeMillis();

 

표준입출력처리

leetcode
2
leet code

아래 예시는 위 인풋을 처리할 수 있다.

import java.util.*
public class coupang1 {
    public static void main(String args[]){
        Scanner scanner = new Scanner(System.in);
        String s = scanner.nextLine(); // 문자열 s 입력
        int n = Integer.parseInt(scanner.nextLine()); // 사전 단어의 개수 입력
        String[] wordDictArray = scanner.nextLine().split(" "); // 공백으로 구분된 사전 단어 입력
        List<String> wordDict = Arrays.asList(wordDictArray); // 배열을 리스트로 변환
        
    }
}

Scanner로 속도가 느릴때

인풋개수가 클때는 Scanner가 느릴 수 있다. 이때 bufferedReader를 쓰면 몇 배 빠르게 할 수 있다.

// before
Scanner sc = new Scanner(System.in);
int n = sc.nextInt()
List<Integer> arr = new ArrayList<>();
for(int i=0;i<n;i++) arr.add(sc.nextInt());

// after
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
int n = Integer.parseInt(br.readLine());
List<Integer> arr = new ArrayList<>();
StringTokenizer st = new StringTokenizer(br.readLine());
for (int i = 0; i < n; i++) arr.add(Integer.parseInt(st.nextToken()));

 

자료구조 관련

LinkedList 사용하기

헷갈리는 linkedList사용법을 정리한다.

예시문제:
두 개의 정렬된 연결 리스트로부터 중앙값을 계산하십시오.
예시 1:
L1 = 1 -> 3
L2 = 2
중앙값은 2.0입니다.
예시 2:
L1 = 1->2
L2 = 3->4
중앙값은 (2+3)/2 = 2.5입니다.

input1:
2
1 2
1
3
output1:
2.0

input2:
3
1 3 5
3
2 4 6
output2:
3.5

솔루션

import java.util.*;
public class Main {
    public static void main(String args[]){
        LinkedList<Integer> list1 = new LinkedList<>();
        LinkedList<Integer> list2 = new LinkedList<>();
        Scanner sc = new Scanner(System.in);
        int n1 = sc.nextInt();
        for(int i=0;i<n1;i++) list1.add(sc.nextInt());  // 추가는 add()
        int n2 = sc.nextInt();
        for(int i=0;i<n2;i++) list2.add(sc.nextInt());
        int N = n1+n2;
        //합친 리스트의 개수가 홀수개이면 n/2 번째인덱스
        //짝수개이면 n/2-1, n/2번째 인덱스의 평균이 답
        int mid_six = N%2==1?N/2:N/2-1;
        int mid_eix = N%2==1?N/2:N/2;

        ListIterator<Integer> it1 = list1.listIterator();  // next()하려면 Iterator필요
        ListIterator<Integer> it2 = list2.listIterator();
        int median1 = 0, median2 = 0;
        for (int i = 0; i <= mid_eix; i++) {
            int v1 = it1.hasNext() ? it1.next() : Integer.MAX_VALUE;  // hasNext()눈여겨 보자
            int v2 = it2.hasNext() ? it2.next() : Integer.MAX_VALUE;

            int value;
            if (v1 < v2) {
                value = v1;
                if (v2 < Integer.MAX_VALUE) it2.previous();  // previous()도 가능
            } else {
                value = v2;
                if (v1 < Integer.MAX_VALUE) it1.previous();
            }

            if (i == mid_six) median1 = value;
            if (i == mid_eix) median2 = value;
        }
        System.out.println((double)(median1 + median2) / 2.0);
        sc.close();
    }
}

LinkedList 선언부, ListIterator, next()/previous() 처리 등을 눈여겨 보자. 

근데 사실 이 문제의 경우에는 꼭 LinkedList를 쓸필요는 없다. ArrayList로도 충분.

ArrayList

c++ vector에 해당하며 다음과 같이 쓸 수 있다.

ArrayList<Integer> list = new ArrayList<>();
list.add(10);
list.add(20);
int value = list.get(1); // 20을 가져옵니다.

c++과 다르게 list[1]등으로 배열인덱스는 쓰지 못함에 주의

 

HashMap

c++ map에 해당하며 다음과 같이  쓸 수 있다.

HashMap<Integer, String> map = new HashMap<>();
HashMap<Integer, List<Integer>> map2 = new HashMap<>();
for (int i = 0; i < N; i++) {
    String a = map.getOrDefault(C[i],"");
    List<Integer> b = map2.getOrDefault(C[i],new ArrayList<>());
    a+=S[i];map.put(C[i],a);
    b.add(i);map2.put(C[i],b);
}

getOrDefault()에 주목해보자. 이걸안쓰고 get()을 쓰면 항상null체크를 해줘야 한다.

(C++의 std::map에서는 요청한 키가 존재하지 않으면 해당 키를 자동으로 생성하고 해당 값 타입의 기본 생성자를 호출하여 값을 초기화)

containKey(key)를 쓰면 put()했는지 체크할 수 있다. (c++에서 map.count()와 같은 기능)

 

iteration하기

다음 4가지 정도 방법이 있다.

//방법1. 전통적인 방법.. Entry개념때문에 복잡하다.
    int total_anagrams = 0;
    Iterator<Map.Entry<String,Integer>> iterator = ana_map.entrySet().iterator();
    int total_anagrams = 0;
    while(iterator.hasNext()){
      Map.Entry<String, Integer> entry = iterator.next();
      total_anagrams += entry.getValue();
    }
    
//방법2. key나 value한쪽만 필요한 경우 다음처럼 간략화 가능
    int total_anagrams = 0;
    for (Integer value : ana_map.values()) {
      total_anagrams += value;
    }
    
//방법3. key나 value 둘다 필요하면서 약간 더 간단한 방법
for (Map.Entry<String, Integer> entry : ana_map.entrySet()) {
  String key = entry.getKey();
  Integer value = entry.getValue();
  // key와 value를 사용한 작업
}

//방법4. 조금 더 간단한 방법(람다 표현식)
ana_map.forEach((key, value) -> {
  // key와 value를 사용한 작업
});

 

Queue

이 문제에 대한 솔루션인데, 단순한 BFS로 풀린다.

queue사용법을 눈여겨보자(add, poll, size)

class Solution {
    private void check(Queue<Integer> q, int[][] mat, int[][] dist, int py, int px, int y, int x){
        int h = mat.length;
        int w = mat[0].length;
        if(y<0||y>=h) return;
        if(x<0||x>=w) return; 
        if(dist[y][x]>dist[py][px] + 1) {
            dist[y][x] = dist[py][px]+1;
            q.add(y);
            q.add(x);
        }
    }
    public int[][] updateMatrix(int[][] mat) {
        int h = mat.length;
        int w = mat[0].length;
        int[][] dist = new int[h][w];

        Queue<Integer> q = new ArrayDeque<>();
        for(int y=0;y<h;y++)for(int x=0;x<w;x++)dist[y][x]=Integer.MAX_VALUE;
        for(int y=0;y<h;y++)for(int x=0;x<w;x++){
            if(mat[y][x]==0) {
                dist[y][x]=0;
                q.add(y);q.add(x);
            }
        }
        while(q.size()>0){
            int cy = q.poll();
            int cx = q.poll();
            check(q,mat,dist,cy,cx,cy-1, cx);
            check(q,mat,dist,cy,cx,cy+1, cx);
            check(q,mat,dist,cy,cx,cy, cx-1);
            check(q,mat,dist,cy,cx,cy, cx+1);
        }
        return dist;
    }    
}

 

Stack

c++과 비슷하게 push(), pop()인데 pop()이 top()+pop()이라고 보면된다. (값을 리턴하면서 즉시 pop도 하는..)

top()만 하려면 peek()를 쓰면된다.(아래 샘플엔 없다)

아래 샘플참조..

import java.util.*;

public class StackStringDecoder {
    
    public static String decodeString(String s) {
        Stack<Integer> stack_repeat = new Stack<>();
        Stack<String> stack_str = new Stack<>();
        String ret_str = new String("");
        int repeat = 0;
        for(int i=0;i<s.length();i++){
            char c = s.charAt(i);
            if(Character.isDigit(c)){
                repeat *= 10;
                repeat += c-'0';
            }else if(c=='['){
                stack_repeat.push(repeat);  // push하는 부분
                stack_str.push(ret_str);
                ret_str = "";
                repeat = 0;
            }else if(c==']'){
                String pop_str = stack_str.pop();  // pop하는 부분
                repeat = stack_repeat.pop();
                for(int j=0;j<repeat;j++)
                    pop_str += ret_str;
                ret_str = pop_str;
                //repeat = 0;
            }else{
                assert(Character.isAlphabetic(c));
                ret_str+=c;
            }
        }
        return ret_str;
    }

    public static void main(String[] args) {
        System.out.println(decodeString("3[a2[c]]")); // "accaccacc"
        System.out.println(decodeString("3[a]2[bc]")); // "aaabcbc"
        System.out.println(decodeString("2[abc]3[cd]ef")); // "abcabccdcdcdef"
        System.out.println(decodeString("2[a3[b]]")); // "abbbabbb"
        System.out.println(decodeString("1[ab2[c]]")); // "abcc"
    }
}

char[]

String은 immutable이라 글자단위로 수정하려면 char[]로 변환해주어야 한다.

char[] chars = myString.toCharArray(); //String에서 char[]로 변환
String myString = new String(chars);  //char[]에서 String으로 변환

한번 println의 경우는 다음과 같이 String으로 변환해주지 않으면 주소가 출력된다.

char[] result = {'H', 'e', 'l', 'l', 'o'};
System.out.println(new String(result)); // "Hello"를 출력합니다.

 

리턴값이 2개 필요할때

아래처럼 int[] r = func(); 해서 r[0], r[1]을 쓰면된다.

static int[] parseNumber(String s, int ix) {
    int number = 0;
    while (ix < s.length() && Character.isDigit(s.charAt(ix))) {
        number = number * 10 + (s.charAt(ix) - '0');
        ix++;
    }
    return new int[] {number, ix};
}

public static void main(String[] args) {
    String expression = "12345+6789";
    int ix = 0;

    int[] result = parseNumber(expression, ix);
    int parsedNumber = result[0];
    int nextIndex = result[1];

    System.out.println("Parsed number: " + parsedNumber); // 출력: Parsed number: 12345
    System.out.println("Next index: " + nextIndex);       // 출력: Next index: 5
}

 

String 핸들링

시간과 같이 특정 글자로 split할때는 아래코드 참조하자(아래는 12포맷을 AM, PM글자 떼버리고 24포맷으로 바꾸는 코드이다)

자바는 아예 split()함수가 있어서 오히려 c++보다 수월하다.

public class Main {
    public static void main(String[] args) {
        String inputTime = "07:05:45PM";
        String outputTime = timeConversion(inputTime);
        System.out.println(outputTime); // 출력: 19:05:45
    }

    public static String timeConversion(String s) {
        String[] timeParts = s.substring(0, 8).split(":");
        int hour = Integer.parseInt(timeParts[0]);

        if (s.endsWith("PM") && hour != 12) {
            hour += 12;
        } else if (s.endsWith("AM") && hour == 12) {
            hour = 0;
        }

        return String.format("%02d:%s:%s", hour, timeParts[1], timeParts[2]);
    }
}

참고삼아 c++버전도 기록해둔다.

#include <iostream>
#include <sstream>
#include <iomanip>

std::string timeConversion(const std::string& s) {
    int hour, minute, second;
    char colon, am_pm;
    std::istringstream ss(s.substr(0, 8));
    ss >> hour >> colon >> minute >> colon >> second;

    if (s.find("PM") != std::string::npos && hour != 12) {
        hour += 12;
    } else if (s.find("AM") != std::string::npos && hour == 12) {
        hour = 0;
    }

    std::ostringstream result;
    result << std::setw(2) << std::setfill('0') << hour << ":"
           << std::setw(2) << std::setfill('0') << minute << ":"
           << std::setw(2) << std::setfill('0') << second;

    return result.str();
}

int main() {
    std::string inputTime = "07:05:45PM";
    std::string outputTime = timeConversion(inputTime);
    std::cout << outputTime; // 출력: 19:05:45
    return 0;
}

sort

기본적인 sorting법

Array의 경우 다음과 같이 Arrays.sort() 또는 Collections.sort()를 쓴다.

import java.util.Arrays;

public class SortArrayExample {
    public static void main(String[] args) {
        int[] numbers = {5, 2, 8, 1, 3, 7};
        // 배열 정렬
        Arrays.sort(numbers);  // 이경우는 Collections.sort()는 못쓴다.
        // 정렬된 배열 출력
        System.out.println("Sorted array: " + Arrays.toString(numbers));
    }
}

역순으로 정렬하려면 다음과 같이 한다.

// 기본 배열은 Collections를 지원하지 않고 reverseOrder()도 사용할 수 없다.
// 따라서 List<Integer>로 변환후 수행한다.
Integer[] numbers = {5, 2, 8, 1, 3, 7};
List<Integer> number_list = Arrays.asList(numbers);
Collections.sort(number_list, Collections.reverseOrder());  // 이경우는 Collections.sort()는 못쓴다.
System.out.println("Sorted array: " + Arrays.toString(numbers));

구조체 정렬시 특정 필드에 대해 정렬하기

class Tweet {
    Long time;
    int tweetId;
}
List<Tweet> feed;
...
//time필드에 대해 정렬
feed.sort((t1, t2) -> t1.time.compareTo(t2.time));

//time필드에 대해 역순정렬
feed.sort((t1, t2) -> t2.time.compareTo(t1.time));

//아래처럼 전통적인 compare함수를 쓸 수도 있다.
Collections.sort(feed, new Comparator<Tweet>() {
    @Override
    public int compare(Tweet t1, Tweet t2) {
        return t1.time.compareTo(t2.time);
    }
});


//먼저 time에 대해 역순정렬하고, tweetId에 대해서 정방향정렬하기
feed.sort(Comparator.comparing((Tweet t) -> -t.time)
                   .thenComparing(t -> t.tweetId));
//방법2
feed.sort(Comparator.comparing((Tweet t) -> t.time)
                   .thenComparing(Comparator.comparing((Tweet t) -> t.tweetId).reversed()));
//방법3
feed.sort(Comparator.comparing((Tweet t) -> t.time)
                   .thenComparing((t1, t2) -> t2.tweetId - t1.tweetId));


//올드스쿨
feed.sort(new Comparator<Tweet>() {
    @Override
    public int compare(Tweet o1, Tweet o2) {
        if (o1.time != o2.time) {
            return o2.time - o1.time; // time에 대해 역순 정렬
        }
        return o1.tweetId - o2.tweetId; // tweetId에 대해 정방향 정렬
    }
});
반응형

설치 운영

우분투에서 설치

sudo apt-get update
sudo apt-get install redis-server

 

트러블슈팅

docker안에서 host의 redis에 접근하도록 하기

실행할때 다음처럼 --network host만 붙여주면 된다.

#!/bin/bash

PORT=$1

docker run \
  --network host \
  -e SPRING_APPLICATION_JSON='{"server":{"port":'"$PORT"'}}' \
  -p $PORT:$PORT \
  auth-service

 

설계관점

 

읽는 경우: Look aside Cache

Cache에 있으면 Cache에서 가져오고 없으면 DB에서 읽고 Cache 업데이트

 

쓰는경우: Write Back

Cache에 저장하고 주기적으로 모아서 DB에 배치업데이트. 

반응형

'Programming > Linux' 카테고리의 다른 글

docker-compose  (0) 2023.10.28
Nginx  (0) 2023.08.16
kafka 설치(우분투 기존)  (0) 2023.07.31
ELK연습  (0) 2023.07.30
스팍 - 실습 - 웹서버 세션분석  (0) 2023.07.30

Q. 설계디자인 문제의 경우 초기에 질문할것들은?

  • 요구사항을 확인하는게 먼저
    • 기능적요구사항 명확히,
      • 개발범위에 대해 반드시 물어볼 것
    • 성능등 비기능정요구사항 명확히
      • 성능요구사항에 따라, NoSQL도입/fanout복사/캐시/비정규화(CQRS등의 읽기쓰기분리)등을 고려가능
      • 단일실패지점(SPOF)을 failover등으로 대응
      • 스로틀링 이야기가 나오면 트래픽패턴에 대해 반드시 물어볼것
        • 시간대/시즌별 트래픽, 요청유형(특정API에 몰리는가), 특정지역에 몰리는가
        • 고객경험도 반드시 이야기할것(비동기시 UX변경)
      • SLA(Service Level Agreement): 고객에게 제공할 서비스 품질 명시(성능,범위,보상,유지보수등)
  • 제약사항도 확인
    • 네트워크 밴드위스, 시스템사양, 사용자수, 데이터처리량, 외부시스템 디펜던시, 기타리스크요소 등

Q. 서버과부하 처리법

  • 먼저 모니터링을 하여 서버이슈인지 API이슈인지 응답시간확인, 트래픽 패턴 파악
    • 일반적으로
      • 로드벨런서(Nginx등)를 사용하여 트래픽을 분산 
      • 스케일업, 스케일아웃검토.
        • 스케일아웃은 spkie대처가 느리므로 캐싱, 로드밸런싱, 스로틀링, 비동기, 오토스케일링등 추가 고려
        • 또한 스케일 아웃은 downstream에 더 많은 압박을 줄 수 있다.
      • 정적 리소스인경우 CDN(콘텐츠 전송 네트워크) 사용
      • HTTP/2 사용: 압축, 단일연결 멀티플펙싱 등
      • DB이슈는 다음질문으로..
    • 웹서버
      • 리버스 프록시: 정적 컨텐츠를 캐싱하고, 들어오는 요청을 적절한 WEB 서버로 라우팅
      • CDN(Content Delivery Network) 사용
      • 커넥션 풀링: 웹 서버와 WAS 사이의 커넥션을 효율적으로 관리하여 리소스 사용을 최적화
    • WAS
      • 로드 밸런싱: 여러 WAS 인스턴스를 배포하고 로드 밸런서를 사용하여 들어오는 요청을 여러 서버에 고르게 분배합니다.
      • 자동 확장: 트래픽이 증가할 때 자동으로 WAS 인스턴스를 추가하고, 트래픽이 감소하면 인스턴스를 줄이는 기능을 활용할 수 있습니다.
      • 세션 클러스터링: 사용자 세션 정보를 여러 서버 간에 공유하므로, 한 서버가 실패하더라도 세션은 유지됩니다.
    • API이슈이면
      • 코드최적화
      • API호출 최적화
        • spike문제라면 지연로딩(이미지등), 디바운싱(마지막요청만처리), 배치처리(묶음API를 한번에 처리),
        • 웹소켓(반복연결부담줄임..근데 이건 spkie상황에서는 조심해야할듯),
        • 네트워크부하가 문제라면 GraphQL(이것도 서버부하를 증가시킬수 있어서 트레이드오프 조심)
        • gRPC도 고려
      • 캐시 적용(Redis등)
        • 단, 플래시딜등 정확한 재고 트랜잭션 관리가 필요할때는 사용이 제한적
          • 해결책으로 Redis에서는 분산lock제공, 비동기큐 순차처리로 해결가능, 
          • 큐잉의 경우 UX변경 필요. 대기열 안내등
      • 전송방식 변경(gRPC, GraphQL등)
      • 비동기처리 검토(대기열 시스템을 통한 고객 경험개선)
        • 고객UX 새로 디자인
        • 일관성을 위해 결제상태추적/재시도 필요.
        • 순서보장: kafka는 파티션 단위로만 순서를 보장하므로 주의해서 설계
          • 중요한작업은 동일한 파티션으로 처리하도록 설계
      • 스로틀링: 요청수 제한
      • downstream API의 경우(API1 > 2 > 3 의존성 체인)
        • API1을 스케일아웃해도 2,3때문에 병목 유지될수 있음.
        • 서킷브레이크
        • 백 프레셔: 프로듀서-컨슈머간 흐름조절(소비속도가 느리면 생산속도 조절. 아예 멈추기도 함)
        • failover = fallback (문제가 발생했을때 대체 시스템으로 넘어가는 것)
        • failback (failover상황에서 원래 대로 복귀하는 것)
    • DB이슈이면?
      • 다음 질문으로..

Q. DB과부하

  • 쿼리최적화/인덱싱
  •  캐싱
    • 보통 SQL 캐시는 RDBMS에서 자제적으로 수행
    • 캐시 테이블 개념으로 별도 테이블을 만들기도 하며, 머터리얼뷰라고 해서 뷰인데 디스크에 저장하는 형태도 있음
    • Redis등 DB외부에 캐시를 두면 효율이 좋은대신 Key-Value행태로 어떻게 저장할지 추가 고민/구현 필요
  • 데이터베이스 분할
    • Master/Slave구분하여 Master는 Insert, Slave는 Read
      • 순간적인 불일치: Master Insert직후는 Master에서 Read
      • 문제적인 불일치: 마스터 DB의 트랜잭션 로그를 사용하여 슬레이브 DB를 동기화(근데 DBMS가 자동으로 해주는 부분)
      • 정말 문제적인 상황: 예를들어 관리자가 실수로 slave에 insert했다. 그럼 Slave를 Master로 강제동기 초기화.
    • 샤딩
  • NoSQL도입
    • RDB로 커버안되거나 특히 쓰기 작업이 많을때
    • 주문서등 비정형 데이터를 저장해야할때

Q. DB마이그레이션(RDB->NoSQL기준)

  • 데이터 복제: 기존 RDBMS의 데이터를 새로운 NoSQL 데이터베이스로 복제합니다.
  • 동시 업데이트: 변경 과정에서 발생하는 새로운 데이터도 두 데이터베이스 모두에 업데이트합니다.
  • 읽기/쓰기 전환: 복제가 완료되면, 읽기 요청은 먼저 NoSQL로 전환하고, 쓰기 요청은 RDBMS에서 계속 처리합니다.
  • 최종 전환: 모든 테스트와 검증이 완료되면, 쓰기 요청도 NoSQL로 전환하고, RDBMS를 완전히 중단합니다.

Q. 외부 디펜던시(예)payment) 문제해결타임아웃 설정

  • 타임아웃을 설정: 하여, 응답이 지연되는 경우 사용자에게 적절한 응답을 제공하고 시스템 자원을 효율적으로 관리합니다.
  • 장애 처리 메커니즘: payment 시스템과의 통신 중 장애가 발생하면, 사용자에게 명확한 오류 메시지를 제공하고, 필요한 경우 주문을 재시도하거나 나중에 처리할 수 있도록 지원합니다.
  • 분리된 큐 사용: 결제 요청을 분리된 큐에 저장하고, 백그라운드 작업으로 처리합니다. 결제 시스템이 복구되면 큐에 있는 요청을 순차적으로 처리합니다.
  • 회로 차단기 패턴: 외부 서비스의 연속된 실패가 발생하면 일시적으로 해당 서비스 호출을 차단하고, 일정 시간 후에 다시 시도합니다. 이를 통해 시스템 자원을 보호하고, 외부 서비스가 복구될 기회를 제공합니다.
  • 복제 및 재시도 전략: 여러 payment 시스템 제공자와 통합하여, 한 시스템이 실패하면 다른 시스템으로 자동 전환(fail over)
  • 모니터링 및 알림, 문서화 및 사용자 안내

Q. 네트워크 실패 문제 대응

  • 다중 AZ, 서비스별 네트워크 분리/격리, 백업 네트워크
  • 외부서비스면 서킷브레이크

Q. 데이터포인트와 메트릭, 로드테스트

  • 스케일아웃 같은 의사결정을 할때는 관련 측정자료인(1회성도 괜찮음) 데이터포인트들이 필요하다.
    • 예를들어, 응답시간, 에러율(500에러), CPU/메모리/IO 사용률, 대기중인요청 등이다(스레드 개수로 대변될수도)
  • 메트릭은 의사결정후 데이터포인트들을 여러번 반복측정해서 개선된걸 보는거라 둘은 밀접하게 연결돼있다.(데이터포인트는 메트릭의 구성요소)
  • 로드테스트는 길거리 필드테스트가 아니라 부하테스트를 의미하며, 위의 메트릭에 대한 성능평가를 의미하는데, 실제 환경과 비슷한 테스트 환경 구축이 필요한만큼 비용이 들며 항상가능하지는 않다.

Q. Redis등 분산캐시 이용시 고려사항

  • 효율화: 병목데이터 분석하여 어떤부분을 캐시할지 결정..  hot spot.
  • 일관성이 중요하면: 서버간 캐시 불일치가 생길수 있다. 이를 해결하기 위해
    • 캐시 만료: timeout 시간설정(TTL)
    • 캐시 무효화: 쓰기작업등 캐시변경 발생시 전 서버 캐시 무효화(강한 일관성)
    • 캐시 파티셔닝: 캐시를 해시해서 노드별 분산하는 것(각 노드는 독립적 캐시관리 가능) 
  • 가용성, 내구성, 성능향상이 더 중요하면: 캐시 복제(같은 캐시를 여러서버에 복제)
    • 경우에 따라 불일치 어느정도 허용
    • 노드간 비동기 메시징을 통한 최종적 일관성(강한 일관성보다 성능이 좋으나, 일시적 불일치 감수/처리 필요)
  • 경우에 따라 로컬캐시/글로벌캐시를 분리하면 성능향상.
    • 로컬캐시예시: 스티키세션
  • Disk쓰기 옵션을 제공하지만 풀방식이므로 휘발성고려해야한다.
    • AOF(Append Only File) 로깅으로 보완 (저널링)
    • 위의 복제로 보완
    • 중요데이터는 DB로 복제하므로써 보완

Q. AWS옵션

  • 하둡: S3
  • 카산드라: dynamoDB
  • Redis: Amazon ElastiCache for Redis, DAX(DynamoDB Accelerator)
  • Kafka: Kinesis

Q. ACID

  • 원자성(Atomicity): 트랜잭션 all or nothing
  • 일관성(Consistency): 트랜잭션 이전과 이후에도 데이터베이스의 모든 무결성 제약 조건이 만족되어야 합니다.
  • 독립성(Isolation): 한 트랜잭션의 중간 결과는 다른 트랜잭션에게 보이지 않아야 합니다.
  • 지속성(Durability): 트랜잭션이 성공적으로 완료되면, 그 결과는 영구적으로 반영되어야 함을 의미

Q. 분산트랜잭션

  • 2단계 커밋
    • 2대이상의 노드(컴퓨터)에서 트랜잭션을 처리하기 위한 방법
    • 준비단계: 이 단계에서는 트랜잭션을 시작한 노드(코디네이터)가 모든 참여 노드에게 트랜잭션을 커밋할 준비가 되었는지 물음. 각 참여 노드는 이 요청을 받고, 자신의 상태를 확인한 후 준비가 되었다면 '예'를, 그렇지 않다면 '아니오'를 코디네이터에게 보냄.
    • 커밋단계: 코디네이터는 모든 참여 노드로부터 응답을 받은 후, 모든 노드가 '예'라고 응답했다면 트랜잭션을 커밋하라는 메시지를 보냄. 만약 하나라도 '아니오'라고 응답한 노드가 있다면, 트랜잭션을 롤백하라는 메시지를 보냄
    • 단점: 하나의 노드라도 실패하거나 응답이 늦어지면 전체 트랜잭션이 블록되는 블록킹 문제가 있다.
  • 3단계 커밋
    • 프리커밋단계를 추가하여, 2단계 커밋의 블록킹 문제점을 타임아웃을 통해 극복
    • 준비 단계: 2단계 커밋의 준비단계와 동일
    • 프리-커밋 단계: 각 노드에게 프리-커밋 메시지를 보내 블록킹 상황에서의 행동을 알려줌(프리커밋을 받은 노드는 타임아웃시 커밋을 수행하고 받지 않은 노드는 커밋을 수행하지 않음)
    • 커밋단계: 각 노드는 프리-커밋단계에서는 확인만 하고 기다리다가 커밋메시지를 받으면 실제 커밋 시작

Q. 이벤트 소싱

  • 파일시스템,DB에서 쓰이는 저널링의 비즈니스 로직 버전. 
  • 이벤트, 이벤트 저장소, 상태재구성 등의 구성요소가 있으며, 분산환경에서는 주로 타임스탬프로 순서를 관리한다.
  • 사용처: 상태변화를 추적하므로 디버깅에 도움이됨. 시간여행가능. 분산환경에서 일관성문제 발생시 해결실마리 제공.

Q. 인증세션관리

  • 전통적인세션: 서버에서 ID를 발급하고 메모리에 들고 있고, 클라이언트에 전송해서 쿠키로 소통하는 방식
  • JWT: 상태없이 인증정보가 토큰에 저장되어 MSA등 분산환경에 유리. 서버의개인키로 서명되며 싱글사이온에도 적용가능
  • Redis: 분산서버간 세션저장소로 활용될 수 있으며, JWT와 결합하여 캐시형태로 속도향상가능.

Q. 12-Factor 앱이란

  • 12-factor 앱은 소프트웨어 개발자들이 현대적인 웹 애플리케이션을 만들고 배포하는 데 도움이 되는 원칙과 방법론을 제시하는 개념. 이 개념은 Heroku의 개발자인 Adam Wiggins에 의해 처음 제안되었으며, 클라우드, 마이크로서비스, 컨테이너, 지속적인 배포 등과 같은 현대적인 개발 패러다임을 지원
  1. 코드베이스: 애플리케이션은 하나의 코드베이스를 가지며, 다양한 배포 환경에서 실행될 수 있어야 한다.
  2. 의존성: 애플리케이션은 명시적으로 모든 의존성을 선언하고, 시스템에 사전 설치된 패키지에 의존해서는 안 된다.
  3. 설정: 설정 정보는 코드에서 분리되어야 하며, 환경 변수를 통해 관리되어야 한다.
  4. 백엔드 서비스: 백엔드 서비스는 서로 긴밀하게 연결되어야 한다.
  5. 빌드, 릴리즈, 실행: 빌드와 실행 단계는 엄격하게 분리되어야 한다.
  6. 프로세스: 앱 서비스들은 무상태 프로세스로 실행되어야 한다.(JWT등) 앱의 상태는 외부서비스(DB,캐시,세션저장소)에 저장한다.
  7. 포트 바인딩: 서비스는 포트 바인딩을 통해 노출되어야 한다(MSA)
  8. 동시성: 확장성을 위해 프로세스 모델을 사용해야 한다.(MSA)
  9. 처리 유연성: 빠른 시작과 정상적인 종료를 통해 견고성을 보장해야 한다.(당연한소리)
  10. 개발/프로덕션 환경 일치: 개발, 스테이징, 프로덕션 환경은 가능한 한 유사하게 유지해야 한다(당연한 소리)
  11. 로그: 로그는 이벤트 스트림으로 취급되어야 한다.(모니터링, 분석등을 위해 필요. 분산환경에서 합치기도 용이)
  12. 관리 프로세스: 관리 작업을 일회성 프로세스로 실행해야 한다.(앱실행과 마이그레이션등을 분리하라는 것)

Q. 스케일아웃의 단점

  • 스케일 아웃이 성능향상에 도움이 되나, 세션등 서버간 공유되는 자료가 많으면 상태공유문제 발생
  • 보통은 가급적 stateless로 설계하고, 외부저장소(캐시,DB등)를 참조하도록 해서 해결

Q. Monolithic에서 MSA로 전환하는 순서

  • 단계적 이전을 원칙으로 함
  • 도메인분석을 통해 몇개 서비스로 나눌지 결정
    • 비즈니스 중요도, 기술적 복잡도, 팀의 능력을고려. 일반적으로는 비즈니스 영향이 작고 기술적으로 단순한 도메인부터 이전
  • 데이터를 가급적 분리(DB조인도 분리) > 서비스간 API설계 > 통계는 따로 ETL구축 검토
  • 클라이언트와 서비스간 통신을 담당하는 API게이트웨이 도입 검토
  • CI/CD구축
  • 리스크관리
    • circuitBreaker 등

 

Q. MSA구조의 단점

  • 모놀리식에서 쉽게 조인연산으로 되던것이 DB분리로 인해 서비스간 API로 치환되어야 한다.
  • 초기 스타트업에서는 하나의 애플리케이션안에 모듈식 디자인을 적용하는게 나을 수 있다.
    • 모듈은 독립적으로 작동하나 MSA처럼 프로세스/노드분리까지는 하지 않을 수 있다.
    • 따라서 펑션콜이나, 공유메모리, 로컬큐등을 활용 가능

Q. 도메인 주도 설계(Domain-Driven Design, DDD)

  • DDD가 아닌것: DB 스키마 부터 설계. 그걸로 CRUD코드 작성. 비즈니스로직은 코드사이에 흩어짐
  • DDD인것: 도메인 비즈니스부터 고민. 비즈니스로직 분리. 예를들어 뱅킹에서 계좌와 거래라는 도메인이 있을때 이체, 입출금을 비즈니스로직으로 잡을 수 있다. 
  • 비즈니스로직이 간단한 경우 xxxService.java로 비즈니스로직을 분리하는 것으로도 충분하지만,
  • DDD는 좀 더 복잡한 비즈니스를 모델링하기 위한 것으로 그 요소는 다음과 같다.
    1. 바운디드 컨텍스트(Bounded Context): 특정 비즈니스 도메인을 구현하는데 필요한 모델과 규칙을 정의하는 경계. 예를 들어, 은행 시스템에서는 '대출', '예금', '이체' 등이 각각의 바운디드 컨텍스트가 될 수 있다. 즉 도메인에 속하는 용어(규칙,모델포함)들을 정의하는 단계라고 보면 된다.
    2. 애그리게이트(Aggregate): 도메인 모델 내에서 일관성을 유지해야 하는 객체의 집합을 의미. 예를 들어, '대출' 도메인에서는 '대출 계좌', '대출자', '대출 상품' 등이 하나의 애그리게이트를 구성. 도메인 요소간 관계를 규정(묶을 수 있는건 묶는다)
    3. 도메인 이벤트(Domain Event): 비즈니스 로직에서 중요한 변화를 나타내는 이벤트. 예를 들어, '대출' 도메인에서는 '대출 신청', '대출 승인', '대출 상환' 등이 도메인 이벤트가 될 수 있다.
    4. 도메인 서비스(Domain Service): 도메인 서비스는 애그리게이트나 값 객체로 표현하기 어려운 도메인 로직을 수행하는 서비스를 의미. 예를 들어, '대출 신용 평가'는 대출 신청자의 신용 정보를 바탕으로 대출 가능 여부를 판단하는 도메인 서비스가 될 수 있다. 외부로 노출되는 서비스를 의미

Q. ETL프로세스

  • DDD로 하다보면 도메인 서비스별로 DB분리가 일어나는데, 통계/분석은 전체DB를 통합적으로 봐야한다.
  • 이때 ETL프로세스가 따로 배치로 돌면서 각 도메인별 DB에서 정보를 추출해서 변환후 DW에 로딩하게 된다. 
    • DW는 RDB가 될수도 있고(복잡한 쿼리가 필요한 경우), NoSQL이나 하둡/스팍이 될수도 있다. (대용량 처리속도가 우선인 경우)
    • ETL은 보통 Airflow등으로 수행되는 배치작업이지만, 실시간 처리가 필요한경우 Kafka/Flink/스팍Stream등을 붙여준다.

Q.Q. NoSQL vs 스팍/하둡

  • 둘다 대용량 데이터를 스케일아웃 전략으로 처리하도록 만들어졌지만 NoSQL은 실시간성, 스팍/하둡은 배치처리에 초점이 있음.
    • 하둡은 디스크액세스 때문에 실시간이 힘들고, 스팍은 실시간성과 자체 분산처리기술이 있지만 데이터 저장/관리 기능은 없음.
    • 그래서 스팍 단독으로도 NoSQL이 아님
  • 참고로 Redis도 NoSQL의 한 종류임.

Q. NoSQL vs RDB 정규화수준 낮추기

  • 공통점: 조인연산을 줄이고, 성능향상, 중복데이터허용
  • 차이점
    • RDB정규화수준 낮추기: 스키마변경의 제약
    • NoSQL도입: 스키마 유연(문서,키-밸류등), 수평확장용이, 

Q. 몽고DB vs 카산드라

  • 둘다 수평확장에 능한 NoSQL이다.
  • Document기반의 json 처리가 필요하면 몽고DB
  • 컬럼기반의 SQL비슷한 쿼리지원을 원하면 카산드라

Q. 장애복구 관련

  • 단일실패지점 대응
    • 데이터복제, 로드밸런싱, 자동장애복구, 장애격리(MSA)
    • Availability Zone을 분리

Q. Rate Limiting vs Throttling(스로틀링)

  • 전자는 제한 초과하면 거부. 후자는 큐잉되거나 지연해서 최종적으로는 처리됨.
  • 사실 전자는 후자의 한 형태
  • 실제 적용시는 다음을 고려해야함
    • 트래픽패턴(글로벌/로컬), 시간당/유저당, 로컬/중앙 카운팅(빠른응답, 확장성/고급기능 트레이드오프), 
    • 푸시/풀(즉시/폴링 전자는 리소스헤비), 우선순위큐사용(Redis로 가능)
    • 고객경험에 부정적이므로 가급적 줄이는게 좋음

Q. 검색시스템 구현방법

  • 토큰화 > 정규화 > 색인생성(inverted index) > 색인으로 문서매칭 > 문서간 순위에 TD-IDF사용
  • Elastic Search도 이방법을 사용하며, 분산처리를 지원.

Q. TF-IDF

  • 단어 카운팅을 통해 bag of word벡터를 먼저 문서별로 구한다.
  • 문서간 유사도는 보통 코사인유사도로 구하는데 A,B문서의 코사인유사도는 AB의 내적을 |A|*|B|로 나눠줌. 원리?
  • 단어 카운팅 대신에 bag of word에서 자주 등장하는 단어일수록 (분모가 증가하여) 숫자를낮춰 중요도를 낮추는 기법

Q. Kafka

  • 배치보다는 실시간 처리에 강점을 가짐(높은 처리량, 낮은 지연시간) 하지만 메시지가 저장되므로 배치에도 사용가능
  • 더빠른 지연시간을 원하면 RabitMQ, Redis사용가능 (디스크 연속저장 스킵가능)

Q. 비용최적화

  • 필요한 대역폭과 저장공간등을 측정하여 가장 알맞는 클라우드/온프레미스 비용 산정
  • 자동스케일링을 설정하여 안쓸때는 비용절감(온프레미스에서는 제한적이나 쿠버네티스로 어느정도 가능)
  • 저장할때 압축해서 저장

Q. 노드간 동기화 이슈(재고 동기화를 생각해보자)

  • 소스가 있어야 한다 (예를 들어 중앙서버)
  • 소스에서 타겟으로 동기화 방법
    • Sync 방식: 최종재고체크 단계면 고려해볼 수 있다.
    • ASync방식: 지연시간 최소화. 카프카 사용. 지속성에도 장점(디스크에 저장하므로)
  • 전달확인방법(일관성)
    • 2단계커밋
    • 비동기 Ack (preface 주문 ack생각하면됨)
  • 반복실행가능성이 크므로 멱등성이 중요.
    • 고유키(트랜잭션ID)를 사용
    • DB도 멱등하도록 설계. 예를 들어 재고수량을 update하도록 하지말고 트랜잭션ID에 변화수량만 기록.
      • 실제수량은 트랜잭션ID를 고려하며 수행

Q. CQRS 패턴

  • 명령과 조회를 분리(도메인 분리)
  • 명령(insert/update)시 조회쪽 데이터 생성/변경작업해줌
    • 이때 부담을 줄이는게 이벤트소싱
    • 변경사항생겼을때 이벤트 거쳐서 다른 서비스에서 조회모델 저장
  • 조회쪽은 속도를 위해 NoSQL을 쓰는 경우도 많음
  • 명령은 덜자주, 조회는 자주일어나는 모델에서 유용
  • 단점: 데이터 정합성/일관성

Q. 3-tier architecture

  • Presentation Layer, Business Logic Layer, Data Access Layer 이며,
    • 각각 프론트엔드/백엔드WAS/DB정도에 대응된다고 보면 된다.

Q. Hot spot(row) 이슈

  • 해당 부분을 캐시(redis)로 분리

Q. 재고 관리

  • 소스정의: 재고 데이터의 원본은 중앙 재고 관리 시스템이 될 것이며, 이 시스템은 모든 판매 채널과 동기화
  • 일관성이 중요한 경우:
    • 2단계 커밋으로 분산트랜잭션 사용
    • 멱등성 지원 가능하도록 설계
  • 지연시간이 중요한 경우
    • 캐싱, 로드밸런싱
    • 비동기 전송을 사용하여 시스템 간 지연을 최소화
    • Kafka나 RabbitMQ와 같은 메시지 브로커를 사용하여 소스에서 대상으로 수량 데이터를 비동기로 전파
  • 확인 방법: 2단계 커밋과 비동기 Ack 모델을 사용하여 메시지 전달을 보장(2단계 커밋은 맥락이 다르지 않나)
  • 주기적 동기화 검증: 주기적 비교와 실시간 지연 큐를 사용하여 동기화를 검증합니다.
  • 고성능이 필요하면: 캐시적용
  • 영화관 자리예약에는 옵티미스틱 락킹이 많이 쓰인다. (락없이 예약, 결제시 확정)
  • 잘팔리는 스큐는 레디스에 넣어서 가속
  • 마이크로 서비스 분리
    • 주문 서비스: 주문 생성, 주문 상태 관리
    • 결제 서비스: 결제 처리
    • 재고 서비스: 재고 확인, 재고 차감
  • 기능적요구사항
    • 재고 조회, 차감 및 추가, 예약 및 해제, 경고, 이력 관리, 다중 창고 지원
  • 비기능적요구사항
    • 성능, 확장성, 안정성, 보안, 통합성(주문관리, 창고관리, 외부), 사용자 경험, 쉬운인터페이스

Q. 체크아웃 기능 구현하기

  • 장바구니 관리 모듈: 사용자의 선택한 상품을 관리하고, 수량 변경, 쿠폰 적용 등의 기능을 제공합니다.
  • 배송 정보 관리 모듈: 배송 주소, 배송 방법, 배송 시간 등을 관리하고 사용자에게 적절한 옵션을 제공합니다.
  • 결제 처리 모듈: 다양한 결제 방법(신용 카드, 데빗 카드, PayPal 등)을 지원하고, 결제 승인 및 처리를 담당합니다.
  • 주문 검증 모듈: 주문의 유효성을 검사하고, 재고 확인, 가격 검증 등을 수행합니다.
  • 할인 및 프로모션 모듈: 쿠폰, 할인 코드, 회원 할인 등의 프로모션을 관리하고 적용합니다.
  • 알림 및 통신 모듈: 주문 상태 변경, 배송 추적 등의 알림을 사용자에게 전송합니다.
  • 보안 및 인증 모듈: 사용자 인증, 결제 보안 등을 담당하여 전체 프로세스의 보안을 유지합니다.
  • 모니터링 및 로깅 모듈: 시스템의 성능과 에러를 모니터링하고 로깅하여, 문제 발생 시 신속한 대응이 가능하도록 합니다.

 

Q. 모니터링 시스템 설계

  • 수집할 정보 정의: 어떤 정보를 수집할 것인지. CPU 사용률, 메모리 사용량, 디스크 I/O, 네트워크 트래픽 등
  • 성능 고려: 클라이언트가 시스템에 부담을 주지 않도록, 성능에 민감한 부분은 특히 주의
  • 확장성 있는 설계: 시스템이 변화하고 성장함에 따라 새로운 정보를 수집해야 할 수도 있으므로
  • 데이터 수집 주기: 얼마나 자주 데이터를 수집할 것인지 결정/조정기능 추가
  • 데이터 전송 방식: 수집한 데이터를 중앙 서버나 데이터베이스에 어떻게 전송할 것인지. REST API, Kafka, gRPC등
  • 에러 핸들링/보안/테스트 및 검증/문서화: 알림, 와치독

 

Q. 질문시간에 질문한 것들

  • 다가오는 5년 내에 시장에서 차지하고자 하는 위치는 어떻게 되나요?
  • 제가 지원한 포지션에서의 주요 업무와 책임은 무엇인가요?
  •  
  • FTS(IPFO) 시스템의 현재 아키텍처는 어떠한 형태를 가지고 있나요?
  • 현재 애자일 적용 현황이 궁금합니다.
  • 현재 팀의 비즈니스 전략과 목표는 어떻게 되나요?
  • XX의 기술 스택은 어떻게 구성되어 있으며, 이 포지션에서는 어떤 기술이 주로 사용되나요?

 

Q. 병목분석 & 처리 

  1. 예상되는 병목지점 파악
    • 데이터베이스 쿼리 지연: 복잡한 쿼리나 인덱싱이 제대로 이루어지지 않은 경우
    • 웹 서버나 WAS 서버의 부하: 동시 요청 수가 많을 경우
    • 네트워크 지연: 서버 간의 통신이 빈번하거나 대용량 데이터 전송이 필요한 경우
    • 디스크 I/O 병목: 디스크 읽기/쓰기가 빈번한 작업에서 I/O 지연이 발생할 수 있습니다.
  2.  원인 분석
    • 성능 모니터링 도구 활용: 시스템의 CPU, 메모리, 디스크 I/O, 네트워크 사용률 등을 모니터링하여 병목 지점을 찾습니다.
    • 쿼리 분석: 실행 계획을 분석하거나 프로파일러를 사용하여 쿼리 성능을 검토합니다.
    • 로드 테스트: 예상 트래픽을 모방하여 병목 지점을 확인합니다.
    • 로깅 및 추적: 로그와 추적 데이터를 분석하여 성능 저하가 발생하는 지점을 확인합니다.
  3. 해결 방안
    • 캐싱 적용: 자주 접근하는 데이터를 캐시하여 DB 부하를 줄입니다.
    • 로드 밸런싱: 요청을 여러 서버에 분산시켜 부하를 줄입니다.
    • 쿼리 최적화: 쿼리 실행 계획을 분석하고, 필요한 인덱스를 추가하거나 쿼리를 개선합니다.
      • (db부하는 그밖에도 많다. Master/Slave분리, 파티셔닝, 복제, 커넥션풀 등)
    • 하드웨어 업그레이드: 필요한 경우, CPU, 메모리, 디스크 등의 하드웨어를 업그레이드합니다.
반응형

'System Architect' 카테고리의 다른 글

Application  (0) 2023.10.28
graphQL  (0) 2023.10.12
gRPC  (0) 2023.10.11
시스템설계 Q&A 2  (0) 2023.09.20
데이터 분석 관련 정리  (0) 2023.08.19

F5를 눌러서 라인디버깅이 가능한 디버거를 띄우기 위해서는,

 

먼저 package.json에서 다음 부분을 수정해준다.

  "scripts": {
    "dev": "NODE_OPTIONS='--inspect' next dev",
    ...
  },

그 다음, vscode에서 launch.json을 만든다음(이건 그냥 F5누르고 node.js눌러도 기본템플릿은 만들어준다.)

다음 내용을 써준다.

{
    "version": "0.2.0",
    "configurations": [
        {
            "type": "node",
            "request": "launch",
            "name": "Launch Next.js",
            "runtimeExecutable": "${workspaceRoot}/node_modules/.bin/next",
            "cwd": "${workspaceRoot}",
            "console": "integratedTerminal",
            "internalConsoleOptions": "neverOpen",
            "skipFiles": ["<node_internals>/**"]
        },
    ]
}

위처럼 하면 기본적인 디버깅환경은 구축이 된 것이고, Ctrl+F5로 실행하면  break포인트가 잡히지 않는다.

 

하지만 론치과정이 단순히 npm run dev하는 것 보다는 헤비하기 때문에 Ctrl+Shift+B를 눌러서 가볍게 실행하고 싶으면,

다음처럼 launch.json과 같은곳에 tasks.json을 만들어 준다.

{
    "version": "2.0.0",
    "tasks": [
        {
            "label": "Run Next.js Without Debugging",
            "type": "shell",
            "command": "npm",
            "args": ["run", "dev"],
            "problemMatcher": [],
            "group": {
                "kind": "build",
                "isDefault": true
            }
        }
    ]
}

 

반응형

'Programming > node.js' 카테고리의 다른 글

react/next.js환경에서 http에서 https로 변경하기  (0) 2023.10.12

다음 명령을 통해 next.js 프로젝트를 생성한다.

sevity@sevityubuntu:~/workspace/online_judge$ mkdir problem-frontend
sevity@sevityubuntu:~/workspace/online_judge/problem-frontend$ npx create-next-app .
✔ Would you like to use TypeScript? … No / Yes
✔ Would you like to use ESLint? … No / Yes
✔ Would you like to use Tailwind CSS? … No / Yes
✔ Would you like to use `src/` directory? … No / Yes
✔ Would you like to use App Router? (recommended) … No / Yes
✔ Would you like to customize the default import alias? … No / Yes

Would you like to use TypeScript? - 추천: 예
Would you like to use ESLint? - 추천: 예
Would you like to use Tailwind CSS? - 추천: 아니오
Would you like to use src/ directory? - 추천: 예
Would you like to use App Router? - 추천: 예
Would you like to customize the default import alias? - 추천: 아니오

 

Bootstrap 테마변경

 Bootswatch와 같은 웹사이트에서 무료로 제공하는 Bootstrap 테마를 선택가능

테마적용방법


1. 테마 CSS 파일 다운로드: 먼저, 테마 페이지에 접속. 페이지 상단의 "Download" 버튼을 클릭하고, "bootstrap.min.css" 파일을 다운로드

2. 프로젝트에 CSS 파일 추가: 다운로드한 CSS 파일을 public/css 디렉토리를 만들고 그 안에 CSS 파일을 추가

3. CSS 파일 import: _app.js 파일을 열고, 다음 코드를 추가

import '../public/css/bootstrap.min.css';

 

next.js pages

/create 아래와 같이 문제를 새로 입력한다.(아래는 아직 실제입력,실제출력등 빠진부분들이 많다)

 

/problems 문제 목록을 아래와 같이 출력한다.

(로그인 상태가 아니라서 username이 없을 경우, frontend-service/login 으로 direct 된다)

 

 

 

 

 

 

 

반응형

문제관리 백엔드 서비스는 online-judge프로젝트의 7개 서비스중 3번째 서비스이다.

  1. 인증 서비스 (Backend): 사용자의 회원 가입, 로그인, 로그아웃, 세션 관리 등을 담당
  2. 인증 서비스 (Frontend): 사용자 인터페이스를 제공 (로그인 폼, 회원가입 폼 등)
  3. 문제 관리 서비스 (Backend): 문제의 추가, 삭제, 수정 등을 관리
  4. 문제 관리 서비스 (Frontend): 문제를 보여주고, 문제 추가, 삭제, 수정 등의 인터페이스를 제공
  5. 제출 관리 서비스 (Backend): 사용자의 코드 제출 및 제출 기록을 관리
  6. 제출 관리 서비스 (Frontend): 코드 제출 인터페이스와 제출 기록 확인 인터페이스를 제공
  7. 채점 서비스 (Backend): 제출된 코드를 채점

다른 서비스는 링크를 눌러확인하자.

 

문제관리 백엔드 서비스는 java/vscod가 아닌 kotlin/IntelliJ로 해보기로 했다. (kotlin호환성이 IntelliJ가 훨씬 좋음)

디렉토리 구성은 다음과 같다.

sevity@sevityubuntu:~/workspace/online_judge/problem-service$ tree -I target
.
├── HELP.md
├── log
│   └── application.log
├── mvnw
├── mvnw.cmd
├── pom.xml
├── run.sh
└── src
    ├── main
    │   ├── kotlin
    │   │   └── com
    │   │       └── sevity
    │   │           └── problemservice
    │   │               ├── controller
    │   │               │   └── ProblemController.kt
    │   │               ├── domain
    │   │               │   ├── Problem.kt
    │   │               │   └── ProblemRepository.kt
    │   │               ├── ProblemServiceApplication.kt
    │   │               └── service
    │   │                   └── ProblemService.kt
    │   └── resources
    │       ├── application.properties
    │       ├── static
    │       └── templates
    └── test
        └── kotlin
            └── com
                └── sevity
                    └── problemservice
                        └── ProblemServiceApplicationTests.kt

18 directories, 13 files

 

문제에 대한 스키마를 다음과 같이 설정했다.

제대로 하려면, real_input, real_output, solution.cpp 등이 추가되어야 하지만, 우선은 간단하게 했다.

CREATE TABLE problems (
  id SERIAL PRIMARY KEY,
  title VARCHAR(100) NOT NULL,
  description TEXT NOT NULL,
  example_input TEXT NOT NULL,
  example_output TEXT NOT NULL
);

 

 

초기 IntelliJ설정

Ultimate버전과 Community버전이 있는데 전자만 SpringBoot관련 기능이 제공된다.

먼저 로컬환경에서 IntelliJ를 실행한 후,  File > New > Project > Spring Initializr를 통해 프로젝트를 로컬에 생성한다.

위의 내용은 프로그램관리 백엔드에 해당하진 않는다. 개요만 참조하자.

그다음 생성된 파일들을 ssh환경으로 원격복사한다.

 

그다음음 다음과 같이 Remote Development > SSH로 들어가서 원격 개발환경을 설정한다.

application.properties 파일을 다음과 같이 설정. 포트번호는 7개 서비스중 3번째라는 의미로 8083으로 부여.

spring.datasource.url=${DATABASE_URL}
spring.datasource.username=${DATABASE_USERNAME}
spring.datasource.password=${DATABASE_PASSWORD}
spring.jpa.hibernate.ddl-auto=update
logging.file.name=log/application.log
# without below line, web login is needed.
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration
server.port=8083

알고리즘 문제(problem)에 대한 도메인 entity와 repository를 domain이라는 패키지(폴더)에 다음과 같이 작성한다. 위의 DB스키마와 알맞도록 작성.

먼저 Problem entity는 다음과 같이 작성

package com.sevity.problemservice.domain

import javax.persistence.*

@Entity
@Table(name = "problems")
data class Problem(
    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long = 0,

    @Column(nullable = false)
    val title: String = "",

    @Column(nullable = false)
    val description: String = "",

    @Column(name = "example_input", nullable = false)
    val exampleInput: String = "",

    @Column(name = "example_output", nullable = false)
    val exampleOutput: String = ""
)

JPA로 DB와 연동을 자동화해주는 Repository는 다음과 같이 작성

package com.sevity.problemservice.domain

import com.sevity.problemservice.domain.Problem
import org.springframework.data.jpa.repository.JpaRepository

interface ProblemRepository : JpaRepository<Problem, Long>

 

MVC에서 C에 해당하는 Control 클래스를 아래와 같이 작성

package com.sevity.problemservice.controller

import com.sevity.problemservice.domain.Problem
import com.sevity.problemservice.service.ProblemService
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.*

@RestController
@RequestMapping("/problems")
class ProblemController(private val problemService: ProblemService) {

    @GetMapping
    fun getAllProblems() = problemService.getAllProblems()

    @GetMapping("/{id}")
    fun getProblem(@PathVariable id: Long): Problem = problemService.getProblem(id)


    @PostMapping
    fun createProblem(@RequestBody problem: Problem): Problem = problemService.createProblem(problem)

    @PutMapping("/{id}")
    fun updateProblem(@PathVariable id: Long, @RequestBody problem: Problem): Problem = problemService.updateProblem(id, problem)

    @DeleteMapping("/{id}")
    fun deleteProblem(@PathVariable id: Long): ResponseEntity<Void> {
        problemService.deleteProblem(id)
        return ResponseEntity<Void>(HttpStatus.NO_CONTENT)
    }

    // Add more methods as needed for CRUD operations
}

여기서 /problems라는 하나의 자원(동사가 아닌명사)에 대해서 GET, POST, PUT, DELETE를 모두 사용하는 RESTFul 권장 패턴을 사용하고 있다. problems로 복수형으로 지칭하는 것도 권장 가이드.

 

실제로 호출해 보는 것은 curl을 써도 되지만, postman을 쓰면 편하게 할 수 있다.

postman은 여기서 다운받고,

실행후 new > HTTP를 통해서 GET, POST, PUT, DELETE를 하나씩 테스트 하면된다.

GET은 문제 전체나 특정 id를 받아올 수 있고 아래처럼 url만 지정하고 SEND하면 된다.(결과에서 200 OK확인)

POST는 문제를 등록하는 과정이고, 

headers 탭에 Key: Content-Type, Value: application/json 을 추가하고

body에서 raw를 선택하고 아래처럼 문제내용을 입력하고 SEND해주면 된다.(결과에서 200OK확인)

PUT은 문제를 업데이트 하는 과정이고 POST와 마찬가지 설정으로 하면 된다.(POST, PUT만 다르고 나머진 동일)

DELETE는 지우고자 하는 문제ID를 url끝에 넣어주기만 하면 되고, 200OK가 아닌 204 No Content가 나오면 성공.

 

MVC에서 M과 C를 연결하는 비즈니스 로직에 해당하는 Service클래스를 아래와 같이 작성

package com.sevity.problemservice.service

import com.sevity.problemservice.domain.Problem
import com.sevity.problemservice.domain.ProblemRepository
import org.springframework.stereotype.Service

@Service
class ProblemService(private val problemRepository: ProblemRepository) {

    fun getAllProblems(): List<Problem> = problemRepository.findAll()
    fun createProblem(problem: Problem): Problem = problemRepository.save(problem)
    fun getProblem(id: Long): Problem = problemRepository.findById(id).orElseThrow { NoSuchElementException("Problem not found") }
    fun updateProblem(id: Long, problem: Problem): Problem {
        val existingProblem = problemRepository.findById(id).orElseThrow { NoSuchElementException("Problem not found") }
        val updatedProblem = existingProblem.copy(
            title = problem.title,
            description = problem.description,
            exampleInput = problem.exampleInput,
            exampleOutput = problem.exampleOutput
        )
        return problemRepository.save(updatedProblem)
    }
    fun deleteProblem(id: Long) {
        val existingProblem = problemRepository.findById(id).orElseThrow { NoSuchElementException("Problem not found") }
        problemRepository.delete(existingProblem)
    }

}

 

트러블슈팅


원격으로 열었더니 ._로 시작하는 중복파일들이 생길 경우

._로 시작하는 파일들은 macOS에서 생성하는 메타데이터 파일들이다. 이 파일들은 macOS 외의 시스템에서는 필요하지 않으며 삭제해도 안전하다. 다음 명령으로 삭제

find . -name '._*' -type f -delete

1회 지운 이후로 문제가 생긴적은 없다.

 

javax.persistance 관련 오류

springboot 버전을 3이상으로 했더니, import javax.persistance 에 실패하는 현상 발견됨

여기 참고해서, pom.xml에서 아래처럼 2.5.3으로 버전을 낮춰서 해결

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

 

 

 

반응형

먼저 Spark 설치는 여기를 보고오자. Spark설치시 Streaming관련 설치도 진행된다.

 

Kafka에서 메시지를 실시간으로 10초 단위로 읽어서 HDFS에 저장

토픽은 iis_log로 가정.

다음과 같은 python코드를 작성

$ vi spark_streaming_to_hdfs.py

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "iis_log") \
  .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# HDFS뿐 아니라 모니터링을 위해 화면에도 표시
query = df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query2 = df \
  .writeStream \
  .outputMode("append") \
  .format("csv") \
  .option("path", "hdfs://localhost:9000/user/hadoop/iis_log") \
  .option("checkpointLocation", "hdfs://localhost:9000/user/hadoop/checkpoint") \
  .trigger(processingTime='10 seconds') \
  .start()

spark.streams.awaitAnyTermination()

화면에 다음처럼 표시되고, HDFS에는 csv로 컬럼 분리되서 저장된다.

+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"@timestamp":"20...|
+----+--------------------+

 

HDFS가 아닌 RDB(Postgres)에 저장

 

먼저 RDB에 테이블을 알맞게 만들어 줘야 한다. 

CREATE TABLE iis_log (
    client_ip VARCHAR(15),
    url TEXT,
    timestamp character varying
);

python 코드를 아래처럼 작성한다.

$ cat spark_streaming_to_db.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, split
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import unix_timestamp


# Spark 세션 생성
spark = SparkSession.builder.getOrCreate()

# 스키마 수동 지정. RDB에 넣을때 필요
schema1 = StructType([
    StructField("client_ip", StringType()),
    StructField("url", StringType()),
    StructField("timestamp", StringType())
])

# Kafka에서 데이터 읽기
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "iis_log") \
    .load()

# JSON 형식으로 변환
df = df.selectExpr("CAST(value AS STRING) as json")

# 스키마 정의. JSON파싱할때 필요
schema2 = StructType([
    StructField("message", StringType()),
    StructField("@timestamp", StringType())
])

# 데이터 추출
df = df.select(from_json(df.json, schema2).alias("data")).select("data.*")

# 메시지 분리
df = df.withColumn("message_split", split(df["message"], " "))
df = df.withColumn("client_ip", df["message_split"].getItem(2))
df = df.withColumn("url", df["message_split"].getItem(4))
df = df.withColumn("timestamp", df["@timestamp"])

# 필요한 컬럼 선택
df = df.select("client_ip", "url", "timestamp")

# 화면에 출력
query = df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# 중간 저장소로 Parquet 형식으로 쓰기
query2 = df \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "hdfs://localhost:9000/user/hadoop/iis_log_temp") \
    .option("checkpointLocation", "hdfs://localhost:9000/user/hadoop/checkpoint_temp") \
    .trigger(processingTime='10 seconds') \
    .start()

# 배치 작업으로 데이터를 PostgreSQL에 쓰기. PostgreSQL이 스트림을 지원하지 않아서 배치로.
def write_to_postgres():
    df_temp = spark.read.schema(schema1).parquet("hdfs://localhost:9000/user/hadoop/iis_log_temp")
    df_temp.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/online_judge") \
        .option("dbtable", "iis_log") \
        .option("user", "online_judge_admin") \
        .option("password", "abcd123$") \
        .mode("append") \
        .save()

# 배치 작업 실행
while True:
    write_to_postgres()

# 스트리밍 작업 중지
query.stop()

# 스트리밍 작업 대기
spark.streams.awaitAnyTermination()

아래 명령어로 실행하면

$SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 spark_streaming_to_db.py

아래처럼 dbeaver로 RDB에 들어간것을 확인 가능

 

비슷한 기능을 하는 flink코드

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, StreamingFileSink

def kafka_to_hdfs():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    kafka_props = {"bootstrap.servers": "localhost:9092", "group.id": "test_group"}

    kafka_source = FlinkKafkaConsumer("iis_log", SimpleStringSchema(), kafka_props)

    ds = env.add_source(kafka_source)

    ds.add_sink(StreamingFileSink
                .for_row_format("hdfs://localhost:9000/user/hadoop/iis_log", SimpleStringSchema())
                .build())

    env.execute("kafka_to_hdfs")

if __name__ == '__main__':
    kafka_to_hdfs()

Spark Session이 flink에서 Environment에 해당.

지연실행은 둘다 동일하게 지원.

 

 

Spark vs Flink 차이

  • Spark는 먼저 전체 데이터 세트를 메모리에 로드한 다음 변환을 수행하는 반면, Flink는 데이터를 스트리밍하며 변환을 수행
    • 따라서, spark은 true streaming architecture가 아니라 마이크로 배치 구조라 지연시간이 약간 높을 수 있다.

 

 

 

반응형

'Data Engineering' 카테고리의 다른 글

flink Table API를 사용한 실시간 Reporting샘플  (0) 2023.10.28
flink  (1) 2023.10.28
Apache Flink 설치  (0) 2023.08.02
kafka Consume  (0) 2023.08.02
filebeat  (0) 2023.07.31

Flink란

주로 실시간 스트림에 대한 처리, 분석을 위해 쓰이며 높은 처리량과 낮은 지연시간을 제공.

장애복구 및 일관성을 보장하는 기능을 제공

Spark이 배치처리에 강점을 갖는다면, Flink는 실시간 스트림 처리에 강점을 가짐

주로 java와 scala로 작성됨. python API도 제공.

 

설치

프로덕션환경에서는 opt/flink에 설치하고 user계정도 추가하는걸 추천하지만,

개인적으로 실습할때는 ~/flink 정도에 설치해도 충분하다.

# flink 홈페이지에서 최신 빌드 다운로드
wget https://dlcdn.apache.org/flink/flink-1.16.2/flink-1.16.2-bin-scala_2.12.tgz

# ~/flink에 압축해재
mkdir ~/flink
tar -xvzf flink-1.16.2-bin-scala_2.12.tgz -C ~/flink --strip-components 1

그 다음에 cluster 시작이라는 과정을 거쳐야 하는데, 이는 Kafka, Spark, Flink공히 대부분의 분산시스템에서 존재.

이는 여러 노드(컴퓨터)가 함께 작업을 수행하도록 구성하는 과정을 의미하며,

일반적으로 하나 이상의 마스터 노드와 여러 워커 노드로 구성되고 마스토 노드는 작업을 관장하고 워커 노드는 실제 작업을 수행

실제 클러스터를 구성하는 방법은 오픈소스 프로젝트 마다 다른데,

Flink에서는 클러스터는 명시적으로 start-cluster.sh를 수행해서 다음 2가지를 데몬으로 띄우는 과정을 의미한다.

  • JobManager(마스터노드)
  • TaskManager(워커노드)

다음 명령어로 클러스터를 구동하고

cd ~/flink
./bin/start-cluster.sh

jps해서 아래와 같이 보이면 성공

$ ~/flink $ jps
27505 StandaloneSessionClusterEntrypoint # 이게 JobManager를 의미
28553 Jps
27965 TaskManagerRunner

설치확인

아래 example돌려서 잘 설치됐나 확인(localhost:8081을 통해 브라우저에서도 상황을 볼 수 있다)

$ cd ~/flink
$ ./bin/flink run examples/streaming/WordCount.jar --output sample_output
Executing example with default input data.
Use --input to specify file input.
Job has been submitted with JobID dc4552090d31d4e51084899a41d6568a
Program execution finished
Job with JobID dc4552090d31d4e51084899a41d6568a has finished.
Job Runtime: 196 ms

$ cat sample_output/2023-08-02--10/part*
(to,1)
(be,1)
(or,1)
(not,1)
(to,2)
(be,2)
(that,1)
(is,1)
(the,1)
(question,1)
(whether,1)
...

위 예제는 아래 내장 데이터를 사용해 단어별 빈도를 구하는 것으로 Flink의 기본 데이터 처리 기능을 보여준다.

To be, or not to be,--that is the question:--
Whether 'tis nobler in the mind to suffer
The slings and arrows of outrageous fortune
Or to take arms against a sea of troubles,
And by opposing end them?--To die,--to sleep,--
No more; and by a sleep to say we end
The heartache, and the thousand natural shocks
That flesh is heir to,--'tis a consummation
Devoutly to be wish'd. To die,--to sleep;--
To sleep! perchance to dream:--ay, there's the rub;
For in that sleep of death what dreams may come,
When we have shuffled off this mortal coil,
Must give us pause: there's the respect
That makes calamity of so long life;

위의 단어빈도수 계산 작업은 배치로도 할 수 있고 스트림으로도 할 수 있는데 스트림으로 한다는 의미는,

스트림 처리에서의 WordCount는 일반적으로 시간 윈도우를 설정하여 해당 윈도우 내에서의 단어 출현 횟수를 계산
예를 들어, 10초 동안의 데이터 스트림에서 'apple'이라는 단어가 얼마나 많이 나타났는지를 계산.

이러한 윈도우는 특정 시간 간격(예: 10초마다)으로 이동하며, 각 윈도우는 독립적으로 단어의 출현 횟수를 계산.

이렇게 하면 시간에 따른 단어의 사용 빈도를 실시간으로 추적할 수 있다(Flink를 사용하는 이유)
이러한 윈도우 기반의 처리는 스트리밍 데이터에서 흔히 볼 수 있는 패턴이며, Flink는 이러한 유형의 계산을 쉽게 처리할 수 있는 API를 제공.

 

 

트러블슈팅

자바버전 충돌

flink 17.0.7버전의 경우 java17과 충돌나는 현상이 발견되어 java11로 다운그레이드하니 해결됐다.

아래는 다운그레이드 방법

sudo apt update
sudo apt install openjdk-11-jdk
sudo update-alternatives --config java
# 콘솔ui에서 11버전 선택

 

 

반응형

'Data Engineering' 카테고리의 다른 글

flink Table API를 사용한 실시간 Reporting샘플  (0) 2023.10.28
flink  (1) 2023.10.28
Spark, Flink를 사용한 실시간 스트림 분석  (0) 2023.08.02
kafka Consume  (0) 2023.08.02
filebeat  (0) 2023.07.31

기본적으로 특정 토픽에 대한 메시지를 컨슘하려면 다음과 같이 파이선으로 할 수 있다.

from kafka import KafkaConsumer

# Kafka 서버 주소 설정
consumer = KafkaConsumer('iis_log',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',  # 이 옵션을 주면 쌓여있는 메시지중 가장 처음 부터 읽음
                         enable_auto_commit=True  # 이 옵션을 주면 마지막 읽은 위치 다음위치 부터 읽게해줌
                         )

for message in consumer:
    # 메시지 출력
    print (message)

이를 위해 필요하면 다음을 설치한다.

pip install kafka-python

 

반응형

'Data Engineering' 카테고리의 다른 글

flink Table API를 사용한 실시간 Reporting샘플  (0) 2023.10.28
flink  (1) 2023.10.28
Spark, Flink를 사용한 실시간 스트림 분석  (0) 2023.08.02
Apache Flink 설치  (0) 2023.08.02
filebeat  (0) 2023.07.31

kafka란

RabbitMQ와 같은 메시지 미들웨어.

LinkedIn에서 개발되어 현재는 Apache Software Foundation의 일부인 오픈 소스 메시지 스트리밍 플랫폼.
대량의 실시간 데이터 스트리밍을 처리하는 데 초점을 맞추고 있고, 높은 처리량, 데이터 복제, 분산 처리 등을 지원

이를 통해 대규모 데이터를 실시간으로 처리할 수 있다.

MSA간 통신에도 자주 쓰임

 

브로커: Kafka에서 메시지를 처리하는 서버

  • 카프카에서는 이벤트를 구분하기 위한 단위로 '토픽'을 사용하며 파일시스템의 디렉토리와 비슷한 개념으로 이해 가능
    • 토픽은 게시/구독 메시징 시스템에서 흔하게 볼 수 있다.
      • 카프카 외적으로는 비슷한 개념에 대해 채널, 큐, 스트림이라는 용어를 사용하기도 함
        • 개인적으로는 채널이 더 직관적이네
    • 프로듀서는 데이터를 '토픽'에 넣고, 컨슈머는 '토픽'에서 데이터를 읽는다.
    • 이러한 토픽은 카프카 클러스터내에서 데이터를 분류하고 구독자가 관심있는 메시지만 구독할 수 있도록 해주는 중요한 역할
    • 카프카 클러스터 전체가 하나의 토픽만 사용한다면 생략도 가능한가? > NO.
  • 토픽의 데이터는 파티션에 분산 저장되며 각 파티션은 순서 유지가 되는 메시지의 로그로 구성
  • 하나의 브로커는 여러 파티션의 데이터를 저장할 수 있으며, 반대로 하나의 파티션도 여러 브로커에 복제될 수 있음에 주의
  • 파티션의 개수는 토픽의 병렬처리 능력을 결정

리더 브로커: 특정 메시지그룹(파티션)의 처리를 담당하는 서버. Kafka에서 각 데이터 파티션에는 하나의 리더 브로커가 있음

  • 리더 브로커는 파티션과 1:1의 관계를 가지며 하나의 파티션을 여러 브로커가 처리할경우 하나만 리더이고 나머지는 팔로워로 동작

 

ZooKeeper

  • 일종의 비서역할로 서버들의 상태관리나 역할분배를 Kafka를 위해서 수행함. Kakfa설치시 같이 설치됨.
  • 또는 일종의 '중앙 데이터 저장소' 역할. Kafka 시스템 내의 여러 서버들이 ZooKeeper를 통해 필요한 정보 공유하고 동기화.
  • 특정 서버에 문제가 생기면 이 정보가 ZooKeeper에 기록되고 다른 서버들이 이를 확인하여 적절히 대응

 

kafka설치과정

Java설치

카프카는 Java기반이므로 java jdk가 안깔렸으면 깔아준다.

sudo apt update
sudo apt install default-jdk

사용자계정에 종속된 설치보다는 여러명이 쓸 수 있도록 아래 처럼 전용 유저를 생성해주는게 좋다.

sudo adduser kafka
sudo adduser kafka sudo

카프카는 자바 기반이므로 jar파일을 받아서 설정해준다. apt등의 패키지 매니저를 사용하면 예전버전일수 있으므로 다운로드해서 해보자.

mkdir ~/Downloads
curl "https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz" -o ~/Downloads/kafka.tgz
mkdir /opt/kafka && cd /opt/kafka
tar -xvzf ~/Downloads/kafka.tgz --strip 1


#kafka디렉토리 소유권을 kafka사용자로 변경
sudo useradd kafka -m
sudo chown -R kafka:kafka /opt/kafka

/opt/kafka에 설치했으며, /opt 디렉토리는 선택적인(add-on) 애플리케이션 SW패키지를 위한 곳으로 여러사용자가 이용하는 라이브러리인 경우 선호되는 경로다.

 

zookeeper설정

zookeeper는 kafka와 독립적으로 실행되며, 비슷한 설정 매커니즘을 갖는다.

zookeeper설정파일 수정

sevity@sevityubuntu:/opt/kafka/config$ cat zookeeper.properties
# the directory where the snapshot is stored.
dataDir=/var/cache/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

다른 부분은 그냥 두었고, dataDir은 /var/cache 밑에 두어 휘발되지 않도록 설정했다. (기본은 /tmp 밑에 있었던듯)

이를 위해 다음과 같은 권한 설정을 했다.

sudo mkdir /var/cache/zookeeper
sudo chown kafka:kafka /var/cache/zookeeper
sudo chmod 700 /var/cache/zookeeper

 

zookeeper에 대해 systemctl에 등록해서 부팅시 마다 실행되도록 하기위해 다음 파일 작성

sudo vi /etc/systemd/system/zookeeper.service

[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

# 그 다음 systemctl을 통해 시작하고 재부팅 시에도 항상 실행되도록 설정
sudo systemctl start zookeeper
sudo systemctl enable zookeeper

kafka에 대해서도 마찬가지로 작성

sudo vi /etc/systemd/system/kafka.service

[Unit]
Description=Apache Kafka server
Documentation=http://kafka.apache.org
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

# 그 다음 systemctl을 통해 시작하고 재부팅 시에도 항상 실행되도록 설정
sudo systemctl start kafka
sudo systemctl enable kafka

 

트러블 슈팅

서버로그 실시간 모니터링

tail -f /opt/kafka/logs/server.log

 

jps로 kafka가 잘 실행되고 있는지 확인(QuorumPeerMain과 Kafka가 떠있으면OK)

su - kafka
jps
109092 Jps
108186 QuorumPeerMain(Zookeeper의 시작점)
108573 Kafka

 

 

토픽 메시지가 잘 수신되는지 확인하는 방법

가장 기본적으로는 9092포트를 리슨하고 있는지 확인

sevity@sevityubuntu:/opt/kafka/config$ lsof -i :9092
COMMAND    PID   USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
java    168958 sevity  116u  IPv6 1324110      0t0  TCP sevityubuntu:46330->sevityubuntu:9092 (ESTABLISHED)
java    168958 sevity  117u  IPv6 1320714      0t0  TCP sevityubuntu:46336->sevityubuntu:9092 (ESTABLISHED)

 

GetOffsetSell을 통해 특정 토픽에 게시된 메지시수를 확인

sevity@sevityubuntu:/opt/kafka$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic iis_log --time -1
iis_log:0:4043038

kafka-console-consumer.sh를 사용하여 특정 토픽의 메시지 수신상황을 실시간으로 확인

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic iis_log --from-beginning
{"@timestamp":"2023-08-01T01:18:34.407Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.9.0"},"agent":{"version":"8.9.0","ephemeral_id":"98df9137-96e3-4653-b353-2821eebde875","id":"cf839823-29c8-4619-b9c4-85e90804e50e","name":"SEVITY-PC","type":"filebeat"},"ecs":{"version":"8.0.0"},"log":{"file":{"path":"c:\\inetpub\\logs\\LogFiles\\W3SVC1\\u_ex230801.log"},"offset":16076},"message":"2023-08-01 01:17:24 192.168.0.6 GET /wiki/doku.php id=Advice%20To%20Relieve%20Your%20Acid%20Reflux%20Signs%20and%20symptoms 80 - 196.245.181.117 Mozilla/5.0+(X11;+Ubuntu;+Linux+x86_64;+rv:114.0)+Gecko/20100101+Firefox/114.0 http://sevity.com/ 200 0 0 1182","input":{"type":"log"},"host":{"mac":["00-15-83-EA-2C-EE","00-23-24-63-E7-E3","02-50-41-00-00-01","88-36-6C-F7-D9-04","88-36-6C-F7-D9-06","88-36-6C-F7-D9-07"],"hostname":"sevity-pc","name":"sevity-pc","architecture":"x86_64","os":{"family":"windows","name":"Windows 10 Enterprise","kernel":"10.0.19041.3208 (WinBuild.160101.0800)","build":"19045.3208","type":"windows","platform":"windows","version":"10.0"},"id":"dbb18b3d-fb42-49ec-b731-f430dd2f3fd5","ip":["fe80::a30b:d99a:f7ed:2bac","192.168.100.15","fe80::a4ce:c09a:ea91:8a9","169.254.142.88","fe80::9c93:77c8:66bd:b6c5","169.254.55.217","fe80::dd77:ec13:69c4:6922","169.254.56.104","fe80::310a:1b3d:e9a3:431c","192.168.0.6","fe80::2c27:d03a:a322:765b","169.254.236.36"]}}

 

반응형

'Programming > Linux' 카테고리의 다른 글

Nginx  (0) 2023.08.16
Redis  (0) 2023.08.10
ELK연습  (0) 2023.07.30
스팍 - 실습 - 웹서버 세션분석  (0) 2023.07.30
스팍(Spark) 설치  (0) 2023.07.30

+ Recent posts