# Error Handling
Error handling guide for the Builder API — understand HTTP status codes, error response shapes, and how to implement retry logic and graceful failure handling in your integration.
Proper error handling is crucial for building robust applications with the Synthreo Builder API. This guide covers different types of errors you may encounter, how to handle them gracefully, and best practices for debugging and recovery.
## Types of Errors

### 1. HTTP Status Code Errors

These are standard HTTP errors returned by the API server before your request is processed.

#### Authentication Errors (401 Unauthorized)

**Cause:** Invalid or expired JWT token, incorrect credentials.

```json
{
  "error": "Invalid credentials"
}
```

**Common scenarios:**

- JWT token has expired (24-hour lifetime)
- Incorrect email, password, or user ID
- Missing or malformed Authorization header
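Because tokens expire after 24 hours, one option is to refresh them shortly before expiry instead of waiting for a 401. The sketch below is illustrative only: `TokenCache`, the `fetch_token` callable, and the five-minute safety margin are assumptions, not part of the Builder API.

```python
import time
from typing import Callable, Optional

TOKEN_LIFETIME_SECONDS = 24 * 60 * 60  # 24-hour lifetime stated in this guide
REFRESH_MARGIN_SECONDS = 5 * 60        # assumption: refresh five minutes early


class TokenCache:
    """Caches a token and refreshes it shortly before it expires."""

    def __init__(self, fetch_token: Callable[[], str],
                 clock: Callable[[], float] = time.monotonic):
        self._fetch_token = fetch_token  # hypothetical login callable you supply
        self._clock = clock
        self._token: Optional[str] = None
        self._issued_at = 0.0

    def get(self) -> str:
        """Return a cached token, fetching a new one if it is missing or near expiry."""
        age = self._clock() - self._issued_at
        if self._token is None or age >= TOKEN_LIFETIME_SECONDS - REFRESH_MARGIN_SECONDS:
            self._token = self._fetch_token()
            self._issued_at = self._clock()
        return self._token

    def invalidate(self) -> None:
        """Force a refresh on the next get(), for example after a 401 response."""
        self._token = None
```

Calling `invalidate()` when a 401 arrives covers tokens that are revoked or expire earlier than expected.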
**Handling:**
```python
import requests

def handle_auth_error(response):
    """Re-authenticate after a 401 and return a fresh token, or None."""
    if response.status_code == 401:
        print("Authentication failed. Please check your credentials.")
        # Re-authenticate and retry; authenticate() is your own login helper
        new_token = authenticate()
        return new_token
    return None

# url and headers are assumed to be defined by the caller
try:
    response = requests.get(url, headers=headers)
    if response.status_code == 401:
        new_token = handle_auth_error(response)
        if new_token:
            headers['Authorization'] = f'Bearer {new_token}'
            response = requests.get(url, headers=headers)  # Retry once
    # Without this call, requests never raises HTTPError for 4xx/5xx responses
    response.raise_for_status()
except requests.exceptions.HTTPError as e:
    print(f"HTTP Error: {e}")
```

#### Bad Request Errors (400)
**Cause:** Malformed request payload or missing required fields.
```json
{
  "error": "The request body is malformed or missing required fields"
}
```

**Common scenarios:**

- Invalid JSON in request body
- Missing required parameters (Action, UserSays)
- Incorrect data types (string instead of integer)
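Many 400 responses can be avoided by checking the payload before sending it. The following is a minimal sketch: the field names `Action` and `UserSays` come from the list above, while `validate_payload` itself is a hypothetical helper and the exact value types the API expects are not confirmed here.

```python
import json
from typing import Any, Dict, List

REQUIRED_FIELDS = ("Action", "UserSays")  # names taken from this guide


def validate_payload(payload: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means no problem was found."""
    problems = []
    for field in REQUIRED_FIELDS:
        # Treat absent, None, and empty-string values as missing
        if field not in payload or payload[field] in (None, ""):
            problems.append(f"Missing required parameter: {field}")
    try:
        json.dumps(payload)  # Catches values that cannot be encoded as JSON
    except (TypeError, ValueError) as exc:
        problems.append(f"Payload is not JSON-serializable: {exc}")
    return problems
```

If the returned list is non-empty, log it and skip the request rather than spending a round trip on a call that is likely to fail.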
#### Not Found Errors (404)
**Cause:** Invalid cognitive diagram ID or endpoint URL.
```json
{
  "error": "Cognitive diagram not found"
}
```

#### Rate Limiting (429)
**Cause:** Too many requests in a short time period.
**Handling:**
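```javascript
async function makeRequestWithRetry(url, options, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      const response = await fetch(url, options);

      if (response.status === 429) {
        // Use Retry-After only when it parses as a positive number of seconds;
        // otherwise fall back to exponential backoff
        const headerSeconds = Number(response.headers.get('Retry-After'));
        const retryAfter = headerSeconds > 0 ? headerSeconds : 2 ** i;
        console.log(`Rate limited. Retrying after ${retryAfter} seconds...`);
        await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
        continue;
      }

      return response;
    } catch (error) {
      // Network failure: retry until the attempts run out
      if (i === maxRetries - 1) throw error;
    }
  }
  // Every attempt was rate limited; fail loudly instead of returning undefined
  throw new Error(`Still rate limited after ${maxRetries} attempts`);
}
```

#### Server Errors (500-599)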
**Cause:** Internal server issues, temporary outages, or system maintenance.
**Handling:**
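```python
import random
import time

import requests

def exponential_backoff_retry(func, max_retries=3, base_delay=1):
    """Call func(), retrying 5xx responses with exponential backoff and jitter.

    func must raise requests.exceptions.HTTPError on failure,
    for example by calling response.raise_for_status().
    """
    for attempt in range(max_retries):
        try:
            return func()
        except requests.exceptions.HTTPError as e:
            status = e.response.status_code if e.response is not None else None
            is_server_error = status is not None and 500 <= status < 600
            if is_server_error and attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                print(f"Server error. Retrying in {delay:.2f} seconds...")
                time.sleep(delay)
                continue
            raise  # Client error, or server error on the final attempt
    raise Exception("Max retries exceeded")
```

### 2. Asynchronous Job Status Errors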
#### Job Status Codes
When polling job status, different HTTP status codes indicate different states:
| Status Code | Meaning | Action |
|-------------|---------|--------|
| **202 Accepted** | Job is still running | Continue polling |
| **200 OK** | Job completed successfully | Process results |
| **400 Bad Request** | Invalid job ID | Check job ID format |
| **404 Not Found** | Job not found or expired | Job may have been cleaned up |
| **500 Internal Server Error** | Job failed due to system error | Check error logs, possibly retry |
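The table above can be encoded as a lookup so polling code branches on a named action instead of raw status codes. This is a sketch only: `JobAction` and `next_job_action` are illustrative names, not part of the API or any client library.

```python
from enum import Enum


class JobAction(Enum):
    KEEP_POLLING = "keep_polling"
    PROCESS_RESULT = "process_result"
    FIX_REQUEST = "fix_request"
    GIVE_UP = "give_up"
    RETRY_LATER = "retry_later"


# Mapping taken from the job status table in this guide
_STATUS_TO_ACTION = {
    202: JobAction.KEEP_POLLING,    # Job is still running
    200: JobAction.PROCESS_RESULT,  # Job completed successfully
    400: JobAction.FIX_REQUEST,     # Invalid job ID
    404: JobAction.GIVE_UP,         # Job not found or expired
    500: JobAction.RETRY_LATER,     # Job failed due to system error
}


def next_job_action(status_code: int) -> JobAction:
    """Translate a job-status HTTP code into the caller's next step."""
    try:
        return _STATUS_TO_ACTION[status_code]
    except KeyError:
        raise ValueError(f"Unexpected job status code: {status_code}") from None
```

Raising on unknown codes keeps an unexpected response from being silently treated as "keep polling".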
#### Job Polling Best Practices
```python
import time

import requests

def poll_job_with_error_handling(client, job_id, max_attempts=120, interval=30):
    """Poll job status with comprehensive error handling"""
    attempts = 0
    consecutive_errors = 0
    max_consecutive_errors = 3

    while attempts < max_attempts:
        try:
            response = client.get_job_status(job_id)
            consecutive_errors = 0  # Reset error counter on success

            if response.status_code == 202:
                print(f"Job {job_id} still running (attempt {attempts + 1})")
                time.sleep(interval)
                attempts += 1
                continue

            elif response.status_code == 200:
                print("Job completed successfully")
                return response.json()

            elif response.status_code == 404:
                raise JobNotFoundError(f"Job {job_id} not found or expired")

            else:
                raise JobStatusError(f"Unexpected status: {response.status_code}")

        except requests.exceptions.RequestException as e:
            consecutive_errors += 1
            print(f"Network error polling job: {e}")

            if consecutive_errors >= max_consecutive_errors:
                raise JobPollingError("Too many consecutive network errors")

            # Exponential backoff for network errors, capped at 300 seconds
            error_delay = min(interval * (2 ** consecutive_errors), 300)
            time.sleep(error_delay)
            attempts += 1

    raise JobTimeoutError(f"Job {job_id} timed out after {max_attempts} attempts")

class JobError(Exception):
    """Base class for job-related errors"""
    pass

class JobNotFoundError(JobError):
    pass

class JobTimeoutError(JobError):
    pass

class JobStatusError(JobError):
    pass

class JobPollingError(JobError):
    pass
```

### 3. Cognitive Diagram Execution Errors
Even when the HTTP request succeeds (200 OK), the cognitive diagram execution itself may encounter errors. These are returned in the `errorData` field of the response.
#### Understanding errorData
**Important:** The `errorData` field can contain both actual errors and informational messages. Not all content in `errorData` indicates a failure.
```json
{
  "result": "OK",
  "outputData": "Task completed",
  "errorData": "[{\"message\":\"Processing completed\",\"type\":\"INFO\"}]"
}
```

#### Common Execution Errors
```json
{
  "result": "OK",
  "outputData": "",
  "errorData": "[{\"message\":\"General error: variable not populated\",\"node_name\":\"Azure OpenAI\",\"node_id\":\"abc-123\",\"type\":\"ERROR\"}]"
}
```

#### Parsing and Handling Execution Errors
```python
import json

def parse_execution_response(api_response):
    """Parse cognitive diagram execution response with error handling"""
    try:
        # Check for successful output first
        if api_response.get('outputData'):
            output_data = api_response['outputData']

            # Try to parse as JSON
            try:
                parsed_output = json.loads(output_data)
                if isinstance(parsed_output, list) and parsed_output:
                    return {"success": True, "data": parsed_output[0]}
                elif isinstance(parsed_output, dict):
                    return {"success": True, "data": parsed_output}
                else:
                    return {"success": True, "data": str(parsed_output)}
            except json.JSONDecodeError:
                # Return raw string if not JSON
                return {"success": True, "data": output_data}

        # Check errorData for actual errors
        if api_response.get('errorData') and api_response['errorData'] != "[]":
            try:
                error_list = json.loads(api_response['errorData'])
                errors = []
                warnings = []
                info = []

                for item in error_list:
                    error_type = item.get('type', 'UNKNOWN').upper()
                    message = item.get('message', 'No message provided')
                    node_name = item.get('node_name', 'Unknown node')

                    error_info = {
                        'message': message,
                        'node_name': node_name,
                        'type': error_type
                    }

                    if error_type == 'ERROR':
                        errors.append(error_info)
                    elif error_type == 'WARNING':
                        warnings.append(error_info)
                    else:
                        info.append(error_info)

                if errors:
                    return {
                        "success": False,
                        "errors": errors,
                        "warnings": warnings,
                        "info": info
                    }
                else:
                    # Only warnings/info, treat as success
                    return {
                        "success": True,
                        "data": "Operation completed with warnings",
                        "warnings": warnings,
                        "info": info
                    }

            except json.JSONDecodeError:
                return {
                    "success": False,
                    "errors": [{"message": f"Failed to parse error data: {api_response['errorData']}"}]
                }

        # No output and no errors
        return {"success": False, "errors": [{"message": "No response generated"}]}

    except Exception as e:
        return {"success": False, "errors": [{"message": f"Response parsing failed: {str(e)}"}]}

def execute_with_error_handling(client, diagram_id, message):
    try:
        response = client.execute_diagram(diagram_id, message)
        result = parse_execution_response(response)

        if result['success']:
            print("Execution successful!")
            print(f"Result: {result['data']}")

            if result.get('warnings'):
                print("Warnings:")
                for warning in result['warnings']:
                    print(f"  - {warning['node_name']}: {warning['message']}")

        else:
            print("Execution failed!")
            for error in result['errors']:
                print(f"Error in {error.get('node_name', 'unknown')}: {error['message']}")

        return result

    except Exception as e:
        print(f"Request failed: {e}")
        return {"success": False, "errors": [{"message": str(e)}]}
```

### 4. Training State Errors
#### Invalid State Transitions
When triggering agent training, monitor the `stateId` to ensure proper state transitions:
```python
import time

import requests

def monitor_training_with_error_handling(client, diagram_id, timeout_minutes=60):
    """Monitor training with comprehensive state error handling"""
    start_time = time.time()
    timeout_seconds = timeout_minutes * 60
    last_state = None
    state_change_count = 0
    max_state_changes = 10  # Prevent infinite state loops

    while time.time() - start_time < timeout_seconds:
        try:
            response = client.get_agent_status(diagram_id)
            agent_data = response.json()

            current_state = agent_data.get('stateId')
            if current_state != last_state:
                state_change_count += 1
                if state_change_count > max_state_changes:
                    raise TrainingError("Too many state changes, possible system instability")

                print(f"State changed from {last_state} to {current_state}")
                last_state = current_state

            if current_state == 6:  # Training
                print("Agent is training...")
                time.sleep(60)

            elif current_state == 2:  # Idle/Ready
                print("Training completed successfully!")
                return {"success": True, "final_state": current_state}

            elif current_state in [3, 4, 5]:  # Error states
                error_msg = get_state_error_message(current_state)
                raise TrainingError(f"Training failed with state {current_state}: {error_msg}")

            else:
                print(f"Unexpected state {current_state}, continuing to monitor...")
                time.sleep(30)

        except requests.exceptions.RequestException as e:
            print(f"Network error checking training status: {e}")
            time.sleep(30)  # Continue monitoring despite network errors

    raise TrainingTimeoutError(f"Training timed out after {timeout_minutes} minutes")

def get_state_error_message(state_id):
    """Get human-readable error message for state IDs"""
    error_messages = {
        3: "Training failed due to data issues",
        4: "Training failed due to configuration error",
        5: "Training failed due to system error"
    }
    return error_messages.get(state_id, f"Unknown error state: {state_id}")

class TrainingError(Exception):
    pass

class TrainingTimeoutError(TrainingError):
    pass
```

### 5. Network and Connection Errors
#### Connection Timeouts
```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# (connection timeout, read timeout) in seconds
DEFAULT_TIMEOUT = (10, 300)

def create_robust_session():
    """Create a requests session with robust retry strategy"""
    session = requests.Session()

    # Define retry strategy
    retry_strategy = Retry(
        total=3,  # Total number of retries
        backoff_factor=1,  # Exponential backoff; exact delays depend on the urllib3 version
        status_forcelist=[429, 500, 502, 503, 504],  # HTTP status codes to retry
        # allowed_methods (urllib3 1.26+) replaces method_whitelist, removed in urllib3 2.0.
        # POST and PATCH are not idempotent: keep them only if repeating the request is safe.
        allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PATCH"],
    )

    # Mount adapter with retry strategy
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)

    return session

# url, payload, and headers are assumed to be defined by the caller
session = create_robust_session()
try:
    # requests ignores a timeout attribute set on the session, so pass it per request
    response = session.post(url, json=payload, headers=headers, timeout=DEFAULT_TIMEOUT)
    response.raise_for_status()
except requests.exceptions.Timeout:
    print("Request timed out")
except requests.exceptions.ConnectionError:
    print("Connection error occurred")
except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")
```

#### DNS and SSL Errors
```python
import socket
import ssl
from urllib.parse import urlparse

def diagnose_connection_error(url):
    """Diagnose common connection issues"""
    try:
        # Test DNS resolution
        hostname = urlparse(url).hostname
        socket.gethostbyname(hostname)
        print(f"DNS resolution successful for {hostname}")

        # Test SSL connection (if HTTPS)
        if url.startswith('https'):
            context = ssl.create_default_context()
            with socket.create_connection((hostname, 443), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                    print(f"SSL connection successful to {hostname}")

    except socket.gaierror as e:
        print(f"DNS resolution failed: {e}")
    except ssl.SSLError as e:
        print(f"SSL error: {e}")
    except socket.timeout:
        print("Connection timed out")
    except Exception as e:
        print(f"Connection test failed: {e}")
```

## Comprehensive Error Handling Strategy
### Complete Error Handling Class
```python
import logging
import time
from enum import Enum
from typing import Any, Dict

import requests

# JobError, JobTimeoutError, and TrainingError are the exception classes
# defined in the job polling and training sections of this guide.

class ErrorSeverity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class SynthreoErrorHandler:
    def __init__(self, log_level=logging.INFO):
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(log_level)

        # Error counters for monitoring
        self.error_counts = {
            'auth_errors': 0,
            'network_errors': 0,
            'job_failures': 0,
            'execution_errors': 0
        }

    def handle_api_error(self, error: Exception, context: str = "") -> Dict[str, Any]:
        """Central error handling method"""
        error_info = {
            'timestamp': time.time(),
            'context': context,
            'error_type': type(error).__name__,
            'message': str(error),
            'severity': ErrorSeverity.MEDIUM
        }

        if isinstance(error, requests.exceptions.HTTPError):
            status_code = error.response.status_code
            error_info.update(self._handle_http_error(status_code, error))

        elif isinstance(error, requests.exceptions.Timeout):
            error_info.update(self._handle_timeout_error(error))

        elif isinstance(error, requests.exceptions.ConnectionError):
            error_info.update(self._handle_connection_error(error))

        elif isinstance(error, (JobError, TrainingError)):
            error_info.update(self._handle_job_error(error))

        else:
            error_info.update(self._handle_unknown_error(error))

        # Log the error
        self._log_error(error_info)

        # Update error counters
        self._update_error_counts(error_info)

        return error_info

    def _handle_http_error(self, status_code: int, error: Exception) -> Dict[str, Any]:
        severity_map = {
            400: ErrorSeverity.MEDIUM,
            401: ErrorSeverity.HIGH,
            403: ErrorSeverity.HIGH,
            404: ErrorSeverity.MEDIUM,
            429: ErrorSeverity.LOW,
            500: ErrorSeverity.HIGH,
            502: ErrorSeverity.HIGH,
            503: ErrorSeverity.MEDIUM
        }

        retry_map = {
            429: True,  # Rate limit
            500: True,  # Internal server error
            502: True,  # Bad gateway
            503: True,  # Service unavailable
        }

        return {
            'status_code': status_code,
            'severity': severity_map.get(status_code, ErrorSeverity.MEDIUM),
            'should_retry': retry_map.get(status_code, False),
            'retry_delay': self._calculate_retry_delay(status_code)
        }

    def _handle_timeout_error(self, error: Exception) -> Dict[str, Any]:
        self.error_counts['network_errors'] += 1
        return {
            'severity': ErrorSeverity.MEDIUM,
            'should_retry': True,
            'retry_delay': 30
        }

    def _handle_connection_error(self, error: Exception) -> Dict[str, Any]:
        self.error_counts['network_errors'] += 1
        return {
            'severity': ErrorSeverity.HIGH,
            'should_retry': True,
            'retry_delay': 60
        }

    def _handle_job_error(self, error: Exception) -> Dict[str, Any]:
        self.error_counts['job_failures'] += 1
        severity = ErrorSeverity.CRITICAL if isinstance(error, JobTimeoutError) else ErrorSeverity.HIGH
        return {
            'severity': severity,
            'should_retry': False
        }

    def _handle_unknown_error(self, error: Exception) -> Dict[str, Any]:
        return {
            'severity': ErrorSeverity.MEDIUM,
            'should_retry': False
        }

    def _calculate_retry_delay(self, status_code: int) -> int:
        delay_map = {
            429: 60,  # Rate limit - wait longer
            500: 30,  # Server error
            502: 15,  # Bad gateway
            503: 45   # Service unavailable
        }
        return delay_map.get(status_code, 30)

    def _log_error(self, error_info: Dict[str, Any]):
        level_map = {
            ErrorSeverity.LOW: logging.INFO,
            ErrorSeverity.MEDIUM: logging.WARNING,
            ErrorSeverity.HIGH: logging.ERROR,
            ErrorSeverity.CRITICAL: logging.CRITICAL
        }

        level = level_map[error_info['severity']]
        message = f"[{error_info['context']}] {error_info['error_type']}: {error_info['message']}"

        self.logger.log(level, message)

    def _update_error_counts(self, error_info: Dict[str, Any]):
        # Network errors are already counted in the timeout and connection
        # handlers, so counting them here as well would record them twice.
        if error_info.get('status_code') == 401:
            self.error_counts['auth_errors'] += 1

    def get_error_summary(self) -> Dict[str, Any]:
        """Get summary of all errors encountered"""
        return {
            'error_counts': self.error_counts.copy(),
            'total_errors': sum(self.error_counts.values())
        }

    def should_circuit_break(self, error_type: str, threshold: int = 5) -> bool:
        """Determine if circuit breaker should activate"""
        return self.error_counts.get(error_type, 0) >= threshold

error_handler = SynthreoErrorHandler()

def robust_api_call(client, operation, *args, **kwargs):
    max_retries = 3

    for attempt in range(max_retries):
        try:
            return operation(*args, **kwargs)

        except Exception as e:
            error_info = error_handler.handle_api_error(e, f"Attempt {attempt + 1}")

            # Check if we should retry
            if attempt < max_retries - 1 and error_info.get('should_retry', False):
                delay = error_info.get('retry_delay', 30)
                print(f"Retrying in {delay} seconds...")
                time.sleep(delay)
                continue
            else:
                # Final attempt failed or shouldn't retry
                raise

    raise Exception("All retry attempts failed")
```

## Monitoring and Alerting
### Error Rate Monitoring
```python
def monitor_error_rates(error_handler: SynthreoErrorHandler,
                        alert_threshold: float = 0.1,
                        total_requests: int = 100):
    """Monitor error rates and trigger alerts"""
    summary = error_handler.get_error_summary()
    total_errors = summary['total_errors']

    # Calculate error rate. The default total_requests is only an example;
    # track the real request count in your application and pass it in.
    error_rate = total_errors / total_requests if total_requests > 0 else 0

    if error_rate > alert_threshold:
        send_alert(f"High error rate detected: {error_rate:.2%}")

    # Check for specific error patterns
    if summary['error_counts']['auth_errors'] > 5:
        send_alert("Multiple authentication failures - check credentials")

    if summary['error_counts']['network_errors'] > 10:
        send_alert("Network connectivity issues detected")

def send_alert(message: str):
    """Send alert (implement your preferred alerting mechanism)"""
    print(f"ALERT: {message}")
    # Implement: send email, Slack notification, logging to monitoring system, etc.
```

## Best Practices Summary
1. **Implement comprehensive error handling** for all API interactions
2. **Use exponential backoff** for retry strategies
3. **Monitor error patterns** and rates for early detection of issues
4. **Parse errorData carefully** - it contains both errors and informational messages
5. **Set appropriate timeouts** for different operation types
6. **Log errors with context** for easier debugging
7. **Implement circuit breakers** for cascading failure prevention
8. **Handle token expiration** gracefully with automatic re-authentication
9. **Validate responses** before processing to catch malformed data early
10. **Use structured error handling** with proper exception hierarchies
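The circuit breaker recommendation can be sketched as a small wrapper that stops calling the API after repeated failures and allows a trial call once a cool-down has passed. `CircuitBreaker`, `CircuitOpenError`, and the default threshold and timeout below are illustrative assumptions, not values prescribed by the Builder API.

```python
import time
from typing import Callable, Optional


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold  # consecutive failures before opening
        self.reset_timeout = reset_timeout          # seconds to wait before a trial call
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def call(self, operation, *args, **kwargs):
        """Run operation unless the circuit is open and still cooling down."""
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.reset_timeout:
                raise CircuitOpenError("Circuit open; skipping call")
            # Cool-down elapsed: fall through and allow one trial call (half-open)
        try:
            result = operation(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._opened_at is not None or self._failures >= self.failure_threshold:
                self._opened_at = self._clock()  # open, or re-open after a failed trial
            raise
        # Success closes the circuit and clears the failure count
        self._failures = 0
        self._opened_at = None
        return result
```

A typical use is `breaker.call(client.execute_diagram, diagram_id, message)`, catching `CircuitOpenError` to return a fallback response while the API recovers.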