从pyspark.sql导入行l=[('ankit',25),('jalfaizy',22),('saurabh',20),('bala',26)]rdd=sc.parallelize(l)people=rdd.map(lambda x:Row(name=x[0],age=int(x[1]))people=people.todf().cache()peebs=people.createorreplacetempview('peebs')result=sqlcontext.sql('select*
到目前为止一切都很好,一切都很好。
在另一个终端上,我初始化spark thrift Server:
./sbin/start-thriftServer.sh--hiveconf hive.server2.thrift.port=10001-conf spark.executor.cores=1--master spark://172.18.0.2:7077
服务器似乎正常启动,我可以看到pyspark和thrift服务器作业在我的spark cluster master UI上运行。
然后我使用beeline连接到集群
./bin/Beeline Beeline>!连接jdbc:hive2://172.18.0.2:10001
我想强调的两件有趣的事情是:
>
启动pyspark时,会收到以下警告
警告ObjectStore:6666-在Metastore中找不到版本信息。未启用hive.mastore.schema.verification,因此记录模式1.2.0版
谢谢
如果有人需要在Spark Streaming中这样做,我让它像这样工作。
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from py4j.java_gateway import java_import
spark = SparkSession \
.builder \
.appName('restlogs_qlik') \
.enableHiveSupport()\
.config('spark.sql.hive.thriftServer.singleSession', True)\
.config('hive.server2.thrift.port', '10001') \
.getOrCreate()
sc=spark.sparkContext
sc.setLogLevel('INFO')
#Order matters!
java_import(sc._gateway.jvm, "")
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)
#Define schema of json
schema = StructType().add("partnerid", "string").add("sessionid", "string").add("functionname", "string").add("functionreturnstatus", "string")
#load data into spark-structured streaming
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "rest_logs") \
.load() \
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
#Print output
query = df.writeStream \
.outputMode("append") \
.format("memory") \
.queryName("view_figures") \
.start()
query.awaitTermination();
启动后,可以用beehive进行JDBC测试。我无法理解的是,我必须在相同的脚本中启动Thrift服务器。这就是如何开始脚本。
spark-submit --master local[2] \
--conf "spark.driver.extraClassPath=D:\\Libraries\\m2_repository\\org\\apache\\kafka\\kafka-clients\\2.0.0\\kafka-clients-2.0.0.jar" \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 \
"C:\\Temp\\spark_kafka.py"
希望这能帮到别人。顺便说一句,我正处于初步研究阶段,所以不要对我评头论足。
在< code>win7 64bit中安装< code>docker toolbox 我的代码: 我在chrome浏览器中看不到任何结果。 重新安装 192.168.99.100:8000 显示: 运行: 显示: 运行: 显示: 更新 http://192.168.99.100:8000/ docker内部, > < li> 首先检查nginx是否正在运行。 查看哪个进程正在使用端口80
我有运行Spark2(V2.2)的Hortonworks HDP 2.6.3。我的测试用例非常简单: > 使用一些随机值创建配置单元表。端口10000上的配置单元 在10016打开Spark Thrift服务器 null 我有一个错误: 我怀疑它与列有关,所以我更改为:df.select(“desc”).show() 然后我得到了这个奇怪的结果: 由于某种原因,它返回了列名?! 案例2 直线: 显
我有自定义服务,我想在树枝模板中使用它。在交响乐 但在Symfony 3.3中,我有一个错误: 无法自动连接服务"AppBundle\Service\ViewService":方法"__construct()"引用类"Symfony\Component\DependencyInject\Container"的参数"$容器",但不存在此类服务。尝试将type-hint更改为其父级之一:接口"Psr\
由于需要研究rtmp协议交互方式及报数据格式,使用nginx临时搭建一个rtmp服务器,主要通过nginx的rtmp扩展模块实现接收RTMP推送的音视频流,同时提供RTMP拉流服务的能力。
问题内容: 使用以下选项启动Java应用程序时: Java使用临时端口,这对于避免冲突非常有用。 是否可以从应用程序内部以编程方式获取实际端口(或连接URL)? 问题答案: 这将打印如下网址
本文向大家介绍在Linux系统上查看Apache服务器的错误日志,包括了在Linux系统上查看Apache服务器的错误日志的使用技巧和注意事项,需要的朋友参考一下 错误日志和访问日志文件为系统管理员提供了有用的信息,比如,为 Web 服务器排障,保护系统不受各种各样的恶意活动侵犯,或者只是进行各种各样的分析以监控 HTTP 服务器。根据你 Web 服务器配置的不同,其错误/访问日志可能放在你系统中