当前位置: 首页 > 知识库问答 >
问题:

kafka consumer API consumer.poll()在没有错误、没有异常和只是块情况下不能工作

柳涵映
2023-03-14

我正在学习Kafka以下阿帕奇Kafka文件。我用默认配置启动它。

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties &

我运行kafka-console-producer.sh和kafka-console-conducer.sh来生成和使用消息,并且成功了。我写了一个java代码,使用producer API生成消息,这是可以的。这可以通过kafka-console-consumer.sh进行验证。代码与Apache Kafka指南相同:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",    "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
  producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
 }
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

kafka版本和API版本为0.11.0.0

为什么他们不能消费信息?

共有1个答案

怀飞扬
2023-03-14

使用--Zookeeper参数意味着使用旧的使用者,它工作得很好,因为您指定的是Zookeeper服务器(localhost:2181)。

当您想要指定一个Kafka代理(因此使用新的使用者)时,您必须使用--bootstrap-server选项:您仍然使用--zookeeper但传递了一个有效的Kafka代理地址(localhost:9092)。

因此,对于控制台使用者应用程序,您的配置需要为--bootstrap-server localhost:9092而不是--zookeeper localhost:9092

关于您的代码,您确定poll方法被阻止了吗?如果没有记录但没有阻塞,它应该在100 ms后退出(您指定的超时)。

然后从您的代码中我看到生产者发送到“my-topic”,消费者订阅“foo”和“bar”;最后,控制台使用者读取“test”。都是不同的话题!

 类似资料:
  • 问题内容: 在Java中,我们使用try catch块处理异常。我知道我可以像下面这样编写一个try catch块来捕获方法中抛出的任何异常。 但是Java中有什么方法可以让我在发生异常时获取一种称为的特定方法,而不是像上面的方法那样编写一个包罗万象的方法? 具体来说,当抛出异常(我的应用程序逻辑未处理)时,我想在Swing应用程序中显示一条用户友好的消息。 谢谢。 问题答案: 默认情况下,JVM

  • 我刚刚通过nodejs.org上的软件包安装了node和npm,每当我试图搜索或安装npm时,它都会抛出以下错误,除非我执行该命令。我觉得这是一个权限问题?我已经是管理员了。

  • 问题内容: 我有一个带有一些视图的布局,其中一个是EditText。布局很容易放在一页上,但当软键盘退出时,布局不会滚动。这是我的布局的回顾: 在清单中,我声明了该属性: 有谁知道为什么它不起作用以及如何确保它起作用? 提前致谢! 问题答案: 好的,显然不应该将ScrollView 设置为。我将其设置为并将其设置为页面底部的按钮。 不要问我为什么,但这可以解决问题。

  • 我在创建一个不需要Robolectric的单元测试时遇到了麻烦。我在代码中使用AndroidThreeten.init(this),当我运行测试时,如果禁用robolectric,我会得到一个错误: 还是因为这个我必须用robolectric?(附注:Log不是android的util.Log,而是我自己的类)(编辑过)

  • 问题内容: 我得到这个错误 01-14 12:20:57.591:E / AndroidRuntime(1825):原因:android.content.res.Resources $ NotFoundException:字符串资源ID#0x7f040003 如果我使用类变量保存上下文并执行 context.getString(R.string.create_profile_table_sql)*

  • 问题内容: 我有一种方法可以做很多事情。其中包括进行大量插入和更新。因此宣布… 它的工作完全符合预期,我对此没有任何问题。但是在某些情况下,尽管没有异常,但我还是想强制回滚…目前,当我遇到合适的条件时,我正在强制异常,但这很丑陋,我不喜欢它。 我可以以某种方式积极地调用回滚吗?异常调用它…我在想也许我也可以。 问题答案: 在Spring Transactions中,您使用。 您在这里遇到的问题是您