Dynamic Async Queuing

Most of the time, the simple methods from the last two sections are enough to solve your async dilemmas. But async.series and async.parallel have their limitations.

  • The task array is static. Once you’ve called async.series or async.parallel, you can’t add or remove tasks.

  • There’s also no way to ask, “How many tasks have been completed?” It’s a black box, unless you dispatch updates from the tasks themselves.

  • You’re limited to either no concurrency or unlimited concurrency. That’s a pretty big deal when it comes to file I/O. If we’re operating on thousands of files, we don’t want to be inefficient by doing a series, but we’re likely to anger the OS if we try to do everything in parallel.

Async.js provides a versatile method that addresses every one of these issues: async.queue.

Understanding the Queue

The basic concept underlying async.queue is reminiscent of a DMV; it can handle multiple people simultaneously (up to the number of clerks on duty), but rather than have a separate line for each clerk, it has a single stack of numbers. When you arrive, you get a number. As each clerk becomes free, the clerk calls the next number.

async.queue’s interface is a bit more complex than that of async.series and async.parallel. It takes a function called the worker (rather than an array of functions) and a concurrency value (the maximum number of simultaneous tasks the worker can process). Then it returns a queue that we can push arbitrary task data onto (along with an optional callback).

Here’s a trivial example:

Asyncjs/simpleQueue.js
 
var​ async = require(​'async'​);
 
 
function​ worker(data, callback) {
 
console.log(data);
 
callback();
 
}
 
var​ concurrency = 2;
 
var​ queue = async.queue(worker, concurrency);
 
queue.push(1);
 
queue.push(2);
 
queue.push(3);

No matter what the concurrency is (as long as it’s at least 1), we get the following output:

<= 
1
 
2
 
3

There is a difference under the hood, though: with concurrency of 2, we need two trips to the event queue. If it were 1, we’d need three trips, one for each line. And if it were 3 or more, we’d need just one trip.

A queue with concurrency of 0 will do nothing. And if you want maximum concurrency, just use Infinity.

Pushing Tasks

Although queue.push shares the same name as [].push, there are two critical differences.

First,

 
queue.push([1, 2, 3]);

is equivalent to the following:

 
queue.push(1);
 
queue.push(2);
 
queue.push(3);

This means you can’t use arrays directly as task data. You can, however, use anything else—even functions. In fact, if you want to use an array of functions like you would with async.series or async.parallel, all you need to do is define a worker that passes its second argument to its first.

 
function​ worker(task, callback) {
 
task(callback);
 
}
 
var​ concurrency = 2;
 
var​ queue = async.queue(worker, concurrency);
 
queue.push(tasks);

Second, you can provide a callback function along with each push; if you do, it’s given directly to the worker function as the callback argument. So, for instance,

 
queue.push([1, 2, 3], ​function​(err, result) {
 
console.log(​'Task complete!'​);
 
});

will (assuming that the worker runs its callback) emit the output Task complete! three times. push callbacks are invaluable because async.queue, unlike async.series/async.parallel, doesn’t store results internally. If you want them, you’ll have to capture them yourself.

Handling Completion

As with async.series and its ilk, we can give async.queue a completion handler. Instead of passing it as an argument, though, we need to attach it as a property called drain. (Picture a tub full of incomplete tasks; when the last one has gone down the drain, the callback fires.) Here’s a demonstration with timers:

Asyncjs/queueTimers.js
 
var​ async = require(​'async'​);
 
 
function​ worker(data, callback) {
 
setTimeout(callback, data);
 
}
 
var​ concurrency = 2;
 
var​ queue = async.queue(worker, concurrency);
 
var​ start = ​new​ Date;
 
queue.drain = ​function​() {
 
console.log(​'Completed in '​ + (​new​ Date - start) + ​'ms'​);
 
};
 
 
queue.push([100, 300, 200]);

Recall that async.series took ~600ms to get through these timeouts (the sum), while async.parallel took only ~300ms (the max). Here, concurrency is 2, so initially, the first two timeouts will run in parallel. But when the 100ms timeout finishes, the next task on the queue (the 200ms timeout) will immediately start. So, in this case, async.queue will finish at about the same time as async.parallel. The order matters: if 300 were the third timeout, the queue would take ~400ms to complete.

Note that we can always push more tasks onto the queue, and drain will fire every time the last task on the queue has finished. Unfortunately, this means that async.queue can’t give us neatly ordered results the way async.waterfall could. If we want to collect data from our queued tasks, we’re on our own.

Advanced Queue Callbacks

Although drain is usually the only handler you’ll need, async.queue provides a few other events.

  • When the last task has started running, the queue calls empty. (When the task finishes, the queue calls drain.)

  • When the concurrency limit is reached, the queue calls saturated.

  • If you provide a function as the second argument in a push, it’ll be called when the given task (or each task in the given array) is finished.

In this section, we’ve seen how async.queue is one of the most powerful functions in Async.js. When you need to run a large number of async tasks with limited concurrency, think async.queue.

That’s it for our coverage of Async.js, the most widely used and, arguably, most feature-rich JavaScript flow control library. However, I don’t want you to get the impression that Async.js is the right tool for every callback-driven job. Let’s close out the chapter by looking at one of its top rivals, Step.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.139.87.61