주 역할

웹서버/캐시/로드밸런서/리버스프록시

 

SSL Termination

클라이언트와는 https통신을 하면서 백단 서버들과는 http(gRPC포함)통신을 하여, 복호화 부담을 줄이는 기능

 

실전꿀팁

여기가면 실제로 많이 사용되는 Nginx설정을 보고 적용할 수 있다.

반응형

'Programming > Linux' 카테고리의 다른 글

docker network  (0) 2023.10.29
docker-compose  (0) 2023.10.28
Redis  (0) 2023.08.10
kafka  (0) 2023.07.31
ELK연습  (0) 2023.07.30

일반적으로 셔플 알고리즘을 떠올리면 다음과 같은 naive solution을 생각하기 쉽다.

void LnNumber::ShuffleArray(long* arr, long count)
{
      for (int i=0;i<count;i++)
      {
            long toInx = rand() % count;
            {//swap
                  long temp = arr[i];
                  arr[toInx] = arr[i];
                  arr[i] = temp;
            }
      }
}

하지만 위 코드는 순진해보이는 겉모습과 다르게 엄청난 바이어스를 포함하고 있는데,

카드 3장의 모든 순열은 3!(=3 x 2 x 1) = 6이지만 위의 toInx를 보면 3 x 3 x 3 = 27로 6으로 나눠떨어지지 않는 경우의 수를 가지고 있다!

 

따라서 아래처럼 덜 직관적이지만 n!에 맞춘 경우의 수를 가진 코드로 수정해줘야 한다.

이 코드를 Fisher-Yates shuffle이라고 한다.

void LnNumber::ShuffleArray(long* arr, long count) {
    for (int i = count - 1; i > 0; i--) {
        long toInx = rand() % (i + 1); // 0부터 i까지의 랜덤한 인덱스(i포함)
        // swap
        long temp = arr[i];
        arr[i] = arr[toInx];
        arr[toInx] = temp;
    }
}

이코드의 toInx를 보면 카드3장의 예시에서 3 x 2 x 1 = 6 = 3!의 경우의 수를 가진다.

 

sort

가장 기본적인 sort는 Array.sort()또는 Collections.sort()를 쓰면 된다. (char[] 같은건 전자만 가능한듯?)

struct node 같은걸 쓰면서 특정 필드에 대해 소팅하고 싶으면 다음 기법을 쓰자.

class Tweet {
    Long time;
    int tweetId;
}
List<Tweet> feed;
...

//time에 대해서 sort하고 싶을때
feed.sort((t1, t2) -> t1.time.compareTo(t2.time));  
//역순을 원하면 t1, t2를 바꿔준다.
feed.sort((t1, t2) -> t2.time.compareTo(t1.time));

//약간 올드하게 아래처럼도 가능
Collections.sort(feed, new Comparator<Tweet>() {
    @Override
    public int compare(Tweet t1, Tweet t2) {
        return t1.time.compareTo(t2.time);
    }
});

여러 필드에 대해서 복잡하게 정렬해야하면 아래 방법 사용

feed.sort(Comparator.comparing((Tweet t) -> t.time)
                   .thenComparing(Comparator.comparing((Tweet t) -> t.tweetId).reversed()));
반응형

LIS(Longest Increasing Subsequence)는 가장 긴 증가하는 부분 수열을 찾는 알고리즘이다.

즉, 주어진 수열에서 일부 숫자를 제거하여 만든 부분 수열 중에서, 오름차순으로 정렬된 가장 긴 수열을 찾는 문제.
예를 들어, 수열 [10, 20, 10, 30, 20, 40]이 있다면, LIS는 [10, 20, 30, 40]이 된다.

예시 백준 문제는 여기 있다.

DP를 연습할때 가장 먼저 나오는 문제이지만, 의외로 2가지 실수하기 쉬운 포인트와. 한가지 인사이트를 제공한다.

 

2가지 실수하기 쉬운 포인트

아래는 이문제에 대한 정답코드인데, 아래 주석으로 여기라고 되어 있는 부분을 풀때마다 실수했다.

#include <stdio.h>
#include <algorithm>
using namespace std;

// dp[3] = 5 ; 인덱스 3까지 가장 긴 증가하는 부분수열의 길이는 5이다.
int dp[1001]; 
int A[1001];

int main(void)
{
	int N; scanf("%d", &N);
	for (int i = 0; i < N; i++) scanf("%d", A + i),dp[i]=1;  // 여기. dp[i]=1초기화 부분!
	for (int i = 0; i < N; i++) {
		for (int j = 0; j < i; j++) {
			if (A[i] > A[j]) 
				dp[i] = max(dp[i], dp[j] + 1);
		}
	}
	int ans = 1;
	for (int i = 0; i < N; i++) ans = max(ans, dp[i]);  // 여기. dp[n-1]아니어도 답이 될 수 있음!
	printf("%d\n", ans);
	return 0;
}

 

한가지 인사이트

이 문제는 놀랍게도 recursive+memoization보다 iterative 방식이 더 생각하기 편하고,

편한 recursive+memoization을 떠올리면 dp[ix][cur_val] 형태가 돼서 냅색DP형태가 나오는데 문제 범위에 따라 메모리초과 오류가 발생하여 해결이 어렵다.

아래는 편한 recursive버전이다.

#include <bits/stdc++.h>
using namespace std;
int dp[1001][1001];
int in[1001];
int n;
int dfs(int ix, int cur_max_val){
    if(ix>=n) return 0;
    if(dp[ix][cur_max_val]) return dp[ix][cur_max_val];
    
    //포함
    int r1 = -1;
    if(cur_max_val < in[ix])
        r1= 1 + dfs(ix+1, in[ix]);
    
    //불포함
    int r2 = dfs(ix+1, cur_max_val);

    return dp[ix][cur_max_val] = max(r1,r2);
}
int main() {
    scanf("%d", &n);
    for(int i=0;i<n;i++) scanf("%d", in+i);
    printf("%d\n", dfs(0, 0));

    return 0;
}

자세히 들여다 보면, dp[n]을 1로 초기화 하는 부분도 필요없고, 마지막에 정렬하는 부분도 필요없어서 실수 확률이 확연히 줄어든다.

하지만 위 코드를 dp[ix]로 1차원만 사용하도록 변경하는건 쉽지 않다.

아래처럼 할 수 있긴한데, dfs함수내에 iterative한 부분이 들어 있어 완전한 DFS라고 부를 수 있을지 약간 애매하다.

#include <bits/stdc++.h>
using namespace std;
#define FOR(i,a,b) for(int i=(a);i<(b);++i) 
#define REP(i,n) FOR(i,0,n) 
typedef vector<int> VI;

int dp[1001];
int in[1001];
int N;
int go(int n)
{
         if(n==0) return 1;
         if(dp[n]) return dp[n];
 
         int maxR=1;
         REP(i,n){
            if(in[n] >in[i])
                maxR = max(maxR, go(i)+1);                 
         }
         return dp[n] = maxR;
}
 
