当前位置: 首页 > 知识库问答 >
问题:

注册Apache Camel处理器的侦听器

司寇阳曦
2023-03-14

我尝试将在Apache Camel处理器中处理的数据推送到侦听器类。在处理器类实例中,我尝试在Camel上下文的实例化期间注册侦听器,但不知何故失败了。也许我在这里根本错误,这是不可能的。如果是这种情况,最好你告诉我。

我有一个Apache Camel路由,从ActiveMQ服务器获取JSON消息,并将这些JSON推送到一个自定义处理器类,该类由Camel-Spring XML定义:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://camel.apache.org/schema/spring
    http://camel.apache.org/schema/spring/camel-spring.xsd">

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="url to ActiveMQ" />
        <property name="clientID" value="clientID" />
        <property name="userName" value="theUser" />
        <property name="password" value="thePassword" />
    </bean>
    <bean id="pooledConnectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory"
        init-method="start" destroy-method="stop">
        <property name="maxConnections" value="8" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
    </bean>
    <bean id="customProcessor" class="...CustomProcessorClass" />
    <camelContext id="matrixProfileContext" xmlns="http://camel.apache.org/schema/spring">  
        <route id="matrixProfileRoute" autoStartup="false">
            <from uri="activemq:queue:queuename" />
            <log message="${body}" />
            <to uri="customProcessor" />
        </route>
    </camelContext>
</beans>

我的想法是,类CustomProcess解组通过路由传递的JSON内容,并将POJO推送到实现侦听器接口的侦听器类:

public interface ProcessorListenerIF {

    public void doOnDataProcessed(POJO processedData);
}

我通过单元测试测试整个设置:

public class TestProcessor extends TestCase {

    @Test
    public void testRoute() throws Exception {
        MyActiveMQConnector camelContext = new MyActiveMQConnector(new TestListener());
        try {
            camelContext.startConnections();
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            camelContext.stopConnection();
        }
    }

    private class TestListener implements ProcessorListenerIF {

        @Override
        public void doOnDataProcessed(POJO data) {
            System.out.println(data);
        }
    }
}

骆驼处理器有两种方法:

public void addListener(MatrixProfileProcessorListenerIF listener) {
    _processorListeners.add(listener);
}

@Override
public void process(Exchange exchange) throws Exception {

    Pseudocode: POJO data = unmarshal_by_JSON-JAVA(exchange)

    _processorListeners.parallelStream().forEach(listener -> {
        listener.doOnDataProcessed(data);
    });
}

其中我在ActiveMQConnector的构造函数中注册侦听器:

public class ActiveMQConnector {

    private SpringCamelContext _camelContext = null;

    public ActiveMQConnector(ProcessorListenerIF listener) {
        ApplicationContext appContext = new ClassPathXmlApplicationContext("camelContext.xml");
        _camelContext = new SpringCamelContext(appContext);
-------------------------------------
        ((CustomProcessor) _camelContext.getProcessor("customProcessor")).addListener(listener);
-------------------------------------
    }

