Utilities¶
#exports
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import junix
from html.parser import HTMLParser
from nbdev.export2html import convert_md
from joblib import Parallel, delayed
from scipy.stats import rankdata
from skopt import BayesSearchCV
import os
import codecs
from ipypb import track
from warnings import warn
from functools import partial
from distutils.dir_util import copy_tree
from collections.abc import Iterable, Sized
from collections import defaultdict
import sklearn
from sklearn import linear_model
from sklearn.metrics import r2_score
from sklearn.ensemble import RandomForestRegressor
from sklearn.base import is_classifier, clone
from sklearn.utils.validation import indexable
try:
from sklearn.metrics import check_scoring
except ImportError:
from sklearn.metrics.scorer import check_scoring
User Inputs¶
# Directory that is scanned for the development notebooks (*.ipynb) to convert
dev_nbs_dir = '.'
# Directory that the generated markdown files are written to
docs_dir = '../docs'
# Directory, inside the docs directory, that notebook images are exported to
docs_nb_img_dir = f'{docs_dir}/img/nbs'
# NOTE(review): not referenced anywhere in the visible code
nb_img_dir = '../img/nbs'
Monkey Patching skopt
¶
We'll quickly fix the compatibility issue that skopt has with the latest sklearn version. First, we'll fix the BayesSearchCV initialisation
#exports
def bayes_search_CV_init(self, estimator, search_spaces, optimizer_kwargs=None,
                         n_iter=50, scoring=None, fit_params=None, n_jobs=1,
                         n_points=1, iid=True, refit=True, cv=None, verbose=0,
                         pre_dispatch='2*n_jobs', random_state=None,
                         error_score='raise', return_train_score=False):
    """Replacement constructor for ``BayesSearchCV``.

    Stores the Bayesian-search specific settings on the instance, validates
    the search space, and then delegates the remaining settings to the parent
    class constructor.

    The ``iid`` argument is accepted so that the signature is unchanged, but
    its value is ignored: ``self.iid`` is always set to ``None`` and it is not
    forwarded to the parent constructor.
    """
    # Settings that are specific to the Bayesian search itself
    self.search_spaces = search_spaces
    self.n_iter = n_iter
    self.n_points = n_points
    self.random_state = random_state
    self.optimizer_kwargs = optimizer_kwargs
    self._check_search_space(self.search_spaces)

    self.fit_params = fit_params
    # The attribute must exist because `iid` is still part of the signature,
    # however the value that was passed in is deliberately discarded
    self.iid = None

    # Everything else is handled by the parent search class
    parent_kwargs = {
        'estimator': estimator,
        'scoring': scoring,
        'n_jobs': n_jobs,
        'refit': refit,
        'cv': cv,
        'verbose': verbose,
        'pre_dispatch': pre_dispatch,
        'error_score': error_score,
        'return_train_score': return_train_score,
    }
    super(BayesSearchCV, self).__init__(**parent_kwargs)
# Monkey patch: every BayesSearchCV created from here on uses the constructor defined above
BayesSearchCV.__init__ = bayes_search_CV_init
We'll then fix the BayesSearchCV
fit method
#exports
def _unpack_fit_and_score_results(out, return_train_score):
    """Transpose the per-fit outputs of ``_fit_and_score`` into per-field tuples.

    Parameters
    ----------
    out : list
        One entry per (candidate, split) fit. Entries are either dicts keyed
        by field name or positional sequences (the form returned by older
        scikit-learn releases).
    return_train_score : bool
        Whether train scores were requested and are therefore present.

    Returns
    -------
    dict
        Maps each of ``'test_scores'``, ``'n_test_samples'``, ``'fit_time'``,
        ``'score_time'``, ``'parameters'`` (and ``'train_scores'`` when
        requested) to a tuple holding that field for every fit, in fit order.

    Raises
    ------
    ValueError
        If ``out`` is empty.
    """
    if not out:
        raise ValueError('No fits were performed, there are no results to unpack')

    fields = ['test_scores', 'n_test_samples', 'fit_time', 'score_time', 'parameters']
    if return_train_score:
        # In the positional form the train scores come first
        fields.insert(0, 'train_scores')

    if isinstance(out[0], dict):
        # Look fields up by name so that extra keys (e.g. the fit-failure flag)
        # and the presence of train scores cannot shift the unpacking
        return {field: tuple(fit_result[field] for fit_result in out)
                for field in fields}

    return dict(zip(fields, zip(*out)))


