当前位置: 首页 > 知识库问答 >
问题:

如何在Java中使用Kafka8.2 API生成消息?

陆宏壮
2023-03-14
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.0</version>
</dependency>

我正在使用以下客户端代码创建远程KafkaProducer

Properties propsProducer = new Properties();

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");
propsProducer.put("key.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("topic.metadata.refresh.interval.ms", "0");

KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(propsProducer);

一旦我创建了生产者,我就可以运行下面的行并返回有效的主题信息,假设strTopic是一个现有的主题名称。

List<PartitionInfo> partitionInfo = m_kafkaProducer.partitionsFor(strTopic);

当我尝试发送消息时,我会执行以下操作:

ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[],byte[]>(strTopic, strMessage.getBytes());

RecordMetadata futureData = m_kafkaProducer.send(prMessage).get();
propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");
propsProducer.put("bootstrap.servers", "127.0.0.1:8080");

共有1个答案

蒙经纶
2023-03-14

经过大量的挖掘,我决定实现这里找到的示例:Kafka生产者示例。我缩短了代码,没有实现一个分区器类。我用列出的依赖项更新了我的pom,但我仍然有同样的问题。最后,我做了一些配置更改,一切都工作了。

最后一个难题是在服务器和客户机的/etc/hosts中定义Kafka服务器。我在两个文件中都添加了以下内容。

172.xx.xx.xxx     serverHost1

同样,X只是面具。然后,我将server.properties文件中的adveredsed.host.name设置为serverhost1。注意:我是在服务器机器上运行ifconfig后得到该IP的。

propsProducer.put("metadata.broker.list", "172.xx.xx.xxx:8080");
propsProducer.put("metadata.broker.list", "serverHost1:8080");

Kafka API不喜欢我将IP定义为字符串的事实。相反,它是从etc/hosts文件中查找IP,尽管文档中说:

代理将向生产者和消费者通告主机名。如果未设置,则使用已配置的“host.name”的值。否则,它将使用从java.net.inetaddress.getCanonicalHostName()返回的值。

它将以字符串形式返回IP,如果没有在客户端机器的etc/hosts中定义,则返回与IP成对的名称(在我的例子中是serverHost1)。另外,我也从来没有设置host.name的值。

 类似资料:
  • 在阅读了大量的Kafka合流式聊天的文章后,我想尝试一下如何实现一个普通的聊天系统。但是我在做一些结构设计的时候遇到了一些问题。当使用mysql作为数据的数据库时,我可以给每个有意义的消息赋予,比如user表中的user_id,消息表中的message_id。模型表中有了id后,可以方便地进行客户端和服务器端的通信。但是在Kafka流中,我如何在Ktable中给每个有意义的模型一个唯一的id?还是

  • 问题内容: 我想用Java生成一个.torrent文件,但是我不想要一个大的API,它可以执行诸如抓取跟踪器,种子等操作。这仅适用于生成元数据的客户端。存在哪些轻量级解决方案?我只生成一个.zip文件的.torrent。 谢谢! 问题答案: 我整理了这段独立的Java代码,以准备一个带有单个文件的.torrent文件。 通过调用.torrent文件的名称,共享文件的名称和跟踪器URL 来创建.to

  • 我想产生一个Kafka主题的信息。该消息应该具有以下模式: 我知道这是一个json模式,那么如何将json转换成字符串呢?

  • 问题内容: 我正在努力用Java生成JSON字符串。 实际输出: 预期输出: 问题答案: 写 代替

  • 问题内容: 在Java中以至少32个字节长的String形式生成SALT值的最佳方法是什么? 问题答案:

  • 我的印象是,UUID规范需要一个有保证的、真实的、全球唯一的结果,不是99.999999999999%的唯一结果,而是100%的唯一结果。从规格来看: UUID为128位长,可以保证跨空间和时间的唯一性。 看起来java只支持UUID规范的V3和V4。V4并不是真正独特的。对于使用< code > namuuidfrombytes 的V3实现,下面的结果是重复的,因为计算机太快了(编辑:循环到10