스트림즈DSL 라이브러리 추가 kafka-streams를 추가한다. 강의에선 사용하는 버전이 2.5.0이라 해당 버전으로 하였지만 자신이 사용하는 버전에 맞게 설치하면 된다. 필터링 스트림즈 애플리케이션 위처럼 filter() 메서드를 이용하여 stream_log 토픽에서 들어온 메시지 키 또는 값에 대하여 원하는 조건에 맞게 처리하도록 할 수 있다. filter() 메서드는 스트림즈DSL에서 사용 가능한 필터링 스트림즈 프로세서이다. 예로 간단하게 로그의 value 문자열의 길이가 일정길이 이상인 경우만 값을 취하는 필터링 코드를 작성할 수 있다. 코드 private static String APPLICATION_NAME = "streams-filter-application"; private stati..
카프카 스트림즈 카프카에서 공식적으로 지원하는 라이브러리로, 토픽에 적재된 데이터를 실시간으로 변환하여 다른 토픽에 적재하는 라이브러리이다. 스트림즈 애플리케이션 또는 카프카 브로커의 장애가 발생하더라도 Exactly Once를 보장할 수 있도록 장애 허용 시스템 (fault tolerant system)을 가지고 있어서 데이터 처리 안정성이 매우 뛰어나다. 만약 토픽에 있는 데이터를 실시간 스트림 처리 필요가 있다면 스트림즈 애플리케이션을 개발하는 것을 1순위로 고려하는 것이 좋다! 프로듀서와 컨슈머를 조합하지 않고 스트림즈를 사용하는 이유 스트림 데이터 처리에 있어 필요한 다양한 기능을 스트림즈 DSL로 제공하기 때문이다. 이에 필요한 프로세서 API를 사용하여 쉽게 코드로 구현이 가능하다는 장점이 ..
멱등성이란? 전달 신뢰성으로 여러번 연산을 수행하더라도 동일한 결과를 나타내는 것을 뜻한다. 따라서 이러한 멱등성 프로듀서는 동일한 데이터를 여러번 전송해도 카프카 클러스터에 단 한번만 저장되게 된다. 반면에, 기본 프로듀서의 동작 방식은 적어도 한번 전달(at least once delivery)를 지원한다. 여기서 적어도 한번 전달이란 프로듀서가 클러스터에 데이터를 전송하여 저장할 때 적어도 한번 이상 데이터를 적재할 수 있고 데이터가 유실되지 않음을 의미한다. 하지만 두 번 이상 적재할 가능성이 있으므로 데이터의 중복이 발생할 수 있다. 메시지 전달에는 아래와 같이 3가지의 경우가 존재한다. at least once: 적어도 한번 이상 전달 at most once: 최대 한번 전달 exactly o..
컨슈머 애플리케이션 컨슈머가 poll()하는 코드 코드 실행 후 아래와 같이 프로듀서에서 메시지를 보내면 앱 실행 화면에서 응답을 확인할 수 있다. 실행중인 애플리케이션의 로그를 통해서도 확인이 가능하다. 동기 오프셋 커밋 컨슈머 poll() 메소드가 호출된 이후에 commitSync() 메소드를 호출함으로써 오프셋 커밋을 명시적으로 수행할 수 있다. commitSync()는 poll()로 받은 가장 마지막 오프셋을 기준으로 커밋을 하게 된다. 참고로, 동기 오프셋 커밋을 사용하는 경우 모든 레코드의 처리가 끝난 이후 commitSync()를 호출해야한다. 프로듀서에서 메시지를 보내면 정상적으로 메시지가 출력되고 커밋이 이뤄지게 된다. 오프셋 값은 kafka-consumer-groups 명령어로 특정 그..
컨슈머 프로듀서가 전송한 데이터는 브로커에 적재된다. 컨슈머는 이 적재된 데이터를 사용하기 위해 브로커로부터 데이터를 가져와서 필요한 처리를 담당한다. 예를 들어 마케팅 문자를 고객에게 보내는 기능이 있다면 컨슈머는 토픽으로부터 쌓여있는 고객 데이터를 가져와서 문자 발송 처리를 진행하게 된다. 컨슈머 내부 구조 Fetcher: 리더 파티션으로부터 레코드들을 미리 가져와서 대기하는 역할 poll(): Fetcher에 (이미)있는 레코드들을 리턴한다. ConsumerRecords: 처리하고자 하는 레코드들의 모음이다. 이때 처리된 레코드는 커밋으로 체크된다. 이 레코드에는 오프셋이 포함되어 있으므로 읽은 위치를 확인할 수 있게 된다. 따라서, 처리를 완료한 레코드에 대해서는 commit을 수행하여 처리한 레..
프로듀서 애플리케이션 개발 위 코드를 실행 시키면 실제 test 토픽에 testMessage 레코드가 쌓인것을 확인할 수 있다. 메시지 키를 가진 레코드를 전송하는 프로듀서 키값을 같이 확인하기 위해 --property옵션을 같이 넣어서 실행시켜준다. 키값 없이 보냈던 메시지의 키는 null로 나오고 그 외의 경우 같이 출력되는 것을 확인할 수 있다. 레코드에 파티션 번호를 지정하여 전송하는 프로듀서 파티션을 직접 지정하고 싶다면 토픽이름, 파티션 번호, 메시지 키, 메시지 값을 차례로 입력하여 보내면 된다. 특정 파티션 (3번)에 메시지를 보내고 컨슈머에서 해당 파티션의 메시지를 읽을 수 있다. 커스텀 파티셔너를 가지는 프로듀서 config에서 PARTITIONER_CLASS_CONFIG에 직접 작성한..
프로듀서 프로듀서 애플리케이션의 역할 카프카에 필요한 데이터를 선언하고 브로커의 특정 토픽의 파티션에 전송한다. 데이터를 전송할 때 리더 파티션을 가지고 있는 브로커와 직접 통신한다. 브로커로 데이터를 전송할 때 내부적으로 파티셔너, 배치 생성 단계를 거친다. 프로듀서 내부 구조 ProducerRecord: 프로듀서에서 생성한 레코드, 오프셋은 미포함 send(): 레코드 전송 요청 메소드 Partitioner: 어느 파티션으로 전송할 지 지정하는 파티셔너이다. Accumulator: 배치로 묶어서 전송할 데이터를 모으는 버퍼이다. send()를 해도 바로 보내는것이 아니라 Accumulator내 배치의 임계치까지 데이터가 쌓이면 그때 카프카 클러스터와 TCP 통신하여 데이터를 전송하게 된다. 이러한 배..
카프카 CLI tool 토픽 관련 명령을 실행할 때 필수 옵션과 선택옵션이 존재한다. 해당 값들이 어떻게 설정되어있는지, 어떻게 설정할지 확인 후 CLI 툴을 사용하자. zookeeper-server.start.sh / kafka-server-start.sh 카프카 실행을 로컬에서 진행할 것이므로 로컬 통신을 위한 호스트를 지정해주자. 로그와 인덱스 등의 파일을 저장하기 위한 위치도 지정해준다. etc/hosts 파일에 localhost 주소에 대한 "my-kafka"라는 별칭을 추가해준다. kafka-topics.sh 카프카 토픽에 대해 CLI를 진행해보자. kafka-topcis.sh를 통해 토픽 생성이 가능하다. 카프카 토픽을 생성 시 디폴트 값으로 설정된 것을 확인해 볼 수 있다. 다음과 같이 처..
1. 카프카 역사와 미래 프로듀서 -토픽 - 컨슈머 구조 토픽 내 여러 파티션이 존재하며, 프로듀싱은 하나의 파티션에만 쌓인다. 파티션은 큐 구조로 FIFO로 쌓이지만 컨슈밍하여도 제거되지 않는다. 읽은 위치는 commit으로 기록한다. 높은 처리량 프로듀서 → 브로커, 브로커 → 컨슈머로 데이터를 전송할 때 묶음 단위 전송으로 속도를 개선 여러 파티션에 분배하고, 파티션 수 만큼 컨슈머를 늘림으로서 병렬처리를 하여 시간당 데이터 처리량 개선 가능 확장성 특정 이벤트가 몇건 들어올 지 모를 때 가변적으로 대응할 수 있다 데이터를 저장하고 있는 곳이 브로커이며, 사용량이 적고 많은에 따라 브로커 scale-in/out이 가능 영속성 다른 메시징 플랫폼과 다르게 전송받은 데이터를 메모리가 아닌 파일 시스템에..
- Total
- Today
- Yesterday