当前位置: 首页 > 知识库问答 >
问题:

将数据从Kafka Topic推送到JSON中的PostgreSQL

皇甫乐
2023-03-14

更新后出错

[2019-07-29 12:52:23,301] INFO Initializing writer using SQL dialect: PostgreSqlDatabaseDialect (io.confluent.connect.jdbc.sink.JdbcSinkTask:57)
[2019-07-29 12:52:23,303] INFO WorkerSinkTask{id=sink-postgres-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:301)
[2019-07-29 12:52:23,367] WARN [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Error while fetching metadata with correlation id 2 : {kafkadad=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:1023)
[2019-07-29 12:52:23,368] INFO Cluster ID: _gRuX5-0SUu72wzy6PV0Ag (org.apache.kafka.clients.Metadata:365)
[2019-07-29 12:52:23,369] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Discovered group coordinator INTRIVMPIOT01.xpetize.local:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
[2019-07-29 12:52:23,372] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:459)
[2019-07-29 12:52:23,373] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-07-29 12:52:23,383] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-07-29 12:52:23,482] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:455)
[2019-07-29 12:52:23,486] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Setting newly assigned partitions: kafkadad-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:290)
[2019-07-29 12:52:23,501] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Resetting offset for partition kafkadad-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
[2019-07-29 12:52:35,338] ERROR WorkerSinkTask{id=sink-postgres-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:701)
        at org.apache.kafka.connect.json.JsonConverter.access$000(JsonConverter.java:61)
        at org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:181)
        at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:745)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more
[2019-07-29 12:52:35,347] ERROR WorkerSinkTask{id=sink-postgres-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2019-07-29 12:52:35,347] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:105)
[2019-07-29 12:52:35,349] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Member consumer-1-bdbc7035-7625-4701-9ca7-c1ffa6863456 sending LeaveGroup request to coordinator INTRIVMPIOT01.xpetize.local:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:822)

连接独立属性文件

bootstrap.servers=localhost:9092 
key.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter=org.apache.kafka.connect.json.JsonConverter 
key.converter.schemas.enable=false 
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets 
offset.flush.interval.ms=10000
plugin.path=/home/kafka/confluent-5.2.1/share/java

连接帖子属性文件

name=sink-postgres
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=2
topics=kafkada
connection.url=jdbc:postgresql://localhost:5432/kafkadb?
user=postgres&password=postgres
insert.mode=upsert
table.name.format=kafkatable
pk.mode=none
pk.fields=none
auto.create=true 
auto.evolve=false
offset.storage.file.filename=/tmp/post-sink.offsets

上面的错误是当我通过apache kafka./bin/connect-standalone.shconfig/connect-standalone.propertiesconfig.postgresql.properties时引起的。

然后,我尝试并实现了本链接中提到的流程:

https://hellokoding.com/kafka-connect-sinks-data-to-postgres-example-with-avro-schema-registry-and-python

但是,这里的数据是使用avro从Python代码生成的。但是在我的例子中,我已经有了来自kafka topic中传感器的数据(JSON格式),我想将这些数据发送到postgreSQL,而不是通过代码生成数据。

那么,我怎样才能实现这种将数据从kafka主题发送到postgreSQL的流程。

我已经共享了我的属性文件,如果需要更正,请告诉我。我正在发送简单的json数据,如“{"cust_id": 1313131," month": 12," expenses": 1313.13}”,我也尝试发送这种类型的数据,但仍然存在错误

示例 JSON 数据

 {
        "schema": {
            "type": "struct",
            "fields": [
                {
                    "type": "int32",
                    "optional": false,
                    "field": "customer_id"
                },
                {
                    "type": "int32",
                    "optional": true,
                    "field": "month"
                },

                {
                    "type": "string",
                    "optional": true,
                    "field": "amount_paid"
                }
            ],
            "optional": false,
            "name": "msgschema"
        },
        "payload": {
           "cust_id": 13, 
           "month": 12, 
           "expenses": 1313.13
        }
    }

我有一个名为 kafkatable 的表,其列名为 (customer_id、月份、amount_paid) 创建

创建表kafkatable( customer_id int8,month int4,amount _ payed decimal(9,2));

共有2个答案

史景铄
2023-03-14

Kafka Connect是Apache Kafka的一部分,非常适合这一点。您可以在此处了解有关Kafka Connect的更多信息。

要将数据从Kafka主题流到Postgres(或任何其他数据库),请使用JDBCSink连接器,您可以从这里获得该连接器。

凌善
2023-03-14

我通过以下更改解决了这个错误

    < li>insert.mode=insert < li >注释掉table.name.format=kafkatable,因为将通过自动创建创建表 < li >删除connection.url行末尾的问号。 < li>pk.fields不应在此处保留“无”,请确保改为提供列名以避免复杂化。 < Li > PostgreSQL不支持int32,所以当我将其更改为int8时,它工作正常。 < li >您的方案和有效负载中的字段具有不同的名称,请确保提供相同的名称。
 类似资料:
  • 问题内容: 我在Javascript中有一个功能: data参数是一个JSON对象。 但是每次我单击按钮时,它都会覆盖本地存储中的数据。 有人知道怎么做这个吗? 问题答案: 您需要采取一些步骤将这些信息正确存储在localStorage中。但是,在开始编写代码之前,请注意,localStorage(当前) 不能 保存除字符串以外的任何数据类型。您将需要序列化阵列进行存储,然后将其解析回去以对其进行

  • 问题内容: 如何将JSON数据从浏览器中的Javascript发送到服务器,并由PHP在其中进行解析? 问题答案: 我在这里获得了很多信息,所以我想发布我发现的解决方案。 问题: 从浏览器上的Javascript获取JSON数据到服务器,然后让PHP成功解析它。 环境: Windows上的浏览器(Firefox)中的Javascript。LAMP服务器作为远程服务器:Ubuntu上的PHP 5.3

  • 这是当前的代码:从'@angull/core'导入{Component,OnInit,OnDestroy};从'AngularFire2/FireStore'导入{AngularFirestore,AngularFirestoreCollection};从'AngularFire2/Database'导入{AngularFireDatabase};从'AngularFire2/Auth'导入{An

  • 问题内容: 我有一个示例数组,用于将条目插入到YUI数据表中 我可以通过这样做获得相同的阵列吗? 我在这里尝试的是编写一个通用方法,该方法将遍历结果列表并在将来能够形成一个条目。 所以我怎样才能使数组与本书代码的第一部分相同? 添加了我的整个示例代码,带注释的书本数组似乎可以工作,但未注释的部分似乎无法显示行 问题答案: 我尝试并找到了解决方案,因为在我按入后,att和值将成为对象 这将使其显示在

  • 等。 此数据将持续过帐以在数据报告中使用。我们想要的是在这些数据上绘制可视化。 我们想要的图表,将说的事情,如有多少400s由网站等,这是最高的网站或呼叫者有400。 对于例如,这个http状态是这个站点的,因为它是一个记录。在这种情况下,我们需要像这样的东西 我们的目标是,然后使用grafana在这个数据上有图表,因为在什么是顶级网站有显示400状态?这样可以吗?问候

  • 我在解析JSON中的数据时遇到了以下问题 这些节点是在我使用Firebase时生成的。push()方法。是否有方法获取所有节点并访问信息。我正在使用java并在google app engine上开发一个项目。