본문으로 건너뛰기

Persistent Task Group

· 약 17분

이번 글에서는 Python 3.11에 새로 도입될 asyncio.TaskGroup API 및 제가 개발하여 제안 중인 aiotools.PersistentTaskGroup API에 대해 소개합니다. 특히 구조적 병행성(structured concurrency)를 구현하기 위해서 중요한 부분이며, 실제로 Backend.AI 개발 과정에서도 구조적 병렬성이 제대로 지켜지지 않아서 발생한 버그 사례가 있습니다.

구조적 병행성

구조적 프로그래밍(structured programming)이라는 말을 들어보셨나요?

구조적 프로그래밍은 컴퓨터 발전 역사의 초기였던 1960년대 후반에서 1970년대 초에 등장한 개념입니다. 프로그래밍 언어 발전에 큰 공헌을 했던 다익스트라의 기고문에서 "GOTO 문이 왜 해로운가"라는 주제로 보다 잘 알려지게 되었습니다.

당시에는 많은 프로그램들이 현대의 관점에서 봤을 때 어셈블리나 그에 가까운 저수준 프로그래밍 언어를 사용하여 개발되던 시절입니다. 지금은 프로그램을 작성할 때 절차형 프로그래밍 언어를 사용하는 경우 명령어가 쓰여진 순서대로 실행되고 이때 조건문(분기), 반복문, 재귀, 서브루틴(함수)를 활용하여 제어 흐름을 표현하는 것이 너무나 당연한 개념이지만 당시에는 그렇지 않았습니다. GOTO를 지나치게 많이 활용할 경우 프로그램을 작성한 사람조차도 시간이 지나면 어느 지점에서 다른 한 지점으로 왜 제어 흐름이 넘어가는지 머릿속으로 추적하는 것이 극도로 어려워집니다. 이때 구조적 프로그래밍을 활용하면, 제어 흐름의 방향이 바뀌거나 점프가 일어나는 경우의 가짓수를 제한할 수 있고 거기에 프로그래밍 언어의 문법 키워드를 부여함으로써 누구나 어떤 의도로 실행 흐름의 변화가 생기는지 알 수 있습니다. 그리고 항상 실행 흐름의 가능한 경로들이 분석 가능하기 때문에 이러한 제어 구조들을 조합한 프로그램도 항상 분석 가능하다는 특징이 있고, 이런 몇 가지 제어 구조들만으로 튜링 완전한(=컴퓨터로 할 수 있는 일을 모두 하는) 프로그램을 작성할 수 있음을 보임으로써 현대 프로그래밍 언어들의 초석을 닦았습니다.

구조적 병행성(structured concurrency)은 이를 비동기 프로그래밍과 코루틴으로 확장한 것입니다. 그 연원은 fork-join model로 거슬러 올라가지만, 본격적으로 세상에 알려진 것은 2018년 Python의 wheel 패키지 형식 도입과 trio라는 asyncio 대체 구현을 개발했던 바 있는 Nathaniel J. Smith의 블로그 글이 계기였습니다. 제목 또한 "Go statement considered harmful"이라는 부제를 달아 다익스트라의 글에 대한 오마주를 남기기도 했죠. 이 글에서 그는 모든 코루틴(혹은 실행 흐름의 분기)들은 명확한 진입·종료 지점이 있어야 하고, 종료 지점 전에 모든 소속 비동기 동작이 완료됨을 보장해야 한다는 점을 역설하였습니다. 특히 비동기 프로그래밍에서는 어떤 비동기 작업을 기다리던 도중 해당 작업을 임의의 시점에 취소(cancel)하는 경우가 발생하는데, 이때 그 작업이 발생시킨 추가적인 비동기 작업들을 모두 함께 취소시킨 후 제어 흐름을 반환하도록 강제할 필요성이 생깁니다. 그렇지 않으면 결과값을 반환받을 주체는 이미 사라졌는데 작업은 계속 진행되면서 리소스를 과다 사용하거나 메모리 누수가 발생하는 등의 일이 발생할 수 있기 때문입니다.

asyncio.TaskGroup

올해 10월 릴리즈 예정인 Python 3.11에 새로 추가될 기능으로 asyncio.TaskGroup 클래스가 있습니다. 표준 라이브러리에 들어간 구현은 njs가 만든 trio의 nursery 개념을 도입한 것으로, 그 구현은 asyncio의 핵심 기여자들이 개발한 EdgeDB의 대체 구현에서 가져와 Guido가 직접 버그 수정 및 개선을 한 버전입니다.

이미 asyncio의 고수준 API에는 비동기 작업들을 위한 제어 구조가 몇 가지 제공되고 있었습니다. asyncio.gather()가 대표적입니다. 자바스크립트로 치면 Promise.all()에 해당하는 것으로, 넘겨준 모든 코루틴이나 비동기 작업이 완료될 때까지 기다렸다가 리턴하는 함수입니다.

그런데 이 함수에는 한 가지 문제가 있습니다. 전달한 모든 작업이 항상 성공적으로 끝난다면 좋겠지만, 어느 한 작업이 실패한다면 어떻게 해야 할까요? 구조적 병행성을 보장하려면 다음 중 하나를 해야 합니다:

  1. 일단 모든 작업이 성공이든 실패든 결과를 얻을 때까지 기다리기
  2. 어느 작업이 실패하면 진행 중이던 다른 작업들을 중단하고 바로 다음 코드로 넘어가 계속 실행하기

여기에서 asyncio.gather()는 첫번째 방식을 취하는 것처럼 보이지만, 어느 한 작업이 실패했을 때 다른 작업들은 계속 실행되도록 놔두면서 자신은 그 실패 오류를 즉시 발생시키고 리턴해버립니다. 그러면 나머지 다른 작업들이 반환하는 결과는 받아올 길이 없어지고, 실패하더라도 그 예외를 받아 처리할 방법도 없습니다. (asyncio에서는 이벤트 루프 자체에 fallback exception handler가 달려있어서 결국 여기서 에러 로그를 찍는 것밖에 할 수 있는 게 없습니다.) 대신 인자 옵션으로 return_exceptions=True를 지정하면 실패가 발생하더라도 모든 작업이 끝날 때까지 기다렸다가 목록화된 결과 및 예외 집합을 반환합니다. 하지만 아래와 같이 루프를 돌면서 예외 여부를 일일이 검사하는 코드를 추가로 덧붙여야 하는 불편함이 있습니다.

results = await asyncio.gather(mycoro(...), ..., return_exceptions=True)
for result in results:
if isinstance(result, Exception):
... # handle error
else:
... # handle result

그래서 등장한 것이 asyncio.TaskGroup입니다. TaskGroup은 두번째 접근 방식을 취하는데요, 여기에 추가적인 변화가 더 있습니다. TaskGroup 자체의 실행이 취소된 경우 내부의 비동기 작업들도 모두 함께 취소된다는 점, 그리고 async with 구문을 통해 사용한다는 점이죠. 특히 후자의 경우 Python 언어 문법에 의해 어떠한 상황에서도 항상 종료 핸들러가 실행됨을 보장한다는 점에서 구조적 병행성을 구현하는 데 중요한 역할을 합니다.

async with asyncio.TaskGroup() as tg:
tg.create_task(mycoro(...))
tg.create_task(mycoro(...))
...
tg.create_task(mycoro(...))

# Here, it is guaranteed that all inner tasks were either completed or cancelled.

이때 여러 개의 비동기 작업들 중에서 하나 또는 여럿이 예외를 발생시킨 경우, 이를 묶어서 처리하기 위한 ExceptionGroup 및 여기에서 개별 예외 형식에 대한 필터링을 문법적으로 쉽게 할 수 있는 except* 문법이 Python 3.11에 새로 추가됩니다. 여기에서 다루기에는 너무 길어지므로 ExceptionGroup에 대한 상세한 소개는 PEP-654 문서를 참고하시기 바랍니다. aiotools는 Python 3.6 이후 버전에서도 TaskGroup을 사용할 수 있도록 백포팅한 구현(EdgeDB의 구현공식 구현을 참조하여 작성)을 제공하고 있습니다. Python 3.11 이상에서는 자동으로 표준라이브러리의 구현과 ExceptionGroup을 사용하도록 변경됩니다.

aiotools.PersistentTaskGroup

과연 이것만으로 충분할까요?

Backend.AI에서는 특히 여러 노드에 분산된 다중 컨테이너로 구성된 '클러스터 세션'을 실행하는 경우 여러 개의 비동기 작업을 묶어서 기다리는 패턴이 많이 등장합니다. 스케줄러가 클러스터 세션의 소속 컨테이너들이 어떤 에이전트에 할당될 지 결정하면, 에이전트별로 컨테이너들을 재그룹화하고 에이전트별로 생성할 컨테이너 묶음을 던져줍니다. 이때 매니저는 여러 개의 에이전트에 대한 비동기 작업을 묶어서 기다리고, 각 에이전트는 전달받은 컨테이너 묶음을 생성하기 위해 각 컨테이너를 생성하기 위한 비동기 작업을 마찬가지로 묶어서 기다립니다. 여러 개의 에이전트 중에서 어느 한 에이전트의 어느 한 컨테이너만 생성에 실패한 경우, 전체 작업을 어떻게 취소해야 할까요?

