티스토리 뷰

컨슈머

  • 프로듀서가 전송한 데이터는 브로커에 적재된다. 컨슈머는 이 적재된 데이터를 사용하기 위해 브로커로부터 데이터를 가져와서 필요한 처리를 담당한다.
    예를 들어 마케팅 문자를 고객에게 보내는 기능이 있다면 컨슈머는 토픽으로부터 쌓여있는 고객 데이터를 가져와서 문자 발송 처리를 진행하게 된다.
컨슈머 내부 구조

  • Fetcher: 리더 파티션으로부터 레코드들을 미리 가져와서 대기하는 역할
  • poll(): Fetcher에 (이미)있는 레코드들을 리턴한다.
  • ConsumerRecords: 처리하고자 하는 레코드들의 모음이다. 이때 처리된 레코드는 커밋으로 체크된다. 이 레코드에는 오프셋이 포함되어 있으므로 읽은 위치를 확인할 수 있게 된다.
    • 따라서, 처리를 완료한 레코드에 대해서는 commit을 수행하여 처리한 레코드를 기록해야 지속적인 컨슈밍이 가능하다.
컨슈머 그룹

3번째 방식처럼 일대일 대응으로 처리하는 것이 강력히 권장된다

  • 컨슈머 그룹은 일반적으로 동일한 로직을 갖는 컨슈머들의 모음이다.
  • 각 컨슈머들은 1개 이상의 파티션들에 할당되어서 데이터를 가져갈 수 있고, 동일 컨슈머그룹 내 컨슈머들은 동일한 처리를 하게 된다.
    • 따라서, 컨슈머 그룹내 컨슈머의 수는 가져가고자하는 토픽의 파티션 수보다 작거나 같다는 특징이 있다.
  • 토픽 내 파티션의 레코드들은 컨슈밍하여도 소멸되지 않으므로 다른 로직을 가진 컨슈머 그룹이 중복으로 해당 토픽을 컨슈밍하여 원하는 추가 로직을 적용할 수 있다.

Q. 컨슈머 그룹의 컨슈머가 파티션 개수보다 많을 경우 어떻게 될까?

A. 4개의 컨슈머로 이루어진 컨슈머 그룹으로 3개의 파티션을 가진 토픽을 컨슈밍 시 1개의 컨슈머는 유후 상태로 남는다. 이는 실질적인 데이터 처리를 못하는 불필요한 스레드이다.

컨슈머 그룹 활용 이유

왼쪽 구조 -> 오른쪽 구조로 개선

왼쪽과 같이 데이터 파이프라인이 구축되어 있을 때를 보자.

  • 이 경우는 수집한 데이터에 대해 동기적으로 엘라스틱 서치에 저장하고, 하둡에 저장하는 구조로 엘라스틱 서치에 장애가 발생한 경우 복구를 기다리거나, 하둡에만 쌓아야만 할 수 있다.

이러한 구조에서 카프카 컨슈머 그룹을 사용하므로서 파이프라인에 필요한 컨슈머 그룹을 구축하여 격리된 환경을 만들어 주는 것이다.

  • 이를 통해 엘라스틱 서치에 장애가 발생해도 복구 후 읽은 오프셋 부터 토픽을 다시 컨슈밍 하면 되며, 하둡은 이와 별개로 지속적인 컨슈밍이 이뤄질 수 있게 된다.

구조를 만들 때 우선적으로 토픽에 데이터를 쌓고, 이후 데이터가 필요한 곳이 어딘지 확인한 후 맞게 컨슈머 그룹을 만드는게 유연하게 설계할 수 있는 방법이다.

리밸런싱

컨슈머 그룹 내 컨슈머 일부에 장애가 발생하는 경우, 장애가 발생한 컨슈머에 할당된 파티션이 정상 컨슈머에 소유권이 넘어가게 되는데 이러한 과정을 일컫는다.

  • 컨슈머가 추가/제거 되는 두 상황에 발생한다. 이슈가 발생한 컨슈머는 빠르게 그룹내에서 제외하여 모든 파티션이 지속적으로 데이터가 컨슈밍 될 수 있도록 해야하고, 정상 컨슈머가 추가되면 다시 파티션을 일대일 대응 시켜 처리량을 높이도록 해야한다.

위 리밸런싱을 위한 코드 작성이 필요하다. 할당과 해제를 위한 메소드 각각을 작성해줘야 한다.

 

커밋

컨슈머는 브로커로부터 어디까지 데이터를 가져갔는지 기록하기 위해 commit()을 이용한다. 해당 커밋은 특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇번 오프셋을 가져갔는지 브로커 내부 토픽인 "__consumer_offsets"에 기록된다.

