我的用例是:
@Component
public class EPPQ2Subscriber {
private static final Logger LOGGER = LoggerFactory.getLogger(EPPQ2Subscriber.class);
@Autowired
RabbitMqConfig rabbitMqConfig;
@Autowired
AppConfig appConfig;
List<Message> messageList = new ArrayList<Message>();
List<Long> diliveryTag = new ArrayList<Long>();
/**
* Method is listener's receive message method , invoked when there is message ready to read
* @param message - Domain object of message encapsulated
* @param channel - rabitmq client channel
* @param messageId - @TODO Delete it later.
* @param messageProperties - amqp message properties contains message properties such as delivery tag etc..
*/
@RabbitListener(id="messageListener",queues = "#{rabbitMqConfig.getSubscriberQueueName()}",containerFactory="queueListenerContainer")
public void receiveMessage(Message message, Channel channel, @Header("id") String messageId,
MessageProperties messageProperties) {
LOGGER.info("Result:" + message.getClass() + ":" + message.toString());
if(messageList.size() <= appConfig.getSubscriberChunkSize() ) {
messageList.add(message);
diliveryTag.add(messageProperties.getDeliveryTag());
} else {
// call the service here to decrypt, read pan, call danger to scrub, encrypt pan and re-pack them in message again.
//after this branch messageList should have scrubbed and encrypted message objects ready to publish.
// Here is call for publish and ack messages..
}
}
}
@Component
@Configuration
public class TopicConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(TopicConfiguration.class);
@Autowired
RabbitMqConfig rabbitMqConfig;
@Autowired EPPQ2Publisher eppQ2Publisher;
/**
* Caching connection factory
* @return CachingConnectionFactory
*/
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitMqConfig.getPublisherHosts(), rabbitMqConfig.getPublisherPort());
connectionFactory.setUsername(rabbitMqConfig.getPublisherUsername());
connectionFactory.setPassword(rabbitMqConfig.getPublisherPassword());
return connectionFactory;
}
/**
* Bean RabbitTemplate
* @return RabbitTemplate
*/
@Bean
public RabbitTemplate rabbitTemplate() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new
ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
rabbitTemplate.setRetryTemplate(retryTemplate);
/* rabbitTemplate.setExchange(rabbitMqConfig.getPublisherTopic());
rabbitTemplate.setRoutingKey(rabbitMqConfig.getRoutingKey());*/
rabbitTemplate.setConfirmCallback((correlation, ack, reason) ->
if(correlation != null ) {
LOGGER.info("Received " + (ack ? " ack " : " nack ") +
"for correlation: " + correlation);
if(ack) {
// this is confirmation received..
// here is code to ack Q1. correlation.getId and ack
eppQ2Publisher.ackMessage(new
Long(correlation.getId().toString()));
} else {
// no confirmation received and no need to do any
thing for retry..
}
}
});
rabbitTemplate.setReturnCallback((message, replyCode,
replyText, exchange, routingKey) ->
{
LOGGER.error("Returned: " + message + "\nreplyCode: " +
replyCode+ "\nreplyText: " + replyText +
"\nexchange/rk: " + exchange + "/" + routingKey);
});
return rabbitTemplate;
}
/**
* Bean Jackson2JsonMessageConverter
* @return Jackson2JsonMessageConverter
*/
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
public interface EPPQ2Publisher {
public void sendMessage(Message msg,Long deliveryTag);
public void sendMessages(List<Message> msgList, Channel channel, List<Long> deliveryTagList);
public void ackMessage(Long deliveryTag);
}
@Component
public class EPPQ2PublisherImpl implements EPPQ2Publisher{
@Autowired
RabbitMqConfig rabbitMqConfig;
@Autowired
private RabbitTemplate rabbitTemplate;
private Channel channel;
/**
* Method sendMessage for sending individual scrubbed and encrypted message to publisher queue (Q2).
* @param msg - message domain object
* @param deliveryTag - is message delivery tag.
*/
@Override
public void sendMessage(Message msg,Long deliveryTag) {
rabbitTemplate.convertAndSend(rabbitMqConfig.getPublisherTopic(), rabbitMqConfig.getRoutingKey(), msg,new CorrelationData(deliveryTag.toString()));
}
/**
* sendMessages for sending list of scrubbed and encrypted messages to publisher queue (Q2)
* @param msgList - is list of scrubbed and encrypted messages
* @param channel - is ampq client channel
* @param deliveryTagList - is list of incoming message delivery tags.
*/
@Override
public void sendMessages(List<Message> msgList, Channel channel, List<Long>deliveryTagList) {
if(this.channel == null) {
this.channel = channel;
}
for (int i = 0 ; i < msgList.size(); i ++) {
sendMessage(msgList.get(i),deliveryTagList.get(i));
}
}
/**
* Method ackMessage for sending acknowledgement to subscriber Q1
* @param deliveryTag - is deliveryTag for each individual message.
*/
@Override
public void ackMessage(Long deliveryTag) {
try {
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
e.printStackTrace();
}
}
组织。springframework。amqp。兔子联系CachingConnectionFactory从AMQChannel创建缓存兔子通道(amqp://dftp_subscriber@10.15.190.18:5672/海德拉。服务,2)
我希望dftp_publisher,我想我的主题配置没有正确注入。
组织。springframework。amqp。兔子果心RabbitTemplate[0;39m:在RabbitMQ通道:缓存的兔子通道:AMQChannel上执行回调RabbitTemplate$$Lambda$285/33478758(amqp://dftp_subscriber@10.15.190.18:5672/海德拉。康涅狄格州服务部(2)Proxy@1dc339f共享兔子连接:SimpleConnection@2bd7c8[代表=amqp://dftp_subscriber@10.15.190.18:5672/海德拉。服务,localPort=55553]org。springframework。amqp。兔子果心RabbitTemplate[0;39m:发布消息(正文:“{”标头“:{”重试计数“:”0,“发布事件类型“:”AUTH“,”有效负载“:{”MTI“:”100”,“MTI请求“:”100”,“PAN“:”6011000000000000”,“PROCCODE“:”00”,“PROCCODE请求“:”00”,“从账户“:”00”,“到账户“:”00”,“交易金额“:”000000000 100”,“传输”:“0518202930”,“STAN:”000001”,“LOCALTIME\u HHMMSS:“010054”,“LOCALDATE\u YYMMDD:“180522”,“到期日期\u YYMM:“2302”,“商户类型”:“5311”,“收单国家代码”:“840”,“收单输入模式”:“02”,“收单输入功能”:“0”,“功能代码”:“100”,“收单ID代码”:“000000”,“转发ID代码”:“000000”,“检索参考号”:“1410N644D597”,“商户编号”:“60110000000596”,“卡接受人姓名”:“发现Acq模拟器”,“卡接受人城市”:“Riverwoods”,“卡接受人州”:“IL”,“卡接受人国家”:“840”,“卡接受人国家数字”:“840”,“NRID”:“123456789012345”,“交易货币代码”:“840”,“POS终端考勤指示器”:“0”,“POS部分批准指示器”:“0”,“POS终端位置指示器”:“0”,“POS交易状态指示器”:“0”,“POS电子商务传输指示器”:“0”,“POS终端设备的POS类型”:“0”,“POS卡存在指示器”:“0”,“POS卡捕获能力指示器”:“0”,“POS交易安全指示器”:“0”,“POS卡数据输入能力指示器”:“C”,“POS卡持卡人状态指示器”:“0”,“DFS POS数据”:“000000 C00”,“GEODATA街道地址”:“2500 LAKE COOK ROAD”,“GEODATA邮政编码”:“600150000”,“GEODATA县代码”:“840”,“GEODATA商店编号”:“10001”,“GEODATA商城名称”:“发现财务SR”,“ISS参考ID”:“72967956”,“ISS处理器参考ID”:“123459875”,“VERSION_INDICATOR”:“03141”}exchange[hydra.hash2Syphon.exc]上的MessageProperties[headers={TypeId=com.discover.dftp.scruber.domain.Message},contentType=application/json,contentEncoding=UTF-8,contentLength=1642,deliveryMode=PERSISTENT,priority=0,deliveryTag=0]),routingKey=[100]org。springframework。amqp。兔子联系CachingConnectionFactory$DefaultChannelCloseLogger[0;39m:通道关闭:通道错误;协议方法:#方法(回复代码=403,回复文本=访问被拒绝-无法发布到vhost'hydra.services'中的内部交换'hydra.hash2Syphon.exc',类id=60,方法id=40)
编辑2。
@Component
@Configuration
public class ListenerContainerFactory {
static final Logger logger = LoggerFactory.getLogger(ListenerContainerFactory.class);
@Autowired
RabbitMqConfig rabbitMqConfig;
@Autowired
EPPQ2Subscriber receiver;
@Autowired
EPPQ2ChanelAwareSubscriber receiverChanel;
public ListenerContainerFactory(ConfigurableApplicationContext ctx) {
printContainerStartMsg();
}
private void printContainerStartMsg() {
logger.info("----------- Scrubber Container Starts --------------");
}
@Bean
public SimpleRabbitListenerContainerFactory queueListenerContainer(AbstractConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
connectionFactory.setAddresses(rabbitMqConfig.getSubscriberHosts());
connectionFactory.setVirtualHost("hydra.services");
connectionFactory.setPort(rabbitMqConfig.getSubscriberPort());
connectionFactory.setUsername(rabbitMqConfig.getSubscriberUsername());
connectionFactory.setPassword(rabbitMqConfig.getSubscriberPassword());
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setErrorHandler(errorHandler());
return factory;
}
@Bean
MessageListenerAdapter listenerAdapter(EPPQ2Subscriber receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
/*@Bean
MessageListenerAdapter listenerAdapterWithChanel(EPPQ2ChanelAwareSubscriber receiverChanel) {
return new MessageListenerAdapter(receiverChanel);
}*/
@Bean
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(fatalExceptionStrategy());
}
@Bean
public ScrubberFatalExceptionStrategy fatalExceptionStrategy() {
return new ScrubberFatalExceptionStrategy();
}
}
和最新的主题配置。
@Component
@Configuration
public class TopicConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(TopicConfiguration.class);
@Autowired
RabbitMqConfig rabbitMqConfig;
@Autowired EPPQ2Publisher eppQ2Publisher;
/**
* Bean Queue
* @return Queue
*/
@Bean
Queue queue() {
return new Queue(rabbitMqConfig.getPublisherQueueName(), false);
}
/**
* Bean TopicExchage
* @return TopicExchage
*/
@Bean
TopicExchange exchange() {
return new TopicExchange(rabbitMqConfig.getPublisherTopic());
}
/**
* Bean BindingBuilder
* @param queue - Queue
* @param exchange - TopicExchange
* @return BindingBuilder
*/
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(rabbitMqConfig.getRoutingKey());
}
/**
* Caching connection factory
* @return CachingConnectionFactory
*/
@Bean
public CachingConnectionFactory cachingConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitMqConfig.getPublisherHosts(),
rabbitMqConfig.getPublisherPort());
connectionFactory.setUsername(rabbitMqConfig.getPublisherUsername());
connectionFactory.setPassword(rabbitMqConfig.getPublisherPassword());
return connectionFactory;
}
/**
* Bean RabbitTemplate
* @return RabbitTemplate
*/
@Bean
public RabbitTemplate rabbitTemplate() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory());
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
rabbitTemplate.setRetryTemplate(retryTemplate);
/* rabbitTemplate.setExchange(rabbitMqConfig.getPublisherTopic());
rabbitTemplate.setRoutingKey(rabbitMqConfig.getRoutingKey());*/
rabbitTemplate.setConfirmCallback((correlation, ack, reason) -> {
if(correlation != null ) {
LOGGER.info("Received " + (ack ? " ack " : " nack ") + "for correlation: " + correlation);
if(ack) {
// this is confirmation received..
// here is code to ack Q1. correlation.getId() and ack
eppQ2Publisher.ackMessage(new
Long(correlation.getId().toString()));
} else {
// no confirmation received and no need to do any
}
}
});
rabbitTemplate.setReturnCallback(
(message, replyCode, replyText,
exchange, routingKey) ->
{
LOGGER.error("Returned: " + message + "\nreplyCode: " +
replyCode
+ "\nreplyText: " + replyText + "\nexchange/rk: " +
exchange + "/" + routingKey);
});
return rabbitTemplate;
}
/**
* Bean Jackson2JsonMessageConverter
* @return Jackson2JsonMessageConverter
*/
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
请检查您的用户权限,如果在启动消费者/生产者时,我们没有exchange名称,它将返回默认值
不清楚你在问什么。如果您的意思是订户用户没有权限写入该交换,则您的接线错误。
不显示订阅者配置。
订户连接工厂bean是否也被称为connectionFactory
?在那种情况下,一方或另一方将获胜。
他们需要不同的豆命名。
我正在使用Azure Functions V2,在Visual Studio 2017中开发。我想为我的函数返回的HTTP错误代码返回自定义内容。当我在本地测试时,这很好用。但是当我将函数发布到Azure时,自定义内容不再返回。 如何发布或配置函数,使其返回自定义错误状态内容?
问题内容: 这是我在laravel 5中测试的ajax(请参阅下文) 和触发链接 和我的路线 但是当我在google chrome中运行控制台时,它给了我错误,并且未返回预期的响应“在laravel 5中返回’成功!ajax’;” POST http://juliver.laravel.com/test 500(内部服务器错误) 我的代码有什么问题/问题?我缺少什么? 问题答案: 虽然这个问题存在
问题内容: 我有一个似乎无法满足的简单要求:我有一个产品页面。产品具有供应商,供应商输入是带有自动完成功能的文本字段。如果用户输入数据库中不存在的供应商,则需要添加它。要添加它,我在.load()页面上有一个DIV并调用了我的/ Vendor / Create控制器方法。该方法的视图使用: 这应该通过ajax发布我的表单,完成后调用Javascript。我遇到的问题是,提交后,我的整个页面都会刷新
我真的不知道我的java代码中的错误在哪里。我必须使用REST API登录Kofax Total Agility。为此,我尝试使用postman测试我的json是否正确构建。以下是我的登录JSON: 我得到了肯定的回答: 到目前为止,一切顺利。为此,我创建了模型: 对于响应: 这些类应该允许我构建 json。现在,我创建了一个方法,用于生成请求对象并期望响应对象。 当我调用这部分代码时,我注意到我
我正在用Spring-amqp和Spring-Rabbit进行多项测试。我的Maven父是Spring引导启动父:1.2.3。释放拉: 在某一点上,其中一个测试失败了,出现了这个错误,我看不出原因: 如果我改为SpringBootStarter父级:1.3.1。发布所有测试都通过了。 在不同的版本中挖掘,似乎我仍然可以用 但是所有的测试都通过了 1.5.0之间是否有任何相关变更。M1和1.5.0。
我有一个在客户端使用自动完成功能的谷歌地图查询——我试图在用户选择“place”对象后将其转移到服务器上——我可以在客户端解析它,但我认为在服务器端进行更容易。我通过浏览器“网络”检查验证它是否通过正确的json对象发送,但服务器端我无法获得正确的对象。我尝试了各种请求的排列。*我可以找到它,但要么一个也得不到,要么一个