当前位置: 首页 > 知识库问答 >
问题:

如何在storm拓扑中使用drools

阚夕
2023-03-14

现在我想在一个污点中使用Drools,它在LocalCluster中正常工作,但是当我把它放在生产集群中时,它有错误。污点是:

public class DealLostBolt extends BaseRichBolt {

      private static final long serialVersionUID = 1L;

      private static final Logger LOGGER = LoggerFactory.getLogger("DEAL_LOST_BOLT");

      private OutputCollector collector;

      private KieSession kieSession;

      private FactHandle factHandle;

      @Override
      public void execute(Tuple input) {
        // 获取数据
        String sentence = (String) input.getValue(0);
        LOGGER.info("DealLostBolt获取到的数据:" + sentence);

        // 数据转换
        PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);

        KieServices ks = KieServices.Factory.get();
        KieContainer kieContainer = ks.getKieClasspathContainer();
        kieSession = kieContainer.newKieSession("all-rule");
        kieSession.getAgenda().getAgendaGroup("deal-lost").setFocus();

        factHandle = kieSession.insert(dataPoint);
        kieSession.fireAllRules();
        kieSession.delete(factHandle);

        collector.emit(new Values(sentence));
      }

      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("value"));

      }

      @Override
      public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
      }

    }

我使用官方文件创建了kiesession。误差为:

java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:495) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.daemon.executor$fn__5030$fn__5043$fn__5096.invoke(executor.clj:848) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
    Caused by: java.lang.NullPointerException
    at org.kie.internal.io.ResourceFactory.newByteArrayResource(ResourceFactory.java:66) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieModule.getResource(AbstractKieModule.java:299) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:264) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:259) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieProject.buildKnowledgePackages(AbstractKieProject.java:228) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieModule.createKieBase(AbstractKieModule.java:206) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.KieContainerImpl.createKieBase(KieContainerImpl.java:584) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.KieContainerImpl.getKieBase(KieContainerImpl.java:552) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:680) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:648) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at cn.ennwifi.storm.bolt.DealLostBolt.execute(DealLostBolt.java:52) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.daemon.executor$fn__5030$tuple_action_fn__5032.invoke(executor.clj:729) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__4951.invoke(executor.clj:461) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.disruptor$clojure_handler$reify__4465.onEvent(disruptor.clj:40) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:482) ~[storm-core-1.1.1.jar:1.1.1]
    ... 6 more

也许有些东西没有初始化。但当blot执行时,我创建了一个新的kieservice。有人能帮我吗

谢啦!

共有1个答案

宰父子安
2023-03-14

我在使用Drools和JMH作为着色jar时遇到了类似的问题。Drools使用ServiceRegistry方法。这意味着Drools库(Drools编译器、kie ci、Drools decisiontables…)包含相同的命名属性文件,该文件指示它们为接口提供的实现。

着色jar插件通常将(可传递的)依赖关系平坦化为一个jar。对于存在多次的文件,这通常意味着如果没有另外指定,则选择其中一个文件。对于ServiceRegistry属性,我们需要合并所有文件。通常这是通过ServicesResourceTransformer完成的。这个转换器处理META-INF/services中的文件,但是Drools的相关文件是META-INF/kie.conf。我的JMH问题可以通过附加变压器解决:

<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>META-INF/kie.conf</resource>
</transformer>

我不是Storm的专家,但Starter建议它也使用阴影插件。我假设您从IDE运行本地集群——它不使用阴影jar。

 类似资料:
  • 问题内容: 我们是新来的风暴。我们不知道如何创建拓扑,请帮助我们应对风暴。我们尝试了“ Windows上的狂风暴雨”一文中给出的示例wordcount c = topology。但是我们无法理解如何给出输入,以及风暴用户界面中输入存在的位置以及输出存在的位置。 问题答案: 输入和输出在Storm UI中不存在。在Storm UI中,您看不到发出的元组,处理时间,集群配置和集群的运行状况。要查看输出

  • 我刚来暴风,所以温柔点:-) 什么是实现这一目标的最佳方式?

  • 关于拓扑结构的说明: 喷口连续向读取螺栓发送元组。 读取bolt过程并将结果发送给下一个bolt等等。 在R bolt中处理tuple1之后还是在readbolt发送tuple1写入bolt之后?

  • 我正在尝试使用Eclipse在Linux中运行Storm启动示例。我收到以下错误和函数从未被调用。 错误: 我的拓扑类: 我正在虚拟机环境中工作,所以不知道这是否是由于安装了Zookeeper。有什么想法吗?

  • 8台机器一直在使用。每一个都有22个核心和512 GB的RAM。但是,我们的代码运行得真的很慢。传输600万个数据需要10分钟才能完成。 60个文件中的10 MB在一秒钟内传输到HDFS。我们正在努力优化我们的代码,但很明显我们做了一些非常错误的事情。 对于蜂巢表,我们有64个桶。 在HDFS喷口;.setmaxextending(50000); 在蜂巢喷口选项;.WithTxNsperBatch

  • 如何为storm拓扑提供自定义配置?例如,如果我构建了一个连接到MySQL集群的拓扑,并且我希望能够更改需要连接到哪些服务器而不需要重新编译,我将如何做到这一点?我更喜欢使用配置文件,但我担心文件本身没有部署到集群中,因此它不会运行(除非我对集群工作方式的理解有缺陷)。到目前为止,我所看到的在运行时将配置选项传递到storm拓扑的唯一方法是通过命令行参数,但当您获得大量参数时,这将是混乱的。 有一