Source code for skfuzzy.control.controlsystem

"""
controlsystem.py : Framework for the new fuzzy logic control system API.
"""
from __future__ import print_function, division

import numpy as np
import networkx as nx

from skfuzzy import interp_membership, interp_universe, defuzz
from .fuzzyvariable import FuzzyVariable
from .antecedent_consequent import Antecedent, Consequent
from .term import Term, WeightedTerm, TermAggregate
from .rule import Rule
from .visualization import ControlSystemVisualizer

try:
    from collections import OrderedDict
except ImportError:
    from .ordereddict import OrderedDict


[docs]class ControlSystem(object): """ Base class to contain a Fuzzy Control System. Parameters ---------- rules : Rule or iterable of Rules, optional If provided, the system is initialized and populated with a set of fuzzy Rules (see ``skfuzzy.control.Rule``). This is optional. If omitted the ControlSystem can be built interactively. """
[docs] def __init__(self, rules=None): """ Initialization method for the fuzzy ControlSystem object. """ + '\n'.join(ControlSystem.__doc__.split('\n')[1:]) self.graph = nx.DiGraph() self._rule_generator = RuleOrderGenerator(self) # Construct a system from provided rules, if given if rules is not None: if hasattr(rules, '__iter__'): for rule in rules: self.addrule(rule) else: try: self.addrule(rules) except: raise ValueError("Optional argument `rules` must be a " "FuzzyRule or iterable of FuzzyRules.")
@property def rules(self): """ Generator which yields Rules in the system in calculation order. """ # We have to expose the rules in the order from antecedents to # consequences. For example if we have: # Antecedent -> rule1 -> Intermediary -> rule2 -> Consequence # if we expose rule1 before rule2, we won't calculate correctly return self._rule_generator @property def antecedents(self): """Generator which yields Antecedents in the system.""" for node in self.graph.nodes(): if isinstance(node, Antecedent): yield node @property def consequents(self): """Generator which yields Consequents in the system.""" for node in self.graph.nodes(): if isinstance(node, Consequent): yield node @property def fuzzy_variables(self): """ Generator which yields fuzzy variables in the system. This includes Antecedents, Consequents, and Intermediaries. """ for node in self.graph.nodes(): if isinstance(node, FuzzyVariable): yield node
[docs] def addrule(self, rule): """ Add a new rule to the system. """ if not isinstance(rule, Rule): raise ValueError("Input rule must be a Rule object!") # Ensure no label duplication labels = [] for r in self.rules: if r.label in labels: raise ValueError("Input rule cannot have same label, '{0}', " "as any other rule.".format(r.label)) labels.append(r.label) # Combine the two graphs, which may not be disjoint self.graph = nx.compose(self.graph, rule.graph)
[docs] def view(self): """ View a representation of the system NetworkX graph. """ fig = ControlSystemVisualizer(self).view() fig.show()
class _InputAcceptor(object): """ Set a single input value to an Antecedent in this ControlSystemSimulation. """ def __init__(self, simulation): assert isinstance(simulation, ControlSystemSimulation) self.sim = simulation def __setitem__(self, key, value): # Find the antecedent we should set the input for matches = [n for n in self.sim.ctrl.graph.nodes() if isinstance(n, Antecedent) and n.label == key] if len(matches) == 0: raise ValueError("Unexpected input: " + key) assert len(matches) == 1 var = matches[0] if value > var.universe.max(): if self.sim.clip_to_bounds: value = var.universe.max() else: raise ValueError("Input value out of bounds. Max is %s" % max(var.universe)) if value < var.universe.min(): if self.sim.clip_to_bounds: value = var.universe.min() else: raise ValueError("Input value is out of bounds. Min is %s" % min(var.universe)) var.input['current'] = value self.sim._update_unique_id() self._update_to_current() def __repr__(self): """ Print a convenient string representation of all current input data. """ current_inputs = self._get_inputs() out = "" for key, val in current_inputs.items(): out += "{0} : {1}\n".format(key, val) return out def _update_to_current(self): # Private method, used to store the current state of the system in a # cache, 'current', accessible before and after the unique_id changes. if self.sim.unique_id == 'current': return # Find all antecedents matches = [n for n in self.sim.ctrl.graph.nodes() if isinstance(n, Antecedent)] for antecedent in matches: antecedent.input[self.sim] = antecedent.input['current'] def _get_inputs(self): """ Find and return all antecedent inputs available. """ antecedents = [n for n in self.sim.ctrl.graph.nodes() if isinstance(n, Antecedent)] inputs = OrderedDict() for antecedent in antecedents: try: inputs[antecedent.label] = antecedent.input['current'] except AttributeError: # No system ID yet, because no assigned values inputs[antecedent.label] = None return inputs
[docs]class ControlSystemSimulation(object): """ Calculate results from a ControlSystem. Parameters ---------- control_system : ControlSystem A fuzzy ControlSystem object. clip_to_bounds : bool, optional Controls if input values should be clipped to the consequent universe range. Default is True. cache : bool, optional Controls if results should be stored for reference in fuzzy variable objects, allowing fast lookup for repeated runs of `.compute()`. Unless you are heavily memory constrained leave this `True` (default). flush_after_run : int, optional Clears cached results after this many repeated, unique simulations. The default of 1000 is appropriate for most hardware, but for small embedded systems this can be lowered as appropriate. Higher memory systems may see better performance with a higher limit. """
[docs] def __init__(self, control_system, clip_to_bounds=True, cache=True, flush_after_run=1000): """ Initialize a new ControlSystemSimulation. """ + '\n'.join(ControlSystemSimulation.__doc__.split('\n')[1:]) assert isinstance(control_system, ControlSystem) self.ctrl = control_system self.input = _InputAcceptor(self) self.output = OrderedDict() self.cache = cache self.unique_id = self._update_unique_id() self.clip_to_bounds = clip_to_bounds self._calculated = [] self._run = 0 self._flush_after_run = flush_after_run
def _update_unique_id(self): """ Unique hash of this control system including a specific set of inputs. Generated at runtime from the system state. Used as key to access data from `StatePerSimulation` objects, enabling multiple runs. """ # The string to be hashed is the concatenation of: # * the control system ID, which is independent of inputs # * hash of the current input OrderedDict # Simple hashes and Python ids are fast and serve our purposes. self.unique_id = (str(id(self.ctrl)) + str(hash(self._get_inputs().__repr__()))) def _get_inputs(self): return self.input._get_inputs()
[docs] def inputs(self, input_dict): """ Convenience method to accept multiple inputs to antecedents. Parameters ---------- input_dict : dict Contains key:value pairs where the key is the label for a connected Antecedent and the value is the input. """ # Have to set this twice - first time to get the unique ID, # second time to actually pass them into the correct ID. for label, value in input_dict.items(): self.input[label] = value
[docs] def compute(self): """ Compute the fuzzy system. """ self.input._update_to_current() # Shortcut with lookup if this calculation was done before if self.cache is not False and self.unique_id in self._calculated: for consequent in self.ctrl.consequents: self.output[consequent.label] = consequent.output[self] return # If we get here, cache is disabled OR the inputs are novel. Compute! # Check if any fuzzy variables lack input values and fuzzify inputs for antecedent in self.ctrl.antecedents: if antecedent.input[self] is None: raise ValueError("All antecedents must have input values!") CrispValueCalculator(antecedent, self).fuzz(antecedent.input[self]) # Calculate rules, taking inputs and accumulating outputs first = True for rule in self.ctrl.rules: # Clear results of prior runs from Terms if needed. if first: for c in rule.consequent: c.term.membership_value[self] = None c.activation[self] = None first = False self.compute_rule(rule) # Collect the results and present them as a dict for consequent in self.ctrl.consequents: consequent.output[self] = \ CrispValueCalculator(consequent, self).defuzz() self.output[consequent.label] = consequent.output[self] # Make note of this run so we can easily find it again if self.cache is not False: self._calculated.append(self.unique_id) else: # Reset StatePerSimulations self._reset_simulation() # Increment run number self._run += 1 if self._run % self._flush_after_run == 0: self._reset_simulation()
[docs] def compute_rule(self, rule): """ Implement rule according to Mamdani inference. The three step method consists of:: * Aggregation * Activation * Accumulation """ # Step 1: Aggregation. This finds the net accomplishment of the # antecedent by AND-ing or OR-ing together all the membership values # of the terms that make up the accomplishment condition. # The process of actually aggregating everything is delegated to the # TermAggregation class, but we can tell that class # what aggregation style this rule mandates if isinstance(rule.antecedent, TermAggregate): rule.antecedent.agg_method = rule.aggregation_method rule.aggregate_firing[self] = rule.antecedent.membership_value[self] # Step 2: Activation. The degree of membership of the consequence # is determined by the degree of accomplishment of the antecedent, # which is what we determined in step 1. The only difference would # be if the consequent has a weight, which we would apply now. for c in rule.consequent: assert isinstance(c, WeightedTerm) c.activation[self] = rule.aggregate_firing[self] * c.weight # Step 3: Accumulation. Apply the activation to each consequent, # accumulating multiple rule firings into a single membership value. # The process of actual accumulation is delegated to the # Term which uses its parent's accumulation method for c in rule.consequent: assert isinstance(c, WeightedTerm) term = c.term value = c.activation[self] # Find new membership value if term.membership_value[self] is None: term.membership_value[self] = value else: # Use the accumulation method of variable to determine # how to to handle multiple cuts accu = term.parent.accumulation_method term.membership_value[self] = accu(value, term.membership_value[self]) term.cuts[self][rule.label] = term.membership_value[self]
def _reset_simulation(self): """ Clear temporary data from simulation objects. Called internally if cache=False (after every run) or after a certain number of runs if cache=True according to the `flush_after_run` kwarg. """ def _clear_terms(fuzzy_var): for term in fuzzy_var.terms.values(): term.membership_value.clear() term.cuts.clear() for rule in self.ctrl.rules: rule.aggregate_firing.clear() for c in rule.consequent: c.activation.clear() for consequent in self.ctrl.consequents: consequent.output.clear() _clear_terms(consequent) for antecedent in self.ctrl.antecedents: antecedent.input.clear() _clear_terms(antecedent) self._calculated = [] self._run = 0
[docs] def print_state(self): """ Print info about the inner workings of a ControlSystemSimulation. """ if self.ctrl.consequents.next().output[self] is None: raise ValueError("Call compute method first.") print("=============") print(" Antecedents ") print("=============") for v in self.ctrl.antecedents: print("{0:<35} = {1}".format(v, v.input[self])) for term in v.terms.values(): print(" - {0:<32}: {1}".format(term.label, term.membership_value[self])) print("") print("=======") print(" Rules ") print("=======") rule_number = {} for rn, r in enumerate(self.ctrl.rules): assert isinstance(r, Rule) rule_number[r] = "RULE #%d" % rn print("RULE #%d:\n %s\n" % (rn, r)) print(" Aggregation (IF-clause):") for term in r.antecedent_terms: assert isinstance(term, Term) print(" - {0:<55}: {1}".format(term.full_label, term.membership_value[self])) print(" {0:>54} = {1}".format(r.antecedent, r.aggregate_firing[self])) print(" Activation (THEN-clause):") for c in r.consequent: assert isinstance(c, WeightedTerm) print(" {0:>54} : {1}".format(c, c.activation[self])) print("") print("") print("==============================") print(" Intermediaries and Conquests ") print("==============================") for c in self.ctrl.consequents: print("{0:<36} = {1}".format( c, CrispValueCalculator(c, self).defuzz())) for term in c.terms.values(): print(" %s:" % term.label) for cut_rule, cut_value in term.cuts[self].items(): if cut_rule not in rule_number.keys(): continue print(" {0:>32} : {1}".format(rule_number[cut_rule], cut_value)) accu = "Accumulate using %s" % c.accumulation_method.func_name print(" {0:>32} : {1}".format(accu, term.membership_value[self])) print("")
class CrispValueCalculator(object): """ Convert a calculated FuzzyVariable back into a crisp real number. Parameters ---------- fuzzy_var : FuzzyVariable The fuzzy variable to be defuzzified. sim : ControlSystemSimulation The simulation which holds all necessary data for this calculation. """ def __init__(self, fuzzy_var, sim): """ Initialization method for CrispValueCalculator. """ + '\n'.join(CrispValueCalculator.__doc__.split('\n')[1:]) assert isinstance(fuzzy_var, FuzzyVariable) assert isinstance(sim, ControlSystemSimulation) self.var = fuzzy_var self.sim = sim def defuzz(self): """Derive crisp value based on membership of adjective(s).""" ups_universe, output_mf, cut_mfs = self.find_memberships() if len(cut_mfs) == 0: raise ValueError("No terms have memberships. Make sure you " "have at least one rule connected to this " "variable and have run the rules calculation.") try: return defuzz(ups_universe, output_mf, self.var.defuzzify_method) except AssertionError: raise ValueError("Crisp output cannot be calculated, likely " "because the system is too sparse. Check to " "make sure this set of input values will " "activate at least one connected Term in each " "Antecedent via the current set of Rules.") def fuzz(self, value): """ Propagate crisp value down to adjectives by calculating membership. """ if len(self.var.terms) == 0: raise ValueError("Set Term membership function(s) first") for label, term in self.var.terms.items(): term.membership_value[self.sim] = \ interp_membership(self.var.universe, term.mf, value) def find_memberships(self): # Check we have some adjectives if len(self.var.terms.keys()) == 0: raise ValueError("Set term membership function(s) first") ''' First we have to upsample the universe of self.var in order to add the key points of the membership function based on the activation level for this consequent, using the interp_universe function, which interpolates the `xx` values in the universe such that its membership function value is the activation level. ''' add_universe = set() for label, term in self.var.terms.items(): cut = term.membership_value[self.sim] if cut is None: continue # No membership defined for this adjective add_xx = interp_universe(self.var.universe, term.mf, cut) add_universe.update(add_xx) # We are only interested in points not in self.var.universe add_universe = add_universe - set(self.var.universe) # We want to sort the universe values and keep related their indices # to access to their mf values upsampled_universe = ( list(zip(self.var.universe.tolist() + list(add_universe), list(range(self.var.universe.size)) + [None] * len(add_universe)))) upsampled_universe.sort(key=lambda element: element[0]) upsampled_mf_indices = [element[1] for element in upsampled_universe] upsampled_universe = np.array([ element[0] for element in upsampled_universe]) # Initilize membership output_mf = np.zeros_like(upsampled_universe, dtype=np.float64) # Build output membership function term_mfs = {} for label, term in self.var.terms.items(): cut = term.membership_value[self.sim] if cut is None: continue # No membership defined for this adjective upsampled_mf = [] for i in range(len(upsampled_mf_indices)): if upsampled_mf_indices[i] is not None: upsampled_mf.append(term.mf[upsampled_mf_indices[i]]) else: upsampled_mf.append( interp_membership(self.var.universe, term.mf, upsampled_universe[i])) term_mfs[label] = np.minimum(cut, upsampled_mf) np.maximum(output_mf, term_mfs[label], output_mf) return upsampled_universe, output_mf, term_mfs class RuleOrderGenerator(object): """ Generator to yield rules in the correct order for calculation. Parameters ---------- control_system : ControlSystem Fuzzy control system object, instance of `skfuzzy.ControlSystem`. Returns ------- out : Rule Fuzzy rules in computation order. """ def __init__(self, control_system): """ Generator to yield rules in the correct order for calculation. """ + '\n'.join(RuleOrderGenerator.__doc__.split('\n')[1:6]) assert isinstance(control_system, ControlSystem) self.control_system = control_system self._cache = [] self._cached_graph = None def __iter__(self): """ Method to yield the fuzzy rules in order for computation. """ # Determine if we can return the cached version or must calc new if self._cached_graph is not self.control_system.graph: # The controller is still using a different version of the graph # than we created the rule order for. Thus, make new cache self._init_state() self._cache = list(self._process_rules(self.all_rules[:])) self._cached_graph = self.control_system.graph for n, r in enumerate(self._cache): yield r else: n = 0 if n == 0: pass else: assert n == len(self.all_rules) - 1, "Not all rules exposed" def _init_state(self): # This graph will represent what's been calculated so far. We # initialize it to just the antecedents as they, by definition, already # have fuzzy values self.calced_graph = nx.DiGraph() for a in self.control_system.antecedents: self.calced_graph.add_star([a, ] + list(a.terms.values())) self.all_graph = self.control_system.graph self.all_rules = [] for node in self.all_graph.nodes(): if isinstance(node, Rule): self.all_rules.append(node) def _process_rules(self, rules): # Recursive funcion to process rules in the correct firing order len_rules = len(rules) skipped_rules = [] while len(rules) > 0: rule = rules.pop(0) if self._can_calc_rule(rule): yield rule # Add rule to calced graph self.calced_graph = nx.compose(self.calced_graph, rule.graph) else: # We have not calculated the predecsors for this rule yet. # Skip it for now skipped_rules.append(rule) if len(skipped_rules) == 0: # All done! raise StopIteration() else: if len(skipped_rules) == len_rules: # Avoid being caught in an infinite loop raise RuntimeError("Unable to resolve rule execution order. " "The most likely reason is two or more " "rules that depend on each other.\n" "Please check the rule graph for loops.") else: # Recurse across the skipped rules for r in self._process_rules(skipped_rules): yield r def _can_calc_rule(self, rule): # Check that we've exposed all inputs to this rule by ensuring # the predecessor-degree of each predecessor node is the same # in both the calculation graph and overall graph for p in self.all_graph.predecessors_iter(rule): assert isinstance(p, Term) if p not in self.calced_graph: return False all_degree = len(self.all_graph.predecessors(p)) calced_degree = len(self.calced_graph.predecessors(p)) if all_degree != calced_degree: return False return True