feat(worker): implement AI vision services and complete image processing pipeline

- Add real OpenAI GPT-4 Vision integration with rate limiting
- Add real Google Cloud Vision API integration
- Create vision service orchestrator with fallback strategy
- Implement complete image processing pipeline with BullMQ
- Add batch processing with progress tracking
- Create virus scanning processor with ClamAV integration
- Add SEO filename generation with multiple strategies
- Include comprehensive error handling and retry logic
- Add production-ready configuration and validation

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
DustyWalker 2025-08-05 18:23:18 +02:00
parent d53cbb6757
commit 1329e874a4
17 changed files with 3352 additions and 0 deletions

View file

@ -0,0 +1,9 @@
{
"$schema": "https://json.schemastore.org/nest-cli",
"collection": "@nestjs/schematics",
"sourceRoot": "src",
"compilerOptions": {
"deleteOutDir": true,
"tsConfigPath": "tsconfig.json"
}
}

View file

@ -0,0 +1,101 @@
{
"name": "@seo-image-renamer/worker",
"version": "1.0.0",
"description": "Worker service for AI-powered image processing and SEO filename generation",
"main": "dist/main.js",
"scripts": {
"build": "nest build",
"format": "prettier --write \"src/**/*.ts\" \"test/**/*.ts\"",
"start": "nest start",
"start:dev": "nest start --watch",
"start:debug": "nest start --debug --watch",
"start:prod": "node dist/main",
"lint": "eslint \"{src,apps,libs,test}/**/*.ts\" --fix",
"test": "jest",
"test:watch": "jest --watch",
"test:cov": "jest --coverage",
"test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand",
"test:e2e": "jest --config ./test/jest-e2e.json"
},
"dependencies": {
"@nestjs/common": "^10.0.0",
"@nestjs/core": "^10.0.0",
"@nestjs/platform-express": "^10.0.0",
"@nestjs/config": "^3.1.1",
"@nestjs/bullmq": "^10.0.1",
"@nestjs/terminus": "^10.2.0",
"@nestjs/throttler": "^5.0.1",
"@prisma/client": "^5.6.0",
"bullmq": "^4.15.0",
"redis": "^4.6.10",
"ioredis": "^5.3.2",
"sharp": "^0.32.6",
"exifr": "^7.1.3",
"piexifjs": "^1.0.6",
"archiver": "^6.0.1",
"minio": "^7.1.3",
"aws-sdk": "^2.1489.0",
"openai": "^4.20.1",
"@google-cloud/vision": "^4.0.2",
"node-clamav": "^0.8.5",
"axios": "^1.6.0",
"class-validator": "^0.14.0",
"class-transformer": "^0.5.1",
"reflect-metadata": "^0.1.13",
"rxjs": "^7.8.1",
"uuid": "^9.0.1",
"lodash": "^4.17.21",
"mime-types": "^2.1.35",
"file-type": "^18.7.0",
"sanitize-filename": "^1.6.3",
"winston": "^3.11.0",
"winston-daily-rotate-file": "^4.7.1",
"@nestjs/websockets": "^10.2.7",
"@nestjs/platform-socket.io": "^10.2.7",
"socket.io": "^4.7.4",
"prom-client": "^15.0.0"
},
"devDependencies": {
"@nestjs/cli": "^10.0.0",
"@nestjs/schematics": "^10.0.0",
"@nestjs/testing": "^10.0.0",
"@types/express": "^4.17.17",
"@types/jest": "^29.5.2",
"@types/node": "^20.3.1",
"@types/uuid": "^9.0.7",
"@types/lodash": "^4.14.202",
"@types/mime-types": "^2.1.4",
"@types/archiver": "^6.0.2",
"@typescript-eslint/eslint-plugin": "^6.0.0",
"@typescript-eslint/parser": "^6.0.0",
"eslint": "^8.42.0",
"eslint-config-prettier": "^9.0.0",
"eslint-plugin-prettier": "^5.0.0",
"jest": "^29.5.0",
"prettier": "^3.0.0",
"source-map-support": "^0.5.21",
"supertest": "^6.3.3",
"ts-jest": "^29.1.0",
"ts-loader": "^9.4.3",
"ts-node": "^10.9.1",
"tsconfig-paths": "^4.2.1",
"typescript": "^5.1.3"
},
"jest": {
"moduleFileExtensions": [
"js",
"json",
"ts"
],
"rootDir": "src",
"testRegex": ".*\\.spec\\.ts$",
"transform": {
"^.+\\.(t|j)s$": "ts-jest"
},
"collectCoverageFrom": [
"**/*.(t|j)s"
],
"coverageDirectory": "../coverage",
"testEnvironment": "node"
}
}

View file

@ -0,0 +1,103 @@
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { BullModule } from '@nestjs/bullmq';
import { TerminusModule } from '@nestjs/terminus';
import { ThrottlerModule } from '@nestjs/throttler';
// Import custom modules
import { VisionModule } from './vision/vision.module';
import { ProcessorsModule } from './processors/processors.module';
import { StorageModule } from './storage/storage.module';
import { QueueModule } from './queue/queue.module';
import { MonitoringModule } from './monitoring/monitoring.module';
import { HealthModule } from './health/health.module';
// Import configuration
import { validationSchema } from './config/validation.schema';
import { workerConfig } from './config/worker.config';
@Module({
imports: [
// Configuration module with environment validation
ConfigModule.forRoot({
isGlobal: true,
load: [workerConfig],
validationSchema,
validationOptions: {
abortEarly: true,
},
}),
// Rate limiting
ThrottlerModule.forRoot([{
ttl: 60000, // 1 minute
limit: 100, // 100 requests per minute
}]),
// BullMQ Redis connection
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
connection: {
host: configService.get<string>('REDIS_HOST', 'localhost'),
port: configService.get<number>('REDIS_PORT', 6379),
password: configService.get<string>('REDIS_PASSWORD'),
db: configService.get<number>('REDIS_DB', 0),
retryDelayOnFailover: 100,
enableReadyCheck: false,
maxRetriesPerRequest: 3,
},
defaultJobOptions: {
removeOnComplete: 10,
removeOnFail: 5,
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000,
},
},
}),
inject: [ConfigService],
}),
// Register queues
BullModule.registerQueue(
{ name: 'image-processing' },
{ name: 'batch-processing' },
{ name: 'virus-scan' },
{ name: 'file-cleanup' },
),
// Health checks
TerminusModule,
// Core service modules
VisionModule,
ProcessorsModule,
StorageModule,
QueueModule,
MonitoringModule,
HealthModule,
],
controllers: [],
providers: [],
})
export class AppModule {
constructor(private configService: ConfigService) {
this.logConfiguration();
}
private logConfiguration() {
const logger = require('@nestjs/common').Logger;
const log = new logger('AppModule');
log.log('🔧 Worker Configuration:');
log.log(`• Environment: ${this.configService.get('NODE_ENV')}`);
log.log(`• Worker Port: ${this.configService.get('WORKER_PORT')}`);
log.log(`• Redis Host: ${this.configService.get('REDIS_HOST')}`);
log.log(`• Max Concurrent Jobs: ${this.configService.get('MAX_CONCURRENT_JOBS')}`);
log.log(`• OpenAI API Key: ${this.configService.get('OPENAI_API_KEY') ? '✓ Set' : '✗ Missing'}`);
log.log(`• Google Vision Key: ${this.configService.get('GOOGLE_CLOUD_VISION_KEY') ? '✓ Set' : '✗ Missing'}`);
log.log(`• MinIO Config: ${this.configService.get('MINIO_ENDPOINT') ? '✓ Set' : '✗ Missing'}`);
}
}

View file

@ -0,0 +1,102 @@
import * as Joi from 'joi';
export const validationSchema = Joi.object({
// Application settings
NODE_ENV: Joi.string().valid('development', 'production', 'test').default('development'),
WORKER_PORT: Joi.number().port().default(3002),
// Redis configuration
REDIS_HOST: Joi.string().default('localhost'),
REDIS_PORT: Joi.number().port().default(6379),
REDIS_PASSWORD: Joi.string().optional(),
REDIS_DB: Joi.number().integer().min(0).max(15).default(0),
REDIS_URL: Joi.string().uri().default('redis://localhost:6379'),
// Processing configuration
MAX_CONCURRENT_JOBS: Joi.number().integer().min(1).max(50).default(5),
JOB_TIMEOUT: Joi.number().integer().min(30000).max(3600000).default(300000),
RETRY_ATTEMPTS: Joi.number().integer().min(1).max(10).default(3),
RETRY_DELAY: Joi.number().integer().min(1000).max(60000).default(2000),
// AI Vision APIs (at least one is required)
OPENAI_API_KEY: Joi.string().when('GOOGLE_CLOUD_VISION_KEY', {
is: Joi.exist(),
then: Joi.optional(),
otherwise: Joi.required(),
}),
OPENAI_MODEL: Joi.string().default('gpt-4-vision-preview'),
OPENAI_MAX_TOKENS: Joi.number().integer().min(100).max(4000).default(500),
OPENAI_TEMPERATURE: Joi.number().min(0).max(2).default(0.1),
OPENAI_REQUESTS_PER_MINUTE: Joi.number().integer().min(1).max(1000).default(50),
OPENAI_TOKENS_PER_MINUTE: Joi.number().integer().min(1000).max(100000).default(10000),
GOOGLE_CLOUD_VISION_KEY: Joi.string().when('OPENAI_API_KEY', {
is: Joi.exist(),
then: Joi.optional(),
otherwise: Joi.required(),
}),
GOOGLE_CLOUD_PROJECT_ID: Joi.string().optional(),
GOOGLE_CLOUD_LOCATION: Joi.string().default('global'),
GOOGLE_REQUESTS_PER_MINUTE: Joi.number().integer().min(1).max(1000).default(100),
VISION_CONFIDENCE_THRESHOLD: Joi.number().min(0).max(1).default(0.40),
// Storage configuration (MinIO or AWS S3)
MINIO_ENDPOINT: Joi.string().when('AWS_BUCKET_NAME', {
is: Joi.exist(),
then: Joi.optional(),
otherwise: Joi.required(),
}),
MINIO_PORT: Joi.number().port().default(9000),
MINIO_USE_SSL: Joi.boolean().default(false),
MINIO_ACCESS_KEY: Joi.string().when('MINIO_ENDPOINT', {
is: Joi.exist(),
then: Joi.required(),
otherwise: Joi.optional(),
}),
MINIO_SECRET_KEY: Joi.string().when('MINIO_ENDPOINT', {
is: Joi.exist(),
then: Joi.required(),
otherwise: Joi.optional(),
}),
MINIO_BUCKET_NAME: Joi.string().default('seo-images'),
AWS_REGION: Joi.string().default('us-east-1'),
AWS_ACCESS_KEY_ID: Joi.string().when('AWS_BUCKET_NAME', {
is: Joi.exist(),
then: Joi.required(),
otherwise: Joi.optional(),
}),
AWS_SECRET_ACCESS_KEY: Joi.string().when('AWS_BUCKET_NAME', {
is: Joi.exist(),
then: Joi.required(),
otherwise: Joi.optional(),
}),
AWS_BUCKET_NAME: Joi.string().optional(),
// Database
DATABASE_URL: Joi.string().uri().required(),
DB_MAX_CONNECTIONS: Joi.number().integer().min(1).max(100).default(10),
// File processing
MAX_FILE_SIZE: Joi.number().integer().min(1024).max(100 * 1024 * 1024).default(50 * 1024 * 1024), // Max 100MB
ALLOWED_FILE_TYPES: Joi.string().default('jpg,jpeg,png,gif,webp'),
TEMP_DIR: Joi.string().default('/tmp/seo-worker'),
TEMP_FILE_CLEANUP_INTERVAL: Joi.number().integer().min(60000).max(86400000).default(3600000), // 1 minute to 24 hours
// Virus scanning (optional)
VIRUS_SCAN_ENABLED: Joi.boolean().default(false),
CLAMAV_HOST: Joi.string().default('localhost'),
CLAMAV_PORT: Joi.number().port().default(3310),
CLAMAV_TIMEOUT: Joi.number().integer().min(5000).max(120000).default(30000),
// Monitoring
METRICS_ENABLED: Joi.boolean().default(true),
METRICS_PORT: Joi.number().port().default(9090),
HEALTH_CHECK_PORT: Joi.number().port().default(8080),
// Logging
LOG_LEVEL: Joi.string().valid('error', 'warn', 'info', 'debug', 'verbose').default('info'),
FILE_LOGGING_ENABLED: Joi.boolean().default(false),
LOG_DIR: Joi.string().default('./logs'),
});

View file

@ -0,0 +1,105 @@
import { registerAs } from '@nestjs/config';
export const workerConfig = registerAs('worker', () => ({
// Application settings
port: parseInt(process.env.WORKER_PORT, 10) || 3002,
environment: process.env.NODE_ENV || 'development',
// Redis/Queue configuration
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT, 10) || 6379,
password: process.env.REDIS_PASSWORD,
db: parseInt(process.env.REDIS_DB, 10) || 0,
url: process.env.REDIS_URL || 'redis://localhost:6379',
},
// Processing limits
processing: {
maxConcurrentJobs: parseInt(process.env.MAX_CONCURRENT_JOBS, 10) || 5,
jobTimeout: parseInt(process.env.JOB_TIMEOUT, 10) || 300000, // 5 minutes
retryAttempts: parseInt(process.env.RETRY_ATTEMPTS, 10) || 3,
retryDelay: parseInt(process.env.RETRY_DELAY, 10) || 2000, // 2 seconds
},
// AI Vision APIs
ai: {
openai: {
apiKey: process.env.OPENAI_API_KEY,
model: process.env.OPENAI_MODEL || 'gpt-4-vision-preview',
maxTokens: parseInt(process.env.OPENAI_MAX_TOKENS, 10) || 500,
temperature: parseFloat(process.env.OPENAI_TEMPERATURE) || 0.1,
},
google: {
apiKey: process.env.GOOGLE_CLOUD_VISION_KEY,
projectId: process.env.GOOGLE_CLOUD_PROJECT_ID,
location: process.env.GOOGLE_CLOUD_LOCATION || 'global',
},
confidenceThreshold: parseFloat(process.env.VISION_CONFIDENCE_THRESHOLD) || 0.40,
},
// Storage configuration
storage: {
minio: {
endpoint: process.env.MINIO_ENDPOINT || 'localhost',
port: parseInt(process.env.MINIO_PORT, 10) || 9000,
useSSL: process.env.MINIO_USE_SSL === 'true',
accessKey: process.env.MINIO_ACCESS_KEY || 'minioadmin',
secretKey: process.env.MINIO_SECRET_KEY || 'minioadmin',
bucketName: process.env.MINIO_BUCKET_NAME || 'seo-images',
},
aws: {
region: process.env.AWS_REGION || 'us-east-1',
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
bucketName: process.env.AWS_BUCKET_NAME,
},
},
// Database (shared with API)
database: {
url: process.env.DATABASE_URL,
maxConnections: parseInt(process.env.DB_MAX_CONNECTIONS, 10) || 10,
},
// File processing
files: {
maxFileSize: parseInt(process.env.MAX_FILE_SIZE, 10) || 50 * 1024 * 1024, // 50MB
allowedTypes: (process.env.ALLOWED_FILE_TYPES || 'jpg,jpeg,png,gif,webp').split(','),
tempDir: process.env.TEMP_DIR || '/tmp/seo-worker',
cleanupInterval: parseInt(process.env.TEMP_FILE_CLEANUP_INTERVAL, 10) || 3600000, // 1 hour
},
// Virus scanning
virusScan: {
enabled: process.env.VIRUS_SCAN_ENABLED === 'true',
clamavHost: process.env.CLAMAV_HOST || 'localhost',
clamavPort: parseInt(process.env.CLAMAV_PORT, 10) || 3310,
timeout: parseInt(process.env.CLAMAV_TIMEOUT, 10) || 30000, // 30 seconds
},
// Monitoring
monitoring: {
metricsEnabled: process.env.METRICS_ENABLED !== 'false',
metricsPort: parseInt(process.env.METRICS_PORT, 10) || 9090,
healthCheckPort: parseInt(process.env.HEALTH_CHECK_PORT, 10) || 8080,
},
// Logging
logging: {
level: process.env.LOG_LEVEL || 'info',
fileLogging: process.env.FILE_LOGGING_ENABLED === 'true',
logDir: process.env.LOG_DIR || './logs',
},
// Rate limiting for AI APIs
rateLimiting: {
openai: {
requestsPerMinute: parseInt(process.env.OPENAI_REQUESTS_PER_MINUTE, 10) || 50,
tokensPerMinute: parseInt(process.env.OPENAI_TOKENS_PER_MINUTE, 10) || 10000,
},
google: {
requestsPerMinute: parseInt(process.env.GOOGLE_REQUESTS_PER_MINUTE, 10) || 100,
},
},
}));

View file

@ -0,0 +1,78 @@
import { NestFactory } from '@nestjs/core';
import { Logger, ValidationPipe } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { AppModule } from './app.module';
async function bootstrap() {
const logger = new Logger('WorkerMain');
try {
// Create NestJS application
const app = await NestFactory.create(AppModule, {
logger: ['error', 'warn', 'log', 'debug', 'verbose'],
});
// Get configuration service
const configService = app.get(ConfigService);
// Setup global validation pipe
app.useGlobalPipes(new ValidationPipe({
whitelist: true,
forbidNonWhitelisted: true,
transform: true,
disableErrorMessages: false,
}));
// Enable shutdown hooks for graceful shutdown
app.enableShutdownHooks();
// Get port from environment
const port = configService.get<number>('WORKER_PORT', 3002);
const redisUrl = configService.get<string>('REDIS_URL', 'redis://localhost:6379');
const environment = configService.get<string>('NODE_ENV', 'development');
logger.log(`Starting SEO Image Renamer Worker Service...`);
logger.log(`Environment: ${environment}`);
logger.log(`Port: ${port}`);
logger.log(`Redis URL: ${redisUrl}`);
// Start the application
await app.listen(port);
logger.log(`🚀 Worker service is running on port ${port}`);
logger.log(`🔄 Queue processors are active and ready`);
logger.log(`🤖 AI vision services initialized`);
logger.log(`📦 Storage services connected`);
} catch (error) {
logger.error('Failed to start worker service', error.stack);
process.exit(1);
}
}
// Handle uncaught exceptions
process.on('uncaughtException', (error) => {
const logger = new Logger('UncaughtException');
logger.error('Uncaught Exception:', error);
process.exit(1);
});
// Handle unhandled promise rejections
process.on('unhandledRejection', (reason, promise) => {
const logger = new Logger('UnhandledRejection');
logger.error('Unhandled Rejection at:', promise, 'reason:', reason);
process.exit(1);
});
// Graceful shutdown
process.on('SIGTERM', () => {
const logger = new Logger('SIGTERM');
logger.log('Received SIGTERM signal. Starting graceful shutdown...');
});
process.on('SIGINT', () => {
const logger = new Logger('SIGINT');
logger.log('Received SIGINT signal. Starting graceful shutdown...');
});
bootstrap();

View file

