RedisにはStrings, Lists, Sets, Hashes, Sorted Setsなどのデータ型があります。このページではStream型の使い方を説明します。Streamは追記型のログのように動作するデータ構造です。Streamを使用すると、リアルタイムにイベントを記録し、同時にイベントを処理する事ができます。
Streamは「Redis Pub/Sub」とよく似た使い方ができます。「Redis Pub/Sub」と異なり、Streamは過去に遡ったイベントの取得ができます。
前提
公式ドキュメント
参考になる公式ドキュメントを以下に示します。
動作確認済環境
- Rocky Linux 8.6
- Redis 5.0.3
コマンド一覧
Streamsを操作するコマンドを以下にまとめます。いずれも先頭が「X」で始まります。
コマンド | 意味 |
---|---|
XADD key <* | id> field value [field value …] | Streamsにデータを追加します |
XRANGE key start end [COUNT count] | Streamsからデータを取得します |
XREVRANGE key start end [COUNT count] | Streamsから逆順でデータを取得します |
XGROUP CREATE key groupname <id | $> [MKSTREAM] | コンシューマーグループを作成します |
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key > | コンシューマーがcount件のデータを読み取ります |
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key 0 | コンシューマーの未処理のデータ一覧を返します |
XACK key group id [id …] | コンシューマーがデータの処理が完了した旨をRedisサーバに通知します |
単一コンシューマの場合
追加(XADD)
XADDコマンドを使用すると、Streamにデータを追加できます。XADDコマンドの書式は以下の通りです。
XADD key <* | id> field value [field value ...]
例えば、東日本に複数の気象情報センサーが存在するような業務を想定します。温度、気圧、湿度を登録する操作ならば以下のように指定します。
XADD temperatures:japan-east:10007 * temp_f 87.2 pressure 29.69 humidity 46 XADD temperatures:japan-east:10007 * temp_f 83.1 pressure 29.21 humidity 46.5 XADD temperatures:japan-east:10007 * temp_f 81.9 pressure 28.37 humidity 43.7
操作ログは以下の通りです。Stream idに「*」を指定した場合は、Stream idが自動生成されます。
自動生成されたStream idは「<タイムスタンプ>–<連番>」の採番ルールで生成され、以下に出力された「1669282706129-0」や「1669282709781-0」などがStream idです。
127.0.0.1:6379> XADD temperatures:japan-east:10007 * temp_f 87.2 pressure 29.69 humidity 46 "1669282706129-0" 127.0.0.1:6379> XADD temperatures:japan-east:10007 * temp_f 83.1 pressure 29.21 humidity 46.5 "1669282709781-0" 127.0.0.1:6379> XADD temperatures:japan-east:10007 * temp_f 81.9 pressure 28.37 humidity 43.7 "1669282717530-0" 127.0.0.1:6379>
情報取得(XRANGE, XREVRANGE)
XRANGEコマンドまたはXREVRANGEコマンドを使用すると、指定範囲のStreamを取得できます。書式は以下の通りです。
XRANGE key start end [COUNT count] XREVRANGE key start end [COUNT count]
全件取得ならば、以下のように指定します。「-」は最小のStream IDを意味し、「+」は最大のSteam IDを意味します。
127.0.0.1:6379> XRANGE temperatures:japan-east:10007 - + 1) 1) "1669282706129-0" 2) 1) "temp_f" 2) "87.2" 3) "pressure" 4) "29.69" 5) "humidity" 6) "46" 2) 1) "1669282709781-0" 2) 1) "temp_f" 2) "83.1" 3) "pressure" 4) "29.21" 5) "humidity" 6) "46.5" 3) 1) "1669282717530-0" 2) 1) "temp_f" 2) "81.9" 3) "pressure" 4) "28.37" 5) "humidity" 6) "43.7" 127.0.0.1:6379>
範囲を指定してデータを取得するならば、以下のように操作します。以下はStream idが「1669282706129-0」以上「1669282709781-0」以下の範囲を取得する操作例です。
127.0.0.1:6379> XRANGE temperatures:japan-east:10007 1669282706129-0 1669282709781-0 1) 1) "1669282706129-0" 2) 1) "temp_f" 2) "87.2" 3) "pressure" 4) "29.69" 5) "humidity" 6) "46" 2) 1) "1669282709781-0" 2) 1) "temp_f" 2) "83.1" 3) "pressure" 4) "29.21" 5) "humidity" 6) "46.5" 127.0.0.1:6379>
新情報の待ち受け(XREAD)
XREADコマンドを使用すると、新情報をListen(待ち受ける)事ができます。書式は以下の通りです。
XREAD [BLOCK milliseconds] STREAMS key id
以下のようなXREADコマンドを発行すると、Stream idが「1669282709781-0」よりも大きいデータを10000ミリ秒(10秒)間待ち受けます。
XREAD BLOCK 10000 STREAMS temperatures:japan-east:10007 1669282709781-0
Stream idに「$」を指定した場合は、ストアされたデータのうち最も大きいStream idを表します。ですので、前述のコマンドは以下のように書き換える事もできます。
XREAD BLOCK 10000 STREAMS temperatures:japan-east:10007 $
それでは簡単な動作確認をしてみましょう。1つ目の端末で以下のコマンドを入力し、新しいデータを60000ミリ秒(60秒)間待ち受けます。
XREAD BLOCK 60000 STREAMS temperatures:japan-east:10007 $
60秒以内に、2つ目の端末を起動し、以下のようなデータを生成します。
XADD temperatures:japan-east:10007 * temp_f 89.5 pressure 28.72 humidity 57
1つ目の端末がデータを受信できている事が確認できます。
複数コンシューマの場合
コンシューマグループの作成(XGROUP CREATE)
あるStreamを複数のクライアント(コンシューマー)で並列処理するような要件を想定します。このような場合、event01とevent02をClient01が処理しようとするならば、Client02は同じイベントを処理するのはムダですのでevent03からの処理を試みた方が効率的です。
Redis Streamは以下の図に示すような、イベントとクライアント(コンシューマー)の割り振りを実現する事ができます。
さて、イベントとコンシューマの割り振りをするには、コンシューマグループというものを定義する必要があります。
コンシューマーグループの動作確認をするために、以下のようなデータを生成します。
XADD mystream * event 01 message apple XADD mystream * event 02 message orange XADD mystream * event 03 message strawberry XADD mystream * event 04 message apricot XADD mystream * event 05 message banana
コンシューマーグループを作成するにはXGROUP CREATEコマンドを使用します。書式は以下の通りです。
XGROUP CREATE key groupname id
以下のように指定すると、mystreamに対してStream idが0より大きいデータに対して「mygroup」という名前のコンシューマーグループを作成します。
引数のidは「0」以外のも「$」の指定も可能です。「$」はストアされたデータのうち最も大きいStream idを意味しますので、「$」を指定した場合は既存ではなく新規のデータに対してのみコンシューマーグループを作成する意味になります。
XGROUP CREATE mystream mygroup 0
データの取り込み(XREADGROUP)
コンシューマーグループを設定した環境でデータを取り込むにはXREADGROUP GROUPコマンドを使用します。書式は以下の通りです。
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key id
1つ目の端末で以下のようなコマンドを入力します。すると、mygroupに属するclient01という名前のコンシューマーがmystreamから未処理のデータ2件を取り込む事を意味します。idに「>」を指定すると、どのクライアントにもメッセージを送っていない未処理のデータを取得する意味になります。
XREADGROUP GROUP mygroup client01 COUNT 2 STREAMS mystream >
2つ目の端末を起動し、以下のようなコマンドを入力してみましょう。event01とevent02はclient01によって処理済ですので、client02はevent03のデータを受信します。
XREADGROUP GROUP mygroup client02 COUNT 1 STREAMS mystream >
以上の処理をすると、以下スクリーンショットのようになります。
未処理イベントの一覧表示(XREADGROUP)
Redisのコンシューマーグループには、イベントの処理が完了したか否かを管理する仕組みがあります。前述のXREADGROUP GROUPコマンドのidに0を指定すると、Stream idが0より大きいイベントの一覧が出力されます。
XREADGROUP GROUP mygroup client01 STREAMS mystream 0
操作ログは以下のようになります。client01の未処理のイベント一覧を表示されると、event01とevent02が出力されます。
127.0.0.1:6379> XREADGROUP GROUP mygroup client01 STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) "1669285535043-0" 2) 1) "event" 2) "01" 3) "message" 4) "apple" 2) 1) "1669285538242-0" 2) 1) "event" 2) "02" 3) "message" 4) "orange" 127.0.0.1:6379>
イベントの完了通知(XACK)
Redisサーバにイベントの処理が完了した事を伝えるには、XACKコマンドを使用します。書式は以下の通りです。
XACK key group id
Stream idが1669285535043-0であるイベント(event01)の完了を通知します。
127.0.0.1:6379> XACK mystream mygroup 1669285535043-0 (integer) 1 127.0.0.1:6379>
すると、以下コマンドで取得できる未処理ベントの一覧が2件から1件に減った事が分かります。
127.0.0.1:6379> XREADGROUP GROUP mygroup client01 STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) "1669285538242-0" 2) 1) "event" 2) "02" 3) "message" 4) "orange" 127.0.0.1:6379>