当前位置: 首页 > 知识库问答 >
问题:

Spark cassandra连接器+连接超时

闾丘成双
2023-03-14
val df1 = spark.read.format("org.apache.spark.sql.cassandra")
    .option("keyspace", "mykeyspace")
    .option("table", "mytable")
    .load

**dataframe2:从另一个来源获得的键的Dataframe(这些键是上表中ID列的分区键)-此表中不同键的数量约为0.15万**

val df2 = spark.read
    .format("jdbc")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("url","****")
    .option("dbtable","table")
    .option("user", "username")
    .option("password", "password123")
    .load()

  val joinExpr = df1.col("ID") === df2.col("ID")

  val res = df1.join(df2,joinExpr)

  res.write.mode(SaveMode.Append).format("orc")
    .saveAsTable("targetTable")

现在,此代码总是导致“com.datastax.oss.driver.api.core.servererrors.ReadFailureException:在一致性LOCAL_ONE读取查询期间Cassandra失败(需要1个响应,但只有0个副本响应,1个失败)”。

即使失败,也将LOCAL_ONE更改为QUORUM。

我甚至尝试过将键dataframe分成20个键的批(一个dataframe中的20个ID值),然后与cassandra表联接--即使失败了。

我甚至尝试过IN子句,尽管它有效,DBA在加载Cassandra时限制我们运行该子句,并导致CPU峰值。

在使用Cassandra DBA进行检查时,他们要求执行指向查询,因为上面的查询导致了大令牌范围扫描,这导致了失败。但是,单个有针对性的查询将导致到Cassandra的15万次往返(需要几个小时才能完成),这太昂贵了。

 <scala.version>2.11.12</scala.version>
 <spark.version>2.2.0</spark.version>

        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>2.5.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>



spark-submit --class ExampleCassandra --deploy-mode client --num-executors 15 --executor-memory 4g  --driver-memory=1g  --conf spark.sql.shuffle.partitions=25 --conf spark.executor.heartbeatInterval=100s --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --jars spark-sql_2.11-2.4.0.jar,spark-core_2.11-2.4.0.jar,spark-hive_2.11-2.4.0.jar,mysql-connector-java-8.0.18.jar,spark-cassandra-connector_2.11-2.5.1.jar ExampleCassandra-bundled-1.0-SNAPSHOT.jar

代码=>Spark.sparkcontext.version=2.4.0中打印的Spark版本

由此产生的计划

== Physical Plan ==
*(8) SortMergeJoin [item_nbr#31], [item_nbr#24], Inner
:- *(2) Sort [item_nbr#31 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(item_nbr#31, 25)
:     +- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [item_nbr#31,planNum#32,strN#33,currTail#34,currTailTy#35,hor#36,prNbr#37,revSce#38,stckHnad#39] PushedFilters: [], ReadSchema: struct<item_nbr:int,planNum:int,strN:int,currTail:decimal(38,18),currTailTy:s...
+- *(7) Sort [item_nbr#24 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(item_nbr#24, 25)
      +- *(6) HashAggregate(keys=[item_nbr#21], functions=[])
         +- Exchange hashpartitioning(item_nbr#21, 25)
            +- *(5) HashAggregate(keys=[item_nbr#21], functions=[])
               +- *(5) Filter (NOT (trim(lower(item_nbr#21), None) = null) && isnotnull(cast(trim(item_nbr#21, None) as int)))
                  +- Generate explode(split(items#4, ,)), false, [item_nbr#21]
                     +- *(4) Project [items#4]
                        +- *(4) BroadcastHashJoin [planNum#0], [planNum#2], Inner, BuildRight
                           :- *(4) Scan JDBCRelation(( select planNum from QAMdPlans.Plan where plan_type = 'MBM' order by planNum desc ) t) [numPartitions=1] [planNum#0] PushedFilters: [*IsNotNull(planNum)], ReadSchema: struct<planNum:int>
                           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                              +- *(3) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [planNum#2,items#4] PushedFilters: [], ReadSchema: struct<planNum:int,items:string>

共有1个答案

徐奇逸
2023-03-14

问题是2.0.5版没有优化数据规则的联接--如果您执行res.explain,您将看到Spark将执行从Cassandra读取所有数据,然后在Spark级别执行联接。优化后的联接仅在RDD API中作为leftjoinWithCassandRatablejoinWithCassandRatable可用。

随着Spark Cassandra Connector2.5的发布,这种情况发生了变化,它现在包含了优化的Dataframe API联接(但您需要启用Spark SQL扩展才能使其工作)。因此,您需要将连接器升级到2.5.lates(2.5.1),或者使用RDD API中的联接功能

附言。我最近写了一篇详细的博客文章,介绍了使用Dataframe和RDD API从Spark中对Cassandra表中的数据进行有效联接。

 类似资料:
  • 问题内容: 我在代码中使用了RMI: 这些是4个.java文件。 接下来,我编译所有这些文件。然后创建一个using 。之后,我使用来在服务器端启动rmi注册表。然后,我开始使用服务器,最后使用客户端。 但是什么也没发生 客户端抛出的异常是 原因是什么,我该如何解决? 在客户端计算机上,这些是以下.class文件,在服务器端 问题答案: 错误消息说明了一切:您的连接超时。这意味着您的请求在某个(默

  • 代码片段如下所示: 如果有人有决议,请帮忙?

  • 我想在云运行应用程序中连接云SQL。我使用了golang。这是关于sql连接设置的代码。 我在Cloud Run设置控制台设置了环境变量。delpoy Application后,Cloud Run控制台显示和

  • 我已经做了所有的尝试,但都不起作用,这是我的问题,我试图将一些文件从一台机器发送到另一台机器,想法是在目标机器上创建一个servlet,在服务器上运行它,并等待任何客户机连接,客户机和servlet代码在lan网络类型中完美地工作,但当涉及到wan网络类型时,我得到了这个例外 线程“main”org.apache.http.conn.HTTPhostConnectException:连接到192.

  • 我正在使用带有PostgreSQL数据库服务器的桌面应用程序。当我连续10到20分钟不使用应用程序时,数据库连接会断开。我正在使用PostgresqlJDBC进行数据库连接。 请帮我在这个数据库连接超时。 谢谢

  • 我正在使用Java套接字创建加密通信终端。我的问题是,当我通过“localhost”或我的计算机局域网地址连接时,程序连接完美,并按预期工作,但当我使用我的公共IP地址连接时,连接被拒绝,我得到 java.net.连接异常:连接超时:连接java.base/java.net.DualStackPlainSocketImpl.connect0(本地方法)在java.base/java.net.Dua