# -*- mode:python; coding:utf-8 -*-
# Copyright (c) 2020 IBM Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Compliance notification management automation module."""
import copy
import json
import logging
import os
import sys
import time
from datetime import datetime
from urllib.parse import urlparse
from compliance.config import get_config
from compliance.utils.services import pagerduty
from compliance.utils.services.github import Github
from compliance.utils.test import parse_test_id
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
from ibm_security_advisor_findings_api_sdk import ApiException, FindingsApiV1
import requests
class _BaseNotifier(object):
    """
    Base notifier class.

    Stores the check results, the controls descriptor and the push error
    flag, and provides the message helpers shared by the notifiers defined
    in this module.

    It shouldn't be used outside of this module.
    """

    def __init__(self, results, controls, push_error):
        """
        Store the notifier inputs and configure the notifier logger.

        :param results: mapping of test id to test descriptor.
        :param controls: object queried through ``get_accreditations``.
        :param push_error: flag kept for subclasses to report a failed push.
        """
        self._results = results
        self._controls = controls
        self._push_error = push_error
        formatter = logging.Formatter('%(levelname)s: %(message)s')
        self._handler = logging.StreamHandler()
        self._handler.setFormatter(formatter)
        self.logger = logging.getLogger(name='compliance.notifier')
        # The logger is looked up by name, so previously attached handlers
        # are dropped before the new one is added.
        self.logger.handlers.clear()
        self.logger.addHandler(self._handler)
        self.logger.setLevel(logging.INFO)

    @property
    def messages(self):
        """
        Check test messages.

        A generator of tuples with the following structure::

          ([str] test_id, [dict] test_descriptor, [dict] message)

        where message holds the ``title``, ``body``, ``failure_count`` and
        ``warning_count`` keys.
        """
        for test_id, test_desc in self._results.items():
            test_obj = test_desc['test'].test
            method_name = parse_test_id(test_id)['method']
            has_many_tests = len(test_obj.tests) > 1
            # Checks with several tests may define a dedicated ``msg_*``
            # method per test; otherwise the generic method is used.
            msg_method = 'get_notification_message'
            if has_many_tests:
                candidate = method_name.replace('test_', 'msg_', 1)
                if hasattr(test_obj, candidate):
                    msg_method = candidate
            # The raw message and body stay None when the notification
            # function isn't defined or returns nothing useful.  Errored
            # checks get a predefined body and never call the function.
            # Passed checks still call it, so things like the subtitle can
            # be deduced even though their notifications are not displayed.
            raw = None
            body = None
            if hasattr(test_obj, msg_method):
                if test_desc['status'] == 'error':
                    body = f'Check {test_id} failed to execute'
                else:
                    build_message = getattr(test_obj, msg_method)
                    if has_many_tests and not msg_method.startswith('msg_'):
                        # The generic method is told which test is meant.
                        raw = build_message(method_name)
                    else:
                        raw = build_message()
                    if raw and 'body' in raw and raw['body']:
                        body = raw['body']
            title = test_obj.title
            if raw and 'subtitle' in raw and raw['subtitle']:
                title = title + f' - {raw["subtitle"]}'
            failure_count = (
                test_obj.failures_count()
                if raw and test_obj.failures else 0
            )
            warning_count = (
                test_obj.warnings_count()
                if raw and test_obj.warnings else 0
            )
            yield test_id, test_desc, {
                'title': title,
                'body': body,
                'failure_count': failure_count,
                'warning_count': warning_count
            }

    def _messages_by_accreditations(self):
        """Group the message tuples by each accreditation of their check."""
        grouped = {}
        for entry in self.messages:
            test_class = parse_test_id(entry[0])['class_path']
            for accred in self._controls.get_accreditations(test_class):
                grouped.setdefault(accred, []).append(entry)
        return grouped

    def _split_by_status(self, messages):
        """
        Split title-sorted message tuples by test status.

        :returns: a ``(passed, failed, warned, errored)`` tuple of lists; any
          status other than pass, error or warn is counted as failed.
        """
        failed = []
        known = {'pass': [], 'error': [], 'warn': []}
        ordered = sorted(messages, key=lambda entry: entry[2]['title'])
        for test_id, test_desc, msg in ordered:
            bucket = known.get(test_desc['status'], failed)
            bucket.append((test_id, test_desc, msg))
        return known['pass'], failed, known['warn'], known['error']

    def _get_check_names(self, checks, include_path=False):
        """Return the check titles, or (title, path) pairs when requested."""
        if not include_path:
            return [msg['title'] for _, _, msg in checks]
        return [(msg['title'], path) for path, _, msg in checks]

    def _get_report_links(self, test_desc, link_format=None):
        """
        Format one link per report of the check.

        :param test_desc: the test descriptor.
        :param link_format: format string using ``{url}`` and ``{name}``.
        :returns: list of formatted links; empty for errored checks.
        """
        link_format = link_format or '<{url}|{name}>'
        if test_desc['status'] == 'error':
            return []
        test_obj = test_desc['test'].test
        links = []
        for report in test_obj.reports:
            links.append(
                link_format.format(
                    url=test_obj.locker.get_remote_location(report.path),
                    name=report.name
                )
            )
        return links

    def _get_summary_and_body(
        self,
        test_desc,
        msg,
        include_title=True,
        summary_format=None,
        link_format=None
    ):
        """
        Build the summary line and the body text for a check message.

        :param test_desc: the test descriptor.
        :param msg: the message dictionary produced by ``messages``.
        :param include_title: prefix the default summary with the title.
        :param summary_format: format string overriding the default summary.
        :param link_format: format string for report and run book links.
        :returns: a ``(summary_line, body)`` tuple; body is empty or starts
          with a newline.
        """
        if not link_format:
            link_format = '<{url}|{name}>'
        if not summary_format:
            prefix = '{title} - ' if include_title else ''
            summary_format = (
                prefix + '{status} ({issues}) Reports: {reports} {runbook}'
            )
        test_obj = test_desc['test'].test
        counters = (
            (msg['failure_count'], 'failures'),
            (msg['warning_count'], 'warnings'),
            (test_obj.fixed_failure_count, 'fixed')
        )
        issues = ', '.join(
            f'{count} {label}' for count, label in counters if count > 0
        )
        report_links = self._get_report_links(test_desc, link_format)
        reports = ', '.join(report_links or ['(none)'])
        runbook = ''
        if test_obj.runbook_url:
            runbook_link = link_format.format(
                url=test_obj.runbook_url, name='Run Book'
            )
            runbook = '| ' + runbook_link
        summary_line = summary_format.format(
            title=msg['title'],
            status=test_desc['status'].upper(),
            issues=issues,
            reports=reports,
            runbook=runbook
        )
        body = f'\n{msg["body"]}' if msg['body'] else ''
        return summary_line, body
class _BaseMDNotifier(_BaseNotifier):
    """
    Base markdown notifier class.

    Adds the markdown rendering of an accreditation's results on top of the
    base notifier helpers.

    It shouldn't be used outside of this module.
    """

    def __init__(self, results, controls, push_error):
        super().__init__(results, controls, push_error)

    def _generate_accred_content(self, accred, results, skip_title=False):
        """
        Render the results of one accreditation as markdown lines.

        :param accred: the accreditation name.
        :param results: dictionary of message tuples keyed by the ``pass``,
          ``fail``, ``warn`` and ``error`` statuses.
        :param skip_title: leave out the timestamped top level heading.
        :returns: list of markdown lines.
        """
        lines = []
        if not skip_title:
            timestamp = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')
            lines.append(f'# CHECK RESULTS: {timestamp}')
        lines.append(f'\n## Notification for {accred.upper()} accreditation\n')
        if self._push_error:
            # Nothing else is reported when the push to the locker failed.
            lines.append('### All Checks (Errored)\n')
            lines.append(
                '   - Evidence/Results failed to push to remote locker.  '
                'See execution log for details.'
            )
            return lines
        sections = (('pass', 'Passed Checks'), ('error', 'Errored Checks'))
        for status, heading in sections:
            lines.append(f'### {heading}\n')
            checks = self._get_check_names(results[status], include_path=True)
            if not checks:
                lines.append(f'- **No {heading.lower()}**')
                continue
            previous_title = None
            for title, path in checks:
                # Consecutive checks sharing a title are listed under a
                # single bullet.
                if title != previous_title:
                    lines.append(f'- **{title}**')
                    previous_title = title
                if status == 'error':
                    lines.append(f'   - {path} failed to execute')
        lines.append('### Failures/Warnings\n')
        troubled = results['fail'] + results['warn']
        if not troubled:
            lines.append('- **No failures or warnings**')
            return lines
        summary_format = '\n'.join(
            (
                '- **{title}**',
                '   - **{status}** | {reports} {runbook}',
                '   - {issues}'
            )
        )
        for _, test_desc, msg in troubled:
            summary, extra = self._get_summary_and_body(
                test_desc,
                msg,
                summary_format=summary_format,
                link_format='[{name}]({url})'
            )
            lines.append(summary)
            if extra:
                lines.extend(
                    f'   - _{line}_' for line in extra.strip().split('\n')
                )
        return lines
