Source code for compliance.fetch

# -*- mode:python; coding:utf-8 -*-
# Copyright (c) 2020 IBM Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Compliance fetcher automation module."""

import os
import shutil
import tempfile
import unittest

from compliance.config import get_config
from compliance.utils.http import BaseSession

import requests


[docs]class ComplianceFetcher(unittest.TestCase): """Compliance fetcher automation TestCase class."""
[docs] @classmethod def tearDownClass(cls): """Perform clean up.""" if hasattr(cls, '_session'): cls._session.close()
[docs] @classmethod def session(cls, url=None, creds=None, **headers): """ Provide a requests session object with User-Agent header. :param url: optional base URL for the session requests to use. A url argument triggers a new session object to be created whereas no url argument will return the current session object if one exists. :param creds: optional authentication credentials. :param headers: optional kwargs to add to session headers. :returns: a requests Session object. """ if url is not None and hasattr(cls, '_session'): cls._session.close() delattr(cls, '_session') if not hasattr(cls, '_session'): if url: cls._session = BaseSession(url) else: cls._session = requests.Session() if creds: cls._session.auth = creds cls._session.headers.update(headers) org = cls.config.raw_config.get('org', {}).get('name', '') ua = f'{org.lower().replace(" ", "-")}-compliance-checks' cls._session.headers.update({'User-Agent': ua}) return cls._session
def __init__(self, *args, **kwargs): """Construct and initialize the fetcher test object.""" super(ComplianceFetcher, self).__init__(*args, **kwargs) if hasattr(ComplianceFetcher, 'config'): self.config = ComplianceFetcher.config else: self.config = get_config()
[docs] def fetchURL(self, url, params=None, creds=None): # noqa: N802 """ Retrieve remote content through an HTTP GET request. Helper/Convenience method. :param url: the URL of the file. :param params: optional parameters to include in the GET request. :param creds: optional tuple with (user, password) to be used. :returns: the remote content. """ org = self.config.raw_config.get('org', {}).get('name', '') ua = f'{org.lower().replace(" ", "-")}-compliance-checks' response = requests.get( url, params=params, auth=creds, headers={'User-Agent': ua} ) response.raise_for_status() return response.content
[docs] def fetchCloudantDoc(self, db_url, params=None): # noqa: N802 """ Retrieve a Cloudant document. Helper/Convenience method. :param db_url: the URL to the Cloudant doc to retrieve. :param params: optional parameters to include in the GET request. :returns: the Cloudant document content. """ creds = self.config.creds return self.fetchURL( db_url, params=params, creds=(creds['cloudant'].user, creds['cloudant'].password) )
[docs]def fetch(url, name): """ Write content retrieved from provided url to a file. :param url: the URL to GET content from. :param name: the name of the file to write to in the TMPDIR. :returns: the path to the file. """ r = requests.get(url) r.raise_for_status() path = os.path.join(tempfile.gettempdir(), name) with open(path, 'wb') as f: r.raw.decode_content = True shutil.copyfileobj(r.raw, f) return path