컴퓨터 과학(CS)

API Rate Limiting 원리와 구현 전략: 안정적인 서비스를 위한 필수 기술

devcomet 2025. 5. 26. 08:34
728x90
반응형

API Rate Limiting 원리와 구현 전략: 안정적인 서비스를 위한 필수 기술
API Rate Limiting 원리와 구현 전략: 안정적인 서비스를 위한 필수 기술

 

현대 웹 애플리케이션에서 API는 핵심적인 역할을 담당합니다.
하지만 무제한적인 API 호출은 서버 과부하와 서비스 장애를 초래할 수 있습니다.
이러한 문제를 해결하기 위해 API Rate Limiting이 등장했으며, 오늘날 모든 대규모 서비스에서 필수적으로 사용되고 있습니다.

이 글에서는 API Rate Limiting의 핵심 원리부터 실제 구현 전략까지 상세하게 다루어보겠습니다.


API Rate Limiting이란 무엇인가?

API Rate Limiting은 특정 시간 동안 클라이언트가 API에 요청할 수 있는 횟수를 제한하는 기술입니다.
이는 서버 리소스를 보호하고, 공정한 사용을 보장하며, DDoS 공격과 같은 악의적인 트래픽으로부터 서비스를 보호하는 핵심적인 방어 메커니즘입니다.

 

Rate Limiting의 주요 목적:

  • 서버 과부하 방지 및 안정성 확보
  • 공정한 리소스 분배를 통한 서비스 품질 유지
  • 악의적인 공격 및 스팸 요청 차단
  • 비용 효율적인 인프라 운영
  • API 사용량 기반 과금 모델 지원

Rate Limiting 알고리즘의 종류와 특징

Token Bucket 알고리즘

Token Bucket 알고리즘은 가장 널리 사용되는 rate limiting 방식 중 하나입니다.
버킷에 토큰을 일정한 속도로 채우고, 요청이 들어올 때마다 토큰을 소모하는 방식으로 동작합니다.

import time
import threading

class TokenBucket:
    def __init__(self, capacity, refill_rate):
        self.capacity = capacity  # 버킷 최대 용량
        self.tokens = capacity    # 현재 토큰 수
        self.refill_rate = refill_rate  # 초당 토큰 충전률
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def consume(self, tokens=1):
        with self.lock:
            now = time.time()
            # 시간 경과에 따른 토큰 충전
            tokens_to_add = (now - self.last_refill) * self.refill_rate
            self.tokens = min(self.capacity, self.tokens + tokens_to_add)
            self.last_refill = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

# 사용 예시
bucket = TokenBucket(capacity=10, refill_rate=1)  # 10개 용량, 초당 1개 충전
if bucket.consume():
    print("요청 처리 가능")
else:
    print("Rate limit 초과")

 

Token Bucket의 장점:

  • 순간적인 트래픽 버스트 허용
  • 평균 처리율과 최대 버스트 처리량 모두 제어 가능
  • 구현이 상대적으로 간단

Leaky Bucket 알고리즘

Leaky Bucket 알고리즘은 요청을 큐에 저장하고 일정한 속도로 처리하는 방식입니다.
물이 새는 양동이처럼 일정한 속도로만 요청을 처리하여 트래픽을 평활화합니다.

import time
import queue
import threading

class LeakyBucket:
    def __init__(self, capacity, leak_rate):
        self.capacity = capacity
        self.leak_rate = leak_rate  # 초당 처리 가능한 요청 수
        self.queue = queue.Queue(maxsize=capacity)
        self.last_leak = time.time()
        self.lock = threading.Lock()

    def add_request(self, request):
        with self.lock:
            self._leak()
            try:
                self.queue.put_nowait(request)
                return True
            except queue.Full:
                return False

    def _leak(self):
        now = time.time()
        elapsed = now - self.last_leak
        leaks = int(elapsed * self.leak_rate)

        for _ in range(min(leaks, self.queue.qsize())):
            try:
                self.queue.get_nowait()
            except queue.Empty:
                break

        self.last_leak = now

# 사용 예시
bucket = LeakyBucket(capacity=100, leak_rate=10)  # 100개 용량, 초당 10개 처리
if bucket.add_request("API_REQUEST"):
    print("요청이 큐에 추가됨")
else:
    print("큐가 가득참 - Rate limit 초과")

Fixed Window Counter

Fixed Window Counter는 고정된 시간 창에서 요청 수를 카운트하는 간단한 방식입니다.
구현이 쉽고 메모리 효율적이지만, 윈도우 경계에서 트래픽 스파이크가 발생할 수 있습니다.

import time
from collections import defaultdict

class FixedWindowCounter:
    def __init__(self, window_size, max_requests):
        self.window_size = window_size  # 윈도우 크기 (초)
        self.max_requests = max_requests
        self.counters = defaultdict(int)

    def is_allowed(self, client_id):
        current_window = int(time.time()) // self.window_size
        key = f"{client_id}_{current_window}"

        if self.counters[key] < self.max_requests:
            self.counters[key] += 1
            return True
        return False

    def cleanup_old_windows(self):
        current_window = int(time.time()) // self.window_size
        keys_to_remove = []

        for key in self.counters:
            window = int(key.split('_')[-1])
            if window < current_window - 1:
                keys_to_remove.append(key)

        for key in keys_to_remove:
            del self.counters[key]

