我需要能够从单独的流处理器中删除Ktable中的记录。今天我使用aggregate()并传递一个物化状态存储。在一个从“终止”主题读取的单独处理器中,我想在.transform()或不同的.gaggregate()中查询实体化状态存储,并“移除”该键/值。每次我尝试从一个单独的流处理器访问物化状态时,它都会告诉我存储没有添加到拓扑中,所以我添加它并再次运行它,然后它会告诉我它已经注册,并且出错。
builder.stream("topic1").map().groupByKey().aggregate(() -> null,
(aggKey, newValue, aggValue) -> {
//add to the Ktable
return newValue;
},
stateStoreMaterialized);
在一个单独的流中,我想从该stateStoreMaterialized中删除一个键
builder.stream("topic2")
.transform(stateStoreDeleteTransformer, stateStoreSupplier.name())
stateStoreDeleteTransformer 将查询密钥并将其删除。
//in ctor
KeyValueBytesStoreSupplier stateStoreSupplier = Stores.persistentKeyValueStore("store1");
stateStoreMaterialized = Materialized.<String, MyObj>as(stateStoreSupplier)
.withKeySerde(Serdes.String())
.withValueSerde(mySerDe);
我的topic1流对象值上没有可以触发删除的终端标志。它必须来自另一个流/主题。
当我尝试在两个独立的流处理器上使用同一个物化存储时,我得到。。
Invalid topology: Topic STATE_STORE-repartition has already been registered by another source.
at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:268)
编辑:
这是我收到的第一个错误。
由以下原因引起:org.apache.kafka.streams.errors.StreamsException:处理器 KSTREAM-TRANSFORMVALUES-0000000012 无法访问 StateStore store1,因为该存储未连接到处理器。如果您通过“.addStateStore()”手动添加存储,请确保通过将处理器名称提供给“.addStateStore()”来将添加的存储连接到处理器,或通过“.connectProcessorAndStateStores()”连接它们。DSL 用户需要向“.process()”、“.transform()”或“.transformValues()”提供存储名称,以便将存储连接到相应的运算符。如果您不手动添加商店,请在 https://issues.apache.org/jira/projects/KAFKA 提交错误报告。
位于org.apache.kafka.streams.processer.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:104)位于org.apache.cafka.stream.processer.internals.ForwardingDisabledProcessorContext.getStateStore(ForwardingDisabledProcessorContext.java:85)
然后我这样做:
stateStoreSupplier = Stores.persistentKeyValueStore(STATE_STORE_NAME);
storeStoreBuilder = Stores.keyValueStoreBuilder(stateStoreSupplier, Serdes.String(), jsonSerDe);
stateStoreMaterialized = Materialized.as(stateStoreSupplier);
然后我得到这个错误:
由:org.apache.cafka.streams.errors引起。TopologyException:无效拓扑:已添加StateStore“state store”。位于org.apache.kafka.streams.processer.internal.InternalTopologyBuilder.addStateStore(InternalTopologyBuilder.java:520)
这是修复我问题的代码。事实证明,在建造溪流时,秩序很重要。必须先设置物化存储,然后在随后的代码行中设置转换器。
/**
* Create the streams using the KStreams DSL - a method to configure the stream and add any state stores.
*/
@Bean
public KafkaStreamsConfig setup() {
final JsonSerDe<Bus> ltaSerde = new JsonSerDe<>(Bus.class);
final StudentSerde<Student> StudentSerde = new StudentSerde<>();
//start lta stream
KStream<String, Bus> ltaStream = builder
.stream(ltaInputTopic, Consumed.with(Serdes.String(), ltaSerde));
final KStream<String, Student> statusStream = this.builder
.stream(this.locoStatusInputTopic,
Consumed.with(Serdes.String(),
StudentSerde));
//create lta store
KeyValueBytesStoreSupplier ltaStateStoreSupplier = Stores.persistentKeyValueStore(LTA_STATE_STORE_NAME);
final Materialized<String, Bus, KeyValueStore<Bytes, byte[]>> ltaStateStoreMaterialized =
Materialized.
<String, Bus>as(ltaStateStoreSupplier)
.withKeySerde(Serdes.String())
.withValueSerde(ltaSerde);
KTable<String, Bus> ltaStateProcessor = ltaStream
//map and convert lta stream into Loco / LTA key value pairs
.groupByKey(Grouped.with(Serdes.String(), ltaSerde))
.aggregate(
//The 'aggregate' and 'reduce' functions ignore messages with null values FYI.
// so if the value after the groupbykey produces a null value, it won't be removed from the state store.
//which is why it's very important to send a message with some terminal flag indicating this value should be removed from the store.
() -> null, /* initializer */
(aggKey, newValue, aggValue) -> {
if (null != newValue.getAssociationEndTime()) { //if there is an endTime associated to this train/loco then remove it from the ktable
logger.trace("removing LTA: {} loco from {} train", newValue.getLocoId(), newValue.getTrainAuthorization());
return null; //Returning null removes the record from the state store as well as its changelog topic. re: https://objectpartners.com/2019/07/31/slimming-down-your-kafka-streams-data/
}
logger.trace("adding LTA: {} loco from {} train", newValue.getLocoId(), newValue.getTrainAuthorization());
return newValue;
}, /* adder */
ltaStateStoreMaterialized
);
// don't need builder.addStateStore(keyValueStoreStoreBuilder); and CANT use it
// because the ltaStateStoreMaterialized will already be added to the topology in the KTable aggregate method above.
// The below transformer can use the state store because it's already added (apparently) by the aggregate method.
// Add the KTable processors first, then if there are any transformers that need to use the store, add them after the KTable aggregate method.
statusStream.map((k, v) -> new KeyValue<>(v.getLocoId(), v))
.transform(locoStatusTransformerSupplier, ltaStateStoreSupplier.name())
.to("testing.outputtopic", Produced.with(Serdes.String(), StudentSerde));
return this; //can return anything except for void.
}
>
是stateStoreMaterialized
和stateStoreSupplier。name()
是否具有相同的名称?
使用拓扑中有错误
KStream.transform(stateStoreDeleteTransformer, stateStoreSupplier.name())
您必须在TransformerSupplier中为每个ProcessContext提供StateStoreDeleteTransformer的新瞬间,如下所示:
KStream.transform(StateStoreDeleteTransformer::new, stateStoreSupplier.name())
or
KStream.transform(() -> StateStoreDeleteTransformerSupplier.get(), stateStoreSupplier.name())//StateStoreDeleteTransformerSupplier return new instant of StateStoreDeleteTransformer
public void init(ProcessorContext context) {
kvStore = (KeyValueStore<String, MyObj>) context.getStateStore("store1");
}
我正在使用kafka处理器API做一些自定义计算。由于某些复杂的处理,DSL并不是最佳的选择。流代码如下所示。 我需要清除一些项目从状态存储基于一个事件来在一个单独的主题。我无法找到正确的方法来使用Processor API连接另一个流,或者通过其他方法来侦听另一个主题中的事件,从而能够触发CustomProcessor类中的清理代码。有没有一种方法可以在处理器API中获取另一个主题中的事件?或者
我正在学习与Akka溪流一起工作,并且真的很喜欢它,但是物化部分对我来说仍然有些神秘。 引用自http://doc.akka.io/docs/akka-stream-and-http-experimental/snapshot/scala/http/client-side/host-level.html#host-level-api ...通过对池客户端流具体化到的HostConnectionPo
我想在状态尚未装入时访问状态的属性。 我想做4,但得到以下错误: P. S也不起作用。
有没有减少代码冗余的方法?
我是java新手,我不知道如何从另一个类访问变量 我正在尝试编写一个代码来发送带有未存储在本地的附件的邮件。我想访问SendMail类中ExcelFile类中编写的变量 如何在另一个类中访问excelFileAsByte并发送邮件而不将其存储在本地。我可以使用addBodyPart和ByteArrayResource将文件添加为附件吗。
问题内容: 我基于两个不同的图像创建了两个docker容器。一个数据库,另一个用于网络服务器。这两个容器都在我的Mac OS X上运行。 我可以从主机访问数据库容器,也可以从主机访问Web服务器。 但是,如何从Web服务器访问数据库连接? 我启动数据库容器的方式是 我开始了wls容器作为 我可以通过连接到主机上的数据库 我可以以以下方式访问主机上的wls: 问题答案: 最简单的方法是使用–link