Files
Abschluss-Projekt/backend/services/generate_yolox_exp.py
2025-12-08 12:26:34 +01:00

374 lines
16 KiB
Python
Executable File

import os
import shutil
import importlib.util
from models.training import Training
from models.TrainingProject import TrainingProject
def load_base_config(selected_model):
    """Return the public attributes of the base experiment for a YOLOX model.

    The model identifier is normalised (lower-cased, dashes replaced by
    underscores, ``.pth`` removed) and used to locate
    ``../data/<model_name>.py`` relative to this file. That module is
    executed and its ``BaseExp`` class is instantiated without arguments.

    Args:
        selected_model: Model identifier string, e.g. ``"YOLOX-s.pth"``.

    Returns:
        A dict mapping every attribute name of the ``BaseExp`` instance that
        does not start with an underscore to its value.

    Raises:
        Exception: If no base configuration file exists for the model.
    """
    model_name = selected_model.lower().replace('-', '_').replace('.pth', '')
    config_dir = os.path.join(os.path.dirname(__file__), '..', 'data')
    base_config_path = os.path.join(config_dir, f'{model_name}.py')

    if not os.path.exists(base_config_path):
        raise Exception(f'Base configuration not found for model: {model_name} at {base_config_path}')

    # Import the configuration file as an anonymous, one-off module.
    module_spec = importlib.util.spec_from_file_location(f"base_config_{model_name}", base_config_path)
    config_module = importlib.util.module_from_spec(module_spec)
    module_spec.loader.exec_module(config_module)

    # Snapshot everything the instance exposes that is not private/dunder.
    exp_instance = config_module.BaseExp()
    return {
        name: getattr(exp_instance, name)
        for name in dir(exp_instance)
        if not name.startswith('_')
    }
def generate_yolox_exp(training_id):
    """Build the YOLOX exp.py source for a training run.

    The training row is looked up by primary key first and, failing that,
    by ``project_details_id``. Its ``transfer_learning`` value decides
    whether the exp is generated on top of the model's base configuration
    (``'coco'``) or from the custom settings alone (``'sketch'``).

    Args:
        training_id: Training id, or a project_details_id as fallback.

    Returns:
        ``{'type': 'custom', 'expContent': <generated source>}``.

    Raises:
        Exception: If no training row matches, or if ``transfer_learning``
            is neither ``'coco'`` nor ``'sketch'``.
    """
    record = (
        Training.query.get(training_id)
        or Training.query.filter_by(project_details_id=training_id).first()
    )
    if not record:
        raise Exception(f'Training not found for trainingId or project_details_id: {training_id}')

    mode = record.transfer_learning
    if mode == 'coco':
        # Transfer learning from COCO: base config plus custom settings.
        with_base = True
    elif mode == 'sketch':
        # Training from scratch: custom settings only.
        with_base = False
    else:
        raise Exception(f'Unknown transfer_learning type: {mode}')

    source = generate_yolox_inference_exp(training_id, use_base_config=with_base)
    return {'type': 'custom', 'expContent': source}
def save_yolox_exp(training_id, out_path):
    """Generate the YOLOX exp.py for a training and store it at ``out_path``.

    Args:
        training_id: Training id (or project_details_id) passed through to
            ``generate_yolox_exp``.
        out_path: Destination file path for the exp file.

    Returns:
        ``out_path``.

    Raises:
        Exception: If the generator result has an unknown ``type`` or lacks
            the content/path that belongs to its type.
    """
    exp_result = generate_yolox_exp(training_id)
    result_type = exp_result['type']

    if result_type == 'custom' and 'expContent' in exp_result:
        # The generated source declares "coding:utf-8", so it must be written
        # as UTF-8 regardless of the platform's default encoding; otherwise
        # non-ASCII paths or project names make the exp file unloadable.
        with open(out_path, 'w', encoding='utf-8') as f:
            f.write(exp_result['expContent'])
        return out_path

    if result_type == 'default' and 'expPath' in exp_result:
        # generate_yolox_exp in this file only returns 'custom'; this branch
        # is kept for results that point at an existing exp file.
        # Copy only when the source file is not already the destination.
        if exp_result['expPath'] != out_path:
            shutil.copyfile(exp_result['expPath'], out_path)
        return out_path

    raise Exception('Unknown expResult type or missing content')
