티스토리 뷰

프로듀서

프로듀서 애플리케이션의 역할

  • 카프카에 필요한 데이터를 선언하고 브로커의 특정 토픽의 파티션에 전송한다.
  • 데이터를 전송할 때 리더 파티션을 가지고 있는 브로커와 직접 통신한다.
  • 브로커로 데이터를 전송할 때 내부적으로 파티셔너, 배치 생성 단계를 거친다.

프로듀서 내부 구조

  • ProducerRecord: 프로듀서에서 생성한 레코드, 오프셋은 미포함
  • send(): 레코드 전송 요청 메소드
  • Partitioner: 어느 파티션으로 전송할 지 지정하는 파티셔너이다.
  • Accumulator: 배치로 묶어서 전송할 데이터를 모으는 버퍼이다.

send()를 해도 바로 보내는것이 아니라 Accumulator내 배치의 임계치까지 데이터가 쌓이면 그때 카프카 클러스터와 TCP 통신하여 데이터를 전송하게 된다. 이러한 배치 처리로 처리량을 높일 수 있다.

Partitioner 파티셔너

프로듀서의 기본 파티셔너
  • 프로듀서 API를 사용하면 UniformStickyPartitioner, RoundRobinPartitioner 2개의 파티셔너를 제공한다.
    (default = UniformStickyPartitioner)

메시지 키가 있을 경우의 동작

  • 파티셔너 둘다 메시지 키가 있을 때 메시지 키의 해시값과 파티션을 매칭하여 레코드를 전송한다.
  • 동일한 메시지 키 ⇒ 동일한 파티션 번호
  • 만약 파티션이 증설되면, 위 매칭이 깨진다
    • 따라서, 미리 충~분히 여유있게 파티션을 만들어 놓는것을 매우 권장한다. 토픽이 10개일 때, 10개 파티션을 만들지 말고 50, 100개씩 만들어 놓자. 이렇게 하므로서 파티션의 개수가 변동이 없도록 해서 같은 키가 같은 파티션으로 가게하자.
    • (의견) 파티션 == 컨슈머 일대일 매핑인데 이렇게 해놓으면 파티션, 컨슈머 낭비가 아닌가…?

메시지 키가 없을 때의 동작

  • UniformStickyPartitioner, RoundRobinPartitioner 둘다 최대한 동일하게 분배하는 로직이 들어있으나 개선된 유니폼-스티키 파티셔너는 분배의 로직이 개선되었다.

RoundRobin 파티셔너

  • Producer record가 들어오는 대로 파티션을 순회 → 전송
  • accumulator에 묶이는 정도가 적으므로 전송성능이 떨어진다

UniformSticky 파티셔너

  • accumulator에서 record들이 배치로 묶일 때까지 기다렸다가 → 전송
  • 배치로 묶인 후 파티션을 순회하면서 보냄 (에스컬레이터 vs 엘레베이터 느낌?). 똑같이 모든 파티션에 분배된다.
if (message.key != null) {
	for (partition in partitioners) partition.add(message)
}

if (message.key == null) {
	partitioners[message.key.hashCode()].add(message)
}

수도코드로 작성하면 이런 형태이다.

프로듀서의 커스텀 파티셔너

  • Partitioner 인터페이스를 제공하므로 사용자 정의 클래스에서 메시지 키 or 메시지 값에 따라 파티션을 지정하는 로직을 적용할 수 있다.
  • 커스텀에서는 위 수도 코드에서 특정 key이면 특정 파티션으로 보내라고 정의할 수 있을것이다.

프로듀서 주요 옵션

필수 옵션

  • bootstrap.servers
    • 프로듀서가 데이터를 전송할 대상 카프카 클러스터 내 브로커의 호스트이름:포트를 작성
    • 2개이상의 브로커 정보를 입력하여 일부 브로커에 이슈가 생겨도 접속에 이슈가 없도록 할 수 있다
  • key.serializer & value.serializer
    • 레코드 메시지의 키-값을 직렬화하는 클래스를 지정한다.
    • string 직렬화로 통일하거나 직렬화에 직렬화 타입을 넣는 등 규칙을 맞추자
    • string 직렬화 미사용의 문제
      • kafka-console-consumer 에서 데이터를 볼 때 string이 아닌 다른 방식의 직렬화 시 못 볼 수 있다.
      • 여러팀이 컨슈머를 사용하는 경우 커스텀 직렬화 타입을 넣은 경우 역직렬화에 어려움이 있다.

