当前位置: 首页 > 知识库问答 >
问题:

在 apache kafka flink 中运行时添加新规则

方长卿
2023-03-14

我正在使用Flink Kafka在流上应用规则。以下是示例代码

ObjectMapper mapper = new ObjectMapper();
List<JsonNode> rulesList = null;
try {
    // Read rule file
    rulesList = mapper.readValue(new File("ruleFile"), new TypeReference<List<JsonNode>>(){});

} catch (IOException e1) {
    System.out.println( "Error reading Rules file.");
    System.exit(-1);
}


for (JsonNode jsonObject : rulesList) {
    String id = (String) jsonObject.get("Id1").textValue();

    // Form the pattern dynamically
    Pattern<JsonNode, ?> pattern = null;
    pattern = Pattern.<JsonNode>begin("start").where(new SimpleConditionImpl(jsonObject.get("rule1")));
    // Create the pattern stream
    PatternStream<JsonNode> patternStream = CEP.pattern(data, pattern);

}

但问题是,当我们启动程序时,FlinkKafka只读取文件一次,我希望新规则在运行时动态添加并应用于流。

在《Flink·Kafka》中,我们有没有办法实现这一点?

共有1个答案

毋宪
2023-03-14

Flink 的 CEP 库(尚未)支持动态模式。(请参阅 FLINK-7129。

标准方法是使用广播状态在整个集群中通信和存储规则,但是您必须想出一些方法来评估/执行规则。

参见https://training.da-platform.com/exercises/taxiQuery.html和https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/examples/datastream_java/broadcast/BroadcastState.java示例。

 类似资料:
  • 有没有什么方法可以动态编辑加载到Drools中的规则,而不需要重新加载新的DRL文件? 我们试图使用Drools作为规则引擎,但在我们的用例中,规则的添加和删除非常频繁,我们希望避免每次发生这种情况时都必须重新加载整个.drl文件。

  • 我知道ApplicationContext扩展点,如ApplicationContext事件和BeanFactoryPostProcessor。 我手头的问题是,在创建了一些bean之后,我需要添加bean,我想这会丢弃BeanFactoryPostProcessor选项,因为在应用程序上下文开始注册bean之前会发生这种情况。 我尝试在上下文刷新后添加一个singletonBean:

  • 是否有可能在运行时为本机库添加新路径?。(而不是使用Java.library.path属性启动Java),因此在尝试查找时,调用将包含该路径。这是可能的,还是一旦JVM启动,这些路径就被冻结了?

  • 问题内容: 到目前为止,我还没有找到在Drools 6.0.0中将规则添加到正在运行的KieSession的最佳方法(即以最小的开销),而仍然将我的事实保留在KieSession中。在Drools 5中,当更改KBase时更新了KSession,但对于Drools 6而言却并非如此,因为我的规则未在KieBase中创建。有没有一种方法可以替换整个KieFileSystem中的整个KieModule

  • 我已经看到了这个答案,并且正在使用它所建议的解决方案,但仍然得到相同的错误。 我有一个对用户表有外键约束的代理表,这是代理表: 这是我的实体: 现在,我想为现有用户添加一个新代理: <代码>\u上下文。SaveChanges() 行引发以下异常: "无法添加或更新子行:外键约束失败(\"dbName\".\"agent\", CONSTRAINT\"FK_Agency_User\"FOREIGN

  • 问题内容: 如何在Java运行时添加camel路线?我找到了Grails示例,但是已经用Java实现了。 我的applicationContext.xml已经有一些预定义的静态路由,我想在运行时为其添加一些动态路由。可能吗?因为包括动态路由的唯一方法是编写route.xml,然后将路由定义加载到上下文。它如何在现有静态路由上工作? 问题答案: 你可以在CamelContext上简单地调用一些不同的