blog

redisストリーム

xadd <key> <key-id> [<maxlen> <number>] <k1> <v1> ... メッセージの追加 XCLAIM ...

Apr 4, 2020 · 5 min. read
シェア

背景

ストリームは、Redis 5.0は、新しいデータ型を導入し、軽量メッセージングミドルウェアのプロジェクトとしてRedisをよりよく利用することができます、カフカの設計から借りて、コンシューマグループや他の概念の導入は、メッセージのRedis伝送がより信頼性が高いように、あなたは、永続的なメッセージ、メッセージの再送信、タイムアウトなどをサポートすることができます。

システム原理

ストリームの設計では、消費者が独立して、または消費する消費者グループの形で消費することを選択できますが、ストリームは、複数の消費者グループをハングアップすることができ、各消費者グループは、互いに干渉しないように、ストリーム構造にメッセージを追加するには、xaddコマンドを介してプロデューサー。

コンシューマ・グループでは、コンシューマはプリエンプションによって消費し、メッセージ last_delivered_idを掴んで先に進み、消費完了後にxackによってメッセージを確認します。

プロデューサー

プロダクション・ニュース

xadd <key> <key-id> [<maxlen> <number>] <k1> <v1> ... メッセージの追加

  • key-idには*を設定し、システム自身が生成することもできますし、自分で指定することもできます。
  • maxlenはストリームの長さを指定し、それを超えると最も古いメッセージが削除されます。
  • k1 v1 キーと値をスペースで区切り、複数のキーと値のペアを指定できます。
.1:]> xadd joblist * k4 v4
"793-0"

xrang <key> <start-id> <end-id> スコープ内メッセージの照会

  • start-id start-id - 最小値を示します。
  • end-id end-id +は最大値を示します。
.1:]> xrange joblist - +
1) 1) ") 1) "k1"
 2) "v1"
2) 1) ") 1) "k2"
 2) "v2"
3) 1) ") 1) "k3"
 2) "v3"
4) 1) ") 1) "k4"
 2) "v4"

xlen 長さの取得

.1:]> xlen joblist
(integer) 4

xdel <key> <message-id> メッセージの削除

.1:]> xdel joblist 961-0
(integer)

コンシューマ

単独消費

xread [<count> <number>]|[<block> <timeout>] STREAMS <key> <last-message-id> メッセージの独立した消費、STREAMSは必須で、コマンドの最後に置く必要があります。

  • カウント番号 メッセージ番号の読み取り
  • ブロック・タイムアウトのブロック待ち時間、待ち時間を超えるとnilが返されます。
  • last-message-id メッセージを読み込む最後のメッセージ ID $ は最後のメッセージ ID を表します。
.1:]> xread STREAMS joblist 0
1) 1) "joblist"
 2) 1) 1) ") 1) "k1"
 2) "v1"
 2) 1) ") 1) "k2"
 2) "v2"
 3) 1) ") 1) "k3"
 2) "v3"

消費グループ消費

xgroup CREATE <key> <group-name> <last-message-id> 消費グループの作成

  • 最終メッセージID 0-0 は最初のメッセージIDを表す, $ は最後のメッセージIDを表す
.1:]> xgroup create joblist g5 0
OK

xreadgroup GROUP <group-name> <consumer-name> [<block> <timeout>]|[<count> <number>] STREAMS <key> <last-message-id>GROUP,STREAMS はキーワードです。

  • カウント番号 メッセージ番号の読み取り
  • ブロック・タイムアウトのブロック待ち時間、待ち時間を超えるとnilが返されます。
  • last-message-id これが特殊文字 > ならメッセージを読み、カーソルを1ビット進めます。
.1:]> xreadgroup GROUP g1 c4 count 1 STREAMS joblist >
1) 1) "joblist"
 2) 1) 1) ") 1) "k3"
 2) "v3"

xack <key> <group-name> <message-id> 同じ消費者グループに属する消費者グループが再び受信することのない確認メッセージ ``

.1:]> xack joblist g1 683-0
(integer) 1

追加コマンド

XPENDING <key> <group-name> <start-id> <end-id> <count> [<consumer-name>] コンシューマ・グループが現在処理中のメッセージを表示します。

メッセージ ID、コンシューマ名、非アクティブのミリ秒数、メッセージが読まれた回数。

.1:]> xpending joblist g1 - + 10
1) 1) ") "c1" 
 3) (integer) ) (integer) 1
2) 1) ") "c1"
 3) (integer) ) (integer) 1
3) 1) ") "c2"
 3) (integer) ) (integer) 1

XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N> 割り当てメッセージの消費者

.1:]> xclaim joblist g1 c2 
1) (nil)
.1:]> xpending joblist g1 - + 10
1) 1) ") "c2"
 3) (integer) ) (integer) 3
2) 1) ") "c2"
 3) (integer) ) (integer) 2
3) 1) ") "c2"
 3) (integer) ) (integer) 1

xinfo CONSUMERS <key> <group-name> 消費者グループに属する消費者に関する情報の入手

名前、承認されるメッセージの数、一意なid、この順。
.1:]> xinfo CONSUMERS joblist g1
1) 1) "name"
 2) "c1"
 3) "pending"
 4) (integer) 0
 5) "idle"
 6) (integer) ) 1) "name"
 2) "c2"
 3) "pending"
 4) (integer) 3
 5) "idle"
 6) (integer) ) 1) "name"
 2) "c4"
 3) "pending"
 4) (integer) 0
 5) "idle"
 6) (integer) 

xinfo GROUPS <key> 名前、コンシューマの数、確認メッセージの数、カーソルの現在のメッセージ ID の順にコンシューマ・グループ情報を取得します。

.1:]> xinfo GROUPS joblist
1) 1) "name"
 2) "g1"
 3) "consumers"
 4) (integer) 3
 5) "pending"
 6) (integer) 3
 7) "last-delivered-id"
 8) ") 1) "name"
 2) "g2"
 3) "consumers"
 4) (integer) 2
 5) "pending"
 6) (integer) 2
 7) "last-delivered-id"
 8) "961-0"

xinfo STREAM <key>順番、長さ、エンコーディング設定、グループ数、最後に生成されたメッセージID、最初のメッセージ、最後のメッセージ

.1:]> xinfo STREAM joblist
 1) "length"
 2) (integer) 4
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 6
 9) "last-generated-id"
10) ") "first-entry"
12) 1) ") 1) "k4"
 2) "v4"
13) "last-entry"
14) 1) ") 1) "k8"
 2) "v8"
Read next

アプレット

-ログイン\nユニオニドとオープニド\n\n\n鍵Api\nwx.loginが公式に提供するログイン機能\n ユーザの現在のログインが有効かどうかを検証\n 事前

Apr 4, 2020 · 6 min read