12,964
回編集
536行目: | 536行目: | ||
<br> | <br> | ||
==== 同期先 (リモートPC) ==== | ==== 同期先 (リモートPC) ==== | ||
以下の例では、librsyncライブラリを使用して、同期元から転送されたファイルおよびディレクトリを非同期で受信している。<br> | |||
<br> | |||
* 非同期処理 | |||
*: QtConcurrentクラスを使用して、コマンド処理やデータ送信を非同期で行う。 | |||
*: QTcpServerクラスおよびQTcpSocketクラスを使用して、非同期のネットワーク通信を実現している。 | |||
* ストリーミング処理 | |||
*: クライアントからのデータを段階的に読み取る。 | |||
*: 大きなファイルやデータの場合でも効率的に処理できる。 | |||
* librsyncライブラリの使用 | |||
*: librsyncライブラリを使用して、効率的なファイル同期を実現している。 | |||
*: ファイルのシグネチャ生成やデルタの適用等の処理を行う。 | |||
<br> | |||
<syntaxhighlight lang="c++"> | |||
// RemoteSyncServer.hファイル | |||
#include <QObject> | |||
#include <QFile> | |||
#include <QDataStream> | |||
#include <QDir> | |||
#include <QTcpServer> | |||
#include <QTcpSocket> | |||
#include <QFuture> | |||
#include <QtConcurrent> | |||
#include <librsync.h> | |||
class RemoteSyncServer : public QObject | |||
{ | |||
Q_OBJECT | |||
private: | |||
QTcpServer *server; | |||
QMap<QTcpSocket*, QByteArray> clientBuffers; | |||
// クライアントからのコマンドを処理 | |||
void processCommand(QTcpSocket *client, const QByteArray &command) | |||
{ | |||
if (command.startsWith("MKDIR:")) { | |||
// ディレクトリ作成コマンドを処理 | |||
QString path = QString::fromUtf8(command.mid(6)); | |||
createDirectory(client, path); | |||
} | |||
else if (command.startsWith("SIGNATURE:")) { | |||
// シグネチャ要求コマンドを処理 | |||
QString path = QString::fromUtf8(command.mid(10)); | |||
sendFileSignature(client, path); | |||
} | |||
else if (command == "DONE") { | |||
// 完了通知を処理する場合 | |||
} | |||
else { | |||
// 不明なコマンドの場合はデルタデータとして処理 | |||
QString path = client->property("currentFile").toString(); | |||
if (!path.isEmpty()) { | |||
applyDelta(client, path, command); | |||
} | |||
else { | |||
sendError(client, "不明なコマンドまたは無効なデルタデータ"); | |||
} | |||
} | |||
} | |||
// ディレクトリを作成 | |||
void createDirectory(QTcpSocket *client, const QString &path) | |||
{ | |||
QDir dir; | |||
if (!dir.mkpath(path)) { | |||
sendError(client, QString("ディレクトリの作成に失敗: %1").arg(path)); | |||
} | |||
} | |||
// ファイルのシグネチャを送信 | |||
void sendFileSignature(QTcpSocket *client, const QString &path) | |||
{ | |||
QFile file(path); | |||
if (!file.open(QIODevice::ReadOnly)) { | |||
sendError(client, QString("ファイルのオープンに失敗: %1").arg(path)); | |||
return; | |||
} | |||
// librsyncを使用してシグネチャを生成 | |||
rs_result result; | |||
rs_signature_t *sig; | |||
FILE *f = fdopen(file.handle(), "rb"); | |||
result = rs_sig_file(f, &sig, RS_DEFAULT_BLOCK_LEN, 0, RS_MD4_SIG_MAGIC); | |||
if (result != RS_DONE) { | |||
sendError(client, QString("シグネチャの生成に失敗: %1").arg(rs_strerror(result))); | |||
return; | |||
} | |||
// シグネチャをバイト配列に変換 | |||
rs_buffers_t buf; | |||
char outbuf[8192]; | |||
QByteArray signature; | |||
buf.next_out = outbuf; | |||
buf.avail_out = sizeof(outbuf); | |||
do { | |||
result = rs_job_iter(sig, &buf); | |||
if (buf.avail_out < sizeof(outbuf)) { | |||
signature.append(outbuf, sizeof(outbuf) - buf.avail_out); | |||
buf.next_out = outbuf; | |||
buf.avail_out = sizeof(outbuf); | |||
} | |||
} while (result == RS_BLOCKED); | |||
rs_job_free(sig); | |||
if (result != RS_DONE) { | |||
sendError(client, QString("シグネチャの変換に失敗: %1").arg(rs_strerror(result))); | |||
return; | |||
} | |||
// シグネチャを非同期で送信 | |||
sendDataAsync(client, signature); | |||
client->setProperty("currentFile", path); | |||
} | |||
// デルタを適用してファイルを更新 | |||
void applyDelta(QTcpSocket *client, const QString &path, const QByteArray &deltaData) | |||
{ | |||
QFile file(path); | |||
if (!file.open(QIODevice::ReadWrite)) { | |||
sendError(client, QString("ファイルのオープンに失敗: %1").arg(path)); | |||
return; | |||
} | |||
// librsyncを使用してデルタを適用 | |||
rs_result result; | |||
FILE *basis_file = fdopen(file.handle(), "r+b"); | |||
rs_signature_t *sig = nullptr; | |||
// シグネチャの再構築 | |||
result = rs_sig_file(basis_file, &sig, RS_DEFAULT_BLOCK_LEN, 0, RS_MD4_SIG_MAGIC); | |||
if (result != RS_DONE) { | |||
sendError(client, QString("シグネチャの再構築に失敗: %1").arg(rs_strerror(result))); | |||
return; | |||
} | |||
// 一時ファイルの作成 | |||
QTemporaryFile newFile; | |||
if (!newFile.open()) { | |||
sendError(client, "一時ファイルの作成に失敗"); | |||
rs_free_sumset(sig); | |||
return; | |||
} | |||
FILE *new_file = fdopen(newFile.handle(), "w+b"); | |||
// デルタの適用 | |||
rs_buffers_t buf; | |||
char inbuf[8192]; | |||
char outbuf[8192]; | |||
buf.next_in = deltaData.constData(); | |||
buf.avail_in = deltaData.size(); | |||
buf.next_out = outbuf; | |||
buf.avail_out = sizeof(outbuf); | |||
rs_job_t *job = rs_patch_begin(rs_file_copy_cb, basis_file); | |||
do { | |||
result = rs_job_iter(job, &buf); | |||
if (buf.avail_out < sizeof(outbuf)) { | |||
size_t written = fwrite(outbuf, 1, sizeof(outbuf) - buf.avail_out, new_file); | |||
if (written != sizeof(outbuf) - buf.avail_out) { | |||
sendError(client, "一時ファイルへの書き込みに失敗"); | |||
rs_job_free(job); | |||
rs_free_sumset(sig); | |||
return; | |||
} | |||
buf.next_out = outbuf; | |||
buf.avail_out = sizeof(outbuf); | |||
} | |||
} while (result == RS_BLOCKED); | |||
rs_job_free(job); | |||
rs_free_sumset(sig); | |||
if (result != RS_DONE) { | |||
sendError(client, QString("デルタの適用に失敗: %1").arg(rs_strerror(result))); | |||
return; | |||
} | |||
// 一時ファイルを元のファイルに上書き | |||
file.close(); | |||
newFile.close(); | |||
if (!QFile::remove(path) || !newFile.rename(path)) { | |||
sendError(client, "ファイルの更新に失敗"); | |||
return; | |||
} | |||
// 成功メッセージを送信 | |||
sendDataAsync(client, "SUCCESS"); | |||
} | |||
// データを非同期で送信する | |||
QFuture<void> sendDataAsync(QTcpSocket *client, const QByteArray &data) | |||
{ | |||
return QtConcurrent::run([client, data]() { | |||
QDataStream out(client); | |||
out << quint32(data.size()); | |||
out.writeRawData(data.constData(), data.size()); | |||
client->waitForBytesWritten(); | |||
}); | |||
} | |||
// エラーメッセージをクライアントに送信する | |||
void sendError(QTcpSocket *client, const QString &error) | |||
{ | |||
QByteArray errorData = QString("ERROR:%1").arg(error).toUtf8(); | |||
sendDataAsync(client, errorData); | |||
emit errorOccurred(error); | |||
} | |||
public: | |||
explicit RemoteSyncServer(QObject *parent) : QObject(parent), server(new QTcpServer(this)) | |||
{ | |||
// サーバーの新しい接続シグナルをhandleNewConnectionスロットに接続 | |||
connect(server, &QTcpServer::newConnection, this, &RemoteSyncServer::handleNewConnection); | |||
} | |||
~RemoteSyncServer() | |||
{ | |||
// サーバが稼働中の場合は停止 | |||
if (server->isListening()) { | |||
server->close(); | |||
} | |||
} | |||
// サーバを指定されたポートで開始 | |||
bool start(quint16 port) | |||
{ | |||
// 指定されたポートでサーバーを開始 | |||
if (!server->listen(QHostAddress::Any, port)) { | |||
emit errorOccurred(QString("サーバの起動に失敗: %1").arg(server->errorString())); | |||
return false; | |||
} | |||
return true; | |||
} | |||
signals: | |||
// エラー発生時に発行されるシグナル | |||
void errorOccurred(const QString &error); | |||
private slots: | |||
// 新しいクライアント接続を処理 | |||
void handleNewConnection() | |||
{ | |||
// 新しいクライアント接続を取得 | |||
QTcpSocket *clientSocket = server->nextPendingConnection(); | |||
// クライアントのreadyRead信号をreadClientDataスロットに接続 | |||
connect(clientSocket, &QTcpSocket::readyRead, this, &RemoteSyncServer::readClientData); | |||
// クライアントの切断を処理 | |||
connect(clientSocket, &QTcpSocket::disconnected, [this, clientSocket]() { | |||
clientBuffers.remove(clientSocket); | |||
clientSocket->deleteLater(); | |||
}); | |||
} | |||
// クライアントからのデータを読み取るスロット | |||
void readClientData() | |||
{ | |||
// シグナルを送信したクライアントソケットを取得 | |||
QTcpSocket *clientSocket = qobject_cast<QTcpSocket*>(sender()); | |||
if (!clientSocket) return; | |||
// クライアントからのデータを読み取り | |||
QByteArray &buffer = clientBuffers[clientSocket]; | |||
buffer.append(clientSocket->readAll()); | |||
// 完全なコマンドを処理 | |||
while (buffer.size() >= 4) { | |||
QDataStream stream(buffer); | |||
quint32 size; | |||
stream >> size; | |||
if (buffer.size() < size + 4) break; | |||
QByteArray command = buffer.mid(4, size); | |||
buffer.remove(0, size + 4); | |||
// コマンドを非同期で処理 | |||
QtConcurrent::run([this, clientSocket, command]() { | |||
processCommand(clientSocket, command); | |||
}); | |||
} | |||
}; | |||
</syntaxhighlight> | |||
<br> | |||
<syntaxhighlight lang="c++"> | |||
// main.cppファイル | |||
#include "RemoteSyncServer.h" | |||
int main(int argc, char *argv[]) | |||
{ | |||
QCoreApplication a(argc, argv); | |||
qint16 port = <ポート番号>; | |||
RemoteSyncServer server; | |||
if (!server.start(port)) { | |||
qDebug() << "サーバの起動に失敗"; | |||
return -1; | |||
} | |||
qDebug() << QString("サーバが起動 (ポート: %1")).arg(port); | |||
return a.exec(); | |||
} | |||
</syntaxhighlight> | |||
<br><br> | <br><br> | ||