当前位置: 首页 > 知识库问答 >
问题:

如何从另一个流处理器访问KStreams物化状态存储

胡玉书
2023-03-14

我需要能够从单独的流处理器中删除Ktable中的记录。今天我使用aggregate()并传递一个物化状态存储。在一个从“终止”主题读取的单独处理器中,我想在.transform()或不同的.gaggregate()中查询实体化状态存储,并“移除”该键/值。每次我尝试从一个单独的流处理器访问物化状态时,它都会告诉我存储没有添加到拓扑中,所以我添加它并再次运行它,然后它会告诉我它已经注册,并且出错。

      builder.stream("topic1").map().groupByKey().aggregate(() -> null,
        (aggKey, newValue, aggValue) -> {
          //add to the Ktable
          return newValue;
        },
        stateStoreMaterialized);

在一个单独的流中,我想从该stateStoreMaterialized中删除一个键

builder.stream("topic2")
.transform(stateStoreDeleteTransformer, stateStoreSupplier.name())

stateStoreDeleteTransformer 将查询密钥并将其删除。

//in ctor
        KeyValueBytesStoreSupplier stateStoreSupplier = Stores.persistentKeyValueStore("store1");
    stateStoreMaterialized = Materialized.<String, MyObj>as(stateStoreSupplier)
        .withKeySerde(Serdes.String())
        .withValueSerde(mySerDe);

我的topic1流对象值上没有可以触发删除的终端标志。它必须来自另一个流/主题。

当我尝试在两个独立的流处理器上使用同一个物化存储时,我得到。。

 Invalid topology: Topic STATE_STORE-repartition has already been registered by another source.
    at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:268)

编辑:

这是我收到的第一个错误。

由以下原因引起:org.apache.kafka.streams.errors.StreamsException:处理器 KSTREAM-TRANSFORMVALUES-0000000012 无法访问 StateStore store1,因为该存储未连接到处理器。如果您通过“.addStateStore()”手动添加存储,请确保通过将处理器名称提供给“.addStateStore()”来将添加的存储连接到处理器,或通过“.connectProcessorAndStateStores()”连接它们。DSL 用户需要向“.process()”、“.transform()”或“.transformValues()”提供存储名称,以便将存储连接到相应的运算符。如果您不手动添加商店,请在 https://issues.apache.org/jira/projects/KAFKA 提交错误报告。

位于org.apache.kafka.streams.processer.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:104)位于org.apache.cafka.stream.processer.internals.ForwardingDisabledProcessorContext.getStateStore(ForwardingDisabledProcessorContext.java:85)

然后我这样做:

stateStoreSupplier = Stores.persistentKeyValueStore(STATE_STORE_NAME);
storeStoreBuilder = Stores.keyValueStoreBuilder(stateStoreSupplier, Serdes.String(), jsonSerDe);
stateStoreMaterialized = Materialized.as(stateStoreSupplier);

然后我得到这个错误:

由:org.apache.cafka.streams.errors引起。TopologyException:无效拓扑:已添加StateStore“state store”。位于org.apache.kafka.streams.processer.internal.InternalTopologyBuilder.addStateStore(InternalTopologyBuilder.java:520)

这是修复我问题的代码。事实证明,在建造溪流时,秩序很重要。必须先设置物化存储,然后在随后的代码行中设置转换器。

  /**
   * Create the streams using the KStreams DSL - a method to configure the stream and add any state stores.
   */
  @Bean
  public KafkaStreamsConfig setup() {

    final JsonSerDe<Bus> ltaSerde = new JsonSerDe<>(Bus.class);
    final StudentSerde<Student> StudentSerde = new StudentSerde<>();
    //start lta stream
    KStream<String, Bus> ltaStream = builder
        .stream(ltaInputTopic, Consumed.with(Serdes.String(), ltaSerde));

    final KStream<String, Student> statusStream = this.builder
        .stream(this.locoStatusInputTopic,
            Consumed.with(Serdes.String(),
                StudentSerde));

    //create lta store
    KeyValueBytesStoreSupplier ltaStateStoreSupplier = Stores.persistentKeyValueStore(LTA_STATE_STORE_NAME);

    final Materialized<String, Bus, KeyValueStore<Bytes, byte[]>> ltaStateStoreMaterialized =
        Materialized.
            <String, Bus>as(ltaStateStoreSupplier)
            .withKeySerde(Serdes.String())
            .withValueSerde(ltaSerde);

    KTable<String, Bus> ltaStateProcessor = ltaStream
        //map and convert lta stream into Loco / LTA key value pairs
        .groupByKey(Grouped.with(Serdes.String(), ltaSerde))
        .aggregate(
            //The 'aggregate' and 'reduce' functions ignore messages with null values FYI.
            // so if the value after the groupbykey produces a null value, it won't be removed from the state store.
            //which is why it's very important to send a message with some terminal flag indicating this value should be removed from the store.
            () -> null, /* initializer */
            (aggKey, newValue, aggValue) -> {
              if (null != newValue.getAssociationEndTime()) { //if there is an endTime associated to this train/loco then remove it from the ktable
                logger.trace("removing LTA: {} loco from {} train", newValue.getLocoId(), newValue.getTrainAuthorization());
                return null; //Returning null removes the record from the state store as well as its changelog topic. re: https://objectpartners.com/2019/07/31/slimming-down-your-kafka-streams-data/
              }
              logger.trace("adding LTA: {} loco from {} train", newValue.getLocoId(), newValue.getTrainAuthorization());
              return newValue;
            }, /* adder */
            ltaStateStoreMaterialized
        );

    // don't need builder.addStateStore(keyValueStoreStoreBuilder); and CANT use it
    // because the ltaStateStoreMaterialized will already be added to the topology in the KTable aggregate method above.
    // The below transformer can use the state store because it's already added (apparently) by the aggregate method.
    // Add the KTable processors first, then if there are any transformers that need to use the store, add them after the KTable aggregate method.


    statusStream.map((k, v) -> new KeyValue<>(v.getLocoId(), v))
        .transform(locoStatusTransformerSupplier, ltaStateStoreSupplier.name())
        .to("testing.outputtopic", Produced.with(Serdes.String(), StudentSerde));

    return this; //can return anything except for void.
  }

共有1个答案

高恺
2023-03-14

>

  • stateStoreMaterializedstateStoreSupplier。name()是否具有相同的名称?

    使用拓扑中有错误

    KStream.transform(stateStoreDeleteTransformer, stateStoreSupplier.name())
    

    您必须在TransformerSupplier中为每个ProcessContext提供StateStoreDeleteTransformer的新瞬间,如下所示:

    KStream.transform(StateStoreDeleteTransformer::new, stateStoreSupplier.name()) 
    or
    KStream.transform(() -> StateStoreDeleteTransformerSupplier.get(), stateStoreSupplier.name())//StateStoreDeleteTransformerSupplier return new instant of StateStoreDeleteTransformer
    
    
    public void init(ProcessorContext context) {
            kvStore = (KeyValueStore<String, MyObj>) context.getStateStore("store1");
        }
    

  •  类似资料:
    • 我正在使用kafka处理器API做一些自定义计算。由于某些复杂的处理,DSL并不是最佳的选择。流代码如下所示。 我需要清除一些项目从状态存储基于一个事件来在一个单独的主题。我无法找到正确的方法来使用Processor API连接另一个流,或者通过其他方法来侦听另一个主题中的事件,从而能够触发CustomProcessor类中的清理代码。有没有一种方法可以在处理器API中获取另一个主题中的事件?或者

    • 我正在学习与Akka溪流一起工作,并且真的很喜欢它,但是物化部分对我来说仍然有些神秘。 引用自http://doc.akka.io/docs/akka-stream-and-http-experimental/snapshot/scala/http/client-side/host-level.html#host-level-api ...通过对池客户端流具体化到的HostConnectionPo

    • 我想在状态尚未装入时访问状态的属性。 我想做4,但得到以下错误: P. S也不起作用。

    • 我是java新手,我不知道如何从另一个类访问变量 我正在尝试编写一个代码来发送带有未存储在本地的附件的邮件。我想访问SendMail类中ExcelFile类中编写的变量 如何在另一个类中访问excelFileAsByte并发送邮件而不将其存储在本地。我可以使用addBodyPart和ByteArrayResource将文件添加为附件吗。

    • 有没有减少代码冗余的方法?

    • 问题内容: 我基于两个不同的图像创建了两个docker容器。一个数据库,另一个用于网络服务器。这两个容器都在我的Mac OS X上运行。 我可以从主机访问数据库容器,也可以从主机访问Web服务器。 但是,如何从Web服务器访问数据库连接? 我启动数据库容器的方式是 我开始了wls容器作为 我可以通过连接到主机上的数据库 我可以以以下方式访问主机上的wls: 问题答案: 最简单的方法是使用–link