EverydayTech Platform - Developer Reference
Complete Source Code Documentation - All Applications
Loading...
Searching...
No Matches
metricsSync.js
Go to the documentation of this file.
1/**
2 * Background Metrics Sync Worker
3 *
4 * Runs every 30 minutes to sync DigitalOcean metrics to database for all tenants
5 * Prevents rate limiting and provides instant page loads directly from cached data
6 */
7
const cron = require('node-cron');
const { Pool } = require('pg');
const DigitalOceanService = require('../services/digitalOceanService');

// Cron expression controlling sync frequency (default: every 30 minutes).
const SYNC_INTERVAL = process.env.SYNC_INTERVAL || '*/30 * * * *';

// Shared Postgres connection pool used by all sync queries.
// NOTE(review): rejectUnauthorized:false disables TLS certificate
// verification — common for DO managed databases, but confirm this is intended.
const pool = new Pool({
  host: process.env.DB_HOST,
  port: process.env.DB_PORT,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME || 'defaultdb',
  ssl: { rejectUnauthorized: false }
});

console.log('[MetricsSync] Worker starting...');
console.log(`[MetricsSync] Database: ${process.env.DB_HOST}:${process.env.DB_PORT}`);
// Report the actual schedule — SYNC_INTERVAL is configurable, so the old
// hard-coded "(every 30 minutes)" suffix could be misleading.
console.log(`[MetricsSync] Sync Interval: ${SYNC_INTERVAL}`);
27
/**
 * Sync metrics for a single tenant from DigitalOcean.
 * Fetches database and droplet resources, retrieves their metrics, upserts
 * each resource into hosting_databases / hosting_droplets, and appends a
 * snapshot row to hosting_metrics_history when metrics were obtained.
 * Resources no longer present in the DO account are marked status='deleted'.
 * Per-resource failures are logged and skipped; listing failures re-throw.
 * @function syncTenantMetrics
 * @param {number} tenantId - Tenant ID to sync metrics for
 * @returns {Promise<{databases: number, droplets: number, metrics: number}>}
 *   Counts of databases/droplets upserted and metric fetches that succeeded.
 * @throws Re-throws errors from getAllResources() or the deletion sweeps.
 */
