#!/usr/bin/env python
# -*- coding: utf8 -*-
import pathlib
import numpy as np
import pandas as pd
import xml.etree.ElementTree as ET
import lxml.etree
import lxml
import tqdm
import json
import glob
from mhealthdata.utils import *
from mhealthdata.df2numpy import _get_time, _get_idate_imin
from mhealthdata.df2numpy import to_1darray, to_2darray, combine_arrays
from mhealthdata.timezone import find_timezone_mismatch, fix_timezone_mismatch
import warnings
warnings.filterwarnings('ignore')
class DataLoader():
    """
    This is the class from which all loaders inherit.

    DataLoader subclasses make numpy arrays from basic health sensor data,
    all at local time zone loaded from different mobile health apps.

    Basic health data are:
    - per minute values of "steps", "sleep", and "bpm"
    - per day values of "weight", "rhr", and "hrv"
    - per user values of "dob", "sex", and "height"

    Data can be accessed:
    - as pandas DataFrame in self.df dict attribute
    - as numpy arrays using get_device_data() or save_device_npz() methods

    Methods are device-centric and can be used to get data for any specific device
    connected to your health app aggregator, e.g. iPhone, or Apple Watch, or Fitbit wristband

    Notes
    -----
    Other data can be found in data export (e.g. VO2Max), but not processed by DataLoader subclasses.
    In case those data needed, the self.categories dict attribute should be modified.

    Parameters
    ----------
    path : str
        Path to unzipped local folder containing health app data

    Attributes
    ----------
    df : dict
        Dictionary of pandas DataFrames of loaded health data for "steps", "bpm", etc.
    categories : dict
        Dictionary of health data categories. Keys used to find files. Attributes:
        "name" - to rename, "column" - to seek value column in corresponding DataFrame.
    userdata : dict
        Dictionary of "Date-of-birth", "Biological sex", and "Height".
        Other data like country of residence, or phone number, etc. are ignored.
    start_keys : list
        List of keywords to seek for start timestamp column in a DataFrame.
    end_keys : list
        List of keywords to seek for end timestamp column in a DataFrame.
    tz_keys : list
        List of keywords to seek for timezone in a DataFrame (NOT to be applied timestamps).
    tz_offset : list
        List of keywords to seek for timezone in a DataFrame (to be applied timestamps).
    dev_col : list
        List of keywords to seek for device id column in a DataFrame.
    path : str
        Path to unzipped local folder containing health app data.
    """

    def __init__(self, path):
        self.df = {}
        self.categories = {}
        self.userdata = {}
        self.start_keys = ["start_time", "startTime", "startDate",
                           "dateTime", "day_time"]
        self.end_keys = ["end_time", "endTime", "endDate"]
        self.tz_keys = ["time_offset", "HKTimeZone"]
        self.tz_offset = ["time_offset"]
        self.dev_col = []
        self.path = path

    @property
    def devices_dict(self):
        """
        Get dictionary of loaded devices identifiers.

        Returns
        -------
        dict
            Dictionary of loaded devices identifiers.
        """
        return {"all": ["all"]}

    @property
    def devices(self):
        """
        Get list of loaded devices.

        Returns
        -------
        list
            List of loaded devices.
        """
        return list(self.devices_dict.keys())

    @property
    def dataframes(self):
        """
        Get list of loaded DataFrames.

        Returns
        -------
        list
            List of keys of the loaded DataFrames (keys of self.df).
        """
        return list(self.df.keys())

    @property
    def all_categories(self):
        """
        Get list of all data categories found (not all loaded).

        Returns
        -------
        list
            List of all data categories found in provided path.
            The base class has no data source and returns an empty list.
        """
        categories = []
        return categories

    def _parse_userdata(self):
        """
        Private method to retrieve "Date-of-birth", "Biological sex", and "Height".
        Other data like country of residence, or phone number, etc. are ignored.

        Returns
        -------
        list
            List of loaded userdata keys.
        """
        return list(self.userdata.keys())

    @staticmethod
    def _special_cases(df, category):
        """
        Private method to process special cases during loading of data.

        Parameters
        ----------
        df : DataFrame
            DataFrames of loaded health data for "steps", "bpm", etc.
        category : str
            Key used to find health data files.

        Returns
        -------
        DataFrame
            DataFrame with applied health app-specific fixes
            (unchanged in the base class).
        """
        return df

    @staticmethod
    def _get_duration(df):
        """
        Get record durations from loaded dataframes.

        Notes
        -----
        "binning_period" is treated as [minutes]
        "stage_duration" and "seconds" are treated as [seconds]

        Parameters
        ----------
        df : DataFrame
            DataFrames of loaded health data for "steps", "bpm", etc.

        Returns
        -------
        ndarray or None
            1D array record durations [seconds], or None when the DataFrame
            has none of the known duration columns.
        """
        dt = None
        if "binning_period" in df.columns:
            dt = 60 * df["binning_period"].values.astype(int)
        elif "seconds" in df.columns:
            # One second is added before rounding (kept from the original code).
            dt = (df["seconds"].values + 1)
            dt = np.round(dt).astype(int)
        elif "stage_duration" in df.columns:
            dt = (df["stage_duration"].values + 1)
            dt = np.round(dt).astype(int)
        return dt

    @staticmethod
    def _get_device_slice(df, uuids, dev_col):
        """
        Private method to get DataFrame slice for specified device.

        Parameters
        ----------
        df : DataFrame
            DataFrames of loaded health data for "steps", "bpm", etc.
        uuids : list
            List of device identifiers.
        dev_col : list
            List of keywords to seek for device id column in a DataFrame.

        Returns
        -------
        DataFrame
            DataFrame slice matching mask of specified device identifiers.
            Empty when no device id column is found.
        """
        mask = np.zeros((df.shape[0])).astype(bool)
        deviceuuid = find_columns_by_key(df, dev_col)
        if len(deviceuuid) > 0:
            deviceuuid = df[deviceuuid[0]].values.astype(str)
            for uuid in uuids:
                mask[deviceuuid == uuid] = True
        return df[mask]

    def _get_timezone(self, tstart, idate=None):
        """
        Search dataframes for timezone for provided ordinal dates.

        Parameters
        ----------
        tstart : list
            List of columns to seek for start date/time
        idate : ndarray or None, default None
            1D array of continuous range of ordinal days.
            If None, it is taken from the first DataFrame having a timezone column.

        Returns
        -------
        ndarray
            1D array of timezone offset from GMT [minutes], NaN for days
            without timezone records. Empty when `idate` is None and no
            DataFrame provides a timezone column.
        """
        tz_dict = {}
        for category in self.categories:
            # Categories without exported files are never loaded into self.df.
            if category not in self.df:
                continue
            df = self.df[category]
            tz_col = find_columns_by_key(df, self.tz_keys)
            if len(tz_col) == 0:
                continue
            t0 = _get_time(df, tstart)
            iday, imin = _get_idate_imin(t0)
            if idate is None:
                idate = to_range(iday)
            tz = df[tz_col[0]].values.astype(str)
            tz = timezone_txt_to_minutes(tz)
            for i in idate:
                tz_dict.setdefault(i, []).extend(tz[iday == i])
        if idate is None:
            # No date range could be established: nothing to report.
            return np.zeros(0, dtype=np.float16)
        tz = np.full(len(idate), np.nan)
        for k, i in enumerate(idate):
            tz_list = tz_dict.get(i, [])
            if len(tz_list) > 0:
                t = unique_sorted(tz_list)[0]
                tz[k] = t[0] if len(t) > 0 else np.nan
        return tz.astype(np.float16)

    def get_device_data(self, device="all", idate=None, uint8=True):
        """
        Get dictionary of per day and per minute ndarrays.
        The method is device-centric and can output data for specified device.

        Parameters
        ----------
        device : str, default "all"
            Device name (should match any one of self.devices).
        idate : ndarray or None, default None
            1D array of continuous range of ordinal days.
            If None, it is determined by the first converted category.
        uint8 : bool, default True
            Flag to cast all health data to np.uint8 to save disk space.

        Returns
        -------
        dict
            Dictionary of output ndarrays, including "idate" and "tz".

        Raises
        ------
        KeyError
            If `device` is not one of self.devices.
        """
        data = {}
        if device not in self.devices:
            raise KeyError(f"Wrong device '{device}'. Use 'devices' property to get valid devices.")
        for category in self.categories:
            # Skip categories that were configured but not found in the export.
            if category not in self.df:
                continue
            df = self.df[category]
            # NOTE(review): this compares the category key (not its "name")
            # with "weight" - confirm this is intended for all loaders.
            if category not in ["weight"] and device not in ["all"]:
                uuids = self.devices_dict[device]
                df = self._get_device_slice(df, uuids, self.dev_col)
            if df.shape[0] > 0:
                dt = self._get_duration(df)
                name = self.categories[category]["name"]
                column = self.categories[category]["column"]
                # Several categories may feed the same output array.
                x = data[name] if name in data else None
                if name in ["weight", "rhr"]:
                    x, idate = to_1darray(df, column, self.start_keys, self.end_keys,
                                          self.tz_offset, idate, x, uint8)
                else:
                    mode = "count" if name == "steps" else "rate"
                    x, idate = to_2darray(df, column, self.start_keys, self.end_keys,
                                          self.tz_offset, dt, idate, x, uint8, mode)
                data[name] = x
        data["idate"] = idate
        data["tz"] = self._get_timezone(self.start_keys, idate)
        return data

    def save_device_npz(self, output_file, device="all", idate=None, uint8=True):
        """
        Save dictionary of per day and per minute ndarrays to npz.
        The method is device-centric and can output data for specified device.

        Parameters
        ----------
        output_file : str
            Path to output .npz file.
        device : str, default "all"
            Device name (should match any one of self.devices).
        idate : ndarray or None, default None
            1D array of continuous range of ordinal days.
            If None, it is determined by the first converted category.
        uint8 : bool, default True
            Flag to cast all health data to np.uint8 to save disk space.

        Returns
        -------
        bool
            True if data ndarrays length not zero, else False.
            Nothing is written when no date range could be established.
        """
        data = self.get_device_data(device, idate, uint8)
        if data.get("idate") is None:
            return False
        np.savez_compressed(output_file, **data)
        return bool(len(data["idate"]))
