更新后出错
[2019-07-29 12:52:23,301] INFO Initializing writer using SQL dialect: PostgreSqlDatabaseDialect (io.confluent.connect.jdbc.sink.JdbcSinkTask:57)
[2019-07-29 12:52:23,303] INFO WorkerSinkTask{id=sink-postgres-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:301)
[2019-07-29 12:52:23,367] WARN [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Error while fetching metadata with correlation id 2 : {kafkadad=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:1023)
[2019-07-29 12:52:23,368] INFO Cluster ID: _gRuX5-0SUu72wzy6PV0Ag (org.apache.kafka.clients.Metadata:365)
[2019-07-29 12:52:23,369] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Discovered group coordinator INTRIVMPIOT01.xpetize.local:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
[2019-07-29 12:52:23,372] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:459)
[2019-07-29 12:52:23,373] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-07-29 12:52:23,383] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-07-29 12:52:23,482] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:455)
[2019-07-29 12:52:23,486] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Setting newly assigned partitions: kafkadad-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:290)
[2019-07-29 12:52:23,501] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Resetting offset for partition kafkadad-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
[2019-07-29 12:52:35,338] ERROR WorkerSinkTask{id=sink-postgres-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:701)
at org.apache.kafka.connect.json.JsonConverter.access$000(JsonConverter.java:61)
at org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:181)
at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:745)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
[2019-07-29 12:52:35,347] ERROR WorkerSinkTask{id=sink-postgres-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2019-07-29 12:52:35,347] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:105)
[2019-07-29 12:52:35,349] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Member consumer-1-bdbc7035-7625-4701-9ca7-c1ffa6863456 sending LeaveGroup request to coordinator INTRIVMPIOT01.xpetize.local:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:822)
连接独立属性文件
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/home/kafka/confluent-5.2.1/share/java
连接帖子属性文件
name=sink-postgres
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=2
topics=kafkada
connection.url=jdbc:postgresql://localhost:5432/kafkadb?
user=postgres&password=postgres
insert.mode=upsert
table.name.format=kafkatable
pk.mode=none
pk.fields=none
auto.create=true
auto.evolve=false
offset.storage.file.filename=/tmp/post-sink.offsets
上面的错误是当我通过apache kafka./bin/connect-standalone.shconfig/connect-standalone.propertiesconfig.postgresql.properties时引起的。
然后,我尝试并实现了本链接中提到的流程:
https://hellokoding.com/kafka-connect-sinks-data-to-postgres-example-with-avro-schema-registry-and-python
但是,这里的数据是使用avro从Python代码生成的。但是在我的例子中,我已经有了来自kafka topic中传感器的数据(JSON格式),我想将这些数据发送到postgreSQL,而不是通过代码生成数据。
那么,我怎样才能实现这种将数据从kafka主题发送到postgreSQL的流程。
我已经共享了我的属性文件,如果需要更正,请告诉我。我正在发送简单的json数据,如“{"cust_id": 1313131," month": 12," expenses": 1313.13}”,我也尝试发送这种类型的数据,但仍然存在错误
示例 JSON 数据
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "customer_id"
},
{
"type": "int32",
"optional": true,
"field": "month"
},
{
"type": "string",
"optional": true,
"field": "amount_paid"
}
],
"optional": false,
"name": "msgschema"
},
"payload": {
"cust_id": 13,
"month": 12,
"expenses": 1313.13
}
}
我有一个名为 kafkatable 的表,其列名为 (customer_id、月份、amount_paid) 创建
创建表kafkatable( customer_id int8,month int4,amount _ payed decimal(9,2));
Kafka Connect是Apache Kafka的一部分,非常适合这一点。您可以在此处了解有关Kafka Connect的更多信息。
要将数据从Kafka主题流到Postgres(或任何其他数据库),请使用JDBCSink连接器,您可以从这里获得该连接器。
我通过以下更改解决了这个错误
问题内容: 我在Javascript中有一个功能: data参数是一个JSON对象。 但是每次我单击按钮时,它都会覆盖本地存储中的数据。 有人知道怎么做这个吗? 问题答案: 您需要采取一些步骤将这些信息正确存储在localStorage中。但是,在开始编写代码之前,请注意,localStorage(当前) 不能 保存除字符串以外的任何数据类型。您将需要序列化阵列进行存储,然后将其解析回去以对其进行
问题内容: 如何将JSON数据从浏览器中的Javascript发送到服务器,并由PHP在其中进行解析? 问题答案: 我在这里获得了很多信息,所以我想发布我发现的解决方案。 问题: 从浏览器上的Javascript获取JSON数据到服务器,然后让PHP成功解析它。 环境: Windows上的浏览器(Firefox)中的Javascript。LAMP服务器作为远程服务器:Ubuntu上的PHP 5.3
这是当前的代码:从'@angull/core'导入{Component,OnInit,OnDestroy};从'AngularFire2/FireStore'导入{AngularFirestore,AngularFirestoreCollection};从'AngularFire2/Database'导入{AngularFireDatabase};从'AngularFire2/Auth'导入{An
问题内容: 我有一个示例数组,用于将条目插入到YUI数据表中 我可以通过这样做获得相同的阵列吗? 我在这里尝试的是编写一个通用方法,该方法将遍历结果列表并在将来能够形成一个条目。 所以我怎样才能使数组与本书代码的第一部分相同? 添加了我的整个示例代码,带注释的书本数组似乎可以工作,但未注释的部分似乎无法显示行 问题答案: 我尝试并找到了解决方案,因为在我按入后,att和值将成为对象 这将使其显示在
等。 此数据将持续过帐以在数据报告中使用。我们想要的是在这些数据上绘制可视化。 我们想要的图表,将说的事情,如有多少400s由网站等,这是最高的网站或呼叫者有400。 对于例如,这个http状态是这个站点的,因为它是一个记录。在这种情况下,我们需要像这样的东西 我们的目标是,然后使用grafana在这个数据上有图表,因为在什么是顶级网站有显示400状态?这样可以吗?问候
我在解析JSON中的数据时遇到了以下问题 这些节点是在我使用Firebase时生成的。push()方法。是否有方法获取所有节点并访问信息。我正在使用java并在google app engine上开发一个项目。