@ -0,0 +1,470 @@
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Job } from 'bullmq';
import { DatabaseService } from '../database/database.service';
import { ProgressTrackerService } from '../queue/progress-tracker.service';
import { ZipCreatorService } from '../storage/zip-creator.service';
import { StorageService } from '../storage/storage.service';
export interface BatchProcessingJobData {
batchId: string;
userId: string;
imageIds: string[];
keywords?: string[];
processingOptions?: {
createZip?: boolean;
zipName?: string;
notifyUser?: boolean;
};
}
export interface BatchProgress {
percentage: number;
completedImages: number;
totalImages: number;
failedImages: number;
status: string;
currentStep?: string;
estimatedTimeRemaining?: number;
}
@Processor('batch-processing')
export class BatchProcessor extends WorkerHost {
private readonly logger = new Logger(BatchProcessor.name);
constructor(
private configService: ConfigService,
private databaseService: DatabaseService,
private progressTracker: ProgressTrackerService,
private zipCreatorService: ZipCreatorService,
private storageService: StorageService,
) {
super();
}
async process(job: Job<BatchProcessingJobData>): Promise<any> {
const startTime = Date.now();
const { batchId, userId, imageIds, keywords, processingOptions } = job.data;
this.logger.log(`🚀 Starting batch processing: ${batchId} (${imageIds.length} images)`);
try {
// Step 1: Initialize batch processing (5%)
await this.updateBatchProgress(job, {
percentage: 5,
completedImages: 0,
totalImages: imageIds.length,
failedImages: 0,
status: 'initializing',
currentStep: 'Initializing batch processing',
});
// Update batch status in database
await this.databaseService.updateBatchStatus(batchId, 'processing', {
startedAt: new Date(),
totalImages: imageIds.length,
processingJobId: job.id,
});
// Step 2: Wait for all image processing jobs to complete (80%)
await this.updateBatchProgress(job, {
percentage: 10,
completedImages: 0,
totalImages: imageIds.length,
failedImages: 0,
status: 'processing-images',
currentStep: 'Processing individual images',
});
const completionResults = await this.waitForImageCompletion(job, batchId, imageIds);
const { completed, failed } = completionResults;
const successfulImageIds = completed.map(result => result.imageId);
const failedImageIds = failed.map(result => result.imageId);
this.logger.log(`Batch ${batchId}: ${completed.length} successful, ${failed.length} failed`);
// Step 3: Generate batch summary (85%)
await this.updateBatchProgress(job, {
percentage: 85,
completedImages: completed.length,
totalImages: imageIds.length,
failedImages: failed.length,
status: 'generating-summary',
currentStep: 'Generating batch summary',
});
const batchSummary = await this.generateBatchSummary(batchId, completed, failed, keywords);
// Step 4: Create ZIP file if requested (90%)
let zipDownloadUrl: string | null = null;
if (processingOptions?.createZip && successfulImageIds.length > 0) {
await this.updateBatchProgress(job, {
percentage: 90,
completedImages: completed.length,
totalImages: imageIds.length,
failedImages: failed.length,
status: 'creating-zip',
currentStep: 'Creating downloadable ZIP file',
});
zipDownloadUrl = await this.createBatchZip(
batchId,
successfulImageIds,
processingOptions.zipName || `batch-${batchId}-renamed`
);
}
// Step 5: Finalize batch (95%)
await this.updateBatchProgress(job, {
percentage: 95,
completedImages: completed.length,
totalImages: imageIds.length,
failedImages: failed.length,
status: 'finalizing',
currentStep: 'Finalizing batch processing',
});
// Update batch in database with final results
const finalStatus = failed.length === 0 ? 'completed' : 'completed_with_errors';
await this.databaseService.updateBatchStatus(batchId, finalStatus, {
completedAt: new Date(),
completedImages: completed.length,
failedImages: failed.length,
summary: batchSummary,
zipDownloadUrl,
processingTime: Date.now() - startTime,
});
// Step 6: Complete (100%)
await this.updateBatchProgress(job, {
percentage: 100,
completedImages: completed.length,
totalImages: imageIds.length,
failedImages: failed.length,
status: 'completed',
currentStep: 'Batch processing completed',
});
// Send notification if requested
if (processingOptions?.notifyUser) {
await this.sendBatchCompletionNotification(userId, batchId, batchSummary, zipDownloadUrl);
}
const totalProcessingTime = Date.now() - startTime;
this.logger.log(`✅ Batch processing completed: ${batchId} in ${totalProcessingTime}ms`);
return {
batchId,
success: true,
summary: batchSummary,
zipDownloadUrl,
processingTime: totalProcessingTime,
completedImages: completed.length,
failedImages: failed.length,
};
} catch (error) {
const processingTime = Date.now() - startTime;
this.logger.error(`❌ Batch processing failed: ${batchId} - ${error.message}`, error.stack);
// Update batch with error status
await this.databaseService.updateBatchStatus(batchId, 'failed', {
error: error.message,
failedAt: new Date(),
processingTime,
});
// Update progress - Failed
await this.updateBatchProgress(job, {
percentage: 0,
completedImages: 0,
totalImages: imageIds.length,
failedImages: imageIds.length,
status: 'failed',
currentStep: `Batch processing failed: ${error.message}`,
});
throw error;
}
}
/**
* Wait for all image processing jobs to complete
*/
private async waitForImageCompletion(
job: Job<BatchProcessingJobData>,
batchId: string,
imageIds: string[]
): Promise<{ completed: any[]; failed: any[] }> {
const completed: any[] = [];
const failed: any[] = [];
const pollingInterval = 2000; // 2 seconds
const maxWaitTime = 30 * 60 * 1000; // 30 minutes
const startTime = Date.now();
while (completed.length + failed.length < imageIds.length) {
// Check if we've exceeded max wait time
if (Date.now() - startTime > maxWaitTime) {
const remaining = imageIds.length - completed.length - failed.length;
this.logger.warn(`Batch ${batchId} timeout: ${remaining} images still processing`);
// Mark remaining images as failed due to timeout
for (let i = completed.length + failed.length; i < imageIds.length; i++) {
failed.push({
imageId: imageIds[i],
error: 'Processing timeout',
});
}
break;
}
// Get current status from database
const imageStatuses = await this.databaseService.getImageStatuses(imageIds);
// Count completed and failed images
const newCompleted = imageStatuses.filter(img =>
img.status === 'completed' && !completed.some(c => c.imageId === img.id)
);
const newFailed = imageStatuses.filter(img =>
img.status === 'failed' && !failed.some(f => f.imageId === img.id)
);
// Add new completions
completed.push(...newCompleted.map(img => ({
imageId: img.id,
proposedName: img.proposedName,
visionAnalysis: img.visionAnalysis,
})));
// Add new failures
failed.push(...newFailed.map(img => ({
imageId: img.id,
error: img.error || 'Unknown processing error',
})));
// Update progress
const progressPercentage = Math.min(
85, // Max 85% for image processing phase
10 + (completed.length + failed.length) / imageIds.length * 75
);
await this.updateBatchProgress(job, {
percentage: progressPercentage,
completedImages: completed.length,
totalImages: imageIds.length,
failedImages: failed.length,
status: 'processing-images',
currentStep: `Processing images: ${completed.length + failed.length}/${imageIds.length}`,
estimatedTimeRemaining: this.estimateRemainingTime(
startTime,
completed.length + failed.length,
imageIds.length
),
});
// Wait before next polling
if (completed.length + failed.length < imageIds.length) {
await this.sleep(pollingInterval);
}
}
return { completed, failed };
}
/**
* Generate comprehensive batch summary
*/
private async generateBatchSummary(
batchId: string,
completed: any[],
failed: any[],
keywords?: string[]
): Promise<any> {
const totalImages = completed.length + failed.length;
const successRate = (completed.length / totalImages) * 100;
// Analyze vision results
const visionStats = this.analyzeVisionResults(completed);
// Generate keyword analysis
const keywordAnalysis = this.analyzeKeywords(completed, keywords);
return {
batchId,
totalImages,
completedImages: completed.length,
failedImages: failed.length,
successRate: Math.round(successRate * 100) / 100,
visionStats,
keywordAnalysis,
completedAt: new Date(),
failureReasons: failed.map(f => f.error),
};
}
private analyzeVisionResults(completed: any[]): any {
if (completed.length === 0) return null;
const confidences = completed
.map(img => img.visionAnalysis?.confidence)
.filter(conf => conf !== undefined);
const avgConfidence = confidences.length > 0
? confidences.reduce((sum, conf) => sum + conf, 0) / confidences.length
: 0;
const providersUsed = completed
.flatMap(img => img.visionAnalysis?.providersUsed || [])
.reduce((acc, provider) => {
acc[provider] = (acc[provider] || 0) + 1;
return acc;
}, {} as Record<string, number>);
const commonObjects = this.findCommonElements(
completed.flatMap(img => img.visionAnalysis?.objects || [])
);
const commonColors = this.findCommonElements(
completed.flatMap(img => img.visionAnalysis?.colors || [])
);
return {
averageConfidence: Math.round(avgConfidence * 100) / 100,
providersUsed,
commonObjects: commonObjects.slice(0, 10),
commonColors: commonColors.slice(0, 5),
};
}
private analyzeKeywords(completed: any[], userKeywords?: string[]): any {
const generatedKeywords = completed.flatMap(img => img.visionAnalysis?.tags || []);
const keywordFrequency = this.findCommonElements(generatedKeywords);
return {
userKeywords: userKeywords || [],
generatedKeywords: keywordFrequency.slice(0, 20),
totalUniqueKeywords: new Set(generatedKeywords).size,
};
}
private findCommonElements(array: string[]): Array<{ element: string; count: number }> {
const frequency = array.reduce((acc, element) => {
acc[element] = (acc[element] || 0) + 1;
return acc;
}, {} as Record<string, number>);
return Object.entries(frequency)
.map(([element, count]) => ({ element, count }))
.sort((a, b) => b.count - a.count);
}
/**
* Create ZIP file with renamed images
*/
private async createBatchZip(
batchId: string,
imageIds: string[],
zipName: string
): Promise<string> {
try {
const zipPath = await this.zipCreatorService.createBatchZip(
batchId,
imageIds,
zipName
);
// Upload ZIP to storage and get download URL
const zipKey = `downloads/${batchId}/${zipName}.zip`;
await this.storageService.uploadFile(zipPath, zipKey);
const downloadUrl = await this.storageService.generateSignedUrl(zipKey, 24 * 60 * 60); // 24 hours
// Cleanup local ZIP file
await this.zipCreatorService.cleanupZipFile(zipPath);
return downloadUrl;
} catch (error) {
this.logger.error(`Failed to create ZIP for batch ${batchId}:`, error.message);
throw new Error(`ZIP creation failed: ${error.message}`);
}
}
/**
* Send batch completion notification
*/
private async sendBatchCompletionNotification(
userId: string,
batchId: string,
summary: any,
zipDownloadUrl?: string | null
): Promise<void> {
try {
// Broadcast via WebSocket
await this.progressTracker.broadcastBatchComplete(batchId, {
summary,
zipDownloadUrl,
completedAt: new Date(),
});
// TODO: Send email notification if configured
this.logger.log(`Batch completion notification sent for batch ${batchId}`);
} catch (error) {
this.logger.warn(`Failed to send notification for batch ${batchId}:`, error.message);
}
}
private estimateRemainingTime(
startTime: number,
completed: number,
total: number
): number | undefined {
if (completed === 0) return undefined;
const elapsed = Date.now() - startTime;
const avgTimePerImage = elapsed / completed;
const remaining = total - completed;
return Math.round(avgTimePerImage * remaining);
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
private async updateBatchProgress(job: Job, progress: BatchProgress): Promise<void> {
try {
await job.updateProgress(progress);
// Broadcast progress to WebSocket clients
await this.progressTracker.broadcastBatchProgress(job.data.batchId, progress);
} catch (error) {
this.logger.warn(`Failed to update batch progress for job ${job.id}:`, error.message);
}
}
@OnWorkerEvent('completed')
onCompleted(job: Job) {
this.logger.log(`✅ Batch processing job completed: ${job.id}`);
}
@OnWorkerEvent('failed')
onFailed(job: Job, err: Error) {
this.logger.error(`❌ Batch processing job failed: ${job.id}`, err.stack);
}
@OnWorkerEvent('progress')
onProgress(job: Job, progress: BatchProgress) {
this.logger.debug(`📊 Batch processing progress: ${job.id} - ${progress.percentage}% (${progress.currentStep})`);
}
@OnWorkerEvent('stalled')
onStalled(jobId: string) {
this.logger.warn(`⚠️ Batch processing job stalled: ${jobId}`);
}
}

View file

@ -0,0 +1,553 @@
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Job } from 'bullmq';
import { VisionService } from '../vision/vision.service';
import { DatabaseService } from '../database/database.service';
import sanitize from 'sanitize-filename';
import * as _ from 'lodash';
export interface FilenameGenerationJobData {
imageId: string;
batchId?: string;
userId: string;
visionAnalysis?: any;
userKeywords?: string[];
originalFilename: string;
options?: {
maxLength?: number;
includeColors?: boolean;
includeDimensions?: boolean;
customPattern?: string;
preserveExtension?: boolean;
};
}
export interface FilenameProgress {
percentage: number;
status: string;
currentStep?: string;
generatedNames?: string[];
selectedName?: string;
}
@Processor('filename-generation')
export class FilenameGeneratorProcessor extends WorkerHost {
private readonly logger = new Logger(FilenameGeneratorProcessor.name);
// Common words to filter out from filenames
private readonly STOP_WORDS = [
'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with',
'by', 'from', 'up', 'about', 'into', 'through', 'during', 'before',
'after', 'above', 'below', 'is', 'are', 'was', 'were', 'be', 'been',
'being', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would',
'could', 'should', 'may', 'might', 'must', 'can', 'image', 'photo',
'picture', 'file', 'jpeg', 'jpg', 'png', 'gif', 'webp'
];
constructor(
private configService: ConfigService,
private visionService: VisionService,
private databaseService: DatabaseService,
) {
super();
}
async process(job: Job<FilenameGenerationJobData>): Promise<any> {
const startTime = Date.now();
const {
imageId,
batchId,
userId,
visionAnalysis,
userKeywords,
originalFilename,
options
} = job.data;
this.logger.log(`📝 Starting filename generation: ${imageId}`);
try {
// Step 1: Initialize (10%)
await this.updateProgress(job, {
percentage: 10,
status: 'initializing',
currentStep: 'Preparing filename generation',
});
// Step 2: Extract and process keywords (30%)
await this.updateProgress(job, {
percentage: 30,
status: 'extracting-keywords',
currentStep: 'Extracting keywords from vision analysis',
});
const processedKeywords = await this.extractAndProcessKeywords(
visionAnalysis,
userKeywords,
options
);
// Step 3: Generate multiple filename variations (60%)
await this.updateProgress(job, {
percentage: 60,
status: 'generating-variations',
currentStep: 'Generating filename variations',
});
const filenameVariations = await this.generateFilenameVariations(
processedKeywords,
originalFilename,
visionAnalysis,
options
);
// Step 4: Select best filename (80%)
await this.updateProgress(job, {
percentage: 80,
status: 'selecting-best',
currentStep: 'Selecting optimal filename',
});
const selectedFilename = await this.selectBestFilename(
filenameVariations,
visionAnalysis,
options
);
// Step 5: Validate and finalize (95%)
await this.updateProgress(job, {
percentage: 95,
status: 'finalizing',
currentStep: 'Validating and finalizing filename',
});
const finalFilename = await this.validateAndSanitizeFilename(
selectedFilename,
originalFilename,
options
);
// Step 6: Update database (100%)
await this.updateProgress(job, {
percentage: 100,
status: 'completed',
currentStep: 'Saving generated filename',
selectedName: finalFilename,
});
// Save the generated filename to database
await this.databaseService.updateImageFilename(imageId, {
proposedName: finalFilename,
variations: filenameVariations,
keywords: processedKeywords,
generatedAt: new Date(),
generationStats: {
processingTime: Date.now() - startTime,
variationsGenerated: filenameVariations.length,
keywordsUsed: processedKeywords.length,
},
});
const totalProcessingTime = Date.now() - startTime;
this.logger.log(`✅ Filename generation completed: ${imageId} -> "${finalFilename}" in ${totalProcessingTime}ms`);
return {
imageId,
success: true,
finalFilename,
variations: filenameVariations,
keywords: processedKeywords,
processingTime: totalProcessingTime,
};
} catch (error) {
const processingTime = Date.now() - startTime;
this.logger.error(`❌ Filename generation failed: ${imageId} - ${error.message}`, error.stack);
// Update progress - Failed
await this.updateProgress(job, {
percentage: 0,
status: 'failed',
currentStep: `Generation failed: ${error.message}`,
});
// Fallback to sanitized original filename
const fallbackName = this.sanitizeFilename(originalFilename);
await this.databaseService.updateImageFilename(imageId, {
proposedName: fallbackName,
error: error.message,
fallback: true,
generatedAt: new Date(),
});
throw error;
}
}
/**
* Extract and process keywords from various sources
*/
private async extractAndProcessKeywords(
visionAnalysis: any,
userKeywords?: string[],
options?: any
): Promise<string[]> {
const keywords: string[] = [];
// 1. Add user keywords with highest priority
if (userKeywords && userKeywords.length > 0) {
keywords.push(...userKeywords.slice(0, 5)); // Limit to 5 user keywords
}
// 2. Add vision analysis objects
if (visionAnalysis?.objects) {
keywords.push(...visionAnalysis.objects.slice(0, 6));
}
// 3. Add high-confidence vision tags
if (visionAnalysis?.tags) {
keywords.push(...visionAnalysis.tags.slice(0, 4));
}
// 4. Add colors if enabled
if (options?.includeColors && visionAnalysis?.colors) {
keywords.push(...visionAnalysis.colors.slice(0, 2));
}
// 5. Extract keywords from scene description
if (visionAnalysis?.scene) {
const sceneKeywords = this.extractKeywordsFromText(visionAnalysis.scene);
keywords.push(...sceneKeywords.slice(0, 3));
}
// Process and clean keywords
return this.processKeywords(keywords);
}
/**
* Process and clean keywords
*/
private processKeywords(keywords: string[]): string[] {
return keywords
.map(keyword => keyword.toLowerCase().trim())
.filter(keyword => keyword.length > 2) // Remove very short words
.filter(keyword => !this.STOP_WORDS.includes(keyword)) // Remove stop words
.filter(keyword => /^[a-z0-9\s-]+$/i.test(keyword)) // Only alphanumeric and basic chars
.map(keyword => keyword.replace(/\s+/g, '-')) // Replace spaces with hyphens
.filter((keyword, index, arr) => arr.indexOf(keyword) === index) // Remove duplicates
.slice(0, 10); // Limit total keywords
}
/**
* Extract keywords from text description
*/
private extractKeywordsFromText(text: string): string[] {
return text
.toLowerCase()
.split(/[^a-z0-9]+/)
.filter(word => word.length > 3)
.filter(word => !this.STOP_WORDS.includes(word))
.slice(0, 5);
}
/**
* Generate multiple filename variations
*/
private async generateFilenameVariations(
keywords: string[],
originalFilename: string,
visionAnalysis: any,
options?: any
): Promise<string[]> {
const variations: string[] = [];
const extension = this.getFileExtension(originalFilename);
if (keywords.length === 0) {
return [this.sanitizeFilename(originalFilename)];
}
// Strategy 1: Main objects + descriptive words
if (keywords.length >= 3) {
const mainKeywords = keywords.slice(0, 4);
variations.push(this.buildFilename(mainKeywords, extension, options));
}
// Strategy 2: Scene-based naming
if (visionAnalysis?.scene && keywords.length >= 2) {
const sceneKeywords = [
...this.extractKeywordsFromText(visionAnalysis.scene).slice(0, 2),
...keywords.slice(0, 3)
];
variations.push(this.buildFilename(sceneKeywords, extension, options));
}
// Strategy 3: Object + color combination
if (options?.includeColors && visionAnalysis?.colors?.length > 0) {
const colorKeywords = [
...keywords.slice(0, 3),
...visionAnalysis.colors.slice(0, 1)
];
variations.push(this.buildFilename(colorKeywords, extension, options));
}
// Strategy 4: Descriptive approach
if (visionAnalysis?.description) {
const descriptiveKeywords = [
...this.extractKeywordsFromText(visionAnalysis.description).slice(0, 2),
...keywords.slice(0, 3)
];
variations.push(this.buildFilename(descriptiveKeywords, extension, options));
}
// Strategy 5: Short and concise
const shortKeywords = keywords.slice(0, 3);
variations.push(this.buildFilename(shortKeywords, extension, options));
// Strategy 6: Long descriptive (if many keywords available)
if (keywords.length >= 5) {
const longKeywords = keywords.slice(0, 6);
variations.push(this.buildFilename(longKeywords, extension, options));
}
// Strategy 7: Custom pattern if provided
if (options?.customPattern) {
const customFilename = this.applyCustomPattern(
options.customPattern,
keywords,
visionAnalysis,
extension
);
if (customFilename) {
variations.push(customFilename);
}
}
// Remove duplicates and empty strings
return [...new Set(variations)].filter(name => name && name.length > 0);
}
/**
* Build filename from keywords
*/
private buildFilename(
keywords: string[],
extension: string,
options?: any
): string {
if (keywords.length === 0) return '';
let filename = keywords
.filter(keyword => keyword && keyword.length > 0)
.join('-')
.toLowerCase()
.replace(/[^a-z0-9-]/g, '') // Remove special characters
.replace(/-+/g, '-') // Replace multiple hyphens with single
.replace(/^-|-$/g, ''); // Remove leading/trailing hyphens
// Apply length limit
const maxLength = options?.maxLength || 60;
if (filename.length > maxLength) {
filename = filename.substring(0, maxLength).replace(/-[^-]*$/, ''); // Cut at word boundary
}
return filename ? `${filename}.${extension}` : '';
}
/**
* Apply custom filename pattern
*/
private applyCustomPattern(
pattern: string,
keywords: string[],
visionAnalysis: any,
extension: string
): string {
try {
let filename = pattern;
// Replace placeholders
filename = filename.replace(/{keywords}/g, keywords.slice(0, 5).join('-'));
filename = filename.replace(/{objects}/g, (visionAnalysis?.objects || []).slice(0, 3).join('-'));
filename = filename.replace(/{colors}/g, (visionAnalysis?.colors || []).slice(0, 2).join('-'));
filename = filename.replace(/{scene}/g, this.extractKeywordsFromText(visionAnalysis?.scene || '').slice(0, 2).join('-'));
filename = filename.replace(/{timestamp}/g, new Date().toISOString().slice(0, 10));
// Clean and sanitize
filename = filename
.toLowerCase()
.replace(/[^a-z0-9-]/g, '')
.replace(/-+/g, '-')
.replace(/^-|-$/g, '');
return filename ? `${filename}.${extension}` : '';
} catch (error) {
this.logger.warn(`Failed to apply custom pattern: ${error.message}`);
return '';
}
}
/**
* Select the best filename from variations
*/
private async selectBestFilename(
variations: string[],
visionAnalysis: any,
options?: any
): Promise<string> {
if (variations.length === 0) {
throw new Error('No filename variations generated');
}
if (variations.length === 1) {
return variations[0];
}
// Score each variation based on different criteria
const scoredVariations = variations.map(filename => ({
filename,
score: this.scoreFilename(filename, visionAnalysis, options),
}));
// Sort by score (highest first)
scoredVariations.sort((a, b) => b.score - a.score);
this.logger.debug(`Filename scoring results:`, scoredVariations);
return scoredVariations[0].filename;
}
/**
* Score filename based on SEO and usability criteria
*/
private scoreFilename(filename: string, visionAnalysis: any, options?: any): number {
let score = 0;
const nameWithoutExtension = filename.replace(/\.[^.]+$/, '');
const keywords = nameWithoutExtension.split('-');
// Length scoring (optimal 30-50 characters)
const nameLength = nameWithoutExtension.length;
if (nameLength >= 20 && nameLength <= 50) {
score += 20;
} else if (nameLength >= 15 && nameLength <= 60) {
score += 10;
} else if (nameLength < 15) {
score += 5;
}
// Keyword count scoring (optimal 3-5 keywords)
const keywordCount = keywords.length;
if (keywordCount >= 3 && keywordCount <= 5) {
score += 15;
} else if (keywordCount >= 2 && keywordCount <= 6) {
score += 10;
}
// Keyword quality scoring
if (visionAnalysis?.confidence) {
score += Math.round(visionAnalysis.confidence * 10);
}
// Readability scoring (avoid too many hyphens in a row)
if (!/--/.test(nameWithoutExtension)) {
score += 10;
}
// Avoid starting or ending with numbers
if (!/^[0-9]/.test(nameWithoutExtension) && !/[0-9]$/.test(nameWithoutExtension)) {
score += 5;
}
// Bonus for including high-confidence objects
if (visionAnalysis?.objects) {
const objectsIncluded = visionAnalysis.objects.filter((obj: string) =>
nameWithoutExtension.includes(obj.toLowerCase())
).length;
score += objectsIncluded * 3;
}
return score;
}
/**
* Validate and sanitize final filename
*/
private async validateAndSanitizeFilename(
filename: string,
originalFilename: string,
options?: any
): Promise<string> {
if (!filename || filename.trim().length === 0) {
return this.sanitizeFilename(originalFilename);
}
// Sanitize using sanitize-filename library
let sanitized = sanitize(filename, { replacement: '-' });
// Additional cleanup
sanitized = sanitized
.toLowerCase()
.replace(/[^a-z0-9.-]/g, '-')
.replace(/-+/g, '-')
.replace(/^-|-$/g, '');
// Ensure it has an extension
if (!sanitized.includes('.')) {
const extension = this.getFileExtension(originalFilename);
sanitized = `${sanitized}.${extension}`;
}
// Ensure minimum length
const nameWithoutExtension = sanitized.replace(/\.[^.]+$/, '');
if (nameWithoutExtension.length < 3) {
const fallback = this.sanitizeFilename(originalFilename);
this.logger.warn(`Generated filename too short: "${sanitized}", using fallback: "${fallback}"`);
return fallback;
}
return sanitized;
}
private sanitizeFilename(filename: string): string {
return sanitize(filename, { replacement: '-' })
.toLowerCase()
.replace(/[^a-z0-9.-]/g, '-')
.replace(/-+/g, '-')
.replace(/^-|-$/g, '');
}
private getFileExtension(filename: string): string {
const parts = filename.split('.');
return parts.length > 1 ? parts.pop()!.toLowerCase() : 'jpg';
}
private async updateProgress(job: Job, progress: FilenameProgress): Promise<void> {
try {
await job.updateProgress(progress);
} catch (error) {
this.logger.warn(`Failed to update filename generation progress for job ${job.id}:`, error.message);
}
}
@OnWorkerEvent('completed')
onCompleted(job: Job) {
const result = job.returnvalue;
this.logger.log(`✅ Filename generation completed: ${job.id} -> "${result?.finalFilename}"`);
}
@OnWorkerEvent('failed')
onFailed(job: Job, err: Error) {
this.logger.error(`❌ Filename generation job failed: ${job.id}`, err.stack);
}
@OnWorkerEvent('progress')
onProgress(job: Job, progress: FilenameProgress) {
this.logger.debug(`📝 Filename generation progress: ${job.id} - ${progress.percentage}% (${progress.currentStep})`);
}
@OnWorkerEvent('stalled')
onStalled(jobId: string) {
this.logger.warn(`⚠️ Filename generation job stalled: ${jobId}`);
}
}

