当前位置: 首页 > 知识库问答 >
问题:

cassandra direct join的Spark流不工作

颜举
2023-03-14

嗨,伙计们!我正在尝试开发火花流应用程序,但遇到了一些问题。一些细节:我们有Kafka主题,spark 3.2.1和Cassandra 4.0.4,带有datastax spark Cassandra连接器版本com.datastax.spark:spark-Cassandra-connector_2.12:3.1.0

我需要数据的下一条路线。

获取 kafka 消息并在 Spark 中转换为数据帧 -

在文档中写了关于新功能,自SCC 2.5以来可用的DataFrame API不仅来自DSE,而且是RDD API中的DirectJoin等于joinWithCassandraTable。如果我尝试使用Datasourse V2 API,我会在火花方面得到通常的SortMergeJoin。坦率地说,它并不是真正的“流媒体”应用程序,要在Cassandra中添加数据,我使用微批处理方式。

== Physical Plan ==
AppendData (12)
+- * Project (11)
   +- * Filter (10)
      +- * SortMergeJoin LeftOuter (9)
         :- * Sort (4)
         :  +- Exchange (3)
         :     +- * SerializeFromObject (2)
         :        +- Scan (1)
         +- * Sort (8)
            +- Exchange (7)
               +- * Project (6)
                  +- BatchScan (5)


