EverydayTech Platform - Developer Reference
Complete Source Code Documentation - All Applications
Loading...
Searching...
No Matches
websocketManager.js
Go to the documentation of this file.
1/**
2 * @file WebSocket Manager - Centralized Real-Time Communication Hub
3 * @module services/websocketManager
4 * @description
5 * Enterprise WebSocket infrastructure managing bidirectional real-time communication
6 * between RMM agents, dashboard clients, and remote access sessions. Provides event-driven
7 * architecture for metrics streaming, command execution, file transfers, and remote desktop.
8 *
9 * **Architecture Overview:**
10 * ```
11 * ┌─────────────────┐ ┌──────────────────────┐ ┌─────────────────┐
12 * │ RMM Agents │◄────►│ WebSocketManager │◄────►│ Dashboard UI │
13 * │ (/ws) │ │ (Event Hub) │ │ (/dashboard-ws)│
14 * └─────────────────┘ └──────────────────────┘ └─────────────────┘
15 * │
16 * ┌─────────┴─────────┐
17 * │ │
18 * ┌─────▼──────┐ ┌──────▼──────┐
19 * │ MeshCentral│ │ RDS │
20 * │ Proxy │ │ Sessions │
21 * └────────────┘ └─────────────┘
22 * ```
23 *
24 * **Connection Types:**
25 *
26 * 1. **Agent Connections** (`/ws?agent_uuid=...`)
27 * - Persistent WebSocket from RMM agent to server
28 * - Real-time metrics streaming (CPU, memory, disk, network)
29 * - Command execution and script running
30 * - File transfer coordination
31 * - Persistent tunnel management for RDP/VNC
32 *
33 * 2. **Dashboard Connections** (`/dashboard-ws?connectionId=...`)
34 * - Dashboard client subscriptions to agent streams
35 * - Live metrics, logs, process lists
36 * - Script execution monitoring
37 * - Real-time alerts and notifications
38 *
39 * 3. **RDS Sessions** (`/api/rds/:sessionId`)
40 * - Remote Desktop Service broker
41 * - Dual WebSocket coordination (viewer ↔ agent)
42 * - Mouse/keyboard input relay
43 * - Screen update streaming
44 *
45 * 4. **MeshCentral Proxy** (`/api/meshcentral/terminal`, `/api/meshcentral/files`)
46 * - WebSocket-to-WebSocket proxying for MeshCentral integration
47 * - Terminal sessions and file browser
48 *
49 * **Message Protocol:**
50 *
51 * All WebSocket messages use JSON format:
52 * ```json
53 * {
54 * "type": "metrics|script_output|command|subscribe|...",
55 * "data": { ... },
56 * "requestId": "optional-correlation-id"
57 * }
58 * ```
59 *
60 * **Agent Message Types:**
61 * - `connected` - Agent initial handshake
62 * - `metrics` - Performance metrics (CPU, RAM, disk, network)
63 * - `processes` - Running process snapshot
64 * - `services` - Windows services status
65 * - `script_output` - Script execution output chunk
66 * - `script_complete` - Script execution finished
67 * - `tunnel_ready` - Persistent tunnel established (RDP/VNC)
68 * - `response` - Response to request (with requestId correlation)
69 *
70 * **Dashboard Message Types:**
71 * - `subscribe` - Subscribe to agent's real-time data
72 * - `unsubscribe` - Unsubscribe from agent
73 * - `execute_script` - Run script on agent
74 * - `request_processes` - Query running processes
75 * - `request_files` - Query filesystem
76 *
77 * **State Management:**
78 * - `agentConnections`: Map<agent_uuid, WebSocket> - Active agent connections
79 * - `dashboardConnections`: Map<connection_id, {ws, subscriptions}> - Dashboard subscriptions
80 * - `subscriptions`: Map<agent_uuid, Set<connection_id>> - Agent → Subscribers index
81 * - `activeExecutions`: Map<execution_id, {...}> - Running script tracking
82 * - `rdsSessions`: Map<sessionId, {...}> - Remote desktop sessions
83 * - `persistentTunnels`: Map<agent_uuid, {...}> - RDP/VNC tunnel state
84 *
85 * **Key Features:**
86 * - **Automatic reconnection**: Agents reconnect and resume subscriptions
87 * - **Request/response**: Promise-based request-reply pattern with timeouts
88 * - **Broadcast**: Efficient fan-out to multiple dashboard subscribers
89 * - **Session cleanup**: Automatic cleanup on disconnect
90 * - **Port allocation**: Dynamic port assignment for persistent tunnels (45000+)
91 * - **Metrics persistence**: Network I/O saved to database for history
92 * @class WebSocketManager
93 * @augments EventEmitter
94 * @example
95 * // Initialize WebSocket manager
96 * const WebSocketManager = require('./services/websocketManager');
97 * const wsManager = new WebSocketManager();
98 * wsManager.init(httpServer);
99 * @example
100 * // Send command to agent
101 * wsManager.sendToAgent(agent_uuid, {
102 * type: 'execute_script',
103 * data: { script_id: 123, code: 'Get-Service' }
104 * });
105 * @example
106 * // Request data from agent with timeout
107 * const response = await wsManager.requestFromAgent(agent_uuid, {
108 * type: 'get_processes'
109 * }, 5000);
110 * @example
111 * // Broadcast metrics to all subscribers
112 * wsManager.broadcastToSubscribers(agent_uuid, {
113 * type: 'metrics',
114 * data: { cpu: 45.2, memory: 78.5, disk: 65.0 }
115 * });
116 * @see {@link module:routes/agent} for agent registration endpoints
117 * @see {@link module:services/metricsAggregator} for metrics buffering
118 * @see {@link module:services/guacamoleService} for RDS session management
119 * @requires ws - WebSocket server implementation
120 * @requires events - EventEmitter for pub/sub
121 * @requires db - PostgreSQL connection pool from services/db
122 */
123
124/**
125 * WebSocket Manager - Centralized real-time communication
126 *
127 * Handles:
128 * - Agent connections and message routing
129 * - Dashboard subscriptions to agent data streams
130 * - Real-time metrics, processes, logs, events
131 * - Script execution with output streaming
132 * - RDS (Remote Desktop) sessions
133 */
134
135const { WebSocketServer } = require('ws');
136const url = require('url');
137const EventEmitter = require('events');
138const pool = require('./db');
139
140/**
141 * WebSocket Manager class.
142 * Handles real-time bidirectional communication for agents, dashboards, and RDS sessions.
143 */
144class WebSocketManager extends EventEmitter {
145 /**
146 * Initialize WebSocketManager.
147 * Sets up connection maps, subscriptions, and execution tracking.
148 */
149 constructor() {
150 super();
151
152 // Agent connections: agent_uuid → ws
153 this.agentConnections = new Map();
154
155 // Dashboard connections: connection_id → { ws, subscriptions: Set<agent_uuid> }
156 this.dashboardConnections = new Map();
157
158 // Active subscriptions: agent_uuid → Set<connection_id>
159 this.subscriptions = new Map();
160
161 // Script executions: execution_id → { agent_uuid, dashboard_connections: Set<connection_id> }
162 this.activeExecutions = new Map();
163
164 // RDS sessions: sessionId → { agentId, viewer, agent, control }
165 this.rdsSessions = new Map();
166
167 // Persistent tunnels: agent_uuid → { protocol, localPort, assignedPort, sessionId, ready }
168 this.persistentTunnels = new Map();
169
170 // Port assignments: track which ports are in use
171 this.assignedPorts = new Set();
172 this.nextAvailablePort = 45000; // Start assigning ports from 45000
173
174 // Pending requests: requestId → { resolve, reject, timeout }
175 this.pendingRequests = new Map();
176 }
177
178 /**
179 * Initialize WebSocket server and attach to HTTP server
180 * @description Sets up WebSocket server to handle multiple connection types:
181 * - `/ws` - Agent connections
182 * - `/dashboard-ws` - Dashboard subscriptions
183 * - `/api/rds/:sessionId` - Remote Desktop sessions
184 * - `/api/meshcentral/terminal` - MeshCentral terminal proxy
185 * - `/api/meshcentral/files` - MeshCentral file browser proxy
186 * @param {object} server - HTTP/HTTPS server instance to attach WebSocket upgrade handler
187 * @returns {WebSocketManager} Returns this instance for method chaining
188 * @fires connection WebSocket connection established event
189 */
190 init(server) {
191 const wss = new WebSocketServer({ noServer: true });
192
193 // Handle upgrade requests
194 server.on('upgrade', (req, socket, head) => {
195 const pathname = req.url.split('?')[0];
196
197 if (
198 pathname === '/ws' ||
199 pathname === '/dashboard-ws' ||
200 pathname.startsWith('/api/rds/') ||
201 pathname === '/api/meshcentral/terminal' ||
202 pathname === '/api/meshcentral/files'
203 ) {
204 wss.handleUpgrade(req, socket, head, (ws) => {
205 wss.emit('connection', ws, req);
206 });
207 } else {
208 socket.destroy();
209 }
210 });
211
212 // Handle connections
213 wss.on('connection', (ws, req) => {
214 const parsed = url.parse(req.url, true);
215 const path = parsed.pathname;
216
217 if (path === '/ws') {
218 this.handleAgentConnection(ws, parsed.query);
219 } else if (path === '/dashboard-ws') {
220 this.handleDashboardConnection(ws, parsed.query);
221 } else if (path.startsWith('/api/rds/')) {
222 this.handleRDSConnection(ws, req);
223 } else if (path === '/api/meshcentral/terminal') {
224 this.handleMeshCentralTerminal(ws, parsed.query);
225 } else if (path === '/api/meshcentral/files') {
226 this.handleMeshCentralFiles(ws, parsed.query);
227 } else {
228 ws.close();
229 }
230 });
231
232 console.log('✅ WebSocket Manager initialized');
233 return this;
234 }
235
236 /**
237 * Handle new agent WebSocket connection
238 * @description Registers agent connection, stores in connection map, and sets up message
239 * handlers for metrics, script output, and commands. Updates agent last_seen timestamp.
240 * @param {WebSocket} ws - WebSocket connection from agent
241 * @param {object} query - URL query parameters from connection request
242 * @param {string} query.agent_uuid - Unique agent identifier (required)
243 * @returns {void}
244 * @fires agent_disconnected When agent connection closes
245 */
246 handleAgentConnection(ws, query) {
247 const agent_uuid = query.agent_uuid;
248 if (!agent_uuid) {
249 console.log('[WS] Agent connection rejected: missing agent_uuid');
250 return ws.close();
251 }
252
253 // Store connection
254 this.agentConnections.set(agent_uuid, ws);
255 console.log(`[WS] Agent connected: ${agent_uuid} (${this.agentConnections.size} total)`);
256
257 // Update database last_seen
258 this.updateLastSeen(agent_uuid);
259
260 // Handle messages from agent
261 ws.on('message', (raw) => {
262 try {
263 const msg = JSON.parse(raw.toString());
264 this.handleAgentMessage(agent_uuid, msg, ws);
265 } catch (err) {
266 console.error(`[WS] Invalid message from agent ${agent_uuid}:`, err.message);
267 }
268 });
269
270 // Handle disconnect
271 ws.on('close', () => {
272 this.agentConnections.delete(agent_uuid);
273 console.log(`[WS] Agent disconnected: ${agent_uuid}`);
274 this.updateLastSeen(agent_uuid);
275 this.cleanupAgentSessions(agent_uuid);
276
277 // Notify subscribed dashboards
278 this.broadcastToSubscribers(agent_uuid, {
279 type: 'agent_disconnected',
280 agent_uuid,
281 timestamp: Date.now()
282 });
283 });
284
285 // Send welcome message
286 ws.send(JSON.stringify({
287 type: 'welcome',
288 agent_uuid,
289 timestamp: Date.now()
290 }));
291 }
292
293 /**
294 * Handle new dashboard WebSocket connection
295 * @description Creates dashboard connection with unique ID, manages subscriptions to agents,
296 * and handles subscribe/unsubscribe messages.
297 * @param {WebSocket} ws - WebSocket connection from dashboard client
298 * @returns {void}
299 */
300 handleDashboardConnection(ws) {
301 const connection_id = `dashboard-${Date.now()}-${Math.random().toString(36).slice(2)}`;
302
303 this.dashboardConnections.set(connection_id, {
304 ws,
305 subscriptions: new Set(),
306 authenticated: false,
307 user_id: null
308 });
309
310 console.log(`[WS] Dashboard connected: ${connection_id}`);
311
312 // Handle messages from dashboard
313 ws.on('message', (raw) => {
314 try {
315 const msg = JSON.parse(raw.toString());
316 this.handleDashboardMessage(connection_id, msg, ws);
317 } catch (err) {
318 console.error(`[WS] Invalid message from dashboard ${connection_id}:`, err.message);
319 }
320 });
321
322 // Handle disconnect
323 ws.on('close', () => {
324 const conn = this.dashboardConnections.get(connection_id);
325 if (conn) {
326 // Unsubscribe from all agents
327 for (const agent_uuid of conn.subscriptions) {
328 const subs = this.subscriptions.get(agent_uuid);
329 if (subs) {
330 subs.delete(connection_id);
331 if (subs.size === 0) {
332 this.subscriptions.delete(agent_uuid);
333 }
334 }
335 }
336 }
337 this.dashboardConnections.delete(connection_id);
338 console.log(`[WS] Dashboard disconnected: ${connection_id}`);
339 });
340
341 // Send welcome
342 ws.send(JSON.stringify({
343 type: 'welcome',
344 connection_id,
345 timestamp: Date.now()
346 }));
347 }
348
349 /**
350 * Process and route messages received from agent
351 * @description Handles all message types from agents including metrics, script output,
352 * tunnel status, and responses to requests. Broadcasts relevant messages to subscribed dashboards.
353 * @param {string} agent_uuid - Unique agent identifier
354 * @param {object} msg - Parsed message object from agent
355 * @param {string} msg.type - Message type (metrics|script_output|tunnel_ready|response|...)
356 * @param {object} msg.data - Message payload data
357 * @param {string} [msg.requestId] - Optional correlation ID for request/response pattern
358 * @param {WebSocket} ws - Agent's WebSocket connection
359 * @returns {Promise<void>}
360 */
361 async handleAgentMessage(agent_uuid, msg, ws) {
362 console.log(`[WS] Agent ${agent_uuid} → ${msg.type}`);
363
364 switch (msg.type) {
365 case 'agent_hello':
366 // Update agent version in database if provided
367 if (msg.version) {
368 try {
369 await pool.query(
370 `UPDATE agents SET version = $1, last_seen = NOW() WHERE agent_uuid = $2`,
371 [msg.version, agent_uuid]
372 );
373 console.log(`[WS] Updated agent ${agent_uuid} to version ${msg.version}`);
374 } catch (err) {
375 console.error('[WS] Failed to update agent version:', err);
376 }
377 }
378
379 // Send plugin enabled/disabled status back to agent
380 try {
381 const pluginsResult = await pool.query(
382 `SELECT name, enabled FROM plugins ORDER BY name ASC`
383 );
384 const pluginStatus = {};
385 pluginsResult.rows.forEach(row => {
386 pluginStatus[row.name] = row.enabled;
387 });
388
389 // Send plugin status to agent
390 ws.send(JSON.stringify({
391 type: 'plugin_status_update',
392 plugins: pluginStatus,
393 timestamp: Date.now()
394 }));
395
396 console.log(`[WS] Sent plugin status to agent ${agent_uuid}: ${Object.keys(pluginStatus).length} plugins`);
397 } catch (err) {
398 console.error('[WS] Failed to fetch plugin status:', err);
399 }
400
401 this.emit('agent_hello', { agent_uuid, ...msg });
402 break;
403
404 case 'metrics_update':
405 // Save network info (IP) to database for offline fallback
406 this.saveNetworkInfo(agent_uuid, msg.data);
407
408 // Broadcast to subscribed dashboards
409 this.broadcastToSubscribers(agent_uuid, {
410 type: 'metrics_update',
411 agent_uuid,
412 data: msg.data,
413 timestamp: msg.timestamp || Date.now()
414 });
415 break;
416
417 case 'processes_snapshot':
418 this.broadcastToSubscribers(agent_uuid, {
419 type: 'processes_snapshot',
420 agent_uuid,
421 processes: msg.processes,
422 timestamp: msg.timestamp || Date.now()
423 });
424 break;
425
426 case 'services_snapshot':
427 this.broadcastToSubscribers(agent_uuid, {
428 type: 'services_snapshot',
429 agent_uuid,
430 services: msg.services,
431 timestamp: msg.timestamp || Date.now()
432 });
433 break;
434
435 case 'script_output':
436 // Forward to dashboards watching this execution
437 this.broadcastScriptOutput(msg.execution_id, {
438 type: 'script_output',
439 execution_id: msg.execution_id,
440 line: msg.line,
441 timestamp: msg.timestamp || Date.now()
442 });
443 break;
444
445 case 'script_complete':
446 this.broadcastScriptOutput(msg.execution_id, {
447 type: 'script_complete',
448 execution_id: msg.execution_id,
449 exit_code: msg.exit_code,
450 timestamp: msg.timestamp || Date.now()
451 });
452 this.activeExecutions.delete(msg.execution_id);
453 break;
454
455 case 'agent_log':
456 this.broadcastToSubscribers(agent_uuid, {
457 type: 'agent_log',
458 agent_uuid,
459 level: msg.level,
460 message: msg.message,
461 timestamp: msg.timestamp || Date.now()
462 });
463 break;
464
465 case 'event_log':
466 this.broadcastToSubscribers(agent_uuid, {
467 type: 'event_log',
468 agent_uuid,
469 event: msg.event,
470 timestamp: msg.timestamp || Date.now()
471 });
472 break;
473
474 case 'response':
475 // Handle response to dashboard request
476 if (msg.request_id) {
477 const pending = this.pendingRequests.get(msg.request_id);
478 if (pending) {
479 clearTimeout(pending.timeout);
480 pending.resolve(msg.data);
481 this.pendingRequests.delete(msg.request_id);
482 }
483 }
484 break;
485
486 case 'file-list-response':
487 // Handle file list response (existing RDS functionality)
488 if (msg.requestId) {
489 const pending = this.pendingRequests.get(msg.requestId);
490 if (pending) {
491 clearTimeout(pending.timeout);
492 pending.resolve(msg.files || []);
493 this.pendingRequests.delete(msg.requestId);
494 }
495 }
496 break;
497
498 case 'file_download_chunk':
499 case 'file_download_complete':
500 case 'file_download_error':
501 case 'file_upload_success':
502 case 'file_upload_error': {
503 // Forward to file transfer service
504 const fileTransferService = require('./fileTransferService');
505 if (msg.type === 'file_download_chunk') {
506 fileTransferService.handleDownloadChunk(msg);
507 } else if (msg.type === 'file_download_complete') {
508 fileTransferService.handleDownloadComplete(msg);
509 }
510 break;
511 }
512
513 case 'connection_approval_response': {
514 // Forward to tunnel service via event emitter
515 this.emit('connection_approval_response', {
516 agent_uuid,
517 requestId: msg.requestId,
518 approved: msg.approved,
519 timestamp: msg.timestamp
520 });
521 console.log(`[WS] Forwarded approval response from agent ${agent_uuid}: ${msg.approved ? 'APPROVED' : 'DENIED'}`);
522 break;
523 }
524
525 case 'rds_data': {
526 // Handle RDS data from agent (new single-WebSocket architecture)
527 // Forward to viewer WebSocket
528 const sessionId = msg.sessionId;
529 const session = this.rdsSessions.get(sessionId);
530 if (session && session.viewer && session.viewer.readyState === 1) {
531 // Decode base64 and send raw binary to viewer
532 const buffer = Buffer.from(msg.data, 'base64');
533 session.viewer.send(buffer);
534 } else {
535 console.warn(`[RDS] Received data for session ${sessionId} but no viewer connected`);
536 }
537 break;
538 }
539
540 case 'rds_tunnel_closed': {
541 // Handle tunnel closure notification from agent
542 const closedSessionId = msg.sessionId;
543 const closedSession = this.rdsSessions.get(closedSessionId);
544 if (closedSession) {
545 // Only clean up if NOT a persistent tunnel
546 if (!closedSession.persistent) {
547 // Notify viewer
548 if (closedSession.viewer && closedSession.viewer.readyState === 1) {
549 closedSession.viewer.close();
550 }
551 // Notify control
552 if (closedSession.control && closedSession.control.readyState === 1) {
553 closedSession.control.close();
554 }
555 this.rdsSessions.delete(closedSessionId);
556 console.log(`[RDS] Session ${closedSessionId} closed by agent`);
557 } else {
558 console.log(`[RDS] Ignored tunnel close for persistent session ${closedSessionId}`);
559 }
560 }
561 break;
562 }
563
564 case 'rds_started': {
565 // Handle RDS tunnel started confirmation
566 console.log(`[RDS] Agent confirmed tunnel started: ${msg.sessionId}, proto: ${msg.proto}, port: ${msg.port}`);
567 const startedSession = this.rdsSessions.get(msg.sessionId);
568 if (startedSession) {
569 startedSession.proto = msg.proto;
570 startedSession.port = msg.port;
571 startedSession.active = true;
572 }
573 break;
574 }
575
576 case 'rds_failed': {
577 // Handle RDS tunnel failure
578 console.error(`[RDS] Agent reported tunnel failure: ${msg.sessionId}, reason: ${msg.reason}`);
579 const failedSession = this.rdsSessions.get(msg.sessionId);
580 if (failedSession) {
581 if (failedSession.viewer) failedSession.viewer.close();
582 if (failedSession.control) failedSession.control.close();
583 this.rdsSessions.delete(msg.sessionId);
584 }
585 break;
586 }
587
588 case 'persistent_tunnel_request':
589 // Handle agent requesting persistent tunnel assignment
590 this.handlePersistentTunnelRequest(agent_uuid, msg, ws);
591 break;
592
593 case 'persistent_tunnel_ready':
594 // Handle agent confirming persistent tunnel is ready
595 this.handlePersistentTunnelReady(agent_uuid, msg);
596 break;
597
598 default:
599 console.log(`[WS] Unknown message type from agent: ${msg.type}`);
600 }
601 }
602
603 /**
604 * Process and route messages received from dashboard clients
605 * @description Handles dashboard commands including authenticate, subscribe/unsubscribe,
606 * request processes/services, execute scripts, and control services on agents.
607 * @param {string} connection_id - Unique dashboard connection identifier
608 * @param {object} msg - Parsed message object from dashboard
609 * @param {string} msg.type - Message type (authenticate|subscribe|execute_script|...)
610 * @param {object} [msg.data] - Message payload data
611 * @param {WebSocket} ws - Dashboard's WebSocket connection
612 * @returns {void}
613 */
614 handleDashboardMessage(connection_id, msg, ws) {
615 console.log(`[WS] Dashboard ${connection_id} → ${msg.type}`);
616
617 const conn = this.dashboardConnections.get(connection_id);
618 if (!conn) return;
619
620 switch (msg.type) {
621 case 'authenticate':
622 // Validate JWT token
623 try {
624 const jwt = require('jsonwebtoken');
625 const JWT_SECRET = process.env.JWT_SECRET;
626
627 if (!msg.token) {
628 ws.send(JSON.stringify({
629 type: 'auth_error',
630 error: 'No token provided',
631 timestamp: Date.now()
632 }));
633 return;
634 }
635
636 const decoded = jwt.verify(msg.token, JWT_SECRET);
637 conn.authenticated = true;
638 conn.user_id = decoded.user_id || decoded.userId;
639 conn.tenant_id = decoded.tenant_id || decoded.tenantId;
640
641 ws.send(JSON.stringify({
642 type: 'authenticated',
643 connection_id,
644 user_id: conn.user_id,
645 tenant_id: conn.tenant_id,
646 timestamp: Date.now()
647 }));
648
649 console.log(`[WS] Dashboard ${connection_id} authenticated as user ${conn.user_id}`);
650 } catch (err) {
651 console.error(`[WS] Dashboard ${connection_id} authentication failed:`, err.message);
652 ws.send(JSON.stringify({
653 type: 'auth_error',
654 error: 'Invalid token',
655 timestamp: Date.now()
656 }));
657 }
658 break;
659
660 case 'subscribe':
661 // Subscribe to agent updates
662 if (!Array.isArray(msg.agent_uuids)) return;
663
664 for (const agent_uuid of msg.agent_uuids) {
665 conn.subscriptions.add(agent_uuid);
666
667 if (!this.subscriptions.has(agent_uuid)) {
668 this.subscriptions.set(agent_uuid, new Set());
669 }
670 this.subscriptions.get(agent_uuid).add(connection_id);
671
672 console.log(`[WS] Dashboard ${connection_id} subscribed to agent ${agent_uuid}`);
673 }
674
675 ws.send(JSON.stringify({
676 type: 'subscribed',
677 agent_uuids: msg.agent_uuids,
678 timestamp: Date.now()
679 }));
680 break;
681
682 case 'unsubscribe':
683 if (!Array.isArray(msg.agent_uuids)) return;
684
685 for (const agent_uuid of msg.agent_uuids) {
686 conn.subscriptions.delete(agent_uuid);
687
688 const subs = this.subscriptions.get(agent_uuid);
689 if (subs) {
690 subs.delete(connection_id);
691 if (subs.size === 0) {
692 this.subscriptions.delete(agent_uuid);
693 }
694 }
695 }
696 break;
697
698 case 'request_processes':
699 this.sendToAgent(msg.agent_uuid, {
700 type: 'get_processes',
701 request_id: msg.request_id
702 });
703 break;
704
705 case 'request_services':
706 this.sendToAgent(msg.agent_uuid, {
707 type: 'get_services',
708 request_id: msg.request_id
709 });
710 break;
711
712 case 'execute_script':
713 this.executeScript(msg.agent_uuid, msg.script_id, msg.code, connection_id);
714 break;
715
716 case 'kill_process':
717 this.sendToAgent(msg.agent_uuid, {
718 type: 'kill_process',
719 pid: msg.pid,
720 request_id: msg.request_id
721 });
722 break;
723
724 case 'control_service':
725 this.sendToAgent(msg.agent_uuid, {
726 type: 'control_service',
727 service_name: msg.service_name,
728 action: msg.action, // start, stop, restart
729 request_id: msg.request_id
730 });
731 break;
732
733 default:
734 console.log(`[WS] Unknown message type from dashboard: ${msg.type}`);
735 }
736 }
737
738 /**
739 * Send message to specific agent
740 * @description Sends JSON message to agent if connected. Logs approval requests and
741 * handles connection failures gracefully.
742 * @param {string} agent_uuid - Target agent's unique identifier
743 * @param {object} message - Message object to send
744 * @param {string} message.type - Message type
745 * @param {object} [message.data] - Message payload
746 * @param {string} [message.request_id] - Optional correlation ID for responses
747 * @returns {boolean} True if message sent successfully, false if agent not connected
748 */
749 sendToAgent(agent_uuid, message) {
750 const ws = this.agentConnections.get(agent_uuid);
751 if (!ws || ws.readyState !== 1) {
752 console.log(`[WS] Agent ${agent_uuid} not connected`);
753 return false;
754 }
755
756 try {
757 // Log approval requests specifically
758 if (message.type === 'connection_approval_request') {
759 console.log(`[WS] 📤 Sending approval request to agent ${agent_uuid}:`, JSON.stringify(message));
760 }
761 ws.send(JSON.stringify(message));
762 console.log(`[WS] ✅ Message sent to agent ${agent_uuid}, type: ${message.type}`);
763 return true;
764 } catch (err) {
765 console.error(`[WS] Failed to send to agent ${agent_uuid}:`, err.message);
766 return false;
767 }
768 }
769
770 /**
771 * Broadcast message to all dashboards subscribed to agent
772 * @description Efficiently fan-out message to multiple dashboard subscribers.
773 * Automatically handles connection cleanup for closed connections.
774 * @param {string} agent_uuid - Agent whose subscribers should receive message
775 * @param {object} message - Message object to broadcast
776 * @param {string} message.type - Message type (metrics|agent_disconnected|...)
777 * @param {object} [message.data] - Message payload
778 * @returns {void}
779 */
780 broadcastToSubscribers(agent_uuid, message) {
781 const subs = this.subscriptions.get(agent_uuid);
782 if (!subs || subs.size === 0) return;
783
784 const payload = JSON.stringify(message);
785 let sent = 0;
786
787 for (const connection_id of subs) {
788 const conn = this.dashboardConnections.get(connection_id);
789 if (conn && conn.ws.readyState === 1) {
790 try {
791 conn.ws.send(payload);
792 sent++;
793 } catch (err) {
794 console.error(`[WS] Failed to send to dashboard ${connection_id}:`, err.message);
795 }
796 }
797 }
798
799 if (sent > 0) {
800 console.log(`[WS] Broadcasted ${message.type} to ${sent} dashboards`);
801 }
802 }
803
804 /**
805 * Execute script on agent and stream output to dashboard
806 * @description Creates execution session, sends script to agent, and registers
807 * dashboard for output streaming. Supports PowerShell, Bash, or Python scripts.
808 * @param {string} agent_uuid - Target agent's unique identifier
809 * @param {number} script_id - Script ID from database
810 * @param {string} code - Script code to execute
811 * @param {string} dashboard_connection_id - Dashboard requesting execution
812 * @returns {string} execution_id - Unique execution identifier for tracking
813 */
814 executeScript(agent_uuid, script_id, code, dashboard_connection_id) {
815 const execution_id = `exec-${Date.now()}-${Math.random().toString(36).slice(2)}`;
816
817 this.activeExecutions.set(execution_id, {
818 agent_uuid,
819 dashboard_connections: new Set([dashboard_connection_id])
820 });
821
822 this.sendToAgent(agent_uuid, {
823 type: 'execute_script',
824 execution_id,
825 script_id,
826 code
827 });
828
829 return execution_id;
830 }
831
832 /**
833 * Broadcast script output to watching dashboards
834 * @description Sends script output chunk or completion message to all dashboards
835 * watching this execution.
836 * @param {string} execution_id - Script execution identifier
837 * @param {object} message - Output message to broadcast
838 * @param {string} message.type - Message type (script_output|script_complete)
839 * @param {object} message.data - Output data (stdout, stderr, exit code)
840 * @returns {void}
841 */
842 broadcastScriptOutput(execution_id, message) {
843 const execution = this.activeExecutions.get(execution_id);
844 if (!execution) return;
845
846 const payload = JSON.stringify(message);
847
848 for (const connection_id of execution.dashboard_connections) {
849 const conn = this.dashboardConnections.get(connection_id);
850 if (conn && conn.ws.readyState === 1) {
851 try {
852 conn.ws.send(payload);
853 } catch (err) {
854 console.error(`[WS] Failed to send script output to ${connection_id}:`, err.message);
855 }
856 }
857 }
858 }
859
860 /**
861 * Send request to agent and wait for response
862 * @description Implements request-reply pattern with timeout. Correlates responses
863 * using request_id. Used for synchronous queries like get_processes, get_services.
864 * @param {string} agent_uuid - Target agent's unique identifier
865 * @param {object} message - Request message object
866 * @param {string} message.type - Request type (get_processes|get_services|...)
867 * @param {number} [timeout] - Timeout in milliseconds
868 * @returns {Promise<object>} Resolves with agent's response data, rejects on timeout
869 * @throws {Error} Throws 'Request timeout' if agent doesn't respond within timeout
870 * @throws {Error} Throws 'Agent not connected' if agent is offline
871 */
872 requestFromAgent(agent_uuid, message, timeout = 10000) {
873 return new Promise((resolve, reject) => {
874 const request_id = `req-${Date.now()}-${Math.random().toString(36).slice(2)}`;
875
876 const timer = setTimeout(() => {
877 this.pendingRequests.delete(request_id);
878 reject(new Error('Request timeout'));
879 }, timeout);
880
881 this.pendingRequests.set(request_id, { resolve, reject, timeout: timer });
882
883 const success = this.sendToAgent(agent_uuid, {
884 ...message,
885 request_id
886 });
887
888 if (!success) {
889 clearTimeout(timer);
890 this.pendingRequests.delete(request_id);
891 reject(new Error('Agent not connected'));
892 }
893 });
894 }
895
896 /**
897 * Request file list from agent.
898 * Sends file list request to agent and returns promise with results.
899 * @param {string} agentId - Agent UUID
900 * @param {string} path - File system path to list
901 * @returns {Promise<Array>} Resolves with file list array
902 * @throws {Error} Agent not connected or timeout
903 */
904 requestAgentFileList(agentId, path) {
905 return new Promise((resolve, reject) => {
906 const ws = this.agentConnections.get(agentId);
907 if (!ws || ws.readyState !== 1) {
908 return reject(new Error('Agent not connected'));
909 }
910
911 const requestId = `filelist-${Date.now()}-${Math.random().toString(36).slice(2)}`;
912
913 const timer = setTimeout(() => {
914 this.pendingRequests.delete(requestId);
915 reject(new Error('Timeout waiting for agent file list'));
916 }, 10000);
917
918 this.pendingRequests.set(requestId, { resolve, reject, timeout: timer });
919
920 // Send file list request
921 ws.send(JSON.stringify({ type: 'file-list', requestId, path }));
922 });
923 }
924
925 /**
926 * Update agent last_seen in database.
927 * @param {string} agent_uuid - Agent UUID to update
928 * @returns {Promise<void>}
929 */
930 async updateLastSeen(agent_uuid) {
931 try {
932 const pool = require('./db');
933 await pool.query(
934 'UPDATE agents SET last_seen = NOW() WHERE agent_uuid = $1',
935 [agent_uuid]
936 );
937 } catch (err) {
938 console.error(`[WS] Failed to update last_seen for ${agent_uuid}:`, err.message);
939 }
940 }
941
942 /**
943 * Save network and disk info from WebSocket metrics to database.
944 * This ensures offline fallback data is current.
945 * @param {string} agent_uuid - Agent UUID
946 * @param {object} metrics - Metrics object with network and disk data
947 * @returns {Promise<void>}
948 */
949 async saveNetworkInfo(agent_uuid, metrics) {
950 try {
951 const pool = require('./db');
952
953 // Extract IP address from metrics
954 if (metrics.network && metrics.network.ipv4) {
955 await pool.query(
956 'UPDATE agents SET ip_address = $1, updated_at = NOW() WHERE agent_uuid = $2',
957 [metrics.network.ipv4, agent_uuid]
958 );
959 }
960
961 // Update system_info with current drives data
962 if (metrics.disk && Array.isArray(metrics.disk)) {
963 // Get existing system_info and merge drives data
964 const result = await pool.query(
965 'SELECT system_info FROM agents WHERE agent_uuid = $1',
966 [agent_uuid]
967 );
968
969 if (result.rows.length > 0) {
970 const systemInfo = result.rows[0].system_info || {};
971
972 // Convert WebSocket disk format to drives format
973 const drives = metrics.disk.map(d => ({
974 Drive: d.mount || d.filesystem || 'Unknown',
975 Root: d.mount || d.filesystem || 'Unknown',
976 Used: Math.round((d.used / (1024 * 1024 * 1024)) * 100) / 100, // Convert to GB
977 Free: Math.round(((d.total - d.used) / (1024 * 1024 * 1024)) * 100) / 100,
978 Total: Math.round((d.total / (1024 * 1024 * 1024)) * 100) / 100,
979 UsedPercent: Math.round(d.used_percent * 10) / 10
980 }));
981
982 systemInfo.drives = drives;
983
984 await pool.query(
985 'UPDATE agents SET system_info = $1, updated_at = NOW() WHERE agent_uuid = $2',
986 [JSON.stringify(systemInfo), agent_uuid]
987 );
988 }
989 }
990 } catch (err) {
991 console.error(`[WS] Failed to save network info for ${agent_uuid}:`, err.message);
992 }
993 }
994
995 /**
996 * Cleanup sessions when agent disconnects.
997 * Removes RDS sessions and active executions for disconnected agent.
998 * @param {string} agent_uuid - Agent UUID to clean up
999 * @returns {void}
1000 */
1001 cleanupAgentSessions(agent_uuid) {
1002 // Clean up RDS sessions
1003 for (const [sessionId, session] of this.rdsSessions.entries()) {
1004 if (session.agentId === agent_uuid) {
1005 if (session.viewer) session.viewer.close();
1006 if (session.control) session.control.close();
1007 this.rdsSessions.delete(sessionId);
1008 console.log(`[RDS] Cleaned session ${sessionId} for agent ${agent_uuid}`);
1009 }
1010 }
1011
1012 // Clean up active executions
1013 for (const [execution_id, execution] of this.activeExecutions.entries()) {
1014 if (execution.agent_uuid === agent_uuid) {
1015 this.broadcastScriptOutput(execution_id, {
1016 type: 'script_error',
1017 execution_id,
1018 error: 'Agent disconnected',
1019 timestamp: Date.now()
1020 });
1021 this.activeExecutions.delete(execution_id);
1022 }
1023 }
1024 }
1025
1026 /**
1027 * Handle RDS connections (existing functionality).
1028 * Routes viewer and control WebSocket connections for remote desktop sessions.
1029 * @param {WebSocket} ws - WebSocket connection
1030 * @param {object} req - HTTP request object with URL
1031 * @returns {void}
1032 */
1033 handleRDSConnection(ws, req) {
1034 const parsed = url.parse(req.url, true);
1035 const path = parsed.pathname;
1036
1037 if (path.startsWith('/api/rds/view/')) {
1038 const sessionId = parsed.query.session;
1039 if (!sessionId) return ws.close();
1040
1041 const session = this.rdsSessions.get(sessionId);
1042 if (session) session.viewer = ws;
1043
1044 console.log(`[RDS] Viewer connected: ${sessionId}`);
1045
1046 // Handle messages from viewer (keyboard/mouse) - forward to agent via main WebSocket
1047 ws.on('message', (data) => {
1048 if (session && session.agentId) {
1049 const agentWs = this.agentConnections.get(session.agentId);
1050 if (agentWs && agentWs.readyState === 1) {
1051 // Send viewer data to agent through main WebSocket
1052 agentWs.send(JSON.stringify({
1053 type: 'rds_data',
1054 sessionId: sessionId,
1055 data: data.toString('base64') // Encode binary data as base64
1056 }));
1057 }
1058 }
1059 });
1060
1061 ws.on('close', () => {
1062 if (session) {
1063 session.viewer = null;
1064 // Notify agent to close tunnel
1065 const agentWs = this.agentConnections.get(session.agentId);
1066 if (agentWs && agentWs.readyState === 1) {
1067 agentWs.send(JSON.stringify({
1068 type: 'rds_stop',
1069 sessionId: sessionId
1070 }));
1071 }
1072 }
1073 console.log(`[RDS] Viewer disconnected: ${sessionId}`);
1074 });
1075
1076 } else if (path.startsWith('/api/rds/control/')) {
1077 const sessionId = parsed.query.session;
1078 if (!sessionId) return ws.close();
1079
1080 const session = this.rdsSessions.get(sessionId);
1081 if (!session) return ws.close();
1082
1083 session.control = ws;
1084 console.log(`[RDS] Control connected: ${sessionId}`);
1085
1086 // Forward control messages to agent
1087 ws.on('message', (raw) => {
1088 try {
1089 const msg = JSON.parse(raw.toString());
1090 const agentWs = this.agentConnections.get(session.agentId);
1091 if (agentWs && agentWs.readyState === 1) {
1092 agentWs.send(JSON.stringify({ ...msg, sessionId }));
1093 }
1094 } catch (err) {
1095 console.error('[RDS] Error forwarding control:', err);
1096 }
1097 });
1098
1099 ws.on('close', () => {
1100 if (session) session.control = null;
1101 console.log(`[RDS] Control disconnected: ${sessionId}`);
1102 });
1103 }
1104 }
1105
1106 /**
1107 * Handle persistent tunnel request from agent.
1108 * Assigns port and creates persistent tunnel session.
1109 * @param {string} agent_uuid - Agent UUID
1110 * @param {object} msg - Message with protocol and port
1111 * @param {WebSocket} ws - Agent WebSocket connection
1112 * @returns {Promise<void>}
1113 */
1114 async handlePersistentTunnelRequest(agent_uuid, msg, ws) {
1115 const { protocol, port: localPort } = msg;
1116 console.log(`[Persistent Tunnel] Agent ${agent_uuid} requesting ${protocol}:${localPort} tunnel`);
1117
1118 // Clean up any existing tunnel for this agent
1119 const existing = this.persistentTunnels.get(agent_uuid);
1120 if (existing) {
1121 this.assignedPorts.delete(existing.assignedPort);
1122 this.rdsSessions.delete(existing.sessionId);
1123 console.log(`[Persistent Tunnel] Cleaned up existing tunnel for agent ${agent_uuid}`);
1124 }
1125
1126 // Assign a new port
1127 const assignedPort = this.getNextAvailablePort();
1128 const sessionId = require('crypto').randomUUID();
1129
1130 // Store tunnel info
1131 this.persistentTunnels.set(agent_uuid, {
1132 protocol,
1133 localPort,
1134 assignedPort,
1135 sessionId,
1136 ready: false
1137 });
1138
1139 // Register in RDS sessions for data routing
1140 this.rdsSessions.set(sessionId, {
1141 agentId: agent_uuid,
1142 viewer: null,
1143 agent: null,
1144 control: null,
1145 persistent: true
1146 });
1147
1148 console.log(`[Persistent Tunnel] Assigned ${agent_uuid} → localhost:${assignedPort} (${protocol}:${localPort})`);
1149
1150 // Send assignment to agent
1151 ws.send(JSON.stringify({
1152 type: 'persistent_tunnel_assigned',
1153 sessionId,
1154 assignedPort,
1155 protocol,
1156 localPort,
1157 timestamp: Date.now()
1158 }));
1159 }
1160
1161 /**
1162 * Handle persistent tunnel ready confirmation.
1163 * Marks tunnel as ready after agent confirms initialization.
1164 * @param {string} agent_uuid - Agent UUID
1165 * @param {object} msg - Message with sessionId
1166 * @returns {void}
1167 */
1168 handlePersistentTunnelReady(agent_uuid, msg) {
1169 const { sessionId } = msg;
1170 const tunnel = this.persistentTunnels.get(agent_uuid);
1171
1172 if (tunnel && tunnel.sessionId === sessionId) {
1173 tunnel.ready = true;
1174 console.log(`[Persistent Tunnel] ✅ Agent ${agent_uuid} tunnel ready: localhost:${tunnel.assignedPort} → ${tunnel.protocol}:${tunnel.localPort}`);
1175 }
1176 }
1177
1178 /**
1179 * Get next available port for persistent tunnels.
1180 * Increments port counter and tracks assigned ports.
1181 * @returns {number} Available port number
1182 */
1183 getNextAvailablePort() {
1184 while (this.assignedPorts.has(this.nextAvailablePort)) {
1185 this.nextAvailablePort++;
1186 }
1187 const port = this.nextAvailablePort++;
1188 this.assignedPorts.add(port);
1189 return port;
1190 }
1191
1192 /**
1193 * Get persistent tunnel info for agent.
1194 * @param {string} agent_uuid - Agent UUID
1195 * @returns {object|undefined} Tunnel info object or undefined if not found
1196 */
1197 getPersistentTunnel(agent_uuid) {
1198 return this.persistentTunnels.get(agent_uuid);
1199 }
1200
1201 /**
1202 * Get agent connection status.
1203 * @param {string} agent_uuid - Agent UUID
1204 * @returns {boolean} True if agent is connected and ready
1205 */
1206 isAgentConnected(agent_uuid) {
1207 const ws = this.agentConnections.get(agent_uuid);
1208 return ws && ws.readyState === 1;
1209 }
1210
1211 /**
1212 * Get stats.
1213 * Returns current connection and session counts.
1214 * @returns {object} Stats object with connection counts
1215 */
1216 getStats() {
1217 return {
1218 agents: this.agentConnections.size,
1219 dashboards: this.dashboardConnections.size,
1220 subscriptions: this.subscriptions.size,
1221 executions: this.activeExecutions.size,
1222 rds_sessions: this.rdsSessions.size,
1223 persistent_tunnels: this.persistentTunnels.size
1224 };
1225 }
1226
1227 /**
1228 * Handle MeshCentral terminal WebSocket connection.
1229 * Proxies terminal session between dashboard and MeshCentral.
1230 * @param {WebSocket} ws - WebSocket connection from dashboard
1231 * @param {object} query - Query parameters with sessionToken and agentId
1232 * @returns {Promise<void>}
1233 */
1234 async handleMeshCentralTerminal(ws, query) {
1235 const { sessionToken, agentId } = query;
1236
1237 console.log(`[MeshCentral Terminal] New connection: agentId=${agentId}, sessionToken=${sessionToken?.substring(0, 8)}...`);
1238
1239 if (!sessionToken || !agentId) {
1240 console.error('[MeshCentral Terminal] Missing sessionToken or agentId');
1241 ws.close(1008, 'Missing required parameters');
1242 return;
1243 }
1244
1245 try {
1246 // Validate session and get device info
1247 const sessionResult = await pool.query(
1248 `SELECT s.*, a.meshcentral_nodeid, a.tenant_id
1249 FROM meshcentral_sessions s
1250 JOIN agents a ON a.agent_id = s.agent_id
1251 WHERE s.session_token = $1 AND s.agent_id = $2 AND s.expires_at > NOW()`,
1252 [sessionToken, agentId]
1253 );
1254
1255 if (sessionResult.rows.length === 0) {
1256 console.error('[MeshCentral Terminal] Invalid or expired session');
1257 ws.close(1008, 'Invalid or expired session');
1258 return;
1259 }
1260
1261 const session = sessionResult.rows[0];
1262 const nodeId = session.meshcentral_nodeid;
1263 const tenantId = session.tenant_id;
1264
1265 console.log(`[MeshCentral Terminal] Session validated: tenantId=${tenantId}, nodeId=${nodeId}`);
1266
1267 // Get tenant's MeshCentral API connection
1268 const { getTenantMeshAPI } = require('../routes/meshcentral');
1269 const api = await getTenantMeshAPI(tenantId);
1270 if (!api || !api.ws) {
1271 console.error('[MeshCentral Terminal] Failed to connect to MeshCentral');
1272 ws.close(1011, 'MeshCentral connection failed');
1273 return;
1274 }
1275
1276 console.log(`[MeshCentral Terminal] Connected to MeshCentral for tenant ${tenantId}`);
1277
1278 // Forward messages from MeshCentral to client
1279 const meshMessageHandler = (data) => {
1280 try {
1281 const message = JSON.parse(data.toString());
1282 if (message.type === 'console') {
1283 console.log(`[MeshCentral Terminal] -> Client: console message`);
1284 ws.send(JSON.stringify(message));
1285 }
1286 } catch (err) {
1287 console.error('[MeshCentral Terminal] Error parsing MeshCentral message:', err);
1288 }
1289 };
1290 api.ws.on('message', meshMessageHandler);
1291
1292 // Forward messages from client to MeshCentral
1293 ws.on('message', (data) => {
1294 try {
1295 const clientMessage = JSON.parse(data.toString());
1296 console.log(`[MeshCentral Terminal] <- Client:`, clientMessage.type, clientMessage.action);
1297
1298 const meshMessage = {
1299 action: 'msg',
1300 type: 'console',
1301 nodeid: nodeId,
1302 msg: JSON.stringify(clientMessage)
1303 };
1304
1305 api.ws.send(JSON.stringify(meshMessage));
1306 } catch (err) {
1307 console.error('[MeshCentral Terminal] Error forwarding client message:', err);
1308 }
1309 });
1310
1311 // Handle disconnection
1312 ws.on('close', () => {
1313 console.log(`[MeshCentral Terminal] Client disconnected: agentId=${agentId}`);
1314 api.ws.off('message', meshMessageHandler);
1315 });
1316
1317 // Send initial connection success
1318 ws.send(JSON.stringify({
1319 type: 'connected',
1320 nodeId: nodeId
1321 }));
1322
1323 } catch (err) {
1324 console.error('[MeshCentral Terminal] Connection error:', err);
1325 ws.close(1011, 'Internal server error');
1326 }
1327 }
1328
1329 /**
1330 * Handle MeshCentral file browser WebSocket connection.
1331 * Proxies file browser session between dashboard and MeshCentral.
1332 * @param {WebSocket} ws - WebSocket connection from dashboard
1333 * @param {object} query - Query parameters with sessionToken and agentId
1334 * @returns {Promise<void>}
1335 */
1336 async handleMeshCentralFiles(ws, query) {
1337 const { sessionToken, agentId } = query;
1338
1339 console.log(`[MeshCentral Files] New connection: agentId=${agentId}, sessionToken=${sessionToken?.substring(0, 8)}...`);
1340
1341 if (!sessionToken || !agentId) {
1342 console.error('[MeshCentral Files] Missing sessionToken or agentId');
1343 ws.close(1008, 'Missing required parameters');
1344 return;
1345 }
1346
1347 try {
1348 // Validate session and get device info
1349 const sessionResult = await pool.query(
1350 `SELECT s.*, a.meshcentral_nodeid, a.tenant_id
1351 FROM meshcentral_sessions s
1352 JOIN agents a ON a.agent_id = s.agent_id
1353 WHERE s.session_token = $1 AND s.agent_id = $2 AND s.expires_at > NOW()`,
1354 [sessionToken, agentId]
1355 );
1356
1357 if (sessionResult.rows.length === 0) {
1358 console.error('[MeshCentral Files] Invalid or expired session');
1359 ws.close(1008, 'Invalid or expired session');
1360 return;
1361 }
1362
1363 const session = sessionResult.rows[0];
1364 const nodeId = session.meshcentral_nodeid;
1365 const tenantId = session.tenant_id;
1366
1367 console.log(`[MeshCentral Files] Session validated: tenantId=${tenantId}, nodeId=${nodeId}`);
1368
1369 // Get tenant's MeshCentral API connection
1370 const { getTenantMeshAPI } = require('../routes/meshcentral');
1371 const api = await getTenantMeshAPI(tenantId);
1372 if (!api || !api.ws) {
1373 console.error('[MeshCentral Files] Failed to connect to MeshCentral');
1374 ws.close(1011, 'MeshCentral connection failed');
1375 return;
1376 }
1377
1378 console.log(`[MeshCentral Files] Connected to MeshCentral for tenant ${tenantId}`);
1379
1380 // Forward messages from MeshCentral to client
1381 const meshMessageHandler = (data) => {
1382 try {
1383 const message = JSON.parse(data.toString());
1384
1385 // Only forward file-related messages
1386 if (message.type === 'notify' ||
1387 message.type === 'download' ||
1388 message.type === 'uploaddone' ||
1389 message.action === 'ls' ||
1390 message.action === 'download' ||
1391 message.action === 'upload') {
1392
1393 console.log(`[MeshCentral Files] -> Client: ${message.action || message.type}`);
1394 ws.send(JSON.stringify(message));
1395 }
1396 } catch (err) {
1397 console.error('[MeshCentral Files] Error parsing MeshCentral message:', err);
1398 }
1399 };
1400 api.ws.on('message', meshMessageHandler);
1401
1402 // Forward messages from client to MeshCentral
1403 ws.on('message', (data) => {
1404 try {
1405 const clientMessage = JSON.parse(data.toString());
1406 console.log(`[MeshCentral Files] <- Client: ${clientMessage.action}`, {
1407 path: clientMessage.path,
1408 reqid: clientMessage.reqid
1409 });
1410
1411 const meshMessage = {
1412 action: 'msg',
1413 type: 'files',
1414 nodeid: nodeId,
1415 ...clientMessage
1416 };
1417
1418 api.ws.send(JSON.stringify(meshMessage));
1419 } catch (err) {
1420 console.error('[MeshCentral Files] Error forwarding client message:', err);
1421 ws.send(JSON.stringify({
1422 type: 'error',
1423 message: 'Failed to process request'
1424 }));
1425 }
1426 });
1427
1428 // Handle disconnection
1429 ws.on('close', () => {
1430 console.log(`[MeshCentral Files] Client disconnected: agentId=${agentId}`);
1431 api.ws.off('message', meshMessageHandler);
1432 });
1433
1434 // Send initial connection success
1435 ws.send(JSON.stringify({
1436 type: 'connected',
1437 nodeId: nodeId
1438 }));
1439
1440 } catch (err) {
1441 console.error('[MeshCentral Files] Connection error:', err);
1442 ws.close(1011, 'Internal server error');
1443 }
1444 }
1445}
1446
1447// Create singleton instance
1448const wsManager = new WebSocketManager();
1449
1450module.exports = wsManager;