在这里输入代码需要在kafka流中使用flink和out聚合数据值放一个新的主题。
聚合应该发生在eventtime,而不是进程时间,这意味着数据对象中的时间戳。
遵循Flink教程中的示例,使用TumblingEventTimeWindow,但根本不调用聚合getResult方法。
如果我更改为TumblingProcessingTimeWInow,getResult将被调用并将结果下沉。
由于它是一个传感器事件,我们需要考虑事件时间和聚集应该发生在事件时间,而不是处理时间。
import java.util.Properties
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.formats.json.JsonNodeDeserializationSchema
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.util.Collector
import org.apache.log4j.Logger
import scala.collection.JavaConverters._
object DownsamplingService {
val log=Logger.getLogger("Service")
def main(args: Array[String]): Unit = {
val parameter: Properties = ParameterTool.fromArgs(args).getProperties
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
// create a checkpoint every 60 seconds
env.enableCheckpointing(60000)
env.setParallelism(4) //Dependsnon core
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//var params = new Properties
parameter.put("bootstrap.servers", "kafka-server:9092")
parameter.put("zookeeper.connect", "zookeeper:2181")
parameter.put("group.id", "downsampling-consumer")
env.getConfig.setGlobalJobParameters(ParameterTool.fromArgs(args))
val kafkaDiProducer = FlinkKafkaProducer011("engine-test2", new JsonNodeDeserializationSchema, params)
var norTopics = List.tabulate(normalDiTopicCount)(n => s"$normalDiTopicPrefix$n")
val serTopics =(norTopics ::: List.tabulate(serviceDITopicCount)(n => s"$serviceDITopicCount$n")).asJava
val kafkaConsumer = new FlinkKafkaConsumer011(serTopics, new JsonNodeDeserializationSchema(), parameter)
// val ipstream= env.addSource(kafkaConsumer).name("source")
// .assignTimestampsAndWatermarks(new TimestampExtractor).keyBy(_.get("uuid").asText()).window(Tum.of(Time.minutes(1))).process(new MyProcessWindowFunction).name("aggregate")
val ipstream= env.addSource(kafkaConsumer).name("source").assignTimestampsAndWatermarks(new TimestampExtractor)
.keyBy(_.get("uuid").asText()).window(TumblingEventTimeWindows.of(Time.seconds(60))).trigger(ContinuousEventTimeTrigger.of(Time.seconds(60))).aggregate(new AverageAggregate,new MyProcessWindowFunction).name("aggregate")
//.keyBy(_.get("uuid").asText()).window(TumblingProcessingTimeWindows.of(Time.minutes(1))).aggregate(new AverageAggregate, new MyProcessWindowFunction).name("aggregate")
ipstream.print()
ipstream.addSink(kafkaDiProducer).name("kafka-push")
env.execute("aggregate-stream")
}
class TimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor[ObjectNode](Time.minutes(1)) {
override def extractTimestamp(element: ObjectNode): Long = element.get(Constants.Timestamp).asLong()
}
case class Acc[T](key:String,timeStamp:Long, min:T,max:T,count:Long,result:Double)
//case class Result[T](min:T,max:T,count:Long,value:Double)
// Function definitions
/**
* The accumulator is used to keep a running sum and a count. The [getResult] method
* computes the average.
*/
class AverageAggregate extends AggregateFunction[ObjectNode, Acc[Double], ObjectNode] {
override def createAccumulator() = Acc("",0L,0,0,0L,0)
override def add(value: ObjectNode, accumulator: Acc[Double]): Acc[Double] = {
val valueCol=value.get("value").asDouble();
val maxVal=if(accumulator.timeStamp<=0 || valueCol>accumulator.max){
valueCol
}else{
accumulator.max
}
val minVal= if(accumulator.timeStamp<=0 || valueCol<accumulator.min){
valueCol
}else{
accumulator.min
}
val timeStamp=value.get("timestamp").asLong()
val newTimeSamp=if(accumulator.timeStamp<=0 || accumulator.timeStamp>timeStamp ){
timeStamp
}else{
accumulator.timeStamp
}
log.info(s"${value.get("uuid").asText()} $timeStamp $newTimeSamp $maxVal $minVal" )
Acc(value.get("uuid").asText(),newTimeSamp,minVal,maxVal,accumulator.count+1l,valueCol+accumulator.result)
}
override def getResult(accumulator: Acc[Double]):ObjectNode = {
log.info(s"${accumulator.key} ${accumulator.timeStamp} ${accumulator.result} ${accumulator.max} ${accumulator.min}" )
val result=JsonNodeFactory.instance.objectNode()
result.put("uuid",accumulator.key)
result.put("timestamp",accumulator.timeStamp)
result.put("value_max_1_m",accumulator.max)
result.put("value_min_1_m",accumulator.min)
result.put("value_sum_1_m",accumulator.result)
result.put("value_count_1_m",accumulator.count)
result.put("value_mean_1_m",accumulator.result/accumulator.count)
result
} //Acc(accumulator.min,accumulator.max,accumulator.count, accumulator.sum / accumulator.count)
override def merge(a: Acc[Double], b: Acc[Double]): Acc[Double] = {
val maxVal=if(b.max>a.max){
b.max
}else{
a.max
}
val minVal=if(b.min<a.min){
b.min
}else{
a.min
}
val newTimeSamp=if(a.timeStamp<=0 || a.timeStamp>b.timeStamp ){
b.timeStamp
}else{
a.timeStamp
}
Acc(a.key,newTimeSamp,maxVal,minVal,a.count+b.count,a.result+b.result)
}
}
class MyProcessWindowFunction extends ProcessWindowFunction[ObjectNode, ObjectNode, String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[ObjectNode]): Unit = {
val averages=elements.iterator.next()
out.collect(averages)
}
}
}
聚合应该发生在eventtime,而不是进程时间,这意味着数据对象中的时间戳。
将ENV Parallelism设置为1时,上述代码正在工作。然后只生成并推进水标记。如果您的主题有多个分区,flink的kafka consumer connector中似乎存在问题。
解释于[https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission]
主要内容:1.COUNT函数,2. SUM函数,3. AVG函数,4. MAX函数,5. MIN函数SQL聚合函数用于对表的单个列的多行执行计算,它只返回一个值。它还用于汇总数据。 SQL聚合函数的类型,如下图所示 - 接下来,我们一个个地讲解。 1.COUNT函数 函数用于计算数据库表中的行数,它可以在数字和非数字数据类型上工作。 函数使用返回指定表中所有行的计数。 包函重复值和值。 语法 假设有一个表,它的结构和数据如下所示 - PRODUCT COMPANY QTY RATE COST I
问题内容: 在解释CTE的一些概念时,有人问了一个可爱的问题..我们可以找到行的乘法吗,而我们总是从新手开始集中精力。那给了我一个想法!仅使用SQL是否有可能。我还考虑了我们甚至可以支持的最大精度,因为该产品可能非常庞大。 话虽如此,我们不能编写自己的聚合函数。(可以吗?)我在想仅使用SQL就有可能。 我想到的就像是自己添加2,3次。.但是当集合很大时..由于繁琐,我无法实现。 另一个可能是和,为
查看SQL的ANSI聚合函数,我找不到字符串的任何东西。但是,每个数据库似乎都有自己的数据库,例如MySQL和Oracle的GROUP_CONCAT和LISTAGG,这使得可移植性有点困难。我是不是缺了什么?这是有原因的吗?
主要内容:应用聚合函数在《 Python Pandas窗口函数》一节,我们重点介绍了窗口函数。我们知道,窗口函数可以与聚合函数一起使用,聚合函数指的是对一组数据求总和、最大值、最小值以及平均值的操作,本节重点讲解聚合函数的应用。 应用聚合函数 首先让我们创建一个 DataFrame 对象,然后对聚合函数进行应用。 输出结果: 1) 对整体聚合 您可以把一个聚合函数传递给 DataFrame,示例如下: 输出结果: 2)
在本教程中,将了解和学习SQL Server聚合函数以及如何使用它们来计算聚合。 聚合函数执行一个或多个值的计算并返回单个值。 聚合函数通常与SELECT语句的GROUP BY子句和HAVING子句一起使用。 下表显示了SQL Server中的聚合函数: 编号 聚合函数 描述 1 AVG()函数 函数用于计算集合中非值的平均值。 2 CHECKSUM_AGG()函数 函数根据一组行计算校验和值。
1. 前言 慕课解释:SQL 内置的聚合函数主要用于数据的统计和分析。 本小节,我们将一起学习 SQL 中的聚合函数。 数据统计和分析是挖掘数据规律、发现数据特征的主要手段,SQL 虽然无法直接分析数据,但是提供基本聚合函数来帮助开发者做数据挖掘。 本小节测试数据如下,请先在数据库中执行: DROP TABLE IF EXISTS imooc_user; CREATE TABLE imooc_us