当前位置: 首页 > 知识库问答 >
问题:

冲刺云流Kafka流绑定器处理器应用程序卡住

范京
2023-03-14

我有以下Spring Cloud Stream Kafka Streams Binder 3. x应用程序:

当我通过这个应用程序运行X消息时,通过使用@SpringBoottest@Em申明Kafka从联调将它们发布到Topic1,点1和点2的消息计数是相等的,正如我所期望的那样。

当我使用连接到Kafka代理的实时应用程序做同样的事情时,点1和点2的计数仍然显着不同:Count1

Kafka工具显示< code >处理器2消费者在< code >主题2上有很大的滞后,并且该滞后保持不变(在我停止发布消息后不会改变)

处理器2由以下部分组成

    < li>flatTransform状态转换器 < li >聚合器 < li >其他下游步骤

测试和实时模式下的不同行为以及实时模式下的延迟没有下降的原因是什么?

我已经彻底比较了测试和实时应用程序中活动的所有应用程序属性值,它们完全等价。

在这两种情况下,所有主题只有一个分区。

共有1个答案

花玄裳
2023-03-14

在我的情况下,原因是Spring Cloud Stream应用程序自动创建的主题的默认7天保留设置。

我的输入流中的消息跨越了8年,我使用的是自定义TimestampExtractor。

在我手动将主题配置为较长的保留时间后,问题已解决:

/usr/bin/kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name topic2 --add-config retention.hours=87600

或者为整个Kafka代理设置log.retention.hours

 类似资料:
  • 我正在使用处理器 API 对状态存储进行一些低级处理。关键是我还需要在存储到商店后写入主题。如何在Spring Cloud Streams Kafka应用程序中完成?

  • 我知道这里之前有人问过这个问题:Kafka流并发? 但这对我来说很奇怪。根据文档(或者我可能遗漏了什么),每个分区都有一个任务,这意味着不同的处理器实例,每个任务由不同的线程执行。但是当我测试它的时候,我看到不同的线程可以得到不同的处理器实例。因此,如果你想在处理器中保持内存状态(老式的方式),你必须锁定? 线程ID:88 ID:c667e669-9023-494b-9345-236777e9df

  • 假设我希望实现与在spring-amqp函数周围使用时基本相同的功能: 在Rabbit绑定器中使用Spring Cloud Stream时,是否必须手动将消息访问到RabbitMQ? 如果是,如何实现?

  • 因此,我们的想法是从2015年开始对现有的meteor应用程序进行归档。该应用程序分为两部分(后端和前端)。我已经制作了一个巨大的bash脚本来处理所有旧的依赖项。。。软件依赖项。。。等等。我只需要运行脚本,我们就可以让应用程序运行了。但现在的想法是为该项目创建一个docker形象。我应该如何做到这一点?我应该创建一个空的docker映像并在那里运行脚本吗?。谢谢我是docker的新手。

  • 我正在开发一个应用程序,在该应用程序中,事件会导致spring data repository保存数据; 此代码可以引发各种异常,如DataIntegrityViolationException(运行时异常)。 处理此类异常和 生成带有导致此错误的有效负载的消息 例外, 允许生产者采取操作。

  • 我正在尝试使用Spring boot编写一个Kafka流处理器,但当消息产生到主题中时,它不会被调用。 主题消息有不同的类型,并且是Avro格式的。在模式注册表中使用Avro UNION注册模式。 这些是主题 application.yml我正在使用cp-all-in-one-community作为docker-file 但现在我得到以下错误: