当前位置: 首页 > 知识库问答 >
问题:

Kafka比较键的连续值

楚浩然
2023-03-14

我们正在构建一个应用程序来从传感器获取数据。数据流传输到Kafka,消费者将从Kafka发布到不同的数据商店。每个数据点都有多个表示传感器状态的属性。

在其中一个消费者中,我们希望仅当值发生更改时才将数据发布到数据存储。例如,如果有温度传感器,每10秒轮询一次数据,我们希望收到如下数据:

----------------------------------------------------------------------
Key                Value
----------------------------------------------------------------------
Sensor1            {timestamp: "10-10-2019 10:20:30", temperature: 10}
Sensor1            {timestamp: "10-10-2019 10:20:40", temperature: 10}
Sensor1            {timestamp: "10-10-2019 10:20:50", temperature: 11}

在上述情况下,只应发布第一条记录和第三条记录。

为此,我们需要某种方法来比较键的当前值与具有相同键的先前值。我认为这在KTable或KStream中应该是可能的,但找不到示例

任何帮助都会很棒!

共有3个答案

洪伟兆
2023-03-14

如果要对Kafka流执行此操作,必须使用处理器API。

您需要使用状态存储实现自定义变压器。对于每条消息,您应该在状态存储中搜索值,如果它已更改或不存在,您应该返回新值,否则为null。除此之外,您还应该将该值保存在状态存储中(KeyValueStore::put(…)

可以在此处找到有关处理器API的更多信息

空枫涟
2023-03-14

您可以使用Kafka流处理器API。可以将本地键值存储设置为状态上下文。为获取的每个记录调用process函数。

在process function中,您可以对照存储的最后一个值进行检查,并根据业务逻辑(在您的情况下,比较温度值)接受或拒绝最新记录。

在标点功能中,您可以按时间表将记录转发给消费者。请参阅下面的示例代码(不带标点符号)

java prettyprint-override">public class SensorProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, String> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // keep the processor context locally because we need it in punctuate() and commit()
        this.context = context;

        // retrieve the key-value store named "SensorData"
        kvStore = (KeyValueStore) context.getStateStore("SensorData");

        // schedule a punctuate() method every second based on event-time
      
    }

    @Override
    public void process(String sensorName, String sensorData) {
      
        String oldValue = this.kvStore.get(sensorName);

        if (oldValue == null) {
            this.kvStore.put(sensorName, sensorData);
        } else {
            //Put the business logic for comparison
            //compare temperatures
            //if required put the value
            this.kvStore.put(sensorName, sensorData);

            //Forward it o consumer
            context.forward(sensorName, sensorData);
        }
        context.commit();
    }

    @Override
    public void close() {
        // nothing to do
    }
}
纪实
2023-03-14

下面是一个如何使用KStream#transformValues()解决此问题的示例。

java prettyprint-override">StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, YourValueType>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
                                Serdes.String(),
                                YourValueTypeSerde());
builder.addStateStore(keyValueStoreBuilder);
stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), YourValueTypeSerde()))
    .transformValues(() -> new ValueTransformerWithKey<String, YourValueType, YourValueType>() {
        private KeyValueStore<String, YourValueType> state;

        @Override
        public void init(final ProcessorContext context) {
            state = (KeyValueStore<String, YourValueType>) context.getStateStore(stateStoreName);}

        @Override
        public YourValueType transform(final String key, final YourValueType value) {
            YourValueType prevValue = state.get(key);
            if (prevValue != null) {
                if (prevValue.temperature() != value.temperature()) {
                    return prevValue;
                }
            } else {
                state.put(key, value);
            }
            return null;
       }

       @Override
       public void close() {}
    }, stateStorName))
    .to(OUTPUT_TOPIC);

将该记录与状态存储中存储的上一个记录进行比较。如果温度不同,则从状态存储返回记录,并将当前记录存储在状态存储中。如果温度相等,则丢弃当前记录。

 类似资料:
  • 主要内容:1.架构,2.消息存储模型,3.消息消费模型,4. 多租户,5.运维1.架构 1.1 Kafka brocker和zk组成 1.2 Pulsar Pulsar Broker会在本地缓存消息,并且支持TTL, Pulsar 通过分层架构, 将计算和存储分离, 存储采用BookKeeper集群, 计算使用Broker集群, Brocker 需要内置BookKeeper 客户端 Pulsar的部署和架构更加复杂,但是也更具有伸缩性。 2.消息存储模型 2.1 Kafka

  • 假设我有一个双人课 我希望对它进行排序,首先是第一个值,然后是第二个值。现在,如果我这样做 一切都很好,列表按对的第一个值排序,但如果我这样做 它因错误而失败 好吧,所以它可能无法推断参数,所以如果我这样做 它因错误而失败 为什么它适用于comparing()而不适用于comparing()。然后比较()?

  • 本文向大家介绍比较RabbitMQ与Apache Kafka相关面试题,主要包含被问及比较RabbitMQ与Apache Kafka时的应答技巧和注意事项,需要的朋友参考一下 答:Apache Kafka的另一个选择是RabbitMQ。那么,让我们比较两者: 功能 Apache Kafka– Kafka是分布式的、持久的和高度可用的,这里共享和复制数据 RabbitMQ中没有此类功能 性能速度 A

  • 问题内容: 我试图用Java编写一个程序,该程序涉及使对象 从一次按键操作中不断移动。想一想吃豆子,在其中按下一次, 吃豆子继续上升直到您按下另一个键。 如果可能的话,我想保持代码简单。我的原始动作(一键按下=一动作)是这样的 : 值中的x和y是椭圆的位置。这完美地工作了,但是我 希望它只按一次 就可以继续运动,而不必按住它来保持运动。我尝试了一个带有布尔 参数的while循环,该布尔参数在tru

  • 我有3个< code > edittext(et1,et2,et3),彼此相邻排列,其中< code>maxLength为“1”。 我的要求是: > < li> 当在编辑文本中键入字母时(只能包含1个字符,因为最大长度为1),光标会自动转到下一个编辑文本。 类似地,当用户单击et2上的back时,光标应该转到et1。 编辑文本将按顺序填写。首先是第一个,然后是第二个,最后是第三个。因此,即使用户单

  • 我有一个关于compareTo函数如何帮助比较器排序的问题,即o1。比较(o2)与o2。比较(o1) 如果两个字符串相等,则此方法返回0,否则返回正值或负值。如果第一个字符串在词典上大于第二个字符串,则结果为正,否则结果为负。 上面的陈述很简单,但是为什么o1.compare(o2)会给我一个升序,而o2.compare(o1)给了我一个降序? 如果我有整数值“5,10,3”,我得到3,5,10和