Source code for pm4py.algo.discovery.heuristics.variants.plusplus

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
from copy import copy
from enum import Enum
from typing import Optional, Dict, Any, Tuple

import pandas as pd

from pm4py.algo.discovery.dfg import algorithm as dfg_alg
from pm4py.algo.discovery.dfg.adapters.pandas import df_statistics
from pm4py.objects.conversion.heuristics_net import converter as hn_conv_alg
from pm4py.objects.conversion.log import converter as log_converter
from pm4py.objects.heuristics_net import defaults
from pm4py.objects.heuristics_net.obj import HeuristicsNet
from pm4py.objects.heuristics_net.node import Node
from pm4py.objects.log.obj import EventLog
from pm4py.objects.log.util import interval_lifecycle
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.statistics.attributes.log import get as log_attributes
from pm4py.statistics.attributes.pandas import get as pd_attributes
from pm4py.statistics.concurrent_activities.log import get as conc_act_get
from pm4py.statistics.concurrent_activities.pandas import get as pd_conc_act
from pm4py.statistics.end_activities.log import get as log_ea
from pm4py.statistics.end_activities.pandas import get as pd_ea
from pm4py.statistics.eventually_follows.log import get as efg_get
from pm4py.statistics.eventually_follows.pandas import get as pd_efg
from pm4py.statistics.sojourn_time.log import get as soj_get
from pm4py.statistics.sojourn_time.pandas import get as pd_soj_time
from pm4py.statistics.start_activities.log import get as log_sa
from pm4py.statistics.start_activities.pandas import get as pd_sa
from pm4py.util import exec_utils, constants, xes_constants as xes


class Parameters(Enum):
    """
    Keys recognised in the ``parameters`` dictionary by the functions
    of the Heuristics Miner ++ variant
    """
    # keys whose values are taken from the shared pm4py constants module
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    START_TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY
    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
    CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY

    # keys read by discover_heu_net_plus_plus (thresholds, filters, decoration)
    DEPENDENCY_THRESH = "dependency_thresh"
    AND_MEASURE_THRESH = "and_measure_thresh"
    MIN_ACT_COUNT = "min_act_count"
    MIN_DFG_OCCURRENCES = "min_dfg_occurrences"
    HEU_NET_DECORATION = "heu_net_decoration"
def apply(log: EventLog, parameters: Optional[Dict[Any, Any]] = None) -> Tuple[PetriNet, Marking, Marking]:
    """
    Mines a Petri net from an event log with the Heuristics Miner ++ algorithm

    Approach described in
    Burattin, Andrea, and Alessandro Sperduti. "Heuristics Miner for Time Intervals." ESANN. 2010.

    https://andrea.burattin.net/public-files/publications/2010-esann-slides.pdf

    The heuristics net is built by ``apply_heu`` and then handed, together with
    the same parameters, to the heuristics-net converter.

    Parameters
    --------------
    log
        Event log
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY
        - Parameters.START_TIMESTAMP_KEY
        - Parameters.TIMESTAMP_KEY
        - Parameters.DEPENDENCY_THRESH
        - Parameters.AND_MEASURE_THRESH
        - Parameters.MIN_ACT_COUNT
        - Parameters.MIN_DFG_OCCURRENCES
        - Parameters.HEU_NET_DECORATION

    Returns
    --------------
    net
        Petri net
    im
        Initial marking
    fm
        Final marking
    """
    heuristics_net = apply_heu(log, parameters=parameters)
    petri_net, initial_marking, final_marking = hn_conv_alg.apply(heuristics_net, parameters=parameters)
    return petri_net, initial_marking, final_marking
def apply_pandas(df: pd.DataFrame, parameters: Optional[Dict[Any, Any]] = None) -> Tuple[PetriNet, Marking, Marking]:
    """
    Mines a Petri net from a dataframe with the Heuristics Miner ++ algorithm

    Approach described in
    Burattin, Andrea, and Alessandro Sperduti. "Heuristics Miner for Time Intervals." ESANN. 2010.

    https://andrea.burattin.net/public-files/publications/2010-esann-slides.pdf

    The heuristics net is built by ``apply_heu_pandas`` and then handed, together
    with the same parameters, to the heuristics-net converter.

    Parameters
    --------------
    df
        Dataframe
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY
        - Parameters.START_TIMESTAMP_KEY
        - Parameters.TIMESTAMP_KEY
        - Parameters.CASE_ID_KEY
        - Parameters.DEPENDENCY_THRESH
        - Parameters.AND_MEASURE_THRESH
        - Parameters.MIN_ACT_COUNT
        - Parameters.MIN_DFG_OCCURRENCES
        - Parameters.HEU_NET_DECORATION

    Returns
    --------------
    net
        Petri net
    im
        Initial marking
    fm
        Final marking
    """
    heuristics_net = apply_heu_pandas(df, parameters=parameters)
    petri_net, initial_marking, final_marking = hn_conv_alg.apply(heuristics_net, parameters=parameters)
    return petri_net, initial_marking, final_marking