선택 옵션

  • acks
    • 프로듀서가 record를 리더 파티션 & 팔로워 파티션에 보내는데 이때 파티션들의
      수신 여부 확인의 종류에 따라 0, 1, -1(all)로 설정한다 (default = 1)
    • 리더 파티션이 정상 수신 후 응답이 왔는지 확인하는 경우가 1이다.
      • HTTP 통신의 ACK와 비슷한 용도인듯 한다
  • linger.ms
    • 배치를 전송하기 전까지 기다리는 최소 시간이다. (default = 0)
    • 1000이면 1초, 10~100정도 넣어서 0.01 ~ 0.1s 정도의 지연을 넣어서 더 많은 데이터를 모아서 보내도록 할 수 있다.
  • retries
    • 브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수이다. (default = INT.MAX_VALUE)
  • max.in.flight.request.per.connection
    • 한번에 요청하는 최대 커넥션 개수. 설정된 값만큼 전달 요청을 한다. (default = 5)
    • 보내는 데이터가 많을 때 프로듀서 ↔ 브로커 간 커넥션이 많아진다.
  • partitioner.class
    • 레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스를 지정
    • v2.5.0 기준 UniformSticky파티셔너가 DefaultPartitioner
  • enable.idempotence
    • 멱등성 프로듀서로 동작할지 여부를 설정한다. (default = false)
  • transactional.id
    • 프로듀서가 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶을지 설정한다. (default = null)
    • 이걸 true로 설정 시 멱등성 체크하는 enable.idempotence가 true가 된다.
    • 기본적으로는 데이터를 그냥 전송만 하지만, 이를 체크하면 트랜잭션 프로듀서가 된다.

ISR

  • acks 옵션은 매우 중요하다!
  • 이를 제대로 설정하기 위해 ISR을 먼저 이해할 필요가 있다.

In-Sync-Replicas의 약자로 리더 파티션과 팔로워 파티션간의 레코드에 대해 모두 싱크가 된 상태를 의미한다.

ISR이 된 상태가 중요한데 이유는 프로듀서는 리더 파티션에 데이터를 보내고, 컨슈머는 리터 파티션에서 읽는데, 리더 파티션에 장애가 생긴 경우 ISR이라면 바로 팔로워 파티션이 대응할 수 있기 때문이다.

  • 리더 파티션에 데이터가 적재되고 팔로워 파티션이 이를 복제하는 동안 생기는 시간차로 인해 오프셋 차이가 발생하게 된다.
  • 리더에 10개의 오프셋, 팔로워에 8개의 오프셋이 있는 차이가 존재하는 순간이 존재. 이 경우 ISR에 리더 파티션만 포함되고 해당 팔로워 파티션은 포함되지 않는것.

acks옵션을 통해 카프카 클러스터에 얼마나 신뢰성 높게 저장할지 지정이 가능하다.

  • acks옵션에 따라 신뢰를 높이고 - 성능을 찾추거나 or 신뢰를 낮추고 - 성능을 높이는 선택을 해야한다.
  • 신뢰가 높다는 것은 리더 & 팔로워 모두 동일한 레코드가 저장되는것을 의미. 낮다는건 불일치하다는 것

ACKS 별 동작 방식

(일반적으로 복제개수를 2이상 운영하므로, 복제 개수 2 이상인 경우에 acks별 동작 방식을 확인)

acks = 0

프로듀서가 리더 파티션으로 데이터를 전송했을 때 리더 파티션으로 데이터가 저장되었는지 확인하지 않는다.

프로듀서가 리더 파티션으로 send() 후 리더 파티션은 해당 데이터가 저장된 후 몇번째 오프셋에 저장되었는지를 반환한다. 이때 이 값의 여부를 확인하지 않는다.

UDP처럼 보내고 끝!이라서 속도는 acks = 1 or -1(all) 에 비해 매우 빠르다. 데이터의 유실보다 속도가 중요한 경우 설정 시 효과적이다.

ex) GPS

acks = 1

프로듀서가 리더 파티션으로 데이터를 전송하고 리더 파티션에 정상적으로 저장되었는지 확인한다. 이때 정상적으로 적재되지 않은 경우 적재될 때까지 재시도한다. (최대 retries 값만큼 )

send() 후 데이터는 disk에 저장되고 해당 오프셋을 받을 때까지 확인을 해야하므로 느리다. 또한 리더 파티션에 전송이 되었어도 팔로워가 해당 적재된 레코드를 복제하기 전에 장애, 이슈, 지연이 발생하면 팔로워에 유실이 발생할 수 있다.

팔로워가 복제하는데 이슈가 발생하는게 흔치는 않으므로 웬만하면 해당 옵션으로 처리한다.

acks = -1 또는 all

프로듀서가 리더 파티션으로 데이터를 전송하고 리더 파티션에 정상적으로 저장되었는지 확인한다. 이후 팔로워 파티션에 복제가 완료되었는지 확인한다.

너무 느려서 정상적으로 사용하기 힘들정도이다. 신뢰도는 최상이지만 너무 느리다는 문제가 있다. 여기서도 min.insync.replicas 옵션을 통해 팔로워 파티션의 동기화 파악을 조절할 수 있다.

파티션이 10개일 때 min.insync.replicas가 10이면 10개의 파티션에 모두 복제 되었는지를 확인하면 되지만 이는 과도하며, 일반적으로 2로 한다. 1은 리더 파티션을 포함하므로 하나의 팔로워 파티션까지만 복제 여부가 되었는지 확인한다는 것이다.

acks = all && min.insync.replicas = 1로 하면 리더 파티션의 적재만 확인하므로 acks = 1과 동일하다. 주의하자!

 

반응형
Comments
반응형
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday