概要
C#のマルチスレッドプログラミングは、並行処理を実現する強力な機能である。
.NET Framework 4.5以降、C# 5.0以降はコンパイラレベルでマルチスレッドをサポートしており、特にasync
/ await
キーワードの導入により、非同期プログラミングが大幅に簡素化された。
タスクベース非同期パターン (Task-based Asynchronous Pattern: TAP) は、この新しいアプローチの中心となる概念である。
TAPでは、非同期メソッドの名前にAsync
というサフィックスを付けて、戻り値の型としてTask
、Task<T>
、void
を使用する。
async
キーワードは、メソッドが非同期であることを示す。
このキーワードを使用することにより、メソッド内でawait
キーワードを使用できるようになる。
await
は、非同期操作の完了を待機するために使用され、その間にスレッドをブロックすることなく、制御を呼び出し元に返す。
以下の例では、MethodAsync1とMethodAsync2という2つの非同期メソッドが定義されている。
これらのメソッドは、それぞれTask<int>型とTask<(bool, string)>型 (タプル型) を返す。
これらのメソッド内では、Task.Runを使用して新しいタスクを開始し、Task.Delayで非同期の待機を模倣している。
実際のアプリケーションでは、この部分が時間のかかる操作 (ファイルI/Oやネットワーク通信等) に置き換わることが多い。
// Task<int>型を返す非同期関数
public async Task<int> MethodAsync1()
{
var iRet = await Task.Run(() =>
{
await Task.Delay(5000);
return 0;
});
return iRet;
}
// Task<(bool, string)>型 (タプル型) を返す非同期関数
public async Task<(bool, string)> MethodAsync2()
{
var tRet = await Task.Run(() =>
{
await Task.Delay(5000);
return (true, @"some strings");;
});
return tRet;
}
C#のマルチスレッドプログラミングのメリットは多岐にわたる。
UIの応答性の向上、複数のCPUコアを効率的に利用、I/O操作を最適化すること等が可能である。
しかし、マルチスレッドプログラミングには注意点もある。
例えば、デッドロックやレースコンディション等の同期の問題に注意する必要がある。
また、過度にスレッドを使用する場合、却ってパフォーマンスが低下する可能性もある。
C#は、これらの課題に対処するための様々なツールを提供している。
例えば、lock
キーワード、Monitor
クラス、Interlocked
クラス等が存在する。
また、より高度な並行処理のためには、Task Parallel Library (TPL)やParallel LINQ (PLINQ)等のライブラリも用意されている。
マルチスレッドプログラミングは強力であるが、適切に使用することが重要である。
シンプルな操作には同期プログラミング、複雑な非同期操作にはasync
/ await
パターンを使用する等、状況に応じて適切なアプローチを選択することが、
効率的で保守しやすいコードを記述するための鍵となる。
asyncとawaitの動作
タスクとは、一連の処理をひとまとまりにした単位である。
Task
クラスはマルチスレッドで実行されるようになっており、連続で記述する場合は、記述した個数だけマルチスレッドで並列処理される。
Task.Run
メソッドは、大量の計算のみを行う処理を並列実行させる時のみに使用する場合、または、既存のメソッドを変更せずに使用する場合に使用する。
public async Task MethodAsync()
{
var task1 = Task.Run(() =>
{
// 何か処理1
});
var task2 = Task.Run(() =>
{
// 何か処理2
});
var task12 = Task.WhenAll(task1, task2);
var taskAll = task12.ContinueWith(() =>
{
// 何か処理3
});
return await taskAll.ConfigureAwait(false);
}
Task.Run
メソッドには、引数を渡すオーバーロードが存在しない。
引数を渡す場合、Task
クラスのインスタンスの生成と同時に引数を渡して、Task
クラスのStart
メソッドを使用してスレッドを実行する必要がある。
private async void MethodAsync()
{
int m = 999;
Task<int> task = new Task(x =>
{
Thread.Sleep(3000);
return x * 2;
}, m);
task.Start();
m = 100;
int result = await task; // result : 1998
Console.WriteLine($"{result}");
}
Wait / Result
コンソールソフトウェアにおいて、Wait
メソッドおよびResult
メソッドを使用して、非同期メソッドを作成することができる。
HttpClient hc = new HttpClient();
string html = hc.GetStringAsync("http://example.jp/").Result;
また、Wait
メソッドおよびResult
メソッド以外に、WaitAll
メソッドやWaitAny
メソッドが存在する。
HttpClient hc = new HttpClient();
Task<string> t1 = hc.GetStringAsync("https://www.microsoft.com/");
Task<string> t2 = hc.GetStringAsync("https://www.bing.com/");
// タスクが終わるまでスレッドをブロック
t1.Wait();
// タスクが終わるまでスレッドをブロックして結果を取得
string binghtml = t2.Result;
// どれかのタスクが終わるまでスレッドをブロック
int completedTaskIndex = Task.WaitAny(t1, t2);
// 0 : タスクt1の方が速い場合
// 1 : タスクt2の方が速い場合
// どれかのタスクが終わるまでスレッドをブロック(タイムアウトあり)
int completedTaskIndex2 = Task.WaitAny(new[] { t1, t2 }, 50);
// 0 : タスクt1の方が速い場合
// 1 : タスクt2の方が速い場合
// -1 : タスクt1およびt2とも50[ms]以内に応答がない場合
// 全てタスクが終わるまでスレッドをブロック
Task.WaitAll(t1, t2);
// 全てのタスクが終わるまでスレッドをブロック(タイムアウトあり)
bool allTasksCompleted = Task.WaitAll(new[] { t1, t2 }, 50);
// true : タスクt1およびt2とも50[ms]以内に応答がある場合
// false : タスクt1またはt2のいずれか一方、または、タスクt1およびt2とも50[ms]以内に応答がない場合
WhenAny
複数のタスクのうち、いずれか1つのみが完了するまで待機する場合、WhenAny
メソッドを使用する。
HttpClient hc = new HttpClient();
Task<string> t1 = hc.GetStringAsync("https://www.microsoft.com/");
Task<string> t2 = hc.GetStringAsync("https://www.bing.com/");
Task<string> completedTask = await Task.WhenAny(t1, t2);
これを応用して、処理継続中のタスクをキャンセルすることもできる。
HttpClient hc = new HttpClient();
CancellationTokenSource cts = new CancellationTokenSource();
// GetStringAsyncメソッドにはCancellationToken構造体を受けるオーバーロードが無いため、GetAsyncメソッドを使用している
Task<HttpResponseMessage> t1 = hc.GetAsync("https://www.microsoft.com/", cts.Token);
Task<HttpResponseMessage> t2 = hc.GetAsync("https://www.bing.com/", cts.Token);
// 完了したタスクを取得
Task<HttpResponseMessage> completedTask = await Task.WhenAny(t1, t2);
// 他の処理中のタスクは全てキャンセルする
cts.Cancel();
// 完了したタスクの結果を取得
HttpResponseMessage msg = await completedTask;
string html = await msg.Content.ReadAsStringAsync();
WhenAllメソッド / ContinueWithメソッド
Task
クラスはContinueWith
メソッドを使用して、その後の処理を連続して実行できる。
例えば、2つ以上のタスクを実行して全てのタスクが完了した後に続けてタスクを実行する場合、ContinueWith
メソッドまたはawait
を使用してタスクの完了を待つ。
// パターン1
var task1 = Task.Run(() =>
{
// 何か処理
});
var task2 = Task.Run(() =>
{
// 何か処理
});
var task3 = Task.Run(() =>
{
// 何か処理
});
var taskAll = Task.WhenAll(task1, task2).ContinueWith(task3); // awaitは不要
// パターン2
var task1 = Task.Run(() =>
{
// 何か処理
});
await task1.ConfigureAwait(false);
var task2 = Task.Run(() =>
{
// 何か処理
});
await task2.ConfigureAwait(false);
var task3 = Task.Run(() =>
{
// 何か処理
});
await task3.ConfigureAwait(false);
状況にもよるが、処理が膨大になると可読性が落ちるため、先にタスクをまとめて記述して、タスク間の関係性が見通せるパターン1の方が良い。
IsCompletedプロパティ
タスクが完了しているかどうかを確認する場合、IsCompleted
プロパティを使用する。
IsCompletedSuccessfully
プロパティもあるが、これはタスクが成功および完了した場合のみtrue
になる。
また、IsFaulted
プロパティやIsCanceled
プロパティも存在する。
HttpClient hc = new HttpClient();
Task<string> task = hc.GetStringAsync("https://www.microsoft.com/");
// タスクが成功および完了したかどうかを確認
if (!task.IsCompleted)
{
// この時点でタスクt1が成功および完了している場合、このメッセージは表示されない
Console.WriteLine("ちょっとまってね");
}
var html = await task;
Console.WriteLine(html);
ConfigureAwait(false)
GUIアプリケーションにおいて、Wait
メソッドおよびResult
メソッドを使用する場合、デッドロックが発生する。
これは、Task
クラスのWait
メソッドやResult
メソッドは、元のスレッドへ戻るからである。
以下の例では、まず、methodAsync().Wait()を実行した時点で親スレッドがスリープする。
次に、methodAsyncメソッドの完了後、親スレッドに戻る時、既に親スレッドがスリープしているため、処理を続けることができない。
そのため、親スレッドは子スレッドを待ち続け、子スレッドは親スレッドに戻ろうとして、デッドロックが発生する。
// デッドロックが起きるサンプルコード(GUIアプリケーションのみ)
// コンソールアプリケーションでは正常に動作する
public void callMethod()
{
methodAsync().Wait();
}
public async Task methodAsync()
{
return await Task.Delay(1000);
}
対策として、ConfigureAwait(false)
を使用するとデッドロックが回避できる。
ConfigureAwait(false)
は、メソッドの終了後に親スレッド(呼び出し元)に戻らなくてもよいことを指定する。
ブロックされたスレッドに戻らなくなるため、methodAsyncメソッドが終了した時にcallMethodメソッドに戻ることができる。
// デッドロックが起きないサンプルコード(GUIアプリケーションのみ)
public void callMethod()
{
methodAsync().Wait();
}
public async Task methodAsync()
{
return await Task.Delay(1000).ConfigureAwait(false);
}
Parallelクラス
CPU処理を並列実行する場合、Parallel
クラスを使用する。
以下の例では、methodAsync1メソッド、methodAsync2メソッド、methodAsync3メソッドが並列実行される。
ただし、CPUのコア数やスレッドプールの空き状況によっては、並列実行されない場合もあることに注意する。
なお、Parallel
クラスには、Parallel.Invoke
メソッドの他にParallel.For
メソッドやParallel.ForEach
メソッドも存在する。
また、Parallel.Invoke
メソッドの第1引数のParallelOptions
クラスは省略可能である。
// Invokeメソッドの引数はAction型
Parallel.Invoke(new ParallelOptions() { MaxDegreeOfParallelism = 4 }, // 最大同時並列数:4
methodAsync1, // 戻り値が無いAction型の場合
() => { methodAsync2(100); }, // 戻り値があるFunc型の場合
() =>
{ // 直接処理を記述してもよい
int n = 400;
methodAsync3(n);
}
);
PLINQ(ParallelEnumerable)
LINQ
(PLINQ
)を使用して、並列実行することもできる。
PLINQ
では、AsParallel
拡張メソッド以降のコードが並列化される。
以下の例では、処理A(ファイルの読み込み)はシングルスレッドで実行、処理B(ハッシュ値計算)はマルチスレッドで実行している。
なお、AsParallel
拡張メソッドの実行後のWithDegreeOfParallelism
拡張メソッドは省略可能である。
// ファイルのハッシュ値を計算するプログラム
var files = Directory.GetFiles(Environment.SystemDirectory, "*.exe");
var filehash = files
.Select(f => new { File = f, Data = File.ReadAllBytes(f) }) // 処理A(ファイルの読み込み)
.AsParallel()
.WithDegreeOfParallelism(4) // 最大同時並列数:4
.Select(f => new { File = f.File, Hash = SHA256.Create().ComputeHash(f.Data)}) // 処理B(ハッシュ値計算)
.ToArray();
排他処理
排他処理を行う場合、lock
キーワードが基本的かつ一般的な方法である。
しかし、状況や要件によっては他の手法も考慮する。
lock
キーワード以外にも以下に示すような選択肢がある。
- Monitorクラス
- lockキーワードは内部的に
Monitor
クラスを使用している。 Monitor
クラスを直接使用する場合、より細かい制御が可能になる。
- lockキーワードは内部的に
- Mutexクラス
- プロセス間の同期に使用する。
- 複数のアプリケーションインスタンス間での排他制御が必要な場合に適している。
- Semaphore / SemaphoreSlim
- リソースへのアクセスを制限する必要がある場合に使用する。
- 例えば、同時に最大N個のスレッドがリソースにアクセスできるようにする場合等である。
- ReaderWriterLockSlim
- 複数の読み取りスレッドを同時に許可しながら、書き込みを排他的に行うことができる。
- 読み取り操作が書き込み操作よりも頻繁に行われる場合に適している。
- また、ファイル操作に要する時間が長くなり、同時読み取りの利点が大きくなる場合にも適している。
- その他、アプリケーションの規模が大きくなり、より細かい制御が必要になった場合等にもよい。
- ただし、小規模なファイル操作の場合は、ReaderWriterLockSlimのオーバーヘッドがlockキーワードよりも大きくなる可能性があることに注意する。
- Interlockedクラス
- 単一の変数に対する原子的操作 (インクリメント、デクリメント、交換等) を提供する。
- SpinLock構造体
- 非常に短い時間だけロックが必要な場合に使用する。
- CPUリソースを多く消費するが、コンテキストスイッチのオーバーヘッドを避けることができる。
- 選択の基準
- 同期の範囲
- 単一のアプリケーション内か、プロセス間か
- 操作の粒度
- 細かい操作か、大きなブロックか
- パフォーマンス要件
- 高頻度の短い操作か、低頻度の長い操作か
- 読み書きのバランス
- 読み取りと書き込みの頻度のバランス
- 同期の範囲
lockキーワード
lockキーワードのメリットと特徴を以下に示す。
- 使いやすさ
- シンプルで直感的な構文を持つ。
- 安全性
- デッドロックのリスクを軽減する。(ただし、完全に防ぐわけではない)
- パフォーマンス
- 比較的軽量な操作である。
以下の例では、排他処理 (lock
キーワード) を使用してテキストファイルを2つのスレッドから読み書きしている。
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
class Program
{
private static readonly object fileLock = new object();
private const string fileName = "sample.txt";
static async Task Main(string[] args)
{
try
{
Task readTask = ReadFileAsync();
Task writeTask = WriteFileAsync();
await Task.WhenAll(readTask, writeTask);
}
catch (Exception ex)
{
Console.WriteLine($"メインプロセスでエラーが発生: {ex.Message}");
}
finally
{
Console.WriteLine("プログラムの終了");
}
}
static async Task ReadFileAsync()
{
for (var i = 0; i < 10; i++)
{
try
{
string content = await ReadFromFileAsync();
Console.WriteLine($"読み込んだ内容: {content}");
}
catch (Exception ex)
{
Console.WriteLine($"読み込み中にエラーが発生: {ex.Message}");
}
await Task.Delay(1000);
}
}
static async Task WriteFileAsync()
{
int counter = 0;
for (var i = 0; i < 10; i++)
{
try
{
string content = $"カウンター: {counter++}";
await WriteToFileAsync(content);
Console.WriteLine($"書き込んだ内容: {content}");
}
catch (Exception ex)
{
Console.WriteLine($"書き込み中にエラーが発生: {ex.Message}");
}
await Task.Delay(1500);
}
}
static async Task<string> ReadFromFileAsync()
{
if (!File.Exists(fileName))
{
throw new FileNotFoundException($"ファイルが見つかりません: {fileName}");
}
try
{
lock (fileLock)
{
using (StreamReader reader = new StreamReader(fileName))
{
return reader.ReadToEnd();
}
}
}
catch (IOException ex)
{
throw new IOException($"ファイルの読み込み中にエラーが発生: {ex.Message}", ex);
}
}
static async Task WriteToFileAsync(string content)
{
try
{
lock (fileLock)
{
using (StreamWriter writer = new StreamWriter(fileName, false))
{
writer.Write(content);
}
}
}
catch (IOException ex)
{
throw new IOException($"ファイルの書き込み中にエラーが発生: {ex.Message}", ex);
}
}
}
ReaderWriterLockSlimキーワード
以下の例では、複数のスレッドが頻繁に読み取りを行い、時々書き込みを行うシナリオを想定している。
具体的には、共有されるデータ構造 (この場合は設定ファイル) に対して、多数の読み取り操作と少数の更新操作が行われる状況である。
ReaderWriterLockSlimキーワードのメリットを以下に示す。
- 複数の読み取り操作が同時に行われることを許可する。
- 書き込み操作が行われる場合には、他の全ての操作 (読み取りと書き込み) をブロックする。
ReaderWriterLockSlimキーワードが適している理由を以下に示す。
- 書き込み時の整合性保護
- 稀に発生する書き込み操作時には、全ての操作をブロックしてデータトレースを防ぐ。
- パフォーマンス
- lockキーワードを使用した場合、全ての読み取り操作が直列化されてしまうが、
- ReaderWriterLockSlimキーワードを使用することにより、読み取り操作のスループットが大幅に向上する。
読み取りが多く書き込みが少ない場合は、ReaderWriterLockSlim
キーワードの使用が適切である。
ただし、読み取りと書き込みの頻度が同程度の場合、あるいは、ロックの保持時間が非常に短い場合は、lockキーワードの方がオーバーヘッドが少なく、パフォーマンスが良い可能性がある。
実務では、ベンチマークを取ることを推奨する。
using System;
using System.IO;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
// ReaderWriterLockSlim を使用して、設定の読み取りと書き込みを同期するクラス
class ConfigManager
{
private static readonly ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();
private static Dictionary<string, string> config = new Dictionary<string, string>(); // 設定を保持する (ファイルにも保存)
private const string configFile = "config.txt";
// 読み取りロックを使用
public static string GetConfig(string key)
{
rwLock.EnterReadLock();
try
{
return config.TryGetValue(key, out var value) ? value : null;
}
finally
{
rwLock.ExitReadLock();
}
}
// 書き込みロックを使用
public static void SetConfig(string key, string value)
{
rwLock.EnterWriteLock();
try
{
config[key] = value;
SaveConfig();
}
finally
{
rwLock.ExitWriteLock();
}
}
private static void SaveConfig()
{
using (StreamWriter writer = new StreamWriter(configFile, false))
{
foreach (var kvp in config)
{
writer.WriteLine($"{kvp.Key}={kvp.Value}");
}
}
}
public static void LoadConfig()
{
rwLock.EnterWriteLock();
try
{
config.Clear();
if (File.Exists(configFile))
{
foreach (var line in File.ReadAllLines(configFile))
{
var parts = line.Split('=');
if (parts.Length == 2)
{
config[parts[0]] = parts[1];
}
}
}
}
finally
{
rwLock.ExitWriteLock();
}
}
}
class Program
{
static async Task Main()
{
ConfigManager.LoadConfig();
var readTasks = new List<Task>();
var writeTasks = new List<Task>();
// 多数の読み取りタスクを作成
for (int i = 0; i < 100; i++)
{
readTasks.Add(Task.Run(() => ReadConfig()));
}
// 少数の書き込みタスクを作成
for (int i = 0; i < 5; i++)
{
writeTasks.Add(Task.Run(() => WriteConfig(i)));
}
await Task.WhenAll(readTasks.Concat(writeTasks));
Console.WriteLine("全てのタスクが完了");
}
static void ReadConfig()
{
for (int i = 0; i < 1000; i++)
{
var value = ConfigManager.GetConfig("TestKey");
Console.WriteLine($"Read: TestKey = {value}");
Thread.Sleep(10); // 読み取り操作の間隔
}
}
static void WriteConfig(int writerIndex)
{
for (int i = 0; i < 10; i++)
{
ConfigManager.SetConfig("TestKey", $"Value{writerIndex}-{i}");
Console.WriteLine($"Write: TestKey = Value{writerIndex}-{i}");
Thread.Sleep(500); // 書き込み操作の間隔
}
}
}
例外処理
Task
クラスやParallel
クラスで発生する例外は、例外AggregateException
としてキャッチすることができる。
try
{
Parallel.Invoke(
() => throw new ArgumentException(),
() => throw new InvalidOperationException(),
() => throw new FormatException()
);
}
catch (AggregateException exception)
{
// 原因となるExceptionを確認する場合、FlattenメソッドのInnerExceptionsプロパティを使用する
var exceptions = exception.Flatten().InnerExceptions;
foreach (var ex in exceptions)
{
Debug.WriteLine(ex.GetType());
}
}
catch (Exception ex)
{ // 未処理の例外をキャッチ
Trace.WriteLine($"Log: {ex}");
}
catch
{ // SEHをキャッチ
Trace.WriteLine("Log: Caught uninterpreted exception.");
}
ただし、Task
クラスをawait
する場合、個別のException
クラスを使用してキャッチすることもできる。
try
{
var addresses = await Dns.GetHostAddressesAsync("example.jp");
}
catch(SocketException exception)
{
Debug.WriteLine(exception.ToString());
}
catch (Exception ex)
{ // 未処理の例外をキャッチ
Trace.WriteLine($"Log: {ex}");
}
catch
{ // SEHをキャッチ
Trace.WriteLine("Log: Caught uninterpreted exception.");
}
Task
クラスで発生したキャッチされていない例外をまとめて処理する場合、TaskScheduler.UnobservedTaskException
メソッドを使用する。
ただし、これは、例外が発生したタイミングでキャッチできないこと、および、例外をキャッチできたとしても対処可能なことが限られることに注意する。
private async void buttonl_Click(object sender, RoutedEventArgs e)
{
await Task.Run(() =>
{
await Task.Delay(1000);
});
// 非同期処理の未処理例外をキャッチする
TaskScheduler.UnobservedTaskException += (sender, e) =>
{
AggregateException ae = e.Exception;
Debug.WriteLine(ae.ToString());
e.SetObserved(); // .NET Framework 4.0の場合、これを記述しないとソフトウェアが強制終了する
};
}
上記の例外処理において、以下のようなメソッドを記述すると便利である。
ValueTask
構造体ではなくTask
クラスでもよいが、ValueTask
構造体にすることにより、アロケーションコストを削減できる可能性がある。
public void Button_Click(object sender, EventArgs e)
{
SafeAsyncBlock(async () =>
{
// 非同期で取得
var body = await httpClient.GetStringAsync("https://example.com/foobar");
var foobar = JsonConvert.DeserializeObject<FooBar>(body);
// 表示
textBox.Text = $"${foobar.Price}";
});
}
// 非同期処理で発生する例外を安全に処理する
public static async void SafeAsyncBlock(Func<ValueTask> action)
{
try
{ // 非同期処理を実行
await action();
}
catch (Exception ex)
{ // 未処理の例外を補足する
Trace.WriteLine($"Log: {ex}");
}
catch
{ // SEHも補足
Trace.WriteLine("Log: Caught uninterpreted exception.");
}
}
※注意
普段は考慮する必要はないと思われるが、SEHを非同期処理の例外として伝搬して継続させることはできない。
したがって、catch
句で補足できるSEHは、最初のタスクコンテキストスイッチが発生する以前のSEHのみとなる。
1度タスクコンテキストスイッチが発生した後の非同期継続処理でSEHが発生した場合は、この方法では対処できない。
おそらく、ThreadPool
クラスから割り当てられたワーカースレッドの根元に伝搬する、または、SynchronizationContext
がホストするスレッドの根元に伝搬する。
AppDomain
でフックする以外に良い方法は存在しないかもしれないことに注意する。