import logging
from typing import List, Dict, Any, Optional, Callable, Union, Iterable
import numpy as np
from shyft.time_series import TsVector, Calendar
from enum import Enum

from bokeh.models import ColumnDataSource, DataTable, TableColumn, Div
from bokeh.layouts import column, row
from shyft.dashboard.time_series.dt_selector import tdiff_to_str

from shyft.dashboard.time_series.axes_handler import BaseViewTimeAxis

from shyft.dashboard.base.ports import States, StatePorts, Sender, connect_state_ports
from import TableTool
from shyft.dashboard.time_series.view import TableView
from shyft.dashboard.time_series.data_utility import find_nearest, merge_convert_ts_vectors_to_numpy
from shyft.dashboard.time_series.view_container.view_container_base import BaseViewContainer
from shyft.dashboard.time_series.state import Quantity
from shyft.dashboard.time_series.formatter import basic_time_formatter

[docs] class TableError(RuntimeError): pass
[docs] class Table(BaseViewContainer): """ Table class is the view container for a table Examples -------- | # create the viewer app | viewer = TsViewer(bokeh_document=doc, title='Test Ts Viewer') | | # create view container | table1 = Table(viewer=viewer, tools=[]) | | # create a data source | data_source = DataSource(ts_adapter=A_time_series_adapter(unit_to_decorate='MW'), unit='MW', | request_time_axis_type=DsViewTimeAxisType.padded_view_time_axis, | time_range=UtcPeriod(start_time, end_time)) | | # create a view in where we put the view container | table_view = TableView(view_container_uid=table1.uid, columns={0: 'column 1', 1: 'column 2'}, | label='Generic Label') | | # create a handle for the data source and list of views connected to the data source | ds_view_handle = DsViewHandle(data_source=data_source, views=[table_view, line_view, fill_in_between_view]) | | # add views and data source to the viewer | viewer.add_ds_view_handles(ds_view_handles=[ds_view_handle]) """
[docs] def __init__(self, *, viewer: 'shyft.dashboard.time_series.ts_viewer.TsViewer', width: int = 600, height: int = 600, title: str = '', max_column_width: Optional[int] = None, min_column_width: int = 120, visible: bool = True, time_formatter: Callable[[Iterable, str], List[str]] = basic_time_formatter, tools: Union[List['TableTool'], 'TableTool'] = None, logger: Optional['logging.Logger'] = None, alternative_view_time_axis: BaseViewTimeAxis = None) -> None: """ Parameters ---------- viewer: which TsViewer it is connected to width: pixel width of the table height: pixel height of the table title: title of the table max_column_width: sets an upper limit of the size of the columns, if None upper = infinite min_column_width: sets a lower limit of the size of the columns visible: switch for visibility time_formatter: the time format of the time column tools: optional table tools see table tools/ """ super().__init__(viewer=viewer) self.logger = logger or logging.getLogger(f"Table {title}") = [] self.time_formatter = time_formatter self.bokeh_data_source = ColumnDataSource({k: [] for k in ["Time"]}) self.bokeh_data_table = DataTable(source=self.bokeh_data_source, columns=[TableColumn(field=f"{self.uid}", title="Time", width=85)], editable=False, sortable=False, index_position=None, fit_columns=False, width=width, height=height, scroll_to_selection=True) = {} self.table_columns = {} self.unit_row = {} self.title = title self.bokeh_title_div = Div(text=f'<b>{title}</b>', height=20, width=width) self.active_views = [] self.views = [] self.view_range_indices = [] self.aligned_time = None self.y_axis_label = '' self.time_column_width = 145 self.max_column_width = max_column_width self.min_column_width = min_column_width self.ts_dict = {} if alternative_view_time_axis and isinstance(alternative_view_time_axis, BaseViewTimeAxis): self.view_time_axis = alternative_view_time_axis self.view_time_axis.on_change_view_range(obj=self, callback=self.view_range_callback) self._visible = visible self._visible_state = visible # remember visibility when set Deactive and Active again self.visible_callback_enabled = True self._layout = column(row(self.bokeh_title_div, height=20, width=width, sizing_mode='fixed'), row(self.bokeh_data_table)) if tools: if not isinstance(tools, list): tools = [tools] for tool in tools: self.add_tool(tool=tool)
@property def layout(self) -> Any: """ This property returns the preferred layout of the view_container """ return self._layout @property def layout_components(self) -> Dict[str, List[Any]]: """ This property returns all layout components of the view_container """ return {"widgets": [self.bokeh_title_div], "figures": [self.bokeh_data_table]} @property def visible(self) -> bool: """ This property returns the visibility of the table """ return self._visible @visible.setter def visible(self, visible: bool) -> None: """ This functions will turn off the visibility of the table, i.e. the table in the browser will not be updated, Setter of visibility. """ if visible == self._visible or not isinstance(visible, bool): return self._visible = visible if not visible: self.reset_columns() if visible: self.update_stored_view_data()
[docs] def add_view(self, *, view: TableView) -> None: """ This function adds a new view to the view_container """ if view in self.views: self.logger.debug(f"Table {self.uid}: not adding view {view} since it is already registered") return # save view self.views.append(view) view.on_change(obj=self, attr='visible', callback=self.visible_callback)
[docs] def update_stored_view_data(self): """ This function only updates the time series who are stored within the view container """ if self.visible: self.prepare_data_and_update_data_source() else: self.reset_data_source()
[docs] def update_view_data(self, *, view_data: Dict[TableView, Quantity[TsVector]]) -> None: """ This function updates the table with new data as sent in by view_data """ if sum(v not in self.views for v in view_data): raise TableError(f'TableView {view_data.keys()} not in registered views') visible_view_changed = False for view, ts in view_data.items(): view_data[view] = if view.visible: visible_view_changed = True self.ts_dict.update(view_data) if self.visible: self.prepare_data_and_update_data_source(needs_data_source_update=visible_view_changed) else: self.reset_data_source()
[docs] def prepare_data_and_update_data_source(self, needs_data_source_update: bool = True): """ This function prepares all the view data and creates the bokeh data and table columns that should be updated and updates the data source """ if not self.ts_dict: return views_visible = {view: tsv for view, tsv in self.ts_dict.items() if view.visible} self.aligned_time, aligned_data = merge_convert_ts_vectors_to_numpy(ts_vectors=list(views_visible.values())) self.table_columns = {"Time": TableColumn(field='Time', title='Time', width=self.time_column_width)} = {"Time": []} self.unit_row = {"Time": ['Unit']} if len(aligned_data) != 0:["Time"] = self.time_formatter(self.aligned_time[:-1], self.parent.time_zone or None) for ts_number, view in enumerate(views_visible.keys()): for column_index, column_name in view.columns.items(): if view.label in column_name: title = f"{column_name}" else: title = f"{view.label} - {column_name}" if column_name.strip() else f"{view.label}" field_name = f"{view.uid}.{column_index}" column_width = max(int(len(title)*7), self.min_column_width) if self.max_column_width is not None: column_width = min(column_width, self.max_column_width) self.table_columns[field_name] = TableColumn(field=field_name, title=title, width=column_width) if not aligned_data: self.unit_row[field_name] = [][field_name] = [] elif not aligned_data[ts_number]: self.unit_row[field_name] = [str(view.unit)][field_name] = np.ones(len(self.aligned_time)-1)*np.nan else: self.unit_row[field_name] = [str(view.unit)][field_name] = aligned_data[ts_number][column_index] if needs_data_source_update: self.update_data_source()
[docs] def update_data_source(self): """ This function updates the data that is to be shown """ self.estimate_view_range_indices() if not self.view_range_indices: self.reset_data_source() return data = {} for (k, d), u in zip(, self.unit_row.values()): if len(d) == 0: formatter = "{}" elif isinstance(d[0], float) or isinstance(d[0], int): formatter = "{:4.2f}" elif isinstance(d[0], str): formatter = "{:s}" else: raise TableError(f"{self}: data type {d[0]} in table column {k} is not string or int/float") # Add one to the end index since slicing needs one extra value data[k] = list(u) + list(map(formatter.format, d[self.view_range_indices[0]: self.view_range_indices[1] + 1])) self.bokeh_data_table.columns = list(self.table_columns.values()) = data
[docs] def reset_columns(self): """ This function uses the stored time series and sets all columns but the time column to nothing """ if len(self.ts_dict): aligned_time, aligned_data = merge_convert_ts_vectors_to_numpy(ts_vectors=list(self.ts_dict.values())) self.table_columns = {"Time": TableColumn(field='Time', title='Time', width=self.time_column_width)} = {"Time": [self.time_formatter(aligned_time[:-1], self.parent.time_zone or None)]} self.view_range_callback()
[docs] def reset_data_source(self): """ This function clears the data that bokeh should handle """ = {k: [] for k in [f"Time"]}
[docs] def view_range_callback(self) -> None: """ This callback is triggered whenever the view range changes view_range = self.view_time_axis.view_range """ self.estimate_view_range_indices() #sih: if len( > 1: self.update_data_source()
[docs] def estimate_view_range_indices(self) -> None: """ Indices of aligned time, such that i0 <= view range <= i1. If the overlap of view range and aligned time is zero, no indices are set for `view_range_indices` (empty list). Examples:: 1) aligned_time: |t_a0 |t_a1 |t_a2 |t_a3 view range: |---------------| # self.view_range_indices = [0, 2] 2) aligned_time: |t_a0 |t_a1 |t_a2 |t_a3 view range: |---------------| # self.view_range_indices = [2, 3] 3) aligned_time: |t_a0 |t_a1 |t_a2 |t_a3 view range: |---------| # self.view_range_indices = [] """ if self.aligned_time is None: return elif len(self.aligned_time) == 0: self.view_range_indices = [] else: start = self.view_time_axis.view_range.start end = self.view_time_axis.view_range.end if end <= self.aligned_time[0] or start >= self.aligned_time[-1]: # view out of data range self.view_range_indices = [] else: start_index = find_nearest(self.aligned_time, start, smaller_equal=True) end_index = find_nearest(self.aligned_time, end, smaller_equal=False) self.view_range_indices = [start_index, end_index]
[docs] def clear(self) -> None: """ This function removes all views from the view_container and resets the meta data """ self.clear_views() self.aligned_time = {} = {} self.view_range_indices = []
[docs] def clear_views(self, *, specific_views: Optional[List[TableView]] = None) -> None: """ This function removes all or specific views from the view container """ if specific_views: for v in specific_views: v.remove_all_callbacks(obj=self) if v in self.ts_dict: self.ts_dict.pop(v) self.views = [v for v in self.views if v not in specific_views] else: for v in self.views: v.remove_all_callbacks(obj=self) self.ts_dict = {} self.views = [] if len(self.ts_dict): self.update_stored_view_data() else: self.reset_columns()
[docs] def update_title(self, title: str) -> None: """ This function sets the title in the correct <div> format """ if self.title: self.bokeh_title_div.text = ': '.join([self.title, title]) else: self.bokeh_title_div.text = title
def _receive_state(self, state: States) -> None: """ This function checks the state of self """ if state == self._state: return self._state = state if state == States.LOADING: if self._state == state: return self.update_title('Loading table ...') elif state == States.DEACTIVE: if self.visible: self.reset_data_source() self._visible_state = self.visible self.visible = False self.state_port.send_state(state) elif state in [States.ACTIVE, States.READY]: self.visible = self._visible_state else: self.logger.error(f"ERROR: {self} - not handel for received state {state} implemented") self.state_port.send_state(state)
[docs] def visible_callback(self, obj, attr, old_value, new_value): """ This function is the callback for when the visibility for table view changes """ if self._state == States.DEACTIVE or not self.visible_callback_enabled: return if obj not in self.views: obj.remove_all_callbacks(self) return self.update_stored_view_data()
[docs] def add_tool(self, tool: TableTool) -> None: """ This function adds a FigureTool to the figure """ if not isinstance(tool, TableTool): raise TableError(f'Table {self.title}: tool {tool} not of type TableTool') if tool not in tool.bind(parent=self) connect_state_ports(self.state_port, tool.state_port)
[docs] class StatisticsTable(Table):
[docs] def __init__(self, *, viewer: 'shyft.dashboard.time_series.ts_viewer.TsViewer', width: int = 600, height: int = 600, title: str = '', name_column_width: int = 300, max_column_width: Optional[int] = None, min_column_width: int = 120, visible: bool = True, time_formatter: Callable[[np.ndarray, str], List[str]] = basic_time_formatter, tools: Union[List['TableTool'], 'TableTool'] = None, logger: Optional['logging.Logger'] = None, alternative_view_time_axis: BaseViewTimeAxis = None) -> None: super().__init__(viewer=viewer, width=width, height=height, title=title, max_column_width=max_column_width, min_column_width=min_column_width, visible=visible, time_formatter=time_formatter, tools=tools, logger=logger, alternative_view_time_axis=alternative_view_time_axis) def calc_column_width(title): column_width = max(int(len(title)*7), self.min_column_width) if self.max_column_width is not None: column_width = min(column_width, self.max_column_width) return column_width self.name_column_width = name_column_width self.table_columns = {"Name": TableColumn(field='Name', title='Name', width=self.name_column_width), "Unit": TableColumn(field='Unit', title='Unit', width=calc_column_width('Unit')), 'Mean': TableColumn(field='Mean', title='Mean', width=calc_column_width('Mean')), 'Min': TableColumn(field='Min', title='Min', width=calc_column_width('Min')), 'Max': TableColumn(field='Max', title='Max', width=calc_column_width('Max')), 'Std': TableColumn(field='Std', title='Std', width=calc_column_width('Std')), 'Net Change': TableColumn(field='Net Change', title='Net Change', width=calc_column_width('Net Change')) }
[docs] def prepare_data_and_update_data_source(self, needs_data_source_update: bool = True): """ This function prepares all the view data and creates the bokeh data and table columns that should be updated and updates the data source """ if not self.ts_dict: return views_visible = {view: tsv for view, tsv in self.ts_dict.items() if view.visible} self.aligned_time, aligned_data = merge_convert_ts_vectors_to_numpy(ts_vectors=list(views_visible.values())) = {} self.unit_row = {} if len(aligned_data) != 0: for ts_number, view in enumerate(views_visible.keys()): for column_index, column_name in view.columns.items(): if view.label in column_name: title = f"{column_name}" else: title = f"{view.label} - {column_name}" if column_name.strip() else f"{view.label}" if not aligned_data: self.unit_row[title] = [][title] = [] elif not aligned_data[ts_number]: self.unit_row[title] = [str(view.unit)][title] = np.full(len(self.aligned_time)-1, np.nan) else: self.unit_row[title] = [str(view.unit)][title] = aligned_data[ts_number][column_index] if needs_data_source_update: self.update_data_source()
[docs] def update_data_source(self): """ This function updates the data that is to be shown """ self.estimate_view_range_indices() if not self.view_range_indices: self.reset_data_source() return data = {k: [] for k in self.table_columns.keys()} t = self.aligned_time[self.view_range_indices[0]: self.view_range_indices[1] + 1] data["Name"].append('Time Range') data["Unit"].append('-') data['Min'].append(basic_time_formatter([t[0]], self.parent.time_zone or None)[0]) data['Max'].append(basic_time_formatter([t[-1]], self.parent.time_zone or None)[0]) data['Mean'].append('-') data['Std'].append('-') data['Net Change'].append(tdiff_to_str(Calendar(self.parent.time_zone), t[0], t[-1])) for (k, d), u in zip(, self.unit_row.values()): d = d[self.view_range_indices[0]: self.view_range_indices[1] + 1] if len(d) == 0 or np.isnan(d).all(): min_v = '-' max_v = '-' mean_v = '-' std_v = '-' change = '-' elif isinstance(d[0], float) or isinstance(d[0], int): min_v = f"{np.nanmin(d):4.2f}" max_v = f"{np.nanmax(d):4.2f}" mean_v = f"{np.nanmean(d):4.2f}" std_v = f"{np.nanstd(d):4.2f}" try: nans = np.where(~np.isnan(d)) if np.shape(nans)[1] > 1: s = np.polyfit(x=t[nans], y=d[nans], deg=1) s = s[0] change = f"{s*(t[-1] - t[0]):4.2f}" else: change = '-' except Exception as e: self.logger.error(f"error: {e}") change = '-' elif isinstance(d[0], str): min_v = '-' max_v = '-' mean_v = '-' std_v = '-' change = '-' else: raise TableError(f"{self}: data type {d[0]} in table column {k} is not string or int/float") data["Name"].append(k) data["Unit"].append(u) data['Min'].append(min_v) data['Max'].append(max_v) data['Mean'].append(mean_v) data['Std'].append(std_v) data['Net Change'].append(change) self.bokeh_data_table.columns = list(self.table_columns.values()) = data
[docs] def reset_columns(self): """ This function uses the stored time series and sets all columns but the time column to nothing """ if len(self.ts_dict): = {} self.view_range_callback()
[docs] def view_range_callback(self) -> None: """ This callback is triggered whenever the view range changes view_range = self.view_time_axis.view_range """ self.estimate_view_range_indices() if len( > 0: self.update_data_source()
