Qtの基礎 - MQTT

提供:MochiuWiki : SUSE, EC, PCB
2024年11月16日 (土) 11:41時点におけるWiki (トーク | 投稿記録)による版 (→‎概要)
ナビゲーションに移動 検索に移動

概要

QtでMQTT通信を実装する場合は、QtMqttモジュールを使用する。
このモジュールは、標準的なMQTTプロトコルの機能をカバーしており、プロジェクトファイルに簡単な設定を追加するだけで利用可能である。

中核となるQMqttClientクラスを使用することにより、MQTTブローカーへの接続、切断、メッセージの送受信等の基本的な操作が可能になる。
接続設定では、ホスト名、ポート番号、必要に応じて認証情報を指定する。

Qt特有のシグナル / スロットメカニズムを使用することで、接続状態の監視やメッセージの非同期処理を効率的に実装できる。
接続完了、切断、エラー発生等のイベントをリアルタイムで検知して、適切な処理を行うことができる。

パブリッシュ / サブスクライブ (メッセージの送受信) の実装も直感的である。
トピック名を指定してメッセージを発行、あるいは、特定のトピックをサブスクライブしてメッセージを受信することができる。
また、ワイルドカードを使用したトピックの購読にも対応している。

セキュリティ面においては、TLS/SSL通信やユーザ認証もサポートされている。
証明書の設定や認証情報の指定により、セキュアな通信を実現できる。
また、Last Will Testamentの設定やQoSレベルの指定といったMQTTプロトコルの高度な機能も利用可能である。

実装時において重要なことは、エラーハンドリングの適切な実装が挙げられる。
ネットワークの切断、再接続、タイムアウト等の状況に適切に対応することにより、安定したアプリケーションを構築することができる。

また、大量のメッセージを送受信する場合は、非同期処理を活用してUIのブロッキングを防ぐことが重要である。


MQTT通信

技術的な制約

  • メモリ関連の制約
    QtのMQTTクライアントは動的メモリ管理を使用するため、大量のメッセージを処理する場合はメモリ使用量に注意が必要となる。
    特に、大きなペイロードを持つメッセージを頻繁に送受信する場合、メモリリークを防ぐための適切な解放処理が重要である。

  • ネットワーク関連の制約
    Qt MQTTはTCP/IP上で動作するため、ネットワークの状態に依存する。
    ファイアウォールや特定のポートがブロックされている環境では動作しない可能性がある。
    SSL/TLS接続を使用する場合、追加の設定と証明書の管理が必要である。


デバッグとトラブルシューティング

開発時は、MQTTクライアントツール (例: MQTT Explorer) を使用することにより、通信の様子を視覚的に確認できる。
また、シリアルモニタを活用して、接続状態やメッセージの送受信を確認する。

使用例

以下の例では、MQTT通信でトピックの送受信を行っている。

全てのMQTT操作 (接続、発行、購読) をシグナル/スロットを使用して非同期で実行している。
また、接続が失敗した場合は、自動的に再接続を行う。

送信 (MQTTパブリッシャー側)
 // Publisher.h
 
 #include <QObject>
 #include <QtMqtt/QMqttClient>
 #include <QTimer>
 #include <QDebug>
 
 class MqttPublisher : public QObject
 {
    Q_OBJECT
 
 private:
    QMqttClient *m_client;
    QString m_host;
    quint16 m_port;
 
 public:
    explicit MqttPublisher(QObject *parent = nullptr) : QObject(parent), m_client(new QMqttClient(this))
    {
       // MQTT接続状態変更時のシグナルをハンドラに接続
       connect(m_client, &QMqttClient::stateChanged, this, &MqttPublisher::handleConnectionStateChange);
 
       // エラー発生時のシグナルをハンドラに接続
       connect(m_client, &QMqttClient::errorChanged, this, &MqttPublisher::handleError);
    }
 
    ~MqttPublisher()
    {
       if (m_client->state() == QMqttClient::Connected) {
          m_client->disconnect();
       }
    }

    // MQTT接続の開始
    void connectToHost(const QString &host, quint16 port)
    {
       m_host = host;
       m_port = port;
 
       // 接続パラメータを設定
       m_client->setHostname(host);
       m_client->setPort(port);
 
       // 非同期で接続を開始
       m_client->connectToHost();
    }
 
    // メッセージの発行
    void publishMessage(const QString &topic, const QByteArray &message)
    {
       if (m_client->state() != QMqttClient::Connected) {
          qWarning() << "MQTTクライアントが接続されていません";
          return;
       }
 
       // QoS 1でメッセージを発行 (メッセージ到達保証あり)
       auto publish = m_client->publish(topic, message, 1);
       if (!publish) {
          qWarning() << "メッセージの発行に失敗しました: " << topic;
          return;
       }
    }
 
    // 接続の切断
    void disconnect()
    {
       if (m_client->state() == QMqttClient::Connected) {
          m_client->disconnectFromHost();
       }
    }
 
 private slots:
    // MQTT接続状態が変化した時のハンドラ
    void handleConnectionStateChange()
    {
       switch (m_client->state()) {
          case QMqttClient::Connected:
             qInfo() << "MQTTブローカーに接続しました";
             break;
          case QMqttClient::Disconnected:
             qInfo() << "MQTTブローカーから切断されました";
             break;
          case QMqttClient::Connecting:
             qInfo() << "MQTTブローカーに接続中...";
             break;
          default:
             qWarning() << "不明な接続状態です: " << m_client->state();
             break;
       }
    }
 
    // エラー発生時のハンドラ
    void handleError(QMqttClient::ClientError error)
    {
       QString errorMsg;
       switch (error) {
          case QMqttClient::NoError:
             return;
          case QMqttClient::InvalidProtocolVersion:
             errorMsg = "無効なプロトコルバージョンです";
             break;
          case QMqttClient::IdRejected:
             errorMsg = "クライアントIDが拒否されました";
             break;
          case QMqttClient::ServerUnavailable:
             errorMsg = "サーバーが利用できません";
             break;
          case QMqttClient::BadUsernameOrPassword:
             errorMsg = "ユーザー名またはパスワードが無効です";
             break;
          case QMqttClient::NotAuthorized:
             errorMsg = "認証されていません";
             break;
          case QMqttClient::TransportInvalid:
             errorMsg = "トランスポートが無効です";
             break;
          case QMqttClient::ProtocolViolation:
             errorMsg = "プロトコル違反が発生しました";
             break;
          case QMqttClient::UnknownError:
             errorMsg = "不明なエラーが発生しました";
             break;
          default:
             errorMsg = "予期せぬエラーが発生しました";
             break;
       }
 
       qCritical() << "MQTTエラー:" << errorMsg;
 
       // エラー発生時は再接続を試みる
       if (m_client->state() != QMqttClient::Connected) {
          QTimer::singleShot(5000, [this]() {
             qInfo() << "再接続を試みます...";
             this->connectToHost(m_host, m_port);
          });
       }
    }
 };


 // Publisher側の使用例
 
 #include "Publisher.h"
 
 int main(int argc, char *argv[])
 {
    QCoreApplication a(argc, argv);
 
    MqttPublisher publisher;
    publisher.connectToHost("<IPアドレスまたはホスト名  例: localhost>", <MQTT通信するポート番号  : 1883>);
 
    // 定期的にメッセージを送信
    QTimer timer;
    QObject::connect(&timer, &QTimer::timeout, [&publisher]() {
       publisher.publishMessage("qt/topic", "Hello, MQTT!");
    });
 
    timer.start(1000);  // 1秒おきに送信
    
    return a.exec();
 }


受信 (MQTTサブスクライバ側)

