Source code for pm4py.algo.transformation.log_to_features.variants.trace_based

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
from enum import Enum
from typing import Optional, Dict, Any, Union, Tuple, List, Set

import pandas as pd

from pm4py.objects.conversion.log import converter
from pm4py.objects.log.obj import EventLog, Trace, Event
from pm4py.objects.log.util import dataframe_utils
from pm4py.util import constants
from pm4py.util import exec_utils
from pm4py.util import xes_constants as xes
from pm4py.util import xes_constants


class Parameters(Enum):
    """
    Keys accepted in the ``parameters`` dictionary of the functions of this module.

    Every function looks up the keys it needs through ``exec_utils.get_param_value``
    and falls back to its own default when a key is not provided.
    """
    # switches read by get_default_representation / get_default_representation_with_attribute_names
    ENABLE_ACTIVITY_DEF_REPRESENTATION = "enable_activity_def_representation"
    ENABLE_SUCC_DEF_REPRESENTATION = "enable_succ_def_representation"
    # explicit attribute selections and feature list read by apply()
    STR_TRACE_ATTRIBUTES = "str_tr_attr"
    STR_EVENT_ATTRIBUTES = "str_ev_attr"
    NUM_TRACE_ATTRIBUTES = "num_tr_attr"
    NUM_EVENT_ATTRIBUTES = "num_ev_attr"
    STR_EVSUCC_ATTRIBUTES = "str_evsucc_attr"
    FEATURE_NAMES = "feature_names"
    # names of the log attributes to use; the key strings come from pm4py.util.constants
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    START_TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY
    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
    CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY
    RESOURCE_KEY = constants.PARAMETER_CONSTANT_RESOURCE_KEY
    # padding (in seconds) applied to case intervals by resource_workload / work_in_progress
    EPSILON = "epsilon"
    # value written in a feature column when the feature is absent from a case
    DEFAULT_NOT_PRESENT = "default_not_present"
    # flags for the additional features, described in the docstring of apply()
    ENABLE_ALL_EXTRA_FEATURES = "enable_all_extra_features"
    ENABLE_CASE_DURATION = "enable_case_duration"
    ADD_CASE_IDENTIFIER_COLUMN = "add_case_identifier_column"
    ENABLE_TIMES_FROM_FIRST_OCCURRENCE = "enable_times_from_first_occurrence"
    ENABLE_TIMES_FROM_LAST_OCCURRENCE = "enable_times_from_last_occurrence"
    ENABLE_DIRECT_PATHS_TIMES_LAST_OCC = "enable_direct_paths_times_last_occ"
    ENABLE_INDIRECT_PATHS_TIMES_LAST_OCC = "enable_indirect_paths_times_last_occ"
    ENABLE_WORK_IN_PROGRESS = "enable_work_in_progress"
    ENABLE_RESOURCE_WORKLOAD = "enable_resource_workload"
    ENABLE_FIRST_LAST_ACTIVITY_INDEX = "enable_first_last_activity_index"
    ENABLE_MAX_CONCURRENT_EVENTS = "enable_max_concurrent_events"
    ENABLE_MAX_CONCURRENT_EVENTS_PER_ACTIVITY = "enable_max_concurrent_events_per_activity"
    # prefix of the case-level columns, read by apply() when the input is a dataframe
    CASE_ATTRIBUTE_PREFIX = constants.CASE_ATTRIBUTE_PREFIX
def max_concurrent_events(log: EventLog, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> Tuple[
    Any, List[str]]:
    """
    Computes, for every trace, the largest number of following events (of any activity)
    whose start timestamp is not later than the completion timestamp of a given event,
    i.e., that run concurrently with it.

    Parameters
    -----------------
    log
        Event log
    parameters
        Parameters of the algorithm, including:
        - Parameters.START_TIMESTAMP_KEY => the attribute to use as start timestamp
        - Parameters.TIMESTAMP_KEY => the attribute to use as completion timestamp

    Returns
    ----------------
    data
        Numeric value of the features (one single-element row per trace)
    feature_names
        Names of the features
    """
    if parameters is None:
        parameters = {}

    start_timestamp_key = exec_utils.get_param_value(Parameters.START_TIMESTAMP_KEY, parameters,
                                                     xes_constants.DEFAULT_TIMESTAMP_KEY)
    timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters,
                                               xes_constants.DEFAULT_TIMESTAMP_KEY)

    feature_names = ["@@max_concurrent_activities_general"]
    data = []

    for trace in log:
        n_events = len(trace)
        best = 0
        for pos in range(n_events - 1):
            completion = trace[pos][timestamp_key].timestamp()
            overlapping = 0
            for follower in range(pos + 1, n_events):
                # the scan stops at the first event starting after the completion
                if trace[follower][start_timestamp_key].timestamp() > completion:
                    break
                overlapping += 1
            best = max(best, overlapping)
        data.append([best])

    return data, feature_names
def max_concurrent_events_per_activity(log: EventLog,
                                       parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> Tuple[
    Any, List[str]]:
    """
    Counts for every trace and every activity the maximum number of events of the given activity
    that happen concurrently (e.g., their time intervals [st1, ct1] and [st2, ct2] have non-empty intersection).

    Parameters
    -----------------
    log
        Event log
    parameters
        Parameters of the algorithm, including:
        - Parameters.START_TIMESTAMP_KEY => the attribute to use as start timestamp
        - Parameters.TIMESTAMP_KEY => the attribute to use as completion timestamp
        - Parameters.ACTIVITY_KEY => the attribute to use as activity

    Returns
    ----------------
    data
        Numeric value of the features (one row per trace, one column per activity of the log)
    feature_names
        Names of the features (activities in sorted order)
    """
    if parameters is None:
        parameters = {}

    start_timestamp_key = exec_utils.get_param_value(Parameters.START_TIMESTAMP_KEY, parameters,
                                                     xes_constants.DEFAULT_TIMESTAMP_KEY)
    timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters,
                                               xes_constants.DEFAULT_TIMESTAMP_KEY)
    activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY)

    # sorted, so that the order of the columns does not depend on the iteration order of a set
    activities = sorted(set(event[activity_key] for trace in log for event in trace))

    data = []
    feature_names = ["@@max_concurrent_activities_like_" + act for act in activities]

    for trace in log:
        max_conc_act = {act: 0 for act in activities}
        n_events = len(trace)
        for i in range(n_events - 1):
            act = trace[i][activity_key]
            ct = trace[i][timestamp_key].timestamp()
            conc = 0
            for j in range(i + 1, n_events):
                # the scan stops at the first event starting after the completion of event i
                if trace[j][start_timestamp_key].timestamp() > ct:
                    break
                if trace[j][activity_key] == act:
                    conc += 1
            # an activity may occur several times in the trace: keep the highest count seen,
            # instead of the count of its last occurrence
            if conc > max_conc_act[act]:
                max_conc_act[act] = conc
        data.append([max_conc_act[act] for act in activities])

    return data, feature_names
def resource_workload(log: EventLog, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> Tuple[
    Any, List[str]]:
    """
    Calculates for each case, and for each resource of the log, the workload of the resource
    during the lead time of a case. Defaults if a resource is not contained in a case.

    The workload of a resource for a case is the number of cases of the log in which the
    resource is involved and whose lead time overlaps the lead time of the case.

    Parameters
    -----------------
    log
        Event log
    parameters
        Parameters of the algorithm, including:
        - Parameters.START_TIMESTAMP_KEY => the attribute to use as start timestamp
        - Parameters.TIMESTAMP_KEY => the attribute to use as completion timestamp
        - Parameters.RESOURCE_KEY => the attribute to use as resource
        - Parameters.EPSILON => padding (in seconds) added on both sides of the lead time of a case
        - Parameters.DEFAULT_NOT_PRESENT => value used when the resource is not contained in the case
          (or when the case is empty)

    Returns
    ----------------
    data
        Numeric value of the features (one row per case, one column per resource of the log)
    feature_names
        Names of the features (resources in sorted order)
    """
    if parameters is None:
        parameters = {}

    from intervaltree.intervaltree import IntervalTree, Interval

    start_timestamp_key = exec_utils.get_param_value(Parameters.START_TIMESTAMP_KEY, parameters,
                                                     xes_constants.DEFAULT_TIMESTAMP_KEY)
    timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters,
                                               xes_constants.DEFAULT_TIMESTAMP_KEY)
    resource_key = exec_utils.get_param_value(Parameters.RESOURCE_KEY, parameters,
                                              xes_constants.DEFAULT_RESOURCE_KEY)
    epsilon = exec_utils.get_param_value(Parameters.EPSILON, parameters, 0.000001)
    default_not_present = exec_utils.get_param_value(Parameters.DEFAULT_NOT_PRESENT, parameters, 0)

    # one interval tree per resource, holding the (padded) lead times of the cases it worked on
    tree_dict = {}
    for case in log:
        if case:
            resources = set(x[resource_key] for x in case)
            # NOTE(review): the epsilon padding presumably keeps the interval non-empty
            # for cases of zero duration -- confirm against the intervaltree requirements
            st = case[0][start_timestamp_key].timestamp() - epsilon
            ct = case[-1][timestamp_key].timestamp() + epsilon
            for res in resources:
                if res not in tree_dict:
                    tree_dict[res] = IntervalTree()
                tree_dict[res].add(Interval(st, ct))

    resources_list = sorted(tree_dict)

    data = []
    feature_names = ["resource_workload@@" + r for r in resources_list]

    for case in log:
        if not case:
            # an empty case has no lead time and no resource: every column gets the default
            # (previously this raised an IndexError on case[0])
            data.append([default_not_present] * len(resources_list))
            continue

        resources = set(x[resource_key] for x in case)
        st = case[0][start_timestamp_key].timestamp() - epsilon
        ct = case[-1][timestamp_key].timestamp() + epsilon
        row = []
        for res in resources_list:
            if res in resources:
                # number of stored lead times overlapping the lead time of this case
                row.append(len(tree_dict[res][st:ct]))
            else:
                row.append(default_not_present)
        data.append(row)

    return data, feature_names
def work_in_progress(log: EventLog, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> Tuple[
    Any, List[str]]:
    """
    Calculates for each case the number of cases of the log which are open
    during the lead time of the case.

    Parameters
    -----------------
    log
        Event log
    parameters
        Parameters of the algorithm, including:
        - Parameters.START_TIMESTAMP_KEY => the attribute to use as start timestamp
        - Parameters.TIMESTAMP_KEY => the attribute to use as completion timestamp
        - Parameters.EPSILON => padding (in seconds) added on both sides of the lead time of a case
        - Parameters.DEFAULT_NOT_PRESENT => value used for the empty cases

    Returns
    ----------------
    data
        Numeric value of the features (one single-element row per case)
    feature_names
        Names of the features
    """
    if parameters is None:
        parameters = {}

    from intervaltree.intervaltree import IntervalTree, Interval

    start_timestamp_key = exec_utils.get_param_value(Parameters.START_TIMESTAMP_KEY, parameters,
                                                     xes_constants.DEFAULT_TIMESTAMP_KEY)
    timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters,
                                               xes_constants.DEFAULT_TIMESTAMP_KEY)
    epsilon = exec_utils.get_param_value(Parameters.EPSILON, parameters, 0.000001)
    default_not_present = exec_utils.get_param_value(Parameters.DEFAULT_NOT_PRESENT, parameters, 0)

    def _padded_lead_time(non_empty_case):
        # (start of the first event - epsilon, completion of the last event + epsilon)
        lower = non_empty_case[0][start_timestamp_key].timestamp() - epsilon
        upper = non_empty_case[-1][timestamp_key].timestamp() + epsilon
        return lower, upper

    # first pass: store the lead time of every non-empty case
    open_cases = IntervalTree()
    for case in log:
        if case:
            lower, upper = _padded_lead_time(case)
            open_cases.add(Interval(lower, upper))

    feature_names = ["@@work_in_progress"]
    data = []

    # second pass: count the stored lead times overlapping the one of each case
    for case in log:
        if not case:
            data.append([default_not_present])
        else:
            lower, upper = _padded_lead_time(case)
            data.append([len(open_cases[lower:upper])])

    return data, feature_names
