我为Apache
Flink写了一个非常简单的Java程序,现在我对测量统计信息感兴趣,例如吞吐量(每秒处理的元组数)和等待时间(程序需要处理每个输入元组的时间)。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.readTextFile("/home/LizardKing/Documents/Power/Prova.csv")
.map(new MyMapper().writeAsCsv("/home/LizardKing/Results.csv");
JobExecutionResult res = env.execute();
我知道Flink公开了一些指标:
https://ci.apache.org/projects/flink/flink-docs-
release-1.2/monitoring/metrics.html
但是我不确定如何使用它们来获取我想要的东西。从链接中我已经读到“仪表”可以用来测量平均吞吐量,但是在定义后,我应该如何使用它?
我们正在运行在纱线上的生产流作业中运行自定义指标,例如仪表,仪表。
步骤如下:
对pom.xml的附加依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>${flink.version}</version>
</dependency>
我们正在使用1.2.1版
然后将仪表添加到MyMapper类。
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Meter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Test {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.readTextFile("/home/LizardKing/Documents/Power/Prova.csv")
.map(new MyMapper())
.writeAsCsv("/home/LizardKing/Results.csv");
JobExecutionResult res = env.execute();
}
private static class MyMapper extends RichMapFunction<String, Object> {
private transient Meter meter;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
}
@Override
public Object map(String value) throws Exception {
this.meter.markEvent();
return value;
}
}
}
希望这可以帮助 。
我正在尝试运行Flink流媒体作业。我想确定流处理的延迟和吞吐量。我已启动Kafka代理服务器,并收到来自Kafka的传入消息。如何计算每秒的邮件数(吞吐量)?(如rdd.count。是否有类似的方法来获取传入消息的计数) (完整的场景:我已经通过生产者发送了消息作为Json对象。我在Json对象中添加了一些信息,如名称为字符串和System.currentTimeMills。在流式传输期间,我如
总的来说,我认为我对延迟和吞吐量之间的区别有很好的理解。但是,对于Intel Intrinsics,延迟对指令吞吐量的影响我还不清楚,尤其是在顺序(或几乎顺序)使用多个内在调用时。 例如,让我们考虑: 这有11个延迟,在Haswell处理器上的吞吐量为7。如果我在循环中运行这条指令,我会在11个循环后获得每个循环的连续输出吗?由于这需要一次运行11条指令,并且由于我的吞吐量为7,我是否用完了“执行
无论从什么角度来看,它都不是。 假设我有两个消费者,它们以每秒“10”条消息的速度从给定主题中消耗数据。现在,不管它们是从单个分区还是从两个不同的分区进行消耗;我的吞吐量将保持不变,每秒20条消息。 我觉得我一定漏了一些内部工作的细节,你能帮我解释一下kafka分区(多个)是如何帮助提高固定用户数量的吞吐量的,而不是单个kafka分区。
我找不到任何关于agner.orgRDRAND指令的延迟或吞吐量的信息。但是,这个处理器存在,所以信息必须在那里。 编辑:实际上,最新的优化手册中提到了此说明。记录如下:
来自AWS Lambda常见问题解答: Q: 我一次可以执行的AWS Lambda函数的数量是否有限制? 不需要。AWS Lambda旨在并行运行多个函数实例。然而,AWS Lambda的默认安全限制为每个区域每个帐户100次并发执行。如果您希望提交请求以增加100次并发执行的限制,您可以访问我们的支持中心,单击“打开新案例”,然后提交服务限制增加请求。 Q: 如果我的帐户超过并发执行的默认限制,
在大数据存储中,IOPS和吞吐量之间的关键区别是什么