/**
 * Pax8 Product Sync Worker
 * Syncs Office 365 products from the Pax8 API to the local database.
 * - Runs daily at 3am to sync product catalog changes
 * - Runs on startup to ensure fresh product data
 * - Can be triggered via Redis pub/sub for a manual sync
 */
// Dependencies: Redis client library, the shared database handle, and the Pax8 API wrapper.
const Redis = require("ioredis");
const pool = require('./services/db');
const Pax8Service = require('./services/pax8Service');

// Single Redis connection created at module load from the project's Redis config.
const redisConfig = require('./config/redis');
const redis = new Redis(redisConfig);

const SYNC_INTERVAL = 24 * 60 * 60 * 1000; // 24 hours
const SYNC_HOUR = 3; // 3am

console.log('[Pax8 Sync Worker] Starting...');
/**
 * Map a raw Pax8 product object onto the values written to the `products` table.
 * Pure helper: performs no I/O.
 *
 * @param {object} product - Product object as returned by `Pax8Service#listProducts`.
 * @param {string} vendor - Vendor to fall back to when the product carries none.
 * @returns {object} Values for name, description, prices, Pax8 attributes and metadata.
 */
function extractProductFields(product, vendor) {
  // Prices use `??` rather than `||` so that a genuine 0 (e.g. a free or
  // zero-cost SKU) is stored as 0 instead of being replaced by the fallback.
  const priceRetail = product.pricing?.retail ?? product.price ?? 0;

  return {
    name: product.name || product.productName || 'Unknown Product',
    description: product.description || product.productDescription || '',
    priceRetail,
    priceExTax: product.pricing?.cost ?? priceRetail,
    pax8Vendor: product.vendor || vendor,
    pax8Category: product.category || null,
    pax8Subcategory: product.subcategory || product.subCategory || null,
    pax8Sku: product.sku || product.productSku || null,
    billingTerm: product.billingTerm || product.billing?.term || 'monthly',
    // Auto-renew is on unless the API explicitly says `false`.
    isAutoRenew: product.autoRenew !== false,
    // The full raw product is kept as a JSON string in pax8_metadata.
    pax8Metadata: JSON.stringify(product),
  };
}

/**
 * Sync products for a single tenant
 *
 * Fetches the vendor's products through `Pax8Service` and upserts each one
 * into `products`, keyed by `pax8_product_id`. A product that was synced less
 * than one hour ago is skipped unless `force` is set. A failure on one product
 * is logged and counted; it does not abort the remaining products.
 *
 * NOTE(review): the existence check and the UPDATE match on `pax8_product_id`
 * only, while the INSERT stores `tenant_id`. If products are meant to be
 * per-tenant, a second tenant's sync will update or skip the first tenant's
 * row instead of inserting its own - confirm against the schema.
 *
 * @param {*} tenantId - Tenant identifier passed to `Pax8Service` and stored on inserted rows.
 * @param {string} [vendor='Microsoft'] - Vendor filter passed to `listProducts`.
 * @param {boolean} [force=false] - When true, ignore the one-hour "recently synced" window.
 * @returns {Promise<{total: number, synced: number, updated: number, skipped: number, failed: number}>}
 *   `synced` counts inserts, `updated` counts updates, `failed` counts products that raised an error.
 * @throws Re-throws any error raised outside per-product processing (e.g. the Pax8 fetch).
 */
