我尝试在scala中实现一个非常简单的Kafka(0.9.0.1)消费者(代码如下)。
据我理解,Kafka(或者更好地说是Zookeeper)为每个groupId存储给定主题的最后一条消费消息的偏移量。因此,在以下情况下:
groupId1
的使用者,昨天只消耗了主题中的 5 条消息。现在最后使用的消息的偏移量为 4(考虑偏移量为 0 的第一条消息)groupId1
,将有两个选项:选项1:如果我将以下属性设置为“最新”
,则消费者将读取夜间到达的最后2条新消息:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
选项2:如果我将以下属性设置为“最早”
,消费者将阅读主题中的所有7条消息:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
问题:由于某种原因,如果我将使用者的 groupId 更改为 groupId2
,那就是给定主题的新 groupId,因此它以前从未使用过任何消息,其最新偏移量应为 0。我期待通过设置
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
使用者将在第一次执行期间读取存储在主题中的所有消息(相当于具有最早的消息)。然后对于后续执行,它将只消耗新的执行。然而,事实并非如此。
如果我设置一个新的 groupId
并AUTO_OFFSET_RESET_CONFIG
保持为最新版本
,则使用者无法读取任何消息。然后我需要做的是将第一次运行设置为最早
AUTO_OFFSET_RESET_CONFIG
,一旦 groupID 的偏移量已经不同于 0,我就可以移动到最新版本
。
这就是我的消费者应该如何工作吗?有没有比在第一次运行使用者后切换AUTO_OFFSET_RESET_CONFIG
更好的解决方案?
下面是我作为一个简单消费者使用的代码:
class KafkaTestings {
val brokers = "listOfBrokers"
val groupId = "anyGroupId"
val topic = "anyTopic"
val props = createConsumerConfig(brokers, groupId)
def createConsumerConfig(brokers: String, groupId: String): Properties = {
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "12321")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props
}
def run() = {
consumer.subscribe(Collections.singletonList(this.topic))
Executors.newSingleThreadExecutor.execute( new Runnable {
override def run(): Unit = {
while (true) {
val records = consumer.poll(1000)
for (record <- records) {
println("Record: "+record.value)
}
}
}
})
}
}
object ScalaConsumer extends App {
val testConsumer = new KafkaTestings()
testConsumer.run()
}
这被用作编写这个简单消费者的参考
在我的测试中,我通常希望从最早的偏移量读取,但正如所指出的,一旦您读取了具有给定groupId的消息,那么您的偏移量将保持在该指针上。我这样做:
< code > properties . put(consumer config。GROUP_ID_CONFIG,uuid . randomuuid());
这是按照记录工作的。
如果您启动一个新的消费者组(即 Kafka 中没有存储现有偏移量的消费者组),您必须选择消费者是从最早可能的消息(主题中仍然可用的最旧消息)还是从最新消息(仅从现在开始生成的消息)开始。
有没有比第一次运行消费者后切换AUTO_OFFSET_RESET_CONFIG更好的解决方案?
您可以将其保持在最早,因为第二次运行消费程序时,它已经存储了偏移量,只需在那里拾取即可。重置策略仅在创建新用户组时使用。
今天我重新启动消费者,使用相同的groupId1,将有两个选项:
不是真的。由于消费者组在前一天运行,因此它将找到其已提交的补偿,并从停止的位置恢复。因此,无论您将重置策略设置为什么,它都将收到这两条新消息。
尽管意识到,Kafka不会永远存储这些偏移量,我相信默认值只有一周。因此,如果你关闭消费者的时间超过了这一时间,那么补偿可能会过时,你可能会意外地重置为EARLIEST(这对于大型主题来说可能会很昂贵)。鉴于此,无论如何,将其更改为最新版本可能是明智的。
我正在学习Kafka,如果有人能帮助我理解一件事。“制作人”向Kafka主题发送消息。它会在那里停留一段时间(默认为7天,对吗?)。 但是“消费者”收到这样的信息,永远保持它在那里没有多大意义。我预计当消费者收到它们时,这些信息会消失。否则,当我再次连接到Kafka时,我将再次下载相同的消息。所以我必须管理重复的避免。 它背后的逻辑是什么? 问候
我使用confluent .net客户端。订阅者在重启(订阅者服务重启)后始终读取 Kafka 主题的所有消息。如何提交消费者已经实现的偏移并从中读取?也许一些消费者配置可以提供帮助...
问题内容: 我只是将应用程序主题设置为Synthetica Alu Oxide,但由于某些原因,JFrame不会重新绘制,而另一个Synthetica主题将重新绘制JFrame。 这就是我的样子。 http://i.imgur.com/SOBDTs4.png 这就是它的样子。 http://www.jyloo.com/images/screenshots/syntheticaAluOxide/de
我有一个kafka主题,有25个分区,集群已经运行了5个月。 根据我对给定主题的每个分区的理解,偏移量从0,1,2开始...(无界) 我看到log-end-offset值很高(现在- 我创建了一个新的消费群体,偏移设置为最早;因此,我预期该消费者组的客户端将从偏移量0开始的偏移量。 我用来创建一个偏移量为最早的新消费者组的命令: 我看到正在创建消费者组。我预计当前偏移量为0;然而,当我描述消费者组
我看过与此相关的类似问题,但并没有找到正确的答案。我只想从 Kafka 主题中删除消息,而不是更改保留超时。我已经安装了kafka_2.11-0.8.2.1,并使用蝙蝠文件在Windows上运行它。我想知道我是否可以删除主题中发布的所有消息,而不删除整个主题。
我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到