当前位置: 首页 > 知识库问答 >
问题:

如何将消息从一台PC上的Kafka制作人发送到另一台PC上的Kafka经纪人?

文喜
2023-03-14

我试图用wifi接口从一台电脑上的Kafka制作人发送消息到另一台电脑上的Kafka经纪人,但消息不出现在Kafka经纪人的指定主题中。

我用华硕无线路由器连接了两台PC机,并禁用了PC机和路由器上的所有防火墙。两台PC都成功地ping了对方。当我转向有线连接时,它工作了,消息被摄取到kafka broker PC上的指定主题。

Kafka制片人:

public class CarDataProducer {
    public static void main(String[] args) {

        CarDataProducer fProducer= new CarDataProducer();
        Producer<String, CarData> producer= fProducer.initializeKafkaProducer();

        String topicName = "IN-DATA";

        CSVReaderCarData csvReader = new CSVReaderCarData();
        List<CarData> CarDataList = csvReader.readCarDataFromCSV("data/mllib/TrainTest_101.csv");

        //read from CSV file and send
        for (CarData val : CarDataList) {
            producer.send(new ProducerRecord<String, CarData>(topicName, val));
        }
    }

    public KafkaProducer<String, CarData> initializeKafkaProducer() {

        // Set the producer configuration properties.
        Properties props = ProducerProperties.getInstance();

        // Instantiate a producerSampleJDBC
        KafkaProducer<String, CarData> producer = new KafkaProducer<String, CarData>(props);

        return producer;
    }

public class ProducerProperties {
    private ProducerProperties() {
    }

    public static final Properties props = new Properties();
    static {
        props.put("bootstrap.servers", "192.168.1.124:9092");
        props.put("acks", "0");
        props.put("retries", 0);
        props.put("batch.size", 500);
        props.put("linger.ms", 500);
        props.put("buffer.memory", 500);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "com.iov.safety.vehicleproducer.CarDataSerializer");
    }

    public static Properties getInstance() {
        return props;
    }
}

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic IN-DATA
 kafka-console-consumer.sh --bootstrap-server 192.168.1.124:9092 --topic IN-DATA
{"instSpeed":19.0,"time":15.0,"label":0.0}
{"instSpeed":64.0,"time":15.0,"label":1.0}
{"instSpeed":10.0,"time":16.0,"label":0.0}

    null

Listeners=明文://:9092

  • netstat-anogrep'9092'

tcp6 0 0:::9092::*
监听(0.00/0/0)tcp6 0 0 127.0.0.1:53880
127.0.1.1:9092建立keepalive(6659.53/0/0)tcp6 0 0 127.0.1.1:9092建立keepalive(6659.53/0/0)tcp6 0 0 127.0.1.1:9092建立keepalive(6659.15/0/0)tcp6 0 0 127.0.1.1:53878建立keepalive(6659.15/0/0)tcp6 0 0 127.0.0.1:53878 127.0.1.1:53878 127.0.1.1:9092建立keepalive(6659.15/0/0)

通过向kafka生产者的发送添加回调,我得到超时错误:

timeoutexception:自上次追加后,in-data-0:30045毫秒的8条记录过期

共有1个答案

乜飞航
2023-03-14

我解决了。每个Kafka经纪人必须公布其主机名/IP,以便从另一台PC上的Kafka制作人那里获得。

 kafka-server-start.sh config/server.properties --override  advertised.listeners=PLAINTEXT://192.168.1.124:9092
sha

相反,我们可以按以下方式更新config/server.properties:

advertised.listeners=PLAINTEXT://your.host.name:9092 

advertised.listeners=PLAINTEXT://192.168.1.124:9092
 类似资料:
  • 问题内容: 我目前在开发PC上使用Jenkins。我将其安装在开发PC上,因为我对该工具的了解有限。因此我在开发PC上对其进行了测试。现在,我对Jenkins感到很满意,可以作为我在构建过程中的长期“合作伙伴”,并希望将此Jenkins“移动”到专用服务器上。 在此之前,我完成了很少的构建,并从每个构建中存档了工件。特别是,对于我来说,内部版本号对于版本控制非常重要。 如何将所有Jenkins信息

  • 我已经设置了kafka客户端,它可以产生和消费消息,当我们把有效载荷从生产者发送到主题时,它可以正常工作,所以我有问题生产者现在第一个消息我可以发送到主题,我也可以从kafka主题中消费,现在我尝试发送第二个消息,但是消费者没有从kafka主题中读取第二个消息,知道这里发生了什么吗? Producer.js consumer.js

  • 问题内容: 我正在尝试在pyinstaller的可执行文件中添加Chromedriver。虽然这是可能的,但似乎在尝试在另一台计算机上运行此错误消息。 我已经尝试了一些职位,包括本的一个,但不幸的是,这并没有提供预期的效果。最好的情况是,当chrome exe位于同一文件夹中时,我可以在自己的计算机上运行它,这无济于事。 代码1: 主程序 在另一台PC上运行时,我得到的是: 错误1: 找不到Chr

  • 我使用spring框架和有3个代理集群的kafka。我发现使用者没有使用某些消息(假设在所有发送消息中使用0.01%),所以在生产者代码中,我记录了API返回的消息偏移量: 我使用返回偏移量来查询所有分区中的kafka主题,但它没有找到消息(我测试了与消费者使用的和他们在kafka中的消息相关的其他偏移量),问题是什么,我如何确保该消息发送到kafka? 我还在producer中使用了

  • 在我的本地系统中,我已经启动了一个单独的Kafka实例,旁边还有动物园管理员。Zookeper和kafka服务器都运行在默认端口上。 我创建了一个主题“test”,复制因子为1,因为我只有一个kafka实例正在运行。 同时,我还创建了两个分区。 但是当我使用java kafka-client jar创建一个生产者时,即使我对消息使用不同的键,生产者也会将所有消息推送到同一个分区,因为所有消息都是在

  • 我在用Kafka。 我有10k个jsons列表, 我该怎么做呢? 谢谢