Source code for pm4py.objects.conversion.wf_net.variants.to_process_tree

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
import datetime
import itertools
import uuid
from copy import deepcopy
from enum import Enum

from pm4py.objects.petri_net.utils import petri_utils as pn_util
from pm4py.objects.petri_net.obj import PetriNet
from pm4py.objects.process_tree import obj as pt_operator
from pm4py.objects.process_tree.utils import generic as pt_util
from pm4py.objects.process_tree.utils.generic import tree_sort
from pm4py.util import exec_utils

TRANSITION_PREFIX = str(uuid.uuid4())


[docs]class Parameters(Enum): DEBUG = "debug" FOLD = "fold"
[docs]def generate_label_for_transition(t): return 'tau' if t.label is None else '\'' + t.label + '\'' if not t.name.startswith( TRANSITION_PREFIX) else t.label
[docs]def generate_new_binary_transition(t1, t2, operator, net): t = PetriNet.Transition(TRANSITION_PREFIX + str(datetime.datetime.now())) t.label = str(operator) + '(' + generate_label_for_transition( t1) + ', ' + generate_label_for_transition(t2) + ')' return t
[docs]def loop_requirement(t1, t2): if t1 == t2: return False for p in pn_util.pre_set(t2): if len(pn_util.pre_set(p)) != 1: # check that the preset of the t2 preset has one entry return False if t1 not in pn_util.pre_set(p): # check that t1 is the unique way to mark the preset of t2 return False for p in pn_util.post_set(t2): if len(pn_util.post_set(p)) != 1: return False if t1 not in pn_util.post_set(p): return False for p in pn_util.pre_set(t1): if len(pn_util.post_set(p)) != 1: return False if t1 not in pn_util.post_set(p): return False if t2 not in pn_util.pre_set(p): # t2 has to enable t1! return False for p in pn_util.post_set(t1): if len(pn_util.pre_set(p)) != 1: # check that the preset of the t2 preset has one entry return False if t1 not in pn_util.pre_set(p): # check that t1 is the unique way to mark the preset of t2 return False if t2 not in pn_util.post_set(p): return False return True
[docs]def binary_loop_detection(net): c1 = None c2 = None for t1, t2 in itertools.product(net.transitions, net.transitions): if loop_requirement(t1, t2): c1 = t1 c2 = t2 break if c1 is not None and c2 is not None: t = generate_new_binary_transition(c1, c2, pt_operator.Operator.LOOP, net) net.transitions.add(t) # reduce for a in c1.in_arcs: pn_util.add_arc_from_to(a.source, t, net) for a in c1.out_arcs: pn_util.add_arc_from_to(t, a.target, net) pn_util.remove_transition(net, c1) pn_util.remove_transition(net, c2) return net return None
[docs]def concurrent_requirement(t1, t2): if t1 == t2: # check if transitions different return False if len(pn_util.pre_set(t1)) == 0 or len(pn_util.post_set(t1)) == 0 or len(pn_util.pre_set(t2)) == 0 or len( pn_util.post_set(t2)) == 0: # not possible in WF-net, just checking... return False pre_pre = set() post_post = set() for p in pn_util.pre_set(t1): # check if t1 is unique post of its preset pre_pre = set.union(pre_pre, pn_util.pre_set(p)) if len(pn_util.post_set(p)) > 1 or t1 not in pn_util.post_set(p): return False for p in pn_util.post_set(t1): # check if t1 is unique pre of its postset post_post = set.union(post_post, pn_util.post_set(p)) if len(pn_util.pre_set(p)) > 1 or t1 not in pn_util.pre_set(p): return False for p in pn_util.pre_set(t2): # check if t2 is unique post of its preset pre_pre = set.union(pre_pre, pn_util.pre_set(p)) if len(pn_util.post_set(p)) > 1 or t2 not in pn_util.post_set(p): return False for p in pn_util.post_set(t2): # check if t2 is unique pre of its postset post_post = set.union(post_post, pn_util.post_set(p)) if len(pn_util.pre_set(p)) > 1 or t2 not in pn_util.pre_set(p): return False for p in set.union(pn_util.pre_set(t1), pn_util.pre_set(t2)): # check if presets synchronize for t in pre_pre: if t not in pn_util.pre_set(p): return False for p in set.union(pn_util.post_set(t1), pn_util.post_set(t2)): # check if postsets synchronize for t in post_post: if t not in pn_util.post_set(p): return False return True
[docs]def binary_concurrency_detection(net): c1 = None c2 = None for t1, t2 in itertools.product(net.transitions, net.transitions): if concurrent_requirement(t1, t2): c1 = t1 c2 = t2 break if c1 is not None and c2 is not None: t = generate_new_binary_transition(c1, c2, pt_operator.Operator.PARALLEL, net) net.transitions.add(t) # reduce for a in c1.in_arcs: pn_util.add_arc_from_to(a.source, t, net) for a in c1.out_arcs: pn_util.add_arc_from_to(t, a.target, net) for a in c2.in_arcs: pn_util.add_arc_from_to(a.source, t, net) for a in c2.out_arcs: pn_util.add_arc_from_to(t, a.target, net) pn_util.remove_transition(net, c1) pn_util.remove_transition(net, c2) return net return None
[docs]def choice_requirement(t1, t2): return t1 != t2 and pn_util.pre_set(t1) == pn_util.pre_set(t2) and pn_util.post_set(t1) == pn_util.post_set( t2) and len(pn_util.pre_set(t1)) > 0 and len( pn_util.post_set(t1)) > 0
[docs]def binary_choice_detection(net): c1 = None c2 = None for t1, t2 in itertools.product(net.transitions, net.transitions): if choice_requirement(t1, t2): c1 = t1 c2 = t2 break if c1 is not None and c2 is not None: t = generate_new_binary_transition(c1, c2, pt_operator.Operator.XOR, net) net.transitions.add(t) for a in c1.in_arcs: pn_util.add_arc_from_to(a.source, t, net) for a in c2.out_arcs: pn_util.add_arc_from_to(t, a.target, net) pn_util.remove_transition(net, c1) pn_util.remove_transition(net, c2) return net return None
[docs]def sequence_requirement(t1, t2): if t1 == t2: return False if len(pn_util.pre_set(t2)) == 0: return False for p in pn_util.post_set(t1): if len(pn_util.pre_set(p)) != 1 or len(pn_util.post_set(p)) != 1: return False if t1 not in pn_util.pre_set(p): return False if t2 not in pn_util.post_set(p): return False for p in pn_util.pre_set(t2): if len(pn_util.pre_set(p)) != 1 or len(pn_util.post_set(p)) != 1: return False if t1 not in pn_util.pre_set(p): return False if t2 not in pn_util.post_set(p): # redundant check, just to be sure... return False return True
[docs]def binary_sequence_detection(net): c1 = None c2 = None for t1, t2 in itertools.product(net.transitions, net.transitions): if sequence_requirement(t1, t2): c1 = t1 c2 = t2 break if c1 is not None and c2 is not None: t = generate_new_binary_transition(c1, c2, pt_operator.Operator.SEQUENCE, net) net.transitions.add(t) for a in c1.in_arcs: pn_util.add_arc_from_to(a.source, t, net) for a in c2.out_arcs: pn_util.add_arc_from_to(t, a.target, net) for p in pn_util.post_set(c1): pn_util.remove_place(net, p) pn_util.remove_transition(net, c1) pn_util.remove_transition(net, c2) return net return None
[docs]def group_blocks_in_net(net, parameters=None): """ Groups the blocks in the Petri net Parameters -------------- net Petri net parameters Parameters of the algorithm Returns -------------- grouped_net Petri net (blocks are grouped according to the algorithm) """ if parameters is None: parameters = {} from pm4py.algo.analysis.workflow_net import algorithm as wf_eval if not wf_eval.apply(net): raise ValueError('The Petri net provided is not a WF-net') net = deepcopy(net) while len(net.transitions) > 1: if binary_choice_detection(net) is not None: continue elif binary_sequence_detection(net) is not None: continue elif binary_concurrency_detection(net) is not None: continue elif binary_loop_detection(net) is not None: continue else: break return net
[docs]def apply(net, im, fm, parameters=None): """ Transforms a WF-net to a process tree Parameters ------------- net Petri net im Initial marking fm Final marking Returns ------------- tree Process tree """ if parameters is None: parameters = {} debug = exec_utils.get_param_value(Parameters.DEBUG, parameters, False) fold = exec_utils.get_param_value(Parameters.FOLD, parameters, True) grouped_net = group_blocks_in_net(net, parameters=parameters) if len(grouped_net.transitions) == 1: pt_str = list(grouped_net.transitions)[0].label pt = pt_util.parse(pt_str) ret = pt_util.fold(pt) if fold else pt tree_sort(ret) return ret else: if debug: from pm4py.visualization.petri_net import visualizer as pn_viz pn_viz.view(pn_viz.apply(grouped_net, parameters={"format": "svg"})) raise ValueError('Parsing of WF-net Failed')