
11. MapReduce


In this chapter, we will develop a large specification to fully show what the process looks like: ideation, missteps, and all.

Problem Overview

MapReduce was one of the first Big Data algorithms. It helped Google scale quickly and handle huge amounts of data, providing the foundation of Hadoop and the big data revolution. Instead of doing a calculation on a single computer, you distribute it among several computers (the map) and use one to combine the data after (the reduce). The typical example is counting the number of words in 1,000,000 books. It might not fit in memory, so here’s how you can MapReduce the calculation among four computers:
  1. The first computer is labeled the reducer. The other three are labeled workers.
  2. The reducer assigns books 1, 4, 7 … to the first worker, 2, 5, 8 … to the second worker, and the remaining books to the third worker.
  3. Each worker calculates the number of words in its assigned books and reports that back to the reducer.
  4. The reducer sums the numbers together to get the final wordcount.
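To make the division of labor concrete, here is a small worked example; the wordcounts are made up purely for illustration. Say there are six books with wordcounts 120, 80, 200, 50, 90, and 60:
  Worker 1 gets books 1 and 4: 120 + 50 = 170
  Worker 2 gets books 2 and 5: 80 + 90 = 170
  Worker 3 gets books 3 and 6: 200 + 60 = 260
  The reducer sums the three reports: 170 + 170 + 260 = 600 words total.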
This is an informal description and glosses over a lot of details. How does the assignment work? How do workers report back their final counts? How does the reducer know when a worker is done? What happens if a node goes down? There’s an ambiguity to the spec that can lead to buggy implementations or, worse, a fundamentally broken design. We will specify MapReduce in three stages:
  1. A first spec that assumes all workers always succeed.
  2. A second, fault-tolerant spec that allows workers to fail.
  3. A final spec that works even if the recovery mechanism partially fails, too.

As with the previous chapter, I will be showing the development process and the dead ends we can run into. That way, if you encounter these issues in your own specifications, you’ll have some resources to address them.

Part One: Basics

I called the specification MapReduce and the root file main.
---- MODULE main ----
EXTENDS TLC, Sequences, Integers
PT == INSTANCE PT
CONSTANTS Workers, Reducer, NULL

I automatically extend TLC, Sequences, and Integers because they’re almost always useful. My assumption is that we’ll have a single reducer process and multiple worker processes, so I make them constants. Workers will be a model set, Reducer a model constant. That way we can tweak the number of workers in the spec. I also add NULL because it comes in handy all of the time. I’ll take it out if the spec doesn’t need it, but my base prior is that it will.

What input are we putting in? I think we can assume that each “book” can be represented as a single number (its wordcount), the list of all “books” represented by a sequence of numbers. I want a sequence instead of a set because multiple books might have the same wordcount.

What, then, is the set of possible inputs? There can be an arbitrary number of books, and each book can have an arbitrary number of words. In practice, this can’t be model checked, so I’ll limit it. I could add two more constants so we can test the spec with different ranges in different models. That’s some extra overhead that’s easy to add later, so for now, we’ll hard-code it. I choose four books to evenly split among two workers and allow each to have a wordcount in 0..2. This seems like it would cover enough cases to give us reasonable confidence.
PossibleInputs == PT!TupleOf(0..2, 4)
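Assuming PT!TupleOf(set, n) is the set of all n-element sequences with elements drawn from set (which is how we use it here), PossibleInputs contains 3^4 = 81 candidate inputs, for example <<0, 0, 0, 0>>, <<0, 1, 2, 0>>, and <<2, 2, 2, 2>>. TLC will check the spec once for each possible initial value of input.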

As I write this, though, I immediately see a question: We’re assuming there are four books. What if there is one, or zero? Will the algorithm still work? I decide to leave that out until later. Checking a wider range may be safer, but it will also take longer to model check, and we want to get quick feedback at the start. That way we can remove the obvious bugs before looking for the subtler ones. Later, as we decide to explore a wider state space, we’ll replace the hard-coded numbers with CONSTANTS.

Next question: What does it mean for our algorithm to be correct? That the final value it gets is equal to the total wordcount, aka the sum of the inputs. Whatever we implement, then, at some point our goal is to ensure that assert final = actual_sum_of_inputs. We need an operator that accurately calculates that for us. This is doable with PT!ReduceSeq.
SumSeq(seq) == PT!ReduceSeq(LAMBDA x, y: x + y, seq, 0)
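For example, SumSeq(<<1, 0, 2, 2>>) evaluates to 1 + 0 + 2 + 2 = 5: ReduceSeq folds the LAMBDA over the sequence, starting from the third argument, 0.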
These are enough support operators for now. Let’s start on the system itself. I'd prefer to keep the spec general, thinking of items instead of books and values instead of wordcounts, so we will use those terms going forward.
(*--algorithm mapreduce
variables input \in PossibleInputs;
process reducer = Reducer
variables final = 0;
begin
  Schedule:
    skip;
  ReduceResult:
    skip;
  Finish:
    assert final = SumSeq(input);
end process;
process worker \in Workers
begin
  Worker:
    skip;
end process;
end algorithm; *)
====
We know what the overall structure looks like and we know what we want at the end, so this is a start. I then created a model with the following constants:
Workers <- [model value] {w1, w2}
Reducer <- [model value]
NULL <- [model value]

Uncheck Deadlock; we don’t need it for now. Run the model. If everything is set up properly, you should see a model failure. This is to be expected, as we haven’t actually implemented anything.

Ideally, we want to break this down in a way that lets us check that every subcomponent looks okay without having to write the whole spec first. Let’s start by assuming that each worker has figured out some final value: How does the reducer get that value? My mind immediately goes to the reducer pinging a worker to see if it has a return value yet. If it doesn’t have something ready, it returns NULL. Each worker has its own result value, which suggests a function. Since both the reducer and the workers will interact with the result value, the function would have to be global.
variables
  input \in PossibleInputs,
  result = [w \in Workers |-> NULL];
\* ...
process worker \in Workers
begin
  Worker:
    result[self] := 5;
end process;

The reducer should wait until a worker has changed its result to something other than NULL and then add the new value to its own total. While writing what this looks like, it occurs to me that we don’t want the reducer reading a non-null result from the same worker twice. We could prevent this by setting its result back to NULL. But that doesn’t seem right to me: if we set a result back to NULL, there’s no way of knowing whether that worker had finished and been consumed, or if it’s still working. A better idea would be for the reducer to privately track which worker results it has consumed.

This is a common loop when writing TLA+ specs: planning your next step, realizing it will lead to a problem later, and fixing it in advance. While TLC is very useful, most of the model checking happens in your head.
process reducer = Reducer
variables final = 0, consumed = [w \in Workers |-> FALSE];
begin
  Schedule:
    skip;
  ReduceResult:
    while \E w \in Workers: ~consumed[w] do
      with w \in {w \in Workers: ~consumed[w] /\ result[w] /= NULL} do
        final := final + result[w];
        consumed[w] := TRUE;
      end with;
    end while;
  Finish:
    assert final = SumSeq(input);
end process;

Try this again and confirm it fails; it should fail with final = 10. Inspecting the error trace, you should see that final only increments after a worker has run. That suggests ReduceResult is successfully retrieving and summing the worker values.

Next we’ll implement the logic for actually processing the inputs. The worker has a private variable we’ll call total, starting at 0. The reducer would send each worker a sequence of items. The worker would iterate through the items, adding each value to the total. When the worker has processed all of the items assigned to it, it sets result[self] to total so that the reducer can read it.
variables
  input \in PossibleInputs,
  result = [w \in Workers |-> NULL],
  queue = [w \in Workers |-> <<1, 2>>]; \* for testing
\* ...
process worker \in Workers
variables total = 0;
begin
  Process:
    while queue[self] /= <<>> do
      total := total + Head(queue[self]);
      queue[self] := Tail(queue[self]);
    end while;
  Result:
    result[self] := total;
end process
Since we hard-coded the queues for each worker, they each set result[self] = 3 and the model fails with final = 6. But I see another problem here: we know we’re done when the queue is empty. But the queue will start out empty until the reducer sends something to the worker! There’s nothing stopping the worker from seeing the initial queue and saying “I’m done.” We’ll have to add one more step to the worker:
process worker \in Workers
variables total = 0;
begin
  WaitForQueue:
    await queue[self] /= <<>>;
  Process:
    while queue[self] /= <<>> do
      total := total + Head(queue[self]);
      queue[self] := Tail(queue[self]);
    end while;
  Result:
    result[self] := total;
end process

Better. We have to put the await in a separate step from the while, because while loops must come directly after a label. One thing I notice is that result[self] will never be updated if we never send any items to this worker. That’s not a problem just yet but will be a problem if we have fewer items than nodes. It’s something to keep an eye on.
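As an aside, this is the shape the translator would reject if we tried to combine the two, since the while would no longer come directly after a label:
  Process:
    await queue[self] /= <<>>;   \* not allowed: the while below no longer
    while queue[self] /= <<>> do \* immediately follows the Process label
      ...
    end while;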

We now have everything except the Schedule step. This requires us to divvy up the inputs among all of the workers. Up above I decided that we’d assign each one based on their order. However, there’s no predefined order to the workers. By using PT!OrderSet on them, we can arbitrarily pick one as the first worker, one as the second, etc. What’s important, however, is that TLC will not try to break the spec by reordering the workers. This means that our spec works only if we assume that ordering of workers does not matter. Whenever we write specs, we should carefully keep track of our assumptions and recheck them regularly to confirm they’re still safe.

So, once we have an ordering on the workers, we can use PT!SelectSeqByIndex to assign them. Here’s how I did it:
variables
  input \in PossibleInputs,
  result = [w \in Workers |-> NULL],
  queue = [w \in Workers |-> <<>>]; \* remove hardcoding
process reducer = Reducer
variables final = 0, consumed = [w \in Workers |-> FALSE];
begin
  Schedule:
    with worker_order = PT!OrderSet(Workers) do
      queue := [ w \in Workers |->
        LET offset == PT!Index(worker_order, w) - 1 \* sequences start at 1
        IN PT!SelectSeqByIndex(input, LAMBDA i: i % Len(worker_order) = offset)
      ];
    end with;

The code is a little complex, but all we are doing is assigning a number to each worker and cyclically assigning the items to each worker. We run into some friction because sequences have domain 1..n, while x % n has range 0..(n-1). We have to subtract 1 from our offsets to keep them in sync.
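As a concrete illustration, suppose PT!OrderSet happens to order two workers as <<w1, w2>> and input = <<1, 0, 2, 2>> (assuming PT!SelectSeqByIndex keeps the elements whose index satisfies the lambda):
  w1 has offset 0, so it keeps the items at indices where i % 2 = 0, i.e., indices 2 and 4: <<0, 2>>
  w2 has offset 1, so it keeps indices 1 and 3: <<1, 2>>
Every index matches exactly one offset, so each item ends up in exactly one queue.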

Tip

PT!SeqMod has the proper modulo semantics for sequences. You can define a %% b == PT!SeqMod(a, b) if you’d like.
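For example, assuming “proper modulo semantics” means the result lands in 1..b instead of 0..(b-1), we’d get 4 %% 2 = 2, a valid index into a two-element sequence, where 4 % 2 = 0 is not.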

Now we have everything in order. If you run the spec now, it should pass with 7,209 states. We’ve completed the first part of this example. Our final spec should look like this:
EXTENDS TLC, Sequences, Integers
PT == INSTANCE PT
CONSTANTS Workers, Reducer, NULL
PossibleInputs == PT!TupleOf(0..2, 4)
SumSeq(seq) == PT!ReduceSeq(LAMBDA x, y: x + y, seq, 0)
(*--algorithm mapreduce
variables
  input \in PossibleInputs,
  result = [w \in Workers |-> NULL],
  queue = [w \in Workers |-> <<>>];
process reducer = Reducer
variables final = 0, consumed = [w \in Workers |-> FALSE];
begin
  Schedule:
    with worker_order = PT!OrderSet(Workers) do
      queue := [ w \in Workers |->
        LET offset == PT!Index(worker_order, w) - 1 \* sequences start at 1
        IN PT!SelectSeqByIndex(input, LAMBDA i: i % Len(worker_order) = offset)
      ];
    end with;
  ReduceResult:
    while \E w \in Workers: ~consumed[w] do
      with w \in {w \in Workers: ~consumed[w] /\ result[w] /= NULL} do
        final := final + result[w];
        consumed[w] := TRUE;
      end with;
    end while;
  Finish:
    assert final = SumSeq(input);
end process;
process worker \in Workers
variables total = 0;
begin
  WaitForQueue:
    await queue[self] /= <<>>;
  Process:
    while queue[self] /= <<>> do
      total := total + Head(queue[self]);
      queue[self] := Tail(queue[self]);
    end while;
  Result:
    result[self] := total;
end process;
end algorithm; *)
\* BEGIN TRANSLATION
\* ...
\* END TRANSLATION

Part Two: Liveness

assert final = SumSeq(input); doesn’t actually check that our spec gets the right answer. It checks that if it gets a final answer, then the answer is the right one. In other words, we’ve demonstrated safety but still have to do liveness. Let’s add a temporal property after the translation:
\* BEGIN TRANSLATION
\* ...
\* END TRANSLATION
Liveness == <>[](final = SumSeq(input))

Add this as a temporal property and rerun the model. You should see it fail, eventually reaching a stuttering step. If the workers never complete, then we will never finish reducing.

We could solve this by making the workers fair processes. But they aren’t. Workers crash all the time in the field, and MapReduce should assume they can fail. We want our algorithm not only to work in the happy path, but also to be fault tolerant. This makes up part two of this example: ensuring that MapReduce continues to work if some (but not all) of the workers stutter.

To simplify this step, we will make the following assumptions:
  1. The reducer is fair. If it’s not, we can’t guarantee anything happens.
  2. There is at least one fair worker. If there’s none, then we can easily see the algorithm couldn’t possibly succeed: just have every worker keep crashing and you’ll never meet Liveness.
  3. It doesn’t matter which worker is the fair one. This assumption significantly reduces our state space, since we can arbitrarily pick one with CHOOSE.
  4. The reducer may or may not detect an unfair worker failing, but it will never falsely decide a fair worker has failed. This is the biggest assumption here, but it’s an assumption that makes our system a lot easier to design.
The fair and unfair workers have the same implementation; they only differ in whether or not they may crash. We can do a similar thing here that we did in Chapter 6: extracting the body of worker into a procedure and then making each type of worker call that procedure.
EXTENDS TLC, Sequences, Integers, FiniteSets
PT == INSTANCE PT
CONSTANTS Workers, Reducer, NULL
SumSeq(seq) == PT!ReduceSeq(LAMBDA x, y: x + y, seq, 0)
FairWorkers == CHOOSE set_w \in SUBSET Workers: Cardinality(set_w) = 1
UnfairWorkers == Workers \ FairWorkers
(*--algorithm mapreduce
\* ...
procedure work()
  variables total = 0;
begin
  WaitForQueue:
    await queue[self] /= <<>>;
  Process:
    while queue[self] /= <<>> do
      total := total + Head(queue[self]);
      queue[self] := Tail(queue[self]);
    end while;
  Result:
    result[self] := total;
    return;
end procedure;
fair process reducer = Reducer
\* same body
fair process fair_workers \in FairWorkers
begin FairWorker:
  call work();
end process;
process worker \in UnfairWorkers
begin RegularWorker:
  call work();
end process;

FAIRNESS AND SAFETY

Why did we hard-code a single fair worker? Why not make it some subset, and let TLC check all possible subsets? The behavior of a fair process is a strict subset of the behavior of an unfair process. If a fair process would violate safety, then so would an unfair process. Conversely, if an unfair process is safe, then so is a fair one. There’s no need to separately check that safety is preserved with two fair workers: TLC already covers those behaviors when it checks the unfair ones.

Now we are guaranteed that at least one worker will finish its assigned queue. Rerun the model and it should still fail, but it may fail after more steps complete. While one worker completes, the rest may not, and the reducer still waits forever.

Now for the change in Reducer. While it’s waiting to get all of the data in, it can do one of two things. First, it can do the standard “take worker, declare consumed, add to total.” We won’t touch that, so I pulled it into its own macro called reduce() to make the spec cleaner. Reminder: macros must go above procedures in a spec.
(*--algorithm mapreduce
variables
  input \in PossibleInputs,
  result = [w \in Workers |-> NULL],
  queue = [w \in Workers |-> <<>>];
macro reduce() begin
  with w \in {w \in Workers: ~consumed[w] /\ result[w] /= NULL} do
    final := final + result[w];
    consumed[w] := TRUE;
  end with;
end macro;
procedure work()
\* ...
fair process reducer = Reducer
variables final = 0, consumed = [w \in Workers |-> FALSE];
begin
  Schedule:
    \* Same as before
  ReduceResult:
    while \E w \in Workers: ~consumed[w] do
      reduce();
    end while;
      \* ...

The other thing it can do is move a failing worker’s queue to a valid worker. How does it know if a worker is failing? Again, this is abstraction specific, but for now we can think of it like this: if a worker result is consumed, then it definitely didn’t fail. So any worker we haven’t consumed might have failed. We’ll pick one of those.

In this phase, we’re not distinguishing between a node that has crashed and a node that hasn’t finished. Without implementing something like a heartbeat protocol, both types of nodes look the same to the reducer. We’re demonstrating that the system is correct, not necessarily that it’s efficient. Also, we’re allowing ourselves, again for simplification, to always pick a fair worker to move data to. This is not a safe assumption for a production system, and we will address it in more detail in Part 3.
fair process reducer = Reducer
variables final = 0, consumed = [w \in Workers |-> FALSE];
begin
  Schedule:
    \* Same as before
  ReduceResult:
    while \E w \in Workers: ~consumed[w] do
      either
        \* Reduce
        reduce();
      or
        \* Reassign
        with from_worker \in {w \in UnfairWorkers: ~consumed[w] /\ result[w] = NULL},
             to_worker \in FairWorkers do
          \* REASSIGN LOGIC
          \* how does it know what to move?
          \* And how does it move it?
          skip;
        end with;
     end either;
    end while;

\* Reduce and \* Reassign are ‘real’ comments in the spec, not just teaching annotations. I could have made them labels. This would make the concurrency more fine-grained at the cost of a slower model. I don’t think they’re necessary yet. I may choose to add them later, once I have the coarser concurrency model working properly. So, what goes in the \* REASSIGN LOGIC block? Here’s how I thought through it.

First of all, this is handling a failed worker, right? But the while loop keeps us going until we’ve consumed all of the workers. If a worker truly crashed, we’d be waiting on it forever. If the worker didn’t crash and later gets a result, it will mess up our calculations. Either way, we can prevent the problem by declaring the failed worker consumed. We might also want to record that we thought it failed, but that’s not (yet) useful to us. I’ll burn that bridge when we get to it.

Similarly, on one hand, we might reassign to a consumed worker. If that happens, our calculations are off: the reassigned items will never be totaled. On the other hand, if we only reassign to unconsumed workers, what happens if the last worker we’re waiting on fails? There wouldn’t be any unconsumed workers to reassign to. That tells me we want a compromise: we can reassign to any fair worker, but if it was consumed, we have to unconsume it.
with from_worker \in {w \in UnfairWorkers: ~consumed[w] /\ result[w] = NULL},
     to_worker \in FairWorkers do
  \* REASSIGN LOGIC
  consumed[from_worker] := TRUE ||
  consumed[to_worker] := FALSE;
end with;

Add this and rerun the model. It will still fail, of course, but it should fail because of the assert statement, not the Liveness property. While we still can’t guarantee it gets the correct answer, at least we can guarantee it gets some answer.

Now for the second part: What does it actually mean to “reassign”? Ideally, it means we dump anything that was in from_worker’s queue into to_worker’s queue. But we can’t get that “anything” from queue! It represents the data we sent directly between the reducer and the worker, so using it would violate our abstraction. Also, we’re destructively updating it, so we can’t guarantee it’s correct data.

Rather, the reducer has to “know” what it sent to from_worker so it can send the same items to to_worker. We can most easily do this by having it locally track the assignments. Then, we can append from_worker’s assignment to to_worker’s queue.
fair process reducer = Reducer
variables final = 0,
consumed = [w \in Workers |-> FALSE],
assignments = [w \in Workers |-> <<>>];
begin
  Schedule:
    with worker_order = PT!OrderSet(Workers) do
      queue := [ w \in Workers |->
        LET offset == PT!Index(worker_order, w) - 1 \* sequences start at 1
        IN PT!SelectSeqByIndex(input, LAMBDA i: i % Len(worker_order) = offset)
      ];
      assignments := queue;
    end with;
  ReduceResult:
    while \E w \in Workers: ~consumed[w] do
      either
        \* Reduce
        reduce();
      or
        \* Reassign
        with
          from_worker \in {w \in UnfairWorkers: ~consumed[w] /\ result[w] = NULL},
          to_worker \in FairWorkers
        do
          assignments[to_worker] :=
            assignments[to_worker] \o
            assignments[from_worker];
          queue[to_worker] :=
            queue[to_worker] \o
            assignments[from_worker];
          consumed[from_worker] := TRUE ||
          consumed[to_worker] := FALSE;
        end with;
     end either;
    end while;
  Finish:
    assert final = SumSeq(input);
end process;

I’m tempted to merge consumed and assignments into a single structure. But since I’m reassigning to the entire queue at once in Schedule, nesting it in a structure would make that mutation considerably more complicated. I also update assignments in the Reassign block. It doesn’t yet change the current behavior of the spec, but it is more comprehensive.

Update the spec and rerun the model checker. This time, the error is a little more complex. The exact details it finds on your computer may be slightly different, but the overall error is this:
  1. The reducer assigns values to each worker.
  2. The fair worker finishes.
  3. We reassign new items to the fair worker.
  4. The fair worker is already done, so it doesn’t update.
  5. We reuse the same result, getting the wrong final answer.

Change return to goto WaitForQueue, recompile, and rerun. This still fails, because we didn’t null out the relevant result. Add result[to_worker] := NULL; to the with statement and try again. This fails because the reducer can reassign after the worker finishes its queue but before it can run Result. Every small tweak we make leads to a different concurrency error.
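For reference, the tweak we just tried looks roughly like this inside the Reassign with block (the goto survives into later versions of the spec; the NULL-ing does not):
          \* a dead end: clear the result of the worker we reassign to
          result[to_worker] := NULL;
          consumed[from_worker] := TRUE ||
          consumed[to_worker] := FALSE;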

Let’s take a step back. While blind guessing can work for tests or typecheckers, it won’t help with specification. We need to think about what we’re doing. Our problem here is this: the reducer has no way of knowing whether a given result includes every item assigned to the worker. We originally knew that because the worker would only write a result once it had completed the entire queue. But we can no longer rely on that. What can we do instead?

What if we included more information in the result? The reducer knows how many items it assigned to the worker. The worker knows how many items it completed. What if, when it was done, the worker sent back both the final result and the number of processed items? Then the reducer knows to consume it only when it matches the size of the assignment. This change touches on many parts of the spec, so they are listed below in isolation:
  result = [w in Workers |-> [total |-> NULL, count |-> NULL]],
macro reduce() begin
  with
    w \in {w \in Workers:
       ~consumed[w] /\ result[w].count = Len(assignments[w])}
  do
    final := final + result[w].total;
    consumed[w] := TRUE;
  end with;
end macro;
procedure work()
  variables total = 0, count = 0;
begin
  WaitForQueue:
    await queue[self] /= <<>>;
  Process:
    while queue[self] /= <<>> do
      total := total + Head(queue[self]);
      queue[self] := Tail(queue[self]);
      count := count + 1;
    end while;
  Result:
    result[self] := [total |-> total, count |-> count];
    goto WaitForQueue;
end procedure;
\* in reducer
  ReduceResult:
    while \E w \in Workers: ~consumed[w] do
      either
        \* Reduce
        reduce();
      or
        \* Reassign
        with
          from_worker \in {w \in UnfairWorkers:
            ~consumed[w] /\ result[w].count /= Len(assignments[w])
          },
\* ...
Make the updates, recompile, rerun. The good news is that it’s no longer getting an 11-step error. The bad news is it’s now getting a 17-step error. Inspect the error trace and see if you can see what the problem is. You should get something similar to this:
  1. We assign <<0, 1>> to the fair worker and <<0, 1>> to the unfair worker.
  2. The fair worker completes as normal. It sets result.total to 1. The reducer reads it and sets final := 0 + 1.
  3. The reducer reassigns the unfair worker’s assignments to the fair worker.
  4. The fair worker completes the new assignment. It sets result.total to 2.
  5. The reducer sets final := 1 + 2.
  6. The reducer completes with final = 3, SumSeq(input) = 2. Error.
The problem is we’re double-counting the first assignment in final. One way to fix this would be to subtract the worker’s old result from final whenever we reassign. That seems error prone, so I’d rather track the final result from each worker and invalidate that on reassignment. Then, in Finish, we sum up the final results.
macro reduce() begin
  with
    w in {w in Workers:
       ~consumed[w] /\ result[w].count = Len(assignments[w])}
  do
    final[w] := result[w].total;
    consumed[w] := TRUE;
  end with;
end macro;
\* ...
fair process reducer = Reducer
variables final = [w \in Workers |-> 0],
consumed = [w \in Workers |-> FALSE],
\* In Reassign
          assignments[to_worker] :=
            assignments[to_worker] \o
            assignments[from_worker];
          queue[to_worker] :=
            queue[to_worker] \o
            assignments[from_worker];
          consumed[from_worker] := TRUE ||
          consumed[to_worker] := FALSE;
          final[to_worker] := 0;
        end with;
     end either;
    end while;
  Finish:
    assert SumSeq(final) = SumSeq(input);
\* ...
Liveness == <>[](SumSeq(final) = SumSeq(input))

This, finally, is successful (32,238 states)! Try adding a third or even fourth worker and confirm that the spec is still successful. We now have a working, fault-tolerant version of MapReduce.
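For example, a three-worker model uses the same format as before for the Workers constant:
Workers <- [model value] {w1, w2, w3}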

Then again, we assumed that the reducer would only reassign from unfair workers and to fair ones. How could it know, though? It’s not like it can tell which workers are stuttering from the outside. In the next section, we will account for exactly that.

Here’s our current version of the spec:
EXTENDS TLC, Sequences, Integers, FiniteSets
PT == INSTANCE PT
CONSTANTS Workers, Reducer, NULL
PossibleInputs == PT!TupleOf(0..2, 4)
SumSeq(seq) == PT!ReduceSeq(LAMBDA x, y: x + y, seq, 0)
FairWorkers == CHOOSE set_w \in SUBSET Workers: Cardinality(set_w) = 1
UnfairWorkers == Workers \ FairWorkers
(*--algorithm mapreduce
variables
  input \in PossibleInputs,
  result = [w \in Workers |-> [total |-> NULL, count |-> NULL]],
  queue = [w \in Workers |-> <<>>];
macro reduce() begin
  with
    w \in {w \in Workers:
      result[w].count = Len(assignments[w]) /\ ~consumed[w]}
  do
    final[w] := result[w].total;
    consumed[w] := TRUE;
  end with;
end macro;
procedure work()
  variables total = 0, count = 0;
begin
  WaitForQueue:
    await queue[self] /= <<>>;
  Process:
    while queue[self] /= <<>> do
      total := total + Head(queue[self]);
      queue[self] := Tail(queue[self]);
      count := count + 1;
    end while;
  Result:
    result[self] := [total |-> total, count |-> count];
    goto WaitForQueue;
end procedure;
fair process reducer = Reducer
variables final = [w \in Workers |-> 0],
consumed = [w \in Workers |-> FALSE],
assignments = [w \in Workers |-> <<>>];
begin
  Schedule:
    with worker_order = PT!OrderSet(Workers) do
      queue := [ w \in Workers |->
        LET offset == PT!Index(worker_order, w) - 1 \* sequences start at 1
        IN PT!SelectSeqByIndex(input, LAMBDA i: i % Len(worker_order) = offset)
      ];
      assignments := queue;
    end with;
  ReduceResult:
    while \E w \in Workers: ~consumed[w] do
      either
        \* Reduce
        reduce();
      or
        \* Reassign
        with
          from_worker \in {w \in UnfairWorkers:
              result[w].count /= Len(assignments[w]) /\ ~consumed[w]
          },
          to_worker \in FairWorkers
        do
          assignments[to_worker] :=
            assignments[to_worker] \o
            assignments[from_worker];
          queue[to_worker] :=
            queue[to_worker] \o
            assignments[from_worker];
          consumed[from_worker] := TRUE ||
          consumed[to_worker] := FALSE;
          final[to_worker] := 0;
        end with;
     end either;
    end while;
  Finish:
    assert SumSeq(final) = SumSeq(input);
end process;
fair process fair_workers \in FairWorkers
begin FairWorker:
  call work();
end process;
process worker \in UnfairWorkers
begin RegularWorker:
  call work();
end process
end algorithm; *)
\* TRANSLATION
Liveness == <>[](SumSeq(final) = SumSeq(input))

Part Three: Statuses

In theory, we don’t have a way of distinguishing failing nodes from passing ones. In practice, we can do things that give us a reasonable amount of confidence. For example, we can ping all the servers every N seconds and assume that the ones that don’t answer in time are failing. Of course, the node might not be failing, and it could be that our reducer is acting up.

There are a few different ways of exploring this space. We could manually specify a heartbeat protocol, for one. In the interests of keeping this book under 600 pages, I’d like to simulate this by just loosening an assumption. Before, the reducer could move the queue of an unfair worker to a fair one. Now, the reducer can still move the assignments of an unfair worker but does not know which ones are fair. Instead, it must decide which worker to pick. We will continue to assume the system never reassigns away from a fair worker, as that worker always responds to the heartbeat.
  ReduceResult:
    while \E w \in Workers: ~consumed[w] do
      either
        \* Reduce
        reduce();
      or
        \* Reassign
        with
          from_worker \in {w \in UnfairWorkers:
               ~consumed[w] /\ result[w].count /= Len(assignments[w])
          },
          to_worker \in Workers \ {from_worker}
          \* ...
We need to have two unfair workers to have different behavior here. Since one worker must always be fair by assumption, this means we’ll need at least three total.
Workers <- {w1, w2, w3}

If you now run this, you will see … the model never stops, ever. It turns out we accidentally created an unbounded model. TLC was able to find an infinite number of unique states. This suggests something is growing without limit, such as always being able to increment a number. That, obviously, is an error, too.

In the last chapter we mentioned that it’s good practice to define a TypeInvariant to constrain the values of our variables. We didn’t do that here and it came back to bite us. One of the type invariants of our system is that there’s only a fixed number of items, so no worker should have more than that number of items enqueued. If we had written a type invariant, TLC would have failed that state with an error instead of running forever. Let’s define it now.

But first we need to update PossibleInputs. We hard-coded the inputs to PossibleInputs, including the number of items. If we want our invariant to refer to that count, we need to make it a distinct operator or constant. And if we’re doing that, we might as well make it a constant so we can use ASSUME and multiple models.
CONSTANTS ItemRange, ItemCount
ASSUME ItemRange \subseteq Nat
ASSUME ItemCount \in Nat
PossibleInputs == PT!TupleOf(ItemRange, ItemCount)
To get the original (infinite) model behavior back, set ItemRange <- 0..2, ItemCount <- 4. Since TypeInvariant is an invariant that uses our PlusCal variables, we need to put it in a define block above the macro:
variables
  input \in PossibleInputs,
  result = [w \in Workers |-> [total |-> NULL, count |-> NULL]],
  queue = [w \in Workers |-> <<>>];
define
  TypeInvariant ==
    /\ \A w \in Workers:
      /\ Len(queue[w]) <= ItemCount
      /\ \A item \in 1..Len(queue[w]):
        queue[w][item] \in ItemRange
      /\ \/ result[w].total = NULL
         \/ result[w].total <= SumSeq(input)
      /\ \/ result[w].count = NULL
         \/ result[w].count <= ItemCount
end define;
macro reduce() begin

We could be more elaborate, but the important thing here is that we check all of the queues have fewer than ItemCount items in them. We could also check the types of the private variables, in which case TypeInvariant would have to go after the translation.
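As a minimal sketch of that option: the name WorkerTypeInvariant below is mine, and it relies on the translation exposing the procedure-locals total and count as functions indexed by process (which is what the PlusCal translator generates), so it would go after the translation:
\* Hypothetical invariant over the workers' private variables
WorkerTypeInvariant ==
  \A w \in Workers:
    /\ total[w] \in Nat
    /\ count[w] \in Nat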

If you set INVARIANT TypeInvariant and rerun the model, you should get a definite failure. What happens is we can reassign away from a worker, mark it consumed, and then reassign back to the worker and unconsume it. This effectively duplicates its queue, leading to an incorrect count.

For the actual fix, I considered “wiping” the old assignments from the internal queue. This might work, but we track when workers are done by the number of assignments they complete. Wiping the assignments might conflict with that logic. A simpler solution is to recognize that “consumed” is tracking two separate responsibilities: a node that’s finished working, and a node that we consider bad. There are actually three states, though: “active,” “inactive,” and “broken.” We’re done when there are no “active” nodes. When reassigning, we only reassign to “active” or “inactive” nodes. Let’s make those changes.

First, I define a new variable status, which tracks the state of each worker. This is internal to the reducer, but for convenience purposes we put it in the global scope. That way we can add a helper operator that gets all active workers, and add the possible statuses to our type invariant.
(*--algorithm mapreduce
variables
  input \in PossibleInputs,
  result = [w \in Workers |-> [total |-> NULL, count |-> NULL]],
  queue = [w \in Workers |-> <<>>],
  status = [w \in Workers |-> "active"]; \* Only reducer should touch this
define
  ActiveWorkers == {w \in Workers: status[w] = "active"}
  HealthyWorkers == {w \in Workers: status[w] /= "broken"}
  TypeInvariant ==
    /\ status \in [Workers -> {"active", "inactive", "broken"}]
    /\ \A w \in Workers:
      /\ Len(queue[w]) <= ItemCount
      /\ \A item \in 1..Len(queue[w]):
        queue[w][item] \in ItemRange
      /\ \/ result[w].total = NULL
         \/ result[w].total <= SumSeq(input)
      /\ \/ result[w].count = NULL
         \/ result[w].count <= ItemCount
end define;
Instead of reducing over unconsumed workers, we reduce over the workers that are active. Instead of setting them to consumed, we set their status to inactive.
macro reduce() begin
  with
    w \in {w \in ActiveWorkers:
      result[w].count = Len(assignments[w])
      }
  do
    final[w] := result[w].total;
    status[w] := "inactive";
  end with;
end macro;
The rest of the changes are in ReduceResult. We can get rid of the consumed variable, because we’re now tracking statuses instead. Our while loop loops as long as there are active workers. Now that we have a helper operator for the set of active workers, we can replace the conditional with an empty-set check.
  ReduceResult:
    while ActiveWorkers /= {} do
In our with statement, we restrict our from_worker to only the workers that are active, not inactive or broken. Our to_worker can be active or inactive, but it may not be broken.
        with
          from_worker \in ActiveWorkers \ FairWorkers,
          to_worker \in HealthyWorkers \ {from_worker}
        do
Finally, instead of setting the from_worker to inactive, we set it to “broken.” That ensures we don’t ever try to reassign anything to that worker.
          status[from_worker] := "broken" ||
          status[to_worker] := "active";
          final[to_worker] := 0;

A quick back-of-the-envelope estimate suggests that the fixed model is going to have a very high number of states. To sanity-check my fixes, I decide to run them on a smaller model first. I clone the first one and set Workers <- {w1, w2}, ItemCount <- 2, ItemRange <- 0..2. One fewer worker and half the items assigned. I use the same invariants and properties and run the new model, getting a pass with 2,664 states.

With that, I run the main model. It passes with 2,147,724 states, so I’m confident that my fix works. Final spec:
EXTENDS TLC, Sequences, Integers, FiniteSets
PT == INSTANCE PT
CONSTANTS Workers, Reducer, NULL
CONSTANTS ItemRange, ItemCount
ASSUME ItemRange \subseteq Nat
ASSUME ItemCount \in Nat
PossibleInputs == PT!TupleOf(ItemRange, ItemCount)
SumSeq(seq) == PT!ReduceSeq(LAMBDA x, y: x + y, seq, 0)
FairWorkers == CHOOSE set_w \in SUBSET Workers: Cardinality(set_w) = 1
UnfairWorkers == Workers \ FairWorkers
(*--algorithm mapreduce
variables
  input \in PossibleInputs,
  result = [w \in Workers |-> [total |-> NULL, count |-> NULL]],
  queue = [w \in Workers |-> <<>>],
  status = [w \in Workers |-> "active"];
define
  ActiveWorkers == {w \in Workers: status[w] = "active"}
  HealthyWorkers == {w \in Workers: status[w] /= "broken"}
  TypeInvariant ==
    /\ status \in [Workers -> {"active", "inactive", "broken"}]
    /\ \A w \in Workers:
      /\ Len(queue[w]) <= ItemCount
      /\ \A item \in 1..Len(queue[w]):
        queue[w][item] \in ItemRange
      /\ \/ result[w].total = NULL
         \/ result[w].total <= SumSeq(input)
      /\ \/ result[w].count = NULL
         \/ result[w].count <= ItemCount
end define;
macro reduce() begin
  with
    w \in {w \in ActiveWorkers:
      result[w].count = Len(assignments[w])
      }
  do
    final[w] := result[w].total;
    status[w] := "inactive";
  end with;
end macro;
procedure work()
  variables total = 0, count = 0;
begin
  WaitForQueue:
    await queue[self] /= <<>>;
  Process:
    while queue[self] /= <<>> do
      total := total + Head(queue[self]);
      queue[self] := Tail(queue[self]);
      count := count + 1;
    end while;
  Result:
    result[self] := [total |-> total, count |-> count];
    goto WaitForQueue;
end procedure;
fair process reducer = Reducer
variables final = [w \in Workers |-> 0],
assignments = [w \in Workers |-> <<>>];
begin
  Schedule:
    with worker_order = PT!OrderSet(Workers) do
      queue := [ w \in Workers |->
        LET offset == PT!Index(worker_order, w) - 1 \* sequences start at 1
        IN PT!SelectSeqByIndex(input, LAMBDA i: i % Len(worker_order) = offset)
      ];
      assignments := queue;
    end with;
  ReduceResult:
    while ActiveWorkers /= {} do
      either
        \* Reduce
        reduce();
      or
        \* Reassign
        with
          from_worker \in ActiveWorkers \ FairWorkers,
          to_worker \in HealthyWorkers \ {from_worker}
        do
          assignments[to_worker] :=
            assignments[to_worker] \o
            assignments[from_worker];
          queue[to_worker] :=
            queue[to_worker] \o
            assignments[from_worker];
          status[from_worker] := "broken" ||
          status[to_worker] := "active";
          final[to_worker] := 0;
        end with;
     end either;
    end while;
  Finish:
    assert SumSeq(final) = SumSeq(input);
end process;
fair process fair_workers \in FairWorkers
begin FairWorker:
  call work();
end process;
process worker \in UnfairWorkers
begin RegularWorker:
  call work();
end process
end algorithm; *)
\* BEGIN TRANSLATION
\* ...
\* END TRANSLATION
Liveness == <>[](SumSeq(final) = SumSeq(input))

Exercise

The last thing we’ll do is cover one edge case. Earlier I said that our definition won’t work if the number of items is less than the number of workers, since some worker won’t leave WaitForQueue. Some might argue this will probably not happen, as we mainly use MapReduce when we want to process vast numbers of items; the odds that we’ll only try to process one or two are vanishingly small. We could represent this by adding an assumption about ItemCount.
ASSUME ItemCount >= Cardinality(Workers)

But I don't want to leave such a large failure mode in the spec, so we should try to fix it. Try running the model with Workers <- {w1, w2, w3}, ItemCount <- 2, PROPERTY Liveness. Surprisingly, it passes! This is because Liveness only checks we reach the correct answer, not that the reducer terminates with the correct answer. It can still correctly sum up all of the values but stay trapped in the ReduceResult loop, never reaching Finish.

Normally we’d check Termination to test this, but Termination asserts that all of the processes terminate. Our workers never terminate, so Termination is not quite what we want. Instead, we need to restrict it to just the reducer.
ReducerTerminates == <>(pc[Reducer] = "Finish")

Add PROPERTY ReducerTerminates and rerun. You should see it fail.

I’ll leave this last change to you as an exercise: How can you modify our MapReduce algorithm to satisfy ReducerTerminates? Make sure you also ensure that Liveness and TypeInvariant remain satisfied, and that your fix works for both ItemCount <- 2 and ItemCount <- 4. You might want to make two separate models so that switching between the two is easier. Good luck!

Summary

We fully specified an example of MapReduce. Obviously not every single aspect is covered, but we’ve provided enough to understand how we deal with partial failures. While our model was barely over 100 lines of PlusCal, it was able to find complex errors and liveness issues.
