巨大な配列をUIスレッドをブロックせずに処理する Array#async を mofmof.js に実装してみました
(ε・◇・)з o O ( 実装に4時間かかってるけど、1分でよめるよ!
ブラウザ上で動作する JavaScript はシングルスレッドで動作するため、
巨大な配列を Array#forEach などに与えてしまうと UI スレッドが固まってしまう可能性があります。
Array#async は配列を適当な単位*1に分けて処理するイテレータメソッド(each, map, some, every)を返します。
処理終了(each, map)か条件成立(some, every)で callback(戻り値) を呼びます。map なら戻り値は配列になります。
if (!Array.prototype.sync) { Array.prototype.sync = Array_sync; } if (!Array.prototype.async) { Array.prototype.async = Array_async; } function Array_sync() { // @ret ModArray: Array + { map, each, some, every } // @help: Array#sync // @desc: TBD this.map = Array.prototype.map; this.each = Array.prototype.each; this.some = Array.prototype.some; this.every= Array.prototype.every; return this; } function Array_async(callback, // @arg Function(= undefined): callback(result:MixArray/Boolean/undefined, error:Boolean) wait, // @arg Integer(= 0): async wait time (unit: ms) unit) { // @arg Integer(= 0): units to processed at a time. // 0 is auto detection (maybe 50000) // @ret ModArray: Array + { map, each, some, every } // @help: Array#async // @desc: TBD //{@debug mm.allow(callback, "Function/undefined"); mm.allow(wait, "Integer/undefined"); mm.allow(unit, "Integer/undefined"); mm.allow(wait, wait ? wait > 0 : true); mm.allow(unit, unit ? unit > 0 : true); //}@debug var session = { callback: callback || mm_nop, wait: ((wait || 0) / 1000) | 0, unit: unit || 0 }; if (!session.unit) { // auto detection session.unit = 50000; // TODO: bench and detection } this.map = function(fn, that) { return _async_iter(this, fn, that, session, "map" ); }; this.each = function(fn, that) { return _async_iter(this, fn, that, session, "each"); }; this.some = function(fn, that) { return _async_iter(this, fn, that, session, "some"); }; this.every= function(fn, that) { return _async_iter(this, fn, that, session, "every");}; return this; } function _async_iter(ary, // @arg Array: fn, // @arg Function: callback function that, // @arg Mix: callback.apply(fn_that) session, // @arg Object: { callback, wait, unit } iter) { // @arg String: iterator function name. "map", "each", "some" and "every" // @ret Object: { halt } // @innert: var i = 0, iz = ary.length, range, cmd = [], obj = {}, result; if (iter === "map") { result = Array(iz); } for (; i < iz; i += session.unit) { range = Math.min(iz, i + session.unit); switch (iter) { case "map": obj["fn" + i] = _each(ary, fn, that, i, range, true); break; case "each": obj["fn" + i] = _each(ary, fn, that, i, range, false); break; case "some": obj["fn" + i] = _every(ary, fn, that, i, range, true); break; case "every": obj["fn" + i] = _every(ary, fn, that, i, range, false); } cmd.push("fn" + i, session.wait); } obj.end = function(next) { session.callback(result, false); return true; // String#stream spec (need return boolean) }; obj.halt = function(action, error) { session.callback(result, error); }; cmd.pop(); // remove last wait return (cmd.join(" > ") + " > end").stream(obj); // String#stream // --- internal --- function _each(ary, fn, that, i, iz, map) { return function() { for (var r; i < iz; ++i) { if (i in ary) { r = fn.call(that, ary[i], i, ary); map && (result[i] = r); } } return true; // -> next stream }; } function _every(ary, fn, that, i, iz, some) { return function() { for (var r; i < iz; ++i) { if (i in ary) { r = fn.call(that, ary[i], i, ary); if (!r && !some || r && some) { result = some ? true : false; return false; // -> halt stream -> callback(result) } } } result = some ? false : true; return true; // -> next stream }; } }
// 処理終了で呼ばれる。 // Array#async.map, Array#async.some, Array#async.every の戻り値が result に渡される function callback(result, error) { if (error) { console.log("async error: " + mm.dump(result)); } else { console.log("async end: " + mm.dump(result)); } } function fn_map(value, index) { console.log(value, index); return value; } function fn_some(value, index) { console.log(value, index); return (+value) > 3; // 3 } function fn_every(value, index) { console.log(value, index); return (+value) < 5; } // 同期版 "abcdefghi".split("").each(fn_map); // -> undefined "abcdefghi".split("").map(fn_map); // -> ["a", ... "i"] "123012301230".split("").some(fn_some); // -> false "123412301230".split("").some(fn_some); // -> true. because has 4 "123012301230".split("").every(fn_every); // -> true "123912301230".split("").every(fn_every); // -> false. because has 9 // 非同期版 "abcdefghi".split("").async(callback, 1000, 3).each(fn_map); // -> callback(undefined) "abcdefghi".split("").async(callback, 1000, 3).map(fn_map); // -> callback(["a", ... "i"]) "123012301230".split("").async(callback, 1000, 3).some(fn_some); // -> callback(false) "123412301230".split("").async(callback, 1000, 3).some(fn_some); // -> callback(true). because has 4 "123012301230".split("").async(callback, 1000, 3).every(fn_every); // -> callback(true) "123912301230".split("").async(callback, 1000, 3).every(fn_every); // -> callback(false). because has 9
処理の肝は、String#stream の機能を利用しています。
Array#async は String#stream に与えるコマンド*2を組み立てるのが主な仕事です。
処理の中止
Array#async.map などは、String#stream の戻り値 { halt: Function } を返します。
halt() を実行すると、キリの良いタイミングで、callback(戻り値:Mix, error:Boolean) を呼び出し、処理を中断します。
戻り値 にはその時点までの結果が格納されています。
var stream = [1,2,3,4,5,6].async(callback, 10 * 1000, 1).each(fn_map); // 10秒間隔で1個づつ処理する // 適当なタイミングで halt() を呼び出す -> 処理を中断し callback が呼ばれる stream.halt();
テスト
function fn_each(value, index) { // console.log(value, index); } function fn_map(value, index) { // console.log(value, index); return value + index; } var unit = 20 * 1024 * 1024; // 20MB var ary = 0..to(unit); "5 > for > 5 > sync_each > 5 > async_each > 5 > sync_map > 5 > async_map > end".stream({ "for": function() { console.log("sync for begin"); var now = Date.now(); for (var i = 0, iz = ary.length; i < iz; ++i) { fn_each(ary[i], i); } console.log("sync for @@ elapsed @@".at(unit, Date.now() - now)); return true; }, sync_each: function() { console.log("sync each begin"); var now = Date.now(); ary.sync().each(fn_each); console.log("sync each @@ elapsed @@".at(unit, Date.now() - now)); return true; }, async_each: function(next) { console.log("async each begin"); var now = Date.now(); ary.async(function() { console.log("async each @@ elapsed @@".at(unit, Date.now() - now)); next(true); }).each(fn_each); }, sync_map: function() { console.log("sync map begin"); var now = Date.now(); ary.sync().map(fn_map); console.log("sync map @@ elapsed @@".at(unit, Date.now() - now)); return true; }, async_map: function(next) { console.log("async map begin"); var now = Date.now(); ary.async(function() { console.log("async map @@ elapsed @@".at(unit, Date.now() - now)); next(true); }).map(fn_map); }, end: function() { console.log("end"); return true; } });
ベンチマーク
5M から 50M までの配列を確保し、each, map を実行した結果です。単位は ms です。
50MB | 20MB | 10MB | 5MB | |
for | 1188 | 499 | 241 | 191 |
Array#each | 4255 | 1758 | 909 | 474 |
Array#async().each | 10930 | 4682 | 2311 | 1125 |
Array#map | crash | 4878 | 1626 | 1199 |
Array#async().map | crash | 7998 | 2829 | 1842 |
おしまい
70行ほどの小さな機能ですが、こういうのがあると、安心して大きな配列を扱えるんじゃないかなと。