def indirect_paths_times_last_occ(log: EventLog, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> \
        Tuple[Any, List[str]]:
    """
    Calculates for each case, and for each indirect path (events at positions i and j >= i+2)
    of the log, the difference between the start timestamp of the later event and the
    completion timestamp of the earlier event. When a path occurs several times in a case,
    the value of the last occurrence having a positive difference is kept.
    Defaults if a path is not present in a case.

    Parameters
    -----------------
    log
        Event log
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY => the attribute to use as activity
        - Parameters.START_TIMESTAMP_KEY => the attribute to use as start timestamp
        - Parameters.TIMESTAMP_KEY => the attribute to use as completion timestamp
        - Parameters.DEFAULT_NOT_PRESENT => value used when the path has no value in the case

    Returns
    ----------------
    data
        Numeric value of the features
    feature_names
        Names of the features
    """
    if parameters is None:
        parameters = {}

    activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY)
    start_timestamp_key = exec_utils.get_param_value(Parameters.START_TIMESTAMP_KEY, parameters,
                                                     xes_constants.DEFAULT_TIMESTAMP_KEY)
    timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters,
                                               xes_constants.DEFAULT_TIMESTAMP_KEY)
    default_not_present = exec_utils.get_param_value(Parameters.DEFAULT_NOT_PRESENT, parameters, 0)

    # every (source, target) couple of activities separated by at least one event in some trace
    all_paths = sorted({(trace[i][activity_key], trace[j][activity_key])
                        for trace in log
                        for i in range(len(trace) - 1)
                        for j in range(i + 2, len(trace))})

    feature_names = ["indirectPathPerformanceLastOcc@@" + source + "##" + target for source, target in all_paths]
    data = []

    for trace in log:
        n_events = len(trace)
        elapsed = {}
        for i in range(n_events - 1):
            for j in range(i + 2, n_events):
                path = (trace[i][activity_key], trace[j][activity_key])
                completion = trace[i][timestamp_key].timestamp()
                start = trace[j][start_timestamp_key].timestamp()
                # only strictly positive waiting times are recorded; later ones overwrite earlier ones
                if start > completion:
                    elapsed[path] = start - completion
        data.append([elapsed[path] if path in elapsed else default_not_present for path in all_paths])

    return data, feature_names
def direct_paths_times_last_occ(log: EventLog, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> \
        Tuple[Any, List[str]]:
    """
    Calculates for each case, and for each direct path (events at positions i and i+1)
    of the log, the difference between the start timestamp of the later event and the
    completion timestamp of the earlier event. When a path occurs several times in a case,
    the value of the last occurrence having a positive difference is kept.
    Defaults if a path is not present in a case.

    Parameters
    -----------------
    log
        Event log
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY => the attribute to use as activity
        - Parameters.START_TIMESTAMP_KEY => the attribute to use as start timestamp
        - Parameters.TIMESTAMP_KEY => the attribute to use as completion timestamp
        - Parameters.DEFAULT_NOT_PRESENT => value used when the path has no value in the case

    Returns
    ----------------
    data
        Numeric value of the features
    feature_names
        Names of the features
    """
    if parameters is None:
        parameters = {}

    activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY)
    start_timestamp_key = exec_utils.get_param_value(Parameters.START_TIMESTAMP_KEY, parameters,
                                                     xes_constants.DEFAULT_TIMESTAMP_KEY)
    timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters,
                                               xes_constants.DEFAULT_TIMESTAMP_KEY)
    default_not_present = exec_utils.get_param_value(Parameters.DEFAULT_NOT_PRESENT, parameters, 0)

    # every (source, target) couple of activities directly following each other in some trace
    all_paths = sorted({(trace[pos][activity_key], trace[pos + 1][activity_key])
                        for trace in log
                        for pos in range(len(trace) - 1)})

    feature_names = ["directPathPerformanceLastOcc@@" + source + "##" + target for source, target in all_paths]
    data = []

    for trace in log:
        elapsed = {}
        for pos in range(len(trace) - 1):
            path = (trace[pos][activity_key], trace[pos + 1][activity_key])
            completion = trace[pos][timestamp_key].timestamp()
            start = trace[pos + 1][start_timestamp_key].timestamp()
            # only strictly positive waiting times are recorded; later ones overwrite earlier ones
            if start > completion:
                elapsed[path] = start - completion
        data.append([elapsed[path] if path in elapsed else default_not_present for path in all_paths])

    return data, feature_names
def times_from_first_occurrence_activity_case(log: EventLog,
                                              parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> \
        Tuple[Any, List[str]]:
    """
    Calculates for each case, and for each activity of the log, two times measured on the
    first occurrence of the activity in the case:
    - the time between the completion of the first event of the case and the start of the occurrence
    - the time between the completion of the occurrence and the start of the last event of the case

    Parameters
    -----------------
    log
        Event log
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY => the attribute to use as activity
        - Parameters.START_TIMESTAMP_KEY => the attribute to use as start timestamp
        - Parameters.TIMESTAMP_KEY => the attribute to use as completion timestamp
        - Parameters.DEFAULT_NOT_PRESENT => value used when the activity is not in the case

    Returns
    ----------------
    data
        Numeric value of the features (two columns per activity)
    feature_names
        Names of the features
    """
    if parameters is None:
        parameters = {}

    activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY)
    start_timestamp_key = exec_utils.get_param_value(Parameters.START_TIMESTAMP_KEY, parameters,
                                                     xes_constants.DEFAULT_TIMESTAMP_KEY)
    timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters,
                                               xes_constants.DEFAULT_TIMESTAMP_KEY)
    default_not_present = exec_utils.get_param_value(Parameters.DEFAULT_NOT_PRESENT, parameters, 0)

    activities_log = sorted({event[activity_key] for trace in log for event in trace})

    feature_names = [prefix + act for act in activities_log for prefix in ("startToFirstOcc@@", "firstOccToEnd@@")]
    data = []

    for trace in log:
        # position of the first occurrence of every activity of the trace
        first_position = {}
        for position in range(len(trace)):
            label = trace[position][activity_key]
            if label not in first_position:
                first_position[label] = position

        row = []
        for act in activities_log:
            if act not in first_position:
                row += [default_not_present, default_not_present]
                continue
            occurrence = trace[first_position[act]]
            occ_start = occurrence[start_timestamp_key].timestamp()
            occ_completion = occurrence[timestamp_key].timestamp()
            case_first_completion = trace[0][timestamp_key].timestamp()
            case_last_start = trace[-1][start_timestamp_key].timestamp()
            row += [occ_start - case_first_completion, case_last_start - occ_completion]
        data.append(row)

    return data, feature_names
