Source code for pm4py.algo.clustering.trace_attribute_driven.variants.suc_dist_calc

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
import pandas as pd
import numpy as np
from pm4py.algo.clustering.trace_attribute_driven.util import filter_subsets
from scipy.spatial.distance import pdist
from collections import Counter
from pm4py.util import exec_utils, pandas_utils, constants
from enum import Enum
from pm4py.util import constants


[docs]class Parameters(Enum): ATTRIBUTE_KEY = constants.PARAMETER_CONSTANT_ATTRIBUTE_KEY ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY SINGLE = "single" BINARIZE = "binarize" POSITIVE = "positive" LOWER_PERCENT = "lower_percent"
[docs]def occu_suc(dfg, filter_percent): ''' :param dfg: a counter containing all the direct succession relationship with frequency :param filter_percent: clarify the percentage of direct succession one wants to preserve :return: dataframe of direct succession relationship with frequency ''' df = pd.DataFrame.from_dict(dict(dfg), orient='index', columns=['freq']) df = df.sort_values(axis=0, by=['freq'], ascending=False) df = df.reset_index().rename(columns={'index': 'suc'}) # delete duplicated successions df = df.drop_duplicates('suc', keep='first') # delete self succession # filter out direct succession by percentage filter = list(range(0, round(filter_percent * len(df)))) df = pandas_utils.insert_index(df) df = df[df[constants.DEFAULT_INDEX_KEY].isin(filter)].reset_index(drop=True) return df
[docs]def occu_var_suc(var_list, parameters=None): ''' return dataframe that shows the frequency of each element(direct succession) in each variant list :param var_list: :param parameters: binarize states if user wants to binarize the frequency, default is binarized :return: ''' if parameters is None: parameters = {} binarize = exec_utils.get_param_value(Parameters.BINARIZE, parameters, True) comb_list = [var_list[i] + constants.DEFAULT_VARIANT_SEP + var_list[i + 1] for i in range(len(var_list) - 1)] result = Counter(comb_list) # count number of occurrence of each element df = pd.DataFrame.from_dict(dict(result), orient='index', columns=['freq']) df = df.reset_index().rename(columns={'index': 'direct_suc'}) if (binarize): # Binarize succession frequency (optional) df.loc[df.freq > 1, 'freq'] = 1 return df else: return df
[docs]def suc_sim(var_list_1, var_list_2, log1, log2, freq_thres, num, parameters=None): ''' this function compare the activity similarity between two sublogs via the two lists of variants. :param var_list_1: lists of variants in sublog 1 :param var_list_2: lists of variants in sublog 2 :param freq_thres: same as sublog2df() :param log1: input sublog1 of sublog2df(), which must correspond to var_list_1 :param log2: input sublog2 of sublog2df(), which must correspond to var_list_2 :return: the distance matrix between 2 sublogs in which each element is the distance between two variants. ''' if parameters is None: parameters = {} single = exec_utils.get_param_value(Parameters.SINGLE, parameters, False) if len(var_list_1) >= len(var_list_2): max_len = len(var_list_1) min_len = len(var_list_2) max_var = var_list_1 min_var = var_list_2 var_count_max = filter_subsets.sublog2df(log1, freq_thres, num)['count'] var_count_min = filter_subsets.sublog2df(log2, freq_thres, num)['count'] else: max_len = len(var_list_2) min_len = len(var_list_1) max_var = var_list_2 min_var = var_list_1 var_count_max = filter_subsets.sublog2df(log2, freq_thres, num)['count'] var_count_min = filter_subsets.sublog2df(log1, freq_thres, num)['count'] dist_matrix = np.zeros((max_len, min_len)) max_per_var = np.zeros(max_len) max_freq = np.zeros(max_len) col_sum = np.zeros(max_len) if var_list_1 == var_list_2: print("Please give different variant lists!") else: for i in range(max_len): dist_vec = np.zeros(min_len) df_1 = occu_var_suc(max_var[i], parameters={"binarize": False}) for j in range(min_len): df_2 = occu_var_suc(min_var[j], parameters={"binarize": False}) df = pd.merge(df_1, df_2, how='outer', on='direct_suc').fillna(0) # cosine similarity is used to calculate trace similarity dist_vec[j] = (pdist(np.array([df['freq_x'].values, df['freq_y'].values]), 'cosine')[0]) dist_matrix[i][j] = dist_vec[j] if (single): if abs(dist_vec[j]) <= 1e-8: max_freq[i] = var_count_max.iloc[i] * var_count_min.iloc[j] max_per_var[i] = dist_vec[j] * max_freq[i] break elif j == (min_len - 1): max_loc_col = np.argmin(dist_vec) # location of max value max_freq[i] = var_count_max.iloc[i] * var_count_min.iloc[max_loc_col] max_per_var[i] = dist_vec[max_loc_col] * max_freq[i] else: col_sum[i] += dist_vec[j] * var_count_max.iloc[i] * var_count_min.iloc[j] if (single): # single linkage dist = np.sum(max_per_var) / np.sum(max_freq) else: vmax_vec = (var_count_max.values).reshape(-1, 1) vmin_vec = (var_count_min.values).reshape(1, -1) vec_sum = np.sum(np.dot(vmax_vec, vmin_vec)) dist = np.sum(col_sum) / vec_sum return dist
[docs]def suc_sim_dual(var_list_1, var_list_2, log1, log2, freq_thres, num, parameters=None): ''' this function compare the activity similarity between two sublogs via the two lists of variants. :param var_list_1: lists of variants in sublog 1 :param var_list_2: lists of variants in sublog 2 :param freq_thres: same as sublog2df() :param log1: input sublog1 of sublog2df(), which must correspond to var_list_1 :param log2: input sublog2 of sublog2df(), which must correspond to var_list_2 :return: the distance matrix between 2 sublogs in which each element is the distance between two variants. ''' if parameters is None: parameters = {} single = exec_utils.get_param_value(Parameters.SINGLE, parameters, False) if len(var_list_1) >= len(var_list_2): max_len = len(var_list_1) min_len = len(var_list_2) max_var = var_list_1 min_var = var_list_2 var_count_max = filter_subsets.sublog2df(log1, freq_thres, num)['count'] var_count_min = filter_subsets.sublog2df(log2, freq_thres, num)['count'] else: max_len = len(var_list_2) min_len = len(var_list_1) max_var = var_list_2 min_var = var_list_1 var_count_max = filter_subsets.sublog2df(log2, freq_thres, num)['count'] var_count_min = filter_subsets.sublog2df(log1, freq_thres, num)['count'] dist_matrix = np.zeros((max_len, min_len)) max_per_var = np.zeros(max_len) max_freq = np.zeros(max_len) min_freq = np.zeros(min_len) min_per_var = np.zeros(min_len) col_sum = np.zeros(max_len) index_rec = set(list(range(min_len))) if var_list_1 == var_list_2: print("Please give different variant lists!") else: for i in range(max_len): dist_vec = np.zeros(min_len) df_1 = occu_var_suc(max_var[i], parameters={"binarize": False}) for j in range(min_len): df_2 = occu_var_suc(min_var[j], parameters={"binarize": False}) df = pd.merge(df_1, df_2, how='outer', on='direct_suc').fillna(0) # cosine similarity is used to calculate trace similarity dist_vec[j] = (pdist(np.array([df['freq_x'].values, df['freq_y'].values]), 'cosine')[0]) dist_matrix[i][j] = dist_vec[j] if j == (min_len - 1): max_loc_col = np.argmin(dist_vec) if abs(dist_vec[max_loc_col]) <= 1e-8: index_rec.discard(max_loc_col) max_freq[i] = var_count_max.iloc[i] * var_count_min.iloc[max_loc_col] * 2 max_per_var[i] = dist_vec[max_loc_col] * max_freq[i] * 2 else: max_freq[i] = var_count_max.iloc[i] * var_count_min.iloc[max_loc_col] max_per_var[i] = dist_vec[max_loc_col] * max_freq[i] if (len(index_rec) != 0): for i in list(index_rec): min_loc_row = np.argmin(dist_matrix[:, i]) min_freq[i] = var_count_max.iloc[min_loc_row] * var_count_min.iloc[i] min_per_var[i] = dist_matrix[min_loc_row, i] * min_freq[i] if (single): # single linkage dist = np.sum(max_per_var) / np.sum(max_freq) else: vmax_vec = (var_count_max.values).reshape(-1, 1) vmin_vec = (var_count_min.values).reshape(1, -1) vec_sum = np.sum(np.dot(vmax_vec, vmin_vec)) dist = np.sum(col_sum) / vec_sum return dist
[docs]def suc_sim_percent(log1, log2, percent_1, percent_2): ''' this function compare the activity similarity between two sublogs via the two lists of variants. :param var_list_1: lists of variants in sublog 1 :param var_list_2: lists of variants in sublog 2 :param freq_thres: same as sublog2df() :param log1: input sublog1 of sublog2df(), which must correspond to var_list_1 :param log2: input sublog2 of sublog2df(), which must correspond to var_list_2 :return: the distance matrix between 2 sublogs in which each element is the distance between two variants. ''' (dataframe_1, var_list_1) = filter_subsets.sublog_percent(log1, percent_1) (dataframe_2, var_list_2) = filter_subsets.sublog_percent(log2, percent_2) if len(var_list_1) >= len(var_list_2): max_len = len(var_list_1) min_len = len(var_list_2) max_var = var_list_1 min_var = var_list_2 var_count_max = dataframe_1['count'] var_count_min = dataframe_2['count'] else: max_len = len(var_list_2) min_len = len(var_list_1) max_var = var_list_2 min_var = var_list_1 var_count_max = dataframe_2['count'] var_count_min = dataframe_1['count'] dist_matrix = np.zeros((max_len, min_len)) max_per_var = np.zeros(max_len) max_freq = np.zeros(max_len) min_freq = np.zeros(min_len) min_per_var = np.zeros(min_len) col_sum = np.zeros(max_len) index_rec = set(list(range(min_len))) if var_list_1 == var_list_2: dist = 0 else: for i in range(max_len): dist_vec = np.zeros(min_len) df_1 = occu_var_suc(max_var[i], parameters={"binarize": False}) for j in range(min_len): df_2 = occu_var_suc(min_var[j], parameters={"binarize": False}) df = pd.merge(df_1, df_2, how='outer', on='direct_suc').fillna(0) # cosine similarity is used to calculate trace similarity dist_vec[j] = (pdist(np.array([df['freq_x'].values, df['freq_y'].values]), 'cosine')[0]) if (np.isnan(dist_vec[j]) == True): dist_vec[j] = 1 dist_matrix[i][j] = dist_vec[j] if j == (min_len - 1): max_loc_col = np.argmin(dist_vec) if abs(dist_vec[max_loc_col]) <= 1e-8: index_rec.discard(max_loc_col) max_freq[i] = var_count_max.iloc[i] * var_count_min.iloc[max_loc_col] * 2 max_per_var[i] = dist_vec[max_loc_col] * max_freq[i] * 2 else: max_freq[i] = var_count_max.iloc[i] * var_count_min.iloc[max_loc_col] max_per_var[i] = dist_vec[max_loc_col] * max_freq[i] if (len(index_rec) != 0): for i in list(index_rec): min_loc_row = np.argmin(dist_matrix[:, i]) min_freq[i] = var_count_max.iloc[min_loc_row] * var_count_min.iloc[i] min_per_var[i] = dist_matrix[min_loc_row, i] * min_freq[i] # single linkage dist = (np.sum(max_per_var) + np.sum(min_per_var)) / (np.sum(max_freq) + np.sum(min_freq)) return dist
[docs]def suc_sim_percent_avg(log1, log2, percent_1, percent_2): ''' this function compare the activity similarity between two sublogs via the two lists of variants. :param var_list_1: lists of variants in sublog 1 :param var_list_2: lists of variants in sublog 2 :param freq_thres: same as sublog2df() :param log1: input sublog1 of sublog2df(), which must correspond to var_list_1 :param log2: input sublog2 of sublog2df(), which must correspond to var_list_2 :return: the distance matrix between 2 sublogs in which each element is the distance between two variants. ''' (dataframe_1, var_list_1) = filter_subsets.sublog_percent(log1, percent_1) (dataframe_2, var_list_2) = filter_subsets.sublog_percent(log2, percent_2) if len(var_list_1) >= len(var_list_2): max_len = len(var_list_1) min_len = len(var_list_2) max_var = var_list_1 min_var = var_list_2 var_count_max = dataframe_1['count'] var_count_min = dataframe_2['count'] else: max_len = len(var_list_2) min_len = len(var_list_1) max_var = var_list_2 min_var = var_list_1 var_count_max = dataframe_2['count'] var_count_min = dataframe_1['count'] dist_matrix = np.zeros((max_len, min_len)) col_sum = np.zeros(max_len) for i in range(max_len): dist_vec = np.zeros(min_len) df_1 = occu_var_suc(max_var[i], parameters={"binarize": False}) for j in range(min_len): df_2 = occu_var_suc(min_var[j], parameters={"binarize": False}) df = pd.merge(df_1, df_2, how='outer', on='direct_suc').fillna(0) # cosine similarity is used to calculate trace similarity dist_vec[j] = (pdist(np.array([df['freq_x'].values, df['freq_y'].values]), 'cosine')[0]) if (np.isnan(dist_vec[j]) == True): dist_vec[j] = 1 dist_matrix[i][j] = dist_vec[j] col_sum[i] += dist_vec[j] * var_count_max.iloc[i] * var_count_min.iloc[j] # single linkage vmax_vec = (var_count_max.values).reshape(-1, 1) vmin_vec = (var_count_min.values).reshape(1, -1) vec_sum = np.sum(np.dot(vmax_vec, vmin_vec)) dist = np.sum(col_sum) / vec_sum return dist