Rubyで結城浩せんせいのJavaマルチスレッド本をやってみる試み、その5
Guarded Suspension
Guarded Suspensionはガード条件が満たされるまで while (your_trun?) { wait() } で まつ。
2本スレッドで、キューを使う
- クライアント:リクエストをキューに置く
- サーバー:リクエストをキューから取り出す。
- キュー:サーバー・クライアントの間で同期しながら動く
キューはFIFOなので、Read-Writeの衝突は起きません。
rubyのQueueクラスを使った例。
Queueがすでに提供されているので。Rubyのスレッドのお試しとして、使ってみた。
(長いのでスクロール)
#!/usr/bin/env ruby # Guarded Suspension # 用意できるまで待ってね # Ruby 組み込みのQueueを使う場合 # Queue を使うとサンプルが再現できるがGuardedSuspensionの勉強にならない。 require "thread" class RequestQueue def initialize @q = Queue.new end def getRequest req = @q.pop end def putRequest req @q.push req #JavaのnotifyAll は Queueクラスがやってくれる end def size @q.size end end class Request def initialize name @name = name end def to_s "[ Request #{@name} ]"; end end class ServerThread < Thread def initialize( queue, name ) @queue = queue @name = name block = Proc.new{ 10000.times{ req = @queue.getRequest puts "#{@name} handles #{req} #{@queue.size}" sleep rand(4) } } super(&block) end end class ClientThread < Thread def initialize( queue, name ) @queue = queue @name = name block = Proc.new{ 10000.times{|i| req = Request.new( "No.#{i}") @queue.putRequest req puts "#{@name} requests #{req} #{@queue.size}" sleep rand(2) } } super(&block) end end q = RequestQueue.new threads = [ ServerThread.new(q, "Bobby"), ClientThread.new(q,"Alice") ] puts threads threads.each{|t|t.join} puts "END"
Guarded Suspensionを実装してQueueを作る。
RubyのQueueで出来ることが分かった。しかしGuarded Suspension の勉強にならない。Queueが空っぽの時、サーバースレッドをWaitセットに入れて待たせることが重要なのです。Queueはそれをやってくれる。ここでQueueに頼ってGuarded Suspension を作れないと、次章以降で困るので、Rubyで作る。
(長いのでスクロール)
#!/usr/bin/env ruby # Guarded Suspension # 用意できるまで待ってね require "thread" class RequestQueue def initialize @q = [] @m = Mutex.new @cv = ConditionVariable.new end def getRequest @m.lock while( @q.empty? ) do @cv.wait(@m) end req = @q.shift @m.unlock req end def putRequest req @m.lock @q.push req @cv.broadcast @m.unlock end def size @m.synchronize { @q.size } end end class Request def initialize name @name = name end def to_s "[ Request #{@name} ]"; end end class ServerThread < Thread def initialize( queue, name ) @queue = queue @name = name block = Proc.new{ 10000.times{ req = @queue.getRequest puts "#{@name} handles #{req} #{@queue.size}" sleep rand(2) } } super(&block) end end class ClientThread < Thread def initialize( queue, name ) @queue = queue @name = name block = Proc.new{ 10000.times{|i| req = Request.new( "No.#{i}") @queue.putRequest req puts "#{@name} requests #{req} #{@queue.size}" sleep rand(2) } } super(&block) end end q = RequestQueue.new threads = [ ServerThread.new(q, "Bobby"), ClientThread.new(q,"Alice") ] puts threads threads.each{|t|t.join} puts "END"
Rubyで実装するときのポイント
rubyはObjectがWaitセットを持たない。またSynchronizeを持たない。そこで、mutexをつかってlockする。のでした。しかしmutexのwaitはmutexをロックしたまま待つ。他スレッドがロック待ちになってしまう。
ConditionVariableを使ってMutexのロックを制御する
@m = Mutex.new @cv = ConditionVariable.new
ConditionVariable#waitでMutexのロックを外し、Waitする。
@m.lock while( @q.empty? ) do @cv.wait(@m) end req = @q.shift @m.unlock
ConditionVariable#broadcat でJavaのnotifyAllの替わりをする。
@m.lock @q.push req @cv.broadcast
こんな感じ。このGuarded Suspensionは結城先生のJavaマルチスレッド デザインパターンの基本定理なのでしっかりやっておく。かりにPythonなどでデザインパターンの書き起すときもGuarded Suspensionは基本になるはず。