# 사용 예시
limiter = FixedWindowCounter(window_size=60, max_requests=100)  # 1분당 100회
if limiter.is_allowed("user123"):
    print("요청 허용")
else:
    print("Rate limit 초과")

Sliding Window Log

Sliding Window Log는 각 요청의 타임스탬프를 로그로 기록하여 정확한 sliding window를 구현하는 방식입니다.
가장 정확하지만 메모리 사용량이 많다는 단점이 있습니다.

import time
from collections import defaultdict, deque

class SlidingWindowLog:
    def __init__(self, window_size, max_requests):
        self.window_size = window_size
        self.max_requests = max_requests
        self.request_logs = defaultdict(deque)

    def is_allowed(self, client_id):
        now = time.time()
        log = self.request_logs[client_id]

        # 윈도우 밖의 오래된 요청 제거
        while log and log[0] < now - self.window_size:
            log.popleft()

        if len(log) < self.max_requests:
            log.append(now)
            return True
        return False

# 사용 예시
limiter = SlidingWindowLog(window_size=3600, max_requests=1000)  # 1시간당 1000회
if limiter.is_allowed("user456"):
    print("요청 허용")
else:
    print("Rate limit 초과")

Redis를 활용한 분산 Rate Limiting 구현

대규모 서비스에서는 여러 서버 인스턴스 간에 rate limiting 상태를 공유해야 합니다.
Redis를 활용한 분산 rate limiting은 이러한 요구사항을 효과적으로 해결합니다.

import redis
import time
import json

class DistributedRateLimiter:
    def __init__(self, redis_client, algorithm='token_bucket'):
        self.redis = redis_client
        self.algorithm = algorithm

    def token_bucket_check(self, key, capacity, refill_rate, tokens_requested=1):
        """Redis Lua 스크립트를 사용한 Token Bucket 구현"""
        lua_script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local tokens_requested = tonumber(ARGV[3])
        local now = tonumber(ARGV[4])

        local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket[1]) or capacity
        local last_refill = tonumber(bucket[2]) or now

        -- 토큰 충전 계산
        local elapsed = now - last_refill
        local tokens_to_add = elapsed * refill_rate
        tokens = math.min(capacity, tokens + tokens_to_add)

        if tokens >= tokens_requested then
            tokens = tokens - tokens_requested
            redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
            redis.call('EXPIRE', key, 3600)  -- 1시간 TTL
            return 1
        else
            redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
            redis.call('EXPIRE', key, 3600)
            return 0
        end
        """

        result = self.redis.eval(
            lua_script, 1, key,
            capacity, refill_rate, tokens_requested, time.time()
        )
        return bool(result)

    def sliding_window_check(self, key, window_size, max_requests):
        """Redis Sorted Set을 사용한 Sliding Window 구현"""
        now = time.time()
        pipeline = self.redis.pipeline()

        # 윈도우 밖의 오래된 요청 제거
        pipeline.zremrangebyscore(key, 0, now - window_size)

        # 현재 요청 수 확인
        pipeline.zcard(key)

        # 현재 요청 추가
        pipeline.zadd(key, {str(now): now})

        # TTL 설정
        pipeline.expire(key, int(window_size) + 1)

        results = pipeline.execute()
        current_requests = results[1]

        return current_requests < max_requests

# Redis 연결 및 사용 예시
redis_client = redis.Redis(host='localhost', port=6379, db=0)
limiter = DistributedRateLimiter(redis_client)

# Token Bucket 방식 사용
user_key = "rate_limit:user123"
if limiter.token_bucket_check(user_key, capacity=100, refill_rate=10):
    print("API 요청 처리 가능")
else:
    print("Rate limit 초과 - 잠시 후 다시 시도")

# Sliding Window 방식 사용
api_key = "rate_limit:api_key_xyz"
if limiter.sliding_window_check(api_key, window_size=3600, max_requests=5000):
    print("API 키 사용 가능")
else:
    print("시간당 요청 한도 초과")

실제 서비스에서의 Rate Limiting 전략

계층별 Rate Limiting 적용

실제 프로덕션 환경에서는 다층 Rate Limiting 전략을 적용하는 것이 효과적입니다.
각 계층마다 다른 제한 정책을 적용하여 세밀한 제어가 가능합니다.

class HierarchicalRateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.limiters = {
            'global': {'window': 60, 'limit': 10000},      # 전역 제한
            'user': {'window': 60, 'limit': 100},          # 사용자별 제한
            'api_key': {'window': 3600, 'limit': 5000},    # API 키별 제한
            'ip': {'window': 60, 'limit': 200},            # IP별 제한
            'endpoint': {'window': 60, 'limit': 1000}      # 엔드포인트별 제한
        }

    def check_all_limits(self, request_info):
        """모든 계층의 제한을 확인"""
        checks = [
            ('global', 'global'),
            ('user', request_info.get('user_id')),
            ('api_key', request_info.get('api_key')),
            ('ip', request_info.get('client_ip')),
            ('endpoint', request_info.get('endpoint'))
        ]

        for limit_type, identifier in checks:
            if identifier and not self._check_limit(limit_type, identifier):
                return False, limit_type

        return True, None

    def _check_limit(self, limit_type, identifier):
        config = self.limiters[limit_type]
        key = f"rate_limit:{limit_type}:{identifier}"

        return self._sliding_window_check(
            key, config['window'], config['limit']
        )

    def _sliding_window_check(self, key, window_size, max_requests):
        # 앞서 구현한 sliding window 로직 사용
        now = time.time()
        pipeline = self.redis.pipeline()

        pipeline.zremrangebyscore(key, 0, now - window_size)
        pipeline.zcard(key)
        pipeline.zadd(key, {str(now): now})
        pipeline.expire(key, int(window_size) + 1)

        results = pipeline.execute()
        return results[1] < max_requests

# 사용 예시
limiter = HierarchicalRateLimiter(redis_client)

request_info = {
    'user_id': 'user123',
    'api_key': 'key_abc',
    'client_ip': '192.168.1.100',
    'endpoint': '/api/v1/users'
}

allowed, blocked_by = limiter.check_all_limits(request_info)
if allowed:
    print("모든 제한을 통과 - 요청 처리")
else:
    print(f"{blocked_by} 제한에 의해 차단됨")

HTTP 헤더를 통한 Rate Limiting 정보 제공

클라이언트가 현재 상태를 파악할 수 있도록 표준 HTTP 헤더를 통해 rate limiting 정보를 제공해야 합니다.

from flask import Flask, request, jsonify
import time

app = Flask(__name__)

class RateLimitHeaders:
    @staticmethod
    def add_rate_limit_headers(response, limit, remaining, reset_time):
        """표준 Rate Limiting HTTP 헤더 추가"""
        response.headers['X-RateLimit-Limit'] = str(limit)
        response.headers['X-RateLimit-Remaining'] = str(remaining)
        response.headers['X-RateLimit-Reset'] = str(int(reset_time))

        # Retry-After 헤더 (429 상태 코드와 함께 사용)
        if remaining <= 0:
            retry_after = max(1, int(reset_time - time.time()))
            response.headers['Retry-After'] = str(retry_after)

        return response

@app.before_request
def check_rate_limit():
    client_ip = request.remote_addr
    endpoint = request.endpoint

    # Rate limit 체크 로직
    limit = 100
    window_size = 3600
    current_time = time.time()
    window_start = current_time - (current_time % window_size)
    reset_time = window_start + window_size

    # 실제 제한 확인 (여기서는 예시)
    remaining = 50  # 실제로는 Redis에서 조회

    if remaining <= 0:
        response = jsonify({
            'error': 'Rate limit exceeded',
            'message': 'Too many requests. Please try again later.'
        })
        response.status_code = 429
        return RateLimitHeaders.add_rate_limit_headers(
            response, limit, remaining, reset_time
        )

@app.after_request
def add_rate_limit_info(response):
    # 모든 응답에 rate limit 헤더 추가
    if not response.headers.get('X-RateLimit-Limit'):
        RateLimitHeaders.add_rate_limit_headers(
            response, 100, 50, time.time() + 3600
        )
    return response

@app.route('/api/v1/data')
def get_data():
    return jsonify({'data': 'sample data'})

if __name__ == '__main__':
    app.run(debug=True)

Rate Limiting 모니터링 및 알림 시스템

효과적인 Rate Limiting 모니터링은 서비스 안정성을 위해 필수적입니다.
실시간 모니터링과 알림 시스템을 구축하여 이상 상황을 빠르게 감지할 수 있습니다.

import logging
import json
from datetime import datetime, timedelta
from collections import defaultdict

class RateLimitMonitor:
    def __init__(self, redis_client, alert_threshold=0.8):
        self.redis = redis_client
        self.alert_threshold = alert_threshold
        self.logger = logging.getLogger('rate_limit_monitor')

    def record_rate_limit_event(self, event_type, client_info, limit_info):
        """Rate limiting 이벤트 기록"""
        event_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'event_type': event_type,  # 'allowed', 'blocked', 'warning'
            'client_info': client_info,
            'limit_info': limit_info
        }

        # Redis에 이벤트 저장 (시계열 데이터)
        key = f"rate_limit_events:{datetime.utcnow().strftime('%Y%m%d%H')}"
        self.redis.lpush(key, json.dumps(event_data))
        self.redis.expire(key, 86400 * 7)  # 7일 보관

        # 임계값 초과 시 알림
        if event_type == 'blocked':
            self._check_alert_conditions(client_info, limit_info)

    def _check_alert_conditions(self, client_info, limit_info):
        """알림 조건 확인"""
        client_id = client_info.get('user_id') or client_info.get('ip')
        current_usage = limit_info.get('current_requests', 0)
        limit = limit_info.get('limit', 0)

        usage_ratio = current_usage / limit if limit > 0 else 0

        if usage_ratio >= self.alert_threshold:
            self._send_alert({
                'type': 'rate_limit_exceeded',
                'client_id': client_id,
                'usage_ratio': usage_ratio,
                'current_requests': current_usage,
                'limit': limit,
                'timestamp': datetime.utcnow().isoformat()
            })

    def _send_alert(self, alert_data):
        """알림 발송 (실제로는 Slack, 이메일 등으로 발송)"""
        self.logger.warning(f"Rate Limit Alert: {json.dumps(alert_data)}")

        # 여기에 실제 알림 로직 구현
        # - Slack 웹훅
        # - 이메일 발송
        # - SMS 알림
        # - 모니터링 시스템 연동

    def get_rate_limit_statistics(self, hours=24):
        """Rate limiting 통계 조회"""
        stats = defaultdict(int)
        end_time = datetime.utcnow()

        for hour_offset in range(hours):
            timestamp = end_time - timedelta(hours=hour_offset)
            key = f"rate_limit_events:{timestamp.strftime('%Y%m%d%H')}"

            events = self.redis.lrange(key, 0, -1)
            for event_json in events:
                event = json.loads(event_json)
                stats[event['event_type']] += 1

        return {
            'total_requests': sum(stats.values()),
            'allowed_requests': stats['allowed'],
            'blocked_requests': stats['blocked'],
            'block_rate': stats['blocked'] / sum(stats.values()) * 100 if sum(stats.values()) > 0 else 0,
            'time_period': f"{hours} hours"
        }

# 사용 예시
monitor = RateLimitMonitor(redis_client)

# 이벤트 기록
monitor.record_rate_limit_event(
    'blocked',
    {'user_id': 'user123', 'ip': '192.168.1.100'},
    {'current_requests': 95, 'limit': 100, 'window': '1 hour'}
)

# 통계 조회
stats = monitor.get_rate_limit_statistics(hours=24)
print(f"지난 24시간 통계: {json.dumps(stats, indent=2)}")

성능 최적화 및 확장성 고려사항

메모리 효율적인 데이터 구조 활용

대규모 트래픽을 처리하기 위해서는 메모리 효율성성능 최적화가 중요합니다.
특히 Redis 메모리 사용량을 최소화하고 빠른 응답 시간을 보장해야 합니다.

import struct
import hashlib

class OptimizedRateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client

    def _hash_key(self, original_key):
        """키 해싱을 통한 메모리 사용량 최적화"""
        return hashlib.md5(original_key.encode()).hexdigest()[:16]

    def compact_sliding_window(self, key, window_size, max_requests):
        """압축된 sliding window 구현"""
        hashed_key = self._hash_key(key)
        current_time = int(time.time())

        # 시간을 슬롯으로 나누어 메모리 사용량 감소
        slot_size = 60  # 1분 단위 슬롯
        current_slot = current_time // slot_size

        lua_script = """
        local key = KEYS[1]
        local current_slot = tonumber(ARGV[1])
        local window_slots = tonumber(ARGV[2])
        local max_requests = tonumber(ARGV[3])

        -- 윈도우 밖의 오래된 슬롯 제거
        local oldest_slot = current_slot - window_slots
        redis.call('ZREMRANGEBYSCORE', key, 0, oldest_slot)

        -- 현재 윈도우의 총 요청 수 계산
        local total_requests = 0
        local slots = redis.call('ZRANGEBYSCORE', key, oldest_slot + 1, current_slot, 'WITHSCORES')

        for i = 2, #slots, 2 do
            total_requests = total_requests + tonumber(slots[i])
        end

        if total_requests < max_requests then
            -- 현재 슬롯의 카운터 증가
            redis.call('ZINCRBY', key, 1, current_slot)
            redis.call('EXPIRE', key, window_slots * 60 + 60)
            return 1
        else
            return 0
        end
        """

        window_slots = window_size // slot_size
        result = self.redis.eval(
            lua_script, 1, hashed_key,
            current_slot, window_slots, max_requests
        )

        return bool(result)

    def batch_check_limits(self, requests):
        """배치 처리를 통한 성능 최적화"""
        pipeline = self.redis.pipeline()

        for req in requests:
            key = self._hash_key(f"rate_limit:{req['client_id']}")
            # 각 요청에 대한 Redis 명령을 파이프라인에 추가
            pipeline.eval(
                self._get_rate_limit_script(),
                1, key,
                req['window_size'], req['max_requests']
            )

        results = pipeline.execute()
        return [bool(result) for result in results]

# 사용 예시
optimized_limiter = OptimizedRateLimiter(redis_client)

# 단일 요청 처리
if optimized_limiter.compact_sliding_window(
    "user123", window_size=3600, max_requests=1000
):
    print("요청 허용")

# 배치 요청 처리
batch_requests = [
    {'client_id': 'user1', 'window_size': 3600, 'max_requests': 1000},
    {'client_id': 'user2', 'window_size': 3600, 'max_requests': 1000},
    {'client_id': 'user3', 'window_size': 3600, 'max_requests': 1000}
]

results = optimized_limiter.batch_check_limits(batch_requests)
for i, allowed in enumerate(results):
    print(f"Request {i+1}: {'Allowed' if allowed else 'Blocked'}")

마이크로서비스 환경에서의 Rate Limiting

마이크로서비스 아키텍처에서는 서비스 간 통신과 외부 API 호출에 대한 종합적인 rate limiting 전략이 필요합니다.
각 서비스마다 독립적인 제한 정책을 가지면서도 전체적인 조화를 이루어야 합니다.

import asyncio
import aioredis
from typing import Dict, List

class MicroserviceRateLimiter:
    def __init__(self, redis_url: str, service_name: str):
        self.redis_url = redis_url
        self.service_name = service_name
        self.redis = None

        # 서비스별 기본 정책
        self.default_policies = {
            'user_service': {'rpm': 1000, 'burst': 50},
            'payment_service': {'rpm': 500, 'burst': 20},
            'notification_service': {'rpm': 2000, 'burst': 100},
            'analytics_service': {'rpm': 200, 'burst': 10}
        }

    async def initialize(self):
        """Redis 연결 초기화"""
        self.redis = await aioredis.from_url(self.redis_url)

    async def check_service_to_service_limit(self, source_service: str, 
                                           target_service: str, 
                                           client_id: str = None):
        """서비스 간 호출 제한 확인"""
        policy = self.default_policies.get(target_service, 
                                         {'rpm': 100, 'burst': 10})

        # 서비스 간 호출 키 생성
        key_parts = ['service_limit', source_service, target_service]
        if client_id:
            key_parts.append(client_id)

        key = ':'.join(key_parts)

        return await self._token_bucket_check_async(
            key, 
            capacity=policy['burst'],
            refill_rate=policy['rpm'] / 60.0  # 초당 토큰 수
        )

    async def check_circuit_breaker_with_rate_limit(self, service_name: str):
        """Rate Limiting과 Circuit Breaker 통합"""
        rate_limit_key = f"rate_limit:{service_name}"
        circuit_key = f"circuit:{service_name}"

        # Circuit Breaker 상태 확인
        circuit_state = await self.redis.get(circuit_key)
        if circuit_state == 'OPEN':
            return False, 'CIRCUIT_OPEN'

        # Rate Limit 확인
        allowed = await self._token_bucket_check_async(
            rate_limit_key, capacity=50, refill_rate=10
        )

        if not allowed:
            # 실패율 증가 (Circuit Breaker 로직)
            await self._increment_failure_count(circuit_key)
            return False, 'RATE_LIMITED'

        return True, 'ALLOWED'

    async def _token_bucket_check_async(self, key: str, capacity: int, refill_rate: float):
        """비동기 Token Bucket 구현"""
        lua_script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])

        local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket[1]) or capacity
        local last_refill = tonumber(bucket[2]) or now

        local elapsed = now - last_refill
        local tokens_to_add = elapsed * refill_rate
        tokens = math.min(capacity, tokens + tokens_to_add)

        if tokens >= 1 then
            tokens = tokens - 1
            redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
            redis.call('EXPIRE', key, 3600)
            return 1
        else
            redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
            redis.call('EXPIRE', key, 3600)
            return 0
        end
        """

        import time
        result = await self.redis.eval(
            lua_script, 1, key, capacity, refill_rate, time.time()
        )
        return bool(result)

    async def _increment_failure_count(self, circuit_key: str):
        """Circuit Breaker 실패 카운트 증가"""
        failure_count = await self.redis.incr(f"{circuit_key}:failures")
        await self.redis.expire(f"{circuit_key}:failures", 60)

        # 임계값 초과 시 Circuit Open
        if failure_count >= 10:
            await self.redis.set(circuit_key, 'OPEN', ex=30)

