[Technical Practice] Spark SQL Job Query API

2019-02-15


  1. Requirements

  Query the execution history of all Spark SQL statements run by the current user.

  Technical approach:

  Step 1: Query YARN for the current user's applications (name = "Thrift JDBC/ODBC Server", user = current user) and take each matching app's tracking URL, which points to the Spark UI for a running app or to the Spark History Server UI for a finished one.

  Step 2: Fetch the jobs from that UI's REST API.

  2. WEB UI

  YARN app

  Tracking UI of a running app: http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/

  History UI of a finished app: http://slave5.cluster.local:18081/history/application_1541409376608_0044/jobs/

  The two UIs look essentially the same.

  3. YARN API

  Listing apps:

  curl 'http://slave5.cluster.local:8088/ws/v1/cluster/apps?user=hdfs&name=Thrift%20JDBC/ODBC%20Server'

  The name parameter, however, is not supported; the query parameters YARN does support are:

  * state [deprecated] - state of the application

  * states - applications matching the given application states, specified as a comma-separated list.

  * finalStatus - the final status of the application - reported by the application itself

  * user - user name

  * queue - queue name

  * limit - total number of app objects to be returned

  * startedTimeBegin - applications with start time beginning with this time, specified in ms since epoch

  * startedTimeEnd - applications with start time ending with this time, specified in ms since epoch

  * finishedTimeBegin - applications with finish time beginning with this time, specified in ms since epoch

  * finishedTimeEnd - applications with finish time ending with this time, specified in ms since epoch

  * applicationTypes - applications matching the given application types, specified as a comma-separated list.

  * applicationTags - applications matching any of the given application tags, specified as a comma-separated list.

  * deSelects - generic fields to be skipped in the result.
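
  As a quick illustration, here is a minimal Python sketch that queries this endpoint using only supported parameters (user, applicationTypes, limit). The host name follows the example above, and the Accept header explicitly requests JSON instead of XML:

    import json
    import urllib.request

    # Query only with parameters the API supports; filtering by name comes later.
    url = ("http://slave5.cluster.local:8088/ws/v1/cluster/apps"
           "?user=hdfs&applicationTypes=SPARK&limit=10")
    req = urllib.request.Request(url, headers={"Accept": "application/json"})
    resp = json.load(urllib.request.urlopen(req))
    apps = (resp.get("apps") or {}).get("app") or []  # "apps" is null when empty
    for app in apps:
        print(app["id"], app["name"], app["state"])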

  The application type, for its part, is hard-coded as SPARK in Spark's source: https://github.com/apache/spark/blob/v2.4.0/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

  def createApplicationSubmissionContext(
      newApp: YarnClientApplication,
      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
    appContext.setQueue(sparkConf.get(QUEUE_NAME))
    appContext.setAMContainerSpec(containerContext)
    appContext.setApplicationType("SPARK")

  So the only option is to fetch the list first and then filter it by name="Thrift JDBC/ODBC Server" on the client side.

  A sample entry from the returned app list:

  

  

  <app>
    <id>application_1541409376608_0041</id>
    <user>hdfs</user>
    <name>livy-session-16</name>
    <queue>default</queue>
    <state>FINISHED</state>
    <finalStatus>SUCCEEDED</finalStatus>
    <progress>100.0</progress>
    <trackingUI>History</trackingUI>
    <trackingUrl>http://slave5.cluster.local:8088/proxy/application_1541409376608_0041/</trackingUrl>
    <diagnostics/>
    <clusterId>1541409376608</clusterId>
    <applicationType>SPARK</applicationType>
    <applicationTags>livy-session-16-rslkzght</applicationTags>
    <priority>0</priority>
    <startedTime>1543111595930</startedTime>
    <finishedTime>1543112016116</finishedTime>
    <elapsedTime>420186</elapsedTime>
    <amContainerLogs>http://slave2:8042/node/containerlogs/container_e82_1541409376608_0041_01_000001/hdfs</amContainerLogs>
    <amHostHttpAddress>slave2:8042</amHostHttpAddress>
    <amRPCAddress>10.221.129.26:0</amRPCAddress>
    <masterNodeId>slave2:45454</masterNodeId>
    <allocatedMB>-1</allocatedMB>
    <allocatedVCores>-1</allocatedVCores>
    <reservedMB>-1</reservedMB>
    <reservedVCores>-1</reservedVCores>
    <runningContainers>-1</runningContainers>
    <memorySeconds>2442054</memorySeconds>
    <vcoreSeconds>1190</vcoreSeconds>
    <queueUsagePercentage>0.0</queueUsagePercentage>
    <clusterUsagePercentage>0.0</clusterUsagePercentage>
    <resourceSecondsMap>
      <entry>
        <key>memory-mb</key>
        <value>2442054</value>
      </entry>
      <entry>
        <key>vcores</key>
        <value>1190</value>
      </entry>
    </resourceSecondsMap>
    <preemptedResourceMB>0</preemptedResourceMB>
    <preemptedResourceVCores>0</preemptedResourceVCores>
    <numNonAMContainerPreempted>0</numNonAMContainerPreempted>
    <numAMContainerPreempted>0</numAMContainerPreempted>
    <preemptedMemorySeconds>0</preemptedMemorySeconds>
    <preemptedVcoreSeconds>0</preemptedVcoreSeconds>
    <logAggregationStatus>SUCCEEDED</logAggregationStatus>
    <unmanagedApplication>false</unmanagedApplication>
    <amNodeLabelExpression/>
    <timeouts>
      <timeout>
        <type>LIFETIME</type>
        <expiryTime>UNLIMITED</expiryTime>
        <remainingTimeInSeconds>-1</remainingTimeInSeconds>
      </timeout>
    </timeouts>
  </app>
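
  Since the filtering has to happen on the client, a minimal Python sketch (same host-name assumptions as the sketch above) that keeps only the Thrift server instances and collects their tracking URLs might look like this:

    import json
    import urllib.request

    url = ("http://slave5.cluster.local:8088/ws/v1/cluster/apps"
           "?user=hdfs&applicationTypes=SPARK")
    req = urllib.request.Request(url, headers={"Accept": "application/json"})
    resp = json.load(urllib.request.urlopen(req))
    apps = (resp.get("apps") or {}).get("app") or []
    # Filter by name after the fact, since the name parameter is unsupported.
    thrift_apps = [a for a in apps if a["name"] == "Thrift JDBC/ODBC Server"]
    # trackingUrl points at the Spark UI (running) or the history UI (finished).
    tracking_urls = [a["trackingUrl"] for a in thrift_apps]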

  

  

  4. SPARK API

  Take the trackingUrl attribute from the YARN app and use it to query the Spark SQL jobs.

  4.1 Viewing app info

  Finished apps (via the Spark History Server):

  curl http://slave5.cluster.local:18081/api/v1/applications

  [ {
    "id" : "local-1543304247490",
    "name" : "Thrift JDBC/ODBC Server",
    "attempts" : [ {
      "startTime" : "2018-11-27T07:37:25.240GMT",
      "endTime" : "2018-11-27T07:37:30.094GMT",
      "lastUpdated" : "2018-11-27T07:37:30.886GMT",
      "duration" : 4854,
      "sparkUser" : "hdfs",
      "completed" : true,
      "appSparkVersion" : "2.3.2",
      "startTimeEpoch" : 1543304245240,
      "endTimeEpoch" : 1543304250094,
      "lastUpdatedEpoch" : 1543304250886
    } ]
  } ]
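
  Note that the history server lists every application it knows about; as the response shows, the user appears per attempt as sparkUser, so restricting the history to the current user is again a client-side filter. A small sketch under the same assumptions:

    import json
    import urllib.request

    url = "http://slave5.cluster.local:18081/api/v1/applications"
    apps = json.load(urllib.request.urlopen(url))
    # Keep applications that have at least one attempt run by this user.
    mine = [a for a in apps
            if any(att.get("sparkUser") == "hdfs" for att in a["attempts"])]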

  The currently running app (via the YARN proxy):

  curl http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/api/v1/applications

  [ {
    "id" : "application_1541409376608_0046",
    "name" : "Thrift JDBC/ODBC Server",
    "attempts" : [ {
      "startTime" : "2018-11-27T07:40:28.712GMT",
      "endTime" : "1969-12-31T23:59:59.999GMT",
      "lastUpdated" : "2018-11-27T07:40:28.712GMT",
      "duration" : 0,
      "sparkUser" : "spark",
      "completed" : false,
      "appSparkVersion" : "2.3.1.3.0.0.0-1634",
      "lastUpdatedEpoch" : 1543304428712,
      "startTimeEpoch" : 1543304428712,
      "endTimeEpoch" : -1
    } ]
  } ]

  Note: we access the Spark UI's REST API through the YARN proxy port, which hides the address of the backend Spark UI from callers.
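
  In code this just means appending api/v1 to the trackingUrl taken from YARN; a sketch, using the application id from the example above:

    import json
    import urllib.request

    # trackingUrl as returned by the YARN API; note the trailing slash.
    tracking_url = ("http://slave5.cluster.local:8088/proxy/"
                    "application_1541409376608_0046/")
    api_base = tracking_url + "api/v1"
    apps = json.load(urllib.request.urlopen(api_base + "/applications"))
    app_id = apps[0]["id"]  # needed to build the /jobs and /stages URLs below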

  4.2 Viewing SQL jobs

  Job list:

  curl http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/api/v1/applications/application_1541409376608_0046/jobs

  [ {
    "jobId" : 0,
    "name" : "run at AccessController.java:0",
    "submissionTime" : "2018-11-27T07:41:50.904GMT",
    "completionTime" : "2018-11-27T07:42:31.746GMT",
    "stageIds" : [ 0 ],
    "jobGroup" : "afd3e9b1-8810-4df8-a25c-12f0fd5e6163",
    "status" : "SUCCEEDED",
    "numTasks" : 2,
    "numActiveTasks" : 0,
    "numCompletedTasks" : 2,
    "numSkippedTasks" : 0,
    "numFailedTasks" : 0,
    "numKilledTasks" : 0,
    "numCompletedIndices" : 2,
    "numActiveStages" : 0,
    "numCompletedStages" : 1,
    "numSkippedStages" : 0,
    "numFailedStages" : 0,
    "killedTasksSummary" : { }
  } ]

  A single job:

  curl http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/api/v1/applications/application_1541409376608_0046/jobs/0

  {
    "jobId" : 0,
    "name" : "run at AccessController.java:0",
    "submissionTime" : "2018-11-27T07:41:50.904GMT",
    "completionTime" : "2018-11-27T07:42:31.746GMT",
    "stageIds" : [ 0 ],
    "jobGroup" : "afd3e9b1-8810-4df8-a25c-12f0fd5e6163",
    "status" : "SUCCEEDED",
    "numTasks" : 2,
    "numActiveTasks" : 0,
    "numCompletedTasks" : 2,
    "numSkippedTasks" : 0,
    "numFailedTasks" : 0,
    "numKilledTasks" : 0,
    "numCompletedIndices" : 2,
    "numActiveStages" : 0,
    "numCompletedStages" : 1,
    "numSkippedStages" : 0,
    "numFailedStages" : 0,
    "killedTasksSummary" : { }
  }

  The job response contains neither the executed SQL nor its timing details, so we have to go one level deeper and query the stages listed in stageIds. A job may contain multiple stages, and their execution times need to be summed:

  curl http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/api/v1/applications/application_1541409376608_0046/stages/0

  [ {
    "status" : "COMPLETE",
    "stageId" : 0,
    "attemptId" : 0,
    "numTasks" : 2,
    "numActiveTasks" : 0,
    "numCompleteTasks" : 2,
    "numFailedTasks" : 0,
    "numKilledTasks" : 0,
    "numCompletedIndices" : 2,
    "executorRunTime" : 23209,
    "executorCpuTime" : 3730752861,
    "submissionTime" : "2018-11-27T07:41:51.030GMT",
    "firstTaskLaunchedTime" : "2018-11-27T07:42:07.414GMT",
    "completionTime" : "2018-11-27T07:42:31.741GMT",
    "inputBytes" : 8718,
    "inputRecords" : 500,
    "outputBytes" : 0,
    "outputRecords" : 0,
    "shuffleReadBytes" : 0,
    "shuffleReadRecords" : 0,
    "shuffleWriteBytes" : 0,
    "shuffleWriteRecords" : 0,
    "memoryBytesSpilled" : 0,
    "diskBytesSpilled" : 0,
    "name" : "run at AccessController.java:0",
    "description" : "select * from spokes",
    "details" : "org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:171)\njava.security.AccessController.doPrivileged(Native Method)\njavax.security.auth.Subject.doAs(Subject.java:422)\norg.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1688)\norg.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1.run(SparkExecuteStatementOperation.scala:185)\njava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\njava.util.concurrent.FutureTask.run(FutureTask.java:266)\njava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\njava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\njava.lang.Thread.run(Thread.java:745)",
    "schedulingPool" : "default",
    "rddIds" : [ 4, 3, 2, 0, 1 ],
    "accumulatorUpdates" : [ ],
    "tasks" : {
      "0" : {
        "taskId" : 0,
        "index" : 0,
        "attempt" : 0,
        "launchTime" : "2018-11-27T07:42:07.414GMT",
        "duration" : 23071,
        "executorId" : "1",
        "host" : "slave3",
        "status" : "SUCCESS",
        "taskLocality" : "NODE_LOCAL",
        "speculative" : false,
        "accumulatorUpdates" : [ ],
        "taskMetrics" : {
          "executorDeserializeTime" : 4366,
          "executorDeserializeCpuTime" : 426890376,
          "executorRunTime" : 16589,
          "executorCpuTime" : 1993215589,
          "resultSize" : 3787,
          "jvmGcTime" : 1626,
          "resultSerializationTime" : 3,
          "memoryBytesSpilled" : 0,
          "diskBytesSpilled" : 0,
          "peakExecutionMemory" : 0,
          "inputMetrics" : {
            "bytesRead" : 5812,
            "recordsRead" : 251
          },
          "outputMetrics" : {
            "bytesWritten" : 0,
            "recordsWritten" : 0
          },
          "shuffleReadMetrics" : {
            "remoteBlocksFetched" : 0,
            "localBlocksFetched" : 0,
            "fetchWaitTime" : 0,
            "remoteBytesRead" : 0,
            "remoteBytesReadToDisk" : 0,
            "localBytesRead" : 0,
            "recordsRead" : 0
          },
          "shuffleWriteMetrics" : {
            "bytesWritten" : 0,
            "writeTime" : 0,
            "recordsWritten" : 0
          }
        }
      },
      "1" : {
        "taskId" : 1,
        "index" : 1,
        "attempt" : 0,
        "launchTime" : "2018-11-27T07:42:22.386GMT",
        "duration" : 9350,
        "executorId" : "2",
        "host" : "slave3",
        "status" : "SUCCESS",
        "taskLocality" : "NODE_LOCAL",
        "speculative" : false,
        "accumulatorUpdates" : [ ],
        "taskMetrics" : {
          "executorDeserializeTime" : 1449,
          "executorDeserializeCpuTime" : 369738428,
          "executorRunTime" : 6620,
          "executorCpuTime" : 1737537272,
          "resultSize" : 3770,
          "jvmGcTime" : 1133,
          "resultSerializationTime" : 1,
          "memoryBytesSpilled" : 0,
          "diskBytesSpilled" : 0,
          "peakExecutionMemory" : 0,
          "inputMetrics" : {
            "bytesRead" : 2906,
            "recordsRead" : 249
          },
          "outputMetrics" : {
            "bytesWritten" : 0,
            "recordsWritten" : 0
          },
          "shuffleReadMetrics" : {
            "remoteBlocksFetched" : 0,
            "localBlocksFetched" : 0,
            "fetchWaitTime" : 0,
            "remoteBytesRead" : 0,
            "remoteBytesReadToDisk" : 0,
            "localBytesRead" : 0,
            "recordsRead" : 0
          },
          "shuffleWriteMetrics" : {
            "bytesWritten" : 0,
            "writeTime" : 0,
            "recordsWritten" : 0
          }
        }
      }
    },
    "executorSummary" : {
      "1" : {
        "taskTime" : 23071,
        "failedTasks" : 0,
        "succeededTasks" : 1,
        "killedTasks" : 0,
        "inputBytes" : 5812,
        "inputRecords" : 251,
        "outputBytes" : 0,
        "outputRecords" : 0,
        "shuffleRead" : 0,
        "shuffleReadRecords" : 0,
        "shuffleWrite" : 0,
        "shuffleWriteRecords" : 0,
        "memoryBytesSpilled" : 0,
        "diskBytesSpilled" : 0
      },
      "2" : {
        "taskTime" : 9350,
        "failedTasks" : 0,
        "succeededTasks" : 1,
        "killedTasks" : 0,
        "inputBytes" : 2906,
        "inputRecords" : 249,
        "outputBytes" : 0,
        "outputRecords" : 0,
        "shuffleRead" : 0,
        "shuffleReadRecords" : 0,
        "shuffleWrite" : 0,
        "shuffleWriteRecords" : 0,
        "memoryBytesSpilled" : 0,
        "diskBytesSpilled" : 0
      }
    },
    "killedTasksSummary" : { }
  } ]
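
  Putting the pieces together, here is a sketch that walks every job of the app, sums executorRunTime across its stages, and picks up the SQL text from the stage description (same proxy-URL assumptions as above; error handling, e.g. for skipped stages, omitted):

    import json
    import urllib.request

    base = ("http://slave5.cluster.local:8088/proxy/application_1541409376608_0046"
            "/api/v1/applications/application_1541409376608_0046")

    def fetch(path):
        return json.load(urllib.request.urlopen(base + path))

    for job in fetch("/jobs"):
        total_run_time_ms = 0
        sql = None
        for stage_id in job["stageIds"]:
            # /stages/{id} returns one element per stage attempt; take the first.
            stage = fetch("/stages/%d" % stage_id)[0]
            total_run_time_ms += stage["executorRunTime"]
            sql = sql or stage.get("description")  # the SQL statement, when present
        print(job["jobId"], job["status"], total_run_time_ms, sql)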

Author: Jia Dexing (贾德星)

Title: Senior Architect, Cloud Computing Product Center, Inspur Cloud Service Group

Specialty: Big data

About the author: Systems software architect with more than twenty years of hands-on software development experience. Leads the development of Inspur's big data platform product, Yunhai InsightHD, focusing on the research, application, and development of Hadoop, Spark, stream computing, machine learning, and deep learning components. Has participated in drafting more than twenty national information technology standards, twelve of which have been officially published, and has developed and filed nine national patent applications, all granted.
