mazyu36の日記

某SIer所属のクラウドエンジニアのブログ

Amazon MQで Multi AZなMQTTブローカーを構築し、フェイルオーバーを考慮したMQTT通信を行う(AWS CDK, paho-mqtt)

前回の記事で、AWSにおけるMQTTブローカーのサービスに比較について記載しました。

mazyu36.hatenablog.com

今回はその中でもAmazon MQを使用し、Multi AZであるMQTTブローカーを構築した上でMQTT通信を試してみました。

ポイントとしては以下です。

  • Amazon MQ (MQTTブローカー):AWS CDKで構築。
    • Multi AZ構成(Active-Standby)
    • 権限に応じたユーザー設定(フルアクセス権限を持つユーザー、一部の通信のみ許可するユーザーの作成)
  • Python (クライアント):paho-mqttでMQTTクライアントを実装。
    • PublisherとSubscriberをそれぞれ実装。
    • MQTTブローカーのMulti AZ構成に対応した接続機構を具備(フェイルオーバー時に自動再接続)

目次

実装について

github.com

1. CDKの実装について

Amazon MQに関する実装の部分のみ簡単に解説します。

なおAmazon MQは2023/6時点でL2 Constructが存在しないので、基本はL1 Constructで実装していく形になります。

docs.aws.amazon.com

※L2 Constructの要望に関するIssueはあります。

github.com

Brokerの定義

ActiveMQでActive Standby構成のブローカーを定義していきます。

    const broker = new amazonmq.CfnBroker(scope, 'MQTTBroker', {
      brokerName: 'mqtt-broker',
      engineType: 'ACTIVEMQ',  // ActiveMQを指定
      engineVersion: '5.15.14',
      autoMinorVersionUpgrade: true,
      deploymentMode: 'ACTIVE_STANDBY_MULTI_AZ',  // Active Standbyを定義
      hostInstanceType: 'mq.t3.micro',

      // NW周りの設定を実施
      publiclyAccessible: true,
      securityGroups: [props.securityGroup.securityGroupId],
      subnetIds: props.vpc.publicSubnets.map(subnet => subnet.subnetId),

      // log出力設定を実施
      logs: {
        general: true,
        audit: true
      },

      // ユーザー設定
      users: [
        {
          groups: ['admin'],
          username: 'admin',
          password: 'admintest1234',
          consoleAccess: true,
        },
        {
          groups: ['publisher'],
          username: 'publisher',
          password: 'publisher1234',
          consoleAccess: false,
        },
        {
          groups: ['subscriber'],
          username: 'subscriber',
          password: 'subscriber1234',
          consoleAccess: false,
        }
      ],
    });

ユーザー設定として、admin, publisher, subscriberの3つのユーザーを作成しています。これらのユーザーに対して後ほど権限設定をします。

なおここではpasswordを直接記載していますが、実際には何かしらの方法で隠蔽した方が良いと思います。

Brokerの設定と紐付けを定義

    // Broker用の設定を定義
    const config = new amazonmq.CfnConfiguration(scope, 'MQTTBrokerConfig', {
      engineType: 'ACTIVEMQ',
      engineVersion: '5.15.14',

      // activemq.xmlの内容をbase64で渡す ★
      data: cdk.Fn.base64(`<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
      <broker xmlns="http://activemq.apache.org/schema/core" schedulePeriodForDestinationPurge="10000">
        <destinationPolicy>
          <policyMap>
            <policyEntries>
              <policyEntry topic=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="600000">
                <pendingMessageLimitStrategy>
                  <constantPendingMessageLimitStrategy limit="1000"/>
                </pendingMessageLimitStrategy>
              </policyEntry>
              <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="600000" />
            </policyEntries>
          </policyMap>
        </destinationPolicy>
        <plugins>
          <authorizationPlugin>
            <map>
              <authorizationMap>
                <authorizationEntries>
                  <authorizationEntry topic=">" write="admin, publisher, activemq-webconsole" read="admin, subscriber, activemq-webconsole" admin="admin,publisher, activemq-webconsole"/>
                </authorizationEntries>
              </authorizationMap>
            </map>
          </authorizationPlugin>
        </plugins>
      </broker>
      `),
      description: 'MQTT Broker Config',
      name: 'mqtt-broker-config',
    });

    // Brokerと設定を紐づける
    new amazonmq.CfnConfigurationAssociation(scope, 'MQTTBrokerConfigAssociation', {
      broker: broker.ref,
      configuration: {
        id: config.ref,
        revision: config.attrRevision
      }
    });

amazonmq.CfnConfigurationでBroker用の設定を作成し、amazonmq.CfnConfigurationAssociationでBrokerと紐づけるという流れになります。

★はActiveMQ用の設定ファイル(activemq.xml)の内容を渡し、Amazon MQの設定として適用することができます。

ただしActiveMQで使える項目の全てが利用できるわけではないので注意が必要です。使用できる項目は以下に記載されています。

docs.aws.amazon.com

Amazon MQで使用できないものの例として、simpleAuthenticationPluginがあります。

ActiveMQでは以下のようにxmlでユーザー情報を定義することが可能です。しかしAmazon MQではこちらは使用できません。

      <simpleAuthenticationPlugin>
        <users>
          <authenticationUser username="user1" password="password1" groups="publishers"/>
          <authenticationUser username="user2" password="password2" groups="subscribers"/>
        </users>
      </simpleAuthenticationPlugin>

そのためAmazon MQ側のプロパティとしてユーザーを作成する必要があります(後はマネコンから作成するか)。

今回の実装例における設定では、ユーザーの権限を設定しています。具体的には以下の箇所です。

              <authorizationMap>
                <authorizationEntries>
                  <authorizationEntry topic=">" write="admin, publisher, activemq-webconsole" read="admin, subscriber, activemq-webconsole" admin="admin,publisher, activemq-webconsole"/>
                </authorizationEntries>
              </authorizationMap>

authorizationEntryによりユーザーのグループに基づく権限を設定しています。

今回の例では以下のように設定しています。

  • adminactivemq-webcomsole(ActiveMQの管理画面からの操作)はなんでもできる
  • publisherはtopicの作成とpublishができる(subscribeは不可)
  • subscriberはtopicのsubscribeができる(publishは不可)

xmlの設定項目の意味は以下です。

  • topic: >となっていますが、これは全てのトピックにこの内容を適用するという意味です。
  • write: MQTTの場合、topicへのpublishを許可するグループを指定します(この権限だけだとtopicの「作成」はできないので注意)。
  • read:MQTTの場合、topicのsubscribeを許可するグループを指定します。
  • admin: MQTTの場合、topicの作成や削除まで許可するグループを指定します。今回はwriteと同じ権限にしています。※MQTTで動的にtopicを作成してpublishすることが普通な気がするので、publisherは基本admin権限が必要な気がするが権限として強すぎる気も、、、解決策わかる方いれば教えてください。

※以下私見

  • このxmlで権限を設定しないといけないのが、AWS IoT Coreと比較してちょっと面倒だなという印象を感じました。

  • またsimpleAuthenticationPluginのようにAmazon MQでは使えない設定項目もあるので、AWS外で使っていたActiveMQの設定をそのまま移行できるとは限らないのがなんとも言えないです。

2. Clientの実装について

pythonのpaho-mqttでMQTTクライアントの実装を行なっています。

以下サイトの実装を大いに参考にさせていただきました。 そのため多くは解説しないですが、今回特に考慮した点のみ改造します

tech.fusic.co.jp

リポジトリpython配下にクライアントの実装を格納しています。

.
├── mqtt_helper.py  // MQTTクライアントの操作
├── publisher.py // publisherを実装
└── subscriber.py // subscriberを実装

フェイルオーバーの考慮

アーキテクチャを再掲しますが、今回MQTTブローカーはActive-Standby構成のため、フェイルオーバーを考慮する必要があります。

Amazon MQでは接続先のエンドポイントがインスタンスごとに払い出されるため、そこの接続制御を行う必要があります。

faileoverから始まるのはフェイルオーバートランスポートであり、ライブラリ等が対応していればこれを元に制御することが可能です。

引用元:https://www.cloudremix.net/awsdocs/20210317_AWS-BlackBelt_AmazonMQ%20210317a.pdf

以下JavaのSpring Bootによる実装例。

dev.classmethod.jp

調査した限りpythonのpaho-mqttではサポートされていないようでした。以下のissueでクラスターに対して接続する機能のリクエストが起票されています。

github.com

フェイルオーバーの実装