# FastAPI와 함께 사용하는 예시
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.base import BaseHTTPMiddleware

app = FastAPI()

class RateLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, limiter: MicroserviceRateLimiter):
        super().__init__(app)
        self.limiter = limiter

    async def dispatch(self, request, call_next):
        # 클라이언트 식별
        client_id = request.headers.get('X-Client-ID', request.client.host)
        endpoint = str(request.url.path)

        # Rate limit 확인
        allowed, reason = await self.limiter.check_circuit_breaker_with_rate_limit(
            f"{client_id}:{endpoint}"
        )

        if not allowed:
            if reason == 'CIRCUIT_OPEN':
                raise HTTPException(status_code=503, detail="Service temporarily unavailable")
            else:
                raise HTTPException(status_code=429, detail="Rate limit exceeded")

        response = await call_next(request)
        return response

# 미들웨어 등록
limiter = MicroserviceRateLimiter("redis://localhost:6379", "api_service")

@app.on_event("startup")
async def startup_event():
    await limiter.initialize()

app.add_middleware(RateLimitMiddleware, limiter=limiter)

@app.get("/api/v1/users/{user_id}")
async def get_user(user_id: str):
    return {"user_id": user_id, "name": "Sample User"}

API Gateway와 Rate Limiting 통합

API Gateway 레벨에서의 rate limiting은 모든 마이크로서비스에 대한 통합적인 보호막을 제공합니다.
Kong, Nginx, AWS API Gateway 등과 같은 도구들과의 연동 방법을 살펴보겠습니다.

