为了使用Kafka通用地发布消息,我使用类名作为主题:
kafkaProducer.send(new ProducerRecord(object.getClass().getName(), new DomainObjectAdapter(object).toJsonString()));
for(Object sub:_subscriptions)
topics.add(sub.getClass().getName());
_kafkaConsumer.subscribe(topics);
properties.put("bootstrap.servers", _settings.getEndpoint());
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("enable.auto.commit", "false");
properties.put("group.id", "TestGroup");
properties.put("auto.offset.reset","earliest");
properties.put("bootstrap.servers",_settings.getEndpoint());
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
服务器属性(我从默认属性中唯一更改的内容):
num.partitions=4
注意:我还尝试了以下用户设置:
properties.put("bootstrap.servers", _settings.getEndpoint());
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("auto.commit.interval.ms","1000");
properties.put("enable.auto.commit", "true");
properties.put("group.id", "testGroup");
properties.put("auto.offset.reset","latest");
properties.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
如果您的所有使用者都具有相同的使用者组(group.id
属性),那么该组中只有一个使用者将接收消息。如果希望所有使用者都接收消息,他们需要有不同的group.id
。
要检查哪些使用者绑定到主题的分区,可以使用以下命令
./bin/kafka-consumer-groups.sh --bootstrap-server yourhost:9092 --group testGroup --describe
拥有发布者和N个消费者,如果消费者使用,那么他们将错过订阅主题之前发布到主题的所有消息...众所周知,使用的消费者不会重播订阅主题之前存在的消息... 所以我需要: null 我想使用者必须检查现有消息的主题,如果有消息就使用它们,然后启动使用。对我来说这是最好的方法...
我正在使用以下在docker上运行kafka、zookeeper和kafdrop: 我有一个具有以下配置的Spring Boot Producer应用程序-: 在我的中,我有以下内容: 这是一个单独的应用程序,我在我的服务中这样称呼Kafka制作人: 在一个完全不同的spring引导应用程序中,我有一个像这样的使用者: 我可以看到消费者正在连接到代理,但是有消息的日志。下面是我能看到的完整日志:
我在mac上运行Kafka和Flink作为docker容器。 我已经实现了Flink作业,它应该消耗来自Kafka主题的消息。我运行一个向主题发送消息的python生产者。 工作开始时没有问题,但没有收到任何消息。我相信这些消息被发送到了正确的主题,因为我有一个能够使用消息的python消费者。 flink作业(java): Flink作业日志: 生产者作业(python):(在主机上运行-不是d
我需要在发布/订阅模式下调用Kafka消费者1000次。据我所知,为了让kafka在发布/订阅模式下工作,我需要给每个消费者一个新的groupId(props . put(" group . id ",String.valueOf(Instant.now())。toEpochMilli()));).但是当我这样做的时候,如果两个消费线程同时访问消费线程,就会出现问题。这个问题应该怎么解决?
我是Kafka的新手。我在网上读了很多关于Kafka制作人和Kafka消费者的说明。我成功地实现了前者,它可以向Kafka集群发送消息。然而,我没有完成后一个。请帮我解决这个问题。我看到我的问题像StackOverflow上的一些帖子,但我想更清楚地描述一下。我在虚拟盒子的Ubuntu服务器上运行Kafka和Zookeeper。使用1个Kafka集群和1个Zookeeper集群的最简单配置(几乎是
我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外: