Source code for papi.data.DPlugin
#!/usr/bin/python3
# -*- coding: latin-1 -*-
"""
Copyright (C) 2014 Technische Universität Berlin,
Fakultät IV - Elektrotechnik und Informatik,
Fachgebiet Regelungssysteme,
Einsteinufer 17, D-10587 Berlin, Germany
This file is part of PaPI.
PaPI is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PaPI is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PaPI. If not, see <http://www.gnu.org/licenses/>.
Contributors
Sven Knuth
"""
from papi.data import DParameter
from papi.constants import CORE_TIME_SIGNAL
from papi.data.DObject import DObject
from papi.data.DSignal import DSignal
import copy
[docs]class DBlock(DObject):
"""
DBlock is used for the internal description of a block.
Contains a bunch of signals of a the same data source.
"""
def __init__(self, name):
"""
A block is described by name, which has to be unique within the context of the plugin owning this block.
:param name: name of the block
:return:
"""
super(DObject, self).__init__()
self.subscribers = {}
self.dplugin_id = None
self.name = name
self.signals = []
self.add_signal(DSignal(CORE_TIME_SIGNAL))
[docs] def add_subscribers(self, dplugin):
"""
Add dplugin as subscriber for this dblock.
:param dplugin:
:return:
:rtype boolean:
"""
if dplugin.id not in self.subscribers:
self.subscribers[dplugin.id] = dplugin.id
return True
else:
return False
[docs] def rm_subscriber(self, dplugin):
"""
Remove dplugin as subscriber of this dblock.
:param dplugin:
:return:
:rtype boolean:
"""
if dplugin.id in self.subscribers:
del self.subscribers[dplugin.id]
return True
else:
return False
[docs] def add_signal(self, signal):
"""
Add Signal for this DBlock
:param signal:
:return:
"""
if signal not in self.signals:
self.signals.append(signal)
return True
return False
[docs] def rm_signal(self, signal):
"""
Remove Signal for this DBlock
:param signal:
:return:
"""
if signal in self.signals:
signal.uname = signal.uname + "_deleted"
signal.dname = signal.dname + "_deleted"
signal.remove()
self.signals.remove(signal)
return True
return False
[docs] def get_subscribers(self):
"""
Returns all subscribers of this plugin. Returns a copy that means
changes have no effect on the PaPI data structure.
:return:
:rtype []:
"""
return self.subscribers
[docs] def get_signal_by_uname(self, uname):
"""
Returns a signal object by a signal's uname.
:param uname:
:return DSignal:
"""
for signal in self.signals:
if signal.uname == uname:
return signal
return None
[docs] def get_signals(self):
"""
Returns a copy of the internal signal names
:return:
"""
return copy.deepcopy(self.signals)
[docs] def get_name(self):
"""
Returns the name of this block.
:return block_name:
"""
return self.name
[docs]class DEvent(DBlock):
def __init__(self, name):
super(DEvent, self).__init__(name)
[docs]class DPlugin(DObject):
"""
DPlugin is used for the internal description of a plugin.
"""
def __init__(self):
"""
Used to create the plugin data object.
:return:
"""
super(DPlugin, self).__init__()
self.process = None
self.pid = None
self.queue = None
self.plugin = None
self.plugin_identifier = None
self.startup_config = None
self.__subscriptions = {}
self.state = None
self.alive_state = None
self.paused = False
self.own_process = None
self.uname = None
#self.display_name = None
self.__parameters = {}
self.__blocks = {}
self.type = None
self.alive_count = 0
self.path = None
[docs] def subscribe_signals(self, dblock, signals):
"""
This function is used to subscribe a bunch of signals.
:param dblock:
:param signals:
:return:
:rtype DSubscription:
"""
if dblock.dplugin_id in self.__subscriptions:
if dblock.name in self.__subscriptions[dblock.dplugin_id]:
subscription = self.__subscriptions[dblock.dplugin_id][dblock.name]
for signal in signals:
subscription.add_signal(signal)
return subscription
else:
return None
else:
return None
[docs] def unsubscribe_signals(self, dblock, signals):
"""
This function is used to unsubscribe a bunch of signals.
:param dblock:
:param signals:
:return:
:rtype DSubscription:
"""
if dblock.dplugin_id in self.__subscriptions:
if dblock.name in self.__subscriptions[dblock.dplugin_id]:
subscription = self.__subscriptions[dblock.dplugin_id][dblock.name]
for signal in signals:
subscription.rm_signal(signal)
if 0 == len(subscription.get_signals()):
subscription.remove()
del self.__subscriptions[dblock.dplugin_id][dblock.name]
return subscription
else:
return None
else:
return None
[docs] def subscribe(self, dblock):
"""
This plugin subscribes a 'dblock' by remembering the dblock id.
:param dblock: DBlock which should be subscribed
:return:
:rtype boolean:
"""
if dblock.dplugin_id not in self.__subscriptions:
self.__subscriptions[dblock.dplugin_id] = {}
self.__subscriptions[dblock.dplugin_id][dblock.name] = DSubscription(dblock)
return self.__subscriptions[dblock.dplugin_id][dblock.name]
else:
if dblock.name not in self.__subscriptions[dblock.dplugin_id]:
self.__subscriptions[dblock.dplugin_id][dblock.name] = DSubscription(dblock)
return self.__subscriptions[dblock.dplugin_id][dblock.name]
else:
return None
return None
[docs] def unsubscribe(self, dblock):
"""
This plugin unsubscribes a 'dblock' by forgetting the dblog id.
:param dblock: DBlock which should be unsubscribed
:return:
:rtype boolean:
"""
if dblock.dplugin_id not in self.__subscriptions:
return False
else:
if dblock.name in self.__subscriptions[dblock.dplugin_id]:
self.__subscriptions[dblock.dplugin_id][dblock.name].remove()
del self.__subscriptions[dblock.dplugin_id][dblock.name]
if len(self.__subscriptions[dblock.dplugin_id]) is 0:
del self.__subscriptions[dblock.dplugin_id]
return True
else:
return False
return False
[docs] def get_subscribtions(self):
"""
Returns a reference to a dictionary of all subscribtions.
:return {}{} of DPlugin ids to DBlock names :
:rtype: {}{}
"""
return self.__subscriptions
[docs] def add_parameter(self, parameter):
"""
Used to add a parameter for this plugin.
Returns true if parameter doesn't existed and was added,
returns false if parameter already exists.
:param parameter:
:return:
:rtype boolean:
"""
if parameter.name not in self.__parameters:
self.__parameters[parameter.name] = parameter
return True
else:
return False
[docs] def rm_parameter(self, parameter):
"""
Used to remove a parameter for this plugin.
Returns true if parameter existed and was deleted,
returns false if parameter doesn't exist.
:param parameter: Parameter which should be removed.
:return:
:rtype boolean:
"""
if parameter.name in self.__parameters:
self.__parameters[parameter.name].remove()
del self.__parameters[parameter.name]
return True
else:
return False
[docs] def get_parameters(self):
"""
Returns a list of all parameters.
:return:
:rtype {}:
"""
return self.__parameters
[docs] def add_dblock(self, dblock):
"""
Used to add a block for this plugin.
Returns true if parameter doesn't existed and was deleted,
returns false if parameter already exists.
:param dblock: Block which should be added.
:return:
:rtype boolean:
"""
dblock.dplugin_id = self.id
if dblock.name not in self.__blocks:
self.__blocks[dblock.name] = dblock
return True
else:
return False
[docs] def rm_dblock(self, dblock):
"""
Used to remove a block for this plugin.
Returns true if block existed and was deleted,
returns false if block doesn't exist.
:param dblock:
:return:
:rtype boolean:
"""
if dblock.name in self.__blocks:
del self.__blocks[dblock.name]
dblock.name += "_deleted"
dblock.remove()
return True
else:
return False
[docs] def get_dblocks(self):
"""
Returns a list of all blocks.
:return:
:rtype {}:
"""
return self.__blocks
[docs] def get_devent(self):
devent_dict = {}
for dblock_name in self.__blocks:
block = self.__blocks[dblock_name]
if isinstance(block, DEvent):
devent_dict[dblock_name] = block
return devent_dict
[docs] def get_dblock_by_name(self, dblock_name):
"""
Returns a single block by its unique name of all parameters.
:param dblock_name: Uniqueder identifier for this block.
:return:
"""
if dblock_name in self.__blocks:
return self.__blocks[dblock_name]
else:
return None
[docs]class DSubscription(DObject):
"""
DSubscription is used for the internal description of a subscription.
"""
def __init__(self, dblock):
self.dblock = dblock
self.dblock_name = dblock.name
self.dplugin_id = dblock.dplugin_id
self.alias = None
self.signals = []
self.add_signal(CORE_TIME_SIGNAL)
[docs] def add_signal(self, signal):
"""
Add Signal for this Subscription
:param signal:
:return:
"""
if signal not in self.signals:
self.signals.append(signal)
return True
return False
[docs] def rm_signal(self, signal):
"""
Remove Signal for this Subscription
:param signal:
:return:
"""
if signal in self.signals:
self.signals.remove(signal)
return True
return False
[docs] def get_dblock(self):
"""
Returns the block whose signals are subscribed.
:return:
:rtype: DBlock
"""
return self.dblock
[docs] def get_signals(self):
"""
Returns a copy of all signals which are subscribed.
:return:
:rtype: []
"""
return self.signals
[docs] def attach_signal(self, signal):
raise NotImplementedError("Stop Using this function.")
[docs] def remove_signal(self, signal):
raise NotImplementedError("Stop Using this function.")