今回は batch ノードです。なにか「バッチ処理」をしてくれるノードかと思いきや「バッチ処理」しやすいようにmsg列をグループ化してくれるもの。topicとpartsの属性を付け加えてくれます。それをjoinノードに渡せばparts属性の順番で、グループに属するmsg群を一個のmsgにまとめることができます。
※「ブロックを積みながら」投稿順 index はこちら
batchノードの主要機能
このノードの主要機能はグループ化(msgを一束にする)です。到来するmsgを特定の基準で分類し、topicとpartsというプロパティを付与してくれます。
-
- msgをある個数毎に一束にする
- msgをある時間インターバル毎に一束にする
個数毎のグループ化例
まずは一定の個数毎にグループ化する例のフローです。
-
- 左端Injectノードで数値配列を載せた msg を送り出す
- splitノードで配列要素毎のmsgに分解
- batchノードでグループ化
- joinノードでグループを1個のmsgに連結
という流れです。batch後とjoin後で結果を観察しています。
batchノードの設定が以下に。到来するmsg4個毎に1グループとしますが、オーバラップを1個設定しています。
まず、以下はbatch直後の様子です。payloadみると数値配列の各要素毎の状態になっておりsplit直後の状態に近いです。しかしpayloadに4が載っているmsgが2つある(1個は前のグループ、もう1個は後のグループ)ことでグループ化とオーバラップ処理が行われていることが分かります。
そして上記をjoinノードに通して1グループ=1msgとしてみると以下のようです。
元は1つの配列であったものが、流れ下るうちにオーバラップありの2配列に分割されました。
単位時間毎のグループ化例
次は単位時間毎に到来したmsgをグループ化するフローの例です。
batch ノードの設定が以下に。5秒毎にグループとしています。グループをjoinノードで1個のmsgとしています。また時間内に何も到来しない場合、payloadにnullを載せて送出する設定です。
Injectノードからは手動でタイムスタンプ数値を送り出します。適当にランダムな間隔でボタンを押しています。
各レコードの左上のタイムスタンプを見ると毎5秒毎であることが分かると思います。5秒間に2回ボタンを押したところでは2個のタイムスタンプ値が配列に載っており、3回のときは3要素配列になっています。入力到来しない場合は mull が送られてます。