当前位置: 首页 > 面试题库 >

Java Stream并行化的可视化

国仰岳
2023-03-14
问题内容

通常,不清楚并行流如何精确地将输入拆分为多个块以及以什么顺序连接这些块。是否有任何方法可以可视化任何流源的整个过程,从而更好地了解发生了什么?假设我创建了这样的流:

Stream<Integer> stream = IntStream.range(0, 100).boxed().parallel();

我想看一些树状结构:

             [0..99]
         _____/   \_____
        |               |
     [0..49]         [50..99]
    __/   \__        __/  \__
   |         |      |        |
[0..24]  [25..49] [50..74] [75..99]

这意味着将整个输入范围[0..99]划分为[0..49][50..99],然后将范围进一步划分。当然,该图应反映Stream
API的实际工作,因此,如果我对此类流执行某些实际操作,则拆分应该以相同的方式执行


问题答案:

我想用一种解决方案来扩展Tagir的出色答案,该解决方案可以监视_源_ 端甚至中间操作的拆分(受当前流API实现的某些限制):

public static <E> Stream<E> proxy(Stream<E> src) {
    Class<Stream<E>> sClass=(Class)Stream.class;
    Class<Spliterator<E>> spClass=(Class)Spliterator.class;
    return proxy(src, sClass, spClass, StreamSupport::stream);
}
public static IntStream proxy(IntStream src) {
    return proxy(src, IntStream.class, Spliterator.OfInt.class, StreamSupport::intStream);
}
public static LongStream proxy(LongStream src) {
    return proxy(src, LongStream.class, Spliterator.OfLong.class, StreamSupport::longStream);
}
public static DoubleStream proxy(DoubleStream src) {
    return proxy(src, DoubleStream.class, Spliterator.OfDouble.class, StreamSupport::doubleStream);
}
static final Object EMPTY=new StringBuilder("empty");
static <E,S extends BaseStream<E,S>, Sp extends Spliterator<E>> S proxy(
        S src, Class<S> sc, Class<Sp> spc, BiFunction<Sp,Boolean,S> f) {

    final class Node<T> implements InvocationHandler,Runnable,
        Consumer<Object>, IntConsumer, LongConsumer, DoubleConsumer {
        final Class<? extends Spliterator> type;
        Spliterator<T> src;
        Object first=EMPTY, last=EMPTY;
        Node<T> left, right;
        Object currConsumer;
        public Node(Spliterator<T> src, Class<? extends Spliterator> type) {
            this.src = src;
            this.type=type;
        }
        private void value(Object t) {
            if(first==EMPTY) first=t;
            last=t;
        }
        public void accept(Object t) {
            value(t); ((Consumer)currConsumer).accept(t);
        }
        public void accept(int t) {
            value(t); ((IntConsumer)currConsumer).accept(t);
        }
        public void accept(long t) {
            value(t); ((LongConsumer)currConsumer).accept(t);
        }
        public void accept(double t) {
            value(t); ((DoubleConsumer)currConsumer).accept(t);
        }
        public void run() {
            System.out.println();
            finish().forEach(System.out::println);
        }
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            Node<T> curr=this; while(curr.right!=null) curr=curr.right;
            if(method.getName().equals("tryAdvance")||method.getName().equals("forEachRemaining")) {
                curr.currConsumer=args[0];
                args[0]=curr;
            }
            if(method.getName().equals("trySplit")) {
                Spliterator s=curr.src.trySplit();
                if(s==null) return null;
                Node<T> pfx=new Node<>(s, type);
                pfx.left=curr.left; curr.left=pfx;
                curr.right=new Node<>(curr.src, type);
                src=null;
                return pfx.create();
            }
            return method.invoke(curr.src, args);
        }
        Object create() {
            return Proxy.newProxyInstance(null, new Class<?>[]{type}, this);
        }
        String pad(String s, int left, int len) {
            if (len == s.length())
                return s;
            char[] result = new char[len];
            Arrays.fill(result, ' ');
            s.getChars(0, s.length(), result, left);
            return new String(result);
        }
        public List<String> finish() {
            String cur = toString();
            if (left == null) {
                return Collections.singletonList(cur);
            }
            List<String> l = left.finish();
            List<String> r = right.finish();
            int len1 = l.get(0).length();
            int len2 = r.get(0).length();
            int totalLen = len1 + len2 + 1;
            int leftAdd = 0;
            if (cur.length() < totalLen) {
                cur = pad(cur, (totalLen - cur.length()) / 2, totalLen);
            } else {
                leftAdd = (cur.length() - totalLen) / 2;
                totalLen = cur.length();
            }
            List<String> result = new ArrayList<>();
            result.add(cur);

            char[] dashes = new char[totalLen];
            Arrays.fill(dashes, ' ');
            Arrays.fill(dashes, len1 / 2 + leftAdd + 1, len1 + len2 / 2 + 1
                    + leftAdd, '_');
            int mid = totalLen / 2;
            dashes[mid] = '/';
            dashes[mid + 1] = '\\';
            result.add(new String(dashes));

            Arrays.fill(dashes, ' ');
            dashes[len1 / 2 + leftAdd] = '|';
            dashes[len1 + len2 / 2 + 1 + leftAdd] = '|';
            result.add(new String(dashes));

            int maxSize = Math.max(l.size(), r.size());
            for (int i = 0; i < maxSize; i++) {
                String lstr = l.size() > i ? l.get(i) : String.format("%"
                        + len1 + "s", "");
                String rstr = r.size() > i ? r.get(i) : String.format("%"
                        + len2 + "s", "");
                result.add(pad(lstr + " " + rstr, leftAdd, totalLen));
            }
            return result;
        }
        private Object first() {
            if(left==null) return first;
            Object o=left.first();
            if(o==EMPTY) o=right.first();
            return o;
        }
        private Object last() {
            if(right==null) return last;
            Object o=right.last();
            if(o==EMPTY) o=left.last();
            return o;
        }
        public String toString() {
            Object o=first(), p=last();
            return o==EMPTY? "(empty)": "["+o+(o!=p? ".."+p+']': "]");
        }
    }
    Node<E> n=new Node<>(src.spliterator(), spc);
    Sp sp=(Sp)Proxy.newProxyInstance(null, new Class<?>[]{n.type}, n);
    return f.apply(sp, true).onClose(n);
}