def apply_heu(log: EventLog, parameters: Optional[Dict[Any, Any]] = None) -> HeuristicsNet:
    """
    Mines a heuristics net from an event log with the Heuristics Miner ++ algorithm

    Approach described in
    Burattin, Andrea, and Alessandro Sperduti. "Heuristics Miner for Time Intervals." ESANN. 2010.

    https://andrea.burattin.net/public-files/publications/2010-esann-slides.pdf

    Parameters
    --------------
    log
        Event log
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY
        - Parameters.START_TIMESTAMP_KEY
        - Parameters.TIMESTAMP_KEY
        - Parameters.DEPENDENCY_THRESH
        - Parameters.AND_MEASURE_THRESH
        - Parameters.MIN_ACT_COUNT
        - Parameters.MIN_DFG_OCCURRENCES
        - Parameters.HEU_NET_DECORATION

    Returns
    --------------
    heu_net
        Heuristics net
    """
    parameters = {} if parameters is None else parameters

    # normalise the input to an EventLog and run it through the interval-lifecycle conversion
    event_log = log_converter.apply(log, variant=log_converter.Variants.TO_EVENT_LOG, parameters=parameters)
    interval_log = interval_lifecycle.to_interval(event_log, parameters=parameters)

    if exec_utils.get_param_value(Parameters.START_TIMESTAMP_KEY, parameters, None) is None:
        # no start timestamp configured: fall back to the XES default, writing it into a
        # copy so that the dictionary supplied by the caller is left untouched
        parameters = copy(parameters)
        parameters[Parameters.START_TIMESTAMP_KEY] = xes.DEFAULT_START_TIMESTAMP_KEY

    # the abstraction is the 7-tuple expected (in this order) by discover_heu_net_plus_plus
    abstraction = discover_abstraction_log(interval_log, parameters=parameters)
    (start_activities, end_activities, activities_occurrences, dfg, performance_dfg, sojourn_time,
     concurrent_activities) = abstraction

    return discover_heu_net_plus_plus(start_activities, end_activities, activities_occurrences, dfg,
                                      performance_dfg, sojourn_time, concurrent_activities,
                                      parameters=parameters)
def discover_abstraction_log(log: EventLog, parameters: Optional[Dict[Any, Any]] = None) -> Tuple[
    Any, Any, Any, Any, Any, Any, Any]:
    """
    Computes, from an event log, the abstraction consumed by the
    Heuristics Miner ++ algorithm

    Parameters
    --------------
    log
        Event log
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY
        - Parameters.START_TIMESTAMP_KEY
        - Parameters.TIMESTAMP_KEY
        - Parameters.CASE_ID_KEY

    Returns
    --------------
    start_activities
        Start activities
    end_activities
        End activities
    activities_occurrences
        Activities along with their number of occurrences
    dfg
        Directly-follows graph
    performance_dfg
        (Performance) Directly-follows graph
    sojourn_time
        Sojourn time for each activity
    concurrent_activities
        Concurrent activities
    """
    if parameters is None:
        parameters = {}

    activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes.DEFAULT_NAME_KEY)

    starts = log_sa.get_start_activities(log, parameters=parameters)
    ends = log_ea.get_end_activities(log, parameters=parameters)
    occurrences = log_attributes.get_attribute_values(log, activity_key, parameters=parameters)

    # the frequency graph is obtained from the eventually-follows statistic with
    # KEEP_FIRST_FOLLOWING switched on (set on a copy, the other statistics do not see it)
    efg_parameters = copy(parameters)
    efg_parameters[efg_get.Parameters.KEEP_FIRST_FOLLOWING] = True
    frequency_graph = efg_get.apply(log, parameters=efg_parameters)

    performance_graph = dfg_alg.apply(log, variant=dfg_alg.Variants.PERFORMANCE, parameters=parameters)
    sojourn = soj_get.apply(log, parameters=parameters)
    concurrency = conc_act_get.apply(log, parameters=parameters)

    return (starts, ends, occurrences, frequency_graph, performance_graph, sojourn, concurrency)
def apply_heu_pandas(df: pd.DataFrame, parameters: Optional[Dict[Any, Any]] = None) -> HeuristicsNet:
    """
    Discovers an heuristics net using the Heuristics Miner ++ algorithm

    Implements the approach described in
    Burattin, Andrea, and Alessandro Sperduti. "Heuristics Miner for Time Intervals." ESANN. 2010.

    https://andrea.burattin.net/public-files/publications/2010-esann-slides.pdf

    The same ``parameters`` dictionary is forwarded both to the abstraction step and
    to the net construction step, so that the thresholds, the occurrence filters and
    the decoration chosen by the caller are honoured (as in ``apply_heu``).

    Parameters
    --------------
    df
        Dataframe
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY
        - Parameters.START_TIMESTAMP_KEY
        - Parameters.TIMESTAMP_KEY
        - Parameters.CASE_ID_KEY
        - Parameters.DEPENDENCY_THRESH
        - Parameters.AND_MEASURE_THRESH
        - Parameters.MIN_ACT_COUNT
        - Parameters.MIN_DFG_OCCURRENCES
        - Parameters.HEU_NET_DECORATION

    Returns
    --------------
    heu_net
        Heuristics net
    """
    if parameters is None:
        parameters = {}

    (start_activities, end_activities, activities_occurrences, dfg, performance_dfg, sojourn_time,
     concurrent_activities) = discover_abstraction_dataframe(df, parameters=parameters)

    # parameters must be passed on here: without them discover_heu_net_plus_plus
    # falls back to its defaults and ignores every user-provided threshold
    return discover_heu_net_plus_plus(start_activities, end_activities, activities_occurrences, dfg,
                                      performance_dfg, sojourn_time, concurrent_activities,
                                      parameters=parameters)
def discover_abstraction_dataframe(df: pd.DataFrame, parameters: Optional[Dict[Any, Any]] = None) -> Tuple[
    Any, Any, Any, Any, Any, Any, Any]:
    """
    Computes, from a dataframe, the abstraction consumed by the
    Heuristics Miner ++ algorithm

    Parameters
    --------------
    df
        Dataframe
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY
        - Parameters.START_TIMESTAMP_KEY
        - Parameters.TIMESTAMP_KEY
        - Parameters.CASE_ID_KEY

    Returns
    --------------
    start_activities
        Start activities
    end_activities
        End activities
    activities_occurrences
        Activities along with their number of occurrences
    dfg
        Directly-follows graph
    performance_dfg
        (Performance) Directly-follows graph
    sojourn_time
        Sojourn time for each activity
    concurrent_activities
        Concurrent activities
    """
    if parameters is None:
        parameters = {}

    activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes.DEFAULT_NAME_KEY)

    start_timestamp_key = exec_utils.get_param_value(Parameters.START_TIMESTAMP_KEY, parameters, None)
    if start_timestamp_key is None:
        # no start timestamp configured: use the XES default and record it in a copy
        # of the parameters, so the statistics below see it and the caller's dict is not modified
        start_timestamp_key = xes.DEFAULT_START_TIMESTAMP_KEY
        parameters = copy(parameters)
        parameters[Parameters.START_TIMESTAMP_KEY] = start_timestamp_key

    timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters, xes.DEFAULT_TIMESTAMP_KEY)
    case_id_glue = exec_utils.get_param_value(Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME)

    starts = pd_sa.get_start_activities(df, parameters=parameters)
    ends = pd_ea.get_end_activities(df, parameters=parameters)
    occurrences = pd_attributes.get_attribute_values(df, activity_key, parameters=parameters)

    # the frequency graph is obtained from the eventually-follows statistic with
    # KEEP_FIRST_FOLLOWING switched on (set on a copy, the other statistics do not see it)
    efg_parameters = copy(parameters)
    efg_parameters[pd_efg.Parameters.KEEP_FIRST_FOLLOWING] = True
    frequency_graph = pd_efg.apply(df, parameters=efg_parameters)

    performance_graph = df_statistics.get_dfg_graph(
        df,
        case_id_glue=case_id_glue,
        activity_key=activity_key,
        timestamp_key=timestamp_key,
        start_timestamp_key=start_timestamp_key,
        measure="performance",
    )
    sojourn = pd_soj_time.apply(df, parameters=parameters)
    concurrency = pd_conc_act.apply(df, parameters=parameters)

    return (starts, ends, occurrences, frequency_graph, performance_graph, sojourn, concurrency)
def discover_heu_net_plus_plus(start_activities, end_activities, activities_occurrences, dfg, performance_dfg,
                               sojourn_time, concurrent_activities, parameters: Optional[Dict[Any, Any]] = None):
    """
    Builds an heuristics net from a pre-computed abstraction using the
    Heuristics Miner ++ algorithm

    Approach described in
    Burattin, Andrea, and Alessandro Sperduti. "Heuristics Miner for Time Intervals." ESANN. 2010.

    https://andrea.burattin.net/public-files/publications/2010-esann-slides.pdf

    Parameters
    --------------
    start_activities
        Start activities
    end_activities
        End activities
    activities_occurrences
        Activities along with their number of occurrences
    dfg
        Directly-follows graph
    performance_dfg
        (Performance) Directly-follows graph
    sojourn_time
        Sojourn time for each activity
    concurrent_activities
        Concurrent activities
    parameters
        Parameters of the algorithm, including:
        - Parameters.DEPENDENCY_THRESH
        - Parameters.AND_MEASURE_THRESH
        - Parameters.MIN_ACT_COUNT
        - Parameters.MIN_DFG_OCCURRENCES
        - Parameters.HEU_NET_DECORATION

    Returns
    --------------
    heu_net
        Heuristics net
    """
    if parameters is None:
        parameters = {}

    dependency_thresh = exec_utils.get_param_value(Parameters.DEPENDENCY_THRESH, parameters,
                                                   defaults.DEFAULT_DEPENDENCY_THRESH)
    and_measure_thresh = exec_utils.get_param_value(Parameters.AND_MEASURE_THRESH, parameters,
                                                    defaults.DEFAULT_AND_MEASURE_THRESH)
    min_act_count = exec_utils.get_param_value(Parameters.MIN_ACT_COUNT, parameters,
                                               defaults.DEFAULT_MIN_ACT_COUNT)
    min_dfg_occurrences = exec_utils.get_param_value(Parameters.MIN_DFG_OCCURRENCES, parameters,
                                                     defaults.DEFAULT_MIN_DFG_OCCURRENCES)
    heu_net_decoration = exec_utils.get_param_value(Parameters.HEU_NET_DECORATION, parameters, "frequency")

    # keep only the activities that occur often enough
    kept_occurrences = {}
    for activity, count in activities_occurrences.items():
        if count >= min_act_count:
            kept_occurrences[activity] = count

    # keep only the arcs that occur often enough and connect two kept activities
    kept_dfg = {}
    for arc, count in dfg.items():
        if count >= min_dfg_occurrences and arc[0] in kept_occurrences and arc[1] in kept_occurrences:
            kept_dfg[arc] = count

    # the remaining structures are restricted to what survived the two filters above
    kept_performance = {arc: value for arc, value in performance_dfg.items() if arc in kept_dfg}
    kept_starts = {activity: count for activity, count in start_activities.items()
                   if activity in kept_occurrences}
    kept_ends = {activity: count for activity, count in end_activities.items()
                 if activity in kept_occurrences}

    net_arguments = {
        "activities": list(kept_occurrences),
        "activities_occurrences": kept_occurrences,
        "start_activities": kept_starts,
        "end_activities": kept_ends,
    }
    if heu_net_decoration != "frequency":
        # the performance graph is handed to the net only for a non-frequency decoration
        net_arguments["performance_dfg"] = kept_performance

    heu_net = HeuristicsNet(kept_dfg, **net_arguments)
    heu_net.min_dfg_occurrences = min_dfg_occurrences
    heu_net.sojourn_times = sojourn_time
    heu_net.concurrent_activities = concurrent_activities

    return calculate(heu_net, dependency_thresh, and_measure_thresh, heu_net_decoration)
