嗨,伙计们!我正在尝试开发火花流应用程序,但遇到了一些问题。一些细节:我们有Kafka主题,spark 3.2.1和Cassandra 4.0.4,带有datastax spark Cassandra连接器版本com.datastax.spark:spark-Cassandra-connector_2.12:3.1.0
我需要数据的下一条路线。
获取 kafka 消息并在 Spark 中转换为数据帧 -
在文档中写了关于新功能,自SCC 2.5以来可用的DataFrame API不仅来自DSE,而且是RDD API中的DirectJoin等于joinWithCassandraTable。如果我尝试使用Datasourse V2 API,我会在火花方面得到通常的SortMergeJoin。坦率地说,它并不是真正的“流媒体”应用程序,要在Cassandra中添加数据,我使用微批处理方式。
== Physical Plan ==
AppendData (12)
+- * Project (11)
+- * Filter (10)
+- * SortMergeJoin LeftOuter (9)
:- * Sort (4)
: +- Exchange (3)
: +- * SerializeFromObject (2)
: +- Scan (1)
+- * Sort (8)
+- Exchange (7)
+- * Project (6)
+- BatchScan (5)
(1) Scan
Output [1]: [obj#342]
Arguments: obj#342: org.apache.spark.sql.Row, MapPartitionsRDD[82] at start at RunnableStream.scala:13
(2) SerializeFromObject [codegen id : 1]
Input [1]: [obj#342]
Arguments: [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, user_id), LongType) AS user_id#343L, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, user_type), StringType), true, false, true) AS user_type#344, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, order_id), StringType), true, false, true) AS order_id#345, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, status_name), StringType), true, false, true) AS status_name#346, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, status_dttm), TimestampType), true, false, true) AS status_dttm#347]
(3) Exchange
Input [5]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347]
Arguments: hashpartitioning(user_id#343L, user_type#344, 16), ENSURE_REQUIREMENTS, [id=#793]
(4) Sort [codegen id : 2]
Input [5]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347]
Arguments: [user_id#343L ASC NULLS FIRST, user_type#344 ASC NULLS FIRST], false, 0
(5) BatchScan
Output [2]: [user_id#348L, user_type#349]
Cassandra Scan: keyspace_name.table_name
- Cassandra Filters: []
- Requested Columns: [user_id,user_type]
(6) Project [codegen id : 3]
Output [2]: [user_id#348L, user_type#349]
Input [2]: [user_id#348L, user_type#349]
(7) Exchange
Input [2]: [user_id#348L, user_type#349]
Arguments: hashpartitioning(user_id#348L, user_type#349, 16), ENSURE_REQUIREMENTS, [id=#801]
(8) Sort [codegen id : 4]
Input [2]: [user_id#348L, user_type#349]
Arguments: [user_id#348L ASC NULLS FIRST, user_type#349 ASC NULLS FIRST], false, 0
(9) SortMergeJoin [codegen id : 5]
Left keys [2]: [user_id#343L, user_type#344]
Right keys [2]: [user_id#348L, user_type#349]
Join condition: None
(10) Filter [codegen id : 5]
Input [7]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347, user_id#348L, user_type#349]
Condition : (isnull(user_id#348L) = true)
(11) Project [codegen id : 5]
Output [5]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347]
Input [7]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347, user_id#348L, user_type#349]
(12) AppendData
Input [5]: [user_id#343L, user_type#344, order_id#345, status_name#346, status_dttm#347]
Arguments: org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3358/1878168161@32616db8, org.apache.spark.sql.connector.write.WriteBuilder$1@1d354f3b
换句话说,如果我尝试使用数据源 V1 并在将 cassandra 表作为数据帧时明确指出 directJoinSetting,例如
spark.read.cassandra格式("table leName","key space").选项("DirectJoinS的","on"). load
这将调用加入时的错误:
Caused by: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.UnaryExecNode.children$(Lorg/apache/spark/sql/execution/UnaryExecNode;)Lscala/collection/Seq;
at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinExec.children(CassandraDirectJoinExec.scala:18)
at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinStrategy$.hasCassandraChild(CassandraDirectJoinStrategy.scala:206)
at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinStrategy$$anonfun$1.applyOrElse(CassandraDirectJoinStrategy.scala:241)
at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinStrategy$$anonfun$1.applyOrElse(CassandraDirectJoinStrategy.scala:240)
完整的火花提交命令
/opt/spark-3.2.1-bin-hadoop3.2/bin/spark-submit --master yarn --deploy-mode cluster --name "name" \
--conf spark.driver.cores=1 \
--conf spark.driver.memory=1g \
--conf spark.driver.extraJavaOptions="-XX:+UseG1GC -Duser.timezone=GMT -Dfile.encoding=utf-8 -Dlog4j.configuration=name_Log4j.properties" \
--conf spark.executor.instances=1 \
--conf spark.executor.cores=4 \
--conf spark.executor.memory=8g \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC -Duser.timezone=GMT -Dfile.encoding=utf-8 -Dlog4j.configuration=name_Log4j.properties" \
--conf spark.yarn.queue=default \
--conf spark.yarn.submit.waitAppCompletion=true \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=hdfs:///spark3-history/ \
--conf spark.eventLog.compress=true \
--conf spark.sql.shuffle.partitions=16 \
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \
--conf spark.sql.catalog.cassandracatalog=com.datastax.spark.connector.datasource.CassandraCatalog \
--conf spark.sql.dse.search.enableOptimization=on \
--conf spark.cassandra.connection.host=cassandra_host \
--conf spark.cassandra.auth.username=user_name \
--conf spark.cassandra.auth.password=*** \
--conf spark.sql.directJoinSetting=on \
--class ...
cassandra的类连接器
import org.apache.spark.sql._
class CassandraConnector(
val ss: SparkSession,
catalog: String,
keyspace: String,
table: String
) extends Serializable {
def read: DataFrame = ss.read.table(s"$catalog.$keyspace.$table")
def writeDirect(dataFrame: DataFrame): Unit = dataFrame.writeTo(s"$catalog.$keyspace.$table").append()
}
卡萨德拉ddl表
CREATE KEYSPACE IF NOT EXISTS keyspace_name
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
CREATE TABLE IF NOT EXISTS keyspace_name.table_name
(
user_id BIGINT,
user_type VARCHAR,
order_id VARCHAR,
status_name VARCHAR,
status_dttm timestamp,
PRIMARY KEY (user_id, user_type)
);
正在连接和写入卡桑德拉的方法
override def writeBatch(batch: Dataset[Row], batchId: Long): Unit = {
val result =
batch
.as("df")
.join(
cassandraConnector.read
.as("cass"),
col("df.user_id") === col("cass.user_id")
&& col("df.user_type") === col("cass.user_type"),
"left"
)
.withColumn("need_write", when(col("cass.user_id").isNull, true).otherwise(false))
.filter(col("need_write") === true)
.select("df.user_id", "df.user_type", "df.order_id", "df.status_name", "df.status_dttm")
cassandraConnector.writeDirect(result)
}
有人能解释一下我做错了什么吗?
是的,Spark Cassandra Connector 的版本是问题的根源 - 高级功能(如直接加入)严重依赖于 Spark 内部类,这些类可能会在版本之间发生变化。因此,如果您使用 Spark 3.2,则需要使用相应版本的 SCC:com.datastax.spark:spark-cassandra-connector_2.12:3.2.0
。
请注意,Spark 3.3还没有版本。。。
另外,我有一篇关于使用直接连接的博客文章,你可能会感兴趣。
我曾尝试将spark程序作为单步执行Oozie工作流。我使用了jar,它通过spark submit或spark shell(相同的代码)成功执行: 应用程序不应需要大量资源—加载单个csv( 火花版本:1.6.0 Oozie版本:4.1.0 工作流是使用Hue、Oozie工作流编辑器创建的: 运行工作流后,我得到以下日志: 标准输出: 立即调用Spark类 失败的Oozie启动器,Main类[o
我正在为Spark Streaming作业构建指标系统,在系统中,指标收集在每个执行程序中,因此需要在每个执行程序中初始化指标源(用于收集指标的类)。 度量源被打包在一个jar中,当提交作业时,jar使用参数'--jars'从本地发送到每个执行器,但是,执行器在jar到达之前开始初始化度量源类,因此,它会抛出类未找到异常。 似乎如果执行者可以等到所有资源都准备好,问题就会得到解决,但我真的不知道该
我无法通过Spark(2.1.0)连接到Phoenix(4.10),基于Phoenix网站上的“使用Data Source API作为DataFrame加载”示例。我使用的是lastet(Phoenix4.10)和Hbase 1.2.5。我可以通过Phoenix(sqlline客户端)在Hbase中创建一个表。Spark内返回的错误如下: 更新1:如果通过HBase删除System.Mutex表,
我正在调查使用spark作为REST API后端的适用性。其中的一个问题似乎是Spark的FIFO调度方法。这意味着,如果一个大任务正在执行中,那么在那个重任务完成之前,任何小任务都无法完成。根据https://spark.apache.org/docs/latest/job-scheduling.html,一个公平的调度程序应该可以解决这个问题。然而,我没有注意到这改变了什么。我配置的排定程序是
我正在尝试使用Spark-CSV(https://github.com/databricks/Spark-CSV)将DataFrame写成CSV文件 而不是 伦敦 哥本哈根 莫斯科
我对Spark Streaming是新手,从Spark Streaming我使用Kafkautils创建了一个直接到Kafka的流。如下所示 当我试图运行该作业时,它正抛出以下错误 下面是我的pom.xml 请让我知道如何解决这个问题。