본문 바로가기

카테고리 없음

kafka producer

반응형

시작 전에 리눅스 서버에서 server.propertise의 advertised.listener 가 뭔지 확인하고, localhost:9092에서 통신할 ip로 변경해주기 

이렇게 되어있어야 한다. 안그러면 밑에서 실행할 java 프로그램 (producer에서 오류난다.)

바꾼 후엔, 재부팅

 

 

 

HOST 셋팅

1. vs code 다운로드
2. java 설치 및 환경변수 지정 --> 확인 cmd 열어서 java 및 javac
3. 깃헙에서 소스코드 zip 으로 다운로드 받고, 폴더 오픈
4. 원하는 코드 찾아서 ctrl+shift+d --> vs code 내에서 gradle, java 디버거 등등 없으면 다운로드 (vscode 화면에서 바로 가능)
5. 코드에서 BOOTSTRAP_SERVERS를 내꺼로 바꿔주기

private final static String BOOTSTRAP_SERVERS = "192.168.56.201:9092";

6. 재실행 --> 터미널 올라가는 것 확인 가능

7. 처음엔 사실 브로커와의 통신이 안된다고 떴었음
8. 윈도우 cmd 열어 telnet 192.168.56.201 9092 시, 접속 불가능

 

리눅스 서버 세팅

1. 카프카 서버 실행
cd 카프카 홈
[kafka@kafka3 kafka_2.13-3.5.1]$ bin/kafka-server-start.sh config/kraft/server.properties
2. 서버 실행 확인
ps -ef | grep 9092
3. 즉, 포트는 열려있는데 호스트에서 못들어오는 것

==> 리눅스 방화벽이 문제-> 방화벽 종료 필요

sudo service iptables stop
sudo systemctl stop firewalld
systemctl status firewalld

 

방화벽 종료하니 telnet 및 데이터 전송 가능.
리눅스 서버 consumer로 확인

 

이게 뭐라고

박수를 짞짞짞 쳤다.

 

 

 

코드

package com.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class ProducerWithKeyValue {
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "192.168.56.201:9092;192.168.56.202:9092;192.168.56.203:9092";

    public static void main(String[] args) {

        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Pangyo", "Pangyo");
        producer.send(record);
        ProducerRecord<String, String> record2 = new ProducerRecord<>(TOPIC_NAME, "Busan", "Busan");
        producer.send(record2);
        producer.flush();
        producer.close();
    }
}

 

결과

[root@kafka1 kafka_2.13-3.5.1]# bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test --from-beginning --property print.key=true --property key.separator="-"
null-testMessage
Pangyo-Pangyo
Busan-Busan

 

 

 

 

 

 

 

반응형