<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/largemessages</large-messages-directory>
<paging-directory>./data/paging</paging-directory>
<!-- Connectors -->
<connectors>
<connector name="netty-connector">tcp://10.64.60.100:61617</connector><!-- direct ip addres of host myhost1 -->
<connector name="broker2-connector">tcp://myhost2:61616</connector> <!-- ip 10.64.60.101 <- mocked up ip for security reasons -->
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="amqp">tcp://0.0.0.0:61617?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true</acceptor>
</acceptors>
<ha-policy>
<replication>
<master/>
</replication>
</ha-policy>
<cluster-connections>
<cluster-connection name="myhost1-cluster">
<connector-ref>netty-connector</connector-ref>
<retry-interval>500</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<message-load-balancing>ON_DEMAND</message-load-balancing>
<max-hops>1</max-hops>
<static-connectors>
<connector-ref>broker2-connector</connector-ref> <!-- defined in the connectors -->
</static-connectors>
</cluster-connection>
</cluster-connections>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-delete-queues>false</auto-delete-queues>
<auto-delete-created-queues>false</auto-delete-created-queues>
<auto-delete-addresses>false</auto-delete-addresses>
</address-setting>
</address-settings>
</core>
</configuration>
使用Java和producer模板将消息推送到直接endpoint,然后路由到队列的客户端生成消息:
说java-camel-producer-client
package com.demo.artemis;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.spring.SpringCamelContext;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ToRoute {
public static void main(String[] args) throws Exception {
ApplicationContext appContext = new ClassPathXmlApplicationContext(
"camel-context-producer.xml");
ProducerTemplate template = null;
CamelContext camelContext = SpringCamelContext.springCamelContext(
appContext, false);
try {
camelContext.start();
template = camelContext.createProducerTemplate();
String msg = null;
int loop =0;
int i = 0;
while(true) {
if (i%10000==0) {
i=1;
loop=loop+1;
}
if(loop==2) break;
msg ="---> "+(++i);
template.sendBody("direct:toDWQueue", msg);
}
} finally {
if (template != null) {
template.stop();
}
camelContext.stop();
}
}
}
向队列发送消息的Camel上下文:camel-producer-client
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd
http://www.springframework.org/schema/jee
http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="jmsConnectionFactory" class="org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory">
<constructor-arg index="0" value="(tcp://myhost:61616,tcp://myhost1:61617)?ha=true;reconnectAttempts=-1;"/>
</bean>
<bean id="jmsPooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop">
<property name="maxConnections" value="10" />
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="jmsPooledConnectionFactory" />
<property name="concurrentConsumers" value="5" />
</bean>
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
<property name="configuration" ref="jmsConfig" />
<property name="streamMessageTypeEnabled" value="true"/>
</bean>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<endpoint id="myqueue" uri="jms:queue:myExampleQueue" />
<route>
<from uri="direct:toMyExample"/>
<transform>
<simple>MSG FRM DIRECT TO MyExampleQueue : ${bodyAs(String)}</simple>
</transform>
<to uri="ref:myqueue"/>
</route>
</camelContext>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd
http://www.springframework.org/schema/jee
http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<prop key="java.naming.factory.initial">org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory</prop>
<prop key="connectionFactory.ConnectionFactory">(tcp://myhost1:61617,tcp://myhost2:61616)?ha=true;retryInterval=1000;retryIntervalMultiplier=1.0;reconnectAttempts=-1;</prop>
<prop key="queue.queue/MyExampleQueue">MyExampleQueue</prop>
<prop key="queue.queue/OutBoundQueue">OutBoundQueue</prop>
</props>
</property>
</bean>
<bean id="jndiFactoryBean" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="ConnectionFactory"/>
<property name="jndiTemplate" ref="jndiTemplate"/>
<property name="cache" value="true"/>
</bean>
<bean id="jndiDestinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
<property name="jndiTemplate" ref="jndiTemplate"/>
<property name="cache" value="true"/>
<!-- dynamic destination if the destination name is not found in JNDI -->
<property name="fallbackToDynamicDestination" value="true"/>
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="jndiFactoryBean"/>
<property name="destinationResolver" ref="jndiDestinationResolver"/>
<property name="concurrentConsumers" value="10" />
</bean>
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
<property name="configuration" ref="jmsConfig" />
</bean>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<endpoint id="inqueue" uri="jms:queue:MyExampleQueue" />
<endpoint id="outqueue" uri="jms:queue:OutBoundQueue" />
<route>
<from uri="ref:inqueue" />
<convertBodyTo type="java.lang.String" />
<transform>
<simple>MSG FRM MyExampleQueue TO OutboundQueue : ${bodyAs(String)}</simple>
</transform>
<to uri="ref:outqueue" />
</route>
</camelContext>
</beans>
在使用Camel上下文时,我看到当主服务器不可用时,客户机使用者被重定向到从服务器(我使用JNDI和JMSPoolConnectionFactory)。
java-producer-client
使用producer模板将消息发送到经过转换并路由到队列的直接endpoint。当主服务器关闭时,客户端无法进行故障转移并连接到从服务器。(问题:当代理处于HA模式时,这是预期的行为吗)。
在重新连接尝试后,我看到下面的异常,当主停机时。
Caused by: javax.jms.JMSException: AMQ219016: Connection failure detected. Unblocking a blocking call that will never get a response
另一方面,使用Main启动的camel-context-consumer
能够自动故障转移到slave。在控制台中,我确实注意到从主机和处理数据中有消费者计数10。但是当主节点备份时,客户端没有切换到活动主节点。问题:这也是意料之中的吗?
为了创建connectionfactory,使用以下约定。
(tcp://myhost:61616,tcp://myhost1:61617)?ha=true;reconnectAttempts=-1;
问题:即使使用静态连接器,我们是否需要配置broacast和discovery组?
如果在客户端处于阻塞调用中间(例如发送持久消息并等待代理的ack、提交事务等)时发生故障转移,则会出现“解除阻塞永远不会得到响应的阻塞调用”的错误。这将在文档中进一步讨论。
根据您的配置,客户机在主代理返回时不会切换回主代理的事实也是意料之中的。简而言之,您没有正确配置故障备份。你的主人应该:
<ha-policy>
<replication>
<master>
<check-for-live-server>true</check-for-live-server>
</master>
</replication>
</ha-policy>
你的奴隶应该:
<ha-policy>
<replication>
<slave>
<allow-failback>true</allow-failback>
</slave>
</replication>
</ha-policy>
我正在尝试使用Apache Camel和Qpid JMS客户端连接到在两个不同节点(VM)中运行的ActiveMQ Artemis主动-主动集群。我正在使用ActiveMQ Artemis 2.17.0。 我正在试图找出我的组织的远程URI配置应该是什么。阿帕奇。qpid。jms。JmsConnectionFactory实例。使用<代码>ampq://host1:5672,ampq://host2
我想在由安全kafka集群的kafka主题支持的Flink SQL表上执行一个查询。我能够以编程方式执行查询,但无法通过Flink SQL客户端执行。我不知道如何通过Flink SQL客户端传递JAAS配置()和其他系统属性。 FlinkSQL以编程方式查询 这很好。 通过SQL客户端Flink SQL查询 运行此命令将导致以下错误。 中没有任何内容,除了以下注释 SQL客户端运行命令 Flink
我在Vert发展。x(基于Netty和Hazelcast),我正在尝试在两个服务器实例之间共享数据(在同一局域网上的不同机器中的每个实例)。 我的问题是我不知道如何配置vert. x服务器以允许它们共享它们的并发内存映射(理论上说这是可能的)。 我已经阅读了Vert. x和Hazelcast的许多文档,但还没有结果。(我不知道如何强制vert. x加载hazelcast xml配置文件)。 提前感
我已经使用KOPS安装了kubernetes集群。 从kops安装kubectl的节点开始,kubectl全部工作完美(假设节点A)。 我正在尝试从另一个安装了kubectl的服务器(节点B)连接到kubernetes集群。我已经将~/.kube从A节点复制到B节点,但当我尝试执行以下基本命令时: 我的配置文件是: 感谢任何帮助
我正在使用POSIX共享内存和未命名信号量实现客户机服务器。服务器可以同时处理多个客户端。该代码适用于单个客户端,但不适用于多个客户端。POSIX操作是用,
我正在使用ActiveMQ客户端库将我的服务器应用程序连接到ActiveMQ。几个不同的消费者和生产者在单个线程中运行。、和之间的关系应该如何? 每个JVM一个连接工厂 每个JVM一个到代理的连接或n个连接,每个使用者一个 n个会话,每个消费者一个(Javadoc似乎强烈建议这样做)