当前位置: 首页 > 工具软件 > HornetQ > 使用案例 >

hornetq_Spring 3 HornetQ 2.1集成教程

谷梁涵忍
2023-12-01

hornetq

通过Spring框架使用JBoss的新超高性能消息传递系统。

HornetQ是一个开放源代码项目,用于构建多协议,可嵌入,非常高性能的集群异步消息传递系统。 它是用Java编写的,并且可以在具有Java 5或更高版本运行时的任何平台上运行。 HornetQ一流的高性能日志以非持久消息传递通常看到的速度提供持久消息传递性能。 非持久消息传递性能也非常高。 HornetQ除其他“性感”功能外,还提供服务器复制和自动客户端故障转移功能,以消除服务器故障时丢失或重复的消息,可以配置成群集使用,其中HornetQ服务器的地理位置分散的群集知道如何负载均衡消息并提供全面的信息。管理API,用于管理和监视所有HornetQ服务器。

在本教程中,我们将向您展示如何通过Spring框架利用HornetQ 。 为了使事情变得更加有趣,我们将从上一篇关于Spring GWT Hibernate JPA Infinispan集成的文章的 结尾处继续 。 我们将使用我们的GWTSpringInfinispan项目,并通过消息传递功能对其进行授权! 当然,您可以阅读本文,将基于Spring的项目与HornetQ集成。

我们将使用HornetQ 2.1.0.Final版本,您可以从此处下载。 我们还将需要jboss-logging-spi库。 将使用JBoss Logging SPI 2.1.1.GA版本,您可以在此处JBoss Maven存储库下载该版本

为了在运行时正确集成SpringHornetQ ,我们必须为Web应用程序提供所有必需的库。 因此,复制下面在/ war / WEB-INF / lib下列出的文件(如果使用的是不同版本,请复制相关文件)

HornetQ发行

  • /lib/hornetq-bootstrap.jar
  • /lib/hornetq-core.jar
  • /lib/hornetq-jms.jar
  • /lib/hornetq-logging.jar
  • /lib/jnpserver.jar
  • /lib/netty.jar

JBoss Logging SPI库

  • jboss-logging-spi-2.1.1.GA.jar

最后,为了使HornetQ在运行时正常工作,Web应用程序的类路径中必须有几个配置文件。 如本教程的介绍部分所述,我们可以创建HornetQ服务器群集,以实现负载平衡和高可用性消息传递,也可以在非群集环境中使用HornetQ 。 两种情况都需要不同的配置。 HornetQ发行版包含/ config目录下的所有配置文件。 我们将使用jboss-as-5集群配置,以便能够使用消息传递平台的全部功能。 将以下文件从/ config / jboss-as-5 / clustered目录复制到应用程序/ resources包中:

  • hornetq-configuration.xml –这是主要的HornetQ配置文件
  • hornetq-jms.xml –服务器端JMS服务配置文件

除非您要在JBoss应用服务器中进行部署,否则请编辑hornetq-configuration.xml文件,并将“ $ {jboss.server.data.dir}”替换为“ $ {data.dir:../ data}”。

将以下文件从/ config / stand-alone / clustered目录复制到应用程序/ resources包中:

  • hornetq-users.xml – HornetQ安全管理器的用户凭证文件

在继续实际的集成和客户端实现示例之前,让我们确定一些有关HornetQ服务器体系结构和上述配置文件的有用信息。

HornetQ服务器不会讲JMS ,实际上对JMS一无所知,它是一种协议不可知的消息传递服务器,旨在与多种不同的协议一起使用。 HornetQ客户端(可能在不同的物理计算机上)与HornetQ服务器交互。 HornetQ当前在客户端提供了两种用于消息传递的API:

  • 核心客户端API。 这是一个简单而直观的Java API,可在没有JMS的某些复杂性的情况下提供完整的消息传递功能集
  • JMS客户端API。 客户端提供了标准的JMS API

JMS语义由客户端上的瘦JMS外观层实现。 当用户在客户端上使用JMS API时,所有JMS交互都将转换为HornetQ核心客户端API上的操作,然后再使用HornetQ有线格式通过有线进行传输。 服务器始终只处理核心API交互。

标准的独立消息传递服务器配置包括核心消息传递服务器, JMS服务和JNDI服务。

JMS服务的作用是将任何服务器端hornetq-jms.xml配置文件中的任何JMS Queue,Topic和ConnectionFactory实例部署并绑定到JNDI 。 它还提供了用于创建和销毁队列,主题和ConnectionFactory实例的简单管理API,可以通过JMX或连接对其进行访问。 由于核心服务器与JMS无关,因此它是HornetQ核心服务器的一项单独服务。 如果您不想通过服务器端XML配置部署任何JMS Queue,Topic或ConnectionFactory实例,并且不需要在服务器端使用JMS管理API,则可以禁用此服务。

