Roy H. Campbell1, Shadi A. Noghabi1, and Cristina L. Abad2
1Department of Computer Science, University of Illinois at Urbana-Champaign, Urbana, IL, USA
2Escuela Superior Politécnica del Litoral, ESPOL, Guayaquil, Ecuador
This chapter explores the problems of scalability of cloud computing systems. Scalability allows a cloud application to change in size, volume, or geographical distribution while meeting the needs of the cloud customer. A practical approach to scaling cloud applications is to improve the availability of the application by replicating the resources and files it uses, including creating multiple copies of the application across many nodes in the cloud. Replication improves availability through redundant resources, services, networks, file systems, and nodes, but it also creates consistency problems for clients that are served from the multiple copies. Variability in data sizes and volumes, and in the homogeneity and performance of the cloud components (disks, memory, networks, and processors), can impact scalability. Evaluating scalability is difficult, especially when there is a large degree of variability, which leads to the need to estimate how applications will scale on clouds based on probabilistic estimates of job load and performance. Scaling can have many different dimensions and properties. The emergence of low-latency worldwide services and the desire for higher fault tolerance and reliability have led to the design of geo-distributed storage with replicas in multiple locations. At the end of this chapter, we consider scalability in terms of the issues involved with cloud services that are geo-distributed and also study, as a case example, scalable geo-distributed storage.
Cloud computing system scalability has many dimensions, including size, volume, velocity, and geographical distribution, which must be handled while continuing to meet the needs of the cloud customer. Here, we address scalability and the related design issues in a number of steps, covering what we consider the most important current issues. First, we address the size and volume scalability problem by examining how many replicas to allocate for each file in a cloud file system and where to place them, using probabilistic sampling and a competitive aging algorithm independently at each node. We discuss a statistical metadata workload model that captures the relevant characteristics of a workload (the attributes of its metadata, i.e., directories, file sizes, and number of files) and is suitable for synthetic workload generation. Then, we examine traces of file access in real workloads, characterizing their popularity, temporal locality, and arrival patterns. In particular, we show how traces of workloads from storage, feature animation, and streaming media can be used to derive synthetic workloads that may help in the design of cloud computing file systems.
Next, we introduce and analyze a set of complementary mechanisms that enhance workload management decisions for processing MapReduce jobs with deadlines. The three mechanisms we consider are the following: (i) a policy for job ordering in the processing queue; (ii) a mechanism for allocating a tailored number of map and reduce slots to each job with a completion time requirement; and (iii) a mechanism for allocating and deallocating (if necessary) spare resources in the system among the active jobs.
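To make the interplay of these mechanisms concrete, the sketch below shows one way a deadline-aware scheduler could combine them: earliest-deadline-first ordering, a tailored slot count derived from a crude completion-time model, and round-robin distribution of spare slots. The job fields, the half-deadline-per-phase model, and all function names are illustrative assumptions, not the chapter's actual policies.

```python
import math
from dataclasses import dataclass, field

@dataclass(order=True)
class Job:
    deadline: float                            # completion-time requirement (seconds from now)
    name: str = field(compare=False)
    map_work: float = field(compare=False)     # total map work, in task-seconds
    reduce_work: float = field(compare=False)  # total reduce work, in task-seconds

def min_slots(job: Job):
    """Smallest (map, reduce) slot allocation that meets the deadline, assuming the
    map and reduce phases each get half of the remaining time and work divides
    evenly (a deliberately crude completion-time model)."""
    per_phase = job.deadline / 2.0
    return math.ceil(job.map_work / per_phase), math.ceil(job.reduce_work / per_phase)

def plan_allocations(jobs, map_capacity: int, reduce_capacity: int):
    """(i) order jobs by earliest deadline, (ii) give each admitted job its tailored
    allocation, and (iii) spread any spare slots across the admitted jobs."""
    plan, spare_m, spare_r = [], map_capacity, reduce_capacity
    for job in sorted(jobs):                   # mechanism (i): earliest deadline first
        m, r = min_slots(job)                  # mechanism (ii): tailored slot counts
        if m <= spare_m and r <= spare_r:
            plan.append([job.name, m, r])
            spare_m, spare_r = spare_m - m, spare_r - r
    for entry in plan:                         # mechanism (iii): hand out spare slots
        entry[1] += spare_m // len(plan)
        entry[2] += spare_r // len(plan)
    return plan

print(plan_allocations([Job(600, "report", 1200, 300), Job(300, "alerts", 600, 150)],
                       map_capacity=16, reduce_capacity=8))
```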
Finally, we examine a solution to building a geo-distributed cloud storage service giving, as an example, an implementation on which we collaborated: LinkedIn's Ambry. This solution offers an entirely decentralized replication protocol, eliminating any leader election overheads, bottlenecks, and single points of failure. In this geo-distributed system case, data replication and the consistency of data are simplified by making the data in the storage immutable (written once and never modified). This makes the behavior of the system easier to analyze with respect to the issues of scalability. Other chapters in this volume examine the problems of consistency and correctness of design in storage that is not immutable.
The success of cloud-based applications and services has increased the confidence and willingness of federal government organizations to move mission-critical applications to the cloud. A mission-critical application is typically one that is essential to the successful operation of that government organization and can involve timeliness, availability, reliability, and security requirements. Unfortunately, the situation is unpredictable and ever-evolving as more cloud solutions are adopted and more cloud applications are developed. Scalability of applications and services on the cloud is a key concern; it refers to the ability of a system to accommodate larger loads simply by adding resources, either vertically, by making hardware perform better (scale-up), or horizontally, by adding nodes (scale-out). Further, as clouds become more ubiquitous and global, geo-distributed storage and processing become part of the scaling issue. This chapter's vision is to consider the tools and methodologies that can underlie the testing and design of cloud computing applications, to make it possible to build them more reliably and to guide solutions that scale within the limitations of the cloud's resources. We discuss how these tools can guide the development of geo-distributed storage and processing for clouds. Such tools and methodologies would enhance the ability to create mission-oriented cloud applications and services and help accelerate the adoption of cloud solutions by the government.
The concept of assured cloud computing encompasses our ability to provide computation and storage for mission-oriented applications and services. This involves the design, implementation, and evaluation of dependable cloud architectures that can provide assurances with respect to security, reliability, and timeliness of computations or services. Example applications include dependable big data applications; data analytics; high-velocity, high-volume stream processing; real-time computation; control of huge cyber-physical systems such as power systems; and critical computations for rescue and recovery.
Scalability concerns the accommodation of larger loads through the addition of resources. Elasticity is the ability to fit the allocated resources to the load dynamically as the load changes. Typically, elasticity relates to scaling out appropriately and efficiently: when the load increases, the system should scale by adding more resources, and when demand wanes, the system should shrink back and release unneeded resources. Elasticity is especially important in cloud environments, where, on the one hand, customers who pay per use do not want to pay for resources they do not currently need, and on the other hand, demand must be met when it rises.
When a platform or architecture scales well, hardware costs grow at most linearly with demand. For example, if one server can handle 50 users, then 2 servers can handle 100 users and 10 servers can handle 500 users. If, instead, every time a thousand users were added the system needed to double its number of servers, the design would not scale, and the service organization would quickly run out of money as the user count grew.
Elasticity is how well your architecture can adapt to a changing workload in real time. For example, if one user logs on to a website every hour, this could be handled by one server. However, if 50,000 users all log on at the same time, can the architecture quickly (and possibly automatically) provision new Web servers on the fly to handle this load? If so, it can be said that the design is elastic.
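As a concrete illustration of such an elasticity decision, the following is a minimal sketch of a threshold-based autoscaling rule; the thresholds, the users-per-server capacity figure, and the function name are illustrative assumptions rather than any particular provider's autoscaler.

```python
import math

def desired_servers(current: int, active_users: int,
                    users_per_server: int = 1000,
                    scale_out_threshold: float = 0.8,
                    scale_in_threshold: float = 0.3) -> int:
    """Target fleet size for a simple threshold-based elasticity rule:
    scale out above 80% utilization, scale in below 30%, otherwise hold."""
    utilization = active_users / (current * users_per_server)
    if utilization > scale_out_threshold or utilization < scale_in_threshold:
        # Resize so the fleet sits just under the scale-out threshold.
        target = math.ceil(active_users / (users_per_server * scale_out_threshold))
        return max(1, target)
    return current

# One server copes with a trickle of logins, but a burst of 50,000 users
# triggers provisioning of enough servers to absorb the load:
print(desired_servers(current=1, active_users=1))        # -> 1
print(desired_servers(current=1, active_users=50_000))   # -> 63
```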
Vertical scaling essentially resizes your server with no change to your code: it increases the capacity of existing hardware or software by adding resources, and it is limited by the capacity of the largest available server. Horizontal scaling affords the ability to scale wider to deal with traffic: it connects multiple hardware or software entities, such as servers, so that they work as a single logical unit. This kind of scaling cannot be implemented at a moment's notice; the architecture must be designed to support it.
The availability, reliability, and dependability of applications and services are one aspect of providing mission-oriented computation and storage. The variability of demand for those services is another aspect; coping with it depends on how well the system addresses scalability and elasticity. For example, in an emergency scenario, a set of rescue services might need to be elastically scaled from a few occurrences a day to hundreds or thousands. Once the emergency is over, can the resources used to provide the services be quickly released to reduce costs? Similarly, scalability may determine whether an existing e-mail system can be expanded to coordinate the activities of a new six-month task force organized to cope with a rescue mission. Scalable, elastic, mission-oriented cloud-based systems require an understanding of what the workload of the system will be under the new circumstances and whether the architecture of the system allows that workload to operate within its quality-of-service or service-level agreements. The difficulty here is that example workloads might not be available until the applications are deployed and in use. The difficulty is compounded when the application has a huge number of users and its timeliness and availability are very sensitive to the mission it is accomplishing. Understanding the nature of the workloads, and the ability to create synthetic workloads and volume and stress-testing benchmarks for use in testing and performance measurement, thus become crucial for system planners, designers, and implementers. Workloads and synthetic workloads can expose bottlenecks, resource constraints, and high latencies, support what-if demand scenarios, and allow comparisons among designs. An example of how synthetic and real workloads help in real system deployment is given in Ref. [1], which describes how synthetic workloads can be scaled and used to predict the performance of different cluster sizes and hardware architectures for MapReduce loads. However, in some applications that involve personal or private data, example workloads may not even be available, making synthetic workloads essential; representing those workloads by synthetic traces that contain no private or personal information may then be the only way to evaluate designs and test performance.
Our goal in this chapter, then, is to describe technology that makes it possible to construct scalable systems that can meet the missions to which they are assigned.
Many of the experimental methodologies for evaluating large-scale systems are surveyed in Ref. [2]. As the systems evolve, these methodologies also change. The evaluations often use example workloads, traces, and synthetic traces. Much effort has gone into the collection of workloads and synthetic traces for cloud computing, mainly because researchers studying the performance of new cloud solutions have not had easy access to appropriate data sets [3–5]. Many of the larger cloud companies, including Google, Microsoft, Twitter, and Yahoo!, have published real and trace-based workloads, scrubbed of private or personal information, in an effort to help research. In our research, Twitter and Yahoo! permitted University of Illinois students who interned with those companies to build de-identified trace-based workloads from more sensitive internal workload data sets [4,6]. The research described in this chapter builds on this existing body of work and examines many scalability issues as well as the production of traces.
Cloud computing systems allow the economical colocation of large clusters of computing systems, fault-tolerant data storage [7,8], frameworks for data-intensive applications [9,10], and huge data sets. Data locality, or placing of data as close as possible to computation, is a common practice in the cloud to support high-performance, data-intensive computation economically [11,12]. Current cluster computing systems use uniform data replication to ensure data availability and fault tolerance in the event of failures [13–18], to improve data locality by placing a job at the same node as its data [9], and to achieve load balancing by distributing work across the replicas. Data locality is beneficial as the amount of data being processed in data centers keeps growing at a tremendous pace, exceeding increases in the available bandwidth provided by the network hardware in the data centers [19].
Our research introduced DARE [7], a distributed data replication and placement algorithm that adapts to changes in workload. We assume that the scheduler used is one that is oblivious to the data replication policy, such as the first-in, first-out (FIFO) scheduler or the Fair scheduler in Hadoop systems, so our algorithm will be compatible with existing schedulers. We implemented and evaluated our algorithm using the Hadoop framework [10], Apache's open-source implementation of MapReduce [9]. In the tested implementation, when local data are not available, a node retrieves data from a remote node in order to process the assigned task and discards the data once the task is completed. The algorithm takes advantage of existing remote data retrievals and selects a subset of the data to be inserted into the file system, hence creating a replica without consuming extra network and computation resources.
Each node runs the algorithm independently to create replicas of data that are likely to be heavily accessed during a short period of time. Observations from a 4000-node production Yahoo! cluster log indicate that the popularity of files follows a heavy-tailed distribution [20]. This makes it possible to predict file popularity from the number of accesses that have already occurred; for a heavy-tailed distribution of popularity, the more a file has been accessed, the more future accesses it is likely to receive.
From the point of view of an individual data node, the algorithm comes down to quickly identifying the most popular set of data and creating replicas for this set. Popularity means that a piece of data receives not only a large number of accesses but also a high intensity of accesses. We observe that this is the same as the problem of heavy hitter detection in network monitoring: In order to detect flows occupying the largest bandwidth, we need to identify flows that are both fast and large. In addition, the popularity of data is relative: We want to create replicas for files that are more popular than others. Hence, algorithms based on a hard threshold of number of accesses do not work well.
We designed a probabilistic dynamic replication algorithm with the following features:
The contribution of the research is twofold. First, our analysis examined existing production systems to obtain effective bandwidth and data popularity distributions, and to uncover characteristics of access patterns. Second, we proposed the distributed dynamic data replication algorithm, which significantly improves data locality and task completion times.
MapReduce clusters [9,10] offer a distributed computing platform suitable for data-intensive applications. MapReduce was originally proposed by Google, and its most widely deployed implementation, Hadoop, is used by many companies, including Facebook, Yahoo!, and Twitter.
MapReduce uses a divide-and-conquer approach in which input data are divided into fixed-size units processed independently and in parallel by map tasks. The map tasks are executed in a distributed manner across the nodes in the cluster. After the map tasks are executed, their output is shuffled, sorted, and then processed in parallel by one or more reduce tasks.
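The classic word-count computation illustrates this flow. The sketch below simulates the map, shuffle/sort, and reduce stages in plain Python, with no Hadoop API, simply to show how fixed-size input splits are processed independently and how their intermediate output is grouped by key before reduction.

```python
from collections import defaultdict
from itertools import chain

def map_task(split: str):
    """Map: emit a (word, 1) pair for every word in one input split."""
    return [(word, 1) for word in split.split()]

def shuffle(mapped_pairs):
    """Shuffle/sort: group all intermediate values by key."""
    groups = defaultdict(list)
    for key, value in mapped_pairs:
        groups[key].append(value)
    return groups

def reduce_task(key, values):
    """Reduce: aggregate the values for one key."""
    return key, sum(values)

splits = ["the cloud scales", "the cloud is elastic"]        # fixed-size input units
mapped = chain.from_iterable(map_task(s) for s in splits)    # map tasks run in parallel
result = dict(reduce_task(k, v) for k, v in shuffle(mapped).items())
print(result)  # {'the': 2, 'cloud': 2, 'scales': 1, 'is': 1, 'elastic': 1}
```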
To provide cost-effective, fault-tolerant, fast movement of data into and out of the compute nodes, the compute nodes use a distributed file system (GFS [22] for Google's MapReduce and HDFS [16] for Hadoop).
MapReduce clusters use a master–slave design for the compute and storage systems. For the sake of simplicity, we will use the HDFS terminology to refer to the components of the distributed file system, where name node refers to the master node and data node refers to a slave. The master file system node handles the metadata operations, while the slaves handle the reads and writes initiated by clients. Files are divided into fixed-size blocks, which are distributed across the HDFS data nodes. Files are read-only, but appends may be performed in some implementations.
MapReduce clusters use a configurable number of replicas per file (three by default). While this replica policy makes sense for availability, it is ineffective for locality and load balancing when access patterns of data are not uniform.
As it turns out, the data access patterns (or popularity distributions) of files in MapReduce clusters are not uniform; it is common for some files to be much more popular than others (e.g., job configuration files during initial job stages), while others may be significantly unpopular (e.g., old log files are rarely processed). For job files, popularity can be predicted (e.g., launching a job creates a hotspot), so a solution adopted in current systems is to have the framework automatically increase the replication factor for these files [9,16]. For other cases, the current approach is to manually increase or decrease the number of replicas for a file by using organization-specific heuristics based on data access patterns. For example, Facebook dereplicates aged data, which can have a lower number of replicas (as low as one copy) than other data [3]. This manual approach is not scalable and can be error-prone.
Within this chapter, the term file denotes the smallest granularity of data that can be accessed by a MapReduce job. A file is composed of N fixed-size data blocks (of 64–256 MB).
The data access distribution, or file popularity, of data in a MapReduce cluster can be nonuniform, as our analysis of a Yahoo! cluster [20] shows [7]. Figure 5.1, which illustrates data generated during that analysis, shows a heavy-tailed distribution in which some files are significantly more popular than others. In this case, the reason is that the cluster is used mainly to perform different types of analysis on a common (time-varying) data set. Similar results were obtained by analyzing the 64-node CCT Hadoop production cluster, and have previously been observed by the developers of Microsoft's Bing [11]. This suggests that a uniform increase in the number of replicas is not an adequate way of improving locality and achieving load balancing.
In our observations, we found that around 80% of the accesses of a file occur during its first day of life. In our analysis, typically, 50% of a file's accesses occur within 10 h following its creation. A similar finding has been presented by Fan et al. [23]; it was obtained from the Yahoo! M45 research cluster, for which the authors found that 50% of the accesses of a block occurred within 1 min after its creation.
Given those observations, the research goal has been to create an adaptive replication scheme that seeks to increase data locality by replicating “popular” data while keeping a minimum number of replicas for unpopular data. In addition, the scheme should (i) dynamically adapt to changes in file access patterns, (ii) use a replication budget to limit the extra storage consumed by the replicas, and (iii) impose a low network overhead.
The solution described in DARE is a greedy reactive scheme that takes advantage of existing data retrievals to avoid incurring any extra network traffic [7], as follows: When a map task is launched, its data can be local or remote to the node (i.e., located in a different node). In the case of remote data, the original MapReduce framework fetches and processes the data, without keeping a local copy for future tasks. With DARE, when a map task processes remote data, the data are inserted into the HDFS at the node that fetched them.
Data are replicated at the granularity of a block. DARE uses a replication budget to limit the extra storage consumed by the dynamically replicated data. The budget is configurable, but a value between 10% and 20% proved reasonable. To avoid completely filling the storage space assigned to dynamically created replicas, an eviction mechanism can employ a least recently used (LRU) or least frequently used (LFU) strategy to free up storage. Whether a file should be cached locally is determined probabilistically, using a scheme similar to the ElephantTrap [21]. If the file is cached, the number of replicas for the data is automatically increased by one, without incurring explicit network traffic. Because file caching is determined probabilistically, the algorithm is more stable and avoids thrashing when the cache is full and eviction is required.
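The sketch below captures the flavor of this budget-bounded, probabilistic caching decision; the admission probability, the byte-based budget accounting, and the LRU eviction are simplified assumptions rather than DARE's exact implementation.

```python
import random
from collections import OrderedDict

class DynamicReplicaCache:
    """Probabilistically admit remotely fetched blocks as local replicas,
    bounded by a replication budget, with LRU eviction (a simplified
    sketch of a DARE-style policy, not the actual implementation)."""

    def __init__(self, budget_bytes: int, admit_probability: float = 0.1):
        self.budget = budget_bytes                 # e.g., 10-20% of local storage
        self.p = admit_probability
        self.used = 0
        self.replicas = OrderedDict()              # block_id -> size, in LRU order

    def on_remote_read(self, block_id: str, size: int) -> bool:
        """Called when a map task fetched a remote block; maybe keep a copy."""
        if block_id in self.replicas:
            self.replicas.move_to_end(block_id)    # refresh LRU position
            return True
        if random.random() >= self.p:              # probabilistic admission avoids
            return False                           # thrashing on one-off accesses
        while self.used + size > self.budget and self.replicas:
            _, evicted_size = self.replicas.popitem(last=False)  # evict LRU block
            self.used -= evicted_size
        if self.used + size <= self.budget:
            self.replicas[block_id] = size
            self.used += size
            return True
        return False
```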
In summary, data access patterns in MapReduce clusters are heavy-tailed, with some files being considerably more popular than others. For nonuniform data access patterns, current replication mechanisms that replicate files a fixed number of times are inadequate, can create suboptimal task locality, and hinder the performance of MapReduce clusters. DARE [7] is an adaptive data replication mechanism that can improve data locality by more than seven times for a FIFO scheduler and 70% for the Fair scheduler, without incurring extra networking overhead. Turnaround time and slowdown are improved by 19% and 25%, respectively. The scheme is scheduler-agnostic and can be used in parallel with other schemes, such as Zaharia et al.'s delay scheduling [12], that aim to improve locality.
Several efforts have dealt with the specific case of dynamic replication in MapReduce clusters, including CDRM [24] and Scarlett [11]. In Ref. [24], Wei et al. presented CDRM, a "cost-effective dynamic replication management scheme" for cloud storage clusters. CDRM is a replica placement scheme for Hadoop that aims to improve file availability by centrally determining the ideal number of replicas for a file, along with an adequate placement strategy based on the blocking probability. The effects of increasing locality are not studied. In this chapter, we consider the case of maximizing the (weighted) overall availability of files given a replication budget and a set of file class weights. We propose an autonomic replication number computation algorithm that assigns more replicas to files belonging to the highest-priority classes and fewer replicas to files in lower-priority classes, without exceeding the replication budget. In parallel with our work, Ananthanarayanan et al. [11] proposed Scarlett, an offline system that replicates blocks based on their observed popularity in a previous epoch. Scarlett computes a replication factor for each file and creates budget-limited replicas distributed throughout the cluster with the goal of minimizing hotspots. Replicas are aged to make space for new replicas. While Scarlett uses a proactive replication scheme that periodically replicates files based on predicted popularity, we proposed a reactive approach that is able to adapt to popularity changes at smaller time scales and can help alleviate recurrent as well as nonrecurrent hotspots. Zaharia et al.'s delay scheduling [17] increases locality by delaying, for a small amount of time, a map task that – without the delay – would have run nonlocally. DARE is scheduler-agnostic and can work together with this and other scheduling techniques that try to increase locality. The delay scheduling technique is currently part of Hadoop's Fair scheduler, one of the two schedulers used in our evaluations.
The growth of data analytics for big data encourages the design of next-generation storage systems to handle peta- and exascale storage requirements. As demonstrated by DARE [7], a better understanding of the workloads for big data becomes critical for proper design and tuning. The workloads of enterprise storage systems [25], Web servers [26], and media server clusters [27] have been extensively studied in the past. There have been several studies of jobs and the workload created by jobs in big data clusters [28,29] but few storage-system-level studies [30]. A few recent studies have provided us with some limited insight on the access patterns in MapReduce scenarios [7,11,23]. However, these have been limited to features of interest to the researchers for their specific projects, such as block age at time of access [23] and file popularity [7,11]. Parallel to that work, other researchers did a large-scale characterization of MapReduce workloads, including some insights on data access patterns [31]. Their work concentrates on interactive query workloads and does not study the batch type of workload used in many production systems. Furthermore, the logs they processed are those of the Hadoop scheduler, and for this reason do not provide access to information such as the age of the files in the system, or the time when a file was deleted.
In the work we described in a prior study [3], we explored a frequently used application of big data storage clusters: those that are dedicated to supporting a mix of MapReduce jobs. Specifically, we studied the file access patterns of two multipetabyte Hadoop clusters at Yahoo! across several dimensions, with a focus on popularity, temporal locality, and arrival patterns. We analyzed two 6-month traces, which together contained more than 940 million creates and 12 billion file open events.
We identified unique properties of the workloads and made the following key observations:
Derived from those key observations and a knowledge of the domain and application-level workloads running on the clusters, we highlight the following insights and implications for storage system design and tuning:
Perhaps the work most similar to ours (in approach) is that of Cherkasova and Gupta [27], who characterized enterprise media server workloads. They analyzed the influence of new files and file life spans, but they did not possess file creation and deletion time stamps, so they considered a file to be "new" the first time it was accessed and its lifetime to "end" the last time it was accessed. No analysis of the burstiness of requests was made.
Our work complements prior research by providing a better understanding of one type of big data workload, filling gaps at the storage level. The workload characterization, key observations, and implications for storage system design are important contributions. More studies of big data storage workloads and their implications should be encouraged so that storage system designers can validate their designs and deployed systems can be properly tuned [30].
For the case of the workloads we studied, the analysis demonstrated how traditional popularity metrics (e.g., the percentage of the file population that accounts for 90% of the frequency counts – in this case, accesses) can be misleading and make it harder to understand what those numbers imply about the popularity of the population (files). In our analysis, the problem arose from the high percentage of short-lived (and thus infrequently accessed) files. New or adapted models and metrics are needed to better express popularity under these conditions.
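For concreteness, the traditional metric mentioned above can be computed as in the short sketch below (with made-up access counts); when the population is dominated by short-lived files that receive only a couple of accesses, the resulting percentage is large even though a pronounced hot set exists.

```python
import numpy as np

def population_fraction_for(access_counts, coverage: float = 0.90) -> float:
    """Fraction of the file population that accounts for `coverage`
    of all accesses (files sorted from most to least popular)."""
    counts = np.sort(np.asarray(access_counts))[::-1]
    cumulative = np.cumsum(counts) / counts.sum()
    files_needed = int(np.searchsorted(cumulative, coverage)) + 1
    return files_needed / len(counts)

# A workload with a small hot set but many short-lived, rarely accessed files:
counts = [10_000, 5_000, 2_500] + [2] * 10_000
print(f"{population_fraction_for(counts):.1%} of files cover 90% of accesses")
```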
The high rate of change in file populations has some interesting implications for the design of the storage systems: Does it make sense to handle the short-lived files in the same way as longer-lived files? Tiered storage systems that combine different types of storage media for different types of files can be tailored to these workloads for improved performance. While the burstiness and autocorrelations in the request arrivals may be a result of typical MapReduce workloads in which multiple tasks are launched within some small time window (where all of the tasks are operating on different parts of the same large file or set of related files), a characterization of the autocorrelations is relevant independent of the MapReduce workload that produced them, for the following reasons.
Our research that used data analytics to investigate the behavior of big data systems [3,7] allowed us to propose new algorithms and data structures to improve their performance when Hadoop and HDFS are used. In general, it is difficult to obtain real traces of systems. Often, when data are available, the traces must be de-identified to be used for research. However, workload generation can often be used in simulations and real experiments to help reveal how a system reacts to variations in the load [32]. Such experiments can be used to validate new designs, find potential bottlenecks, evaluate performance, and do capacity planning based on observed or predicted workloads. Workload generators can replay real traces or do model-based synthetic workload generation. Real traces capture observed behavior and may even include nonstandard or undiscovered (but possibly important) properties of the load [33]. However, real trace-based approaches treat the workload as a “black box” [32]. Modifying a particular workload parameter or dimension is difficult, making such approaches inappropriate for sensitivity and what-if analysis. Sharing of traces can be hard because of their size and privacy concerns. Other problems include those of scaling to a different system size and describing and comparing traces in terms that can be understood by implementers [33].
Model-based synthetic workload generation can be used to facilitate testing while modifying a particular dimension of the workload, and can model expected future demands. For that reason, synthetic workload generators have been used extensively to evaluate the performance of storage systems [4,33], media streaming servers [34,35], and Web caching systems [32,36]. Synthetic workload generators can issue requests on a real system [32,33] or generate synthetic traces that can be used in simulations or replayed on actual systems [35,37].
Our research on this topic [38] focused on synthetic generation of object request streams, where the object can be of different types depending on context, like files [4], disk blocks [33], Web documents [36], and media sessions [35].
Two important characteristics of object request streams are popularity (access counts) and temporal reference locality (i.e., the phenomenon that a recently accessed object is likely to be accessed again in the near future) [35]. While highly popular objects are likely to be accessed again soon, temporal locality can also arise when the interarrival times are highly skewed, even if the object is unpopular [39].
For the purpose of synthetic workload generation, it is desirable to simultaneously reproduce the access counts and the request interarrivals of each individual object, as both of these dimensions can affect system performance. However, single-distribution approaches – which summarize the behavior of different types of objects with a single distribution per dimension – cannot accurately reproduce both at the same time. In particular, the common practice of collapsing the per-object interarrival distributions into a single system-wide distribution (instead of individual per-object distributions) obscures the identity of the object being accessed, thus homogenizing the otherwise distinct per-object behavior [37].
As big data applications lead to emerging workloads and these workloads keep growing in scale, the need for workload generators that can scale up the workload and/or facilitate its modification based on predicted behavior is increasingly urgent.
Motivated by previous observations about big data file request streams [3,23,40], we set the following goals for our model and synthetic generation process [41]:
Our research [41] initially considered a stationary segment [33] of the workload. It used a model based on a set of delayed renewal processes (one per object in the stream) in which the system-wide popularity distribution asymptotically emerges through explicit reproduction of the per-object request arrivals and active span (the period during which an object is accessed). However, this model does not scale, as it must keep track of millions of individual objects.
Instead, we built a lightweight version of the model that uses unsupervised statistical clustering to identify groups of objects with similar behavior and significantly reduce the model space by modeling “types of objects” instead of individual objects. As a result, the clustered model is suitable for synthetic generation.
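A minimal sketch of this clustered generation idea follows: objects are clustered by simple behavioral features, and each synthetic object then draws its request times from its cluster's parameters. The choice of features, the exponential interarrival assumption, and the use of k-means are illustrative simplifications, not the exact model of Ref. [41].

```python
import numpy as np
from sklearn.cluster import KMeans

def fit_clusters(per_object_features: np.ndarray, k: int = 4) -> KMeans:
    """Cluster objects by behavior; here each row is one object and the columns
    are, e.g., (mean request interarrival time, active span)."""
    return KMeans(n_clusters=k, n_init=10, random_state=0).fit(per_object_features)

def synthesize(model: KMeans, objects_per_cluster, horizon: float):
    """Generate a synthetic request stream: one delayed renewal process per
    synthetic object, parameterized only by its cluster's centroid."""
    rng = np.random.default_rng(0)
    events = []
    for cluster_id, count in enumerate(objects_per_cluster):
        mean_iat, span = model.cluster_centers_[cluster_id]
        for obj in range(count):
            t = rng.uniform(0, max(horizon - span, 0.0))   # delayed first arrival
            end = t + span                                 # active span of this object
            name = f"c{cluster_id}-obj{obj}"
            while t < end:
                events.append((t, name))
                t += rng.exponential(mean_iat)             # per-object renewal process
    return sorted(events)
```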
Our synthetic trace generator uses this lightweight model, and we evaluated it across several dimensions. Using a big data storage (HDFS [16]) workload from Yahoo!, we validated our approach by demonstrating its ability to approximate the original request interarrivals and popularity distributions. (The supremum distance between the real and synthetic cumulative distribution functions – CDFs – was under 2%.) Workloads from other domains included traces for ANIM and MULTIMEDIA. ANIM is a 24-h NFS trace from a feature animation company that supports rendering of objects, obtained in 2007 [43]. MULTIMEDIA is a 1-month trace generated using the Medisyn streaming media service workload generator from HP Labs [35]. Both were also modeled successfully (with only a 1.3–2.6% distance between the real and synthetic CDFs). Through a case study in Web caching and a case study in the big data domain (on load in a replicated distributed storage system), we next showed how our synthetic traces can be used in place of the real traces (with results within 5.5% of the expected or real results), outperforming previous models.
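The supremum distance referred to here is the statistic of the two-sample Kolmogorov–Smirnov test; a quick way to compute it when comparing a real and a synthetic sample (illustrative data below, not the actual traces) is:

```python
import numpy as np
from scipy.stats import ks_2samp

# Stand-ins for the real and synthetic interarrival samples (illustrative data only).
rng = np.random.default_rng(1)
real_interarrivals = rng.exponential(scale=2.0, size=100_000)
synthetic_interarrivals = rng.exponential(scale=2.0, size=100_000)

# D is the supremum (largest vertical) distance between the two empirical CDFs;
# a value below 0.02 corresponds to the "under 2%" figure quoted in the text.
D, p_value = ks_2samp(real_interarrivals, synthetic_interarrivals)
print(f"supremum CDF distance: {D:.4f}")
```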
Our model can accommodate the appearance and disappearance of objects at any time during the request stream (making it appropriate for workloads with high object churn) and is suitable for synthetic workload generation. Experiments have shown that we can generate a 1-day trace with more than 60 million object requests in under 3 min. Furthermore, our assumptions are minimal, since the renewal process theory does not require that the model be fit to a particular interarrival distribution, or to a particular popularity distribution.
In addition, the use of unsupervised statistical clustering leads to autonomic “type-awareness” that does not depend on expert domain knowledge or introduce human biases. The statistical clustering finds objects with similar behavior, enabling type-aware trace generation, scaling, and “what-if” analysis. (For example, in a storage system, what if the short-lived files were to increase in proportion to the other types of files?)
Concretely, our technical contributions [41] are (i) we provide a model based on a set of delayed renewal processes in which the system-wide popularity distribution asymptotically emerges through explicit reproduction of the per-object request interarrivals and active span; (ii) we use clustering to build a lightweight clustered variant of the model, suitable for synthetic workload generation; and (iii) we show that clustering enables workload-agnostic type-awareness, which can be exploited during scaling, what-if, and sensitivity analysis.
Synthetic workload generators are a potentially powerful approach, and several have been proposed for Web request streams [32,36]. In these generators, the temporal locality of requests is modeled using a stack distance model of references that assumes each file is introduced at the start of the trace. Although this approach is suitable for static file populations, it is inadequate for populations with high file churn [35]. Our approach is more flexible in that it allows files to be introduced at any point in the trace.
ProWGen [36] was developed to enable investigation of the sensitivity of Web proxy cache replacement policies to three workload characteristics: the slope of the Zipf-like document popularity distribution, the degree of temporal locality in the document request stream, and the correlation (if any) between document size and popularity. Instead of attempting to accurately reproduce real workloads, ProWGen's goal is to allow the generation of workloads that differ in one chosen characteristic at a time, thus enabling sensitivity analysis of the differing characteristics. Further, through domain knowledge of Web request streams, the authors note that a commonly observed workload is that of “one-timers,” or files accessed only once in the request stream. One-timers are singled out as a special type of file whose numbers can be increased or decreased as an adjustment in relation to other types of files. In contrast, we were able to approximate the percentage of one-timers in the HDFS workload without explicitly modeling them. (Those approximations were 9.79% of the real workload and 10.69% of our synthetic trace, when the number of types of files was chosen to be 400.)
GISMO [34] and MediSyn [35] model and reproduce media server sessions, including their arrival patterns and per-session characteristics. For session arrivals, both generators have the primary goals of (i) reproducing the file popularity and (ii) distributing the accesses throughout the day based on observed diurnal or seasonal patterns (e.g., percentage of accesses to a file that occur during a specific time slot). In addition, MediSyn [35] uses a file introduction process to model accesses to new files, and explicitly considers two types of files that differ in their access patterns: regular files and news-like files. Our work allows the synthetic workload generation of objects with different types of behavior without prior domain knowledge.
In earlier work, we developed Mimesis [4], a synthetic workload generator for namespace metadata traces. While Mimesis is able to generate traces that mimic the original workload with respect to the statistical parameters included with it (arrivals, file creations and deletions, and age at time of access), reproducing the file popularity was left for future work.
Chen et al. [40] proposed the use of multidimensional statistical correlation (k-means) to obtain storage system access patterns and design insights at the user, application, file, and directory levels. However, their clustering was not used for synthetic workload generation.
Hong et al. [38] used clustering to identify representative trace segments to be used for synthetic trace reconstruction, thus achieving trace compression ratios of 75–90%. However, the process of fitting trace segments, instead of individual files based on their behavior, neither facilitates deeper understanding of the behavior of the objects in the workload nor enables what-if or sensitivity analysis.
Ware et al. [37] proposed the use of two-level arrival processes to model bursty accesses in file system workloads. In their implementation, objects are files, and accesses are any system calls issued on a file (e.g., read, write, lookup, or create). Their model uses three independent per-file distributions: interarrivals to bursts of accesses, intraburst interarrival times, and distribution of burst lengths. A two-level synthetic generation process (in which burst arrivals are the first level, and intraburst accesses to an object are the second level) is used to reproduce bursts of accesses to a single file. However, the authors do not distinguish between the access to the first burst and the accesses to subsequent bursts and, as a consequence, are unable to model file churn. In addition, the authors use one-dimensional hierarchical clustering to identify bursts of accesses in a trace of per-file accesses. The trace generation process is similar to ours: one arrival process per file. However, the size of the systems they modeled (the largest ∼567 files out of a total of 8000) did not require a mechanism to reduce the model size. We are considering systems two orders of magnitude larger, so a mechanism to reduce the model size is necessary. The approach of modeling intraburst arrivals independent of interburst arrivals can be combined with our delayed first arrival plus clustering of similar objects approach to capture per-file burstiness.
The model we presented in Ref. [41] supports the analysis and synthetic generation of object request streams. The model is based on a set of delayed renewal processes, where each process represents one object in the original request stream. Each process in the model has its own request interarrival distribution, which, combined with the time of the first access to the object and the period during which requests to the object are issued, can be used to approximate the number of arrivals or renewals observed in the original trace. Key contributions of this work [41] include the following:
Our model is suitable for request streams with a large number of objects and a dynamic object population. Furthermore, the statistical clustering enables autonomic type-aware trace generation, which facilitates sensitivity and “what-if” analysis.
The emergence of low-latency worldwide services and the desire for high fault tolerance and reliability necessitate geo-distributed storage with replicas in multiple locations. Social networks connecting friends from all around the world (such as LinkedIn and Facebook) and file-sharing services (such as YouTube) are examples of such services. In these systems, hundreds of millions of users continually upload and view billions of diverse objects, from photos and videos to documents and slides. In YouTube alone, hundreds of hours of video (roughly hundreds of gigabytes) are uploaded, and hundreds of thousands of hours of video are viewed, every minute by a billion people from all around the globe [44]. These objects must be stored and served with low latency and high throughput in a geo-distributed system operating at large scale. Scalability in this environment is particularly important, both because request rates keep growing and because data rarely get deleted (e.g., photos in your Facebook album).
In collaboration with LinkedIn, we developed "Ambry," a scalable geo-distributed object store [45]. For over 2.5 years, Ambry has been the main storage mechanism for all of LinkedIn's media objects across all four of its data centers, serving more than 450 million users. It is a production-quality system for storing large sets of immutable data (called blobs). Ambry is designed in a decentralized way and leverages techniques such as logical blob grouping, asynchronous replication, rebalancing mechanisms, zero-cost failure detection, and OS caching. Our experimental results show that Ambry achieves high throughput (reaching up to 88% of the network bandwidth) and low latency (serving 1 MB objects in less than 50 ms), works efficiently across multiple geo-distributed data centers, and reduces the imbalance among disks by a factor of 8–10× while moving minimal data.
In a geo-distributed environment, locality becomes key. Inter-data-center links, which cross the continent, are orders of magnitude slower and more expensive than intra-data-center links. Thus, moving data around becomes a significant limiting factor, and data centers should be as independent as possible. To alleviate this issue, Ambry uses a mechanism called asynchronous writes to leverage data locality. In an asynchronous write, a put request is performed synchronously only among replicas of the local data center; that is, the data are written to replicas in (or as close as possible to) the data center receiving the request. The request is counted as successfully finished at this point. Later, the data are asynchronously replicated to the other data centers by a lightweight background replication algorithm.
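A sketch of the asynchronous-write idea follows; the class and method names are illustrative assumptions rather than Ambry's actual code, but they show how a put succeeds once the local data center's replicas acknowledge it, while cross-data-center replication proceeds in the background.

```python
import queue
import threading

class AsyncGeoWriter:
    """Acknowledge a put after local-data-center replicas are written; ship the
    blob to remote data centers from a background thread (an illustrative sketch
    of asynchronous writes, not Ambry's code)."""

    def __init__(self, local_replicas, remote_datacenters):
        self.local_replicas = local_replicas          # replica objects with .store()
        self.remote_datacenters = remote_datacenters  # remote clients with .replicate()
        self.pending = queue.Queue()
        threading.Thread(target=self._background_replicate, daemon=True).start()

    def put(self, blob_id: str, data: bytes) -> bool:
        # Synchronous step: write only to replicas in the local data center.
        acks = sum(1 for r in self.local_replicas if r.store(blob_id, data))
        if acks < (len(self.local_replicas) // 2) + 1:   # simple local quorum
            return False
        self.pending.put((blob_id, data))                # success is reported now
        return True

    def _background_replicate(self):
        # Asynchronous step: lightweight background replication to remote sites.
        while True:
            blob_id, data = self.pending.get()
            for dc in self.remote_datacenters:
                dc.replicate(blob_id, data)
```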
Ambry randomly groups its large immutable data or blobs into virtual units called partitions. Each partition is physically placed on machines according to a separate and independent algorithm. Thus, logical and physical placements are decoupled, making data movement transparent, simplifying rebalancing, and avoiding rehashing of data during cluster expansion [45].
A partition is implemented as an append-only log in a pre-allocated large file. Partitions are fixed-size during the lifetime of the system. The partition size is chosen so that the overhead of managing a partition, that is, the additional data structures maintained per partition, is negligible, and the time for failure recovery and rebuild is practical. Typically, 100 GB partitions are used. Rebuilding may be done in parallel from multiple replicas, which allows the 100 GB partitions to be rebuilt in a few minutes. In order to provide high availability and fault tolerance, each partition is replicated on multiple data store nodes through a greedy approach based on available disk space.
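The append-only partition abstraction can be sketched as follows; the in-memory offset index, the capacity check, and the file handling here are simplifications of the design described above (the real implementation adds pre-allocation, segmented indexes, and Bloom filters).

```python
import os

class Partition:
    """A blob partition modeled as an append-only log with an in-memory index
    from blob ID to (offset, size); a simplified sketch, not Ambry's format."""

    def __init__(self, path: str, capacity_bytes: int = 100 * 10**9):
        self.capacity = capacity_bytes          # e.g., 100 GB partitions
        self.index = {}                         # blob_id -> (offset, size)
        self.log = open(path, "ab+")
        self.end = os.path.getsize(path)        # current end of the log

    def put(self, blob_id: str, data: bytes) -> bool:
        if self.end + len(data) > self.capacity:
            return False                        # partition full: write to another one
        self.log.seek(0, os.SEEK_END)
        self.log.write(data)                    # blobs are immutable: append only
        self.log.flush()
        self.index[blob_id] = (self.end, len(data))
        self.end += len(data)
        return True

    def get(self, blob_id: str) -> bytes:
        offset, size = self.index[blob_id]
        self.log.seek(offset)
        return self.log.read(size)
```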
Ambry uses an entirely decentralized replication protocol, eliminating any leader election overheads, bottlenecks, and single points of failure. In this procedure, each replica individually acts as a master and syncs up with other replicas in an all-to-all fashion. Synchronization occurs using an asynchronous two-phase replication protocol. This protocol is pull-based, whereby each replica independently requests missing data from other replicas. It operates as follows:
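Because the protocol is pull-based, each replica can be thought of as running a loop like the one sketched below; the two phases (exchange of recently seen blob IDs, then retrieval of missing blobs) and all of the method names are illustrative assumptions based on the description above, not Ambry's actual code.

```python
import time

def replication_loop(local_replica, peer_replicas, interval_seconds: float = 10.0):
    """Decentralized, pull-based sync: every replica runs this loop independently,
    acting as its own master (illustrative sketch of the two-phase exchange)."""
    while True:
        for peer in peer_replicas:
            # Phase 1: ask the peer which blob IDs it has seen since our last
            # sync token, and diff them against the local index.
            token = local_replica.last_sync_token(peer.id)
            peer_ids, new_token = peer.blob_ids_since(token)
            missing = [b for b in peer_ids if not local_replica.has(b)]
            # Phase 2: pull only the missing blobs and append them locally.
            for blob_id in missing:
                local_replica.append(blob_id, peer.read(blob_id))
            local_replica.save_sync_token(peer.id, new_token)
        time.sleep(interval_seconds)
```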
In order to operate on a large scale and with geo-distribution, a system must be scalable. One main design principle in Ambry is to remove any master or manager. Ambry is a completely decentralized system with an active–active design, that is, data can be read or written from any of the replicas. However, load imbalance is inevitable during expansion (scale-out); new machines are empty while old machines have years-old and unpopular data. Ambry uses a nonintrusive rebalancing strategy based on popularity, access patterns, and size. This strategy uses spare network bandwidth to move data around in the background.
The design of Ambry is inspired by log-structured file systems (LFS) [46,47]. These systems are optimized for write throughput in that they sequentially write in log-like data structures and rely on the OS cache for reads. Although these single-machine file systems suffer from fragmentation issues and cleaning overhead, the core ideas are very relevant, especially since blobs are immutable. The main differences between the ideas of a log-structured file system and Ambry's approach are the skewed data access pattern in Ambry's workload and a few additional optimizations used by Ambry, such as segmented indexing and Bloom filters.
In large file systems, metadata and small files need efficient management to reduce disk seeks [48]; proposed approaches combine log-structured file systems (for metadata and small data) with fast file systems (for large data) [49], or store the initial segment of a file's data in its index block [50]. Our system resolves this issue by using in-memory segmented indexing plus Bloom filters and batching techniques.
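The combination of segmented in-memory indexes and Bloom filters can be sketched as follows; the segment size and the tiny Bloom filter are illustrative (Ambry's real index segments are persisted and sized very differently), but the lookup path shows why most misses never touch the older segments.

```python
import hashlib

class BloomFilter:
    """A tiny Bloom filter: may answer 'maybe present', never misses a real member."""
    def __init__(self, bits: int = 8192, hashes: int = 3):
        self.bits, self.hashes, self.array = bits, hashes, bytearray(bits // 8)

    def _positions(self, key: str):
        for i in range(self.hashes):
            h = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.bits

    def add(self, key: str):
        for pos in self._positions(key):
            self.array[pos // 8] |= 1 << (pos % 8)

    def maybe_contains(self, key: str) -> bool:
        return all(self.array[p // 8] & (1 << (p % 8)) for p in self._positions(key))

class SegmentedIndex:
    """Only the newest index segment accepts inserts; older, sealed segments are
    consulted through their Bloom filters, so most misses skip them entirely."""
    def __init__(self, segment_size: int = 4):
        self.segment_size = segment_size
        self.segments = [({}, BloomFilter())]      # list of (entries, bloom) pairs

    def put(self, blob_id: str, location):
        entries, bloom = self.segments[-1]
        if len(entries) >= self.segment_size:      # seal the segment, start a new one
            self.segments.append(({}, BloomFilter()))
            entries, bloom = self.segments[-1]
        entries[blob_id] = location
        bloom.add(blob_id)

    def get(self, blob_id: str):
        for entries, bloom in reversed(self.segments):
            if bloom.maybe_contains(blob_id) and blob_id in entries:
                return entries[blob_id]
        return None
```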
NFS [51], AFS [52], GFS [22], HDFS [16], and Ceph [53] manage large amounts of data, support data sharing, and handle failures reliably. However, all of these systems suffer from high metadata overhead and provide capabilities (e.g., nested directories and permissions) that are unnecessary for a simple blob store. In many systems (e.g., HDFS, GFS, and NFS), a separate single metadata server adds metadata overhead, is a single point of failure, and limits scalability. Metadata can be distributed [53] or cached [54]. Although these approaches reduce the overhead of accessing metadata, each small object still carries a large amount of metadata (usually stored on disk), decreasing the effective throughput of the system.
Key-value stores [8,55–57] handle a large number of requests per second in a distributed manner, but currently are unable to handle massively large objects (tens of MBs to GBs) efficiently, and add overhead to provide consistency that is unnecessary in Ambry. Certain systems [55–57] hash data directly to machines, and that can create large data movement whenever nodes are added/deleted.
PNUTS [58] and Spanner [59] are scalable, geographically distributed systems, where PNUTS maintains load balance as well. However, both systems provide more features and stronger guarantees than needed in a simple immutable blob store.
A concept similar to that of partitions in Ambry has been used in other systems. Haystack uses logical volumes [60]; Twitter's blob store uses virtual buckets [6]; and the Petal file system introduces virtual disks [61]. Ambry can reuse some of these systems' optimizations, such as Haystack's additional internal caching. However, neither Haystack nor Twitter's blob store tackles the problem of load imbalance. Further, Haystack uses synchronous writes across all replicas, which hurts efficiency in a geo-distributed setting.
Facebook has also designed f4 [62], a blob store that uses erasure coding to reduce the replication factor of old data (that have become cold). Despite the novel ideas in this system, which potentially can be included in Ambry, our main focus is on both new and old data. Oracle's Database [63] and Windows Azure Storage (WAS) [64] also store mutable blobs, and WAS is even optimized for a geo-distributed environment. However, they both provide additional functionalities, such as support for many data types other than blobs, strong consistency guarantees, and modification to blobs, that are not needed in our use case.
To summarize, our contributions with Ambry were as follows: (i) we designed and developed an industry-scale object store optimized for a geo-distributed environment, (ii) we minimized cross-data-center traffic by using asynchronous writes that update local data-center storage and propagate updates to storage at more remote sites in the background, (iii) we developed a two-phase background replication mechanism, and (iv) we developed a load-balancing mechanism that returns the system to a balanced state after expansion.
Clouds are complex systems involving difficult scalability concerns. Availability, workload considerations, performance, locality, and geo-distribution are some of the topics discussed in this chapter. We examine many of the key issues related to those topics that concern the design of applications and storage for clouds, and we show how knowledge of the behavior of a cloud is important in matching its services to user and cloud-provider requirements. The scalability concerns of a cloud complicate the behavioral analysis of its operation because example workloads may be enormous, span long periods of time and huge volumes of data, involve high-velocity data, or contain data subject to privacy concerns. Building tools that can aid in modeling cloud workloads and that can be employed in the design of appropriate clouds is of considerable interest to researchers and developers.
This chapter discussed how, for example, the design of cloud applications in the social networking area is increasingly influenced by extreme workloads that involve billions of users and petabytes of data. In the future, cloud processing will continue to push computation toward the data, a pattern we have already seen in many applications such as MapReduce and graph processing. Edge computing is an optimization for building large, potentially geographically distributed clouds by pushing data processing out to the edge of the network, toward the source of the data. In our research, we have analyzed applications such as Ambry that allow cloud computing to stretch across the world and enable geographically distributed computation. Follow-on work after Ambry's development has examined stream processing solutions for cloud computing in the form of LinkedIn's Samza [65]. In Samza, information gathered from clients may consist of large, complex media objects that need low-latency data analysis. Samza reduces latency by not writing these media objects to long-term storage until they have been stream-processed. The reliability concerns for Samza include how to recover data that are stored only in volatile random access memory if a fault (e.g., a power failure) occurs during stream processing. Samza uses fault tolerance in the form of both redundant processing and fast restart to avoid losing data and to minimize delay.
Looking to the future, the performance of cloud-based large machine learning computations is likely to become a major issue in cloud computing. In the few months prior to the time of this writing, production implementations of TensorFlow [66] and other deep-learning systems have come online at Google, Amazon, and Microsoft [67–69]. The models from these systems are used for inferencing in both clouds and local devices such as cell phones. These systems, when coupled with Edge learning systems or smartphones, form complex distributed learning systems that require performance analysis and evaluation. Ubiquitous sensors, autonomous vehicles that exchange state information about traffic conditions, and a host of close-to-real-time and health applications continue to expand the boundaries of cloud computing.
The future is exciting and difficult to predict. Regardless of whether edge computing will create “Cloudlet” solutions, how stream processing will influence design, or what new technology is used in these future cloud extensions, the techniques discussed in this chapter – measurement, modeling, and optimization based on performance – will still govern the design of such systems.