我有一个Spring Cloud Stream Kafka Stream应用程序,它读取主题(事件)并执行一个简单的处理:
@Configuration
class EventKStreamConfiguration {
private val logger = LoggerFactory.getLogger(javaClass)
@StreamListener
fun process(@Input("event") eventStream: KStream<String, EventReceived>) {
eventStream.foreach { key, value ->
logger.info("--------> Processing Event {}", value)
// Save in DB
}
}
}
该应用程序使用来自Confluent Cloud的Kafka环境,带有6个分区的事件主题。完整的配置是:
spring:
application:
name: events-processor
cloud:
stream:
schema-registry-client:
endpoint: ${schema-registry-url:http://localhost:8081}
kafka:
streams:
binder:
brokers: ${kafka-brokers:localhost}
configuration:
application:
id: ${spring.application.name}
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
schema:
registry:
url: ${spring.cloud.stream.schema-registry-client.endpoint}
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
processing:
guarantee: exactly_once
bindings:
event:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
event:
destination: event
data:
mongodb:
uri: ${mongodb-uri:mongodb://localhost/test}
server:
port: 8085
logging:
level:
org.springframework.kafka.config: debug
---
spring:
profiles: confluent-cloud
cloud:
stream:
kafka:
streams:
binder:
autoCreateTopics: false
configuration:
retry:
backoff:
ms: 500
security:
protocol: SASL_SSL
sasl:
mechanism: PLAIN
jaas:
config: xxx
basic:
auth:
credentials:
source: USER_INFO
schema:
registry:
basic:
auth:
user:
info: yyy
2019-07-19 10:20:17.120 INFO 82473 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1] Creating restore consumer client
2019-07-19 10:20:17.123 INFO 82473 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = none
2019-07-19 10:20:17.235 INFO 82473 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1] Creating consumer client
2019-07-19 10:20:17.241 INFO 82473 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
2019-07-19 10:20:31.577 INFO 82473 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2019-07-19 10:20:31.578 INFO 82473 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f] State transition from REBALANCING to RUNNING
2019-07-19 10:20:31.669 INFO 82473 --- [events-processor] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1-consumer, groupId=events-processor] Resetting offset for partition event-3 to offset 0.
2019-07-19 10:20:31.669 INFO 82473 --- [events-processor] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1-consumer, groupId=events-processor] Resetting offset for partition event-0 to offset 0.
2019-07-19 10:20:31.669 INFO 82473 --- [events-processor] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1-consumer, groupId=events-processor] Resetting offset for partition event-1 to offset 0.
2019-07-19 10:20:31.669 INFO 82473 --- [events-processor] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1-consumer, groupId=events-processor] Resetting offset for partition event-5 to offset 0.
2019-07-19 10:20:31.670 INFO 82473 --- [events-processor] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1-consumer, groupId=events-processor] Resetting offset for partition event-4 to offset 0.
>
配置了两个消费者的原因是什么?
为什么第二个函数具有auto.offset.reset=aresty
,而我没有显式配置它,而且Kafka的默认值是最新的?
我已经简化了这个场景,这次使用了一个原生的Kafka Streams应用程序。这种行为与用Spring云流观测到的完全相同。但是,检查消费者组和分区,我发现这是有意义的。
KStream:
fun main() {
val props = Properties()
props[StreamsConfig.APPLICATION_ID_CONFIG] = "streams-wordcount"
props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
props[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0
props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
val builder = StreamsBuilder()
val source = builder.stream<String, String>("streams-plaintext-input")
source.foreach { key, value -> println("$key $value") }
val streams = KafkaStreams(builder.build(), props)
val latch = CountDownLatch(1)
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(object : Thread("streams-wordcount-shutdown-hook") {
override fun run() {
streams.close()
latch.countDown()
}
})
try {
streams.start()
latch.await()
} catch (e: Throwable) {
exitProcess(1)
}
exitProcess(0)
}
这就是我所看到的:
07:55:03.885 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-2 to offset 0.
07:55:03.886 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-3 to offset 0.
07:55:03.886 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-0 to offset 0.
07:55:03.886 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-1 to offset 0.
07:55:03.886 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-4 to offset 0.
07:55:03.886 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-5 to offset 0
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
streams-plaintext-input 0 - 0 - streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1 streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer
streams-plaintext-input 5 - 0 - streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1 streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer
streams-plaintext-input 1 - 0 - streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1 streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer
streams-plaintext-input 2 - 0 - streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1 streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer
streams-plaintext-input 3 - 0 - streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1 streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer
streams-plaintext-input 4 1 1 0 streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1 streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer
07:57:39.477 [streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-2 to offset 0.
07:57:39.478 [streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-3 to offset 0.
07:57:39.478 [streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-0 to offset 0.
07:57:39.479 [streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-1 to offset 0.
07:57:39.479 [streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-5 to offset 0.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
streams-plaintext-input 0 - 0 - streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1 streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
streams-plaintext-input 5 - 0 - streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1 streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
streams-plaintext-input 1 - 0 - streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1 streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
streams-plaintext-input 2 1 1 0 streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1 streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
streams-plaintext-input 3 - 0 - streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1 streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
streams-plaintext-input 4 1 1 0 streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1 streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
08:00:42.313 [streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-3 to offset 0.
08:00:42.314 [streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-0 to offset 0.
08:00:42.314 [streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-1 to offset 0.
08:00:42.314 [streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-5 to offset 0.
还原使用者客户端
是用于容错和状态管理的专用使用者。它负责从changelog主题还原状态。它与应用程序使用者客户端分开显示。您可以在这里找到更多信息:https://docs.confluent.io/current/streams/monitoring.html#kafka-restore-consumer-client-id
您是对的,auto.offset.reset
在Kafka消费者中默认值为lates
。但是在Spring Cloud Stream中,使用者startoffset
的默认值是rease
。因此,它在第二个使用者中显示最早
。它还依赖于spring.cloud.stream.bindings.
绑定。如果显式设置,则startoffset
被设置为aresty
,否则为Anonymous
使用者设置为lates
。
对于Anonymous
使用者组,StartoffSet
的默认值将为Lates
。
我使用的是0.10.1.1 API的高级使用者。 奇怪的是,当我关闭应用程序并重新启动它时,偏移量比上次提交的偏移量大一点,我找不到原因。 我在代码中只有一个提交点。 一个分区的示例: 关机前偏移量:3107169023 分区分配时的偏移量:3107180350
我对Kafka0.11.0.0有意见 在Kafka0.10.2.1中我对此没有任何问题。我只在0.11.0.0版本中遇到这个问题。 我的使用者将auto.offset.reset设置为最早,而auto commit设置为false,因为我是手动提交的。Kafka数据存储在具有必要权限的非TMP目录中。broker配置的其余部分为默认配置。 我需要0.11.0.0版本的事务。我不知道问题出在哪里。这
我遇到的问题是,当Kafka和Flink作业重新启动时,Flink Kafka消费者偏移量会重置为0,因此即使我启用了检查点并且我在Flink作业中启用了精确一次语义学,数据也会被重新处理。 这是我的环境详细信息 < li >在Kubernetes下奔跑 < li>Kafka源主题有10个分区,没有复制。 < li >Kafka有三个经纪人。 < li>Flink checkpointing启用了
我有一个Kafka主题和一个消费者,在Spring云应用程序中分配了一个消费者组(必须)。作为一项要求,在每次应用程序重启时,我都需要从一开始就开始读取所有接收到的消息。这应该是通过属性实现的,但从这个问题可以清楚地看出,它目前不起作用。 我在kafka消费者api中发现了这个变通方法,它建议在每次重启时为消费者组分配一个新的随机名称,作为从最早开始阅读的一种方式。在Spring Cloud St
我有一个单一的Kafka消费者,它连接到一个有3个分区的主题。一旦我从Kafka那里得到一张唱片,我就想捕捉偏移量和分区。在重新启动时,我希望从上次读取的偏移量恢复使用者的位置 摘自Kafka文档: 每个记录都有自己的偏移量,因此要管理自己的偏移量,只需执行以下操作: 配置enable.auto.commit=false 下面是我的示例代码: 这是正确的做法吗?有没有更好的办法?
将kafka consumer offset重置为“最早”时,它会保留一些带有偏移量的分区 显示: 为什么分区1也没有0?