    public void startConnections() throws Exception {
        try {
            _camelContext.start();
        } catch (Exception e) {
            exception handling
        }
    }
... more methods

上面突出显示的行((CustomProcencer)...失败:语句_camelContext.get处理器没有找到任何内容,实例_camelContext中的路由为空。

如何实现将处理后的数据从处理器推送到某个观察者?

共有1个答案

敖硕
2023-03-14

我找到了另一个解决方案,它完全基于Java,没有Spring XML。

单元测试仍然如上图所示。我没有通过Spring XML定义ActiveMQEndpoint,而是创建了一个新类:

public class MyActiveMQConnection {

    public static ActiveMQConnectionFactory createActiveMQConnectionFactory() {

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://<activemq-url>:<port>");
        connectionFactory.setUserName("myUsername");
        // connection factory configuration:
        connectionFactory.setUseAsyncSend(false);
        connectionFactory.setClientID(UUID.randomUUID().toString());
        connectionFactory.setConnectResponseTimeout(300);
        ... whatever ...

        return connectionFactory;
    }
}

此外,我更改了类ActiveMQConnector的构造函数:

public ActiveMQConnector(ProcessorListenerIF listener) throws Exception {

    _camelContext = new DefaultCamelContext();
    _camelContext.addComponent("activemqEndpoint",
            JmsComponent.jmsComponent(MyActiveMQConnection.createActiveMQConnectionFactory()));
    _camelContext.addRoutes(new RouteBuilder() {

        @Override
        public void configure() throws Exception {

            MyCustomProcessor processor = MyCustomProcessor.getInstance();
            processor.addListener(listener);

            from("activemqEndpoint:queue:matrixprofile") //
                    .process(processor) //
                    .to("stream:out");
        }
    });
}

我将处理器实现为单例,如下所示(为了完整性):

public class MyCustomProcessor implements Processor {

    private final Set<ProcessorListenerIF> _processorListeners = new HashSet<>();
    private static volatile MyCustomProcessor      _myInstance         = null;
    private static Object                          _token              = new Object();

    private MyCustomProcessor() {

    }

    public static MyCustomProcessor getInstance() {
        MyCustomProcessor result = _myInstance;
        if (result == null) {
            synchronized (_token) {
                result = _myInstance;
                if (result == null)
                    _myInstance = result = new MyCustomProcessor();
            }
        }
        return result;
    }

    public void addListener(ProcessorListenerIF listener) {
        _processorListeners.add(listener);
    }

    /**
     * I assume the JSON has the following structure:
     * {timestamp: long, data: double[]}
    **/
    @Override
    public void process(Exchange exchange) throws Exception {

        _processorListeners.parallelStream().forEach(listener -> {
            // convert incoming message body to json object assuming data structure above
            JSONObject    jsonObject    = new JSONObject(exchange.getMessage().getBody().toString());
            MyPOJO myPojo = new MyPOJO();

            try {
                myPojo.setTimestamp(jsonObject.getLong("timestamp"));
            } catch (Exception e) {
                ...
            }
            try {
                JSONArray dataArray = jsonObject.getJSONArray("data");
                double[]  data      = new double[dataArray.length()];
                for (int i = 0; i < dataArray.length(); i++) {
                    data[i] = Double.valueOf(dataArray.get(i).toString());
                }
                myPojo.setData(data);
            } catch (Exception e) {
                ...
            }

            listener.doOnDataProcessed(myPojo);
        });
    }
}
 类似资料:
  • 主要内容:1.bean后置处理器的创建过程,2.注册监听器部分/Spring如何按照类型找到组件1.bean后置处理器的创建过程 refresh() registerBeanPostProcessors() PostProcessorRegistrationDelegate: 这个类是管理全部的后置处理器的类 注册BeanPostProcessor: 就是将BeanPostProcessor放入一个存放后置处理器的List, beanPostProcessors 首先注册实现了Priorit

  • 女士们先生们晚上好, 我有一个Java Swing的问题,我无法解决,也许你可以帮助我。在这里: 我有一个使用BorderLayout的JFrame和许多JPanel 每次我需要设置一个新屏幕(即,从主菜单,当单击搜索按钮时,转到搜索菜单),我只需删除位于中心的组件(JPanel),并将新屏幕(新JPanel)放在中心 这样,我不会在每次我想显示新屏幕时调用所有的页眉和页脚对象 这个系统一切正常,

  • 我最近发现自己正在编写以下代码: 一位同事指出 然而, 基于这一点,我得到的印象是我没有正确理解,我开始怀疑我的原始代码是否安全。 DR 使用引用要用作侦听器且需要多次引用的方法(例如,添加+删除)是否安全? 关于匿名类等的幕后实际发生了什么?

  • 我试图为我们现有的laravel站点(laravel 5.2)的注销功能添加一些逻辑,但它不像登录那样简单。 客户端的现有注销工作正常,但我只想向我的Cognoto实例添加一个调用,以将用户注销他们的Cognoto会话。基本上,当用户单击logout时,我想让他们从网站上注销,就像以前一样,但也要点击我的cognito注销endpoint 我的困惑来自这样一个事实,即auth的现有路由和控制器并不

  • 问题内容: 我有两个Activity类。和。稍后,我添加了当用户摇动设备在或时启动的功能。现在,如果我在和中注册了ShakeListener ,就可以实现我的目标 但是,我现在想要的是另一回事,我不想更改和。我想编写一个不同的类,该类在整个应用程序中运行,并为该应用程序中的所有活动注册ShakeListener。我怎样才能做到这一点?那应该是什么样的课? 我尝试了扩展BroadcastReceiv

  • 我有一个自定义跳过管理步骤。我定义了一个跳过策略,其源代码如下: 我的Skip侦听器如下: 我的步骤定义如下: 我想跳过一个约束冲突异常。但是,不会调用侦听器或跳过策略。