View file

@ -0,0 +1,348 @@
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Logger, Inject } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Job } from 'bullmq';
import { VisionService } from '../vision/vision.service';
import { StorageService } from '../storage/storage.service';
import { VirusScanService } from '../security/virus-scan.service';
import { FileProcessorService } from '../storage/file-processor.service';
import { DatabaseService } from '../database/database.service';
import { ProgressTrackerService } from '../queue/progress-tracker.service';
export interface ImageProcessingJobData {
imageId: string;
batchId: string;
s3Key: string;
originalName: string;
userId: string;
keywords?: string[];
processingOptions?: {
skipVirusScan?: boolean;
preferredVisionProvider?: string;
maxRetries?: number;
};
}
export interface JobProgress {
percentage: number;
currentImage?: string;
processedCount: number;
totalCount: number;
status: string;
currentStep?: string;
error?: string;
}
@Processor('image-processing')
export class ImageProcessor extends WorkerHost {
private readonly logger = new Logger(ImageProcessor.name);
constructor(
private configService: ConfigService,
private visionService: VisionService,
private storageService: StorageService,
private virusScanService: VirusScanService,
private fileProcessorService: FileProcessorService,
private databaseService: DatabaseService,
private progressTracker: ProgressTrackerService,
) {
super();
}
async process(job: Job<ImageProcessingJobData>): Promise<any> {
const startTime = Date.now();
const { imageId, batchId, s3Key, originalName, userId, keywords, processingOptions } = job.data;
this.logger.log(`🚀 Starting image processing: ${imageId} (${originalName})`);
let tempFilePath: string | null = null;
let processedFilePath: string | null = null;
try {
// Step 1: Initialize progress tracking (5%)
await this.updateProgress(job, {
percentage: 5,
currentImage: originalName,
processedCount: 0,
totalCount: 1,
status: 'initializing',
currentStep: 'Setting up processing pipeline',
});
// Update database with processing status
await this.databaseService.updateImageStatus(imageId, 'processing', {
startedAt: new Date(),
processingJobId: job.id,
});
// Step 2: Download image from storage (15%)
await this.updateProgress(job, {
percentage: 15,
currentImage: originalName,
processedCount: 0,
totalCount: 1,
status: 'downloading',
currentStep: 'Downloading image from cloud storage',
});
tempFilePath = await this.storageService.downloadToTemp(s3Key);
this.logger.debug(`Image downloaded to temp: ${tempFilePath}`);
// Step 3: Validate file and extract metadata (25%)
await this.updateProgress(job, {
percentage: 25,
currentImage: originalName,
processedCount: 0,
totalCount: 1,
status: 'validating',
currentStep: 'Validating file and extracting metadata',
});
const metadata = await this.fileProcessorService.extractMetadata(tempFilePath);
this.logger.debug(`Extracted metadata:`, metadata);
// Step 4: Virus scan (35% - optional)
if (!processingOptions?.skipVirusScan && this.virusScanService.isEnabled()) {
await this.updateProgress(job, {
percentage: 35,
currentImage: originalName,
processedCount: 0,
totalCount: 1,
status: 'scanning',
currentStep: 'Performing virus scan',
});
const scanResult = await this.virusScanService.scanFile(tempFilePath);
if (!scanResult.clean) {
throw new Error(`Virus detected: ${scanResult.threat || 'Unknown threat'}`);
}
this.logger.debug('Virus scan passed');
}
// Step 5: Process and optimize image (45%)
await this.updateProgress(job, {
percentage: 45,
currentImage: originalName,
processedCount: 0,
totalCount: 1,
status: 'processing',
currentStep: 'Optimizing image quality and format',
});
processedFilePath = await this.fileProcessorService.optimizeImage(tempFilePath, {
quality: 85,
maxWidth: 2048,
maxHeight: 2048,
preserveExif: true,
});
// Step 6: Upload to storage for AI analysis (55%)
await this.updateProgress(job, {
percentage: 55,
currentImage: originalName,
processedCount: 0,
totalCount: 1,
status: 'uploading',
currentStep: 'Preparing image for AI analysis',
});
const analysisUrl = await this.storageService.getPublicUrl(s3Key);
// Step 7: AI Vision analysis (75%)
await this.updateProgress(job, {
percentage: 75,
currentImage: originalName,
processedCount: 0,
totalCount: 1,
status: 'analyzing',
currentStep: 'Performing AI vision analysis',
});
const visionResult = await this.visionService.analyzeImage(
analysisUrl,
keywords,
undefined,
processingOptions?.preferredVisionProvider
);
if (!visionResult.success) {
throw new Error(`Vision analysis failed: ${visionResult.error}`);
}
this.logger.debug(`Vision analysis completed with confidence: ${visionResult.finalConfidence}`);
// Step 8: Generate SEO filename (85%)
await this.updateProgress(job, {
percentage: 85,
currentImage: originalName,
processedCount: 0,
totalCount: 1,
status: 'generating-filename',
currentStep: 'Generating SEO-optimized filename',
});
const proposedName = await this.visionService.generateSeoFilename(
visionResult,
originalName,
80
);
// Step 9: Update database with results (95%)
await this.updateProgress(job, {
percentage: 95,
currentImage: originalName,
processedCount: 0,
totalCount: 1,
status: 'updating-database',
currentStep: 'Saving analysis results',
});
const processingResult = {
visionAnalysis: {
objects: visionResult.finalObjects,
colors: visionResult.finalColors,
scene: visionResult.finalScene,
description: visionResult.finalDescription,
tags: visionResult.finalTags,
confidence: visionResult.finalConfidence,
providersUsed: visionResult.providersUsed,
processingTime: visionResult.totalProcessingTime,
},
proposedName,
metadata: {
...metadata,
fileSize: metadata.size,
dimensions: `${metadata.width}x${metadata.height}`,
format: metadata.format,
},
processingStats: {
totalTime: Date.now() - startTime,
completedAt: new Date(),
},
};
await this.databaseService.updateImageProcessingResult(imageId, {
status: 'completed',
proposedName,
visionAnalysis: processingResult.visionAnalysis,
metadata: processingResult.metadata,
processingStats: processingResult.processingStats,
});
// Step 10: Finalize (100%)
await this.updateProgress(job, {
percentage: 100,
currentImage: originalName,
processedCount: 1,
totalCount: 1,
status: 'completed',
currentStep: 'Processing completed successfully',
});
// Notify batch processor if this was the last image
await this.progressTracker.notifyImageCompleted(batchId, imageId);
const totalProcessingTime = Date.now() - startTime;
this.logger.log(`✅ Image processing completed: ${imageId} in ${totalProcessingTime}ms`);
return {
imageId,
success: true,
proposedName,
visionAnalysis: processingResult.visionAnalysis,
metadata: processingResult.metadata,
processingTime: totalProcessingTime,
};
} catch (error) {
const processingTime = Date.now() - startTime;
this.logger.error(`❌ Image processing failed: ${imageId} - ${error.message}`, error.stack);
// Update progress - Failed
await this.updateProgress(job, {
percentage: 0,
currentImage: originalName,
processedCount: 0,
totalCount: 1,
status: 'failed',
error: error.message,
});
// Update database with error
await this.databaseService.updateImageStatus(imageId, 'failed', {
error: error.message,
failedAt: new Date(),
processingTime,
});
// Notify batch processor of failure
await this.progressTracker.notifyImageFailed(batchId, imageId, error.message);
throw error;
} finally {
// Cleanup temporary files
if (tempFilePath) {
await this.fileProcessorService.cleanupTempFile(tempFilePath);
}
if (processedFilePath && processedFilePath !== tempFilePath) {
await this.fileProcessorService.cleanupTempFile(processedFilePath);
}
}
}
@OnWorkerEvent('completed')
onCompleted(job: Job) {
this.logger.log(`✅ Image processing job completed: ${job.id}`);
}
@OnWorkerEvent('failed')
onFailed(job: Job, err: Error) {
this.logger.error(`❌ Image processing job failed: ${job.id}`, err.stack);
}
@OnWorkerEvent('progress')
onProgress(job: Job, progress: JobProgress) {
this.logger.debug(`📊 Image processing progress: ${job.id} - ${progress.percentage}% (${progress.currentStep})`);
}
@OnWorkerEvent('stalled')
onStalled(jobId: string) {
this.logger.warn(`⚠️ Image processing job stalled: ${jobId}`);
}
/**
* Update job progress and broadcast via WebSocket
*/
private async updateProgress(job: Job, progress: JobProgress): Promise<void> {
try {
await job.updateProgress(progress);
// Broadcast progress to WebSocket clients
await this.progressTracker.broadcastProgress(job.data.batchId, {
jobId: job.id as string,
imageId: job.data.imageId,
progress,
});
} catch (error) {
this.logger.warn(`Failed to update progress for job ${job.id}:`, error.message);
}
}
/**
* Validate processing options
*/
private validateProcessingOptions(options?: ImageProcessingJobData['processingOptions']): void {
if (!options) return;
if (options.maxRetries && (options.maxRetries < 0 || options.maxRetries > 10)) {
throw new Error('maxRetries must be between 0 and 10');
}
if (options.preferredVisionProvider &&
!['openai', 'google'].includes(options.preferredVisionProvider)) {
throw new Error('preferredVisionProvider must be either "openai" or "google"');
}
}
}