def times_from_last_occurrence_activity_case(log: EventLog,
                                             parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> \
        Tuple[Any, List[str]]:
    """
    Calculates for each case, and for each activity of the log, two times measured on the
    last occurrence of the activity in the case:
    - the time between the completion of the first event of the case and the start of the occurrence
    - the time between the completion of the occurrence and the start of the last event of the case

    Parameters
    -----------------
    log
        Event log
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY => the attribute to use as activity
        - Parameters.START_TIMESTAMP_KEY => the attribute to use as start timestamp
        - Parameters.TIMESTAMP_KEY => the attribute to use as completion timestamp
        - Parameters.DEFAULT_NOT_PRESENT => value used when the activity is not in the case

    Returns
    ----------------
    data
        Numeric value of the features (two columns per activity)
    feature_names
        Names of the features
    """
    if parameters is None:
        parameters = {}

    activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY)
    start_timestamp_key = exec_utils.get_param_value(Parameters.START_TIMESTAMP_KEY, parameters,
                                                     xes_constants.DEFAULT_TIMESTAMP_KEY)
    timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters,
                                               xes_constants.DEFAULT_TIMESTAMP_KEY)
    default_not_present = exec_utils.get_param_value(Parameters.DEFAULT_NOT_PRESENT, parameters, 0)

    activities_log = sorted({event[activity_key] for trace in log for event in trace})

    feature_names = [prefix + act for act in activities_log for prefix in ("startToLastOcc@@", "lastOccToEnd@@")]
    data = []

    for trace in log:
        # position of the last occurrence of every activity (later positions overwrite earlier ones)
        last_position = {trace[position][activity_key]: position for position in range(len(trace))}

        row = []
        for act in activities_log:
            if act not in last_position:
                row += [default_not_present, default_not_present]
                continue
            occurrence = trace[last_position[act]]
            occ_start = occurrence[start_timestamp_key].timestamp()
            occ_completion = occurrence[timestamp_key].timestamp()
            case_first_completion = trace[0][timestamp_key].timestamp()
            case_last_start = trace[-1][start_timestamp_key].timestamp()
            row += [occ_start - case_first_completion, case_last_start - occ_completion]
        data.append(row)

    return data, feature_names
def first_last_activity_index_trace(log: EventLog, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> \
        Tuple[Any, List[str]]:
    """
    Uses as features, for every activity of the log, the index of its first and of its
    last occurrence inside a case

    Parameters
    ------------------
    log
        Event log
    parameters
        Parameters, including:
        - Parameters.ACTIVITY_KEY => the attribute to use as activity
        - Parameters.DEFAULT_NOT_PRESENT => the replacement value for activities that are not present
          for the specific case (default: -1)

    Returns
    -----------------
    data
        Numeric value of the features (two columns per activity)
    feature_names
        Names of the features
    """
    if parameters is None:
        parameters = {}

    activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY)
    default_not_present = exec_utils.get_param_value(Parameters.DEFAULT_NOT_PRESENT, parameters, -1)

    activities_log = sorted({event[activity_key] for trace in log for event in trace})

    feature_names = [prefix + act for act in activities_log for prefix in ("firstIndexAct@@", "lastIndexAct@@")]
    data = []

    for trace in log:
        first_seen = {}
        last_seen = {}
        for position, event in enumerate(trace):
            label = event[activity_key]
            last_seen[label] = position
            first_seen.setdefault(label, position)

        row = []
        for act in activities_log:
            if act in first_seen:
                row += [first_seen[act], last_seen[act]]
            else:
                row += [default_not_present, default_not_present]
        data.append(row)

    return data, feature_names
def case_duration(log: EventLog, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> Tuple[
    Any, List[str]]:
    """
    Adds as feature the duration of each case: the difference between the completion
    timestamp of its last event and the start timestamp of its first event
    (0 for the empty cases).

    Parameters
    -----------------
    log
        Event log
    parameters
        Parameters of the algorithm, including:
        - Parameters.START_TIMESTAMP_KEY => the attribute to use as start timestamp
        - Parameters.TIMESTAMP_KEY => the attribute to use as completion timestamp

    Returns
    ----------------
    data
        Numeric value of the features (one single-element row per case)
    feature_names
        Names of the features
    """
    if parameters is None:
        parameters = {}

    start_timestamp_key = exec_utils.get_param_value(Parameters.START_TIMESTAMP_KEY, parameters,
                                                     xes_constants.DEFAULT_TIMESTAMP_KEY)
    timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters,
                                               xes_constants.DEFAULT_TIMESTAMP_KEY)

    feature_names = ["@@caseDuration"]
    data = [
        [trace[-1][timestamp_key].timestamp() - trace[0][start_timestamp_key].timestamp()] if trace else [0]
        for trace in log
    ]

    return data, feature_names
def get_string_trace_attribute_rep(trace: Trace, trace_attribute: str) -> str:
    """
    Builds the feature name associated to the value that a string trace attribute
    assumes in a trace

    Parameters
    ------------
    trace
        Trace of the log
    trace_attribute
        Attribute of the trace to consider

    Returns
    ------------
    rep
        "trace:<attribute>@<value>", or "trace:<attribute>@UNDEFINED" when the trace
        does not have the attribute
    """
    if trace_attribute not in trace.attributes:
        return "trace:" + str(trace_attribute) + "@UNDEFINED"
    value = trace.attributes[trace_attribute]
    return "trace:" + str(trace_attribute) + "@" + str(value)
def get_all_string_trace_attribute_values(log: EventLog, trace_attribute: str) -> List[str]:
    """
    Collects the feature names generated by a string trace attribute over all the traces of a log

    Parameters
    ------------
    log
        Trace log
    trace_attribute
        Attribute of the trace to consider

    Returns
    ------------
    list
        Sorted list (without duplicates) of the feature names associated to the attribute
    """
    return sorted({get_string_trace_attribute_rep(trace, trace_attribute) for trace in log})
def get_string_event_attribute_rep(event: Event, event_attribute: str) -> str:
    """
    Builds the feature name associated to the value that a string event attribute
    assumes in an event (the event is expected to contain the attribute)

    Parameters
    ------------
    event
        Single event of a trace
    event_attribute
        Event attribute to consider

    Returns
    ------------
    rep
        "event:<attribute>@<value>"
    """
    return f"event:{event_attribute!s}@{event[event_attribute]!s}"
def get_values_event_attribute_for_trace(trace: Trace, event_attribute: str) -> Set[str]:
    """
    Collects the feature names generated by a string event attribute over the events of a trace

    Parameters
    -------------
    trace
        Trace of the log
    event_attribute
        Event attribute to consider

    Returns
    -------------
    values
        All feature names present for the given attribute in the given trace; a single
        "event:<attribute>@UNDEFINED" name when no event of the trace has the attribute
    """
    values_trace = {get_string_event_attribute_rep(event, event_attribute)
                    for event in trace if event_attribute in event}
    if len(values_trace) == 0:
        values_trace.add("event:" + str(event_attribute) + "@UNDEFINED")
    return values_trace
def get_all_string_event_attribute_values(log: EventLog, event_attribute: str) -> List[str]:
    """
    Collects the feature names generated by a string event attribute over all the traces of a log

    Parameters
    ------------
    log
        Trace log
    event_attribute
        Event attribute to consider

    Returns
    ------------
    values
        Sorted list (without duplicates) of the feature names present for the given attribute in the log
    """
    collected = set()
    for trace in log:
        collected.update(get_values_event_attribute_for_trace(trace, event_attribute))
    return sorted(collected)
def get_string_event_attribute_succession_rep(event1: Event, event2: Event, event_attribute: str) -> str:
    """
    Builds the feature name associated to the succession of the values that a string
    event attribute assumes in two events (both are expected to contain the attribute)

    Parameters
    ------------
    event1
        First event of the succession
    event2
        Second event of the succession
    event_attribute
        Event attribute to consider

    Returns
    ------------
    rep
        "succession:<attribute>@<value in event1>#<value in event2>"
    """
    source_value = str(event1[event_attribute])
    target_value = str(event2[event_attribute])
    return "succession:" + str(event_attribute) + "@" + source_value + "#" + target_value
def get_values_event_attribute_succession_for_trace(trace: Trace, event_attribute: str) -> Set[str]:
    """
    Collects the feature names generated by the successions of values of a string event
    attribute over the couples of consecutive events of a trace

    Parameters
    -------------
    trace
        Trace of the log
    event_attribute
        Event attribute to consider

    Returns
    -------------
    values
        All feature names present for the given attribute succession in the given trace; a single
        "succession:<attribute>@UNDEFINED" name when no couple of consecutive events has the
        attribute in both events
    """
    consecutive = ((trace[pos], trace[pos + 1]) for pos in range(len(trace) - 1))
    values_trace = {
        get_string_event_attribute_succession_rep(earlier, later, event_attribute)
        for earlier, later in consecutive
        if event_attribute in earlier and event_attribute in later
    }
    if len(values_trace) == 0:
        values_trace.add("succession:" + str(event_attribute) + "@UNDEFINED")
    return values_trace
def get_all_string_event_succession_attribute_values(log: EventLog, event_attribute: str) -> List[str]:
    """
    Collects the feature names generated by the successions of values of a string event
    attribute over all the traces of a log

    Parameters
    ------------
    log
        Trace log
    event_attribute
        Event attribute to consider

    Returns
    ------------
    values
        Sorted list (without duplicates) of the feature names present for the given
        attribute succession in the log
    """
    collected = set()
    for trace in log:
        collected.update(get_values_event_attribute_succession_for_trace(trace, event_attribute))
    return sorted(collected)
def get_numeric_trace_attribute_rep(trace_attribute: str) -> str:
    """
    Builds the feature name associated to a numeric trace attribute

    Parameters
    ------------
    trace_attribute
        Name of the trace attribute

    Returns
    ------------
    feature_name
        Name of the feature ("trace:" followed by the name of the attribute)
    """
    feature_name = "trace:"
    feature_name += trace_attribute
    return feature_name
def get_numeric_trace_attribute_value(trace: Trace, trace_attribute: str) -> Union[int, float]:
    """
    Reads the value of a numeric trace attribute from a given trace

    Parameters
    ------------
    trace
        Trace of the log
    trace_attribute
        Name of the trace attribute

    Returns
    ------------
    value
        Value of the numeric trace attribute for the given trace

    Raises
    ------------
    Exception
        If the trace does not have the attribute
    """
    if trace_attribute not in trace.attributes:
        raise Exception("at least a trace without trace attribute: " + trace_attribute)
    return trace.attributes[trace_attribute]
def get_numeric_event_attribute_rep(event_attribute: str) -> str:
    """
    Builds the feature name associated to a numeric event attribute

    Parameters
    ------------
    event_attribute
        Name of the event attribute

    Returns
    -------------
    feature_name
        Name of the feature ("event:" followed by the name of the attribute)
    """
    feature_name = "event:"
    feature_name += event_attribute
    return feature_name
def get_numeric_event_attribute_value(event: Event, event_attribute: str) -> Union[int, float]:
    """
    Reads the value of a numeric event attribute from a given event

    Parameters
    -------------
    event
        Event
    event_attribute
        Name of the event attribute

    Returns
    -------------
    value
        Value of the numeric event attribute for the given event
        (None when the event does not have the attribute)
    """
    return event[event_attribute] if event_attribute in event else None
