티스토리 뷰

컨슈머 애플리케이션

컨슈머가 poll()하는 코드

코드 실행 후 아래와 같이 프로듀서에서 메시지를 보내면 앱 실행 화면에서 응답을 확인할 수 있다.

실행중인 애플리케이션의 로그를 통해서도 확인이 가능하다.

 

동기 오프셋 커밋 컨슈머

poll() 메소드가 호출된 이후에 commitSync() 메소드를 호출함으로써 오프셋 커밋을 명시적으로 수행할 수 있다. commitSync()는 poll()로 받은 가장 마지막 오프셋을 기준으로 커밋을 하게 된다.

참고로, 동기 오프셋 커밋을 사용하는 경우 모든 레코드의 처리가 끝난 이후 commitSync()를 호출해야한다.

프로듀서에서 메시지를 보내면 정상적으로 메시지가 출력되고 커밋이 이뤄지게 된다. 오프셋 값은 kafka-consumer-groups 명령어로 특정 그룹에 대한 --describe 옵션으로 확인해 볼 수 있다.

 

동기 오프셋 커밋(레코드 단위) 컨슈머

앞에서 컨슈머가 poll()을 수행할 때 Fetcher에 이미 있는 레코드()을 꺼내온다고 했다. 따라서 poll()마다 커밋을 수행하는 경우 레코드 묶음단위 커밋이 발생한다.

이때 commitSymc(currentOffsetMap)처럼 매개변수로 각 레코드를 맵으로 만들어서 넣어주면 레코드 단위로 커밋을 수행할 수 있다. 다만 이렇게 되면 100개의 레코드를 처리하는 경우 100번의 커밋을 처리하게 되므로 데이터 처리 속도가 느려지게된다. 따라서 이러한 방법은 잘 사용되지 않는다.

 

비동기 오프셋 커밋 컨슈머

직접 커밋 방법을 조정하고 있으므로 ENABLE_AUTO_COMMIT_CONFIG를 false로 설정해야줘야 한다.

동기 오프셋 커밋을 사용하는 경우 커밋 응답을 기다리는 동안 데이터 처리가 중단되므로 처리속도가 느리다. 따라서 더 높은 처리량을 위해서는 비동기 오프셋 커밋인 commitAsync()을 사용해야 한다.

 

loop 내에서 비동기로 커밋을 진행하고 커밋의 응답값을 콜백함수로 처리하도록 할 수 있다. 아래는 반복적인 커밋 수행 후 성공 로그와 메시지 로그가 출력되는 모습이다.


리밸런스 리스너를 가진 컨슈머

리밸런스 발생을 감지하기 위해 카프카 라이브러리는 ConsumerRebalanceListener 인터페이스를 지원한다. 해당 인터페이스로 구현된 클래스는 onPartitionAssigned(), onParatitionRevoked() 메소드로 이뤄져있다.

  • onPartitionAssigned(): 리밸런스가 끝난 뒤에 파티션이 할당 완료되면 호출되는 메서드이다.
  • onParatitionRevoked(): 리밸런스가 시작되기 직전에 호출되는 메서드이다. 리밸런스 후 마지막으로 처리한 레코드를 기준으로 커밋을 하기 위해서 해당 메소드에 커밋을 구현하여 처리가 가능하다.
ConsumerRebalanceListener 구현체

  • 파티션이 할당될 때 (즉, 리밸런스가 끝난 뒤)와 리밸런스가 시작되기 직전에 해당 파티션을 출력하도록 하는 코드이다.

리밸런스 구현체를 컨슈머가 구독할 때 같이 넣어준다. 이제 해당 컨슈머는 리밸런스를 감지하고 오버라이딩 된 메소드를 작동시킨다.

위 설정을 체크해서 멀티 인스턴스를 띄울수 있도록 설정 후 앱을 실행시킨다. 앱이 구동되면 먼저 모든 파티션이 해당 컨슈머로 할당되었다고 할당된 파티션들이에서 출력되는 것을 확인할 수 있다.

이 상태에서 추가적인 컨슈머를 구동시켜보면, 아래와 같이 2개의 컨슈머로 파티션이 리밸런싱되는 것을 확인할 수 있다.

이때, 구동중인 앱을 다시 하나 종료하면 다시 파티션들이 재할당된다.

 

파티션 할당 컨슈머
consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER)));

consumer에 subscribe() 대신 assign()을 사용하면 특정 파티션을 컨슈머에 할당하여 구동시킬 수 있다. 거의 사용되지 않는 방법이다.

 

