본문 바로가기
Modern C++

Concurrency in C++11 (번역)

by 날쑤 2016. 5. 2.

출처

원문링크 - Here (Posted on April 7, 2014 by paolosev)

Intro

  C++ 11은 람다 표현식, 초기화 리스트, 우측값(rvalue) 참조, auto 타입 추론 등과 같은 특징들이 추가되면서 이전 버전들보다 훨씬 나아졌다. C++11 표준 라이브러리는 이제 정규 표현식, 개선된 스마트 포인터, 그리고 멀티 스레드 라이브러리까지 제공한다. 그러나 모던 C++은 병렬/비동기 연산에 대해서는 아직까지 제한적인 기능만을 제공하며, 이는 C#과 같은 언어들과 비교했을 때 특히 그렇다.

The need for asynchrony

  그런데 우리는 왜 비동기성(asynchrony)에 대한 지원을 필요로 하는 것일까? 컴퓨터 아키텍쳐는 도처에 있는 멀티 코어 프로세서들과 클라우드 상에 분산되어 있는 코어들로 인해 갈수록 더 병렬적, 그리고 분산적으로 변해가고 있다. 그리고 소프트웨어 응용(application)들은 점점 더 빈번하게 하나의 머신 혹은 네트워크 상의 많은 코어들에 분포되어 있는 여러 요소들로 구성되고 있다. 최신 프로그래밍 언어들은 이러한 병렬성(parallelism)에 대한 지원을 (언어적으로) 제공해야 한다.

  동시에 반응성(responsiveness)은 소프트웨어에 있어서 더욱 더 필수적인 성질이 되었다. (이는 최근 각광받는 프로그래밍 패러다임인 반응적 프로그래밍의 중요한 원칙 중 하나이다.)

  반응성은 I/O 작업의 완료를 기다리기 위해 블럭되는 것을 원하지 않음을 의미한다. 서버 측면에서는 일꾼(worker) 스레드가 다른 작업을 수행하는 데 사용될 수 있고 작업이 완료되었을 때 통지받을 수 있는 동안에는 블럭되는 것을 원치 않는다. 그리고 클라이언트 측면에서는 어플리케이션을 느릿느릿하고 반응성이 떨어지게 만드는 main/GUI 스레드의 블러킹이 일어나는 것을 절대로 원하지 않는다. 그러므로 비동기적으로 동작하는 코드를 작성할 수 있다는 것은 블러킹이 없고 반응성을 잃지 않으면서 I/O 작업들의 지연(latency)을 처리하기 위해 점차 더 중요해지고 있다. 예를 들면, 일반적으로 WinRT에서 50ms 이상 걸리는 모든 I/O 바운드 API들은 오직 비동기적인 인터페이스만을 제공하며, 심지어 고전적인 블러킹을 동반하는 호출과는 같이 수행조차 될 수 없다.

  이번, 그리고 다음 몇 번의 포스팅들에서는 최근의 C++이 동시성을 제공하기 위해 무엇을 제공하는지, 그리고 저 목적에 가까워지고 있는 새로운 특징들은 무엇인지를 살펴볼 것이다. 우리는 표준에 있는 것뿐만 아니라 Microsoft가 제공하는 윈도우즈에 특화된 PPL 프레임워크에 있는 것들도 살펴볼 것이다.

A simple example

  우리가 어떻게 C++에서 비동기적인 코드를 작성할 수 있는지를 이해하기 위해 매우 단순한 예제 하나를 가지고 살펴보자. 하나의 파일을 읽어서 파일에 있는 내용을 다른 파일로 복사하는 함수를 작성하는 상황을 가정하자. 이를 위해 우리는 아래와 같이 함수들을 작성할 수 있을 것이다.

#include <string> 
#include <vector> 
#include <fstream> 
#include <iostream> 

using namespace std; 
 
