Source code for pm4py.visualization.petrinet.util.vis_trans_shortest_paths

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
from statistics import mean, median, stdev

from pm4py.visualization.common.utils import *


[docs]def get_shortest_paths_from_trans(original_trans, trans, spaths, visited_arcs, visited_transitions, added_elements, rec_depth): """ Get shortest paths from a given transition Parameters -------------- original_trans Original transition trans Current considered transition spaths Map of shortest paths visited_arcs Set of visited arcs visited_transitions Set of visited transitions added_elements Elements to add recursively rec_depth Recursion depth Returns ------------- spaths Map of shortest paths visited_arcs Set of visited arcs added_elements Elements to add recursively """ for out_arc in trans.out_arcs: if out_arc not in visited_arcs: visited_arcs.add(out_arc) target_place = out_arc.target for place_out_arc in target_place.out_arcs: if place_out_arc not in visited_arcs: visited_arcs.add(place_out_arc) target_trans = place_out_arc.target if target_trans not in visited_transitions: visited_transitions.add(target_trans) if target_trans.label: el1 = ((original_trans.name, target_trans.name), 0, rec_depth) if out_arc not in spaths: spaths[out_arc] = set() spaths[out_arc].add(el1) added_elements.add(el1) el2 = ((original_trans.name, target_trans.name), 1, rec_depth) if place_out_arc not in spaths: spaths[place_out_arc] = set() spaths[place_out_arc].add(el2) added_elements.add(el2) else: spaths, visited_arcs, visited_transitions, added_elements = get_shortest_paths_from_trans( original_trans, target_trans, spaths, visited_arcs, visited_transitions, added_elements, rec_depth + 1) for element in added_elements: new_element = list(element) if new_element[1] == 0: new_element[1] = 2 if out_arc not in spaths: spaths[out_arc] = set() spaths[out_arc].add(tuple(new_element)) if new_element[1] == 1: new_element[1] = 3 if place_out_arc not in spaths: spaths[place_out_arc] = set() spaths[place_out_arc].add(tuple(new_element)) return spaths, visited_arcs, visited_transitions, added_elements
[docs]def get_shortest_paths(net, enable_extension=False): """ Gets shortest paths between visible transitions in a Petri net Parameters ----------- net Petri net enable_extension Enable decoration of more arcs, in a risky way, when needed Returns ----------- spaths Shortest paths """ spaths = {} for trans in net.transitions: if trans.label: visited_arcs = set() visited_transitions = set() added_elements = set() spaths, visited_arcs, visited_transitions, added_elements = get_shortest_paths_from_trans(trans, trans, spaths, visited_arcs, visited_transitions, added_elements, 0) spaths_keys = list(spaths.keys()) for edge in spaths_keys: list_zeros = [el for el in spaths[edge] if el[1] == 0] list_ones = [el for el in spaths[edge] if el[1] == 1] if list_zeros: spaths[edge] = {x for x in spaths[edge] if x[1] == 0} min_dist = min([x[2] for x in spaths[edge]]) possible_targets = set([x[0] for x in spaths[edge] if x[2] == min_dist]) spaths[edge] = set() for target in possible_targets: spaths[edge].add((target, 0, min_dist)) elif list_ones: spaths[edge] = {x for x in spaths[edge] if x[1] == 1} min_dist = min([x[2] for x in spaths[edge]]) possible_targets = set([x[0] for x in spaths[edge] if x[2] == min_dist]) spaths[edge] = set() for target in possible_targets: spaths[edge].add((target, 1, min_dist)) else: unique_targets = set([x[0] for x in spaths[edge]]) if len(unique_targets) == 1: spaths[edge] = set() spaths[edge].add((list(unique_targets)[0], 2, 0)) else: if enable_extension: min_dist = min([x[2] for x in spaths[edge]]) possible_targets = set([x[0] for x in spaths[edge] if x[2] == min_dist]) spaths[edge] = set() for target in possible_targets: spaths[edge].add((target, 2, min_dist)) else: del spaths[edge] return spaths
[docs]def get_decorations_from_dfg_spaths_acticount(net, dfg, spaths, activities_count, variant="frequency", aggregation_measure=None, stat_locale: dict = {}): """ Get decorations from Petrinet without doing any replay but based on DFG measures, shortest paths and activities count. The variant could be 'frequency' or 'performance'. Aggregation measure could also be specified Parameters ----------- net Petri net dfg Directly-Follows graph spaths Shortest paths between visible transitions in the Petri net activities_count Count of activities in the Petri net variant Describe how to decorate the Petri net (could be frequency or performance) aggregation_measure Specifies the aggregation measure stat_locale Dict to locale the stat strings Returns ----------- decorations Decorations to use for the Petri net """ decorations_single_contrib = {} decorations_single_contrib_trans = {} decorations_int = {} decorations = {} if aggregation_measure is None: if "frequency" in variant: aggregation_measure = "sum" elif "performance" in variant: aggregation_measure = "mean" for arc in spaths: for couple in spaths[arc]: dfg_key = couple[0] if dfg_key in dfg: if arc not in decorations_single_contrib: decorations_single_contrib[arc] = [] decorations_single_contrib[arc].append(dfg[dfg_key]) if dfg_key[1] not in decorations_single_contrib_trans: decorations_single_contrib_trans[dfg_key[1]] = {} decorations_single_contrib_trans[dfg_key[1]][dfg_key[0]] = dfg[dfg_key] for arc in decorations_single_contrib: decorations_value = None if aggregation_measure == "sum": decorations_value = sum(decorations_single_contrib[arc]) elif aggregation_measure == "mean": decorations_value = mean(decorations_single_contrib[arc]) elif aggregation_measure == "median": decorations_value = median(decorations_single_contrib[arc]) elif aggregation_measure == "stdev": decorations_value = stdev(decorations_single_contrib[arc]) elif aggregation_measure == "min": decorations_value = min(decorations_single_contrib[arc]) elif aggregation_measure == "max": decorations_value = max(decorations_single_contrib[arc]) if decorations_value is not None: decorations_int[arc] = decorations_value if decorations_int: arcs_min_value = min(list(decorations_int.values())) arcs_max_value = max(list(decorations_int.values())) for arc in decorations_int: if "performance" in variant: arc_label = human_readable_stat(decorations_int[arc], stat_locale) else: arc_label = str(decorations_int[arc]) decorations[arc] = {"label": arc_label, "penwidth": str(get_arc_penwidth(decorations_int[arc], arcs_min_value, arcs_max_value))} trans_map = {} for trans in net.transitions: if trans.label: trans_map[trans.label] = trans if "frequency" in variant: act_min_value = min(list(activities_count.values())) act_max_value = max(list(activities_count.values())) for act in activities_count: if act in trans_map: trans = trans_map[act] color = get_trans_freq_color(activities_count[act], act_min_value, act_max_value) label = act + " (" + str(activities_count[act]) + ")" decorations[trans] = {"label": label, "color": color} elif "performance" in variant: for act in decorations_single_contrib_trans: if act in trans_map: trans = trans_map[act] trans_values = list(decorations_single_contrib_trans[act].values()) decorations[trans] = {"performance": mean(trans_values)} return decorations