Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions README-NGTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ enrollment. This guide covers using it against **Palo Alto Networks Next-Gen Tru
- [Connect with a pre-issued access token](#connect-with-a-pre-issued-access-token)
- [Request and retrieve a certificate](#request-and-retrieve-a-certificate)
- [Renew a certificate](#renew-a-certificate)
- [Revoke a certificate](#revoke-a-certificate)

## Prerequisites

Expand Down Expand Up @@ -196,6 +197,34 @@ conn.renew_cert(request)
cert = conn.retrieve_cert(request)
```

### Revoke a certificate

NGTS (and Cloud) revocation is keyed by the certificate's **SHA-1 thumbprint** and goes through the
GraphQL CA-operations service. This differs from TPP revoke in three ways: an enrollment id alone is
**not** accepted (a thumbprint is required), the `disable` flag is **ignored**, and `ca_compromise`
is **not** a valid reason (it has no NGTS mapping). Valid reasons are `NoReason` (the default,
"unspecified"), `key_compromise`, `affiliation_changed`, `superseded`, and `cessation_of_operation`.

```python
import binascii
from cryptography.hazmat.primitives import hashes

from vcert import RevocationRequest

# `cert` is the issued certificate as a cryptography x509 object (e.g. parsed from retrieve_cert).
thumbprint = binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode()

request = RevocationRequest(thumbprint=thumbprint,
reason=RevocationRequest.RevocationReasons.key_compromise)
result = conn.revoke_cert(request)
print(result["status"]) # e.g. SUBMITTED / PENDING_APPROVAL
```

> `revoke_cert` revokes the certificate at the CA (CRL/OCSP). To instead withdraw the certificate
> record from the inventory (lifecycle housekeeping, not CA revocation), use `retire_cert`. Set
> `ca_account_name=` on the `RevocationRequest` only when the certificate was issued by an external
> CA that requires it; for certificates issued by CM SaaS itself, leave it unset (the id stays null).

---

For backend-neutral SDK usage (request/retrieve/renew/revoke data objects, output formats),
Expand Down
5 changes: 5 additions & 0 deletions docs/version_history.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## Version History

#### 0.21.0
* Added certificate revocation (`revoke_cert`) for CyberArk Certificate Manager, SaaS (Cloud/VaaS) and NGTS (Strata Cloud Manager), via the GraphQL CA-operations `revokeCertificate` mutation (keyed by SHA-1 thumbprint)
* Cloud `revoke_cert` no longer raises `NotImplementedError`; NGTS inherits the same implementation
* Added the public `CertificateRevokeError` exception and an optional `ca_account_name` field on `RevocationRequest`

#### 0.20.0
* Added policy management (`get_policy`/`set_policy`) for NGTS (Strata Cloud Manager), operating on the CIT-only zone

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
long_description = f.read()

setup(name='vcert',
version='0.20.0',
version='0.21.0',
url="https://github.com/Venafi/vcert-python",
packages=['vcert', 'vcert.parser', 'vcert.policy'],
install_requires=['requests>=2.32.4', 'python-dateutil>=2.9.0.post0', 'six>=1.17.0',
Expand Down
164 changes: 162 additions & 2 deletions tests/test_local_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@

from assets import POLICY_CLOUD1, POLICY_TPP1, EXAMPLE_CSR, EXAMPLE_CHAIN
from vcert import (CloudConnection, KeyType, TPPConnection, CertificateRequest, ZoneConfig, CertField, FakeConnection,
NGTSConnection, logger)
NGTSConnection, RevocationRequest, logger)
from vcert.connection_cloud import URLS
from vcert.connection_ngts import (_parse_ngts_zone, DEFAULT_API_URL, DEFAULT_TOKEN_URL,
TRUSTED_TOKEN_HOST_SUFFIX)
from vcert.errors import ClientBadData, ServerUnexptedBehavior, VenafiError
from vcert.errors import (ClientBadData, ServerUnexptedBehavior, VenafiError, VenafiConnectionError,
CertificateRevokeError)
from vcert.http_status import HTTPStatus
from vcert.pem import parse_pem, Certificate
from vcert.policy.pm_cloud import CertificateAuthorityDetails, CertificateAuthorityInfo
from vcert.policy.policy_spec import DEFAULT_CA, Policy, PolicySpecification
Expand Down Expand Up @@ -627,3 +629,161 @@ def test_ngts_get_policy_missing_cit_raises(self):
with mock.patch.object(conn, '_get_cit', return_value=None):
with self.assertRaises(VenafiError):
conn.get_policy("does-not-exist")

# -- Cloud / NGTS revoke (offline) --------------------------------------------------------
#
# Cloud and NGTS revoke via the GraphQL CA-operations `revokeCertificate` mutation (no REST
# endpoint). `revoke_cert` + `_graphql` live on CloudConnection; NGTS inherits them unchanged
# and only differs in transport (Bearer auth + strata host), which the boundary tests pin.
# The logic tests patch `_graphql`/`_post`; the boundary tests patch `requests.post`.

@staticmethod
def _cloud_conn(**kwargs):
defaults = dict(token="apikey", url="https://api.venafi.cloud/")
defaults.update(kwargs)
return CloudConnection(**defaults)

@staticmethod
def _revoke_data(status="SUBMITTED", error=None, approval=None, fingerprint="AABB", cid="cid", serial="01"):
# The inner `data` object _graphql returns (after unwrapping the GraphQL envelope).
return {
"revokeCertificate": {
"id": cid,
"fingerprint": fingerprint,
"serialNumber": serial,
"revocation": {"status": status, "error": error, "approvalDetails": approval},
}
}

def test_revoke_requires_thumbprint(self):
conn = self._cloud_conn()
with self.assertRaises(ClientBadData):
conn.revoke_cert(RevocationRequest())

def test_revoke_rejects_ca_compromise_reason(self):
# ca_compromise has no GraphQL enum (TPP-only in Go) -> rejected, not silently mismapped.
conn = self._cloud_conn()
req = RevocationRequest(thumbprint="AABB", reason=RevocationRequest.RevocationReasons.ca_compromise)
with self.assertRaises(ClientBadData):
conn.revoke_cert(req)

def test_revoke_reason_mapping(self):
conn = self._cloud_conn()
cases = {
RevocationRequest.RevocationReasons.NoReason: "UNSPECIFIED",
RevocationRequest.RevocationReasons.key_compromise: "KEY_COMPROMISE",
RevocationRequest.RevocationReasons.affiliation_changed: "AFFILIATION_CHANGED",
RevocationRequest.RevocationReasons.superseded: "SUPERSEDED",
RevocationRequest.RevocationReasons.cessation_of_operation: "CESSATION_OF_OPERATION",
}
for reason, enum_name in cases.items():
with mock.patch.object(conn, '_graphql', return_value=self._revoke_data()) as gql:
conn.revoke_cert(RevocationRequest(thumbprint="aabb", reason=reason, comments=""))
variables = gql.call_args[0][1]
self.assertEqual(variables["revocationReason"], enum_name)
# all four variable keys are always present; nullable ones are None when unset.
self.assertEqual(set(variables), {"fingerprint", "certificateAuthorityAccountId",
"revocationReason", "revocationComment"})
self.assertEqual(variables["fingerprint"], "AABB") # normalized to uppercase
self.assertIsNone(variables["certificateAuthorityAccountId"])
self.assertIsNone(variables["revocationComment"])

def test_revoke_success_returns_result(self):
conn = self._cloud_conn()
with mock.patch.object(conn, '_graphql', return_value=self._revoke_data(status="SUBMITTED")):
result = conn.revoke_cert(RevocationRequest(thumbprint="aabb"))
self.assertEqual(result["id"], "cid")
self.assertEqual(result["thumbprint"], "AABB")
self.assertEqual(result["status"], "SUBMITTED")
self.assertEqual(result["serial"], "01")
self.assertIsNone(result["rejection_reason"])

def test_revoke_surfaces_revocation_error(self):
conn = self._cloud_conn()
data = self._revoke_data(status=None, error={"message": "boom", "code": 7, "arguments": ["x"]})
with mock.patch.object(conn, '_graphql', return_value=data):
with self.assertRaises(CertificateRevokeError):
conn.revoke_cert(RevocationRequest(thumbprint="aabb"))

def test_revoke_failed_status(self):
conn = self._cloud_conn()
with mock.patch.object(conn, '_graphql', return_value=self._revoke_data(status="FAILED")):
with self.assertRaises(CertificateRevokeError):
conn.revoke_cert(RevocationRequest(thumbprint="aabb"))

def test_revoke_ca_account_not_found(self):
# A supplied CA-account name not returned by ListCAAccounts is a caller/input error:
# plain VenafiError, NOT CertificateRevokeError (reserved for backend/transport failures).
conn = self._cloud_conn()
req = RevocationRequest(thumbprint="aabb", ca_account_name="missing")
with mock.patch.object(conn, '_graphql',
return_value={"certificateAuthorityAccounts": {"nodes": []}}):
with self.assertRaises(VenafiError) as ctx:
conn.revoke_cert(req)
self.assertNotIsInstance(ctx.exception, CertificateRevokeError)

def test_list_ca_accounts_failure(self):
# A transport failure of the ListCAAccounts call surfaces as CertificateRevokeError
# (distinct from the name-not-found VenafiError above).
conn = self._cloud_conn()
req = RevocationRequest(thumbprint="aabb", ca_account_name="acme")
with mock.patch.object(conn, '_post', side_effect=VenafiConnectionError("boom")):
with self.assertRaises(CertificateRevokeError):
conn.revoke_cert(req)

def test_graphql_raises_on_top_level_errors(self):
conn = self._cloud_conn()
with mock.patch.object(conn, '_post', return_value=(200, {"errors": [{"message": "bad"}]})):
with self.assertRaises(CertificateRevokeError):
conn._graphql("query {}", {}, "Op")

def test_graphql_raises_on_transport_error(self):
# Non-200/non-allowed statuses become VenafiConnectionError INSIDE process_server_response
# (before _graphql sees them); _graphql rewraps to CertificateRevokeError. Mock _post to
# RAISE (not return a tuple) to prove that path.
conn = self._cloud_conn()
with mock.patch.object(conn, '_post', side_effect=VenafiConnectionError("500")):
with self.assertRaises(CertificateRevokeError):
conn._graphql("query {}", {}, "Op")

def test_graphql_raises_on_conflict(self):
# 409 is the one non-2xx status process_server_response RETURNS (not raises), so _graphql
# must handle it explicitly.
conn = self._cloud_conn()
with mock.patch.object(conn, '_post', return_value=(HTTPStatus.CONFLICT, {"x": 1})):
with self.assertRaises(CertificateRevokeError):
conn._graphql("query {}", {}, "Op")

def test_cloud_revoke_post_boundary(self):
# Exercises the real _graphql -> _post -> process_server_response path for Cloud: asserts
# the graphql URL, the tppl-api-key header, and the {operationName, query, variables} body.
conn = self._cloud_conn()
fake_resp = mock.MagicMock()
fake_resp.status_code = 200
fake_resp.headers = {'content-type': 'application/json'}
fake_resp.json.return_value = {"data": self._revoke_data()}
with mock.patch('vcert.connection_cloud.requests.post', return_value=fake_resp) as post:
conn.revoke_cert(RevocationRequest(thumbprint="aabb"))
args, kwargs = post.call_args
self.assertEqual(args[0], "https://api.venafi.cloud/graphql")
self.assertEqual(kwargs['headers']['tppl-api-key'], "apikey")
body = kwargs['json']
self.assertEqual(body['operationName'], "RevokeCertificateRequest")
self.assertIn("revokeCertificate", body['query'])
self.assertEqual(set(body['variables']), {"fingerprint", "certificateAuthorityAccountId",
"revocationReason", "revocationComment"})
self.assertEqual(body['variables']['fingerprint'], "AABB")

def test_ngts_revoke_post_boundary(self):
# The same path on NGTS proves inheritance + Bearer/strata transport with no override.
conn = self._ngts_conn(access_token='t', token_url=None)
fake_resp = mock.MagicMock()
fake_resp.status_code = 200
fake_resp.headers = {'content-type': 'application/json'}
fake_resp.json.return_value = {"data": self._revoke_data()}
with mock.patch('vcert.connection_ngts.requests.post', return_value=fake_resp) as post:
conn.revoke_cert(RevocationRequest(thumbprint="aabb"))
args, kwargs = post.call_args
self.assertEqual(args[0], "https://api.strata.paloaltonetworks.com/ngts/graphql")
self.assertEqual(kwargs['headers']['Authorization'], "Bearer t")
self.assertEqual(kwargs['json']['operationName'], "RevokeCertificateRequest")
24 changes: 22 additions & 2 deletions tests/test_ngts.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
from test_env import (NGTS_URL, NGTS_TOKEN_URL, NGTS_CLIENT_ID, NGTS_CLIENT_SECRET, NGTS_TSG_ID, NGTS_SCOPE,
NGTS_ZONE)
from test_utils import random_word, enroll, renew, renew_by_thumbprint
from vcert import NGTSConnection, KeyType, logger
from vcert.common import RetireRequest
from vcert import NGTSConnection, KeyType, logger, CertificateRevokeError
from vcert.common import RetireRequest, RevocationRequest
from vcert.policy.policy_spec import Policy, PolicySpecification

log = logger.get_child("test-ngts")
Expand Down Expand Up @@ -74,6 +74,26 @@ def test_ngts_retire_by_thumbprint(self):
ret_request = RetireRequest(thumbprint=fingerprint)
self.assertTrue(self.ngts_conn.retire_cert(ret_request))

def test_ngts_revoke_by_thumbprint(self):
# Cryptographic revocation via the GraphQL CA-operations mutation (thumbprint-keyed).
# revoke_cert uppercases the thumbprint internally, so a lowercase hexlify is fine here.
cn = f"{random_word(10)}.venafi.example.com"
cert_id, pkey, cert, _, _ = enroll(self.ngts_conn, self.ngts_zone, cn)
fingerprint = binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode()
rev_request = RevocationRequest(thumbprint=fingerprint)
try:
result = self.ngts_conn.revoke_cert(rev_request)
except CertificateRevokeError as e:
# The whole request path (GraphQL revokeCertificate via the CA-operations service) is
# exercised end-to-end here. A tenant whose zone uses only the built-in CA gets back
# "revocation is not supported for CA type BUILTIN_CA" from the backend - skip the
# success assertion in that case (it needs a zone backed by a revocation-capable CA).
if "not supported for CA type" in str(e) or "BUILTIN" in str(e).upper():
self.skipTest(f"zone CA does not support revocation: {e}")
raise
self.assertIsNotNone(result)
self.assertIn("status", result)

def test_ngts_read_zone_config(self):
zone = self.ngts_conn.read_zone_conf(self.ngts_zone)
self.assertIsNotNone(zone.policy)
Expand Down
14 changes: 13 additions & 1 deletion tests/test_vaas.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
get_vaas_zone
from vcert import CloudConnection, KeyType, CertificateRequest, CustomField, logger, CSR_ORIGIN_SERVICE
from vcert.policy import KeyPair, DefaultKeyPair, PolicySpecification
from vcert.common import RetireRequest
from vcert.common import RetireRequest, RevocationRequest

log = logger.get_child("test-vaas")

Expand Down Expand Up @@ -213,3 +213,15 @@ def test_cloud_retire_by_thumbprint(self):
assert ret_data is True
except Exception as e:
log.error(msg=f"Error retiring certificate by thumbprint: {str(e)}")

def test_cloud_revoke_by_thumbprint(self):
# Cryptographic revocation via the GraphQL CA-operations mutation (thumbprint-keyed).
# revoke_cert uppercases the thumbprint internally, so a lowercase hexlify is fine here.
req, cert = simple_enroll(self.cloud_conn, self.cloud_zone)
cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend())
fingerprint = binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode()
time.sleep(1)
rev_request = RevocationRequest(thumbprint=fingerprint)
result = self.cloud_conn.revoke_cert(rev_request)
self.assertIsNotNone(result)
self.assertIn("status", result)
2 changes: 1 addition & 1 deletion vcert/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from .connection_tpp import TPPConnection
from .connection_tpp_token import TPPTokenConnection
from .connection_fake import FakeConnection
from .errors import VenafiError
from .errors import VenafiError, CertificateRevokeError
from .logger import setup_logger, get_logger, get_child
from .pem import Certificate
from .ssh_utils import SSHCertRequest, SSHKeyPair, write_ssh_files, SSHCATemplateRequest, SSHConfig
Expand Down
9 changes: 8 additions & 1 deletion vcert/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,16 +580,23 @@ class RevocationReasons:
cessation_of_operation = 5 # OriginalUseNoLongerValid

def __init__(self, req_id=None, thumbprint=None, reason=RevocationReasons.NoReason,
comments="Revoked via api with python bindings", disable=True):
comments="Revoked via api with python bindings", disable=True, ca_account_name=None):
"""
:param req_id:
:param thumbprint:
:param reason:
:param comments:
:param disable: TPP-only; consumed by TPP revoke. Cloud/NGTS revoke ignores it (Go parity).
:param ca_account_name: Cloud/NGTS only. Optional CA-account name to target for revocation;
resolved to a CA-account id via GraphQL. Leave unset (the common case) for certificates
issued by CM SaaS itself, where the id stays null.
"""
self.id = req_id
self.thumbprint = thumbprint
self.reason = reason
self.comments = comments
self.disable = disable
self.ca_account_name = ca_account_name


class RetireRequest:
Expand Down
Loading