当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream-First Kafka消息得到错误“Dispatcher没有订阅者”

长孙谦
2023-03-14

我的应用程序成功发送Kafka消息,但只有在Kafka初始化后。在此之前,我得到错误“Dispatcher没有订阅者”。如何等待订阅者完成频道注册?

  • 17.165 SenderClass已创建
  • 17.816初始化类,@PostConstruct启动PollingTask
  • 24.781PollingTask发送第一条Kafka消息
  • 24.816第一个错误:“Dispatcher没有订阅服务器”
  • 25.778注册MessageChannel my-channel
  • 仍然看到调度程序错误
  • 27.067频道my-channel'有1个订阅者
  • 此后不再出错,消息发送正常

我不知道该怎么做。疯狂的猜测包括:

  1. 将发送代码放在@PostConstruct
  2. 向发件人添加@AutoConfigureBefore(BindingServiceConfiguration.class)
  3. 向SenderClass添加@AutoConfigureAfter(BindingServiceConfiguration.class)
  4. 将@AutoConfigureBefore(BindingServiceConfiguration.class)添加到main
  5. 将@Dependson({“EnableBindingClass”})放置在任务上
  6. 将@Dependson({“ApplicationLifeCycle”})放置在ScheduerClass上,其中ApplicationLifeCycle是一个类,它除了实现SmartLifecycle外什么也不做,并且getPhase返回max_int
  7. 确保整个包的ComponentScan处于打开状态(来自其他SO线程的建议)
  8. 以上各种组合
public interface Source {
  @Output(channelName)
  MessageChannel outboundChannel();
}
@EnableBinding(Source.class)
@Component
public class Sender {
  @Autowired
  private Source source;

  public boolean send(SomeObject object) {
    return source.outboundChannel().send(MessageBuilder.withPayload(object).build());
  }
@Service
public class Scheduler {
  @Autowired
  Sender sender;
  @Autowired
  ThreadPoolTaskScheduler taskScheduler;

  @PostConstruct
  public void initialize() {
    taskScheduler.schedule(new PollingTask(), nextTime);
  }

  private class PollingTask implements Runnable {
    @Override
    public void run() {
      List<SomeObject> objects = getDummyData();
      for(SomeObject object : objects)
      {
        sender.send(interval);
      }

      Instant nextTime = Instant.now().plusMillis(1_000L);
      try {
        taskScheduler.schedule(new PollingTask(), nextTime);
      } catch (Exception e) {
        logger.error(e);
      }
    }
}

现在管用了!在启动发送消息的事情的调度程序中,我从@PostConstruct中的启动事情切换到了SmartLifecycle::start()。

@Service
public class Scheduler implements SmartLifecycle {
  @Autowired
  Sender sender;
  @Autowired
  ThreadPoolTaskScheduler taskScheduler;

  @Override
  public void start() {
    taskScheduler.schedule(new PollingTask(), nextTime);
  }

  private class PollingTask implements Runnable {
    @Override
    public void run() {
      List<SomeObject> objects = getDummyData();
      for(SomeObject object : objects)
      {
        sender.send(interval);
      }

      Instant nextTime = Instant.now().plusMillis(1_000L);
      try {
        taskScheduler.schedule(new PollingTask(), nextTime);
      } catch (Exception e) {
        logger.error(e);
      }
    }
}

共有1个答案

宋飞掣
2023-03-14

@PostConstruct发送消息为时过早;上下文仍在构建中。IMPLEMT SmartLifecycle,将bean置于高阶段(integer.max_value),并在start()中执行发送操作。

或者在ApplicationRunner中执行发送操作。

 类似资料:
  • 您可以帮助进行spring集成配置吗。 收到消息后,我收到: 我的代码: 我有类似的JmsInboudGateway配置,用于其他没有.requestChannel和.replyChannel的队列。 如果不是注入请求通道bean,而是用文本名称声明它,得到了这个 以及更多的文字宣传问题。

  • 我为使用spring cloud starter stream rabbit编写了一个测试类,当我只运行receiver并从rabbitmq发送消息时,它可以工作;但是当我运行test sender类时,出现了一个错误:

  • 对调试的任何帮助都将非常感谢。 这是我的堆栈: node.js socket.io express.js passport.js MongoDb react.js 流程: Anna在聊天中发送一条消息(这条消息写入数据库并发布到PubSubtopic“messages”) Node.js express应用程序运行订阅,然后根据消息内容发送给其他应该接收消息的人。 在本例中,与安娜在同一频道的鲍勃

  • 微信文档:https://developers.weixin.qq.com/miniprogram/dev/api-backend/open-api/subscribe-message/subscribeMessage.addTemplate.html 组合模板并添加至帐号下的个人模板库 $tid = 563; // 模板标题 id,可通过接口获取,也可登录小程序后台查看获取 $kidLi

  • 开普勒消息目前分为三大类:公告,告警和通知。 通知中根据不同的操作事件类型,分为十几个事件。每个事件都跟项目操作相关。便于接收项目操作变更的通知。 分类 事件 公告 Alarm 告警 Proclaim 通知 Build,Apply,Audit,Delete,Rollback,Logging,Reboot,Command,Storage,Extend... 订阅界面: 用户中心,点击头像,下拉菜单→