我有一个Cloudera集群,在3台不同的机器上有3个经纪人。我正在从集群内的第四个开发。 我创建了我的主题如下:创建主题 /usr/bin/kafka-topics --zookeeper host:2181,host2:2181,hosts3:2181/kafka --create --partitions 10 --replication-factor 2 --topic topicname
我需要在发布/订阅模式下调用Kafka消费者1000次。据我所知,为了让kafka在发布/订阅模式下工作,我需要给每个消费者一个新的groupId(props . put(" group . id ",String.valueOf(Instant.now())。toEpochMilli()));).但是当我这样做的时候,如果两个消费线程同时访问消费线程,就会出现问题。这个问题应该怎么解决?
我尝试在scala中实现一个非常简单的Kafka(0.9.0.1)消费者(代码如下)。 据我理解,Kafka(或者更好地说是Zookeeper)为每个groupId存储给定主题的最后一条消费消息的偏移量。因此,在以下情况下: 具有 的使用者,昨天只消耗了主题中的 5 条消息。现在最后使用的消息的偏移量为 4(考虑偏移量为 0 的第一条消息) 在夜间,2条新消息到达主题 今天我重新启动消费者,使用相
使用Lagom 1.4.11,kafka 0.11与kafka的沟通似乎有效,因为制片人已经制作了一些东西。只有使用者有错误。 看起来我的µService与kafka没有任何联系。我有一个kafka-0 pod,kafka-zooker-0。 我安装了它 是他们能帮忙的人。谢谢你。 添加。 我发现制作人创造了主题: 似乎kafka不存储消息。
我使用一个生产者在本地主机上运行的Kafka服务器中输入了一些消息。Zookeeper也在本地主机上。我使用了这里给出的。 但是,消费者似乎没有收到任何消息! 脚本可以提取所有这些消息,但代码不能。怎么了?消费者代码与该页面上给出的代码完全相同。 这是我发布消息的同一主题。以下是制作人的代码:
我使用confluent .net客户端。订阅者在重启(订阅者服务重启)后始终读取 Kafka 主题的所有消息。如何提交消费者已经实现的偏移并从中读取?也许一些消费者配置可以提供帮助...
我已经开始学习Kafka了。在上面尝试基本的操作。我坚持了一个关于“经纪人”的观点。 我的kafka正在运行,但当我想创建分区时。 traceback(最近一次调用):文件“”,第1行,在文件“/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py”,第284行,在init self中_client=KafkaClient(me
我对kafka和kafka-python相当陌生。安装kafka-python后,我从这里尝试了一个简单的消费者代码实现-http://kafka-python.readthedocs.io/en/master/usage.html 我一直在kafka的bin目录中编写消费者代码,并尝试从那里运行python代码。但是,我遇到以下错误: 回溯(最近一次调用):文件 “KafkaConsumer.p
我使用的是Kafka流,具有无状态的简单处理器拓扑结构。 我有一个主题,有100个分区,有2台机器,每台机器有50个线程,运行同一个流媒体应用程序,因此最终我将在它们之间进行1-1映射。 主题中的消息已是键控消息。 我有一个逻辑约束,一旦线程连接到一个或多个分区,它应该继续处理这些分区(当然,直到重新启动发生,它会重新洗牌) 我从日志中看到线程反复(重新)加入消费者组。 我的问题,kafka 流
我有一个Kafka主题,有50个分区 My Spring Boot应用程序使用Spring Kafka通过读取这些消息 Kubernetes中应用程序自动缩放的实例数。 默认情况下,Spring Kafka似乎每个主题启动1个消费者线程。 因此,对于应用程序的唯一实例,一个线程正在读取50个分区。 对于2个实例,有一个负载平衡,每个实例侦听25个分区。每个实例仍然有1个线程。 我知道我可以使用上的
我目前正在探索Kafka,作为一个简单问题的初学者。 将有一个生产者向一个主题推送消息,但将有n个spark应用程序的消费者从kafka发送消息并插入到数据库中(每个消费者插入到不同的表中)。 是否有可能消费者会不同步(例如消费者的某些部分会停机很长一段时间),然后一个或多个消费者不会处理消息并插入到表中? 假设代码总是正确的,在按摩数据时不会出现异常。重要的是每条消息只处理一次。 我的问题是,K
我想描述以下场景:我有一个节点。js后端应用程序(它使用单线程事件循环)。这是系统的总体架构:Producer- 假设制作者向Kafka发送了一条消息,这条消息的目的是在数据库中进行某个查询并检索查询结果。但是,众所周知Kafka是一个异步系统。如果制作者向Kafka发送消息,它会得到一个响应,表明该消息已被Kafka经纪人接受。Kafka broker不会等到消费者轮询消息并处理它。 在这种情况
我在一个公认的缓慢配置中设置了Kafka——但我不期待我看到的数字。 我将集群设置为<code>LogAppendTime</code>,因此我正在测量事件写入Kafka(由代理决定)与服务接收到事件之间的时间。代理和应用程序都位于“同一位置”,因此服务器之间的ping时间很短,时钟应该同步或接近。 我看到延迟在 到 600ms 之间,很多是 ......巨大的差异让我觉得我的设置出了问题。它因消
在Apache Kafka 0.8.2 office文档的第5.6节“分销、消费者和消费者群体”小节中,它说 组中的使用者尽可能公平地划分分区,每个分区仅由一个消费组中的一个使用者使用。 但是我发现,在实践中,一个消费者组中的多个消费者可以通过从同一主题分区发送 FetchRequest 来使用单个分区中的数据。 在接下来的消费者身份证登记处小节中 除了由一个组中的所有使用者共享的group_id
我们所有的30个主题都是用kafka中的10个分区创建的。我们正在按分区监控所有主题/group p-id的滞后。 我们正在使用Fluentd插件从kafka读取和路由日志。该插件是使用高级消费者实现的。我们为插件的单个主题配置了一些消费者,为多个主题配置了一些消费者。总的来说,除了3个主题之外,数据正在流经,没有问题。 问题是,对于正在处理的30个主题中的3个,我们发现分区滞后值不一致,即查看特