redis = new Redis();
$this->redis->connect($config['host'], $config['port']);
if (!empty($config['password'])) {
$this->redis->auth($config['password']);
}
$this->redis->select($config['select']);
// 初始化消息队列
$this->queue = Queue::instance(config('queue'));
}
public function push()
{
// 生成一个新的订单
$orderId = uniqid();
$userId = rand(1, 100);
$amount = rand(10, 100);
// 将订单信息推入消息队列
$data = [
'order_id' => $orderId,
'user_id' => $userId,
'amount' => $amount,
];
$this->redis->lPush($this->queueName, json_encode($data));
echo "成功把订单ID为 {$orderId} 的订单推入队列中\n";
}
public function run()
{
// 从队列中获取订单信息并进行处理
$jsonData = $this->redis->rPop($this->queueName);
if (empty($jsonData)) {
echo "队列中没有需要处理的订单\n";
return;
}
$data = json_decode($jsonData, true);
if (!is_array($data) || !isset($data['order_id']) || !isset($data['user_id']) || !isset($data['amount'])) {
echo "无效的订单信息:$jsonData\n";
return;
}
// 尝试处理订单并支付
try {
$result = $this->processOrder($data);
if ($result) {
echo "订单 ID 为 {$data['order_id']} 的订单处理成功\n";
} else {
throw new Exception("订单 ID 为 {$data['order_id']} 的订单支付失败");
}
} catch (Exception $e) {
Log::error($e->getMessage());
$data['attempts'] = intval($data['attempts'] ?? 0) + 1;
if ($data['attempts'] < $this->maxAttempts) {
// 重试
$retryAt = time() + $this->retryAfter;
$this->queue->later($retryAt, $data); ////延时执行
echo "订单 ID 为 {$data['order_id']} 的订单第 {$data['attempts']} 次处理失败,{$this->retryAfter} 秒后进行重试\n";
} else {
// 订单处理失败
echo "订单 ID 为 {$data['order_id']} 的订单处理失败,已达到最大尝试次数\n";
}
}
}
private function processOrder(array $order)
{
// 处理订单并尝试支付
echo "开始处理订单 ID 为 {$order['order_id']} 的订单\n";
// 模拟处理订单和支付的过程,随机返回 true 或 false
$success = boolval(rand(0, 1));
if (!$success) {
throw new Exception("支付失败");
}
return true;
}
}
本文共 个字数,平均阅读时长 ≈ 分钟,您已阅读:0时0分0秒。
649494848