EverydayTech Platform - Developer Reference
Complete Source Code Documentation - All Applications
Loading...
Searching...
No Matches
pax8SyncWorker.js
Go to the documentation of this file.
1/**
2 * Pax8 Product Sync Worker
3 * Syncs Office 365 products from Pax8 API to local database
4 * - Runs daily at 3am to sync product catalog changes
5 * - Runs on startup to ensure fresh product data
6 * - Can be triggered via Redis pub/sub for manual sync
7 */
8const Redis = require("ioredis");
9const pool = require('./services/db');
10const Pax8Service = require('./services/pax8Service');
11
12const redisConfig = require('./config/redis');
13const redis = new Redis(redisConfig);
14
// Hour of the day (24-hour clock) at which the daily catalog sync is scheduled.
const SYNC_HOUR = 3;
// One full day expressed in milliseconds; spacing between recurring syncs.
const SYNC_INTERVAL = 1000 * 60 * 60 * 24;

console.log('[Pax8 Sync Worker] Starting...');
19
/**
 * Sync the Pax8 product catalog for a single tenant into the local `products` table.
 *
 * Every product returned by the Pax8 API is looked up by `pax8_product_id` and is
 * then either updated in place or inserted as a new row. A product that fails to
 * save is logged and counted in `failed`; it does not abort the rest of the batch.
 *
 * @param {*} tenantId - Tenant passed to `Pax8Service` and stored as `tenant_id` on inserted rows.
 * @param {string} [vendor='Microsoft'] - Vendor filter passed to `listProducts`; also the fallback vendor name.
 * @param {boolean} [force=false] - When true, products synced within the last hour are re-synced instead of skipped.
 * @returns {Promise<{total: number, synced: number, updated: number, skipped: number, failed: number}>}
 *   `synced` counts inserted rows, `updated` counts updated rows.
 * @throws Rethrows any error raised while creating the Pax8 client or listing products.
 */
async function syncTenantProducts(tenantId, vendor = 'Microsoft', force = false) {
  // Products synced more recently than this are skipped unless `force` is set.
  const RECENT_SYNC_WINDOW_MS = 60 * 60 * 1000;

  console.log(`[Pax8 Sync] Starting sync for tenant ${tenantId}, vendor: ${vendor}`);

  try {
    const pax8 = new Pax8Service(tenantId);

    // Fetch products from Pax8 API
    const products = await pax8.listProducts({ vendor });

    if (!products || products.length === 0) {
      console.log(`[Pax8 Sync] No products found for vendor ${vendor}`);
      return { total: 0, synced: 0, updated: 0, skipped: 0, failed: 0 };
    }

    console.log(`[Pax8 Sync] Found ${products.length} products, syncing to database...`);

    let synced = 0;
    let updated = 0;
    let skipped = 0;
    let failed = 0;

    for (const product of products) {
      try {
        // NOTE(review): the lookup (and the UPDATE below) match on pax8_product_id
        // only, not tenant_id, so a row created for one tenant is reused for all.
        // Confirm the catalog is meant to be shared across tenants.
        const existingResult = await pool.query(
          'SELECT product_id, last_synced_at FROM products WHERE pax8_product_id = $1',
          [product.id]
        );
        const existing = existingResult.rows[0];

        // Skip if recently synced and not forcing
        if (!force && existing?.last_synced_at) {
          const lastSync = new Date(existing.last_synced_at);
          const cutoff = new Date(Date.now() - RECENT_SYNC_WINDOW_MS);
          if (lastSync > cutoff) {
            skipped++;
            continue;
          }
        }

        // Extract product details, tolerating the alternative field names seen in payloads.
        const name = product.name || product.productName || 'Unknown Product';
        const description = product.description || product.productDescription || '';
        // `??` rather than `||`: a price or cost of 0 is a real value, not a missing one.
        const priceRetail = product.pricing?.retail ?? product.price ?? 0;
        const priceExTax = product.pricing?.cost ?? priceRetail;
        const pax8Vendor = product.vendor || vendor;
        const pax8Category = product.category || null;
        const pax8Subcategory = product.subcategory || product.subCategory || null;
        const pax8Sku = product.sku || product.productSku || null;
        const billingTerm = product.billingTerm || product.billing?.term || 'monthly';
        // Auto-renew is assumed unless the payload explicitly says `false`.
        const isAutoRenew = product.autoRenew !== false;
        // Keep the raw payload alongside the extracted columns.
        const pax8Metadata = JSON.stringify(product);

        if (existing) {
          // Update existing product
          await pool.query(
            `UPDATE products SET
              name = $1, description = $2, price_retail = $3, price_ex_tax = $4,
              pax8_vendor = $5, pax8_category = $6, pax8_subcategory = $7, pax8_sku = $8,
              billing_term = $9, is_auto_renew = $10, pax8_metadata = $11,
              last_synced_at = NOW(), updated_at = NOW()
            WHERE pax8_product_id = $12`,
            [name, description, priceRetail, priceExTax, pax8Vendor, pax8Category,
              pax8Subcategory, pax8Sku, billingTerm, isAutoRenew, pax8Metadata, product.id]
          );
          updated++;
        } else {
          // Insert new product
          await pool.query(
            `INSERT INTO products (
              tenant_id, name, description, supplier, price_retail, price_ex_tax,
              is_service, divisible, is_stock,
              pax8_product_id, pax8_vendor, pax8_category, pax8_subcategory,
              pax8_sku, billing_term, is_auto_renew, pax8_metadata,
              is_active, last_synced_at, created_at, updated_at
            ) VALUES (
              $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, NOW(), NOW(), NOW()
            )`,
            [tenantId, name, description, 'Pax8', priceRetail, priceExTax,
              true, true, false, product.id, pax8Vendor, pax8Category, pax8Subcategory,
              pax8Sku, billingTerm, isAutoRenew, pax8Metadata, true]
          );
          synced++;
        }
      } catch (productErr) {
        // Best effort per product: record the failure and carry on with the rest.
        failed++;
        console.error(`[Pax8 Sync] Error processing product ${product.id}:`, productErr.message);
      }
    }

    const stats = { total: products.length, synced, updated, skipped, failed };
    console.log(`[Pax8 Sync] Completed sync for tenant ${tenantId}:`, stats);

    return stats;
  } catch (err) {
    console.error(`[Pax8 Sync] Error syncing tenant ${tenantId}:`, err.message);
    throw err;
  }
}
126
/**
 * Sync products for every tenant that has an active row in `pax8_config`.
 *
 * Tenants are processed one after another with the 'Microsoft' vendor and no
 * forcing. A tenant whose sync throws is logged and counted, and the loop
 * moves on to the next tenant. Errors from the tenant lookup itself are logged
 * and swallowed, so the returned promise does not reject on those paths.
 *
 * @returns {Promise<void>}
 */
async function syncAllTenants() {
  try {
    const { rows } = await pool.query(
      `SELECT tenant_id FROM pax8_config WHERE is_active = true`
    );

    if (rows.length === 0) {
      console.log('[Pax8 Sync] No active Pax8 configurations found');
      return;
    }

    console.log(`[Pax8 Sync] Starting sync for ${rows.length} tenant(s)`);

    let failedTenants = 0;

    for (const row of rows) {
      try {
        await syncTenantProducts(row.tenant_id, 'Microsoft', false);
      } catch (error) {
        failedTenants++;
        console.error(`[Pax8 Sync] Failed to sync tenant ${row.tenant_id}:`, error.message);
        // Continue with next tenant
      }
    }

    // Only claim success when every tenant actually completed.
    if (failedTenants === 0) {
      console.log('[Pax8 Sync] All tenants synced successfully');
    } else {
      console.warn(
        `[Pax8 Sync] Sync cycle finished: ${failedTenants} of ${rows.length} tenant(s) failed`
      );
    }
  } catch (error) {
    console.error('[Pax8 Sync] Error in sync cycle:', error);
  }
}
157
/**
 * Calculate the milliseconds remaining until the next scheduled sync time.
 *
 * The target is `syncHour`:00:00.000 in local time on the same day as `now`,
 * or on the following day when that moment is not strictly in the future.
 *
 * @param {Date} [now=new Date()] - Reference time; not modified.
 * @param {number} [syncHour=SYNC_HOUR] - Local hour of day (0-23) at which the sync runs.
 * @returns {number} Delay in milliseconds; always greater than zero.
 */
function msUntilNextSync(now = new Date(), syncHour = SYNC_HOUR) {
  // Derive the target from `now` itself so both values share one clock reading;
  // two separate `new Date()` calls could land on different sides of midnight.
  const nextSync = new Date(now.getTime());
  nextSync.setHours(syncHour, 0, 0, 0);

  // If today's slot has already been reached, schedule for tomorrow.
  if (nextSync.getTime() <= now.getTime()) {
    nextSync.setDate(nextSync.getDate() + 1);
  }

  return nextSync.getTime() - now.getTime();
}
173
/**
 * Start the sync worker.
 *
 * Kicks off one sync of all tenants immediately, then arms a timer for the next
 * scheduled sync time. After each scheduled run the timer is re-armed from
 * `msUntilNextSync()` rather than a fixed 24-hour interval, so the run stays
 * pinned to the configured wall-clock hour instead of drifting with timer lag
 * or daylight-saving changes.
 *
 * @returns {void}
 */
