KayでシンプルにTaskQueueを使いこなす
AppEngineには、どんな処理も30秒以内に終わらせないとダメって言う通称30秒ルールという神の掟があってこれを破るとプロセスをKillれてしまいます。ほとんどの処理は30秒もかからないので問題ないですが、クロールとか形態素解析とかやりだすと30秒なんて一瞬です。特にネットワーク経由で複数のものを取りに行ったりすると2秒かかるのを10個もやれば、DataStoreのIOとかも含めると余裕で30秒を使いきってしまいます。
そこで、TaskQueueの登場です。適当につんどくと無料Versionだと1秒間あたり5個ずつ処理してくれます。先程の例だと、10個やるのに30秒だったのが1個処理するのに30秒使えるようになります。苦も無く10倍ゆっくりやれるわけです。
TaskQueueは、内部でキューに送れずに特定のURLに対するPOSTで積みます。引数はペイロードに入れてあげればイイらしい。今作っているのが、3分間隔でsearch.twitter.comのrssを取得して短縮URLを抜き出して、オリジナルのURLに展開したものをDataStoreに突っ込むという事を、CronとTaskQueueを使って実現しています。
2週間もしないでやり方を絶対忘れるのでメモ(他にもっといいやり方があったら教えてください)
Kayは、メソッドにディスパッチされるので、webapp使っている人はちょっと見た目が違います。GoogleのAppEngineのコードサイトのサンプルは、HandlerとWorkerみたいに別のURLが割り振ってあってそれぞれにクラスを作って処理するように書いてあります。でも実際に考えるときには、そもそも重いひとつの処理がある訳だからそれに対して2つのURLと2つのクラスを与えるのはちょっとメンドクサイ。そこで、ハンドラーはGETでいけてワーカーはPOSTなんだからひとつにすればいいじゃんと言うことでやってみた。
def expandUrlHandler(request, lang='ja'): if request.method == "GET": feedUrl = "http://search.twitter.com/search.atom?lang=%s&q=ustre"%(lang) atom = feedparser.parse(feedUrl) res = "" for i in atom.entries: m = re.search(r"http\:\/\/ustre.am\/[a-z|A-Z|0-9|:]*", i.description) res += "%s<br>"%(m.group()) taskqueue.add(url='/cron/get/channel/%s'%(lang), params={'url': m.group(), 'lang':lang}) return Response(res) elif request.method == "POST": url = request.form['url'] req_lang = request.form['lang'] longURL = expandUrl(url) saveUrlMap(url, longURL, req_lang) return Response('done')
上半分が、TaskQueueを登録するためのGET分で下半分がTaskQueueから呼ばれるPOST部分
これみたいに、これを>こうすると1メソッドでかけた方が脳負荷が少なく感じた。
これで、ustre.amを含む日本語のtweetを抜き出して、リアルのURLに変換のちデータストアに保存するのがサクサクと出来るようになりました。一回のリクエストで15件取得して1時間に20回、一日で480回 * 15件 = 7200回のリクエストで28%ものIncomingBandWidthを消費しています。結構でかいですよね。リクエストしているURLは
http://search.twitter.com/search.atom?lang=ja&q=http%3A%2F%2Fustre.am%2F
こんな感じ
このメソッドを叩くCronは
cron: - description: short to long URL Map generate url: /cron/get/channel/ja schedule: every 3 minutes
こんな感じ。リクエストURLをenとかにすると英語で検索してくるけど今のところやってない。