int main()
{
        scanf("%d\n", &N);
        REP(i, N)scanf("%d",&in[i]); 
        int r=0;
        REP(i,N) r=max(r,go(i));
        cout << r <<endl;
        return 0;
}

 

실제 시퀀스를 출력하는 방법

아래처럼 prev배열하나를 추가하면 어렵지 않게 구현가능하다.

이문제의 답이기도 하다.

#include <stdio.h>
#include <algorithm>
using namespace std;

// dp[3] = 5 ; 인덱스 3까지 가장 긴 증가하는 부분수열의 길이는 5이다.
int dp[1001];
int A[1001];
int prv[1001];

int main(void)
{
    fill(prv, prv + 1001, -1);
    int N;
    scanf("%d", &N);
    for (int i = 0; i < N; i++)scanf("%d", A + i), dp[i] = 1; // 여기. dp[i]=1초기화 부분!
    for (int i = 0; i < N; i++)
    {
        for (int j = 0; j < i; j++)
        {
            if (A[i] > A[j])
                if (dp[j] + 1 > dp[i])
                {
                    dp[i] = dp[j] + 1;
                    prv[i] = j;
                }
        }
    }
    int ans = 1;
    int end_ix = 0;
    for (int i = 0; i < N; i++)
    {
        if (ans < dp[i])
        {
            ans = dp[i];
            end_ix = i;
        }
    }
    printf("%d\n", ans);
    vector<int> v;
    v.push_back(A[end_ix]);
    while (prv[end_ix] != -1)
    {
        end_ix = prv[end_ix];
        v.push_back(A[end_ix]);
    }
    reverse(v.begin(), v.end());
    for (auto e : v)
        printf("%d ", e);

    return 0;
}
반응형

'Programming > Algorithm' 카테고리의 다른 글

배열 제대로 shuffle 하기 (Fisher-Yates shffle)  (0) 2023.08.13
기하 - 2선분의 교차점 구하기  (0) 2023.07.25
meet in the middle 알고리즘  (0) 2022.03.02
투 포인터  (0) 2022.03.01
이분 그래프(Bipartite Graph)  (0) 2022.02.22

time

long currentTime = System.currentTimeMillis();

 

표준입출력처리

leetcode
2
leet code

아래 예시는 위 인풋을 처리할 수 있다.

import java.util.*
public class coupang1 {
    public static void main(String args[]){
        Scanner scanner = new Scanner(System.in);
        String s = scanner.nextLine(); // 문자열 s 입력
        int n = Integer.parseInt(scanner.nextLine()); // 사전 단어의 개수 입력
        String[] wordDictArray = scanner.nextLine().split(" "); // 공백으로 구분된 사전 단어 입력
        List<String> wordDict = Arrays.asList(wordDictArray); // 배열을 리스트로 변환
        
    }
}

Scanner로 속도가 느릴때

인풋개수가 클때는 Scanner가 느릴 수 있다. 이때 bufferedReader를 쓰면 몇 배 빠르게 할 수 있다.

// before
Scanner sc = new Scanner(System.in);
int n = sc.nextInt()
List<Integer> arr = new ArrayList<>();
for(int i=0;i<n;i++) arr.add(sc.nextInt());

// after
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
int n = Integer.parseInt(br.readLine());
List<Integer> arr = new ArrayList<>();
StringTokenizer st = new StringTokenizer(br.readLine());
for (int i = 0; i < n; i++) arr.add(Integer.parseInt(st.nextToken()));

 

자료구조 관련

LinkedList 사용하기

헷갈리는 linkedList사용법을 정리한다.

예시문제:
두 개의 정렬된 연결 리스트로부터 중앙값을 계산하십시오.
예시 1:
L1 = 1 -> 3
L2 = 2
중앙값은 2.0입니다.
예시 2:
L1 = 1->2
L2 = 3->4
중앙값은 (2+3)/2 = 2.5입니다.

input1:
2
1 2
1
3
output1:
2.0

input2:
3
1 3 5
3
2 4 6
output2:
3.5

솔루션

import java.util.*;
public class Main {
    public static void main(String args[]){
        LinkedList<Integer> list1 = new LinkedList<>();
        LinkedList<Integer> list2 = new LinkedList<>();
        Scanner sc = new Scanner(System.in);
        int n1 = sc.nextInt();
        for(int i=0;i<n1;i++) list1.add(sc.nextInt());  // 추가는 add()
        int n2 = sc.nextInt();
        for(int i=0;i<n2;i++) list2.add(sc.nextInt());
        int N = n1+n2;
        //합친 리스트의 개수가 홀수개이면 n/2 번째인덱스
        //짝수개이면 n/2-1, n/2번째 인덱스의 평균이 답
        int mid_six = N%2==1?N/2:N/2-1;
        int mid_eix = N%2==1?N/2:N/2;

        ListIterator<Integer> it1 = list1.listIterator();  // next()하려면 Iterator필요
        ListIterator<Integer> it2 = list2.listIterator();
        int median1 = 0, median2 = 0;
        for (int i = 0; i <= mid_eix; i++) {
            int v1 = it1.hasNext() ? it1.next() : Integer.MAX_VALUE;  // hasNext()눈여겨 보자
            int v2 = it2.hasNext() ? it2.next() : Integer.MAX_VALUE;

            int value;
            if (v1 < v2) {
                value = v1;
                if (v2 < Integer.MAX_VALUE) it2.previous();  // previous()도 가능
            } else {
                value = v2;
                if (v1 < Integer.MAX_VALUE) it1.previous();
            }

            if (i == mid_six) median1 = value;
            if (i == mid_eix) median2 = value;
        }
        System.out.println((double)(median1 + median2) / 2.0);
        sc.close();
    }
}

LinkedList 선언부, ListIterator, next()/previous() 처리 등을 눈여겨 보자. 

근데 사실 이 문제의 경우에는 꼭 LinkedList를 쓸필요는 없다. ArrayList로도 충분.

ArrayList

c++ vector에 해당하며 다음과 같이 쓸 수 있다.

ArrayList<Integer> list = new ArrayList<>();
list.add(10);
list.add(20);
int value = list.get(1); // 20을 가져옵니다.

c++과 다르게 list[1]등으로 배열인덱스는 쓰지 못함에 주의

 

HashMap

c++ map에 해당하며 다음과 같이  쓸 수 있다.

HashMap<Integer, String> map = new HashMap<>();
HashMap<Integer, List<Integer>> map2 = new HashMap<>();
for (int i = 0; i < N; i++) {
    String a = map.getOrDefault(C[i],"");
    List<Integer> b = map2.getOrDefault(C[i],new ArrayList<>());
    a+=S[i];map.put(C[i],a);
    b.add(i);map2.put(C[i],b);
}

getOrDefault()에 주목해보자. 이걸안쓰고 get()을 쓰면 항상null체크를 해줘야 한다.

(C++의 std::map에서는 요청한 키가 존재하지 않으면 해당 키를 자동으로 생성하고 해당 값 타입의 기본 생성자를 호출하여 값을 초기화)

containKey(key)를 쓰면 put()했는지 체크할 수 있다. (c++에서 map.count()와 같은 기능)

 

iteration하기

다음 4가지 정도 방법이 있다.

//방법1. 전통적인 방법.. Entry개념때문에 복잡하다.
    int total_anagrams = 0;
    Iterator<Map.Entry<String,Integer>> iterator = ana_map.entrySet().iterator();
    int total_anagrams = 0;
    while(iterator.hasNext()){
      Map.Entry<String, Integer> entry = iterator.next();
      total_anagrams += entry.getValue();
    }
    
//방법2. key나 value한쪽만 필요한 경우 다음처럼 간략화 가능
    int total_anagrams = 0;
    for (Integer value : ana_map.values()) {
      total_anagrams += value;
    }
    
//방법3. key나 value 둘다 필요하면서 약간 더 간단한 방법
for (Map.Entry<String, Integer> entry : ana_map.entrySet()) {
  String key = entry.getKey();
  Integer value = entry.getValue();
  // key와 value를 사용한 작업
}

//방법4. 조금 더 간단한 방법(람다 표현식)
ana_map.forEach((key, value) -> {
  // key와 value를 사용한 작업
});

 

Queue

이 문제에 대한 솔루션인데, 단순한 BFS로 풀린다.

queue사용법을 눈여겨보자(add, poll, size)

class Solution {
    private void check(Queue<Integer> q, int[][] mat, int[][] dist, int py, int px, int y, int x){
        int h = mat.length;
        int w = mat[0].length;
        if(y<0||y>=h) return;
        if(x<0||x>=w) return; 
        if(dist[y][x]>dist[py][px] + 1) {
            dist[y][x] = dist[py][px]+1;
            q.add(y);
            q.add(x);
        }
    }
    public int[][] updateMatrix(int[][] mat) {
        int h = mat.length;
        int w = mat[0].length;
        int[][] dist = new int[h][w];

        Queue<Integer> q = new ArrayDeque<>();
        for(int y=0;y<h;y++)for(int x=0;x<w;x++)dist[y][x]=Integer.MAX_VALUE;
        for(int y=0;y<h;y++)for(int x=0;x<w;x++){
            if(mat[y][x]==0) {
                dist[y][x]=0;
                q.add(y);q.add(x);
            }
        }
        while(q.size()>0){
            int cy = q.poll();
            int cx = q.poll();
            check(q,mat,dist,cy,cx,cy-1, cx);
            check(q,mat,dist,cy,cx,cy+1, cx);
            check(q,mat,dist,cy,cx,cy, cx-1);
            check(q,mat,dist,cy,cx,cy, cx+1);
        }
        return dist;
    }    
}

 

Stack

c++과 비슷하게 push(), pop()인데 pop()이 top()+pop()이라고 보면된다. (값을 리턴하면서 즉시 pop도 하는..)

top()만 하려면 peek()를 쓰면된다.(아래 샘플엔 없다)

아래 샘플참조..

import java.util.*;

public class StackStringDecoder {
    
    public static String decodeString(String s) {
        Stack<Integer> stack_repeat = new Stack<>();
        Stack<String> stack_str = new Stack<>();
        String ret_str = new String("");
        int repeat = 0;
        for(int i=0;i<s.length();i++){
            char c = s.charAt(i);
            if(Character.isDigit(c)){
                repeat *= 10;
                repeat += c-'0';
            }else if(c=='['){
                stack_repeat.push(repeat);  // push하는 부분
                stack_str.push(ret_str);
                ret_str = "";
                repeat = 0;
            }else if(c==']'){
                String pop_str = stack_str.pop();  // pop하는 부분
                repeat = stack_repeat.pop();
                for(int j=0;j<repeat;j++)
                    pop_str += ret_str;
                ret_str = pop_str;
                //repeat = 0;
            }else{
                assert(Character.isAlphabetic(c));
                ret_str+=c;
            }
        }
        return ret_str;
    }

    public static void main(String[] args) {
        System.out.println(decodeString("3[a2[c]]")); // "accaccacc"
        System.out.println(decodeString("3[a]2[bc]")); // "aaabcbc"
        System.out.println(decodeString("2[abc]3[cd]ef")); // "abcabccdcdcdef"
        System.out.println(decodeString("2[a3[b]]")); // "abbbabbb"
        System.out.println(decodeString("1[ab2[c]]")); // "abcc"
    }
}

char[]

String은 immutable이라 글자단위로 수정하려면 char[]로 변환해주어야 한다.

char[] chars = myString.toCharArray(); //String에서 char[]로 변환
String myString = new String(chars);  //char[]에서 String으로 변환

한번 println의 경우는 다음과 같이 String으로 변환해주지 않으면 주소가 출력된다.

char[] result = {'H', 'e', 'l', 'l', 'o'};
System.out.println(new String(result)); // "Hello"를 출력합니다.

 

리턴값이 2개 필요할때

아래처럼 int[] r = func(); 해서 r[0], r[1]을 쓰면된다.

static int[] parseNumber(String s, int ix) {
    int number = 0;
    while (ix < s.length() && Character.isDigit(s.charAt(ix))) {
        number = number * 10 + (s.charAt(ix) - '0');
        ix++;
    }
    return new int[] {number, ix};
}

public static void main(String[] args) {
    String expression = "12345+6789";
    int ix = 0;

    int[] result = parseNumber(expression, ix);
    int parsedNumber = result[0];
    int nextIndex = result[1];

    System.out.println("Parsed number: " + parsedNumber); // 출력: Parsed number: 12345
    System.out.println("Next index: " + nextIndex);       // 출력: Next index: 5
}

 

String 핸들링

시간과 같이 특정 글자로 split할때는 아래코드 참조하자(아래는 12포맷을 AM, PM글자 떼버리고 24포맷으로 바꾸는 코드이다)

자바는 아예 split()함수가 있어서 오히려 c++보다 수월하다.

public class Main {
    public static void main(String[] args) {
        String inputTime = "07:05:45PM";
        String outputTime = timeConversion(inputTime);
        System.out.println(outputTime); // 출력: 19:05:45
    }

    public static String timeConversion(String s) {
        String[] timeParts = s.substring(0, 8).split(":");
        int hour = Integer.parseInt(timeParts[0]);

        if (s.endsWith("PM") && hour != 12) {
            hour += 12;
        } else if (s.endsWith("AM") && hour == 12) {
            hour = 0;
        }

        return String.format("%02d:%s:%s", hour, timeParts[1], timeParts[2]);
    }
}

참고삼아 c++버전도 기록해둔다.

#include <iostream>
#include <sstream>
#include <iomanip>

std::string timeConversion(const std::string& s) {
    int hour, minute, second;
    char colon, am_pm;
    std::istringstream ss(s.substr(0, 8));
    ss >> hour >> colon >> minute >> colon >> second;

    if (s.find("PM") != std::string::npos && hour != 12) {
        hour += 12;
    } else if (s.find("AM") != std::string::npos && hour == 12) {
        hour = 0;
    }

    std::ostringstream result;
    result << std::setw(2) << std::setfill('0') << hour << ":"
           << std::setw(2) << std::setfill('0') << minute << ":"
           << std::setw(2) << std::setfill('0') << second;

    return result.str();
}

