ENG-1131: Encryption Key Management UI — Implementation Plan


For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Add a self-service Encryption Key Management page to Organization Settings, gated by a new is_infra_admin flag, with DDB+SSM CRUD for Azure Key Vault credentials.

Architecture: Orthogonal is_infra_admin boolean on organization_x_members table (clj-pg-wrapper), exposed via JWT claim. platform-api gets 4 new /api/v1/admin/encryption/* endpoints using DDB+SSM hybrid storage (mirrors client_api_keys). Frontend adds /settings/encryption page with Configuration + Status tabs, plus a checkbox in the existing admin edit-member modal.

Tech Stack: Python/FastAPI (clj-pg-wrapper, platform-api), TypeScript/React (platform-frontend), Terraform/HCL, Azure SDK (azure-identity, azure-keyvault-keys), DynamoDB, SSM Parameter Store.

Spec: docs/superpowers/specs/2026-04-17-eng-1131-encryption-key-mgmt-ui-design.md

Branch: feature/ENG-1131-encryption-key-mgmt-ui (same name across all repos)


Task 1: Add is_infra_admin column to clj-pg-wrapper

Repo: /Users/admin/dev/cap/clj-pg-wrapper

Files:

  • Modify: src/models/organization.py:42-51 (OrganizationMember model)

  • Modify: data/clj-db-schema.sql (authoritative schema)

  • Modify: src/schemas/organization.py (response schemas)

  • Step 1: Add column to SQL schema

In data/clj-db-schema.sql, find the organization_x_members table definition and add:

is_infra_admin BOOLEAN NOT NULL DEFAULT FALSE
  • Step 2: Add field to SQLModel

In src/models/organization.py, add to OrganizationMember:

class OrganizationMember(SQLModel, table=True):
    """Organization member relationship model."""

    __tablename__ = "organization_x_members"

    id: UUID = Field(default_factory=uuid4, primary_key=True)
    organization_id: UUID = Field(foreign_key="organizations.id", nullable=False)
    user_id: UUID = Field(foreign_key="users.id", nullable=False)
    role: Optional[str] = Field(default="member")
    is_infra_admin: bool = Field(default=False)
  • Step 3: Add to response schema

In src/schemas/organization.py, find OrganizationMemberResponse and add is_infra_admin: bool = False.

  • Step 4: Run the live database migration

Since clj-pg-wrapper uses raw SQL (not alembic), run directly against the Postgres DB:

ALTER TABLE organization_x_members ADD COLUMN IF NOT EXISTS is_infra_admin BOOLEAN NOT NULL DEFAULT FALSE;

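On Postgres 11 and later, adding a column with a constant default is a metadata-only change that does not rewrite the table. The statement still takes a brief ACCESS EXCLUSIVE lock, so it is safe on a live table as long as no long-running transaction holds a conflicting lock; setting a lock_timeout for the session guards against queueing behind one.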
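
The effect of the statement on existing rows can be rehearsed locally before touching the live database. The sketch below uses SQLite from the Python standard library purely as a stand-in for Postgres (an assumption made so the example runs anywhere). The table is reduced to two columns, and because SQLite has no ADD COLUMN IF NOT EXISTS, the idempotency guard is emulated with a column lookup in a hypothetical helper, add_infra_admin_column.

```python
import sqlite3


def add_infra_admin_column(conn: sqlite3.Connection) -> bool:
    """Add is_infra_admin if it is missing. Returns True when the column was added."""
    # PRAGMA table_info yields (cid, name, type, notnull, dflt_value, pk) per column.
    columns = [row[1] for row in conn.execute("PRAGMA table_info(organization_x_members)")]
    if "is_infra_admin" in columns:
        return False
    conn.execute(
        "ALTER TABLE organization_x_members "
        "ADD COLUMN is_infra_admin BOOLEAN NOT NULL DEFAULT FALSE"
    )
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE organization_x_members (id TEXT PRIMARY KEY, role TEXT)")
conn.execute("INSERT INTO organization_x_members (id, role) VALUES ('m1', 'member')")

first_run = add_infra_admin_column(conn)   # adds the column
second_run = add_infra_admin_column(conn)  # no-op, mirroring IF NOT EXISTS
rows = conn.execute("SELECT id, is_infra_admin FROM organization_x_members").fetchall()
```

The row inserted before the migration picks up the default, so no separate backfill is needed and the NOT NULL constraint holds from the start.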

  • Step 5: Verify tests pass
cd /Users/admin/dev/cap/clj-pg-wrapper
poetry run pytest -x -q
  • Step 6: Commit
git add src/models/organization.py data/clj-db-schema.sql src/schemas/organization.py
git commit -m "feat(ENG-1131): add is_infra_admin column to organization_x_members"

Task 2: Expose is_infra_admin in PATCH members endpoint + /me response (clj-pg-wrapper)

Repo: /Users/admin/dev/cap/clj-pg-wrapper

Files:

  • Modify: src/routes/organizations.py (PATCH member endpoint)

  • Modify: src/routes/auth.py or wherever /me is defined (add claim)

  • Create: src/tests/test_infra_admin.py

  • Step 1: Write failing test for PATCH member

Create src/tests/test_infra_admin.py:

import pytest
from httpx import AsyncClient


@pytest.mark.asyncio
async def test_patch_member_infra_admin(client: AsyncClient, test_org_member):
    """PATCH /api/v1/organizations/{org_id}/members/{member_id} with is_infra_admin."""
    org_id, member_id = test_org_member
    response = await client.patch(
        f"/api/v1/organizations/{org_id}/members/{member_id}",
        json={"is_infra_admin": True},
    )
    assert response.status_code == 200
    data = response.json()
    assert data["is_infra_admin"] is True


@pytest.mark.asyncio
async def test_get_member_includes_infra_admin(client: AsyncClient, test_org_member):
    """GET member should include is_infra_admin field."""
    org_id, member_id = test_org_member
    response = await client.get(f"/api/v1/organizations/{org_id}/members/{member_id}")
    assert response.status_code == 200
    assert "is_infra_admin" in response.json()
    assert response.json()["is_infra_admin"] is False
  • Step 2: Run test to verify it fails
cd /Users/admin/dev/cap/clj-pg-wrapper
poetry run pytest src/tests/test_infra_admin.py -v

Expected: FAIL (endpoint may not accept is_infra_admin yet).

  • Step 3: Update PATCH endpoint to accept is_infra_admin

In src/routes/organizations.py, find the PATCH member endpoint. Add is_infra_admin to the update schema and update the DB query to include it.

The exact shape depends on the existing update pattern — look for update_member or similar in the routes file. The field is just another column on the same row.
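
Whatever the existing update pattern turns out to be, a PATCH handler would normally apply only the fields the caller sent, so that toggling is_infra_admin does not reset role. A minimal sketch of that idea with plain dicts follows; apply_member_patch and ALLOWED_FIELDS are hypothetical names, not code from the repo.

```python
from typing import Any

# Hypothetical allow-list; the real update schema in src/routes/organizations.py may differ.
ALLOWED_FIELDS = {"role", "is_infra_admin"}


def apply_member_patch(member: dict[str, Any], patch: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of the member row with only the supplied fields changed."""
    unknown = set(patch) - ALLOWED_FIELDS
    if unknown:
        raise ValueError(f"Unsupported fields: {sorted(unknown)}")
    if "is_infra_admin" in patch and not isinstance(patch["is_infra_admin"], bool):
        raise ValueError("is_infra_admin must be a boolean")
    # Fields absent from the patch keep their current values.
    return {**member, **patch}


member = {"id": "m1", "role": "owner", "is_infra_admin": False}
updated = apply_member_patch(member, {"is_infra_admin": True})
```

Here updated keeps role as "owner" while flipping the flag, and the original member dict is left untouched.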

  • Step 4: Run tests
poetry run pytest src/tests/test_infra_admin.py -v

Expected: PASS

  • Step 5: Add is_infra_admin to JWT claims

Find the JWT token issuance code (likely in src/auth/ or src/routes/auth.py). Where the membership/role is added to the payload, add:

"is_infra_admin": membership.is_infra_admin,
  • Step 6: Run full test suite
poetry run pytest -x -q
  • Step 7: Commit
git add -A
git commit -m "feat(ENG-1131): expose is_infra_admin in PATCH members + JWT claim"

Task 3: Terraform — DynamoDB table + IAM + ConfigMap (terraform repo)

Repo: /Users/admin/dev/cap/terraform

Files:

  • Modify: eks/services/platform-api/dynamodb.tf (new table)

  • Modify: eks/services/platform-api/config.tf (new env var)

  • Modify: eks/services/platform-api/iam.tf (SSM + SFN permissions)

  • Modify: eks/services/platform-api/variables.tf (if needed for SFN ARN)

  • Step 1: Add DynamoDB table

Append to eks/services/platform-api/dynamodb.tf:

resource "aws_dynamodb_table" "org_encryption_configs_v1" {
  name                        = "${local.global_prefix}-org-encryption-configs_v1"
  billing_mode                = "PAY_PER_REQUEST"
  hash_key                    = "orgid"
  deletion_protection_enabled = var.deletion_protection_enabled[terraform.workspace]

  point_in_time_recovery {
    enabled = var.point_in_time_recovery_enabled[terraform.workspace]
  }

  attribute {
    name = "orgid"
    type = "S"
  }

  stream_enabled = false
}
  • Step 2: Add ConfigMap env var for table name

In eks/services/platform-api/config.tf, find kubernetes_config_map.platform-api-configs and add to the data block:

DB__ORG_ENCRYPTION_CONFIGS_TABLE_NAME = aws_dynamodb_table.org_encryption_configs_v1.name
  • Step 3: Add IAM permissions for SSM encryption params

In eks/services/platform-api/iam.tf, add a new statement to the platform-api IRSA policy:

    {
      Effect = "Allow"
      Action = [
        "ssm:PutParameter",
        "ssm:GetParameter",
        "ssm:DeleteParameter"
      ]
      Resource = "arn:aws:ssm:*:${data.aws_caller_identity.current.account_id}:parameter/platform-api/*/encryption/*"
    },
  • Step 4: Add IAM for DynamoDB table

Add the new table ARN to the existing DynamoDB policy statement's Resource list, or add a dedicated statement:

    {
      Effect = "Allow"
      Action = [
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:Query"
      ]
      Resource = aws_dynamodb_table.org_encryption_configs_v1.arn
    },
  • Step 5: Add IAM for Step Functions (read-only, for status endpoint)
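    {
      Effect = "Allow"
      Action = ["states:ListExecutions", "states:DescribeExecution"]
      # ListExecutions is checked against the state machine ARN, DescribeExecution against the execution ARN
      Resource = [
        "arn:aws:states:*:${data.aws_caller_identity.current.account_id}:stateMachine:*kek-rotation*",
        "arn:aws:states:*:${data.aws_caller_identity.current.account_id}:execution:*kek-rotation*:*"
      ]
    },
    {
      # list_state_machines (used by the status endpoint) does not support resource-level scoping
      Effect   = "Allow"
      Action   = ["states:ListStateMachines"]
      Resource = "*"
    },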
  • Step 6: Format and validate
cd /Users/admin/dev/cap/terraform/eks/services/platform-api
terraform fmt
terraform validate
  • Step 7: Commit
git add dynamodb.tf config.tf iam.tf
git commit -m "feat(ENG-1131): add org_encryption_configs DDB table + IAM for SSM/SFN"
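
Before applying, it can help to confirm which parameter names the SSM statement actually covers. The sketch below approximates IAM wildcard matching with fnmatchcase from the standard library (an approximation, since IAM is not implemented with fnmatch, but the two agree for patterns that only use *). The account ID is a placeholder, and ssm_parameter_arn and is_covered are hypothetical helpers.

```python
from fnmatch import fnmatchcase

ACCOUNT_ID = "123456789012"  # placeholder account ID, not a real value
SSM_RESOURCE_PATTERN = f"arn:aws:ssm:*:{ACCOUNT_ID}:parameter/platform-api/*/encryption/*"


def ssm_parameter_arn(name: str, region: str = "us-east-1") -> str:
    # The leading slash of a parameter name becomes the separator after "parameter".
    return f"arn:aws:ssm:{region}:{ACCOUNT_ID}:parameter/{name.lstrip('/')}"


def is_covered(name: str) -> bool:
    """Approximate check of whether the policy pattern matches this parameter."""
    return fnmatchcase(ssm_parameter_arn(name), SSM_RESOURCE_PATTERN)


org_secret_covered = is_covered("/platform-api/org-123/encryption/client_secret")
fallback_covered = is_covered("/internal/prod/capai-crypto/azure-client-secret")
```

The per-org secret path matches the statement, while the workspace-level fallback path read by the DELETE endpoint in Task 7 does not. Unless the existing IRSA policy already grants that path, the fallback lookup would need its own ssm:GetParameter grant.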

Task 4: platform-api — config + schemas + auth dependency

Repo: /Users/admin/dev/cap/platform-api

Files:

  • Modify: src/config.py:130+ (add table name to DynamoDbTableNames)

  • Create: src/schemas/encryption_config.py (Pydantic models)

  • Modify: src/schemas/__init__.py (import new schemas)

  • Create: src/dependencies/infra_admin_auth.py (requires_infra_admin dep)

  • Step 1: Add table name to config

In src/config.py, class DynamoDbTableNames, add:

ORG_ENCRYPTION_CONFIGS_TABLE_NAME: str = "local-org-encryption-configs_v1"
  • Step 2: Create schemas

Create src/schemas/encryption_config.py:

from pydantic import BaseModel, Field
from typing import Optional, List


class EncryptionConfigCreate(BaseModel):
    vault_url: str = Field(..., description="Azure Key Vault URL")
    tenant_id: str = Field(..., description="Azure AD tenant ID")
    client_id: str = Field(..., description="Azure AD client ID")
    client_secret: Optional[str] = Field(None, description="Azure AD client secret (omit to keep existing)")
    key_name: str = Field(..., description="Name of the RSA key in Key Vault")


class EncryptionConfigResponse(BaseModel):
    orgid: str
    vault_url: str
    tenant_id: str
    client_id: str
    key_name: str
    ssm_parameter_arn: Optional[str] = None
    kek_id: Optional[str] = None
    configured_at: Optional[str] = None
    configured_by_user_id: Optional[str] = None
    status: str = "active"


class TestConnectionRequest(BaseModel):
    vault_url: str
    tenant_id: str
    client_id: str
    client_secret: str
    key_name: str


class TestConnectionResponse(BaseModel):
    ok: bool
    error_code: Optional[str] = None
    error_detail: Optional[str] = None


class BucketStats(BaseModel):
    name: str
    rewrapped: int = 0
    failed: int = 0
    skipped: int = 0
    last_run_at: Optional[str] = None


class EncryptionStatusResponse(BaseModel):
    configured: bool
    vault_url: Optional[str] = None
    key_name: Optional[str] = None
    current_kek_version: Optional[str] = None
    last_rotation_at: Optional[str] = None
    cached_at: Optional[str] = None
    buckets: List[BucketStats] = []


class DeleteConfigResponse(BaseModel):
    removed: bool
    fallback_available: bool
  • Step 3: Add import to schemas/__init__.py

Add to src/schemas/__init__.py:

from schemas.encryption_config import (
    EncryptionConfigCreate,
    EncryptionConfigResponse,
    TestConnectionRequest,
    TestConnectionResponse,
    EncryptionStatusResponse,
    DeleteConfigResponse,
)
  • Step 4: Create infra_admin auth dependency

Create src/dependencies/infra_admin_auth.py:

from fastapi import Depends, HTTPException, status
from dependencies.user_auth import get_current_user
from schemas import User
from utils import get_logger

logger = get_logger("infra_admin_auth")


async def requires_infra_admin(user: User = Depends(get_current_user)) -> User:
    """
    Dependency that requires the current user to have is_infra_admin=True in their JWT.
    Returns the user if authorized, raises 403 otherwise.
    """
    is_infra_admin = getattr(user, "is_infra_admin", False)
    if not is_infra_admin:
        logger.warning(f"User {user.id} attempted infra-admin action without permission")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Infra Admin access required",
        )
    return user

Note: The exact mechanism for reading is_infra_admin depends on how the JWT payload maps to the User schema. Check src/dependencies/user_auth.py and src/schemas/users.py — you may need to add is_infra_admin: bool = False to the User model or the JWT payload dict.
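
One possible shape for that mapping is sketched below with a standard-library dataclass standing in for the real User schema. The claim names sub and orgid and the helper user_from_claims are assumptions for illustration; the actual payload keys live in src/dependencies/user_auth.py.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class User:
    # Stand-in for the real User schema; only the fields used here.
    id: str
    orgid: str
    is_infra_admin: bool = False


def user_from_claims(claims: dict) -> User:
    """Build a User from a decoded JWT payload (claim names are assumed)."""
    return User(
        id=claims["sub"],
        orgid=claims["orgid"],
        # Only a literal JSON true grants access. A missing claim (token issued before
        # the flag existed) or a truthy string such as "false" is treated as False.
        is_infra_admin=claims.get("is_infra_admin") is True,
    )


old_token_user = user_from_claims({"sub": "u1", "orgid": "org-1"})
admin_user = user_from_claims({"sub": "u2", "orgid": "org-1", "is_infra_admin": True})
string_claim_user = user_from_claims({"sub": "u3", "orgid": "org-1", "is_infra_admin": "false"})
```

Defaulting to False keeps tokens issued before the rollout valid without granting them the new permission.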

  • Step 5: Commit
git add src/config.py src/schemas/encryption_config.py src/schemas/__init__.py src/dependencies/infra_admin_auth.py
git commit -m "feat(ENG-1131): add encryption config schemas + infra_admin auth dep"

Task 5: platform-api — EncryptionConfigClient (DDB + SSM)

Repo: /Users/admin/dev/cap/platform-api

Files:

  • Create: src/dynamodb_client/encryption_config.py

  • Modify: src/dynamodb_client/table_schemas.py (add local dev schema)

  • Step 1: Create EncryptionConfigClient

Create src/dynamodb_client/encryption_config.py:

import arrow
from config import settings
from utils import get_logger
from dynamodb_client.base import BaseDynamoDbClient

logger = get_logger("encryption_config")


class EncryptionConfigClient(BaseDynamoDbClient):
    def __init__(self):
        super().__init__()
        self.table = self.get_table(settings.db.ORG_ENCRYPTION_CONFIGS_TABLE_NAME)

    def get_config(self, orgid: str) -> dict | None:
        response = self.table.get_item(Key={"orgid": orgid})
        return response.get("Item")

    def upsert_config(
        self,
        orgid: str,
        vault_url: str,
        tenant_id: str,
        client_id: str,
        key_name: str,
        client_secret: str | None,
        user_id: str,
        kek_id: str | None = None,
    ) -> dict:
        now = arrow.utcnow().isoformat()
        existing = self.get_config(orgid)

        item = {
            "orgid": orgid,
            "vault_url": vault_url,
            "tenant_id": tenant_id,
            "client_id": client_id,
            "key_name": key_name,
            "configured_at": now,
            "configured_by_user_id": user_id,
            "status": "active",
        }

        if kek_id:
            item["kek_id"] = kek_id

        # Handle client_secret → SSM
        if client_secret:
            ssm_param_name = f"/platform-api/{orgid}/encryption/client_secret"
            arn = self.ssm_client.create_secure_parameter(
                name=ssm_param_name,
                value=client_secret,
                description=f"Azure Key Vault client secret for org {orgid}",
            )
            item["ssm_parameter_arn"] = arn
        elif existing and "ssm_parameter_arn" in existing:
            item["ssm_parameter_arn"] = existing["ssm_parameter_arn"]

        # put_item replaces the whole row, so any previously cached status
        # (status_cache / cached_at) is dropped without further action.

        try:
            self.table.put_item(Item=item)
        except Exception:
            # Compensation: if DDB fails after a first-time SSM write, clean up SSM.
            # When a config already existed, keep the parameter because the existing
            # row still references it.
            if client_secret and not existing and "ssm_parameter_arn" in item:
                try:
                    self.ssm_client.delete_parameter(item["ssm_parameter_arn"])
                except Exception as cleanup_err:
                    logger.error(f"SSM compensation cleanup failed: {cleanup_err}")
            raise

        return item

    def delete_config(self, orgid: str) -> bool:
        existing = self.get_config(orgid)
        if not existing:
            return False

        # Delete SSM parameter first
        if "ssm_parameter_arn" in existing:
            try:
                self.ssm_client.delete_parameter(existing["ssm_parameter_arn"])
            except Exception as e:
                logger.warning(f"Failed to delete SSM param for org {orgid}: {e}")

        self.table.delete_item(Key={"orgid": orgid})
        return True

    def update_status_cache(self, orgid: str, status_cache: dict) -> None:
        now = arrow.utcnow().isoformat()
        self.table.update_item(
            Key={"orgid": orgid},
            UpdateExpression="SET status_cache = :cache, cached_at = :ts",
            ExpressionAttributeValues={":cache": status_cache, ":ts": now},
        )

    def get_cached_status(self, orgid: str, ttl_seconds: int = 300) -> dict | None:
        item = self.get_config(orgid)
        if not item or not item.get("cached_at") or not item.get("status_cache"):
            return None
        cached_at = arrow.get(item["cached_at"])
        if (arrow.utcnow() - cached_at).total_seconds() > ttl_seconds:
            return None
        return item["status_cache"]
  • Step 2: Add table schema for local dev

In src/dynamodb_client/table_schemas.py, add:

"org-encryption-configs_v1": {
    "key_schema": [{"AttributeName": "orgid", "KeyType": "HASH"}],
    "attribute_definitions": [{"AttributeName": "orgid", "AttributeType": "S"}],
    "global_secondary_indexes": [],
},
  • Step 3: Commit
git add src/dynamodb_client/encryption_config.py src/dynamodb_client/table_schemas.py
git commit -m "feat(ENG-1131): add EncryptionConfigClient with DDB+SSM CRUD"
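
The write ordering in upsert_config (secret first, then the row, with cleanup on failure) can be exercised without AWS. The sketch below uses in-memory stand-ins; InMemorySecrets, FailingTable and save_config are hypothetical names and do not reflect the real SSM wrapper or BaseDynamoDbClient interfaces.

```python
class InMemorySecrets:
    """Stand-in for the SSM wrapper; stores values in a dict."""

    def __init__(self) -> None:
        self.values: dict[str, str] = {}

    def put(self, name: str, value: str) -> None:
        self.values[name] = value

    def delete(self, name: str) -> None:
        self.values.pop(name, None)


class FailingTable:
    """Stand-in for a DynamoDB table whose write always fails."""

    def put_item(self, Item: dict) -> None:
        raise RuntimeError("simulated DynamoDB failure")


def save_config(table, secrets, orgid: str, client_secret: str, is_first_time: bool) -> None:
    name = f"/platform-api/{orgid}/encryption/client_secret"
    secrets.put(name, client_secret)  # 1. write the secret
    try:
        table.put_item(Item={"orgid": orgid, "secret_name": name})  # 2. write the row
    except Exception:
        # 3. Compensate only when no existing row references the parameter.
        if is_first_time:
            secrets.delete(name)
        raise


secrets = InMemorySecrets()
try:
    save_config(FailingTable(), secrets, "org-123", "s3cr3t", is_first_time=True)
    write_failed = False
except RuntimeError:
    write_failed = True
```

After the simulated failure the secret store is empty again, so a failed first-time setup leaves no orphaned parameter behind.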

Task 6: platform-api — Azure Key Vault test-connection helper

Repo: /Users/admin/dev/cap/platform-api

Files:

  • Create: src/utils/azure_keyvault.py

  • Modify: pyproject.toml (add azure-identity, azure-keyvault-keys deps)

  • Step 1: Add Azure SDK dependencies

cd /Users/admin/dev/cap/platform-api
poetry add azure-identity azure-keyvault-keys
  • Step 2: Create Azure KV helper

Create src/utils/azure_keyvault.py:

import os
from azure.identity import ClientSecretCredential
from azure.keyvault.keys import KeyClient
from azure.keyvault.keys.crypto import CryptographyClient, KeyWrapAlgorithm
from utils import get_logger

logger = get_logger("azure_keyvault")


def test_keyvault_connection(
    vault_url: str,
    tenant_id: str,
    client_id: str,
    client_secret: str,
    key_name: str,
) -> dict:
    """
    Test Azure Key Vault connectivity and permissions.
    Returns {"ok": True} or {"ok": False, "error_code": "...", "error_detail": "..."}.
    """
    # Validate vault URL format
    if not vault_url.startswith("https://") or not vault_url.endswith(".vault.azure.net"):
        return {
            "ok": False,
            "error_code": "invalid_vault_url",
            "error_detail": f"Vault URL must be https://<name>.vault.azure.net, got: {vault_url}",
        }

    # Step 1: Build the credential and client (no network call yet; authentication happens on the first request in Step 2)
    try:
        credential = ClientSecretCredential(
            tenant_id=tenant_id,
            client_id=client_id,
            client_secret=client_secret,
        )
        key_client = KeyClient(vault_url=vault_url, credential=credential)
    except Exception as e:
        return {"ok": False, "error_code": "azure_auth_failed", "error_detail": str(e)}

    # Step 2: Get key
    try:
        key = key_client.get_key(key_name)
    except Exception as e:
        error_str = str(e).lower()
        if "not found" in error_str or "404" in error_str:
            return {"ok": False, "error_code": "key_not_found", "error_detail": str(e)}
        if "forbidden" in error_str or "403" in error_str:
            return {"ok": False, "error_code": "permission_denied", "error_detail": str(e)}
        if "could not be resolved" in error_str or "name resolution" in error_str:
            return {"ok": False, "error_code": "network_error", "error_detail": str(e)}
        return {"ok": False, "error_code": "azure_auth_failed", "error_detail": str(e)}

    # Step 3: Test wrapKey permission
    try:
        crypto_client = CryptographyClient(key.id, credential=credential)
        test_dek = os.urandom(32)
        crypto_client.wrap_key(KeyWrapAlgorithm.rsa_oaep, test_dek)
    except Exception as e:
        return {"ok": False, "error_code": "permission_denied", "error_detail": str(e)}

    return {"ok": True, "kek_id": key.id}


def get_current_kek_version(
    vault_url: str,
    tenant_id: str,
    client_id: str,
    client_secret: str,
    key_name: str,
) -> dict | None:
    """Fetch current KEK version from Azure Key Vault. Returns {kek_id, version} or None."""
    try:
        credential = ClientSecretCredential(
            tenant_id=tenant_id,
            client_id=client_id,
            client_secret=client_secret,
        )
        key_client = KeyClient(vault_url=vault_url, credential=credential)
        key = key_client.get_key(key_name)
        return {
            "kek_id": key.id,
            "version": key.properties.version,
            "created_on": key.properties.created_on.isoformat() if key.properties.created_on else None,
            "updated_on": key.properties.updated_on.isoformat() if key.properties.updated_on else None,
        }
    except Exception as e:
        logger.error(f"Failed to fetch KEK version: {e}")
        return None
  • Step 3: Commit
git add src/utils/azure_keyvault.py pyproject.toml poetry.lock
git commit -m "feat(ENG-1131): add Azure Key Vault test-connection helper"
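
The startswith/endswith check in test_keyvault_connection accepts some inputs it probably should not, such as https://evil.example.com/x.vault.azure.net, and rejects a vault URL with a trailing slash. A stricter variant that inspects the parsed hostname is sketched below. It is an illustration rather than the plan's implementation: is_valid_vault_url is a hypothetical helper, and like the original it assumes the public Azure cloud suffix only.

```python
from urllib.parse import urlparse

VAULT_HOST_SUFFIX = ".vault.azure.net"  # public Azure cloud only (assumption)


def is_valid_vault_url(vault_url: str) -> bool:
    """Accept only https://<name>.vault.azure.net with an optional trailing slash."""
    parsed = urlparse(vault_url.strip())
    host = parsed.hostname or ""
    vault_name = host[: -len(VAULT_HOST_SUFFIX)] if host.endswith(VAULT_HOST_SUFFIX) else ""
    return (
        parsed.scheme == "https"
        and parsed.netloc.lower() == host  # no userinfo or port
        and bool(vault_name)
        and "." not in vault_name          # a single label before the suffix
        and parsed.path in ("", "/")
        and not parsed.query
        and not parsed.fragment
    )


plain_ok = is_valid_vault_url("https://test.vault.azure.net")
trailing_slash_ok = is_valid_vault_url("https://test.vault.azure.net/")
suffix_in_path = is_valid_vault_url("https://evil.example.com/x.vault.azure.net")
plain_http = is_valid_vault_url("http://test.vault.azure.net")
```

Checking the hostname rather than the raw string matters here because the server goes on to send credentials to whatever host the URL names.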

Task 7: platform-api — admin_encryption router (all 4 endpoints)

Repo: /Users/admin/dev/cap/platform-api

Files:

  • Create: src/routes/admin_encryption.py

  • Modify: src/main.py (register router)

  • Step 1: Create the router

Create src/routes/admin_encryption.py:

import json
import boto3
from fastapi import APIRouter, Depends, HTTPException, Query
from dependencies.infra_admin_auth import requires_infra_admin
from dynamodb_client.encryption_config import EncryptionConfigClient
from schemas.encryption_config import (
    EncryptionConfigCreate,
    EncryptionConfigResponse,
    TestConnectionRequest,
    TestConnectionResponse,
    EncryptionStatusResponse,
    DeleteConfigResponse,
    BucketStats,
)
from utils.azure_keyvault import test_keyvault_connection, get_current_kek_version
from utils import get_logger
from config import settings

logger = get_logger("admin_encryption")

admin_encryption_router = APIRouter()


@admin_encryption_router.post("/test-connection", response_model=TestConnectionResponse)
async def test_connection(
    body: TestConnectionRequest,
    user=Depends(requires_infra_admin),
):
    """Test Azure Key Vault connectivity without persisting anything."""
    result = test_keyvault_connection(
        vault_url=body.vault_url,
        tenant_id=body.tenant_id,
        client_id=body.client_id,
        client_secret=body.client_secret,
        key_name=body.key_name,
    )
    return TestConnectionResponse(**result)


@admin_encryption_router.post("/configure", response_model=EncryptionConfigResponse)
async def configure_encryption(
    body: EncryptionConfigCreate,
    user=Depends(requires_infra_admin),
):
    """Validate Azure KV creds, then persist config to DDB + SSM."""
    # Need a client_secret for first-time setup
    db_client = EncryptionConfigClient()
    existing = db_client.get_config(user.orgid)

    client_secret = body.client_secret
    if not client_secret and not existing:
        raise HTTPException(status_code=400, detail="client_secret is required for initial configuration")

    # If no new secret provided, retrieve existing from SSM for validation
    if not client_secret and existing and "ssm_parameter_arn" in existing:
        try:
            ssm = boto3.client("ssm", region_name=settings.AWS_REGION)
            param_name = f"/platform-api/{user.orgid}/encryption/client_secret"
            resp = ssm.get_parameter(Name=param_name, WithDecryption=True)
            client_secret = resp["Parameter"]["Value"]
        except Exception as e:
            logger.error(f"Failed to read existing secret from SSM: {e}")
            raise HTTPException(status_code=400, detail="client_secret required (existing secret unreadable)")

    # Validate connection before persisting
    result = test_keyvault_connection(
        vault_url=body.vault_url,
        tenant_id=body.tenant_id,
        client_id=body.client_id,
        client_secret=client_secret,
        key_name=body.key_name,
    )
    if not result.get("ok"):
        raise HTTPException(
            status_code=400,
            detail={"error_code": result.get("error_code"), "error_detail": result.get("error_detail")},
        )

    # Persist
    item = db_client.upsert_config(
        orgid=user.orgid,
        vault_url=body.vault_url,
        tenant_id=body.tenant_id,
        client_id=body.client_id,
        key_name=body.key_name,
        client_secret=body.client_secret,  # original (None = keep existing)
        user_id=str(user.id),
        kek_id=result.get("kek_id"),
    )

    logger.info(f"Encryption configured for org {user.orgid} by user {user.id}")
    return EncryptionConfigResponse(**item)


@admin_encryption_router.get("/status", response_model=EncryptionStatusResponse)
async def get_encryption_status(
    fresh: bool = Query(False, description="Bypass cache"),
    user=Depends(requires_infra_admin),
):
    """Get encryption status with 5-min cached Azure KV + SFN data."""
    db_client = EncryptionConfigClient()
    config = db_client.get_config(user.orgid)

    if not config:
        return EncryptionStatusResponse(configured=False)

    # Check cache
    if not fresh:
        cached = db_client.get_cached_status(user.orgid)
        if cached:
            return EncryptionStatusResponse(**cached)

    # Build fresh status
    status_data = {
        "configured": True,
        "vault_url": config.get("vault_url"),
        "key_name": config.get("key_name"),
        "current_kek_version": None,
        "last_rotation_at": None,
        "buckets": [],
    }

    # Fetch current KEK from Azure (needs client_secret from SSM)
    try:
        ssm = boto3.client("ssm", region_name=settings.AWS_REGION)
        param_name = f"/platform-api/{user.orgid}/encryption/client_secret"
        resp = ssm.get_parameter(Name=param_name, WithDecryption=True)
        client_secret = resp["Parameter"]["Value"]

        kek_info = get_current_kek_version(
            vault_url=config["vault_url"],
            tenant_id=config["tenant_id"],
            client_id=config["client_id"],
            client_secret=client_secret,
            key_name=config["key_name"],
        )
        if kek_info:
            status_data["current_kek_version"] = kek_info.get("version")
            status_data["last_rotation_at"] = kek_info.get("updated_on")
    except Exception as e:
        logger.warning(f"Failed to fetch KEK version for status: {e}")

    # Fetch per-bucket stats from Step Function history
    try:
        sfn = boto3.client("stepfunctions", region_name=settings.AWS_REGION)
        # List state machines matching kek-rotation pattern
        paginator = sfn.get_paginator("list_state_machines")
        sm_arn = None
        for page in paginator.paginate():
            for sm in page["stateMachines"]:
                if "kek-rotation" in sm["name"].lower():
                    sm_arn = sm["stateMachineArn"]
                    break
            if sm_arn:
                break

        if sm_arn:
            executions = sfn.list_executions(
                stateMachineArn=sm_arn,
                statusFilter="SUCCEEDED",
                maxResults=1,
            )
            if executions.get("executions"):
                exec_arn = executions["executions"][0]["executionArn"]
                exec_detail = sfn.describe_execution(executionArn=exec_arn)
                output = json.loads(exec_detail.get("output", "{}"))

                # boto3 returns stopDate as a datetime; convert it once for all buckets
                stop_date = executions["executions"][0].get("stopDate")
                last_run_at = stop_date.isoformat() if stop_date else None

                bucket_results = output.get("bucket_results", [])
                for br in bucket_results:
                    status_data["buckets"].append({
                        "name": br.get("bucket", "unknown"),
                        "rewrapped": br.get("objects_rewrapped", 0),
                        "failed": br.get("failed", 0),
                        "skipped": br.get("skipped", 0),
                        "last_run_at": last_run_at,
                    })
    except Exception as e:
        logger.warning(f"Failed to fetch SFN execution history: {e}")

    # Cache the result
    try:
        db_client.update_status_cache(user.orgid, status_data)
    except Exception as e:
        logger.warning(f"Failed to cache status: {e}")

    return EncryptionStatusResponse(**status_data)


@admin_encryption_router.delete("/configure", response_model=DeleteConfigResponse)
async def delete_encryption_config(
    user=Depends(requires_infra_admin),
):
    """Remove encryption config (DDB row + SSM secret)."""
    db_client = EncryptionConfigClient()
    removed = db_client.delete_config(user.orgid)

    # Check if workspace-level fallback exists
    fallback_available = False
    try:
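        # Unless the existing IRSA policy already covers this path, the read needs its own
        # ssm:GetParameter grant; the Task 3 statement only covers /platform-api/*/encryption/*.
        ssm = boto3.client("ssm", region_name=settings.AWS_REGION)
        ssm.get_parameter(Name="/internal/prod/capai-crypto/azure-client-secret")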
        fallback_available = True
    except Exception:
        pass

    logger.info(f"Encryption config removed for org {user.orgid} by user {user.id}")
    return DeleteConfigResponse(removed=removed, fallback_available=fallback_available)
  • Step 2: Register router in main.py

In src/main.py, add import and router entry:

from routes.admin_encryption import admin_encryption_router

Add to the shared_routers list:

{
    "router": admin_encryption_router,
    "prefix": "/admin/encryption",
    "tags": ["Admin Encryption"],
},
  • Step 3: Commit
git add src/routes/admin_encryption.py src/main.py
git commit -m "feat(ENG-1131): add /api/v1/admin/encryption/* router with all 4 endpoints"
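
The bucket-stats mapping inside get_encryption_status is easier to unit-test when pulled out into a pure function. A minimal sketch follows, assuming the execution output has the bucket_results shape the router reads; parse_bucket_stats is a hypothetical helper and the sample values are made up.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def parse_bucket_stats(output_json: Optional[str], stop_date: Optional[datetime]) -> list[dict]:
    """Map a Step Functions execution output to the per-bucket dicts used by BucketStats."""
    output = json.loads(output_json) if output_json else {}
    last_run_at = stop_date.isoformat() if stop_date else None
    return [
        {
            "name": result.get("bucket", "unknown"),
            "rewrapped": result.get("objects_rewrapped", 0),
            "failed": result.get("failed", 0),
            "skipped": result.get("skipped", 0),
            "last_run_at": last_run_at,
        }
        for result in output.get("bucket_results", [])
    ]


# Made-up sample output for illustration only.
sample_output = json.dumps(
    {"bucket_results": [{"bucket": "docs", "objects_rewrapped": 12, "failed": 1}]}
)
stats = parse_bucket_stats(sample_output, datetime(2026, 4, 17, 12, 0, tzinfo=timezone.utc))
empty_stats = parse_bucket_stats(None, None)
```

With this split, the endpoint only has to pass in the output string and stopDate from describe_execution and list_executions, and the parsing can be tested without mocking boto3.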

Task 8: platform-api — Tests for encryption endpoints

Repo: /Users/admin/dev/cap/platform-api

Files:

  • Create: tests/test_admin_encryption.py

  • Step 1: Write endpoint tests

Create tests/test_admin_encryption.py:

import pytest
from unittest.mock import patch


@pytest.fixture
def mock_azure_success():
    # Patch the name where the router looks it up, not where it is defined
    with patch("routes.admin_encryption.test_keyvault_connection") as mock:
        mock.return_value = {"ok": True, "kek_id": "https://vault.azure.net/keys/test/v1"}
        yield mock


@pytest.fixture
def mock_azure_failure():
    # Patch the name where the router looks it up, not where it is defined
    with patch("routes.admin_encryption.test_keyvault_connection") as mock:
        mock.return_value = {"ok": False, "error_code": "key_not_found", "error_detail": "Key not found"}
        yield mock


class TestAdminEncryptionEndpoints:
    """Tests for /api/v1/admin/encryption/* endpoints."""

    def test_configure_without_infra_admin_returns_403(self, client):
        """Non-infra-admin user gets 403."""
        response = client.post(
            "/api/v1/admin/encryption/configure",
            json={
                "vault_url": "https://test.vault.azure.net",
                "tenant_id": "tid",
                "client_id": "cid",
                "client_secret": "secret",
                "key_name": "key",
            },
        )
        assert response.status_code == 403

    def test_test_connection_success(self, infra_admin_client, mock_azure_success):
        """Test connection with valid creds returns ok=True."""
        response = infra_admin_client.post(
            "/api/v1/admin/encryption/test-connection",
            json={
                "vault_url": "https://test.vault.azure.net",
                "tenant_id": "tid",
                "client_id": "cid",
                "client_secret": "secret",
                "key_name": "key",
            },
        )
        assert response.status_code == 200
        assert response.json()["ok"] is True

    def test_test_connection_bad_key(self, infra_admin_client, mock_azure_failure):
        """Test connection with bad key name returns ok=False with error_code."""
        response = infra_admin_client.post(
            "/api/v1/admin/encryption/test-connection",
            json={
                "vault_url": "https://test.vault.azure.net",
                "tenant_id": "tid",
                "client_id": "cid",
                "client_secret": "secret",
                "key_name": "nonexistent",
            },
        )
        assert response.status_code == 200
        data = response.json()
        assert data["ok"] is False
        assert data["error_code"] == "key_not_found"

    def test_configure_bad_creds_returns_400(self, infra_admin_client, mock_azure_failure):
        """Configure with bad creds returns 400, nothing persisted."""
        response = infra_admin_client.post(
            "/api/v1/admin/encryption/configure",
            json={
                "vault_url": "https://test.vault.azure.net",
                "tenant_id": "tid",
                "client_id": "cid",
                "client_secret": "badsecret",
                "key_name": "nonexistent",
            },
        )
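        assert response.status_code == 400
        # NOTE: only the status code is checked here; "nothing persisted" still
        # needs an assertion against the DDB/SSM test doubles.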

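Note: the infra_admin_client fixture must be defined in conftest.py as a test client authenticated with a JWT that includes is_infra_admin: True. The plain client fixture used in the 403 test should be authenticated without that claim, so the test exercises the infra-admin check rather than the authentication check. The exact pattern depends on the existing test fixtures.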

  • Step 2: Run tests
cd /Users/admin/dev/cap/platform-api
poetry run pytest tests/test_admin_encryption.py -v
  • Step 3: Commit
git add tests/test_admin_encryption.py
git commit -m "test(ENG-1131): add tests for admin encryption endpoints"

Task 9: platform-frontend — Types + API hooks

Repo: /Users/admin/dev/cap/platform-frontend

Files:

  • Modify: existing types file where Member/Me types are defined (find with grep -rn "is_infra_admin\|role.*member.*owner" src/types or similar)

  • Create: src/api/encryption.ts (API hooks for the 4 endpoints)

  • Step 1: Find and update Member/Me types

Search for existing type definitions:

grep -rn "interface.*Member\|type.*Member\|interface.*Me " src/ --include="*.ts" --include="*.tsx" | head -20

Add is_infra_admin: boolean to Member, Membership, and Me interfaces.
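As a rough sketch of the type change: only is_infra_admin comes from this plan. The other fields, the isInfraAdmin helper, and the example values are hypothetical placeholders; the real Member, Membership, and Me shapes live in the existing types file and will differ. Membership would gain the same field.

```typescript
// Hypothetical shapes: every field except is_infra_admin is a placeholder
// for whatever the existing types already declare.
interface Member {
  user_id: string; // placeholder
  role: string; // placeholder
  is_infra_admin: boolean; // new, orthogonal to role
}

interface Me {
  user_id: string; // placeholder
  is_infra_admin: boolean; // new, exposed via the JWT claim
}

// Hypothetical helper: treats a missing flag (for example an older API
// response that predates the column) as false.
function isInfraAdmin(
  subject: { is_infra_admin?: boolean } | null | undefined,
): boolean {
  return subject?.is_infra_admin === true;
}

// Example values, for illustration only.
const exampleMember: Member = { user_id: "u1", role: "member", is_infra_admin: true };
const exampleMe: Me = { user_id: "u2", is_infra_admin: false };
```

Centralizing the check in one helper keeps the sidebar, the route guard, and the admin modal from each re-implementing the "missing means false" rule.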

  • Step 2: Create encryption API types and hooks

Create src/api/encryption.ts (adjust path to match existing API hook patterns):

import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query";
import { api } from "./client"; // adjust import to match existing API client

export interface EncryptionConfig {
  orgid: string;
  vault_url: string;
  tenant_id: string;
  client_id: string;
  key_name: string;
  ssm_parameter_arn?: string;
  kek_id?: string;
  configured_at?: string;
  configured_by_user_id?: string;
  status: string;
}

export interface TestConnectionResult {
  ok: boolean;
  error_code?: string;
  error_detail?: string;
}

export interface BucketStats {
  name: string;
  rewrapped: number;
  failed: number;
  skipped: number;
  last_run_at?: string;
}

export interface EncryptionStatus {
  configured: boolean;
  vault_url?: string;
  key_name?: string;
  current_kek_version?: string;
  last_rotation_at?: string;
  cached_at?: string;
  buckets: BucketStats[];
}

export interface ConfigureEncryptionRequest {
  vault_url: string;
  tenant_id: string;
  client_id: string;
  client_secret?: string;
  key_name: string;
}

/** Fetches encryption status; fresh=true appends ?fresh=true to the request. */
export function useEncryptionStatus(fresh = false) {
  return useQuery({
    queryKey: ["encryption-status", fresh],
    queryFn: () =>
      api.get<EncryptionStatus>(`/admin/encryption/status${fresh ? "?fresh=true" : ""}`),
  });
}

/** Checks the supplied credentials against the vault; client_secret is required here. */
export function useTestConnection() {
  return useMutation({
    mutationFn: (data: ConfigureEncryptionRequest & { client_secret: string }) =>
      api.post<TestConnectionResult>("/admin/encryption/test-connection", data),
  });
}

/** Saves the config (client_secret is optional) and invalidates the cached status. */
export function useConfigureEncryption() {
  const queryClient = useQueryClient();
  return useMutation({
    mutationFn: (data: ConfigureEncryptionRequest) =>
      api.post<EncryptionConfig>("/admin/encryption/configure", data),
    onSuccess: () => {
      // The ["encryption-status"] prefix matches both the fresh and non-fresh keys.
      queryClient.invalidateQueries({ queryKey: ["encryption-status"] });
    },
  });
}

/** Removes the config and invalidates the cached status. */
export function useDeleteEncryptionConfig() {
  const queryClient = useQueryClient();
  return useMutation({
    mutationFn: () =>
      api.delete<{ removed: boolean; fallback_available: boolean }>("/admin/encryption/configure"),
    onSuccess: () => {
      queryClient.invalidateQueries({ queryKey: ["encryption-status"] });
    },
  });
}
  • Step 3: Commit
git add -A
git commit -m "feat(ENG-1131): add encryption config types + API hooks"

Task 10: platform-frontend — Settings page with Configuration + Status tabs

Repo: /Users/admin/dev/cap/platform-frontend

Files:

  • Create: src/pages/org-settings/encryption/ directory with page component
  • Modify: sidebar/nav config to add "Encryption" entry (conditional on is_infra_admin)
  • Modify: router config to add /settings/encryption route

This task requires exploring the existing FE codebase to understand:

  1. How src/pages/org-settings/ is structured
  2. Where sidebar nav items are defined
  3. How routes are registered
  4. What UI component library is used (shadcn? MUI? custom?)
  • Step 1: Explore existing settings structure
find src/pages/org-settings -type f | head -20
grep -rn "Organization Settings\|org-settings\|/settings/" src/ --include="*.tsx" | head -20
  • Step 2: Create encryption settings page

Create the page with two tabs (Configuration + Status). Use the same component library and layout patterns as existing settings pages.

The Configuration tab contains:

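  • Form fields: vault_url, tenant_id, client_id, client_secret (password input; when editing, leaving it blank keeps the stored secret), key_name
  • "Test Connection" button → useTestConnection() → inline result (requires a client_secret, per the hook's input type)
  • "Save" button → useConfigureEncryption() → navigate to Status tab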
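One way the form could turn its state into a request body, as a minimal sketch. ConfigureEncryptionRequest mirrors the interface from Task 9. EncryptionFormValues, buildConfigureRequest, and canTestConnection are hypothetical names. That the backend keeps the stored secret when client_secret is omitted is an assumption taken from the PR test plan ("Edit config (blank secret) → existing secret preserved").

```typescript
// Mirrors ConfigureEncryptionRequest from src/api/encryption.ts.
interface ConfigureEncryptionRequest {
  vault_url: string;
  tenant_id: string;
  client_id: string;
  client_secret?: string;
  key_name: string;
}

// Hypothetical form state: every input is held as a plain string.
interface EncryptionFormValues {
  vault_url: string;
  tenant_id: string;
  client_id: string;
  client_secret: string;
  key_name: string;
}

// Hypothetical helper: trims the non-secret inputs and omits client_secret
// when it is blank, so an edit without a new secret never sends "".
function buildConfigureRequest(form: EncryptionFormValues): ConfigureEncryptionRequest {
  const request: ConfigureEncryptionRequest = {
    vault_url: form.vault_url.trim(),
    tenant_id: form.tenant_id.trim(),
    client_id: form.client_id.trim(),
    key_name: form.key_name.trim(),
  };
  if (form.client_secret !== "") {
    request.client_secret = form.client_secret; // sent exactly as typed
  }
  return request;
}

// Hypothetical helper: the test-connection hook requires a secret, so the
// button can be disabled until one is entered alongside the vault and key.
function canTestConnection(form: EncryptionFormValues): boolean {
  return (
    form.client_secret !== "" &&
    form.vault_url.trim() !== "" &&
    form.key_name.trim() !== ""
  );
}
```

Omitting the key, instead of sending an empty string, leaves the "keep existing secret" decision to the backend and avoids accidentally overwriting a stored secret with a blank value.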

The Status tab contains:

  • Header card with vault_url, key_name, current KEK version, last rotation

  • Per-bucket stats table

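  • Refresh button → re-query status with fresh=true (drive the fresh argument of useEncryptionStatus from component state, since hooks cannot be called inside a click handler)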

  • Edit Configuration link → switch to Configuration tab

  • Remove Configuration button → confirm modal → useDeleteEncryptionConfig()
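For the per-bucket stats table, a small sketch of two pure helpers the Status tab might use. BucketStats mirrors the interface from Task 9. BucketTotals, totalBucketStats, and sortBucketsForDisplay are hypothetical, and the "failures first" ordering is only a suggestion, not something the spec prescribes.

```typescript
// Mirrors BucketStats from src/api/encryption.ts.
interface BucketStats {
  name: string;
  rewrapped: number;
  failed: number;
  skipped: number;
  last_run_at?: string;
}

// Hypothetical summary row for the table footer.
interface BucketTotals {
  rewrapped: number;
  failed: number;
  skipped: number;
}

// Sums the three counters across all buckets.
function totalBucketStats(buckets: BucketStats[]): BucketTotals {
  return buckets.reduce<BucketTotals>(
    (acc, b) => ({
      rewrapped: acc.rewrapped + b.rewrapped,
      failed: acc.failed + b.failed,
      skipped: acc.skipped + b.skipped,
    }),
    { rewrapped: 0, failed: 0, skipped: 0 },
  );
}

// Returns a sorted copy: buckets with more failures first, then by name.
function sortBucketsForDisplay(buckets: BucketStats[]): BucketStats[] {
  return [...buckets].sort(
    (a, b) => b.failed - a.failed || a.name.localeCompare(b.name),
  );
}
```

Keeping these as pure functions outside the component makes them easy to unit test without rendering the page.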

  • Step 3: Add sidebar entry (conditional)

Find where the Organization Settings sidebar items are defined. Add "Encryption" entry, conditionally rendered when useMe().is_infra_admin === true.
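The conditional entry could be expressed as data plus a filter, sketched below. NavItem, requiresInfraAdmin, SETTINGS_NAV, and visibleNavItems are hypothetical names, and the "General" entry is a placeholder; the real sidebar config may be structured quite differently.

```typescript
// Hypothetical nav item shape; adapt to the existing sidebar config.
interface NavItem {
  label: string;
  path: string;
  requiresInfraAdmin?: boolean;
}

// Placeholder list: only the Encryption entry comes from this plan.
const SETTINGS_NAV: NavItem[] = [
  { label: "General", path: "/settings/general" },
  { label: "Encryption", path: "/settings/encryption", requiresInfraAdmin: true },
];

// Keeps unrestricted items, plus restricted ones when the flag is exactly true.
// An undefined `me` (still loading) hides restricted items.
function visibleNavItems(
  items: NavItem[],
  me: { is_infra_admin?: boolean } | undefined,
): NavItem[] {
  const infraAdmin = me?.is_infra_admin === true;
  return items.filter((item) => !item.requiresInfraAdmin || infraAdmin);
}
```

Hiding the entry is a convenience only; the backend 403 and the route guard remain the actual enforcement points.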

  • Step 4: Add route

Register /settings/encryption in the router config, pointing to the new page component.

  • Step 5: Add route guard

If user lands on /settings/encryption without is_infra_admin, redirect to /settings/general.
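The guard decision can be kept as a pure function and wired into whatever router the frontend uses. In this sketch, GuardResult and guardEncryptionRoute are hypothetical, as is the assumption that the current-user query exposes a loading flag.

```typescript
// Hypothetical result type: the caller renders a spinner, the page, or a redirect.
type GuardResult =
  | { kind: "wait" }
  | { kind: "allow" }
  | { kind: "redirect"; to: string };

// Decides what to do when a user lands on /settings/encryption.
function guardEncryptionRoute(
  me: { is_infra_admin?: boolean } | undefined,
  isLoading: boolean,
): GuardResult {
  // Do not redirect while the current user is still being fetched; otherwise
  // an infra admin who deep-links to the page would be bounced away.
  if (isLoading) {
    return { kind: "wait" };
  }
  if (me?.is_infra_admin === true) {
    return { kind: "allow" };
  }
  return { kind: "redirect", to: "/settings/general" };
}
```

The explicit "wait" state is the main design point: a guard that only checks the flag would treat "not loaded yet" the same as "not allowed".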

  • Step 6: Test manually
npm run dev  # or pnpm dev / yarn dev

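As an infra-admin user, navigate to /settings/encryption: the Configuration form should render. As a user without is_infra_admin, the same URL should redirect to /settings/general.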

  • Step 7: Commit
git add -A
git commit -m "feat(ENG-1131): add /settings/encryption page with Configuration + Status tabs"

Task 11: platform-frontend — Admin edit-member modal checkbox

Repo: /Users/admin/dev/cap/platform-frontend

Files:

  • Modify: src/pages/admin/org-details/org-details-page.tsx (or wherever the edit-member modal lives)

  • Modify: src/pages/admin/settings/members-admin-view.tsx (alternative location)

  • Step 1: Find the edit-member modal

grep -rn "edit.*member\|member.*modal\|role.*select\|Role.*Owner\|Role.*Member" src/pages/admin/ --include="*.tsx" | head -20
  • Step 2: Add "Infra Admin" checkbox

In the edit-member modal, next to the Role dropdown, add a checkbox labeled "Infra Admin". Wire it to the is_infra_admin field in the PATCH request body.
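A sketch of how the modal might build its PATCH body. MemberEditForm, MemberPatchBody, and buildMemberPatch are hypothetical, and sending only changed fields assumes the existing PATCH endpoint accepts partial bodies; if it expects the full object, send both fields instead.

```typescript
// Hypothetical form state for the edit-member modal.
interface MemberEditForm {
  role: string;
  is_infra_admin: boolean;
}

// Hypothetical partial body: a field is present only when it changed.
interface MemberPatchBody {
  role?: string;
  is_infra_admin?: boolean;
}

// Compares the edited form with the original member and returns the delta.
function buildMemberPatch(
  original: MemberEditForm,
  edited: MemberEditForm,
): MemberPatchBody {
  const patch: MemberPatchBody = {};
  if (edited.role !== original.role) {
    patch.role = edited.role;
  }
  if (edited.is_infra_admin !== original.is_infra_admin) {
    patch.is_infra_admin = edited.is_infra_admin;
  }
  return patch;
}
```

Because is_infra_admin is orthogonal to role, toggling the checkbox alone yields a body containing only that flag.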

  • Step 3: Test manually

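Navigate to /admin/orgs/<orgid> → Members tab → click edit on a member → verify the "Infra Admin" checkbox appears. Toggle it, save, and confirm that the PATCH request body carries is_infra_admin and that the value persists after a reload.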

  • Step 4: Commit
git add -A
git commit -m "feat(ENG-1131): add Infra Admin checkbox to admin edit-member modal"

Task 12: Final integration test + cleanup

All repos

  • Step 1: Run all test suites
# clj-pg-wrapper
cd /Users/admin/dev/cap/clj-pg-wrapper && poetry run pytest -x -q

# platform-api
cd /Users/admin/dev/cap/platform-api && poetry run pytest -x -q

# platform-frontend
cd /Users/admin/dev/cap/platform-frontend && npm test  # or equivalent
  • Step 2: Terraform format check
cd /Users/admin/dev/cap/terraform
terraform fmt -recursive -check
  • Step 3: Verify all commits on feature branch
# Each repo should be on feature/ENG-1131-encryption-key-mgmt-ui
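for repo in clj-pg-wrapper platform-api platform-frontend terraform; do
  echo "=== $repo ==="
  # Print the checked-out branch, then the commits not yet on main
  git -C "/Users/admin/dev/cap/$repo" branch --show-current
  git -C "/Users/admin/dev/cap/$repo" log --oneline main..HEAD
done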
  • Step 4: Create PRs

One PR per repo, all referencing ENG-1131:

# In each repo:
gh pr create --title "ENG-1131: Encryption Key Management UI" --body "$(cat <<'EOF'
## Summary
- Adds self-service encryption key management for EY
- New `is_infra_admin` flag on org membership (orthogonal to role)
- DDB + SSM hybrid config storage (mirrors client_api_keys pattern)
- Four `/api/v1/admin/encryption/*` endpoints
- New `/settings/encryption` page (Configuration + Status tabs)
- Admin edit-member modal gains Infra Admin checkbox

## Test plan
- [ ] Grant infra-admin via admin panel → user sees Encryption in sidebar
- [ ] Configure with valid Azure KV creds → dashboard shows KEK version
- [ ] Test connection with bad creds → shows specific error
- [ ] Edit config (blank secret) → existing secret preserved
- [ ] Remove config → falls back to workspace-level SSM

Spec: `docs/superpowers/specs/2026-04-17-eng-1131-encryption-key-mgmt-ui-design.md`

🤖 Generated with [Claude Code](https://claude.com/claude-code)
EOF
)"