C Sharpの基礎 - マルチスレッド
概要
.NET Framework 4.5で導入されたC# 5.0(Visual Studio 2012) からは、マルチスレッドプログラミングをコンパイラレベルでサポートしている。
具体的には、asyncとawaitというキーワードが導入された。
これは、タスクベース非同期パターン(Task-based Asynchronous Pattern、TAP)を実装するものである。
このタスクベース非同期パターンでは、処理の開始はMethodNameAsyncという形式をとる。
そしてその戻り値を、Task、Task<T>、voidとし、Taskを待つにはawaitを付加する。
また、awaitを付加した関数には、asyncを付加する。
下記の例は、Task<int>型、Task<(bool, string)>型 (タプル型) を返す非同期関数である。
// Task<int>型を返す非同期関数
public async Task<int> MethodAsync1()
{
var iRet = await Task.Run(() =>
{
await Task.Delay(5000);
return 0;
});
return iRet;
}
// Task<(bool, string)>型 (タプル型) を返す非同期関数
public async Task<(bool, string)> MethodAsync2()
{
var tRet = await Task.Run(() =>
{
await Task.Delay(5000);
return (true, @"some strings");;
});
return tRet;
}
asyncとawaitの動作
タスクとは、一連の処理をひとまとまりにした単位である。
Task
クラスはマルチスレッドで実行されるようになっており、連続で記述する場合は、記述した個数だけマルチスレッドで並列処理される。
Task.Run
メソッドは、大量の計算のみを行う処理を並列実行させる時のみに使用する場合、または、既存のメソッドを変更せずに使用する場合に使用する。
public async Task MethodAsync()
{
var task1 = Task.Run(() =>
{
// 何か処理1
});
var task2 = Task.Run(() =>
{
// 何か処理2
});
var task12 = Task.WhenAll(task1, task2);
var taskAll = task12.ContinueWith(() =>
{
// 何か処理3
});
return await taskAll.ConfigureAwait(false);
}
Task.Run
メソッドには、引数を渡すオーバーロードが存在しない。
引数を渡す場合、Task
クラスのインスタンスの生成と同時に引数を渡して、Task
クラスのStart
メソッドを使用してスレッドを実行する必要がある。
private async void MethodAsync()
{
int m = 999;
Task<int> task = new Task(x =>
{
Thread.Sleep(3000);
return x * 2;
}, m);
task.Start();
m = 100;
int result = await task; // result : 1998
Console.WriteLine($"{result}");
}
Wait / Result
コンソールソフトウェアにおいて、Wait
メソッドおよびResult
メソッドを使用して、非同期メソッドを作成することができる。
HttpClient hc = new HttpClient();
string html = hc.GetStringAsync("http://example.jp/").Result;
また、Wait
メソッドおよびResult
メソッド以外に、WaitAll
メソッドやWaitAny
メソッドが存在する。
HttpClient hc = new HttpClient();
Task<string> t1 = hc.GetStringAsync("https://www.microsoft.com/");
Task<string> t2 = hc.GetStringAsync("https://www.bing.com/");
// タスクが終わるまでスレッドをブロック
t1.Wait();
// タスクが終わるまでスレッドをブロックして結果を取得
string binghtml = t2.Result;
// どれかのタスクが終わるまでスレッドをブロック
int completedTaskIndex = Task.WaitAny(t1, t2);
// 0 : タスクt1の方が速い場合
// 1 : タスクt2の方が速い場合
// どれかのタスクが終わるまでスレッドをブロック(タイムアウトあり)
int completedTaskIndex2 = Task.WaitAny(new[] { t1, t2 }, 50);
// 0 : タスクt1の方が速い場合
// 1 : タスクt2の方が速い場合
// -1 : タスクt1およびt2とも50[ms]以内に応答がない場合
// 全てタスクが終わるまでスレッドをブロック
Task.WaitAll(t1, t2);
// 全てのタスクが終わるまでスレッドをブロック(タイムアウトあり)
bool allTasksCompleted = Task.WaitAll(new[] { t1, t2 }, 50);
// true : タスクt1およびt2とも50[ms]以内に応答がある場合
// false : タスクt1またはt2のいずれか一方、または、タスクt1およびt2とも50[ms]以内に応答がない場合
WhenAny
複数のタスクのうち、いずれか1つのみが完了するまで待機する場合、WhenAny
メソッドを使用する。
HttpClient hc = new HttpClient();
Task<string> t1 = hc.GetStringAsync("https://www.microsoft.com/");
Task<string> t2 = hc.GetStringAsync("https://www.bing.com/");
Task<string> completedTask = await Task.WhenAny(t1, t2);
これを応用して、処理継続中のタスクをキャンセルすることもできる。
HttpClient hc = new HttpClient();
CancellationTokenSource cts = new CancellationTokenSource();
// GetStringAsyncメソッドにはCancellationToken構造体を受けるオーバーロードが無いため、GetAsyncメソッドを使用している
Task<HttpResponseMessage> t1 = hc.GetAsync("https://www.microsoft.com/", cts.Token);
Task<HttpResponseMessage> t2 = hc.GetAsync("https://www.bing.com/", cts.Token);
// 完了したタスクを取得
Task<HttpResponseMessage> completedTask = await Task.WhenAny(t1, t2);
// 他の処理中のタスクは全てキャンセルする
cts.Cancel();
// 完了したタスクの結果を取得
HttpResponseMessage msg = await completedTask;
string html = await msg.Content.ReadAsStringAsync();
WhenAllメソッド / ContinueWithメソッド
Task
クラスはContinueWith
メソッドを使用して、その後の処理を連続して実行できる。
例えば、2つ以上のタスクを実行して全てのタスクが完了した後に続けてタスクを実行する場合、ContinueWith
メソッドまたはawait
を使用してタスクの完了を待つ。
// パターン1
var task1 = Task.Run(() =>
{
// 何か処理
});
var task2 = Task.Run(() =>
{
// 何か処理
});
var task3 = Task.Run(() =>
{
// 何か処理
});
var taskAll = Task.WhenAll(task1, task2).ContinueWith(task3); // awaitは不要
// パターン2
var task1 = Task.Run(() =>
{
// 何か処理
});
await task1.ConfigureAwait(false);
var task2 = Task.Run(() =>
{
// 何か処理
});
await task2.ConfigureAwait(false);
var task3 = Task.Run(() =>
{
// 何か処理
});
await task3.ConfigureAwait(false);
状況にもよるが、処理が膨大になると可読性が落ちるため、先にタスクをまとめて記述して、タスク間の関係性が見通せるパターン1の方が良い。
IsCompletedプロパティ
タスクが完了しているかどうかを確認する場合、IsCompleted
プロパティを使用する。
IsCompletedSuccessfully
プロパティもあるが、これはタスクが成功および完了した場合のみtrue
になる。
また、IsFaulted
プロパティやIsCanceled
プロパティも存在する。
HttpClient hc = new HttpClient();
Task<string> task = hc.GetStringAsync("https://www.microsoft.com/");
// タスクが成功および完了したかどうかを確認
if (!task.IsCompleted)
{
// この時点でタスクt1が成功および完了している場合、このメッセージは表示されない
Console.WriteLine("ちょっとまってね");
}
var html = await task;
Console.WriteLine(html);
ConfigureAwait(false)
GUIアプリケーションにおいて、Wait
メソッドおよびResult
メソッドを使用する場合、デッドロックが発生する。
これは、Task
クラスのWait
メソッドやResult
メソッドは、元のスレッドへ戻るからである。
以下の例では、まず、methodAsync().Wait()を実行した時点で親スレッドがスリープする。
次に、methodAsyncメソッドの完了後、親スレッドに戻る時、既に親スレッドがスリープしているため、処理を続けることができない。
そのため、親スレッドは子スレッドを待ち続け、子スレッドは親スレッドに戻ろうとして、デッドロックが発生する。
// デッドロックが起きるサンプルコード(GUIアプリケーションのみ)
// コンソールアプリケーションでは正常に動作する
public void callMethod()
{
methodAsync().Wait();
}
public async Task methodAsync()
{
return await Task.Delay(1000);
}
対策として、ConfigureAwait(false)
を使用するとデッドロックが回避できる。
ConfigureAwait(false)
は、メソッドの終了後に親スレッド(呼び出し元)に戻らなくてもよいことを指定する。
ブロックされたスレッドに戻らなくなるため、methodAsyncメソッドが終了した時にcallMethodメソッドに戻ることができる。
// デッドロックが起きないサンプルコード(GUIアプリケーションのみ)
public void callMethod()
{
methodAsync().Wait();
}
public async Task methodAsync()
{
return await Task.Delay(1000).ConfigureAwait(false);
}
Parallelクラス
CPU処理を並列実行する場合、Parallel
クラスを使用する。
以下の例では、methodAsync1メソッド、methodAsync2メソッド、methodAsync3メソッドが並列実行される。
ただし、CPUのコア数やスレッドプールの空き状況によっては、並列実行されない場合もあることに注意する。
なお、Parallel
クラスには、Parallel.Invoke
メソッドの他にParallel.For
メソッドやParallel.ForEach
メソッドも存在する。
また、Parallel.Invoke
メソッドの第1引数のParallelOptions
クラスは省略可能である。
// Invokeメソッドの引数はAction型
Parallel.Invoke(new ParallelOptions() { MaxDegreeOfParallelism = 4 }, // 最大同時並列数:4
methodAsync1, // 戻り値が無いAction型の場合
() => { methodAsync2(100); }, // 戻り値があるFunc型の場合
() =>
{ // 直接処理を記述してもよい
int n = 400;
methodAsync3(n);
}
);
PLINQ(ParallelEnumerable)
LINQ
(PLINQ
)を使用して、並列実行することもできる。
PLINQ
では、AsParallel
拡張メソッド以降のコードが並列化される。
以下の例では、処理A(ファイルの読み込み)はシングルスレッドで実行、処理B(ハッシュ値計算)はマルチスレッドで実行している。
なお、AsParallel
拡張メソッドの実行後のWithDegreeOfParallelism
拡張メソッドは省略可能である。
// ファイルのハッシュ値を計算するプログラム
var files = Directory.GetFiles(Environment.SystemDirectory, "*.exe");
var filehash = files
.Select(f => new { File = f, Data = File.ReadAllBytes(f) }) // 処理A(ファイルの読み込み)
.AsParallel()
.WithDegreeOfParallelism(4) // 最大同時並列数:4
.Select(f => new { File = f.File, Hash = SHA256.Create().ComputeHash(f.Data)}) // 処理B(ハッシュ値計算)
.ToArray();
例外処理
Task
クラスやParallel
クラスで発生する例外は、例外AggregateException
としてキャッチすることができる。
try
{
Parallel.Invoke(
() => throw new ArgumentException(),
() => throw new InvalidOperationException(),
() => throw new FormatException()
);
}
catch (AggregateException exception)
{
// 原因となるExceptionを確認する場合、FlattenメソッドのInnerExceptionsプロパティを使用する
var exceptions = exception.Flatten().InnerExceptions;
foreach (var ex in exceptions)
{
Debug.WriteLine(ex.GetType());
}
}
catch (Exception ex)
{ // 未処理の例外をキャッチ
Trace.WriteLine($"Log: {ex}");
}
catch
{ // SEHをキャッチ
Trace.WriteLine("Log: Caught uninterpreted exception.");
}
ただし、Task
クラスをawait
する場合、個別のException
クラスを使用してキャッチすることもできる。
try
{
var addresses = await Dns.GetHostAddressesAsync("example.jp");
}
catch(SocketException exception)
{
Debug.WriteLine(exception.ToString());
}
catch (Exception ex)
{ // 未処理の例外をキャッチ
Trace.WriteLine($"Log: {ex}");
}
catch
{ // SEHをキャッチ
Trace.WriteLine("Log: Caught uninterpreted exception.");
}
Task
クラスで発生したキャッチされていない例外をまとめて処理する場合、TaskScheduler.UnobservedTaskException
メソッドを使用する。
ただし、これは、例外が発生したタイミングでキャッチできないこと、および、例外をキャッチできたとしても対処可能なことが限られることに注意する。
private async void buttonl_Click(object sender, RoutedEventArgs e)
{
await Task.Run(() =>
{
await Task.Delay(1000);
});
// 非同期処理の未処理例外をキャッチする
TaskScheduler.UnobservedTaskException += (sender, e) =>
{
AggregateException ae = e.Exception;
Debug.WriteLine(ae.ToString());
e.SetObserved(); // .NET Framework 4.0の場合、これを記述しないとソフトウェアが強制終了する
};
}
上記の例外処理において、以下のようなメソッドを記述すると便利である。
ValueTask
構造体ではなくTask
クラスでもよいが、ValueTask
構造体にすることにより、アロケーションコストを削減できる可能性がある。
public void Button_Click(object sender, EventArgs e)
{
SafeAsyncBlock(async () =>
{
// 非同期で取得
var body = await httpClient.GetStringAsync("https://example.com/foobar");
var foobar = JsonConvert.DeserializeObject<FooBar>(body);
// 表示
textBox.Text = $"${foobar.Price}";
});
}
// 非同期処理で発生する例外を安全に処理する
public static async void SafeAsyncBlock(Func<ValueTask> action)
{
try
{ // 非同期処理を実行
await action();
}
catch (Exception ex)
{ // 未処理の例外を補足する
Trace.WriteLine($"Log: {ex}");
}
catch
{ // SEHも補足
Trace.WriteLine("Log: Caught uninterpreted exception.");
}
}
※注意
普段は考慮する必要はないと思われるが、SEHを非同期処理の例外として伝搬して継続させることはできない。
したがって、catch
句で補足できるSEHは、最初のタスクコンテキストスイッチが発生する以前のSEHのみとなる。
1度タスクコンテキストスイッチが発生した後の非同期継続処理でSEHが発生した場合は、この方法では対処できない。
おそらく、ThreadPool
クラスから割り当てられたワーカースレッドの根元に伝搬する、または、SynchronizationContext
がホストするスレッドの根元に伝搬する。
AppDomain
でフックする以外に良い方法は存在しないかもしれないことに注意する。