Source code for pm4py.objects.log.util.get_prefixes

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
from copy import deepcopy

from pm4py.objects.log.obj import EventLog, Trace
from pm4py.objects.log.util import basic_filter
from pm4py.util import xes_constants as xes
from pm4py.util import constants
import logging


def get_prefixes_from_log(log: EventLog, length: int) -> EventLog:
    """
    Builds a log in which every trace is limited to its first ``length`` events

    Parameters
    ----------------
    log
        Event log
    length
        Maximum number of events kept for each trace

    Returns
    ----------------
    prefix_log
        Log containing the prefixes:
        - a trace with at most ``length`` events is inserted as-is (the same object, not a copy)
        - a longer trace is replaced by a new trace, created with the attributes of the
          original one, that holds its first ``length`` events
    """
    # the log-level metadata of the input log is handed over to the resulting log
    truncated_log = EventLog(
        list(),
        attributes=log.attributes,
        extensions=log.extensions,
        classifiers=log.classifiers,
        omni_present=log.omni_present,
        properties=log.properties,
    )

    for case in log:
        if len(case) > length:
            # too long: rebuild the trace event by event up to the requested length
            shortened = Trace(attributes=case.attributes)
            for position in range(length):
                shortened.append(case[position])
            truncated_log.append(shortened)
            continue
        # short enough: reuse the original trace object
        truncated_log.append(case)

    return truncated_log
def get_log_with_log_prefixes(log, parameters=None):
    """
    Builds an extended log that lists, case after case and in order,
    snapshots of the growing prefix of each trace of the original log

    Parameters
    --------------
    log
        Original log
    parameters
        Possible parameters of the algorithm (not read by this function)

    Returns
    -------------
    all_prefixes_log
        Log with all the prefixes (each entry is a deep copy of the prefix at that moment)
    change_indexes
        One list per original trace; it repeats, once per event of the trace, the index
        of the last entry of the extended log at the moment the trace has been processed
    """
    extended_log = EventLog()
    boundaries = []

    for case in log:
        running_prefix = Trace()
        # NOTE(review): the line structure of this function was reconstructed; confirm that
        # the snapshot taken after adding the event belongs inside the loop over the events
        for evt in case:
            # snapshot before the event is added ...
            extended_log.append(deepcopy(running_prefix))
            running_prefix.append(evt)
            # ... and snapshot after the event is added
            extended_log.append(deepcopy(running_prefix))
        last_position = len(extended_log) - 1
        boundaries.append([last_position] * len(case))

    return extended_log, boundaries
def get_log_traces_to_activities(log, activities, parameters=None):
    """
    Get sublogs taking to each one of the specified activities

    Parameters
    -------------
    log
        Trace log object
    activities
        List of activities in the log
    parameters
        Possible parameters of the algorithm, including:
            PARAMETER_CONSTANT_ACTIVITY_KEY -> activity
            PARAMETER_CONSTANT_TIMESTAMP_KEY -> timestamp
        The provided dictionary is not modified: the function works on a copy of it.

    Returns
    -------------
    list_logs
        List of event logs taking to the first occurrence of each activity
        (only the non-empty resulting logs are inserted)
    considered_activities
        Activities for which a log has effectively been inserted in list_logs,
        in the same order
    """
    # work on a private copy, since the attribute key is written into the dictionary below
    # and the caller's object should not be altered as a side effect
    parameters = {} if parameters is None else dict(parameters)

    activity_key = parameters.get(constants.PARAMETER_CONSTANT_ACTIVITY_KEY, xes.DEFAULT_NAME_KEY)
    parameters[constants.PARAMETER_CONSTANT_ATTRIBUTE_KEY] = activity_key

    list_logs = []
    considered_activities = []

    for act in activities:
        other_acts = [ac for ac in activities if not ac == act]

        # two independent parameter sets for the two filtering steps
        # (presumably "positive" selects between keeping and discarding the matching
        # traces - confirm against basic_filter.filter_log_traces_attr)
        parameters_filt1 = deepcopy(parameters)
        parameters_filt1["positive"] = True
        parameters_filt2 = deepcopy(parameters)
        parameters_filt2["positive"] = False

        # step 0: filter on the current activity
        filtered_log = basic_filter.filter_log_traces_attr(log, [act], parameters=parameters_filt1)
        logging.info("get_log_traces_to_activities activities=%s act=%s 0 len(filtered_log)=%s",
                     activities, act, len(filtered_log))

        # step 1: filter on all the other activities of the list
        filtered_log = basic_filter.filter_log_traces_attr(filtered_log, other_acts, parameters=parameters_filt2)
        logging.info("get_log_traces_to_activities activities=%s act=%s 1 len(filtered_log)=%s",
                     activities, act, len(filtered_log))

        # step 2: cut each remaining trace at the activity; the second returned value
        # (the times to the activity) is not needed here
        filtered_log, _ = get_log_traces_until_activity(filtered_log, act, parameters=parameters)
        logging.info("get_log_traces_to_activities activities=%s act=%s 2 len(filtered_log)=%s",
                     activities, act, len(filtered_log))

        if filtered_log:
            list_logs.append(filtered_log)
            considered_activities.append(act)

    return list_logs, considered_activities
