Source code for mhealthdata.df2numpy

#!/usr/bin/env python
# -*- coding: utf8 -*-

import numpy as np
import pandas as pd
from mhealthdata.utils import *


def _get_values(df, column):
    """
    Get the record value array from a loaded dataframe.

    Values are cast to float. When the cast fails (non-numeric values such
    as sleep stage labels), the values are converted to strings and mapped
    to numbers with the "encode" mapping from sleep_stage_dict(); the
    string "None" (what a missing value becomes after the string cast)
    is mapped to 0.

    Parameters
    ----------
    df : DataFrame
        DataFrame of loaded health data for "steps", "sleep", "bpm", etc.
    column : str
        Values column name

    Returns
    -------
    ndarray
        1D float array of record values, e.g. sleep stage, or stepcounts, etc.

    """
    values = df[column].values
    try:
        values = values.astype(float)
    except (ValueError, TypeError):
        # Only a failed numeric conversion selects the label-encoding path;
        # KeyboardInterrupt, SystemExit, etc. must propagate to the caller.
        # Copy the mapping so adding "None" cannot leak into the helper's
        # dict (in case it returns a shared object).
        encoder = dict(sleep_stage_dict(mode="encode"))
        encoder["None"] = 0
        values = values.astype(str)
        values = np.vectorize(encoder.get)(values)
        values = values.astype(float)
    return values


def _get_time(df, keys):
    """
    Pick the timestamp column of a loaded dataframe.

    Parameters
    ----------
    df : DataFrame
        DataFrame of loaded health data for "steps", "bpm", etc.
    keys : list
        List of keywords passed to find_columns_by_key() to locate
        the column in the DataFrame

    Returns
    -------
    Series or None
        The first matching column of `df`, or None when
        find_columns_by_key() reports no matching column

    """
    matches = find_columns_by_key(df, keys)
    if len(matches) > 0:
        # Several columns may match; only the first one is used
        return df[matches[0]]
    return None


def _get_idate_imin(t):
    """
    Split record timestamps into ordinal day and minute of the day.

    Parameters
    ----------
    t : Series
        Series of datetime values (accessed through `.apply` and `.dt`)

    Returns
    -------
    idate : ndarray
        1D int array of ordinal days (January 1 of year 1 is day 1)
    imin : ndarray
        1D int array of minutes since midnight (seconds are discarded)

    """
    ordinal_days = t.apply(pd.Timestamp.toordinal)
    clock = t.dt
    minute_of_day = 60 * clock.hour + clock.minute
    idate = ordinal_days.values.astype(int)
    imin = minute_of_day.values.astype(int)
    return idate, imin


def _calc_duration(t0, t1):
    """
    Compute record durations in whole minutes from start and end timestamps.

    Parameters
    ----------
    t0 : Series
        Start timestamps
    t1 : Series or None
        End timestamps; if None, every record gets a duration of 1 minute

    Returns
    -------
    ndarray
        1D int array of record durations [minutes], each at least 1

    """
    n_records = len(t0)
    if t1 is None:
        # No end time available: assume one-minute records
        return np.ones(n_records, dtype=int)
    seconds = (t1 - t0).astype("timedelta64[s]").values
    minutes = (seconds / 60).astype(int)
    # Anything shorter than a minute (or non-positive) counts as one minute
    minutes[minutes < 1] = 1
    return minutes


def to_1darray(df, column, tstart, tend=None, tz=None, idate=None, x=None, uint8=False):
    """
    Convert value-per-day health data ("weight", "rhr", or "hrv") to a 1D array.

    Parameters
    ----------
    df : DataFrame
        DataFrame of health data records
    column : str
        Name of values column
    tstart : list
        List of columns to seek for start date/time
    tend : list or None, default None
        List of columns to seek for end date/time
    tz : list or None, default None
        List of columns to seek for date/time time zone
    idate : ndarray or None, default None
        1D array of continuous range of ordinal days; if None, it is built
        from the record days with to_range()
    x : ndarray or None, default None
        Initialized 1D array, filled in place; if None, will be
        initialized with np.zeros()
    uint8 : bool, default False
        Flag to clip all values to 0..255 and cast to np.uint8 to save
        disk space

    Returns
    -------
    x : ndarray
        1D array of values of size (N days).
    idate : ndarray
        1D array of ordinal days, one per element of `x`.

    """
    df = columns_to_datetime(df, tstart, tend, tz)
    records = _get_values(df, column)
    record_day, _ = _get_idate_imin(_get_time(df, tstart))
    if idate is None:
        idate = to_range(record_day)
    n_days = len(idate)
    if x is None:
        x = np.zeros((n_days))
    # Day offset of every record from the first day of the range;
    # records falling outside the range are dropped
    offset = record_day - idate[0]
    inside = (offset >= 0) & (offset < n_days)
    # Records are written in order, so a later record on the same day
    # overwrites an earlier one
    for pos, value in zip(offset[inside], records[inside]):
        x[pos] = value
    if uint8:
        x = np.clip(x, 0, 255).astype(np.uint8)
    return x, idate
