当前位置: 首页 > 知识库问答 >
问题:

如何向Kafka Streams DSL处理器添加自定义StateStore?

傅兴平
2023-03-14

对于我的一个Kafka streams应用程序,我需要同时使用DSL和处理器API的特性。我的流媒体应用程序流是

source -> selectKey -> filter -> aggregate (on a window) -> sink

聚合之后,我需要向接收器发送单个聚合消息。因此我定义拓扑如下

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
      .filterNot((k,v) -> k.equals("UnknownGroup"))
      .process(() -> new MyProcessor());
public class MyProcessor implements Processor<String, String> {

    private ProcessorContext context = null;
    Serde<HashMapStore> invSerde = Serdes.serdeFrom(invJsonSerializer, invJsonDeserializer);


    KeyValueStore<String, HashMapStore> invStore = (KeyValueStore) Stores.create("invStore")
        .withKeys(Serdes.String())
        .withValues(invSerde)
        .persistent()
        .build()
        .get();

    public MyProcessor() {
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.register(invStore, false, null); // register the store
        this.context.schedule(10 * 60 * 1000L);
    }

    @Override
    public void process(String partitionKey, String message) {
        try {
            MessageModel smb = new MessageModel(message);
            HashMapStore oldStore = invStore.get(partitionKey);
            if (oldStore == null) {
                oldStore = new HashMapStore();
            }
            oldStore.addSmb(smb);
            invStore.put(partitionKey, oldStore);
        } catch (Exception e) {
           e.printStackTrace();
        }
    }

    @Override
    public void punctuate(long timestamp) {
       // processes all the messages in the state store and sends single aggregate message
    }


    @Override
    public void close() {
        invStore.close();
    }
}

知道这里出了什么问题吗?

共有1个答案

钱澄邈
2023-03-14

您需要使用StreamsBuilder(或在较旧版本中使用KStreamBuilder)在处理器外部注册存储。首先创建存储,然后将其注册到StreamsBuilder(KStreamBuilder),添加处理器时提供存储名称以连接处理器和存储。

StreamsBuilder builder = new StreamsBuilder();

// create store
StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("invStore"),
    Serdes.String(),
    invSerde));
// register store
builder.addStateStore(storeBuilder);

KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
        .filterNot((k,v) -> k.equals("UnknownGroup"))
        .process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name


// older API:

KStreamBuilder builder = new KStreamBuilder();

// create store
StateStoreSupplier storeSupplier = (KeyValueStore)Stores.create("invStore")
    .withKeys(Serdes.String())
    .withValues(invSerde)
    .persistent()
    .build();
// register store
builder.addStateStore(storeSupplier);

KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
        .filterNot((k,v) -> k.equals("UnknownGroup"))
        .process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name
 类似资料:
  • 我的工具基本上读取PDF并在JTextArea中打印PDF的内容。在我的PDF包含阿拉伯语的PH Mirjan字体之前,一切正常。我的文本区域显示一些垃圾字符,如下所示。 我该如何解决这个问题? 我的文本区域的默认字体是Arial Unicode MS。我可以配置文本区域的字体吗?假设我在本地下载了PH Mirjan,如何将文本区域字体更改为下载的字体。非常感谢任何建议或参考链接。 编辑 这给了我

  • 我想添加“from”(date)来响应datepicker输入,但到目前为止,我有一些奇怪的行为。 我的代码: 到目前为止,我试过: 和 但这两种解决方案都将我的日期从“2020年9月14日16:43”转变为“从周一9月14 2020 17:19:38格林尼治标准时间0400(留尼汪岛)”,这是不必要的行为,因为我绝对想要法语日期。任何想法?谢啦

  • 在RestTemplate中,我有一个自定义拦截器,它将记录一些请求-响应详细信息并保存到数据库。 我的自定义拦截器: springboot中的RestTemboard bean配置: 将拦截器添加到restTemboard bean: 如何将此拦截器添加到佯装客户端? 正在应用中。yml: InterceptorOne为假装客户端中的每个请求添加标头: 但是我不能添加日志服务拦截器,因为它由于错

  • 问题内容: 我正在研究Spring Data JPA。考虑下面的示例,默认情况下我将使所有crud和finder功能正常工作,如果我想自定义finder,那么也可以在界面本身中轻松完成。 我想知道如何为上述AccountRepository的实现添加完整的自定义方法?由于它是一个接口,所以我不能在那里实现该方法。 问题答案: 你需要为自定义方法创建一个单独的接口: 并提供该接口的实现类:

  • 在中有一个方法,但它看起来不像是一个公共API,所以我宁愿不使用它。创建自定义指令并使用看起来是另一种选择,但基本上需要为每个自定义验证规则创建一个指令,而我不希望这样做。 实际上,在最简单的场景中,将控制器中的某个字段标记为无效(同时保持同步)可能是我完成任务所需要的,但我不知道如何做到这一点。

  • 搜索很好,它在工作,我正在过滤的交易类型(出售或出租)和房间数量在每个房地产。 但是我的JSON响应缺少很多字段,包括ACF。例如:{ “id”:149,“post_author”:“2”,“post_date”:“2016-03-03 23:53:39”,“post_date_gmt”:“2016-03-03 23:53:39”,“post_content”:“”post_title“:”opo