"""Module containing a set of transformations that are run are the JSON
responses from the API, and the transcript dataframes.
"""
import builtins
import logging
import operator
import re
import sys
from functools import reduce
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
LOG = logging.getLogger(__name__)
[docs]def convo_start_indicator(dataframe):
"""Create an indicator for each message to signal whether it's the start of
the conversation. Starting messages are detected using a regex, since the
starting messages are system generated (hence filtering on message_type=1).
Args:
dataframe (pandas.DataFrame): Input dataframe
parameters (dict): Parameters associated with the transform
Returns:
pandas.Series: Conversation start indicator
"""
joined_reg = r"(?:(?:joined\sthe\s)|(?:sest\sjoint\s.\sla\s))conversation\."
mtype_check = dataframe['message_type'] == 1
reg_check = dataframe['message'].str.contains(joined_reg)
convo_start_ind = pd.Series(0, index=dataframe.index)
convo_start_ind.loc[mtype_check & reg_check] = 1
return convo_start_ind
[docs]def convo_indicator(dataframe):
"""Create an indicator for each message to signal whether it's part of the
conversation. A message is deemed part of the conversation if it appears
after the convo_start_ind messages and is message type 3 or 4.
Args:
dataframe (pandas.DataFrame): Input dataframe
parameters (dict): Parameters associated with the transform
Returns:
pandas.Series: Conversation indicator
"""
mtype_check = dataframe['message_type'].isin([3, 4])
start_index = max(dataframe[dataframe['convo_start_ind'] == 1].index)
index_check = dataframe.index > start_index
convo_ind = pd.Series(0, index=dataframe.index)
convo_ind.loc[mtype_check & index_check] = 1
return convo_ind
[docs]def calc_wait_time(dataframe):
"""Calculate the wait time for a contact. Wait time is calculated as
time elasped between the start of the transcript and the first message
with convo_ind == 1.
Args:
dataframe (pandas.DataFrame): Input dataframe
parameters (dict): Parameters associated with the transform
Returns:
float: Wait time for the contact, in minutes
"""
start_queue_time = dataframe['dt'].min()
end_queue_time = dataframe[dataframe['convo_ind'] == 1]['dt'].min()
return convert_timedelta((end_queue_time-start_queue_time), 'm')
[docs]def calc_handle_time(dataframe):
"""Calculate the handle time for a contact. Handle time is calculated as
time elasped between all messages with convo_ind == 1.
Args:
dataframe (pandas.DataFrame): Input dataframe
parameters (dict): Parameters associated with the transform
Returns:
float: Handle time for the contact, in minutes
"""
start_convo_time = dataframe[dataframe['convo_ind'] == 1]['dt'].min()
end_convo_time = dataframe[dataframe['convo_ind'] == 1]['dt'].max()
return convert_timedelta((end_convo_time-start_convo_time), 'm')
[docs]def calc_response_time(dataframe):
"""Calculate the response time for each message. Defined as time elapsed
between message and previous message.
Args:
dataframe (pandas.DataFrame): Input dataframe
parameters (dict): Parameters associated with the transform
Returns:
pandas.Series: Response time for the message
"""
dataframe['prev_message_time'] = dataframe['dt'].shift(1)
null_check = pd.isnull(dataframe['prev_message_time'])
dataframe.loc[null_check, 'prev_message_time'] = dataframe['dt']
return dataframe['dt'] - dataframe['prev_message_time']
[docs]def calc_message_sequence(dataframe):
"""Calculate the message sequence for each message, defined as:
'prev_message_type' - 'message_type', used to indicate whether a message
was from counsellor to counsellor, counselee to counsellor, system to
counsellor etc.
Args:
dataframe (pandas.DataFrame): Input dataframe
parameters (dict): Parameters associated with the transform
Returns:
pandas.Series: Message sequence
"""
dataframe['prev_message_type'] = dataframe['message_type'].shift(1)
null_check = pd.isnull(dataframe['prev_message_type'])
dataframe.loc[null_check, 'prev_message_type'] = dataframe['message_type']
message_seq = dataframe['prev_message_type'].astype(int).astype(str) + '-' \
+ dataframe['message_type'].astype(str)
return message_seq
[docs]def str_length(dataframe, parameters):
"""Calculate the length of a string
Args:
dataframe (pandas.DataFrame): Input dataframe
parameters (dict): Parameters associated with the transform
Returns:
pandas.Series: Number of characters for each row of a column
"""
return dataframe[parameters['column_name']].str.len()
[docs]def word_count(dataframe, parameters):
"""Count the number of words in a string
Args:
dataframe (pandas.DataFrame): Input dataframe
parameters (dict): Parameters associated with the transform
Returns:
pandas.Series: Word counts for each row of a column
"""
str_series = dataframe[parameters['column_name']].fillna('')
return str_series.str.split().apply(len)
[docs]def parse_handlers(handlers):
"""Parse the handlers associated with a contact. Split out primary handler
and secondary handlers, assuming primary handler as the first handler in
the list of handlers.
Args:
handlers (list): List of handlers
Returns:
dict: Dictionary containing primary and secondary handlers
"""
output = {}
if not handlers: # no handlers on the call
output['agent_id'] = None
output['secondary_agents'] = None
elif len(handlers) == 1: # 1 handler
output["agent_id"] = handlers[0]
output["secondary_agents"] = None
else: # more than 2 handlers
output["agent_id"] = handlers[0]
output["secondary_agents"] = ",".join(handlers[1:])
return output
[docs]def filter_df(dataframe, filters):
"""Filter a dataframe
Args:
dataframe (pandas.DataFrame): Input dataframe
filters (list): list of filters (dicts) to apply
Returns:
pandas.DataFrame: Filtered dataframe
"""
for fltr_dict in filters:
fltr_fn = getattr(operator, fltr_dict['operator'])
fltr_value = fltr_dict['value']
fltr_value = getattr(builtins, fltr_dict['value_type'])(fltr_value)
fltr_check = fltr_fn(dataframe[fltr_dict['column']], fltr_value)
fltr_df = dataframe[fltr_check]
return fltr_df
[docs]def row_count(dataframe, parameters):
"""Count the number of rows in a dataframe, optionally applying filters
specified in parameters.
Args:
dataframe (pandas.DataFrame): Input dataframe
parameters (dict): Parameters associated with the transform
Returns:
int: Number of rows
"""
if 'filters' in parameters.keys():
fltr_df = filter_df(dataframe, parameters['filters'])
else:
fltr_df = dataframe
return fltr_df.shape[0]
[docs]def column_operator(dataframe, parameters):
"""Apply a numpy operator on a column in a dataframe, optionally applying
filters specified in parameters.
If multiple aggregators are supplied, a dictionary will be returned instead
of the float result. For example, if the following parameters are provided:
.. code-block:: python
:emphasize-lines: 3
parameters = {
'output': khp_response_time,
'aggregator': [mean, max]
}
Function will return:
.. code-block:: python
{mean_khp_response_time: 1.2325, max_khp_response_time: 55.212}
Args:
dataframe (pandas.DataFrame): Input dataframe
parameters (dict): Parameters associated with the transform
Returns:
dict or float: aggregator operation output. If multiple aggregators are supplied, returns a dict with the result of
each aggregator. Otherwise, returns the float result of the
aggregator operation.
"""
fltr_df = dataframe
if 'filters' in parameters.keys():
fltr_df = filter_df(dataframe, parameters['filters'])
post_operator = None
if 'post_operator' in parameters.keys():
post = parameters['post_operator']
post_operator = getattr(sys.modules[__name__], post['name'])
post_args = post['args']
if isinstance(parameters['aggregator'], list):
result = {}
for agg_name in parameters['aggregator']:
agg = getattr(np, agg_name)
output_name = '{}_{}'.format(agg_name, parameters['output'])
series = getattr(dataframe, parameters['column'])
if post_operator:
result[output_name] = post_operator(agg(series), post_args)
else:
result[output_name] = agg(series)
else:
agg_name = parameters['aggregator']
agg = getattr(np, agg_name)
series = getattr(dataframe, parameters['column'])
if post_operator:
result = post_operator(agg(series), post_args)
else:
result = agg(series)
return result
[docs]def convert_timedelta(value, unit):
"""Convert a timedelta64 object to a float
Args:
value (numpy.timedelta64[ns]): Timedelta value to convert
unit (TYPE): Datetime unit code, see link for a list of acceptable codes
https://docs.scipy.org/doc/numpy-dev/reference/arrays.datetime.html
Returns:
float: Converted timedelta value
"""
return value / np.timedelta64(1, unit)
[docs]def parse_html(html):
"""Utilize the beautiful soup html parser to return the text from html
Args:
html (str): String of html
Returns:
str: extracted text
"""
soup = BeautifulSoup(html, 'html.parser')
return soup.get_text()
[docs]def clean_text(text):
"""Function to sanitize the trnascript messages. Replaces all whitespace
with single spaces (since newlines break things when uploading to the DB).
Get rid of everything that isn't a number, letter, or in a list of
characters to keep.
Args:
text (str): Text string to clean
Returns:
str: Cleaned text str
"""
## replace newlines with spaces
text = ' '.join(text.split())
keep = [' ', '!', '?', '[', ']', '(', ')', '.', '$', '#', '*', ',', ':',
';']
# get rid of everything that isn't a number, letter, or in keep
text = ''.join([char for char in text if char.isalnum() or char in keep])
return text
[docs]def parse_message(message_text, is_html):
"""Return the text from a (potentially) html message string
Args:
message_text (str): Description
is_html (bool): Boolean indicator whether the text is html, provided
upstream from the API response.
Returns:
str: Parsed message text
"""
if is_html:
parsed_text = parse_html(message_text)
else:
parsed_text = message_text
parsed_text = clean_text(parsed_text)
return parsed_text
[docs]def parse_messages(messages):
"""Transformation function to parse a list of messages
Args:
messages (list): List of message dicts
Returns:
list: List of transformed message dicts
"""
name_map = {'Sender': 'sender', 'Timestamp': 'dt',
'MessageType': 'message_type', 'Message': 'message',
'ContactId': 'contact_id', 'DisplayName': 'display_name'}
output_messages = []
for message in messages:
# remap message dict keys with the name_map specified above
transformed = {name_map[k]:v for k, v in message.items()
if k in name_map.keys()}
transformed['message'] = parse_message(transformed['message'],
message['IsHtml'])
output_messages.append(transformed)
return output_messages
return transformed