When it’s ready.

出来るまで出来ない

「普通に、行きたい、不自然な、お祭り」  FIFOを初めて知った。 BaseManager第2弾

id:mopemopeさんとlirisさんのおかげでサーバーとクライアントの通信が出来るようになった。ありがとうです。

こんな簡単に別PCとやりとり出来るようになるなんてホント素敵すぎる。通信出来ると言ってもqueueを一つ渡し何も加工せずに戻ってきたのをprintしてるだけなので、サーバーサイドで何かやらせないとせっかく別PCに送った意味がない。

queueというのが、やりとりしてるモノっぽいのでソース見てみる。

/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/Queue.py

self._init(maxsize)
# mutex must be held whenever the queue is mutating.  All methods
# that acquire mutex must release it before returning.  mutex
# is shared between the three conditions, so acquiring and
# releasing the conditions also acquires and releases mutex.
self.mutex = threading.Lock()

なんかいきなり、mutexとか出てきておぞましい感じがしたので退散。ソースを追うことが出来ても意味が分らないので読むのを諦める(あきらめが肝心)
10年くらい前、CGを複数人で作っている際に後から上書きでデータを消してしまうミスが多発した経験がある。その時は、保存帽子と呼ばれる帽子を作ってその帽子をかぶっている人しかSAVE出来ない運用にした。たったそれだけでミスが無くなったことがある。mutexも似たようなモンだろうと勝手に理解しておく。

queueを渡したり戻ったりしているので、Queueソース以外で調べてみたら、Python2.4だけどhttp://www.python.jp/doc/2.4/lib/module-Queue.htmlこんなページが見つかった。

Queueモジュールは、多生産者-多消費者FIFOキューを実装します。これは、複数のスレッドの間で情報を安全に交換しなければならないときのスレッドプログラミングで特に有益です。このモジュールのQueueクラスは、必要なすべてのロックセマンティクスを実装しています。

多生産者ー多消費者ってイイ響き。どんどん作ってドンドン使える。しかも、安全らしいぞ。これはいい。ところで、FIFOって何だよ!これもググる、フィフォ>ファースト イン ファースト アウトらしい。先入れ先出しロケットペンシルみたいな感じかな?入れるところと出てくるところが決っていて、順番も固定なのは安全の為だろうと理解する。

と言うことで、はじめてぇのぉQueue♪ してみる。

from Queue import Queue

q = Queue()
# qに入れる
q.put('first')
q.put(['second', '3rd'])
# qから取出す>1回目
print q.get()
# qのサイズを調べる
print 'queue size is', q.qsize()
# qから取出す>2回目
print q.get()

# 一気に5つ登録する
[q.put(i)for i in range(5)]
# qのサイズを調べる
print 'queue size is', q.qsize()

# qが空っぽになるまで取出す
while not q.empty():
  print q.get()

実行すると↓

first
queue size is 1
['second', '3rd']
queue size is 5
0
1
2
3
4

おもすれぇな、これ。Listのappendとpopで同じ事出来る気がするけど。まぁ、気にするのはやめよう。きっと、mutexがなんか絡んでるんだろうなぁ・・・

これを踏まえて、BaseManagerのサーバー側でデータをゲットして適当に処理してみる事にする。

bm_server.py
-----------------------------------------
from multiprocessing.managers import BaseManager
import Queue
from datetime import datetime

queue = Queue.Queue()
class QueueManager(BaseManager):
   pass

def _call():
   print('server call')
   queue.put(0)
   f()
   return queue

# 登録されたqueueを次々と足しつつQueueにput
tmpTotal = 0
def f():
   global tmpTotal
   tmpTotal += queue.get()
   print tmpTotal
   queue.put(tmpTotal)
   return 0


QueueManager.register('getQueue', callable=_call)
m = QueueManager(address=('', 50000), authkey='test')
m.get_server().serve_forever()
bm_client.py
-----------------------------------------
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
      pass

QueueManager.register('getQueue')
m = QueueManager(address=('localhost', 50000),authkey='test')
m.connect()
queue = m.getQueue()

for i in range(10):
   queue.put(i)

print(queue.qsize())

while not queue.empty():
   print queue.get()

とりあえづ、ドンドン足し算やってもらう簡単なのやってみようと思ったけど、サーバ側のf()が1回しか呼ばれてない状態になってる。どんなタイミングでサーバー側のファンクションが動くのか判らない。queue.put(hoge)のタイミングでどんどこデータを送ってると思ってたんだけど違うぽい。
そんな事したら、あっちこっちでデータの整合性が保てなくなるなぁとか、思いながらもそう挙動することを望んでいる自分が居たりして・・・

引続き、BaseManagerと遊んでみる。これは、想像以上に楽しい。