以下の例では、受信したトピックの購読 (サブスクライブ) している。

 // Subscriber.hファイル
 
 #include <QObject>
 #include <QtMqtt/QMqttClient>
 #include <QtMqtt/QMqttSubscription>
 #include <QTimer>
 #include <QDebug>
 
 class MqttSubscriber : public QObject
 {
    Q_OBJECT
 
 private:
    QMqttClient *m_client;
    QString     m_host;
    quint16     m_port;
    QMap<QString, QMqttSubscription*> m_subscriptions;
 
 public:
    explicit MqttSubscriber(QObject *parent = nullptr) : QObject(parent), m_client(new QMqttClient(this))
    {
       // MQTT接続状態変更時のシグナルをハンドラに接続
       connect(m_client, &QMqttClient::stateChanged, this, &MqttSubscriber::handleConnectionStateChange);
 
       // エラー発生時のシグナルをハンドラに接続
       connect(m_client, &QMqttClient::errorChanged, this, &MqttSubscriber::handleError);
    }
 
    ~MqttSubscriber()
    {
       // 全ての購読を解除
       for (auto subscription : m_subscriptions) {
          subscription->unsubscribe();
          delete subscription;
       }
 
       m_subscriptions.clear();
 
      if (m_client->state() == QMqttClient::Connected) {
         m_client->disconnect();
      }
   }
 
    // MQTT接続を開始する
    void connectToHost(const QString &host, quint16 port)
    {
       m_host = host;
       m_port = port;
 
       // 接続パラメータを設定
       m_client->setHostname(host);
       m_client->setPort(port);
 
       // 非同期で接続を開始
       m_client->connectToHost();
    }
 
    // トピックを購読する
    void subscribe(const QString &topic)
    {
       if (m_client->state() != QMqttClient::Connected) {
          qWarning() << "MQTTクライアントが接続されていません";
          return;
       }
 
       // 既に購読済みのトピックは無視
       if (m_subscriptions.contains(topic)) {
          qInfo() << "トピックは既に購読されています:" << topic;
          return;
       }
 
       // QoS 1でトピックを購読 (メッセージ到達保証あり)
       auto subscription = m_client->subscribe(topic, 1);
       if (!subscription) {
          qWarning() << "トピックの購読に失敗しました:" << topic;
          return;
       }
 
       // メッセージ受信時のハンドラを設定
       connect(subscription, &QMqttSubscription::messageReceived, this, &MqttSubscriber::handleMessage);
 
       // 購読リストに追加
       m_subscriptions.insert(topic, subscription);
       qInfo() << "トピックを購読開始しました:" << topic;
    }
 
    // 接続を切断する
    void disconnect()
    {
       if (m_client->state() == QMqttClient::Connected) {
          m_client->disconnectFromHost();
       }
    }
 
 signals:
    // メッセージ受信時のシグナル
    void messageReceived(const QString &topic, const QByteArray &message);
 
 private slots:
    // MQTT接続状態が変化した時のハンドラ
    void handleConnectionStateChange()
    {
       switch (m_client->state()) {
          case QMqttClient::Connected:
             qInfo() << "MQTTブローカーに接続しました";
             break;
          case QMqttClient::Disconnected:
             qInfo() << "MQTTブローカーから切断されました";
             break;
          case QMqttClient::Connecting:
             qInfo() << "MQTTブローカーに接続中...";
             break;
          default:
             qWarning() << "不明な接続状態です: " << m_client->state();
             break;
       }
    }
 
    // エラー発生時のハンドラ
    void handleError(QMqttClient::ClientError error)
    {
       QString errorMsg;
       switch (error) {
          case QMqttClient::NoError:
             return;
          case QMqttClient::InvalidProtocolVersion:
             errorMsg = "無効なプロトコルバージョンです";
             break;
          case QMqttClient::IdRejected:
             errorMsg = "クライアントIDが拒否されました";
             break;
          case QMqttClient::ServerUnavailable:
             errorMsg = "サーバが利用できません";
             break;
          case QMqttClient::BadUsernameOrPassword:
             errorMsg = "ユーザ名またはパスワードが無効です";
             break;
          case QMqttClient::NotAuthorized:
             errorMsg = "認証されていません";
             break;
          case QMqttClient::TransportInvalid:
             errorMsg = "トランスポートが無効です";
             break;
          case QMqttClient::ProtocolViolation:
             errorMsg = "プロトコル違反が発生しました";
             break;
          case QMqttClient::UnknownError:
             errorMsg = "不明なエラーが発生しました";
             break;
          default:
             errorMsg = "予期せぬエラーが発生しました";
             break;
       }
 
       qCritical() << "MQTTエラー:" << errorMsg;
 
       // エラー発生時は再接続を試みる
       if (m_client->state() != QMqttClient::Connected) {
          QTimer::singleShot(5000, [this]() {
             qInfo() << "再接続を試みます...";
             this->connectToHost(m_host, m_port);
          });
       }
    }
 
    // メッセージ受信時のハンドラ
    void handleMessage(const QByteArray &message, const QMqttTopicName &topic)
    {
       // メッセージを受信したことをログに記録
       qInfo() << "メッセージを受信しました - トピック: " << topic.name();
       qInfo() << "メッセージ: " << message;
 
       // シグナルを発行して、アプリケーションの他の部分に通知
       emit messageReceived(topic.name(), message);
    }
 };


 // Subscriber側の使用例
 
 #include "Subscriber.h"
 
 int main(int argc, char *argv[])
 {
    QCoreApplication a(argc, argv);
 
    MqttSubscriber subscriber;
    subscriber.connectToHost("localhost", 1883);
 
    // メッセージ受信時の処理
    QObject::connect(&subscriber, &MqttSubscriber::messageReceived, [](const QString &topic, const QByteArray &message) {
       qDebug() << "受信 - トピック: " << topic;
       qDebug() << "メッセージ: "  << message;
    });
 
    subscriber.subscribe("qt/topic");
 
    return a.exec();
 }