def get_numeric_event_attribute_value_trace(trace: Trace, event_attribute: str) -> Union[int, float]:
    """
    Reads the value of the last occurrence of a numeric event attribute in a trace
    (events without the attribute, or with a None value, are ignored)

    Parameters
    -------------
    trace
        Trace of the log
    event_attribute
        Name of the event attribute

    Returns
    -------------
    value
        Value of the last occurrence of the numeric event attribute for the given trace

    Raises
    -------------
    Exception
        If no event of the trace provides a value for the attribute
    """
    found = [
        value
        for value in (get_numeric_event_attribute_value(event, event_attribute) for event in trace)
        if value is not None
    ]
    if not found:
        raise Exception("at least a trace without any event with event attribute: " + event_attribute)
    return found[-1]
def get_default_representation_with_attribute_names(log: EventLog,
                                                    parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
                                                    feature_names: Optional[List[str]] = None) -> Tuple[
    Any, List[str], List[str], List[str], List[str], List[str]]:
    """
    Gets the default data representation of an event log (for process tree building)
    returning also the attribute names

    Parameters
    -------------
    log
        Trace log
    parameters
        Possible parameters of the algorithm, including:
        - Parameters.ENABLE_ACTIVITY_DEF_REPRESENTATION => adds the activity to the string event
          attributes (default: False)
        - Parameters.ENABLE_SUCC_DEF_REPRESENTATION => adds the successions of activities as features
          (default: False)
        - Parameters.ACTIVITY_KEY => the attribute to use as activity
        - "blacklist" => attributes that shall not be used as features
    feature_names
        (If provided) Feature to use in the representation of the log

    Returns
    -------------
    data
        Data to provide for decision tree learning
    feature_names
        Names of the features, in order
    str_tr_attr
        String trace attributes that have been considered
    str_ev_attr
        String event attributes that have been considered
    num_tr_attr
        Numeric trace attributes that have been considered
    num_ev_attr
        Numeric event attributes that have been considered
    """
    from pm4py.statistics.attributes.log.select import select_attributes_from_log_for_tree

    if parameters is None:
        parameters = {}

    enable_activity_def_representation = exec_utils.get_param_value(Parameters.ENABLE_ACTIVITY_DEF_REPRESENTATION,
                                                                    parameters, False)
    enable_succ_def_representation = exec_utils.get_param_value(Parameters.ENABLE_SUCC_DEF_REPRESENTATION,
                                                                parameters, False)
    # resolved like in the other functions of this module, so that Parameters.ACTIVITY_KEY is
    # honored; the lookup of the plain constant is kept as fallback for backward compatibility
    activity_key = exec_utils.get_param_value(
        Parameters.ACTIVITY_KEY, parameters,
        parameters.get(constants.PARAMETER_CONSTANT_ACTIVITY_KEY, xes.DEFAULT_NAME_KEY))
    blacklist = parameters.get("blacklist", [])

    str_tr_attr, str_ev_attr, num_tr_attr, num_ev_attr = select_attributes_from_log_for_tree(log)

    str_evsucc_attr = None
    if enable_succ_def_representation:
        str_evsucc_attr = [activity_key]
    if enable_activity_def_representation and activity_key not in str_ev_attr:
        str_ev_attr.append(activity_key)

    # the blacklist is applied after the activity has (possibly) been added
    str_tr_attr = [x for x in str_tr_attr if x not in blacklist]
    str_ev_attr = [x for x in str_ev_attr if x not in blacklist]
    num_tr_attr = [x for x in num_tr_attr if x not in blacklist]
    num_ev_attr = [x for x in num_ev_attr if x not in blacklist]
    if str_evsucc_attr is not None:
        str_evsucc_attr = [x for x in str_evsucc_attr if x not in blacklist]

    data, feature_names = get_representation(log, str_tr_attr, str_ev_attr, num_tr_attr, num_ev_attr,
                                             str_evsucc_attr=str_evsucc_attr,
                                             feature_names=feature_names)

    return data, feature_names, str_tr_attr, str_ev_attr, num_tr_attr, num_ev_attr
def get_default_representation(log: EventLog, parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
                               feature_names: Optional[List[str]] = None) -> Tuple[Any, List[str]]:
    """
    Gets the default data representation of an event log (for process tree building)

    Parameters
    -------------
    log
        Trace log
    parameters
        Possible parameters of the algorithm, including:
        - Parameters.ENABLE_ACTIVITY_DEF_REPRESENTATION => adds the activity to the string event
          attributes (default: True)
        - Parameters.ENABLE_SUCC_DEF_REPRESENTATION => adds the successions of activities as features
          (default: True)
        - Parameters.ACTIVITY_KEY => the attribute to use as activity
        - "blacklist" => attributes that shall not be used as features
    feature_names
        (If provided) Feature to use in the representation of the log

    Returns
    -------------
    data
        Data to provide for decision tree learning
    feature_names
        Names of the features, in order
    """
    from pm4py.statistics.attributes.log.select import select_attributes_from_log_for_tree

    if parameters is None:
        parameters = {}

    enable_activity_def_representation = exec_utils.get_param_value(Parameters.ENABLE_ACTIVITY_DEF_REPRESENTATION,
                                                                    parameters, True)
    enable_succ_def_representation = exec_utils.get_param_value(Parameters.ENABLE_SUCC_DEF_REPRESENTATION,
                                                                parameters, True)
    # resolved like in the other functions of this module, so that Parameters.ACTIVITY_KEY is
    # honored; the lookup of the plain constant is kept as fallback for backward compatibility
    activity_key = exec_utils.get_param_value(
        Parameters.ACTIVITY_KEY, parameters,
        parameters.get(constants.PARAMETER_CONSTANT_ACTIVITY_KEY, xes.DEFAULT_NAME_KEY))
    blacklist = parameters.get("blacklist", [])

    str_tr_attr, str_ev_attr, num_tr_attr, num_ev_attr = select_attributes_from_log_for_tree(log)

    str_evsucc_attr = None
    if enable_succ_def_representation:
        str_evsucc_attr = [activity_key]
    if enable_activity_def_representation and activity_key not in str_ev_attr:
        str_ev_attr.append(activity_key)

    # the blacklist is applied after the activity has (possibly) been added
    str_tr_attr = [x for x in str_tr_attr if x not in blacklist]
    str_ev_attr = [x for x in str_ev_attr if x not in blacklist]
    num_tr_attr = [x for x in num_tr_attr if x not in blacklist]
    num_ev_attr = [x for x in num_ev_attr if x not in blacklist]
    if str_evsucc_attr is not None:
        str_evsucc_attr = [x for x in str_evsucc_attr if x not in blacklist]

    return get_representation(log, str_tr_attr, str_ev_attr, num_tr_attr, num_ev_attr,
                              str_evsucc_attr=str_evsucc_attr,
                              feature_names=feature_names)
