当前位置: 首页 > 知识库问答 >
问题:

Cassandra/Spark显示大表的条目计数不正确

阎自怡
2023-03-14

我试图使用火花处理一个大的cassandra表(~4.02亿条目和84列),但我得到不一致的结果。最初的要求是从这个表复制一些列到另一个表。复制数据后,我注意到新表中的一些条目丢失了。为了验证我计算了大型源表,但每次都得到不同的值。我尝试了一个较小的表(~700万记录)上的查询,结果很好。

最初,我试图使用pyspark进行计数。这是我的pyspark脚本

spark = SparkSession.builder.appName("Datacopy App").getOrCreate() 
df = spark.read.format("org.apache.spark.sql.cassandra").options(table=sourcetable, keyspace=sourcekeyspace).load().cache() 
df.createOrReplaceTempView("data") 
query = ("select count(1) from data " ) 
vgDF = spark.sql(query) 
vgDF.show(10)

Spark提交命令如下:

~/spark-2.1.0-bin-hadoop2.7/bin/spark-submit --master spark://10.128.0.18:7077 --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 --conf spark.cassandra.connection.host="10.128.1.1,10.128.1.2,10.128.1.3" --conf "spark.storage.memoryFraction=1" --conf spark.local.dir=/media/db/ --executor-memory 10G --num-executors=6 --executor-cores=2 --total-executor-cores 18 pyspark_script.py

上述spark提交过程需要大约90分钟才能完成。我跑了三次,下面是我得到的计数:

  • Spark迭代1:402273852
  • Spark迭代2:402273884
  • Spark迭代3:402274209

Spark在整个过程中不显示任何错误或异常。我在cqlsh中运行了三次相同的查询,再次得到不同的结果:

  • Cqlsh 迭代 1:402273598
  • Cqlsh 迭代 2:402273499
  • Cqlsh 迭代 3:402273515

我无法找出为什么我从同一个查询中得到不同的结果。Cassandra系统日志(/var/log/cassandra/system.log)仅显示过一次以下错误消息

ERROR [SSTableBatchOpen:3] 2018-02-27 09:48:23,592 CassandraDaemon.java:226 - Exception in thread Thread[SSTableBatchOpen:3,5,main]
java.lang.AssertionError: Stats component is missing for sstable /media/db/datakeyspace/sensordata1-acfa7880acba11e782fd9bf3ae460699/mc-58617-big
        at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:460) ~[apache-cassandra-3.9.jar:3.9]
        at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:375) ~[apache-cassandra-3.9.jar:3.9]
        at org.apache.cassandra.io.sstable.format.SSTableReader$4.run(SSTableReader.java:536) ~[apache-cassandra-3.9.jar:3.9]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_131]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

版本:

  • 卡桑德拉3.9。
  • 火花2.1.0。
  • Datastax的火花卡桑德拉连接器2.0.1
  • Scala版本2.11

簇:

  • 具有 3 个工作节点和 1 个主节点的 Spark 设置。
  • 3 个工作器节点还安装了 Cassandra 集群。
  • 每个工作节点都有 8 个 CPU 内核和 40 GB RAM。

任何帮助都将不胜感激。

共有1个答案

赵浩邈
2023-03-14

Spark Cassandra连接器默认读取一致性为LOCAL_ONE,默认写入一致性为LOCAL_QUORUM,因此可以在使用该默认值进行完全修复之前读取部分数据。对于未能写入数据的节点,您可以读取“ONE”,但这不是错误,因为其他2个副本成功。因此,您应该将两个级别都设置为QUORUM,或者将其中一个级别设置为ALL

config("spark.cassandra.input.consistency.level", "LOCAL_QUORUM").
config("spark.cassandra.output.consistency.level", "LOCAL_QUORUM").

默认的CQL shell级别也是1,因此您还应该增加它:

cqlsh> CONSISTENCY QUORUM
 类似资料:
  • 我有这个控制器和操作方法: 下面是模型: 我需要使用<code>Location</code>作为URL中的查询参数名称,以便按预期到达endpoint。 例如< code > http://localhost/API/Appointment/Company/available slots?位置=SYD 然而,当我查看Swagger页面时,该参数被称为<code>Model。位置,这让我的API的

  • 问题内容: 当数据列表中有一长串元素时,所有元素都将显示,并在其旁边带有滚动条。有没有一种简单的方法可以只显示前5个,而仅切掉其他5个? 问题答案: 使用一些现代的javascript和html,您可以执行以下操作。 这是文档: 这是js:

  • 当数据列表中有一组很长的元素时,它们都会显示,旁边有一个滚动条。有没有一种简单的方法可以只显示前5名,而不显示其他的? 例如:http://jsfiddle.net/yxafa/

  • 我在scala spark中运行以下代码,每当我点击count()或show这样的动作函数时,就会出现数组越界异常。但是我可以打印模式 错误堆栈跟踪

  • 我需要编写一个程序,要求输入整数和循环,直到输入为负整数,然后程序结束。此外,循环需要计算正整数项的总数,并将所有项相加。代码的计数部分似乎工作正常,但我获取所有条目总和的方法不起作用。 我在代码中尝试的是将total=total输入放在循环的末尾,但这在测试时并没有给出正确的总和。 此外,我的类型安全块只适用于第一次进入;如果我在整数后输入一个字母,程序就会崩溃。这不应该在每个条目的开头回到类型

  • 这是个好办法吗?我需要担心关闭会话吗?我在哪里/怎么做最好?任何指针都很感激。