function startSyncWorker() {
  console.log(`[Pax8 Sync] Worker started - syncing daily at ${SYNC_HOUR}am`);

  // Run immediately on startup
  syncAllTenants().catch((err) => {
    console.error('[Pax8 Sync] Initial sync failed:', err);
  });

  const scheduleNextSync = () => {
    const msUntilSync = msUntilNextSync();
    console.log(`[Pax8 Sync] Next sync in ${Math.round(msUntilSync / 1000 / 60 / 60)} hours`);

    setTimeout(() => {
      syncAllTenants()
        .catch((err) => {
          console.error('[Pax8 Sync] Scheduled sync failed:', err);
        })
        // Re-arm only after the run has settled, so the next delay is measured
        // from a moment that is already past this run's time slot.
        .finally(scheduleNextSync);
    }, msUntilSync);
  };

  scheduleNextSync();
}
195
// Subscribe to manual sync requests via Redis
redis.subscribe('pax8:sync:request', (err) => {
  if (err) {
    console.error('[Pax8 Sync] Failed to subscribe:', err);
  } else {
    console.log('[Pax8 Sync] Subscribed to pax8:sync:request for manual triggers');
  }
});

// Once a connection has issued SUBSCRIBE it is in subscriber mode and can no
// longer run regular commands such as PUBLISH, so results are sent on a second
// connection created with the same options.
const resultPublisher = redis.duplicate();

/**
 * Publish the outcome of a manual sync job on `pax8:sync:result`.
 * A failure to publish is logged rather than thrown, so it cannot surface as an
 * unhandled rejection from the message handler.
 *
 * @param {object} payload - JSON-serialisable result object.
 * @returns {Promise<void>}
 */
async function publishSyncResult(payload) {
  try {
    await resultPublisher.publish('pax8:sync:result', JSON.stringify(payload));
  } catch (publishErr) {
    console.error('[Pax8 Sync] Failed to publish sync result:', publishErr.message);
  }
}

// Handle a manual sync request: a JSON message with `tenantId` and optional
// `vendor`, `force` and `jobId` fields.
redis.on('message', async (channel, message) => {
  if (channel !== 'pax8:sync:request') return;

  let jobData;
  try {
    jobData = JSON.parse(message);
  } catch (err) {
    console.error('[Pax8 Sync] Invalid job data:', message);
    return;
  }

  // Valid JSON that is not an object (e.g. `null`, `42`) cannot be destructured.
  if (jobData === null || typeof jobData !== 'object') {
    console.error('[Pax8 Sync] Invalid job data:', message);
    return;
  }

  const { tenantId, vendor = 'Microsoft', force = false, jobId } = jobData;

  if (tenantId == null) {
    console.error(`[Pax8 Sync] Job ${jobId} rejected: missing tenantId`);
    await publishSyncResult({
      jobId,
      tenantId,
      success: false,
      error: 'Missing tenantId',
      message: 'Product sync failed'
    });
    return;
  }

  console.log(`[Pax8 Sync] Manual sync triggered - Job ${jobId} for tenant ${tenantId}`);

  let stats;
  try {
    stats = await syncTenantProducts(tenantId, vendor, force);
  } catch (err) {
    console.error(`[Pax8 Sync] Job ${jobId} failed:`, err.message);

    // Publish error result
    await publishSyncResult({
      jobId,
      tenantId,
      success: false,
      error: err.message,
      message: 'Product sync failed'
    });
    return;
  }

  // Publish success result
  await publishSyncResult({
    jobId,
    tenantId,
    success: true,
    stats,
    message: `Synced ${stats.synced + stats.updated} products (${stats.synced} new, ${stats.updated} updated, ${stats.skipped} skipped)`
  });
});
245
// Graceful shutdown: both termination signals get the same treatment — log,
// drop the Redis connection, exit with status 0.
for (const signal of ['SIGTERM', 'SIGINT']) {
  process.on(signal, () => {
    console.log(`[Pax8 Sync] ${signal} received, shutting down gracefully...`);
    redis.disconnect();
    process.exit(0);
  });
}
258
259// Start the worker if this file is run directly
260if (require.main === module) {
261 startSyncWorker();
262}
263
264module.exports = { startSyncWorker, syncAllTenants, syncTenantProducts };