Monitoring and Instrumentation

有几种方法来监视 Spark 应用程序:Web UI,metrics 和外部工具。

Web 界面

每个 SparkContext 都会启动一个 Web UI,默认端口为 4040,显示有关应用程序的有用信息。这包括:

  • 调度器阶段和任务的列表
  • RDD 大小和内存使用的概要信息
  • 环境信息
  • 正在运行的执行器的信息

您可以通过在 Web 浏览器中打开 http://<driver-node>:4040 来访问此界面。如果多个 SparkContexts 在同一主机上运行,则它们将绑定到连续的端口从 4040(4041,4042 等)开始。

请注意,默认情况下此信息仅适用于运行中的应用程序。要在事后还能通过 Web UI 查看,请在应用程序启动之前,将 spark.eventLog.enabled 设置为 true。这配置 Spark 持久存储以记录 Spark 事件,再通过编码该信息在 UI 中进行显示。

事后查看

仍然可以通过 Spark 的历史服务器构建应用程序的UI,只要应用程序的事件日志存在。您可以通过执行以下命令启动历史服务器:

  1. ./sbin/start-history-server.sh

默认情况下,会在 http://<server-url>:18080 创建一个 Web 界面,显示未完成、完成以及其他尝试的任务信息。

当使用 file-system 提供程序类(见下面 spark.history.provider)时,基本日志记录目录必须在spark.history.fs.logDirectory配置选项中提供,并且应包含每个代表应用程序的事件日志的子目录。

Spark 任务本身必须配置启用记录事件,并将其记录到相同共享的可写目录下。例如,如果服务器配置了日志目录hdfs://namenode/shared/spark-logs,那么客户端选项将是:

  1. spark.eventLog.enabled true
  2. spark.eventLog.dir hdfs://namenode/shared/spark-logs

history server 可以配置如下:

环境变量

环境变量 含义
SPARK_DAEMON_MEMORY history server 内存分配(默认值:1g)
SPARK_DAEMON_JAVA_OPTS history server JVM 选项(默认值:无)
SPARK_PUBLIC_DNS history server 公共地址。如果没有设置,应用程序历史记录的链接可能会使用服务器的内部地址,导致链接断开(默认值:无)。
SPARK_HISTORY_OPTS spark.history.* history server 配置选项(默认值:无)

Spark配置选项

属性名称 默认 含义
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider 执行应用程序历史后端的类的名称。目前只有一个实现,由 Spark 提供,它查找存储在文件系统中的应用程序日志。
spark.history.fs.logDirectory file:/tmp/spark-events 为了文件系统的历史提供者,包含要加载的应用程序事件日志的目录URL。这可以是 local file:// 路径,HDFS hdfs://namenode/shared/spark-logs 或者是 Hadoop API 支持的替代文件系统。
spark.history.fs.update.interval 10s 文件系统历史的提供者在日志目录中检查新的或更新的日志期间。更短的时间间隔可以更快地检测新的应用程序,而不必更多服务器负载重新读取更新的应用程序。一旦更新完成,完成和未完成的应用程序的列表将反映更改。
spark.history.retainedApplications 50 在缓存中保留 UI 数据的应用程序数量。如果超出此上限,则最早的应用程序将从缓存中删除。如果应用程序不在缓存中,如果从 UI 界面访问它将不得不从磁盘加载。
spark.history.ui.maxApplications Int.MaxValue 在历史记录摘要页面上显示的应用程序数量。应用程序 UI 仍然可以通过直接访问其 URL,即使它们不显示在历史记录摘要页面上。
spark.history.ui.port 18080 history server 的Web界面绑定的端口。
spark.history.kerberos.enabled false 表明 history server 是否应该使用 kerberos 进行登录。如果 history server 正在访问安全的 Hadoop 集群上的 HDFS 文件,则需要这样做。如果这是真的,它使用配置 spark.history.kerberos.principalspark.history.kerberos.keytab
spark.history.kerberos.principal (none) history server 的 Kerberos 主要名称。
spark.history.kerberos.keytab (none) history server 的 kerberos keytab 文件的位置。
spark.history.ui.acls.enable false 指定是否应检查 acls 授权查看应用程序的用户。如果启用,则进行访问控制检查,无论单个应用程序在运行时为 spark.ui.acls.enable 设置了什么。应用程序所有者将始终有权查看自己的应用程序和通过 spark.ui.view.acls 指定的任何用户和通过 spark.ui.view.acls.groups,当应用程序运行时也将有权查看该应用程序。如果禁用,则不进行访问控制检查。
spark.history.ui.admin.acls empty 通过逗号来分隔具有对 history server 中所有 Spark 应用程序的查看访问权限的用户/管理员列表。默认情况下只允许在运行时查看应用程序的用户可以访问相关的应用程序历史记录,配置的用户/管理员也可以具有访问权限。在列表中添加 “*” 表示任何用户都可以拥有管理员的权限。
spark.history.ui.admin.acls.groups empty 通过逗号来分隔具有对 history server 中所有 Spark 应用程序的查看访问权限的组的列表。默认情况下只允许在运行时查看应用程序的组可以访问相关的应用程序历史记录,配置的组也可以具有访问权限。在列表中放置 “*” 表示任何组都可以拥有管理员权限。
spark.history.fs.cleaner.enabled false 指定 History Server 是否应该定期从存储中清除事件日志。
spark.history.fs.cleaner.interval 1d 文件系统 job history 清洁程序多久检查要删除的文件。如果文件比 spark.history.fs.cleaner.maxAge 更旧,那么它们将被删除。
spark.history.fs.cleaner.maxAge 7d 较早的 Job history 文件将在文件系统历史清除程序运行时被删除。
spark.history.fs.numReplayThreads 25% of available cores history server 用于处理事件日志的线程数。

请注意UI中所有的任务,表格可以通过点击它们的标题来排序,便于识别慢速任务,数据偏移等。

注意

  1. history server 显示完成的和未完成的 Spark 作业。如果应用程序在失败后进行多次尝试,将显示失败的尝试,以及任何持续未完成的尝试或最终成功的尝试。

  2. 未完成的程序只会间歇性地更新。更新的时间间隔由更改文件的检查间隔(spark.history.fs.update.interval)定义。在较大的集群上,更新间隔可能设置为较大的值。查看正在运行的应用程序的方式实际上是查看自己的 Web UI。

  3. 没有注册完成就退出的应用程序将被列出为未完成的,即使它们不再运行。如果应用程序崩溃,可能会发生这种情况。

  4. 一个用于表示完成 Spark 作业的一种方法是明确地停止Spark Context(sc.stop()),或者在 Python 中使用 with SparkContext() as sc: 构造处理 Spark 上下文设置并拆除。

REST API

除了在UI中查看指标之外,还可以使用JSON。这为开发人员提供了一种简单的方法来为 Spark 创建新的可视化和监控工具。JSON可用于运行的应用程序和 history server。The endpoints are mounted at /api/v1。例如,对于 history server,它们通常可以在 http://<server-url>:18080/api/v1 访问,对于正在运行的应用程序,在 http://localhost:4040/api/v1

在 API 中,一个应用程序被其应用程序 ID [app-id] 引用。当运行在 YARN 上时,每个应用程序可能会有多次尝试,但是仅针对群集模式下的应用程序进行尝试,而不是客户端模式下的应用程序。YARN 群集模式中的应用程序可以通过它们的 [attempt-id] 来识别。在下面列出的 API 中,当以 YARN集群模式运行时,[app-id] 实际上是 [base-app-id]/[attempt-id],其中 [base-app-id] YARN 应用程序 ID。

