Qtの基礎 - MQTT

2024年11月16日 (土) 11:18時点におけるWiki (トーク | 投稿記録)による版 (ページの作成:「== 概要 == <br><br> == MQTT通信 == ==== 技術的な制約 ==== * メモリ関連の制約 *: QtのMQTTクライアントは動的メモリ管理を使用するため、大量のメッセージを処理する場合はメモリ使用量に注意が必要となる。 *: 特に、大きなペイロードを持つメッセージを頻繁に送受信する場合、メモリリークを防ぐための適切な解放処理が重要である。 *: <br> * ネットワ…」)
(差分) ← 古い版 | 最新版 (差分) | 新しい版 → (差分)

概要



MQTT通信

技術的な制約

  • メモリ関連の制約
    QtのMQTTクライアントは動的メモリ管理を使用するため、大量のメッセージを処理する場合はメモリ使用量に注意が必要となる。
    特に、大きなペイロードを持つメッセージを頻繁に送受信する場合、メモリリークを防ぐための適切な解放処理が重要である。

  • ネットワーク関連の制約
    Qt MQTTはTCP/IP上で動作するため、ネットワークの状態に依存する。
    ファイアウォールや特定のポートがブロックされている環境では動作しない可能性がある。
    SSL/TLS接続を使用する場合、追加の設定と証明書の管理が必要である。


デバッグとトラブルシューティング

開発時は、MQTTクライアントツール (例: MQTT Explorer) を使用することにより、通信の様子を視覚的に確認できる。
また、シリアルモニタを活用して、接続状態やメッセージの送受信を確認する。

使用例

以下の例では、MQTT通信でトピックの送受信を行っている。

全てのMQTT操作 (接続、発行、購読) をシグナル/スロットを使用して非同期で実行している。
また、接続が失敗した場合は、自動的に再接続を行う。

送信 (MQTTパブリッシャー側)
 // Publisher.h
 
 #include <QObject>
 #include <QtMqtt/QMqttClient>
 #include <QTimer>
 #include <QDebug>
 
 class MqttPublisher : public QObject
 {
    Q_OBJECT
 
 private:
    QMqttClient *m_client;
    QString m_host;
    quint16 m_port;
 
 public:
    explicit MqttPublisher(QObject *parent = nullptr) : QObject(parent), m_client(new QMqttClient(this))
    {
       // MQTT接続状態変更時のシグナルをハンドラに接続
       connect(m_client, &QMqttClient::stateChanged, this, &MqttPublisher::handleConnectionStateChange);
 
       // エラー発生時のシグナルをハンドラに接続
       connect(m_client, &QMqttClient::errorChanged, this, &MqttPublisher::handleError);
    }
 
    ~MqttPublisher()
    {
       if (m_client->state() == QMqttClient::Connected) {
          m_client->disconnect();
       }
    }

    // MQTT接続の開始
    void connectToHost(const QString &host, quint16 port)
    {
       m_host = host;
       m_port = port;
 
       // 接続パラメータを設定
       m_client->setHostname(host);
       m_client->setPort(port);
 
       // 非同期で接続を開始
       m_client->connectToHost();
    }
 
    // メッセージの発行
    void publishMessage(const QString &topic, const QByteArray &message)
    {
       if (m_client->state() != QMqttClient::Connected) {
          qWarning() << "MQTTクライアントが接続されていません";
          return;
       }
 
       // QoS 1でメッセージを発行 (メッセージ到達保証あり)
       auto publish = m_client->publish(topic, message, 1);
       if (!publish) {
          qWarning() << "メッセージの発行に失敗しました: " << topic;
          return;
       }
    }
 
    // 接続の切断
    void disconnect()
    {
       if (m_client->state() == QMqttClient::Connected) {
          m_client->disconnectFromHost();
       }
    }
 
 private slots:
    // MQTT接続状態が変化した時のハンドラ
    void handleConnectionStateChange()
    {
       switch (m_client->state()) {
          case QMqttClient::Connected:
             qInfo() << "MQTTブローカーに接続しました";
             break;
          case QMqttClient::Disconnected:
             qInfo() << "MQTTブローカーから切断されました";
             break;
          case QMqttClient::Connecting:
             qInfo() << "MQTTブローカーに接続中...";
             break;
          default:
             qWarning() << "不明な接続状態です: " << m_client->state();
             break;
       }
    }
 
    // エラー発生時のハンドラ
    void handleError(QMqttClient::ClientError error)
    {
       QString errorMsg;
       switch (error) {
          case QMqttClient::NoError:
             return;
          case QMqttClient::InvalidProtocolVersion:
             errorMsg = "無効なプロトコルバージョンです";
             break;
          case QMqttClient::IdRejected:
             errorMsg = "クライアントIDが拒否されました";
             break;
          case QMqttClient::ServerUnavailable:
             errorMsg = "サーバーが利用できません";
             break;
          case QMqttClient::BadUsernameOrPassword:
             errorMsg = "ユーザー名またはパスワードが無効です";
             break;
          case QMqttClient::NotAuthorized:
             errorMsg = "認証されていません";
             break;
          case QMqttClient::TransportInvalid:
             errorMsg = "トランスポートが無効です";
             break;
          case QMqttClient::ProtocolViolation:
             errorMsg = "プロトコル違反が発生しました";
             break;
          case QMqttClient::UnknownError:
             errorMsg = "不明なエラーが発生しました";
             break;
          default:
             errorMsg = "予期せぬエラーが発生しました";
             break;
       }
 
       qCritical() << "MQTTエラー:" << errorMsg;
 
       // エラー発生時は再接続を試みる
       if (m_client->state() != QMqttClient::Connected) {
          QTimer::singleShot(5000, [this]() {
             qInfo() << "再接続を試みます...";
             this->connectToHost(m_host, m_port);
          });
       }
    }
 };


 // Publisher側の使用例
 
 #include "Publisher.h"
 
 int main(int argc, char *argv[])
 {
    QCoreApplication a(argc, argv);
 
    MqttPublisher publisher;
    publisher.connectToHost("<IPアドレスまたはホスト名  例: localhost>", <MQTT通信するポート番号  : 1883>);
 
    // 定期的にメッセージを送信
    QTimer timer;
    QObject::connect(&timer, &QTimer::timeout, [&publisher]() {
       publisher.publishMessage("qt/topic", "Hello, MQTT!");
    });
 
    timer.start(1000);  // 1秒おきに送信
    
    return a.exec();
 }


