Qtの基礎 - MQTT
概要
QtでMQTT通信を実装する場合は、QtMqttモジュールを使用する。
このモジュールは、標準的なMQTTプロトコルの機能をカバーしており、プロジェクトファイルに簡単な設定を追加するだけで利用可能である。
中核となるQMqttClientクラスを使用することにより、MQTTブローカーへの接続、切断、メッセージの送受信等の基本的な操作が可能になる。
接続設定では、ホスト名、ポート番号、必要に応じて認証情報を指定する。
Qt特有のシグナル / スロットメカニズムを使用することで、接続状態の監視やメッセージの非同期処理を効率的に実装できる。
接続完了、切断、エラー発生等のイベントをリアルタイムで検知して、適切な処理を行うことができる。
パブリッシュ / サブスクライブ (メッセージの送受信) の実装も直感的である。
トピック名を指定してメッセージを発行、あるいは、特定のトピックをサブスクライブしてメッセージを受信することができる。
また、ワイルドカードを使用したトピックの購読にも対応している。
セキュリティ面においては、TLS/SSL通信やユーザ認証もサポートされている。
証明書の設定や認証情報の指定により、セキュアな通信を実現できる。
また、Last Will Testamentの設定やQoSレベルの指定といったMQTTプロトコルの高度な機能も利用可能である。
実装時において重要なことは、エラーハンドリングの適切な実装が挙げられる。
ネットワークの切断、再接続、タイムアウト等の状況に適切に対応することにより、安定したアプリケーションを構築することができる。
また、大量のメッセージを送受信する場合は、非同期処理を活用してUIのブロッキングを防ぐことが重要である。
MQTT通信
技術的な制約
- メモリ関連の制約
- QtのMQTTクライアントは動的メモリ管理を使用するため、大量のメッセージを処理する場合はメモリ使用量に注意が必要となる。
- 特に、大きなペイロードを持つメッセージを頻繁に送受信する場合、メモリリークを防ぐための適切な解放処理が重要である。
- ネットワーク関連の制約
- Qt MQTTはTCP/IP上で動作するため、ネットワークの状態に依存する。
- ファイアウォールや特定のポートがブロックされている環境では動作しない可能性がある。
- SSL/TLS接続を使用する場合、追加の設定と証明書の管理が必要である。
デバッグとトラブルシューティング
開発時は、MQTTクライアントツール (例: MQTT Explorer) を使用することにより、通信の様子を視覚的に確認できる。
また、シリアルモニタを活用して、接続状態やメッセージの送受信を確認する。
使用例
以下の例では、MQTT通信でトピックの送受信を行っている。
全てのMQTT操作 (接続、発行、購読) をシグナル/スロットを使用して非同期で実行している。
また、接続が失敗した場合は、自動的に再接続を行う。
送信 (MQTTパブリッシャー側)
// Publisher.h
#include <QObject>
#include <QtMqtt/QMqttClient>
#include <QTimer>
#include <QDebug>
class MqttPublisher : public QObject
{
Q_OBJECT
private:
QMqttClient *m_client;
QString m_host;
quint16 m_port;
public:
explicit MqttPublisher(QObject *parent = nullptr) : QObject(parent), m_client(new QMqttClient(this))
{
// MQTT接続状態変更時のシグナルをハンドラに接続
connect(m_client, &QMqttClient::stateChanged, this, &MqttPublisher::handleConnectionStateChange);
// エラー発生時のシグナルをハンドラに接続
connect(m_client, &QMqttClient::errorChanged, this, &MqttPublisher::handleError);
}
~MqttPublisher()
{
if (m_client->state() == QMqttClient::Connected) {
m_client->disconnect();
}
}
// MQTT接続の開始
void connectToHost(const QString &host, quint16 port)
{
m_host = host;
m_port = port;
// 接続パラメータを設定
m_client->setHostname(host);
m_client->setPort(port);
// 非同期で接続を開始
m_client->connectToHost();
}
// メッセージの発行
void publishMessage(const QString &topic, const QByteArray &message)
{
if (m_client->state() != QMqttClient::Connected) {
qWarning() << "MQTTクライアントが接続されていません";
return;
}
// QoS 1でメッセージを発行 (メッセージ到達保証あり)
auto publish = m_client->publish(topic, message, 1);
if (!publish) {
qWarning() << "メッセージの発行に失敗しました: " << topic;
return;
}
}
// 接続の切断
void disconnect()
{
if (m_client->state() == QMqttClient::Connected) {
m_client->disconnectFromHost();
}
}
private slots:
// MQTT接続状態が変化した時のハンドラ
void handleConnectionStateChange()
{
switch (m_client->state()) {
case QMqttClient::Connected:
qInfo() << "MQTTブローカーに接続しました";
break;
case QMqttClient::Disconnected:
qInfo() << "MQTTブローカーから切断されました";
break;
case QMqttClient::Connecting:
qInfo() << "MQTTブローカーに接続中...";
break;
default:
qWarning() << "不明な接続状態です: " << m_client->state();
break;
}
}
// エラー発生時のハンドラ
void handleError(QMqttClient::ClientError error)
{
QString errorMsg;
switch (error) {
case QMqttClient::NoError:
return;
case QMqttClient::InvalidProtocolVersion:
errorMsg = "無効なプロトコルバージョンです";
break;
case QMqttClient::IdRejected:
errorMsg = "クライアントIDが拒否されました";
break;
case QMqttClient::ServerUnavailable:
errorMsg = "サーバーが利用できません";
break;
case QMqttClient::BadUsernameOrPassword:
errorMsg = "ユーザー名またはパスワードが無効です";
break;
case QMqttClient::NotAuthorized:
errorMsg = "認証されていません";
break;
case QMqttClient::TransportInvalid:
errorMsg = "トランスポートが無効です";
break;
case QMqttClient::ProtocolViolation:
errorMsg = "プロトコル違反が発生しました";
break;
case QMqttClient::UnknownError:
errorMsg = "不明なエラーが発生しました";
break;
default:
errorMsg = "予期せぬエラーが発生しました";
break;
}
qCritical() << "MQTTエラー:" << errorMsg;
// エラー発生時は再接続を試みる
if (m_client->state() != QMqttClient::Connected) {
QTimer::singleShot(5000, [this]() {
qInfo() << "再接続を試みます...";
this->connectToHost(m_host, m_port);
});
}
}
};
// Publisher側の使用例
#include "Publisher.h"
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
MqttPublisher publisher;
publisher.connectToHost("<IPアドレスまたはホスト名 例: localhost>", <MQTT通信するポート番号 例: 1883>);
// 定期的にメッセージを送信
QTimer timer;
QObject::connect(&timer, &QTimer::timeout, [&publisher]() {
publisher.publishMessage("qt/topic", "Hello, MQTT!");
});
timer.start(1000); // 1秒おきに送信
return a.exec();
}
受信 (MQTTサブスクライバ側)
以下の例では、受信したトピックの購読 (サブスクライブ) している。
// Subscriber.hファイル
#include <QObject>
#include <QtMqtt/QMqttClient>
#include <QtMqtt/QMqttSubscription>
#include <QTimer>
#include <QDebug>
class MqttSubscriber : public QObject
{
Q_OBJECT
private:
QMqttClient *m_client;
QString m_host;
quint16 m_port;
QMap<QString, QMqttSubscription*> m_subscriptions;
public:
explicit MqttSubscriber(QObject *parent = nullptr) : QObject(parent), m_client(new QMqttClient(this))
{
// MQTT接続状態変更時のシグナルをハンドラに接続
connect(m_client, &QMqttClient::stateChanged, this, &MqttSubscriber::handleConnectionStateChange);
// エラー発生時のシグナルをハンドラに接続
connect(m_client, &QMqttClient::errorChanged, this, &MqttSubscriber::handleError);
}
~MqttSubscriber()
{
// 全ての購読を解除
for (auto subscription : m_subscriptions) {
subscription->unsubscribe();
delete subscription;
}
m_subscriptions.clear();
if (m_client->state() == QMqttClient::Connected) {
m_client->disconnect();
}
}
// MQTT接続を開始する
void connectToHost(const QString &host, quint16 port)
{
m_host = host;
m_port = port;
// 接続パラメータを設定
m_client->setHostname(host);
m_client->setPort(port);
// 非同期で接続を開始
m_client->connectToHost();
}
// トピックを購読する
void subscribe(const QString &topic)
{
if (m_client->state() != QMqttClient::Connected) {
qWarning() << "MQTTクライアントが接続されていません";
return;
}
// 既に購読済みのトピックは無視
if (m_subscriptions.contains(topic)) {
qInfo() << "トピックは既に購読されています:" << topic;
return;
}
// QoS 1でトピックを購読 (メッセージ到達保証あり)
auto subscription = m_client->subscribe(topic, 1);
if (!subscription) {
qWarning() << "トピックの購読に失敗しました:" << topic;
return;
}
// メッセージ受信時のハンドラを設定
connect(subscription, &QMqttSubscription::messageReceived, this, &MqttSubscriber::handleMessage);
// 購読リストに追加
m_subscriptions.insert(topic, subscription);
qInfo() << "トピックを購読開始しました:" << topic;
}
// 接続を切断する
void disconnect()
{
if (m_client->state() == QMqttClient::Connected) {
m_client->disconnectFromHost();
}
}
signals:
// メッセージ受信時のシグナル
void messageReceived(const QString &topic, const QByteArray &message);
private slots:
// MQTT接続状態が変化した時のハンドラ
void handleConnectionStateChange()
{
switch (m_client->state()) {
case QMqttClient::Connected:
qInfo() << "MQTTブローカーに接続しました";
break;
case QMqttClient::Disconnected:
qInfo() << "MQTTブローカーから切断されました";
break;
case QMqttClient::Connecting:
qInfo() << "MQTTブローカーに接続中...";
break;
default:
qWarning() << "不明な接続状態です: " << m_client->state();
break;
}
}
// エラー発生時のハンドラ
void handleError(QMqttClient::ClientError error)
{
QString errorMsg;
switch (error) {
case QMqttClient::NoError:
return;
case QMqttClient::InvalidProtocolVersion:
errorMsg = "無効なプロトコルバージョンです";
break;
case QMqttClient::IdRejected:
errorMsg = "クライアントIDが拒否されました";
break;
case QMqttClient::ServerUnavailable:
errorMsg = "サーバが利用できません";
break;
case QMqttClient::BadUsernameOrPassword:
errorMsg = "ユーザ名またはパスワードが無効です";
break;
case QMqttClient::NotAuthorized:
errorMsg = "認証されていません";
break;
case QMqttClient::TransportInvalid:
errorMsg = "トランスポートが無効です";
break;
case QMqttClient::ProtocolViolation:
errorMsg = "プロトコル違反が発生しました";
break;
case QMqttClient::UnknownError:
errorMsg = "不明なエラーが発生しました";
break;
default:
errorMsg = "予期せぬエラーが発生しました";
break;
}
qCritical() << "MQTTエラー:" << errorMsg;
// エラー発生時は再接続を試みる
if (m_client->state() != QMqttClient::Connected) {
QTimer::singleShot(5000, [this]() {
qInfo() << "再接続を試みます...";
this->connectToHost(m_host, m_port);
});
}
}
// メッセージ受信時のハンドラ
void handleMessage(const QByteArray &message, const QMqttTopicName &topic)
{
// メッセージを受信したことをログに記録
qInfo() << "メッセージを受信しました - トピック: " << topic.name();
qInfo() << "メッセージ: " << message;
// シグナルを発行して、アプリケーションの他の部分に通知
emit messageReceived(topic.name(), message);
}
};
// Subscriber側の使用例
#include "Subscriber.h"
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
MqttSubscriber subscriber;
subscriber.connectToHost("localhost", 1883);
// メッセージ受信時の処理
QObject::connect(&subscriber, &MqttSubscriber::messageReceived, [](const QString &topic, const QByteArray &message) {
qDebug() << "受信 - トピック: " << topic;
qDebug() << "メッセージ: " << message;
});
subscriber.subscribe("qt/topic");
return a.exec();
}
送受信 (MQTTパブリッシャー / MQTTサブスクライバ)
以下の例では、トピックの送受信を行っている。
- クラス構造
- MqttWorker
- MQTT通信の実際の処理を担当
- MqttManager
- アプリケーションとワーカー間のインターフェース
- MqttWorker
- スレッド管理
- MQTTの処理を専用スレッドで実行
- シグナル / スロットによる安全な通信
- エラーハンドリング
- 全てのエラーはerrorOccurredシグナルで通知
- 接続問題は自動的に処理
- 包括的なエラー状態の検出と通知
- デバッグ情報の出力
- 非同期処理
- 全ての操作が非ブロッキング
- メッセージキューイングによる信頼性の確保
// Mqttworker.h
#include <QObject>
#include <QtMqtt/QMqttClient>
#include <QThread>
#include <QQueue>
#include <QMutex>
/**
* @brief MQTTの通信処理を担当するワーカークラス
*
* このクラスは別スレッドで動作して、MQTT通信に関する以下に示す機能を提供する
* - MQTTブローカーへの接続 / 切断
* - メッセージの発行 (パブリッシュ)
* - トピックの購読(サブスクライブ)
* - メッセージの受信と通知
*
* スレッドセーフな設計となっており、メッセージキューイング機能も備えている
*/
class MqttWorker : public QObject
{
Q_OBJECT
private:
QMqttClient *m_client; // MQTTクライアントインスタンス
QQueue<QPair<QString, QByteArray>> m_messageQueue; // 未送信メッセージのキュー
QMutex m_mutex; // スレッド同期用ミューテックス
bool m_isConnected; // 現在の接続状態
public:
/**
* @brief コンストラクタ
* @param parent 親オブジェクト(デフォルトはnullptr)
*/
explicit MqttWorker(QObject *parent = nullptr) : QObject(parent), m_client(new QMqttClient(this)), m_isConnected(false)
{
// MQTTクライアントのシグナルとワーカーのスロットを接続
connect(m_client, &QMqttClient::messageReceived, this, &MqttWorker::handleMessage);
connect(m_client, &QMqttClient::stateChanged, this, &MqttWorker::handleStateChange);
connect(m_client, &QMqttClient::errorChanged, this, &MqttWorker::handleError);
}
/**
* @brief デストラクタ
* 接続中の場合は切断処理を行います
*/
~MqttWorker()
{
if (m_client->state() == QMqttClient::Connected) {
m_client->disconnectFromHost();
}
}
public slots:
/**
* @brief MQTTブローカーへの接続を開始
* @param host ブローカーのホスト名またはIPアドレス
* @param port ブローカーのポート番号
*
* このメソッドは非同期で実行され、接続状態の変更はconnectionStateChangedシグナルで通知される
*/
void connectToHost(const QString &host, quint16 port)
{
qDebug() << "ワーカースレッド開始: " << QThread::currentThread();
m_client->setHostname(host);
m_client->setPort(port);
m_client->connectToHost();
}
/**
* @brief 指定されたトピックにメッセージを発行
* @param topic 発行先のトピック
* @param payload 送信するメッセージ内容
*
* 未接続時はメッセージをキューに保存し、接続時に自動的に送信する
* QoS 1を使用して、メッセージの到達を保証する
*/
void publishMessage(const QString &topic, const QByteArray &payload)
{
QMutexLocker locker(&m_mutex);
if (!m_isConnected) {
// 未接続時はメッセージをキューに保存
m_messageQueue.enqueue(qMakePair(topic, payload));
qDebug() << "メッセージをキューに保存: " << topic;
return;
}
auto publish = m_client->publish(topic, payload, 1);
if (!publish) {
emit errorOccurred("メッセージの発行に失敗しました: " + topic);
qWarning() << "メッセージ発行失敗: " << topic;
}
else {
qDebug() << "メッセージを発行:" << topic;
}
}
/**
* @brief 指定されたトピックを購読
* @param topic 購読するトピック名
*
* 接続済みの場合のみ購読を開始する
* QoS 1を使用して、メッセージの到達を保証する
*/
void subscribe(const QString &topic)
{
if (!m_isConnected) {
emit errorOccurred("購読できません - 接続されていません");
qWarning() << "購読失敗 - 未接続:" << topic;
return;
}
auto subscription = m_client->subscribe(topic, 1);
if (!subscription) {
emit errorOccurred("トピックの購読に失敗しました: " + topic);
qWarning() << "購読失敗:" << topic;
}
else {
qDebug() << "トピックを購読開始:" << topic;
}
}
signals:
/**
* @brief メッセージ受信時に発行されるシグナル
* @param topic 受信したメッセージのトピック
* @param message 受信したメッセージの内容
*/
void messageReceived(const QString &topic, const QByteArray &message);
/**
* @brief 接続状態が変化した時に発行されるシグナル
* @param connected 接続状態 (true: 接続済み, false: 未接続)
*/
void connectionStateChanged(bool connected);
/**
* @brief エラー発生時に発行されるシグナル
* @param error エラーメッセージ
*/
void errorOccurred(const QString &error);
private slots:
/**
* @brief メッセージ受信時の処理
* @param message 受信したメッセージ
* @param topic 受信したトピック
*/
void handleMessage(const QByteArray &message, const QMqttTopicName &topic) {
emit messageReceived(topic.name(), message);
qDebug() << "メッセージを受信:" << topic.name() << message;
}
/**
* @brief 接続状態変更時の処理
*
* 接続完了時にキューに保存されたメッセージの送信を試みます
*/
void handleStateChange()
{
m_isConnected = (m_client->state() == QMqttClient::Connected);
emit connectionStateChanged(m_isConnected);
if (m_isConnected) {
qDebug() << "MQTT接続完了";
// 接続時にキューのメッセージを処理
QMutexLocker locker(&m_mutex);
while (!m_messageQueue.isEmpty()) {
auto message = m_messageQueue.dequeue();
publishMessage(message.first, message.second);
}
}
else {
qDebug() << "MQTT切断";
}
}
/**
* @brief エラー発生時の処理
* @param error 発生したエラーの種類
*
* エラーの種類に応じて適切なメッセージを生成し通知します
*/
void handleError(QMqttClient::ClientError error)
{
QString errorMessage;
switch (error) {
case QMqttClient::NoError:
return;
case QMqttClient::InvalidProtocolVersion:
errorMessage = "無効なプロトコルバージョン";
break;
case QMqttClient::IdRejected:
errorMessage = "クライアントID拒否";
break;
case QMqttClient::ServerUnavailable:
errorMessage = "サーバー利用不可";
break;
case QMqttClient::BadUsernameOrPassword:
errorMessage = "認証エラー";
break;
case QMqttClient::NotAuthorized:
errorMessage = "認可エラー";
break;
case QMqttClient::TransportInvalid:
errorMessage = "トランスポートエラー";
break;
case QMqttClient::ProtocolViolation:
errorMessage = "プロトコル違反";
break;
case QMqttClient::UnknownError:
errorMessage = "不明なエラー";
break;
default:
errorMessage = "予期せぬエラー";
break;
}
emit errorOccurred(errorMessage);
qCritical() << "MQTTエラー:" << errorMessage;
}
};
// Mqttmanager.hファイル
#include <QObject>
#include <QThread>
#include "Mqttworker.h"
/**
* @brief MQTT通信を管理するマネージャークラス
*
* このクラスは以下に示す機能を提供する
* - MQTTワーカーの生成と管理
* - ワーカースレッドの制御
* - MQTT通信操作のインターフェース提供
*
* アプリケーションはこのクラスを通じてMQTT通信を利用する
* 全ての操作は非同期で実行されて、メインスレッドをブロックしない
*/
class MqttManager : public QObject
{
Q_OBJECT
private:
QThread m_workerThread; // ワーカー用スレッド
MqttWorker *m_worker; // MQTTワーカーインスタンス
public:
/**
* @brief コンストラクタ
* @param parent 親オブジェクト(デフォルトはnullptr)
*
* ワーカーオブジェクトを生成して、専用スレッドで実行を開始する
* 必要なシグナル / スロット接続も確立する
*/
explicit MqttManager(QObject *parent = nullptr) : QObject(parent), m_worker(new MqttWorker)
{
// ワーカーを別スレッドに移動
m_worker->moveToThread(&m_workerThread);
// マネージャーからワーカーへのシグナル接続
connect(this, &MqttManager::connectRequested, m_worker, &MqttWorker::connectToHost);
connect(this, &MqttManager::publishRequested, m_worker, &MqttWorker::publishMessage);
connect(this, &MqttManager::subscribeRequested, m_worker, &MqttWorker::subscribe);
// ワーカーからマネージャーへのシグナル接続
connect(m_worker, &MqttWorker::messageReceived, this, &MqttManager::messageReceived);
connect(m_worker, &MqttWorker::connectionStateChanged, this, &MqttManager::connectionStateChanged);
connect(m_worker, &MqttWorker::errorOccurred, this, &MqttManager::errorOccurred);
// スレッド終了時のクリーンアップ設定
connect(&m_workerThread, &QThread::finished, m_worker, &MqttWorker::deleteLater);
// ワーカースレッドを開始
m_workerThread.start();
qDebug() << "MQTTマネージャー初期化完了";
}
/**
* @brief デストラクタ
*
* ワーカースレッドを適切に終了して、リソースを解放する
*/
~MqttManager()
{
m_workerThread.quit();
m_workerThread.wait();
qDebug() << "MQTTマネージャー終了";
}
/**
* @brief MQTTブローカーへの接続を要求
* @param host ブローカーのホスト名またはIPアドレス
* @param port ブローカーのポート番号
*/
void connect(const QString &host, quint16 port)
{
emit connectRequested(host, port);
qDebug() << "接続要求:" << host << port;
}
/**
* @brief メッセージの発行を要求
* @param topic 発行先のトピック
* @param payload 送信するメッセージ内容
*/
void publish(const QString &topic, const QByteArray &payload)
{
emit publishRequested(topic, payload);
qDebug() << "発行要求:" << topic;
}
/**
* @brief トピックの購読を要求
* @param topic 購読するトピック名
*/
void subscribe(const QString &topic)
{
emit subscribeRequested(topic);
qDebug() << "購読要求:" << topic;
}
signals:
// ワーカーへの要求シグナル
void connectRequested(const QString &host, quint16 port); // 接続要求シグナル
void publishRequested(const QString &topic, const QByteArray &payload); // 発行要求シグナル
void subscribeRequested(const QString &topic); // 購読要求シグナル
// アプリケーションへの通知シグナル
void messageReceived(const QString &topic, const QByteArray &message); // メッセージ受信通知
void connectionStateChanged(bool connected); // 接続状態変更通知
void errorOccurred(const QString &error); // エラー発生通知
};
// 使用例 : main.cppファイル
#include <QCoreApplication>
#include "Mqttmanager.h"
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
// MQTTマネージャーのインスタンスを生成
MqttManager manager;
// 接続状態変更時の処理
QObject::connect(&manager, &MqttManager::connectionStateChanged, [](bool connected) {
if (connected) {
qDebug() << "MQTT接続完了";
}
else {
qDebug() << "MQTT切断";
}
});
// メッセージ受信時の処理
QObject::connect(&manager, &MqttManager::messageReceived, [](const QString &topic, const QByteArray &message) {
qDebug() << "メッセージ受信 - トピック: " << topic;
qDebug() << "内容: " << message;
});
// エラー発生時の処理
QObject::connect(&manager, &MqttManager::errorOccurred, [](const QString &error) {
qCritical() << "MQTTエラー: " << error;
});
// MQTTブローカーへの接続
manager.connect("<IPアドレスまたはホスト名 例: localhost>", <ポート番号 例: 1883>);
// トピックの購読
manager.subscribe("qt/topic");
// メッセージの発行
QTimer::singleShot(1000, [&manager]() {
manager.publish("qt/topic", "Hello、MQTT!");
});
return a.exec();
}