当前位置: 首页 > 知识库问答 >
问题:

我想用Flink的流文件接收器来写ORC文件,但它不能正确地写文件

呼延鹏云
2023-03-14

我正在从Kafka读取数据,并试图以ORC格式将其写入HDFS文件系统。我使用了他们官网链接中的以下链接参考。但是我可以看到Flink为所有数据编写完全相同的内容,并使得这么多文件和所有文件都可以103KB

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#orc-format

请在下面找到我的代码

object BeaconBatchIngest extends StreamingBase {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  def getTopicConfig(configs: List[Config]): Map[String, String]  = (for (config: Config <- configs) yield (config.getString("sourceTopic"), config.getString("destinationTopic"))).toMap

  def setKafkaConfig():Unit ={
    val kafkaParams = new Properties()
    kafkaParams.setProperty("bootstrap.servers","")
    kafkaParams.setProperty("zookeeper.connect","")
    kafkaParams.setProperty("group.id", DEFAULT_KAFKA_GROUP_ID)
    kafkaParams.setProperty("auto.offset.reset", "latest")
    
    val kafka_consumer:FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String]("sourceTopics", new SimpleStringSchema(),kafkaParams)
    kafka_consumer.setStartFromLatest()
    val stream: DataStream[DataParse] = env.addSource(kafka_consumer).map(new temp)
    val schema: String = "struct<_col0:string,_col1:bigint,_col2:string,_col3:string,_col4:string>"
    val writerProperties = new Properties()

    writerProperties.setProperty("orc.compress", "ZLIB")
    val writerFactory = new OrcBulkWriterFactory(new PersonVectorizer(schema),writerProperties,new org.apache.hadoop.conf.Configuration);
    val sink: StreamingFileSink[DataParse] = StreamingFileSink
          .forBulkFormat(new Path("hdfs://warehousestore/hive/warehouse/metrics_test.db/upp_raw_prod/hour=1/"), writerFactory)
          .build()
    stream.addSink(sink)
  }


  def main(args: Array[String]): Unit = {
    setKafkaConfig()
    env.enableCheckpointing(5000)
    env.execute("Kafka_Flink_HIVE")
  }
}
class temp extends MapFunction[String,DataParse]{

  override def map(record: String): DataParse = {
    new DataParse(record)
  }
}

class DataParse(data : String){
  val parsedJason = parse(data)
  val timestamp = compact(render(parsedJason \ "timestamp")).replaceAll("\"", "").toLong
  val event = compact(render(parsedJason \ "event")).replaceAll("\"", "")
  val source_id = compact(render(parsedJason \ "source_id")).replaceAll("\"", "")
  val app = compact(render(parsedJason \ "app")).replaceAll("\"", "")
  val json = data
}
class PersonVectorizer(schema: String) extends Vectorizer[DataParse](schema) {

  override def vectorize(element: DataParse, batch: VectorizedRowBatch): Unit = {
    val eventColVector = batch.cols(0).asInstanceOf[BytesColumnVector]
    val timeColVector = batch.cols(1).asInstanceOf[LongColumnVector]
    val sourceIdColVector = batch.cols(2).asInstanceOf[BytesColumnVector]
    val appColVector = batch.cols(3).asInstanceOf[BytesColumnVector]
    val jsonColVector = batch.cols(4).asInstanceOf[BytesColumnVector]
    timeColVector.vector(batch.size + 1) = element.timestamp
    eventColVector.setVal(batch.size + 1, element.event.getBytes(StandardCharsets.UTF_8))
    sourceIdColVector.setVal(batch.size + 1, element.source_id.getBytes(StandardCharsets.UTF_8))
    appColVector.setVal(batch.size + 1, element.app.getBytes(StandardCharsets.UTF_8))
    jsonColVector.setVal(batch.size + 1, element.json.getBytes(StandardCharsets.UTF_8))
  }

}

共有1个答案

董弘新
2023-03-14

对于大容量格式(如ORC),StreamingFileSink会在每个检查点滚动到新文件。如果减少检查点间隔(目前为5秒),它将不会写入这么多文件。

 类似资料:
  • 我正在开发一个Flink流媒体程序,可以读取Kafka消息,并将消息转储到AWS s3上的ORC文件中。我发现没有关于Flink的BucketingSink和ORC file writer整合的文件。BucketingSink中没有这样的ORC文件编写器实现。 我被困在这里了,有什么想法吗?

  • 我存储用户收藏夹到json文件,但我得到以下错误: 未处理的异常:类型“\u InternalLinkedHashMap” 我将以下数据添加到文件中: 代码是: 我很困惑,不知道如何解决这个问题。

  • 我正在尝试配置log4j以将消息记录到文件中。现在,文件确实是用我提供的名称创建的,但是日志并没有写入文件中。我的代码: 我的文件: 运行此命令时,我在控制台中得到以下输出: 文件确实在我的主目录中创建。但它是空的。 知道我做错了什么吗?我使用的是log4j版本1.2.17。

  • 我正在寻找Kafka连接连接器,将写从Kafka到本地文件系统的拼花文件格式。我不想使用hdfs或s3接收器连接器进行同样的操作。

  • 使用 electron 的一大好处是可以访问用户的文件系统。这使你可以读取和写入本地系统上的文件。为了避免 Chromium 的限制以及对应用程序内部文件的改写,请确保使用 electron 的 API,特别是 app.getPath(name) 函数。这个帮助函数可以使你获得指向系统目录的文件路径,如用户的桌面、系统临时文件 等等。 使用案例 假设我们想为我们的应用程序提供本地的数据库存储。在这