async function syncTenantProducts(tenantId, vendor = 'Microsoft', force = false) {
  console.log(`[Pax8 Sync] Starting sync for tenant ${tenantId}, vendor: ${vendor}`);

  try {
    const pax8 = new Pax8Service(tenantId);

    // Fetch products from Pax8 API
    const products = await pax8.listProducts({ vendor });

    if (!products || products.length === 0) {
      console.log(`[Pax8 Sync] No products found for vendor ${vendor}`);
      return { total: 0, synced: 0, updated: 0, skipped: 0, failed: 0 };
    }

    console.log(`[Pax8 Sync] Found ${products.length} products, syncing to database...`);

    const recentSyncWindowMs = 60 * 60 * 1000; // one hour
    let synced = 0;
    let updated = 0;
    let skipped = 0;
    let failed = 0;

    for (const product of products) {
      try {
        // Without an id there is nothing to key the upsert on; writing the row
        // anyway would create a product with a null pax8_product_id.
        if (product?.id == null) {
          throw new Error('product has no id');
        }

        // Check if product already exists by pax8_product_id
        const existingResult = await pool.query(
          'SELECT product_id, last_synced_at FROM products WHERE pax8_product_id = $1',
          [product.id]
        );
        const existing = existingResult.rows[0];

        // Skip if recently synced and not forcing
        if (!force && existing?.last_synced_at) {
          const lastSync = new Date(existing.last_synced_at);
          const oneHourAgo = new Date(Date.now() - recentSyncWindowMs);
          if (lastSync > oneHourAgo) {
            skipped++;
            continue;
          }
        }

        // Extract product details
        const {
          name,
          description,
          priceRetail,
          priceExTax,
          pax8Vendor,
          pax8Category,
          pax8Subcategory,
          pax8Sku,
          billingTerm,
          isAutoRenew,
          pax8Metadata,
        } = extractProductFields(product, vendor);

        if (existing) {
          // Update existing product
          await pool.query(
            `UPDATE products SET
              name = $1, description = $2, price_retail = $3, price_ex_tax = $4,
              pax8_vendor = $5, pax8_category = $6, pax8_subcategory = $7, pax8_sku = $8,
              billing_term = $9, is_auto_renew = $10, pax8_metadata = $11,
              last_synced_at = NOW(), updated_at = NOW()
            WHERE pax8_product_id = $12`,
            [name, description, priceRetail, priceExTax, pax8Vendor, pax8Category,
              pax8Subcategory, pax8Sku, billingTerm, isAutoRenew, pax8Metadata, product.id]
          );
          updated++;
        } else {
          // Insert new product
          await pool.query(
            `INSERT INTO products (
              tenant_id, name, description, supplier, price_retail, price_ex_tax,
              is_service, divisible, is_stock,
              pax8_product_id, pax8_vendor, pax8_category, pax8_subcategory,
              pax8_sku, billing_term, is_auto_renew, pax8_metadata,
              is_active, last_synced_at, created_at, updated_at
            ) VALUES (
              $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, NOW(), NOW(), NOW()
            )`,
            [tenantId, name, description, 'Pax8', priceRetail, priceExTax,
              true, true, false, product.id, pax8Vendor, pax8Category, pax8Subcategory,
              pax8Sku, billingTerm, isAutoRenew, pax8Metadata, true]
          );
          synced++;
        }
      } catch (productErr) {
        // One bad product must not abort the rest of the catalog.
        failed++;
        console.error(`[Pax8 Sync] Error processing product ${product?.id}:`, productErr.message);
      }
    }

    const stats = { total: products.length, synced, updated, skipped, failed };
    console.log(`[Pax8 Sync] Completed sync for tenant ${tenantId}:`, stats);

    return stats;
  } catch (err) {
    console.error(`[Pax8 Sync] Error syncing tenant ${tenantId}:`, err.message);
    throw err;
  }
}
/**
 * Sync all active tenants with Pax8 configuration
 *
 * Reads the tenant ids of every active row in `pax8_config` and runs
 * `syncTenantProducts` for each one in turn (vendor 'Microsoft', not forced).
 * A failure for one tenant is logged and the loop continues with the next.
 * The final log line states whether every tenant succeeded or how many failed.
 *
 * Never rejects: an error outside the per-tenant loop (e.g. the config query)
 * is logged and swallowed so timer-driven callers do not need their own handler.
 *
 * @returns {Promise<void>}
 */
async function syncAllTenants() {
  try {
    const result = await pool.query(
      `SELECT tenant_id FROM pax8_config WHERE is_active = true`
    );

    if (result.rows.length === 0) {
      console.log('[Pax8 Sync] No active Pax8 configurations found');
      return;
    }

    console.log(`[Pax8 Sync] Starting sync for ${result.rows.length} tenant(s)`);

    let failedTenants = 0;

    for (const row of result.rows) {
      try {
        await syncTenantProducts(row.tenant_id, 'Microsoft', false);
      } catch (error) {
        failedTenants++;
        console.error(`[Pax8 Sync] Failed to sync tenant ${row.tenant_id}:`, error.message);
        // Continue with next tenant
      }
    }

    // Only claim success when no tenant failed.
    if (failedTenants === 0) {
      console.log('[Pax8 Sync] All tenants synced successfully');
    } else {
      console.error(
        `[Pax8 Sync] Sync cycle finished: ${failedTenants} of ${result.rows.length} tenant(s) failed`
      );
    }
  } catch (error) {
    console.error('[Pax8 Sync] Error in sync cycle:', error);
  }
}
/**
 * Calculate milliseconds until next scheduled sync time (3am)
 *
 * The target is derived from the single `now` instant, so the result cannot be
 * skewed by the clock ticking over (e.g. past midnight) between two separate
 * `new Date()` calls. Uses local time, matching `Date#setHours`.
 *
 * @param {Date} [now=new Date()] - Reference instant; not mutated.
 * @param {number} [syncHour=SYNC_HOUR] - Local hour of day (0-23) at which the sync runs.
 * @returns {number} Milliseconds from `now` until the next occurrence of
 *   `syncHour`:00:00.000; always greater than 0.
 */
function msUntilNextSync(now = new Date(), syncHour = SYNC_HOUR) {
  const nextSync = new Date(now.getTime());
  nextSync.setHours(syncHour, 0, 0, 0);

  // If the sync time has already passed (or is exactly now), schedule for tomorrow
  if (nextSync.getTime() <= now.getTime()) {
    nextSync.setDate(nextSync.getDate() + 1);
  }

  return nextSync.getTime() - now.getTime();
}
/**
 * Start the sync worker
 *
 * Runs one sync immediately, then runs a sync at every `SYNC_HOUR` o'clock.
 * The delay is recomputed with `msUntilNextSync()` after each scheduled run
 * instead of repeating on a fixed 24-hour interval, so the run stays on the
 * same wall-clock hour when a day is 23 or 25 hours long (DST changes).
 *
 * @returns {void}
 */
