当前位置: 首页 > 知识库问答 >
问题:

如何在Java提交时在Oracle AQ表上排队,并使用JMS客户机使用

陈志
2023-03-14

我正在为企业级产品编写一个Java组件,并希望利用Oracle 11g数据库的一个特定功能,即活动队列。我想要完成的确切场景是-1。向提交2上的Oracle活动队列/队列表写入一条消息。用JMS消费者从队列中读取该消息

我按照演示和教程在http://docs.oracle.com/cd/B28359_01/java.111/b31224/streamsaq.htm

特别是,我想关注代码的排队部分-

    // Create the actual AQMessage instance:
    AQMessage mesg = AQFactory.createAQMessage(msgprop);
    // and add a payload:
    byte[] rawPayload = new byte[500];
    for (int i = 0; i < rawPayload.length; i++) {
        rawPayload[i] = 'b';
    }

    mesg.setPayload(new RAW(rawPayload));

    AQEnqueueOptions opt = new AQEnqueueOptions();
    opt.setRetrieveMessageId(true);
    opt.setDeliveryMode(AQEnqueueOptions.DeliveryMode.PERSISTENT);
    opt.setVisibility(AQEnqueueOptions.VisibilityOption.ON_COMMIT);

    // execute the actual enqueue operation:
    conn.enqueue(queueName, opt, mesg);

这对我来说很好,因为我们希望确保只有在提交交易时,消费者才能看到消息。

问题在于,在演示中,我们创建了负载类型为RAW的队列

doUpdateDatabase(conn,
           "BEGIN "+
           "DBMS_AQADM.CREATE_QUEUE_TABLE( "+
           "   QUEUE_TABLE        =>  '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE',  "+
           "   QUEUE_PAYLOAD_TYPE =>  'RAW', "+
           "   COMPATIBLE         =>  '10.0'); "+
           "END; ");
doUpdateDatabase(conn,
           "BEGIN "+
           "DBMS_AQADM.CREATE_QUEUE( "+
           "    QUEUE_NAME     =>   '"+USERNAME+".RAW_SINGLE_QUEUE', "+
           "    QUEUE_TABLE    =>   '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE'); "+
           "END;  ");
doUpdateDatabase(conn,
           "BEGIN "+
           "  DBMS_AQADM.START_QUEUE('"+USERNAME+".RAW_SINGLE_QUEUE'); "+
           "END; ");

通过使用在RAW中创建的队列,我可以将消息排队到队列中,但是JMS消费者无法订阅队列抛出一个(空指针)异常,在该异常中,消费者期望为预期类型提供一个参数。简而言之,这段代码在init上抛出一个空指针异常。

Properties env = new Properties();
env.load(new FileInputStream(new File("jndi.properties")));
Context ctx = new InitialContext(env);
ConnectionFactory connFactory = (ConnectionFactory)ctx.lookup(connectionFactoryName);
Connection connection = connFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
AQjmsSession queueSession = (AQjmsSession) session;
Queue queue = (Queue) ctx.lookup(queueName);
MessageConsumer receiver = queueSession.createReceiver(queue);

JNDI。属性

java.naming.factory.initial = oracle.jms.AQjmsInitialContextFactory
java.naming.security.principal = username
java.naming.security.credentials = password
db_url = jdbc:oracle:thin:@host:port:dbname

我在尝试在Camel中设置消费者时也遇到了类似的例外。

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
        <!-- this camel route will read incoming messages from Oracle -->
        <route>
            <from uri="oracleQueue:queue:RAW_SINGLE_QUEUE" />
            <to uri="WebSphereMQ:queue:myWebSphereQueue" />
        </route>
    </camelContext>

    <bean id="connectionFactoryOracleAQQueue" class="oracle.jms.AQjmsFactory" factory-method="getQueueConnectionFactory">
        <constructor-arg index="0">
            <value>oracle db URL</value>
        </constructor-arg>
        <constructor-arg index="1" type="java.util.Properties">
            <value></value>
        </constructor-arg>
    </bean>

    <bean id="oracleQueueCredentials" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
        <property name="targetConnectionFactory">
            <ref bean="connectionFactoryOracleAQQueue" />
        </property>
        <property name="username">
            <value>username</value>
        </property>
        <property name="password">
            <value>password</value>
        </property>
    </bean>

    <bean id="oracleQueue" class="org.apache.camel.component.jms.JmsComponent">
        <property name="connectionFactory" ref="oracleQueueCredentials" />
    </bean>

经过一些研究,我认为队列有效负载类型可能是个问题。因此,我改变了队列表创建脚本,并使用JMS消息作为有效负载类型

 doUpdateDatabase(conn, "BEGIN " + "DBMS_AQADM.CREATE_QUEUE_TABLE( "
 + "   QUEUE_TABLE        =>  'RAW_SINGLE_QUEUE_TABLE',  "
 + "   QUEUE_PAYLOAD_TYPE =>  'SYS.AQ$_JMS_MESSAGE', " +
 "   COMPATIBLE         =>  '10.0'); " + "END; ");

