Stages

Let us now see how the stages are implemented. The first stage, FolderScanner, scans a folder and writes all the Pascal files it finds to its output. Its implementation is shown below.

The stage method receives two parameters of type IOmniBlockingCollection, which represent its input and output communication channels, and an optional third parameter (task: IOmniTask) that gives access to the task running the stage code.

An IOmniBlockingCollection channel contains items of type TOmniValue, a fast alternative to Delphi's Variant or TValue types that can store any other Delphi data type inside.
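
TOmniValue is declared in the OtlCommon unit. As a quick illustration (this snippet is mine, not the book's), the same TOmniValue variable can hold values of different types, including an object that it owns:

// requires System.Classes and OtlCommon in the uses clause
procedure TOmniValueDemo;
var
  ov: TOmniValue;
begin
  ov := 42;                                // holds an integer
  Assert(ov.AsInteger = 42);
  ov := 'hello';                           // the same variable now holds a string
  Assert(ov.AsString = 'hello');
  ov.AsOwnedObject := TStringList.Create;  // holds an object that it also owns;
                                           // the object is destroyed together with ov
end;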

A typical stage runs a for value in input loop to read all the data from the input. This for loop doesn't exit when there is no data on the input; it simply enters a wait state, which uses no CPU power. The loop exits only when the input is put into completed mode and there is no data left inside.

We know that the input will only contain one item, so we could also just call the input.TryTake function to read that value. The approach implemented in the FolderScanner method is more flexible, though.
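
For comparison, a minimal sketch of that TryTake variant could look like this (this is not the book's code; it assumes that exactly one folder name was added to the pipeline's input):

procedure TfrmPipeline.FolderScannerSingle(
  const input, output: IOmniBlockingCollection;
  const task: IOmniTask);
var
  value: TOmniValue;
begin
  // Read the single item that was put into the pipeline's input.
  // TryTake returns False if no item is available.
  if not input.TryTake(value) then
    Exit;
  // value now holds the folder name; from here on, the code would proceed
  // exactly as in FolderScanner below.
end;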

The code then uses the helper function DSiEnumFilesEx from the DSiWin32 library, which is included with the OmniThreadLibrary. This function scans a folder and its subfolders and calls an anonymous method for each file it finds.

This anonymous method first checks whether the operation should be aborted by calling task.CancellationToken.IsSignalled. If so, it sets the stopEnum flag, which tells DSiEnumFilesEx to stop enumerating files and exit. Otherwise, it concatenates the folder and filename parts and writes the result to the output. The TryAdd method is used instead of Add because this call may fail when the pipeline is cancelled (that is, when the code calls IOmniPipeline.Cancel):

procedure TfrmPipeline.FolderScanner(
  const input, output: IOmniBlockingCollection;
  const task: IOmniTask);
var
  value: TOmniValue;
begin
  for value in input do begin
    DSiEnumFilesEx(
      IncludeTrailingPathDelimiter(value) + '*.pas', 0, true,
      procedure (const folder: string; S: TSearchRec;
        isAFolder: boolean; var stopEnum: boolean)
      begin
        stopEnum := task.CancellationToken.IsSignalled;
        if (not stopEnum) and (not isAFolder) then
          output.TryAdd(IncludeTrailingPathDelimiter(folder) + S.Name);
      end);
  end;
end;

The second stage, FileReader, is implemented with a similar loop. It also checks the cancellation token and exits if it is signalled. Without that check, the stage would always process all the data waiting in its input queue before exiting, even if the pipeline was cancelled.

Inside the loop, the code loads the contents of a file into a string list. An implicit operator converts value into the string parameter required by the LoadFromFile method. After that, the TStringList is sent to the output queue. To ensure that this object is always destroyed, even if the pipeline is cancelled, it is best to store it as an owned object. It will then be reference-counted and destroyed when appropriate:

procedure TfrmPipeline.FileReader(
  const input, output: IOmniBlockingCollection;
  const task: IOmniTask);
var
  sl: TStringList;
  value: TOmniValue;
  outValue: TOmniValue;
begin
  for value in input do begin
    if task.CancellationToken.IsSignalled then
      break;

    sl := TStringList.Create;
    try
      sl.LoadFromFile(value);
      outValue.AsOwnedObject := sl;
      sl := nil;
      output.TryAdd(outValue);
      task.Comm.Send(MSG_OK, value);
    except
      task.Comm.Send(MSG_ERROR, value);
      FreeAndNil(sl);
    end;
  end;
end;

The preceding code sends a message to the main thread when a file is read or when an exception is raised while accessing a file. This message is sent via OmniThreadLibrary's built-in message channel (task.Comm) and is dispatched to the HandleFileReaderMessage method, which was specified as a message handler while building the pipeline.
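
The pipeline construction code is not shown in this section. A minimal sketch of how such a pipeline could be wired together might look as follows (the FPipeline field, the button handler, and the edtStartFolder control are my assumptions, not names from the book):

// requires OtlParallel in addition to the OmniThreadLibrary units
// already used by the stage methods
procedure TfrmPipeline.btnStartClick(Sender: TObject);
begin
  FPipeline := Parallel.Pipeline
    .Stage(FolderScanner)
    .Stage(FileReader,
      Parallel.TaskConfig.OnMessage(HandleFileReaderMessage))
    .Stage(StatCollector)
    .Stage(StatAggregator)
    .Run;
  // feed the starting folder to the first stage and mark the input as complete
  FPipeline.Input.Add(edtStartFolder.Text);
  FPipeline.Input.CompleteAdding;
end;

The Parallel.TaskConfig.OnMessage call is what routes messages sent through task.Comm in the FileReader stage to the HandleFileReaderMessage method.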

The following method displays all errors on the screen, but for successfully processed files it only stores the filename in the FLastProcessed field. It would be pointless to flood the user interface with information about every processed file, so the code uses a timer (not shown in the book) that displays the current FLastProcessed on screen every quarter of a second:

procedure TfrmPipeline.HandleFileReaderMessage(
  const task: IOmniTaskControl;
  const msg: TOmniMessage);
begin
  if msg.MsgID = MSG_ERROR then
    ListBox1.ItemIndex := ListBox1.Items.Add('*** ' + msg.MsgData)
  else
    FLastProcessed := msg.MsgData;
end;
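
The timer handler itself would only need to copy the field to a control. A hypothetical version (the control names and the 250 ms interval are my assumptions) could look like this:

procedure TfrmPipeline.TimerUpdateTimer(Sender: TObject);
begin
  // FLastProcessed is only written from HandleFileReaderMessage, which runs
  // in the main thread, so no locking is needed here
  lblCurrentFile.Caption := FLastProcessed;
end;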

Let's move to the third stage, StatCollector, which analyzes one file. It is implemented as a simple stage, meaning that the for..in loop is implemented in the pipeline pattern itself. The pattern also handles cancellation.

This approach is ideal when each input value produces zero or one output value. The stage method is called once for each item read from the input channel. If the stage writes anything into its output parameter, that value is written to the stage's output channel. If the stage leaves output untouched, nothing is written out.

The third stage calls the GenerateStatistics function to generate the statistics. I decided not to show it in the book, as the implementation is not relevant for the pipeline pattern. The resulting record is assigned to the output parameter via a special FromRecord<T> function, which can store a record inside TOmniValue.

There's no need to destroy the TStringList that is contained in each input TOmniValue. Because it was assigned to the AsOwnedObject property, it will be destroyed automatically:

type
  TStatistics = record
    Files: int64;
    Lines: int64;
    Words: int64;
    Chars: int64;
  end;

procedure TfrmPipeline.StatCollector(const input: TOmniValue;
  var output: TOmniValue);
var
  stat: TStatistics;
begin
  stat := GenerateStatistics(input.ToObject<TStringList>);
  output := TOmniValue.FromRecord<TStatistics>(stat);
end;
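
Purely for illustration, a hypothetical GenerateStatistics could count lines, words, and characters like this (the book's actual implementation may differ):

// requires System.SysUtils and System.Classes in the uses clause
function GenerateStatistics(sl: TStringList): TStatistics;
var
  line: string;
begin
  Result := Default(TStatistics);
  Result.Files := 1;        // one file per call; Merge later sums these up
  Result.Lines := sl.Count;
  for line in sl do begin
    Inc(Result.Chars, Length(line));
    Inc(Result.Words,
      Length(line.Split([' ', #9], TStringSplitOptions.ExcludeEmpty)));
  end;
end;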

The last stage, StatAggregator, reads all TStatistics records from the input queue. The ToRecord<T> function is used to extract a record from a TOmniValue item. The Merge function, which is not shown in the book, merges the current aggregate and the new partial statistics together.

Only after all the data is processed (when the for loop exits) will the resulting aggregate value be written to the pipeline's output:

procedure TfrmPipeline.StatAggregator(const input,
  output: IOmniBlockingCollection; const task: IOmniTask);
var
  aggregate: TStatistics;
  stat: TStatistics;
  value: TOmniValue;
begin
  aggregate := Default(TStatistics);

  for value in input do begin
    if task.CancellationToken.IsSignalled then
      break;

    stat := value.ToRecord<TStatistics>;
    Merge(aggregate, stat);
  end;

  output.TryAdd(TOmniValue.FromRecord<TStatistics>(aggregate));
end;
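
On the receiving side, the main program can take this single aggregate value from the pipeline's output once all stages have finished. A sketch of that step might look like this (FPipeline and ShowStatistics are my assumed names; INFINITE comes from Winapi.Windows):

procedure TfrmPipeline.ShowFinalResult;
var
  value: TOmniValue;
begin
  // WaitFor returns True when all pipeline stages have completed
  if FPipeline.WaitFor(INFINITE) and FPipeline.Output.TryTake(value) then
    ShowStatistics(value.ToRecord<TStatistics>);
end;

In a real application, you would typically not block the main thread like this but react to the pipeline's completion asynchronously.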

If we wanted to implement the parallelization in some other way (for example, with a Parallel for pattern), then this part would need to use locking to access the shared aggregate state. As the pipeline pattern gently guides you towards data duplication and communication instead, there is no need for sharing, locking, and other complications.
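
For contrast, a hypothetical sketch of that alternative (not code from the book; it uses OmniThreadLibrary's Parallel.ForEach and a TCriticalSection from System.SyncObjs) shows the locking that the shared aggregate would require:

procedure TfrmPipeline.CollectStatisticsWithForEach(files: TStringList);
var
  aggregate: TStatistics;
  lock: TCriticalSection;
begin
  aggregate := Default(TStatistics);
  lock := TCriticalSection.Create;
  try
    Parallel.ForEach(0, files.Count - 1).Execute(
      procedure (const i: integer)
      var
        sl: TStringList;
        stat: TStatistics;
      begin
        sl := TStringList.Create;
        try
          sl.LoadFromFile(files[i]);
          stat := GenerateStatistics(sl);
        finally
          sl.Free;
        end;
        // aggregate is shared between all worker threads, so every Merge
        // call must be protected with a lock
        lock.Acquire;
        try
          Merge(aggregate, stat);
        finally
          lock.Release;
        end;
      end);
  finally
    lock.Free;
  end;
end;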
