# Source code for pm4py.objects.heuristics_net.obj

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
from copy import deepcopy

from pm4py.objects.dfg.utils import dfg_utils
from pm4py.algo.filtering.dfg.dfg_filtering import clean_dfg_based_on_noise_thresh
from pm4py.objects.heuristics_net import defaults
from pm4py.objects.heuristics_net.node import Node
import deprecation

# Name given to a heuristics net when the caller does not pass ``net_name``.
DEFAULT_NET_NAME = ""


class HeuristicsNet:
    """
    Heuristics net built on top of a (frequency) directly-follows graph.

    The net keeps the raw DFG, the derived dependency / frequency /
    performance matrices and a dictionary of ``Node`` objects that is
    populated by :meth:`calculate`. Two nets can be merged with ``+``.
    """

    def __init__(self, frequency_dfg, activities=None, start_activities=None, end_activities=None,
                 activities_occurrences=None, default_edges_color="#000000", performance_dfg=None,
                 dfg_window_2=None, freq_triples=None, net_name=DEFAULT_NET_NAME):
        """
        Initialize a Heuristics Net

        The implementation is based on the original paper on Heuristics Miner, namely:

        Weijters, A. J. M. M., Wil MP van Der Aalst, and AK Alves De Medeiros.
        "Process mining with the heuristics miner-algorithm."
        Technische Universiteit Eindhoven, Tech. Rep. WP 166 (2006): 1-34.

        and it manages to calculate the dependency matrix, the loops of length one and two, and
        the AND measure

        Parameters
        -------------
        frequency_dfg
            Directly-Follows graph (frequency)
        activities
            Activities (taken from the DFG when not provided)
        start_activities
            Start activities (inferred from the DFG when not provided)
        end_activities
            End activities (inferred from the DFG when not provided)
        activities_occurrences
            Activities occurrences (computed from the DFG when not provided)
        default_edges_color
            (If provided) Default edges color
        performance_dfg
            Performance DFG
        dfg_window_2
            DFG window 2
        freq_triples
            Frequency triples
        net_name
            (If provided) name of the heuristics net
        """
        # net_name, start/end activities and the edge colour are stored as
        # one-element lists so that __add__ can merge nets by concatenation.
        self.net_name = [net_name]

        self.nodes = {}
        self.dependency_matrix = {}
        self.dfg_matrix = {}

        self.dfg = frequency_dfg
        self.performance_dfg = performance_dfg
        self.node_type = "frequency" if self.performance_dfg is None else "performance"

        self.activities = activities
        if self.activities is None:
            self.activities = dfg_utils.get_activities_from_dfg(frequency_dfg)

        if start_activities is None:
            start_activities = dfg_utils.infer_start_activities(frequency_dfg)
        self.start_activities = [start_activities]

        if end_activities is None:
            end_activities = dfg_utils.infer_end_activities(frequency_dfg)
        self.end_activities = [end_activities]

        self.activities_occurrences = activities_occurrences
        if self.activities_occurrences is None:
            self.activities_occurrences = {
                act: dfg_utils.sum_activities_count(frequency_dfg, [act]) for act in self.activities
            }

        self.default_edges_color = [default_edges_color]
        self.dfg_window_2 = dfg_window_2
        self.dfg_window_2_matrix = {}
        self.freq_triples = freq_triples
        self.freq_triples_matrix = {}
        self.concurrent_activities = {}
        self.sojourn_times = {}

    @deprecation.deprecated('2.2.5', '3.0.0', details='please use the calculate method in algorithm package.')
    def calculate(self, dependency_thresh=defaults.DEFAULT_DEPENDENCY_THRESH,
                  and_measure_thresh=defaults.DEFAULT_AND_MEASURE_THRESH,
                  min_act_count=defaults.DEFAULT_MIN_ACT_COUNT,
                  min_dfg_occurrences=defaults.DEFAULT_MIN_DFG_OCCURRENCES,
                  dfg_pre_cleaning_noise_thresh=defaults.DEFAULT_DFG_PRE_CLEANING_NOISE_THRESH,
                  loops_length_two_thresh=defaults.DEFAULT_LOOP_LENGTH_TWO_THRESH,
                  parameters=None):
        """
        Calculate the dependency matrix, populate the nodes

        Parameters
        -------------
        dependency_thresh
            (Optional) dependency threshold
        and_measure_thresh
            (Optional) AND measure threshold
        min_act_count
            (Optional) minimum number of occurrences of an activity
        min_dfg_occurrences
            (Optional) minimum dfg occurrences
        dfg_pre_cleaning_noise_thresh
            (Optional) DFG pre cleaning noise threshold
        loops_length_two_thresh
            (Optional) loops length two threshold
        parameters
            Other parameters of the algorithm
        """
        if parameters is None:
            parameters = {}

        self.min_dfg_occurrences = min_dfg_occurrences
        self.dependency_matrix = {}
        self.dfg_matrix = {}
        self.performance_matrix = {}

        if dfg_pre_cleaning_noise_thresh > 0.0:
            self.dfg = clean_dfg_based_on_noise_thresh(self.dfg, self.activities, dfg_pre_cleaning_noise_thresh,
                                                       parameters=parameters)

        self._index_auxiliary_dfgs()
        self._build_matrices()
        self._connect_dependent_activities(dependency_thresh, min_act_count, min_dfg_occurrences)

        for node in self.nodes.values():
            node.calculate_and_measure_out(and_measure_thresh=and_measure_thresh)
            node.calculate_and_measure_in(and_measure_thresh=and_measure_thresh)
            node.calculate_loops_length_two(self.dfg_matrix, self.freq_triples_matrix,
                                            loops_length_two_thresh=loops_length_two_thresh)

        self._connect_loops_length_two(dependency_thresh, min_act_count, min_dfg_occurrences)

        # nothing passed the thresholds: still expose every activity as an
        # isolated node
        if not self.nodes:
            for act in self.activities:
                self.nodes[act] = self._create_node(act)

    def _index_auxiliary_dfgs(self):
        """Turn the window-2 DFG and the frequency triples into nested dictionaries."""
        if self.dfg_window_2 is not None:
            for el, value in self.dfg_window_2.items():
                self.dfg_window_2_matrix.setdefault(el[0], {})[el[1]] = value

        if self.freq_triples is not None:
            for el, value in self.freq_triples.items():
                act1, act2, act3 = el[0], el[1], el[2]
                # keep only the a-b-a patterns; a-a-a (self-loops) is skipped
                if act1 == act3 and act1 != act2:
                    self.freq_triples_matrix.setdefault(act1, {})[act2] = value

    def _build_matrices(self):
        """Fill the frequency, performance and dependency matrices from the DFG."""
        for el, value in self.dfg.items():
            act1, act2 = el[0], el[1]
            # without a performance DFG the frequency doubles as the value
            # that is later passed as repr_value of the arc
            perf_value = self.performance_dfg[el] if self.performance_dfg is not None else value

            if act1 not in self.dependency_matrix:
                self.dependency_matrix[act1] = {}
                self.dfg_matrix[act1] = {}
                self.performance_matrix[act1] = {}

            self.dfg_matrix[act1][act2] = value
            self.performance_matrix[act1][act2] = perf_value

            if act1 != act2:
                inv_couple = (act2, act1)
                if inv_couple in self.dfg:
                    inv_value = self.dfg[inv_couple]
                    dep = (value - inv_value) / (value + inv_value + 1)
                else:
                    dep = value / (value + 1)
            else:
                # loop of length one
                dep = value / (value + 1)
            self.dependency_matrix[act1][act2] = dep

    def _connect_dependent_activities(self, dependency_thresh, min_act_count, min_dfg_occurrences):
        """Create nodes and arcs for every couple of activities passing all the thresholds."""
        for n1, row in self.dependency_matrix.items():
            for n2, dep in row.items():
                keep = (self._has_min_occurrences(n1, min_act_count)
                        and self._has_min_occurrences(n2, min_act_count)
                        and self.dfg_matrix[n1][n2] >= min_dfg_occurrences
                        and dep >= dependency_thresh)
                if not keep:
                    continue
                self._ensure_node(n1)
                self._ensure_node(n2)
                self._add_arc(n1, n2, dep, self.dfg_matrix[n1][n2], self.performance_matrix[n1][n2])

    def _connect_loops_length_two(self, dependency_thresh, min_act_count, min_dfg_occurrences):
        """Add the arcs of the loops of length two not already covered by a dependency arc."""
        added_loops = set()
        # iterate over a snapshot: nodes can be added to self.nodes below
        for n1 in list(self.nodes):
            for n2 in self.nodes[n1].loop_length_two:
                frequent = (self._reaches(self.dfg_matrix, n1, n2, min_dfg_occurrences)
                            and self._has_min_occurrences(n1, min_act_count)
                            and self._has_min_occurrences(n2, min_act_count))
                if not frequent:
                    continue

                already_connected = (self._reaches(self.dependency_matrix, n1, n2, dependency_thresh)
                                     or self._reaches(self.dependency_matrix, n2, n1, dependency_thresh))
                if already_connected:
                    continue

                self._ensure_node(n2)
                for src, dst in ((n1, n2), (n2, n1)):
                    if (src, dst) in added_loops:
                        continue
                    added_loops.add((src, dst))
                    # Both ends of the arc src -> dst record the frequency of
                    # src -> dst, consistently with the dependency arcs above
                    # (previously the input side stored the frequency of the
                    # opposite direction).
                    self._add_arc(src, dst, 0,
                                  self._value_or_zero(self.dfg_matrix, src, dst),
                                  self._value_or_zero(self.performance_matrix, src, dst))

    def _add_arc(self, source, target, dependency_value, dfg_value, repr_value):
        """Register the arc source -> target on both of its endpoint nodes."""
        source_node = self.nodes[source]
        target_node = self.nodes[target]
        source_node.add_output_connection(target_node, dependency_value, dfg_value, repr_value=repr_value)
        target_node.add_input_connection(source_node, dependency_value, dfg_value, repr_value=repr_value)

    def _ensure_node(self, activity):
        """Return the node of the activity, creating it when it is still missing."""
        if activity not in self.nodes:
            self.nodes[activity] = self._create_node(activity)
        return self.nodes[activity]

    def _create_node(self, activity):
        """Build (without registering it) the node of the activity with the settings of this net."""
        return Node(self, activity, self.activities_occurrences[activity],
                    is_start_node=self._belongs_to_any(activity, self.start_activities),
                    is_end_node=self._belongs_to_any(activity, self.end_activities),
                    default_edges_color=self.default_edges_color[0],
                    node_type=self.node_type,
                    net_name=self.net_name[0],
                    nodes_dictionary=self.nodes)

    def _has_min_occurrences(self, activity, min_act_count):
        """Tell whether the activity is known and occurs at least min_act_count times."""
        return (activity in self.activities_occurrences
                and self.activities_occurrences[activity] >= min_act_count)

    @staticmethod
    def _belongs_to_any(activity, groups):
        """
        Tell whether the activity is contained in at least one of the groups.

        start_activities / end_activities are lists of collections (one per
        merged net), so membership has to be tested inside each collection
        and not against the outer list.
        """
        return any(activity in group for group in groups)

    @staticmethod
    def _reaches(matrix, row, col, threshold):
        """Tell whether matrix[row][col] exists and is at least the threshold."""
        return row in matrix and col in matrix[row] and matrix[row][col] >= threshold

    @staticmethod
    def _value_or_zero(matrix, row, col):
        """Return matrix[row][col], or 0 when the entry does not exist."""
        if row in matrix and col in matrix[row]:
            return matrix[row][col]
        return 0

    def __add__(self, other_net):
        """
        Merge this net with another one, returning a new net

        The left operand is deep-copied and then enriched; the node objects
        taken over from ``other_net`` are shared with it, not copied.

        Parameters
        -------------
        other_net
            Other heuristics net

        Returns
        -------------
        merged_net
            Copy of this net including the nodes and the output connections of the other net
        """
        copied_self = deepcopy(self)

        for node_name, node1 in copied_self.nodes.items():
            if node_name not in other_net.nodes:
                continue
            node2 = other_net.nodes[node_name]
            # output connections are keyed by node object: index them by name
            # to match the nodes of the two nets
            n1n = {x.node_name: x for x in node1.output_connections}
            n2n = {x.node_name: x for x in node2.output_connections}

            # targets present in both nets: concatenate the connections
            for out_node1 in node1.output_connections:
                if out_node1.node_name in n2n:
                    node1.output_connections[out_node1] = (node1.output_connections[out_node1]
                                                           + node2.output_connections[n2n[out_node1.node_name]])

            # targets reached only in the other net: take its connections,
            # re-keyed on our own node object when we have one with that name
            for out_node2 in node2.output_connections:
                if out_node2.node_name in n1n:
                    continue
                if out_node2.node_name in copied_self.nodes:
                    nn = copied_self.nodes[out_node2.node_name]
                    node1.output_connections[nn] = node2.output_connections[out_node2]
                else:
                    node1.output_connections[out_node2] = node2.output_connections[out_node2]

        # nodes existing only in the other net
        diffext = [other_net.nodes[node] for node in other_net.nodes if node not in copied_self.nodes]
        for node in diffext:
            copied_self.nodes[node.node_name] = node

        copied_self.start_activities = copied_self.start_activities + other_net.start_activities
        copied_self.end_activities = copied_self.end_activities + other_net.end_activities
        copied_self.default_edges_color = copied_self.default_edges_color + other_net.default_edges_color
        copied_self.net_name = copied_self.net_name + other_net.net_name

        return copied_self

    def __repr__(self):
        """Return the string form of the nodes dictionary."""
        return str(self.nodes)

    def __str__(self):
        """Return the string form of the nodes dictionary."""
        return str(self.nodes)