Source code for mhealthdata.utils

#!/usr/bin/env python
# -*- coding: utf8 -*-

import numpy as np
import pandas as pd
import pylab as plt
import itertools
import datetime
import calendar
import pytz
import re


def sleep_stage_dict(mode="decode"):
    """
    Build the lookup table between sleep stage names and numeric codes.

    Parameters
    ----------
    mode : {"decode", "encode"}, default "decode"
        "encode" returns the mapping stage name -> code,
        "decode" returns the mapping code -> stage name.

    Returns
    -------
    dict
        Encoding or decoding table for sleep stages.

    Notes
    -----
    Some names share a code ("wake"/"awake" and "unknown"/"no_stage").
    When decoding, the name listed last wins, i.e. 1 -> "awake" and
    2 -> "no_stage".
    """
    assert mode in ["decode", "encode"]
    encoding = {
        "no_sleep": 0,
        "wake": 1,
        "awake": 1,
        "unknown": 2,
        "no_stage": 2,
        "rem": 3,
        "restless": 4,
        "light": 5,
        "asleep": 6,
        "deep": 7,
    }
    if mode == "encode":
        return encoding
    decoding = {}
    for name, code in encoding.items():
        # Later aliases overwrite earlier ones that share the same code.
        decoding[code] = name
    return decoding
def find_columns_by_key(df, keys):
    """
    Finds all DataFrame column names containing any of the keys.

    Matching is a case-insensitive substring test. Keys and column labels
    are converted with ``str`` before comparison, so non-string labels do
    not raise.

    Parameters
    ----------
    df : DataFrame
        DataFrame whose ``columns`` are searched
    keys : str, array_like or None
        A single key or a collection (list, tuple, set, ndarray, Index,
        Series) of keys. ``None`` matches nothing.

    Returns
    -------
    list
        List of matching column names, in column order

    """
    if keys is None:
        keys = []
    elif isinstance(keys, str) or not isinstance(
            keys, (list, tuple, set, frozenset, np.ndarray, pd.Index, pd.Series)):
        # A single key (a bare string must not be iterated char by char).
        keys = [keys]
    needles = [str(k).lower() for k in keys]
    return [c for c in df.columns
            if any(k in str(c).lower() for k in needles)]
def find_intervals(x, tol=0, nmin=None, sort=False):
    """
    Locate runs of consecutive positive values in a 1d-array.

    Parameters
    ----------
    x : ndarray
        1D array of non-negative numeric (or boolean) values
    tol : int, default 0
        Zero gaps up to this length are bridged (via `fill_gaps`) before
        the runs are detected
    nmin : int or None, default None
        If given, keep only intervals whose length is >= nmin
    sort : bool, default False
        If False - intervals are ordered by position in the array
        If True - intervals are ordered by descending length

    Returns
    -------
    ndarray
        2D array - N intervals x 2 indices (start, end); `end` is
        exclusive, i.e. x[start:end] is the interval

    """
    assert x.ndim == 1 and not any(x < 0)
    positive = (fill_gaps(x, tol) > 0).astype(int)
    # Zero sentinels on both sides guarantee that every run produces
    # one rising (+1) and one falling (-1) edge in the difference.
    sentinel = np.zeros((1))
    edges = np.diff(np.concatenate([sentinel, positive, sentinel]))
    position = np.arange(len(edges))
    starts = position[edges == 1]
    stops = position[edges == -1]
    intervals = np.stack([starts, stops]).T
    if nmin is not None:
        length = np.diff(intervals, axis=-1).flatten()
        intervals = intervals[length >= nmin]
    if sort:
        length = np.diff(intervals, axis=-1).flatten()
        intervals = intervals[np.argsort(length)[::-1]]
    return intervals
def fill_gaps(x, gap=None, fill=1):
    """
    Replace zero gaps in a 1d-array with a fill value.

    Parameters
    ----------
    x : ndarray
        1D array of non-negative numeric values
    gap : int or None, default None
        Longest gap that is filled. If None, every non-positive entry is
        filled; if 0, nothing is filled.
    fill : float, default 1
        Value written into the gaps

    Returns
    -------
    ndarray
        1D copy of `x` in which all gaps of length <= gap are filled

    """
    assert x.ndim == 1 and not any(x < 0)
    filled = np.copy(x)
    if gap is None:
        filled[~(x > 0)] = fill
        return filled
    if gap > 0:
        # Mark zeros (NaN counts as zero here) and treat them as intervals.
        is_zero = (np.nan_to_num(x) == 0).astype(int)
        spans = find_intervals(is_zero)
        length = np.diff(spans, axis=-1).flatten()
        for start, stop in spans[length <= gap]:
            filled[start:stop] = fill
    return filled
