C Sharpの基礎 - マルチスレッド

提供:MochiuWiki : SUSE, EC, PCB
2021年11月29日 (月) 17:01時点におけるWiki (トーク | 投稿記録)による版 (→‎例外処理)
ナビゲーションに移動 検索に移動

概要

.NET Framework 4.5で導入されたC# 5.0(Visual Studio 2012) からは、マルチスレッドプログラミングをコンパイラレベルでサポートしている。
具体的には、asyncとawaitというキーワードが導入された。
これは、タスクベース非同期パターン(Task-based Asynchronous Pattern、TAP)を実装するものである。

このタスクベース非同期パターンでは、処理の開始はMethodNameAsyncという形式をとる。
そしてその戻り値を、Task、Task<T>、voidとし、Taskを待つにはawaitを付加する。
また、awaitを付加した関数には、asyncを付加する。

下記の例は、Task<int>型を返す非同期関数である。

 public async Task<int> MethodAsync()
 {
    var iRet = await Task.Run(() =>
                     {
                        await Task.Delay(5000);
                        return 0;
                     });
    return iRet;
 }



asyncとawaitの動作

タスクとは、一連の処理をひとまとまりにした単位である。
Taskクラスはマルチスレッドで実行されるようになっており、連続で記述する場合は、記述した個数だけマルチスレッドで並列処理される。

Task.Runメソッドは、大量の計算のみを行う処理を並列実行させる時のみに使用する場合、または、既存のメソッドを変更せずに使用する場合に使用する。

 public async Task MethodAsync()
 {
    var task1 = Task.Run(() =>
                {
                   // 何か処理1
                });
 
    var task2 = Task.Run(() =>
                {
                   // 何か処理2
                });
 
    var task12  = Task.WhenAll(task1, task2);
 
    var taskAll = task12.ContinueWith(() =>
                  {
                     // 何か処理3
                  });
 
    return await taskAll.ConfigureAwait(false);
 }


Task.Runメソッドには、引数を渡すオーバーロードが存在しない。
引数を渡す場合、Taskクラスのインスタンスの生成と同時に引数を渡して、TaskクラスのStartメソッドを使用してスレッドを実行する必要がある。

 private async void MethodAsync()
 {
    int m = 999;
 
    Task<int> task = new Task(x =>
    {
       Thread.Sleep(3000);
       return x * 2;
    }, m);
 
    task.Start();
 
    m = 100;
 
    int result = await task;  // result : 1998
    Console.WriteLine($"{result}");
 }



Wait / Result

コンソールソフトウェアにおいて、WaitメソッドおよびResultメソッドを使用して、非同期メソッドを作成することができる。

 HttpClient hc = new HttpClient();
 
 string html = hc.GetStringAsync("http://example.jp/").Result;


また、WaitメソッドおよびResultメソッド以外に、WaitAllメソッドやWaitAnyメソッドが存在する。

 HttpClient hc = new HttpClient();
 
 Task<string> t1 = hc.GetStringAsync("https://www.microsoft.com/");
 Task<string> t2 = hc.GetStringAsync("https://www.bing.com/");
 
 // タスクが終わるまでスレッドをブロック
 t1.Wait();
 
 // タスクが終わるまでスレッドをブロックして結果を取得
 string binghtml = t2.Result;
 
 // どれかのタスクが終わるまでスレッドをブロック
 int completedTaskIndex = Task.WaitAny(t1, t2);
 // 0 : タスクt1の方が速い場合
 // 1 : タスクt2の方が速い場合
 
 // どれかのタスクが終わるまでスレッドをブロック(タイムアウトあり)
 int completedTaskIndex2 = Task.WaitAny(new[] { t1, t2 }, 50);
 // 0  : タスクt1の方が速い場合
 // 1  : タスクt2の方が速い場合
 // -1 : タスクt1およびt2とも50[ms]以内に応答がない場合
 
 // 全てタスクが終わるまでスレッドをブロック
 Task.WaitAll(t1, t2);
 
 // 全てのタスクが終わるまでスレッドをブロック(タイムアウトあり)
 bool allTasksCompleted = Task.WaitAll(new[] { t1, t2 }, 50);
 // true  : タスクt1およびt2とも50[ms]以内に応答がある場合
 // false : タスクt1またはt2のいずれか一方、または、タスクt1およびt2とも50[ms]以内に応答がない場合



WhenAny

複数のタスクのうち、いずれか1つのみが完了するまで待機する場合、WhenAnyメソッドを使用する。

 HttpClient hc = new HttpClient();
 
 Task<string> t1 = hc.GetStringAsync("https://www.microsoft.com/");
 Task<string> t2 = hc.GetStringAsync("https://www.bing.com/");
 
 Task<string> completedTask = await Task.WhenAny(t1, t2);


