我们有一个叫做cassandra扫描的程序,它使用spark-cassandra连接器在一个非常大的表中列出分区键的所有值。该表有大约1700万个Cassandra分区,每个分区平均有200行。包含该表的Cassandra集群在6个节点上运行DSE 5.1.8。包含该表的键空间的复制因子为3。
以下是密钥空间和表的简化定义。
CREATE KEYSPACE myspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'} AND durable_writes = true;
CREATE TABLE myspace.largetable (
id text,
itemOrder text,
...
PRIMARY KEY (id, itemOrder)
) WITH CLUSTERING ORDER BY (itemOrder ASC)
cassandra-scan中用于列出分区键的所有值的语句如下:
val res = sc.cassandraTable(keyspace, table).select("id").perPartitionLimit(1).repartition(320)
我们使用Apache Spark 2.3.1和spark-cassandra-connector 2.3.2。用于启动cassandra-scan的命令如下。
/path/to/spark/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class "CassandraScan" --jars /path/to/spark-cassandra-connector_2.11-2.3.2.jar --executor-memory 15g --master local[20] cassandra-scan.jar &
cassandra-scan运行正常,大约需要19个小时。
我们最近建立了一个新的 Cassandra 集群,同样有 6 个节点(与第一个集群中使用的节点不同)。此群集运行 DSE 6.8.16。第一个表中的所有数据都已添加到新集群中的表中。
我们将Apache Spark的版本更新为2.4.8,将Spark cassandra连接器更新为2.4.2。我们测试了Spark分区数在2000到200000之间的程序。我们无法让cassandr扫描正常运行。我们看到以下格式的错误:
java.io.IOException: Exception during execution of SELECT "id" FROM "myspace"."largetable" WHERE token("id") > ? AND token("id") <= ? PER PARTITION LIMIT 1 ALLOW FILTERING: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)
一些 cassandra-scan 运行导致一些 Cassandra 节点关闭,并在 Cassandra 日志中显示如下消息。
INFO [CoreThread-22] 2022-04-03 06:26:35,467 InboundHandshakeHandler.java:353 - Failed to properly handshake with peer /xxx.xxx.xxx.xxx:41231. Closing the channel.
java.lang.OutOfMemoryError: Direct buffer memory
WARN [Outbound-/xxx.xxx.xxx.xxx-large-message-writer] 2022-04-01 19:17:58,248 AbstractOutboundMessageHandler.java:80 - LARGE_MESSAGE with id 97 from /xxx.xxx.xxx.xxx to /xxx.xxx.xxx.xxx via (/xxx.xxx.xxx.xxx,/xxx.xxx.xxx.xxx:7000) error...
java.io.IOException: java.lang.RuntimeException: io.netty.channel.unix.Errors$NativeIoException: writeAddress(..) failed: Connection reset by peer
非常感谢任何帮助让这个工作。谢谢。
我们使用DataStax Bulk Loader解决了这个问题。
dsbulk unload \
--connector.csv.url <path>/<to>/<outputDir> \
-h <host> \
-query "select distinct id from myspace.largetable"
dsbulk花费了大约3个小时来获得1750万个值。
此错误表示群集中至少有一个节点无法为请求提供服务:
Not enough replicas available for query at consistency LOCAL_ONE \
(1 required but only 0 alive)
您需要查看Cassandra日志来确定(1)哪个节点没有响应/不可用,以及(2)原因。干杯!
我的要求是尽可能的实时,这似乎离得很远。生产环境大约每3秒有400个事件。 是否需要对Cassandra中的YAML文件进行调优,或者对cassandra-connector本身进行任何更改
我试图使用Apache Spark来处理我的大型(230K条目)cassandra数据集,但我经常遇到不同类型的错误。然而,我可以成功地运行应用程序时,运行在一个数据集约200个条目。我有一个由3个节点和1个主节点和2个工作节点组成的spark设置,这两个工作节点还安装了一个cassandra集群,该集群的数据索引复制系数为2。我的两个spark workers在web界面上显示2.4和2.8GB
我正在使用Apache Spark 2.0、Apache Cassandra 3.7和Apache Spark Java Connector for Cassandra 2.11(2.0.0-M3)
我最近在我的5节点集群中安装了DataStax Enterprise(V5.0)。我计划使用3个节点作为spark,2个节点作为cassandra多节点集群。 另外,我应该更改DSE中的哪些conf文件,以便spark连接到在另一个节点上运行的cassandra(而不是在127.0.0.1:9042)
类似Bigtable的数据库存储按其键排序的行。 Cassandra使用分区和聚类键的结合来保持数据的分布和排序;但是,您只能通过使用分区键来选择行! 用于上述查询的Cassandra存储层的可视化。
SyntaxException:第1:34行在输入'(')处没有可行的替代方案(UPDATE mytable SET mycolumn=cast 我看到了关于cast函数的文档,对于任何其他类型,都没有在输出类型下列出。 https://docs.datastax.com/en/dse/5.1/cql/cql/cql_reference/refcqlfunction.html#refcqlfunc