当前位置: 首页 > 知识库问答 >
问题:

使用kafka connect的hdfs中没有avro数据

万俟棋
2023-03-14

我正在使用Kafka连接分布。命令是:bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties

工作人员配置为:


    bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
    group.id=connect-cluster
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false

Kafka连接重新开始没有错误!


    curl -X POST -H "Content-Type: application/json" --data '{"name":"hdfs-sink-mysiteview","config":{"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector","tasks.max":"3","topics":"mysiteview","hdfs.url":"hdfs://master1:8020","topics.dir":"/kafka/topics","logs.dir":"/kafka/logs","format.class":"io.confluent.connect.hdfs.avro.AvroFormat","flush.size":"1000","rotate.interval.ms":"1000","partitioner.class":"io.confluent.connect.hdfs.partitioner.DailyPartitioner","path.format":"YYYY-MM-dd","schema.compatibility":"BACKWARD","locale":"zh_CN","timezone":"Asia/Shanghai"}}'  http://kafka1:8083/connectors


    {"f1":"192.168.1.1","f2":"aa.example.com"}

java代码如下:

Properties props = new Properties();
props.put("bootstrap.servers","kafka1:9092");
props.put("acks","all");
props.put("retries",3);
props.put("batch.size", 16384);
props.put("linger.ms",30);
props.put("buffer.memory",33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String,String>(props);
Random rnd = new Random();
for(long nEvents = 0; nEvents < events; nEvents++) {
    long runtime = new Date().getTime();
    String site = "www.example.com";
    String ipString = "192.168.2." + rnd.nextInt(255);
    String key = "" + rnd.nextInt(255);
    User u = new User();
    u.setF1(ipString);
    u.setF2(site+" "+rnd.nextInt(255));
    System.out.println(JSON.toJSONString(u));
    producer.send(new ProducerRecord<String,String>("mysiteview",JSON.toJSONString(u)));
    Thread.sleep(50);
}

producer.flush();
producer.close();

奇怪的事情发生了。我从kafka-logs中获取数据,但在hdfs中没有数据(没有主题目录)。我尝试connector命令:


    curl -X GET http://kafka1:8083/connectors/hdfs-sink-mysiteview/status

    {"name":"hdfs-sink-mysiteview","connector":{"state":"RUNNING","worker_id":"10.255.223.178:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"10.255.223.178:8083"},{"state":"RUNNING","id":1,"worker_id":"10.255.223.178:8083"},{"state":"RUNNING","id":2,"worker_id":"10.255.223.178:8083"}]}
    curl -X GET http://kafka1:8083/connectors/hdfs-sink-mysiteview/hdfs-sink-siteview-1

出什么问题了?

共有1个答案

凌联
2023-03-14

在没有看到Worker日志的情况下,我不确定在使用上面描述的设置时HDFS连接器实例到底有哪一个异常失败。不过,我可以发现配置中的一些问题:

  1. 您提到使用bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties启动连接工作者。这些属性默认将键和值转换器设置为avroconverter,并要求您运行schema-registry服务。如果您确实已经编辑了connect-avro-distributed.properties中的配置,以使用JSONConverter作为替代,那么在将Kafka记录转换为connect的sinkrecord数据类型期间,HDFS连接器可能会失败,就在它试图将数据导出到HDFS之前。
  2. 直到最近,HDFS连接器只能将Avro记录导出到Avro或Parquet格式的文件中。这需要使用前面提到的avroconverter。最近添加了将记录作为JSON导出到文本文件的功能,该功能将出现在连接器的4.0.0版本中(您可以通过签出并从源代码处构建连接器来尝试该功能)。

此时,我的第一个建议是尝试使用bin/kafka-avro-console-producer导入数据。定义它们的模式,确认使用bin/kafka-avro-console-consumer成功导入了数据,然后将HDFS连接器设置为使用avroformat。连接器页面上的quickstart描述了一个非常相似的过程,也许这将是您用例的一个很好的起点。

 类似资料:
  • 我参考了以下链接来了解Kafka的HDFS连接https://docs.confluent.io/2.0.0/Connect/connect-hdfs/docs/index.html 我能够通过配置单元集成将数据从Kafka导出到HDFS。 现在我正尝试在Java程序的帮助下将avro记录写入Kafka 当我把Avro记录写到Kafka主题时,我在Connect中出现以下错误

  • {“type”:“record”、“name”:“twitter_schema”、“namespace”:“com.miguno.avro”、“fields”:[{“name”:“username”、“type”:“string”、“doc”:“Twitter.com上的用户帐户名称”}、{“name”:“tweet”、“type”:“string”、“doc”:“用户的Twitter消息内容”}

  • 我想知道下面的模式对于Avro模式是否有效。请注意,字段数组的第一个对象中缺少名称。 它实际上是针对以下类型的数据设计的 根据下面的阅读,似乎没有名字的数组是不允许的 Avro架构失败 https://avro.apache.org/docs/current/spec.html#schema_complex 我怀疑下面是正确的 它应该有如下数据,以便成功进行avro转换 下面的模式是否有效?在第一

  • 我有 kafka 集群,它从生产者那里接收 avro 事件。 我想使用flume来消费这些事件并将它们作为avro文件放在HDFS中 水槽可以吗? 有没有人有一个配置文件的例子来演示如何做? Yosi

  • 我让用户编写AVRO文件,我想使用Flume将所有这些文件移动到使用Flume的HDFS中。因此,我以后可以使用Hive或Pig来查询/分析数据。 在客户端上,我安装了水槽,并有一个SpoolDir源和AVRO接收器,如下所示: 在hadoop集群上,我有一个AVRO源和HDFS接收器: 问题是HDFS上的文件不是有效的AVRO文件!我正在使用色调UI检查文件是否是有效的AVRO文件。如果我将我在

  • 我已经安装了一个总共有3台机器的hadoop集群,其中2个节点充当Datanode,1个节点充当Namenode,还有一个Datanode。我想澄清一些关于hadoop集群安装和体系结构的疑问。下面是我正在寻找答案的问题列表--- 我在集群中上传了一个大约500MB大小的数据文件,然后检查hdfs报告。我注意到我制作的namenode在hdfs中也占用了500MB大小,还有复制因子为2的datan