还包括一个JNDI服务器,因为在使用JMS查找队列,主题和ConnectionFactory实例时, JNDI是常见的要求。 如果您不需要JNDI,则也可以禁用此服务。 HornetQ允许您直接在客户端上以编程方式创建JMS和核心对象,而不是从JNDI查找它们,因此JNDI服务器并不总是必需的。

HornetQ附带了一个基本的安全管理器实现,该实现可获取用户凭证
从hornetq-users.xml文件中。 该文件包含用户,密码和角色信息。

我们将使用HornetQ JMS服务,并在与命名服务器相同的JVM中执行JMS客户端代码,因此我们必须创建一个“ jndi.properties”文件,并将其与上述其余HornetQ配置文件一起放在我们的application / resources包下。 “ jndi.properties”文件的内容应如下所示:

java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory

在继续之前,我们必须注意Eclipse项目的依赖性。 以下jars应该包含在项目的Java构建路径中:

  • jms.jar

现在让我们将SpringHornetQ集成。 找到您的applicationContext.xml文件/ war / WEB-INF文件夹,并添加以下bean:

<bean name="namingServerImpl" class="org.jnp.server.NamingBeanImpl" init-method="start" destroy-method="stop" />

<bean name="namingServer" class="org.jnp.server.Main" init-method="start" destroy-method="stop">
 <property name="namingInfo" ref="namingServerImpl" />
 <property name="port" value="1099" />
 <property name="bindAddress" value="localhost" />
 <property name="rmiPort" value="1098" />
 <property name="rmiBindAddress" value="localhost" />
</bean>

<bean name="mbeanServer" class="java.lang.management.ManagementFactory" factory-method="getPlatformMBeanServer" />

<bean name="fileConfiguration" class="org.hornetq.core.config.impl.FileConfiguration" init-method="start" destroy-method="stop" />

<bean name="hornetQSecurityManagerImpl" class="org.hornetq.spi.core.security.HornetQSecurityManagerImpl" />

<!-- The core server -->
<bean name="hornetQServerImpl" class="org.hornetq.core.server.impl.HornetQServerImpl">
 <constructor-arg ref="fileConfiguration" />
 <constructor-arg ref="mbeanServer" />
 <constructor-arg ref="hornetQSecurityManagerImpl" />
</bean>

<!-- The JMS server -->
<bean name="jmsServerManagerImpl" class="org.hornetq.jms.server.impl.JMSServerManagerImpl" init-method="start" destroy-method="stop" depends-on="namingServer">
 <constructor-arg ref="hornetQServerImpl" />
</bean>

如果打算在独立环境中配置SpringHornetQ ,则上述配置就足够了。 在我们的例子中,如果要在Apache – Tomcat上部署Web应用程序,则应进行一些小的修改。

Apache – Tomcat为所有已部署的Web应用程序提供JNDI服务,以配置环境属性和资源。 此外,由于环境和资源管理是使用部署描述符文件(例如web.xml和context.xml)完成的,因此在运行时可用的命名上下文是只读的。 另外,在启动时, Apache – Tomcat使用系统属性初始化其JNDI环境。 结果,使用JNDI InitialContext类(不提供构造函数环境参数)来执行命名操作的“在VM中”客户端始终检索Apache – Tomcat JNDI实现Context接口。

为了使HornetQ JNDI服务器与Apache-Tomcat命名服务和HornetQ JMS服务共存,以便将队列,主题和ConnectionFactory实例绑定到JNDI ,我们必须执行以下操作:

  • 对我们的Web应用程序禁用Apache – Tomcat命名服务
  • HornetQ JNDI服务器配置为不使用现有的JNDI服务(如果可用),但始终创建​​一个新的服务

要为我们的Web应用程序禁用Apache – Tomcat命名服务,我们必须执行以下操作:

  • 在我们项目的/ war文件夹下创建一个META-INF文件夹
  • 创建一个包含以下context指令的context.xml文件:
<Context override="true" useNaming="false" />

要将HornetQ JNDI服务器配置为不使用现有的JNDI服务(如果可用),我们必须在Spring bean的“ namingServerImpl”中添加以下属性:

<property name="useGlobalService" value="false" />

为了通过Spring使用HornetQ消息服务,我们可以创建一个连接工厂,也可以从JNDI查找一个。 下面提供了连接工厂和“ JmsTemplate”示例:

