我正在使用Spring Cloud Stream和Kafka Binder批量消费来自一个Kafka主题的消息。我正在尝试实现一个错误处理机制。根据我的理解,我不能在批处理模式下使用Spring Cloud Stream的< code>enableDLQ属性。 我找到了和,以重试并从spring-kafka文档发送失败消息。但我无法理解如何按照功能编程标准将记录发送到自定义DLQ主题。我看到的所有
我遇到了一个问题,我需要将一个现有的主题(< code>source)重新分区到一个新的主题(< code>target),这个新主题具有更大的分区数量(是以前分区数量的倍数)。 主题是使用Kafka Binder使用Spring Cloud Stream写入的。主题正在使用KStreams应用程序写入。 < code>source主题中的记录基于标题进行分区,其中< code>key=null。
我有一个使用Kafka Streams API的应用程序。我在当地工作时没有问题。我想连接到远程Kafka代理进行阶段测试。远程Kafka代理设置为使用GSSAPI sasl机制并使用Kerberos。我运行用java编写的Streams应用程序时出错。在我查找错误消息后,我找到了答案,但仍然有问题。 错误消息;获取相关id为3的元数据时出错:{[APPID]-KTABLE-AGGREGATE-S
我打算将StateRestoreListener与Spring Cloud Kafka Streams绑定器一起使用。我需要监视应用程序的容错状态存储的恢复进度。汇流中有一个例子https://docs.confluent.io/current/streams/monitoring.html#streams-监控运行时状态。 为了观察所有状态存储的恢复,您需要为应用程序提供 org.apache.
我使用的是Spring Cloud Stream Kafka,但我有一个特殊的服务,它有一个复杂的启动依赖(长时间运行),可能非常脆弱,所以我希望延迟Kafka(消费者)绑定,直到确认成功初始化。 我怎样才能做到这一点?
我已经配置了一个基于Spring启动的应用程序与Spring云流。我试图在KStream上工作,但我不断得到错误"java.lang.IllegalArgumentExc的:方法必须是声明性的"。有人能帮我了解如何配置这个吗?我查找了StreamListener留档,但我无法让它工作。 https://docs.spring.io/spring-cloud-stream/docs/Elmhurst
参考这个解决方案,我的Spring云流应用程序.yml文件具有以下配置: 在我的主应用程序中,类注释为@EnableB 问题是,在应用程序第一次启动时,状态存储尚未完全填充,也尚未准备就绪(例如空状态存储),但消息仍然更早到达,并且正在由Kafka流拓扑处理,结果出乎意料。 我们如何确保在第一次启动应用程序时,在@StreamListener中定义的任何流处理拓扑开始处理传入消息之前,所有或特定(
这看起来是个不错的资源。但是我甚至不能成功地运行官方的领事docker映像(在服务器或代理模式下)。我可以让这个工作(这里用)。但如果可能的话,我想用官方形象。 有没有人拿到过与Kafka和动物园管理员一起工作的官方领事形象? A)首先,我想让它在docker-compose中工作。这是我的docker-compose.yml文件,我试图让Kafka使用Consul来发现Zookeeper节点。
我有一个运行DCOS的小集群。我能够成功安装Kafka遵循本指南。跑步 谢谢 AJ
我正在创建一个Kafka Streams应用程序,我的主题数据来自Protobuf。我们可以为此创建Java代码绑定。然而,我正在努力使用正确的serde来使用来自主题的数据。有人能告诉我我做错了什么吗。 以下是我使用的属性定义: 我的Serde课 这就是我创建KStream和KTable实例的方式: 然而,我得到以下错误: 组织。阿帕奇。Kafka。溪流。错误。StreamsException:
我知道kafka将一个主题的数据安排在许多分区上,一个消费者组中的消费者被分配到不同的分区,从那里他们可以接收数据: 我的问题是: 术语,它们是由主机/IP标识的,还是由客户端连接标识的? 换句话说,如果我启动两个线程或进程,使用相同的消费者组运行相同的Kafka客户端代码,它们被认为是一个消费者还是两个消费者?
我有一个场景,我已经在不同的节点上部署了4个Kafka消费者实例。我的主题有4个分区。现在,我想配置消费者,使他们都从主题的不同分区获取。 我知道一个事实,如果消费者来自同一个消费者组,他们会确保分区被平分。但在我的情况下,他们不在同一组。
我将下面的数据发布到kafka并通过Spring集成通道接收并转换为Log对象,我如何使用Spring集成转换器将下面的数据转换为Log对象?感谢这里的任何帮助 '日志(客户端键=字符串,有效负载=字符串)” 这是通道适配器代码 当我尝试使用下面的方法在服务激活器中进行转换时 它的失败 com.fasterxml.jackson.core.JsonParseException: 无法识别的令牌“日
我们正在使用与Kafka消费者和生产者Spring。我们正在生成大小为905字节的消息。我们正在序列化消息,并试图为下一个使用者反序列化它。 消息有效负载类示例: Application.Properties 当我们接受字符串格式的消息负载时,Consumer工作得很好,但当我们将Consumer中的负载反序列化为对象时,我们面临着问题。下面的错误被抛出相同