diff --git a/common/messaging/src/main/java/messaging/config/KafkaConsumerConfig.java b/common/messaging/src/main/java/messaging/config/KafkaConsumerConfig.java index 5ebda89d..97c49273 100644 --- a/common/messaging/src/main/java/messaging/config/KafkaConsumerConfig.java +++ b/common/messaging/src/main/java/messaging/config/KafkaConsumerConfig.java @@ -36,7 +36,7 @@ public ConsumerFactory consumerFactory() { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - // 초기 오프셋 위치 설정 + // 초기 오프셋 읽기 위치 설정 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 수동 커밋 diff --git a/common/messaging/src/main/resources/application-kafka.yml b/common/messaging/src/main/resources/application-kafka.yml index dff382e7..686feeec 100644 --- a/common/messaging/src/main/resources/application-kafka.yml +++ b/common/messaging/src/main/resources/application-kafka.yml @@ -1,6 +1,6 @@ spring: kafka: - bootstrap-servers: kafka:29092 + bootstrap-servers: kafka1:29092,kafka2:29093 consumer: group-id: ${spring.application.name}-group properties: diff --git a/docker-compose/docker-compose-kafka.yml b/docker-compose/docker-compose-kafka.yml index b4e86ed7..f58fb77c 100644 --- a/docker-compose/docker-compose-kafka.yml +++ b/docker-compose/docker-compose-kafka.yml @@ -1,38 +1,72 @@ version: "3.1" services: - zookeeper: + zookeeper1: image: confluentinc/cp-zookeeper:latest - container_name: zookeeper + container_name: zookeeper1 ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_SERVER_ID: 1 + ZOOKEEPER_SERVERS: zookeeper1:2888:3888;zookeeper2:2888:3888 - kafka: + zookeeper2: + image: confluentinc/cp-zookeeper:latest + container_name: zookeeper2 + ports: + - "2182:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_SERVER_ID: 2 + ZOOKEEPER_SERVERS: zookeeper1:2888:3888;zookeeper2:2888:3888 + + kafka1: image: confluentinc/cp-kafka:latest - container_name: kafka + container_name: kafka1 ports: - "9092:9092" - "29092:29092" environment: - KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://kafka:29092 + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2181 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://kafka1:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL - KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" - KAFKA_BROKER_ID: 1 - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2 depends_on: - - zookeeper + - zookeeper1 + - zookeeper2 + + kafka2: + image: confluentinc/cp-kafka:latest + container_name: kafka2 + ports: + - "9093:9093" + - "29093:29093" + environment: + KAFKA_BROKER_ID: 2 + KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2181 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://kafka2:29093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2 + depends_on: + - zookeeper1 + - zookeeper2 kafka-ui: image: provectuslabs/kafka-ui:latest container_name: kafka-ui depends_on: - - kafka + - kafka1 + - kafka2 ports: - 8080:8080 environment: KAFKA_CLUSTERS_0_NAME: local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 - KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181 \ No newline at end of file + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092,kafka2:29093 + KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper1:2181,zookeeper2:2181 + +networks: + kafka: + driver: bridge \ No newline at end of file diff --git a/services/queue-manage/src/main/java/com/ticketPing/queue_manage/infrastructure/config/ReactiveKafkaConfig.java b/services/queue-manage/src/main/java/com/ticketPing/queue_manage/infrastructure/config/ReactiveKafkaConfig.java index daf56c8e..168f622a 100644 --- a/services/queue-manage/src/main/java/com/ticketPing/queue_manage/infrastructure/config/ReactiveKafkaConfig.java +++ b/services/queue-manage/src/main/java/com/ticketPing/queue_manage/infrastructure/config/ReactiveKafkaConfig.java @@ -32,7 +32,7 @@ public ReceiverOptions receiverOptions() { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - // 초기 오프셋 위치 설정 + // 초기 오프셋 읽기 위치 설정 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 수동 커밋