Source code for khp.utils

"""
Utils module, contains utility functions used throughout the codebase.
"""

import json
import logging
import os
import re
import shutil
from datetime import datetime, timedelta

import boto3
import dateutil.parser
import pandas as pd
import pytz
import yaml

LOG = logging.getLogger(__name__)

def chunker(seq, chunk_size):
    """Break a list into a sequence of smaller lists of length chunk_size.

    Args:
        seq (list): list to split up into chunks
        chunk_size (int): size of chunks

    Returns:
        generator: generator of lists of length chunk_size (the last
        chunk may be shorter)
    """
    return (seq[pos:pos + chunk_size] for pos in
            range(0, len(seq), chunk_size))
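
# A minimal usage sketch for chunker (illustrative values):
# >>> list(chunker([1, 2, 3, 4, 5], 2))
# [[1, 2], [3, 4], [5]]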

def generate_date_range(start_date, end_date):
    """Generate the range of dates between start_date and end_date.

    Args:
        start_date (str): Start date, `YYYY-mm-dd`
        end_date (str): End date, `YYYY-mm-dd`

    Returns:
        list: List of dates, as datetime.datetime objects, between
        start_date and end_date (inclusive)
    """
    pandas_range = list(pd.date_range(start_date, end_date))
    return [dt.to_pydatetime() for dt in pandas_range]
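
# A minimal usage sketch for generate_date_range (illustrative dates):
# >>> generate_date_range('2018-01-01', '2018-01-03')
# [datetime.datetime(2018, 1, 1, 0, 0), datetime.datetime(2018, 1, 2, 0, 0),
#  datetime.datetime(2018, 1, 3, 0, 0)]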

def read_jason(filename):
    """Read a json file into a Python object.

    Args:
        filename (str): path of the file

    Returns:
        list or dict: parsed data from the file
    """
    with open(filename, "r") as f:
        content = f.read()
    return json.loads(content)

def write_jason(data, filename):
    """Write a Python list or dictionary to a json file.

    Args:
        data (list or dict): data to write to file
        filename (str): path of the file to write to
    """
    LOG.info("Writing data as json to {}".format(filename))
    with open(filename, 'w') as outfile:
        json.dump(data, outfile)
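
# A round-trip sketch for write_jason/read_jason (hypothetical path):
# write_jason({'a': 1}, '/tmp/example.json')
# read_jason('/tmp/example.json')  # -> {'a': 1}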

def yesterdays_range():
    """Generate yesterday's date range, as datetime objects.

    Returns:
        datetime.datetime: Beginning of yesterday
        datetime.datetime: End of yesterday
    """
    yesterday = datetime.now() - timedelta(1)
    start = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)
    end = start + timedelta(hours=23, minutes=59, seconds=59,
                            milliseconds=999)
    LOG.info("Yesterday's range start: {} end: {}".format(start, end))
    return start, end
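
# A usage sketch for yesterdays_range; e.g. if run any time on 2018-06-02:
# start, end = yesterdays_range()
# start == datetime(2018, 6, 1, 0, 0)
# end == datetime(2018, 6, 1, 23, 59, 59, 999000)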

def check_response(response):
    """Check the status of a requests response.

    If the status code is not 200, log the error and raise an exception.

    Args:
        response (requests.models.Response): Requests response object

    Raises:
        Exception: If the status code is not 200
    """
    if response.status_code != 200:
        LOG.error("Requests error code: {0}. Response:\n{1}".format(
            response.status_code, response.text))
        raise Exception("Non-200 status code returned")
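
# A usage sketch for check_response, assuming the requests library is
# available (the URL is hypothetical):
# import requests
# resp = requests.get('https://example.com/api')
# check_response(resp)  # returns None on 200, raises Exception otherwise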

def parse_date(str_dt):
    """Convert a date string to a datetime object.

    Args:
        str_dt (str): Date in any format accepted by dateutil.parser.
            WARNING: read the dateutil.parser docs before using, to
            understand the default behaviour (i.e. how str_dt values
            like `2018` or `2` are handled)

    Returns:
        datetime.datetime: Datetime object
    """
    LOG.info("Parsing '{}' into datetime object".format(str_dt))
    try:
        return dateutil.parser.parse(str_dt)
    except ValueError:
        # let parse errors propagate without extra logging
        raise
    except Exception:
        LOG.error("Unable to parse str_dt due to error.")
        raise
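
# A minimal usage sketch for parse_date (illustrative value):
# >>> parse_date('2018-06-01T12:30:00')
# datetime.datetime(2018, 6, 1, 12, 30)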

def convert_timezone(dt, tz1, tz2):
    """Convert a datetime object from one timezone to another.

    Args:
        dt (datetime.datetime): Datetime object to convert
        tz1 (str): pytz-recognised timezone that dt is in
        tz2 (str): pytz-recognised timezone to convert to

    Returns:
        datetime.datetime: Datetime object in timezone tz2
    """
    LOG.info("Converting '{}' from timezone: '{}' to timezone '{}'".format(
        dt, tz1, tz2))
    timezones = pytz.all_timezones
    if tz1 not in timezones or tz2 not in timezones:
        raise Exception("Supplied timezone(s) not in pytz available "
                        "timezones. See pytz.all_timezones for available "
                        "timezones")
    zone1 = pytz.timezone(tz1)
    zone2 = pytz.timezone(tz2)
    dt1 = zone1.localize(dt)  # not sure how to deal with `is_dst` dynamically..
    dt2 = dt1.astimezone(zone2)
    return dt2
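
# A minimal usage sketch for convert_timezone (illustrative values;
# America/Toronto is UTC-5 in January):
# >>> convert_timezone(datetime(2018, 1, 15, 12, 0), 'UTC', 'America/Toronto')
# datetime.datetime(2018, 1, 15, 7, 0, tzinfo=<DstTzInfo 'America/Toronto' EST-1 day, 19:00 STD>)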

def read_yaml(yaml_file):
    """Read a yaml file.

    Args:
        yaml_file (str): Full path of the yaml file.

    Returns:
        dict: Dictionary of yaml_file contents.

    Raises:
        Exception: If the yaml_file cannot be opened.
    """
    try:
        LOG.debug("Reading in yaml file %s" % yaml_file)
        with open(yaml_file) as f:
            # use safe_load instead of load
            data = yaml.safe_load(f)
        return data
    except Exception:
        LOG.error('Unable to read file %s.' % yaml_file)
        raise
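
# A usage sketch for read_yaml (hypothetical path and key):
# config = read_yaml('config.yml')
# config['database']  # plain dict access on the parsed contents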

def upload_to_s3(s3_bucket, files, encrypt=True):
    """Upload a list of files to S3.

    Args:
        s3_bucket (str): Name of the S3 bucket.
        files (list): List of files to upload
        encrypt (:obj:`bool`, optional): Use server-side AES256
            encryption, defaults to True.
    """
    LOG.info("Attempting to load {0} files to s3 bucket: {1}".format(
        len(files), s3_bucket))
    s3 = boto3.resource('s3')
    for f in files:
        # use a context manager so the file handle is always closed
        with open(f, 'rb') as data:
            if encrypt:
                s3.Bucket(s3_bucket).put_object(
                    Key=os.path.basename(f), Body=data,
                    ServerSideEncryption='AES256')
            else:
                s3.Bucket(s3_bucket).put_object(
                    Key=os.path.basename(f), Body=data)
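
# A usage sketch for upload_to_s3; requires AWS credentials, and the
# bucket/file names here are hypothetical:
# upload_to_s3('my-data-bucket', ['/tmp/report.csv'], encrypt=True)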

def get_s3_keys(s3_bucket, prefix=None):
    """Get a list of keys in an S3 bucket.

    Optionally specify a prefix to narrow down the keys returned.

    Args:
        s3_bucket (str): Name of the S3 bucket.
        prefix (:obj:`str`, optional): Key prefix. Defaults to None.

    Returns:
        list: List of keys in the S3 bucket.
    """
    keys = []
    s3 = boto3.client('s3')
    paginator = s3.get_paginator('list_objects_v2')
    filters = {'Bucket': s3_bucket}
    if prefix is not None:
        filters['Prefix'] = prefix
    page_iterator = paginator.paginate(**filters)
    for page in page_iterator:
        # 'Contents' is absent from a page with no matching objects
        for obj in page.get('Contents', []):
            keys.append(obj['Key'])
    return keys
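
# A usage sketch for get_s3_keys (hypothetical bucket and prefix):
# keys = get_s3_keys('my-data-bucket', prefix='reports/')
# keys  # e.g. ['reports/2018-06-01.csv', 'reports/2018-06-02.csv']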

def read_s3_file(s3_bucket, key):
    """Read the contents of an S3 object.

    Args:
        s3_bucket (str): Name of the S3 bucket.
        key (str): Name of the S3 object

    Returns:
        str: Contents of S3 object
    """
    LOG.info("Reading {0} from S3 bucket: {1}".format(key, s3_bucket))
    s3 = boto3.resource('s3')
    obj = s3.Object(s3_bucket, key)
    contents = obj.get()['Body'].read().decode('utf-8')
    return contents
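
# A usage sketch for read_s3_file (hypothetical bucket and key):
# contents = read_s3_file('my-data-bucket', 'reports/2018-06-01.csv')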

def parse_s3_contents(contents, delimiter, remove_dupes=False,
                      skip_first_line=False):
    """Read the contents of an S3 object into a list of lists.

    Args:
        contents (str): contents of an S3 object
        delimiter (str): delimiter to split the contents of each line with
        remove_dupes (:obj:`bool`, optional): ensure each line is unique;
            note this does not preserve line order. Defaults to False.
        skip_first_line (:obj:`bool`, optional): skip the first line of
            the S3 object. Defaults to False.

    Returns:
        list: List of lists, where each inner list is the contents of a
        single line.
    """
    lines = [line for line in contents.split('\r\n') if line != '']
    if remove_dupes:
        lines = list(set(lines))
    parsed_contents = [line.split(delimiter) for line in lines]
    if skip_first_line:
        parsed_contents = parsed_contents[1:]
    return parsed_contents
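
# A minimal usage sketch for parse_s3_contents (illustrative CRLF input):
# >>> parse_s3_contents('a,b\r\nc,d\r\n', ',')
# [['a', 'b'], ['c', 'd']]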

def search_path(path, like=None):
    """Search a path and return all the files.

    Optionally supply a list of regexes to narrow the files returned.

    Args:
        path (str): input path
        like (:obj:`list`, optional): List of regexes; a file is returned
            if its basename matches any of them

    Returns:
        list: list of full paths of the matching files
    """
    files = []
    LOG.info('Searching for files in %s' % path)
    for p in os.listdir(path):
        full_path = os.path.join(path, p)
        if os.path.isfile(full_path):
            basename = os.path.basename(full_path)
            if like:
                if any(re.search(reg, basename) for reg in like):
                    files.append(full_path)
            else:
                files.append(full_path)
    LOG.info("Found %s files in %s", len(files), path)
    return files
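
# A usage sketch for search_path (hypothetical path and pattern):
# search_path('/data/incoming', like=[r'\.csv$'])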

def clean_dir(path, prefix=None):
    """Helper function to clear any folders and files in a specified path.

    Hidden files (names starting with '.') are never removed.

    Args:
        path (str): input path
        prefix (:obj:`str`, optional): If given, only remove files whose
            names start with this prefix
    """
    LOG.info("Cleaning folders in %s" % path)
    for p in os.listdir(path):
        prefix_check = True
        full_path = os.path.join(path, p)
        if os.path.isdir(full_path):
            # shutil.rmtree raises on failure, so no post-check is needed
            shutil.rmtree(full_path)
            LOG.debug("Successfully removed folder " + full_path)
        elif os.path.isfile(full_path):
            basename = os.path.basename(full_path)
            if prefix is not None:
                prefix_check = basename.startswith(prefix)
            if prefix_check and not basename.startswith('.'):
                os.remove(full_path)
                LOG.debug("Successfully removed file " + full_path)
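
# A usage sketch for clean_dir; destructive, so the path here is
# hypothetical and should be a scratch directory:
# clean_dir('/tmp/scratch', prefix='tmp_')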