int main() {
    std::string inputTime = "07:05:45PM";
    std::string outputTime = timeConversion(inputTime);
    std::cout << outputTime; // 출력: 19:05:45
    return 0;
}

sort

기본적인 sorting법

Array의 경우 다음과 같이 Arrays.sort() 또는 Collections.sort()를 쓴다.

import java.util.Arrays;

public class SortArrayExample {
    public static void main(String[] args) {
        int[] numbers = {5, 2, 8, 1, 3, 7};
        // 배열 정렬
        Arrays.sort(numbers);  // 이경우는 Collections.sort()는 못쓴다.
        // 정렬된 배열 출력
        System.out.println("Sorted array: " + Arrays.toString(numbers));
    }
}

역순으로 정렬하려면 다음과 같이 한다.

// 기본 배열은 Collections를 지원하지 않고 reverseOrder()도 사용할 수 없다.
// 따라서 List<Integer>로 변환후 수행한다.
Integer[] numbers = {5, 2, 8, 1, 3, 7};
List<Integer> number_list = Arrays.asList(numbers);
Collections.sort(number_list, Collections.reverseOrder());  // 이경우는 Collections.sort()는 못쓴다.
System.out.println("Sorted array: " + Arrays.toString(numbers));

구조체 정렬시 특정 필드에 대해 정렬하기

class Tweet {
    Long time;
    int tweetId;
}
List<Tweet> feed;
...
//time필드에 대해 정렬
feed.sort((t1, t2) -> t1.time.compareTo(t2.time));

//time필드에 대해 역순정렬
feed.sort((t1, t2) -> t2.time.compareTo(t1.time));

//아래처럼 전통적인 compare함수를 쓸 수도 있다.
Collections.sort(feed, new Comparator<Tweet>() {
    @Override
    public int compare(Tweet t1, Tweet t2) {
        return t1.time.compareTo(t2.time);
    }
});


//먼저 time에 대해 역순정렬하고, tweetId에 대해서 정방향정렬하기
feed.sort(Comparator.comparing((Tweet t) -> -t.time)
                   .thenComparing(t -> t.tweetId));
//방법2
feed.sort(Comparator.comparing((Tweet t) -> t.time)
                   .thenComparing(Comparator.comparing((Tweet t) -> t.tweetId).reversed()));
//방법3
feed.sort(Comparator.comparing((Tweet t) -> t.time)
                   .thenComparing((t1, t2) -> t2.tweetId - t1.tweetId));


//올드스쿨
feed.sort(new Comparator<Tweet>() {
    @Override
    public int compare(Tweet o1, Tweet o2) {
        if (o1.time != o2.time) {
            return o2.time - o1.time; // time에 대해 역순 정렬
        }
        return o1.tweetId - o2.tweetId; // tweetId에 대해 정방향 정렬
    }
});
반응형

설치 운영

우분투에서 설치

sudo apt-get update
sudo apt-get install redis-server

 

트러블슈팅

docker안에서 host의 redis에 접근하도록 하기

실행할때 다음처럼 --network host만 붙여주면 된다.

#!/bin/bash

PORT=$1

docker run \
  --network host \
  -e SPRING_APPLICATION_JSON='{"server":{"port":'"$PORT"'}}' \
  -p $PORT:$PORT \
  auth-service

 

설계관점

 

유사한 목적(“캐시-미스 시 DB에서 데이터 가져와 캐시에 채우기”)을 갖고 있지만, 구현 주체흐름 제어가 다릅니다.

 
Read-Through vs Write Through
둘다 캐시라이브러리를 쓰는 형태

 

항목 Read-Through Write-Through
읽기(Read) 애플리케이션이 cache.get(key)만 호출
– miss 시 캐시 라이브러리가 DB에서 가져와 자동으로 채워 줌
애플리케이션이 cache.get(key)만 호출
– miss 시 캐시→DB 채우기(읽기 방식은 동일)
쓰기(Write) 애플리케이션이 DB에 직접 쓰고,
캐시는 별도 무효화 로직(예: Cache-Aside) 필요
애플리케이션이 cache.put(key, value) 호출
– 캐시에 먼저 쓰고, 라이브러리가 동기적으로 DB에도 써 줌
앱 코드 관점 • Read: cache.get(key)
• Write: db.save(...) + 별도 cache.del(key)
• Read: cache.get(key)
• Write: cache.put(key, value) (내부에서 DB 쓰기까지 수행)
 
  • Read-Through는 애플리케이션이 캐시에 get()만 호출하면 되고,
    쓰기는 보통 DB에 직접 쓰고 캐시 무효화를 따로 처리
  • Write-Through는 애플리케이션이 읽을 땐 get(), 쓸 땐 put()을 호출하며,
    put()이 캐시와 DB 쓰기를 동기적으로 처리해 줌
 
Cache-Aside vs Read-Through

 

항목 Cache-Aside Read-Through
핵심 아이디어 앱이 직접 “캐시 꺼내→DB 가져와→캐시 세팅” 캐시 라이브러리가 “get() 호출만으로 내부에서 DB 조회→캐시 세팅”
누가 제어하나 애플리케이션 코드 캐시 미들웨어 / 프레임워크
코드 호출 방식 cache.get → miss? db.query → cache.set cache.get 만 호출
쓰기(무효화) db.update 후 애플리케이션이 cache.del 보통 Write-Through와 결합해 cache.put→db.write
자동화 수준 수동 로직(직접 구현) 캐시 솔루션에 내장 (투명하게 작동)
 

결론:

  • 목적은 같지만,
  • Cache-Aside는 “앱이 직접 관리”
  • Read-Through는 “라이브러리가 대신 관리”

 

 

Write-Through vs Write-Back

 

항목 Write-Through Write-Back
핵심 아이디어 캐시에 쓰면 즉시 동기적으로 DB에도 쓰기 캐시에 먼저 쓰고, 나중에 배치나 트리거로 DB에 반영
쓰기 흐름 1. cache.put(key, value) 호출
2. 캐시 저장 완료 후
3. 즉시 DB 쓰기
1. cache.put(key, value) 호출
2. 캐시만 업데이트
3. 일정 시점에 Dirty-entry를 모아서 DB 쓰기
DB 쓰기 타이밍 동기적, 쓰기 호출 시점에 바로 반영 비동기적, 지연(예: TTL, 배치, evict 시점)에 반영
데이터 일관성 보장 강력(쓰기 직후 DB와 캐시 일치) 약함(아직 DB에 반영되지 않은 쓰기 데이터가 캐시에만 존재할 수 있음)
쓰기 레이턴시 상대적으로 높음(캐시+DB 두 번 쓰기) 상대적으로 낮음(한 번만 캐시에 쓰기)
장애 복구 & 내구성 쓰기 실패 시 즉시 예외 처리 가능 캐시 장애나 장애 복구 시 Dirty 데이터 유실 위험
캐시 오염(Dirty) 관리 없음 “Dirty bit” 관리 필요 → evict, 주기적 flush 로직 구현
구현 복잡도 낮음(동기 쓰기만 구현) 높음(Dirty 식별·스케줄·재시도 로직 등 추가)
적합한 사용 사례 • 데이터 일관성이 최우선일 때
• 쓰기 빈도가 낮거나 허용 가능한 레이턴시일 때
• 쓰기 빈도가 매우 높고, 약간의 지연 반영을 허용할 때
• DB 부하 완화가 급선무일 때
반응형

