Source code for pm4py.algo.simulation.tree_generator.variants.ptandloggenerator

```'''

PM4Py is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

PM4Py is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
from pm4py.objects.process_tree import obj
from pm4py.objects.process_tree import obj as pt_operator
from scipy.stats import triang as triangular
import string
import math
import itertools
from enum import Enum
from itertools import accumulate as _accumulate, repeat as _repeat
from bisect import bisect as _bisect
import random
from typing import Optional, Dict, Any, Union, Tuple
from string import ascii_lowercase
import itertools

def choices(population, weights=None, *, cum_weights=None, k=1):
    """Return a k sized list of population elements chosen with replacement.

    If the relative weights or cumulative weights are not specified,
    the selections are made with equal probability.

    :param population: Indexable sequence to draw from
    :param weights: Optional relative weights, one per population element
    :param cum_weights: Optional cumulative weights (keyword-only); cannot be combined with ``weights``
    :param k: Number of elements to draw (keyword-only)
    :return: List containing the ``k`` drawn elements
    :raises TypeError: If both ``weights`` and ``cum_weights`` are given
    :raises ValueError: If the number of weights differs from the population size,
        or if the weights do not add up to a positive total
    """
    n = len(population)
    if cum_weights is None:
        if weights is None:
            n += 0.0    # convert to float for a small speed improvement
            return [population[int(random.random() * n)] for _ in _repeat(None, k)]
        cum_weights = list(_accumulate(weights))
    elif weights is not None:
        raise TypeError('Cannot specify both weights and cumulative weights')
    if len(cum_weights) != n:
        raise ValueError('The number of weights does not match the population')
    total = cum_weights[-1] + 0.0   # convert to float
    # Without this check a zero total makes the bisection below always land on
    # the last element, i.e. a silently biased draw instead of an error.
    if total <= 0.0:
        raise ValueError('Total of weights must be greater than zero')
    # The search is limited to [0, n - 1) so the returned index is always valid.
    hi = n - 1
    return [population[_bisect(cum_weights, random.random() * total, 0, hi)]
            for _ in _repeat(None, k)]

class Parameters(Enum):
    """Keys understood in the ``parameters`` dictionary of the tree generator."""

    # probability of each operator type being added to the tree
    SEQUENCE = "sequence"
    CHOICE = "choice"
    PARALLEL = "parallel"
    LOOP = "loop"
    OR = "or"

    # number of visible activities: most frequent, minimum and maximum value
    MODE = "mode"
    MIN = "min"
    MAX = "max"

    # probability of a silent activity / of duplicating an activity label
    SILENT = "silent"
    DUPLICATE = "duplicate"

    # number of trees to generate
    NO_MODELS = "no_models"

def apply(parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> obj.ProcessTree:
    """
    Generate a process tree using the PTAndLogGenerator approach
    (see the paper PTandLogGenerator: A Generator for Artificial Event Data)

    Keys may be given either as plain strings or as ``Parameters`` members.
    The dictionary passed by the caller is not modified.

    Parameters
    --------------
    parameters
        Parameters of the algorithm, according to the paper:
        - Parameters.MODE: most frequent number of visible activities (default 20)
        - Parameters.MIN: minimum number of visible activities (default 10)
        - Parameters.MAX: maximum number of visible activities (default 30)
        - Parameters.SEQUENCE: probability to add a sequence operator to tree (default 0.25)
        - Parameters.CHOICE: probability to add a choice operator to tree (default 0.25)
        - Parameters.PARALLEL: probability to add a parallel operator to tree (default 0.25)
        - Parameters.LOOP: probability to add a loop operator to tree (default 0.25)
        - Parameters.OR: probability to add an or operator to tree (default 0.0)
        - Parameters.SILENT: probability to add silent activity to a choice or loop operator (default 0.2)
        - Parameters.DUPLICATE: probability to duplicate an activity label (default 0)
        - Parameters.NO_MODELS: number of trees to generate from model population (default 1)

    Returns
    --------------
    tree
        The result of ``GeneratedTree.generate()`` when exactly one model is
        requested, otherwise a list with one such result per requested model.
    """
    settings = {
        "mode": 20,
        "min": 10,
        "max": 30,
        "sequence": 0.25,
        "choice": 0.25,
        "parallel": 0.25,
        "loop": 0.25,
        "or": 0.0,
        "silent": 0.2,
        "duplicate": 0,
        "no_models": 1,
    }
    # Normalise enum keys to their string value *before* merging with the
    # defaults; otherwise a value supplied under e.g. Parameters.MODE would
    # coexist with (and later be overridden by) the default stored under "mode".
    for key, value in (parameters or {}).items():
        settings[key if isinstance(key, str) else key.value] = value

    no_models = settings["no_models"]

    if no_models == 1:
        return GeneratedTree(settings).generate()

    # if the generation of a higher number of models is required,
    # generate each of them and return a list of process trees.
    return [GeneratedTree(settings).generate() for _ in range(no_models)]

def assign_operator(operator):
    """
    Translate an operator name into the matching ``pt_operator.Operator`` member.

    :param operator: One of "choice", "sequence", "parallel", "or", "loop"
    :return: The corresponding operator member, or None for any other value
    """
    name_to_member = (
        ("choice", "XOR"),
        ("sequence", "SEQUENCE"),
        ("parallel", "PARALLEL"),
        ("or", "OR"),
        ("loop", "LOOP"),
    )
    for name, member in name_to_member:
        if operator == name:
            # resolved only on a match, exactly like the branch-wise lookup
            return getattr(pt_operator.Operator, member)
    return None

[docs]class GeneratedTree(object):
# are later used as labels
alphabet = string.ascii_lowercase

def calculate_activity_distribution(self, mode, min, max):
    """
    Build the triangular distribution described by the three given values.
    The triangular function is used because its parameters are the ones
    supplied to the generator; other distributions could be plugged in the same way.
    :param mode: Mode of the distribution
    :param min: Smallest number
    :param max: Highest number
    :return: Distribution object
    """
    spread = max - min
    # the tiny offsets keep the shape value defined when min = mode = max
    shape = (mode - min + 10 ** -12) / (spread + 10 ** -6)
    return triangular(shape, loc=min, scale=spread)

def draw_random_number_from_distribution(self):
    """
    Draw a single value from ``self.activity_distribution``.
    :return: First element of a one-element sample
    """
    sample = self.activity_distribution.rvs(1)
    return sample[0]

def select_operator(self):
    """
    Pick one operator name at random, weighted by the configured probabilities.
    :return: One of "sequence", "choice", "parallel", "loop", "or"
    """
    # candidates are considered in the order sequence, choice, parallel, loop, or
    candidates = ["sequence", "choice", "parallel", "loop", "or"]
    weights = [self.parameters[name] for name in candidates]
    drawn = choices(candidates, weights)
    return drawn[0]

def get_next_activity(self):
    """
    Advance ``self.iter`` by one step.
    :return: The next item produced by ``self.iter``
    """
    return next(self.iter)

"""
Replaces some leaves  to add duplicated labels. Depends on parameter.
:return:
"""
# NOTE(review): the docstring above and the statements below have no enclosing
# `def` line in this extract, and all indentation is lost -- TODO restore both
# from the upstream file before relying on this code.
# Duplication is only attempted when at least one leaf is not a direct child
# of the tree's root.
duplication_allowed = False
leaves = self.tree._get_leaves()
for leaf in leaves:
if leaf._parent != self.tree._get_root():
duplication_allowed = True
break
# if there is at least a depth of two
if duplication_allowed:
# list that contains the leaves with a label unequal to tau
# NOTE(review): leaves_with_label is filled here but not read again in the
# visible statements; the loops below iterate over `leaves`.
leaves_with_label = []
for leaf in leaves:
if leaf.label is not None:
leaves_with_label.append(leaf)
# generate random list of duplicates
# each leaf is selected with probability self.parameters["duplicate"]
duplicates = []
for leaf in leaves:
if random.random() < self.parameters["duplicate"]:
duplicates.append(leaf)
if len(duplicates) > 0:
# select potential leaves to replace them by duplicates
possible_replacements = []
for leaf in leaves:
if leaf not in duplicates:
possible_replacements.append(leaf)
for leaf in duplicates:
i = 0
siblings = []
# determine sibling nodes (same parent)
p = leaf._parent
for child in p._children:
if child != leaf:
siblings.append(child)
# TODO Scaling? Original: 30times, my idea : percentage of duplicates * len(leaves)
# NOTE(review): `i` is never changed in the visible statements, so this loop can
# only end through the `break` below; random.choice also raises IndexError when
# possible_replacements is empty -- TODO confirm against the full file.
while i < self.parameters["duplicate"] * len(leaves):
replacement = random.choice(possible_replacements)
if replacement not in siblings:
# copy the label onto a leaf that does not share the parent
replacement._label = leaf._label
break

# NOTE(review): these statements have no enclosing `def` line in this extract
# and all indentation is lost -- TODO restore both from the upstream file.
# Visible behaviour: a randomly chosen leaf is replaced by a new operator node
# that receives two children.
leaves = self.tree._get_leaves()
# a tree without operator is itself offered as a candidate
if self.tree.operator is None:
leaves.append(self.tree)
visible_leaves = [x for x in leaves if x.label is not None]
operator = self.select_operator()
op_map = {"parallel": obj.Operator.PARALLEL, "sequence": obj.Operator.SEQUENCE, "choice": obj.Operator.XOR, "loop": obj.Operator.LOOP, "or": obj.Operator.OR}
mapped_operator = op_map[operator]
# 0 or 1: decides in which order the two children are appended
order = random.randrange(0, 2)
chosen_leaf = random.choice(leaves)
# the first child keeps the label of the chosen leaf, or gets a new one if it had none
label1 = chosen_leaf.label
if chosen_leaf.label is None:
label1 = self.get_next_activity()
# the second child has no label unless one is assigned further below
label2 = None
if chosen_leaf.parent is None:
# the chosen node has no parent: the new operator node becomes self.tree
self.tree = obj.ProcessTree(operator=mapped_operator)
chosen_leaf = self.tree
else:
# otherwise the chosen leaf is removed from its parent and the new operator
# node is appended to that parent's children
parent = chosen_leaf.parent
del parent.children[parent.children.index(chosen_leaf)]
chosen_leaf = obj.ProcessTree(operator=mapped_operator, parent=parent)
parent.children.append(chosen_leaf)
r = random.random()
# NOTE(review): `added_count` is not bound anywhere in the visible statements;
# presumably it is a parameter of the missing `def` line -- TODO confirm.
if self.total_activities - len(visible_leaves) > added_count and not r < self.parameters["silent"]:
label2 = self.get_next_activity()
node1 = obj.ProcessTree(label=label1, parent=chosen_leaf)
node2 = obj.ProcessTree(label=label2, parent=chosen_leaf)
if order == 0:
chosen_leaf.children.append(node1)
chosen_leaf.children.append(node2)
elif order == 1:
chosen_leaf.children.append(node2)
chosen_leaf.children.append(node1)

def iter_all_strings(self):
    """
    Endless generator of lowercase strings: "a" ... "z", then "aa", "ab", ...
    :return: Generator yielding strings of increasing length
    """
    length = 1
    while True:
        # all combinations of the current length, in product order
        yield from map("".join, itertools.product(ascii_lowercase, repeat=length))
        length += 1

# Resets the label generator and the tree, then loops until the tree holds
# self.total_activities leaves whose label is not None.
[docs]    def create_process_tree(self):
# generator that supplies the activity labels
self.iter = self.iter_all_strings()
# new ProcessTree created without arguments
self.tree = obj.ProcessTree()
visible_leaves = [x for x in self.tree._get_leaves() if x.label is not None]
while len(visible_leaves) < self.total_activities:
# NOTE(review): as extracted, the loop body only recounts the visible leaves and
# never changes self.tree; the statement that grows the tree appears to be
# missing from this extract -- TODO restore it from the upstream file.
visible_leaves = [x for x in self.tree._get_leaves() if x.label is not None]

def __init__(self, parameters):
self.parameters = {}
for param in parameters:
p = param if type(param) is str else param.value
self.parameters[p] = parameters[param]
# rescale probabilities of operators if the sum is not equal to one
if self.parameters["sequence"] + self.parameters["choice"] + self.parameters["parallel"] + self.parameters[
"loop"] + self.parameters["or"] != 1:
sum_of_operators = self.parameters["sequence"] + self.parameters["choice"] + self.parameters["parallel"] + \
self.parameters["loop"] + self.parameters["or"]
self.parameters["sequence"] = self.parameters["sequence"] / sum_of_operators
self.parameters["choice"] = self.parameters["choice"] / sum_of_operators
self.parameters["parallel"] = self.parameters["parallel"] / sum_of_operators
self.parameters["or"] = self.parameters["or"] / sum_of_operators
self.parameters["loop"] = self.parameters["loop"] / sum_of_operators
# First step: Compute acivity distribution
# Since mode, min and max are given, the triangle distribution is chosen
self.activity_distribution = self.calculate_activity_distribution(self.parameters["mode"],
self.parameters["min"],
self.parameters["max"])
# Number of total activities represented in the tree. Also, tau is counted as an activity.
self.total_activities = int(round(self.draw_random_number_from_distribution()))

# Entry point called by apply(): builds the tree through create_process_tree().
# NOTE(review): no return statement is visible in this extract although apply()
# returns the result of generate(); the method presumably continues past the
# visible lines -- TODO confirm against the full file.
[docs]    def generate(self):
# Create a process tree based on the given probabilities
self.create_process_tree()