当前位置: 首页 > 知识库问答 >
问题:

spark流媒体应用程序和kafka log4j appender问题

胡鸿志
2023-03-14

我正在测试我的火花流应用程序,我在我的代码中有多个函数:-其中一些在DStream[RDD[XXX]上运行,其中一些在RDD[XXX]上运行(在我做DStream.foreachRDD之后)。

我使用Kafka log4j appender来记录发生在我的函数中的业务案例,这些案例在DStream[RDD]上运行

但只有当来自于在RDD上运行的函数时,数据才会附加到Kafka-

有人知道这种行为的原因吗?

我在一台虚拟机上工作,在那里我有Spark

编辑

事实上,我已经解决了问题的一部分。数据仅从我的主函数中的代码部分附加到Kafka。所有的代码,是在我的主,不写数据Kafka。

大体上,我是这样声明记录器的:

val kafkaLogger = org.apache.log4j.LogManager.getLogger("kafkaLogger")

在我的主要任务之外,我必须这样声明:

@transient lazy val kafkaLogger = org.apache.log4j.LogManager.getLogger("kafkaLogger")

以避免序列化问题。

原因可能是JVM序列化概念的背后,或者仅仅是因为工作人员看不到log4j配置文件(但我的log4j文件在我的源代码中,在资源文件夹中)

编辑2

我尝试了很多方式发送log4j文件给执行者,但不工作。我试过:

>

在spark submit中设置:--conf“spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/home/vagrant/log4j.properties”

设置log4j。spark submit的驱动程序类路径中的属性文件。。。

这些选项都不起作用。

有人有解决办法吗?我在错误日志中没有看到任何错误。。

非常感谢。

共有1个答案

施飞昂
2023-03-14

我想你很接近。。首先,要确保使用--files标志将所有文件导出到所有节点上的工作目录(而不是类路径)。然后您希望将这些文件引用到executors和driver的classpath选项。我已经附加了以下命令,希望对您有所帮助。关键是要理解一旦导出了文件,就可以使用工作目录的文件名(而不是url路径)在节点上访问所有文件。

注意:将log4j文件放入资源文件夹将不起作用(至少当我尝试时,它没有)。

sudo -u hdfs spark-submit --class "SampleAppMain" --master yarn --deploy-mode cluster --verbose --files file:///path/to/custom-log4j.properties,hdfs:///path/to/jar/kafka-log4j-appender-0.9.0.0.jar --conf "spark.driver.extraClassPath=kafka-log4j-appender-0.9.0.0.jar" --conf "spark.executor.extraClassPath=kafka-log4j-appender-0.9.0.0.jar"  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=custom-log4j.properties" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=custom-log4j.properties"  /path/to/your/jar/SampleApp-assembly-1.0.jar
 类似资料:
  • 问题内容: 我正在使用Maven 我添加了以下依赖项 我还在代码中添加了jar 它完全可以正常工作,没有任何错误,在通过spark-submit提交时出现以下错误,非常感谢您的帮助。谢谢你的时间。 线程“主要” java.lang.NoClassDefFoundError中的异常:sun.reflect处的KafkaSparkStreaming.sparkStreamingTest(KafkaSp

  • 我正在读这篇博文: http://blog.jaceklaskowski.pl/2015/07/20/real-time-data-processing-using-apache-kafka-and-spark-streaming.html 它讨论了如何使用Spark Streaming和Apache Kafka进行一些近实时处理。我完全理解这篇文章。它确实展示了我如何使用Spark Stream

  • 我正在尝试从Spark官方网站运行Spark Streaming示例 这些是我在pom文件中使用的依赖项: 这是我的Java代码: 当我尝试从Eclipse运行它时,我遇到以下异常: 我从我的IDE(eclipse)运行它。我是否必须创建并将JAR部署到火花中以使其运行。如果有人知道这个异常,请分享您的经验。提前谢谢

  • 我正在使用Flume 1.6.0-cdh5.9.1使用Twitter源流式传输推文。 配置文件如下所示: 对于Cloudera. jar依赖项,我使用Maven使用以下依赖项构建了: 现在,当我运行Flume Agent时,它成功启动,连接到Twitter,但在最后一行(接收状态流)后停止: 在最后一行之后什么都没有发生。它不会终止,不会流式传输任何东西。我看了一下HDFS位置,那里没有创建任何东

  • 这是因为检查点只存储了其中一个数据流的偏移量吗?浏览Spark结构流文档,似乎可以在Spark 2.2或>中进行流源的联接/联合

  • 我通过spark streaming应用了一个实时异常检测系统。在每个流媒体时间间隔内,如果数据点异常,AWS SNS会发送电子邮件订阅帐户。但是AWS SNS java sdk不喜欢在spark流媒体中工作。下面是错误消息 ERROR StreamingContext:启动上下文时出错,将其标记为停止的java。伊奥。NotSerializableException:已启用数据流检查点,但数据流