Source code for papi.gui.gui_api

# -*- coding: utf-8 -*-

Copyright (C) 2014 Technische Universität Berlin,
Fakultät IV - Elektrotechnik und Informatik,
Fachgebiet Regelungssysteme,
Einsteinufer 17, D-10587 Berlin, Germany

This file is part of PaPI.

PaPI is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

PaPI is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with PaPI.  If not, see <>.

Stefan Ruppin

import datetime
import time
import traceback
import os
import json

import papi.event as Event

from import DOptionalData
from papi.ConsoleLog import ConsoleLog

from PyQt5 import QtCore

from import DSignal
from import DBlock

import papi.error_codes as ERROR

import xml.etree.cElementTree as ET

[docs]class Gui_api(QtCore.QObject): error_occured = QtCore.pyqtSignal(str, str, str) def __init__(self, gui_data, core_queue, gui_id, get_gui_config_function = None, set_gui_config_function = None, TabManager = None, plugin_manager = None): super(Gui_api, self).__init__() self.gui_id = gui_id self.gui_data = gui_data self.core_queue = core_queue self.log = ConsoleLog(GUI_PROCESS_CONSOLE_LOG_LEVEL, GUI_PROCESS_CONSOLE_IDENTIFIER) self.get_gui_config_function = get_gui_config_function self.set_gui_config_function = set_gui_config_function self.tabManager = TabManager self.pluginManager = plugin_manager
[docs] def do_create_plugin(self, plugin_identifier, uname, config={}, autostart=True): """ Something like a callback function for gui triggered events e.a. when a user wants to create a new plugin. :param plugin_identifier: plugin to create :type plugin_identifier: basestring :param uname: uniqe name to set for new plugin :type uname: basestring :param config: additional configuration for creation :type config: :return: """ # create new optional Data for event opt = DOptionalData() # set important information # plugin to create opt.plugin_identifier = plugin_identifier # uname to create plugin with opt.plugin_uname = uname # additional config opt.plugin_config = config opt.autostart = autostart # check if plugin with uname already exists allPlugins = self.gui_data.get_all_plugins() for pluginID in allPlugins: plugin = allPlugins[pluginID] if plugin.uname == uname: return False # create event object and sent it to core event = Event.instruction.CreatePlugin(self.gui_id, 0, opt) self.core_queue.put(event)
[docs] def do_delete_plugin(self, id): """ Delete plugin with given id. :param id: Plugin id to delete :type id: int :return: """ event = Event.instruction.StopPlugin(self.gui_id, id, None) self.core_queue.put(event)
[docs] def do_delete_plugin_uname(self, uname): """ Delete plugin with given uname. :param uname: Plugin uname to delete :type uname: basestring :return: """ event =Event.instruction.StopPluginByUname(self.gui_id, uname) self.core_queue.put(event)
[docs] def do_edit_plugin(self, pl_id, eObject, changeRequest): """ Edit plugin with given plugin id. Specify attribute of plugin by eObject which should be edited e.g. DBlock. Specify action by changeRequest e.g. {'edit' : DSignal}. Currently only possible to change a DSignal for a given dplugin and dblock. :param pl_id: Plugin id to delete :type pl_id: int :return: """ event =, pl_id, eObject, changeRequest) self.core_queue.put(event)
[docs] def do_edit_plugin_uname(self, uname, eObject, changeRequest): """ Edit plugin with given plugin uname. Specify attribute of plugin by eObject which should be edited e.g. DBlock. Specify action by changeRequest e.g. {'edit' : DSignal}. Currently only possible to change a DSignal for a given dplugin and dblock. :param uname: :param eObject: :param changeRequest: :return: """ event =, uname, eObject, changeRequest) self.core_queue.put(event)
[docs] def do_stopReset_pluign(self, id): """ Stop and reset plugin with given id without deleting it. :param id: Plugin id to stopReset :type id: int :return: """ event = Event.instruction.StopPlugin(self.gui_id, id, None, delete=False) self.core_queue.put(event)
[docs] def do_stopReset_plugin_uname(self, uname): """ Stop and reset plugin with given uname without deleting it. :param uname: Plugin uname to stop :type uname: basestring :return: """ pl_id = self.do_get_plugin_id_from_uname(uname) if pl_id is not None: self.do_stopReset_pluign(pl_id) else: self.log.printText(1, " Do stopReset plugin with uname " + uname + ' failed') return ERROR.NOT_EXISTING
[docs] def do_start_plugin(self, id): """ Start plugin with given id. :param id: Plugin id to start :type id: int :return: """ event = Event.instruction.StartPlugin(self.gui_id, id, None) self.core_queue.put(event)
[docs] def do_start_plugin_uname(self, uname): """ Start plugin with given uname. :param uname: Plugin uname to start :type uname: basestring :return: """ pl_id = self.do_get_plugin_id_from_uname(uname) if pl_id is not None: self.do_start_plugin(pl_id) else: self.log.printText(1, " Do start_plugin with uname " + uname + ' failed') return ERROR.NOT_EXISTING
[docs] def do_subscribe(self, subscriber_id, source_id, block_name, signals=None, sub_alias=None): """ Something like a callback function for gui triggered events. In this case, user wants one plugin to subscribe another :param subscriber_id: Plugin id of plugin which should get the data :type subscriber_id: int :param source_id: plugin uname of plugin that should send the data :type source_id: int :param block_name: name of block to subscribe :type block_name: basestring :return: """ # build optional data object and add id and block name to it opt = DOptionalData() opt.source_ID = source_id opt.block_name = block_name opt.signals = signals opt.subscription_alias = sub_alias # send event with subscriber id as the origin to CORE event = Event.instruction.Subscribe(subscriber_id, 0, opt) self.core_queue.put(event)
[docs] def do_subscribe_uname(self, subscriber_uname, source_uname, block_name, signals=None, sub_alias=None): """ Something like a callback function for gui triggered events. In this case, user wants one plugin to subscribe another :param subscriber_uname: Plugin uname of plugin which should get the data :type subscriber_uname: basestring :param source_uname: plugin uname of plugin that should send the data :type source_uname: basestring :param block_name: name of block to subscribe :type block_name: basestring :return: """ event = Event.instruction.SubscribeByUname(self.gui_id, 0, subscriber_uname, source_uname, block_name, signals=signals, sub_alias= sub_alias) self.core_queue.put(event)
[docs] def do_unsubscribe(self, subscriber_id, source_id, block_name, signal_index=None): """ Something like a callback function for gui triggered events. User wants one plugin to do not get any more data from another plugin :param subscriber_id: plugin id which wants to lose a data source :type subscriber_id: int :param source_id: plugin id of data source :type source_id: int :param block_name: name of block to unsubscribe :type block_name: basestring :return: """ # create optional data with source id and block_name opt = DOptionalData() opt.source_ID = source_id opt.block_name = block_name opt.signals = signal_index # sent event to Core with origin subscriber_id event = Event.instruction.Unsubscribe(subscriber_id, 0, opt) self.core_queue.put(event)
[docs] def do_unsubscribe_uname(self, subscriber_uname, source_uname, block_name, signal_index=None): """ Something like a callback function for gui triggered events. User wants one plugin to do not get any more data from another plugin :param subscriber_uname: plugin uname which wants to lose a data source :type subscriber_uname: basestring :param source_uname: plugin uname of data source :type source_uname: basestring :param block_name: name of block to unsubscribe :type block_name: basestring :return: """ subscriber_id = self.do_get_plugin_id_from_uname(subscriber_uname) if subscriber_id is None: # plugin with uname does not exist self.log.printText(1, 'do_unsubscribe, sub uname worng') return -1 source_id = self.do_get_plugin_id_from_uname(source_uname) if source_id is None: # plugin with uname does not exist self.log.printText(1, 'do_unsubscribe, target uname wrong') return -1 # call do_subscribe with ids to subscribe self.do_unsubscribe(subscriber_id, source_id, block_name, signal_index)
[docs] def do_set_parameter(self, plugin_id, parameter_name, value, only_db_update = False): """ Something like a callback function for gui triggered events. User wants to change a parameter of a plugin :param plugin_id: id of plugin which owns the parameter :type plugin_id: int :param parameter_name: name of parameter to change :type parameter_name: basestring :param value: new parameter value to set :type value: :param only_db_update: do_set_parameter of the target plugin will not be called. Updates only the internal database. :type boolean: """ # get plugin from DGUI dplug = self.gui_data.get_dplugin_by_id(plugin_id) # check for existance if dplug is not None: # it exists # get its parameter list parameters = dplug.get_parameters() # check if there are any parameter if parameters is not None: # there is a parameter list # get the parameter with parameter_name if parameter_name in parameters: p = parameters[parameter_name] # check if this specific parameter exists if p is not None: # parameter with name parameter_name exists # build an event to send this information to Core opt = DOptionalData() = value opt.is_parameter = True opt.parameter_alias = parameter_name opt.block_name = None if only_db_update: e = Event.instruction.UpdateParameter(self.gui_id,, opt) else: e = Event.instruction.SetParameter(self.gui_id,, opt) self.core_queue.put(e)
[docs] def do_set_parameter_uname(self, plugin_uname, parameter_name, value): """ Something like a callback function for gui triggered events. User wants to change a parameter of a plugin :param plugin_uname: name of plugin which owns the parameter :type plugin_uname: basestring :param parameter_name: name of parameter to change :type parameter_name: basestring :param value: new parameter value to set :type value: """ # id = self.do_get_plugin_id_from_uname(plugin_uname) # if id is not None: # self.do_set_parameter(id, parameter_name, value) # print(parameter_name, value) opt = DOptionalData() = value opt.is_parameter = True opt.parameter_alias = parameter_name opt.block_name = None e = Event.instruction.SetParameterByUname(self.gui_id,plugin_uname, opt) self.core_queue.put(e)
[docs] def do_pause_plugin_by_id(self, plugin_id): """ Something like a callback function for gui triggered events. User wants to pause a plugin, so this method will send an event to core. :param plugin_id: id of plugin to pause :type plugin_id: int """ if self.gui_data.get_dplugin_by_id(plugin_id) is not None: opt = DOptionalData() event = Event.instruction.PausePlugin(self.gui_id, plugin_id, opt) self.core_queue.put(event) return 1 else: return -1
[docs] def do_pause_plugin_by_uname(self, plugin_uname): """ Something like a callback function for gui triggered events. User wants to pause a plugin, so this method will send an event to core. :param plugin_uname: uname of plugin to pause :type plugin_uname: basestring """ plugin_id = self.do_get_plugin_id_from_uname(plugin_uname) if plugin_id is not None: return self.do_pause_plugin_by_id(plugin_id) else: # plugin with uname does not exist self.log.printText(1, 'do_pause, plugin uname worng') return -1
[docs] def do_resume_plugin_by_id(self, plugin_id): """ Something like a callback function for gui triggered events. User wants to pause a plugin, so this method will send an event to core. :param plugin_id: id of plugin to pause :type plugin_id: int """ if self.gui_data.get_dplugin_by_id(plugin_id) is not None: opt = DOptionalData() event = Event.instruction.ResumePlugin(self.gui_id, plugin_id, opt) self.core_queue.put(event) return 1 else: return -1
[docs] def do_resume_plugin_by_uname(self, plugin_uname): """ Something like a callback function for gui triggered events. User wants to resume a plugin, so this method will send an event to core. :param plugin_uname: uname of plugin to resume :type plugin_uname: basestring """ plugin_id = self.do_get_plugin_id_from_uname(plugin_uname) if plugin_id is not None: return self.do_resume_plugin_by_id(plugin_id) else: # plugin with uname does not exist self.log.printText(1, 'do_resume, plugin uname worng') return -1
[docs] def do_get_plugin_id_from_uname(self, uname): """ Returns the plugin id of the plugin with unique name uname :param uname: uname of plugin :type uname: basestring :return: None: plugin with uname does not exist, id: id of plugin """ dplugin = self.gui_data.get_dplugin_by_uname(uname) # check for existance if dplugin is not None: # it does exist, so get its id return else: return None
[docs] def do_close_program(self): """ Tell core to close papi. Core will respond and will close all open plugins. GUI will close all VIP Plugins due to calling their quit function """ plugins = self.gui_data.get_all_plugins() for dplugin_id in plugins: dplugin = plugins[dplugin_id] if dplugin.type == PLUGIN_VIP_IDENTIFIER: try: dplugin.plugin.cb_quit() except Exception as E: tb = traceback.format_exc() self.plugin_died.emit(dplugin, E, tb) opt = DOptionalData() opt.reason = 'User clicked close Button' event = Event.instruction.CloseProgram(self.gui_id, 0, opt) self.core_queue.put(event)
[docs] def do_set_tab_active_by_name(self, tabName): self.tabManager.set_tab_active_by_name(tabName)
[docs] def do_open_new_tabs_with_names_in_order(self, tabNames = None): for name in tabNames: self.tabManager.add_tab(name)
[docs] def do_load_xml(self, path): if path is None or not os.path.isfile(path): return False tree = ET.parse(path) root = tree.getroot() if root.tag == 'PaPI': self.do_load_xml_reloaded(root) else: self.do_load_xml_v1(root)
[docs] def do_load_xml_reloaded(self,root): """ Function to load a xml config to papi and apply the configuration. :param path: path to xml file to load. :type path: basestring :return: """ gui_config = {} plugins_to_start = [] parameters_to_change = [] signals_to_change = [] subs_to_make = [] try: for root_element in root: ########################## # Read gui configuration # ########################## if root_element.tag == 'Configuration': for property in root_element: gui_config[property.tag] = {} for attr in property: if len(attr) == 0: gui_config[property.tag][attr.tag] = attr.text else: gui_config[property.tag][attr.tag] = {} for val in attr: gui_config[property.tag][attr.tag][val.tag] = val.text if root_element.tag == 'Plugins': ############################# # Read plugin configuration # ############################# for plugin_xml in root_element: plObj = {} plObj['uname'] = self.change_uname_to_uniqe(plugin_xml.attrib['uname']) plObj['identifier'] = plugin_xml.find('Identifier').text config_xml = plugin_xml.find('StartConfig') config_hash = {} for parameter_xml in config_xml.findall('Parameter'): para_name = parameter_xml.attrib['Name'] config_hash[para_name] = {} for detail_xml in parameter_xml: detail_name = detail_xml.tag config_hash[para_name][detail_name] = detail_xml.text plObj['cfg'] = config_hash plugins_to_start.append(plObj) # -------------------------------- # Load PreviousParameters # -------------------------------- prev_parameters_xml = plugin_xml.find('PreviousParameters') if prev_parameters_xml is not None: for prev_parameter_xml in prev_parameters_xml.findall('Parameter'): para_name = prev_parameter_xml.attrib['Name'] para_value = prev_parameter_xml.text # pl_uname_new = self.change_uname_to_uniqe(pl_uname) # TODO validate NO FLOAT in parameter parameters_to_change.append([plObj['uname'], para_name, para_value]) # -------------------------------- # Load DBlocks due to signals name # -------------------------------- dblocks_xml = plugin_xml.find('DBlocks') if dblocks_xml is not None: for dblock_xml in dblocks_xml: dblock_name = dblock_xml.attrib['Name'] dsignals_xml = dblock_xml.findall('DSignal') for dsignal_xml in dsignals_xml: dsignal_uname = dsignal_xml.attrib['uname'] dsignal_dname = dsignal_xml.find('dname').text signals_to_change.append([plObj['uname'], dblock_name, dsignal_uname, dsignal_dname]) if root_element.tag == 'Subscriptions': for sub_xml in root_element: #TODO: Ask stefan: Why this line? #dest = self.change_uname_to_uniqe(sub_xml.find('Destination').text) dest = sub_xml.find('Destination').text for source in sub_xml: if source.tag == 'Source': sourceName = source.attrib['uname'] #self.change_uname_to_uniqe(source.attrib['uname']) for block_xml in source: blockName = block_xml.attrib['name'] alias = block_xml.find('Alias').text signals_xml = block_xml.find('Signals') signals = [] for sig_xml in signals_xml: signals.append(sig_xml.text) subs_to_make.append({'dest':dest, 'source':sourceName, 'block':blockName, 'alias':alias, 'signals':signals}) self.set_gui_config_function(gui_config) except Exception as E: tb = traceback.format_exc() self.error_occured.emit("Error: Config Loader", "Not loadable", tb) # ----------------------------------------------- # Check: Are there unloadable plugins? # ----------------------------------------------- unloadable_plugins = [] for pl in plugins_to_start: plugin_info = self.pluginManager.getPluginByName(pl['identifier']) if plugin_info is None: if pl['identifier'] not in unloadable_plugins: unloadable_plugins.append(pl['identifier']) if not len(unloadable_plugins): for pl in plugins_to_start: self.do_create_plugin(pl['identifier'], pl['uname'], pl['cfg']) self.config_loader_subs_reloaded(plugins_to_start, subs_to_make, parameters_to_change, signals_to_change) else: self.error_occured.emit("Error: Loading Plugins", "Can't use: " + str(unloadable_plugins) + "\nConfiguration will not be used.", None)
[docs] def config_loader_subs_reloaded(self, pl_to_start, subs_to_make, parameters_to_change, signals_to_change): """ Function for callback when timer finished to apply subscriptions and parameter changed of config. :param pl_to_start: list of plugins to start :type pl_to_start: list :param subs_to_make: list of subscriptions to make :type subs_to_make: list :param parameters_to_change: parameter changes to apply :type parameters_to_change: list :param signals_to_change: signal name changes to apply :type signals_to_change: list :return: """ for sub in subs_to_make: self.do_subscribe_uname(sub['dest'], sub['source'], sub['block'], sub['signals'], sub['alias']) for para in parameters_to_change: self.do_set_parameter_uname(para[0], para[1], para[2]) for sig in signals_to_change: plugin_uname = sig[0] dblock_name = sig[1] dsignal_uname = sig[2] dsignal_dname = sig[3] self.do_edit_plugin_uname(plugin_uname, DBlock(dblock_name),{'edit': DSignal(dsignal_uname, dsignal_dname)})
[docs] def do_load_xml_v1(self, root): """ Function to load a xml config to papi and apply the configuration. :param path: path to xml file to load. :type path: basestring :return: """ plugins_to_start = [] subs_to_make = [] parameters_to_change = [] signals_to_change = [] try: for plugin_xml in root: ########################## # Read gui configuration # ########################## if plugin_xml.tag == 'guiConfig': cfg = {} for property in plugin_xml: cfg[property.tag] = {} for attr in property: cfg[property.tag][attr.tag] = {} for value in attr: cfg[property.tag][attr.tag][value.tag] = value.text self.set_gui_config_function(cfg) ############################# # Read plugin configuration # ############################# if plugin_xml.tag == 'Plugin': pl_uname = plugin_xml.attrib['uname'] identifier = plugin_xml.find('Identifier').text config_xml = plugin_xml.find('StartConfig') config_hash = {} for parameter_xml in config_xml.findall('Parameter'): para_name = parameter_xml.attrib['Name'] config_hash[para_name] = {} for detail_xml in parameter_xml: detail_name = detail_xml.tag config_hash[para_name][detail_name] = detail_xml.text pl_uname_new = self.change_uname_to_uniqe(pl_uname) plugins_to_start.append([identifier, pl_uname_new, config_hash]) # -------------------------------- # Load Subscriptions # -------------------------------- subs_xml = plugin_xml.find('Subscriptions') if subs_xml is not None: for sub_xml in subs_xml.findall('Subscription'): data_source = sub_xml.find('data_source').text for block_xml in sub_xml.findall('block'): block_name = block_xml.attrib['Name'] signals = [] for sig_xml in block_xml.findall('Signal'): signals.append(str(sig_xml.text)) alias_xml = block_xml.find('alias') alias = alias_xml.text pl_uname_new = self.change_uname_to_uniqe(pl_uname) data_source_new = data_source #self.change_uname_to_uniqe(data_source) subs_to_make.append([pl_uname_new, data_source_new, block_name, signals, alias]) # -------------------------------- # Load PreviousParameters # -------------------------------- prev_parameters_xml = plugin_xml.find('PreviousParameters') if prev_parameters_xml is not None: for prev_parameter_xml in prev_parameters_xml.findall('Parameter'): para_name = prev_parameter_xml.attrib['Name'] para_value = prev_parameter_xml.text pl_uname_new = self.change_uname_to_uniqe(pl_uname) # TODO validate NO FLOAT in parameter parameters_to_change.append([pl_uname_new, para_name, para_value]) # -------------------------------- # Load DBlocks due to signals name # -------------------------------- dblocks_xml = plugin_xml.find('DBlocks') if dblocks_xml is not None: for dblock_xml in dblocks_xml: dblock_name = dblock_xml.attrib['Name'] dsignals_xml = dblock_xml.findall('DSignal') for dsignal_xml in dsignals_xml: dsignal_uname = dsignal_xml.attrib['uname'] dsignal_dname = dsignal_xml.find('dname').text signals_to_change.append([pl_uname, dblock_name, dsignal_uname, dsignal_dname]) except Exception as E: tb = traceback.format_exc() self.error_occured.emit("Error: Config Loader", "Not loadable", tb) # ----------------------------------------------- # Check: Are there unloadable plugins? # ----------------------------------------------- unloadable_plugins = [] for pl in plugins_to_start: plugin_info = self.pluginManager.getPluginByName(pl[0]) if plugin_info is None: if pl[0] not in unloadable_plugins: unloadable_plugins.append(pl[0]) if not len(unloadable_plugins): for pl in plugins_to_start: # 0: ident, 1: uname, 2: config self.do_create_plugin(pl[0], pl[1], pl[2]) # QtCore.QTimer.singleShot(CONFIG_LOADER_SUBSCRIBE_DELAY, \ # lambda: self.config_loader_subs(plugins_to_start, subs_to_make, \ # parameters_to_change, signals_to_change)) self.config_loader_subs(plugins_to_start, subs_to_make, parameters_to_change, signals_to_change) else: self.error_occured.emit("Error: Loading Plugins", "Can't use: " + str(unloadable_plugins) + "\nConfiguration will not be used.", None)
[docs] def change_uname_to_uniqe(self, uname): """ Function will search for unames and add an indentifier to it to make it unique in case of existence :param uname: uname to make unique :type uname: basestring :return: uname """ i = 1 while self.gui_data.get_dplugin_by_uname(uname) is not None: i = i + 1 if i == 2: uname = uname + 'X' + str(i) else: uname = uname[:-1] + str(i) return uname
[docs] def config_loader_subs(self, pl_to_start, subs_to_make, parameters_to_change, signals_to_change): """ Function for callback when timer finished to apply subscriptions and parameter changed of config. :param pl_to_start: list of plugins to start :type pl_to_start: list :param subs_to_make: list of subscriptions to make :type subs_to_make: list :param parameters_to_change: parameter changes to apply :type parameters_to_change: list :param signals_to_change: signal name changes to apply :type signals_to_change: list :return: """ for sub in subs_to_make: self.do_subscribe_uname(sub[0], sub[1], sub[2], sub[3], sub[4]) for para in parameters_to_change: self.do_set_parameter_uname(para[0], para[1], para[2]) for sig in signals_to_change: plugin_uname = sig[0] dblock_name = sig[1] dsignal_uname = sig[2] dsignal_dname = sig[3] self.do_edit_plugin_uname(plugin_uname, DBlock(dblock_name), {'edit': DSignal(dsignal_uname, dsignal_dname)})
[docs] def do_save_xml_config_reloaded(self,path, plToSave=[], sToSave=[], saveUserSettings=False): """ :param path: :param plToSave: :param sToSave: :return: """ subscriptionsToSave = {} # check for xml extension in path, add .xml if missing if path[-4:] != '.xml': path += '.xml' try: root = ET.Element(CONFIG_ROOT_ELEMENT_NAME_RELOADED) root.set('Date', datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')) root.set('PaPI_version', CORE_PAPI_VERSION) ########################## # Save gui configuration # ########################## gui_cfg = self.get_gui_config_function(save_user_settings=saveUserSettings) gui_cfg_xml = ET.SubElement(root, 'Configuration') for cfg_item in gui_cfg: item_xml = ET.SubElement(gui_cfg_xml,cfg_item) item = gui_cfg[cfg_item] for attr_name in item: attr_xml = ET.SubElement(item_xml, attr_name) values = item[attr_name] # check if there is another dict level to explore # if true: exlplore the dict # if false: save the value of values in the parent xml node if isinstance(values,dict): for val in values: value_xml = ET.SubElement(attr_xml, val) value_xml.text = values[val] else: attr_xml.text = values # --------------------------------------- # save information of plugins # for the next start # --------------------------------------- plugins_xml = ET.SubElement(root,'Plugins') plugins = self.gui_data.get_all_plugins() for dplugin_id in plugins: dplugin = plugins[dplugin_id] # check if this plugin should be saved to XML if dplugin.uname in plToSave: if dplugin.type == PLUGIN_VIP_IDENTIFIER: dplugin.startup_config = dplugin.plugin.pl_get_current_config() pl_xml = ET.SubElement(plugins_xml, 'Plugin') pl_xml.set('uname', dplugin.uname) identifier_xml = ET.SubElement(pl_xml, 'Identifier') identifier_xml.text = dplugin.plugin_identifier # --------------------------------------- # Save all current config as startup config # for the next start # --------------------------------------- cfg_xml = ET.SubElement(pl_xml, 'StartConfig') for parameter in dplugin.startup_config: para_xml = ET.SubElement(cfg_xml, 'Parameter') para_xml.set('Name', parameter) for detail in dplugin.startup_config[parameter]: if detail not in CONFIG_SAVE_CFG_BLACKLIST: detail_xml = ET.SubElement(para_xml, detail) detail_xml.text = dplugin.startup_config[parameter][detail] # --------------------------------------- # Save all current values for all # parameter # --------------------------------------- last_paras_xml = ET.SubElement(pl_xml, 'PreviousParameters') allparas = dplugin.get_parameters() for para_key in allparas: para = allparas[para_key] last_para_xml = ET.SubElement(last_paras_xml, 'Parameter') last_para_xml.set('Name', para_key) last_para_xml.text = str(para.value) # --------------------------------------- # Save all current values for all # signals of all dblocks # --------------------------------------- dblocks_xml = ET.SubElement(pl_xml, 'DBlocks') alldblock_names = dplugin.get_dblocks() for dblock_name in alldblock_names: dblock = alldblock_names[dblock_name] dblock_xml = ET.SubElement(dblocks_xml, 'DBlock') dblock_xml.set('Name', alldsignals = dblock.get_signals() for dsignal in alldsignals: if dsignal.uname != CORE_TIME_SIGNAL: dsignal_xml = ET.SubElement(dblock_xml, 'DSignal') dsignal_xml.set('uname', dsignal.uname) dname_xml = ET.SubElement(dsignal_xml, 'dname') dname_xml.text = dsignal.dname # --------------------------------------- # Build temporary subscription objects # to remember the subs of all plugins # including plugins that are not saved # excluding plugins we do not want the subs saved of # --------------------------------------- if dplugin.uname in sToSave: subsOfPl = {} subs = dplugin.get_subscribtions() for sub in subs: sourcePL = self.gui_data.get_dplugin_by_id(sub).uname subsOfPl[sourcePL] = {} for block in subs[sub]: subsOfPl[sourcePL][block] = {} dsubscription = subs[sub][block] subsOfPl[sourcePL][block]['alias'] = dsubscription.alias signals = [] for s in dsubscription.get_signals(): signals.append( str(s)) subsOfPl[sourcePL][block]['signals'] = signals if len(subsOfPl) != 0: subscriptionsToSave[dplugin.uname] = subsOfPl # --------------------------------------- # save subs to xml # # --------------------------------------- subs_xml = ET.SubElement(root,'Subscriptions') for dest in subscriptionsToSave: sub_xml = ET.SubElement(subs_xml,'Subscription') # Destination of data dest_xml = ET.SubElement(sub_xml,'Destination') dest_xml.text = dest for source in subscriptionsToSave[dest]: # Source of Data source_xml = ET.SubElement(sub_xml,'Source') source_xml.set('uname',source) for block in subscriptionsToSave[dest][source]: block_xml = ET.SubElement(source_xml,'Block') block_xml.set('name',block) alias_xml = ET.SubElement(block_xml,'Alias') alias_xml.text = subscriptionsToSave[dest][source][block]['alias'] signal_xml = ET.SubElement(block_xml,'Signals') for sig in subscriptionsToSave[dest][source][block]['signals']: if sig != CORE_TIME_SIGNAL: sig_xml = ET.SubElement(signal_xml,'Signal') sig_xml.text = sig # do transformation for readability and save xml tree to file self.indent(root) tree = ET.ElementTree(root) tree.write(path) except Exception as E: tb = traceback.format_exc() self.error_occured.emit("Error: Config Loader", "Not saveable: " + path, tb)
[docs] def do_save_xml_config(self, path): """ This function will save papis current state to a xml file provided by path. :param path: path to save xml to. :type path: basestring :return: """ raise Exception('do_save_xml_config_reloaded must be used')
[docs] def do_save_json_config_reloaded(self, path, plToSave=[], sToSave=[]): if path[-5:] != '.json': path += '.json' json_config = {} to_create = {} to_control = {} to_sub = {} plugins = self.gui_data.get_all_plugins() for dplugin_id in plugins: dplugin = plugins[dplugin_id] # check if this plugin should be saved to XML if dplugin.uname in plToSave: if dplugin.type == PLUGIN_VIP_IDENTIFIER: dplugin.startup_config = dplugin.plugin.pl_get_current_config() to_create[dplugin.uname] = {} to_create[dplugin.uname]["identifier"] = {'value' : dplugin.plugin_identifier} plugin_config = {} for config in dplugin.startup_config: for value in dplugin.startup_config[config]: if value == 'value': plugin_config[config] = {'value' : dplugin.startup_config[config][value]} to_create[dplugin.uname]["config"] = plugin_config if dplugin.uname in sToSave: subsOfPl = {} subs = dplugin.get_subscribtions() for sub in subs: sourcePL = self.gui_data.get_dplugin_by_id(sub).uname subsOfPl[sourcePL] = {} for block in subs[sub]: subsOfPl[sourcePL][block] = {} dsubscription = subs[sub][block] subsOfPl[sourcePL]['alias'] = dsubscription.alias print(sourcePL) print(block) if dsubscription.alias is not None: if sourcePL not in to_control: to_control[sourcePL] = {} to_control[sourcePL][block] = {'parameter' : dsubscription.alias} else: signals = [] for s in dsubscription.get_signals(): signals.append( str(s)) to_sub[dplugin.uname] = {} to_sub[dplugin.uname]['signals'] = signals to_sub[dplugin.uname]['block'] = block to_sub[dplugin.uname]['plugin'] = sourcePL if len(to_create): json_config["ToCreate"] = to_create; if len(to_control): json_config["ToControl"] = to_control if len(to_sub): json_config["ToSub"] = to_sub papi_config = {"PaPIConfig" : json_config} try: with open(path, 'w') as outfile: json.dump(papi_config, outfile, indent=' ') except: pass
[docs] def indent(self, elem, level=0): """ Function which will apply a nice looking indentiation to xml structure before save. Better readability. copied from 06.10.2014 15:53 :param elem: :param level: :return: """ i = "\n" + level * " " if len(elem): if not elem.text or not elem.text.strip(): elem.text = i + " " if not elem.tail or not elem.tail.strip(): elem.tail = i for elem in elem: self.indent(elem, level + 1) if not elem.tail or not elem.tail.strip(): elem.tail = i else: if level and (not elem.tail or not elem.tail.strip()): elem.tail = i
[docs] def do_reset_papi(self): """ APi call to reset PaPI. Reset in this case means to delete all plugins cleanly and keep PaPI running. Will free all unames. Is using the do_delete_plugin api call and the delete plugin mechanism :return: ERROR CODE """ all_plugins = self.gui_data.get_all_plugins() if all_plugins is not None: for plugin_key in all_plugins: plugin = all_plugins[plugin_key] self.do_delete_plugin(
[docs] def do_test_name_to_be_unique(self, name): """ Will check if a given name would be a valid, unique name for a plugin. :param name: name to check :type name: basestring :return: True or False """ reg = QtCore.QRegExp('\S[^_][^\W_]+') if reg.exactMatch(name): if self.gui_data.get_dplugin_by_uname(name) is None: return True else: return False else: return False
[docs] def do_change_string_to_be_uname(self, name): """ This method will take a string and convert him according to some rules to be an uname :param name: name to convert to unmae :type name: basestring :return: name converted to uname """ uname = name # TODO: get more inteligence here! forbidden = ['_', ',', '.', '`', ' '] for c in forbidden: uname = uname.replace(c, 'X') return uname
