当前位置: 首页 > 知识库问答 >
问题:

OpenNMS v18 AMQP消息发送问题

狄宏大
2023-03-14

我无法让OpenNMS向AMQP终端发送消息。我从来没有这样做过,这是我第一次使用OpenNMS和AMQP,所以这可能会减少我的经验不足。

我已经按照这个问题配置了RabbitMQ 3.5.7并进行了测试。它在使用外部QPID 0.32客户端时工作良好,在使用python或perl时也工作良好。works fine的定义是建立通信,并将消息有效负载传输到exchange,然后传递到后端队列。然后可以在RabbitMQ管理GUI中查看消息。

在OpenNMS中,我一直遵循这里的说明从EventForwarder开始,然后尝试了警报Northbounder,两者都产生了NullPointerException。

我设置Karaf如下,使用以下语句设置属性:-

opennms> config:edit org.opennms.features.amqp.alarmnorthbounder
opennms> propset connectionUrl amqp://simon:simon@/test?brokerlist=\'localhost:5672\'
opennms> propset destination "amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} }"
opennms> propset processorName default-alarm-northbounder-processor
opennms> config:update
opennms> config:list '(service.pid=org.opennms.features.amqp.alarmnorthbounder)'
----------------------------------------------------------------
Pid:            org.opennms.features.amqp.alarmnorthbounder
BundleLocation: mvn:org.opennms.features.amqp/org.opennms.features.amqp.alarm-northbounder/18.0.0
Properties:
   connectionUrl = amqp://simon:simon@/test?brokerlist='localhost:5672'
   destination = amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} }
   felix.fileinstall.filename = file:/usr/share/opennms/etc/org.opennms.features.amqp.alarmnorthbounder.cfg
   processorName = default-alarm-northbounder-processor
   service.pid = org.opennms.features.amqp.alarmnorthbounder

我在日志中收到以下错误消息

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[forwardAlarm      ] [forwardAlarm      ] [seda://forwardAlarm                                                           ] [         8]
[forwardAlarm      ] [convertBodyTo3    ] [convertBodyTo[org.opennms.netmgt.alarmd.api.NorthboundAlarm]                  ] [         0]
[forwardAlarm      ] [log3              ] [log                                                                           ] [         1]
[forwardAlarm      ] [bean3             ] [bean[ref:dynamicallyTrackedProcessor]                                         ] [         0]
[forwardAlarm      ] [to3               ] [amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} }                 ] [         7]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
        Id                  ID-ubuntu-1604-35241-1465752556843-2-60
        ExchangePattern     InOnly
        Headers             {breadcrumbId=ID-ubuntu-1604-35241-1465752556843-2-58, CamelRedelivered=false, CamelRedeliveryCounter=0}
        BodyType            String
        Body                NorthboundAlarm[id=3, uei='uei.opennms.org/generic/traps/EnterpriseDefault', nodeId=1]
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.NullPointerException
        at org.apache.qpid.client.BasicMessageProducer_0_8.declareDestination(BasicMessageProducer_0_8.java:63)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
        at org.apache.qpid.client.BasicMessageProducer.<init>(BasicMessageProducer.java:136)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
        at org.apache.qpid.client.BasicMessageProducer_0_8.<init>(BasicMessageProducer_0_8.java:55)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
        at org.apache.qpid.client.AMQSession_0_8.createMessageProducer(AMQSession_0_8.java:559)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
        at org.apache.qpid.client.AMQSession_0_8.createMessageProducer(AMQSession_0_8.java:62)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]

我的期望是,它应该能够使用与我在外部使用的库相同的库将消息传递到队列。

正在为组织启用调试。阿帕奇。我获得的qpid:-