def calculate(heu_net: HeuristicsNet, dependency_thresh: float, and_measure_thresh: float,
              heu_net_decoration: str) -> HeuristicsNet:
    """
    Calculates the dependency matrix and the AND measures using the Heuristics Miner ++ formulas

    The frequency, dependency and performance matrices of the net are rebuilt from
    ``heu_net.dfg``, one node is created for every activity, and the net is then
    enriched by ``calculate_dependency``, ``calculate_and_out_measure`` and
    ``calculate_and_in_measure`` (in this order).

    Parameters
    ----------------
    heu_net
        Heuristics net
    dependency_thresh
        Dependency threshold
    and_measure_thresh
        AND measure threshold
    heu_net_decoration
        Decoration to use (frequency/performance)

    Returns
    ----------------
    heu_net
        Heuristics net
    """
    heu_net.performance_matrix = {}
    heu_net.dependency_matrix = {}
    heu_net.dfg_matrix = {}

    performance_dfg = heu_net.performance_dfg

    for el in heu_net.dfg:
        act1 = el[0]
        act2 = el[1]
        if act1 not in heu_net.dfg_matrix:
            heu_net.dfg_matrix[act1] = {}
            heu_net.dependency_matrix[act1] = {}
            heu_net.performance_matrix[act1] = {}
        heu_net.dfg_matrix[act1][act2] = heu_net.dfg[el]
        # placeholder, overwritten by calculate_dependency
        heu_net.dependency_matrix[act1][act2] = -1
        # the performance graph is not guaranteed to contain every arc of the frequency
        # graph, so a missing arc gets the same 0.0 used when no performance graph is available
        if performance_dfg and el in performance_dfg:
            heu_net.performance_matrix[act1][act2] = performance_dfg[el]
        else:
            heu_net.performance_matrix[act1][act2] = 0.0

    for act1 in heu_net.activities:
        heu_net.nodes[act1] = Node(heu_net, act1, heu_net.activities_occurrences[act1],
                                   node_type=heu_net.node_type)

    # calculates the dependencies between the activities
    heu_net = calculate_dependency(heu_net, dependency_thresh, heu_net_decoration)
    # calculates the AND measure for outgoing edges (e.g. which activities happen in parallel after a given activity)
    heu_net = calculate_and_out_measure(heu_net, and_measure_thresh)
    # calculates the AND measure for ingoing edges (e.g. which activities happen in parallel before a given activity)
    heu_net = calculate_and_in_measure(heu_net, and_measure_thresh)

    return heu_net
def calculate_dependency(heu_net: HeuristicsNet, dependency_thresh: float,
                         heu_net_decoration: str) -> HeuristicsNet:
    """
    Fills the dependency matrix using the Heuristics Miner ++ formula

    For every arc (a, b) of the frequency matrix the dependency is
    (|a->b| - |b->a|) / (|a->b| + |b->a| + |a||b|), where |a||b| is the
    concurrency count of the pair. Arcs whose dependency is strictly above the
    threshold are registered as output/input connections of the two nodes.

    Parameters
    --------------
    heu_net
        Heuristics net
    dependency_thresh
        Dependency threshold
    heu_net_decoration
        Decoration to include (frequency/performance)

    Returns
    ---------------
    heu_net
        Heuristics net (enriched)
    """
    for source in heu_net.activities:
        if source not in heu_net.dfg_matrix:
            continue
        for target in heu_net.dfg_matrix[source]:
            forward = heu_net.dfg_matrix[source][target]
            backward = heu_net.dfg_matrix.get(target, {}).get(source, 0.0)
            # concurrency counts are looked up with the pair in sorted order
            pair = tuple(sorted((source, target)))
            # added term for Heuristics Miner ++
            if pair in heu_net.concurrent_activities:
                overlap = heu_net.concurrent_activities[pair]
            else:
                overlap = 0.0
            dependency = (forward - backward) / (forward + backward + overlap)
            heu_net.dependency_matrix[source][target] = dependency
            if dependency > dependency_thresh:
                if heu_net_decoration == "frequency":
                    shown_value = forward
                else:
                    shown_value = heu_net.performance_matrix[source][target]
                source_node = heu_net.nodes[source]
                target_node = heu_net.nodes[target]
                source_node.add_output_connection(target_node, dependency, forward, repr_value=shown_value)
                target_node.add_input_connection(source_node, dependency, forward, repr_value=shown_value)

    return heu_net
