While using Kafka Streams with a GlobalKTable, which needs a state store, I ran into the following exception:
Exception in thread "mywordcount-27abf25a-4a79-47ac-8114-86694871acee-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=student-grade, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:367)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: Store student-course-statestore is currently closed.
at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.validateStoreOpen(WrappedStateStore.java:71)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:156)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:137)
at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:351)
... 5 more
Exception in thread "mywordcount-27abf25a-4a79-47ac-8114-86694871acee-GlobalStreamThread" org.apache.kafka.streams.errors.StreamsException: Updating global state failed. You can restart KafkaStreams to recover from this error.
at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:250)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290)
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {student-course-0=5}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:987)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1256)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239)
... 1 more
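The cause was that no storage path for the state store had been configured when connecting to the Kafka service. In the trace above, the GlobalStreamThread fails with an OffsetOutOfRangeException on student-course-0, and the stream thread's join then finds student-course-statestore closed when it tries to read from it.
Solution: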
prop.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\IT\\tool\\kafka-state-store"); // set the storage path for the state store
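For context, here is a minimal sketch of how that setting might sit alongside the other required properties. It uses the plain string keys so that it runs without the Kafka jars on the classpath; "state.dir", "application.id" and "bootstrap.servers" are the values behind StreamsConfig.STATE_DIR_CONFIG, APPLICATION_ID_CONFIG and BOOTSTRAP_SERVERS_CONFIG. The class name, the helper method, the broker address localhost:9092 and the temporary directory are assumptions made for illustration, not part of the original setup.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class StateDirConfigSketch {

    // Hypothetical helper: builds the Properties that would later be passed to KafkaStreams.
    static Properties buildStreamsProps(String applicationId, String bootstrapServers, Path stateDir)
            throws IOException {
        // Create the directory up front so an unusable path fails here
        // rather than later inside a stream thread.
        Files.createDirectories(stateDir);

        Properties props = new Properties();
        props.put("application.id", applicationId);       // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", bootstrapServers); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("state.dir", stateDir.toAbsolutePath().toString()); // StreamsConfig.STATE_DIR_CONFIG
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Assumed values: a temporary directory stands in for C:\IT\tool\kafka-state-store,
        // and the broker address is a placeholder.
        Path stateDir = Files.createTempDirectory("kafka-state-store");
        Properties props = buildStreamsProps("mywordcount", "localhost:9092", stateDir);
        System.out.println("state.dir = " + props.getProperty("state.dir"));
    }
}
```

In a real application the returned Properties would be handed to the KafkaStreams constructor together with the topology. Pointing state.dir at a dedicated directory keeps the local store files and checkpoints out of the shared temp location.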