背景
ストリームは、Redis 5.0は、新しいデータ型を導入し、軽量メッセージングミドルウェアのプロジェクトとしてRedisをよりよく利用することができます、カフカの設計から借りて、コンシューマグループや他の概念の導入は、メッセージのRedis伝送がより信頼性が高いように、あなたは、永続的なメッセージ、メッセージの再送信、タイムアウトなどをサポートすることができます。
システム原理
ストリームの設計では、消費者が独立して、または消費する消費者グループの形で消費することを選択できますが、ストリームは、複数の消費者グループをハングアップすることができ、各消費者グループは、互いに干渉しないように、ストリーム構造にメッセージを追加するには、xaddコマンドを介してプロデューサー。
コンシューマ・グループでは、コンシューマはプリエンプションによって消費し、メッセージ last_delivered_idを掴んで先に進み、消費完了後にxackによってメッセージを確認します。
プロデューサー
プロダクション・ニュース
xadd <key> <key-id> [<maxlen> <number>] <k1> <v1> ...
メッセージの追加
- key-idには*を設定し、システム自身が生成することもできますし、自分で指定することもできます。
- maxlenはストリームの長さを指定し、それを超えると最も古いメッセージが削除されます。
- k1 v1 キーと値をスペースで区切り、複数のキーと値のペアを指定できます。
.1:]> xadd joblist * k4 v4
"793-0"
xrang <key> <start-id> <end-id>
スコープ内メッセージの照会
- start-id start-id - 最小値を示します。
- end-id end-id +は最大値を示します。
.1:]> xrange joblist - +
1) 1) ") 1) "k1"
2) "v1"
2) 1) ") 1) "k2"
2) "v2"
3) 1) ") 1) "k3"
2) "v3"
4) 1) ") 1) "k4"
2) "v4"
xlen
.1:]> xlen joblist
(integer) 4
xdel <key> <message-id>
メッセージの削除
.1:]> xdel joblist 961-0
(integer)
コンシューマ
単独消費
xread [<count> <number>]|[<block> <timeout>] STREAMS <key> <last-message-id>
メッセージの独立した消費、STREAMSは必須で、コマンドの最後に置く必要があります。
- カウント番号 メッセージ番号の読み取り
- ブロック・タイムアウトのブロック待ち時間、待ち時間を超えるとnilが返されます。
- last-message-id メッセージを読み込む最後のメッセージ ID $ は最後のメッセージ ID を表します。
.1:]> xread STREAMS joblist 0
1) 1) "joblist"
2) 1) 1) ") 1) "k1"
2) "v1"
2) 1) ") 1) "k2"
2) "v2"
3) 1) ") 1) "k3"
2) "v3"
消費グループ消費
xgroup CREATE <key> <group-name> <last-message-id>
消費グループの作成
- 最終メッセージID
0-0 は最初のメッセージIDを表す, $ は最後のメッセージIDを表す
.1:]> xgroup create joblist g5 0
OK
xreadgroup GROUP <group-name> <consumer-name> [<block> <timeout>]|[<count> <number>] STREAMS <key> <last-message-id>
GROUP,STREAMS はキーワードです。
- カウント番号 メッセージ番号の読み取り
- ブロック・タイムアウトのブロック待ち時間、待ち時間を超えるとnilが返されます。
- last-message-id これが特殊文字 > ならメッセージを読み、カーソルを1ビット進めます。
.1:]> xreadgroup GROUP g1 c4 count 1 STREAMS joblist >
1) 1) "joblist"
2) 1) 1) ") 1) "k3"
2) "v3"
xack <key> <group-name> <message-id>
同じ消費者グループに属する消費者グループが再び受信することのない確認メッセージ ``
.1:]> xack joblist g1 683-0
(integer) 1
追加コマンド
XPENDING <key> <group-name> <start-id> <end-id> <count> [<consumer-name>]
コンシューマ・グループが現在処理中のメッセージを表示します。
メッセージ ID、コンシューマ名、非アクティブのミリ秒数、メッセージが読まれた回数。
.1:]> xpending joblist g1 - + 10
1) 1) ") "c1"
3) (integer) ) (integer) 1
2) 1) ") "c1"
3) (integer) ) (integer) 1
3) 1) ") "c2"
3) (integer) ) (integer) 1
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
割り当てメッセージの消費者
.1:]> xclaim joblist g1 c2
1) (nil)
.1:]> xpending joblist g1 - + 10
1) 1) ") "c2"
3) (integer) ) (integer) 3
2) 1) ") "c2"
3) (integer) ) (integer) 2
3) 1) ") "c2"
3) (integer) ) (integer) 1
xinfo CONSUMERS <key> <group-name>
消費者グループに属する消費者に関する情報の入手
名前、承認されるメッセージの数、一意なid、この順。
.1:]> xinfo CONSUMERS joblist g1
1) 1) "name"
2) "c1"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) ) 1) "name"
2) "c2"
3) "pending"
4) (integer) 3
5) "idle"
6) (integer) ) 1) "name"
2) "c4"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer)
xinfo GROUPS <key>
名前、コンシューマの数、確認メッセージの数、カーソルの現在のメッセージ ID の順にコンシューマ・グループ情報を取得します。
.1:]> xinfo GROUPS joblist
1) 1) "name"
2) "g1"
3) "consumers"
4) (integer) 3
5) "pending"
6) (integer) 3
7) "last-delivered-id"
8) ") 1) "name"
2) "g2"
3) "consumers"
4) (integer) 2
5) "pending"
6) (integer) 2
7) "last-delivered-id"
8) "961-0"
xinfo STREAM <key>
順番、長さ、エンコーディング設定、グループ数、最後に生成されたメッセージID、最初のメッセージ、最後のメッセージ
.1:]> xinfo STREAM joblist
1) "length"
2) (integer) 4
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "groups"
8) (integer) 6
9) "last-generated-id"
10) ") "first-entry"
12) 1) ") 1) "k4"
2) "v4"
13) "last-entry"
14) 1) ") 1) "k8"
2) "v8"