更新(添加代码):
<?xml version="1.0" encoding="UTF-8"?>
<!--
Ignite configuration used by the Spark integration below: defines the
"sharedRDD" partitioned atomic cache and multicast-based TCP discovery.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="cacheConfiguration">
<!-- SharedRDD cache example configuration (Atomic mode). -->
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set a cache name. -->
<property name="name" value="sharedRDD"/>
<!-- Set a cache mode. -->
<property name="cacheMode" value="PARTITIONED"/>
<!-- Set atomicity mode. -->
<property name="atomicityMode" value="ATOMIC"/>
<!-- Configure a number of backups. -->
<property name="backups" value="1"/>
</bean>
</property>
<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!--
Ignite provides several options for automatic discovery that can be used
instead of static IP based discovery. For information on all options refer
to our documentation: http://apacheignite.readme.io/docs/cluster-config
-->
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>127.0.0.1:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
IgniteCache代码(这将df放入并尝试通过转换为RDD来读取它):
// First (failing) attempt: cache a DataFrame's rows in Apache Ignite via IgniteContext.
// The commented-out blocks record configuration attempts that did not work and are
// kept intentionally as context for the fix described later in this document.
object SparkIgniteCache {
// Classpath-relative path to the Ignite XML configuration shown above.
private val CONFIG = "config/cache.xml"
import org.apache.ignite.IgniteCache
import org.apache.ignite.binary.BinaryObject
import org.apache.ignite.cache.CacheAtomicityMode
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction
import org.apache.ignite.configuration.CacheConfiguration
// Stores the rows of `df` into the Ignite cache named KEY.
// NOTE(review): saveValues appears to store only the values, with Ignite
// generating the keys — confirm against the IgniteRDD API docs.
private[sample] def set(sc: SparkContext, df: DataFrame, KEY: String){
val ic = new IgniteContext(sc, CONFIG, false)
// FAILED ATTEMPT OF SETTING CONFIG : 1
// val cacheConfiguration: CacheConfiguration[String, Row] = new CacheConfiguration[String, Row](KEY)
// .setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(0)
// .setAffinity(new RendezvousAffinityFunction(false, 2))
// .setIndexedTypes(classOf[String], classOf[Row])
//
// val rddCache = ic.ignite.getOrCreateCache(cacheConfiguration)
// FAILED ATTEMPT OF SETTING CONFIG : 2
// val cacheConfiguration: CacheConfiguration[String, BinaryObject] = new CacheConfiguration[String, BinaryObject](KEY)
// .setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(0)
// .setAffinity(new RendezvousAffinityFunction(false, 2))
// .setIndexedTypes(classOf[String], classOf[BinaryObject])
//
// val rddCache = ic.ignite.getOrCreateCache(cacheConfiguration)
val sharedRDD = ic.fromCache[String, Row](KEY)
sharedRDD.saveValues(df.rdd)
}
// Returns the shared (String, Row) RDD backed by the Ignite cache named KEY.
private[sample] def get(sc: SparkContext, KEY: String) = {
val ic = new IgniteContext(sc, CONFIG, false)
// val cacheConfiguration: CacheConfiguration[String, Row] = new CacheConfiguration[String, Row](KEY)
// .setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(0)
// .setAffinity(new RendezvousAffinityFunction(false, 2))
// .setIndexedTypes(classOf[String], classOf[Row])
//
// val rddCache = ic.ignite.getOrCreateCache(cacheConfiguration)
ic.fromCache[String, Row](KEY)
}
}
我可以通过以下方式解决上述问题:
在CacheConfiguration节点下的xml文件中添加了以下xml代码段:
<!-- Declares the key/value types stored in this cache: String keys mapped to
     org.apache.spark.sql.Row values, matching fromCache[String, Row] in the code. -->
