Spark DStream有mapPartition API,而Flink DataStream API没有。是否有人可以帮助解释原因。我想做的是在Flink上实现一个类似于Spark的API。
假设您的输入流是单分区数据(例如String)
val new_number_of_partitions = 4
//below line partitions your data, you can broadcast data to all partitions
val step1stream = yourStream.rescale.setParallelism(new_number_of_partitions)
//flexibility for mapping
val step2stream = step1stream.map(new RichMapFunction[String, (String, Int)]{
// var local_val_to_different_part : Type = null
var myTaskId : Int = null
//below function is executed once for each mapper function (one mapper per partition)
override def open(config: Configuration): Unit = {
myTaskId = getRuntimeContext.getIndexOfThisSubtask
//do whatever initialization you want to do. read from data sources..
}
def map(value: String): (String, Int) = {
(value, myTasKId)
}
})
val step3stream = step2stream.keyBy(0).countWindow(new_number_of_partitions).sum(1).print
//Instead of sum(1), you can use .reduce((x,y)=>(x._1,x._2+y._2))
//.countWindow will first wait for a certain number of records for perticular key
// and then apply the function
Flink流媒体是纯流媒体(不是批量流媒体)。看看迭代API。
Flink的流式处理模型与以小批量为中心的Spark Streaming有很大不同。在Spark Streaming中,每个小批量都像一个常规的批处理程序一样在有限的数据集上执行,而Flink DataStream程序则持续处理记录。
在Flink的DataSet API中,MapPartitionFunction有两个参数。用于输入的迭代器和用于函数结果的收集器。Flink DataStream程序中的MapPartitionFunction永远不会从第一个函数调用中返回,因为迭代器将遍历无穷无尽的记录流。然而,Flink的内部流处理模型要求用户函数返回以检查函数状态。因此,DataStream API不提供映射分区转换。
为了实现类似于Spark Streaming的reduceByKey的功能,您需要在流上定义一个键控窗口。Windows将流离散化,这在某种程度上类似于小批量,但Windows提供了更大的灵活性。由于窗口大小有限,因此可以调用reduce窗口。
这可能看起来像:
yourStream.keyBy("myKey") // organize stream by key "myKey"
.timeWindow(Time.seconds(5)) // build 5 sec tumbling windows
.reduce(new YourReduceFunction); // apply a reduce function on each window
DataStream文档显示了如何定义各种窗口类型,并解释了所有可用的函数。
注意:DataStream API最近进行了修改。该示例假定最新版本(0.10-SNAPSHOT),该版本将在未来几天内发布为0.10.0。
我通读了地图和地图分区之间的理论差异, 但我下面描述的问题更多地基于GC活动 = = 提前感谢。任何帮助都将不胜感激。
问题内容: 我有一个带有一个字段的表,该字段可以根据说明符的值(Project,TimeKeep或CostCenter)指向其他3个表之一中的外键。通常这是通过子类实现的,我想知道是否有下面将工作。 请注意,子类名是相同的父类和noteObject属性映射到java.lang.Object类型的实例变量 ,所以应该接受一个项目,TimeKeep或CostCenter对象,只要我们投来正确的类型,h
我正在尝试使用spring MVC。我的问题是,在我的控制器中没有请求映射工作,只有我的主页链接:“/”工作。 我的web.xml文件是: 2019年5月9日下午4:55:42 org.springframework.web.servlet.dispatcherservlet noHandlerFound警告:GET/springmvc/showform没有映射
问题内容: 在我的applicationContext.xml中,这就是将xml映射到POJO的方式。如何将目录映射到类文件而无需创建xml? 问题答案: 您可以通过转换进一步简化操作 至 现在您的包中所有带有注释的类都将自动被拾取。
我遇到了java的一个问题。时间类及其到DB类型的映射。我想存储Instant类型,但它的行为非常不直观。 我有基本的实体类: 现在,我将创建并持久化实体: Java中的数据符合预期-系统输出如下: 但是,DB中的数据不是。它们都存储为相同的值。所有列的默认类型都设置为TIMESTAMP。它们都被存储为应用程序JVM时区中的LocalDateTime。 我一点也不喜欢这样,这些类型之间没有区别,值