2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [org.apache.qpid.client.state.StateWaiter@5d7e5d44]
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.framing.FieldTable: FieldTable::writeToBuffer: Writing encoded length of 254...
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.framing.FieldTable: {instance=[LONG_STRING: ubuntu-16041465757105204], product=[LONG_STRING: qpid], version=[LONG_STRING: 0.28], platform=[LONG_STRING: Java(TM) SE Runtime Environment, 1.8.0_45-b14, Oracle Corporation, amd64, Linux, 4.4.0-22-generic, unknown], qpid.client_process=[LONG_STRING: Qpid Java Client], qpid.client_pid=[INT: 1318]}
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ConnectionTuneBodyImpl: channelMax=0, frameMax=131072, heartbeat=60]
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.handler.ConnectionTuneMethodHandler: ConnectionTune frame received
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: State changing to AMQState: id = 3 name: CONNECTION_NOT_OPENED from old state AMQState: id = 2 name: CONNECTION_NOT_TUNED
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [org.apache.qpid.client.state.StateWaiter@5d7e5d44]
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ConnectionOpenOkBodyImpl: knownHosts=null]
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: State changing to AMQState: id = 4 name: CONNECTION_OPEN from old state AMQState: id = 3 name: CONNECTION_NOT_OPENED
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [org.apache.qpid.client.state.StateWaiter@5d7e5d44]
2016-06-12 19:00:38,726 INFO  org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Connection 44 now connected from /127.0.0.1:45388 to localhost/127.0.0.1:5672
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Are we connected:true
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Connected with ProtocolHandler Version:0-91
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnectionDelegate_8_0: Write channel open frame for channel id 1
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQSession: Created session:org.apache.qpid.client.AMQSession_0_8@179483f0
2016-06-12 19:00:38,728 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ChannelOpenOkBodyImpl: channelId=null]
2016-06-12 19:00:38,728 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [BasicQosOkBodyImpl: ]
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQDestination: Based on onms3/Simon;{'create':'always','node':{'type':'topic'} } the selected destination syntax is ADDR
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQSession: Closing session: org.apache.qpid.client.AMQSession_0_8@179483f0
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.protocol.AMQProtocolSession: closeSession called on protocol session for session 1

它在写入任何信息之前关闭会话。

当我从外部qpid客户端做本质上相同的事情时-执行如下:-

#!/bin/bash

rm log.out
java -Dqpid.amqp.version=0-91 -Dlog4j.debug -Dlog4j.configuration=file:./log4j.properties -cp "client/example/target/classes/:client/example/target/dependency/*:slf4j-1.7.21/slf4j-log4j12-1.7.
21.jar:apache-log4j-1.2.17/log4j-1.2.17.jar" \
    org.apache.qpid.example.ListSender

我明白了:-