def get_representation(log: EventLog, str_tr_attr: List[str], str_ev_attr: List[str], num_tr_attr: List[str],
                       num_ev_attr: List[str], str_evsucc_attr: Optional[List[str]] = None,
                       feature_names: Optional[List[str]] = None) -> Tuple[Any, List[str]]:
    """
    Encodes every trace of the event log as a vector of features, suited for the data part
    of the decision tree learning. String attributes are encoded as 0/1 columns (one per
    observed value), numeric attributes as one column holding the value.

    NOTE: for the numeric event attributes, only the last value seen in the trace is encoded

    Parameters
    -------------
    log
        Trace log
    str_tr_attr
        List of string trace attributes to consider in data vector creation
    str_ev_attr
        List of string event attributes to consider in data vector creation
    num_tr_attr
        List of numeric trace attributes to consider in data vector creation
    num_ev_attr
        List of numeric event attributes to consider in data vector creation
    str_evsucc_attr
        List of attributes succession of values to consider in data vector creation
    feature_names
        (If provided) Feature to use in the representation of the log; features of the
        log that are not in this list are ignored

    Returns
    -------------
    data
        Data to provide for decision tree learning
    feature_names
        Names of the features, in order
    """
    if feature_names is None:
        # the columns are discovered from the log, group after group
        feature_names = []
        for trace_attribute in str_tr_attr:
            feature_names.extend(get_all_string_trace_attribute_values(log, trace_attribute))
        for event_attribute in str_ev_attr:
            feature_names.extend(get_all_string_event_attribute_values(log, event_attribute))
        for trace_attribute in num_tr_attr:
            feature_names.append(get_numeric_trace_attribute_rep(trace_attribute))
        for event_attribute in num_ev_attr:
            feature_names.append(get_numeric_event_attribute_rep(event_attribute))
        if str_evsucc_attr:
            for event_attribute in str_evsucc_attr:
                feature_names.extend(get_all_string_event_succession_attribute_values(log, event_attribute))

    # column of every feature name (for a repeated name, the last position is the one used)
    column_of = {name: position for position, name in enumerate(feature_names)}
    width = len(feature_names)

    data = []
    for trace in log:
        row = [0] * width

        for trace_attribute in str_tr_attr:
            name = get_string_trace_attribute_rep(trace, trace_attribute)
            if name in column_of:
                row[column_of[name]] = 1

        for event_attribute in str_ev_attr:
            for name in get_values_event_attribute_for_trace(trace, event_attribute):
                if name in column_of:
                    row[column_of[name]] = 1

        for trace_attribute in num_tr_attr:
            name = get_numeric_trace_attribute_rep(trace_attribute)
            if name in column_of:
                row[column_of[name]] = get_numeric_trace_attribute_value(trace, trace_attribute)

        for event_attribute in num_ev_attr:
            name = get_numeric_event_attribute_rep(event_attribute)
            if name in column_of:
                row[column_of[name]] = get_numeric_event_attribute_value_trace(trace, event_attribute)

        if str_evsucc_attr:
            for event_attribute in str_evsucc_attr:
                for name in get_values_event_attribute_succession_for_trace(trace, event_attribute):
                    if name in column_of:
                        row[column_of[name]] = 1

        data.append(row)

    return data, feature_names
