카카오클라우드 AIaaS 교육/AIaaS를 위한 머신러닝&AI

[스나이퍼팩토리] AIaaS 마스터 클래스 9주차 DAY 42 - python 심화

mangji2 2025. 5. 27. 14:53

[1] python 심화 수업 복습 - 이터레이터와 제너레이터, 동시성과 병렬 처리

4. 이터레이터와 제너레이터

🔹 이터러블(Iterable)과 이터레이터(Iterator)

  • 이터러블: 리스트, 튜플, 문자열 등 반복 가능한 객체로 for 루프에서 사용 가능
  • 이터레이터: __iter__()__next__() 메서드를 가진 객체
  • next() 함수로 다음 값을 순차적으로 가져옴

🔹 제너레이터(Generator)

제너레이터란 한 번에 모든 값을 메모리에 올리지 않고, 필요한 순간에 값을 하나씩 생성하는 객체이다.

 

일반 함수와 제너레이터 함수를 비교해 보면 일반 함수의 경우에는

def get_numbers():
	return [1, 2, 3]

 

이렇게 리스트로 전부 리턴하지만 제너레이터 함수는

def gen_numbers():
    yield 1
    yield 2
    yield 3

 

이렇게 yield 키워드로 하나씩 생성한다.

  • yield 키워드가 있는 함수는 호출 시 즉시 실행되지 않고 제너레이터 객체를 반환
  • 상태를 유지하면서 필요할 때마다 값을 생성 (지연 평가)

🔹 제너레이터 표현식

  • 리스트 컴프리헨션과 유사하지만 괄호를 ()로 사용
g = (x * 2 for x in range(10))

 

return과 비슷하지만 함수를 일시 중단했다가 다음 값을 요청할 때 다시 이어서 실행해 주기 때문에 메모리를 절약할 수 있다.

🔹 장점

  • 메모리 효율성: 한 번에 하나씩 값을 생성하여 메모리 절약
  • 지연 계산: 필요할 때마다 값을 생성하므로 성능 향상
  • 무한 시퀀스 처리 가능

🔹 활용 예시

  • 대용량 로그 파일 한 줄씩 읽기
  • 특정 키워드 포함한 줄만 필터링
  • 실시간 스트림 데이터 처리

5. 동시성과 병렬처리

🔹 개념 차이

개념 설명
동시성 (Concurrency) 단일 코어에서도 여러 작업을 번갈아 수행 (논리적 병렬)
병렬성 (Parallelism) 멀티코어에서 여러 작업을 동시에 수행 (물리적 병렬)

 

🔹 멀티스레딩 (threading)

  • 하나의 프로세스에서 여러 스레드 동시 실행
  • I/O 작업에 적합
  • 주요 메서드: start(), join(), is_alive()
  • GIL(Global Interpreter Lock)로 인해 CPU 작업 병렬화는 한계 존재
import threading
import time

def print_numbers():
    for i in range(5):
        print(f"[스레드1] {i}")
        time.sleep(1)

def print_letters():
    for c in ['A', 'B', 'C', 'D', 'E']:
        print(f"[스레드2] {c}")
        time.sleep(1)

# 스레드 생성
t1 = threading.Thread(target=print_numbers)
t2 = threading.Thread(target=print_letters)

# 스레드 시작
t1.start()
t2.start()

# 메인 스레드가 두 스레드가 끝날 때까지 기다림
t1.join()
t2.join()

🔹 멀티프로세싱 (multiprocessing)

  • 프로세스 단위 병렬 실행 → 각각 독립된 메모리 공간
  • CPU 집약적 작업에 유리
  • Process, Pool, apply(), map() 사용

멀티프로세싱으로 숫자 제곱하는 예제를 보겠다.

import multiprocessing
import time

def square(n):
    print(f"{n}^2 계산 중...")
    time.sleep(1)
    print(f"{n}^2 = {n * n}")

if __name__ == "__main__":
    numbers = [1, 2, 3, 4]
    processes = []

    for num in numbers:
        p = multiprocessing.Process(target=square, args=(num,))
        processes.append(p)
        p.start()  # 프로세스 시작

    for p in processes:
        p.join()  # 프로세스가 끝날 때까지 기다림

    print("모든 계산 완료!")

 

우선 process로 객체를 생성하고 start()로 실행시켜 각각의 프로세스는 별도의 cpu 코어에서 돌아간다. 

