当前位置: 首页 > 知识库问答 >
问题:

在 scala Spark 中连接 UDF

缪茂勋
2023-03-14

我已经编写了以下代码,运行良好。但是我想连接UDF,这样代码可以压缩成几行。请建议我怎么做。下面是我编写的代码。

val myUdf1 = udf((Number: Long) => ((Number) >> 24) & 255)
val myUdf2 = udf((Number: Long) => ((Number) >> 16) & 255)
val myUdf3 = udf((Number: Long) => ((Number) >> 8) & 255)
val myUdf4 = udf((Number: Long) => (Number) & 255)

val df=Data.withColumn("bitwise 1", myUdf1(Data("Ip")))
  .withColumn("bitwise 2", myUdf2(Data("Ip")))
  .withColumn("bitwise 3", myUdf3(Data("Ip")))
  .withColumn("bitwise 4", myUdf4(Data("Ip")))

val FinalDF =  df.withColumn("FinalIp",concat(col("bitwise 1"),lit("."),col("bitwise 2"),lit("."),col("bitwise 3"),lit("."),col("bitwise 4")))
.drop("bitwise 1").drop("bitwise 2").drop("bitwise 3").drop("bitwise 4")

共有2个答案

仉宸
2023-03-14

我认为,这在没有udf的情况下是可以实现的-

val Data = spark.range(2).withColumn("Ip", lit(10))
    val myUdf1 = udf((Number: Long) => ((Number) >> 24) & 255)
    val myUdf2 = udf((Number: Long) => ((Number) >> 16) & 255)
    val myUdf3 = udf((Number: Long) => ((Number) >> 8) & 255)
    val myUdf4 = udf((Number: Long) => (Number) & 255)

    val df=Data.withColumn("bitwise 1", myUdf1(Data("Ip")))
      .withColumn("bitwise 2", myUdf2(Data("Ip")))
      .withColumn("bitwise 3", myUdf3(Data("Ip")))
      .withColumn("bitwise 4", myUdf4(Data("Ip")))

    val FinalDF =  df.withColumn("FinalIp",concat(col("bitwise 1"),lit("."),col("bitwise 2"),lit("."),col("bitwise 3"),lit("."),col("bitwise 4")))
      .drop("bitwise 1").drop("bitwise 2").drop("bitwise 3").drop("bitwise 4")
    FinalDF.show(false)

    /**
      * +---+---+--------+
      * |id |Ip |FinalIp |
      * +---+---+--------+
      * |0  |10 |0.0.0.10|
      * |1  |10 |0.0.0.10|
      * +---+---+--------+
      */

 spark.range(2).withColumn("Ip", lit(10))
      .withColumn("FinalIp",
        concat_ws(".", expr("shiftRight(Ip, 24) & 255"), expr("shiftRight(Ip, 16) & 255"),
          expr("shiftRight(Ip, 8) & 255"), expr("Ip & 255"))
      ).show(false)

    /**
      * +---+---+--------+
      * |id |Ip |FinalIp |
      * +---+---+--------+
      * |0  |10 |0.0.0.10|
      * |1  |10 |0.0.0.10|
      * +---+---+--------+
      */
马哲
2023-03-14

正如@Someshwar Kale所建议的,你可以没有UDF。

如果选择使用 UDF,则可以将 UDF 中的函数抽象出来,并串联到一个函数中

scala> Data.show
+---+
| ip|
+---+
| 10|
| 20|
+---+


scala> val a:Seq[(Int, Int)] = Seq((24, 255), (16,255), (8, 255),(0,255))
a: Seq[(Int, Int)] = List((24,255), (16,255), (8,255), (0,255))

scala> val myUdf = udf((number: Long) => (a.map((t:(Int, Int)) => (number >> t._1) & t._2).mkString(".")))
myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(LongType)))

scala> Data.withColumn("finalIp", myUdf($"ip")).show
+---+--------+
| ip| finalIp|
+---+--------+
| 10|0.0.0.10|
| 20|0.0.0.20|
+---+--------+
 类似资料:
  • 我正试图通过JSTL连接JDBC。我的Localhost ie:Xampp连接也完成了,但我还是遇到了无法解决的错误。请帮忙!我使用的是Netbeans 8.0.2。下面是代码。[1]: https://i.stack.imgur.com/R4AUp.png[这是我得到的错误][1] 我还添加了“mysql-connector-java-5.1.23-bin”。jar'是Web服务器的lib目录。

  • 问题内容: 要指定SQLite连接属性,请使用org.sqlite.SQLiteConfig,它的内容如下: 使用c3p0创建连接池的过程如下: 问题:如何创建结合了两者的数据源,让我设置诸如连接池的最大池大小和sqlite的同步模式之类的东西? 问题答案: 尝试 现在,数据源将是c3p0 PooledDataSource,它包装了已根据需要配置的SQLite未池化数据源。 请参阅C3P0的文档,

  • 问题内容: 所以人们可能会告诉我这是一个坏主意,但我至少想让它尝试一下。 编辑 此应用程序的目的是仅当设备与oracle db位于同一网络中或通过VPN连接到网络时才可以工作。数据库中的信息将无法全局访问,这就是为什么我需要直接连接到oracle db的原因。 现在根据这个线程 他成功查询了oracle db。 因此,我有一个相当基本的类,在初始化时将尝试与我的数据库建立连接。 在我的主要活动on

  • 问题内容: 我正在尝试为python中的类编写单元测试。该类在 init 上打开一个tcp套接字。我试图对此进行模拟,以便可以断言使用正确的值调用了连接,但是显然在单元测试中实际上并未发生。我已经厌倦了MagicMock,补丁程序等,但是还没有找到解决方案。 到目前为止我的班级看起来像 问题答案: 如果您只想断言被正确调用,这很简单 如果必须先导入模块才能访问,则需要稍微调整补丁:

  • 问题内容: 我试图通过hibernate将一些数据插入到postgresql中。但是,关于使用postgresql配置hibernate的教程并不多(我知道,它应该类似于mysql =) src / main / resources / hibernate.cfg.xml src / main / java / src / main / java包com.hib.entities; src / m

  • 问题内容: 我认为这是不可能的,但我想我要是有办法的话。我的想法是,我有一个用于Web资源文件夹路径的变量: 我得到这个结果: 但是,我希望将字符串合并为一个字符串,如下所示: 是否可以在Less中将字符串连接在一起? 问题答案: 使用变量插值 完整代码: