Source code for compliance.config
# -*- mode:python; coding:utf-8 -*-
# Copyright (c) 2020 IBM Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Compliance configuration automation module."""
import inspect
import json
import os
from collections import OrderedDict
from copy import deepcopy
from compliance.utils.credentials import Config
class ComplianceConfig:
    """
    The configuration for a compliance run.

    Credentials and a cache of known evidences are included.
    """

    # Fallback settings.  ``get`` consults these whenever the loaded
    # configuration does not provide a value for the requested path.
    DEFAULTS = {
        'locker': {
            'dirname': 'compliance',
            'repo_url': 'https://github.com/YOUR_ORG/YOUR_PROJECT'
        },
        'runbooks': {
            'enabled': False, 'base_url': 'https://example.com/runbooks'
        },
        'notify': {
            # Slack channel to use for an accreditation
            # E.g. {"mycompany.soc2": ["#compliance"]}
            'slack': {},
            # GH repo to use for an accreditation
            # E.g. {"mycompany.soc2": {"repo": ["my-org/accr1-repo"]}}
            'gh_issues': {},
            # GHE repo to use for an accreditation
            # Deprecated (use gh_issues), included for backward compatibility.
            # E.g. {"mycompany.soc2": {"repo": ["my-org/accr1-repo"]}}
            'ghe_issues': {},
            # Pagerduty service id to use for an accreditation
            # E.g. {"mycompany.soc2": "ABCDEFG"}
            'pagerduty': {},
            # Security Advisor FindingsAPI endpoint to use for an accreditation
            # E.g. {"mycompany.soc2": "https://my.findings.api/findings"}
            'findings': {}
        },
        'org': {
            'name': 'YOUR_ORG', 'settings': {}
        }
    }

    def __init__(self):
        """Construct and initialize the configuration object."""
        self._config = {}
        self._creds = None
        self._evidence_cache = OrderedDict()
        self._org = None
        self.creds_path = None
        self.dependency_rerun = False

    @property
    def creds(self):
        """
        Credentials used for locker management and running fetchers.

        The credentials object is built from ``creds_path`` on first access
        and reused afterwards.

        :raises ValueError: if ``creds_path`` has not been set.
        """
        if self.creds_path is None:
            raise ValueError('Path to credentials file not provided')
        if self._creds is None:
            self._creds = Config(self.creds_path)
        return self._creds

    @property
    def evidences(self):
        """All evidence objects currently in the evidence cache."""
        return self._evidence_cache.values()

    @property
    def raw_config(self):
        """Raw configuration settings as a dictionary."""
        return self._config

    def load(self, config_file=None):
        """
        Load configuration from a JSON file.

        :param config_file: the path to the JSON config file.
          If ``None``, the ``DEFAULTS`` configuration is used.

        :raises ValueError: if the file content is not valid JSON; the
          offending file path is appended to the exception arguments.
        """
        if config_file is None:
            # Deep copy so that edits made through ``raw_config`` never leak
            # into the class-level DEFAULTS shared by every instance.
            self._config = deepcopy(self.DEFAULTS)
            return
        try:
            # The context manager guarantees the file handle is closed.
            with open(config_file) as config_fh:
                self._config = json.loads(config_fh.read())
        except ValueError as err:
            err.args += (config_file, )
            raise

    @staticmethod
    def _lookup(settings, keys):
        """
        Walk ``settings`` following ``keys`` and return the value found.

        ``None`` is returned as soon as a key is missing or the walk reaches
        a value that is not a dictionary and so cannot be descended into.
        """
        value = settings
        for key in keys:
            if not isinstance(value, dict):
                return None
            value = value.get(key)
        return value

    def get(self, config_path, default=None):
        """
        Provide the configuration value for the supplied ``config_path``.

        The loaded configuration is searched first, then ``DEFAULTS``.
        Returns the default if ``config_path`` cannot be retrieved from
        either.  The returned value is always a deep copy, so callers may
        modify it freely.

        :param config_path: dot notation path with the following format
          ``'key[.subkey]``. For instance, ``locker.dirname``.
        :param default: the value to return when the path is not found.
        """
        chunks = config_path.split('.')
        value = self._lookup(self._config, chunks)
        if value is None:
            value = self._lookup(self.DEFAULTS, chunks)
        return deepcopy(value) if value is not None else deepcopy(default)

    def get_evidence(self, evidence_path):
        """
        Provide an evidence object from the evidence cache.

        :param evidence_path: the path to the evidence within the Locker.
          For example, ``raw/source1/evidence.json``

        :returns: the cached evidence object or ``None`` if not cached.
        """
        return self._evidence_cache.get(evidence_path)

    def add_evidences(self, evidence_list):
        """
        Add a list of evidence objects to the evidence cache.

        Evidence is added one item at a time, so items preceding a duplicate
        are already cached when the error is raised.

        :param evidence_list: a list of evidence objects.

        :raises ValueError: if an evidence path is already cached and
          ``dependency_rerun`` is not set.
        """
        for e in evidence_list:
            if not self.dependency_rerun and e.path in self._evidence_cache:
                raise ValueError(f'Evidence {e.path} duplicated')
            self._evidence_cache[e.path] = e

    def get_template_dir(self, test_obj=None):
        """
        Provide the full path to the template directory for the test object.

        The associated path will be the first directory found named
        ``templates`` in the test object absolute path traversed in reverse.
        If ``test_obj`` is ``None``, then current directory ``'.'`` is
        assumed as initial path.

        :param test_obj: a :class:`compliance.ComplianceTest` object from
          where the template directory search will start from.

        :returns: the template directory path or ``None`` if the filesystem
          root is reached without finding one.
        """
        path = os.path.abspath(os.curdir)
        if test_obj is not None:
            # Normalise to an absolute path so the upward walk always ends
            # at the filesystem root, even for a relative module path.
            path = os.path.abspath(inspect.getfile(test_obj.__class__))
        while True:
            parent = os.path.dirname(path)
            # The root is its own parent; comparing against the parent
            # (rather than a literal '/') also terminates on drive roots.
            if parent == path:
                return None
            result = os.path.join(path, 'templates')
            if os.path.isdir(result):
                return result
            path = parent
# Module-level holder for the shared configuration; filled in lazily.
__config = None


def get_config():
    """
    Provide the global configuration object.

    The object is created on the first call and the same instance is
    returned by every subsequent call.
    """
    global __config
    if __config is not None:
        return __config
    __config = ComplianceConfig()
    return __config