'Programming > Linux' 카테고리의 다른 글

docker-compose  (0) 2023.10.28
Nginx  (1) 2023.08.16
kafka  (0) 2023.07.31
ELK연습  (0) 2023.07.30
스팍 - 실습 - 웹서버 세션분석  (0) 2023.07.30

F5를 눌러서 라인디버깅이 가능한 디버거를 띄우기 위해서는,

 

먼저 package.json에서 다음 부분을 수정해준다.

  "scripts": {
    "dev": "NODE_OPTIONS='--inspect' next dev",
    ...
  },

그 다음, vscode에서 launch.json을 만든다음(이건 그냥 F5누르고 node.js눌러도 기본템플릿은 만들어준다.)

다음 내용을 써준다.

{
    "version": "0.2.0",
    "configurations": [
        {
            "type": "node",
            "request": "launch",
            "name": "Launch Next.js",
            "runtimeExecutable": "${workspaceRoot}/node_modules/.bin/next",
            "cwd": "${workspaceRoot}",
            "console": "integratedTerminal",
            "internalConsoleOptions": "neverOpen",
            "skipFiles": ["<node_internals>/**"]
        },
    ]
}

위처럼 하면 기본적인 디버깅환경은 구축이 된 것이고, Ctrl+F5로 실행하면  break포인트가 잡히지 않는다.

 

하지만 론치과정이 단순히 npm run dev하는 것 보다는 헤비하기 때문에 Ctrl+Shift+B를 눌러서 가볍게 실행하고 싶으면,

다음처럼 launch.json과 같은곳에 tasks.json을 만들어 준다.

{
    "version": "2.0.0",
    "tasks": [
        {
            "label": "Run Next.js Without Debugging",
            "type": "shell",
            "command": "npm",
            "args": ["run", "dev"],
            "problemMatcher": [],
            "group": {
                "kind": "build",
                "isDefault": true
            }
        }
    ]
}

 

반응형

'Programming > node.js' 카테고리의 다른 글

react/next.js환경에서 http에서 https로 변경하기  (0) 2023.10.12

다음 명령을 통해 next.js 프로젝트를 생성한다.

sevity@sevityubuntu:~/workspace/online_judge$ mkdir problem-frontend
sevity@sevityubuntu:~/workspace/online_judge/problem-frontend$ npx create-next-app .
✔ Would you like to use TypeScript? … No / Yes
✔ Would you like to use ESLint? … No / Yes
✔ Would you like to use Tailwind CSS? … No / Yes
✔ Would you like to use `src/` directory? … No / Yes
✔ Would you like to use App Router? (recommended) … No / Yes
✔ Would you like to customize the default import alias? … No / Yes

Would you like to use TypeScript? - 추천: 예
Would you like to use ESLint? - 추천: 예
Would you like to use Tailwind CSS? - 추천: 아니오
Would you like to use src/ directory? - 추천: 예
Would you like to use App Router? - 추천: 예
Would you like to customize the default import alias? - 추천: 아니오

 

Bootstrap 테마변경

 Bootswatch와 같은 웹사이트에서 무료로 제공하는 Bootstrap 테마를 선택가능

테마적용방법


1. 테마 CSS 파일 다운로드: 먼저, 테마 페이지에 접속. 페이지 상단의 "Download" 버튼을 클릭하고, "bootstrap.min.css" 파일을 다운로드

2. 프로젝트에 CSS 파일 추가: 다운로드한 CSS 파일을 public/css 디렉토리를 만들고 그 안에 CSS 파일을 추가

3. CSS 파일 import: _app.js 파일을 열고, 다음 코드를 추가

import '../public/css/bootstrap.min.css';

 

next.js pages

/create 아래와 같이 문제를 새로 입력한다.(아래는 아직 실제입력,실제출력등 빠진부분들이 많다)

 

/problems 문제 목록을 아래와 같이 출력한다.

(로그인 상태가 아니라서 username이 없을 경우, frontend-service/login 으로 direct 된다)

 

 

 

 

 

 

 

반응형

문제관리 백엔드 서비스는 online-judge프로젝트의 7개 서비스중 3번째 서비스이다.

  1. 인증 서비스 (Backend): 사용자의 회원 가입, 로그인, 로그아웃, 세션 관리 등을 담당
  2. 인증 서비스 (Frontend): 사용자 인터페이스를 제공 (로그인 폼, 회원가입 폼 등)
  3. 문제 관리 서비스 (Backend): 문제의 추가, 삭제, 수정 등을 관리
  4. 문제 관리 서비스 (Frontend): 문제를 보여주고, 문제 추가, 삭제, 수정 등의 인터페이스를 제공
  5. 제출 관리 서비스 (Backend): 사용자의 코드 제출 및 제출 기록을 관리
  6. 제출 관리 서비스 (Frontend): 코드 제출 인터페이스와 제출 기록 확인 인터페이스를 제공
  7. 채점 서비스 (Backend): 제출된 코드를 채점

다른 서비스는 링크를 눌러확인하자.

 

문제관리 백엔드 서비스는 java/vscod가 아닌 kotlin/IntelliJ로 해보기로 했다. (kotlin호환성이 IntelliJ가 훨씬 좋음)

디렉토리 구성은 다음과 같다.

sevity@sevityubuntu:~/workspace/online_judge/problem-service$ tree -I target
.
├── HELP.md
├── log
│   └── application.log
├── mvnw
├── mvnw.cmd
├── pom.xml
├── run.sh
└── src
    ├── main
    │   ├── kotlin
    │   │   └── com
    │   │       └── sevity
    │   │           └── problemservice
    │   │               ├── controller
    │   │               │   └── ProblemController.kt
    │   │               ├── domain
    │   │               │   ├── Problem.kt
    │   │               │   └── ProblemRepository.kt
    │   │               ├── ProblemServiceApplication.kt
    │   │               └── service
    │   │                   └── ProblemService.kt
    │   └── resources
    │       ├── application.properties
    │       ├── static
    │       └── templates
    └── test
        └── kotlin
            └── com
                └── sevity
                    └── problemservice
                        └── ProblemServiceApplicationTests.kt

18 directories, 13 files

 

문제에 대한 스키마를 다음과 같이 설정했다.

제대로 하려면, real_input, real_output, solution.cpp 등이 추가되어야 하지만, 우선은 간단하게 했다.

CREATE TABLE problems (
  id SERIAL PRIMARY KEY,
  title VARCHAR(100) NOT NULL,
  description TEXT NOT NULL,
  example_input TEXT NOT NULL,
  example_output TEXT NOT NULL
);

 

 

초기 IntelliJ설정

Ultimate버전과 Community버전이 있는데 전자만 SpringBoot관련 기능이 제공된다.

