Source code for shyft.dashboard.time_series.ds_view_handle

import itertools
import logging
from abc import ABC, abstractmethod
from typing import List, Optional, Union, Any, Tuple
from pint import UnitRegistry, UndefinedUnitError, DimensionalityError

from shyft.dashboard.time_series.sources.ts_adapter import BasicTsAdapter

from shyft.time_series import statistics_property, TimeSeries, TsVector, UtcPeriod, min_utctime, max_utctime

from shyft.dashboard.time_series.axes import YAxis

from shyft.dashboard.time_series.state import State, Unit, Quantity

from shyft.dashboard.time_series.view_container.table import Table

from shyft.dashboard.time_series.view_container.legend import Legend

from shyft.dashboard.time_series.view_container.figure import Figure

from shyft.dashboard.base.ports import Sender, Receiver

from shyft.dashboard.time_series.sources.source import DataSource
from shyft.dashboard.time_series.view import BaseView, FillInBetween, Line, TableView, LegendItem
from shyft.dashboard.base.hashable import Hashable

from bokeh.palettes import Category20 as DefaultPalette


class DsViewHandleError(RuntimeError):
    """Raised when a DsViewHandle cannot be built from the given data source and views."""
class DsViewHandle(Hashable):
    """
    This object combines a data_source with views.

    It is used to show data in ts_viewer.
    """

    def __init__(self, *,
                 data_source: DataSource,
                 views: List[BaseView],
                 tag: Optional[str] = None,
                 unit_registry: Optional[UnitRegistry] = None):
        """
        Initializes an immutable, hashable ds_view_handle.

        All checks are performed before anything is bound, so a failed
        construction does not leave the data source or some of the views
        bound to a handle that was never created.

        Parameters
        ----------
        data_source:
            unbound data source to combine with views
        views:
            list of unbound views to view the data
        tag:
            optional uid to identify the ds view handle later;
            falls back to ``str(self.uid)`` when empty or None
        unit_registry:
            optional unit registry, used to verify that the units of
            data_source and views are compatible; should be provided if
            custom defined units are used

        Raises
        ------
        DsViewHandleError
            if data_source or one of the views is already bound (a view
            object listed twice counts as already bound), if a view unit has
            a different dimensionality than the data source unit, or if a
            unit is not defined in the unit registry
        """
        super().__init__()

        # check data_source is unbound
        if data_source.bound:
            raise DsViewHandleError(f'{tag}: data_source {data_source.tag} already bound!')

        # check views are unbound; a view listed twice would be bound by its
        # first occurrence, so it is rejected the same way
        seen_view_ids = set()
        for view in views:
            if view.bound or id(view) in seen_view_ids:
                raise DsViewHandleError(f'{tag}: view {view.label} already bound!')
            seen_view_ids.add(id(view))

        # check units
        try:
            temp_ureg = unit_registry or UnitRegistry()
            ds_unit_dimensionality = temp_ureg.Unit(data_source.unit).dimensionality
            for v in views:
                # views without a unit attribute are not unit-checked
                if not hasattr(v, 'unit'):
                    continue
                v_unit_dimensionality = temp_ureg.Unit(v.unit).dimensionality
                if not ds_unit_dimensionality == v_unit_dimensionality:
                    raise DsViewHandleError(
                        f"{tag}: Incompatible units {data_source.unit}!!{v.unit} of {data_source} and {v}!")
        except UndefinedUnitError as u:
            raise DsViewHandleError(f"{tag}: Incompatible unit registry! Please provide one!: {u}") from u

        # everything validated: bind now
        data_source.bind(parent=self)
        for view in views:
            view.bind(parent=self)

        self.__data_source = data_source
        self.__views = views
        self.tag = tag or str(self.uid)

    @property
    def data_source(self):
        """
        This property returns the data source
        """
        return self.__data_source

    @property
    def views(self):
        """
        This property returns the list with all defined views
        """
        return self.__views
