当前位置: 首页 > 知识库问答 >
问题:

在pyspark中使用UDF和simpe数据帧

督灿
2023-03-14

我是pyspark的新手,我来尝试做一些像下面这样的事情,为每个cookie调用一个函数Print细节,然后将结果写入文件。spark.sql查询返回正确的数据,我也可以将其序列化为文件。有人可以帮助每个cookie上的for语句。调用UDF的语法应该是什么,如何将输出写入文本文件?

任何帮助是值得赞赏的。谢谢

@udf(returnType=StringType())
def PrintDetails(cookie, timestamps,current_day, current_hourly_threshold,current_daily_threshold):
     #DO SOME WORK
     return "%s\t%d\t%d\t%d\t%d\t%s" %(some_data)

def main(argv):
    spark = SparkSession \
        .builder \
        .appName("parquet_test") \
        .config("spark.debug.maxToStringFields", "100") \
        .getOrCreate()

    inputPath = r'D:\Hadoop\Spark\parquet_input_files'
    inputFiles = os.path.join(inputPath, '*.parquet')

    impressionDate =  datetime.strptime("2019_12_31", '%Y_%m_%d')
    current_hourly_threshold = 40
    current_daily_threshold = 200

    parquetFile = spark.read.parquet(inputFiles)
    parquetFile.createOrReplaceTempView("parquetFile")
    cookie_and_time = spark.sql("SELECT cookie, collect_list(date_format(from_unixtime(ts), 'YYYY-mm-dd-H:M:S'))  as imp_times FROM parquetFile group by 1  ")

    for cookie in cookie_and_time :
        PrintDetails(cookie('cookie'), cookie('imp_times'), impressionDate, current_hourly_threshold, current_daily_threshold))

共有1个答案

晏树
2023-03-14

你可以这样做。

cookie_df= cookie_and_time.withColumn("cookies",PrintDetails(cookie('cookie'), cookie('imp_times'), lit(impressionDate), lit(current_hourly_threshold), lit(current_daily_threshold)))

或者,您可以在<code>udf</code>函数本身中定义所有变量,并避免作为参数传递。

 类似资料:
  • 我正在尝试在PySpark中为两个数据框(df1和df2)创建自定义连接(类似于此),代码如下所示: 我得到的错误消息是: 有没有办法编写一个可以处理来自两个单独数据帧的列的 PySpark UDF?

  • 有没有办法选择整行作为一列输入到Pyspark过滤器udf中? 我有一个复杂的过滤函数“my_filter”,我想应用于整个数据帧: 但是 引发错误,因为这不是有效的操作。 我知道我可以将数据帧转换为RDD,然后使用RDD的过滤方法,但我不想将其转换为RDD,然后再转换回数据帧。我的数据帧具有复杂的嵌套类型,因此当我尝试再次将 RDD 转换为数据帧时,架构推断将失败。

  • 我目前有一个pyspark数据帧,其中一列包含一些数字行,我想使用我编写的函数来查找这些数字行,以返回一个信息字符串。我知道简单的方法是使用withCoulmn并定义一个UDF来从旧列创建一个新列,但是我的函数的某些方式使它不能注册为UDF。我可以根据旧列的值用新列创建一个新的数据框架,而不创建UDF吗?

  • 我需要从数据库读取数据,并使用PIG分析数据。我用java编写了一个UDF,引用了下面的链接 org.apache.pig.impl.logicallayer.frontendException:错误1066:无法在org.apache.pig.pig.tools.grunt.gruntparser.openiterator(pigserver.java:892)在org.apache.pig.t

  • 我想计算PySpark数据帧的两列之间的Jaro Winkler距离。Jaro-Winkler距离可通过所有节点上的pyjarowinkler包获得。 pyjarowinkler的工作原理如下: 输出: 我试图编写一个UDF,将两列作为序列传递,并使用lambda函数计算距离。我是这样做的: 我应该能够在上述函数中传递任意两个字符串列。我得到以下输出: 预期产出: 我怀疑这可能是因为不正确。它包含

  • 我想循环两个列表,将组合传递给函数,并获得以下输出: 由于这是Pyspark,我想将其并行化,因为函数的每个迭代都可以独立运行。 注:我的实际函数是pyspark中的一个长而复杂的算法。只是想贴一个简单的例子来概括。 最好的方法是什么?​