def apply(log: EventLog, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> Tuple[Any, List[str]]:
    """
    Extract the features from an event log (a vector for each trace)

    When the log is a Pandas dataframe (or a subclass of it), the extraction is delegated
    to dataframe_utils and a dataframe of features is returned; the ENABLE_* options below
    are not read in that case. Otherwise the log is converted to an EventLog and a list of
    lists is returned.

    Parameters
    -----------------
    log
        Log
    parameters
        Parameters of the algorithm, including:
        - STR_TRACE_ATTRIBUTES => string trace attributes to consider in the features extraction
        - STR_EVENT_ATTRIBUTES => string event attributes to consider in the features extraction
        - NUM_TRACE_ATTRIBUTES => numeric trace attributes to consider in the features extraction
        - NUM_EVENT_ATTRIBUTES => numeric event attributes to consider in the features extraction
        - STR_EVSUCC_ATTRIBUTES => succession of event attributes to consider in the features extraction
        - FEATURE_NAMES => features to consider (in the given order)
        - CASE_ID_KEY => trace attribute holding the case identifier
        (default: xes_constants.DEFAULT_TRACEID_KEY)
        - CASE_ATTRIBUTE_PREFIX => prefix of the case-level columns of a dataframe (default: "case:")
        - ENABLE_ALL_EXTRA_FEATURES => enables all the extra features (default: False); its value is the
        default of every other ENABLE_* option
        - ENABLE_CASE_DURATION => enables the case duration as additional feature
        - ENABLE_TIMES_FROM_FIRST_OCCURRENCE => enables the addition of the times from start of the case, to the end
        of the case, from the first occurrence of an activity of a case
        - ADD_CASE_IDENTIFIER_COLUMN => adds the case identifier (string) as column of the feature table (default: False)
        - ENABLE_TIMES_FROM_LAST_OCCURRENCE => enables the addition of the times from start of the case, to the end
        of the case, from the last occurrence of an activity of a case
        - ENABLE_DIRECT_PATHS_TIMES_LAST_OCC => add the duration of the last occurrence of a directed (i, i+1) path
        in the case as feature
        - ENABLE_INDIRECT_PATHS_TIMES_LAST_OCC => add the duration of the last occurrence of an indirect (i, j)
        path in the case as feature
        - ENABLE_WORK_IN_PROGRESS => enables the work in progress (number of concurrent cases) as a feature
        - ENABLE_RESOURCE_WORKLOAD => enables the resource workload as a feature
        - ENABLE_FIRST_LAST_ACTIVITY_INDEX => enables the insertion of the indexes of the activities as features
        - ENABLE_MAX_CONCURRENT_EVENTS => enables the count of the number of concurrent events inside a case
        - ENABLE_MAX_CONCURRENT_EVENTS_PER_ACTIVITY => enables the count of the number of concurrent events per activity

    Returns
    -------------
    data
        Data to provide for decision tree learning (a dataframe of features when a dataframe
        is given as input, otherwise one list of values per trace)
    feature_names
        Names of the features, in order
    """
    if parameters is None:
        parameters = {}

    str_tr_attr = exec_utils.get_param_value(Parameters.STR_TRACE_ATTRIBUTES, parameters, None)
    num_tr_attr = exec_utils.get_param_value(Parameters.NUM_TRACE_ATTRIBUTES, parameters, None)
    str_ev_attr = exec_utils.get_param_value(Parameters.STR_EVENT_ATTRIBUTES, parameters, None)
    num_ev_attr = exec_utils.get_param_value(Parameters.NUM_EVENT_ATTRIBUTES, parameters, None)
    str_evsucc_attr = exec_utils.get_param_value(Parameters.STR_EVSUCC_ATTRIBUTES, parameters, None)
    feature_names = exec_utils.get_param_value(Parameters.FEATURE_NAMES, parameters, None)

    # must be evaluated before the None -> [] normalization below: an explicitly provided
    # empty list still counts as "provided"
    # NOTE(review): STR_EVSUCC_ATTRIBUTES and FEATURE_NAMES do not take part in this check, so
    # providing only those falls back to the default representation - confirm this is intended
    at_least_one_provided = (str_tr_attr is not None) or (num_tr_attr is not None) or (
            str_ev_attr is not None) or (num_ev_attr is not None)

    if str_tr_attr is None:
        str_tr_attr = []
    if num_tr_attr is None:
        num_tr_attr = []
    if str_ev_attr is None:
        str_ev_attr = []
    if num_ev_attr is None:
        num_ev_attr = []

    # isinstance (instead of an exact type comparison) lets DataFrame subclasses
    # follow the dataframe path as well
    if isinstance(log, pd.DataFrame):
        case_attribute_prefix = exec_utils.get_param_value(Parameters.CASE_ATTRIBUTE_PREFIX, parameters, "case:")

        if str_tr_attr or num_tr_attr or str_ev_attr or num_ev_attr:
            # trace-level attributes are stored in the dataframe with the case prefix
            columns = list(set([case_attribute_prefix + x for x in str_tr_attr]).union(
                set([case_attribute_prefix + x for x in num_tr_attr])).union(
                set(str_ev_attr)).union(set(num_ev_attr)))
            fea_df = dataframe_utils.get_features_df(log, columns, parameters=parameters)
        else:
            fea_df = dataframe_utils.automatic_feature_extraction_df(log, parameters=parameters)

        return fea_df, list(fea_df.columns)

    enable_all = exec_utils.get_param_value(Parameters.ENABLE_ALL_EXTRA_FEATURES, parameters, False)
    case_id_key = exec_utils.get_param_value(Parameters.CASE_ID_KEY, parameters, xes_constants.DEFAULT_TRACEID_KEY)
    add_case_identifier_column = exec_utils.get_param_value(Parameters.ADD_CASE_IDENTIFIER_COLUMN, parameters,
                                                            False)

    # every extra feature defaults to the value of ENABLE_ALL_EXTRA_FEATURES;
    # the order of this table is the order of the resulting columns
    extra_features = (
        (Parameters.ENABLE_CASE_DURATION, case_duration),
        (Parameters.ENABLE_TIMES_FROM_FIRST_OCCURRENCE, times_from_first_occurrence_activity_case),
        (Parameters.ENABLE_TIMES_FROM_LAST_OCCURRENCE, times_from_last_occurrence_activity_case),
        (Parameters.ENABLE_DIRECT_PATHS_TIMES_LAST_OCC, direct_paths_times_last_occ),
        (Parameters.ENABLE_INDIRECT_PATHS_TIMES_LAST_OCC, indirect_paths_times_last_occ),
        (Parameters.ENABLE_WORK_IN_PROGRESS, work_in_progress),
        (Parameters.ENABLE_RESOURCE_WORKLOAD, resource_workload),
        (Parameters.ENABLE_FIRST_LAST_ACTIVITY_INDEX, first_last_activity_index_trace),
        (Parameters.ENABLE_MAX_CONCURRENT_EVENTS, max_concurrent_events),
        (Parameters.ENABLE_MAX_CONCURRENT_EVENTS_PER_ACTIVITY, max_concurrent_events_per_activity),
    )
    enabled_extractors = [extractor for flag, extractor in extra_features
                          if exec_utils.get_param_value(flag, parameters, enable_all)]

    log = converter.apply(log, variant=converter.Variants.TO_EVENT_LOG, parameters=parameters)

    if at_least_one_provided:
        datas, features_namess = get_representation(log, str_tr_attr, str_ev_attr, num_tr_attr, num_ev_attr,
                                                    str_evsucc_attr=str_evsucc_attr,
                                                    feature_names=feature_names)
    else:
        datas, features_namess = get_default_representation(log, parameters=parameters)

    if add_case_identifier_column:
        # the case identifier becomes the first column of every row
        for i, row in enumerate(datas):
            datas[i] = [log[i].attributes[case_id_key]] + row
        features_namess = ["@@case_id_column"] + features_namess

    # add additional features: each extractor returns one row per trace (in log order)
    # plus the names of the columns it contributes
    for extractor in enabled_extractors:
        data, features_names = extractor(log, parameters=parameters)
        for i, row in enumerate(datas):
            datas[i] = row + data[i]
        features_namess = features_namess + features_names

    return datas, features_namess