컨슈머의 안전한 종료

컨슈머 애플리케이션은 "반드시" 안전하게 종료되어야 한다. 앱을 설계할 때 정상적으로 종료되기 위해 고민을 할 필요가 있다. 컨슈머가 정상적으로 종료되지 않은 경우 세션 타임아웃이 발생할 때까지 컨슈머 그룹에 남게 된다.

 

KafkaConsumer는 컨슈머를 안전하게 종료하기 위해서 wakeup() 메소드를 지원한다. wakeup()메서드가 실행된 이후에 poll() 메소드가 실행되면 WakeupException() 예외가 발생된다. 따라서 해당 예외를 받은 뒤에는 데이터 처리를 위해 사용했던 자원들을 해제하고 close() 메소드로 종료하면 된다.

위 코드와 같이 addShutdownHook()을 통해 명시적으로 shutdownHook을 처리할 수 있게 된다. 이후 shutdownHook이 발생하면 consumer.wakeup()이 실행되고, poll() 메소드 수행 시 WakeupException()이 발생되며 catch 구문에서 일련의 처리를 진행 후 finally close()를 할 수 있게 된다.

 kill -term 51468

로 51468의 프로세스를 terminate 하라는 명령을 보냈고, 아래와 같이 처리가 되는것을 확인할 수 있다.

 

멀티스레드 컨슈머

1 쓰레드 - 1 컨슈머 쌍으로 작동하므로 여러개의 컨슈머를 이용해 토픽 내 파티션을 컨슈밍 하고 싶다면 멀티-쓰레드/멀티-프로세스 로 처리가 가능하다. (단일 쓰레드 n개 프로세스 / n개 쓰레드 단일 프로세스로 할지는 선택의 문제이다)

  • 각 프로세스마다 스레드를 두는 경우 장애 격리가 가능하다는 장점이 있지만 여러 프로세스를 띄우므로 부담이 있을 수 있다.
    • 배포 자동화가 잘되어있다면 (k8s 등을 통한 배포), 용량에 대한 부담이 적다면 추천된다.
  • 프로세스 내 멀티 스레드로 처리하는 경우 효율적이지만 스레드의 장애가 프로세스의 장애로 번질 수 있다는 부담이 있다.
    • 배포 자동화가 아직 되어있지않고 물리서버를 사용하는 경우 추천된다.

컨슈머 랙

컨슈머 랙이란 프로듀서가 생성한 레코드가 쌓인 오프셋과 컨슈머가 컨슈밍한 레코드의 오프셋의 차이 만큼을 나타낸다. 이 값이 크면 클 수록 처리량이 생산량을 못따라가고 있어 지연이 발생한다는 의미이므로 꼭 모니터링 해야하는 값이다. 따라서 이 값의 변화를 통해 컨슈머의 정상 작동 여부를 알 수 있다.

모니터링 해야하는 컨슈머 랙의 수는 토픽의 파티션 수 x 컨슈머 그룹이 된다. 만약 토픽의 파티션이 3개이고, 컨슈머 그룹이 2개라면 6개의 컨슈머 랙이 생성되는 것이다.

 

컨슈머 랙 모니터링

컨슈머 랙을 모니터링 하지 않는다면 컨슈머의 장애도, 프로듀서의 생산과 컨슈머의 처리량의 적절함도 알 수가 없어진다. 따라서 컨슈머 랙 모니터링은 데이터 파이프라인을 운영하는데 핵심적인 역할을 한다. 모니터링을 통해 파티션, 컨슈머 개수를 정하는데 참고가 가능하다.

 

예를들어, 평소에는 컨슈머 랙이 적게 유지되다가 이벤트 등으로 인해 컨슈머 랙이 증가한다면 파티션과 컨슈머를 늘림으로써 처리량을 높이도록 대응할 수 있을 것이다.

 

컨슈머 랙을 모니터링을 위한 아키텍쳐를 구축하는게 무엇보다 선행되어야 한다. 매우 중요 !

 

파티션 or 컨슈머 이슈

복수의 파티션과 컨슈머를 운용하는데 특정 컨슈머 랙만 높은 값이라면 리더 파티션 또는 컨슈머에 이슈가 발생했음을 알 수 있다.


컨슈머 랙 모니터링

`kafka-consumer-groups.sh` 명령어 사용