View file

@ -0,0 +1,46 @@
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { ConfigModule } from '@nestjs/config';
// Import processors
import { ImageProcessor } from './image.processor';
import { BatchProcessor } from './batch.processor';
import { VirusScanProcessor } from './virus-scan.processor';
import { FilenameGeneratorProcessor } from './filename-generator.processor';
// Import required services
import { VisionModule } from '../vision/vision.module';
import { StorageModule } from '../storage/storage.module';
import { QueueModule } from '../queue/queue.module';
import { SecurityModule } from '../security/security.module';
import { DatabaseModule } from '../database/database.module';
@Module({
imports: [
ConfigModule,
BullModule.registerQueue(
{ name: 'image-processing' },
{ name: 'batch-processing' },
{ name: 'virus-scan' },
{ name: 'filename-generation' },
),
VisionModule,
StorageModule,
QueueModule,
SecurityModule,
DatabaseModule,
],
providers: [
ImageProcessor,
BatchProcessor,
VirusScanProcessor,
FilenameGeneratorProcessor,
],
exports: [
ImageProcessor,
BatchProcessor,
VirusScanProcessor,
FilenameGeneratorProcessor,
],
})
export class ProcessorsModule {}

View file

@ -0,0 +1,360 @@
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Job } from 'bullmq';
import { VirusScanService } from '../security/virus-scan.service';
import { StorageService } from '../storage/storage.service';
import { DatabaseService } from '../database/database.service';
export interface VirusScanJobData {
fileId: string;
filePath: string;
s3Key?: string;
userId: string;
scanType: 'upload' | 'periodic' | 'suspicious';
metadata?: {
originalName: string;
fileSize: number;
mimeType: string;
};
}
export interface ScanProgress {
percentage: number;
status: string;
currentStep?: string;
scanResult?: {
clean: boolean;
threat?: string;
scanTime: number;
};
}
@Processor('virus-scan')
export class VirusScanProcessor extends WorkerHost {
private readonly logger = new Logger(VirusScanProcessor.name);
constructor(
private configService: ConfigService,
private virusScanService: VirusScanService,
private storageService: StorageService,
private databaseService: DatabaseService,
) {
super();
}
async process(job: Job<VirusScanJobData>): Promise<any> {
const startTime = Date.now();
const { fileId, filePath, s3Key, userId, scanType, metadata } = job.data;
this.logger.log(`🔍 Starting virus scan: ${fileId} (${scanType})`);
let tempFilePath: string | null = null;
try {
// Step 1: Initialize scan (10%)
await this.updateScanProgress(job, {
percentage: 10,
status: 'initializing',
currentStep: 'Preparing file for virus scan',
});
// Update database with scan status
await this.databaseService.updateFileScanStatus(fileId, 'scanning', {
startedAt: new Date(),
scanType,
scanJobId: job.id,
});
// Step 2: Download file if needed (20%)
let scanFilePath = filePath;
if (s3Key && !filePath) {
await this.updateScanProgress(job, {
percentage: 20,
status: 'downloading',
currentStep: 'Downloading file from storage',
});
tempFilePath = await this.storageService.downloadToTemp(s3Key);
scanFilePath = tempFilePath;
}
// Step 3: Validate file exists and is readable (30%)
await this.updateScanProgress(job, {
percentage: 30,
status: 'validating',
currentStep: 'Validating file accessibility',
});
const fileExists = await this.virusScanService.validateFile(scanFilePath);
if (!fileExists) {
throw new Error(`File not accessible: ${scanFilePath}`);
}
// Step 4: Perform virus scan (80%)
await this.updateScanProgress(job, {
percentage: 40,
status: 'scanning',
currentStep: 'Performing virus scan with ClamAV',
});
const scanResult = await this.virusScanService.scanFile(scanFilePath);
this.logger.log(`Scan result for ${fileId}: ${scanResult.clean ? 'Clean' : `Threat: ${scanResult.threat}`}`);
// Step 5: Process scan results (90%)
await this.updateScanProgress(job, {
percentage: 90,
status: 'processing-results',
currentStep: 'Processing scan results',
scanResult,
});
// Handle scan results
if (!scanResult.clean) {
await this.handleThreatDetected(fileId, s3Key, scanResult, userId, metadata);
} else {
await this.handleCleanFile(fileId, scanResult);
}
// Step 6: Complete (100%)
await this.updateScanProgress(job, {
percentage: 100,
status: scanResult.clean ? 'clean' : 'threat-detected',
currentStep: 'Virus scan completed',
scanResult,
});
const totalScanTime = Date.now() - startTime;
this.logger.log(`✅ Virus scan completed: ${fileId} in ${totalScanTime}ms - ${scanResult.clean ? 'Clean' : 'Threat detected'}`);
return {
fileId,
success: true,
scanResult: {
...scanResult,
scanTime: totalScanTime,
scanType,
},
};
} catch (error) {
const scanTime = Date.now() - startTime;
this.logger.error(`❌ Virus scan failed: ${fileId} - ${error.message}`, error.stack);
// Update database with error
await this.databaseService.updateFileScanStatus(fileId, 'failed', {
error: error.message,
failedAt: new Date(),
scanTime,
});
// Update progress - Failed
await this.updateScanProgress(job, {
percentage: 0,
status: 'failed',
currentStep: `Scan failed: ${error.message}`,
});
throw error;
} finally {
// Cleanup temporary file
if (tempFilePath) {
try {
await this.storageService.deleteTempFile(tempFilePath);
} catch (cleanupError) {
this.logger.warn(`Failed to cleanup temp file ${tempFilePath}:`, cleanupError.message);
}
}
}
}
/**
* Handle threat detected scenario
*/
private async handleThreatDetected(
fileId: string,
s3Key: string | undefined,
scanResult: any,
userId: string,
metadata?: any
): Promise<void> {
this.logger.warn(`🚨 THREAT DETECTED in file ${fileId}: ${scanResult.threat}`);
try {
// 1. Update database with threat information
await this.databaseService.updateFileScanStatus(fileId, 'threat-detected', {
threat: scanResult.threat,
threatDetails: scanResult.details,
detectedAt: new Date(),
quarantined: true,
});
// 2. Quarantine file if in storage
if (s3Key) {
await this.quarantineFile(s3Key, fileId, scanResult.threat);
}
// 3. Log security incident
await this.logSecurityIncident({
fileId,
userId,
threat: scanResult.threat,
s3Key,
metadata,
timestamp: new Date(),
});
// 4. Notify security team if configured
await this.notifySecurityTeam({
fileId,
userId,
threat: scanResult.threat,
metadata,
});
// 5. Block user if multiple threats detected
await this.checkUserThreatHistory(userId);
} catch (error) {
this.logger.error(`Failed to handle threat for file ${fileId}:`, error.message);
throw error;
}
}
/**
* Handle clean file scenario
*/
private async handleCleanFile(fileId: string, scanResult: any): Promise<void> {
// Update database with clean status
await this.databaseService.updateFileScanStatus(fileId, 'clean', {
scannedAt: new Date(),
scanEngine: scanResult.engine || 'ClamAV',
scanVersion: scanResult.version,
});
this.logger.debug(`✅ File ${fileId} is clean`);
}
/**
* Quarantine infected file
*/
private async quarantineFile(s3Key: string, fileId: string, threat: string): Promise<void> {
try {
const quarantineKey = `quarantine/${fileId}_${Date.now()}`;
// Move file to quarantine bucket/folder
await this.storageService.moveFile(s3Key, quarantineKey);
this.logger.warn(`🔒 File quarantined: ${s3Key} -> ${quarantineKey} (Threat: ${threat})`);
} catch (error) {
this.logger.error(`Failed to quarantine file ${s3Key}:`, error.message);
// If quarantine fails, delete the file as a safety measure
try {
await this.storageService.deleteFile(s3Key);
this.logger.warn(`🗑️ Infected file deleted as quarantine failed: ${s3Key}`);
} catch (deleteError) {
this.logger.error(`CRITICAL: Failed to delete infected file ${s3Key}:`, deleteError.message);
}
}
}
/**
* Log security incident
*/
private async logSecurityIncident(incident: any): Promise<void> {
try {
await this.databaseService.createSecurityIncident({
type: 'virus-detected',
severity: 'high',
details: incident,
status: 'active',
createdAt: new Date(),
});
this.logger.warn(`🚨 Security incident logged: ${incident.fileId}`);
} catch (error) {
this.logger.error(`Failed to log security incident:`, error.message);
}
}
/**
* Notify security team
*/
private async notifySecurityTeam(threat: any): Promise<void> {
try {
// TODO: Implement actual notification system (email, Slack, etc.)
this.logger.warn(`🚨 SECURITY ALERT: Virus detected in file ${threat.fileId} - ${threat.threat}`);
// For now, just log the alert. In production, this would:
// - Send email to security team
// - Post to Slack security channel
// - Create ticket in security system
// - Trigger incident response workflow
} catch (error) {
this.logger.error(`Failed to notify security team:`, error.message);
}
}
/**
* Check user threat history and take action if needed
*/
private async checkUserThreatHistory(userId: string): Promise<void> {
try {
const recentThreats = await this.databaseService.getUserRecentThreats(userId, 7); // Last 7 days
if (recentThreats.length >= 3) {
this.logger.warn(`🚨 User ${userId} has ${recentThreats.length} recent threats - considering account restriction`);
// TODO: Implement user restriction logic
// - Temporarily suspend account
// - Require manual review
// - Notify administrators
await this.databaseService.flagUserForReview(userId, {
reason: 'multiple-virus-detections',
threatCount: recentThreats.length,
flaggedAt: new Date(),
});
}
} catch (error) {
this.logger.error(`Failed to check user threat history for ${userId}:`, error.message);
}
}
private async updateScanProgress(job: Job, progress: ScanProgress): Promise<void> {
try {
await job.updateProgress(progress);
} catch (error) {
this.logger.warn(`Failed to update scan progress for job ${job.id}:`, error.message);
}
}
@OnWorkerEvent('completed')
onCompleted(job: Job) {
const result = job.returnvalue;
const status = result?.scanResult?.clean ? '✅ Clean' : '🚨 Threat detected';
this.logger.log(`Virus scan completed: ${job.id} - ${status}`);
}
@OnWorkerEvent('failed')
onFailed(job: Job, err: Error) {
this.logger.error(`❌ Virus scan job failed: ${job.id}`, err.stack);
}
@OnWorkerEvent('progress')
onProgress(job: Job, progress: ScanProgress) {
this.logger.debug(`🔍 Virus scan progress: ${job.id} - ${progress.percentage}% (${progress.currentStep})`);
}
@OnWorkerEvent('stalled')
onStalled(jobId: string) {
this.logger.warn(`⚠️ Virus scan job stalled: ${jobId}`);
}
}

