Source code for pm4py.algo.simulation.playout.petri_net.variants.extensive

'''
    This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

    PM4Py is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    PM4Py is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PM4Py.  If not, see <https://www.gnu.org/licenses/>.
'''
import datetime
import sys
from collections import Counter
from enum import Enum
from typing import Optional, Dict, Any, Union

from pm4py.objects import petri_net
from pm4py.objects.log import obj as log_instance
from pm4py.objects.log.obj import EventLog
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.util import constants
from pm4py.util import exec_utils
from pm4py.util import xes_constants


[docs]class Parameters(Enum): ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY MAX_TRACE_LENGTH = "maxTraceLength" RETURN_ELEMENTS = "return_elements" MAX_MARKING_OCC = "max_marking_occ" PETRI_SEMANTICS = "petri_semantics"
POSITION_MARKING = 0 POSITION_TRACE = 1 POSITION_ELEMENTS = 2
[docs]def apply(net: PetriNet, initial_marking: Marking, final_marking: Marking = None, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> EventLog: """ Do the playout of a Petrinet generating a log (extensive search; stop at the maximum trace length specified Parameters ----------- net Petri net to play-out initial_marking Initial marking of the Petri net final_marking If provided, the final marking of the Petri net parameters Parameters of the algorithm: Parameters.MAX_TRACE_LENGTH -> Maximum trace length Parameters.PETRI_SEMANTICS -> Petri net semantics """ if parameters is None: parameters = {} case_id_key = exec_utils.get_param_value(Parameters.CASE_ID_KEY, parameters, xes_constants.DEFAULT_TRACEID_KEY) activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY) timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters, xes_constants.DEFAULT_TIMESTAMP_KEY) max_trace_length = exec_utils.get_param_value(Parameters.MAX_TRACE_LENGTH, parameters, 10) return_elements = exec_utils.get_param_value(Parameters.RETURN_ELEMENTS, parameters, False) max_marking_occ = exec_utils.get_param_value(Parameters.MAX_MARKING_OCC, parameters, sys.maxsize) semantics = exec_utils.get_param_value(Parameters.PETRI_SEMANTICS, parameters, petri_net.semantics.ClassicSemantics()) # assigns to each event an increased timestamp from 1970 curr_timestamp = 10000000 feasible_elements = [] to_visit = [(initial_marking, (), ())] visited = set() while len(to_visit) > 0: state = to_visit.pop(0) m = state[POSITION_MARKING] trace = state[POSITION_TRACE] elements = state[POSITION_ELEMENTS] if (m, trace) in visited: continue visited.add((m, trace)) en_t = semantics.enabled_transitions(net, m) if (final_marking is not None and m == final_marking) or (final_marking is None and len(en_t) == 0): if len(trace) <= max_trace_length: feasible_elements.append(elements) for t in en_t: new_elements = elements + (m,) new_elements = new_elements + (t,) counter_elements = Counter(new_elements) if counter_elements[m] > max_marking_occ: continue new_m = semantics.weak_execute(t, net, m) if t.label is not None: new_trace = trace + (t.label,) else: new_trace = trace new_state = (new_m, new_trace, new_elements) if new_state in visited or len(new_trace) > max_trace_length: continue to_visit.append(new_state) if return_elements: return feasible_elements log = log_instance.EventLog() for elements in feasible_elements: log_trace = log_instance.Trace() log_trace.attributes[case_id_key] = str(len(log)) activities = [x.label for x in elements if type(x) is PetriNet.Transition and x.label is not None] for act in activities: curr_timestamp = curr_timestamp + 1 log_trace.append( log_instance.Event({activity_key: act, timestamp_key: datetime.datetime.fromtimestamp(curr_timestamp)})) log.append(log_trace) return log