当前位置: 首页 > 知识库问答 >
问题:

在dstream.transform()中使用SQL而不是Spark流?

伏欣悦
2023-03-14

foreachrdd()中有一些使用SQL over Spark Streaming的示例。但是如果我想在tranform()中使用SQL:

case class AlertMsg(host:String, count:Int, sum:Double)
val lines = ssc.socketTextStream("localhost", 8888)
lines.transform( rdd => {
  if (rdd.count > 0) {
    val t = sqc.jsonRDD(rdd)
    t.registerTempTable("logstash")
    val sqlreport = sqc.sql("SELECT host, COUNT(host) AS host_c, AVG(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 GROUP BY host ORDER BY host_c DESC LIMIT 100")
    sqlreport.map(r => AlertMsg(r(0).toString,r(1).toString.toInt,r(2).toString.toDouble))
  } else {
    rdd
  }
}).print()

我犯了这样的错误:

[error]/users/raochenlin/downloads/spark-1.2.0-bin-hadoop2.4/logstash/src/main/scala/logstash.scala:52:没有html" target="_blank">方法转换的类型参数:(transformfunc:org.apache.spark.rdd.rdd.rdd[String]=>org.apache.spark.rdd.rdd[U])(隐式证据$5:scala.reflect.classtag[U])org.apache.spark.rdd.dstream.dstream[U],因此可以将其应用于参数(org.apache.spark.rdd.rdd[String]=>org.apache.spark.rdd.rdd[String]字符串<:java.io.serializable])[error]----因为--[error]参数表达式的类型与形参类型不兼容;[error]找到:org.apache.spark.rdd.rdd[String]=>org.apache.spark.rdd.rdd[_>:logstash.alertmsg带有字符串<:java.io.serializable][err]必需:org.apache.spark.rdd.rdd[String]=>org.apache.spark.rdd.rdd[?u][err]lines.transform(rdd=>{[errer]^[err]找到一个错误[errer](compile:compile)编译失败

似乎只有使用sqlreport.map(r=>r.tostring)才是正确的用法?

共有1个答案

尉迟华翰
2023-03-14

dstream.transform接受一个函数transformfunc:(rdd[T])¥rdd[U]在本例中,if必须在两个条件的计算中产生相同的类型,但情况并非如此:

if (count == 0) => RDD[String]
if (count > 0) => RDD[AlertMsg]

在本例中,如果rdd.count...,则删除优化,以便具有唯一的转换路径。

 类似资料:
  • persistence.xml文件 http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd“>org.hibernate.ejb.hibernatePersistence

  • 是否可以在Android Studio中为设备显示?我正在编写一个使用USB端口的应用程序,我无法将其用于。 目前,我正在使用以下指令查看命令提示符中的logcat,但如果将其集成到AS:http://developer.android.com/tools/help/adb.html#wireless

  • 问题内容: 我正在使用postgreSQL。我有一列: 但是,当我想插入带有空字符串的行时,如下所示: 它不会给我错误并接受。如何检查插入值应为?(既不为空也不为空) PS: 我的专栏定义为: 问题答案: 向列定义添加约束。例如类似: 有关更多信息,请参见http://www.postgresql.org/docs/current/static/ddl- constraints.html

  • 问题内容: 我知道我们可以 但是,这种小字节序格式很难与原始Guid进行比较 如何在SQL语句中使用原始的Guid而不是little endian? 问题答案: 如果您想轻松地与原始内容进行比较而不进行转换,则将其存储为文本。它会占用更多的存储空间,并且读/写/比较会更慢,但更容易被人阅读。

  • 我正在开发Spark SQL应用程序,我有几个问题: 我读到Spark SQL在封面下使用Hive metastore?这是真的吗?我说的是一个纯Spark SQL应用程序,它不显式连接到任何配置单元安装 我正在启动一个Spark SQL应用程序,不需要使用Hive。有什么理由使用蜂箱吗?据我所知,Spark SQL比Hive快得多;所以,我看不出有任何理由使用蜂箱。但我说得对吗