import time

import requests

from services.settings_service import get_setting

# (connect, read) timeouts in seconds passed to every requests call. Without a
# timeout, requests waits forever on a stalled server. The read timeout is
# generous because the export endpoint may take a while to start responding.
_REQUEST_TIMEOUT = (10, 300)


def get_api_credentials():
    """Return the Label Studio ``(api_url, api_token)`` pair read from settings.

    Each value is looked up with ``get_setting(key, fallback)``; the second
    argument is presumably the value used when the setting is not configured
    (confirm against ``services.settings_service``).
    """
    api_url = get_setting('labelstudio_api_url', 'http://192.168.1.19:8080/api')
    # NOTE(review): a literal API token is committed here as the fallback
    # value. Consider rotating it and requiring the setting to be configured.
    api_token = get_setting('labelstudio_api_token', 'c1cef980b7c73004f4ee880a42839313b863869f')
    return api_url, api_token


def _get_or_raise(url, headers, action):
    """GET ``url`` and return the response, raising if the status is not OK.

    ``action`` is the short phrase (e.g. ``'trigger export'``) inserted into
    the printed diagnostic and into the exception message.
    """
    res = requests.get(url, headers=headers, timeout=_REQUEST_TIMEOUT)
    if not res.ok:
        error_text = res.text or ''
        print(f'Failed to {action}: {res.status_code} {res.reason} - {error_text}')
        raise Exception(f'Failed to {action}: {res.status_code} {res.reason}')
    return res


def _server_root(api_url):
    """Return ``api_url`` with a trailing ``/api`` path segment removed.

    Only the suffix is stripped, so an ``/api`` substring elsewhere in the URL
    (for example in the host name) is left intact.
    """
    root = api_url.rstrip('/')
    if root.endswith('/api'):
        root = root[:-len('/api')]
    return root


def _download_location(payload):
    """Return the export file URL from a response dict, or a falsy value."""
    return payload.get('download_url') or payload.get('url')


def fetch_label_studio_project(project_id, max_tries=20, poll_interval=2):
    """Fetch the ``JSON_MIN`` export of a Label Studio project.

    The export endpoint is requested once. If it answers with a JSON list,
    that list is returned directly. Otherwise the response is expected to be
    a dict carrying ``download_url`` or ``url``; the endpoint is re-requested
    until one of them appears, and the referenced file is then downloaded.

    Args:
        project_id: Identifier interpolated into the export URL.
        max_tries: Maximum number of poll requests after the initial one.
        poll_interval: Seconds to sleep before each poll request.

    Returns:
        The parsed JSON of the export.

    Raises:
        Exception: If any request returns a non-OK status, or if no download
            URL appears within ``max_tries`` polls. Exceptions raised by
            ``requests`` itself (connection errors, timeouts) propagate.
    """
    api_url, api_token = get_api_credentials()
    export_url = f'{api_url}/projects/{project_id}/export?exportType=JSON_MIN'
    headers = {'Authorization': f'Token {api_token}'}

    # Trigger the export; a list response means the data is already inline.
    data = _get_or_raise(export_url, headers, 'trigger export').json()
    if isinstance(data, list):
        return data

    # Otherwise poll until the response names a file to download.
    file_url = _download_location(data)
    tries = 0
    while not file_url and tries < max_tries:
        time.sleep(poll_interval)
        data = _get_or_raise(export_url, headers, 'poll export').json()
        # A poll may come back with the finished export itself; calling
        # .get() on that list would raise AttributeError.
        if isinstance(data, list):
            return data
        file_url = _download_location(data)
        tries += 1

    if not file_url:
        raise Exception('Label Studio export did not become ready')

    # Relative locations are resolved against the server root (API URL
    # without its trailing '/api').
    if file_url.startswith('http'):
        full_url = file_url
    else:
        full_url = f'{_server_root(api_url)}{file_url}'

    return _get_or_raise(full_url, headers, 'download export').json()


def fetch_project_ids_and_titles():
    """Fetch Label Studio project IDs and titles.

    Returns:
        A list of ``{'id': ..., 'title': ...}`` dicts built from the
        ``results`` array of the projects endpoint response. On any failure
        (HTTP error, unexpected payload, network error) the error is printed
        and an empty list is returned instead of raising.

    NOTE(review): only the ``results`` of a single response are read; if the
    endpoint paginates, later pages are not fetched. Confirm against the API.
    """
    api_url, api_token = get_api_credentials()

    try:
        response = requests.get(
            f'{api_url}/projects/',
            headers={
                'Authorization': f'Token {api_token}',
                'Content-Type': 'application/json',
            },
            timeout=_REQUEST_TIMEOUT,
        )

        if not response.ok:
            error_text = response.text or ''
            print(f'Failed to fetch projects: {response.status_code} {response.reason} - {error_text}')
            raise Exception(f'HTTP error! status: {response.status_code}')

        data = response.json()

        if 'results' not in data or not isinstance(data['results'], list):
            raise Exception('API response does not contain results array')

        # Keep only the id and title of each project.
        projects = [
            {'id': project['id'], 'title': project['title']}
            for project in data['results']
        ]
        print(projects)
        return projects

    except Exception as error:
        # Deliberate best-effort: callers get an empty list on any failure.
        print(f'Failed to fetch projects: {error}')
        return []