Question:

Unable to run a Spark job with the Cassandra driver

邹禄
2023-03-14

build.gradle

plugins {
    id 'java'
    id 'com.github.johnrengelman.shadow' version '1.2.3'
}

group 'com.hello.aggregation'
version '1.0-SNAPSHOT'

apply plugin: 'java'
apply plugin: 'scala'
apply plugin: 'idea'

sourceCompatibility = 1.8
targetCompatibility = 1.8

configurations {
    provided
}

sourceSets {
    main {
        compileClasspath += configurations.provided
    }
}

repositories {
    mavenCentral()
}

dependencies {
    compile "org.scala-lang:scala-library:$scalaVersion"
    compile "org.scala-lang:scala-reflect:$scalaVersion"
    compile "org.scala-lang:scala-compiler:$scalaVersion"

    compile "org.apache.spark:spark-core_$scalaBase:$sparkVersion"
    compile "org.apache.spark:spark-sql_$scalaBase:$sparkVersion"

    compile "com.datastax.cassandra:cassandra-driver-core:$cassandraDriverVersion"
    compile "com.datastax.spark:spark-cassandra-connector_$scalaBase:$connectorVersion"

    compile "org.slf4j:slf4j-api:$slf4jVersion"

    compile "mysql:mysql-connector-java:$mySqlConnectorVersion"

    testCompile group: 'junit', name: 'junit', version: '4.12'
}

task run(type: JavaExec, dependsOn: classes) {
    main = mainClassFile
    classpath sourceSets.main.runtimeClasspath
    classpath configurations.runtime
}

jar {
    classifier = 'all'
    manifest {
        attributes 'Implementation-Title': title,
                'Implementation-Version': version,
                'Main-Class': mainClassFile
    }
    from sourceSets.main.output
    zip64 true
}

shadowJar {
    classifier = 'shadow'
    append 'reference.conf'
    dependencies {

    }

    zip64 true
}

idea {
    module {
        // IntelliJ does not know about the standard idiom of provided as used in managing
        // uber/shaded jar dependencies. Make it so!
        scopes.PROVIDED.plus += [ configurations.provided ]
    }
}

gradle.properties

version=1.0.0

scalaBase=2.11
scalaVersion=2.11.4
slf4jVersion=1.7.25
sparkVersion=1.6.3
connectorVersion=1.6.7
cassandraDriverVersion=3.0.7
mySqlConnectorVersion=5.1.37

Exception:

17/06/19 16:03:54 INFO BlockManagerMaster: Registered BlockManager
Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.ObjectRef.zero()Lscala/runtime/ObjectRef;
        at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$7.apply(CassandraConnector.scala:150)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$7.apply(CassandraConnector.scala:150)
        at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:32)
        at com.datastax.spark.connector.cql.RefCountedCache.syncAcquire(RefCountedCache.scala:69)
        at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:57)
        at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:80)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:107)
        at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:118)
        at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:330)
        at com.datastax.spark.connector.cql.Schema$.tableFromCassandra(Schema.scala:350)
        at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:50)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef$lzycompute(CassandraTableScanRDD.scala:60)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef(CassandraTableScanRDD.scala:60)
        at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:137)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:60)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:230)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.RDD$$anonfun$distinct$2.apply(RDD.scala:359)
        at org.apache.spark.rdd.RDD$$anonfun$distinct$2.apply(RDD.scala:359)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.distinct(RDD.scala:358)
        at com.achoo.scala.streambright.SimpleDailyRun$.delayedEndpoint$com$achoo$scala$streambright$SimpleDailyRun$1(SimpleDailyRun.scala:30)
        at com.achoo.scala.streambright.SimpleDailyRun$delayedInit$body.apply(SimpleDailyRun.scala:14)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.collection.immutable.List.foreach(List.scala:318)

Code:

package com.streambright

import java.sql.{Connection, DriverManager}

import com.mysql.jdbc.Driver
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector.toSparkContextFunctions
import org.apache.spark.sql.cassandra.CassandraSQLContext


object SimpleDailyRun extends App {
  DriverManager.registerDriver(new Driver())
  val config = new SparkConf(true).setAppName("Simple Daily Run")
  val sc = SparkContext.getOrCreate(config)
  val cc = new CassandraSQLContext(sc)
  cc.setKeyspace("achoo")
  val conn = DriverManager.getConnection("jdbc:mysql://10.175.190.95/db?useUnicode=yes&characterEncoding=UTF-8&user=user&password=pass")

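  // JdbcRDD fills the two "?" placeholders with per-partition bounds;
  // here a single partition covers rows 0..100000000.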
  val mySqlJdbcRDD = new JdbcRDD(sc, () => conn,
    "SELECT b.project_id,a.keyword FROM keyword a " +
      "JOIN project_keyword b ON a.id = b.keyword_id LIMIT ?, ?",
    0, 100000000, 1, r => (r.getInt("project_id"), r.getString("keyword")))

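  // This read is what the stack trace above fails on: computing partitions
  // for distinct() makes the connector open a Cassandra session.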
  val cassandraRDD = sc.cassandraTable("hello", "instagram_keyword_analytic")
    .select("keyword", "relativepath")
    .as((_: String, _: String))
    .distinct()

  mySqlJdbcRDD.saveAsTextFile("/data/MySQL_projectid_keywords_"+System.currentTimeMillis()+".txt")
  cassandraRDD.saveAsTextFile("/data/MySQL_projectid_keywords_"+System.currentTimeMillis()+".txt")

}

Does anyone know how to fix this?

1 answer

冷翼
2023-03-14

The way to fix this is to use Scala 2.10 instead of 2.11. The NoSuchMethodError on scala.runtime.ObjectRef.zero() is a Scala binary-compatibility failure: that method exists in the Scala 2.11 runtime library but not in 2.10, and the pre-built Spark 1.6.x distributions run on Scala 2.10. The job bundles _2.11 artifacts, so when the cluster's 2.10 scala-library wins on the classpath, the connector's 2.11-compiled bytecode calls a method that does not exist. Rebuild the job against Scala 2.10 so the scala-library and every _$scalaBase artifact match what the cluster provides.
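A minimal sketch of the change, assuming the cluster runs a pre-built Spark 1.6.3 distribution (which ships with Scala 2.10; spark-submit --version prints the exact Scala version it was built with) and taking 2.10.6 as a representative 2.10.x patch release, is to switch gradle.properties to 2.10 coordinates:

version=1.0.0

scalaBase=2.10
scalaVersion=2.10.6
slf4jVersion=1.7.25
sparkVersion=1.6.3
connectorVersion=1.6.7
cassandraDriverVersion=3.0.7
mySqlConnectorVersion=5.1.37

Because every Spark and connector dependency in build.gradle interpolates $scalaBase, this one change pulls in spark-core_2.10, spark-sql_2.10, and spark-cassandra-connector_2.10 (connector 1.6.x was published for both Scala 2.10 and 2.11), so no build.gradle edits should be needed.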