これを応用して、処理継続中のタスクをキャンセルすることもできる。

 HttpClient hc = new HttpClient();
 CancellationTokenSource cts = new CancellationTokenSource();
 
 // GetStringAsyncメソッドにはCancellationToken構造体を受けるオーバーロードが無いため、GetAsyncメソッドを使用している
 Task<HttpResponseMessage> t1 = hc.GetAsync("https://www.microsoft.com/", cts.Token);
 Task<HttpResponseMessage> t2 = hc.GetAsync("https://www.bing.com/", cts.Token);
 
 // 完了したタスクを取得
 Task<HttpResponseMessage> completedTask = await Task.WhenAny(t1, t2);
 
 // 他の処理中のタスクは全てキャンセルする
 cts.Cancel();
 
 // 完了したタスクの結果を取得
 HttpResponseMessage msg = await completedTask;
 string html = await msg.Content.ReadAsStringAsync();



WhenAllメソッド / ContinueWithメソッド

TaskクラスはContinueWithメソッドを使用して、その後の処理を連続して実行できる。

例えば、2つ以上のタスクを実行して全てのタスクが完了した後に続けてタスクを実行する場合、ContinueWithメソッドまたはawaitを使用してタスクの完了を待つ。

 // パターン1
 var task1 = Task.Run(() =>
             {
                // 何か処理
             });
 
 var task2 = Task.Run(() =>
             {
                // 何か処理
             });
 
 var task3 = Task.Run(() =>
             {
                // 何か処理
             });
 
 var taskAll = Task.WhenAll(task1, task2).ContinueWith(task3);  // awaitは不要


 // パターン2
 var task1 = Task.Run(() =>
             {
                // 何か処理
             });
 
 await task1.ConfigureAwait(false);
 
 var task2 = Task.Run(() =>
             {
                // 何か処理
             });
 
 await task2.ConfigureAwait(false);
 
 var task3 = Task.Run(() =>
             {
                // 何か処理
             });
 
 await task3.ConfigureAwait(false);


状況にもよるが、処理が膨大になると可読性が落ちるため、先にタスクをまとめて記述して、タスク間の関係性が見通せるパターン1の方が良い。


IsCompletedプロパティ

タスクが完了しているかどうかを確認する場合、IsCompletedプロパティを使用する。

IsCompletedSuccessfullyプロパティもあるが、これはタスクが成功および完了した場合のみtrueになる。
また、IsFaultedプロパティやIsCanceledプロパティも存在する。

 HttpClient hc = new HttpClient();
 Task<string> task = hc.GetStringAsync("https://www.microsoft.com/");
 
 // タスクが成功および完了したかどうかを確認
 if (!task.IsCompleted)
 {
    // この時点でタスクt1が成功および完了している場合、このメッセージは表示されない
    Console.WriteLine("ちょっとまってね");
 }
 
 var html = await task;
 Console.WriteLine(html);



ConfigureAwait(false)

GUIアプリケーションにおいて、WaitメソッドおよびResultメソッドを使用する場合、デッドロックが発生する。
これは、TaskクラスのWaitメソッドやResultメソッドは、元のスレッドへ戻るからである。

以下の例では、まず、methodAsync().Wait()を実行した時点で親スレッドがスリープする。
次に、methodAsyncメソッドの完了後、親スレッドに戻る時、既に親スレッドがスリープしているため、処理を続けることができない。
そのため、親スレッドは子スレッドを待ち続け、子スレッドは親スレッドに戻ろうとして、デッドロックが発生する。

 // デッドロックが起きるサンプルコード(GUIアプリケーションのみ)
 // コンソールアプリケーションでは正常に動作する
 public void callMethod()
 {
    methodAsync().Wait();
 }
 
 public async Task methodAsync()
 {
    return await Task.Delay(1000);
 }


対策として、ConfigureAwait(false)を使用するとデッドロックが回避できる。

ConfigureAwait(false)は、メソッドの終了後に親スレッド(呼び出し元)に戻らなくてもよいことを指定する。
ブロックされたスレッドに戻らなくなるため、methodAsyncメソッドが終了した時にcallMethodメソッドに戻ることができる。

 // デッドロックが起きないサンプルコード(GUIアプリケーションのみ)
 public void callMethod()
 {
    methodAsync().Wait();
 }
 
 public async Task methodAsync()
 {
    return await Task.Delay(1000).ConfigureAwait(false);
 }



Parallelクラス

CPU処理を並列実行する場合、Parallelクラスを使用する。

以下の例では、methodAsync1メソッド、methodAsync2メソッド、methodAsync3メソッドが並列実行される。
ただし、CPUのコア数やスレッドプールの空き状況によっては、並列実行されない場合もあることに注意する。

なお、Parallelクラスには、Parallel.Invokeメソッドの他にParallel.ForメソッドやParallel.ForEachメソッドも存在する。
また、Parallel.Invokeメソッドの第1引数のParallelOptionsクラスは省略可能である。

 // Invokeメソッドの引数はAction型
 Parallel.Invoke(new ParallelOptions() { MaxDegreeOfParallelism = 4 }, // 最大同時並列数:4
                 methodAsync1,                  // 戻り値が無いAction型の場合
                 () => { methodAsync2(100); },  // 戻り値があるFunc型の場合
                 () => 
                 {  // 直接処理を記述してもよい
                    int n = 400;
                    methodAsync3(n);
                 }
                );



