티스토리 뷰

스트림즈DSL 라이브러리 추가

  • kafka-streams를 추가한다. 강의에선 사용하는 버전이 2.5.0이라 해당 버전으로 하였지만 자신이 사용하는 버전에 맞게 설치하면 된다.

필터링 스트림즈 애플리케이션

  • 위처럼 filter() 메서드를 이용하여 stream_log 토픽에서 들어온 메시지 키 또는 값에 대하여 원하는 조건에 맞게 처리하도록 할 수 있다.
  • filter() 메서드는 스트림즈DSL에서 사용 가능한 필터링 스트림즈 프로세서이다.
  • 예로 간단하게 로그의 value 문자열의 길이가 일정길이 이상인 경우만 값을 취하는 필터링 코드를 작성할 수 있다.

코드

private static String APPLICATION_NAME = "streams-filter-application";
private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
private static String STREAM_LOG = "stream_log";
private static String STREAM_LOG_FILTER = "stream_log_filter";

public static void main(String[] args) {

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> streamLog = builder.stream(STREAM_LOG);
//        KStream<String, String> filteredStream = streamLog.filter(
//                (key, value) -> value.length() > 5);
//        filteredStream.to(STREAM_LOG_FILTER);

    streamLog.filter((key, value) -> value.length() > 5).to(STREAM_LOG_FILTER);


    KafkaStreams streams;
    streams = new KafkaStreams(builder.build(), props);
    streams.start();
}
  • 추가적인 프레임워크 없이 단순 자바 메인코드로 작성이 가능하다.
  • stream() 메서드로 특정 토픽에 있는 메시지를 가져오고, filter()로 원하는 필터링을 수행하고, to() 메서드로 특정 토픽에 데이터를 저장하게 된다.

테스트

1. 스트림을 위해 source를 위한 topic과 destination을 위한 topic 2개를 생성해준다.

2. 토픽을 생성했으면 메시지를 토픽에 쌓아주기 위한 stream_log 프로듀서 와 필터링 되어 쌓여있는 토픽을 읽기위한 stream_log_filter 컨슈머를 작동시킨다.

3. consumer에서 필터링된 메시지만 확인할 수 있다.

스트림즈DSL - KTable과 KStream을 join()

  • KTable은 키를 기준으로 하나의 값만 가지는 Map과 같은 구조라고 설명했으며, 주소와 같은 최신 데이터 하나만을 관리할 때 사용한다.
  • KStream은 지속적으로 들어노는 값을 가지는 구조이며 모든 이력들을 관리할 때 사용한다.

카프카의 join()의 특징으로 지금까지 DB 조인의 경우 정적으로 저장된 데이터에 대해서만 조인을 사용했지만 카프카에서는 실시간으로 들어오는 데이터들을 조인할 수 있다는 점이 있다.

따라서 위 주문과 주소 데이터에 대해 조인을 하므로서 사용자의 이벤트 데이터를 DB에 저장하지 않고도 들어오는 주문마다 어느 주소로 보내야 하는지 처리할 수 있게 된다. 이렇게 이벤트 기반 스트리밍 데이터 파이프라인을 구성할 수 있다.

위 구조처럼 somin이라는 사용자 키에 대해서 stream에서는 iPhone 주문이 table에서는 Busan 주소가 존재하여 join을 통해 somin의 iPhone주문을 Busan주소로 보내도록 이벤트를 만들 수 있게된다.

추가로 somin의 주소가 Jeju로 변경되더라도 table은 최신 데이터를 갖게 되므로 변경된 Jeju주소로 처리하게 된다.

이때, stream에 들어온 키:값에 대해서 table에 키가 없다면 처리하지 못하게 되므로 주의하자.

 

테스트

1. 먼저 주소, 주문, 주문_조인 토픽 3개를 생성해준다.

  • 이때 co-partitioning을 위해 조인 대상이 되는 주문과 주소 토픽은 파티션 3개로 꼭! 갖게 맞춰야 함을 인지하자.

2. 주문과 주소 데이터를 위해 address, order 토픽에 producer를 실행 후 메시지를 생성해준다.

  • 이때 주의할 점으로 join()은 자동으로 메시지 키를 기준으로 적용되므로 꼭 조인이 되길 바라는 메시지들의 키를 맞춰줘야 한다.

3. 조인을 위한 스트림즈 애플리케이션을 실행시켜준다.

public class KStreamJoinKTable {

    private static String APPLICATION_NAME = "order-join-application";
    private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private static String ADDRESS_TABLE = "address";
    private static String ORDER_STREAM = "order";
    private static String ORDER_JOIN_STREAM = "order_join";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> addressTable = builder.table(ADDRESS_TABLE);
        KStream<String, String> orderStream = builder.stream(ORDER_STREAM);

