Source code for papi.data.DCore

#!/usr/bin/python3
# -*- coding: latin-1 -*-

"""
Copyright (C) 2014 Technische Universität Berlin,
Fakultät IV - Elektrotechnik und Informatik,
Fachgebiet Regelungssysteme,
Einsteinufer 17, D-10587 Berlin, Germany

This file is part of PaPI.

PaPI is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

PaPI is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with PaPI.  If not, see <http://www.gnu.org/licenses/>.

Contributors
Sven Knuth
"""
from papi.data.DPlugin import DPlugin
from papi.data.DObject import DObject
from papi.ConsoleLog import ConsoleLog
import copy
import papi.error_codes as ERROR
from papi.constants import DCORE_CONSOLE_LOG_LEVEL



[docs]class DCore(): """ DCore contains and manages the internal data structure """ def __init__(self): """ Used to initialize this object. An empty dictionary of DPlugin is created :return: """ self.__DPlugins = {} self.__newid = 0 self.log = ConsoleLog(DCORE_CONSOLE_LOG_LEVEL, "DCore: ")
[docs] def create_id(self): """ Creates and returns unique IDs :returns: unique ID :rtype: int """ #self.__newid += 1 self.__newid = DObject.create_unique_id() return self.__newid # return uuid.uuid4().int >> 64
[docs] def add_plugin(self, process, pid, own_process, queue, plugin, id): """ Add plugin with necessary information. :param process: Plugin is running in this process :param pid: Process ID of the process in which the plugin is running :param queue: Event queue needed for events which should be received by this plugin :param plugin: Plugin object :param plugin_id: ID of this plugin :param id: ID for the new DPlugin :return: Returns the data object DPlugin :rtype: DPlugin """ d_pl = DPlugin() d_pl.process = process d_pl.pid = pid d_pl.queue = queue d_pl.plugin = plugin d_pl.id = id d_pl.own_process = own_process self.__DPlugins[id] = d_pl return d_pl
[docs] def rm_dplugin(self, dplugin_id): """ Removes DPlugin with dplugin_id :param dplugin_id: :return: :rtype: bool """ if dplugin_id in self.__DPlugins: self.__DPlugins[dplugin_id].state = 'deleted' self.rm_all_subscribers(dplugin_id) self.unsubscribe_all(dplugin_id) del self.__DPlugins[dplugin_id] return ERROR.NO_ERROR else: return ERROR.UNKNOWN_ERROR
[docs] def get_dplugins_count(self): """ Returns count of known plugins in this data structure :return: :rtype: int """ return len(self.__DPlugins.keys())
[docs] def get_dplugin_by_id(self, plugin_id): """ Returns DPlugin object by ID :param plugin_id: ID of an DPlugin object :return DPlugin: :rtype: DPlugin """ if plugin_id in self.__DPlugins: return self.__DPlugins[plugin_id] else: return None
[docs] def get_dplugin_by_uname(self, plugin_uname): """ Returns DPlugin object by uname :param plugin_name: uname of an DPlugin object :return DPlugin: :rtype: DPlugin """ for plugin_id in self.__DPlugins: d_pl = self.__DPlugins[plugin_id] if d_pl.uname == plugin_uname: return d_pl return None
[docs] def get_all_plugins(self): """ Returns a dictionary of all known dplugins :return: :rtype: {} """ return self.__DPlugins
[docs] def subscribe(self, subscriber_id, target_id, dblock_name): """ Used to create a subscription. :param subscriber_id: DPlugin which likes to subscribes dblock :param target_id: DPlugin which contains the dblock for subscribtion :param dblock_name: DBlock identified by its unique name for subscribtion :return: """ #Get Subscriber DPlugin subscriber = self.get_dplugin_by_id(subscriber_id) if subscriber is None: self.log.printText(1, "Found no Subscriber with ID " + subscriber_id) return None #Get Target DPlugin target = self.get_dplugin_by_id(target_id) if target is None: self.log.printText(1, "Found no Target with ID " + str(target_id)) return None dblock = target.get_dblock_by_name(dblock_name) if dblock is None: self.log.printText(3, "Target " + target.uname + " has no DBlock " + dblock_name) return None #Create relation between DPlugin and DBlock dsubscription = subscriber.subscribe(dblock) if dsubscription is None: self.log.printText(1, "Subscriber " + str(subscriber_id) + " has already subscribed " + dblock_name) return None if dblock.add_subscribers(subscriber) is False: self.log.printText(1, "DBlock " + dblock_name + " was already subscribed by Subscriber" + subscriber_id) return None return dsubscription
[docs] def unsubscribe(self, subscriber_id, target_id, dblock_name): """ Used to remove a subscription. :param subscriber_id: DPlugin which likes to unsubscribes dblock :param target_id: DPlugin which contains the dblock for subscribtion :param dblock_name: DBlock identified by its unique name for unsubscribtion :return: :rtype boolean: """ #Get Susbcriber DPlugin subscriber = self.get_dplugin_by_id(subscriber_id) if subscriber is None: return False #Get Target DPlugin target = self.get_dplugin_by_id(target_id) if target is None: return False dblock = target.get_dblock_by_name(dblock_name) if dblock is None: return False #Destroy relation between DPlugin and DBlock if subscriber.unsubscribe(dblock) is False: self.log.printText(1, "Subscriber " + str(subscriber_id) + " has already unsubscribed " + dblock_name) return False if dblock.rm_subscriber(subscriber) is False: self.log.printText(1, "DBlock " + dblock_name + " was already unsubscribed by Subscriber" + subscriber_id) return False return True
[docs] def unsubscribe_all(self, dplugin_id): """ This function is used to cancel all subscription of the DPlugin with the dplugin_id. :param dplugin_id: dplugin identifed by dplugin_id whose subscription should be canceled. :return: """ dplugin = self.get_dplugin_by_id(dplugin_id) # copy subscription for iteration and deletion subscribtion_ids = copy.deepcopy( dplugin.get_subscribtions() ) #Iterate over all DPlugins, which own a subscribed DBlock for sub_id in subscribtion_ids: sub = self.get_dplugin_by_id(sub_id) dblock_names = subscribtion_ids[sub_id] for dblock_name in dblock_names: dblock = sub.get_dblock_by_name(dblock_name) dblock.rm_subscriber(dplugin) dplugin.unsubscribe(dblock) if 0 == len(dplugin.get_subscribtions()): return True else: return False
[docs] def rm_all_subscribers(self, dplugin_id): """ This function is used to remove all subscribers of all DBlocks, which are hold by the DPlugin with the dplugin_id. :param dplugin_id: dplugin identifed by dplugin_id whose subscribers should be removed. :return: """ dplugin = self.get_dplugin_by_id(dplugin_id) dblock_names = dplugin.get_dblocks() for dblock_name in dblock_names: dblock = dplugin.get_dblock_by_name(dblock_name) copy_dplugin_ids = copy.deepcopy(dblock.get_subscribers()) for dplugin_id in copy_dplugin_ids: subscriber = self.get_dplugin_by_id(dplugin_id) subscriber.unsubscribe(dblock) dblock.rm_subscriber(subscriber) # if len(dplugin.get_dblocks()) == 0: # return True # else: # return False
[docs] def rm_all_subscribers_of_a_dblock(self, dplugin_id, dblock_name): dplugin = self.get_dplugin_by_id(dplugin_id) if dplugin is not None: dblock = dplugin.get_dblock_by_name(dblock_name) if dblock is not None: dplugin_ids = copy.deepcopy(dblock.get_subscribers()) for dplugin_id in dplugin_ids: subscriber = self.get_dplugin_by_id(dplugin_id) subscriber.unsubscribe(dblock) dblock.rm_subscriber(subscriber)
[docs] def subscribe_signals(self, subscriber_id, target_id, dblock_name, signals): """ This function is used to subscribe a bunch of signals. :param subscriber_id: DPlugin which likes to subscribes signals of the chosen dblock :param target_id: DPlugin which contains the dblock for subscribtion :param dblock_name: DBlock identified by its unique name for subscribtion :param signals: List of signals which are needed to be added :return: """ #Get Susbcriber DPlugin subscriber = self.get_dplugin_by_id(subscriber_id) if subscriber is None: self.log.printText(1, "Found no Subscriber with ID " + subscriber_id) return None #Get Target DPlugin target = self.get_dplugin_by_id(target_id) if target is None: self.log.printText(1, "Found no Target with ID " + str(target_id)) return None dblock = target.get_dblock_by_name(dblock_name) if dblock is None: self.log.printText(1, "Target " + target.uname + " has no DBlock " + dblock_name) return None return subscriber.subscribe_signals(dblock, signals)
[docs] def unsubscribe_signals(self, subscriber_id, target_id, dblock_name, signals): """ This function is used to unubscribe a bunch of signals. :param subscriber_id: DPlugin which likes to unsubscribes signals of the chosen dblock :param target_id: DPlugin which contains the dblock for subscribtion :param dblock_name: DBlock identified by its unique name for subscribtion :param signals: List of signals which are needed to be added :return: """ #Get Susbcriber DPlugin subscriber = self.get_dplugin_by_id(subscriber_id) if subscriber is None: self.log.printText(1, "Found no Subscriber with ID " + subscriber_id) return False #Get Target DPlugin target = self.get_dplugin_by_id(target_id) if target is None: self.log.printText(1, "Found no Target with ID " + str(target_id)) return False dblock = target.get_dblock_by_name(dblock_name) if dblock is None: self.log.printText(1, " Target " + target.uname + " has no DBlock " + dblock_name) return False subscription = subscriber.unsubscribe_signals(dblock, signals) if subscription is None: self.log.printText(1, " Subscription for target " + target.uname + " and DBlock " + dblock_name + " is None") return False if len(subscription.get_signals()) == 1: return self.unsubscribe(subscriber_id, target_id, dblock_name) return True

Links