<bean name="connectionFactory" class="org.hornetq.jms.client.HornetQConnectionFactory" >
 <constructor-arg>
  <bean class="org.hornetq.api.core.TransportConfiguration">
   <constructor-arg value="org.hornetq.integration.transports.netty.NettyConnectorFactory" />
   <constructor-arg>
    <map key-type="java.lang.String" value-type="java.lang.Object">
     <entry key="port" value="5445"></entry>
    </map>
   </constructor-arg>
  </bean>
 </constructor-arg>
</bean>

<bean name="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
 <property name="connectionFactory" ref="connectionFactory"></property>
</bean>

连接工厂示例的JNDI查找如下所示:

<bean id="inVMConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean" depends-on="jmsServerManagerImpl">
 <property name="jndiName">
  <value>java:/ConnectionFactory</value>
 </property>
</bean>

我们将使用JNDI查找方法来获取连接工厂,因此将上述配置添加到applicationContext.xml文件中。

这就是我们要做的所有配置,让我们继续使用我们新集成的消息传递服务来实现假设的业务案例。 我们的Web应用程序提供了添加,更新和检索“员工”数据的功能。 假设我们希望在每次添加或更改“员工”数据时都收到通知。 为了简单起见,该通知将是Apache – Tomcat控制台上的日志。 我们将实现一个JMS生产者,以便在用户每次对“员工”数据进行更新时将消息发送到“通知”队列。 另外,必须实现JMS使用者,以便处理“通知”队列消息并登录到控制台。

要创建“通知”队列并将其绑定到名称为“ / queue / Notifications”的JNDI ,请将以下内容添加到hornetq-jms.xml文件中:

<queue name="Notifications">
 <entry name="/queue/Notifications"/>
</queue>

为了能够通过Spring Bean使用新创建的“通知”队列,请将以下JNDI查找指令添加到applicationContext.xml文件:

<bean id="notificationsQueue" class="org.springframework.jndi.JndiObjectFactoryBean" depends-on="jmsServerManagerImpl">
 <property name="jndiName">
  <value>/queue/Notifications</value>
 </property>
</bean>

由于JMS生产者和使用者都是服务器端组件,因此必须将它们放在我们应用程序的/ server子包下。 我们选择在/ server / utils子包下创建它们,因为它们本质上是实用程序类。 下面提供了示例JMS生产者和消费者类:

package com.javacodegeeks.gwtspring.server.utils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("notificationsProducer")
public class NotificationsProducer {

 @Autowired
 Queue notificationsQueue;

 @Autowired
 ConnectionFactory inVMConnectionFactory;

 private Connection notificationsQueueConnection;
 private Session notificationsQueueSession;
 private MessageProducer notificationsQueueProducer;


 @PostConstruct
 public void init() throws Exception {
  notificationsQueueConnection = inVMConnectionFactory.createConnection();
  notificationsQueueSession = notificationsQueueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  notificationsQueueProducer = notificationsQueueSession.createProducer(notificationsQueue);
  notificationsQueueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 }

 @PreDestroy
 public void destroy() throws Exception {
  if(notificationsQueueConnection != null)
   notificationsQueueConnection.close();
 }

 public void sendNotification(final String message) throws Exception {

  TextMessage textMessage = notificationsQueueSession.createTextMessage(message);
  notificationsQueueProducer.send(textMessage);

 }

}

还有消费者

package com.javacodegeeks.gwtspring.server.utils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("notificationsConsumer")
public class NotificationsConsumer implements MessageListener {

 @Autowired
 Queue notificationsQueue;

 @Autowired
 ConnectionFactory inVMConnectionFactory;

 private Connection notificationsQueueConnection;

 @PostConstruct
 public void init() throws Exception {
  notificationsQueueConnection = inVMConnectionFactory.createConnection();
  Session notificationsQueueSession = notificationsQueueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  MessageConsumer notificationsQueueConsumer = notificationsQueueSession.createConsumer(notificationsQueue);
  notificationsQueueConsumer.setMessageListener(this);
  notificationsQueueConnection.start();
 }

 @PreDestroy
 public void destroy() throws Exception {
  if(notificationsQueueConnection != null)
   notificationsQueueConnection.close();
 }

 @Override
 public void onMessage(Message message) {
  if (message instanceof TextMessage) {
   try {
    String text = ((TextMessage) message).getText();
    System.out.println("The Notification Message is : \n" + text);
   } catch (JMSException ex) {
     throw new RuntimeException(ex);
   }
  } else {
    throw new IllegalArgumentException("Message must be of type TextMessage");
  }
 }

}