Endpoint 含义
/applications 所有应用程序的列表。
?status=[completed|running] 列出所选状态下的应用程序。
?minDate=[date] 列出最早的开始日期/时间。
?maxDate=[date] 列出最新开始日期/时间。
?minEndDate=[date] 列出最早的结束日期/时间。
?maxEndDate=[date] 列出最新结束日期/时间。
?limit=[limit] 限制列出的应用程序数量。
示例:
?minDate=2015-02-10
?minDate=2015-02-03T16:42:40.000GMT
?maxDate=2015-02-11T20:41:30.000GMT
?minEndDate=2015-02-12
?minEndDate=2015-02-12T09:15:10.000GMT
?maxEndDate=2015-02-14T16:30:45.000GMT
?limit=10
/applications/[app-id]/jobs 给定应用程序的所有 job 的列表。
?status=[running|succeeded|failed|unknown] 列出在特定状态下的 job。
/applications/[app-id]/jobs/[job-id] 给定 job 的详细信息。
/applications/[app-id]/stages 给定应用程序的所有阶段的列表。
?status=[active|complete|pending|failed] 仅列出状态的阶段。
/applications/[app-id]/stages/[stage-id] 给定阶段的所有尝试的列表。
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] 给定阶段的尝试详细信息。
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary 给定阶段尝试中所有任务的汇总指标。
?quantiles 用给定的分位数总结指标。

Example: ?quantiles=0.01,0.5,0.99 | | /applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList | 给定阶段尝试的所有 task 的列表。
?offset=[offset]&length=[len] 列出给定范围内的 task。
?sortBy=[runtime|-runtime] task 排序.
Example: ?offset=10&length=50&sortBy=runtime | | /applications/[app-id]/executors | 给定应用程序的所有活动 executor 的列表。 | | /applications/[app-id]/allexecutors | 给定应用程序的所有(活动和死亡)executor 的列表。 | | /applications/[app-id]/storage/rdd | 给定应用程序的存储 RDD 列表。 | | /applications/[app-id]/storage/rdd/[rdd-id] | 给定 RDD 的存储状态的详细信息。 | | /applications/[base-app-id]/logs | 将给定应用程序的所有尝试的事件日志下载为一个 zip 文件。 | | /applications/[base-app-id]/[attempt-id]/logs | 将特定应用程序尝试的事件日志下载为一个 zip 文件。 | | /applications/[app-id]/streaming/statistics | streaming context 的统计信息 | | /applications/[app-id]/streaming/receivers | 所有 streaming receivers 的列表。 | | /applications/[app-id]/streaming/receivers/[stream-id] | 给定 receiver 的详细信息。 | | /applications/[app-id]/streaming/batches | 所有被保留 batch 的列表。 | | /applications/[app-id]/streaming/batches/[batch-id] | 给定 batch 的详细信息。 | | /applications/[app-id]/streaming/batches/[batch-id]/operations | 给定 batch 的所有输出操作的列表。 | | /applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id] | 给定操作和给定 batch 的详细信息。 | | /applications/[app-id]/environment | 给定应用程序环境的详细信息。 |

可检索的 job 和 stage 的数量被 standalone Spark UI 的相同保留机制所约束。"spark.ui.retainedJobs" 定义触发 job 垃圾收集的阈值,以及 spark.ui.retainedStages 限定 stage。请注意,垃圾回收在 play 时进行:可以通过增加这些值并重新启动 history server 来检索更多条目。

API 版本控制策略

这些 endpoint 已被强力版本化,以便更容易开发应用程序。特别是 Spark 保证:

  • endpoint 永远不会从一个版本中删除
  • 任何给定 endpoint 都不会删除个别字段
  • 可以添加新的 endpoint
  • 可以将新字段添加到现有 endpoint
  • 将来可能会在单独的 endpoint 添加新版本的 api(例如:api/v2)。新版本 需要向后兼容。
  • Api 版本可能会被删除,但只有在至少一个与新的 api 版本共存的次要版本之后才可以删除。

