latest log

酩酊状態で書いたエンジニアポエムです。酩酊状態で読んでください。

Future と Stream という同期/非同期処理の混在をシンプルにコード化できる車輪を再発明したよ


Script Junkie | Creating Responsive Applications Using jQuery Deferred and Promises
(日本語訳: jQueryのDeferredとPromiseで応答性の良いアプリをー基本編 | ゆっくりと… ) をみて、

(ε・◇・)з Deferred… Promise… お 覚えられん…
(ε・◇・)з 脳みそちっちゃいので、もっと気軽に使えて、簡単に書けるのがいいなぁ~

って思ってました。

なので、自分なりに、

  • 複数の同期/非同期処理を同時に実行し、処理の合流ができたり多少の失敗があっても前に進める Future
  • 複数の同期/非同期処理を順番/同時に実行し、流れを直感的に記述できる Stream

という2つの仕組みを考えました。

Stream は一本の流れを制御し Future は同時多発的な流れを制御するため、これらを組み合わせる事で、複雑で面倒な複数ストリームの待ち合わせ処理がシンプルに記述できるようになります。

Stream の具体的な実装は、String.prototype.stream (String#stream) や Array.prototype.stream (Array#stream) になります。

この記事中に出てくるサンプルコードは、 http://mofmof-js.googlecode.com/svn/trunk/test/base.js.htm ページのコンソール上で実際に動かす事ができます。

String#stream の基本的な使い方

まずは、関数の呼び出し順を > と + でつなげた文字列(ストリームへのコマンド)を作ります。

  • > は、その処理が終わったら次の処理へ移動する
  • + は、その隣の処理と並列で処理する
  • 関数名を書くと関数を呼び出す
  • 数値を書くと、その時間分だけ遅延する

という意味になります。

「最初に1000ms(= 1秒)待ってから、fn1を実行し、fn1が終わったら fn2 と fn3 が同時に走りだし、両方終わってから fn4 が実行される」をコマンドに変換すると、
"1000 > fn1 > fn2 + fn3 > fn4" になります。

"1000 > fn1 > fn2 + fn3 > fn4".stream({
  fn1: function() { ... }
  fn2: function() { ... }
  fn3: function() { ... }
  fn4: function() { ... }
});

呼び出したい関数が全て同期処理の場合

同期で結果を返す関数(fn1~fn4)を
"1000 > fn1 > fn2 + fn3 > fn4" というコマンドで制御する例です。
同期関数は return true (成功)または return false (失敗) を返す必要があります。

"1000 > fn1 > fn2 + fn3 > fn4".stream({
  fn1: function() { mm.log("fn1"); return true; },
  fn2: function() { mm.log("fn2"); return true; },
  fn3: function() { mm.log("fn3"); return true; },
  fn4: function() { mm.log("fn4"); return true; }
});

以下が実行結果です。

[ 2012-02-06T16:48:06.009Z ]: fn1
[ 2012-02-06T16:48:06.010Z ]: fn2
[ 2012-02-06T16:48:06.011Z ]: fn3
[ 2012-02-06T16:48:06.011Z ]: fn4

全て同期処理なので、ほぼノーウェイトで実行されている事がわかります。

呼び出したい関数が全て非同期処理の場合

String#stream は非同期処理も扱う事ができます。

以下は、非同期で結果を返す関数(async_fn1 ~ async_fn4)を
"1000 > async_fn1 > async_fn2 + async_fn3 > async_fn4"というコマンドで制御する例です(async_ が付いているだけで、先程の同期処理と内容的には同じコマンドです)。

同期処理との違いは、

  • 非同期処理は実行結果を return で返せない
  • 関数にコールバック引数( function(callback) {...} )が渡されているので処理終了で callback() するとそれが結果になる
  • callback() や callback(true) なら 成功。 callback(false) なら失敗になる

です。

同期関数は関数の終わりに return true/false で結果を返し、非同期関数は callback(true/false) で結果を返します。

function async_fn1(callback) {
    mm.log("fn1 begin");
    5..wait(function() { // wait 5sec
        mm.log("fn1 end");
        callback();
    });
}
function async_fn2(callback) {
    mm.log("fn2 begin");
    2..wait(function() { // wait 2sec
        mm.log("fn2 end");
        callback();
    });
}
function async_fn3(callback) {
    mm.log("fn3 begin");
    1..wait(function() { // wait 1sec
        mm.log("fn3 end");
        callback();
    });
}
function async_fn4(callback) {
    mm.log("fn4 begin");
    5..wait(function() { // wait 5sec
        mm.log("fn4 end");
        callback();
    });
}


"1000 > async_fn1 > async_fn2 + async_fn3 > async_fn4".stream({
    async_fn1: async_fn1,
    async_fn2: async_fn2,
    async_fn3: async_fn3,
    async_fn4: async_fn4
});

以下が実行結果です。

[ 2012-02-06T16:40:41.010Z ]: fn1 begin 
[ 2012-02-06T16:40:46.011Z ]: fn1 end   
[ 2012-02-06T16:40:46.012Z ]: fn2 begin 
[ 2012-02-06T16:40:46.013Z ]: fn3 begin 
[ 2012-02-06T16:40:47.013Z ]: fn3 end   
[ 2012-02-06T16:40:48.012Z ]: fn2 end   
[ 2012-02-06T16:40:48.013Z ]: fn4 begin 
[ 2012-02-06T16:40:53.014Z ]: fn4 end   

分かりやすく加工すると(↓)このようになり、正しく動作している事がわかります。

[ 41.010 ]: fn1 begin ←┐5秒       ←─┐12秒(5+2+5)
[ 46.011 ]: fn1 end  ─┘          │
[ 46.012 ]: fn2 begin ←───┐2秒  ←┐7秒 │
[ 46.013 ]: fn3 begin ←┐1秒 │    │  │
[ 47.013 ]: fn3 end  ─┘  │    │  │
[ 48.012 ]: fn2 end  ────┘←┐5秒 │  │
[ 48.013 ]: fn4 begin ←┐5秒   │  │  │
[ 53.014 ]: fn4 end  ─┴────┴──┴──┘

初期化, 終了処理, 同期 + 非同期の混在

以下は、より複雑な例です。

  • init 実行後に2秒待ち async_fn1 を実行
  • async_fn1 終了で async_fn2, fn3, async_fn4 をパラレルに実行し、最低8秒たってから fin を実行
  • init, fin, fn3 は同期、async_fn1, async_fn2, async_fn4 は非同期処理
    • init と fin は同期なので、本来 return で結果を返せるが、あえて callback(true) で結果を返している
"init > 2000 > async_fn1 > async_fn2 + 8000 + fn3 + async_fn4 > fin".stream({
  init: function(callback) { mm.log("init"); callback(true); },
  fin:  function(callback) { mm.log("fin");  callback(true); },
  async_fn1:
        function(callback) {
          mm.log("fn1 begin");
          4..wait(function() { mm.log("fn1 end"); callback(true) }) },
  async_fn2:
        function(callback) {
          mm.log("fn2 begin");
          2..wait(function() { mm.log("fn2 end"); callback(true) }) },
  fn3:  function()   { mm.log("fn3"); return true; },
  async_fn4:
        function(callback) {
          mm.log("fn4 begin");
          4..wait(function() { mm.log("fn4 end"); callback(true) }) }
});

以下が実行結果です。

[ 2012-02-07T10:06:16.667Z ]: init
[ 2012-02-07T10:06:18.672Z ]: fn1 begin
[ 2012-02-07T10:06:22.672Z ]: fn1 end
[ 2012-02-07T10:06:22.672Z ]: fn2 begin
[ 2012-02-07T10:06:22.673Z ]: fn3
[ 2012-02-07T10:06:22.674Z ]: fn4 begin
[ 2012-02-07T10:06:24.674Z ]: fn2 end
[ 2012-02-07T10:06:26.675Z ]: fn4 end
[ 2012-02-07T10:06:30.674Z ]: fin

FunctionHash と FunctionArray

Array#stream は 関数の配列(FunctionArray) を受け取り順番に実行します。Array#stream は String#stream の簡易版です。

String#stream は FunctionHash( { name: function, .., } ) と FunctionArray( [function, ...] ) を受け取ります。

状況に応じて Array#stream と String#stream を使い分ける事で、よりシンプルになります。

Array#stream で関数を順番に実行する

Array#stream は、配列に格納された関数を順番に実行します。同期処理と非同期処理が混在した流れを、とても簡単に記述できます。

// 非同期処理
function fn1(callback) { mm.log("fn1 begin"); 4..wait(callback); }

// 同期処理
function fn2()         { mm.log("fn2"); return true; }

// 非同期処理
function fn3(callback) { mm.log("fn3 begin"); 1..wait(callback); }

// 実行
[fn1, fn2, fn3].stream();

全て同期関数で、単純に実行するだけなら、Array#forEach や for ループがベストな選択でしょう。

function fn1() { mm.log("fn1"); return true; }
function fn2() { mm.log("fn2"); return true; }
function fn3() { mm.log("fn3"); return true; }

[fn1, fn2, fn3].forEach(function(fn) {
    fn();
});

String#stream に FunctionArray を与える事もできます。

以下の3つのコードブロックは、書き方が異なるだけで全て同じ結果になります

function fn1() { mm.log("fn1"); return true; }
function fn2() { mm.log("fn2"); return true; }

var ary = [fn1, fn2];

// ----------------
// 1. String#stream(FunctionArray) -> Array#stream を使いましょう

ary.names().join(" > ").stream(ary);



// ----------------
// 2. Array#stream -> シンプルです

ary.stream();



// ----------------
// 3. String#stream(Command + FunctionHash) -> 複雑な指定が可能です

"fn1 > fn2".stream({
    fn1: fn1,
    fn2: fn2
});

オブジェクトを this で参照

関数内部から String#stream に渡した FunctionHash を参照することができます。
以下は、args をデータの入れ物として用意し、fn1 の中から this.args で参照しています。

"fn1".stream({
    args: "the world",
    fn1:  function() {
        mm.log(this.args); // args を参照
        return true;
    }
});

実行結果

[ 2012-02-06T17:37:23.627Z ]: the world

中断(halt)

同期処理や非同期処理が return false を返すと、そこで中断(halt)します(非同期処理は適当なタイミングで中断します)。

また、Array#stream や String#stream が返すオブジェクトの halt() メソッドを呼ぶことで、ストリームの処理を中断できます。

halt しているかどうかを関数内から確認するには、if (this.halted) { ... } で判断可能です。

String#stream に渡した FunctionHash に halt: function(factor) { ... } を定義しておくと、halt のタイミングでコールバックします。
factor には halt が発生した要因(return false / callback(false) した関数名 か "halt" という文字列)が格納されています。

function async_fn1(fn) {
    var that = this;

    mm.log("fn1 begin");
    10..wait(function() {
        if (!that.halted) {
            mm.log("fn1 end");
        }
        fn(true);
    });
}
function async_fn2(fn) {
    var that = this;

    mm.log("fn2 begin");
    8..wait(function() {
        if (!that.halted) {
            mm.log("fn2 end");
        }
        fn(true);
    });
}

var obj = "1000 > async_fn1 + async_fn2 > fn3 > fn4".stream({
    async_fn1: async_fn1,
    async_fn2: async_fn2,
    fn3:  function() { // raise halt
        return false;
    },
    fn4:  function() { // not reachable
        return true;
    },
    halt: function halt(factor) { // @param String:
        mm.log("halted: " + this.halted); // -> true
        mm.log(factor + " -> HALT");
    }
});

// obj.halt();

何もせず 10秒待つと以下のようなログが出力されます。

[ 2012-02-06T17:18:25.638Z ]: fn1 begin
[ 2012-02-06T17:18:25.639Z ]: fn2 begin
[ 2012-02-06T17:18:33.640Z ]: fn2 end
[ 2012-02-06T17:18:35.640Z ]: fn1 end
[ 2012-02-06T17:18:35.640Z ]: halted: true
[ 2012-02-06T17:18:35.640Z ]: fn3 -> HALT

開始から10秒以内に obj.halt() を実行すると以下のようなログが出力されます。
すでに実行中の非同期処理は halt() を実行しても即座に停止しないため注意してください。

[ 2012-02-06T17:19:03.704Z ]: fn1 begin
[ 2012-02-06T17:19:03.704Z ]: fn2 begin
obj.halt();
[ 2012-02-06T17:19:10.299Z ]: halted: true
[ 2012-02-06T17:19:10.299Z ]: halt -> HALT

Future で複数ストリームの待ち合わせ

Future を使うと、複数のストリームの待ち合わせも可能です。

1000 > fn1 > fn2 + fn3 > fn4 -+
                               +-- finished
2000 > fnA + fnB > fnC -------+
function finished(result) {
    if (result.ok) {
        mm.log("finished");
    }
}

var future = finished.every(2); // ストリーム2本を集約する future

"1000 > fn1 > fn2 + fn3 > fn4".stream({
  fn1: function(fn) {
        mm.log("fn1 begin");
        4..wait(function() { mm.log("fn1 end"); fn(true) }) },
  fn2: function(fn) {
        mm.log("fn2 begin");
        2..wait(function() { mm.log("fn2 end"); fn(true) }) },
  fn3: function()   { mm.log("fn3"); return true; },
  fn4: function()   { mm.log("fn4"); future.pass(); return true; },
  halt: function()  { future.miss(); }
});

"2000 > fnA + fnB > fnC".stream({
  fnA: function()   { mm.log("fnA"); return true; },
  fnB: function(fn) {
        mm.log("fnB begin");
        1..wait(function() { mm.log("fnB end"); fn(true) }) },
  fnC: function(fn) {
        mm.log("fnC begin");
        2..wait(function() { mm.log("fnC end"); fn(true);
                             future.pass(); }) },
  halt: function()  { future.miss(); }
});

future = finished.every(2) で Future オブジェクトを作り、各ストリームの終端で、
future.pass() か future.miss() を呼び出すとストリームの合流が実現できます。

とても簡単ですね。

実行結果です。

[ 2012-02-06T17:54:14.324Z ]: fn1 begin
[ 2012-02-06T17:54:15.325Z ]: fnA
[ 2012-02-06T17:54:15.326Z ]: fnB begin
[ 2012-02-06T17:54:16.326Z ]: fnB end
[ 2012-02-06T17:54:16.327Z ]: fnC begin
[ 2012-02-06T17:54:18.326Z ]: fn1 end
[ 2012-02-06T17:54:18.328Z ]: fn2 begin
[ 2012-02-06T17:54:18.329Z ]: fn3
[ 2012-02-06T17:54:18.330Z ]: fnC end
[ 2012-02-06T17:54:20.330Z ]: fn2 end
[ 2012-02-06T17:54:20.331Z ]: fn4
[ 2012-02-06T17:54:20.331Z ]: finished

終わりに

Stream は Future と一緒に使う事で最大限の効果を発揮します。

(ε・◇・)з つい先日 http://kjirou.sakura.ne.jp/mt/2012/02/timelinejs_1.html を読んで
(ε・◇・)з これと似たような事どっかでやった気が… って思い出したら
(ε・◇・)っ これだった http://d.hatena.ne.jp/uupaa/20091221/1261341232 (2年前
(ε・◇・)з 発明といっても、2年前のリベンジですかね

(ε・◇・)з ajax, image, canvas とか 同期/非同期を意識せず、流れだけを意識していられたら、もっと幸せになれるのになー
(ε・◇・)з って思ってたの

String#stream のソースコード

(ε・◇・)з こんなに説明がてんこ盛りなのに、ソースコード的には 80行ぐらいです。
http://code.google.com/p/mofmof-js/source/browse/trunk/src/base.js?spec=svn885&r=885#1971

Future の仕様

Future の細かな仕様については、 http://code.google.com/p/mofmof-js/wiki/Function をご覧ください。

(ε・◇・)з なんかあれば @uupaa まで~

ポロリ・ゴニョゴニョ

(ε・◇・)з 沢山の人に使ってもらうには、どういった名前にすればいいかな… (熟考
(ε・?・)з Promise -> Future って考え方もあるんだね http://ja.wikipedia.org/wiki/Future
(ε・?・)з Future いいなぁ… そのままだと導入できないけど、ワードを借りたいなぁ…
(ε・?・)з 遅延( Deferred ) を 約束( Promise )ってワーディングは… 失われた20年を彷彿とさせるかな…
(ε・?・)з プロミス も、人前で発言するには、あんまりいい響きじゃないかな… #サラ金的な意味で
(ε・◇・)з [!] ピコーン
(ε・◇・)з 未来( Future ) の 流れ( Stream )って感じだといいんじゃないかな!!
(ε・◇・)з という、ブレイン ストリーム がありましたの