概要
MQTT通信
技術的な制約
- メモリ関連の制約
- QtのMQTTクライアントは動的メモリ管理を使用するため、大量のメッセージを処理する場合はメモリ使用量に注意が必要となる。
- 特に、大きなペイロードを持つメッセージを頻繁に送受信する場合、メモリリークを防ぐための適切な解放処理が重要である。
- ネットワーク関連の制約
- Qt MQTTはTCP/IP上で動作するため、ネットワークの状態に依存する。
- ファイアウォールや特定のポートがブロックされている環境では動作しない可能性がある。
- SSL/TLS接続を使用する場合、追加の設定と証明書の管理が必要である。
デバッグとトラブルシューティング
開発時は、MQTTクライアントツール (例: MQTT Explorer) を使用することにより、通信の様子を視覚的に確認できる。
また、シリアルモニタを活用して、接続状態やメッセージの送受信を確認する。
使用例
以下の例では、MQTT通信でトピックの送受信を行っている。
全てのMQTT操作 (接続、発行、購読) をシグナル/スロットを使用して非同期で実行している。
また、接続が失敗した場合は、自動的に再接続を行う。
送信 (MQTTパブリッシャー側)
// Publisher.h
#include <QObject>
#include <QtMqtt/QMqttClient>
#include <QTimer>
#include <QDebug>
class MqttPublisher : public QObject
{
Q_OBJECT
private:
QMqttClient *m_client;
QString m_host;
quint16 m_port;
public:
explicit MqttPublisher(QObject *parent = nullptr) : QObject(parent), m_client(new QMqttClient(this))
{
// MQTT接続状態変更時のシグナルをハンドラに接続
connect(m_client, &QMqttClient::stateChanged, this, &MqttPublisher::handleConnectionStateChange);
// エラー発生時のシグナルをハンドラに接続
connect(m_client, &QMqttClient::errorChanged, this, &MqttPublisher::handleError);
}
~MqttPublisher()
{
if (m_client->state() == QMqttClient::Connected) {
m_client->disconnect();
}
}
// MQTT接続の開始
void connectToHost(const QString &host, quint16 port)
{
m_host = host;
m_port = port;
// 接続パラメータを設定
m_client->setHostname(host);
m_client->setPort(port);
// 非同期で接続を開始
m_client->connectToHost();
}
// メッセージの発行
void publishMessage(const QString &topic, const QByteArray &message)
{
if (m_client->state() != QMqttClient::Connected) {
qWarning() << "MQTTクライアントが接続されていません";
return;
}
// QoS 1でメッセージを発行 (メッセージ到達保証あり)
auto publish = m_client->publish(topic, message, 1);
if (!publish) {
qWarning() << "メッセージの発行に失敗しました: " << topic;
return;
}
}
// 接続の切断
void disconnect()
{
if (m_client->state() == QMqttClient::Connected) {
m_client->disconnectFromHost();
}
}
private slots:
// MQTT接続状態が変化した時のハンドラ
void handleConnectionStateChange()
{
switch (m_client->state()) {
case QMqttClient::Connected:
qInfo() << "MQTTブローカーに接続しました";
break;
case QMqttClient::Disconnected:
qInfo() << "MQTTブローカーから切断されました";
break;
case QMqttClient::Connecting:
qInfo() << "MQTTブローカーに接続中...";
break;
default:
qWarning() << "不明な接続状態です: " << m_client->state();
break;
}
}
// エラー発生時のハンドラ
void handleError(QMqttClient::ClientError error)
{
QString errorMsg;
switch (error) {
case QMqttClient::NoError:
return;
case QMqttClient::InvalidProtocolVersion:
errorMsg = "無効なプロトコルバージョンです";
break;
case QMqttClient::IdRejected:
errorMsg = "クライアントIDが拒否されました";
break;
case QMqttClient::ServerUnavailable:
errorMsg = "サーバーが利用できません";
break;
case QMqttClient::BadUsernameOrPassword:
errorMsg = "ユーザー名またはパスワードが無効です";
break;
case QMqttClient::NotAuthorized:
errorMsg = "認証されていません";
break;
case QMqttClient::TransportInvalid:
errorMsg = "トランスポートが無効です";
break;
case QMqttClient::ProtocolViolation:
errorMsg = "プロトコル違反が発生しました";
break;
case QMqttClient::UnknownError:
errorMsg = "不明なエラーが発生しました";
break;
default:
errorMsg = "予期せぬエラーが発生しました";
break;
}
qCritical() << "MQTTエラー:" << errorMsg;
// エラー発生時は再接続を試みる
if (m_client->state() != QMqttClient::Connected) {
QTimer::singleShot(5000, [this]() {
qInfo() << "再接続を試みます...";
this->connectToHost(m_host, m_port);
});
}
}
};
// Publisher側の使用例
#include "Publisher.h"
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
MqttPublisher publisher;
publisher.connectToHost("<IPアドレスまたはホスト名 例: localhost>", <MQTT通信するポート番号 例: 1883>);
// 定期的にメッセージを送信
QTimer timer;
QObject::connect(&timer, &QTimer::timeout, [&publisher]() {
publisher.publishMessage("qt/topic", "Hello, MQTT!");
});
timer.start(1000); // 1秒おきに送信
return a.exec();
}
受信 (MQTTサブスクライバ側)
以下の例では、受信したトピックの購読 (サブスクライブ) している。
// Subscriber.hファイル
#include <QObject>
#include <QtMqtt/QMqttClient>
#include <QtMqtt/QMqttSubscription>
#include <QTimer>
#include <QDebug>
class MqttSubscriber : public QObject
{
Q_OBJECT
private:
QMqttClient *m_client;
QString m_host;
quint16 m_port;
QMap<QString, QMqttSubscription*> m_subscriptions;
public:
explicit MqttSubscriber(QObject *parent = nullptr) : QObject(parent), m_client(new QMqttClient(this))
{
// MQTT接続状態変更時のシグナルをハンドラに接続
connect(m_client, &QMqttClient::stateChanged, this, &MqttSubscriber::handleConnectionStateChange);
// エラー発生時のシグナルをハンドラに接続
connect(m_client, &QMqttClient::errorChanged, this, &MqttSubscriber::handleError);
}
~MqttSubscriber()
{
// 全ての購読を解除
for (auto subscription : m_subscriptions) {
subscription->unsubscribe();
delete subscription;
}
m_subscriptions.clear();
if (m_client->state() == QMqttClient::Connected) {
m_client->disconnect();
}
}
// MQTT接続を開始する
void connectToHost(const QString &host, quint16 port)
{
m_host = host;
m_port = port;
// 接続パラメータを設定
m_client->setHostname(host);
m_client->setPort(port);
// 非同期で接続を開始
m_client->connectToHost();
}
// トピックを購読する
void subscribe(const QString &topic)
{
if (m_client->state() != QMqttClient::Connected) {
qWarning() << "MQTTクライアントが接続されていません";
return;
}
// 既に購読済みのトピックは無視
if (m_subscriptions.contains(topic)) {
qInfo() << "トピックは既に購読されています:" << topic;
return;
}
// QoS 1でトピックを購読 (メッセージ到達保証あり)
auto subscription = m_client->subscribe(topic, 1);
if (!subscription) {
qWarning() << "トピックの購読に失敗しました:" << topic;
return;
}
// メッセージ受信時のハンドラを設定
connect(subscription, &QMqttSubscription::messageReceived, this, &MqttSubscriber::handleMessage);
// 購読リストに追加
m_subscriptions.insert(topic, subscription);
qInfo() << "トピックを購読開始しました:" << topic;
}
// 接続を切断する
void disconnect()
{
if (m_client->state() == QMqttClient::Connected) {
m_client->disconnectFromHost();
}
}
signals:
// メッセージ受信時のシグナル
void messageReceived(const QString &topic, const QByteArray &message);
private slots:
// MQTT接続状態が変化した時のハンドラ
void handleConnectionStateChange()
{
switch (m_client->state()) {
case QMqttClient::Connected:
qInfo() << "MQTTブローカーに接続しました";
break;
case QMqttClient::Disconnected:
qInfo() << "MQTTブローカーから切断されました";
break;
case QMqttClient::Connecting:
qInfo() << "MQTTブローカーに接続中...";
break;
default:
qWarning() << "不明な接続状態です: " << m_client->state();
break;
}
}
// エラー発生時のハンドラ
void handleError(QMqttClient::ClientError error)
{
QString errorMsg;
switch (error) {
case QMqttClient::NoError:
return;
case QMqttClient::InvalidProtocolVersion:
errorMsg = "無効なプロトコルバージョンです";
break;
case QMqttClient::IdRejected:
errorMsg = "クライアントIDが拒否されました";
break;
case QMqttClient::ServerUnavailable:
errorMsg = "サーバが利用できません";
break;
case QMqttClient::BadUsernameOrPassword:
errorMsg = "ユーザ名またはパスワードが無効です";
break;
case QMqttClient::NotAuthorized:
errorMsg = "認証されていません";
break;
case QMqttClient::TransportInvalid:
errorMsg = "トランスポートが無効です";
break;
case QMqttClient::ProtocolViolation:
errorMsg = "プロトコル違反が発生しました";
break;
case QMqttClient::UnknownError:
errorMsg = "不明なエラーが発生しました";
break;
default:
errorMsg = "予期せぬエラーが発生しました";
break;
}
qCritical() << "MQTTエラー:" << errorMsg;
// エラー発生時は再接続を試みる
if (m_client->state() != QMqttClient::Connected) {
QTimer::singleShot(5000, [this]() {
qInfo() << "再接続を試みます...";
this->connectToHost(m_host, m_port);
});
}
}
// メッセージ受信時のハンドラ
void handleMessage(const QByteArray &message, const QMqttTopicName &topic)
{
// メッセージを受信したことをログに記録
qInfo() << "メッセージを受信しました - トピック: " << topic.name();
qInfo() << "メッセージ: " << message;
// シグナルを発行して、アプリケーションの他の部分に通知
emit messageReceived(topic.name(), message);
}
};
// Subscriber側の使用例
#include "Subscriber.h"
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
MqttSubscriber subscriber;
subscriber.connectToHost("localhost", 1883);
// メッセージ受信時の処理
QObject::connect(&subscriber, &MqttSubscriber::messageReceived, [](const QString &topic, const QByteArray &message) {
qDebug() << "受信 - トピック: " << topic;
qDebug() << "メッセージ: " << message;
});
subscriber.subscribe("qt/topic");
return a.exec();
}