#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: prereceivecli.py
#
# Copyright 2019 Costas Tyfoxylos
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
#
"""
Main code for prereceivecli.
.. _Google Python Style Guide:
http://google.github.io/styleguide/pyguide.html
"""
import argparse
import json
import logging
import logging.config
import os
from dataclasses import dataclass
from typing import Optional

import boto3
import botocore

from .configuration import HASHES_SCHEMA
from .lib import (get_project,
                  get_table_for_project_group,
                  send_slack_message,
                  HashChecker,
                  SecurityEntry,
                  parse_hook_input)
@dataclass
class AwsCredentials:
    """Stores AWS Credentials.

    Attributes:
        access_key_id: The AWS access key id.
        secret_access_key: The AWS secret access key.
        session_token: The AWS session token, or None when no session token is available.

    """

    access_key_id: str
    secret_access_key: str
    # get_credentials passes None here when a plain key/secret pair is supplied on the command line,
    # so the field has to accept None and defaults to it.
    session_token: Optional[str] = None
# Module metadata.
__author__ = 'Costas Tyfoxylos <ctyfoxylos@schubergphilis.com>'
__docformat__ = 'google'
__date__ = '26-02-2019'
__copyright__ = 'Copyright 2019, Costas Tyfoxylos'
__credits__ = ['Costas Tyfoxylos', 'Alberto Rodriguez Garcia']
__license__ = 'MIT'
__maintainer__ = 'Costas Tyfoxylos'
__email__ = '<ctyfoxylos@schubergphilis.com>'
__status__ = 'Development'  # One of "Prototype", "Development", "Production".

# Name of the logger used by this module.
LOGGER_BASENAME = 'prereceivecli'
LOGGER = logging.getLogger(LOGGER_BASENAME)
# A NullHandler is attached up front; setup_logging adds a stream handler when no config file is given.
LOGGER.addHandler(logging.NullHandler())
# Names of the loggers that setup_logging disables.
LOGGERS_TO_DISABLE = ['botocore.credentials']
def get_arguments(argv=None):
    """Gets us the cli arguments.

    Args:
        argv (list): Optional list of argument strings to parse. When None, argparse falls back
            to the arguments of the running process (``sys.argv[1:]``).

    Returns:
        The args as parsed from the argsparser.

    Raises:
        SystemExit: Raised by argparse when the arguments are invalid, including when --key is given
            without --secret or --arn is given without --token.

    """
    # https://docs.python.org/3/library/argparse.html
    parser = argparse.ArgumentParser(description=('A cli that implements a git server side pre-receive hook that gets '
                                                  'driven from dynamodb and reports to slack offending pushes.'))
    # Exactly one of --key / --arn has to be provided.
    excluding_key_arn = parser.add_mutually_exclusive_group(required=True)
    excluding_secret_token = parser.add_mutually_exclusive_group()
    excluding_key_token = parser.add_mutually_exclusive_group()
    excluding_secret_arn = parser.add_mutually_exclusive_group()
    key = excluding_key_arn.add_argument('--key', '-k',
                                         dest='access_key',
                                         action='store',
                                         help='The aws access key',
                                         type=str,
                                         required=False)
    arn = excluding_key_arn.add_argument('--arn', '-arn',
                                         dest='aws_role_arn',
                                         action='store',
                                         help='The aws role arn, defaults to environment variable',
                                         type=str,
                                         required=False)
    token = excluding_secret_token.add_argument('--token', '-token',
                                                dest='aws_web_identity_token_file',
                                                action='store',
                                                help='The aws web identity token file, defaults to environment variable',
                                                type=argparse.FileType('r'),
                                                required=False)
    secret = excluding_secret_token.add_argument('--secret', '-s',
                                                 dest='secret_key',
                                                 action='store',
                                                 help='The aws secret key',
                                                 type=str,
                                                 required=False)
    # The already created actions are registered on two further groups so that --key also excludes
    # --token and --arn also excludes --secret.
    # See: https://bugs.python.org/issue10984#msg219660
    excluding_key_token._group_actions.extend([key, token])  # pylint: disable=protected-access
    excluding_secret_arn._group_actions.extend([arn, secret])  # pylint: disable=protected-access
    parser.add_argument('--log-config',
                        '-l',
                        action='store',
                        dest='logger_config',
                        help='The location of the logging config json file',
                        default='')
    parser.add_argument('--log-level',
                        '-L',
                        help='Provide the log level. Defaults to info.',
                        dest='log_level',
                        action='store',
                        default='info',
                        choices=['debug',
                                 'info',
                                 'warning',
                                 'error',
                                 'critical'])
    parser.add_argument('--slack-web-hook', '-w',
                        dest='web_hook',
                        action='store',
                        help='The slack web_hook to post messages to',
                        type=str,
                        required=True)
    parser.add_argument('--region', '-r',
                        dest='region',
                        action='store',
                        help='The aws region to use',
                        type=str,
                        required=True)
    feature_parser = parser.add_mutually_exclusive_group(required=False)
    feature_parser.add_argument('--aggressive-check', '-a',
                                dest='aggressive',
                                action='store_true',
                                help=('Flag noting whether the project should be rejected if no entry in the security '
                                      'table'))
    feature_parser.add_argument('--no-aggressive-check', '-n',
                                dest='aggressive',
                                action='store_false',
                                help=('Flag noting whether the project should be rejected if no entry in the security '
                                      'table'))
    parser.set_defaults(aggressive=False)
    args = parser.parse_args(argv)
    # The groups above only express exclusions, the pairing of the options is enforced here.
    # parser.error() exits the process, so at most one of the messages is ever reported.
    if args.access_key and args.secret_key is None:
        parser.error("--key requires --secret.")
    if args.aws_role_arn and args.aws_web_identity_token_file is None:
        parser.error("--arn requires --token.")
    return args
def setup_logging(level, config_file=None):
    """Sets up the logging.

    Args:
        level: The level to log for.
        config_file: The config file with the logging configuration. If provided it supersedes the level arg.

    Returns:
        None

    """
    # This will configure the logging, if the user has set a config file.
    # If there's no config file, a stream handler is attached to the module logger.
    if config_file:
        # A missing or malformed file is not handled here, the error propagates to the caller.
        # The context manager makes sure the file handle is closed even if parsing fails.
        with open(config_file) as config_handle:
            configuration = json.load(config_handle)
        # Configure the logger
        logging.config.dictConfig(configuration)
    else:
        handler = logging.StreamHandler()
        handler.setLevel(level.upper())
        formatter = logging.Formatter(('%(asctime)s - '
                                       '%(name)s - '
                                       '%(levelname)s - '
                                       '%(message)s'))
        handler.setFormatter(formatter)
        LOGGER.addHandler(handler)
        LOGGER.setLevel(level.upper())
    # The listed loggers are disabled regardless of how logging got configured.
    for logger in LOGGERS_TO_DISABLE:
        logging.getLogger(logger).disabled = True
def get_credentials(args):
    """Gets AWS credentials.

    When both an access key and a secret key are provided they are returned as is, otherwise the
    role provided is assumed through the web identity token file.

    Args:
        args: The parsed cli arguments, exposing access_key, secret_key, aws_role_arn and
            aws_web_identity_token_file.

    Returns:
        AwsCredentials: The AWS credentials to set for our environment.

    Raises:
        botocore.exceptions.ClientError: If assuming the role with the web identity token fails.

    """
    if all([args.access_key, args.secret_key]):
        # A plain key pair has no session token.
        return AwsCredentials(args.access_key,
                              args.secret_key,
                              None)
    client = boto3.client('sts',
                          aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
                          aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'),
                          aws_session_token=os.environ.get('AWS_SESSION_TOKEN'))
    # Only the name of the file object created by argparse is used, the token is read from a fresh handle.
    with open(args.aws_web_identity_token_file.name, 'r') as opened_file:
        token = opened_file.read()
    try:
        response = client.assume_role_with_web_identity(RoleArn=args.aws_role_arn,
                                                        RoleSessionName='prereceive',
                                                        WebIdentityToken=token)
    except botocore.exceptions.ClientError:
        LOGGER.exception('client')
        # Without a response there are no credentials to build, so the original error is propagated
        # instead of failing later on an unbound variable.
        raise
    credentials = response['Credentials']
    return AwsCredentials(credentials['AccessKeyId'],
                          credentials['SecretAccessKey'],
                          credentials['SessionToken'])