function startSyncWorker() {
  console.log(`[Pax8 Sync] Worker started - syncing daily at ${SYNC_HOUR}am`);

  // Run immediately on startup
  syncAllTenants().catch((err) => {
    console.error('[Pax8 Sync] Initial sync failed:', err);
  });

  // If a timer fires marginally before the wall clock reaches the sync hour and
  // the run finishes quickly, the next slot would appear to be milliseconds
  // away; treat anything this close after a scheduled run as "already done".
  const earlyFireToleranceMs = 60 * 1000;

  const scheduleNextSync = (afterScheduledRun) => {
    let msUntilSync = msUntilNextSync();
    if (afterScheduledRun && msUntilSync < earlyFireToleranceMs) {
      msUntilSync += SYNC_INTERVAL;
    }
    console.log(`[Pax8 Sync] Next sync in ${Math.round(msUntilSync / 1000 / 60 / 60)} hours`);

    setTimeout(() => {
      syncAllTenants()
        .catch((err) => {
          console.error('[Pax8 Sync] Scheduled sync failed:', err);
        })
        // Re-arm only after the run has settled so runs never stack up.
        .finally(() => scheduleNextSync(true));
    }, msUntilSync);
  };

  // Schedule next sync at 3am
  scheduleNextSync(false);
}
// NOTE(review): in this view of the file several lines of this section are not
// visible (closing braces, short statements, object fields) and each remaining
// line carries numeric residue at its start. The comments below describe only
// the statements that are visible; confirm details against the complete file.
//
// Manual-sync wiring: subscribes the module's `redis` connection to the
// `pax8:sync:request` channel. The subscribe callback logs either the
// subscription error or a confirmation message.
196// Subscribe to manual sync requests via Redis
197redis.subscribe('pax8:sync:request', (err) => {
199 console.error('[Pax8 Sync] Failed to subscribe:', err);
201 console.log('[Pax8 Sync] Subscribed to pax8:sync:request for manual triggers');
// Message handler for the subscribed connection. Messages arriving on any
// channel other than `pax8:sync:request` are ignored.
205redis.on('message', async (channel, message) => {
206 if (channel !== 'pax8:sync:request') return;
// The message body is parsed as JSON; a payload that fails to parse is logged
// with its raw text (presumably the handler then returns - that line is not
// visible here).
210 jobData = JSON.parse(message);
212 console.error('[Pax8 Sync] Invalid job data:', message);
// Job fields: `vendor` defaults to 'Microsoft' and `force` to false when the
// request omits them; `tenantId` and `jobId` have no defaults.
216 const { tenantId, vendor = 'Microsoft', force = false, jobId } = jobData;
218 console.log(`[Pax8 Sync] Manual sync triggered - Job ${jobId} for tenant ${tenantId}`);
// Runs the same per-tenant sync used by the scheduled cycle and awaits its stats.
221 const stats = await syncTenantProducts(tenantId, vendor, force);
// NOTE(review): both publish calls below use the same `redis` connection that
// was passed to `subscribe` above. ioredis documents that a connection in
// subscriber mode only accepts subscription commands, so PUBLISH on it is
// expected to be rejected - confirm whether a separate publisher connection
// (e.g. `redis.duplicate()`) is needed.
// NOTE(review): the result payload fields between `JSON.stringify({` and
// `message:` are not visible in this view.
223 // Publish success result
224 await redis.publish('pax8:sync:result', JSON.stringify({
229 message: `Synced ${stats.synced + stats.updated} products (${stats.synced} new, ${stats.updated} updated, ${stats.skipped} skipped)`
// Failure path: the error message is written to the console, and the published
// result carries the fixed text 'Product sync failed' as its `message`.
233 console.error(`[Pax8 Sync] Job ${jobId} failed:`, err.message);
235 // Publish error result
236 await redis.publish('pax8:sync:result', JSON.stringify({
241 message: 'Product sync failed'
// Signal handlers: each logs that the signal was received. The cleanup/exit
// statements that follow each log line are not visible in this view.
246// Handle graceful shutdown
247process.on('SIGTERM', () => {
248 console.log('[Pax8 Sync] SIGTERM received, shutting down gracefully...');
253process.on('SIGINT', () => {
254 console.log('[Pax8 Sync] SIGINT received, shutting down gracefully...');
// Start the worker if this file is run directly (`node <file>`); when the
// module is loaded with require(), only the exports below are provided and
// the caller decides when to call startSyncWorker().
if (require.main === module) {
  startSyncWorker();
}

module.exports = { startSyncWorker, syncAllTenants, syncTenantProducts };