def unique_sorted(x, return_dict=False):
    """
    Finds unique values and sorts them by descending count.

    Parameters
    ----------
    x : ndarray or list
        Array of values. A list of lists is flattened first. For
        floating point data (any precision) non-finite values
        (NaN, +/-Inf) are dropped before counting.
    return_dict : bool, default False
        If False - return value, count arrays
        If True - return dict value -> count

    Returns
    -------
    value : ndarray, optional
        Unique values array (if return_dict is False)
    count : ndarray, optional
        Unique values counts (if return_dict is False)
    dict : dict, optional
        Dictionary unique value -> count (if return_dict is True)

    """
    # Guard the x[0] peek so that an empty list does not raise IndexError.
    if isinstance(x, list) and len(x) > 0 and isinstance(x[0], list):
        x = list(itertools.chain(*x))
    x_ = np.array(x)
    # np.floating covers float16/32/64; the builtin `float` only matches
    # float64 in current NumPy, which silently kept NaN in float32 data.
    if np.issubdtype(x_.dtype, np.floating):
        x_ = x_[np.isfinite(x_)]
    value, count = np.unique(x_, return_counts=True)
    order = np.argsort(count)[::-1]
    value = value[order]
    count = count[order]
    if return_dict:
        return dict(zip(value, count))
    return value, count
def columns_to_datetime(df, tstart, tend=None, tz=None):
    """
    Converts DataFrame date/time columns to pandas.Timestamp.

    The DataFrame is modified in place and also returned.

    Parameters
    ----------
    df : DataFrame
        DataFrame
    tstart : str or list
        Key(s) identifying the start-time column(s); every column whose
        name matches (see `find_columns_by_key`) is converted
    tend : str or list, optional
        Key(s) identifying the end-time column(s)
    tz : str or list, optional
        Key(s) identifying the time zone column; the first matching
        column is used. If present, converted times are treated as UTC
        and shifted to the local wall-clock time of each row.

    Returns
    -------
    DataFrame
        DataFrame with selected columns converted to naive
        pandas.Timestamp

    Notes
    -----
    - Columns that already have a datetime64 dtype are left untouched.
    - Integer columns are interpreted as milliseconds since the epoch.
    - For other columns the format is guessed from the first non-null
      value; if parsing with the guessed format fails, pandas' generic
      parser is used instead.
    - Time zone localisation formats with second resolution, so
      sub-second precision is dropped in that case.

    """
    columns = find_columns_by_key(df, tstart) + find_columns_by_key(df, tend)
    tz_matches = find_columns_by_key(df, tz)
    tz_col = tz_matches[0] if len(tz_matches) > 0 else None

    def localize(row):
        # `col` is bound late: it is the column the loop below is processing.
        if pd.isna(row[col]):
            return pd.NaT
        fmt = "%Y-%m-%d %H:%M:%S"
        t = row[col].tz_localize("UTC").tz_convert(row[tz_col])
        return pd.to_datetime(t.strftime(fmt), format=fmt)

    for col in columns:
        dtype = df[col].dtype
        if np.issubdtype(dtype, np.datetime64):
            continue
        if np.issubdtype(dtype, np.integer):
            df[col] = pd.to_datetime(df[col], unit="ms").dt.tz_localize(None)
        else:
            # Sniff the format from the first non-null value so that an
            # empty column or a leading NaN does not crash the conversion.
            present = df[col].dropna()
            sample = present.iloc[0] if len(present) > 0 else None
            fmt = None
            if isinstance(sample, str):
                fmt = ("%Y-%m-%d %H:%M:%S.%f" if "-" in sample
                       else "%m/%d/%y %H:%M:%S")
                if len(sample) > 5 and sample[-2:] in ["AM", "PM"]:
                    # Keep the date part, switch to a 12-hour clock.
                    fmt = f"{fmt[:8]} %I:%M:%S %p"
            converted = None
            if fmt is not None:
                try:
                    converted = pd.to_datetime(df[col], format=fmt)
                    converted = converted.dt.tz_localize(None)
                except Exception:
                    # Best effort: the guessed format did not fit,
                    # fall back to the generic parser below.
                    converted = None
            if converted is None:
                converted = pd.to_datetime(df[col]).dt.tz_localize(None)
            df[col] = converted
        if tz_col is not None and len(df) > 0:
            df[col] = df.apply(localize, axis=1)
    return df
def from_ordinal(date, fmt="%Y-%m-%d"):
    """
    Converts ordinal day(s) to date(s), where day 1 = Jan 1st, 1 AD.

    Parameters
    ----------
    date : array_like or int
        Day(s) of type `int` (ordinal). Values that are already strings
        are returned unchanged.
    fmt : str, default "%Y-%m-%d"
        Output date format, applied to scalars and to every element of
        an array

    Returns
    -------
    ndarray or str
        Date(s)

    """
    if isinstance(date, (list, np.ndarray)):
        # Forward `fmt`; otherwise arrays would always use the default.
        return np.array([from_ordinal(d, fmt) for d in date])
    if isinstance(date, str):
        return date
    return datetime.datetime.fromordinal(date).strftime(fmt)