它允许用代理程序包装拆分器,该代理器将监视拆分操作和遇到的对象。块处理的逻辑类似于Tagir的逻辑,实际上,我复制了他的结果打印例程。

您可以传入流的源或已附加相同操作的流。(在后一种情况下,您应.parallel()及早申请该流)。正如Tagir解释的那样,在大多数情况下,拆分行为取决于源和配置的并行度,因此,在大多数情况下,监视中间状态可能会更改值,但不会更改已处理的块:

try(IntStream is=proxy(IntStream.range(0, 100).parallel())) {
    is.filter(i -> i/20%2==0)
      .mapToObj(ix->"\""+ix+'"')
      .forEach(s->{});
}

将打印

                                                                  [0..99]                                                                   
                                   ___________________________________/\________________________________                                    
                                  |                                                                     |                                   
                              [0..49]                                                               [50..99]                                
                 _________________/\______________                                     _________________/\________________                  
                |                                 |                                   |                                   |                 
            [0..24]                           [25..49]                            [50..74]                            [75..99]              
        ________/\_____                   ________/\_______                   ________/\_______                   ________/\_______         
       |               |                 |                 |                 |                 |                 |                 |        
   [0..11]         [12..24]          [25..36]          [37..49]          [50..61]          [62..74]          [75..86]          [87..99]     
    ___/\_          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___    
   |      |        |        |        |        |        |        |        |        |        |        |        |        |        |        |   
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]

try(Stream<String> s=proxy(IntStream.range(0, 100).parallel().filter(i -> i/20%2==0)
      .mapToObj(ix->"\""+ix+'"'))) {
    s.forEach(str->{});
}

将打印

                                                                                   ["0".."99"]                                                                                    
                                              ___________________________________________/\___________________________________________                                            
                                             |                                                                                        |                                           
                                       ["0".."49"]                                                                              ["50".."99"]                                      
                         ____________________/\______________________                                           ______________________/\___________________                       
                        |                                            |                                         |                                           |                      
                  ["0".."19"]                                  ["40".."49"]                              ["50".."59"]                                ["80".."99"]                 
            ____________/\_________                      ____________/\______                           _______/\___________                   ____________/\________             
           |                       |                    |                    |                         |                    |                 |                      |            
     ["0".."11"]             ["12".."19"]            (empty)           ["40".."49"]              ["50".."59"]            (empty)        ["80".."86"]           ["87".."99"]       
      _____/\___              _____/\_____           ___/\__            _____/\_____              _____/\_____           ___/\__         _____/\__              _____/\_____      
     |          |            |            |         |       |          |            |            |            |         |       |       |         |            |            |     