먼저 로컬환경에서 IntelliJ를 실행한 후,  File > New > Project > Spring Initializr를 통해 프로젝트를 로컬에 생성한다.

위의 내용은 프로그램관리 백엔드에 해당하진 않는다. 개요만 참조하자.

그다음 생성된 파일들을 ssh환경으로 원격복사한다.

 

그다음음 다음과 같이 Remote Development > SSH로 들어가서 원격 개발환경을 설정한다.

application.properties 파일을 다음과 같이 설정. 포트번호는 7개 서비스중 3번째라는 의미로 8083으로 부여.

spring.datasource.url=${DATABASE_URL}
spring.datasource.username=${DATABASE_USERNAME}
spring.datasource.password=${DATABASE_PASSWORD}
spring.jpa.hibernate.ddl-auto=update
logging.file.name=log/application.log
# without below line, web login is needed.
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration
server.port=8083

알고리즘 문제(problem)에 대한 도메인 entity와 repository를 domain이라는 패키지(폴더)에 다음과 같이 작성한다. 위의 DB스키마와 알맞도록 작성.

먼저 Problem entity는 다음과 같이 작성

package com.sevity.problemservice.domain

import javax.persistence.*

@Entity
@Table(name = "problems")
data class Problem(
    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long = 0,

    @Column(nullable = false)
    val title: String = "",

    @Column(nullable = false)
    val description: String = "",

    @Column(name = "example_input", nullable = false)
    val exampleInput: String = "",

    @Column(name = "example_output", nullable = false)
    val exampleOutput: String = ""
)

JPA로 DB와 연동을 자동화해주는 Repository는 다음과 같이 작성

package com.sevity.problemservice.domain

import com.sevity.problemservice.domain.Problem
import org.springframework.data.jpa.repository.JpaRepository

interface ProblemRepository : JpaRepository<Problem, Long>

 

MVC에서 C에 해당하는 Control 클래스를 아래와 같이 작성

package com.sevity.problemservice.controller

import com.sevity.problemservice.domain.Problem
import com.sevity.problemservice.service.ProblemService
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.*

@RestController
@RequestMapping("/problems")
class ProblemController(private val problemService: ProblemService) {

    @GetMapping
    fun getAllProblems() = problemService.getAllProblems()

    @GetMapping("/{id}")
    fun getProblem(@PathVariable id: Long): Problem = problemService.getProblem(id)


    @PostMapping
    fun createProblem(@RequestBody problem: Problem): Problem = problemService.createProblem(problem)

    @PutMapping("/{id}")
    fun updateProblem(@PathVariable id: Long, @RequestBody problem: Problem): Problem = problemService.updateProblem(id, problem)

    @DeleteMapping("/{id}")
    fun deleteProblem(@PathVariable id: Long): ResponseEntity<Void> {
        problemService.deleteProblem(id)
        return ResponseEntity<Void>(HttpStatus.NO_CONTENT)
    }

    // Add more methods as needed for CRUD operations
}

여기서 /problems라는 하나의 자원(동사가 아닌명사)에 대해서 GET, POST, PUT, DELETE를 모두 사용하는 RESTFul 권장 패턴을 사용하고 있다. problems로 복수형으로 지칭하는 것도 권장 가이드.

 

실제로 호출해 보는 것은 curl을 써도 되지만, postman을 쓰면 편하게 할 수 있다.

postman은 여기서 다운받고,

실행후 new > HTTP를 통해서 GET, POST, PUT, DELETE를 하나씩 테스트 하면된다.

GET은 문제 전체나 특정 id를 받아올 수 있고 아래처럼 url만 지정하고 SEND하면 된다.(결과에서 200 OK확인)

POST는 문제를 등록하는 과정이고, 

headers 탭에 Key: Content-Type, Value: application/json 을 추가하고

body에서 raw를 선택하고 아래처럼 문제내용을 입력하고 SEND해주면 된다.(결과에서 200OK확인)

PUT은 문제를 업데이트 하는 과정이고 POST와 마찬가지 설정으로 하면 된다.(POST, PUT만 다르고 나머진 동일)

DELETE는 지우고자 하는 문제ID를 url끝에 넣어주기만 하면 되고, 200OK가 아닌 204 No Content가 나오면 성공.

 

MVC에서 M과 C를 연결하는 비즈니스 로직에 해당하는 Service클래스를 아래와 같이 작성

package com.sevity.problemservice.service

import com.sevity.problemservice.domain.Problem
import com.sevity.problemservice.domain.ProblemRepository
import org.springframework.stereotype.Service

@Service
class ProblemService(private val problemRepository: ProblemRepository) {

    fun getAllProblems(): List<Problem> = problemRepository.findAll()
    fun createProblem(problem: Problem): Problem = problemRepository.save(problem)
    fun getProblem(id: Long): Problem = problemRepository.findById(id).orElseThrow { NoSuchElementException("Problem not found") }
    fun updateProblem(id: Long, problem: Problem): Problem {
        val existingProblem = problemRepository.findById(id).orElseThrow { NoSuchElementException("Problem not found") }
        val updatedProblem = existingProblem.copy(
            title = problem.title,
            description = problem.description,
            exampleInput = problem.exampleInput,
            exampleOutput = problem.exampleOutput
        )
        return problemRepository.save(updatedProblem)
    }
    fun deleteProblem(id: Long) {
        val existingProblem = problemRepository.findById(id).orElseThrow { NoSuchElementException("Problem not found") }
        problemRepository.delete(existingProblem)
    }

}

 

트러블슈팅


원격으로 열었더니 ._로 시작하는 중복파일들이 생길 경우

._로 시작하는 파일들은 macOS에서 생성하는 메타데이터 파일들이다. 이 파일들은 macOS 외의 시스템에서는 필요하지 않으며 삭제해도 안전하다. 다음 명령으로 삭제

find . -name '._*' -type f -delete

1회 지운 이후로 문제가 생긴적은 없다.

 

javax.persistance 관련 오류

springboot 버전을 3이상으로 했더니, import javax.persistance 에 실패하는 현상 발견됨

여기 참고해서, pom.xml에서 아래처럼 2.5.3으로 버전을 낮춰서 해결

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

 

 

 

반응형

kafka란

RabbitMQ와 같은 메시지 미들웨어.

LinkedIn에서 개발되어 현재는 Apache Software Foundation의 일부인 오픈 소스 메시지 스트리밍 플랫폼.
대량의 실시간 데이터 스트리밍을 처리하는 데 초점을 맞추고 있고, 높은 처리량, 데이터 복제, 분산 처리 등을 지원

이를 통해 대규모 데이터를 실시간으로 처리할 수 있다.

MSA간 통신에도 자주 쓰임

 

