我试着为下一个spring cloud流版本准备我们的应用程序。(当前使用3.0.0.rc1)。用Kafka的活页夹。
现在我们收到一个消息,处理它,并将它重新发送到另一个主题。单独处理每个消息会导致对数据库的大量单个请求。
在3.0.0版本中,我们希望以批处理的方式处理消息,这样我们就可以在批更新中保存数据。
在当前版本中,我们使用了@enablebinding、@streamlistener
@StreamListener( ExchangeableItemProcessor.STOCK_INPUT )
public void processExchangeableStocks( final ExchangeableStock item ) {
publishItems( exchangeableItemProcessor.stocks(), articleService.updateStockInformation( Collections.singletonList( item ) ) );
}
void publishItems( final MessageChannel messageChannel, final List<? extends ExchangeableItem> item ) {
for ( final ExchangeableItem exchangeableItem : item ) {
final Message<ExchangeableItem> message = MessageBuilder.withPayload( exchangeableItem )
.setHeader( "partitionKey", exchangeableItem.getId() )
.build();
messageChannel.send( message )
}
}
我已将使用者属性设置为“批处理模式”,并将签名更改为list<>
,但这样做会导致接收到list
而不是预期的list
。Ofc可以在之后进行转换,但这感觉像是“meh”,我认为这是应该在监听器被调用之前发生的事情。
然后我尝试了(新的)函数版本,使用效果很好。我也喜欢这个简单的处理版本
@Bean
public Function<List<ExchangeableStock>, List<ExchangeableStock>> stocks() {
return articleService::updateStockInformation;
}
但输出主题现在将对象列表作为一条消息接收,并且随后的使用者无法正常工作。
我想我错过了什么...
我是否需要添加某种MessageConverter(对于注释驱动的版本),或者是否有一种方法可以在函数版本中实现所需的行为?
IIRC,批处理模式只支持函数。
可以不使用consumer
并像当前在StreamListener中所做的那样将消息发送到通道吗?
>
我对RabbitMQ很陌生,所以如果我的问题听起来很琐碎,请原谅。我想在RabbitMQ上发布消息,它将由RabbitMQ消费者处理。 我的消费者机器是一个多核机器(最好是azure上的工作者角色)。但QueueBasicConsumer一次推送一条消息。我如何编程来利用我可以同时处理多个消息的所有核心。 一种解决方案是在多个线程中打开多个通道,然后在那里处理消息。但在这种情况下,我将如何决定线程
我正在网上阅读苹果的文档 处理本地和远程通知 在我看来,它有相互矛盾的说法。有人能澄清这些困惑吗?现在让我们严格地说一下远程通知(与本地通知相比)。 文档称,如果按下通知上的操作按钮,它将调用application:didfishlaunchingwithoptions并传递通知负载。之后,它会说,如果应用程序在前台运行,它会通过应用程序:DidReceiveMemoteNotify:发送通知。这
想知道Kafka使用者(Java客户端)是否可以并行读取和处理多条消息...我的意思是使用多个线程...我应该使用rxJava吗?? 1)这样做是一个好的方法吗???2)而且根据我的理解,Kafka甚至把每一个线程都当作消费者...如果我错了,请纠正我... 3)并且还想让Java客户端作为守护进程服务在Linux中运行,这样它就可以连续运行,并且轮询Kafka的消息,读取和处理都是一样的...这
我正在尝试使用spring integration设置我的应用程序,作为一名新手,需要以下用例的建议- 有一个队列,来自另一个应用程序的消息将被推送到该队列。我的应用程序使用队列中的消息,进行一些数据处理,然后将其推送到另一个出站队列。目标是以并发方式处理消息。 根据我的理解,我们可以有两种方法- 1.使用#轮询器 2.使用#调度器 从基于轮询器的配置来看,池中似乎有多个可用线程,可以同时获取消息
前言 在消息处理后,会有内存回收的过程this.accumulator.deallocate(batch); 调用deallocate()
在FLTK中是通过Fl_Widegt::handle(),虚拟函数来处理系统的消息。我们可以查看Fltk的源代码来分析系统是怎样处理一些系统消息的,如按钮的消息处理 /******************************************************* Fl_Button中处理消息的代码,省略了具体的处理代码 *******************************