        orderStream.join(addressTable, (order, address) -> order + " send to " + address).to(ORDER_JOIN_STREAM);

        KafkaStreams streams;
        streams = new KafkaStreams(builder.build(), props);
        streams.start();

    }
}

join()이 이뤄지고 to()order_join 토픽으로 보낸 데이터를 consumer를 통해 확인할 수 있다. 또한 address 프로듀서로 groom의 주소가 추가로 들어오면서 table이 업데이트 되고 groom에 새 order가 들어오면 해당 주소로 join()이 이뤄지는 것을 볼 수 있다.


스트림즈DSL - GlobalKTable과 KStream을 join()

order 토픽과 address 토픽은 코파티셔닝 되어 있으므로 각각 KStream과 KTable로 선언해서 조인을 할 수 있었다. 하지만 코파티셔닝 되어있지 않은 토픽을 조인하는 경우는 바로 할 수 없으며 2가지 방법으로 조인이 가능하다.

  • 첫번째 방법. 리파티셔닝을 수행하여 코파티셔닝이 된 상태로 조인을 진행한다.
  • 두번째 방법. KTable로 사용하는 토픽을 GlobalKTable로 선언하여 사용한다.

테스트

파티션이 2개인 address_v2 토픽

1. 기존 토픽과 파티션 수가 다르도록 2개로 설정하여 address_v2 토픽을 생성한다. 이제 order 토픽과는 파티션 수가 다르므로 코파티셔닝 되지 않은 상태이다.

 

2. GlobalKTable과 KStream을 조인하는 애플리케이션을 작성 후 실행한다.

public class KStreamJoinGlobalKTable {

    private static String APPLICATION_NAME = "global-table-join-application";
    private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private static String ADDRESS_GLOBAL_TABLE = "address_v2";	// 새로만든 파티션 2개 토픽
    private static String ORDER_STREAM = "order";
    private static String ORDER_JOIN_STREAM = "order_join";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        GlobalKTable<String, String> addressGlobalTable = builder.globalTable(ADDRESS_GLOBAL_TABLE);
        KStream<String, String> orderStream = builder.stream(ORDER_STREAM);

        orderStream.join(addressGlobalTable,
                (orderKey, orderValue) -> orderKey,					// 조인할 키를 설정
                (order, address) -> order + " send to " + address)  // 전송할 메시지 값 설정
                .to(ORDER_JOIN_STREAM);

        KafkaStreams streams;
        streams = new KafkaStreams(builder.build(), props);
        streams.start();

    }
}

3. address_v2와 order 토픽에 프로듀서로 메시지를 전송 후 order_join 토픽에 쌓인 메시지를 컨슈머로 확인해본다.

위와 같이 파티션이 3개인 order 토픽과 파티션이 2개인 address_v2 토픽을 KStream과 GlobalKTable을 통해 join()을 수행하여 데이터를 전송하고 order_join 토픽에서 컨슈밍 받는것을 확인할 수 있다.

이때, 위에서 말했듯이 GlobalKTable의 전략은 자신의 모든 메시지를 조인 대상 토픽인 KStream의 모든 파티션에 데이터를 보내어 join을 수행하는 것이다. 따라서 부하가 심할 수 있으니 항상 주의하도록 하자!


스트림즈DLS - window processing

윈도우 연산은 스트림 데이터를 분석할 때 가장 많이 활용되는 프로세싱이다.

  • 특정 시간대에 대응하여 취합 연산을 처리할 때 활용한다.
  • 카프카 스트림즈는 4가지의 윈도우 프로세싱 방법을 지원한다.
    • 텀블링 윈도우, 호핑 윈도우, 슬라이딩 윈도우, 세션 윈도우
  • 모든 프로세싱은 메시지 키를 기준으로 취합된다.
    • 따라서 해당 토픽에 동일한 파티션에는 동일한 케시지 키가 있는 레코드가 존재해야만 정확한 취합이 가능하다.

텀블링 윈도우

  • 서로 겹치지 않은 윈도우를 특정 간격으로 지속 처리할 때 사용한다.
  • 위 사진의 경우 5 단위마다 처리하고 있다.
  • 윈도우 최대 사이즈에 도달하면 해당 시점에 데이터를 취합하여 결과를 도출한다.
    • 따라서, 매번 요청이 올때마다 처리하지 않고 bulk 처리를 하므로 더욱 효율적으로 연산을 수행할 수 있다.
  • 단위 시간당 데이터가 필요할 경우 사용할 수 있다.
    • ex) 매 5분간 접속한 고객의 수를 측정하여 방문자 추이를 실시간 취합하는 경우

