Source code for pm4py.objects.petri.align_utils

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
import heapq
import sys
from copy import copy
from typing import List, Tuple

import numpy as np

from pm4py.objects.petri import semantics, properties
from pm4py.objects.petri.obj import Marking, PetriNet
from pm4py.util.lp import solver as lp_solver

SKIP = '>>'
STD_MODEL_LOG_MOVE_COST = 10000
STD_TAU_COST = 1
STD_SYNC_COST = 0


[docs]def search_path_among_sol(sync_net: PetriNet, ini: Marking, fin: Marking, activated_transitions: List[PetriNet.Transition], skip=SKIP) -> Tuple[ List[PetriNet.Transition], bool, int]: """ (Efficient method) Searches a firing sequence among the X vector that is the solution of the (extended) marking equation Parameters --------------- sync_net Synchronous product net ini Initial marking of the net fin Final marking of the net activated_transitions Transitions that have non-zero occurrences in the X vector skip Skip transition Returns --------------- firing_sequence Firing sequence reach_fm Boolean value that tells if the final marking is reached by the firing sequence explained_events Number of explained events """ reach_fm = False trans_empty_preset = set(t for t in sync_net.transitions if len(t.in_arcs) == 0) trans_with_index = {} trans_wo_index = set() for t in activated_transitions: if properties.TRACE_NET_TRANS_INDEX in t.properties: trans_with_index[t.properties[properties.TRACE_NET_TRANS_INDEX]] = t else: trans_wo_index.add(t) keys = sorted(list(trans_with_index.keys())) trans_with_index = [trans_with_index[i] for i in keys] best_tuple = (0, 0, ini, list()) open_set = [best_tuple] heapq.heapify(open_set) visited = 0 closed = set() len_trace_with_index = len(trans_with_index) while len(open_set) > 0: curr = heapq.heappop(open_set) index = -curr[0] marking = curr[2] if marking in closed: continue if index == len_trace_with_index: reach_fm = True if curr[0] < best_tuple[0]: best_tuple = curr break if curr[0] < best_tuple[0]: best_tuple = curr closed.add(marking) corr_trans = trans_with_index[index] if corr_trans.sub_marking <= marking: visited += 1 new_marking = semantics.weak_execute(corr_trans, marking) heapq.heappush(open_set, (-index-1, visited, new_marking, curr[3]+[corr_trans])) else: enabled = copy(trans_empty_preset) for p in marking: for t in p.ass_trans: if t in trans_wo_index and t.sub_marking <= marking: enabled.add(t) for new_trans in enabled: visited += 1 new_marking = semantics.weak_execute(new_trans, marking) heapq.heappush(open_set, (-index, visited, new_marking, curr[3]+[new_trans])) return best_tuple[-1], reach_fm, -best_tuple[0]
[docs]def construct_standard_cost_function(synchronous_product_net, skip): """ Returns the standard cost function, which is: * event moves: cost 1000 * model moves: cost 1000 * tau moves: cost 1 * sync moves: cost 0 :param synchronous_product_net: :param skip: :return: """ costs = {} for t in synchronous_product_net.transitions: if (skip == t.label[0] or skip == t.label[1]) and (t.label[0] is not None and t.label[1] is not None): costs[t] = STD_MODEL_LOG_MOVE_COST else: if skip == t.label[0] and t.label[1] is None: costs[t] = STD_TAU_COST else: costs[t] = STD_SYNC_COST return costs
[docs]def pretty_print_alignments(alignments): """ Takes an alignment and prints it to the console, e.g.: A | B | C | D | -------------------- A | B | C | >> | :param alignment: <class 'list'> :return: Nothing """ if isinstance(alignments, list): for alignment in alignments: __print_single_alignment(alignment["alignment"]) else: __print_single_alignment(alignments["alignment"])
def __print_single_alignment(step_list): trace_steps = [] model_steps = [] max_label_length = 0 for step in step_list: trace_steps.append(" " + str(step[0]) + " ") model_steps.append(" " + str(step[1]) + " ") if len(step[0]) > max_label_length: max_label_length = len(str(step[0])) if len(str(step[1])) > max_label_length: max_label_length = len(str(step[1])) for i in range(len(trace_steps)): if len(str(trace_steps[i])) - 2 < max_label_length: step_length = len(str(trace_steps[i])) - 2 spaces_to_add = max_label_length - step_length for j in range(spaces_to_add): if j % 2 == 0: trace_steps[i] = trace_steps[i] + " " else: trace_steps[i] = " " + trace_steps[i] print(trace_steps[i], end='|') divider = "" length_divider = len(trace_steps) * (max_label_length + 3) for i in range(length_divider): divider += "-" print('\n' + divider) for i in range(len(model_steps)): if len(model_steps[i]) - 2 < max_label_length: step_length = len(model_steps[i]) - 2 spaces_to_add = max_label_length - step_length for j in range(spaces_to_add): if j % 2 == 0: model_steps[i] = model_steps[i] + " " else: model_steps[i] = " " + model_steps[i] print(model_steps[i], end='|') print('\n\n')
[docs]def add_markings(curr, add): m = Marking() for p in curr.items(): m[p[0]] = p[1] for p in add.items(): m[p[0]] += p[1] if m[p[0]] == 0: del m[p[0]] return m
def __get_alt(open_set, new_marking): for item in open_set: if item.m == new_marking: return item def __reconstruct_alignment(state, visited, queued, traversed, ret_tuple_as_trans_desc=False, lp_solved=0): alignment = list() if state.p is not None and state.t is not None: parent = state.p if ret_tuple_as_trans_desc: alignment = [(state.t.name, state.t.label)] while parent.p is not None: alignment = [(parent.t.name, parent.t.label)] + alignment parent = parent.p else: alignment = [state.t.label] while parent.p is not None: alignment = [parent.t.label] + alignment parent = parent.p return {'alignment': alignment, 'cost': state.g, 'visited_states': visited, 'queued_states': queued, 'traversed_arcs': traversed, 'lp_solved': lp_solved} def __derive_heuristic(incidence_matrix, cost_vec, x, t, h): x_prime = x.copy() x_prime[incidence_matrix.transitions[t]] -= 1 return max(0, h - cost_vec[incidence_matrix.transitions[t]]), x_prime def __is_model_move(t, skip): return t.label[0] == skip and t.label[1] != skip def __is_log_move(t, skip): return t.label[0] != skip and t.label[1] == skip def __trust_solution(x): for v in x: if v < -0.001: return False return True def __compute_exact_heuristic_new_version(sync_net, a_matrix, h_cvx, g_matrix, cost_vec, incidence_matrix, marking, fin_vec, variant, use_cvxopt=False, strict=True): m_vec = incidence_matrix.encode_marking(marking) b_term = [i - j for i, j in zip(fin_vec, m_vec)] b_term = np.matrix([x * 1.0 for x in b_term]).transpose() if not strict: g_matrix = np.vstack([g_matrix, a_matrix]) h_cvx = np.vstack([h_cvx, b_term]) a_matrix = np.zeros((0, a_matrix.shape[1])) b_term = np.zeros((0, b_term.shape[1])) if use_cvxopt: # not available in the latest version of PM4Py from cvxopt import matrix b_term = matrix(b_term) parameters_solving = {"solver": "glpk"} sol = lp_solver.apply(cost_vec, g_matrix, h_cvx, a_matrix, b_term, parameters=parameters_solving, variant=variant) prim_obj = lp_solver.get_prim_obj_from_sol(sol, variant=variant) points = lp_solver.get_points_from_sol(sol, variant=variant) prim_obj = prim_obj if prim_obj is not None else sys.maxsize points = points if points is not None else [0.0] * len(sync_net.transitions) return prim_obj, points def __get_tuple_from_queue(marking, queue): for t in queue: if t.m == marking: return t return None def __vectorize_initial_final_cost(incidence_matrix, ini, fin, cost_function): ini_vec = incidence_matrix.encode_marking(ini) fini_vec = incidence_matrix.encode_marking(fin) cost_vec = [0] * len(cost_function) for t in cost_function.keys(): cost_vec[incidence_matrix.transitions[t]] = cost_function[t] return ini_vec, fini_vec, cost_vec
[docs]class SearchTuple: def __init__(self, f, g, h, m, p, t, x, trust): self.f = f self.g = g self.h = h self.m = m self.p = p self.t = t self.x = x self.trust = trust def __lt__(self, other): if self.f < other.f: return True elif other.f < self.f: return False elif self.trust and not other.trust: return True else: return self.h < other.h def __get_firing_sequence(self): ret = [] if self.p is not None: ret = ret + self.p.__get_firing_sequence() if self.t is not None: ret.append(self.t) return ret def __repr__(self): string_build = ["\nm=" + str(self.m), " f=" + str(self.f), ' g=' + str(self.g), " h=" + str(self.h), " path=" + str(self.__get_firing_sequence()) + "\n\n"] return " ".join(string_build)
[docs]class DijkstraSearchTuple: def __init__(self, g, m, p, t, l): self.g = g self.m = m self.p = p self.t = t self.l = l def __lt__(self, other): if self.g < other.g: return True elif other.g < self.g: return False else: return other.l < self.l def __get_firing_sequence(self): ret = [] if self.p is not None: ret = ret + self.p.__get_firing_sequence() if self.t is not None: ret.append(self.t) return ret def __repr__(self): string_build = ["\nm=" + str(self.m), " g=" + str(self.g), " path=" + str(self.__get_firing_sequence()) + "\n\n"] return " ".join(string_build)
[docs]class DijkstraSearchTupleForAntiAndMulti: # in this version we keep the run and not the previous element # the display is different def __init__(self, g, m, r): self.g = g self.m = m self.r = r def __lt__(self, other): if self.g < other.g: return True elif other.g < self.g: return False else: return len(other.r) < len(self.r) def __get_firing_sequence(self): return self.r def __repr__(self): string_build = ["\nm=" + str(self.m), " g=" + str(self.g), " path=" + str(self.__get_firing_sequence()) + "\n\n"] return " ".join(string_build)
[docs]class TweakedSearchTuple: def __init__(self, f, g, h, m, p, t, x, trust, virgin): self.f = f self.g = g self.h = h self.m = m self.p = p self.t = t self.x = x self.trust = trust # a virgin status must be explored in its firing sequence self.virgin = virgin def __lt__(self, other): if self.f < other.f: return True elif other.f < self.f: return False elif self.virgin and not other.virgin: return True elif self.trust and not other.trust: return True else: return self.h < other.h def __get_firing_sequence(self): ret = [] if self.p is not None: ret = ret + self.p.__get_firing_sequence() if self.t is not None: ret.append(self.t) return ret def __repr__(self): string_build = ["\nm=" + str(self.m), " f=" + str(self.f), ' g=' + str(self.g), " h=" + str(self.h), " path=" + str(self.__get_firing_sequence()) + "\n\n"] return " ".join(string_build)
[docs]def get_visible_transitions_eventually_enabled_by_marking(net, marking): """ Get visible transitions eventually enabled by marking (passing possibly through hidden transitions) Parameters ---------- net Petri net marking Current marking """ all_enabled_transitions = sorted(list(semantics.enabled_transitions(net, marking)), key=lambda x: (str(x.name), id(x))) initial_all_enabled_transitions_marking_dictio = {} all_enabled_transitions_marking_dictio = {} for trans in all_enabled_transitions: all_enabled_transitions_marking_dictio[trans] = marking initial_all_enabled_transitions_marking_dictio[trans] = marking visible_transitions = set() visited_transitions = set() i = 0 while i < len(all_enabled_transitions): t = all_enabled_transitions[i] marking_copy = copy(all_enabled_transitions_marking_dictio[t]) if repr([t, marking_copy]) not in visited_transitions: if t.label is not None: visible_transitions.add(t) else: if semantics.is_enabled(t, net, marking_copy): new_marking = semantics.execute(t, net, marking_copy) new_enabled_transitions = sorted(list(semantics.enabled_transitions(net, new_marking)), key=lambda x: (str(x.name), id(x))) for t2 in new_enabled_transitions: all_enabled_transitions.append(t2) all_enabled_transitions_marking_dictio[t2] = new_marking visited_transitions.add(repr([t, marking_copy])) i = i + 1 return visible_transitions
[docs]def discountedEditDistance(s1,s2,exponent=2, modeled=True): ''' Fast implementation of the discounted distance Inspired from the faster version of the edit distance ''' #print(s1,s2) if len(s1) < len(s2): return discountedEditDistance(s2, s1,exponent=exponent,modeled=False) previous_row = [0] for a in range(len(s2)): if not modeled and (s2[a]=="tau" or s2[a]==None or s2[a][0]=="n"): previous_row.append(previous_row[-1]) else : previous_row.append(previous_row[-1]+exponent**(-(a))) for i, c1 in enumerate(s1): if modeled: exp1 = sum(exponent**(-(a)) for a in range(i+1) if s1[a]!="tau" and s1[a]!=None and s1[a][0]!="n") else : exp1 = sum(exponent**(-(a)) for a in range(i+1)) current_row = [exp1] for j, c2 in enumerate(s2): exp2 = exponent**(-(i+1 + j)) if modeled and (c1 in ["tau", None] or c1[0]=="n" or "skip" in c1): insertions = previous_row[j +1 ] # j+1 instead of j since previous_row and current_row are one character longer deletions = current_row[j] + exp2 # than s2 elif not modeled and (c2 in ["tau", None] or c2[0]=="n"): insertions = previous_row[j +1 ] + exp2 # j+1 instead of j since previous_row and current_row are one character longer deletions = current_row[j] else : insertions = previous_row[j +1 ] + exp2 # j+1 instead of j since previous_row and current_row are one character longer deletions = current_row[j] + exp2 if (c1 != c2): current_row.append(min(insertions, deletions)) else : substitutions = previous_row[j] current_row.append(min(insertions, deletions, substitutions)) previous_row = current_row return len(s1)+len(s2),previous_row[-1]
[docs]def levenshtein(seq1, seq2): ''' Edit distance without substitution ''' size_x = len(seq1) + 1 size_y = len(seq2) + 1 matrix = np.zeros ((size_x, size_y)) for x in range(size_x): matrix [x, 0] = x for y in range(size_y): matrix [0, y] = y for x in range(1, size_x): for y in range(1, size_y): if seq1[x-1] in [None,"tau"] or seq1[x-1][0]=='n' or "skip" in seq1[x-1] or "tau" in seq1[x-1] : matrix [x,y] = min( matrix[x-1, y], matrix[x,y-1] + 1 ) elif seq1[x-1] == seq2[y-1]: matrix [x,y] = min( matrix[x-1, y] + 1, matrix[x-1, y-1], matrix[x, y-1] + 1 ) else: matrix [x,y] = min( matrix[x-1,y] + 1, matrix[x,y-1] + 1 ) return (matrix[size_x - 1, size_y - 1])