["0".."5"] ["6".."11"] ["12".."17"] ["18".."19"] (empty) (empty) ["40".."42"] ["43".."49"] ["50".."55"] ["56".."59"] (empty) (empty) ["80"] ["81".."86"] ["87".."92"] ["93".."99"]

正如我们在此处看到的那样,我们正在监视的结果,.filter(…).mapToObj(…)但是块显然是由源确定的,可能会根据过滤器的条件在下游产生空块。

请注意,我们可以将源代码监视与Tagir的收集器监视结合起来:

try(IntStream s=proxy(IntStream.range(0, 100))) {
    s.parallel().filter(i -> i/20%2==0)
     .boxed().collect(parallelVisualize())
     .forEach(System.out::println);
}

这将打印(请注意,collect输出首先打印):

                                                              [0..99]                                                               
                                  ________________________________/\_______________________________                                 
                                 |                                                                 |                                
                             [0..49]                                                           [50..99]                             
                 ________________/\______________                                   _______________/\_______________                
                |                                |                                 |                                |               
            [0..19]                          [40..49]                          [50..59]                         [80..99]            
        ________/\_____                  ________/\______                   _______/\_______                ________/\_____         
       |               |                |                |                 |                |              |               |        
   [0..11]         [12..19]          (empty)         [40..49]          [50..59]          (empty)       [80..86]        [87..99]     
    ___/\_          ___/\___         ___/\__          ___/\___          ___/\___         ___/\__        ___/\_          ___/\___    
   |      |        |        |       |       |        |        |        |        |       |       |      |      |        |        |   
[0..5] [6..11] [12..17] [18..19] (empty) (empty) [40..42] [43..49] [50..55] [56..59] (empty) (empty) [80] [81..86] [87..92] [93..99]

                                                                  [0..99]                                                                   
                                   ___________________________________/\________________________________                                    
                                  |                                                                     |                                   
                              [0..49]                                                               [50..99]                                
                 _________________/\______________                                     _________________/\________________                  
                |                                 |                                   |                                   |                 
            [0..24]                           [25..49]                            [50..74]                            [75..99]              
        ________/\_____                   ________/\_______                   ________/\_______                   ________/\_______         
       |               |                 |                 |                 |                 |                 |                 |        
   [0..11]         [12..24]          [25..36]          [37..49]          [50..61]          [62..74]          [75..86]          [87..99]     
    ___/\_          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___    
   |      |        |        |        |        |        |        |        |        |        |        |        |        |        |        |   
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]

我们可以清楚地看到处理的块是如何匹配的,但是在过滤之后,一些块的元素较少,其中一些完全为空。

这是演示的地方,两种监视方式可以在其中发挥重要作用:

try(DoubleStream is=proxy(DoubleStream.iterate(0, i->i+1)).parallel().limit(100)) {
    is.boxed()
      .collect(parallelVisualize())
      .forEach(System.out::println);
}



                                                                                                [0.0..99.0]                                                                                                 
                                                   ___________________________________________________/\________________________________________________                                                    
                                                  |                                                                                                     |                                                   
                                            [0.0..49.0]                                                                                           [50.0..99.0]                                              
                         _________________________/\______________________                                                     _________________________/\________________________                          
                        |                                                 |                                                   |                                                   |                         
                  [0.0..24.0]                                       [25.0..49.0]                                        [50.0..74.0]                                        [75.0..99.0]                    
            ____________/\_________                           ____________/\___________                           ____________/\___________                           ____________/\___________             
           |                       |                         |                         |                         |                         |                         |                         |            
     [0.0..11.0]             [12.0..24.0]              [25.0..36.0]              [37.0..49.0]              [50.0..61.0]              [62.0..74.0]              [75.0..86.0]              [87.0..99.0]       
      _____/\___              _____/\_____              _____/\_____              _____/\_____              _____/\_____              _____/\_____              _____/\_____              _____/\_____      
     |          |            |            |            |            |            |            |            |            |            |            |            |            |            |            |     
[0.0..5.0] [6.0..11.0] [12.0..17.0] [18.0..24.0] [25.0..30.0] [31.0..36.0] [37.0..42.0] [43.0..49.0] [50.0..55.0] [56.0..61.0] [62.0..67.0] [68.0..74.0] [75.0..80.0] [81.0..86.0] [87.0..92.0] [93.0..99.0]

                             [0.0..10239.0]                              
       _____________________________/\_____                              
      |                                    |                             
[0.0..1023.0]                      [1024.0..10239.0]                     
                       ____________________/\_______                     
                      |                             |                    
              [1024.0..3071.0]             [3072.0..10239.0]             
                                        ____________/\______             
                                       |                    |            
                               [3072.0..6143.0]     [6144.0..10239.0]    
                                                         ___/\_______    
                                                        |            |   
                                                [6144.0..10239.0] (empty)

这表明Tagir已经解释了什么,大小未知的流分配不佳,甚至事实limit(…)提供了进行良好估计的可能性(实际上,无限+极限在理论上是可预测的),实现没有利用它。

使用的批量大小将源拆分为多个块10241024每次拆分后将其增加,从而在施加的范围之外创建块limit。我们还可以看到每次如何分割前缀。

但是,当我们查看终端拆分输出时,我们可以看到这些多余的块之间已被丢弃,并且第一个块又发生了拆分。由于此块是由中间数组后端组成的,该中间数组已在第一次拆分时由默认实现填充,因此我们在源头没有注意到它,但是我们可以在终端操作中看到此数组已拆分(不足为奇)平衡。

因此,我们需要两种监视方式才能在此处获得完整的图像…



 类似资料:
  • Python 中有很多库可以用来可视化数据,比如 Pandas、Matplotlib、Seaborn 等。 Matplotlib import matplotlib.pyplot as plt import numpy as np %matplotlib inline t = np.arange(0., 5., 0.2) plt.plot(t, t, "r--", t, t**2, "bs", t

  • 表格是一种组织和可视化数据的强大方式。然而,无论数据如何组织,数字的大型表格可能难以解释。 有时解释图片比数字容易得多。 在本章中,我们将开发一些数据分析的基本图形方法。 我们的数据源是互联网电影数据库(IMDB),这是一个在线数据库,包含电影,电视节目,和视频游戏等信息。Box Office Mojo 网站提供了许多 IMDB 数据摘要,我们已经采用了其中一些。 我们也使用了 The Numbe

  • 要创建可视化视图: 点击左侧导航栏的 Visualize 。 点击 Create new visualization 按钮或 + 按钮。 选择视图类型: 基础图形 Line, Area and Bar charts 在X/Y图中比较两个不同的序列。 Heat maps 使用矩阵的渐变单元格. Pie chart 显示每个来源的占比。 数据 Data table 显示一个组合聚合的原始数据。 Met

  • 主要内容:可视化检测系统可视化测试用于通过定义数据来检查软件故障发生的情况,开发人员可以快速识别故障原因,并清楚地表达信息,以便任何其他开发人员可以利用这些信息。 可视化测试旨在显示实际问题,而不仅仅是描述它,显着增加理解和清晰度,以便快速解决问题。 可视化意味着我们可以看到的。因此,可视化测试需要整个过程的视频录制。它捕获视频格式系统测试时发生的所有事情。测试仪将图片网络摄像头中的图片和来自麦克风的音频评论作为输入值。

  • TensorFlow包含一个可视化工具 - TensorBoard。它用于分析数据流图,也用于理解机器学习模型。TensorBoard的重要功能包括有关垂直对齐中任何图形的参数和详细信息的不同类型统计信息的视图。 深度神经网络包括有36,000个节点。TensorBoard有助于在高级块中折叠这些节点并突出显示相同的结构。这允许更好地分析关注计算图的主要部分的图。TensorBoard可视化非常具

  • 在本章中,我们将在Convents的帮助下专注于数据可视化模型。需要以下步骤才能使用传统的神经网络获得完美的可视化图像。 第1步 导入必要的模块,这对于传统神经网络的可视化非常重要。 第2步 要通过训练和测试数据来停止潜在的随机性,请调用以下代码中给出的相应数据集 - 第3步 使用以下代码绘制必要的图像,以完美的方式定义训练和测试数据 - 输出显示如下 -