import os
import shutil
import importlib.util

from models.training import Training
from models.TrainingProject import TrainingProject

# Keys that are written explicitly in the generated ``__init__`` header.  They
# are skipped when dumping the generic config so the cleaned/escaped header
# values are not overwritten by a second, raw assignment.
_HEADER_KEYS = frozenset({'data_dir', 'train_ann', 'val_ann', 'test_ann'})

# Location of the YOLOX checkout that holds the ``pretrained/`` checkpoints.
_YOLOX_BASE_DIR = '/home/kitraining/Yolox/YOLOX-main'

# Dataset overrides appended verbatim to the generated ``Exp`` class.
_DATASET_METHODS = '''
    def get_dataset(self, cache=False, cache_type="ram"):
        """Override to use name parameter for images directory"""
        from yolox.data import COCODataset

        # COCODataset constructs image paths as: os.path.join(data_dir, name, file_name)
        # YOLOX adds "annotations/" to data_dir automatically, so we pass annotations_dir directly
        # Use empty string for name since we have absolute paths in JSON
        return COCODataset(
            data_dir=self.annotations_dir,
            json_file=self.train_ann,
            name="",
            img_size=self.input_size,
            preproc=self.preproc if hasattr(self, 'preproc') else None,
            cache=cache,
            cache_type=cache_type,
        )

    def get_eval_dataset(self, **kwargs):
        """Override eval dataset using name parameter"""
        from yolox.data import COCODataset

        testdev = kwargs.get("testdev", False)
        legacy = kwargs.get("legacy", False)

        return COCODataset(
            data_dir=self.annotations_dir,
            json_file=self.val_ann if not testdev else self.test_ann,
            name="",
            img_size=self.test_size,
            preproc=None,  # No preprocessing for evaluation
        )
'''


def _find_training(training_id):
    """Return the Training row for *training_id*.

    The id is first tried as the Training primary key and then as a
    ``project_details_id``.

    Raises:
        Exception: If no row matches either interpretation.
    """
    training = Training.query.get(training_id)
    if not training:
        training = Training.query.filter_by(project_details_id=training_id).first()
    if not training:
        raise Exception(
            f'Training not found for trainingId or project_details_id: {training_id}'
        )
    return training


def _quote(text):
    """Return *text* as a double-quoted Python string literal."""
    escaped = (
        text.replace('\\', '\\\\')
        .replace('"', '\\"')
        .replace('\n', '\\n')
        .replace('\r', '\\r')
    )
    return f'"{escaped}"'


def _format_value(val):
    """Render *val* as Python source for the generated exp file.

    Lists and tuples become tuple literals (with a trailing comma for a single
    item so the result is still a tuple), strings are quoted and escaped, and
    whole-number floats are written as ints.
    """
    if isinstance(val, (list, tuple)):
        items = [_format_value(item) for item in val]
        if len(items) == 1:
            return f'({items[0]},)'
        return '(' + ', '.join(items) + ')'
    if isinstance(val, bool):
        return str(val)
    if isinstance(val, str):
        return _quote(val)
    if isinstance(val, float) and val.is_integer():
        return str(int(val))
    return str(val)


def _as_pair(value):
    """Duplicate a scalar into a ``(value, value)`` tuple.

    ``None`` and values that are already a list/tuple are returned unchanged.
    """
    if value is None or isinstance(value, (list, tuple)):
        return value
    return (value, value)


def _get_size_array(training_id, size_type):
    """Return the ordered TrainingSize values for a training, or None if empty."""
    from models.TrainingSize import TrainingSize

    sizes = TrainingSize.query.filter_by(
        training_id=training_id,
        size_type=size_type,
    ).order_by(TrainingSize.value_order).all()
    return [s.value for s in sizes] if sizes else None


def _resolve_num_classes(project_details_id, default=80):
    """Count the project's classes in the ProjectClass table.

    Best-effort: any failure is printed and *default* is returned.
    """
    try:
        from models.ProjectClass import ProjectClass

        # NOTE(review): this looks up TrainingProject by project_details_id,
        # while the annotations-path code resolves the project through
        # TrainingProjectDetails.project_id - confirm which id is intended.
        training_project = TrainingProject.query.get(project_details_id)
        if training_project:
            class_count = ProjectClass.query.filter_by(
                project_id=training_project.project_id
            ).count()
            if class_count > 0:
                return class_count
    except Exception as e:
        print(f'Could not determine num_classes from ProjectClass: {e}')
    return default


