from __future__ import print_function
import sys
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import StructType
from pyspark.sql.types import *
import json
from pyspark.sql.functions import struct
from pyspark.sql.functions import *
import datetime
json_schema = StructType([
StructField("twitterid", StringType(), True),
StructField("created_at", StringType(), True),
StructField("tweet", StringType(), True),
StructField("screen_name", StringType(), True)
])
def parse_json(df):
twitterid = json.loads(df[0])['id']
created_at = json.loads(df[0])['created_at']
tweet = json.loads(df[0])['text']
tweet = json.loads(df[0])['text']
screen_name = json.loads(df[0])['user']['screen_name']
return [twitterid, created_at, tweet, screen_name]
def convert_twitter_date(timestamp_str):
output_ts = datetime.datetime.strptime(timestamp_str.replace('+0000 ',''), '%a %b %d %H:%M:%S %Y')
return output_ts
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("StructuredNetworkWordCount")\
.config("spark.mongodb.input.uri","mongodb://192.168.1.16:27017/twitter.test")\
.config("spark.mongodb.output.uri","mongodb://192.168.1.16:27017/twitter.test")\
.getOrCreate()
events = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("subscribe", "twitter")\
.load()
events = events.selectExpr("CAST(value as String)")
udf_parse_json = udf(parse_json , json_schema)
udf_convert_twitter_date = udf(convert_twitter_date, TimestampType())
jsonoutput = events.withColumn("parsed_field", udf_parse_json(struct([events[x] for x in events.columns]))) \
.where(col("parsed_field").isNotNull()) \
.withColumn("created_at", col("parsed_field.created_at")) \
.withColumn("screen_name", col("parsed_field.screen_name")) \
.withColumn("tweet", col("parsed_field.tweet")) \
.withColumn("created_at_ts", udf_convert_twitter_date(col("parsed_field.created_at")))
windowedCounts = jsonoutput.groupBy(window(jsonoutput.created_at_ts, "1 minutes", "15 seconds"),jsonoutput.screen_name)$
mongooutput = jsonoutput \
.writeStream \
.format("com.mongodb.spark.sql.DefaultSource")\
.option("com.mongodb.spark.sql.DefaultSource","mongodb://localhost:27017/twitter.test")\
.start()
mongooutput.awaitTermination()
我看过mongodb文档,它支持spark到mongo sink
https://docs.mongodb.com/spark-connector/master/scala/streaming/
我看过mongodb文档,它支持spark到mongo sink
文档声称,您可以使用标准RDD
API来使用遗留流(DStream
)API编写每个RDD。
这并不表明MongoDB支持结构化流,它也不支持。由于您使用的是PySpark,其中foreach
writer是不可访问的,因此您必须等待,直到(如果有的话)MongoDB包被更新以支持流操作。
我正试图将数据从Kafka传递到火花流。 这就是我到现在所做的: null
我有一个项目,我需要使用java从JSON文件中获取数据,并将其沉入kafka topic,然后将数据从topic沉入mongodb。我已经找到了kafka-mongodb连接器,但是文档只适用于使用汇合平台进行连接。我试过了: 从Maven下载mongo-kafka-connect-1.2.0.jar。 将文件放入 /kafka/plugins 中 在connect-standalone.pro
尝试使用Apache Flink从Cassandra获取数据,引用本文,我可以读取数据,但我不知道如何将其加载到DataStream对象中。代码如下: 我试过了 将变量中的数据加载到数据流中
问题内容: 初学者ES问题在这里 将Spark数据框推送到Elastic Search的工作流程或步骤是什么? 通过研究,我相信我需要使用spark.newAPIHadoopFile()方法。 但是,在研究ElasticSearch文档和其他StackQ / A时,我仍然对参数需要采用的格式以及为什么使用它感到困惑 请注意,我正在使用pyspark,这是ES的新表(尚无索引),并且df是5列(2个
问题内容: 我有Cassandra数据库,可以通过Apache Spark使用SparkSQL从该数据库分析数据。现在我想将那些分析过的数据插入PostgreSQL中。除了使用PostgreSQL驱动程序之外,是否有其他方法可以直接实现此目的(我想通过postREST和Driver实现它,我想知道是否有类似的方法)? 问题答案: 目前,尚无将RDD写入任何DBMS的本地实现。这里是Spark用户列
我是火花的新手,我找不到这个...我有许多拼花地板文件上传到的位置: 此文件夹的总大小为,。如何将这些文件分块并读取到一个数据包中,如何将所有这些文件加载到一个数据包中? 错误: