Redis データ型(Stream)

スポンサーリンク

RedisにはStrings, Lists, Sets, Hashes, Sorted Setsなどのデータ型があります。このページではStream型の使い方を説明します。Streamは追記型のログのように動作するデータ構造です。Streamを使用すると、リアルタイムにイベントを記録し、同時にイベントを処理する事ができます。

Streamは「Redis Pub/Sub」とよく似た使い方ができます。「Redis Pub/Sub」と異なり、Streamは過去に遡ったイベントの取得ができます。

前提

公式ドキュメント

参考になる公式ドキュメントを以下に示します。

動作確認済環境

  • Rocky Linux 8.6
  • Redis 5.0.3

コマンド一覧

Streamsを操作するコマンドを以下にまとめます。いずれも先頭が「X」で始まります。

コマンド 意味
XADD key <* | id> field value [field value …] Streamsにデータを追加します
XRANGE key start end [COUNT count] Streamsからデータを取得します
XREVRANGE key start end [COUNT count] Streamsから逆順でデータを取得します
XGROUP CREATE key groupname <id | $> [MKSTREAM] コンシューマーグループを作成します
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key > コンシューマーがcount件のデータを読み取ります
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key 0 コンシューマーの未処理のデータ一覧を返します
XACK key group id [id …] コンシューマーがデータの処理が完了した旨をRedisサーバに通知します

単一コンシューマの場合

追加(XADD)

XADDコマンドを使用すると、Streamにデータを追加できます。XADDコマンドの書式は以下の通りです。

XADD key <* | id> field value [field value ...]

例えば、東日本に複数の気象情報センサーが存在するような業務を想定します。温度、気圧、湿度を登録する操作ならば以下のように指定します。

XADD temperatures:japan-east:10007 * temp_f 87.2 pressure 29.69 humidity 46
XADD temperatures:japan-east:10007 * temp_f 83.1 pressure 29.21 humidity 46.5
XADD temperatures:japan-east:10007 * temp_f 81.9 pressure 28.37 humidity 43.7

操作ログは以下の通りです。Stream idに「*」を指定した場合は、Stream idが自動生成されます。

自動生成されたStream idは「<タイムスタンプ>–<連番>」の採番ルールで生成され、以下に出力された「1669282706129-0」や「1669282709781-0」などがStream idです。

127.0.0.1:6379> XADD temperatures:japan-east:10007 * temp_f 87.2 pressure 29.69 humidity 46
"1669282706129-0"
127.0.0.1:6379> XADD temperatures:japan-east:10007 * temp_f 83.1 pressure 29.21 humidity 46.5
"1669282709781-0"
127.0.0.1:6379> XADD temperatures:japan-east:10007 * temp_f 81.9 pressure 28.37 humidity 43.7
"1669282717530-0"
127.0.0.1:6379> 

情報取得(XRANGE, XREVRANGE)

XRANGEコマンドまたはXREVRANGEコマンドを使用すると、指定範囲のStreamを取得できます。書式は以下の通りです。

XRANGE key start end [COUNT count]
XREVRANGE key start end [COUNT count]

全件取得ならば、以下のように指定します。「-」は最小のStream IDを意味し、「+」は最大のSteam IDを意味します。

127.0.0.1:6379> XRANGE temperatures:japan-east:10007 - +
1) 1) "1669282706129-0"
   2) 1) "temp_f"
      2) "87.2"
      3) "pressure"
      4) "29.69"
      5) "humidity"
      6) "46"
2) 1) "1669282709781-0"
   2) 1) "temp_f"
      2) "83.1"
      3) "pressure"
      4) "29.21"
      5) "humidity"
      6) "46.5"
3) 1) "1669282717530-0"
   2) 1) "temp_f"
      2) "81.9"
      3) "pressure"
      4) "28.37"
      5) "humidity"
      6) "43.7"
127.0.0.1:6379> 

範囲を指定してデータを取得するならば、以下のように操作します。以下はStream idが「1669282706129-0」以上「1669282709781-0」以下の範囲を取得する操作例です。

127.0.0.1:6379> XRANGE temperatures:japan-east:10007 1669282706129-0 1669282709781-0
1) 1) "1669282706129-0"
   2) 1) "temp_f"
      2) "87.2"
      3) "pressure"
      4) "29.69"
      5) "humidity"
      6) "46"
2) 1) "1669282709781-0"
   2) 1) "temp_f"
      2) "83.1"
      3) "pressure"
      4) "29.21"
      5) "humidity"
      6) "46.5"
127.0.0.1:6379> 

新情報の待ち受け(XREAD)

XREADコマンドを使用すると、新情報をListen(待ち受ける)事ができます。書式は以下の通りです。

XREAD [BLOCK milliseconds] STREAMS key id