호핑 윈도우

  • 겹치는 윈도우를 갖는다.
  • 일정 시간 간격으로 겹치는 윈도우가 존재하는 윈도우 연산을 처리할 때 사용한다.
  • 윈도우 사이즈를 위한 변수와 윈도우 간격을 위한 변수 2가지를 갖는다.
    • 윈도우 사이즈: 연산을 수행할 최대 윈도우 크기
    • 윈도우 간격: 서로 다른 윈도우 간 간격
    • 위 예제 호핑 윈도우의 경우 윈도우 사이즈는 10, 윈도우 간격은 5이다.
  • 윈도우끼리 겹치지 않는 텀블링 윈도우와 다르게 동일한 키의 데이터가 서로 다른 윈도우에서 여러번 연산될 수 있다.

 

슬라이딩 윈도우

  • 데이터의 정확한 시간을 바탕으로 윈도우 사이즈에 포함되는 데이터를 모두 연산에 포함시킨다.
    • 정확한 시간이란? : 각각의 레코드에 포함된 timestamp의 시간을 말한다.
  • 이를 통해 특정 레코드들이 정확히 윈도우 사이즈 시간안에 포함되어있음을 파악할 수 있다.

세션 윈도우

  • 동일 메시지 키의 데이터한 세션에 묶어 연산할 때 사용한다.
  • 세션의 최대 만료시간까지 윈도우 사이즈를 갖게된다.
  • 구체적인 윈도우 사이즈가 있는 것이 아니라 세션의 최대 만료시간에 따라 윈도우 크기가 가변적이다.
  • 세션이 종료되고 해당 윈도우 내 모든 데이터를 취합 후 연산한다.

텀블링 윈도우 예제

  • 텀블링 윈도우를 사용하기 위해서 groupByKey, windowedBy를 사용해야 한다.
  • windowedBy의 파라미터는 텀블링 윈도우의 사이즈를 의미한다.
  • 텀블링 연산으로 출력된 데이터는 KTable로 커밋 interval마다 출력된다.

주의사항

텀블링 사이즈를 5초로 설정하고 input과 같이 데이터가 들어왔다고 가정하자.

이때 우리는 오른쪽 위 output을 예상할 것이다. 하지만 commit 간격이 3초라면 커밋 간격마다 출력하게 되어 3, 6, 9초에 output을 받게 된다.

그래서 실제 output은 위와같은 형태가 된다.

 

따라서, 3초대에 받은 (A,2 - 0초~5초) 데이터를 저장한 후 6초에 다시 받은 (A,3 - 0초~5초)로 다시 upsert하여 윈도우에 맞는 데이터를 구하도록 구현하는 것이 중요하다. (A,1 - 6초~10초)도 9초에 받은 (A, 2 - 6초~10초)로 다시 upsert 해야 할것이다.


스트림즈 DSL - Queryable store

KTable은 카프카 토픽의 데이터를 로컬 기반의 rocksDB에 Materialized View로 만들어 두고 사용한다.

  • 따라서 레코드의 메시지 키, 메시지 값을 기반으로 key-value Stroe로 사용이 가능하다.
  • 특정 토픽을 KTable로 사용하고 ReadOnlyKeyValueStroe로 뷰를 가져오면 메시지 키를 기반으로 토픽 데이터를 조회할 수 있다.
  • 마치 카프카를 사용하여 로컬 캐시를 구현한 것과 유사하다.

코드


프로세서 API

프로세서 API도 스트림즈와 같은 토폴로지 형태의 구조를 갖는다. 스트림즈 DSL이 데이터 처리, 분기, 조인을 위한 다양한 메서드들을 제공하지만 추가적인 상세 로직 구현이 필요한 경우 프로세서 API를 활용할 수 있다. 프로세서 API 단독 사용 시 스트림즈DSL의 KStream, KTable, GlobalKTable의 사용은 불가하며 함께 구현하여 사용할 때 사용이 가능하다.

 

프로세서 API를 구현하기 위해서는 Processor 또는 Transformer 인터페이스로 구현한 클래스가 필요하다.

  • Processor 인터페이스: 일정 로직이 이루어 진 뒤 다음 프로세서로 데이터를 넘기지 않을 때 사용
  • Transformer 인터페이스: 일정 로직이 이루어 진 뒤 다음 프로세서로 데이터가 넘길 때 사용

하지만 Processor에서도 `context`를 활용하면 넘길 수 있긴하므로 때에따라 넘긴다면 사용할 수 있다.

Processor 예시

스트림즈 DSL과 같은 토폴로지 형식이므로 사용시에도 아래처럼 source, filter, sync 프로세서를 설정하면 된다.

 

반응형
Comments
반응형
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday