我已经把Kafka和博士后的成绩记录下来了。我使用JDBC接收器连接器将数据从Kafka主题加载到Postgres表。首先,我用“AVRO”值格式创建一个主题和一个主题上方的流。
CREATE STREAM TEST01 (ROWKEY VARCHAR KEY, COL1 INT, COL2 VARCHAR)
WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');
以下是创建接收器连接器的代码:
curl -X PUT http://localhost:8083/connectors/sink-jdbc-postgre-01/config \
-H "Content-Type: application/json" -d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:postgresql://postgres:5432/",
"topics" : "test01",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"value.converter" : "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"connection.user" : "postgres",
"connection.password" : "********",
"auto.create" : true,
"auto.evolve" : true,
"insert.mode" : "insert",
"pk.mode" : "record_key",
"pk.fields" : "MESSAGE_KEY"
}'
然后,我使用\dt
命令检查Postgres是否有来自Kafka的数据,它返回以下信息:未找到任何关系
然后我检查kafka connect日志,它返回以下结果:
[2021-03-30 10:05:07,546] INFO Attempting to open connection #2 to PostgreSql (io.confluent.connect.jdbc.util.CachedConnectionProvider)
connect | [2021-03-30 10:05:07,577] INFO Unable to connect to database on attempt 2/3. Will retry in 10000 ms. (io.confluent.connect.jdbc.util.CachedConnectionProvider)
connect | org.postgresql.util.PSQLException: The connection attempt failed.
connect | at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:296)
connect | at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
connect | at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:211)
connect | at org.postgresql.Driver.makeConnection(Driver.java:459)
connect | at org.postgresql.Driver.connect(Driver.java:261)
connect | at java.sql.DriverManager.getConnection(DriverManager.java:664)
connect | at java.sql.DriverManager.getConnection(DriverManager.java:208)
connect | at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:224)
connect | at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:93)
connect | at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:62)
connect | at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:56)
connect | at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
connect | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect | at java.lang.Thread.run(Thread.java:748)
connect | Caused by: java.net.UnknownHostException: postgres
connect | at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
connect | at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
connect | at java.net.Socket.connect(Socket.java:589)
connect | at org.postgresql.core.PGStream.<init>(PGStream.java:81)
connect | at org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:92)
connect | at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:196)
connect | ... 22 more
connect | [2021-03-30 10:05:17,578] INFO Attempting to open connection #3 to PostgreSql (io.confluent.connect.jdbc.util.CachedConnectionProvider)
connect | [2021-03-30 10:05:17,732] ERROR WorkerSinkTask{id=sink-jdbc-postgre-01-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: org.postgresql.util.PSQLException: The connection attempt failed. (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect | org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: The connection attempt failed.
connect | at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:69)
connect | at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:56)
connect | at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
connect | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect | at java.lang.Thread.run(Thread.java:748)
connect | Caused by: org.postgresql.util.PSQLException: The connection attempt failed.
connect | at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:296)
connect | at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
connect | at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:211)
connect | at org.postgresql.Driver.makeConnection(Driver.java:459)
connect | at org.postgresql.Driver.connect(Driver.java:261)
connect | at java.sql.DriverManager.getConnection(DriverManager.java:664)
connect | at java.sql.DriverManager.getConnection(DriverManager.java:208)
connect | at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:224)
connect | at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:93)
connect | at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:62)
connect | ... 13 more
connect | Caused by: java.net.UnknownHostException: postgres
connect | at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
connect | at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
connect | at java.net.Socket.connect(Socket.java:589)
connect | at org.postgresql.core.PGStream.<init>(PGStream.java:81)
connect | at org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:92)
connect | at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:196)
connect | ... 22 more
connect | [2021-03-30 10:05:17,734] ERROR WorkerSinkTask{id=sink-jdbc-postgre-01-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
我想问题可能是缺少postgresql连接器。jar文件位于usr/share/java/kafka-connect-jdbc中,但它在这里。
root@connect:/usr/share/java/kafka-connect-jdbc# ls -l
total 8412
-rw-r--r-- 1 root root 17555 Apr 18 2020 common-utils-5.5.0.jar
-rw-r--r-- 1 root root 317816 Apr 18 2020 jtds-1.3.1.jar
-rw-r--r-- 1 root root 230113 Apr 18 2020 kafka-connect-jdbc-5.5.0.jar
-rw-r--r-- 1 root root 927447 Apr 18 2020 postgresql-42.2.10.jar
-rw-r--r-- 1 root root 41139 Apr 18 2020 slf4j-api-1.7.26.jar
-rw-r--r-- 1 root root 7064881 Apr 18 2020 sqlite-jdbc-3.25.2.jar
这个问题的解决方案是什么?
多亏了@Robin Moffatt教程和@OneCricketeer技巧,我找到了解决这个问题的方法。Kafka连接和Postgres应该在一个docker组合中。yml文件。我附上docker compose的代码。yml如下。希望这对那些面临同样问题的人有所帮助:
---
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:5.5.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
schema-registry:
image: confluentinc/cp-schema-registry:5.5.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
connect:
image: cnfldemos/cp-server-connect-datagen:0.3.2-5.5.0
hostname: connect
container_name: connect
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.5.0.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
ksqldb-server:
image: confluentinc/cp-ksqldb-server:5.5.0
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
- connect
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:5.5.0
container_name: ksqldb-cli
depends_on:
- broker
- connect
- ksqldb-server
entrypoint: /bin/sh
tty: true
rest-proxy:
image: confluentinc/cp-kafka-rest:5.5.0
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
postgres:
image: postgres
restart: always
environment:
POSTGRES_PASSWORD: postgres
ports:
- 5432:5432
大家好,我正在使用debezium捕获Mongo中的更改,并将它们推送到mysql中。我正在使用以下链接https://github.com/debezium/debezium-examples/tree/master/unwrap-mongodb-smt我正在把postgres数据库替换为mysql数据库,但我无法这样做。 这是我修改后的jdbc-sink.json,我使用mysql url连接
我试图使用pyspark将每日批次的数据发送到Kafka主题,但我当前收到以下错误: Traceback(最近的最后一次调用): File", line 5, in File"/usr/local/rms/lib/hdp26_c5000/park2/python/pyspark/sql/readwriter.py", line 548, in保存自己。_jwrite.save()File"/usr
当我在SBT之上运行时,我会得到一些异常/错误:
我无法将数据加载到表中。我有类,其名称为、等。我想将、插入到TextField上的表播放器中。 我正在执行与下面所示完全相同的操作:http://docs.oracle.com/javase/8/javafx/user-interface-tutorial/table-view.htm#cjagaaee 但我不能让它起作用。有人能帮我吗?
下面的json数据示例 下面的错误消息 线程“main”org.apache.spark.sql.analysisException中出现异常:未能找到数据源:Kafka。请按照“结构化流+Kafka集成指南”的部署部分部署应用程序。;在org.apache.spark.sql.execution.datasources.datasource$.lookupdatasource(datasourc
无法通过jupyter笔记本使用pyspark将数据写入hive。 给我下面的错误 Py4JJavaError:调用o99.saveAsTable时发生错误。:org.apache.spark.sql.分析异常:java.lang.运行时异常:java.lang.运行时异常:无法实例化org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreCl