Source code for pm4py.streaming.algo.conformance.footprints.variants.classic

from pm4py.util import constants, exec_utils, xes_constants
from pm4py.streaming.util.dictio import generator
from pm4py.streaming.algo.interface import StreamingAlgorithm
import logging
from copy import copy
from threading import Lock


class Parameters:
    """
    Keys accepted in the ``parameters`` dictionary of the footprints
    streaming conformance checker.
    """

    # Type of dictionary used to store the replay state
    DICT_VARIANT = "dict_variant"
    # Identifier handed to the dictionary generator for one specific dictionary
    DICT_ID = "dict_id"
    # Identifier of the dictionary hosting the last activity of each case
    CASE_DICT_ID = "case_dict_id"
    # Identifier of the dictionary hosting the deviations of each case
    DEV_DICT_ID = "dev_dict_id"
    # Event attribute holding the case identifier
    CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY
    # Event attribute holding the activity
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
# Keys read from the footprints dictionary by the conformance checker.
START_ACTIVITIES = "start_activities"  # admissible first activities of a case
END_ACTIVITIES = "end_activities"  # admissible last activities of a case
ACTIVITIES = "activities"  # activities contained in the footprints
SEQUENCE = "sequence"  # pairs of activities in sequence relation
PARALLEL = "parallel"  # pairs of activities in parallel relation
class FootprintsStreamingConformance(StreamingAlgorithm):
    """
    Streaming conformance checker that replays events against footprints.

    For every case the checker stores the last accepted activity
    (``case_dict``) and the number of deviations observed so far
    (``dev_dict``).
    """

    def __init__(self, footprints, parameters=None):
        """
        Initialize the footprints streaming conformance object

        Parameters
        ---------------
        footprints
            Footprints
        parameters
            Parameters of the algorithm
        """
        self.footprints = footprints
        self.case_id_key = exec_utils.get_param_value(Parameters.CASE_ID_KEY, parameters,
                                                      constants.CASE_CONCEPT_NAME)
        self.activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters,
                                                       xes_constants.DEFAULT_NAME_KEY)
        self.start_activities = footprints[START_ACTIVITIES]
        self.end_activities = footprints[END_ACTIVITIES]
        self.activities = footprints[ACTIVITIES]
        # a directly-follows pair is allowed if it is either in sequence or in parallel
        self.all_fps = set(footprints[SEQUENCE]).union(set(footprints[PARALLEL]))
        self.build_dictionaries(parameters=parameters)
        StreamingAlgorithm.__init__(self)

    def build_dictionaries(self, parameters):
        """
        Builds the dictionaries needed to store the information during the replay

        Parameters
        ---------------
        parameters
            Parameters (None is treated as an empty dictionary):
             - Parameters.DICT_VARIANT: type of dictionary to use
             - Parameters.CASE_DICT_ID: identifier of the dictionary hosting the last activity of a case (1)
             - Parameters.DEV_DICT_ID: identifier of the dictionary hosting the deviations (2)
        """
        if parameters is None:
            # the constructor defaults to None; copying None and assigning a
            # key to it would raise a TypeError
            parameters = {}

        dict_variant = exec_utils.get_param_value(Parameters.DICT_VARIANT, parameters,
                                                  generator.Variants.THREAD_SAFE)
        case_dict_id = exec_utils.get_param_value(Parameters.CASE_DICT_ID, parameters, 0)
        dev_dict_id = exec_utils.get_param_value(Parameters.DEV_DICT_ID, parameters, 1)

        # each dictionary gets its own shallow copy so that the caller's
        # parameters are not modified and the two identifiers do not clash
        parameters_case_dict = copy(parameters)
        parameters_case_dict[Parameters.DICT_ID] = case_dict_id
        parameters_dev_dict = copy(parameters)
        parameters_dev_dict[Parameters.DICT_ID] = dev_dict_id

        self.case_dict = generator.apply(variant=dict_variant, parameters=parameters_case_dict)
        self.dev_dict = generator.apply(variant=dict_variant, parameters=parameters_dev_dict)

    def encode_str(self, stru):
        """
        Encodes a string for storage in generic dictionaries
        """
        return str(stru)

    def _increment_deviations(self, case):
        """
        Adds one deviation to the counter of the given case

        Parameters
        ----------------
        case
            Case ID (must already have a counter in the deviations dictionary)
        """
        # NOTE(review): the int() cast is kept from the original code; presumably
        # some dictionary variants hand stored values back as strings - confirm
        self.dev_dict[case] = int(self.dev_dict[case]) + 1

    def _process(self, event):
        """
        Check an event and updates the case dictionary

        Parameters
        ----------------
        event
            Event (dictionary)
        """
        case = event[self.case_id_key] if self.case_id_key in event else None
        activity = event[self.activity_key] if self.activity_key in event else None
        if case is not None and activity is not None:
            self.verify_footprints(self.encode_str(case), self.encode_str(activity))
        else:
            self.message_case_or_activity_not_in_event(event)

    def verify_footprints(self, case, activity):
        """
        Verify the event according to the footprints
        (assuming it has a case and an activity)

        Parameters
        ----------------
        case
            Case ID
        activity
            Activity
        """
        case_already_started = case in self.case_dict.keys()

        # Create the counter only when it does not exist yet. A case whose first
        # events carry unknown activities has a counter but no last activity;
        # resetting the counter in that situation would erase those deviations.
        if case not in self.dev_dict.keys():
            self.dev_dict[case] = 0

        if activity in self.activities:
            if case_already_started:
                self.verify_intra_case(case, activity)
            else:
                self.verify_start_case(case, activity)
            self.case_dict[case] = activity
        else:
            # unknown activity: count the deviation, keep the last activity as it is
            self._increment_deviations(case)
            self.message_activity_not_possible(activity, case)

    def verify_intra_case(self, case, activity):
        """
        Verify the footprints of the current event

        Parameters
        ----------------
        case
            Case
        activity
            Activity
        """
        prev = self.case_dict[case]
        df = (prev, activity)
        if df not in self.all_fps:
            self._increment_deviations(case)
            self.message_footprints_not_possible(df, case)

    def verify_start_case(self, case, activity):
        """
        Verify the start activity of a case

        Parameters
        ---------------
        case
            Case
        activity
            Activity
        """
        if activity not in self.start_activities:
            self._increment_deviations(case)
            self.message_start_activity_not_possible(activity, case)

    def get_status(self, case):
        """
        Gets the current status of a case

        Parameters
        -----------------
        case
            Case

        Returns
        -----------------
        boolean
            Boolean value (True if there are no deviations);
            None (after logging an error) if the case is not in the case dictionary
        """
        if case in self.case_dict.keys():
            return int(self.dev_dict[case]) == 0

        self.message_case_not_in_dictionary(case)
        return None

    def terminate(self, case):
        """
        Terminate a case (checking its end activity)

        The case is removed from both dictionaries.

        Parameters
        -----------------
        case
            Case

        Returns
        -----------------
        boolean
            Boolean value (True if there are no deviations);
            None (after logging an error) if the case is not in the case dictionary
        """
        if case in self.case_dict.keys():
            curr = self.case_dict[case]
            if curr not in self.end_activities:
                self.message_end_activity_not_possible(curr, case)
                self._increment_deviations(case)
            num_dev = int(self.dev_dict[case])
            del self.case_dict[case]
            del self.dev_dict[case]
            return num_dev == 0

        self.message_case_not_in_dictionary(case)
        return None

    def terminate_all(self):
        """
        Terminate all cases
        """
        # materialize the keys first: terminate() deletes entries while we iterate
        cases = list(self.case_dict.keys())
        for case in cases:
            self.terminate(case)

    def message_case_or_activity_not_in_event(self, event):
        """
        Sends a message if the case or the activity are not
        there in the event
        """
        logging.error("case or activities are none! " + str(event))

    def message_activity_not_possible(self, activity, case):
        """
        Sends a message if the activity is not contained in the footprints

        Parameters
        --------------
        activity
            Activity
        case
            Case
        """
        logging.error(
            "the activity " + str(activity) + " is not possible according to the footprints! case: " + str(case))

    def message_footprints_not_possible(self, df, case):
        """
        Sends a message if the directly-follows between two activities is
        not possible

        Parameters
        ---------------
        df
            Directly-follows relations
        case
            Case
        """
        logging.error("the footprints " + str(df) + " are not possible! case: " + str(case))

    def message_start_activity_not_possible(self, activity, case):
        """
        Sends a message if the activity is not a possible start activity

        Parameters
        ---------------
        activity
            Activity
        case
            Case
        """
        logging.error("the activity " + str(activity) + " is not a possible start activity! case: " + str(case))

    def message_end_activity_not_possible(self, activity, case):
        """
        Sends a message if the activity is not a possible end activity

        Parameters
        ----------------
        activity
            Activity
        case
            Case
        """
        logging.error("the activity " + str(activity) + " is not a possible end activity! case: " + str(case))

    def message_case_not_in_dictionary(self, case):
        """
        Sends a message if the case is not in the current dictionary

        Parameters
        ----------------
        case
            Case
        """
        logging.error("the case " + str(case) + " is not in the dictionary! case: " + str(case))

    def _current_result(self):
        """
        Gets a diagnostics dataframe with the status of the cases

        Returns
        -------
        diagn_df
            Diagnostics dataframe (columns "case" and "is_fit")
        """
        # imported here so that pandas is only needed when the result is requested
        import pandas as pd

        diagn_stream = [{"case": case, "is_fit": self.get_status(case)}
                        for case in list(self.case_dict.keys())]
        return pd.DataFrame(diagn_stream)
def apply(footprints, parameters=None):
    """
    Builds the object that checks a stream of events against the footprints

    Parameters
    --------------
    footprints
        Footprints object
    parameters
        Parameters of the algorithm (an empty dictionary is used when None)

    Returns
    --------------
    fp_check_obj
        Footprints conformance checking object
    """
    effective_parameters = {} if parameters is None else parameters
    return FootprintsStreamingConformance(footprints, parameters=effective_parameters)