def get_log_traces_until_activity(log, activity, parameters=None):
    """
    Gets a reduced version of the log containing, for each trace, only the events before a specified activity

    Traces that do not contain the activity, or that have it as their very first event,
    are not included in the result.

    Parameters
    -------------
    log
        Trace log
    activity
        Activity to reach
    parameters
        Possible parameters of the algorithm, including:
            PARAMETER_CONSTANT_ACTIVITY_KEY -> activity
            PARAMETER_CONSTANT_TIMESTAMP_KEY -> timestamp
            "duration" -> event attribute to read as time to the activity, instead of
                          the difference between timestamps (default: None)
            "use_future_attributes" -> if True, copies of the events following the activity,
                                       deprived of the activity attribute, are appended to
                                       the reduced trace (default: False)

    Returns
    -------------
    new_log
        New log
    traces_interlapsed_time_to_act
        For each trace of the new log, in the same order, the value of the duration attribute
        of the first occurrence of the activity (when "duration" is provided) or the difference
        between the timestamp of that event and the timestamp of the event preceding it
    """
    if parameters is None:
        parameters = {}

    activity_key = parameters.get(constants.PARAMETER_CONSTANT_ACTIVITY_KEY, xes.DEFAULT_NAME_KEY)
    timestamp_key = parameters.get(constants.PARAMETER_CONSTANT_TIMESTAMP_KEY, xes.DEFAULT_TIMESTAMP_KEY)
    duration_attribute = parameters.get("duration")
    use_future_attributes = parameters.get("use_future_attributes", False)

    new_log = EventLog()
    traces_interlapsed_time_to_act = []

    for trace in log:
        # positions of the activity in the trace; enumerate yields them already in ascending order
        occurrences = [pos for pos, event in enumerate(trace) if event[activity_key] == activity]
        if not occurrences or occurrences[0] == 0:
            # nothing to keep: the activity is missing or there is no event before it
            continue
        first = occurrences[0]

        new_trace = Trace(trace[0:first])
        for attr in trace.attributes:
            new_trace.attributes[attr] = trace.attributes[attr]

        if duration_attribute is None:
            reached_time = trace[first][timestamp_key]
            previous_time = trace[first - 1][timestamp_key]
            try:
                curr_trace_interlapsed_time_to_act = reached_time.timestamp() - previous_time.timestamp()
            except Exception:
                # best-effort fallback for values that do not offer a usable timestamp() method
                # (e.g. plain numbers); KeyboardInterrupt/SystemExit are deliberately not caught
                curr_trace_interlapsed_time_to_act = reached_time - previous_time
                logging.error("timestamp_key not timestamp")
        else:
            curr_trace_interlapsed_time_to_act = trace[first][duration_attribute]

        traces_interlapsed_time_to_act.append(curr_trace_interlapsed_time_to_act)

        if use_future_attributes:
            for future_event in trace[first + 1:]:
                # deep copy, so that removing the activity does not alter the original event
                new_ev = deepcopy(future_event)
                if activity_key in new_ev:
                    del new_ev[activity_key]
                new_trace.append(new_ev)

        new_log.append(new_trace)

    return new_log, traces_interlapsed_time_to_act