View file

@ -0,0 +1,324 @@
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ImageAnnotatorClient } from '@google-cloud/vision';
import { VisionAnalysisResult, VisionProvider } from './types/vision.types';
@Injectable()
export class GoogleVisionService implements VisionProvider {
private readonly logger = new Logger(GoogleVisionService.name);
private readonly client: ImageAnnotatorClient;
private readonly confidenceThreshold: number;
// Rate limiting
private requestCount = 0;
private lastResetTime = Date.now();
private readonly requestsPerMinute: number;
constructor(private configService: ConfigService) {
const apiKey = this.configService.get<string>('GOOGLE_CLOUD_VISION_KEY');
if (!apiKey) {
throw new Error('Google Cloud Vision API key is required');
}
// Initialize the client with API key
this.client = new ImageAnnotatorClient({
keyFilename: apiKey, // If it's a file path
// Or use the key directly if it's a JSON string
...(apiKey.startsWith('{') ? { credentials: JSON.parse(apiKey) } : {}),
});
this.confidenceThreshold = this.configService.get<number>('VISION_CONFIDENCE_THRESHOLD', 0.40);
this.requestsPerMinute = this.configService.get<number>('GOOGLE_REQUESTS_PER_MINUTE', 100);
this.logger.log('Google Cloud Vision Service initialized');
}
async analyzeImage(
imageUrl: string,
keywords?: string[],
customPrompt?: string
): Promise<VisionAnalysisResult> {
await this.checkRateLimit();
const startTime = Date.now();
try {
this.logger.debug(`Analyzing image with Google Cloud Vision: ${imageUrl}`);
// Perform multiple types of detection
const [labelResult] = await this.client.labelDetection({
image: { source: { imageUri: imageUrl } },
maxResults: 20,
});
const [objectResult] = await this.client.objectLocalization({
image: { source: { imageUri: imageUrl } },
maxResults: 10,
});
const [propertiesResult] = await this.client.imageProperties({
image: { source: { imageUri: imageUrl } }
});
const [textResult] = await this.client.textDetection({
image: { source: { imageUri: imageUrl } }
});
// Update rate limiting counter
this.requestCount += 4; // We made 4 API calls
const processingTime = Date.now() - startTime;
// Process the results
const result = this.processGoogleVisionResults(
labelResult,
objectResult,
propertiesResult,
textResult,
processingTime,
keywords
);
this.logger.debug(`Google Vision analysis completed in ${processingTime}ms`);
return result;
} catch (error) {
const processingTime = Date.now() - startTime;
this.logger.error(`Google Vision analysis failed: ${error.message}`, error.stack);
// Return error result with fallback data
return {
provider: 'google',
success: false,
error: error.message,
objects: [],
colors: [],
scene: '',
description: '',
confidence: 0,
processingTime,
keywords: keywords || [],
tags: [],
labels: [],
};
}
}
private processGoogleVisionResults(
labelResult: any,
objectResult: any,
propertiesResult: any,
textResult: any,
processingTime: number,
keywords?: string[]
): VisionAnalysisResult {
// Process labels with confidence filtering
const labels = (labelResult.labelAnnotations || [])
.filter((label: any) => label.score >= this.confidenceThreshold)
.map((label: any) => ({
name: label.description.toLowerCase(),
confidence: label.score,
}))
.sort((a: any, b: any) => b.confidence - a.confidence);
// Process detected objects
const objects = (objectResult.localizedObjectAnnotations || [])
.filter((obj: any) => obj.score >= this.confidenceThreshold)
.map((obj: any) => obj.name.toLowerCase())
.slice(0, 10);
// Process dominant colors
const colors = this.extractDominantColors(propertiesResult);
// Process detected text
const detectedText = textResult.textAnnotations && textResult.textAnnotations.length > 0
? textResult.textAnnotations[0].description
: '';
// Combine all tags
const allTags = [
...labels.map((l: any) => l.name),
...objects,
...(keywords || []),
];
// Remove duplicates and filter
const uniqueTags = [...new Set(allTags)]
.filter(tag => tag.length > 2)
.filter(tag => !['image', 'photo', 'picture', 'file'].includes(tag))
.slice(0, 15);
// Generate scene description
const topLabels = labels.slice(0, 3).map((l: any) => l.name);
const scene = this.generateSceneDescription(topLabels, objects.slice(0, 3));
// Generate overall description
const description = this.generateDescription(labels, objects, colors, detectedText);
// Calculate overall confidence (average of top 5 labels)
const topConfidences = labels.slice(0, 5).map((l: any) => l.confidence);
const averageConfidence = topConfidences.length > 0
? topConfidences.reduce((sum, conf) => sum + conf, 0) / topConfidences.length
: 0;
return {
provider: 'google',
success: true,
objects: objects.slice(0, 8),
colors: colors.slice(0, 3),
scene,
description,
confidence: averageConfidence,
processingTime,
keywords: keywords || [],
tags: uniqueTags,
labels,
detectedText: detectedText ? detectedText.substring(0, 200) : undefined,
rawResponse: {
labels: labelResult.labelAnnotations,
objects: objectResult.localizedObjectAnnotations,
properties: propertiesResult.imagePropertiesAnnotation,
text: textResult.textAnnotations,
},
};
}
private extractDominantColors(propertiesResult: any): string[] {
if (!propertiesResult.imagePropertiesAnnotation?.dominantColors?.colors) {
return [];
}
return propertiesResult.imagePropertiesAnnotation.dominantColors.colors
.slice(0, 5) // Take top 5 colors
.map((colorInfo: any) => {
const { red = 0, green = 0, blue = 0 } = colorInfo.color;
return this.rgbToColorName(red, green, blue);
})
.filter((color: string) => color !== 'unknown')
.slice(0, 3); // Keep top 3 recognizable colors
}
private rgbToColorName(r: number, g: number, b: number): string {
// Simple color name mapping based on RGB values
const colors = [
{ name: 'red', r: 255, g: 0, b: 0 },
{ name: 'green', r: 0, g: 255, b: 0 },
{ name: 'blue', r: 0, g: 0, b: 255 },
{ name: 'yellow', r: 255, g: 255, b: 0 },
{ name: 'orange', r: 255, g: 165, b: 0 },
{ name: 'purple', r: 128, g: 0, b: 128 },
{ name: 'pink', r: 255, g: 192, b: 203 },
{ name: 'brown', r: 165, g: 42, b: 42 },
{ name: 'gray', r: 128, g: 128, b: 128 },
{ name: 'black', r: 0, g: 0, b: 0 },
{ name: 'white', r: 255, g: 255, b: 255 },
];
let closestColor = 'unknown';
let minDistance = Infinity;
for (const color of colors) {
const distance = Math.sqrt(
Math.pow(r - color.r, 2) +
Math.pow(g - color.g, 2) +
Math.pow(b - color.b, 2)
);
if (distance < minDistance) {
minDistance = distance;
closestColor = color.name;
}
}
return closestColor;
}
private generateSceneDescription(labels: string[], objects: string[]): string {
const combined = [...new Set([...labels, ...objects])].slice(0, 4);
if (combined.length === 0) return '';
if (combined.length === 1) return combined[0];
if (combined.length === 2) return combined.join(' and ');
const last = combined.pop();
return combined.join(', ') + ', and ' + last;
}
private generateDescription(labels: any[], objects: string[], colors: string[], text: string): string {
const parts = [];
if (objects.length > 0) {
parts.push(`Image containing ${objects.slice(0, 3).join(', ')}`);
} else if (labels.length > 0) {
parts.push(`Image featuring ${labels.slice(0, 3).map(l => l.name).join(', ')}`);
}
if (colors.length > 0) {
parts.push(`with ${colors.join(' and ')} colors`);
}
if (text && text.trim()) {
parts.push(`including text elements`);
}
return parts.join(' ') || 'Image analysis';
}
private async checkRateLimit(): Promise<void> {
const now = Date.now();
const timeSinceReset = now - this.lastResetTime;
// Reset counters every minute
if (timeSinceReset >= 60000) {
this.requestCount = 0;
this.lastResetTime = now;
return;
}
// Check if we're hitting rate limits
if (this.requestCount >= this.requestsPerMinute) {
const waitTime = 60000 - timeSinceReset;
this.logger.warn(`Google Vision request rate limit reached, waiting ${waitTime}ms`);
await this.sleep(waitTime);
this.requestCount = 0;
this.lastResetTime = Date.now();
}
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
async isHealthy(): Promise<boolean> {
try {
// Simple health check - try to detect labels on a small test image
// Using Google's test image URL
const testImageUrl = 'https://cloud.google.com/vision/docs/images/bicycle_example.png';
const [result] = await this.client.labelDetection({
image: { source: { imageUri: testImageUrl } },
maxResults: 1,
});
return !!(result.labelAnnotations && result.labelAnnotations.length > 0);
} catch (error) {
this.logger.error('Google Vision health check failed:', error.message);
return false;
}
}
getProviderName(): string {
return 'google';
}
getConfiguration() {
return {
provider: 'google',
confidenceThreshold: this.confidenceThreshold,
rateLimits: {
requestsPerMinute: this.requestsPerMinute,
},
};
}
}

View file

@ -0,0 +1,267 @@
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import OpenAI from 'openai';
import { VisionAnalysisResult, VisionProvider } from './types/vision.types';
@Injectable()
export class OpenAIVisionService implements VisionProvider {
private readonly logger = new Logger(OpenAIVisionService.name);
private readonly openai: OpenAI;
private readonly model: string;
private readonly maxTokens: number;
private readonly temperature: number;
// Rate limiting
private requestCount = 0;
private tokenCount = 0;
private lastResetTime = Date.now();
private readonly requestsPerMinute: number;
private readonly tokensPerMinute: number;
constructor(private configService: ConfigService) {
const apiKey = this.configService.get<string>('OPENAI_API_KEY');
if (!apiKey) {
throw new Error('OpenAI API key is required');
}
this.openai = new OpenAI({
apiKey,
timeout: 30000, // 30 seconds timeout
});
this.model = this.configService.get<string>('OPENAI_MODEL', 'gpt-4-vision-preview');
this.maxTokens = this.configService.get<number>('OPENAI_MAX_TOKENS', 500);
this.temperature = this.configService.get<number>('OPENAI_TEMPERATURE', 0.1);
this.requestsPerMinute = this.configService.get<number>('OPENAI_REQUESTS_PER_MINUTE', 50);
this.tokensPerMinute = this.configService.get<number>('OPENAI_TOKENS_PER_MINUTE', 10000);
this.logger.log(`OpenAI Vision Service initialized with model: ${this.model}`);
}
async analyzeImage(
imageUrl: string,
keywords?: string[],
customPrompt?: string
): Promise<VisionAnalysisResult> {
await this.checkRateLimit();
const startTime = Date.now();
try {
this.logger.debug(`Analyzing image with OpenAI: ${imageUrl}`);
const prompt = customPrompt || this.buildAnalysisPrompt(keywords);
const response = await this.openai.chat.completions.create({
model: this.model,
max_tokens: this.maxTokens,
temperature: this.temperature,
messages: [
{
role: 'user',
content: [
{
type: 'text',
text: prompt,
},
{
type: 'image_url',
image_url: {
url: imageUrl,
detail: 'high', // Use high detail for better analysis
},
},
],
},
],
});
// Update rate limiting counters
this.requestCount++;
this.tokenCount += response.usage?.total_tokens || 0;
const processingTime = Date.now() - startTime;
const content = response.choices[0]?.message?.content;
if (!content) {
throw new Error('No content received from OpenAI API');
}
// Parse the structured response
const result = this.parseOpenAIResponse(content, processingTime);
this.logger.debug(`OpenAI analysis completed in ${processingTime}ms`);
return result;
} catch (error) {
const processingTime = Date.now() - startTime;
this.logger.error(`OpenAI vision analysis failed: ${error.message}`, error.stack);
// Return error result with fallback data
return {
provider: 'openai',
success: false,
error: error.message,
objects: [],
colors: [],
scene: '',
description: '',
confidence: 0,
processingTime,
keywords: keywords || [],
tags: [],
labels: [],
};
}
}
private buildAnalysisPrompt(keywords?: string[]): string {
const keywordContext = keywords && keywords.length > 0
? `\n\nUser context keywords: ${keywords.join(', ')}`
: '';
return `Analyze this image and provide a detailed description suitable for SEO filename generation.
Please provide your response as a JSON object with the following structure:
{
"objects": ["object1", "object2", "object3"],
"colors": ["color1", "color2"],
"scene": "brief scene description",
"description": "detailed description of the image",
"confidence": 0.95,
"tags": ["tag1", "tag2", "tag3"],
"labels": [
{"name": "label1", "confidence": 0.9},
{"name": "label2", "confidence": 0.8}
]
}
Focus on:
1. Main objects and subjects in the image
2. Dominant colors (max 3)
3. Scene type (indoor/outdoor, setting)
4. Style, mood, or theme
5. Any text or branding visible
6. Technical aspects if relevant (photography style, lighting)
Provide specific, descriptive terms that would be valuable for SEO and image search optimization.${keywordContext}`;
}
private parseOpenAIResponse(content: string, processingTime: number): VisionAnalysisResult {
try {
// Try to extract JSON from the response
const jsonMatch = content.match(/\{[\s\S]*\}/);
const jsonContent = jsonMatch ? jsonMatch[0] : content;
const parsed = JSON.parse(jsonContent);
return {
provider: 'openai',
success: true,
objects: Array.isArray(parsed.objects) ? parsed.objects : [],
colors: Array.isArray(parsed.colors) ? parsed.colors : [],
scene: parsed.scene || '',
description: parsed.description || '',
confidence: typeof parsed.confidence === 'number' ? parsed.confidence : 0.85,
processingTime,
keywords: [],
tags: Array.isArray(parsed.tags) ? parsed.tags : [],
labels: Array.isArray(parsed.labels) ? parsed.labels : [],
rawResponse: content,
};
} catch (parseError) {
this.logger.warn('Failed to parse OpenAI JSON response, using fallback parsing');
// Fallback parsing - extract keywords from plain text
const words = content.toLowerCase()
.split(/[^a-z0-9]+/)
.filter(word => word.length > 2)
.filter(word => !['the', 'and', 'with', 'for', 'are', 'was', 'this', 'that'].includes(word))
.slice(0, 10);
return {
provider: 'openai',
success: true,
objects: words.slice(0, 5),
colors: [],
scene: content.substring(0, 100),
description: content,
confidence: 0.7, // Lower confidence for fallback parsing
processingTime,
keywords: [],
tags: words,
labels: words.map(word => ({ name: word, confidence: 0.7 })),
rawResponse: content,
};
}
}
private async checkRateLimit(): Promise<void> {
const now = Date.now();
const timeSinceReset = now - this.lastResetTime;
// Reset counters every minute
if (timeSinceReset >= 60000) {
this.requestCount = 0;
this.tokenCount = 0;
this.lastResetTime = now;
return;
}
// Check if we're hitting rate limits
if (this.requestCount >= this.requestsPerMinute) {
const waitTime = 60000 - timeSinceReset;
this.logger.warn(`OpenAI request rate limit reached, waiting ${waitTime}ms`);
await this.sleep(waitTime);
this.requestCount = 0;
this.tokenCount = 0;
this.lastResetTime = Date.now();
}
if (this.tokenCount >= this.tokensPerMinute) {
const waitTime = 60000 - timeSinceReset;
this.logger.warn(`OpenAI token rate limit reached, waiting ${waitTime}ms`);
await this.sleep(waitTime);
this.requestCount = 0;
this.tokenCount = 0;
this.lastResetTime = Date.now();
}
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
async isHealthy(): Promise<boolean> {
try {
// Simple health check - try to create a completion with minimal tokens
const response = await this.openai.chat.completions.create({
model: 'gpt-3.5-turbo', // Use cheaper model for health check
max_tokens: 5,
messages: [{ role: 'user', content: 'Hello' }],
});
return !!response.choices[0]?.message?.content;
} catch (error) {
this.logger.error('OpenAI health check failed:', error.message);
return false;
}
}
getProviderName(): string {
return 'openai';
}
getConfiguration() {
return {
provider: 'openai',
model: this.model,
maxTokens: this.maxTokens,
temperature: this.temperature,
rateLimits: {
requestsPerMinute: this.requestsPerMinute,
tokensPerMinute: this.tokensPerMinute,
},
};
}
}

View file

@ -0,0 +1,62 @@
export interface VisionLabel {
name: string;
confidence: number;
}
export interface VisionAnalysisResult {
provider: string;
success: boolean;
error?: string;
// Core analysis results
objects: string[];
colors: string[];
scene: string;
description: string;
confidence: number;
processingTime: number;
// Additional data
keywords: string[];
tags: string[];
labels: VisionLabel[];
// Optional fields
detectedText?: string;
emotions?: string[];
faces?: number;
// Raw provider response (for debugging)
rawResponse?: any;
}
export interface VisionProvider {
analyzeImage(
imageUrl: string,
keywords?: string[],
customPrompt?: string
): Promise<VisionAnalysisResult>;
isHealthy(): Promise<boolean>;
getProviderName(): string;
getConfiguration(): any;
}
export interface CombinedVisionResult {
primary: VisionAnalysisResult;
secondary?: VisionAnalysisResult;
// Merged results
finalObjects: string[];
finalColors: string[];
finalScene: string;
finalDescription: string;
finalTags: string[];
finalConfidence: number;
// Metadata
providersUsed: string[];
totalProcessingTime: number;
success: boolean;
error?: string;
}

View file

@ -0,0 +1,20 @@
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { VisionService } from './vision.service';
import { OpenAIVisionService } from './openai-vision.service';
import { GoogleVisionService } from './google-vision.service';
@Module({
imports: [ConfigModule],
providers: [
VisionService,
OpenAIVisionService,
GoogleVisionService,
],
exports: [
VisionService,
OpenAIVisionService,
GoogleVisionService,
],
})
export class VisionModule {}

View file

@ -0,0 +1,370 @@
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { OpenAIVisionService } from './openai-vision.service';
import { GoogleVisionService } from './google-vision.service';
import { VisionAnalysisResult, CombinedVisionResult, VisionProvider } from './types/vision.types';
@Injectable()
export class VisionService {
private readonly logger = new Logger(VisionService.name);
private readonly providers: VisionProvider[] = [];
private readonly confidenceThreshold: number;
constructor(
private configService: ConfigService,
private openaiVisionService: OpenAIVisionService,
private googleVisionService: GoogleVisionService,
) {
this.confidenceThreshold = this.configService.get<number>('VISION_CONFIDENCE_THRESHOLD', 0.40);
// Initialize available providers
this.initializeProviders();
}
private initializeProviders() {
const openaiKey = this.configService.get<string>('OPENAI_API_KEY');
const googleKey = this.configService.get<string>('GOOGLE_CLOUD_VISION_KEY');
if (openaiKey) {
this.providers.push(this.openaiVisionService);
this.logger.log('OpenAI Vision provider initialized');
}
if (googleKey) {
this.providers.push(this.googleVisionService);
this.logger.log('Google Vision provider initialized');
}
if (this.providers.length === 0) {
throw new Error('No vision providers available. Please configure at least one AI vision service.');
}
this.logger.log(`Vision service initialized with ${this.providers.length} provider(s)`);
}
/**
* Analyze image using all available providers with fallback strategy
*/
async analyzeImage(
imageUrl: string,
keywords?: string[],
customPrompt?: string,
preferredProvider?: string
): Promise<CombinedVisionResult> {
const startTime = Date.now();
this.logger.debug(`Starting vision analysis for image: ${imageUrl}`);
// Determine provider order based on preference and availability
const orderedProviders = this.getOrderedProviders(preferredProvider);
let primaryResult: VisionAnalysisResult | null = null;
let secondaryResult: VisionAnalysisResult | null = null;
const providersUsed: string[] = [];
// Try primary provider
for (const provider of orderedProviders) {
try {
this.logger.debug(`Attempting analysis with ${provider.getProviderName()}`);
const result = await provider.analyzeImage(imageUrl, keywords, customPrompt);
if (result.success && result.confidence >= this.confidenceThreshold) {
primaryResult = result;
providersUsed.push(result.provider);
this.logger.debug(`Primary analysis successful with ${result.provider} (confidence: ${result.confidence})`);
break;
} else if (result.success) {
this.logger.warn(`Provider ${result.provider} returned low confidence: ${result.confidence}`);
}
} catch (error) {
this.logger.warn(`Provider ${provider.getProviderName()} failed: ${error.message}`);
}
}
// If primary result has low confidence, try secondary provider for validation
if (primaryResult && primaryResult.confidence < 0.8 && orderedProviders.length > 1) {
const secondaryProvider = orderedProviders.find(p => p.getProviderName() !== primaryResult!.provider);
if (secondaryProvider) {
try {
this.logger.debug(`Getting secondary validation from ${secondaryProvider.getProviderName()}`);
secondaryResult = await secondaryProvider.analyzeImage(imageUrl, keywords, customPrompt);
if (secondaryResult.success) {
providersUsed.push(secondaryResult.provider);
this.logger.debug(`Secondary analysis completed with ${secondaryResult.provider}`);
}
} catch (error) {
this.logger.warn(`Secondary provider ${secondaryProvider.getProviderName()} failed: ${error.message}`);
}
}
}
const totalProcessingTime = Date.now() - startTime;
// If no successful analysis, return error result
if (!primaryResult) {
this.logger.error('All vision providers failed');
return {
primary: {
provider: 'none',
success: false,
error: 'All vision providers failed',
objects: [],
colors: [],
scene: '',
description: '',
confidence: 0,
processingTime: totalProcessingTime,
keywords: keywords || [],
tags: [],
labels: [],
},
finalObjects: [],
finalColors: [],
finalScene: '',
finalDescription: '',
finalTags: [],
finalConfidence: 0,
providersUsed,
totalProcessingTime,
success: false,
error: 'All vision providers failed',
};
}
// Combine results from both providers
const combinedResult = this.combineResults(primaryResult, secondaryResult, keywords);
combinedResult.providersUsed = providersUsed;
combinedResult.totalProcessingTime = totalProcessingTime;
this.logger.log(`Vision analysis completed in ${totalProcessingTime}ms using ${providersUsed.join(', ')}`);
return combinedResult;
}
/**
* Combine results from multiple providers using weighted scoring
*/
private combineResults(
primary: VisionAnalysisResult,
secondary?: VisionAnalysisResult,
keywords?: string[]
): CombinedVisionResult {
if (!secondary) {
// Single provider result
return {
primary,
finalObjects: primary.objects,
finalColors: primary.colors,
finalScene: primary.scene,
finalDescription: primary.description,
finalTags: this.mergeWithKeywords(primary.tags, keywords),
finalConfidence: primary.confidence,
providersUsed: [primary.provider],
totalProcessingTime: primary.processingTime,
success: primary.success,
};
}
// Combine results from both providers
const weightedObjects = this.combineWeightedArrays(
primary.objects,
secondary.objects,
primary.confidence,
secondary.confidence
);
const weightedColors = this.combineWeightedArrays(
primary.colors,
secondary.colors,
primary.confidence,
secondary.confidence
);
const weightedTags = this.combineWeightedArrays(
primary.tags,
secondary.tags,
primary.confidence,
secondary.confidence
);
// Choose the better scene description
const finalScene = primary.confidence >= secondary.confidence
? primary.scene
: secondary.scene;
// Combine descriptions
const finalDescription = this.combineDescriptions(primary, secondary);
// Calculate combined confidence
const finalConfidence = (primary.confidence + secondary.confidence) / 2;
return {
primary,
secondary,
finalObjects: weightedObjects.slice(0, 8),
finalColors: weightedColors.slice(0, 3),
finalScene,
finalDescription,
finalTags: this.mergeWithKeywords(weightedTags, keywords).slice(0, 12),
finalConfidence,
providersUsed: [primary.provider, secondary.provider],
totalProcessingTime: primary.processingTime + secondary.processingTime,
success: true,
};
}
private combineWeightedArrays(
arr1: string[],
arr2: string[],
weight1: number,
weight2: number
): string[] {
const scoreMap = new Map<string, number>();
// Score items from first array
arr1.forEach((item, index) => {
const positionScore = (arr1.length - index) / arr1.length; // Higher position = higher score
const weightedScore = positionScore * weight1;
scoreMap.set(item.toLowerCase(), (scoreMap.get(item.toLowerCase()) || 0) + weightedScore);
});
// Score items from second array
arr2.forEach((item, index) => {
const positionScore = (arr2.length - index) / arr2.length;
const weightedScore = positionScore * weight2;
scoreMap.set(item.toLowerCase(), (scoreMap.get(item.toLowerCase()) || 0) + weightedScore);
});
// Sort by combined score and return
return Array.from(scoreMap.entries())
.sort(([, scoreA], [, scoreB]) => scoreB - scoreA)
.map(([item]) => item);
}
private combineDescriptions(primary: VisionAnalysisResult, secondary: VisionAnalysisResult): string {
if (primary.confidence >= secondary.confidence) {
return primary.description;
} else {
return secondary.description;
}
}
private mergeWithKeywords(tags: string[], keywords?: string[]): string[] {
if (!keywords || keywords.length === 0) {
return tags;
}
// Combine and prioritize user keywords (70% vision tags, 30% user keywords)
const visionTags = tags.slice(0, Math.ceil(tags.length * 0.7));
const userKeywords = keywords.slice(0, Math.ceil(keywords.length * 0.3));
const combined = [...userKeywords, ...visionTags];
// Remove duplicates while preserving order
return [...new Set(combined.map(tag => tag.toLowerCase()))];
}
private getOrderedProviders(preferredProvider?: string): VisionProvider[] {
if (!preferredProvider) {
return [...this.providers]; // Default order
}
const preferred = this.providers.find(p => p.getProviderName() === preferredProvider);
const others = this.providers.filter(p => p.getProviderName() !== preferredProvider);
return preferred ? [preferred, ...others] : [...this.providers];
}
/**
* Generate SEO-optimized filename from vision analysis
*/
async generateSeoFilename(
visionResult: CombinedVisionResult,
originalFilename: string,
maxLength: number = 80
): Promise<string> {
try {
// Use the final combined tags
const tags = visionResult.finalTags.slice(0, 6); // Limit to 6 tags
if (tags.length === 0) {
return this.sanitizeFilename(originalFilename);
}
// Create SEO-friendly filename
let filename = tags
.join('-')
.toLowerCase()
.replace(/[^a-z0-9\s-]/g, '') // Remove special characters
.replace(/\s+/g, '-') // Replace spaces with hyphens
.replace(/-+/g, '-') // Replace multiple hyphens with single
.substring(0, maxLength);
// Get file extension from original name
const extension = originalFilename.split('.').pop()?.toLowerCase() || 'jpg';
// Ensure filename is not empty
if (!filename || filename === '-') {
filename = 'image';
}
// Remove trailing hyphens
filename = filename.replace(/-+$/, '');
return `${filename}.${extension}`;
} catch (error) {
this.logger.error('Failed to generate SEO filename', error.stack);
return this.sanitizeFilename(originalFilename);
}
}
private sanitizeFilename(filename: string): string {
return filename
.toLowerCase()
.replace(/[^a-z0-9.-]/g, '-')
.replace(/-+/g, '-')
.replace(/^-|-$/g, '');
}
/**
* Health check for all providers
*/
async getHealthStatus(): Promise<{
healthy: boolean;
providers: Array<{ name: string; healthy: boolean; config: any }>;
}> {
const providerStatus = await Promise.all(
this.providers.map(async (provider) => ({
name: provider.getProviderName(),
healthy: await provider.isHealthy(),
config: provider.getConfiguration(),
}))
);
const healthy = providerStatus.some(p => p.healthy);
return {
healthy,
providers: providerStatus,
};
}
/**
* Get service configuration and statistics
*/
getServiceInfo() {
return {
availableProviders: this.providers.map(p => p.getProviderName()),
confidenceThreshold: this.confidenceThreshold,
providerConfigs: this.providers.map(p => p.getConfiguration()),
};
}
}

View file

@ -0,0 +1,34 @@
{
"compilerOptions": {
"module": "commonjs",
"declaration": true,
"removeComments": true,
"emitDecoratorMetadata": true,
"experimentalDecorators": true,
"allowSyntheticDefaultImports": true,
"target": "ES2021",
"sourceMap": true,
"outDir": "./dist",
"baseUrl": "./",
"incremental": true,
"skipLibCheck": true,
"strictNullChecks": false,
"noImplicitAny": false,
"strictBindCallApply": false,
"forceConsistentCasingInFileNames": false,
"noFallthroughCasesInSwitch": false,
"resolveJsonModule": true,
"esModuleInterop": true,
"paths": {
"@/*": ["src/*"],
"@/vision/*": ["src/vision/*"],
"@/processors/*": ["src/processors/*"],
"@/storage/*": ["src/storage/*"],
"@/queue/*": ["src/queue/*"],
"@/config/*": ["src/config/*"],
"@/utils/*": ["src/utils/*"]
}
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist", "test", "**/*spec.ts"]
}