kafka jdbc接收器连接器是否支持将其使用的内容写入不同的主题。我正在寻找一种传递机制,如下图所示。如果没有,我可以链接一个接收器和源(从接收器写的地方读取),但我认为这不会有那么好的性能。也许我可以修改现有的接收器连接器来实现这一点?
我正在测试使用Apache Camel在Kafka(0.8.2.1)上发送消息的simple producer。我在Camel中使用java DSL创建了endpoint。 现在我想使用Apache Camel提供的ProducerTempalte在kafka上发送消息。但我在运行该程序时得到以下错误注意:Zookeeper和Kafka已启动并可以使用Kafka控制台生成/消费消息。 我猜这些属性
我是Quarkus的新手。我正在尝试使用quarkus reactive编写一个RESTendpoint,它接收输入,进行一些验证,将输入转换为列表,然后将消息写入kafka。我的理解是,将所有内容转换为Uni/Multi,将导致以异步方式在I/O线程上执行。在intelliJ日志中,我可以看到代码在executor线程中以顺序方式执行。Kafka写入在其自己的网络线程中按顺序进行,这会增加延迟。
我正在尝试制作一个定制的spring cloud stream活页夹,但它无法自动注册: 活页夹实现: 配置类: Spring活页夹文件: application.yml 我已经按照spring cloud stream的指导方针创建了一个custome活页夹,但这不起作用。此外,使用@Configuration创建绑定bean会禁用我在类路径上添加的kafka绑定。
我尝试在使用邮件时进行以下错误处理: 如果出现序列化错误:在DLT中发送消息 我拥有的(2.5.1Kafka客户端的Spring kafka 2.5.5版本)如下: 现在,如果我发送不可序列化的消息,我的消息将不重试地发送到DLT- 在我的中,我有一个,捕获并重新捕获。 我应该没有重试,但我得到了2个重试,每个20秒(而不是10秒?),并在2次重试后向DLT发送了一条消息。 如果我删除errorH
我试图用Spring的云流Kafka流来阅读Kafka。然后我在一分钟的时间窗口内汇总事件,并将其转移到不同的主题。然后,我需要从主题中读取聚合事件,并将其写入另一个主题,同时将该主题与另一个Kafka集群中的不同主题绑定。但我得到了下面的例外。 我按照链接中的示例,尝试了下面的代码。 应用属性 哈格。JAVA Transporter.java EGSRC处理器。JAVA
我有一个Kafka消费者。如果消费者未能阅读任何信息,我需要将其发送到死信主题。我使用的是Spring cloud Kafka stream,我在这样的配置中启用了DLQ。 但我的常规消费者话题与DLQ话题不同。有可能做到这一点吗?如果是,你能指导我完成配置吗?
我正在使用Spring云流Kafka活页夹编写Kafka生产者和消费者。我想在生产者和消费者中访问以下信息 a) 主题 b)分区 b)偏移 我确实检查了文档,但并没有真正找到在哪里完成这项工作。我在文档中看到的只是指定一个作为生产者/消费者配置的一部分。有人能告诉我这到底是怎么做到的吗?
你好,我一直在使用Spring Kafka活页夹作为消费者。通过查看日志,我能够连接到主题,尽管我不确定它为什么不处理来自制作人的任何消息。 你知道可能遗漏了什么吗?非常感谢。 聚甲醛 应用程序YML 消费者阶层 侦听器类 日志 从日志中可以看到,它能够连接到主题。虽然我不确定为什么我没有收到来自生产者的任何消息。是因为分区被撤销吗?这与为什么我没有收到任何消息有关吗?生产者来自第三方,他需要做些
试图开发一个Spring云应用程序,使用kafka Kafka使用的配置是: 运行应用程序,我可以看到这些配置被选中 问题是以下错误消息: 如何配置这个“AdminClient”并将正确的主机/ip信息传递给它?查看了Spring Cloud Stream Kafka活页夹参考指南,但找不到答案。
我试图将消息设置为示例留档。我所有的服务和类都装饰有组件或服务注释,但我仍然得到以下例外: org.springframework.messaging.MessageDeliveryException:Dispatcher没有频道unknown.channel.name的订阅者 应用程序设置增加了:cloud:stream:kafka:binder:brokers:localhost zk nod
Kafka SSL对等方未经身份验证,在客户端连接代理SASL端口时返回匿名错误,它允许在明文或SSL端口上连接。 我在启用SSL的windows系统中使用了kafka 2.2.0,kafka代理明文在9092上运行,SSL在9093上运行。最重要的是,将SASL配置为紧急停堆机制,侦听器端口为9094,当producer作为kafka控制台producer运行时,出现问题摘要中提到的错误。bat
我正在尝试设置代理间SSL(而非客户端)身份验证,并不断看到以下错误: 我的server.properties是:
我知道生产者/消费者需要与经纪人交谈以了解分区的领导者。经纪人与zk交谈以告诉他们加入了集群。 是真的吗 经纪人从zk知道谁是给定分区的负责人 zk发现经纪人离开/死亡。然后重新选举领导人,并向所有经纪人发送新的领导人信息 问题: 为什么我们需要经纪人相互沟通?这只是为了让tehy可以移动分区,或者他们也可以互相查询元数据。如果是这样,元数据交换的例子是什么
我在我的本地机器上设置了一个单节点Kafka Docker容器,就像在融合留档中描述的那样(步骤2-3)。 此外,我还公开了Zookeeper的端口2181和Kafka的端口9092,以便能够从本地机器上运行的客户端连接到它们: 问题:当我试图从主机连接到Kafka时,连接失败,因为它无法解析地址:Kafka:9092。 这是我的Java代码: 例外情况: 问:如何连接到在Docker中运行的Ka