Source code for shyft.dashboard.time_series.data_utility

from typing import List, Tuple, Iterable, Union, Optional
import numpy as np

from shyft.time_series import TsVector, TimeSeries, Calendar, time_series_to_bokeh_plot_data
from shyft.dashboard.time_series.state import Quantity


class DataUtilError(RuntimeError):
    """Error raised by the data utility helpers, e.g. by ``find_nearest`` on an empty array."""
def calculate_dead_band_indices(ts_input: np.ndarray, eps: Optional[float] = 0.0005):
    """
    Find the indices needed to compress a 1-d array with a dead band filter.

    Consecutive samples whose step to the previous sample stays within ``eps``
    are grouped into one run. Runs of two or more samples are reported by their
    first and last index, runs of exactly one sample are reported as single
    (scatter) indices.

    e.g.
    values:   1 1 1 2 3 3 3 3 4 3 3  3  3  3
    indices:  0 1 2 3 4 5 6 7 8 9 10 11 12 13

    would return:
        - np.array([[0, 2], [4, 7], [9, 13]])
        - [3, 8]

    Parameters
    ----------
    ts_input: 1-d array (or array-like) which should be analysed for compression
    eps: dead band, compared against the absolute step between neighbouring
        samples; for integer data use a low value. ``None`` is treated as 0.0.

    Returns
    -------
    - a numpy array of shape (k, 2) with the inclusive [first, last] index of every
      run that can be compressed to one value under the given eps,
      e.g. [[2, 4]] meaning all values from index 2 to 4 can be compressed
    - a sorted list of single value indices, e.g. [5, 6, 7] meaning index 5, 6, 7
      cannot be compressed and are unique under the given eps

    Notes
    -----
    The dead band is applied to every neighbouring pair, not only to the two
    end points, so the number of range indices is always even. Empty input
    yields no ranges and no single indices; a single sample is returned as
    one single index.
    """
    values = np.asarray(ts_input)
    if eps is None:
        eps = 0.0
    if len(values) == 0:
        return np.empty((0, 2), dtype=int), []

    # changed[i] is True when values[i + 1] leaves the dead band around values[i].
    # Expressed as "not within the band" so that a NaN neighbour counts as a change.
    changed = ~(np.abs(np.diff(values)) <= eps)

    # A run starts at index 0 and after every change; it ends before every
    # change and at the last index.
    run_starts = np.concatenate(([True], changed))
    run_ends = np.concatenate((changed, [True]))

    # Start and end on the same index -> run of length one -> single value.
    scatter_indices = np.flatnonzero(run_starts & run_ends).tolist()
    first_indices = np.flatnonzero(run_starts & ~run_ends)
    last_indices = np.flatnonzero(run_ends & ~run_starts)
    return np.column_stack((first_indices, last_indices)), scatter_indices
def data_to_patch_values(data1: np.ndarray, data2: np.ndarray,
                         non_nan_slices: Optional[Iterable[slice]] = None) -> List[np.ndarray]:
    """
    Build one closed outline per slice from two equally indexed value arrays.

    For every slice the result holds ``data1[slice]`` followed by
    ``data2[slice]`` in reversed order, i.e. forward along the first curve and
    back along the second one.

    :param data1: values of the first curve; also decides the dtype of each result array
    :param data2: values of the second curve, indexed with the same slices as data1
    :param non_nan_slices: slices to build outlines for; any iterable is accepted.
        If None, the contiguous runs of valid (non-nan, non-inf) values in data1 are used.
    :return: list with one array of length 2*n per slice, n being the slice length
    """
    if non_nan_slices is None:
        non_nan_slices = np.ma.clump_unmasked(np.ma.masked_invalid(data1))

    resulting_patches = []
    for span in non_nan_slices:
        forward = data1[span]
        n = len(forward)
        # Pre-sized with data1's dtype so data2 is cast to it on assignment.
        outline = np.empty(2 * n, dtype=forward.dtype)
        outline[:n] = forward
        outline[n:] = data2[span][::-1]
        resulting_patches.append(outline)
    return resulting_patches
def convert_ts_to_plot_vectors(*, ts: TimeSeries, cal: Calendar, crop_nan: Optional[bool] = False,
                               interpret_point_interpretation: Optional[bool] = False,
                               time_scale: Optional[float] = 1000.0) -> Tuple[np.ndarray, np.ndarray]:
    """
    Extract plot-ready time and value arrays from a time-series.

    Delegates to ``time_series_to_bokeh_plot_data`` and converts its first two
    results to numpy arrays. According to the original notes this is about
    30..100x faster than the corresponding python code.

    :param ts: time-series to extract plot data from
    :param cal: calendar containing the time-zone offsets to apply to the time-points
    :param crop_nan: crop trailing nans from the time-series
    :param interpret_point_interpretation: interpret ts.point_interpretation, and if
        POINT_AVERAGE_VALUE make a stair-step curve (2x+1 in size); when False the
        data is forced to linear
    :param time_scale: factor applied to the time-points; bokeh uses time as numbers
        in ms scale, hence the default of 1000.0
    :return: tuple (times, values), where times are tz-offset with cal and then
        multiplied by time_scale
    """
    # The underlying call takes the inverse flag: linear unless interpretation is requested.
    linear_only = not interpret_point_interpretation
    plot_data = time_series_to_bokeh_plot_data(
        ts=ts,
        calendar=cal,
        time_scale=time_scale,
        force_linear=linear_only,
        crop_trailing_nans=crop_nan,
    )
    times = plot_data[0].to_numpy()
    values = plot_data[1].to_numpy()
    return times, values