受信 (MQTTサブスクライバ側)

以下の例では、受信したトピックの購読 (サブスクライブ) している。

 // Subscriber.hファイル
 
 #include <QObject>
 #include <QtMqtt/QMqttClient>
 #include <QtMqtt/QMqttSubscription>
 #include <QTimer>
 #include <QDebug>
 
 class MqttSubscriber : public QObject
 {
    Q_OBJECT
 
 private:
    QMqttClient *m_client;
    QString     m_host;
    quint16     m_port;
    QMap<QString, QMqttSubscription*> m_subscriptions;
 
 public:
    explicit MqttSubscriber(QObject *parent = nullptr) : QObject(parent), m_client(new QMqttClient(this))
    {
       // MQTT接続状態変更時のシグナルをハンドラに接続
       connect(m_client, &QMqttClient::stateChanged, this, &MqttSubscriber::handleConnectionStateChange);
 
       // エラー発生時のシグナルをハンドラに接続
       connect(m_client, &QMqttClient::errorChanged, this, &MqttSubscriber::handleError);
    }
 
    ~MqttSubscriber()
    {
       // 全ての購読を解除
       for (auto subscription : m_subscriptions) {
          subscription->unsubscribe();
          delete subscription;
       }
 
       m_subscriptions.clear();
 
      if (m_client->state() == QMqttClient::Connected) {
         m_client->disconnect();
      }
   }
 
    // MQTT接続を開始する
    void connectToHost(const QString &host, quint16 port)
    {
       m_host = host;
       m_port = port;
 
       // 接続パラメータを設定
       m_client->setHostname(host);
       m_client->setPort(port);
 
       // 非同期で接続を開始
       m_client->connectToHost();
    }
 
    // トピックを購読する
    void subscribe(const QString &topic)
    {
       if (m_client->state() != QMqttClient::Connected) {
          qWarning() << "MQTTクライアントが接続されていません";
          return;
       }
 
       // 既に購読済みのトピックは無視
       if (m_subscriptions.contains(topic)) {
          qInfo() << "トピックは既に購読されています:" << topic;
          return;
       }
 
       // QoS 1でトピックを購読 (メッセージ到達保証あり)
       auto subscription = m_client->subscribe(topic, 1);
       if (!subscription) {
          qWarning() << "トピックの購読に失敗しました:" << topic;
          return;
       }
 
       // メッセージ受信時のハンドラを設定
       connect(subscription, &QMqttSubscription::messageReceived, this, &MqttSubscriber::handleMessage);
 
       // 購読リストに追加
       m_subscriptions.insert(topic, subscription);
       qInfo() << "トピックを購読開始しました:" << topic;
    }
 
    // 接続を切断する
    void disconnect()
    {
       if (m_client->state() == QMqttClient::Connected) {
          m_client->disconnectFromHost();
       }
    }
 
 signals:
    // メッセージ受信時のシグナル
    void messageReceived(const QString &topic, const QByteArray &message);
 
 private slots:
    // MQTT接続状態が変化した時のハンドラ
    void handleConnectionStateChange()
    {
       switch (m_client->state()) {
          case QMqttClient::Connected:
             qInfo() << "MQTTブローカーに接続しました";
             break;
          case QMqttClient::Disconnected:
             qInfo() << "MQTTブローカーから切断されました";
             break;
          case QMqttClient::Connecting:
             qInfo() << "MQTTブローカーに接続中...";
             break;
          default:
             qWarning() << "不明な接続状態です: " << m_client->state();
             break;
       }
    }
 
    // エラー発生時のハンドラ
    void handleError(QMqttClient::ClientError error)
    {
       QString errorMsg;
       switch (error) {
          case QMqttClient::NoError:
             return;
          case QMqttClient::InvalidProtocolVersion:
             errorMsg = "無効なプロトコルバージョンです";
             break;
          case QMqttClient::IdRejected:
             errorMsg = "クライアントIDが拒否されました";
             break;
          case QMqttClient::ServerUnavailable:
             errorMsg = "サーバが利用できません";
             break;
          case QMqttClient::BadUsernameOrPassword:
             errorMsg = "ユーザ名またはパスワードが無効です";
             break;
          case QMqttClient::NotAuthorized:
             errorMsg = "認証されていません";
             break;
          case QMqttClient::TransportInvalid:
             errorMsg = "トランスポートが無効です";
             break;
          case QMqttClient::ProtocolViolation:
             errorMsg = "プロトコル違反が発生しました";
             break;
          case QMqttClient::UnknownError:
             errorMsg = "不明なエラーが発生しました";
             break;
          default:
             errorMsg = "予期せぬエラーが発生しました";
             break;
       }
 
       qCritical() << "MQTTエラー:" << errorMsg;
 
       // エラー発生時は再接続を試みる
       if (m_client->state() != QMqttClient::Connected) {
          QTimer::singleShot(5000, [this]() {
             qInfo() << "再接続を試みます...";
             this->connectToHost(m_host, m_port);
          });
       }
    }
 
    // メッセージ受信時のハンドラ
    void handleMessage(const QByteArray &message, const QMqttTopicName &topic)
    {
       // メッセージを受信したことをログに記録
       qInfo() << "メッセージを受信しました - トピック: " << topic.name();
       qInfo() << "メッセージ: " << message;
 
       // シグナルを発行して、アプリケーションの他の部分に通知
       emit messageReceived(topic.name(), message);
    }
 };


 // Subscriber側の使用例
 
 #include "Subscriber.h"
 
 int main(int argc, char *argv[])
 {
    QCoreApplication a(argc, argv);
 
    MqttSubscriber subscriber;
    subscriber.connectToHost("localhost", 1883);
 
    // メッセージ受信時の処理
    QObject::connect(&subscriber, &MqttSubscriber::messageReceived, [](const QString &topic, const QByteArray &message) {
       qDebug() << "受信 - トピック: " << topic;
       qDebug() << "メッセージ: "  << message;
    });
 
    subscriber.subscribe("qt/topic");
 
    return a.exec();
 }