📢 Webサイト閉鎖と移転のお知らせ
このWebサイトは2026年9月に閉鎖いたします。
新しい記事は移転先で追加しております。(旧サイトでは記事を追加しておりません)
| 411行目: | 411行目: | ||
subscriber.subscribe("qt/topic"); | subscriber.subscribe("qt/topic"); | ||
return a.exec(); | |||
} | |||
</syntaxhighlight> | |||
<br> | |||
===== 送受信 (MQTTパブリッシャー / MQTTサブスクライバ) ===== | |||
以下の例では、トピックの送受信を行っている。<br> | |||
<br> | |||
* クラス構造 | |||
** MqttWorker | |||
**: MQTT通信の実際の処理を担当 | |||
** MqttManager | |||
**: アプリケーションとワーカー間のインターフェース | |||
*: <br> | |||
* スレッド管理 | |||
*: MQTTの処理を専用スレッドで実行 | |||
*: シグナル / スロットによる安全な通信 | |||
*: <br> | |||
* エラーハンドリング | |||
*: 全てのエラーはerrorOccurredシグナルで通知 | |||
*: 接続問題は自動的に処理 | |||
*: 包括的なエラー状態の検出と通知 | |||
*: デバッグ情報の出力 | |||
*: <br> | |||
* 非同期処理 | |||
*: 全ての操作が非ブロッキング | |||
*: メッセージキューイングによる信頼性の確保 | |||
<br> | |||
<syntaxhighlight lang="c++"> | |||
// Mqttworker.h | |||
#include <QObject> | |||
#include <QtMqtt/QMqttClient> | |||
#include <QThread> | |||
#include <QQueue> | |||
#include <QMutex> | |||
/** | |||
* @brief MQTTの通信処理を担当するワーカークラス | |||
* | |||
* このクラスは別スレッドで動作して、MQTT通信に関する以下に示す機能を提供する | |||
* - MQTTブローカーへの接続 / 切断 | |||
* - メッセージの発行 (パブリッシュ) | |||
* - トピックの購読(サブスクライブ) | |||
* - メッセージの受信と通知 | |||
* | |||
* スレッドセーフな設計となっており、メッセージキューイング機能も備えている | |||
*/ | |||
class MqttWorker : public QObject | |||
{ | |||
Q_OBJECT | |||
private: | |||
QMqttClient *m_client; // MQTTクライアントインスタンス | |||
QQueue<QPair<QString, QByteArray>> m_messageQueue; // 未送信メッセージのキュー | |||
QMutex m_mutex; // スレッド同期用ミューテックス | |||
bool m_isConnected; // 現在の接続状態 | |||
public: | |||
/** | |||
* @brief コンストラクタ | |||
* @param parent 親オブジェクト(デフォルトはnullptr) | |||
*/ | |||
explicit MqttWorker(QObject *parent = nullptr) : QObject(parent), m_client(new QMqttClient(this)), m_isConnected(false) | |||
{ | |||
// MQTTクライアントのシグナルとワーカーのスロットを接続 | |||
connect(m_client, &QMqttClient::messageReceived, this, &MqttWorker::handleMessage); | |||
connect(m_client, &QMqttClient::stateChanged, this, &MqttWorker::handleStateChange); | |||
connect(m_client, &QMqttClient::errorChanged, this, &MqttWorker::handleError); | |||
} | |||
/** | |||
* @brief デストラクタ | |||
* 接続中の場合は切断処理を行います | |||
*/ | |||
~MqttWorker() | |||
{ | |||
if (m_client->state() == QMqttClient::Connected) { | |||
m_client->disconnectFromHost(); | |||
} | |||
} | |||
public slots: | |||
/** | |||
* @brief MQTTブローカーへの接続を開始 | |||
* @param host ブローカーのホスト名またはIPアドレス | |||
* @param port ブローカーのポート番号 | |||
* | |||
* このメソッドは非同期で実行され、接続状態の変更はconnectionStateChangedシグナルで通知される | |||
*/ | |||
void connectToHost(const QString &host, quint16 port) | |||
{ | |||
qDebug() << "ワーカースレッド開始: " << QThread::currentThread(); | |||
m_client->setHostname(host); | |||
m_client->setPort(port); | |||
m_client->connectToHost(); | |||
} | |||
/** | |||
* @brief 指定されたトピックにメッセージを発行 | |||
* @param topic 発行先のトピック | |||
* @param payload 送信するメッセージ内容 | |||
* | |||
* 未接続時はメッセージをキューに保存し、接続時に自動的に送信する | |||
* QoS 1を使用して、メッセージの到達を保証する | |||
*/ | |||
void publishMessage(const QString &topic, const QByteArray &payload) | |||
{ | |||
QMutexLocker locker(&m_mutex); | |||
if (!m_isConnected) { | |||
// 未接続時はメッセージをキューに保存 | |||
m_messageQueue.enqueue(qMakePair(topic, payload)); | |||
qDebug() << "メッセージをキューに保存: " << topic; | |||
return; | |||
} | |||
auto publish = m_client->publish(topic, payload, 1); | |||
if (!publish) { | |||
emit errorOccurred("メッセージの発行に失敗しました: " + topic); | |||
qWarning() << "メッセージ発行失敗: " << topic; | |||
} | |||
else { | |||
qDebug() << "メッセージを発行:" << topic; | |||
} | |||
} | |||
/** | |||
* @brief 指定されたトピックを購読 | |||
* @param topic 購読するトピック名 | |||
* | |||
* 接続済みの場合のみ購読を開始する | |||
* QoS 1を使用して、メッセージの到達を保証する | |||
*/ | |||
void subscribe(const QString &topic) | |||
{ | |||
if (!m_isConnected) { | |||
emit errorOccurred("購読できません - 接続されていません"); | |||
qWarning() << "購読失敗 - 未接続:" << topic; | |||
return; | |||
} | |||
auto subscription = m_client->subscribe(topic, 1); | |||
if (!subscription) { | |||
emit errorOccurred("トピックの購読に失敗しました: " + topic); | |||
qWarning() << "購読失敗:" << topic; | |||
} | |||
else { | |||
qDebug() << "トピックを購読開始:" << topic; | |||
} | |||
} | |||
signals: | |||
/** | |||
* @brief メッセージ受信時に発行されるシグナル | |||
* @param topic 受信したメッセージのトピック | |||
* @param message 受信したメッセージの内容 | |||
*/ | |||
void messageReceived(const QString &topic, const QByteArray &message); | |||
/** | |||
* @brief 接続状態が変化した時に発行されるシグナル | |||
* @param connected 接続状態 (true: 接続済み, false: 未接続) | |||
*/ | |||
void connectionStateChanged(bool connected); | |||
/** | |||
* @brief エラー発生時に発行されるシグナル | |||
* @param error エラーメッセージ | |||
*/ | |||
void errorOccurred(const QString &error); | |||
private slots: | |||
/** | |||
* @brief メッセージ受信時の処理 | |||
* @param message 受信したメッセージ | |||
* @param topic 受信したトピック | |||
*/ | |||
void handleMessage(const QByteArray &message, const QMqttTopicName &topic) { | |||
emit messageReceived(topic.name(), message); | |||
qDebug() << "メッセージを受信:" << topic.name() << message; | |||
} | |||
/** | |||
* @brief 接続状態変更時の処理 | |||
* | |||
* 接続完了時にキューに保存されたメッセージの送信を試みます | |||
*/ | |||
void handleStateChange() | |||
{ | |||
m_isConnected = (m_client->state() == QMqttClient::Connected); | |||
emit connectionStateChanged(m_isConnected); | |||
if (m_isConnected) { | |||
qDebug() << "MQTT接続完了"; | |||
// 接続時にキューのメッセージを処理 | |||
QMutexLocker locker(&m_mutex); | |||
while (!m_messageQueue.isEmpty()) { | |||
auto message = m_messageQueue.dequeue(); | |||
publishMessage(message.first, message.second); | |||
} | |||
} | |||
else { | |||
qDebug() << "MQTT切断"; | |||
} | |||
} | |||
/** | |||
* @brief エラー発生時の処理 | |||
* @param error 発生したエラーの種類 | |||
* | |||
* エラーの種類に応じて適切なメッセージを生成し通知します | |||
*/ | |||
void handleError(QMqttClient::ClientError error) | |||
{ | |||
QString errorMessage; | |||
switch (error) { | |||
case QMqttClient::NoError: | |||
return; | |||
case QMqttClient::InvalidProtocolVersion: | |||
errorMessage = "無効なプロトコルバージョン"; | |||
break; | |||
case QMqttClient::IdRejected: | |||
errorMessage = "クライアントID拒否"; | |||
break; | |||
case QMqttClient::ServerUnavailable: | |||
errorMessage = "サーバー利用不可"; | |||
break; | |||
case QMqttClient::BadUsernameOrPassword: | |||
errorMessage = "認証エラー"; | |||
break; | |||
case QMqttClient::NotAuthorized: | |||
errorMessage = "認可エラー"; | |||
break; | |||
case QMqttClient::TransportInvalid: | |||
errorMessage = "トランスポートエラー"; | |||
break; | |||
case QMqttClient::ProtocolViolation: | |||
errorMessage = "プロトコル違反"; | |||
break; | |||
case QMqttClient::UnknownError: | |||
errorMessage = "不明なエラー"; | |||
break; | |||
default: | |||
errorMessage = "予期せぬエラー"; | |||
break; | |||
} | |||
emit errorOccurred(errorMessage); | |||
qCritical() << "MQTTエラー:" << errorMessage; | |||
} | |||
}; | |||
</syntaxhighlight> | |||
<br> | |||
<syntaxhighlight lang="c++"> | |||
// Mqttmanager.hファイル | |||
#include <QObject> | |||
#include <QThread> | |||
#include "Mqttworker.h" | |||
/** | |||
* @brief MQTT通信を管理するマネージャークラス | |||
* | |||
* このクラスは以下に示す機能を提供する | |||
* - MQTTワーカーの生成と管理 | |||
* - ワーカースレッドの制御 | |||
* - MQTT通信操作のインターフェース提供 | |||
* | |||
* アプリケーションはこのクラスを通じてMQTT通信を利用する | |||
* 全ての操作は非同期で実行されて、メインスレッドをブロックしない | |||
*/ | |||
class MqttManager : public QObject | |||
{ | |||
Q_OBJECT | |||
private: | |||
QThread m_workerThread; // ワーカー用スレッド | |||
MqttWorker *m_worker; // MQTTワーカーインスタンス | |||
public: | |||
/** | |||
* @brief コンストラクタ | |||
* @param parent 親オブジェクト(デフォルトはnullptr) | |||
* | |||
* ワーカーオブジェクトを生成して、専用スレッドで実行を開始する | |||
* 必要なシグナル / スロット接続も確立する | |||
*/ | |||
explicit MqttManager(QObject *parent = nullptr) : QObject(parent), m_worker(new MqttWorker) | |||
{ | |||
// ワーカーを別スレッドに移動 | |||
m_worker->moveToThread(&m_workerThread); | |||
// マネージャーからワーカーへのシグナル接続 | |||
connect(this, &MqttManager::connectRequested, m_worker, &MqttWorker::connectToHost); | |||
connect(this, &MqttManager::publishRequested, m_worker, &MqttWorker::publishMessage); | |||
connect(this, &MqttManager::subscribeRequested, m_worker, &MqttWorker::subscribe); | |||
// ワーカーからマネージャーへのシグナル接続 | |||
connect(m_worker, &MqttWorker::messageReceived, this, &MqttManager::messageReceived); | |||
connect(m_worker, &MqttWorker::connectionStateChanged, this, &MqttManager::connectionStateChanged); | |||
connect(m_worker, &MqttWorker::errorOccurred, this, &MqttManager::errorOccurred); | |||
// スレッド終了時のクリーンアップ設定 | |||
connect(&m_workerThread, &QThread::finished, m_worker, &MqttWorker::deleteLater); | |||
// ワーカースレッドを開始 | |||
m_workerThread.start(); | |||
qDebug() << "MQTTマネージャー初期化完了"; | |||
} | |||
/** | |||
* @brief デストラクタ | |||
* | |||
* ワーカースレッドを適切に終了して、リソースを解放する | |||
*/ | |||
~MqttManager() | |||
{ | |||
m_workerThread.quit(); | |||
m_workerThread.wait(); | |||
qDebug() << "MQTTマネージャー終了"; | |||
} | |||
/** | |||
* @brief MQTTブローカーへの接続を要求 | |||
* @param host ブローカーのホスト名またはIPアドレス | |||
* @param port ブローカーのポート番号 | |||
*/ | |||
void connect(const QString &host, quint16 port) | |||
{ | |||
emit connectRequested(host, port); | |||
qDebug() << "接続要求:" << host << port; | |||
} | |||
/** | |||
* @brief メッセージの発行を要求 | |||
* @param topic 発行先のトピック | |||
* @param payload 送信するメッセージ内容 | |||
*/ | |||
void publish(const QString &topic, const QByteArray &payload) | |||
{ | |||
emit publishRequested(topic, payload); | |||
qDebug() << "発行要求:" << topic; | |||
} | |||
/** | |||
* @brief トピックの購読を要求 | |||
* @param topic 購読するトピック名 | |||
*/ | |||
void subscribe(const QString &topic) | |||
{ | |||
emit subscribeRequested(topic); | |||
qDebug() << "購読要求:" << topic; | |||
} | |||
signals: | |||
// ワーカーへの要求シグナル | |||
void connectRequested(const QString &host, quint16 port); // 接続要求シグナル | |||
void publishRequested(const QString &topic, const QByteArray &payload); // 発行要求シグナル | |||
void subscribeRequested(const QString &topic); // 購読要求シグナル | |||
// アプリケーションへの通知シグナル | |||
void messageReceived(const QString &topic, const QByteArray &message); // メッセージ受信通知 | |||
void connectionStateChanged(bool connected); // 接続状態変更通知 | |||
void errorOccurred(const QString &error); // エラー発生通知 | |||
}; | |||
</syntaxhighlight> | |||
<br> | |||
<syntaxhighlight lang="c++"> | |||
// 使用例 : main.cppファイル | |||
#include <QCoreApplication> | |||
#include "Mqttmanager.h" | |||
int main(int argc, char *argv[]) | |||
{ | |||
QCoreApplication a(argc, argv); | |||
// MQTTマネージャーのインスタンスを生成 | |||
MqttManager manager; | |||
// 接続状態変更時の処理 | |||
QObject::connect(&manager, &MqttManager::connectionStateChanged, [](bool connected) { | |||
if (connected) { | |||
qDebug() << "MQTT接続完了"; | |||
} | |||
else { | |||
qDebug() << "MQTT切断"; | |||
} | |||
}); | |||
// メッセージ受信時の処理 | |||
QObject::connect(&manager, &MqttManager::messageReceived, [](const QString &topic, const QByteArray &message) { | |||
qDebug() << "メッセージ受信 - トピック: " << topic; | |||
qDebug() << "内容: " << message; | |||
}); | |||
// エラー発生時の処理 | |||
QObject::connect(&manager, &MqttManager::errorOccurred, [](const QString &error) { | |||
qCritical() << "MQTTエラー: " << error; | |||
}); | |||
// MQTTブローカーへの接続 | |||
manager.connect("<IPアドレスまたはホスト名 例: localhost>", <ポート番号 例: 1883>); | |||
// トピックの購読 | |||
manager.subscribe("qt/topic"); | |||
// メッセージの発行 | |||
QTimer::singleShot(1000, [&manager]() { | |||
manager.publish("qt/topic", "Hello、MQTT!"); | |||
}); | |||
return a.exec(); | return a.exec(); | ||