Threading in Ruby

Original authors: David Thomas, Chad Fowler, Andrew Hunt

This is a translation of the chapter on multithreading from Programming Ruby: The Pragmatic Programmers' Guide, Second Edition, by David Thomas and Andrew Hunt.

The simplest way to do two things at once in Ruby is often to use threads. Ruby threads are in-process, implemented within the Ruby interpreter. That makes them completely portable, with no dependence on the operating system. But at the same time you miss out on the benefits that native threads would give you. What does that mean?

You may experience thread starvation (that's where a low-priority thread never gets a chance to run). If you manage to get your threads deadlocked, the whole process grinds to a halt. And if some thread happens to make a call to the operating system that takes a long time to complete, all of the threads hang until the interpreter gets control back. Finally, if your machine has more than one processor, Ruby threads won't take advantage of it: because they run in a single process, and hence in a single native thread, they are constrained to run on one processor at a time.

It all sounds scary. In practice, though, in many cases the benefits of using threads far outweigh the potential problems. Ruby threads are an efficient and easy way to get concurrency into your code. You just have to understand the underlying implementation issues and design accordingly.

Creating Ruby Threads


Creating a new thread is pretty straightforward. The following code is a simple example: it downloads a set of Web pages in parallel. For each URL it is asked to fetch, the code creates a separate thread that handles the HTTP transaction.

require 'net/http'
pages = %w( www.rubycentral.com slashdot.org www.google.com )
threads = []
for page_to_fetch in pages
    threads << Thread.new(page_to_fetch) do |url|
        h = Net::HTTP.new(url, 80)
        puts "Fetching: #{url}"
        resp = h.get('/', nil)
        puts "Got #{url}: #{resp.message}"
    end
end 
threads.each {|thr| thr.join }


Result:
Fetching: www.rubycentral.com
Fetching: slashdot.org
Fetching: www.google.com
Got www.google.com: OK
Got www.rubycentral.com: OK
Got slashdot.org: OK 


Let's take a closer look at this code, because there are a few subtle points here. New threads are created by calling Thread.new. It is given a block containing the code to be executed in the new thread. In our case, the block uses the net/http library to fetch the top page of each of these sites. Our trace clearly shows that these fetches are going on in parallel.

When we create each thread, we pass the required URL as a parameter. This parameter is passed to the block as the variable url. Why do we do this, when it would seem simpler just to use the value of the page_to_fetch variable inside the block?

A thread shares all global variables, instance variables, and local variables that are in scope at the time the thread starts. Anyone with a kid brother can tell you that sharing isn't always a good thing. In this case, all three threads would share the page_to_fetch variable. The first thread starts, and page_to_fetch is set to "www.rubycentral.com". Meanwhile, the loop creating the threads is still running. The second time around, page_to_fetch is set to "slashdot.org". If the first thread has not yet finished using the page_to_fetch variable, it will suddenly start using its new value. Bugs like this are very difficult to track down.

However, local variables created within a thread's block are truly local to that thread: each thread has its own copy of the page address. You can pass any number of arguments into the block via Thread.new.
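For illustration, here is a minimal sketch (the host names and port numbers are made up) showing that each value passed to Thread.new arrives as a block parameter that is private to its thread:

pages = [ ["www.example.com", 80], ["www.example.org", 8080] ]
threads = pages.map do |host, port|
    Thread.new(host, port) do |h, p|
        # h and p are local to this thread; later loop iterations can't change them
        puts "Would fetch #{h} on port #{p}"
    end
end
threads.each {|t| t.join }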

Manipulating Threads

There's another subtlety on the last line of our program: why do we call join on each of the threads we created?

When a Ruby program terminates, all threads are killed, regardless of their state. However, you can wait for a particular thread to finish by calling that thread's Thread#join method. The calling thread blocks until the given thread finishes. By calling join on each of the requester threads, you can make sure that all three requests have completed before the main program terminates. If you don't want to block forever, you can give join a timeout parameter; if the timeout expires before the thread terminates, the join call returns nil. A variant of join, the Thread#value method, returns the value of the last statement executed by the thread.
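A minimal sketch of both forms (the ten-second sleep, the one-second limit, and the computed value are arbitrary choices for illustration):

slow  = Thread.new { sleep 10; "done" }
quick = Thread.new { 6 * 7 }

# join with a time limit returns nil if the thread is still running
puts "slow thread still running" if slow.join(1).nil?

# Thread#value joins the thread and returns its last evaluated expression
puts quick.value     # => 42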

In addition to join, a few other handy methods are used to manipulate threads. The current thread is always accessible via Thread.current. You can obtain a list of all threads with Thread.list, which returns a list of all Thread objects, both running and stopped. To determine the status of a particular thread, you can use Thread#status and Thread#alive?.
In addition, you can set a thread's priority with Thread#priority=. Higher-priority threads will run before lower-priority threads. We'll talk more about thread scheduling, and about starting and stopping threads, in a while.
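A short sketch of these inspection methods (the sleeping thread is just a placeholder so there is something to inspect):

sleeper = Thread.new { sleep }      # a thread that just sleeps forever
sleep 0.1                           # give it a moment to block

puts Thread.current == Thread.main  # => true when run from the main thread
puts Thread.list.size               # the main thread plus sleeper
puts sleeper.status                 # => "sleep"
puts sleeper.alive?                 # => true

sleeper.priority = -1               # hint to the scheduler: favor other threads
sleeper.kill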

Thread Variables

A thread can normally access any variables that are in scope when the thread is created. Variables local to the block containing the thread's code are local to that thread and are not accessible from outside it.

But what if you need per-thread variables that can be accessed by other threads, including the main thread? Class Thread features a special facility that allows thread-local variables to be created and accessed by name. You simply treat the thread object as if it were a hash, writing elements with []= and reading them with []. In the example that follows, each thread records the current value of the variable count in a thread-local variable with the key mycount. To do this, the code uses the string "mycount" when indexing the thread object.

count = 0
threads = []
10.times do |i|
    threads[i] = Thread.new do
        sleep(rand(0.1))
        Thread.current["mycount"] = count
        count += 1
    end
end
threads.each {|t| t.join; print t["mycount"], ", " } 
puts "count = #{count}


Result:
4, 1, 0, 8, 7, 9, 5, 6, 3, 2, count = 10 


The main thread waits for the subthreads to finish and then prints the value of count captured by each. Just to make it more interesting, each thread waits a random time before recording the value.

Threads and Exceptions


What happens if an unhandled exception is raised in a thread? It depends on the setting of the abort_on_exception flag and on the setting of the interpreter's debug flag.
If abort_on_exception is false and the debug flag is not enabled (the default), an unhandled exception simply kills the current thread; all the rest continue to run. In fact, you don't even hear about the exception until you issue a join on the thread that raised it.

In the following example, thread 2 blows up and fails to print its number. However, you can still see the output from all the other threads.

threads = []
4.times do |number|
    threads << Thread.new(number) do |i|
        raise "Boom!" if i == 2
        print "#{i}\n"
    end
end 
threads.each {|t| t.join } 


Result:
0 
1
3
prog.rb:4: Boom! (RuntimeError)
from prog.rb:8:in `join'
from prog.rb:8
from prog.rb:8:in `each'
from prog.rb:8


Instead, we can catch the exception when we perform the join.
threads = []
4.times do |number|
    threads << Thread.new(number) do |i|
        raise "Boom!" if i == 2
        print "#{i}\n"
    end
end
threads.each do |t|
    begin
        t.join
    rescue RuntimeError => e
        puts "Failed: #{e.message}"
    end
end 


Result:
0
1
3 
Failed: Boom!

However, if you set abort_on_exception to true, or use the -d option to turn on the debug flag, an unhandled exception kills all running threads. Once thread 2 dies, no more output is produced.

Thread.abort_on_exception = true
threads = []
4.times do |number|
    threads << Thread.new(number) do |i|
        raise "Boom!" if i == 2
        print "#{i}\n"
    end
end
threads.each {|t| t.join }


Result:
0
1
prog.rb:5: Boom! (RuntimeError)
from prog.rb:4:in `initialize'
from prog.rb:4:in `new'
from prog.rb:4
from prog.rb:3:in `times'
from prog.rb:3


This example also illustrates a gotcha: inside the loop it's better to use print rather than puts to write out the number. Why? Because puts quietly splits its work into two operations: it writes its argument, and then it writes a newline. Between those two operations another thread may be scheduled, and the output would get interleaved. Calling print with a single string that already contains the newline gets around the problem.
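A minimal sketch of the difference; whether the interleaving actually shows up depends on how the scheduler happens to switch between the threads:

# With puts, the argument and the trailing newline are written separately,
# so another thread may be scheduled in between and lines can run together.
3.times.map {|i| Thread.new { puts i } }.each {|t| t.join }

# With print and an explicit "\n", each line goes out as a single string.
3.times.map {|i| Thread.new { print "#{i}\n" } }.each {|t| t.join }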

Controlling the Thread Scheduler


In a well-designed application, you'll normally just let threads do their thing. Building timing dependencies into a multithreaded application is generally considered bad form, because it makes the code far harder to read and also prevents the thread scheduler from optimizing the execution of your program.

However, sometimes you need to control threads explicitly. Perhaps the jukebox has a thread that displays a light show, and we need to stop it temporarily when the music stops. Or you might have two threads in a classic producer-consumer relationship, where the consumer has to pause if the producer gets backlogged.

Class Thread provides a number of methods for controlling the thread scheduler. Calling Thread.stop stops the current thread, and calling Thread#run arranges for a particular thread to be run. Thread.pass deschedules the current thread, asking the scheduler to pass execution to another thread, and Thread#join and Thread#value suspend the calling thread until the given threads finish.

We can demonstrate these features in the following, totally pointless program. It creates two child threads, each running an instance of class Chaser. The chase method increments a counter but doesn't let it get more than two higher than the counter in the other thread. To keep it from getting ahead, the method calls Thread.pass, which allows the chase in the other thread to run. Just for fun, we force the threads to stop immediately after they start, and then set them running again in a random order.

class Chaser
    attr_reader :count
    def initialize(name)
        @name = name
        @count = 0
    end
    def chase(other)
        while @count < 5
            while @count - other.count > 1
                Thread.pass
            end
            @count += 1
            print "#@name#{count}\n"
        end
    end 
end 

c1 = Chaser.new("A")
c2 = Chaser.new("B")
threads = [
    Thread.new { Thread.stop; c1.chase(c2) },
    Thread.new { Thread.stop; c2.chase(c1) }
]
start_index = rand(2)
threads[start_index].run
threads[1 - start_index].run 
threads.each {|t| t.join }


Result:
B: 1
B: 2
A: 1
B: 3
A: 2
B: 4
A: 3
B: 5
A: 4
A: 5


However, using such low-level operations to achieve synchronization in real code is not that simple: race conditions will be your constant companions. And when you're working with shared data, race conditions pretty much guarantee long and frustrating debugging sessions. In fact, the previous example contains a bug: it is possible for a thread to increment its counter and then, before it can print the value, for the other thread to be scheduled and print its own counter first. The result is output in the wrong order.

Fortunately, threads come with one additional facility: the idea of mutual exclusion. Using it, we can build safe synchronization schemes.
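As a taste of what's coming, here is a minimal sketch using Ruby's standard Mutex class to make updates to a shared counter atomic:

require 'thread'   # provides Mutex on older Rubies; built in on newer ones

count = 0
lock  = Mutex.new
threads = 10.times.map do
    Thread.new do
        lock.synchronize do
            count += 1     # only one thread at a time may run this block
        end
    end
end
threads.each {|t| t.join }
puts "count = #{count}"    # always 10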