def to_ordinal(date):
    """
    Converts date(s) to ordinal day(s), where day 1 = Jan 1st, 1 AD.

    Parameters
    ----------
    date : array_like or str
        Date(s) of type `str`; only the first 10 characters
        ("YYYY-MM-DD") of a scalar string are used. Non-string input
        (e.g. values that are already ordinals) is passed through.

    Returns
    -------
    ndarray or int
        Ordinal day(s)

    """
    if isinstance(date, str):
        return datetime.datetime.strptime(date[:10], "%Y-%m-%d").toordinal()
    if not isinstance(date, (list, np.ndarray)):
        return date
    dates = np.array(date)
    if not np.issubdtype(dates.dtype, np.str_):
        return dates
    try:
        # Fast path: let pandas parse the whole array at once.
        parsed = pd.to_datetime(pd.Series(dates))
        return parsed.apply(pd.Timestamp.toordinal).values
    except Exception:
        # Fallback: parse the ORIGINAL strings one by one (a separate
        # variable is used above so the input is not overwritten).
        return np.array([to_ordinal(d) for d in dates])
def to_ordinal_day(date):
    """
    Alias of `to_ordinal`: map date(s) to ordinal day(s) (day 1 = Jan 1st, 1 AD).

    Parameters
    ----------
    date : array_like or str
        Date(s) of type `str`

    Returns
    -------
    ndarray or int
        Ordinal day(s)

    """
    day = to_ordinal(date)
    return day
def to_ordinal_week(date):
    """
    Map date(s) to ordinal week(s), where week 1 = Jan 1st-7th, 1 AD.

    Parameters
    ----------
    date : array_like or str
        Date(s) of type `str`

    Returns
    -------
    ndarray or int
        Ordinal week(s)

    """
    day = to_ordinal(date)
    # Days 1..7 -> week 1, days 8..14 -> week 2, ...
    week = (day + 6) // 7
    return week
def to_ordinal_month(date):
    """
    Converts date(s) to ordinal month(s), where month 1 = Jan 1st-31st, 1 AD.

    Parameters
    ----------
    date : array_like or str or int
        Date(s) of type `str` ("YYYY-MM-DD...", first 10 characters are
        used) or `int` (ordinal day)

    Returns
    -------
    ndarray or int
        Ordinal month(s)

    """
    if isinstance(date, (list, np.ndarray)):
        # Recurse element-wise (the original called an undefined name here).
        return np.array([to_ordinal_month(d) for d in date])
    if isinstance(date, str):
        parsed = datetime.datetime.strptime(date[:10], "%Y-%m-%d")
    else:
        parsed = datetime.datetime.fromordinal(date)
    return parsed.month + 12 * (parsed.year - 1)
def to_ordinal_year(date):
    """
    Converts date(s) to ordinal year(s).

    Parameters
    ----------
    date : array_like or str or int
        Date(s) of type `str` ("YYYY-MM-DD...", first 10 characters are
        used) or `int` (ordinal day)

    Returns
    -------
    ndarray or int
        Ordinal year(s)

    """
    if isinstance(date, (list, np.ndarray)):
        # Recurse element-wise (the original called an undefined name here).
        return np.array([to_ordinal_year(d) for d in date])
    if isinstance(date, str):
        return datetime.datetime.strptime(date[:10], "%Y-%m-%d").year
    return datetime.datetime.fromordinal(date).year
def to_year_month_day(date):
    """
    Converts date(s) to (arrays of) year, month, day.

    Parameters
    ----------
    date : array_like or str or int
        Date(s) of type `str` ("YYYY-MM-DD", an optional time suffix is
        ignored) or `int` (ordinal day)

    Returns
    -------
    year : ndarray or int
        Year(s)
    month : ndarray or int
        Month(s) of the year
    day : ndarray or int
        Day(s) of the month

    """
    if isinstance(date, (list, np.ndarray)):
        if len(date) == 0:
            # Nothing to unpack: return three empty arrays instead of
            # failing on the tuple unpacking below.
            return tuple(np.array([], dtype=int) for _ in range(3))
        parts = np.array([to_year_month_day(d) for d in date]).T
    elif isinstance(date, str):
        # Use only "YYYY-MM-DD" so timestamps with a time part work too,
        # consistent with the other date helpers in this module.
        parts = np.array(date[:10].split("-")).astype(int)
    else:
        parsed = datetime.datetime.fromordinal(date)
        parts = np.array([parsed.year, parsed.month, parsed.day]).astype(int)
    year, month, day = list(parts)
    return year, month, day
def to_range(date, pad_to_full_week=True):
    """
    Build the continuous range of ordinal days that encloses the given date(s).

    Parameters
    ----------
    date : array_like
        List of dates of type `int` (ordinal) or `str`
    pad_to_full_week : bool, default True
        If True - extend the range backwards to Monday and forwards to
        Sunday, so that its length % 7 == 0

    Returns
    -------
    ndarray
        Continuous range of ordinal days; an empty array if a
        ValueError occurs (e.g. for empty input)

    """
    try:
        ordinals = to_ordinal(date)
        first, last = ordinals.min(), ordinals.max()
        if pad_to_full_week:
            # ISO weekday: 1 = Monday ... 7 = Sunday.
            first = first - to_weekdayiso(first) + 1
            last = last - to_weekdayiso(last) + 7
        return np.arange(first, last + 1)
    except ValueError:
        return np.array([])
def to_weekdayiso(date):
    """
    Map date(s) to the ISO day of the week (1 - Monday, 7 - Sunday).

    Parameters
    ----------
    date : array_like or str or int
        Date(s)

    Returns
    -------
    ndarray or int
        Day(s) of the week

    """
    # Ordinal day 1 (Jan 1st, 1 AD) maps to 1 = Monday.
    return (to_ordinal(date) + 6) % 7 + 1
def to_weekdayiso_name(date):
    """
    Map date(s) to the full name(s) of the day of the week.

    Names come from `calendar.day_name` (Monday - Sunday).

    Parameters
    ----------
    date : array_like or str or int
        Date(s)

    Returns
    -------
    ndarray or str
        Day(s) of the week

    """
    names = dict(enumerate(np.array(list(calendar.day_name))))
    index = (to_ordinal(date) - 1) % 7
    if isinstance(index, np.ndarray):
        return np.vectorize(names.get)(index)
    return names[index]
def to_weekdayiso_abbr(date):
    """
    Map date(s) to the abbreviated name(s) of the day of the week.

    Names come from `calendar.day_abbr` (Mon - Sun).

    Parameters
    ----------
    date : array_like or str or int
        Date(s)

    Returns
    -------
    ndarray or str
        Day(s) of the week

    """
    names = dict(enumerate(np.array(list(calendar.day_abbr))))
    index = (to_ordinal(date) - 1) % 7
    if isinstance(index, np.ndarray):
        return np.vectorize(names.get)(index)
    return names[index]
def to_month_name(date):
    """
    Map date(s) to the full name(s) of the month.

    Names come from `calendar.month_name` (January - December).

    Parameters
    ----------
    date : array_like or str or int
        Date(s)

    Returns
    -------
    ndarray or str
        Month(s)

    """
    # calendar.month_name has an empty entry at index 0, months are 1..12.
    names = dict(enumerate(np.array(list(calendar.month_name))))
    index = (to_ordinal_month(date) - 1) % 12 + 1
    if isinstance(index, np.ndarray):
        return np.vectorize(names.get)(index)
    return names[index]
def to_month_abbr(date):
    """
    Map date(s) to the abbreviated name(s) of the month.

    Names come from `calendar.month_abbr` (Jan - Dec).

    Parameters
    ----------
    date : array_like or str or int
        Date(s)

    Returns
    -------
    ndarray or str
        Month(s)

    """
    # calendar.month_abbr has an empty entry at index 0, months are 1..12.
    names = dict(enumerate(np.array(list(calendar.month_abbr))))
    index = (to_ordinal_month(date) - 1) % 12 + 1
    if isinstance(index, np.ndarray):
        return np.vectorize(names.get)(index)
    return names[index]
def timezone_txt_to_minutes(tz):
    """
    Converts timezone name to minutes relative to UTC

    Parameters
    ----------
    tz : str or array_like
        Timezone, e.g. "Europe/Madrid" or "UTC+0100". Lists / arrays
        are converted element-wise.

    Returns
    -------
    float or ndarray
        Offset in minutes (rounded); NaN if the timezone cannot be
        interpreted

    Notes
    -----
    Named zones are evaluated at the fixed instant 2000-01-01 00:00:00
    UTC, so the result is the offset in effect at that moment.

    """
    if isinstance(tz, (list, np.ndarray)):
        dt = np.array([timezone_txt_to_minutes(t) for t in tz]).T
    else:
        fmt = "%Y-%m-%d %H:%M:%S"
        # Reference instant (timezone-aware, UTC).
        t0 = datetime.datetime.strptime("2000-01-01 00:00:00+0000", fmt + "%z")
        try:
            try:
                # Case 1: `tz` is a zone name known to pytz;
                # t1 is the local wall-clock time at the reference instant.
                t1 = t0.astimezone(pytz.timezone(tz)).strftime(fmt)
            except:
                # Case 2: treat `tz` as an explicit offset such as "UTC+0100":
                # t1 stays at UTC, while t0 is re-parsed with the offset that
                # remains after stripping all letters from `tz`.
                t1 = t0.astimezone(pytz.timezone("UTC")).strftime(fmt)
                t0 = "2000-01-01 00:00:00" + re.sub("[a-zA-ZÀ-ž]", "", tz)
                t0 = datetime.datetime.strptime(t0, fmt + "%z")
            # Compare both as naive wall-clock times expressed in UTC.
            t0 = t0.astimezone(pytz.timezone("UTC")).strftime(fmt)
            t1 = datetime.datetime.strptime(t1, fmt)
            t0 = datetime.datetime.strptime(t0, fmt)
            dt = (t1 - t0).total_seconds() // 60
        except:
            # Anything that cannot be parsed yields NaN.
            dt = np.nan
    return np.round(dt)
