DevOps & Scaling

Trabajadores de resolución de CAPTCHA de escala automática

Los grupos de trabajadores estáticos desperdician dinero durante los momentos de tranquilidad y crean cuellos de botella durante los picos. El escalado automático hace coincidir el número de trabajadores con la demanda real, optimizando tanto el costo como el rendimiento.


Señales de escala

Señal Ampliar cuando Reducir cuando
Profundidad de la cola > 20 tareas pendientes < 5 tareas pendientes
Utilización de trabajadores > 80% ocupado < 20% ocupado
Resolver latencia P95 > 60 segundos P95 < 20 segundos
Tasa de error > 5% (necesita trabajadores nuevos) Estable < 1%
Equilibrio N/A Balance <$1 (dejar de escalar)

Escalador automático basado en subprocesos

Escale los subprocesos de trabajo dentro de un solo proceso:

import os
import time
import threading
import requests
import json
import redis


class AutoScalingPool:
    """Dynamically scale CaptchaAI worker threads."""

    def __init__(self, api_key, redis_url="redis://localhost:6379"):
        self.api_key = api_key
        self.redis = redis.from_url(redis_url)
        self.base = "https://ocr.captchaai.com"
        self.queue_key = "captcha:tasks"
        self.results_key = "captcha:results"

        self.min_workers = 2
        self.max_workers = 20
        self.workers = []
        self.active_count = 0
        self.lock = threading.Lock()
        self.running = True

    def start(self):
        """Start the pool with minimum workers."""
        for _ in range(self.min_workers):
            self._add_worker()

        # Start scaler in background
        scaler = threading.Thread(target=self._scaling_loop, daemon=True)
        scaler.start()
        print(f"Pool started with {self.min_workers} workers")

    def _add_worker(self):
        """Add a worker thread."""
        if len(self.workers) >= self.max_workers:
            return
        t = threading.Thread(target=self._worker_loop, daemon=True)
        t.start()
        self.workers.append(t)

    def _remove_worker(self):
        """Signal one worker to stop (lazy removal)."""
        if len(self.workers) <= self.min_workers:
            return
        self.workers.pop()  # Thread will exit on next idle cycle

    def _worker_loop(self):
        """Worker loop: fetch and process tasks."""
        while self.running and threading.current_thread() in self.workers:
            result = self.redis.blpop(self.queue_key, timeout=10)
            if result is None:
                continue

            _, raw = result
            task = json.loads(raw)
            task_id = task["id"]

            with self.lock:
                self.active_count += 1

            try:
                token = self._solve(task["method"], task["params"])
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "success", "token": token,
                }))
            except Exception as e:
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "error", "error": str(e),
                }))
            finally:
                with self.lock:
                    self.active_count -= 1

    def _scaling_loop(self):
        """Periodically adjust worker count."""
        while self.running:
            time.sleep(10)

            queue_depth = self.redis.llen(self.queue_key)
            current = len(self.workers)
            utilization = (
                self.active_count / current * 100 if current > 0 else 0
            )

            # Scale up: queue growing and workers busy
            if queue_depth > 20 and utilization > 70:
                new_count = min(current + 2, self.max_workers)
                while len(self.workers) < new_count:
                    self._add_worker()
                print(f"Scaled up: {current} → {len(self.workers)} workers")

            # Scale down: queue empty and workers idle
            elif queue_depth < 5 and utilization < 20:
                target = max(current - 1, self.min_workers)
                while len(self.workers) > target:
                    self._remove_worker()
                if len(self.workers) < current:
                    print(f"Scaled down: {current} → {len(self.workers)} workers")

    def _solve(self, method, params, timeout=120):
        data = {"key": self.api_key, "method": method, "json": 1}
        data.update(params)

        resp = requests.post(
            f"{self.base}/in.php", data=data, timeout=30,
        )
        result = resp.json()

        if result.get("status") != 1:
            raise RuntimeError(result.get("request"))

        captcha_id = result["request"]
        start = time.time()

        while time.time() - start < timeout:
            time.sleep(5)
            resp = requests.get(f"{self.base}/res.php", params={
                "key": self.api_key,
                "action": "get",
                "id": captcha_id,
                "json": 1,
            }, timeout=15)
            data = resp.json()
            if data["request"] != "CAPCHA_NOT_READY":
                if data.get("status") == 1:
                    return data["request"]
                raise RuntimeError(data["request"])

        raise TimeoutError("Solve timeout")

    def stats(self):
        return {
            "workers": len(self.workers),
            "active": self.active_count,
            "queue": self.redis.llen(self.queue_key),
        }


# Usage
pool = AutoScalingPool(os.environ["CAPTCHAAI_KEY"])
pool.start()

# Monitor
while True:
    print(pool.stats())
    time.sleep(30)

Escalador automático basado en procesos

Escale los procesos de trabajo para el aislamiento de la CPU:

import multiprocessing
import time
import redis
import os


