
spark-jobserver provides a RESTful interface for submitting and managing Apache Spark jobs, jars, and job contexts. This repo contains the complete Spark job server project, including unit tests and deploy scripts. It was originally started at Ooyala, but this is now the main development repo.

Other useful links: Troubleshooting, cluster, YARN client, YARN on EMR, Mesos, JMX tips.

Also see Chinese docs / 中文.

Table of Contents (generated with DocToc)

Users
Features
Version Information
Getting Started with Spark Job Server
Development mode
WordCountExample walk-through
Package Jar - Send to Server
Ad-hoc Mode - Single, Unrelated Jobs (Transient Context)
Persistent Context Mode - Faster & Required for Related Jobs

Debug mode

Create a Job Server Project
Creating a project from scratch using giter8 template
Creating a project manually assuming that you already have sbt project structure
NEW SparkJob API
NEW SparkJob API with Spark v2.1
Dependency jars
Named Objects
Using Named RDDs
Using Named Objects

HTTPS / SSL Configuration
Server authentication
Client authentication

Access Control
Shiro Authentication
Keycloak Authentication

User Authorization

Deployment
Manual steps
Context per JVM
Configuring Spark Jobserver backend
HA Deployment (beta)
Chef

Architecture
API
Binaries
Contexts
Jobs
Data
Data API Example

Context configuration
Other configuration settings
Job Result Serialization
HTTP Override

Clients
Contribution and Development
Contact
License
TODO

Users


(Please add yourself to this list!)

Spark Job Server is included in Datastax Enterprise!

Ooyala
Netflix
Avenida.com
GumGum
Fuse Elements
Frontline Solvers
Aruba Networks
Zed Worldwide
KNIME
Azavea
Maana
Newsweaver
Instaclustr
SnappyData
Linkfluence
Smartsct
Datadog
Planalytics
Target
Branch
Informatica
Cadenz.ai

Features


"Spark as a Service"* : Simple REST interface (including HTTPS) for all aspects of job, context management
Support for Spark SQL, Hive, Streaming Contexts/jobs and custom job contexts!  See Contexts.
Python, Scala, and Java (see TestJob.java) support
LDAP Auth support via Apache Shiro integration
Separate JVM per SparkContext for isolation (EXPERIMENTAL)
Supports sub-second low-latency jobs via long-running job contexts
Start and stop job contexts for RDD sharing and low-latency jobs; change resources on restart
Kill running jobs via stop context and delete job
Separate jar uploading step for faster job startup
Asynchronous and synchronous job API.  Synchronous API is great for low latency jobs!
Works with Standalone Spark as well as on clusters: Mesos, YARN client mode, and EMR
Job and jar info is persisted via a pluggable DAO interface
Named Objects (such as RDDs or DataFrames) to cache and retrieve RDDs or DataFrames by name, improving object sharing and reuse among jobs.
Supports Scala 2.11 and 2.12
Support for supervise mode of Spark (EXPERIMENTAL)
Possible to be deployed in an HA setup of multiple jobservers (beta)

Version Information


| Version | Spark Version | Scala Version |
| :--- | :--- | :--- |
| 0.8.1 | 2.2.0 | 2.11 |
| 0.10.2 | 2.4.4 | 2.11 |
| 0.11.1 | 2.4.4 | 2.11, 2.12 |

For release notes, look in the notes/ directory.

Due to the sunset of Bintray, all previous release binaries were deleted. Jobserver had to migrate to the JFrog Platform and only recent releases are available there. To use Spark Jobserver in your SBT project, please include the following resolver in your build.sbt file:

```scala
resolvers += "Artifactory" at "https://sparkjobserver.jfrog.io/artifactory/jobserver/"
```

Check Creating a project manually assuming that you already have sbt project structure for more information.

If you need non-released jars, please visit Jitpack - they provide non-release jar builds for any Git repo.  :)

Getting Started with Spark Job Server


The easiest way to get started is to try the Docker container which prepackages a Spark distribution with the job server and lets you start and deploy it.

Alternatives:

Build and run Job Server in local development mode within SBT.  NOTE: This does NOT work for YARN, and in fact is only recommended with spark.master set to local[*].  Please deploy the job server if you want to try it with YARN or another real cluster.
Deploy job server to a cluster.  There are two alternatives (see the deployment section ):
server_deploy.sh deploys job server to a directory on a remote host.
server_package.sh deploys job server to a local directory, from which you can deploy the directory, or create a .tar.gz for Mesos or YARN deployment.

EC2 Deploy scripts - follow the instructions in EC2 to spin up a Spark cluster with job server and an example application.
EMR Deploy instructions - follow the instructions in EMR

NOTE: Spark Job Server can optionally run SparkContexts in their own forked JVM process when the config option spark.jobserver.context-per-jvm is set to true.  This option does not currently work for SBT/local dev mode. See the Deployment section for more info. A rough sketch of where the option lives is shown below.
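
As a rough sketch (the override file and its placement are illustrative, not prescriptive), the option goes under spark.jobserver in whatever config you pass to the server:

```shell
# illustrative snippet for a custom config passed to the job server at startup
spark {
  jobserver {
    # fork a separate JVM per SparkContext (not supported in SBT/local dev mode)
    context-per-jvm = true
  }
}
```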

Development mode


The example walk-through below shows you how to use the job server with an included example job, by running the job server in local development mode in SBT.  This is not an example of usage in production.

You need to have SBT installed.

To set the current version, do something like this:

```sh
export VER=`sbt version | tail -1 | cut -f2`
```

From the SBT shell, simply type "reStart".  This uses a default configuration file.  An optional argument is a path to an alternative config file.  You can also specify JVM parameters after "---".  Including all the options looks like this:

```sh
job-server-extras/reStart /path/to/my.conf --- -Xmx8g
```

Note that reStart (SBT Revolver) forks the job server in a separate process.  If you make a code change, simply type reStart again at the SBT shell prompt; it will compile your changes and restart the jobserver, enabling very fast turnaround cycles.

NOTE2: You cannot do sbt reStart from the OS shell.  SBT will start job server and immediately kill it.

For example jobs, see the job-server-tests/ project/folder.

When you use reStart, the log file goes to job-server/job-server-local.log.  There is also an environment variable EXTRA_JAR for adding a jar to the classpath.
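
For instance, a minimal sketch of using EXTRA_JAR (the jar path here is purely hypothetical):

```sh
# hypothetical extra jar made visible to the forked job server's classpath
export EXTRA_JAR=/path/to/my-extra-lib.jar
sbt
# then, inside the SBT shell:
# job-server-extras/reStart
```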

WordCountExample walk-through


Package Jar - Send to Server


First, package the test jar containing the WordCountExample: sbt job-server-tests/package. Then go ahead and start the job server using the instructions above.

Let's upload the jar:

```sh
curl -X POST localhost:8090/binaries/test -H "Content-Type: application/java-archive" --data-binary @job-server-tests/target/scala-2.12/job-server-tests_2.12-$VER.jar
OK⏎
```

Ad-hoc Mode - Single, Unrelated Jobs (Transient Context)


The above jar is uploaded as app test.  Next, let's start an ad-hoc word count job, meaning that the job server will create its own SparkContext, and return a job ID for subsequent querying:

```sh
curl -d "input.string = a b c a b see" "localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample"
{
  "duration": "Job not done yet",
  "classPath": "spark.jobserver.WordCountExample",
  "startTime": "2016-06-19T16:27:12.196+05:30",
  "context": "b7ea0eb5-spark.jobserver.WordCountExample",
  "status": "STARTED",
  "jobId": "5453779a-f004-45fc-a11d-a39dae0f9bf4"
}⏎
```

NOTE: If you want to feed in a text file config and POST using curl, you want the --data-binary option; otherwise curl will munge your line separator chars.  Like:

```sh
curl --data-binary @my-job-config.json "localhost:8090/jobs?appName=..."
```

NOTE2: If you want to send in UTF-8 chars, make sure you pass a proper encoding header to curl, otherwise it may assume an encoding which is not what you expect.
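
For instance, a hedged sketch of declaring the encoding explicitly (the exact media type depends on what you are actually posting):

```sh
# tell the server the body is UTF-8 instead of letting it guess
curl -H "Content-Type: text/plain; charset=UTF-8" \
     --data-binary @my-job-config.json \
     "localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample"
```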

From this point, you could asynchronously query the status and results:

```sh
curl localhost:8090/jobs/5453779a-f004-45fc-a11d-a39dae0f9bf4
{
  "duration": "6.341 secs",
  "classPath": "spark.jobserver.WordCountExample",
  "startTime": "2015-10-16T03:17:03.127Z",
  "context": "b7ea0eb5-spark.jobserver.WordCountExample",
  "result": {
    "a": 2,
    "b": 2,
    "c": 1,
    "see": 1
  },
  "status": "FINISHED",
  "jobId": "5453779a-f004-45fc-a11d-a39dae0f9bf4"
}⏎
```

Note that you could append &sync=true when you POST to /jobs to get the results back in one request, but for real clusters and most jobs this may be too slow.

You can also append &timeout=XX to extend the request timeout for sync=true requests.
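
For example, a synchronous run with a longer timeout might look like this (the timeout value is illustrative):

```sh
# wait for the result in the same request, allowing more time than the default
curl -d "input.string = a b c a b see" \
  "localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample&sync=true&timeout=60"
```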

Persistent Context Mode - Faster & Required for Related Jobs


Another way of running this job is in a pre-created context.  Start a new context:

```sh
curl -d "" "localhost:8090/contexts/test-context?num-cpu-cores=4&memory-per-node=512m"
OK⏎
```

You can verify that the context has been created:

```sh
curl localhost:8090/contexts
["test-context"]⏎
```

Now let's run the job in the context and get the results back right away:

```sh
curl -d "input.string = a b c a b see" "localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample&context=test-context&sync=true"
{
  "result": {
    "a": 2,
    "b": 2,
    "c": 1,
    "see": 1
  }
}⏎
```

Note the addition of context= and sync=true.
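
When you are done with the persistent context, you can delete it to release its resources; a minimal sketch using the same contexts endpoint:

```sh
# stop and remove the pre-created context
curl -X DELETE localhost:8090/contexts/test-context
```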

Debug mode


Spark job server is started using SBT Revolver (which forks a new JVM), so debugging directly in an IDE is not feasible. To enable debugging, the Spark job server should be started from the SBT shell with the following Java options:

```shell
job-server-extras/reStart /absolute/path/to/your/dev.conf --- -Xdebug -Xrunjdwp:transport=dt_socket,address=15000,server=y,suspend=y
```

The above command starts a remote debugging server on port 15000. The Spark job server is not started until a debugging client (Intellij, Eclipse, telnet, ...) connects to the exposed port.

In your IDE you just have to start a remote debugging session using the port defined above. Once the client connects to the debugging server, the Spark job server starts and you can begin adding breakpoints and debugging requests.

Note that you might need to adjust some server parameters to avoid short Spray/Akka/Spark timeouts; add the following values to your dev.conf:

```shell
spark {
  jobserver {
    # Dev debug timeouts
    context-creation-timeout = 1000000 s
    yarn-context-creation-timeout = 1000000 s
    default-sync-timeout = 1000000 s
  }

  context-settings {
    # Dev debug timeout
    context-init-timeout = 1000000 s
  }
}

akka.http.server {
  # Debug timeouts
  idle-timeout = infinite
  request-timeout = infinite
}
```

Additionally, you might have to increase the Akka timeouts by adding a large timeout query parameter to your HTTP requests, for example:

```shell
curl -d "input.string = a b c a b see" "localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample&sync=true&timeout=100000"
```

Create a Job Server Project


Creating a project from scratch using giter8 template


There is a giter8 template available at https://github.com/spark-jobserver/spark-jobserver.g8

```sh
$ sbt new spark-jobserver/spark-jobserver.g8
```

Answer the questions to generate a project structure for you. The generated project contains a Word Count example Spark job using both the old API and the new one.

```sh
$ cd /path/to/project/directory
$ sbt package
```

Now you can remove the example application and start adding your own.
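
As a sketch, your packaged jar can then be uploaded to a running job server just like the test jar earlier (the binary name my-app and the jar path are illustrative):

```sh
# upload your own job jar under a binary name of your choosing
curl -X POST localhost:8090/binaries/my-app \
  -H "Content-Type: application/java-archive" \
  --data-binary @target/scala-2.12/my-app_2.12-0.1.0.jar
```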

Creating a project manually assuming that you already have sbt project structure


In your build.sbt, add this to use the job server jar:

```sh
resolvers += "Artifactory" at "https://sparkjobserver.jfrog.io/artifactory/jobserver/"

libraryDependencies += "spark.jobserver" %% "job-server-api" % "0.11.1" % "provided"
```

If a SQL or Hive job/context is desired, you also want to pull in job-server-extras:

```sh
libraryDependencies += "spark.jobserver" %% "job-server-extras" % "0.11.1" % "provided"
```

For most use cases it's better to have the dependencies be "provided" because you don't want SBT assembly to include the whole job server jar.

To create a job that can be submitted through the job server, the job must implement the SparkJob trait. Your job will look like:

```scala
object SampleJob extends SparkJob {
  override def runJob(sc: SparkContext, jobConfig: Config): Any = ???
  override def validate(sc: SparkContext, config: Config): SparkJobValidation = ???
}
```

runJob contains the implementation of the Job. The SparkContext is managed by the JobServer and will be provided to the job through this method. This relieves the developer from the boiler-plate configuration management that comes with the creation of a Spark job and allows the Job Server to manage and re-use contexts.
validate allows for an initial validation of the context and any provided configuration. If the context and configuration are OK to run the job, returning spark.jobserver.SparkJobValid will let the job execute; otherwise, returning spark.jobserver.SparkJobInvalid(reason) prevents the job from running and provides a means to convey the reason of failure. In this case, the call immediately returns an HTTP/1.1 400 Bad Request status code. validate helps you prevent running jobs that would eventually fail due to missing or wrong configuration, saving both time and resources.
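
As an illustration of the validation path, posting the WordCountExample with no job configuration should be rejected by validate before anything runs (the exact error body may differ between versions):

```sh
# no input.string supplied, so validate fails and the server answers 400 Bad Request
curl -i -d "" "localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample&sync=true"
```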

NEW SparkJob API


Note: As of version 0.7.0, a new SparkJob API that is significantly better than the old SparkJob API will take over.  Existing jobs should continue to compile against the old spark.jobserver.SparkJob API, but this will be deprecated in the future.  Jobs built before 0.7.0 will need to be recompiled, and older jobs may not work with the current SJS examples.  The new API looks like this:

```scala
object WordCountExampleNewApi extends NewSparkJob {
  type JobData = Seq[String]
  type JobOutput = collection.Map[String, Long]

  def runJob(sc: SparkContext, runtime: JobEnvironment, data: JobData): JobOutput =
    sc.parallelize(data).countByValue

  def validate(sc: SparkContext, runtime: JobEnvironment, config: Config):
    JobData Or Every[ValidationProblem] = {
    Try(config.getString("input.string").split(" ").toSeq)
      .map(words => Good(words))
      .getOrElse(Bad(One(SingleProblem("No input.string param"))))
  }
}
```

It is much more type safe, separates context configuration, job ID, named objects, and other environment variables into a separate JobEnvironment input, and allows the validation method to return specific data for the runJob method.
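
If the test jar uploaded earlier also contains this new-API class (as the job-server-tests examples do; the class path below is an assumption), it is run through exactly the same REST calls:

```sh
# same REST interface, just a different classPath pointing at the new-API job
curl -d "input.string = a b c a b see" \
  "localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExampleNewApi&sync=true"
```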