Harness API Operations
Comprehensive guide for Harness Platform API authentication, rate limiting, error handling, and endpoint operations

Install: npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/data/harness-api-operations
Harness API Operations Skill
When to Use This Skill
Use this skill when you need to:
- Authenticate with Harness APIs using API keys or service account tokens
- Handle API rate limits and implement exponential backoff strategies
- Process API errors with proper retry logic and graceful degradation
- Interact with Harness endpoints for pipelines, templates, services, environments
- Implement API clients with best practices for production systems
- Debug API integration issues and understand error responses
- Build automation around Harness Platform APIs
Perfect for:
- Building CI/CD automation with Harness
- Creating infrastructure templates programmatically
- Implementing GitOps workflows
- Building custom Harness integrations
- Automating pipeline and service management
- Creating monitoring and reporting tools
- Troubleshooting API connectivity issues
Core Capabilities
1. API Authentication Methods
Master multiple authentication approaches:
- API Key Authentication: Header-based authentication using x-api-key
- Service Account Tokens: OAuth 2.0 bearer tokens for service accounts
- Personal Access Tokens: User-scoped tokens for personal automation
- Environment Variables: Secure credential storage patterns
- Token Rotation: Implementing credential refresh strategies
- Multi-Account Management: Handling multiple Harness accounts
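Token rotation can stay out of application code entirely. A minimal sketch (not Harness-specific; the class name and TTL value are illustrative) that re-reads a credential from the environment so a token rotated externally, e.g. by a secrets manager, is picked up without restarting the process:

```python
import os
import time


class RotatingCredential:
    """Re-read a credential from the environment at most once per TTL,
    so an externally rotated token is picked up without a restart."""

    def __init__(self, env_var: str, ttl_seconds: float = 300.0):
        self.env_var = env_var
        self.ttl = ttl_seconds
        self._value = None
        self._loaded_at = 0.0

    def get(self) -> str:
        now = time.monotonic()
        if self._value is None or now - self._loaded_at > self.ttl:
            value = os.getenv(self.env_var)
            if not value:
                raise ValueError(f"{self.env_var} is not set")
            self._value = value
            self._loaded_at = now
        return self._value
```

A client would call `cred.get()` when building headers instead of caching the key at construction time.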
2. Rate Limit Management
Implement robust rate limit handling:
- Read Operations: 1000 requests per minute
- Write Operations: 300 requests per minute
- Exponential Backoff: Progressive retry delays with jitter
- Rate Limit Headers: Parsing X-RateLimit-* headers
- Request Queuing: Managing request bursts
- Concurrent Request Limits: Throttling parallel operations
3. Error Handling Strategies
Comprehensive error management:
- HTTP Status Codes: Understanding 4xx and 5xx responses
- Retry Logic: Implementing smart retry with circuit breakers
- Error Correlation: Using correlation IDs for debugging
- Graceful Degradation: Fallback strategies for partial failures
- Error Logging: Structured error logging for observability
- Timeout Management: Request and connection timeout strategies
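Timeout management deserves an explicit default: requests applies no timeout unless one is passed, so a hung connection can block forever. A small sketch (the helper name and values are illustrative, not an official SDK function) that injects a (connect, read) timeout tuple into request kwargs:

```python
# Default (connect, read) timeout: fail fast on connect, tolerate slow reads.
DEFAULT_TIMEOUT = (3.05, 30.0)


def with_default_timeout(request_kwargs: dict, timeout=DEFAULT_TIMEOUT) -> dict:
    """Add a timeout to request kwargs unless the caller set one explicitly.

    Intended for requests-style calls, e.g.:
        requests.get(url, **with_default_timeout({"params": params}))
    """
    request_kwargs.setdefault("timeout", timeout)
    return request_kwargs
```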
4. Harness API Endpoints
Complete endpoint reference:
- Pipeline API: Create, update, execute, and monitor pipelines
- Template API: Manage pipeline and step templates
- Service API: CRUD operations for services
- Environment API: Environment and infrastructure management
- IaCM API: Infrastructure as Code Management workspace operations
- Connector API: Managing cloud provider and tool connectors
- Secrets API: Secret management operations
- Project API: Project and organization management
5. Request/Response Patterns
API communication patterns:
- YAML Body Formatting: Proper YAML serialization for Harness objects
- Pagination Handling: Managing large result sets
- Filtering and Sorting: Query parameter patterns
- Bulk Operations: Batch processing strategies
- Webhook Integration: Event-driven patterns
- Streaming Responses: Handling long-running operations
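Pagination handling generalizes across the list endpoints covered later. A sketch assuming page/size query parameters and a `content` list in the response body (the envelope key is an assumption — verify it against the actual response):

```python
from typing import Any, Callable, Dict, Iterator


def iterate_pages(
    fetch_page: Callable[[int, int], Dict[str, Any]],
    page_size: int = 50,
) -> Iterator[Dict[str, Any]]:
    """Yield items across pages until a short page signals the end.

    `fetch_page(page, size)` is assumed to return a dict containing a
    'content' list, mirroring the page/size query parameters used below.
    """
    page = 0
    while True:
        body = fetch_page(page, page_size)
        items = body.get("content", [])
        yield from items
        if len(items) < page_size:
            break
        page += 1
```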
Authentication Patterns
Authentication Matrix
| Method | Use Case | Header Format | Scope | Rotation |
|---|---|---|---|---|
| API Key | Service automation | x-api-key: {key} | Account/Org/Project | Manual |
| Service Account Token | CI/CD integration | Authorization: Bearer {token} | Configurable RBAC | Automatic |
| Personal Access Token | User automation | x-api-key: {token} | User permissions | Manual |
| OAuth 2.0 Client | Third-party apps | Authorization: Bearer {token} | Delegated | Automatic |
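The two header formats in the matrix can be built with a small helper (illustrative, not an official SDK function):

```python
def auth_headers(api_key: str = None, bearer_token: str = None) -> dict:
    """Return Harness auth headers: API keys and personal access tokens
    use x-api-key; service account and OAuth tokens use a Bearer header."""
    if api_key:
        return {"x-api-key": api_key}
    if bearer_token:
        return {"Authorization": f"Bearer {bearer_token}"}
    raise ValueError("Provide api_key or bearer_token")
```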
Authentication Code Examples
Example 1: API Key Authentication (Python)
```python
import requests
import os
from typing import Optional, Dict, Any


class HarnessClient:
    """Harness API client with API key authentication."""

    def __init__(
        self,
        account_id: str,
        api_key: Optional[str] = None,
        base_url: str = "https://app.harness.io"
    ):
        """
        Initialize Harness API client.

        Args:
            account_id: Harness account identifier
            api_key: API key (defaults to HARNESS_API_KEY env var)
            base_url: Harness API base URL
        """
        self.account_id = account_id
        self.api_key = api_key or os.getenv("HARNESS_API_KEY")
        self.base_url = base_url

        if not self.api_key:
            raise ValueError("API key must be provided or set in HARNESS_API_KEY")

    def _get_headers(self) -> Dict[str, str]:
        """Generate request headers with authentication."""
        return {
            "x-api-key": self.api_key,
            "Content-Type": "application/yaml",
            "Accept": "application/json"
        }

    def _make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """
        Make authenticated API request.

        Args:
            method: HTTP method (GET, POST, PUT, DELETE)
            endpoint: API endpoint path
            **kwargs: Additional requests arguments

        Returns:
            Response object
        """
        url = f"{self.base_url}{endpoint}"
        headers = self._get_headers()

        # Add account identifier to query params
        params = kwargs.get("params", {})
        params["accountIdentifier"] = self.account_id
        kwargs["params"] = params

        return requests.request(method=method, url=url, headers=headers, **kwargs)

    def get_pipeline(self, org_id: str, project_id: str, pipeline_id: str) -> Dict[str, Any]:
        """
        Retrieve pipeline by identifier.

        Args:
            org_id: Organization identifier
            project_id: Project identifier
            pipeline_id: Pipeline identifier

        Returns:
            Pipeline configuration dictionary
        """
        endpoint = f"/pipeline/api/pipelines/v2/{pipeline_id}"
        params = {
            "orgIdentifier": org_id,
            "projectIdentifier": project_id
        }

        response = self._make_request("GET", endpoint, params=params)
        response.raise_for_status()
        return response.json()


# Usage
client = HarnessClient(
    account_id="your_account_id",
    api_key=os.getenv("HARNESS_API_KEY")
)

pipeline = client.get_pipeline(
    org_id="default",
    project_id="my_project",
    pipeline_id="my_pipeline"
)
print(f"Pipeline: {pipeline['data']['name']}")
```
Example 2: Service Account Token Authentication (Node.js)
```javascript
const axios = require('axios');

class HarnessClient {
  /**
   * Harness API client with service account token authentication.
   */
  constructor(accountId, options = {}) {
    this.accountId = accountId;
    this.baseURL = options.baseURL || 'https://app.harness.io';
    this.token = options.token || process.env.HARNESS_TOKEN;

    if (!this.token) {
      throw new Error('Service account token must be provided or set in HARNESS_TOKEN');
    }

    this.client = axios.create({
      baseURL: this.baseURL,
      headers: {
        'Authorization': `Bearer ${this.token}`,
        'Content-Type': 'application/yaml',
        'Accept': 'application/json'
      }
    });
  }

  /**
   * Add account identifier to request params.
   */
  _addAccountParam(params = {}) {
    return {
      ...params,
      accountIdentifier: this.accountId
    };
  }

  /**
   * Get pipeline by identifier.
   */
  async getPipeline(orgId, projectId, pipelineId) {
    const endpoint = `/pipeline/api/pipelines/v2/${pipelineId}`;
    const params = this._addAccountParam({
      orgIdentifier: orgId,
      projectIdentifier: projectId
    });

    try {
      const response = await this.client.get(endpoint, { params });
      return response.data;
    } catch (error) {
      console.error('Failed to fetch pipeline:', error.message);
      throw error;
    }
  }

  /**
   * Execute pipeline with runtime inputs.
   */
  async executePipeline(orgId, projectId, pipelineId, runtimeInputs = {}) {
    const endpoint = `/pipeline/api/pipelines/v2/execute/${pipelineId}`;
    const params = this._addAccountParam({
      orgIdentifier: orgId,
      projectIdentifier: projectId
    });

    try {
      const response = await this.client.post(
        endpoint,
        { runtimeInputYaml: runtimeInputs },
        { params }
      );
      return response.data;
    } catch (error) {
      console.error('Failed to execute pipeline:', error.message);
      throw error;
    }
  }
}

// Usage
const client = new HarnessClient('your_account_id', {
  token: process.env.HARNESS_TOKEN
});

(async () => {
  try {
    const pipeline = await client.getPipeline('default', 'my_project', 'my_pipeline');
    console.log(`Pipeline: ${pipeline.data.name}`);

    const execution = await client.executePipeline('default', 'my_project', 'my_pipeline', {
      environment: 'production'
    });
    console.log(`Execution ID: ${execution.data.planExecution.uuid}`);
  } catch (error) {
    console.error('Error:', error);
  }
})();
```
Example 3: Environment Variable Best Practices
```bash
# .env file (never commit to version control)
HARNESS_ACCOUNT_ID=your_account_id
HARNESS_API_KEY=pat.your_account_id.your_token
HARNESS_BASE_URL=https://app.harness.io

# For service account tokens
HARNESS_TOKEN=your_service_account_token

# Organization and project defaults
HARNESS_ORG_ID=default
HARNESS_PROJECT_ID=my_project

# Optional: Multiple environments
HARNESS_API_KEY_DEV=pat.dev_account.dev_token
HARNESS_API_KEY_STAGING=pat.staging_account.staging_token
HARNESS_API_KEY_PROD=pat.prod_account.prod_token
```
```python
# Loading environment variables securely
from dotenv import load_dotenv
import os

load_dotenv()  # Load .env file


class HarnessConfig:
    """Centralized Harness configuration from environment variables."""

    def __init__(self, environment: str = "prod"):
        self.environment = environment
        self.account_id = self._get_required("HARNESS_ACCOUNT_ID")
        self.base_url = os.getenv("HARNESS_BASE_URL", "https://app.harness.io")
        self.org_id = os.getenv("HARNESS_ORG_ID", "default")
        self.project_id = os.getenv("HARNESS_PROJECT_ID")

        # Environment-specific API key, falling back to HARNESS_API_KEY
        env_key = f"HARNESS_API_KEY_{environment.upper()}"
        self.api_key = os.getenv(env_key) or self._get_required("HARNESS_API_KEY")

    def _get_required(self, key: str) -> str:
        """Get required environment variable or raise error."""
        value = os.getenv(key)
        if not value:
            raise ValueError(f"Required environment variable {key} is not set")
        return value


# Usage
config = HarnessConfig(environment="prod")
client = HarnessClient(
    account_id=config.account_id,
    api_key=config.api_key,
    base_url=config.base_url
)
```
Rate Limit Management
Rate Limit Rules
| Operation Type | Limit | Window | Reset Behavior |
|---|---|---|---|
| Read Operations | 1000 requests | 1 minute | Rolling window |
| Write Operations | 300 requests | 1 minute | Rolling window |
| Template Operations | 100 requests | 1 minute | Rolling window |
| Webhook Triggers | 500 requests | 1 minute | Rolling window |
Rate Limit Response Headers
```
X-RateLimit-Limit: 1000        # Total limit for the window
X-RateLimit-Remaining: 847     # Remaining requests
X-RateLimit-Reset: 1642694400  # Unix timestamp when the limit resets
Retry-After: 30                # Seconds to wait before retry (429 responses)
```
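These headers can be parsed case-insensitively before deciding whether to throttle. A sketch (the dataclass shape is illustrative):

```python
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass
class RateLimitStatus:
    limit: Optional[int]
    remaining: Optional[int]
    reset_epoch: Optional[int]
    retry_after: Optional[float]


def parse_rate_limit(headers: Mapping[str, str]) -> RateLimitStatus:
    """Parse X-RateLimit-* and Retry-After headers; names are matched
    case-insensitively and missing or malformed values become None."""
    h = {k.lower(): v for k, v in headers.items()}

    def as_int(name: str) -> Optional[int]:
        try:
            return int(h[name])
        except (KeyError, ValueError):
            return None

    retry = None
    if "retry-after" in h:
        try:
            retry = float(h["retry-after"])
        except ValueError:
            retry = None

    return RateLimitStatus(
        limit=as_int("x-ratelimit-limit"),
        remaining=as_int("x-ratelimit-remaining"),
        reset_epoch=as_int("x-ratelimit-reset"),
        retry_after=retry,
    )
```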
Rate Limit Handling Examples
Example 1: Exponential Backoff with Jitter (Python)
```python
import time
import random
from functools import wraps
from typing import Callable, Any

import requests


class RateLimitError(Exception):
    """Rate limit exceeded error."""
    pass


def exponential_backoff_retry(
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True
):
    """
    Decorator for exponential backoff retry with jitter.

    Args:
        max_retries: Maximum number of retry attempts
        base_delay: Initial delay in seconds
        max_delay: Maximum delay between retries
        jitter: Add random jitter to prevent thundering herd
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            retries = 0
            while retries <= max_retries:
                try:
                    return func(*args, **kwargs)
                except requests.exceptions.HTTPError as e:
                    # Only retry rate-limit responses (429)
                    if e.response is not None and e.response.status_code == 429:
                        if retries == max_retries:
                            raise RateLimitError(
                                f"Rate limit exceeded after {max_retries} retries"
                            )

                        # Exponential backoff, capped at max_delay
                        delay = min(base_delay * (2 ** retries), max_delay)

                        # Add jitter to prevent synchronized retries
                        if jitter:
                            delay = delay * (0.5 + random.random())

                        # Prefer the server-provided Retry-After header
                        retry_after = e.response.headers.get('Retry-After')
                        if retry_after:
                            try:
                                delay = float(retry_after)
                            except ValueError:
                                pass  # Use calculated delay

                        print(f"Rate limited. Retrying in {delay:.2f} seconds...")
                        time.sleep(delay)
                        retries += 1
                    else:
                        raise  # Re-raise non-rate-limit errors

            raise RateLimitError(f"Max retries ({max_retries}) exceeded")
        return wrapper
    return decorator


class HarnessClientWithRetry(HarnessClient):
    """Harness client with automatic retry logic."""

    @exponential_backoff_retry(max_retries=5, base_delay=2.0)
    def _make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """Make request with automatic retry on rate limit."""
        response = super()._make_request(method, endpoint, **kwargs)

        # Warn when less than 10% of the rate limit budget remains
        if 'X-RateLimit-Remaining' in response.headers:
            remaining = int(response.headers['X-RateLimit-Remaining'])
            limit = int(response.headers['X-RateLimit-Limit'])
            if remaining < limit * 0.1:
                print(f"Warning: Rate limit low ({remaining}/{limit} remaining)")

        response.raise_for_status()
        return response


# Usage
client = HarnessClientWithRetry(
    account_id="your_account_id",
    api_key=os.getenv("HARNESS_API_KEY")
)

# Automatic retry on rate limit
pipeline = client.get_pipeline("default", "my_project", "my_pipeline")
```
Example 2: Request Queue with Rate Limiting (Node.js)
```javascript
const { default: PQueue } = require('p-queue');

class RateLimitedHarnessClient extends HarnessClient {
  /**
   * Harness client with request queuing and rate limiting.
   */
  constructor(accountId, options = {}) {
    super(accountId, options);

    // Separate queues for read and write operations
    this.readQueue = new PQueue({
      concurrency: 10,  // Max concurrent read requests
      interval: 60000,  // 1 minute window
      intervalCap: 900  // Max 900 requests per minute (leaving buffer)
    });

    this.writeQueue = new PQueue({
      concurrency: 5,   // Max concurrent write requests
      interval: 60000,  // 1 minute window
      intervalCap: 270  // Max 270 requests per minute (leaving buffer)
    });
  }

  /**
   * Make rate-limited request using the appropriate queue.
   */
  async _makeRateLimitedRequest(method, endpoint, options = {}) {
    const isWriteOperation = ['POST', 'PUT', 'PATCH', 'DELETE'].includes(method.toUpperCase());
    const queue = isWriteOperation ? this.writeQueue : this.readQueue;

    return queue.add(async () => {
      try {
        const response = await this.client.request({
          method,
          url: endpoint,
          ...options
        });

        // Log rate limit status
        const remaining = response.headers['x-ratelimit-remaining'];
        const limit = response.headers['x-ratelimit-limit'];
        if (remaining && limit) {
          const percentRemaining = (remaining / limit) * 100;
          if (percentRemaining < 10) {
            console.warn(`Rate limit warning: ${remaining}/${limit} remaining (${percentRemaining.toFixed(1)}%)`);
          }
        }

        return response;
      } catch (error) {
        if (error.response?.status === 429) {
          const retryAfter = error.response.headers['retry-after'] || 30;
          console.log(`Rate limited. Waiting ${retryAfter}s before retry...`);

          // Wait and retry
          await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
          return this._makeRateLimitedRequest(method, endpoint, options);
        }
        throw error;
      }
    });
  }

  /**
   * Get pipeline with rate limiting.
   */
  async getPipeline(orgId, projectId, pipelineId) {
    const endpoint = `/pipeline/api/pipelines/v2/${pipelineId}`;
    const params = this._addAccountParam({
      orgIdentifier: orgId,
      projectIdentifier: projectId
    });

    const response = await this._makeRateLimitedRequest('GET', endpoint, { params });
    return response.data;
  }

  /**
   * Batch get multiple pipelines with rate limiting.
   */
  async getPipelines(orgId, projectId, pipelineIds) {
    const promises = pipelineIds.map(pipelineId =>
      this.getPipeline(orgId, projectId, pipelineId)
    );
    return Promise.all(promises);
  }
}

// Usage
const client = new RateLimitedHarnessClient('your_account_id', {
  token: process.env.HARNESS_TOKEN
});

(async () => {
  // Fetch 100 pipelines - automatically queued and rate-limited
  const pipelineIds = Array.from({ length: 100 }, (_, i) => `pipeline_${i}`);
  const pipelines = await client.getPipelines('default', 'my_project', pipelineIds);
  console.log(`Fetched ${pipelines.length} pipelines`);
})();
```
Example 3: Circuit Breaker Pattern
```python
from enum import Enum
from datetime import datetime, timedelta
from typing import Callable, Any

import requests


class CircuitState(Enum):
    """Circuit breaker states."""
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered


class CircuitBreaker:
    """Circuit breaker for API calls to prevent cascade failures."""

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        expected_exception: type = requests.exceptions.HTTPError
    ):
        """
        Initialize circuit breaker.

        Args:
            failure_threshold: Number of failures before opening circuit
            timeout: Seconds to wait before attempting recovery
            expected_exception: Exception type to catch
        """
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """
        Execute function with circuit breaker protection.

        Args:
            func: Function to execute
            *args: Function arguments
            **kwargs: Function keyword arguments

        Returns:
            Function result

        Raises:
            Exception: If circuit is open or function fails
        """
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
                # Try to recover
                self.state = CircuitState.HALF_OPEN
                print("Circuit breaker: Attempting recovery (HALF_OPEN)")
            else:
                raise Exception(
                    f"Circuit breaker OPEN. Service unavailable. "
                    f"Retry after {self.timeout}s"
                )

        try:
            result = func(*args, **kwargs)

            # Success - reset failure count and close the circuit
            if self.state == CircuitState.HALF_OPEN:
                print("Circuit breaker: Recovery successful (CLOSED)")
            self.failure_count = 0
            self.state = CircuitState.CLOSED

            return result
        except self.expected_exception:
            self.failure_count += 1
            self.last_failure_time = datetime.now()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                print(
                    f"Circuit breaker: Threshold reached ({self.failure_count} failures). "
                    f"Circuit OPEN"
                )
            raise


class HarnessClientWithCircuitBreaker(HarnessClientWithRetry):
    """Harness client with circuit breaker pattern."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            timeout=60
        )

    def _make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """Make request with circuit breaker protection."""
        return self.circuit_breaker.call(
            super()._make_request,
            method,
            endpoint,
            **kwargs
        )


# Usage
client = HarnessClientWithCircuitBreaker(
    account_id="your_account_id",
    api_key=os.getenv("HARNESS_API_KEY")
)

# Protected by circuit breaker
try:
    pipeline = client.get_pipeline("default", "my_project", "my_pipeline")
except Exception as e:
    print(f"Request failed: {e}")
```
Error Handling
HTTP Status Code Reference
| Status Code | Meaning | Retry Strategy | Common Causes |
|---|---|---|---|
| 400 Bad Request | Invalid request format | Do not retry | Malformed YAML, missing required fields |
| 401 Unauthorized | Authentication failed | Do not retry | Invalid or expired API key/token |
| 403 Forbidden | Insufficient permissions | Do not retry | Missing RBAC permissions |
| 404 Not Found | Resource doesn't exist | Do not retry | Wrong identifier or deleted resource |
| 409 Conflict | Resource conflict | Retry after resolution | Concurrent modification |
| 422 Unprocessable Entity | Validation failed | Do not retry | Invalid field values |
| 429 Too Many Requests | Rate limit exceeded | Retry with backoff | Too many requests |
| 500 Internal Server Error | Server error | Retry with backoff | Temporary server issue |
| 502 Bad Gateway | Gateway error | Retry with backoff | Proxy or load balancer issue |
| 503 Service Unavailable | Service overloaded | Retry with backoff | Temporary unavailability |
| 504 Gateway Timeout | Request timeout | Retry with backoff | Long-running operation |
Error Response Format
```json
{
  "status": "ERROR",
  "code": "INVALID_REQUEST",
  "message": "Pipeline with identifier 'invalid_pipeline' does not exist",
  "correlationId": "abc123-def456-ghi789",
  "detailedMessage": "The requested pipeline 'invalid_pipeline' was not found in project 'my_project' under organization 'default'",
  "responseMessages": [
    {
      "code": "RESOURCE_NOT_FOUND",
      "level": "ERROR",
      "message": "Pipeline not found",
      "failureTypes": ["APPLICATION_ERROR"]
    }
  ]
}
```
Error Handling Examples
Example 1: Comprehensive Error Handler (Python)
```python
import logging
from typing import Optional, Dict, Any
from enum import Enum

import requests


class ErrorSeverity(Enum):
    """Error severity levels."""
    TRANSIENT = "transient"  # Temporary, retry
    PERMANENT = "permanent"  # Permanent, do not retry
    FATAL = "fatal"          # Fatal, terminate


class HarnessAPIError(Exception):
    """Base exception for Harness API errors."""

    def __init__(
        self,
        message: str,
        status_code: int,
        response: Optional[Dict[str, Any]] = None,
        correlation_id: Optional[str] = None
    ):
        self.message = message
        self.status_code = status_code
        self.response = response or {}
        self.correlation_id = correlation_id
        super().__init__(self.message)

    @property
    def severity(self) -> ErrorSeverity:
        """Determine error severity based on status code."""
        if self.status_code in [429, 500, 502, 503, 504]:
            return ErrorSeverity.TRANSIENT
        elif self.status_code in [400, 401, 403, 404, 422]:
            return ErrorSeverity.PERMANENT
        else:
            return ErrorSeverity.FATAL

    @property
    def should_retry(self) -> bool:
        """Determine if error should be retried."""
        return self.severity == ErrorSeverity.TRANSIENT


class HarnessErrorHandler:
    """Centralized error handling for Harness API operations."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def handle_response(self, response: requests.Response) -> Dict[str, Any]:
        """
        Handle API response and raise appropriate errors.

        Args:
            response: Requests response object

        Returns:
            Response data dictionary

        Raises:
            HarnessAPIError: For any error responses
        """
        try:
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            error_data = self._extract_error_data(response)
            correlation_id = response.headers.get('X-Correlation-Id')

            # Log error with context
            self.logger.error(
                f"Harness API error: {error_data['message']} "
                f"(Status: {response.status_code}, "
                f"Correlation ID: {correlation_id})"
            )

            raise HarnessAPIError(
                message=error_data['message'],
                status_code=response.status_code,
                response=error_data,
                correlation_id=correlation_id
            ) from e

    def _extract_error_data(self, response: requests.Response) -> Dict[str, Any]:
        """Extract error information from response."""
        try:
            data = response.json()
            # Handle different error response formats
            if isinstance(data, dict):
                return {
                    'message': data.get('message', 'Unknown error'),
                    'code': data.get('code', 'UNKNOWN'),
                    'detailed_message': data.get('detailedMessage'),
                    'response_messages': data.get('responseMessages', [])
                }
        except ValueError:
            pass

        # Fallback to status code message
        return {
            'message': response.reason,
            'code': str(response.status_code),
            'detailed_message': response.text[:500] if response.text else None,
            'response_messages': []
        }


class RobustHarnessClient(HarnessClientWithRetry):
    """Harness client with comprehensive error handling."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.error_handler = HarnessErrorHandler()
        self.logger = logging.getLogger(__name__)

        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

    def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Make request with error handling.

        Note: unlike the parent class, this returns the parsed JSON body,
        not a Response object.
        """
        try:
            response = super()._make_request(method, endpoint, **kwargs)
            return self.error_handler.handle_response(response)
        except HarnessAPIError as e:
            # Log specific error cases before re-raising
            if e.status_code == 404:
                self.logger.warning(f"Resource not found: {endpoint}")
            elif e.status_code == 401:
                self.logger.error("Authentication failed. Check API key/token.")
            elif e.status_code == 403:
                self.logger.error(
                    f"Insufficient permissions for {method} {endpoint}. "
                    f"Check RBAC settings."
                )
            raise

    def get_pipeline(self, org_id: str, project_id: str, pipeline_id: str) -> Dict[str, Any]:
        """Retrieve pipeline; _make_request already returns parsed JSON here."""
        endpoint = f"/pipeline/api/pipelines/v2/{pipeline_id}"
        params = {"orgIdentifier": org_id, "projectIdentifier": project_id}
        return self._make_request("GET", endpoint, params=params)


# Usage with error handling
client = RobustHarnessClient(
    account_id="your_account_id",
    api_key=os.getenv("HARNESS_API_KEY")
)

try:
    pipeline = client.get_pipeline("default", "my_project", "my_pipeline")
    print(f"Successfully fetched pipeline: {pipeline['data']['name']}")
except HarnessAPIError as e:
    if e.severity == ErrorSeverity.PERMANENT:
        print(f"Permanent error (do not retry): {e.message}")
        print(f"Correlation ID: {e.correlation_id}")
    elif e.severity == ErrorSeverity.TRANSIENT:
        print(f"Transient error (safe to retry): {e.message}")
    else:
        print(f"Fatal error: {e.message}")
        raise
```
Example 2: Graceful Degradation Pattern
```python
import logging
from typing import Optional, List, Dict, Any


class PipelineService:
    """Service layer with graceful degradation for pipeline operations."""

    def __init__(self, client: RobustHarnessClient):
        self.client = client
        self.logger = logging.getLogger(__name__)
        self.cache = {}  # Simple in-memory cache

    def get_pipeline_with_fallback(
        self,
        org_id: str,
        project_id: str,
        pipeline_id: str,
        use_cache: bool = True
    ) -> Optional[Dict[str, Any]]:
        """
        Get pipeline with fallback to cached version on error.

        Args:
            org_id: Organization identifier
            project_id: Project identifier
            pipeline_id: Pipeline identifier
            use_cache: Whether to use cached fallback

        Returns:
            Pipeline data or None if unavailable
        """
        cache_key = f"{org_id}/{project_id}/{pipeline_id}"

        try:
            # Try to fetch from API
            pipeline = self.client.get_pipeline(org_id, project_id, pipeline_id)

            # Update cache on success
            if use_cache:
                self.cache[cache_key] = pipeline
            return pipeline
        except HarnessAPIError as e:
            if e.severity == ErrorSeverity.TRANSIENT and use_cache:
                # Fall back to cache for transient errors
                cached_pipeline = self.cache.get(cache_key)
                if cached_pipeline:
                    self.logger.warning(
                        f"API error, using cached pipeline data: {e.message}"
                    )
                    return cached_pipeline
                else:
                    self.logger.error(
                        f"API error and no cached data available: {e.message}"
                    )
                    return None
            else:
                # Permanent errors or cache disabled
                self.logger.error(f"Failed to fetch pipeline: {e.message}")
                return None

    def batch_get_pipelines(
        self,
        org_id: str,
        project_id: str,
        pipeline_ids: List[str],
        fail_fast: bool = False
    ) -> List[Optional[Dict[str, Any]]]:
        """
        Batch fetch pipelines with error isolation.

        Args:
            org_id: Organization identifier
            project_id: Project identifier
            pipeline_ids: List of pipeline identifiers
            fail_fast: Stop on first error if True

        Returns:
            List of pipeline data (None for failed fetches)
        """
        results = []
        errors = []

        for pipeline_id in pipeline_ids:
            try:
                pipeline = self.get_pipeline_with_fallback(
                    org_id, project_id, pipeline_id
                )
                results.append(pipeline)
            except Exception as e:
                errors.append({
                    'pipeline_id': pipeline_id,
                    'error': str(e)
                })
                if fail_fast:
                    raise
                else:
                    # Continue processing remaining pipelines
                    results.append(None)

        # Log summary
        success_count = sum(1 for r in results if r is not None)
        self.logger.info(
            f"Batch fetch complete: {success_count}/{len(pipeline_ids)} successful"
        )
        if errors:
            self.logger.warning(f"Errors encountered: {len(errors)}")
            for error in errors:
                self.logger.warning(
                    f" - {error['pipeline_id']}: {error['error']}"
                )

        return results


# Usage
client = RobustHarnessClient(
    account_id="your_account_id",
    api_key=os.getenv("HARNESS_API_KEY")
)
service = PipelineService(client)

# Single pipeline with fallback
pipeline = service.get_pipeline_with_fallback("default", "my_project", "my_pipeline")
if pipeline:
    print(f"Pipeline: {pipeline['data']['name']}")
else:
    print("Pipeline unavailable")

# Batch fetch with error isolation
pipeline_ids = ["pipeline1", "pipeline2", "pipeline3"]
pipelines = service.batch_get_pipelines("default", "my_project", pipeline_ids)
print(f"Fetched {sum(1 for p in pipelines if p)} pipelines")
```
Harness API Endpoints
Complete Endpoint Reference
Pipeline API
Base: /pipeline/api/pipelines/v2

| Endpoint | Method | Description | Rate Limit |
|---|---|---|---|
| /{pipelineId} | GET | Get pipeline by ID | Read |
| / | POST | Create pipeline | Write |
| /{pipelineId} | PUT | Update pipeline | Write |
| /{pipelineId} | DELETE | Delete pipeline | Write |
| /execute/{pipelineId} | POST | Execute pipeline | Write |
| /execution/{executionId} | GET | Get execution status | Read |
| /list | POST | List pipelines (with filters) | Read |
| /{pipelineId}/summary | GET | Get pipeline summary | Read |
Template API
Base: /template/api/templates

| Endpoint | Method | Description | Rate Limit |
|---|---|---|---|
| /{templateId} | GET | Get template by ID | Read |
| / | POST | Create template | Write |
| /{templateId} | PUT | Update template | Write |
| /{templateId} | DELETE | Delete template | Write |
| /list | POST | List templates | Read |
| /{templateId}/versions | GET | Get template versions | Read |
Service API
Base: /ng/api/servicesV2

| Endpoint | Method | Description | Rate Limit |
|---|---|---|---|
| /{serviceId} | GET | Get service by ID | Read |
| / | POST | Create service | Write |
| /{serviceId} | PUT | Update service | Write |
| /{serviceId} | DELETE | Delete service | Write |
| /list | GET | List services | Read |
Environment API
Base: /ng/api/environmentsV2

| Endpoint | Method | Description | Rate Limit |
|---|---|---|---|
| /{environmentId} | GET | Get environment by ID | Read |
| / | POST | Create environment | Write |
| /{environmentId} | PUT | Update environment | Write |
| /{environmentId} | DELETE | Delete environment | Write |
| /list | GET | List environments | Read |
IaCM API
Base: /iacm/api/workspaces

| Endpoint | Method | Description | Rate Limit |
|---|---|---|---|
| /{workspaceId} | GET | Get workspace by ID | Read |
| / | POST | Create workspace | Write |
| /{workspaceId} | PUT | Update workspace | Write |
| /{workspaceId} | DELETE | Delete workspace | Write |
| /{workspaceId}/runs | POST | Trigger workspace run | Write |
| /{workspaceId}/runs/{runId} | GET | Get run status | Read |
Connector API
Base: /ng/api/connectors

| Endpoint | Method | Description | Rate Limit |
|---|---|---|---|
| /{connectorId} | GET | Get connector by ID | Read |
| / | POST | Create connector | Write |
| /{connectorId} | PUT | Update connector | Write |
| /{connectorId} | DELETE | Delete connector | Write |
| /{connectorId}/test-connection | POST | Test connector | Write |
| /list | GET | List connectors | Read |
Endpoint Usage Examples
Example 1: Complete Pipeline CRUD Operations
```python
from typing import Dict, Any, Optional

import yaml


class PipelineManager:
    """Manager for pipeline CRUD operations."""

    def __init__(self, client: RobustHarnessClient, org_id: str, project_id: str):
        self.client = client
        self.org_id = org_id
        self.project_id = project_id

    def _scope_params(self) -> Dict[str, str]:
        """Org/project query parameters shared by every request."""
        return {
            "orgIdentifier": self.org_id,
            "projectIdentifier": self.project_id
        }

    def create_pipeline(self, pipeline_yaml: str) -> Dict[str, Any]:
        """
        Create a new pipeline.

        Args:
            pipeline_yaml: Pipeline YAML definition

        Returns:
            Created pipeline data
        """
        # The client already sends Content-Type: application/yaml by default
        endpoint = "/pipeline/api/pipelines/v2"
        return self.client._make_request(
            "POST", endpoint, params=self._scope_params(), data=pipeline_yaml
        )

    def get_pipeline(self, pipeline_id: str) -> Dict[str, Any]:
        """
        Get pipeline by identifier.

        Args:
            pipeline_id: Pipeline identifier

        Returns:
            Pipeline data
        """
        endpoint = f"/pipeline/api/pipelines/v2/{pipeline_id}"
        return self.client._make_request("GET", endpoint, params=self._scope_params())

    def update_pipeline(self, pipeline_id: str, pipeline_yaml: str) -> Dict[str, Any]:
        """
        Update existing pipeline.

        Args:
            pipeline_id: Pipeline identifier
            pipeline_yaml: Updated pipeline YAML

        Returns:
            Updated pipeline data
        """
        endpoint = f"/pipeline/api/pipelines/v2/{pipeline_id}"
        return self.client._make_request(
            "PUT", endpoint, params=self._scope_params(), data=pipeline_yaml
        )

    def delete_pipeline(self, pipeline_id: str) -> bool:
        """
        Delete pipeline.

        Args:
            pipeline_id: Pipeline identifier

        Returns:
            True if deleted successfully
        """
        endpoint = f"/pipeline/api/pipelines/v2/{pipeline_id}"
        self.client._make_request("DELETE", endpoint, params=self._scope_params())
        return True

    def execute_pipeline(
        self,
        pipeline_id: str,
        runtime_inputs: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Execute pipeline with optional runtime inputs.

        Args:
            pipeline_id: Pipeline identifier
            runtime_inputs: Runtime input values

        Returns:
            Execution data with execution ID
        """
        endpoint = f"/pipeline/api/pipelines/v2/execute/{pipeline_id}"
        body = {}
        if runtime_inputs:
            body["runtimeInputYaml"] = yaml.dump(runtime_inputs)

        return self.client._make_request(
            "POST", endpoint, params=self._scope_params(), json=body
        )

    def get_execution_status(self, execution_id: str) -> Dict[str, Any]:
        """
        Get pipeline execution status.

        Args:
            execution_id: Execution identifier

        Returns:
            Execution status data
        """
        endpoint = f"/pipeline/api/pipelines/v2/execution/{execution_id}"
        return self.client._make_request("GET", endpoint, params=self._scope_params())

    def list_pipelines(
        self,
        filters: Optional[Dict[str, Any]] = None,
        page: int = 0,
        size: int = 50
    ) -> Dict[str, Any]:
        """
        List pipelines with optional filters.

        Args:
            filters: Filter criteria
            page: Page number (0-indexed)
            size: Page size

        Returns:
            Paginated list of pipelines
        """
        endpoint = "/pipeline/api/pipelines/v2/list"
        params = self._scope_params()
        params.update({"page": page, "size": size})
        return self.client._make_request(
            "POST", endpoint, params=params, json=filters or {}
        )
```
# Usage
client = RobustHarnessClient(
account_id="your_account_id",
api_key=os.getenv("HARNESS_API_KEY")
)
manager = PipelineManager(client, "default", "my_project")
# Create pipeline
pipeline_yaml = """
pipeline:
identifier: my_new_pipeline
name: My New Pipeline
projectIdentifier: my_project
orgIdentifier: default
stages:
- stage:
name: Deploy
identifier: deploy
type: Deployment
"""
created = manager.create_pipeline(pipeline_yaml)
print(f"Created pipeline: {created['data']['identifier']}")
# Execute pipeline
execution = manager.execute_pipeline("my_new_pipeline", {
"environment": "production"
})
execution_id = execution['data']['planExecution']['uuid']
print(f"Started execution: {execution_id}")
# Monitor execution
status = manager.get_execution_status(execution_id)
print(f"Execution status: {status['data']['pipelineExecutionSummary']['status']}")
# List all pipelines
pipelines = manager.list_pipelines(page=0, size=100)
print(f"Found {len(pipelines['data']['content'])} pipelines")
# Update pipeline
updated_yaml = pipeline_yaml.replace("My New Pipeline", "My Updated Pipeline")
updated = manager.update_pipeline("my_new_pipeline", updated_yaml)
print(f"Updated pipeline: {updated['data']['name']}")
# Delete pipeline
deleted = manager.delete_pipeline("my_new_pipeline")
print(f"Deleted: {deleted}")
Request/Response Patterns
YAML Body Formatting
Harness APIs accept YAML for resource definitions. Proper formatting is critical.
```python
import yaml
from typing import Dict, Any


def create_pipeline_yaml(config: Dict[str, Any]) -> str:
    """
    Generate properly formatted pipeline YAML.

    Args:
        config: Pipeline configuration dictionary

    Returns:
        YAML string
    """
    pipeline_dict = {
        "pipeline": {
            "identifier": config["identifier"],
            "name": config["name"],
            "projectIdentifier": config["project_id"],
            "orgIdentifier": config["org_id"],
            "stages": config.get("stages", [])
        }
    }
    # Use safe_dump with proper formatting
    return yaml.safe_dump(
        pipeline_dict,
        default_flow_style=False,
        sort_keys=False,
        indent=2
    )


# Usage
config = {
    "identifier": "my_pipeline",
    "name": "My Pipeline",
    "org_id": "default",
    "project_id": "my_project",
    "stages": [
        {
            "stage": {
                "name": "Build",
                "identifier": "build",
                "type": "CI"
            }
        }
    ]
}
pipeline_yaml = create_pipeline_yaml(config)
print(pipeline_yaml)
```
Pagination Handling
```python
from typing import Iterator, Dict, Any


def paginate_pipelines(
    manager: PipelineManager,
    page_size: int = 100
) -> Iterator[Dict[str, Any]]:
    """
    Generator to iterate through all pipelines with pagination.

    Args:
        manager: PipelineManager instance
        page_size: Number of items per page

    Yields:
        Pipeline data dictionaries
    """
    page = 0
    while True:
        response = manager.list_pipelines(page=page, size=page_size)
        pipelines = response['data']['content']
        if not pipelines:
            break
        for pipeline in pipelines:
            yield pipeline
        # Check if there are more pages
        total_pages = response['data']['totalPages']
        if page >= total_pages - 1:
            break
        page += 1


# Usage
for pipeline in paginate_pipelines(manager, page_size=50):
    print(f"Pipeline: {pipeline['identifier']}")
```
Best Practices
1. Secure Credential Management
Do:
- Store API keys in environment variables or secrets managers
- Rotate credentials regularly
- Use service accounts with minimal required permissions
- Implement credential refresh for long-running services
Don't:
- Commit credentials to version control
- Share credentials across environments
- Use overly permissive API keys
- Log credentials in error messages
2. Rate Limit Optimization
Do:
- Implement request queuing for batch operations
- Use exponential backoff with jitter
- Monitor rate limit headers proactively
- Cache frequently accessed data
- Use webhooks instead of polling when possible
Don't:
- Ignore rate limit headers
- Retry immediately after 429
- Make unnecessary duplicate requests
- Use fixed retry intervals
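The backoff guidance above can be sketched as a small retry helper. `RateLimitError` is a hypothetical exception standing in for whatever your client raises on HTTP 429; "full jitter" means sleeping a random amount up to the capped exponential delay, which spreads retries from many clients instead of synchronizing them.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Hypothetical exception a client would raise on HTTP 429."""


def with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> T:
    """Retry fn on rate-limit errors using exponential backoff with full jitter."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise
            # Cap the exponential delay, then sleep a random fraction of it (full jitter)
            delay = min(max_delay, base_delay * (2 ** attempt))
            time.sleep(random.uniform(0, delay))
    raise RuntimeError("unreachable")
```

If the response carries a `Retry-After` header, prefer that value over the computed delay.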
3. Error Handling Strategy
Do:
- Log errors with correlation IDs
- Implement circuit breakers for cascading failures
- Use structured error logging
- Provide fallback mechanisms
- Distinguish between transient and permanent errors
Don't:
- Swallow errors silently
- Retry on permanent errors (4xx)
- Expose internal error details to end users
- Assume all 5xx errors are retryable
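One way to encode the transient-vs-permanent distinction is a small predicate consulted before any retry. The status set below is a conservative assumption rather than a Harness-documented contract: 429 and common gateway/server errors are retryable, while other codes (including 501) will fail identically on retry.

```python
def is_retryable(status_code: int) -> bool:
    """Return True for transient errors worth retrying, False for permanent ones.

    Assumed classification: 429 (rate limit) and common gateway/server
    errors are transient; all other 4xx are permanent client errors, and
    5xx codes like 501 Not Implemented are deliberately excluded.
    """
    if status_code == 429:
        return True
    return status_code in (500, 502, 503, 504)
```

Pair this with structured logging of the correlation ID so permanent failures can be escalated to Harness support.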
4. API Request Optimization
Do:
- Use batch endpoints when available
- Request only needed fields
- Implement client-side caching
- Use conditional requests (ETags)
- Compress request/response bodies
Don't:
- Fetch entire resource graphs unnecessarily
- Poll for status updates too frequently
- Make sequential requests that could be parallel
- Ignore pagination for large result sets
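Client-side caching of read endpoints can be as small as a TTL map. This sketch assumes responses are safe to reuse for a short window, trading a little staleness for read-rate-limit budget when the same resource is fetched repeatedly.

```python
import time
from typing import Any, Callable, Dict, Tuple


class TTLCache:
    """Minimal client-side cache for read-heavy endpoints (sketch, not thread-safe)."""

    def __init__(self, ttl_seconds: float = 30.0):
        self.ttl = ttl_seconds
        self._store: Dict[Any, Tuple[float, Any]] = {}

    def get_or_fetch(self, key: Any, fetch: Callable[[], Any]) -> Any:
        """Return the cached value for key, calling fetch only on miss or expiry."""
        now = time.monotonic()
        hit = self._store.get(key)
        if hit is not None and now - hit[0] < self.ttl:
            return hit[1]
        value = fetch()
        self._store[key] = (now, value)
        return value
```

A cache key such as `("pipeline", org_id, project_id, pipeline_id)` keeps entries scoped correctly across accounts.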
5. Monitoring and Observability
Do:
- Track API latency and error rates
- Monitor rate limit usage
- Log correlation IDs for debugging
- Set up alerts for error thresholds
- Use structured logging
Don't:
- Rely on client-side logs alone
- Ignore slow API responses
- Skip correlation ID tracking
- Log sensitive data
Related Skills
- [[infrastructure-template-generator]] - Generate Harness templates programmatically
- [[harness-pipeline-builder]] - Build pipelines using API
- [[error-recovery-patterns]] - Advanced error recovery strategies
- [[api-client-design]] - Design robust API clients
- [[rate-limit-strategies]] - Advanced rate limiting patterns
- [[authentication-patterns]] - Secure authentication implementations
- [[observability-patterns]] - Monitoring and logging best practices
Troubleshooting
Common Issues and Solutions
Issue: Authentication Failures
Symptoms:
- 401 Unauthorized responses
- "Invalid API key" errors
Solutions:
- Verify API key format: `pat.{account_id}.{token}`
- Check API key permissions and scope
- Ensure account ID matches API key
- Verify token hasn't expired
- Check for leading/trailing whitespace in credentials
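The whitespace and format checks above can be automated before the key is ever sent. The regex follows the `pat.{account_id}.{token}` shape noted above (extra dot-separated segments are tolerated); treat it as an illustrative assumption, not an official specification of the token format.

```python
import re


def normalize_api_key(raw: str) -> str:
    """Strip stray whitespace and sanity-check the key shape before use.

    The pattern is an assumption based on the pat.{account_id}.{token}
    format: a 'pat.' prefix followed by two or more non-empty,
    dot-separated segments.
    """
    key = raw.strip()
    if not re.fullmatch(r"pat\.[^.\s]+\.[^.\s]+(\.[^.\s]+)*", key):
        raise ValueError("API key does not look like pat.{account_id}.{token}")
    return key
```

Running this at client construction turns a confusing 401 into an immediate, local error.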
Issue: Rate Limit Exceeded
Symptoms:
- 429 Too Many Requests
- Retry-After headers in responses
Solutions:
- Implement exponential backoff with jitter
- Use request queuing
- Cache frequently accessed data
- Batch operations when possible
- Monitor rate limit headers proactively
Issue: Intermittent 500/502/503 Errors
Symptoms:
- Occasional server errors
- Gateway timeout responses
Solutions:
- Implement retry logic with backoff
- Use circuit breaker pattern
- Add request timeout configuration
- Check Harness status page for incidents
- Contact Harness support with correlation IDs
Issue: YAML Parsing Errors
Symptoms:
- 400 Bad Request
- "Invalid YAML" error messages
Solutions:
- Validate YAML syntax before sending
- Check for special characters needing escaping
- Ensure proper indentation
- Use YAML libraries instead of string concatenation
- Validate against Harness schema
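The first two fixes above can be done client-side before any request. This sketch uses PyYAML (already used elsewhere in this guide); the top-level `pipeline` key check mirrors this guide's pipeline examples and is a local sanity check, not full Harness schema validation.

```python
import yaml  # PyYAML


def validate_pipeline_yaml(text: str) -> None:
    """Parse YAML locally so syntax errors surface before the API returns 400.

    Raises ValueError on invalid YAML or on a document without a
    top-level 'pipeline' key (an assumption based on the examples
    in this guide, not the full Harness schema).
    """
    try:
        doc = yaml.safe_load(text)
    except yaml.YAMLError as exc:
        raise ValueError(f"Invalid YAML: {exc}") from exc
    if not isinstance(doc, dict) or "pipeline" not in doc:
        raise ValueError("Expected a top-level 'pipeline' key")
```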
Version: 1.0.0
Last Updated: 2026-01-19
Skill Type: API Integration & Operations
Complexity: Intermediate to Advanced