vector<char> readFile (const string& inPath) 
{ 
    ifstream file(inPath, ios::binary | ios::ate); 
    size_t length = (size_t)file.tellg(); 
    vector<char> buffer(length); 
    file.seekg(0, std::ios::beg); 
    file.read(&buffer[0], length); 
    return buffer; 
} 
 
size_t writeFile (const vector<char>& buffer, const string& outPath) 
{ 
    ofstream file(outPath, ios::binary); 
    file.write(&buffer[0], buffer.size()); 
    return (size_t)file.tellp(); 
}

 

  위의 두 함수를 이용해서 손쉽게 한 파일의 내용을 다른 파일로 복사하고 쓰여진 문자들의 수를 반환하는 단순한 함수를 아래와 같이 작성할 수 있다.

size_t sync_copyFile (const string& inFile, const string& outFile) 
{ 
    return writeFile(readFile(inFile), outFile); 
} 

 

  명백히 우리는 readFile과 writeFile 함수가 순차적으로 수행되기를 원한다. 그러나 저 함수들이 완료되는 것을 기다리는 동안에 블럭되어야 할 것인가? 사실 이는 명백히 억지스러운 예제이며, 파일의 크기가 별로 크지 않은 경우에는 아마도 별로 상관이 없겠지만 파일이 매우 큰 경우에는 버퍼링을 사용해서 버퍼의 내용을 청크들로 복사하는 쪽이 거대한 vector 전체를 반환하는 것보다 더 나을 것이다. 그러나 readFile과 writeFile 함수는 I/O 바운드이며 여기서는 단지 더 복잡한 I/O 작업들에 대한 모델을 나타낼 뿐이다. (실제 어플리케이션에서 네트워크로부터 데이터를 읽어들이고, 읽어들인 데이터를 어디론가 전송하고, 그리고 그 결과를 반환하거나 어딘가에 기록해야 하는 것은 흔한 일이다)

  이제, copyFile 작업이 비동기적으로 수행되기를 희망하는 것에 대해 이야기해보자. 이를 표준 C++에서는 어떻게 가능하게 할까?

Task-based parallelism: futures and promises

  C++11 표준 라이브러리는 동시성을 지원하기 위해 몇 가지 방법들을 제공한다. 첫번째는 std::thread인데 이는 std::mutex, std::lock_guards, std::condition_variables 등의 동기화 객체들과 함께 C++에서 고전적인 멀티스레드 기반으로 동시적으로 수행되는 코드를 작성할 수 있는 편리한 방법을 제공한다.

  우리는 copyFile을 파일 복사를 수행할 새로운 스레드를 생성하고, condition_variable을 사용해서 스레드의 작업이 완료되었을 때 알림을 받도록 수정할 수도 있을 것이다. 그러나 스레드와 락 수준에서 작업하는 것은 꽤 까다로운 일이 될 수 있다. 닷넷의 TPL과 같은 최신 프레임워크들은 좀 더 고수준의 추상화를 태스크 기반(task-based)의 동시성의 형태로 제공한다. 여기서 태스크는 다른 작업들과 함께 병렬적으로 수행할 수 있는 하나의 비동기적인 작업을 나타내며, 시스템은 이러한 병렬성이 어떻게 구현되는지에 대한 구체적인 부분은 은닉한다.

  C++11 라이브러리(새로운 <future> 헤더에 있는)는 또한 다소 제한적인 태스크 기반의 병렬성에 대한 지원을 future와 promise의 형태로 제공한다. std::promise<T>std::future<T> 클래스는 대략적으로 닷넷의 Task<T>, 혹은 Java 8의 Future<T>의 C++ 버전이다. 저 클래스들은 함수를 호출하는 행위와 호출 결과를 대기하는 행위를 분리시키기 위해 쌍으로 동작한다.

  호출자 측에서는 비동기적 함수를 호출하면 타입 T의 결과 값을 받지 않는다. 실제로 받환되는 것은 호출 결과에 대한 placeholder인 std::future<T>이며, 결과는 이후의 어떤 시점이 되면 (여기로) 전달될 것이다.

  future 객체를 획득하고 나면 해당 task가 다른 스레드에서 수행되는 동안 우리는 다른 작업으로 옮겨갈 수 있다.

  std::promise<T> 객체는 비동기적 호출의 피호출자 측에서의 호출 결과를 나타내며, 호출자 쪽으로 호출 결과를 비동기적으로 전달하기 위한 통로이다. 태스크가 완료되면 태스크에서는 promise::set_value 함수를 호출해서 호출 결과를 promise 객체에 집어넣는다.

  호출자 쪽에서 호출 결과에 접근해야 할 때가 되면 호출자는 결과 값을 얻기 위해 블러킹 콜인 future::get()을 호출할 것이다.만약에 태스크가 이미 완료가 되었다면 결과값은 즉시 획득할 수 있겠지만, 그렇지 않다면 호출자 스레드는 결과값을 획득할 수 있을 때까지(즉, 피호출자 스레드에서 promise::set_value를 호출해서 결과값을 지정할 때까지) 정지된다.

  아래의 코드는 우리의 예제인 copyFile 함수에 future와 promise를 적용시킨 버전이다.