또 windows에서는 반드시 if __name__ == "__main__" 조건문 안에서 실행해야 한다. 안 그러면 무한 프로세스 생성 문제 생김

🔹 스레드/프로세스 동기화 도구

  • Lock: 동시에 공유 자원 접근하지 않도록 제어
  • Queue: 안전한 데이터 교환
  • Event/Condition: 스레드 간 통신 및 동기화

🔹 ThreadPoolExecutor / ProcessPoolExecutor

  • submit(), map() 등으로 작업 분산
  • shutdown(wait=True)로 안전한 종료
  • Future 객체로 결과 확인

ProcessPoolExecutor는 프로세스 기반의 병렬 처리를 수행한다. 각 작업이 별도의 파이썬 인터프리터 프로세스에서 실행되므로, gli의 제약을 받지 않아 cpu 바운드 작업에 병렬성을 제공할 수 있다.

🔹 asyncio (비동기 프로그래밍)

asyncio는 파이썬의 비동기 프로그래밍을 위한 프레임워크이다. 여러 작업을 동시에 처리하는 것처럼 보이게 만들지만 실제로는 하나의 스레드에서 비동기로 전환하며 처리한다.

여기서 비동기 프로그래밍이란 기다리는 시간이 길면 그동안 다른 일 먼저 하고 다시 돌아오는 프로그래밍이라고 할 수 있다.

 

  • async def, await 키워드로 코루틴 정의 및 실행
  • create_task(), gather(), run()으로 동시 실행
  • 이벤트 루프 기반: 단일 스레드에서 비동기 처리
  • 네트워크 I/O, API 요청 등 대기 시간이 긴 작업에 적합

🔹 동시성 모델 비교

모델 CPU 집약 I/O 집약 특징
Threading x GIL로 인한 한계 존재
Multiprocessing x 멀티코어 활용 가능
asyncio x 가장 가벼운 방식, 단일 스레드 기반

 


[2] 과제

 

문제

로그 파일을 한 줄씩 읽는 제너레이터 함수와 특정 패턴이 포함된 줄만 필터링하는 제너레이터 함수를

작성하는 과제였다.

#이터레이터와 제너레이터
#과제
#로그 파일을 한 줄씩 읽는 제너레이터 함수 작성
#특정 패턴이 포함된 줄만 필터링하는 제너레이터 작성

 

코드

# 로그 파일을 한 줄씩 읽는 제너레이터
def read_log_file(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            yield line.strip()

# 특정 패턴이 포함된 줄만 필터링하는 제너레이터
def filter_log(lines, keywords):
    for line in lines:
        if any(keyword in line for keyword in keywords):
            yield line

 

read_log_file은 지정된 경로의 로그 파일을 한 줄씩 읽어서 스트리밍 방식으로 반환하는 제너레이터이다.

파일을 한 번에 메모리에 로드하는 대신, 요청이 있을 때마다 필요한 한 줄만 읽어서 반환하게된다.

filter_log 함수는 다른 이터러블로부터 로그 줄을 받아, 주어진 키워드 중 하나라도 포함된 줄만 필터링하여 반환하는

제너레이터이다.

 

이렇게 파일을 한 번에 메모리에 올리지 않고 한 줄씩 읽으면서 메모리의 효율성을 높일 수 있다.

 

 

문제2

순차 처리,멀티스레딩,비동기 처리의 방식으로 하나의 공통 api 리스트를 처리하는 과제이다.

5개의 api 주소 리스트를 만들고 각각 3가지의 방식으로 가져오는 시간을 비교해보는 것이다.

코드

import time
import requests
import aiohttp
import asyncio
from concurrent.futures import ThreadPoolExecutor

API_URLS = [
]

#순차 처리 방식
def fetch_sequential(urls):
    results = []
    for url in urls:
        response = requests.get(url)
        results.append(response.text)
    return results

#멀티스레딩 threadpoolExecutor 사용
def fetch_single(url):
    response = requests.get(url)
    return response.text

def fetch_threadpool(urls):
    results = []
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(fetch_single, urls))
    return results

#asyncio + aiohttp 사용 (비동기 방식)
async def fetch_async(session, url):
    async with session.get(url) as response:
        return await response.text()

async def fetch_asyncio(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_async(session, url) for url in urls]
        return await asyncio.gather(*tasks)