在这种情况下,JMS使用者能够连接,但是队列代码现在失败-ORA-25215:user_data类型和队列类型不匹配

问题是,我如何将来自Java生产者的消息排队(仅在提交时可见),并能够使用camel或通用JMS消费者?

约束条件(过滤掉网络上已有的一些答案)——不能使用PL/SQL、spring事务、JTA。我见过一些例子,比如如何使用Java将JMS消息排队到Oracle AQ中,其中队列表是用SYS创建的。AQ$_JMS_消息类型,但示例生产者是JMS消息生产者,而不是oracle指南中的生产者。我并没有尝试将JMS消息(AQJmsMessage)排队,而是使用Oracle指南中解释的AQMessage类型,并使用提交时可见选项。

我的感觉是,如果问题仅仅基于有效负载类型的不匹配,那么必须在消费者端进行一些配置,以指定有效负载类型,或者在生产者端以JMS消费者能够理解的方式编写消息。有没有办法做到这一点?

共有1个答案

秦俊
2023-03-14

我能够做到这一点——我必须猜测Oracle API的许多部分,并从各种博客收集提示。对任何对这里感兴趣的人来说,这就是我的工作方式——1。我在Oracle Db 2上创建了一个Oracle对象。使用这个Oracle对象,我创建了对象类型的队列表,作为负载3。现在,我可以将包含对象数据4的AQMessage类型与STRUCT payload一起排队。我能够与一个了解ADT有效负载类型的JMS消费者一起排队(多亏了http://blog.javaforge.net/post/30858904340/oracle-advanced-queuing-spring-custom-types)

下面是代码的步骤——创建Oracle对象。对象可以有任何主要的数据类型字段,比如VARCHAR、TIMESTAMP等,也可以有BLOB、CLOB等。在本例中,我将其中一列作为BLOB提供,以使事情更加复杂。

create or replace type aq_event_obj as object
(
  id       varchar2(100),
  payload  BLOB
);
commit;

现在创建队列表。表的有效负载类型是oracle对象。

private void setup(Connection conn) throws SQLException {
    doUpdateDatabase(conn, "BEGIN " + "DBMS_AQADM.CREATE_QUEUE_TABLE( "
            + "   QUEUE_TABLE        =>  'OBJ_SINGLE_QUEUE_TABLE',  " + "   QUEUE_PAYLOAD_TYPE =>  'AQ_EVENT_OBJ', "
            + "   COMPATIBLE         =>  '10.0'); " + "END; ");
    doUpdateDatabase(conn, "BEGIN " + "DBMS_AQADM.CREATE_QUEUE( " + "    QUEUE_NAME     =>   'OBJ_SINGLE_QUEUE', "
            + "    QUEUE_TABLE    =>   'OBJ_SINGLE_QUEUE_TABLE'); " + "END;  ");
    doUpdateDatabase(conn, "BEGIN " + "  DBMS_AQADM.START_QUEUE('OBJ_SINGLE_QUEUE'); " + "END; ");
} 

现在,您可以使用对象的struct实例将Java中的AQMessage类型排队

public void enqueueMessage(OracleConnection conn, String correlationId, byte[] payloadData) throws Exception {
    // First create the message properties:
    AQMessageProperties aqMessageProperties = AQFactory.createAQMessageProperties();
    aqMessageProperties.setCorrelation(correlationId);
    aqMessageProperties.setExceptionQueue(EXCEPTION_QUEUE_NAME);

    // Specify an agent as the sender:
    AQAgent aqAgent = AQFactory.createAQAgent();
    aqAgent.setName(SENDER_NAME);
    aqAgent.setAddress(QUEUE_NAME);
    aqMessageProperties.setSender(aqAgent);

    // Create the payload
    StructDescriptor structDescriptor = StructDescriptor.createDescriptor(EVENT_OBJECT, conn);
    Map<String, Object> payloadMap = new HashMap<String, Object>();
    payloadMap.put("ID", correlationId);
    payloadMap.put("PAYLOAD", new OracleAQBLOBUtil().createBlob(conn, payloadData));
    STRUCT struct = new STRUCT(structDescriptor, conn, payloadMap);

    // Create the actual AQMessage instance:
    AQMessage aqMessage = AQFactory.createAQMessage(aqMessageProperties);
    aqMessage.setPayload(struct);

    AQEnqueueOptions opt = new AQEnqueueOptions();
    opt.setDeliveryMode(AQEnqueueOptions.DeliveryMode.PERSISTENT);
    opt.setVisibility(AQEnqueueOptions.VisibilityOption.ON_COMMIT);

    // execute the actual enqueue operation:
    conn.enqueue(QUEUE_NAME, opt, aqMessage);
}

Blob字段需要特殊处理

public class OracleAQBLOBUtil {

    public BLOB createBlob(OracleConnection conn, byte[] payload) throws Exception {
        BLOB blob = BLOB.createTemporary(conn, false, BLOB.DURATION_SESSION);
        OutputStream outputStream = blob.setBinaryStream(1L);
        InputStream inputStream = new ByteArrayInputStream(payload);
        try {
            byte[] buffer = new byte[blob.getBufferSize()];
            int bytesRead = 0;
            while ((bytesRead = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, bytesRead);
            }
            return blob;
        }
        finally {
            outputStream.close();
            inputStream.close();
        }
    }

    public byte[] saveOutputStream(BLOB blob) throws Exception {
        InputStream inputStream = blob.getBinaryStream();
        int counter;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        while ((counter = inputStream.read()) > -1) {
            byteArrayOutputStream.write(counter);
        }
        byteArrayOutputStream.close();
        return byteArrayOutputStream.toByteArray();
    }

}

对于使用者,您需要提供一个ORADataFactory实例,让使用者了解有效负载类型(您的自定义对象)。

AQjmsSession queueSession = (AQjmsSession) session;
Queue queue = (Queue) ctx.lookup(queueName);
MessageConsumer receiver = queueSession.createReceiver(queue, new OracleAQObjORADataFactory());

OracleQobjoraDataFactory的代码在哪里

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;

import oracle.jdbc.OracleTypes;
import oracle.jpub.runtime.MutableStruct;
import oracle.sql.BLOB;
import oracle.sql.Datum;
import oracle.sql.ORAData;
import oracle.sql.ORADataFactory;
import oracle.sql.STRUCT;

public class OracleAQObjORADataFactory  implements ORAData, ORADataFactory {

    public static final String EVENT_OBJECT = "SYSTEM.AQ_EVENT_OBJ";
    public static final int _SQL_TYPECODE = OracleTypes.STRUCT;

    protected MutableStruct _struct;

    protected static int[] _sqlType = { java.sql.Types.VARCHAR, java.sql.Types.VARBINARY };
    protected static ORADataFactory[] _factory = new ORADataFactory[2];
    protected static final OracleAQObjORADataFactory  _AqEventObjFactory = new OracleAQObjORADataFactory ();

    public static ORADataFactory getORADataFactory() {
        return _AqEventObjFactory;
    }

    /* constructors */
    protected void _init_struct(boolean init) {
        if (init)
            _struct = new MutableStruct(new Object[2], _sqlType, _factory);
    }

    public OracleAQObjORADataFactory () {
        _init_struct(true);
    }

    public OracleAQObjORADataFactory (String id, byte[] payload) throws SQLException {
        _init_struct(true);
        setId(id);
        setPayload(payload);
    }

    /* ORAData interface */
    public Datum toDatum(Connection c) throws SQLException {
        return _struct.toDatum(c, EVENT_OBJECT);
    }

    /* ORADataFactory interface */
    public ORAData create(Datum d, int sqlType) throws SQLException {
        return create(null, d, sqlType);
    }

    protected ORAData create(OracleAQObjORADataFactory  o, Datum d, int sqlType) throws SQLException {
        if (d == null)
            return null;
        if (o == null)
            o = new OracleAQObjORADataFactory ();
        o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);
        return o;
    }

    public String getId() throws SQLException {
        return (String) _struct.getAttribute(0);
    }

    public void setId(String id) throws SQLException {
        _struct.setAttribute(0, id);
    }

    public byte[] getPayload() throws SQLException {
        BLOB blob = (BLOB) _struct.getAttribute(1);
        InputStream inputStream = blob.getBinaryStream();
        return getBytes(inputStream);
    }

    public byte[] getBytes(InputStream body) {
        int c;
        try {
            ByteArrayOutputStream f = new ByteArrayOutputStream();
            while ((c = body.read()) > -1) {
                f.write(c);
            }
            f.close();
            byte[] result = f.toByteArray();
            return result;
        }
        catch (Exception e) {
            System.err.println("Exception: " + e.getMessage());
            e.printStackTrace();
            return null;
        }
    }

    public void setPayload(byte[] payload) throws SQLException {
        _struct.setAttribute(1, payload);
    }

}