class DsViewHandleCreator(ABC):
    """
    This abstract class should be implemented in applications that need automated creation of DsViewHandles.

    This class should receive a data structure and create DsViewHandles to all TimeSeries contained in it.
    These DsViewHandles are then sent forward, i.e. to a TsViewer or DsViewHandleRegistry.
    """

    def __init__(self,
                 unit_registry: Optional[UnitRegistry] = None,
                 figure_container: Optional[Union[List[Figure], Figure]] = None,
                 legend_container: Optional[Union[List[Legend], Legend]] = None,
                 table_container: Optional[Union[List[Table], Table]] = None,
                 logger: Optional[logging.Logger] = None) -> None:
        """
        Parameters
        ----------
        unit_registry:
            unit registry to use, defaults to State.unit_registry
        figure_container:
            a Figure or list of Figures that line/fill views are created for
        legend_container:
            a Legend or list of Legends that legend items are created for
        table_container:
            a Table or list of Tables that table views are created for
        logger:
            optional logger, defaults to the root logger
        """
        self.logger = logger or logging.getLogger()
        self.unit_registry = unit_registry or State.unit_registry

        self.figure_container = self._as_list(figure_container)
        self.legend_container = self._as_list(legend_container)
        self.table_container = self._as_list(table_container)

        # flat list of the axis views of all y axes of all figures, in figure order
        self.y_axes = [axis.axis_view
                       for figure in self.figure_container
                       for axis in figure.y_axes.values()]

        self.iter_color = itertools.cycle(DefaultPalette[20])
        self.default_line_style = "solid"
        self.default_line_width = 2.0
        self.tooltips = [("label", "@label"), ("f", "$y"), ("t", "$x{%d-%m-%Y %H:%M}")]

        # Every DsViewHandleCreator should have a Receiver with the signal type of the data structure it supports.
        # This attribute is implementation dependent since the signal type must match the associated Sender
        self.receive_data: Receiver
        self.send_ds_view_handles = Sender(parent=self, name='send ds view handles', signal_type=List[DsViewHandle])

    @staticmethod
    def _as_list(container: Any) -> list:
        """Return [] for a falsy container, a list unchanged, and wrap any other object in a list."""
        if not container:
            return []
        return container if isinstance(container, list) else [container]

    def _receive_data(self, data: Any):
        """
        This function receives a data structure containing TimeSeries, creates DsViewHandles to each one of them
        and sends the resulting List[DsViewHandles] forward, i.e. to a TsViewer or DsViewHandleRegistry
        """
        ds_view_handles = self.create_ds_view_handles(data)
        self.send_ds_view_handles(ds_view_handles)

    @abstractmethod
    def create_ds_view_handles(self, data: Any) -> List[DsViewHandle]:
        """Create the DsViewHandles for the received data structure; to be implemented by subclasses."""
        pass

    def get_views(self,
                  unit: str,
                  label: str,
                  visible=True,
                  y_axis_label: Optional[str] = None,
                  line_width: Optional[float] = None,
                  line_style: Optional[str] = None,
                  percentiles: Optional[List[Union[float, statistics_property]]] = None,
                  table_label: Optional[str] = None,
                  color: Optional[str] = None) -> List[BaseView]:
        """
        Auxiliary function for the creation of BaseViews for the DsViewHandles

        Parameters
        ----------
        unit:
            unit passed to the created views; also used to look up the y axis when no y_axis_label is given
        label:
            label of the line views and legend items
        visible:
            visibility flag passed to the line, fill and table views
        y_axis_label:
            label of the y axis to use; if omitted the y axis is looked up by unit
        line_width:
            line width, defaults to self.default_line_width
        line_style:
            line style, defaults to self.default_line_style
        percentiles:
            percentiles of the data; with more than one entry FillInBetween views are added per figure
        table_label:
            label of the table views, defaults to label
        color:
            color of line and fill views, defaults to the next color of self.iter_color

        Returns
        -------
        list of created views: per figure a Line (plus FillInBetweens), then one TableView per table,
        then one LegendItem per legend
        """
        line_width = line_width or self.default_line_width
        line_style = line_style or self.default_line_style
        percentiles = percentiles or []
        table_label = table_label or label
        color = color or next(self.iter_color)

        views = []
        if self.figure_container:
            if y_axis_label:
                y_axis = self.get_y_axis_by_label(label=y_axis_label)
            elif self.y_axes:
                y_axis = self.get_y_axis_by_unit(unit=unit)
            else:
                y_axis = None
            for figure in self.figure_container:
                # only figures that know the selected y axis get a line
                if y_axis in figure.y_axes:
                    line_view = Line(color=color, unit=unit, label=label, view_container=figure,
                                     index=len(percentiles) // 2, visible=visible, y_axis=y_axis,
                                     line_style=line_style, line_width=line_width, tooltips=self.tooltips)
                    views.append(line_view)
                    if len(percentiles) > 1:
                        fill_in_betweens = self.get_percentiles_views(percentiles=percentiles,
                                                                      view_container=figure,
                                                                      visible=visible, color=color,
                                                                      label=label, unit=unit, y_axis=y_axis)
                        views.extend(fill_in_betweens)

        for table in self.table_container:
            table_view = TableView(view_container=table, unit=unit, columns={0: ' '}, label=table_label,
                                   visible=visible)
            views.append(table_view)

        for legend in self.legend_container:
            # each legend item gets its own snapshot of the views created so far
            legend_item = LegendItem(view_container=legend, label=label, views=list(views))
            views.append(legend_item)
        return views

    def get_y_axis_by_label(self, label: str):
        """
        Auxiliary function for the management of YAxis by label.

        Returns the first entry of self.y_axes whose label equals ``label``.

        Raises
        ------
        RuntimeError
            if no y axis has the given label
        """
        for y_axis in self.y_axes:
            if y_axis.label == label:
                return y_axis
        raise RuntimeError(f"No y-axis found for {label}")

    def get_y_axis_by_unit(self, unit: Union[str, Unit]) -> Optional[YAxis]:
        """
        Auxiliary function for the management of YAxis by unit.

        A y axis whose parsed unit compares equal to the parsed ``unit`` is preferred; otherwise the
        first y axis whose unit ``unit`` can be converted to is returned.

        Raises
        ------
        RuntimeError
            if there is no y axis with an equal or convertible unit
        """
        if not self.y_axes:
            # nothing to compare against, so do not even try to parse the unit
            raise RuntimeError(f"No y-axis found for {unit}")

        # parse the requested unit once instead of once per axis and pass
        requested = self.unit_registry(str(unit))

        # first pass: equal unit
        for y_axis in self.y_axes:
            if requested == self.unit_registry(str(y_axis.unit)):
                return y_axis

        # second pass: first axis the unit can be converted to
        for y_axis in self.y_axes:
            try:
                requested.to(self.unit_registry(str(y_axis.unit)))
            except DimensionalityError:
                continue
            return y_axis
        raise RuntimeError(f"No y-axis found for {unit}")

    @staticmethod
    def get_percentiles_views(percentiles: List[Union[float, statistics_property]],
                              view_container: Figure,
                              color: str,
                              label: str,
                              unit: Union[str, Unit],
                              visible: bool = True,
                              y_axis: Optional[YAxis] = None) -> List[FillInBetween]:
        """
        Auxiliary function for the creation of FillInBetweens

        Percentiles are paired from the outside in: entry ``i`` with entry ``-(i + 1)``. With an odd
        number of percentiles the middle one gets no fill.
        """
        fill_in_betweens = []
        for i in range(len(percentiles) // 2):
            fill_label = f'{label} p{percentiles[i]}-{percentiles[-(i + 1)]}'
            fill_in_betweens.append(FillInBetween(view_container=view_container, color=color, label=fill_label,
                                                  visible=visible, unit=unit, indices=(i, -(i + 1)),
                                                  y_axis=y_axis))
        return fill_in_betweens

    @staticmethod
    def get_time_range(tso: Union[Quantity, TsVector, TimeSeries]):
        """
        Return the UtcPeriod spanned by the time points of a TimeSeries or TsVector.

        An object exposing a TimeSeries/TsVector as ``magnitude`` (a Quantity) is unwrapped first.
        An empty TimeSeries, an empty TsVector or any other input yields
        UtcPeriod(min_utctime, max_utctime).
        """
        def get_ts_time_range(ts):
            if len(ts) > 0:
                time_points = ts.time_axis.time_points
                return int(time_points.min()), int(time_points.max())
            return min_utctime, max_utctime

        # the signature accepts Quantity: look at the wrapped time series, not at the wrapper
        magnitude = getattr(tso, 'magnitude', None)
        if isinstance(magnitude, (TimeSeries, TsVector)):
            tso = magnitude

        if isinstance(tso, TimeSeries):
            t_min, t_max = get_ts_time_range(ts=tso)
        elif isinstance(tso, TsVector) and len(tso) > 0:
            # NOTE(review): one empty series in the vector widens the result to the full period
            ranges = [get_ts_time_range(ts=ts) for ts in tso]
            t_min, t_max = min(v[0] for v in ranges), max(v[1] for v in ranges)
        else:
            t_min, t_max = min_utctime, max_utctime
        return UtcPeriod(t_min, t_max)
class BasicDsViewHandleCreator(DsViewHandleCreator):
    """
    A basic implementation of a DsViewHandleCreator which uses BasicTsAdapters and supports labels and units.
    """

    def __init__(self,
                 unit_registry: Optional[UnitRegistry] = None,
                 figure_container: Optional[Union[List[Figure], Figure]] = None,
                 legend_container: Optional[Union[List[Legend], Legend]] = None,
                 table_container: Optional[Union[List[Table], Table]] = None,
                 logger: Optional[logging.Logger] = None) -> None:
        """
        Forward all arguments to DsViewHandleCreator and create the ``receive_data`` Receiver, which
        accepts a list of Quantity, TsVector, TimeSeries or (data, label) tuples.
        """
        super().__init__(unit_registry=unit_registry,
                         figure_container=figure_container,
                         legend_container=legend_container,
                         table_container=table_container,
                         logger=logger)
        self.receive_data = Receiver(
            parent=self, name="Receive data", func=self._receive_data,
            signal_type=List[Union[Quantity, TsVector, TimeSeries,
                                   Tuple[Union[Quantity, TsVector, TimeSeries], str]]])

    def create_ds_view_handles(self,
                               data_container: List[Union[Quantity, TsVector, TimeSeries,
                                                          Tuple[Union[Quantity, TsVector, TimeSeries], str]]]
                               ) -> List[DsViewHandle]:
        """
        :param data_container: A list of TimeSeries, TsVector, or tuples containing data and label
        :return: A list of DsViewHandles for the data
        """
        dsvhs = []
        for element in data_container:
            # a tuple carries (data, label); anything else is data with an empty label
            if isinstance(element, tuple):
                data, label = element
            else:
                data, label = element, ""

            # split a Quantity of this creator's unit registry into time series and unit
            if isinstance(data, self.unit_registry.Quantity):
                tso, unit = data.magnitude, data.units
            else:
                tso, unit = data, ""

            tsa = BasicTsAdapter(data=tso, unit_registry=self.unit_registry, unit=unit)
            time_range = self.get_time_range(tso=tso)
            data_source = DataSource(ts_adapter=tsa, unit=unit, tag=label, time_range=time_range)
            views = self.get_views(unit=unit, label=label)
            dsvhs.append(DsViewHandle(data_source=data_source, views=views, tag=label,
                                      unit_registry=self.unit_registry))
        return dsvhs