def validate_commit(project, dynamodb_table, web_hook, aggressive_checking):
    """Validates that no unauthorized change has been performed on protected files on a specified commit.

    Args:
        project (Project): An object exposing attributes of the required variables.
        dynamodb_table (Table): The dynamodb table with the entries for the projects.
        web_hook (str): The url of the slack webhook.
        aggressive_checking (bool): If set any unmatched repositories will be rejected.

    Returns:
        success (bool): True if the commit is valid False otherwise.

    """
    entries = dynamodb_table.get_item(Key={'slug': project.slug}).get('Item', {})
    if not entries:
        if aggressive_checking:
            message = (f'No dynamodb entries found for project "{project.slug}", '
                       f'project rejected due to aggressive checking')
            LOGGER.info(message)
            # This print is required to provide feedback to the user through the git hook via stdout
            print(message)
            return False
        LOGGER.info('No dynamodb entries found for project "%s", project not secured', project.slug)
        return True
    validated = HASHES_SCHEMA.validate(entries)
    protected_items = [SecurityEntry(item.get('hashes', []),
                                     item.get('name', ''),
                                     item.get('type', '')) for item in validated.get('protected_items', [])]
    if not any(item.hashes for item in protected_items):
        LOGGER.info('No hashes found for project "%s" for any type, project not secured', project.slug)
        return True
    errors = HashChecker().verify(project, protected_items)
    for error_message in errors:
        # Reporting is best effort. A failing notification must not escape, otherwise the rejection
        # computed above would be lost and the caller would treat the failure as an unexpected error.
        try:
            send_slack_message(web_hook, error_message)
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception('Unable to report the violation for project "%s" to slack', project.slug)
    return not errors
def main():
    """Main method.

    Entry point of the command line script. The process exits with status 1 only when the commit
    got validated and was found invalid, every other outcome, unexpected errors included, exits
    with status 0.
    """
    try:
        arguments = get_arguments()
        setup_logging(arguments.log_level, arguments.logger_config)
        # the base and commit as provided by the calling pre-receive hook
        base, commit = parse_hook_input()
        project = get_project(base, commit)
        if not project.group:
            LOGGER.info('Action seems as automatic merge for project "%s" from user "%s" and not a pre-receive hook, '
                        'letting through', project.slug, project.username)
            raise SystemExit(0)
        if not project.commit:
            LOGGER.info('It seems only tags were pushed for project "%s" by username "%s", letting through',
                        project.slug, project.username)
            raise SystemExit(0)
        # An incoming commit (newrev) made up of zeros only, e.g.
        # 0000000000000000000000000000000000000000 when a branch is deleted, cannot be checked in any
        # meaningful way.
        is_null_revision = commit.isdecimal() and int(commit) == 0
        if is_null_revision:
            LOGGER.info('Disregarding push with newrev="%s" for project "%s" by username "%s", '
                        'letting through', commit, project.slug, project.username)
            raise SystemExit(0)
        os.environ['AWS_DEFAULT_REGION'] = arguments.region
        credentials = get_credentials(arguments)
        table = get_table_for_project_group(project.group, credentials)
        if not table:
            LOGGER.info('Project "%s" does not appear to have security settings set, letting through...', project.slug)
            raise SystemExit(0)
        is_valid = validate_commit(project, table, arguments.web_hook, arguments.aggressive)
    except Exception:
        # SystemExit is not an Exception subclass, so the exits raised above are not intercepted here.
        LOGGER.exception('Some unexpected error occurred letting things through!')
        raise SystemExit(0)
    else:
        exit_code = 0 if is_valid else 1
        raise SystemExit(exit_code)


if __name__ == '__main__':
    main()