#성능 비교 실행 코드
def measure_time(func, *args):
    start = time.time()
    result = func(*args)
    end = time.time()
    print(f"{func.__name__} took {end - start:.2f} seconds")
    return result

def measure_time_async(async_func, *args):
    start = time.time()
    result = asyncio.run(async_func(*args))
    end = time.time()
    print(f"{async_func.__name__} took {end - start:.2f} seconds")
    return result

if __name__ == "__main__":
    print("\n 순차 처리:")
    measure_time(fetch_sequential, API_URLS)

    print("\n " \
    "ThreadPoolExecutor 사용:")
    measure_time(fetch_threadpool, API_URLS)

    print("\n asyncio + aiohttp 사용:")
    measure_time_async(fetch_asyncio, API_URLS)

 

우선 fetch_sequential 함수 즉 순차 처리 함수는 한 줄씩 차례대로 요청 -> 응답 -> 다음 요청 이런 순으로 응답을 저장한다. 모든 요청이 직렬로 일어나고 코드는 매우 단순하지만 매우 느리다.

#순차 처리 방식
def fetch_sequential(urls):
    results = []
    for url in urls:
        response = requests.get(url)
        results.append(response.text)
    return results

 

 

다음으로 멀티스레딩 처리 방식으로 구현한 함수를 보면  

def fetch_single(url):
    response = requests.get(url)
    return response.text

def fetch_threadpool(urls):
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(fetch_single, urls))
    return results

 

ThreadPoolExecutor를 사용해서 스레드를 여러 개 만들어 동시에 여러 요청을 처리하게 한다.

map 함수가 fetch_single(url)을 병렬로 실행하게 해준다. 이런 방식은 다중 작업을 병렬 혹은 비동기적으로 처리해야 하는 하는 I/O 작업에 적합하고 빠르다.

 

 

마지막으로로 비동기 처리로 구현한 함수를 보면 

async def fetch_async(session, url):
    async with session.get(url) as response:
        return await response.text()

async def fetch_asyncio(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_async(session, url) for url in urls]
        return await asyncio.gather(*tasks)

 

fetch_async 함수는 비동기 함수로 await과 함께 쓰일 수 있으며 다른 비동기 작업이 끝날 때까지 기다리고 그동안 다른 작업으로 넘어갈 수 있다. 

 

fetch_asyncio 함수는 여러 요청을 한꺼번에 보내는 역할을 한다. 모든 요청에 사용할 aiohttp 세션을 하나 생성한다.

그리고 tasks에 리스트 컴프리헨션으로 각 url에 대한 비동기 작업들을 생성했다. 이때 작업은 실제 실행이 아니라 예약만 한 상태이다.

그리고 asyncio.gather() 을 통해 모든 비동기 작업을 병렬로 실행하고 결과는 [응답1,응답2 ...] 형태로 리턴된다.

 

 

전체 흐름을 정리하자면 

 

  • fetch_asyncio()는 URL 리스트를 받고
  • 세션을 만들고,
  • URL마다 fetch_async()를 예약하고
  • gather()로 한꺼번에 실행하고
  • 응답이 모두 올 때까지 기다림

비동기 함수가 빠른 이유는 

순차적으로 get()하고 text() 받을 때마다 기다리지 않고,

await을 통해 그동안 다른 요청들을 처리하기 때문에 병렬처럼 빠른 처리가 가능한 것이다.

 

 


 

제너레이터를 배우면서 한 번에 모든 값을 메모리에 올리지 않고 필요한 순간에 값을 처리하는 방법으로 대용량 데이터를 효율적으로 처리할 수 있다는 점을 배웠고, 멀티스레딩,멀티프로세싱,비동기 각각의 동시성/병렬 처리 방식의 차이를 실습과제를 통해 확실히 이해할 수 있었다. 특히 유튜브처럼 다중 작업이 많거나 I/O 중심의 작업은 멀티스레딩과 asyncio가 적합하고, CPU 중심 작업은 멀티 프로세싱이 효과적이라는 점을 코드를 직접 구현해 보면서 명확히 알게 되었다.

 


본 후기는 [카카오엔터프라이즈x스나이퍼팩토리] 카카오클라우드로 배우는 AIaaS 마스터 클래스 (B-log) 리뷰로 작성 되었습니다.