我以前从没用过Kafka。我有两个测试程序访问本地Kafka实例:一个读卡器和一个写卡器。我试图调整我的制作人、消费者和Kafka服务器设置,以获得特定的行为。
我的作者:
package main
import (
"fmt"
"math/rand"
"strconv"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
rand.Seed(time.Now().UnixNano())
topics := []string{
"policymanager-100",
"policymanager-200",
"policymanager-300",
}
progress := make(map[string]int)
for _, t := range topics {
progress[t] = 0
}
producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "0",
})
if err != nil {
panic(err)
}
defer producer.Close()
fmt.Println("producing messages...")
for i := 0; i < 30; i++ {
index := rand.Intn(len(topics))
topic := topics[index]
num := progress[topic]
num++
fmt.Printf("%s => %d\n", topic, num)
msg := &kafka.Message{
Value: []byte(strconv.Itoa(num)),
TopicPartition: kafka.TopicPartition{
Topic: &topic,
},
}
err = producer.Produce(msg, nil)
if err != nil {
panic(err)
}
progress[topic] = num
time.Sleep(time.Millisecond * 100)
}
fmt.Println("DONE")
}
我的本地kafka上存在三个主题:政策管理器-100、政策管理器-200、政策管理器-300。它们每个都只有1个分区,以确保所有消息都按kafka收到它们的时间进行排序。我的作者将随机选择其中一个主题并发布一条由一个数字组成的消息,该数字仅为该主题递增。运行完成后,我希望队列看起来像这样(主题名称缩短以适应易读性):
100: 1 2 3 4 5 6 7 8 9 10 11
200: 1 2 3 4 5 6 7
300: 1 2 3 4 5 6 7 8 9 10 11 12
到目前为止还不错。我正在尝试配置东西,以便任意数量的消费者都可以旋转并按顺序使用这些消息。“按顺序”的意思是,在消息1完成(而不是刚刚开始)之前,任何消费者都不应该收到主题100的消息2。如果正在处理主题100的消息1,消费者可以自由地从当前没有消息正在处理的其他主题中消费。如果主题的消息已发送给消费者,则整个主题应被“锁定”,直到超时假定消费者失败或消费者提交消息,然后该主题被“解锁”以使其下一条消息可供消费。
我的读者:
package main
import (
"fmt"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
count := 2
for i := 0; i < count; i++ {
go consumer(i + 1)
}
fmt.Println("cosuming...")
// hold this thread open indefinitely
select {}
}
func consumer(id int) {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "0", // strconv.Itoa(id),
"enable.auto.commit": "false",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{`^policymanager-.+$`}, nil)
for {
msg, err := c.ReadMessage(-1)
if err != nil {
panic(err)
}
fmt.Printf("%d) Message on %s: %s\n", id, msg.TopicPartition, string(msg.Value))
time.Sleep(time.Second)
_, err = c.CommitMessage(msg)
if err != nil {
fmt.Printf("ERROR commiting: %+v\n", err)
}
}
}
根据我目前的理解,实现这一目标的方法可能是正确设置我的消费者。我尝试过这个程序的许多不同版本。我试着让我所有的goroutines共享同一个消费者。我试过使用不同的组。每个goroutine的id
。这些都不是我想要的行为的正确配置。
发布的代码所做的是一次清空一个主题。尽管有多个goroutine,但该过程将读取所有100个,然后移动到200个,然后300个,并且只有一个goroutine实际上会执行所有读取。当我让每个goroutine有不同的group.id
时,消息会被多个goroutine读取,我想阻止这一点。
我的示例消费者只是简单地将goroutines分解,但当我开始在工作中使用这个项目到我的用例中时,我需要它来跨多个kubernetes实例工作,这些实例不会相互交谈,所以当两个kubes上有两个实例时,使用goroutines之间交互的任何东西都不会起作用。这就是为什么我希望让Kafka做我想要的把关。
一般来说,你不能。即使只有一个使用者使用了该主题的所有分区,这些分区也会以不确定的顺序使用,并且不能保证所有分区的总顺序。
尝试键入消息,你可能会发现这对你的用例很有用。
我知道,如果我们在消费者组中有多个分区和几乎相同数量的消费者,那么处理速度会加快。如果我们想保持事件的顺序并在收到每个事件时处理它,我们如何使用多个分区和消费者来实现这一点。 在我的用例中,按顺序处理事件非常关键,否则系统会崩溃。我想使用多个分区来增加并行性,但不知何故“让它们按顺序”。
我有一个问题,我做了一个apache kafka消费者在Spring Boot消费3个不同的主题。但是我需要先使用来自第一个主题的所有数据,然后使用来自以下主题的数据,有什么方法可以做到这一点吗?还是你总是用同样的方式读它们?
我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。
我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。
我正在用java编写一个简单的Kafka使用者,它被配置为读取多个主题。目前,让我们假设两个主题(topic1和Topic2),并为两个主题设置一个分区。 Kafka用户从topic1和Topic2读取的顺序是什么。如果这两个主题都有,假设已经发布了100条消息。 使用者首先从topic1读取所有消息,然后再从topic2读取? 用户按时间顺序阅读,将来自两个主题的消息混合在一起? 我看了Kafk
生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较