Orchestrator and Deployment Management
The Orchestrator and Deployment Management subsystem provides the interface for interacting with the Cloudify orchestrator’s REST API surface (/api/v3.1/*). This layer handles the specific authentication contract required by the orchestrator, manages the lifecycle of blueprints and deployments, and executes workflows. It abstracts away the complexities of async operations, such as blueprint uploads and environment creation, by implementing polling mechanisms with appropriate timeouts to handle long-running cold-start scenarios.
Authentication and Client Setup
The orchestrator client (orch_client.py) serves as the single source of truth for the authentication contract. Unlike the /api/v2/* endpoints that inventory/client.py talks to, the orchestrator’s session-service base64-decodes the raw Authorization header value and fails on the space inside "Bearer ". Consequently, every v3.1 request must carry Cookie: session.id=<jwt> instead of an Authorization header 1.
The attach_to_orchestrator function resolves authentication via two paths:
- Browser Session: If a Session object is provided, it attaches the portal’s session.id cookie to the client. For portal-scoped JWTs, the system also extracts the csrf claim from the JWT and injects it into the CSRF-Token header for all state-changing calls (PUT/POST/PATCH/DELETE) to pass the anti-CSRF gate.
- Service Account: If an admin (BootstrapResult) is provided, the system mints a Bearer JWT via /api/v1/oidc/token and writes it into the cookie jar as session.id. Service-account tokens are exempt from the CSRF gate because they have a different scope claim.
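The browser-session path can be illustrated with a minimal sketch. It assumes the csrf claim sits at the top level of the JWT payload, and it builds per-request headers rather than populating a cookie jar as the real client does. The names jwt_claims and auth_headers are hypothetical and do not come from orch_client.py; the payload is decoded without signature verification, which is only acceptable here because the value is echoed back to the server that issued it.

```python
import base64
import json

# Methods the text above describes as gated by the anti-CSRF check.
STATE_CHANGING = {"PUT", "POST", "PATCH", "DELETE"}


def jwt_claims(token: str) -> dict:
    """Return the payload claims of a JWT without verifying its signature."""
    try:
        payload_b64 = token.split(".")[1]
    except IndexError as exc:
        raise ValueError("not a JWT: expected dot-separated segments") from exc
    # JWT segments are base64url-encoded without padding; restore the padding.
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def auth_headers(token: str, method: str) -> dict[str, str]:
    """Build headers for one request (hypothetical helper, not the real API)."""
    # The token always travels as a cookie, never as "Authorization: Bearer".
    headers = {"Cookie": f"session.id={token}"}
    csrf = jwt_claims(token).get("csrf")
    # Only state-changing calls need the CSRF header, and only when the
    # token actually carries the claim (service-account tokens may not).
    if csrf and method.upper() in STATE_CHANGING:
        headers["CSRF-Token"] = csrf
    return headers
```

A token without a csrf claim simply yields the cookie header alone, which matches the service-account case described above.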
The open_orchestrator_client context manager yields an authenticated httpx.Client with a default per-call timeout of 60 seconds. The longer timeout is needed because the first install of a blueprint on a fresh pod provisions an ansible-core venv and galaxy collections, which can take 8-15 minutes end-to-end; the blueprint client polls on top of the per-call timeout to cover that window.
Blueprint Management
The blueprint engine (blueprint/client.py) provides a minimal set of functions to drive the standard “upload, deploy, run install, watch the result” loop. It enforces several load-bearing contracts discovered through probing:
- Multipart Upload: Uploading a blueprint requires a multipart form with exactly two fields: params (a JSON form field containing application_file_name and visibility) and blueprint_archive (the file) 2. Other field names result in 400 errors, and non-multipart requests result in 422 errors 3.
- Tar Structure: The blueprint source directory is tarred and gzipped, rooted at the source directory’s name. Cloudify requires a single top-level directory; flat tars are rejected with “main blueprint file not found” 2.
- Async Upload: The upload returns immediately with state: uploading. The client polls /api/v3.1/blueprints/<id> until state == uploaded or fails.
The upload_blueprint function handles the tar creation, multipart request, and polling for the uploaded state. The list_blueprints and delete_blueprint functions provide read and delete capabilities, with delete_blueprint supporting a force flag to remove blueprints even if deployments exist 4.
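The tar-structure contract is easy to get wrong, so here is a minimal sketch of an archive rooted at the source directory's name. The function name tar_blueprint_sketch is hypothetical; the real helper (_tar_blueprint) is not shown in full in this document and may differ in where it writes the archive and how it names it.

```python
import os
import tarfile
import tempfile
from pathlib import Path


def tar_blueprint_sketch(source_dir: Path) -> Path:
    """Tar and gzip source_dir so the archive has one top-level directory."""
    source_dir = Path(source_dir).resolve()
    if not source_dir.is_dir():
        raise NotADirectoryError(str(source_dir))
    # mkstemp returns an open descriptor; close it and let tarfile reopen by path.
    fd, tmp_name = tempfile.mkstemp(suffix=".tar.gz")
    os.close(fd)
    with tarfile.open(tmp_name, "w:gz") as tar:
        # arcname roots every member under "<dir name>/..." rather than
        # under the absolute path or at the top level of the archive.
        tar.add(str(source_dir), arcname=source_dir.name)
    return Path(tmp_name)
```

The caller owns the returned temporary file and should delete it once the upload finishes, as the excerpt of upload_blueprint further down does in its finally block.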
Deployment and Workflow Execution
Deployment creation and workflow execution are also asynchronous processes that require careful polling.
- Deployment Creation: Creating a deployment triggers create_deployment_environment automatically. Subsequent execute(install) calls will 409 if this environment creation is still in flight. The create_deployment function polls the most recent create_deployment_environment execution until it terminates 2.
- Workflow Execution: The execute_workflow function POSTs to /api/v3.1/executions to start a workflow. The wait_execution function then polls the execution status until it reaches terminated, failed, or cancelled states. The default timeout for execution is 1800 seconds to accommodate long cold-starts.
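The polling pattern shared by both bullets can be sketched independently of HTTP. In this sketch the fetch callable stands in for the GET on /api/v3.1/executions/<id>, and the 2-second interval is an assumption (the real POLL_SECONDS value is not shown in this document). The name poll_until_terminated and the injectable sleep and clock parameters are hypothetical, added only to make the loop testable.

```python
import time
from typing import Any, Callable

FAILURE_STATES = ("failed", "cancelled")


def poll_until_terminated(
    fetch: Callable[[], dict[str, Any]],
    *,
    timeout: float = 1800.0,
    interval: float = 2.0,  # assumed; not the project's actual POLL_SECONDS
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict[str, Any]:
    """Call fetch() until the execution terminates, fails, or times out."""
    deadline = clock() + timeout
    while clock() < deadline:
        body = fetch()
        status = body.get("status")
        if status == "terminated":
            return body
        if status in FAILURE_STATES:
            message = body.get("error_message") or "<no message>"
            raise RuntimeError(f"execution ended in {status}: {message}")
        # Any other status (pending, started, ...) means keep waiting.
        sleep(interval)
    raise TimeoutError(f"execution did not terminate within {timeout}s")
```

The same loop covers the create_deployment_environment wait: polling that execution to terminated before starting install is what avoids the 409.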
The latest_execution function queries the most recent execution for a deployment, optionally filtered by workflow ID, which powers the dap blueprint logs command 5.
Runtime Properties and Secrets
Runtime properties may contain secrets. The orchestrator’s sensitive_keys mechanism masks values in execution logs but NOT in the GET /api/v3.1/node-instances response. To prevent stray secrets from appearing in operator scrollback, the redact_runtime_properties function walks the object and replaces values under keys containing hints like “password”, “secret”, “token”, “credential”, “private_key”, “api_key”, or “auth” with <redacted> 2.
The fetch_runtime_properties function retrieves node instances for a deployment, which can then be passed through redact_runtime_properties before display 6.
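A recursive walk of this kind might look like the sketch below. It assumes case-insensitive substring matching on keys and returns a redacted copy rather than mutating the input; neither detail is confirmed by the text above, and redact_sketch is a hypothetical name, not the project's redact_runtime_properties.

```python
from typing import Any

# Hint list taken from the description above.
SECRET_HINTS = (
    "password", "secret", "token", "credential",
    "private_key", "api_key", "auth",
)
REDACTED = "<redacted>"


def _looks_secret(key: Any) -> bool:
    """True when a dict key contains one of the hints (case-insensitive)."""
    return isinstance(key, str) and any(h in key.lower() for h in SECRET_HINTS)


def redact_sketch(obj: Any) -> Any:
    """Return a copy of obj with values under secret-looking keys masked."""
    if isinstance(obj, dict):
        # A matching key masks its whole value, even if that value is a
        # nested structure; other values are walked recursively.
        return {
            k: REDACTED if _looks_secret(k) else redact_sketch(v)
            for k, v in obj.items()
        }
    if isinstance(obj, list):
        return [redact_sketch(item) for item in obj]
    return obj
```

Substring matching errs on the side of over-redaction: a key such as author would also be masked because it contains auth, which is usually an acceptable trade-off for terminal output.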
"""Orchestrator REST client setup.

Single source of truth for the auth contract on the orchestrator's
``/api/v3.1/*`` surface (blueprints, deployments, executions,
secrets). Two things make this surface different from the
``/api/v2/*`` endpoints that ``inventory/client.py`` talks to:

1. **Authorization vs Cookie.** The orchestrator's session-svc
   base64-decodes the raw ``Authorization`` header value and trips
   at the space inside ``Bearer ``. Every v3.1 request MUST carry
   ``Cookie: session.id=<jwt>`` instead. ``attach_to_orchestrator``
   below does this regardless of whether the caller arrived via a
   browser session (which already has a portal-issued ``session.id``
   in the cookie jar) or via a service-account client_credentials
   mint (which has to inject the freshly minted Bearer into the
   cookie jar under the same name).

2. **Long timeouts.** The first install of a blueprint inside a
   fresh deployment provisions an ansible-core venv + galaxy
   collections inside the orchestrator's mgmtworker pod. On a cold
   pod this takes 8-15 minutes end-to-end. The default httpx
   ``timeout=10`` cannot cope. Default here is 60s per call, with
   the blueprint client polling on top.
"""
from __future__ import annotations
import base64
import json
from collections.abc import Iterator
from contextlib import contextmanager
import httpx
from .auth import mint_onprem_token
from .auth.session import Session
from .bootstrap import BootstrapResult
from .config import ClusterConfig
from .errors import DapError
"""HTTP calls against the orchestrator's ``/api/v3.1/*`` surface.

Function set is intentionally minimal -- everything needed to drive
the standard "upload, deploy, run install, watch the result" loop
from a script. Each call uses :func:`open_orchestrator_client` so
the auth contract (``Cookie: session.id``, never ``Authorization:
Bearer``) is enforced in one place.

The blueprint engine surface has six load-bearing contracts that
took an evening of probing to discover, all locked in by tests in
``tests/test_blueprint_client.py``:

1. **Multipart upload with TWO fields**: ``params`` (JSON form
   field with ``application_file_name`` + ``visibility``) AND
   ``blueprint_archive`` (file). Anything else 422s or 400s.
2. **Tar must root at the source-dir name** (Cloudify wants a
   single top-level directory; flat tars get rejected with
   "main blueprint file not found").
3. **Blueprint upload is async**: returns immediately with
   ``state: uploading``; poll for ``state: uploaded`` before
   creating a deployment.
4. **Deployment creation is also async**: the orchestrator kicks
   off ``create_deployment_environment`` automatically; subsequent
   ``execute(install)`` calls 409 if create_dep_env is still
   running.
5. **Long cold-start**: a fresh deployment provisions an
   ansible-core venv + galaxy collections inside mgmtworker on
   first install. Cold takes 8-15 minutes; subsequent runs 30-90s.
   The default ``wait_execution`` timeout is 1800s.
6. **runtime_properties may carry secrets**: dump them through
   :func:`redact_runtime_properties` before printing.
"""
from __future__ import annotations
import json
import tarfile
import tempfile
import time
from pathlib import Path
    part. Other field names (``archive``, ``file``, ``resource``,
    ``content``) 400 with archive-missing. Anything other than
    multipart 422s with "Validation error on field: params".
    """
    params_payload = json.dumps(
        {
            "application_file_name": main_file_name,
            "visibility": "tenant",
        }
    )
    tar_path = _tar_blueprint(source_dir)
    try:
        with open(tar_path, "rb") as fh:
            archive_bytes = fh.read()
        files = [
            ("params", (None, params_payload, "application/json")),
            ("blueprint_archive", (tar_path.name, archive_bytes, "application/octet-stream")),
        ]
        with open_orchestrator_client(
            config,
            session=session,
            admin=admin,
            insecure=insecure,
            timeout=poll_timeout,
        ) as cx:
            r = cx.put(f"/api/v3.1/blueprints/{blueprint_id}", files=files)
        if r.status_code not in (200, 201):
            raise DapError(f"blueprint upload failed: HTTP {r.status_code} {r.text[:400]}")
        return wait_blueprint_uploaded(
            config,
            blueprint_id,
            session=session,
            admin=admin,
            insecure=insecure,
            timeout=poll_timeout,
        )
    finally:
        tar_path.unlink(missing_ok=True)
        admin=admin,
        insecure=insecure,
    ) as cx:
        r = cx.delete(url)
    if r.status_code not in (200, 204):
        raise DapError(f"delete blueprint {blueprint_id!r} failed: HTTP {r.status_code} {r.text[:300]}")


def create_deployment(
    config: ClusterConfig,
    deployment_id: str,
    blueprint_id: str,
    inputs: dict[str, Any],
    *,
    session: Session | None = None,
    admin: BootstrapResult | None = None,
    insecure: bool = True,
    wait: bool = True,
    timeout: float = DEFAULT_DEPLOY_TIMEOUT,
) -> dict[str, Any]:
    """PUT ``/api/v3.1/deployments/<id>``, then (by default) poll
    ``create_deployment_environment`` until terminated. Without that
    wait, subsequent ``execute_workflow`` calls 409 with the env
    creation still in flight.
    """
    body = {
        "blueprint_id": blueprint_id,
        "inputs": inputs,
        "visibility": "tenant",
    }
    with open_orchestrator_client(
        config,
        session=session,
        admin=admin,
        insecure=insecure,
    ) as cx:
        r = cx.put(f"/api/v3.1/deployments/{deployment_id}", json=body)
    if r.status_code not in (200, 201):
        raise DapError(f"create deployment {deployment_id!r} failed: HTTP {r.status_code} {r.text[:400]}")
    if wait:
    ) as cx:
        r = cx.post("/api/v3.1/executions", json=body)
    if r.status_code not in (200, 201):
        raise DapError(
            f"start workflow {workflow_id!r} on {deployment_id!r} failed: HTTP {r.status_code} {r.text[:400]}"
        )
    return r.json()


def wait_execution(
    config: ClusterConfig,
    execution_id: str,
    *,
    session: Session | None = None,
    admin: BootstrapResult | None = None,
    insecure: bool = True,
    timeout: float = DEFAULT_EXEC_TIMEOUT,
) -> dict[str, Any]:
    """Poll ``/api/v3.1/executions/<id>`` until terminated/failed/cancelled."""
    deadline = time.time() + timeout
    with open_orchestrator_client(
        config,
        session=session,
        admin=admin,
        insecure=insecure,
    ) as cx:
        while time.time() < deadline:
            r = cx.get(f"/api/v3.1/executions/{execution_id}")
            if r.status_code != 200:
                raise DapError(f"execution status check failed: HTTP {r.status_code} {r.text[:300]}")
            body = r.json()
            status = body.get("status")
            if status == "terminated":
                return body
            if status in ("failed", "cancelled"):
                raise DapError(
                    f"execution {execution_id} ended in {status}: {body.get('error_message') or '<no message>'}"
                )
            time.sleep(POLL_SECONDS)
    raise DapError(f"execution {execution_id} did not terminate within {timeout}s")
        )
    if r.status_code != 200:
        raise DapError(f"node-instances fetch failed: HTTP {r.status_code} {r.text[:300]}")
    return r.json().get("items", [])