Source code for pm4py.simulation.tree_generator.variants.ptandloggenerator

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
from pm4py.objects.process_tree import obj
from pm4py.objects.process_tree import obj as pt_operator
from scipy.stats import triang as triangular
import string
import math
import itertools
from enum import Enum
from itertools import accumulate as _accumulate, repeat as _repeat
from bisect import bisect as _bisect
import random


def choices(population, weights=None, *, cum_weights=None, k=1):
    """Return a k sized list of population elements chosen with replacement.

    If neither relative weights nor cumulative weights are given, every
    element is equally likely to be selected.

    Parameters
    --------------
    population
        Indexable sequence to draw from
    weights
        Optional relative weights, one per population element
    cum_weights
        Optional cumulative weights, one per population element
        (mutually exclusive with ``weights``)
    k
        Number of elements to draw

    Returns
    --------------
    list
        The ``k`` selected elements

    Raises
    --------------
    TypeError
        If both ``weights`` and ``cum_weights`` are given
    ValueError
        If the number of weights differs from the population size
    """
    size = len(population)

    if weights is not None and cum_weights is not None:
        raise TypeError('Cannot specify both weights and cumulative weights')

    if weights is None and cum_weights is None:
        # Uniform selection; the size is converted to float once up front.
        scale = size + 0.0
        picked = []
        for _ in _repeat(None, k):
            picked.append(population[int(random.random() * scale)])
        return picked

    if cum_weights is None:
        cum_weights = list(_accumulate(weights))

    if len(cum_weights) != size:
        raise ValueError('The number of weights does not match the population')

    total = cum_weights[-1] + 0.0  # convert to float
    last_index = size - 1
    picked = []
    for _ in _repeat(None, k):
        threshold = random.random() * total
        picked.append(population[_bisect(cum_weights, threshold, 0, last_index)])
    return picked
class Parameters(Enum):
    """
    Keys of the parameters dictionary accepted by ``apply``.

    Each member's value is the plain string key under which the setting
    is stored once the parameters have been handed to ``GeneratedTree``.
    """
    # probabilities of adding the respective operator to the tree
    SEQUENCE = "sequence"
    CHOICE = "choice"
    PARALLEL = "parallel"
    LOOP = "loop"
    OR = "or"
    # most frequent / minimum / maximum number of visible activities
    MODE = "mode"
    MIN = "min"
    MAX = "max"
    # probability to add a silent activity to a choice or loop operator
    SILENT = "silent"
    # probability to duplicate an activity label
    DUPLICATE = "duplicate"
    # probability to add a random dependency to the tree
    LT_DEPENDENCY = "lt_dependency"
    # probability to make a choice have infrequent paths
    INFREQUENT = "infrequent"
    # number of trees to generate from model population
    NO_MODELS = "no_models"
    # whether or not to unfold loops (0=False, 1=True)
    UNFOLD = "unfold"
    # maximum number of repetitions of a loop (only used when unfolding)
    MAX_REPEAT = "max_repeat"
def apply(parameters=None):
    """
    Generate a process tree using the PTAndLogGenerator approach
    (see the paper PTandLogGenerator: A Generator for Artificial Event Data)

    Keys may be given either as ``Parameters`` members or as their plain
    string values; both spellings are honoured. The dictionary passed by
    the caller is not modified.

    Parameters
    --------------
    parameters
        Parameters of the algorithm, according to the paper:
        - Parameters.MODE: most frequent number of visible activities
        - Parameters.MIN: minimum number of visible activities
        - Parameters.MAX: maximum number of visible activities
        - Parameters.SEQUENCE: probability to add a sequence operator to tree
        - Parameters.CHOICE: probability to add a choice operator to tree
        - Parameters.PARALLEL: probability to add a parallel operator to tree
        - Parameters.LOOP: probability to add a loop operator to tree
        - Parameters.OR: probability to add an or operator to tree
        - Parameters.SILENT: probability to add silent activity to a choice or loop operator
        - Parameters.DUPLICATE: probability to duplicate an activity label
        - Parameters.LT_DEPENDENCY: probability to add a random dependency to the tree
        - Parameters.INFREQUENT: probability to make a choice have infrequent paths
        - Parameters.NO_MODELS: number of trees to generate from model population
        - Parameters.UNFOLD: whether or not to unfold loops in order to include choices underneath in dependencies: 0=False, 1=True
            if lt_dependency <= 0: this should always be 0 (False)
            if lt_dependency > 0: this can be 1 or 0 (True or False)
        - Parameters.MAX_REPEAT: maximum number of repetitions of a loop (only used when unfolding is True)

    Returns
    --------------
    tree
        The tree returned by ``GeneratedTree(...).generate()``
    """
    defaults = {
        "mode": 20,
        "min": 10,
        "max": 30,
        "sequence": 0.25,
        "choice": 0.25,
        "parallel": 0.25,
        "loop": 0.25,
        "or": 0.0,
        "silent": 0.2,
        "duplicate": 0,
        # previously stored under the misspelled key "lt_depencency"
        "lt_dependency": 0.0,
        "infrequent": 0.5,
        "no_models": 10,
        # NOTE(review): the documentation above describes unfold as 0/1;
        # the historical default of 10 is kept unchanged - confirm intent.
        "unfold": 10,
        "max_repeat": 10,
    }

    # Start from the defaults and overlay the caller's settings. Enum keys
    # are flattened to their string value first, so that a setting supplied
    # as Parameters.X replaces the default instead of being overridden by it.
    resolved = dict(defaults)
    if parameters:
        for key, value in parameters.items():
            name = key.value if isinstance(key, Enum) else key
            resolved[name] = value

    return GeneratedTree(resolved).generate()
def assign_operator(operator):
    """
    Translate an operator name used by the generator into the matching
    process tree operator.

    Parameters
    --------------
    operator
        One of "choice", "sequence", "parallel", "or", "loop"

    Returns
    --------------
    operator
        The corresponding member of ``pt_operator.Operator``
        ("choice" maps to XOR), or None for any other input
    """
    # (generator name, Operator member name), checked in this order
    name_to_member = (
        ("choice", "XOR"),
        ("sequence", "SEQUENCE"),
        ("parallel", "PARALLEL"),
        ("or", "OR"),
        ("loop", "LOOP"),
    )
    for name, member in name_to_member:
        if operator == name:
            return getattr(pt_operator.Operator, member)
    return None
