我有两个Kafka代理:server1:9092和server2:9092我正在使用Java客户端向这个集群发送消息,代码如下:
@Test
public void sendRecordToTopic() throws InterruptedException, ExecutionException {
//See at http://kafka.apache.org/documentation.html#newproducerconfigs
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"server1:9092,server2:9092");
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> myRecord =
new ProducerRecord<String, String>("my-replicated-topic", "test", "someValue");
boolean syncSend = true;
if (syncSend) {
//Synchronously send
producer.send(myRecord).get();
} else {
//Asynchronously send
producer.send(myRecord);
}
producer.close();
}
当其中一个代理关闭时,Test在某些情况下会抛出此异常(在此异常示例中'server1'已关闭):
2015-11-02 17:59:29138警告[org.apache.kafka.common.network.Selector]服务器1/40.35.250.227 java的I/O错误。网ConnectException:连接被拒绝:sun没有进一步的信息。尼奥。索克坦林普尔。sun的checkConnect(本机方法)。尼奥。索克坦林普尔。finishConnect(socketchannelmpl.java:717)位于org。阿帕奇。Kafka。常见的网络选择器。在org上投票(Selector.java:238)。阿帕奇。Kafka。客户。网络客户端。在org上投票(NetworkClient.java:192)。阿帕奇。Kafka。客户。制作人内部。发件人。在org上运行(Sender.java:191)。阿帕奇。Kafka。客户。制作人内部。发件人。在java上运行(Sender.java:122)。朗。丝线。运行(Thread.java:745)
这就是我解决问题的方法:
>
将此参数添加到ZooKeeper属性文件:
滴答时间=200
使用这些其他参数时需要此参数:
initLimit=5
syncLimit=2
>
道具。setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG,“10000”);
使用"RECONNECT_BACKOFF_MS_CONFIG"
属性,WARN只被抛出一次(不是无限循环),然后消息被发送
我遇到了这个问题,结果发现原因是对一个新配置属性的误解。
在从以前的ProducerAPI迁移时,我寻找了一个与“topic.metadata.refresh.interval.ms”等效的API,并选择了ProducerConfig。元数据获取超时配置。然而,事实证明,这是在尝试访问元数据被视为失败之前的超时,由于我将其设置为几分钟,它阻止了故障转移的发生。
将其设置为较低的值(我选择500毫秒)似乎解决了我的问题。
我相信我最初寻找的价值是ProducerConfig。METADATA_MAX_AGE_CONFIG作为元数据刷新前的超时,无论是否发生故障
我让消费者在我的机器上运行。当我停止Kafka broker时,我在应用程序中得到警告 但是在2-4分钟后被触发。根据此文档 https://github.com/spring-projects/spring-kafka/blob/master/src/reference/asciidoc/kafka.adoc#idle-containers 它说“如果轮询未在 pollInterval 属性的
我想在我的集群中有一个调度器,它会在一段时间后发送一些消息。从我看到的调度器是每个actorsystem,从我的测试只针对本地actor系统。不是集群那一个。因此,如果在一个节点上调度某个任务,如果它被关闭,那么所有调度的任务都将被丢弃。 如果我创建一个集群单例来负责调度,那么已经制定的调度是否可以在其他节点上重新创建?还是我应该将它作为一个具有已经创建的调度元数据结构的持久执行元,并在预启动阶段
我有 2 个Kafka集群。群集 A 和群集 B。这些集群是完全独立的。我有一个Spring启动应用程序,它侦听集群 A 上的主题,转换事件,然后将其生成到集群 B 上。我只需要一次,因为这些是金融事件。我注意到,对于我当前的应用程序,我有时会遇到重复的情况,也会错过一些事件。我试图尽我所能只实现一次。其中一篇帖子说,与Spring启动相比,flink将是一个更好的选择。我应该搬到闪光灯吗?请参阅
鉴于以下情况: 我在本地启动zookeeper和单个kafka代理,并创建“测试”主题,如kafka快速入门中所述:https://kafka.apache.org/quickstart 然后,我运行一个简单的java程序,该程序每秒向“测试”主题生成一条消息。一段时间后,我关闭了本地的kafka代理,看到制作人继续生成消息,它没有抛出任何异常。最后,我再次启动kafka broker,produ
我有一个关于AWS上Kafka经纪人集群的问题。现在集群前面有一个AWS ELB,但当我将生产者或消费者的“bootstrap.servers”属性设置为ELB的“A”记录(以及正确的端口号)时,生产者和消费者都无法分别生成和使用消息。我已经关闭了我的代理上的所有SSL,并通过明文9092端口进行连接,我的ELB将端口1234转发到9092。例如,在我的Producer配置属性中,我将。。。 bo
本教程用于多代理kafka集群。我建立了三个经纪人: 本地主机:9092 本地主机:9093 本地主机:9094 问题是,如果我杀死,我就不能使用以下命令: 我知道端口被杀死了,但是--如何通过通用引导服务器来使它运行?我错过了什么? 编辑1: bin/kafka-console-consumer.sh--bootstrap-server localhost:9092,localhost:9093