Mainak Ghosh, Le Xu, Indranil Gupta
Department of Computer Science, University of Illinois at Urbana-Champaign, Urbana, IL, USA
Cloud computing relies on software for distributed batch and stream processing, as well as distributed storage. This chapter focuses on an oft-ignored angle of assuredness: performance assuredness. A significant pain point today is the inability to support reconfiguration operations, such as changing the shard key in a sharded storage/database system, or scaling up (or down) the number of virtual machines (VMs) used in a stream or batch processing system. We discuss new techniques to support such reconfiguration operations in an online manner, whereby the system does not need to be shut down and the user/client-perceived behavior is indistinguishable regardless of whether a reconfiguration is occurring in the background; that is, performance continues to be assured in spite of ongoing background reconfiguration. Next, we describe how to scale-out and scale-in (increase or decrease) the number of machines/VMs in cloud computing frameworks like distributed stream processing and distributed graph processing systems, again while offering assured performance to the customer in spite of the reconfigurations occurring in the background. The ultimate performance assuredness is the ability to support SLAs/SLOs (service-level agreements/objectives) such as deadlines. We present a new real-time scheduler that supports priorities and hard deadlines for Hadoop jobs. We implemented our reconfiguration systems as patches to several popular open-source cloud computing systems, including MongoDB and Cassandra (storage), Storm (stream processing), LFGraph (graph processing), and Hadoop (batch processing).
Cloud computing relies on distributed systems for storing (and querying) large amounts of data, for batch computation over those data, and for streaming computation over big data arriving at high velocity. Distributed storage systems include NoSQL databases such as MongoDB [1] and key-value stores such as Cassandra [2], which have proliferated for workloads that need fast and weakly consistent access to data. A large amount of batch computation today relies on Apache Hadoop [3], which uses the MapReduce paradigm. Apache Storm [4] is a prominent stream processing system used by a variety of companies; others include Heron [5] (used by Twitter) and Samza [6] (used by LinkedIn).
Design of the first generation of these systems focused largely on scale and optimizing performance, but these systems have reached a level of maturity that now calls for performance assuredness. In other words, in spite of changes made to a distributed cloud service (such as the ones already listed), the service should run uninterrupted and offer performance (latency or throughput) and availability indistinguishable from those in the common-case operations. In general, the requirement is that clients (front-end machines) and eventually user devices should be unaware of any reconfiguration changes taking place in the background.
Examples of such reconfiguration operations include (i) changing the layout of data in a sharded cloud database, for example, by changing the shard key; (ii) scaling-out or scaling-in the number of machines/VMs involved in a distributed computation or database; and (iii) eventually, supporting predictable service level agreements/objectives (SLAs/SLOs) for jobs. Support for those operations will make the performance of each job predictable, push systems toward supporting multitenant scenarios (in which multiple jobs share the same cluster), and allow developers to specify requirements (in the form of SLAs/SLOs); the scheduler would then be able to manage resources automatically to satisfy these SLAs/SLOs without human involvement.
This chapter is not focused on security challenges. However, even meeting performance assuredness goals is a significant undertaking today, and it entails several challenges. First, the sheer scale of the data being stored or processed, and the nontrivial number of machines/VMs involved, means that decisions to migrate data or computation need to be made wisely. Second, the de facto approach used today in industry systems is to shut down services before reconfiguration; this is unacceptable, as it negatively affects availability and latency (in storage/databases), drops throughput to zero (in stream processing systems), and wastes work (in batch processing systems). What is needed is a way to migrate components in the background, without affecting the foreground performance perceived by clients and users. Third, the mechanisms to scale-out and scale-in can be used either manually or in an adaptive fashion, such that they are driven by logic that seeks to satisfy SLAs/SLOs.
In this chapter, we describe multiple new approaches to attacking different pieces of the broad problem already described. We incorporate performance assuredness into cloud storage/database systems, stream processing systems, and batch processing systems. Specifically, we describe the following innovations: (i) Morphus and Parqua, which support online reconfiguration operations (such as shard key changes) in sharded and ring-based NoSQL databases, respectively; (ii) support for on-demand scale-out and scale-in in distributed stream processing (our Stela system) and distributed graph processing systems; and (iii) Natjam, a real-time scheduler that supports priorities and hard deadlines for Hadoop jobs.
In this chapter, we present the key design techniques and algorithms, outline design and implementation details, and touch on key experimental results. We performed all experiments by deploying both the original system(s) and our modified system on real clusters and subjecting them to real workloads.
Missions that rely on the cloud, by definition, have strong time criticality. This implies two requirements: (i) the ability to perform reconfigurations (such as shard key changes or scale-out/scale-in of machines) on demand, without interrupting the running service; and (ii) the ability to support priorities and hard deadlines (SLAs/SLOs) for time-critical jobs.
The research contributions described in this chapter fulfill both of these requirements. First, our Morphus and Parqua systems make it possible to change configuration parameters (such as the shard key) that affect the layout of data in a database, without affecting query latency, and while minimizing total transfer time. That gives the team flexibility to make such configuration changes at any time. Our Stela system allows the team to scale-out or scale-in, at will, the number of machines or VMs associated with a stream processing job. For instance, consider a stream processing job tracking the camera feed from an aircraft fleet (e.g., in order to identify enemy positions). When the volume of such data increases, our Stela system allows the team to scale-out the number of machines quickly to cope with the increased workload, while still continuing to process the incoming stream at a high rate. Similar flexibility extends to our scale-out/scale-in techniques for distributed graph processing.
Second, our Natjam system allows Hadoop jobs (running in a YARN cluster) to be associated with a priority or a deadline. The Natjam scheduler automatically ensures that higher priority (or earlier deadline) jobs preempt other jobs, run sooner, and thus finish sooner – an ability that is much needed in critical missions that rely on the cloud.
Here, we summarize the state of the art for each of the individual challenges and systems covered in this chapter.
Research in distributed databases has focused on query optimization and load balancing [7], and orthogonally on using group communication for online reconfiguration [8]; however, that work has not solved the core algorithmic problems for efficient reconfiguration. Online schema change was targeted in Google [9], but the resultant availabilities were lower than those provided by Morphus and Parqua. In a parallel work, Elmore et al. [10] looked into the reconfiguration problem for a partitioned main memory database such as H-Store. Data placement in parallel databases has used hash-based and range-based partitioning [11,12], but optimality for reconfiguration has not been a goal.
The problem of live migration has been studied in the context of databases. Albatross [13], Zephyr [14], and ShuttleDB [15] address live migration in multitenant transactional databases. Albatross and ShuttleDB use iterative operation replay, as Morphus does, while Zephyr routes updates based on current data locations. Data migration in these systems happens between two different sets of servers, while Morphus and Parqua achieve data migration within a replica set. Also, these papers do not propose optimal solutions for any reconfiguration operation. Opportunistic lazy migration, explored in the Relational Cloud [16], entails longer completion times. Tuba [17] addressed the problem of migration in a geo-replicated setting. Its authors avoided write throttling by allowing multiple masters at the same time, which is not supported by MongoDB or Cassandra.
Morphus's techniques naturally bear some similarities to live VM migration. Precopy techniques migrate a VM without stopping the OS, and if this fails, then the OS is stopped [18]. Like precopy, Morphus also replays operations that occurred during the migration. Precopy systems also use write throttling [19], and precopy has been used in database migration [20].
For network flow scheduling, Chowdhury et al. [21] proposed a weighted flow-scheduling approach that allocates multiple TCP connections to each flow to minimize migration time. Our WFS approach improves upon their approach by considering network latencies as well. Morphus's performance will likely improve further if we also consider bandwidth. Hedera [22] also provides a dynamic flow scheduling algorithm for multirooted network topology. While these techniques may improve reconfiguration time, Morphus's approach is end-to-end and is less likely to disrupt normal reads and writes that use the same network links.
Based on its predecessor Aurora [23], Borealis [24] is a stream processing engine that enables modification of queries on the fly. Borealis focuses on load balancing on individual machines and distributes load shedding in a static environment. Borealis also uses ROD (resilient operator distribution) to determine the best operator distribution plan that is closest to an “ideal” feasible set: a maximum set of machines that are underloaded. Borealis does not explicitly support elasticity. Twitter's Heron [5] improves on Storm's congestion-handling mechanism by using back pressure; however, elasticity is not explicitly addressed.
Stormy [25] uses a logical ring and consistent hashing to place new nodes upon a scale-out. Unlike Stela, it does not take congestion into account. StreamCloud [26] builds elasticity into the Borealis Stream Processing Engine [27]. StreamCloud modifies the parallelism level by splitting queries into subqueries and uses rebalancing to adjust resource usage. Stela does not change running topologies, because we believe that would be intrusive to the applications.
SEEP [28] uses an approach to elasticity that focuses mainly on an operator's state management. It proposes mechanisms to back up, restore, and partition operators' states in order to achieve short recovery time. Several papers have focused on elasticity for stateful stream processing systems. Work [29,30] from IBM enabled elasticity for both IBM System S [31–33] and SPADE [34] by increasing the parallelism of processing operators. It applied networking concepts such as congestion control to expand and contract the parallelism of a processing operator by constantly monitoring the throughput of its links. However, that work does not assume that a fixed number of machines is provided (or taken away) by the user. In contrast, our system intelligently prioritizes which operators to parallelize more (or to migrate away from) given the user-determined number of machines that join (or leave) the cluster, with a mechanism that optimizes throughput.
Recent work [35] proposes an elasticity model that provides latency guarantees by tuning the task-wise parallelism level in a fixed-size cluster. Meanwhile, another recent effort [36] implemented stream processing system elasticity; however, it focused on both latency (not throughput) and policy (not mechanisms). Nevertheless, Stela's mechanisms can be used as a black box inside this system.
Some of these recent efforts have looked at policies for adaptivity [36], while others [25,26,29,30] focus on the mechanisms for elasticity. These are important building blocks for adaptivity. To the best of our knowledge, Ref. [37] describes the only prior mechanism for elasticity in stream processing systems; in Sections 6.5.1.9–6.5.1.12, we compare it to Stela.
To the best of our knowledge, we are the first to explore elasticity for distributed graph computations. However, elasticity has been explored in many other areas.
AutoScale [38] enables elastic capacity management for data centers. Its goal is to reduce power wastage by maintaining just enough server capacity for the current workload. Zhang et al. [39] propose to solve the same problem using a Model Predictive Control framework.
The creators of CloudScale [40] and Jiang et al. [41] propose mechanisms for scaling up VM resources based on predicted application needs in an Infrastructure as a Service (IaaS) environment. AGILE [42] proposes mechanisms to scale-out VM resources based on predicted application needs.
Pujol et al. [43] propose a social partitioning and replication middleware to enable efficient storage of social network graphs. The technique enables the storage system to scale-out/scale-in based on need. Albatross [13] enables scaling of multitenant databases by using live migration. TIRAMOLA [44] allows NoSQL storage clusters to scale-out or scale-in by using user-provided policies to apply a Markov decision process on the workload. The creators of Transactional Auto Scaler [45] propose another storage resource scaling mechanism, which uses analytical modeling and machine learning to predict workload behavior.
Starfish [46] performs tuning of Hadoop parameters at job, workflow, and workload levels. Herodotou et al. [47] propose Elastisizer, which has the ability to predict cluster size for Hadoop workloads.
PowerGraph [48] performs vertex assignment using balanced partitioning of edges. The aim is to limit the number of servers spanned by the vertices. Distributed GraphLab [49] involves a two-phase assignment of vertices to servers; the first phase creates partitions that are more numerous than the servers. In the second phase, the servers load their respective partitions based on a balanced partitioning. That makes it possible to load the graph in a distributed manner, and to change the number of servers, without affecting the initial load phase. Stanton et al. [50] discuss partitioning strategies of streaming graph data. However, they do not explore elasticity. Use of partitioning during a scale-out/scale-in operation is not viable, as it would partition the entire graph; instead, we do incremental repartitioning.
Vaquero et al. [51] propose a vertex migration heuristic between partitions to maintain balance and reduce communication cost. GPS [52] involves dynamic repartition of vertices by colocating the vertices that exchange larger numbers of messages with each other. While those efforts did not explicitly explore on-demand elasticity, our techniques are orthogonal and can be applied to such systems. More specifically, one can use our vertex repartitioning technique in such systems by treating each partition as the set of vertices currently assigned to that server.
Sharing finite resources among applications is a fundamental issue in operating systems [53]. Not surprisingly, Natjam's eviction policies are analogous to multiprocessor scheduling techniques (e.g., shortest task first) and to eviction policies for caches and paged OSes. However, our results are different, because MapReduce jobs need to have all tasks finished. PACMan [54] looks at eviction policies for caches in MapReduce clusters, and it can be used orthogonally with Natjam.
Amoeba, a system built in parallel with ours, provides instantaneous fairness with elastic queues and uses a checkpointing mechanism [55]. The main differences between Natjam and Amoeba are that (i) Natjam focuses on job and task eviction policies, (ii) Natjam focuses on jobs with hard deadlines, and (iii) our implementation works directly with Hadoop 0.23, while Amoeba requires use of the prototype Sailfish system [56]. Further, Sailfish was built on Hadoop 0.20, and Hadoop 0.23 later addressed many relevant bottlenecks, for example, through the use of read-ahead seeks and of Netty [57] to speed up shuffle. Finally, our eviction policies and scheduling can be implemented orthogonally in Amoeba.
Delay scheduling [58] avoids killing map tasks while achieving data locality. In comparison, Natjam focuses on reduce tasks, as they are longer than maps and release resources more slowly, making our problem more challenging. Global preemption [59] selects tasks to kill across all jobs, which is a suboptimal solution.
A recently started Hadoop JIRA issue [60] also looks at checkpointing and preemption of reduce tasks. Such checkpointing can be used orthogonally with our eviction policies, which comprise the primary contribution of this part of our work. Finally, Piccolo [61] is a data-processing framework that uses checkpointing based on consistent global snapshots; in comparison, Natjam's checkpoints are local.
ARIA [62] and Conductor [63] estimate how a Hadoop job needs to be scaled up to meet its deadline, for example, based on past execution profiles or a constraint satisfaction problem. They do not target clusters with finite resources. Real-time constraint satisfaction problems have been solved analytically [64], and Jockey [65] addressed DAGs of data-parallel jobs; however, eviction policies and Hadoop integration were not fleshed out in that work. Statistics-driven approaches have been used for cluster management [66] and for Hadoop [67]. Much work has also been done in speeding up MapReduce environments by tackling stragglers, for example [68,69], but those efforts do not support job priorities.
Dynamic proportional share scheduling [70] allows applications to bid for resources but is driven by economic metrics rather than priorities or deadlines. The network can prioritize data for time-sensitive jobs [21], and Natjam can be used orthogonally.
Natjam focuses on batch jobs rather than stream processing or interactive queries. Stream processing in the cloud has been looked at intensively, for example, in the work on Hadoop Online [71], Spark [72], Storm [73], Timestream [74], and Infosphere [75]. BlinkDB [76] and MeT [77] optimize interactive queries for SQL and NoSQL systems.
Finally, classical work on real-time systems has proposed a variety of scheduling approaches, including classical EDF and rate monotonic scheduling [78,79], priority-based scheduling of periodic tasks [80], laxity-based approaches [81], and handling of task DAGs [82]. Natjam is different in its focus on MapReduce workloads.
Providing fairness across jobs has been a recent focus in cloud computing engines. Outcomes of such work include Hadoop's Capacity Scheduler [83] and Fair Scheduler [84], which provide fairness by allowing an administrator to configure queue capacities and job priorities. They do not, however, allow resource preemption [85]. Quincy [86] solves an optimization problem to provide fairness in DryadLINQ [87]. Quincy does consider preemption but proposes neither eviction policies nor checkpointing mechanisms. Finally, there has been a recent focus on satisfying SLAs [88] and satisfying real-time QoS [89], but such efforts have not targeted MapReduce clusters.
Recent cluster management systems have targeted SLOs, for example, Omega [90], Cake [91], Azure [92], Centrifuge [93], and Albatross [13]. Mesos [94] uses dominant resource fairness across applications that share a cluster, and Pisces [95] looks at multitenant fairness in key-value stores.
In this section, we describe how to perform reconfiguration operations in both sharded NoSQL databases (e.g., MongoDB) and key-value stores that use hash-based partitioning (e.g., Cassandra). We first discuss motivations in Section 6.4.1, then our Morphus system (integrated into MongoDB) in Section 6.4.2, and finally our Parqua system (integrated into Cassandra) in Section 6.4.3. (We discussed related work in Section 6.3.1.)
Distributed NoSQL storage systems comprise one of the core technologies in today's cloud computing revolution. These systems are attractive because they offer high availability and fast read/write operations for data. They are used in production deployments for content management, archiving, e-commerce, education, finance, gaming, e-mail, and health care. The NoSQL market is expected to earn $14 billion in revenue during 2015–2020 and become a $3.4 billion market by 2020 [96].
In today's NoSQL deployments [1,2,97,98], data-centric2 global reconfiguration operations are quite inefficient. The reason is that executing them relies on ad hoc mechanisms rather than solution of the core underlying algorithmic and system design problems. The most common solution involves first saving a table or the entire database, and then reimporting all the data into the new configuration [99]. This approach leads to a significant period of unavailability. A second option may be to create a new cluster of servers with the new database configuration and then populate it with data from the old cluster [13–15]. This approach does not support concurrent reads and writes during migration, a feature we would like to provide.
Consider an admin who wishes to change the shard key inside a sharded NoSQL store such as MongoDB [99]. The shard key is used to split the database into blocks, where each block stores values for a contiguous range of shard keys. Queries are answered much faster if they include the shard key as a query parameter (because otherwise the query needs to be multicast). For today's systems, it is strongly recommended that the admin choose the shard key at database creation time and not change it afterward. However, that approach is challenging because it is hard to guess how the workload will evolve in the future. In reality, there are many reasons why admins might need to change the shard key, such as changes in the nature of the data being received, evolving business logic, the need to perform operations such as joins with other tables, and the discovery in hindsight that prior design choices were suboptimal. As a result, the reconfiguration problem has been a fervent point of discussion in the community for many years [100,101].
In this work, we present two systems that support automated reconfiguration. Our systems, called Morphus and Parqua, allow reconfiguration changes to happen in an online manner, that is, by concurrently supporting reads and writes on the database table while its data are being reconfigured.
This section is based on Ref. [102], and we encourage the reader to refer to it for further details on design, implementation, and experiments.
Morphus assumes that the NoSQL system features master–slave replication, range-based (as opposed to hash-based) sharding,3 and flexibility in data assignment.4 Several databases satisfy these assumptions, for example, MongoDB [1], RethinkDB [103], and CouchDB [104]. To integrate our Morphus system, we chose MongoDB because of its clean documentation, strong user base, and significant development activity. To simplify discussion, we assume a single data center, but our paper [105] presents results for geo-distributed experiments. Finally, we focus on NoSQL rather than ACID databases because the simplified CRUD (Create, Read, Update, Delete) operations allow us to focus on the reconfiguration problem. Addressing ACID transactions is an exciting avenue that our work opens up.
Morphus solves three major challenges: (i) In order to be fast, data migration across servers must incur the least traffic volume. (ii) Degradation of read and write latencies during reconfiguration must be small compared to operation latencies when there is no reconfiguration. (iii) Data migration traffic must adapt itself to the data center's network topology.
We have chosen to incorporate Morphus into a popular sharded key-value store, MongoDB v2.4 [1]. As noted earlier, our choice of MongoDB was driven not just by its popularity but also by its clean documentation, its strong user base, and the significant development and discussion around it.
A MongoDB deployment consists of three types of servers. The mongod servers store the data chunks themselves; these servers are typically grouped into disjoint replica sets. Each replica set contains the same number of servers (typically 3), which are exact replicas of each other, with one of the servers marked as the primary (master) and the others acting as secondaries (slaves). The configuration parameters of the database are stored at the config servers. Clients send CRUD queries to a set of front-end servers, also called mongos. The mongos servers also cache some of the configuration information from the config servers; for example, in order to route queries, they cache mappings from each chunk range to a replica set.
A single database table in MongoDB is called a collection. Thus, a single MongoDB deployment consists of several collections.
Morphus allows a reconfiguration operation to be initiated by a system administrator on any collection. Morphus executes the reconfiguration via five sequential phases, as shown in Figure 6.1.
First, in the prepare phase, Morphus prepares for the reconfiguration by creating partitions (with empty new chunks) that use the new shard key. Second, in the isolation phase, Morphus isolates one secondary server from each replica set. Third, in the execution phase, these secondaries exchange data according to the placement plan chosen by the mongos. In the meantime, further operations may have arrived at the primary servers; they are replayed at the secondaries in the fourth phase, the recovery phase. When the reconfigured secondaries have caught up, they swap places with their primaries in the fifth phase, the commit phase.
At that point, the database has been reconfigured and can start serving queries with the new configuration. However, other secondaries in all replica sets need to reconfigure as well. This slave catchup is done in multiple rounds, with the number of rounds equal to the size of the replica set.
We discuss the individual phases in detail in our paper [105].
The end of the commit phase marks the switch to the new shard key. Until this point, all queries with the old shard key were routed to the mapped server and all queries with the new shard key were multicast to all the servers (which is normal MongoDB behavior). After the commit phase, a query with the new shard key is routed to the appropriate server (the new primary). Queries that do not use the new shard key are handled with a multicast, which again is normal MongoDB behavior.
Reads in MongoDB offer per-key sequential consistency. Morphus is designed so that it continues to offer the same consistency model for data undergoing migration.
In a reconfiguration operation, the data present in shards across multiple servers are resharded. The new shards need to be placed at the servers in such a way as to reduce the total network transfer volume during reconfiguration and achieve load balance. This section presents optimal algorithms for this planning problem.
We present two algorithms for placement of the new chunks in the cluster. Our first algorithm is greedy and is optimal in the total network transfer volume. However, it may create bottlenecks by clustering many new chunks at a few servers. Our second algorithm, based on bipartite matching, is optimal in network transfer volume among all those strategies that ensure load balance.
The greedy approach considers each new chunk independently. For each new chunk NCi, the approach evaluates all N servers. For each server Sj, it calculates W(NCi, Sj), the number of data items of chunk NCi that are already present in old chunks at server Sj. It then allocates each new chunk NCi to the server Sj that maximizes W(NCi, Sj). Because chunks are considered independently, the algorithm produces the same output irrespective of the order in which it considers them.
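To make the greedy rule concrete, the following is a minimal Python sketch (not Morphus's actual C++ implementation; the data structures are simplified, and the example inputs are hypothetical):

```python
from collections import Counter

def greedy_placement(new_chunk_items, item_location):
    """Assign each new chunk NCi to the server that already holds the most of
    its data items, i.e., the server maximizing W(NCi, Sj). `new_chunk_items`
    maps a new chunk to its item ids; `item_location` maps an item id to the
    server that currently holds it (via its old chunk)."""
    assignment = {}
    for chunk, items in new_chunk_items.items():
        local = Counter(item_location[i] for i in items)   # W(NCi, Sj) per server
        assignment[chunk] = max(local, key=local.get)      # ties broken arbitrarily
    return assignment

# Hypothetical example: two new chunks whose items sit on servers S1-S3.
items = {"NC1": ["a", "b", "c"], "NC2": ["d", "e", "f"]}
loc = {"a": "S1", "b": "S1", "c": "S2", "d": "S3", "e": "S3", "f": "S2"}
print(greedy_placement(items, loc))   # {'NC1': 'S1', 'NC2': 'S3'}
```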
To illustrate the greedy scheme in action, Figure 6.2 provides two examples for the shard key change operation. In each example, the database has three old chunks, OC1–OC3, each of which contains three data items. For each data item, we show the old shard key Ko and the new shard key Kn (both in the range 1–9). The new configuration splits the new key range evenly across the three chunks, shown as NC1–NC3.
In Figure 6.2a, the old chunks are spread evenly across servers S1–S3. The edge weights in the bipartite graph show the number of data items of NCi that are local at Sj, that is, the W(NCi, Sj) values. Thick lines show the greedy assignment.
However, the greedy approach may produce an unbalanced chunk assignment for skewed bipartite graphs, as in Figure 6.2b. While the greedy approach minimizes network transfer volume, it assigns new chunks NC2 and NC3 to server S1, while leaving server S3 empty.
Load balancing of chunks across servers is important for several reasons. First, it improves read/write latencies for clients by spreading data and queries across more servers. Second, it reduces read/write bottlenecks. Finally, it reduces the tail of the reconfiguration time by preventing the allocation of too many chunks to any one server.
Our second strategy achieves load balance by capping the number of new chunks allocated to each server. With m new chunks and N servers, this per-server cap is ⌈m/N⌉ chunks. We then create a bipartite graph with two sets of vertices, top and bottom. The top set contains ⌈m/N⌉ vertices (slots) for each of the N servers in the system; the slots for server Sj are denoted Sj^1, …, Sj^⌈m/N⌉. The bottom set of vertices consists of the new chunks. Every edge between a top vertex Sj^k and a bottom vertex NCi has a cost equal to |NCi| − W(NCi, Sj), that is, the number of data items that would move to server Sj if new chunk NCi were allocated to it.
Assigning new chunks to servers in order to minimize data transfer volume now becomes a minimum-cost bipartite matching problem. Thus, we find the minimum-weight matching by using the classical Hungarian algorithm [106]. The complexity of this algorithm is cubic in the number of vertices; since there are m new chunks and N × ⌈m/N⌉ ≈ m server slots, this reduces to O(m^3). The greedy strategy becomes a special case of this algorithm in which the per-server cap is set to m itself (i.e., effectively no cap).
Figure 6.2b shows the outcome of the bipartite matching algorithm with dotted lines in the graph. While it incurs the same overall cost as the greedy approach, it provides the benefit of a load-balanced new configuration, wherein each server is allocated exactly one new chunk.
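For the load-balanced variant, a compact sketch is possible by replicating each server into ⌈m/N⌉ slots and handing the resulting cost matrix to an off-the-shelf assignment solver. The sketch below uses SciPy's linear_sum_assignment in place of a hand-rolled Hungarian implementation; it is an illustration of the formulation above, not Morphus's production code.

```python
import math
import numpy as np
from scipy.optimize import linear_sum_assignment

def balanced_placement(new_chunk_items, item_location, servers):
    """Minimum-cost bipartite matching between new chunks and per-server slots.
    Each server gets ceil(m/N) slots, which caps its share of new chunks; the
    cost of placing NCi on Sj is the number of NCi's items that must move."""
    chunks = list(new_chunk_items)
    m, n = len(chunks), len(servers)
    cap = math.ceil(m / n)                              # per-server chunk cap
    slots = [s for s in servers for _ in range(cap)]    # server slots (top vertices)
    cost = np.zeros((m, len(slots)))
    for i, c in enumerate(chunks):
        items = new_chunk_items[c]
        for k, s in enumerate(slots):
            local = sum(1 for it in items if item_location[it] == s)
            cost[i, k] = len(items) - local             # items to transfer
    rows, cols = linear_sum_assignment(cost)            # Hungarian-style solver
    return {chunks[i]: slots[k] for i, k in zip(rows, cols)}
```

On the small example above, balanced_placement(items, loc, ["S1", "S2", "S3"]) caps each server at ⌈2/3⌉ = 1 new chunk and assigns NC1 to S1 and NC2 to S3, at the same total transfer cost as the greedy result.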
While we focus on the shard key change, this technique can also be used for other reconfigurations, such as changing shard size, or cluster scale-out and scale-in. The bipartite graph would be drawn appropriately (depending on the reconfiguration operation) and the same matching algorithm used. For the purpose of concreteness, the rest of this section of the chapter focuses on shard key change.
Finally, we have used data size (number of key-value pairs) as the main cost metric. Instead, we could use traffic to key-value pairs as the cost metric and derive edge weights in the bipartite graph (Figure 6.2) from these traffic estimates. The Hungarian approach on this new graph would balance out traffic load, while trading off optimality. Further exploration of this variant is beyond our scope here.
Data centers use a wide variety of topologies, the most popular being hierarchical; for example, a typical two-level topology consists of a core switch and multiple rack switches. Others that are commonly used in practice include fat-trees [107], CLOS [108], and butterfly topologies [109].
Our first-cut data migration strategy, discussed in Section 6.4.2.3, was chunk based: It assigned as many sockets (TCP streams) to a new chunk C at its destination server as there were source servers for C, that is, one TCP stream per server pair. Using multiple TCP streams per server pair has been shown to better utilize the available network bandwidth [21]. Further, the chunk-based approach results in stragglers in the execution phase: We observe that 60% of the chunks finish quickly, followed by a cluster of the remaining 40% of chunks that finish late.
To address these two issues, we propose a weighted fair sharing (WFS) scheme that takes both data transfer size and network latency into account. Consider a pair of servers i and j, where i sends data to j during the reconfiguration. Let D(i, j) denote the total amount of data that i needs to transfer to j, and let L(i, j) denote the latency of the shortest network path from i to j. Then we set W(i, j), the weight for the flow from server i to j, proportional to both quantities: W(i, j) = D(i, j) × L(i, j). Flows that must move more data, or that traverse higher-latency paths on which each TCP connection achieves lower throughput, thus receive proportionally more of the socket budget.
In our implementation, the weights determine the number of sockets that we assign to each flow. We assign each destination server j a total number of sockets K(j) = K × Σi W(i, j) / Σi′,j′ W(i′, j′), where K is the total number of sockets available throughout the system. Thereafter, each destination server j assigns each source server i a number of sockets K(i, j) = K(j) × W(i, j) / Σi′ W(i′, j).
However, K(i, j) may differ from the number of new chunks that j needs to fetch from i. If K(i, j) is larger, we treat each new chunk as a data slice and iteratively split the largest slice into smaller slices until K(i, j) equals the total number of slices. Similarly, if K(i, j) is smaller, we iteratively merge the smallest slices. Finally, each slice is assigned a socket for data transfer. Splitting or merging of slices is done only for the purpose of socket assignment and to speed up data transfer; it does not affect the final chunk configuration computed in the prepare phase.
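Under the weight definition above (a product of transfer size and path latency), the two-step allocation (destination first, then source) reduces to handing out the global socket budget in proportion to the flow weights. A minimal Python sketch with hypothetical flow sizes and latencies is shown below; rounding is kept naive here, whereas the real scheduler must also reconcile rounding and guarantee at least one socket per nonempty flow.

```python
def wfs_socket_allocation(data, latency, total_sockets):
    """Weighted fair sharing (WFS) sketch: weight each (source, destination)
    flow by (data to transfer) x (path latency), then split the system-wide
    socket budget K in proportion to the weights."""
    weights = {flow: d * latency[flow] for flow, d in data.items() if d > 0}
    total_w = sum(weights.values())
    return {flow: max(1, round(total_sockets * w / total_w))
            for flow, w in weights.items()}

# Hypothetical flows: data in MB, latency in ms, K = 21 sockets in total.
data    = {("S1", "S4"): 400, ("S2", "S4"): 100, ("S3", "S5"): 200}
latency = {("S1", "S4"): 2,   ("S2", "S4"): 1,   ("S3", "S5"): 2}
print(wfs_socket_allocation(data, latency, total_sockets=21))
# {('S1', 'S4'): 13, ('S2', 'S4'): 2, ('S3', 'S5'): 6}
```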
Our approach above could have used estimates of available bandwidth instead of latency estimates. We chose the latter because (i) they can be measured with a lower overhead, (ii) they are more predictable over time, and (iii) they are correlated to the effective bandwidth.
We used the data set of Amazon reviews as our default collection [110]. Each data item had 10 fields. We chose product ID as the old shard key and userID as the new shard key; update operations used these two fields and a price field. Our default database size was 1 GB. (We later show scalability with data size.)
The default Morphus cluster used 10 machines, which included one mongos (front end) and three replica sets, each containing a primary and two secondaries. There were three config servers, each of which was colocated on a physical machine with a replica set primary; this is an allowed MongoDB installation. Each physical machine was a d710 Emulab node [111] with a 2.4 GHz processor, four cores, 12 GB RAM, two hard disks of 250 GB and 500 GB, and 64-bit CentOS 5.5, connected to a 100 Mbps LAN switch.
We implemented a custom workload generator that injects YCSB-like workloads via MongoDB's pymongo interface. Our default injection rate was 100 ops/s with 40% reads, 40% updates, and 20% inserts. To model realistic key access patterns, we selected keys for each operation via one of three YCSB-like [112] distributions: (i) Uniform (default), (ii) Zipf, and (iii) Latest. For the Zipf and Latest distributions, we employed a shape parameter α = 1.5. The workload generator ran on a dedicated pc3000 Emulab node with a 3 GHz processor, 2 GB of RAM, two 146 GB SCSI disks, and 64-bit Ubuntu 12.04 LTS.
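The generator itself is straightforward; a minimal pymongo-based sketch of the read/update/insert mix is shown below. The collection name, field names, and key space are illustrative only and are not those of our actual generator.

```python
import random
import numpy as np
from pymongo import MongoClient

coll = MongoClient("mongodb://localhost:27017")["testdb"]["reviews"]
keys = [str(k) for k in range(100000)]          # known key space

def pick_key(dist):
    if dist == "uniform":
        return random.choice(keys)
    if dist == "zipf":                          # shape parameter alpha = 1.5
        return keys[min(np.random.zipf(1.5) - 1, len(keys) - 1)]
    return random.choice(keys[-100:])           # "latest": favor recent inserts

def do_op(dist):
    r, k = random.random(), pick_key(dist)
    if r < 0.4:                                 # 40% reads
        coll.find_one({"userID": k})
    elif r < 0.8:                               # 40% updates
        coll.update_one({"userID": k}, {"$set": {"price": random.random()}})
    else:                                       # 20% inserts
        keys.append(str(len(keys)))
        coll.insert_one({"userID": keys[-1], "price": random.random()})
```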
Morphus was implemented in about 4000 lines of C++ code, which is publicly available at http://dprg.cs.uiuc.edu/downloads. Each plotted data point is an average of at least three experimental trials, shown along with standard deviation bars. Section 6.4.2.4 outlined two algorithms for the shard key change reconfiguration: Hungarian and greedy. We implemented both in Morphus, and call them variants Morphus-H and Morphus-G, respectively.
While we present only selected experimental results in this chapter, we refer the reader to Ref. [102] for extensive information on experiments and evaluation of our system.
A key goal of Morphus is to ensure the availability of the database during reconfiguration. To evaluate its success, we generated read and write requests and measured their latency while a reconfiguration was in progress. We used Morphus-G with a chunk-based migration scheme. We ran separate experiments for all the key access distributions and also for a read-only workload.
Table 6.1 lists the percentages of read and write requests that succeeded during reconfiguration. The number of writes that failed is low: For the Uniform and Zipf workloads, fewer than 2% of writes failed. We observe that many of the failed writes occurred during one of the write-throttling periods. Recall from Section 6.4.2.3 that the number of write-throttling periods equals the replica set size, with one throttle period at the end of each reconfiguration round. The Latest workload has a slightly higher failure rate, since a write to a particular key increases the likelihood that the same key will be written or read again in the near future. Still, the write failure rate of 3.2% and the read failure rate of 2.8% are reasonably low.
Table 6.1 Percentages of reads and writes that succeeded under reconfiguration.
Workload | Read (%) | Write (%)
Read only | 99.9 | —
Uniform | 99.9 | 98.5
Latest | 97.2 | 96.8
Zipf | 99.9 | 98.3
Overall, the availability numbers are high, at 99–99.9% for the Uniform and Zipf workloads, which is comparable to the numbers for a scenario with no insertions. We conclude that unless there is temporal and spatial (key-wise) correlation between writes and reads (i.e., in Latest-like workloads), read latency is not affected much by concurrent writes. When there is such correlation, Morphus mildly reduces the offered availability.
Going further, we plot in Figure 6.3a the CDF of read latencies for the four settings, and for a situation in which there was no reconfiguration (Uniform workload). Note that the horizontal axis uses a logarithmic scale. We consider latencies only for successful reads. We observe that the 96th-percentile latencies for all workloads are within a range of 2 ms. The median (50th-percentile) latency for No Reconfiguration is 1.4 ms, and this median holds for both the Read-only (no writes) and Uniform workloads. The medians for the Zipf and Latest workloads are lower, at 0.95 ms. This lowered latency has two causes: caching at the mongod servers for frequently accessed keys and, in the case of Latest, the lower percentage of successful reads. In Figure 6.3b, we plot the corresponding CDF for write latencies. The median for writes when there is no reconfiguration (Uniform workload) is similar to that of the other distributions.
We conclude that under reconfiguration, the read and write availability provided by Morphus is high (close to 99%), while latencies of successful writes degrade only mildly compared to those observed when there is no reconfiguration in progress.
First, Figure 6.4a shows the length of the execution phase (for a 500 MB Amazon collection) for two hierarchical topologies and five migration strategies. The topologies were (i) homogeneous, in which nine servers were distributed evenly across three racks; and (ii) heterogeneous, in which three racks contained six, two, and one servers, respectively. The switches were Emulab pc3000 nodes, and all links were 100 Mbps. The inter-rack and intra-rack latencies were 2 and 1 ms, respectively. The five strategies were (i) fixed sharing, with one socket assigned to each destination node; (ii) a chunk-based approach (see Section 6.4.2.3); (iii) Orchestra [21] with a matching socket budget; (iv) WFS with K = 21 (see Section 6.4.2.5); and (v) WFS with K = 28.
We observed that in the homogeneous cluster, the WFS strategy was 30% faster than fixed sharing and 20% faster than the chunk-based strategy. Compared to Orchestra, which weights flows only by their data size, WFS with the same socket budget does 9% better, because it takes network latency into account as well. Increasing K from 21 to 28 improved completion time in the homogeneous cluster but caused degradation in the heterogeneous cluster. The reason is that a higher K results in more TCP connections, and at K = 28 this began to cause congestion at the rack switch serving six servers.
Second, Figure 6.4b shows that Morphus's network-aware WFS strategy has a shorter tail and finishes earlier. Network awareness lowers the median chunk finish time by around 20% in both the homogeneous and heterogeneous networks.
We conclude that the WFS strategy improves performance compared to existing approaches, and K should be chosen to be as high as possible without leading to congestion.
In this experiment, we increased data and cluster size simultaneously such that the amount of data per replica set was constant. We ran this experiment on Google Cloud [113]. We used n1-standard-4 VMs, each with four virtual CPUs and 15 GB of memory. The disk capacity was 1 GB, and the VMs ran Debian 7. We generated a synthetic data set by randomly dispersing data items among new chunks. Morphus-H was used for reconfiguration with the WFS migration scheme, with K set equal to the number of old chunks.
Figure 6.5 shows a sublinear increase in reconfiguration time as data size and cluster size increased. Note that the x-axis uses a log scale. In the execution phase, all replica sets communicated among themselves to migrate data. As the number of replica sets increased with cluster size, the total number of connections increased, leading to network congestion. Thus, the execution phase took longer.
The amount of data per replica set affects reconfiguration time superlinearly, whereas cluster size has a sublinear impact. In this experiment, the amount of data per replica set was held constant, so the latter effect dominated, which explains the overall sublinear increase.
This section is based on Ref. [114], and we refer the reader to that publication for further details on design, implementation, and experiments.
In this section, we describe how to perform reconfigurations in ring-based key-value/NoSQL stores such as Cassandra [2], Riak [115], Dynamo [116], and Voldemort [117].
The techniques described for Morphus in Section 6.4.2 cannot be applied directly, for two reasons. First, ring-based systems place data strictly in a deterministic fashion around the ring (e.g., using consistent hashing), which determines which keys can be placed where. Thus, our optimal placement strategies from Morphus do not apply to ring-based systems. Second, unlike sharded systems (e.g., MongoDB), ring-based systems do not allow isolation of a set of servers for reconfiguration (a fact that Morphus leveraged). In sharded databases, each participating server exclusively owns a range of data (as master or slave). In ring-based stores, however, ranges of keys overlap across multiple servers in a chained manner (because a node and its successors on the ring are replicas), and this makes full isolation impossible.
That motivated us to build a new reconfiguration system oriented toward ring-based key-value/NoSQL stores. Our system, named Parqua,5 enables online and efficient reconfigurations in virtual-ring-based key-value/NoSQL systems. Parqua incurs no overhead when the system is not undergoing reconfiguration. During reconfiguration, Parqua minimizes the impact on read and write latency by performing reconfiguration in the background while responding to reads and writes in the foreground. It keeps the availability of data high during the reconfiguration and switches to the new configuration at an atomic switch point. Parqua is fault-tolerant, and its performance improves as cluster size increases. We have integrated Parqua into Apache Cassandra.
Parqua is applicable to any key-value/NoSQL store that satisfies the following assumptions. First, we assume a distributed key-value store that is fully decentralized, without the notion of a single master node or replica. Second, each node in the cluster must be able to deterministically choose the destination of the entries being moved because of the reconfiguration. This is necessary because there is no notion of a master in a fully decentralized key-value store, and all replicas of each entry should be preserved after the reconfiguration is finished. Third, we require the key-value store to use SSTables (Sorted String Tables) to persist entries durably; an SSTable is essentially an immutable sorted list of entries stored on disk [98]. Fourth, each write operation must carry a timestamp or version number that can be used to resolve conflicts. Finally, we assume that the operations issued are idempotent: insert, update, and read operations are supported, whereas nonidempotent operations such as counter increments are not.
Parqua runs reconfiguration in four phases. A graphical overview of the Parqua phases is given in Figure 6.6. Next, we discuss these individual phases in detail.
In this phase, the initiator node, in which the reconfiguration command is run, creates a new (and empty) column family (i.e., database table), denoted by Reconfigured CF (column family). It does so using a schema derived from the Original CF, except it uses the desired key as the new primary key. The Reconfigured CF enables reconfiguration to happen in the background while the Original CF continues to serve reads and writes using the old reconfiguration. We also record the timestamp of the last operation before the Reconfigured CF is created so that all operations that arrive while the execute phase is running can be applied later in the recovery phase.
The initiator node notifies all other nodes to start copying data from the Original CF to the Reconfigured CF. Read and write requests from clients continue to be served normally during this phase. At each node, Parqua iterates through all entries for which it is responsible and sends them to the appropriate new destination nodes. The destination node for an entry is determined by (i) hashing the new primary key-value on the hash ring, and (ii) using the replica number associated with the entry. Key-value pairs are transferred between corresponding nodes that have matching replica numbers in the old configuration and the new configuration.
For example, in the execute phase of Figure 6.6, the entry with the old primary key “1” and the new primary key “10” has replica number 1 at node A, 2 at B, and 3 at C. After the primary key is changed, the new position of the entry on the ring is between nodes C and D, where nodes D, E, and F are replica numbers 1, 2, and 3, respectively. Thus, in the execute phase, the copy of the entry at node A is sent to node D; similarly, the copy at B is sent to E, and the copy at C to F.
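A simplified sketch of this routing rule is shown below, assuming md5-based ring positions and a SimpleStrategy-like successor rule (Cassandra's actual partitioner and replication strategies differ in their details); the helper names are illustrative.

```python
import bisect
import hashlib

def ring_position(key):
    """Position of a key on the consistent-hashing ring."""
    return int(hashlib.md5(str(key).encode()).hexdigest(), 16)

def replicas(ring, key, rf):
    """The rf replica nodes for `key`: the successor of its ring position plus
    the next rf - 1 nodes clockwise. `ring` maps token -> node name."""
    tokens = sorted(ring)
    idx = bisect.bisect_right(tokens, ring_position(key)) % len(tokens)
    return [ring[tokens[(idx + k) % len(tokens)]] for k in range(rf)]

def parqua_destination(ring, old_key, new_key, rf, holder):
    """Execute-phase routing: the copy held at replica number r for the old
    primary key is sent to replica number r for the new primary key, so all
    replicas are preserved after reconfiguration."""
    r = replicas(ring, old_key, rf).index(holder)   # this node's replica number
    return replicas(ring, new_key, rf)[r]
```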
After the execute phase, the Reconfigured CF has the new configuration, and the entries from the Original CF have been copied to it. Now, Parqua atomically swaps both the schema and the SSTables between the Original CF and the Reconfigured CF. Write requests are locked during this phase, while reads continue to be served. To implement the SSTable swap, we leverage the fact that SSTables are maintained as files on disk, stored in a directory named after the column family; therefore, we simply move the SSTable files from one directory to the other. This causes negligible disk I/O, as moving the files updates only their inodes (directory metadata) rather than copying data.
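The file-level swap can be pictured as a three-way rename, sketched below with an illustrative directory layout (Cassandra's actual on-disk layout differs); renames within the same filesystem rewrite only directory metadata, so no table data is copied.

```python
from pathlib import Path

def swap_sstables(data_dir, original_cf, reconfigured_cf):
    """Exchange the SSTable files of two column families by renaming them
    across directories, using a temporary staging directory."""
    orig = Path(data_dir) / original_cf
    reconf = Path(data_dir) / reconfigured_cf
    staging = Path(data_dir) / (original_cf + ".swap")
    staging.mkdir()
    for f in orig.iterdir():
        f.rename(staging / f.name)      # Original CF -> staging
    for f in reconf.iterdir():
        f.rename(orig / f.name)         # Reconfigured CF -> Original CF
    for f in staging.iterdir():
        f.rename(reconf / f.name)       # staging -> Reconfigured CF
    staging.rmdir()
```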
At the end of the commit phase, the write lock is released at each node. At this point, all client-facing requests are processed according to the new configuration. In our case, the new primary key is now in effect, and the read requests must use the new primary key.
During this phase, the system catches up with the recent writes that were not transferred to the Reconfigured CF in the execute phase. Read/write requests are processed normally, with the difference that until the recovery is done, read requests may return stale results.6 At each node, Parqua iterates through the SSTables of the Original CF to recover the entries that were written during the reconfiguration. We limit the number of disk accesses required for recovery by iterating over only the SSTables that were created after the reconfiguration started. The iterated entries are routed to the appropriate destinations in the same way as in the execute phase.
Since all writes in Cassandra carry a timestamp [118], Parqua can ensure that the recovery of an entry does not overshadow newer updates, thus guaranteeing eventual consistency.
We used the Yahoo! Cloud Service Benchmark (YCSB) [112] to generate the data set and used the Uniform, Zipfian, and Latest key access distributions to generate CRUD workloads. Our default database size was 10 GB in all experiments. The operations consisted of 40% reads, 40% updates, and 20% inserts. Again, we present only selected experimental results in this chapter, and refer the reader to Ref. [114] for extensive information on experiments and evaluation of our system.
In this experiment, we measured the availability of our system during reconfiguration, shown in Table 6.2. The slight degradation in availability was due to the rejection of writes in the commit phase. The total duration of the unavailability was only a few seconds, which is orders of magnitude better than the current state of the art.
Table 6.2 Percentages of reads and writes that succeeded during reconfiguration.
Workload | Read (%) | Write (%)
Read only | 99.17 | —
Uniform | 99.27 | 99.01
Latest | 96.07 | 98.92
Zipfian | 99.02 | 98.92
The lowest availability was observed for the Latest distribution. The reason is that YCSB does not wait for the database to acknowledge an insert of a key; because of the distribution's temporal nature, subsequent reads and updates of a recently inserted key can fail while the insert is still in progress.
Figure 6.7a shows the CDF of read latencies under various workloads while reconfiguration is being executed. As a baseline, we also plot the CDF of read latency when no reconfiguration is being run using the Uniform key access distribution. We plot the latencies of successful reads only.
The median (50th-percentile) latencies for the read-only workload and the baseline are similar because both use the Uniform distribution; under reconfiguration, however, about 20% of the reads take longer. With writes in the workload, the observed latencies for the Uniform curve are higher overall.
Compared to the other workloads, Latest had the smallest median latency. Because of that workload's temporal nature, recently inserted keys were still present in Memtables, the in-memory data structures into which writes are buffered, so reads were faster than in the other distributions, which require disk accesses.
Overall, Parqua affects median read latency minimally across all the distributions. Our observations for write latency are similar; we refer the reader to our technical report [119] for more detail.
Next, we measured how well Parqua scales with (i) database size, (ii) cluster size, (iii) operation injection rate, and (iv) replication factor. For lack of space, we omit the plots for the last two experiments and refer the reader to our technical report [119] for them. To evaluate our system's scalability, we measured the total reconfiguration time along with the time spent in each phase. We did not inject operations during the experiments presented next.
Figure 6.7b depicts the reconfiguration times as the database size is increased up to 30 GB. Since we used a replication factor (number of copies of the same entry across the cluster) of 3 for fault tolerance, 30 GB of data implies 90 GB of total data in the database. In this plot, we observe that the total reconfiguration time scales linearly with database size. The bulk of the reconfiguration time is spent in transferring data in the execute phase.
In Figure 6.7c, we observe that the reconfiguration time decreases as the number of Cassandra peers increases. The decrease occurs because as the number of machines increases, the same amount of data, divided into smaller chunks, gets transferred by a larger number of peers. Again, the execute phase dominated the reconfiguration time.
Next, we describe a transparent way to scale-out and scale-in cloud computing applications. Scaling-out means increasing the number of machines (or VMs) running the application, and scaling-in means reducing the number. First, we tackle distributed stream processing systems in Section 6.5.1, where we describe our Stela system, which supports scale-out/scale-in and is implemented in Apache Storm [4]. Then, in Section 6.5.2, we address distributed graph processing systems, describing how to support scale-out/scale-in; our implementation is in LFGraph.
This section is based on Ref. [120], and we refer the reader to that paper for further details on design, implementation, and experiments.
The volume and velocity of real-time data require frameworks that can process large dynamic streams of data on the fly and serve results with high throughput. To meet this demand, several new stream processing engines have recently been developed that are now widely in use in industry, for example, Storm [4], System S [33], and Spark Streaming [121], among others [25,27,31]. Apache Storm is one of the most popular.
Unfortunately, these new stream processing systems used in industry lack the ability to scale the number of servers seamlessly and efficiently in an on-demand manner. On-demand means that the scaling is performed when the user (or some adaptive program) requests an increase or decrease in the number of servers in the application. Today, Storm handles an on-demand scaling request by simply unassigning all processing operators and then reassigning them in a round-robin fashion to the new set of machines. This approach is not seamless, as it interrupts the ongoing computation for a long time. Nor is it efficient, as it results in suboptimal throughput after the scaling is completed (as our experiments, discussed later, show).
Scaling-out and scaling-in are critical tools for customers. For instance, a user might start running a stream processing application with a given number of servers, but if the incoming data rate rises or if there is a need to increase the processing throughput, the user may wish to add a few more servers (scale-out) to the stream processing application. On the other hand, if the application is currently underutilizing servers, then the user may want to remove some servers (scale-in) in order to reduce the dollar cost (e.g., if the servers are VMs in AWS [122]).
On-demand scaling operations should meet two goals: (i) The post-scaling throughput (tuples per second) should be optimized, and (ii) the interruption to the ongoing computation (while the scaling operation is being carried out) should be minimized. We have created a new system, named Stela (STream processing ELAsticity), that meets those two goals. For scale-out, Stela carefully selects which operators (inside the application) are given more resources, and does so with minimal intrusion. Similarly, for scale-in, Stela carefully selects which machine(s) to remove in a way that minimizes the overall detriment to the application's performance.
To select the best operators to give more resources to when scaling out, Stela uses a new metric called ETP (effective throughput percentage). The key intuition behind ETP is to capture those operators (e.g., bolts and spouts in Storm) that both (i) are congested, that is, are being overburdened with incoming tuples, and (ii) affect application throughput the most, because they reach a large number of sink operators. For scale-in, we also use an ETP-based approach to decide which machine(s) to remove and where to migrate operator(s).
The ETP metric is both hardware- and application-agnostic. Thus, Stela needs neither hardware profiling (which can be intrusive and inaccurate) nor knowledge of application code.
The design of Stela is generic to any data flow system (see Section 6.5.1.2). For concreteness, we integrated Stela into Apache Storm. We compare Stela against the most closely related elasticity techniques in the literature [37]. We generated experimental results by using microbenchmark Storm applications, as well as production applications from industry (Yahoo! Inc. and IBM [29]). We believe our metric can be applied to other systems as well.
Our main contributions in this work are (i) development of the novel ETP metric, which captures the “importance” of an operator; (ii) to the best of our knowledge, the first description and implementation of on-demand elasticity within Storm; and (iii) evaluation of our system both on microbenchmark applications and on applications used in production.
We target distributed data stream processing systems that represent each application as a directed acyclic graph (DAG) of operators. An operator is a user-defined logical processing unit that receives one or more streams of tuples, processes each tuple, and outputs one or more streams of tuples. We assume operators are stateless and that tuple sizes and processing rates follow an ergodic distribution; these assumptions hold true for most Storm topologies used in industry. Operators that have no parents are sources of data injection; for example, they may read from a Web crawler. Operators with no children are sinks, and the intermediate operators perform processing of tuples. Each sink outputs data to a GUI or a database, and the application throughput is the sum of the throughputs of all sinks in the application. An application may have multiple sources and sinks.
An instance of an operator is an instantiation of the operator's processing logic and is the physical entity that executes the operator's logic. The number of instances is correlated with the operator's parallelism level. For example, in Storm, these instances are called “executors” [4].
In this section, we give an overview of how Stela supports scale-out. When a user requests a scale-out with a given number of new machines, Stela needs to choose the operators to which it will give more resources by increasing their parallelism.
Stela first identifies congested operators based on their input and processing rates, which it samples continuously for every operator in the graph. When the ratio of an operator's input rate to its processing rate exceeds a threshold, CongestionRate, we consider that operator to be congested. The CongestionRate parameter can be tuned as needed and controls the sensitivity of the algorithm. For our Stela experiments, we set CongestionRate to 1.2.
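As an illustration of this check, the following sketch flags an operator as congested when its sampled input rate exceeds CongestionRate times its processing rate. It is a simplification rather than Stela's actual code: the map-based bookkeeping and method names are ours, and an idle operator with pending input is also treated as congested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative congestion check (not Stela's actual code).
public class CongestionDetector {
    static final double CONGESTION_RATE = 1.2;   // threshold used in our experiments

    // inputRate and processingRate hold the latest sampled rates (tuples/s) per operator.
    public static List<String> congestedOperators(Map<String, Double> inputRate,
                                                  Map<String, Double> processingRate) {
        List<String> congested = new ArrayList<>();
        for (Map.Entry<String, Double> e : inputRate.entrySet()) {
            double in = e.getValue();
            double proc = processingRate.getOrDefault(e.getKey(), 0.0);
            // Congested when input outpaces processing by more than the threshold.
            if ((proc > 0 && in / proc > CONGESTION_RATE) || (proc == 0 && in > 0)) {
                congested.add(e.getKey());
            }
        }
        return congested;
    }
}
```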
After an operator is identified as congested, Stela calculates for it a per-operator metric called the ETP. ETP takes the topology into account: It captures the percentage of total application throughput (across all sinks) on which the operator has a direct impact, while ignoring all downstream paths in the topology that are already congested. Stela then selects the operator with the highest ETP, increases its parallelism, and iterates this process. To ensure load balance, the total number of such iterations equals the number of new machines added times the average number of instances per machine before scale-out. We determine the number of instances to allocate to each new machine as Ninstances = (total number of instances)/(number of machines); in other words, Ninstances is the average number of instances per machine prior to scale-out. This ensures load balance post-scale-out. The schedule of operators on existing machines is left unchanged.
To estimate the impact of each operator on the application throughput, Stela uses a new metric we developed called ETP. An operator's ETP is defined as the percentage of the final throughput that would be affected if the operator's processing speed were changed.
The ETP of an operator o is computed as ETPo = ThroughputEffectiveReachableSinks/Throughputworkflow.
Here, ThroughputEffectiveReachableSinks denotes the sum of the throughputs of all sinks reachable from o by at least one uncongested path, that is, a path consisting only of operators that are not classified as congested. Throughputworkflow denotes the sum of the throughputs of all sink operators of the entire application. The algorithm that calculates ETPs performs a depth-first search over the application DAG and computes each ETP via a postorder traversal; ProcessingRateMap stores the processing rates of all operators. Note that if an operator o has multiple parents, then the effect of o's ETP is the same at each of its parents (i.e., it is replicated, not split).
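The traversal can be sketched as follows. This is an illustrative recursion over an adjacency-list view of the topology, not the code used in Stela; the class, method, and parameter names are ours, and sinks are taken to be the operators with no children.

```java
import java.util.*;

// Illustrative sketch of the ETP calculation (not Stela's actual code).
// A congested child blocks its entire downstream subtree.
public class EtpCalculator {

    // Collect sinks reachable from 'op' through uncongested downstream operators only.
    static void collectEffectiveSinks(String op, Map<String, List<String>> children,
                                      Set<String> congested, Set<String> result) {
        List<String> kids = children.getOrDefault(op, List.of());
        if (kids.isEmpty()) {                         // 'op' itself is a sink
            result.add(op);
            return;
        }
        for (String child : kids) {
            if (congested.contains(child)) continue;  // ignore already-congested subtrees
            collectEffectiveSinks(child, children, congested, result);
        }
    }

    // ETP = (throughput of effectively reachable sinks) / (total sink throughput).
    static double etp(String op, Map<String, List<String>> children,
                      Map<String, Double> sinkThroughput, Set<String> congested) {
        Set<String> sinks = new HashSet<>();
        collectEffectiveSinks(op, children, congested, sinks);
        double reachable = sinks.stream().mapToDouble(s -> sinkThroughput.getOrDefault(s, 0.0)).sum();
        double total = sinkThroughput.values().stream().mapToDouble(Double::doubleValue).sum();
        return total == 0 ? 0.0 : reachable / total;
    }
}
```

Applied to the example in Figure 6.8, this recursion reproduces the values derived below (e.g., ETP3 = 40%).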
In Figure 6.8, we illustrate the ETP calculation with an example application.7 The processing rate of each operator is shown. The congested operators, that is, operators 1, 3, 4, and 6, are shaded. The total throughput of the workflow, Throughputworkflow = 5000 tuples/s, is the sum of the throughputs of the sink operators 4, 7, 8, 9, and 10.
Let us calculate the ETP of operator 3. Its reachable sink operators are 7, 8, 9, and 10. Of these, only 7 and 8 are considered to be the “effectively” reachable sink operators, as they are both reachable via an uncongested path. Thus, increasing the speed of operator 3 will improve the throughput of operators 7 and 8. However, operator 6 is not effectively reachable from operator 3, because operator 6 is already congested; thus, increasing operator 3's resources will only increase operator 6's input rate and make operator 6 further congested, without improving its processing rate. Thus, we ignore the subtree of operator 6 when calculating 3's ETP. The ETP of operator 3 is ETP3 = (1000 + 1000)/5000 = 40%.
Similarly, for operator 1, the sink operators 4, 7, 8, 9, and 10 are reachable, but none of them are reachable via an uncongested path. Thus, the ETP of operator 1 is 0. Likewise, we can calculate the ETP of operator 4 as 40% and the ETP of operator 6 as 20%. Therefore, the priority order in which Stela will assign resources to these operators is 3, 4, 6, 1.
During each iteration, Stela calculates the ETP for all congested operators. Stela targets the operator with the highest ETP and increases the parallelism of that operator by assigning a new instance of it at the newly added machine. If multiple machines are being added, then the target machine is chosen in a round-robin manner. Overall, this algorithm runs Ninstances iterations to select Ninstances target operators. (Section 6.5.1.3 showed how to calculate Ninstances.)
In each iteration, Stela constructs a CongestedMap to store all congested operators. If there are no congested operators in the application, Stela chooses a source operator as a target in order to increase the input rate of the entire application. If congested operators do exist, Stela finds the ETP of each one using the algorithm discussed in Section 6.5.1.4; the results are stored, sorted by ETP, in an ETPMap. Stela chooses the operator with the highest ETP value in ETPMap as the target for the current iteration and increases its parallelism by assigning it one additional instance on one of the new machines, chosen in a round-robin manner.
For the next iteration, Stela estimates the processing rate of the previously targeted operator o proportionally, that is, if o previously had an output rate E and k instances, then o's new projected processing rate is E × (k + 1)/k. This is a reasonable approach since all machines have the same number of instances and thus proportionality holds. (This approach may not be accurate, but we find that it works in practice.) Then Stela uses the projected processing rate to update the output rate for o, and the input rates for o's children. (The processing rates of o's children, and indeed o's grand-descendants, do not need updates, as their resources remain unchanged.) Stela updates the emit rate of the target operator in the same proportional manner, so that the projected rates can be propagated downstream.
Once it has done that, Stela recalculates the ETPs of all operators again using the same algorithm. We call these new ETPs projected ETPs, or PETPs, because they are based on estimates. The PETPs are used as ETPs for the next iteration. Iterations are performed until all available instance slots at the new machines are filled. Once that procedure is complete, the schedule is committed through starting of the appropriate executors on new instances.
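Putting the pieces together, one scale-out planning pass might look like the following sketch. It is not the actual implementation: the names and string-based assignment records are ours, and the recomputation of projected ETPs (PETPs) between iterations, which Stela performs, is only indicated by a comment.

```java
import java.util.*;

// Illustrative sketch of Stela's scale-out iterations (not the actual implementation).
public class ScaleOutPlanner {
    public static List<String> plan(Map<String, Double> etpOfCongestedOps,
                                    List<String> sourceOps,
                                    List<String> newMachines,
                                    int nInstances) {
        List<String> assignments = new ArrayList<>();   // "operator -> machine" decisions
        for (int i = 0; i < nInstances; i++) {
            String target = null;
            double best = -1.0;
            // Pick the congested operator with the highest (projected) ETP.
            for (Map.Entry<String, Double> e : etpOfCongestedOps.entrySet()) {
                if (e.getValue() > best) { best = e.getValue(); target = e.getKey(); }
            }
            if (target == null) {
                // No congested operators: parallelize a source to raise the input rate.
                target = sourceOps.get(i % sourceOps.size());
            }
            String machine = newMachines.get(i % newMachines.size());  // round-robin over new machines
            assignments.add(target + " -> " + machine);
            // A full implementation would now recompute projected ETPs (PETPs)
            // for the next iteration, using the proportional rate estimate.
        }
        return assignments;
    }
}
```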
The algorithm involves searching for all reachable sinks for every congested operator; as a result, each iteration of Stela has a running time complexity of O(n²), where n is the number of operators in the workflow. The entire algorithm therefore has a running time complexity of O(m · n²), where m is the number of new instance slots at the new workers.
For scale-in, we assume that the user specifies only the number of machines to be removed and Stela picks the “best” machines from the cluster to remove. (If the user specifies the exact machines to remove, the problem is no longer challenging.) We describe how techniques used for scale-out, particularly the ETP metric, can also be used for scale-in. For scale-in, we calculate the ETP not merely per operator but per machine in the cluster. That is, we first calculate the ETPSum for each machine k, as ETPSum(k) = Σ over all instances τi assigned to machine k of ETP(operator(τi)).
The scale-in procedure is called iteratively, as many times as the number of machines the user asked Stela to remove. The procedure calculates the ETPSum for every machine in the cluster and puts the machine and its corresponding ETPSum into the ETPMachineMap. The ETPSum for a machine is the sum of the ETPs of the instances of all operators that currently reside on the machine. Thus, for every instance τi, we first find the operator of which τi is an instance (say, operator o), and then find the ETP of operator o. Then, we sum all of these ETPs. The ETPSum of a machine is thus an indication of how much the instances executing on that machine contribute to the overall throughput.
The ETPMachineMap is sorted by increasing order of ETPSum values. The machine with the lowest ETPSum will be the target machine to be removed in this round of scale-in. Operator migration to machines with lower ETPSums will have less of an effect on the overall performance, since machines with lower ETPSums contribute less to the overall performance. This approach also helps shorten the amount of downtime the application experiences because of the rescheduling. Operators from the machine that is chosen to be removed are reassigned to the remaining machines in the cluster in a round-robin fashion in increasing order of their ETPSums.
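A minimal sketch of this machine selection, assuming per-operator ETPs have already been computed and omitting the iterative recomputation between removals, might look as follows (the class, method, and parameter names are ours):

```java
import java.util.*;
import java.util.stream.Collectors;

// Illustrative scale-in selection via ETPSum (not the actual implementation).
public class ScaleInPlanner {
    // instancesOnMachine: machine -> operator name of each instance hosted there.
    public static List<String> machinesToRemove(Map<String, List<String>> instancesOnMachine,
                                                Map<String, Double> etp,
                                                int machinesRequested) {
        Map<String, Double> etpSum = new HashMap<>();
        for (Map.Entry<String, List<String>> e : instancesOnMachine.entrySet()) {
            double sum = 0.0;
            for (String operator : e.getValue()) {
                sum += etp.getOrDefault(operator, 0.0);   // ETPSum = sum over hosted instances
            }
            etpSum.put(e.getKey(), sum);
        }
        return etpSum.entrySet().stream()
                .sorted(Map.Entry.comparingByValue())      // lowest ETPSum removed first
                .limit(machinesRequested)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```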
After the schedule is created, Stela commits it by migrating operators from the selected machines, and then releases these machines. The scale-in algorithm involves sorting of ETPSum, which results in a running time complexity of O(nlog(n)).
Stela runs as a custom scheduler, implemented in a Java class that implements Storm's predefined IScheduler interface. A user can specify which scheduler to use in a YAML-formatted configuration file called storm.yaml; a minimal example is shown below. Our scheduler runs as part of the Storm Nimbus daemon. The architecture of Stela's implementation in Storm is presented in Figure 6.9 and consists of three modules.
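Storm's standard mechanism for plugging in a custom scheduler is the storm.scheduler entry in storm.yaml on the Nimbus node; the fully qualified class name below is a placeholder, not necessarily the one used in our implementation.

```yaml
# storm.yaml on the Nimbus node (class name is illustrative)
storm.scheduler: "edu.illinois.stela.ElasticityScheduler"
```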
When a scale-in or scale-out signal is sent by the user to the ElasticityScheduler, a procedure is invoked that detects newly joined machines based on previous membership. The ElasticityScheduler invokes the Strategy module, which calculates the entire new scheduling; for example, for scale-out, it decides on all the newly created executors that need to be assigned to newly joined machines. The new scheduling is then returned to the ElasticityScheduler, which atomically (at the commit point) changes the current scheduling in the cluster. Computation is thereafter resumed.
When no scaling is occurring, failures are handled the same way as in Storm, that is, Stela inherits Storm's fault tolerance. If a failure occurs during a scaling operation, Stela's scaling will need to be aborted and restarted. If the scaling is already committed, failures are handled as in Storm.
Our evaluation is two-pronged, and includes use of both microbenchmark topologies and real topologies (including two from Yahoo!). We adopted this approach because of the absence of standard benchmark suites (e.g., TPC-H or YCSB) for stream processing systems. Our microbenchmarks include small topologies such as star, linear, and diamond, because we believe that most realistic topologies will be a combination of these. We also use two topologies from Yahoo! Inc., which we call the Page Load topology and Processing topology, as well as a Network Monitoring topology [29]. In addition, we present a comparison among Stela, the Link Load Strategy [37], and Storm's default scheduler (which is state of the art). We present only selected experimental results in this chapter and refer the reader to Ref. [120] for extensive information on the experiments and evaluation of our system.
For our evaluation, we used two types of machines from the Emulab [111] test bed to perform our experiments. Our typical Emulab setup consisted of a number of machines running Ubuntu 12.04 LTS images, connected via a 100 Mbps VLAN. A type 1 machine had one 3 GHz processor, 2 GB of memory, and 10,000 RPM 146 GB SCSI disks. A type 2 machine had one 2.4 GHz quad-core processor, 12 GB of memory, and 750 GB SATA disks. The settings for all topologies tested are listed in Table 6.3. For each topology, the same scaling operations were applied to all strategies.
Table 6.3 Experiment settings and configurations.
Topology type | Number of tasks per component | Initial number of executors per component | Number of worker processes | Initial cluster size | Cluster size after scaling | Machine type |
Page Load | 8 | 4 | 28 | 7 | 8 | 1 |
Network | 8 | 4 | 32 | 8 | 9 | 2 |
Page Load Scale-in | 15 | 15 | 32 | 8 | 4 | 1 |
We obtained the layout of a topology in use at Yahoo! Inc. We refer to this topology as the Page Load topology (which is not its original name). The layout of the Page Load topology is displayed in Figure 6.10a and the layout of the Network Monitoring topology, which we derived from Ref. [29], is displayed in Figure 6.10b.
We examine the performance of three scale-out strategies: default, Link-based [37], and Stela. The throughput results are shown in Figure 6.11. Recall that link-load-based strategies reduce the network latency of the workflow by colocating communicating tasks on the same machine.
From Figure 6.11, we observe that Stela improves the throughput by 80% after a scale-out of the Page Load topology. In comparison, the Least Link Load strategy barely improves the throughput after a scale-out, because migration of tasks that are not resource-constrained will not significantly improve performance. The default scheduler actually decreases the throughput after the scale-out, since it simply unassigns all executors and reassigns them in a round-robin fashion to all machines, including the new ones. That may cause machines with “heavier” bolts to be overloaded, thus creating newer bottlenecks that damage performance, especially for topologies with a linear structure. In comparison, Stela's postscaling throughput is about 80% better than Storm's for both the Page Load and Network Monitoring topologies, indicating that Stela is able to find the most congested bolts and paths and give them more resources.
In addition to the Page Load and Network Monitoring topologies, we also looked at a published application from IBM [29], and we wrote from scratch a similar Storm topology (shown in Figure 6.10b). When we increased the cluster size from 8 to 9, our experimental results (Figure 6.11b) show that Stela improves the throughput by 21% by choosing to parallelize the congested operator closest to the sink. The Storm default scheduler does not improve post-scale throughput, and the Least Link Load strategy decreases system throughput.
We measured the interruption to ongoing computation by measuring the convergence time, that is, the duration between the start of a scale-out operation and the stabilization of the overall throughput of the Storm topology. More specifically, we consider the throughput to have stabilized when (i) it has oscillated twice above and twice below the average post-scale-out throughput, and (ii) the oscillation is within a small standard deviation of 5%. Thus, a lower convergence time means that the system is less intrusive during the scale-out operation and can resume meaningful work earlier.
Figure 6.12a shows the convergence time for the Yahoo! topology. We observe that Stela is far less intrusive than Storm (with an 88% lower convergence time) when scaling out. The main reason why Stela has a better convergence time than either Storm's default scheduler or the Least Link Load strategy [37] is that Stela (unlike the other two) does not change the current scheduling at existing machines, instead choosing to schedule operators at the new machines only.
In the Network Monitoring topology, Stela experiences longer convergence times than Storm's default scheduler and the Least Link Load strategy, because of re-parallelization during the scale-out operation (Figure 6.12b). On the other hand, Stela provides the benefit of higher post-scale throughput, as shown in Figure 6.11b.
We examined the performance of Stela scale-in by running Yahoo's Page Load topology. The initial cluster size was 8, and Figure 6.13a shows how the throughput changed after the cluster size shrank to four machines. (We initialized the operator allocation so that each machine could be occupied by tasks from fewer than two operators (bolts and spouts).) We compared the performance against that of a round-robin scheduler (the same as Storm's default scheduler), using two alternative groups of randomly selected machines.
We observe that Stela preserved throughput after scale-in, while the two Storm groups experienced 80% and 40% decreases in throughput, respectively. Thus, Stela's post-scale-in throughput is 25× higher than that obtained when the machines to remove are chosen randomly. Stela also achieved 87.5% and 75% less downtime (time during which the throughput is zero) than group 1 and group 2, respectively (see Fig. 6.13b). The main reason is that Stela's migration of operators with low ETPs will intrude less on the application, which will allow downstream congested components to digest tuples in their queues and continue producing output. In the Page Load topology, the two machines with the lowest ETPs were chosen for redistribution by Stela, and that resulted in less intrusion for the application and, thus, significantly better performance than Storm's default scheduler.
Therefore, Stela is intelligent at picking the best machines to remove (via ETPSum). In comparison, Storm has to be lucky. In the above scenario, two out of the eight machines were the “best.” The probability that Storm would have been lucky enough to pick both (when it picks 4 machines at random out of 8) is C(6,2)/C(8,4) = 15/70 ≈ 0.21, which is low.
This section is based on Ref. [123], and we refer the reader to that publication for further details on design, implementation, and experiments.
Large graphs are increasingly common; examples include online social networks such as Twitter and Facebook, Web graphs, Internet graphs, biological networks, and many others. Processing and storing these graphs in a single machine is not feasible. Google's Pregel [124] and GraphLab [125] were the first attempts at processing these graphs in a distributed way. Subsequently, the research community has developed more efficient engines that adopt the vertex-centric approach for graph processing, such as LFGraph [126], PowerGraph [48], and GPS [52].
Today's graph processing frameworks operate on statically allocated resources; the user must decide on resource requirements before an application starts. However, partway through computation, the user may wish to scale-out (e.g., to speed up computation) or scale-in (e.g., to reduce hourly costs). The capability to scale-out/scale-in when required by the user is called on-demand elasticity. Alternatively, an adaptive policy may request scale-out or scale-in.8 Such a concept has been explored for data centers [38,39], cloud systems [40–42], storage systems [13,43–45,127], and data processing frameworks such as Hadoop [46,47] and Storm [37]. However, on-demand elasticity remains relatively unexplored in batch-distributed graph processing systems.
Partitioning techniques have been proposed to optimize computation and communication [48], but they partition the entire graph across servers and are thus applicable only at the start of the graph computation. On-demand elasticity requires an incremental approach to (re-)partitioning vertices on demand. Solving the problem of on-demand elasticity is also the first step toward adaptive elasticity (e.g., satisfying an SLA in a graph computation), for which our techniques may be employed as black boxes.
A distributed graph processing system that supports on-demand scale-out/scale-in must overcome three challenges.
Our approach to solving the problem of on-demand elasticity and overcoming the above challenges is motivated by two critical questions: (i) which vertices should be migrated (and to which servers), and (ii) when should they be migrated so as to minimize interference with the ongoing computation?
To answer the first question, we created and analyzed two techniques. The first, called contiguous vertex repartitioning (CVR), achieves load balance across servers. However, it may result in high overhead during the scale-out/scale-in operation. Thus, we developed a second technique, called ring-based vertex repartitioning (RVR), that relies on ring-based hashing to lower the overhead. To address the second question, of when to migrate, we integrated our techniques into the LFGraph graph processing system [126], and used our implementation to carefully decide when to begin and end background migration, and when to migrate static versus dynamic data. We also use our implementation to explore system optimizations that make migration more efficient.
We performed experiments with multiple graph benchmark applications on a real Twitter graph with 41.65 million vertices and 1.47 billion edges. Our results indicate that our techniques are within 9% of an optimal mechanism for scale-out operations and within 21% for scale-in operations.
In this section, we address the question of which vertices to migrate when the user requests a scale-out/scale-in operation.
Our first technique assumes that the hashed vertex space is divided into as many partitions as there are servers, and each server is assigned one partition. Partitions are equisized in order to accomplish load balancing. The top of Figure 6.14a shows an example graph containing 100 vertices, split across 4 servers. The vertex sequence (i.e., V1, V2, …, V100) is random but consistent because of our use of consistent hashing; it is split into four equisized partitions, which are then assigned to servers S1–S4 sequentially.
Upon a scale-out/scale-in operation, the key problem we need to solve is, how do we assign the (new) equisized partitions to servers (one partition per server), such that network traffic volume is minimized? For instance, the bottom of Figure 6.14 shows the problem when scaling out from four to five servers. To solve this problem, we now (i) show how to reduce the problem to one of graph matching, and (ii) propose an efficient heuristic.
When we scale-out/scale-in, we repartition the vertex sequence into equisized partitions. Assigning these new partitions in an arbitrary fashion to servers may be suboptimal and involve transfer of large amounts of vertex data across the network. For instance, in the bottom of Figure 6.14a, we scale-out by adding one server, resulting in five new partitions. Merely adding the new server to the end of the server sequence and assigning partitions to servers in that order results in movement of 50 total vertices. On the other hand, Figure 6.14b shows the optimal solution for this example, wherein adding the new server in the middle of the partition sequence results in movement of only 30 vertices.
To achieve the optimal solution, we consider the scale-out problem formally. (The solution for scale-in is analogous and excluded for brevity.) Let the cluster initially have N servers S1,…, SN. With a graph of V vertices, the initial size of each partition Pi is V/N, where 1 ≤ i ≤ N. Each jth vertex ID is hashed, and the resulting value is used to assign the vertex to the partition Pi whose contiguous range contains that hash value. If we add k servers to this cluster, the size of each new partition becomes V/(N + k). We label these new partitions P'i, 1 ≤ i ≤ N + k, and, as usual, assign each jth vertex to the new partition P'i whose range contains the hash of the vertex ID.
Next, we create a bipartite graph B, which contains (i) a left set of vertices, with one vertex per new partition P'i, and (ii) a right set of vertices, with one vertex per server Sj. The left and right sets each contain (N + k) vertices. The result is a complete bipartite graph, with the edge joining a partition P'i and a server Sj associated with a cost. The cost is equal to the number of vertices that must be transferred over the network if partition P'i is assigned to server Sj after scale-out; in other words, the cost equals |P'i| minus the number of P'i's vertices already present at Sj (for a newly added server, this is simply |P'i|).
The problem of minimizing network transfer volume now reduces to that of finding a minimum-cost perfect matching in B. This is a well-studied problem, and an optimal solution can be obtained by using the Hungarian algorithm [128]. However, the Hungarian algorithm has O(N³) complexity [129], which may be prohibitive for large clusters.
As a result, we propose a greedy algorithm that iterates sequentially through S1,…, SN, in that order.9 For each server Si, the algorithm considers the new partitions with which Si's old partition has a nonzero overlap; because of the contiguity of partitions, there are only O(1) such partitions. Among these partitions, the one with the largest number of vertices overlapping with Si's old partition is assigned to Si. Because of the linear order of traversal, when Si is considered, it is guaranteed to have at least one (overlapping) candidate partition available. This makes the greedy algorithm run efficiently, in O(N) time. For example, in Figure 6.14b, to determine the new partition for S1, we need to consider only two partitions, P'1 and P'2; next, for S2, we need to consider partitions P'2 and P'3, and so on.
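The following sketch shows the greedy assignment under the simplifying assumption that both old and new partitions are contiguous, equisized ranges over a hashed vertex space [0, V); it is illustrative only, and the index arithmetic and fallback guard are ours rather than the paper's exact formulation. On the 100-vertex, 4-to-5-server example of Figure 6.14, it reproduces the assignment that moves only 30 vertices.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative greedy CVR assignment (not the exact implementation). Old server i
// holds the contiguous range [i*V/N, (i+1)*V/N) of the hashed vertex space, and new
// partition j covers [j*V/(N+k), (j+1)*V/(N+k)).
public class CvrGreedy {
    static long overlap(long a1, long a2, long b1, long b2) {
        return Math.max(0, Math.min(a2, b2) - Math.max(a1, b1));
    }

    // Returns: new partition index -> server index (0..N-1 are the old servers,
    // N..N+k-1 the newly added ones).
    public static Map<Integer, Integer> assign(long V, int N, int k) {
        int M = N + k;
        Map<Integer, Integer> assignment = new HashMap<>();
        boolean[] taken = new boolean[M];
        for (int i = 0; i < N; i++) {                          // old servers, in linear order
            long oldStart = i * V / N, oldEnd = (i + 1) * V / N;
            // Contiguity: only O(1) new partitions can overlap this old partition.
            int jStart = (int) (oldStart * M / V);
            int jEnd = (int) Math.min(M - 1, (oldEnd - 1) * M / V);
            int best = -1;
            long bestOverlap = 0;
            for (int j = jStart; j <= jEnd; j++) {
                if (taken[j]) continue;
                long ov = overlap(oldStart, oldEnd, (long) j * V / M, (long) (j + 1) * V / M);
                if (ov > bestOverlap) { bestOverlap = ov; best = j; }
            }
            if (best >= 0) {                                    // guaranteed by the traversal order
                taken[best] = true;
                assignment.put(best, i);
            }
        }
        int next = N;                                           // leftover partitions go to new servers
        for (int j = 0; j < M; j++) {
            if (!taken[j]) assignment.put(j, next++);
        }
        return assignment;
    }
}
```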
In this technique, we assume an underlying hash-based partitioning that leverages Chord-style consistent hashing [127]. To maintain load balance, servers are not hashed directly to the ring; instead (as in Cassandra [2] and Riak [115]), we assume that each server is assigned an equisized segment of the ring. Specifically, a server with ID ni is responsible for the vertices hashed into the interval (ni-1, ni], where ni-1 is ni's predecessor.
Under that assumption, performing a scale-out/scale-in operation is straightforward: a joining server splits a segment with its successor, while a leaving server gives up its segment to its successor. For instance, in a scale-out operation involving one server, the affected server receives its set of vertices from its successor in the ring; that is, a joining server ni takes the set of vertices in (ni-1, ni] from its successor ni+1. Scale-in operations occur symmetrically: a leaving server ni migrates its vertex set to its successor ni+1, which then becomes responsible for the set of vertices in (ni-1, ni+1].
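A minimal sketch of such a ring, using a sorted map of tokens to servers, is shown below; the class and method names are ours, and a real system would also handle virtual nodes and replication.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative ring-based vertex repartitioning (RVR) helper. Each server owns the
// segment between its predecessor's token (exclusive) and its own token (inclusive),
// so a join or leave touches only one neighbor on the ring.
public class HashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>(); // token -> server id

    public void addServer(long token, String serverId) {
        // The newcomer implicitly takes over (predecessorToken, token] from its successor;
        // only that successor must ship vertices to it.
        ring.put(token, serverId);
    }

    public void removeServer(long token) {
        // The departing server's segment is absorbed by its successor, which becomes
        // responsible for (predecessorToken, successorToken].
        ring.remove(token);
    }

    public String ownerOf(long vertexHash) {
        Map.Entry<Long, String> e = ring.ceilingEntry(vertexHash);
        return (e != null) ? e.getValue() : ring.firstEntry().getValue(); // wrap around
    }
}
```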
More generally, we can state that a scale-out/scale-in operation that involves simultaneous addition or removal of k servers affects at most k existing servers. If some of the joining or leaving servers have segments that are adjacent, the number of servers affected would be smaller than k.
While the technique is minimally invasive to existing servers and the ongoing graph computation, it may result in load imbalance. We can mitigate load imbalance for the scale-out case by choosing intelligently the point on the ring to which the new server(s) should be added. For the scale-in case, we can intelligently decide which server(s) to remove.
Consider a cluster with N servers, each with V/N vertices. If we use CVR to add m × N servers or to remove (m/(m + 1)) × N servers (for m ≥ 1), then the optimal positions of the servers to be added or removed are the same as their positions with RVR.
Given the knowledge of which vertices must be migrated and to where, we must now decide when to migrate them in a way that minimizes interference with normal execution. Two types of data need to be migrated between servers: (i) static data, including sets of vertex IDs, neighboring vertex IDs, and edge values to neighbors, and (ii) dynamic data, including the latest values of vertices and latest values of neighbors. Static data correspond to graph partitions, while dynamic data represent computation state. Once this migration is complete, the cluster can switch to the new partition assignment.
LFGraph uses a publish–subscribe mechanism. Before the iterations start, each server subscribes to in-neighbors of the vertices hosted by the server. Based on these subscriptions, each server builds a publish list for every other server in the cluster. After each iteration, servers send updated values of the vertices present in the publish lists to the respective servers. After a scale-out/scale-in operation, we perform the publish–subscribe phase again to update the publish lists of servers.
A first-cut approach is to perform migration of both static and dynamic data during the next available barrier synchronization interval. However, when we implemented this approach, we found that it added significant overheads by prolonging that iteration. As a result, we introduced two further optimizations as follows:
This technique is based on the observation that static data can be migrated in the background while computation is going on. Recall that static data consists of vertex IDs, their neighboring vertex IDs, and edge values to neighbors. Only dynamic data (vertex values and neighboring vertex values) need to wait to be migrated during a barrier synchronization interval (i.e., after such data are last updated). This reduces the overhead on that iteration.
LFGraph has two barrier synchronization intervals. One interval is between the gather and scatter phases, and the other is after the scatter phase. That gives us two options for the transfer of dynamic data. We choose to perform dynamic data transfer and cluster reconfiguration in the barrier synchronization interval between the gather and scatter phases. This enables us to leverage the default scatter phase to migrate neighboring vertex values. The scatter phase simply considers the new set of servers in the cluster while distributing updated vertex values. This optimization further reduces the overhead on the iteration.
A scale-out/scale-in operation that starts in iteration i ends in iteration i + 2. Background static data migration occurs in iterations i and i + 1, while vertex value migration occurs after the gather phase of iteration i + 2. At that point, computation continues on the new set of servers. The performance impact due to background data migration is greater in iteration i than in iteration i + 1, that is, iteration times are longer in iteration i. The reason is that a majority of the migration happens in iteration i. In iteration i + 1 servers build their new subscription lists for the publish–subscribe phase.
To explain further, we will describe the steps involved in a scale-out as follows: (i) The joining server sends a Join message containing its IP address and port to the barrier server at the start of iteration i. (ii) The barrier server responds with a Cluster Info message assigning the joining server an ID and the contact information of the servers from which it should request its vertices. (iii) In addition, the barrier server sends an Add Host message to all servers, informing them about the new server in the cluster. (iv) The joining server requests its vertices with a Vertex Request message. (v) After receiving its vertices, it informs the barrier server with a Ready message that it can join the cluster. Concurrently, the servers start updating their subscription lists to reflect the modifications in the cluster servers. (vi) The barrier server sends a Reconfigure message to the servers in the synchronization interval after the gather phase of iteration i + 2. (vii) Upon receiving the Reconfigure message, joining servers request the vertex values with a Vertex Value Request message. In addition, all servers update their vertex-to-server mapping to reflect newly added servers. (viii) The scatter phase of iteration i + 2 executes with this new mapping. From then on, computation proceeds on the new set of servers.
In our repartitioning techniques, the barrier server accepts join and leave requests and determines an optimal partition assignment. We adopted this approach, instead of a fully decentralized reassignment, for two reasons: (i) fully decentralized reassignment may lead to complex race conditions, and (ii) the barrier server, once initialized, has the capability to obtain per-server iteration run times via the barrier synchronization messages and assigns new servers to alleviate the load on the busiest servers.
In this section, we describe our experimental evaluation of the efficiency and overhead of our elasticity techniques. We present only selected experimental results in this chapter and refer the reader to Ref. [123] for extensive information on experiments and evaluation of our system.
We performed our experiments with both our CVR and RVR techniques on virtual instances, each with 16 GB RAM and 8 VCPUs. We used a Twitter graph [130] containing 41.65 million vertices and 1.47 billion edges. (With larger graphs, we expect similar performance improvements.) We evaluated our techniques using five graph benchmarks: PageRank, single-source shortest paths (SSSP), connected components, k-means clustering, and multiple-source shortest paths (MSSP).
Our first set of experiments measured the overhead experienced by the computation because of a scale-out operation. Figure 6.15 illustrates two experiments in which a scale-out from X servers to 2X servers (for X ∈ {5, 10, 15}) was performed, with the scale-out starting at iteration i = 1 and ending at iteration 3. The vertical axis plots the per-iteration run time. For comparison, we plot the per-iteration times for a run with X servers throughout, and a run with 2X servers throughout.
In Figure 6.15a–c, we can observe that (i) both CVR and RVR appear to perform similarly, and (ii) after the scale-out operation is completed, the performance of the scaled-out system converges to that of a cluster with 2X servers, demonstrating that our approaches converge to the desired throughput after scale-out.
Similarly, Figure 6.16 shows the plots for scale-in from 2X servers to X servers (for X ∈ {5, 10, 15}). Once again, the cluster converges to the performance of X servers.
This section is based on Ref. [131], and we encourage the reader to refer to that publication for further details on design, implementation, and experiments.
Today, computation clusters running engines such as Apache Hadoop [3,132], DryadLINQ [87], DOT [133], Hive [134], and Pig Latin [135] are used to process a variety of big data sets. The batch MapReduce jobs in these clusters have priority levels or deadlines. For instance, a job with a high priority (or short deadline) may be one that processes click-through logs and differentiates ads that have reached their advertiser targets from ads that it would be good to display. For such jobs, it is critical to produce timely results, since they directly affect revenue. On the other hand, a lower priority (or long-deadline) job may, for instance, identify more lucrative ad placement patterns via a machine learning algorithm on long-term historical click data. Such jobs affect revenue indirectly and therefore need to complete soon, but they must be treated as lower priority.
The most common use case is a dual-priority setting, with only two priority levels: high-priority jobs and low-priority jobs. We call the high-priority jobs production jobs and the low-priority ones research jobs.10 A popular approach among organizations is to provision two physically separate clusters: one for production jobs and one for research jobs. Administrators tightly restrict the workloads allowed on the production cluster, perform admission control manually based on deadlines, keep track of deadline violations via alert systems such as pagers, and subsequently readjust job and cluster parameters manually.
In addition to requiring intensive human involvement, the above approach suffers from (i) long job completion times, and (ii) inefficient resource utilization. For instance, jobs in an overloaded production cluster might take longer, even though the research cluster is underutilized (and vice versa). In fact, MapReduce cluster workloads are time-varying and unpredictable, for example, in the Yahoo! Hadoop traces we used in the work described here, hourly job arrival rates exhibited a max–min ratio as high as 30. Thus, there are times when the cluster is resource-constrained, that is, it has insufficient resources to meet incoming demand. Since physically separate clusters cannot reclaim resources from each other, the infrastructure's overall resource utilization stays suboptimal.
The goals of the work described here are (i) to run a consolidated MapReduce cluster that supports all jobs, regardless of their priority or deadline; (ii) to achieve low completion times for higher priority jobs; and (iii) to do so while still optimizing the completion times of lower priority jobs. The benefits are high cluster resource utilization, and, thus, reduced capital and operational expenses.
Natjam11 achieves the above goals, and we have integrated it into the Hadoop YARN scheduler (Hadoop 0.23). Natjam's first challenge is to build a unified scheduler for all job priorities and deadlines in a way that fluidly manages resources among a heterogeneous mix of jobs. When a higher priority job arrives in a full cluster, today's approaches involve either killing lower priority jobs' tasks [59,84] or waiting for them to finish [83]. The former approach prolongs low-priority jobs because they repeat work, while the latter prolongs high-priority jobs. Natjam solves those problems by using an on-demand checkpointing technique that saves the state of a task when it is preempted, so that it can resume where it left off when resources become available. This checkpointing is fast, inexpensive, and automatic in that it requires no programmer involvement.
Natjam's second challenge is to enable quick completion of high-priority jobs, but not at the expense of extending many low-priority jobs' completion times. Natjam addresses this by leveraging smart eviction policies that select which low-priority jobs and their constituent tasks are affected by arriving high-priority jobs. Natjam uses a two-level eviction approach: It first selects a victim job (via a job eviction policy) and then, within that job, one or more victim tasks (via a task eviction policy). For the dual-priority setting with only two priority levels, our eviction policies take into account (i) resources utilized by a job, and (ii) time remaining in a task. We then generalize to arbitrary real-time job deadlines via eviction policies based on both a job's deadline and its resource usage.
We provide experimental results from deployments on a test cluster, both on Emulab and on a commercial cluster at Yahoo!. Our experiments used both synthetic workloads and Hadoop workloads from Yahoo! Inc. We evaluated various eviction policies and found that compared to their behavior in traditional multiprocessor environments, eviction policies have counterintuitive behavior in MapReduce environments; for example, we discovered that longest-task-first scheduling is optimal for MapReduce environments. For the dual-priority setting, Natjam incurs overheads of under 7% for all jobs. For the real-time setting with arbitrary deadlines, our generalized system, called Natjam-R, meets deadlines with only 20% extra laxity in the deadline compared to the job runtime.
In brief, at a high level, our work is placed within the body of related work as follows (see Section 6.3.4 for more details). Our focus is on batch jobs rather than streaming or interactive workloads [71–74,76,77]. Some systems have looked at preemption in MapReduce [55], with respect to fairness [86], at intelligent killing of tasks [59] (including the Hadoop Fair Scheduler [84]), and in SLOs (service level objectives) in generic cluster management [90,93,94]. In comparison, our work is the first to study the effects of eviction policies and deadline-based scheduling for resource-constrained MapReduce clusters. Our strategies can be applied orthogonally in systems such as Amoeba [55]. We are also the first to incorporate such support directly into Hadoop YARN. Finally, MapReduce deadline scheduling has been studied in infinite clusters [62–65] but not in resource-constrained clusters.
This section presents the eviction policies, and the following section describes the systems architecture. Section 6.1.4 generalizes the solution to the case where jobs have multiple priorities.
Eviction policies lie at the heart of Natjam. When a production (high-priority) MapReduce job arrives at a resource-constrained cluster and there are insufficient resources to schedule it, some tasks of research (low-priority) MapReduce jobs need to be preempted. Our goals here are to minimize job completion times both for production and for research jobs. This section addresses the twin questions of (i) how to choose a victim (research) job so that some of its tasks can be preempted, and (ii) within a given victim job, how to choose victim task(s) for preemption. We call these job eviction and task eviction policies, respectively.
The job and task eviction policies are applied in tandem, that is, for each required task of the arriving production job, a running research task is evicted through application of the job eviction policy followed by the task eviction policy. A research job chosen as victim may be evicted only partially; in other words, some of its tasks may continue running, for example, if the arriving job is relatively small, or if the eviction policy also picks other victim research jobs.
The choice of victim job affects the completion time of lower priority research jobs by altering resources already allocated to them. Thus, job eviction policies need to be sensitive to current resource usage of individual research jobs. We discuss three resource-aware job eviction policies:
Most Resources (MR): This policy chooses as victim the research job that is currently using the most resources inside the cluster. In Hadoop YARN, resource usage would be in terms of the number of containers used by the job, while in other versions of MapReduce, it would be determined by the number of cluster slots.12
The MR policy, which is loosely akin to the worst-fit policy in OS segmentation, is motivated by the need to evict as few research jobs as possible; a large research job may contain sufficient resources to accommodate one large production job or multiple small production jobs. Thus, fewer research jobs are deferred, more of them complete earlier, and average research job completion time is minimized.
The downside of the MR policy is that when there is one large research job (as might be the case with heavy tailed distributions), it is always victimized whenever a production job arrives. This may lead to starvation and thus longer completion times for large research jobs.
Least Resources (LR): In order to prevent starving of large research jobs, this policy chooses as victim the research job that is currently using the least resources inside the cluster. The rationale here is that small research jobs that are preempted can always find resources if the cluster frees up even a little in the future. However, the LR policy can cause starvation for small research jobs if the cluster stays overloaded; for example, if a new production job arrives whenever one completes, LR will pick the same smallest jobs for eviction each time.
Probabilistically Weighted on Resources (PR): In order to address the starvation issues of LR and MR, our third policy selects a victim job using a probabilistic metric based on resource usage. In PR, the probability of choosing a job as a victim is directly proportional to the resources it currently holds. In effect, PR treats all tasks identically in choosing ones to evict, that is, if the task eviction policy were random, the chances of eviction for all tasks would be identical and independent of their jobs. The downside of PR is that it spreads out evictions across multiple jobs; in PR, unlike MR, one incoming production job may slow down multiple research jobs.
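The three job eviction policies can be sketched as follows, with resource usage measured in containers currently held; the class, method, and parameter names are ours, and the sketch assumes at least one research job holds at least one container.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Random;

// Illustrative dual-priority job eviction policies (not the actual Natjam code).
public class JobEvictionPolicies {
    // MR: victimize the research job holding the most containers.
    public static String mostResources(Map<String, Integer> containersHeld) {
        return Collections.max(containersHeld.entrySet(), Map.Entry.comparingByValue()).getKey();
    }

    // LR: victimize the research job holding the fewest containers.
    public static String leastResources(Map<String, Integer> containersHeld) {
        return Collections.min(containersHeld.entrySet(), Map.Entry.comparingByValue()).getKey();
    }

    // PR: P(victim = job) is proportional to the containers the job currently holds.
    public static String probabilisticallyWeighted(Map<String, Integer> containersHeld, Random rng) {
        int total = containersHeld.values().stream().mapToInt(Integer::intValue).sum();
        int pick = rng.nextInt(total);                 // assumes total > 0
        for (Map.Entry<String, Integer> e : containersHeld.entrySet()) {
            pick -= e.getValue();
            if (pick < 0) return e.getKey();
        }
        throw new IllegalStateException("unreachable");
    }
}
```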
Once a victim job has been selected, the task eviction policy is applied within that job to select one task that will be preempted (i.e., suspended).
Our approach makes three assumptions, which are based on use case studies: (i) reduces are long enough that preemption of a task takes less time than the task itself; (ii) only reduce tasks are preempted; and (iii) reduces are stateless in between keys. For instance, in Facebook workloads the median reduce task takes 231 s [58], substantially longer than the time needed to preempt a task (see Section 6.1.5). There are two reasons why we focus on preemption only of reduces. First, the challenge of checkpointing reduce tasks subsumes that of checkpointing map tasks, since a map processes individual key-value pairs, while a reduce processes batches of them. Second, several use case studies have revealed that reduces are substantially longer than maps and thus have a bigger effect on the job tail. In the same Facebook trace already mentioned, the median map task time is only 19 s. While 27.1 map containers are freed per second, only 3 (out of 3100) reduce containers are freed per second. Thus, a small production job with 30 reduces would wait on average 10 s, and a large job with 3000 reduces would wait 1000 s. Finally, the traditional stateless reduce approach is used in many MapReduce programs; however, Natjam could be extended to support stateful reducers.
A MapReduce research job's completion time is determined by its last-finishing reduce task. A long tail, or even a single task that finishes late, will extend the research job's completion time. This concern implies that tasks with a shorter remaining time (for execution) must be evicted first. However, in multiprocessors, shortest-task-first scheduling is known to be optimal [53]. In our context, this means that the task with the longest remaining time must be evicted first. That motivated two contrasting task eviction policies, SRT and LRT:
Shortest Remaining Time (SRT): In this policy, tasks that have the shortest remaining time are selected to be suspended. This policy aims to minimize the impact on the tail of a research job. Further, a task suspended by SRT will finish quickly once it has been resumed. Thus, SRT is loosely akin to the longest-task-first strategy in multiprocessor scheduling. Rather counterintuitively, SRT is provably optimal under certain conditions: if all of the victim job's suspended tasks are resumed simultaneously, then SRT minimizes the victim job's completion time.
We note that the assumption – that tasks of the victim job will be resumed simultaneously – is reasonable in real-life scenarios in which production job submission times and sizes are unpredictable. Our experiments also validated this theorem.
Longest Remaining Time (LRT): In this policy, the task with the longest remaining time is chosen to be suspended first. This policy is loosely akin to shortest-task-first scheduling in multiprocessors. Its main advantage over SRT is that it is less selfish and frees up more resources earlier. LRT might thus be useful in scenarios in which production job arrivals are bursty. Consider a victim job containing two tasks: one short and one with a long remaining time. SRT evicts the shorter task, freeing up resources for one production task. LRT evicts the longer task, but the shorter, unevicted task will finish soon anyway, thus releasing resources for two production tasks while incurring the overhead of only one task suspension. However, LRT can lengthen the tail of the research job, increasing its completion time.
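Given a victim job, the two task eviction policies reduce to a min or max over the estimated remaining times of its running reduce tasks, as the following sketch shows; the names are ours, and the remaining-time estimates would come from Hadoop's progress estimator.

```java
import java.util.Collections;
import java.util.Map;

// Illustrative task eviction policies within a chosen victim job (not the actual code).
// remainingTime maps a running reduce task attempt to its estimated remaining time (s).
public class TaskEvictionPolicies {
    // SRT: suspend the task closest to completion.
    public static String srt(Map<String, Double> remainingTime) {
        return Collections.min(remainingTime.entrySet(), Map.Entry.comparingByValue()).getKey();
    }

    // LRT: suspend the task furthest from completion.
    public static String lrt(Map<String, Double> remainingTime) {
        return Collections.max(remainingTime.entrySet(), Map.Entry.comparingByValue()).getKey();
    }
}
```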
In order to understand the design decisions required to build eviction policies into a MapReduce cluster management system, we incorporated Natjam into the popular Hadoop YARN framework in Hadoop 0.23. We now describe Natjam's architecture, focusing on the dual-priority setting (for production and research jobs).
Background: Hadoop YARN Architecture: In the Hadoop YARN architecture, a single cluster-wide Resource Manager (RM) performs resource management. It is assisted by one Node Manager (NM) per node (server). The RM receives periodic heartbeats from each NM containing status updates about resource usage and availability at that node.
The RM runs the Hadoop Capacity Scheduler. The Capacity Scheduler maintains multiple queues that contain jobs, and an incoming job is submitted to one of these queues. An administrator can configure two capacities per queue: a minimum (guaranteed) capacity and a maximum capacity. The scheduler varies each queue's capacity between these two limits, based on the jobs that have arrived at the queues.
The basic unit of resource allocation for a task is called a container. A container is effectively a resource slot that contains sufficient resources (primarily memory) to run one task: a map, a reduce, or a master task. An example master task is the Application Master (AM), which is allocated one container. One AM is assigned to each MapReduce job and performs job management functions.
An AM requests and receives, from the RM, container allocations for its tasks. The AM assigns a task to each container it receives and sends launch requests to the container's NM. It also performs speculative execution when needed.
An AM sends heartbeats to the RM. The AM also receives periodic heartbeats from its tasks. For efficiency, YARN piggybacks control traffic (e.g., container requests and task assignments) atop heartbeat messages.
Natjam Components: Natjam entails changes to the Hadoop Capacity Scheduler (at the RM) and the AM, while the NM stays unchanged. Specifically, Natjam adds two new components to Hadoop YARN: a Preemptor, which runs inside the RM's Capacity Scheduler, and a Releaser, which runs at each job's AM.
Rather than adding new messages to Hadoop, which would incur overhead, Natjam piggybacks its control information atop YARN's existing heartbeats. The trade-off is a small scheduling delay, but our experiments show that such delays are small.
We will detail those two components in Section 6.1.3. Now we show how preemption and checkpointing work.
Natjam Preemption Mechanism Example: We illustrate how Natjam's preemption works in YARN. Figure 6.17 depicts an example in which research Job 2 is initially executing in a full cluster when production Job 1 arrives and requires a single container.13 The preemption proceeds through the numbered steps shown in the figure.
With that, the Natjam-specific steps are done. For completeness, YARN then performs its remaining default steps to give AM1 the new container.
Checkpoint Saved and Used by Natjam: When Natjam suspends a research job's reduce task, an on-demand checkpoint is saved automatically. It contains the following items: (i) an ordered list of past suspended container IDs, one for each attempt, that is, each time this task was suspended in the past; (ii) a key counter, that is, the number of keys that have been processed so far; (iii) reduce input paths, that is, local file paths; and (iv) the hostname associated with the last suspended attempt, which is useful for preferably resuming the research task on the same server. Natjam also leverages intermediate task data already available via Hadoop [136], including (v) reduce inputs, which are stored at a local host, and (vi) reduce outputs, which are stored on HDFS.
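A plain-data sketch of such a checkpoint is shown below; the class and field names are illustrative and not necessarily those used in our patch.

```java
import java.util.List;

// Illustrative layout of the on-demand checkpoint saved when a reduce task is suspended.
public class ReduceCheckpoint {
    List<String> pastSuspendedContainerIds; // one per previous suspended attempt, in order
    long keyCounter;                        // number of reduce keys fully processed so far
    List<String> reduceInputPaths;          // local file paths holding the reduce's input
    String lastHostname;                    // host of the last attempt, preferred for resume
    // Reduce inputs (on the local host) and partial outputs (on HDFS, one file per
    // attempt, named by container ID) are reused from Hadoop's existing mechanisms.
}
```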
Task Suspend: We modified YARN so that the reduce task keeps track of two pieces of state: the paths to the files in the local file system that hold the reduce input, and the key counter, that is, the number of keys that have been processed by the reduce function so far. When a reduce task receives a suspend request from its AM, it first finishes processing the key it is currently working on, if any. Second, it writes the input file paths to a local log file. Third, Hadoop maintains a partial output file per reduce attempt in HDFS; this file holds the output generated so far by the current attempt and is given a name that includes the container ID. When a task suspends, this partial output file is closed. Finally, the reduce compiles its checkpoint, sends it to its AM, and exits.
Task Resume: On a resume, the task's AM sends the saved checkpoint state as launch parameters to the chosen NM. The Preemptor is in charge of scheduling the resuming reduce on a node. The Preemptor prefers the old node on which the last attempt ran (available from the hostname field in the checkpoint). If the resumed task is assigned to its old node, the reduce input can be read without network overhead, that is, from local disk. If it is resumed on a different node, the reduce input is assembled from map task outputs, much like a new task.
Next, the reduce task creates a new partial output file in HDFS. It skips over the input keys that the checkpoint's key counter field indicates have already been processed. It then starts execution as a normal reduce task.
Commit after Resume: When a previously suspended reduce task finishes, it needs to assemble its partial output. It does so by finding, in HDFS, all of its past partial output files, using the ordered list of past suspended container IDs from its checkpoint, and then accumulating their data into output HDFS files named in that order. This order is critical so that the output is indistinguishable from that of a reduce task that was never suspended.
This section first explains how we modify the AM state machines in Hadoop YARN, and then describes the Preemptor and Releaser. As mentioned earlier, we leverage existing Hadoop mechanisms such as heartbeats.
Application Master's State Machines: For job and task management, Hadoop YARN's AM maintains separate state machines per job, per task, and per task attempt. Natjam does not change the job state machine; we enabled this state machine only to handle the checkpoint. Thus, both the suspend and the resume occur during the Running state in this state machine.
We modify the task state machine very little. When the AM learns that a task attempt has been suspended (from step 8 in Figure 6.17), the task state machine goes ahead and creates a new task attempt to resume the task. However, this does not mean that the task is scheduled immediately; the transitions of the task attempt state machine determine whether it does.
The task attempt state machine is used by YARN to assign the container, set up execution parameters, monitor progress, and commit output. Natjam adds two states to the task attempt state machine, as shown in Figure 6.18: Suspend-Pending and Suspended. The task attempt has a state of Suspend-Pending when it wishes to suspend a task but has not received suspension confirmation from the local task (steps 6b–7 from Figure 6.17). The state becomes Suspended when the saved checkpoint is received (step 8), and this is a terminal state for that task attempt.
The new transitions for suspension are shown in Figure 6.18.
A resuming reduce task starts from the New state in the task attempt state machine. However, we modify some transitions to distinguish a resuming task attempt from a new (nonresuming) one.
Preemptor: Recall that Natjam sets up the RM's Capacity Scheduler with two queues: one for production jobs and one for research jobs. The Preemptor is implemented as a thread within the Capacity Scheduler. In order to reclaim resources from the research queue for use by the production queue, the Preemptor periodically runs a reclaim algorithm, with sleeps of 1 s between runs. A run may generate reclaim requests, each of which is sent to some research job's AM to reclaim a container (which is step 4 in Figure 6.17). In a sense, a reclaim request is a statement of a production job's intention to acquire a container.
We keep track of a per-production job reclaim list. When the RM sends a reclaim request on behalf of a job, an entry is added to the job's reclaim list. When a container is allocated to that job, that reclaim list entry is removed. The reclaim list is needed to prevent the Preemptor from generating duplicate reclaim requests, which might occur because our reliance on heartbeats entails a delay between a container suspension and its subsequent allocation to a new task. Thus, we generate a reclaim request whenever (i) the cluster is full, and (ii) the number of pending container requests from a job is greater than the number of requests in its reclaim list.
In extreme cases, the Preemptor may need to kill a container, for example, if the AM has remained unresponsive for too long. Our threshold to kill a container is reached when a reclaim request has remained in the reclaim list for longer than a killing timeout (12 s). A kill request is sent directly to the NM to kill the container. This bypasses the AM, ensuring that the container will indeed be killed. When a kill request is sent, the reclaim request is added to an expired list. It remains there for an additional time interval (2 s), after which it is assumed the container is dead, and the reclaim request is thus removed from the expired list. With those timeouts, we have never observed killings of any tasks in any of our cluster runs.
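The reclaim pass can be sketched as follows. This is an illustrative outline, not the actual Preemptor code: the helper methods are placeholders for actions that Natjam performs via heartbeats and the NM, and the expired-list bookkeeping is only indicated in comments.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative outline of the Preemptor's periodic reclaim pass.
public class Preemptor {
    static final long KILL_TIMEOUT_MS = 12_000;   // reclaim requests older than this trigger a kill
    static final long EXPIRY_MS = 2_000;          // killed requests linger on an expired list this long

    static class ReclaimRequest {
        String jobId;
        long issuedAtMs;
    }

    final Map<String, List<ReclaimRequest>> reclaimList = new HashMap<>();

    // Called roughly once a second. pendingContainers maps each production job to the
    // number of containers it is still waiting for.
    void reclaimPass(boolean clusterFull, Map<String, Integer> pendingContainers, long nowMs) {
        if (!clusterFull) return;
        for (Map.Entry<String, Integer> e : pendingContainers.entrySet()) {
            List<ReclaimRequest> outstanding =
                    reclaimList.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
            // Issue a reclaim request only when pending demand exceeds outstanding requests.
            while (outstanding.size() < e.getValue()) {
                ReclaimRequest r = new ReclaimRequest();
                r.jobId = e.getKey();
                r.issuedAtMs = nowMs;
                outstanding.add(r);
                sendReclaimToResearchAm(r);               // step 4 in Figure 6.17
            }
            for (ReclaimRequest r : outstanding) {
                if (nowMs - r.issuedAtMs > KILL_TIMEOUT_MS) {
                    killContainerViaNodeManager(r);        // bypass an unresponsive AM
                    // The request would then move to an expired list for EXPIRY_MS
                    // before being dropped; that bookkeeping is omitted here.
                }
            }
        }
    }

    void sendReclaimToResearchAm(ReclaimRequest r) { /* piggybacked on a heartbeat */ }
    void killContainerViaNodeManager(ReclaimRequest r) { /* direct kill request to the NM */ }
}
```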
Releaser: The Releaser runs at each job's AM and decides which tasks to suspend. Since the task eviction policies discussed in Section 6.1.2 (i.e., SRT and LRT) use the time remaining at the task, the Releaser needs to estimate it. We use Hadoop's default exponentially smoothed task runtime estimator, which relies on the task's observed progress [69]. However, calculating this estimate on demand can be expensive due to the large numbers of tasks. Thus, we have the AM only periodically estimate the progress of all tasks in the job (once a second), and use the latest complete set of estimates for task selection. While the estimates might be stale, our experiments show that this approach works well in practice.
Interaction with Speculative Execution: Our discussion so far has ignored speculative execution, which Hadoop uses to replicate straggler task attempts. Natjam does not change speculative execution and works orthogonally, that is, speculative task attempts are candidates for eviction. When all attempts of a task are evicted, the progress rate calculation of the task is not skewed, because speculative execution tracks the progress of task attempts rather than the tasks themselves. While this interaction could be optimized further, it works well in practice. Natjam can be further optimized to support user-defined (i.e., per job) task eviction policies that would prioritize the eviction of speculative tasks, but discussion of that is beyond the scope of this chapter.
We have created Natjam-R, a generalization of Natjam that targets environments in which each job has a hard and fixed real-time deadline. Unlike Natjam, which allowed only two priority levels, Natjam-R supports multiple priorities; in the real-time case, a job's priority is derived from its deadline. While Natjam supported inter-queue preemption (with two queues), Natjam-R uses only intra-queue preemption. Thus, all jobs can be put into one queue; there is no need for two queues. Jobs in the queue are sorted based on priority.
First, for job eviction, we explored two deadline-based policies inspired by the classical real-time literature [78,81]: Maximum Deadline First (MDF) and Maximum Laxity First (MLF). MDF chooses as its victim the running job whose deadline is farthest away. MLF, on the other hand, evicts the job with the highest laxity, where laxity is the deadline minus the job's projected completion time. For MLF, we extrapolate Hadoop's reported job progress rate to calculate a job's projected completion time.
While MDF is a static scheduling policy that accounts only for deadlines, MLF is a dynamic policy that also accounts for a job's resource needs. If a job has an unsatisfactory progress rate, MLF may give it more resources closer to its deadline. It may do so by evicting small jobs with long deadlines. In essence, while MLF may run some long-deadline, high-resource jobs, MDF might starve all long-deadline jobs equally. Further, MLF is fair in that it allows many jobs with similar laxities to make simultaneous progress. However, this fairness can be a shortcoming in scenarios with many short deadlines; MLF results in many deadline misses, while MDF would meet at least some deadlines. Section 6.1.6 describes our experimental evaluation of this issue.
Second, our task eviction policies remain the same as before (SRT, LRT) because the deadline is for the job, not for individual tasks.
In addition to the job and task eviction policies, we need to have a job selection policy. When resources free up, this policy selects a job from among the suspended ones and gives it containers. Possible job selection policies are earliest deadline first (EDF) and least laxity first (LLF). In fact, we implemented these but observed thrashing-like scheduling behavior if the job eviction policy was inconsistent with the job selection policy. For instance, if we used MDF job eviction and LLF job selection, a job selected for eviction by MDF would soon after be selected for resumption by LLF, and thus enter a suspend-resume loop. We concluded that the job selection policy needed to be dictated by the job eviction policy, that is, MDF job eviction implies EDF job selection, while MLF implies LLF job selection.
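To make these policies concrete, the sketch below shows the two job eviction policies (MDF, MLF) alongside the resumption policies that pair with them (EDF, LLF). The Job record and its fields are illustrative assumptions, not Natjam-R's internal data structures, and the projected completion time is assumed to come from the extrapolated progress rate described earlier.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Illustrative Natjam-R policy sketch: MDF/MLF pick eviction victims among
// running jobs; EDF/LLF pick which suspended job to resume. Pair MDF with EDF
// and MLF with LLF to avoid suspend-resume thrashing.
public class DeadlinePoliciesSketch {
    public record Job(String id, long deadlineMs, long projectedCompletionMs) {
        long laxityMs() { return deadlineMs - projectedCompletionMs; } // laxity = deadline - projected completion
    }

    /** MDF eviction: victim is the running job whose deadline is farthest away. */
    static Optional<Job> mdfVictim(List<Job> runningJobs) {
        return runningJobs.stream().max(Comparator.comparingLong(Job::deadlineMs));
    }

    /** MLF eviction: victim is the running job with the most slack (highest laxity). */
    static Optional<Job> mlfVictim(List<Job> runningJobs) {
        return runningJobs.stream().max(Comparator.comparingLong(Job::laxityMs));
    }

    /** EDF selection: resume the suspended job with the earliest deadline (use with MDF). */
    static Optional<Job> edfResume(List<Job> suspendedJobs) {
        return suspendedJobs.stream().min(Comparator.comparingLong(Job::deadlineMs));
    }

    /** LLF selection: resume the suspended job with the least laxity (use with MLF). */
    static Optional<Job> llfResume(List<Job> suspendedJobs) {
        return suspendedJobs.stream().min(Comparator.comparingLong(Job::laxityMs));
    }
}
```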
The main changes that differentiate Natjam-R from Natjam are in the RM. In Natjam-R, the RM keeps one Capacity Scheduler queue sorted by decreasing priority. A priority is inversely proportional to the deadline for MDF, and to laxity for MLF. The Preemptor periodically (once a second) examines the queue and selects the first job (say Ji) that still has tasks waiting to be scheduled. Then it considers job eviction candidates from the queue, starting with the lowest priority (i.e., later deadlines or larger laxities) up to Ji's priority. If it encounters a job that still has allocated resources, that job is picked as the victim; otherwise, no further action is taken. To evict the job, the Releaser from Natjam uses the task eviction policy to free a container. Checkpointing, suspend, and resume work in Natjam-R as described earlier for Natjam (see Section 6.1.3).
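The scan can be sketched as follows; JobInfo and its fields are assumptions standing in for the Capacity Scheduler's internal bookkeeping, and the priority ordering of the queue is assumed to be maintained elsewhere.

```java
import java.util.List;
import java.util.Optional;

// Illustrative sketch of the Natjam-R Preemptor's once-per-second scan.
public class NatjamRPreemptorSketch {
    public record JobInfo(String id, boolean hasWaitingTasks, int allocatedContainers) {}

    /**
     * queue is sorted by decreasing priority (priority ~ 1/deadline for MDF, ~ 1/laxity for MLF).
     * Returns the job from which one container should be released, if any.
     */
    static Optional<JobInfo> pickVictim(List<JobInfo> queue) {
        // Ji: highest-priority job that still has tasks waiting to be scheduled.
        int ji = -1;
        for (int i = 0; i < queue.size(); i++) {
            if (queue.get(i).hasWaitingTasks()) { ji = i; break; }
        }
        if (ji < 0) return Optional.empty();
        // Consider candidates from the lowest priority upward, stopping before Ji.
        for (int i = queue.size() - 1; i > ji; i--) {
            if (queue.get(i).allocatedContainers() > 0) {
                return Optional.of(queue.get(i)); // the Releaser then frees one container via SRT/LRT
            }
        }
        return Optional.empty(); // no lower-priority job holds resources; take no action
    }
}
```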
We present two sets of experiments, increasing in complexity and scale. This section presents microbenchmarking results for a small Natjam cluster. Section 6.1.6 evaluates Natjam-R. While we present only selected experimental results in this chapter, we refer the reader to Ref. [131] for extensive information on experiments and evaluation of our system.
We first evaluated the core Natjam system that supports a dual-priority workload, that is, research and production jobs. We addressed the following questions: (i) How beneficial is Natjam relative to existing techniques? (ii) What is the overhead of the Natjam suspend mechanism? (iii) What are the best job eviction and task eviction policies?
We used a small-scale test bed and a representative workload because this first experimental stage involved exploration of different parameter settings and study of many fine-grained aspects of system performance. A small test bed gave us flexibility.
Our test cluster had seven servers running on a 1 GigE network. Each server had two quad-core processors and 16 GB of RAM, of which 8 GB were configured to run 1 GB-sized Hadoop containers. (Thus, 48 containers were available in the cluster.) One server acted as the Resource Manager, while the other six were workers. Each entity (AM, map task, and reduce task) used one container.
In our experiments, we injected a mix of research and production jobs, as shown in Table 6.4. To reflect job size variation, the job sizes ranged from XL (filling the entire cluster) to S (filling a fourth of the cluster). To mimic use case studies [58], each job had a small map execution time, and was dominated by the reduce execution time. To model variance in task running times, we selected reduce task lengths uniformly from the interval (0.5, 1.0], where 1.0 is the normalized largest reduce task. To emulate computations, we used SWIM [137] to create random keys and values, with thread sleeps called between keys. Shuffle and HDFS traffic were incurred as usual.
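As an illustration only, the following stand-in (not the SWIM-based generator we actually used; the class name is hypothetical) shows one way to draw normalized reduce task lengths uniformly from (0.5, 1.0]:

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sampler for normalized reduce task lengths in (0.5, 1.0].
public class TaskLengthSampler {
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            // nextDouble(0.5) is uniform in [0, 0.5), so 1.0 minus it is uniform in (0.5, 1.0]
            double normalizedLength = 1.0 - ThreadLocalRandom.current().nextDouble(0.5);
            System.out.printf("task %d: normalized length %.3f%n", i, normalizedLength);
        }
    }
}
```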
Table 6.4 Microbenchmark settings.
Job | Number of reduces | Average time (s) |
Research-XL | 47 | 192.3 |
Research-L | 35 | 193.8 |
Research-M | 23 | 195.6 |
Research-S | 11 | 202.6 |
Production-XL | 47 | 67.2 |
Production-L | 35 | 67.0 |
Production-M | 23 | 67.6 |
Production-S | 11 | 70.4 |
The primary metric was job completion time. Each of our data points shows an average and standard deviation over five runs. Unless otherwise noted, Natjam used MR job eviction and SRT task eviction policies.
Figure 6.19 compares Natjam against several alternatives: an ideal setting, two existing mechanisms in the Hadoop Capacity Scheduler, and pessimistic killing of tasks (instead of saving the cheap checkpoint). The ideal setting measures each job's completion time when it is executed on an otherwise empty cluster; thus, it ignores resource sharing and context switch overheads. For the second setting, we chose the Hadoop Capacity Scheduler because it represents approaches that we might take with two physically separate clusters sharing the same scheduler. Finally, killing of tasks is akin to approaches such as those described in Ref. [59] and for the Hadoop Fair Scheduler [84].
In this experiment, a Research-XL job was submitted initially to occupy the entire cluster. Then, 50 s later, a Production-S job was submitted. Figure 6.19 shows that killing of tasks (the fourth pair of bars) finished the production job quickly, but prolonged the research job by 23% compared to the ideal case (the first pair of bars). Thus, the checkpointing overhead avoided by killing is not worth the work that must be repeated when tasks restart.
We next examined two popular Hadoop Capacity Scheduler approaches called Hard cap and Soft cap. Recall that the Capacity Scheduler allows the administrator to set a maximum cap on the capacity allocated to each of the two queues (research and production). In Hard cap, that cap is used as a hard limit for each queue. In the Soft cap approach, each queue is allowed to expand to the full cluster if there are unused resources, but it cannot scale down without waiting for its scheduled tasks to finish (e.g., if the production queue needs resources from the research queue). We configured these two approaches with the research queue set to 75% capacity (36 containers) and production queue to 25% capacity (12 containers), as these settings performed well.
Figure 6.19 shows that in Hard cap (the second pair of bars), the research job took 52% longer than ideal, while the production job was unaffected. Under Soft cap (the third pair of bars), the production job could obtain containers only when the research job freed them; this resulted in an 85% increase in production job completion time, while the research job was unaffected.
The last pair of bars shows that when Natjam was used, the production job's completion time was 7% worse (5.4 s longer) than ideal and 77% better than the result for the Hadoop Capacity Scheduler's Soft cap. The research job's completion time was only 2% worse (4.7 s longer) than ideal, 20% better than that of killing, and 49% better than that of Hadoop Hard cap. One of the reasons the research job was close to ideal is that it was able to make progress in parallel with the production job. There are other internal reasons for the performance benefit, which we explore next.
We measured Natjam's suspend overhead on a fully loaded cluster. We observed that it took an average of 1.35 s to suspend a task and 3.88 s to resume a task. Standard deviations were low. In comparison, default Hadoop took an average of 2.63 s to schedule a task on an empty cluster. From this it might appear that Natjam incurs a higher total overhead of 5.23 s per task suspend-resume. However, in practice the effective overhead is lower; for instance, Figure 6.19 showed only a 4.7 s increase in research job completion time. The reason is that task suspends typically occur in parallel, and in some cases, task resumes do too. Thus, the time overheads are parallelized rather than aggregated.
We now compare the two task eviction policies (SRT and LRT) from Section 6.1.2 against each other, and against a random eviction strategy that we also implemented. We performed two sets of experiments: one with Production-S and another with Production-L. The production job was injected 50 s after a Research-XL job.
Table 6.5 tabulates the results. In all cases the production job incurred overhead similar to that for an empty cluster. Thus, we discuss only research job completion time (last column). As shown in the top half of the table, a random task eviction strategy resulted in a 45 s increase in completion time compared to the ideal; we observed that a fourth of the tasks were suspended, leading to a long job tail. Evicting the LRT incurred a higher increase of 55 s because LRT prolongs the tail. Evicting the SRT emerged as the best policy and was only 4.7 s worse than the ideal because it respects the job tail.
Table 6.5 Task eviction policies.
Task eviction policy | Production job | Mean (S.D.) runtime (s) | Research job | Mean (S.D.) runtime (s) |
Random | Production-S | 76.6 (3.0) | Research-XL | 237.6 (7.8) |
LRT | Production-S | 78.8 (1.8) | Research-XL | 247.2 (6.3) |
SRT | Production-S | 75.6 (1.5) | Research-XL | 197.0 (5.1) |
Random | Production-L | 75.0 (1.9) | Research-XL | 244.2 (5.6) |
LRT | Production-L | 75.8 (0.4) | Research-XL | 246.6 (6.8) |
SRT | Production-L | 74.2 (1.9) | Research-XL | 234.6 (3.4) |
At t = 0 s, a Research-XL job was submitted; at t = 50 s, the production job was submitted. Job completion times are shown. The ideal job completion times are shown in Table 6.4. |
In the lower half of Table 6.5, it can be seen that a larger production job caused more suspensions. The research job completion times for the random and LRT eviction policies are similar to those in the top half because the job's tail was already long for the small production job, and was not much longer for the larger job. SRT is worse than it was for a small production job, yet it outperformed the other two eviction strategies.
We conclude that SRT is the best task eviction policy, especially when production jobs are smaller than research jobs. We believe this is a significant use case since research jobs run longer and process more data, while production jobs are typically small due to the need for faster results.
We next compare the three job eviction policies discussed in Section 6.1.2. Based on the previous results, we always used SRT task eviction. We initially submitted two research jobs and followed 50 s later with a small production job. We examined two settings: one in which the initial research jobs were comparable in size and another in which they were different. We observed that the production job completion time was close to ideal; hence Table 6.6 shows only research job completion times.
Table 6.6 Job eviction policies.
Job eviction policy | Research job | Mean (S.D.) runtime (s) | Research job | Mean (S.D.) runtime (s) |
PR | Research-M | 195.8 (1.3) | Research-M | 201.2 (0.8) |
MR | Research-M | 196.2 (1.3) | Research-M | 200.6 (2.1) |
LR | Research-M | 200.6 (1.3) | Research-M | 228.8 (12.7) |
PR | Research-L | 201.6 (8.3) | Research-S | 213.8 (18.8) |
MR | Research-L | 195.8 (1.1) | Research-S | 204.8 (2.2) |
LR | Research-L | 195.8 (0.4) | Research-S | 252.4 (9.3) |
At t = 0 s, two research jobs were submitted (either two Research-M's or a Research-S and a Research-L); at t = 50 s, a Production-S job was submitted. Only the research job completion times are shown. The ideal job completion times are shown in Table 6.4. |
The top half of Table 6.6 shows that when the research job sizes were comparable, probabilistic weighting of job evictions by resources (PR) and eviction of the job with the most resources (MR) performed comparably: research job completion times for the two policies were within 2 s (0.5%) of each other. This is desirable given the matching job sizes. On the other hand, eviction of the job with the least resources (LR) performed the worst, because it starved one of the jobs: once tasks start being evicted from a research job (which LR may at first have picked randomly if all jobs had the same resource usage), LR will subsequently keep picking the same job until it is fully suspended.
That behavior of LR is even more pronounced on small research jobs in a heterogeneous mix, as can be seen in the bottom half of Table 6.6. The Research-S job is picked as a victim by PR less often than by LR, and thus PR outperforms LR. PR penalizes the Research-L job slightly more than LR does, since PR evicts more tasks from a larger job. Even so, PR and MR are within 10 s (5%) of each other; any differences are due to the variable task lengths, and the effectiveness of the SRT task eviction policy. We observed that MR evicted no tasks at all from the Research-S job.
We conclude that when the best task eviction policy (SRT) is used, the PR and MR job eviction policies are preferable to LR, and MR is especially good under heterogeneous mixes of research job sizes.
We evaluated the real-time support of our Natjam-R system (described in Section 6.1.4). Our experiments addressed the following questions: (i) How do the MDF and MLF job eviction strategies compare? (ii) How good is Natjam-R at meeting deadlines? (iii) Do Natjam-R's benefits hold under realistic workloads?
We used eight Emulab servers [111,138], each with eight-core Xeon processors and 250 GB disk space. One server was the Resource Manager, and each of the other seven servers ran three containers of 1 GB each. (Thus, there were 21 containers in total.)
We injected three identical jobs, Job 1 to Job 3, each with 8 maps and 50 reduces. (Each job took 87 s on an empty cluster.) They were submitted in numerical order starting at t = 0 s and 5 s apart, thus overloading the cluster. Since MDF and MLF will both meet long deadlines, we chose shorter deadlines. To force preemption, the deadlines of job 1, job 2, and job 3 were set 10 s apart: 200 s, 190 s, and 180 s, respectively.
Figure 6.20 depicts the progress rates of the MDF cluster and the MLF cluster. Our first observation is that while MDF allowed the short-deadline jobs to run earlier and thus satisfy all deadlines, MLF missed all deadlines (see Figure 6.20b). Under MLF, jobs eventually proceeded in lockstep in the reduce phase, because once a lower-laxity job (e.g., job 3) has run for a while in place of a higher-laxity job (e.g., job 1), their laxities become comparable. Thereafter, the two jobs take turns preempting each other. Breaking ties, for example, by using the deadline, does not eliminate this behavior. In a sense, MLF tries to be fair to all jobs by allowing them all to make progress simultaneously, but this fairness is in fact a drawback.
MLF also takes longer to finish all jobs, that is, 239 s compared to MDF's 175 s. MLF's lockstep behavior incurs a high context switch overhead. We conclude that MDF is preferable to MLF, especially under short deadlines.
We submitted a job (job 1) just as described previously, and 5 s later submitted an identical job (job 2) whose deadline was 1 s earlier than job 1's. We measured job 1's clean compute time as the time to run the job in an empty cluster. Then, we set its deadline = submission time + (clean compute time × (1 + ε)). Figure 6.21 shows the effect of ε on a metric called margin. We define a job's margin = (deadline) − (job completion time). A negative margin implies a deadline miss. We observe that an ε as low as 0.8 still meets both deadlines, while an ε as low as 0.2 meets at least the shorter deadline. This means that given one critical job with a very short deadline, Natjam-R can satisfy it if it has at least 20% more time than the job's clean compute time. This percentage is thus an estimate of Natjam-R's overhead. We also performed experiments that varied the second job's size as a fraction of the first job from 0.4 to 2.0, but we saw little effect on margin.
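Restated in equation form, where T_clean is the job's clean compute time, t_submit its submission time, and t_complete its completion time:

```latex
\[
  \text{deadline} = t_{\text{submit}} + (1+\varepsilon)\,T_{\text{clean}},
  \qquad
  \text{margin} = \text{deadline} - t_{\text{complete}} .
\]
```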
We used the Yahoo! Hadoop traces to evaluate Natjam-R's deadline satisfaction. We used only the production cluster trace, scaled so as to overload the target cluster. Since the original system did not support deadline scheduling, no deadlines were available from the traces. Thus, we chose ε randomly for each job from the interval [0, 2.0], and used it to set the job's deadline forward from its submission time (as described earlier). A given job's deadline was selected to be the same in all runs.
Figure 6.22 compares Natjam-R against Hadoop Soft cap. It shows the CDF of the difference between the margins of the two approaches; a negative difference implies that Natjam-R is better. Natjam-R's margin is better than Soft cap's for 69% of jobs. The largest improvement in margin was 366 s. The plot is biased by one outlier job that took 1000 s longer in Natjam-R; the next greatest outlier is only –287 s. The first outlier job suffered in Natjam-R because the four jobs submitted just before it and one job right after had much shorter deadlines. Yet the conclusion is positive: among the 400 jobs with variable deadlines, there was only one such outlier. We conclude that Natjam-R satisfies deadlines well under a realistic workload.
In this chapter, we have given an overview of five systems we created that are oriented toward offering performance assuredness in cloud computing frameworks, even while the system is under change.
For each system, we described its motivation, design, and implementation, and presented experimental results. Our systems are implemented in popular open-source cloud computing frameworks, including MongoDB (Morphus), Cassandra (Parqua), Storm (Stela), LFGraph, and Hadoop (Natjam). Readers interested in more detailed designs and implementations, and in more extensive experimental findings, are referred to our original papers introducing these systems [102,114,120,123,131].
Overall, building systems that perform predictably in the cloud remains one of the biggest challenges today, both in mission-critical scenarios and in non-real-time scenarios. The work outlined in this chapter has made deep inroads toward solving key issues in this area.
More specifically, the work described in this chapter constitutes the starting steps toward realization of a truly autonomous and self-aware cloud system, for which the mission team merely needs to specify SLAs/SLOs (service-level agreements and objectives), and which will reconfigure itself automatically and continuously over the lifetime of the mission to ensure that these requirements are always met. For instance, as of this writing, we are building on our scale-out/scale-in work in distributed stream processing and distributed graph processing by adding an extra layer of adaptive scale-out/scale-in that seeks to meet SLA/SLO requirements such as latency or throughput (for stream processing) and completion-time deadlines or throughput (for graph processing). These adaptive techniques will automatically give resources to a job that is facing a higher workload or more stringent deadlines, and take resources away from a job whose needs are more relaxed. Adaptivity implies that no human is involved in deciding, for example, how many machines to give to or take away from a job, or when to do so; such decisions are made automatically by the system. Such adaptive logic may be able to leverage machine learning techniques that learn the system's performance characteristics and adjust resource allocations over time, so that the best possible performance is obtained from the cloud resources at hand. It may also be possible to add an adaptive layer atop our database reconfiguration systems (Morphus and Parqua); however, that would need to be done judiciously and relatively rarely, because of the enormous cost of each reconfiguration operation on large databases.