Los grupos de trabajadores estáticos desperdician dinero durante los momentos de tranquilidad y crean cuellos de botella durante los picos. El escalado automático hace coincidir el número de trabajadores con la demanda real, optimizando tanto el costo como el rendimiento.
Señales de escala
| Señal | Ampliar cuando | Reducir cuando |
|---|---|---|
| Profundidad de la cola | > 20 tareas pendientes | < 5 tareas pendientes |
| Utilización de trabajadores | > 80% ocupado | < 20% ocupado |
| Resolver latencia | P95 > 60 segundos | P95 < 20 segundos |
| Tasa de error | > 5% (necesita trabajadores nuevos) | Estable < 1% |
| Equilibrio | N/A | Balance <$1 (dejar de escalar) |
Escalador automático basado en subprocesos
Escale los subprocesos de trabajo dentro de un solo proceso:
import os
import time
import threading
import requests
import json
import redis
class AutoScalingPool:
"""Dynamically scale CaptchaAI worker threads."""
def __init__(self, api_key, redis_url="redis://localhost:6379"):
self.api_key = api_key
self.redis = redis.from_url(redis_url)
self.base = "https://ocr.captchaai.com"
self.queue_key = "captcha:tasks"
self.results_key = "captcha:results"
self.min_workers = 2
self.max_workers = 20
self.workers = []
self.active_count = 0
self.lock = threading.Lock()
self.running = True
def start(self):
"""Start the pool with minimum workers."""
for _ in range(self.min_workers):
self._add_worker()
# Start scaler in background
scaler = threading.Thread(target=self._scaling_loop, daemon=True)
scaler.start()
print(f"Pool started with {self.min_workers} workers")
def _add_worker(self):
"""Add a worker thread."""
if len(self.workers) >= self.max_workers:
return
t = threading.Thread(target=self._worker_loop, daemon=True)
t.start()
self.workers.append(t)
def _remove_worker(self):
"""Signal one worker to stop (lazy removal)."""
if len(self.workers) <= self.min_workers:
return
self.workers.pop() # Thread will exit on next idle cycle
def _worker_loop(self):
"""Worker loop: fetch and process tasks."""
while self.running and threading.current_thread() in self.workers:
result = self.redis.blpop(self.queue_key, timeout=10)
if result is None:
continue
_, raw = result
task = json.loads(raw)
task_id = task["id"]
with self.lock:
self.active_count += 1
try:
token = self._solve(task["method"], task["params"])
self.redis.hset(self.results_key, task_id, json.dumps({
"status": "success", "token": token,
}))
except Exception as e:
self.redis.hset(self.results_key, task_id, json.dumps({
"status": "error", "error": str(e),
}))
finally:
with self.lock:
self.active_count -= 1
def _scaling_loop(self):
"""Periodically adjust worker count."""
while self.running:
time.sleep(10)
queue_depth = self.redis.llen(self.queue_key)
current = len(self.workers)
utilization = (
self.active_count / current * 100 if current > 0 else 0
)
# Scale up: queue growing and workers busy
if queue_depth > 20 and utilization > 70:
new_count = min(current + 2, self.max_workers)
while len(self.workers) < new_count:
self._add_worker()
print(f"Scaled up: {current} → {len(self.workers)} workers")
# Scale down: queue empty and workers idle
elif queue_depth < 5 and utilization < 20:
target = max(current - 1, self.min_workers)
while len(self.workers) > target:
self._remove_worker()
if len(self.workers) < current:
print(f"Scaled down: {current} → {len(self.workers)} workers")
def _solve(self, method, params, timeout=120):
data = {"key": self.api_key, "method": method, "json": 1}
data.update(params)
resp = requests.post(
f"{self.base}/in.php", data=data, timeout=30,
)
result = resp.json()
if result.get("status") != 1:
raise RuntimeError(result.get("request"))
captcha_id = result["request"]
start = time.time()
while time.time() - start < timeout:
time.sleep(5)
resp = requests.get(f"{self.base}/res.php", params={
"key": self.api_key,
"action": "get",
"id": captcha_id,
"json": 1,
}, timeout=15)
data = resp.json()
if data["request"] != "CAPCHA_NOT_READY":
if data.get("status") == 1:
return data["request"]
raise RuntimeError(data["request"])
raise TimeoutError("Solve timeout")
def stats(self):
return {
"workers": len(self.workers),
"active": self.active_count,
"queue": self.redis.llen(self.queue_key),
}
# Usage
pool = AutoScalingPool(os.environ["CAPTCHAAI_KEY"])
pool.start()
# Monitor
while True:
print(pool.stats())
time.sleep(30)
Escalador automático basado en procesos
Escale los procesos de trabajo para el aislamiento de la CPU:
import multiprocessing
import time
import redis
import os
class ProcessScaler:
"""Scale worker processes based on queue depth."""
def __init__(self, worker_fn, redis_url="redis://localhost:6379"):
self.worker_fn = worker_fn
self.redis = redis.from_url(redis_url)
self.processes = []
self.min_workers = 2
self.max_workers = 16
def run(self, check_interval=15):
"""Run the scaler loop."""
# Start minimum workers
for _ in range(self.min_workers):
self._spawn()
while True:
time.sleep(check_interval)
self._cleanup_dead()
queue_depth = self.redis.llen("captcha:tasks")
current = len(self.processes)
# Scale up
if queue_depth > current * 5 and current < self.max_workers:
to_add = min(
max(1, queue_depth // 10),
self.max_workers - current,
)
for _ in range(to_add):
self._spawn()
print(f"Scaled up to {len(self.processes)} workers")
# Scale down
elif queue_depth < 3 and current > self.min_workers:
to_remove = min(2, current - self.min_workers)
for _ in range(to_remove):
p = self.processes.pop()
p.terminate()
print(f"Scaled down to {len(self.processes)} workers")
def _spawn(self):
p = multiprocessing.Process(target=self.worker_fn)
p.start()
self.processes.append(p)
def _cleanup_dead(self):
self.processes = [p for p in self.processes if p.is_alive()]
# Ensure minimum
while len(self.processes) < self.min_workers:
self._spawn()
Escalamiento consciente del equilibrio
Deje de escalar cuando los fondos se agoten:
def check_balance(api_key, min_balance=2.0):
"""Check if balance is sufficient for scaling."""
resp = requests.get("https://ocr.captchaai.com/res.php", params={
"key": api_key,
"action": "getbalance",
"json": 1,
}, timeout=15)
balance = float(resp.json()["request"])
if balance < min_balance:
print(f"Balance ${balance:.2f} below ${min_balance} — halting scale-up")
return False
return True
Integrar en el ciclo de escalado:
# In _scaling_loop:
if queue_depth > 20 and utilization > 70:
if check_balance(self.api_key, min_balance=2.0):
# Scale up
...
else:
print("Scaling paused — low balance")
Estrategias de escalamiento comparadas
| Estrategia | Mejor para | Latencia | Complejidad |
|---|---|---|---|
| Grupo de subprocesos | I/O-bound (llamadas API) | Bajo | Bajo |
| Grupo de procesos | Preprocesamiento vinculado a la CPU | Medio | Medio |
| Kubernetes HPA | Implementaciones nativas de la nube | superior | Alto |
| KEDA | Escalado basado en eventos | Medio | Medio |
Solución de problemas
| Problema | causa | Solución |
|---|---|---|
| Los trabajadores siguen aumentando | La cola nunca se agota | Comprobar si los trabajadores realmente están procesando |
| Reducción demasiado agresiva | Umbral bajo | Aumentar el retraso de reducción a 30 s+ |
| Procesos zombies | Procesos no limpiados | Utilice _cleanup_dead() con regularidad |
| El balance se agota rápidamente | Demasiados trabajadores | Agregar verificación de balance a la lógica de escalado |
Preguntas frecuentes
¿Cuál es la proporción correcta de trabajadores por cola?
Apunte a 1 trabajador por cada 5 a 10 tareas en cola. Cada trabajador procesa entre 3 y 6 CAPTCHA por minuto, según el tipo.
¿Debo utilizar hilos o procesos?
Subprocesos para llamadas API puras (CaptchaAI es I/O-bound). Procesos cuando también realiza preprocesamiento de imágenes o cálculos intensos además de la resolución.
¿Qué tan rápido debo ampliar?
Aumente rápidamente (verifique cada 10 a 15 segundos), reduzca lentamente (espere entre 30 y 60 segundos de carga baja). Esto evita las peleas entre estados.
Guías relacionadas
- Colas de trabajos con Kubernetes
- Monitoreo con Prometheus/Grafana
Escala de forma inteligente: obtén tu API key de CaptchaAI hoy.