我有一个Kafka Streams应用程序,其中我将读取“topic1”的KStream与读取“topic2”的GlobalKTable连接起来,然后再与读取“topic3”的GlobalKTable连接起来。
当我尝试同时推送消息到所有3个主题时,我会得到以下异常-
org.apache.kafka.streams.errors.invalidStateStoreException
如果我在这些主题中逐个推送消息,即在topic2中推送消息,然后在topic3中推送消息,然后在topic1中推送消息,那么我不会得到这个异常。
在启动KafkaStreams之前,我还添加了StateListener
KafkaStreams.StateListener stateListener = new KafkaStreams.StateListener() {
@Override
public void onChange (KafkaStreams.State newState, KafkaStreams.State oldState) {
if(newState == KafkaStreams.State.REBALANCING) {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
streams.setStateListener(stateListener);
streams.start();
同时,我会通过调用以下方法等到流启动后存储可以查询
public static <T> T waitUntilStoreIsQueryable(final String storeName,
final QueryableStoreType<T> queryableStoreType,
final KafkaStreams streams) throws InterruptedException {
while (true) {
try {
return streams.store(storeName, queryableStoreType);
} catch (final InvalidStateStoreException ignored) {
// store not yet ready for querying
Thread.sleep(100);
}
}
}
下面是Kafka Streams和GlobalKTable连接代码:
KStream<String, GenericRecord> topic1KStream =
builder.stream(
"topic1",
Consumed.with(Serdes.String(), genericRecordSerde)
);
GlobalKTable<String, GenericRecord> topic2KTable =
builder.globalTable(
"topic2",
Consumed.with(Serdes.String(), genericRecordSerde),
Materialized.<String, GenericRecord, KeyValueStore<Bytes, byte[]>>as("topic2-global-store")
.withKeySerde(Serdes.String())
.withValueSerde(genericRecordSerde)
);
GlobalKTable<String, GenericRecord> topic3KTable =
builder.globalTable(
"topic3",
Consumed.with(Serdes.String(), genericRecordSerde),
Materialized.<String, GenericRecord, KeyValueStore<Bytes, byte[]>>as("topic3-global-store")
.withKeySerde(Serdes.String())
.withValueSerde(genericRecordSerde)
);
KStream<String, MergedObj> stream_topic1_topic2 = topic1KStream.join(
topic2KTable,
(topic2Id, topic1Obj) -> topic1.get("id").toString(),
(topic1Obj, topic2Obj) -> new MergedObj(topic1Obj, topic2Obj)
);
final KStream<String, GenericRecord> enrichedStream =
stream_topic1_topic2.join(
topic3KTable,
(topic2Id, mergedObj) -> mergedObj.topic3Id(),
(mergedObj, topic3Obj) -> new Enriched(
mergedObj.topic1Obj,
mergedObj.topic2Obj,
topic3Obj
).enrich()
);
enrichedStream.to("enrichedStreamTopic", Produced.with(Serdes.String(),getGenericRecordSerde()));
上面的代码与此非常相似。
当我尝试将消息同时推送到所有三个主题时,我会遇到以下异常:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=topic1,
partition=1, offset=61465,
stacktrace=org.apache.kafka.streams.errors.InvalidStateStoreException:
Store topic2-global-store is currently closed.
at
org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:66)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:150)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:37)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:135)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadOnlyDecorator.get(ProcessorContextImpl.java:245)
at
org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
at
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:71)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:364)
at
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
at
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:890)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
我用auto.register.schemas=false
修复了代码中的问题,因为我手动注册了所有主题的模式。
在我设置auto.register.schemas=true
并重新运行streams应用程序之后,它工作得很好。我认为它的内部话题需要这面旗帜。
我正在使KStream-KStream连接,其中创建2个内部主题。而KStream-KTable join将创建1个内部主题+1个表。 就性能和其他因素而言,哪个更好?
我试图用GlobalKTable连接KStream,连接不完全在键上。 我想通过empIdOverLoginUserId的值通过employeesDetails的键将empIdOverLoginUserId与employeesDetails连接
我目前正在尝试使用KStream到KTable的连接来执行Kafka主题的充实。对于我的概念证明,我目前有一个Kafka流,其中有大约600,000条记录,它们都有相同的键,还有一个KTable,它是从一个主题创建的,其中KTable主题中的键与创建KStream的主题中的600,000条记录中的键匹配。 当我使用左联接(通过下面的代码)时,所有记录在ValueJoiner上都返回NULL。 下面
我需要帮助理解在Kafka2.2中使用max.task.idle.ms时的Kafka流行为。 我有一个KStream-KTable联接,其中KStream已被重新键入: 所有主题都有10个分区,为了测试,我将max.task.idle.ms设置为2分钟。myTimeExtractor只有在消息被标记为“快照”时才更新消息的事件时间:stream1中的每个快照消息都将其事件时间设置为某个常数T,st
我开始阅读Kafka Stream应用程序,在每个教程/示例中,通过比较KStream和GlobalkTable中的键来丰富数据。在我的情况下,我需要将KStream记录的值中的一个项与GlobalKTable中的一个键进行比较。如何实现这一点的任何想法或例子。
我对Kafka的溪流很陌生。我想执行以下KStream-GlobalKTable纯基于DSL的左联接操作,而不使用map操作。 和另一个输入主题,它是 ,其中value: 我要执行左联接操作是一个流,主数据是一个全局表,以实现结果值为 连接条件为 代码: