当前位置: 首页 > 知识库问答 >
问题:

Flink流-延迟和吞吐量检测

酆意智
2023-03-14

我正在尝试运行Flink流媒体作业。我想确定流处理的延迟和吞吐量。我已启动Kafka代理服务器,并收到来自Kafka的传入消息。如何计算每秒的邮件数(吞吐量)?(如rdd.count。是否有类似的方法来获取传入消息的计数)

(完整的场景:我已经通过生产者发送了消息作为Json对象。我在Json对象中添加了一些信息,如名称为字符串和System.currentTimeMills。在流式传输期间,我如何通过MessageStream(DataStream)获得发送的json对象?(

提前感谢。

代码:

/***从Kafka读取字符串并将其打印到标准输出。*/

public static void main(String[] args) throws Exception {
    System.setProperty("hadoop.home.dir", "c:/winutils/");
    // parse input argum    ents
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);

    if(parameterTool.getNumberOfParameters() < 4) {
        System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
                "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
        return;
    }

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableSysoutLogging();
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
    env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
    env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface

    DataStream<String> messageStream = env
            .addSource(new FlinkKafkaConsumer010<>(
                    parameterTool.getRequired("topic"),
                    new SimpleStringSchema(),
                    parameterTool.getProperties()));


    messageStream.print();

    env.execute();
}

共有2个答案

桂智志
2023-03-14

这个基准测试应用程序可能是一个很好的起点。Flink的Kafka连接器提供的关于延迟跟踪和度量的文档也应该是有趣的阅读。

尉迟鑫鹏
2023-03-14

Flink UI中有一些可用的指标,您可以在其中计算每秒事件的数量等等。

您还可以添加自己的指标,根据您的需求计算一些数字,这可以显示在Flink用户界面中。

最后,对于具体的延迟跟踪,也许你可以尝试这里解释的延迟跟踪,同样,你可以使用米来获得吞吐量

 类似资料:
  • 问题内容: 我为Apache Flink写了一个非常简单的Java程序,现在我对测量统计信息感兴趣,例如吞吐量(每秒处理的元组数)和等待时间(程序需要处理每个输入元组的时间)。 我知道Flink公开了一些指标: https://ci.apache.org/projects/flink/flink-docs- release-1.2/monitoring/metrics.html 但是我不确定如何使

  • 总的来说,我认为我对延迟和吞吐量之间的区别有很好的理解。但是,对于Intel Intrinsics,延迟对指令吞吐量的影响我还不清楚,尤其是在顺序(或几乎顺序)使用多个内在调用时。 例如,让我们考虑: 这有11个延迟,在Haswell处理器上的吞吐量为7。如果我在循环中运行这条指令,我会在11个循环后获得每个循环的连续输出吗?由于这需要一次运行11条指令,并且由于我的吞吐量为7,我是否用完了“执行

  • 我找不到任何关于agner.orgRDRAND指令的延迟或吞吐量的信息。但是,这个处理器存在,所以信息必须在那里。 编辑:实际上,最新的优化手册中提到了此说明。记录如下:

  • 我想使用Flink流媒体以低延迟处理市场数据( 我有一组计算,每个都订阅三个流:缓慢移动的参数数据、股票价格和汇率。 例如。 Params(缓慢滴答:每天一次或两次): 资源(每秒多次滴答声): fx(每秒多次滴答声): 每当任何股票、外汇汇率或参数数据发生变化时,我都想立即计算结果并将其输出为新流。这在逻辑上可以表示为连接: 例如选择价格=(params.strike-asset.spot)*f

  • 我试图开发以下代码,但它不起作用。我想使用apache Flink来延迟时间(在时间戳字段中指定的)与当前日期不同的事件。 样品: > 当前日期:2022-05-06 10:30 事件1[{“user1”:“1”,“user2”:“2”,“timestamp”:“2022-05-06 10:30”}-- 事件2[{“user1”:“1”,“user2”:“2”,“timestamp”:“2022-

  • 无论从什么角度来看,它都不是。 假设我有两个消费者,它们以每秒“10”条消息的速度从给定主题中消耗数据。现在,不管它们是从单个分区还是从两个不同的分区进行消耗;我的吞吐量将保持不变,每秒20条消息。 我觉得我一定漏了一些内部工作的细节,你能帮我解释一下kafka分区(多个)是如何帮助提高固定用户数量的吞吐量的,而不是单个kafka分区。