def bayes_search_CV__fit(self, X, y, groups, parameter_iterable):
    """
    Actual fitting, performing the search over parameters.
    Taken from https://github.com/scikit-learn/scikit-learn/blob/0.18.X
    .../sklearn/model_selection/_search.py

    Every candidate in ``parameter_iterable`` is fitted and scored on every
    cross-validation split. The aggregated scores and timings are stored in
    ``self.cv_results_``, alongside ``self.scorer_``, ``self.best_index_``
    and ``self.n_splits_``. When ``self.refit`` is truthy the best candidate
    is refitted on all of the data and stored in ``self.best_estimator_``.

    Returns
    -------
    self
    """
    estimator = self.estimator
    cv = sklearn.model_selection._validation.check_cv(
        self.cv, y, classifier=is_classifier(estimator))
    self.scorer_ = check_scoring(
        self.estimator, scoring=self.scoring)

    X, y, groups = indexable(X, y, groups)
    n_splits = cv.get_n_splits(X, y, groups)
    if self.verbose > 0 and isinstance(parameter_iterable, Sized):
        n_candidates = len(parameter_iterable)
        print("Fitting {0} folds for each of {1} candidates, totalling"
              " {2} fits".format(n_splits, n_candidates,
                                 n_candidates * n_splits))

    base_estimator = clone(self.estimator)
    pre_dispatch = self.pre_dispatch

    # Materialise the splits so that every candidate sees identical folds
    cv_iter = list(cv.split(X, y, groups))
    out = Parallel(
        n_jobs=self.n_jobs, verbose=self.verbose,
        pre_dispatch=pre_dispatch
    )(delayed(sklearn.model_selection._validation._fit_and_score)(
        clone(base_estimator),
        X, y, self.scorer_,
        train, test, self.verbose, parameters,
        fit_params=self.fit_params,
        return_train_score=self.return_train_score,
        return_n_test_samples=True,
        return_times=True, return_parameters=True,
        error_score=self.error_score
    )
        for parameters in parameter_iterable
        for train, test in cv_iter)

    # If one chooses to see the train score, "out" also contains train score info
    fit_results = _unpack_fit_and_score_results(out, self.return_train_score)
    test_scores = fit_results['test_scores']
    n_test_samples = fit_results['n_test_samples']
    fit_time = fit_results['fit_time']
    score_time = fit_results['score_time']
    parameters = fit_results['parameters']

    # The fits are ordered candidate-major, so every n_splits-th entry
    # starts a new candidate
    candidate_params = parameters[::n_splits]
    n_candidates = len(candidate_params)

    results = dict()

    def _store(key_name, array, weights=None, splits=False, rank=False):
        """A small helper to store the scores/times to the cv_results_"""
        array = np.array(array, dtype=np.float64).reshape(n_candidates,
                                                          n_splits)
        if splits:
            for split_i in range(n_splits):
                results["split%d_%s"
                        % (split_i, key_name)] = array[:, split_i]

        array_means = np.average(array, axis=1, weights=weights)
        results['mean_%s' % key_name] = array_means
        # Weighted std is not directly available in numpy
        array_stds = np.sqrt(np.average((array -
                                         array_means[:, np.newaxis]) ** 2,
                                        axis=1, weights=weights))
        results['std_%s' % key_name] = array_stds

        if rank:
            results["rank_%s" % key_name] = np.asarray(
                rankdata(-array_means, method='min'), dtype=np.int32)

    # Compute the (weighted) mean and std for test scores alone
    # NOTE the test sample counts (weights) remain the same for all candidates
    # `np.int` was only an alias of the builtin `int` and no longer exists in NumPy
    n_test_samples = np.array(n_test_samples[:n_splits], dtype=int)

    _store('test_score', test_scores, splits=True, rank=True,
           weights=n_test_samples if self.iid else None)
    if self.return_train_score:
        _store('train_score', fit_results['train_scores'], splits=True)
    _store('fit_time', fit_time)
    _store('score_time', score_time)

    best_index = np.flatnonzero(results["rank_test_score"] == 1)[0]
    best_parameters = candidate_params[best_index]

    # Use one MaskedArray and mask all the places where the param is not
    # applicable for that candidate. Use defaultdict as each candidate may
    # not contain all the params
    param_results = defaultdict(partial(np.ma.array,
                                        np.empty(n_candidates,),
                                        mask=True,
                                        dtype=object))
    for cand_i, params in enumerate(candidate_params):
        for name, value in params.items():
            # An all masked empty array gets created for the key
            # `"param_%s" % name` at the first occurrence of `name`.
            # Setting the value at an index also unmasks that index
            param_results["param_%s" % name][cand_i] = value

    results.update(param_results)

    # Store a list of param dicts at the key 'params'
    results['params'] = candidate_params

    self.cv_results_ = results
    self.best_index_ = best_index
    self.n_splits_ = n_splits

    if self.refit:
        # fit the best estimator using the entire dataset
        # clone first to work around broken estimators
        best_estimator = clone(base_estimator).set_params(
            **best_parameters)
        # `fit_params` defaults to None, which cannot be unpacked with `**`
        fit_params = {} if self.fit_params is None else self.fit_params
        if y is not None:
            best_estimator.fit(X, y, **fit_params)
        else:
            best_estimator.fit(X, **fit_params)
        self.best_estimator_ = best_estimator
    return self
