From 036f3b178a771114bacecfa8f701ef9dc95af39f Mon Sep 17 00:00:00 2001 From: Philipp Date: Tue, 2 Dec 2025 12:50:22 +0100 Subject: [PATCH] que and training status --- backend/routes/api.py | 25 +++- backend/services/training_queue.py | 180 +++++++++++++++++++++++++++++ overview-training.html | 134 +++++++++++++++++++++ 3 files changed, 333 insertions(+), 6 deletions(-) create mode 100644 backend/services/training_queue.py diff --git a/backend/routes/api.py b/backend/routes/api.py index 60df6a3..093b4b1 100644 --- a/backend/routes/api.py +++ b/backend/routes/api.py @@ -73,7 +73,7 @@ def generate_yolox_json(): @api_bp.route('/start-yolox-training', methods=['POST']) def start_yolox_training(): - """Generate JSONs, exp.py, and start YOLOX training""" + """Generate JSONs, exp.py, and add training to queue""" try: data = request.get_json() project_id = data.get('project_id') @@ -113,8 +113,8 @@ def start_yolox_training(): print(f'Generating exp.py at {exp_file_path}...') save_yolox_exp(training_id, exp_file_path) - # Step 3: Start training - print(f'Starting YOLOX training for training {training_id}...') + # Step 3: Build training command + print(f'Preparing training command for training {training_id}...') # Get YOLOX configuration from settings yolox_main_dir = get_setting('yolox_path', '/home/kitraining/Yolox/YOLOX-main') @@ -162,11 +162,12 @@ def start_yolox_training(): print(f'Training command: {cmd}') - # Start training in background - subprocess.Popen(cmd, shell=True, cwd=yolox_main_dir) + # Step 4: Add to training queue + from services.training_queue import training_queue + training_queue.add_to_queue(training_id, cmd, yolox_main_dir) return jsonify({ - 'message': f'JSONs and exp.py generated, training started for training {training_id}', + 'message': f'Training {training_id} added to queue', 'exp_path': exp_file_path }) @@ -176,6 +177,18 @@ def start_yolox_training(): traceback.print_exc() return jsonify({'message': 'Failed to start training', 'error': 
import queue
import re
import subprocess
import threading


