当前位置: 首页 > 知识库问答 >
问题:

spark如何在cassandra表之间复制数据?

平光明
2023-03-14

谁能解释一下从一个表读取数据并将其写入另一个表时,火花的内部工作在cassandra中。

以下是我的用例:

我正在通过一个kafka主题将来自IOT平台的数据摄取到cassandra中。我有一个小型python脚本,它解析来自kafka的每条消息以获取它所属的表名,准备查询并使用datastax的cassandra-python驱动程序将其写入cassandra。使用该脚本,我每分钟能够将大约30万条记录摄取到cassandra中。然而,我的传入数据速率是每分钟510000条记录,因此kafka消费者延迟不断增加。

Python脚本已经并发调用了cassandra。如果我增加python执行器的数量,cassandra-driver开始失败,因为cassandra节点变得不可用。我假设我每秒钟访问的cassandra调用是有限的。下面是我得到的错误消息:

ERROR Operation failed: ('Unable to complete the operation against any hosts', {<Host: 10.128.1.3 datacenter1>: ConnectionException('Pool is shutdown',), <Host: 10.128.1.1 datacenter1>: ConnectionException('Pool is shutdown',)})"

最近,我运行了一个pyspark作业,将数据从一个表中的两列复制到另一个表。该表中有大约1.68亿条记录。Pyspark工作在大约5小时内完成。因此,它每分钟处理超过550000条记录。

下面是我使用的pyspark代码:

df = spark.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table=sourcetable, keyspace=sourcekeyspace)\
    .load().cache()

df.createOrReplaceTempView("data")

query = ("select dev_id,datetime,DATE_FORMAT(datetime,'yyyy-MM-dd') as day, " + field + " as value  from data  " )

vgDF = spark.sql(query)
vgDF.show(50)
vgDF.write\
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table=newtable, keyspace=newkeyspace)\
    .save()

版本:

    < li >卡珊德拉3.9。 < li>Spark 2.1.0。 < li>Datastax的spark-Cassandra-连接器2.0.1 < li > Scala版

簇:

  • 具有 3 个工作节点和 1 个主节点的 Spark 设置。
  • 3 个工作器节点还安装了 Cassandra 集群。(每个Cassandra节点都有一个Spark工作节点)
  • 每个工作线程允许 10 GB 内存和 3 个内核。

所以我想知道:

> < Li > < p > spark是先从cassandra读取所有数据,然后将其写入新表,还是在spark cassandra connector中有某种优化,允许它在cassandra表中移动数据,而无需读取所有记录?

如果我用一个火花流作业替换我的python脚本,在这个作业中我解析数据包以获取cassandra的表名,这会帮助我更快地将数据摄取到cassandra中吗?

共有1个答案

太叔景同
2023-03-14

Spark连接器进行了优化,因为它将处理和读取/插入数据并行化到拥有数据的节点中。使用Cassandra Spark连接器可能会获得更好的吞吐量,但这需要更多资源。

谈论你的任务——300000次插入/分钟是5000次/秒,坦率地说,这不是一个很大的数字——你可以通过不同的优化来提高吞吐量:

  • 使用异步调用提交请求。您只需要确保提交更多可以由一个连接处理的请求(但您也可以增加这个数量-我不知道如何在Python中完成,但请查看Java驱动程序文档以获得想法)
  • 使用正确的一致性级别(LOCAL_ONE应该可以提供非常好的性能)
  • 使用正确的负载平衡策略
  • 您可以并行运行脚本的多个副本,确保它们都在同一个Kafka用户组中
 类似资料:
  • 我在处理Cassandra时学到的第一件事是,您需要非常小心您的数据模型,因为Cassandr不支持联接、聚合等。因此,您可以根据查询对数据进行建模,使用广泛的去规范化等。 但是,假设我已经决定在Cassandra上运行Spark。Spark将允许对数据进行连接和聚合,以及不同的计算。 所以我的问题是,当在Cassandra中对我的数据建模并考虑到我将在其上运行Spark时,我需要改变我的模型以使

  • 我的目标是使用不同的主键将行从一个Cassandra2.0表移动到另一个表。为了确保数据的一致性,我需要用原始的时间戳插入它们()。我计划使用/复制,或者只使用和从CSV自定义导入。 是否有方法复制带有时间戳的行?在TimeUUID列上不起作用,说明“不正确的复制命令”。 提前谢了。

  • 问题内容: 如何在Apache poi的两个不同的工作簿之间复制工作表? 该方法缺少引用此线程。没有这样的解决方案。 问题答案: 经过研究和尝试,这里是我的解决方案。我已经通过Java 1.6和apache-poi 3.8进行了测试 在这里,我将不提供适用于我的解决方案,特别是在这种情况下,我将粘贴工作表和输入流列表(我要合并的不同工作簿)。 我的解决方案是一个合并的解决方案,我从这里合并了整个源

  • 我正在做一些基于数据的事情,我需要将一个表从一个模型复制到另一个模型,但我尝试了很多方法都没有效果。有什么办法可以这样做吗?

  • 我的要求是尽可能的实时,这似乎离得很远。生产环境大约每3秒有400个事件。 是否需要对Cassandra中的YAML文件进行调优,或者对cassandra-connector本身进行任何更改