# -*- mode:python; coding:utf-8 -*-
# Copyright (c) 2020 IBM Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Compliance Github service helper."""
import json
import random
from collections import OrderedDict
from urllib.parse import parse_qs, urlparse
from compliance.utils.credentials import Config
from compliance.utils.data_parse import deep_merge
from compliance.utils.http import BaseSession
[docs]class Github(object):
"""Github service helper class."""
def __init__(self, config=None, base_url='https://github.com'):
"""Construct the Github service object."""
if not config:
config = Config()
self.base_url = base_url
api_url = 'https://api.github.com'
service = 'github'
if self.base_url != 'https://github.com':
service = 'github_enterprise'
api_url = f'{self.base_url}/api/v3/'
self._creds = config[service]
self.session = BaseSession(api_url)
token = self._creds.token
if hasattr(self._creds, 'username'):
self.session.auth = (self._creds.username, token)
else:
self.session.headers['Authorization'] = 'token ' + token
self.session.headers.update(
{'Accept': 'application/vnd.github.inertia-preview+json'}
)
[docs] def get_all_projects(self, repo_path):
"""
Retrieve all GH repo projects.
repo_path looks like: my-gh-org/my-gh-repo
"""
owner, repo = repo_path.split('/')
# /repos/:owner/:repo/projects
return self._make_request(
'get', '/'.join(['repos', owner, repo, 'projects'])
)
[docs] def get_project(self, project, org=False):
"""
Retrieve the GH org or org/repo project.
For a repo project the project variable looks like:
my-gh-org/my-gh-repo/projects/1
For an org project the project variable looks like:
my-gh-org/projects/1
"""
pieces = []
if org:
owner, _, number = project.split('/')
# /orgs/:org/projects
pieces = ['orgs', owner]
else:
owner, repo, _, number = project.split('/')
# /repos/:owner/:repo/projects
pieces = ['repos', owner, repo]
pieces.append('projects')
r = self._make_request('get', '/'.join(pieces))
return [x['id'] for x in r if x['number'] == int(number)][0]
[docs] def get_columns(self, project_id):
"""Retrieve the columns for a project."""
return self._make_request(
'get', '/'.join(['projects', str(project_id), 'columns'])
)
[docs] def get_all_cards(self, columns):
"""Retrieve all cards for a given list of project columns."""
cards = OrderedDict()
for column_id in columns:
cards[column_id] = self.get_cards(column_id)
return cards
[docs] def get_cards(self, column_id):
"""Retrieve all cards for a given project column."""
return self._paginate_api(
'/'.join(['projects', 'columns', str(column_id), 'cards'])
)
[docs] def move_card(self, card, to_column_id):
"""Move a card from one project column to another."""
data = {'position': 'bottom', 'column_id': to_column_id}
return self._make_request(
'post',
'/'.join(['projects', 'columns', 'cards', str(card), 'moves']),
json=data
)
[docs] def add_card(self, column_id, message=None, issue=0):
"""Create a card in a project column."""
data = {}
if issue > 0:
data = {'content_id': issue, 'content_type': 'Issue'}
else:
data = {'note': message}
return self._make_request(
'post',
'/'.join(['projects', 'columns', str(column_id), 'cards']),
json=data
)
[docs] def add_milestone(self, owner, repo, milestone):
"""Create a repository milestone."""
return self._make_request(
'post',
'/'.join(['repos', owner, repo, 'milestones']),
json=milestone
)
[docs] def list_milestones(
self, owner, repo, state='open', sort='due_on', direction='asc'
):
"""Retrieve a repository's milestones."""
return self._paginate_api(
'/'.join(['repos', owner, repo, 'milestones']),
**{
'state': state, 'sort': sort, 'direction': direction
}
)
[docs] def add_issue(
self, owner, repo, title, body='', annotation=None, **kwargs
):
"""Create a repository issue."""
issue = {'title': title, 'body': body}
issue.update(kwargs)
if annotation:
issue['body'] = self._annotate_body(issue['body'], annotation)
return self._make_request(
'POST', '/'.join(['repos', owner, repo, 'issues']), json=issue
)
[docs] def patch_issue(self, owner, repo, issue, annotation=None, **params):
"""Edit a repository issue."""
if annotation and 'body' in params:
params['body'] = self._annotate_body(params['body'], annotation)
return self._make_request(
'PATCH',
'/'.join(['repos', owner, repo, 'issues', str(issue)]),
json=params
)
[docs] def get_issue(self, owner, repo, issue, parse_annotations=False):
"""
Retrieve the content and metadata for a repository issue.
If parse_annotations is True, then returns (issue, body, annotations),
where body is the body with the JSON annotations removed, and
annotations is a dictionary of the annotations. The annotations will
be an empty dictionary if there aren't any.
"""
issue = self._make_request(
'get', '/'.join(['repos', owner, repo, 'issues', str(issue)])
)
if parse_annotations:
body, annotations = extract_annotations(issue['body'])
return issue, body, annotations
return issue
[docs] def update_annotations(self, owner, repo, issue, annotations):
"""
Update the body of an existing issue, only changing the annotations.
If there are no existing annotations, the annotation block will be
added. If there is existing annotations, the given annotations will
be merged into them.
"""
_, body, old_anno = self.get_issue(
owner, repo, issue, parse_annotations=True
)
new_anno = deep_merge(old_anno, annotations)
return self.patch_issue(
owner, repo, issue, annotation=new_anno, body=body
)
[docs] def get_issues_page(self, owner, repo, **kwargs):
"""Retrieve a repository's issues by page."""
params = kwargs
# get the page number or default to 1
params['page'] = params.get('page', 1)
response = self._make_request(
'get',
'/'.join(['repos', owner, repo, 'issues']),
parse=False,
params=params
)
return response
[docs] def get_all_issues(self, owner, repo, **kwargs):
"""Retrieve all issues for a repository."""
all_issues = {}
page = 1
response = self.get_issues_page(owner, repo, page=page, **kwargs)
max_page = 1
if 'Link' in response.headers:
# Link is only present if there are multiple pages
link = response.headers['Link']
urls = link.replace('>', '').replace('<', '').split()
parsed_url = urlparse(urls[2].strip(';'))
max_page = int(parse_qs(parsed_url.query)['page'][0])
while response:
for i in response.json():
all_issues[i['number']] = i
page += 1
if page > max_page:
response = False
else:
response = self.get_issues_page(
owner, repo, page=page, **kwargs
)
return all_issues
[docs] def search_issues(
self, query, sort=None, order=None, owner=None, repo=None
):
"""
Perform a search against all issues based on the query provided.
If an owner and repo are passed in, then restrict the results to
that repo. Note that this can also be done in the query directly.
"""
if not query:
raise ValueError('Must specify a query')
if owner and repo:
query += f' repo:{owner}/{repo}'
return self._paginate_api(
'search/issues', q=query, sort=sort, order=order
)
[docs] def create_project(self, repo, name, body='', org=False):
"""Create a repository project."""
owner, repo = repo.split('/')
return self.creates_for_project(
'/'.join(['repos', owner, repo, 'projects']), {
'name': name, 'body': body
}
)
[docs] def create_column(self, project_id, column, org=False):
"""Create a project column."""
return self.creates_for_project(
'/'.join(['projects', str(project_id), 'columns']),
{'name': column}
)
[docs] def creates_for_project(self, url, data, org=False):
"""Create a repository project based on a properly formed url."""
if org:
raise NotImplementedError('orgs not supported yet')
return self._make_request('post', url, json=data)
[docs] def rand_color(self):
"""Generate a random color for labels."""
return (
f'{random.randint(0, 255):02X}'
f'{random.randint(0, 255):02X}'
f'{random.randint(0, 255):02X}'
)
[docs] def create_label(self, repo, name, org=False):
"""Create a label within a repository."""
return self.creates_for_project(
'/'.join(['repos', repo, 'labels']), {
'name': name, 'color': self.rand_color()
}
)
[docs] def apply_labels(self, repo, issue, *labels):
"""
Add label(s) to an issue.
repo looks like: my-gh-org/my-gh-repo
issue is an issue number (not id)
API takes a json list of labels
POST /repos/:owner/:repo/issues/:number/labels
"""
response = self._make_request(
'post',
'/'.join(['repos', repo, 'issues', str(issue), 'labels']),
json={'labels': labels}
)
return response
[docs] def remove_labels(self, repo, issue, *labels):
"""
Remove label(s) from an issue.
repo looks like: my-gh-org/my-gh-repo
issue is an issue number (not id)
API takes a json list of labels
POST /repos/:owner/:repo/issues/:number/labels
"""
for line in labels:
response = self._make_request(
'delete',
'/'.join(
['repos', repo, 'issues', str(issue), 'labels', line]
)
)
# Each response has all the labels, so only return the last one
return response
[docs] def get_repo_details(self, repo):
"""
Retrieve a repository's metadata.
:param repo: the organization/repository as a string.
:returns: the repository's metadata details.
"""
self.session.headers.update(
{'Accept': 'application/vnd.github.v3+json'}
)
return self._make_request('get', f'repos/{repo}')
[docs] def get_commit_details(self, repo, since, branch='master', path=None):
"""
Retrieve a repository branch's commit details since a given date/time.
:param repo: the organization/repository as a string.
:param since: the starting date/time as a datetime.
:param branch: the branch as a string. Defaults to master.
:param path: if provided, only commits for the path will be returned.
:returns: the repo branch's commit details since a given date/time.
"""
self.session.headers.update(
{'Accept': 'application/vnd.github.v3+json'}
)
opts = {'since': since.strftime('%Y-%m-%dT%H:%M:%SZ'), 'sha': branch}
if path:
opts['path'] = path
return self._make_request('get', f'repos/{repo}/commits', params=opts)
[docs] def get_branch_protection_details(self, repo, branch='master'):
"""
Retrieve a repository branch's branch protection details.
:param repo: the organization/repository as a string.
:param branch: the branch as a string.
:returns: the repository branch's branch protection details.
"""
self.session.headers.update(
{'Accept': 'application/vnd.github.zzzax-preview+json'}
)
return self._make_request(
'get', f'repos/{repo}/branches/{branch}/protection'
)
[docs] def make_request(self, method, url, parse=True, **kwargs):
"""
Perform a REST call to the Github API.
:param method: HTTP request method
:param url: The URL to make the request to
:param parse: Return the JSON response content, defaults to True. If
False then the entire response is returned
:param kwargs: Additional arguments added directly to the request call
:returns: response content from the request made
"""
return self._make_request(method, url, parse, **kwargs)
[docs] def paginate_api(self, api_url, **kwargs):
"""
Perform GET calls handling pagination.
:param api_url: The URL to make the GET request to
:param kwargs: Additional arguments added directly to the request call
:returns: Combined paginated JSON content
"""
return self._paginate_api(api_url, **kwargs)
def _make_request(self, method, url, parse=True, **kwargs):
r = self.session.request(method, url, **kwargs)
r.raise_for_status()
if parse:
return r.json()
return r
def _annotate_body(self, body, annotation):
anno_str = json.dumps(annotation, indent=2)
return f'```application/json+utilitarian\n{anno_str}\n```\n{body}'
def _paginate_api(self, api_url, **kwargs):
params = kwargs
params['page'] = params.get('page', 1)
response = self._make_request(
'get', api_url, parse=False, params=params
)
max_page = 1
all_items = []
if 'Link' in response.headers:
# Link is only present if there are multiple pages
link = response.headers['Link']
urls = link.replace('>', '').replace('<', '').split()
parsed_url = urlparse(urls[2].strip(';'))
max_page = int(parse_qs(parsed_url.query)['page'][0])
while response:
if api_url.startswith('search/'):
all_items.extend(response.json()['items'])
else:
all_items.extend(response.json())
params['page'] += 1
if params['page'] > max_page:
response = False
else:
response = self._make_request(
'get', api_url, parse=False, params=params
)
return all_items