<property name="indexedTypes">
<list>
<value>java.lang.String</value>
<value>org.apache.spark.sql.Row</value>
</list>
</property>
我想存储DataFrame[Row]类型的dataframe,但在ignite中还无法实现。但是,我可以保存RDD[Row];要保存它,必须将它保存成键值对格式,所以我需要将RDD[Row]转换为RDD[(String, Row)]。为了在CacheConfiguration中表示这一点,我添加了indexedTypes属性,如上所示。
下面是保存/读取DF的代码:
// Caches a Spark DataFrame in Apache Ignite: rows go into the cache named KEY,
// and the DataFrame's schema goes into a dedicated "schemas" cache so the
// DataFrame can be reconstructed on read.
object SparkIgniteCache {
// Classpath-relative path to the Ignite XML configuration.
private val CONFIG = "config/cache.xml"
// Single cache holding one StructType schema per stored DataFrame.
private val schemaCacheConfig = makeSchemaCacheConfig("schemas")
// Persists `df` under KEY: the schema under KEY + "_schema", the rows as cache values.
private[sample] def set(sc: SparkContext, df: DataFrame, KEY: String): Unit = {
val igniteCtx = new IgniteContext(sc, CONFIG, false)
// Save the schema first so `get` can always rebuild the DataFrame.
igniteCtx.ignite.getOrCreateCache(schemaCacheConfig).put(KEY + "_schema", df.schema)
igniteCtx.fromCache[String, Row](KEY).saveValues(df.rdd)
}
// Retrieves the stored schema together with the shared (String, Row) RDD for KEY.
private[sample] def get(sc: SparkContext, KEY: String): (StructType, IgniteRDD[String, Row]) = {
val igniteCtx = new IgniteContext(sc, CONFIG, false)
val schemas = igniteCtx.ignite.getOrCreateCache(schemaCacheConfig)
(schemas.get(KEY + "_schema"), igniteCtx.fromCache[String, Row](KEY))
}
// Builds the atomic, single-backup cache configuration for schema storage.
private def makeSchemaCacheConfig(name: String) =
new CacheConfiguration[String, StructType](name)
.setBackups(1)
.setAtomicityMode(CacheAtomicityMode.ATOMIC)
.setAffinity(new RendezvousAffinityFunction(false, 1))
}
在上面的代码中,我还动态创建了一个CacheConfiguration,用来保存dataframe的模式(StructType类型)。
// Store the DataFrame under KEY = "input_data".
SparkIgniteCache.set(spark.sparkContext, df, "input_data")
// Read it back: the saved schema plus the cached (String, Row) pairs.
val (schema, igniteRDD) = SparkIgniteCache.get(spark.sparkContext, "input_data")
val rdd1: RDD[Row] = igniteRDD.map(_._2) // Drop the generated key; keep only the Row values.
val df = spark.sqlContext.createDataFrame(rdd1, schema) // Rebuild the DataFrame from rows + schema.
问题内容: 我需要创建一个数组来添加具有这种格式的对象,例如Swift中的字典:["key1": "value1", "key2": "value2"] 当我尝试全部保存时是正确的,但是使用键读取时会崩溃。var obj需要什么类型的数据? 问题答案: 这个问题的意思是“数组的数组”,但是我想大多数人可能只是想知道如何将数组保存到UserDefaults。对于那些人,我将添加一些常见示例。 保存数组 检索数组 整数数组
谁能指导我怎么做这件事吗? 我是一个新的android所以请,如果我能有一个详细的程序,我会很感激。
本文向大家介绍Python Cookie 读取和保存方法,包括了Python Cookie 读取和保存方法的使用技巧和注意事项,需要的朋友参考一下 如下所示: 以上这篇Python Cookie 读取和保存方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持呐喊教程。
本文向大家介绍python读取和保存视频文件,包括了python读取和保存视频文件的使用技巧和注意事项,需要的朋友参考一下 为了获取视频,应该创建一个 VideoCapture 对象。他的参数可以是设备的索引号,或者是一个视频文件。设备索引号就是在指定要使用的摄像头。 一般的笔记本电脑都有内置摄像头。所以参数就是 0。你可以通过设置成 1 或者其他的来选择别的摄像头。之后,你就可以一帧一帧的捕获视
我有一个map-reduce作业,并且reducer获得一个驻留在Azure Blob存储中的文件的绝对地址,并且reducer应该打开它并读取它的内容。在配置Hadoop集群(HDInsight)时,我添加了包含文件的存储帐户。因此,还原器必须有权访问这个Blob存储,但Blob存储不是我的作业的默认HDFS存储。我的reducer中有以下代码,但它给了我一个FileNotFound错误消息。