当前位置: 首页 > 知识库问答 >
问题:

Spark Cassandra写入所需时间比预期长

申阳伯
2023-03-14

我有一个spark作业,它从一个cassandra表中读取数据,并将结果转储回两个表,只需稍作修改。我的问题是这项工作比预期的要长得多。

代码如下:

val range = sc.parallelize(0 to 100)

val rdd1 = range.map(x => (some_value, x)).joinWithCassandraTable[Event](keyspace_name, table2).select("col1", "col2", "col3", "col4", "col5", "col6", "col7").map(x => x._2)

val rdd2: RDD[((Int, String, String, String), Iterable[Event])] = rdd1.keyBy(r => (r.col1, r.col2, r.col3, r.col4 )).groupByKey

val rdd3 = rdd2.mapValues(iter => someFunction(iter.toList.sorted))

//STORE 1

rdd3.map(r => (r._1._1, r._1._2, r._1._3, r._1._4, r._2.split('|')(1).toDouble )).saveToCassandra(keyspace_name, table1, SomeColumns("col1","col2", "col3","col4", "col5"))

//STORE 2  

rdd3.map(r => (to, r._1%100, to, "MANUAL_"+r._1+"_"+r._2+"_"+r._3+"_"+r._4+"_"+java.util.UUID.randomUUID(), "M", to, r._4, r._3, r._1, r._5, r._2) ).saveToCassandra(keyspace_name, table2, SomeColumns("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11"))

对于大约一百万条记录,STORE1需要将近40秒,STORE2(对rdd3稍作修改)需要一分钟以上。不确定我哪里出错了,也不知道为什么要花这么多时间。我的火花环境如下:

DSE 4.8.9,6个节点70 GB RAM,每个12个内核

任何帮助,不胜感激。

共有1个答案

夹谷琨
2023-03-14

让我做我的猜测。需要日志、性能监控输出和 C* 数据模型才能获得更精确的答案。但是一些数学:你有

  • 加入Cassandra-随机C*读取
  • SaveToCassandra-秒C*写入
  • 火花重新分区?/减少

(我预计SaveToCassadndra需要一半的时间)如果你不运行任何查询之前,你需要减去12-20秒的火花启动执行器和其他东西

因此,对于6个节点上的1M条目,您可以得到40秒:
 1000000/6/40=4166记录/秒/节点。这还不错。具有混合工作负载的每个节点10K/s是一个很好的结果。

第二次写入是第二次的2倍(11列,5列),它在第一次写入之后运行,所以我希望Cassandra此时开始将之前的数据溢出到磁盘,这样您可以在这里获得更多的性能降级。

我是否正确理解当您添加 rdd3.cache() 调用时,第二次运行没有任何变化?太奇怪了。

是的,您可以通过调整C*数据模型和Spark/C*参数获得更好的结果

 类似资料:
  • 我写了一个多线程应用程序,它广泛使用了async/await。它应该在预定的时间下载一些东西。为此,它使用了“await task.delay”。有时它每分钟发送数千个请求。 它按预期工作,但有时我的程序需要记录一些大的东西。这样做时,它会序列化许多对象并将它们保存到一个文件中。在那段时间里,我注意到我预定的任务执行得太晚了。我已经将所有日志记录放到一个具有最低优先级的单独线程中,这样问题就不再经

  • 我在Windows10和vscode上使用Python 3.8.1。

  • 我有一种情况,我想实现一个API重试机制。假设我有一个调用第三方API的API,正常响应时间不到2秒,但有时我们会收到一个错误,说“服务不可用”、“网关超时”等等。 所以我上网看看我们是否有一个图书馆来处理这些事情,我发现了https://jodah.net/failsafe/ 使用图书馆的目的:- 如果在5秒钟内,我没有得到结果,我将取消当前调用的执行,再试一次。 为此,在库中,我可以看到我们有

  • 问题内容: 我想在Android应用程序中设置日期和时间。日期应为今天的日期,但默认情况下,文本字段中的时间应设置为6:00 AM。我已经阅读了很多链接,但其中大多数都显示了今天的时间和日期(例如:2016-03-28 11:53:55)。 问题答案:

  • 问题内容: 我有一个脚本,需要在脚本的不同行执行以下命令: 在我的陈述中,我有以下内容: 我收到以下错误: 如果我将语句的顺序更改为: 我收到以下错误: 如果我再次将语句更改为: 我收到以下错误: 这是怎么回事,我怎么都可以工作? 问题答案: 您的麻烦是,您有一些代码希望对 模块 进行引用,而其他代码希望对类进行引用 。 显然,不能两者兼有。 当您这样做时: 您首先要设置为对该类的引用,然后立即将

  • 我有一个包含记录计数器的固定长度流 记录以开头 字符16+9(人形)包含 字符25+9(人形)包含 用填充并向右对齐的所有数字 记录在1898位置以+结尾(记录为长2000个字符) 我的出口代码有什么问题?为什么我总是得零分?