기본적으로 cli 환경에서 `kafka-consumer-groups.sh` 명령어를 통해 모니터링이 가능하지만 이는 1회성 확인에 불과하다. 하지만 컨슈머 랙은 지속적으로 모니터링이 필요하므로 실제 환경에서는 해당 방법으로는 부족하다.

 

metrics() 메소드 사용

위와 같이 KafkaConsumer 인스턴스의 metrics() 메소드를 사용하여 모니터링 할 수 있다. 해당 메소드는 `records-lag-max`, `records-lag`, `records-lag-avg` 3개의 지표를 제공한다.

 

단점

  • 컨슈머가 정상 동작하는 경우에만 확인이 가능하다. 컨슈머가 비정상적으로 종료되었다면 해당 컨슈머에 쌓인 랙을 확인할 수가 없다.
  • 여러 종료의 컨슈머 그룹을 운영한다면 각 컨슈머 애플리케이션에 컨슈머 랙 모니터링 코드를 중복해서 작성해야 한다.
  • 카프카 서드 파티 애플리케이션과 자바 외 카프카에서는 사용이 적절하지 않다.

 

외부 모니터링 툴

가장 좋은 모니터링 방법은 외부 모니터링 툴을 사용하는 것이다. 데이터 독, 컨플루언트 컨트롤 센터와 같은 카프카 클러스터 종합 모니터링 툴을 사용하면 다양한 지표를 모니터링 할 수 있다. 이 지표에 컨슈머 랙이 포함되어있다. 컨슈머 랙만 모니터링 하는 외부 툴로는 버로우가 있다. 참고로 우리는 CMAK(cluster manager afache kafka)를 사용중에 있다.

 

카프카 버로우(kafka burrow)
  • 링크드인에서 개발하여 오픈소스로 공개한 컨슈머 랙 체크 tool 이다.
  • REST API를 통해 컨슈머 그룹 별 컨슈머 랙을 확인할 수 있다는 특징이 있다.
  • 다수의 카프카 클러스터 (dev, prod)을 한번에 연결해서 모니터링이 가능하다.

burrow github의 wiki에 들어가면 각 기능에 대한 REST API 설명이 잘 정리되어 있다.

 

컨슈머 랙 이슈 판별

카프카 버로우의 장점으로 컨슈머와 파티션의 상태를 단순 컨슈머 랙의 threshhold(임계치)에 대한 도달 여부로 결정하지 않는다는 것이다. 임계치에 닿고 다시 컨슈머가 빠진다면 이는 유의미한 이슈 상황이라고 볼 수 없기 때문으로 쓸데없이 에러 알림을 받을 필요가 없기 때문이다.

컨슈머 랙 평가

여기서 버로우는 컨슈머 랙의 threshhold 도달 체크 방식이 아니라 슬라이딩 윈도우 계산을 통해 문제가 생긴 파티션과 컨슈머의 상태를 나타내게 된다. 이러한 방식을 컨슈머 랙 평가라 부른다.

이때 파티션은 OK, STALLED, STOPPED / 파티션은 OK, WARNING, ERROR로 표현한다

정상적인(이상적인) 컨슈머 랙 상황 / 컨슈머 처리량이 부족한 상황 / 컨슈머 처리량을 전혀 못따라가는 상황

1번 경우: 파티션 OK - 컨슈머 OK

  • 컨슈머 오프셋이 최신 오프셋을 잘 따라가고 있다.

2번 경우: 파티션 OK - 컨슈머 WARNING

  • 파티션의 생산하는 오프셋을 컨슈머가 제대로 못따라가고 컨슈머랙이 리니어하게 증가하고 있다.

3번 경우 : 파티션 STALLED - 컨슈머 ERROR

  • 최신 오프셋이 지속적으로 증가하는 반면, 컨슈머 오프셋이 멈춰있는 경우이다. 컨슈머가 모종의 이유로 데이터를 더 이상 가져오지 못하고 있다. 이런 경우 비정상으로 판단하고 이메일, 슬랙등을 통해 알림을 보내 조치를 취해야 한다.

 

컨슈머 랙 모니터링 아키텍쳐

카프카 버로우의 데이터를 텔레그래프로 엘라스틱 서치에 수집하는 형태가 있다. 텔레그래프는 REST API를 지속적으로 호출해서 JSON 형태의 데이터를 엘라스틱 서치에 담게 된다. 시간 순서로 담긴 시계열 데이터를 그라파나로 시각화 할 수 있게 된다.

반응형
Comments
반응형
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday