在Kafka中创建主题后,您可以创建任意数量的消费者组,只需尝试使用这些组来阅读主题。
我想创建一个额外的消费者组来监控真实消费者组的消息内容——一个用来偷看他们消息的组。因此,GUI会让您单击任何消费者组的“偷看”,“偷看”组的偏移量将更新为被监控组的偏移量,然后它会向您显示该偏移量中的消息。
不过我很困惑,因为你不能在第一次就明确地创建一个消费者群体。您似乎必须阅读一条消息才能获得在动物园管理员中创建的偏移节点。
我的问题
有没有一种方法可以显式地创建一个指向特定偏移量的使用者组,或者可以为一个尚未使用的使用者组手动创建zookeeper节点,以便将其初始化为正确的偏移量值?或者这种自动创建会打乱消费者群体的分配过程吗?
如上所述,Kafka Manager有一个非常好的界面,值得您花时间进行设置。但是如果你想要CLI版本,正如我所需要的,下面的应该可以:
groupId="legitGroupId"
kafka="localhost:9092"
declare -a topics=(
"topic1"
"topic2"
)
# Create a single consumer of all the topics which starts starts at each topics latest offset
# Use --dry-run instead of --execute to see how the end results will look
for topic in "${topics[@]}"; do
echo "Adding consumer to $topic"
kafka-consumer-groups --bootstrap-server $kafka --topic $topic --group $groupId --execute --reset-offsets --to-latest
echo ""
done
你可以看看Kafka Web控制台项目,它已经做了一些和你描述的类似的事情。
如果您想自己做这件事,您需要使用简单的消费者API,并手动处理新消费者组(存储在Zookeeper或其他地方)的偏移量。您可以从现有消费者组获取当前偏移量,然后使用相同的偏移量为您的peek组读取消息。只要组ID不同,它们就不应该相互干扰或搞砸任何事情。
对于读者,不再支持Kafka Web 控制台。请考虑Kafka经理。
我们在Kubernetes中基于<code>gcr.io/google_containers/Kubernetes-Kafka:1.0-10.2.1</code>docker映像运行一个Kafka集群,使用<code>gcr.io/google_containers/Kubernetes-zookeeper:1.0-3.4.10</code>,使用三个Kafka和zookeer实例。 我们有几个不
在一个消费者群体中的所有消费者都失败后,kafka会将该消费者群体的补偿存储多长时间?是否有此配置变量?
我是Kafka的新手,正在学习Kafka内部知识。请根据需要随时更正我的理解。。 这是我的实时场景..感谢所有的回复: 我有一个接收数据文件的实时FTP服务器…比如索赔文件。 我将把这些数据发布到一个主题中.让我们把这个主题称为claims_topic(2个分区). 我需要订阅这个claims_topic,阅读消息并将它们写入Oracle和Postgres表。让我们将oracle表称为Otable
我在使用Kafka时遇到了一些问题。非常感谢任何帮助!我在docker swell中分别有zookeeper和kafka集群3个节点。您可以在下面看到Kafka代理配置。 我的情况: < li > 20x位制片人不断向Kafka主题传达信息 < li>1x消费者读取和记录消息 < li >终止kafka节点(docker容器停止),因此现在群集有2个Kafka代理节点(第3个节点将自动启动并加入群
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我有一个kafka主题,有25个分区,集群已经运行了5个月。 根据我对给定主题的每个分区的理解,偏移量从0,1,2开始...(无界) 我看到log-end-offset值很高(现在- 我创建了一个新的消费群体,偏移设置为最早;因此,我预期该消费者组的客户端将从偏移量0开始的偏移量。 我用来创建一个偏移量为最早的新消费者组的命令: 我看到正在创建消费者组。我预计当前偏移量为0;然而,当我描述消费者组