我正在测试Apache Flink(使用v1.8.2)从Kinesis Data Stream读取消息的速度。Kinesis Data Streams仅包含一个分片,它包含40,000条消息。每个消息大小小于5 KB。
尝试使用TRIM\u HORIZON从最旧的消息中读取流,我希望该应用程序能够快速读取所有消息,因为通过GetRecords,每个碎片可以支持高达每秒2 MB的最大总数据读取速率。使用连接器配置(SHARD\u GETRECORDS\u MAX=400,SHARD\u GETRECORDS\u INTERVAL\u MILLIS=1000),应用程序应在几分钟内完成,以读取所有消息。但由于某些原因,阅读所有信息需要花费大量时间。
您介意检查一下我的连接器配置中有什么问题吗?感谢您的帮助。
public static DataStream<ObjectNode> createKinesisStream(
StreamExecutionEnvironment env) throws IOException {
Properties properties = new Properties();
properties.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
properties.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
properties.put(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "400");
properties.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");
properties.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");
properties.put(ConsumerConfigConstants.AWS_PROFILE_NAME, "d");
return env.addSource(new FlinkKinesisConsumer<>(
"stream1", new JsonNodeDeserializationSchema(), properties));
}
main() code:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(10000L);
source = AppConfig.createKinesisStream(env);
DataStream<ObjectNode> filteredStream = source
.map(new CustomMap());
I have put a counter in discarding sink, in one fetch it read 27 messages( counter 829-855)
24 Mar 2020 08:11:50,519 INFO DiscardingSink:15 - 827
24 Mar 2020 08:11:50,519 INFO DiscardingSink:15 - 828
24 Mar 2020 08:11:51,631 INFO DiscardingSink:15 - 829
24 Mar 2020 08:11:51,631 INFO DiscardingSink:15 - 830
.
.
24 Mar 2020 08:11:51,639 INFO DiscardingSink:15 - 854
24 Mar 2020 08:11:51,639 INFO DiscardingSink:15 - 855
24 Mar 2020 08:11:52,749 INFO DiscardingSink:15 - 856
一种可能的解释是,管道中的某些东西正在对源施加背压。要仅测量源的容量,可以将作业简化为:
source.addSink(new DiscardingSink<>());
其中,DiscardingSink是
public static class DiscardingSink<OUT> implements SinkFunction<OUT> {
@Override
public void invoke(OUT value, Context context) throws Exception {
}
}
我想使用terraform、Kinesis数据流和数据消防软管创建,并将它们连接(作为管道)。当我使用UI时,当我去消防软管时,我可以去源- 这是创建动觉流的代码(我从官方动觉文档中获取): 这是数据消防水带的代码: 那么我如何连接它们呢,是不是类似于${aws\u kinesis\u stream.test\u stream.arn}?或者类似的东西? 我使用了aws_kinesis_strea
我想制作下面的数据发送架构。 生产商-- 消费者服务器可以关闭,因此我认为应该至少有两个消费者。是这样吗? 当一个数据流有两个使用者时,是否有任何方法可以处理每个使用者一半的数据?正如我所知,这是不可能的。如果每个消费者都使用相同的数据,那就是浪费时间和成本。因为我只为高可用性提供了两个消费者。(用于故障切换) 在web was体系结构中,ELB或L4可以通过负载平衡将一半数据发送到每个was服务
在我当前的项目中,我的目标是从帧流中检测不同的对象。视频帧是用与覆盆子PI连接的摄像机拍摄的。 体系结构设计如下: > 代码正在raspberry PI上运行。此代码将图像流发送到AWS中的Kinesis数据流(称为)。 Lambda函数() 以下是Kinesis数据流日志(日期为2019年8月17日-IST下午1:54)。最后一次,2019年8月16日通过覆盆子PI摄取的数据-下午6:45)
我正在尝试将本地SQL Server 2016 Enterprise Edition实例中的数据获取到云中。我遇到了一个障碍,所以如果有人对解决方法有任何指导,我真的很感谢你分享你的知识! 我计划使用AWS数据库迁移服务(aws.amazon.com),在这篇文章中我将其称为“DMS”。出于监管原因,数据库必须保持在本地,因此我需要不断地从该数据库中捕获数据并将其发送到云端。在这方面,我将使用更改
我正在尝试学习如何使用Oracle Container数据库,并只做基本的JDBC连接。我安装了一个文档化版本的Oracle: https://hub.docker.com/_/oracle-database-enterprise-edition 根据数据表,该数据库由CDB数据库ORCLCDB和PDB数据库ORCLPDB1组成。 所以我想我可以这样连接它: jdbc:oracle:thin:@l
我已经部署了一个nginx容器,并且公开了端口8080:80,但是当我执行curl localhost:8080时,我会得到“recv failure:连接由peer重置”。我已经允许端口8080的入站规则允许入站流量通过容器。