(1) Scan
Output [1]: [obj#342]
Arguments: obj#342: org.apache.spark.sql.Row, MapPartitionsRDD[82] at start at RunnableStream.scala:13

(2) SerializeFromObject [codegen id : 1]
Input [1]: [obj#342]
Arguments: [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, user_id), LongType) AS user_id#343L, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, user_type), StringType), true, false, true) AS user_type#344, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, order_id), StringType), true, false, true) AS order_id#345, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, status_name), StringType), true, false, true) AS status_name#346, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, status_dttm), TimestampType), true, false, true) AS status_dttm#347]

(3) Exchange
Input [5]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347]
Arguments: hashpartitioning(user_id#343L, user_type#344, 16), ENSURE_REQUIREMENTS, [id=#793]

(4) Sort [codegen id : 2]
Input [5]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347]
Arguments: [user_id#343L ASC NULLS FIRST, user_type#344 ASC NULLS FIRST], false, 0

(5) BatchScan
Output [2]: [user_id#348L, user_type#349]
Cassandra Scan: keyspace_name.table_name
 - Cassandra Filters: []
 - Requested Columns: [user_id,user_type]

(6) Project [codegen id : 3]
Output [2]: [user_id#348L, user_type#349]
Input [2]: [user_id#348L, user_type#349]

(7) Exchange
Input [2]: [user_id#348L, user_type#349]
Arguments: hashpartitioning(user_id#348L, user_type#349, 16), ENSURE_REQUIREMENTS, [id=#801]

(8) Sort [codegen id : 4]
Input [2]: [user_id#348L, user_type#349]
Arguments: [user_id#348L ASC NULLS FIRST, user_type#349 ASC NULLS FIRST], false, 0

(9) SortMergeJoin [codegen id : 5]
Left keys [2]: [user_id#343L, user_type#344]
Right keys [2]: [user_id#348L, user_type#349]
Join condition: None

(10) Filter [codegen id : 5]
Input [7]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347, user_id#348L, user_type#349]
Condition : (isnull(user_id#348L) = true)

(11) Project [codegen id : 5]
Output [5]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347]
Input [7]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347, user_id#348L, user_type#349]

(12) AppendData
Input [5]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347]
Arguments: org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3358/1878168161@32616db8, org.apache.spark.sql.connector.write.WriteBuilder$1@1d354f3b

换句话说,如果我尝试使用数据源 V1 并在将 cassandra 表作为数据帧时明确指出 directJoinSetting,例如

spark.read.cassandra格式("table leName","key space").选项("DirectJoinS的","on"). load

这将调用加入时的错误:

Caused by: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.UnaryExecNode.children$(Lorg/apache/spark/sql/execution/UnaryExecNode;)Lscala/collection/Seq;
    at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinExec.children(CassandraDirectJoinExec.scala:18)
    at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinStrategy$.hasCassandraChild(CassandraDirectJoinStrategy.scala:206)
    at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinStrategy$$anonfun$1.applyOrElse(CassandraDirectJoinStrategy.scala:241)
    at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinStrategy$$anonfun$1.applyOrElse(CassandraDirectJoinStrategy.scala:240)

完整的火花提交命令

/opt/spark-3.2.1-bin-hadoop3.2/bin/spark-submit --master yarn --deploy-mode cluster --name "name" \
--conf spark.driver.cores=1 \
--conf spark.driver.memory=1g \
--conf spark.driver.extraJavaOptions="-XX:+UseG1GC -Duser.timezone=GMT -Dfile.encoding=utf-8 -Dlog4j.configuration=name_Log4j.properties" \
--conf spark.executor.instances=1 \
--conf spark.executor.cores=4 \
--conf spark.executor.memory=8g \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC -Duser.timezone=GMT -Dfile.encoding=utf-8 -Dlog4j.configuration=name_Log4j.properties" \
--conf spark.yarn.queue=default \
--conf spark.yarn.submit.waitAppCompletion=true \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=hdfs:///spark3-history/ \
--conf spark.eventLog.compress=true \
--conf spark.sql.shuffle.partitions=16 \
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \
--conf spark.sql.catalog.cassandracatalog=com.datastax.spark.connector.datasource.CassandraCatalog \
--conf spark.sql.dse.search.enableOptimization=on \
--conf spark.cassandra.connection.host=cassandra_host \
--conf spark.cassandra.auth.username=user_name \
--conf spark.cassandra.auth.password=*** \
--conf spark.sql.directJoinSetting=on \
--class ...

cassandra的类连接器

import org.apache.spark.sql._

class CassandraConnector(
  val ss: SparkSession,
  catalog: String,
  keyspace: String,
  table: String
) extends Serializable {

  def read: DataFrame = ss.read.table(s"$catalog.$keyspace.$table")
  def writeDirect(dataFrame: DataFrame): Unit = dataFrame.writeTo(s"$catalog.$keyspace.$table").append()

}

卡萨德拉ddl表

CREATE KEYSPACE IF NOT EXISTS keyspace_name
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};

CREATE TABLE IF NOT EXISTS keyspace_name.table_name
(
    user_id BIGINT,
    user_type VARCHAR,
    order_id VARCHAR,
    status_name VARCHAR,
    status_dttm timestamp,
    PRIMARY KEY (user_id, user_type)
);

正在连接和写入卡桑德拉的方法

 override def writeBatch(batch: Dataset[Row], batchId: Long): Unit = {
    val result =
      batch
        .as("df")
        .join(
          cassandraConnector.read
            .as("cass"),
          col("df.user_id") === col("cass.user_id")
            && col("df.user_type") === col("cass.user_type"),
          "left"
        )
        .withColumn("need_write", when(col("cass.user_id").isNull, true).otherwise(false))
        .filter(col("need_write") === true)
        .select("df.user_id", "df.user_type", "df.order_id", "df.status_name", "df.status_dttm")

    cassandraConnector.writeDirect(result)

  }

有人能解释一下我做错了什么吗?

共有1个答案

弓磊
2023-03-14

是的,Spark Cassandra Connector 的版本是问题的根源 - 高级功能(如直接加入)严重依赖于 Spark 内部类,这些类可能会在版本之间发生变化。因此,如果您使用 Spark 3.2,则需要使用相应版本的 SCC:com.datastax.spark:spark-cassandra-connector_2.12:3.2.0

请注意,Spark 3.3还没有版本。。。

另外,我有一篇关于使用直接连接的博客文章,你可能会感兴趣。

 类似资料:
  • 我曾尝试将spark程序作为单步执行Oozie工作流。我使用了jar,它通过spark submit或spark shell(相同的代码)成功执行: 应用程序不应需要大量资源—加载单个csv( 火花版本:1.6.0 Oozie版本:4.1.0 工作流是使用Hue、Oozie工作流编辑器创建的: 运行工作流后,我得到以下日志: 标准输出: 立即调用Spark类 失败的Oozie启动器,Main类[o

  • 我正在为Spark Streaming作业构建指标系统,在系统中,指标收集在每个执行程序中,因此需要在每个执行程序中初始化指标源(用于收集指标的类)。 度量源被打包在一个jar中,当提交作业时,jar使用参数'--jars'从本地发送到每个执行器,但是,执行器在jar到达之前开始初始化度量源类,因此,它会抛出类未找到异常。 似乎如果执行者可以等到所有资源都准备好,问题就会得到解决,但我真的不知道该

  • 我无法通过Spark(2.1.0)连接到Phoenix(4.10),基于Phoenix网站上的“使用Data Source API作为DataFrame加载”示例。我使用的是lastet(Phoenix4.10)和Hbase 1.2.5。我可以通过Phoenix(sqlline客户端)在Hbase中创建一个表。Spark内返回的错误如下: 更新1:如果通过HBase删除System.Mutex表,

  • 我正在调查使用spark作为REST API后端的适用性。其中的一个问题似乎是Spark的FIFO调度方法。这意味着,如果一个大任务正在执行中,那么在那个重任务完成之前,任何小任务都无法完成。根据https://spark.apache.org/docs/latest/job-scheduling.html,一个公平的调度程序应该可以解决这个问题。然而,我没有注意到这改变了什么。我配置的排定程序是

  • 我正在尝试使用Spark-CSV(https://github.com/databricks/Spark-CSV)将DataFrame写成CSV文件 而不是 伦敦 哥本哈根 莫斯科

  • 我对Spark Streaming是新手,从Spark Streaming我使用Kafkautils创建了一个直接到Kafka的流。如下所示 当我试图运行该作业时,它正抛出以下错误 下面是我的pom.xml 请让我知道如何解决这个问题。