async function syncTenantMetrics(tenantId) {
  try {
    const doService = new DigitalOceanService(tenantId);
    const resources = await doService.getAllResources();

    // Per-run counters, returned to the caller for aggregate logging.
    let syncStats = {
      databases: 0,
      droplets: 0,
      metrics: 0
    };

    // =====================
    // Sync Databases + Metrics
    // =====================
    const databases = resources.databases || [];
    const activeDatabaseIds = databases.map(db => db.id);

    for (const db of databases) {
      let metrics = null;
      try {
        metrics = await doService.getDatabaseMetrics(db.id);
        syncStats.metrics++;
      } catch (err) {
        // Metrics are best-effort: the row is still upserted below with
        // null metric columns and a null metrics_last_updated.
        console.error(`[MetricsSync] Failed to fetch metrics for database ${db.id}:`, err.message);
      }

      try {
        // Serialize the full DO payload for the metadata column; fall back
        // to a minimal {id, name, engine} object if the payload cannot be
        // stringified (e.g. circular references).
        let metadata = null;
        try {
          metadata = JSON.stringify(db);
        } catch (jsonErr) {
          console.error(`[MetricsSync] Failed to stringify metadata for database ${db.id}:`, jsonErr.message);
          metadata = JSON.stringify({ id: db.id, name: db.name, engine: db.engine });
        }

        // Upsert keyed on do_database_id so repeated syncs update in place.
        await pool.query(
          `INSERT INTO hosting_databases (
            tenant_id, do_database_id, database_name, engine, version, status,
            region, size, num_nodes, connection_host, connection_port,
            connection_database, connection_user, tags,
            cpu_count, memory_mb, disk_gb, metrics_last_updated,
            metadata, created_at, updated_at
          )
          VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
          ON CONFLICT (do_database_id)
          DO UPDATE SET
            database_name = EXCLUDED.database_name,
            status = EXCLUDED.status,
            version = EXCLUDED.version,
            size = EXCLUDED.size,
            num_nodes = EXCLUDED.num_nodes,
            connection_host = EXCLUDED.connection_host,
            connection_port = EXCLUDED.connection_port,
            cpu_count = EXCLUDED.cpu_count,
            memory_mb = EXCLUDED.memory_mb,
            disk_gb = EXCLUDED.disk_gb,
            metrics_last_updated = EXCLUDED.metrics_last_updated,
            metadata = EXCLUDED.metadata,
            updated_at = CURRENT_TIMESTAMP`,
          [
            tenantId, db.id, db.name, db.engine, db.version, db.status,
            db.region, db.size, db.num_nodes || 1,
            db.connection?.host || null, db.connection?.port || null,
            db.connection?.database || null, db.connection?.user || null,
            db.tags || [],
            metrics?.cpu_count || null, metrics?.memory_mb || null, metrics?.disk_gb || null,
            metrics ? new Date() : null,
            metadata
          ]
        );

        // Only write a history snapshot when a metrics fetch succeeded.
        if (metrics) {
          await pool.query(
            `INSERT INTO hosting_metrics_history (
              resource_type, resource_id, tenant_id, cpu_value, memory_value, disk_value
            ) VALUES ($1, $2, $3, $4, $5, $6)`,
            ['database', db.id, tenantId, metrics.cpu_count, metrics.memory_mb, metrics.disk_gb]
          );
        }

        syncStats.databases++;
      } catch (err) {
        console.error(`[MetricsSync] Failed to sync database ${db.id} (${db.name}):`, err.message);
      }
    }

    // Mark databases as deleted if they no longer exist in the DO account.
    // NOTE(review): this sweep is skipped entirely when the tenant has ZERO
    // active databases, so stale rows are never flagged in that case —
    // presumably a guard against mass-flagging on an empty listing; confirm
    // that this is intentional.
    if (activeDatabaseIds.length > 0) {
      await pool.query(
        `UPDATE hosting_databases
         SET status = 'deleted', updated_at = CURRENT_TIMESTAMP
         WHERE tenant_id = $1
         AND do_database_id NOT IN (${activeDatabaseIds.map((_, i) => `$${i + 2}`).join(', ')})
         AND status != 'deleted'`,
        [tenantId, ...activeDatabaseIds]
      );
    }

    // =====================
    // Sync Droplets + Metrics
    // =====================
    const droplets = resources.droplets || [];
    const activeDropletIds = droplets.map(d => d.id.toString());

    for (const droplet of droplets) {
      let cpuMetrics = null;
      let avgCpu = null;

      try {
        cpuMetrics = await doService.getDropletMetrics(droplet.id, 'cpu');
        syncStats.metrics++;

        // Average the [timestamp, value] pairs from the first result series.
        // avgCpu ends up as a STRING (toFixed), e.g. "12.34"; note that
        // "0.00" is still truthy, so a genuine zero average is recorded.
        if (cpuMetrics?.data?.result?.[0]?.values) {
          const values = cpuMetrics.data.result[0].values;
          const sum = values.reduce((acc, [, value]) => acc + parseFloat(value), 0);
          avgCpu = values.length > 0 ? (sum / values.length).toFixed(2) : null;
        }
      } catch (err) {
        // Best-effort, same as databases: row is still upserted without metrics.
        console.error(`[MetricsSync] Failed to fetch metrics for droplet ${droplet.id}:`, err.message);
      }

      try {
        // Serialize the full DO payload; fall back to a minimal object if
        // stringification fails.
        let metadata = null;
        try {
          metadata = JSON.stringify(droplet);
        } catch (jsonErr) {
          console.error(`[MetricsSync] Failed to stringify metadata for droplet ${droplet.id}:`, jsonErr.message);
          metadata = JSON.stringify({ id: droplet.id, name: droplet.name, status: droplet.status });
        }

        // Upsert keyed on do_droplet_id (stored as a string).
        await pool.query(
          `INSERT INTO hosting_droplets (
            tenant_id, do_droplet_id, droplet_name, status, region, size,
            ip_address, ipv6_address, vcpus, memory, disk, image, tags,
            cpu_usage_percent, metrics_last_updated,
            metadata, created_at, updated_at
          )
          VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
          ON CONFLICT (do_droplet_id)
          DO UPDATE SET
            droplet_name = EXCLUDED.droplet_name,
            status = EXCLUDED.status,
            size = EXCLUDED.size,
            ip_address = EXCLUDED.ip_address,
            vcpus = EXCLUDED.vcpus,
            memory = EXCLUDED.memory,
            disk = EXCLUDED.disk,
            cpu_usage_percent = EXCLUDED.cpu_usage_percent,
            metrics_last_updated = EXCLUDED.metrics_last_updated,
            metadata = EXCLUDED.metadata,
            updated_at = CURRENT_TIMESTAMP`,
          [
            tenantId, droplet.id.toString(), droplet.name, droplet.status,
            droplet.region.slug, droplet.size_slug,
            droplet.networks?.v4?.[0]?.ip_address || null,
            droplet.networks?.v6?.[0]?.ip_address || null,
            droplet.vcpus, droplet.memory, droplet.disk,
            droplet.image?.slug || droplet.image?.name || null,
            droplet.tags || [], avgCpu, avgCpu ? new Date() : null,
            metadata
          ]
        );

        if (avgCpu) {
          await pool.query(
            `INSERT INTO hosting_metrics_history (
              resource_type, resource_id, tenant_id, cpu_value
            ) VALUES ($1, $2, $3, $4)`,
            ['droplet', droplet.id.toString(), tenantId, avgCpu]
          );
        }

        syncStats.droplets++;
      } catch (err) {
        console.error(`[MetricsSync] Failed to sync droplet ${droplet.id} (${droplet.name}):`, err.message);
      }
    }

    // Mark droplets as deleted if they no longer exist.
    // NOTE(review): same empty-list skip as the database sweep above.
    if (activeDropletIds.length > 0) {
      await pool.query(
        `UPDATE hosting_droplets
         SET status = 'deleted', updated_at = CURRENT_TIMESTAMP
         WHERE tenant_id = $1
         AND do_droplet_id NOT IN (${activeDropletIds.map((_, i) => `$${i + 2}`).join(', ')})
         AND status != 'deleted'`,
        [tenantId, ...activeDropletIds]
      );
    }

    return syncStats;
  } catch (error) {
    console.error(`[MetricsSync] Error syncing tenant ${tenantId}:`, error.message);
    throw error;
  }
}
233
/**
 * Run one full sync pass: look up every tenant and sync its metrics
 * sequentially (one tenant at a time, which keeps DO API pressure low).
 * Per-tenant failures are logged and do not abort the pass; any other
 * failure is caught and logged so the worker never crashes mid-cycle.
 * @returns {Promise<void>}
 */
