Ingesta de eventos a través de la API de streaming
Introducción
Utilice esta API para enviar datos de eventos a una canalización para su procesamiento, filtrado, enriquecimiento y enrutamiento.
Este punto final admite ingesta de alto rendimiento en tiempo real y admite eventos JSON estructurados y semiestructurados. Es ideal para aplicaciones en la nube, microservicios, dispositivos IoT o cualquier sistema que emita telemetría.
¿Por qué utilizar esta API?
- Envíe registros y métricas directamente a una canalización
- Normalice y enriquezca los datos antes de enviarlos al flujo descendente
- Reducción de costes mediante el filtrado o el muestreo temprano
- Cree canalizaciones flexibles sin dependencia de un proveedor
Antes de empezar
Asegúrese de tener:
- Un Token API con permisos de ingestión
- Un ID de flujo
- Cargas útiles de eventos con formato JSON
curlo Postman instalado
Visión general del pipeline
- Mermaid (código)
- Mermaid (imagen)
- ASCII
flowchart LR
A["App Cloud / Microservicio"] --> B["API de Ingesta en Streaming"]
B --> C["Capa de Procesamiento<br/> • Análisis<br/> • Enriquecimiento<br/> • Enmascaramiento"]
C --> D["Motor de Enrutamiento<br/> • Filtros<br/> • Muestreo<br/> • Reglas"]
D --> E{{"Destinos:<br/>S3 / SIEM / Elastic / Snowflake"}}
+--------------+ +------------------+ +----------------+ +----------------+
| App Cloud | ---> | API de Ingesta | ---> | Procesamiento | ---> | Enrutamiento |
| (Logs/Eventos)| | (POST /ingest) | | (Parse/Enrich) | | (Motor Reglas) |
+--------------+ +------------------+ +----------------+ +----------------+
|
v
+----------------------+
| Destinos |
| S3 | SIEM | DW | ES |
+----------------------+
Punto final de ingesta
POST /v1/streams/ingest
Cabeceras de solicitud
| Encabezado | Valor | Requerido | Descripción |
|---|---|---|---|
| Authorization | Bearer <token> | Sí | Token API |
| Content-Type | application/json | Sí | Carga JSON |
| X-Stream-Id | <tu-stream-id> | Sí | Pipeline objetivo |
Cuerpo de la solicitud
{
"timestamp": "2025-02-12T14:32:15Z",
"source": "payments-app",
"event": {
"transactionId": "txn_30921",
"status": "approved",
"latency_ms": 182,
"amount": 499,
"currency": "USD"
},
"metadata": {
"env": "prod",
"team": "payments"
}
}
Notas
timestamp: Formato ISO 8601event: Datos del evento principal (pares clave-valor)- Tamaño máximo de la carga útil: 1 MB
Respuesta
200 OK
{
"status": "accepted",
"ingested_bytes": 527,
"stream_id": "a0c3b4de-1923-4ff1-8dba-f92ad912d20f"
}
400 Bad request
Problemas comunes:
- JSON no válido
- Formato de marca de tiempo incorrecto
- Faltan los campos
sourceoevent - La carga útil supera el tamaño máximo
{
"error": "invalid_format",
"detail": "El campo 'timestamp' debe ser una cadena ISO 8601 válida."
}
401 No Autorizado
- Token faltante o caducado.
429 Tasa limitada
- Reduzca la tasa de solicitudes o envíe eventos por lotes.
Ejemplo cURL
curl -X POST "https://api.example.com/v1/streams/ingest" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-H "X-Stream-Id: $STREAM_ID" \
-d @event.json
Casos de uso comunes
- Ingesta de registros de microservicios
- Transmisión de telemetría de dispositivos IoT
- Normalización de eventos de sistemas distribuidos
- Enrutamiento de eventos enriquecidos a SIEM, S3, Elastic o Snowflake
- Canalizaciones de detección de fraudes o anomalías en tiempo real
Solución de problemas
| Síntoma | Causa probable | Solución |
|---|---|---|
| No se muestran eventos en el pipeline | ID de Stream incorrecto | Copie el ID de Stream correcto |
| Carga rechazada | JSON inválido | Validar con jq |
| Faltan campos | Estructura anidada incorrecta | Revisar reglas de análisis |
| Alta latencia | Grandes cargas o lotes | Reduzca el tamaño del lote |
Próximos pasos
- Añadir reglas de análisis, enmascaramiento o enriquecimiento
- Enrutar los datos a múltiples destinos
- Reducción de costes mediante muestreo o filtrado
- Construir métricas y cuadros de mando