我设计了一个简单的工作,可以从MySQL读取数据并将其保存在带有Spark的Elasticsearch中。
这是代码:
JavaSparkContext sc = new JavaSparkContext(
new SparkConf().setAppName("MySQLtoEs")
.set("es.index.auto.create", "true")
.set("es.nodes", "127.0.0.1:9200")
.set("es.mapping.id", "id")
.set("spark.serializer", KryoSerializer.class.getName()));
SQLContext sqlContext = new SQLContext(sc);
// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");
// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
"merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");
您可以看到代码非常简单。它将数据读取到DataFrame中,选择一些列,然后count
对Dataframe 进行基本操作。到目前为止,一切正常。
然后,它尝试将数据保存到Elasticsearch中,但是由于无法处理某种类型而失败。您可以在此处查看错误日志。
我不确定为什么它不能处理这种类型。 有人知道为什么会这样吗?
我正在使用Apache Spark 1.5.0,Elasticsearch 1.4.4和elaticsearch-hadoop 2.1.1
编辑:
这个问题的答案很棘手,但是由于samklr,我设法弄清了问题所在。
但是,该解决方案并非简单明了,可能会考虑一些“不必要的”转换。
首先让我们谈谈 序列化 。
在数据的Spark序列化和功能序列化中要考虑两个方面的序列化。在这种情况下,它与数据序列化以及反序列化有关。
从Spark的角度来看,唯一需要做的就是设置序列化-
默认情况下,Spark依赖Java序列化,这很方便,但是效率很低。这就是Hadoop本身引入自己的序列化机制和类型(即)的原因Writables
。因此,InputFormat
并OutputFormats
要求返回Writables
其,开箱即用,星火不明白。
使用elasticsearch-spark连接器,必须启用一种不同的序列化(Kryo),该序列化可以自动处理转换,并且还可以非常高效地完成转换。
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
即使因为Kryo不需要类实现要序列化的特定接口,这也意味着POJO可以在RDD中使用,而无需进行任何其他工作即可启用Kryo序列化。
就是说,@ samklr向我指出,Kryo需要在使用它们之前注册类。
这是因为Kryo编写了对要序列化的对象的类的引用(对于每个写入的对象,将写入一个引用),如果该类已注册,则它只是一个整数标识符,否则为完整的类名。Spark代表您注册Scala类和许多其他框架类(例如Avro
Generic或Thrift类)。
用Kryo注册课程很简单。创建KryoRegistrator的子类,并重写该registerClasses()
方法:
public class MyKryoRegistrator implements KryoRegistrator, Serializable {
@Override
public void registerClasses(Kryo kryo) {
// Product POJO associated to a product Row from the DataFrame
kryo.register(Product.class);
}
}
最后,在驱动程序中,将spark.kryo.registrator属性设置为KryoRegistrator实现的完全限定的类名:
conf.set("spark.kryo.registrator", "MyKryoRegistrator")
其次,甚至考虑到设置了Kryo序列化器并注册了类,并对Spark 1.5进行了更改,并且由于某种原因,Elasticsearch无法 反序列化
Dataframe,因为它无法SchemaType
将Dataframe的内容推断到连接器中。
所以我不得不将数据框转换为JavaRDD
JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() {
public Product call(Row row) throws Exception {
long id = row.getLong(0);
String title = row.getString(1);
String description = row.getString(2);
int merchantId = row.getInt(3);
double price = row.getDecimal(4).doubleValue();
String keywords = row.getString(5);
long brandId = row.getLong(6);
int categoryId = row.getInt(7);
return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId);
}
});
现在可以准备将数据写入elasticsearch了:
JavaEsSpark.saveToEs(products, "test/test");
参考文献:
我使用的是由java.sql引起的相同代码:na]。SQLSyntaxErrorException:ORA-00942:表或视图不存在-Spring批处理,无法将记录保存到数据库中。 我已经创建了表 LifeCycleStatusWriter.java 作家 错误: 配置详细信息 数据库配置
我有一个带有REST方法的控制器类,可以抛出各种异常。我已经决定在一个单独的类中处理这些异常,在我的处理程序方法中使用@ControlllerAdwn和@ExceptionHandler。但是,我有一个问题,我的REST方法使用另一个库的注释。此库捕获REST方法引发的异常。现在,我正在全局处理异常,而不是直接通过REST方法中的试/捕获,我的异常总是被其他库捕获,而不是被我自己的处理程序方法捕获
这是使用电子邮件密码注册帐户的所有代码,保存验证电子邮件,将用户数据保存到Firestore数据库中。只有Firestore数据库无法运行。 else { Toast.makeText(register.this, “Error ! ” task.getException().getMessage(), Toast.LENGTH_SHORT).show();progressBar.setVisib
我想使用MongoDB用NestJs和TypeORM创建一个应用程序。假设我有一个实体,除了ID字段外,还有两个唯一的字段 当我想用已经存在的或创建一个新模块时,我得到了以下异常错误 因此,在catch语句中,我必须处理传入的错误。我要做的是 TypeORM是否公开预制异常?是否有我可以使用的枚举?获取这些TypeORM数据库错误的好方法是什么? 这里似乎列出了一些错误 https://githu
我尝试用以下代码保存从internet下载的文件 但在运行时,我得到的错误如下 03-04 20:42:51.080 8972-8972/com.example.me.demo2 E/BitmapFactory:无法解码流:java.io.FileNotFoundExcoop: /storage/emulated/0/.tanks/4a100abb-0e55-4062-8c37-f11f4189e
我正在构建一个颤振应用程序,我必须解析api中的一些数据,我设置了所有内容,但我收到了这个错误,我不知道为什么,我是颤振新手,任何帮助都将不胜感激。谢谢。 生成的错误 这是我的api响应示例 这就是我处理数据的方式 这是模型课