当前位置: 首页 > 知识库问答 >
问题:

Spring boot Sandalone CommandLineRunner不会返回spring-starter-amqp

祝高阳
2023-03-14

我正在构建一个经典的生产者->RabbitMQ->消费者流。所有3个节点都运行在单独的jvm上,甚至单独的主机上

[INFO] |  +- org.springframework.boot:spring-boot-starter-amqp:jar:2.1.6.RELEASE:compile
[INFO] |  |  +- org.springframework:spring-messaging:jar:5.1.8.RELEASE:compile
[INFO] |  |  \- org.springframework.amqp:spring-rabbit:jar:2.1.7.RELEASE:compile
[INFO] |  |     +- org.springframework.amqp:spring-amqp:jar:2.1.7.RELEASE:compile
[INFO] |  |     |  \- org.springframework.retry:spring-retry:jar:1.2.4.RELEASE:compile
[INFO] |  |     +- com.rabbitmq:amqp-client:jar:5.4.3:compile
[INFO] |  |     \- org.springframework:spring-tx:jar:5.1.8.RELEASE:compile

生产者代码

/**
 * @author louis.gueye@gmail.com
 */
@RequiredArgsConstructor
@Slf4j
public class PlatformBrokerExampleProducerJob implements CommandLineRunner {

    private final AmqpTemplate template;

    @Override
    public void run(String... args) {
        final Instant now = Instant.now();
        final Instant anHourAgo = now.minus(Duration.ofHours(1));
        final String directExchangeName = "careassist_queues";
        final String fanoutExchangeName = "careassist_schedules_topics";
        IntStream.range(0, 60).boxed().forEach(i -> {
            final SensorEventDto event = SensorEventDto.builder() //
                    .id(UUID.randomUUID().toString()) //
                    .businessId("sens-q7ikjxk1ftik") //
                    .timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
                    .state(SensorState.on) //
                    .build();
            final String routingKey = "care.events";
            template.convertAndSend(directExchangeName, routingKey, event);
            log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), directExchangeName, routingKey);
        });
        IntStream.range(0, 60).boxed().forEach(i -> {
            final SensorEventDto event = SensorEventDto.builder() //
                    .id(UUID.randomUUID().toString()) //
                    .businessId("sens-q7ikjxk1ftik") //
                    .timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
                    .state(SensorState.off) //
                    .build();
            final String routingKey = "maintenance.events";
            template.convertAndSend(directExchangeName, routingKey, event);
            log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), directExchangeName, routingKey);
        });
        IntStream.range(0, 60).boxed().forEach(i -> {
            final SensorEventDto event = SensorEventDto.builder() //
                    .id(UUID.randomUUID().toString()) //
                    .businessId("sens-q7ikjxk1ftik") //
                    .timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
                    .state(SensorState.off) //
                    .build();
            final ScheduleDto schedule = ScheduleDto.builder().id(UUID.randomUUID().toString()) //
                    .destination("any.routing.queue") //
                    .message(event) //
                    .timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
                    .build();
            final String routingKey = "#";
            template.convertAndSend(fanoutExchangeName, routingKey, schedule);
            log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), fanoutExchangeName, routingKey);
        });
    }
}

消费者代码(1个监听器)

@Component
@RabbitListener(queues = {PlatformBrokerExampleCareEventsQueueConsumer.QUEUE_NAME})
@Slf4j
public class PlatformBrokerExampleCareEventsQueueConsumer {
    public static final String QUEUE_NAME = "care_events";
    @RabbitHandler
    public void onMessage(SensorEventDto event) {
        log.info("<<<<<<<<<<<< Received event [" + event + "] from {}...", PlatformBrokerExampleCareEventsQueueConsumer.QUEUE_NAME);
    }
}

我希望生产者生产然后关机,但相反,java进程无限期挂起

共有1个答案

尹承业
2023-03-14

PlatformBrokerClientConfiguration绑定队列。但我看不到任何地方可以关闭队列。所以挂起实例的原因可能是。

请试试这个。

  public static void main(String[] args) {
    System.exit(SpringApplication.exit(SpringApplication.run(EmployeeDataProduceApp.class, args)));
  }
 类似资料:
  • 问题内容: 我在尝试使用history.pushstate事件时遇到了一些问题。我进行了设置,以使页面的url是通过AJAX加载的页面的实际URL,并且可以正常工作。 我了解它应该自动创建历史记录,以加载先前加载的页面。不幸的是,帽子没有发生,当我单击后退时,URL确实会更改,但页面不会更改。你能帮助我吗?这是我的简化代码: 问题答案: 想通了,我刚刚添加: 到页面末尾

  • 我想在我的spring boot应用程序中使用Redis repository,但每次我想重新使用findById或findByName等方法时,每次repository都返回null,但当我运行findAll或save等函数时,一切都正常工作。这是我的代码:我主要使用了以下注释:@EnableRedisRepository(“com.redis.repository.redis”)我创建了Red

  • 问题内容: 我有一个非常简单的,我正在尝试设置自定义错误消息。但是由于某种原因,该错误没有显示出来。 这是我的控制器: 这是我得到的答复: 我正在传递JSON,但我没有验证任何内容,我只是在尝试设置自定义消息。如果更改状态代码,则会在响应中看到它,但是始终为空。 为什么这不按预期工作?这是一个非常简单的示例,我看不到可能缺少的内容。当我调试代码时,我可以看到错误消息设置了所有字段。但是由于某种原因

  • 我们有一个Spring事务回滚问题,其中回滚似乎不起作用 在用注释的服务层方法中,我调用三个不同的类来插入3条记录 中间插入从第四个表执行get以填充描述字段,但此get失败。我希望第一次插入会回滚,但它似乎没有发生 几点: 获取方法抛出运行时异常 我们使用和中定义的。Bean是在中创建的,它被导入到 在层 中没有 注释 我们已经使用了

  • 问题内容: 我有存储库“ ClientRepository”: 当我请求http:// localhost:8080 / clients / 1时, 服务器响应 响应具有预期的链接。 当我在另一个控制器中调用存储库继承的方法findOne时 它回应 为什么在第二种情况下响应不包含链接?如何使Spring添加他们的响应? 问题答案: HATEOAS功能仅对于带有注释的Spring数据jpa存储库可用

  • 我有一个查询和一张表。查询“EmployeeTraining”包括员工姓名、参加的培训课程、参加的日期和有效期。我希望LeftJoin返回的是所有可用的培训课程,以及每个员工缺少的课程,因此在essense中,任何空值都是空的。 “应用培训”共有5项记录。一些员工只参加了4/5的课程,在“员工培训”记录中只有4项记录。left join不应该在“DateTake”和“Expiration”中使用空