[Python] ThreadPoolExecutor

끝으로 ㅣ 2025. 2. 2. 10:17

ThreadPoolExecutor

  • ThreadPoolExecutor는 Python의 멀티스레딩을 쉽게 관리할 수 있도록 제공하는 concurrent.futures 모듈의 일부다
  • 멀티 스레딩을 사용하여 I/O 작업(네트워크 요청, 파일 읽기/쓰기 등)에서 성능을 개선할 수 있다
  • 단일 스레드 방식의 작업을 함수로 분리하여 ThreadPoolExecutor에 submit 함수 파라미터로 전달
  • as_completed를 통해 작업이 완료되는 순서대로 결과를 가져온다 (Future 객체의 result 함수)

 

사용 예시

  • 응답 시간이 1~3초 사이로 랜덤 하게 소요되는 API를 호출하는 작업을 예로 들겠다
# 1~3초 sleep을 통해 가상의 API 호출을 mocking한 함수
# 1~100 사이의 랜덤 숫자를 반환
def mock_api_call():
    delay = random.uniform(1, 3)
    sleep(delay)
 
    randomNumber = random.uniform(1, 100)
    return randomNumber

단일 스레드 방식

def main(concurrent_count: int, batch_size: int):
    # 수정할 문서 조회
    cursor = mongodb.collection("concurrent_test").find()
    total_count = mongodb.count_documents("concurrent_test", {})
 
    print(f"작업 대상인 문서 갯수: {total_count}")
 
    for i, document in enumerate(cursor):
        name = document["name"]
 
        number = mock_api_call()
 
        mongodb.upsert_one(
            "concurrent_test",
            {"name": name},
            {"number": number},
        )
  • 10개의 document에 대해 20초 소요

멀티 스레드 방식 (ThreadPoolExecutor 사용)

# Thread에게 할당할 Task 함수 분리
def process_document(document: Dict[str, Any]):
    try:
        name = document["name"]
 
        number = mock_api_call()
 
        mongodb.upsert_one(
            "concurrent_test",
            {"name": name},
            {"number": number},
        )
    except Exception as e:
        name = document.get("name", None)
        mongodb.log_error(log=f"[Error] {e}\n\n{name}", trace="concurrent_test")
        raise Exception(f"Error during processing: {name}")
 
 
def main(concurrent_count: int, batch_size: int):
    # 수정할 문서 조회
    cursor = mongodb.collection("concurrent_test").find()
    total_count = mongodb.count_documents("concurrent_test", {})
 
    print(f"작업 대상인 문서 갯수: {total_count}")
 
    with ThreadPoolExecutor(max_workers=concurrent_count) as executor:
        futures = [executor.submit(process_document, document) for document in cursor]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"Error during processing: {e}")
  • 10개의 document에 대해 2초 소요