class FitbitLoader(DataLoader):
    """
    Loader of Fitbit data export.

    Notes
    -----
    One may note that Fitbit exported
    - ``sleep`` in local time
    - ``steps`` and ``bpm`` timestamps in UTC

    FitbitLoader
    - Attempts to infer time zone offset from data mismatch
    - Converts all timestamps to local time, see ``self._fix_timezone()``

    Example
    -------
    Assume we have data export ``MyFitbitData.zip`` downloaded to folder \
    ``/Users/username/Downloads/wearable_data/`` and unzipped into a subfolder \
    ``/Users/username/Downloads/wearable_data/User/``.

    >>> import mhealthdata
    >>> path = '/Users/username/Downloads/wearable_data/User/'
    >>> wdata = mhealthdata.FitbitLoader(path)
    """

    def __init__(self, path):
        super().__init__(path)
        self.categories = {
            "steps": {
                "name": "steps", "column": "value"},
            "sleep": {
                "name": "sleep", "column": "level"},
            "heart_rate": {
                "name": "bpm", "column": "value.bpm"},
            "resting_heart_rate": {
                "name": "rhr", "column": "value.value"},
            "weight": {
                "name": "weight", "column": "weight"},
        }
        # Data are loaded eagerly on construction.
        self.load_data()

    @property
    def all_categories(self):
        """
        Get list of all data categories found (not all loaded).

        Returns
        -------
        list
            Unique file-name prefixes (text before the first "-") of the
            .csv and .json files found one folder below self.path.
        """
        fnames = glob.glob(self.path + "/*/*.csv")
        fnames += glob.glob(self.path + "/*/*.json")
        categories = []
        for fname in fnames:
            category = pathlib.Path(fname).stem.split("-")[0]
            if category not in categories:
                categories.append(category)
        return categories

    @staticmethod
    def _special_cases(df, category):
        """
        Apply Fitbit-specific fixes to a loaded DataFrame.

        For "weight" a "dateTime" column is built from "date" and "time",
        and the weight is converted from pounds to kilograms.
        """
        if category == "weight":
            df["dateTime"] = df["date"] + " " + df["time"]
            df["weight"] = 0.4536 * df["weight"]  # pounds to kg
        return df

    def _parse_userdata(self):
        """
        Read "Date of birth", "Biological sex" and "Height" from Profile.csv.

        Returns
        -------
        list or None
            List of loaded userdata keys, or None if Profile.csv is not found.
        """
        fnames = glob.glob(self.path + "/*/Profile.csv")
        if not fnames:
            return None
        df = pd.read_csv(fnames[0])
        self.df["Profile"] = df
        self.userdata["Date of birth"] = df["date_of_birth"].values.astype(str)[0]
        self.userdata["Biological sex"] = df["gender"].values.astype(str)[0]
        self.userdata["Height"] = df["height"].values.astype(str)[0]
        return list(self.userdata.keys())

    def get_device_data(self, device="all", idate=None, trunc=True):
        """
        Get dictionary of per day and per minute ndarrays.

        Same as ``DataLoader.get_device_data()`` followed by a time zone
        mismatch correction when sleep data are available.

        Parameters
        ----------
        device : str, default "all"
            Device name (should match any one of self.devices).
        idate : ndarray or None, default None
            1D array of continuous range of ordinal days.
        trunc : bool, default True
            Passed to the parent method as its ``uint8`` flag.

        Returns
        -------
        dict
            Dictionary of output ndarrays.
        """
        data = super().get_device_data(device, idate, trunc)
        if "sleep" in data and len(data["sleep"]) > 0:
            data["tz"] = find_timezone_mismatch(data)
            data = fix_timezone_mismatch(data, tz=data["tz"])
        return data

    def load_sleep(self):
        """
        Load sleep data from .json files.
        Path to seek files is taken from self.path attribute.

        Returns
        -------
        DataFrame or None
            DataFrame of raw sleep level records loaded from .json,
            or None if no files or no records are found.
        """
        fnamelist = glob.glob(self.path + "/*/" + "sleep" + "-*")
        if len(fnamelist) == 0:
            return None
        frames = []
        for fname in tqdm.tqdm(fnamelist):
            # Context manager closes every file instead of leaking handles.
            with open(fname, encoding="utf-8") as fobj:
                sleep_logs = json.load(fobj)
            for json_data in sleep_logs:
                frames.append(pd.json_normalize(json_data["levels"]["data"]))
                if "shortData" in json_data["levels"]:
                    frames.append(pd.json_normalize(json_data["levels"]["shortData"]))
        if len(frames) == 0:
            # pd.concat raises ValueError on an empty list.
            return None
        return pd.concat(frames, ignore_index=True)

    def load_nonsleep(self, category):
        """
        Load non-sleep data from .json files.
        Path to seek files is taken from self.path attribute.

        Parameters
        ----------
        category : str
            Key used to find health data files.

        Returns
        -------
        DataFrame or None
            DataFrame of raw data loaded from .json, or None if no files found.
        """
        fnamelist = glob.glob(self.path + "/*/" + category + "-*")
        if len(fnamelist) == 0:
            return None
        frames = []
        for fname in tqdm.tqdm(fnamelist):
            with open(fname, encoding="utf-8") as fobj:
                frames.append(pd.json_normalize(json.load(fobj)))
        return pd.concat(frames, ignore_index=True) if len(frames) > 0 else pd.DataFrame()

    def load_data(self):
        """
        Load data from .csv and .json files.
        Cycling over category from self.categories attribute.
        Path to seek files is taken from self.path attribute.

        Returns
        -------
        list
            List of loaded DataFrames.

        Raises
        ------
        FileNotFoundError
            If no files are found for a step-related category.
        """
        self.df = {}
        for category in list(self.categories.keys()):
            df = self.load_sleep() if category == "sleep" else self.load_nonsleep(category)
            if df is not None:
                df = self._special_cases(df, category)
                df = columns_to_datetime(df, self.start_keys, self.end_keys, self.tz_offset)
                self.df[category] = df
            elif "step" in category or "pedometer" in category:
                # Steps are mandatory; other categories are optional.
                raise FileNotFoundError(f"Wrong 'path'. Cannot find files for '{category}'.")
        self._parse_userdata()
        return self.dataframes
class ShealthLoader(DataLoader):
    """
    Loader of Samsung Health data export.

    Notes
    -----
    Samsung Health exports:
    - ``.json`` (``step`` binning data) timestamps in local time
    - ``.csv`` (``sleep``, ``bpm``, ``weight``) in UTC with \
    additional time zone column ``time_offset``

    ShealthLoader
    - Converts all timestamps to local time, see ``utils.columns_to_datetime()``

    Example
    -------
    Assume we have data export downloaded to folder \
    ``/Users/username/Downloads/wearable_data/Samsung Health/`` \
    which contains a subfolder ``samsunghealth_<username>_<date-time>``.

    >>> import mhealthdata
    >>> path = '/Users/username/Downloads/wearable_data/Samsung Health/'
    >>> wdata = mhealthdata.ShealthLoader(path)
    """

    def __init__(self, path):
        super().__init__(path)
        self.start_keys = ["start_time"]
        self.dev_col = ["deviceuuid"]
        self.categories = {
            "pedometer_day_summary": {
                "name": "steps", "column": "mStepCount"},
            "step_daily_trend": {
                "name": "steps", "column": "count"},
            "heart_rate": {
                "name": "bpm", "column": "com.samsung.health.heart_rate.heart_rate"},
            "sleep": {
                "name": "sleep", "column": "stage"},
            "sleep_stage": {
                "name": "sleep", "column": "stage"},
            "weight": {
                "name": "weight", "column": "weight"},
        }
        # Data are loaded eagerly on construction.
        self.load_data()

    @property
    def devices_dict(self):
        """
        Get dictionary of loaded devices identifiers.

        Returns
        -------
        dict
            Maps "all", "mobile", "wearable" and cleaned device names to
            arrays of device uuids taken from the "device_profile" table.
            Falls back to ``{"all": ["all"]}`` when that table is not loaded,
            so that the default device stays valid.
        """
        def clean_username(s):
            # Drop words with an apostrophe or a parenthesis, e.g. "John's" or "(1234)".
            s = " ".join(w for w in s.split() if "'" not in w)
            s = " ".join(w for w in s.split() if "(" not in w)
            return s

        dev = {"all": ["all"]}
        if "device_profile" in self.df:
            col = ["deviceuuid", "device_group", "name", "model", "fixed_name"]
            df = self.df["device_profile"][col].astype(str)
            dev_group = df["device_group"].values.astype(int).clip(360001)
            dev_uuid = df["deviceuuid"].values.astype(str)
            dev_name = df["fixed_name"].values.astype(str)
            # Missing "fixed_name" is replaced by "model"; group 360003 uses "name".
            dev_name = np.where(dev_name == "nan", df["model"], dev_name)
            dev_name = np.where(dev_group == 360003, df["name"], dev_name)
            dev_name = [clean_username(name) for name in dev_name]
            dev_name = np.array(dev_name, dtype=str)
            dev["all"] = dev_uuid
            if 360001 in dev_group:
                dev["mobile"] = dev_uuid[dev_group == 360001]
            if 360003 in dev_group:
                dev["wearable"] = dev_uuid[dev_group == 360003]
            for name in np.unique(dev_name):
                dev[name] = dev_uuid[dev_name == name]
        return dev

    @property
    def all_categories(self):
        """
        Get list of all data categories found (not all loaded).

        Returns
        -------
        list
            Unique category names of the .csv files found one folder below
            self.path. The category is the dot-separated file name part
            located before the timestamp, matching the pattern used by
            ``load_csv()``.
        """
        fnames = glob.glob(self.path + "/*/*.csv")
        categories = []
        for fname in fnames:
            # Split the file name only, so that dots in folders do not interfere.
            parts = pathlib.Path(fname).name.split(".")
            if len(parts) > 3 and parts[-3] not in categories:
                categories.append(parts[-3])
        return categories

    @staticmethod
    def _special_cases(df, category):
        """
        Apply Samsung Health-specific fixes to a loaded DataFrame.

        For sleep categories the numeric "stage" codes are replaced by text
        labels; if there is no "stage" column, it is filled with "no_stage".
        """
        if "sleep" in category:
            if "stage" in df.columns:
                s = {40001: "awake", 40002: "light", 40003: "deep", 40004: "rem"}
                # np.vectorize raises ValueError on size 0 input.
                if df.shape[0] > 0:
                    df["stage"] = np.vectorize(s.get)(df["stage"].values)
            else:
                df["stage"] = np.array(["no_stage"] * df.shape[0])
        return df

    def _parse_userdata(self):
        """
        Read "Date of birth", "Biological sex" and "Height" from "user_profile".

        Returns
        -------
        list
            List of loaded userdata keys.
        """
        if "user_profile" in self.df:
            d = self.df["user_profile"]
            # Plain assignment instead of chained in-place fillna, which
            # does not update the DataFrame under pandas copy-on-write.
            d["text_value"] = d["text_value"].fillna(d["float_value"])
            if "key" in d.columns:
                d.set_index("key", inplace=True)
            d = d[["text_value"]].T.to_dict("list")
            keys = {"Date of birth": "birth_date",
                    "Biological sex": "gender",
                    "Height": "height"}
            for label, key in keys.items():
                if key in d:
                    self.userdata[label] = d[key][0]
        return list(self.userdata.keys())

    def _binning_dict(self, category, idx):
        """
        Private method to get device-id and date dictionaries for daily .json.

        Parameters
        ----------
        category : str
            Key used to find health data files.
        idx : str
            Column name containing binning data file names.

        Returns
        -------
        dev : dict
            Dictionary of device id for binning data file names.
        dat : dict
            Dictionary of dates for binning data file names.
        """
        df = self.load_csv(category)
        df.set_index(idx, inplace=True)
        dat = df["day_time"].to_dict()
        dev = df["deviceuuid"].to_dict()
        return dev, dat

    def load_csv(self, category):
        """
        Load data from .csv file.
        Path to seek files is taken from self.path attribute.

        Parameters
        ----------
        category : str
            Key used to find health data files.

        Returns
        -------
        DataFrame or None
            DataFrame of raw data loaded from .csv (first file matching
            the category), or None if no file is found.
        """
        fnames = glob.glob(self.path + "/*/*." + category + ".*.csv")
        if not fnames:
            return None
        # The first row of the file is skipped; the header is on the second row.
        return pd.read_csv(fnames[0], skiprows=1, index_col=False)

    def load_jsons(self, category, idx="binning_data"):
        """
        Load data from .json files.
        Path to seek files is taken from self.path attribute.

        Parameters
        ----------
        category : str
            Key used to find health data files.
        idx : str, default "binning_data"
            Column name containing binning data file names.

        Returns
        -------
        DataFrame or None
            DataFrame of raw data loaded from .json files with added
            "deviceuuid", "start_time" and "binning_period" columns,
            or None if no files or no records are found.
        """
        dev, dat = self._binning_dict(category, idx)
        fnamelist = glob.glob(self.path + '/*/jsons/*' + category + '*/*/*.' + idx + '.json')
        if len(fnamelist) == 0:
            return None
        frames = []
        for fname in tqdm.tqdm(fnamelist):
            # Context manager closes every file instead of leaking handles.
            with open(fname, encoding="utf-8") as fobj:
                df_ = pd.json_normalize(json.load(fobj))
            nrec, ncol = df_.shape
            if ncol > 0:
                # Separator-independent base name (glob returns "\\" on Windows).
                key = pathlib.Path(fname).name
                df_["deviceuuid"] = np.array([dev[key]] * nrec)
                # Consecutive records are 600000 apart, i.e. 10-minute bins
                # assuming "day_time" is in milliseconds - TODO confirm.
                df_["start_time"] = dat[key] + 600000 * df_.index
                df_["binning_period"] = 10 * np.ones((nrec)).astype(int)
                frames.append(df_)
        if len(frames) == 0:
            # pd.concat raises ValueError on an empty list.
            return None
        return pd.concat(frames, ignore_index=True)

    def load_data(self):
        """
        Load data from .csv and .json files.
        Cycling over category from self.categories attribute.
        Path to seek files is taken from self.path attribute.

        Returns
        -------
        list
            List of loaded DataFrames.

        Raises
        ------
        FileNotFoundError
            If no files are found for a step-related category.
        """
        self.df = {}
        for category in list(self.categories.keys()) + ["device_profile", "user_profile"]:
            df = self.load_csv(category)
            # load_csv returns None for categories missing in the export.
            if df is not None and "binning_data" in df.columns:
                df = self.load_jsons(category)
            if df is not None:
                df = self._special_cases(df, category)
                df = columns_to_datetime(df, self.start_keys, self.end_keys, self.tz_offset)
                self.df[category] = df
            elif "step" in category or "pedometer" in category:
                raise FileNotFoundError(f"Wrong 'path'. Cannot find files for '{category}'.")
        self._parse_userdata()
        return self.dataframes
