For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Add a self-service Encryption Key Management page to Organization Settings, gated by a new is_infra_admin flag, with DDB+SSM CRUD for Azure Key Vault credentials.
Architecture: Orthogonal is_infra_admin boolean on organization_x_members table (clj-pg-wrapper), exposed via JWT claim. platform-api gets 4 new /api/v1/admin/encryption/* endpoints using DDB+SSM hybrid storage (mirrors client_api_keys). Frontend adds /settings/encryption page with Configuration + Status tabs, plus a checkbox in the existing admin edit-member modal.
Tech Stack: Python/FastAPI (clj-pg-wrapper, platform-api), TypeScript/React (platform-frontend), Terraform/HCL, Azure SDK (azure-identity, azure-keyvault-keys), DynamoDB, SSM Parameter Store.
Spec: docs/superpowers/specs/2026-04-17-eng-1131-encryption-key-mgmt-ui-design.md
Branch: feature/ENG-1131-encryption-key-mgmt-ui (same name across all repos)
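The DDB+SSM hybrid storage named in the Architecture line can be pictured with a small in-memory sketch: the secret goes to a parameter store, only a reference to it lands in the table row, and a failed table write triggers cleanup of the freshly written secret. `FakeSecretStore`, `FakeTable`, and `save_config` are hypothetical names used only for illustration; they are not the real clients from platform-api.

```python
class FakeSecretStore:
    """In-memory stand-in for SSM Parameter Store (hypothetical)."""

    def __init__(self):
        self.params = {}

    def put(self, name, value):
        self.params[name] = value
        return f"arn:fake:ssm:::parameter{name}"

    def delete(self, name):
        self.params.pop(name, None)


class FakeTable:
    """In-memory stand-in for the DynamoDB table (hypothetical)."""

    def __init__(self, fail_on_put=False):
        self.rows = {}
        self.fail_on_put = fail_on_put

    def put_item(self, item):
        if self.fail_on_put:
            raise RuntimeError("simulated DynamoDB failure")
        self.rows[item["orgid"]] = item


def save_config(table, secrets, orgid, metadata, client_secret):
    """Write the secret first, then the row; undo the secret if the row write fails."""
    name = f"/platform-api/{orgid}/encryption/client_secret"
    arn = secrets.put(name, client_secret)
    # The row carries only a reference to the secret, never the secret itself.
    item = {"orgid": orgid, **metadata, "ssm_parameter_arn": arn}
    try:
        table.put_item(item)
    except Exception:
        secrets.delete(name)  # compensation: do not leave an orphaned secret
        raise
    return item
```

The ordering matters in this sketch: writing the secret first means a row never points at a parameter that does not exist yet.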
Repo: /Users/admin/dev/cap/clj-pg-wrapper
Files:
- Modify: src/models/organization.py:42-51 (OrganizationMember model)
- Modify: data/clj-db-schema.sql (authoritative schema)
- Modify: src/schemas/organization.py (response schemas)
- Step 1: Add column to SQL schema
In data/clj-db-schema.sql, find the organization_x_members table definition and add:
is_infra_admin BOOLEAN NOT NULL DEFAULT FALSE
- Step 2: Add field to SQLModel
In src/models/organization.py, add to OrganizationMember:
class OrganizationMember(SQLModel, table=True):
    """Organization member relationship model."""
    __tablename__ = "organization_x_members"
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    organization_id: UUID = Field(foreign_key="organizations.id", nullable=False)
    user_id: UUID = Field(foreign_key="users.id", nullable=False)
    role: Optional[str] = Field(default="member")
    is_infra_admin: bool = Field(default=False)
- Step 3: Add to response schema
In src/schemas/organization.py, find OrganizationMemberResponse and add is_infra_admin: bool = False.
- Step 4: Run the live database migration
Since clj-pg-wrapper uses raw SQL (not alembic), run directly against the Postgres DB:
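ALTER TABLE organization_x_members ADD COLUMN IF NOT EXISTS is_infra_admin BOOLEAN NOT NULL DEFAULT FALSE;
On PostgreSQL 11 and later, adding a column with a constant default is a metadata-only change that does not rewrite the table. The statement still takes a brief ACCESS EXCLUSIVE lock, so it is safe on a live table as long as no long-running transaction is holding the table at the time.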
- Step 5: Verify tests pass
cd /Users/admin/dev/cap/clj-pg-wrapper
poetry run pytest -x -q
- Step 6: Commit
git add src/models/organization.py data/clj-db-schema.sql src/schemas/organization.py
git commit -m "feat(ENG-1131): add is_infra_admin column to organization_x_members"

Repo: /Users/admin/dev/cap/clj-pg-wrapper
Files:
- Modify: src/routes/organizations.py (PATCH member endpoint)
- Modify: src/routes/auth.py or wherever /me is defined (add claim)
- Create: src/tests/test_infra_admin.py
- Step 1: Write failing test for PATCH member
Create src/tests/test_infra_admin.py:
import pytest
from httpx import AsyncClient


@pytest.mark.asyncio
async def test_patch_member_infra_admin(client: AsyncClient, test_org_member):
    """PATCH /api/v1/organizations/{org_id}/members/{member_id} with is_infra_admin."""
    org_id, member_id = test_org_member
    response = await client.patch(
        f"/api/v1/organizations/{org_id}/members/{member_id}",
        json={"is_infra_admin": True},
    )
    assert response.status_code == 200
    data = response.json()
    assert data["is_infra_admin"] is True


@pytest.mark.asyncio
async def test_get_member_includes_infra_admin(client: AsyncClient, test_org_member):
    """GET member should include is_infra_admin field."""
    # Assumes test_org_member yields a fresh member per test, so the flag starts as False.
    org_id, member_id = test_org_member
    response = await client.get(f"/api/v1/organizations/{org_id}/members/{member_id}")
    assert response.status_code == 200
    assert "is_infra_admin" in response.json()
    assert response.json()["is_infra_admin"] is False
- Step 2: Run test to verify it fails
cd /Users/admin/dev/cap/clj-pg-wrapper
poetry run pytest src/tests/test_infra_admin.py -v
Expected: FAIL (endpoint may not accept is_infra_admin yet).
- Step 3: Update PATCH endpoint to accept is_infra_admin
In src/routes/organizations.py, find the PATCH member endpoint. Add is_infra_admin to the update schema and update the DB query to include it.
The exact shape depends on the existing update pattern — look for update_member or similar in the routes file. The field is just another column on the same row.
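As a rough illustration of the "just another column" update, here is a minimal sketch of a partial update in which only the fields present in the request are applied. `MemberRow`, `MemberUpdate`, and `apply_member_update` are hypothetical names; the real route will use whatever update schema and query helper already exist in src/routes/organizations.py.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class MemberRow:
    """Hypothetical stand-in for the organization_x_members row."""
    role: str = "member"
    is_infra_admin: bool = False


@dataclass
class MemberUpdate:
    """Hypothetical PATCH body: None means 'field not provided'."""
    role: Optional[str] = None
    is_infra_admin: Optional[bool] = None


def apply_member_update(row: MemberRow, update: MemberUpdate) -> MemberRow:
    """Copy every provided field onto the row, leaving the others untouched."""
    for f in fields(update):
        value = getattr(update, f.name)
        # Compare against None, not truthiness, so an explicit False is applied.
        if value is not None:
            setattr(row, f.name, value)
    return row
```

The `is not None` check is the detail worth copying: a truthiness check would silently ignore a request that sets is_infra_admin back to False.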
- Step 4: Run tests
poetry run pytest src/tests/test_infra_admin.py -v
Expected: PASS
- Step 5: Add is_infra_admin to JWT claims
Find the JWT token issuance code (likely in src/auth/ or src/routes/auth.py). Where the membership/role is added to the payload, add:
"is_infra_admin": membership.is_infra_admin,
- Step 6: Run full test suite
poetry run pytest -x -q
- Step 7: Commit
git add -A
git commit -m "feat(ENG-1131): expose is_infra_admin in PATCH members + JWT claim"

Repo: /Users/admin/dev/cap/terraform
Files:
- Modify: eks/services/platform-api/dynamodb.tf (new table)
- Modify: eks/services/platform-api/config.tf (new env var)
- Modify: eks/services/platform-api/iam.tf (SSM + SFN permissions)
- Modify: eks/services/platform-api/variables.tf (if needed for SFN ARN)
- Step 1: Add DynamoDB table
Append to eks/services/platform-api/dynamodb.tf:
resource "aws_dynamodb_table" "org_encryption_configs_v1" {
  name                        = "${local.global_prefix}-org-encryption-configs_v1"
  billing_mode                = "PAY_PER_REQUEST"
  hash_key                    = "orgid"
  deletion_protection_enabled = var.deletion_protection_enabled[terraform.workspace]
  point_in_time_recovery {
    enabled = var.point_in_time_recovery_enabled[terraform.workspace]
  }
  attribute {
    name = "orgid"
    type = "S"
  }
  stream_enabled = false
}
- Step 2: Add ConfigMap env var for table name
In eks/services/platform-api/config.tf, find kubernetes_config_map.platform-api-configs and add to the data block:
DB__ORG_ENCRYPTION_CONFIGS_TABLE_NAME = aws_dynamodb_table.org_encryption_configs_v1.name
- Step 3: Add IAM permissions for SSM encryption params
In eks/services/platform-api/iam.tf, add a new statement to the platform-api IRSA policy:
{
  Effect = "Allow"
  Action = [
    "ssm:PutParameter",
    "ssm:GetParameter",
    "ssm:DeleteParameter"
  ]
  Resource = "arn:aws:ssm:*:${data.aws_caller_identity.current.account_id}:parameter/platform-api/*/encryption/*"
},
- Step 4: Add IAM for DynamoDB table
Add the new table ARN to the existing DynamoDB policy statement's Resource list, or add a dedicated statement:
{
  Effect = "Allow"
  Action = [
    "dynamodb:GetItem",
    "dynamodb:PutItem",
    "dynamodb:UpdateItem",
    "dynamodb:DeleteItem",
    "dynamodb:Query"
  ]
  Resource = aws_dynamodb_table.org_encryption_configs_v1.arn
},
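{
  Effect   = "Allow"
  Action   = ["states:ListStateMachines"] # no resource-level permissions; the status endpoint uses it to find the state machine
  Resource = "*"
},
{
  Effect   = "Allow"
  Action   = ["states:ListExecutions"]
  Resource = "arn:aws:states:*:${data.aws_caller_identity.current.account_id}:stateMachine:*kek-rotation*"
},
{
  Effect   = "Allow"
  Action   = ["states:DescribeExecution"] # authorized against execution ARNs, not state machine ARNs
  Resource = "arn:aws:states:*:${data.aws_caller_identity.current.account_id}:execution:*kek-rotation*:*"
},
- Step 6: Format and validate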
cd /Users/admin/dev/cap/terraform/eks/services/platform-api
terraform fmt
terraform validate
- Step 7: Commit
git add dynamodb.tf config.tf iam.tf
git commit -m "feat(ENG-1131): add org_encryption_configs DDB table + IAM for SSM/SFN"

Repo: /Users/admin/dev/cap/platform-api
Files:
- Modify: src/config.py:130+ (add table name to DynamoDbTableNames)
- Create: src/schemas/encryption_config.py (Pydantic models)
- Modify: src/schemas/__init__.py (import new schemas)
- Create: src/dependencies/infra_admin_auth.py (requires_infra_admin dep)
- Step 1: Add table name to config
In src/config.py, class DynamoDbTableNames, add:
ORG_ENCRYPTION_CONFIGS_TABLE_NAME: str = "local-org-encryption-configs_v1"
- Step 2: Create schemas
Create src/schemas/encryption_config.py:
from typing import List, Optional

from pydantic import BaseModel, Field


class EncryptionConfigCreate(BaseModel):
    vault_url: str = Field(..., description="Azure Key Vault URL")
    tenant_id: str = Field(..., description="Azure AD tenant ID")
    client_id: str = Field(..., description="Azure AD client ID")
    client_secret: Optional[str] = Field(None, description="Azure AD client secret (omit to keep existing)")
    key_name: str = Field(..., description="Name of the RSA key in Key Vault")


class EncryptionConfigResponse(BaseModel):
    orgid: str
    vault_url: str
    tenant_id: str
    client_id: str
    key_name: str
    ssm_parameter_arn: Optional[str] = None
    kek_id: Optional[str] = None
    configured_at: Optional[str] = None
    configured_by_user_id: Optional[str] = None
    status: str = "active"


class TestConnectionRequest(BaseModel):
    vault_url: str
    tenant_id: str
    client_id: str
    client_secret: str
    key_name: str


class TestConnectionResponse(BaseModel):
    ok: bool
    error_code: Optional[str] = None
    error_detail: Optional[str] = None


class BucketStats(BaseModel):
    name: str
    rewrapped: int = 0
    failed: int = 0
    skipped: int = 0
    last_run_at: Optional[str] = None


class EncryptionStatusResponse(BaseModel):
    configured: bool
    vault_url: Optional[str] = None
    key_name: Optional[str] = None
    current_kek_version: Optional[str] = None
    last_rotation_at: Optional[str] = None
    cached_at: Optional[str] = None
    buckets: List[BucketStats] = []


class DeleteConfigResponse(BaseModel):
    removed: bool
    fallback_available: bool
- Step 3: Add import to schemas/__init__.py
Add to src/schemas/__init__.py:
from schemas.encryption_config import (
    EncryptionConfigCreate,
    EncryptionConfigResponse,
    TestConnectionRequest,
    TestConnectionResponse,
    EncryptionStatusResponse,
    DeleteConfigResponse,
)
- Step 4: Create infra_admin auth dependency
Create src/dependencies/infra_admin_auth.py:
from fastapi import Depends, HTTPException, status

from dependencies.user_auth import get_current_user
from schemas import User
from utils import get_logger

logger = get_logger("infra_admin_auth")


async def requires_infra_admin(user: User = Depends(get_current_user)) -> User:
    """
    Dependency that requires the current user to have is_infra_admin=True in their JWT.
    Returns the user if authorized, raises 403 otherwise.
    """
    is_infra_admin = getattr(user, "is_infra_admin", False)
    if not is_infra_admin:
        logger.warning(f"User {user.id} attempted infra-admin action without permission")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Infra Admin access required",
        )
    return user
Note: The exact mechanism for reading is_infra_admin depends on how the JWT payload maps to the User schema. Check src/dependencies/user_auth.py and src/schemas/users.py; you may need to add is_infra_admin: bool = False to the User model or the JWT payload dict.
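If the flag ends up being read straight from the decoded JWT payload, a strict check is worth considering. The sketch below assumes the payload is available as a plain mapping; `claim_is_infra_admin` is a hypothetical helper, not an existing function in platform-api.

```python
from typing import Any, Mapping


def claim_is_infra_admin(claims: Mapping[str, Any]) -> bool:
    """Return True only when the claim is present and is the boolean True.

    Tokens issued before the claim existed have no such key, and a
    stringly-typed "true" should not grant access, so anything other
    than a real True is treated as False.
    """
    return claims.get("is_infra_admin") is True
```

Treating a missing claim as False also means users only gain access after their token is re-issued with the new claim, which is usually the desired behaviour for a permission flag.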
- Step 5: Commit
git add src/config.py src/schemas/encryption_config.py src/schemas/__init__.py src/dependencies/infra_admin_auth.py
git commit -m "feat(ENG-1131): add encryption config schemas + infra_admin auth dep"

Repo: /Users/admin/dev/cap/platform-api
Files:
- Create: src/dynamodb_client/encryption_config.py
- Modify: src/dynamodb_client/table_schemas.py (add local dev schema)
- Step 1: Create EncryptionConfigClient
Create src/dynamodb_client/encryption_config.py:
import arrow

from config import settings
from dynamodb_client.base import BaseDynamoDbClient
from utils import get_logger

logger = get_logger("encryption_config")


class EncryptionConfigClient(BaseDynamoDbClient):
    def __init__(self):
        super().__init__()
        self.table = self.get_table(settings.db.ORG_ENCRYPTION_CONFIGS_TABLE_NAME)

    def get_config(self, orgid: str) -> dict | None:
        response = self.table.get_item(Key={"orgid": orgid})
        return response.get("Item")

    def upsert_config(
        self,
        orgid: str,
        vault_url: str,
        tenant_id: str,
        client_id: str,
        key_name: str,
        client_secret: str | None,
        user_id: str,
        kek_id: str | None = None,
    ) -> dict:
        now = arrow.utcnow().isoformat()
        existing = self.get_config(orgid)
        item = {
            "orgid": orgid,
            "vault_url": vault_url,
            "tenant_id": tenant_id,
            "client_id": client_id,
            "key_name": key_name,
            "configured_at": now,
            "configured_by_user_id": user_id,
            "status": "active",
        }
        if kek_id:
            item["kek_id"] = kek_id
        # Handle client_secret → SSM
        if client_secret:
            ssm_param_name = f"/platform-api/{orgid}/encryption/client_secret"
            arn = self.ssm_client.create_secure_parameter(
                name=ssm_param_name,
                value=client_secret,
                description=f"Azure Key Vault client secret for org {orgid}",
            )
            item["ssm_parameter_arn"] = arn
        elif existing and "ssm_parameter_arn" in existing:
            item["ssm_parameter_arn"] = existing["ssm_parameter_arn"]
        # Invalidate status cache: put_item replaces the whole row, so any
        # previous status_cache attribute is dropped along with it
        item["cached_at"] = None
        try:
            self.table.put_item(Item=item)
        except Exception:
            # Compensation: if DDB fails but SSM was written, clean up SSM.
            # Caveat: on an update this also removes the parameter the existing
            # row still points at; restoring the previous value may be safer.
            if client_secret and "ssm_parameter_arn" in item:
                try:
                    self.ssm_client.delete_parameter(item["ssm_parameter_arn"])
                except Exception as cleanup_err:
                    logger.error(f"SSM compensation cleanup failed: {cleanup_err}")
            raise
        return item

    def delete_config(self, orgid: str) -> bool:
        existing = self.get_config(orgid)
        if not existing:
            return False
        # Delete SSM parameter first
        if "ssm_parameter_arn" in existing:
            try:
                self.ssm_client.delete_parameter(existing["ssm_parameter_arn"])
            except Exception as e:
                logger.warning(f"Failed to delete SSM param for org {orgid}: {e}")
        self.table.delete_item(Key={"orgid": orgid})
        return True

    def update_status_cache(self, orgid: str, status_cache: dict) -> None:
        now = arrow.utcnow().isoformat()
        self.table.update_item(
            Key={"orgid": orgid},
            UpdateExpression="SET status_cache = :cache, cached_at = :ts",
            ExpressionAttributeValues={":cache": status_cache, ":ts": now},
        )

    def get_cached_status(self, orgid: str, ttl_seconds: int = 300) -> dict | None:
        item = self.get_config(orgid)
        if not item or not item.get("cached_at") or not item.get("status_cache"):
            return None
        cached_at = arrow.get(item["cached_at"])
        if (arrow.utcnow() - cached_at).total_seconds() > ttl_seconds:
            return None
        return item["status_cache"]
- Step 2: Add table schema for local dev
In src/dynamodb_client/table_schemas.py, add:
"org-encryption-configs_v1": {
    "key_schema": [{"AttributeName": "orgid", "KeyType": "HASH"}],
    "attribute_definitions": [{"AttributeName": "orgid", "AttributeType": "S"}],
    "global_secondary_indexes": [],
},
- Step 3: Commit
git add src/dynamodb_client/encryption_config.py src/dynamodb_client/table_schemas.py
git commit -m "feat(ENG-1131): add EncryptionConfigClient with DDB+SSM CRUD"

Repo: /Users/admin/dev/cap/platform-api
Files:
- Create: src/utils/azure_keyvault.py
- Modify: pyproject.toml (add azure-identity, azure-keyvault-keys deps)
- Step 1: Add Azure SDK dependencies
cd /Users/admin/dev/cap/platform-api
poetry add azure-identity azure-keyvault-keys
- Step 2: Create Azure KV helper
Create src/utils/azure_keyvault.py:
import os

from azure.identity import ClientSecretCredential
from azure.keyvault.keys import KeyClient
from azure.keyvault.keys.crypto import CryptographyClient, KeyWrapAlgorithm

from utils import get_logger

logger = get_logger("azure_keyvault")


def test_keyvault_connection(
    vault_url: str,
    tenant_id: str,
    client_id: str,
    client_secret: str,
    key_name: str,
) -> dict:
    """
    Test Azure Key Vault connectivity and permissions.
    Returns {"ok": True, "kek_id": "..."} or {"ok": False, "error_code": "...", "error_detail": "..."}.
    """
    # Validate vault URL format, tolerating a trailing slash
    vault_url = vault_url.rstrip("/")
    if not vault_url.startswith("https://") or not vault_url.endswith(".vault.azure.net"):
        return {
            "ok": False,
            "error_code": "invalid_vault_url",
            "error_detail": f"Vault URL must be https://<name>.vault.azure.net, got: {vault_url}",
        }
    # Step 1: Build the credential and client. No token is requested here;
    # azure-identity authenticates lazily on the first Key Vault call, so bad
    # credentials usually surface in Step 2 rather than in this block.
    try:
        credential = ClientSecretCredential(
            tenant_id=tenant_id,
            client_id=client_id,
            client_secret=client_secret,
        )
        key_client = KeyClient(vault_url=vault_url, credential=credential)
    except Exception as e:
        return {"ok": False, "error_code": "azure_auth_failed", "error_detail": str(e)}
    # Step 2: Get key (first network call; also where authentication happens)
    try:
        key = key_client.get_key(key_name)
    except Exception as e:
        error_str = str(e).lower()
        if "not found" in error_str or "404" in error_str:
            return {"ok": False, "error_code": "key_not_found", "error_detail": str(e)}
        if "forbidden" in error_str or "403" in error_str:
            return {"ok": False, "error_code": "permission_denied", "error_detail": str(e)}
        if "could not be resolved" in error_str or "name resolution" in error_str:
            return {"ok": False, "error_code": "network_error", "error_detail": str(e)}
        return {"ok": False, "error_code": "azure_auth_failed", "error_detail": str(e)}
    # Step 3: Test wrapKey permission by wrapping a throwaway 32-byte key
    try:
        crypto_client = CryptographyClient(key.id, credential=credential)
        test_dek = os.urandom(32)
        crypto_client.wrap_key(KeyWrapAlgorithm.rsa_oaep, test_dek)
    except Exception as e:
        return {"ok": False, "error_code": "permission_denied", "error_detail": str(e)}
    return {"ok": True, "kek_id": key.id}


def get_current_kek_version(
    vault_url: str,
    tenant_id: str,
    client_id: str,
    client_secret: str,
    key_name: str,
) -> dict | None:
    """Fetch current KEK version from Azure Key Vault. Returns {kek_id, version, created_on, updated_on} or None."""
    try:
        credential = ClientSecretCredential(
            tenant_id=tenant_id,
            client_id=client_id,
            client_secret=client_secret,
        )
        key_client = KeyClient(vault_url=vault_url, credential=credential)
        key = key_client.get_key(key_name)
        return {
            "kek_id": key.id,
            "version": key.properties.version,
            "created_on": key.properties.created_on.isoformat() if key.properties.created_on else None,
            "updated_on": key.properties.updated_on.isoformat() if key.properties.updated_on else None,
        }
    except Exception as e:
        logger.error(f"Failed to fetch KEK version: {e}")
        return None
- Step 3: Commit
git add src/utils/azure_keyvault.py pyproject.toml poetry.lock
git commit -m "feat(ENG-1131): add Azure Key Vault test-connection helper"

Repo: /Users/admin/dev/cap/platform-api
Files:
- Create: src/routes/admin_encryption.py
- Modify: src/main.py (register router)
- Step 1: Create the router
Create src/routes/admin_encryption.py:
import json

import boto3
from fastapi import APIRouter, Depends, HTTPException, Query

from config import settings
from dependencies.infra_admin_auth import requires_infra_admin
from dynamodb_client.encryption_config import EncryptionConfigClient
from schemas.encryption_config import (
    EncryptionConfigCreate,
    EncryptionConfigResponse,
    TestConnectionRequest,
    TestConnectionResponse,
    EncryptionStatusResponse,
    DeleteConfigResponse,
    BucketStats,
)
from utils import get_logger
from utils.azure_keyvault import test_keyvault_connection, get_current_kek_version

logger = get_logger("admin_encryption")
admin_encryption_router = APIRouter()


@admin_encryption_router.post("/test-connection", response_model=TestConnectionResponse)
async def test_connection(
    body: TestConnectionRequest,
    user=Depends(requires_infra_admin),
):
    """Test Azure Key Vault connectivity without persisting anything."""
    result = test_keyvault_connection(
        vault_url=body.vault_url,
        tenant_id=body.tenant_id,
        client_id=body.client_id,
        client_secret=body.client_secret,
        key_name=body.key_name,
    )
    return TestConnectionResponse(**result)


@admin_encryption_router.post("/configure", response_model=EncryptionConfigResponse)
async def configure_encryption(
    body: EncryptionConfigCreate,
    user=Depends(requires_infra_admin),
):
    """Validate Azure KV creds, then persist config to DDB + SSM."""
    # Need a client_secret for first-time setup
    db_client = EncryptionConfigClient()
    existing = db_client.get_config(user.orgid)
    client_secret = body.client_secret
    if not client_secret and not existing:
        raise HTTPException(status_code=400, detail="client_secret is required for initial configuration")
    # If no new secret provided, retrieve existing from SSM for validation
    if not client_secret and existing and "ssm_parameter_arn" in existing:
        try:
            # NOTE: the region is hardcoded; it must match the region that
            # EncryptionConfigClient.ssm_client writes the parameter to.
            ssm = boto3.client("ssm", region_name="us-east-1")
            param_name = f"/platform-api/{user.orgid}/encryption/client_secret"
            resp = ssm.get_parameter(Name=param_name, WithDecryption=True)
            client_secret = resp["Parameter"]["Value"]
        except Exception as e:
            logger.error(f"Failed to read existing secret from SSM: {e}")
            raise HTTPException(status_code=400, detail="client_secret required (existing secret unreadable)")
    # Guard: an existing row with no stored secret cannot be validated either
    if not client_secret:
        raise HTTPException(status_code=400, detail="client_secret is required")
    # Validate connection before persisting
    result = test_keyvault_connection(
        vault_url=body.vault_url,
        tenant_id=body.tenant_id,
        client_id=body.client_id,
        client_secret=client_secret,
        key_name=body.key_name,
    )
    if not result.get("ok"):
        raise HTTPException(
            status_code=400,
            detail={"error_code": result.get("error_code"), "error_detail": result.get("error_detail")},
        )
    # Persist
    item = db_client.upsert_config(
        orgid=user.orgid,
        vault_url=body.vault_url,
        tenant_id=body.tenant_id,
        client_id=body.client_id,
        key_name=body.key_name,
        client_secret=body.client_secret,  # original (None = keep existing)
        user_id=str(user.id),
        kek_id=result.get("kek_id"),
    )
    logger.info(f"Encryption configured for org {user.orgid} by user {user.id}")
    return EncryptionConfigResponse(**item)


@admin_encryption_router.get("/status", response_model=EncryptionStatusResponse)
async def get_encryption_status(
    fresh: bool = Query(False, description="Bypass cache"),
    user=Depends(requires_infra_admin),
):
    """Get encryption status with 5-min cached Azure KV + SFN data."""
    db_client = EncryptionConfigClient()
    config = db_client.get_config(user.orgid)
    if not config:
        return EncryptionStatusResponse(configured=False)
    # Check cache
    if not fresh:
        cached = db_client.get_cached_status(user.orgid)
        if cached:
            return EncryptionStatusResponse(**cached)
    # Build fresh status
    status_data = {
        "configured": True,
        "vault_url": config.get("vault_url"),
        "key_name": config.get("key_name"),
        "current_kek_version": None,
        "last_rotation_at": None,
        "buckets": [],
    }
    # Fetch current KEK from Azure (needs client_secret from SSM)
    try:
        ssm = boto3.client("ssm", region_name="us-east-1")
        param_name = f"/platform-api/{user.orgid}/encryption/client_secret"
        resp = ssm.get_parameter(Name=param_name, WithDecryption=True)
        client_secret = resp["Parameter"]["Value"]
        kek_info = get_current_kek_version(
            vault_url=config["vault_url"],
            tenant_id=config["tenant_id"],
            client_id=config["client_id"],
            client_secret=client_secret,
            key_name=config["key_name"],
        )
        if kek_info:
            status_data["current_kek_version"] = kek_info.get("version")
            status_data["last_rotation_at"] = kek_info.get("updated_on")
    except Exception as e:
        logger.warning(f"Failed to fetch KEK version for status: {e}")
    # Fetch per-bucket stats from Step Function history
    try:
        sfn = boto3.client("stepfunctions", region_name=settings.AWS_REGION)
        # List state machines matching kek-rotation pattern
        paginator = sfn.get_paginator("list_state_machines")
        sm_arn = None
        for page in paginator.paginate():
            for sm in page["stateMachines"]:
                if "kek-rotation" in sm["name"].lower():
                    sm_arn = sm["stateMachineArn"]
                    break
            if sm_arn:
                break
        if sm_arn:
            executions = sfn.list_executions(
                stateMachineArn=sm_arn,
                statusFilter="SUCCEEDED",
                maxResults=1,
            )
            if executions.get("executions"):
                latest = executions["executions"][0]
                exec_detail = sfn.describe_execution(executionArn=latest["executionArn"])
                output = json.loads(exec_detail.get("output", "{}"))
                # boto3 returns stopDate as a datetime
                stop_date = latest.get("stopDate")
                last_run_at = stop_date.isoformat() if stop_date else None
                for br in output.get("bucket_results", []):
                    status_data["buckets"].append({
                        "name": br.get("bucket", "unknown"),
                        "rewrapped": br.get("objects_rewrapped", 0),
                        "failed": br.get("failed", 0),
                        "skipped": br.get("skipped", 0),
                        "last_run_at": last_run_at,
                    })
    except Exception as e:
        logger.warning(f"Failed to fetch SFN execution history: {e}")
    # Cache the result
    try:
        db_client.update_status_cache(user.orgid, status_data)
    except Exception as e:
        logger.warning(f"Failed to cache status: {e}")
    return EncryptionStatusResponse(**status_data)


@admin_encryption_router.delete("/configure", response_model=DeleteConfigResponse)
async def delete_encryption_config(
    user=Depends(requires_infra_admin),
):
    """Remove encryption config (DDB row + SSM secret)."""
    db_client = EncryptionConfigClient()
    removed = db_client.delete_config(user.orgid)
    # Check if workspace-level fallback exists.
    # This needs ssm:GetParameter on the fallback path; without it the call
    # fails and fallback_available stays False.
    fallback_available = False
    try:
        ssm = boto3.client("ssm", region_name="us-east-1")
        ssm.get_parameter(Name="/internal/prod/capai-crypto/azure-client-secret")
        fallback_available = True
    except Exception:
        pass
    logger.info(f"Encryption config removed for org {user.orgid} by user {user.id}")
    return DeleteConfigResponse(removed=removed, fallback_available=fallback_available)
- Step 2: Register router in main.py
In src/main.py, add import and router entry:
from routes.admin_encryption import admin_encryption_router
Add to the shared_routers list:
{
    "router": admin_encryption_router,
    "prefix": "/admin/encryption",
    "tags": ["Admin Encryption"],
},
- Step 3: Commit
git add src/routes/admin_encryption.py src/main.py
git commit -m "feat(ENG-1131): add /api/v1/admin/encryption/* router with all 4 endpoints"

Repo: /Users/admin/dev/cap/platform-api
Files:
- Create: tests/test_admin_encryption.py
- Step 1: Write endpoint tests
Create tests/test_admin_encryption.py:
import pytest
from unittest.mock import patch

# Patch the name where the router looks it up. routes.admin_encryption does
# `from utils.azure_keyvault import test_keyvault_connection`, so patching
# utils.azure_keyvault would leave the router's reference untouched.
PATCH_TARGET = "routes.admin_encryption.test_keyvault_connection"


@pytest.fixture
def mock_azure_success():
    with patch(PATCH_TARGET) as mock:
        mock.return_value = {"ok": True, "kek_id": "https://vault.azure.net/keys/test/v1"}
        yield mock


@pytest.fixture
def mock_azure_failure():
    with patch(PATCH_TARGET) as mock:
        mock.return_value = {"ok": False, "error_code": "key_not_found", "error_detail": "Key not found"}
        yield mock


class TestAdminEncryptionEndpoints:
    """Tests for /api/v1/admin/encryption/* endpoints."""

    def test_configure_without_infra_admin_returns_403(self, client):
        """Non-infra-admin user gets 403."""
        response = client.post(
            "/api/v1/admin/encryption/configure",
            json={
                "vault_url": "https://test.vault.azure.net",
                "tenant_id": "tid",
                "client_id": "cid",
                "client_secret": "secret",
                "key_name": "key",
            },
        )
        assert response.status_code == 403

    def test_test_connection_success(self, infra_admin_client, mock_azure_success):
        """Test connection with valid creds returns ok=True."""
        response = infra_admin_client.post(
            "/api/v1/admin/encryption/test-connection",
            json={
                "vault_url": "https://test.vault.azure.net",
                "tenant_id": "tid",
                "client_id": "cid",
                "client_secret": "secret",
                "key_name": "key",
            },
        )
        assert response.status_code == 200
        assert response.json()["ok"] is True

    def test_test_connection_bad_key(self, infra_admin_client, mock_azure_failure):
        """Test connection with bad key name returns ok=False with error_code."""
        response = infra_admin_client.post(
            "/api/v1/admin/encryption/test-connection",
            json={
                "vault_url": "https://test.vault.azure.net",
                "tenant_id": "tid",
                "client_id": "cid",
                "client_secret": "secret",
                "key_name": "nonexistent",
            },
        )
        assert response.status_code == 200
        data = response.json()
        assert data["ok"] is False
        assert data["error_code"] == "key_not_found"

    def test_configure_bad_creds_returns_400(self, infra_admin_client, mock_azure_failure):
        """Configure with bad creds returns 400, nothing persisted."""
        response = infra_admin_client.post(
            "/api/v1/admin/encryption/configure",
            json={
                "vault_url": "https://test.vault.azure.net",
                "tenant_id": "tid",
                "client_id": "cid",
                "client_secret": "badsecret",
                "key_name": "nonexistent",
            },
        )
        assert response.status_code == 400
Note: the infra_admin_client fixture needs to be set up in conftest.py as a test client authenticated with a JWT that includes is_infra_admin: True. The pattern depends on existing test fixtures.
- Step 2: Run tests
cd /Users/admin/dev/cap/platform-api
poetry run pytest tests/test_admin_encryption.py -v
- Step 3: Commit
git add tests/test_admin_encryption.py
git commit -m "test(ENG-1131): add tests for admin encryption endpoints"

Repo: /Users/admin/dev/cap/platform-frontend
Files:
- Modify: existing types file where Member/Me types are defined (find with grep -rn "is_infra_admin\|role.*member.*owner" src/types or similar)
- Create: src/api/encryption.ts (API hooks for the 4 endpoints)
- Step 1: Find and update Member/Me types
Search for existing type definitions:
grep -rn "interface.*Member\|type.*Member\|interface.*Me " src/ --include="*.ts" --include="*.tsx" | head -20
Add is_infra_admin: boolean to Member, Membership, and Me interfaces.
- Step 2: Create encryption API types and hooks
Create src/api/encryption.ts (adjust path to match existing API hook patterns):
import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query";
import { api } from "./client"; // adjust import to match existing API client

export interface EncryptionConfig {
  orgid: string;
  vault_url: string;
  tenant_id: string;
  client_id: string;
  key_name: string;
  ssm_parameter_arn?: string;
  kek_id?: string;
  configured_at?: string;
  configured_by_user_id?: string;
  status: string;
}

export interface TestConnectionResult {
  ok: boolean;
  error_code?: string;
  error_detail?: string;
}

export interface BucketStats {
  name: string;
  rewrapped: number;
  failed: number;
  skipped: number;
  last_run_at?: string;
}

export interface EncryptionStatus {
  configured: boolean;
  vault_url?: string;
  key_name?: string;
  current_kek_version?: string;
  last_rotation_at?: string;
  cached_at?: string;
  buckets: BucketStats[];
}

export interface ConfigureEncryptionRequest {
  vault_url: string;
  tenant_id: string;
  client_id: string;
  client_secret?: string;
  key_name: string;
}

export function useEncryptionStatus(fresh = false) {
  return useQuery({
    queryKey: ["encryption-status", fresh],
    queryFn: () =>
      api.get<EncryptionStatus>(`/admin/encryption/status${fresh ? "?fresh=true" : ""}`),
  });
}

export function useTestConnection() {
  return useMutation({
    mutationFn: (data: ConfigureEncryptionRequest & { client_secret: string }) =>
      api.post<TestConnectionResult>("/admin/encryption/test-connection", data),
  });
}

export function useConfigureEncryption() {
  const queryClient = useQueryClient();
  return useMutation({
    mutationFn: (data: ConfigureEncryptionRequest) =>
      api.post<EncryptionConfig>("/admin/encryption/configure", data),
    onSuccess: () => {
      queryClient.invalidateQueries({ queryKey: ["encryption-status"] });
    },
  });
}

export function useDeleteEncryptionConfig() {
  const queryClient = useQueryClient();
  return useMutation({
    mutationFn: () =>
      api.delete<{ removed: boolean; fallback_available: boolean }>("/admin/encryption/configure"),
    onSuccess: () => {
      queryClient.invalidateQueries({ queryKey: ["encryption-status"] });
    },
  });
}
- Step 3: Commit
git add -A
git commit -m "feat(ENG-1131): add encryption config types + API hooks"

Repo: /Users/admin/dev/cap/platform-frontend
Files:
- Create: src/pages/org-settings/encryption/ directory with page component
- Modify: sidebar/nav config to add "Encryption" entry (conditional on is_infra_admin)
- Modify: router config to add /settings/encryption route
This task requires exploring the existing FE codebase to understand:
- How src/pages/org-settings/ is structured
- Where sidebar nav items are defined
- How routes are registered
- What UI component library is used (shadcn? MUI? custom?)
- Step 1: Explore existing settings structure
find src/pages/org-settings -type f | head -20
grep -rn "Organization Settings\|org-settings\|/settings/" src/ --include="*.tsx" | head -20
- Step 2: Create encryption settings page
Create the page with two tabs (Configuration + Status). Use the same component library and layout patterns as existing settings pages.
The Configuration tab contains:
- Form fields: vault_url, tenant_id, client_id, client_secret (password), key_name
- "Test Connection" button → useTestConnection() → inline result
- "Save" button → useConfigureEncryption() → navigate to Status tab
The Status tab contains:
- Header card with vault_url, key_name, current KEK version, last rotation
- Per-bucket stats table
- Refresh button → useEncryptionStatus(true)
- Edit Configuration link → switch to Configuration tab
- Remove Configuration button → confirm modal → useDeleteEncryptionConfig()
- Step 3: Add sidebar entry (conditional)
Find where the Organization Settings sidebar items are defined. Add "Encryption" entry, conditionally rendered when useMe().is_infra_admin === true.
- Step 4: Add route
Register /settings/encryption in the router config, pointing to the new page component.
- Step 5: Add route guard
If user lands on /settings/encryption without is_infra_admin, redirect to /settings/general.
- Step 6: Test manually
npm run dev # or pnpm dev / yarn dev
Navigate to /settings/encryption as a user with is_infra_admin set; the Configuration form should appear.
- Step 7: Commit
git add -A
git commit -m "feat(ENG-1131): add /settings/encryption page with Configuration + Status tabs"

Repo: /Users/admin/dev/cap/platform-frontend
Files:
- Modify: src/pages/admin/org-details/org-details-page.tsx (or wherever the edit-member modal lives)
- Modify: src/pages/admin/settings/members-admin-view.tsx (alternative location)
- Step 1: Find the edit-member modal
grep -rn "edit.*member\|member.*modal\|role.*select\|Role.*Owner\|Role.*Member" src/pages/admin/ --include="*.tsx" | head -20
- Step 2: Add "Infra Admin" checkbox
In the edit-member modal, next to the Role dropdown, add a checkbox labeled "Infra Admin". Wire it to the is_infra_admin field in the PATCH request body.
- Step 3: Test manually
Navigate to /admin/orgs/<orgid> → Members tab → click edit on a member → verify "Infra Admin" checkbox appears.
- Step 4: Commit
git add -A
git commit -m "feat(ENG-1131): add Infra Admin checkbox to admin edit-member modal"

All repos
- Step 1: Run all test suites
# clj-pg-wrapper
cd /Users/admin/dev/cap/clj-pg-wrapper && poetry run pytest -x -q
# platform-api
cd /Users/admin/dev/cap/platform-api && poetry run pytest -x -q
# platform-frontend
cd /Users/admin/dev/cap/platform-frontend && npm test # or equivalent
- Step 2: Terraform format check
cd /Users/admin/dev/cap/terraform
terraform fmt -recursive -check
- Step 3: Verify all commits on feature branch
# Each repo should be on feature/ENG-1131-encryption-key-mgmt-ui
for repo in clj-pg-wrapper platform-api platform-frontend terraform; do
  echo "=== $repo ==="
  cd /Users/admin/dev/cap/$repo
  git log --oneline main..HEAD
  cd -
done
- Step 4: Create PRs
One PR per repo, all referencing ENG-1131:
# In each repo:
gh pr create --title "ENG-1131: Encryption Key Management UI" --body "$(cat <<'EOF'
## Summary
- Adds self-service encryption key management for EY
- New `is_infra_admin` flag on org membership (orthogonal to role)
- DDB + SSM hybrid config storage (mirrors client_api_keys pattern)
- Four `/api/v1/admin/encryption/*` endpoints
- New `/settings/encryption` page (Configuration + Status tabs)
- Admin edit-member modal gains Infra Admin checkbox
## Test plan
- [ ] Grant infra-admin via admin panel → user sees Encryption in sidebar
- [ ] Configure with valid Azure KV creds → dashboard shows KEK version
- [ ] Test connection with bad creds → shows specific error
- [ ] Edit config (blank secret) → existing secret preserved
- [ ] Remove config → falls back to workspace-level SSM
Spec: `docs/superpowers/specs/2026-04-17-eng-1131-encryption-key-mgmt-ui-design.md`
🤖 Generated with [Claude Code](https://claude.com/claude-code)
EOF
)"