def load_base_config(selected_model):
    """Load base configuration for a specific YOLOX model.

    The model name is lower-cased, ``-`` is replaced by ``_`` and a ``.pth``
    suffix is dropped; the result names a module under ``../data`` whose
    ``BaseExp`` class is instantiated.

    Args:
        selected_model: Model name, e.g. ``"YOLOX-s.pth"``.

    Returns:
        dict mapping every public, non-callable attribute of the ``BaseExp``
        instance to its value.

    Raises:
        Exception: If the config file does not exist or cannot be loaded.
    """
    model_name = selected_model.lower().replace('-', '_').replace('.pth', '')
    base_config_path = os.path.join(
        os.path.dirname(__file__), '..', 'data', f'{model_name}.py'
    )

    if not os.path.exists(base_config_path):
        raise Exception(
            f'Base configuration not found for model: {model_name} at {base_config_path}'
        )

    # Load the module dynamically
    spec = importlib.util.spec_from_file_location(
        f"base_config_{model_name}", base_config_path
    )
    if spec is None or spec.loader is None:
        raise Exception(f'Could not load base configuration from {base_config_path}')
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    base_exp = module.BaseExp()
    base_config = {}
    for attr in dir(base_exp):
        if attr.startswith('_'):
            continue
        value = getattr(base_exp, attr)
        # Methods cannot be written out as "self.x = <value>" assignments.
        if callable(value):
            continue
        base_config[attr] = value
    return base_config


def generate_yolox_exp(training_id):
    """Generate YOLOX exp.py content for a training.

    Args:
        training_id: Training primary key or project_details_id.

    Returns:
        dict ``{'type': 'custom', 'expContent': <str>}``.

    Raises:
        Exception: If the training is not found or its ``transfer_learning``
            value is neither ``'coco'`` nor ``'sketch'``.
    """
    training = _find_training(training_id)

    mode = training.transfer_learning
    # Compared case-insensitively, matching generate_yolox_inference_exp.
    normalized = mode.lower() if isinstance(mode, str) else mode

    # 'coco': base config + custom settings; 'sketch': custom settings only
    if normalized in ('coco', 'sketch'):
        exp_content = generate_yolox_inference_exp(
            training_id, use_base_config=(normalized == 'coco')
        )
        return {'type': 'custom', 'expContent': exp_content}

    raise Exception(f'Unknown transfer_learning type: {training.transfer_learning}')


def save_yolox_exp(training_id, out_path):
    """Save YOLOX exp.py to the specified path.

    Args:
        training_id: Training primary key or project_details_id.
        out_path: Destination file path.

    Returns:
        *out_path*.

    Raises:
        Exception: If the generated result has an unknown type or lacks content.
    """
    exp_result = generate_yolox_exp(training_id)

    if exp_result['type'] == 'custom' and 'expContent' in exp_result:
        # The generated file declares "coding:utf-8", so write it as UTF-8.
        with open(out_path, 'w', encoding='utf-8') as f:
            f.write(exp_result['expContent'])
        return out_path

    if exp_result['type'] == 'default' and 'expPath' in exp_result:
        # Copy only when the destination differs from the source
        if exp_result['expPath'] != out_path:
            shutil.copyfile(exp_result['expPath'], out_path)
        return out_path

    raise Exception('Unknown expResult type or missing content')


