Why Node.js + Python for ML Serving?
Node.js: Excellent for I/O-bound tasks, real-time APIs, and handling concurrent requests
Python: Unmatched for ML/Data Science workloads
Together: The best of both worlds - Python's ML ecosystem served behind Node.js's high-concurrency API layer
Approach 1: HTTP API (Most Common)
Python Flask/FastAPI Service
# app.py - FastAPI Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pickle
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import logging

app = FastAPI(title="ML Model API")

# Load your trained model
try:
    with open('model.pkl', 'rb') as f:
        model = pickle.load(f)
    logging.info("Model loaded successfully")
except Exception as e:
    logging.error(f"Error loading model: {e}")
    model = None

class PredictionRequest(BaseModel):
    features: list
    model_name: str = "default"

class PredictionResponse(BaseModel):
    prediction: list
    confidence: list
    model_version: str

@app.get("/health")
async def health_check():
    return {"status": "healthy", "model_loaded": model is not None}

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    try:
        # Preprocess input
        features_array = np.array(request.features).reshape(1, -1)

        # Make prediction
        prediction = model.predict(features_array)
        probabilities = model.predict_proba(features_array)
        confidence = np.max(probabilities, axis=1).tolist()

        return PredictionResponse(
            prediction=prediction.tolist(),
            confidence=confidence,
            model_version="1.0.0"
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Prediction error: {str(e)}")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Node.js API Gateway
// server.js - Node.js Gateway
const express = require('express');
const axios = require('axios');
const cors = require('cors');
const rateLimit = require('express-rate-limit');

const app = express();
const PORT = process.env.PORT || 3000;

// Middleware
app.use(cors());
app.use(express.json());

// Rate limiting
const limiter = rateLimit({
  windowMs: 15 * 60 * 1000, // 15 minutes
  max: 100 // limit each IP to 100 requests per windowMs
});
app.use(limiter);

// Configuration
const PYTHON_ML_SERVICE = process.env.ML_SERVICE_URL || 'http://localhost:8000';

// Health check endpoint
app.get('/health', async (req, res) => {
  try {
    const response = await axios.get(`${PYTHON_ML_SERVICE}/health`);
    res.json({
      status: 'healthy',
      ml_service: response.data,
      timestamp: new Date().toISOString()
    });
  } catch (error) {
    res.status(503).json({
      status: 'unhealthy',
      error: 'ML service unavailable',
      timestamp: new Date().toISOString()
    });
  }
});

// Prediction endpoint
app.post('/api/predict', async (req, res) => {
  try {
    const { features, model_name } = req.body;

    // Input validation
    if (!features || !Array.isArray(features)) {
      return res.status(400).json({
        error: 'Invalid input: features must be an array'
      });
    }

    // Call Python ML service
    const response = await axios.post(`${PYTHON_ML_SERVICE}/predict`, {
      features,
      model_name
    }, {
      timeout: 10000 // 10 second timeout
    });

    // Add Node.js gateway metadata
    const result = {
      ...response.data,
      gateway_version: '1.0.0',
      processed_at: new Date().toISOString()
    };

    res.json(result);
  } catch (error) {
    console.error('Prediction error:', error.message);

    if (error.code === 'ECONNREFUSED') {
      return res.status(503).json({
        error: 'ML service unavailable',
        message: 'Please try again later'
      });
    }

    if (error.response) {
      // Python service returned an error
      return res.status(error.response.status).json(error.response.data);
    }

    res.status(500).json({
      error: 'Internal server error',
      message: 'Prediction failed'
    });
  }
});

// Batch prediction endpoint
app.post('/api/predict/batch', async (req, res) => {
  try {
    const { instances, model_name } = req.body;

    if (!instances || !Array.isArray(instances)) {
      return res.status(400).json({
        error: 'Invalid input: instances must be an array of feature arrays'
      });
    }

    // Process batch sequentially (for large batches, consider queuing)
    const predictions = [];
    for (const features of instances) {
      try {
        const response = await axios.post(`${PYTHON_ML_SERVICE}/predict`, {
          features,
          model_name
        });
        predictions.push(response.data);
      } catch (error) {
        predictions.push({
          error: `Prediction failed: ${error.message}`,
          features
        });
      }
    }

    res.json({
      predictions,
      total: instances.length,
      successful: predictions.filter(p => !p.error).length
    });
  } catch (error) {
    res.status(500).json({
      error: 'Batch processing failed',
      message: error.message
    });
  }
});

app.listen(PORT, () => {
  console.log(`Node.js API Gateway running on port ${PORT}`);
  console.log(`ML Service: ${PYTHON_ML_SERVICE}`);
});
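With both services running, any HTTP client can call the gateway. A minimal sketch of a client call (the feature values are placeholders, not tied to a real model):

// client example: call the gateway from any JS environment with fetch
// (feature values are placeholders)
async function getPrediction() {
  const response = await fetch('http://localhost:3000/api/predict', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ features: [5.1, 3.5, 1.4, 0.2], model_name: 'default' })
  });
  return response.json();
}

getPrediction().then((result) => console.log(result.prediction, result.confidence));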
Approach 2: Child Process Spawning
For simpler models, or when you want to avoid running a separate service:
// child-process-handler.js
const { spawn } = require('child_process');
const path = require('path');

class PythonModelHandler {
  constructor(modelPath) {
    this.modelPath = modelPath;
  }

  async predict(features) {
    return new Promise((resolve, reject) => {
      const pythonProcess = spawn('python', [
        path.join(__dirname, 'python_predict.py'),
        JSON.stringify(features)
      ]);

      let result = '';
      let error = '';

      pythonProcess.stdout.on('data', (data) => {
        result += data.toString();
      });

      pythonProcess.stderr.on('data', (data) => {
        error += data.toString();
      });

      pythonProcess.on('close', (code) => {
        if (code !== 0) {
          reject(new Error(`Python process exited with code ${code}: ${error}`));
          return;
        }

        try {
          const prediction = JSON.parse(result);
          resolve(prediction);
        } catch (parseError) {
          reject(new Error(`Failed to parse prediction: ${parseError.message}`));
        }
      });
    });
  }
}

module.exports = PythonModelHandler;
# python_predict.py
import sys
import json
import pickle
import numpy as np

# Load model (would be cached in production)
def load_model():
    with open('model.pkl', 'rb') as f:
        return pickle.load(f)

if __name__ == "__main__":
    try:
        # Parse input features
        features = json.loads(sys.argv[1])
        model = load_model()

        # Make prediction
        features_array = np.array(features).reshape(1, -1)
        prediction = model.predict(features_array).tolist()
        probability = model.predict_proba(features_array).max().item()

        # Return result as JSON
        result = {
            'prediction': prediction,
            'confidence': probability,
            'status': 'success'
        }
        print(json.dumps(result))
    except Exception as e:
        error_result = {
            'error': str(e),
            'status': 'error'
        }
        print(json.dumps(error_result))
        sys.exit(1)
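A minimal sketch of wiring the handler into an Express route (the route path and error shape are illustrative):

// Example usage in an Express route (route path is illustrative)
const express = require('express');
const PythonModelHandler = require('./child-process-handler');

const app = express();
app.use(express.json());

const handler = new PythonModelHandler('model.pkl');

app.post('/api/predict', async (req, res) => {
  try {
    // Spawns python_predict.py once per request
    const result = await handler.predict(req.body.features);
    res.json(result);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.listen(3000);

Note that each request spawns a fresh Python process and reloads the model, so this approach trades latency for simplicity; the HTTP service above keeps the model warm in memory.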
Approach 3: Message Queue (RabbitMQ/Redis)
For high-throughput, asynchronous processing:
// queue-producer.js
const amqp = require('amqplib');

class PredictionQueue {
  constructor() {
    this.connection = null;
    this.channel = null;
  }

  async connect() {
    this.connection = await amqp.connect(process.env.RABBITMQ_URL || 'amqp://localhost');
    this.channel = await this.connection.createChannel();
    await this.channel.assertQueue('predictions', { durable: true });
  }

  async submitPredictionRequest(features, correlationId) {
    const message = {
      features,
      correlationId,
      timestamp: new Date().toISOString()
    };

    this.channel.sendToQueue(
      'predictions',
      Buffer.from(JSON.stringify(message)),
      { persistent: true }
    );
  }
}

module.exports = PredictionQueue;
# queue_consumer.py
import pika
import json
import pickle
import numpy as np

class PredictionConsumer:
    def __init__(self, model_path):
        self.model = pickle.load(open(model_path, 'rb'))
        self.connection = None
        self.channel = None

    def connect(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost')
        )
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='predictions', durable=True)
        self.channel.queue_declare(queue='results', durable=True)

    def process_prediction(self, ch, method, properties, body):
        try:
            message = json.loads(body)
            features = np.array(message['features']).reshape(1, -1)

            # Make prediction
            prediction = self.model.predict(features).tolist()
            confidence = self.model.predict_proba(features).max().item()

            result = {
                'correlationId': message['correlationId'],
                'prediction': prediction,
                'confidence': confidence,
                'status': 'completed'
            }

            # Send result back
            self.channel.basic_publish(
                exchange='',
                routing_key='results',
                body=json.dumps(result),
                properties=pika.BasicProperties(delivery_mode=2)
            )

            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"Error processing prediction: {e}")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

    def start_consuming(self):
        self.channel.basic_consume(
            queue='predictions',
            on_message_callback=self.process_prediction
        )
        self.channel.start_consuming()

if __name__ == "__main__":
    consumer = PredictionConsumer('model.pkl')
    consumer.connect()
    consumer.start_consuming()
</consumer>
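The Python worker publishes to a results queue, so the Node.js side also needs a consumer that matches results back to waiting callers by correlationId. A minimal sketch under that assumption (the in-memory pending map and function names are illustrative; a real deployment would persist or publish results elsewhere):

// queue-results-consumer.js - matches results to pending requests by correlationId
// (the in-memory pending map is an illustrative assumption)
const amqp = require('amqplib');

const pending = new Map(); // correlationId -> resolve callback

async function startResultsConsumer() {
  const connection = await amqp.connect(process.env.RABBITMQ_URL || 'amqp://localhost');
  const channel = await connection.createChannel();
  await channel.assertQueue('results', { durable: true });

  channel.consume('results', (msg) => {
    const result = JSON.parse(msg.content.toString());
    const resolve = pending.get(result.correlationId);
    if (resolve) {
      resolve(result);
      pending.delete(result.correlationId);
    }
    channel.ack(msg);
  });
}

// Register a pending request and wait for its result
function waitForResult(correlationId) {
  return new Promise((resolve) => pending.set(correlationId, resolve));
}

module.exports = { startResultsConsumer, waitForResult };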
Production-Grade Setup
Docker Configuration
# Dockerfile for Node.js API
FROM node:18-alpine

WORKDIR /app

COPY package*.json ./
RUN npm ci --only=production

COPY . .

EXPOSE 3000

USER node

CMD ["node", "server.js"]
# Dockerfile for Python ML Service
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'

services:
  node-api:
    build: ./node-api
    ports:
      - "3000:3000"
    environment:
      - ML_SERVICE_URL=http://python-ml-service:8000
      - NODE_ENV=production
    depends_on:
      - python-ml-service

  python-ml-service:
    build: ./python-ml-service
    ports:
      - "8000:8000"
    environment:
      - MODEL_PATH=/app/models/model.pkl

  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

  # Optional: for monitoring
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
Monitoring and Logging
// monitoring.js
const client = require('prom-client');

// Collect default Node.js process metrics
client.collectDefaultMetrics();

// Custom metrics
const predictionCounter = new client.Counter({
  name: 'model_predictions_total',
  help: 'Total number of predictions',
  labelNames: ['model', 'status']
});

const predictionDuration = new client.Histogram({
  name: 'prediction_duration_seconds',
  help: 'Duration of prediction requests',
  labelNames: ['model']
});

// Express middleware for monitoring
function monitoringMiddleware(req, res, next) {
  const start = Date.now();

  res.on('finish', () => {
    const duration = (Date.now() - start) / 1000;
    predictionDuration.observe({ model: 'default' }, duration);

    if (req.path === '/api/predict') {
      predictionCounter.inc({
        model: 'default',
        status: res.statusCode === 200 ? 'success' : 'error'
      });
    }
  });

  next();
}

// Metrics endpoint handler (register it on the Express app in server.js)
async function metricsHandler(req, res) {
  res.set('Content-Type', client.register.contentType);
  res.end(await client.register.metrics());
}

module.exports = { monitoringMiddleware, metricsHandler };
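Wiring this into the gateway could look like the following sketch (assumes monitoring.js sits next to server.js):

// In server.js: register the middleware and expose /metrics for Prometheus
const { monitoringMiddleware, metricsHandler } = require('./monitoring');

app.use(monitoringMiddleware);
app.get('/metrics', metricsHandler);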
Best Practices
Error Handling: Always handle Python process failures gracefully
Input Validation: Validate features before sending to Python
Timeouts: Set appropriate timeouts for model predictions
Caching: Cache predictions for frequently repeated inputs (for example in Redis) so identical requests skip the model call
Circuit Breaker: Implement the circuit breaker pattern for ML service calls so a failing Python service fails fast instead of piling up timeouts (see the sketch after this list)
Versioning: Version your models and APIs
Monitoring: Track prediction latency, success rates, and model performance
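A minimal hand-rolled circuit breaker sketch for the gateway's calls to the Python service (the thresholds and the callMLService wrapper name are illustrative assumptions; libraries such as opossum package the same idea):

// circuit-breaker.js - minimal sketch; thresholds are illustrative
const axios = require('axios');

const breaker = {
  failures: 0,
  openedAt: 0,
  threshold: 5,        // trips after 5 consecutive failures
  cooldownMs: 30000    // stays open for 30 seconds
};

async function callMLService(url, payload) {
  // While open, fail fast instead of waiting on timeouts
  if (breaker.failures >= breaker.threshold &&
      Date.now() - breaker.openedAt < breaker.cooldownMs) {
    throw new Error('Circuit open: ML service temporarily unavailable');
  }

  try {
    const response = await axios.post(url, payload, { timeout: 10000 });
    breaker.failures = 0; // success closes the circuit
    return response.data;
  } catch (error) {
    breaker.failures += 1;
    if (breaker.failures >= breaker.threshold) {
      breaker.openedAt = Date.now();
    }
    throw error;
  }
}

module.exports = { callMLService };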
Performance Considerations
Keep Python processes warm to avoid model reloading
Use connection pooling (HTTP keep-alive) for the HTTP approach, as in the sketch after this list
Implement request batching for high-throughput scenarios
Consider model quantization for faster inference
Use GPU acceleration when available
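For the connection pooling point, a sketch of reusing TCP connections to the Python service via a keep-alive agent (the maxSockets value is an illustrative assumption):

// http-client.js - axios instance that reuses TCP connections to the ML service
// (maxSockets value is illustrative)
const http = require('http');
const axios = require('axios');

const keepAliveAgent = new http.Agent({
  keepAlive: true,
  maxSockets: 50 // cap concurrent connections to the Python service
});

const mlClient = axios.create({
  baseURL: process.env.ML_SERVICE_URL || 'http://localhost:8000',
  httpAgent: keepAliveAgent,
  timeout: 10000
});

module.exports = mlClient;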