それが僕には楽しかったんです。

僕と MySQL と時々 MariaDB

AWS SDK for PHP から AWS SQSを使ってみた

はじめに

最近話題のマストドンのStreaming APIを叩いて、特定のトゥートに含まれる文字列を別な何かに提供したい!!
という要望を叶えるために AWS SQS を使うことになった。
ここでは、メモも含めてここにまとめていく。

意外と文献がなかったので、実践では標準キュー、FIFOキューの両方についてコードを書く。

AWS Simple Queue Service とは

aws.amazon.com

SQS とは、これ。

メッセージキューをクラウド化したもの、キューをいくつも作る事ができてそれぞれ独立して管理できる。
一時的にデータを蓄えて置くのにはかなり最適で、非同期サービス対サービスのような今回の状況にかなり適している。
またキュー自体にも2種類あり、標準のキューとFIFOキューがある。
多分キューと聞いて思い浮かべるのは後者だと思う。

スタンダードキュー

スタンダードキューには大きく3つの特徴がある。

メッセージの順序

スタンダードキューにおけるメッセージの順序は特徴的な部分がある。通常キューと言われるとFIFO(First-In First-Out)を考えると思うがこのキューでは順番がなるべく保持されるだけで必ずしもFIFOを実現できるものではないと言うこと。
そのため、各メッセージに順序をつけてDBなどに突っ込むなどしてメッセージを受け取ったときにソートする必要がある

一回以上の配信

ドキュメントにもあるが、SQSは冗長性と可用性を確保するために、メッセージが複数のサーバに保存される。
それが影響して、メッセージの受信と削除のときにまれに同一のメッセージのコピーを処理してしまい想定される挙動と異なる挙動を取る場合がある。
そのため、それらを考慮して処理を設計する必要がある。

ショートポーリングを使用したメッセージ処理

SQSはデフォルトでショートポーリングを使用する。ショートポーリングとはポーリングされたキューが空であろうとなかろうとすぐに応答を返すことを指す。
反対にロングポーリングはキューに達するかロングポーリングがタイムアウトになるまで応答を返すことがない。

ショートポーリングを使用した場合、サーバのサブセットがサンプリングされるためそれらのサーバにあるメッセージだけ返される。
そのため、受信リクエストによっては全てのメッセージを受け取ることができない、というかそもそもサーバがわが持っていいない場合がある。
これは、サーバにデータをコピーしていることとそれらのサーバが分散システムとなっていることが要因となっている。

AWS SDK for PHP による実践

まず共通する環境について。
PHPは以下のバージョンを使う。

$ php -v
PHP 7.2.8-1+ubuntu16.04.1+deb.sury.org+1 (cli) (built: Jul 25 2018 10:51:50) ( NTS )
Copyright (c) 1997-2018 The PHP Group
Zend Engine v3.2.0, Copyright (c) 1998-2018 Zend Technologies
with Zend OPcache v7.2.8-1+ubuntu16.04.1+deb.sury.org+1, Copyright (c) 1999-2018, by Zend Technologies
with Xdebug v2.6.0, Copyright (c) 2002-2018, by Derick Rethans

composer を使って、以下のパッケージを持ってくる。

$ composer require aws/aws-sdk-php

また以下のディレクトリにcredential、つまり認証情報を以下の形式でいれる必要がある。
このあたりは、次回のIAM関連についての記事と同じタイミングで書く。

$ cat $HOME/.aws/credential
[default]
aws_access_key_id = ACCESS_KEY
aws_secret_access_key = SECRET_KEY

標準キューとFIFOキューの作り方に関してはコンソールから作るだけなので省略。


標準キュー(スタンダードキューの実践)
メッセージの送信
<?php

require_once 'vendor/autoload.php';

use Aws\Sqs\SqsClient;
use Aws\Exception\AwsException;

define('QUEUE_URL', 'url');


