当前位置: 首页 > 知识库问答 >
问题:

Spark Mapwith State快照不缩放(Java)

壤驷康裕
2023-03-14

我正在使用spark从Kafka Stream接收数据,以接收有关定期发送健康更新的物联网设备的状态以及设备中存在的各种传感器的状态。我的Spark应用程序侦听单个主题,使用Spark direct stream从Kafka流接收更新消息。我需要根据每个设备的传感器状态触发不同的警报。然而,当我添加更多使用Kakfa向spark发送数据的物联网设备时,spark无法扩展,尽管添加了更多的机器,并且执行器的数量增加了。下面我给出了Spark应用程序的精简版本,其中通知触发部分已删除,但性能问题相同。

   // Method for update the Device state , it just a in memory object which tracks the device state  .
private static Optional<DeviceState> trackDeviceState(Time time, String key, Optional<ProtoBufEventUpdate> updateOpt,
            State<DeviceState> state) {
            int batchTime = toSeconds(time);
            ProtoBufEventUpdate eventUpdate = (updateOpt == null)?null:updateOpt.orNull();
            if(eventUpdate!=null)
                eventUpdate.setBatchTime(ProximityUtil.toSeconds(time));
            if (state!=null && state.exists()) {
                DeviceState deviceState = state.get();
                if (state.isTimingOut()) {
                    deviceState.markEnd(batchTime);
                }
                if (updateOpt.isPresent()) {
                        deviceState = DeviceState.updatedDeviceState(deviceState, eventUpdate);
                        state.update(deviceState);
                }
            } else if (updateOpt.isPresent()) {
                DeviceState deviceState = DeviceState.newDeviceState(eventUpdate);
                state.update(deviceState);              
                return Optional.of(deviceState);
            } 

        return Optional.absent();
}
    SparkConf conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    .set("spark.rpc.netty.dispatcher.numThreads", String.valueOf(Runtime.getRuntime().availableProcessors()))
     JavaStreamingContext context= new JavaStreamingContext(conf, Durations.seconds(10));
Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put( “zookeeper.connect”, “192.168.60.20:2181,192.168.60.21:2181,192.168.60.22:2181”);
        kafkaParams.put("metadata.broker.list", “192.168.60.20:9092,192.168.60.21:9092,192.168.60.22:9092”);
        kafkaParams.put(“group.id”, “spark_iot”);
        HashSet<String> topics=new HashSet<>();
        topics.add(“iottopic”);

JavaPairInputDStream<String, ProtoBufEventUpdate> inputStream = KafkaUtils.
            createDirectStream(context, String.class, ProtoBufEventUpdate.class,  KafkaKryoCodec.class, ProtoBufEventUpdateCodec.class, kafkaParams, topics);

JavaPairDStream<String, ProtoBufEventUpdate> updatesStream = inputStream.mapPartitionsToPair(t -> {
            List<Tuple2<String, ProtoBufEventUpdate>> eventupdateList=new ArrayList<>();
            t.forEachRemaining(tuple->{
                    String key=tuple._1;
                    ProtoBufEventUpdate eventUpdate =tuple._2;                  
                    Util.mergeStateFromStats(eventUpdate);
                    eventupdateList.add(new Tuple2<String, ProtoBufEventUpdate>(key,eventUpdate));

            });
            return eventupdateList.iterator();
});

JavaMapWithStateDStream<String, ProtoBufEventUpdate, DeviceState, DeviceState> devceMapStream = null;

devceMapStream=updatesStream.mapWithState(StateSpec.function(Engine::trackDeviceState)
                             .numPartitions(20)
                             .timeout(Durations.seconds(1800)));
devceMapStream.checkpoint(new Duration(batchDuration*1000));


JavaPairDStream<String, DeviceState> deviceStateStream = devceMapStream
                .stateSnapshots()
                .cache();

deviceStateStream.foreachRDD(rdd->{
                if(rdd != null && !rdd.isEmpty()){
                    rdd.foreachPartition(tuple->{
                    tuple.forEachRemaining(t->{
                        SparkExecutorLog.error("Engine::getUpdates Tuple data  "+ t._2);
                    });
                });
                }
});

即使负载增加,我也没有看到执行器实例的CPU使用率增加。大多数时候,执行器实例CPU处于空闲状态。我尝试增加kakfa partics(目前Kafka有72个分区。我也试着把它降到36)。我还尝试增加devceMapStream分区,但没有看到任何性能改进。代码不会在IO上花费任何时间。

我正在Amazon EMR(纱线)上运行我们的Spark应用程序,其中有6个执行器实例,每台机器有4个内核和32 gb Ram。它试图将executor实例的数量增加到9个,然后增加到15个,但没有看到任何性能改进。也在spark上玩了一点。违约通过将其设置为20、36、72、100来获得并行度值,但我可以看到20是一个给了我更好性能的值(可能每个执行器的内核数对此有一定影响)。

