EverydayTech Platform - Developer Reference
Complete Source Code Documentation - All Applications
Loading...
Searching...
No Matches
metricsAggregator.js
Go to the documentation of this file.
1/**
2 * @file Metrics Aggregator - In-Memory Metrics Batching Service
3 * @module services/metricsAggregator
4 * @description
5 * High-performance metrics buffering and batch insertion service. Aggregates real-time
6 * agent metrics in memory and flushes to database periodically to reduce write overhead.
7 * Prevents database saturation from high-frequency metric submissions.
8 *
9 * **Buffering Strategy:**
10 * - Collects metrics in memory buffer (array)
11 * - Flushes based on size threshold OR time interval
12 * - Size trigger: 100 metrics
13 * - Time trigger: 10 seconds
14 *
15 * **Performance Benefits:**
16 * - Reduces database writes by 10-100x
17 * - Prevents connection pool exhaustion
18 * - Groups writes into periodic flush cycles for efficiency
19 * - Lower database CPU/IO load
20 *
21 * **Metrics Processing:**
22 * 1. Receive metric from agent (via WebSocket or HTTP)
23 * 2. Buffer in memory array
24 * 3. Trigger flush on threshold (size or time)
25 * 4. INSERT buffered metrics into agent_metrics table (one row per metric)
26 * 5. Prune metrics older than 6 months
27 * 6. Clear buffer and reset timer
28 *
29 * **Metric Schema:**
30 * ```javascript
31 * {
32 * agent_uuid: "550e8400-e29b-41d4-a716-446655440000",
33 * ts: "2025-03-13T10:30:00.000Z",
34 * data: {
35 * cpuLoad1: 45.2, // CPU 1-minute load average
36 * memPercent: 78.5, // Memory usage percentage
37 * disk: [{used: 500000000000}], // Disk usage per drive, in bytes
38 * uptime: 86400 // System uptime in seconds
39 * }
40 * }
41 * ```
42 *
43 * **Database Schema:**
44 * - Table: `agent_metrics`
45 * - Indexed on: (agent_id, ts)
46 * - Retention: 6 months (auto-pruned)
47 *
48 * **Configuration:**
49 * - `BATCH_INTERVAL`: 10000ms (10 seconds)
50 * - `MAX_BATCH_SIZE`: 100 metrics
51 * - Retention: 6 months
52 * @example
53 * // Add metric to buffer (from WebSocket handler)
54 * const { addMetric } = require('./services/metricsAggregator');
55 * addMetric({
56 * agent_uuid: agent.uuid,
57 * ts: new Date().toISOString(),
58 * data: {
59 * cpuLoad1: 45.2,
60 * memPercent: 78.5,
61 * disk: [{used: 500000000000}],
62 * uptime: 86400
63 * }
64 * });
65 * @example
66 * // Force immediate flush (before shutdown)
67 * const { flushMetrics } = require('./services/metricsAggregator');
68 * flushMetrics();
69 * @see {@link module:services/websocketManager} for real-time metrics streaming
70 * @see {@link module:routes/agent~SubmitMetricsBatch} for HTTP metrics endpoint
71 * @requires ./db - PostgreSQL connection pool
72 */
73
74// metricsAggregator.js
75// Simple in-memory metrics batching for upload
76
// Flush triggers: whichever fires first drains the buffer.
const BATCH_INTERVAL = 10000; // 10 seconds
const MAX_BATCH_SIZE = 100; // Max metrics per batch
// Pending metric payloads; replaced with a fresh array on each flush.
let metricsBuffer = [];
// Handle of the armed one-shot flush timer; null when no flush is scheduled.
let flushTimer = null;
81
82/**
83 *
84 * @param metric
85 */
86function addMetric(metric) {
87 metricsBuffer.push(metric);
88 if (metricsBuffer.length >= MAX_BATCH_SIZE) {
89 flushMetrics();
90 } else if (!flushTimer) {
91 flushTimer = setTimeout(flushMetrics, BATCH_INTERVAL);
92 }
93}
94
95/**
96 *
97 */
98function flushMetrics() {
99 if (!metricsBuffer.length) return;
100 // Replace with DB or API upload logic
101 console.log('==============================');
102 console.log(`[MetricsAggregator] 🚀 Flushing ${metricsBuffer.length} metrics`);
103 console.log(`[MetricsAggregator] First metric sample:`, JSON.stringify(metricsBuffer[0], null, 2));
104 // Insert metrics into agent_metrics table
105 const pool = require('./db');
106 const insertQuery = `
107 INSERT INTO agent_metrics (agent_id, ts, cpu, memory, disk, uptime, raw)
108 SELECT a.agent_id, $2, $3, $4, $5, $6, $7
109 FROM agents a WHERE a.agent_uuid = $1
110 `;
111
112 metricsBuffer.forEach(async (m) => {
113 try {
114 // Extract values from metric payload
115 const agent_uuid = m.agent_uuid;
116 const ts = m.ts || new Date().toISOString();
117 const cpu = m.data?.cpuLoad1 ?? m.data?.cpu ?? null;
118 const memory = m.data?.memPercent ?? m.data?.memory ?? null;
119 const disk = Array.isArray(m.data?.disk) ? m.data.disk.reduce((acc, d) => acc + (d.used || 0), 0) : null;
120 const uptime = m.data?.uptime ?? null;
121 const raw = JSON.stringify(m.data || m);
122 await pool.query(insertQuery, [agent_uuid, ts, cpu, memory, disk, uptime, raw]);
123 } catch (err) {
124 console.error('[MetricsAggregator] Failed to insert metric:', err);
125 }
126 });
127
128 // Prune metrics older than 6 months
129 try {
130 const pruneQuery = `DELETE FROM agent_metrics WHERE ts < NOW() - INTERVAL '6 months'`;
131 pool.query(pruneQuery);
132 } catch (err) {
133 console.error('[MetricsAggregator] Failed to prune old metrics:', err);
134 }
135 metricsBuffer = [];
136 clearTimeout(flushTimer);
137 flushTimer = null;
138 console.log('[MetricsAggregator] ✅ Flush complete');
139 console.log('==============================');
140}
141
// Public API: addMetric buffers one payload; flushMetrics forces an immediate
// drain (e.g. before process shutdown).
module.exports = { addMetric, flushMetrics };