# Monkey patch: BayesSearchCV instances now use the `_fit` implementation defined above
BayesSearchCV._fit = bayes_search_CV__fit
Custom sklearn
Models¶
We require access to the dataframe index in order to evaluate the discharge optimisation predictions accurately (as we need to group predictions by date). However, standard sklearn models strip the index and return only numpy arrays, so we'll create a custom model that preserves it.
#exports
def add_series_index(idx_arg_pos=0):
    """Build a decorator that returns a function's output as an indexed ``pd.Series``.

    The decorated function's return value is wrapped in a ``pd.Series`` whose
    index is copied from one of the positional arguments it was called with.

    Parameters
    ----------
    idx_arg_pos : int, default 0
        Position, within the positional arguments of the call, of the pandas
        object whose index should be attached to the result. For a method
        position 0 is ``self``, so the first real argument is position 1.

    Returns
    -------
    callable
        A decorator. The function it produces raises ``AssertionError`` when
        the selected argument is neither a ``pd.Series`` nor a
        ``pd.DataFrame``, and ``IndexError`` when fewer than
        ``idx_arg_pos + 1`` positional arguments are supplied.
    """
    from functools import wraps

    def decorator(func):
        # Keep the wrapped function's name and docstring visible to introspection
        @wraps(func)
        def decorator_wrapper(*args, **kwargs):
            input_s = args[idx_arg_pos]
            # Kept as an assertion so that the raised exception type is unchanged
            assert isinstance(input_s, (pd.Series, pd.DataFrame)), (
                f'Positional argument {idx_arg_pos} must be a pandas Series or '
                f'DataFrame, got {type(input_s).__name__}'
            )

            return pd.Series(func(*args, **kwargs), index=input_s.index)

        return decorator_wrapper

    return decorator
class PandasRandomForestRegressor(RandomForestRegressor):
    """Random forest regressor whose predictions keep the index of the input.

    ``predict`` returns a ``pd.Series`` indexed like ``X`` and ``score``
    evaluates those predictions with a configurable scoring function.
    """

    def __init__(
        self,
        n_estimators=100,
        *,
        criterion='mse',
        max_depth=None,
        min_samples_split=2,
        min_samples_leaf=1,
        min_weight_fraction_leaf=0.0,
        max_features='auto',
        max_leaf_nodes=None,
        min_impurity_decrease=0.0,
        min_impurity_split=None,
        bootstrap=True,
        oob_score=False,
        n_jobs=None,
        random_state=None,
        verbose=0,
        warm_start=False,
        ccp_alpha=0.0,
        max_samples=None,
        score_func=None
    ):
        """Forward the forest settings to the parent class and store the scorer.

        ``score_func`` is called as ``score_func(y, y_pred, *args, **kwargs)``
        by ``score``; ``r2_score`` is used when it is left as ``None``.
        """
        super().__init__(
            n_estimators=n_estimators,
            criterion=criterion,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            min_weight_fraction_leaf=min_weight_fraction_leaf,
            max_features=max_features,
            max_leaf_nodes=max_leaf_nodes,
            min_impurity_decrease=min_impurity_decrease,
            min_impurity_split=min_impurity_split,
            bootstrap=bootstrap,
            oob_score=oob_score,
            n_jobs=n_jobs,
            random_state=random_state,
            verbose=verbose,
            warm_start=warm_start,
            ccp_alpha=ccp_alpha,
            max_samples=max_samples,
        )

        self.score_func = r2_score if score_func is None else score_func

    # Position 1 is `X` (position 0 is `self`), so `X` must be a pandas object
    @add_series_index(1)
    def predict(self, X):
        """Return the parent class predictions for ``X``."""
        return super().predict(X)

    def score(self, X, y, *args, **kwargs):
        """Score the predictions for ``X`` against ``y`` using ``self.score_func``.

        Any extra positional and keyword arguments are passed through to the
        scoring function.
        """
        predictions = self.predict(X)
        return self.score_func(y, predictions, *args, **kwargs)
