到目前为止,在运行Spring Boot应用程序之前,我已经从命令行启动了zookeeper和kafka server,但现在我需要直接从代码启动它们。
首先,我已经尝试在main方法中使用ProcessBuilder:
Process process = new ProcessBuilder("C:\\kafka_2.12-2.2.0\\bin\\windows\\zookeeper-server-start.bat",
"C:\\kafka_2.12-2.2.0\\config\\zookeeper.properties").start();
InputStream is = process.getInputStream();
InputStreamReader isr = new InputStreamReader(is);
BufferedReader br = new BufferedReader(isr);
String line;
while ((line = br.readLine()) != null) {
System.out.println(line);
}
但这似乎不起作用,因为控制台上没有打印任何内容,过了一段时间,应用程序抛出一个TimeoutException。
第二,我想让kafka服务器在Zookeeper启动后运行;一个人怎样才能做到这一点?
您必须使用ZookeePerExecutor
从“您的”java应用程序启动Zookeeper API,方法是提供4个初始参数和一个线程运行器。ZooKeeper API文档https://ZooKeeper.apache.org/doc/r3.4.13/javaexample.html和https://www.programcreek.com/java-api-examples/?API=org.apache.ZooKeeper.server.zookeeperservermain给出了一个示例
我有一个独立的Kafka经纪人,我试图配置SASL。配置如下。我试图在经纪人身上设置SASL_PLAIN身份验证。 我的理解是,这与听众有关。名称服务器中的配置。属性,我不需要jaas文件。但我已经尝试过一种方法,看看这是否是一种更好的方法。 我对这些命令中的每一个都进行了实验,但都产生了相同的异常。 显示的异常为: KafkaServer启动期间出现致命错误。准备关机。。。找不到“KafkaSe
我是Kafka的初学者 1/我下载了Kafka的1.0.0版本 2/I更改了bith server.properties和zookeeper.properties中的数据目录位置属性 你能帮帮我吗?
null null 谢谢你的回答。
我正在尝试用3个代理&Zookeeper来测试运行一个单独的Kafka节点。我希望使用控制台工具进行测试。我是这样管理制作人的: 然后我以这样的方式运行消费者: 我可以在生产者中输入消息,并在消费者中看到它们,这是预期的。但是,当我使用bootstrap-server运行消费者的更新版本时,我什么也得不到。例如: 当我有一个代理在端口9092上运行时,这工作得很好,所以我彻底搞糊涂了。有没有办法让
我使用Zookeeper和Kafka作为使用Java的消息传递用例。我以为当您重新启动Zookeeper和Kafka服务器时,消费者组详细信息会被删除。但他们没有。zookeeper会将消费者组详细信息保存在某种文件中吗? 如果我想重置消费者组,我应该手动删除消费者组详细信息吗? 任何人都可以向我澄清这一点吗?
这更多的是一个系统设计问题。 让我们假设我有一个微服务体系结构,我有X个实例(用于负载平衡对服务的HTTP请求)。但是,也是Kafka主题的消费者。如何避免将同一消息处理X次(X是的实例数)<如果处理是幂等的,至少一次就可以了。它不需要是,但不能是。 服务A可以是订单服务。它生成关于用户向订单主题下单的消息。 服务B可以是支付服务。它使用订单主题中的消息向用户收费。 支付订单可能是幂等操作。但是,