Source code for mhealthdata.timezone

#!/usr/bin/env python
# -*- coding: utf8 -*-

import numpy as np
from mhealthdata.utils import *


def _get_tzlist(dt):
    """
    Get array of possible timezone offsets

    Parameters
    ----------
    dt : int, default 60
        Stride to find best match timezone offset, [minutes]

    Returns
    -------
    ndarray
        1D array of possible timezone offsets

    """
    n = 1440 // dt
    tzlist = (np.arange(n) - n // 2) * dt
    return tzlist
    

def _get_slice(x, y, i, nday=3, backwards=True):
    """
    Get window slice

    Parameters
    ----------
    x : ndarray
        2D array of size N days x 1440 minutes (padded sleep > 0)
    y : ndarray
        2D array of size N days x 1440 minutes (padded steps + bpm, 0 replaced by np.nan)
    i : int
        Day index
    nday : int, default 3
        Window to find best match timezone offset, [days]
    backwards : bool, default True
        If True include previous day, otherwise include following days

    Returns
    -------
    ndarrays
        1D arrays of window slices

    """
    i0 = i - nday * backwards
    i1 = i0 + nday + 1
    return x[i0:i1].flatten(), y[i0:i1].flatten()


def _conv_slice(x, y, t):
    """
    Calculate convolution of window slices

    Parameters
    ----------
    x : ndarray
        2D array of size N days x 1440 minutes (padded sleep > 0)
    y : ndarray
        2D array of size N days x 1440 minutes (padded steps + bpm, 0 replaced by np.nan)
    t : int
        Offset [minutes]

    Returns
    -------
    float
        Convolution (the smaller - the better sleep-to-steps match)

    """
    mask = np.roll(x, -t) > 0
    return np.nanmean(y[mask]) * np.sum(mask)


def _calc_timezone(x, y, i, nday=3, dt=60):
    """
    Day-wise score for each timezone offset

    Parameters
    ----------
    x : ndarray
        2D array of size N days x 1440 minutes (padded sleep > 0)
    y : ndarray
        2D array of size N days x 1440 minutes (padded steps + bpm, 0 replaced by np.nan)
    i : int
        Day index
    nday : int, default 3
        Window to find best match timezone offset, [days]
    dt : int, default 60
        Stride to find best match timezone offset, [minutes]

    Returns
    -------
    ndarray
        1D array of score for each timezone offset

    """
    tzlist = _get_tzlist(dt)
    x_, y_ = _get_slice(x, y, i + nday, nday, backwards=False)
    score = np.array([_conv_slice(x_, y_, t) for t in tzlist])
    x_, y_ = _get_slice(x, y, i + nday, nday, backwards=True)
    score_ = np.array([_conv_slice(x_, y_, t) for t in tzlist])
    score = np.nanmin(np.stack([score, score_]), axis=0)
    score = score - np.nanmin(score)
    score[score > 0.1 * np.nanmax(score)] = np.nan
    mask = score == np.nanmin(score)
    t = tzlist[mask][0] if any(mask) else np.nan
    return score


def _baseline_timezone(tz, nday, dt):
    """
    Find baseline (i.e. long-term) timezone offsets

    Parameters
    ----------
    tz : ndarray
        1D array of length N days with timezone offset [minutes]
    nday : int, default 3
        Window to find best match timezone offset, [days]
    dt : int, default 60
        Stride to find best match timezone offset, [minutes]

    Returns
    -------
    ndarray
        1D array of length N days with baseline offset [minutes]

    """
    bz = np.zeros_like(tz) * np.nan
    tzlist = np.unique(tz[np.isfinite(tz)])
    for t in tzlist:
        idx = find_intervals(tz == t, tol = 4 * nday)
        for i0, i1 in idx:
            if i1 - i0 >= 4 * nday:
                bz[i0:i1] = t
    mask = np.isfinite(bz)
    if any(mask):
        idx = np.arange(len(bz))[mask]
        bz[:idx.min()] = bz[idx.min()]
        bz[idx.max():] = bz[idx.max()]
        idx = find_intervals(np.isnan(bz))
        if any(np.isfinite(bz)):
            for i0, i1 in idx:
                tx, _ = unique_sorted(tz[i0:i1])
                bx = np.zeros((2)) * np.nan
                bx[0] = bz[i0 - 1] if i0 > 0 else bz[i1]
                bx[1] = bz[i1] if i1 < len(bz) - 1 else bz[i0 - 1]
                dx = np.abs(bx - tx[0])
                if any(np.isfinite(dx)):
                    bz[i0:i1] = bx[np.nanargmin(dx)]
                else:
                    bz[i0:i1] = bx[np.isfinite(bx)][0]
    return bz


def _filter_timezone(tz, nday, dt):
    """
    Filter out too short or ambigouos timezone offsets

    Parameters
    ----------
    tz : ndarray
        1D array of length N days with timezone offset [minutes]
    nday : int, default 3
        Window to find best match timezone offset, [days]
    dt : int, default 60
        Stride to find best match timezone offset, [minutes]

    Returns
    -------
    ndarray
        1D array of length N days with timezone offset [minutes]

    """
    tz = tz.astype(float)
    if any(np.isfinite(tz)):
        bz = _baseline_timezone(tz, nday, dt)
        dz = tz - bz
        idx = find_intervals(dz != 0, tol=1)
        for i0, i1 in idx:
            n = max(np.sum(dz[i0:i1] >= 0), np.sum(dz[i0:i1] <= 0))
            if i1 - i0 <= nday or i1 - i0 > n:
                tz[i0:i1] = bz[i0:i1]
            else:
                tz[i0:i1] = unique_sorted(tz[i0:i1])[0][0]
    return tz


def _localize_activity(x, idx, tz):
    """
    Roll activity (steps or bpm) to local time zone

    Parameters
    ----------
    x : ndarray
        2D array of size N days x 1440 minutes
    idx : ndarray
        2D array of size N intervals x 2 indices (start, end)
    tz : ndarray
        1D array of length N intervals with timezone offset, [minutes]

    Returns
    -------
    ndarray
        2D array of size N days x 1440 minutes

    """
    x_ = np.zeros((x.size + 2 * 1440))
    for k, i in enumerate(idx):
        dt = tz[k]
        j = 1440 + i + dt
        x_[j[0]:j[1]] = x[i[0]:i[1]]
    x_ = x_[1440:-1440].reshape(-1,1440)
    return x_


[docs] def find_timezone_mismatch(data, nday=3, dt=60): """ For Fitbit only: Identify timezone by mismatch of sleep, steps, and bpm - Assume "sleep" is in local time - Automatically detect day-wise "steps" & "bpm" offsets to match "sleep" Parameters ---------- data : dict Dictionary of data, should have keys "sleep", "steps", "bpm", each containing array of size N days x 1440 minutes nday : int, default 3 Window to find best match timezone offset, [days] dt : int, default 60 Stride to find best match timezone offset, [minutes] Returns ------- ndarray 1D array of length N days with timezone offset [minutes] """ pad = np.zeros((nday,1440)) x = np.vstack([pad, data["sleep"], pad]) y = np.vstack([pad, data["steps"], pad]) + np.vstack([pad, data["bpm"], pad]) x = (x > 0).astype(float) y[y == 0] = np.nan score = [] for i in range(len(data["steps"])): score.append(_calc_timezone(x, y, i, nday, dt)) score = np.stack(score) tzlist = _get_tzlist(dt) tz = np.zeros(len(score),) * np.nan for k, s in enumerate(score): if any(np.isfinite(s)): tz[k] = tzlist[np.nanargmin(s)] tz[0] = tz[1] tz[-1] = tz[-2] tz = _filter_timezone(tz, nday, dt) return tz
[docs] def fix_timezone_mismatch(data, tz=None): """ Fix timezone offset for data imported from Fitbit - Assume "sleep" is in local time - Automatically detect day-wise "steps" and "bpm" offsets to match "sleep" Parameters ---------- data : dict Dictionary of data, should have keys "sleep", "steps", "bpm" Each key should point to an array of size N days x 1440 minutes tz : ndarray or None, default None 1D array of length N days with timezone offset [minutes] If None, timezone will be detected automatically Returns ------- dict Dictionary of data with "steps" and "bpm" rolled to match "sleep" """ tz = tz if tz is not None else find_timezone_mismatch(data) tz = np.repeat(tz, 1440) mask = np.diff(tz, prepend=tz[0]+1, append=tz[-1]+1) != 0 idxs = find_intervals(data["sleep"].flatten()) for k, (i0,i1) in enumerate(idxs): dt = unique_sorted(tz[i0:i1])[0] dt = int(dt[0]) if len(dt) > 0 else 0 j0 = max(0, i0 - dt * (dt < 0)) j1 = min(len(tz), i1 - dt * (dt > 0)) dts = np.unique(tz[j0:j1]) tz[j0:j1] = dt mask = np.diff(tz, prepend=tz[0]+1, append=tz[-1]+1) != 0 idx = np.arange(len(tz) + 1)[mask] idx = np.stack([idx[:-1], idx[1:]]).T tz = np.array([np.unique(tz[i0:i1])[0] for i0, i1 in idx]).astype(int) data["steps"] = _localize_activity(data["steps"].flatten(), idx, tz) data["bpm"] = _localize_activity(data["bpm"].flatten(), idx, tz) return data