当前位置: 首页 > 知识库问答 >
问题:

Kafka流中的状态过滤/平坦MapValue?

盖和泰
2023-03-14

我正在尝试编写一个简单的Kafka Streams应用程序(目标是Kafka 2.2/Confluent 5.2),将一个至少有一次语义的输入主题转换为一个恰好只有一次的输出流。我想对以下逻辑进行编码:

  • 对于具有给定密钥的每条消息:

(这是基于我们从上游系统获得的订购保证来保证提供正确的结果;我不想在这里做任何神奇的事情。)

起初,我以为我可以用Kafka StreamsflatMapValues操作符来实现这一点,它可以让你用同一个键将每个输入消息映射到零个或多个输出消息。然而,该文件明确警告:

这是一个无状态的逐条记录操作(参见有状态值转换的变换值(ValueTransformerProviier,String...))。

这听起来很有希望,但是transformValues留档并没有明确说明如何在每个输入消息中发出零个或一个输出消息。除非这就是示例中的//或null的意思?

平变换看起来也很有希望,但是我不需要操作密钥,如果可能的话,我希望避免重新分区。

有人知道如何正确html" target="_blank">执行这种过滤吗?

共有1个答案

牟黎昕
2023-03-14

如上所述,您可以使用变压器来实现有状态操作。为了不向下游传播消息,您需要从变换方法返回空值,这在变压器java文档中提到。您可以通过进程Context.forward(键,值)来管理传播。下面提供了简化的示例

kStream。变换(()-

public class DemoTransformer implements Transformer<String, String, KeyValue<String, String>> {
    private ProcessorContext processorContext;
    private String stateStoreName;
    private KeyValueStore<String, String> keyValueStore;

    public DemoTransformer(String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    @Override
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
        this.keyValueStore = (KeyValueStore) processorContext.getStateStore(stateStoreName);
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        String existingValue = keyValueStore.get(key);
        if (/* your condition */) {
            processorContext.forward(key, value);
            keyValueStore.put(key, value);
        }

        return null;
    }

    @Override
    public void close() {
    }
}

 类似资料:
  • 我一直在检查Kafka流。我一直在测试下面的Kafka流代码 生产者主题:(这是第一个生产者主题-发送以下json数据) JSON-主题的生产者: Stream Topic代码:(这是第二个Streaming代码和主题) 如果UserID值为“1”,我想对其进行归档,然后将该数据发送到目标流媒体主题。 当我使用“.filter”并打印System.out时。println(“value:”valu

  • 问题内容: 我将ReactJs与Redux一起使用,在一些教程和代码上,我看到人们建议并使用normalizr 保持状态平坦 。但是,保持平坦的真正优势是什么?如果没有,我会遇到任何问题吗?有必要吗 ? 问题答案: 三个主要原因: 不变地更新嵌套的Javascript对象通常会导致难以维护的丑陋代码,除非您使用实用程序库来打包过程 不变地更新嵌套数据要求您返回嵌套层次结构中所有项目的新副本。由于组

  • 我有4个单一分区和应用程序的三个实例的主题。我试图通过编写一个自定义的PartitionGrouper来实现可伸缩性,它将创建如下3个任务: 第一个实例-topic1,分区0,topic4,分区0 第二个实例-主题2,分区0 第三实例-桌面3,分区0 我将NUM_STANDBY_REPLICAS_CONFIG配置为1,因为它将在本地维护状态(也可以消除invalidstatestore异常)。 上

  • 问题内容: 我正在阅读“过滤器” 部分(https://docs.angularjs.org/guide/filter#stateful- filters )上的AngularJS开发人员指南,并遇到了“状态过滤器”。 该描述如下: 强烈建议不要编写有状态的过滤器,因为Angular无法优化它们的执行,这通常会导致性能问题。只需将隐藏状态公开为模型并将其转换为过滤器的参数,即可将许多有状态过滤器转

  • 我有一个KStream,其中包含从主题到1的数据,如下所示: 和KTable,构造如下: 稍后,主题To2中出现以下消息: 现在,我希望我的KTable能够反映这些变化,并且看起来像这样: 但看起来是这样的: 我想我缩小了范围:显然聚合的只在第一次调用--之后聚合总是接收作为最后一个参数,例如。 其中,在第一次调用(通过初始值设定项创建)时为,但在第二次调用时为。 有什么想法吗? 编辑2 编辑3

  • 我正在开发使用Spring Cloud Stream构建的Kafka Streams应用程序。在这个应用程序中,我需要: 使用可在以后检索的连续消息流。 保留与某些条件匹配的邮件 ID 列表。 在单独的线程中,运行一个计划程序,该计划程序定期读出消息 ID,检索与这些 ID 匹配的相应消息,并对这些消息执行操作。 从列表中删除已处理的消息 ID,以便不重复工作。 我已考虑如下实施: < li >将