브로커: Kafka에서 메시지를 처리하는 서버

  • 카프카에서는 이벤트를 구분하기 위한 단위로 '토픽'을 사용하며 파일시스템의 디렉토리와 비슷한 개념으로 이해 가능
    • 토픽은 게시/구독 메시징 시스템에서 흔하게 볼 수 있다.
      • 카프카 외적으로는 비슷한 개념에 대해 채널, 큐, 스트림이라는 용어를 사용하기도 함
        • 개인적으로는 채널이 더 직관적이네
    • 프로듀서는 데이터를 '토픽'에 넣고, 컨슈머는 '토픽'에서 데이터를 읽는다.
    • 이러한 토픽은 카프카 클러스터내에서 데이터를 분류하고 구독자가 관심있는 메시지만 구독할 수 있도록 해주는 중요한 역할
    • 카프카 클러스터 전체가 하나의 토픽만 사용한다면 생략도 가능한가? > NO.
  • 토픽의 데이터는 파티션에 분산 저장되며 각 파티션은 순서 유지가 되는 메시지의 로그로 구성
  • 하나의 브로커는 여러 파티션의 데이터를 저장할 수 있으며, 반대로 하나의 파티션도 여러 브로커에 복제될 수 있음에 주의
    • 단 leader, follower구조를 가지면 복제는 읽기성능개선용이 아니라 장애 대비용이다(active-standby)
  • 파티션의 개수는 토픽의 병렬처리 능력을 결정

리더 브로커: 특정 메시지그룹(파티션)의 처리를 담당하는 서버. Kafka에서 각 데이터 파티션에는 하나의 리더 브로커가 있음

  • 리더 브로커는 파티션과 1:1의 관계를 가지며 하나의 파티션을 여러 브로커가 처리할경우 하나만 리더이고 나머지는 팔로워로 동작

 

ZooKeeper

  • 일종의 비서역할로 서버들의 상태관리나 역할분배를 Kafka를 위해서 수행함. Kakfa설치시 같이 설치됨.
  • 또는 일종의 '중앙 데이터 저장소' 역할. Kafka 시스템 내의 여러 서버들이 ZooKeeper를 통해 필요한 정보 공유하고 동기화.
  • 특정 서버에 문제가 생기면 이 정보가 ZooKeeper에 기록되고 다른 서버들이 이를 확인하여 적절히 대응

 

심화

토픽, 키, 밸류가 있을때 키별로 파티션으로 분배되고(특정 키는 특정파티션으로 들어감) 데이터 전송순서는 키별로만 보장됨

그룹id개념

  • 컨슈머 관점에서 소비하는 팀이 여러개임을 가정하고 group id를 통해서 별도 offset관리가 가능함

MSA간 통신에 API(Rest, gRPC)보다 kafka가 스루풋에 유리한 이유

  • Kafka는 데이터를 디스크에 순차적으로 추가(write)만 합니다.
  • Zero Copy + Sendfile() 사용(유저공간 메모리에 올리지 않고 커널 버퍼만 탐)
  • Kafka는 메시지를 1건씩 처리하지 않고, batch로 묶어 전송하고 압축까지함
  • 파티셔닝으로 병렬 처리

TPS비교항목

  API 서버 (REST/gRPC 등) Kafka (Broker 기준)
TPS 수준 수천~수만 TPS (10K~50K) 수십만~수백만 TPS (100K~1M 이상)
지연 시간 수 ms ~ 수백 ms 수 ms ~ 수십 ms

하이브리드도 많이 씀

  • 주문 API → Kafka enqueue → 매칭엔진

Producer > Broker > Consumer

  • Producer > Broker 구간은 push (1-5ms)
  • Broker > Consumer 구간은 pull방식 (5-20ms)

 

Kafka에서 “Exactly-Once”를 보장하려면 트랜잭션이 필요

Kafka는 기본적으로 At-Least-Once(최소 1번) 처리 모델.
Producer가 실패 시 재시도하면 중복 메시지가 발생할 수 있기 때문.

그걸 방지하려면:

  1. 중복 없이 보내야 하고 (Producer → Kafka)
    1. 아래와 같은 부분성공 시나리오때문에 트랜색션처리 필요
      1. A 토픽엔 성공, B 토픽엔 실패 → 시스템 불일치
      2. 배치 중 네트워크 끊김 → 일부 메시지만 브로커에 기록
    2. 구현방법
        1. enable.idempotence=true → 중복 방지
        2. 트랜잭션을 쓰려면
          • transactional.id 를 설정하고
          • initTransactions() 호출 후
          • beginTransaction()/commitTransaction() (또는 abortTransaction()) 을 직접 사용해야 합니다.
        즉, 두 기능은 별개지만, 트랜잭션을 쓰려면 반드시 Idempotent Producer 가 활성화된 상태여야 합니다.
  2. 커밋된 메시지만 읽고 (Kafka → Consumer)
    1. isolation.level=read_committed 로 설정하면, 트랜잭션 커밋된 메시지(=commitTransaction())만 컨슈머가 보도록 함
  3. 소비한 메시지는 한 번만 처리하고 커밋해야 함 (Consumer → DB)
    1. "enable.auto.commit=false"  자동 commit끔
    2. 외부api호출과 연계한다면 호출성공후 outbox OK처리
      1. 이것도 외부api호출후 시스템이 죽으면 완전한 exactly-once는 안되는데, 멱등성 로직으로 보완(외부 api호출시 멱등성처리)
      2. 또는 saga패턴을 통한 보상 매커니즘 작동

 

장애대응

로깅이 아닌 핵심컴포넌트로 사용시 

1. 고가용성 세팅: 브로커 3대이상 운영, 하나 죽어도 자동리더선출로 단일 브로커 장애 정도는 무중단으로 흡수할 수 있어야 함

2. Producer acks=all설정: 모든 브로커가 받아야 다음 진행

3. Producer auto.offset.commit → false : 오프셋 관리 꼼꼼하게

4. Producer 반복된 전송실패시 임시저장소에 적재하고 replay worker가 주기적으로 꺼내서 재전송시도(3회정도 실패하면 실패로그 기록해서 스레드를 계속차지하지 않게 함)

Kafka 전송 실패 → 3회 retry (100ms 간격) → 그래도 실패 → Redis fallback 저장 → @Scheduled replay worker가 Kafka 재전송 시도 → 여전히 실패 시 운영 Slack 알림 or DLT 적재

5. Consumer DLT

  • 소비중 Json 파싱에러등으로 오류가 발생하면 DLT로 보내고 메인 서비스는 정상적으로 계속처리
  • DLT를 나중에 따로 모아서 분석/재처리/알림 등 가능
  • 주 토픽 (orders)
       └── Consumer 처리 중 에러 발생
            └── Dead Letter Topic (orders.DLT)로 해당 메시지 저장, replay consumer가 소비, 도는 운영이슈로.

6. 고가용성세팅에도 불구하고 모든 브로커가 죽었을때: 단일 장애지점(SPOF)가 되지 않도록 별도 로깅시스템(역시 카프카지만)으로 S3 > hiveTable구조를 만들고 복원에 사용

 

고객관점

방법1. react로 500ms등 타임박스를 설정하고, 비동기로 몇차리 전송시도후 안되면 실패로 판정하고 고객에게 안내(다시해보세요 등)

방법2. 완전비동기로 설계하여 고객에게는 주문이전송되었습니다 정도로하고 성공은 비동기로 고객에게 전달

다만, 현재 페이지가 유지될경우 websocket을 통해 상태메시지 갱신. 이때 api서버와 push서버는 분리해주는 게 좋음(영향을 줄 수 있으므로)

방법3. 완전동기화. 모래시계로 외부api가 timeout날때까지 기다리거나 하는 방식인데 권장되지는 않음. 정말 중요한 프로세스면 선택할수도

 

kafka설치과정

Java설치

카프카는 Java기반이므로 java jdk가 안깔렸으면 깔아준다.

sudo apt update
sudo apt install default-jdk

사용자계정에 종속된 설치보다는 여러명이 쓸 수 있도록 아래 처럼 전용 유저를 생성해주는게 좋다.

sudo adduser kafka
sudo adduser kafka sudo

카프카는 자바 기반이므로 jar파일을 받아서 설정해준다. apt등의 패키지 매니저를 사용하면 예전버전일수 있으므로 다운로드해서 해보자.

mkdir ~/Downloads
curl "https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz" -o ~/Downloads/kafka.tgz
mkdir /opt/kafka && cd /opt/kafka
tar -xvzf ~/Downloads/kafka.tgz --strip 1


#kafka디렉토리 소유권을 kafka사용자로 변경
sudo useradd kafka -m
sudo chown -R kafka:kafka /opt/kafka

/opt/kafka에 설치했으며, /opt 디렉토리는 선택적인(add-on) 애플리케이션 SW패키지를 위한 곳으로 여러사용자가 이용하는 라이브러리인 경우 선호되는 경로다.

 

zookeeper설정

zookeeper는 kafka와 독립적으로 실행되며, 비슷한 설정 매커니즘을 갖는다.

zookeeper설정파일 수정

sevity@sevityubuntu:/opt/kafka/config$ cat zookeeper.properties
# the directory where the snapshot is stored.
dataDir=/var/cache/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

다른 부분은 그냥 두었고, dataDir은 /var/cache 밑에 두어 휘발되지 않도록 설정했다. (기본은 /tmp 밑에 있었던듯)

이를 위해 다음과 같은 권한 설정을 했다.

sudo mkdir /var/cache/zookeeper
sudo chown kafka:kafka /var/cache/zookeeper
sudo chmod 700 /var/cache/zookeeper

 

zookeeper에 대해 systemctl에 등록해서 부팅시 마다 실행되도록 하기위해 다음 파일 작성

sudo vi /etc/systemd/system/zookeeper.service

[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

# 그 다음 systemctl을 통해 시작하고 재부팅 시에도 항상 실행되도록 설정
sudo systemctl start zookeeper
sudo systemctl enable zookeeper

kafka에 대해서도 마찬가지로 작성

sudo vi /etc/systemd/system/kafka.service

[Unit]
Description=Apache Kafka server
Documentation=http://kafka.apache.org
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

# 그 다음 systemctl을 통해 시작하고 재부팅 시에도 항상 실행되도록 설정
sudo systemctl start kafka
sudo systemctl enable kafka

 

트러블 슈팅

서버로그 실시간 모니터링

tail -f /opt/kafka/logs/server.log

 

jps로 kafka가 잘 실행되고 있는지 확인(QuorumPeerMain과 Kafka가 떠있으면OK)

su - kafka
jps
109092 Jps
108186 QuorumPeerMain(Zookeeper의 시작점)
108573 Kafka

 

 

토픽 메시지가 잘 수신되는지 확인하는 방법

가장 기본적으로는 9092포트를 리슨하고 있는지 확인

sevity@sevityubuntu:/opt/kafka/config$ lsof -i :9092
COMMAND    PID   USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
java    168958 sevity  116u  IPv6 1324110      0t0  TCP sevityubuntu:46330->sevityubuntu:9092 (ESTABLISHED)
java    168958 sevity  117u  IPv6 1320714      0t0  TCP sevityubuntu:46336->sevityubuntu:9092 (ESTABLISHED)

 

GetOffsetSell을 통해 특정 토픽에 게시된 메지시수를 확인

sevity@sevityubuntu:/opt/kafka$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic iis_log --time -1
iis_log:0:4043038

kafka-console-consumer.sh를 사용하여 특정 토픽의 메시지 수신상황을 실시간으로 확인

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic iis_log --from-beginning
{"@timestamp":"2023-08-01T01:18:34.407Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.9.0"},"agent":{"version":"8.9.0","ephemeral_id":"98df9137-96e3-4653-b353-2821eebde875","id":"cf839823-29c8-4619-b9c4-85e90804e50e","name":"SEVITY-PC","type":"filebeat"},"ecs":{"version":"8.0.0"},"log":{"file":{"path":"c:\\inetpub\\logs\\LogFiles\\W3SVC1\\u_ex230801.log"},"offset":16076},"message":"2023-08-01 01:17:24 192.168.0.6 GET /wiki/doku.php id=Advice%20To%20Relieve%20Your%20Acid%20Reflux%20Signs%20and%20symptoms 80 - 196.245.181.117 Mozilla/5.0+(X11;+Ubuntu;+Linux+x86_64;+rv:114.0)+Gecko/20100101+Firefox/114.0 http://sevity.com/ 200 0 0 1182","input":{"type":"log"},"host":{"mac":["00-15-83-EA-2C-EE","00-23-24-63-E7-E3","02-50-41-00-00-01","88-36-6C-F7-D9-04","88-36-6C-F7-D9-06","88-36-6C-F7-D9-07"],"hostname":"sevity-pc","name":"sevity-pc","architecture":"x86_64","os":{"family":"windows","name":"Windows 10 Enterprise","kernel":"10.0.19041.3208 (WinBuild.160101.0800)","build":"19045.3208","type":"windows","platform":"windows","version":"10.0"},"id":"dbb18b3d-fb42-49ec-b731-f430dd2f3fd5","ip":["fe80::a30b:d99a:f7ed:2bac","192.168.100.15","fe80::a4ce:c09a:ea91:8a9","169.254.142.88","fe80::9c93:77c8:66bd:b6c5","169.254.55.217","fe80::dd77:ec13:69c4:6922","169.254.56.104","fe80::310a:1b3d:e9a3:431c","192.168.0.6","fe80::2c27:d03a:a322:765b","169.254.236.36"]}}

 

반응형

'Programming > Linux' 카테고리의 다른 글

Nginx  (1) 2023.08.16
Redis  (0) 2023.08.10
ELK연습  (0) 2023.07.30
스팍 - 실습 - 웹서버 세션분석  (0) 2023.07.30
스팍(Spark) 설치  (0) 2023.07.30

먼저 IIS서버에 Filebeat를 깔아서 우분투 분석 서버에 kafka전송을 한다.

반응형

'Programming > Linux' 카테고리의 다른 글

Redis  (0) 2023.08.10
kafka  (0) 2023.07.31
스팍 - 실습 - 웹서버 세션분석  (0) 2023.07.30
스팍(Spark) 설치  (0) 2023.07.30
하둡 - 실습 - 웹서버 세션분석  (1) 2023.07.29

+ Recent posts