# Kong Gateway 플러그인 설정 예시 (YAML)
kong_rate_limiting_config = """
plugins:
- name: rate-limiting
  config:
    minute: 100
    hour: 1000
    day: 5000
    policy: redis
    redis_host: redis-cluster.internal
    redis_port: 6379
    redis_database: 1
    hide_client_headers: false

- name: rate-limiting-advanced
  config:
    limit:
      - 1000/min
      - 10000/hour
    window_size:
      - 60
      - 3600
    identifier: consumer
    sync_rate: 10
    strategy: redis
    redis:
      host: redis-cluster.internal
      port: 6379
      database: 1
"""

# Nginx Rate Limiting 설정
nginx_config = """
http {
    # Rate limiting zones 정의
    limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
    limit_req_zone $http_x_api_key zone=api_key:10m rate=100r/s;
    limit_req_zone $request_uri zone=endpoint:10m rate=50r/s;

    # Lua 스크립트를 위한 Redis 연결
    lua_shared_dict rate_limit_cache 10m;

    init_by_lua_block {
        local redis = require "resty.redis"

        function check_redis_rate_limit(key, limit, window)
            local red = redis:new()
            red:set_timeout(1000)

            local ok, err = red:connect("127.0.0.1", 6379)
            if not ok then
                return false
            end

            local current_time = ngx.time()
            local window_start = current_time - (current_time % window)

            local count = red:incr(key .. ":" .. window_start)
            red:expire(key .. ":" .. window_start, window)

            return count <= limit
        end
    }

    server {
        listen 80;

        location /api/ {
            # 다층 Rate Limiting 적용
            limit_req zone=api burst=20 nodelay;
            limit_req zone=api_key burst=50 nodelay;
            limit_req zone=endpoint burst=10 nodelay;

            # Lua 스크립트로 Redis 기반 고급 제한
            access_by_lua_block {
                local client_ip = ngx.var.binary_remote_addr
                local api_key = ngx.var.http_x_api_key

                if api_key then
                    local allowed = check_redis_rate_limit(
                        "rate_limit:api_key:" .. api_key, 
                        1000, 3600
                    )
                    if not allowed then
                        ngx.status = 429
                        ngx.say("API key rate limit exceeded")
                        ngx.exit(429)
                    end
                end
            }

            proxy_pass http://backend_servers;

            # Rate limit 헤더 추가
            add_header X-RateLimit-Limit $limit_req_rate always;
            add_header X-RateLimit-Remaining $limit_req_remaining always;
        }
    }
}
"""

