当前位置: 首页 > 知识库问答 >
问题:

使用度量的Flink事件计数

宇文曦
2023-03-14

我在《Kafka》中有一个主题,在这里我得到了json格式的多种类型的事件。我创建了一个filestreamsink,用bucketing将这些事件写入S3。

FlinkKafkaConsumer errorTopicConsumer = new FlinkKafkaConsumer(ERROR_KAFKA_TOPICS,
                new SimpleStringSchema(),
                properties);
        final StreamingFileSink<Object> errorSink = StreamingFileSink
                .forRowFormat(new Path(outputPath + "/error"), new SimpleStringEncoder<>("UTF-8"))
                .withBucketAssigner(new EventTimeBucketAssignerJson())
                .build();

        env.addSource(errorTopicConsumer)
                .name("error_source")
                .setParallelism(1)
                .addSink(errorSink)
                .name("error_sink").setParallelism(1);
public class EventTimeBucketAssignerJson implements BucketAssigner<Object, String> {

    @Override
    public String getBucketId(Object record, Context context) {
        StringBuffer partitionString = new StringBuffer();
        Tuple3<String, Long, String> tuple3 = (Tuple3<String, Long, String>) record;
        try {
            partitionString.append("event_name=")
                    .append(tuple3.f0).append("/");

            String timePartition = TimeUtils.getEventTimeDayPartition(tuple3.f1);
            partitionString.append(timePartition);
        } catch (Exception e) {
            partitionString.append("year=").append(Constants.DEFAULT_YEAR).append("/")
                    .append("month=").append(Constants.DEFAULT_MONTH).append("/")
                    .append("day=").append(Constants.DEFAULT_DAY);
        }
        return partitionString.toString();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

现在我想将每个事件的每小时计数作为指标发布给prometheus,并在此基础上发布grafana仪表板。

因此,请帮助我如何使用flink指标实现每个事件的小时计数并发布到prometheus。

谢谢

共有1个答案

鲁炳
2023-03-14

通常情况下,只需为请求创建一个计数器,然后使用Prometheus中的rate()函数,即可获得给定时间内的请求速率。

然而,如果出于某种原因,您想自己做这件事,那么您可以做一些类似于在org中所做的事情。阿帕奇。Kafka。常见的指标。统计数据。速率 。因此,在这种情况下,您需要收集样本列表及其收集时间,以及用于计算速率的窗口大小,然后您可以简单地进行计算,即删除超出范围且已过期的样本,然后简单地计算窗口中的样本数。

然后可以将仪表设置为计算值。

 类似资料:
  • 谈到与StatsD相关的计数器,它的工作方式是你不断发布计数器的值,例如。请求 每当应用程序收到对 StatsD 守护程序的请求时。守护程序设置了刷新间隔,当它将此计数器在该时间段内的聚合推送到外部后端时。此外,它还将计数器重置为 0。 试图将其映射到Flink计数器。 Flink计数器只有inc和dec方法,因此在报告时间到来之前,应用程序可以调用inc或dec来更改计数器的值。 在报告计数器的

  • 我想测量有多少事件在允许的延迟内到达,按事件的特定特征分组。我们假设特定类型的事件有更多的延迟到达,并想验证这一点。 我想到的进行度量的地方是OneElement方法中的自定义触发器,因为这是我们知道事件是否延迟的地方。然而,在SlidingEventTimeWindow的情况下,这意味着如果单个元素延迟超过一张幻灯片,那么它可以被计算多次。 有什么建议吗?

  • 问题内容: 我有两个原始流,我正在加入这些流,然后我要计算已加入的事件总数是多少,尚未加入的事件有多少。我通过使用如下所示的地图来做到这一点 问题1: 这是计算流中事件数量的适当方法吗? 问题2: 我注意到一种有线行为,有些人可能不相信。问题是,当我在IntelliJ IDE中运行Flink程序时,它显示了的正确值,但是当我将该程序提交为时。因此,我获得了将程序作为文件运行而不是实际计数时的初始值

  • 我想用基于历史事件的流计算Flink中基于窗口的平均值(或我定义的任何其他函数),因此流必须是事件时间(而不是基于处理时间): 我已经了解了如何在摄入时添加时间戳: 但是当我进行计算(应用函数)时,当我只是以与没有EventTime时相同的方式进行计算时,它就不起作用了。我读过一些关于我必须设置的水印的东西: 有没有人举一个简单的Scala例子? 尊敬的安德烈亚斯

  • 我将Flink 1.11.3与SQL API和Blink planner结合使用。我在流模式下工作,使用带有文件系统连接器和CSV格式的CSV文件。对于一个时间列,我生成水印,并希望根据这个时间进行窗口聚合。就像根据事件时间快进过去一样。 是否必须为此对时间列进行排序,因为逐行使用时间列,如果不进行排序,可能会发生延迟事件,从而导致行的删除? 我对Ververica的CDC连接器也很感兴趣。也许我

  • 本文向大家介绍laravel 使用事件系统统计浏览量的实现,包括了laravel 使用事件系统统计浏览量的实现的使用技巧和注意事项,需要的朋友参考一下 最近有一个商城项目中有统计商品点击量和艺术家访问量的需求,但又不想改动太多原来的代码,而点击与访问这两个动作是有明确触发点的,正好可以用laravel中的事件系统来做,在点击和访问对应的函数中产生这俩事件,监视器获取到之后,再将记录保存到数据库中,