📢 Webサイト閉鎖と移転のお知らせ
このWebサイトは2026年9月に閉鎖いたします。
新しい記事は移転先で追加しております。(旧サイトでは記事を追加しておりません)

411行目: 411行目:
   
   
     subscriber.subscribe("qt/topic");
     subscriber.subscribe("qt/topic");
    return a.exec();
}
</syntaxhighlight>
<br>
===== 送受信 (MQTTパブリッシャー / MQTTサブスクライバ) =====
以下の例では、トピックの送受信を行っている。<br>
<br>
* クラス構造
** MqttWorker
**: MQTT通信の実際の処理を担当
** MqttManager
**: アプリケーションとワーカー間のインターフェース
*: <br>
* スレッド管理
*: MQTTの処理を専用スレッドで実行
*: シグナル / スロットによる安全な通信
*: <br>
* エラーハンドリング
*: 全てのエラーはerrorOccurredシグナルで通知
*: 接続問題は自動的に処理
*: 包括的なエラー状態の検出と通知
*: デバッグ情報の出力
*: <br>
* 非同期処理
*: 全ての操作が非ブロッキング
*: メッセージキューイングによる信頼性の確保
<br>
<syntaxhighlight lang="c++">
// Mqttworker.h
#include <QObject>
#include <QtMqtt/QMqttClient>
#include <QThread>
#include <QQueue>
#include <QMutex>
/**
  * @brief MQTTの通信処理を担当するワーカークラス
  *
  * このクラスは別スレッドで動作して、MQTT通信に関する以下に示す機能を提供する
  * - MQTTブローカーへの接続 / 切断
  * - メッセージの発行 (パブリッシュ)
  * - トピックの購読(サブスクライブ)
  * - メッセージの受信と通知
  *
  * スレッドセーフな設計となっており、メッセージキューイング機能も備えている
  */
