当前位置: 首页 > 知识库问答 >
问题:

如何在Thrift服务器上查看pyspark临时表?

慕容超
2023-03-14

从pyspark.sql导入行l=[('ankit',25),('jalfaizy',22),('saurabh',20),('bala',26)]rdd=sc.parallelize(l)people=rdd.map(lambda x:Row(name=x[0],age=int(x[1]))people=people.todf().cache()peebs=people.createorreplacetempview('peebs')result=sqlcontext.sql('select*

到目前为止一切都很好,一切都很好。

在另一个终端上,我初始化spark thrift Server:./sbin/start-thriftServer.sh--hiveconf hive.server2.thrift.port=10001-conf spark.executor.cores=1--master spark://172.18.0.2:7077

服务器似乎正常启动,我可以看到pyspark和thrift服务器作业在我的spark cluster master UI上运行。

然后我使用beeline连接到集群

./bin/Beeline Beeline>!连接jdbc:hive2://172.18.0.2:10001

我想强调的两件有趣的事情是:

>

  • 启动pyspark时,会收到以下警告

    警告ObjectStore:6666-在Metastore中找不到版本信息。未启用hive.mastore.schema.verification,因此记录模式1.2.0版

    谢谢


  • 共有1个答案

    丁勇
    2023-03-14

    如果有人需要在Spark Streaming中这样做,我让它像这样工作。

    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
    from py4j.java_gateway import java_import
    
    spark = SparkSession \
        .builder \
        .appName('restlogs_qlik') \
        .enableHiveSupport()\
        .config('spark.sql.hive.thriftServer.singleSession', True)\
        .config('hive.server2.thrift.port', '10001') \
        .getOrCreate()
    
    sc=spark.sparkContext
    sc.setLogLevel('INFO')
    
    #Order matters! 
    java_import(sc._gateway.jvm, "")
    sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)
    
    #Define schema of json
    schema = StructType().add("partnerid", "string").add("sessionid", "string").add("functionname", "string").add("functionreturnstatus", "string")
    
    #load data into spark-structured streaming
    df = spark \
          .readStream \
          .format("kafka") \
          .option("kafka.bootstrap.servers", "localhost:9092") \
          .option("subscribe", "rest_logs") \
          .load() \
          .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
    
    #Print output
    query = df.writeStream \
                .outputMode("append") \
                .format("memory") \
                .queryName("view_figures") \
                .start()
    
    query.awaitTermination();
    

    启动后,可以用beehive进行JDBC测试。我无法理解的是,我必须在相同的脚本中启动Thrift服务器。这就是如何开始脚本。

        spark-submit --master local[2] \
    --conf "spark.driver.extraClassPath=D:\\Libraries\\m2_repository\\org\\apache\\kafka\\kafka-clients\\2.0.0\\kafka-clients-2.0.0.jar" \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 \
    "C:\\Temp\\spark_kafka.py"
    

    希望这能帮到别人。顺便说一句,我正处于初步研究阶段,所以不要对我评头论足。

     类似资料:
    • 在< code>win7 64bit中安装< code>docker toolbox 我的代码: 我在chrome浏览器中看不到任何结果。 重新安装 192.168.99.100:8000 显示: 运行: 显示: 运行: 显示: 更新 http://192.168.99.100:8000/ docker内部, > < li> 首先检查nginx是否正在运行。 查看哪个进程正在使用端口80

    • 我有运行Spark2(V2.2)的Hortonworks HDP 2.6.3。我的测试用例非常简单: > 使用一些随机值创建配置单元表。端口10000上的配置单元 在10016打开Spark Thrift服务器 null 我有一个错误: 我怀疑它与列有关,所以我更改为:df.select(“desc”).show() 然后我得到了这个奇怪的结果: 由于某种原因,它返回了列名?! 案例2 直线: 显

    • 我有自定义服务,我想在树枝模板中使用它。在交响乐 但在Symfony 3.3中,我有一个错误: 无法自动连接服务"AppBundle\Service\ViewService":方法"__construct()"引用类"Symfony\Component\DependencyInject\Container"的参数"$容器",但不存在此类服务。尝试将type-hint更改为其父级之一:接口"Psr\

    • 由于需要研究rtmp协议交互方式及报数据格式,使用nginx临时搭建一个rtmp服务器,主要通过nginx的rtmp扩展模块实现接收RTMP推送的音视频流,同时提供RTMP拉流服务的能力。

    • 问题内容: 使用以下选项启动Java应用程序时: Java使用临时端口,这对于避免冲突非常有用。 是否可以从应用程序内部以编程方式获取实际端口(或连接URL)? 问题答案: 这将打印如下网址

    • 本文向大家介绍在Linux系统上查看Apache服务器的错误日志,包括了在Linux系统上查看Apache服务器的错误日志的使用技巧和注意事项,需要的朋友参考一下 错误日志和访问日志文件为系统管理员提供了有用的信息,比如,为 Web 服务器排障,保护系统不受各种各样的恶意活动侵犯,或者只是进行各种各样的分析以监控 HTTP 服务器。根据你 Web 服务器配置的不同,其错误/访问日志可能放在你系统中