当前位置: 首页 > 知识库问答 >
问题:

Apache Spark/scala中的Log4J Kafka功能

汪建白
2023-03-14

嗨,我正在尝试从一群执行者那里登录到一个Kafka主题,他们使用ApacheSpark和Log4J以及KafkaAppender。我可以使用基本的文件附加器与执行者登录,但不能登录到Kafka。

这是我的log4j.properties我为此定制的:

log4j.rootLogger=INFO, console, KAFKA, file

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n


log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.KAFKA.topic=test2
log4j.appender.KAFKA.name=localhost
log4j.appender.KAFKA.host=localhost
log4j.appender.KAFKA.port=9092
log4j.appender.KAFKA.brokerList=localhost:9092
log4j.appender.KAFKA.compressionType=none
log4j.appender.KAFKA.requiredNumAcks=0
log4j.appender.KAFKA.syncSend=true
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% - %m%n



log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=log4j-application.log
log4j.appender.file.MaxFileSize=5MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

这是我的代码(到目前为止)。我试图传递一个记录器定义,以便每个执行者都能得到一个副本,但我不知道为什么它不会传到Kafka:

import org.apache.log4j._
import org.apache.spark._
import org.apache.spark.rdd.RDD
import java.io._
import org.apache.kafka.log4jappender.KafkaLog4jAppender

class Mapper(n: Int) extends Serializable{
  @transient lazy val suplogger: Logger = Logger.getLogger("myLogger")

  def doSomeMappingOnDataSetAndLogIt(rdd: RDD[Int]): RDD[String] =
    rdd.map{ i =>
      val sparkConf: SparkConf =new org.apache.spark.SparkConf()
      logger.setLevel((Level) Level.ALL)
      suplogger.warn(sparkConf.toDebugString)
      val pid = Integer.parseInt(new File("/proc/self").getCanonicalFile().getName());
      suplogger.warn("--------------------")
      suplogger.warn("mapping: " + i)
      val supIterator = new scala.collection.JavaConversions.JEnumerationWrapper(suplogger.getAllAppenders())
      suplogger.warn("List is " + supIterator.toList)
      suplogger.warn("Num of list is: " + supIterator.size)

      //(i + n).toString
      "executor pid = "+pid + "debug string: " + sparkConf.toDebugString.size
    }
}

object Mapper {
  def apply(n: Int): Mapper = new Mapper(n)
}

object HelloWorld {
  def main(args: Array[String]): Unit = {
    println("sup")
    println("yo")
    val log = LogManager.getRootLogger
    log.setLevel(Level.WARN)
    val nameIterator = new scala.collection.JavaConversions.JEnumerationWrapper(log.getAllAppenders())
    println(nameIterator.toList)

    val conf = new SparkConf().setAppName("demo-app")
    val sc = new SparkContext(conf)
    log.warn(conf.toDebugString)
    val pid = Integer.parseInt(new File("/proc/self").getCanonicalFile().getName());
    log.warn("--------------------")
    log.warn("IP: "+java.net.InetAddress.getLocalHost() +" PId: "+pid)

    log.warn("Hello demo")

    val data = sc.parallelize(1 to 100, 10)

    val mapper = Mapper(1)

    val other = mapper.doSomeMappingOnDataSetAndLogIt(data)

    other.collect()

    log.warn("I am done")
  }

}

下面是日志文件的一些示例输出:

2017-01-25 06:29:15 WARN  myLogger:19 - spark.driver.port=54335
2017-01-25 06:29:15 WARN  myLogger:21 - --------------------
2017-01-25 06:29:15 WARN  myLogger:23 - mapping: 1
2017-01-25 06:29:15 WARN  myLogger:25 - List is List()
2017-01-25 06:29:15 WARN  myLogger:26 - Num of list is: 0
2017-01-25 06:29:15 WARN  myLogger:19 - spark.driver.port=54335
2017-01-25 06:29:15 WARN  myLogger:21 - --------------------
2017-01-25 06:29:15 WARN  myLogger:23 - mapping: 2
2017-01-25 06:29:15 WARN  myLogger:25 - List is List()
2017-01-25 06:29:15 WARN  myLogger:26 - Num of list is: 0
2017-01-25 06:29:15 WARN  myLogger:19 - spark.driver.port=54335
2017-01-25 06:29:15 WARN  myLogger:21 - --------------------