# Instantiate the custom model with its default parameters
pandas_RF = PandasRandomForestRegressor()
# Bare expression - presumably shows the model as the notebook cell output
pandas_RF
Saving the Development Notebooks¶
We'll first convert the notebooks to markdown
#exports
def convert_file_to_json(filepath):
    """Read the UTF-8 encoded file at ``filepath`` and return its parsed JSON content."""
    with open(filepath, mode='r', encoding='utf8') as json_file:
        return json.load(json_file)
# Monkey patch: make junix's exporter use the reader defined above, which opens files as UTF-8
junix.exporter.convert_file_to_json = convert_file_to_json
def encode_file_as_utf8(fp):
    """Rewrite the text file at ``fp`` in place using UTF-8 encoding.

    The file is decoded with the platform's default text encoding and the
    decoded text is written back as UTF-8. Empty files are left untouched.

    Parameters
    ----------
    fp : str
        Path of the file to re-encode.
    """
    # Read the whole file: reading only a fixed-size prefix would silently
    # truncate larger files when the content is written back below
    with open(fp, 'r') as file:
        contents = file.read()

    if not contents:
        return

    # newline='\n' disables newline translation on write
    with open(fp, 'w', encoding='utf-8', newline='\n') as file:
        file.write(contents)
def convert_nbs_to_md(nbs_dir, docs_nb_img_dir, docs_dir):
    """Convert every notebook found in ``nbs_dir`` into a markdown file in ``docs_dir``.

    For each ``.ipynb`` file the images are exported to ``docs_nb_img_dir``,
    the notebook is converted to markdown, and the resulting ``.md`` file is
    re-encoded as UTF-8.
    """
    notebook_names = [name for name in os.listdir(nbs_dir) if name.endswith('.ipynb')]

    for notebook_name in track(notebook_names):
        notebook_path = f'{nbs_dir}/{notebook_name}'

        junix.export_images(notebook_path, docs_nb_img_dir)
        convert_md(notebook_path, docs_dir, img_path=f'{docs_nb_img_dir}/', jekyll=False)

        # The markdown file shares the notebook's name, minus the extension
        markdown_stem = notebook_name.replace('.ipynb', '')
        encode_file_as_utf8(f'{docs_dir}/{markdown_stem}.md')
# Convert the notebooks in `dev_nbs_dir`, writing markdown to `docs_dir` and images to `docs_nb_img_dir`
convert_nbs_to_md(dev_nbs_dir, docs_nb_img_dir, docs_dir)
We'll then parse the HTML tables into markdown
#exports
class MyHTMLParser(HTMLParser):
    """HTML parser that records every start and end tag it encounters.

    After ``feed`` has been called, ``tags`` holds the tags in document
    order: start tags as their full source text (attributes included) and end
    tags in the form ``</name>``.
    """

    def __init__(self):
        super().__init__()
        # Tags seen so far, in the order they were parsed
        self.tags = []

    def handle_starttag(self, tag, attrs):
        """Record the source text of the start tag that was just parsed."""
        start_tag_text = self.get_starttag_text()
        self.tags.append(start_tag_text)

    def handle_endtag(self, tag):
        """Record the end tag as ``</tag>``."""
        self.tags.append("</" + tag + ">")
def get_substring_idxs(string, substring):
    """Return the start index of every occurrence of ``substring`` in ``string``.

    Overlapping occurrences are all reported and the indexes are returned in
    ascending order. An empty ``substring`` matches at every position,
    including the end of ``string``.
    """
    match_idxs = []
    match_idx = string.find(substring)

    while match_idx != -1:
        match_idxs.append(match_idx)
        # Resume one character later so that overlapping matches are found too
        match_idx = string.find(substring, match_idx + 1)

    return match_idxs
def convert_df_to_md(df):
    """Render ``df`` as a markdown table, using its first column as the index.

    When the first column is named ``'Unnamed: 0'`` the index name is blanked
    so that the placeholder name does not appear in the table.
    """
    first_col = df.columns[0]
    indexed_df = df.set_index(first_col)

    if first_col == 'Unnamed: 0':
        indexed_df.index.name = ''

    return indexed_df.to_markdown()
