latest log

酩酊状態で書いたエンジニアポエムです。酩酊状態で読んでください。

巨大な配列をUIスレッドをブロックせずに処理する Array#async を mofmof.js に実装してみました

(ε・◇・)з o O ( 実装に4時間かかってるけど、1分でよめるよ!

ブラウザ上で動作する JavaScript はシングルスレッドで動作するため、
巨大な配列を Array#forEach などに与えてしまうと UI スレッドが固まってしまう可能性があります。
Array#async は配列を適当な単位*1に分けて処理するイテレータメソッド(each, map, some, every)を返します。

処理終了(each, map)か条件成立(some, every)で callback(戻り値) を呼びます。map なら戻り値は配列になります。

if (!Array.prototype.sync) {
     Array.prototype.sync = Array_sync;
}
if (!Array.prototype.async) {
     Array.prototype.async = Array_async;
}

function Array_sync() { // @ret ModArray: Array + { map, each, some, every }
                        // @help: Array#sync
                        // @desc: TBD
    this.map  = Array.prototype.map;
    this.each = Array.prototype.each;
    this.some = Array.prototype.some;
    this.every= Array.prototype.every;
    return this;
}

function Array_async(callback, // @arg Function(= undefined): callback(result:MixArray/Boolean/undefined, error:Boolean)
                     wait,     // @arg Integer(= 0): async wait time (unit: ms)
                     unit) {   // @arg Integer(= 0): units to processed at a time.
                               //                    0 is auto detection (maybe 50000)
                               // @ret ModArray: Array + { map, each, some, every }
                               // @help: Array#async
                               // @desc: TBD
//{@debug
    mm.allow(callback, "Function/undefined");
    mm.allow(wait,     "Integer/undefined");
    mm.allow(unit,     "Integer/undefined");
    mm.allow(wait,     wait ? wait > 0 : true);
    mm.allow(unit,     unit ? unit > 0 : true);
//}@debug

    var session = {
            callback: callback || mm_nop,
            wait:     ((wait || 0) / 1000) | 0,
            unit:     unit || 0
        };

    if (!session.unit) { // auto detection
         session.unit = 50000; // TODO: bench and detection
    }
    this.map  = function(fn, that) { return _async_iter(this, fn, that, session, "map" ); };
    this.each = function(fn, that) { return _async_iter(this, fn, that, session, "each"); };
    this.some = function(fn, that) { return _async_iter(this, fn, that, session, "some"); };
    this.every= function(fn, that) { return _async_iter(this, fn, that, session, "every");};
    return this;
}

function _async_iter(ary,     // @arg Array:
                     fn,      // @arg Function: callback function
                     that,    // @arg Mix: callback.apply(fn_that)
                     session, // @arg Object: { callback, wait, unit }
                     iter) {  // @arg String: iterator function name. "map", "each", "some" and "every"
                              // @ret Object: { halt }
                              // @innert:
    var i = 0, iz = ary.length, range, cmd = [], obj = {}, result;

    if (iter === "map") {
        result = Array(iz);
    }
    for (; i < iz; i += session.unit) {
        range = Math.min(iz, i + session.unit);
        switch (iter) {
        case "map":   obj["fn" + i] =  _each(ary, fn, that, i, range, true);  break;
        case "each":  obj["fn" + i] =  _each(ary, fn, that, i, range, false); break;
        case "some":  obj["fn" + i] = _every(ary, fn, that, i, range, true);  break;
        case "every": obj["fn" + i] = _every(ary, fn, that, i, range, false);
        }
        cmd.push("fn" + i, session.wait);
    }
    obj.end = function(next) {
        session.callback(result, false);
        return true; // String#stream spec (need return boolean)
    };
    obj.halt = function(action, error) {
        session.callback(result, error);
    };
    cmd.pop(); // remove last wait
    return (cmd.join(" > ") + " > end").stream(obj); // String#stream

    // --- internal ---
    function _each(ary, fn, that, i, iz, map) {
        return function() {
            for (var r; i < iz; ++i) {
                if (i in ary) {
                    r = fn.call(that, ary[i], i, ary);
                    map && (result[i] = r);
                }
            }
            return true; // -> next stream
        };
    }
    function _every(ary, fn, that, i, iz, some) {
        return function() {
            for (var r; i < iz; ++i) {
                if (i in ary) {
                    r = fn.call(that, ary[i], i, ary);
                    if (!r && !some || r && some) {
                        result = some ? true : false;
                        return false; // -> halt stream -> callback(result)
                    }
                }
            }
            result = some ? false : true;
            return true; // -> next stream
        };
    }
}
// 処理終了で呼ばれる。
// Array#async.map, Array#async.some, Array#async.every の戻り値が result に渡される
function callback(result, error) {
    if (error) {
        console.log("async error: " + mm.dump(result));
    } else {
        console.log("async end: " + mm.dump(result));
    }
}

function fn_map(value, index) {
    console.log(value, index);
    return value;
}

function fn_some(value, index) {
    console.log(value, index);
    return (+value) > 3; // 3
}

function fn_every(value, index) {
    console.log(value, index);
    return (+value) < 5;
}


// 同期版
"abcdefghi".split("").each(fn_map); // -> undefined
"abcdefghi".split("").map(fn_map); // -> ["a", ... "i"]

"123012301230".split("").some(fn_some); // -> false
"123412301230".split("").some(fn_some); // -> true. because has 4

"123012301230".split("").every(fn_every); // -> true
"123912301230".split("").every(fn_every); // -> false. because has 9

// 非同期版
"abcdefghi".split("").async(callback, 1000, 3).each(fn_map); // -> callback(undefined)
"abcdefghi".split("").async(callback, 1000, 3).map(fn_map); // -> callback(["a", ... "i"])

"123012301230".split("").async(callback, 1000, 3).some(fn_some); // -> callback(false)
"123412301230".split("").async(callback, 1000, 3).some(fn_some); // -> callback(true). because has 4

"123012301230".split("").async(callback, 1000, 3).every(fn_every); // -> callback(true)
"123912301230".split("").async(callback, 1000, 3).every(fn_every); // -> callback(false). because has 9

処理の肝は、String#stream の機能を利用しています。
Array#async は String#stream に与えるコマンド*2を組み立てるのが主な仕事です。

処理の中止

Array#async.map などは、String#stream の戻り値 { halt: Function } を返します。
halt() を実行すると、キリの良いタイミングで、callback(戻り値:Mix, error:Boolean) を呼び出し、処理を中断します。
戻り値 にはその時点までの結果が格納されています。

var stream = [1,2,3,4,5,6].async(callback, 10 * 1000, 1).each(fn_map); // 10秒間隔で1個づつ処理する

// 適当なタイミングで halt() を呼び出す -> 処理を中断し callback が呼ばれる
stream.halt();

テスト

function fn_each(value, index) {
//    console.log(value, index);
}
function fn_map(value, index) {
//    console.log(value, index);
    return value + index;
}

var unit = 20 * 1024 * 1024; // 20MB
var ary = 0..to(unit);

"5 > for > 5 > sync_each > 5 > async_each > 5 > sync_map > 5 > async_map > end".stream({
    "for": function() {
        console.log("sync for begin");
        var now = Date.now();

        for (var i = 0, iz = ary.length; i < iz; ++i) {
            fn_each(ary[i], i);
        }
        console.log("sync for @@ elapsed @@".at(unit, Date.now() - now));

        return true;
    },
    sync_each: function() {
        console.log("sync each begin");
        var now = Date.now();

        ary.sync().each(fn_each);
        console.log("sync each @@ elapsed @@".at(unit, Date.now() - now));

        return true;
    },
    async_each: function(next) {
        console.log("async each begin");
        var now = Date.now();

        ary.async(function() {
            console.log("async each @@ elapsed @@".at(unit, Date.now() - now));
            next(true);
        }).each(fn_each);
    },
    sync_map: function() {
        console.log("sync map begin");
        var now = Date.now();

        ary.sync().map(fn_map);
        console.log("sync map @@ elapsed @@".at(unit, Date.now() - now));

        return true;
    },
    async_map: function(next) {
        console.log("async map begin");
        var now = Date.now();

        ary.async(function() {
            console.log("async map @@ elapsed @@".at(unit, Date.now() - now));
            next(true);
        }).map(fn_map);
    },
    end: function() {
        console.log("end");
        return true;
    }
});

ベンチマーク

5M から 50M までの配列を確保し、each, map を実行した結果です。単位は ms です。

Google Chrome 22

50MB 20MB 10MB 5MB
for 1188 499 241 191
Array#each 4255 1758 909 474
Array#async().each 10930 4682 2311 1125
Array#map crash 4878 1626 1199
Array#async().map crash 7998 2829 1842

おしまい

70行ほどの小さな機能ですが、こういうのがあると、安心して大きな配列を扱えるんじゃないかなと。

*1:デフォルトでは5万個

*2:DSLのようなもの