当前位置: 首页 > 知识库问答 >
问题:

无法向Kafka主题发送单个消息

郏稳
2023-03-14

我正在使用kafka java客户端0.11.0和kafka服务器2.11-0.10.2.0

我的代码:

Kafka马纳格

public class KafkaManager {

    // Single instance for producer per topic
    private static Producer<String, String> karmaProducer = null;

    /**
     * Initialize Producer
     * 
     * @throws Exception
     */
    private static void initProducer() throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.kafkaUrl);
        props.put(ProducerConfig.RETRIES_CONFIG, Constants.retries);
        //props.put(ProducerConfig.BATCH_SIZE_CONFIG, Constants.batchSize);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Constants.requestTimeout);
        //props.put(ProducerConfig.LINGER_MS_CONFIG, Constants.linger);
        //props.put(ProducerConfig.ACKS_CONFIG, Constants.acks);
        //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Constants.bufferMemory);
        //props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Constants.maxBlock);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, Constants.kafkaProducer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try {
            karmaProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
        }
        catch (Exception e) {
            throw e;
        }
    }

    /**
     * get Producer based on topic
     * 
     * @return
     * @throws Exception
     */
    public static Producer<String, String> getKarmaProducer(String topic) throws Exception {
        switch (topic) {
        case Constants.topicKarma :
            if (karmaProducer == null) {
                synchronized (KafkaProducer.class) {
                    if (karmaProducer == null) {
                        initProducer();
                    }
                }
            }
            return karmaProducer;

        default:
            return null;
        }
    }

    /**
     * Flush and close kafka producer
     * 
     * @throws Exception
     */
    public static void closeKafkaInstance() throws Exception {
        try {
            karmaProducer.flush();
            karmaProducer.close();
        } catch (Exception e) {
            throw e;
        }
    }
}
public class KafkaProducer {

    public void sentToKafka(String topic, String data) {
        Producer<String, String> producer = null;
        try {
            producer = KafkaManager.getKarmaProducer(topic);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, data);
            producer.send(producerRecord);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class App {

    public static void main(String[] args) throws InterruptedException {

        System.out.println("Hello World! I am producing to stream " + Constants.topicKarma);
        String value = "google";
        KafkaProducer kafkaProducer = new KafkaProducer();
        for (int i = 1; i <= 1; i++) {
            kafkaProducer.sentToKafka(Constants.topicKarma, value + i);
            //Thread.sleep(100);
            System.out.println("Send data to producer=" + value);
            System.out.println("Send data to producer=" + value + i + " to tpoic=" +  Constants.topicKarma);
        }
    }
}

当我的循环长度如果在1000左右(在类app)时,我就能成功地向Kafka主题发送数据。

但当我的循环长度为1或小于10时,我无法向Kafka主题发送数据。注意我没有得到任何错误。

根据我的发现,如果我想发送一个单一的消息到Kafka主题,根据这个程序我得到了成功的消息,但从来没有得到一个关于我的主题的消息。

但是如果我使用Thread.Sleep(10)(正如你在我的应用程序类中看到的,我已经对它进行了评论),那么我就成功地发送了关于我的主题的数据。

你能解释一下为什么Kafka表现这种模棱两可的行为。

共有1个答案

高溪叠
2023-03-14

对KafKaProducer.send()的每个调用都返回一个未来。您可以在退出之前使用这些期货中的最后一个来阻塞主线程。更简单的是,您可以在发送所有消息后调用kafkaProducer.flush():http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/kafkaProducer.html#flush()

调用此方法会使所有缓冲记录立即可用发送(即使linger.ms大于0),并在完成与这些记录相关联的请求时阻塞。

 类似资料:
  • 我在用Kafka。这是我的代码,在那里我想发送消息到kafka服务器,主题名是“west”,消息是“message1”。我没有收到任何错误,虽然我没有看到我发送的消息在主题中有什么问题吗?

  • 我有一个问题与产生的消息Kafka的主题。 我使用来自外部供应商的Kafka管理服务,所以我问他经纪人的状况,他说一切都好。顺便说一句,它发生在三个不同的Kafka实例上。Kafka客户端版本也无关紧要-0.11.0.0和2.0.1都有。

  • 我是Kafka的新手,当我试图发送信息到我得到的主题下面的错误。有人能帮我一下吗? [2018-09-23 13:37:56,613]警告[Producer Clientid=Console-Producer]无法建立到节点-1的连接。代理可能不可用。(org.apache.kafka.clients.NetworkClient)

  • 我是斯卡拉和Kafka的新手,遇到了一些麻烦。 我正在尝试将scala kafka producer连接到安装在cloudera express服务器上的kafka服务器。我已经用这些指令在VMs中这样做过一次了,没有任何问题。 当我运行producer时,所需的主题被创建,但没有任何消息被发送,或者我是这样认为的。 Kafka制作人 当我执行run方法时,我看到“producer-send:#”

  • 我在向我的Kafka主题发送序列化XML时遇到问题。每当我运行我的代码时,我都不会收到任何异常或错误消息,但我仍然无法在Kafka主题中看到我的任何消息。 我的Kafka制作人设置如下: 当我运行代码时,我得到: 知道怎么做吗?提前谢谢!

  • 我无法将KafkaProducer使用java从Windows(主机操作系统)上的eclipse发送到运行在Hortonworks沙箱上的kafka主题。我的java代码如下所示 当我运行这个java代码时没有错误,它只是打印消息的索引,在本例中只有0,然后终止,我无法在hortonworks沙箱的cmd接口上的console-consumer中看到0。 这是pom.xml依赖项 我可以从制片人那