ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka
    DataEngineering/Kafka 2021. 9. 10. 09:09

    데브원영님의 소금과 설탕같은 강의를 듣고 정리하는 글

    Kafka

    들어가며 (?)

    hadoop 경우 small file에서의 성능이 안 나오기 때문에 배치로 처리를 하고, 실시간성 데이터같은 stream 처리같은 경우엔 (예를 들어서) fluentd+kafka+spark+nosql db 같은 구성으로 메시지를 처리하게 된다.

    kafka같은 경우 spark 어쨌든 저장소가 아니기 때문에 하루면 하루 시간이면 시간 이렇게 들어오는 메시지를 "순차적으로" 저장했다가 spark 만약 죽거나 메시지 끌어가는 subscriber(kafka에서는 consumer 되겠지?) 죽었을 offset 통해 해당 시점으로 다시 돌아갈 있도록 한다. 버퍼 역할을 하는 메시징 !! kafka 특징이자 역할 하나라고 보면 되겠고, 이제 topic별로 메시지 관리하고 분산형 메시징 큐이고 등등의 특징은 후에 차근차근 써보도록 하겠다!!

     

    kafka

    : apache 에서 제공하는 pub-sub 기반의 분산형 메시징 큐

    : dispatch 가 아닌 subscribe 방식, 기존 RabbitMQ에 비해 고성능 (RabbitMQ에서 제공하는 전체 트랜젝션은 제공하지 않음)

     

    그림일기 아니고 티스토리 보고 있는거 맞음

    Before Kafka?

    갑자기 그럴듯해진 그림 나 헛소리 진짜 많이 쓴다..;;

    데이터 전송 라인이 복잡해짐에 따라 장애를 대응하거나 배포 라인 관리가 복잡해지고 어려워졌다. 

     

    그리고 등장한 Kafka는 중간에서 큐의 역할을 하면서 서버에 이슈가 생기거나 갑작스럽게 reck이 내려가는 등의 장애에도 데이터를 손실없이 복구할 수 있는 fault tolerant, 낮은 지연(latency), 높은 처리량(throughput) 의 강점을 갖고 데이터를 효과적으로 많이많이 처리한다.

     

    Topic

    데이터를 저장하는 공간이라고 생각하면 된다.

    - AMQP(Advanced Message Queing Protocol) 과는 다르게 동작한다.

      : MQ 의 오픈소스에 기반한 표준 프로토콜

      : 이 프로토콜에 따른 실제 MQ 제품들은 대표적으로 Erlang, RabbitMQ가 있다고 한다.

      : Exchange, Queue, Binding 이 세 가지 구성요소들이 서로 어떻게 통신할지를 정의한 프로토콜

     

    빨간색 화살표

    1. key가 null이고 기본 partitioner 사용할 경우 : round-robin 으로 파티션 할당

    2. key가 있고, 기본 partitioner 사용할 경우 : 키의 hash값을 구하고, 특정 파티션에 할당

      - 파티션이 늘어나면 컨슈머의 개수를 늘려서 데이터 처리를 분산시킬 수 있다.

      - 파티션의 record들을 언제까지 보관할지 옵션으로 지정할 수 있다. 

    • log.retention.ms : 최대 레코드 보존 기간
    • log.retention.byte : 최대 레코드 보존 크기

    Producer

    • Topic 에 해당하는 메시지를 생성한다
    • 특정 Topic으로 데이터를 publish한다
    • Kafka broker 로 전송할 때 전송 성공 여부를 알 수 있고, 처리를 실패할 경우 재시도한다.

     

    producer 사용하기 - 카프카 라이브러리를 추가함으로써 kafka producer, consumer을 사용할 수 있다. gradle, maven 같은 도구를 통해 라이브러리를 가져올 수 있다.

    // gradle
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.0'
    
    // maven
    <dependency>
    	<groupID>org.apache.kafka</groupID>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.0</version>
    </dependency>

    ☢️ dependency에 kafka client 잡을 때 주의할 점 : 그건 바로 버전

    : broker와 client 버전 하위호완성이 완벽하게 모든 버전에 대해 지원하지 않는다. 일부 kafka broker 버전은 특정 client 버전을 지원하기 않기 때문에 버전을 맞추는 것이 중요하다.

    producer 만들기

    public class Producer {
    	public static void main(String[] args) throws IOException {
        	// Producer configuration 
        	Properties configs = new Properties();
            // [1]
            configs.put("bootstrap.servers", "localhost:9092"); 
            // [2]
            configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            
            // [3]
            // 설정한 property로 kafka producer instance 생성
            KafkaProducer <String, String> producer = new KafkaProducer <String, String> (configs);
            // 전송할 객체 만들기 - Kafka producer에서 제공하는 ProducerRecord class 사용
            ProducerRecord record = new ProducerRecord <String, String> ("click_log", "login");
            // producer instance에 send 메소드 파라메터에 record 넣고 전송!
            producer.send(record);
            // producer 종료
            producer.close();
        }
    }

    [1]  부트스트랩 서버 설정을 로컬호스트의 카프카를 바라보도록 설정한다.

    - kafka broker 의 주소 목록은 2개 이상의 ip + port 로 설정하는 걸 권장한다.

    - 둘 중 하나 이상의 broker가 비정상일 경우 다른 broker로 매핑되어 사용할 수 있다.

    [2]  key, value : string serializer 로 직렬화 설정

    - serializer : key, value 직렬화하기 : byte array, string, integer 사용 가능함

    - 앞에 topic 설명할 때 잠깐 나왔는데 key 는 메시지를 보내면 topic의 partition이 지정될 때 사용된다.

    [3] producer.send()

    해당 예시의 경우 key를 별도로 지정하지 않았기 때문에 앞에서 언급했듯이 round-robin 방식으로 데이터가 파티션에 할당된다.

    offset 중요행 뒤에 consumer 얘기할 때 또나옴 (스포

    만약 key도 함께 보낸다면 특정 데이터를 특정 파티션에 할당해서 전달할 수 있다. 하지만 토픽에 key를 사용해서 데이터를 전달하다가 파티션이 추가되면 key와 partition 간의 일관성을 보장되지 않기 때문에 key를 사용하는 경우 이를 유의해 파티션을 생성해놓고, 추후에 생성하지 않는 것이 좋다고 한다.

     

    Broker, Replication, ISR

    different servers

    Broker : 카프카에 설치되어 있는 서버 단위 (보통 3개 이상으로 권장한다.)

     - 1 partition, 1 replication topic : 브로커 중 1대에 토픽 정보(data) 가 저장된다.

     - 1 partition, 2 replication topic : 브로커 2대에 Leader partition, Follower partition으로 토픽 정보가 저장된다.

    1 partition, 2 replication

     - 1 partition, 3 replication topic

    더보기

    📌 ack option (0, 1, all)

    1. ack = 0 

    : producer는 leader partition에 데이터를 전송하고 응답을 받지 않는다. 따라서 잘 전송되었는지 알 수 없다, 속도는 빠르지만 데이터 유실 가능성이 있다.

    2. ack = 1

    : Leader partition 에 데이터를 전송하고 응답을 받지만 복제를 완료했는지 알 수 없다. 따라서 Leader partition이 데이터를 받은 즉시 브로커에 장애가 발생한다면 나머지 파티션에 데이터가 전송되지 못한 상태로, 데이터 유실의 가능성이 있긴 하다.

    3. ack = all

    : Leader partition에 데이터를 전송하고 응답값 + follower partition에 복제가 잘 이루어졌는지도 응답값을 받는다. 데이터 유실을 막을 수 있다.

     

    Consumer

    : 폴딩, 데이터를 가져오는 subscriber이다.

    [ 역할 ]

    1. 토픽의 partition 으로부터 데이터를 polling해온다. 메시지를 가져와서 DB에 저장하거나 또 다른 파이프라인에 전달한다.

    2. 파티션의 offset 위치를 기록한다.(commit) offset은 잠깐 말하자면 파티션에 있는 데이터들에게 순차적으로 부여되는 번호다.

    3. consumer group을 통해 병렬처리한다. 파티션의 개수에 따라 consumer를 여러 개 만들면 병렬처리가 가능해지고, 빠르게 데이터를 처리할 수 있다.

    Consumer 사용하기  - 컨슈머 역시 브로커와의 버전 차이로 인해 정상적으로 동작하지 않을 수 있기 때문에 호환 가능한 버전을 확인하는게 필요하다.

    // gradle
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.0'
    
    // maven
    <dependency>
    	<groupID>org.apache.kafka</groupID>
    	<artifactId>kafka-clients</artifactId>
    	<version>2.3.0</version>
    </dependency>

    Consumer 만들기

    public class Consumer {
    	public static void main(String[] args) {
        	// Consumer configuration 
        	Properties configs = new Properties();
            // bootstrap 옵션 : 카프카 브로커 설정 - 한 브로커 죽으면 다른 브로커가 붙어서 대체할 수 있도록 보통 한 3개 지정해놈
            configs.put("bootstrap.servers", "localhost:9092"); 
            // group id 지정 (=consumer group) : 컨슈머들의 묶음
            configs.put("group.id", "click_log_group");
            // key, value 직렬화 설정
            configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            
            // consumer instance 생성 - 데이터 읽고 처리할 수 있음
            KafkaConsumer <String, String> consumer = new KafkaConsumer <String, String> (configs);
            // consumer의 subscribe메소드로 어느 토픽에서 데이터를 가져올지 설정
            consumer.subscribe(Arrays.asList("click_log"));
            // 데이터를 실질적으로 가져가는 polling loop
            while (true) {
            	ConsumerRecords<String, String> records = consumer.poll(500);
                	// record : 실질적으로 처리되는 단위, records : 그 묶음 -> record 단위로 처리할 대이터 가져오기
                	for (ConsumerRecord<String, String> record : records) {
                		System.out.println(record.value());
                	}
            }
        }
    }
    • consumer.assign() : consumer.subscribe() 로 어느 토픽에서 메세지를 가져와라 ~ 라고 지정할 수 있다고 했는데, assign 메소드는 특정 토픽의 전체 파티션이 아닌 일부 파티션 데이터만 가져오고 싶을 경우 사용한다. key가 있는 데이터의 경우 assign 메소드를 통해 데이터 순서를 보장하는 처리가 가능하다.
      TopicParition partition0 = new TopicPartition(topicName, 0);
      TopicParition partition1 = new TopicPartition(topicName, 1);
      consumer.assign(Arrays.asList(partition0, partition1));
    •  polling loop : consumer API 를 통해 브로커로부터 연속적으로 컨슈머가 허락하는 한 많은 데이터를 읽는다. 허락하는 한을 구체적으로 말하면 poll(time)은 지정한 time ms 동안 데이터를 가져온다. time(500) 이니까 0.5초 동안 데이터가 도착하길 기다리고 이후에 코드를 실행하고 records 값을 반환한다. records는 data 배치로 record의 묶음 list로, 실제로 kafka에서 처리를 하는 단위는 record(가장 작은 단위) 이다.
    • 데이터를 넣을 때를 다시 떠올려보자. producer.send() 를 생각해보면 partition에 들어간 데이터는 partition 내에서 고유한 번호 offset을 갖는다고 했었다. 이러한 offset들은 __consumer_offsets 이라는 토픽에 저장되어있기 때문에, 브로커가 죽었거나 컨슈머가 데이터를 땡겨가다가 멈추고 다시 재개할경우 해당 __consumer_offsets 토픽에서 데이터를 어디까지 앍었는지에 대한 정보를 가져와서 해당 데이터부터 다시 consume 해간다.

    Consumer 를 몇 개까지 생성할 수 있을까?

    A. number of consumer <= number of partition

    파티션 개수를 넘어설 수는 없다.

    위 그림의 경우 partition 2개, consumer 2개로 partition들이 각 consumer에 할당되었기 때문에 더 이상 할당될 partition이 없어 consumer 개수를 늘리려고 하면 동작하지 않는다.

     

    히지만 consumer group을 하나 더 두면 consumer를 하나 더 둘 수 있다.

    각 consumer group은 데이터를 따로 읽고, __consumer_offsets 토픽은 consumer group 별 -> 토픽별 로 offset을 나누어서 저장한다. 따라서 하나의 토픽으로 들어온 데이터를 다양한 역할을 하는 여러 consumer들이 각자 데이터를 처리할 수 있다 !!

    다른 그룹을 만들어버린다면? 어쩔셈이지 ?

    그림처럼 consumer group 2개를 두면 총 3개의 consumer가 partition 2개에서 데이터를 가져갈 수 있다. 그래서 한 그룹은 elastic search에서 가져가서 분석, 저장하고 한 그룹은 hadoop에서 가져가서 저장하고 데이터 백업으로 사용하는 등의 작업을 할 수 있게된다.

     

    Lag

    producer에서의 offset과 consumer에서의 offset 간의 차이를 Lag 라고 한다.

    두 application의 데이터 전송과 가져가는 속도의 차이인 것이다. Lag는 각 partition마다 존재하고, partition마다 Lag는 다를 수 있다.

    하나의 토픽 안에서 가장 큰 Lag를 갖는 partition은 record_offset 으로 알 수 있다.

     

    'DataEngineering > Kafka' 카테고리의 다른 글

    Kafka Connect  (0) 2021.09.05

    댓글

Designed by Tistory.