Source code for pm4py.objects.log.util.dataframe_utils

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
from enum import Enum
from typing import Optional, Dict, Any, List

import pandas as pd

from pm4py.objects.conversion.log import converter as log_converter
from pm4py.objects.log.obj import EventStream
from pm4py.util import constants
from pm4py.util import exec_utils
from pm4py.util import points_subset
from pm4py.util import xes_constants, pandas_utils


LEGACY_PARQUET_TP_REPLACER = "AAA"
LEGACY_PARQUET_CASECONCEPTNAME = "caseAAAconceptAAAname"


class Parameters(Enum):
    PARTITION_COLUMN = "partition_column"
    CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY
    CASE_PREFIX = constants.CASE_ATTRIBUTE_PREFIX
    CASE_ATTRIBUTES = "case_attributes"
    MANDATORY_ATTRIBUTES = "mandatory_attributes"
    MAX_NO_CASES = "max_no_cases"
    MIN_DIFFERENT_OCC_STR_ATTR = 5
    MAX_DIFFERENT_OCC_STR_ATTR = 50
    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    PARAM_ARTIFICIAL_START_ACTIVITY = constants.PARAM_ARTIFICIAL_START_ACTIVITY
    PARAM_ARTIFICIAL_END_ACTIVITY = constants.PARAM_ARTIFICIAL_END_ACTIVITY
    INDEX_KEY = "index_key"
    CASE_INDEX_KEY = "case_index_key"
    USE_EXTREMES_TIMESTAMP = "use_extremes_timestamp"
def insert_partitioning(df, num_partitions, parameters=None):
    """
    Inserts the partitioning in the specified dataframe

    Parameters
    -------------
    df
        Dataframe
    num_partitions
        Number of partitions
    parameters
        Parameters of the algorithm

    Returns
    -------------
    df
        Partitioned dataframe
    """
    if parameters is None:
        parameters = {}

    case_index_key = exec_utils.get_param_value(Parameters.CASE_INDEX_KEY, parameters, constants.DEFAULT_CASE_INDEX_KEY)
    partition_column = exec_utils.get_param_value(Parameters.PARTITION_COLUMN, parameters, "@@partitioning")

    if case_index_key not in df.columns:
        from pm4py.util import pandas_utils
        df = pandas_utils.insert_case_index(df, case_index_key)

    df[partition_column] = df[case_index_key] % num_partitions

    return df
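
# Example usage (a minimal sketch; the dataframe and its column names are
# invented and follow the pm4py defaults):
#
#     import pandas as pd
#
#     df = pd.DataFrame({
#         "case:concept:name": ["c1", "c1", "c2", "c2", "c3"],
#         "concept:name": ["A", "B", "A", "B", "A"],
#     })
#     df = insert_partitioning(df, num_partitions=2)
#     # df["@@partitioning"] holds, for each row, the case index modulo 2,
#     # i.e. cases c1 and c3 land in partition 0 and c2 in partition 1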
def legacy_parquet_support(df, parameters=None):
    """
    For legacy support: Parquet file columns could not contain a ":",
    which was arbitrarily replaced by a replacer string. This function
    substitutes the ":" back for the replacer string.

    Parameters
    ---------------
    df
        Dataframe
    parameters
        Parameters of the algorithm
    """
    if parameters is None:
        parameters = {}

    df.columns = [x.replace(LEGACY_PARQUET_TP_REPLACER, ":") for x in df.columns]

    return df
def table_to_stream(table, parameters=None):
    """
    Converts a Pyarrow table to an event stream

    Parameters
    ------------
    table
        Pyarrow table
    parameters
        Possible parameters of the algorithm
    """
    if parameters is None:
        parameters = {}

    dict0 = table.to_pydict()
    keys = list(dict0.keys())
    # for legacy format support
    if LEGACY_PARQUET_CASECONCEPTNAME in keys:
        for key in keys:
            dict0[key.replace(LEGACY_PARQUET_TP_REPLACER, ":")] = dict0.pop(key)

    stream = EventStream([dict(zip(dict0, i)) for i in zip(*dict0.values())])

    return stream
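
# Example usage (a minimal sketch, assuming pyarrow is installed; the table
# contents are invented):
#
#     import pyarrow as pa
#
#     table = pa.Table.from_pydict({
#         "case:concept:name": ["c1", "c1"],
#         "concept:name": ["A", "B"],
#     })
#     stream = table_to_stream(table)
#     # stream is an EventStream with one dict per row, e.g.
#     # {"case:concept:name": "c1", "concept:name": "A"}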
def table_to_log(table, parameters=None):
    """
    Converts a Pyarrow table to an event log

    Parameters
    ------------
    table
        Pyarrow table
    parameters
        Possible parameters of the algorithm
    """
    if parameters is None:
        parameters = {}

    stream = table_to_stream(table, parameters=parameters)

    return log_converter.apply(stream, parameters=parameters)
def convert_timestamp_columns_in_df(df, timest_format=None, timest_columns=None):
    """
    Converts the timestamp columns of a dataframe to datetime

    Parameters
    -----------
    df
        Dataframe
    timest_format
        (If provided) Format of the timestamp columns in the CSV file
    timest_columns
        Columns of the CSV that shall be converted into timestamps

    Returns
    ------------
    df
        Dataframe with timestamp columns converted
    """
    for col in df.columns:
        if timest_columns is None or col in timest_columns:
            if "obj" in str(df[col].dtype) or "str" in str(df[col].dtype):
                try:
                    if timest_format is None:
                        # makes operations faster if non-ISO8601, but anyhow regular, dates are provided
                        df[col] = pd.to_datetime(df[col], utc=True, infer_datetime_format=True)
                    else:
                        df[col] = pd.to_datetime(df[col], utc=True, format=timest_format)
                except Exception:
                    # the column could not be parsed as a timestamp; leave it as-is
                    pass
    return df
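
# Example usage (a minimal sketch; the data is invented): string columns that
# parse as dates become UTC datetime columns, while non-parsable string
# columns (here, the case ID) are left untouched by the except-clause.
#
#     import pandas as pd
#
#     df = pd.DataFrame({
#         "case:concept:name": ["c1", "c1"],
#         "time:timestamp": ["2021-01-01 08:30:00", "2021-01-01 09:00:00"],
#     })
#     df = convert_timestamp_columns_in_df(df, timest_format="%Y-%m-%d %H:%M:%S")
#     # df["time:timestamp"].dtype is now datetime64[ns, UTC]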
def sample_dataframe(df, parameters=None):
    """
    Samples a dataframe on a given number of cases

    Parameters
    --------------
    df
        Dataframe
    parameters
        Parameters of the algorithm, including:
        - Parameters.CASE_ID_KEY
        - Parameters.MAX_NO_CASES

    Returns
    -------------
    sampled_df
        Sampled dataframe
    """
    if parameters is None:
        parameters = {}

    case_id_key = exec_utils.get_param_value(Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME)
    max_no_cases = exec_utils.get_param_value(Parameters.MAX_NO_CASES, parameters, 100)

    case_ids = list(df[case_id_key].unique())
    case_id_to_retain = points_subset.pick_chosen_points_list(min(max_no_cases, len(case_ids)), case_ids)

    return df[df[case_id_key].isin(case_id_to_retain)]
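
# Example usage (a minimal sketch; the data is invented): retain the events
# of at most 2 of the 4 cases.
#
#     import pandas as pd
#
#     df = pd.DataFrame({
#         "case:concept:name": ["c1", "c2", "c3", "c4"],
#         "concept:name": ["A", "A", "A", "A"],
#     })
#     sampled_df = sample_dataframe(df, parameters={Parameters.MAX_NO_CASES: 2})
#     # sampled_df contains all the events of the 2 chosen cases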
def automatic_feature_selection_df(df, parameters=None):
    """
    Performs an automatic feature selection on dataframes,
    keeping the features useful for ML purposes

    Parameters
    ---------------
    df
        Dataframe
    parameters
        Parameters of the algorithm

    Returns
    ---------------
    featured_df
        Dataframe with only the features that have been selected
    """
    if parameters is None:
        parameters = {}

    case_id_key = exec_utils.get_param_value(Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME)
    timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters, xes_constants.DEFAULT_TIMESTAMP_KEY)
    activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY)
    mandatory_attributes = exec_utils.get_param_value(Parameters.MANDATORY_ATTRIBUTES, parameters,
                                                      set(df.columns).intersection(
                                                          {case_id_key, activity_key, timestamp_key}))
    min_different_occ_str_attr = exec_utils.get_param_value(Parameters.MIN_DIFFERENT_OCC_STR_ATTR, parameters, 5)
    max_different_occ_str_attr = exec_utils.get_param_value(Parameters.MAX_DIFFERENT_OCC_STR_ATTR, parameters, 50)

    cols_dtypes = {x: str(df[x].dtype) for x in df.columns}
    other_attributes_to_retain = set()

    no_all_cases = df[case_id_key].nunique()
    for x, y in cols_dtypes.items():
        attr_df = df.dropna(subset=[x])
        this_cases = attr_df[case_id_key].nunique()

        # in any case, keep only attributes that appear at least once in every case
        if this_cases == no_all_cases:
            if "float" in y or "int" in y:
                # (as in the classic log version) always retain float/int attributes
                other_attributes_to_retain.add(x)
            elif "obj" in y or "str" in y:
                # (as in the classic log version) keep string attributes if they have enough variability,
                # but not too much (that would be hard to explain)
                unique_val_count = df[x].nunique()
                if min_different_occ_str_attr <= unique_val_count <= max_different_occ_str_attr:
                    other_attributes_to_retain.add(x)
            else:
                # do not consider the attribute after this feature selection
                # if it has other types (for example, date)
                pass

    attributes_to_retain = mandatory_attributes.union(other_attributes_to_retain)

    return df[list(attributes_to_retain)]
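
# Example usage (a minimal sketch; the data is invented): the mandatory
# case ID, activity and timestamp columns are always kept, and "amount"
# is kept because it is numeric and valued in every case.
#
#     import pandas as pd
#
#     df = pd.DataFrame({
#         "case:concept:name": ["c1", "c1", "c2"],
#         "concept:name": ["A", "B", "A"],
#         "time:timestamp": pd.to_datetime(["2021-01-01", "2021-01-02", "2021-01-03"]),
#         "amount": [100.0, 110.0, 250.0],
#     })
#     fea_sel_df = automatic_feature_selection_df(df)
#     # fea_sel_df keeps all four columns in this example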
def select_number_column(df: pd.DataFrame, fea_df: pd.DataFrame, col: str,
                         case_id_key=constants.CASE_CONCEPT_NAME) -> pd.DataFrame:
    """
    Extracts a column for the features dataframe for the given numeric attribute

    Parameters
    --------------
    df
        Dataframe
    fea_df
        Feature dataframe
    col
        Numeric column
    case_id_key
        Case ID key

    Returns
    --------------
    fea_df
        Feature dataframe (desired output)
    """
    df = df.dropna(subset=[col]).groupby(case_id_key).last().reset_index()[[case_id_key, col]]
    fea_df = fea_df.merge(df, on=[case_id_key], how="left", suffixes=('', '_y'))
    return fea_df
def select_string_column(df: pd.DataFrame, fea_df: pd.DataFrame, col: str,
                         case_id_key=constants.CASE_CONCEPT_NAME) -> pd.DataFrame:
    """
    Extracts N columns (for N different attribute values; one-hot encoding)
    for the features dataframe for the given string attribute

    Parameters
    --------------
    df
        Dataframe
    fea_df
        Feature dataframe
    col
        String column
    case_id_key
        Case ID key

    Returns
    --------------
    fea_df
        Feature dataframe (desired output)
    """
    vals = df[col].unique()
    for val in vals:
        if val is not None:
            filt_df_cases = df[df[col] == val][case_id_key].unique()
            new_col = col + "_" + val.encode('ascii', errors='ignore').decode('ascii').replace(" ", "")
            fea_df[new_col] = fea_df[case_id_key].isin(filt_df_cases)
            fea_df[new_col] = fea_df[new_col].astype("int")
    return fea_df
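
# Example usage (a minimal sketch; the data is invented): a string column
# "channel" with two values leads to two 0/1 columns in the feature dataframe.
#
#     import pandas as pd
#
#     df = pd.DataFrame({
#         "case:concept:name": ["c1", "c2"],
#         "channel": ["web", "phone"],
#     })
#     fea_df = pd.DataFrame({"case:concept:name": ["c1", "c2"]})
#     fea_df = select_string_column(df, fea_df, "channel")
#     # fea_df now has the columns "channel_web" and "channel_phone",
#     # each holding 1 for the cases in which the value occurs, 0 otherwise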
def get_features_df(df: pd.DataFrame, list_columns: List[str],
                    parameters: Optional[Dict[Any, Any]] = None) -> pd.DataFrame:
    """
    Given a dataframe and a list of columns, performs an automatic feature extraction

    Parameters
    ---------------
    df
        Dataframe
    list_columns
        List of columns to consider in the feature extraction
    parameters
        Parameters of the algorithm, including:
        - Parameters.CASE_ID_KEY: the case ID

    Returns
    ---------------
    fea_df
        Feature dataframe (desired output)
    """
    if parameters is None:
        parameters = {}

    case_id_key = exec_utils.get_param_value(Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME)

    fea_df = pd.DataFrame({case_id_key: sorted(list(df[case_id_key].unique()))})
    for col in list_columns:
        if "obj" in str(df[col].dtype) or "str" in str(df[col].dtype):
            fea_df = select_string_column(df, fea_df, col, case_id_key=case_id_key)
        elif "float" in str(df[col].dtype) or "int" in str(df[col].dtype):
            fea_df = select_number_column(df, fea_df, col, case_id_key=case_id_key)
    fea_df = fea_df.sort_values(case_id_key)

    return fea_df
def automatic_feature_extraction_df(df: pd.DataFrame,
                                    parameters: Optional[Dict[Any, Any]] = None) -> pd.DataFrame:
    """
    Performs an automatic feature extraction given a dataframe

    Parameters
    --------------
    df
        Dataframe
    parameters
        Parameters of the algorithm, including:
        - Parameters.CASE_ID_KEY: the case ID
        - Parameters.MIN_DIFFERENT_OCC_STR_ATTR
        - Parameters.MAX_DIFFERENT_OCC_STR_ATTR

    Returns
    --------------
    fea_df
        Dataframe with the features
    """
    if parameters is None:
        parameters = {}

    case_id_key = exec_utils.get_param_value(Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME)
    timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters, xes_constants.DEFAULT_TIMESTAMP_KEY)

    fea_sel_df = automatic_feature_selection_df(df, parameters=parameters)
    columns = set(fea_sel_df.columns)
    if case_id_key in columns:
        columns.remove(case_id_key)
    if timestamp_key in columns:
        columns.remove(timestamp_key)

    return get_features_df(fea_sel_df, list(columns), parameters=parameters)
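
# Example usage (a minimal sketch; the data is invented): the feature
# selection runs first, then the retained string columns are one-hot
# encoded and the last value per case of the retained numeric columns
# is taken, yielding one row per case.
#
#     import pandas as pd
#
#     df = pd.DataFrame({
#         "case:concept:name": ["c1", "c1", "c2"],
#         "concept:name": ["A", "B", "A"],
#         "time:timestamp": pd.to_datetime(["2021-01-01", "2021-01-02", "2021-01-03"]),
#         "amount": [100.0, 110.0, 250.0],
#     })
#     fea_df = automatic_feature_extraction_df(df)
#     # fea_df has one row per case, with the one-hot columns
#     # "concept:name_A" and "concept:name_B" and the last "amount" per case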
def insert_artificial_start_end(df0: pd.DataFrame,
                                parameters: Optional[Dict[Any, Any]] = None) -> pd.DataFrame:
    """
    Inserts the artificial start/end activities in a Pandas dataframe

    Parameters
    ------------------
    df0
        Dataframe
    parameters
        Parameters of the algorithm, including:
        - Parameters.CASE_ID_KEY: the case identifier
        - Parameters.TIMESTAMP_KEY: the timestamp
        - Parameters.ACTIVITY_KEY: the activity

    Returns
    -----------------
    enriched_df
        Dataframe with artificial start/end activities
    """
    if parameters is None:
        parameters = {}

    case_id_key = exec_utils.get_param_value(Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME)
    timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters, xes_constants.DEFAULT_TIMESTAMP_KEY)
    activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY)
    use_extremes_timestamp = exec_utils.get_param_value(Parameters.USE_EXTREMES_TIMESTAMP, parameters, False)

    artificial_start_activity = exec_utils.get_param_value(Parameters.PARAM_ARTIFICIAL_START_ACTIVITY, parameters,
                                                           constants.DEFAULT_ARTIFICIAL_START_ACTIVITY)
    artificial_end_activity = exec_utils.get_param_value(Parameters.PARAM_ARTIFICIAL_END_ACTIVITY, parameters,
                                                         constants.DEFAULT_ARTIFICIAL_END_ACTIVITY)

    index_key = exec_utils.get_param_value(Parameters.INDEX_KEY, parameters, constants.DEFAULT_INDEX_KEY)

    df = df0.copy()
    df = pandas_utils.insert_index(df, index_key)
    df = df.sort_values([case_id_key, timestamp_key, index_key])

    start_df = df[[case_id_key, timestamp_key]].groupby(case_id_key).first().reset_index()
    end_df = df[[case_id_key, timestamp_key]].groupby(case_id_key).last().reset_index()

    if use_extremes_timestamp:
        start_df[timestamp_key] = pd.Timestamp.min
        end_df[timestamp_key] = pd.Timestamp.max
        start_df[timestamp_key] = start_df[timestamp_key].dt.tz_localize("utc")
        end_df[timestamp_key] = end_df[timestamp_key].dt.tz_localize("utc")
    else:
        # stability trick: subtract 1 ms from the artificial start activity timestamp,
        # add 1 ms to the artificial end activity timestamp
        start_df[timestamp_key] = start_df[timestamp_key] - pd.Timedelta("1 ms")
        end_df[timestamp_key] = end_df[timestamp_key] + pd.Timedelta("1 ms")

    start_df[activity_key] = artificial_start_activity
    end_df[activity_key] = artificial_end_activity

    df = pd.concat([start_df, df, end_df])
    df = pandas_utils.insert_index(df, index_key)
    df = df.sort_values([case_id_key, timestamp_key, index_key])

    df.attrs = df0.attrs

    return df
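
# Example usage (a minimal sketch; the data is invented): each case gets one
# extra row before its first event and one after its last event.
#
#     import pandas as pd
#
#     df = pd.DataFrame({
#         "case:concept:name": ["c1", "c1"],
#         "concept:name": ["A", "B"],
#         "time:timestamp": pd.to_datetime(
#             ["2021-01-01 08:00:00", "2021-01-01 09:00:00"], utc=True),
#     })
#     enriched_df = insert_artificial_start_end(df)
#     # enriched_df has four rows for case c1: the artificial start activity
#     # (1 ms before "A"), "A", "B", and the artificial end activity
#     # (1 ms after "B")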
def dataframe_to_activity_case_table(df: pd.DataFrame, parameters: Optional[Dict[Any, Any]] = None):
    """
    Transforms a Pandas dataframe into:
    - an "activity" table, containing the events and their attributes
    - a "case" table, containing the cases and their attributes

    Parameters
    --------------
    df
        Dataframe
    parameters
        Parameters of the algorithm that should be used, including:
        - Parameters.CASE_ID_KEY => the column to be used as case ID (shall be included both in the activity table and the case table)
        - Parameters.CASE_PREFIX => if a list of case-level attributes is not provided, all the columns of the dataframe starting with this prefix are considered case attributes
        - Parameters.CASE_ATTRIBUTES => the attributes of the dataframe to be used as case columns

    Returns
    ---------------
    activity_table
        Activity table
    case_table
        Case table
    """
    if parameters is None:
        parameters = {}

    # make sure we start from a dataframe object
    df = log_converter.apply(df, variant=log_converter.Variants.TO_DATA_FRAME, parameters=parameters)

    case_id_key = exec_utils.get_param_value(Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME)
    case_id_prefix = exec_utils.get_param_value(Parameters.CASE_PREFIX, parameters, constants.CASE_ATTRIBUTE_PREFIX)
    case_attributes = exec_utils.get_param_value(Parameters.CASE_ATTRIBUTES, parameters,
                                                 set(x for x in df.columns if x.startswith(case_id_prefix)))

    event_attributes = set(x for x in df.columns if x not in case_attributes)

    activity_table = df[list(event_attributes.union({case_id_key}))]
    case_table = df[list(case_attributes.union({case_id_key}))].groupby(case_id_key).first().reset_index()

    return activity_table, case_table
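
# Example usage (a minimal sketch; the data is invented): with the default
# "case:" prefix, the prefixed columns land in the case table and the
# remaining ones in the activity table; the case ID appears in both.
#
#     import pandas as pd
#
#     df = pd.DataFrame({
#         "case:concept:name": ["c1", "c1", "c2"],
#         "case:customer": ["X", "X", "Y"],
#         "concept:name": ["A", "B", "A"],
#     })
#     activity_table, case_table = dataframe_to_activity_case_table(df)
#     # activity_table: "case:concept:name" and "concept:name", one row per event
#     # case_table: "case:concept:name" and "case:customer", one row per case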