先述のblackbeltのフェイルオーバートランスポートの記載を参考に、以下の方針でpaho-mqttでフェイルオーバーの機構を実装してみました。

  • 接続先のBrokerをリストとして保持する
  • 接続試行時は対象のエンドポイントをランダムに選択する。
    • 成功時はそのまま接続
    • 接続試行失敗時、またはフェイルオーバー発生等による切断発生時はランダムに対象のエンドポイントを再選択し、接続を試みる。

以下該当箇所の実装の抜粋です。

class MQTTHelper:
    def __init__(self, client_name):
        self.username = 'admin'
        self.password = 'admintest1234'
        # ①Brokerのリストを保持
        self.broker_host_list = [
            'b-xxxxx.mq.ap-northeast-1.amazonaws.com',
            'b-xxxxx.mq.ap-northeast-1.amazonaws.com']

        # 中略

    def connect(self):
        """
        フェイルオーバーを考慮した接続のための関数
        """
        # ②attemptは簡易的なエクスポネンシャルバックオフに使用
        attempt = 1

        # 接続試行のループ
        while True:
            # ③接続先をランダムに指定
            broker = random.choice(self.broker_host_list)
            print(f'Connecting to {broker}')
            try:
                # ④接続できたらそのままループをスタートさせ、attemptをリセットしループを抜ける。
                self.client.connect(broker, 8883, 60)
                print(f'Successfully connected to "{broker}"')
                self.client.loop_start()
                attempt = 1

                break
            except Exception as e:
                print(f'Failed to connect to {broker}')
                print(e)
                print('Trying to connect to another broker...')

                # ⑤接続に失敗したら一定時間待機してから再試行する
                wait_time = min(2 ** attempt, 30)
                print(f'Waiting for {wait_time} seconds before retrying...\n')
                time.sleep(wait_time)
                attempt += 1
                continue

まず①で接続先のBroker(Active, Standbyのエンドポイント)を保持しておきます。これをもとに接続先を切り替えていきます。

②のattemptは簡易的にエクスポネンシャルバックオフを行うためのカウンタになります。フェイルオーバー発生時はStandbyのBrokerがActiveに昇格しますが、多少時間がかかるので、そのままだと待機時間無しに大量の再接続のリクエストを投げてしまいます。

そのためattemptで接続失敗時の試行回数を保持し、失敗した回数が多いほど再接続するまでの時間を長く設けるようにしています。

        # 接続試行回数のカウンタを設定
        attempt = 1

        while True:
            try:
                # 接続できたらカウンタをリセットしてループを抜ける
                self.client.connect(broker, 8883, 60)
                attempt = 1

                break
            except Exception as e:
                # 接続に失敗したら2^試行回数分だけ待機する。ただし無駄に長くなりすぎないよう、上限は30秒にする。
                wait_time = min(2 ** attempt, 30)
                
                # 待機した後にカウンタを増やしてループを継続。再接続を試す。
                time.sleep(wait_time)
                attempt += 1
                continue

上記で簡易的ですがエクスポネンシャルバックオフの機構をもつ、フェイルオーバーの仕組みが実装できました。 改善の余地がある点としては以下が挙げられます。

  • 接続失敗時はもう片方の接続先に接続を試行する方が良い。今回ランダムに接続先を指定しているので、Standbyのインスタンスのエンドポイントを指定し続けて無駄に接続の試行を試してしまう可能性があります。2つのエンドポイントそれぞれに接続試行を行い両方ダメだったら、一定時間待機して再試行とした方が良さそうです。
  • 打ち切りの機構の実装。Amazon MQ自体がダウンするなどで復旧の見込みがなくなったケースでも延々と再接続を試してしまいます。そのため何回か接続を試行した際は終了させる機構があった方が良さそうです。

フェイルオーバーの動作確認

簡単に動作を試してみました。

Amazon MQでフェイルオーバーの動作を確認したい場合、マネコンからBrokerの再起動を行えば良いです。これによりBrokerの再起動がかかり、Active-Standbyの入れ替えが発生するのでテストができます。

実際に上記の実装を使い、publisherを起動しっぱなしにしてBrokerの再起動を行ったところ、接続試行を何回か繰り返した後に再接続できていることが確認できました。エクスポネンシャルバックオフの機構も機能していそうです。 (※エンドポイントをマスクしていないですが、削除済みのBrokerの値なのでもう使えない接続先です)

終わりに

次はAWS IoT CoreでMQTTを試してみたいと思います。