谢谢你们的帮助,如果你们有什么需要的话,请告诉我!

这是spark提交命令的副本

spark-submit \
    --deploy-mode client \
    --files spark_test/mylogger.props \
    --packages "com.databricks:spark-csv_2.10:1.4.0,org.apache.kafka:kafka-log4j-appender:0.10.1.1" \
    --num-executors 4 \
    --executor-cores 1 \
    --driver-java-options "-Dlog4j.configuration=file:///home/mapr/spark_test/mylogger.props" \
    --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///home/mapr/spark_test/mylogger.props" \
    --class "HelloWorld" helloworld.jar

共有2个答案

奚卓
2023-03-14

您的问题是没有正确地将spark_test/mylogger.props文件传递给执行程序。

无论如何,您都需要使用执行程序的file上传文件。

spark-submit \
    --deploy-mode client \
    --driver-java-options "-Dlog4j.configuration=file:/home/mapr/spark_test/mylogger.props \
    --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:mylogger.props" \
    --files /home/mapr/spark_test/mylogger.props \
    ...
spark-submit \
    --deploy-mode cluster \
    --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:mylogger.props" \
    --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:mylogger.props" \
    --files /home/mapr/spark_test/mylogger.props \
    ...

查看我关于配置Spark日志记录的完整帖子:
https://stackoverflow.com/a/55596389/1549135

有关Spark Kafka appender的更多详细信息:
https://stackoverflow.com/a/58883911/1549135

鲁鹏
2023-03-14

我知道问题是什么了。我没有部署到集群,我只是在客户端模式下部署。说实话,我不知道为什么当我被发送到集群时,这会起作用。

我使用的是MapR沙盒虚拟机https://www.mapr.com/products/mapr-sandbox-hadoop

如果有人能解释为什么客户机/集群在这里发挥了作用,我将非常感激!

 类似资料:
  • 我正在使用ApacheSpark和Scala的MLlib。我需要转换一组向量 在标签点中,为了应用MLLib算法,每个向量由0.0(假)或1.0(真)的双值组成。所有向量都保存在RDD中,因此最终的RDD是 因此,在RDD中,有一些向量是用 我如何从这个RDD(data_tmp)或行矩阵(data)创建一个使用MLLib算法的标签点集?例如,我需要在这里应用SVMs线性alghoritms

  • 问题内容: 我正在尝试将一些Python代码移植到Scala。它大量使用了Numpy和Scipy。虽然我已经找到了许多密集矩阵/线性代数库,它们可以作为NumPy的适当替代品(但不是极好的替代品),但我还没有真正找到能够提供我在SciPy中使用的功能的东西。特别是,我正在寻找一个支持稀疏部分本征分解的库(例如SciPy对arpack的包装),然后再寻找SciPy提供的一些简单内容的库(例如直方图)

  • 我有一个基于maven的scala/java混合应用程序,可以提交spar作业。我的应用程序jar“myapp.jar”在lib文件夹中有一些嵌套的jar。其中之一是“common.jar”。我在清单文件中定义了类路径属性,比如。Spark executor抛出在客户端模式下提交应用程序时出错。类(com/myapp/common/myclass.Class)和jar(common.jar)在那里

  • 我正在ApacheSpark上的数据库中构建一个族谱,使用递归搜索来查找数据库中每个人的最终父级(即族谱顶部的人)。 假设搜索id时返回的第一个人是正确的家长 它给出以下错误 “原因:org.apache.spark.SparkException:RDD转换和操作只能由驱动程序调用,不能在其他转换中调用;例如,

  • 我有一个项目的RDD,还有一个函数 。 收集RDD的两个小样本,然后这两个数组。这很好,但无法扩展。 有什么想法吗? 谢谢 编辑:下面是如何压缩每个分区中具有不同项数的两个示例: 关键是,虽然RDD的. zip方法不接受大小不等的分区,但迭代器的. zip方法接受(并丢弃较长迭代器的剩余部分)。

  • 我试图在火花笔记本的阿帕奇火花中做NLP。对于这个特定的例子,我正在使用库https://opennlp.apache.org创建一个块来提取名词短语。由于数据量的增加,我需要转向分布式计算。 问题是我无法广播我的chunker对象。通过阅读文档(只在board上投射数组等简单对象),我尝试了以下方法: 但这会引发以下错误: 如果我将chunker的初始化封装在函数中,然后在map方法中调用函数,