API Error Handling - Synthreo Builder
Error handling guide for the Builder API - understand HTTP status codes, error response shapes, and how to implement retry logic and graceful failure handling in your integration.
Proper error handling is crucial for building robust applications with the Synthreo Builder API. This guide covers different types of errors you may encounter, how to handle them gracefully, and best practices for debugging and recovery.
Types of Errors
1. HTTP Status Code Errors
These are standard HTTP errors returned by the API server before your request is processed.
Authentication Errors (401 Unauthorized)
Cause: Invalid or expired JWT token, incorrect credentials.
```json
{ "error": "Invalid credentials" }
```
Common scenarios:
- JWT token has expired (24-hour lifetime)
- Incorrect email, password, or user ID
- Missing or malformed Authorization header
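Because tokens expire after 24 hours, one way to avoid most expiry-related 401s is to track the token's age and refresh it shortly before the deadline. The sketch below is a minimal illustration, not part of the Builder API: `TokenCache`, the `fetch_token` callable, and the 5-minute safety margin are all assumptions.

```python
import time


class TokenCache:
    """Caches a JWT and refreshes it shortly before it expires."""

    def __init__(self, fetch_token, lifetime_seconds=24 * 60 * 60,
                 margin_seconds=300, clock=time.time):
        self._fetch_token = fetch_token    # hypothetical callable returning a new JWT string
        self._lifetime = lifetime_seconds  # 24-hour lifetime stated in this guide
        self._margin = margin_seconds      # refresh this long before expiry (assumed value)
        self._clock = clock                # injectable clock, useful for testing
        self._token = None
        self._issued_at = 0.0

    def get(self):
        """Return a cached token, fetching a new one if missing or near expiry."""
        age = self._clock() - self._issued_at
        if self._token is None or age >= self._lifetime - self._margin:
            self._token = self._fetch_token()
            self._issued_at = self._clock()
        return self._token

    def invalidate(self):
        """Force a refresh on the next get(), for example after an unexpected 401."""
        self._token = None
```

Calling `invalidate()` when a 401 still arrives keeps the reactive path described in this section available as a fallback.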
Handling:
```python
import requests

def handle_auth_error(response):
    """Return a fresh token if the response is a 401, otherwise None."""
    if response.status_code == 401:
        print("Authentication failed. Please check your credentials.")
        # Re-authenticate; authenticate() is your own login helper
        return authenticate()
    return None

# url and headers are defined by your application
try:
    response = requests.get(url, headers=headers)
    if response.status_code == 401:
        new_token = handle_auth_error(response)
        if new_token:
            headers['Authorization'] = f'Bearer {new_token}'
            response = requests.get(url, headers=headers)  # Retry once
    response.raise_for_status()  # Raise HTTPError for any remaining 4xx/5xx
except requests.exceptions.HTTPError as e:
    print(f"HTTP Error: {e}")
```
Bad Request Errors (400)
Cause: Malformed request payload or missing required fields.
```json
{ "error": "The request body is malformed or missing required fields" }
```
Common scenarios:
- Invalid JSON in request body
- Missing required parameters (Action, UserSays)
- Incorrect data types (string instead of integer)
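Many 400 responses can be caught before the request is sent. The following is a minimal sketch of a client-side check for the two required parameters named above; the helper name `validate_payload` is hypothetical, and treating both fields as strings is an assumption, so adjust the types to match your diagram.

```python
def validate_payload(payload):
    """Return a list of problems found in a request payload (empty if none)."""
    if not isinstance(payload, dict):
        return ["payload must be a JSON object"]

    problems = []
    # Required fields named in this guide; the expected types are assumptions.
    required = {"Action": str, "UserSays": str}
    for field, expected_type in required.items():
        if field not in payload:
            problems.append(f"missing required field: {field}")
        elif not isinstance(payload[field], expected_type):
            problems.append(
                f"{field} should be {expected_type.__name__}, "
                f"got {type(payload[field]).__name__}"
            )
    return problems
```

Returning a list rather than raising on the first problem lets the caller report every issue in one pass.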
Not Found Errors (404)
Cause: Invalid cognitive diagram ID or endpoint URL.
```json
{ "error": "Cognitive diagram not found" }
```
Rate Limiting (429)
Cause: Too many requests in a short time period.
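When a 429 response carries a Retry-After header, HTTP allows the value to be either a number of seconds or an HTTP date. Whether the Builder API sends this header, and in which form, is not stated here, so the sketch below simply accepts both and falls back to a caller-supplied default. The function name `retry_after_seconds` is hypothetical.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def retry_after_seconds(header_value, default=1.0, now=None):
    """Convert a Retry-After header value into a non-negative delay in seconds."""
    if not header_value:
        return default
    value = header_value.strip()
    if value.isdigit():
        return float(value)  # delta-seconds form, e.g. "120"
    try:
        retry_at = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return default  # unparseable value: use the fallback delay
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (retry_at - now).total_seconds())
```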
Handling:
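```javascript
async function makeRequestWithRetry(url, options, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      const response = await fetch(url, options);

      if (response.status === 429) {
        // Retry-After may be missing or non-numeric; fall back to exponential backoff
        const headerValue = Number(response.headers.get('Retry-After'));
        const retryAfter = Number.isFinite(headerValue) && headerValue > 0 ? headerValue : 2 ** i;
        console.log(`Rate limited. Retrying after ${retryAfter} seconds...`);
        await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
        continue;
      }

      return response;
    } catch (error) {
      // Network failure: rethrow on the last attempt, otherwise try again
      if (i === maxRetries - 1) throw error;
    }
  }
  // Every attempt was rate limited
  throw new Error(`Still rate limited after ${maxRetries} attempts`);
}
```
Server Errors (500-599)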
Cause: Internal server issues, temporary outages, or system maintenance.
Handling:
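```python
import random
import time

import requests

def exponential_backoff_retry(func, max_retries=3, base_delay=1):
    """Call func(), retrying 5xx failures with exponential backoff and jitter.

    func must raise requests.exceptions.HTTPError on failure,
    for example by calling response.raise_for_status().
    """
    for attempt in range(max_retries):
        try:
            return func()
        except requests.exceptions.HTTPError as e:
            status = e.response.status_code if e.response is not None else None
            is_server_error = status is not None and 500 <= status < 600
            if is_server_error and attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                print(f"Server error. Retrying in {delay:.2f} seconds...")
                time.sleep(delay)
                continue
            raise  # Not a server error, or retries exhausted
    raise Exception("Max retries exceeded")
```
2. Asynchronous Job Status Errors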
Job Status Codes
When polling job status, different HTTP status codes indicate different states:
| Status Code | Meaning | Action |
|---|---|---|
| 202 Accepted | Job is still running | Continue polling |
| 200 OK | Job completed successfully | Process results |
| 400 Bad Request | Invalid job ID | Check job ID format |
| 404 Not Found | Job not found or expired | Job may have been cleaned up |
| 500 Internal Server Error | Job failed due to system error | Check error logs, possibly retry |
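The table can be encoded as a small lookup so that polling code branches on a named action rather than on raw status codes. This is a sketch only: `PollAction` and `classify_job_status` are illustrative names, not part of any Synthreo client library.

```python
from enum import Enum


class PollAction(Enum):
    CONTINUE = "continue polling"
    DONE = "process results"
    FIX_REQUEST = "check job ID format"
    GONE = "job not found or expired"
    FAILED = "check error logs, possibly retry"
    UNKNOWN = "unexpected status"


# One entry per row of the job status table.
_STATUS_TO_ACTION = {
    202: PollAction.CONTINUE,
    200: PollAction.DONE,
    400: PollAction.FIX_REQUEST,
    404: PollAction.GONE,
    500: PollAction.FAILED,
}


def classify_job_status(status_code):
    """Map a job-status HTTP code to the action suggested in the table."""
    return _STATUS_TO_ACTION.get(status_code, PollAction.UNKNOWN)
```

Codes that are not listed in the table map to `PollAction.UNKNOWN`, which a caller would typically treat as an error.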
Job Polling Best Practices
```python
import time

import requests

class JobError(Exception):
    """Base class for job-related errors"""

class JobNotFoundError(JobError):
    pass

class JobTimeoutError(JobError):
    pass

class JobStatusError(JobError):
    pass

class JobPollingError(JobError):
    pass

def poll_job_with_error_handling(client, job_id, max_attempts=120, interval=30):
    """Poll job status with comprehensive error handling"""
    attempts = 0
    consecutive_errors = 0
    max_consecutive_errors = 3

    while attempts < max_attempts:
        try:
            response = client.get_job_status(job_id)
            consecutive_errors = 0  # Reset error counter on success

            if response.status_code == 202:
                print(f"Job {job_id} still running (attempt {attempts + 1})")
                time.sleep(interval)
                attempts += 1
                continue

            elif response.status_code == 200:
                print("Job completed successfully")
                return response.json()

            elif response.status_code == 404:
                raise JobNotFoundError(f"Job {job_id} not found or expired")

            else:
                raise JobStatusError(f"Unexpected status: {response.status_code}")

        except requests.exceptions.RequestException as e:
            consecutive_errors += 1
            print(f"Network error polling job: {e}")

            if consecutive_errors >= max_consecutive_errors:
                raise JobPollingError("Too many consecutive network errors")

            # Exponential backoff for network errors, capped at 5 minutes
            error_delay = min(interval * (2 ** consecutive_errors), 300)
            time.sleep(error_delay)
            attempts += 1

    raise JobTimeoutError(f"Job {job_id} timed out after {max_attempts} attempts")
```
3. Cognitive Diagram Execution Errors
Even when the HTTP request succeeds (200 OK), the cognitive diagram execution itself may encounter errors. These are returned in the errorData field of the response.
Understanding errorData
Important: The errorData field can contain both actual errors and informational messages. Not all content in errorData indicates a failure.
```json
{
  "result": "OK",
  "outputData": "Task completed",
  "errorData": "[{\"message\":\"Processing completed\",\"type\":\"INFO\"}]"
}
```
Common Execution Errors
```json
{
  "result": "OK",
  "outputData": "",
  "errorData": "[{\"message\":\"General error: variable not populated\",\"node_name\":\"Azure OpenAI\",\"node_id\":\"abc-123\",\"type\":\"ERROR\"}]"
}
```
Parsing and Handling Execution Errors
```python
import json

def parse_execution_response(api_response):
    """Parse cognitive diagram execution response with error handling"""
    try:
        # Check for successful output first.
        # Note: errorData is only inspected when outputData is empty.
        if api_response.get('outputData'):
            output_data = api_response['outputData']

            # Try to parse as JSON
            try:
                parsed_output = json.loads(output_data)
                if isinstance(parsed_output, list) and parsed_output:
                    return {"success": True, "data": parsed_output[0]}
                elif isinstance(parsed_output, dict):
                    return {"success": True, "data": parsed_output}
                else:
                    return {"success": True, "data": str(parsed_output)}
            except json.JSONDecodeError:
                # Return raw string if not JSON
                return {"success": True, "data": output_data}

        # Check errorData for actual errors (it is a JSON-encoded string)
        if api_response.get('errorData') and api_response['errorData'] != "[]":
            try:
                error_list = json.loads(api_response['errorData'])
                errors = []
                warnings = []
                info = []

                for item in error_list:
                    error_type = item.get('type', 'UNKNOWN').upper()
                    message = item.get('message', 'No message provided')
                    node_name = item.get('node_name', 'Unknown node')

                    error_info = {
                        'message': message,
                        'node_name': node_name,
                        'type': error_type
                    }

                    if error_type == 'ERROR':
                        errors.append(error_info)
                    elif error_type == 'WARNING':
                        warnings.append(error_info)
                    else:
                        info.append(error_info)

                if errors:
                    return {
                        "success": False,
                        "errors": errors,
                        "warnings": warnings,
                        "info": info
                    }
                else:
                    # Only warnings/info, treat as success
                    return {
                        "success": True,
                        "data": "Operation completed with warnings",
                        "warnings": warnings,
                        "info": info
                    }

            except json.JSONDecodeError:
                return {
                    "success": False,
                    "errors": [{"message": f"Failed to parse error data: {api_response['errorData']}"}]
                }

        # No output and no errors
        return {"success": False, "errors": [{"message": "No response generated"}]}

    except Exception as e:
        return {"success": False, "errors": [{"message": f"Response parsing failed: {str(e)}"}]}

def execute_with_error_handling(client, diagram_id, message):
    """Execute a diagram and print a summary of the parsed result"""
    try:
        response = client.execute_diagram(diagram_id, message)
        result = parse_execution_response(response)

        if result['success']:
            print("Execution successful!")
            print(f"Result: {result['data']}")

            if result.get('warnings'):
                print("Warnings:")
                for warning in result['warnings']:
                    print(f"  - {warning['node_name']}: {warning['message']}")

        else:
            print("Execution failed!")
            for error in result['errors']:
                print(f"Error in {error.get('node_name', 'unknown')}: {error['message']}")

        return result

    except Exception as e:
        print(f"Request failed: {e}")
        return {"success": False, "errors": [{"message": str(e)}]}
```
4. Training State Errors
Invalid State Transitions
When triggering agent training, monitor the stateId to ensure proper state transitions:
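The monitoring code in this section compares stateId against bare integers (2 for idle/ready, 3 to 5 for error states, 6 for training). Naming those values can make the branching easier to read. The sketch below assumes only the state IDs that appear in this guide; the member names and the `describe_state` helper are illustrative and not defined by the API.

```python
from enum import IntEnum


class AgentState(IntEnum):
    """stateId values used in this guide; member names are illustrative."""
    READY = 2
    DATA_ERROR = 3
    CONFIG_ERROR = 4
    SYSTEM_ERROR = 5
    TRAINING = 6


ERROR_STATES = {AgentState.DATA_ERROR, AgentState.CONFIG_ERROR, AgentState.SYSTEM_ERROR}


def describe_state(state_id):
    """Return 'training', 'ready', 'error', or 'unknown' for a raw stateId."""
    try:
        state = AgentState(state_id)
    except ValueError:
        return "unknown"  # a stateId this guide does not document
    if state in ERROR_STATES:
        return "error"
    return "training" if state is AgentState.TRAINING else "ready"
```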
```python
import time

import requests

class TrainingError(Exception):
    pass

class TrainingTimeoutError(TrainingError):
    pass

def get_state_error_message(state_id):
    """Get human-readable error message for state IDs"""
    error_messages = {
        3: "Training failed due to data issues",
        4: "Training failed due to configuration error",
        5: "Training failed due to system error"
    }
    return error_messages.get(state_id, f"Unknown error state: {state_id}")

def monitor_training_with_error_handling(client, diagram_id, timeout_minutes=60):
    """Monitor training with comprehensive state error handling"""
    start_time = time.time()
    timeout_seconds = timeout_minutes * 60
    last_state = None
    state_change_count = 0
    max_state_changes = 10  # Prevent infinite state loops

    while time.time() - start_time < timeout_seconds:
        try:
            response = client.get_agent_status(diagram_id)
            agent_data = response.json()

            current_state = agent_data.get('stateId')
            if current_state != last_state:
                state_change_count += 1
                if state_change_count > max_state_changes:
                    raise TrainingError("Too many state changes, possible system instability")

                print(f"State changed from {last_state} to {current_state}")
                last_state = current_state

            if current_state == 6:  # Training
                print("Agent is training...")
                time.sleep(60)

            elif current_state == 2:  # Idle/Ready
                print("Training completed successfully!")
                return {"success": True, "final_state": current_state}

            elif current_state in [3, 4, 5]:  # Error states
                error_msg = get_state_error_message(current_state)
                raise TrainingError(f"Training failed with state {current_state}: {error_msg}")

            else:
                print(f"Unexpected state {current_state}, continuing to monitor...")
                time.sleep(30)

        except requests.exceptions.RequestException as e:
            print(f"Network error checking training status: {e}")
            time.sleep(30)  # Continue monitoring despite network errors

    raise TrainingTimeoutError(f"Training timed out after {timeout_minutes} minutes")
```
5. Network and Connection Errors
Connection Timeouts
```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# requests.Session has no session-wide timeout setting, so pass this on every call
DEFAULT_TIMEOUT = (10, 300)  # (connection timeout, read timeout) in seconds

def create_robust_session():
    """Create a requests session with robust retry strategy"""
    session = requests.Session()

    # Define retry strategy
    retry_strategy = Retry(
        total=3,  # Total number of retries
        backoff_factor=1,  # Exponential backoff between retries
        status_forcelist=[429, 500, 502, 503, 504],  # HTTP status codes to retry
        # allowed_methods replaced method_whitelist in urllib3 1.26.
        # Only retry POST/PATCH if the operation is safe to repeat.
        allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PATCH"],
    )

    # Mount adapter with retry strategy
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)

    return session

# url, payload, and headers are defined by your application
session = create_robust_session()
try:
    response = session.post(url, json=payload, headers=headers, timeout=DEFAULT_TIMEOUT)
    response.raise_for_status()
except requests.exceptions.Timeout:
    print("Request timed out")
except requests.exceptions.ConnectionError:
    print("Connection error occurred")
except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")
```
DNS and SSL Errors
```python
import socket
import ssl
from urllib.parse import urlparse

def diagnose_connection_error(url):
    """Diagnose common connection issues"""
    try:
        # Test DNS resolution
        parsed = urlparse(url)
        hostname = parsed.hostname
        socket.gethostbyname(hostname)
        print(f"DNS resolution successful for {hostname}")

        # Test SSL connection (if HTTPS)
        if parsed.scheme == 'https':
            context = ssl.create_default_context()
            port = parsed.port or 443  # Honor a non-default port in the URL
            with socket.create_connection((hostname, port), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=hostname):
                    print(f"SSL connection successful to {hostname}")

    except socket.gaierror as e:
        print(f"DNS resolution failed: {e}")
    except ssl.SSLError as e:
        print(f"SSL error: {e}")
    except socket.timeout:
        print("Connection timed out")
    except Exception as e:
        print(f"Connection test failed: {e}")
```
Comprehensive Error Handling Strategy
Complete Error Handling Class
```python
import logging
import time
from enum import Enum
from typing import Any, Dict

import requests

# JobError, JobTimeoutError, and TrainingError are the exception
# classes defined in the earlier sections of this guide.

class ErrorSeverity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class SynthreoErrorHandler:
    def __init__(self, log_level=logging.INFO):
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(log_level)

        # Error counters for monitoring
        self.error_counts = {
            'auth_errors': 0,
            'network_errors': 0,
            'job_failures': 0,
            'execution_errors': 0
        }

    def handle_api_error(self, error: Exception, context: str = "") -> Dict[str, Any]:
        """Central error handling method"""
        error_info = {
            'timestamp': time.time(),
            'context': context,
            'error_type': type(error).__name__,
            'message': str(error),
            'severity': ErrorSeverity.MEDIUM
        }

        if isinstance(error, requests.exceptions.HTTPError):
            status_code = error.response.status_code
            error_info.update(self._handle_http_error(status_code, error))

        elif isinstance(error, requests.exceptions.Timeout):
            error_info.update(self._handle_timeout_error(error))

        elif isinstance(error, requests.exceptions.ConnectionError):
            error_info.update(self._handle_connection_error(error))

        elif isinstance(error, (JobError, TrainingError)):
            error_info.update(self._handle_job_error(error))

        else:
            error_info.update(self._handle_unknown_error(error))

        # Log the error
        self._log_error(error_info)

        # Update error counters
        self._update_error_counts(error_info)

        return error_info

    def _handle_http_error(self, status_code: int, error: Exception) -> Dict[str, Any]:
        severity_map = {
            400: ErrorSeverity.MEDIUM,
            401: ErrorSeverity.HIGH,
            403: ErrorSeverity.HIGH,
            404: ErrorSeverity.MEDIUM,
            429: ErrorSeverity.LOW,
            500: ErrorSeverity.HIGH,
            502: ErrorSeverity.HIGH,
            503: ErrorSeverity.MEDIUM
        }

        retry_map = {
            429: True,  # Rate limit
            500: True,  # Internal server error
            502: True,  # Bad gateway
            503: True,  # Service unavailable
        }

        return {
            'status_code': status_code,
            'severity': severity_map.get(status_code, ErrorSeverity.MEDIUM),
            'should_retry': retry_map.get(status_code, False),
            'retry_delay': self._calculate_retry_delay(status_code)
        }

    def _handle_timeout_error(self, error: Exception) -> Dict[str, Any]:
        self.error_counts['network_errors'] += 1
        return {
            'severity': ErrorSeverity.MEDIUM,
            'should_retry': True,
            'retry_delay': 30
        }

    def _handle_connection_error(self, error: Exception) -> Dict[str, Any]:
        self.error_counts['network_errors'] += 1
        return {
            'severity': ErrorSeverity.HIGH,
            'should_retry': True,
            'retry_delay': 60
        }

    def _handle_job_error(self, error: Exception) -> Dict[str, Any]:
        self.error_counts['job_failures'] += 1
        severity = ErrorSeverity.CRITICAL if isinstance(error, JobTimeoutError) else ErrorSeverity.HIGH
        return {
            'severity': severity,
            'should_retry': False
        }

    def _handle_unknown_error(self, error: Exception) -> Dict[str, Any]:
        return {
            'severity': ErrorSeverity.MEDIUM,
            'should_retry': False
        }

    def _calculate_retry_delay(self, status_code: int) -> int:
        delay_map = {
            429: 60,  # Rate limit - wait longer
            500: 30,  # Server error
            502: 15,  # Bad gateway
            503: 45   # Service unavailable
        }
        return delay_map.get(status_code, 30)

    def _log_error(self, error_info: Dict[str, Any]):
        level_map = {
            ErrorSeverity.LOW: logging.INFO,
            ErrorSeverity.MEDIUM: logging.WARNING,
            ErrorSeverity.HIGH: logging.ERROR,
            ErrorSeverity.CRITICAL: logging.CRITICAL
        }

        level = level_map[error_info['severity']]
        message = f"[{error_info['context']}] {error_info['error_type']}: {error_info['message']}"

        self.logger.log(level, message)

    def _update_error_counts(self, error_info: Dict[str, Any]):
        # Network and job errors are already counted in their handlers;
        # counting them again here would double the totals.
        if error_info.get('status_code') == 401:
            self.error_counts['auth_errors'] += 1

    def get_error_summary(self) -> Dict[str, Any]:
        """Get summary of all errors encountered"""
        return {
            'error_counts': self.error_counts.copy(),
            'total_errors': sum(self.error_counts.values())
        }

    def should_circuit_break(self, error_type: str, threshold: int = 5) -> bool:
        """Determine if circuit breaker should activate"""
        return self.error_counts.get(error_type, 0) >= threshold

error_handler = SynthreoErrorHandler()

def robust_api_call(client, operation, *args, **kwargs):
    max_retries = 3

    for attempt in range(max_retries):
        try:
            return operation(*args, **kwargs)

        except Exception as e:
            error_info = error_handler.handle_api_error(e, f"Attempt {attempt + 1}")

            # Check if we should retry
            if attempt < max_retries - 1 and error_info.get('should_retry', False):
                delay = error_info.get('retry_delay', 30)
                print(f"Retrying in {delay} seconds...")
                time.sleep(delay)
                continue
            else:
                # Final attempt failed or shouldn't retry; keep the original traceback
                raise

    raise Exception("All retry attempts failed")
```
Monitoring and Alerting
Error Rate Monitoring
```python
def monitor_error_rates(error_handler: SynthreoErrorHandler, total_requests: int = 100,
                        alert_threshold: float = 0.1):
    """Monitor error rates and trigger alerts"""
    summary = error_handler.get_error_summary()
    total_errors = summary['total_errors']

    # Calculate error rate. The handler only counts errors, so track
    # total_requests in your application (100 is an example value).
    error_rate = total_errors / total_requests if total_requests > 0 else 0

    if error_rate > alert_threshold:
        send_alert(f"High error rate detected: {error_rate:.2%}")

    # Check for specific error patterns
    if summary['error_counts']['auth_errors'] > 5:
        send_alert("Multiple authentication failures - check credentials")

    if summary['error_counts']['network_errors'] > 10:
        send_alert("Network connectivity issues detected")

def send_alert(message: str):
    """Send alert (implement your preferred alerting mechanism)"""
    print(f"ALERT: {message}")
    # Implement: send email, Slack notification, logging to monitoring system, etc.
```
Best Practices Summary
- Implement comprehensive error handling for all API interactions
- Use exponential backoff for retry strategies
- Monitor error patterns and rates for early detection of issues
- Parse errorData carefully - it contains both errors and informational messages
- Set appropriate timeouts for different operation types
- Log errors with context for easier debugging
- Implement circuit breakers for cascading failure prevention
- Handle token expiration gracefully with automatic re-authentication
- Validate responses before processing to catch malformed data early
- Use structured error handling with proper exception hierarchies
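The circuit breaker recommendation can be sketched as a small state holder that stops calls after repeated failures and permits a trial call once a cooldown has passed. This is a generic illustration of the pattern rather than anything provided by the Builder API; the class name, the 60-second cooldown, and the injectable clock are assumptions, while the default threshold of 5 matches should_circuit_break in the error handling class.

```python
import time


class CircuitBreaker:
    """Opens after consecutive failures and allows a trial call after a cooldown."""

    def __init__(self, failure_threshold=5, reset_timeout=60.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout  # cooldown in seconds (assumed value)
        self._clock = clock                 # injectable clock, useful for testing
        self._failures = 0
        self._opened_at = None              # None means the circuit is closed

    def allow_request(self):
        """Return True if a call may proceed."""
        if self._opened_at is None:
            return True
        # After the cooldown, let a trial request through (half-open state).
        return self._clock() - self._opened_at >= self.reset_timeout

    def record_success(self):
        """Close the circuit and reset the failure count."""
        self._failures = 0
        self._opened_at = None

    def record_failure(self):
        """Count a failure and open the circuit once the threshold is reached."""
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()
```

A caller would check `allow_request()` before each API call, then report the outcome with `record_success()` or `record_failure()`.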
Related pages:
- Best Practices - broader security and performance guidance
- Cognitive Diagrams API - understanding errorData in responses
- Authentication - handling 401 errors and token refresh