/**
 * Background Metrics Sync Worker
 *
 * Runs on a cron schedule (every 30 minutes by default) to sync DigitalOcean
 * metrics into the database for all tenants. Serving pages from this cached
 * data avoids DigitalOcean API rate limiting and gives instant page loads.
 */
const cron = require('node-cron');
const { Pool } = require('pg');
const DigitalOceanService = require('../services/digitalOceanService');
// Cron expression used when SYNC_INTERVAL is not set in the environment: every 30 minutes.
const DEFAULT_SYNC_INTERVAL = '*/30 * * * *';

// Cron expression controlling how often the sync runs; overridable via the environment.
const SYNC_INTERVAL = process.env.SYNC_INTERVAL || DEFAULT_SYNC_INTERVAL;

// Shared PostgreSQL connection pool used by every query in this worker.
const pool = new Pool({
  host: process.env.DB_HOST,
  port: process.env.DB_PORT,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME || 'defaultdb',
  // NOTE(review): TLS is on but certificate verification is disabled, which
  // permits man-in-the-middle attacks. Prefer supplying the provider's CA
  // certificate and enabling verification; left unchanged to keep behavior.
  ssl: { rejectUnauthorized: false },
});

console.log('[MetricsSync] Worker starting...');
console.log(`[MetricsSync] Database: ${process.env.DB_HOST}:${process.env.DB_PORT}`);
// Only describe the schedule as "every 30 minutes" when the default is actually in use.
console.log(
  `[MetricsSync] Sync Interval: ${SYNC_INTERVAL}${
    SYNC_INTERVAL === DEFAULT_SYNC_INTERVAL ? ' (every 30 minutes)' : ''
  }`
);
// Upsert of one managed database row, keyed on the DigitalOcean database id.
const UPSERT_DATABASE_SQL = `INSERT INTO hosting_databases (
    tenant_id, do_database_id, database_name, engine, version, status,
    region, size, num_nodes, connection_host, connection_port,
    connection_database, connection_user, tags,
    cpu_count, memory_mb, disk_gb, metrics_last_updated,
    metadata, created_at, updated_at
  )
  VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
  ON CONFLICT (do_database_id)
  DO UPDATE SET
    database_name = EXCLUDED.database_name,
    status = EXCLUDED.status,
    version = EXCLUDED.version,
    size = EXCLUDED.size,
    num_nodes = EXCLUDED.num_nodes,
    connection_host = EXCLUDED.connection_host,
    connection_port = EXCLUDED.connection_port,
    cpu_count = EXCLUDED.cpu_count,
    memory_mb = EXCLUDED.memory_mb,
    disk_gb = EXCLUDED.disk_gb,
    metrics_last_updated = EXCLUDED.metrics_last_updated,
    metadata = EXCLUDED.metadata,
    updated_at = CURRENT_TIMESTAMP`;

// Append-only metrics sample for a managed database.
const INSERT_DATABASE_HISTORY_SQL = `INSERT INTO hosting_metrics_history (
    resource_type, resource_id, tenant_id, cpu_value, memory_value, disk_value
  ) VALUES ($1, $2, $3, $4, $5, $6)`;

// Upsert of one droplet row, keyed on the DigitalOcean droplet id.
const UPSERT_DROPLET_SQL = `INSERT INTO hosting_droplets (
    tenant_id, do_droplet_id, droplet_name, status, region, size,
    ip_address, ipv6_address, vcpus, memory, disk, image, tags,
    cpu_usage_percent, metrics_last_updated,
    metadata, created_at, updated_at
  )
  VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
  ON CONFLICT (do_droplet_id)
  DO UPDATE SET
    droplet_name = EXCLUDED.droplet_name,
    status = EXCLUDED.status,
    size = EXCLUDED.size,
    ip_address = EXCLUDED.ip_address,
    vcpus = EXCLUDED.vcpus,
    memory = EXCLUDED.memory,
    disk = EXCLUDED.disk,
    cpu_usage_percent = EXCLUDED.cpu_usage_percent,
    metrics_last_updated = EXCLUDED.metrics_last_updated,
    metadata = EXCLUDED.metadata,
    updated_at = CURRENT_TIMESTAMP`;