try {

    $client = new SqsClient([
        'profile' => 'default',
        'region' => 'us-east-1',
        'version' => 'latest',
    ]);

    $post_params = [
        'DelaySeconds' => 0, //ここに入れた秒数だけキューから値を取り出せない
        'MessageAttributes' => [
            'Title' => [
                'DataType' => 'String', //binaryを指定するとbase64 encodeでバイナリを与える
                'StringValue' => 'Amazon SQS Test Message',
            ]
        ],
        'MessageBody' => 'This is AWS SDK for PHP.', //メッセージ本文
        'QueueUrl' => QUEUE_URL,
    ];

    $result = $client->sendMessage($post_params);

} catch (SqsException $error) {
    $e->getMessage();
}

メッセージの受信と削除
<?php

require_once 'vendor/autoload.php';

use Aws\Sqs\SqsClient;
use Aws\Exception\AwsException;

define('QUEUE_URL', 'url');


try {

    $client = new SqsClient([
        'profile' => 'default',
        'region' => 'us-east-1',
        'version' => 'latest',
    ]);

    $get_params = [
        'AttributeNames' => ['All'],
        'MessageAttributeNames' => ['All'], //All指定部分は取得したい属性を設定する
        'MaxNumberOfMessages' => 5, //一度に受け取るメッセージの最大数
        'QueueUrl' => QUEUE_URL,
        'WaitTimeSeconds' => 20, 
        'VisibilityTimeout' => 60,
    ];

    $result = $client->receiveMessage($get_params);
    var_dump($result);
    $data = $result->get('Messages');

    if ($data)
    {
        foreach($data as $item)
        {
            echo $item['Body']."\n";

            //受け取ったら削除しないとデータは残る
            $client->deleteMessage([
                'QueueUrl' => QUEUE_URL,
                'ReceiptHandle' => $item['ReceiptHandle'],
            ]);
        }
    }

} catch (SqsException $error) {
    $e->getMessage();
}

FIFO キュー

FIFOキューはイベントの順番が重要になる場合や重複不可な時に使うと良いらしい・
というのもメッセージ順序が大きく関係している。

メッセージの順序

FIFOキューでは配信は一回だけの処理とされる点が標準キューと大きな違いがある。
メッセージの送信、受信の順序は厳密に守られコンシューマが削除するまで使用可能状態で残り続ける。
また、キューの状態で重複は導入されていない・

重要な用語

いくつか、SQS FIFOを使ううえで重要な用語がある。(メッセージグループなど)
それは以下のサイトで詳解されていたので省略。
Amazon SQS FIFO (先入れ先出し) キュー - Amazon Simple Queue Service

メッセージの送信
<?php

require_once 'vendor/autoload.php';

use Aws\Sqs\SqsClient;
use Aws\Exception\AwsException;

define('QUEUE_URL', 'url');

try {

    $client = new SqsClient([
        'profile' => 'default',
        'region' => 'us-east-1',
        'version' => 'latest',
    ]);

    $client->sendMessage([
        'QueueUrl' => QUEUE_URL,
        'MessageBody' => 'hello,SQS!!',
        'MessageGroupId' => 'test',
        'MessageDeduplicationId' => time(),
    ]);

} catch (SqsException $error) {
    $e->getMessage();
}
メッセージの受信と削除
<?php

require_once 'vendor/autoload.php';

use Aws\Sqs\SqsClient;
use Aws\Exception\AwsException;

define('QUEUE_URL', 'url');


try {

    $client = new SqsClient([
        'profile' => 'default',
        'region' => 'us-east-1',
        'version' => 'latest',
    ]);

    $result = $client->receiveMessage([
        'MaxNumberOfMessages' => 1,
        'QueueUrl' => QUEUE_URL,
    ]);

    foreach ($result->search('Messages[]') as $message) 
    {
        $queueHandle = $message['ReceiptHandle'];
        echo $message['Body'];
        $client->deleteMessage([
            'QueueUrl' => QUEUE_URL,
            'ReceiptHandle' => $queueHandle,
        ]);
    }

} catch (SqsException $error) {
    $e->getMessage();
}

おわりに

SQSはシンプルなメッセージをアプリケーション間で共有する際に便利で
特にFIFOを様々な配信サーバの一つの機能として利用できる。