# Source code for pm4py.objects.dfg.utils.dfg_utils

```
'''
This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).
PM4Py is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PM4Py is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with PM4Py. If not, see <https://www.gnu.org/licenses/>.
'''
import logging
import pkgutil
import queue
from collections import Counter
from copy import copy
import numpy as np
from pm4py.util import variants_util
def get_outgoing_edges(dfg):
    """
    Collect, for every source activity, the edges leaving it

    Parameters
    ----------
    dfg
        Directly-Follows graph, either a dictionary keyed by
        (source, target) or an iterable of ((source, target), value) entries

    Returns
    ----------
    outgoing
        Dictionary mapping each source activity to a dictionary
        {target activity: edge value}
    """
    outgoing = {}
    for entry in dfg:
        if type(entry[0]) is str:
            # dictionary form: the entry itself is the (source, target) key
            source, target = entry[0], entry[1]
            value = dfg[entry]
        else:
            # list form: the entry is ((source, target), value)
            source, target = entry[0][0], entry[0][1]
            value = entry[1]
        outgoing.setdefault(source, {})[target] = value
    return outgoing
def get_ingoing_edges(dfg):
    """
    Collect, for every target activity, the edges entering it

    Parameters
    ----------
    dfg
        Directly-Follows graph, either a dictionary keyed by
        (source, target) or an iterable of ((source, target), value) entries

    Returns
    ----------
    ingoing
        Dictionary mapping each target activity to a dictionary
        {source activity: edge value}
    """
    ingoing = {}
    for entry in dfg:
        if type(entry[0]) is str:
            # dictionary form: the entry itself is the (source, target) key
            source, target = entry[0], entry[1]
            value = dfg[entry]
        else:
            # list form: the entry is ((source, target), value)
            source, target = entry[0][0], entry[0][1]
            value = entry[1]
        ingoing.setdefault(target, {})[source] = value
    return ingoing
def infer_start_activities(dfg):
    """
    Infer the start activities of a Directly-Follows Graph

    An activity is reported when it has at least one outgoing edge
    and no ingoing edge.

    Parameters
    ----------
    dfg
        Directly-Follows Graph

    Returns
    ----------
    start_activities
        List of activities with outgoing but no ingoing edges
    """
    ingoing = get_ingoing_edges(dfg)
    outgoing = get_outgoing_edges(dfg)
    return [act for act in outgoing if act not in ingoing]
def infer_end_activities(dfg):
    """
    Infer the end activities of a Directly-Follows Graph

    An activity is reported when it has at least one ingoing edge
    and no outgoing edge.

    Parameters
    ----------
    dfg
        Directly-Follows Graph

    Returns
    ----------
    end_activities
        List of activities with ingoing but no outgoing edges
    """
    ingoing = get_ingoing_edges(dfg)
    outgoing = get_outgoing_edges(dfg)
    return [act for act in ingoing if act not in outgoing]
def infer_start_activities_from_prev_connections_and_current_dfg(initial_dfg, dfg, activities, include_self=True):
    """
    Infer the start activities of a sub-graph from the edges entering it

    Parameters
    -----------
    initial_dfg
        Initial DFG, as an iterable of ((source, target), value) entries
    dfg
        Directly-follows graph (only used when include_self is True)
    activities
        Activities contained in the DFG
    include_self
        If True, the start activities inferred from dfg itself are added

    Returns
    -----------
    start_activities
        Set of activities in `activities` that are the target of an
        initial_dfg edge whose source is not in `activities`
    """
    start_activities = {
        edge[0][1]
        for edge in initial_dfg
        if edge[0][1] in activities and edge[0][0] not in activities
    }
    if include_self:
        start_activities |= set(infer_start_activities(dfg))
    return start_activities
def infer_end_activities_from_succ_connections_and_current_dfg(initial_dfg, dfg, activities, include_self=True):
    """
    Infer the end activities of a sub-graph from the edges leaving it

    Parameters
    -----------
    initial_dfg
        Initial DFG, as an iterable of ((source, target), value) entries
    dfg
        Directly-follows graph (only used when include_self is True)
    activities
        Activities contained in the DFG
    include_self
        If True, the end activities inferred from dfg itself are added

    Returns
    -----------
    end_activities
        Set of activities in `activities` that are the source of an
        initial_dfg edge whose target is not in `activities`
    """
    end_activities = {
        edge[0][0]
        for edge in initial_dfg
        if edge[0][0] in activities and edge[0][1] not in activities
    }
    if include_self:
        end_activities |= set(infer_end_activities(dfg))
    return end_activities
def get_outputs_of_outside_activities_going_to_start_activities(initial_dfg, dfg, activities):
    """
    Get the other outputs of the outside activities that feed the start activities

    Parameters
    ------------
    initial_dfg
        Initial DFG, as an iterable of ((source, target), value) entries
    dfg
        Directly-follows graph
    activities
        Activities contained in the DFG

    Returns
    ------------
    outputs
        Set of activities outside `activities` that are reached from an
        outside activity preceding a start activity, excluding those
        outside predecessors themselves
    """
    start_activities = infer_start_activities_from_prev_connections_and_current_dfg(
        initial_dfg, dfg, activities, include_self=False)
    # outside activities having an edge towards one of the start activities
    outside_sources = {
        edge[0][0]
        for edge in initial_dfg
        if edge[0][0] not in activities and edge[0][1] in start_activities
    }
    # everything else (still outside) that those activities lead to
    outputs = {
        edge[0][1]
        for edge in initial_dfg
        if edge[0][0] in outside_sources and edge[0][1] not in activities
    }
    return outputs - outside_sources
def get_inputs_of_outside_activities_reached_by_end_activities(initial_dfg, dfg, activities):
    """
    Get the other inputs of the outside activities reached by the end activities

    Parameters
    ------------
    initial_dfg
        Initial DFG, as an iterable of ((source, target), value) entries
    dfg
        Directly-follows graph
    activities
        Activities contained in the DFG

    Returns
    ------------
    inputs
        Set of activities outside `activities` that lead to an outside
        activity following an end activity, excluding those outside
        successors themselves
    """
    end_activities = infer_end_activities_from_succ_connections_and_current_dfg(
        initial_dfg, dfg, activities, include_self=False)
    # outside activities directly reached from one of the end activities
    outside_targets = {
        edge[0][1]
        for edge in initial_dfg
        if edge[0][1] not in activities and edge[0][0] in end_activities
    }
    # every other (still outside) activity leading into those targets
    inputs = {
        edge[0][0]
        for edge in initial_dfg
        if edge[0][1] in outside_targets and edge[0][0] not in activities
    }
    return inputs - outside_targets
def get_activities_from_dfg(dfg):
    """
    Get the sorted list of activities appearing in a DFG graph

    Parameters
    -----------
    dfg
        Directly-Follows graph, either a dictionary keyed by
        (source, target) or an iterable of ((source, target), value) entries

    Returns
    -----------
    list_activities
        Sorted list of the activities that occur as source or target of an edge
    """
    found = set()
    for entry in dfg:
        # the (source, target) pair is the entry itself (dictionary form)
        # or its first component (list form)
        pair = entry if type(entry[0]) is str else entry[0]
        found.add(pair[0])
        found.add(pair[1])
    return sorted(found)
def get_max_activity_count(dfg, act):
    """
    Get the largest value among the edges entering or leaving an activity

    Parameters
    ------------
    dfg
        Directly-Follows graph
    act
        Activity

    Returns
    ------------
    max_value
        Maximum value of an ingoing/outgoing edge of the activity,
        or -1 when the activity has no edge with a larger value
    """
    ingoing = get_ingoing_edges(dfg)
    outgoing = get_outgoing_edges(dfg)
    max_value = -1
    for edges in (ingoing, outgoing):
        if act not in edges:
            continue
        for value in edges[act].values():
            if value > max_value:
                max_value = value
    return max_value
def sum_ingoutg_val_activ(dictio, activity):
    """
    Sum the edge values stored for one activity

    Parameters
    -----------
    dictio
        Dictionary mapping each activity to a dictionary
        {connected activity: value}
    activity
        Activity whose values are summed

    Returns
    -----------
    summ
        Sum of all the values in dictio[activity]
    """
    total = 0
    for value in dictio[activity].values():
        total += value
    return total
def max_occ_all_activ(dfg):
    """
    Get the largest ingoing or outgoing sum of edge values over all activities

    Parameters
    -----------
    dfg
        Directly-Follows graph

    Returns
    -----------
    max_value
        Maximum among the per-activity ingoing sums and outgoing sums,
        or -1 when no sum exceeds it
    """
    ingoing = get_ingoing_edges(dfg)
    outgoing = get_outgoing_edges(dfg)
    max_value = -1
    for edges in (ingoing, outgoing):
        for act in edges:
            total = sum_ingoutg_val_activ(edges, act)
            if total > max_value:
                max_value = total
    return max_value
def max_occ_among_specif_activ(dfg, activities):
    """
    Get the largest ingoing or outgoing sum of edge values over the given activities

    Parameters
    -----------
    dfg
        Directly-Follows graph
    activities
        Activities to consider

    Returns
    -----------
    max_value
        Maximum among the ingoing sums and outgoing sums of the given
        activities, or -1 when no sum exceeds it
    """
    ingoing = get_ingoing_edges(dfg)
    outgoing = get_outgoing_edges(dfg)
    max_value = -1
    for act in activities:
        for edges in (ingoing, outgoing):
            if act not in edges:
                continue
            total = sum_ingoutg_val_activ(edges, act)
            if total > max_value:
                max_value = total
    return max_value
def sum_start_activities_count(dfg):
    """
    Sum the outgoing edge values of the activities without ingoing edges

    Parameters
    -------------
    dfg
        Directly-Follows graph

    Returns
    -------------
    sum_values
        Sum of the values of all edges leaving an activity that has
        no ingoing edge
    """
    ingoing = get_ingoing_edges(dfg)
    outgoing = get_outgoing_edges(dfg)
    sum_values = 0
    for act, targets in outgoing.items():
        if act in ingoing:
            continue
        for value in targets.values():
            sum_values += value
    return sum_values
def sum_end_activities_count(dfg):
    """
    Sum the ingoing edge values of the activities without outgoing edges

    Parameters
    -------------
    dfg
        Directly-Follows graph

    Returns
    -------------
    sum_values
        Sum of the values of all edges entering an activity that has
        no outgoing edge
    """
    ingoing = get_ingoing_edges(dfg)
    outgoing = get_outgoing_edges(dfg)
    sum_values = 0
    for act, sources in ingoing.items():
        if act in outgoing:
            continue
        for value in sources.values():
            sum_values += value
    return sum_values
def sum_activities_count(dfg, activities, enable_halving=True):
    """
    Sum the ingoing and outgoing edge values of the specified activities

    Parameters
    -------------
    dfg
        Directly-Follows graph
    activities
        Activities to sum
    enable_halving
        If True, the running total is replaced by int(total / 2) after
        each activity that has both ingoing and outgoing edges

    Returns
    -------------
    sum_values
        Accumulated (and possibly halved) sum of edge values
    """
    ingoing = get_ingoing_edges(dfg)
    outgoing = get_outgoing_edges(dfg)
    sum_values = 0
    for act in activities:
        has_out = act in outgoing
        has_in = act in ingoing
        if has_out:
            for value in outgoing[act].values():
                sum_values += value
        if has_in:
            for value in ingoing[act].values():
                sum_values += value
        # NOTE(review): the halving acts on the whole running total, inside
        # the loop (reconstructed from the flattened source) - confirm intent
        if enable_halving and has_in and has_out:
            sum_values = int(sum_values / 2)
    return sum_values
def filter_dfg_on_act(dfg, listact):
    """
    Project a DFG graph on a collection of activities

    Parameters
    -----------
    dfg
        Current DFG graph, as an iterable of ((source, target), value) entries
    listact
        Activities to keep

    Returns
    -----------
    new_dfg
        List of the entries whose source and target are both in listact
    """
    return [
        entry
        for entry in dfg
        if entry[0][0] in listact and entry[0][1] in listact
    ]
def negate(dfg):
    """
    Keep only the edges of the DFG graph that have no reverse edge

    Parameters
    ----------
    dfg
        Directly-Follows graph, as an iterable of ((source, target), value) entries

    Returns
    ----------
    negated_dfg
        List of the entries (source, target) for which no edge
        (target, source) exists (used for parallel cut detection)
    """
    outgoing = get_outgoing_edges(dfg)
    return [
        entry
        for entry in dfg
        if entry[0][1] not in outgoing or entry[0][0] not in outgoing[entry[0][1]]
    ]
def get_activities_direction(dfg, activities):
    """
    Calculate the direction of each activity (in a similar way to Heuristics Miner)

    Parameters
    -----------
    dfg
        Directly-follows graph
    activities
        Activities of the subtree; when None they are read from the DFG

    Returns
    -----------
    direction
        Dictionary associating to each activity the value
        (outgoing - ingoing) / (ingoing + outgoing + 1), where ingoing and
        outgoing are the sums of the values of its ingoing/outgoing edges
    """
    if activities is None:
        activities = get_activities_from_dfg(dfg)
    ingoing_edges = get_ingoing_edges(dfg)
    outgoing_edges = get_outgoing_edges(dfg)
    direction = {}
    for act in activities:
        out_total = sum(outgoing_edges[act].values()) if act in outgoing_edges else 0
        in_total = sum(ingoing_edges[act].values()) if act in ingoing_edges else 0
        direction[act] = (out_total - in_total) / (in_total + out_total + 1)
    return direction
def get_activities_dirlist(activities_direction):
    """
    Turn the activity/direction dictionary into a list ordered by direction

    Parameters
    -----------
    activities_direction
        Dictionary associating a direction value to each activity

    Returns
    ----------
    dirlist
        List of [activity, direction] couples sorted in descending order
        of direction, ties broken by descending activity
    """
    dirlist = [[act, value] for act, value in activities_direction.items()]
    dirlist.sort(key=lambda couple: (couple[1], couple[0]), reverse=True)
    return dirlist
def get_activities_self_loop(dfg):
    """
    Get the activities that have an edge to themselves

    Parameters
    ----------
    dfg
        Directly-follows graph

    Returns
    ----------
    self_loop_act
        List of the activities appearing among their own outgoing targets
    """
    outgoing = get_outgoing_edges(dfg)
    return [act for act, targets in outgoing.items() if act in targets]
def get_connected_components(ingoing, outgoing, activities, force_insert_missing_acti=True):
    """
    Get the connected components of the DFG graph

    Parameters
    -----------
    ingoing
        Dictionary {activity: {source activity: value}}
    outgoing
        Dictionary {activity: {target activity: value}}
    activities
        Activities to consider
    force_insert_missing_acti
        If True, every activity not touched by any edge becomes a
        component of its own

    Returns
    -----------
    connected_components
        List of sets of activities; when no component was found at all,
        a list of single-element lists (one per activity) is returned
    """
    seen = set()
    components = []

    # seed: each activity with ingoing edges, together with all its neighbours
    for node, sources in ingoing.items():
        neighbourhood = set(sources.keys())
        if node in outgoing:
            neighbourhood |= set(outgoing[node].keys())
        neighbourhood.add(node)
        if neighbourhood not in components:
            components.append(neighbourhood)
            seen |= neighbourhood

    # seed: activities having only outgoing edges
    for node, targets in outgoing.items():
        if node in ingoing:
            continue
        neighbourhood = set(targets.keys())
        neighbourhood.add(node)
        if neighbourhood not in components:
            components.append(neighbourhood)
            seen |= neighbourhood

    if force_insert_missing_acti:
        for activ in activities:
            if activ not in seen:
                components.append({activ})
                seen.add(activ)

    # repeatedly merge overlapping seeds until a pass changes nothing
    for _ in range(len(components) - 1):
        merged_any = False
        pending = list(components)
        components = []
        for i, current in enumerate(pending):
            if current is None:
                # already absorbed by an earlier component in this pass
                continue
            for j in range(i + 1, len(pending)):
                other = pending[j]
                if other is not None and current & other:
                    current = current | other
                    merged_any = True
                    pending[j] = None
            if current not in components:
                components.append(current)
        if not merged_any:
            break

    if not components:
        # fallback keeps the original shape: lists, not sets
        components.extend([activity] for activity in activities)

    return components
def add_to_most_probable_component(comps, act2, ingoing, outgoing):
    """
    Add a lost activity (parallel cut detection) to the most probable component

    The chosen component is the first one maximising the sum of
    ingoing[act1][act2] and outgoing[act1][act2] over its activities act1.

    Parameters
    -------------
    comps
        Connected components (list of sets); modified in place
    act2
        Activity that has been missed
    ingoing
        Map of ingoing attributes
    outgoing
        Map of outgoing attributes

    Returns
    -------------
    comps
        The same list, with act2 added to the selected component
    """
    scores = []
    best_index = 0
    for index, comp in enumerate(comps):
        score = 0
        for act1 in comp:
            if act1 in ingoing and act2 in ingoing[act1]:
                score += ingoing[act1][act2]
            if act1 in outgoing and act2 in outgoing[act1]:
                score += outgoing[act1][act2]
        scores.append(score)
        # strict comparison: on ties the earliest component wins
        if score > scores[best_index]:
            best_index = index
    comps[best_index].add(act2)
    return comps
def get_all_activities_connected_as_output_to_activity(dfg, activity):
    """
    Get all the activities that directly follow a given activity

    Parameters
    -------------
    dfg
        Directly-follows graph, as an iterable of ((source, target), value) entries
    activity
        Activity

    Returns
    -------------
    all_activities
        Set of the targets of the edges whose source is the given activity
    """
    return {entry[0][1] for entry in dfg if entry[0][0] == activity}
def get_all_activities_connected_as_input_to_activity(dfg, activity):
    """
    Get all the activities that directly precede a given activity

    Parameters
    ------------
    dfg
        Directly-follows graph, as an iterable of ((source, target), value) entries
    activity
        Activity

    Returns
    ------------
    all_activities
        Set of the sources of the edges whose target is the given activity
    """
    return {entry[0][0] for entry in dfg if entry[0][1] == activity}
def get_dfg_np_matrix(dfg):
    """
    Gets a Numpy matrix describing the DFG graph, along with a dictionary
    making correspondence between indexes and activities names

    Parameters
    -------------
    dfg
        Directly-Follows graph, either a dictionary keyed by
        (source, target) or a list of ((source, target), count) entries

    Returns
    -------------
    matrix
        Square matrix whose cell [i, j] holds the value of the edge going
        from the i-th to the j-th activity (0 where there is no edge)
    index_corresp
        Dictionary making correspondence between indexes and activities names
    """
    activities_in_dfg = get_activities_from_dfg(dfg)
    # one lookup table built up front instead of a linear list.index()
    # scan (twice) for every edge of the graph
    position = {act: index for index, act in enumerate(activities_in_dfg)}
    matrix = np.zeros((len(activities_in_dfg), len(activities_in_dfg)))

    for el in dfg:
        if type(el[0]) is str:
            # manage DFG expressed as dictionary (the key is a tuple)
            first_el, second_el, n_occ = el[0], el[1], dfg[el]
        else:
            # manage DFG expressed as list of: ((act0, act1), count)
            first_el, second_el, n_occ = el[0][0], el[0][1], el[1]
        matrix[position[first_el], position[second_el]] = n_occ

    index_corresp = dict(enumerate(activities_in_dfg))
    return matrix, index_corresp
def get_dfg_sa_ea_act_from_variants(variants, parameters=None):
    """
    Gets the DFG, the activities, and the start and end activities
    from the dictionary/set/list of variants in the log

    Each variant is first converted with
    variants_util.get_activities_from_variant and duplicates are dropped,
    so every count below is a number of distinct variants.

    Parameters
    ---------------
    variants
        Dictionary/set/list of variants
    parameters
        Parameters of the algorithm (defaulted to an empty dictionary;
        not read by this function)

    Returns
    --------------
    dfg
        Dictionary {(act0, act1): number of times act1 directly follows act0}
    list_act
        List of different activities
    start_activities
        Dictionary {activity: number of variants starting with it}
    end_activities
        Dictionary {activity: number of variants ending with it}
    """
    if parameters is None:
        parameters = {}

    variants = {variants_util.get_activities_from_variant(v) for v in variants}

    dfg = dict(Counter(
        (trace[i], trace[i + 1])
        for trace in variants
        for i in range(len(trace) - 1)
    ))
    list_act = list({act for trace in variants for act in trace})
    # empty variants have neither a first nor a last activity
    start_activities = dict(Counter(trace[0] for trace in variants if trace))
    end_activities = dict(Counter(trace[-1] for trace in variants if trace))

    return dfg, list_act, start_activities, end_activities
def transform_dfg_to_directed_nx_graph(dfg, activities=None):
    """
    Transform DFG to directed NetworkX graph

    Parameters
    ------------
    dfg
        Directly-Follows graph; edges are read as
        ((source, target), value) entries
    activities
        Activities to add as nodes; when None they are read from the DFG

    Returns
    ------------
    G
        NetworkX digraph having one node per activity and one edge per
        DFG entry (edge values are not copied)

    Raises
    ------------
    Exception
        If networkx is not installed
    """
    # pkgutil.find_loader is deprecated and removed in Python 3.14;
    # importlib.util.find_spec is the supported availability check
    import importlib.util

    if activities is None:
        activities = get_activities_from_dfg(dfg)

    if importlib.util.find_spec("networkx") is None:
        msg = "networkx is not available. inductive miner cannot be used!"
        logging.error(msg)
        raise Exception(msg)

    import networkx as nx

    G = nx.DiGraph()
    for act in activities:
        G.add_node(act)
    for el in dfg:
        G.add_edge(el[0][0], el[0][1])
    return G
def get_successors(dfg, activities_model=None):
    """
    Gets, for every node of the DFG graph, all the nodes reachable from it

    Parameters
    ----------------
    dfg
        DFG, iterated as (source, target) pairs
    activities_model
        Activities of the process model (if None, they are inferred from the DFG)

    Returns
    -----------------
    successors
        Dictionary associating to each node the set of all its descendants
    """
    if activities_model is None:
        activities_model = {edge[0] for edge in dfg} | {edge[1] for edge in dfg}

    previous = {node: set() for node in activities_model}
    current = {node: set() for node in activities_model}
    active = {node: True for node in activities_model}

    for edge in dfg:
        current[edge[0]].add(edge[1])

    while True:
        grew = False
        fresh = {}
        for node in previous:
            if not active[node]:
                continue
            newly_found = current[node].difference(previous[node])
            if newly_found:
                grew = True
            else:
                # nothing new since the last round: the node is settled
                active[node] = False
            fresh[node] = newly_found
        previous = copy(current)
        for node, newly_found in fresh.items():
            if active[node]:
                for reached in newly_found:
                    # rebind (never mutate in place): `previous` shares
                    # the old set objects with `current`
                    current[node] = current[node].union(current[reached])
        if not grew:
            break

    return current
def get_predecessors(dfg, activities_model=None):
    """
    Gets, for every node of the DFG graph, all the nodes that can reach it

    Parameters
    ----------------
    dfg
        DFG, iterated as (source, target) pairs
    activities_model
        Activities of the process model (if None, they are inferred from the DFG)

    Returns
    -----------------
    predecessors
        Dictionary associating to each node the set of all its ascendants
    """
    if activities_model is None:
        activities_model = {edge[0] for edge in dfg} | {edge[1] for edge in dfg}

    previous = {node: set() for node in activities_model}
    current = {node: set() for node in activities_model}
    active = {node: True for node in activities_model}

    for edge in dfg:
        current[edge[1]].add(edge[0])

    while True:
        grew = False
        fresh = {}
        for node in previous:
            if not active[node]:
                continue
            newly_found = current[node].difference(previous[node])
            if newly_found:
                grew = True
            else:
                # nothing new since the last round: the node is settled
                active[node] = False
            fresh[node] = newly_found
        previous = copy(current)
        for node, newly_found in fresh.items():
            if active[node]:
                for reaching in newly_found:
                    # rebind (never mutate in place): `previous` shares
                    # the old set objects with `current`
                    current[node] = current[node].union(current[reaching])
        if not grew:
            break

    return current
def get_transitive_relations(dfg, alphabet):
    """
    Compute the transitive pre-set and post-set of every activity

    Parameters
    ----------
    dfg
        directly follows relation (counter describing activity pairs)
    alphabet
        activities to create an entry for; it must contain every activity
        occurring in dfg

    Returns
    -------
    tuple with two dicts.
        the first maps an activity on all the activities that are able to
        reach it ('transitive pre set')
        the second maps an activity on all the activities that it can reach
        (transitively) ('transitive post set')
    """
    # a plain FIFO is enough here; queue.Queue would only add locking overhead
    from collections import deque

    pre = {a: set() for a in alphabet}
    post = {a: set() for a in alphabet}

    pending = deque(dfg)
    while pending:
        s, t = pending.popleft()
        post[s].add(t)
        pre[t].add(s)
        post[s].update(post[t])
        pre[t].update(pre[s])
        for a, b in dfg:
            # incoming arcs in s; inherit postset of s
            if b == s and not post[s].issubset(post[a]):
                post[a].update(post[s])
                pending.append((a, b))
            # outgoing arcs from t; inherit preset of t
            if a == t and not pre[t].issubset(pre[b]):
                pre[b].update(pre[t])
                pending.append((a, b))
    return pre, post
```