# AWS API Gateway와 Lambda를 활용한 Rate Limiting
import json
import boto3
from decimal import Decimal

def lambda_rate_limiter(event, context):
    """AWS Lambda 기반 Rate Limiting 함수"""

    # DynamoDB 테이블 연결
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('RateLimitCounters')

    # 클라이언트 정보 추출
    client_ip = event['requestContext']['identity']['sourceIp']
    api_key = event['headers'].get('x-api-key', 'anonymous')
    current_time = int(time.time())
    window_size = 3600  # 1시간

    # 윈도우 시작 시간 계산
    window_start = current_time - (current_time % window_size)

    try:
        # 현재 카운트 조회 및 증가
        response = table.update_item(
            Key={
                'client_id': f"{api_key}:{client_ip}",
                'window_start': window_start
            },
            UpdateExpression='ADD request_count :increment',
            ExpressionAttributeValues={':increment': 1},
            ReturnValues='ALL_NEW'
        )

        current_count = int(response['Attributes']['request_count'])
        rate_limit = 1000  # 시간당 1000회 제한

        if current_count > rate_limit:
            return {
                'statusCode': 429,
                'headers': {
                    'X-RateLimit-Limit': str(rate_limit),
                    'X-RateLimit-Remaining': '0',
                    'X-RateLimit-Reset': str(window_start + window_size),
                    'Retry-After': str(window_start + window_size - current_time)
                },
                'body': json.dumps({
                    'error': 'Rate limit exceeded',
                    'message': 'Too many requests'
                })
            }

        # 요청 허용
        remaining = max(0, rate_limit - current_count)

        return {
            'statusCode': 200,
            'headers': {
                'X-RateLimit-Limit': str(rate_limit),
                'X-RateLimit-Remaining': str(remaining),
                'X-RateLimit-Reset': str(window_start + window_size)
            },
            'body': json.dumps({'status': 'allowed'})
        }

    except Exception as e:
        # 에러 발생 시 기본적으로 허용
        return {
            'statusCode': 200,
            'body': json.dumps({'status': 'allowed', 'error': str(e)})
        }

고급 Rate Limiting 패턴

Adaptive Rate Limiting

적응형 Rate Limiting은 시스템 부하와 사용자 행동 패턴에 따라 동적으로 제한을 조정하는 고급 기법입니다.

import numpy as np
from collections import deque
import time

