For concurrent processing, Crystal uses the same CSP (communicating sequential processes) model as Go: lightweight processes called fibers that communicate data through channels. It does this faster and with fewer resources than Node.js, for example. You learned about fibers and channels in Chapter 3, Executing Code Concurrently Through Fibers. Before getting into more detail, let’s examine Crystal’s execution mechanism.
When a Crystal program starts up, a main fiber (main for short) runs to execute your top-level code. In that code, one can spawn many other fibers, which are queued. In the background, Crystal operates a minimal runtime that consists of the following components:
1) A Runtime Fiber Scheduler takes care of executing all fibers in the queue at the right time. A fiber that can’t proceed (because it’s waiting for Input/Output) works cooperatively: it tells the scheduler to switch to another fiber to start or to resume work. The scheduler also coordinates channels to communicate data between fibers.
2) A non-blocking Event Loop is just another fiber. It’s in charge of everything I/O related: asynchronous tasks such as reading files, sending network packets, timers, and so on. It waits for time-consuming tasks to end, while other fibers execute in the meantime.
3) A Garbage Collector (GC) cleans up memory that the program is no longer using. This is currently a standard mark-and-sweep Boehm GC.
You could describe a running Crystal program as a cooperative multitasking mechanism between any number of lightweight fibers, which allows for low-overhead context switching. This works best for I/O-intensive processing. It’s less suited to CPU-intensive processing, such as heavy number crunching.
Each fiber starts out with a stack size of 4KB, which can grow to a maximum of 8MB, the typical memory consumption of a thread. You can create lots of them if you need to, though you may find yourself limited on older hardware. On a 64-bit machine, you can spawn millions and millions of fibers. But on a 32-bit machine, you can only spawn 512, because 512 stacks of up to 8MB each already claim the entire 4GB address space.
All of this except the GC runs in one native OS thread. Up until the time of writing, Crystal has worked with only one execution thread, but real parallelism (as in Go) is coming soon. The GC runs in parallel, in a separate thread from the rest of the Crystal application.
The built-in scheduler means that you don’t have to write the scheduling of fibers yourself. This might seem limiting, but it also means that you won’t find yourself in callback hell as in Node.js with JavaScript. You just write your code as if it would execute in a linear order.
You know that a fiber is created by each spawn do ... end block, whose body is executed by the new fiber. What do you think the following snippet will output?
| puts "Before start fiber" |
| spawn do |
| puts "Hello from within fiber" |
| end |
| puts "After fiber" |
It will only output:
| Before start fiber |
| After fiber |
Why is this? The program ends as soon as the main fiber finishes, and all the fibers it has spawned end with it. When a fiber is spawned, it doesn’t execute immediately; it’s only queued. In this case, it never gets to show its message. We can remedy this by adding sleep 1.second after the spawn block. The main fiber is then halted for one second, letting the spawned fiber output its message. Another, more natural way is to add Fiber.yield, which tells the scheduler to execute the other fiber(s).
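As a minimal sketch of the Fiber.yield remedy, assuming the default single-threaded runtime described earlier, the snippet could look like this. The variable name fiber is only there for illustration; spawn returns the Fiber object it creates.

```crystal
puts "Before start fiber"

# spawn only queues the fiber; it returns the new Fiber object
fiber = spawn do
  puts "Hello from within fiber"
end

# Hand control to the scheduler so the queued fiber can run now
Fiber.yield

puts "After fiber"

# Expected output with the single-threaded scheduler:
# Before start fiber
# Hello from within fiber
# After fiber
```

Because the spawned fiber never blocks, it runs to completion during the yield, and main resumes afterwards.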
Working with shared memory variables is a recipe for bugs. Instead, you’ll want to let fibers pass values to each other through typed channels.
Channels and Shared Memory
DO NOT COMMUNICATE BY SHARING MEMORY. That is, don’t use shared variables to communicate data values! Instead, SHARE MEMORY BY COMMUNICATING. That is, send/receive data values over channels.
What are these channels? They’re objects of class Channel(T)—indeed, Channel is a generic class. They can pass typed values from one fiber to another. The following schema by Stanislav Kozlovski[49] illustrates concurrency in Crystal:
Not only are values sent over channels safe from data races, the sending and receiving of the values allows for a natural method of synchronization. Internally, a channel implements all the locking mechanisms needed to avoid data races. Take a look at this example—you’ll see that main sends a value 42 over a channel to be received by the fiber. What will the output be?
| ch = Channel(Int32).new |
| |
| spawn do |
| puts "start fiber" |
| n = ch.receive # fiber will block here if nothing on the channel yet |
| puts "fiber received #{n}" |
| end |
| |
| puts "before send" |
| ch.send 42 |
| puts "main has sent 42" |
| |
| # => |
① | before send |
② | start fiber |
③ | fiber received 42 |
④ | main has sent 42 |
Look at the output of the program:
First comes line ① because the fiber hasn’t started yet. Then main sends a value and is blocked until that value is received. This means the fiber can start in line ②, receive the value in line ③, and then end. Now main can continue, and line ④ is printed. In the same way, a fiber (or main) that receives on a channel is blocked until a value is sent on that channel. Sending a value immediately switches to the fiber waiting on that channel to receive, and then execution continues on that fiber.
Sending and receiving values, even the value nil, naturally synchronizes the execution of fibers.
➤ a. main_fiber3: Now let the fiber send the value and main receive it. Explain the output.
➤ b. main_fiber4: Let main send the numbers 1 to 10 over a channel while a fiber prints them out. Then again reverse the receiving and sending.
In the following code snippet, you can see the event loop in action: the fiber waits for input from the keyboard, and main waits to receive a value:
| ch = Channel(String).new |
| spawn do |
| while line = gets |
| ch.send(line) |
| end |
| end |
| |
| puts ch.receive |
| |
| # For example: |
| # => |
| # hello |
| # hello |
(Compile and execute this example on the command line.)
This is the case for any I/O, like receiving values over a network socket, or reading from a file.
In many cases, a fiber will be given a method to execute with argument values, so it’s created in a one-liner like this: spawn method1 argumentlist. In fact, here spawn is used as a macro. Take a look at the following example:
| def pname(name) |
| 3.times do |i| |
| puts "#{name} - #{i}" |
| end |
| end |
| |
| spawn pname "spawned" # started on another fiber (in background) |
| pname("normal") # started by main fiber |
| Fiber.yield |
| # => |
| # normal - 0 |
| # normal - 1 |
| # normal - 2 |
| # spawned - 0 |
| # spawned - 1 |
| # spawned - 2 |
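The call to pname in the spawned fiber is queued first, but it only runs once main hands over control with Fiber.yield. That’s why the output from main appears before the output from the spawned fiber.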
Here’s another example—it shows how a worker fiber signals the end of its execution by sending a value over a channel, which main then receives:
| # Synchronization of channels: |
| # background worker signals on channel when it is done |
| # main fiber only continues when that signal is received |
| def worker(done : Channel(Bool)) |
| puts "worker: working" |
| sleep 2.seconds |
| puts "worker: done" |
| done.send true |
| end |
| |
| done = Channel(Bool).new |
| spawn worker(done) |
| |
| done.receive # main blocks here |
| puts "main: next" |
| |
| # => |
| # worker: working |
| # worker: done |
| # main: next |
The channels we’ve used until now are unbuffered: they can’t store a value, so a send blocks until another fiber receives it. That’s why the switch from the sending to the receiving fiber happens instantly. But we also have buffered channels that can contain a certain number of values, specified when the channel is initialized:
| ch = Channel(Char).new(2) # A buffered channel of capacity 2 |
| |
| spawn do |
| puts "Before send 1" |
| ch.send('\u03B1') |
| puts "Before send 2" |
| ch.send('\u03B2') |
| if ch.empty? |
| puts "Channel is empty" |
| else |
| puts "Channel is not empty" |
| end |
| puts "Before send 3" |
| ch.send('\u03C9') |
| puts "After send" |
| end |
| 3.times do |i| |
| puts ch.receive |
| end |
| |
| # => |
| # Before send 1 |
| # Before send 2 |
| # Channel is not empty |
| # Before send 3 |
| # α |
| # β |
| # After send |
| # ω |
Here, the switch to another fiber will occur only when the buffer is full. That’s why α and β are printed one after the other. You can test whether a channel still contains a value with the empty? method. You can close a channel when you’re finished with it and then test it with the closed? method.
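Closing a channel can be sketched as follows. This is an illustrative example, not code from the chapter: the names ch and received and the values sent are assumptions. It relies on receive?, which returns nil instead of raising once the channel is closed and has no values left.

```crystal
ch = Channel(Int32).new(3) # buffered channel of capacity 3

spawn do
  3.times { |i| ch.send(i * 10) }
  ch.close # signal that no more values will be sent
end

received = [] of Int32

# receive? yields nil once the channel is closed and drained,
# which ends the loop (0 is truthy in Crystal, so it is kept)
while value = ch.receive?
  received << value
end

puts received.inspect # => [0, 10, 20]
puts ch.closed?       # => true
```

Values that are already in the buffer when the channel is closed can still be received; only after they are consumed does receive? return nil.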
Suppose several fibers are working concurrently and you want to do something as soon as any one of them returns a result. Crystal has a select when statement that does just that:
| def generator(n : T) forall T |
| chan = Channel(T).new |
| spawn do |
| loop do |
| sleep n.seconds |
| chan.send n |
| end |
| end |
| chan |
| end |
| |
| ch1 = generator(1) |
| ch2 = generator(1.5) |
| ch3 = generator(5) |
| |
| loop do |
| select |
| when n1 = ch1.receive |
| puts "Int: #{n1}" |
| when f1 = ch2.receive |
| puts "Float: #{f1}" |
| when ch3.receive |
| break |
| end |
| end |
| |
| # Output: |
| # Int: 1 |
| # Float: 1.5 |
| # Int: 1 |
| # Float: 1.5 |
| # Int: 1 |
| # Int: 1 |
| # Float: 1.5 |
Here, three fibers are created, each with its own channel. Each fiber has to sleep for a number of seconds (its argument), and then sends that argument on the channel. It repeats these actions in an infinite loop. Because the seconds argument is given as an integer and as a float, we write a generic method generator. Notice you have to add the keyword forall T to declare a method as generic.
main then receives the values from the channels in an infinite loop through select when. The loop is stopped with break after five seconds when channel ch3 returns a value.
The last example also teaches you how to work with files in the context of fibers. Can you figure out what the following code does before executing it?
| ch = Channel(Int32).new |
| total_lines = 0 |
| files = Dir.glob("*.txt") |
| |
| files.each do |f| |
| spawn do |
| lines = File.read_lines(f).size |
| ch.send lines |
| end |
| end |
| |
| files.size.times do |
| total_lines += ch.receive |
| end |
| |
| puts "Total number of lines in text files: #{total_lines}" |
| # => Total number of lines in text files: 7 |
You probably guessed it: the Dir.glob method accepts a pattern and returns an Array with all of the file names that correspond to that pattern (here, the .txt files). Then, a fiber is spawned for each text file that reads the file’s lines into an Array, whose size is the number of lines. At the end, the fiber sends this result over the channel. The main fiber loops as many times as there are files, receiving the line count for each of them and summing them all up.
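This is a common idiom in concurrent programming: spawn one fiber per task, let each fiber send its result over a shared channel, and have main receive exactly as many results as there were tasks.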
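The same pattern works without any files. The following is a minimal sketch with a hypothetical workload (squaring numbers); the names inputs, results, and sum are assumptions chosen for illustration.

```crystal
# Hypothetical workload: square each number in its own fiber
inputs = [1, 2, 3, 4]
results = Channel(Int32).new

# Fan out: one fiber per input, each sends its result on the channel
inputs.each do |n|
  spawn do
    results.send n * n
  end
end

# Fan in: main receives exactly as many results as there are inputs
sum = 0
inputs.size.times do
  sum += results.receive
end

puts sum # => 30
```

Counting the receives against the number of spawned fibers is what lets main know when all the work is done, without a separate completion signal.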
Let’s now turn to another important aspect of many applications: databases.