#include <future> 
 
size_t future_copyFile (const string& inFile, const string& outFile) 
{ 
    std::promise<vector<char>> prom1; 
    std::future<vector<char>> fut1 = prom1.get_future(); 
    std::thread th1([&prom1, inFile](){ 
        prom1.set_value(readFile(inFile)); 
    }); 
 
    std::promise<int> prom2; 
    std::future<int> fut2 = prom2.get_future(); 
    std::thread th2([&fut1, &prom2, outFile](){ 
        prom2.set_value(writeFile(fut1.get(), outFile)); 
    }); 
 
    size_t result = fut2.get(); 
    th1.join(); 
    th2.join(); 
    
    return result; 
}

 

  readFile과 writeFile의 수행을 별도의 태스크들로 옮긴 것과 함께 이들을 수행할 스레드들을 구성하고 시작해야 한다는 점을 주목하라. 또한, 우리는 promise와 future 객체를 태스크 함수에서 사용할 수 있도록 이들의 참조를 캡쳐했다. 첫번째 스레드는 읽기를 구현하며, 읽기 작업이 완료되었을 때 수행 결과를 거대한 vector의 형태로 promise 객체에 전달한다. 두번째 스레드는 대응하는 future 객체 위에서 대기하며, 읽기 작업이 완료되면 (future 객체를 통해) 파일의 내용을 저장하고 있는 vector를 얻은 다음 이를 쓰기 함수로 전달한다. 마지막으로 쓰기 작업이 완료되면 파일에 쓰여진 문자들의 수가 두번째 future 객체에 들어간다.

  메인 함수에서는 이러한 병렬성을 이용할 수 있을 것이며, future::get() 함수가 호출되기 전에 다른 작업이 수행될 수 있을 것이다. 그러나 get() 함수를 호출한 시점에 읽기/쓰기 태스크가 아직 완료되지 않았다면 메인 스레드는 여전히 블럭될 것이다.

Packaged tasks

  packaged_tasks를 이용하면 위의 코드를 좀 더 단순화시킬 수 있다. std::packaged_task<T> 클래스는 태스크와 태스크의 promise 객체를 함께 포함하는 컨테이너이다. packaged_task의 템플릿 타입 인자는 태스크 함수의 타입(예를 들면, 예제의 읽기 함수에 대해서는 vector<char>(const string&))이다. 이 클래스는 operator()를 정의하는 호출가능한(callable) 타입이며, 자동으로 std::promise<T>를 생성하고 관리한다.

