본문 바로가기
C#

[C#] Task

by DANEW 2023. 11. 6.

Task 클래스

 

Task 클래스 (System.Threading.Tasks)

비동기 작업을 나타냅니다.

learn.microsoft.com

- 스레드의 단점
 * 스레드의 반환값을 얻기 힘듬. 어떤 형태로든 공유 필드를 만들어야 함
 * 스레드의 예외를 잡아서 전파하기 힘듬
 * 스레드의 연산이 끝난 후 다른 뭔가를 수행하게 만들 수 없음(Join은 현재 스레드가 차단됨)
 * 세밀한 동시성을 구현하는데 방해가 많음 - 조합 능력이 부족함
 * 수많은 스레드를 사용할 경우 유지하는데만 많은 메모리가 소모됨


- Task
 * 스레드보다 더 높은 수준의 추상
 * 하나의 동시적 연산(Concurrent Operation)을 대표
 * 조합 할 수 있음(연속(Continuation) 기능을 이용해 여러 Task를 사슬처럼 이을 수 있음)
 * 시동 잠복지연(Startup Latency)를 완화하기 위해 스레드 풀을 활용 할 수 있음
 * TaskCompletionSource와 함께 콜백 방식으로 활용 가능
 * .NET 4.0에서 병렬 프로그래밍 라이브러리의 일부로 도입 됨
 * 계속 개선되어서 이제는 일반적인 동시성 시나리오에서도 잘 작동함
 * C#의 비동기 함수들의 바탕이 되는 형식이 됨

 

Task를 이용한 동시 연산 시작

1. .NET 4.5
 - Task.Run
 - Action 형식의 대리자를 지정


2. .NET 4.0
 - Task.Factory.StartNew
 - Task.Run은 하나의 Task 객체를 돌려주고, 해당 객체를 통해 작업의 진행 상황을 조회 할 수 있음
 - 이미 시작된 작업 객체를 돌려주는데, 시작되지 않은 작업 객채는 생성자를 통해 만들 수 있음
 - 객채의 Status 속성으로 실행 상태를 확인 할 수 있음

namespace System.Threading.Tasks
{
    public enum TaskStatus
    {
        Created = 0,
        WaitingForActivation = 1,
        WaitingToRun = 2,
        Running = 3,
        WaitingForChildrenToComplete = 4,
        RanToCompletion = 5,
        Canceled = 6,
        Faulted = 7
    }
}

Wait 메서드

- 어떤 작업 객체에 Wait를 호출하면 그 작업이 완료될 때까지 현재 스레드의 작업이 차단됨
- Thread의 Join과 비슷
- 만료시간과 취소 토큰을 받는 버전이 있음. 이를 통해 작업이 완료되기 전에 대기를 끝낼 수 있음

Task task = Task.Run(() =>
{
    Thread.Sleep(2000);
    Console.WriteLine("Foo");
});
Console.WriteLine(task.IsCompleted);
task.Wait();

오래 실행되는 작업

- CLR은 작업 객체들을 풀 스레드에서 실행함
- 풀 스레드는 짧게 실행되는 계산량 한정 연산에 이상적
- 스레드를 차단하는 연산에서는 다음과 같이 풀 스레드를 사용하지 않도록 해야 함

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
 
namespace Practice
{
    class Program
    {
        // 튜플 리스트
        public class PairList<TKey, TValue> : List<Tuple<TKey, TValue>>
        {
            public void Add(TKey key, TValue value)
            {
                Add(new Tuple<TKey, TValue>(key, value));
            }
        }
 
        public static void Main(string[] arg)
        {
            const int testCase = 50;
            const int delay = 1000;
            const string message = "!";
 
            Stopwatch sw = new Stopwatch();
            var taskList = new List<Task>();
            var threadList = new List<Thread>();
            var eventList = new List<ManualResetEvent>();
            var swResultList = new PairList<string, long>();
            string tempString = null;
 
 
            //////////////////////////////////////
            // 1. Task.Factory.StartNew (With LongRunning Option)
 
            tempString = "1. Task.Factory.StartNew (With LongRunning Option)";
            Console.WriteLine(tempString);
            sw.Start();
 
            for (int i = 0; i < testCase; i++)
            {
                taskList.Add(Task.Factory.StartNew(() =>
                {
                    Thread.Sleep(delay);
                    Console.Write(message);
                }, TaskCreationOptions.LongRunning));
            }
            Task.WaitAll(taskList.ToArray());
            sw.Stop();
 
            swResultList.Add(tempString, sw.ElapsedMilliseconds);
            Console.WriteLine('\n');
 
 
            //////////////////////////////////////
            // 2. Task.Factory.StartNew (Without LongRunning Option)
 
            tempString = "2. Task.Factory.StartNew (Without LongRunning Option)";
            Console.WriteLine(tempString);
            taskList.Clear();
            sw.Restart();
 
            for (int i = 0; i < testCase; i++)
            {
                taskList.Add(Task.Factory.StartNew(() =>
                {
                    Thread.Sleep(delay);
                    Console.Write(message);
                }));
            }
            Task.WaitAll(taskList.ToArray());
            sw.Stop();
 
            swResultList.Add(tempString, sw.ElapsedMilliseconds);
            Console.WriteLine('\n');
 
 
            //////////////////////////////////////
            // 3. Task.Run
 
            tempString = "3. Task.Run";
            Console.WriteLine(tempString);
            taskList.Clear();
            sw.Restart();
 
            for (int i = 0; i < testCase; i++)
            {
                taskList.Add(Task.Run(() =>
                {
                    Thread.Sleep(delay);
                    Console.Write(message);
                }));
            }
            Task.WaitAll(taskList.ToArray());
            sw.Stop();
 
            swResultList.Add(tempString, sw.ElapsedMilliseconds);
            Console.WriteLine('\n');
 
 
            //////////////////////////////////////
            // 4. Thread
 
            tempString = "4. Thread";
            Console.WriteLine(tempString);
            sw.Restart();
 
            for (int i = 0; i < testCase; i++)
            {
                threadList.Add(new Thread(() =>
                {
                    Thread.Sleep(delay);
                    Console.Write(message);
                }));
                threadList[i].Start();
            }
 
            foreach (var item in threadList)
            {
                item.Join();
            }
            sw.Stop();
 
            swResultList.Add(tempString, sw.ElapsedMilliseconds);
            Console.WriteLine('\n');
 
 
            //////////////////////////////////////
            // 5. ThreadPool.QueueUserWorkItem
 
            tempString = "5. ThreadPool.QueueUserWorkItem";
            Console.WriteLine(tempString);
            sw.Restart();
 
            for (int i = 0; i < testCase; i++)
            {
                var resetEvent = new ManualResetEvent(false);
                ThreadPool.QueueUserWorkItem((nonUsed) =>
                {
                    Thread.Sleep(delay);
                    Console.Write(message);
                    resetEvent.Set();
                });
                eventList.Add(resetEvent);
            }
 
            WaitHandle.WaitAll(eventList.ToArray());
            sw.Stop();
 
            swResultList.Add(tempString, sw.ElapsedMilliseconds);
            Console.WriteLine('\n');
 
 
            //////////////////////////////////////
            // end. Result
 
            swResultList.Sort((x, y) => x.Item2.CompareTo(y.Item2));
 
            Console.WriteLine("\n=== Result ===");
 
            int index = 0;
            foreach (var item in swResultList)
            {
                index++;
                Console.WriteLine($"#{index} - {item.Item1}: {item.Item2}ms");
            }
        }
    }
}

Task<TResult> 클래스 - 작업 결과 반환

- Task의 파생 클래스
- Action 대신 Func<TResult> 형식의 대리자 객체로 Task.Run을 호출
- 나중에 Result 속성으로 얻을 수 있음
 * 값을 얻기 전에 속성에 접근하면 Task가 끝나서 Result가 산출 될 때 까지 호출 스레드의 차단

 

예외

- Thread와 달리 예외를 잘 전파함
- Wait이나 Result 속성에 접근한 코드 쪽으로 예외를 다시 던져줌
- AggreateException 형태로 감싸서 전달(병렬 프로그래밍을 위해서)
- IsCanceled: OperationCanceledException - 토큰으로 취소된 경우의 예외 발생시 true
- IsFaulted: 취소 예외를 제외한 모든 예외 발생시 true
- Exception: 오류의 종류를 받아올 수 있음

 

자율 작업

- 자율 작업: 띄운 다음 신경 쓰지 않는 작업 (Wait, Result을 참조하지 않거나 연속 작업이 없는 경우)
- 미관찰 예외: 자율 작업의 미처리 예외
- CLR 4.0
 * 미관찰 예외 발생시 프로그램 종료
 * GC가 미관찰 예외를 발생한 Task를 수거 시, 종료자에서 다시 예외를 던져줌
 * 예외 발생 시점이 애매하고, 비동기성의 특정 패턴을 복잡하게 만듬
- CLR 4.5
 * 미관찰 예외가 발생해도 프로그램에는 문제가 없음
- 미관찰 예외의 전역 수준 캐치
 * TaskScheduler.UnobservedTaskException 구독
- 미관찰 예외의 미묘한 사항
 * Task 객체의 작업이 끝나길 만료 시간을 지정해서 기다릴 때, 만료 시간이 지난 후에 예외가 발생하면 미관찰 예외
 * Task 객체에 작업에 장애가 발생한 후에 Exception 속성을 조회하면 관찰 예외로 변함

 

연속 (Continuation)

- 작업 객체에게 지금 하는 일을 마치면 계속하여 다른 일을 실행하라고 지시
- 콜백의 형태로 구현
- GetAwaiter: 대기자(Awaiter) 객체가 반환
 * 대기자 객체에 대한 OnCompleted
  : 선행 작업(Antecedent task)의 완료(장애)시 호출될 대리자를 지정
 * 대기자 객체에 대한 GetResult
  : 결과 반환, 예외의 경우 AggregateException으로 재포장되지 않고 던져짐
  : 비제네릭 Task의 경우 반환 형식은 void, 예외를 다시 던지기만 할 뿐
- ConfigureAwait
 * 동기화 문맥을 연속용 콜백에 전달하는 기능 - UI 스레드 문맥 전달의 설정
 * 기능을 사용하지 않는다면 false로 끄는 것이 좋음
- ContinueWith
 * 연속용 콜백을 부착하는 다른 방법
 * 결과로 Task를 돌려주기 때문에 연속 사용에 유용
 * 장애가 발생할 여지가 있다면 AggregateException을 직접 처리해줘야 함
 * UI 응용 프로그램에서는 연속 작업을 마샬링하는 코드도 작성해야 함
 * TaskContinuationOptions.ExecuteSynchronously
  : 비 UI 문맥에서, 연속 작업이 같은 스래드에서 실행되게 설정
  : 설정하지 않으면 연속용 콜백이 풀 스레드에서 동작하게 됨

 

TaskCompletionSource

- 어떤 연산을 지금 바로가 아니라 잠시 후에 시작하는 작업 객체를 생성 할 수 있음
- 연산이 진행되는 동안 스레드를 차단하지 않고도 작업 객체의 모든 장점을 누릴 수 있음
1. TaskCompletionSource 인스턴스 생성
2. TaskCompletionSource 인스턴스의 Task 속성으로 얻은 Task를 다뤄서 작업 실시
3. TaskCompletionSource 인스턴스의 메소드들을 이용하여 작업의 진행을 제어
 * 메서드 중 하나를 호출하면 Task 객체에 신호가 전달 됨
  : 이후 작업 완료, 장애, 취소 상태 등으로 변경
 * 각 TCS 인스턴스에 한해 한번만 호출 가능(이후 예외나 false를 던짐)

- TaskCompletionSource + Timer = Delay

static Task Delay(int milliseconds)
{
    // Task 형식으로 반환을 해줘야 하는데,
 
    // TaskCompletionSource의 비제네릭 버전이 없으므로
    // 임의의 형식(여기서는 object) 형식으로 만들고
    // Task<object>을 가져와 Task 형식으로 돌려줌(파생형이라 가능)
    var tcs = new TaskCompletionSource<object>();
 
    var timer = new System.Timers.Timer(milliseconds) { AutoReset = false };
    timer.Elapsed += delegate
    {
        timer.Dispose();
        tcs.SetResult(null);
    };
    timer.Start();
 
    return tcs.Task;
}

 

Task.Delay 메서드

- 위의 TaskCompletionSource + Timer = Delay 예제와 동일한 일을 수행
- Thread.Sleep의 비동기 버전

 

Task.WhenAny 메서드

- 주어진 작업 객체 중 하나라도 완료되면 완료되는 작업 객체를 돌려 줌

public static void Main(string[] arg)
{
    Task k = Go();
    k.Wait();
 
    Console.WriteLine(Test(5000).Result);
}
 
// WhenAny의 기본적 사용
static async Task Go()
{
    Task<int> winningTask = await Task.WhenAny(Delay1(), Delay2(), Delay3());
    Console.WriteLine("Done");
 
    // 앞에서 await가 있기에 winningTask는 이미 구해져 있으나,
    // 그래도 여기서 await을 다시 해주는게 좋음
    // (미처리 예외들을 캐치 할 수 있기 때문에)
    Console.WriteLine(await winningTask);
}
 
// 시간 만료 기능
static async Task<int> Test(int timeout)
{
    Task<int> myTask = Delay1();
 
    // WhenAny에서 서로 다른 작업 객체를 지정했기 때문에, 반환형은 Task가 됨
    Task winner = await Task.WhenAny(myTask, Task.Delay(timeout));
 
    if (winner != myTask)
    {
        throw new TimeoutException();
    }
 
    return await myTask;
}
 
static async Task<int> Delay1() { await Task.Delay(1000); return 1; }
static async Task<int> Delay2() { await Task.Delay(2000); return 2; }
static async Task<int> Delay3() { await Task.Delay(3000); return 3; }

WhenAll 메서드

- 주어진 작업 객체들이 모두 완료되면 완료되는 작업 객체를 돌려 줌

// Case 1
await Task.WhenAll(Delay1(), Delay2(), Delay3());
 
 
// Case 2
Task task1 = Delay1();
Task task2 = Delay2();
Task task3 = Delay3();
 
await task1;
await task2;
await task3;

 

- Case 1
 * 모든 작업이 완료되기 전에는 먼저 완료되지 않음 (일부 작업에 장애가 있더라도)
 * 여러 작업 객체에서 장애가 발생했다면 새로 돌려준 Task 객체의 AggregateException 속성으로 합쳐짐

- Case 2
 * await을 3번이나 호출해야 함
 * task1에서 장애 발생시 task2, task3의 대기가 수행되지 않고, 그 둘에서 발생한 예외는 미관찰 예외가 됨

// Case 1
// 처음 한 번의 예외만 캐치하고, 나머지는 미관찰
try
{
    int[] ints = await Task.WhenAll(Delay1(), Delay2(), Delay3());
}
catch (Exception ex)
{
    Console.WriteLine(ex);
}
 
// Cas 2
// 모든 예외를 캐치
Task all = Task.WhenAll(Delay1(), Delay2(), Delay3());
try
{
    await all;
}
catch
{
    Console.WriteLine(all.Exception);
}

커스텀 작업 조합기

- 임의의 작업에 시간 만료 대기 기능 추가

// 임의의 작업에 시간 만료 대기 기능을 추가
async static Task<TResult> WithTimeout<TResult>(this Task<TResult> task, TimeSpan timeout)
{
    Task winner = await (Task.WhenAny(task, Task.Delay(timeout)));
 
    if (winner != task)
    {
        throw new TimeoutException();
    }
 
    return await task;
}

- 임의의 작업에 CancellationToken을 통해 작업을 폐기할 수 있는 기능 추가

// 임의의 작업에 CancellationToken을 통해 작업을 폐기할 수 있는 기능 추가
static Task<TResult> WithCancellation<TResult>(this Task<TResult> task, CancellationToken cancelToken)
{
    var tcs = new TaskCompletionSource<TResult>();
    var reg = cancelToken.Register(() => tcs.TrySetCanceled());
 
    task.ContinueWith(ant =>
    {
        reg.Dispose();
 
        if (ant.IsCanceled)
        {
            tcs.TrySetCanceled();
        }
        else if (ant.IsFaulted)
        {
            tcs.TrySetException(ant.Exception.InnerException);
        }
        else
        {
            tcs.TrySetResult(ant.Result);
        }
    });
 
    return tcs.Task;
}
// WhenAll과 비슷하지만, 하나라도 장애가 있으면 조합이 즉시 실패
async Task<TResult[]> WhenAllorError<TResult>(params Task<TResult>[] tasks)
{
    var killJoy = new TaskCompletionSource<TResult[]>();
 
    foreach (var task in tasks)
    {
        task.ContinueWith(ant =>
        {
            if (ant.IsCanceled)
            {
                killJoy.TrySetCanceled();
            }
            else if (ant.IsFaulted)
            {
                killJoy.TrySetException(ant.Exception.InnerException);
            }
        });
    }
 
    return await await Task.WhenAny(killJoy.Task, Task.WhenAll(tasks));
}
반응형

'C#' 카테고리의 다른 글

[C#] PipeStream  (0) 2023.12.01
[C#] MemoryStream  (0) 2023.11.24
[C#] FileStream  (0) 2023.11.20
[C#] Stream  (0) 2023.11.17
[C#] Thread  (0) 2023.11.01
[C#] Stopwatch  (0) 2023.10.30
[C#] PerformanceCounter, PerformanceCounterCategory  (0) 2023.10.28
[C#] EventLog (Windows 이벤트 로그)  (0) 2023.10.26