import org.apache.spark._
import org.apache.spark.SparkContext._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("XXXX")
.set("spark.cassandra.connection.host" ,"cassandra.us-east-2.amazonaws.com")
.set("spark.cassandra.connection.port", "9142")
.set("spark.cassandra.auth.username", "XXXXX")
.set("spark.cassandra.auth.password", "XXXXX")
.set("spark.cassandra.connection.ssl.enabled", "true")
.set("spark.cassandra.connection.ssl.trustStore.path", "/home/nihad/.cassandra/cassandra_truststore.jks")
.set("spark.cassandra.connection.ssl.trustStore.password", "XXXXX")
.set("spark.cassandra.output.consistency.level", "LOCAL_QUORUM")
val connector = CassandraConnector(conf)
val session = connector.openSession()
sesssion.execute("""INSERT INTO "covid19".delta_by_states (state_code, state_value, date ) VALUES ('kl', 5, '2020-03-03');""")
session.close()
有人能帮助我如何通过datastax.cassandraconnector设置一致性吗?
破解了。
而不是通过spark配置来设置cassandra一致性。我在src/main/resources目录中创建了一个application.conf文件。
datastax-java-driver {
basic.contact-points = [ "cassandra.us-east-2.amazonaws.com:9142"]
advanced.auth-provider{
class = PlainTextAuthProvider
username = "serviceUserName"
password = "servicePassword"
}
basic.load-balancing-policy {
local-datacenter = "us-east-2"
}
advanced.ssl-engine-factory {
class = DefaultSslEngineFactory
truststore-path = "yourPath/.cassandra/cassandra_truststore.jks"
truststore-password = "trustorePassword"
}
basic.request.consistency = LOCAL_QUORUM
basic.request.timeout = 5 seconds
}
并创建了如下所示的cassandra会话
import com.datastax.oss.driver.api.core.config.DriverConfigLoader
import com.datastax.oss.driver.api.core.CqlSession
val loader = DriverConfigLoader.fromClassPath("application.conf")
val session = CqlSession.builder().withConfigLoader(loader).build()
sesssion.execute("""INSERT INTO "covid19".delta_by_states (state_code, state_value, date ) VALUES ('kl', 5, '2020-03-03');""")
我有一个3节点的Cassandra集群,其密钥空间的复制因子为3: (仅部署在一个数据中心) 当进行失败测试时,即关闭一个节点,我在尝试查询我的键空间时得到这些异常: 我不知道为什么会看到这个错误,因为: 我的复制因子设置为3(即我仍然有2个节点,每个节点包含所有数据) 我的一致性级别设置为QUORUM。(为什么我看到LOCAL_ONE?)
想象一个电子商务应用程序: 假设我有三个并且我的一致性级别(CL)很弱:即 我有一个产品表,例如 这是跨三个节点同步的初始数据 > 现在,客户端A从N1读取信息,客户端B从N2读取信息 客户端1看到1台计算机可用 客户端 2 看到 1 台计算机可用 他们现在都去购买客户A先下订单。所以N1,表格如下所示: 现在客户端 2 下订单,因此在 N2 处,表将如下所示: 但实际上客户2的订单不应该被处理。
问题内容: 我有一个使用Spark的Java类。我需要从JavaRDD过滤出标头。这就是我要这样做的方式。 但是,此代码无法编译。IntelliJ IDE表示“此语言级别不支持Lambda表达式”。 问题答案: 您可以在Java 7上使用lambda,但涉及到一点点- 您必须使用Retrolambda之类的东西。 同样,您也可以在没有lambda的情况下执行相同的操作。Lambda可以通过匿名类轻
我刚刚开始从事一个Java项目,并使用IntelliJ从GitHub下载了源代码--我以前从未使用过IntelliJ,但我被告知它是一个比Eclipse好得多的IDE(大约四年前我上次进行Java开发时使用的是Eclipse)。 当我试图在计算机上本地构建源代码时,从GitHub中提取了最新的工作版本,我在几行不同的代码上得到了一个编译错误--错误如下: 错误:(27,34)Java:-sourc
我正在测试Java8的一些新特性,并将该示例复制到我的IDE(最初是Eclipse,然后是IntelliJ),如下所示 Eclipse不支持lambda表达式,IntelliJ不断报告错误
我正在使用sql server进行数据库连接。并且我希望在运行多个结果集时保持结果集打开。我使用了,但在执行语句时出现以下错误, 由于我是使用sql server的新手,所以我对其中的一些概念还不是很了解。如果有人知道为什么会这样请回答我。