当前位置: 首页 > 知识库问答 >
问题:

使用Flink获取数据流的文件名

强阳曜
2023-03-14

我有一个flink在单个路径中处理csv文件的流式处理。我想知道每个处理文件的文件名。

我目前正在使用此功能将csv文件读取到路径(dataPath)中。

val recs:DataStream[CallCenterEvent] = env
          .readFile[CallCenterEvent](
          CsvReader.getReaderFormat[CallCenterEvent](dataPath, c._2),
          dataPath,
          FileProcessingMode.PROCESS_CONTINUOUSLY,
          c._2.fileInterval)
          .uid("source-%s-%s".format(systemConfig.name, c._1))
          .name("%s records reading".format(c._1))

并使用此函数获取TupleCsvInputFormat。

def getReaderFormat[T <: Product : ClassTag : TypeInformation](dataPath:String, conf:URMConfiguration): TupleCsvInputFormat[T] = {
  val typeInfo = implicitly[TypeInformation[T]]
  val format: TupleCsvInputFormat[T] = new TupleCsvInputFormat[T](new Path(dataPath), typeInfo.asInstanceOf[CaseClassTypeInfo[T]])
  if (conf.quoteCharacter != null && !conf.quoteCharacter.equals(""))
    format.enableQuotedStringParsing(conf.quoteCharacter.charAt(0))
  format.setFieldDelimiter(conf.fieldDelimiter)
  format.setSkipFirstLineAsHeader(conf.ignoreFirstLine)
  format.setLenient(true)

  return format
}       

进程运行正常,但我找不到方法来获取每个csv文件的文件名。

提前谢谢

共有1个答案

贺皓
2023-03-14

我遇到了类似的情况,我需要知道正在处理的记录的文件名。文件名中有一些信息在记录中不可用。要求客户更改记录架构不是一个选项。

我找到了一种访问底层源的方法。在我的例子中,它是FileInputSplit(它具有源数据文件的路径信息)

class MyTextInputFormat(p:Path ) extends TextInputFormat(p) {

     override def readRecord(reusable: String, bytes: Array[Byte], offset: Int, numBytes: Int):String = {
val fileName = {
      if (this.currentSplit != null)      
        this.currentSplit.getPath.getName
      else
         "unknown-file-path"
    }

    //Add FileName to the record!
    super.readRecord(reusable, bytes, offset, numBytes)+","+fileName
  }
}

现在,您可以在流中使用此设置

val format = new MyTextInputFormat(new Path(srcDir))
format.setDelimiter(prfl.lineSep)
val stream = env.readFile(format, srcDir, FileProcessingMode.PROCESS_CONTINUOUSLY, Time.seconds(10).toMilliseconds

虽然我的情况略有不同,但这种方法也应该对您有所帮助!

 类似资料:
  • 我有一个我真的无法解决的问题。所以我有一个kafka流,其中包含一些这样的数据: 我想用另一个值“bookingId”替换“adId”。此值位于csv文件中,但我无法真正弄清楚如何使其工作。 这是我的映射csv文件: 所以我的输出最好是这样的 该文件可以每小时至少刷新一次,因此它应该会接收对它的更改。 我目前有一个不适合我的代码: 代码只运行一次,然后停止,因此它不会使用csv文件转换kafka中

  • 我正在构建一个有以下要求的应用程序,我刚刚开始使用Flink。 null null 谢谢并感激你的帮助。

  • 问题内容: 我正在尝试使用PHP从以下JSON文件中获取数据。我特别想要“ temperatureMin”和“ temperatureMax”。 这可能真的很简单,但是我不知道该怎么做。我被困在file_get_contents(“ file.json”)之后该做什么。一些帮助将不胜感激! 问题答案: 使用以下命令获取JSON文件的内容: 现在使用解码JSON : 您有一个包含所有信息的关联数组。

  • 问题内容: 有可能知道Java中文件的元数据吗?如果是的话,如何在Java中获取文件的元数据? 问题答案: 您可以从文件中获取一组基本的元数据。 有些东西取决于平台,可能会引发异常或返回意外结果。 您可以在管理元数据(文件和文件存储属性)中阅读更多内容。

  • 我想得到输入流作为JSON数组从一个网址。如何设置源代码,以便在apache flink中使用datastream连续获得输入。简而言之,我想从一个url连续获得json数据,而不会关闭flink作业。

  • 我正在编写一个Flink流程序,其中我需要使用一些静态数据集(信息库,IB)来丰富用户事件的数据流。 对于例如。假设我们有一个买家的静态数据集,并且我们有一个事件的clickstream,对于每个事件,我们要添加一个布尔标志,指示事件的实施者是否是买家。 另一个选择可以是使用托管操作员状态来存储购买者设置,但是我如何保持按用户id分配的该状态,以避免在单个事件查找中使用网络I/O呢?在内存状态后端