# Settings that the generated exp should carry as integers
# (sizes, epochs, intervals).
_INTEGER_EXP_PARAMS = frozenset({
    'input_size', 'test_size', 'random_size', 'max_epoch', 'warmup_epochs',
    'no_aug_epochs', 'print_interval', 'eval_interval', 'multiscale_range',
    'data_num_workers', 'num_classes',
})

# Dataset/loader overrides appended verbatim to the generated ``Exp`` class.
# Deliberately a plain (non-f) string: the braces below are literal source.
_EXP_DATASET_METHODS = '''
    def get_dataset(self, cache=False, cache_type="ram"):
        """Override to use name parameter for images directory"""
        from yolox.data import COCODataset

        # COCODataset constructs image paths as: os.path.join(data_dir, name, file_name)
        # YOLOX adds "annotations/" to data_dir automatically, so we pass annotations_dir directly
        # Use empty string for name since we have absolute paths in JSON
        return COCODataset(
            data_dir=self.annotations_dir,
            json_file=self.train_ann,
            name="",
            img_size=self.input_size,
            preproc=self.preproc if hasattr(self, 'preproc') else None,
            cache=cache,
            cache_type=cache_type,
        )

    def get_eval_dataset(self, **kwargs):
        """Override eval dataset using name parameter"""
        from yolox.data import COCODataset, ValTransform

        testdev = kwargs.get("testdev", False)
        legacy = kwargs.get("legacy", False)
        return COCODataset(
            data_dir=self.annotations_dir,
            json_file=self.val_ann if not testdev else self.test_ann,
            name="",
            img_size=self.test_size,
            preproc=ValTransform(legacy=legacy),  # Use proper validation transform
        )

    def get_eval_loader(self, batch_size, is_distributed, **kwargs):
        """Standard YOLOX eval loader - matches official implementation"""
        import torch
        import torch.distributed as dist
        from torch.utils.data import DataLoader

        valdataset = self.get_eval_dataset(**kwargs)
        if is_distributed:
            batch_size = batch_size // dist.get_world_size()
            sampler = torch.utils.data.distributed.DistributedSampler(
                valdataset, shuffle=False
            )
        else:
            sampler = torch.utils.data.SequentialSampler(valdataset)
        dataloader_kwargs = {
            "num_workers": self.data_num_workers,
            "pin_memory": True,
            "sampler": sampler,
        }
        dataloader_kwargs["batch_size"] = batch_size
        val_loader = DataLoader(valdataset, **dataloader_kwargs)
        return val_loader
'''


def _format_exp_value(val, param_name=''):
    """Render one config value as Python source for the generated exp file."""
    integer_only = param_name in _INTEGER_EXP_PARAMS

    if isinstance(val, (list, tuple)):
        items = [
            str(int(float(item)))
            if integer_only and isinstance(item, (int, float))
            else str(item)
            for item in val
        ]
        if len(items) == 1:
            # "(x)" is just a parenthesised scalar; keep it a real tuple.
            return f'({items[0]},)'
        return '(' + ', '.join(items) + ')'
    if isinstance(val, bool):
        # Checked before the numeric case: bool is a subclass of int.
        return str(val)
    if isinstance(val, str):
        # repr() escapes backslashes and quotes, so Windows paths and
        # arbitrary text always yield a valid string literal.
        return repr(val)
    if integer_only and isinstance(val, float) and val.is_integer():
        # Lossless only: 300.0 becomes 300, a fractional value is left alone.
        return str(int(val))
    return str(val)


