Saturday, November 1, 2025

From Model to Production: Serving Your Python ML Model with a Node.js API

 

Why Node.js + Python for ML Serving?

  • Node.js: Excellent for I/O-bound tasks, real-time APIs, and handling concurrent requests

  • Python: Unmatched for ML/Data Science workloads

  • Together: Get the best of both worlds - Python's ML ecosystem paired with Node.js's strengths in concurrent I/O and API tooling

Approach 1: HTTP API (Most Common)

Python Flask/FastAPI Service

python
# app.py - FastAPI Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pickle
import numpy as np
import logging

logging.basicConfig(level=logging.INFO)

app = FastAPI(title="ML Model API")

# Load your trained model
try:
    with open('model.pkl', 'rb') as f:
        model = pickle.load(f)
    logging.info("Model loaded successfully")
except Exception as e:
    logging.error(f"Error loading model: {e}")
    model = None

class PredictionRequest(BaseModel):
    features: list
    model_name: str = "default"  # note: Pydantic v2 warns that the "model_" prefix clashes with its protected namespace

class PredictionResponse(BaseModel):
    prediction: list
    confidence: list
    model_version: str

@app.get("/health")
async def health_check():
    return {"status": "healthy", "model_loaded": model is not None}

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    
    try:
        # Preprocess input
        features_array = np.array(request.features).reshape(1, -1)
        
        # Make prediction
        prediction = model.predict(features_array)
        probabilities = model.predict_proba(features_array)
        
        confidence = np.max(probabilities, axis=1).tolist()
        
        return PredictionResponse(
            prediction=prediction.tolist(),
            confidence=confidence,
            model_version="1.0.0"
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Prediction error: {str(e)}")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Node.js API Gateway

javascript
// server.js - Node.js Gateway
const express = require('express');
const axios = require('axios');
const cors = require('cors');
const rateLimit = require('express-rate-limit');

const app = express();
const PORT = process.env.PORT || 3000;

// Middleware
app.use(cors());
app.use(express.json());

// Rate limiting
const limiter = rateLimit({
    windowMs: 15 * 60 * 1000, // 15 minutes
    max: 100 // limit each IP to 100 requests per windowMs
});
app.use(limiter);

// Configuration
const PYTHON_ML_SERVICE = process.env.ML_SERVICE_URL || 'http://localhost:8000';

// Health check endpoint
app.get('/health', async (req, res) => {
    try {
        const response = await axios.get(`${PYTHON_ML_SERVICE}/health`);
        res.json({
            status: 'healthy',
            ml_service: response.data,
            timestamp: new Date().toISOString()
        });
    } catch (error) {
        res.status(503).json({
            status: 'unhealthy',
            error: 'ML service unavailable',
            timestamp: new Date().toISOString()
        });
    }
});

// Prediction endpoint
app.post('/api/predict', async (req, res) => {
    try {
        const { features, model_name } = req.body;

        // Input validation
        if (!features || !Array.isArray(features)) {
            return res.status(400).json({
                error: 'Invalid input: features must be an array'
            });
        }

        // Call Python ML service
        const response = await axios.post(`${PYTHON_ML_SERVICE}/predict`, {
            features,
            model_name
        }, {
            timeout: 10000 // 10 second timeout
        });

        // Add Node.js gateway metadata
        const result = {
            ...response.data,
            gateway_version: '1.0.0',
            processed_at: new Date().toISOString()
        };

        res.json(result);
    } catch (error) {
        console.error('Prediction error:', error.message);

        if (error.code === 'ECONNREFUSED') {
            return res.status(503).json({
                error: 'ML service unavailable',
                message: 'Please try again later'
            });
        }

        if (error.response) {
            // Python service returned an error
            return res.status(error.response.status).json(error.response.data);
        }

        res.status(500).json({
            error: 'Internal server error',
            message: 'Prediction failed'
        });
    }
});

// Batch prediction endpoint
app.post('/api/predict/batch', async (req, res) => {
    try {
        const { instances, model_name } = req.body;

        if (!instances || !Array.isArray(instances)) {
            return res.status(400).json({
                error: 'Invalid input: instances must be an array of feature arrays'
            });
        }

        // Process batch sequentially (for large batches, consider queuing)
        const predictions = [];
        for (const features of instances) {
            try {
                const response = await axios.post(`${PYTHON_ML_SERVICE}/predict`, {
                    features,
                    model_name
                });
                predictions.push(response.data);
            } catch (error) {
                predictions.push({
                    error: `Prediction failed: ${error.message}`,
                    features
                });
            }
        }

        res.json({
            predictions,
            total: instances.length,
            successful: predictions.filter(p => !p.error).length
        });
    } catch (error) {
        res.status(500).json({
            error: 'Batch processing failed',
            message: error.message
        });
    }
});

app.listen(PORT, () => {
    console.log(`Node.js API Gateway running on port ${PORT}`);
    console.log(`ML Service: ${PYTHON_ML_SERVICE}`);
});

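Before moving on, it helps to sanity-check the two services together. The snippet below is a small illustrative client (the file name and the 4-value feature vector are assumptions; use whatever feature shape your model was trained on):

javascript
// client-example.js - quick end-to-end check through the gateway
const axios = require('axios');

async function main() {
    // Assumes the gateway from server.js is running on port 3000
    const response = await axios.post('http://localhost:3000/api/predict', {
        features: [5.1, 3.5, 1.4, 0.2], // illustrative only - match your model's feature count
        model_name: 'default'
    });
    console.log(response.data);
}

main().catch(err => console.error('Request failed:', err.message));
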
Approach 2: Child Process Spawning

For simpler models or when you want to avoid separate services:

javascript
// child-process-handler.js
const { spawn } = require('child_process');
const path = require('path');

class PythonModelHandler {
    constructor(modelPath) {
        this.modelPath = modelPath;
    }

    async predict(features) {
        return new Promise((resolve, reject) => {
            const pythonProcess = spawn('python', [   // use 'python3' if that is the interpreter name on your system
                path.join(__dirname, 'python_predict.py'),
                JSON.stringify(features)
            ]);

            let result = '';
            let error = '';

            pythonProcess.stdout.on('data', (data) => {
                result += data.toString();
            });

            pythonProcess.stderr.on('data', (data) => {
                error += data.toString();
            });

            pythonProcess.on('close', (code) => {
                if (code !== 0) {
                    reject(new Error(`Python process exited with code ${code}: ${error}`));
                    return;
                }

                try {
                    const prediction = JSON.parse(result);
                    resolve(prediction);
                } catch (parseError) {
                    reject(new Error(`Failed to parse prediction: ${parseError.message}`));
                }
            });
        });
    }
}

module.exports = PythonModelHandler;
python
# python_predict.py
import sys
import json
import pickle
import numpy as np

# Load model (would be cached in production)
def load_model():
    with open('model.pkl', 'rb') as f:
        return pickle.load(f)

if __name__ == "__main__":
    try:
        # Parse input features
        features = json.loads(sys.argv[1])
        model = load_model()
        
        # Make prediction
        features_array = np.array(features).reshape(1, -1)
        prediction = model.predict(features_array).tolist()
        probability = model.predict_proba(features_array).max().item()
        
        # Return result as JSON
        result = {
            'prediction': prediction,
            'confidence': probability,
            'status': 'success'
        }
        print(json.dumps(result))
        
    except Exception as e:
        error_result = {
            'error': str(e),
            'status': 'error'
        }
        print(json.dumps(error_result))
        sys.exit(1)

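To actually serve requests with this handler, you still need a thin Express layer. The sketch below is one way to wire it up (the file name, port, and route are assumptions); note that every request spawns a fresh Python process, which keeps things simple but is noticeably slower than the long-running FastAPI service from Approach 1:

javascript
// server-child-process.js - minimal Express wrapper around PythonModelHandler (sketch)
const express = require('express');
const PythonModelHandler = require('./child-process-handler');

const app = express();
app.use(express.json());

// The handler delegates to python_predict.py, which loads model.pkl itself
const modelHandler = new PythonModelHandler('model.pkl');

app.post('/api/predict', async (req, res) => {
    const { features } = req.body;
    if (!features || !Array.isArray(features)) {
        return res.status(400).json({ error: 'Invalid input: features must be an array' });
    }
    try {
        const result = await modelHandler.predict(features);
        res.json(result);
    } catch (error) {
        res.status(500).json({ error: error.message });
    }
});

app.listen(3001, () => console.log('Child-process prediction API on port 3001'));
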
Approach 3: Message Queue (RabbitMQ/Redis)

For high-throughput, asynchronous processing:

javascript
// queue-producer.js
const amqp = require('amqplib');

class PredictionQueue {
    constructor() {
        this.connection = null;
        this.channel = null;
    }

    async connect() {
        this.connection = await amqp.connect(process.env.RABBITMQ_URL || 'amqp://localhost');
        this.channel = await this.connection.createChannel();
        await this.channel.assertQueue('predictions', { durable: true });
    }

    async submitPredictionRequest(features, correlationId) {
        const message = {
            features,
            correlationId,
            timestamp: new Date().toISOString()
        };

        this.channel.sendToQueue(
            'predictions',
            Buffer.from(JSON.stringify(message)),
            { persistent: true }
        );
    }
}

module.exports = PredictionQueue;
python
# queue_consumer.py
import pika
import json
import pickle
import numpy as np

class PredictionConsumer:
    def __init__(self, model_path):
        with open(model_path, 'rb') as f:
            self.model = pickle.load(f)
        self.connection = None
        self.channel = None
    
    def connect(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost')
        )
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='predictions', durable=True)
        self.channel.queue_declare(queue='results', durable=True)
    
    def process_prediction(self, ch, method, properties, body):
        try:
            message = json.loads(body)
            features = np.array(message['features']).reshape(1, -1)
            
            # Make prediction
            prediction = self.model.predict(features).tolist()
            confidence = self.model.predict_proba(features).max().item()
            
            result = {
                'correlationId': message['correlationId'],
                'prediction': prediction,
                'confidence': confidence,
                'status': 'completed'
            }
            
            # Send result back
            self.channel.basic_publish(
                exchange='',
                routing_key='results',
                body=json.dumps(result),
                properties=pika.BasicProperties(delivery_mode=2)
            )
            
            ch.basic_ack(delivery_tag=method.delivery_tag)
            
        except Exception as e:
            print(f"Error processing prediction: {e}")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    
    def start_consuming(self):
        self.channel.basic_consume(
            queue='predictions',
            on_message_callback=self.process_prediction
        )
        self.channel.start_consuming()

if __name__ == "__main__":
    consumer = PredictionConsumer('model.pkl')
    consumer.connect()
    consumer.start_consuming()

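The producer above only pushes work onto the queue; to close the loop, the Node.js side also has to consume the results queue and match each answer back to the request that produced it via correlationId. A minimal sketch (the pending-promises map is an assumed pattern, not part of the code above, and real code would add a timeout):

javascript
// queue-results-consumer.js - read predictions back from the results queue (sketch)
const amqp = require('amqplib');

class ResultsConsumer {
    constructor() {
        this.pending = new Map(); // correlationId -> resolve callback
    }

    // Returns a promise that resolves when a result with this correlationId arrives
    waitFor(correlationId) {
        return new Promise((resolve) => this.pending.set(correlationId, resolve));
    }

    async start() {
        const connection = await amqp.connect(process.env.RABBITMQ_URL || 'amqp://localhost');
        const channel = await connection.createChannel();
        await channel.assertQueue('results', { durable: true });

        channel.consume('results', (msg) => {
            const result = JSON.parse(msg.content.toString());
            const resolve = this.pending.get(result.correlationId);
            if (resolve) {
                resolve(result);
                this.pending.delete(result.correlationId);
            }
            channel.ack(msg);
        });
    }
}

module.exports = ResultsConsumer;
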
Production-Grade Setup

Docker Configuration

dockerfile
# Dockerfile for Node.js API
FROM node:18-alpine

WORKDIR /app

COPY package*.json ./
RUN npm ci --omit=dev

COPY . .

EXPOSE 3000

USER node

CMD ["node", "server.js"]
dockerfile
# Dockerfile for Python ML Service
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
yaml
# docker-compose.yml
version: '3.8'
services:
  node-api:
    build: ./node-api
    ports:
      - "3000:3000"
    environment:
      - ML_SERVICE_URL=http://python-ml-service:8000
      - NODE_ENV=production
    depends_on:
      - python-ml-service

  python-ml-service:
    build: ./python-ml-service
    ports:
      - "8000:8000"
    environment:
      - MODEL_PATH=/app/models/model.pkl

  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

  # Optional: for monitoring
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"

Monitoring and Logging

javascript
// monitoring.js
const client = require('prom-client');

// Collect default metrics
const collectDefaultMetrics = client.collectDefaultMetrics;
collectDefaultMetrics(); // note: the old `timeout` option was removed in recent prom-client versions

// Custom metrics
const predictionCounter = new client.Counter({
    name: 'model_predictions_total',
    help: 'Total number of predictions',
    labelNames: ['model', 'status']
});

const predictionDuration = new client.Histogram({
    name: 'prediction_duration_seconds',
    help: 'Duration of prediction requests',
    labelNames: ['model']
});

// Express middleware for monitoring
function monitoringMiddleware(req, res, next) {
    const start = Date.now();

    res.on('finish', () => {
        // Only record prediction metrics for the prediction endpoint
        if (req.path === '/api/predict') {
            const duration = (Date.now() - start) / 1000;
            predictionDuration.observe({ model: 'default' }, duration);
            predictionCounter.inc({
                model: 'default',
                status: res.statusCode === 200 ? 'success' : 'error'
            });
        }
    });

    next();
}

// Metrics endpoint handler (registered on the Express app in server.js)
async function metricsHandler(req, res) {
    res.set('Content-Type', client.register.contentType);
    res.end(await client.register.metrics());
}

module.exports = { monitoringMiddleware, metricsHandler };

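The module above only defines the middleware and the metrics handler; they still have to be registered on the Express app. A sketch of the wiring, meant to be dropped into server.js near the other middleware:

javascript
// In server.js - wiring up monitoring (sketch; place after `const app = express();`)
const { monitoringMiddleware, metricsHandler } = require('./monitoring');

app.use(monitoringMiddleware);       // records prediction counters and latency
app.get('/metrics', metricsHandler); // Prometheus scrapes this endpoint
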
Best Practices

  1. Error Handling: Always handle Python process failures gracefully

  2. Input Validation: Validate features before sending to Python

  3. Timeouts: Set appropriate timeouts for model predictions

  4. Caching: Cache frequent predictions

  5. Circuit Breaker: Implement the circuit breaker pattern for ML service calls (a minimal sketch follows this list)

  6. Versioning: Version your models and APIs

  7. Monitoring: Track prediction latency, success rates, and model performance

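As noted in item 5, a circuit breaker around calls to the Python service does not require a heavy dependency. The sketch below is a deliberately minimal, hand-rolled version (the class name, thresholds, and timeout are assumptions); for production, a library such as opossum provides the same pattern with more states and metrics.

javascript
// circuit-breaker.js - minimal circuit breaker sketch for ML service calls
class SimpleCircuitBreaker {
    constructor({ failureThreshold = 5, resetTimeoutMs = 30000 } = {}) {
        this.failureThreshold = failureThreshold; // consecutive failures before opening
        this.resetTimeoutMs = resetTimeoutMs;     // how long to fail fast before retrying
        this.failures = 0;
        this.openedAt = null;
    }

    async call(fn) {
        // While open and within the reset window, fail fast without calling the service
        if (this.openedAt && Date.now() - this.openedAt < this.resetTimeoutMs) {
            throw new Error('Circuit open: ML service calls temporarily blocked');
        }
        try {
            const result = await fn();
            this.failures = 0;     // any success closes the circuit
            this.openedAt = null;
            return result;
        } catch (error) {
            this.failures += 1;
            if (this.failures >= this.failureThreshold) {
                this.openedAt = Date.now(); // trip the breaker
            }
            throw error;
        }
    }
}

module.exports = SimpleCircuitBreaker;

In the gateway, wrap the call as breaker.call(() => axios.post(...)) and translate the "Circuit open" error into a 503 response.
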
Performance Considerations

  • Keep Python processes warm to avoid model reloading

  • Use connection pooling (HTTP keep-alive) for the HTTP approach - see the agent sketch below

  • Implement request batching for high-throughput scenarios

  • Consider model quantization for faster inference

  • Use GPU acceleration when available

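For the connection-pooling bullet above, the simplest win is an HTTP keep-alive agent so the gateway reuses TCP connections to the Python service instead of opening a new one per prediction. A sketch (the maxSockets value is an assumption to tune for your load):

javascript
// http-agent.js - shared axios client with connection reuse (sketch)
const http = require('http');
const axios = require('axios');

// keepAlive reuses sockets across requests; maxSockets caps concurrent connections
const keepAliveAgent = new http.Agent({ keepAlive: true, maxSockets: 50 });

const mlClient = axios.create({
    baseURL: process.env.ML_SERVICE_URL || 'http://localhost:8000',
    timeout: 10000,
    httpAgent: keepAliveAgent
});

// In server.js, replace the bare axios calls with: mlClient.post('/predict', { features, model_name })
module.exports = mlClient;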