class ProcessScaler:
    """Scale worker processes based on queue depth."""

    def __init__(self, worker_fn, redis_url="redis://localhost:6379"):
        self.worker_fn = worker_fn
        self.redis = redis.from_url(redis_url)
        self.processes = []
        self.min_workers = 2
        self.max_workers = 16

    def run(self, check_interval=15):
        """Run the scaler loop."""
        # Start minimum workers
        for _ in range(self.min_workers):
            self._spawn()

        while True:
            time.sleep(check_interval)
            self._cleanup_dead()

            queue_depth = self.redis.llen("captcha:tasks")
            current = len(self.processes)

            # Scale up
            if queue_depth > current * 5 and current < self.max_workers:
                to_add = min(
                    max(1, queue_depth // 10),
                    self.max_workers - current,
                )
                for _ in range(to_add):
                    self._spawn()
                print(f"Scaled up to {len(self.processes)} workers")

            # Scale down
            elif queue_depth < 3 and current > self.min_workers:
                to_remove = min(2, current - self.min_workers)
                for _ in range(to_remove):
                    p = self.processes.pop()
                    p.terminate()
                print(f"Scaled down to {len(self.processes)} workers")

    def _spawn(self):
        p = multiprocessing.Process(target=self.worker_fn)
        p.start()
        self.processes.append(p)

    def _cleanup_dead(self):
        self.processes = [p for p in self.processes if p.is_alive()]
        # Ensure minimum
        while len(self.processes) < self.min_workers:
            self._spawn()

Escalamiento consciente del equilibrio

Deje de escalar cuando los fondos se agoten:

def check_balance(api_key, min_balance=2.0):
    """Check if balance is sufficient for scaling."""
    resp = requests.get("https://ocr.captchaai.com/res.php", params={
        "key": api_key,
        "action": "getbalance",
        "json": 1,
    }, timeout=15)
    balance = float(resp.json()["request"])

    if balance < min_balance:
        print(f"Balance ${balance:.2f} below ${min_balance} — halting scale-up")
        return False
    return True

Integrar en el ciclo de escalado:

# In _scaling_loop:
if queue_depth > 20 and utilization > 70:
    if check_balance(self.api_key, min_balance=2.0):
        # Scale up
        ...
    else:
        print("Scaling paused — low balance")

Estrategias de escalamiento comparadas

Estrategia Mejor para Latencia Complejidad
Grupo de subprocesos I/O-bound (llamadas API) Bajo Bajo
Grupo de procesos Preprocesamiento vinculado a la CPU Medio Medio
Kubernetes HPA Implementaciones nativas de la nube superior Alto
KEDA Escalado basado en eventos Medio Medio

Solución de problemas

Problema causa Solución
Los trabajadores siguen aumentando La cola nunca se agota Comprobar si los trabajadores realmente están procesando
Reducción demasiado agresiva Umbral bajo Aumentar el retraso de reducción a 30 s+
Procesos zombies Procesos no limpiados Utilice _cleanup_dead() con regularidad
El balance se agota rápidamente Demasiados trabajadores Agregar verificación de balance a la lógica de escalado

Preguntas frecuentes

¿Cuál es la proporción correcta de trabajadores por cola?

Apunte a 1 trabajador por cada 5 a 10 tareas en cola. Cada trabajador procesa entre 3 y 6 CAPTCHA por minuto, según el tipo.

¿Debo utilizar hilos o procesos?

Subprocesos para llamadas API puras (CaptchaAI es I/O-bound). Procesos cuando también realiza preprocesamiento de imágenes o cálculos intensos además de la resolución.

¿Qué tan rápido debo ampliar?

Aumente rápidamente (verifique cada 10 a 15 segundos), reduzca lentamente (espere entre 30 y 60 segundos de carga baja). Esto evita las peleas entre estados.


Guías relacionadas

  • Colas de trabajos con Kubernetes
  • Monitoreo con Prometheus/Grafana

Escala de forma inteligente: obtén tu API key de CaptchaAI hoy.

Los comentarios están deshabilitados para este artículo.

Publicaciones relacionadas

Explainers Impacto de la resolución DNS en el rendimiento de la API CAPTCHA
Cómo afecta la resolución DNS al rendimiento de la API CAPTCHA: cuándo es un cuello de botella, cómo optimizarla con keep-alive, caché y pre-resolución.

Cómo afecta la resolución DNS al rendimiento de la API CAPTCHA: cuándo es un cuello de botella, cómo optimizar...

Apr 24, 2026
DevOps & Scaling Funciones de Azure + CaptchaAI: Integración en la nube
Cómo resolver CAPTCHA con Captcha AI desde Azure Functions: trigger HTTP, integración con Key Vault, procesamiento por cola y despliegue en Azure.

Cómo resolver CAPTCHA con Captcha AI desde Azure Functions: trigger HTTP, integración con Key Vault, procesami...

May 02, 2026
Explainers Gestión de user-agent en QA propia con CaptchaAI
Cómo gestionar el user-agent en pruebas QA propias para reproducir escenarios reales sin recurrir a anti-detección.

Cómo gestionar el user-agent en pruebas QA propias para reproducir escenarios reales sin recurrir a anti-detec...

May 01, 2026