要结束我们的示例业务案例,我们必须修改Spring员工“ employeeService”,以便在用户每次请求保存或更新“ employee”数据时使用“ notificationsProducer”实用程序bean发送通知消息。 我们使用“ @Autowire”注释在“ employeeService”内部连接“ notificationProducer”,并从“ notificationProducer”调用“ sendNotification”操作,以便每次请求“ employeeService”的saveOrUpdateEmployee“操作时发送通知。 完整的代码如下所示:

package com.javacodegeeks.gwtspring.server.services;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import com.javacodegeeks.gwtspring.server.dao.EmployeeDAO;
import com.javacodegeeks.gwtspring.server.utils.NotificationsProducer;
import com.javacodegeeks.gwtspring.shared.dto.EmployeeDTO;
import com.javacodegeeks.gwtspring.shared.services.EmployeeService;

@Service("employeeService")
public class EmployeeServiceImpl implements EmployeeService {

 @Autowired
 private EmployeeDAO employeeDAO;

 @Autowired
 NotificationsProducer notificationsProducer;

 @PostConstruct
 public void init() throws Exception {
 }

 @PreDestroy
 public void destroy() {
 }

 @Transactional(propagation=Propagation.SUPPORTS, rollbackFor=Exception.class)
 public EmployeeDTO findEmployee(long employeeId) {

  return employeeDAO.findById(employeeId);

 }

 @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
 public void saveEmployee(long employeeId, String name, String surname, String jobDescription) throws Exception {

  EmployeeDTO employeeDTO = employeeDAO.findById(employeeId);

  if(employeeDTO == null) {
   employeeDTO = new EmployeeDTO(employeeId, name,surname, jobDescription);
   employeeDAO.persist(employeeDTO);
  }

 }

 @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
 public void updateEmployee(long employeeId, String name, String surname, String jobDescription) throws Exception {

  EmployeeDTO employeeDTO = employeeDAO.findById(employeeId);

  if(employeeDTO != null) {
   employeeDTO.setEmployeeName(name);
   employeeDTO.setEmployeeSurname(surname);
   employeeDTO.setJob(jobDescription);
  }

 }

 @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
 public void deleteEmployee(long employeeId) throws Exception {

  EmployeeDTO employeeDTO = employeeDAO.findById(employeeId);

  if(employeeDTO != null)
   employeeDAO.remove(employeeDTO);

 }

 @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
 public void saveOrUpdateEmployee(long employeeId, String name, String surname, String jobDescription) throws Exception {

  EmployeeDTO employeeDTO = new EmployeeDTO(employeeId, name,surname, jobDescription);

  employeeDAO.merge(employeeDTO);

  notificationsProducer.sendNotification("Save Or Update Employee with values : \nID : " + employeeId + "\nName : " + name + "\nSurname : " + surname + "\nJob description : " + jobDescription);

 }

}

而已! 要部署Web应用程序,只需将/ war文件夹复制到Apache – Tomcat “ webapps”文件夹中。 您可以将war文件夹的名称更改为任意名称,最好在项目名称后重命名,例如GWTSpringInfinispanHornetQ

在午餐之前,应用程序不要忘记创建数据库模式,这里是“ javacodegeeks”。

午餐应用程序将您的浏览器指向以下地址

http:// localhost:8080 / GWTSpringInfinispanHornetQ /

如果一切顺利,您应该会看到您的主页。 应该显示两个文本框,每个文本框后面都有一个按钮。 在第一个文本框中,您可以将员工保存或更新到数据库。 作为输入,提供ID,名称,姓氏和职位描述,并用空格字符分隔。 单击“ SaveOrUpdate”按钮,将提供的信息存储到数据库中。 对于现有的“员工”条目(相同ID),将执行更新。 在这两种情况下,都应记录一个通知日志。 日志格式应如下:

通知消息为:
使用值保存或更新员工:
编号:xxx 名称:xxx 姓:xxx 职位描述:xxx

其中“ xxx”应为您提供的“员工”信息。 请查看日志文件(catalina.out)。 第二个文本框用于检索现有的“雇员”条目。 提供一个“雇员” ID,然后单击“检索”按钮。 如果“雇员”存在,则应看到“雇员” ID,姓名,姓氏和职位描述。

您可以从此处下载该项目(如开头所述,并且不包含先前的文章,所需的第三方库)

玩得开心!

贾斯汀

相关文章 :

翻译自: https://www.javacodegeeks.com/2010/06/spring-3-hornetq-21-integration.html

hornetq

 类似资料: