[Python 3.5] async for 文で利用するスリープソートを実装してみた。 | パークのソフトウエア開発者ブログ|ICT技術(Java・Android・iPhone・C・Ruby)なら株式会社パークにお任せください

パークのソフトウエア開発者ブログ|ICT技術(Java・Android・iPhone・C・Ruby)なら株式会社パークにお任せください

開発の解決方法や新しい手法の情報を、パークのエンジニアが提供します。パークのエンジニアが必要な場合は、ぜひお気軽にお問い合わせ下さい。 株式会社パーク:http://www.pa-rk.co.jp/

ちかです
最近は TypeScript に浸かっています。
もう何年も Python に触れていなかったのですが
Python 3.5 には型ヒントやらコルーチンのための構文 (async/await) やら追加されて
少し気になっていました。

型ヒントについては Python と型ヒント (Type Hints) を読んで
構造的部分型の型チェック (TypeScript みたいな) ができるようになることに期待しつつ放置。

async/await は (少なくとも他の言語では神機能だし) 使ってみようかと。
特に非同期イテレーション専用の構文 async for がおもしろそう!
ということで
以前 Go 言語と D 言語でスリープソートを実装してみたのとの比較も込めて
スリープソートを実装してみました。

import asyncio
from contextlib import closing

class ArrivalOrderedIterator:
    ''' コルーチン (または Future) のリストを受け取り、結果の到着順に結果値を返す非同期イテレーターを表します。 '''
    def __init__(self, coroutines):
        self.completed = set()
        self.incomplete = [asyncio.ensure_future(co) for co in coroutines]
 
    async def __aiter__(self):
        return self
 
    async def __anext__(self):
        if self.completed:
            return self.completed.pop().result()
        if not self.incomplete:
            raise StopAsyncIteration()
        self.completed, self.incomplete = await asyncio.wait(self.incomplete, return_when=asyncio.FIRST_COMPLETED)
        return self.completed.pop().result()

class sleepsort:
    ''' 数値のリストをスリープソートする非同期シーケンスを表します。 '''
    def __init__(self, values):
        self.values = values
 
    async def __aiter__(self):
        return ArrivalOrderedIterator(asyncio.sleep(x, x) for x in self.values)

if __name__ == '__main__':
    async def main():
        async for value in sleepsort([5, 2, 19, 11, 7, 13, 3, 17]):
            print(value)
    with closing(asyncio.get_event_loop()) as loop:
        loop.run_until_complete(main())

故意に async for 文を使っています (31 行目)。
そのせいで必要以上に長いコードになっているかもしれません。

async for 文を使うことで非同期のイテレーションを読みやすく記述できます。
...利用側では。

async for value in async_iterable:
  do_something(value)

非同期イテレーターの実装側では
__anext__() メソッドで「次の値」を返すか StopAsyncIteration を発生させてループを終了させます。
基本的にはクラスを作成して現在の状態をフィールドに持たせることになると思います。
ジェネレーター関数みたいに記述できたらラクなのですが。。

また
Reactive Extensions (Rx) を彷彿とさせますが (私だけ?)
内包表記や map/filter などは使えないようで
必要なら自前で用意する必要がありそうです。。
((i * i async for value in sleepsort(values) if i < 10) みたいな構文はないみたいです。)
参考: python - how can I asynchronously map/filter an asynchronous iterable? - Stack Overflow

ということで Python 3.5 の非同期イテレーション構文の紹介でした。

追記: ちなみに async for を使わず素直にコールバック形式で書くなら:

import asyncio
from contextlib import closing

async def sleepsort(values, callback):
    ''' 数値のリストをスリープソートします。 '''
    incomplete = [asyncio.ensure_future(asyncio.sleep(x, x)) for x in values]
    while incomplete:
        completed, incomplete = await asyncio.wait(incomplete, return_when=asyncio.FIRST_COMPLETED)
        while completed:
            callback(completed.pop().result())

if __name__ == '__main__':
    with closing(asyncio.get_event_loop()) as loop:
        loop.run_until_complete(sleepsort([5, 2, 19, 11, 7, 13, 3, 17], print))

追々記: map/filter を実装してみました。

import asyncio
from functools import partial
from contextlib import closing

class AsyncIterator:
    ''' 「次の値を取得する関数」から非同期イテレーターを構築します。 '''
    def __init__(self, anext):
        self.anext = anext

    async def __aiter__(self):
        return self

    async def __anext__(self):
        return await self.anext()

def async_iterable(next_getter_creator):
    ''' 「「次の値を取得する関数」を取得する関数」から非同期シーケンスを構築します。 '''
    class AsyncIterable:
        def __init__(self, *args, **kwds):
            self.args = args
            self.kwds = kwds

        async def __aiter__(self):
            return AsyncIterator(await next_getter_creator(*self.args, **self.kwds))

    return AsyncIterable

@async_iterable
async def async_map(selector, source):
    ''' 非同期シーケンスを射影します。 '''
    iterator = await source.__aiter__()
    async def next():
        return selector(await iterator.__anext__())
    return next

@async_iterable
async def async_filter(predicate, source):
    ''' 非同期シーケンスを絞り込みます。 '''
    iterator = await source.__aiter__()
    async def next():
        while True:
            value = await iterator.__anext__()
            if predicate(value):
                return value
    return next

@async_iterable
async def sleepsort(values):
    ''' 数値のリストをスリープソートする非同期シーケンスを表します。 '''
    # tasks = (completed, incomplete)
    tasks = [set(), [asyncio.ensure_future(asyncio.sleep(x, x)) for x in values]]
    async def next():
        if tasks[0]:
            return tasks[0].pop().result()
        if not tasks[1]:
            raise StopAsyncIteration()
        tasks[0], tasks[1] = await asyncio.wait(tasks[1], return_when=asyncio.FIRST_COMPLETED)
        return tasks[0].pop().result()
    return next

def chain(source, *selectors):
    ''' f(g(h(x))) を chain(x, h, g, f) と記述するメソッドチェイン機構を提供します。 '''
    for selector in selectors:
        source = selector(source)
    return source

if __name__ == '__main__':
    async def main():
        async for value in chain(
            sleepsort([5, 2, 19, 11, 7, 13, 3, 17]),
            partial(async_map, lambda x: x * x),
            partial(async_filter, lambda x: 10 < x < 200)
        ):
            print(value)
    with closing(asyncio.get_event_loop()) as loop:
        loop.run_until_complete(main())

出力結果

25
49
121
169

(開始 5 秒後に 25, 7 秒後に 49, ... が出力されます)