class FDNotifier(_BaseNotifier):
    """
    File descriptor notifier class.

    Notifications are written to the file descriptor specified.
    Defaults to STDOUT.
    """

    def __init__(self, results, controls, fd=sys.stdout, push_error=False):
        """
        Construct and initialize the file descriptor notifier object.

        :param results: dictionary generated by
          :py:class:`compliance.runners.CheckMode` at the end of the execution.
        :param controls: the control descriptor that manages accreditations.
        :param fd: a file descriptor where to write the notifications on.
          Defaults to STDOUT.
        :param push_error: when True, a single notice that the
          evidence/results failed to push to the remote locker is written
          instead of the per accreditation notifications.
        """
        super(FDNotifier, self).__init__(results, controls, push_error)
        self.fd = fd

    def notify(self):
        """Write notifications into the file descriptor."""
        self.logger.info('Running the STDOUT notifier...')
        self.fd.write('\n-- NOTIFICATIONS --\n\n')
        if not self._results:
            self.fd.write('No results\n')
        elif self._push_error:
            self.fd.write(
                'All accreditation checks:  '
                'Evidence/Results failed to push to remote locker.\n'
            )
        else:
            accreds = []
            messages = list(self._messages_by_accreditations().items())
            messages.sort(key=lambda x: x[0])
            # First gather the results of every accreditation ...
            for accreditation, msgs in messages:
                if not msgs:
                    continue
                passed, failed, warned, errored = self._split_by_status(msgs)
                accreds.append(
                    {
                        'name': accreditation,
                        'passed': passed,
                        'failed': failed,
                        'warned': warned,
                        'errored': errored
                    }
                )
            # ... then write them out, one section per accreditation.
            for accred in accreds:
                self.fd.write(
                    f'Notifications for {accred["name"].upper()} '
                    'accreditation\n\n'
                )
                passed_msg = ', '.join(
                    self._get_check_names(accred['passed'])
                ) or '(none)'
                accred_msgs = [f'PASSED checks: {passed_msg}']
                # A status without checks still contributes an (empty)
                # entry, which keeps the spacing between sections constant.
                for msg_type in ['errored', 'warned', 'failed']:
                    accred_msgs.append(
                        '\n\n'.join(
                            [
                                ''.join(
                                    self._get_summary_and_body(test_desc, msg)
                                ) for (_, test_desc, msg) in accred[msg_type]
                            ]
                        )
                    )
                self.fd.write('\n\n'.join(accred_msgs) + '\n\n')
        self.fd.flush()
class LockerNotifier(_BaseMDNotifier):
    """
    Evidence Locker notifier class.

    Notifications are written to the evidence locker.

    :param results: dictionary generated by
      :py:class:`compliance.runners.CheckMode` at the end of the execution.
    :param controls: dictionary of checks and the accreditations and controls
      that they belong to.
    """

    def __init__(self, results, controls, locker, push_error=False):
        """
        Construct and initialize the evidence locker notifier object.

        :param results: dictionary generated by
          :py:class:`compliance.runners.CheckMode` at the end of the execution.
        :param controls: the control descriptor that manages accreditations.
        :param locker: the evidence locker object.
        :param push_error: when True, :py:meth:`notify` only logs an error
          and writes nothing to the locker.
        """
        super(LockerNotifier, self).__init__(results, controls, push_error)
        self.locker = locker

    def notify(self):
        """
        Write notifications into the evidence locker.

        The markdown summary of every accreditation is added to the locker
        as ``notifications/alerts_summary.md``, checked in and pushed.
        Nothing is written when there are no results or when the remote
        locker push failed.
        """
        if not self._results:
            self.logger.error('No results.  Locker notifier not triggered.')
            return
        if self._push_error:
            self.logger.error(
                'Remote locker push failed.  Locker notifier not triggered.'
            )
            return
        self.logger.info('Running the Locker notifier...')
        messages = list(self._messages_by_accreditations().items())
        messages.sort(key=lambda x: x[0])
        now = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')
        # A single title for the whole document; the per accreditation
        # sections are generated without their own title.
        md_content = [f'# CHECK RESULTS: {now}']
        for accreditation, results in messages:
            passed, failed, warned, errored = self._split_by_status(results)
            results_by_status = {
                'pass': passed,
                'fail': failed,
                'warn': warned,
                'error': errored
            }
            md_content.extend(
                self._generate_accred_content(
                    accreditation, results_by_status, skip_title=True
                )
            )
        folder = 'notifications'
        filename = 'alerts_summary.md'
        self.locker.add_content_to_locker(
            '\n'.join(md_content), folder, filename
        )
        self.locker.checkin(
            'Locker notification sent at local time '
            f'{time.ctime(time.time())}\n\n{os.path.join(folder, filename)}'
        )
        self.locker.push()
class GHIssuesNotifier(_BaseMDNotifier):
    """
    Github notifier class.

    Notifications are sent to Github as repository issues.  This
    notifier is configurable via :class:`compliance.config.ComplianceConfig`.
    """

    def __init__(self, results, controls, push_error=False):
        """
        Construct and initialize the Github notifier object.

        :param results: dictionary generated by
          :py:class:`compliance.runners.CheckMode` at the end of the execution.
        :param controls: the control descriptor that manages accreditations.
        :param push_error: whether the push to the remote locker failed.
        """
        super(GHIssuesNotifier, self).__init__(results, controls, push_error)
        self._config = get_config().get('notify.gh_issues')
        if not self._config:
            # Ensure that legacy ghe_issues config still works
            self._config = get_config().get('notify.ghe_issues', {})
        # Using the locker repo url to define the base url.  The expectation
        # is that the Github issues repository will share the base url.
        parsed_locker_url = urlparse(get_config().get('locker.repo_url'))
        self._github = Github(
            get_config().creds,
            f'{parsed_locker_url.scheme}://{parsed_locker_url.hostname}'
        )

    def notify(self):
        """
        Send notifications to Github as repository issues.

        Only accreditations present in the notifier configuration are
        processed.  An accreditation configured with ``summary_issue`` gets
        a single summary issue; otherwise one issue per check is handled,
        unless the remote locker push failed.
        """
        self.logger.info('Running the Github Issues notifier...')
        if not self._config:
            self.logger.warning('Using Github Issues notifier without config')
        messages = list(self._messages_by_accreditations().items())
        messages.sort(key=lambda x: x[0])
        for accreditation, results in messages:
            if accreditation not in self._config:
                continue
            passed, failed, warned, errored = self._split_by_status(results)
            results_by_status = {
                'pass': passed,
                'fail': failed,
                'warn': warned,
                'error': errored
            }
            if self._config[accreditation].get('summary_issue'):
                # The summary issue content reports the push error itself.
                self._notify_by_summary_issue(accreditation, results_by_status)
            elif self._push_error:
                self.logger.error(
                    'Remote locker push failed.  '
                    'Github Issues notifier not triggered.'
                )
            else:
                self._notify_by_check_issues(accreditation, results_by_status)

    def _notify_by_summary_issue(self, accred, results):
        """Create or update the summary issue in each configured repo."""
        issue = [self._generate_summary_issue(accred, results)]
        repos = self._config[accred].get('repo', [])
        for repo in repos:
            owner, repository = repo.split('/')
            issue_urls = self._process_new_alerts(
                owner,
                repository,
                issue,
                self._config[accred]['summary_issue'].get('message')
            )
            self._assign_projects(issue_urls, repo, accred)

    def _generate_summary_issue(self, accred, results):
        """
        Build the summary issue for an accreditation.

        The configured ``frequency`` (day, week, month or year) prefixes the
        title with the current UTC period, adds period labels and selects
        the assignees from the configured ``rotation``, when there is one.

        :returns: dictionary with the ``title``, ``labels``, ``assignees``
          and ``body`` of the issue.
        """
        summary_config = self._config[accred]['summary_issue']
        title = summary_config['title']
        # Work on a copy: extending the configured list in place would make
        # the period labels pile up in the configuration itself.
        labels = list(summary_config.get('labels') or [])
        assignees = summary_config.get('assignees', [])
        frequency = summary_config.get('frequency')
        rotation = summary_config.get('rotation')
        rotation_index = None
        now = datetime.utcnow()
        if frequency == 'day':
            today = now.strftime('%Y-%m-%d')
            title = f'{today} - {title}'
            labels.extend([frequency, today])
            rotation_index = now.timetuple().tm_yday
        elif frequency == 'week':
            year, week, _ = now.isocalendar()
            title = f'{year}, {week}W - {title}'
            labels.extend([frequency, str(year), f'{week}W'])
            rotation_index = week
        elif frequency == 'month':
            year = now.strftime('%Y')
            month = now.strftime('%mM')
            title = f'{year}, {month} - {title}'
            labels.extend([frequency, year, month])
            rotation_index = int(month[:-1])
        elif frequency == 'year':
            year = now.strftime('%Y')
            title = f'{year} - {title}'
            labels.extend([frequency, year])
            rotation_index = int(year)
        if rotation and rotation_index is not None:
            assignees = rotation[rotation_index % len(rotation)]
        issue = {'title': title, 'labels': labels, 'assignees': assignees}
        issue['body'] = '\n'.join(
            self._generate_accred_content(accred, results)
        )
        return issue

    def _notify_by_check_issues(self, accred, results):
        """
        Create or update one issue per check in each configured repo.

        Issues are raised for the configured statuses (``fail`` by default).
        Unless ``pass`` is one of them, already open issues of checks that
        now pass are updated as well.
        """
        statuses = self._config[accred].get('status', ['fail'])
        repos = self._config[accred].get('repo', [])
        issues = []
        for status, result in results.items():
            if status in statuses:
                issues += self._generate_issues(accred, result)
        for repo in repos:
            owner, repository = repo.split('/')
            issue_urls = self._process_new_alerts(owner, repository, issues)
            self._assign_projects(issue_urls, repo, accred)
        if 'pass' in statuses or not repos:
            return
        # The passed check issues are the same for every repository, so
        # they are generated only once.
        passed_issues = self._generate_issues(accred, results['pass'])
        for repo in repos:
            owner, repository = repo.split('/')
            issue_urls = self._process_old_alerts(
                owner, repository, passed_issues
            )
            self._assign_projects(issue_urls, repo, accred)

    def _generate_issues(self, accred, results):
        """
        Build the issue dictionaries for the given check results.

        :param accred: the accreditation name.
        :param results: list of (check path, test descriptor, message).
        :returns: list of dictionaries with ``title``, ``body`` and
          ``labels``.
        """
        issues = []
        if not results:
            return issues
        for check_path, result, message in results:
            # If the 'checks' configuration element exists
            # within an accreditation, only create issues
            # for the set of checks therein.
            if ('checks' in self._config[accred]
                    and check_path not in self._config[accred]['checks']):
                continue
            now = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')
            body = [f'## Compliance check alert - {now}']
            body.append(f'- Check: {check_path}')
            test_obj = result['test'].test
            check_name = check_path.rsplit('.', 1)[-1]
            doc = getattr(test_obj.__class__, check_name).__doc__
            if doc:
                # Only the first line of the docstring is used.
                doc = doc.strip()
                newline = doc.find('\n')
                if newline > -1:
                    doc = doc[:newline]
                body.append(f'- Description: {doc}')
            body.append(f'- Accreditation: {accred}')
            status = ''.join(
                self._get_summary_and_body(
                    result,
                    message,
                    include_title=False,
                    summary_format='{status} ({issues})',
                    link_format='[{name}]({url})'
                )
            )
            body.append(f'- Run Status: **{status}**')
            run_dttm = datetime.fromtimestamp(result['timestamp'])
            body.append(f'- Run Date/Time: {run_dttm}')
            report_links = self._get_report_links(
                result, link_format='[{name}]({url})'
            )
            if report_links:
                body.append(f'- Reports: {", ".join(report_links)}')
            issue = {
                'title': message['title'],
                'body': '\n'.join(body),
                'labels': [
                    f'accreditation: {accred}',
                    f'run status: {result["status"]}'
                ]
            }
            issues.append(issue)
        return issues

    def _process_new_alerts(self, owner, repository, issues, message=None):
        """
        Create an issue, or comment on the open issue with the same title.

        :param message: optional lines placed before the body of newly
          created issues.
        :returns: dictionary of Github issue id to issue url.
        """
        issue_urls = {}
        for issue in issues:
            gh_issue = self._find_gh_issue(
                '/'.join([owner, repository]), issue['title']
            )
            if gh_issue is None:
                body = issue['body']
                if message:
                    joined_msg = '\n'.join(message)
                    body = f'{joined_msg}\n\n{issue["body"]}'
                gh_issue = self._github.add_issue(
                    owner,
                    repository,
                    issue['title'],
                    body,
                    labels=issue['labels'],
                    assignees=issue.get('assignees', [])
                )
            else:
                self._update_issue_labels(
                    owner, repository, gh_issue, issue['labels']
                )
                self._github.add_issue_comment(
                    owner, repository, gh_issue['number'], issue['body']
                )
            issue_urls[gh_issue['id']] = gh_issue['url']
        return issue_urls

    def _process_old_alerts(self, owner, repository, issues):
        """
        Update the open issues matching the given ones; never create any.

        :returns: dictionary of Github issue id to issue url.
        """
        issue_urls = {}
        for issue in issues:
            gh_issue = self._find_gh_issue(
                '/'.join([owner, repository]), issue['title']
            )
            if gh_issue:
                self._update_issue_labels(
                    owner, repository, gh_issue, issue['labels']
                )
                self._github.add_issue_comment(
                    owner, repository, gh_issue['number'], issue['body']
                )
                issue_urls[gh_issue['id']] = gh_issue['url']
        return issue_urls

    def _find_gh_issue(self, repo, title):
        """Return the open issue of the repo with that exact title, if any."""
        gh_issues = self._github.search_issues(
            f'{title} type:issue in:title is:open repo:{repo}'
        )
        # The search may return partial title matches, so the title is
        # compared again here.
        return next(
            (issue for issue in gh_issues if issue['title'] == title), None
        )

    def _update_issue_labels(self, owner, repository, issue, labels):
        """
        Add the missing labels to an issue.

        When there are labels to add, the previous ``run status: `` labels
        are removed so that only the new run status remains.
        """
        current_labels = [label['name'] for label in issue['labels']]
        already_set = set(current_labels)
        # dict.fromkeys drops duplicates while keeping the given order, which
        # makes the resulting label order deterministic.
        new_labels = [
            label for label in dict.fromkeys(labels)
            if label not in already_set
        ]
        if new_labels:
            current_labels = [
                label for label in current_labels
                if not label.startswith('run status: ')
            ]
            self._github.patch_issue(
                owner,
                repository,
                issue['number'],
                labels=current_labels + new_labels
            )

    def _assign_projects(self, issues, repo, accred):
        """
        Add the issues to the configured project columns of the repo.

        :param issues: dictionary of Github issue id to issue url.
        :param repo: the repository, passed to the project lookup.
        :param accred: the accreditation whose ``project`` configuration
          maps project names to column names.
        """
        config_projects = self._config[accred].get('project')
        if not config_projects:
            return
        all_projects = {
            p['name']: p['id']
            for p in self._github.get_all_projects(repo)
        }
        for project, column in config_projects.items():
            if project not in all_projects:
                self.logger.warning(f'Project {project} not found in {repo}')
                continue
            columns = {
                c['name']: c['id']
                for c in self._github.get_columns(all_projects[project])
            }
            if column not in columns:
                self.logger.warning(
                    f'Column {column} not found '
                    f'in {project} project, {repo} repo'
                )
                continue
            # Issues that already have a card in any column of the project
            # are left where they are.
            card_lists = self._github.get_all_cards(columns.values()).values()
            issue_urls = [
                c.get('content_url') for cl in card_lists for c in cl
            ]
            for issue_id, issue_url in issues.items():
                if issue_url in issue_urls:
                    continue
                self._github.add_card(columns[column], issue=issue_id)
