EverydayTech Platform - Developer Reference
Complete Source Code Documentation - All Applications
Loading...
Searching...
No Matches
fileTransferService.js
Go to the documentation of this file.
1/**
2 * File Transfer Service via WebSocket
3 *
4 * Handles file uploads and downloads through agent WebSocket connections
5 * This allows file transfer to/from agents behind NAT/firewalls
6 *
7 * Flow:
8 * Upload: Dashboard → Backend → WebSocket → Agent → Filesystem
9 * Download: Filesystem → Agent → WebSocket → Backend → Dashboard
10 */
11
12const crypto = require('crypto');
13const fs = require('fs').promises;
14const path = require('path');
15const EventEmitter = require('events');
16const wsManager = require('./websocketManager');
17
18/**
19 *
20 */
21class FileTransferService extends EventEmitter {
22 /**
23 *
24 */
25 constructor() {
26 super();
27 // Map: transferId → { agent_uuid, type, filePath, progress, status, ...}
28 this.activeTransfers = new Map();
29
30 // Temporary upload directory
31 this.uploadDir = path.join(__dirname, '../uploads');
32 }
33
34 /**
35 *
36 */
37 async initialize() {
38 // Ensure upload directory exists
39 try {
40 await fs.mkdir(this.uploadDir, { recursive: true });
41 } catch (err) {
42 console.error('[FileTransfer] Failed to create upload directory:', err);
43 }
44 }
45
46 /**
47 * Start file upload to agent
48 * @param {string} agent_uuid - Agent UUID
49 * @param {string} localPath - Local file path on backend
50 * @param {string} remotePath - Destination path on agent
51 * @param {string} user_email - User initiating transfer
52 * @returns {Promise<string>} transferId
53 */
54 async startUpload(agent_uuid, localPath, remotePath, user_email = 'system') {
55 const agentWs = wsManager.agentConnections.get(agent_uuid);
56 if (!agentWs || agentWs.readyState !== 1) {
57 throw new Error('Agent not connected');
58 }
59
60 const transferId = crypto.randomUUID();
61 const stats = await fs.stat(localPath);
62 const fileSize = stats.size;
63
64 // Store transfer info
65 this.activeTransfers.set(transferId, {
66 transferId,
67 agent_uuid,
68 type: 'upload',
69 localPath,
70 remotePath,
71 fileSize,
72 transferred: 0,
73 status: 'starting',
74 user_email,
75 startedAt: Date.now()
76 });
77
78 // Send start command to agent
79 agentWs.send(JSON.stringify({
80 type: 'file_upload_start',
81 transferId,
82 remotePath,
83 fileSize
84 }));
85
86 console.log(`[FileTransfer] Started upload ${transferId} to agent ${agent_uuid}: ${remotePath}`);
87
88 // Start streaming file in chunks
89 this.streamFileToAgent(transferId, agent_uuid, localPath);
90
91 // Audit log
92 this.logActivity(agent_uuid, 'file_upload_started', {
93 transfer_id: transferId,
94 remote_path: remotePath,
95 file_size: fileSize,
96 user: user_email
97 });
98
99 return transferId;
100 }
101
102 /**
103 * Stream file to agent in chunks
104 * @param {string} transferId - Transfer ID
105 * @param {string} agent_uuid - Agent UUID
106 * @param {string} localPath - Local file path
107 */
108 async streamFileToAgent(transferId, agent_uuid, localPath) {
109 const CHUNK_SIZE = 64 * 1024; // 64KB chunks
110 const transfer = this.activeTransfers.get(transferId);
111 const agentWs = wsManager.agentConnections.get(agent_uuid);
112
113 if (!transfer || !agentWs) {
114 console.error(`[FileTransfer] Transfer ${transferId} cancelled or agent disconnected`);
115 return;
116 }
117
118 try {
119 const fileHandle = await fs.open(localPath, 'r');
120 const buffer = Buffer.allocUnsafe(CHUNK_SIZE);
121 let offset = 0;
122
123 transfer.status = 'transferring';
124
125 while (true) {
126 const { bytesRead } = await fileHandle.read(buffer, 0, CHUNK_SIZE, offset);
127
128 if (bytesRead === 0) break; // EOF
129
130 // Send chunk to agent
131 agentWs.send(JSON.stringify({
132 type: 'file_upload_chunk',
133 transferId,
134 data: buffer.slice(0, bytesRead).toString('base64'),
135 offset,
136 size: bytesRead
137 }));
138
139 offset += bytesRead;
140 transfer.transferred = offset;
141 transfer.progress = (offset / transfer.fileSize) * 100;
142
143 this.emit('progress', { transferId, progress: transfer.progress });
144 }
145
146 await fileHandle.close();
147
148 // Send completion
149 agentWs.send(JSON.stringify({
150 type: 'file_upload_complete',
151 transferId
152 }));
153
154 transfer.status = 'completed';
155 transfer.completedAt = Date.now();
156
157 console.log(`[FileTransfer] Upload ${transferId} completed`);
158
159 // Audit log
160 this.logActivity(agent_uuid, 'file_upload_completed', {
161 transfer_id: transferId,
162 remote_path: transfer.remotePath,
163 file_size: transfer.fileSize,
164 duration_ms: transfer.completedAt - transfer.startedAt
165 });
166
167 } catch (err) {
168 console.error(`[FileTransfer] Upload ${transferId} failed:`, err);
169 transfer.status = 'failed';
170 transfer.error = err.message;
171
172 // Notify agent
173 if (agentWs && agentWs.readyState === 1) {
174 agentWs.send(JSON.stringify({
175 type: 'file_upload_cancel',
176 transferId,
177 error: err.message
178 }));
179 }
180 }
181 }
182
183 /**
184 * Start file download from agent
185 * @param {string} agent_uuid - Agent UUID
186 * @param {string} remotePath - File path on agent
187 * @param {string} user_email - User initiating transfer
188 * @returns {Promise<string>} transferId
189 */
190 async startDownload(agent_uuid, remotePath, user_email = 'system') {
191 const agentWs = wsManager.agentConnections.get(agent_uuid);
192 if (!agentWs || agentWs.readyState !== 1) {
193 throw new Error('Agent not connected');
194 }
195
196 const transferId = crypto.randomUUID();
197 const localPath = path.join(this.uploadDir, `${transferId}_${path.basename(remotePath)}`);
198
199 // Store transfer info
200 this.activeTransfers.set(transferId, {
201 transferId,
202 agent_uuid,
203 type: 'download',
204 localPath,
205 remotePath,
206 fileSize: 0,
207 transferred: 0,
208 status: 'starting',
209 user_email,
210 startedAt: Date.now()
211 });
212
213 // Send download request to agent
214 agentWs.send(JSON.stringify({
215 type: 'file_download_start',
216 transferId,
217 remotePath
218 }));
219
220 console.log(`[FileTransfer] Started download ${transferId} from agent ${agent_uuid}: ${remotePath}`);
221
222 // Audit log
223 this.logActivity(agent_uuid, 'file_download_started', {
224 transfer_id: transferId,
225 remote_path: remotePath,
226 user: user_email
227 });
228
229 return transferId;
230 }
231
232 /**
233 * Handle incoming chunk from agent (download)
234 * @param {object} data - Message data
235 */
236 async handleDownloadChunk(data) {
237 const { transferId, data: base64Data, offset, size, fileSize } = data;
238 const transfer = this.activeTransfers.get(transferId);
239
240 if (!transfer || transfer.type !== 'download') {
241 console.error(`[FileTransfer] Invalid download chunk for ${transferId}`);
242 return;
243 }
244
245 try {
246 // Initialize file size on first chunk
247 if (fileSize && transfer.fileSize === 0) {
248 transfer.fileSize = fileSize;
249 }
250
251 // Write chunk to file
252 const buffer = Buffer.from(base64Data, 'base64');
253 await fs.appendFile(transfer.localPath, buffer);
254
255 transfer.transferred += size;
256 transfer.progress = transfer.fileSize > 0 ? (transfer.transferred / transfer.fileSize) * 100 : 0;
257 transfer.status = 'transferring';
258
259 this.emit('progress', { transferId, progress: transfer.progress });
260
261 } catch (err) {
262 console.error(`[FileTransfer] Failed to write chunk for ${transferId}:`, err);
263 transfer.status = 'failed';
264 transfer.error = err.message;
265 }
266 }
267
268 /**
269 * Handle download completion from agent
270 * @param {object} data - Message data
271 */
272 async handleDownloadComplete(data) {
273 const { transferId } = data;
274 const transfer = this.activeTransfers.get(transferId);
275
276 if (!transfer || transfer.type !== 'download') {
277 console.error(`[FileTransfer] Invalid download completion for ${transferId}`);
278 return;
279 }
280
281 transfer.status = 'completed';
282 transfer.completedAt = Date.now();
283
284 console.log(`[FileTransfer] Download ${transferId} completed: ${transfer.localPath}`);
285
286 // Audit log
287 this.logActivity(transfer.agent_uuid, 'file_download_completed', {
288 transfer_id: transferId,
289 remote_path: transfer.remotePath,
290 file_size: transfer.fileSize,
291 duration_ms: transfer.completedAt - transfer.startedAt
292 });
293 }
294
295 /**
296 * Get transfer status
297 * @param {string} transferId - Transfer ID
298 * @returns {object | null}
299 */
300 getTransfer(transferId) {
301 return this.activeTransfers.get(transferId) || null;
302 }
303
304 /**
305 * Get all active transfers for agent
306 * @param {string} agent_uuid - Agent UUID
307 * @returns {Array}
308 */
309 getAgentTransfers(agent_uuid) {
310 const transfers = [];
311 for (const transfer of this.activeTransfers.values()) {
312 if (transfer.agent_uuid === agent_uuid) {
313 transfers.push(transfer);
314 }
315 }
316 return transfers;
317 }
318
319 /**
320 * Cancel transfer
321 * @param {string} transferId - Transfer ID
322 */
323 async cancelTransfer(transferId) {
324 const transfer = this.activeTransfers.get(transferId);
325 if (!transfer) return;
326
327 const agentWs = wsManager.agentConnections.get(transfer.agent_uuid);
328 if (agentWs && agentWs.readyState === 1) {
329 agentWs.send(JSON.stringify({
330 type: transfer.type === 'upload' ? 'file_upload_cancel' : 'file_download_cancel',
331 transferId
332 }));
333 }
334
335 transfer.status = 'cancelled';
336 console.log(`[FileTransfer] Cancelled transfer ${transferId}`);
337 }
338
339 /**
340 * Clean up completed/failed transfers older than 1 hour
341 */
342 async cleanup() {
343 const oneHourAgo = Date.now() - (60 * 60 * 1000);
344
345 for (const [transferId, transfer] of this.activeTransfers.entries()) {
346 if (['completed', 'failed', 'cancelled'].includes(transfer.status) &&
347 transfer.completedAt && transfer.completedAt < oneHourAgo) {
348
349 // Delete download file if exists
350 if (transfer.type === 'download' && transfer.localPath) {
351 try {
352 await fs.unlink(transfer.localPath);
353 } catch (err) {
354 // Ignore
355 }
356 }
357
358 this.activeTransfers.delete(transferId);
359 }
360 }
361 }
362
363 /**
364 * Log activity to database for audit trail
365 * @param {string} agent_uuid - Agent UUID
366 * @param {string} action - Action type
367 * @param {object} details - Additional details
368 */
369 async logActivity(agent_uuid, action, details = {}) {
370 try {
371 const db = require('../db');
372
373 // Get tenant_id from agent
374 const agentResult = await db.query(
375 'SELECT tenant_id FROM agents WHERE agent_uuid = $1',
376 [agent_uuid]
377 );
378
379 if (agentResult.rows.length === 0) {
380 console.error('[FileTransfer] Cannot log activity: agent not found');
381 return;
382 }
383
384 const tenant_id = agentResult.rows[0].tenant_id;
385
386 await db.query(
387 `INSERT INTO activity_logs (agent_uuid, tenant_id, action, details, user_email, created_at)
388 VALUES ($1, $2, $3, $4, $5, NOW())`,
389 [agent_uuid, tenant_id, action, JSON.stringify(details), details.user || 'system']
390 );
391
392 console.log(`[FileTransfer] Logged activity: ${action} for agent ${agent_uuid}`);
393 } catch (err) {
394 console.error('[FileTransfer] Failed to log activity:', err.message);
395 }
396 }
397}
398
399// Create singleton
400const fileTransferService = new FileTransferService();
401fileTransferService.initialize();
402
403// Cleanup every hour
404setInterval(() => fileTransferService.cleanup(), 60 * 60 * 1000);
405
406module.exports = fileTransferService;