size_t packagedtask_copyFile (const string& inFile, const string& outFile) 
{ 
    using Task_Type_Read = std::vector<char>(const string&); 
    std::packaged_task<Task_Type_Read> pt1(readFile); 
    std::future<std::vector<char>> fut1{ pt1.get_future() }; 
    std::thread th1{ std::move(pt1), inFile }; 
 
    using Task_Type_Write = size_t(const string&); 
    std::packaged_task<Task_Type_Write> pt2([&fut1](const string& path){ 
        return writeFile(fut1.get(), path); 
    }); 
    std::future<size_t> fut2{ pt2.get_future() }; 
    std::thread th2{ std::move(pt2), outFile }; 
 
    size_t result = fut2.get(); 
    th1.join(); 
    th2.join();
 
    return result; 
}

 

  packaged_task 객체는 복사를 허용하지 않기 때문에 packaged_task 객체를 스레드로 전달하기 위해서는 std::move 함수를 사용해야한다는 것을 기억해두자.

std::async

  packaged_task를 이용함으로써 함수의 로직 자체는 크게 바꾸지 않으면서 코드는 좀 더 가독성있게 되었다. 하지만 여전히 수동으로 태스크들을 수행할 스레드를 생성해야 하며, 각 태스크를 어떤 스레드에서 수행할지를 결정해야 한다.

  만약 std::async() 함수(표준 라이브러리에서 제공)를 사용하면 위의 함수를 좀 더 단순하게 만들 수 있다. 이 함수는 람다나 함수자(functor)를 인자로 받으며, 호출 결과값을 담을 future 객체를 반환한다. 아래의 코드는 std::async() 함수를 사용하도록 수정된 버전의 copyFile 함수이다.

size_t async_copyFile (const string& inFile, const string& outFile) 
{ 
    auto fut1 = std::async(readFile, inFile); 
    auto fut2 = std::async([&fut1](const string& path){ 
        return writeFile(fut1.get(), path); 
    }, outFile); 
 
    return fut2.get(); 
} 

 

  어떤 점에서 std::async() 함수는 TPL 태스크 스케쥴러와 같다. 이는 새로운 스레드가 생성되어야 하거나 기존의(혹은 현재의) 스레드가 재사용될 수 있을 때 어느 스레드에서 태스크를 수행할지를 결정한다.

  (태스크의) 수행시작 정책을 지정하는 것 또한 가능한데, 태스크가 (아마도 다른 스레드에서) 비동기적으로 수행될 것을 요구하는 "async"와 get() 함수가 호출되는 시점에 태스크를 수행되게 하는 "deferred", 두 개의 옵션을 사용할 수 있다.

  근사한 점은 std::async는 모든 구현이나 플랫폼에 특화된 세부 사항들을 숨긴다는 것이다. VS2013에 딸려 있는 <future> 헤더 파일을 살펴보면 std::async의 윈도우에서의 구현을 확인할 수 있는데, 이는 내부적으로 Parallel Patterns Library (PPL)을 사용한다. (PPL은 native C++에서 닷넷의 TPL과 동등한 라이브러리이다.)

