当前位置: 首页 > 知识库问答 >
问题:

使用kafka-connect-transform-archive和HdfsSinkConnector时的刷新大小

司马自明
2023-03-14

我有一个Kafka主题的数据,我想把它保存在我的数据湖里。

在担心密钥之前,我能够使用HDFSSINKConnector将Avro值保存在datalake上的文件中。每个文件中的消息值的数目由HDFSSINKConnector的“Flush.Size”属性确定。

都很好。接下来,我还想保留钥匙。为此,我使用了kafka-connect-transform-archive,它将字符串、键和Avro值包装到一个新的Avro模式中。

这两种情况的唯一区别是HdfsSinkConnector的配置,它指定了存档转换。

"transforms": "tran",
"transforms.tran.type": "com.github.jcustenborder.kafka.connect.archive.Archive"

kafka-connect-transform-archive在设计上是否忽略了刷新大小,或者是否需要一些额外的配置,以便能够在数据湖上保存每个文件的多个键值消息?

共有1个答案

法池暝
2023-03-14

我在使用kafka gcs接收器连接器时也遇到了同样的问题。

在com.github.jcustenborder.kafka.connect.archive.archive代码中,每个消息创建一个新的模式。

private R applyWithSchema(R r) {
final Schema schema = SchemaBuilder.struct()
    .name("com.github.jcustenborder.kafka.connect.archive.Storage")
    .field("key", r.keySchema())
    .field("value", r.valueSchema())
    .field("topic", Schema.STRING_SCHEMA)
    .field("timestamp", Schema.INT64_SCHEMA);
Struct value = new Struct(schema)
    .put("key", r.key())
    .put("value", r.value())
    .put("topic", r.topic())
    .put("timestamp", r.timestamp());
return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schema, value, r.timestamp());

}

 类似资料:
  • connect connect_overview Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collec

  • 问题内容: 我希望从Kafka消费数据并将数据保存到Hadoop和Elasticsearch中。我目前已经看到了两种方法:使用Filebeat从Kafka消费并将其发送到ES,以及使用Kafka- Connect框架。有一个Kafka-Connect-HDFS和Kafka-Connect-Elasticsearch模块。 我不确定要使用哪个发送流数据。尽管我认为如果我想在某个时候从Kafka中获取

  • 我正在寻找从Kafka消费,并将数据保存到Hadoop和ElasticSearch中。目前我已经看到了两种实现方法:使用Filebeat从Kafka消费并将其发送到ES,以及使用Kafka-Connect框架。有一个Kafka-Connect-HDFS和Kafka-Connect-Elasticsearch模块。 我不确定使用哪一个来发送流数据。虽然我认为如果我想在某个时候从Kafka获取数据并将

  • Java类: 应用yml公司 应用程序模拟ecom。yml公司 当我击中时,http://localhost:8080/hello,我得到的回应是“Hello mock api.com!”。 如果我从应用程序中删除模拟的ecom。然后调用刷新后api调用http://localhost:8080/refresh要刷新上下文,我希望得到“Hello dev api.com!”但我收到了“你好,moc

  • 我有一个奇怪的问题与Kafka->elasticsearch连接器。当我第一次启动时,我在elasticsearch中收到了一个新数据,并通过kibana dashboard进行了检查,但当我使用同一个producer应用程序在kafka中生成新数据并尝试再启动一次connector时,我在elasticsearch中没有得到任何新数据。现在我遇到了这样的错误: 我正在使用next命令运行连接器:

  • 我正在开发一个带有NodeJs、Express、types cript和no清新的应用程序。 但是当我更改ts文件中的一些代码时,页面没有刷新。 如何在js中编译ts文件并使用no清新浏览器(或其他工具)? 包裹json 所以,每次我进行更改,我都需要停止服务器并重新运行npm start