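I followed this link to learn about the Kafka HDFS connector: https://docs.confluent.io/2.0.0/Connect/connect-hdfs/docs/index.html
I was able to export data from Kafka to HDFS with Hive integration.
Now I am trying to write Avro records to Kafka from a Java program: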
public static void main(String[] args) throws InterruptedException, IOException, RestClientException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9094");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("schema.registry.url", "http://10.15.167.109:8084");
    Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
    Schema schema = SchemaRegstryClient.getLatestSchema("StreamExample_1");
    // Random rnd = new Random();
    for (int i = 0; i < 1000; i++) {
        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("str1", i);
        avroRecord.put("str2", i + 1);
        ProducerRecord<String, GenericRecord> data = new ProducerRecord<String, GenericRecord>(
                "StreamExample_1", "" + new Integer(i), avroRecord);
        producer.send(data);
        Thread.sleep(250);
    }
    producer.close();
}
The schema:
{
  "type": "record",
  "name": "StreamExample_1",
  "fields": [
    {
      "name": "str1",
      "type": "int"
    },
    {
      "name": "str2",
      "type": "int"
    }
  ]
}
The connector configuration:
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=StreamExample_1
hdfs.url=hdfs://localhost:9000
flush.size=3
hive.metastore.uris=thrift://10.15.167.109:9083
hive.integration=true
schema.compatibility=BACKWARD
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.HourlyPartitioner
locale=en-us
timezone=UTC
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8084
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8084
When I write Avro records to the Kafka topic, I get the following error in Connect:
org.apache.kafka.connect.errors.DataException: StreamExample_1
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:96)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:454)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 101
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
at sun.net.www.http.HttpClient.New(HttpClient.java:339)
at sun.net.www.http.HttpClient.New(HttpClient.java:357)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1564)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:174)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:394)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:387)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:138)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:121)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:84)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:454)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2018-03-12 08:59:25,070] ERROR WorkerSinkTask{id=hdfs-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-03-12 08:59:25,070] INFO Shutting down Hive executor service. (io.confluent.connect.hdfs.DataWriter:471)
[2018-03-12 08:59:25,070] INFO Awaiting termination. (io.confluent.connect.hdfs.DataWriter:476)
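I'm not sure why you are still using byte[] in the producer when you can use the Avro object directly.
Also, you are not sending any keys, so it is not clear why you set the key serializer to the Avro serializer. I would suggest using the integer from the loop as the key.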
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
Producer<Integer, GenericRecord> producer = new KafkaProducer<Integer, GenericRecord>(props);
for (int i = 0; i < 1000; i++) {
    GenericData.Record avroRecord = new GenericData.Record(schema);
    avroRecord.put("str1", "Str 1-" + i);
    avroRecord.put("str2", "Str 2-" + i);
    avroRecord.put("int1", i);
    // The key type must match IntegerSerializer, so the record is keyed by Integer
    ProducerRecord<Integer, GenericRecord> data = new ProducerRecord<Integer, GenericRecord>("StreamExample_1", i, avroRecord);
    producer.send(data);
}
producer.close();
See the Confluent example code.
If you want to use Avro data with Kafka Connect, you need to set the value converter to:
value.converter=io.confluent.connect.avro.AvroConverter
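One more thing that may be worth checking: the trace in the question ends in java.net.ConnectException: Connection refused inside RestService.getId, which suggests the Connect worker could not reach Schema Registry at the URL given to the converter. The producer uses http://10.15.167.109:8084 while the connector configuration uses http://localhost:8084. Whether those are the same endpoint depends on where the worker runs, which is not stated, so treat this as an assumption. A minimal sketch in plain Java (the class and method names are hypothetical, and no Kafka or Confluent API is involved) that compares the two settings:

```java
import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.util.Properties;

// Hypothetical helper, not part of Kafka or Confluent: it only compares two URL strings.
class RegistryUrlCheck {

    /** Returns true when both URLs name the same host (case-insensitive) and port. */
    static boolean sameEndpoint(String first, String second) {
        URI a = URI.create(first);
        URI b = URI.create(second);
        return a.getHost().equalsIgnoreCase(b.getHost()) && a.getPort() == b.getPort();
    }

    public static void main(String[] args) throws IOException {
        // Converter settings copied from the connector configuration in the question.
        Properties connector = new Properties();
        connector.load(new StringReader(
                "value.converter=io.confluent.connect.avro.AvroConverter\n"
              + "value.converter.schema.registry.url=http://localhost:8084\n"));

        // schema.registry.url used by the producer in the question.
        String producerUrl = "http://10.15.167.109:8084";
        String converterUrl = connector.getProperty("value.converter.schema.registry.url");

        // prints false for the values in the question
        System.out.println(sameEndpoint(producerUrl, converterUrl));
    }
}
```

If the worker does not run on the same machine as Schema Registry, pointing value.converter.schema.registry.url at the address the producer uses would be the first thing to try.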