当前位置: 首页 > 知识库问答 >
问题:

Sping Cloud Stream Kafka-以批处理模式消费消息,并作为单个处理消息发送出去

慕铭
2023-03-14

我试着为下一个spring cloud流版本准备我们的应用程序。(当前使用3.0.0.rc1)。用Kafka的活页夹。

现在我们收到一个消息,处理它,并将它重新发送到另一个主题。单独处理每个消息会导致对数据库的大量单个请求。

在3.0.0版本中,我们希望以批处理的方式处理消息,这样我们就可以在批更新中保存数据。

在当前版本中,我们使用了@enablebinding、@streamlistener

@StreamListener( ExchangeableItemProcessor.STOCK_INPUT )
public void processExchangeableStocks( final ExchangeableStock item ) {
    publishItems( exchangeableItemProcessor.stocks(), articleService.updateStockInformation( Collections.singletonList( item ) ) );
}

void publishItems( final MessageChannel messageChannel, final List<? extends ExchangeableItem> item ) {
    for ( final ExchangeableItem exchangeableItem : item ) {
        final Message<ExchangeableItem> message = MessageBuilder.withPayload( exchangeableItem )
                            .setHeader( "partitionKey", exchangeableItem.getId() )
                            .build();
        messageChannel.send( message )
    }
}

我已将使用者属性设置为“批处理模式”,并将签名更改为list<>,但这样做会导致接收到list 而不是预期的list 。Ofc可以在之后进行转换,但这感觉像是“meh”,我认为这是应该在监听器被调用之前发生的事情。

然后我尝试了(新的)函数版本,使用效果很好。我也喜欢这个简单的处理版本

@Bean
public Function<List<ExchangeableStock>, List<ExchangeableStock>> stocks() {
    return articleService::updateStockInformation;
}

但输出主题现在将对象列表作为一条消息接收,并且随后的使用者无法正常工作。

我想我错过了什么...

我是否需要添加某种MessageConverter(对于注释驱动的版本),或者是否有一种方法可以在函数版本中实现所需的行为?

共有1个答案

祁宾白
2023-03-14

IIRC,批处理模式只支持函数。

可以不使用consumer > 并像当前在StreamListener中所做的那样将消息发送到通道吗?

 类似资料:
  • 我对RabbitMQ很陌生,所以如果我的问题听起来很琐碎,请原谅。我想在RabbitMQ上发布消息,它将由RabbitMQ消费者处理。 我的消费者机器是一个多核机器(最好是azure上的工作者角色)。但QueueBasicConsumer一次推送一条消息。我如何编程来利用我可以同时处理多个消息的所有核心。 一种解决方案是在多个线程中打开多个通道,然后在那里处理消息。但在这种情况下,我将如何决定线程

  • 我正在网上阅读苹果的文档 处理本地和远程通知 在我看来,它有相互矛盾的说法。有人能澄清这些困惑吗?现在让我们严格地说一下远程通知(与本地通知相比)。 文档称,如果按下通知上的操作按钮,它将调用application:didfishlaunchingwithoptions并传递通知负载。之后,它会说,如果应用程序在前台运行,它会通过应用程序:DidReceiveMemoteNotify:发送通知。这

  • 想知道Kafka使用者(Java客户端)是否可以并行读取和处理多条消息...我的意思是使用多个线程...我应该使用rxJava吗?? 1)这样做是一个好的方法吗???2)而且根据我的理解,Kafka甚至把每一个线程都当作消费者...如果我错了,请纠正我... 3)并且还想让Java客户端作为守护进程服务在Linux中运行,这样它就可以连续运行,并且轮询Kafka的消息,读取和处理都是一样的...这

  • 我正在尝试使用spring integration设置我的应用程序,作为一名新手,需要以下用例的建议- 有一个队列,来自另一个应用程序的消息将被推送到该队列。我的应用程序使用队列中的消息,进行一些数据处理,然后将其推送到另一个出站队列。目标是以并发方式处理消息。 根据我的理解,我们可以有两种方法- 1.使用#轮询器 2.使用#调度器 从基于轮询器的配置来看,池中似乎有多个可用线程,可以同时获取消息

  • 前言 在消息处理后,会有内存回收的过程this.accumulator.deallocate(batch); 调用deallocate()

  • 在FLTK中是通过Fl_Widegt::handle(),虚拟函数来处理系统的消息。我们可以查看Fltk的源代码来分析系统是怎样处理一些系统消息的,如按钮的消息处理 /******************************************************* Fl_Button中处理消息的代码,省略了具体的处理代码 *******************************