async function syncAllTenants() {
  const startedAt = Date.now();
  console.log(`[MetricsSync] Starting metrics sync at ${new Date().toISOString()}`);

  try {
    // Fetch the full tenant list up front.
    const { rows } = await pool.query(
      `SELECT tenant_id FROM tenants`
    );

    console.log(`[MetricsSync] Syncing ${rows.length} tenant(s)`);

    const totals = { databases: 0, droplets: 0, metrics: 0 };

    // Intentionally sequential — do not parallelize, to avoid DO rate limits.
    for (const { tenant_id: tenantId } of rows) {
      try {
        const stats = await syncTenantMetrics(tenantId);
        for (const key of Object.keys(totals)) {
          totals[key] += stats[key];
        }
        console.log(`[MetricsSync] Tenant ${tenantId}: ${JSON.stringify(stats)}`);
      } catch (err) {
        // One bad tenant must not stop the others.
        console.error(`[MetricsSync] Failed to sync tenant ${tenantId}:`, err.message);
      }
    }

    const elapsed = ((Date.now() - startedAt) / 1000).toFixed(2);
    console.log(`[MetricsSync] ✓ Sync completed in ${elapsed}s - Total:`, totals);

  } catch (error) {
    const elapsed = ((Date.now() - startedAt) / 1000).toFixed(2);
    console.error(`[MetricsSync] ✗ Sync failed after ${elapsed}s:`, error.message);
  }
}
272
// Schedule the recurring sync (SYNC_INTERVAL, default every 30 minutes).
cron.schedule(SYNC_INTERVAL, syncAllTenants);

// Run one sync immediately on startup so fresh data is available right away.
// syncAllTenants catches its own errors, so this cannot reject.
(async () => {
  console.log('[MetricsSync] Running initial sync...');
  await syncAllTenants();
  // Report the actual configured schedule instead of a hard-coded
  // "30 minutes", which was wrong whenever SYNC_INTERVAL was overridden.
  console.log(`[MetricsSync] Worker ready. Next sync per schedule: ${SYNC_INTERVAL}`);
})();
282
/**
 * Gracefully shut down the worker: drain the Postgres pool, then exit.
 * pool.end() returns a promise — the original handlers called it without
 * awaiting and exited immediately, which could cut off in-flight queries
 * and leave connections unclosed. Awaiting it fixes that.
 * @returns {Promise<void>}
 */
async function shutdown() {
  console.log('[MetricsSync] Shutting down...');
  try {
    await pool.end();
  } catch (err) {
    // Still exit cleanly even if the pool fails to close.
    console.error('[MetricsSync] Error closing database pool:', err.message);
  }
  process.exit(0);
}

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);