Apache Kafka는 현대 마이크로서비스 아키텍처에서 핵심적인 메시징 시스템으로 자리잡았습니다.
대용량 실시간 데이터 스트리밍과 이벤트 기반 아키텍처 구현에 필수적인 도구가 되었죠.
이번 포스팅에서는 Java 언어를 사용하여 Kafka Producer와 Consumer를 실무에 바로 적용할 수 있도록 단계별로 구성하는 방법을 상세히 알아보겠습니다. 초보자부터 중급 개발자까지 이해할 수 있도록 예제 코드와 함께 설명드리겠습니다.
Apache Kafka란 무엇인가?
Apache Kafka는 LinkedIn에서 개발한 분산 스트리밍 플랫폼입니다. 높은 처리량(high throughput)과 낮은 지연시간(low latency)을 특징으로 하며, 실시간 데이터 파이프라인과 스트리밍 애플리케이션 구축에 최적화되어 있습니다.
Kafka의 주요 구성 요소는 다음과 같습니다:
- Producer: 메시지를 생성하고 토픽으로 전송하는 애플리케이션
- Consumer: 토픽에서 메시지를 구독하고 처리하는 애플리케이션
- Topic: 메시지가 저장되는 논리적 채널
- Partition: 토픽을 물리적으로 분할한 단위
- Broker: Kafka 서버 인스턴스
Java Kafka 클라이언트 라이브러리 설정
Java에서 Kafka를 사용하기 위해서는 먼저 의존성을 추가해야 합니다. Maven이나 Gradle을 사용하여 kafka-clients 라이브러리를 프로젝트에 포함시켜보겠습니다.
Maven 의존성 설정
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
Gradle 의존성 설정
implementation 'org.apache.kafka:kafka-clients:3.6.0'
최신 버전의 kafka-clients를 사용하면 성능 개선사항과 보안 패치를 적용받을 수 있습니다. 안정성을 위해 프로덕션 환경에서는 검증된 버전을 사용하는 것을 권장합니다.
Kafka Producer 구현하기
Kafka Producer는 메시지를 생성하여 지정된 토픽으로 전송하는 역할을 담당합니다. Java에서 Producer를 구현할 때는 KafkaProducer 클래스를 사용합니다.
기본 Producer 설정
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class SimpleKafkaProducer {
private static final String TOPIC_NAME = "user-events";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
// Producer 설정
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Producer 인스턴스 생성
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
// 메시지 전송
for (int i = 0; i < 10; i++) {
String key = "user-" + i;
String value = "User action data: " + i;
ProducerRecord<String, String> record =
new ProducerRecord<>(TOPIC_NAME, key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("메시지 전송 실패: " + exception.getMessage());
} else {
System.out.println("메시지 전송 성공 - Topic: " + metadata.topic() +
", Partition: " + metadata.partition() +
", Offset: " + metadata.offset());
}
}
});
}
} finally {
producer.close();
}
}
}
Producer 성능 최적화 설정
실무에서는 처리량과 안정성을 고려한 Producer 설정이 중요합니다. 다음은 성능 최적화를 위한 고급 설정 예제입니다:
public class OptimizedKafkaProducer {
private KafkaProducer<String, String> createOptimizedProducer() {
Properties props = new Properties();
// 기본 설정
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 성능 최적화 설정
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 배치 크기
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 배치 대기 시간
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 버퍼 메모리
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // 압축 타입
// 안정성 설정
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 모든 복제본 확인
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 재시도 횟수
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000); // 재시도 간격
return new KafkaProducer<>(props);
}
}
Kafka Consumer 구현하기
Kafka Consumer는 토픽에서 메시지를 구독하고 처리하는 애플리케이션입니다. Consumer Group을 통해 부하 분산과 장애 복구 기능을 제공받을 수 있습니다.
기본 Consumer 구현
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class SimpleKafkaConsumer {
private static final String TOPIC_NAME = "user-events";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "user-event-consumer-group";
public static void main(String[] args) {
// Consumer 설정
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Consumer 인스턴스 생성
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
// 토픽 구독
consumer.subscribe(Arrays.asList(TOPIC_NAME));
// 메시지 소비
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("수신 메시지 - Key: %s, Value: %s, Topic: %s, Partition: %d, Offset: %d%n",
record.key(), record.value(), record.topic(), record.partition(), record.offset());
// 메시지 처리 로직
processMessage(record.key(), record.value());
}
// 수동 커밋 (옵션)
consumer.commitSync();
}
} finally {
consumer.close();
}
}
private static void processMessage(String key, String value) {
// 실제 비즈니스 로직 구현
System.out.println("메시지 처리 완료: " + key + " -> " + value);
}
}
Consumer Group과 파티션 할당
Consumer Group은 Kafka의 핵심 개념 중 하나입니다. 같은 그룹에 속한 Consumer들은 토픽의 파티션을 나누어 처리하여 부하를 분산시킵니다.
public class ConsumerGroupExample {
public static void createConsumerWithManualPartitionAssignment() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-assignment-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 수동 커밋
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 수동 파티션 할당
TopicPartition partition0 = new TopicPartition("user-events", 0);
TopicPartition partition1 = new TopicPartition("user-events", 1);
consumer.assign(Arrays.asList(partition0, partition1));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// 메시지 처리
processMessage(record);
}
// 처리 완료 후 커밋
consumer.commitSync();
}
} finally {
consumer.close();
}
}
private static void processMessage(ConsumerRecord<String, String> record) {
System.out.printf("파티션 %d에서 메시지 처리: %s%n",
record.partition(), record.value());
}
}
실무 활용 예제: 주문 처리 시스템
실제 업무에서 활용할 수 있는 주문 처리 시스템을 Kafka를 이용해 구현해보겠습니다. 이 예제는 주문 생성, 재고 확인, 결제 처리 등의 이벤트를 비동기적으로 처리합니다.
주문 이벤트 Producer
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.concurrent.CompletableFuture;
public class OrderEventProducer {
private final KafkaProducer<String, String> producer;
private final ObjectMapper objectMapper;
private static final String ORDER_TOPIC = "order-events";
public OrderEventProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
this.producer = new KafkaProducer<>(props);
this.objectMapper = new ObjectMapper();
}
public CompletableFuture<Void> publishOrderCreatedEvent(OrderCreatedEvent event) {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
String eventJson = objectMapper.writeValueAsString(event);
String key = "order-" + event.getOrderId();
ProducerRecord<String, String> record =
new ProducerRecord<>(ORDER_TOPIC, key, eventJson);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
future.completeExceptionally(exception);
} else {
System.out.println("주문 이벤트 발행 성공: " + event.getOrderId());
future.complete(null);
}
});
} catch (Exception e) {
future.completeExceptionally(e);
}
return future;
}
// 주문 이벤트 DTO 클래스
public static class OrderCreatedEvent {
private String orderId;
private String customerId;
private double amount;
private long timestamp;
// 생성자, getter, setter 생략
public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
// ... 기타 getter/setter
}
}
주문 처리 Consumer
public class OrderProcessingConsumer {
private final KafkaConsumer<String, String> consumer;
private final ObjectMapper objectMapper;
public OrderProcessingConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
this.consumer = new KafkaConsumer<>(props);
this.objectMapper = new ObjectMapper();
}
public void startProcessing() {
consumer.subscribe(Arrays.asList("order-events"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
try {
processOrderEvent(record);
// 개별 메시지 처리 완료 후 커밋
consumer.commitSync(Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
));
} catch (Exception e) {
System.err.println("주문 처리 실패: " + record.key() + ", 오류: " + e.getMessage());
// 에러 처리 로직 (재시도, 데드레터큐 등)
handleProcessingError(record, e);
}
}
}
} finally {
consumer.close();
}
}
private void processOrderEvent(ConsumerRecord<String, String> record) throws Exception {
OrderCreatedEvent event = objectMapper.readValue(record.value(), OrderCreatedEvent.class);
System.out.println("주문 처리 시작: " + event.getOrderId());
// 1. 재고 확인
if (!checkInventory(event)) {
throw new RuntimeException("재고 부족");
}
// 2. 결제 처리
if (!processPayment(event)) {
throw new RuntimeException("결제 실패");
}
// 3. 배송 준비
prepareShipment(event);
System.out.println("주문 처리 완료: " + event.getOrderId());
}
private boolean checkInventory(OrderCreatedEvent event) {
// 재고 확인 로직
System.out.println("재고 확인 중: " + event.getOrderId());
return true;
}
private boolean processPayment(OrderCreatedEvent event) {
// 결제 처리 로직
System.out.println("결제 처리 중: " + event.getOrderId());
return true;
}
private void prepareShipment(OrderCreatedEvent event) {
// 배송 준비 로직
System.out.println("배송 준비 중: " + event.getOrderId());
}
private void handleProcessingError(ConsumerRecord<String, String> record, Exception e) {
// 에러 처리 로직 (로깅, 알림, 재시도 큐 등)
System.err.println("에러 처리 로직 실행: " + record.key());
}
}
에러 핸들링과 재시도 전략
실무에서는 네트워크 장애, 서버 다운타임 등 다양한 예외 상황을 고려해야 합니다. 안정적인 Kafka 애플리케이션을 위한 에러 핸들링 전략을 살펴보겠습니다.
Producer 에러 핸들링
public class RobustKafkaProducer {
private final KafkaProducer<String, String> producer;
private final int maxRetries = 3;
public void sendMessageWithRetry(String topic, String key, String value) {
int attempt = 0;
while (attempt < maxRetries) {
try {
ProducerRecord<String, String> record =
new ProducerRecord<>(topic, key, value);
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get(10, TimeUnit.SECONDS);
System.out.println("메시지 전송 성공 (시도 " + (attempt + 1) + "): " +
metadata.topic() + "-" + metadata.partition() + "@" + metadata.offset());
return;
} catch (TimeoutException e) {
attempt++;
System.err.println("전송 타임아웃 (시도 " + attempt + "/" + maxRetries + "): " + e.getMessage());
if (attempt >= maxRetries) {
handleFinalFailure(topic, key, value, e);
break;
}
// 지수 백오프 적용
try {
Thread.sleep(1000 * (long) Math.pow(2, attempt - 1));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
} catch (Exception e) {
System.err.println("메시지 전송 실패: " + e.getMessage());
handleFinalFailure(topic, key, value, e);
break;
}
}
}
private void handleFinalFailure(String topic, String key, String value, Exception e) {
// 실패한 메시지를 데드레터큐나 별도 저장소에 보관
System.err.println("최종 실패 처리: " + topic + "/" + key);
// 알림 발송, 로깅 등
}
}
Consumer 에러 핸들링
public class RobustKafkaConsumer {
private final KafkaConsumer<String, String> consumer;
private final Map<String, Integer> failureCount = new ConcurrentHashMap<>();
private final int maxFailures = 3;
public void processMessagesWithErrorHandling() {
consumer.subscribe(Arrays.asList("user-events"));
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
String messageKey = record.topic() + "-" + record.partition() + "-" + record.offset();
try {
processMessage(record);
// 성공시 실패 카운트 초기화
failureCount.remove(messageKey);
// 수동 커밋
consumer.commitSync(Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
));
} catch (Exception e) {
handleMessageFailure(messageKey, record, e);
}
}
} catch (Exception e) {
System.err.println("Consumer 폴링 에러: " + e.getMessage());
// Consumer 재시작 로직 등
}
}
}
private void handleMessageFailure(String messageKey, ConsumerRecord<String, String> record, Exception e) {
int currentFailures = failureCount.getOrDefault(messageKey, 0) + 1;
failureCount.put(messageKey, currentFailures);
System.err.printf("메시지 처리 실패 (%d/%d): %s, 오류: %s%n",
currentFailures, maxFailures, messageKey, e.getMessage());
if (currentFailures >= maxFailures) {
// 최대 재시도 횟수 초과
sendToDeadLetterQueue(record, e);
failureCount.remove(messageKey);
// 해당 메시지는 스킵하고 다음 메시지로 진행
consumer.commitSync(Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
));
}
// 재시도 가능한 경우 커밋하지 않음 (재처리됨)
}
private void sendToDeadLetterQueue(ConsumerRecord<String, String> record, Exception e) {
// 데드레터큐로 메시지 전송
System.err.println("데드레터큐 전송: " + record.key());
// 실제 구현에서는 별도 토픽이나 저장소로 전송
}
}
성능 모니터링과 메트릭
Kafka 애플리케이션의 성능을 모니터링하고 최적화하는 것은 운영 환경에서 매우 중요합니다. JMX 메트릭을 활용한 모니터링 방법을 알아보겠습니다.
Producer 메트릭 모니터링
public class KafkaMetricsMonitor {
private final KafkaProducer<String, String> producer;
private final ScheduledExecutorService scheduler;
public KafkaMetricsMonitor(KafkaProducer<String, String> producer) {
this.producer = producer;
this.scheduler = Executors.newScheduledThreadPool(1);
}
public void startMonitoring() {
scheduler.scheduleAtFixedRate(this::collectAndLogMetrics, 0, 30, TimeUnit.SECONDS);
}
private void collectAndLogMetrics() {
Map<MetricName, ? extends Metric> metrics = producer.metrics();
// 중요 메트릭 추출
double recordSendRate = getMetricValue(metrics, "record-send-rate");
double recordSizeAvg = getMetricValue(metrics, "record-size-avg");
double batchSizeAvg = getMetricValue(metrics, "batch-size-avg");
double requestLatencyAvg = getMetricValue(metrics, "request-latency-avg");
System.out.printf("Producer 메트릭 - 전송률: %.2f msg/sec, 평균 크기: %.2f bytes, " +
"배치 크기: %.2f, 지연시간: %.2f ms%n",
recordSendRate, recordSizeAvg, batchSizeAvg, requestLatencyAvg);
// 임계값 체크 및 알림
if (requestLatencyAvg > 1000) {
System.err.println("경고: Producer 지연시간이 높습니다 (" + requestLatencyAvg + "ms)");
}
}
private double getMetricValue(Map<MetricName, ? extends Metric> metrics, String metricName) {
return metrics.entrySet().stream()
.filter(entry -> entry.getKey().name().equals(metricName))
.findFirst()
.map(entry -> (Double) entry.getValue().metricValue())
.orElse(0.0);
}
}
Spring Boot와 Kafka 통합
Spring Boot 환경에서 Kafka를 더욱 쉽게 사용할 수 있는 방법을 살펴보겠습니다. Spring Kafka는 설정과 사용을 단순화해주는 추상화 레이어를 제공합니다.
Spring Boot 설정
# application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
consumer:
group-id: spring-kafka-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
enable-auto-commit: false
Spring Kafka Producer
@Service
public class SpringKafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message)
.addCallback(
result -> System.out.println("메시지 전송 성공: " + key),
failure -> System.err.println("메시지 전송 실패: " + failure.getMessage())
);
}
@EventListener
public void handleOrderCreated(OrderCreatedEvent event) {
String message = convertToJson(event);
sendMessage("order-events", "order-" + event.getOrderId(), message);
}
}
마무리
Java를 사용한 Kafka Producer와 Consumer 구성은 현대적인 마이크로서비스 아키텍처에서 필수적인 기술입니다. 이번 포스팅에서 다룬 내용들을 정리하면 다음과 같습니다:
핵심 포인트:
- Kafka 클라이언트 라이브러리 설정과 기본 구성 방법
- Producer와 Consumer의 실무 활용 예제
- 에러 핸들링과 재시도 전략 구현
- 성능 모니터링과 메트릭 수집 방법
- Spring Boot 환경에서의 Kafka 통합
실제 프로덕션 환경에서는 보안 설정, 스키마 레지스트리 활용, 클러스터 구성 등 추가적인 고려사항들이 있습니다. 하지만 이번 가이드를 통해 Java Kafka 애플리케이션의 기초를 탄탄히 다졌다면, 더 복잡한 요구사항도 차근차근 해결해 나갈 수 있을 것입니다.
Kafka를 활용한 이벤트 기반 아키텍처는 시스템의 확장성과 유연성을 크게 향상시켜줍니다. 지속적인 학습과 실습을 통해 더욱 견고하고 효율적인 분산 시스템을 구축해보시기 바랍니다.
추가 고려사항
보안 설정
프로덕션 환경에서는 SASL(Simple Authentication and Security Layer)과 SSL을 통한 보안 설정이 필수입니다:
public class SecureKafkaProducer {
private KafkaProducer<String, String> createSecureProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "secure-kafka-cluster:9093");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// SSL 설정
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"your-username\" password=\"your-password\";");
// SSL 인증서 설정
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");
return new KafkaProducer<>(props);
}
}
스키마 레지스트리 활용
대규모 시스템에서는 Avro 스키마 레지스트리를 사용하여 데이터 호환성을 관리합니다:
public class AvroKafkaProducer {
private KafkaProducer<String, GenericRecord> createAvroProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
return new KafkaProducer<>(props);
}
public void sendAvroMessage(String orderId, double amount) {
String schemaString = """
{
"type": "record",
"name": "OrderEvent",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "timestamp", "type": "long"}
]
}
""";
Schema schema = new Schema.Parser().parse(schemaString);
GenericRecord record = new GenericData.Record(schema);
record.put("orderId", orderId);
record.put("amount", amount);
record.put("timestamp", System.currentTimeMillis());
producer.send(new ProducerRecord<>("order-events-avro", orderId, record));
}
}
트랜잭션 처리
Kafka 트랜잭션을 통해 정확히 한 번(exactly-once) 처리를 보장할 수 있습니다:
public class TransactionalKafkaProducer {
private final KafkaProducer<String, String> producer;
public TransactionalKafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 트랜잭션 설정
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-producer-tx");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ACKS_CONFIG, "all");
this.producer = new KafkaProducer<>(props);
this.producer.initTransactions();
}
public void processOrderTransactionally(String orderId, String customerData, String inventoryData) {
try {
producer.beginTransaction();
// 여러 토픽에 관련 메시지 전송
producer.send(new ProducerRecord<>("order-events", orderId, customerData));
producer.send(new ProducerRecord<>("inventory-updates", orderId, inventoryData));
producer.send(new ProducerRecord<>("customer-notifications", orderId, "주문이 접수되었습니다"));
// 모든 메시지가 성공적으로 전송되면 커밋
producer.commitTransaction();
System.out.println("트랜잭션 처리 완료: " + orderId);
} catch (Exception e) {
System.err.println("트랜잭션 실패, 롤백 수행: " + e.getMessage());
producer.abortTransaction();
throw new RuntimeException("주문 처리 실패", e);
}
}
}
성능 튜닝 체크리스트
실제 운영 환경에서 고려해야 할 성능 튜닝 포인트들입니다:
Producer 튜닝:
- batch.size: 배치 크기 조정 (기본값: 16384)
- linger.ms: 배치 대기 시간 (기본값: 0)
- compression.type: 압축 알고리즘 선택 (snappy, gzip, lz4, zstd)
- buffer.memory: 메모리 버퍼 크기 (기본값: 33554432)
Consumer 튜닝:
- fetch.min.bytes: 최소 페치 크기 (기본값: 1)
- fetch.max.wait.ms: 최대 대기 시간 (기본값: 500)
- max.poll.records: 한 번에 가져올 레코드 수 (기본값: 500)
- session.timeout.ms: 세션 타임아웃 (기본값: 10000)
public class PerformanceTunedConsumer {
private KafkaConsumer<String, String> createOptimizedConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "high-performance-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 성능 최적화 설정
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 1KB
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100); // 100ms
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000); // 1000개
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536); // 64KB
props.put(ConsumerConfig.SEND_BUFFER_CONFIG, 131072); // 128KB
return new KafkaConsumer<>(props);
}
public void processWithBulkOperation() {
KafkaConsumer<String, String> consumer = createOptimizedConsumer();
consumer.subscribe(Arrays.asList("high-volume-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
if (!records.isEmpty()) {
List<String> batch = new ArrayList<>();
// 배치 처리를 위한 데이터 수집
for (ConsumerRecord<String, String> record : records) {
batch.add(record.value());
}
// 벌크 처리 (데이터베이스 배치 삽입 등)
processBatch(batch);
// 배치 처리 완료 후 한 번에 커밋
consumer.commitSync();
System.out.println("배치 처리 완료: " + batch.size() + "개 메시지");
}
}
}
private void processBatch(List<String> batch) {
// 실제 배치 처리 로직 (데이터베이스 배치 삽입 등)
System.out.println("배치 처리 중: " + batch.size() + "개 항목");
}
}
이렇게 Java로 구성한 Kafka Producer와 Consumer는 현대적인 분산 시스템의 핵심 구성요소가 됩니다. 적절한 설정과 에러 핸들링, 성능 튜닝을 통해 안정적이고 확장 가능한 메시징 시스템을 구축할 수 있습니다.
실무에서는 모니터링, 로깅, 알림 시스템과의 연동도 중요하니, 이러한 부분들도 함께 고려하여 완성도 높은 Kafka 애플리케이션을 개발해보시기 바랍니다.
'자바(Java) 실무와 이론' 카테고리의 다른 글
Java 모듈 시스템 완벽 이해: Java 9 이후 모던 자바 개발의 핵심 (0) | 2025.05.27 |
---|---|
Java의 GC 튜닝 실전 사례: Throughput vs Latency 중심 (0) | 2025.05.23 |
Java와 Kotlin 비교 – Spring 개발자 관점에서 (0) | 2025.05.23 |
Java 21부터 달라진 주요 기능 요약: 실무 개발자가 알아야 할 핵심 변화점 (0) | 2025.05.23 |
자바로 만드는 자동 메일링 시스템 – JavaMailSender 완벽 정복 (0) | 2025.05.11 |