Source code for pm4py.objects.conversion.log.variants.to_event_stream

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
import math
import pkgutil
from copy import deepcopy, copy
from enum import Enum

from pm4py.objects.conversion.log import constants
from pm4py.objects.log import obj as log_instance
from pm4py.objects.log.obj import EventLog, Event, XESExtension
from pm4py.util import constants as pmutil
from pm4py.util import exec_utils, pandas_utils, xes_constants


class Parameters(Enum):
    DEEP_COPY = constants.DEEPCOPY
    STREAM_POST_PROCESSING = constants.STREAM_POSTPROCESSING
    CASE_ATTRIBUTE_PREFIX = "case_attribute_prefix"
    INCLUDE_CASE_ATTRIBUTES = "include_case_attributes"
    COMPRESS = "compress"

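# Hedged usage sketch (illustrative, not part of the original module): shows
# how the Parameters enum members above are used as keys of the parameters
# dictionary handed to apply(). The values shown are the module's documented
# defaults, picked here purely for illustration.
def _example_build_parameters():
    return {
        Parameters.DEEP_COPY: True,
        Parameters.STREAM_POST_PROCESSING: False,
        Parameters.CASE_ATTRIBUTE_PREFIX: "case:",
        Parameters.INCLUDE_CASE_ATTRIBUTES: True,
        Parameters.COMPRESS: False,
    }
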
def __postprocess_stream(list_events):
    """
    Postprocess the list of events of the stream in order to make sure
    that there are no NaN/NaT values

    Parameters
    -------------
    list_events
        List of events

    Returns
    -------------
    list_events
        Postprocessed stream
    """
    import pandas

    for event in list_events:
        event_keys = list(event.keys())
        for k in event_keys:
            typ_k = type(event[k])
            if typ_k is pandas._libs.tslibs.nattype.NaTType:
                del event[k]
                continue
            elif (typ_k is float or typ_k is int) and math.isnan(event[k]):
                del event[k]
                continue
            elif event[k] is None:
                del event[k]
                continue
    return list_events


def __compress(list_events):
    """
    Compress a list of events, using one instantiation for the same key/value.

    Parameters
    --------------
    list_events
        List of events of the stream

    Returns
    --------------
    list_events
        Compressed list of events
    """
    compress_dict = {}
    i = 0
    while i < len(list_events):
        # create a new event where keys and values are compressed
        comp_ev = {}
        for k, v in list_events[i].items():
            # check if the key has already been instantiated.
            # in that case, use the current instantiation.
            if k not in compress_dict:
                compress_dict[k] = k
            else:
                k = compress_dict[k]
            # check if the value has already been instantiated.
            # in that case, use the current instantiation.
            if v not in compress_dict:
                compress_dict[v] = v
            else:
                v = compress_dict[v]
            # save the compressed keys and values in the dictionary
            comp_ev[k] = v
        list_events[i] = comp_ev
        i = i + 1
    return list_events

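# Hedged sketch (illustrative, not part of the original module): demonstrates
# the effect of __compress on a toy stream. Repeated keys/values across events
# end up referring to a single shared object, which is what reduces memory
# usage; the event contents here are invented for the example.
def _example_compress():
    events = [{"concept:name": "register request", "org:resource": "Mike"},
              {"concept:name": "register request", "org:resource": "Mike"}]
    compressed = __compress(events)
    # after compression, equal values in different events are the same object
    assert compressed[0]["concept:name"] is compressed[1]["concept:name"]
    return compressed
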
def apply(log, parameters=None):
    """
    Converts the event log to an event stream

    Parameters
    ----------
    log: :class:`pm4py.log.log.EventLog`
        An event log
    parameters
        Parameters of the conversion, including:
        - Parameters.INCLUDE_CASE_ATTRIBUTES (default: True)
        - Parameters.CASE_ATTRIBUTE_PREFIX (default: 'case:')
        - Parameters.DEEP_COPY: enables deepcopy, avoiding references between input and output objects (default: True)

    Returns
    -------
    stream : :class:`pm4py.log.log.EventStream`
        An event stream
    """
    if parameters is None:
        parameters = {}

    stream_post_processing = exec_utils.get_param_value(Parameters.STREAM_POST_PROCESSING, parameters, False)
    case_pref = exec_utils.get_param_value(Parameters.CASE_ATTRIBUTE_PREFIX, parameters, 'case:')
    enable_deepcopy = exec_utils.get_param_value(Parameters.DEEP_COPY, parameters, True)
    include_case_attributes = exec_utils.get_param_value(Parameters.INCLUDE_CASE_ATTRIBUTES, parameters, True)
    compress = exec_utils.get_param_value(Parameters.COMPRESS, parameters, False)

    if pkgutil.find_loader("pandas"):
        import pandas
        if isinstance(log, pandas.DataFrame):
            return __transform_dataframe_to_event_stream(log, stream_post_processing=stream_post_processing,
                                                         compress=compress)
    if isinstance(log, EventLog):
        return __transform_event_log_to_event_stream(log, include_case_attributes=include_case_attributes,
                                                     case_attribute_prefix=case_pref,
                                                     enable_deepcopy=enable_deepcopy)
    return log

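# Hedged usage sketch (illustrative, not part of the original module):
# converts a small pandas DataFrame to an event stream through apply().
# The column names follow the usual PM4Py conventions; the DataFrame
# content is invented for the example.
def _example_apply_on_dataframe():
    import pandas as pd
    df = pd.DataFrame({
        "case:concept:name": ["1", "1", "2"],
        "concept:name": ["register request", "examine casually", "register request"],
    })
    # COMPRESS shares key/value instantiations across the generated events
    return apply(df, parameters={Parameters.COMPRESS: True})
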
def __detect_extensions(df):
    extensions = set()
    for col in df.columns:
        for single_key in col.split(':'):
            for ext in XESExtension:
                if single_key == ext.prefix:
                    extensions.add(ext)
    return extensions


def __transform_dataframe_to_event_stream(dataframe, stream_post_processing=False, compress=True):
    """
    Transforms a dataframe to an event stream

    Parameters
    ------------------
    dataframe
        Pandas dataframe
    stream_post_processing
        Boolean value that enables the post-processing to remove NaN/NaT values
    compress
        Compresses the stream in order to reduce the memory utilization after the conversion

    Returns
    ------------------
    stream
        Event stream
    """
    extensions = __detect_extensions(dataframe)
    list_events = pandas_utils.to_dict_records(dataframe)
    if stream_post_processing:
        list_events = __postprocess_stream(list_events)
    if compress:
        list_events = __compress(list_events)
    for i in range(len(list_events)):
        list_events[i] = Event(list_events[i])
    if hasattr(dataframe, 'attrs'):
        properties = copy(dataframe.attrs)
        if pmutil.PARAMETER_CONSTANT_CASEID_KEY in properties:
            del properties[pmutil.PARAMETER_CONSTANT_CASEID_KEY]
    else:
        properties = {}
    stream = log_instance.EventStream(list_events, attributes={'origin': 'csv'}, properties=properties)
    for ex in extensions:
        stream.extensions[ex.name] = {xes_constants.KEY_PREFIX: ex.prefix,
                                      xes_constants.KEY_URI: ex.uri}
    return stream


def __transform_dataframe_to_event_stream_new(dataframe, stream_post_processing=False, compress=False):
    """
    Transforms a dataframe to an event stream

    Parameters
    ------------------
    dataframe
        Pandas dataframe
    stream_post_processing
        Boolean value that enables the post-processing to remove NaN/NaT values
    compress
        Compresses the stream in order to reduce the memory utilization after the conversion

    Returns
    ------------------
    stream
        Event stream
    """
    extensions = __detect_extensions(dataframe)
    columns_names = list(dataframe.columns)
    columns_corr = []
    for c in columns_names:
        columns_corr.append(dataframe[c].to_numpy())
    length = columns_corr[-1].size
    list_events = []
    for i in range(length):
        eve = {}
        for j in range(len(columns_names)):
            eve[columns_names[j]] = columns_corr[j][i]
        list_events.append(eve)
    if stream_post_processing:
        list_events = __postprocess_stream(list_events)
    if compress:
        list_events = __compress(list_events)
    for i in range(len(list_events)):
        list_events[i] = Event(list_events[i])
    if hasattr(dataframe, 'attrs'):
        properties = copy(dataframe.attrs)
        if pmutil.PARAMETER_CONSTANT_CASEID_KEY in properties:
            del properties[pmutil.PARAMETER_CONSTANT_CASEID_KEY]
    else:
        properties = {}
    stream = log_instance.EventStream(list_events, attributes={'origin': 'csv'}, properties=properties)
    for ex in extensions:
        stream.extensions[ex.name] = {xes_constants.KEY_PREFIX: ex.prefix,
                                      xes_constants.KEY_URI: ex.uri}
    return stream


def __transform_event_log_to_event_stream(log, include_case_attributes=True,
                                          case_attribute_prefix=pmutil.CASE_ATTRIBUTE_PREFIX,
                                          enable_deepcopy=False):
    """
    Converts the event log to an event stream

    Parameters
    ----------
    log: :class:`pm4py.log.log.EventLog`
        An event log
    include_case_attributes
        Default is True
    case_attribute_prefix
        Default is 'case:'
    enable_deepcopy
        Enables deepcopy (avoids references between input and output objects)

    Returns
    -------
    event_stream : :class:`pm4py.log.log.EventStream`
        An event stream
    """
    event_stream = log_instance.EventStream([], attributes=log.attributes, classifiers=log.classifiers,
                                            omni_present=log.omni_present, extensions=log.extensions,
                                            properties=log.properties)
    for index, trace in enumerate(log):
        for event in trace:
            new_event = deepcopy(event) if enable_deepcopy else event
            if include_case_attributes:
                for key, value in trace.attributes.items():
                    new_event[case_attribute_prefix + key] = value
            else:
                new_event[pmutil.CASE_ATTRIBUTE_GLUE] = str(index)
            event_stream.append(new_event)
    return event_stream
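

# Hedged sketch (illustrative, not part of the original module): builds a
# one-trace EventLog by hand and converts it, showing how trace attributes
# are copied onto each event under the case attribute prefix. Trace is
# imported locally since the module header does not import it.
def _example_event_log_conversion():
    from pm4py.objects.log.obj import Trace
    trace = Trace([Event({"concept:name": "register request"}),
                   Event({"concept:name": "examine casually"})])
    trace.attributes["concept:name"] = "1"
    stream = apply(EventLog([trace]))
    # with the defaults, each event carries the trace attribute as "case:concept:name"
    assert stream[0]["case:concept:name"] == "1"
    return stream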