# Source code for pm4py.algo.discovery.dfg.variants.performance

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
from collections import Counter
from enum import Enum
from statistics import mean, median, stdev

from pm4py.util import constants, exec_utils
from pm4py.util import xes_constants as xes_util
from pm4py.util.business_hours import BusinessHours
from typing import Optional, Dict, Any, Union, Tuple
from pm4py.objects.log.obj import EventLog, EventStream


class Parameters(Enum):
    """Keys recognised in the ``parameters`` dictionary of this DFG variant."""

    # Event-attribute selectors; the key strings are defined in pm4py.util.constants.
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    START_TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY
    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY

    # Name of the aggregation applied to the durations collected for each edge.
    AGGREGATION_MEASURE = "aggregationMeasure"

    # Business-hours configuration.
    BUSINESS_HOURS = "business_hours"
    WORKTIMING = "worktiming"
    WEEKENDS = "weekends"
    WORKCALENDAR = "workcalendar"
def apply(
    log: Union[EventLog, EventStream],
    parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> Dict[Tuple[str, str], float]:
    """
    Entry point of the variant; forwards both arguments to ``performance``.

    Parameters
    ----------
    log
        Log
    parameters
        Optional dictionary of parameters, passed through unchanged

    Returns
    -------
    dfg
        Whatever ``performance`` returns for the same arguments
    """
    dfg = performance(log, parameters=parameters)
    return dfg
def performance(
    log: Union[EventLog, EventStream],
    parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> Dict[Tuple[str, str], float]:
    """
    Measure performance between couples of attributes in the DFG graph

    For every pair of consecutive events of a trace, the elapsed time (in
    seconds, never below 0) between the timestamp of the first event and the
    start timestamp of the second event is collected under the key
    ``(first activity, second activity)``; the collected values are then
    aggregated per key.

    Parameters
    ----------
    log
        Log
        NOTE(review): the items of ``log`` are indexed by position and measured
        with ``len``, i.e. each item is treated as a trace - confirm that
        EventStream inputs are converted before reaching this function.
    parameters
        Possible parameters passed to the algorithms:
        - Parameters.ACTIVITY_KEY => Attribute to use as activity
            Default: xes_util.DEFAULT_NAME_KEY
        - Parameters.TIMESTAMP_KEY => Attribute to use as timestamp of the
            first event of a couple
            Default: xes_util.DEFAULT_TIMESTAMP_KEY
        - Parameters.START_TIMESTAMP_KEY => Attribute to use as start timestamp
            of the second event of a couple
            Default: xes_util.DEFAULT_TIMESTAMP_KEY
        - Parameters.AGGREGATION_MEASURE => performance aggregation measure:
            "mean", "median", "min", "max", "stdev" (0 when a couple has a
            single observation), "sum", "raw_values" (the list of collected
            values) or "all" (a dictionary with median, min, max, stdev, sum
            and mean). Any other value is treated as "mean".
            Default: "mean"
        - Parameters.BUSINESS_HOURS => calculates the difference of time based
            on the business hours, not the total time.
            Default: False
        - Parameters.WORKTIMING => work schedule of the company (provided as a
            list where the first number is the start of the work time, and the
            second number is the end of the work time), if business hours are
            enabled
            Default: [7, 17] (work shift from 07:00 to 17:00)
        - Parameters.WEEKENDS => indexes of the days of the week that are
            weekend
            Default: [6, 7] (weekends are Saturday and Sunday)
        - Parameters.WORKCALENDAR => work calendar forwarded to BusinessHours,
            if business hours are enabled
            Default: constants.DEFAULT_BUSINESS_HOURS_WORKCALENDAR

    Returns
    -------
    dfg
        DFG graph: a Counter mapping each couple of activities to the
        aggregated value (a number, or a list / dictionary for the
        "raw_values" / "all" measures)
    """
    if parameters is None:
        parameters = {}

    activity_key = exec_utils.get_param_value(
        Parameters.ACTIVITY_KEY, parameters, xes_util.DEFAULT_NAME_KEY)
    start_timestamp_key = exec_utils.get_param_value(
        Parameters.START_TIMESTAMP_KEY, parameters, xes_util.DEFAULT_TIMESTAMP_KEY)
    timestamp_key = exec_utils.get_param_value(
        Parameters.TIMESTAMP_KEY, parameters, xes_util.DEFAULT_TIMESTAMP_KEY)
    aggregation_measure = exec_utils.get_param_value(
        Parameters.AGGREGATION_MEASURE, parameters, "mean")

    business_hours = exec_utils.get_param_value(Parameters.BUSINESS_HOURS, parameters, False)
    worktiming = exec_utils.get_param_value(Parameters.WORKTIMING, parameters, [7, 17])
    weekends = exec_utils.get_param_value(Parameters.WEEKENDS, parameters, [6, 7])
    workcalendar = exec_utils.get_param_value(
        Parameters.WORKCALENDAR, parameters, constants.DEFAULT_BUSINESS_HOURS_WORKCALENDAR)

    if business_hours:
        def elapsed_seconds(source_event, target_event):
            # Time zone information is dropped before handing the timestamps
            # to BusinessHours.
            calculator = BusinessHours(
                source_event[timestamp_key].replace(tzinfo=None),
                target_event[start_timestamp_key].replace(tzinfo=None),
                worktiming=worktiming,
                weekends=weekends,
                workcalendar=workcalendar,
            )
            return calculator.getseconds()
    else:
        def elapsed_seconds(source_event, target_event):
            delta = target_event[start_timestamp_key] - source_event[timestamp_key]
            return delta.total_seconds()

    # Group the observed durations by (source activity, target activity).
    durations = {}
    for trace in log:
        for position in range(1, len(trace)):
            source_event = trace[position - 1]
            target_event = trace[position]
            edge = (source_event[activity_key], target_event[activity_key])
            # Negative differences are clamped to 0.
            seconds = max(0, elapsed_seconds(source_event, target_event))
            durations.setdefault(edge, []).append(seconds)

    def safe_stdev(values):
        # stdev is only computed with at least two observations; otherwise 0.
        return stdev(values) if len(values) > 1 else 0

    def summary(values):
        return {
            "median": median(values),
            "min": min(values),
            "max": max(values),
            "stdev": safe_stdev(values),
            "sum": sum(values),
            "mean": mean(values),
        }

    known_measures = (
        ("median", median),
        ("min", min),
        ("max", max),
        ("stdev", safe_stdev),
        ("sum", sum),
        ("raw_values", lambda values: values),
        ("all", summary),
    )
    # Unrecognised measures fall back to the mean.
    aggregate = next(
        (function for name, function in known_measures if aggregation_measure == name),
        mean,
    )

    ret = Counter()
    for edge, values in durations.items():
        ret[edge] = aggregate(values)

    return ret