def merge_convert_ts_vectors_to_numpy(*, ts_vectors: List[Quantity[TsVector]],
                                      time_scale: Optional[float] = 1.0,
                                      cal: Optional[Calendar] = None) -> Tuple[np.ndarray, List[np.ndarray]]:
    """
    Merge a list of ts_vectors and convert them to numpy arrays.

    All time series are collected into one TsVector, extracted as a table on a
    common time axis, and split back into one group per input vector.

    Parameters
    ----------
    ts_vectors: list of ts_vectors (as quantities) to merge and convert
    time_scale: default 1.0, return time-vectors in seconds (utc)
    cal: Calendar, default None (a default-constructed Calendar is used),
        specify if time-stamps should be tz-adjusted

    Returns
    -------
    aligned_time: Array of time points for the entire span of ts_vectors. It has one
                  more element than the extracted time column: the last element is
                  the largest period end found among the non-empty time series,
                  multiplied by time_scale.
    data_list: List with one entry per ts_vector, each entry being a list of numpy arrays.
               Example: a = data_list[i][j] is the numpy array with data of the time series j and vector i.
               The numpy array is filled with nan where a(t) does not have data.

    Both results are empty (``np.array([])`` and ``[]``) when ts_vectors is empty
    or the extracted table has no rows.
    """
    if len(ts_vectors) == 0:
        return np.array([]), []

    tsv = TsVector()
    ix_map = []  # start index of `ts_vectors[i]` in tsv
    cal = cal or Calendar()
    t_max = -np.inf  # np.infty was removed in NumPy 2.0
    for qtsv in ts_vectors:
        ix_map.append(len(tsv))
        tsv.extend(qtsv.magnitude)
        # The extracted table does not give the end of the last period, so track it here.
        end_times = [int(ts.time_axis.total_period().end) for ts in qtsv.magnitude if len(ts) > 0]
        if end_times:
            # at least an approximation, not 100% sure e.g. for calendar_dt
            t_max = max(t_max, max(end_times))
    ix_map.append(len(tsv))  # add last elem upper boundary.

    r = tsv.extract_as_table(cal=cal, time_scale=time_scale)
    if len(r) == 0 or len(r[0]) == 0:
        return np.array([]), []

    # Column 0 of the table is time; column j + 1 holds the values of series j.
    values = [[r[j + 1].to_numpy() for j in range(start, stop)]
              for start, stop in zip(ix_map, ix_map[1:])]

    times = np.empty(len(r[0]) + 1, dtype="float64")
    times[:-1] = r[0].to_numpy()
    times[-1] = t_max * time_scale
    return times, values
def find_nearest(array: np.ndarray, input_value: Union[float, int], smaller_equal: Optional[bool] = True):
    """
    Return the index of the array element next to input_value on the chosen side.

    The lookup uses ``np.searchsorted``, so array is assumed to be sorted in
    ascending order.

    :param array: values to search in, must not be empty
    :param input_value: value to locate
    :param smaller_equal: if True, pick the last element <= input_value (index 0 when
        every element is larger); if False, pick the first element >= input_value
        (the last index when every element is smaller)
    :return: index into array
    :raises DataUtilError: if array is empty
    """
    size = len(array)
    if size == 0:
        raise DataUtilError('Empty array sent to find_nearest()')

    side = 'right' if smaller_equal else 'left'
    position = np.searchsorted(array, input_value, side=side)

    # Step back one slot when the insertion point is past the end, or when
    # looking for "<=" and there is an element to the left of the insertion point.
    past_end = position == size
    has_left_neighbour = smaller_equal and position != 0
    if past_end or has_left_neighbour:
        return position - 1
    return position
def convert_ts_to_numpy(ts: TimeSeries, crop_nan: Optional[bool] = False,
                        interpret_point_interpretation: Optional[bool] = False) -> Tuple[np.ndarray, np.ndarray]:
    """
    Backward compatibility ONLY: convert a shyft time series into numpy arrays.

    Forwards to ``convert_ts_to_plot_vectors`` with a default-constructed
    Calendar and a time scale of 1.0.

    :param ts: shyft TimeSeries containing values and time points
    :param crop_nan: if True, strip away trailing nans and shorten result accordingly
    :param interpret_point_interpretation: interpret ts.point_interpretation, and if
        POINT_AVERAGE_VALUE make a stair-step curve (2x+1 in size)
    :return: tuple of [np.ndarray, np.ndarray] containing [time, values]
    """
    default_calendar = Calendar()
    time_and_values = convert_ts_to_plot_vectors(
        ts=ts,
        cal=default_calendar,
        crop_nan=crop_nan,
        interpret_point_interpretation=interpret_point_interpretation,
        time_scale=1.0,
    )
    return time_and_values