def extract_div_to_md_table(start_idx, end_idx, table_and_div_tags, file_txt):
    """Locate the div surrounding one HTML table and convert the table to markdown.

    Parameters
    ----------
    start_idx : int
        Position of the table's start tag within ``table_and_div_tags``.
    end_idx : int
        Position of the table's end tag within ``table_and_div_tags``.
    table_and_div_tags : list of str
        The div and table tags of the document, in document order.
    file_txt : str
        Full text of the document.

    Returns
    -------
    tuple of (str, str)
        The text from the enclosing ``<div>`` up to, but not including, the
        next ``</div>``, and the markdown rendering of the table it contains.

    Raises
    ------
    AssertionError
        If the extracted text does not contain exactly one table.
    """
    from io import StringIO

    n_start_divs_before = table_and_div_tags[:start_idx].count('<div>')
    n_end_divs_before = table_and_div_tags[:end_idx].count('</div>')

    # The last '<div>' opened before the table starts ...
    div_start_idx = get_substring_idxs(file_txt, '<div>')[n_start_divs_before-1]
    # ... and the first '</div>' that follows the end of the table
    div_end_idx = get_substring_idxs(file_txt, '</div>')[n_end_divs_before]

    # NOTE(review): the slice stops where the closing tag starts, so the
    # '</div>' itself is not part of `div_txt` - confirm this is intended
    div_txt = file_txt[div_start_idx:div_end_idx]

    # Newer pandas versions no longer accept literal HTML strings, so the
    # text is passed as a file-like object
    potential_dfs = pd.read_html(StringIO(div_txt))
    assert len(potential_dfs) == 1, 'Multiple tables were found when there should be only one'

    md_table = convert_df_to_md(potential_dfs[0])

    return div_txt, md_table
def extract_div_to_md_tables(md_fp):
    """Find every dataframe HTML table in a markdown file and render it as markdown.

    Parameters
    ----------
    md_fp : str
        Path of the UTF-8 encoded markdown file to scan.

    Returns
    -------
    list of (str, str)
        One ``(div_txt, md_table)`` pair per table, in document order, where
        ``div_txt`` is the original HTML text and ``md_table`` is its
        markdown replacement.
    """
    # The docs are written as UTF-8, so read them back with the same encoding
    # rather than relying on the platform default
    with open(md_fp, 'r', encoding='utf-8') as f:
        file_txt = f.read()

    parser = MyHTMLParser()
    parser.feed(file_txt)

    table_start_tag = '<table border="1" class="dataframe">'
    relevant_tags = ('<div>', '</div>', table_start_tag, '</table>')
    table_and_div_tags = [tag for tag in parser.tags if tag in relevant_tags]

    table_start_tag_idxs = [i for i, tag in enumerate(table_and_div_tags) if tag == table_start_tag]
    # Each table ends at the first '</table>' found at or after its start tag
    table_end_tag_idxs = [
        table_and_div_tags.index('</table>', table_start_tag_idx)
        for table_start_tag_idx in table_start_tag_idxs
    ]

    return [
        extract_div_to_md_table(start_idx, end_idx, table_and_div_tags, file_txt)
        for start_idx, end_idx in zip(table_start_tag_idxs, table_end_tag_idxs)
    ]


def clean_md_file_tables(md_fp):
    """Replace the dataframe HTML tables in a markdown file with markdown tables.

    The file at ``md_fp`` is rewritten in place, read and written as UTF-8.
    Returns ``None``.
    """
    div_to_md_tables = extract_div_to_md_tables(md_fp)

    with open(md_fp, 'r', encoding='utf-8') as f:
        md_file_text = f.read()

    for div_txt, md_txt in div_to_md_tables:
        md_file_text = md_file_text.replace(div_txt, md_txt)

    with open(md_fp, 'w', encoding='utf-8') as f:
        f.write(md_file_text)
# Paths of every '.md' file in the docs directory, excluding '00-utilities.md'
md_fps = [f'{docs_dir}/{f}' for f in os.listdir(docs_dir) if f[-3:]=='.md' if f!='00-utilities.md']
for md_fp in md_fps:
    # The file is rewritten in place; the function returns None, so the
    # assigned name does not hold any tables
    div_to_md_tables = clean_md_file_tables(md_fp)