위와 같은 시나리오에서 Backend.AI는 첫번째 패턴을 더 많이 사용합니다. 그 이유는 하나의 비동기 작업 안에 외부 시스템 호출에 의한 부수 효과가 많이 발생하는 경우, 임의 시점에 작업을 취소하였을 때 이를 딱 진행된 지점까지의 부수 효과만 골라서 rollback하는 것이 매우 어렵기 때문입니다. 특히 현재는 파일시스템을 다루는 부분이 asyncio의 thread pool executor를 이용하도록 되어있기 때문에 중간에 작업을 중단하는 것이 더 어려운 점도 있습니다. 모든 부수 효과의 실행 여부를 별도로 기억했다가 취소 시점에 실행된 것들만 모아서 다시 역순으로 취소한다거나 이런 메커니즘을 구현해야 하죠.

이와 같은 상황에서도 구조적 병행성을 보장하기 위해 개발한 것이 PersistentTaskGroup입니다. TaskGroup과 같은 점은 구조적으로 종료 시점에 내부의 모든 비동기 작업이 끝나있다는 것을 보장합니다. 차이점으로는 어느 한 작업의 실패가 다른 작업들의 취소를 불러오는 대신, 미리 지정한 예외 처리 핸들러를 불러준다는 점입니다. 그리고 async with 기반의 사용법뿐만 아니라, 오랜 시간 사용되는 오브젝트의 수명주기 핸들러에 붙일 수 있도록 별도의 shutdown() 메소드를 지원합니다.

async-with 블록 사용 방식
import aiotools

async def main():
async with aiotools.PersistentTaskGroup() as tg:
tg.create_task(...)
tg.create_task(...)
tg.create_task(...)
...

# Here, it is guaranteed that all tasks has finished.
객체 수명주기 동기화 사용 방식
import aiotools

class LongLivedObject:

def __init__(self, ...):
self._tg = aiotools.PersistentTaskGroup()
...

async def aclose(self):
await self._tg.shutdown()
# Here, it is guaranteed that all tasks has finished.

async def some_work(self, ...):
self._tg.create_task(...)
self._tg.create_task(...)
...

이때 create_task() 메소드의 반환값은 작업의 비동기 결과를 알 수 있는 asyncio.Future 객체입니다. Task 객체를 직접 반환하지 않는 이유는 Task의 수명주기 관리를 PersistentTaskGroup이 전담하기 위함이며, Future 객체는 그 결과를 알 수 있는 일종의 proxy 역할입니다. 반환된 Future 객체를 이용해서 원래 task를 취소하는 것은 지원되지 않으며, 대신 await를 걸어두면 개별 task가 반환한 결과값 또는 개별 task가 발생시킨 예외를 잡을 수 있습니다.

Persistent Task Group의 도입을 통해 다음과 같은 버그를 잡는 데 도움이 되었습니다.

  • 에이전트 쪽에서 컨테이너 생성이 실패한 경우 매니저에서 오류로 인해 세션 생성이 취소되었음을 데이터베이스에 기록하는데, 이 기록이 확률적으로 제대로 기록되지 않는 경우
  • 스케줄러 큐에 세션 생성 요청을 넣는 과정에서 사전 입력 검증 실패가 발생했을 때 일부 경우에서 이것이 세션 생성 요청 API의 오류 반환값으로 제대로 연계되지 않는 경우

사실 이번 Python 3.11에 Persistent Task Group API를 추가하기 위해 이슈를 발행해보았으나, 우선은 aiotools 구현의 안정화를 좀더 지켜본 후 3.12 정도에 반영을 고려해보자는 것으로 결론이 난 상태입니다.

Backend.AI의 코드 규모가 상당히 크기 때문에 아직은 asyncio.gather(..., return_exceptions=True)를 사용하는 곳이 많이 있는데, 점진적으로 PersistentTaskGroup으로 대체해나갈 예정입니다. 앞으로 더욱 안정성이 향상된 Backend.AI 플랫폼을 제공할 수 있는 토대가 될 것으로 생각합니다. 또한 이 글이 Python asyncio 기반의 다른 애플리케이션 개발자분들에게 도움이 되었기를 바랍니다.