我正忙于学习 kafka,特别是使用接收器连接器 (jdbc) 将数据从 kafka 主题发送到 mysql 的下游元素。
我已经用jdbc连接器和mysqljdbc驱动程序从汇合的kafka connect基础映像构建了一个映像,在Dockerfile中如下:
FROM confluentinc/cp-kafka-connect-base
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.4.1
ENV MYSQL_DRIVER_VERSION 5.1.39
RUN curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-${MYSQL_DRIVER_VERSION}.tar.gz" \
| tar -xzf - -C /usr/share/java/kafka/ --strip-components=1 mysql-connector-java-5.1.39/mysql-connector-java-${MYSQL_DRIVER_VERSION}-bin.jar
老实说,我在文档中有点迷失了方向,我到了想要包含接收器配置的地步,但我不知道如何包含它或将其复制到何处。我已经创建了接收器配置文件,但不确定将其放在哪里。作为映像构建的一部分,还是在运行 kafka-connect 容器时?
这样做的最终目标是为此连接器创建 helm 部署,但我还没有。
当我了解到关于Kafka和Kafka连接的所有信息时,我将非常感谢任何帮助。
由于您在分布式模式下使用Kafka Connect(这通常是最佳选择),因此您将连接器配置作为REST调用传递。
这是一个示例JDBC接收器配置-您需要修改它以适应您的源主题、序列化等:
curl -X PUT http://localhost:8083/connectors/sink_postgres_foo_00/config -H "Content-Type: application/json" -d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://mysql-host:3306/",
"connection.user": "user",
"connection.password": "pw",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"tasks.max": "1",
"topics": "foo",
"auto.create": "true",
"auto.evolve":"true",
"pk.mode":"none"
}'
如果您想在容器实例化时传递连接器配置,您可以将其嵌入到开始命令中,该命令应该启动工作程序,等待它可用,然后传递配置。这里有一个例子。另请参见https://rmoff.net/2018/12/15/docker-tips-and-tricks-with-ksql-and-kafka/.
在标准/自定义kafkaconnect接收器中,我们如何指定它应该只使用来自kafka主题的read_comitted消息。我可以在这里看到配置,但看不到任何选项(除非这是默认行为)。谢了。https://docs . confluent . io/current/installation/configuration/connect/sink-connect-configs . html
我参考了以下链接来了解Kafka的HDFS连接https://docs.confluent.io/2.0.0/Connect/connect-hdfs/docs/index.html 我能够通过配置单元集成将数据从Kafka导出到HDFS。 现在我正尝试在Java程序的帮助下将avro记录写入Kafka 当我把Avro记录写到Kafka主题时,我在Connect中出现以下错误
我正在使用Confluent Kafka Docker镜像,特别是使用这个: https://github.com/confluentinc/cp-docker-images/tree/4.0.x/examples/cp-all-in-one 我想添加MySQL连接器,通过: 正在下载连接器的1.5.46版本(https://dev.mysql.com/get/Downloads/Connecto
事件适配器 事件活动 @override public void onBindViewHolder(@nonnull ViewHolder holder,final int position){
问题内容: 创建密钥后,是否可以向Python字典添加密钥?它似乎没有方法。 问题答案:
假设我有一个Christmas list对象,它携带一个用于圣诞礼物的ArrayList和另一个用于收件人的字符串。我对这个客户端有以下方法: 所以基本上,我试图在Hazelcast IMAP中初始化一个新的键/值对。我尝试使用christmaslistimap.put()和christmaslistimap.set(),但它们似乎都不起作用。至少,我需要知道如何添加一个新的键让IMap识别。