当前位置: 首页 > 知识库问答 >
问题:

使用spring集成在spring批处理中侦听消息的从属配置

岳景明
2023-03-14

我能有一份只有奴隶而没有主人的工作,听rabbitmq队列吗?我想在spring boot应用程序中使用spring批处理和spring集成来侦听队列并以面向块的方式处理消息。

我想使用Michael Minella(https://www.youtube.com/watch?v=30Tdp1mfR0g)在Spring批处理的远程组块示例中解释的chunkProcessorChunkHandler配置,但没有主配置

下面是我的工作配置。

@Configuration
@EnableIntegration
public class QueueIntegrationConfiguration {

  @Autowired
  private CassandraItemWriter cassandraItemWriter;
  @Autowired
  private VendorProcessor vendorProcessor;

  @Autowired
  ConnectionFactory connectionFactory;

  @Bean
  public AmqpInboundChannelAdapter inboundChannelAdapter(
      SimpleMessageListenerContainer listenerContainer) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(inboundQueueChannel());
    adapter.setAutoStartup(true);

    return adapter;
  }

 @Bean
  public SimpleMessageListenerContainer listenerContainer(ConnectionFactory connectionFactory, MessageConverter jsonMessageConverter) {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(
        connectionFactory);
    container.setQueueNames("ProductStore_Partial");
    container.setAutoStartup(true);
    container.setMessageConverter(jsonMessageConverter);

    return container;
  }

  @Bean
  @ServiceActivator(inputChannel = "ProductStore_Partial")
  public ChunkProcessorChunkHandler chunkProcessorChunkHandler()
      throws Exception {
    SimpleChunkProcessor chunkProcessor = new SimpleChunkProcessor(vendorProcessor,
        cassandraItemWriter);
    chunkProcessor.afterPropertiesSet();

    ChunkProcessorChunkHandler<Vendor> chunkHandler = new ChunkProcessorChunkHandler<>();
    chunkHandler.setChunkProcessor(chunkProcessor);
    chunkHandler.afterPropertiesSet();

    return chunkHandler;
  }

  @Bean
  public QueueChannel inboundQueueChannel() {
    return new QueueChannel().;
  }

}

下面是我的申请表。用于spring boot的java类。

@SpringBootApplication
@EnableBatchProcessing
public class BulkImportProductApplication {

  public static void main(String[] args) {
    SpringApplication app = new SpringApplication(BulkImportProductApplication.class);
    app.setWebEnvironment(false);
    app.run(args).close();
  }

}

根据我对Spring集成的理解,我有一个用于监听队列中消息的AmqpIn边界通道适配器。一个服务激活器、一个无限队列通道、自动配置的项目处理器和项目写入器。我不确定我在这里错过了什么。

批处理作业启动,使用队列中的一条消息,并获得cancelOk,而我的作业在不处理该消息的情况下终止。

我也分享我的调试日志,如果这会有所帮助。

