それマグで!

知識はカップより、マグでゆっくり頂きます。 takuya_1stのブログ

習慣に早くから配慮した者は、 おそらく人生の実りも大きい。

Rubyでマルチスレッド その5 # Guarded Suspension

Ruby結城浩せんせいのJavaマルチスレッド本をやってみる試み、その5

Guarded Suspension

Guarded Suspensionはガード条件が満たされるまで while (your_trun?) { wait() } で まつ。

2本スレッドで、キューを使う

  1. クライアント:リクエストをキューに置く
  2. サーバー:リクエストをキューから取り出す。
  3. キュー:サーバー・クライアントの間で同期しながら動く

キューはFIFOなので、Read-Writeの衝突は起きません。

rubyのQueueクラスを使った例。

Queueがすでに提供されているので。Rubyのスレッドのお試しとして、使ってみた。
(長いのでスクロール)

#!/usr/bin/env ruby

# Guarded Suspension
#   用意できるまで待ってね
# Ruby 組み込みのQueueを使う場合
# Queue を使うとサンプルが再現できるがGuardedSuspensionの勉強にならない。
require "thread"

class RequestQueue
  def initialize
    @q = Queue.new
  end
  def getRequest
    req = @q.pop
  end
  def putRequest req
    @q.push req #JavaのnotifyAll は Queueクラスがやってくれる
  end
  def size
    @q.size
  end
end
class Request
  def initialize name
    @name = name
  end
  def to_s
    "[ Request #{@name} ]";
  end
end
class ServerThread < Thread
  def initialize( queue, name )
    @queue = queue
    @name = name
    block = Proc.new{
      10000.times{
        req = @queue.getRequest
        puts "#{@name} handles  #{req}  #{@queue.size}"
        sleep rand(4)
      }
    }
    super(&block)
  end
end
class ClientThread < Thread
  def initialize( queue, name )
    @queue = queue
    @name = name
    block = Proc.new{
      10000.times{|i|
        req = Request.new( "No.#{i}")
        @queue.putRequest req
        puts "#{@name} requests #{req}  #{@queue.size}"
        sleep rand(2)
      }
    }
    super(&block)
  end
end

q = RequestQueue.new
threads = [ ServerThread.new(q, "Bobby"), ClientThread.new(q,"Alice") ]
puts threads
threads.each{|t|t.join}
puts "END"

Guarded Suspensionを実装してQueueを作る。

RubyのQueueで出来ることが分かった。しかしGuarded Suspension の勉強にならない。Queueが空っぽの時、サーバースレッドをWaitセットに入れて待たせることが重要なのです。Queueはそれをやってくれる。ここでQueueに頼ってGuarded Suspension を作れないと、次章以降で困るので、Rubyで作る。
(長いのでスクロール)

#!/usr/bin/env ruby

# Guarded Suspension
#   用意できるまで待ってね

require "thread"
class RequestQueue 
  def initialize
    @q = []
    @m = Mutex.new
    @cv = ConditionVariable.new
  end
  def getRequest
    @m.lock
    while( @q.empty? ) do
      @cv.wait(@m)
    end
    req = @q.shift
    @m.unlock
    req
  end
  def putRequest req
    @m.lock
      @q.push req
      @cv.broadcast
    @m.unlock
  end
  def size
    @m.synchronize {
      @q.size
    }
  end
end
class Request
  def initialize name
    @name = name
  end
  def to_s
    "[ Request #{@name} ]";
  end
end
class ServerThread < Thread
  def initialize( queue, name )
    @queue = queue
    @name = name
    block = Proc.new{
      10000.times{
        req = @queue.getRequest
        puts "#{@name} handles  #{req}  #{@queue.size}"
        sleep rand(2)
      }
    }
    super(&block)
  end
end
class ClientThread < Thread
  def initialize( queue, name )
    @queue = queue
    @name = name
    block = Proc.new{
      10000.times{|i|
        req = Request.new( "No.#{i}")
        @queue.putRequest req
        puts "#{@name} requests #{req}  #{@queue.size}"
        sleep rand(2)
      }
    }
    super(&block)
  end
end

q = RequestQueue.new
threads = [ ServerThread.new(q, "Bobby"), ClientThread.new(q,"Alice") ]
puts threads
threads.each{|t|t.join}
puts "END"

Rubyで実装するときのポイント

rubyはObjectがWaitセットを持たない。またSynchronizeを持たない。そこで、mutexをつかってlockする。のでした。しかしmutexのwaitはmutexをロックしたまま待つ。他スレッドがロック待ちになってしまう。

ConditionVariableを使ってMutexのロックを制御する

    @m = Mutex.new
    @cv = ConditionVariable.new  

ConditionVariable#waitでMutexのロックを外し、Waitする。

   @m.lock
    while( @q.empty? ) do
      @cv.wait(@m)
    end
    req = @q.shift
    @m.unlock

ConditionVariable#broadcat でJavaのnotifyAllの替わりをする。

      @m.lock
      @q.push req
      @cv.broadcast

こんな感じ。このGuarded Suspensionは結城先生のJavaマルチスレッド デザインパターンの基本定理なのでしっかりやっておく。かりにPythonなどでデザインパターンの書き起すときもGuarded Suspensionは基本になるはず。