이때, 컨슈머 동작 이슈로 오프셋이 기록되지 못한다면 데이터 처리의 중복이 발생한다. 따라서 컨슈머 애플리케이션은 commit이 정상적으로 수행되었는지 검증해야 한다.

Assignor

컨슈머와 파티션 할당 정책은 컨슈머의 assignor에 의해 결정된다. 카프카 2.5.0 디폴트 assignor는 RangeAssignor이다.

  • RangeAssignor: 각 토픽에서 파티션을 숫자로 정렬, 컨슈머를 사전 순 정렬하여 할당한다.
  • RoundRobinAssignor: 모든 파티션을 컨슈머에서 번갈아가면서 할당한다.
  • StickyAssignor: 최대한 파티션을 균등하게 배분하면서 할당한다.

일반적으로 컨슈머 그룹의 컨슈머와 토픽 내 파티션을 일대일 대응하므로 어떤 어싸이너를 사용하더라도 큰 상관이 없다.

 

컨슈머 주요 옵션

필수옵션

bootstrap.servers
  • 프로듀서가 데이터를 전송할 브로커의 호스트이름:포트를 1개이상 작성.
  • 2개 이상의 브로커 정보를 입력하여 일부 브로커에 이슈가 발생해도 접속에 이슈가 없도록 설정할 수 있다.
key.deserializer, value.deserializer
  • 레코드의 메시지 키/값을 역직렬화하는 클래스를 지정한다.

선택옵션

group.id
  • 컨슈머 그룹 아이디를 지정한다. subscribe() 메서드로 토픽을 구독하여 사용할 때는 옵션값이 필수이다. 따라서 subscribe()와 group.id는 같이 다닌다.
auto.offset.reset
  • 컨슈머 그룹이 특정 파티션을 읽을 때 지정된 컨슈머 오프셋이 없는 경우 어느 오프셋 부터 읽을지 정한다.
  • 가장 먼저온 or 가장 최근인 오프셋을 결정한다. 기본값은 latest이다.
    • latest: 오프셋이 가장 큰, 가장 최근 오프셋부터 읽기 시작한다.
    • earliest: 오프셋이 가장 작은, 가장 오래된 오프셋부터 읽기 시작한다.
    • none: 커밋한 기록이 있는지 찾아보고, 없으면 오류를 있다면 있는 오프셋부터 읽기 시작한다.
  • 특정 토픽에 대해 이미 커밋한 이력이 있다면 무시된다. 즉, 토픽 파티션에 쌓다가 이제 컨슈밍하여 데이터를 사용하기로 결정되었을 때 처음 한번만 사용되는 옵션이다.
enable.auto.commit
  • 자동 커밋 or 수동 커밋을 결정. 기본값은 true이다.
auto.commit.interval.ms
  • 자동 커밋일 경우 오프셋 커밋 간 간격을 지정한다. 기본값은 5000ms로 5초이다.
max.poll.records
  • poll() 메소드를 통해 반환되는 레코드 개수를 지정한다. 기본값은 500이다.
session.timeout.ms
  • 컨슈머가 브로커와 연결이 끊기는 최대 시간이다. 기본값은 10000ms (10초)이다. 이 시간이 지나면 연결이 끊긴것으로 판단하고 리밸런싱을 진행한다.
hearbeat.interval.ms
  • 컨슈머가 브로커와 연결이 끊어졌는지 확인하는 하트비트 전송 시간의 간격이다. 이것을 보낸 후 "session.timeout.ms"가 지나도 보내지 않는다면 오류인것이다. timeout 또는 interval이 너무 길다면 오류를 제때 확인하지 못해서 제대로 컨슈밍이 되지 않을 수 있다. 너무 짧아도 리밸런싱이 잦게 일어나므로 부하를 줄 수 있다.
max.poll.interval.ms
  • poll() 메서드를 호출하는 간격의 최대 시간이다. 기본값은 300,000ms (5분)이다. poll()을 하고 5분이 지나도 poll()이 이뤄지지 않으면 컨슈머가 작동하지 않는다고 생각하고 리밸런싱이 일어난다.
    • 만약 처리에 오래걸리는 메시지인 경우 이 값을 올려야 쓸데없는 리밸런싱이 일어나는 것을 막을 수 있다. 처리가 신속히 이뤄지는 메시지이면 이를 짧게하여 에러상황을 빠르게 확인하게 할 수 있다.
isolation.level
  • 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 때 사용한다. 이를 사용하면 레코드를 트랜잭션으로 묶어서 atomic하게 보내게 된다. 잘 사용하지 않는다. (멱등성 프로듀서, 트랜잭션 프로듀서-컨슈머에서 사용)
반응형
Comments
반응형
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday