当前位置: 首页 > 知识库问答 >
问题:

我如何让Spark Streaming在单元测试中计算文件中的单词?

时浩波
2023-03-14

我已经成功地在Java中构建了一个非常简单的Spark流应用程序,它基于Scala中的HdfsCount示例。

当我将这个应用程序提交到我的本地Spark时,它会等待一个文件被写入给定的目录,当我创建该文件时,它会成功地打印字数。我通过按Ctrl+C终止应用程序。

现在我尝试为这个功能创建一个非常基本的单元测试,但是在测试中我无法打印相同的信息,即字数。

import com.google.common.io.Files;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;


import org.junit.*;

import java.io.*;

public class StarterAppTest {

  JavaStreamingContext ssc;
  File tempDir;

  @Before
  public void setUp() {
    ssc = new JavaStreamingContext("local", "test", new Duration(3000));
    tempDir = Files.createTempDir();
    tempDir.deleteOnExit();
  }

  @After
  public void tearDown() {
    ssc.stop();
    ssc = null;
  }

  @Test
  public void testInitialization() {
    Assert.assertNotNull(ssc.sc());
  }


  @Test
  public void testCountWords() {

    StarterApp starterApp = new StarterApp();

    try {
      JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath());
      JavaPairDStream<String, Integer> wordCounts = starterApp.countWords(lines);

      ssc.start();

      File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt");
      PrintWriter writer = new PrintWriter(tmpFile, "UTF-8");
      writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
      writer.close();

      System.err.println("===== Word Counts =======");
      wordCounts.print();
      System.err.println("===== Word Counts =======");

    } catch (FileNotFoundException e) {
      e.printStackTrace();
    } catch (UnsupportedEncodingException e) {
      e.printStackTrace();
    }


    Assert.assertTrue(true);

  }

}

我还尝试在ssc.start()之后添加ssc.awaittermination();,但在这方面没有任何改变。之后,我还尝试在Spark Streaming应用程序正在检查的目录中手动创建一个新文件,但这一次它给出了一个错误。

为了完整起见,下面是wordCounts方法:

public JavaPairDStream<String, Integer> countWords(JavaDStream<String> lines) {
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) { return Lists.newArrayList(SPACE.split(x)); }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
              @Override
              public Tuple2<String, Integer> call(String s) { return new Tuple2<>(s, 1); }
            }).reduceByKey((i1, i2) -> i1 + i2);

    return wordCounts;
  }

共有1个答案

魏成济
2023-03-14

几点建议:

  • 至少为SparkStreaming上下文提供2个核心。1用于流式处理,1用于火花处理。“本地”->“本地[2]”
  • 您的流式传输间隔为3000ms,因此在程序的某个地方,您需要等待--至少--该时间才能期望输出。
  • Spark Streaming需要一些时间来设置侦听器。发布ssc.start后,将立即创建该文件。文件系统侦听器并不保证已经到位。我会在ssc.start
  • 之后执行一些 sleep(xx)

在流媒体中,关键是要把握好时机。

 类似资料:
  • 我确实看到了一些关于在SonarQube中安装Groovy插件的相关答案,但是没有提到这是用于哪一个版本的SonarQube的解决方案(我不再看到这个插件可用)

  • 问题内容: 我有一个具有以下格式的.txt文件, 尽管显然它要大得多,但实际上是这样。基本上,我试图总结每个单独字符串在文件中的次数(每个字母/字符串在单独的一行上,因此从技术上讲文件是C \ nV \ nEH \ n等。但是,当我尝试将这些文件转换为列表,然后使用count函数时,它会分离出字母,以使诸如’IRQ’之类的字符串为[‘\ n’I’,’R’ ,’Q’,’\ n’],这样当我计算它时,

  • 我使用的是spring boot 1.4.4,我有一个测试类,比如: 这是我的application.yml: application.yml是读取的,因为我将该值存储在字符串路径值中,conf.json位于/src/main/resources和/src/test/resouces中 但我要做什么:mapper.readtree(新URL(路径));我得到错误:文件找不到。 我试过:“class

  • 问题内容: 是否有标准方法(无需安装第三方库)在Python中进行跨平台文件系统模拟?如果我必须使用第三方库,那么哪个库是标准库? 问题答案: pyfakefs(主页)做您想要的–一个 伪造的 文件系统;它是第三方,尽管该第三方是Google。有关使用的讨论,请参见如何为被测模块替换文件访问引用。 对于 嘲笑 ,unittest.mock是用于Python 3.3+(标准库PEP 0417); 有

  • 我使用PyCharm来运行一个相当大的测试套件(1800个快速测试)。最近修改了一个测试,我意识到我的测试不会出错。当我单独运行这些测试时,确实会中断,但是如果我运行整个测试套件,它们总是会通过。这是由于一些以前的同事对类的这种不正确的嘲笑: 我现在需要检查整个测试套件中模拟的使用情况,但是我想单独运行每个测试,看看哪些测试没有通过(我将这些测试标记为高优先级)。我怎么能从控制台或Py魅力中做到这

  • 问题内容: 因此,我将一些单元测试从Selenium IDE导出到了Python。现在,我正在尝试调试某些东西,并且我注意到Selenium使用该模块。我真的很想看到其中的一行输出。它是: 文件顶部是另一行,内容为: 那么此日志文件在哪里?我想看看。 问题答案: 在您的单元测试脚本中,放置 您想将日志文件写入的位置的路径是哪里。 如果没有调用或进行类似的调用来设置日志记录处理程序,该命令将不执行任