def to_2darray(df, column, tstart, tend=None, tz=None, dt=None, idate=None,
               x=None, uint8=False, mode="rate"):
    """
    Get value-per-minute health data ("steps", "sleep", or "bpm") as 2D array.

    Parameters
    ----------
    df : DataFrame
        DataFrame of health data records - "steps", "bpm", etc.
    column : str
        Name of values column
    tstart : list
        List of columns to seek for start date/time
    tend : list or None, default None
        List of columns to seek for end date/time
    tz : list or None, default None
        List of columns to seek for date/time time zone
    dt : str, ndarray, or None, default None
        Column name or 1D array of record durations [seconds]. If None,
        durations are computed from the start and end timestamps.
        Durations are rounded up to whole minutes, with a minimum of
        one minute per record.
    idate : ndarray or None, default None
        1D array of continuous range of ordinal days; if None, it is built
        from the record days with to_range()
    x : ndarray or None, default None
        Initialized array holding (N days * 1440) elements; it is flattened
        into a copy, so the passed array itself is not modified.
        If None, will be initialized with np.zeros()
    uint8 : bool, default False
        Flag to clip all values to 0..255 and cast to np.uint8 to save
        disk space
    mode : {"rate", "count"}, default "rate"
        Way to treat values of records longer than 1 minute:
        if "rate" - duplicate values, if "count" - split evenly between minutes

    Returns
    -------
    x : ndarray
        2D array of values of size (N days x 1440 minutes).
    idate : ndarray
        1D array of ordinal days, one per row of `x`.

    """
    assert mode in ["rate", "count"]
    df = columns_to_datetime(df, tstart, tend, tz)
    val = _get_values(df, column)
    t0, t1 = [_get_time(df, k) for k in [tstart, tend]]
    iday, imin = _get_idate_imin(t0)
    dt = df[dt].values if isinstance(dt, str) else dt  # seconds
    if dt is None:
        dt = _calc_duration(t0, t1)  # minutes
    else:
        dt = np.asarray(dt) / 60  # seconds -> minutes
    # Round up to whole minutes and keep every record at least one minute
    # long: a zero duration would silently drop the record, and a negative
    # one would turn the slice end below into a wrapped negative index.
    dt = np.maximum(np.ceil(dt).astype(int), 1)
    idate = idate if idate is not None else to_range(iday)
    n = 1440 * len(idate)
    x = np.zeros((n)) if x is None else x.flatten()
    # Flat minute index of each record start within the (days x 1440) grid
    idx = 1440 * (iday - idate[0]) + imin
    mask = (idx >= 0) & (idx < n)
    # Filter durations together with indices and values so that all three
    # stay aligned when records outside the day range are dropped.
    idx, val, dt = idx[mask], val[mask], dt[mask]
    # "count": spread the value evenly over the minutes; "rate": repeat it
    fill = val / dt if mode == "count" else val
    for start, length, value in zip(idx, dt, fill):
        # Slices running past the end of the range are truncated by NumPy
        x[start:start + length] = value
    x = x.reshape(-1, 1440)
    if uint8:
        x = np.clip(x, 0, 255).astype(np.uint8)
    return x, idate
def combine_arrays(*args, labels=None, mode="valid"):
    """
    Convert arrays of e.g. steps, bpm, sleep into the same length
    and combine in a dictionary

    Parameters
    ----------
    *args
        Tuples of (data, date) e.g. as output by to_1darray() or
        to_2darray(). `data` is an array whose first axis runs over days
        and `date` is the matching 1D array of ordinal days, assumed to be
        a continuous ascending range.
    labels : list or None, default None
        List of keyword labels for data; if None, "x0", "x1", ... are used
    mode : {"valid", "full"}, default "valid"
        If "valid" all arrays are shrunk to the min overlapping range,
        else expanded to the full covered range and padded with zeros

    Returns
    -------
    dict
        Dictionary with numpy arrays of the same length; the common day
        range is stored under the key "idate"

    """
    assert labels is None or len(labels) == len(args)
    assert mode in ["full", "valid"]
    if labels is None:
        labels = [f"x{i}" for i in range(len(args))]
    data = {}
    if mode == "valid":
        # Intersection of all day ranges (t1 is exclusive)
        t0 = max(a[1][0] for a in args)
        t1 = min(a[1][-1] for a in args) + 1
        data["idate"] = np.arange(t0, t1)
        for label, arg in zip(labels, args):
            x, t = arg[0], arg[1]
            data[label] = x[(t >= t0) & (t < t1)]
    else:
        # Union of all day ranges (t1 is exclusive)
        t0 = min(a[1][0] for a in args)
        t1 = max(a[1][-1] for a in args) + 1
        data["idate"] = np.arange(t0, t1)
        for label, arg in zip(labels, args):
            x, t = arg
            # Keep the trailing dimensions of the input so that per-day 1D
            # arrays work as well as (days x 1440) per-minute arrays
            val = np.zeros((t1 - t0,) + x.shape[1:], x.dtype)
            i0 = t[0] - t0
            val[i0:i0 + len(t)] = x
            data[label] = val
    return data