def generate_yolox_inference_exp(training_id, options=None, use_base_config=False):
    """Generate the source code of a YOLOX exp.py from database values.

    The configuration is layered in this order (later wins): base config of
    the selected model (only with ``use_base_config``), non-``None`` values
    of the training row and its ``TrainingSize`` rows, everything in
    ``options``, and finally defaults for a few required keys.

    Args:
        training_id: Training id; used as ``project_details_id`` when no
            training has that primary key.
        options: Optional dict of overrides. ``data_dir``, ``train_ann``,
            ``val_ann`` and ``test_ann`` feed the data paths; every key is
            also merged into the emitted configuration.
        use_base_config: If True, load the base config of
            ``training.selected_model`` first and only override it with
            user-defined values.

    Returns:
        The generated exp file as a string. ``self.exp_name`` is assigned
        inside the generated ``__init__`` (derived from the exp file name).

    Raises:
        Exception: If no training row matches ``training_id``.
    """
    # Local imports, as in the rest of this module's lookups.
    from services.settings_service import get_setting
    from models.TrainingProjectDetails import TrainingProjectDetails
    from models.TrainingSize import TrainingSize

    if options is None:
        options = {}

    training = Training.query.get(training_id)
    if not training:
        training = Training.query.filter_by(project_details_id=training_id).first()
    if not training:
        raise Exception(f'Training not found for trainingId or project_details_id: {training_id}')

    project_details_id = training.project_details_id

    # NOTE(review): file and folder names use the caller-supplied training_id,
    # which is a project_details_id when the fallback lookup matched; confirm
    # this agrees with the code that writes the annotation JSON files.
    train_ann = options.get('train_ann', f'coco_project_{training_id}_train.json')
    val_ann = options.get('val_ann', f'coco_project_{training_id}_valid.json')
    test_ann = options.get('test_ann', f'coco_project_{training_id}_test.json')

    # data_dir is where the IMAGES live; annotations are stored separately
    # in the output folder computed further below.
    if 'data_dir' in options:
        data_dir = options['data_dir']
    else:
        data_dir = get_setting('yolox_data_dir', '/home/kitraining/To_Annotate/')

    # Resolve the owning project once: details row -> project row.
    details = TrainingProjectDetails.query.get(project_details_id)
    training_project = TrainingProject.query.get(details.project_id) if details else None

    # Number of classes from the ProjectClass table; best effort with the
    # COCO class count as fallback.
    num_classes = 80
    try:
        from models.ProjectClass import ProjectClass
        if training_project:
            class_count = ProjectClass.query.filter_by(project_id=training_project.project_id).count()
            if class_count > 0:
                num_classes = class_count
    except Exception as e:
        print(f'Could not determine num_classes from ProjectClass: {e}')

    config = {}

    # Transfer learning from COCO: start from the model's base parameters.
    if use_base_config and training.selected_model:
        try:
            base_config = load_base_config(training.selected_model)
            config.update(base_config)
            print(f'Loaded base config for {training.selected_model}: {list(base_config.keys())}')
        except Exception as e:
            print(f'Warning: Could not load base config for {training.selected_model}: {e}')
            print('Falling back to custom settings only')

    def get_size_array(size_type):
        """Return the ordered TrainingSize values of one type, or None."""
        sizes = TrainingSize.query.filter_by(
            training_id=training.id,
            size_type=size_type
        ).order_by(TrainingSize.value_order).all()
        return [s.value for s in sizes] if sizes else None

    input_size = get_size_array('input_size')
    test_size = get_size_array('test_size')
    mosaic_scale = get_size_array('mosaic_scale')
    mixup_scale = get_size_array('mixup_scale')

    user_overrides = {
        'depth': training.depth,
        'width': training.width,
        'input_size': input_size,
        'mosaic_scale': mosaic_scale,
        'test_size': test_size,
        'enable_mixup': training.enable_mixup,
        'max_epoch': training.max_epoch,
        'warmup_epochs': training.warmup_epochs,
        'warmup_lr': training.warmup_lr,
        'basic_lr_per_img': training.basic_lr_per_img,
        'scheduler': training.scheduler,
        'no_aug_epochs': training.no_aug_epochs,
        'min_lr_ratio': training.min_lr_ratio,
        'ema': training.ema,
        'weight_decay': training.weight_decay,
        'momentum': training.momentum,
        'print_interval': training.print_interval,
        'eval_interval': training.eval_interval,
        'test_conf': training.test_conf,
        'nms_thre': training.nms_thre,
        'mosaic_prob': training.mosaic_prob,
        'mixup_prob': training.mixup_prob,
        'hsv_prob': training.hsv_prob,
        'flip_prob': training.flip_prob,
        'degrees': training.degrees,
        'translate': training.translate,
        'shear': training.shear,
        'mixup_scale': mixup_scale,
        'activation': training.activation,
    }
    # Only values that are explicitly set may replace a base-config value.
    config.update({key: value for key, value in user_overrides.items() if value is not None})

    # Caller-supplied overrides win over database values.
    config.update(options)

    # Defaults for required parameters that are still missing.
    config.setdefault('depth', 1.00)
    config.setdefault('width', 1.00)
    config.setdefault('input_size', [640, 640])
    config.setdefault('mosaic_scale', [0.1, 2])
    config.setdefault('random_size', [10, 20])
    config.setdefault('test_size', [640, 640])
    config.setdefault('enable_mixup', False)
    config.setdefault('exp_name', 'inference_exp')

    # Paths go into double-quoted literals: drop the trailing separator and
    # double every backslash.
    data_dir_clean = data_dir.rstrip('/\\')
    data_dir_escaped = data_dir_clean.replace('\\', '\\\\')

    # Annotation JSONs live in the output folder, not next to the images.
    if details:
        if training_project and training_project.title:
            project_name = training_project.title.replace(' ', '_')
        else:
            project_name = f'project_{details.project_id}'
    else:
        project_name = f'project_{project_details_id}'

    training_folder_name = f"{training.exp_name or training.training_name or 'training'}_{training_id}"
    training_folder_name = training_folder_name.replace(' ', '_')

    output_base_path = get_setting('yolox_output_path', './backend')
    annotations_parent_dir = os.path.join(output_base_path, project_name, training_folder_name)
    annotations_parent_escaped = annotations_parent_dir.replace('\\', '\\\\')

    # Checkpoints are written to a "models" subdirectory.
    models_dir = os.path.join(annotations_parent_dir, 'models')
    models_dir_escaped = models_dir.replace('\\', '\\\\')

    # Fixed head of the generated file; every later __init__ statement is
    # appended with the same 8-space indentation.
    parts = [f'''#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Copyright (c) Megvii, Inc. and its affiliates.

import os

from yolox.exp import Exp as MyExp


class Exp(MyExp):
    def __init__(self):
        super(Exp, self).__init__()
        self.data_dir = "{data_dir_escaped}"  # Where images are located
        self.annotations_dir = "{annotations_parent_escaped}"  # Where annotation JSONs are located
        self.output_dir = "{models_dir_escaped}"  # Where checkpoints will be saved
        self.train_ann = "{train_ann}"
        self.val_ann = "{val_ann}"
        self.test_ann = "{test_ann}"
        self.num_classes = {num_classes}
        # Disable train2017 subdirectory - our images are directly in data_dir
        self.name = ""
        # Set data workers for training
        self.data_num_workers = 8
''']

    # Pretrained checkpoint only for transfer learning from COCO.
    transfer_learning = training.transfer_learning
    if transfer_learning and isinstance(transfer_learning, str) and transfer_learning.lower() == 'coco':
        yolox_base_dir = '/home/kitraining/Yolox/YOLOX-main'
        selected_model = training.selected_model.replace('.pth', '') if training.selected_model else ''
        if selected_model:
            parts.append(f"        self.pretrained_ckpt = r'{yolox_base_dir}/pretrained/{selected_model}.pth'\n")

    # NOTE(review): keys coming from the base config or options that the
    # header already sets (e.g. num_classes, data_dir) are emitted again here
    # and therefore take precedence; confirm this is intended.
    for key, value in config.items():
        if key == 'exp_name':
            continue  # emitted separately below, derived from the file name
        parts.append(f"        self.{key} = {_format_exp_value(value, key)}\n")

    # Has to be part of __init__, i.e. come before the method definitions:
    # appended after them it would sit behind "return val_loader" inside
    # get_eval_loader and never execute.
    parts.append('        self.exp_name = os.path.split(os.path.realpath(__file__))[1].split(".")[0]\n')

    parts.append(_EXP_DATASET_METHODS)
    return ''.join(parts)
def save_yolox_inference_exp(training_id, out_path, options=None):
    """Generate an inference exp.py and write it to ``out_path``.

    The exp is always generated with ``use_base_config=False``.

    Args:
        training_id: Training id (or project_details_id) passed through to
            ``generate_yolox_inference_exp``.
        out_path: Destination file path for the exp file.
        options: Optional overrides forwarded to the generator.

    Returns:
        ``out_path``.
    """
    exp_content = generate_yolox_inference_exp(training_id, options, use_base_config=False)
    # The generated source declares "coding:utf-8"; write it as UTF-8 instead
    # of the platform default so non-ASCII paths or names stay loadable.
    with open(out_path, 'w', encoding='utf-8') as f:
        f.write(exp_content)
    return out_path