当前位置: 首页 > 知识库问答 >
问题:

如何将流数据从spark下沉到MongoDB?

梁丘高朗
2023-03-14
from __future__ import print_function
import sys
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import StructType
from pyspark.sql.types import *
import json
from pyspark.sql.functions import struct
from pyspark.sql.functions import *
import datetime

json_schema = StructType([
  StructField("twitterid", StringType(), True),
  StructField("created_at", StringType(), True),
  StructField("tweet", StringType(), True),
  StructField("screen_name", StringType(), True)
])

def parse_json(df):
    twitterid   = json.loads(df[0])['id']
    created_at  = json.loads(df[0])['created_at']
    tweet       = json.loads(df[0])['text']
    tweet       = json.loads(df[0])['text']
    screen_name = json.loads(df[0])['user']['screen_name']
    return [twitterid, created_at, tweet, screen_name]

def convert_twitter_date(timestamp_str):
    output_ts = datetime.datetime.strptime(timestamp_str.replace('+0000 ',''), '%a %b %d %H:%M:%S %Y')
    return output_ts

if __name__ == "__main__":

        spark = SparkSession\
                        .builder\
                        .appName("StructuredNetworkWordCount")\
                        .config("spark.mongodb.input.uri","mongodb://192.168.1.16:27017/twitter.test")\
                        .config("spark.mongodb.output.uri","mongodb://192.168.1.16:27017/twitter.test")\
                        .getOrCreate()
        events = spark\
                        .readStream\
                        .format("kafka")\
                        .option("kafka.bootstrap.servers", "localhost:9092")\
                        .option("subscribe", "twitter")\
                        .load()
        events = events.selectExpr("CAST(value as String)")

        udf_parse_json = udf(parse_json , json_schema)
        udf_convert_twitter_date = udf(convert_twitter_date, TimestampType())
        jsonoutput = events.withColumn("parsed_field", udf_parse_json(struct([events[x] for x in events.columns]))) \
                                        .where(col("parsed_field").isNotNull()) \
                                        .withColumn("created_at", col("parsed_field.created_at")) \
                                        .withColumn("screen_name", col("parsed_field.screen_name")) \
                                        .withColumn("tweet", col("parsed_field.tweet")) \
                                        .withColumn("created_at_ts", udf_convert_twitter_date(col("parsed_field.created_at")))

        windowedCounts = jsonoutput.groupBy(window(jsonoutput.created_at_ts, "1 minutes", "15 seconds"),jsonoutput.screen_name)$

        mongooutput = jsonoutput \
                        .writeStream \
                        .format("com.mongodb.spark.sql.DefaultSource")\
                        .option("com.mongodb.spark.sql.DefaultSource","mongodb://localhost:27017/twitter.test")\
                        .start()
        mongooutput.awaitTermination()

我看过mongodb文档,它支持spark到mongo sink

https://docs.mongodb.com/spark-connector/master/scala/streaming/

共有1个答案

薛涛
2023-03-14

我看过mongodb文档,它支持spark到mongo sink

文档声称,您可以使用标准RDDAPI来使用遗留流(DStream)API编写每个RDD。

这并不表明MongoDB支持结构化流,它也不支持。由于您使用的是PySpark,其中foreachwriter是不可访问的,因此您必须等待,直到(如果有的话)MongoDB包被更新以支持流操作。

 类似资料:
  • 我正试图将数据从Kafka传递到火花流。 这就是我到现在所做的: null

  • 我有一个项目,我需要使用java从JSON文件中获取数据,并将其沉入kafka topic,然后将数据从topic沉入mongodb。我已经找到了kafka-mongodb连接器,但是文档只适用于使用汇合平台进行连接。我试过了: 从Maven下载mongo-kafka-connect-1.2.0.jar。 将文件放入 /kafka/plugins 中 在connect-standalone.pro

  • 尝试使用Apache Flink从Cassandra获取数据,引用本文,我可以读取数据,但我不知道如何将其加载到DataStream对象中。代码如下: 我试过了 将变量中的数据加载到数据流中

  • 问题内容: 初学者ES问题在这里 将Spark数据框推送到Elastic Search的工作流程或步骤是什么? 通过研究,我相信我需要使用spark.newAPIHadoopFile()方法。 但是,在研究ElasticSearch文档和其他StackQ / A时,我仍然对参数需要采用的格式以及为什么使用它感到困惑 请注意,我正在使用pyspark,这是ES的新表(尚无索引),并且df是5列(2个

  • 问题内容: 我有Cassandra数据库,可以通过Apache Spark使用SparkSQL从该数据库分析数据。现在我想将那些分析过的数据插入PostgreSQL中。除了使用PostgreSQL驱动程序之外,是否有其他方法可以直接实现此目的(我想通过postREST和Driver实现它,我想知道是否有类似的方法)? 问题答案: 目前,尚无将RDD写入任何DBMS的本地实现。这里是Spark用户列

  • 我是火花的新手,我找不到这个...我有许多拼花地板文件上传到的位置: 此文件夹的总大小为,。如何将这些文件分块并读取到一个数据包中,如何将所有这些文件加载到一个数据包中? 错误: