kafka는 다음과 같은 기본 구성요소들이 있습니다.
1. producer(publisher, source application)
2. consumer(subscriber, target application)
3. broker
4. topic
5. partition
이글에선 producer에 대해 알아보겠습니다.
producer
producer는 publisher또는 source application이라고도 합니다.
producer가 하는 역할은 다음과 같습니다.
1. topic에 보낼 메세지 생성
2. topic으로 메세지 전송(publish)
3. 전송 성공여부 확인/재시도
producer가 topic으로 데이터를 보낼때는
1. 직렬화
2. 파티셔닝
3. 압축
4. 배치
5. 전달
순으로 진행 됩니다.
kafka는 client와 broker간의 하위호환이 전부다 되어있지는 않으므로 두 버전간의 호환성을 확인해줘야합니다.
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
직렬화 방식은 위와 같은 코드로 설정해줄 수 있습니다.
bootstrap.servers는 일반적으로 fault tolerance를 위해 3개 이상으로 이루어집니다.
여러서버를 인자로 넘겨줄때에는 "127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092" 처럼 입력해주면 됩니다.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
...
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("example-topic", null, "Hello World"); // Topic, key, value
RecordMetadata<RecordMetadata> future = producer.send(record);
try {
RecordMetadata metadata = metadata.get();
describeMetadata(metadata); // 응답 결과를 콘솔에 출력합니다.
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
private static void describeMetadata(RecordMetadata metadata) {
System.out.println("=== Metadata...");
System.out.println("topic : " + metadata.topic());
System.out.println("partition : " + metadata.partition());
System.out.println("offset : " + metadata.offset());
System.out.println("timestamp : " + metadata.timestamp());
}
파티셔닝은 위 ProducerRecord의 key 값을 해쉬함수에 넣아 나온 출력대로 결정됩니다. key값에 null을 넣거나 인자를 2개만 넣을시(overloading되어 topic,value로 인식) round robin방식으로 파티셔닝 됩니다.
전송은 producer.send를 통해 전송할 수 있고 .get()을 통해 future를 기다려 메세지가 브로커에게 메세지를 성공적으로 전달했는지 확인 할 수 있습니다.
이 외에도 producer의 설정값에는 여러가지가 있는데
buffer.memory, compression.type, ack, batch.size, linger.ms, max.request.size 등이 있습니다.
이중 ack와 compression.type에 대해 좀 알아보면
ack의 설정값엔 0,1,-1이 있는데
0 : producer가 kafka로 부터 어떠한 ack도 기다리지 않습니다. 메세지전송이 실패하더라도 결과를 모르기때문에 재요청하지도 않습니다. 이로인해 메세지 손실률이 높지만 응답을 기다리지 않으므로 매우 빠르게 메세지를 계속 보낼 수 있습니다
1 : kafka leader에 한해서만 ack를 받습니다. 하지만 나머지 follower서버에 대해서는 받지않습니다.
-1 : follower서버로부터도 ack를 받습니다. 당연히 메세지 손실률은 낫지만 성능이 떨어집니다.
compression.type은 데이터를 압축해서 보낼떄 어떤 타입으로 압축할지 정할 수 있습니다.
옵션으로는 none, gzip, snappy, lz4e등등이 있습니다.
'분산처리' 카테고리의 다른 글
airflow에서 크롤링(selenium) 할 경우 무한대기 (0) | 2023.10.17 |
---|---|
kafka의 기본 구성요소들 broker, topic, partition (0) | 2023.01.14 |
kafka의 기본 구성요소들 consumer (0) | 2023.01.10 |
Kafka를 사용하는 이유 (1) | 2023.01.09 |