And finally change the filepaths for any images in the notebooks
#exports
def clean_md_file_img_fps(md_fp, old_img_dir='../docs/img/nbs', new_img_dir='img/nbs'):
    """Rewrite the image directory referenced in a markdown file, in place.

    Every occurrence of ``old_img_dir`` in the file is replaced with
    ``new_img_dir``. The file is read and written as UTF-8 rather than with
    the platform default encoding.

    Parameters
    ----------
    md_fp : str
        Path of the markdown file to update.
    old_img_dir : str, default '../docs/img/nbs'
        Image directory text to search for.
    new_img_dir : str, default 'img/nbs'
        Text that replaces each occurrence of ``old_img_dir``.

    Returns
    -------
    None
    """
    with open(md_fp, 'r', encoding='utf-8') as f:
        md_file_text = f.read()

    md_file_text = md_file_text.replace(old_img_dir, new_img_dir)

    with open(md_fp, 'w', encoding='utf-8') as f:
        f.write(md_file_text)
# Rewrite the image paths, in place, in every markdown file listed in `md_fps`
for md_fp in md_fps:
    clean_md_file_img_fps(md_fp)
Plotting¶
AxTransformer enables conversion from data coordinates to tick locations, while set_date_ticks allows custom date ranges to be applied to plots (including a seaborn heatmap)
#exports
class AxTransformer:
    """Map data values onto tick locations of a matplotlib axis.

    A linear regression is fitted between the values shown by an axis' tick
    labels and the locations of those ticks, and is then used to predict the
    location of arbitrary values.

    Parameters
    ----------
    datetime_vals : bool, default False
        When truthy, tick values are parsed as datetimes and converted to
        64-bit integers before being used.
    """

    def __init__(self, datetime_vals=False):
        self.datetime_vals = datetime_vals
        self.lr = linear_model.LinearRegression()

    def process_tick_vals(self, tick_vals):
        """Return ``tick_vals`` as a numpy array.

        A single value (including a single string) is wrapped in a list
        first. When ``datetime_vals`` is truthy the values are parsed with
        ``pd.to_datetime`` and converted to 64-bit integers.
        """
        if not isinstance(tick_vals, Iterable) or isinstance(tick_vals, str):
            tick_vals = [tick_vals]

        if self.datetime_vals:
            # An explicit 64-bit dtype is requested because the width of the
            # plain `int` dtype depends on the platform
            tick_vals = pd.to_datetime(tick_vals).astype(np.int64).values

        return np.array(tick_vals)

    def fit(self, ax, axis='x'):
        """Fit the value -> location mapping from the current ticks of ``ax``.

        ``axis`` selects which axis is read, through ``ax.get_<axis>axis()``.
        """
        axis = getattr(ax, f'get_{axis}axis')()

        tick_locs = axis.get_ticklocs()
        # The label text is read through the public accessor rather than the
        # private `_text` attribute
        tick_vals = self.process_tick_vals([label.get_text() for label in axis.get_ticklabels()])

        self.lr.fit(tick_vals.reshape(-1, 1), tick_locs)

    def transform(self, tick_vals):
        """Return the predicted tick locations for ``tick_vals``."""
        tick_vals = self.process_tick_vals(tick_vals)

        return self.lr.predict(tick_vals.reshape(-1, 1))
def set_date_ticks(ax, start_date, end_date, axis='y', date_format='%Y-%m-%d', **date_range_kwargs):
    """Place date ticks on one axis of ``ax`` and return ``ax``.

    The tick dates are generated with
    ``pd.date_range(start_date, end_date, **date_range_kwargs)``, positioned
    using an ``AxTransformer`` fitted to the axis' existing ticks, and
    labelled using ``date_format``.
    """
    tick_dates = pd.date_range(start_date, end_date, **date_range_kwargs)

    # Learn how the existing tick labels map onto tick locations
    transformer = AxTransformer(datetime_vals=True)
    transformer.fit(ax, axis=axis)

    tick_locations = transformer.transform(tick_dates)
    tick_labels = tick_dates.strftime(date_format)

    set_ticks = getattr(ax, f'set_{axis}ticks')
    set_ticks(tick_locations)

    set_ticklabels = getattr(ax, f'set_{axis}ticklabels')
    set_ticklabels(tick_labels)

    ax.tick_params(axis=axis, which='both', bottom=True, top=False, labelbottom=True)

    return ax
Finally we'll export the relevant code to our batopt
module