PPL

  지금까지 살펴 본 것은 C++11에서 표준화된 것들이다. future와 promise의 설계가 닷넷과 C#이 제공하는 것과 비교해서 아직 꽤 제한적이라고 말해도 타당할 것이다.

  주된 제한점은 C++11의 future는 구성가능(composable)하지 않다는 점이다. 만약 연산을 수행하기 위해 여러 개의 태스크를 동시에 시작했다면, 이들 중 어느 하나가 완료되는 것을 기다리면서 모든 future 객체들이 블럭될 수는 없으며 한 번에 하나의 future 객체만이 블럭될 수 있다. 또한 여러 개의 태스크들을 각 태스크가 이전 태스크의 결과를 입력으로 받는 방식의 시퀀스로 결합시킬 수 있는 간단한 방법이 존재하지 않는다. 구성가능한 태스크들은 전체 아키텍쳐를 논블러킹/이벤트 핸들링 구조로 만드는 것을 허용한다. 우리는 또한 C++에서 태스크 컨티뉴에이션(continuation)이나 async/await 패턴과 같은 것을 쓸 수 있기를 희망한다.

  PPL(Concurrency Runtime이라고도 알려진)로 인해 Microsoft는 표준의 제약사항을 극복하고 태스크 라이브러리의 더 정교한 구현을 실험할 가능성을 가지게 되었다.

  PPL에서 Concurrency::task<T> 클래스(<ppltasks.h> 헤더 파일에 정의된)는 태스크를 나타낸다. 태스크는 future와 동일하며, 이 또한 결과값을 획득하기 위한 블러킹 함수인 get()을 제공한다. 템플릿 타입 파라메터인 T는 리턴 타입이며, 태스크는 수행할 함수(람다/함수 포인터/함수 객체)를 전달해서 초기화시킨다.

  일단은 이식성(portability)에 대한 모든 걱정들은 잠시 내려두고, 이번에는 task를 사용해서 copyFile 함수를 다시 구현해보자.

size_t ppl_copyFile (const string& inFile, const string& outFile) 
{ 
    Concurrency::task<vector<char>> task1 = Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }); 
    Concurrency::task<size_t> task2 = Concurrency::create_task([&task1, outFile]() { 
        return writeFile(task1.get(), outFile); 
    }); 

    return tsk2.get(); 
} 

 

  여기서는 두 개의 태스크 객체들을 생성했고, 읽기와 쓰기 작업을 위한 두 개의 람다 표현식으로 이들을 각각 초기화시켰다.

  이제는 더 이상 스레드에 대해서 걱정할 필요가 없다. 어느 스레드에서 태스크를 수행할 것인지와 스레드 풀을 관리하는 것은 PPL 스케쥴러에게 달려있다. 그러나 우리의 두 태스크들의 상호작용을 아직 수동으로 조율한다는 점을 주목하라. task2는 첫번째 태스크의 참조를 유지해야하고, task1의 수행 결과를 사용하기에 앞서서 task1이 종료되기를 명시적으로 대기한다. 이런 단순한 예제에서는 이러한 점이 수용가능하지만, 더 많은 태스크들과 더 복잡한 코드를 다뤄야 하는 상황에서는 꽤 번거로울 수 있을 것이다.

Task continuations

  future와는 달리 PPL 태스크는 컨티뉴에이션을 통한 합성(composition)을 지원한다. task::next 멤버함수는 태스크에 컨티뉴에이션 태스크를 추가하는 것을 허용한다. 컨티뉴에이션은 선행 태스크의 수행이 완료되면 호출될 것이며, 선행 태스크로부터 반환된 값을 (인자로) 받게 될 것이다.

  자, copyFile 함수를 재작성하되, 이번에는 컨티뉴에이션을 사용해보자.

size_t ppl_then_copyFile (const string& inFile, const string& outFile) 
{ 
    Concurrency::task<size_t> result =  
    Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }).then([outFile](const vector<char>& buffer) { 
        return writeFile(buffer, outFile); 
    }); 
  
    return result.get(); 
}

 

  이제 코드가 훨씬 깔끔해졌다. 우리는 복사 함수의 로직을 두 개의 컴포넌트(태스크)로 분할했는데, 이들은 임의의 스레드에서 수행될 수 있으며 태스크 스케쥴러에 의해 수행될 것이다. 또한 함수 로직을 태스크들의 의존성 그래프(dependency graph)로 선언했다.

  위의 copyFile 함수의 구현은 여전히 최종적인 결과값을 얻기 위해 결국에는 블럭된다. 하지만 실제 프로그램에서는 위의 함수 내에 삽입했던 태스크를 단지 반환하게만 하고, 이 태스크의 결과값을 비동기적으로 다루기 위한 컨티뉴에이션을 수반하도록 만들 수 있다. 우리는 아래처럼 코드를 짜고자 했을 것이다.