您可能在您的项目中使用了Camel或Spring,在这种情况下-1。如果您使用Camel 2.10.2或更高版本,您可以使用自定义消息列表容器(CAMEL-5676)创建一个JMS消费者2。如果您使用的是以前的版本,那么您可能无法使用endpoint方式(我不知道),但是您可以使用JMS请求监听器

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                        http://www.springframework.org/schema/jms
                        http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <!-- this is just an example, you can also use a datasource as the ctor arg -->
    <bean id="connectionFactoryOracleAQQueue" class="oracle.jms.AQjmsFactory" factory-method="getQueueConnectionFactory">
        <constructor-arg index="0">
            <value>jdbc:oracle:thin:@blrub442:1522:UB23</value>
        </constructor-arg>
        <constructor-arg index="1" type="java.util.Properties">
            <value></value>
        </constructor-arg>
    </bean>

    <bean id="oracleQueueCredentials" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
        <property name="targetConnectionFactory">
            <ref bean="connectionFactoryOracleAQQueue" />
        </property>
        <property name="username">
            <value>system</value>
        </property>
        <property name="password">
            <value>oracle</value>
        </property>
    </bean>

    <!-- Definitions for JMS Listener classes that we have created -->
    <bean id="aqMessageListener" class="com.misys.test.JmsRequestListener" />

    <bean id="aqEventQueue" class="com.misys.test.OracleAqQueueFactoryBean">
        <property name="connectionFactory" ref="oracleQueueCredentials" />
        <property name="oracleQueueName" value="BOZ_SINGLE_QUEUE" />
    </bean>

    <!-- The Spring DefaultMessageListenerContainer configuration. This bean is automatically loaded when the JMS application context is started -->
    <bean id="jmsContainer" class="com.misys.test.AQMessageListenerContainer" scope="singleton">
        <property name="connectionFactory" ref="oracleQueueCredentials" />
        <property name="destination" ref="aqEventQueue" />
        <property name="messageListener" ref="aqMessageListener" />
        <property name="sessionTransacted" value="false" />
    </bean>

