GAE/Python でフルテキストサーチ実装した。 redisを使ったインチキバージョン
GAEにどんどん機能が追加されていく中、なかなか実装されないのが全文検索。品詞がとれるセグメンターだけでも提供してくれたら全然便利だと思うんだけどそんなアナウンスはまだ有りません。
なきゃ作ればいいじゃんという事で、全文検索もどきを実装してみました。ひとつ前のエントリー通りTriGramです。
以前、恵比寿のイケメン イアンさんと一緒に作ったmisopotetoというモジュールをベースにしています。
今回のポイントは、転置インデックスをredisサーバに送っているところ、GAE(とうかDB全般)は、インサートがめちゃくちゃ遅いので、Ngramでgram毎にエントリーIDをappendしていくというのは辛いです。Twitterの検索結果15個x100文字位をTriGramでインデックスを作ろうとすると、1500個くらいをgetしてappendして、putする必要があります。以前は、TaskQueueが無かったのでおれおれQueueを作って非同期でIndexを作成していましたが今はTaskQueueがあるので問題なく実装できると思っていました。がしかし、全然遅いし、なぞのリトライが多発して、30分くらいで2000+個のTaskQueueが積まれっぱなしで全然消化される気配がありませんでした。
最近メチャ熱な、redisを使うことでサクサクIndex作成をしてみました。この時点でGAE/Pだけじゃなくなってるんですが・・・時間かければGAEでも出来るのでまぁまぁまぁ。redisは、sakuraのVPSにubuntu入れて立ててます。Pythonのredisモジュールはsocketが必須でGAEで使えないので、VPS上にredisプロキシーなFlaskAppを作ってPOSTメソッドでredisのKVSを使えるようにしています。
ソースは、bitbucket.orgに上げました。まるごと上げたので問題有るかも知れないですが、問題あったら教えてください。
ソース:http://bitbucket.org/a2c/a2c-fts/overview
a2c-fts:http://a2c-fts.appspot.com/
(注意:ChromeかSafari以外で見ると悲しい見た目になります)
セグメンター
utilsモジュールにNgramSegmenterクラスを入れてます。これでNgramでぶつ切ります。デフォルトでBigramですが、ノイズがハンパないので今回はTrigramで使用しています。以下ソース
class NgramSegmenter: _word_delimiter_regex = u"[。、" + string.punctuation + " ]" def __init__(self, text, sp=2, word_delimiter_regex=None): if not word_delimiter_regex: word_delimiter_regex = self._word_delimiter_regex self.text = re.sub(word_delimiter_regex, r'', text) self.ngramArr = [] for pos in range(len(self.text)-sp+1): self.ngramArr.append({ 'word_pos' : pos, 'word_text' : self.text[pos:pos+sp]}) def getText(self): return self.text def getNgramArr(self): return self.ngramArr def getSegmenter(self): a = [x['word_text'] for x in self.ngramArr] return a #return ' '.join(a) if __name__ == '__main__': LOG_FILENAME = 'segmenter.log' logging.basicConfig(filename=LOG_FILENAME,level=logging.DEBUG) text = u'やってみせ 言って聞かせて させて見せ ほめてやらねば 人は動かじ' ana2Str = NgramSegmenter(text , 2) print ana2Str.getSegmenter()
Reidsプロクシ
GAEからredisサーバを使用する為にシンプルなWebアプリ作りました。redisはappend操作もアトミックに動くのでとても楽ちんです。登録する時には、GETでもPOSTでも受け付けます。参照するときにはGETのみ。テスト用にルートにアクセスすると登録用のフォームもあります。abかけてみましたが、秒間数百行けてるです。
#!/usr/bin/env python import os import redis from flask import Flask, request import json app = Flask(__name__) app.debug = True @app.route('/') def redis_input(): html = ''' <!doctype html> <form action="/api/post" method="post"> Key:<input type=text name="key"><br> Val:<input type=text name="val"><br> <input type=submit value="save"> </form> ''' return html # GET method ================================================ @app.route('/api/set/<key>/<val>') def set_id(key, val): gram = 'redis_' + key twit_id = str(val) r = redis.Redis(host='localhost', port=6379, db=0) r.rpush(gram, twit_id) cur_list = r.lrange(gram, 0, -1) return json.dumps(cur_list, indent=2) @app.route('/api/get/<key>') def get_id(key): gram = 'redis_' + key r = redis.Redis(host='localhost', port=6379, db=0) cur_list = r.lrange(gram, 0, -1) return json.dumps(cur_list, indent=2) # POST method ================================================ @app.route('/api/post', methods=['POST']) def set_post_id(): gram = 'redis_' + request.form['key'] twit_id = str(request.form['val']) r = redis.Redis(host='localhost', port=6379, db=0) r.rpush(gram, twit_id) cur_list = r.lrange(gram, 0, -1) return json.dumps(cur_list, indent=2) if __name__ == '__main__': app.run(host='0.0.0.0', port=8080)
TaskQueueに超ハマッた!
GAE-jのグループにも聞いてしまったのですが、TaskQueueに登録するところではまりました。
TaskQueueで登録できるTaskは、自アプリの特定URLオンリーなので外部URLをTaskに登録することが出来ません。そこで、外部サーバーを叩くエンドポイントを作ってそこをTaskQueueで叩くようにしたのですが、正常終了しているはずなのにTaskが削除されずに延々リトライを続けて、GAEのCPUとバンドをドンドン食いつぶす病にかかってしまいました。
以下、駄目だったコード
@app.route('/api/send_redis', methods=['POST']) def saveRedisTwitSearchIndex(): gram = request.form['gram'] twit_id =request.form['twit_id'] ext_url = 'http://redis.hoge.com/api/post' form_fields = { "key": gram, "val": twit_id, } form_data = urllib.urlencode(form_fields) result = urlfetch.fetch(url=ext_url, payload=form_data, method=urlfetch.POST, ) if result.status_code == 200: return 1 return 1
ほぼサンプルからコピペした寄せ集めなので動くはずなんですが、全然だめで、原因もわかりません。
今も原因が分からないですが、どうやら if があるとダメぽいです。
if result.status_code == 200: return 1
この部分をなくすとうまくいきました。動いている現在のコード
@app.route('/api/send_redis', methods=['POST']) def saveRedisTwitSearchIndex(): gram = request.form['gram'] twit_id =request.form['twit_id'] ext_url = 'http://redis.atusi.me/api/post' form_fields = { "key": gram, "val": twit_id, } form_data = urllib.urlencode(form_fields) result = urlfetch.fetch(url=ext_url, payload=form_data, method=urlfetch.POST, ) ''' if result.status_code == 200: #return result.status_code return json.loads(result.content) ''' return "gram: %s<br>twitid: %s"%(gram, twit_id)
コメントアウトするだけで動きました。なんでやねん!
上記変更を加えてから、1000以上のTaskQueueも数分で消化できるようになりました。
検索結果は、GAEの上限の1000件まで返すようになってます。トップページは、いっぱい出ても意味が無いので100件に絞ってます。他にも色々つまずいた所とがあったような気がしたけど忘れた・・・
redisへのFlaskアプリのベンチーマーク結果
Document Path: /api/set/hoge1/1 Document Length: 17 bytes Concurrency Level: 100 Time taken for tests: 0.439 seconds Complete requests: 100 Failed requests: 98 (Connect: 0, Receive: 0, Length: 98, Exceptions: 0) Write errors: 0 Total transferred: 59828 bytes HTML transferred: 40740 bytes Requests per second: 227.68 [#/sec] (mean) Time per request: 439.219 [ms] (mean) Time per request: 4.392 [ms] (mean, across all concurrent requests) Transfer rate: 133.02 [Kbytes/sec] received Connection Times (ms) min mean[+/-sd] median max Connect: 8 15 3.3 16 21 Processing: 38 154 81.8 144 431 Waiting: 38 154 81.6 144 428 Total: 48 169 83.6 160 439