// 아래의 함수는 이전 버전의 함수 내에서 만들었던 태스크를 반환하기만 한다.
Concurrency::task<size_t> ppl_create_copyFile_task (const string& inFile, const string& outFile) 
{ 
    return Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }).then([outFile](const vector<char>& buffer) { 
        return writeFile(buffer, outFile); 
    }); 
} 

// DO SOMETHING...

auto tCopy =  ppl_create_copyFile_task(inFile, outFile).then([](size_t written) { 
    cout << written << endl; 
}); 

// DO SOMETHING...

tCopy.wait();

 

  마지막으로 PPL 태스크는 task::wait_alltask::wait_any 멤버함수와 함께 태스크들을 구성하기 위한 다른 방법들도 제공하는데, 이들은 병렬적으로 수행되는 태스크들을 처리하는데 유용하다.

  주어진 태스크들의 집합에 대해 when_all은 집합 내의 모든 태스크들이 완료되었을 때 완료되도록 하는 또다른 태스크를 생성한다. (즉, join을 구현) 대신 when_any는 집합 내의 어느 하나의 태스크가 완료되었을 때 완료되는 태스크를 생성한다. 이는 동시에 수행할 수 있는 태스크의 개수가 제한되어 있고, 어느 하나가 종료되었을 때 새로운 태스크를 시작해야 하는 상황에서 유용하게 쓰일 수 있다.

  그러나 이것은 단지 PPL이라는 빙산의 일각일 뿐이다. PPL은 풍부한 기능들을 제공하는데, 기능 하나하나는 TPL에서 사용할 수 있는 것과 거의 동일하다. 또한, 일꾼 스레드의 풀을 관리하고 태스크들을 스레드로 할당하는 중요하는 임무를 수행하는 스케쥴러 클래스들도 제공한다. 더 자세한 것은 여기에서 확인할 수 있다.

Towards C++17

  아마도 우리는 곧 PPL에 의해 소개된 몇몇 개선들을 C++ 표준에서도 볼 수 있을 것이다. 이미 Niklas Gustaffson 등에 의해 쓰여진 문서(N3857)가 존재하는데, 여기서는 라이브러리에 대한 몇가지 변화를 제안한다. 특히, 문서의 안은 future::then, future::when_any, 그리고 future::when_all와 함께 future의 결합성(composability)을 PPL에서 보았던 것과 같은 시맨틱으로 가능하게 만드는 것이다. 이전의 예제는 새로운 future를 쓰면 다음과 같이 이식이 용이한 방법으로 다시 쓰여질 수 있을 것이다.

std::future<size_t> future_then_copyFile (const string& inFile, const string& outFile)
{
    return std::async([inFile]() {
        return readFile(inFile);
    }).then([outFile](const vector<char>& buffer) {
        return writeFile(buffer, outFile);
    });
}

 

  블러킹 함수인 get()을 호출하기 전에 future 객체의 완료 여부를 검사하기 위한 future::is_ready와 중첩 future(future를 반환하는 future) 객체를 다루기 위한 future::unwrap도 또한 존재할 것이다. 물론 이 제안서의 더 자세한 부분은 위에 있는 문서에서 확인할 수 있다.

  이것이 완전한 해결책이 될 수 있을까? 꼭 그렇지만은 않다. 닷넷에서의 경험은 우리에게 태스크 기반의 비동기적 코드는 여전히 작성, 디버깅, 그리고 유지보수가 어렵다는 것을 가르쳐준다. 때때로는 매우 어렵기도 하다. 이것이 Async/Await pattern을 통해 비동기성을 좀 더 쉽게 다루기 위해 C# 언어 자체에(5.0 버전에) 새로운 키워드가 추가된 이유이다. C++의 관리되지 않는(unmanaged) 세상에서도 이와 유사한 조짐이 있는가?