当前位置: 首页 > 知识库问答 >
问题:

从spark集群向cassandra集群写入dataframe:分区和性能调优

赖诚
2023-03-14

我有两个集群-1。Cloudera Hadoop-Spark作业在这里运行2。云-卡桑德拉星团,多DC

在编写从spark作业到cassandra集群的dataframe时,我在编写之前在spark中进行了重新分区(repartioncount=10)。见下文:

import org.apache.spark.sql.cassandra._
records.repartition(repartitionCount).write.cassandraFormat(table, keySpace)
  .mode(SaveMode.Append)
  .options(options)
  .option(CassandraConnectorConf.LocalDCParam.name, cassandraDC.name)
  .option(CassandraConnectorConf.ConnectionHostParam.name, cassandraDC.hosts)
  .save()

在我的多租户spark集群中,对于一个有20M记录的spark批加载,以及以下配置,我看到了很多任务失败、资源抢占和动态失败。

spark.cassandra.output.batch.grouping.buffer.size=1000
spark.cassandra.output.batch.grouping.key=partition
spark.cassandra.output.concurrent.writes=20 
spark.cassandra.connection.compression=LZ4

PS:我一开始的理解是:对于20m行的负载,“重新分区”应该将负载均匀地分配到执行器上(每个分区有2m行),批处理将在这些分区级别(2m行)上进行。但是现在,我怀疑这是否会导致不必要的洗牌,如果spark-cassandra-connector在整个数据流级别(整个20m行)上进行批处理的话。

更新:删除“重新分区”大大降低了cloudera spark集群的性能(spark级别的默认分区设置是-spark.sql.shuffle.partitions:200),所以我深入研究了一下,发现我最初的理解是正确的。请注意我的spark和cassandra集群是不同的。Datastax spark-cassandra-connector使用cassandra协调器节点打开每个分区的一个连接,所以我决定让它相同。正如Alex所建议的,我已经减少了并发写,我相信这会有所帮助。

共有1个答案

朱丰
2023-03-14

您不需要在Spark中进行重新分区--只需将数据从Spark写入Cassandra即可,不要试图更改Spark Cassandra连接器的默认值--它们在大多数情况下都能正常工作。您需要查看发生了什么样的阶段故障--很可能只是因为spark.Cassandra.output.concurrent.writes=20(使用默认值(5))而重载了Cassandra--有时由于不重载Cassandra,作业也不会重新启动,所以编写器较少,有助于更快地写入数据。

附言。Spark.Cassandra.output.batch.grouping.key中的分区-它不是Spark分区,而是依赖于分区键列值的Cassandra分区。

 类似资料:
  • 我有一个由4个节点组成的Cassandra(2.2.1)集群,由Java客户端应用程序使用。复制因子为3,读写的一致性级别为LOCAL_QUORUM。每个节点大约有5 GB的数据。请求量约为每秒2-4k。几乎没有删除操作,因此创建了少量的墓碑。 一段时间前,我注意到读写性能很差,而且随着时间的推移,性能越来越差——集群变得非常慢。读取(通常)和写入超时已变得非常频繁。硬件不应该引起问题,部署集群的

  • 我必须为每个客户端每秒存储大约250个数值,即每小时大约90万个数字。它可能不会是全天的记录(可能每天5-10个小时),但我会根据客户端ID和读取日期对数据进行分区。最大行长约为22-23M,这仍然是可管理的。无论如何,我的方案看起来像这样: 密钥空间的复制因子为2,仅用于测试,告密者为和。我知道复制因子3更符合生产标准。 接下来,我在公司服务器上创建了一个小型集群,三台裸机虚拟化机器,具有2个C

  • 我们有没有可能在Spark中先按一列分区,然后再按另一列聚类? 在我的例子中,我在一个有数百万行的表中有一个< code>month列和一个< code>cust_id列。我可以说,当我将数据帧保存到hive表中,以便根据月份将该表分区,并按< code>cust_id将该表聚类成50个文件吗? 忽略按< code>cust_id的聚类,这里有三个不同的选项 第一种情况和最后一种情况在 Spark

  • 应用程序不是那么占用内存,有两个连接和写数据集到目录。同样的代码在spark-shell上运行没有任何失败。 寻找群集调优或任何配置设置,这将减少执行器被杀死。

  • 我有一个关于设计的挑战,我有选择在我的卡桑德拉桌子上。这是在生产上运行的。但最近我观察到以下问题。 (这里的表名和列是为了便于讨论) 我的问题是,有人能在不改变Datamodel设计的情况下提出解决方案吗?