spark-submit --deploy-mode cluster --class com.ajay.Engine --supervise --driver-memory 5G --driver-cores 8 --executor-memory 4G --executor-cores 4 --conf spark.default.parallelism=20 --num-executors 36 --conf spark.dynamicAllocation.enabled=false --conf spark.streaming.unpersist=false --conf spark.eventLog.enabled=false --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties --conf spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError --conf spark.executor.extraJavaOptions=-XX:HeapDumpPath=/tmp --conf spark.executor.extraJavaOptions=-XX:+UseG1GC --conf spark.driver.extraJavaOptions=-XX:+UseG1GC --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties s3://test/engine.jar

目前Spark正在努力在10秒内完成处理(我甚至尝试了不同的批处理时间,如5,10,15等)。完成一个批次需要15-23秒,输入速率为每秒1600条记录,每个批次有17000条记录。我需要使用statestam定期检查设备的状态,看看设备是否发出任何警报或任何传感器停止响应。我不确定如何提高火花应用程序的性能?

共有1个答案

麻鹏鹍
2023-03-14

mapAnd State执行以下操作:

将函数应用于此流的每个键值元素,同时为每个唯一键维护一些状态数据

根据其文档:PairDStreamFunctions#mapWithState

这也意味着,对于每个批处理,具有相同密钥的所有元素都是按顺序处理的,而且由于StateSpec中的函数是任意的,由我们提供,没有定义状态组合器,因此无论在mapWithState之前如何划分数据,它都无法进一步并行化。也就是说,当密钥不同时,并行化会很好,但是如果所有RDD元素中只有几个唯一的密钥,那么整个批次将主要由与唯一密钥数相等的内核数进行处理。

在您的情况下,钥匙来自Kafka:

            t.forEachRemaining(tuple->{
                String key=tuple._1;

而您的代码片段并没有显示它们是如何生成的。

根据我的经验,这就是可能发生的情况:批处理的某些部分正在被多个内核快速处理,而另一部分,对于整个批处理的大部分具有相同的密钥,需要更多的时间并延迟批处理,这就是为什么您看到大多数时间只运行一些任务,而执行器负载不足的原因。

要查看它是否是真的,请检查您的密钥分布,每个密钥有多少个元素,会不会只有几个密钥拥有所有元素的20%?如果这是真的,你有以下选择:

  • 更改密钥生成算法
  • mapAnd State之前人为地分割有问题的键,并在稍后组合状态快照以使整个
  • 限制每个批次中要处理的具有相同密钥的元素数量,要么忽略每个批次中第一个N之后的元素,要么将它们发送到其他地方,进入一些“不能及时处理”的Kafka流并分别处理它们
 类似资料:
  • Pinch手势对图片进行缩放。即用两根手指往不同方向拖拉照片,照片会被缩小或放大。 作者说:在本站见到过一些照片缩放的例子,都是用ScrollerView设置的,总感觉有点飘浮。本例子用简单地手势控制照片的缩放。 [Code4App.com]

  • 问题内容: 我需要实现变焦为包含在。我已经通过覆盖方法和调用来成功进行缩放。 这是不正常:对的和的规模如预期,但一定会得到的和这样的寄存器在预分频的位置。我能做什么?感谢您的阅读。 问题答案: 显示了如何使用明确的转化方法扩展鼠标坐标:,,和。)。

  • Overview Rados supports two related snapshotting mechanisms: pool snaps: snapshots are implicitely applied to all objects in a pool self managed snaps: the user must provide the current SnapContext on

  • 我正在尝试在运行时使用Java2D和某种像样的抗锯齿插值(如双线性)来扩展backbuffer。我的想法是将场景渲染到此图像,然后在全屏模式下放大图像,以匹配用户具有的任何分辨率。 请注意,全屏模式很重要。这在窗口模式下不会发生。 有没有一种使用硬件扩展的快速方法?Javadocs建议它存在(-Dsun.java2d.ddscale=true),但它对我没有影响。 代码如下: 结果是: 最近邻(约

  • 大型软件应用程序通常由多个模块组成,并且通常情况下,多个团队正在处理同一应用程序的不同模块。 例如,考虑一个团队正在作为app-ui项目(app-ui.jar:1.0)在应用程序的前端工作,他们正在使用数据服务项目(data-service.jar:1.0)。 现在可能发生的是,从事数据服务的团队正在快速进行错误修复或增强,他们几乎每隔一天就将库发布到远程存储库。 现在,如果数据服务团队每隔一天上

  • 以下是CardView的XML代码: 我尝试对ImageView(和LinearLayout)的属性使用wrap_content,并尝试包含属性android: caleType="fitXY",但它没有帮助,似乎没有任何效果。 谁能帮我一下吗?我非常感谢您的每一句评论,也非常感谢您的帮助。