class MqttWorker : public QObject
{
    Q_OBJECT
private:
    QMqttClient *m_client;                                  // MQTTクライアントインスタンス
    QQueue<QPair<QString, QByteArray>> m_messageQueue;      // 未送信メッセージのキュー
    QMutex m_mutex;                                        // スレッド同期用ミューテックス
    bool m_isConnected;                                    // 現在の接続状態
public:
    /**
    * @brief コンストラクタ
    * @param parent 親オブジェクト(デフォルトはnullptr)
    */
    explicit MqttWorker(QObject *parent = nullptr) : QObject(parent), m_client(new QMqttClient(this)), m_isConnected(false)
    {
      // MQTTクライアントのシグナルとワーカーのスロットを接続
      connect(m_client, &QMqttClient::messageReceived, this, &MqttWorker::handleMessage);
      connect(m_client, &QMqttClient::stateChanged, this, &MqttWorker::handleStateChange);
      connect(m_client, &QMqttClient::errorChanged, this, &MqttWorker::handleError);
    }
    /**
    * @brief デストラクタ
    * 接続中の場合は切断処理を行います
    */
    ~MqttWorker()
    {
      if (m_client->state() == QMqttClient::Connected) {
          m_client->disconnectFromHost();
      }
    }
public slots:
    /**
    * @brief MQTTブローカーへの接続を開始
    * @param host ブローカーのホスト名またはIPアドレス
    * @param port ブローカーのポート番号
    *
    * このメソッドは非同期で実行され、接続状態の変更はconnectionStateChangedシグナルで通知される
    */
    void connectToHost(const QString &host, quint16 port)
    {
      qDebug() << "ワーカースレッド開始: " << QThread::currentThread();
      m_client->setHostname(host);
      m_client->setPort(port);
      m_client->connectToHost();
    }
    /**
    * @brief 指定されたトピックにメッセージを発行
    * @param topic 発行先のトピック
    * @param payload 送信するメッセージ内容
    *
    * 未接続時はメッセージをキューに保存し、接続時に自動的に送信する
    * QoS 1を使用して、メッセージの到達を保証する
    */
    void publishMessage(const QString &topic, const QByteArray &payload)
    {
      QMutexLocker locker(&m_mutex);
      if (!m_isConnected) {
          // 未接続時はメッセージをキューに保存
          m_messageQueue.enqueue(qMakePair(topic, payload));
          qDebug() << "メッセージをキューに保存: " << topic;
          return;
      }
      auto publish = m_client->publish(topic, payload, 1);
      if (!publish) {
          emit errorOccurred("メッセージの発行に失敗しました: " + topic);
          qWarning() << "メッセージ発行失敗: " << topic;
      }
      else {
          qDebug() << "メッセージを発行:" << topic;
      }
    }
    /**
    * @brief 指定されたトピックを購読
    * @param topic 購読するトピック名
    *
    * 接続済みの場合のみ購読を開始する
    * QoS 1を使用して、メッセージの到達を保証する
    */
    void subscribe(const QString &topic)
    {
      if (!m_isConnected) {
          emit errorOccurred("購読できません - 接続されていません");
          qWarning() << "購読失敗 - 未接続:" << topic;
          return;
      }
      auto subscription = m_client->subscribe(topic, 1);
      if (!subscription) {
          emit errorOccurred("トピックの購読に失敗しました: " + topic);
          qWarning() << "購読失敗:" << topic;
      }
      else {
          qDebug() << "トピックを購読開始:" << topic;
      }
    }
signals:
    /**
    * @brief メッセージ受信時に発行されるシグナル
    * @param topic 受信したメッセージのトピック
    * @param message 受信したメッセージの内容
    */
    void messageReceived(const QString &topic, const QByteArray &message);
    /**
    * @brief 接続状態が変化した時に発行されるシグナル
    * @param connected 接続状態 (true: 接続済み, false: 未接続)
    */
    void connectionStateChanged(bool connected);
    /**
    * @brief エラー発生時に発行されるシグナル
    * @param error エラーメッセージ
    */
    void errorOccurred(const QString &error);
private slots:
    /**
    * @brief メッセージ受信時の処理
    * @param message 受信したメッセージ
    * @param topic 受信したトピック
    */
    void handleMessage(const QByteArray &message, const QMqttTopicName &topic) {
        emit messageReceived(topic.name(), message);
        qDebug() << "メッセージを受信:" << topic.name() << message;
    }
    /**
    * @brief 接続状態変更時の処理
    *
    * 接続完了時にキューに保存されたメッセージの送信を試みます
    */
    void handleStateChange()
    {
      m_isConnected = (m_client->state() == QMqttClient::Connected);
      emit connectionStateChanged(m_isConnected);
      if (m_isConnected) {
          qDebug() << "MQTT接続完了";
          // 接続時にキューのメッセージを処理
          QMutexLocker locker(&m_mutex);
          while (!m_messageQueue.isEmpty()) {
            auto message = m_messageQueue.dequeue();
            publishMessage(message.first, message.second);
          }
      }
      else {
          qDebug() << "MQTT切断";
      }
    }
    /**
    * @brief エラー発生時の処理
    * @param error 発生したエラーの種類
    *
    * エラーの種類に応じて適切なメッセージを生成し通知します
    */
    void handleError(QMqttClient::ClientError error)
    {
      QString errorMessage;
      switch (error) {
          case QMqttClient::NoError:
            return;
          case QMqttClient::InvalidProtocolVersion:
            errorMessage = "無効なプロトコルバージョン";
            break;
          case QMqttClient::IdRejected:
            errorMessage = "クライアントID拒否";
            break;
          case QMqttClient::ServerUnavailable:
            errorMessage = "サーバー利用不可";
            break;
          case QMqttClient::BadUsernameOrPassword:
            errorMessage = "認証エラー";
            break;
          case QMqttClient::NotAuthorized:
            errorMessage = "認可エラー";
            break;
          case QMqttClient::TransportInvalid:
            errorMessage = "トランスポートエラー";
            break;
          case QMqttClient::ProtocolViolation:
            errorMessage = "プロトコル違反";
            break;
          case QMqttClient::UnknownError:
            errorMessage = "不明なエラー";
            break;
          default:
            errorMessage = "予期せぬエラー";
            break;
      }
      emit errorOccurred(errorMessage);
      qCritical() << "MQTTエラー:" << errorMessage;
    }
};
</syntaxhighlight>
<br>
<syntaxhighlight lang="c++">
// Mqttmanager.hファイル
#include <QObject>
#include <QThread>
#include "Mqttworker.h"
/**
  * @brief MQTT通信を管理するマネージャークラス
  *
  * このクラスは以下に示す機能を提供する
  * - MQTTワーカーの生成と管理
  * - ワーカースレッドの制御
  * - MQTT通信操作のインターフェース提供
  *
  * アプリケーションはこのクラスを通じてMQTT通信を利用する
  * 全ての操作は非同期で実行されて、メインスレッドをブロックしない
  */