class TrainingQueueManager:
    """Process-wide singleton that runs queued training commands one at a time.

    Jobs are plain dicts placed on a FIFO ``queue.Queue``. A single daemon
    worker thread takes the next job, runs its shell command, and updates the
    job's ``iteration`` / ``max_epoch`` fields from the command's output so
    that ``get_status()`` can report progress.

    Attributes:
        queue: ``queue.Queue`` holding the jobs that have not started yet.
        current_training: Job dict currently being executed, or ``None``.
        current_process: ``subprocess.Popen`` of the running job, or ``None``.
        worker_thread: Background ``threading.Thread`` consuming the queue.
        running: Flag the worker loop checks; ``stop()`` sets it to ``False``.
    """

    _instance = None
    _lock = threading.Lock()

    # Fallback epoch count used when the training record gives none.
    DEFAULT_MAX_EPOCH = 300

    # Matches progress lines such as
    # "2025-12-02 07:30:15 | INFO | yolox.core.trainer:78 - Epoch: [5/300]".
    # Compiled once instead of once per output line.
    _EPOCH_PATTERN = re.compile(r'Epoch:\s*\[(\d+)/(\d+)\]')

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    # Set the flag before publishing the instance so no other
                    # thread can observe an object without `_initialized`.
                    instance._initialized = False
                    cls._instance = instance
        return cls._instance

    def __init__(self):
        """Initialise state and start the worker (first construction only)."""
        with self._lock:
            if self._initialized:
                return
            self.queue = queue.Queue()
            self.current_training = None
            self.current_process = None
            self.worker_thread = None
            self.running = False
            self._initialized = True

        # Start the worker thread
        self.start_worker()

    def start_worker(self):
        """Start the background worker thread unless one is already alive."""
        if self.worker_thread is not None and self.worker_thread.is_alive():
            return
        self.running = True
        self.worker_thread = threading.Thread(
            target=self._process_queue,
            daemon=True,
        )
        self.worker_thread.start()

    def add_to_queue(self, training_id, command, cwd):
        """Add a training job to the queue.

        Args:
            training_id: Identifier of the training record the job belongs to.
            command: Shell command line that starts the training.
            cwd: Working directory the command is run in.

        The job's display name and epoch count are read from the training
        record when it can be loaded; otherwise defaults are used so that
        queuing never fails because of a lookup problem.
        """
        job = {
            'training_id': training_id,
            'command': command,
            'cwd': cwd,
            'iteration': 0,
            'max_epoch': self.DEFAULT_MAX_EPOCH,
            'name': f'Training {training_id}',
        }

        try:
            # Imported here so this module stays importable without the ORM.
            from models.training import Training

            training = Training.query.get(training_id)
            if training:
                job['max_epoch'] = training.max_epoch or self.DEFAULT_MAX_EPOCH
                job['name'] = training.exp_name or job['name']
        except Exception as err:
            # Best effort: metadata is cosmetic, the job must still be queued.
            print(f'Could not load metadata for training {training_id}, '
                  f'using defaults: {err}')

        self.queue.put(job)
        print(f'Added training {training_id} to queue. Queue size: {self.queue.qsize()}')

    def _process_queue(self):
        """Worker loop: run queued jobs one after another until stopped."""
        while self.running:
            try:
                # Block with a timeout so the `running` flag is re-checked.
                job = self.queue.get(timeout=1)
            except queue.Empty:
                continue

            try:
                print(f'Starting training {job["training_id"]} from queue')
                self.current_training = job
                self._run_training(job)
            except Exception as err:
                print(f'Error processing training job: {err}')
            finally:
                # Always clear state and balance the get() with task_done(),
                # including when the job failed.
                self.current_training = None
                self.current_process = None
                self.queue.task_done()

    @classmethod
    def _parse_progress(cls, line):
        """Return ``(current_epoch, total_epochs)`` parsed from *line*, or ``None``."""
        match = cls._EPOCH_PATTERN.search(line)
        if match is None:
            return None
        return int(match.group(1)), int(match.group(2))

    def _run_training(self, job):
        """Run a job's command, echo its output and track epoch progress.

        Args:
            job: Job dict created by ``add_to_queue``; its ``iteration`` and
                ``max_epoch`` entries are updated in place as output arrives.
        """
        try:
            # NOTE(security): the command is executed through the shell, so it
            # must only ever be assembled from trusted settings/values.
            with subprocess.Popen(
                job['command'],
                shell=True,
                cwd=job['cwd'],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                errors='replace',  # undecodable bytes must not abort monitoring
                bufsize=1,
            ) as process:
                self.current_process = process

                # Monitor output for progress
                for line in process.stdout:
                    print(line.strip())
                    progress = self._parse_progress(line)
                    if progress is not None:
                        # `job` is the same dict exposed as current_training.
                        job['iteration'], job['max_epoch'] = progress
                        print(f'Progress: {progress[0]}/{progress[1]}')

                # Wait for completion
                process.wait()

            print(f'Training {job["training_id"]} completed with exit code {process.returncode}')

        except Exception as err:
            print(f'Error running training: {err}')

    def get_status(self):
        """Get current status of training queue.

        Returns:
            dict with ``'current'`` (the running job's ``training_id``,
            ``name``, ``iteration`` and ``max_epoch``, or ``None``) and
            ``'queue'`` (a list of ``training_id`` / ``name`` / ``max_epoch``
            dicts for the waiting jobs, in execution order).
        """
        # Copy the pending jobs under the queue's own mutex instead of
        # draining and re-filling it: the worker never sees a falsely empty
        # queue, ordering is preserved, and unfinished_tasks is not inflated.
        with self.queue.mutex:
            pending = list(self.queue.queue)

        queue_items = [
            {
                'training_id': item['training_id'],
                'name': item.get('name', f'Training {item["training_id"]}'),
                'max_epoch': item.get('max_epoch', self.DEFAULT_MAX_EPOCH),
            }
            for item in pending
        ]

        result = {
            'current': None,
            'queue': queue_items,
        }

        # Read the attribute once; the worker may reset it to None at any time.
        current = self.current_training
        if current:
            result['current'] = {
                'training_id': current['training_id'],
                'name': current.get('name', f'Training {current["training_id"]}'),
                'iteration': current.get('iteration', 0),
                'max_epoch': current.get('max_epoch', self.DEFAULT_MAX_EPOCH),
            }

        return result

    def stop(self):
        """Stop the worker loop and terminate the running training, if any."""
        self.running = False
        process = self.current_process
        if process is not None:
            try:
                process.terminate()
            except OSError as err:
                print(f'Could not terminate training process: {err}')


# Global instance
training_queue = TrainingQueueManager()
+ +
+ + + + \ No newline at end of file