# Source code for pm4py.algo.discovery.inductive.variants.im_clean.cuts.loop

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
import copy
from typing import List, Optional, Set, Dict

from pm4py.algo.discovery.inductive.variants.im_clean import utils as im_utils
from pm4py.algo.discovery.inductive.variants.im_clean.d_types import Cut, DFG
from pm4py.objects.log.obj import EventLog, Trace


def detect(dfg: DFG, alphabet: Dict[str, int], start_activities: Dict[str, int],
           end_activities: Dict[str, int]) -> Optional[Cut]:
    '''
    This method finds a loop cut in the dfg.
    Implementation follows function LoopCut on page 190 of
    "Robust Process Mining with Guarantees" by Sander J.J. Leemans (ISBN: 978-90-386-4257-4)

    Basic Steps:
    1. merge all start and end activities in one group ('do' group)
    2. remove start/end activities from the dfg
    3. detect connected components in (undirected representative) of the reduced graph
    4. check if each component meets the start/end criteria of the loop cut definition
       (merge with the 'do' group if not)
    5. return the cut if at least two groups remain

    Parameters
    ----------
    dfg
        directly follows graph
    alphabet
        alphabet of the dfg / log
    start_activities
        multiset of start activities of the dfg / log
    end_activities
        multiset of end activities of the dfg / log

    Returns
    -------
        A list of sets of activities, i.e., forming a maximal loop cut
        None if no cut is found.
    '''
    if len(dfg) == 0:
        return None

    # groups[0] is always the 'do' part; every further entry is a candidate 'redo' part
    do_group = set(start_activities).union(end_activities)
    groups = [do_group]
    for component in _compute_connected_components(dfg, alphabet, start_activities, end_activities, do_group):
        groups.append(set(component.nodes))

    groups = _exclude_sets_non_reachable_from_start(dfg, start_activities, end_activities, groups)
    groups = _exclude_sets_no_reachable_from_end(dfg, start_activities, end_activities, groups)
    groups = _check_start_completeness(dfg, start_activities, end_activities, groups)
    groups = _check_end_completeness(dfg, start_activities, end_activities, groups)

    # drop groups that ended up empty
    groups = [group for group in groups if len(group) > 0]

    return groups if len(groups) > 1 else None
def _check_start_completeness(dfg, start_activities, end_activities, groups):
    '''
    Merges into the do group (groups[0]) every other group that contains an
    activity with an arc to at least one start activity but not to all of them.

    Parameters
    ----------
    dfg
        directly follows graph (keys are (source, target) pairs)
    start_activities
        start activities of the dfg / log
    end_activities
        end activities of the dfg / log (not used by this check)
    groups
        list of activity sets; groups[0] is the do group. Modified in place.

    Returns
    -------
        The (same) list of groups after merging.
    '''
    # activities that have at least one arc into a start activity (computed once)
    sources_into_start = {a for (a, b) in dfg if b in start_activities}
    i = 1
    while i < len(groups):
        merge = any(
            a in sources_into_start and any((a, s) not in dfg for s in start_activities)
            for a in groups[i]
        )
        if merge:
            groups[0] = groups[0].union(groups[i])
            # the next group shifts into position i, so the index is not advanced
            del groups[i]
        else:
            i = i + 1
    return groups


def _check_end_completeness(dfg, start_activities, end_activities, groups):
    '''
    Merges into the do group (groups[0]) every other group that contains an
    activity reached from at least one end activity but not from all of them.

    Parameters
    ----------
    dfg
        directly follows graph (keys are (source, target) pairs)
    start_activities
        start activities of the dfg / log (not used by this check)
    end_activities
        end activities of the dfg / log
    groups
        list of activity sets; groups[0] is the do group. Modified in place.

    Returns
    -------
        The (same) list of groups after merging.
    '''
    # activities that have at least one incoming arc from an end activity (computed once)
    targets_from_end = {b for (a, b) in dfg if a in end_activities}
    i = 1
    while i < len(groups):
        merge = any(
            a in targets_from_end and any((e, a) not in dfg for e in end_activities)
            for a in groups[i]
        )
        if merge:
            groups[0] = groups[0].union(groups[i])
            # the next group shifts into position i, so the index is not advanced
            del groups[i]
        else:
            i = i + 1
    return groups


def _merge_groups_of(groups, a, b):
    '''
    Returns a new list of groups in which the group containing a and the group
    containing b are replaced by their union, placed at index 0 (the do part).
    The relative order of all other groups is kept.
    '''
    group_a, group_b = None, None
    for group in groups:
        if a in group:
            group_a = group
        if b in group:
            group_b = group
    merged = [group for group in groups if group != group_a and group != group_b]
    # we are always merging on the do-part
    merged.insert(0, group_a.union(group_b))
    return merged


def _exclude_sets_non_reachable_from_start(dfg: DFG, start_activities: Dict[str, int], end_activities: Dict[str, int],
                                           groups: Cut) -> Cut:
    '''
    For every activity that is a start activity but not an end activity, merges
    the groups of all its direct successors into its own group, which is put at
    the front of the list.

    Returns
    -------
        The resulting list of groups (a new list as soon as one merge happened).
    '''
    for a in set(start_activities).difference(set(end_activities)):
        for (x, b) in dfg:
            if x == a:
                groups = _merge_groups_of(groups, a, b)
    return groups


def _exclude_sets_no_reachable_from_end(dfg: DFG, start_activities: Dict[str, int], end_activities: Dict[str, int],
                                        groups: Cut) -> Cut:
    '''
    For every activity that is an end activity but not a start activity, merges
    the groups of all its direct predecessors into its own group, which is put
    at the front of the list.

    Returns
    -------
        The resulting list of groups (a new list as soon as one merge happened).
    '''
    for b in set(end_activities).difference(set(start_activities)):
        for (a, x) in dfg:
            if x == b:
                groups = _merge_groups_of(groups, a, b)
    return groups


def _compute_connected_components(dfg: DFG, alphabet: Set[str], start_activities: Dict[str, int],
                                  end_activities: Dict[str, int], do_set: Set[str]):
    '''
    Removes every arc touching a start or end activity from (a copy of) the dfg,
    removes do_set from the alphabet, and returns the connected components of
    the undirected version of the remaining graph as networkx subgraph copies.
    '''
    import networkx as nx

    # shallow copy so the caller's dfg is left untouched while arcs are deleted
    reduced_dfg = copy.copy(dfg)
    for (a, b) in dfg:
        if a in start_activities or a in end_activities or b in start_activities or b in end_activities:
            del reduced_dfg[(a, b)]
    reduced_alphabet = set(alphabet).difference(do_set)
    nx_directed = im_utils.transform_dfg_to_directed_nx_graph(reduced_dfg, reduced_alphabet)
    nx_undirected = nx_directed.to_undirected()
    return [nx_undirected.subgraph(c).copy() for c in nx.connected_components(nx_undirected)]
def project(log: EventLog, cut: Cut, activity_key: str) -> List[EventLog]:
    '''
    Projects the log on the parts of a loop cut.

    Every trace is split into alternating segments: maximal runs of events
    whose activity is in the do part (cut[0]) and maximal runs of events whose
    activity is in one of the redo parts (cut[1:]). Do segments go to the do
    log; each redo segment goes to the redo log chosen by
    _append_trace_to_redo_log. Events whose activity is in no part are ignored.

    Parameters
    ----------
    log
        event log to project
    cut
        loop cut; cut[0] is the do part, the remaining entries are redo parts
    activity_key
        event attribute holding the activity

    Returns
    -------
        A list with the do log first, followed by one log per redo part.
    '''
    do = cut[0]
    redo = cut[1:]
    # set instead of list: only used for membership tests
    redo_activities = {act for group in redo for act in group}
    do_log = EventLog()
    redo_logs = [EventLog() for _ in redo]
    for trace in log:
        do_trace = Trace()
        redo_trace = Trace()
        for event in trace:
            activity = event[activity_key]
            if activity in do:
                do_trace.append(event)
                # a do event closes the redo segment collected so far
                if len(redo_trace) > 0:
                    redo_logs = _append_trace_to_redo_log(redo_trace, redo_logs, redo, activity_key)
                    redo_trace = Trace()
            elif activity in redo_activities:
                redo_trace.append(event)
                # a redo event closes the do segment collected so far
                if len(do_trace) > 0:
                    do_log.append(do_trace)
                    do_trace = Trace()
        if len(redo_trace) > 0:
            redo_logs = _append_trace_to_redo_log(redo_trace, redo_logs, redo, activity_key)
        # the final do segment is always recorded, even when it is empty
        do_log.append(do_trace)
    return [do_log, *redo_logs]
def _append_trace_to_redo_log(redo_trace: Trace, redo_logs: List[List[Trace]], redo_groups: List[Set[str]],
                              activity_key: str) -> List[List[Trace]]:
    '''
    Appends redo_trace to the redo log whose group shares the most activities
    with the trace. On a tie, the group with the highest index wins.

    Parameters
    ----------
    redo_trace
        the redo segment to store
    redo_logs
        one log per redo group (same order as redo_groups); modified in place
    redo_groups
        the redo parts of the cut
    activity_key
        event attribute holding the activity

    Returns
    -------
        The (same) redo_logs list.

    Raises
    ------
    IndexError
        if redo_groups is empty
    '''
    if not redo_groups:
        raise IndexError("cannot assign a redo trace: there are no redo groups")
    activities = {event[activity_key] for event in redo_trace}
    # rank by (overlap size, index); the index makes every key unique
    best = max(range(len(redo_groups)),
               key=lambda i: (len(activities.intersection(redo_groups[i])), i))
    redo_logs[best].append(redo_trace)
    return redo_logs
def project_dfg(dfg_sa_ea_actcount, groups):
    '''
    Projects a DFG (with start activities, end activities and activity counts)
    on the groups of a loop cut.

    Parameters
    ----------
    dfg_sa_ea_actcount
        object exposing the attributes dfg, start_activities, end_activities
        and act_count
    groups
        groups of the cut; index 0 is the do part

    Returns
    -------
        A two-element list [dfgs, skippable]: dfgs holds one
        im_utils.DfgSaEaActCount per group, skippable is a list of two booleans.
    '''
    full_dfg = dfg_sa_ea_actcount.dfg
    all_start = dfg_sa_ea_actcount.start_activities
    all_end = dfg_sa_ea_actcount.end_activities
    act_count = dfg_sa_ea_actcount.act_count

    dfgs = []
    skippable = [False, False]
    for gind, g in enumerate(groups):
        activities = {act: act_count[act] for act in act_count if act in g}
        start_activities = {}
        end_activities = {}
        paths_frequency = {}
        for arc in full_dfg:
            if arc[0] in g and arc[1] in g:
                paths_frequency[arc] = full_dfg[arc]
            # an arc from an end activity straight to a start activity
            if arc[1] in all_start and arc[0] in all_end:
                skippable[1] = True
        if gind == 0:
            for act in all_start:
                if act in g:
                    start_activities[act] = all_start[act]
                else:
                    skippable[0] = True
            for act in all_end:
                if act in g:
                    end_activities[act] = all_end[act]
                else:
                    skippable[1] = True
        elif gind == 1:
            # every activity of the first redo group is a start and an end activity with count 1
            # NOTE(review): groups with index >= 2 keep empty start/end activities - confirm this is intended
            for act in g:
                start_activities[act] = 1
                end_activities[act] = 1
        dfgs.append(im_utils.DfgSaEaActCount(paths_frequency, start_activities, end_activities, activities))
    return [dfgs, skippable]