// Append-only CPU sample for a droplet.
const INSERT_DROPLET_HISTORY_SQL = `INSERT INTO hosting_metrics_history (
    resource_type, resource_id, tenant_id, cpu_value
  ) VALUES ($1, $2, $3, $4)`;

/**
 * Serialize a resource for the `metadata` column. If the full object cannot be
 * stringified, a minimal summary is stored instead (not null).
 * @param {object} resource - Full resource object returned by the DigitalOcean service
 * @param {object} fallback - Minimal summary to store when `resource` cannot be serialized
 * @param {string} label - Resource description used in the error log (e.g. "database <id>")
 * @returns {string} JSON text
 */
function stringifyResourceMetadata(resource, fallback, label) {
  try {
    return JSON.stringify(resource);
  } catch (jsonErr) {
    console.error(`[MetricsSync] Failed to stringify metadata for ${label}:`, jsonErr.message);
    return JSON.stringify(fallback);
  }
}

/**
 * Average the CPU samples found at `data.result[0].values` of a droplet metrics
 * response, where each sample is a `[timestamp, value]` pair.
 * Samples whose value does not parse to a finite number are ignored so a single
 * bad sample cannot turn the average into "NaN".
 * @param {object|null|undefined} cpuMetrics - Response of `getDropletMetrics(id, 'cpu')`
 * @returns {string|null} Average formatted with two decimals, or null when no usable sample exists
 */
function averageCpuUsage(cpuMetrics) {
  const values = cpuMetrics?.data?.result?.[0]?.values;
  if (!Array.isArray(values)) {
    return null;
  }
  const samples = values
    .map(([, value]) => Number.parseFloat(value))
    .filter((sample) => Number.isFinite(sample));
  if (samples.length === 0) {
    return null;
  }
  const total = samples.reduce((acc, sample) => acc + sample, 0);
  return (total / samples.length).toFixed(2);
}

/**
 * Mark a tenant's rows as deleted when their id is not among `activeIds`.
 * `table` and `idColumn` are interpolated into the SQL text and must only ever
 * be hard-coded identifiers, never external input.
 * @param {string} table - Table to update
 * @param {string} idColumn - Column holding the DigitalOcean resource id
 * @param {number} tenantId - Tenant whose rows are checked
 * @param {Array<string|number>} activeIds - Ids that still exist upstream
 * @returns {Promise<void>}
 */
async function markMissingResourcesDeleted(table, idColumn, tenantId, activeIds) {
  // An empty list is skipped: `NOT IN ()` is invalid SQL.
  // NOTE(review): this also means a tenant whose last resource was removed never
  // has its stale rows flagged; confirm whether that is intended as protection
  // against an empty/partial API response.
  if (activeIds.length === 0) {
    return;
  }
  // $1 is the tenant id, so the id placeholders start at $2.
  const placeholders = activeIds.map((_, i) => `$${i + 2}`).join(', ');
  await pool.query(
    `UPDATE ${table}
     SET status = 'deleted', updated_at = CURRENT_TIMESTAMP
     WHERE tenant_id = $1
       AND ${idColumn} NOT IN (${placeholders})
       AND status != 'deleted'`,
    [tenantId, ...activeIds]
  );
}

/**
 * Fetch metrics for one managed database and upsert it (plus a history sample).
 * Failures are logged and do not propagate, so one bad resource cannot abort the tenant.
 */
