Files
Abschluss-Projekt/backend/services/fetch_labelstudio.py
2025-11-28 12:50:27 +01:00

86 lines
3.0 KiB
Python

import requests
import time
API_URL = 'http://192.168.1.19:8080/api'
API_TOKEN = 'c1cef980b7c73004f4ee880a42839313b863869f'
def fetch_label_studio_project(project_id):
"""Fetch Label Studio project annotations"""
export_url = f'{API_URL}/projects/{project_id}/export?exportType=JSON_MIN'
headers = {'Authorization': f'Token {API_TOKEN}'}
# Trigger export
res = requests.get(export_url, headers=headers)
if not res.ok:
error_text = res.text if res.text else ''
print(f'Failed to trigger export: {res.status_code} {res.reason} - {error_text}')
raise Exception(f'Failed to trigger export: {res.status_code} {res.reason}')
data = res.json()
# If data is an array, it's ready
if isinstance(data, list):
return data
# If not, poll for the export file
file_url = data.get('download_url') or data.get('url')
tries = 0
while not file_url and tries < 20:
time.sleep(2)
res = requests.get(export_url, headers=headers)
if not res.ok:
error_text = res.text if res.text else ''
print(f'Failed to poll export: {res.status_code} {res.reason} - {error_text}')
raise Exception(f'Failed to poll export: {res.status_code} {res.reason}')
data = res.json()
file_url = data.get('download_url') or data.get('url')
tries += 1
if not file_url:
raise Exception('Label Studio export did not become ready')
# Download the export file
full_url = file_url if file_url.startswith('http') else f"{API_URL.replace('/api', '')}{file_url}"
res = requests.get(full_url, headers=headers)
if not res.ok:
error_text = res.text if res.text else ''
print(f'Failed to download export: {res.status_code} {res.reason} - {error_text}')
raise Exception(f'Failed to download export: {res.status_code} {res.reason}')
return res.json()
def fetch_project_ids_and_titles():
"""Fetch all Label Studio project IDs and titles"""
try:
response = requests.get(
f'{API_URL}/projects/',
headers={
'Authorization': f'Token {API_TOKEN}',
'Content-Type': 'application/json'
}
)
if not response.ok:
error_text = response.text if response.text else ''
print(f'Failed to fetch projects: {response.status_code} {response.reason} - {error_text}')
raise Exception(f'HTTP error! status: {response.status_code}')
data = response.json()
if 'results' not in data or not isinstance(data['results'], list):
raise Exception('API response does not contain results array')
# Extract id and title from each project
projects = [
{'id': project['id'], 'title': project['title']}
for project in data['results']
]
print(projects)
return projects
except Exception as error:
print(f'Failed to fetch projects: {error}')
return []