Task 클래스
- 스레드의 단점
* 스레드의 반환값을 얻기 힘듬. 어떤 형태로든 공유 필드를 만들어야 함
* 스레드의 예외를 잡아서 전파하기 힘듬
* 스레드의 연산이 끝난 후 다른 뭔가를 수행하게 만들 수 없음(Join은 현재 스레드가 차단됨)
* 세밀한 동시성을 구현하는데 방해가 많음 - 조합 능력이 부족함
* 수많은 스레드를 사용할 경우 유지하는데만 많은 메모리가 소모됨
- Task
* 스레드보다 더 높은 수준의 추상
* 하나의 동시적 연산(Concurrent Operation)을 대표
* 조합 할 수 있음(연속(Continuation) 기능을 이용해 여러 Task를 사슬처럼 이을 수 있음)
* 시동 잠복지연(Startup Latency)를 완화하기 위해 스레드 풀을 활용 할 수 있음
* TaskCompletionSource와 함께 콜백 방식으로 활용 가능
* .NET 4.0에서 병렬 프로그래밍 라이브러리의 일부로 도입 됨
* 계속 개선되어서 이제는 일반적인 동시성 시나리오에서도 잘 작동함
* C#의 비동기 함수들의 바탕이 되는 형식이 됨
Task를 이용한 동시 연산 시작
1. .NET 4.5
- Task.Run
- Action 형식의 대리자를 지정
2. .NET 4.0
- Task.Factory.StartNew
- Task.Run은 하나의 Task 객체를 돌려주고, 해당 객체를 통해 작업의 진행 상황을 조회 할 수 있음
- 이미 시작된 작업 객체를 돌려주는데, 시작되지 않은 작업 객채는 생성자를 통해 만들 수 있음
- 객채의 Status 속성으로 실행 상태를 확인 할 수 있음
namespace System.Threading.Tasks
{
public enum TaskStatus
{
Created = 0,
WaitingForActivation = 1,
WaitingToRun = 2,
Running = 3,
WaitingForChildrenToComplete = 4,
RanToCompletion = 5,
Canceled = 6,
Faulted = 7
}
}
Wait 메서드
- 어떤 작업 객체에 Wait를 호출하면 그 작업이 완료될 때까지 현재 스레드의 작업이 차단됨
- Thread의 Join과 비슷
- 만료시간과 취소 토큰을 받는 버전이 있음. 이를 통해 작업이 완료되기 전에 대기를 끝낼 수 있음
Task task = Task.Run(() =>
{
Thread.Sleep(2000);
Console.WriteLine("Foo");
});
Console.WriteLine(task.IsCompleted);
task.Wait();
오래 실행되는 작업
- CLR은 작업 객체들을 풀 스레드에서 실행함
- 풀 스레드는 짧게 실행되는 계산량 한정 연산에 이상적
- 스레드를 차단하는 연산에서는 다음과 같이 풀 스레드를 사용하지 않도록 해야 함
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace Practice
{
class Program
{
// 튜플 리스트
public class PairList<TKey, TValue> : List<Tuple<TKey, TValue>>
{
public void Add(TKey key, TValue value)
{
Add(new Tuple<TKey, TValue>(key, value));
}
}
public static void Main(string[] arg)
{
const int testCase = 50;
const int delay = 1000;
const string message = "!";
Stopwatch sw = new Stopwatch();
var taskList = new List<Task>();
var threadList = new List<Thread>();
var eventList = new List<ManualResetEvent>();
var swResultList = new PairList<string, long>();
string tempString = null;
//////////////////////////////////////
// 1. Task.Factory.StartNew (With LongRunning Option)
tempString = "1. Task.Factory.StartNew (With LongRunning Option)";
Console.WriteLine(tempString);
sw.Start();
for (int i = 0; i < testCase; i++)
{
taskList.Add(Task.Factory.StartNew(() =>
{
Thread.Sleep(delay);
Console.Write(message);
}, TaskCreationOptions.LongRunning));
}
Task.WaitAll(taskList.ToArray());
sw.Stop();
swResultList.Add(tempString, sw.ElapsedMilliseconds);
Console.WriteLine('\n');
//////////////////////////////////////
// 2. Task.Factory.StartNew (Without LongRunning Option)
tempString = "2. Task.Factory.StartNew (Without LongRunning Option)";
Console.WriteLine(tempString);
taskList.Clear();
sw.Restart();
for (int i = 0; i < testCase; i++)
{
taskList.Add(Task.Factory.StartNew(() =>
{
Thread.Sleep(delay);
Console.Write(message);
}));
}
Task.WaitAll(taskList.ToArray());
sw.Stop();
swResultList.Add(tempString, sw.ElapsedMilliseconds);
Console.WriteLine('\n');
//////////////////////////////////////
// 3. Task.Run
tempString = "3. Task.Run";
Console.WriteLine(tempString);
taskList.Clear();
sw.Restart();
for (int i = 0; i < testCase; i++)
{
taskList.Add(Task.Run(() =>
{
Thread.Sleep(delay);
Console.Write(message);
}));
}
Task.WaitAll(taskList.ToArray());
sw.Stop();
swResultList.Add(tempString, sw.ElapsedMilliseconds);
Console.WriteLine('\n');
//////////////////////////////////////
// 4. Thread
tempString = "4. Thread";
Console.WriteLine(tempString);
sw.Restart();
for (int i = 0; i < testCase; i++)
{
threadList.Add(new Thread(() =>
{
Thread.Sleep(delay);
Console.Write(message);
}));
threadList[i].Start();
}
foreach (var item in threadList)
{
item.Join();
}
sw.Stop();
swResultList.Add(tempString, sw.ElapsedMilliseconds);
Console.WriteLine('\n');
//////////////////////////////////////
// 5. ThreadPool.QueueUserWorkItem
tempString = "5. ThreadPool.QueueUserWorkItem";
Console.WriteLine(tempString);
sw.Restart();
for (int i = 0; i < testCase; i++)
{
var resetEvent = new ManualResetEvent(false);
ThreadPool.QueueUserWorkItem((nonUsed) =>
{
Thread.Sleep(delay);
Console.Write(message);
resetEvent.Set();
});
eventList.Add(resetEvent);
}
WaitHandle.WaitAll(eventList.ToArray());
sw.Stop();
swResultList.Add(tempString, sw.ElapsedMilliseconds);
Console.WriteLine('\n');
//////////////////////////////////////
// end. Result
swResultList.Sort((x, y) => x.Item2.CompareTo(y.Item2));
Console.WriteLine("\n=== Result ===");
int index = 0;
foreach (var item in swResultList)
{
index++;
Console.WriteLine($"#{index} - {item.Item1}: {item.Item2}ms");
}
}
}
}
Task<TResult> 클래스 - 작업 결과 반환
- Task의 파생 클래스
- Action 대신 Func<TResult> 형식의 대리자 객체로 Task.Run을 호출
- 나중에 Result 속성으로 얻을 수 있음
* 값을 얻기 전에 속성에 접근하면 Task가 끝나서 Result가 산출 될 때 까지 호출 스레드의 차단
예외
- Thread와 달리 예외를 잘 전파함
- Wait이나 Result 속성에 접근한 코드 쪽으로 예외를 다시 던져줌
- AggreateException 형태로 감싸서 전달(병렬 프로그래밍을 위해서)
- IsCanceled: OperationCanceledException - 토큰으로 취소된 경우의 예외 발생시 true
- IsFaulted: 취소 예외를 제외한 모든 예외 발생시 true
- Exception: 오류의 종류를 받아올 수 있음
자율 작업
- 자율 작업: 띄운 다음 신경 쓰지 않는 작업 (Wait, Result을 참조하지 않거나 연속 작업이 없는 경우)
- 미관찰 예외: 자율 작업의 미처리 예외
- CLR 4.0
* 미관찰 예외 발생시 프로그램 종료
* GC가 미관찰 예외를 발생한 Task를 수거 시, 종료자에서 다시 예외를 던져줌
* 예외 발생 시점이 애매하고, 비동기성의 특정 패턴을 복잡하게 만듬
- CLR 4.5
* 미관찰 예외가 발생해도 프로그램에는 문제가 없음
- 미관찰 예외의 전역 수준 캐치
* TaskScheduler.UnobservedTaskException 구독
- 미관찰 예외의 미묘한 사항
* Task 객체의 작업이 끝나길 만료 시간을 지정해서 기다릴 때, 만료 시간이 지난 후에 예외가 발생하면 미관찰 예외
* Task 객체에 작업에 장애가 발생한 후에 Exception 속성을 조회하면 관찰 예외로 변함
연속 (Continuation)
- 작업 객체에게 지금 하는 일을 마치면 계속하여 다른 일을 실행하라고 지시
- 콜백의 형태로 구현
- GetAwaiter: 대기자(Awaiter) 객체가 반환
* 대기자 객체에 대한 OnCompleted
: 선행 작업(Antecedent task)의 완료(장애)시 호출될 대리자를 지정
* 대기자 객체에 대한 GetResult
: 결과 반환, 예외의 경우 AggregateException으로 재포장되지 않고 던져짐
: 비제네릭 Task의 경우 반환 형식은 void, 예외를 다시 던지기만 할 뿐
- ConfigureAwait
* 동기화 문맥을 연속용 콜백에 전달하는 기능 - UI 스레드 문맥 전달의 설정
* 기능을 사용하지 않는다면 false로 끄는 것이 좋음
- ContinueWith
* 연속용 콜백을 부착하는 다른 방법
* 결과로 Task를 돌려주기 때문에 연속 사용에 유용
* 장애가 발생할 여지가 있다면 AggregateException을 직접 처리해줘야 함
* UI 응용 프로그램에서는 연속 작업을 마샬링하는 코드도 작성해야 함
* TaskContinuationOptions.ExecuteSynchronously
: 비 UI 문맥에서, 연속 작업이 같은 스래드에서 실행되게 설정
: 설정하지 않으면 연속용 콜백이 풀 스레드에서 동작하게 됨
TaskCompletionSource
- 어떤 연산을 지금 바로가 아니라 잠시 후에 시작하는 작업 객체를 생성 할 수 있음
- 연산이 진행되는 동안 스레드를 차단하지 않고도 작업 객체의 모든 장점을 누릴 수 있음
1. TaskCompletionSource 인스턴스 생성
2. TaskCompletionSource 인스턴스의 Task 속성으로 얻은 Task를 다뤄서 작업 실시
3. TaskCompletionSource 인스턴스의 메소드들을 이용하여 작업의 진행을 제어
* 메서드 중 하나를 호출하면 Task 객체에 신호가 전달 됨
: 이후 작업 완료, 장애, 취소 상태 등으로 변경
* 각 TCS 인스턴스에 한해 한번만 호출 가능(이후 예외나 false를 던짐)
- TaskCompletionSource + Timer = Delay
static Task Delay(int milliseconds)
{
// Task 형식으로 반환을 해줘야 하는데,
// TaskCompletionSource의 비제네릭 버전이 없으므로
// 임의의 형식(여기서는 object) 형식으로 만들고
// Task<object>을 가져와 Task 형식으로 돌려줌(파생형이라 가능)
var tcs = new TaskCompletionSource<object>();
var timer = new System.Timers.Timer(milliseconds) { AutoReset = false };
timer.Elapsed += delegate
{
timer.Dispose();
tcs.SetResult(null);
};
timer.Start();
return tcs.Task;
}
Task.Delay 메서드
- 위의 TaskCompletionSource + Timer = Delay 예제와 동일한 일을 수행
- Thread.Sleep의 비동기 버전
Task.WhenAny 메서드
- 주어진 작업 객체 중 하나라도 완료되면 완료되는 작업 객체를 돌려 줌
public static void Main(string[] arg)
{
Task k = Go();
k.Wait();
Console.WriteLine(Test(5000).Result);
}
// WhenAny의 기본적 사용
static async Task Go()
{
Task<int> winningTask = await Task.WhenAny(Delay1(), Delay2(), Delay3());
Console.WriteLine("Done");
// 앞에서 await가 있기에 winningTask는 이미 구해져 있으나,
// 그래도 여기서 await을 다시 해주는게 좋음
// (미처리 예외들을 캐치 할 수 있기 때문에)
Console.WriteLine(await winningTask);
}
// 시간 만료 기능
static async Task<int> Test(int timeout)
{
Task<int> myTask = Delay1();
// WhenAny에서 서로 다른 작업 객체를 지정했기 때문에, 반환형은 Task가 됨
Task winner = await Task.WhenAny(myTask, Task.Delay(timeout));
if (winner != myTask)
{
throw new TimeoutException();
}
return await myTask;
}
static async Task<int> Delay1() { await Task.Delay(1000); return 1; }
static async Task<int> Delay2() { await Task.Delay(2000); return 2; }
static async Task<int> Delay3() { await Task.Delay(3000); return 3; }
WhenAll 메서드
- 주어진 작업 객체들이 모두 완료되면 완료되는 작업 객체를 돌려 줌
// Case 1
await Task.WhenAll(Delay1(), Delay2(), Delay3());
// Case 2
Task task1 = Delay1();
Task task2 = Delay2();
Task task3 = Delay3();
await task1;
await task2;
await task3;
- Case 1
* 모든 작업이 완료되기 전에는 먼저 완료되지 않음 (일부 작업에 장애가 있더라도)
* 여러 작업 객체에서 장애가 발생했다면 새로 돌려준 Task 객체의 AggregateException 속성으로 합쳐짐
- Case 2
* await을 3번이나 호출해야 함
* task1에서 장애 발생시 task2, task3의 대기가 수행되지 않고, 그 둘에서 발생한 예외는 미관찰 예외가 됨
// Case 1
// 처음 한 번의 예외만 캐치하고, 나머지는 미관찰
try
{
int[] ints = await Task.WhenAll(Delay1(), Delay2(), Delay3());
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
// Cas 2
// 모든 예외를 캐치
Task all = Task.WhenAll(Delay1(), Delay2(), Delay3());
try
{
await all;
}
catch
{
Console.WriteLine(all.Exception);
}
커스텀 작업 조합기
- 임의의 작업에 시간 만료 대기 기능 추가
// 임의의 작업에 시간 만료 대기 기능을 추가
async static Task<TResult> WithTimeout<TResult>(this Task<TResult> task, TimeSpan timeout)
{
Task winner = await (Task.WhenAny(task, Task.Delay(timeout)));
if (winner != task)
{
throw new TimeoutException();
}
return await task;
}
- 임의의 작업에 CancellationToken을 통해 작업을 폐기할 수 있는 기능 추가
// 임의의 작업에 CancellationToken을 통해 작업을 폐기할 수 있는 기능 추가
static Task<TResult> WithCancellation<TResult>(this Task<TResult> task, CancellationToken cancelToken)
{
var tcs = new TaskCompletionSource<TResult>();
var reg = cancelToken.Register(() => tcs.TrySetCanceled());
task.ContinueWith(ant =>
{
reg.Dispose();
if (ant.IsCanceled)
{
tcs.TrySetCanceled();
}
else if (ant.IsFaulted)
{
tcs.TrySetException(ant.Exception.InnerException);
}
else
{
tcs.TrySetResult(ant.Result);
}
});
return tcs.Task;
}
// WhenAll과 비슷하지만, 하나라도 장애가 있으면 조합이 즉시 실패
async Task<TResult[]> WhenAllorError<TResult>(params Task<TResult>[] tasks)
{
var killJoy = new TaskCompletionSource<TResult[]>();
foreach (var task in tasks)
{
task.ContinueWith(ant =>
{
if (ant.IsCanceled)
{
killJoy.TrySetCanceled();
}
else if (ant.IsFaulted)
{
killJoy.TrySetException(ant.Exception.InnerException);
}
});
}
return await await Task.WhenAny(killJoy.Task, Task.WhenAll(tasks));
}
'C#' 카테고리의 다른 글
[C#] PipeStream (0) | 2023.12.01 |
---|---|
[C#] MemoryStream (0) | 2023.11.24 |
[C#] FileStream (0) | 2023.11.20 |
[C#] Stream (0) | 2023.11.17 |
[C#] Thread (0) | 2023.11.01 |
[C#] Stopwatch (0) | 2023.10.30 |
[C#] PerformanceCounter, PerformanceCounterCategory (0) | 2023.10.28 |
[C#] EventLog (Windows 이벤트 로그) (0) | 2023.10.26 |