嗨,我正在尝试从一群执行者那里登录到一个Kafka主题,他们使用ApacheSpark和Log4J以及KafkaAppender。我可以使用基本的文件附加器与执行者登录,但不能登录到Kafka。
这是我的log4j.properties我为此定制的:
log4j.rootLogger=INFO, console, KAFKA, file
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.KAFKA.topic=test2
log4j.appender.KAFKA.name=localhost
log4j.appender.KAFKA.host=localhost
log4j.appender.KAFKA.port=9092
log4j.appender.KAFKA.brokerList=localhost:9092
log4j.appender.KAFKA.compressionType=none
log4j.appender.KAFKA.requiredNumAcks=0
log4j.appender.KAFKA.syncSend=true
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% - %m%n
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=log4j-application.log
log4j.appender.file.MaxFileSize=5MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
这是我的代码(到目前为止)。我试图传递一个记录器定义,以便每个执行者都能得到一个副本,但我不知道为什么它不会传到Kafka:
import org.apache.log4j._
import org.apache.spark._
import org.apache.spark.rdd.RDD
import java.io._
import org.apache.kafka.log4jappender.KafkaLog4jAppender
class Mapper(n: Int) extends Serializable{
@transient lazy val suplogger: Logger = Logger.getLogger("myLogger")
def doSomeMappingOnDataSetAndLogIt(rdd: RDD[Int]): RDD[String] =
rdd.map{ i =>
val sparkConf: SparkConf =new org.apache.spark.SparkConf()
logger.setLevel((Level) Level.ALL)
suplogger.warn(sparkConf.toDebugString)
val pid = Integer.parseInt(new File("/proc/self").getCanonicalFile().getName());
suplogger.warn("--------------------")
suplogger.warn("mapping: " + i)
val supIterator = new scala.collection.JavaConversions.JEnumerationWrapper(suplogger.getAllAppenders())
suplogger.warn("List is " + supIterator.toList)
suplogger.warn("Num of list is: " + supIterator.size)
//(i + n).toString
"executor pid = "+pid + "debug string: " + sparkConf.toDebugString.size
}
}
object Mapper {
def apply(n: Int): Mapper = new Mapper(n)
}
object HelloWorld {
def main(args: Array[String]): Unit = {
println("sup")
println("yo")
val log = LogManager.getRootLogger
log.setLevel(Level.WARN)
val nameIterator = new scala.collection.JavaConversions.JEnumerationWrapper(log.getAllAppenders())
println(nameIterator.toList)
val conf = new SparkConf().setAppName("demo-app")
val sc = new SparkContext(conf)
log.warn(conf.toDebugString)
val pid = Integer.parseInt(new File("/proc/self").getCanonicalFile().getName());
log.warn("--------------------")
log.warn("IP: "+java.net.InetAddress.getLocalHost() +" PId: "+pid)
log.warn("Hello demo")
val data = sc.parallelize(1 to 100, 10)
val mapper = Mapper(1)
val other = mapper.doSomeMappingOnDataSetAndLogIt(data)
other.collect()
log.warn("I am done")
}
}
下面是日志文件的一些示例输出:
2017-01-25 06:29:15 WARN myLogger:19 - spark.driver.port=54335
2017-01-25 06:29:15 WARN myLogger:21 - --------------------
2017-01-25 06:29:15 WARN myLogger:23 - mapping: 1
2017-01-25 06:29:15 WARN myLogger:25 - List is List()
2017-01-25 06:29:15 WARN myLogger:26 - Num of list is: 0
2017-01-25 06:29:15 WARN myLogger:19 - spark.driver.port=54335
2017-01-25 06:29:15 WARN myLogger:21 - --------------------
2017-01-25 06:29:15 WARN myLogger:23 - mapping: 2
2017-01-25 06:29:15 WARN myLogger:25 - List is List()
2017-01-25 06:29:15 WARN myLogger:26 - Num of list is: 0
2017-01-25 06:29:15 WARN myLogger:19 - spark.driver.port=54335
2017-01-25 06:29:15 WARN myLogger:21 - --------------------
谢谢你们的帮助,如果你们有什么需要的话,请告诉我!
这是spark提交命令的副本
spark-submit \
--deploy-mode client \
--files spark_test/mylogger.props \
--packages "com.databricks:spark-csv_2.10:1.4.0,org.apache.kafka:kafka-log4j-appender:0.10.1.1" \
--num-executors 4 \
--executor-cores 1 \
--driver-java-options "-Dlog4j.configuration=file:///home/mapr/spark_test/mylogger.props" \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///home/mapr/spark_test/mylogger.props" \
--class "HelloWorld" helloworld.jar
您的问题是没有正确地将spark_test/mylogger.props
文件传递给执行程序。
无论如何,您都需要使用执行程序的file
上传文件。
spark-submit \
--deploy-mode client \
--driver-java-options "-Dlog4j.configuration=file:/home/mapr/spark_test/mylogger.props \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:mylogger.props" \
--files /home/mapr/spark_test/mylogger.props \
...
spark-submit \
--deploy-mode cluster \
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:mylogger.props" \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:mylogger.props" \
--files /home/mapr/spark_test/mylogger.props \
...
查看我关于配置Spark日志记录的完整帖子:
https://stackoverflow.com/a/55596389/1549135
有关Spark Kafka appender的更多详细信息:
https://stackoverflow.com/a/58883911/1549135
我知道问题是什么了。我没有部署到集群,我只是在客户端模式下部署。说实话,我不知道为什么当我被发送到集群时,这会起作用。
我使用的是MapR沙盒虚拟机https://www.mapr.com/products/mapr-sandbox-hadoop
如果有人能解释为什么客户机/集群在这里发挥了作用,我将非常感激!
我正在使用ApacheSpark和Scala的MLlib。我需要转换一组向量 在标签点中,为了应用MLLib算法,每个向量由0.0(假)或1.0(真)的双值组成。所有向量都保存在RDD中,因此最终的RDD是 因此,在RDD中,有一些向量是用 我如何从这个RDD(data_tmp)或行矩阵(data)创建一个使用MLLib算法的标签点集?例如,我需要在这里应用SVMs线性alghoritms
问题内容: 我正在尝试将一些Python代码移植到Scala。它大量使用了Numpy和Scipy。虽然我已经找到了许多密集矩阵/线性代数库,它们可以作为NumPy的适当替代品(但不是极好的替代品),但我还没有真正找到能够提供我在SciPy中使用的功能的东西。特别是,我正在寻找一个支持稀疏部分本征分解的库(例如SciPy对arpack的包装),然后再寻找SciPy提供的一些简单内容的库(例如直方图)
我有一个基于maven的scala/java混合应用程序,可以提交spar作业。我的应用程序jar“myapp.jar”在lib文件夹中有一些嵌套的jar。其中之一是“common.jar”。我在清单文件中定义了类路径属性,比如。Spark executor抛出在客户端模式下提交应用程序时出错。类(com/myapp/common/myclass.Class)和jar(common.jar)在那里
我正在ApacheSpark上的数据库中构建一个族谱,使用递归搜索来查找数据库中每个人的最终父级(即族谱顶部的人)。 假设搜索id时返回的第一个人是正确的家长 它给出以下错误 “原因:org.apache.spark.SparkException:RDD转换和操作只能由驱动程序调用,不能在其他转换中调用;例如,
我有一个项目的RDD,还有一个函数 。 收集RDD的两个小样本,然后这两个数组。这很好,但无法扩展。 有什么想法吗? 谢谢 编辑:下面是如何压缩每个分区中具有不同项数的两个示例: 关键是,虽然RDD的. zip方法不接受大小不等的分区,但迭代器的. zip方法接受(并丢弃较长迭代器的剩余部分)。
我试图在火花笔记本的阿帕奇火花中做NLP。对于这个特定的例子,我正在使用库https://opennlp.apache.org创建一个块来提取名词短语。由于数据量的增加,我需要转向分布式计算。 问题是我无法广播我的chunker对象。通过阅读文档(只在board上投射数组等简单对象),我尝试了以下方法: 但这会引发以下错误: 如果我将chunker的初始化封装在函数中,然后在map方法中调用函数,