我们目前在HDF(Hortonworks Dataflow)3.3.1上,它捆绑了Kafka 2.0.0,并且正在尝试使用分布式模式下的Kafka Connect,以推出一个Google Cloud PubSub接收器连接器。我们正在计划将一些元数据发回到Kafka主题中,并且需要将一个Kafka生产者集成到Sink任务Java代码的flush()函数中。
这是否会对Kafka Connect向Kafka提交补偿的过程产生负面影响(因为我们会在同花顺之前增加运行Kafka生产者的开销)。
此外,当接收器或源的连接器属性中没有指定引导服务器列表时,Kafka Connect如何从配置中获取引导服务器列表?我需要使用相同的引导服务器列表来启动生产者。当前,我正在更改接收器连接器的配置,将引导服务器列表添加为一个属性,并在连接器的Java代码中解析它。如果可能的话,我希望使用Kafka Connect worker属性中的bootstrap服务器列表。
帮个忙吧。
提前道谢。
需要将一个卡夫卡制作人集成到接收器任务Java代码的flush()函数中
SinkTask API中没有公开生成器实例...
这是否会对Kafka Connect向Kafka提交补偿的过程产生负面影响(因为我们会在同花顺之前增加运行Kafka生产者的开销)。
我是说,你可以添加任何你想要的代码。就负面影响而言,这取决于您在自己的基础设施上进行基准测试。显然,添加更多的阻塞代码会使其他进程总体上更慢
当接收器或源的连接器属性中未指定引导服务器列表时,Kafka Connect如何从配置中获取引导服务器列表?
汇和源都不是工人。查看connect-distributed.properties
如果可能的话,我希望使用Kafka Connect worker属性中的bootstrap服务器列表
这不可能.向接收器/源配置添加额外的属性是唯一的方法。(尽管可以创建一个Kafka JIRA请求这样一个公开工作者配置的特性)
我有一组Kafka代理实例作为集群运行。我有一个客户正在生产数据给Kafka: 当我们使用tcpdump进行监控时,我可以看到只有到broker1和broker2的连接被建立,而对于broker3,没有来自我的生产者的连接。我有一个只有一个分区的单一主题。 我的问题是: > 为什么在我的情况下,我无法连接到broker3?或者至少我的网络监控没有显示我的制作人与broker3建立了连接? 如果我能
kafka-python(1.0.0)在连接到代理时抛出错误。同时 /usr/bin/kafka-console-producer和 /usr/bin/kafka-console-consumer正常工作。 Python应用程序过去也运行良好,但是在动物园管理员重新启动后,它不再能够连接。 我使用文档中的裸露骨骼示例: 我收到这个错误: 单步通过( /usr/lib/python2.6/site-
因此,我想知道做这件事的步骤。 我的理想是由kafka Connect创建与表相对应的主题,然后再由我声明(使用KSQL)创建视图。 虽然我在这里描述的一开始听起来是可行的,但我对数据有一个问题主题中数据的结构(模式)。问题似乎是,我可能必须做一个额外的步骤,但不知道它是否可以避免或实际上是必要的。
我正在尝试使用kafka-avro-convore-生产者发布一条具有键(带有模式)和值(带有模式)的消息。kafka环境(kafka的conFluent 6.2.0版本、连接、zoomaster、模式注册表)都正确启动,我可以确认我的连接器已安装。问题是当我发送消息时,我的Sink连接器失败并出现我无法诊断的错误。 感谢您的帮助: 我生成一条AVRO消息,如下所示: 并在连接日志中接收以下错误:
我有一个kafka主题,有200万条消息,我的刷新大小是100000,默认分区为分布式模式,有4个工作者,我可以看到数据在几秒钟内立即写入HDFS(10到15秒)。 我看到创建了一个+tmp目录和文件夹,并且每次触发一个新连接器时都会创建主题。 kafka connect的行为是每次都写得这么快,还是已经将数据存储在HDFS中,并根据连接器属性将其移动到主题目录? 我需要清楚这是怎么发生的。如果我
一、生产者发送消息的过程 首先介绍一下 Kafka 生产者发送消息的过程: Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。 接下来,数据被传给分区器。如果之前已经在 Prod