class GeneratedTree(object):
    """
    Builds a random process tree from operator probabilities and a
    triangular distribution over the number of activities.

    Attributes set by the constructor:
    - parameters: settings keyed by their plain string name
    - activity_distribution: frozen triangular distribution (mode/min/max)
    - total_activities: remaining activity budget (tau counts as an activity)

    Attributes set by ``create_process_tree``:
    - tree: the process tree under construction
    - set_activity_labels: ordered list of candidate labels ("a", "b", ...)
    """

    # are later used as labels
    alphabet = string.ascii_lowercase

    # operator names, in the order their probabilities are handed to choices()
    _OPERATOR_NAMES = ("sequence", "choice", "parallel", "loop", "or")

    def __init__(self, parameters):
        """
        :param parameters: Mapping of settings; keys are either strings or
            objects exposing the string key as ``.value`` (e.g. Parameters)
        :raises ValueError: if the operator probabilities do not sum to a
            positive number
        """
        self.parameters = {}
        for param in parameters:
            key = param if isinstance(param, str) else param.value
            self.parameters[key] = parameters[param]

        # rescale probabilities of operators if the sum is not equal to one
        sum_of_operators = sum(self.parameters[name] for name in self._OPERATOR_NAMES)
        if sum_of_operators <= 0:
            # previously surfaced as an unexplained ZeroDivisionError
            raise ValueError("The operator probabilities must sum to a positive value")
        if sum_of_operators != 1:
            for name in self._OPERATOR_NAMES:
                self.parameters[name] = self.parameters[name] / sum_of_operators

        # First step: Compute activity distribution
        # Since mode, min and max are given, the triangle distribution is chosen
        self.activity_distribution = self.calculate_activity_distribution(
            self.parameters["mode"], self.parameters["min"], self.parameters["max"])
        # Number of total activities represented in the tree. Also, tau is counted as an activity.
        self.total_activities = int(math.ceil(self.draw_random_number_from_distribution()))

    def calculate_activity_distribution(self, mode, min, max):
        """
        Here, the triangular function is used, since the parameters for this function are given in the
        parameter file. However, this approach can be applied on other distribution functions as well.

        :param mode: Mode of the distribution
        :param min: Smallest number
        :param max: Highest number (must differ from min, otherwise the division below fails)
        :return: Distribution object
        """
        c = (mode - min) / (max - min)
        return triangular(c, loc=min, scale=max - min)

    def draw_random_number_from_distribution(self):
        """
        :return: One sample drawn from the activity distribution
        """
        return self.activity_distribution.rvs(1)[0]

    def select_operator(self):
        """
        Draw one operator name according to the configured probabilities.

        :return: One of "sequence", "choice", "parallel", "loop", "or"
        """
        # ordering of operator computation is sequence, choice, parallel, loop, or
        weights = [self.parameters[name] for name in self._OPERATOR_NAMES]
        return choices(list(self._OPERATOR_NAMES), weights)[0]

    def get_next_activity(self, activity):
        """
        :param activity: A label contained in ``set_activity_labels``
        :return: The label that follows ``activity`` in ``set_activity_labels``
        """
        return self.set_activity_labels[self.set_activity_labels.index(activity) + 1]

    @staticmethod
    def _append_leaf(parent, label):
        """Create a leaf with the given label (None = tau) and attach it to parent."""
        node = obj.ProcessTree(label=label)
        node.parent = parent
        parent._children.append(node)
        return node

    def _append_silent_choice_pair(self, parent, activity):
        """Attach the activity and a tau to parent, in random order."""
        if random.choice([0, 1]) == 0:
            self._append_leaf(parent, None)
            self._append_leaf(parent, activity)
        else:
            self._append_leaf(parent, activity)
            self._append_leaf(parent, None)

    def _append_loop_structure(self, parent, do_label, candidate, silent_activity):
        """
        Attach loop(do_label, candidate-or-tau) followed by one more leaf to parent.

        Returns the label given to that trailing leaf.
        """
        loop_node = obj.ProcessTree(operator=pt_operator.Operator.LOOP)
        loop_node.parent = parent
        parent._children.append(loop_node)
        self._append_leaf(loop_node, do_label)
        if silent_activity:
            # the candidate label is not consumed by the loop, so it goes to the trailing leaf
            self._append_leaf(loop_node, None)
            trailing_label = candidate
        else:
            self._append_leaf(loop_node, candidate)
            trailing_label = self.get_next_activity(candidate)
        self._append_leaf(parent, trailing_label)
        return trailing_label

    def assign_root_opeartor(self):
        """
        Give the root an operator and its first children, starting with label "a".

        :return: Next activity (first label not yet used)
        """
        activity = "a"
        # is a silent activity chosen
        silent_activity = random.random() < self.parameters["silent"]
        root = self.tree._get_root()
        operator = self.select_operator()
        root.operator = assign_operator(operator)
        # if operator is loop, we use a special structure, otherwise 2
        if operator == "loop":
            root.operator = pt_operator.Operator.SEQUENCE
            activity = self._append_loop_structure(
                root, activity, self.get_next_activity(activity), silent_activity)
            self.total_activities -= 1
        elif silent_activity and operator == "choice":
            self._append_silent_choice_pair(root, activity)
        else:
            self._append_leaf(root, activity)
            activity = self.get_next_activity(activity)
            self._append_leaf(root, activity)
        # always two children are added
        # NOTE(review): applied on every path (the loop branch has already
        # accounted for its extra child above) - confirm against upstream.
        self.total_activities -= 2
        return self.get_next_activity(activity)

    def add_node(self, next_activity):
        """
        Add nodes to current tree. The general procedure is as follows:
        Select a random leaf (leaves have label). Next step, an operator is chosen.
        The chosen operator then replaces the leaf, whereby the old label is then added as a leaf to the
        manipulated node. Then, next activity is added as a second leaf to the new operator node or a
        silent activity (tau) is added.

        :param next_activity: First label that has not been used yet
        :return: Next activity
        """
        # Need to select random node that is not a silent activity
        leaves = self.tree._get_leaves()
        leaf = random.choice(leaves)
        while leaf.label is None:
            leaf = random.choice(leaves)

        operator = self.select_operator()
        if self.total_activities <= 1:
            # With at most one activity left, a loop is avoided by re-drawing.
            # If no other operator can ever be drawn the loop is accepted,
            # since re-drawing would otherwise never terminate.
            non_loop_possible = any(
                self.parameters[name] > 0 for name in self._OPERATOR_NAMES if name != "loop")
            while operator == "loop" and non_loop_possible:
                operator = self.select_operator()

        activity = leaf._get_label()
        leaf._set_label(None)
        leaf._set_operator(assign_operator(operator))
        # Will a tau be added?
        silent_activity = random.random() < self.parameters["silent"]
        # add two children
        if operator == "loop":
            leaf._set_operator(pt_operator.Operator.SEQUENCE)
            activity = self._append_loop_structure(leaf, activity, next_activity, silent_activity)
            self.total_activities -= 1
        elif silent_activity and operator == "choice":
            self._append_silent_choice_pair(leaf, activity)
        else:
            self._append_leaf(leaf, activity)
            activity = next_activity
            self._append_leaf(leaf, activity)
        # NOTE(review): applied on every path, so each call reduces the
        # remaining budget - confirm against upstream.
        self.total_activities -= 2
        if silent_activity and operator == "choice":
            # no new label was consumed
            return next_activity
        return self.get_next_activity(activity)

    def add_duplicates(self):
        """
        Replaces some leaves to add duplicated labels. Depends on parameter.

        Only leaves carrying a label take part: a tau is neither used as the
        source of a duplicate nor overwritten by one. For each selected leaf
        a bounded number of random candidates is tried; the first candidate
        that is not a sibling of the leaf receives the leaf's label.

        :return:
        """
        leaves = self.tree._get_leaves()
        root = self.tree._get_root()
        # duplication requires at least one leaf that is not a direct child of the root
        if not any(leaf._parent != root for leaf in leaves):
            return

        # list that contains the leaves with a label unequal to tau
        leaves_with_label = [leaf for leaf in leaves if leaf.label is not None]
        # generate random list of duplicates
        duplicates = [leaf for leaf in leaves_with_label
                      if random.random() < self.parameters["duplicate"]]
        if not duplicates:
            return

        # select potential leaves to replace them by duplicates
        possible_replacements = [leaf for leaf in leaves_with_label if leaf not in duplicates]
        if not possible_replacements:
            # every labelled leaf was selected; nothing is left to overwrite
            return

        # attempts per duplicate: percentage of duplicates * len(leaves)
        max_attempts = self.parameters["duplicate"] * len(leaves)
        for leaf in duplicates:
            # determine sibling nodes (same parent)
            siblings = [child for child in leaf._parent._children if child != leaf]
            attempts = 0
            while attempts < max_attempts:
                attempts += 1
                replacement = random.choice(possible_replacements)
                if replacement not in siblings:
                    replacement._label = leaf._label
                    break

    def create_process_tree(self):
        """
        Create an empty tree, generate the label list and keep adding nodes
        until the activity budget is used up.
        """
        self.tree = obj.ProcessTree()
        self.set_activity_labels = []
        length = 1
        # create labels: all words of length 1, then length 2, ... over the alphabet
        while self.total_activities > len(self.set_activity_labels):
            self.set_activity_labels.extend(
                "".join(map(str, item)) for item in itertools.product(self.alphabet, repeat=length))
            length += 1
        activity = self.assign_root_opeartor()
        while self.total_activities > 0:
            activity = self.add_node(activity)

    def generate(self):
        """
        :return: The generated process tree
        """
        # Create a process tree based on the given probabilities
        self.create_process_tree()
        # add duplicates
        self.add_duplicates()
        return self.tree