Thread

Threads用于实现Ruby并发编程模型,使用Thread Class是开发者的完美候选
比如,我们可以创建一个与主线程分离的新线程,通过::new
thr = Thread.new { puts "What's the big deal" }
然后我们可以停止主线程执行,等待新线程结束,通过::join
thr.join #=> "What's the big deal"
如果不在主线程结束前使用thr.join,那么包括thr在内的所有线程都会被kill 此外,可以通过数组储存线程实例
threads = []
threads << Thread.new { puts "What's the big deal" }
threads << Thread.new { 3.times { puts "Threads are fun!" } }
然后陆续的等它们完成
threads.each { |thr| thr.join }

Thread的初始化

为了创建新线程,Ruby提供了::new,::start,::fork,所有的方法必须提供block,否则会导致出现ThreadError
当继承Thread Class时,::start,::fork将会忽视子类的initialize方法,此外,确保子类initialize中调用super方法

Thread的终止

对于终止线程,Ruby提供了多种方法实现它 class method ::kill意味着kill掉给出的进程
thr = Thread.new { ... }
Thread.kill(thr)
同样可以使用其实例方法#exit或其别名#kill或#terminate

Thread的状态

Ruby提供了几个方法查询线程的状态,为了得到以字符串表示的状态,使用#status
thr = Thread.new { sleep }
thr.status #=> sleep
thr.exit
thr.status #=> false
也可以使用#alive?方法辨别线程是否运行或睡眠中,#stop?辨别线程是否dead或睡眠中

Thread变量及作用域

因为创建线程伴随着block,所以对于block的规则,同样应用于线程,block内的变量只属于线程

Fiber-local vs. Thread-local

每个Fiber拥有自己的Thread#[]储存,当设置一个Fiber-local,只能在Fiber内访问,为了证明它:
def main()
    Thread.new {
        Thread.current[:foo] = "bar"
        Fiber.new {
            p Thread.current[:foo] # => nil
        }.resume
    }.join
end
main()
这个例子使用#[]来得到和#[]=使用fiber-locals,也可以使用#keys列出给出线程的fiber-local和使用#key?来检查fiber-local是否存在,当来源于thread-locals时,线程能够完全访问作用域
def main()
    Thread.new {
        Thread.current.thread_variable_set(:foo, 1)
        p Thread.current.thread_variable_get(:foo) # => 1
        Fiber.new {
            Thread.current.thread_variable_set(:foo, 2)
            p Thread.current.thread_variable_get(:foo) # => 2
        }.resume
        p Thread.current.thread_variable_get(:foo) # => 2
    }.join
end
main()
可以看到thread-local:foo转到fiber变成2在线程结束,这个例子使用#thread_variable_set创建新的thread-local,而thread_variable_get得到其引用 这里也有thread_variables列出存在的thread-local,也有thread_variable?判断变量是否存在

异常处理

任何线程都能引发异常,使用实例方法#raise,它和Kenerl#raise是相似的,但是注意到除主线程外异常发生取决于#abort_on_exception,这个值默认是false,意味着,任何未处理的异常将会造成线程静默的终止当被#join或#value等待,可以设置abort_on_exception = true或$DEBUG = true,加上类方法::handle_interrupt,可以异步处理线程的异常

调度

Ruby提供了几种方法调度线程,第一种方法是使用::stop,使得当前线程sleep,使得另一个线程得到调度 一旦线程被睡眠,可以调用#wakeup使得线程能够得到合理的调度,也可以使用::pass,使得跳过当前线程执行另一个线程,但是这取决于操作系统,运行中的线程是否转换,还有#priority可以提示调度器线程调度的优先级,同样是平台相关的,可能会被忽略

Mutex

Mutex实现了简单的信号量正确访问共享的多线程变量
def main
    semaphore = Mutex.new
    a = Thread.new { 
        semaphore.synchronize do
            100.times { |index| puts "Thread a: #{index}" }
        end
    }
    b = Thread.new {
        semaphore.synchronize do
            100.times { |index| puts "Thread b: #{index}" }
        end
    }
    a.join
    b.join
end
main()
实例方法 描述
lock 试图获得锁并等待如果不能提供,当前线程重复lock会引发ThreadError
locked? 判断Mutex是否某个线程lock了
owned? 如果当前线程lock了,返回true
try_lock 试图获得锁并立即返回,如果获得锁返回true
unlock 解锁,如果当前线程不持有锁,将会引发ThreadError
synchronize 获得锁,并且运行block,block完成后释放锁
sleep(timeout = nil) 释放锁,并sleep timeout,下次唤醒时将重新要求锁

ConditionVariable

条件变量参数是Mutex对象,使用条件变量可以停止等待正忙的关键部分,直到资源可供
实例方法 描述
wait(mutex, timeout = nil) 释放mutex的锁并且等待,重新唤醒时获得锁(超时会重新获得锁)
signal 唤醒第一个等待的thread
broadcast 唤醒所有等待的线程

def main
    mutex = Mutex.new
    condition = ConditionVariable.new
    a = Thread.new do
        mutex.synchronize do
            2.times { puts "Thread a first two times"}
            condition.wait(mutex)
            10.times { |index| puts "Thread a: seq #{index} is random" }
        end
    end
    b = Thread.new do
        Thread.pass # let a first
        mutex.synchronize do 
            puts "start to wakeup thread a"
            condition.signal
            10.times { |index| puts "Thread b: seq #{index} is random" }
        end
    end
    a.join
    b.join
end
main()