Monitoring(监控)

融焕
2023-12-01

Monitoring and Instrumentation

有几种方法可以监控Spark应用程序:Web UI,指标和外部检测。

Web Interfaces

默认情况下,每个SparkContext都会在端口4040上启动Web UI,以显示有关应用程序的有用信息。 这包括:

  • 调度程序阶段和任务的列表
  • RDD大小和内存使用情况的摘要
  • 环境信息
  • 有关运行执行程序的信息

请注意,此信息仅在应用程序的默认时间内可用。 要在事后查看Web UI,请在启动应用程序之前将spark.eventLog.enabled设置为true。 这会将Spark配置为记录Spark事件,该事件将UI中显示的信息编码为持久存储。

Viewing After the Fact(事后观察)

如果应用程序的事件日志存在,仍然可以通过Spark的历史服务器构建应用程序的UI。 您可以通过执行以下命令启动历史记录服

./sbin/start-history-server.sh

这默认情况下在http:// :18080创建一个Web界面,列出未完成和已完成的应用程序和尝试。

使用文件系统提供程序类时(请参阅下面的spark.history.provider),必须在spark.history.fs.logDirectory配置选项中提供基本日志记录目录,并且应包含每个代表应用程序事件日志的子目录。

必须将spark作业本身配置为记录事件,并将它们记录到同一个共享的可写目录中。 例如,如果服务器配置了hdfs:// namenode / shared / spark-logs的日志目录,那么客户端选项将是:

spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode/shared/spark-logs

历史服务器可以配置如下:

Environment Variables

Environment VariableMeaning
SPARK_DAEMON_MEMORY要分配给历史服务器的内存(默认值:1g)。
SPARK_DAEMON_JAVA_OPTS历史服务器的JVM选项(默认值:none)。
SPARK_DAEMON_CLASSPATH历史服务器的类路径(默认值:none)。
SPARK_PUBLIC_DNS历史服务器的公共地址。 如果未设置此选项,则指向应用程序历史记录的链接可能会使用服务器的内部地址,从而导致链接断开(默认值:none)。
SPARK_HISTORY_OPTSspark.history。*历史服务器的配置选项(默认值:none)。

Spark configuration options

Property NameDefaultMeaning
spark.history.providerorg.apache.spark.deploy.history.FsHistoryProvider实现应用程序历史后端的类的名称。 目前,Spark只提供了一个实现,用于查找存储在文件系统中的应用程序日志。
spark.history.fs.logDirectoryfile:/tmp/spark-events对于文件系统历史记录提供程序,包含要加载的应用程序事件日志的目录的URL。 这可以是本地file://路径,HDFS路径hdfs:// namenode / shared / spark-logs或Hadoop API支持的备用文件系统。
spark.history.fs.update.interval10s文件系统历史记录提供程序检查日志目录中的新日志或更新日志的时间段。 较短的间隔可以更快地检测新应用程序,代价是更多服务器负载重新读取更新的应用程序。 更新完成后,已完成和未完成的应用程序列表将反映更改。
spark.history.retainedApplications50保留缓存中UI数据的应用程序数。 如果超过此上限,则将从缓存中删除最旧的应用程序。 如果应用程序不在缓存中,则必须从磁盘加载它(如果从UI访问)。
spark.history.ui.maxApplicationsInt.MaxValue要在历史摘要页面上显示的应用程序数。 即使应用程序UI未显示在历史摘要页面上,它们仍然可以直接访问其URL。
spark.history.ui.port18080历史服务器的Web界面绑定的端口。
spark.history.kerberos.enabledfalse指示历史记录服务器是否应使用kerberos登录。 如果历史记录服务器正在访问安全Hadoop集群上的HDFS文件,那么这是必需的。 如果这是真的,它使用配置spark.history.kerberos.principal和spark.history.kerberos.keytab。
spark.history.kerberos.principal(none)历史服务器的Kerberos主体名称。
spark.history.kerberos.keytab(none)历史记录服务器的kerberos密钥表文件的位置。
spark.history.ui.acls.enablefalse指定是否应检查acls以授权查看应用程序的用户。 如果启用,则无论应用程序运行时单个应用程序为spark.ui.acls.enable设置了什么,都会进行访问控制检查。 应用程序所有者将始终有权查看自己的应用程序,并且运行应用程序时通过spark.ui.view.acls.groups指定的spark.ui.view.acls和组指定的任何用户也将有权查看该应用程序。 如果禁用,则不进行访问控制检查。
spark.history.ui.admin.aclsempty逗号分隔的用户/管理员列表,具有对历史服务器中所有Spark应用程序的查看访问权限。 默认情况下,只允许在运行时查看应用程序的用户可以访问相关的应用程序历史记录,配置的用户/管理员也可以拥有访问它的权限。 在列表中加上“*”表示任何用户都可以拥有admin权限。
spark.history.ui.admin.acls.groupsempty逗号分隔的组列表,具有对历史服务器中所有Spark应用程序的查看访问权限。 默认情况下,只允许在运行时查看应用程序的组可以访问相关的应用程序历史记录,配置的组也可以拥有访问它的权限。 在列表中加上“*”表示任何组都可以拥有admin的权限。
spark.history.fs.cleaner.enabledfalse指定历史记录服务器是否应定期清理存储中的事件日志。
spark.history.fs.cleaner.interval1d文件系统作业历史记录清理程序检查要删除的文件的频率。 仅当文件早于spark.history.fs.cleaner.maxAge时才会删除文件
spark.history.fs.cleaner.maxAge7d文件系统历史记录清理程序运行时,将删除早于此的作业历史记录文件。
spark.history.fs.numReplayThreads25% of available cores历史记录服务器用于处理事件日志的线程数。
spark.history.store.path(none)本地目录缓存应用程序历史数据的位置。 如果设置,则历史服务器将应用程序数据存储在磁盘上,而不是将其保留在内存中。 写入磁盘的数据将在历史服务器重新启动时重复使用。

