Populating data in the shared array

The security return files are distributed across 100 different directories. The directory for each file is determined by a simple formula: the file index modulo 100, where the file index is the numerical identifier of each security, numbered from 1 to 100,000.

Each data file is in a simple binary format. The upstream process has calculated three source returns for 10,000 future states, as in a 10,000 x 3 matrix. The layout is column-oriented, meaning that the first 10,000 numbers are used for the first return source, the next 10,000 numbers are for the second return source, and so on.
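In case you want to generate compatible test files yourself, here is a minimal, hypothetical sketch; the make_val_file helper and the random values are illustrative and not part of the original code, but the directory scheme and byte layout follow the description above:

# Hypothetical helper (not from the original code) that writes one security's
# data file in the column-oriented binary layout described above.
function make_val_file(index; nstates = 10_000, nattrs = 3)
    id = index - 1                        # files are numbered from 0
    dir = string(id % 100)                # one of the 100 directories
    mkpath(dir)
    data = rand(nstates, nattrs)          # placeholder 10,000 x 3 matrix of returns
    open(joinpath(dir, "sec$(id).dat"), "w") do io
        # Julia arrays are column-major, so writing the matrix stores the
        # 10,000 values of each return source contiguously.
        write(io, data)
    end
end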

Before we start using any distributed computing functions, we must spawn worker processes. Julia comes with a convenient command-line option, -p, that lets us specify the number of worker processes up front. For example, to start the REPL with 16 workers:
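$ julia -p 16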

When the REPL comes up, we already have 16 worker processes running and ready to go. The nworkers function confirms that all 16 workers are available:
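julia> nworkers()
16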

Let's look into the code now. First, we must load the Distributed and SharedArrays packages:

using Distributed
using SharedArrays

To make sure that the worker processes know where to find the files, we have to change directory on all of them:

@everywhere cd(joinpath(ENV["HOME"], "julia_book_ch06_data"))

The @everywhere macro executes the statement on all worker processes.
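As a quick sanity check (hypothetical, not in the original listing), we could fetch the working directory back from one of the workers:

remotecall_fetch(pwd, 2)   # ask worker 2 for its current directory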

The main program looks like this:

nfiles = 100_000
nstates = 10_000
nattr = 3
valuation = SharedArray{Float64}(nstates, nattr, nfiles)
load_data!(nfiles, valuation)

In this case, we are creating a 3-dimensional shared array of 10,000 x 3 x 100,000 Float64 values, roughly 24 GB of shared memory. Then, we call the load_data! function to read all 100,000 files and shovel the data into the valuation array. How does the load_data! function work? Let's take a look:

function load_data!(nfiles, dest)
    @sync @distributed for i in 1:nfiles
        read_val_file!(i, dest)
    end
end

It's a very simple for loop that calls the read_val_file! function with an index number. Notice the use of two macros here: @distributed and @sync. The @distributed macro does the heavy lifting by distributing the iterations of the for loop across the worker processes. On its own, it does not wait for the workers to return; the @sync macro blocks until all of the distributed jobs have finished.
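As a hypothetical toy example, dropping @sync makes the difference visible: the loop returns immediately while the workers are still busy.

@distributed for i in 1:4
    println("worker $(myid()) handled iteration $i")   # runs on the workers
end
println("the master reaches this line before the iterations finish")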

How does it actually read the binary file? Let's see:

# Read a single data file into a segment of the shared array `dest`.
# The segment size is derived from the first two dimensions of `dest`.
@everywhere function read_val_file!(index, dest)
    filename = locate_file(index)
    (nstates, nattrs) = size(dest)[1:2]
    open(filename) do io
        nbytes = nstates * nattrs * 8    # 8 bytes per Float64
        buffer = read(io, nbytes)
        A = reinterpret(Float64, buffer)
        dest[:, :, index] = A
    end
end

Here, the function first locates the path of the data file. Then, it opens the file and reads all the binary data into a byte array. Since the data is just 64-bit floating-point numbers, we use the reinterpret function to view the bytes as an array of Float64 values. We expect 30,000 Float64 values in each file, representing 10,000 future states times 3 return sources. Assigning this flat vector to dest[:, :, index] works because Julia fills the destination slice in column-major order, which matches the column-oriented layout of the file.

We also use the @everywhere macro to ensure that the function is defined and made available to all worker processes. The locate_file function is a little less interesting. It is included here for completeness:

@everywhere function locate_file(index)
    id = index - 1
    dir = string(id % 100)
    return joinpath(dir, "sec$(id).dat")
end
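For example, on a Unix-style filesystem, the first security maps to directory 0 and security 12345 maps to directory 44:

julia> locate_file(1)
"0/sec0.dat"

julia> locate_file(12345)
"44/sec12344.dat"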

Now that the data-loading functions are defined on every process, we can run the main program shown earlier: it creates the valuation SharedArray and passes it to load_data!, and Julia automatically schedules and distributes the loop iterations among all worker processes.

It only took about three minutes to load 100,000 files into memory using 16 parallel processes. That's pretty good!

If you try to run the program in your own environment but encounter an error, it may be due to system constraints. Refer to the later section, Configuring system settings for shared memory usage, for more information.

It turns out that this exercise is still I/O-bound: CPU utilization hovered at around 5% during the load process. If the problem demanded incremental computation, we could leverage the remaining CPU capacity by spawning additional asynchronous processes that operate on the data as soon as it has been loaded into memory.