async function syncDatabaseResource(doService, tenantId, db, syncStats) {
  let metrics = null;
  try {
    metrics = await doService.getDatabaseMetrics(db.id);
    // NOTE(review): `metrics` is assumed to count successful metric fetches — confirm.
    if (metrics) {
      syncStats.metrics++;
    }
  } catch (err) {
    // Metrics are best-effort: the resource row is still synced without them.
    console.error(`[MetricsSync] Failed to fetch metrics for database ${db.id}:`, err.message);
  }

  try {
    const metadata = stringifyResourceMetadata(
      db,
      { id: db.id, name: db.name, engine: db.engine },
      `database ${db.id}`
    );

    await pool.query(UPSERT_DATABASE_SQL, [
      tenantId, db.id, db.name, db.engine, db.version, db.status,
      db.region, db.size, db.num_nodes || 1,
      db.connection?.host || null, db.connection?.port || null,
      db.connection?.database || null, db.connection?.user || null,
      db.tags || [],
      // `??` keeps a legitimate 0 instead of turning it into NULL.
      metrics?.cpu_count ?? null, metrics?.memory_mb ?? null, metrics?.disk_gb ?? null,
      metrics ? new Date() : null,
      metadata,
    ]);

    if (metrics) {
      await pool.query(INSERT_DATABASE_HISTORY_SQL, [
        'database', db.id, tenantId, metrics.cpu_count, metrics.memory_mb, metrics.disk_gb,
      ]);
    }

    syncStats.databases++;
  } catch (err) {
    console.error(`[MetricsSync] Failed to sync database ${db.id} (${db.name}):`, err.message);
  }
}

/**
 * Fetch CPU metrics for one droplet and upsert it (plus a history sample).
 * Failures are logged and do not propagate, so one bad resource cannot abort the tenant.
 */
async function syncDropletResource(doService, tenantId, droplet, syncStats) {
  const dropletId = String(droplet.id);
  let avgCpu = null;
  try {
    const cpuMetrics = await doService.getDropletMetrics(droplet.id, 'cpu');
    // NOTE(review): `metrics` is assumed to count successful metric fetches — confirm.
    if (cpuMetrics) {
      syncStats.metrics++;
    }
    avgCpu = averageCpuUsage(cpuMetrics);
  } catch (err) {
    // Metrics are best-effort: the resource row is still synced without them.
    console.error(`[MetricsSync] Failed to fetch metrics for droplet ${droplet.id}:`, err.message);
  }

  try {
    const metadata = stringifyResourceMetadata(
      droplet,
      { id: droplet.id, name: droplet.name, status: droplet.status },
      `droplet ${droplet.id}`
    );

    await pool.query(UPSERT_DROPLET_SQL, [
      tenantId, dropletId, droplet.name, droplet.status,
      droplet.region.slug, droplet.size_slug,
      droplet.networks?.v4?.[0]?.ip_address || null,
      droplet.networks?.v6?.[0]?.ip_address || null,
      droplet.vcpus, droplet.memory, droplet.disk,
      droplet.image?.slug || droplet.image?.name || null,
      droplet.tags || [], avgCpu, avgCpu !== null ? new Date() : null,
      metadata,
    ]);

    if (avgCpu !== null) {
      await pool.query(INSERT_DROPLET_HISTORY_SQL, ['droplet', dropletId, tenantId, avgCpu]);
    }

    syncStats.droplets++;
  } catch (err) {
    console.error(`[MetricsSync] Failed to sync droplet ${droplet.id} (${droplet.name}):`, err.message);
  }
}

/**
 * Sync metrics for a single tenant from DigitalOcean.
 * Fetches the tenant's database and droplet resources, retrieves their metrics,
 * upserts them into `hosting_databases` / `hosting_droplets`, appends samples to
 * `hosting_metrics_history`, and flags rows that no longer exist upstream as deleted.
 * @function syncTenantMetrics
 * @param {number} tenantId - Tenant ID to sync metrics for
 * @returns {Promise<{databases: number, droplets: number, metrics: number}>}
 *   Counts of synced databases, synced droplets, and fetched metric sets
 * @throws {Error} Re-throws tenant-level failures (e.g. the resource listing
 *   failed) after logging them; per-resource failures are only logged.
 */
