#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Copyright (C) 2014 Technische Universität Berlin,
Fakultät IV - Elektrotechnik und Informatik,
Fachgebiet Regelungssysteme,
Einsteinufer 17, D-10587 Berlin, Germany
This file is part of PaPI.
PaPI is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PaPI is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PaPI. If not, see <http://www.gnu.org/licenses/>.
Contributors
Stefan Ruppin
"""
import os
from multiprocessing import Process, Queue
from threading import Timer
import copy
from papi.yapsy.PluginManager import PluginManager
from papi.data.DCore import DCore
from papi.data.DPlugin import DPlugin, DBlock
from papi.data.DSignal import DSignal
from papi.ConsoleLog import ConsoleLog
from papi.data.DOptionalData import DOptionalData
import papi.constants as pc
from papi.constants import CORE_PROCESS_CONSOLE_IDENTIFIER, CORE_CONSOLE_LOG_LEVEL, CORE_PAPI_CONSOLE_START_MESSAGE, \
CORE_CORE_CONSOLE_START_MESSAGE, CORE_ALIVE_CHECK_ENABLED, \
CORE_STOP_CONSOLE_MESSAGE, CORE_ALIVE_CHECK_INTERVAL, CORE_ALIVE_MAX_COUNT, CORE_TIME_SIGNAL
from papi.constants import PLUGIN_ROOT_FOLDER_LIST, PLUGIN_VIP_IDENTIFIER, \
PLUGIN_DPP_IDENTIFIER, PLUGIN_STATE_PAUSE, PLUGIN_STATE_RESUMED, \
PLUGIN_STATE_START_SUCCESFUL, PLUGIN_STATE_START_FAILED, PLUGIN_STATE_ALIVE, PLUGIN_STATE_STOPPED, \
PLUGIN_STATE_DEAD
import papi.error_codes as ERROR
from papi.event.event_base import PapiEventBase
import papi.event as Event
import signal
[docs]def run_core_in_own_process(gui_queue, core_queue, gui_id):
core = Core(None,False,False)
core.gui_id = gui_id
core.core_event_queue = core_queue
core.gui_event_queue = gui_queue
core.run()
[docs]class Process_dummy(object):
def __init__(self):
self.pid= 0
[docs]class Core:
def __init__(self, gui_start_function = None, use_gui=True, is_parent = True, gui_process_pid = None, args=None):
"""
Init function of the PaPI Core.
Creates all data needed to use core and core.run() functions
.. document private functions
.. automethod:: __*
"""
# switch case structure for processing incoming events
self.__process_event_by_type__ = {'status_event': self.__process_status_event__,
'data_event': self.__process_data_event__,
'instr_event': self.__process_instr_event__,
}
self.__process_status_event_l__ = {'start_successfull': self.__process_start_successfull__,
'start_failed': self.__process_start_failed__,
'alive': self.__process_alive__,
'join_request': self.__process_join_request__,
'plugin_stopped': self.__process_plugin_stopped__
}
self.__process_data_event_l__ = {'new_data': self.__process_new_data__,
'new_block': self.__process_new_block__,
'new_parameter': self.__process_new_parameter__,
'edit_dplugin': self.__process_edit_dplugin,
'edit_dplugin_by_uname': self.__process_edit_dplugin_by_uname__,
'delete_block': self.__process_delete_block__,
'delete_parameter': self.__process_delete_parameter__
}
self.__process_instr_event_l__ = {'create_plugin': self.__process_create_plugin__,
'stop_plugin': self.__process_stop_plugin__,
'stop_plugin_by_uname': self.__process_stop_plugin_by_uname__,
'close_program': self.__process_close_programm__,
'subscribe': self.__process_subscribe__,
'subscribe_by_uname': self.__process_subscribe_by_uname__,
'unsubscribe': self.__process_unsubsribe__,
'set_parameter': self.__process_set_parameter__,
'set_parameter_by_uname': self.__process_set_parameter__,
'pause_plugin': self.__process_pause_plugin__,
'resume_plugin': self.__process_resume_plugin__,
'start_plugin': self.__process_start_plugin__
}
# Startup parameter to configure core
self.args = args
self.is_parent = is_parent
# creating the main core data object DCore and core queue
self.core_data = DCore()
self.core_goOn = 1
self.core_id = 0
# setting up the yapsy plugin manager for directory structure
self.plugin_manager = PluginManager()
self.plugin_manager.setPluginPlaces(PLUGIN_ROOT_FOLDER_LIST)
# define gui information. ID and alive status as well as gui queue for events
self.gui_id = self.core_data.create_id()
self.gui_alive = False
self.use_gui = use_gui
# set information for console logging part (debug information)
self.log = ConsoleLog(CORE_CONSOLE_LOG_LEVEL, CORE_PROCESS_CONSOLE_IDENTIFIER)
self.log.lvl = pc.CORE_CONSOLE_LOG_LEVEL
try:
if args:
if args.debug_level:
self.log.lvl = int(args.debug_level)
except:
pass
# define variables for check alive system, e.a. timer and counts
self.alive_interval = CORE_ALIVE_CHECK_INTERVAL
self.alive_count_max = CORE_ALIVE_MAX_COUNT
self.alive_timer = Timer(self.alive_interval, self.check_alive_callback)
self.alive_count = 0
self.gui_alive_count = 0
# Setup queues and processes for inter-process communication between core and gui
self.core_event_queue = None
self.gui_event_queue = None
self.gui_process = Process_dummy()
self.gui_process.pid = gui_process_pid
self.gui_alive = True
# Check whether core is set to be parent and then check parameter to be valid
if is_parent:
if gui_start_function is None:
raise Exception('Core started with wrong arguments')
if use_gui is True and is_parent is False:
raise Exception('Core started with wrong arguments')
if is_parent is False and gui_process_pid is None:
raise Exception('Core started with wrong arguments')
# core is set to be parent, start gui process from core
self.gui_start_function = gui_start_function
self.core_event_queue = Queue()
self.gui_event_queue = Queue()
# list/queue for events that should be delayed before execution
self.core_delayed_operation_queue = []
# connect CTRL+C signaling from cmd to quit PaPI
signal.signal(signal.SIGINT, lambda sig,frame,core=self: self.signal_handler(sig,frame,core))
[docs] def run(self):
"""
Main operation function of core.
Event loop is in here.
:return:
"""
# logs: some startup information
self.log.printText(1, CORE_PAPI_CONSOLE_START_MESSAGE)
self.log.printText(1, CORE_CORE_CONSOLE_START_MESSAGE + ' .. Process id: ' + str(os.getpid()))
# start the GUI process to show GUI, set GUI alive status to TRUE
# don't start GUI in case that CORE is not parent and that the gui should be used
if self.use_gui and self.is_parent:
self.gui_process = Process(target=self.gui_start_function,
args=(self.core_event_queue, self.gui_event_queue, self.gui_id, self.args))
self.gui_process.start()
# start the check alive timer if heartbeat is activated
if CORE_ALIVE_CHECK_ENABLED is True:
self.log.printText(1, 'Alive check of processes is enabled')
self.alive_timer.start()
# core main operation loop
# Receiving and reaction to events
while self.core_goOn:
# get event from queue, blocks until there is a new event
# blocking prevents unnecessary iterations since core is only reacting to events
event = self.core_event_queue.get()
# LOG: Debug output
self.log.printText(2, 'Event->' + event.get_eventtype() + ' ' + event.get_event_operation())
# process the next event of queue
# catch error if event is unknown, i.e. no callback function exits for it
try:
self.__process_event__(event)
except Exception as Exc:
print(Exc)
# check if there are still plugins alive or gui is still alive before ending core main loop
self.core_goOn = self.core_data.get_dplugins_count() != 0 or self.gui_alive
# core main loop ended, so cancel alive timer
self.alive_timer.cancel()
# LOG: core finished operation and did clean shutdown
self.log.printText(1, CORE_STOP_CONSOLE_MESSAGE)
[docs] def signal_handler(self,signal, frame,core):
"""
Callback-handler for signal from cmd line like e.g. SIGINT
:return:
"""
pass
[docs] def send_alive_check_events(self):
"""
Function for check_alive_status timer to send check_alive events to all running processes
This will trigger all correctly running processes to send an answer to core.
Alive response is handled in method '__process_alive__'
:return:
"""
# get list of all plugins [hash: id->dplug], type DPlugin
dplugs = self.core_data.get_all_plugins()
for id in dplugs:
# get dplugin object
plug = dplugs[id]
# check if this plugin runs in a separate process
if plug.own_process is True:
# send check_alive_status event to plugin process (via queue)
event = Event.status.CheckAliveStatus(self.core_id, plug.id, None)
plug.queue.put(event)
# send check_alive_status event to gui process (via queue)
event = Event.status.CheckAliveStatus(self.core_id, self.gui_id, None)
self.gui_event_queue.put(event)
[docs] def handle_alive_situation(self):
"""
Function which handles the check for the alive situation of all plugins in data base
Will distribute to error_handling methods to handle dead processes
This is no callback functions, more a rather periodically called method to check the alive situation.
Is called by the handler which handles the alive timer callbacks
It checks the counter of the plugins and compares them against the core counter. A not equal (normally smaller)
number in the plugin object indicates a dead process. A error handler is called.
:return:
"""
# get all plugins from core data [hash: id->DPlugin]
dplugs = self.core_data.get_all_plugins()
if dplugs is not None:
for id in dplugs:
# get DPlugin object
plug = dplugs[id]
# check if plugins runs in a separate process
if plug.own_process is True:
# check if counts are equal: equal indicates that plugin is alive
if plug.alive_count is self.alive_count:
self.log.printText(2, 'Plugin ' + plug.uname + ' is still alive')
# change plugin state in DCore
plug.alive_state = PLUGIN_STATE_ALIVE
else:
# Plugin is not alive anymore, so do error handling
self.plugin_process_is_dead_error_handler(plug)
# check for gui status and do error handling
if self.gui_alive_count is self.alive_count:
self.log.printText(2, 'GUI is still ALIVE')
else:
self.gui_is_dead_error_handler()
[docs] def gui_is_dead_error_handler(self):
"""
Error handler for the case that the gui is dead.
The handler gets called by the method 'handle_alive_situation' in case the gui counter does not match
:return:
"""
self.log.printText(1, 'GUI is DEAD')
self.log.printText(1,
'core count: ' + str(self.alive_count) + ' vs. ' + str(self.gui_alive_count) + ' :gui count')
[docs] def plugin_process_is_dead_error_handler(self, dplug):
"""
Error handler for the case that the a plugin is dead.
The handler gets called by the method 'handle_alive_situation' in case the plugin counter does not match
:param dplug: Plugin which is dead
:type dplug: DPlugin
:return:
"""
# LOG: print situation to the console
self.log.printText(1, 'Plugin ' + dplug.uname + ' is DEAD')
self.log.printText(1,
'core count: ' + str(self.alive_count) + ' vs. ' + str(dplug.alive_count) + ' :plugin count')
# set the plugin state to DEAD and update meta data to let the GUI know
dplug.alive_state = PLUGIN_STATE_DEAD
self.update_meta_data_to_gui(dplug.id)
[docs] def check_alive_callback(self):
"""
Callback function for check_alive status timer, get called every time the timer triggers, does sth and starts a
new timer.
Handles sending events to processes and checking their answer.
:return:
"""
# LOG: alive timer triggered
self.log.printText(2, 'check alive')
# check for answer status of all plugins
self.handle_alive_situation()
# increase the global check_alive counter
self.alive_count += 1
self.alive_count = self.alive_count % self.alive_count_max
# send new check alive events to plugins
self.send_alive_check_events()
# start a new one shot timer with this callback function as the next alive interval
if CORE_ALIVE_CHECK_ENABLED is True:
self.alive_timer = Timer(self.alive_interval, self.check_alive_callback)
self.alive_timer.start()
[docs] def handle_parameter_change(self, plugin, parameter_name, value):
"""
This function should be called whenever there is a new_data event for changing a parameter value
This function changes the value in DCore and updates this information to the GUI via a meta update
:param plugin: Plugin which owns the parameter
:type plugin: DPlugin
:param parameter_name: Name of the parameter which value should be changed
:type parameter_name: basestring
:param value: new value for the parameter
:type value: all possible, depends on plugin
:return: -1 for error(parameter with parameter_name does not exist in plugin), 1 for o.k., done
"""
# gets the parameter list of the plugin for witch a parameter was changed
allparas = plugin.get_parameters()
if parameter_name in allparas:
# parameter exists in the plugin, so change its value.
para = allparas[parameter_name]
para.value = value
# Then, update the change to the gui by performing a meta update
self.update_meta_data_to_gui(plugin.id)
return 1
else:
return -1
[docs] def new_subscription(self, subscriber_id, source_id, block_name, signals, sub_alias, orginal_event=None):
"""
Gets information of a new and desired subscription.
This method will try to create the wanted subscription.
:param subscriber_id: Id of the plugin that want to get data
:param source_id: Id of the plugin that will be the data source
:param block_name: name of source block of source plugin
:param signals: signals to subscribe
:param sub_alias: optional alias for parameter
:return:
"""
already_sub = False
# check if already subscribed
source_pl = self.core_data.get_dplugin_by_id(source_id)
if source_pl is not None:
# only check plugins which are started successfully, otherwise delay this operation.
if source_pl.state != PLUGIN_STATE_START_SUCCESFUL:
# source for subscription does not exists yet, so delay subscription event again
self.core_delayed_operation_queue.append(orginal_event)
self.log.printText(2, 'subscribe, event was placed in delayed queue because plugin with id: '
+ str(source_id) + ' is still starting.')
return 0
# plugin is running correctly, so get its blocks to check if the subscriber already exists
blocks = source_pl.get_dblocks()
if block_name in blocks:
b = blocks[block_name]
subs = b.get_subscribers()
if subscriber_id in subs:
# subscriber is already listed, so remember that
already_sub = True
# Check if the subscription is necessary
if already_sub is False:
dsubscription = self.core_data.subscribe(subscriber_id, source_id, block_name)
if dsubscription is None:
# subscription failed
# maybe the block does not exists yet, so delay subscription event again
self.core_delayed_operation_queue.append(orginal_event)
self.log.printText(1, 'subscribe, something failed in subscription process with subscriber id: ' + str(
subscriber_id) + '..target id:' + str(source_id) + '..and block ' + str(block_name))
return -1
else:
# subscription correct
# set alias for parameter control (can be none if no parameter)
dsubscription.alias = sub_alias
# send event to plugin which will control this parameter with information of parameter
if sub_alias is not None:
sub_pl = self.core_data.get_dplugin_by_id(subscriber_id)
if sub_pl is not None:
parameters = sub_pl.get_parameters()
if sub_alias in parameters:
parameter = parameters[sub_alias]
para_event = Event.status.ParameterInfo(self.core_id, source_id, copy.deepcopy(parameter) )
source_pl.queue.put(para_event)
if signals != []:
if self.core_data.subscribe_signals(subscriber_id, source_id, block_name, signals) is None:
# subscription failed
self.log.printText(1, 'subscribe, something failed in subsription process with subscriber id: ' + str(
subscriber_id) + '..target id:' + str(source_id) + '..and block ' + str(block_name)
+ '. A Problem with signals')
return -1
else:
pass
# LOG: successful
self.log.printText(1, 'subscribe, subscribtion correct: ' + str(subscriber_id) + '->(' + str(source_id) + ',' + str(
block_name) + ')')
# update meta data of subscriber and source plugin, since the relation between those two changed due to this method.
self.update_meta_data_to_gui(subscriber_id)
self.update_meta_data_to_gui(source_id)
return ERROR.NO_ERROR
[docs] def check_events_in_delayed_queue(self):
"""
Process all events which are in the delayed operation queue by poping them out and inserting them to the main
core event queue 'core_event_queue'
:return:
"""
while len(self.core_delayed_operation_queue) != 0:
self.core_event_queue.put(self.core_delayed_operation_queue.pop(0))
# ------- Event processing initial stage ---------
[docs] def __process_event__(self, event):
"""
Initial stage of event processing, dividing by event type
Extracts event type and give it to a switch case equivalent
:param event: event to process
:type event: PapiEventBase
"""
t = event.get_eventtype()
self.__process_event_by_type__[t](event)
# ------- Event processing first stage ---------
[docs] def __process_status_event__(self, event):
"""
First stage of event processing, deciding which status_event this is. Therefore, the method extracts the event
operation and processes it.
:param event: event to process
:type event: PapiEventBase
"""
op = event.get_event_operation()
return self.__process_status_event_l__[op](event)
[docs] def __process_data_event__(self, event):
"""
First stage of event processing, deciding which data_event this is
:param event: event to process
:type event: PapiEventBase
"""
op = event.get_event_operation()
return self.__process_data_event_l__[op](event)
[docs] def __process_instr_event__(self, event):
"""
First stage of event processing, deciding which instr_event this is. Therefore, the method extracts the event
operation and processes it.
:param event: event to process
:type event: PapiEventBase
"""
op = event.get_event_operation()
return self.__process_instr_event_l__[op](event)
# ------- Event processing second stage: status events ---------
[docs] def __process_start_successfull__(self, event):
"""
Process start_successful event.
A start_successful event is sent by plugins after their initialize function returned 'true' indicating a
successful start of the plugin before continuing with its main execution routine.
:param event: event to process
:type event: PapiEventBase
"""
# get plugin from DCore with id of event origin and check if it exists
dplug = self.core_data.get_dplugin_by_id(event.get_originID())
if (dplug != None):
# plugin exists and sent successfull event, so change it state
dplug.state = PLUGIN_STATE_START_SUCCESFUL
self.update_meta_data_to_gui(dplug.id)
# process delayed_operation_queue
self.check_events_in_delayed_queue()
return 1
else:
# plugin does not exist
self.log.printText(1, 'start_successfull_event, Event with id ' + str(
event.get_originID()) + ' but plugin does not exist')
return -1
[docs] def __process_start_failed__(self, event):
"""
Process start failed event and do error handling
This event occurs when a plugin returns false in its initialize routine. Therefore, it means that the start of
failed but the plugin noticed it itself and was able to return false.
This method will update the core database with the new state of the plugin. At the moment, no other actions are
done.
:param event: event to process
:type event: PapiEventBase
"""
# get plugin from DCore with id of event origin and check if it exists
dplug = self.core_data.get_dplugin_by_id(event.get_originID())
if (dplug != None):
# plugin exists but start failed, so change its state
dplug.state = PLUGIN_STATE_START_FAILED
return 1
else:
# plugin does not exist in DCore
self.log.printText(1, 'start_failed_event, Event with id ' + str(
event.get_originID()) + ' but plugin does not exist')
return -1
[docs] def __process_alive__(self, event):
"""
Processes alive response from processes/plugins and GUI, organising the counter,
i.e. increment GUI counter if alive event's origin is GUI, otherwise increment counter stored in dplugin object
Alive events that were sent by plugins are the response on alive request which were sent by the core alive timer.
An alive event indicates that the plugin is still running and responding to requests.
:param event: event to process
:type event: PapiEventBase
"""
# get the event origin
oID = event.get_originID()
# check if its the gui
if oID is self.gui_id:
# increment GUI counter
self.gui_alive_count += 1
self.gui_alive_count = self.gui_alive_count % self.alive_count_max
else:
# its not the Gui, so search for plugin in DCore
dplug = self.core_data.get_dplugin_by_id(oID)
# check if plugin exists. If plugin exists, increment its counter
if dplug is not None:
dplug.alive_count += 1
dplug.alive_count = dplug.alive_count % self.alive_count_max
return True
[docs] def __process_join_request__(self, event):
"""
Process join requests of processes
A join request is send from a plugin which will close itself. The requests wants the core to join the process of
the plugin for clean-up reasons. Therefore, the join request is just send by plugins running in an own process
(and not in the GUI).
The method is searching for the dplugin object in the core database to get the corresponding process object.
Then, the method joins the process. Note, that a join request is the last action of a plugin to ensure that the
core is not blocked by the process join procedure.
Afterwards, the plugin will be removed from the core's database. A event is send to the GUI process to inform the
GUI about the closed and removed plugin.
:param event: event to process
:type event: PapiEventBase
:type dplugin: DPlugin
"""
# get event origin id and its corresponding plugin object
pl_id = event.get_originID()
dplugin = self.core_data.get_dplugin_by_id(pl_id)
# check for existence
if dplugin is not None:
# get process of plugin and join it (is blocking but plugin call order ensures that no block can occur)
dplugin.process.join()
# remove plugin from DCore, because process was joined and thus, the plugin is identified as 'closed'
if self.core_data.rm_dplugin(dplugin.id) is False:
# remove failed
self.log.printText(1, 'join request, remove plugin with id ' + str(dplugin.id) + 'failed')
return -1
else:
# remove from DCore in core successful
# Check whether the GUI is still alive to ensure that no new events are put into a queue to a non-existing GUI process
if self.gui_alive is True:
# GUI is still alive, so tell GUI, that this plugin was closed
opt = DOptionalData()
opt.plugin_id = dplugin.id
event = Event.status.PluginClosed(self.core_id, self.gui_id, opt)
self.gui_event_queue.put(event)
self.log.printText(1, 'join_request, plugin with id ' + str(
dplugin.id) + ' was joined (uname: ' + dplugin.uname + ' )')
return ERROR.NO_ERROR
else:
# Plug does not exist
self.log.printText(1, 'join_request, Event with id ' + str(
event.get_originID()) + ' but plugin does not exist')
return -1
# ------- Event processing second stage: data events ---------
[docs] def __process_new_data__(self, event):
"""
Process new_data event from plugins.
Will do the routing: Subscriber/Subscription
This method is doing all the magic in PaPI.
It routes all data to its target and decides whether subscription are for data or parameters.
First, it is checked if the GUI is alive, since no data shall be processed if the GUI is already closed.
Then, the dplugin object of the origin of the data event is searched. Then, the block of the new data is searched.
With the block object of the origin plugin, the subscriber list can be extracted. Then, for every subscriber of the
block (which has new data available), the plugin object is searched. If a subscriber plugin is running (not dead/stopped),
the data event is routed to it. For subscriber running in the GUI, just one event is sent with multiple receipts.
Data events which represent parameter subscriptions are handles separately.
:param event: event to process
:type event: PapiEventBase
:type tar_plug: DPlugin
"""
# just proceed with new_data events if GUI is still alive (indicates that program will close)
if self.gui_alive:
# get event origin and optional parameter
oID = event.get_originID()
opt = event.get_optional_parameter()
# get origin plugin from DCore
dplug = self.core_data.get_dplugin_by_id(oID)
# check for existence
if dplug is not None:
# get data block of DPlugin with block_name from event
block = dplug.get_dblock_by_name(opt.block_name)
# check for existence of block with block_name
if block is not None:
# get subscriber list of block
subscriber = block.get_subscribers()
# id list dummy for event destination id list
id_list = []
# for all subscriber in subscriber list
for sub_id in subscriber:
# get plugin with sub_id and check for existence
pl = self.core_data.get_dplugin_by_id(sub_id)
if pl is not None:
if pl.state != PLUGIN_STATE_PAUSE or pl.state != PLUGIN_STATE_STOPPED:
# plugin exists, check whether it is a ViP
if pl.type == PLUGIN_VIP_IDENTIFIER:
# Because its a ViP, we need a list of destination ID for new_data
id_list.append(pl.id)
else:
# Plugin is not running in GUI, so just 1:1 relation for event and destinations
opt.parameter_alias = pl.get_subscribtions()[oID][opt.block_name].alias
new_event = Event.data.NewData(oID, [pl.id], opt, source_plugin_uname= dplug.uname)
pl.queue.put(new_event)
else:
# plugin is paused
pass
else:
# pluign with sub_id does not exist in DCore of core
self.log.printText(1, 'new_data, subscriber plugin with id ' + str(
sub_id) + ' does not exists')
return -1
# check if our list with id is longer than 0, which will indicate that there is at least one ViP
# which will need to get this new data event
if len(id_list) > 0:
# send new_data event to GUI with id_list of destinations
opt = event.get_optional_parameter()
opt.parameter_alias = pl.get_subscribtions()[oID][opt.block_name].alias
new_event = Event.data.NewData(oID, id_list, opt, source_plugin_uname= dplug.uname)
self.gui_event_queue.put(new_event)
# this event will be a new parameter value for a plugin, so update dcore data
if opt.is_parameter is True:
try:
self.handle_parameter_change(pl, opt.parameter_alias, opt.data)
except:
self.log.printText(2, 'pl not assigned (Use of pcp parameter which isn\'t used)')
# process new_data seemed correct
return 1
else:
# block is None
self.log.printText(1, 'new_data, block with name ' + opt.block_name + ' does not exists')
return -1
else:
# Plugin of event origin does not exist in DCore of core
self.log.printText(1, 'new_data, Plugin with id ' + str(oID) + ' does not exist in DCore')
return -1
def __process_edit_dplugin(self, event):
"""
Process edit_dplugin event from gui.
TODO: SVEN
:param event: event to process
:type event: PapiEventBaseBase
:type tar_plug: DPlugin
"""
eObject = event.editedObject
cRequest = event.changeRequest
dID = event.get_destinatioID()
pl = self.core_data.get_dplugin_by_id(dID)
if pl is not None:
if pl.state != PLUGIN_STATE_START_SUCCESFUL:
self.core_delayed_operation_queue.append(event)
self.log.printText(2, 'edit_dplugin, event was placed in delayed queue because plugin with name: ' +
pl.uname + ' is still starting')
return 0
# DBlock of DPlugin should be edited
if isinstance(eObject, DBlock):
dblock = pl.get_dblock_by_name(eObject.name)
if dblock is not None:
if "edit" in cRequest:
cObject = cRequest["edit"]
# Signal should be modified in DBlock
if isinstance(cObject, DSignal):
dsignal = dblock.get_signal_by_uname(cObject.uname)
dsignal.dname = cObject.dname
self.log.printText(3,
'edit_dplugin, Edited Dblock ' + dblock.name + ' of DPlugin ' + pl.uname +
" : DSignal " + dsignal.uname + " to dname -> " + dsignal.dname)
self.update_meta_data_to_gui(pl.id, True)
[docs] def __process_edit_dplugin_by_uname__(self, event):
"""
This function processes edit plugin events and acts like a wrapper for the '__process_edit_dplugin' function which
requires plugin ids instead of unames.
Documentation of what the method does can be found in the method: __process_edit_dplugin
:param event:
:return:
"""
# Get dplugin object by uname out of core queue.
dplugin = self.core_data.get_dplugin_by_uname(event.plugin_uname)
# Check if plugin object exists
if dplugin is not None:
pl_id = dplugin.id
# create a new event with the corresponding id instead of the uname
idEvent = Event.data.EditDPlugin(self.gui_id, pl_id, event.editedObject, event.changeRequest)
# pass the event to the function which is wrapped here
self.__process_edit_dplugin(idEvent)
else:
self.log.printText(1, " Do edit plugin with uname " + event.plugin_uname + ' failed')
# ------- Event processing second stage: instr events ---------
[docs] def __process_create_plugin__(self, event):
"""
Processes create_plugin event.
So it will create a plugin, start a process if needed, send events to GUI to create a plugin and do the pre configuration
This method reacts to the create plugin event. Such an event, is mostly sent by the GUI.
The method extracts all the necessary plugin properties from the event.
Then, the yapsy plugin loader tries to find the plugin via its identifier. Afterwards, the PaPI dplugin object
is created and filled with all information. It is decided if the plugin runs in the GUI or in an own process.
Based on this, a new process is created or the plugin is redirected to the GUI process.
The startup configurations are merged and passed to the plugin.
:param event: event to process
:param optData: optional Data Object of event
:type event: PapiEventBase
:type optData: DOptionalData
:return: -1: Error
"""
# get optData to get information about which plugin to start
optData = event.get_optional_parameter()
self.log.printText(2,
'create_plugin, Try to create plugin with Name ' + optData.plugin_identifier
+ " and UName " + optData.plugin_uname)
# search yapsy plugin object with plugin_idientifier and check if it exists
plugin = self.plugin_manager.getPluginByName(optData.plugin_identifier)
if plugin is None:
# Plugin does not exist in yapsy manger, maybe it is new
# collect plugin information from file system again and recheck
self.plugin_manager.collectPlugins()
plugin = self.plugin_manager.getPluginByName(optData.plugin_identifier)
if plugin is None:
self.log.printText(1,
'create_plugin, Plugin with Name ' + optData.plugin_identifier
+ ' does not exist in file system')
return -1
# creates a new plugin id because plugin exsits
plugin_id = self.core_data.create_id()
# checks if plugin is of not of type ViP, because these two will run in GUI process
if plugin.plugin_object._get_type() != PLUGIN_VIP_IDENTIFIER:
# So plugin will not run in GUI
# it will need an own process and queue to function
# create Queue for plugin process
plugin_queue = Queue()
# decide if plugin will need to get Data from another plugin
if plugin.plugin_object._get_type() == PLUGIN_DPP_IDENTIFIER:
# plugin will get data from another, so make its execution triggered by events
eventTriggered = True
else:
# stand alone plugin, so it will do its permanent execute loop
eventTriggered = False
plugin_config = optData.plugin_config
if plugin_config is None or plugin_config == {}:
plugin_config = plugin.plugin_object._get_startup_configuration()
else:
plugin_config = dict(list(plugin.plugin_object._get_startup_configuration().items())+list(plugin_config.items()))
# create Process object for new plugin
# set parameter for work function of plugin, such as queues, id and eventTriggered
PluginProcess = Process(target=plugin.plugin_object._work_process, \
args=(self.core_event_queue, plugin_queue, plugin_id, eventTriggered, \
plugin_config, optData.autostart))
PluginProcess.start()
#Add new Plugin process to DCore
dplug = self.core_data.add_plugin(PluginProcess, PluginProcess.pid, True, plugin_queue, plugin, plugin_id)
dplug.plugin_identifier = plugin.name
dplug.uname = optData.plugin_uname
dplug.type = plugin.plugin_object._get_type()
dplug.alive_count = self.alive_count
dplug.startup_config = plugin_config
dplug.path = plugin.path
# change some attributes of optional data before sending it back to GUI for local creation
optData.plugin_identifier = plugin.name
optData.plugin_id = plugin_id
optData.plugin_type = dplug.type
# send create_plugin event to GUI for local creation, now with new information like id and type
event = Event.instruction.CreatePlugin(0, self.gui_id, optData)
self.gui_event_queue.put(event)
return 1
else:
# Plugin will run in GUI, thats why core does not need to create a new process
# Adding plugin information to DCore of core for organisation
dplug = self.core_data.add_plugin(self.gui_process, self.gui_process.pid, False, self.gui_event_queue,
plugin, plugin_id)
dplug.uname = optData.plugin_uname
dplug.type = plugin.plugin_object._get_type()
dplug.plugin_identifier = plugin.name
dplug.startup_config = optData.plugin_config
dplug.path = plugin.path
# change some attributes of optional data before sending it back to GUI for local creation
optData.plugin_identifier = plugin.name
optData.plugin_id = plugin_id
optData.plugin_type = dplug.type
# send create_plugin event to GUI for local creation, now with new information like id and type
event = Event.instruction.CreatePlugin(0, self.gui_id, optData)
self.gui_event_queue.put(event)
self.log.printText(2, 'core sent create event to gui for plugin: ' + str(optData.plugin_uname))
return 1
[docs] def __process_stop_plugin__(self, event):
"""
Process stop_plugin event.
Will send an event to destination plugin to close itself. Will lead to a join request of this plugin.
A stop plugin event is sent by the GUI to the core to request the core to stop a plugin.
'Stop' means that: either the plugin is just stopped and not deleted or the plugin execution is stopped and the
is deleted from the database (i.e. it is cleaned). It is determines by a flag which of them is used.
The methods searches for the dplugin object in the core database.
If the plugin runs in an own process, the stop event is redirected to the corresponding plugin
(to notify it to close itself). The core will cut all subscriptions&subscribers loose and update the gui data with
the new plugin state.
If the plugin is not running in an own process (runs in GUI), the event is checked for the delete flag.
If the delete flag is set, the method will send a close request to the plugin.
If not, the plugin will receive a customized stop event. The state will be modified and updated to the GUI.
:param event: event to process
:type event: PapiEventBase
:type dplugin: DPlugin
"""
# get destination id
id = event.get_destinatioID()
# get DPlugin object and check if plugin exists
dplugin = self.core_data.get_dplugin_by_id(id)
if dplugin is not None:
# dplugin exist, get queue and route event to plugin
if dplugin.own_process is True:
# dplugin is not running in gui
# tell plugin to stop and update state and GUI
dplugin.queue.put(event)
dplugin.state = PLUGIN_STATE_STOPPED
self.core_data.unsubscribe_all(dplugin.id)
self.core_data.rm_all_subscribers(dplugin.id)
self.update_meta_data_to_gui_for_all()
else:
# plugin is not running in own process, so process the event completely
if event.delete is True:
# tell gui to close plugin
opt = DOptionalData()
opt.plugin_id = id
dplugin.queue.put(Event.status.PluginClosed(self.core_id, self.gui_id, opt))
# remove plugin from DCore
if self.core_data.rm_dplugin(id) != ERROR.NO_ERROR:
self.log.printText(1, 'stop plugin, unable to remove plugin von core_data')
return ERROR.UNKNOWN_ERROR
else:
# Do not delete the plugin from the database, thus just send a stop event to plugin without delete flag.
dplugin.queue.put(Event.instruction.StopPlugin(self.core_id, id, None, delete=False))
# update plugin state and gui meta database
dplugin.state = PLUGIN_STATE_STOPPED
self.core_data.unsubscribe_all(dplugin.id)
self.core_data.rm_all_subscribers(dplugin.id)
self.update_meta_data_to_gui_for_all()
return ERROR.NO_ERROR
else:
# DPlugin does not exist
self.log.printText(1, 'stop_plugin, plugin with id ' + str(id) + ' not found')
return ERROR.UNKNOWN_ERROR
[docs] def __process_stop_plugin_by_uname__(self, event):
"""
Just a wrapper function for '__process_stop_plugin__' to use the uname instead of a plugin id.
:param event:
:return:
"""
dplugin = self.core_data.get_dplugin_by_uname(event.plugin_uname)
# check if plugin exists in core database
if dplugin is not None:
# get plugin id, rebuild event and call original function
id = dplugin.id
idEvent = Event.instruction.StopPlugin(self.gui_id, id, None)
self.__process_stop_plugin__(idEvent)
[docs] def __process_start_plugin__(self, event):
"""
Processes start event for a plugin.
The start plugin event is a request for the core to send a start event to a plugin.
The plugin will be started by receiving the event, i.e. it will call its initialize function and start its
execution afterwards.
The methods searches the plugin in the database and checks wheter it has its own process or not.
Then, it will send an event to the corresponding queue to tell the plugin it should start.
:param event: Incomming event
:type event: PapiEventBase
:return:
"""
# get destination id
id = event.get_destinatioID()
# get DPlugin object and check if plugin exists
dplugin = self.core_data.get_dplugin_by_id(id)
if dplugin is not None:
# dplugin exist, get queue and route event to plugin
if dplugin.own_process is True:
# dplugin is not running in gui
# tell plugin to quit
dplugin.queue.put(event)
else:
dplugin.state = PLUGIN_STATE_START_SUCCESFUL
self.gui_event_queue.put(event)
self.update_meta_data_to_gui(id)
[docs] def __process_plugin_stopped__(self, event):
"""
Process plugin_stopped event.
Will change plugins state and delete its parameters and blocks.
Will update meta to gui
The event is the result of a stop request of a plugin. A plugin which is request to stop its operation (no deletion)
sends the plugin stopped event to confirm that it has stopped its operation.
The method searches the plugin in the database and updates its state. Additionally, all subscribers, blocks and
parameter are removed/delete.
:param event: Incomming event
:type event: PapiEventBase
:return:
"""
pl_id = event.get_originID()
plugin = self.core_data.get_dplugin_by_id(pl_id)
# check if plugin exists
if plugin is not None:
# update state
plugin.state = PLUGIN_STATE_STOPPED
# remove all subscribers before removing the blocks
self.core_data.rm_all_subscribers(pl_id)
# delete all blocks of plugin
blocks = plugin.get_dblocks()
block_to_delete = []
for block_key in blocks:
block = blocks[block_key]
block_to_delete.append(block)
for b in block_to_delete:
plugin.rm_dblock(b)
# delete all parameters of plugin
paras = plugin.get_parameters()
paras_to_delete = []
for key in paras:
para = paras[key]
paras_to_delete.append(para)
for p in paras_to_delete:
plugin.rm_parameter(p)
# update to gui
self.update_meta_data_to_gui(pl_id)
[docs] def __process_close_programm__(self, event):
"""
This functions processes a close_program event from GUI and
sends events to all processes to close themselves
The GUI can request the close to quit PaPI.
The method will request all other processes to stop their execution and to close.
:param event: event to process
:type event: PapiEventBase
"""
# GUI wants to close, so join process
if self.gui_alive is True and self.is_parent is True:
self.gui_process.join()
# Set gui_alive to false for core loop to know that is it closed
self.gui_alive = False
# get a list of all running plugIns
all_plugins = self.core_data.get_all_plugins()
# iterate through all plugins to send an event to tell them to quit and
# response with a join_request
toDelete = []
for dplugin_key in all_plugins:
dplugin = all_plugins[dplugin_key]
# just send event to plugins running in own process
if dplugin.own_process:
event = Event.instruction.StopPlugin(0, dplugin.id, None)
dplugin.queue.put(event)
else:
toDelete.append(dplugin.id)
for dplugin_ID in toDelete:
self.core_data.rm_dplugin(dplugin_ID)
[docs] def __process_subscribe__(self, event):
"""
Process subscribe_event.
The event is sent to create new data connections between plugins.
The method will call the functional method 'new_subscription' will the necessary arguments,
like SourceID, Block, Signals, Alias
:param event: event to process
:type event: PapiEventBase
:type dplugin_sub: DPlugin
:type dplugin_source: DPlugin
"""
# get event origin id and optional parameters
opt = event.get_optional_parameter()
self.new_subscription(event.get_originID(), opt.source_ID, opt.block_name, opt.signals, opt.subscription_alias,
orginal_event=event)
[docs] def __process_subscribe_by_uname__(self, event):
"""
Process subscribe_event. Uses uname instead of IDs
The event is sent to create new data connections between plugins.
The method will call the functional method 'new_subscription' will the necessary arguments,
like SourceID, Block, Signals, Alias
:param event: event to process
:type event: PapiEventBase
:type dplugin_sub: DPlugin
:type dplugin_source: DPlugin
"""
pl = self.core_data.get_dplugin_by_uname(event.subscriber_uname)
if pl is not None:
subscriber_id = pl.id
else:
# plugin with uname does not exist
self.log.printText(1, 'do_subscribe, sub uname worng')
return -1
pl = self.core_data.get_dplugin_by_uname(event.source_uname)
if pl is not None:
source_id = pl.id
else:
# plugin with uname does not exist
self.log.printText(1, 'do_subscribe, target uname wrong')
return -1
self.new_subscription(subscriber_id, source_id, event.block_name, event.signals, event.sub_alias,
orginal_event=event)
[docs] def __process_unsubsribe__(self, event):
"""
Process unsubscribe_event. Will try to remove a subscription from DCore
:param event: event to process
:type event: PapiEventBase
:type dplugin_sub: DPlugin
:type dplugin_source: DPlugin
"""
# get event origin id and optional parameters
opt = event.get_optional_parameter()
oID = event.get_originID()
if opt.signals == []:
# try to unsubscribe
if self.core_data.unsubscribe(oID, opt.source_ID, opt.block_name) is False:
# unsubscribe failed
self.log.printText(1,
'unsubscribe, something failed in unsubsription process with subscriber id: ' + str(
oID) + '..target id:' + str(opt.source_ID) + '..and block ' + str(
opt.block_name))
return -1
else:
if self.core_data.unsubscribe_signals(oID, opt.source_ID, opt.block_name, opt.signals) is False:
return -1
# unsubscribe correct
self.log.printText(1,
'unsubscribe, unsubscribtion correct: ' + str(oID) + '->(' + str(opt.source_ID) + ',' + str(
opt.block_name) + ')')
self.update_meta_data_to_gui(oID)
self.update_meta_data_to_gui(opt.source_ID)
[docs] def __process_set_parameter__(self, event):
"""
Process set_parameter event. Core will just route this event from GUI to destination plugin and update DCore
The set parameter event is sent from plugins to some plugins to trigger parameter changes.
First, the method checks whether the event is an event by uname or by id. When the event is by uname, it is modified
to be by id. If the plugin which the event refers to is not started yet, the event is put in the
delayed operation queue. Then, the correct dplugin object and an event by id is available. The dplugin object is
searched for the parameter which is changed. If the parameter exists, its value in the database is changed and
then, the change event is redirected to the plugin itself.
:param event: event to process
:type event: PapiEventBase
:type dplugin_sub: DPlugin
:type dplugin_source: DPlugin
"""
# check if event is by uname or by id
if isinstance(event, Event.instruction.SetParameterByUname):
dplugin = self.core_data.get_dplugin_by_uname(event.plugin_uname)
if dplugin is not None:
if dplugin.state != PLUGIN_STATE_START_SUCCESFUL:
# plugin not here yet, put event in delayed queue
self.core_delayed_operation_queue.append(event)
self.log.printText(2, 'setParameterByUname, event was placed in delayed queue because plugin: '
+ event.plugin_uname + ' does not exist yet.')
return 0
# build forward event for gui with ids instead of uname (original event was with uname)
forward_event = Event.instruction.SetParameter(event.get_originID(), dplugin.id, event.get_optional_parameter())
else:
# Event is by id, so get destination id
pl_id = event.get_destinatioID()
# get DPlugin object of destination id from DCore and check for existence
dplugin = self.core_data.get_dplugin_by_id(pl_id)
# set forward event to event, since there is no change required (event already by id)
forward_event = event
if dplugin is None:
# plugin with id does not exist, so id is wrong.
self.log.printText(1, 'set_paramenter, plugin with id ' + str(pl_id) + ' not found')
return -1
# Now, the forward_event has the correct event with ids to process it.
# recheck for existing dplugin
if dplugin is not None:
# Plugin exists
# get parameter list of plugin [hash]
parameters = dplugin.get_parameters()
opt = event.get_optional_parameter()
# check for existence of the changed parameter
if opt.parameter_alias in parameters:
para = parameters[opt.parameter_alias]
para.value = opt.data
# route the event to the destination plugin queue
dplugin.queue.put(forward_event)
# change event type for plugin
#update GUI
self.update_meta_data_to_gui(dplugin.id)
return 1
[docs] def __process_new_block__(self, event):
"""
Processes new_block event.
Will try to add a new data block to a DPlugin object
This event is a request from a plugin to create a new block.
The method receives all block information via the event. It searches the corresponding pluign and add the blocks
:param event: event to process
:type event: PapiEventBase
:type dplugin_sub: DPlugin
:type dplugin_source: DPlugin
"""
# get event origin id and optional parameter
opt = event.get_optional_parameter()
pl_id = event.get_originID()
# get DPlugin object with origin id of event to add new parameter to THIS DPlugin
dplugin = self.core_data.get_dplugin_by_id(pl_id)
# check for existence
if dplugin is not None:
# dplugin exists, so add blocks to DPlugin
for b in opt.block_list:
dplugin.add_dblock(b)
# update meta information of GUI after new blocks were added
self.update_meta_data_to_gui(pl_id)
self.check_events_in_delayed_queue()
return 1
else:
# plugin does not exist
self.log.printText(1, 'new_block, plugin with id ' + str(pl_id) + ' not found')
return -1
[docs] def __process_delete_block__(self, event):
"""
Processes delete_block event.
The event is sent by a plugin if the plugin wants to delete one of its own blocks (plugin updates itself).
The methods searches in the dplugin object for the block and deletes it. Before deletion, all block
subscriptions are cut loose to ensure that no dead links are created.
:param event: event to process
:type event: PapiEventBase
"""
pl_id = event.get_originID()
# cut loose all subscription for the block
self.core_data.rm_all_subscribers_of_a_dblock(pl_id, event.blockname)
# get plugin object
plugin = self.core_data.get_dplugin_by_id(pl_id)
if plugin is not None:
# get block and remove it
dblock = plugin.get_dblock_by_name(event.blockname)
plugin.rm_dblock(dblock)
# send update to gui to propagate the change
self.update_meta_data_to_gui_for_all()
[docs] def __process_delete_parameter__(self, event):
"""
Processes delete parameter event.
The event is sent by a plugin if the plugin wants to delete one of its own parameter (plugin updates itself).
The methods searches in the dplugin object for the parameter and deletes it. Before deletion, all parameter
subscriptions are cut loose to ensure that no dead links are created.
:param event: event to process
:type event: PapiEventBase
:return:
"""
# Get id of plugin which parameter should be changed
pl_id = event.get_originID()
# Search in database for plugin object
dplugin = self.core_data.get_dplugin_by_id(pl_id)
if dplugin is not None:
# get connections of this dplugin (copy because changes in iteration are possible)
subscriptions = copy.deepcopy(dplugin.get_subscribtions())
# iterate over all source plugins by id in subscription dict
for source_id in subscriptions:
# iterate over all blocks
for blockName in subscriptions[source_id]:
dSubObject = subscriptions[source_id][blockName]
# search for parameter to delete in subscription
if dSubObject.alias == event.parameterName:
self.core_data.unsubscribe(pl_id, source_id, blockName)
# delete parameter in dplugin
paraObject = dplugin.get_parameters()[event.parameterName]
dplugin.rm_parameter(paraObject)
# send update to gui to propagate the change
self.update_meta_data_to_gui_for_all()
[docs] def __process_new_parameter__(self, event):
"""
Processes new parameter event. Adding a new parameter to DPluign in DCore and updating GUI information
:param event: event to process
:type event: PapiEventBase
:type dplugin_sub: DPlugin
:type dplugin_source: DPlugin
"""
# get event origin and optional parameter
opt = event.get_optional_parameter()
pl_id = event.get_originID()
# get DPlugin object of event origin id which is the plugin that wants to add parameter
dplugin = self.core_data.get_dplugin_by_id(pl_id)
if dplugin is not None:
# Plugin exists so loop through parameter list to add all parameter
for p in opt.parameter_list:
# add parameter to DPlugin
dplugin.add_parameter(p)
# update meta of GUI to introduce new parameter to user
self.update_meta_data_to_gui(pl_id)
return 1
else:
# plugin does not exist
self.log.printText(1, 'new_parameter, plugin with id ' + str(pl_id) + ' not found')
return -1
[docs] def __process_pause_plugin__(self, event):
"""
Processes pause_plugin event. Will add information that a plugin is paused and send event to plugin to pause it.
Pause events are request from the GUI to pause a plugin. Paused plugins will not execute its execution function and
thus, lower the computational load.
:param event: event to process
:type event: PapiEventBase
:type dplugin: DPlugin
"""
pl_id = event.get_destinatioID()
dplugin = self.core_data.get_dplugin_by_id(pl_id)
if dplugin is not None:
if dplugin.state != PLUGIN_STATE_PAUSE:
# set pause info
dplugin.state = PLUGIN_STATE_PAUSE
# send event to plugin
event = Event.instruction.PausePlugin(self.core_id, pl_id, None)
dplugin.queue.put(event)
# update meta for Gui
self.update_meta_data_to_gui(pl_id)
[docs] def __process_resume_plugin__(self, event):
"""
Processes resume_plugin event. Will add information that a plugin is resumed and send event to plugin to resume it.
Resumes a paused plugin to make it work again. So, the plugin will execute its main functions again.
:param event: event to process
:type event: PapiEventBase
:type dplugin: DPlugin
"""
pl_id = event.get_destinatioID()
dplugin = self.core_data.get_dplugin_by_id(pl_id)
if dplugin is not None:
if dplugin.state == PLUGIN_STATE_PAUSE:
# set resume info
dplugin.state = PLUGIN_STATE_RESUMED
# send event to plugin
event = Event.instruction.ResumePlugin(self.core_id, pl_id, None)
dplugin.queue.put(event)
# update meta for Gui
self.update_meta_data_to_gui(pl_id)