在e2e FlinkSQL教程中,源表被定义为带有启用水印的时间戳列的Kafka源表
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime AS PROCTIME(), -- generates processing-time attribute using computed column
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- defines watermark on ts column, marks ts as event-time attribute
) WITH (
'connector' = 'kafka', -- using kafka connector
'topic' = 'user_behavior', -- kafka topic
'scan.startup.mode' = 'earliest-offset', -- reading from the beginning
'properties.bootstrap.servers' = 'kafka:9094', -- kafka broker address
'format' = 'json' -- the data format is json
);
只要GROUP BY是由一个翻滚在ts上的字段生成的,这看起来很自然(因为Flink知道何时触发/弹出窗口),但在教程的中间我们看到了以下表达式
INSERT INTO cumulative_uv
SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) as uv
FROM (
SELECT
DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str,
SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0' as time_str,
user_id
FROM user_behavior)
GROUP BY date_str;
在这里,我们看到分组是在导数<code>date_str</code>字段上进行的,但是水印在这里是如何工作的呢?Flink如何决定何时“关闭”date_ str桶?由于<code>date_str
也许您可以参考下面的链接来了解水印的生成和交付,特别是“操作员如何处理水印”
在本例中,水印是从源运算符的ts生成的,而下游运算符将仅处理水印,这与date_str字段无关。
public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T>
implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
......
@Override
public void open() throws Exception {
super.open();
timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
watermarkGenerator =
emitProgressiveWatermarks
? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup)
: new NoWatermarksGenerator<>();
wmOutput = new WatermarkEmitter(output);
watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
if (watermarkInterval > 0 && emitProgressiveWatermarks) {
final long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
}
@Override
public void processElement(final StreamRecord<T> element) throws Exception {
final T event = element.getValue();
final long previousTimestamp =
element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);
element.setTimestamp(newTimestamp);
output.collect(element);
watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
}
......
@Override
public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark)
throws Exception {
// if we receive a Long.MAX_VALUE watermark we forward it since it is used
// to signal the end of input and to not block watermark progress downstream
if (mark.getTimestamp() == Long.MAX_VALUE) {
wmOutput.emitWatermark(Watermark.MAX_WATERMARK);
}
}
......
}
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/
我有一个这样的方法: 此方法需要以如下字符串形式返回3个最昂贵项目的产品ID:“item1,item2,item3”。我应该只能使用溪流,我被困在这里了。我应该能够按值对项目进行排序,然后获得产品ID,但我似乎无法使其正常工作。 编辑: 产品ID位于入口类中
我有一个BigDecimals(在本例中是)的集合,希望将其添加在一起。有没有可能使用流来实现这一点? 我注意到类有几个方法 每一个都有一个方便的方法。但是,正如我们所知,和算术几乎总是一个坏主意。 那么,有没有方便的方法来求和BigDecimals呢? 这是我目前掌握的代码。 为后人编辑后答案: 这两个答案都非常有用。我想补充一点:我的实际场景不涉及原始的集合,它们包装在发票中。但是,我能够修改
我已经阅读了Spring Cloud stream binder参考文档,其中提到了使用@RabbitListener进行DLQ处理。https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream-binder-rabbit.html#ra
我有一个作者集合和一个书籍集合。一本书与作者之间的联系是作者的电子邮件地址。 Book类有一个作者电子邮件地址的:。 如何使用lambda表达式获取所有图书的所有作者列表,并按图书名称进行筛选?
问题内容: 我的清单包含大小等的集合。我尝试这样做,但似乎不起作用。 我想要的最终结果是。 我可以尝试添加在所有的元素和那种出来再做出新的的。但是,有某种班轮吗? 更新: 这可行,但是可以简化吗? 问题答案: @Eugene的回答很甜蜜,因为番石榴很甜。但是,如果您碰巧在类路径中没有番石榴,这是另一种方式: 首先,我将所有集合映射到一个流中,然后对所有元素进行排序,最后,将整个排序后的流收集到集合
我有以下课程 我正在尝试按贡献者名称和角色名称对ResourceContributor列表进行排序。到目前为止,我得到的是: 我已经尝试使用then比较,但还不知道如何使用它。