Source code for shyft.dashboard.apps.dtss_viewer.dtsc_helper_functions

from typing import List, Tuple
import shyft.time_series as sa
from shyft.dashboard.time_series.state import Quantity, State, Unit
from shyft.dashboard.time_series.sources.ts_adapter import TsAdapter


[docs] def check_dtss_url(host_port: str) -> bool: """ This function evaluates if the url belogns to a running dts server Parameters ---------- host_port: url of dtss like localhost:20000 Returns ------- True if dtss reachable with given url, False if not """ dtsc = sa.DtsClient(host_port) try: dtsc.reopen() except RuntimeError as e: return False finally: dtsc.close() return True
[docs] def try_dtss_connection(host_port: str) -> bool: """ This function evaluates if the url belogs to a running dts server Parameters ---------- host_port: url of dtss like localhost:20000 Returns ------- True if dtss reachable with given url Raises ------ RuntimeError if no dtss can be found under given url """ dtsc = sa.DtsClient(host_port) try: dtsc.reopen() finally: dtsc.close() return True
[docs] def detect_unit_of(url: str) -> str: """ Just a helper to illustrate multiple axis based on unit """ if 'charge' in url: return 'm**3/s' if 'emperature' in url: return 'degC' if 'recip' in url: return 'mm/h' if 'speed' in url: return 'm/s' if 'swe' in url: return 'mm' return ''
[docs] def find_all_ts_names_and_url(*, host_port: str, container: str, pattern: str) -> List[Tuple[str, str]]: """ This function returns a list of time series names and urls for each Time Series in the container of the dtss at the ts_url. Parameters ---------- host_port: host_port to dtss container: dtss data container to search in pattern: what time-series to match, regular expression Returns ------- List of Tuple with (ts_url, name) for each ts in the container Raises ------ RuntimeError if no dtss can be found under given url """ dtsc = sa.DtsClient(host_port) try: ts_infos = dtsc.find(sa.shyft_url(container, pattern)) finally: dtsc.close() return [(sa.shyft_url(container, ti.name), f'{ti.name}:{ti.data_period}/{ti.delta_t}') for ti in ts_infos]
[docs] class DtssTsAdapter(TsAdapter): """ A very primitive synchronous dtss adapter to keep it simple """
[docs] def __init__(self, dtss_url: str, ts_url: str, unit: str = "") -> None: super().__init__() self.unit = unit if unit else detect_unit_of(dtss_url) self.dtsc = sa.DtsClient(dtss_url) self.tsv_request = sa.TsVector([sa.TimeSeries(ts_url)])
[docs] def __call__(self, *, time_axis: sa.TimeAxis, unit: Unit) -> Quantity[sa.TsVector]: try: tsv = self.dtsc.evaluate(self.tsv_request.average(time_axis), time_axis.total_period()) except RuntimeError as e: return sa.TsVector() finally: self.dtsc.close() # TODO compute from src unit to unit using pint. return State.Quantity(tsv, self.unit)