请注意,在所有这些中,表通过单击其标题进行排序,从而可以轻松识别缓慢的任务,数据倾斜等。
笔记:

  1. 历史记录服务器显示已完成和未完成的Spark作业。 如果应用程序在失败后进行多次尝试,则将显示失败的尝试,以及任何正在进行的未完成尝试或最终成功尝试。
  2. 不完整的应用程序仅间歇性更新。 更新之间的时间由检查更改文件(spark.history.fs.update.interval)之间的间隔定义。 在较大的集群上,可以将更新间隔设置为较大的值。 查看正在运行的应用程序的方法实际上是查看自己的Web UI。
  3. 退出但未将自己注册为已完成的应用程序将被列为不完整 - 尽管它们不再运行。 如果应用程序崩溃,可能会发生这种情况。
  4. 发出Spark作业完成信号的一种方法是显式停止Spark上下文(sc.stop()),或者使用with SparkContext()作为sc:construct来处理Spark上下文设置和拆除。

REST API

除了在UI中查看指标外,它们还可以作为JSON使用。 这为开发人员提供了一种为Spark创建新的可视化和监视工具的简便方法。 JSON既可用于正在运行的应用程序,也可用于历史记录服务器。 端点安装在/ api / v1。 例如,对于历史服务器,它们通常可以在http:// :18080 / api / v1访问,对于正在运行的应用程序,可以在http:// localhost:4040 / api / v1访问。

在API中,应用程序由其应用程序ID [app-id]引用。 在YARN上运行时,每个应用程序可能有多次尝试,但是只有集群模式下的应用程序才有尝试ID,而不是客户端模式下的应用程序。 可以通过[attempt-id]来识别YARN群集模式中的应用程序。 在下面列出的API中,当在YARN群集模式下运行时,[app-id]实际上是[base-app-id] / [attempt-id],其中[base-app-id]是YARN应用程序ID。

EndpointMeaning
/applicationsA list of all applications.?status=[completed|running] list only applications in the chosen state. ?minDate=[date] earliest start date/time to list. ?maxDate=[date] latest start date/time to list. ?minEndDate=[date] earliest end date/time to list. ?maxEndDate=[date] latest end date/time to list. ?limit=[limit] limits the number of applications listed. Examples: ?minDate=2015-02-10 ?minDate=2015-02-03T16:42:40.000GMT ?maxDate=2015-02-11T20:41:30.000GMT ?minEndDate=2015-02-12 ?minEndDate=2015-02-12T09:15:10.000GMT ?maxEndDate=2015-02-14T16:30:45.000GMT ?limit=10
/applications/[app-id]/jobsA list of all jobs for a given application. ?status=[running|succeeded|failed|unknown] list only jobs in the specific state.
/applications/[app-id]/jobs/[job-id]Details for the given job.
/applications/[app-id]/stagesA list of all stages for a given application. status=[active|complete|pending|failed] list only stages in the state.
/applications/[app-id]/stages/[stage-id]A list of all attempts for the given stage.
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]Details for the given stage attempt.
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummarySummary metrics of all tasks in the given stage attempt. ?quantiles summarize the metrics with the given quantiles. Example: ?quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskListA list of all tasks for the given stage attempt. ?offset=[offset]&length=[len] list tasks in the given range. ?sortBy=[runtime|-runtime] sort the tasks. Example: ?offset=10&length=50&sortBy=runtime
/applications/[app-id]/executorsA list of all active executors for the given application.
/applications/[app-id]/allexecutorsA list of all(active and dead) executors for the given application.
/applications/[app-id]/storage/rddA list of stored RDDs for the given application.
/applications/[app-id]/storage/rdd/[rdd-id]Details for the storage status of a given RDD.
/applications/[base-app-id]/logsDownload the event logs for all attempts of the given application as files within a zip file.
/applications/[base-app-id]/[attempt-id]/logsDownload the event logs for a specific application attempt as a zip file.
/applications/[app-id]/streaming/statisticsStatistics for the streaming context.
/applications/[app-id]/streaming/receiversA list of all streaming receivers.
/applications/[app-id]/streaming/receivers/[stream-id]Details of the given receiver.
/applications/[app-id]/streaming/batchesA list of all retained batches.
/applications/[app-id]/streaming/batches/[batch-id]Details of the given batch.
/applications/[app-id]/streaming/batches/[batch-id]/operationsA list of all output operations of the given batch.
/applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id]Details of the given operation and given batch.
/applications/[app-id]/environmentEnvironment details of the given application.
/versionGet the current spark version.

