blog

Laravel redis キューベースの解析

非同期実行: コード実行の一部は非常に時間がかかります。レスポンス速度を向上させ、接続リソースの過剰な占有を避けるために、コードのこの部分をキューに入れ、非同期実行することができます。 エラーの再試行...

Jul 3, 2020 · 5 min. read
シェア

キューを使う理由

キューを使用する目的は一般的です。
  1. 非同期実行
  2. エラー再試行
説明してください。
非同期実行: 一部のコード実行は非常に時間がかかります。応答速度を向上させ、接続リソースの過剰使用を避けるために、コードのこの部分をキューに入れ、非同期実行することができます。
例えば、新規ユーザーがウェブサイトに登録した後、ウェルカムメールを送信する必要がありますが、これにはネットワークIOが関与しており、時間を制御することができません。
エラー再試行:いくつかのタスクの正常な実行を保証するために、タスクが正常に処理されるか、またはN個以上のエラー後にキャンセルされるまで、実行エラーが一定期間遅延され、その後再試行することができる場合、タスクを実行キューに入れることができます。
例:ユーザーが携帯電話番号をバインドする必要がある、この時間は、SMSのインターフェイスを送信するために、サードパーティに依存している、1つは、時間がかかるのかわからない、1つは、コールの成功を確保するために、エラーの後に再試行する必要がありますされていません。

Laravel キューは

以下では、デフォルトで使用されるキューとその構成について、以下のように分析します。
  • デフォルトのキューエンジン: redis
redis-cliのmonitorコマンドを使って、実行されたコマンド文を確認します。
  • デフォルトのキュー名: default

配布タスク

これは非同期通知を配信する例です。
Laravelで非同期通知が開始されると、Laravelはredisのタスクキューに新しいタスクを追加します。
redis実行文
redis> RPUSH queues:default
{
 "displayName": "App\\Listeners\\RebateEventListener",
 "job": "Illuminate\\Queue\\CallQueuedHandler@call",
 "maxTries": null,
 "timeout": null,
 "timeoutAt": null,
 "data": {
 "commandName": "Illuminate\\Events\\CallQueuedListener",
 "command": "O:36:"Illuminate\\Events\\CallQueuedListener":7:{s:5:"class";s:33:"App\\Listeners\\RebateEventListener";s:6:"method";s:15:"onRebateCreated";s:4:"data";a:1:{i:0;O:29:"App\\Events\\RebateCreatedEvent":4:{s:11:"\u0000*\u0000tbkOrder";O:45:"Illuminate\\Contracts\\Database\\ModelIdentifier":3:{s:5:"class";s:19:"App\\Models\\TbkOrder";s:2:"id";i:416;s:10:"connection";s:5:"mysql";}s:15:"\u0000*\u0000notifyAdmins";b:1;s:13:"\u0000*\u0000manualBind";b:0;s:6:"socket";N;}}s:5:"tries";N;s:9:"timeoutAt";N;s:7:"timeout";N;s:6:"\u0000*\u0000job";N;}"
 },
 "id": "iTqpbeDqqFb3VoED2WP3pgmDbLAUQcMB",
 "attempts": 0
}
上記のredis文は、タスク情報をredisキューのqueues:defaultの末尾にプッシュします。

タスクキューワーカー

php artisan queue:work --once --delay=1 --tries=3
上記実行文のパラメータの意味。
  1. --デフォルトでは常駐プロセスによって常に実行されます。
  2. -tries=3タスクエラーの再試行回数は最大3回、デフォルトは無制限
  3. --delay=1 タスクがエラーになると、毎回1秒遅れて再実行されます。
ワーカーが起動すると、以下のステップを順番に実行します。
ここでは、デフォルトのキューのデフォルトを例として使用し、redis関連の操作のみを説明します。
  1. queues:default:delayed orderedコレクションから処理可能な「遅延タスク」を取得し、queues:default queue Specific実行文の末尾にrpushします。
