Source code for compliance.agent

# -*- mode:python; coding:utf-8 -*-
# Copyright (c) 2022 IBM Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Compliance check automation module."""

import base64
import hashlib
from pathlib import PurePath

from compliance.config import get_config

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed


[docs]class ComplianceAgent: """Compliance agent class.""" AGENTS_DIR = 'agents' PUBLIC_KEYS_EVIDENCE_PATH = 'raw/auditree/agent_public_keys.json' def __init__(self, name=None, use_agent_dir=True): """Construct and initialize the agent object.""" self._name = name self._private_key = self._public_key = None self._use_agent_dir = use_agent_dir @property def name(self): """Get agent name.""" return self._name @property def private_key(self): """Get agent private key.""" return self._private_key @private_key.setter def private_key(self, data_bytes): """ Set agent private key. :param data_bytes: The PEM encoded key data as `bytes`. """ self._private_key = serialization.load_pem_private_key( data_bytes, None, default_backend() ) @property def public_key(self): """Get agent public key.""" return self._public_key @public_key.setter def public_key(self, data_bytes): """ Set agent public key. :param data_bytes: The PEM encoded key data as `bytes`. """ if self.name: self._public_key = serialization.load_pem_public_key(data_bytes)
[docs] def get_path(self, path): """ Get the full evidence path. :param path: The relative evidence path as a string. :returns: The full evidence path as a string. """ if self.name and self._use_agent_dir: if PurePath(path).parts[0] != self.AGENTS_DIR: return str(PurePath(self.AGENTS_DIR, self.name, path)) return path
[docs] def signable(self): """Determine if the agent can sign evidence.""" return all([self.name, self.private_key])
[docs] def verifiable(self): """Determine if the agent can verify evidence.""" return all([self.name, self.public_key])
[docs] def load_public_key_from_locker(self, locker): """ Load agent public key from locker. :param locker: A locker of type :class:`compliance.locker.Locker`. """ if not self.name: return try: public_keys = locker.get_evidence(self.PUBLIC_KEYS_EVIDENCE_PATH) public_key_str = public_keys.content_as_json[self.name] self.public_key = public_key_str.encode() except Exception: self._public_key = None # Missing public key evidence.
[docs] def hash_and_sign(self, data_bytes): """ Hash and sign evidence using the agent private key. :param data_bytes: The data to sign as `bytes`. :returns: A `tuple` containing the hexadecimal digest string and the base64 encoded signature string. Returns tuple `(None, None)` if the agent is not configured to sign evidence. """ if not self.signable(): return None, None hashed = hashlib.sha256(data_bytes) signature = self.private_key.sign( hashed.digest(), padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), Prehashed(hashes.SHA256()) ) return hashed.hexdigest(), base64.b64encode(signature).decode()
[docs] def verify(self, data_bytes, signature_b64): """ Verify evidence using the agent public key. :param data_bytes: The data to verify as `bytes`. :param signature_b64: The base64 encoded signature string. :returns: `True` if data can be verified, else `False`. """ if not self.verifiable(): return False try: self.public_key.verify( base64.b64decode(signature_b64), data_bytes, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) return True except InvalidSignature: return False
[docs] @classmethod def from_config(cls): """Load agent from configuration.""" config = get_config() agent = cls( name=config.get('agent_name'), use_agent_dir=config.get('use_agent_dir', True) ) private_key_path = config.get('agent_private_key') public_key_path = config.get('agent_public_key') if private_key_path: with open(private_key_path, 'rb') as key_file: agent.private_key = key_file.read() agent.public_key = agent.private_key.public_key().public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) elif public_key_path: with open(public_key_path, 'rb') as key_file: agent.public_key = key_file.read() return agent