可以检索的作业和阶段的数量受独立Spark UI的相同保留机制的限制; “spark.ui.retainedJobs”定义触发作业垃圾收集的阈值,以及阶段的spark.ui.retainedStages。 请注意,垃圾回收在回放时发生:可以通过增加这些值并重新启动历史记录服务器来检索更多条目。

API Versioning Policy
这些端点经过了强大的版本控制,可以更轻松地开发应用程序。 特别是,Spark保证:

  • 永远不会从一个版本中删除端点
  • 对于任何给定的端点,永远不会删除单个字段
  • 可以添加新的端点
  • 可以将新字段添加到现有端点
  • 可以在将来添加新版本的api作为单独的端点(例如,api / v2)。 新版本不需要向后兼容。
  • Api版本可能会被删除,但只有在至少一个与新api版本共存的次要版本之后。

请注意,即使在检查正在运行的应用程序的UI时,仍然需要applications / [app-id]部分,尽管只有一个应用程序可用。 例如。 要查看正在运行的应用程序的作业列表,您可以访问http:// localhost:4040 / api / v1 / applications / [app-id] / jobs。 这是为了在两种模式下保持路径一致。

Metrics(度量)

Spark有一个基于Dropwizard指标库的可配置指标系统。这允许用户将Spark指标报告给各种接收器,包括HTTP,JMX和CSV文件。度量系统是通过Spark期望出现在$ SPARK_HOME / conf / metrics.properties中的配置文件配置的。可以通过spark.metrics.conf配置属性指定自定义文件位置。默认情况下,用于驱动程序或执行程序度量标准的根命名空间是spark.app.id的值。但是,用户通常希望能够跟踪应用程序中驱动程序和执行程序的度量标准,这很难用于应用程序ID(即spark.app.id),因为它随应用程序的每次调用而变化。对于此类用例,可以使用spark.metrics.namespace配置属性为度量报告指定自定义命名空间。例如,如果用户想要将metrics命名空间设置为应用程序的名称,则可spark.metrics.namespace属性设置为类似$ {spark.app.name}的值。然后,Spark会适当地扩展此值,并将其用作度量系统的根命名空间。非驱动程序和执行程序指标永远不会以spark.app.id作为前缀,spark.metrics.namespace属性也不会对此类指标产生任何影响。

Spark的度量标准分离到与Spark组件对应的不同实例中。 在每个实例中,您可以配置一组报告度量标准的接收器。 目前支持以下实例:

  • master: The Spark standalone master process.
  • applications: A component within the master which reports on various applications.
  • worker: A Spark standalone worker process.
  • executor: A Spark executor.
  • driver: The Spark driver process (the process in which your SparkContext is created).
  • shuffleService: The Spark shuffle service

每个实例都可以向零个或多个接收器报告。 接收器包含在org.apache.spark.metrics.sink包中:

  • ConsoleSink:将指标信息记录到控制台。
  • CSVSink:定期将指标数据导出为CSV文件。
  • JmxSink:注册要在JMX控制台中查看的度量标准。
  • MetricsServlet:在现有Spark UI中添加servlet,以将度量数据作为JSON数据提供。
  • GraphiteSink:将指标发送到Graphite节点。
  • Slf4jSink:将指标发送到slf4j作为日志条目。
  • StatsdSink:将指标发送到StatsD节点。

Spark还支持Ganglia接收器,由于许可限制,该接收器未包含在默认构建中:

  • GangliaSink: Sends metrics to a Ganglia node or multicast group.

要安装GangliaSink,您需要执行Spark的自定义构建。 请注意,通过嵌入此库,您将在Spark包中包含LGPL许可的代码。 对于sbt用户,请在构建之前设置SPARK_GANGLIA_LGPL环境变量。 对于Maven用户,请启用-Pspark-ganglia-lgpl配置文件。 除了修改集群的Spark构建之外,用户应用程序还需要链接到spark-ganglia-lgpl工件。

度量配置文件的语法在示例配置文件$ SPARK_HOME / conf / metrics.properties.template中定义。


Advanced Instrumentation(高级仪器)

可以使用几个外部工具来帮助分析Spark作业的性能:

  • 群集范围的监视工具(如Ganglia)可以提供对整体群集利用率和资源瓶颈的深入了解。 例如,Ganglia仪表板可以快速显示特定工作负载是磁盘绑定,网络绑定还是CPU绑定。
  • 操作系统分析工具(如dstat,iostat和iotop)可以在各个节点上提供细粒度的分析。
  • JVM实用程序(如jstack用于提供堆栈跟踪,jstat用于创建堆转储),jstat用于报告时间序列统计信息,jconsole用于可视化地探索各种JVM属性,对于那些熟悉JVM内部的人来说非常有用。
 类似资料: