Chapter 5. Building a Mesos Executor

Now you’ve seen how to build a scheduler for Mesos. However, there are some things that can’t be done with the scheduler API alone. For instance, perhaps you’d like to run several tasks in the same container. Maybe your application has lifecycle messages for the scheduler, reporting on its progress or application-specific statistics. Or perhaps you’d like to layer additional functionality into the executed task. In order to accomplish these things, you need to write a custom Mesos executor.

This is what we’ll learn how to do in this chapter. Initially, we’ll simply provide the functionality of the built-in CommandExecutor, which we learned about at the end of the last chapter. Then, we’ll add support for heartbeats to enable faster failure detection. Finally, we’ll discuss potential designs for progress reporting, improved logging, and running multiple tasks in the same container.

The Executor

We’ve already learned what a scheduler is: it’s the component that interacts with the Mesos masters and the framework’s clients, manages the running tasks, and handles failovers. But what is an executor? An executor has three responsibilities:

  • Executing tasks as requested by the scheduler

  • Keeping the scheduler informed of the status of those tasks

  • Handling other requests from the scheduler

You Probably Don’t Want to Do This

Writing executors is very tedious. Unfortunately, the only way to test whether the communication between your scheduler and executors is working is to start a new instance of your framework on a Mesos cluster. Furthermore, correctly implementing health checks, process management, and concurrency is very tricky. I strongly encourage you to think about how you could leverage the CommandExecutor rather than building your own executor. Nevertheless, there are many problems that can only be solved by writing an executor.

Building a Work Queue’s Worker

Recall that the work queue scheduler needs individual workers to execute the work items. When we discussed it earlier, I suggested using an external queuing system, like Redis or RabbitMQ, that your worker processes could connect to directly, but that’s not the only way to build it. Using a custom executor, you could instead send tasks directly to the executor. If you’re using RabbitMQ or Redis, there’s not much advantage to doing this; however, if you store your state in a database like Postgres, it might not easily scale to thousands of connections, one from every worker. In that case, you could instead have the scheduler send multiple tasks to the same executor.

This has upsides and downsides. On the one hand, it could allow you to avoid adding yet another piece of infrastructure (a queuing layer) to solve your scaling problems. On the other hand, it’s now your responsibility to implement the queuing semantics you want in your scheduler, which is much more challenging than relying on battle-tested queues.

Running Pickled Tasks

What Is a Pickled Task?

Serialized code or configuration data that represents a task is known as a pickled task. For example, sending RPCs requires a way to serialize a task’s parameters so that they can be transmitted across the network. Many languages support sending bytecode, executable source code, or other data. The term “pickle” comes from Python’s serialization library.

Many of us are using the Java Virtual Machine (JVM) or Docker to run our code. Due to their potentially long initialization times, we may want to provide generated code to an executor. For example, it might take 45 seconds to start a JVM with all the classes we need: this is not something we’d like to do thousands of times per second. In order to avoid the 45-second overhead for each task, it’d be nice to reuse the JVMs for tasks on the same host. With a custom executor, it is possible to read raw bytes from each task and deserialize them into executable code. Then, the executor can simply run that code, reusing the whole JVM. For short tasks, this could be the difference between a task taking 10 seconds or 55 seconds: this is one of the tricks that enables Spark’s low latency.
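
To make this concrete, here’s a minimal sketch of the executor side of the pattern, assuming the scheduler pickled each task with standard Java serialization into the TaskInfo’s data field, and that the pickled tasks implement Runnable with their classes already on the executor’s classpath (Spark, for comparison, uses its own closure serializer):

private void runPickledTask(TaskInfo task) throws Exception {
    // Deserialize the pickled task from the TaskInfo's data field
    // (imports assumed: java.io.ByteArrayInputStream, java.io.ObjectInputStream)
    byte[] taskData = task.getData().toByteArray();
    try (ObjectInputStream in =
             new ObjectInputStream(new ByteArrayInputStream(taskData))) {
        Runnable pickled = (Runnable) in.readObject();
        pickled.run(); // runs in this already-warm JVM: no startup cost
    }
}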

This approach can also be used to partially mitigate the challenge of writing an executor: you need only write the “pickle executor” once. After you’ve written it, you can benefit from the dynamic resizability of the executor (see “Multiple Tasks”), and you won’t need to debug the communications again. Note that this doesn’t require building a custom executor—you could also communicate pickled tasks through an out-of-band mechanism; the only downside is that you’d lose the ability to dynamically resize the container.

Sharing Resources

Imagine writing an application where each task reads a large static dataset and a small user-provided dataset, crunches the numbers, and returns the result. This could be used for running a series of simulations or training a machine learning model. A custom executor could be used to easily share a large static dataset, and still benefit from Mesos’s resource accounting and automatic cleanup when the executor terminates. To do this, you could configure the custom executor to initialize itself by downloading the large dataset into its sandbox and reserving enough memory for it to remain in the filesystem’s in-memory block cache. Then, the tasks you launch on that executor could memory-map that dataset, so that they get in-memory, direct access. This approach would reduce the bandwidth and disk costs of copying that dataset more times than necessary.
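
For instance, each task might map the dataset like this (a sketch; the file name and its location in the executor’s sandbox are assumptions):

private MappedByteBuffer mapDataset() throws Exception {
    // Map the dataset the executor previously downloaded into its sandbox.
    // Tasks mapping the same file share the kernel's page cache, so only one
    // copy occupies memory. (imports assumed: java.io.RandomAccessFile,
    // java.nio.MappedByteBuffer, java.nio.channels.FileChannel)
    File dataset = new File(System.getenv("MESOS_DIRECTORY"), "dataset.bin");
    try (RandomAccessFile file = new RandomAccessFile(dataset, "r");
         FileChannel channel = file.getChannel()) {
        return channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
    }
}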

Another time that a custom executor could share resources is when building general-purpose graphics processing unit (GPGPU) applications. Frequently, you’ll only have one GPGPU in each slave. If you used the CommandExecutor and made the GPGPU a custom resource (see “Configuring Custom Resources”), you’d only be able to run one task at a time on each slave; however, you might want to manage the sharing of that GPGPU in an application-specific way. To do that, you could use the pickled tasks pattern described in the previous section to ensure each task programmatically exposes when it needs to use the GPGPU. Then, the custom executor could run all the tasks within the same container, so that it can broker and schedule the tasks’ use of the GPGPU.

These are just a few examples of the ways in which you might want to try to share resources between tasks. Of course, there are many other ways in which tasks can share their work, from common subproblems to shared caches. Sometimes, the most efficient way to take advantage of these structures is to run them in the same container, which requires writing an executor.

Better Babysitting

Mesos is engineered to maximize availability: when a scheduler crashes, it can automatically fail over; when a slave is upgraded, executors can automatically reconnect to the new version; when a network link goes down, the slave and executors continue to run with their last known instructions. For some applications, however, this isn’t actually the behavior you want.

At large scale, partial and unexpected failures can reveal subtle bugs that weren’t identified during testing and QA. To reduce the impact of these unknown, unpredictable issues, you can require that every executor check in with the scheduler periodically; if an executor misses too many check-ins, the scheduler can presume it LOST.

Suppose that a slave loses connectivity to the scheduler, but not to its databases. An executor running on that slave might be able to continue uninterrupted, even though the scheduler can’t receive messages when its tasks finish. This can cause the system to appear stuck to an administrator.

To solve this, we could add a heartbeating mechanism to the executor: the executor will periodically let the scheduler know it’s still running, so that if an executor misses several heartbeats, the scheduler can presume the executor LOST and restart it on a healthy slave. This is known as babysitting—constant, active supervision that helps to detect failures faster. A custom executor can implement heartbeating functionality with the Mesos messaging APIs, ensuring that the overall system remains healthy.

Mesos-Native Heartbeating

In an upcoming version after 0.25, Mesos will add native support for three types of health checks—HTTP requests, TCP connection checks, and running a command. When these are enabled in the TaskInfo, they’ll send periodic StatusUpdates. You might recognize these as the health checks that Marathon provides (see “Health Checks”). The Mesos project often integrates useful functionality from frameworks back into the core. See MESOS-2533 and MESOS-3567 for updates.

Augmented Logging

The final example of a custom executor’s usefulness is in augmenting the logging facilities of Mesos. Mesos writes each process’s stdout and stderr to local files in the executor sandbox. With a custom executor, the process’s stdout and stderr can be forwarded to a centralized log repository or data storage location (such as HDFS, S3, or Logstash), allowing you to upgrade all CommandExecutor-based tasks to centrally store their logs. However, this could also be accomplished by running a log redirector such as logger, which connects a process’s stdout to syslog, thus avoiding the need to make a custom executor.

Rewriting the CommandExecutor

Now that we’ve seen some examples of when we’d want to write an executor, let’s take a look at how to actually do so. We’re going to start by writing something very simple: an executor that is compatible with the built-in CommandExecutor.

We’ll start by looking at the skeleton of MyExecutor, our CommandExecutor clone. The imports and class declaration are presented in Example 5-1.

Example 5-1. Imports and class declaration of MyExecutor
package com.example;
import org.apache.mesos.*;
import org.apache.mesos.Protos.*;
import java.util.*;
import java.io.File;
import org.json.*; 1
import java.lang.ProcessBuilder.Redirect; 2

public class MyExecutor implements Executor, Runnable { 3
    // We'll discuss the inner bits as we go
}
1

We’ll use JSON to encode the task we want to run. This JSON will be read from the TaskInfo so that we know what to invoke.

2

The ProcessBuilder is the most convenient way to start a process in Java; we’ll use it to launch our application, and we’ll redirect the application’s stdout and stderr to separate log files from our executor.

3

Since there’s no callback API for finding out when a process finishes, we’ll start a thread to wait for the process—that’s why we implement Runnable.

The main method is shown in Example 5-2.

Example 5-2. main of MyExecutor
public static void main(String ... args) throws Exception {
    Executor executor = new MyExecutor();
    ExecutorDriver driver = new MesosExecutorDriver(executor);
    driver.run();
}

Although it’s tempting to try to configure some things via the command line passed in to main, you’ll be thankful in the long run that all executor configuration comes from the Mesos APIs, since this ensures a single source of configuration. When the scheduler has total control over the executors, global (scheduler) changes and local (executor) changes are managed from a single codebase: the scheduler’s. This makes developing new features easier, and it simplifies operations, since administrators need only modify and reload the scheduler to make framework-wide changes.

As with the scheduler, in the main of our executor we create a corresponding driver to communicate with Mesos. Unfortunately, this driver turns out to be a major annoyance. When the executor calls MesosExecutorDriver.start(), the process needs to have been started by a slave. The slave injects an environment variable, MESOS_SLAVE_PID, which allows the executor to connect to the slave that started it and begin participating in the Mesos protocols. As a result, if you want to test your executor, you need to implement a mock ExecutorDriver. If you forget to do this, you’ll see a weird stack trace like this, regardless of what language you’re developing your executor in:

F0605 21:32:01.538770 18480 os.hpp:173] Expecting 'MESOS_SLAVE_PID' in
environment variables
*** Check failure stack trace: ***
    @     0x7fc535912dfd  google::LogMessage::Fail()
    @     0x7fc535914c3d  google::LogMessage::SendToLog()
    @     0x7fc5359129ec  google::LogMessage::Flush()
    @     0x7fc535915539  google::LogMessageFatal::~LogMessageFatal()
    @     0x7fc5352c9ff0  os::getenv()
    @     0x7fc53534139b  mesos::MesosExecutorDriver::start()
    @     0x7fc5359068ee  Java_org_apache_mesos_MesosExecutorDriver_start
    @     0x7fc5390127f8  (unknown)
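
A minimal mock need only implement the ExecutorDriver interface; the sketch below simply records everything the executor sends so that unit tests can make assertions without a slave:

// A sketch of a mock driver for unit tests: it records status updates and
// framework messages instead of talking to a real slave.
public class MockExecutorDriver implements ExecutorDriver {
    public final List<TaskStatus> statusUpdates = new ArrayList<TaskStatus>();
    public final List<byte[]> frameworkMessages = new ArrayList<byte[]>();

    public Status start() { return Status.DRIVER_RUNNING; }
    public Status stop() { return Status.DRIVER_STOPPED; }
    public Status abort() { return Status.DRIVER_ABORTED; }
    public Status join() { return Status.DRIVER_STOPPED; }
    public Status run() { return Status.DRIVER_STOPPED; }

    public Status sendStatusUpdate(TaskStatus status) {
        statusUpdates.add(status);
        return Status.DRIVER_RUNNING;
    }

    public Status sendFrameworkMessage(byte[] data) {
        frameworkMessages.add(data);
        return Status.DRIVER_RUNNING;
    }
}

With this in hand, you can invoke your executor’s callbacks directly in tests and inspect statusUpdates afterward.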

We’ll block the main thread until our driver completes—when the task is finished, we’ll call driver.stop() elsewhere to ensure the executor exits correctly. Since executors decouple the notion of tasks from the containers and processes that run them, it’s easy to forget that sending a TASK_FINISHED doesn’t terminate the executor. You should always have executors shut themselves down once they’ve finished their work, or have the scheduler explicitly manage their lifetime by killing their canary tasks (see “Canary tasks”).

Just like for schedulers, many of the executor callbacks aren’t important to handle. Example 5-3 lists the callbacks we can ignore for now.

Example 5-3. Ignoring callbacks
    public void frameworkMessage(ExecutorDriver driver, byte[] data) { } 1
    public void registered(ExecutorDriver driver, ExecutorInfo executorInfo, 2
                           FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) {
        System.out.println("registered executor " + executorInfo);
    }
    public void disconnected(ExecutorDriver driver) { } 3
    public void shutdown(ExecutorDriver driver) { } 4
    public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) { } 5
    public void error(ExecutorDriver driver, java.lang.String message) { } 6
1

Framework messages are a simple (but not guaranteed, as we’ll see in the upcoming sidebar) way to communicate between the scheduler and its executors by sending arbitrary data serialized as bytes. Note that framework messages aren’t routed to particular tasks.

2

This is invoked the first time that an executor connects to the slave. The most common thing to do is get data from the ExecutorInfo, since that can carry executor configuration information in its data field, which contains arbitrary bytes.

3

This is invoked when the slave disconnects from the executor, which typically indicates a slave restart. Rarely should an executor need to do anything special here.

4

This callback informs the executor to gracefully shut down. It is called when a slave restart fails to complete within the grace period, or when the executor’s framework completes. The executor will be forcibly killed if shutdown doesn’t complete within 5 seconds (the default, configurable on the slave command line with --executor_shutdown_grace_period).

5

This callback is invoked after a successful slave restart; it contains the new slave’s information.

6

This callback is invoked after a fatal error occurs. When this callback is invoked, the driver is no longer running.

As you can see, the executor can actually be a full participant in the slave checkpointing and recovery system; however, you can ignore this for almost all executors. We’ll make use of some of these callbacks later, but for now, let’s continue by looking at how to actually launch a task.

We’ll start by looking at what data we’ll need to access throughout the executor’s implementation. These are global variables that are defined in the MyExecutor class (see Example 5-1):

Process proc

We’ll need to store a reference to the process we launched so that we can wait for it to complete.

TaskID taskId

We also need to communicate the task’s status to Mesos: the driver API to communicate with our scheduler requires the TaskID as an argument.

ExecutorDriver driver

Of course, we’ll want to be able to use the driver, so we’ll store a reference to it as well.

Well-behaved executors should keep Mesos up-to-date with the status of their tasks. Since sending status updates has some boilerplate, we’ll write a convenience method, shown in Example 5-4.

Example 5-4. Status update helper
private void statusUpdate(TaskState state) {
    TaskStatus status = TaskStatus.newBuilder()
        .setTaskId(taskId)
        .setState(state)
        .build();
    driver.sendStatusUpdate(status);
}

Here, we simply construct a bare-bones task status and send it via the driver. Sometimes, you’ll want to communicate additional information. Like most Mesos protobufs, TaskStatus supports arbitrary data, as well as a human-readable message.
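
For instance, a richer update might look like the following sketch (the payload format is entirely up to you; ByteString comes from com.google.protobuf):

TaskStatus status = TaskStatus.newBuilder()
    .setTaskId(taskId)
    .setState(TaskState.TASK_RUNNING)
    .setMessage("crunching input shard 3 of 10")          // human-readable
    .setData(ByteString.copyFromUtf8("{\"percent\":30}")) // arbitrary bytes
    .build();
driver.sendStatusUpdate(status);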

In order to keep this example simple, our executor will only support running a single task, after which it will exit. We’ll enforce this invariant with a helper function, as seen in Example 5-5.

Example 5-5. Ensuring a single task
private boolean ensureOneLaunch(ExecutorDriver driver, TaskID id) { 1
    if (this.taskId != null) { 2
        TaskStatus status = TaskStatus.newBuilder()
            .setTaskId(id) 3
            .setState(TaskState.TASK_ERROR)
            .setMessage("this executor can only run a single task") 4
            .build();
        driver.sendStatusUpdate(status);
        return false;
    } else {
        return true;
    }
}
1

We’ll return true if this is the first task, and false if it should be ignored.

2

The code that launches our task will set the value of this.taskId. Thus, we can assume that if this.taskId was already set, we’ve already launched a task, so this one is invalid.

3

Note that we use the TaskID of the newly requested task, not the running task.

4

We’ll take advantage of the human-readable message in the status update to help the user diagnose why the task launch resulted in an error.

Example 5-6 illustrates how we actually launch a process. We’ll take the configuration JSON object as an input, and we’ll return the newly started process. To help separate our logs, we’ll redirect the task process’s stdout and stderr to different files than the executor’s stdout and stderr. For now, the task JSON will have only a single key: "cmd", which is the command line to run the task. We’ll invoke the task with a shell for simplicity.

Example 5-6. Starting a process
private Process startProcess(JSONObject cfg) throws Exception {
    List<String> cmdArgs = Arrays.asList("bash", "-c", cfg.getString("cmd")); 1
    ProcessBuilder pb = new ProcessBuilder(cmdArgs); 2
    File stdoutFile = new File(System.getenv("MESOS_DIRECTORY"), "child_stdout"); 3
    File stderrFile = new File(System.getenv("MESOS_DIRECTORY"), "child_stderr"); 3
    pb.redirectOutput(Redirect.to(stdoutFile)); 4
    pb.redirectError(Redirect.to(stderrFile)); 4
    return pb.start();
}
1

We build the argument vector for the process we will launch.

2

The ProcessBuilder API is a simple way to launch a process in Java.

3

The path to the executor’s scratch space is placed into the environment variable MESOS_DIRECTORY by the slave. It’s best to write any output into this directory, since it’s automatically garbage collected when the slave needs to reclaim disk space after the executor exits.

4

We redirect the task process’s output.

At this point, we’re finally ready to handle launching a task. To do so, we must implement the launchTask callback of the Executor interface (see Example 5-7).

Example 5-7. Implementing launchTask
public void launchTask(ExecutorDriver driver, TaskInfo task) {
    synchronized (this) { 1
        try {
            if (!ensureOneLaunch(driver, task.getTaskId())) { 2
                return;
            }

            this.taskId = task.getTaskId(); 3
            this.driver = driver; 3

            statusUpdate(TaskState.TASK_STARTING); 4

            byte[] taskData = task.getData().toByteArray(); 5
            JSONObject cfg = new JSONObject(new String(taskData, "UTF-8"));
            this.proc = startProcess(cfg); 3 6

            statusUpdate(TaskState.TASK_RUNNING); 4

            Thread t = new Thread(this); 7
            t.setDaemon(true);
            t.start();
        } catch (Exception e) {
            e.printStackTrace(); 8
        }
    }
}
1

To protect access to all global variables, we put a mutex around all the concurrent code in the executor.1

2

We check that this is the first task we’ve received. If it isn’t, we’ll do nothing and just return.

3

We store a few things to which we have access in this callback into global variables: the TaskID, the ExecutorDriver, and the Process, as explained earlier.

4

As we process the task, we keep Mesos informed of our progress.

5

The details of the task must be extracted and parsed.

6

Finally, we can start the process.

7

We’ll need to watch the process to know when it completes, and whether it completes successfully. We’ll look at the implementation of this completion monitoring thread next.

8

In real production code, you should try to make it easier to diagnose what goes wrong in executor code. This is easiest to do by also transmitting exceptions via StatusUpdate if they’re fatal.
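
As a sketch, launchTask’s catch block might also surface the failure to the scheduler (the message format here is illustrative):

catch (Exception e) {
    e.printStackTrace();
    // Also report fatal launch failures to the scheduler, rather than only
    // printing them to the executor's local stderr
    driver.sendStatusUpdate(TaskStatus.newBuilder()
        .setTaskId(task.getTaskId())
        .setState(TaskState.TASK_FAILED)
        .setMessage("launch failed: " + e)
        .build());
}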

At this point, we nearly know the entirety of how our executor goes about managing the lifecycle of its task. Example 5-8 illustrates how we track the completion of the task.

Example 5-8. Waiting for the process to finish
public void run() {
    int exitCode;
    try {
        exitCode = proc.waitFor(); 1
    } catch (Exception e) {
        exitCode = -99; 2
    }
    synchronized (this) { 3
        if (proc == null) { 4
            return;
        }
        proc = null; 5
        if (exitCode == 0) { 6
            statusUpdate(TaskState.TASK_FINISHED);
        } else { 7
            driver.sendStatusUpdate(TaskStatus.newBuilder()
                    .setTaskId(taskId)
                    .setState(TaskState.TASK_FAILED)
                    .setMessage("Process exited with code " + exitCode)
                    .build());
        }
    }
    driver.stop(); 8
}
1

waitFor() will block until the process terminates, and it returns the process’s exit code.

2

If waitFor() throws an exception for some reason, we’ll assume something went terribly wrong and use a distinctive exit code to signal that.

3

As mentioned before, the mutex on this protects access to the global variables.

4

If we reach this point and proc is null, this is a special signal that means the process was killed by killTask (which we haven’t seen yet). In that case, our work’s already done.

5

By setting proc to null, we set that special signal from above.

6

When the process exits with a successful status, we report that the task finished (remember, FINISHED is a successful status in Mesos).

7

Otherwise, we know that the task failed, and so we send a FAILED status.

8

We stop() the executor driver at this point, so that our main function can return and the executor will gracefully shut down.

We’ve seen how our new executor handles accepting a task, starting a process, and sending the appropriate status updates when that process finishes. All we have left is to handle requests from the scheduler to kill the task early. Example 5-9 shows how to handle a killTask message.

Example 5-9. Implementation of killTask
public void killTask(ExecutorDriver driver, TaskID taskId) {
    synchronized (this) { 1
        if (proc != null 2
                && taskId.equals(this.taskId)) { 3
            proc.destroy(); 4
            statusUpdate(TaskState.TASK_KILLED); 5
            proc = null;
        }
        driver.stop();
    }
}
1

As mentioned before, we must protect all access to global variables with a mutex.

2

If proc is null, then the process has already died, either because it finished or because of an earlier killTask.

3

We should also ensure that the task we’re running is the task we’re supposed to kill. This isn’t strictly necessary for our one-task executor, but it protects us from programming errors in the scheduler that send kill messages to the wrong executor.

4

We kill the process.

5

We inform Mesos that the task was killed as per the user’s request. Note the possible race condition if the thread crashes after calling destroy() but before sending the status update: Mesos might believe the task is still running. This is why we set proc to null afterward—it will ensure that a second kill message from the scheduler will cause the TASK_KILLED message to be resent.

We have now seen the entirety of how to implement a basic executor with functionality similar to the CommandExecutor. Of course, you might be wondering how to actually use this executor from your scheduler. To integrate this executor with our example scheduler from Chapter 4, we’ll simply need to update the makeTask method of the Job object, as shown in Example 5-10.

Example 5-10. Enhanced makeTask for our new executor
public TaskInfo makeTask(SlaveID targetSlave, FrameworkID fid) {
    TaskID id = TaskID.newBuilder()
        .setValue(this.id)
        .build();
    ExecutorID eid = ExecutorID.newBuilder() 1
        .setValue(this.id)
        .build();
    CommandInfo ci = CommandInfo.newBuilder()
        .setValue("java -jar /path/to/custom/executor.jar") 2
        .build();
    ExecutorInfo executor = ExecutorInfo.newBuilder() 3
        .setExecutorId(eid)
        .setFrameworkId(fid)
        .setCommand(ci)
        .build();
    JSONObject cfg = new JSONObject();
    try {
        cfg.put("cmd", this.command);
        return TaskInfo.newBuilder()
            // ... Elided unchanged code ... 4
            .setExecutor(executor) 5
            .setData(ByteString.copyFrom(cfg.toString().getBytes("UTF-8"))) 6
            .build();
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException();
    }
}
1

We need to explicitly assign our executor an ID. The CommandExecutor uses the same ID for the task and executor, so we’ll do that too.

2

You’ll need to build the custom executor’s .jar and distribute it to your slaves. In the next section, we’ll look at some approaches to this.

3

The ExecutorInfo requires several pieces of information, including the FrameworkID. The scheduler was trivially modified to pass it to makeTask.

4

The unchanged configuration to the TaskInfo is not repeated here (refer back to Example 4-3). We did, however, drop the .setCommand(...), since now we’re setting the executor.

5

TaskInfo must have exactly one of the command or executor set, but never both—that’s an error.

6

Finally, we include the JSON configuration of the task.

That’s all there is to it! Now we’ve enhanced our scheduler from Chapter 4 to support our new, custom executor.

Bootstrapping Executor Installation

One challenge in creating your own executor is deploying the binaries everywhere. We’ll look at four solutions to this problem, and weigh their pros and cons:

Use HDFS, S3, or another non-POSIX store

Any Mesos executor can list a set of URIs that will be downloaded and optionally unzipped into the local working directory before the executor is launched. Mesos has built-in support for HTTP, FTP, and HDFS stores to download files (see “Configuring the process’s environment”). As a result, you can easily host your applications on a local HDFS cluster, Amazon S3, or another data store to avoid the need to deploy any new technologies, as the other strategies might require. The challenge is that this data store must be sufficiently scalable to be able to deliver the container to every concurrently launching task. A common reason that a newly developed framework falls over during scalability testing is that it launches hundreds of executors simultaneously; these executors all simultaneously request their binaries from a single server, which kills the server. This is commonly known as the thundering herd problem. Make sure that you sufficiently replicate the server that offers the binaries, or use a system with proven scalability, like S3.

The “next-level” version of this approach is to actually host the binaries in the scheduler process itself, by having the scheduler run an embedded HTTP server. Of course, this will exacerbate the thundering herd problem. The solution is to have the scheduler stagger the launch times of tasks, to ensure that the server is never overwhelmed. This is done by waiting for a random duration before submitting the launch to Mesos.
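
In terms of the API, pointing Mesos at hosted binaries is just a matter of adding URIs to the executor’s CommandInfo; in this sketch, the HDFS path is hypothetical:

CommandInfo ci = CommandInfo.newBuilder()
    .setValue("java -jar executor.jar")
    .addUris(CommandInfo.URI.newBuilder()
        .setValue("hdfs:///frameworks/myfw/executor.jar") // hypothetical path
        .setExtract(false)) // a plain .jar needn't be unpacked
    .build();

Mesos downloads each URI into the sandbox before launching the command, which is why the command can refer to executor.jar by its bare name.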

Use a configuration management system

Many Mesos clusters will be deployed using a configuration management system like Chef, Puppet, or Ansible. If this applies to your cluster, then you can also use that system to push your executor binaries out to every slave in your Mesos cluster. The benefit to this is that you can ensure that the binaries are always immediately available to launch tasks—you don’t need to wait for them to download or run the risk of an untimely network failure that causes your tasks to become LOST. The downside, however, is that deploying and upgrading to a new version of the executor is more difficult. Of course, you’ll want to version every deployed executor (to ensure that multiple versions can be running concurrently, during upgrades); nevertheless, it can be extremely tedious during development to wait for the executor to be pushed out to all slaves.

Use a shared POSIX filesystem

Often, a compute cluster already has a shared POSIX-compatible filesystem. Some popular choices here are NFS and GlusterFS. The shared filesystem approach is similar to the configuration management approach: in both cases, your executor exists at a known path. The benefit of having that path be on a shared filesystem is that you don’t need to wait for the configuration management system to actually push the binaries out to every slave; instead, you can immediately launch the executor as soon as it has been copied onto the shared filesystem. The downside is that it can be difficult to scale the shared filesystem, and if it fails, your entire cluster goes down.

Use Docker

Mesos has first-class support for Docker containers (see “Using Docker”), which are themselves a way to ship around complete binary images. Once you Dockerize your executor, you can start that container on any slave in the cluster. The challenge here is the Docker repository: you must find a Docker repository (be it Docker Hub or a self-hosted one) that can host your Docker containers with sufficient scalability and security. The most common pitfall with this approach is the time to download a Docker container: when a Docker container takes 10 minutes to download, the executor launch might time out. This has the symptom of unpredictable, transient LOST tasks.

Now that we’ve reviewed several options, how do you choose the right one? If you are already heavily using Docker, then I recommend continuing with Docker to host your Mesos executors. Otherwise, if you have a shared POSIX filesystem, that option offers the fastest and easiest development. If you would rather avoid the challenge of setting up a shared POSIX filesystem for this purpose, then serving the binaries from HDFS or S3 still offers rapid deployment, and nearly everyone has this capability. Using a configuration management system does centralize deployment, but it dramatically slows down your ability to redeploy a newer or older version of an executor, and so it is an option of last resort.

Fetcher Cache

In Mesos 0.23, a new feature called the fetcher cache was released. The fetcher cache seeks to reduce the load on S3, HDFS, or other filesystems by caching executor downloads on every slave, so that they’re downloaded only once per slave. Although this doesn’t eliminate the scalability challenges with shared filesystems, it should help a lot. To fully take advantage of the fetcher cache and improve scalability, ensure that your scheduler staggers the launch of executors. That way, there will never be too many simultaneous requests to the shared filesystem, and the fetcher cache will further reduce the load. The fetcher cache guarantees that each artifact will be fetched once per slave; even if multiple executors request an artifact, they’ll all wait for the single download.

To use the fetcher cache, you simply set cache to true in your URI protobuf in the TaskInfo. The fetcher cache is isolated by user, since it simply uses the URI as the key. In the future, there will be a way to bypass or purge the cache, but in the meantime, each URI should be unique for each different version of the resource’s content.
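
In terms of the API, that’s a single extra field on the URI (a sketch; the artifact URL is hypothetical):

CommandInfo.URI cachedUri = CommandInfo.URI.newBuilder()
    .setValue("https://artifacts.example.com/executor-1.2.3.tar.gz")
    .setExtract(true) // unpack the tarball into the sandbox
    .setCache(true)   // Mesos 0.23+: download at most once per slave
    .build();

Note the version baked into the filename, so that each release gets its own cache key.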

In any case, remember that your scheduler specifies exactly from where and how to fetch and launch the executor. Consequently, each version of a scheduler can choose to launch different forwards- and/or backwards-compatible versions of the executor. By ensuring that your scheduler always launches a compatible executor, you can generally avoid internal version mismatches in your frameworks.

Adding Heartbeats

One of the advantages of the Mesos architecture is that temporary communication disruptions don’t affect the operation of the running executors. We often leverage this functionality when designing for high availability, since each executor can continue to process its work or service client requests in spite of other failures. When building a pool of servers scheduler, for example (see “Pool of Servers Scheduler”), this is exactly the behavior we want. On the other hand, sometimes we want to ensure that every executor is running so that we can react quickly to failures—after all, there’s no way to distinguish a temporary network blip from a permanent one.2

We’ll ensure that every executor is functioning by requiring them to heartbeat every so often (the period will depend on how quickly we want to react to failures). To heartbeat, we’ll use framework messages, which have several interesting features and caveats:

  • Framework messages are unreliable: there’s no guarantee they’re ever delivered.

  • Framework messages are scalable: they usually use an optimized transmission mechanism directly from the executor to the scheduler, thus avoiding bottlenecking at the master.

  • Framework messages can carry an arbitrary binary payload: you can serialize any data you’d like into them, such as progress updates.

  • Framework messages are between executors and schedulers: it’s up to you to include the task ID in the binary payload if you want to send messages about a particular task on an executor.

First, let’s add heartbeats to our executor, as shown in Example 5-11.

Example 5-11. Heartbeats on the executor
public static void main(String ... args) {
    // ... snip ...
    Timer timer = new Timer(); 1
    timer.schedule(new TimerTask() {
        public void run() {
            driver.sendFrameworkMessage(new byte[1]); 2
        }
    }, 0, 5000); 3
    // ... snip ...
}
1

We create a new Timer when our executor starts up to manage the heartbeats.

2

The task we create will simply send a framework message without any interesting data. In a production framework, you could include useful information here, such as when the message was sent (to track the message delivery delay) or what the completion percentage of each of the executor’s tasks is.

3

We start this task immediately, sending a heartbeat every 5 seconds. To increase scalability, heartbeats are often sent every 30 seconds or every few minutes.

Although it’s easy to send heartbeats on the executor, we also need to augment the scheduler. First, we enhance the Job to automatically fail itself when it doesn’t receive a heartbeat in time, as seen in Example 5-12.

Example 5-12. Automatically self-destructing Job
public class Job {
    private Timer timer = new Timer(); 1
    private TimerTask missedHeartbeatTask; 2
    // ... snip ...

    public void heartbeat() { 3
        if (missedHeartbeatTask != null) { 4
            missedHeartbeatTask.cancel();
        }
        missedHeartbeatTask = new HeartbeatTask();
        timer.schedule(missedHeartbeatTask, 20000); 5
    }

    public static Job fromJSON(JSONObject obj) {
        Job job = new Job();
        // ... snip ...
        job.heartbeat(); 6
        return job;
    }

    public void succeed() {
        // ... snip ...
        missedHeartbeatTask.cancel(); 7
    }

    public void fail() {
        // ... snip ...
        missedHeartbeatTask.cancel();
    }

    private class HeartbeatTask extends TimerTask { 8
        public void run() {
            System.out.println("Heartbeat missed; failing");
            fail(); 9
        }
    }
}
1

Each Job will now have a Timer to manage the expiration of its latest heartbeat.

2

We must store the TimerTask that will cause the current instance of the Job to fail, so that we can cancel it when we receive a heartbeat.

3

The heartbeat() method will get called every time we receive a heartbeat from the executor.

4

Every time we receive a heartbeat, we must cancel the current task, since if that task ran we’d fail the Job.

5

When we receive the heartbeat, we schedule the next failure due to lack of heartbeat in 20 seconds. Since we expect a heartbeat every 5 seconds, this should only run if we miss several heartbeats in a row.

6

When we first create the Job, we heartbeat it once to ensure that it fails if the executor never starts running.

7

When the task finishes, we cancel the pending failure task, since it’s no longer relevant.

8

A HeartbeatTask is a subclass of TimerTask. We only need to implement the run() method, which is only called if the task isn’t canceled before its time is up.

9

If our task does get to run, we just fail() the Job, which will cause the Job to get rescheduled later.

Now that our Job has been enhanced to track heartbeats, all we need to do is pass along framework messages from the scheduler to the proper Job. To do this, we’ll implement the frameworkMessage callback in the scheduler, as shown in Example 5-13.

Example 5-13. Implementing frameworkMessage for heartbeats
public void frameworkMessage(SchedulerDriver driver, ExecutorID executorId,
                             SlaveID slaveId, byte[] data) {
    String id = executorId.getValue(); 1
    synchronized (jobs) {
        for (Job j : jobs) { 2
            if (j.getId().equals(id)) {
                j.heartbeat(); 3
            }
        }
    }
}
1

For the example executor in this chapter (and for the CommandExecutor), we set the executor ID to be equal to the job ID and the task ID.

2

Since we don’t care about the data in the framework message, we simply scan through all the current jobs to find if one of them has the same ID as the executor that just sent the heartbeat.

3

Once we find a match, we call the heartbeat() method on that Job, so that the Example 5-12 code can do its thing.

This is all it takes to add basic heartbeating functionality to your framework. Of course, there are several enhancements that should be made in a production-quality system. First, the mechanism shown here creates one Timer for every Job’s heartbeat; since each Timer creates a new thread, a system with hundreds of Jobs should share a single Timer across all Jobs to remain scalable. Second, this heartbeat system doesn’t actually kill the non-heartbeating Jobs; it just treats them as dead. We should always kill the executors that aren’t heartbeating by using the killTask API, or else we could end up with orphaned or zombie executors, which will consume resources indefinitely on the cluster without doing any useful work. Finally, when we’re not sure whether an executor was properly killed, we should be careful not to start an executor with a duplicate ID on a slave. We should track all the executor IDs we’ve used, and generate a new, unique ID for each executor. With the preceding code, you can encounter undefined behavior if you relaunch a Job on a slave where the earlier executor is still running but believed dead due to lack of heartbeats.
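
As a sketch of that last point, we could derive a fresh ExecutorID for every launch attempt (note that the frameworkMessage matching in Example 5-13, which assumes the executor ID equals the job ID, would then need to strip the suffix back off):

// A sketch, assuming java.util.UUID is imported and jobId holds the Job's
// ID (as in Example 5-10): a unique ExecutorID per launch attempt, so a
// zombie executor from a previous attempt can never collide with a new one.
String attemptId = jobId + "-" + UUID.randomUUID().toString();
ExecutorID eid = ExecutorID.newBuilder()
    .setValue(attemptId)
    .build();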

Now, we’ve seen how to build a simple executor by leveraging the task-level and executor-level APIs. We’ve even added an additional functionality, heartbeats, that wasn’t possible with the built-in executor. In the next few sections, we’ll look at other advanced features that we can add to our executors.

Advanced Executor Features

As you’ve probably discovered by now, it’s difficult to write an executor. Unlike most software that we write, the executor is particularly challenging to test, since we need to build it, package it, and ship it across our cluster, at which point we must wait to see whether it correctly interacts with our scheduler.

Before we start looking at additional features we can add to our executor, keep in mind that an executor doesn’t have to make its task a subprocess. In fact, it’s quite common to have tasks run in the same process as the executor—this can lead to greater efficiency and a simpler design. The only reason we split our executor and task into separate processes was so that we could imitate the behavior of the built-in CommandExecutor in Mesos.

Progress Reporting

In “Adding Heartbeats”, we briefly touched on the idea that, in addition to using periodic framework messages to detect whether an executor is still running and accessible, we could include periodic status updates in those messages. However, framework messages aren’t the only option: the TaskStatus message also has a data field, and it too can be used to send updates about a task. Furthermore, it is possible to send multiple TaskStatus updates for the same state (e.g., TASK_RUNNING, TASK_STARTING, etc.) but with different values in the data field to reliably communicate a task’s internal status changes. For instance, we may want to have our tasks report that they’re RUNNING with the data value “initializing” when they start to initialize themselves, and then send a new status update that they’re RUNNING with the data value “ready” when they’re ready to begin processing requests. Let’s look at the factors to consider when choosing between status updates and framework messages for communicating progress.

Framework messages are highly scalable, because they are transmitted directly from the executors to the scheduler. Since the Mesos master typically doesn’t process or forward framework messages,3 the throughput of these messages is limited by the scheduler’s ability to consume them. Furthermore, since framework messages aren’t guaranteed to be reliable, during periods of high load, Mesos can choose to drop some messages in order to ensure that higher-priority messages can proceed. These features make framework messages a great choice when scalability is the primary concern.

Of course, that scalability comes at a price: as mentioned, sometimes framework messages get dropped. Status updates, on the other hand, are guaranteed to be delivered. But this guarantee comes at a steep performance cost: every status update must be checkpointed to disk on the slave before it’s sent, and then a second disk write must take place to mark the update as having been successfully sent. Furthermore, the slaves must keep track of all outstanding status updates, so that if a scheduler is temporarily offline when the status update would have been delivered, the slave can retransmit it later. This checkpointing and tracking, along with the periodic retransmissions of unacknowledged status updates, gives status updates their reliability, making them suitable for notifying the scheduler of important lifecycle events in tasks, at the cost of far greater resource usage and susceptibility to bottlenecks at the master’s and the slave’s disks.

Thus, the main criterion for whether framework messages or status updates are more suitable is whether every update completely subsumes all earlier updates. For example, if the goal was to report on the percent completed of a task, then missing the 49% update is irrelevant once the 50% update has been received (since the 50% update implies that 49% has already passed). For this purpose, framework messages are a good choice. On the other hand, if a task is reporting when various subtasks have been completed, each update is unique and they all should be delivered to the scheduler. For this purpose, status updates are more appropriate.

Adding Remote Logging

One often-encountered problem in building distributed systems is figuring out “What the hell happened?” This is so challenging because logs tend to be scattered across the cluster; this makes searching the logs for an error message much harder, since now we also need to search the cluster for the log. Furthermore, we often need to correlate several logs to identify how the interaction between multiple programs in our cluster resulted in failure. The most common way to mitigate this issue is to send all logs to a central repository, which then assists us in analyzing, searching, and correlating these logs. Some popular technologies for doing this are Splunk, Logstash, and Papertrail.4

Recall how we used ProcessBuilder’s redirectOutput() and redirectError() to send the stdout and stderr of our process to files—we could instead get the InputStreams directly from the resultant process so that we can send the outputs anywhere. Once you’re writing your own executor, you have complete control over where your executor’s logs get sent, and where your tasks’ logs get sent. For many applications, since there’s only one task per executor, we want to send the executor and task logs to the same place. For other applications, however, we can actually send the task and executor logs to different places. This is useful when you want to be able to debug tasks separately from one another, and don’t want to see all of an executor’s tasks’ logs intermingled.
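
A sketch of that approach, replacing the file redirects inside startProcess from Example 5-6 (remoteLog stands in for whatever client your log repository provides):

// A sketch: stream the child's merged stdout/stderr line by line and ship
// each line to a centralized log repository. remoteLog() is hypothetical.
// (imports assumed: java.io.BufferedReader, java.io.InputStreamReader)
ProcessBuilder pb = new ProcessBuilder(cmdArgs);
pb.redirectErrorStream(true); // merge stderr into stdout
final Process proc = pb.start();
new Thread(new Runnable() {
    public void run() {
        try {
            BufferedReader reader = new BufferedReader(
                new InputStreamReader(proc.getInputStream(), "UTF-8"));
            String line;
            while ((line = reader.readLine()) != null) {
                remoteLog(taskId.getValue(), line); // hypothetical client call
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}).start();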

Multiple Tasks

Mesos executors are designed to run multiple tasks. To run a task on an existing executor, you launch that task as normal, except that you use an ExecutorInfo identical to that of the executor already running on that slave. To help you reuse executors, every offer has a field called executor_ids, which contains the ExecutorIDs of all the executors currently running on that slave; however, it’s still up to you to store the ExecutorInfos and include them in your task descriptors. The master validates the TaskInfo, so if you provide an ExecutorInfo that doesn’t match the executor you want to launch on, the task will fail immediately with a TASK_ERROR.
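
As a sketch, launching onto an existing executor might look like the following, where knownExecutors is a hypothetical map we maintain from SlaveID to the ExecutorInfo we originally launched on that slave:

// A sketch: reuse a running executor by submitting a TaskInfo whose
// ExecutorInfo is identical to the one already running on the slave.
ExecutorInfo existing = knownExecutors.get(offer.getSlaveId());
TaskInfo task = TaskInfo.newBuilder()
    .setName("extra-task")
    .setTaskId(TaskID.newBuilder().setValue(UUID.randomUUID().toString()))
    .setSlaveId(offer.getSlaveId())
    .addResources(Resource.newBuilder()
        .setName("cpus")
        .setType(Value.Type.SCALAR)
        .setScalar(Value.Scalar.newBuilder().setValue(0.5)))
    .setExecutor(existing) // must match the running executor, or TASK_ERROR
    .setData(ByteString.copyFromUtf8("{\"cmd\": \"echo hello\"}"))
    .build();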

Resizing Containers

A powerful feature of executors is that their container’s size is always the sum of the resources of all the tasks they’re running plus the executor’s own resources. A custom executor is the only way to easily resize containers dynamically depending on what work they’re performing. Of course, there’s no free lunch: you’ll need to keep track of which slave each executor is running on, and you’ll need to use an offer from that executor’s slave when you want to scale its container up.

Executors with expensive state

Typically, reusing executors is done when the executor contains some sort of expensive-to-construct state. For instance, an executor might maintain a local cache of a large amount of data that’s stored in a slow external filesystem, like S3. In this case, the executor can be responsible for downloading that data, and then each task can access it as a local file, which will be much faster than reading it over the network. Of course, there’s always a trade-off: if you run multiple tasks at the same time on that executor, those tasks will run in the same container—you’ll be giving up the strong isolation guarantees between tasks that Mesos usually provides! If you are confident that the tasks won’t accidentally consume more of the container’s resources than they should, this is a workable solution. If, however, you’re like most developers and occasionally write bugs, this is a risky proposition, in that it opens up your framework to unanticipated interference and potential nondeterminism.

Multistage initialization

A less common situation in which multiple tasks are used on the same executor is when the executor has a complex initialization process. Rather than sending status updates as the initialization occurs, as suggested in “Progress Reporting”, the scheduler can treat each step of the initialization as a separate task. This way, the initialization logic can be managed on the scheduler, rather than on the executor. Although you’ll occasionally see this technique used, I recommend against it. It is easier to implement the initialization state machine entirely in the executor: this avoids splitting the state machine between the scheduler and the executor, and it reduces the framework’s dependency on a reliable network during executor initialization.

Canary tasks

There is, however, one situation in which an additional task on the executor is absolutely necessary: when we need to receive prompt notification that an executor that may not be presently running any tasks (but is waiting for new ones) has shut down or crashed. As mentioned in our discussion of Scheduler callbacks (see Example 4-6), the callback you might think would tell you when an executor is lost is never actually called. Therefore, to detect when an executor disappears, we instead run a canary task on it. That way, when an executor shuts down or crashes or its slave deactivates, we’ll receive a TASK_LOST from the canary task, which tells us that the executor is lost. Of course, if you don’t need to promptly detect that the executor has been lost, you could always just assume executors are gone if you don’t see an offer from them within some timeout.

When should we use them?

The capability for executors to launch multiple tasks is a lesser-used feature, for good reason. The APIs can be challenging to use, due to the need to store the ExecutorInfos for reuse. Ultimately, multiple tasks should only be used in the following situations:

  • When it’s necessary to dynamically change the amount of resources used by an executor as its workload varies

  • When the executor’s state is so expensive to construct that it’s acceptable to throw away the Mesos isolation functionality to save on reinitialization costs

  • When the failure of an executor that might not be running any tasks must be detected rapidly

Use this feature carefully, as it can make debugging very difficult.

Summary

Executors are the Mesos abstraction for a framework’s workers. They should be used when a framework requires richer interaction between its scheduler and workers. Building an executor is usually very challenging, because of the need to frequently deploy new builds to the cluster during development. By surmounting these challenges, you can build custom executors to enable frameworks to dynamically scale the containers in which their tasks are running, to efficiently share expensive resources, and to provide an easy API to send messages between the scheduler and workers.

In this chapter, after reviewing some of the reasons one might build an executor, we implemented an executor that was mostly compatible with the built-in Mesos CommandExecutor and which we integrated with the job scheduler from Chapter 4. Then, we added heartbeats, which allowed us to more rapidly detect certain types of partial failures on the cluster, resulting in a better experience for the humans that must wait for their jobs to complete.

Finally, we looked at some advanced uses of custom executors: how to choose between framework messages and status updates, considerations when integrating with centralized distributed logging systems, and how and why to develop executors that support multiple tasks.

Now that we’ve looked across the entire spectrum of building applications on Mesos—utilizing existing frameworks, building application-specific schedulers, creating rich executors—we’ll move on to more advanced topics, such as Mesos internals, Docker integration, and exotic, cutting-edge APIs.

1 Java concurrency is beyond the scope of this book, so you’ll just have to trust me that this works.

2 This is known as the FLP impossibility result.

3 Technically, the Mesos master will sometimes process framework messages. This can lead to a bottleneck in rare cases, but the optimized bypass of the master is almost always used.

4 You can read a summary and comparison of centralized logging solutions at Jason Wilder’s blog.