PLINQ(ParallelEnumerable)

LINQ(PLINQ)を使用して、並列実行することもできる。
PLINQでは、AsParallel拡張メソッド以降のコードが並列化される。

以下の例では、処理A(ファイルの読み込み)はシングルスレッドで実行、処理B(ハッシュ値計算)はマルチスレッドで実行している。
なお、AsParallel拡張メソッドの実行後のWithDegreeOfParallelism拡張メソッドは省略可能である。

 // ファイルのハッシュ値を計算するプログラム
 var files = Directory.GetFiles(Environment.SystemDirectory, "*.exe");
 var filehash = files
                .Select(f => new { File = f, Data = File.ReadAllBytes(f) })                     // 処理A(ファイルの読み込み)
                .AsParallel()
                .WithDegreeOfParallelism(4)                                                     // 最大同時並列数:4
                .Select(f => new { File = f.File, Hash = SHA256.Create().ComputeHash(f.Data)})  // 処理B(ハッシュ値計算)
                .ToArray();



例外処理

TaskクラスやParallelクラスで発生する例外は、例外AggregateExceptionとしてキャッチすることができる。

 try
 {
    Parallel.Invoke(
                    () => throw new ArgumentException(),
                    () => throw new InvalidOperationException(),
                    () => throw new FormatException()
                   );
 }
 catch (AggregateException exception)
 {
    // 原因となるExceptionを確認する場合、FlattenメソッドのInnerExceptionsプロパティを使用する
    var exceptions = exception.Flatten().InnerExceptions;
    foreach (var ex in exceptions)
    {
       Debug.WriteLine(ex.GetType());
    }
 }
 catch (Exception ex)
 {  // 未処理の例外をキャッチ
    Trace.WriteLine($"Log: {ex}");
 }
 catch
 {  // SEHをキャッチ
    Trace.WriteLine("Log: Caught uninterpreted exception.");
 }


ただし、Taskクラスをawaitする場合、個別のExceptionクラスを使用してキャッチすることもできる。

 try
 {
    var addresses = await Dns.GetHostAddressesAsync("example.jp");
 }
 catch(SocketException exception)
 {
    Debug.WriteLine(exception.ToString());
 }
 catch (Exception ex)
 {  // 未処理の例外をキャッチ
    Trace.WriteLine($"Log: {ex}");
 }
 catch
 {  // SEHをキャッチ
    Trace.WriteLine("Log: Caught uninterpreted exception.");
 }


Taskクラスで発生したキャッチされていない例外をまとめて処理する場合、TaskScheduler.UnobservedTaskExceptionメソッドを使用する。
ただし、これは、例外が発生したタイミングでキャッチできないこと、および、例外をキャッチできたとしても対処可能なことが限られることに注意する。

 private async void buttonl_Click(object sender, RoutedEventArgs e)
 {
    await Task.Run(() =>
    {
       await Task.Delay(1000);
    });
 
    // 非同期処理の未処理例外をキャッチする
    TaskScheduler.UnobservedTaskException += (sender, e) =>
    {
       AggregateException ae = e.Exception;
       Debug.WriteLine(ae.ToString());
       e.SetObserved();  // .NET Framework 4.0の場合、これを記述しないとソフトウェアが強制終了する
    };
 }


上記の例外処理において、以下のようなメソッドを記述すると便利である。
ValueTask構造体ではなくTaskクラスでもよいが、ValueTask構造体にすることにより、アロケーションコストを削減できる可能性がある。

 public void Button_Click(object sender, EventArgs e)
 {
    SafeAsyncBlock(async () =>
                   {
                      // 非同期で取得
                      var body = await httpClient.GetStringAsync("https://example.com/foobar");
                      var foobar = JsonConvert.DeserializeObject<FooBar>(body);
 
                      // 表示
                      textBox.Text = $"${foobar.Price}";
                   });
 }
 
 // 非同期処理で発生する例外を安全に処理する
 public static async void SafeAsyncBlock(Func<ValueTask> action)
 {
    try
    {  // 非同期処理を実行
       await action();
    }
    catch (Exception ex)
    {  // 未処理の例外を補足する
        Trace.WriteLine($"Log: {ex}");
    }
    catch
    {  // SEHも補足
       Trace.WriteLine("Log: Caught uninterpreted exception.");
    }
 }


※注意
普段は考慮する必要はないと思われるが、SEHを非同期処理の例外として伝搬して継続させることはできない。
したがって、catch句で補足できるSEHは、最初のタスクコンテキストスイッチが発生する以前のSEHのみとなる。

1度タスクコンテキストスイッチが発生した後の非同期継続処理でSEHが発生した場合は、この方法では対処できない。
おそらく、ThreadPoolクラスから割り当てられたワーカースレッドの根元に伝搬する、または、SynchronizationContextがホストするスレッドの根元に伝搬する。

AppDomainでフックする以外に良い方法は存在しないかもしれないことに注意する。