redis> eval "Luaスクリプト" 2キュー:default:delayed queues:default 現在のタイムスタンプ
Luaスクリプトは次のようになります。
-- Get all of the jobs with an expired "score"...localval = redis.call('zrangebyscore', KEYS[1],'-inf', ARGV[1])-- If we have values in the array, we will remove them from the first queue-- and add them onto thedestination queue in chunks of 100, which moves-- all of the appropriate jobs onto the destination queue very safely.if(next(val) ~=nil)thenredis.call('zremrangebyrank', KEYS[1],0, #val -1)fori =1, #val,100doredis.call('rpush', KEYS[2],unpack(val, i,math.min(i+99, #val)))endendreturnval
具体的な実行ステートメント。
redis> eval "Luaスクリプト" 2キュー:default:reserved queues:default 現在のタイムスタンプ
使用するLuaスクリプトは、ステップ1と同じです。
queue:defaultキューからタスクを取得、試行回数を増やし、queu:default:reserved ordered setに保存、タスクのスコア値は現在時刻+90(タスク実行タイムアウト)。
具体的な実行ステートメント。
redis> eval 「Luaスクリプト” 2 queues:default queues:default:reserved タスクタイムアウトタイムスタンプ
Luaスクリプト
- Pop the first job off of the queue... local job = redis.call('lpop', KEYS[1]) local reserved = false if(job ~= false) then -- Increment the attempt count and place job on the reserved queue... reserved = cjson.decode(job) reserved['attempts'] = reserved['attempts'] + 1 reserved = cjson.encode(reserved) redis.call('zadd', KEYS[2], ARGV[1], reserved) end return {job, reserved}
  1. config('queue.connections.redis.retry_after') は、タスクが実行中にリセットされるのを防ぐために、時間がかかりすぎるようであれば増やすべきです。
  2. ZREM queues:default:reserved "特定のタスク" 上記で取得したタスクの実行に成功したら、そのタスクをqueues:default:reservedキューから削除します。
  3. タスクの実行が失敗した場合、2つのシナリオがあります。
タスクが指定されたリトライ閾値未満で失敗した場合 queues:default:reservedからタスクを削除し、タスクをqueues:default:delayed ordered setに追加します。
redis> EVAL "Luaスクリプト" 2キュー:default:delayed queues:default:reserved "失敗したタスク " タスク実行遅延のタイムスタンプ
Luaスクリプト
-- Remove the job from the current queue... redis.call('zrem', KEYS[2], ARGV[1]) -- Add the job onto the "delayed" queue... redis.call('zadd', KEYS[1], ARGV[2], ARGV[1]) return true
タスクの失敗回数が指定されたリトライしきい値を超えた場合、そのタスクをキューから削除します。
redis> ZREM queue:default:reserved
上記のようにLuaスクリプトを使用する目的は、操作のアトミック性を確保することであることに注意してください。 Redisはシングルプロセス、シングルスレッドモデルであり、Luaスクリプトとしてコマンドを実行することで、スクリプトが同時実行の問題なくアトミックに実行されることが保証されます。

高度な多くのPHPerは、常にいくつかの問題やボトルネックに遭遇し、ビジネスコードは、方向感覚よりも多くを記述するために、私はいくつかの情報を照合している改善するためにどこから始めればわからないが、これらに限定されない:分散アーキテクチャ、高いスケーラビリティ、高性能、高同時性、サーバーのパフォーマンスチューニング、TP6、laravel、YII2、Redis、Swoole、Swoft、Kafka、Mysqlの最適化、シェルスクリプト、Docker、マイクロサービス、Nginx、その他多くの高度な高度な乾物は、みんなと自由に共有する必要があります!



Read next

Javaの基本 - 抽象クラスとインターフェースの違い

抽象クラス: abstract キーワードで修飾されたクラスを抽象クラスと呼びます。 インターフェース: インターフェースは interface キーワードで宣言します:

Jul 3, 2020 · 2 min read