2017-12-04 09:58:49.679  INFO 7450 --- [           main] c.a.s.p.b.BulkImportProductApplication   : Started BulkImportProductApplication in 9.412 seconds (JVM running for 10.39)
2017-12-04 09:58:49.679  INFO 7450 --- [           main] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@31c88ec8: startup date [Mon Dec 04 09:58:40 PST 2017]; root of context hierarchy
2017-12-04 09:58:49.679 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'org.springframework.integration.config.IdGeneratorConfigurer#0'
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'inboundChannelAdapter'
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'listenerContainer'
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'integrationHeaderChannelRegistry'
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.internalRabbitListenerEndpointRegistry'
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean '_org.springframework.integration.errorLogger'
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'queueIntegrationConfiguration.chunkProcessorChunkHandler.serviceActivator.handler'
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'queueIntegrationConfiguration.chunkProcessorChunkHandler.serviceActivator'
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'lifecycleProcessor'
2017-12-04 09:58:49.680  INFO 7450 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Asking bean 'inboundChannelAdapter' of type [class org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter] to stop
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.a.r.l.SimpleMessageListenerContainer : Shutting down Rabbit listener container
2017-12-04 09:58:49.814 DEBUG 7450 --- [pool-1-thread-5] o.s.a.r.listener.BlockingQueueConsumer   : Storing delivery for Consumer@7c52fc81: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@xxxx:5672/,2), conn: Proxy@26f1249d Shared Rabbit Connection: SimpleConnection@680bddf5 [delegate=amqp://admin@xxxx:5672/, localPort= 65035], acknowledgeMode=AUTO local queue size=0
2017-12-04 09:58:49.814 DEBUG 7450 --- [enerContainer-1] o.s.a.r.listener.BlockingQueueConsumer   : Received message: (Body:'[B@358a5358(byte[618])' MessageProperties [headers={__TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=ProductStore, receivedRoutingKey=, receivedDelay=null, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-nWGbRxjFiaeTEoZylv6Hrg, consumerQueue=null])
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_receivedDeliveryMode] WILL be mapped, matched pattern=*
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_contentEncoding] WILL be mapped, matched pattern=*
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_receivedExchange] WILL be mapped, matched pattern=*
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=*
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[json__TypeId__] WILL be mapped, matched pattern=*
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_redelivered] WILL be mapped, matched pattern=*
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[contentType] WILL be mapped, matched pattern=*
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[__TypeId__] WILL be mapped, matched pattern=*
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] o.s.integration.channel.QueueChannel     : preSend on channel 'inboundQueueChannel', message: GenericMessage [payload=byte[618], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_contentEncoding=UTF-8, amqp_receivedExchange=ProductStore, amqp_deliveryTag=2, json__TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor, amqp_redelivered=false, id=a4868670-240f-ddf2-8a8c-ac4b8d234cdd, amqp_consumerTag=amq.ctag-nWGbRxjFiaeTEoZylv6Hrg, contentType=json, __TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor, timestamp=1512410329815}]
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] o.s.integration.channel.QueueChannel     : postSend (sent=true) on channel 'inboundQueueChannel', message: GenericMessage [payload=byte[618], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_contentEncoding=UTF-8, amqp_receivedExchange=ProductStore, amqp_deliveryTag=2, json__TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor, amqp_redelivered=false, id=a4868670-240f-ddf2-8a8c-ac4b8d234cdd, amqp_consumerTag=amq.ctag-nWGbRxjFiaeTEoZylv6Hrg, contentType=json, __TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor, timestamp=1512410329815}]
2017-12-04 09:58:49.853  INFO 7450 --- [           main] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2017-12-04 09:58:49.853 DEBUG 7450 --- [pool-1-thread-6] o.s.a.r.listener.BlockingQueueConsumer   : Received cancelOk for tag amq.ctag-nWGbRxjFiaeTEoZylv6Hrg (null); Consumer@7c52fc81: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@xxxx:5672/,2), conn: Proxy@26f1249d Shared Rabbit Connection: SimpleConnection@680bddf5 [delegate=amqp://admin@xxxx:5672/, localPort= 65035], acknowledgeMode=AUTO local queue size=0
2017-12-04 09:58:49.853 DEBUG 7450 --- [enerContainer-1] o.s.a.r.l.SimpleMessageListenerContainer : Cancelling Consumer@7c52fc81: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@xxxx:5672/,2), conn: Proxy@26f1249d Shared Rabbit Connection: SimpleConnection@680bddf5 [delegate=amqp://admin@xxxx:5672/, localPort= 65035], acknowledgeMode=AUTO local queue size=0
2017-12-04 09:58:49.853 DEBUG 7450 --- [enerContainer-1] o.s.a.r.listener.BlockingQueueConsumer   : Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://admin@xxxx:5672/,2), conn: Proxy@26f1249d Shared Rabbit Connection: SimpleConnection@680bddf5 [delegate=amqp://admin@xxxx:5672/, localPort= 65035]
2017-12-04 09:58:49.853 DEBUG 7450 --- [enerContainer-1] o.s.a.r.c.CachingConnectionFactory       : Closing cached Channel: AMQChannel(amqp://admin@xxxx:5672/,2)
2017-12-04 09:58:50.027  INFO 7450 --- [           main] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
2017-12-04 09:58:50.027 DEBUG 7450 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Bean 'inboundChannelAdapter' completed its stop procedure

我错过了什么?为什么我的信息会被处理?如果我遗漏了什么,请纠正我?也可以随意询问您认为有助于分析此处情况的任何其他配置。

编辑:在删除手动关闭应用程序上下文的代码(app.run(args)。关闭())后,我能够接收消息,但看起来它们在成功检索后丢失了。共享此行为的调试日志。

2017-12-04 14:39:11.297 DEBUG 1498 --- [pool-1-thread-5] o.s.a.r.listener.BlockingQueueConsumer   : Storing delivery for Consumer@7219ac49: tags=[{amq.ctag-Z8siptJMdxGU6sXdOHkVCA=ProductStore_Partial}], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@xxxx:5672/,2), conn: Proxy@6df20ade Shared Rabbit Connection: SimpleConnection@7ba63fe5 [delegate=amqp://admin@xxxx:5672/, localPort= 51172], acknowledgeMode=AUTO local queue size=0
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] o.s.a.r.listener.BlockingQueueConsumer   : Received message: (Body:'[B@347c8f87(byte[624])' MessageProperties [headers={__TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=ProductStore, receivedRoutingKey=, receivedDelay=null, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-Z8siptJMdxGU6sXdOHkVCA, consumerQueue=ProductStore_Partial])
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_receivedDeliveryMode] WILL be mapped, matched pattern=*
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_contentEncoding] WILL be mapped, matched pattern=*
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_receivedExchange] WILL be mapped, matched pattern=*
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=*
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[json__TypeId__] WILL be mapped, matched pattern=*
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_redelivered] WILL be mapped, matched pattern=*
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[contentType] WILL be mapped, matched pattern=*
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[__TypeId__] WILL be mapped, matched pattern=*
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] o.s.integration.channel.QueueChannel     : preSend on channel 'inboundQueueChannel', message: GenericMessage [payload=byte[624], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_contentEncoding=UTF-8, amqp_receivedExchange=ProductStore, amqp_deliveryTag=2, json__TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor, amqp_consumerQueue=ProductStore_Partial, amqp_redelivered=false, id=540399a5-62a6-7178-2524-e274bad4ed13, amqp_consumerTag=amq.ctag-Z8siptJMdxGU6sXdOHkVCA, contentType=json, __TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor, timestamp=1512427151297}]
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] o.s.integration.channel.QueueChannel     : postSend (sent=true) on channel 'inboundQueueChannel', message: GenericMessage [payload=byte[624], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_contentEncoding=UTF-8, amqp_receivedExchange=ProductStore, amqp_deliveryTag=2, json__TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor, amqp_consumerQueue=ProductStore_Partial, amqp_redelivered=false, id=540399a5-62a6-7178-2524-e274bad4ed13, amqp_consumerTag=amq.ctag-Z8siptJMdxGU6sXdOHkVCA, contentType=json, __TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor, timestamp=1512427151297}]
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] o.s.a.r.listener.BlockingQueueConsumer   : Retrieving delivery for Consumer@7219ac49: tags=[{amq.ctag-Z8siptJMdxGU6sXdOHkVCA=ProductStore_Partial}], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@xxxx:5672/,2), conn: Proxy@6df20ade Shared Rabbit Connection: SimpleConnection@7ba63fe5 [delegate=amqp://admin@xxxx:5672/, localPort= 51172], acknowledgeMode=AUTO local queue size=0

这会继续重复,并消耗新消息,但这些消息不会得到处理,也不会使用提供的itemWriter写入数据存储。现在想一想,既然我没有在这段代码中的任何地方提供tasklet/stepbean引用,这是我在这里遗漏的东西吗?

共有1个答案

葛泳
2023-03-14

app。运行(args)。close()

您正在显式关闭应用程序上下文,这会关闭所有内容。

关闭组织。springframework。上下文注释。注释配置应用程序上下文

 类似资料:
  • 当RabbitMq消息到达队列时,我目前正在使用IntegrationFlow来触发作业执行。IntegrationFlow的AmqpInFronChannelAdapter和作业的第一步的ItemReader都配置为从同一队列中读取消息。 我遇到的问题是IntegrationFlow的AmqpInboundChannelAdapter读取RabbitMQ消息,然后ItemReader再也找不到该

  • 我有一个spring批处理应用程序,它从文件中读取数据,进行一些处理,最后编写一个定制的输出。这一切都是一步到位的。在下一步中,我将使用一个tasklet来归档输入文件(移动到另一个文件夹)。这个应用程序运行良好。但是,现在我需要在远程服务器上对sftp输出文件进行进一步处理。我找到了一种使用spring integration实现sftp的方法,在这里我创建了一个输入通道,该通道将反馈给outb

  • 我试图在spring boot应用程序中使用kafka消费者批处理。我可以看到一些例子,其中我们有一个kafka配置类,其中,被配置并 已启用。我只是想知道这是否可以在没有工厂类的情况下实现,即在应用程序中使用SpringKafka集成属性。yml。早些时候,我定义了一个工厂,并通过应用程序将其替换为SpringKafa集成属性。yml用于简洁的代码。我试图了解后者是否有局限性,使用配置类是否更可

  • Spring Integration Java DSL Reference和Spring Batch Java配置文档说明了如何将Java配置用于Spring Integration和Spring Batch。 但它们没有说明如何为Spring批处理集成配置它。如何使用DSL配置JobLaunchingGateway? 干杯,曼诺

  • 我有一个自定义跳过管理步骤。我定义了一个跳过策略,其源代码如下: 我的Skip侦听器如下: 我的步骤定义如下: 我想跳过一个约束冲突异常。但是,不会调用侦听器或跳过策略。

  • 我们一直在使用SI Kafka进行一个新项目,并取得了很大成功。在最近的一次切换之前,我们使用KafkaTopicOffsetManager来管理我们的消费者主题偏移量。为了避免每个消费者/主题对都有额外的主题,并使用Burrow或lag监控,我们决定使用最新的KafkaNativeOffsetManager,它使用Kafka提供的本机偏移管理。但在切换之后,我们注意到目标主题的消息消耗持续滞后。