以下のようなXREADコマンドを発行すると、Stream idが「1669282709781-0」よりも大きいデータを10000ミリ秒(10秒)間待ち受けます。

XREAD BLOCK 10000 STREAMS temperatures:japan-east:10007 1669282709781-0

Stream idに「$」を指定した場合は、ストアされたデータのうち最も大きいStream idを表します。ですので、前述のコマンドは以下のように書き換える事もできます。

XREAD BLOCK 10000 STREAMS temperatures:japan-east:10007 $

それでは簡単な動作確認をしてみましょう。1つ目の端末で以下のコマンドを入力し、新しいデータを60000ミリ秒(60秒)間待ち受けます。

XREAD BLOCK 60000 STREAMS temperatures:japan-east:10007 $

60秒以内に、2つ目の端末を起動し、以下のようなデータを生成します。

XADD temperatures:japan-east:10007 * temp_f 89.5 pressure 28.72 humidity 57

1つ目の端末がデータを受信できている事が確認できます。

データの受信確認

複数コンシューマの場合

コンシューマグループの作成(XGROUP CREATE)

あるStreamを複数のクライアント(コンシューマー)で並列処理するような要件を想定します。このような場合、event01とevent02をClient01が処理しようとするならば、Client02は同じイベントを処理するのはムダですのでevent03からの処理を試みた方が効率的です。

Redis Streamは以下の図に示すような、イベントとクライアント(コンシューマー)の割り振りを実現する事ができます。

イベントとクライアント(コンシューマー)の割り振り

さて、イベントとコンシューマの割り振りをするには、コンシューマグループというものを定義する必要があります。

コンシューマーグループの動作確認をするために、以下のようなデータを生成します。

XADD mystream * event 01 message apple
XADD mystream * event 02 message orange
XADD mystream * event 03 message strawberry
XADD mystream * event 04 message apricot
XADD mystream * event 05 message banana

コンシューマーグループを作成するにはXGROUP CREATEコマンドを使用します。書式は以下の通りです。

XGROUP CREATE key groupname id

以下のように指定すると、mystreamに対してStream idが0より大きいデータに対して「mygroup」という名前のコンシューマーグループを作成します。

引数のidは「0」以外のも「$」の指定も可能です。「$」はストアされたデータのうち最も大きいStream idを意味しますので、「$」を指定した場合は既存ではなく新規のデータに対してのみコンシューマーグループを作成する意味になります。

XGROUP CREATE mystream mygroup 0

データの取り込み(XREADGROUP)

コンシューマーグループを設定した環境でデータを取り込むにはXREADGROUP GROUPコマンドを使用します。書式は以下の通りです。

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key id

1つ目の端末で以下のようなコマンドを入力します。すると、mygroupに属するclient01という名前のコンシューマーがmystreamから未処理のデータ2件を取り込む事を意味します。idに「>」を指定すると、どのクライアントにもメッセージを送っていない未処理のデータを取得する意味になります。

XREADGROUP GROUP mygroup client01 COUNT 2 STREAMS mystream >

2つ目の端末を起動し、以下のようなコマンドを入力してみましょう。event01とevent02はclient01によって処理済ですので、client02はevent03のデータを受信します。

XREADGROUP GROUP mygroup client02 COUNT 1 STREAMS mystream >

以上の処理をすると、以下スクリーンショットのようになります。

コンシューマーグループの処理結果

未処理イベントの一覧表示(XREADGROUP)

Redisのコンシューマーグループには、イベントの処理が完了したか否かを管理する仕組みがあります。前述のXREADGROUP GROUPコマンドのidに0を指定すると、Stream idが0より大きいイベントの一覧が出力されます。

XREADGROUP GROUP mygroup client01 STREAMS mystream 0

操作ログは以下のようになります。client01の未処理のイベント一覧を表示されると、event01とevent02が出力されます。

127.0.0.1:6379> XREADGROUP GROUP mygroup client01 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1669285535043-0"
         2) 1) "event"
            2) "01"
            3) "message"
            4) "apple"
      2) 1) "1669285538242-0"
         2) 1) "event"
            2) "02"
            3) "message"
            4) "orange"
127.0.0.1:6379> 

イベントの完了通知(XACK)

Redisサーバにイベントの処理が完了した事を伝えるには、XACKコマンドを使用します。書式は以下の通りです。

XACK key group id

Stream idが1669285535043-0であるイベント(event01)の完了を通知します。

127.0.0.1:6379> XACK mystream mygroup 1669285535043-0
(integer) 1
127.0.0.1:6379> 

すると、以下コマンドで取得できる未処理ベントの一覧が2件から1件に減った事が分かります。

127.0.0.1:6379> XREADGROUP GROUP mygroup client01 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1669285538242-0"
         2) 1) "event"
            2) "02"
            3) "message"
            4) "orange"
127.0.0.1:6379> 
タイトルとURLをコピーしました