def calculate_and_out_measure(heu_net: HeuristicsNet, and_measure_thresh: float) -> HeuristicsNet:
    """
    Computes the AND measure on outgoing edges using the Heuristics Miner ++ formula

    For an activity a and every pair (b, c) of its output connections (taken in
    sorted order of node name) the measure is
    (|b->c| + |c->b| + |b||c|) / (|a->b| + |a->c|), where |b||c| is the
    concurrency count of the pair. Values strictly above the threshold are
    stored in ``and_measures_out[b][c]`` of the node of a.

    Parameters
    ---------------
    heu_net
        Heuristics net
    and_measure_thresh
        And measure threshold

    Returns
    ---------------
    heu_net
        Heuristics net (enriched)
    """
    dfg_matrix = heu_net.dfg_matrix
    concurrency = heu_net.concurrent_activities

    for act, node in heu_net.nodes.items():
        successors = sorted(conn.node_name for conn in node.output_connections)
        from_act = dfg_matrix.get(act, {})
        for position, first in enumerate(successors):
            act_to_first = from_act.get(first, 0.0)
            # only pairs (first, second) with second after first in the sorted list
            for second in successors[position + 1:]:
                pair = tuple(sorted((first, second)))
                first_to_second = dfg_matrix.get(first, {}).get(second, 0.0)
                second_to_first = dfg_matrix.get(second, {}).get(first, 0.0)
                act_to_second = from_act.get(second, 0.0)
                # added term for Heuristics Miner ++
                overlap = concurrency[pair] if pair in concurrency else 0.0
                and_measure = (first_to_second + second_to_first + overlap) / (act_to_first + act_to_second)
                if and_measure > and_measure_thresh:
                    node.and_measures_out.setdefault(first, {})[second] = and_measure

    return heu_net
def calculate_and_in_measure(heu_net: HeuristicsNet, and_measure_thresh: float) -> HeuristicsNet:
    """
    Computes the AND measure on incoming edges using the Heuristics Miner ++ formula

    For an activity a and every pair (b, c) of its input connections (taken in
    sorted order of node name) the measure is
    (|b->c| + |c->b| + |b||c|) / (|b->a| + |c->a|), where |b||c| is the
    concurrency count of the pair. Values strictly above the threshold are
    stored in ``and_measures_in[b][c]`` of the node of a.

    Parameters
    ---------------
    heu_net
        Heuristics net
    and_measure_thresh
        And measure threshold

    Returns
    ---------------
    heu_net
        Heuristics net (enriched)
    """
    dfg_matrix = heu_net.dfg_matrix
    concurrency = heu_net.concurrent_activities

    for act, node in heu_net.nodes.items():
        predecessors = sorted(conn.node_name for conn in node.input_connections)
        for position, first in enumerate(predecessors):
            first_to_act = dfg_matrix.get(first, {}).get(act, 0.0)
            # only pairs (first, second) with second after first in the sorted list
            for second in predecessors[position + 1:]:
                pair = tuple(sorted((first, second)))
                first_to_second = dfg_matrix.get(first, {}).get(second, 0.0)
                second_to_first = dfg_matrix.get(second, {}).get(first, 0.0)
                second_to_act = dfg_matrix.get(second, {}).get(act, 0.0)
                # added term for Heuristics Miner ++
                overlap = concurrency[pair] if pair in concurrency else 0.0
                and_measure = (first_to_second + second_to_first + overlap) / (first_to_act + second_to_act)
                if and_measure > and_measure_thresh:
                    node.and_measures_in.setdefault(first, {})[second] = and_measure

    return heu_net
def apply_dfg(dfg, activities=None, activities_occurrences=None, start_activities=None, end_activities=None,
              parameters=None):
    """
    Placeholder for the discovery of a Petri net from a directly-follows graph,
    which the Heuristics Miner ++ variant does not provide

    Parameters
    --------------
    dfg
        Directly-follows graph (unused)
    activities
        Activities (unused)
    activities_occurrences
        Activities along with their number of occurrences (unused)
    start_activities
        Start activities (unused)
    end_activities
        End activities (unused)
    parameters
        Parameters of the algorithm (unused)

    Raises
    --------------
    NotImplementedError
        Always (it is a subclass of Exception, so handlers catching Exception still apply)
    """
    raise NotImplementedError("not implemented for plusplus version")
def apply_heu_dfg(dfg, activities=None, activities_occurrences=None, start_activities=None, end_activities=None,
                  dfg_window_2=None, freq_triples=None, parameters=None):
    """
    Placeholder for the discovery of an heuristics net from a directly-follows graph,
    which the Heuristics Miner ++ variant does not provide

    Parameters
    --------------
    dfg
        Directly-follows graph (unused)
    activities
        Activities (unused)
    activities_occurrences
        Activities along with their number of occurrences (unused)
    start_activities
        Start activities (unused)
    end_activities
        End activities (unused)
    dfg_window_2
        Unused
    freq_triples
        Unused
    parameters
        Parameters of the algorithm (unused)

    Raises
    --------------
    NotImplementedError
        Always (it is a subclass of Exception, so handlers catching Exception still apply)
    """
    raise NotImplementedError("not implemented for plusplus version")