当前位置: 首页 > 知识库问答 >
问题:

带有Kafka Binder的Spring Cloud Stream: /bindings执行器API不会停止生产者

赵英资
2023-03-14

我有一个带有Actuator和Kafka活页夹的Spring Cloud Stream项目。我正在探索绑定/执行器,并试图停止生产者作为一个练习。我通过curl提出以下POST请求:

curl-v'localhost:8081/actuator/bindings/producer-out-0'-H'内容类型:应用程序/json'-d'{"state":"STOPPED"}'

实际结果:查询返回204。生产者的状态(从GET /actuator/bindings/producer-out-0看到)现在是停止。然而,生产者仍在产生消息,这可以从日志记录和消费者关于该主题的活动中看到。

预期结果:我希望制作人停止生成消息。(我还尝试使用暂停状态,该状态也返回204,但错误日志表明此生产者无法暂停。)

我是不是误解了这个执行器的工作原理?当一个制作人被停止时,是否期望S. C. S.会继续对该制作人进行投票?我知道的唯一留档在这里,但据我所知,它没有回答我的问题。

背景:

我使用的是SpringBootStarter父版本2.5.3,并将StarterWeb和StarterActuator列为依赖项。我想我没有遗漏任何东西。

这是生产者/消费者对。正如你所看到的,我使用的是一个可调查的供应商。

@Configuration
@Profile("numbers")
public class NumberHandlers {
  private static final Logger LOGGER = LoggerFactory.getLogger(NumberHandlers.class);

  @Bean
  public Supplier<Integer> producer() {
    // Needed an effectively-final mutable integer. Side-bar comments welcome. :P
    var counter = new AtomicInteger();
    return () -> {
      var n = counter.getAndIncrement();
      LOGGER.info("Producing number: " + n);
      return n;
    };
  }

  @Bean
  public Consumer<Integer> consumer() {
    return it -> LOGGER.info("Consuming number: " + it);
  }
}

当我传入编号配置文件时,这些都处于活动状态。我的配置如下。

application.yml:

server:
  port: 8081
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: ${env.kafka.bootstrapservers:localhost}
management:
  endpoints:
    web:
      exposure:
        include: 'bindings'

...application-numbers.yml:

spring:
  cloud:
    stream:
      poller:
        fixedDelay: 5000
      bindings:
        producer-out-0:
          destination: numbers-raw
          producer:
            partitionCount: 3
        consumer-in-0:
          destination: numbers-raw
      kafka:
        bindings:
          producer-out-0:
            producer:
              topic.properties:
                # These look weird because they're done as an exercise.
                retention.bytes: 10000
                retention.ms: 172800000
    function:
      definition: producer;consumer

我正在本地主机环境中使用docker compose kafka和zookeeper在主机网络上进行测试。

谢谢!

共有1个答案

呼延承平
2023-03-14

当前不支持生产者绑定的生命周期控制,仅支持使用者绑定。

 类似资料:
  • 我正在使用最新的JMeter 3.3运行最新的Selenium WebDriver测试,一切似乎都很好, 我只有1个线程配置了1秒的爬升和1个循环计数,但是在webdriver执行完成后,JMeter并没有停止整个测试执行,而是无限期地继续执行测试,我不确定为什么会发生这种情况,我正在使用chromedriver,我所做的所有配置都是按照JMeter文档进行的,我还添加了命令位于脚本末尾。请参考附

  • 问题内容: 我已经编写了一个包含循环的“ Hangman”游戏,不断询问用户字母,直到他们解决单词或耗尽生命。奇怪的是,在满足条件时循环不会停止。 这是包含循环的游戏逻辑的实现: 可变结果是从方法中获取返回值。这是实现。 难道是我返回导致循环不断循环的值的方式吗?我这样声明了这些值: 问题答案: 逻辑上,根据德摩根定律, 是相同的 这永远是一个真实的表达。 条件可能应该是 当应用与上述相同的法律时

  • 我通过创建固定数量的线程来使用执行器服务来进行HTTP GET数据检索。 当Tomcat停止时,我们会出现以下错误: 严重:web应用程序[/viewer]似乎已启动名为[ThreadExecutor_51616156]的线程,但未能停止该线程。这很可能会造成内存泄漏。 这是真的吗?在没有这些服务错误的情况下,如何正确停止tomcat。

  • 问题内容: 我正在尝试使用Linux下的ProcessBuilder类将mp3文件解码为wav文件。由于某些原因,该过程不会停止,因此我必须手动取消它。 有人可以给我一个提示。我认为引用的代码很容易重现: jstack的输出 问题答案: 您需要清空进程的输出(通过)和错误(通过)流,否则可能会阻塞。 引用过程文档: 由于某些本机平台仅为标准输入和输出流提供有限的缓冲区大小,因此未能及时写入子流程的

  • 我使用Drools6.1.0.final和一个无状态会话来激发所有规则。 在我的.drl文件中有以下两个简单的规则。 我想这样做:如果第一个规则为true,则不执行任何其他规则,并从。drl文件退出。 我试着如下: > 试图在第一个规则的Then块中抛出运行时异常,但由于NullPointerException的原因,它仍然会在第二个规则中失败。 添加了drools.halt()和kcontext

  • 问题内容: 我有一个布尔变量来控制服务器的执行(启动/停止): private boolean ecoute = true; 这是我的课: 当我单击按钮将变量传递给false时,我的线程退出无限循环,什么也没发生: 我也换了 新东西… 有什么建议吗? 问题答案: 您的ServerSocket是导致循环不终止的原因。即使’ecoute’为假,server.accept()也会阻塞,直到满足以下两个条