我有这个java代码,其中火花UDF将行作为输入并返回行。还有一个广播变量,它是哈希映射。
UDF所做的就是检查广播HashMap是否包含rowKey,如果包含,则返回一个新行,其中包含来自输入行的一些现有值和来自广播HashMap的一些更新值。如果没有,则按原样返回输入行。我这样做是因为我想根据HashMap中的值更新行列值。这是代码:
广播变量
final Broadcast<HashMap<String, HashMap<String, String>>> broadcastVariable = jsc.broadcast(someHashMap);
UDF定义
UDF1<Row, Row> myUDF = new UDF1<Row, Row> () {
@Override
public Row call(Row inputRow) {
String myKey = inputRow.getString(3);
if (broadcastVariable.value().containsKey(myKey)){
Map<String, String> valuesToUpdate = broadcastVariable.value().get(myKey);
String col1 = inputRow.getString(0);
String col2 = inputRow.getString(1);
String col3 = inputRow.getString(2);
for (Map.Entry<String, String> entry : valuesToUpdate.entrySet())
{
String columnName = entry.getKey();
switch(columnName) {
case "col1" :
col1 = entry.getValue();
break;
case "col2" :
col2 = entry.getValue();
break;
case "col3" :
col3 = entry.getValue();
break;
}
}
return RowFactory.create(col1,col2,col3,myKey);
}
return inputRow;
}
};
UDF注册
hiveContext.udf().register("myUDF", myUDF, DataTypes.createStructType(DF1.schema().fields()));
UDF来电
DataFrame DF2 = DF1.select(org.apache.spark.sql.functions.callUDF
("myUDF", org.apache.spark.sql.functions.struct(DF1.col("col1"),
DF1.col("col2"),
DF1.col("col3"),
DF1.col("myKey"))));
我有以下问题,
>
如何将数据帧中的所有列传递给 UDF 而不逐个列出它们?我问这个问题的原因是实际的数据帧有超过50列。我看到了这个例子,但无法让它在Java中工作。
有没有办法在UDF中按名称访问行列?现在我正在使用字符串(int)。
UDF输出是一个名为myUDF的结构(struct(col1,col2,col3,myKey))。它变得很长,有50列。我怎么给它起别名呢?
任何帮助是值得赞赏的!
不需要提前知道列名!
您可以将行类型作为udf的参数之一。例如:
import org.apache.spark.sql.functions._
val myUdf = udf((row: Row) => <here comes the code inside your udf>)
您可以像这样调用udf:
df。withColumn(newColumnName,myUdf(struct(df.columns映射col:_*))
然后,您可以访问udf中的dataframe行(结构和数据),以获取所需的任何信息,例如,将该行转换为(column_name)的映射-
val myUdf = udf((行: Row) =
TL;DR使用数据集。映射(并将UDF替换为map
函数)。
如何将dataframe中的所有列传递到UDF,而不一一列出它们?
dataframe.schema.fieldNames
请参见数据集API。
有没有办法在UDF中按名称访问行列?
引用Row.fieldIndex的大话:
fieldIndex(name: String): Int返回给定字段名的索引。
并使用索引。
它变得很长,有50列。我怎么给它起别名呢?
看起来您的代码将从一些重构和组合中受益。在单个管道中使用 50 个字段可能会变得有点笨拙。
我正试图将dataframe列作为参数传递 但是得到错误 :33:错误:类型不匹配; 找到:org.apache.spark.sql.column 必需:int val df_new=df.withcolumn(“age_category”,ageclassification.agecategory(df(“age”))
问题内容: 我试图将两个包含整数的列表作为参数传递给python代码。但是将参数作为字符串列表获取。 输入看起来像 我发现以下黑客可以转换列表。 有一个更好的方法吗? 问题答案: 命令行参数始终以字符串形式传递。您将需要自己将它们解析为所需的数据类型。 有像argparse和click这样的库,可以让您定义自己的参数类型转换,但将其视为相同, 因此我怀疑它是否有用。 编辑Jan 2019 这个答案
问题内容: 我已经熟悉Android框架和Java,并希望创建一个通用的“ NetworkHelper”类,该类可以处理大多数联网代码,使我能够从中调用网页。 我遵循了来自developer.android.com的这篇文章来创建我的网络类:http : //developer.android.com/training/basics/network- ops/connecting.html 码:
我正在编写一个方法,如果我想将一个类传递给一个方法,其中代码的一部分包括检查对象是否属于某种类型。这就是我想要的(但显然不行): 关于如何做到这一点有什么提示吗?谢谢!
问题内容: 在Java中,如何将一个函数作为另一个函数的参数传递? 问题答案: Java 8及以上 如果你的类或接口只有一个抽象方法(有时称为SAM type),则使用Java 8+ lambda表达式,例如: 然后可以在使用MyInterface的任何地方替换lambda表达式: 例如,你可以非常快速地创建一个新线程: 并使用方法引用语法使其更加清晰: 如果没有 lambda表达式,则最后两个示
问题内容: 我正在创建一种通过传递搜索字段从任何表中选择ID的方法。 但是我得到一个有关语法错误的MySqlException。当我查看“异常”消息时,它向我显示带引号的查询表!如何将表格作为不带引号的参数传递? 问题答案: 大多数数据库不允许您通过参数指定表名或列名。参数用于 值 。如果确实确实需要使它动态化,则应验证输入(它应该是一个已知的表名,并且该表中具有已知的列名),然后将其包括在SQL