def xticks_hours(dt=1, mode="24H", ax=None, **kwargs):
    """
    Label the x axis in hours, assuming xlim is (0,1440) [min/day].

    Parameters
    ----------
    dt : int, default 1
        Stride between ticks, hours
    mode : {"24H", "12H"}, default "24H"
        Time format (case-insensitive); "12H" gives labels like "06 PM"
    ax : matplotlib.pyplot.Axes object, default None
        Axes for plotting; the current axes are used if None
    **kwargs
        Keyword arguments passed on to `ax.set_xticks`

    Returns
    -------
    ax : matplotlib.pyplot.Axes object
        Axes for plotting

    """
    ax = plt.gca() if ax is None else ax
    hours = np.arange(25)[::dt]
    if mode.upper() == "12H":
        # Hour 24 wraps around to 0, i.e. "12 AM".
        labels = [
            datetime.datetime.strptime(f"{hour}", "%H").strftime('%I %p').upper()
            for hour in hours % 24
        ]
    else:
        labels = hours.astype(str)
    ax.set_xticks(hours * 60, labels, **kwargs)
    return ax
def xticks_days(x, ax=None, **kwargs):
    """
    Label the x axis in days, assuming xlim is (0, 1440 x N days) [min].

    Parameters
    ----------
    x : array_like
        Data; the number of days is taken as len(x) // 1440
    ax : matplotlib.pyplot.Axes object, default None
        Axes for plotting; the current axes are used if None
    **kwargs
        Keyword arguments passed on to `ax.set_xticks`

    Returns
    -------
    ax : matplotlib.pyplot.Axes object
        Axes for plotting

    """
    ax = plt.gca() if ax is None else ax
    days = np.arange(len(x) // 1440 + 1)
    ax.set_xticks(days * 1440, days, **kwargs)
    return ax
def xticks_dates(idate, mode="day", ax=None, **kwargs):
    """
    Label the x axis with date(s).

    Parameters
    ----------
    idate : array_like
        Array or list of dates of type `int` (day 1 = Jan 1st, 1 AD)
    mode : {"day", "week", "fortnight", "month", "year"}, default "day"
        Date spacing; the Spanish aliases "día", "semana", "quincena",
        "mes" and "año" are accepted as well.
        "week" keeps Mondays, "fortnight" the 1st and 15th of a month,
        "month" the 1st of a month, "year" January 1st.
    ax : matplotlib.pyplot.Axes object, default None
        Axes for plotting; the current axes are used if None
    **kwargs
        Keyword arguments passed on to `ax.set_xticks`
        (`rotation` defaults to 45 and `ha` to "right")

    Returns
    -------
    ax : matplotlib.pyplot.Axes object
        Axes for plotting

    """
    assert mode in ["day", "week", "fortnight", "month", "year",
                    "día", "semana", "quincena", "mes", "año"]
    if ax is None:
        ax = plt.gca()
    ticks = np.copy(idate)
    if mode in ["año", "year"]:
        _, month, day = to_year_month_day(ticks)
        ticks = ticks[(day == 1) & (month == 1)]
    elif mode in ["mes", "month"]:
        day = to_year_month_day(ticks)[-1]
        ticks = ticks[day == 1]
    elif mode in ["quincena", "fortnight"]:
        day = to_year_month_day(ticks)[-1]
        ticks = ticks[(day == 1) | (day == 15)]
    elif mode in ["semana", "week"]:
        weekday = to_weekdayiso(ticks)
        ticks = ticks[weekday == 1]
    labels = from_ordinal(ticks)
    kwargs.setdefault("rotation", 45)
    kwargs.setdefault("ha", "right")
    ax.set_xticks(ticks, labels, **kwargs)
    return ax
def smoother(x, window, pad=np.nan, epsilon=1e-10, roll=False):
    """
    Smooth data with a running Hann window.

    Parameters
    ----------
    x : ndarray
        Array of equispaced time series data (flattened internally,
        the result has the shape of `x`)
    window : int,
        Window size, must be smaller than x.size
    pad : float, default np.nan
        Value used to pad both ends of x if roll is False
    epsilon : float, default 1e-10
        Results with absolute value below this cutoff are set to 0
    roll : bool, default False
        If True, pad periodically with the opposite end of x,
        else pad with the constant `pad`

    Returns
    -------
    ndarray
        Smoothed time series data

    Notes
    -----
    If x contains non-finite values, each output is a Hann-weighted
    average over the finite values in its window only.
    """
    assert window < x.size
    kernel = np.hanning(window)
    flat = x.flatten()
    half = int(window) // 2
    if roll:
        head, tail = flat[-half:], flat[:half]
    else:
        head = np.ones((half)) * pad
        tail = np.ones((half)) * pad
    padded = np.concatenate([head, flat, tail])
    if np.all(np.isfinite(x)):
        smooth = np.correlate(padded, kernel, mode='same') / np.sum(kernel)
        smooth = smooth[half:-half].reshape(x.shape)
    else:
        frames = np.lib.stride_tricks.sliding_window_view(padded, window)
        # Normalise by the weights of the finite samples only.
        weight = np.nansum((np.isfinite(frames) * kernel), 1)
        smooth = np.nansum(frames * kernel, 1) / weight
        smooth = smooth[:x.size].reshape(x.shape)
    smooth[np.abs(smooth) < epsilon] = 0
    return smooth
def window_avg_std(t, x, window=14, smooth=0):
    """
    Calculates running window average and std.

    Parameters
    ----------
    t : ndarray
        1D array of time indices of values
    x : ndarray
        1D array of values
    window : int, default 14
        Window size
    smooth : int, default 0
        Window size to smooth average and std (0 disables smoothing)

    Returns
    -------
    t_avg : ndarray
        1D array of time indices of avg / std values
    x_avg : ndarray
        1D array of running window average values
    x_std : ndarray
        1D array of running window std values

    Notes
    -----
    NaN values are ignored (nanmean / nanstd are used).
    """
    # Work on the span t[0]..t[-1] extended by `window` points on each side;
    # the extension is trimmed off again before returning.
    n = t[-1] - t[0] + 1 + 2 * window
    pad = np.zeros((window)) * np.nan
    x_ = np.concatenate([pad, x, pad])
    t_avg = np.arange(n) + t[0] - window
    x_avg = np.zeros((n)) * np.nan
    x_std = np.zeros((n)) * np.nan
    for i in range(n):
        # NOTE(review): x_ is indexed by position, which presumably assumes
        # that t is contiguous (one value of x per unit step) - confirm.
        x_avg[i] = np.nanmean(x_[i-window//2:i+window//2+1])
        x_std[i] = np.nanstd(x_[i-window//2:i+window//2+1])
    if smooth:
        # NOTE(review): the smoothed arrays are sliced here and then sliced
        # again below, so they appear to end up shorter than t_avg - verify
        # that this is intended.
        x_avg = smoother(x_avg, window=smooth)[window//2:][:n-2*window]
        x_std = smoother(x_std, window=smooth)[window//2:][:n-2*window]
    # Drop the leading padding and keep the original time span.
    t_avg = t_avg[window:][:n-2*window]
    x_avg = x_avg[window:][:n-2*window]
    x_std = x_std[window:][:n-2*window]
    return t_avg, x_avg, x_std
def _remove_nonlocal_peaks(x, idx, window): """ Removes redundant and non-local (edge) peaks. Parameters ---------- x : ndarray Array of equispaced time series data idx : ndarray Indices of peaks window : int, Window size Returns ------- ndarray Indices of true local peaks """ def is_max(val, arr): return val == np.nanmax(arr) and val > np.nanmin(arr) idxs = [] x_ = x.flatten() n = len(x_) for i in np.unique(idx): i0 = max(0, i - window // 2) i1 = min(n, i + window // 2) if len(idxs) > 0 and idxs[-1] > i - window // 2: continue if (i > 0) & (i < n) & is_max(x_[i], x_[i0:i+1]) & is_max(x_[i], x_[i:i1]): idxs.append(i) idxs = np.array(idxs).astype(int) return idxs
def series_peaks(x, window, smooth=False):
    """
    Find local peaks in a time series.

    Candidates are the maxima of windows that advance by half a window;
    redundant and edge candidates are then discarded.

    Notes
    -----
    - NaN, Inf values NOT allowed

    Parameters
    ----------
    x : ndarray
        Array of equispaced time series data
    window : int
        Window size
    smooth : bool, default False
        If True - apply Hann window averaging smooth

    Returns
    -------
    idx : ndarray
        1D array of peak coordinate(s)
    score : ndarray
        1D array of peak height(s)

    """
    series = smoother(x, window) if smooth else x
    stride = window // 2
    frames = np.lib.stride_tricks.sliding_window_view(series, window)[::stride]
    # argmax is relative to each frame; shift by the frame start.
    candidates = np.argmax(frames, axis=1) + np.arange(len(frames)) * stride
    in_range = (candidates >= 0) & (candidates < len(x))
    idx = _remove_nonlocal_peaks(series, candidates[in_range], window)
    return idx, series[idx]
def histogram_peaks(x, bins=100, smooth=False):
    """
    Find local peaks in the histogram of values.

    The peak search window is the standard deviation of `x` expressed
    in bin widths.

    Parameters
    ----------
    x : ndarray
        Array of data values
    bins : int or ndarray, default 100
        Histogram bin edges, or the number of equally spaced edges
        between the (NaN-ignoring) minimum and maximum of x
    smooth : bool, default False
        If True - apply Hann window averaging smooth

    Returns
    -------
    idx : ndarray
        1D array of peak coordinate(s) (left edge of the peak bin)
    score : ndarray
        1D array of peak height(s)

    """
    if isinstance(bins, int):
        bins = np.linspace(np.nanmin(x), np.nanmax(x), bins)
    bin_width = np.diff(bins)[0]
    window = int(np.nanstd(x) / bin_width)
    counts = np.histogram(x, bins)[0]
    peak_bins, score = series_peaks(counts, window, smooth)
    return bins[peak_bins], score
def window_sigmoid(n, m=None):
    """
    Build a sigmoid window rising from -1 to 1, of length n and width m.

    Parameters
    ----------
    n : int
        Window length
    m : int or None, default None
        Window width, if None, m = int(max(1, 0.2 * n))

    Returns
    -------
    ndarray
        1D array of sigmoid window, rounded to 2 decimals

    """
    n = int(n)
    if m is None:
        m = int(max(1, 0.2 * n))
    else:
        m = int(m)
    # Sample positions centred on zero.
    centered = np.arange(n) - (n - 1) / 2
    window = 1 - 2 / (1 + np.exp(centered / m))
    return np.round(window, 2)
def window_boxcar(n, m=None):
    """
    Generates boxcar window (-1 to 1) of length n and width m.

    The window is -1 at both edges and +1 in a centred block. If
    m >= n the whole window is +1.

    Parameters
    ----------
    n : int
        Window length
    m : int or None, default None
        Window width, if None, m = int(max(1, 0.2 * n))

    Returns
    -------
    ndarray
        1D array of boxcar window

    """
    n = int(n)
    m = int(max(1, 0.2 * n)) if m is None else int(m)
    # Number of -1 samples on each side; never negative.
    edge = max(0, (n - m + (n + m) % 2) // 2)
    w = -1 * np.ones((n))
    # An explicit end index is required: with edge == 0 the slice
    # w[0:-0] would be empty and leave the whole window at -1.
    w[edge:n - edge] = 1
    return w
def calc_covariance(x, window):
    """
    Compute the running contrast between the two halves of a window.

    For every position, the mean of the samples where `window` is
    positive minus the mean of the samples where `window` is negative
    is returned. Only the sign of `window` is used.

    Parameters
    ----------
    x : ndarray
        1D array of data points
    window : ndarray
        1D window array

    Returns
    -------
    ndarray
        1D array of covariance; NaN where fewer than 2/3 of the samples
        on either side of the window are finite

    """
    width = len(window)
    edge = np.zeros((width // 2)) * np.nan
    padded = np.concatenate([edge, x, edge])
    # One column per window position, one row per window sample.
    frames = np.lib.stride_tricks.sliding_window_view(padded, width).T
    frames = frames - np.nanmean(frames, 0)
    upper = window > 0
    lower = window < 0
    cov = np.nanmean(frames[upper], 0) - np.nanmean(frames[lower], 0)
    finite_lower = np.sum(np.isfinite(frames[lower]), 0)
    finite_upper = np.sum(np.isfinite(frames[upper]), 0)
    enough = ((finite_lower >= np.sum(lower) * 2 / 3)
              & (finite_upper >= np.sum(upper) * 2 / 3))
    cov[~enough] = np.nan
    return cov[:x.size]
def calc_interpolation(x):
    """
    Fill non-finite entries of a data array by linear interpolation.

    Leading / trailing gaps take the first / last finite value. If no
    value is finite, an array of zeros is returned.

    Parameters
    ----------
    x : ndarray
        1D array of data points

    Returns
    -------
    ndarray
        1D array of interpolated data

    """
    finite = np.isfinite(x)
    if not np.any(finite):
        return np.zeros_like(x)
    known = x[finite]
    position = np.arange(len(x))
    return np.interp(position, position[finite], known, known[0], known[-1])
def anomaly_detection(x, wlen=10, wtype="step", cutoff=None):
    """
    Detects anomalies using running step or boxcar window.

    Parameters
    ----------
    x : ndarray
        1D array of data points
    wlen : int, default 10
        Window length
    wtype : {"step", "box"}, default "step"
        Window type
    cutoff : float or None, default None
        Cutoff scale for std: if given, only peaks with a value of at
        least cutoff * std are kept

    Returns
    -------
    ndarray
        1D array of anomaly indices (empty if x is shorter than wlen)

    """
    if x.size < wlen:
        return np.array([])
    wlen = int(wlen)
    wfunc = {"step": window_sigmoid, "box": window_boxcar}
    window = wfunc[wtype](wlen)
    cov = calc_covariance(x, window)
    cov = calc_interpolation(cov)
    # series_peaks returns (indices, scores); only the indices are needed.
    idx, _ = series_peaks(cov, int(wlen * 2 / 3))
    if cutoff is not None:
        std = np.nanstd(cov)
        idx = idx[cov[idx] >= std * cutoff]
    return idx
def calc_cadence(steps):
    """
    Calculates walking and running cadence (steps/min).

    Parameters
    ----------
    steps : ndarray
        Array of equispaced time series data

    Returns
    -------
    walk : float
        Walking cadence (NaN if no suitable peak is found)
    run : float
        Running cadence (NaN if no suitable peak is found)

    """
    # histogram of steps (unit-width bins, so bin index == steps/min)
    bins = np.linspace(0, 255, 256)
    hist = np.histogram(steps, bins)[0]
    hist[0] = 0
    hist = np.log(hist + 1)
    # log-linear fit in the scale-invariant range 20-60 steps/min
    x, y = bins[:-1], hist
    mask = (x >= 20) & (x <= 60)
    p = np.polyfit(x[mask], y[mask], 1)
    pred = np.polyval(p, x)
    pred[pred < 0] = 0
    # subtract scale-invariant trend to contrast peaks
    diff = hist - pred
    diff[diff < 0] = 0
    # find local maxima; series_peaks returns (indices, scores)
    idx, _ = series_peaks(diff, window=40)
    walk, run = np.nan, np.nan
    if len(idx):
        # peaks closest to the reference cadences of 110 and 160 steps/min
        walk = idx[np.argmin(np.abs(idx - 110))]
        run = idx[np.argmin(np.abs(idx - 160))]
        run = run if run > walk else np.nan
        walk = walk if walk > 60 else np.nan
    return walk, run
def impute_bpm(bpm, tol=15):
    """
    Impute short bpm gaps by linear interpolation.

    Some devices output bpm every 5 or 10 min. This function imputes
    short gaps to make them compatible with bpm output every 1 min.
    Gaps (runs of values <= 0) of `tol` or more samples are left at zero.

    Parameters
    ----------
    bpm : ndarray
        Array of equispaced time series data (N days x 1440 min)
    tol : int, default 15
        Gap length [minutes] from which gaps are no longer imputed

    Returns
    -------
    ndarray
        Imputed bpm of the same shape as the input bpm

    """
    flat = bpm.flatten()
    # Gaps that are too long to be interpolated.
    long_gaps = find_intervals(flat <= 0, tol=0, nmin=tol)
    # Positive readings are the support points of the interpolation.
    valid = flat > 0
    if np.any(valid):
        minute = np.arange(len(flat))
        imputed = np.interp(minute, minute[valid], flat[valid])
        # Undo the interpolation inside the long gaps.
        for start, stop in long_gaps:
            imputed[start:stop] = 0
    else:
        imputed = np.copy(flat)
    return imputed.reshape(bpm.shape)
def defragment_sleep(sleep, tol=60):
    """
    Merge fragmented sleep bouts into continuous sleep intervals.

    Some samples have fragmented sleep records. Gaps of up to `tol`
    samples between bouts are bridged; the result is 1 inside the
    merged intervals and 0 elsewhere.

    Parameters
    ----------
    sleep : ndarray
        Array of equispaced time series data (N days x 1440 min)
    tol : int, default 60
        Max length of imputed gaps [minutes]

    Returns
    -------
    ndarray
        Defragmented sleep of the same shape as the input sleep

    """
    flat = sleep.flatten()
    merged = np.zeros_like(flat)
    for start, stop in find_intervals(flat > 0, tol=tol):
        merged[start:stop] = 1
    return merged.reshape(sleep.shape)
# Export every public name defined above: anything that does not start with
# an underscore and is not an imported module. `types` is imported only for
# this check and deleted afterwards so it does not stay in the namespace.
import types
__all__ = [name for name, thing in globals().items()
           if not (name.startswith('_') or isinstance(thing, types.ModuleType))]
del types