class AdaptiveRateLimiter:
    def __init__(self, redis_client, base_limit=1000):
        self.redis = redis_client
        self.base_limit = base_limit
        self.load_history = deque(maxlen=100)  # 최근 100개 데이터 포인트
        self.response_time_history = deque(maxlen=100)

    def calculate_dynamic_limit(self, current_load, avg_response_time):
        """시스템 상태에 따른 동적 제한 계산"""

        # 시스템 부하 기반 조정 (0.0 ~ 1.0)
        load_factor = max(0.1, 1.0 - current_load)

        # 응답 시간 기반 조정
        target_response_time = 100  # 100ms 목표
        response_factor = min(2.0, target_response_time / max(avg_response_time, 1))

        # 최종 제한 계산
        dynamic_limit = int(self.base_limit * load_factor * response_factor)

        # 안전 범위 내에서 조정
        return max(10, min(dynamic_limit, self.base_limit * 2))

    def update_system_metrics(self, cpu_usage, response_time):
        """시스템 메트릭 업데이트"""
        self.load_history.append(cpu_usage)
        self.response_time_history.append(response_time)

    def get_adaptive_limit(self, client_id):
        """클라이언트별 적응형 제한 조회"""

        # 현재 시스템 상태 계산
        current_load = np.mean(self.load_history) if self.load_history else 0.5
        avg_response_time = np.mean(self.response_time_history) if self.response_time_history else 100

        # 클라이언트별 행동 패턴 분석
        client_pattern = self._analyze_client_pattern(client_id)

        # 기본 동적 제한 계산
        base_dynamic_limit = self.calculate_dynamic_limit(current_load, avg_response_time)

        # 클라이언트 패턴에 따른 추가 조정
        if client_pattern == 'abusive':
            final_limit = int(base_dynamic_limit * 0.5)  # 50% 감소
        elif client_pattern == 'premium':
            final_limit = int(base_dynamic_limit * 1.5)  # 50% 증가
        else:
            final_limit = base_dynamic_limit

        # Redis에 현재 제한 저장
        self.redis.setex(f"adaptive_limit:{client_id}", 60, final_limit)

        return final_limit

    def _analyze_client_pattern(self, client_id):
        """클라이언트 행동 패턴 분석"""

        # 최근 1시간 요청 패턴 조회
        pattern_key = f"client_pattern:{client_id}"
        recent_requests = self.redis.lrange(pattern_key, 0, -1)

        if len(recent_requests) < 10:
            return 'normal'

        # 요청 간격 분석
        intervals = []
        for i in range(1, len(recent_requests)):
            prev_time = float(recent_requests[i-1])
            curr_time = float(recent_requests[i])
            intervals.append(curr_time - prev_time)

        avg_interval = np.mean(intervals)

        # 패턴 분류
        if avg_interval < 0.1:  # 100ms 미만 간격
            return 'abusive'
        elif avg_interval > 10:  # 10초 이상 간격
            return 'premium'
        else:
            return 'normal'

# 사용 예시
adaptive_limiter = AdaptiveRateLimiter(redis_client)

# 시스템 메트릭 업데이트 (모니터링 시스템에서 호출)
adaptive_limiter.update_system_metrics(cpu_usage=0.7, response_time=150)

# 클라이언트별 동적 제한 적용
client_limit = adaptive_limiter.get_adaptive_limit("user123")
print(f"User123의 현재 제한: {client_limit} requests/hour")

Rate Limiting with Machine Learning

머신러닝 기반 Rate Limiting은 사용자 행동 패턴을 학습하여 더 정교한 제한을 제공합니다.

import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import joblib
import pandas as pd

class MLBasedRateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.scaler = StandardScaler()
        self.anomaly_detector = IsolationForest(contamination=0.1, random_state=42)
        self.model_trained = False

    def extract_features(self, client_id, window_minutes=60):
        """클라이언트 요청 패턴에서 특성 추출"""

        # Redis에서 최근 요청 기록 조회
        request_key = f"request_log:{client_id}"
        requests = self.redis.lrange(request_key, 0, -1)

        if len(requests) < 5:
            return None

        # 타임스탬프 변환
        timestamps = [float(req) for req in requests]
        timestamps.sort()

        # 특성 계산
        intervals = np.diff(timestamps)

        features = {
            'request_count': len(timestamps),
            'avg_interval': np.mean(intervals) if len(intervals) > 0 else 0,
            'std_interval': np.std(intervals) if len(intervals) > 0 else 0,
            'min_interval': np.min(intervals) if len(intervals) > 0 else 0,
            'max_interval': np.max(intervals) if len(intervals) > 0 else 0,
            'request_rate': len(timestamps) / (window_minutes * 60),
            'burst_score': self._calculate_burst_score(intervals),
            'regularity_score': self._calculate_regularity_score(intervals)
        }

        return features

    def _calculate_burst_score(self, intervals):
        """버스트 패턴 점수 계산"""
        if len(intervals) == 0:
            return 0

        # 매우 짧은 간격의 비율
        short_intervals = sum(1 for interval in intervals if interval < 1.0)
        return short_intervals / len(intervals)

    def _calculate_regularity_score(self, intervals):
        """규칙성 점수 계산"""
        if len(intervals) == 0:
            return 0

        # 간격의 변동성 (낮을수록 규칙적)
        cv = np.std(intervals) / np.mean(intervals) if np.mean(intervals) > 0 else float('inf')
        return 1 / (1 + cv)  # 0~1 범위로 정규화

    def train_model(self, training_data):
        """이상 탐지 모델 훈련"""

        # 특성 데이터 준비
        features_list = []
        for client_data in training_data:
            features = self.extract_features(client_data['client_id'])
            if features:
                features_list.append(list(features.values()))

        if len(features_list) < 10:
            print("훈련 데이터 부족")
            return False

        # 데이터 전처리
        X = np.array(features_list)
        X_scaled = self.scaler.fit_transform(X)

        # 이상 탐지 모델 훈련
        self.anomaly_detector.fit(X_scaled)
        self.model_trained = True

        # 모델 저장
        joblib.dump(self.scaler, 'rate_limit_scaler.pkl')
        joblib.dump(self.anomaly_detector, 'rate_limit_anomaly_detector.pkl')

        return True

    def predict_anomaly(self, client_id):
        """클라이언트 요청 패턴의 이상 여부 예측"""

        if not self.model_trained:
            return 0  # 모델이 훈련되지 않은 경우 정상으로 간주

        features = self.extract_features(client_id)
        if not features:
            return 0

        # 특성 벡터 생성 및 스케일링
        feature_vector = np.array(list(features.values())).reshape(1, -1)
        feature_scaled = self.scaler.transform(feature_vector)

        # 이상 점수 계산 (-1: 이상, 1: 정상)
        anomaly_score = self.anomaly_detector.decision_function(feature_scaled)[0]
        is_anomaly = self.anomaly_detector.predict(feature_scaled)[0]

        return {
            'is_anomaly': is_anomaly == -1,
            'anomaly_score': anomaly_score,
            'features': features
        }

    def get_ml_based_limit(self, client_id, base_limit=1000):
        """ML 기반 동적 제한 계산"""

        prediction = self.predict_anomaly(client_id)

        if prediction['is_anomaly']:
            # 이상 패턴 감지 시 제한 강화
            severity = abs(prediction['anomaly_score'])
            reduction_factor = min(0.9, severity / 2)  # 최대 90% 감소
            adjusted_limit = int(base_limit * (1 - reduction_factor))

            # 로그 기록
            self._log_anomaly_detection(client_id, prediction, adjusted_limit)

        else:
            # 정상 패턴의 경우 기본 제한 적용
            adjusted_limit = base_limit

        return max(10, adjusted_limit)  # 최소 10회는 보장

    def _log_anomaly_detection(self, client_id, prediction, new_limit):
        """이상 탐지 로그 기록"""
        log_data = {
            'timestamp': time.time(),
            'client_id': client_id,
            'anomaly_score': prediction['anomaly_score'],
            'features': prediction['features'],
            'new_limit': new_limit
        }

        log_key = f"anomaly_log:{int(time.time() // 3600)}"  # 시간별 로그
        self.redis.lpush(log_key, json.dumps(log_data))
        self.redis.expire(log_key, 86400 * 7)  # 7일 보관

# 사용 예시
ml_limiter = MLBasedRateLimiter(redis_client)

# 모델 훈련 (초기 설정 시)
training_clients = [{'client_id': f'user{i}'} for i in range(100)]
ml_limiter.train_model(training_clients)

# 실시간 이상 탐지 및 제한 적용
client_limit = ml_limiter.get_ml_based_limit("suspicious_user", base_limit=1000)
print(f"ML 기반 동적 제한: {client_limit} requests/hour")

결론 및 Best Practices

API Rate Limiting은 현대 웹 서비스의 안정성과 신뢰성을 보장하는 핵심 기술입니다.
효과적인 rate limiting 전략을 구현하기 위해서는 다음과 같은 모범 사례를 따라야 합니다.

핵심 Best Practices

1. 적절한 알고리즘 선택

  • Token Bucket: 버스트 트래픽 허용이 필요한 경우
  • Leaky Bucket: 일정한 처리율 유지가 중요한 경우
  • Sliding Window: 정확한 제한이 필요한 경우
  • Fixed Window: 단순하고 메모리 효율적인 구현이 필요한 경우

2. 계층적 제한 전략

  • 글로벌, 사용자, API 키, IP, 엔드포인트별 다층 제한
  • 각 계층의 특성에 맞는 개별 정책 적용
  • 가장 제한적인 정책 우선 적용

3. 모니터링 및 알림

  • 실시간 사용량 모니터링
  • 임계값 초과 시 자동 알림
  • 상세한 로그 기록 및 분석

4. 클라이언트 친화적 구현

  • 명확한 HTTP 헤더 제공
  • 적절한 에러 메시지
  • Retry-After 헤더를 통한 재시도 가이드

5. 성능 최적화

  • Redis Lua 스크립트 활용
  • 배치 처리 및 파이프라인 사용
  • 메모리 효율적인 데이터 구조

Rate Limiting은 단순한 기술 구현을 넘어서 비즈니스 요구사항과 사용자 경험을 모두 고려해야 하는 종합적인 접근이 필요합니다.
적절한 전략과 구현을 통해 안정적이고 확장 가능한 API 서비스를 구축할 수 있습니다.

728x90
반응형