#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Copyright (C) 2014 Technische Universität Berlin,
Fakultät IV - Elektrotechnik und Informatik,
Fachgebiet Regelungssysteme,
Einsteinufer 17, D-10587 Berlin, Germany
This file is part of PaPI.
PaPI is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PaPI is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PaPI. If not, see <http://www.gnu.org/licenses/>.
Contributors
Stefan Ruppin
"""
import datetime
import time
import traceback
import os
import json
import papi.event as Event
from papi.data.DOptionalData import DOptionalData
from papi.ConsoleLog import ConsoleLog
from papi.constants import GUI_PROCESS_CONSOLE_IDENTIFIER, GUI_PROCESS_CONSOLE_LOG_LEVEL, CONFIG_LOADER_SUBSCRIBE_DELAY, \
CONFIG_ROOT_ELEMENT_NAME, CORE_PAPI_VERSION, PLUGIN_VIP_IDENTIFIER, CONFIG_ROOT_ELEMENT_NAME_RELOADED, \
CONFIG_SAVE_CFG_BLACKLIST, CORE_TIME_SIGNAL
from PyQt5 import QtCore
from papi.data.DSignal import DSignal
from papi.data.DPlugin import DBlock
import papi.error_codes as ERROR
import xml.etree.cElementTree as ET
[docs]class Gui_api(QtCore.QObject):
error_occured = QtCore.pyqtSignal(str, str, str)
def __init__(self, gui_data, core_queue, gui_id, get_gui_config_function = None, set_gui_config_function = None, TabManager = None, plugin_manager = None):
super(Gui_api, self).__init__()
self.gui_id = gui_id
self.gui_data = gui_data
self.core_queue = core_queue
self.log = ConsoleLog(GUI_PROCESS_CONSOLE_LOG_LEVEL, GUI_PROCESS_CONSOLE_IDENTIFIER)
self.get_gui_config_function = get_gui_config_function
self.set_gui_config_function = set_gui_config_function
self.tabManager = TabManager
self.pluginManager = plugin_manager
[docs] def do_create_plugin(self, plugin_identifier, uname, config={}, autostart=True):
"""
Something like a callback function for gui triggered events e.a. when a user wants to create a new plugin.
:param plugin_identifier: plugin to create
:type plugin_identifier: basestring
:param uname: uniqe name to set for new plugin
:type uname: basestring
:param config: additional configuration for creation
:type config:
:return:
"""
# create new optional Data for event
opt = DOptionalData()
# set important information
# plugin to create
opt.plugin_identifier = plugin_identifier
# uname to create plugin with
opt.plugin_uname = uname
# additional config
opt.plugin_config = config
opt.autostart = autostart
# check if plugin with uname already exists
allPlugins = self.gui_data.get_all_plugins()
for pluginID in allPlugins:
plugin = allPlugins[pluginID]
if plugin.uname == uname:
return False
# create event object and sent it to core
event = Event.instruction.CreatePlugin(self.gui_id, 0, opt)
self.core_queue.put(event)
[docs] def do_delete_plugin(self, id):
"""
Delete plugin with given id.
:param id: Plugin id to delete
:type id: int
:return:
"""
event = Event.instruction.StopPlugin(self.gui_id, id, None)
self.core_queue.put(event)
[docs] def do_delete_plugin_uname(self, uname):
"""
Delete plugin with given uname.
:param uname: Plugin uname to delete
:type uname: basestring
:return:
"""
event =Event.instruction.StopPluginByUname(self.gui_id, uname)
self.core_queue.put(event)
[docs] def do_edit_plugin(self, pl_id, eObject, changeRequest):
"""
Edit plugin with given plugin id. Specify attribute of plugin by eObject which should
be edited e.g. DBlock.
Specify action by changeRequest e.g. {'edit' : DSignal}.
Currently only possible to change a DSignal for a given dplugin and dblock.
:param pl_id: Plugin id to delete
:type pl_id: int
:return:
"""
event = Event.data.EditDPlugin(self.gui_id, pl_id, eObject, changeRequest)
self.core_queue.put(event)
[docs] def do_edit_plugin_uname(self, uname, eObject, changeRequest):
"""
Edit plugin with given plugin uname. Specify attribute of plugin by eObject which should
be edited e.g. DBlock.
Specify action by changeRequest e.g. {'edit' : DSignal}.
Currently only possible to change a DSignal for a given dplugin and dblock.
:param uname:
:param eObject:
:param changeRequest:
:return:
"""
event = Event.data.EditDPluginByUname(self.gui_id, uname, eObject, changeRequest)
self.core_queue.put(event)
[docs] def do_stopReset_pluign(self, id):
"""
Stop and reset plugin with given id without deleting it.
:param id: Plugin id to stopReset
:type id: int
:return:
"""
event = Event.instruction.StopPlugin(self.gui_id, id, None, delete=False)
self.core_queue.put(event)
[docs] def do_stopReset_plugin_uname(self, uname):
"""
Stop and reset plugin with given uname without deleting it.
:param uname: Plugin uname to stop
:type uname: basestring
:return:
"""
pl_id = self.do_get_plugin_id_from_uname(uname)
if pl_id is not None:
self.do_stopReset_pluign(pl_id)
else:
self.log.printText(1, " Do stopReset plugin with uname " + uname + ' failed')
return ERROR.NOT_EXISTING
[docs] def do_start_plugin(self, id):
"""
Start plugin with given id.
:param id: Plugin id to start
:type id: int
:return:
"""
event = Event.instruction.StartPlugin(self.gui_id, id, None)
self.core_queue.put(event)
[docs] def do_start_plugin_uname(self, uname):
"""
Start plugin with given uname.
:param uname: Plugin uname to start
:type uname: basestring
:return:
"""
pl_id = self.do_get_plugin_id_from_uname(uname)
if pl_id is not None:
self.do_start_plugin(pl_id)
else:
self.log.printText(1, " Do start_plugin with uname " + uname + ' failed')
return ERROR.NOT_EXISTING
[docs] def do_subscribe(self, subscriber_id, source_id, block_name, signals=None, sub_alias=None):
"""
Something like a callback function for gui triggered events.
In this case, user wants one plugin to subscribe another
:param subscriber_id: Plugin id of plugin which should get the data
:type subscriber_id: int
:param source_id: plugin uname of plugin that should send the data
:type source_id: int
:param block_name: name of block to subscribe
:type block_name: basestring
:return:
"""
# build optional data object and add id and block name to it
opt = DOptionalData()
opt.source_ID = source_id
opt.block_name = block_name
opt.signals = signals
opt.subscription_alias = sub_alias
# send event with subscriber id as the origin to CORE
event = Event.instruction.Subscribe(subscriber_id, 0, opt)
self.core_queue.put(event)
[docs] def do_subscribe_uname(self, subscriber_uname, source_uname, block_name, signals=None, sub_alias=None):
"""
Something like a callback function for gui triggered events.
In this case, user wants one plugin to subscribe another
:param subscriber_uname: Plugin uname of plugin which should get the data
:type subscriber_uname: basestring
:param source_uname: plugin uname of plugin that should send the data
:type source_uname: basestring
:param block_name: name of block to subscribe
:type block_name: basestring
:return:
"""
event = Event.instruction.SubscribeByUname(self.gui_id, 0, subscriber_uname, source_uname, block_name,
signals=signals, sub_alias= sub_alias)
self.core_queue.put(event)
[docs] def do_unsubscribe(self, subscriber_id, source_id, block_name, signal_index=None):
"""
Something like a callback function for gui triggered events.
User wants one plugin to do not get any more data from another plugin
:param subscriber_id: plugin id which wants to lose a data source
:type subscriber_id: int
:param source_id: plugin id of data source
:type source_id: int
:param block_name: name of block to unsubscribe
:type block_name: basestring
:return:
"""
# create optional data with source id and block_name
opt = DOptionalData()
opt.source_ID = source_id
opt.block_name = block_name
opt.signals = signal_index
# sent event to Core with origin subscriber_id
event = Event.instruction.Unsubscribe(subscriber_id, 0, opt)
self.core_queue.put(event)
[docs] def do_unsubscribe_uname(self, subscriber_uname, source_uname, block_name, signal_index=None):
"""
Something like a callback function for gui triggered events.
User wants one plugin to do not get any more data from another plugin
:param subscriber_uname: plugin uname which wants to lose a data source
:type subscriber_uname: basestring
:param source_uname: plugin uname of data source
:type source_uname: basestring
:param block_name: name of block to unsubscribe
:type block_name: basestring
:return:
"""
subscriber_id = self.do_get_plugin_id_from_uname(subscriber_uname)
if subscriber_id is None:
# plugin with uname does not exist
self.log.printText(1, 'do_unsubscribe, sub uname worng')
return -1
source_id = self.do_get_plugin_id_from_uname(source_uname)
if source_id is None:
# plugin with uname does not exist
self.log.printText(1, 'do_unsubscribe, target uname wrong')
return -1
# call do_subscribe with ids to subscribe
self.do_unsubscribe(subscriber_id, source_id, block_name, signal_index)
[docs] def do_set_parameter(self, plugin_id, parameter_name, value, only_db_update = False):
"""
Something like a callback function for gui triggered events.
User wants to change a parameter of a plugin
:param plugin_id: id of plugin which owns the parameter
:type plugin_id: int
:param parameter_name: name of parameter to change
:type parameter_name: basestring
:param value: new parameter value to set
:type value:
:param only_db_update: do_set_parameter of the target plugin will not be called. Updates only the internal database.
:type boolean:
"""
# get plugin from DGUI
dplug = self.gui_data.get_dplugin_by_id(plugin_id)
# check for existance
if dplug is not None:
# it exists
# get its parameter list
parameters = dplug.get_parameters()
# check if there are any parameter
if parameters is not None:
# there is a parameter list
# get the parameter with parameter_name
if parameter_name in parameters:
p = parameters[parameter_name]
# check if this specific parameter exists
if p is not None:
# parameter with name parameter_name exists
# build an event to send this information to Core
opt = DOptionalData()
opt.data = value
opt.is_parameter = True
opt.parameter_alias = parameter_name
opt.block_name = None
if only_db_update:
e = Event.instruction.UpdateParameter(self.gui_id, dplug.id, opt)
else:
e = Event.instruction.SetParameter(self.gui_id, dplug.id, opt)
self.core_queue.put(e)
[docs] def do_set_parameter_uname(self, plugin_uname, parameter_name, value):
"""
Something like a callback function for gui triggered events.
User wants to change a parameter of a plugin
:param plugin_uname: name of plugin which owns the parameter
:type plugin_uname: basestring
:param parameter_name: name of parameter to change
:type parameter_name: basestring
:param value: new parameter value to set
:type value:
"""
# id = self.do_get_plugin_id_from_uname(plugin_uname)
# if id is not None:
# self.do_set_parameter(id, parameter_name, value)
# print(parameter_name, value)
opt = DOptionalData()
opt.data = value
opt.is_parameter = True
opt.parameter_alias = parameter_name
opt.block_name = None
e = Event.instruction.SetParameterByUname(self.gui_id,plugin_uname, opt)
self.core_queue.put(e)
[docs] def do_pause_plugin_by_id(self, plugin_id):
"""
Something like a callback function for gui triggered events.
User wants to pause a plugin, so this method will send an event to core.
:param plugin_id: id of plugin to pause
:type plugin_id: int
"""
if self.gui_data.get_dplugin_by_id(plugin_id) is not None:
opt = DOptionalData()
event = Event.instruction.PausePlugin(self.gui_id, plugin_id, opt)
self.core_queue.put(event)
return 1
else:
return -1
[docs] def do_pause_plugin_by_uname(self, plugin_uname):
"""
Something like a callback function for gui triggered events.
User wants to pause a plugin, so this method will send an event to core.
:param plugin_uname: uname of plugin to pause
:type plugin_uname: basestring
"""
plugin_id = self.do_get_plugin_id_from_uname(plugin_uname)
if plugin_id is not None:
return self.do_pause_plugin_by_id(plugin_id)
else:
# plugin with uname does not exist
self.log.printText(1, 'do_pause, plugin uname worng')
return -1
[docs] def do_resume_plugin_by_id(self, plugin_id):
"""
Something like a callback function for gui triggered events.
User wants to pause a plugin, so this method will send an event to core.
:param plugin_id: id of plugin to pause
:type plugin_id: int
"""
if self.gui_data.get_dplugin_by_id(plugin_id) is not None:
opt = DOptionalData()
event = Event.instruction.ResumePlugin(self.gui_id, plugin_id, opt)
self.core_queue.put(event)
return 1
else:
return -1
[docs] def do_resume_plugin_by_uname(self, plugin_uname):
"""
Something like a callback function for gui triggered events.
User wants to resume a plugin, so this method will send an event to core.
:param plugin_uname: uname of plugin to resume
:type plugin_uname: basestring
"""
plugin_id = self.do_get_plugin_id_from_uname(plugin_uname)
if plugin_id is not None:
return self.do_resume_plugin_by_id(plugin_id)
else:
# plugin with uname does not exist
self.log.printText(1, 'do_resume, plugin uname worng')
return -1
[docs] def do_get_plugin_id_from_uname(self, uname):
"""
Returns the plugin id of the plugin with unique name uname
:param uname: uname of plugin
:type uname: basestring
:return: None: plugin with uname does not exist, id: id of plugin
"""
dplugin = self.gui_data.get_dplugin_by_uname(uname)
# check for existance
if dplugin is not None:
# it does exist, so get its id
return dplugin.id
else:
return None
[docs] def do_close_program(self):
"""
Tell core to close papi. Core will respond and will close all open plugins.
GUI will close all VIP Plugins due to calling their quit function
"""
plugins = self.gui_data.get_all_plugins()
for dplugin_id in plugins:
dplugin = plugins[dplugin_id]
if dplugin.type == PLUGIN_VIP_IDENTIFIER:
try:
dplugin.plugin.cb_quit()
except Exception as E:
tb = traceback.format_exc()
self.plugin_died.emit(dplugin, E, tb)
opt = DOptionalData()
opt.reason = 'User clicked close Button'
event = Event.instruction.CloseProgram(self.gui_id, 0, opt)
self.core_queue.put(event)
[docs] def do_set_tab_active_by_name(self, tabName):
self.tabManager.set_tab_active_by_name(tabName)
[docs] def do_open_new_tabs_with_names_in_order(self, tabNames = None):
for name in tabNames:
self.tabManager.add_tab(name)
[docs] def do_load_xml(self, path):
if path is None or not os.path.isfile(path):
return False
tree = ET.parse(path)
root = tree.getroot()
if root.tag == 'PaPI':
self.do_load_xml_reloaded(root)
else:
self.do_load_xml_v1(root)
[docs] def do_load_xml_reloaded(self,root):
"""
Function to load a xml config to papi and apply the configuration.
:param path: path to xml file to load.
:type path: basestring
:return:
"""
gui_config = {}
plugins_to_start = []
parameters_to_change = []
signals_to_change = []
subs_to_make = []
try:
for root_element in root:
##########################
# Read gui configuration #
##########################
if root_element.tag == 'Configuration':
for property in root_element:
gui_config[property.tag] = {}
for attr in property:
if len(attr) == 0:
gui_config[property.tag][attr.tag] = attr.text
else:
gui_config[property.tag][attr.tag] = {}
for val in attr:
gui_config[property.tag][attr.tag][val.tag] = val.text
if root_element.tag == 'Plugins':
#############################
# Read plugin configuration #
#############################
for plugin_xml in root_element:
plObj = {}
plObj['uname'] = self.change_uname_to_uniqe(plugin_xml.attrib['uname'])
plObj['identifier'] = plugin_xml.find('Identifier').text
config_xml = plugin_xml.find('StartConfig')
config_hash = {}
for parameter_xml in config_xml.findall('Parameter'):
para_name = parameter_xml.attrib['Name']
config_hash[para_name] = {}
for detail_xml in parameter_xml:
detail_name = detail_xml.tag
config_hash[para_name][detail_name] = detail_xml.text
plObj['cfg'] = config_hash
plugins_to_start.append(plObj)
# --------------------------------
# Load PreviousParameters
# --------------------------------
prev_parameters_xml = plugin_xml.find('PreviousParameters')
if prev_parameters_xml is not None:
for prev_parameter_xml in prev_parameters_xml.findall('Parameter'):
para_name = prev_parameter_xml.attrib['Name']
para_value = prev_parameter_xml.text
# pl_uname_new = self.change_uname_to_uniqe(pl_uname)
# TODO validate NO FLOAT in parameter
parameters_to_change.append([plObj['uname'], para_name, para_value])
# --------------------------------
# Load DBlocks due to signals name
# --------------------------------
dblocks_xml = plugin_xml.find('DBlocks')
if dblocks_xml is not None:
for dblock_xml in dblocks_xml:
dblock_name = dblock_xml.attrib['Name']
dsignals_xml = dblock_xml.findall('DSignal')
for dsignal_xml in dsignals_xml:
dsignal_uname = dsignal_xml.attrib['uname']
dsignal_dname = dsignal_xml.find('dname').text
signals_to_change.append([plObj['uname'], dblock_name, dsignal_uname, dsignal_dname])
if root_element.tag == 'Subscriptions':
for sub_xml in root_element:
#TODO: Ask stefan: Why this line?
#dest = self.change_uname_to_uniqe(sub_xml.find('Destination').text)
dest = sub_xml.find('Destination').text
for source in sub_xml:
if source.tag == 'Source':
sourceName = source.attrib['uname'] #self.change_uname_to_uniqe(source.attrib['uname'])
for block_xml in source:
blockName = block_xml.attrib['name']
alias = block_xml.find('Alias').text
signals_xml = block_xml.find('Signals')
signals = []
for sig_xml in signals_xml:
signals.append(sig_xml.text)
subs_to_make.append({'dest':dest, 'source':sourceName, 'block':blockName, 'alias':alias, 'signals':signals})
self.set_gui_config_function(gui_config)
except Exception as E:
tb = traceback.format_exc()
self.error_occured.emit("Error: Config Loader", "Not loadable", tb)
# -----------------------------------------------
# Check: Are there unloadable plugins?
# -----------------------------------------------
unloadable_plugins = []
for pl in plugins_to_start:
plugin_info = self.pluginManager.getPluginByName(pl['identifier'])
if plugin_info is None:
if pl['identifier'] not in unloadable_plugins:
unloadable_plugins.append(pl['identifier'])
if not len(unloadable_plugins):
for pl in plugins_to_start:
self.do_create_plugin(pl['identifier'], pl['uname'], pl['cfg'])
self.config_loader_subs_reloaded(plugins_to_start, subs_to_make, parameters_to_change, signals_to_change)
else:
self.error_occured.emit("Error: Loading Plugins", "Can't use: " + str(unloadable_plugins) +
"\nConfiguration will not be used.", None)
[docs] def config_loader_subs_reloaded(self, pl_to_start, subs_to_make, parameters_to_change, signals_to_change):
"""
Function for callback when timer finished to apply
subscriptions and parameter changed of config.
:param pl_to_start: list of plugins to start
:type pl_to_start: list
:param subs_to_make: list of subscriptions to make
:type subs_to_make: list
:param parameters_to_change: parameter changes to apply
:type parameters_to_change: list
:param signals_to_change: signal name changes to apply
:type signals_to_change: list
:return:
"""
for sub in subs_to_make:
self.do_subscribe_uname(sub['dest'], sub['source'], sub['block'], sub['signals'], sub['alias'])
for para in parameters_to_change:
self.do_set_parameter_uname(para[0], para[1], para[2])
for sig in signals_to_change:
plugin_uname = sig[0]
dblock_name = sig[1]
dsignal_uname = sig[2]
dsignal_dname = sig[3]
self.do_edit_plugin_uname(plugin_uname, DBlock(dblock_name),{'edit': DSignal(dsignal_uname, dsignal_dname)})
[docs] def do_load_xml_v1(self, root):
"""
Function to load a xml config to papi and apply the configuration.
:param path: path to xml file to load.
:type path: basestring
:return:
"""
plugins_to_start = []
subs_to_make = []
parameters_to_change = []
signals_to_change = []
try:
for plugin_xml in root:
##########################
# Read gui configuration #
##########################
if plugin_xml.tag == 'guiConfig':
cfg = {}
for property in plugin_xml:
cfg[property.tag] = {}
for attr in property:
cfg[property.tag][attr.tag] = {}
for value in attr:
cfg[property.tag][attr.tag][value.tag] = value.text
self.set_gui_config_function(cfg)
#############################
# Read plugin configuration #
#############################
if plugin_xml.tag == 'Plugin':
pl_uname = plugin_xml.attrib['uname']
identifier = plugin_xml.find('Identifier').text
config_xml = plugin_xml.find('StartConfig')
config_hash = {}
for parameter_xml in config_xml.findall('Parameter'):
para_name = parameter_xml.attrib['Name']
config_hash[para_name] = {}
for detail_xml in parameter_xml:
detail_name = detail_xml.tag
config_hash[para_name][detail_name] = detail_xml.text
pl_uname_new = self.change_uname_to_uniqe(pl_uname)
plugins_to_start.append([identifier, pl_uname_new, config_hash])
# --------------------------------
# Load Subscriptions
# --------------------------------
subs_xml = plugin_xml.find('Subscriptions')
if subs_xml is not None:
for sub_xml in subs_xml.findall('Subscription'):
data_source = sub_xml.find('data_source').text
for block_xml in sub_xml.findall('block'):
block_name = block_xml.attrib['Name']
signals = []
for sig_xml in block_xml.findall('Signal'):
signals.append(str(sig_xml.text))
alias_xml = block_xml.find('alias')
alias = alias_xml.text
pl_uname_new = self.change_uname_to_uniqe(pl_uname)
data_source_new = data_source #self.change_uname_to_uniqe(data_source)
subs_to_make.append([pl_uname_new, data_source_new, block_name, signals, alias])
# --------------------------------
# Load PreviousParameters
# --------------------------------
prev_parameters_xml = plugin_xml.find('PreviousParameters')
if prev_parameters_xml is not None:
for prev_parameter_xml in prev_parameters_xml.findall('Parameter'):
para_name = prev_parameter_xml.attrib['Name']
para_value = prev_parameter_xml.text
pl_uname_new = self.change_uname_to_uniqe(pl_uname)
# TODO validate NO FLOAT in parameter
parameters_to_change.append([pl_uname_new, para_name, para_value])
# --------------------------------
# Load DBlocks due to signals name
# --------------------------------
dblocks_xml = plugin_xml.find('DBlocks')
if dblocks_xml is not None:
for dblock_xml in dblocks_xml:
dblock_name = dblock_xml.attrib['Name']
dsignals_xml = dblock_xml.findall('DSignal')
for dsignal_xml in dsignals_xml:
dsignal_uname = dsignal_xml.attrib['uname']
dsignal_dname = dsignal_xml.find('dname').text
signals_to_change.append([pl_uname, dblock_name, dsignal_uname, dsignal_dname])
except Exception as E:
tb = traceback.format_exc()
self.error_occured.emit("Error: Config Loader", "Not loadable", tb)
# -----------------------------------------------
# Check: Are there unloadable plugins?
# -----------------------------------------------
unloadable_plugins = []
for pl in plugins_to_start:
plugin_info = self.pluginManager.getPluginByName(pl[0])
if plugin_info is None:
if pl[0] not in unloadable_plugins:
unloadable_plugins.append(pl[0])
if not len(unloadable_plugins):
for pl in plugins_to_start:
# 0: ident, 1: uname, 2: config
self.do_create_plugin(pl[0], pl[1], pl[2])
# QtCore.QTimer.singleShot(CONFIG_LOADER_SUBSCRIBE_DELAY, \
# lambda: self.config_loader_subs(plugins_to_start, subs_to_make, \
# parameters_to_change, signals_to_change))
self.config_loader_subs(plugins_to_start, subs_to_make, parameters_to_change, signals_to_change)
else:
self.error_occured.emit("Error: Loading Plugins", "Can't use: " + str(unloadable_plugins) +
"\nConfiguration will not be used.", None)
[docs] def change_uname_to_uniqe(self, uname):
"""
Function will search for unames and add an indentifier to it to make it unique in case of existence
:param uname: uname to make unique
:type uname: basestring
:return: uname
"""
i = 1
while self.gui_data.get_dplugin_by_uname(uname) is not None:
i = i + 1
if i == 2:
uname = uname + 'X' + str(i)
else:
uname = uname[:-1] + str(i)
return uname
[docs] def config_loader_subs(self, pl_to_start, subs_to_make, parameters_to_change, signals_to_change):
"""
Function for callback when timer finished to apply
subscriptions and parameter changed of config.
:param pl_to_start: list of plugins to start
:type pl_to_start: list
:param subs_to_make: list of subscriptions to make
:type subs_to_make: list
:param parameters_to_change: parameter changes to apply
:type parameters_to_change: list
:param signals_to_change: signal name changes to apply
:type signals_to_change: list
:return:
"""
for sub in subs_to_make:
self.do_subscribe_uname(sub[0], sub[1], sub[2], sub[3], sub[4])
for para in parameters_to_change:
self.do_set_parameter_uname(para[0], para[1], para[2])
for sig in signals_to_change:
plugin_uname = sig[0]
dblock_name = sig[1]
dsignal_uname = sig[2]
dsignal_dname = sig[3]
self.do_edit_plugin_uname(plugin_uname, DBlock(dblock_name),
{'edit': DSignal(dsignal_uname, dsignal_dname)})
[docs] def do_save_xml_config_reloaded(self,path, plToSave=[], sToSave=[], saveUserSettings=False):
"""
:param path:
:param plToSave:
:param sToSave:
:return:
"""
subscriptionsToSave = {}
# check for xml extension in path, add .xml if missing
if path[-4:] != '.xml':
path += '.xml'
try:
root = ET.Element(CONFIG_ROOT_ELEMENT_NAME_RELOADED)
root.set('Date', datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))
root.set('PaPI_version', CORE_PAPI_VERSION)
##########################
# Save gui configuration #
##########################
gui_cfg = self.get_gui_config_function(save_user_settings=saveUserSettings)
gui_cfg_xml = ET.SubElement(root, 'Configuration')
for cfg_item in gui_cfg:
item_xml = ET.SubElement(gui_cfg_xml,cfg_item)
item = gui_cfg[cfg_item]
for attr_name in item:
attr_xml = ET.SubElement(item_xml, attr_name)
values = item[attr_name]
# check if there is another dict level to explore
# if true: exlplore the dict
# if false: save the value of values in the parent xml node
if isinstance(values,dict):
for val in values:
value_xml = ET.SubElement(attr_xml, val)
value_xml.text = values[val]
else:
attr_xml.text = values
# ---------------------------------------
# save information of plugins
# for the next start
# ---------------------------------------
plugins_xml = ET.SubElement(root,'Plugins')
plugins = self.gui_data.get_all_plugins()
for dplugin_id in plugins:
dplugin = plugins[dplugin_id]
# check if this plugin should be saved to XML
if dplugin.uname in plToSave:
if dplugin.type == PLUGIN_VIP_IDENTIFIER:
dplugin.startup_config = dplugin.plugin.pl_get_current_config()
pl_xml = ET.SubElement(plugins_xml, 'Plugin')
pl_xml.set('uname', dplugin.uname)
identifier_xml = ET.SubElement(pl_xml, 'Identifier')
identifier_xml.text = dplugin.plugin_identifier
# ---------------------------------------
# Save all current config as startup config
# for the next start
# ---------------------------------------
cfg_xml = ET.SubElement(pl_xml, 'StartConfig')
for parameter in dplugin.startup_config:
para_xml = ET.SubElement(cfg_xml, 'Parameter')
para_xml.set('Name', parameter)
for detail in dplugin.startup_config[parameter]:
if detail not in CONFIG_SAVE_CFG_BLACKLIST:
detail_xml = ET.SubElement(para_xml, detail)
detail_xml.text = dplugin.startup_config[parameter][detail]
# ---------------------------------------
# Save all current values for all
# parameter
# ---------------------------------------
last_paras_xml = ET.SubElement(pl_xml, 'PreviousParameters')
allparas = dplugin.get_parameters()
for para_key in allparas:
para = allparas[para_key]
last_para_xml = ET.SubElement(last_paras_xml, 'Parameter')
last_para_xml.set('Name', para_key)
last_para_xml.text = str(para.value)
# ---------------------------------------
# Save all current values for all
# signals of all dblocks
# ---------------------------------------
dblocks_xml = ET.SubElement(pl_xml, 'DBlocks')
alldblock_names = dplugin.get_dblocks()
for dblock_name in alldblock_names:
dblock = alldblock_names[dblock_name]
dblock_xml = ET.SubElement(dblocks_xml, 'DBlock')
dblock_xml.set('Name', dblock.name)
alldsignals = dblock.get_signals()
for dsignal in alldsignals:
if dsignal.uname != CORE_TIME_SIGNAL:
dsignal_xml = ET.SubElement(dblock_xml, 'DSignal')
dsignal_xml.set('uname', dsignal.uname)
dname_xml = ET.SubElement(dsignal_xml, 'dname')
dname_xml.text = dsignal.dname
# ---------------------------------------
# Build temporary subscription objects
# to remember the subs of all plugins
# including plugins that are not saved
# excluding plugins we do not want the subs saved of
# ---------------------------------------
if dplugin.uname in sToSave:
subsOfPl = {}
subs = dplugin.get_subscribtions()
for sub in subs:
sourcePL = self.gui_data.get_dplugin_by_id(sub).uname
subsOfPl[sourcePL] = {}
for block in subs[sub]:
subsOfPl[sourcePL][block] = {}
dsubscription = subs[sub][block]
subsOfPl[sourcePL][block]['alias'] = dsubscription.alias
signals = []
for s in dsubscription.get_signals():
signals.append( str(s))
subsOfPl[sourcePL][block]['signals'] = signals
if len(subsOfPl) != 0:
subscriptionsToSave[dplugin.uname] = subsOfPl
# ---------------------------------------
# save subs to xml
#
# ---------------------------------------
subs_xml = ET.SubElement(root,'Subscriptions')
for dest in subscriptionsToSave:
sub_xml = ET.SubElement(subs_xml,'Subscription')
# Destination of data
dest_xml = ET.SubElement(sub_xml,'Destination')
dest_xml.text = dest
for source in subscriptionsToSave[dest]:
# Source of Data
source_xml = ET.SubElement(sub_xml,'Source')
source_xml.set('uname',source)
for block in subscriptionsToSave[dest][source]:
block_xml = ET.SubElement(source_xml,'Block')
block_xml.set('name',block)
alias_xml = ET.SubElement(block_xml,'Alias')
alias_xml.text = subscriptionsToSave[dest][source][block]['alias']
signal_xml = ET.SubElement(block_xml,'Signals')
for sig in subscriptionsToSave[dest][source][block]['signals']:
if sig != CORE_TIME_SIGNAL:
sig_xml = ET.SubElement(signal_xml,'Signal')
sig_xml.text = sig
# do transformation for readability and save xml tree to file
self.indent(root)
tree = ET.ElementTree(root)
tree.write(path)
except Exception as E:
tb = traceback.format_exc()
self.error_occured.emit("Error: Config Loader", "Not saveable: " + path, tb)
[docs] def do_save_xml_config(self, path):
"""
This function will save papis current state to a xml file provided by path.
:param path: path to save xml to.
:type path: basestring
:return:
"""
raise Exception('do_save_xml_config_reloaded must be used')
[docs] def do_save_json_config_reloaded(self, path, plToSave=[], sToSave=[]):
if path[-5:] != '.json':
path += '.json'
json_config = {}
to_create = {}
to_control = {}
to_sub = {}
plugins = self.gui_data.get_all_plugins()
for dplugin_id in plugins:
dplugin = plugins[dplugin_id]
# check if this plugin should be saved to XML
if dplugin.uname in plToSave:
if dplugin.type == PLUGIN_VIP_IDENTIFIER:
dplugin.startup_config = dplugin.plugin.pl_get_current_config()
to_create[dplugin.uname] = {}
to_create[dplugin.uname]["identifier"] = {'value' : dplugin.plugin_identifier}
plugin_config = {}
for config in dplugin.startup_config:
for value in dplugin.startup_config[config]:
if value == 'value':
plugin_config[config] = {'value' : dplugin.startup_config[config][value]}
to_create[dplugin.uname]["config"] = plugin_config
if dplugin.uname in sToSave:
subsOfPl = {}
subs = dplugin.get_subscribtions()
for sub in subs:
sourcePL = self.gui_data.get_dplugin_by_id(sub).uname
subsOfPl[sourcePL] = {}
for block in subs[sub]:
subsOfPl[sourcePL][block] = {}
dsubscription = subs[sub][block]
subsOfPl[sourcePL]['alias'] = dsubscription.alias
print(sourcePL)
print(block)
if dsubscription.alias is not None:
if sourcePL not in to_control:
to_control[sourcePL] = {}
to_control[sourcePL][block] = {'parameter' : dsubscription.alias}
else:
signals = []
for s in dsubscription.get_signals():
signals.append( str(s))
to_sub[dplugin.uname] = {}
to_sub[dplugin.uname]['signals'] = signals
to_sub[dplugin.uname]['block'] = block
to_sub[dplugin.uname]['plugin'] = sourcePL
if len(to_create):
json_config["ToCreate"] = to_create;
if len(to_control):
json_config["ToControl"] = to_control
if len(to_sub):
json_config["ToSub"] = to_sub
papi_config = {"PaPIConfig" : json_config}
try:
with open(path, 'w') as outfile:
json.dump(papi_config, outfile, indent=' ')
except:
pass
[docs] def indent(self, elem, level=0):
"""
Function which will apply a nice looking indentiation to xml structure before save. Better readability.
copied from http://effbot.org/zone/element-lib.htm#prettyprint 06.10.2014 15:53
:param elem:
:param level:
:return:
"""
i = "\n" + level * " "
if len(elem):
if not elem.text or not elem.text.strip():
elem.text = i + " "
if not elem.tail or not elem.tail.strip():
elem.tail = i
for elem in elem:
self.indent(elem, level + 1)
if not elem.tail or not elem.tail.strip():
elem.tail = i
else:
if level and (not elem.tail or not elem.tail.strip()):
elem.tail = i
[docs] def do_reset_papi(self):
"""
APi call to reset PaPI.
Reset in this case means to delete all plugins cleanly and keep PaPI running.
Will free all unames.
Is using the do_delete_plugin api call and the delete plugin mechanism
:return: ERROR CODE
"""
all_plugins = self.gui_data.get_all_plugins()
if all_plugins is not None:
for plugin_key in all_plugins:
plugin = all_plugins[plugin_key]
self.do_delete_plugin(plugin.id)
[docs] def do_test_name_to_be_unique(self, name):
"""
Will check if a given name would be a valid, unique name for a plugin.
:param name: name to check
:type name: basestring
:return: True or False
"""
reg = QtCore.QRegExp('\S[^_][^\W_]+')
if reg.exactMatch(name):
if self.gui_data.get_dplugin_by_uname(name) is None:
return True
else:
return False
else:
return False
[docs] def do_change_string_to_be_uname(self, name):
"""
This method will take a string and convert him according to some rules to be an uname
:param name: name to convert to unmae
:type name: basestring
:return: name converted to uname
"""
uname = name
# TODO: get more inteligence here!
forbidden = ['_', ',', '.', '`', ' ']
for c in forbidden:
uname = uname.replace(c, 'X')
return uname