当前位置: 首页 > 知识库问答 >
问题:

Apache Spark数据集. forach与Aerospike客户端

墨宜人
2023-03-14

我想通过ApacheSpark从Apache Hive中检索行,并将每一行放入Aerospike缓存。

这里有一个简单的案例。

var dataset = session.sql("select * from employee");
final var aerospikeClient = aerospike;  // to remove binding between lambda and the service class itself
dataset.foreach(row -> {
    var key = new Key("namespace", "set", randomUUID().toString());
    aerospikeClient.add(
        key,
        new Bin(
            "json-repr",
            row.json()
        )
    );
});

我得到一个错误:

Caused by: java.io.NotSerializableException: com.aerospike.client.reactor.AerospikeReactorClient

显然,我无法将AerospikeReactorClient序列化。我试图添加数据集。collectAsList(),这确实有效。但据了解,这种方法将所有内容加载到一个节点中。可能有大量的数据。所以,这不是选项。

处理此类问题的最佳做法是什么?

共有2个答案

韦高阳
2023-03-14

通过在foreachlambda中手动创建AerospikeClient,我成功地解决了这个问题。

var dataset = session.sql("select * from employee");
dataset.foreach(row -> {
    var key = new Key("namespace", "set", randomUUID().toString());
    newAerospikeClient(aerospikeProperties).add(
        key,
        new Bin(
            "json-repr",
            row.json()
        )
    );
});

现在我只需要声明AerospikePropertiesSerializable

姜育
2023-03-14

您可以直接从数据帧写入。不需要在数据集中循环

启动spark shell并导入com。aerospike。火花sql.\u包裹:

$ spark-shell
scala> import com.aerospike.spark.sql._
import com.aerospike.spark.sql._

将数据写入Aerospike的示例

val TEST_COUNT= 100
val simpleSchema: StructType = new StructType(
    Array(
    StructField("one", IntegerType, nullable = false),
    StructField("two", StringType, nullable = false),
    StructField("three", DoubleType, nullable = false)
  ))

val simpleDF = {
    val inputBuf=  new ArrayBuffer[Row]()
    for ( i <- 1 to num_records){
        val one = i
        val two = "two:"+i
        val three = i.toDouble
        val r = Row(one, two, three)
        inputBuf.append(r)
    }
    val inputRDD = spark.sparkContext.parallelize(inputBuf.toSeq)
    spark.createDataFrame(inputRDD,simpleSchema)
}

//Write the Sample Data to Aerospike
simpleDF.write
.format("aerospike") //aerospike specific format
.option("aerospike.writeset", "spark-test") //write to this set
.option("aerospike.updateByKey", "one")//indicates which columns should be used for construction of primary key
.option("aerospike.write.mode","update")
.save()
 类似资料:
  • 我正在使用Spring boot和AWS elasticsearch服务。AWS Elasticsearch服务,仅提供REST接口。 Elasticsearch Rest客户端在这里。 简单地说,是否可以将REST客户端与Spring Data Elasticsearch一起使用? 换句话说,Spring Data Elasticsearch是否与Elasticsearch Rest客户端配合使

  • 客户端集成 CAT推出多种语言的客户端,基本覆盖了主流开发语言。 CAT目前支持::Java、C、Python、node、Go等语言的接入。详情请参考:传送门 注意所有的客户端均在lib目录下,Java客户端也是,Java客户端不再使用cat-client这个模块,这个客户端无任何依赖。 Java C C++ Python Go Node.js 其它语言客户端接入: .NET 客户端 项目地址 c

  • Aerospike是一个以分布式为核心基础,可基于行随机存取内存中索引、数据或SSD存储中数据的数据库。 Aerospike主要用于广告业务,作为一个服务器端的cookie存储来使用,在这种场景下读取和写入性能是至关重要的。 Aerospike 提供免费社区版本。

  • 问题内容: 我已经使用sqlite3在xcode中创建了一个应用程序。我想创建一个名为sync的按钮以与服务器中的mysql数据库进行同步。关于同步过程有什么建议吗?请告诉我。 问题答案: 在服务器上使用Web服务返回架构版本号和上次更新的时间戳记。如果客户端已过时,它将再次调用以获取更新的架构和/或新数据。

  • 我在Jmeter中执行API时遇到了问题。我们的API中有客户端证书。pfx格式。我已经把它换成了。jks,并在系统中进行了更新。jmeter的特性。在jmeter中,我创建了一个csv文件来获取创建的别名。但是,错误显示为 错误:响应消息:非HTTP响应消息:java.lang.IllegalArgumentException:未找到别名的证书:'certalias' 以下是我的别名信息:别名:

  • 长连接服务(TCP、WebSocket)支持向客户端推送数据,具体用法https://doc.imiphp.com/utils/Server.html