我正在尝试使用Spring Cloud Stream处理发送到Azure Event Hub实例的消息。这些消息应该路由到Kafka集群上运行时根据消息内容确定的租户特定主题。出于开发目的,我通过Docker在本地运行Kafka。我对配置时未知的绑定进行了一些研究,发现动态目标解析可能正是我在这个场景中所需要的。 然而,让我的解决方案工作的唯一方法是使用。我宁愿使用动态目标标头,这样处理器就可以被
Kafka版本:0.10.2.1,
我遇到的问题是使用Spring Cloud数据流(SCDF)通过环境移动数据以将数据移动到正确的位置。以下是情况: 我 有一个Kafka的生产版本,我只能订阅(只读)。(3 节点群集:节点 1:9092,节点 2:9092,节点 3:9092) 我有一个在 Kubernetes 集群中运行的 SCDF 实例,该集群使用 RabbitMQ 主干(全部在一台服务器上) 我有一个 Node Kafka
我在Spring云数据流流中遇到 HTTP 源应用程序的问题。 当我尝试将具有巨大体量的 HTTP 请求发布到 HTTP endpoint时,就会发生这种情况。 我尝试在部署中设置以下属性,但没有帮助: 春云启动器HTTP Kafka源码app使用的版本是3.1.1 有人知道如何解决这个问题吗? 这可以以如下所示的独立方式进行最低限度的复制 从https://mvnrepository.com/a
是否可以为Apache Kafka的Spring Cloud DataFlow配置身份验证?在哪里可以看到示例? 谢谢你。
我正试图使用Spring Cloud数据流,通过bridge应用程序将两个Kafka集群(本质上是一个奇特的MirrorMaker实例)连接起来。如文档中所述,我定义了两个绑定器。Kafka-qa1应该是默认的,kafka-qa2可以在定义或部署属性中作为输出绑定器提供,例如:app . bridge . spring . cloud . stream . bindings . output .
我有一个常见的任务问题,我可以找到任何解决方案或帮助(也许我需要传递一些属性来工作?)我使用本地服务器1.3.0.M2并创建简单的流 在日志中,我得到了这个: 2017-09-28 12:31:00.644 信息 5156 --- [ -C-1] o.. a.k.c.c.internals.AbstractCoordinator : 成功加入第 1 代的组测试 2017-09-28 12:31:0
对于我的使用,它创建了额外的桥接实例和另外两个 kafka 主题,这会导致数据在 so - 之间流动的延迟问题 - 创建数据流的用例 1)对于新记录流应该如此= 2)对于更新记录流程应如此= 问题: 1)这里数据将从 so - 这里数据使用4个队列从源流向接收器,与源数据流相比,延迟问题- 这将创建so和foo作为额外的队列,这将消耗我的资源和硬件。 2)这也创建了额外bridge实例,我可以实时
原生 Kafka API 允许我们使用 StreamsBuilder 创建和添加状态存储: 我想使用Spring Cloud Streams做同样的事情,但无法弄清楚访问以添加商店的方法。 我已尝试检索文档中所述的,希望可以从中获取对象,但该bean似乎不可用: org.springframework.beans.factory.异常:没有名为流构建器进程的bean可用 无论如何,我什至不确定这是
我刚刚意识到,一旦在 Kafka 主题上收到的消息会丢失标题。有没有办法在将消息发布到 Kafka 主题然后在消费者处阅读之前向消息添加标头?我正在使用Java 11,Spring Cloud Hoxton.SR6,Spring 2.2.4,kafka_2.13-2.6.0。提前感谢!
我正在为kafka设置一个架构注册表服务器。我已经使用了融合模式注册表,一切都很好,但是后来我看到,你可以用更少的麻烦设置一个默认的,Spring的。所以我做了,但我有点惊讶,它似乎更难控制模式,它在命名方面做了奇怪的事情(EntityCamelCase到entitycamelCase),没有像融合一样分离x值和x键。模式的版本会自动颠簸,我甚至不知道兼容模式是什么。 所以我想找到更多的东西,但是
在使用ErrorHandlingDeserializer处理Avro组合的错误时,我无法发布到Dlq主题。以下是发布时的错误。 主题Topic_DLT在60000毫秒后不在元数据中。错误KafkaConsumerDestination{consumerDestination Name='Topic‘,partitions=6,dlqName='TOIC_DLT‘}。container-0-C-1
Kafka 服务器和客户端 JAR 移至最新库:0.10.0.1 我的消费者和生产者代码使用如上所述的最新kafka jars,但仍然使用旧的消费者API(0 . 8 . 2)。 我在调用commit offset时在消费者端遇到问题。 kafka服务器端配置: 以下 Kafka 消费者的配置: 要创建消费者,我使用以下api: 和提交调用 在从 Kafka 读取消息时,我们使用以下方法来处理超时
我创建了以批处理方式接收消息的ConsumerConfig: Spring启动配置: 侦听器类 : 我在处理消息后使用手动确认。 我找到了一些调试日志: 在上面的调试日志中,***获取偏移量发生在偏移量提交之前,该偏移量未提交,因此它返回offset_OUT_OF_RANGE,之后使用者无法接收任何消息。是否有任何方法处理使用者代码中的此错误,或如何仅在提交后获取偏移量****
我有两个Kafka监听器组件,每个组件监听不同的主题并期待不同的有效负载。我的问题是,我可以对两者使用相同的客户端id吗?还是必须使用不同的客户端id?如果客户端id必须不同,我想了解一个可以有效使用客户端id的用例。