Source code for shyft.dashboard.time_series.sources.source

import logging
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import List, Optional, Dict, Union, Tuple

from bokeh.document import Document
from tornado import gen

from shyft.dashboard.base.hashable import Hashable
from shyft.dashboard.base.ports import States
from shyft.dashboard.time_series.axes_handler import DsViewTimeAxisType
from shyft.dashboard.time_series.bindable import Bindable
from shyft.dashboard.time_series.sources.ts_adapter import TsAdapter
from shyft.dashboard.time_series.state import Unit, Quantity, UnitRegistry
from shyft.dashboard.time_series.view import BaseView
from shyft.dashboard.time_series.view_time_axes import ViewTimeAxisProperties, create_view_time_axis, period_union
from shyft.time_series import UtcPeriod, min_utctime, max_utctime, TimeAxis, TsVector, utctime_now, time


[docs] class DataSource(Hashable, Bindable): """ This (not entirely enforced) immutable hashable object contains all data defining a source for requesting data. It is typically used related to the view, ref to the class Source below, as means of keeping parameters that help getting ts-data in an optimal manner for renderers. Attributes ---------- ts_adapter: TsAdapter is the most important feature, its role is to provide the time-series data from the underlying system. It's a callable object of type: >>> def fx( time_axis: TimeAxis, unit: Unit) -> Quantity[TsVector]: and as indicated, given a time-axis and a unit, provide a Quantity[TsVector]. The TsVector having 1 or more members, depending on the wanted ts-renderer (percentile/fill-between or line/scatter-plots) It is invoked by the view-controller each time it needs (new) data for rendering. min_dt: time >=0 the smallest time-step to propagate through requests to the ts_adapter the purpose is to let the user instrument this data-source so that it will never ask for time-axis of less resolution than this. Typically, it's reasonable to set 3600 for ts-expressions where you know that the underlying data is sampled at hourly resolution. Note that is only for optimizing memory/time usage. time_range: UtcPeriod clipping period, default min..max, that is, -> no clip/crop the purpose is to let you clip the request, and maybe also the returned resulting ts-vector from the ts_adapter. request_time_axis_type: DsViewTimeAxisType (padded/not padded) the purpose is to let user control which of the time-axis provided by the view-controller should be used for fetching data through .ts_adapter(....) call. The padded time-axis contains surplus ranges at each end of the time-axis to allow for smooth pan-operations. tag :str the short-name of the data-source the purpose..? have named data-sources. """
[docs] def __init__(self, *, ts_adapter: TsAdapter, unit: str, min_dt: Optional['shyft.time_series.time'] = 0, time_range: Optional[UtcPeriod] = None, request_time_axis_type: Optional[DsViewTimeAxisType] = None, tag: Optional[str] = None) -> None: """ Parameters ---------- ts_adapter: the TsAdapter which has the data unit: the unit of the data min_dt: the smallest time step to use when constructing time-axis used for the .ts_adapter(time_axis..) request. time_range: the maximum time range for the data, used to clip request, in or outgoing to the ts_adapter(.... ) request. request_time_axis_type: time axis type (padded/not padded), controls which of the time-axis to use for the .ts_adapter(...) request. tag: tag(short-name) of data source """ Hashable.__init__(self) Bindable.__init__(self) self.tag: str = tag or f'ds {self.uid}' self.unit: str = unit self.min_dt: time = min_dt self.time_range: UtcPeriod = time_range or UtcPeriod(min_utctime, max_utctime) self.request_time_axis_type: DsViewTimeAxisType = request_time_axis_type or DsViewTimeAxisType.padded_view_time_axis if not isinstance(ts_adapter, TsAdapter): raise (ValueError(f'Error Source {tag} ts_adapter {ts_adapter} not of type TsAdapter')) self.ts_adapter: TsAdapter = ts_adapter
[docs] class TsAdapterRequestParameter(Hashable): """ This immutable hashable object contains all information to request data from TimeAxisHandle and we use it as message-transport between the front thread and the background worker-thread that performs the real work in an async context(strongly recommended) Ref. to DataSource for the semantics of the data-members """
[docs] def __init__(self, *, request_time_axis_type: DsViewTimeAxisType, unit: Unit, view_time_axis: TimeAxis, padded_view_time_axis: TimeAxis) -> None: """ This object bundles all parameter needed to get data from ts_adapter Parameters ---------- request_time_axis_type: defines which time axis to use view_time_axis or padded_view_time_axis unit: unit of the requested data view_time_axis: view time axis provided from time_axis_handle padded_view_time_axis: padded view time axis provided from time_axis_handle """ super().__init__() self.request_time_axis_type: DsViewTimeAxisType = request_time_axis_type self.unit: Unit = unit self.view_time_axis: TimeAxis = view_time_axis self.padded_view_time_axis: TimeAxis = padded_view_time_axis
[docs] @classmethod def create_empty(cls) -> 'TsAdapterRequestParameter': return cls(request_time_axis_type=None, unit=None, view_time_axis=None, padded_view_time_axis=None)
@property def is_empty(self): return (self.view_time_axis is None or self.unit is None or self.request_time_axis_type is None or self.padded_view_time_axis is None)
[docs] def is_equiv(self, other): if not isinstance(other, TsAdapterRequestParameter): return False if not self.request_time_axis_type == other.request_time_axis_type: return False try: rqta = self.request_time_axis except RuntimeError: return False try: other_rqta = other.request_time_axis except RuntimeError: return False if not rqta == other_rqta: return False if not self.unit == other.unit: return False return True
@property def request_time_axis(self) -> TimeAxis: if self.request_time_axis_type == DsViewTimeAxisType.view_time_axis: ta = self.view_time_axis elif self.request_time_axis_type == DsViewTimeAxisType.padded_view_time_axis: ta = self.padded_view_time_axis else: raise RuntimeError(f"TsAdapterRequestParameter {self.uid} unknown request_time_axis_type={self.request_time_axis_type}") return ta @property def request_parameter(self) -> Dict[str, Union[Unit, TimeAxis]]: return {'time_axis': self.request_time_axis, 'unit': self.unit}
[docs] class SourceError(RuntimeError): pass
[docs] class Source(Bindable): """ This object plays the role of binding a DataSource (ref DataSource) to a set of Views, based on controls from the .parent (bindable) that need to provide TsViewer capabilities. It utilizes a *thread-pool* to ensure that time-consuming data-fetching/computations can run in the *background thread*, and that when ready, these are *properly dispatched* into the bokeh foreground async io-loop. Notice that this class plays together (closely) with the TsViewer class that have several Sources that is renders. The TsViewer is reached through the .parent (from Bindable) member. Control flow for an update goes like this: >>> #From the TsViewer This class The background worker(thread-pool) TsAdapter >>> #.update_data(vw_parms) -> | | | >>> # compute >>> # time-axis suitable for the current view, >>> # given source.min_dt,source.time_range(alias clip) >>> # | | >>> # post the TsReq --> | | >>> # | request_data_from_ts_adapter_sync () fx(time_axis,unit)->TsVector >>> # | | >>> # update_view_data(tsv..) <--- TsVector >>> #.trigger_view_update(views:tsv) Attributes ---------- logger:Logger provides logging functionality data_source: DataSource provide means of getting time-series data from the data-layer, with time-axis/delta-t limitations, does not have any logic. views: List[BaseView] keeps the list of view that presents this source unit_reqistry: UnitRegistry keeps the measurement-units and methods for conversions so that we can do simple conversions at the visual/presentation layer visible: bool True if this datasource should be visible _state: States represent the port/visual state of this object bokeh_document: Document the document we renter into (we use async, so we need to keep track of it) async_on: bool True if thread-pool executer is supplied (can be turned off) queue: List[TsAdapterRequestParameter] Keeps the list of pending request (async) fetching data current_request_parameter: TsAdapterRequestParameter Keeps the ongoing request (so that other similar request can be skipped) loading_data_async: bool True while the async-thread worker is executing in background async_observer_ts_viewer: TsViewer the TsViewer that registers itself as the observer of this Source. This is needed if trigger_view_update() should wait for results from all async data updates in its sources. """
[docs] def __init__(self, bokeh_document: Document, data_source: DataSource, views: List[BaseView], unit_registry: UnitRegistry, thread_pool_executor: Optional[ThreadPoolExecutor] = None, logger: Optional['logging.Logger'] = None): """ Parameters ---------- bokeh_document: bokeh document data_source: data source views: views to updated after data was loaded unit_registry: unit registry use to check units thread_pool_executor: thread pool executor needed for async data loading, if provided source will use it """ super().__init__() self.logger = logger or logging.getLogger() self.do_log: bool = self.logger.isEnabledFor(logging.DEBUG) self.data_source: DataSource = data_source self.views: List[BaseView] = views self.unit_registry: UnitRegistry = unit_registry self.visible: bool = True self._state: States = States.ACTIVE self.bokeh_document: Document = bokeh_document self.async_on: bool = thread_pool_executor is not None self.thread_pool_executor: ThreadPoolExecutor = thread_pool_executor self.queue: List[TsAdapterRequestParameter] = [] self.current_request_parameter: TsAdapterRequestParameter = TsAdapterRequestParameter.create_empty() self.loading_data_async: bool = False self.async_observer_ts_viewer = None
def _make_request_parameter(self, view_axis: ViewTimeAxisProperties)->TsAdapterRequestParameter: """ based on the view_axis properties, and self.min_dt and .time_range, compute reasonable time-axis, using the create_view_time_axis function. :return a ts-adapter-request-parameter that is filled in ready for execution """ cp = self.data_source.time_range vp, vp_padded = self._effective_request_periods(view_axis, cp) dt = max([self.data_source.min_dt, view_axis.dt]) view_ta = TimeAxis() if vp is None else create_view_time_axis(cal=view_axis.cal, view_period=vp, clip_period=cp, dt=dt) padded_ta = TimeAxis() if vp_padded is None else create_view_time_axis(cal=view_axis.cal, view_period=vp_padded, clip_period=cp, dt=dt) return TsAdapterRequestParameter(unit=self.data_source.unit, view_time_axis=view_ta, padded_view_time_axis=padded_ta, request_time_axis_type=self.data_source.request_time_axis_type) @staticmethod def _effective_request_periods(view_axis: ViewTimeAxisProperties, cp: UtcPeriod ) -> Tuple[Optional[UtcPeriod], Optional[UtcPeriod]]: if view_axis.extend_mode: # funny that this have to be set on the TsViewer, it could.. be on the view-list.. vp_list = [] for vp in [view_axis.view_period, view_axis.padded_view_period]: try: cp_inner = UtcPeriod(cp.start, cp.end) if cp.start == min_utctime: cp_inner = UtcPeriod(vp.start, cp.end) if cp.end == max_utctime: cp_inner = UtcPeriod(cp_inner.start, vp.end) vp = period_union(vp, cp_inner) vp_list.append(vp) except (ValueError, NotImplementedError): vp_list.append(None) vp, vp_padded = vp_list else: vp = view_axis.view_period vp_padded = view_axis.padded_view_period return vp, vp_padded
[docs] def update_data(self, view_axis:ViewTimeAxisProperties) -> None: """ This function triggers the updating of the data using the ts adapter provided in data_source container. It is called by the controller(TsViewer): It goes through these steps: * Create suitable time-axis/parameters to forward to the ts-adapter for getting data (try to figure out minimum amount of work to be done) * In thread-pool thread, execute the ts-adapter to get the data * when done, invoke the update_view_data(..) to update the TsViewer with the results. :parameter view_axis Contains the properties of the visual-time-axis, so that this class can adapt and optimize it's request to the TsAdapter class. """ if self._state == States.ACTIVE and self.visible: try: request_param = self._make_request_parameter(view_axis) # check if time axes out of view if not request_param.request_time_axis: self._empty_time_axis() return if self.current_request_parameter.is_equiv(request_param): return else: self._update(request_param=request_param) except RuntimeError as e: self._empty_time_axis() self.logger.error(f"Error {self.__class__.__name__} {self.data_source.tag}: {e}", exc_info=True)
def _update(self,*, request_param:TsAdapterRequestParameter)->None: self.queue = [request_param] if self.async_on and not self.loading_data_async: if self.current_request_parameter.is_empty: self.current_request_parameter = request_param try: if self.async_observer_ts_viewer: self.async_observer_ts_viewer.source_starting_async_data_update(self) self.bokeh_document.add_next_tick_callback(self._request_data_from_ts_adapter_async) self.loading_data_async = True except Exception as e: self.logger.error(f"{self.__class__.__name__} {self.data_source.tag}, Exception {e}") self._empty_time_axis() elif not self.async_on: self.current_request_parameter = request_param self._request_data_from_ts_adapter_sync(request_param=request_param) def _empty_time_axis(self) -> None: """ This function is used when the requested time axis is empty or out of range ... AND it fires an update to the observer (parent) """ self.current_request_parameter = TsAdapterRequestParameter.create_empty() self.queue = [] self.update_view_data(ts_vector=self.unit_registry.Quantity(TsVector(), self.data_source.unit)) def _request_data_from_ts_adapter_sync(self, *, request_param: TsAdapterRequestParameter): """ This function requests the data sync """ if self.do_log: self.logger.debug(f"{self.__class__.__name__} {self.data_source.tag} requesting data for {request_param.request_time_axis}") timestamp = utctime_now() ts_vector = self.data_source.ts_adapter(**request_param.request_parameter) for ts in ts_vector: if ts.needs_bind(): raise SourceError(f"TimeSeries {ts} is not bound") if self.do_log: self.logger.debug(f"{self.__class__.__name__} {self.data_source.tag} received data after {utctime_now() - timestamp}") self.update_view_data(ts_vector=ts_vector)
[docs] def request_data_from_ts_adapter_sync(self, *, request_param: TsAdapterRequestParameter): if self._state == States.ACTIVE and self.visible: self._update(request_param=request_param)
[docs] def update_view_data(self, ts_vector: Quantity[TsVector]) -> None: """ This function triggers the view data update for the ts_vector, for each of the views (as a dict), same ts_vector. Before return, async_observer_ts_viewer must be notified about completed_async_data_update """ if not self.bound: if self.async_observer_ts_viewer: self.async_observer_ts_viewer.source_completed_async_data_update(self) return # check if unit annotated ts vector reset_ts_vector = False if not isinstance(ts_vector, self.unit_registry.Quantity): msg = f"Error {self.__class__.__name__} {self.data_source.tag} ts_vector not annotated with unit" \ f" or in viewer unit registry" self.logger.error(msg) reset_ts_vector = True elif not isinstance(ts_vector.magnitude, TsVector): self.logger.error(f"Error {self.__class__.__name__} {self.data_source.tag} received magnitude not of type TsVector") reset_ts_vector = True if reset_ts_vector: ts_vector = self.unit_registry.Quantity(TsVector(), self.data_source.unit) view_data = {view: ts_vector for view in self.views} self.parent.trigger_view_update(view_data) if self.async_observer_ts_viewer: self.async_observer_ts_viewer.source_completed_async_data_update(self)
# -- ASYNC # @without_document_lock # SIH: This one is absolutely unclear, in 2.4 it does not work, in 2.3 it works without # so we need to figure out if we need this. Since this is NOT a doc, there is no requirement # to try to lock it, however, methods called from this(after retrieve data) is # could be docs, but then we must assume they either have, or will take a lock prior to doc change? @gen.coroutine def _request_data_from_ts_adapter_async(self): """ This function request data from ts_adapter in async way. async_observer_ts_viewer must be notified about starting_ and completed_async_data_updates. """ if self.do_log: # avoid overhead if no debug output self.logger.debug(f"{self.__class__.__name__} {self.data_source.tag} requesting data", extra=dict(async_on=True)) ts = utctime_now() if not self.queue: self.loading_data_async = False if self.async_observer_ts_viewer: self.async_observer_ts_viewer.source_completed_async_data_update(self) return self.loading_data_async = True request_param = self.queue.pop(0) self.current_request_parameter = request_param future_name = self.data_source.uid future = self.thread_pool_executor.submit(self.data_source.ts_adapter, **request_param.request_parameter) try: # use yield fr tornado to not add a blocking wait as #self.future.result(timeout=25) would do results = yield future # we resume execution here, in the bokeh-main-thread, when thread-pool executor is done. except (RuntimeError, TypeError, ValueError, AttributeError, ArithmeticError) as e: self.logger.error(f"{self.__class__.__name__} {self.data_source.tag} ts_adapter: Exception {e}", exc_info=True, extra=dict(async_on=True)) # self.bokeh_document.add_next_tick_callback(self.empty_time_axis) # not wanted, but possible results = None if self.do_log: self.logger.debug(f"{self.__class__.__name__} {self.data_source.tag} received data after {utctime_now() - ts}", extra=dict(async_on=True)) if self.queue: next_request_param = self.queue[0] self.current_request_parameter = next_request_param if self.async_observer_ts_viewer: self.async_observer_ts_viewer.source_starting_async_data_update(self) self.bokeh_document.add_next_tick_callback(self._request_data_from_ts_adapter_async) else: self.loading_data_async = False if results and self._state == States.ACTIVE and future_name == self.data_source.uid: try: self.bokeh_document.add_next_tick_callback(partial(self.update_view_data_async, ts_vector=results)) except Exception as e: self.logger.error(f"{self.__class__.__name__} {self.data_source.tag}, {self.bokeh_document}, Exception {e}", exc_info=True, extra=dict(async_on=True)) if self.async_observer_ts_viewer: self.async_observer_ts_viewer.source_completed_async_data_update(self) elif self.async_observer_ts_viewer: self.async_observer_ts_viewer.source_completed_async_data_update(self)
[docs] @gen.coroutine def update_view_data_async(self, ts_vector: TsVector) -> None: """ Trigger the view data update, from async function """ self.update_view_data(ts_vector=ts_vector)