Commit a6badc69 authored by Alexander Dunkel

Initial commit
## AD Base modules Python/JupyterLab
-----
The `tools.py` and other `*.py` files in this repository
contain base Python methods used across Jupyter notebooks.
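
A minimal usage sketch from a Jupyter notebook (the relative path and the selected packages are assumptions, not prescribed by this repository):

```python
import sys

# make the folder containing tools.py importable (path is an assumption)
sys.path.append("..")

import tools

# e.g. report the package versions used in the current notebook
tools.package_report(["pandas", "numpy", "requests"])
```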
tools.py
"""Workshop tools"""
import os
import io
import csv
import numpy as np
import warnings
import requests
import pandas as pd
import zipfile
import shutil
import pkg_resources
import platform
import fnmatch
from itertools import islice
from dotenv import load_dotenv
from pathlib import Path
from collections import namedtuple
from IPython.display import clear_output, display, HTML
from typing import List, Optional, Dict
class DbConn(object):
def __init__(self, db_conn):
self.db_conn = db_conn
def query(self, sql_query: str) -> pd.DataFrame:
"""Execute Calculation SQL Query with pandas"""
with warnings.catch_warnings():
            # ignore warning for non-SQLAlchemy Connection
# see github.com/pandas-dev/pandas/issues/45660
warnings.simplefilter('ignore', UserWarning)
# create pandas DataFrame from database query
df = pd.read_sql_query(sql_query, self.db_conn)
return df
def close(self):
self.db_conn.close()
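# Example usage of DbConn (illustrative sketch, not part of the original module;
# assumes a DB-API connection, e.g. created with psycopg2):
#
#     import psycopg2
#     connection = psycopg2.connect(
#         host="localhost", dbname="hlldb", user="postgres")
#     db_conn = DbConn(connection)
#     df = db_conn.query("SELECT 1 AS one")
#     db_conn.close()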
def print_link(url: str, hashtag: str):
"""Format HTML link with hashtag"""
return f"""
<div class="alert alert-warning" role="alert" style="color: black;">
<strong>Open the following link in a new browser tab and have a look at the content:</strong>
<br>
<a href="{url}">Instagram: {hashtag} feed (json)</a>
</div>
"""
def get_sample_url(use_base: Optional[bool] = None):
"""Retrieve sample json url from .env"""
if use_base is None:
use_base = True
load_dotenv(Path.cwd().parents[0] / '.env', override=True)
SAMPLE_URL = os.getenv("SAMPLE_URL")
    if SAMPLE_URL is None:
        raise ValueError(
            f"SAMPLE_URL not found: check that "
            f"{Path.cwd().parents[0] / '.env'} exists and defines SAMPLE_URL")
    if use_base:
        # BASE_URL must be defined elsewhere (e.g. in the calling notebook)
        SAMPLE_URL = f'{BASE_URL}{SAMPLE_URL}'
return SAMPLE_URL
def return_total(headers: Dict[str, str]):
"""Return total length from requests header"""
if not headers:
return
total_length = headers.get('content-length')
if not total_length:
return
    try:
        total_length = int(total_length)
    except ValueError:
        total_length = None
return total_length
def stream_progress(total_length: int, loaded: int):
"""Stream progress report"""
clear_output(wait=True)
perc_str = ""
if total_length:
total = total_length/1000000
perc = loaded/(total/100)
perc_str = f"of {total:.2f} ({perc:.0f}%)"
print(
f"Loaded {loaded:.2f} MB "
f"{perc_str}..")
def stream_progress_basic(total: int, loaded: int):
"""Stream progress report"""
clear_output(wait=True)
perc_str = ""
if total:
perc = loaded/(total/100)
perc_str = f"of {total:.0f} ({perc:.0f}%)"
print(
f"Processed {loaded:.0f} "
f"{perc_str}..")
def get_stream_file(url: str, path: Path):
"""Download file from url and save to path"""
chunk_size = 8192
with requests.get(url, stream=True) as r:
r.raise_for_status()
total_length = return_total(r.headers)
        with open(path, 'wb') as f:
            # start at 0 so the final progress call works for empty responses
            loaded = 0
            for ix, chunk in enumerate(r.iter_content(chunk_size=chunk_size)):
f.write(chunk)
loaded = (ix*chunk_size)/1000000
if (ix % 100 == 0):
stream_progress(
total_length, loaded)
stream_progress(
total_length, loaded)
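# Example usage (sketch; URL and target filename are placeholders):
#
#     get_stream_file(
#         "https://example.com/data/sample.zip",
#         Path.cwd() / "sample.zip")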
def get_stream_bytes(url: str):
"""Stream file from url to bytes object (in-memory)"""
chunk_size = 8192
content = bytes()
with requests.get(url, stream=True) as r:
r.raise_for_status()
        total_length = return_total(r.headers)
        # start at 0 so the final progress call works for empty responses
        loaded = 0
for ix, chunk in enumerate(r.iter_content(
chunk_size=chunk_size)):
content += bytes(chunk)
loaded = (ix*chunk_size)/1000000
if (ix % 100 == 0):
stream_progress(
total_length, loaded)
stream_progress(
total_length, loaded)
return content
def highlight_row(s, color):
return f'background-color: {color}'
def display_header_stats(
df: pd.DataFrame, metric_cols: List[str], base_cols: List[str]):
"""Display header stats for CSV files"""
pd.options.mode.chained_assignment = None
    # truncate long metric values for display
    for col in metric_cols:
        df.loc[df.index, col] = df[col].str[:25]
    styler = df.style
    # bg color metric cols
styler.applymap(
lambda x: highlight_row(x, color='#FFF8DC'),
subset=pd.IndexSlice[:, metric_cols])
# bg color base cols
styler.applymap(
lambda x: highlight_row(x, color='#8FBC8F'),
subset=pd.IndexSlice[:, base_cols])
# bg color index cols (multi-index)
css = []
for ix, __ in enumerate(df.index.names):
idx = df.index.get_level_values(ix)
css.extend([{
'selector': f'.row{i}.level{ix}',
'props': [('background-color', '#8FBC8F')]}
for i,v in enumerate(idx)])
styler.set_table_styles(css)
display(styler)
def get_folder_size(folder: Path):
"""Return size of all files in folder in MegaBytes"""
if not folder.exists():
raise Warning(
f"Folder {folder} does not exist")
return
size_mb = 0
for file in folder.glob('*'):
size_mb += file.stat().st_size / (1024*1024)
return size_mb
def get_zip_extract(
uri: str, filename: str, output_path: Path,
create_path: bool = True, skip_exists: bool = True,
report: bool = True,
        write_intermediate: Optional[bool] = None):
"""Get Zip file and extract to output_path.
Create Path if not exists."""
if write_intermediate is None:
write_intermediate = False
if create_path:
output_path.mkdir(
exist_ok=True)
if skip_exists and Path(
output_path / filename.replace(".zip", ".csv")).exists():
if report:
print("File already exists.. skipping download..")
return
if write_intermediate:
out_file = output_path / filename
get_stream_file(f'{uri}{filename}', out_file)
z = zipfile.ZipFile(out_file)
else:
content = get_stream_bytes(
f'{uri}{filename}')
z = zipfile.ZipFile(io.BytesIO(content))
print("Extracting zip..")
z.extractall(output_path)
if write_intermediate:
if out_file.is_file():
out_file.unlink()
if report:
raw_size_mb = get_folder_size(output_path)
print(
f"Retrieved {filename}, "
f"extracted size: {raw_size_mb:.2f} MB")
def zip_dir(path: Path, zip_file_path: Path):
"""Zip all contents of path to zip_file"""
files_to_zip = [
file for file in path.glob('*') if file.is_file()]
with zipfile.ZipFile(
zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zip_f:
for file in files_to_zip:
zip_f.write(file, file.name)
def clean_folder(path: Path):
"""Remove folder, warn if recursive"""
if not path.is_dir():
print(f"{path} is not a directory")
return
raw_size_mb = get_folder_size(path)
contents = [content for content in path.glob('*')]
answer = input(
f"Removing {path.stem} with "
f"{len(contents)} files / {raw_size_mb:.0f} MB ? "
f"Type 'y' or 'yes'")
if answer not in ["y", "yes"]:
print("Cancel.")
return
for content in contents:
if content.is_file():
content.unlink()
continue
        try:
            content.rmdir()
        except OSError:
            warnings.warn(
                f"{content.name} contains subdirs. "
                f"Cancelling operation..")
            return
path.rmdir()
def clean_folders(paths: List[Path]):
"""Clean list of folders (depth: 2)"""
for path in paths:
clean_folder(path)
print(
"Done. Thank you. "
"Do not forget to shut down your notebook server "
"(File > Shut Down), once you are finished with "
"the last notebook.")
def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i:i + n]
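# For example, list(chunks([1, 2, 3, 4, 5], 2)) yields [[1, 2], [3, 4], [5]].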
def package_report(root_packages: List[str], python_version = True):
"""Report package versions for root_packages entries"""
root_packages.sort(reverse=True)
root_packages_list = []
if python_version:
pyv = platform.python_version()
root_packages_list.append(["python", pyv])
for m in pkg_resources.working_set:
if m.project_name.lower() in root_packages:
root_packages_list.append([m.project_name, m.version])
html_tables = ''
for chunk in chunks(root_packages_list, 10):
# get table HTML
html_tables += pd.DataFrame(
chunk,
columns=["package", "version"]
).set_index("package").transpose().to_html()
display(HTML(
f'''
<details><summary style="cursor: pointer;">List of package versions used in this notebook</summary>
{html_tables}
</details>
'''
))
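# Example usage (sketch; the package selection is arbitrary):
#
#     package_report(["pandas", "numpy", "requests", "python-dotenv"])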
def tree(dir_path: Path, level: int=-1, limit_to_directories: bool=False,
length_limit: int=1000, ignore_files_folders=None, ignore_match=None):
"""Given a directory Path object print a visual tree structure
Source: https://stackoverflow.com/a/59109706/4556479
"""
dir_path = Path(dir_path) # accept string coerceable to Path
files = 0
directories = 0
space = ' '
branch = ''
# pointers:
tee = '├── '
last = '└── '
print_list = []
if ignore_files_folders is None:
ignore_files_folders = [".git", ".ipynb_checkpoints", "__pycache__", "__init__.py"]
if ignore_match is None:
ignore_match = ["_*", "*.pyc"]
def inner(dir_path: Path, prefix: str='', level=-1):
nonlocal files, directories
if not level:
return # 0, stop iterating
if limit_to_directories:
contents = [d for d in dir_path.iterdir() if
d.name not in ignore_files_folders]
else:
contents = [d for d in dir_path.iterdir()
if d.name not in ignore_files_folders and
not any(fnmatch.fnmatch(d, pat) for pat in ignore_match)]
pointers = [tee] * (len(contents) - 1) + [last]
for pointer, path in zip(pointers, contents):
if path.is_dir():
yield prefix + pointer + path.name
directories += 1
extension = branch if pointer == tee else space
yield from inner(path, prefix=prefix+extension, level=level-1)
elif not limit_to_directories:
yield prefix + pointer + path.name
files += 1
print_list.append(".")
iterator = inner(dir_path, level=level)
for line in islice(iterator, length_limit):
print_list.append(line)
if next(iterator, None):
        print_list.append(f'... length_limit {length_limit} reached, counted:')
print_list.append(f'\n{directories} directories' + (f', {files} files' if files else ''))
br = "<br>"
return HTML(f"""
<div>
<details><summary style='cursor: pointer;'>Directory file tree</summary>
<pre><code>{"<br>".join(print_list)}
    </code></pre>
</details>
</div>
""")
def record_preview_hll(file: Path, num: int = 0):
"""Get record preview for hll data"""
with open(file, 'r', encoding="utf-8") as file_handle:
post_reader = csv.DictReader(
file_handle,
delimiter=',',
quotechar='"',
quoting=csv.QUOTE_MINIMAL)
for ix, hll_record in enumerate(post_reader):
hll_record = get_hll_record(hll_record, strip=True)
# convert to df for display
display(pd.DataFrame(data=[hll_record]).rename_axis(
f"Record {ix}", axis=1).transpose().style.background_gradient(cmap='viridis'))
# stop iteration after x records
if ix >= num:
break
HllRecord = namedtuple('Hll_record', 'latitude, longitude, user_hll, post_hll, date_hll')
def strip_item(item, strip: bool):
if item is None:
return
if not strip:
return item
if len(item) > 120:
item = item[:120] + '..'
return item
def get_hll_record(record, strip: Optional[bool] = None):
    """Get latitude, longitude and hll columns from a CSV record as HllRecord"""
latitude = record.get('latitude')
longitude = record.get('longitude')
cols = ['user_hll', 'post_hll', 'date_hll']
col_vals = []
    for col in cols:
        col_vals.append(strip_item(record.get(col), strip))
return HllRecord(latitude, longitude, *col_vals)
def hll_series_cardinality(
hll_series: pd.Series, db_conn: DbConn,) -> pd.Series:
"""HLL cardinality estimation from a series of hll sets
Args:
hll_series: Indexed series of hll sets.
"""
# create list of hll values for pSQL
hll_values_list = ",".join(
[f"({ix}::int,'{hll_item}'::hll)"
for ix, hll_item
in enumerate(hll_series.values.tolist())])
# Compilation of SQL query
return_col = hll_series.name
db_query = f"""
SELECT s.ix,
hll_cardinality(s.hll_set)::int AS {return_col}
FROM (
VALUES {hll_values_list}
) s(ix, hll_set)
ORDER BY ix ASC
"""
df = db_conn.query(db_query)
# to merge values back to grouped dataframe,
# first reset index to ascending integers
# matching those of the returned df;
# this will turn series_grouped into a DataFrame;
# the previous index will still exist in column 'index'
df_series = hll_series.reset_index()
# drop hll sets not needed anymore
df_series.drop(columns=[hll_series.name], inplace=True)
# append hll_cardinality counts
# using matching ascending integer indexes
df_series.loc[df.index, return_col] = df[return_col].values
# set index back to original multiindex
df_series.set_index(hll_series.index.names, inplace=True)
# return as series
return df_series[return_col].astype(np.int64)
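# Example usage (sketch; assumes `db_conn` is a DbConn connected to a Postgres
# database with the hll extension, and `grouped_series` is an indexed
# pd.Series of hll sets, e.g. a "user_hll" column grouped per bin):
#
#     user_counts = hll_series_cardinality(grouped_series, db_conn)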
def union_hll_series(
hll_series: pd.Series, db_conn: DbConn, cardinality: bool = True) -> pd.Series:
"""HLL Union and (optional) cardinality estimation from series of hll sets
based on group by composite index.
Args:
hll_series: Indexed series (bins) of hll sets.
cardinality: If True, returns cardinality (counts). Otherwise,
the unioned hll set will be returned.
    The method combines all groups of hll sets
    in a single SQL command. The union of hll sets belonging
    to the same group (bin) and (optionally) the cardinality
    (the estimated count) per group are computed in Postgres.
    By utilizing Postgres' GROUP BY (instead of, e.g., doing
    the grouping with numpy), the number of SQL calls is reduced
    to a single run, which saves overhead
    (establishing the db connection, initializing the SQL query,
    etc.). Also note that ascending integers are used for groups,
    instead of their full original bin-ids, which further reduces
    transfer time.

    Use cardinality=True when calculating counts in a single pass.
    Use cardinality=False when an incremental union of hll sets is
    required, e.g. due to the size of the input data;
    in that case, set cardinality=True only for the last run.
    """
# group all hll-sets per index (bin-id)
series_grouped = hll_series.groupby(
hll_series.index).apply(list)
# From grouped hll-sets,
# construct a single SQL Value list;
# if the following nested list comprehension
# doesn't make sense to you, have a look at
# spapas.github.io/2016/04/27/python-nested-list-comprehensions/
    # with a description on how to 'unnest'
# nested list comprehensions to regular for-loops
hll_values_list = ",".join(
[f"({ix}::int,'{hll_item}'::hll)"
for ix, hll_items
in enumerate(series_grouped.values.tolist())
for hll_item in hll_items])
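    # For reference, the comprehension above unnests to regular for-loops:
    #
    #     value_tuples = []
    #     for ix, hll_items in enumerate(series_grouped.values.tolist()):
    #         for hll_item in hll_items:
    #             value_tuples.append(f"({ix}::int,'{hll_item}'::hll)")
    #     hll_values_list = ",".join(value_tuples)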
# Compilation of SQL query,
# depending on whether to return the cardinality
# of unioned hll or the unioned hll
return_col = "hll_union"
hll_calc_pre = ""
hll_calc_tail = "AS hll_union"
if cardinality:
# add sql syntax for cardinality
# estimation
# (get count distinct from hll)
return_col = "hll_cardinality"
hll_calc_pre = "hll_cardinality("
hll_calc_tail = ")::int"
db_query = f"""
SELECT sq.{return_col} FROM (
SELECT s.group_ix,
{hll_calc_pre}
hll_union_agg(s.hll_set)
{hll_calc_tail}
FROM (
VALUES {hll_values_list}
) s(group_ix, hll_set)
GROUP BY group_ix
ORDER BY group_ix ASC) sq
"""
df = db_conn.query(db_query)
# to merge values back to grouped dataframe,
# first reset index to ascending integers
# matching those of the returned df;
# this will turn series_grouped into a DataFrame;
# the previous index will still exist in column 'index'
df_grouped = series_grouped.reset_index()
# drop hll sets not needed anymore
df_grouped.drop(columns=[hll_series.name], inplace=True)
# append hll_cardinality counts
# using matching ascending integer indexes
df_grouped.loc[df.index, return_col] = df[return_col].values
# set index back to original bin-ids
df_grouped.set_index(hll_series.index.names, inplace=True)
series = df_grouped[return_col]
if cardinality:
return series.astype(np.int64)
return series
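# Example usage (sketch; assumes `db_conn` is a DbConn to a Postgres database
# with the hll extension and `hll_series` is a pd.Series of hll sets indexed
# by bin-id, e.g. the result of a groupby):
#
#     # single pass: estimated distinct counts per bin
#     counts = union_hll_series(hll_series, db_conn, cardinality=True)
#     # incremental: keep the unioned hll sets for a later, final pass
#     unioned = union_hll_series(hll_series, db_conn, cardinality=False)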