class HealthkitLoader(DataLoader):
    """
    Loader of Apple Health (HealthKit) data export.

    Example
    -------
    Assume path contains unzipped data ``export.xml`` or ``exportación.xml``.

    >>> import mhealthdata
    >>> path = '/Users/username/Downloads/wearable_data/apple_health_export/'
    >>> wdata = mhealthdata.HealthkitLoader(path)
    """

    def __init__(self, path):
        super().__init__(path)
        self.dev_col = ["sourceName"]
        self.categories = {
            "HKQuantityTypeIdentifierStepCount": {
                "name": "steps", "column": "value"},
            "HKCategoryTypeIdentifierSleepAnalysis": {
                "name": "sleep", "column": "value"},
            "HKQuantityTypeIdentifierHeartRate": {
                "name": "bpm", "column": "value"},
            "HKQuantityTypeIdentifierRestingHeartRate": {
                "name": "rhr", "column": "value"},
            "HKQuantityTypeIdentifierHeartRateVariabilitySDNN": {
                "name": "hrv", "column": "value"},
            "HKQuantityTypeIdentifierBodyMass": {
                "name": "weight", "column": "value"},
        }
        # Data are loaded eagerly on construction.
        self.load_data()

    @property
    def devices_dict(self):
        """
        Get dictionary of loaded devices identifiers.

        Returns
        -------
        dict
            Maps "all" and every "sourceName" found in the loaded
            categories to a one-element list of identifiers.
        """
        dev = {"all": ["all"]}
        for category in self.categories:
            if category in self.df:
                df = self.df[category]
                if "sourceName" in df.columns:
                    for d in np.unique(df["sourceName"].values.astype(str)):
                        dev[d] = [d]
        return dev

    @property
    def all_categories(self):
        """
        Get list of all data categories found (not all loaded).

        Returns
        -------
        list
            Sorted unique record types found in "Record" and "Workout"
            tables; empty if none are available.
        """
        found = []
        for tag in ["Record", "Workout"]:
            if tag not in self.df:
                continue
            df = self.df[tag]
            col = find_columns_by_key(df, ["type"])
            if len(col) > 0:
                found.append(df[col[0]].values.astype(str))
        if len(found) == 0:
            # np.concatenate raises ValueError on an empty list.
            return []
        return np.unique(np.concatenate(found)).tolist()

    @staticmethod
    def _special_cases(df, category):
        """
        Apply Apple Health-specific fixes to a loaded DataFrame.

        Source names containing "iphone" or "apple ... watch" are unified
        to "iPhone" and "Apple Watch"; sleep analysis values are replaced
        by short stage labels.
        """
        def clean_username(s):
            if "iphone" in s.lower():
                s = "iPhone"
            elif "apple" in s.lower() and "watch" in s.lower():
                s = "Apple Watch"
            return s

        if "sourceName" in df.columns:
            df["sourceName"] = df["sourceName"].apply(clean_username)
        if category == "HKCategoryTypeIdentifierSleepAnalysis":
            s = {"HKCategoryValueSleepAnalysisAwake": "awake",
                 "HKCategoryValueSleepAnalysisInBed": "no_stage",
                 "HKCategoryValueSleepAnalysisAsleep": "asleep"}
            # np.vectorize raises ValueError on size 0 input, which happens
            # for every export without sleep records.
            if df.shape[0] > 0 and "value" in df.columns:
                df["value"] = np.vectorize(s.get)(df["value"].values)
        return df

    def _parse_userdata(self, data):
        """
        Read "Date of birth", "Biological sex" and "Height" from parsed tables.

        Parameters
        ----------
        data : dict
            Dictionary of DataFrames returned by ``_parse_xml()``.

        Returns
        -------
        list
            List of loaded userdata keys. Entries missing in the export
            are skipped.
        """
        if "Me" in data:
            me = data["Me"]
            keys = {"Date of birth": "HKCharacteristicTypeIdentifierDateOfBirth",
                    "Biological sex": "HKCharacteristicTypeIdentifierBiologicalSex"}
            for label, key in keys.items():
                # The table is empty (no columns) when the tag is absent.
                if key in me.columns and me.shape[0] > 0:
                    self.userdata[label] = me[key].values.astype(str)[0]
        if "Record" in data:
            rec = data["Record"]
            if {"type", "value", "unit"}.issubset(rec.columns):
                height = rec[rec["type"] == "HKQuantityTypeIdentifierHeight"]
                if height.shape[0] > 0:
                    height = height["value"] + " " + height["unit"]
                    self.userdata["Height"] = unique_sorted(height.values.astype(str))[0][0]
        return list(self.userdata.keys())

    def _parse_xml(self, root):
        """
        Private method to parse records from loaded .xml

        Parameters
        ----------
        root
            Root element attribute for .xml tree

        Returns
        -------
        dict
            Dictionary of DataFrames for "Record", "Workout" and "Me" tags.
        """
        tags = ["Record", "Workout", "Me"]
        records = {tag: [] for tag in tags}
        # Single pass over the tree collecting all tags of interest.
        for child in tqdm.tqdm(root):
            if child.tag not in records:
                continue
            for node in child:
                # Time zone metadata is promoted to an attribute of the record.
                if node.tag == "MetadataEntry" and node.attrib.get("key") == "HKTimeZone":
                    child.attrib["HKTimeZone"] = node.attrib["value"]
            records[child.tag].append(dict(child.attrib))
        data = {}
        for tag in tags:
            df = pd.DataFrame(records[tag])
            df = columns_to_datetime(df, self.start_keys, self.end_keys, self.tz_offset)
            data[tag] = df
        return data

    def load_data(self):
        """
        Load data from .xml file.
        Cycling over category from self.categories attribute.
        Path to seek files is taken from self.path attribute.

        Returns
        -------
        list
            List of loaded DataFrames.

        Raises
        ------
        FileNotFoundError
            If neither ``export.xml`` nor ``exportaci*n.xml`` is found
            (case-insensitive) in self.path.
        """
        try:
            fname = (glob.glob(self.path + "/[eE][xX][pP][oO][rR][tT].[xX][mM][lL]") +
                     glob.glob(self.path + "/[eE][xX][pP][oO][rR][tT][aA][cC][iI]*[nN].[xX][mM][lL]"))[0]
        except IndexError as e:
            raise FileNotFoundError("Wrong 'path'. Cannot find file 'export.xml'.") from e
        # recover=True lets lxml continue past malformed parts of the export.
        parser = lxml.etree.XMLParser(recover=True)
        tree = ET.parse(fname, parser=parser)
        root = tree.getroot()
        data = self._parse_xml(root)
        for tag in data:
            self.df[tag] = data[tag]
        self._parse_userdata(data)
        records = data["Record"]
        for category in self.categories:
            if "type" in records.columns:
                df = records[records["type"] == category].copy()
            else:
                # Export without any records: keep an empty table per category.
                df = pd.DataFrame()
            df = self._special_cases(df, category)
            self.df[category] = df
        return self.dataframes
import types

# Public API: every module-level name that is neither private
# (leading underscore) nor an imported module.
__all__ = [
    name
    for name, thing in globals().items()
    if not name.startswith('_') and not isinstance(thing, types.ModuleType)
]
# Keep the helper module out of the namespace.
del types