Source code for pm4py.algo.discovery.correlation_mining.util

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
import numpy as np
from pm4py.util.lp import solver
from statistics import mean


[docs]def get_c_matrix(PS_matrix, duration_matrix, activities, activities_counter): """ Calculates the C-matrix out of the PS matrix and the duration matrix Parameters -------------- PS_matrix PS matrix duration_matrix Duration matrix activities Ordered list of activities of the log activities_counter Counter of activities Returns -------------- c_matrix C matrix """ C_matrix = np.zeros((len(activities), len(activities))) for i in range(len(activities)): for j in range(len(activities)): val = duration_matrix[i, j] / PS_matrix[i, j] * 1 / ( min(activities_counter[activities[i]], activities_counter[activities[j]])) if PS_matrix[ i, j] > 0 else 0 if val == 0: val = 100000000000 C_matrix[i, j] = val return C_matrix
[docs]def resolve_LP(C_matrix, duration_matrix, activities, activities_counter): """ Formulates and solve the LP problem Parameters -------------- C_matrix C_matrix duration_matrix Duration matrix activities Ordered list of activities of the log activities_counter Counter of activities Returns ------------- dfg Directly-Follows Graph performance_dfg Performance DFG (containing the estimated performance for the arcs) """ edges = [(i, j) for i in range(len(activities)) for j in range(len(activities))] c = [C_matrix[i, j] for i in range(len(activities)) for j in range(len(activities))] edges_sources = {i: [z for z in range(len(edges)) if edges[z][0] == i] for i in range(len(activities))} edges_targets = {j: [z for z in range(len(edges)) if edges[z][1] == j] for j in range(len(activities))} activities_occurrences = {i: activities_counter[activities[i]] for i in range(len(activities))} Aeq = [] beq = [] for i in range(len(activities)): rec = [0] * len(edges) for e in edges_sources[i]: rec[e] = 1 Aeq.append(rec) beq.append(activities_occurrences[i]) for j in range(len(activities)): rec = [0] * len(edges) for e in edges_targets[j]: rec[e] = 1 Aeq.append(rec) beq.append(activities_occurrences[j]) Aeq = np.asmatrix(Aeq).astype(np.float64) beq = np.asmatrix(beq).transpose().astype(np.float64) Aub = [] bub = [] for i in range(len(activities)): for e in edges_sources[i]: rec = [0] * len(edges) rec[e] = 1 Aub.append(rec) bub.append(activities_occurrences[i]) rec = [-x for x in rec] Aub.append(rec) bub.append(0) for j in range(len(activities)): for e in edges_targets[j]: rec = [0] * len(edges) rec[e] = 1 Aub.append(rec) bub.append(activities_occurrences[j]) rec = [-x for x in rec] Aub.append(rec) bub.append(0) Aub = np.asmatrix(Aub).astype(np.float64) bub = np.asmatrix(bub).transpose().astype(np.float64) use_cvxopt = False if solver.DEFAULT_LP_SOLVER_VARIANT == solver.CVXOPT_SOLVER_CUSTOM_ALIGN or solver.DEFAULT_LP_SOLVER_VARIANT == solver.CVXOPT_SOLVER_CUSTOM_ALIGN_ILP: use_cvxopt = True if use_cvxopt: from cvxopt import matrix c = matrix(c) Aub = matrix(Aub) bub = matrix(bub) Aeq = matrix(Aeq) beq = matrix(beq) res = solver.apply(c, Aub, bub, Aeq, beq, variant=solver.DEFAULT_LP_SOLVER_VARIANT) points = solver.get_points_from_sol(res, variant=solver.DEFAULT_LP_SOLVER_VARIANT) points = [round(p) for p in points] dfg = {} performance_dfg = {} for idx, p in enumerate(points): if p > 0: dfg[(activities[edges[idx][0]], activities[edges[idx][1]])] = p performance_dfg[(activities[edges[idx][0]], activities[edges[idx][1]])] = duration_matrix[ edges[idx][0], edges[idx][1]] return dfg, performance_dfg
[docs]def match_return_avg_time(ai, aj, exact=False): """ Matches two list of times (exact or greedy) and returns the average. Parameters -------------- ai First list aj Second list Returns --------------- times_mean Mean of times """ if exact: from pm4py.statistics.util import times_bipartite_matching matching = times_bipartite_matching.exact_match_minimum_average(ai, aj) ret_exact = mean([x[1] - x[0] for x in matching]) if matching else 0 return ret_exact else: ret_greedy = greedy_match_return_avg_time(ai, aj) return ret_greedy
[docs]def greedy_match_return_avg_time(ai, aj): """ Matches two list of times with a greedy method and returns the average. Parameters -------------- ai First list aj Second list parameters Parameters of the algorithm Returns --------------- times_mean Mean of times """ tm0 = calculate_time_match_fifo(ai, aj) td0 = mean([x[1] - x[0] for x in tm0]) if tm0 else 0 tm1 = calculate_time_match_rlifo(ai, aj) td1 = mean([x[1] - x[0] for x in tm1]) if tm1 else 0 return min(td0, td1)
[docs]def calculate_time_match_fifo(ai, aj, times0=None): """ Associate the times between two lists of timestamps using FIFO Parameters -------------- ai First list of timestamps aj Second list of timestamps times0 Correspondence between execution times Returns -------------- times0 Correspondence between execution times """ if times0 is None: times0 = [] k = 0 z = 0 while k < len(ai): while z < len(aj): if ai[k] < aj[z]: times0.append((ai[k], aj[z])) z = z + 1 break z = z + 1 k = k + 1 return times0
[docs]def calculate_time_match_rlifo(ai, aj, times1=None): """ Associate the times between two lists of timestamps using LIFO (start from end) Parameters -------------- ai First list of timestamps aj Second list of timestamps times0 Correspondence between execution times Returns -------------- times0 Correspondence between execution times """ if times1 is None: times1 = [] k = len(ai) - 1 z = len(aj) - 1 while z >= 0: while k >= 0: if ai[k] < aj[z]: times1.append((ai[k], aj[z])) k = k - 1 break k = k - 1 z = z - 1 return times1