我想让我的 Spring 启动应用程序在启动时失败,如果它无法连接到 kafka 代理。我的应用程序仅向主题发布消息。我将这一行添加到我的属性文件中,但到目前为止没有运气。
我使用SpringKafka的KafkaTemplate以异步方式发送消息,并使用回调进行正确的错误处理。 此外,我已将 Kafka 生产者配置为具有最大重试次数 (MAX_INTEGER)。 然而,可能有一些与avro序列化相关的错误,但对于那些重试没有帮助。那么,我如何在不重试的情况下逃避这些错误,但对于其他与代理相关的问题,我想重试?
DeadLetterPublishingRecoverer需要单独的KafkaTemplate吗? 我有一个用于向Kafka发送消息的Kafka模板,然后我有一个带有SeekTo货币错误处理程序和DeadletterPublishingRecoverer的KafkaListnerContainer工厂,这反过来又要求我提供一个KafkaTempate。我真的需要这个另一个模板来处理dlq吗?还是我
我正在使用 kafka 和 spring boot,我需要将 JSON 对象发送到 kafka,关键是我能够将一个对象作为配置 KafkaTemplate 的 JSON 发送,但仅适用于此对象。 但是如果现在我想发送一个新的DTO对象呢?我是否必须声明一个新的
我实现了一个使用Spring Kafka的基本Spring Boot应用程序。我希望我的制作人在第一个<代码>之前连接到Kafka主题。send()被调用,但我找不到这样做的方法。这可能吗? 日志显示 KafkaTemplate 仅在我在 触发 方法后连接到 Kafka 主题:
我有一个Kafka监听器,它有两个注册的处理程序,每个处理程序监听相同主题但模式类型不同的消息。监听器使用< code>@SendTo注释将结果转发到另一个主题,EOS由< code>@Transactional启用。 根据文档: 为了支持@SendTo,必须为侦听器容器工厂提供用于发送回复的KafkaTemboard(在其回复模板属性中)。 Kafka模板是一个参数化的类型,需要提供它将要产生的
我们有一个非常简单的Kafka Consumer(v 2.6.2)。它是使用者组中唯一的使用者,并且该组是唯一一个阅读主题的组(有6个分区,其中有大约300万个事件)。Broker也是2.6.x版本 由于我们需要实现一个“只有一次”的场景,我们深入研究了一下,如果我们真的只使用一次写入主题的每个事件。不幸的是,我们发现:消费者有时会跳过一个偏移量,有时甚至会跳过一组分区的偏移量。 消费者除了记录之
我有四个Kafka流应用程序实例使用相同的应用程序id运行。所有输入主题都属于一个分区。为了实现可伸缩性,我通过一个具有多个分区的中间虚拟主题来传递它。我已经将< code>request.timeout.ms设置为4分钟。 Kafka 实例进入 ERROR 状态,而不会引发任何异常。很难弄清楚确切的问题是什么。有什么想法吗?
我们有一个Spring Boot Kafka Streams处理器。由于各种原因,我们可能会遇到需要进程启动和运行的情况,但是没有我们希望订阅的主题。在这种情况下,我们只希望进程“Hibernate”,因为其他活动/环境检查器依赖于它的运行。此外,它是RedHat OCP集群的一部分,我们不希望pod不断地执行崩溃退避循环。我完全理解,在使用有效主题重新启动之前,它永远不会真正做任何事情,但没关系
我无法使用Spring Kafka集成发布消息,尽管我的Kafka Java客户端工作正常。 Java代码在Windows上运行,Kafka在Linux上运行。 我得到以下错误
我使用的是< code>SpringXD,我的配置如下: Spring集成kafka 2.1.0.释放 kafka客户端0.10.0.1 Kafka0.10.x.x Spring-xd-1.3.1.释放 我的 xml 文件中有以下配置: 这是我用来启动/停止频道的Java类: 然后我创建了一个基本流来检查我发送到主题的一些消息是否通过 我检查了创建的文件,它包含我发送到 Kafka 主题的所有消息
我一直试图让入站SubscribableChannel和出站MessageChannel在我的spring boot应用程序中工作。 我已经成功设置了kafka频道并成功测试了它。 此外,我还创建了一个基本的spring-boot应用程序,用于测试从通道添加和接收内容。 我遇到的问题是,当我把相同的代码放在它所属的应用程序中时,消息似乎永远不会被发送或接收。通过调试很难确定发生了什么,但对我来说唯
我需要在Spring云流kafka活页夹错误处理场景的帮助。我的应用程序有一个java 8消费者,其绑定在application.yaml中指定。 application.yaml: 现在,我正在处理错误,有两个问题: > 我正在尝试手动包装消息的消费,而不是使用自动提交偏移设置为真。因此,当我将自动提交偏移设置设为假并测试错误场景时,会面临奇怪的行为,即每当抛出异常时,消息都会重试n次,即使在服
我们正在研究优化磁盘空间使用的选项。在这样做的时候,我们查看了 kafka 分区的保留磁盘空间。现在,错误主题使用与源主题相同的分区数。我们实际上并不需要分区错误主题,kafka 为每个分区分配了大量的空间。 所以问题是,是否可以使用最近引入的配置 设置较低的分区计数? 当我们在代码中插入一些较低的分区数时,我们面临一些问题,即消息无法移动到dlq,因为目标分区不匹配。我猜源消息的分区索引被重用以
我正在尝试使用启用批处理模式的spring cloud stream实现DLQ 但有一些疑问: > 如何使用属性配置键/值序列化程序-我的消息是String类型,但KafkaOperations使用的是ByteArraySerializer 在批处理中,有多个消息,但如果第一条消息失败,它会转到DLQ,但看不到下一条消息的处理。 要求-如果批处理失败,我只需要将该消息发送到DLQ,并且应该再次处理