[docs]class SlackNotifier(_BaseNotifier):
    """
    Slack notifier class.
    Notifications are sent to Slack channel(s).  This notifier is
    configurable via :class:`compliance.config.ComplianceConfig`.
    """
    MESSAGE_COLORS = {
        'pass': '#00D000',
        'fail': '#D00000',
        'error': '#9932CC',
        'warn': '#FFD300'
    }
    def __init__(self, results, controls, push_error=False):
        """
        Construct and initialize the Slack notifier object.
        :param results: dictionary generated by
          :py:class:`compliance.runners.CheckMode` at the end of the execution.
        :param controls: the control descriptor that manages accreditations.
        """
        super(SlackNotifier, self).__init__(results, controls, push_error)
        self._creds = get_config().creds
        self._config = get_config().get('notify.slack', {})
[docs]    def notify(self):
        """Send notifications to Slack channel(s)."""
        self.logger.info('Running the Slack notifier...')
        if not self._config:
            self.logger.warning('Using Slack notifier without config')
        messages = list(self._messages_by_accreditations().items())
        messages.sort(key=lambda x: x[0])
        for accreditation, desc in messages:
            if accreditation not in self._config:
                continue
            channels = []
            mode = 'normal'
            if isinstance(self._config[accreditation], list):
                channels = self._config[accreditation]
            elif isinstance(self._config[accreditation], dict):
                channels = self._config[accreditation].get('channels', [])
                rotation = self._config[accreditation].get('rotation')
                mode = self._config[accreditation].get('mode', 'normal')
                if rotation and isinstance(rotation, list):
                    iso_week = datetime.utcnow().isocalendar()[1]
                    on_duty = rotation[divmod(iso_week, len(rotation))[1]]
                    if isinstance(on_duty, dict):
                        channels.append(on_duty.get('id'))
                    else:
                        channels.append(on_duty)
            msg = self._generate_message(accreditation, desc, mode=mode)
            self._send_message(msg, channels) 
    def _generate_message(self, accreditation, test_descs, mode='normal'):
        if not test_descs:
            return {}
        text = (
            f'Notification for {accreditation.upper()} accreditation '
            f'at {datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")}'
        )
        message = {
            'username': 'Compliance Alerts',
            'icon_emoji': ':robot_face:',
            'text': text
        }
        modes = {
            'normal': self._generate_normal_attachments,
            'compact': self._generate_compact_attachments
        }
        if mode in modes.keys():
            if self._push_error:
                message['attachments'] = self._generate_push_error_attachment()
            else:
                message['attachments'] = modes[mode](test_descs)
        else:
            raise ValueError(f'Unknown Slack message mode: {mode}')
        return message
    def _generate_push_error_attachment(self):
        return [
            {
                'title': 'ALL checks',
                'text': (
                    'Evidence/Results failed to push to remote locker.  '
                    'See execution log for details.'
                ),
                'mrkdwn_in': ['text', 'pretext'],
                'color': SlackNotifier.MESSAGE_COLORS['error']
            }
        ]
    def _generate_normal_attachments(self, test_descs):
        retval = []
        passed, failed, warned, errored = self._split_by_status(test_descs)
        # first list each error, failure, and warning
        for _, test_desc, msg in (errored + failed + warned):
            text = ''.join(
                self._get_summary_and_body(
                    test_desc, msg, include_title=False
                )
            )
            attachment = {
                'title': msg['title'],
                'text': text,
                'mrkdwn_in': ['text', 'pretext'],
                'color': SlackNotifier.MESSAGE_COLORS[test_desc['status']]
            }
            retval.append(attachment)
        # then have a list of passed checks
        passed_titles = self._get_check_names(passed)
        if not passed_titles:
            passed_titles.append('(none)')
        retval.append(
            {
                'title': 'PASSED checks',
                'text': ', '.join(passed_titles),
                'mrkdwn_in': ['text', 'pretext'],
                'color': SlackNotifier.MESSAGE_COLORS['pass']
            }
        )
        return retval
    def _generate_compact_attachments(self, test_descs):
        """
        Build the condensed attachments for a Slack message.

        At most one attachment is produced per status, in this order:
        passed (count only), warned, failed (one bullet line per check) and
        errored (comma separated check names).  Statuses without checks are
        left out.

        :param test_descs: the test descriptors handed to
          ``_split_by_status``.

        :returns: the list of attachment dictionaries.
        """
        passed, failed, warned, errored = self._split_by_status(test_descs)
        attachments = []
        # passed tests: only the count is reported
        if passed:
            attachments.append(
                {
                    'title': f'PASS: {len(passed)} checks',
                    'text': '',
                    'mrkdwn_in': ['text', 'pretext'],
                    'color': SlackNotifier.MESSAGE_COLORS['pass']
                }
            )
        # warned and failed tests: one bullet line per check
        summary_format = '{title} - {reports} {runbook} - ({issues})'
        for status, checks in (('warn', warned), ('fail', failed)):
            lines = []
            for _, test_desc, msg in checks:
                summary, body = self._get_summary_and_body(
                    test_desc,
                    msg,
                    include_title=False,
                    summary_format=summary_format
                )
                line = '* ' + summary
                if body:
                    # Flatten the multi-line body onto the bullet line.
                    line += ' - ' + '; '.join(
                        body.strip().replace('Failures:', '').split('\n')
                    )
                lines.append(line)
            if lines:
                attachments.append(
                    {
                        'title': f'{status.upper()}: {len(checks)} checks',
                        'text': '\n'.join(lines).strip(),
                        'mrkdwn_in': ['text', 'pretext'],
                        'color': SlackNotifier.MESSAGE_COLORS[status]
                    }
                )
        # errored tests: unique check names only
        if errored:
            # Sort the de-duplicated names: iterating a bare set of strings
            # yields a different order from one run to the next.
            error_names = sorted(set(self._get_check_names(errored)))
            attachments.append(
                {
                    'title': f'ERRORS: {len(errored)} checks',
                    'text': ', '.join(error_names),
                    'mrkdwn_in': ['text', 'pretext'],
                    'color': SlackNotifier.MESSAGE_COLORS['error']
                }
            )
        return attachments
    def _send_message(self, message, channels):
        """
        Post a Slack message to each of the given channels.

        The target is the first webhook found in the ``cloobot`` or
        ``slack`` credentials; when neither provides one, the message goes
        to the ``chat.postMessage`` API authenticated with the token found
        in those same credentials.  A request answered with HTTP 429 is
        retried after sleeping for the ``Retry-After`` header value (or the
        attempt number when the header is absent) plus one second, up to
        the ``retries`` configuration value (3 by default).  If every
        attempt is rate limited an error is logged and the next channel is
        processed.

        :param message: the message payload; it is deep-copied so the
          caller's dictionary is left untouched.
        :param channels: iterable of channels to post the message to.

        :raises RuntimeError: when neither a webhook nor a token can be
          found in the credentials.
        :raises requests.HTTPError: when the response carries an error
          status other than 429.
        """
        msg = copy.deepcopy(message)
        for channel in channels:
            msg['channel'] = channel
            headers = {}
            url = (
                getattr(self._creds['cloobot'], 'webhook', None)
                or getattr(self._creds['slack'], 'webhook', None)
            )
            if not url:
                token = (
                    getattr(self._creds['cloobot'], 'token', None)
                    or getattr(self._creds['slack'], 'token', None)
                )
                if token is None:
                    raise RuntimeError(
                        'Unable to get a Slack webhook or token from '
                        'credentials file'
                    )
                url = 'https://slack.com/api/chat.postMessage'
                headers['Authorization'] = 'Bearer ' + token
                headers['Content-type'] = 'application/json'
            retries = self._config.get('retries', 3)
            # The payload is the same for every attempt: serialize it once.
            payload = json.dumps(msg)
            retry = 0
            while retry < retries:
                response = requests.post(url, headers=headers, data=payload)
                if response.status_code != 429:
                    response.raise_for_status()
                    break
                # Rate limited: wait as instructed before trying again.
                time.sleep(
                    int(response.headers.get('Retry-After', retry)) + 1
                )
                retry += 1
            else:
                # Reached only when no attempt got past the rate limit;
                # report it instead of dropping the message silently.
                self.logger.error(
                    f'Slack message to {channel} was not delivered: '
                    f'rate limited on all {retries} attempts'
                )
class FindingsNotifier(_BaseNotifier):
    """
    Findings notifier class.

    Notifications are sent using the Findings API.  This notifier is
    configurable via :class:`compliance.config.ComplianceConfig`.
    """

    def __init__(self, results, controls, push_error=False):
        """
        Construct and initialize the Findings notifier object.

        :param results: dictionary generated by
          :py:class:`compliance.runners.CheckMode` at the end of the execution.
        :param controls: the control descriptor that manages accreditations.
        :param push_error: when true, :meth:`notify` only logs an error and
          sends nothing.
        """
        super().__init__(results, controls, push_error)
        self._config = get_config().get('notify.findings')
        self._creds = get_config().creds
        api_key = self._creds['findings'].api_key
        authenticator = IAMAuthenticator(apikey=api_key)
        self.findings_api = FindingsApiV1(authenticator=authenticator)

    def notify(self):
        """
        Send notifications to the Findings API.

        Accreditations are processed in sorted order; those without an
        entry in the ``notify.findings`` configuration are skipped.  For the
        others the configured value is used as the Findings API service URL
        and the body of every message is submitted as findings.
        """
        if self._push_error:
            self.logger.error(
                'Remote locker push failed.  Findings notifier not triggered.'
            )
            return
        self.logger.info('Running the Findings notifier...')
        if not self._config:
            self.logger.warning('Using findings notification without config')
            # No accreditation can match a missing/empty configuration, and
            # a membership test against ``None`` would raise TypeError.
            return
        messages = sorted(
            self._messages_by_accreditations().items(),
            key=lambda item: item[0]
        )
        for accreditation, desc in messages:
            if accreditation not in self._config:
                continue
            findings_api_endpoint = self._config[accreditation]
            self.findings_api.set_service_url(findings_api_endpoint)
            passed, failed, warned, errored = self._split_by_status(desc)
            for _, _, msg in (failed + errored + passed + warned):
                self._create_findings(msg['body'])

    def _create_findings(self, data):
        """
        Create one Findings API occurrence per entry of the given data.

        Failures are logged and do not stop the remaining occurrences from
        being submitted.

        :param data: mapping providing ``occurrence_list``, ``account_id``
          and ``provider_id``; each occurrence is passed as keyword
          arguments to ``create_occurrence``.

        :returns: ``0`` when every occurrence was created; otherwise the
          status of the last failure, i.e. the ``ApiException`` code or
          ``-1`` for any other exception.
        """
        occurrence_list = data['occurrence_list']
        account_id = data['account_id']
        provider_id = data['provider_id']
        status = 0
        for occurrence in occurrence_list:
            try:
                response = self.findings_api.create_occurrence(
                    account_id=account_id,
                    provider_id=provider_id,
                    **occurrence
                )
                self.logger.info(response.status_code)
            except ApiException as e:
                status = e.code
                # ``.get`` keeps the error path itself from raising when an
                # occurrence carries no id.
                self.logger.error(
                    'Finding creation failed '
                    f'for occurrence id {occurrence.get("id")} '
                    f'with {str(e.code)}: {str(e)}'
                )
            except Exception as e:
                # Deliberate best effort: log and move to the next one.
                status = -1
                self.logger.error(f'Unexpected error occurred: {str(e)}')
        return status
def get_notifiers():
    """
    Provide a dictionary of all notifier class objects.

    This dictionary contains all valid notifier choices for the ``--notify``
    option in the CLI as keys and their corresponding notifier classes as
    values.

    NOTE: When adding/removing a notifier, update this dictionary accordingly.

    :returns: a new dictionary mapping each notifier name to its class.
    """
    return {
        'stdout': FDNotifier,
        'slack': SlackNotifier,
        'pagerduty': PagerDutyNotifier,
        'gh_issues': GHIssuesNotifier,
        'locker': LockerNotifier,
        'findings': FindingsNotifier
    }