</beans>

自定义消息侦听器容器

public class AQMessageListenerContainer extends DefaultMessageListenerContainer {

    @Override
    protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
        return ((AQjmsSession) session).createConsumer(destination, getMessageSelector(),
                OracleAQObjORADataFactory.getORADataFactory(), null, isPubSubNoLocal());
    }
}

请求侦听器onMessage方法

public void onMessage(Message msg) {
    try {
        AQjmsAdtMessage aQjmsAdtMessage = (AQjmsAdtMessage) msg;
        OracleAQObjORADataFactory obj = (OracleAQObjORADataFactory) aQjmsAdtMessage.getAdtPayload();

        System.out.println("Datetime: " + obj.getId());
        System.out.println("Payload: " + new String(obj.getPayload(), Charset.forName("UTF-8")));
    }
    catch (Exception jmsException) {
        if (logger.isErrorEnabled()) {
            logger.error(jmsException.getLocalizedMessage());
        }
    }
}
 类似资料:
  • 我正在与JTA、两阶段提交、JMS和JDBC事务作斗争。这个想法(简而言之)是 在队列中接收消息 所以我得到了,创建,从会话创建接收器并设置消息侦听器。 在侦听器内部,在方法中,我开始我的用户事务,执行jdbc内容并提交事务或在出现问题时进行回滚。现在我期望(又名“希望”)当用户事务提交时,消息会得到确认。 但这并没有发生,消息仍然在队列中,并且一次又一次地被重新传递。 我错过了什么?我仔细检查了

  • 我目前正在尝试使用谷歌表单和应用程序脚本制作一个文书向导。在表单的末尾,我尝试在表单提交并显示HTML窗口后运行应用程序脚本。我试着使用以及应用程序事件触发器,但我一直收到错误

  • 在JMS 1.x中,客户端ID用于在创建持久订阅时唯一标识客户端。这个答案解释了JMS 1.x中clientId的用法 对于JMS 2. x,clientId是可选的。我想了解在JMS 2. x中提供clientId的利弊。 摘自甲骨文中一篇关于 JMS 2.x 功能的文章: 共享持久订阅。这些仅在JMS 2.0中可用,并使用创建。它们可以有任意数量的消费者。设置客户端标识符是可选的。订阅由订阅名

  • 问题内容: 我正在为大型数据集创建交互式可视化。由于数据集的大小,无法将其加载到浏览器中。我们正在节点服务器上使用交叉过滤器来加载和过滤服务器端数据。我想知道是否可以通过某种方式将服务器端交叉过滤器过滤器与dc.js图表​​耦合。 我现在正在使用d3.js创建图表,但是想要使用dc.js中已经存在的各种图表。我们基本上是在服务器端过滤数据,然后在客户端上侦听图表变化,然后在服务器上执行适当的过滤并

  • 我是JMS新手,经过长时间的搜索,我搜索出了一个连接到JMS的代码,并发布了一条消息。 问题是我需要在远程队列中发布消息,但我不知道如何建立连接到它并发布消息。 服务器类型:TIBCO EMS 服务器主机:******。net 端口:**USername:user passsbrow:user123 队列:**。。。。顺序经营1. 我想建立连接,发布一条简单的消息,然后把它取回。请帮忙!提前谢谢