142  [main] DEBUG org.apache.qpid.client.AMQConnection  - Are we connected:true
142  [main] DEBUG org.apache.qpid.client.AMQConnection  - Connected with ProtocolHandler Version:0-91
146  [main] DEBUG org.apache.qpid.client.AMQConnectionDelegate_8_0  - Write channel open frame for channel id 1
162  [main] DEBUG org.apache.qpid.client.AMQSession  - Created session:org.apache.qpid.client.AMQSession_0_8@6bdf28bb
164  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ChannelOpenOkBody]
165  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [BasicQosOkBodyImpl: ]
173  [main] DEBUG org.apache.qpid.client.AMQDestination  - Based on onms3/Simon;{create: always, node:{type: topic } } the selected destination syntax is ADDR
177  [main] DEBUG org.apache.qpid.framing.FieldTable  - FieldTable::writeToBuffer: Writing encoded length of 0...
178  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ]
179  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ]
179  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ]
180  [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8  - MessageProducer org.apache.qpid.client.BasicMessageProducer_0_8@1936f0f5 using publish mode : ASYNC_PUBLISH_ALL
190  [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8  - Sending content body frames to 'onms3'/'Simon'; {
  'create': 'always',
  'node': {
    'type': 'topic'
  }
}
190  [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8  - Sending content header frame to 'onms3'/'Simon'; {
  'create': 'always',
  'node': {
    'type': 'topic'
  }
}
190  [main] DEBUG org.apache.qpid.framing.FieldTable  - FieldTable::writeToBuffer: Writing encoded length of 90...
191  [main] DEBUG org.apache.qpid.framing.FieldTable  - {Id=[INT: 987654321], name=[LONG_STRING: WidgetSimon], price=[DOUBLE: 0.99], qpid.subject=[LONG_STRING: Simon], JMS_QPID_DESTTYPE=[INT: 2]}
192  [main] DEBUG org.apache.qpid.client.AMQSession  - Closing session: org.apache.qpid.client.AMQSession_0_8@6bdf28bb
192  [main] DEBUG org.apache.qpid.client.protocol.AMQProtocolSession  - closeSession called on protocol session for session 1
194  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ChannelCloseOkBody]
194  [IoReceiver - localhost/127.0.0.1:5672] INFO  org.apache.qpid.client.handler.ChannelCloseOkMethodHandler  - Received channel-close-ok for channel-id 1
195  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ConnectionCloseOkBody]
196  [main] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - Session closed called by client

列表ender.java如下:-

package org.apache.qpid.example;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;

import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.ListMessage;


public class ListSender {

    public static void main(String[] args) throws Exception
    {
        Connection connection =
            new AMQConnection("amqp://simon:simon@localhost/test?brokerlist='tcp://localhost:5672'");
                                                  AMQShortString a1 = new AMQShortString("");
                                                   AMQShortString a2 = new AMQShortString("");
        AMQShortString[] bindvars = new AMQShortString[]{a1,a2};
        boolean is_durable = true;
/*      
        Destination queue = new AMQAnyDestination( new AMQShortString("onms2"), 
                                                   new AMQShortString("direct"),
                                                   new AMQShortString("Simon"),
                                                   true,        
                                                   true,        
                                                   new AMQShortString(""),
                                                   false,       
                                                   bindvars);
        */

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //Destination queue = new AMQAnyDestination("onms3/Simon");
        Destination queue = new AMQAnyDestination("onms3/Simon;{create: always, node:{type: topic } }");
        //Destination queue = new AMQAnyDestination("onms3/Simon;%7Bcreate%3A%20always%2C%20node%3A%7Btype%3A%20topic%20%7D%20%7D");
        //Destination queue = new AMQAnyDestination("amqp:OpenNMSExchange/Taylor; {create: always}");
        //Destination queue = new AMQAnyDestination("OpenNMSExchange; {create: always}");
        MessageProducer producer = session.createProducer(queue);

ListMessage m = ((org.apache.qpid.jms.Session)session).createListMessage();
        m.setIntProperty("Id", 987654321);
        m.setStringProperty("name", "WidgetSimon");
        m.setDoubleProperty("price", 0.99);

        List<String> colors = new ArrayList<String>();
        colors.add("red");
        colors.add("green");
        colors.add("white");
        m.add(colors);

        Map<String,Double> dimensions = new HashMap<String,Double>();
        dimensions.put("length",10.2);
        dimensions.put("width",5.1);
        dimensions.put("depth",2.0);
        m.add(dimensions);
        List<List<Integer>> parts = new ArrayList<List<Integer>>();
        parts.add(Arrays.asList(new Integer[] {1,2,5}));
        parts.add(Arrays.asList(new Integer[] {8,2,5}));
        m.add(parts);

        Map<String,Object> specs = new HashMap<String,Object>();
        specs.put("colours", colors);
        specs.put("dimensions", dimensions);
        specs.put("parts", parts);
        m.add(specs);

        producer.send((Message)m);
        System.out.println("Sent: " + m);
        connection.close();
    }

}

我的假设是这是一个与某种描述的AMQP服务器的连接问题。如果我从外部jar遇到问题,在进行故障排除时遇到了许多问题,从日志中可以清楚地看出问题是什么。这是OpenNMS的问题吗?有人成功地解决了这个问题吗?有什么想法吗?

干杯

西蒙

共有1个答案

叶卓君
2023-03-14

根据OpenNMS列表的指导,我决定安装QPID代理v6.0.3来代替RabbitMQ,现在消息流没有问题。

 类似资料:
  • 接口说明 轻推轻应用/订阅号支持发送文本、图片、文本卡片、图文、key-value、文件、待办等消息类型。本接口针对各种消息类型和发送的对象(单发、群发以及给部分人发送)进行了定义。 注:openid是用户关注某个轻应用/订阅号后生成的唯一id,单发和给部分人发送消息必须携带此参数,可以通过如下接口来获取: 根据qt_code获取用户基本信息 获取使用者列表 通过userId获取openid 消息

  • 主动发送消息 use EasyWeChat\Kernel\Messages\TextCard; // 获取 Messenger 实例 $messenger = $app->messenger; // 准备消息 $message = new TextCard([ 'title' => '你的请假单审批通过', 'description' => '单号:1928373, ...

  • 向已经创建连接凭据的设备发送消息数据。 请求方式: |4|2|3|message|\r 参数 message 发送的消息内容 返回值: "|4|2|3|1|\r" 发送成功 "|4|2|3|2|\r" 发送失败 Arduino样例: softSerial.print("|4|2|3|DFRobot|\r");

  • 问题内容: 我有自己的基于JDA的Discord BOT。我需要向特定频道发送短信。我知道如何将消息作为onEvent响应发送,但是在我的情况下,我没有此类事件。 我有:作者(BOT),令牌和通道号。 我的问题是:如何在 没有事件的情况下 将消息发送到此频道? 问题答案: 好吧,我想我知道你的意思。您无需进行任何事件即可获取频道ID和发送消息。您唯一需要做的就是实例化JDA,调用awaitRead

  • 之前章节定义的SocketIO活动处理函数可以凭借send()函数和emit()函数来连接客户端 接下来的例子是将接收到的消息退回到发送它们的客户端: from flask_socketio import send, emit @socketio.on('message') def handle_message(message): send(message) @socketio.on('

  • 26.3 发送消息 JmsTemplate包含许多方便的方法来发送消息。有些发送方法可以使用 javax.jms.Destination对象指定目的地,也可以使用字符串在 JNDI 中查找目的地。没有目的地参数的发送方法使用默认的目的地。 import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.