Creating Concurrent Code

For concurrent processing, Crystal uses the same CSP (communicating sequential processes) model as Go: lightweight processes called fibers that communicate data through channels. It does this faster and with fewer resources than Node.js, for example. You learned about fibers and channels in Chapter 3, Executing Code Concurrently Through Fibers. Before getting into more detail, let’s examine Crystal’s execution mechanism.

How Does a Crystal Program Execute?

When a Crystal program starts up, a main fiber (main for short) runs to execute your top-level code. In that code, one can spawn many other fibers, which are queued. In the background, Crystal operates a minimal runtime that consists of the following components:

1) A Runtime Fiber Scheduler takes care of executing all fibers in the queue at the right time. A fiber that can’t proceed (because it’s waiting for Input/Output) works cooperatively: it tells the scheduler to switch to another fiber to start or to resume work. The scheduler also coordinates channels to communicate data between fibers.

2) A non-blocking Event Loop is just another fiber. It’s in charge of everything I/O related—asynchronous tasks, such as file reading; network packet sending; timers; and so on. It waits on time-consuming tasks to end, while other fibers execute at the same time.

3) A Garbage Collector (GC) cleans up memory that the program is no longer using. This is currently a standard mark-and-sweep Boehm GC.

You could describe a running Crystal program as a cooperative multitasking mechanism between any number of lightweight fibers, which allows for low-overhead context switching. This works best for I/O-intensive processing. It’s less well suited to CPU-intensive processing, such as heavy number crunching.

Each fiber starts out with a stack size of 4KB, which can grow to a maximum of 8MB—the typical memory consumption of a thread. You can create lots of them if you need to, though you may find yourself limited on older hardware. On a 64-bit machine, you can spawn millions and millions of fibers. But on a 32-bit machine, you can only spawn 512.

All of this except the GC runs in one native OS thread. At the time of writing, Crystal works with only one execution thread, but real parallelism (as in Go) is coming soon. The GC runs in parallel, in a separate thread from the rest of the Crystal application.

The built-in scheduler means that you don’t have to write the scheduling of fibers yourself. This might seem limiting, but it also means that you won’t find yourself in callback hell as in Node.js with JavaScript. You just write your code as if it would execute in a linear order.

Establishing Channels Between Fibers

You know that a fiber is created with each spawn do code end line, where code will be executed by the new fiber. What do you think the following snippet will output?

 puts "Before start fiber"
 spawn do
   puts "Hello from within fiber"
 end
 puts "After fiber"

It will only output:

 Before start fiber
 After fiber

Why is this? The program exits when the main fiber ends, and all the fibers it has spawned end with it. When a fiber is spawned, it doesn’t execute immediately, so in this case it never gets to show its message. We can remedy this by adding sleep 1.second after the spawn block: the main fiber is then halted for one second, letting the spawned fiber output its message. A more natural way is to add Fiber.yield, which tells the scheduler to execute the other fiber(s).
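
As a minimal sketch of the Fiber.yield remedy, the variant below records events in an array (the events name is only for illustration) so you can check the order afterward. It assumes the default single-threaded runtime described earlier, where the spawned fiber gets its turn as soon as main yields.

```crystal
events = [] of String

events << "before spawn"
spawn do
  events << "inside fiber"
end

# Hand control to the scheduler so the queued fiber can run.
Fiber.yield

events << "after yield"
puts events.join(", ")
```

Without the Fiber.yield line, the spawned fiber never runs and only the two entries from main are recorded.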

Working with shared memory variables is a recipe for bugs. Instead, you’ll want to let fibers pass values to each other through typed channels.

Channels and Shared Memory

DO NOT COMMUNICATE BY SHARING MEMORY. That is, don’t use shared variables to communicate data values!

Instead, SHARE MEMORY BY COMMUNICATING. That is, send/receive data values over channels.

What are these channels? They’re objects of class Channel(T)—indeed, Channel is a generic class. They can pass typed values from one fiber to another. The following schema by Stanislav Kozlovski[49] illustrates concurrency in Crystal:

[Figure: schema of concurrency in Crystal]

Synchronizing Data Flows

Not only are values sent over channels safe from data races, the sending and receiving of the values allows for a natural method of synchronization. Internally, a channel implements all the locking mechanisms needed to avoid data races. Take a look at this example—you’ll see that main sends a value 42 over a channel to be received by the fiber. What will the output be?

 ch = Channel(Int32).new

 spawn do
   puts "start fiber"
   n = ch.receive # fiber will block here if nothing on the channel yet
   puts "fiber received #{n}"
 end

 puts "before send"
 ch.send 42
 puts "main has sent 42"

 # =>
 # before send
 # start fiber
 # fiber received 42
 # main has sent 42

Look at the output of the program:

First comes "before send" because the fiber hasn’t started yet. Then main sends a value, and it’s blocked until that value is received. This lets the fiber start and print "start fiber", receive the value and print "fiber received 42", and then end. Now main can continue and print its final message. In the same way, a fiber (or main) that receives on a channel is blocked until a value is sent on that channel. Sending a value immediately switches to the fiber waiting on that channel to receive, and then execution continues on that fiber.

Sending and receiving values, even the value nil, naturally synchronizes the execution of fibers.

Your Turn 2

a. main_fiber3: Now let the fiber send the value and main receive it. Explain the output.

b. main_fiber4: Let main send the numbers 1 to 10 over a channel while a fiber prints them out. Then again reverse the receiving and sending.

Waiting with the Event Loop

In the following code snippet, you can see the event loop in action: the fiber waits for input from the keyboard, and main waits to receive a value:

 ch = Channel(String).new
 spawn do
   while line = gets
     ch.send(line)
   end
 end

 puts ch.receive

 # For example:
 # =>
 # hello
 # hello

(Compile and execute this example on the command line.)

The same holds for any I/O, such as receiving values over a network socket or reading from a file: the waiting fiber is suspended while other fibers continue to run.
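
Timers go through the event loop as well, so you can see the same effect without a keyboard. The sketch below is only an illustration: it stands in for real I/O with sleep, and the names done, names, and elapsed are made up for this example. Three fibers each wait 100 milliseconds, yet because they wait at the same time, the total should be close to 100 milliseconds rather than 300.

```crystal
done = Channel(String).new

# Each fiber waits on a timer (standing in for real I/O) and then reports back.
["a", "b", "c"].each do |name|
  spawn do
    sleep 100.milliseconds
    done.send name
  end
end

names = [] of String
elapsed = Time.measure do
  3.times { names << done.receive }
end

puts "#{names.size} fibers finished in about #{elapsed.total_milliseconds.to_i} ms"
```

The exact timing depends on your machine, so no output is shown here.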

Fiber Executing a Method

In many cases, a fiber will be given a method to execute with argument values, so it’s created in a one-liner like this: spawn method1 argumentlist. In fact, here spawn is used as a macro. Take a look at the following example:

 def pname(name)
   3.times do |i|
     puts "#{name} - #{i}"
   end
 end

 spawn pname "spawned" # started on another fiber (in background)
 pname("normal") # started by main fiber
 Fiber.yield
 # =>
 # normal - 0
 # normal - 1
 # normal - 2
 # spawned - 0
 # spawned - 1
 # spawned - 2

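The call to pname is first scheduled on a new fiber and then made directly from main. The call from main runs first, and the spawned fiber only gets its turn when main reaches Fiber.yield, which explains the order of the output.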

Signaling the End of Execution

Here’s another example—it shows how a worker fiber signals the end of its execution by sending a value over a channel, which main then receives:

 # Synchronization of channels:
 # background worker signals on channel when it is done
 # main fiber only continues when that signal is received
 def worker(done : Channel(Bool))
   puts "worker: working"
   sleep 2.seconds
   puts "worker: done"
   done.send true
 end

 done = Channel(Bool).new
 spawn worker(done)

 done.receive # main blocks here
 puts "main: next"

 # =>
 # worker: working
 # worker: done
 # main: next
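
The same signal works for several workers, and since the value itself carries no information, nil will do. The following is a minimal sketch under that assumption. The worker_count and finished names, and the id parameter on worker, are introduced here purely for illustration.

```crystal
# Each worker reports completion by sending nil on the shared channel.
def worker(id : Int32, done : Channel(Nil))
  puts "worker #{id}: done"
  done.send nil
end

done = Channel(Nil).new
worker_count = 3
worker_count.times { |i| spawn worker(i, done) }

# main blocks until it has received one signal per worker.
finished = 0
worker_count.times do
  done.receive
  finished += 1
end

puts "main: all #{finished} workers finished"
```

main receives exactly as many signals as it spawned workers, so it can't continue before the last one has finished.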

Buffered Channels

The channels we’ve used until now are unbuffered: they hand each value directly from the sender to a receiver, so a send blocks until the value is received. That’s why the switch from the sending to the receiving fiber happens instantly. But we also have buffered channels that can contain a certain number of values, specified when the channel is initialized:

 ch = Channel(Char).new(2) # A buffered channel of capacity 2

 spawn do
   puts "Before send 1"
   ch.send('\u03B1')
   puts "Before send 2"
   ch.send('\u03B2')
   if ch.empty?
     puts "Channel is empty"
   else
     puts "Channel is not empty"
   end
   puts "Before send 3"
   ch.send('\u03C9')
   puts "After send"
 end
 3.times do |i|
   puts ch.receive
 end

 # =>
 # Before send 1
 # Before send 2
 # Channel is not empty
 # Before send 3
 # α
 # β
 # After send
 # ω

Here, the switch to another fiber will occur only when the buffer is full. That’s why α and β are printed one after the other. You can test whether a channel still contains a value with the empty? method. You can close a channel when you’re finished with it and then test it with the closed? method.
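
Closing is also a convenient way to tell a receiver that no more values will come. The sketch below is one way to do it, not the only one: it uses receive?, which returns nil instead of raising once the channel is closed, so the loop ends by itself. The received name is just for illustration.

```crystal
ch = Channel(Int32).new

spawn do
  3.times { |i| ch.send i + 1 }
  ch.close # no more values will follow
end

received = [] of Int32
# receive? returns nil once the channel is closed, which ends the loop.
while value = ch.receive?
  received << value
end

puts received.inspect # => [1, 2, 3]
puts ch.closed?       # => true
```

A plain receive on a closed channel raises an exception instead, so receive? is the gentler choice when the end of the stream is expected.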

Selecting a Fiber

Suppose several fibers are working concurrently and you want to do something as soon as any one of them returns a result. Crystal has a select when statement that does just that:

 def generator(n : T) forall T
   chan = Channel(T).new
   spawn do
     loop do
       sleep n.seconds
       chan.send n
     end
   end
   chan
 end

 ch1 = generator(1)
 ch2 = generator(1.5)
 ch3 = generator(5)

 loop do
   select
   when n1 = ch1.receive
     puts "Int: #{n1}"
   when f1 = ch2.receive
     puts "Float: #{f1}"
   when ch3.receive
     break
   end
 end

 # Output:
 # Int: 1
 # Float: 1.5
 # Int: 1
 # Float: 1.5
 # Int: 1
 # Int: 1
 # Float: 1.5

Here, three fibers are created, each with its own channel. Each fiber has to sleep for a number of seconds (its argument), and then sends that argument on the channel. It repeats these actions in an infinite loop. Because the seconds argument is given as an integer and as a float, we write a generic method generator. Notice you have to add the keyword forall T to declare a method as generic.

main then receives the values from the channels in an infinite loop through select when. The loop is stopped with break after five seconds when channel ch3 returns a value.
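
If the timing makes the generator example hard to follow, a smaller sketch may help. Here two fibers each send a single value, and main runs select twice to pick up whichever value arrives. The names numbers, words, and seen are invented for this illustration, and the results are sorted before printing because the arrival order is up to the scheduler.

```crystal
numbers = Channel(Int32).new
words = Channel(String).new

spawn { numbers.send 42 }
spawn { words.send "hello" }

seen = [] of String
2.times do
  # select proceeds with whichever channel has a value first.
  select
  when n = numbers.receive
    seen << "number #{n}"
  when w = words.receive
    seen << "word #{w}"
  end
end

puts seen.sort.join(", ") # => number 42, word hello
```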

Working with Files

The last example also teaches you how to work with files in the context of fibers. Can you figure out what the following code does before executing it?

 ch = Channel(Int32).new
 total_lines = 0
 files = Dir.glob("*.txt")

 files.each do |f|
   spawn do
     lines = File.read_lines(f).size
     ch.send lines
   end
 end

 files.size.times do
   total_lines += ch.receive
 end

 puts "Total number of lines in text files: #{total_lines}"
 # => Total number of lines in text files: 7

You probably guessed it: the Dir.glob method accepts a pattern and returns an Array with all of the file names that correspond to that pattern (here, the .txt files). Then, a fiber is spawned for each text file that reads the file’s lines into an Array, whose size is the number of lines. At the end, the fiber sends this result over the channel. The main fiber loops once per file, receiving the line count for each of them and summing them all up.

This is a common idiom in concurrent programming:

  • Determine the number of separate tasks.
  • Spawn a fiber for each task.
  • Let main aggregate and report the results.
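
The three steps above can be sketched without any files. This is only an illustration: the tasks array and the squaring stand in for whatever work your own fibers would do.

```crystal
tasks = [1, 2, 3, 4] # 1. determine the separate tasks
results = Channel(Int32).new

# 2. spawn a fiber for each task
tasks.each do |n|
  spawn do
    results.send n * n
  end
end

# 3. let main aggregate and report the results
total = 0
tasks.size.times { total += results.receive }

puts "Sum of squares: #{total}" # => Sum of squares: 30
```

Because main receives exactly tasks.size values, it knows when every fiber has reported, whatever order they finish in.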

Let’s now turn to another important aspect of many applications: databases.
