From 2f34e9efb8ca6e494c985b5ea194ae3a1f5a0dfd Mon Sep 17 00:00:00 2001 From: Tomasz Swierszcz Date: Thu, 2 Jul 2026 13:19:42 +0200 Subject: [PATCH] feat(ngts): add Cloud/NGTS certificate revocation via GraphQL Implement revoke_cert on CloudConnection using a new _graphql helper that POSTs the GraphQL revokeCertificate mutation; NGTSConnection inherits it unchanged (Bearer auth + .../ngts/graphql via its _post override). Thumbprint-keyed (Go parity): thumbprint required and uppercased, disable ignored, ca_compromise rejected. Add CertificateRevokeError and an optional RevocationRequest.ca_account_name (resolved to a CA-account id via ListCAAccounts). Add offline unit tests plus live NGTS/VaaS revoke tests (skip when the zone CA cannot revoke). Document revoke in README-NGTS.md and bump version to 0.21.0. --- README-NGTS.md | 29 +++++++ docs/version_history.md | 5 ++ setup.py | 2 +- tests/test_local_methods.py | 164 ++++++++++++++++++++++++++++++++++- tests/test_ngts.py | 24 +++++- tests/test_vaas.py | 14 ++- vcert/__init__.py | 2 +- vcert/common.py | 9 +- vcert/connection_cloud.py | 168 ++++++++++++++++++++++++++++++++++-- vcert/errors.py | 5 ++ 10 files changed, 409 insertions(+), 13 deletions(-) diff --git a/README-NGTS.md b/README-NGTS.md index 8b55e17..c1c9768 100644 --- a/README-NGTS.md +++ b/README-NGTS.md @@ -31,6 +31,7 @@ enrollment. This guide covers using it against **Palo Alto Networks Next-Gen Tru - [Connect with a pre-issued access token](#connect-with-a-pre-issued-access-token) - [Request and retrieve a certificate](#request-and-retrieve-a-certificate) - [Renew a certificate](#renew-a-certificate) + - [Revoke a certificate](#revoke-a-certificate) ## Prerequisites @@ -196,6 +197,34 @@ conn.renew_cert(request) cert = conn.retrieve_cert(request) ``` +### Revoke a certificate + +NGTS (and Cloud) revocation is keyed by the certificate's **SHA-1 thumbprint** and goes through the +GraphQL CA-operations service. This differs from TPP revoke in three ways: an enrollment id alone is +**not** accepted (a thumbprint is required), the `disable` flag is **ignored**, and `ca_compromise` +is **not** a valid reason (it has no NGTS mapping). Valid reasons are `NoReason` (the default, +"unspecified"), `key_compromise`, `affiliation_changed`, `superseded`, and `cessation_of_operation`. + +```python +import binascii +from cryptography.hazmat.primitives import hashes + +from vcert import RevocationRequest + +# `cert` is the issued certificate as a cryptography x509 object (e.g. parsed from retrieve_cert). +thumbprint = binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode() + +request = RevocationRequest(thumbprint=thumbprint, + reason=RevocationRequest.RevocationReasons.key_compromise) +result = conn.revoke_cert(request) +print(result["status"]) # e.g. SUBMITTED / PENDING_APPROVAL +``` + +> `revoke_cert` revokes the certificate at the CA (CRL/OCSP). To instead withdraw the certificate +> record from the inventory (lifecycle housekeeping, not CA revocation), use `retire_cert`. Set +> `ca_account_name=` on the `RevocationRequest` only when the certificate was issued by an external +> CA that requires it; for certificates issued by CM SaaS itself, leave it unset (the id stays null). + --- For backend-neutral SDK usage (request/retrieve/renew/revoke data objects, output formats), diff --git a/docs/version_history.md b/docs/version_history.md index 38b8c32..ce190ee 100644 --- a/docs/version_history.md +++ b/docs/version_history.md @@ -2,6 +2,11 @@ ## Version History +#### 0.21.0 +* Added certificate revocation (`revoke_cert`) for CyberArk Certificate Manager, SaaS (Cloud/VaaS) and NGTS (Strata Cloud Manager), via the GraphQL CA-operations `revokeCertificate` mutation (keyed by SHA-1 thumbprint) +* Cloud `revoke_cert` no longer raises `NotImplementedError`; NGTS inherits the same implementation +* Added the public `CertificateRevokeError` exception and an optional `ca_account_name` field on `RevocationRequest` + #### 0.20.0 * Added policy management (`get_policy`/`set_policy`) for NGTS (Strata Cloud Manager), operating on the CIT-only zone diff --git a/setup.py b/setup.py index cb52eb7..66b37c7 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ long_description = f.read() setup(name='vcert', - version='0.20.0', + version='0.21.0', url="https://github.com/Venafi/vcert-python", packages=['vcert', 'vcert.parser', 'vcert.policy'], install_requires=['requests>=2.32.4', 'python-dateutil>=2.9.0.post0', 'six>=1.17.0', diff --git a/tests/test_local_methods.py b/tests/test_local_methods.py index 80ff14a..2072a21 100644 --- a/tests/test_local_methods.py +++ b/tests/test_local_methods.py @@ -24,11 +24,13 @@ from assets import POLICY_CLOUD1, POLICY_TPP1, EXAMPLE_CSR, EXAMPLE_CHAIN from vcert import (CloudConnection, KeyType, TPPConnection, CertificateRequest, ZoneConfig, CertField, FakeConnection, - NGTSConnection, logger) + NGTSConnection, RevocationRequest, logger) from vcert.connection_cloud import URLS from vcert.connection_ngts import (_parse_ngts_zone, DEFAULT_API_URL, DEFAULT_TOKEN_URL, TRUSTED_TOKEN_HOST_SUFFIX) -from vcert.errors import ClientBadData, ServerUnexptedBehavior, VenafiError +from vcert.errors import (ClientBadData, ServerUnexptedBehavior, VenafiError, VenafiConnectionError, + CertificateRevokeError) +from vcert.http_status import HTTPStatus from vcert.pem import parse_pem, Certificate from vcert.policy.pm_cloud import CertificateAuthorityDetails, CertificateAuthorityInfo from vcert.policy.policy_spec import DEFAULT_CA, Policy, PolicySpecification @@ -627,3 +629,161 @@ def test_ngts_get_policy_missing_cit_raises(self): with mock.patch.object(conn, '_get_cit', return_value=None): with self.assertRaises(VenafiError): conn.get_policy("does-not-exist") + + # -- Cloud / NGTS revoke (offline) -------------------------------------------------------- + # + # Cloud and NGTS revoke via the GraphQL CA-operations `revokeCertificate` mutation (no REST + # endpoint). `revoke_cert` + `_graphql` live on CloudConnection; NGTS inherits them unchanged + # and only differs in transport (Bearer auth + strata host), which the boundary tests pin. + # The logic tests patch `_graphql`/`_post`; the boundary tests patch `requests.post`. + + @staticmethod + def _cloud_conn(**kwargs): + defaults = dict(token="apikey", url="https://api.venafi.cloud/") + defaults.update(kwargs) + return CloudConnection(**defaults) + + @staticmethod + def _revoke_data(status="SUBMITTED", error=None, approval=None, fingerprint="AABB", cid="cid", serial="01"): + # The inner `data` object _graphql returns (after unwrapping the GraphQL envelope). + return { + "revokeCertificate": { + "id": cid, + "fingerprint": fingerprint, + "serialNumber": serial, + "revocation": {"status": status, "error": error, "approvalDetails": approval}, + } + } + + def test_revoke_requires_thumbprint(self): + conn = self._cloud_conn() + with self.assertRaises(ClientBadData): + conn.revoke_cert(RevocationRequest()) + + def test_revoke_rejects_ca_compromise_reason(self): + # ca_compromise has no GraphQL enum (TPP-only in Go) -> rejected, not silently mismapped. + conn = self._cloud_conn() + req = RevocationRequest(thumbprint="AABB", reason=RevocationRequest.RevocationReasons.ca_compromise) + with self.assertRaises(ClientBadData): + conn.revoke_cert(req) + + def test_revoke_reason_mapping(self): + conn = self._cloud_conn() + cases = { + RevocationRequest.RevocationReasons.NoReason: "UNSPECIFIED", + RevocationRequest.RevocationReasons.key_compromise: "KEY_COMPROMISE", + RevocationRequest.RevocationReasons.affiliation_changed: "AFFILIATION_CHANGED", + RevocationRequest.RevocationReasons.superseded: "SUPERSEDED", + RevocationRequest.RevocationReasons.cessation_of_operation: "CESSATION_OF_OPERATION", + } + for reason, enum_name in cases.items(): + with mock.patch.object(conn, '_graphql', return_value=self._revoke_data()) as gql: + conn.revoke_cert(RevocationRequest(thumbprint="aabb", reason=reason, comments="")) + variables = gql.call_args[0][1] + self.assertEqual(variables["revocationReason"], enum_name) + # all four variable keys are always present; nullable ones are None when unset. + self.assertEqual(set(variables), {"fingerprint", "certificateAuthorityAccountId", + "revocationReason", "revocationComment"}) + self.assertEqual(variables["fingerprint"], "AABB") # normalized to uppercase + self.assertIsNone(variables["certificateAuthorityAccountId"]) + self.assertIsNone(variables["revocationComment"]) + + def test_revoke_success_returns_result(self): + conn = self._cloud_conn() + with mock.patch.object(conn, '_graphql', return_value=self._revoke_data(status="SUBMITTED")): + result = conn.revoke_cert(RevocationRequest(thumbprint="aabb")) + self.assertEqual(result["id"], "cid") + self.assertEqual(result["thumbprint"], "AABB") + self.assertEqual(result["status"], "SUBMITTED") + self.assertEqual(result["serial"], "01") + self.assertIsNone(result["rejection_reason"]) + + def test_revoke_surfaces_revocation_error(self): + conn = self._cloud_conn() + data = self._revoke_data(status=None, error={"message": "boom", "code": 7, "arguments": ["x"]}) + with mock.patch.object(conn, '_graphql', return_value=data): + with self.assertRaises(CertificateRevokeError): + conn.revoke_cert(RevocationRequest(thumbprint="aabb")) + + def test_revoke_failed_status(self): + conn = self._cloud_conn() + with mock.patch.object(conn, '_graphql', return_value=self._revoke_data(status="FAILED")): + with self.assertRaises(CertificateRevokeError): + conn.revoke_cert(RevocationRequest(thumbprint="aabb")) + + def test_revoke_ca_account_not_found(self): + # A supplied CA-account name not returned by ListCAAccounts is a caller/input error: + # plain VenafiError, NOT CertificateRevokeError (reserved for backend/transport failures). + conn = self._cloud_conn() + req = RevocationRequest(thumbprint="aabb", ca_account_name="missing") + with mock.patch.object(conn, '_graphql', + return_value={"certificateAuthorityAccounts": {"nodes": []}}): + with self.assertRaises(VenafiError) as ctx: + conn.revoke_cert(req) + self.assertNotIsInstance(ctx.exception, CertificateRevokeError) + + def test_list_ca_accounts_failure(self): + # A transport failure of the ListCAAccounts call surfaces as CertificateRevokeError + # (distinct from the name-not-found VenafiError above). + conn = self._cloud_conn() + req = RevocationRequest(thumbprint="aabb", ca_account_name="acme") + with mock.patch.object(conn, '_post', side_effect=VenafiConnectionError("boom")): + with self.assertRaises(CertificateRevokeError): + conn.revoke_cert(req) + + def test_graphql_raises_on_top_level_errors(self): + conn = self._cloud_conn() + with mock.patch.object(conn, '_post', return_value=(200, {"errors": [{"message": "bad"}]})): + with self.assertRaises(CertificateRevokeError): + conn._graphql("query {}", {}, "Op") + + def test_graphql_raises_on_transport_error(self): + # Non-200/non-allowed statuses become VenafiConnectionError INSIDE process_server_response + # (before _graphql sees them); _graphql rewraps to CertificateRevokeError. Mock _post to + # RAISE (not return a tuple) to prove that path. + conn = self._cloud_conn() + with mock.patch.object(conn, '_post', side_effect=VenafiConnectionError("500")): + with self.assertRaises(CertificateRevokeError): + conn._graphql("query {}", {}, "Op") + + def test_graphql_raises_on_conflict(self): + # 409 is the one non-2xx status process_server_response RETURNS (not raises), so _graphql + # must handle it explicitly. + conn = self._cloud_conn() + with mock.patch.object(conn, '_post', return_value=(HTTPStatus.CONFLICT, {"x": 1})): + with self.assertRaises(CertificateRevokeError): + conn._graphql("query {}", {}, "Op") + + def test_cloud_revoke_post_boundary(self): + # Exercises the real _graphql -> _post -> process_server_response path for Cloud: asserts + # the graphql URL, the tppl-api-key header, and the {operationName, query, variables} body. + conn = self._cloud_conn() + fake_resp = mock.MagicMock() + fake_resp.status_code = 200 + fake_resp.headers = {'content-type': 'application/json'} + fake_resp.json.return_value = {"data": self._revoke_data()} + with mock.patch('vcert.connection_cloud.requests.post', return_value=fake_resp) as post: + conn.revoke_cert(RevocationRequest(thumbprint="aabb")) + args, kwargs = post.call_args + self.assertEqual(args[0], "https://api.venafi.cloud/graphql") + self.assertEqual(kwargs['headers']['tppl-api-key'], "apikey") + body = kwargs['json'] + self.assertEqual(body['operationName'], "RevokeCertificateRequest") + self.assertIn("revokeCertificate", body['query']) + self.assertEqual(set(body['variables']), {"fingerprint", "certificateAuthorityAccountId", + "revocationReason", "revocationComment"}) + self.assertEqual(body['variables']['fingerprint'], "AABB") + + def test_ngts_revoke_post_boundary(self): + # The same path on NGTS proves inheritance + Bearer/strata transport with no override. + conn = self._ngts_conn(access_token='t', token_url=None) + fake_resp = mock.MagicMock() + fake_resp.status_code = 200 + fake_resp.headers = {'content-type': 'application/json'} + fake_resp.json.return_value = {"data": self._revoke_data()} + with mock.patch('vcert.connection_ngts.requests.post', return_value=fake_resp) as post: + conn.revoke_cert(RevocationRequest(thumbprint="aabb")) + args, kwargs = post.call_args + self.assertEqual(args[0], "https://api.strata.paloaltonetworks.com/ngts/graphql") + self.assertEqual(kwargs['headers']['Authorization'], "Bearer t") + self.assertEqual(kwargs['json']['operationName'], "RevokeCertificateRequest") diff --git a/tests/test_ngts.py b/tests/test_ngts.py index 2f03b07..1b56bc0 100644 --- a/tests/test_ngts.py +++ b/tests/test_ngts.py @@ -27,8 +27,8 @@ from test_env import (NGTS_URL, NGTS_TOKEN_URL, NGTS_CLIENT_ID, NGTS_CLIENT_SECRET, NGTS_TSG_ID, NGTS_SCOPE, NGTS_ZONE) from test_utils import random_word, enroll, renew, renew_by_thumbprint -from vcert import NGTSConnection, KeyType, logger -from vcert.common import RetireRequest +from vcert import NGTSConnection, KeyType, logger, CertificateRevokeError +from vcert.common import RetireRequest, RevocationRequest from vcert.policy.policy_spec import Policy, PolicySpecification log = logger.get_child("test-ngts") @@ -74,6 +74,26 @@ def test_ngts_retire_by_thumbprint(self): ret_request = RetireRequest(thumbprint=fingerprint) self.assertTrue(self.ngts_conn.retire_cert(ret_request)) + def test_ngts_revoke_by_thumbprint(self): + # Cryptographic revocation via the GraphQL CA-operations mutation (thumbprint-keyed). + # revoke_cert uppercases the thumbprint internally, so a lowercase hexlify is fine here. + cn = f"{random_word(10)}.venafi.example.com" + cert_id, pkey, cert, _, _ = enroll(self.ngts_conn, self.ngts_zone, cn) + fingerprint = binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode() + rev_request = RevocationRequest(thumbprint=fingerprint) + try: + result = self.ngts_conn.revoke_cert(rev_request) + except CertificateRevokeError as e: + # The whole request path (GraphQL revokeCertificate via the CA-operations service) is + # exercised end-to-end here. A tenant whose zone uses only the built-in CA gets back + # "revocation is not supported for CA type BUILTIN_CA" from the backend - skip the + # success assertion in that case (it needs a zone backed by a revocation-capable CA). + if "not supported for CA type" in str(e) or "BUILTIN" in str(e).upper(): + self.skipTest(f"zone CA does not support revocation: {e}") + raise + self.assertIsNotNone(result) + self.assertIn("status", result) + def test_ngts_read_zone_config(self): zone = self.ngts_conn.read_zone_conf(self.ngts_zone) self.assertIsNotNone(zone.policy) diff --git a/tests/test_vaas.py b/tests/test_vaas.py index ff3f740..f887090 100644 --- a/tests/test_vaas.py +++ b/tests/test_vaas.py @@ -31,7 +31,7 @@ get_vaas_zone from vcert import CloudConnection, KeyType, CertificateRequest, CustomField, logger, CSR_ORIGIN_SERVICE from vcert.policy import KeyPair, DefaultKeyPair, PolicySpecification -from vcert.common import RetireRequest +from vcert.common import RetireRequest, RevocationRequest log = logger.get_child("test-vaas") @@ -213,3 +213,15 @@ def test_cloud_retire_by_thumbprint(self): assert ret_data is True except Exception as e: log.error(msg=f"Error retiring certificate by thumbprint: {str(e)}") + + def test_cloud_revoke_by_thumbprint(self): + # Cryptographic revocation via the GraphQL CA-operations mutation (thumbprint-keyed). + # revoke_cert uppercases the thumbprint internally, so a lowercase hexlify is fine here. + req, cert = simple_enroll(self.cloud_conn, self.cloud_zone) + cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend()) + fingerprint = binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode() + time.sleep(1) + rev_request = RevocationRequest(thumbprint=fingerprint) + result = self.cloud_conn.revoke_cert(rev_request) + self.assertIsNotNone(result) + self.assertIn("status", result) diff --git a/vcert/__init__.py b/vcert/__init__.py index 02e096f..8d28d98 100644 --- a/vcert/__init__.py +++ b/vcert/__init__.py @@ -21,7 +21,7 @@ from .connection_tpp import TPPConnection from .connection_tpp_token import TPPTokenConnection from .connection_fake import FakeConnection -from .errors import VenafiError +from .errors import VenafiError, CertificateRevokeError from .logger import setup_logger, get_logger, get_child from .pem import Certificate from .ssh_utils import SSHCertRequest, SSHKeyPair, write_ssh_files, SSHCATemplateRequest, SSHConfig diff --git a/vcert/common.py b/vcert/common.py index 528b7f8..1e5a76a 100644 --- a/vcert/common.py +++ b/vcert/common.py @@ -580,16 +580,23 @@ class RevocationReasons: cessation_of_operation = 5 # OriginalUseNoLongerValid def __init__(self, req_id=None, thumbprint=None, reason=RevocationReasons.NoReason, - comments="Revoked via api with python bindings", disable=True): + comments="Revoked via api with python bindings", disable=True, ca_account_name=None): """ :param req_id: :param thumbprint: + :param reason: + :param comments: + :param disable: TPP-only; consumed by TPP revoke. Cloud/NGTS revoke ignores it (Go parity). + :param ca_account_name: Cloud/NGTS only. Optional CA-account name to target for revocation; + resolved to a CA-account id via GraphQL. Leave unset (the common case) for certificates + issued by CM SaaS itself, where the id stays null. """ self.id = req_id self.thumbprint = thumbprint self.reason = reason self.comments = comments self.disable = disable + self.ca_account_name = ca_account_name class RetireRequest: diff --git a/vcert/connection_cloud.py b/vcert/connection_cloud.py index 3fe7409..fdc2169 100644 --- a/vcert/connection_cloud.py +++ b/vcert/connection_cloud.py @@ -21,11 +21,11 @@ import urllib.parse as urlparse from nacl.public import SealedBox -from .common import (ZoneConfig, CertificateRequest, CommonConnection, Policy, get_ip_address, log_errors, MIME_JSON, - MIME_TEXT, MIME_ANY, CertField, KeyType, DEFAULT_TIMEOUT, +from .common import (ZoneConfig, CertificateRequest, CommonConnection, Policy, RevocationRequest, get_ip_address, + log_errors, MIME_JSON, MIME_TEXT, MIME_ANY, CertField, KeyType, DEFAULT_TIMEOUT, CSR_ORIGIN_SERVICE, CHAIN_OPTION_FIRST, CHAIN_OPTION_LAST) from .errors import (VenafiConnectionError, ServerUnexptedBehavior, ClientBadData, CertificateRequestError, - CertificateRenewError, VenafiError, RetrieveCertificateTimeoutError) + CertificateRenewError, CertificateRevokeError, VenafiError, RetrieveCertificateTimeoutError) from .http_status import HTTPStatus from .logger import get_child from .pem import parse_pem, Certificate @@ -60,6 +60,47 @@ OWNER_TYPE_USER = "USER" OWNER_TYPE_TEAM = "TEAM" +# -- GraphQL certificate revocation (Cloud / NGTS) ------------------------------------------- +# Cloud and NGTS revoke a certificate through a GraphQL CA-operations mutation; there is no REST +# revoke endpoint. The Go reference does the same (vcert/pkg/webclient/caoperations). These +# constants replicate that wire contract so it can be POSTed as raw JSON via the existing _post +# helper -- no GraphQL client dependency is introduced. +URL_GRAPHQL = "graphql" # base_url is trailing-slash normalized, so base + "graphql" + +# Python RevocationRequest.RevocationReasons (int) -> GraphQL RevocationReason enum string. +# Mirrors Go's RevocationReasonsMap (pkg/venafi/{cloud,ngts}/cloud.go). ca_compromise (2) is +# intentionally absent: it has no GraphQL enum (ca-compromise is TPP-only in Go), so it is +# rejected rather than silently mismapped. Keys are the bare ints 0,1,3,4,5 (RevocationReasons +# attributes are plain ints), so an int reason hash-matches the map directly. +REVOCATION_REASON_MAP = { + RevocationRequest.RevocationReasons.NoReason: "UNSPECIFIED", + RevocationRequest.RevocationReasons.key_compromise: "KEY_COMPROMISE", + RevocationRequest.RevocationReasons.affiliation_changed: "AFFILIATION_CHANGED", + RevocationRequest.RevocationReasons.superseded: "SUPERSEDED", + RevocationRequest.RevocationReasons.cessation_of_operation: "CESSATION_OF_OPERATION", +} + +# Verbatim from vcert/pkg/webclient/caoperations/genqlient.graphql (whitespace is irrelevant to +# the server). operationName must match the mutation name. +REVOKE_CERTIFICATE_MUTATION = """\ +mutation RevokeCertificateRequest($fingerprint: ID!, $certificateAuthorityAccountId: UUID, $revocationReason: RevocationReason!, $revocationComment: String) { + revokeCertificate(fingerprint: $fingerprint, certificateAuthorityAccountId: $certificateAuthorityAccountId, revocationReason: $revocationReason, revocationComment: $revocationComment) { + id + fingerprint + revocation { status error { arguments code message } approvalDetails { rejectionReason } } + serialNumber + } +} +""" + +# From vcert/pkg/webclient/caaccounts: resolve a CA-account name to its id for the optional +# certificateAuthorityAccountId revocation variable. +LIST_CA_ACCOUNTS_QUERY = """\ +query ListCAAccounts { + certificateAuthorityAccounts { nodes { id name certificateAuthorityType } } +} +""" + log = get_child("connection-vaas") @@ -479,9 +520,126 @@ def retrieve_cert(self, request): else: raise ServerUnexptedBehavior + def _graphql(self, query, variables, operation_name): + """ + POST a GraphQL operation as raw JSON and return its parsed ``data`` object. + + Reuses the connector's own ``_post`` (so Cloud sends the ``tppl-api-key`` header and NGTS, + via its override, sends ``Authorization: Bearer`` with a fresh token) against the + ``graphql`` endpoint. ``_post`` funnels through + ``CommonConnection.process_server_response``, which RAISES ``VenafiConnectionError`` for any + status outside {200, 201, 202, 409} before control returns here -- so a 4xx/5xx GraphQL + response surfaces as that error, which we rewrap to ``CertificateRevokeError``. The only + statuses that reach this method are 200/201/202/409. GraphQL also reports failures as a + top-level ``errors`` array on an HTTP 200, so that is checked explicitly. + + :param str query: the GraphQL document + :param dict variables: the operation variables (nullable ones may be ``None``) + :param str operation_name: the operation name (must match the document) + :rtype: dict + """ + body = {"operationName": operation_name, "query": query, "variables": variables} + try: + status, data = self._post(URL_GRAPHQL, body) + except VenafiConnectionError as e: + log.error(f"GraphQL {operation_name} failed at transport layer: {e}") + raise CertificateRevokeError(f"GraphQL {operation_name} transport error: {e}") + if status == HTTPStatus.CONFLICT: + raise CertificateRevokeError(f"GraphQL {operation_name} returned 409 CONFLICT: {data}") + if not isinstance(data, dict): + raise ServerUnexptedBehavior(f"GraphQL {operation_name} response was not a JSON object") + if data.get("errors"): + raise CertificateRevokeError(f"GraphQL {operation_name} errors: {data['errors']}") + return data.get("data") + def revoke_cert(self, request): - # not supported in Venafi Cloud - raise NotImplementedError + """ + Revoke a certificate via the GraphQL CA-operations ``revokeCertificate`` mutation. + + Cloud/NGTS revoke is keyed by the certificate's SHA-1 thumbprint. Unlike TPP revoke (which + accepts an id or a thumbprint and honors ``disable``), it requires a thumbprint and ignores + ``request.disable`` -- matching the Go reference. ``request.reason`` maps to the GraphQL + ``RevocationReason`` enum; ``ca_compromise`` has no enum and is rejected. + + :param RevocationRequest request: + :rtype: dict + """ + # 1. thumbprint is mandatory (Go parity); unlike TPP, an id alone is not accepted. Normalize + # to uppercase hex: Go's producer computes the fingerprint as ToUpper(hex(sha1(DER))), so + # the backend's `fingerprint` ID is uppercase. + if not request.thumbprint: + log.error("certificate fingerprint (thumbprint) is required to revoke") + raise ClientBadData("certificate fingerprint(thumbprint) is required") + fingerprint = request.thumbprint.upper() + + # 2. disable is a TPP-only field; Cloud/NGTS ignore it. Log when a non-default value is set + # so the no-op is not silent. + if getattr(request, "disable", True) is not True: + log.debug("request.disable is ignored by Cloud/NGTS revoke (TPP-only field); no-op here") + + # 3. map the reason to the GraphQL enum; ca_compromise / unknown -> ClientBadData. + try: + revocation_reason = REVOCATION_REASON_MAP[request.reason] + except (KeyError, TypeError): + log.error(f"unsupported revocation reason: {request.reason!r}") + raise ClientBadData(f"unsupported revocation reason: {request.reason!r}") + + # 4. optional CA-account name -> id. Null by default (the common case: certs issued by CM + # SaaS itself); a name is resolved to an id via ListCAAccounts. + ca_account_id = None + ca_account_name = getattr(request, "ca_account_name", None) + if ca_account_name: + ca_data = self._graphql(LIST_CA_ACCOUNTS_QUERY, {}, "ListCAAccounts") + accounts = (ca_data or {}).get("certificateAuthorityAccounts") or {} + nodes = accounts.get("nodes") or [] + match = next((n for n in nodes if n.get("name") == ca_account_name), None) + if match is None: + raise VenafiError(f"failed to find CA account {ca_account_name!r}") + ca_account_id = match.get("id") + + # 5. comment: send null when empty (matches Go). + comment = request.comments if request.comments else None + + variables = { + "fingerprint": fingerprint, + "certificateAuthorityAccountId": ca_account_id, + "revocationReason": revocation_reason, + "revocationComment": comment, + } + data = self._graphql(REVOKE_CERTIFICATE_MUTATION, variables, "RevokeCertificateRequest") + + # 6. interpret the response (error-first), mirroring Go cloud/connector.go. + result = (data or {}).get("revokeCertificate") + if result is None: + raise ServerUnexptedBehavior("revoke certificate response is empty") + revocation = result.get("revocation") + if revocation is None: + raise ServerUnexptedBehavior("revocation object in revoke certificate response is empty") + + err = revocation.get("error") + if err is not None: + raise CertificateRevokeError( + f"revocation failed for {result.get('id')}/{result.get('fingerprint')}: " + f"{err.get('message')} (code={err.get('code')}, arguments={err.get('arguments')})" + ) + + status_value = revocation.get("status") + if status_value == "FAILED": + raise CertificateRevokeError( + f"revocation FAILED for {result.get('id')}/{result.get('fingerprint')}") + + rejection_reason = None + approval = revocation.get("approvalDetails") + if approval is not None and approval.get("rejectionReason") is not None: + rejection_reason = approval.get("rejectionReason") + + return { + "id": result.get("id"), + "thumbprint": result.get("fingerprint"), + "serial": result.get("serialNumber"), + "status": status_value, + "rejection_reason": rejection_reason, + } def retire_cert(self, request): cert_id = None diff --git a/vcert/errors.py b/vcert/errors.py index ac4ff4e..3a9d69e 100644 --- a/vcert/errors.py +++ b/vcert/errors.py @@ -43,6 +43,11 @@ class CertificateRenewError(ServerUnexptedBehavior): pass +class CertificateRevokeError(ServerUnexptedBehavior): + """Raised when the backend reports a certificate revocation failure.""" + pass + + class AuthenticationError(VenafiError): pass