async function syncTenantMetrics(tenantId) {
  try {
    const doService = new DigitalOceanService(tenantId);
    const resources = await doService.getAllResources();

    const syncStats = { databases: 0, droplets: 0, metrics: 0 };

    // =====================
    // Sync Databases + Metrics
    // =====================
    const databases = resources?.databases || [];
    for (const db of databases) {
      await syncDatabaseResource(doService, tenantId, db, syncStats);
    }
    // Mark databases as deleted if they no longer exist
    await markMissingResourcesDeleted(
      'hosting_databases',
      'do_database_id',
      tenantId,
      databases.map((db) => db.id)
    );

    // =====================
    // Sync Droplets + Metrics
    // =====================
    const droplets = resources?.droplets || [];
    for (const droplet of droplets) {
      await syncDropletResource(doService, tenantId, droplet, syncStats);
    }
    // Mark droplets as deleted if they no longer exist
    await markMissingResourcesDeleted(
      'hosting_droplets',
      'do_droplet_id',
      tenantId,
      droplets.map((droplet) => String(droplet.id))
    );

    return syncStats;
  } catch (error) {
    console.error(`[MetricsSync] Error syncing tenant ${tenantId}:`, error.message);
    throw error;
  }
}
// True while a sync pass is in flight. A cron tick (or the startup run) that
// fires during a slow pass is skipped instead of running concurrently with it.
let isSyncRunning = false;

/**
 * Sync metrics for every tenant listed in the `tenants` table, one tenant at a time.
 * A failure for one tenant is logged and does not stop the remaining tenants;
 * a failure of the pass itself (e.g. the tenant query) is logged and swallowed
 * so the worker keeps running until the next scheduled pass.
 * If a previous pass is still running, this call logs a warning and returns immediately.
 * @returns {Promise<void>}
 */
async function syncAllTenants() {
  if (isSyncRunning) {
    console.warn('[MetricsSync] Previous sync still running; skipping this run');
    return;
  }
  isSyncRunning = true;

  const startTime = Date.now();
  console.log(`[MetricsSync] Starting metrics sync at ${new Date().toISOString()}`);

  try {
    const tenantsResult = await pool.query(
      `SELECT tenant_id FROM tenants`
    );
    const tenants = tenantsResult.rows;

    console.log(`[MetricsSync] Syncing ${tenants.length} tenant(s)`);

    const totalStats = { databases: 0, droplets: 0, metrics: 0 };

    for (const tenant of tenants) {
      try {
        const stats = await syncTenantMetrics(tenant.tenant_id);
        totalStats.databases += stats.databases;
        totalStats.droplets += stats.droplets;
        totalStats.metrics += stats.metrics;
        console.log(`[MetricsSync] Tenant ${tenant.tenant_id}: ${JSON.stringify(stats)}`);
      } catch (err) {
        console.error(`[MetricsSync] Failed to sync tenant ${tenant.tenant_id}:`, err.message);
      }
    }

    const duration = ((Date.now() - startTime) / 1000).toFixed(2);
    console.log(`[MetricsSync] ✓ Sync completed in ${duration}s - Total:`, totalStats);
  } catch (error) {
    const duration = ((Date.now() - startTime) / 1000).toFixed(2);
    console.error(`[MetricsSync] ✗ Sync failed after ${duration}s:`, error.message);
  } finally {
    // Always release the guard, otherwise one failed pass would block all later ones.
    isSyncRunning = false;
  }
}
// Run the sync on the configured cron schedule (every 30 minutes unless SYNC_INTERVAL overrides it).
cron.schedule(SYNC_INTERVAL, syncAllTenants);

// Run one sync immediately on startup instead of waiting for the first scheduled tick.
(async () => {
  console.log('[MetricsSync] Running initial sync...');
  await syncAllTenants();
  // Report the real schedule; it is only "30 minutes" when the default is in use.
  console.log(`[MetricsSync] Worker ready. Next sync follows schedule: ${SYNC_INTERVAL}`);
})().catch((err) => {
  // Never leave the startup promise floating: surface unexpected failures.
  console.error('[MetricsSync] Initial sync failed:', err.message);
});
// Graceful shutdown on Ctrl+C / SIGINT.
// NOTE(review): the handler body was not visible in the reviewed text; closing
// the pool and exiting is the assumed intent — confirm against the original.
process.on('SIGINT', () => {
  console.log('[MetricsSync] Shutting down...');
  // Close pooled connections first, but exit regardless so a failed close cannot hang shutdown.
  pool
    .end()
    .catch((err) => {
      console.error('[MetricsSync] Error closing database pool:', err.message);
    })
    .finally(() => process.exit(0));
});
290process.on('SIGTERM', () => {
291 console.log('[MetricsSync] Shutting down...');