当前位置: 首页 > 知识库问答 >
问题:

Apache Flink DataStream API没有映射分区转换

东门新立
2023-03-14

Spark DStream有mapPartition API,而Flink DataStream API没有。是否有人可以帮助解释原因。我想做的是在Flink上实现一个类似于Spark的API。

共有2个答案

徐俊楚
2023-03-14

假设您的输入流是单分区数据(例如String)

val new_number_of_partitions = 4

//below line partitions your data, you can broadcast data to all partitions
val step1stream = yourStream.rescale.setParallelism(new_number_of_partitions)

//flexibility for mapping
val step2stream = step1stream.map(new RichMapFunction[String, (String, Int)]{
  // var local_val_to_different_part : Type = null
  var myTaskId : Int = null

  //below function is executed once for each mapper function (one mapper per partition)
  override def open(config: Configuration): Unit = {
    myTaskId = getRuntimeContext.getIndexOfThisSubtask
    //do whatever initialization you want to do. read from data sources..
  }

  def map(value: String): (String, Int) = {
    (value, myTasKId)
  }
})

val step3stream = step2stream.keyBy(0).countWindow(new_number_of_partitions).sum(1).print
//Instead of sum(1), you can use .reduce((x,y)=>(x._1,x._2+y._2))
//.countWindow will first wait for a certain number of records for perticular key
// and then apply the function

Flink流媒体是纯流媒体(不是批量流媒体)。看看迭代API。

巩镜
2023-03-14

Flink的流式处理模型与以小批量为中心的Spark Streaming有很大不同。在Spark Streaming中,每个小批量都像一个常规的批处理程序一样在有限的数据集上执行,而Flink DataStream程序则持续处理记录。

在Flink的DataSet API中,MapPartitionFunction有两个参数。用于输入的迭代器和用于函数结果的收集器。Flink DataStream程序中的MapPartitionFunction永远不会从第一个函数调用中返回,因为迭代器将遍历无穷无尽的记录流。然而,Flink的内部流处理模型要求用户函数返回以检查函数状态。因此,DataStream API不提供映射分区转换。

为了实现类似于Spark Streaming的reduceByKey的功能,您需要在流上定义一个键控窗口。Windows将流离散化,这在某种程度上类似于小批量,但Windows提供了更大的灵活性。由于窗口大小有限,因此可以调用reduce窗口。

这可能看起来像:

yourStream.keyBy("myKey") // organize stream by key "myKey"
          .timeWindow(Time.seconds(5)) // build 5 sec tumbling windows
          .reduce(new YourReduceFunction); // apply a reduce function on each window

DataStream文档显示了如何定义各种窗口类型,并解释了所有可用的函数。

注意:DataStream API最近进行了修改。该示例假定最新版本(0.10-SNAPSHOT),该版本将在未来几天内发布为0.10.0。

 类似资料:
  • 我通读了地图和地图分区之间的理论差异, 但我下面描述的问题更多地基于GC活动 = = 提前感谢。任何帮助都将不胜感激。

  • 问题内容: 我有一个带有一个字段的表,该字段可以根据说明符的值(Project,TimeKeep或CostCenter)指向其他3个表之一中的外键。通常这是通过子类实现的,我想知道是否有下面将工作。 请注意,子类名是相同的父类和noteObject属性映射到java.lang.Object类型的实例变量 ,所以应该接受一个项目,TimeKeep或CostCenter对象,只要我们投来正确的类型,h

  • 我正在尝试使用spring MVC。我的问题是,在我的控制器中没有请求映射工作,只有我的主页链接:“/”工作。 我的web.xml文件是: 2019年5月9日下午4:55:42 org.springframework.web.servlet.dispatcherservlet noHandlerFound警告:GET/springmvc/showform没有映射

  • 问题内容: 在我的applicationContext.xml中,这就是将xml映射到POJO的方式。如何将目录映射到类文件而无需创建xml? 问题答案: 您可以通过转换进一步简化操作 至 现在您的包中所有带有注释的类都将自动被拾取。

  • 我遇到了java的一个问题。时间类及其到DB类型的映射。我想存储Instant类型,但它的行为非常不直观。 我有基本的实体类: 现在,我将创建并持久化实体: Java中的数据符合预期-系统输出如下: 但是,DB中的数据不是。它们都存储为相同的值。所有列的默认类型都设置为TIMESTAMP。它们都被存储为应用程序JVM时区中的LocalDateTime。 我一点也不喜欢这样,这些类型之间没有区别,值