https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.M3/reference/html/spring-cloud-stream-binder-kafka.html#_programming_model显示了一个示例,其中可以使用属性设置输入主题。 现在我想使用依赖注入,例如。 启动应用
在使用Kafka的Spring Boot中,我可以如下设置ConsumerFactory的属性: } 使用Kafka Streams,我可以在代码中设置属性,如下所示: 使用Spring Cloud Streams和Kafka Streams时,所有属性似乎仅通过应用程序输入。属性或应用程序。资源文件夹中的yml文件,如 在将Spring Cloud Streams与Kafka Streams一起
我在Spring Boot应用程序上工作,它使用Spring Cloud Stream与Kafka集成。 我需要以编程方式暂停接收来自Kafka的消息。我知道通过执行器endpoint(文档)管理绑定生命周期的可能性。所以我可以自动装配并使用它的公共方法。但在我看来有点奇怪... 是否有更好的方法以编程方式管理绑定的生命周期?
我正在尝试实现一个PollableConsumer,当我在SpringBoot应用程序中遇到endpoint时,它会在特定条件下开始轮询来自Kafka的消息。 在特定条件下,我尝试了多种触发民意调查的方法,但显然,只有当它不断地从Kafka主题进行民意调查时,它才会起作用。(就像spring cloud stream文档中的所有示例一样) 我正在寻找这样的东西: 当我碰到这样的endpoint时可
我正在使用Spring Cloud Stream制作Kafka流。在消息处理应用程序中,可能会产生错误。因此,不应再次提交和重试消息。 我的申请方法- 这里,如果DAO insert方法失败,则不应将消息发布到输出主题,并应重试处理同一消息。 我们如何配置kafka streams binder来执行此操作?。非常感谢您的帮助。
曾发表过多篇文章,但大多数都与处理错误消息有关,而不是处理过程中的异常处理。 我想知道如何处理流应用程序接收到的消息,并且在处理消息时出现异常?异常可能是由于多种原因造成的,如网络故障、RuntimeException等。, 有人能提出正确的方法吗?我应该使用setUncaughtExceptionHandler吗?还是有更好的方法
我没有看到使用执行器的kafka消费者/生产者指标 /actuator/metrics.只显示了基本的jvm指标。 我的应用程序工作正常,绑定看起来也可以,在 /actuator/bindings.可见 下面是我的流配置 下面是我的配置。yml公司 我使用的是SpringBoot 2.2.2。发布,SpringCloud Hoxton。SR4和spring cloud stream活页夹kafka
Java:OpenJdk 11Kafka:2.2.0Kafka流库:2.3.0 我试图在docker容器中部署我的Kafka streams应用程序,但在尝试创建带有TopicAuthorizationException的内部状态存储时失败。它在本地运行良好。本地和服务器上的主要区别在于,它连接到部署了Kafka的服务器,并使用常见的Kerberos身份验证进行身份验证。我无法理解身份验证与本地商
我想移动在下-这可能吗?我想到了类似于或,但它不起作用。
我正在尝试(单元)测试使用Kafka DSL的Spring Cloud Stream Kafka处理器,但收到以下错误“无法建立到节点1的连接。代理可能不可用。”。此外,测试不会停止。我尝试了EmbeddedKafka和TestBinder,但我有同样的行为。我试图从Spring Cloud团队(有效)给出的响应开始,我将应用程序改编为使用Kafka DSL,并将测试类保持原样。EmbeddedK
我想在生成一条发送给Kafka主题的消息后,获取偏移量和分区信息。我通读了spring cloud stream kafka绑定文档,发现这可以通过fecting RECORD\u元数据kafka头来实现。 来自Spring文档:(https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.R
我需要打印/记录/存储处理消息的kafka分区和偏移量。我如何才能做到这一点?我使用StreamBridge从制作人那里发送消息,还使用功能性spring kafka streams方法
我的理解是spring-kafka是为了与Kafka Client API交互而创建的,后来,spring-cloud-stream项目是为了“构建与共享消息系统连接的高度可扩展的事件驱动微服务”而创建的,该项目包括几个绑定器,其中一个是允许与Kafka Stream API交互的绑定器: 所以我很清楚,如果我想与Kafka流API交互,我将使用Spring-Cloud-Stream方法和适当的绑
我正在使用Spring云流和Kafka绑定器使用SASL连接到Kafka集群。SASL配置如下所示: 我想以编程方式/在运行时更新用户名和密码,如何在Spring Cloud Stream中使用Spring Kafka binders做到这一点? 旁注:使用BinderFactory,我可以在其hashmap中看到这些配置,但我想知道如何在运行时更新配置,以便这些更改也反映在连接中?
我们有一个使用Kafka的Spring-Cloud-Stream应用程序。要求是在生产者端,消息列表需要放在事务的主题中。同一应用中的消息没有消费者。当我使用spring启动事务时。云流动Kafka。粘合剂交易事务id前缀,我面临的错误是调度程序没有订阅服务器,并且从主题获得的分区总数少于配置的事务。应用程序无法在事务模式下获取主题的分区。你能告诉我是否遗漏了什么吗。明天我会发布详细的日志。 谢啦