
Rate Limits

Rate limits protect the API from abuse and ensure fair access for all users. Learn how limits work and how to handle them gracefully.

Rate Limit Types

Assisters API enforces two types of rate limits:
| Type | Description |
| --- | --- |
| RPM | Requests Per Minute - total API calls |
| TPM | Tokens Per Minute - total tokens processed |

Limits by Tier

| Tier | RPM | TPM | Monthly Tokens |
| --- | --- | --- | --- |
| Free | 10 | 100,000 | 100,000 |
| Developer | 100 | 1,000,000 | 5,000,000 |
| Startup | 500 | 5,000,000 | 25,000,000 |
| Enterprise | Custom | Custom | Unlimited |

Upgrade Your Plan

Need higher limits? Upgrade to a higher tier

Rate Limit Headers

Every response includes rate limit information:
X-RateLimit-Limit-RPM: 100
X-RateLimit-Remaining-RPM: 95
X-RateLimit-Reset-RPM: 1706745660

X-RateLimit-Limit-TPM: 1000000
X-RateLimit-Remaining-TPM: 995000
X-RateLimit-Reset-TPM: 1706745660
| Header | Description |
| --- | --- |
| X-RateLimit-Limit-* | Your current limit |
| X-RateLimit-Remaining-* | Remaining quota |
| X-RateLimit-Reset-* | Unix timestamp when the quota resets |
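If you want to read these headers from code rather than raw HTTP, here is a minimal sketch assuming the OpenAI Python SDK (v1+), whose with_raw_response accessor exposes the raw response headers; the header names are the ones listed above:

from openai import OpenAI

client = OpenAI(
    api_key="ask_your_key",
    base_url="https://api.assisters.dev/v1"
)

# with_raw_response returns the raw HTTP response alongside the parsed body
raw = client.chat.completions.with_raw_response.create(
    model="llama-3.1-8b",
    messages=[{"role": "user", "content": "Hello!"}]
)

remaining_rpm = int(raw.headers.get("X-RateLimit-Remaining-RPM", "0"))
remaining_tpm = int(raw.headers.get("X-RateLimit-Remaining-TPM", "0"))
response = raw.parse()  # the usual ChatCompletion object

if remaining_rpm < 5:
    print("Approaching the RPM limit -- consider slowing down")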

Rate Limit Errors

When you exceed limits, you’ll receive a 429 Too Many Requests response:
{
  "error": {
    "message": "Rate limit exceeded. Please retry after 5 seconds.",
    "type": "rate_limit_error",
    "code": "rate_limit_exceeded"
  }
}
The response includes a Retry-After header:
Retry-After: 5

Handling Rate Limits

Basic Retry Logic

import time
from openai import OpenAI, RateLimitError

client = OpenAI(
    api_key="ask_your_key",
    base_url="https://api.assisters.dev/v1"
)

def make_request_with_retry(messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            return client.chat.completions.create(
                model="llama-3.1-8b",
                messages=messages
            )
        except RateLimitError as e:
            if attempt == max_retries - 1:
                raise

            wait_time = int(e.response.headers.get("Retry-After", 5))
            print(f"Rate limited. Waiting {wait_time}s...")
            time.sleep(wait_time)
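For example (the message list here is just illustrative):

messages = [{"role": "user", "content": "Hello!"}]
response = make_request_with_retry(messages)
print(response.choices[0].message.content)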

Exponential Backoff

import time
import random
from openai import RateLimitError

def exponential_backoff(func, max_retries=5, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except RateLimitError:
            if attempt == max_retries - 1:
                raise

            # Exponential backoff with jitter
            delay = (base_delay * (2 ** attempt)) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed. Retrying in {delay:.2f}s...")
            time.sleep(delay)
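Pass the API call as a zero-argument callable so the helper can re-invoke it on each attempt (client as defined in the previous example):

response = exponential_backoff(
    lambda: client.chat.completions.create(
        model="llama-3.1-8b",
        messages=[{"role": "user", "content": "Hello!"}]
    )
)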

JavaScript Implementation

async function withRetry(fn, maxRetries = 3) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (error.status !== 429 || attempt === maxRetries - 1) {
        throw error;
      }

      const retryAfter = parseInt(error.headers?.['retry-after'] || '5');
      console.log(`Rate limited. Waiting ${retryAfter}s...`);
      await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
    }
  }
}

// Usage
const response = await withRetry(() =>
  client.chat.completions.create({
    model: 'llama-3.1-8b',
    messages: [{ role: 'user', content: 'Hello!' }]
  })
);

Best Practices

Implement Retries

Always implement retry logic with exponential backoff

Monitor Headers

Check rate limit headers to proactively slow down

Queue Requests

Use a request queue to control throughput

Cache Responses

Cache responses when possible to reduce API calls (see the sketch below)
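As a minimal sketch of the caching practice above (illustrative only; it keys on the exact model and message payload, so only identical requests hit the cache):

import hashlib
import json

_cache = {}

def cached_chat(client, messages, model="llama-3.1-8b"):
    # Hash the full request payload so different prompts never collide
    key = hashlib.sha256(
        json.dumps({"model": model, "messages": messages}, sort_keys=True).encode()
    ).hexdigest()

    if key not in _cache:
        _cache[key] = client.chat.completions.create(
            model=model,
            messages=messages
        )
    return _cache[key]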

Request Queuing

For high-volume applications, implement a request queue:
import asyncio
import time
from collections import deque

class RateLimitedClient:
    def __init__(self, client, rpm_limit=100):
        # client should be an async client (e.g. AsyncOpenAI), since chat() awaits it
        self.client = client
        self.rpm_limit = rpm_limit
        self.request_times = deque()
        self.lock = asyncio.Lock()

    async def wait_for_capacity(self):
        async with self.lock:
            now = time.time()

            # Remove requests older than 1 minute
            while self.request_times and now - self.request_times[0] > 60:
                self.request_times.popleft()

            # If at capacity, wait until the oldest request leaves the 60s window
            if len(self.request_times) >= self.rpm_limit:
                wait_time = 60 - (now - self.request_times[0])
                await asyncio.sleep(wait_time)
                self.request_times.popleft()

            self.request_times.append(time.time())

    async def chat(self, messages):
        await self.wait_for_capacity()
        return await self.client.chat.completions.create(
            model="llama-3.1-8b",
            messages=messages
        )
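One way to wire this up, assuming an AsyncOpenAI client is passed into the constructor as shown above:

from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI(
        api_key="ask_your_key",
        base_url="https://api.assisters.dev/v1"
    )
    limited = RateLimitedClient(client, rpm_limit=100)
    response = await limited.chat([{"role": "user", "content": "Hello!"}])
    print(response.choices[0].message.content)

asyncio.run(main())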

Token Management

TPM limits are based on total tokens (input + output). Manage them by:

1. Estimate Tokens Before Sending

import tiktoken

def estimate_tokens(messages, model="llama-3.1-8b"):
    # cl100k_base is only a rough approximation of the Llama tokenizer,
    # but it is close enough for capacity estimates
    encoding = tiktoken.get_encoding("cl100k_base")

    total = 0
    for message in messages:
        total += len(encoding.encode(message["content"]))
        total += 4  # Message overhead

    return total

# Check before sending (remaining_tpm would come from the
# X-RateLimit-Remaining-TPM header on a previous response)
estimated = estimate_tokens(messages)
if estimated > remaining_tpm:
    # Wait or truncate the conversation before sending
    pass

2. Set Max Tokens

response = client.chat.completions.create(
    model="llama-3.1-8b",
    messages=messages,
    max_tokens=500  # Limit output tokens
)

3. Trim Context

import tiktoken

def trim_messages(messages, max_tokens=4000):
    encoding = tiktoken.get_encoding("cl100k_base")

    # Always keep the system message if there is one
    has_system = bool(messages) and messages[0]["role"] == "system"
    result = [messages[0]] if has_system else []
    rest = messages[1:] if has_system else messages
    current_tokens = sum(len(encoding.encode(m["content"])) for m in result)

    # Add messages starting from the most recent, keeping chronological order
    insert_at = 1 if has_system else 0
    for message in reversed(rest):
        msg_tokens = len(encoding.encode(message["content"])) + 4  # per-message overhead
        if current_tokens + msg_tokens > max_tokens:
            break
        result.insert(insert_at, message)
        current_tokens += msg_tokens

    return result

Monitoring Usage

Track your usage proactively:
import time

class UsageTracker:
    def __init__(self):
        self.minute_requests = 0
        self.minute_tokens = 0
        self.minute_start = time.time()

    def record(self, tokens_used):
        # Reset if minute has passed
        if time.time() - self.minute_start > 60:
            self.minute_requests = 0
            self.minute_tokens = 0
            self.minute_start = time.time()

        self.minute_requests += 1
        self.minute_tokens += tokens_used

    def get_usage(self):
        return {
            "requests_this_minute": self.minute_requests,
            "tokens_this_minute": self.minute_tokens
        }
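For example, record the token count from each response, assuming the response exposes the standard usage.total_tokens field (client as defined earlier):

tracker = UsageTracker()

response = client.chat.completions.create(
    model="llama-3.1-8b",
    messages=[{"role": "user", "content": "Hello!"}]
)
tracker.record(response.usage.total_tokens)
print(tracker.get_usage())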

Burst Handling

For batch processing, spread requests over time:
import asyncio

async def process_batch(items, rpm_limit=100):
    # Calculate delay between requests
    delay = 60 / rpm_limit  # seconds per request

    results = []
    for item in items:
        result = await process_item(item)
        results.append(result)
        await asyncio.sleep(delay)

    return results
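For example, with a hypothetical process_item that makes one chat call per item (async_client is an AsyncOpenAI instance introduced here for illustration, not part of the snippet above):

from openai import AsyncOpenAI

async_client = AsyncOpenAI(
    api_key="ask_your_key",
    base_url="https://api.assisters.dev/v1"
)

# Hypothetical per-item worker: one chat call per item
async def process_item(item):
    return await async_client.chat.completions.create(
        model="llama-3.1-8b",
        messages=[{"role": "user", "content": item}]
    )

results = asyncio.run(process_batch(["Summarize doc A", "Summarize doc B"]))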

Need Higher Limits?

Contact Sales

Enterprise plans offer custom rate limits tailored to your needs