def generate_yolox_inference_exp(training_id, options=None, use_base_config=False):
    """Generate inference exp.py source using DB values.

    Args:
        training_id: The training/project_details ID.
        options: Optional overrides. ``train_ann``, ``val_ann``, ``test_ann``
            and ``data_dir`` feed the generated header; every other key is
            written as an additional ``self.<key>`` assignment.
        use_base_config: If True, load the base config for the selected model
            and only override it with user-defined values.

    Returns:
        The generated exp.py source as a string.

    Raises:
        Exception: If no matching training row exists.
    """
    options = {} if options is None else options
    training = _find_training(training_id)
    project_details_id = training.project_details_id

    # NOTE(review): file names use the id exactly as passed in, which may be a
    # project_details_id when the fallback lookup matched; the original
    # comment says training.id is intended - confirm with the JSON generator.
    train_ann = options.get('train_ann', f'coco_project_{training_id}_train.json')
    val_ann = options.get('val_ann', f'coco_project_{training_id}_valid.json')
    test_ann = options.get('test_ann', f'coco_project_{training_id}_test.json')

    # Imported locally, as in the original code.
    from services.settings_service import get_setting
    from models.TrainingProjectDetails import TrainingProjectDetails

    # data_dir points at the images; annotations live in the output folder.
    if 'data_dir' in options:
        data_dir = options['data_dir']
    else:
        data_dir = get_setting('yolox_data_dir', '/home/kitraining/To_Annotate/')
    data_dir_clean = data_dir.rstrip('/\\')

    num_classes = _resolve_num_classes(project_details_id)

    config = {}

    # For COCO transfer learning, start from the model's base parameters
    if use_base_config and training.selected_model:
        try:
            base_config = load_base_config(training.selected_model)
            config.update(base_config)
            print(f'Loaded base config for {training.selected_model}: {list(base_config.keys())}')
        except Exception as e:
            print(f'Warning: Could not load base config for {training.selected_model}: {e}')
            print('Falling back to custom settings only')

    user_overrides = {
        'depth': training.depth,
        'width': training.width,
        'input_size': _get_size_array(training.id, 'input_size'),
        'mosaic_scale': _get_size_array(training.id, 'mosaic_scale'),
        'test_size': _get_size_array(training.id, 'test_size'),
        'enable_mixup': training.enable_mixup,
        'max_epoch': training.max_epoch,
        'warmup_epochs': training.warmup_epochs,
        'warmup_lr': training.warmup_lr,
        'basic_lr_per_img': training.basic_lr_per_img,
        'scheduler': training.scheduler,
        'no_aug_epochs': training.no_aug_epochs,
        'min_lr_ratio': training.min_lr_ratio,
        'ema': training.ema,
        'weight_decay': training.weight_decay,
        'momentum': training.momentum,
        'print_interval': training.print_interval,
        'eval_interval': training.eval_interval,
        'test_conf': training.test_conf,
        'nms_thre': training.nms_thre,
        'mosaic_prob': training.mosaic_prob,
        'mixup_prob': training.mixup_prob,
        'hsv_prob': training.hsv_prob,
        'flip_prob': training.flip_prob,
        # Single values are expanded to (value, value) tuples
        'degrees': _as_pair(training.degrees),
        'translate': _as_pair(training.translate),
        'shear': _as_pair(training.shear),
        'mixup_scale': _get_size_array(training.id, 'mixup_scale'),
        'activation': training.activation,
    }

    # Only values that are explicitly set (not None) override the base config
    config.update({k: v for k, v in user_overrides.items() if v is not None})

    # Caller-supplied options win over DB values
    config.update(options)

    # Defaults for any required parameter that is still missing
    config.setdefault('depth', 1.00)
    config.setdefault('width', 1.00)
    config.setdefault('input_size', [640, 640])
    config.setdefault('mosaic_scale', [0.1, 2])
    config.setdefault('random_size', [10, 20])
    config.setdefault('test_size', [640, 640])
    config.setdefault('enable_mixup', False)
    config.setdefault('exp_name', 'inference_exp')

    # Annotations directory: <output base>/<project name>/<training folder>
    details = TrainingProjectDetails.query.get(project_details_id)
    if details:
        training_project = TrainingProject.query.get(details.project_id)
        if training_project and training_project.title:
            project_name = training_project.title.replace(' ', '_')
        else:
            project_name = f'project_{details.project_id}'
    else:
        project_name = f'project_{project_details_id}'

    training_folder_name = (
        f"{training.exp_name or training.training_name or 'training'}_{training_id}"
    ).replace(' ', '_')

    output_base_path = get_setting('yolox_output_path', './backend')
    annotations_parent_dir = os.path.join(
        output_base_path, project_name, training_folder_name
    )

    lines = [
        '#!/usr/bin/env python3',
        '# -*- coding:utf-8 -*-',
        '# Copyright (c) Megvii, Inc. and its affiliates.',
        '',
        'import os',
        '',
        'from yolox.exp import Exp as MyExp',
        '',
        '',
        'class Exp(MyExp):',
        '    def __init__(self):',
        '        super(Exp, self).__init__()',
        f'        self.data_dir = {_quote(data_dir_clean)}  # Where images are located',
        f'        self.annotations_dir = {_quote(annotations_parent_dir)}  # Where annotation JSONs are located',
        f'        self.train_ann = {_quote(train_ann)}',
        f'        self.val_ann = {_quote(val_ann)}',
        f'        self.test_ann = {_quote(test_ann)}',
        f'        self.num_classes = {num_classes}',
        '        # Disable train2017 subdirectory - our images are directly in data_dir',
        '        self.name = ""',
        '        # Set data workers for training',
        '        self.data_num_workers = 8',
    ]

    # Point at the pretrained checkpoint when transfer learning from COCO
    transfer_learning = training.transfer_learning
    if isinstance(transfer_learning, str) and transfer_learning.lower() == 'coco':
        selected_model = (
            training.selected_model.replace('.pth', '') if training.selected_model else ''
        )
        if selected_model:
            lines.append(
                f"        self.pretrained_ckpt = r'{_YOLOX_BASE_DIR}/pretrained/{selected_model}.pth'"
            )

    # Dump the remaining parameters; header keys and exp_name are handled separately
    for key, value in config.items():
        if key == 'exp_name' or key in _HEADER_KEYS:
            continue
        lines.append(f'        self.{key} = {_format_value(value)}')

    # exp_name is derived from the generated file's own name.  It must be
    # emitted here, inside __init__, before the method definitions below.
    lines.append(
        '        self.exp_name = os.path.split(os.path.realpath(__file__))[1].split(".")[0]'
    )

    return '\n'.join(lines) + '\n' + _DATASET_METHODS


def save_yolox_inference_exp(training_id, out_path, options=None):
    """Save inference exp.py to a custom path.

    Args:
        training_id: Training primary key or project_details_id.
        out_path: Destination file path.
        options: Optional overrides forwarded to generate_yolox_inference_exp.

    Returns:
        *out_path*.
    """
    exp_content = generate_yolox_inference_exp(training_id, options, use_base_config=False)
    # The generated file declares "coding:utf-8", so write it as UTF-8.
    with open(out_path, 'w', encoding='utf-8') as f:
        f.write(exp_content)
    return out_path