当前位置: 首页 > 知识库问答 >
问题:

只有在我的RDD中的所有分区都已处理后,如何在Spark Streaming中接收输入?

子车文康
2023-03-14

假设我有一个JavaDStreamRecencer,它每秒从Spark Streaming中的TCP/IP套接字连接接收一个整数。然后我将其存储在列表中,直到我有100个整数。之后,我想将该RDD划分为4个分区,在我的电脑中每个内核一个,并在paralel中映射这些分区。所以类似于这样:

 public final class sparkstreaminggetjson {
 private static final Pattern SPACE = Pattern.compile(" ");
 private static Integer N=100;
 private static List<Integer> allInputValues= new List<Integer>();

 public static void main(String[] args) throws Exception {

  SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkstreaminggetjson");


  JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

  JavaReceiverInputDStream<Integer> receivedStream = ssc.socketTextStream(
        args[0],Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);

  JavaDStream<List<Integer>> storeValuesInList=receivedStream.map( // if N<100, keeps inserting integers; if N>100, inserts the new value into the List and removes the oldest value );

  JavaDStream<List<Integer>> partitionedList=storeValuesInList.repartition(4);


  JavaDStream<List<Integer>> someCalculations=partionedList.map(//some calculations)

  JavaDStream<List<Integer>> otherCalculations=someCalculations.map(//other calculations)

...

finalStream.print();

这是我的问题。我想实现一个FILO模型,在该模型中,我接收一个新输入,将其放置在RDD的第一个分区中,并从RDD的最后一个分区中删除最后一个元素。因此,基本上,我从列表中放置并轮询整数,保持原始大小。之后,我像往常一样并行处理每个分区。

这是我的问题:每当我的分区完成处理时,应用程序都会返回接收流,而不是分区列表。也就是说,我每个分区都会得到一个新的输入,这不是我想要的。我希望每个分区都被处理,然后才返回接收流以获取新的输入。

我该怎么做?我应该用其他方法来分离阶段吗?

非常感谢你。

共有1个答案

赵炯
2023-03-14

据我所知,你可以使用一个窗口:每秒1个整数意味着你可以使用

JavaDstream integers = your stream;
JavaDstream hundredInt = integers.window(Seconds(100));

这样,每个RDD将有100个整数。

根据缓冲:newInt-

 类似资料:
  • 我有一个有 30 条记录的 RDD(键/值对:键是时间戳,值是 JPEG 字节数组), 我正在运行 30 个执行器。我想将此 RDD 重新分区为 30 个分区,以便每个分区获得一条记录并分配给一个执行器。 当我使用 30) 时,它会在 30 个分区中重新分区我的 rdd,但有些分区得到 2 条记录,有些得到 1 条记录,有些没有得到任何记录。 在Spark中,有没有什么方法可以将我的记录平均分配到

  • 旋转设备后,我们在Backback或fragmentSupportManager中的任何片段都会激活onSaveInstanceState。 但是,在第一次旋转之后,内部的所有内容都为null(所有变量),应用程序会因null指针异常而崩溃。 我想知道为什么如果片段实际上被销毁为零(所有值都为null),为什么片段仍然存在,并且调用onSaveInstanceState? 我的代码很深,有多个基类

  • 本文向大家介绍如何处理所有Replica都不工作相关面试题,主要包含被问及如何处理所有Replica都不工作时的应答技巧和注意事项,需要的朋友参考一下   上文提到,在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案: 1.等待ISR中的任一个Repli

  • 我正在使用Laravel 5,并试图像这样在控制器中获得POST变量的所有输入- 所以,我得到了这个错误- 我做错了什么?

  • 我需要为以下有效日期创建。 我尝试了以下日期时间格式化程序,但在最近两个日期(和)中失败。 从到的所有日期都可以正常工作,但在尝试分析最近两个日期时,我得到一个: 线程“main”java.time.format.DateTimeParseException中出现异常:无法解析文本“2017-06-20T17:25:28.477777+0530”,在索引29处找到未解析的文本 用于解析我正在使用的

  • 我试图在对数据的某一列执行聚合操作之前对数据进行预分区。我有3个工作节点,我希望每个分区在我分区的列中都有不重叠的值。我不希望出现两个分区在列中可能具有相同值的情况。 例如。如果我有以下数据 那么以下隔墙是令人满意的: 分区1 分区2 分区3 不幸的是,我下面的代码不起作用。 我已经看过了 如何定义数据帧的分区 我还是想不通。