class MqttManager : public QObject
{
    Q_OBJECT
private:
    QThread m_workerThread;  // ワーカー用スレッド
    MqttWorker *m_worker;    // MQTTワーカーインスタンス
public:
    /**
    * @brief コンストラクタ
    * @param parent 親オブジェクト(デフォルトはnullptr)
    *
    * ワーカーオブジェクトを生成して、専用スレッドで実行を開始する
    * 必要なシグナル / スロット接続も確立する
    */
    explicit MqttManager(QObject *parent = nullptr) : QObject(parent), m_worker(new MqttWorker)
    {
      // ワーカーを別スレッドに移動
      m_worker->moveToThread(&m_workerThread);
      // マネージャーからワーカーへのシグナル接続
      connect(this, &MqttManager::connectRequested, m_worker, &MqttWorker::connectToHost);
      connect(this, &MqttManager::publishRequested, m_worker, &MqttWorker::publishMessage);
      connect(this, &MqttManager::subscribeRequested, m_worker, &MqttWorker::subscribe);
      // ワーカーからマネージャーへのシグナル接続
      connect(m_worker, &MqttWorker::messageReceived, this, &MqttManager::messageReceived);
      connect(m_worker, &MqttWorker::connectionStateChanged, this, &MqttManager::connectionStateChanged);
      connect(m_worker, &MqttWorker::errorOccurred, this, &MqttManager::errorOccurred);
      // スレッド終了時のクリーンアップ設定
      connect(&m_workerThread, &QThread::finished, m_worker, &MqttWorker::deleteLater);
      // ワーカースレッドを開始
      m_workerThread.start();
      qDebug() << "MQTTマネージャー初期化完了";
    }
    /**
    * @brief デストラクタ
    *
    * ワーカースレッドを適切に終了して、リソースを解放する
    */
    ~MqttManager()
    {
      m_workerThread.quit();
      m_workerThread.wait();
      qDebug() << "MQTTマネージャー終了";
    }
    /**
    * @brief MQTTブローカーへの接続を要求
    * @param host ブローカーのホスト名またはIPアドレス
    * @param port ブローカーのポート番号
    */
    void connect(const QString &host, quint16 port)
    {
        emit connectRequested(host, port);
        qDebug() << "接続要求:" << host << port;
    }
    /**
    * @brief メッセージの発行を要求
    * @param topic 発行先のトピック
    * @param payload 送信するメッセージ内容
    */
    void publish(const QString &topic, const QByteArray &payload)
    {
      emit publishRequested(topic, payload);
      qDebug() << "発行要求:" << topic;
    }
    /**
    * @brief トピックの購読を要求
    * @param topic 購読するトピック名
    */
    void subscribe(const QString &topic)
    {
        emit subscribeRequested(topic);
        qDebug() << "購読要求:" << topic;
    }
signals:
    // ワーカーへの要求シグナル
    void connectRequested(const QString &host, quint16 port);              // 接続要求シグナル
    void publishRequested(const QString &topic, const QByteArray &payload); // 発行要求シグナル
    void subscribeRequested(const QString &topic);                          // 購読要求シグナル
    // アプリケーションへの通知シグナル
    void messageReceived(const QString &topic, const QByteArray &message);  // メッセージ受信通知
    void connectionStateChanged(bool connected);                            // 接続状態変更通知
    void errorOccurred(const QString &error);                              // エラー発生通知
};
</syntaxhighlight>
<br>
<syntaxhighlight lang="c++">
// 使用例 : main.cppファイル
#include <QCoreApplication>
#include "Mqttmanager.h"
int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);
    // MQTTマネージャーのインスタンスを生成
    MqttManager manager;
    // 接続状態変更時の処理
    QObject::connect(&manager, &MqttManager::connectionStateChanged, [](bool connected) {
      if (connected) {
          qDebug() << "MQTT接続完了";
      }
      else {
          qDebug() << "MQTT切断";
      }
    });
    // メッセージ受信時の処理
    QObject::connect(&manager, &MqttManager::messageReceived, [](const QString &topic, const QByteArray &message) {
      qDebug() << "メッセージ受信 - トピック: " << topic;
      qDebug() << "内容: " << message;
    });
    // エラー発生時の処理
    QObject::connect(&manager, &MqttManager::errorOccurred, [](const QString &error) {
      qCritical() << "MQTTエラー: " << error;
    });
    // MQTTブローカーへの接続
    manager.connect("<IPアドレスまたはホスト名  例: localhost>", <ポート番号  例: 1883>);
    // トピックの購読
    manager.subscribe("qt/topic");
    // メッセージの発行
    QTimer::singleShot(1000, [&manager]() {
      manager.publish("qt/topic", "Hello、MQTT!");
    });
   
   
     return a.exec();
     return a.exec();