我正忙于学习 kafka,特别是使用接收器连接器 (jdbc) 将数据从 kafka 主题发送到 mysql 的下游元素。 我已经用jdbc连接器和mysqljdbc驱动程序从汇合的kafka connect基础映像构建了一个映像,在Dockerfile中如下: 老实说,我在文档中有点迷失了方向,我到了想要包含接收器配置的地步,但我不知道如何包含它或将其复制到何处。我已经创建了接收器配置文件,但不
尝试在Ksqldb Server中嵌入Kafka连接器时,“卷”映射似乎不起作用 下面是我的docker文件 当我尝试使用连接器创建源时,我看到以下错误 { “error_code” : 500, “message” : “找不到任何实现连接器的类,并且其名称与io.debezium.connector.mysql.MySqlConnector匹配,可用的连接器是:..........}”} 我在
我正在尝试使用CockroachDB (v2.0.6)作为我的一个Kafka主题的接收器。 我找不到任何专门用于CockroachDB的Kafka连接器,所以我决定使用Confluent的jdbc sink连接器,因为CockroachDB支持postgreSQL语法。 我在Kafka Connect上使用的连接字符串如下 这基本上是我在现有工作的Postgres接收器连接器上所做的唯一更改。 不
我已经安装了Kafka在当地的Minikube通过使用Helm图表https://github.com/confluentinc/cp-helm-charts按照以下说明https://docs.confluent.io/current/installation/installing_cp/cp-helm-charts/docs/index.html如下: kafka_config.yaml 几乎
我正在尝试使用Kafka Connect Elasticsearch连接器,但没有成功。它正在崩溃,并出现以下错误: 我已经在kafka子文件夹中解压了插件的编译版本,并在connect-standalone.properties中有以下代码行: 我可以看到该文件夹中的各种连接器,但Kafka Connect不加载它们;但它确实加载了标准连接器,如下所示: 如何正确注册连接器?
下面是/etc/kafka/connect-MongoDB-source.properties中的MongoDB配置 但是低于误差 以独立模式运行连接器。 我在debezium-debezium-连接器-mongob-1.0.0/debezium-connector-mongodb-1.0.0.Final.jar 类路径的设置如下 使用插件路径,我看到它能够注册和加载所有必需的插件。 但最后还是同
我们已经成功地使用 mySQL - 使用 jdbc 独立连接器的 kafka 数据摄取,但现在在分布式模式下使用相同的连接器(作为 kafka 连接服务)时面临问题。 connect-distributed.properties档案- 我有我的连接器罐在这里- 我可以通过以这种方式运行脚本来运行独立模式- 但是当我尝试调用 REST API 来运行分布式模式连接器时,出现错误: 错误- 注意 -
我们试图在给定的节点上启动多个独立的kafka hdfs连接器。 对于每个连接器,我们分别将和设置为不同的端口和路径。 也是Kafka经纪人JMX港是@ 9999。 当我启动 kafka 独立连接器时,出现错误 错误:代理引发异常:java.rmi.server。ExportException:端口已在使用:9999;嵌套异常是:java.net。BindException:地址已在使用中(绑定失
这就是开发人员指南对动态连接器的描述https://docs.confluent.io/current/connect/devguide.html#dynamic-连接器 并非所有的连接器都有一组静态的分区,因此连接器实现还负责监控外部系统是否有任何可能需要重新配置的变化。例如,在JDBCSourceConnector示例中,连接器可能会为每个任务分配一组表。当创建一个新表时,它必须发现这一点,这
我用的是Spring boot 1.5.9.RELEASE和Spring cloud Edgware。跨微服务发布。 我使用注释绑定了一个消费者。注释将完成我使用事件的其余部分。 出现了一些手动配置主题名称和其他一些配置属性的需求,我希望在应用程序启动时覆盖application.properties中定义的一些消费者属性。 有什么直接的方法吗?
我已经实现了kafka流应用程序。假设流当前正在处理的对象的一个字段包含一个数字而不是一个字符串值。当前,当处理逻辑(如< code >)中出现异常时。transform()方法,整个流被终止,我的应用程序停止处理数据。 我想跳过此类无效记录并继续处理输入主题上可用的下一条记录。此外,我不想在我的流处理代码中实现任何 try-catch 语句。 为了实现这一点,我实现了,因此它返回枚举,以便生成新
如果我们在后台启动KafkaStream应用程序(比如Linux),有没有一种方法可以从外部向应用程序发出信号,从而启动优雅的关机?
我的任务是拆除一个开发环境,并从废品中重新设置它,以验证我们的CI-CD流程;唯一的问题是我搞砸了创建一个主题,因此Kafka Streams应用程序退出并出现错误。 我仔细研究了一下,发现了问题并纠正了它,但当我深入研究时,我遇到了另一个奇怪的小问题。 我实现了一个意外的异常处理器,如下所示: 问题是,如果应用程序抛出一个异常,因为一个主题错误时,KafkaStreams::c失去的是调用应用程
我正在尝试使用Kafka使用Spring Boot云流设置一个项目。我设法构建了一个简单的示例,其中侦听器从一个主题获取消息,并在处理后将输出发送到另一个主题。 我的侦听器和频道配置如下: 此示例的问题在于,当服务启动时,它不会检查主题中已存在的消息,它只处理启动后发送的那些消息。我对 Springboot 流和 kafka 很陌生,但对于我所读到的内容,这种行为可能与我正在使用的事实相对应。例如
我有一个包含多个Kafka作品的资源库。我想将其中一个流提取到它自己的存储库中。但是,我不确定如何处理那个流的消费群体。我的意思是:在新的存储库中,流将有一个不同的< code>application.id。据我理解,消费者组的名称是基于< code>application.id设置的。如果我简单地关闭旧流,对于每个主题的每个分区,新流将从第零个偏移量开始,而不是从旧流停止的偏移量开始。这将导致输