请注意,即使在检查正在运行的应用程序的 UI 时,仍然需要 applications/[app-id]部分,尽管只有一个应用程序可用。例如:要查看正在运行的应用程序的作业列表,您可以访问 http://localhost:4040/api/v1/applications/[app-id]/jobs。这是为了在两种模式下保持路径一致。

Metrics

Spark 具有基于Dropwizard Metrics Library的可配置 metrics 系统。这允许用户将 Spark metrics 报告给各种接收器,包括 HTTP,JMX 和 CSV 文件。metrics 系统是通过配置文件进行配置的,Spark 配置文件是 Spark 预计出现在 $SPARK_HOME/conf/metrics.properties上。可以通过spark.metrics.conf 配置属性指定自定义文件位置。默认情况下,用于 driver 或 executor metrics 标准的根命名空间是 spark.app.id 的值。然而,通常用户希望能够跟踪 driver 和 executors 的应用程序的 metrics,这与应用程序 ID(即:spark.app.id)很难相关,因为每次调用应用程序都会发生变化。对于这种用例,可以为使用 spark.metrics.namespace配置属性的 metrics 报告指定自定义命名空间。例如,如果用户希望将度量命名空间设置为应用程序的名称,则可以将spark.metrics.namespace属性设置为像 ${spark.app.name}这样的值。然后,该值会被 Spark 适当扩展,并用作度量系统的根命名空间。非 driver和 executor 的 metrics 标准永远不会以 spark.app.id为前缀,spark.metrics.namespace属性也不会对这些 metrics 有任何这样的影响。

Spark 的 metrics 被分解为与 Spark 组件相对应的不同 instances。在每个实例中,您可以配置一组报告汇总指标。目前支持以下实例:

  • master:Spark standalone 的 master 进程。
  • applications:主机内的一个组件,报告各种应用程序。
  • worker:Spark standalone 的 worker 进程。
  • executor:A Spark executor.
  • driver:Spark driver 进程(创建 SparkContext 的过程)。
  • shuffleService:The Spark shuffle service.

每个实例可以报告为 0 或更多 sinks。Sinks 包含在 org.apache.spark.metrics.sink包中:

  • ConsoleSink:将 metrics 信息记录到控制台。
  • CSVSink:定期将 metrics 数据导出到 CSV 文件。
  • JmxSink:注册在 JMX 控制台中查看的 metrics。
  • MetricsServlet:在现有的 Spark UI 中添加一个 servlet,以将数据作为 JSON 数据提供。
  • GraphiteSink:将 metrics 发送到 Graphite 节点。
  • Slf4jSink:将 metrics 标准作为日志条目发送到 slf4j。

Spark 还支持由于许可限制而不包含在默认构建中的 Ganglia 接收器:

  • GangliaSink:向 Ganglia 节点或 multicast 组发送 metrics。

要安装 GangliaSink,您需要执行 Spark 的自定义构建。请注意,通过嵌入此库,您将包括 LGPL-licensed Spark包中的代码。对于 sbt 用户,在构建之前设置 SPARK_GANGLIA_LGPL环境变量。对于 Maven 用户,启用 -Pspark-ganglia-lgpl 配置文件。除了修改集群的 Spark 构建用户,应用程序还需要链接到 spark-ganglia-lgpl 工件。

metrics 配置文件的语法在示例配置文件 $SPARK_HOME/conf/metrics.properties.template 中定义。

高级工具

可以使用几种外部工具来帮助描述 Spark job 的性能:

  • 集群范围的监控工具,例如 Ganglia可以提供对整体集群利用率和资源瓶颈的洞察。例如,Ganglia 仪表板可以快速显示特定工作负载是否为磁盘绑定,网络绑定或 CPU 绑定。
  • 操作系统分析工具,如 dstatiostatiotop 可以在单个节点上提供细粒度的分析。
  • JVM 实用程序,如 jstack 提供堆栈跟踪,jmap 用于创建堆转储,jstat 用于报告时间序列统计数据和 jconsole 用于可视化地浏览各种 JVM 属性对于那些合适的 JVM 内部使用是有用的。