当前位置: 首页 > 知识库问答 >
问题:

使用kafka检测值的变化

卢和昶
2023-03-14

我有一个流应用程序,它连续接收坐标流以及一些自定义元数据,这些元数据还包括位字符串。该流使用生产者API生成到Kafka主题上。现在,另一个应用程序需要处理该流[Streams API],并存储位字符串中的特定位,并在该位更改时生成警报

下面是需要处理的连续消息流

{"device_id":"1","status_bit":"0"}
{"device_id":"2","status_bit":"1"}
{"device_id":"1","status_bit":"0"}
{"device_id":"3","status_bit":"1"}
{"device_id":"1","status_bit":"1"} // need to generate alert with change: 0->1
{"device_id":"3","status_bits":"1"}
{"device_id":"2","status_bit":"1"}
{"device_id":"3","status_bits":"0"} // need to generate alert with change 1->0

现在我想把这些提醒写到另一个Kafka主题,比如

{"device_id":1,"init":0,"final":1,"timestamp":"somets"}
{"device_id":3,"init":1,"final":0,"timestamp":"somets"}

我可以使用以下方式将当前位保存在状态存储中

streamsBuilder
        .stream("my-topic")
        .mapValues((key, value) -> value.getStatusBit())
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
        .reduce((oldAggValue, newMessageValue) -> newMessageValue, Materialized.as("bit-temp-store"));

但是我无法理解如何从现有位检测到这种变化。我需要在处理器拓扑中以某种方式查询状态存储吗?如果是?如何?如果不是?还能做什么?

任何我可以尝试的建议/想法(可能与我的想法完全不同)也非常感谢。我对Kafka还不熟悉,从事件驱动流的角度思考问题让我难以理解。

提前谢谢。

共有1个答案

陆啸
2023-03-14

我不确定这是最好的方法,但在类似的任务中,我使用了一个中间实体来捕捉状态变化。在你的情况下,会是这样

    streamsBuilder.stream("my-topic").groupByKey()
              .aggregate(DeviceState::new, new Aggregator<String, Device, DeviceState>() {
            public DeviceState apply(String key, Device newValue, DeviceState state) {
                if(!newValue.getStatusBit().equals(state.getStatusBit())){
                     state.setChanged(true);    
                }
                state.setStatusBit(newValue.getStatusBit());
                state.setDeviceId(newValue.getDeviceId());
                state.setKey(key);
                return state;
            }
        }, TimeWindows.of(…) …).filter((s, t) -> (t.changed())).toStream();

在生成的主题中,您将看到更改。您还可以向DeviceState添加一些属性以首先对其进行初始化,具体取决于您是否要发送事件、第一条设备记录到达的时间等。

 类似资料:
  • 问题内容: 有什么方法可以在python中每次变量值更改时调用函数? 像听众一样吗? 具体来说,我指的 是 像GAE-Session这样的脚本之间 仅共享变量 的情况 。(使用Cookie,Memcache等共享数据) 示例:ScriptA和ScriptB,共享一个会话变量。当脚本B进行更改时,SctiptA必须调用一个方法来处理该更改。 问题答案: 使用属性。首先,可变值可以更改。

  • 问题内容: 是否可以使用JavaScript在iPad或Galaxy Tab上检测浏览器方向的变化?我认为使用CSS媒体查询是可能的。 问题答案: 更新: 您可能要签出 jQuery移动方向更改 或普通的JS之一: MDN: 较旧的答案 http://www.nczonline.net/blog/2010/04/06/ipad-web-development- tips/ iPad上的Safari

  • 我有一个父组件(CategoryComponent)、一个子组件(videoListComponent)和一个APIService。 我的大部分工作都很好,即每个组件都可以访问json api,并通过Observables获取其相关数据。 当前video list组件只获取所有视频,我想将其筛选为特定类别中的视频,我通过将categoryId传递给子对象实现了这一点。 CategoryCompon

  • app/movie.component.ts 让我们再回顾一下它背后的逻辑。 当用户单击按钮时,调用方法并更新actor对象的属性。 当变化检测分析绑定到AppComponent模板的属性时,它将看到与之前相同的景象: Is slogan !== previousSlogan No, it’s the same. Is ? No, it’s the same. 让我们重新运行应用程序,但这次我们将

  • 由于双向数据绑定的本质,在Angular 1中不能保证父节点在子节点之前总是被检查。 有可能子节点可以改变父节点或兄弟节点或树中的任何其他节点,这又会在链中触发新的更新。 这使得变化检测机制难以遍历所有节点,并可能掉入具有臭名昭着的“震荡”循环中: 在Angular 2中,改变被保证单向传播。 更改检测器将遍历每个节点一次,始终从根开始。 这意味着父组件始终在其子组件之前检查。

  • Figure: Change Detector by Vovka is licensed under Public Domain () 变化检测在旧版本的Angular和新版本之间发生了很大的变化。在Angular 1中,框架保留了一长串观察者(每个属性绑定到我们的模板),需要在每次 digest 循环开始时检查。这被称为脏检查,它是唯一可用的变化检测机制。 在Angular 2中,信息流是单向的