2 * @file WebSocket Manager - Centralized Real-Time Communication Hub
3 * @module services/websocketManager
5 * Enterprise WebSocket infrastructure managing bidirectional real-time communication
6 * between RMM agents, dashboard clients, and remote access sessions. Provides event-driven
7 * architecture for metrics streaming, command execution, file transfers, and remote desktop.
9 * **Architecture Overview:**
11 * ┌─────────────────┐ ┌──────────────────────┐ ┌─────────────────┐
12 * │ RMM Agents │◄────►│ WebSocketManager │◄────►│ Dashboard UI │
13 * │ (/ws) │ │ (Event Hub) │ │ (/dashboard-ws)│
14 * └─────────────────┘ └──────────────────────┘ └─────────────────┘
16 * ┌─────────┴─────────┐
18 * ┌─────▼──────┐ ┌──────▼──────┐
19 * │ MeshCentral│ │ RDS │
20 * │ Proxy │ │ Sessions │
21 * └────────────┘ └─────────────┘
24 * **Connection Types:**
26 * 1. **Agent Connections** (`/ws?agent_uuid=...`)
27 * - Persistent WebSocket from RMM agent to server
28 * - Real-time metrics streaming (CPU, memory, disk, network)
29 * - Command execution and script running
30 * - File transfer coordination
31 * - Persistent tunnel management for RDP/VNC
33 * 2. **Dashboard Connections** (`/dashboard-ws?connectionId=...`)
34 * - Dashboard client subscriptions to agent streams
35 * - Live metrics, logs, process lists
36 * - Script execution monitoring
37 * - Real-time alerts and notifications
39 * 3. **RDS Sessions** (`/api/rds/:sessionId`)
40 * - Remote Desktop Service broker
41 * - Dual WebSocket coordination (viewer ↔ agent)
42 * - Mouse/keyboard input relay
43 * - Screen update streaming
45 * 4. **MeshCentral Proxy** (`/api/meshcentral/terminal`, `/api/meshcentral/files`)
46 * - WebSocket-to-WebSocket proxying for MeshCentral integration
47 * - Terminal sessions and file browser
49 * **Message Protocol:**
51 * All WebSocket messages use JSON format:
54 * "type": "metrics|script_output|command|subscribe|...",
56 * "requestId": "optional-correlation-id"
60 * **Agent Message Types:**
61 * - `connected` - Agent initial handshake
62 * - `metrics_update` - Performance metrics (CPU, RAM, disk, network)
63 * - `processes_snapshot` - Running process snapshot
64 * - `services_snapshot` - Windows services status
65 * - `script_output` - Script execution output chunk
66 * - `script_complete` - Script execution finished
67 * - `persistent_tunnel_ready` - Persistent tunnel established (RDP/VNC)
68 * - `response` - Response to request (correlated via `request_id`)
70 * **Dashboard Message Types:**
71 * - `subscribe` - Subscribe to agent's real-time data
72 * - `unsubscribe` - Unsubscribe from agent
73 * - `execute_script` - Run script on agent
74 * - `request_processes` - Query running processes
75 * - `request_files` - Query filesystem
77 * **State Management:**
78 * - `agentConnections`: Map<agent_uuid, WebSocket> - Active agent connections
79 * - `dashboardConnections`: Map<connection_id, {ws, subscriptions}> - Dashboard subscriptions
80 * - `subscriptions`: Map<agent_uuid, Set<connection_id>> - Agent → Subscribers index
81 * - `activeExecutions`: Map<execution_id, {...}> - Running script tracking
82 * - `rdsSessions`: Map<sessionId, {...}> - Remote desktop sessions
83 * - `persistentTunnels`: Map<agent_uuid, {...}> - RDP/VNC tunnel state
86 * - **Automatic reconnection**: Agents reconnect and resume subscriptions
87 * - **Request/response**: Promise-based request-reply pattern with timeouts
88 * - **Broadcast**: Efficient fan-out to multiple dashboard subscribers
89 * - **Session cleanup**: Automatic cleanup on disconnect
90 * - **Port allocation**: Dynamic port assignment for persistent tunnels (45000+)
91 * - **Metrics persistence**: Network I/O saved to database for history
92 * @class WebSocketManager
93 * @augments EventEmitter
95 * // Initialize WebSocket manager
96 * const WebSocketManager = require('./services/websocketManager');
97 * const wsManager = new WebSocketManager();
98 * wsManager.init(httpServer);
100 * // Send command to agent
101 * wsManager.sendToAgent(agent_uuid, {
102 * type: 'execute_script',
103 * data: { script_id: 123, code: 'Get-Service' }
106 * // Request data from agent with timeout
107 * const response = await wsManager.requestFromAgent(agent_uuid, {
108 * type: 'get_processes'
111 * // Broadcast metrics to all subscribers
112 * wsManager.broadcastToSubscribers(agent_uuid, {
114 * data: { cpu: 45.2, memory: 78.5, disk: 65.0 }
116 * @see {@link module:routes/agent} for agent registration endpoints
117 * @see {@link module:services/metricsAggregator} for metrics buffering
118 * @see {@link module:services/guacamoleService} for RDS session management
119 * @requires ws - WebSocket server implementation
120 * @requires events - EventEmitter for pub/sub
121 * @requires db - PostgreSQL connection pool from services/db
125 * WebSocket Manager - Centralized real-time communication
128 * - Agent connections and message routing
129 * - Dashboard subscriptions to agent data streams
130 * - Real-time metrics, processes, logs, events
131 * - Script execution with output streaming
132 * - RDS (Remote Desktop) sessions
135const { WebSocketServer } = require('ws');
136const url = require('url');
137const EventEmitter = require('events');
138const pool = require('./db');
141 * WebSocket Manager class.
142 * Handles real-time bidirectional communication for agents, dashboards, and RDS sessions.
144class WebSocketManager extends EventEmitter {
146 * Initialize WebSocketManager.
147 * Sets up connection maps, subscriptions, and execution tracking.
152 // Agent connections: agent_uuid → ws
153 this.agentConnections = new Map();
155 // Dashboard connections: connection_id → { ws, subscriptions: Set<agent_uuid> }
156 this.dashboardConnections = new Map();
158 // Active subscriptions: agent_uuid → Set<connection_id>
159 this.subscriptions = new Map();
161 // Script executions: execution_id → { agent_uuid, dashboard_connections: Set<connection_id> }
162 this.activeExecutions = new Map();
164 // RDS sessions: sessionId → { agentId, viewer, agent, control }
165 this.rdsSessions = new Map();
167 // Persistent tunnels: agent_uuid → { protocol, localPort, assignedPort, sessionId, ready }
168 this.persistentTunnels = new Map();
170 // Port assignments: track which ports are in use
171 this.assignedPorts = new Set();
172 this.nextAvailablePort = 45000; // Start assigning ports from 45000
174 // Pending requests: requestId → { resolve, reject, timeout }
175 this.pendingRequests = new Map();
179 * Initialize WebSocket server and attach to HTTP server
180 * @description Sets up WebSocket server to handle multiple connection types:
181 * - `/ws` - Agent connections
182 * - `/dashboard-ws` - Dashboard subscriptions
183 * - `/api/rds/:sessionId` - Remote Desktop sessions
184 * - `/api/meshcentral/terminal` - MeshCentral terminal proxy
185 * - `/api/meshcentral/files` - MeshCentral file browser proxy
186 * @param {object} server - HTTP/HTTPS server instance to attach WebSocket upgrade handler
187 * @returns {WebSocketManager} Returns this instance for method chaining
188 * @fires connection WebSocket connection established event
191 const wss = new WebSocketServer({ noServer: true });
193 // Handle upgrade requests
194 server.on('upgrade', (req, socket, head) => {
195 const pathname = req.url.split('?')[0];
198 pathname === '/ws' ||
199 pathname === '/dashboard-ws' ||
200 pathname.startsWith('/api/rds/') ||
201 pathname === '/api/meshcentral/terminal' ||
202 pathname === '/api/meshcentral/files'
204 wss.handleUpgrade(req, socket, head, (ws) => {
205 wss.emit('connection', ws, req);
212 // Handle connections
213 wss.on('connection', (ws, req) => {
214 const parsed = url.parse(req.url, true);
215 const path = parsed.pathname;
217 if (path === '/ws') {
218 this.handleAgentConnection(ws, parsed.query);
219 } else if (path === '/dashboard-ws') {
220 this.handleDashboardConnection(ws, parsed.query);
221 } else if (path.startsWith('/api/rds/')) {
222 this.handleRDSConnection(ws, req);
223 } else if (path === '/api/meshcentral/terminal') {
224 this.handleMeshCentralTerminal(ws, parsed.query);
225 } else if (path === '/api/meshcentral/files') {
226 this.handleMeshCentralFiles(ws, parsed.query);
232 console.log('✅ WebSocket Manager initialized');
237 * Handle new agent WebSocket connection
238 * @description Registers agent connection, stores in connection map, and sets up message
239 * handlers for metrics, script output, and commands. Updates agent last_seen timestamp.
240 * @param {WebSocket} ws - WebSocket connection from agent
241 * @param {object} query - URL query parameters from connection request
242 * @param {string} query.agent_uuid - Unique agent identifier (required)
244 * @fires agent_disconnected When agent connection closes
246 handleAgentConnection(ws, query) {
247 const agent_uuid = query.agent_uuid;
249 console.log('[WS] Agent connection rejected: missing agent_uuid');
254 this.agentConnections.set(agent_uuid, ws);
255 console.log(`[WS] Agent connected: ${agent_uuid} (${this.agentConnections.size} total)`);
257 // Update database last_seen
258 this.updateLastSeen(agent_uuid);
260 // Handle messages from agent
261 ws.on('message', (raw) => {
263 const msg = JSON.parse(raw.toString());
264 this.handleAgentMessage(agent_uuid, msg, ws);
266 console.error(`[WS] Invalid message from agent ${agent_uuid}:`, err.message);
271 ws.on('close', () => {
272 this.agentConnections.delete(agent_uuid);
273 console.log(`[WS] Agent disconnected: ${agent_uuid}`);
274 this.updateLastSeen(agent_uuid);
275 this.cleanupAgentSessions(agent_uuid);
277 // Notify subscribed dashboards
278 this.broadcastToSubscribers(agent_uuid, {
279 type: 'agent_disconnected',
281 timestamp: Date.now()
285 // Send welcome message
286 ws.send(JSON.stringify({
289 timestamp: Date.now()
294 * Handle new dashboard WebSocket connection
295 * @description Creates dashboard connection with unique ID, manages subscriptions to agents,
296 * and handles subscribe/unsubscribe messages.
297 * @param {WebSocket} ws - WebSocket connection from dashboard client
// Registers a dashboard socket under a freshly generated connection_id and
// wires its message and close handlers. Only the socket is used here; the
// caller's query parameters are not part of this signature.
300 handleDashboardConnection(ws) {
// The id combines the current time with a random base-36 suffix.
301 const connection_id = `dashboard-${Date.now()}-${Math.random().toString(36).slice(2)}`;
303 this.dashboardConnections.set(connection_id, {
305 subscriptions: new Set(),
// Every connection starts unauthenticated; handleDashboardMessage flips this
// after a successful token check.
306 authenticated: false,
310 console.log(`[WS] Dashboard connected: ${connection_id}`);
312 // Handle messages from dashboard
313 ws.on('message', (raw) => {
315 const msg = JSON.parse(raw.toString());
316 this.handleDashboardMessage(connection_id, msg, ws);
318 console.error(`[WS] Invalid message from dashboard ${connection_id}:`, err.message);
// NOTE(review): no 'error' listener is attached in the visible lines — confirm
// one exists, since an unhandled 'error' event is thrown by EventEmitter.
323 ws.on('close', () => {
324 const conn = this.dashboardConnections.get(connection_id);
326 // Unsubscribe from all agents
// Walks this connection's own subscription set and removes it from each
// agent's subscriber index.
327 for (const agent_uuid of conn.subscriptions) {
328 const subs = this.subscriptions.get(agent_uuid);
330 subs.delete(connection_id);
// Drop the index entry once the last subscriber is gone.
331 if (subs.size === 0) {
332 this.subscriptions.delete(agent_uuid);
// NOTE(review): activeExecutions entries that list this connection_id are not
// touched here — confirm they are pruned elsewhere.
337 this.dashboardConnections.delete(connection_id);
338 console.log(`[WS] Dashboard disconnected: ${connection_id}`);
// Initial message sent to the dashboard once the handlers are in place.
342 ws.send(JSON.stringify({
345 timestamp: Date.now()
350 * Process and route messages received from agent
351 * @description Handles all message types from agents including metrics, script output,
352 * tunnel status, and responses to requests. Broadcasts relevant messages to subscribed dashboards.
353 * @param {string} agent_uuid - Unique agent identifier
354 * @param {object} msg - Parsed message object from agent
355 * @param {string} msg.type - Message type (metrics|script_output|tunnel_ready|response|...)
356 * @param {object} msg.data - Message payload data
357 * @param {string} [msg.requestId] - Optional correlation ID for request/response pattern
358 * @param {WebSocket} ws - Agent's WebSocket connection
359 * @returns {Promise<void>}
// Dispatches one parsed agent message by msg.type: persists some data, answers
// the agent, resolves pending requests, or forwards to dashboards/viewers.
// NOTE(review): this method is async, so anything thrown below becomes a
// rejected promise — confirm the caller handles rejections.
361 async handleAgentMessage(agent_uuid, msg, ws) {
// Every message is logged by type, including high-frequency ones.
362 console.log(`[WS] Agent ${agent_uuid} → ${msg.type}`);
366 // Update agent version in database if provided
// Parameterized query: version and uuid are bound, not interpolated.
370 `UPDATE agents SET version = $1, last_seen = NOW() WHERE agent_uuid = $2`,
371 [msg.version, agent_uuid]
373 console.log(`[WS] Updated agent ${agent_uuid} to version ${msg.version}`);
375 console.error('[WS] Failed to update agent version:', err);
379 // Send plugin enabled/disabled status back to agent
381 const pluginsResult = await pool.query(
382 `SELECT name, enabled FROM plugins ORDER BY name ASC`
// Collapse the rows into a { pluginName: enabled } lookup.
384 const pluginStatus = {};
385 pluginsResult.rows.forEach(row => {
386 pluginStatus[row.name] = row.enabled;
389 // Send plugin status to agent
390 ws.send(JSON.stringify({
391 type: 'plugin_status_update',
392 plugins: pluginStatus,
393 timestamp: Date.now()
396 console.log(`[WS] Sent plugin status to agent ${agent_uuid}: ${Object.keys(pluginStatus).length} plugins`);
398 console.error('[WS] Failed to fetch plugin status:', err);
// Local listeners receive the whole message with agent_uuid merged in.
401 this.emit('agent_hello', { agent_uuid, ...msg });
404 case 'metrics_update':
405 // Save network info (IP) to database for offline fallback
// saveNetworkInfo is async and is not awaited here.
406 this.saveNetworkInfo(agent_uuid, msg.data);
408 // Broadcast to subscribed dashboards
409 this.broadcastToSubscribers(agent_uuid, {
410 type: 'metrics_update',
// The agent's own timestamp wins; server time is only a fallback.
413 timestamp: msg.timestamp || Date.now()
417 case 'processes_snapshot':
418 this.broadcastToSubscribers(agent_uuid, {
419 type: 'processes_snapshot',
421 processes: msg.processes,
422 timestamp: msg.timestamp || Date.now()
426 case 'services_snapshot':
427 this.broadcastToSubscribers(agent_uuid, {
428 type: 'services_snapshot',
430 services: msg.services,
431 timestamp: msg.timestamp || Date.now()
435 case 'script_output':
436 // Forward to dashboards watching this execution
437 this.broadcastScriptOutput(msg.execution_id, {
438 type: 'script_output',
439 execution_id: msg.execution_id,
441 timestamp: msg.timestamp || Date.now()
445 case 'script_complete':
446 this.broadcastScriptOutput(msg.execution_id, {
447 type: 'script_complete',
448 execution_id: msg.execution_id,
449 exit_code: msg.exit_code,
450 timestamp: msg.timestamp || Date.now()
// Tracking is dropped only after the completion has been forwarded, because
// broadcastScriptOutput looks the execution up in this map.
452 this.activeExecutions.delete(msg.execution_id);
456 this.broadcastToSubscribers(agent_uuid, {
460 message: msg.message,
461 timestamp: msg.timestamp || Date.now()
466 this.broadcastToSubscribers(agent_uuid, {
470 timestamp: msg.timestamp || Date.now()
475 // Handle response to dashboard request
// Correlates on snake_case msg.request_id; the file-list branch below uses
// camelCase msg.requestId instead.
476 if (msg.request_id) {
477 const pending = this.pendingRequests.get(msg.request_id);
// Cancel the timeout before resolving so it cannot fire afterwards.
479 clearTimeout(pending.timeout);
480 pending.resolve(msg.data);
481 this.pendingRequests.delete(msg.request_id);
486 case 'file-list-response':
487 // Handle file list response (existing RDS functionality)
489 const pending = this.pendingRequests.get(msg.requestId);
491 clearTimeout(pending.timeout);
// A missing file list resolves to an empty array rather than undefined.
492 pending.resolve(msg.files || []);
493 this.pendingRequests.delete(msg.requestId);
498 case 'file_download_chunk':
499 case 'file_download_complete':
500 case 'file_download_error':
501 case 'file_upload_success':
502 case 'file_upload_error': {
503 // Forward to file transfer service
// Required at call time rather than at module load.
504 const fileTransferService = require('./fileTransferService');
// NOTE(review): only the chunk and complete types are dispatched in the
// visible lines — confirm the error/upload types are handled elsewhere.
505 if (msg.type === 'file_download_chunk') {
506 fileTransferService.handleDownloadChunk(msg);
507 } else if (msg.type === 'file_download_complete') {
508 fileTransferService.handleDownloadComplete(msg);
513 case 'connection_approval_response': {
514 // Forward to tunnel service via event emitter
515 this.emit('connection_approval_response', {
517 requestId: msg.requestId,
518 approved: msg.approved,
519 timestamp: msg.timestamp
521 console.log(`[WS] Forwarded approval response from agent ${agent_uuid}: ${msg.approved ? 'APPROVED' : 'DENIED'}`);
526 // Handle RDS data from agent (new single-WebSocket architecture)
527 // Forward to viewer WebSocket
528 const sessionId = msg.sessionId;
529 const session = this.rdsSessions.get(sessionId);
// readyState 1 is the OPEN state of a WebSocket.
530 if (session && session.viewer && session.viewer.readyState === 1) {
531 // Decode base64 and send raw binary to viewer
// NOTE(review): Buffer.from throws a TypeError when msg.data is missing or not
// a string — confirm agents always send it.
532 const buffer = Buffer.from(msg.data, 'base64');
533 session.viewer.send(buffer);
535 console.warn(`[RDS] Received data for session ${sessionId} but no viewer connected`);
540 case 'rds_tunnel_closed': {
541 // Handle tunnel closure notification from agent
542 const closedSessionId = msg.sessionId;
543 const closedSession = this.rdsSessions.get(closedSessionId);
545 // Only clean up if NOT a persistent tunnel
546 if (!closedSession.persistent) {
// Only sockets that are still open are closed.
548 if (closedSession.viewer && closedSession.viewer.readyState === 1) {
549 closedSession.viewer.close();
552 if (closedSession.control && closedSession.control.readyState === 1) {
553 closedSession.control.close();
555 this.rdsSessions.delete(closedSessionId);
556 console.log(`[RDS] Session ${closedSessionId} closed by agent`);
558 console.log(`[RDS] Ignored tunnel close for persistent session ${closedSessionId}`);
564 case 'rds_started': {
565 // Handle RDS tunnel started confirmation
566 console.log(`[RDS] Agent confirmed tunnel started: ${msg.sessionId}, proto: ${msg.proto}, port: ${msg.port}`);
567 const startedSession = this.rdsSessions.get(msg.sessionId);
// Records what the agent reported on the existing session; unknown session
// ids are ignored.
568 if (startedSession) {
569 startedSession.proto = msg.proto;
570 startedSession.port = msg.port;
571 startedSession.active = true;
577 // Handle RDS tunnel failure
578 console.error(`[RDS] Agent reported tunnel failure: ${msg.sessionId}, reason: ${msg.reason}`);
579 const failedSession = this.rdsSessions.get(msg.sessionId);
// Unlike the tunnel-closed branch above, these closes do not check readyState.
581 if (failedSession.viewer) failedSession.viewer.close();
582 if (failedSession.control) failedSession.control.close();
583 this.rdsSessions.delete(msg.sessionId);
588 case 'persistent_tunnel_request':
589 // Handle agent requesting persistent tunnel assignment
// handlePersistentTunnelRequest is async and is not awaited here.
590 this.handlePersistentTunnelRequest(agent_uuid, msg, ws);
593 case 'persistent_tunnel_ready':
594 // Handle agent confirming persistent tunnel is ready
595 this.handlePersistentTunnelReady(agent_uuid, msg);
// Unrecognized types are logged and otherwise ignored.
599 console.log(`[WS] Unknown message type from agent: ${msg.type}`);
604 * Process and route messages received from dashboard clients
605 * @description Handles dashboard commands including authenticate, subscribe/unsubscribe,
606 * request processes/services, execute scripts, and control services on agents.
607 * @param {string} connection_id - Unique dashboard connection identifier
608 * @param {object} msg - Parsed message object from dashboard
609 * @param {string} msg.type - Message type (authenticate|subscribe|execute_script|...)
610 * @param {object} [msg.data] - Message payload data
611 * @param {WebSocket} ws - Dashboard's WebSocket connection
// Dispatches one parsed dashboard message: token authentication, subscription
// management, and commands relayed to agents.
// NOTE(review): conn.authenticated is set on a successful token check, but no
// visible line tests it before subscribing, executing scripts, killing
// processes or controlling services — confirm whether unauthenticated clients
// are meant to reach those commands.
614 handleDashboardMessage(connection_id, msg, ws) {
615 console.log(`[WS] Dashboard ${connection_id} → ${msg.type}`);
617 const conn = this.dashboardConnections.get(connection_id);
622 // Validate JWT token
// jsonwebtoken is required at call time; the secret comes from the environment.
624 const jwt = require('jsonwebtoken');
625 const JWT_SECRET = process.env.JWT_SECRET;
// Reply sent when the message carries no token.
628 ws.send(JSON.stringify({
630 error: 'No token provided',
631 timestamp: Date.now()
// jwt.verify throws on a bad token; the 'Invalid token' reply below is the
// failure path.
636 const decoded = jwt.verify(msg.token, JWT_SECRET);
637 conn.authenticated = true;
// Claims are accepted in either snake_case or camelCase.
638 conn.user_id = decoded.user_id || decoded.userId;
639 conn.tenant_id = decoded.tenant_id || decoded.tenantId;
641 ws.send(JSON.stringify({
642 type: 'authenticated',
644 user_id: conn.user_id,
645 tenant_id: conn.tenant_id,
646 timestamp: Date.now()
649 console.log(`[WS] Dashboard ${connection_id} authenticated as user ${conn.user_id}`);
651 console.error(`[WS] Dashboard ${connection_id} authentication failed:`, err.message);
652 ws.send(JSON.stringify({
654 error: 'Invalid token',
655 timestamp: Date.now()
661 // Subscribe to agent updates
// A non-array agent_uuids is ignored silently, with no reply.
662 if (!Array.isArray(msg.agent_uuids)) return;
664 for (const agent_uuid of msg.agent_uuids) {
// Two indexes are kept in step: the connection's own set and the per-agent
// subscriber set used for fan-out.
665 conn.subscriptions.add(agent_uuid);
667 if (!this.subscriptions.has(agent_uuid)) {
668 this.subscriptions.set(agent_uuid, new Set());
670 this.subscriptions.get(agent_uuid).add(connection_id);
672 console.log(`[WS] Dashboard ${connection_id} subscribed to agent ${agent_uuid}`);
// The acknowledgement echoes the requested uuid list back.
675 ws.send(JSON.stringify({
677 agent_uuids: msg.agent_uuids,
678 timestamp: Date.now()
683 if (!Array.isArray(msg.agent_uuids)) return;
685 for (const agent_uuid of msg.agent_uuids) {
686 conn.subscriptions.delete(agent_uuid);
688 const subs = this.subscriptions.get(agent_uuid);
690 subs.delete(connection_id);
// Remove the agent's index entry once nobody is subscribed.
691 if (subs.size === 0) {
692 this.subscriptions.delete(agent_uuid);
698 case 'request_processes':
// The dashboard's own request_id is passed through unchanged.
// NOTE(review): it is never registered in pendingRequests, so a `response`
// from the agent would not be matched — presumably the reply arrives as a
// broadcast snapshot instead; confirm.
699 this.sendToAgent(msg.agent_uuid, {
700 type: 'get_processes',
701 request_id: msg.request_id
705 case 'request_services':
706 this.sendToAgent(msg.agent_uuid, {
707 type: 'get_services',
708 request_id: msg.request_id
712 case 'execute_script':
// The execution_id returned by executeScript is not used here.
713 this.executeScript(msg.agent_uuid, msg.script_id, msg.code, connection_id);
717 this.sendToAgent(msg.agent_uuid, {
718 type: 'kill_process',
720 request_id: msg.request_id
724 case 'control_service':
725 this.sendToAgent(msg.agent_uuid, {
726 type: 'control_service',
727 service_name: msg.service_name,
728 action: msg.action, // start, stop, restart
729 request_id: msg.request_id
// Unrecognized types are logged and otherwise ignored.
734 console.log(`[WS] Unknown message type from dashboard: ${msg.type}`);
739 * Send message to specific agent
740 * @description Sends JSON message to agent if connected. Logs approval requests and
741 * handles connection failures gracefully.
742 * @param {string} agent_uuid - Target agent's unique identifier
743 * @param {object} message - Message object to send
744 * @param {string} message.type - Message type
745 * @param {object} [message.data] - Message payload
746 * @param {string} [message.request_id] - Optional correlation ID for responses
747 * @returns {boolean} True if message sent successfully, false if agent not connected
749 sendToAgent(agent_uuid, message) {
750 const ws = this.agentConnections.get(agent_uuid);
751 if (!ws || ws.readyState !== 1) {
752 console.log(`[WS] Agent ${agent_uuid} not connected`);
757 // Log approval requests specifically
758 if (message.type === 'connection_approval_request') {
759 console.log(`[WS] 📤 Sending approval request to agent ${agent_uuid}:`, JSON.stringify(message));
761 ws.send(JSON.stringify(message));
762 console.log(`[WS] ✅ Message sent to agent ${agent_uuid}, type: ${message.type}`);
765 console.error(`[WS] Failed to send to agent ${agent_uuid}:`, err.message);
771 * Broadcast message to all dashboards subscribed to agent
772 * @description Efficiently fan-out message to multiple dashboard subscribers.
773 * Automatically handles connection cleanup for closed connections.
774 * @param {string} agent_uuid - Agent whose subscribers should receive message
775 * @param {object} message - Message object to broadcast
776 * @param {string} message.type - Message type (metrics|agent_disconnected|...)
777 * @param {object} [message.data] - Message payload
780 broadcastToSubscribers(agent_uuid, message) {
781 const subs = this.subscriptions.get(agent_uuid);
782 if (!subs || subs.size === 0) return;
784 const payload = JSON.stringify(message);
787 for (const connection_id of subs) {
788 const conn = this.dashboardConnections.get(connection_id);
789 if (conn && conn.ws.readyState === 1) {
791 conn.ws.send(payload);
794 console.error(`[WS] Failed to send to dashboard ${connection_id}:`, err.message);
800 console.log(`[WS] Broadcasted ${message.type} to ${sent} dashboards`);
805 * Execute script on agent and stream output to dashboard
806 * @description Creates execution session, sends script to agent, and registers
807 * dashboard for output streaming. Supports PowerShell, Bash, or Python scripts.
808 * @param {string} agent_uuid - Target agent's unique identifier
809 * @param {number} script_id - Script ID from database
810 * @param {string} code - Script code to execute
811 * @param {string} dashboard_connection_id - Dashboard requesting execution
812 * @returns {string} execution_id - Unique execution identifier for tracking
814 executeScript(agent_uuid, script_id, code, dashboard_connection_id) {
815 const execution_id = `exec-${Date.now()}-${Math.random().toString(36).slice(2)}`;
817 this.activeExecutions.set(execution_id, {
819 dashboard_connections: new Set([dashboard_connection_id])
822 this.sendToAgent(agent_uuid, {
823 type: 'execute_script',
833 * Broadcast script output to watching dashboards
834 * @description Sends script output chunk or completion message to all dashboards
835 * watching this execution.
836 * @param {string} execution_id - Script execution identifier
837 * @param {object} message - Output message to broadcast
838 * @param {string} message.type - Message type (script_output|script_complete)
839 * @param {object} message.data - Output data (stdout, stderr, exit code)
842 broadcastScriptOutput(execution_id, message) {
843 const execution = this.activeExecutions.get(execution_id);
844 if (!execution) return;
846 const payload = JSON.stringify(message);
848 for (const connection_id of execution.dashboard_connections) {
849 const conn = this.dashboardConnections.get(connection_id);
850 if (conn && conn.ws.readyState === 1) {
852 conn.ws.send(payload);
854 console.error(`[WS] Failed to send script output to ${connection_id}:`, err.message);
861 * Send request to agent and wait for response
862 * @description Implements request-reply pattern with timeout. Correlates responses
863 * using request_id. Used for synchronous queries like get_processes, get_services.
864 * @param {string} agent_uuid - Target agent's unique identifier
865 * @param {object} message - Request message object
866 * @param {string} message.type - Request type (get_processes|get_services|...)
867 * @param {number} [timeout] - Timeout in milliseconds
868 * @returns {Promise<object>} Resolves with agent's response data, rejects on timeout
869 * @throws {Error} Throws 'Request timeout' if agent doesn't respond within timeout
870 * @throws {Error} Throws 'Agent not connected' if agent is offline
872 requestFromAgent(agent_uuid, message, timeout = 10000) {
873 return new Promise((resolve, reject) => {
874 const request_id = `req-${Date.now()}-${Math.random().toString(36).slice(2)}`;
876 const timer = setTimeout(() => {
877 this.pendingRequests.delete(request_id);
878 reject(new Error('Request timeout'));
881 this.pendingRequests.set(request_id, { resolve, reject, timeout: timer });
883 const success = this.sendToAgent(agent_uuid, {
890 this.pendingRequests.delete(request_id);
891 reject(new Error('Agent not connected'));
897 * Request file list from agent.
898 * Sends file list request to agent and returns promise with results.
899 * @param {string} agentId - Agent UUID
900 * @param {string} path - File system path to list
901 * @returns {Promise<Array>} Resolves with file list array
902 * @throws {Error} Agent not connected or timeout
// Asks an agent for a directory listing and returns a promise for the result.
// The promise is parked in pendingRequests under a generated requestId; the
// `file-list-response` branch of handleAgentMessage resolves it.
904 requestAgentFileList(agentId, path) {
905 return new Promise((resolve, reject) => {
906 const ws = this.agentConnections.get(agentId);
// readyState 1 is OPEN; anything else fails fast, before any bookkeeping.
907 if (!ws || ws.readyState !== 1) {
908 return reject(new Error('Agent not connected'));
// This flow uses camelCase `requestId`, unlike requestFromAgent's `request_id`.
911 const requestId = `filelist-${Date.now()}-${Math.random().toString(36).slice(2)}`;
// On timeout the pending entry is removed so a late reply finds nothing to resolve.
913 const timer = setTimeout(() => {
914 this.pendingRequests.delete(requestId);
915 reject(new Error('Timeout waiting for agent file list'));
// The timer handle is stored so the response handler can cancel it.
918 this.pendingRequests.set(requestId, { resolve, reject, timeout: timer });
920 // Send file list request
// Written straight to the socket rather than through sendToAgent.
921 ws.send(JSON.stringify({ type: 'file-list', requestId, path }));
926 * Update agent last_seen in database.
927 * @param {string} agent_uuid - Agent UUID to update
928 * @returns {Promise<void>}
930 async updateLastSeen(agent_uuid) {
932 const pool = require('./db');
934 'UPDATE agents SET last_seen = NOW() WHERE agent_uuid = $1',
938 console.error(`[WS] Failed to update last_seen for ${agent_uuid}:`, err.message);
943 * Save network and disk info from WebSocket metrics to database.
944 * This ensures offline fallback data is current.
945 * @param {string} agent_uuid - Agent UUID
946 * @param {object} metrics - Metrics object with network and disk data
947 * @returns {Promise<void>}
949 async saveNetworkInfo(agent_uuid, metrics) {
951 const pool = require('./db');
953 // Extract IP address from metrics
954 if (metrics.network && metrics.network.ipv4) {
956 'UPDATE agents SET ip_address = $1, updated_at = NOW() WHERE agent_uuid = $2',
957 [metrics.network.ipv4, agent_uuid]
961 // Update system_info with current drives data
962 if (metrics.disk && Array.isArray(metrics.disk)) {
963 // Get existing system_info and merge drives data
964 const result = await pool.query(
965 'SELECT system_info FROM agents WHERE agent_uuid = $1',
969 if (result.rows.length > 0) {
970 const systemInfo = result.rows[0].system_info || {};
972 // Convert WebSocket disk format to drives format
973 const drives = metrics.disk.map(d => ({
974 Drive: d.mount || d.filesystem || 'Unknown',
975 Root: d.mount || d.filesystem || 'Unknown',
976 Used: Math.round((d.used / (1024 * 1024 * 1024)) * 100) / 100, // Convert to GB
977 Free: Math.round(((d.total - d.used) / (1024 * 1024 * 1024)) * 100) / 100,
978 Total: Math.round((d.total / (1024 * 1024 * 1024)) * 100) / 100,
979 UsedPercent: Math.round(d.used_percent * 10) / 10
982 systemInfo.drives = drives;
985 'UPDATE agents SET system_info = $1, updated_at = NOW() WHERE agent_uuid = $2',
986 [JSON.stringify(systemInfo), agent_uuid]
991 console.error(`[WS] Failed to save network info for ${agent_uuid}:`, err.message);
996 * Cleanup sessions when agent disconnects.
997 * Removes RDS sessions and active executions for disconnected agent.
998 * @param {string} agent_uuid - Agent UUID to clean up
1001 cleanupAgentSessions(agent_uuid) {
1002 // Clean up RDS sessions
1003 for (const [sessionId, session] of this.rdsSessions.entries()) {
1004 if (session.agentId === agent_uuid) {
1005 if (session.viewer) session.viewer.close();
1006 if (session.control) session.control.close();
1007 this.rdsSessions.delete(sessionId);
1008 console.log(`[RDS] Cleaned session ${sessionId} for agent ${agent_uuid}`);
1012 // Clean up active executions
1013 for (const [execution_id, execution] of this.activeExecutions.entries()) {
1014 if (execution.agent_uuid === agent_uuid) {
1015 this.broadcastScriptOutput(execution_id, {
1016 type: 'script_error',
1018 error: 'Agent disconnected',
1019 timestamp: Date.now()
1021 this.activeExecutions.delete(execution_id);
1027 * Handle RDS connections (existing functionality).
1028 * Routes viewer and control WebSocket connections for remote desktop sessions.
1029 * @param {WebSocket} ws - WebSocket connection
1030 * @param {object} req - HTTP request object with URL
// Attaches a viewer or control socket to an existing RDS session, chosen by
// path prefix. In both branches the session id is read from the `session`
// query parameter, not from the path.
1033 handleRDSConnection(ws, req) {
1034 const parsed = url.parse(req.url, true);
1035 const path = parsed.pathname;
1037 if (path.startsWith('/api/rds/view/')) {
1038 const sessionId = parsed.query.session;
1039 if (!sessionId) return ws.close();
1041 const session = this.rdsSessions.get(sessionId);
// NOTE(review): unlike the control branch, an unknown session id does not
// close the socket here; the viewer stays connected with nothing to talk to.
// Confirm this is intended.
1042 if (session) session.viewer = ws;
1044 console.log(`[RDS] Viewer connected: ${sessionId}`);
1046 // Handle messages from viewer (keyboard/mouse) - forward to agent via main WebSocket
1047 ws.on('message', (data) => {
// `session` is the object captured at connect time; the agent socket is looked
// up again on every message.
1048 if (session && session.agentId) {
1049 const agentWs = this.agentConnections.get(session.agentId);
1050 if (agentWs && agentWs.readyState === 1) {
1051 // Send viewer data to agent through main WebSocket
1052 agentWs.send(JSON.stringify({
1054 sessionId: sessionId,
1055 data: data.toString('base64') // Encode binary data as base64
1061 ws.on('close', () => {
// NOTE(review): this clears the session's viewer slot without checking that it
// still points at this socket — confirm a replaced viewer cannot be cleared.
1063 session.viewer = null;
1064 // Notify agent to close tunnel
1065 const agentWs = this.agentConnections.get(session.agentId);
1066 if (agentWs && agentWs.readyState === 1) {
1067 agentWs.send(JSON.stringify({
1069 sessionId: sessionId
1073 console.log(`[RDS] Viewer disconnected: ${sessionId}`);
1076 } else if (path.startsWith('/api/rds/control/')) {
1077 const sessionId = parsed.query.session;
1078 if (!sessionId) return ws.close();
1080 const session = this.rdsSessions.get(sessionId);
// Control sockets are rejected outright when the session does not exist.
1081 if (!session) return ws.close();
1083 session.control = ws;
1084 console.log(`[RDS] Control connected: ${sessionId}`);
1086 // Forward control messages to agent
1087 ws.on('message', (raw) => {
1089 const msg = JSON.parse(raw.toString());
1090 const agentWs = this.agentConnections.get(session.agentId);
1091 if (agentWs && agentWs.readyState === 1) {
// sessionId is spread last, so it overrides any sessionId the client sent.
1092 agentWs.send(JSON.stringify({ ...msg, sessionId }));
1095 console.error('[RDS] Error forwarding control:', err);
1099 ws.on('close', () => {
1100 if (session) session.control = null;
1101 console.log(`[RDS] Control disconnected: ${sessionId}`);
1107 * Handle persistent tunnel request from agent.
1108 * Assigns port and creates persistent tunnel session.
1109 * @param {string} agent_uuid - Agent UUID
1110 * @param {object} msg - Message with protocol and port
1111 * @param {WebSocket} ws - Agent WebSocket connection
1112 * @returns {Promise<void>}
// Handles an agent's request for a persistent tunnel: discards the bookkeeping
// of any tunnel the agent already had, reserves a port, mints a session id,
// records the tunnel in `persistentTunnels` (keyed by agent) and in
// `rdsSessions` (keyed by session id), then replies to the agent with a
// `persistent_tunnel_assigned` message.
// NOTE(review): declared async, but no `await` is visible in this excerpt.
1114 async handlePersistentTunnelRequest(agent_uuid, msg, ws) {
// `port` in the message is renamed to `localPort` to keep it distinct from the
// server-assigned port below; presumably it is the port on the agent's machine.
1115 const { protocol, port: localPort } = msg;
1116 console.log(`[Persistent Tunnel] Agent ${agent_uuid} requesting ${protocol}:${localPort} tunnel`);
1118 // Clean up any existing tunnel for this agent
1119 const existing = this.persistentTunnels.get(agent_uuid);
// Release the previous tunnel's port and its RDS session entry so neither leaks.
// NOTE(review): `existing` is undefined on a first request; confirm the guard
// around the next lines.
1121 this.assignedPorts.delete(existing.assignedPort);
1122 this.rdsSessions.delete(existing.sessionId);
1123 console.log(`[Persistent Tunnel] Cleaned up existing tunnel for agent ${agent_uuid}`);
1126 // Assign a new port
1127 const assignedPort = this.getNextAvailablePort();
// A fresh random UUID identifies this tunnel in `rdsSessions` and in the reply.
1128 const sessionId = require('crypto').randomUUID();
1130 // Store tunnel info
1131 this.persistentTunnels.set(agent_uuid, {
1139 // Register in RDS sessions for data routing
1140 this.rdsSessions.set(sessionId, {
1141 agentId: agent_uuid,
1148 console.log(`[Persistent Tunnel] Assigned ${agent_uuid} → localhost:${assignedPort} (${protocol}:${localPort})`);
1150 // Send assignment to agent
1151 ws.send(JSON.stringify({
1152 type: 'persistent_tunnel_assigned',
1157 timestamp: Date.now()
1162 * Handle persistent tunnel ready confirmation.
1163 * Marks tunnel as ready after agent confirms initialization.
1164 * @param {string} agent_uuid - Agent UUID
1165 * @param {object} msg - Message with sessionId
1168 handlePersistentTunnelReady(agent_uuid, msg) {
1169 const { sessionId } = msg;
1170 const tunnel = this.persistentTunnels.get(agent_uuid);
1172 if (tunnel && tunnel.sessionId === sessionId) {
1173 tunnel.ready = true;
1174 console.log(`[Persistent Tunnel] ✅ Agent ${agent_uuid} tunnel ready: localhost:${tunnel.assignedPort} → ${tunnel.protocol}:${tunnel.localPort}`);
1179 * Get next available port for persistent tunnels.
1180 * Increments port counter and tracks assigned ports.
1181 * @returns {number} Available port number
1183 getNextAvailablePort() {
1184 while (this.assignedPorts.has(this.nextAvailablePort)) {
1185 this.nextAvailablePort++;
1187 const port = this.nextAvailablePort++;
1188 this.assignedPorts.add(port);
1193 * Get persistent tunnel info for agent.
1194 * @param {string} agent_uuid - Agent UUID
1195 * @returns {object|undefined} Tunnel info object or undefined if not found
1197 getPersistentTunnel(agent_uuid) {
1198 return this.persistentTunnels.get(agent_uuid);
1202 * Get agent connection status.
1203 * @param {string} agent_uuid - Agent UUID
1204 * @returns {boolean} True if agent is connected and ready
1206 isAgentConnected(agent_uuid) {
1207 const ws = this.agentConnections.get(agent_uuid);
1208 return ws && ws.readyState === 1;
1213 * Returns current connection and session counts.
1214 * @returns {object} Stats object with connection counts
// Snapshot of the manager's bookkeeping: each value is the current `.size` of
// the corresponding collection on this instance at the moment of the call.
// NOTE(review): the enclosing method header is not visible in this excerpt.
1218 agents: this.agentConnections.size,
1219 dashboards: this.dashboardConnections.size,
1220 subscriptions: this.subscriptions.size,
1221 executions: this.activeExecutions.size,
1222 rds_sessions: this.rdsSessions.size,
1223 persistent_tunnels: this.persistentTunnels.size
1228 * Handle MeshCentral terminal WebSocket connection.
1229 * Proxies terminal session between dashboard and MeshCentral.
1230 * @param {WebSocket} ws - WebSocket connection from dashboard
1231 * @param {object} query - Query parameters with sessionToken and agentId
1232 * @returns {Promise<void>}
// Bridges a dashboard WebSocket to the tenant's MeshCentral connection for a
// terminal session. Flow: check the query parameters, validate the session
// token against the database, obtain the tenant's MeshCentral API object, then
// relay messages in both directions until the dashboard socket closes.
1234 async handleMeshCentralTerminal(ws, query) {
1235 const { sessionToken, agentId } = query;
// Only the first 8 characters of the token are written to the log.
1237 console.log(`[MeshCentral Terminal] New connection: agentId=${agentId}, sessionToken=${sessionToken?.substring(0, 8)}...`);
// 1008 is the WebSocket "policy violation" close code.
1239 if (!sessionToken || !agentId) {
1240 console.error('[MeshCentral Terminal] Missing sessionToken or agentId');
1241 ws.close(1008, 'Missing required parameters');
1246 // Validate session and get device info
// The token must belong to this agent and must not have expired. The join
// fetches the agent's MeshCentral node id and tenant id in the same query.
1247 const sessionResult = await pool.query(
1248 `SELECT s.*, a.meshcentral_nodeid, a.tenant_id
1249 FROM meshcentral_sessions s
1250 JOIN agents a ON a.agent_id = s.agent_id
1251 WHERE s.session_token = $1 AND s.agent_id = $2 AND s.expires_at > NOW()`,
1252 [sessionToken, agentId]
1255 if (sessionResult.rows.length === 0) {
1256 console.error('[MeshCentral Terminal] Invalid or expired session');
1257 ws.close(1008, 'Invalid or expired session');
1261 const session = sessionResult.rows[0];
1262 const nodeId = session.meshcentral_nodeid;
1263 const tenantId = session.tenant_id;
1265 console.log(`[MeshCentral Terminal] Session validated: tenantId=${tenantId}, nodeId=${nodeId}`);
1267 // Get tenant's MeshCentral API connection
// Required inside the function rather than at module top, presumably to avoid
// a circular import with the routes module; confirm.
1268 const { getTenantMeshAPI } = require('../routes/meshcentral');
1269 const api = await getTenantMeshAPI(tenantId);
// 1011 is the WebSocket "internal error" close code.
1270 if (!api || !api.ws) {
1271 console.error('[MeshCentral Terminal] Failed to connect to MeshCentral');
1272 ws.close(1011, 'MeshCentral connection failed');
1276 console.log(`[MeshCentral Terminal] Connected to MeshCentral for tenant ${tenantId}`);
1278 // Forward messages from MeshCentral to client
// Kept in a named constant so the exact same function can be detached on close.
// Only messages whose `type` is 'console' are relayed to the dashboard.
// NOTE(review): no filter on node id is visible here; if `api.ws` is shared by
// several sessions of one tenant, each client would see every console message. Confirm.
1279 const meshMessageHandler = (data) => {
1281 const message = JSON.parse(data.toString());
1282 if (message.type === 'console') {
1283 console.log(`[MeshCentral Terminal] -> Client: console message`);
1284 ws.send(JSON.stringify(message));
1287 console.error('[MeshCentral Terminal] Error parsing MeshCentral message:', err);
1290 api.ws.on('message', meshMessageHandler);
1292 // Forward messages from client to MeshCentral
1293 ws.on('message', (data) => {
1295 const clientMessage = JSON.parse(data.toString());
1296 console.log(`[MeshCentral Terminal] <- Client:`, clientMessage.type, clientMessage.action);
// The client payload is JSON-encoded again and carried in the envelope's `msg` field.
1298 const meshMessage = {
1302 msg: JSON.stringify(clientMessage)
1305 api.ws.send(JSON.stringify(meshMessage));
1307 console.error('[MeshCentral Terminal] Error forwarding client message:', err);
1311 // Handle disconnection
// Detach this session's listener so the MeshCentral socket stops calling into a closed client.
1312 ws.on('close', () => {
1313 console.log(`[MeshCentral Terminal] Client disconnected: agentId=${agentId}`);
1314 api.ws.off('message', meshMessageHandler);
1317 // Send initial connection success
1318 ws.send(JSON.stringify({
// Error path (the enclosing catch is not shown in this excerpt): log and close with 1011.
1324 console.error('[MeshCentral Terminal] Connection error:', err);
1325 ws.close(1011, 'Internal server error');
1330 * Handle MeshCentral file browser WebSocket connection.
1331 * Proxies file browser session between dashboard and MeshCentral.
1332 * @param {WebSocket} ws - WebSocket connection from dashboard
1333 * @param {object} query - Query parameters with sessionToken and agentId
1334 * @returns {Promise<void>}
// Bridges a dashboard WebSocket to the tenant's MeshCentral connection for a
// file-browser session. Same flow as the terminal handler: check the query
// parameters, validate the session token, obtain the tenant's MeshCentral API
// object, then relay file-related traffic until the dashboard socket closes.
1336 async handleMeshCentralFiles(ws, query) {
1337 const { sessionToken, agentId } = query;
// Only the first 8 characters of the token are written to the log.
1339 console.log(`[MeshCentral Files] New connection: agentId=${agentId}, sessionToken=${sessionToken?.substring(0, 8)}...`);
// 1008 is the WebSocket "policy violation" close code.
1341 if (!sessionToken || !agentId) {
1342 console.error('[MeshCentral Files] Missing sessionToken or agentId');
1343 ws.close(1008, 'Missing required parameters');
1348 // Validate session and get device info
// The token must belong to this agent and must not have expired. The join
// fetches the agent's MeshCentral node id and tenant id in the same query.
1349 const sessionResult = await pool.query(
1350 `SELECT s.*, a.meshcentral_nodeid, a.tenant_id
1351 FROM meshcentral_sessions s
1352 JOIN agents a ON a.agent_id = s.agent_id
1353 WHERE s.session_token = $1 AND s.agent_id = $2 AND s.expires_at > NOW()`,
1354 [sessionToken, agentId]
1357 if (sessionResult.rows.length === 0) {
1358 console.error('[MeshCentral Files] Invalid or expired session');
1359 ws.close(1008, 'Invalid or expired session');
1363 const session = sessionResult.rows[0];
1364 const nodeId = session.meshcentral_nodeid;
1365 const tenantId = session.tenant_id;
1367 console.log(`[MeshCentral Files] Session validated: tenantId=${tenantId}, nodeId=${nodeId}`);
1369 // Get tenant's MeshCentral API connection
// Required inside the function rather than at module top, presumably to avoid
// a circular import with the routes module; confirm.
1370 const { getTenantMeshAPI } = require('../routes/meshcentral');
1371 const api = await getTenantMeshAPI(tenantId);
// 1011 is the WebSocket "internal error" close code.
1372 if (!api || !api.ws) {
1373 console.error('[MeshCentral Files] Failed to connect to MeshCentral');
1374 ws.close(1011, 'MeshCentral connection failed');
1378 console.log(`[MeshCentral Files] Connected to MeshCentral for tenant ${tenantId}`);
1380 // Forward messages from MeshCentral to client
// Kept in a named constant so the exact same function can be detached on close.
// NOTE(review): the filter below matches on message type/action only; no node
// id check is visible, so confirm traffic for other devices cannot reach this client.
1381 const meshMessageHandler = (data) => {
1383 const message = JSON.parse(data.toString());
1385 // Only forward file-related messages
1386 if (message.type === 'notify' ||
1387 message.type === 'download' ||
1388 message.type === 'uploaddone' ||
1389 message.action === 'ls' ||
1390 message.action === 'download' ||
1391 message.action === 'upload') {
1393 console.log(`[MeshCentral Files] -> Client: ${message.action || message.type}`);
1394 ws.send(JSON.stringify(message));
1397 console.error('[MeshCentral Files] Error parsing MeshCentral message:', err);
1400 api.ws.on('message', meshMessageHandler);
1402 // Forward messages from client to MeshCentral
1403 ws.on('message', (data) => {
1405 const clientMessage = JSON.parse(data.toString());
1406 console.log(`[MeshCentral Files] <- Client: ${clientMessage.action}`, {
1407 path: clientMessage.path,
1408 reqid: clientMessage.reqid
1411 const meshMessage = {
1418 api.ws.send(JSON.stringify(meshMessage));
// Unlike the terminal handler, a forwarding failure is also reported back to the client.
1420 console.error('[MeshCentral Files] Error forwarding client message:', err);
1421 ws.send(JSON.stringify({
1423 message: 'Failed to process request'
1428 // Handle disconnection
// Detach this session's listener so the MeshCentral socket stops calling into a closed client.
1429 ws.on('close', () => {
1430 console.log(`[MeshCentral Files] Client disconnected: agentId=${agentId}`);
1431 api.ws.off('message', meshMessageHandler);
1434 // Send initial connection success
1435 ws.send(JSON.stringify({
// Error path (the enclosing catch is not shown in this excerpt): log and close with 1011.
1441 console.error('[MeshCentral Files] Connection error:', err);
1442 ws.close(1011, 'Internal server error');
1447// Create singleton instance
// The module exports this one instance rather than the class, so every
// `require` of this file receives the same manager object and its state.
1448const wsManager = new WebSocketManager();
1450module.exports = wsManager;