pm4py package#

Process mining for Python

Subpackages#

Submodules#

pm4py.analysis module#

pm4py.analysis.construct_synchronous_product_net(trace: Trace, petri_net: PetriNet, initial_marking: Marking, final_marking: Marking) Tuple[PetriNet, Marking, Marking][source]#

Constructs the synchronous product net between a trace and a Petri net process model.

Parameters:
  • trace (Trace) – trace of an event log

  • petri_net (PetriNet) – petri net

  • initial_marking (Marking) – initial marking

  • final_marking (Marking) – final marking

Return type:

Tuple[PetriNet, Marking, Marking]

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
log = pm4py.read_xes('log.xes')
sync_net, sync_im, sync_fm = pm4py.construct_synchronous_product_net(log[0], net, im, fm)

Deprecated since version 2.3.0: This method will be removed in 3.0.0.

pm4py.analysis.compute_emd(language1: Dict[List[str], float], language2: Dict[List[str], float]) float[source]#

Computes the earth mover distance between two stochastic languages (for example, the first extracted from the log and the second extracted from the process model).

Parameters:
  • language1 – (first) stochastic language

  • language2 – (second) stochastic language

Return type:

float

import pm4py

log = pm4py.read_xes('tests/input_data/running-example.xes')
language_log = pm4py.get_stochastic_language(log)
print(language_log)
net, im, fm = pm4py.read_pnml('tests/input_data/running-example.pnml')
language_model = pm4py.get_stochastic_language(net, im, fm)
print(language_model)
emd_distance = pm4py.compute_emd(language_log, language_model)
print(emd_distance)
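A stochastic language is a mapping from trace variants to their probabilities. A minimal toy illustration of the data structure (hypothetical variants, not actual pm4py output):

```python
# A stochastic language maps trace variants to probabilities.
# Hypothetical variants for illustration only:
language = {
    ('register', 'check', 'pay'): 0.7,
    ('register', 'pay'): 0.3,
}
# The probabilities over all variants should sum to 1.
total = sum(language.values())
assert abs(total - 1.0) < 1e-9
```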
pm4py.analysis.solve_marking_equation(petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, cost_function: Optional[Dict[Transition, float]] = None) float[source]#

Solves the marking equation of a Petri net. The marking equation is solved as an ILP problem. An optional transition-based cost function to minimize can be provided as well.
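The marking equation states that a final marking can only be reached from the initial marking if it equals the initial marking plus the incidence matrix times a non-negative firing vector. A plain-Python sketch on a hypothetical two-place, one-transition net (not pm4py internals):

```python
# Toy net: place p0 --(t)--> place p1.
# Incidence matrix C: rows = places, columns = transitions.
C = [[-1],   # t consumes one token from p0
     [1]]    # t produces one token in p1
m0 = [1, 0]  # initial marking: one token in p0
x = [1]      # firing vector: fire t once
# Marking equation: reached = m0 + C @ x (computed element-wise here)
reached = [m0[p] + sum(C[p][t] * x[t] for t in range(len(x)))
           for p in range(len(m0))]
print(reached)  # [0, 1]
```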

Parameters:
  • petri_net (PetriNet) – petri net

  • initial_marking (Marking) – initial marking

  • final_marking (Marking) – final marking

  • cost_function – optional cost function to use when solving the marking equation

Return type:

float

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
heuristic = pm4py.solve_marking_equation(net, im, fm)
pm4py.analysis.solve_extended_marking_equation(trace: Trace, sync_net: PetriNet, sync_im: Marking, sync_fm: Marking, split_points: Optional[List[int]] = None) float[source]#

Gets a heuristic value (an underestimation of the cost of an alignment) between a trace and a synchronous product net, using the extended marking equation with the standard cost function (i.e. synchronous moves cost 0, invisible moves cost 1, other moves on model / moves on log cost 10000) and an optimal provisioning of the split points.

Parameters:
  • trace (Trace) – trace

  • sync_net (PetriNet) – synchronous product net

  • sync_im (Marking) – initial marking (of the sync net)

  • sync_fm (Marking) – final marking (of the sync net)

  • split_points – if specified, the indexes of the events of the trace to be used as split points. If not specified, the split points are identified automatically.

Return type:

float

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
log = pm4py.read_xes('log.xes')
ext_mark_eq_heu = pm4py.solve_extended_marking_equation(log[0], net, im, fm)

Deprecated since version 2.3.0: This method will be removed in 3.0.0.

pm4py.analysis.check_soundness(petri_net: PetriNet, initial_marking: Marking, final_marking: Marking) bool[source]#

Check if a given Petri net is a sound WF-net. A Petri net is a WF-net iff:

  • it has a unique source place

  • it has a unique sink place

  • every element of the net is on a path from the source place to the sink place

A WF-net is sound iff:
  • it contains no livelocks

  • it contains no deadlocks

  • the final marking can always be reached

For a formal definition of sound WF-net, consider: http://www.padsweb.rwth-aachen.de/wvdaalst/publications/p628.pdf
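The "every element on a path from source to sink" condition amounts to a reachability check in both directions. A sketch on a toy flow relation in plain Python (hypothetical node names, not pm4py's data structures):

```python
from collections import deque

# Toy flow relation of a net as an adjacency dict (hypothetical names).
arcs = {'source': ['t1'], 't1': ['p1'], 'p1': ['t2'], 't2': ['sink'], 'sink': []}

def reachable(start, graph):
    """Return the set of nodes reachable from start via BFS."""
    seen, queue = {start}, deque([start])
    while queue:
        for nxt in graph[queue.popleft()]:
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen

# A node is on a source-to-sink path iff it is reachable from the source
# and the sink is reachable from it (checked on the reversed graph).
reversed_arcs = {n: [] for n in arcs}
for n, succs in arcs.items():
    for s in succs:
        reversed_arcs[s].append(n)
on_path = reachable('source', arcs) & reachable('sink', reversed_arcs)
print(on_path == set(arcs))  # True
```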

Parameters:
  • petri_net (PetriNet) – petri net

  • initial_marking (Marking) – initial marking

  • final_marking (Marking) – final marking

Return type:

bool

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
is_sound = pm4py.check_soundness(net, im, fm)
pm4py.analysis.insert_artificial_start_end(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Inserts the artificial start/end activities in an event log / Pandas dataframe

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

dataframe = pm4py.insert_artificial_start_end(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
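A rough plain-Python sketch of the effect, using hypothetical event rows; the '▶'/'■' labels for the artificial activities are an assumption about pm4py's defaults:

```python
# Hypothetical event rows for a single case.
events = [
    {'case': 'c1', 'activity': 'A', 'ts': 1},
    {'case': 'c1', 'activity': 'B', 'ts': 2},
]
# Assumed default labels for the artificial activities.
START, END = '▶', '■'
cases = {e['case'] for e in events}
augmented = list(events)
for c in cases:
    ts = [e['ts'] for e in events if e['case'] == c]
    # One artificial start before and one artificial end after each case.
    augmented.append({'case': c, 'activity': START, 'ts': min(ts) - 1})
    augmented.append({'case': c, 'activity': END, 'ts': max(ts) + 1})
augmented.sort(key=lambda e: (e['case'], e['ts']))
print([e['activity'] for e in augmented])  # ['▶', 'A', 'B', '■']
```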
pm4py.analysis.check_is_workflow_net(net: PetriNet) bool[source]#

Checks if the input Petri net satisfies the WF-net conditions: 1. unique source place 2. unique sink place 3. every node is on a path from the source to the sink

Parameters:

net (PetriNet) – petri net

Return type:

bool

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
is_wfnet = pm4py.check_is_workflow_net(net)
pm4py.analysis.maximal_decomposition(net: PetriNet, im: Marking, fm: Marking) List[Tuple[PetriNet, Marking, Marking]][source]#

Calculate the maximal decomposition of an accepting Petri net.

Parameters:
  • net (PetriNet) – petri net

  • im (Marking) – initial marking

  • fm (Marking) – final marking

Return type:

List[Tuple[PetriNet, Marking, Marking]]

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
list_nets = pm4py.maximal_decomposition(net, im, fm)
for anet in list_nets:
    subnet, subim, subfm = anet
    pm4py.view_petri_net(subnet, subim, subfm, format='svg')
pm4py.analysis.generate_marking(net: PetriNet, place_or_dct_places: Union[str, Place, Dict[str, int], Dict[Place, int]]) Marking[source]#

Generate a marking for a given Petri net

Parameters:
  • net (PetriNet) – petri net

  • place_or_dct_places – place, or dictionary of places, to be used in the marking. Possible values: a single Place object; the name of a place; a dictionary associating each Place with its number of tokens; a dictionary associating place names with numbers of tokens.

Return type:

Marking

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
marking = pm4py.generate_marking(net, {'source': 2})
pm4py.analysis.reduce_petri_net_invisibles(net: PetriNet) PetriNet[source]#

Reduce the number of invisible transitions in the provided Petri net.

Parameters:

net (PetriNet) – petri net

Return type:

PetriNet

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
net = pm4py.reduce_petri_net_invisibles(net)
pm4py.analysis.reduce_petri_net_implicit_places(net: PetriNet, im: Marking, fm: Marking) Tuple[PetriNet, Marking, Marking][source]#

Reduce the number of implicit places in the provided Petri net.

Parameters:
  • net (PetriNet) – petri net

  • im (Marking) – initial marking

  • fm (Marking) – final marking

Return type:

Tuple[PetriNet, Marking, Marking]

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
net, im, fm = pm4py.reduce_petri_net_implicit_places(net, im, fm)

pm4py.cli module#

This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

PM4Py is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

PM4Py is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with PM4Py. If not, see <https://www.gnu.org/licenses/>.

pm4py.cli.cli_interface()[source]#

pm4py.conformance module#

The pm4py.conformance module contains the conformance checking algorithms implemented in pm4py

pm4py.conformance.conformance_diagnostics_token_based_replay(log: Union[EventLog, DataFrame], petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') List[Dict[str, Any]][source]#

Apply token-based replay for conformance checking analysis. The method returns the full token-based replay diagnostics.

Token-based replay matches a trace and a Petri net model, starting from the initial place, in order to discover which transitions are executed and in which places we have remaining or missing tokens for the given process instance. Token-based replay is useful for Conformance Checking: indeed, a trace is fitting according to the model if, during its execution, the transitions can be fired without the need to insert any missing token. If the reaching of the final marking is imposed, then a trace is fitting if it reaches the final marking without any missing or remaining tokens.

In PM4Py there is an implementation of a token replayer that is able to go across hidden transitions (calculating shortest paths between places) and can be used with any Petri net model with unique visible transitions and hidden transitions. When a visible transition needs to be fired and not all places in the preset are provided with the correct number of tokens, starting from the current marking it is checked if for some place there is a sequence of hidden transitions that could be fired in order to enable the visible transition. The hidden transitions are then fired and a marking that permits to enable the visible transition is reached. The approach is described in: Berti, Alessandro, and Wil MP van der Aalst. “Reviving Token-based Replay: Increasing Speed While Improving Diagnostics.” ATAED@ Petri Nets/ACSD. 2019.

The output of the token-based replay, stored in the variable replayed_traces, contains for each trace of the log:

  • trace_is_fit: boolean value (True/False) that is true when the trace conforms to the model.

  • activated_transitions: list of transitions activated in the model by the token-based replay.

  • reached_marking: marking reached at the end of the replay.

  • missing_tokens: number of missing tokens.

  • consumed_tokens: number of consumed tokens.

  • remaining_tokens: number of remaining tokens.

  • produced_tokens: number of produced tokens.
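The per-trace entries listed above can be aggregated directly. A sketch over a hypothetical diagnostics list with the fields described (illustrative values, not actual pm4py output):

```python
# Hypothetical diagnostics entries with the fields listed above.
tbr_diagnostics = [
    {'trace_is_fit': True, 'missing_tokens': 0, 'remaining_tokens': 0,
     'consumed_tokens': 10, 'produced_tokens': 10},
    {'trace_is_fit': False, 'missing_tokens': 2, 'remaining_tokens': 1,
     'consumed_tokens': 12, 'produced_tokens': 11},
]
# Percentage of traces that replay without missing/remaining tokens.
perc_fit = 100.0 * sum(d['trace_is_fit'] for d in tbr_diagnostics) / len(tbr_diagnostics)
print(perc_fit)  # 50.0
```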

Parameters:
  • log – event log

  • petri_net (PetriNet) – petri net

  • initial_marking (Marking) – initial marking

  • final_marking (Marking) – final marking

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

List[Dict[str, Any]]

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
tbr_diagnostics = pm4py.conformance_diagnostics_token_based_replay(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.conformance.conformance_diagnostics_alignments(log: Union[EventLog, DataFrame], *args, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') List[Dict[str, Any]][source]#

Apply the alignments algorithm between a log and a process model. The method returns the full alignment diagnostics.

Alignment-based replay aims to find one of the best alignments between the trace and the model. For each trace, the output of an alignment is a list of couples where the first element is an event (of the trace) or » and the second element is a transition (of the model) or ». Each couple can be classified as follows:

  • Sync move: the classification of the event corresponds to the transition label; in this case, both the trace and the model advance in the same way during the replay.

  • Move on log: for couples where the second element is », it corresponds to a replay move in the trace that is not mimicked in the model. This kind of move is unfit and signals a deviation between the trace and the model.

  • Move on model: for couples where the first element is », it corresponds to a replay move in the model that is not mimicked in the trace. For moves on model, we can have the following distinction:
    • Moves on model involving hidden transitions: in this case, even if it is not a sync move, the move is fit.

    • Moves on model not involving hidden transitions: in this case, the move is unfit and signals a deviation between the trace and the model.

Each trace is associated with a dictionary containing, among other things, the following information:

  • alignment: the alignment itself (sync moves, moves on log, moves on model).

  • cost: the cost of the alignment according to the provided cost function.

  • fitness: equal to 1 if the trace is perfectly fitting.
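The move classification above can be sketched on a toy alignment in plain Python; the '>>' skip symbol and the use of None for an invisible model transition are assumptions for illustration:

```python
SKIP = '>>'  # assumed skip symbol for non-synchronous moves
# Toy alignment: couples (move on log, move on model); None marks an
# invisible (hidden) transition on the model side.
alignment = [('a', 'a'), ('b', SKIP), (SKIP, None), (SKIP, 'c')]
sync = [c for c in alignment if SKIP not in c]
log_moves = [c for c in alignment if c[1] == SKIP]
model_moves = [c for c in alignment if c[0] == SKIP]
# Moves on model over hidden transitions are still considered fit.
fit_model_moves = [c for c in model_moves if c[1] is None]
print(len(sync), len(log_moves), len(model_moves), len(fit_model_moves))  # 1 1 2 1
```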

Parameters:
  • log – event log

  • args – specification of the process model

  • multi_processing (bool) – boolean value that enables the multiprocessing

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

List[Dict[str, Any]]

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
alignments_diagnostics = pm4py.conformance_diagnostics_alignments(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.conformance.fitness_token_based_replay(log: Union[EventLog, DataFrame], petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, float][source]#

Calculates the fitness using token-based replay. The fitness is calculated at the log level.

Token-based replay matches a trace and a Petri net model, starting from the initial place, in order to discover which transitions are executed and in which places we have remaining or missing tokens for the given process instance. Token-based replay is useful for Conformance Checking: indeed, a trace is fitting according to the model if, during its execution, the transitions can be fired without the need to insert any missing token. If the reaching of the final marking is imposed, then a trace is fitting if it reaches the final marking without any missing or remaining tokens.

In PM4Py there is an implementation of a token replayer that is able to go across hidden transitions (calculating shortest paths between places) and can be used with any Petri net model with unique visible transitions and hidden transitions. When a visible transition needs to be fired and not all places in the preset are provided with the correct number of tokens, starting from the current marking it is checked if for some place there is a sequence of hidden transitions that could be fired in order to enable the visible transition. The hidden transitions are then fired and a marking that permits to enable the visible transition is reached. The approach is described in: Berti, Alessandro, and Wil MP van der Aalst. “Reviving Token-based Replay: Increasing Speed While Improving Diagnostics.” ATAED@ Petri Nets/ACSD. 2019.

The calculation of replay fitness aims to quantify how much of the behavior in the log is admitted by the process model. Two methods to calculate replay fitness are provided, based on token-based replay and alignments respectively.

For token-based replay, the percentage of completely fitting traces is returned, along with a fitness value calculated as indicated in the referenced scientific contribution.

Parameters:
  • log – event log

  • petri_net (PetriNet) – petri net

  • initial_marking (Marking) – initial marking

  • final_marking (Marking) – final marking

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[str, float]

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
fitness_tbr = pm4py.fitness_token_based_replay(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.conformance.fitness_alignments(log: Union[EventLog, DataFrame], petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, float][source]#

Calculates the fitness using alignments

Alignment-based replay aims to find one of the best alignments between the trace and the model. For each trace, the output of an alignment is a list of couples where the first element is an event (of the trace) or » and the second element is a transition (of the model) or ». Each couple can be classified as follows:

  • Sync move: the classification of the event corresponds to the transition label; in this case, both the trace and the model advance in the same way during the replay.

  • Move on log: for couples where the second element is », it corresponds to a replay move in the trace that is not mimicked in the model. This kind of move is unfit and signals a deviation between the trace and the model.

  • Move on model: for couples where the first element is », it corresponds to a replay move in the model that is not mimicked in the trace. For moves on model, we can have the following distinction:
    • Moves on model involving hidden transitions: in this case, even if it is not a sync move, the move is fit.

    • Moves on model not involving hidden transitions: in this case, the move is unfit and signals a deviation between the trace and the model.

The calculation of replay fitness aims to quantify how much of the behavior in the log is admitted by the process model. Two methods to calculate replay fitness are provided, based on token-based replay and alignments respectively.

For alignments, the percentage of traces that are completely fit is returned, along with a fitness value that is calculated as the average of the fitness values of the single traces.
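The aggregation described above can be sketched in plain Python (hypothetical per-trace fitness values):

```python
# Hypothetical per-trace fitness values in [0, 1].
trace_fitness = [1.0, 0.5, 1.0, 0.5]
# Log-level fitness: average of the per-trace fitness values.
log_fitness = sum(trace_fitness) / len(trace_fitness)
# Percentage of traces that are perfectly fitting.
perc_fit_traces = 100.0 * sum(f == 1.0 for f in trace_fitness) / len(trace_fitness)
print(log_fitness, perc_fit_traces)  # 0.75 50.0
```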

Parameters:
  • log – event log

  • petri_net (PetriNet) – petri net

  • initial_marking (Marking) – initial marking

  • final_marking (Marking) – final marking

  • multi_processing (bool) – boolean value that enables the multiprocessing

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[str, float]

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
fitness_alignments = pm4py.fitness_alignments(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.conformance.precision_token_based_replay(log: Union[EventLog, DataFrame], petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') float[source]#

Calculates precision using token-based replay.

Token-based replay matches a trace and a Petri net model, starting from the initial place, in order to discover which transitions are executed and in which places we have remaining or missing tokens for the given process instance. Token-based replay is useful for Conformance Checking: indeed, a trace is fitting according to the model if, during its execution, the transitions can be fired without the need to insert any missing token. If the reaching of the final marking is imposed, then a trace is fitting if it reaches the final marking without any missing or remaining tokens.

In PM4Py there is an implementation of a token replayer that is able to go across hidden transitions (calculating shortest paths between places) and can be used with any Petri net model with unique visible transitions and hidden transitions. When a visible transition needs to be fired and not all places in the preset are provided with the correct number of tokens, starting from the current marking it is checked if for some place there is a sequence of hidden transitions that could be fired in order to enable the visible transition. The hidden transitions are then fired and a marking that permits to enable the visible transition is reached. The approach is described in: Berti, Alessandro, and Wil MP van der Aalst. “Reviving Token-based Replay: Increasing Speed While Improving Diagnostics.” ATAED@ Petri Nets/ACSD. 2019.

The reference paper for the TBR-based precision (ETConformance) is: Muñoz-Gama, Jorge, and Josep Carmona. “A fresh look at precision in process conformance.” International Conference on Business Process Management. Springer, Berlin, Heidelberg, 2010.

In this approach, the different prefixes of the log are replayed (where possible) on the model. At the reached marking, the set of transitions enabled in the process model is compared with the set of activities that follow the prefix. The more these sets differ, the lower the precision value; the more similar they are, the higher the precision value.
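The comparison at a reached marking can be pictured with toy sets (illustrative only, not pm4py internals):

```python
# After replaying a prefix, compare what the model allows at the reached
# marking vs what the log actually does after that prefix.
enabled_in_model = {'check', 'pay', 'cancel'}
observed_after_prefix = {'check', 'pay'}
# Behavior the model allows but the log never takes ("escaping" behavior).
escaping = enabled_in_model - observed_after_prefix
# The more the observed set covers the enabled set, the higher the precision.
precision_here = len(observed_after_prefix & enabled_in_model) / len(enabled_in_model)
print(sorted(escaping), precision_here)  # ['cancel'] 0.6666666666666666
```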

Parameters:
  • log – event log

  • petri_net (PetriNet) – petri net

  • initial_marking (Marking) – initial marking

  • final_marking (Marking) – final marking

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

float

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
precision_tbr = pm4py.precision_token_based_replay(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.conformance.precision_alignments(log: Union[EventLog, DataFrame], petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') float[source]#

Calculates the precision of the model w.r.t. the event log using alignments

Alignment-based replay aims to find one of the best alignments between the trace and the model. For each trace, the output of an alignment is a list of couples where the first element is an event (of the trace) or » and the second element is a transition (of the model) or ». Each couple can be classified as follows:

  • Sync move: the classification of the event corresponds to the transition label; in this case, both the trace and the model advance in the same way during the replay.

  • Move on log: for couples where the second element is », it corresponds to a replay move in the trace that is not mimicked in the model. This kind of move is unfit and signals a deviation between the trace and the model.

  • Move on model: for couples where the first element is », it corresponds to a replay move in the model that is not mimicked in the trace. For moves on model, we can have the following distinction:
    • Moves on model involving hidden transitions: in this case, even if it is not a sync move, the move is fit.

    • Moves on model not involving hidden transitions: in this case, the move is unfit and signals a deviation between the trace and the model.

The reference paper for the alignments-based precision (Align-ETConformance) is: Adriansyah, Arya, et al. “Measuring precision of modeled behavior.” Information systems and e-Business Management 13.1 (2015): 37-67

In this approach, the different prefixes of the log are replayed (where possible) on the model. At the reached marking, the set of transitions enabled in the process model is compared with the set of activities that follow the prefix. The more these sets differ, the lower the precision value; the more similar they are, the higher the precision value.

Parameters:
  • log – event log

  • petri_net (PetriNet) – petri net

  • initial_marking (Marking) – initial marking

  • final_marking (Marking) – final marking

  • multi_processing (bool) – boolean value that enables the multiprocessing

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

float

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
precision_alignments = pm4py.precision_alignments(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.conformance.conformance_diagnostics_footprints(*args) Union[List[Dict[str, Any]], Dict[str, Any]][source]#

Provide conformance checking diagnostics using footprints
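Footprints can be pictured as sets of directly-follows relations, and conformance as a comparison of the log's footprints against the model's. A toy illustration (not pm4py's internal representation):

```python
# Directly-follows pairs observed in the log vs allowed by the model.
fp_log = {('a', 'b'), ('b', 'c'), ('b', 'd')}
fp_model = {('a', 'b'), ('b', 'c')}
# Log behavior that the model does not allow is a footprint violation.
violations = fp_log - fp_model
print(sorted(violations))  # [('b', 'd')]
```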

Parameters:

args – provided arguments (the first argument is supposed to be an event log, or the footprints discovered from the event log; the other arguments are supposed to be the process model, or the footprints discovered from the process model).

Return type:

Union[List[Dict[str, Any]], Dict[str, Any]]

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
footprints_diagnostics = pm4py.conformance_diagnostics_footprints(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')

Deprecated since version 2.3.0: This will be removed in 3.0.0; conformance checking using footprints will no longer be exposed.

pm4py.conformance.fitness_footprints(*args) Dict[str, float][source]#

Calculates fitness using footprints. The output is a dictionary containing two keys:

  • perc_fit_traces: the percentage of fit traces (over the log)

  • log_fitness: the fitness value over the log

Parameters:

args – provided arguments (the first argument is supposed to be an event log, or the footprints discovered from the event log; the other arguments are supposed to be the process model, or the footprints discovered from the process model).

Return type:

Dict[str, float]

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
fitness_fp = pm4py.fitness_footprints(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')

Deprecated since version 2.3.0: This will be removed in 3.0.0; conformance checking using footprints will no longer be exposed.

pm4py.conformance.precision_footprints(*args) float[source]#

Calculates precision using footprints

Parameters:

args – provided arguments (the first argument is supposed to be an event log, or the footprints discovered from the event log; the other arguments are supposed to be the process model, or the footprints discovered from the process model).

Return type:

float

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
precision_fp = pm4py.precision_footprints(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')

Deprecated since version 2.3.0: This will be removed in 3.0.0; conformance checking using footprints will no longer be exposed.

pm4py.conformance.check_is_fitting(*args, activity_key='concept:name') bool[source]#

Checks if a trace object fits a process model.

Parameters:

args – arguments (trace object; process model (process tree, petri net, BPMN))

Return type:

bool

Deprecated since version 2.3.0: This method will be removed in 3.0.0.

pm4py.conformance.conformance_temporal_profile(log: Union[EventLog, DataFrame], temporal_profile: Dict[Tuple[str, str], Tuple[float, float]], zeta: float = 1.0, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') List[List[Tuple[float, float, float, float]]][source]#

Performs conformance checking on the provided log with the provided temporal profile. The result is a list of time-based deviations for every case. For example, assume the conformance checking is applied to a log containing a single case: A (timestamp: 2000-01) B (timestamp: 2002-01). The difference between the timestamps of A and B is two years. If the temporal profile {(‘A’, ‘B’): (1.5 months, 0.5 months), (‘A’, ‘C’): (5 months, 0), (‘A’, ‘D’): (2 months, 0)} is specified and zeta is set to 1, then the case is deviating with respect to the couple of activities (‘A’, ‘B’), because 2 years > 1.5 months + 0.5 months.
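The deviation rule from the example, in plain numbers (approximating a month as 30 days, for illustration only):

```python
DAY = 86400
MONTH = 30 * DAY                      # rough approximation for illustration
mean, std = 1.5 * MONTH, 0.5 * MONTH  # hypothetical profile entry for ('A', 'B')
zeta = 1
observed = 2 * 365 * DAY              # two years between A and B
# A couple deviates when the observed gap exceeds mean + zeta * std.
deviating = observed > mean + zeta * std
print(deviating)  # True
```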

Parameters:
  • log – log object

  • temporal_profile – temporal profile. E.g., if the log has two cases: A (timestamp: 1980-01) B (timestamp: 1980-03) C (timestamp: 1980-06); A (timestamp: 1990-01) B (timestamp: 1990-02) D (timestamp: 1990-03); The temporal profile will contain: {(‘A’, ‘B’): (1.5 months, 0.5 months), (‘A’, ‘C’): (5 months, 0), (‘A’, ‘D’): (2 months, 0)}

  • zeta (float) – number of standard deviations allowed from the average. E.g. zeta=1 allows every timestamp between AVERAGE-STDEV and AVERAGE+STDEV.

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

List[List[Tuple[float, float, float, float]]]

import pm4py

temporal_profile = pm4py.discover_temporal_profile(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
conformance_temporal_profile = pm4py.conformance_temporal_profile(dataframe, temporal_profile, zeta=1, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.conformance.conformance_log_skeleton(log: Union[EventLog, DataFrame], log_skeleton: Dict[str, Any], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') List[Set[Any]][source]#

Performs conformance checking using the log skeleton

Reference paper: Verbeek, H. M. W., and R. Medeiros de Carvalho. “Log skeletons: A classification approach to process discovery.” arXiv preprint arXiv:1806.08247 (2018).

A log skeleton is a declarative model which consists of six different constraints:

  • “directly_follows”: specifies, for some activities, strict bounds on the directly-following activities. For example, ‘A should be directly followed by B’ and ‘B should be directly followed by C’.

  • “always_before”: specifies that some activities may be executed only if some other activities have been executed earlier in the history of the case. For example, ‘C should always be preceded by A’.

  • “always_after”: specifies that some activities should always trigger the execution of some other activities in the future history of the case. For example, ‘A should always be followed by C’.

  • “equivalence”: specifies that a given couple of activities should happen with the same number of occurrences inside a case. For example, ‘B and C should always happen the same number of times’.

  • “never_together”: specifies that a given couple of activities should never happen together in the history of the case. For example, ‘there should be no case containing both C and D’.

  • “activ_occurrences”: specifies the allowed number of occurrences per activity. For example, A is allowed to be executed 1 or 2 times; B is allowed to be executed 1, 2, 3 or 4 times.

Parameters:
  • log – log object

  • log_skeleton – log skeleton object, expressed as a dictionary of the six constraints (never_together, always_before, …) along with the discovered rules.

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

List[Set[Any]]

import pm4py

log_skeleton = pm4py.discover_log_skeleton(dataframe, noise_threshold=0.1, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
conformance_lsk = pm4py.conformance_log_skeleton(dataframe, log_skeleton, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')

pm4py.convert module#

The pm4py.convert module contains the cross-conversions implemented in pm4py

pm4py.convert.convert_to_event_log(obj: Union[DataFrame, EventStream], case_id_key: str = 'case:concept:name') EventLog[source]#

Converts a DataFrame/EventStream object to an event log object

Parameters:
  • obj – DataFrame or EventStream object

  • case_id_key (str) – attribute to be used as case identifier

Return type:

EventLog

import pandas as pd
import pm4py

dataframe = pd.read_csv("tests/input_data/running-example.csv")
dataframe = pm4py.format_dataframe(dataframe, case_id_column='case:concept:name', activity_column='concept:name', timestamp_column='time:timestamp')
log = pm4py.convert_to_event_log(dataframe)
pm4py.convert.convert_to_event_stream(obj: Union[EventLog, DataFrame], case_id_key: str = 'case:concept:name') EventStream[source]#

Converts a log object to an event stream

Parameters:
  • obj – log object

  • case_id_key (str) – attribute to be used as case identifier

Return type:

EventStream

import pm4py

log = pm4py.read_xes("tests/input_data/running-example.xes")
event_stream = pm4py.convert_to_event_stream(log)
pm4py.convert.convert_to_dataframe(obj: Union[EventStream, EventLog]) DataFrame[source]#

Converts a log object to a dataframe

Parameters:

obj – log object

Return type:

pd.DataFrame

import pm4py

log = pm4py.read_xes("tests/input_data/running-example.xes")
dataframe = pm4py.convert_to_dataframe(log)
pm4py.convert.convert_to_bpmn(*args: Union[Tuple[PetriNet, Marking, Marking], ProcessTree]) BPMN[source]#

Converts an object to a BPMN diagram. As an input, either a Petri net (with corresponding initial and final marking) or a process tree can be provided. A process tree can always be converted into a BPMN model, and thus the quality of the resulting object is guaranteed. For Petri nets, the quality of the conversion largely depends on the net provided (e.g., sound WF-nets are likely to produce reasonable BPMN models).

Parameters:

args – petri net (with initial and final marking) or process tree

Return type:

BPMN

import pm4py

# import a Petri net from a file
net, im, fm = pm4py.read_pnml("tests/input_data/running-example.pnml")
bpmn_graph = pm4py.convert_to_bpmn(net, im, fm)
pm4py.convert.convert_to_petri_net(*args: Union[BPMN, ProcessTree, HeuristicsNet, dict]) Tuple[PetriNet, Marking, Marking][source]#

Converts an input model to an (accepting) Petri net. The input object can either be a process tree, a BPMN model or a Heuristics net. The output is a triple containing the Petri net and the initial and final markings. The markings are only returned if they can be reasonably derived from the input model.

Parameters:

args – process tree, BPMN model or Heuristics net

Return type:

Tuple[PetriNet, Marking, Marking]

import pm4py

# imports a process tree from a PTML file
process_tree = pm4py.read_ptml("tests/input_data/running-example.ptml")
net, im, fm = pm4py.convert_to_petri_net(process_tree)
pm4py.convert.convert_to_process_tree(*args: Union[Tuple[PetriNet, Marking, Marking], BPMN]) ProcessTree[source]#

Converts an input model to a process tree. The input models can either be Petri nets (marked) or BPMN models. For both input types, the conversion is not guaranteed to work; hence, invocation of the method can raise an Exception.

Parameters:

args – petri net (along with initial and final marking) or BPMN

Return type:

ProcessTree

import pm4py

# imports a BPMN file
bpmn_graph = pm4py.read_bpmn("tests/input_data/running-example.bpmn")
# converts the BPMN to a process tree (through intermediate conversion to a Petri net)
process_tree = pm4py.convert_to_process_tree(bpmn_graph)
pm4py.convert.convert_to_reachability_graph(*args: Union[Tuple[PetriNet, Marking, Marking], BPMN, ProcessTree]) TransitionSystem[source]#

Converts an input model to a reachability graph (transition system). The input models can either be Petri nets (with markings), BPMN models or process trees. The output is the state-space of the model (i.e., the reachability graph), encoded as a TransitionSystem object.

Parameters:

args – petri net (along with initial and final marking), process tree or BPMN

Return type:

TransitionSystem

import pm4py

# reads a Petri net from a file
net, im, fm = pm4py.read_pnml("tests/input_data/running-example.pnml")
# converts it to reachability graph
reach_graph = pm4py.convert_to_reachability_graph(net, im, fm)
pm4py.convert.convert_log_to_ocel(log: Union[EventLog, EventStream, DataFrame], activity_column: str = 'concept:name', timestamp_column: str = 'time:timestamp', object_types: Collection[str] = ['case:concept:name'], obj_separator: str = ' AND ') OCEL[source]#

Converts an event log to an object-centric event log with one or more object types.

Parameters:
  • log – log object

  • activity_column (str) – activity column

  • timestamp_column (str) – timestamp column

  • object_types – list of columns to consider as object types

  • obj_separator (str) – separator between different objects in the same column

Return type:

OCEL

pm4py.convert.convert_ocel_to_networkx(ocel: OCEL, variant: str = 'ocel_to_nx') DiGraph[source]#

Converts an OCEL to a NetworkX DiGraph object.

Parameters:
  • ocel (OCEL) – object-centric event log

  • variant (str) – variant of the conversion to use: “ocel_to_nx” -> graph containing event and object IDs and two types of relations (REL=related objects, DF=directly-follows); “ocel_features_to_nx” -> graph containing different types of interconnection at the object level

Return type:

nx.DiGraph

pm4py.convert.convert_log_to_networkx(log: Union[EventLog, EventStream, DataFrame], include_df: bool = True, case_id_key: str = 'concept:name', other_case_attributes_as_nodes: Optional[Collection[str]] = None, event_attributes_as_nodes: Optional[Collection[str]] = None) DiGraph[source]#

Converts an event log object to a NetworkX DiGraph object. The nodes of the graph are the events, the cases (and possibly the attributes of the log). The edges are:
  • connecting each event to the corresponding case (BELONGS_TO type)

  • connecting every event to the directly-following one (DF type, if enabled)

  • connecting every case/event to the given attribute values (ATTRIBUTE_EDGE type)

Parameters:
  • log – log object (EventLog, EventStream, Pandas dataframe)

  • include_df (bool) – include the directly-follows graph relation in the graph (bool)

  • case_id_key (str) – specify which attribute at the case level should be considered the case ID

  • other_case_attributes_as_nodes – specify which attributes at the case level should be inserted in the graph as nodes (other than the caseID) (list, default empty)

  • event_attributes_as_nodes – specify which attributes at the event level should be inserted in the graph as nodes (list, default empty)

Return type:

nx.DiGraph

pm4py.convert.convert_petri_net_to_networkx(net: PetriNet, im: Marking, fm: Marking) DiGraph[source]#

Converts a Petri net to a NetworkX DiGraph. Each place and transition corresponds to a node in the graph.

Parameters:
  • net (PetriNet) – Petri net

  • im (Marking) – initial marking

  • fm (Marking) – final marking

Return type:

nx.DiGraph

pm4py.discovery module#

The pm4py.discovery module contains the process discovery algorithms implemented in pm4py

pm4py.discovery.discover_dfg(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[dict, dict, dict][source]#

Discovers a Directly-Follows Graph (DFG) from a log.

This method returns a tuple: a dictionary with the couples of directly-following activities (in the log) as keys and the frequency of the relation as value, together with the start and end activities of the log.

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Tuple[dict, dict, dict]

import pm4py

dfg, start_activities, end_activities = pm4py.discover_dfg(dataframe, case_id_key='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')

Deprecated since version 2.3.0: This will be removed in 2.4.0. This method will be replaced by the discover_dfg_typed() function. Please adapt your code to use pm4py.discover_dfg_typed().

pm4py.discovery.discover_directly_follows_graph(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[dict, dict, dict][source]#

Deprecated since version 2.3.0: This will be removed in 2.4.0. This method will be replaced by the discover_dfg_typed() function. Please adapt your code to use pm4py.discover_dfg_typed().

pm4py.discovery.discover_dfg_typed(log: DataFrame, case_id_key: str = 'case:concept:name', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp') DirectlyFollowsGraph[source]#

Discovers a Directly-Follows Graph (DFG) from a log.

This method returns a typed DFG object, i.e., as specified in pm4py.objects.dfg.obj.py (DirectlyFollowsGraph class). The DFG object describes a graph, start activities and end activities. The graph is a collection of triples of the form (a,b,f) representing an arc a->b with frequency f. The start activities are a collection of tuples of the form (a,f) representing that activity a starts f cases. The end activities are a collection of tuples of the form (a,f) representing that activity a ends f cases.

This method replaces pm4py.discover_dfg and pm4py.discover_directly_follows_graph. In a future release, these functions will adopt the same behavior as this function.

Parameters:
  • log (DataFrame) – pandas.DataFrame

  • case_id_key (str) – attribute to be used as case identifier

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

Return type:

DFG

import pm4py

dfg = pm4py.discover_dfg_typed(log, case_id_key='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_performance_dfg(log: Union[EventLog, DataFrame], business_hours: bool = False, business_hour_slots=[(25200, 61200), (111600, 147600), (198000, 234000), (284400, 320400), (370800, 406800)], workcalendar=None, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[dict, dict, dict][source]#

Discovers a performance directly-follows graph from an event log.

This method returns a tuple: a dictionary with the couples of directly-following activities (in the log) as keys and the performance of the relation as value, together with the start and end activities of the log.

Parameters:
  • log – event log / Pandas dataframe

  • business_hours (bool) – enables/disables the computation based on the business hours (default: False)

  • business_hour_slots – work schedule of the company, provided as a list of tuples where each tuple represents one time slot of business hours. One slot, i.e., one tuple, consists of a start and an end time given in seconds since the week start, e.g., [(7 * 60 * 60, 17 * 60 * 60), ((24 + 7) * 60 * 60, (24 + 12) * 60 * 60), ((24 + 13) * 60 * 60, (24 + 17) * 60 * 60)], meaning that business hours are Mondays 07:00 - 17:00 and Tuesdays 07:00 - 12:00 and 13:00 - 17:00

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Tuple[dict, dict, dict]

import pm4py

performance_dfg, start_activities, end_activities = pm4py.discover_performance_dfg(dataframe, case_id_key='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_petri_net_alpha(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[PetriNet, Marking, Marking][source]#

Discovers a Petri net using the Alpha Miner.

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Tuple[PetriNet, Marking, Marking]

import pm4py

net, im, fm = pm4py.discover_petri_net_alpha(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_petri_net_alpha_plus(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[PetriNet, Marking, Marking][source]#

Discovers a Petri net using the Alpha+ algorithm

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Tuple[PetriNet, Marking, Marking]

import pm4py

net, im, fm = pm4py.discover_petri_net_alpha_plus(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')

Deprecated since version 2.3.0: This method will be removed in 3.0.0.

pm4py.discovery.discover_petri_net_inductive(log: Union[EventLog, DataFrame, DirectlyFollowsGraph], multi_processing: bool = False, noise_threshold: float = 0.0, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[PetriNet, Marking, Marking][source]#

Discovers a Petri net using the inductive miner algorithm.

The basic idea of the Inductive Miner is to detect a ‘cut’ in the log (e.g., sequence cut, exclusive-choice cut, concurrent cut, loop cut) and then recurse on the sublogs obtained by applying the cut, until a base case is found. The Directly-Follows variant avoids the recursion on the sublogs and uses the Directly-Follows graph instead.

Inductive Miner models usually make extensive use of hidden transitions, especially for skipping/looping over a portion of the model. Furthermore, each visible transition has a unique label (there are no transitions in the model that share the same label).

Parameters:
  • log – event log / Pandas dataframe / typed DFG

  • noise_threshold (float) – noise threshold (default: 0.0)

  • multi_processing (bool) – boolean that enables/disables multiprocessing in inductive miner

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Tuple[PetriNet, Marking, Marking]

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_petri_net_heuristics(log: Union[EventLog, DataFrame], dependency_threshold: float = 0.5, and_threshold: float = 0.65, loop_two_threshold: float = 0.5, min_act_count: int = 1, min_dfg_occurrences: int = 1, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[PetriNet, Marking, Marking][source]#

Discover a Petri net using the Heuristics Miner

Heuristics Miner is an algorithm that acts on the Directly-Follows Graph, providing a way to handle noise and to find common constructs (dependency between two activities, AND). The output of the Heuristics Miner is a Heuristics Net, an object that contains the activities and the relationships between them. The Heuristics Net can then be converted into a Petri net.

Parameters:
  • log – event log / Pandas dataframe

  • dependency_threshold (float) – dependency threshold (default: 0.5)

  • and_threshold (float) – AND threshold (default: 0.65)

  • loop_two_threshold (float) – loop two threshold (default: 0.5)

  • min_act_count (int) – minimum number of occurrences per activity in order to be included in the discovery

  • min_dfg_occurrences (int) – minimum number of occurrences per arc in the DFG in order to be included in the discovery

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Tuple[PetriNet, Marking, Marking]

import pm4py

net, im, fm = pm4py.discover_petri_net_heuristics(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_process_tree_inductive(log: Union[EventLog, DataFrame, DirectlyFollowsGraph], noise_threshold: float = 0.0, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') ProcessTree[source]#

Discovers a process tree using the inductive miner algorithm

The basic idea of the Inductive Miner is to detect a ‘cut’ in the log (e.g., sequence cut, exclusive-choice cut, concurrent cut, loop cut) and then recurse on the sublogs obtained by applying the cut, until a base case is found. The Directly-Follows variant avoids the recursion on the sublogs and uses the Directly-Follows graph instead.

Inductive Miner models usually make extensive use of hidden transitions, especially for skipping/looping over a portion of the model. Furthermore, each visible transition has a unique label (there are no transitions in the model that share the same label).

Parameters:
  • log – event log / Pandas dataframe / typed DFG

  • noise_threshold (float) – noise threshold (default: 0.0)

  • activity_key (str) – attribute to be used for the activity

  • multi_processing (bool) – boolean that enables/disables multiprocessing in inductive miner

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

ProcessTree

import pm4py

process_tree = pm4py.discover_process_tree_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_heuristics_net(log: Union[EventLog, DataFrame], dependency_threshold: float = 0.5, and_threshold: float = 0.65, loop_two_threshold: float = 0.5, min_act_count: int = 1, min_dfg_occurrences: int = 1, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') HeuristicsNet[source]#

Discovers a heuristics net

Heuristics Miner is an algorithm that acts on the Directly-Follows Graph, providing a way to handle noise and to find common constructs (dependency between two activities, AND). The output of the Heuristics Miner is a Heuristics Net, an object that contains the activities and the relationships between them. The Heuristics Net can then be converted into a Petri net.

Parameters:
  • log – event log / Pandas dataframe

  • dependency_threshold (float) – dependency threshold (default: 0.5)

  • and_threshold (float) – AND threshold (default: 0.65)

  • loop_two_threshold (float) – loop two threshold (default: 0.5)

  • min_act_count (int) – minimum number of occurrences per activity in order to be included in the discovery

  • min_dfg_occurrences (int) – minimum number of occurrences per arc in the DFG in order to be included in the discovery

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

HeuristicsNet

import pm4py

heu_net = pm4py.discover_heuristics_net(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.derive_minimum_self_distance(log: Union[DataFrame, EventLog, EventStream], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, int][source]#

This algorithm computes the minimum self-distance for each activity observed in an event log. The self-distance of a in <a> is infinity, in <a,a> it is 0, in <a,b,a> it is 1, etc. The activity key ‘concept:name’ is used.

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[str, int]

import pm4py

msd = pm4py.derive_minimum_self_distance(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_footprints(*args: Union[EventLog, Tuple[PetriNet, Marking, Marking], ProcessTree]) Union[List[Dict[str, Any]], Dict[str, Any]][source]#

Discovers the footprints out of the provided event log / process model

Parameters:

args – event log / process model

Return type:

Union[List[Dict[str, Any]], Dict[str, Any]]

import pm4py

footprints = pm4py.discover_footprints(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_eventually_follows_graph(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[Tuple[str, str], int][source]#

Gets the eventually follows graph from a log object.

The eventually-follows graph is a dictionary that associates each couple of activities eventually following each other with the number of occurrences of this relation.

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[Tuple[str, str], int]

import pm4py

efg = pm4py.discover_eventually_follows_graph(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_bpmn_inductive(log: Union[EventLog, DataFrame, DirectlyFollowsGraph], noise_threshold: float = 0.0, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') BPMN[source]#

Discovers a BPMN using the Inductive Miner algorithm

The basic idea of the Inductive Miner is to detect a ‘cut’ in the log (e.g., sequence cut, exclusive-choice cut, concurrent cut, loop cut) and then recurse on the sublogs obtained by applying the cut, until a base case is found. The Directly-Follows variant avoids the recursion on the sublogs and uses the Directly-Follows graph instead.

Inductive Miner models usually make extensive use of hidden transitions, especially for skipping/looping over a portion of the model. Furthermore, each visible transition has a unique label (there are no transitions in the model that share the same label).

Parameters:
  • log – event log / Pandas dataframe / typed DFG

  • noise_threshold (float) – noise threshold (default: 0.0)

  • multi_processing (bool) – boolean that enables/disables multiprocessing in inductive miner

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

BPMN

import pm4py

bpmn_graph = pm4py.discover_bpmn_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_transition_system(log: Union[EventLog, DataFrame], direction: str = 'forward', window: int = 2, view: str = 'sequence', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') TransitionSystem[source]#

Discovers a transition system as described in the process mining book “Process Mining: Data Science in Action”

Parameters:
  • log – event log / Pandas dataframe

  • direction (str) – direction in which the transition system is built (forward, backward)

  • window (int) – window (2, 3, …)

  • view (str) – view to use in the construction of the states (sequence, set, multiset)

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

TransitionSystem

import pm4py

transition_system = pm4py.discover_transition_system(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_prefix_tree(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Trie[source]#

Discovers a prefix tree from the provided log object.

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Trie

import pm4py

prefix_tree = pm4py.discover_prefix_tree(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_temporal_profile(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[Tuple[str, str], Tuple[float, float]][source]#

Discovers a temporal profile from a log object.

Implements the approach described in: Stertz, Florian, Jürgen Mangler, and Stefanie Rinderle-Ma. “Temporal Conformance Checking at Runtime based on Time-infused Process Models.” arXiv preprint arXiv:2008.07262 (2020).

The output is a dictionary containing, for every couple of activities eventually following each other in at least one case of the log, the average and the standard deviation of the difference of their timestamps.

E.g. if the log has two cases:

Case 1: A (timestamp: 1980-01), B (timestamp: 1980-03), C (timestamp: 1980-06); Case 2: A (timestamp: 1990-01), B (timestamp: 1990-02), D (timestamp: 1990-03).

The returned dictionary will contain: {(‘A’, ‘B’): (1.5 months, 0.5 months), (‘A’, ‘C’): (5 months, 0), (‘A’, ‘D’): (2 months, 0)}

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[Tuple[str, str], Tuple[float, float]]

import pm4py

temporal_profile = pm4py.discover_temporal_profile(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_log_skeleton(log: Union[EventLog, DataFrame], noise_threshold: float = 0.0, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, Any][source]#

Discovers a log skeleton from an event log.

A log skeleton is a declarative model which consists of six different constraints:
  • “directly_follows”: specifies for some activities some strict bounds on the activities directly following them. For example, ‘A should be directly followed by B’ and ‘B should be directly followed by C’.

  • “always_before”: specifies that some activities may be executed only if some other activities have been executed earlier in the history of the case. For example, ‘C should always be preceded by A’.

  • “always_after”: specifies that some activities should always trigger the execution of some other activities in the future history of the case. For example, ‘A should always be followed by C’.

  • “equivalence”: specifies that a given couple of activities should happen with the same number of occurrences inside a case. For example, ‘B and C should always happen the same number of times’.

  • “never_together”: specifies that a given couple of activities should never happen together in the history of the case. For example, ‘there should be no case containing both C and D’.

  • “activ_occurrences”: specifies the allowed number of occurrences per activity. E.g., A is allowed to be executed 1 or 2 times; B is allowed to be executed 1, 2, 3 or 4 times.

Reference paper: Verbeek, H. M. W., and R. Medeiros de Carvalho. “Log skeletons: A classification approach to process discovery.” arXiv preprint arXiv:1806.08247 (2018).

Parameters:
  • log – event log / Pandas dataframe

  • noise_threshold (float) – noise threshold, acting as described in the paper.

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[str, Any]

import pm4py

log_skeleton = pm4py.discover_log_skeleton(dataframe, noise_threshold=0.1, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_batches(log: Union[EventLog, DataFrame], merge_distance: int = 900, min_batch_size: int = 2, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', resource_key: str = 'org:resource') List[Tuple[Tuple[str, str], int, Dict[str, Any]]][source]#

Discover batches from the provided log object

We say that an activity is executed in batches by a given resource when the resource executes the same activity several times in a short period of time.

Identifying such activities may identify points of the process that can be automated, since the activity of the person may be repetitive.

The following categories of batches are detected:
  • Simultaneous (all the events in the batch have identical start and end timestamps)

  • Batching at start (all the events in the batch have an identical start timestamp)

  • Batching at end (all the events in the batch have an identical end timestamp)

  • Sequential batching (for all the consecutive events, the end of the first is equal to the start of the second)

  • Concurrent batching (for all the consecutive events that are not sequentially matched)

The approach has been described in the following paper: Martin, N., Swennen, M., Depaire, B., Jans, M., Caris, A., & Vanhoof, K. (2015, December). Batch Processing: Definition and Event Log Identification. In SIMPDA (pp. 137-140).

The output is a (sorted) list containing tuples. Each tuple contains:
  • Index 0: the activity-resource combination for which at least one batch has been detected

  • Index 1: the number of batches for the given activity-resource combination

  • Index 2: a list containing all the batches. Each batch is described by:

    • The start timestamp of the batch

    • The complete timestamp of the batch

    • The list of events that are executed in the batch

Parameters:
  • log – event log / Pandas dataframe

  • merge_distance (int) – the maximum time distance between non-overlapping intervals in order for them to be considered to belong to the same batch (default: 900 seconds, i.e., 15 minutes)

  • min_batch_size (int) – the minimum number of events for a batch to be considered (default: 2)

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • resource_key (str) – attribute to be used as resource

Return type:

List[Tuple[Tuple[str, str], int, Dict[str, Any]]]

import pm4py

batches = pm4py.discover_batches(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp', resource_key='org:resource')
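The documented output structure can be illustrated in plain Python. The sample data and the per-batch field names below are hypothetical, used only to show how the tuples unpack:

```python
# Hypothetical sample mirroring the documented output of discover_batches:
# a list of (activity-resource pair, number of batches, list of batches).
# The per-batch field names ("start", "complete", "events") are illustrative.
sample_batches = [
    (("Approve Order", "Mike"), 2, [
        {"start": 1000.0, "complete": 1100.0, "events": ["e1", "e2"]},
        {"start": 2000.0, "complete": 2050.0, "events": ["e7", "e8", "e9"]},
    ]),
]

for (activity, resource), num_batches, batch_list in sample_batches:
    # index 1 counts the batches collected at index 2
    assert num_batches == len(batch_list)
    for batch in batch_list:
        # each batch spans from its start to its complete timestamp
        assert batch["start"] <= batch["complete"]
```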

pm4py.filtering module#

The pm4py.filtering module contains the filtering features offered in pm4py

pm4py.filtering.filter_log_relative_occurrence_event_attribute(log: Union[EventLog, DataFrame], min_relative_stake: float, attribute_key: str = 'concept:name', level='cases', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filters the event log keeping only the events having an attribute value which occurs:
  • in at least the specified (min_relative_stake) percentage of events, when level=”events”

  • in at least the specified (min_relative_stake) percentage of cases, when level=”cases”

Parameters:
  • log – event log / Pandas dataframe

  • min_relative_stake (float) – minimum percentage of cases (expressed as a number between 0 and 1) in which the attribute should occur.

  • attribute_key (str) – the attribute to filter

  • level (str) – the level of the filter (if level=”events”, then events / if level=”cases”, then cases)

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_log_relative_occurrence_event_attribute(dataframe, 0.5, level='cases', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.filtering.filter_start_activities(log: Union[EventLog, DataFrame], activities: Union[Set[str], List[str]], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filter cases having a start activity in the provided list

Parameters:
  • log – event log / Pandas dataframe

  • activities – collection of start activities

  • retain (bool) – if True, we retain the traces containing the given start activities; if False, we drop the traces

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_start_activities(dataframe, ['Act. A'], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.filtering.filter_end_activities(log: Union[EventLog, DataFrame], activities: Union[Set[str], List[str]], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filter cases having an end activity in the provided list

Parameters:
  • log – event log / Pandas dataframe

  • activities – collection of end activities

  • retain (bool) – if True, we retain the traces containing the given end activities; if False, we drop the traces

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_end_activities(dataframe, ['Act. Z'], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.filtering.filter_event_attribute_values(log: Union[EventLog, DataFrame], attribute_key: str, values: Union[Set[str], List[str]], level: str = 'case', retain: bool = True, case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filter a log object on the values of some event attribute

Parameters:
  • log – event log / Pandas dataframe

  • attribute_key (str) – attribute to filter

  • values – admitted (or forbidden) values

  • level (str) – specifies how the filter should be applied (‘case’ filters the cases where at least one occurrence happens, ‘event’ filters the events, possibly trimming the cases)

  • retain (bool) – specifies if the values should be kept or removed

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_event_attribute_values(dataframe, 'concept:name', ['Act. A', 'Act. Z'], case_id_key='case:concept:name')
pm4py.filtering.filter_trace_attribute_values(log: Union[EventLog, DataFrame], attribute_key: str, values: Union[Set[str], List[str]], retain: bool = True, case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filter a log on the values of a trace attribute

Parameters:
  • log – event log / Pandas dataframe

  • attribute_key (str) – attribute to filter

  • values – collection of values to filter

  • retain (bool) – boolean value (keep/discard matching traces)

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_trace_attribute_values(dataframe, 'case:creator', ['Mike'], case_id_key='case:concept:name')
pm4py.filtering.filter_variants(log: Union[EventLog, DataFrame], variants: Union[Set[str], List[str]], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filter a log on a specified set of variants

Parameters:
  • log – event log / Pandas dataframe

  • variants – collection of variants to filter; A variant should be specified as a list of tuples of activity names, e.g., [(‘a’, ‘b’, ‘c’)]

  • retain (bool) – boolean; if True all traces conforming to the specified variants are retained; if False, all those traces are removed

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_variants(dataframe, [('Act. A', 'Act. B', 'Act. Z'), ('Act. A', 'Act. C', 'Act. Z')], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.filtering.filter_directly_follows_relation(log: Union[EventLog, DataFrame], relations: List[str], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Retain traces that contain any of the specified ‘directly follows’ relations. For example, if relations == [(‘a’,’b’),(‘a’,’c’)] and the log is [<a,b,c>,<a,c,b>,<a,d,b>], the resulting log will contain the traces [<a,b,c>,<a,c,b>].

Parameters:
  • log – event log / Pandas dataframe

  • relations – list of activity name pairs, which are allowed/forbidden paths

  • retain (bool) – parameter that says whether the paths should be kept/removed

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_directly_follows_relation(dataframe, [('A','B'),('A','C')], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
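The retention rule can be sketched in plain Python; this minimal version, independent of pm4py, reproduces the example above:

```python
def has_any_dfr(trace, relations):
    """True if the trace contains at least one of the given directly-follows
    pairs (a, b), i.e. an occurrence of b immediately after an occurrence of a."""
    pairs = set(zip(trace, trace[1:]))
    return any(rel in pairs for rel in relations)

log = [["a", "b", "c"], ["a", "c", "b"], ["a", "d", "b"]]
retained = [t for t in log if has_any_dfr(t, [("a", "b"), ("a", "c")])]
# retained == [["a", "b", "c"], ["a", "c", "b"]], as in the example above
```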
pm4py.filtering.filter_eventually_follows_relation(log: Union[EventLog, DataFrame], relations: List[str], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Retain traces that contain any of the specified ‘eventually follows’ relations. For example, if relations == [(‘a’,’b’),(‘a’,’c’)] and the log is [<a,b,c>,<a,c,b>,<a,d,b>], the resulting log will contain the traces [<a,b,c>,<a,c,b>,<a,d,b>].

Parameters:
  • log – event log / Pandas dataframe

  • relations – list of activity name pairs, which are allowed/forbidden paths

  • retain (bool) – parameter that says whether the paths should be kept/removed

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_eventually_follows_relation(dataframe, [('A','B'),('A','C')], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
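The weaker ‘eventually follows’ rule can be sketched the same way; this minimal pure-Python version reproduces the example above:

```python
def has_any_efr(trace, relations):
    """True if, for some pair (a, b), an occurrence of a is eventually
    followed (not necessarily immediately) by an occurrence of b."""
    return any(
        b in trace[i + 1:]
        for a, b in relations
        for i, act in enumerate(trace)
        if act == a
    )

log = [["a", "b", "c"], ["a", "c", "b"], ["a", "d", "b"]]
retained = [t for t in log if has_any_efr(t, [("a", "b"), ("a", "c")])]
# all three traces are retained: <a,d,b> contains a eventually followed by b
```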
pm4py.filtering.filter_time_range(log: Union[EventLog, DataFrame], dt1: str, dt2: str, mode='events', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filter a log on a time interval

Parameters:
  • log – event log / Pandas dataframe

  • dt1 (str) – left extreme of the interval

  • dt2 (str) – right extreme of the interval

  • mode (str) – modality of filtering (events, traces_contained, traces_intersecting). events: any event that fits the time frame is retained; traces_contained: any trace completely contained in the timeframe is retained; traces_intersecting: any trace intersecting with the time-frame is retained.

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe1 = pm4py.filter_time_range(dataframe, '2010-01-01 00:00:00', '2011-01-01 00:00:00', mode='traces_contained', case_id_key='case:concept:name', timestamp_key='time:timestamp')
filtered_dataframe1 = pm4py.filter_time_range(dataframe, '2010-01-01 00:00:00', '2011-01-01 00:00:00', mode='traces_intersecting', case_id_key='case:concept:name', timestamp_key='time:timestamp')
filtered_dataframe1 = pm4py.filter_time_range(dataframe, '2010-01-01 00:00:00', '2011-01-01 00:00:00', mode='events', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.filtering.filter_between(log: Union[EventLog, DataFrame], act1: str, act2: str, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Finds all the sub-cases leading from an event with activity “act1” to an event with activity “act2” in the log, and returns a log containing only them.

Example:

Log:
  • Case 1: A B C D E F

  • Case 2: A B E F C

  • Case 3: A B F C B C B E F C

act1 = B, act2 = C

Returned sub-cases:
  • B C (from the first case)

  • B E F C (from the second case)

  • B F C (from the third case)

  • B C (from the third case)

  • B E F C (from the third case)

Parameters:
  • log – event log / Pandas dataframe

  • act1 (str) – source activity

  • act2 (str) – target activity

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_between(dataframe, 'A', 'D', activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
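One simple interpretation of the sub-case extraction can be sketched in plain Python; it reproduces the example above (pm4py's handling of corner cases, e.g. nested occurrences of act1, may differ):

```python
def subcases_between(trace, act1, act2):
    """Collect every sub-sequence starting at act1 and ending at the
    next following act2."""
    result, current = [], None
    for act in trace:
        if current is None:
            if act == act1:
                current = [act]
        else:
            current.append(act)
            if act == act2:
                result.append(current)
                current = None
    return result

log = [list("ABCDEF"), list("ABEFC"), list("ABFCBCBEFC")]
subs = [s for trace in log for s in subcases_between(trace, "B", "C")]
# subs contains B-C, B-E-F-C, B-F-C, B-C, B-E-F-C, matching the example above
```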
pm4py.filtering.filter_case_size(log: Union[EventLog, DataFrame], min_size: int, max_size: int, case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filters the event log, keeping the cases having a length (number of events) included between min_size and max_size

Parameters:
  • log – event log / Pandas dataframe

  • min_size (int) – minimum allowed number of events

  • max_size (int) – maximum allowed number of events

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_case_size(dataframe, 5, 10, case_id_key='case:concept:name')
pm4py.filtering.filter_case_performance(log: Union[EventLog, DataFrame], min_performance: float, max_performance: float, timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filters the event log, keeping the cases having a duration (the timestamp of the last event minus the timestamp of the first event) included between min_performance and max_performance

Parameters:
  • log – event log / Pandas dataframe

  • min_performance (float) – minimum allowed case duration

  • max_performance (float) – maximum allowed case duration

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_case_performance(dataframe, 3600.0, 86400.0, timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.filtering.filter_activities_rework(log: Union[EventLog, DataFrame], activity: str, min_occurrences: int = 2, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filters the event log, keeping the cases where the specified activity occurs at least min_occurrences times.

Parameters:
  • log – event log / Pandas dataframe

  • activity (str) – activity

  • min_occurrences (int) – minimum desired number of occurrences

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_activities_rework(dataframe, 'Approve Order', 2, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.filtering.filter_paths_performance(log: Union[EventLog, DataFrame], path: Tuple[str, str], min_performance: float, max_performance: float, keep=True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filters the event log, either:
  • (keep=True) keeping the cases having the specified path (tuple of 2 activities) with a duration included between min_performance and max_performance

  • (keep=False) discarding the cases having the specified path with a duration included between min_performance and max_performance

Parameters:
  • log – event log / Pandas dataframe

  • path – tuple of two activities (source_activity, target_activity)

  • min_performance (float) – minimum allowed performance (of the path)

  • max_performance (float) – maximum allowed performance (of the path)

  • keep (bool) – keep/discard the cases having the specified path with a duration included between min_performance and max_performance

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_paths_performance(dataframe, ('A', 'D'), 3600.0, 86400.0, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.filtering.filter_variants_top_k(log: Union[EventLog, DataFrame], k: int, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Keeps the top-k variants of the log

Parameters:
  • log – event log / Pandas dataframe

  • k (int) – number of variants that should be kept

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_variants_top_k(dataframe, 5, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
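The underlying idea, counting variant frequencies and keeping the cases of the k most frequent variants, can be sketched in plain Python with hypothetical data:

```python
from collections import Counter

# Hypothetical mapping from case identifier to its variant
# (a variant is the tuple of activities executed in the case).
variants_per_case = {
    "c1": ("A", "B", "Z"), "c2": ("A", "B", "Z"),
    "c3": ("A", "C", "Z"), "c4": ("A", "C", "Z"),
    "c5": ("A", "D", "Z"),
}

k = 2
top_k = {v for v, _ in Counter(variants_per_case.values()).most_common(k)}
kept_cases = {c for c, v in variants_per_case.items() if v in top_k}
# the two most frequent variants cover cases c1..c4; c5 is dropped
```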
pm4py.filtering.filter_variants_by_coverage_percentage(log: Union[EventLog, DataFrame], min_coverage_percentage: float, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filters the variants of the log by a coverage percentage (e.g., if min_coverage_percentage=0.4, and we have a log with 1000 cases, of which 500 of the variant 1, 400 of the variant 2, and 100 of the variant 3, the filter keeps only the traces of variant 1 and variant 2).

Parameters:
  • log – event log / Pandas dataframe

  • min_coverage_percentage (float) – minimum allowed percentage of coverage

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_variants_by_coverage_percentage(dataframe, 0.1, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.filtering.filter_prefixes(log: Union[EventLog, DataFrame], activity: str, strict=True, first_or_last='first', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filters the log, keeping the prefixes to a given activity. E.g., for a log with traces:

  • A,B,C,D

  • A,B,Z,A,B,C,D

  • A,B,C,D,C,E,C,F

The prefixes to “C” are respectively:

  • A,B

  • A,B,Z,A,B

  • A,B

Parameters:
  • log – event log / Pandas dataframe

  • activity (str) – target activity of the filter

  • strict (bool) – applies the filter strictly (cuts the occurrences of the selected activity).

  • first_or_last (str) – decides if the first or last occurrence of an activity should be selected as baseline for the filter.

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_prefixes(dataframe, 'Act. C', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
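The prefix computation can be sketched in plain Python; this minimal version reproduces the example above (first occurrence, strict cut) and may differ from pm4py's implementation in edge cases:

```python
def prefix_to(trace, activity, strict=True, first_or_last="first"):
    """Prefix of the trace up to the chosen occurrence of activity;
    with strict=True the occurrence itself is cut off as well."""
    positions = [i for i, act in enumerate(trace) if act == activity]
    if not positions:
        return trace
    idx = positions[0] if first_or_last == "first" else positions[-1]
    return trace[:idx] if strict else trace[:idx + 1]

traces = [list("ABCD"), list("ABZABCD"), list("ABCDCECF")]
prefixes = [prefix_to(t, "C") for t in traces]
# prefixes are A,B / A,B,Z,A,B / A,B, matching the example above
```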
pm4py.filtering.filter_suffixes(log: Union[EventLog, DataFrame], activity: str, strict=True, first_or_last='first', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filters the log, keeping the suffixes from a given activity. E.g., for a log with traces:

  • A,B,C,D

  • A,B,Z,A,B,C,D

  • A,B,C,D,C,E,C,F

The suffixes from “C” are respectively:

  • D

  • D

  • D,C,E,C,F

Parameters:
  • log – event log / Pandas dataframe

  • activity (str) – target activity of the filter

  • strict (bool) – applies the filter strictly (cuts the occurrences of the selected activity).

  • first_or_last (str) – decides if the first or last occurrence of an activity should be selected as baseline for the filter.

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_suffixes(dataframe, 'Act. C', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.filtering.filter_ocel_event_attribute(ocel: OCEL, attribute_key: str, attribute_values: Collection[Any], positive: bool = True) OCEL[source]#

Filters the object-centric event log on the provided event attributes values

Parameters:
  • ocel (OCEL) – object-centric event log

  • attribute_key (str) – attribute at the event level

  • attribute_values – collection of attribute values

  • positive (bool) – decides if the values should be kept (positive=True) or removed (positive=False)

Return type:

OCEL

import pm4py

filtered_ocel = pm4py.filter_ocel_event_attribute(ocel, 'ocel:activity', ['A', 'B', 'D'])
pm4py.filtering.filter_ocel_object_attribute(ocel: OCEL, attribute_key: str, attribute_values: Collection[Any], positive: bool = True) OCEL[source]#

Filters the object-centric event log on the provided object attributes values

Parameters:
  • ocel (OCEL) – object-centric event log

  • attribute_key (str) – attribute at the object level

  • attribute_values – collection of attribute values

  • positive (bool) – decides if the values should be kept (positive=True) or removed (positive=False)

Return type:

OCEL

import pm4py

filtered_ocel = pm4py.filter_ocel_object_attribute(ocel, 'ocel:type', ['order'])
pm4py.filtering.filter_ocel_object_types_allowed_activities(ocel: OCEL, correspondence_dict: Dict[str, Collection[str]]) OCEL[source]#

Filters an object-centric event log keeping only the specified object types with the specified activity set (filters out the rest).

Parameters:
  • ocel (OCEL) – object-centric event log

  • correspondence_dict – dictionary containing, for every object type of interest, a collection of allowed activities. Example: {“order”: [“Create Order”], “element”: [“Create Order”, “Create Delivery”]}

Return type:

OCEL

import pm4py

filtered_ocel = pm4py.filter_ocel_object_types_allowed_activities(ocel, {'order': ['create order', 'pay order']})
pm4py.filtering.filter_ocel_object_per_type_count(ocel: OCEL, min_num_obj_type: Dict[str, int]) OCEL[source]#

Filters the events of the object-centric logs which are related to at least the specified amount of objects per type.

E.g. pm4py.filter_ocel_object_per_type_count(ocel, {“order”: 1, “element”: 2})

Would keep the following events:

   ocel:eid  ocel:timestamp  ocel:activity  ocel:type:element  ocel:type:order
0  e1        1980-01-01      Create Order   [i4, i1, i3, i2]   [o1]
1  e11       1981-01-01      Create Order   [i6, i5]           [o2]
2  e14       1981-01-04      Create Order   [i8, i7]           [o3]

Parameters:
  • ocel (OCEL) – object-centric event log

  • min_num_obj_type – minimum number of objects per type

Return type:

OCEL

import pm4py

filtered_ocel = pm4py.filter_ocel_object_per_type_count(ocel, {'order': 1, 'element': 2})
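The keep/drop rule can be sketched in plain Python; the event-to-objects mapping below is hypothetical, not a real OCEL object:

```python
# Hypothetical mapping: event id -> related object identifiers per type
events = {
    "e1": {"order": ["o1"], "element": ["i1", "i2", "i3", "i4"]},
    "e2": {"order": ["o1"], "element": ["i1"]},
    "e3": {"order": [], "element": ["i5", "i6"]},
}

min_num_obj_type = {"order": 1, "element": 2}
kept = {
    eid for eid, objs in events.items()
    if all(len(objs.get(ot, [])) >= n for ot, n in min_num_obj_type.items())
}
# only e1 relates to at least 1 order and at least 2 elements
```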
pm4py.filtering.filter_ocel_start_events_per_object_type(ocel: OCEL, object_type: str) OCEL[source]#

Filters the events in which a new object of the given object type is spawned. (E.g. an event with activity “Create Order” might spawn new orders).

Parameters:
  • ocel (OCEL) – object-centric event log

  • object_type (str) – object type to consider

Return type:

OCEL

import pm4py

filtered_ocel = pm4py.filter_ocel_start_events_per_object_type(ocel, 'delivery')
pm4py.filtering.filter_ocel_end_events_per_object_type(ocel: OCEL, object_type: str) OCEL[source]#

Filters the events in which an object for the given object type terminates its lifecycle. (E.g. an event with activity “Pay Order” might terminate an order).

Parameters:
  • ocel (OCEL) – object-centric event log

  • object_type (str) – object type to consider

Return type:

OCEL

import pm4py

filtered_ocel = pm4py.filter_ocel_end_events_per_object_type(ocel, 'delivery')
pm4py.filtering.filter_ocel_events_timestamp(ocel: OCEL, min_timest: Union[datetime, str], max_timest: Union[datetime, str], timestamp_key: str = 'ocel:timestamp') OCEL[source]#

Filters the object-centric event log keeping events in the provided timestamp range

Parameters:
  • ocel (OCEL) – object-centric event log

  • min_timest – left extreme of the allowed timestamp interval (provided in the format: YYYY-mm-dd HH:MM:SS)

  • max_timest – right extreme of the allowed timestamp interval (provided in the format: YYYY-mm-dd HH:MM:SS)

  • timestamp_key (str) – the attribute to use as timestamp (default: ocel:timestamp)

Return type:

OCEL

import pm4py

filtered_ocel = pm4py.filter_ocel_events_timestamp(ocel, '1990-01-01 00:00:00', '2010-01-01 00:00:00')
pm4py.filtering.filter_four_eyes_principle(log: Union[EventLog, DataFrame], activity1: str, activity2: str, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', resource_key: str = 'org:resource') Union[EventLog, DataFrame][source]#

Filters the cases of the log which violate the four-eyes principle on the provided activities.

Parameters:
  • log – event log

  • activity1 (str) – first activity

  • activity2 (str) – second activity

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • resource_key (str) – attribute to be used as resource

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_four_eyes_principle(dataframe, 'Act. A', 'Act. B', activity_key='concept:name', resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.filtering.filter_activity_done_different_resources(log: Union[EventLog, DataFrame], activity: str, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', resource_key: str = 'org:resource') Union[EventLog, DataFrame][source]#

Filters the cases where an activity is repeated by different resources.

Parameters:
  • log – event log

  • activity (str) – activity to consider

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • resource_key (str) – attribute to be used as resource

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_activity_done_different_resources(dataframe, 'Act. A', activity_key='concept:name', resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.filtering.filter_ocel_object_types(ocel: OCEL, obj_types: Collection[str], positive: bool = True, level: int = 1) OCEL[source]#

Filters the object types of an object-centric event log.

Parameters:
  • ocel (OCEL) – object-centric event log

  • obj_types – object types to keep/remove

  • positive (bool) – boolean value (True=keep, False=remove)

  • level (int) – recursively expand the set of object identifiers until the specified level

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_object_types(ocel, ['order'])
pm4py.filtering.filter_ocel_objects(ocel: OCEL, object_identifiers: Collection[str], positive: bool = True, level: int = 1) OCEL[source]#

Filters the object identifiers of an object-centric event log.

Parameters:
  • ocel (OCEL) – object-centric event log

  • object_identifiers – object identifiers to keep/remove

  • positive (bool) – boolean value (True=keep, False=remove)

  • level (int) – recursively expand the set of object identifiers until the specified level

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_objects(ocel, ['o1'], level=1)
pm4py.filtering.filter_ocel_events(ocel: OCEL, event_identifiers: Collection[str], positive: bool = True) OCEL[source]#

Filters the event identifiers of an object-centric event log.

Parameters:
  • ocel (OCEL) – object-centric event log

  • event_identifiers – event identifiers to keep/remove

  • positive (bool) – boolean value (True=keep, False=remove)

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_events(ocel, ['e1'])
pm4py.filtering.filter_ocel_cc_object(ocel: OCEL, object_id: str) OCEL[source]#

Returns the connected component of the object-centric event log to which the object with the provided identifier belongs.

Parameters:
  • ocel (OCEL) – object-centric event log

  • object_id (str) – object identifier

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_cc_object(ocel, 'order1')

pm4py.hof module#

pm4py.hof.filter_log(f: Callable[[Any], bool], log: EventLog) Union[EventLog, EventStream][source]#

Filters the log according to a given (lambda) function.

Parameters:
  • f – function that specifies the filter criterion, may be a lambda

  • log (EventLog) – event log; either EventLog or EventStream Object

Return type:

Union[log_inst.EventLog, log_inst.EventStream]

Deprecated since version 2.3.0: This will be removed in 3.0.0. The EventLog class will be removed in a future release.
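No example is given above, so here is a minimal pure-Python sketch of the idea, with a plain list of traces standing in for the EventLog object:

```python
# A list of traces (each a list of activity names) stands in for an EventLog.
log = [["A", "B", "C"], ["A", "C"], ["A", "B", "D"]]

# Keep only the traces with at least three events, via a lambda criterion.
filtered = list(filter(lambda trace: len(trace) >= 3, log))
# filtered == [["A", "B", "C"], ["A", "B", "D"]]
```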

pm4py.hof.filter_trace(f: Callable[[Any], bool], trace: Trace) Trace[source]#

Filters the trace according to a given (lambda) function.

Parameters:
  • f – function that specifies the filter criterion, may be a lambda

  • trace (Trace) – trace; PM4Py trace object

Return type:

log_inst.Trace

pm4py.hof.sort_log(log: EventLog, key, reverse: bool = False) Union[EventLog, EventStream][source]#

Sorts the event log according to a given key.

Parameters:
  • log (EventLog) – event log object; either EventLog or EventStream

  • key – sorting key

  • reverse (bool) – indicates whether sorting should be reversed or not

Return type:

Union[log_inst.EventLog, log_inst.EventStream]

Deprecated since version 2.3.0: This will be removed in 3.0.0. The EventLog class will be removed in a future release.
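A minimal pure-Python sketch of the sorting idea (a plain list of traces stands in for the EventLog object):

```python
# Sort the traces by their length, longest first.
log = [["A", "B"], ["A"], ["A", "B", "C"]]
sorted_log = sorted(log, key=len, reverse=True)
# sorted_log == [["A", "B", "C"], ["A", "B"], ["A"]]
```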

pm4py.hof.sort_trace(trace: Trace, key, reverse: bool = False) Trace[source]#

Sorts the events in a trace according to a given key.

Parameters:
  • trace (Trace) – input trace

  • key – sorting key

  • reverse (bool) – indicates whether sorting should be reversed (default False)

Return type:

log_inst.Trace

Deprecated since version 2.3.0: This will be removed in 3.0.0. The EventLog class will be removed in a future release.

pm4py.meta module#

Process mining for Python

pm4py.ml module#

The pm4py.ml module contains the machine learning features offered in pm4py

pm4py.ml.split_train_test(log: Union[EventLog, DataFrame], train_percentage: float = 0.8, case_id_key='case:concept:name') Union[Tuple[EventLog, EventLog], Tuple[DataFrame, DataFrame]][source]#

Split an event log in a training log and a test log (for machine learning purposes). Returns the training and the test event log.

Parameters:
  • log – event log / Pandas dataframe

  • train_percentage (float) – fraction of traces to be included in the training log (from 0.0 to 1.0)

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[Tuple[EventLog, EventLog], Tuple[pd.DataFrame, pd.DataFrame]]

import pm4py

train_df, test_df = pm4py.split_train_test(dataframe, train_percentage=0.75)
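The case-level split can be sketched in plain pandas. Note that pm4py picks the training cases randomly; the sketch below keeps the first cases only for determinism:

```python
import pandas as pd

# Toy event log: case identifier and activity columns.
df = pd.DataFrame({
    "case:concept:name": ["c1", "c1", "c2", "c2", "c3", "c3", "c4", "c4"],
    "concept:name": ["a", "b", "a", "c", "a", "b", "a", "c"],
})

# Split on whole cases, not single events: select a fraction of the case
# identifiers for training, and route every event of a case to the same side.
cases = df["case:concept:name"].unique()
n_train = int(len(cases) * 0.75)
train_cases = set(cases[:n_train])  # pm4py samples randomly; deterministic here

train_df = df[df["case:concept:name"].isin(train_cases)]
test_df = df[~df["case:concept:name"].isin(train_cases)]
```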
pm4py.ml.get_prefixes_from_log(log: Union[EventLog, DataFrame], length: int, case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Gets the prefixes of a log of a given length. The returned log object contains the prefixes:

  • if a trace has lower or identical length, it is included as-is

  • if a trace has greater length, it is cut

Parameters:
  • log – event log / Pandas dataframe

  • length (int) – length

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

trimmed_df = pm4py.get_prefixes_from_log(dataframe, length=5, case_id_key='case:concept:name')
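A rough pandas equivalent of the trimming behaviour on toy data, keeping at most the first `length` events of every case:

```python
import pandas as pd

df = pd.DataFrame({
    "case:concept:name": ["c1", "c1", "c1", "c2"],
    "concept:name": ["a", "b", "c", "a"],
})

# Short traces pass through unchanged, longer ones are cut to `length` events.
length = 2
prefixes_df = df.groupby("case:concept:name").head(length)
```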
pm4py.ml.extract_features_dataframe(log: Union[EventLog, DataFrame], str_tr_attr=None, num_tr_attr=None, str_ev_attr=None, num_ev_attr=None, str_evsucc_attr=None, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name', resource_key='org:resource', **kwargs) DataFrame[source]#

Extracts a dataframe containing the features of each case of the provided log object

Parameters:
  • log – log object (event log / Pandas dataframe)

  • str_tr_attr – (if provided) string attributes at the case level which should be extracted as features

  • num_tr_attr – (if provided) numeric attributes at the case level which should be extracted as features

  • str_ev_attr – (if provided) string attributes at the event level which should be extracted as features (one-hot encoding)

  • num_ev_attr – (if provided) numeric attributes at the event level which should be extracted as features (last value per attribute in a case)

  • activity_key (str) – the attribute to be used as activity

  • timestamp_key (str) – the attribute to be used as timestamp

  • case_id_key (str) – the attribute to be used as case identifier

  • resource_key (str) – the attribute to be used as resource

Return type:

pd.DataFrame

import pm4py

features_df = pm4py.extract_features_dataframe(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.ml.extract_temporal_features_dataframe(log: Union[EventLog, DataFrame], grouper_freq='W', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name', start_timestamp_key='time:timestamp', resource_key='org:resource') DataFrame[source]#

Extracts a dataframe containing the temporal features of the provided log object

Implements the approach described in the paper: Pourbafrani, Mahsa, Sebastiaan J. van Zelst, and Wil MP van der Aalst. “Supporting automatic system dynamics model generation for simulation in the context of process mining.” International Conference on Business Information Systems. Springer, Cham, 2020.

Parameters:
  • log – log object (event log / Pandas dataframe)

  • grouper_freq (str) – the grouping frequency (D, W, M, Y) to use

  • activity_key (str) – the attribute to be used as activity

  • timestamp_key (str) – the attribute to be used as timestamp

  • case_id_key (str) – the attribute to be used as case identifier

  • resource_key (str) – the attribute to be used as resource

  • start_timestamp_key (str) – the attribute to be used as start timestamp

Return type:

pd.DataFrame

import pm4py

temporal_features_df = pm4py.extract_temporal_features_dataframe(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')

pm4py.ocel module#

The pm4py.ocel module contains the object-centric process mining features offered in pm4py

pm4py.ocel.ocel_get_object_types(ocel: OCEL) List[str][source]#

Gets the list of object types contained in the object-centric event log (e.g., [“order”, “item”, “delivery”]).

Parameters:

ocel (OCEL) – object-centric event log

Return type:

List[str]

import pm4py

object_types = pm4py.ocel_get_object_types(ocel)
pm4py.ocel.ocel_get_attribute_names(ocel: OCEL) List[str][source]#

Gets the list of attributes at the event and the object level of an object-centric event log (e.g. [“cost”, “amount”, “name”])

Parameters:

ocel (OCEL) – object-centric event log

Return type:

List[str]

import pm4py

attribute_names = pm4py.ocel_get_attribute_names(ocel)
pm4py.ocel.ocel_flattening(ocel: OCEL, object_type: str) DataFrame[source]#

Flattens the object-centric event log to a traditional event log with the choice of an object type. In the flattened log, the objects of a given object type are the cases, and each case contains the set of events related to the object.

Parameters:
  • ocel (OCEL) – object-centric event log

  • object_type (str) – object type

Return type:

pd.DataFrame

import pm4py

event_log = pm4py.ocel_flattening(ocel, 'items')
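The flattening idea can be sketched on a toy event-to-object relations table. The column names follow the OCEL standard, but the data and the exact table layout are illustrative:

```python
import pandas as pd

# Each row links one event to one object of a given type.
relations = pd.DataFrame({
    "ocel:eid": ["e1", "e1", "e2", "e3"],
    "ocel:activity": ["create order", "create order", "pack item", "pack item"],
    "ocel:type": ["order", "item", "item", "item"],
    "ocel:oid": ["o1", "i1", "i1", "i2"],
})

# Flattening on object type "item": every item object becomes a case, and the
# case contains the events in which the object participates.
flat = relations[relations["ocel:type"] == "item"].rename(
    columns={"ocel:oid": "case:concept:name", "ocel:activity": "concept:name"}
)
```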
pm4py.ocel.ocel_object_type_activities(ocel: OCEL) Dict[str, Collection[str]][source]#

Gets the set of activities performed for each object type

Parameters:

ocel (OCEL) – object-centric event log

Return type:

Dict[str, Collection[str]]

import pm4py

ot_activities = pm4py.ocel_object_type_activities(ocel)
pm4py.ocel.ocel_objects_ot_count(ocel: OCEL) Dict[str, Dict[str, int]][source]#

Counts for each event the number of related objects per type

Parameters:

ocel (OCEL) – object-centric event log

Return type:

Dict[str, Dict[str, int]]

import pm4py

objects_ot_count = pm4py.ocel_objects_ot_count(ocel)
pm4py.ocel.ocel_temporal_summary(ocel: OCEL) DataFrame[source]#

Returns the "temporal summary" of an object-centric event log. The temporal summary aggregates all the events performed at the same timestamp, and reports the set of activities and the involved objects.

Parameters:

ocel (OCEL) – object-centric event log

Return type:

pd.DataFrame

import pm4py

temporal_summary = pm4py.ocel_temporal_summary(ocel)
pm4py.ocel.ocel_objects_summary(ocel: OCEL) DataFrame[source]#

Gets the objects summary of an object-centric event log

Parameters:

ocel (OCEL) – object-centric event log

Return type:

pd.DataFrame

import pm4py

objects_summary = pm4py.ocel_objects_summary(ocel)
pm4py.ocel.ocel_objects_interactions_summary(ocel: OCEL) DataFrame[source]#

Gets the objects interactions summary of an object-centric event log. The objects interactions summary has a row for every combination (event, related object, other related object). Properties such as the activity of the event, and the object types of the two related objects, are included.

Parameters:

ocel (OCEL) – object-centric event log

Return type:

pd.DataFrame

import pm4py

interactions_summary = pm4py.ocel_objects_interactions_summary(ocel)
pm4py.ocel.discover_ocdfg(ocel: OCEL, business_hours=False, business_hour_slots=[(25200, 61200), (111600, 147600), (198000, 234000), (284400, 320400), (370800, 406800)]) Dict[str, Any][source]#

Discovers an OC-DFG from an object-centric event log.

Object-centric directly-follows multigraphs are a composition of directly-follows graphs, one for each object type, which can be annotated with different metrics considering the entities of an object-centric event log (i.e., events, unique objects, total objects).

Reference paper: Berti, Alessandro, and Wil van der Aalst. “Extracting multiple viewpoint models from relational databases.” Data-Driven Process Discovery and Analysis. Springer, Cham, 2018. 24-51.

Parameters:
  • ocel (OCEL) – object-centric event log

  • business_hours (bool) – boolean value that enables the usage of the business hours

  • business_hour_slots – work schedule of the company, provided as a list of tuples where each tuple represents one time slot of business hours. One slot i.e. one tuple consists of one start and one end time given in seconds since week start, e.g. [(7 * 60 * 60, 17 * 60 * 60), ((24 + 7) * 60 * 60, (24 + 12) * 60 * 60), ((24 + 13) * 60 * 60, (24 + 17) * 60 * 60),] meaning that business hours are Mondays 07:00 - 17:00 and Tuesdays 07:00 - 12:00 and 13:00 - 17:00

Return type:

Dict[str, Any]

import pm4py

ocdfg = pm4py.discover_ocdfg(ocel)
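The default business_hour_slots encode Monday to Friday, 07:00 to 17:00, as seconds since the start of the week. A small helper makes such slots easier to write (the helper name is ours, not part of pm4py):

```python
# Convert (day index, hour) into seconds since the week start,
# with Monday 00:00 as second 0.
def week_seconds(day, hour):
    return day * 24 * 60 * 60 + hour * 60 * 60

# Monday-Friday, 07:00-17:00: matches the default value of business_hour_slots.
slots = [(week_seconds(d, 7), week_seconds(d, 17)) for d in range(5)]
```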
pm4py.ocel.discover_oc_petri_net(ocel: OCEL) Dict[str, Any][source]#

Discovers an object-centric Petri net from the provided object-centric event log.

Reference paper: van der Aalst, Wil MP, and Alessandro Berti. “Discovering object-centric Petri nets.” Fundamenta informaticae 175.1-4 (2020): 1-40.

Parameters:

ocel (OCEL) – object-centric event log

Return type:

Dict[str, Any]

import pm4py

ocpn = pm4py.discover_oc_petri_net(ocel)
pm4py.ocel.discover_objects_graph(ocel: OCEL, graph_type: str = 'object_interaction') Set[Tuple[str, str]][source]#

Discovers an object graph from the provided object-centric event log

Parameters:
  • ocel (OCEL) – object-centric event log

  • graph_type (str) – type of graph to consider (object_interaction, object_descendants, object_inheritance, object_cobirth, object_codeath)

Return type:

Set[Tuple[str, str]]

import pm4py

ocel = pm4py.read_ocel('trial.ocel')
obj_graph = pm4py.discover_objects_graph(ocel, graph_type='object_interaction')
pm4py.ocel.sample_ocel_objects(ocel: OCEL, num_objects: int) OCEL[source]#

Given an object-centric event log, returns a sampled event log with a random subset of the objects. Only the events related to at least one of the sampled objects are kept. As a note, the relationships between the different objects are likely to be broken by this sampling.

Parameters:
  • ocel (OCEL) – Object-centric event log

  • num_objects (int) – number of objects to keep in the sampled object-centric event log

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('trial.ocel')
sampled_ocel = pm4py.sample_ocel_objects(ocel, 50) # keeps only 50 random objects
pm4py.ocel.sample_ocel_connected_components(ocel: OCEL, connected_components: int = 1) OCEL[source]#

Given an object-centric event log, returns a sampled event log with a subset of the executions. The number of considered connected components needs to be specified by the user.

Parameters:
  • ocel (OCEL) – Object-centric event log

  • connected_components (int) – Number of connected components to pick from the OCEL

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('trial.ocel')
sampled_ocel = pm4py.sample_ocel_connected_components(ocel, 5) # keeps only 5 connected components
pm4py.ocel.ocel_drop_duplicates(ocel: OCEL) OCEL[source]#

Drops relations between events and objects that happen at the same time, with the same activity, towards the same object identifier. This cleans the OCEL from duplicate events.

Parameters:

ocel (OCEL) – object-centric event log

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('trial.ocel')
ocel = pm4py.ocel_drop_duplicates(ocel)
pm4py.ocel.ocel_merge_duplicates(ocel: OCEL, have_common_object: Optional[bool] = False) OCEL[source]#

Merge events in the OCEL that happen with the same activity at the same timestamp

Parameters:
  • ocel (OCEL) – object-centric event log

  • have_common_object – impose the additional merge condition that the events should be related to at least one common object.

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('trial.ocel')
ocel = pm4py.ocel_merge_duplicates(ocel)
pm4py.ocel.ocel_sort_by_additional_column(ocel: OCEL, additional_column: str, primary_column: str = 'ocel:timestamp') OCEL[source]#

Sorts the OCEL not only based on the timestamp column and the index, but using an additional sorting column that further determines the order of the events happening at the same timestamp.

Parameters:
  • ocel (OCEL) – object-centric event log

  • additional_column (str) – additional column to use for the sorting

  • primary_column (str) – primary column to be used for the sorting (default: ocel:timestamp)

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('trial.ocel')
ocel = pm4py.ocel_sort_by_additional_column(ocel, 'ordering')
pm4py.ocel.ocel_add_index_based_timedelta(ocel: OCEL) OCEL[source]#

Adds a small time-delta to the timestamp column based on the current index of the event. This ensures the correct ordering of the events in any object-centric process mining solution.

Parameters:

ocel (OCEL) – object-centric event log

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('trial.ocel')
ocel = pm4py.ocel_add_index_based_timedelta(ocel)

pm4py.org module#

The pm4py.org module contains the organizational analysis techniques offered in pm4py

pm4py.org.discover_handover_of_work_network(log: Union[EventLog, DataFrame], beta=0, resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') SNA[source]#

Calculates the handover of work network of the event log. The handover of work network is essentially the DFG of the event log, however using the resources, instead of the activities, as the nodes of the graph. As such, to use this, resource information should be present in the event log.

Return type:

SNA

Parameters:
  • log – event log / Pandas dataframe

  • beta (int) – beta parameter for Handover metric

  • resource_key (str) – attribute to be used for the resource

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

import pm4py

metric = pm4py.discover_handover_of_work_network(dataframe, resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
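With beta = 0, the metric essentially counts direct handovers between consecutive events of a case; a toy sketch of that counting step (pm4py additionally normalizes the resulting values):

```python
from collections import Counter

# Events of one case, ordered by time: (activity, resource).
case = [("register", "Pete"), ("check", "Sue"), ("decide", "Sue"), ("notify", "Pete")]

# A resource hands work over to the resource performing the next
# event of the same case.
handovers = Counter(
    (case[i][1], case[i + 1][1]) for i in range(len(case) - 1)
)
```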
pm4py.org.discover_working_together_network(log: Union[EventLog, DataFrame], resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') SNA[source]#

Calculates the working together network of the process. Two resource nodes are connected in the graph if the resources collaborate on an instance of the process.

Return type:

SNA

Parameters:
  • log – event log / Pandas dataframe

  • resource_key (str) – attribute to be used for the resource

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

import pm4py

metric = pm4py.discover_working_together_network(dataframe, resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.org.discover_activity_based_resource_similarity(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') SNA[source]#

Calculates similarity between the resources in the event log, based on their activity profiles.

Return type:

SNA

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • resource_key (str) – attribute to be used for the resource

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

import pm4py

act_res_sim = pm4py.discover_activity_based_resource_similarity(dataframe, resource_key='org:resource', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.org.discover_subcontracting_network(log: Union[EventLog, DataFrame], n=2, resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') SNA[source]#

Calculates the subcontracting network of the process.

Return type:

SNA

Parameters:
  • log – event log / Pandas dataframe

  • n (int) – n parameter for Subcontracting metric

  • resource_key (str) – attribute to be used for the resource

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

import pm4py

metric = pm4py.discover_subcontracting_network(dataframe, resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.org.discover_organizational_roles(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') List[Role][source]#

Mines the organizational roles

A role is a set of activities in the log that are executed by a similar (multi)set of resources. Hence, it corresponds to a specific function within the organization. Grouping the activities into roles simplifies the analysis of the organizational perspective.

Reference paper: Burattin, Andrea, Alessandro Sperduti, and Marco Veluscek. “Business models enhancement through discovery of roles.” 2013 IEEE Symposium on Computational Intelligence and Data Mining (CIDM). IEEE, 2013.

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • resource_key (str) – attribute to be used for the resource

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

import pm4py

roles = pm4py.discover_organizational_roles(dataframe, resource_key='org:resource', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.org.discover_network_analysis(log: Union[DataFrame, EventLog, EventStream], out_column: str, in_column: str, node_column_source: str, node_column_target: str, edge_column: str, edge_reference: str = '_out', performance: bool = False, sorting_column: str = 'time:timestamp', timestamp_column: str = 'time:timestamp') Dict[Tuple[str, str], Dict[str, Any]][source]#

Performs a network analysis of the log based on the provided parameters.

The classical social network analysis methods are based on the order of the events inside a case. For example, the Handover of Work metric considers the directly-follows relationships between resources during the work of a case. An edge is added between two resources if such a relationship occurs.

Real-life scenarios may be more complicated. First, it is difficult to collect events inside the same case without having convergence/divergence issues (see the first section of the OCEL part). Second, the type of relationship may also be important. Consider for example the relationship between two resources: this may be more effective if the activity that is executed is liked by the resources, rather than disliked.

The network analysis that we introduce here generalizes some existing social network analysis metrics, becoming independent of the choice of a case notion and permitting the construction of a multigraph instead of a simple graph.

With this, we assume events to be linked by signals. An event emits a signal (that is contained as one attribute of the event) that is assumed to be received by other events (also, this is an attribute of these events) that follow the first event in the log. So, we assume there is an OUT attribute (of the event) that is identical to the IN attribute (of the other events).

When we collect this information, we can build the network analysis graph: - The source node of the relation is given by an aggregation over a node_column_source attribute. - The target node of the relation is given by an aggregation over a node_column_target attribute. - The type of edge is given by an aggregation over an edge_column attribute. - The network analysis graph can either be annotated with frequency or performance information.

The output is a multigraph. Two events EV1 and EV2 of the log are merged (independently from the case notion) based on having EV1.OUT_COLUMN = EV2.IN_COLUMN. Then, an aggregation is applied on the pair of events (NODE_COLUMN) to obtain the nodes that are connected. The edges between these nodes are aggregated based on some property of the source event (EDGE_COLUMN).

Parameters:
  • log – event log / Pandas dataframe

  • out_column (str) – the source column of the link (default: the case identifier; events of the same case are linked)

  • in_column (str) – the target column of the link (default: the case identifier; events of the same case are linked)

  • node_column_source (str) – the attribute to be used for the node definition of the source event (default: the resource of the log, org:resource)

  • node_column_target (str) – the attribute to be used for the node definition of the target event (default: the resource of the log, org:resource)

  • edge_column (str) – the attribute to be used for the edge definition (default: the activity of the log, concept:name)

  • edge_reference (str) – decide if the edge attribute should be picked from the source event. Values: _out => the source event ; _in => the target event

  • performance (bool) – boolean value that enables the performance calculation on the edges of the network analysis

  • sorting_column (str) – the column that should be used to sort the log before performing the network analysis (default: time:timestamp)

  • timestamp_column (str) – the column that should be used as timestamp for the performance-related analysis (default: time:timestamp)

Return type:

Dict[Tuple[str, str], Dict[str, Any]]

import pm4py

net_ana = pm4py.discover_network_analysis(dataframe, out_column='case:concept:name', in_column='case:concept:name', node_column_source='org:resource', node_column_target='org:resource', edge_column='concept:name')
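The linking mechanism described above can be sketched in plain Python. The out/in/resource attribute names and the data are purely illustrative:

```python
from collections import Counter

# Toy event list: every event may emit an OUT signal and may listen to an
# IN signal, and carries a node attribute (here: the resource).
events = [
    {"out": "s1", "in": None, "resource": "Pete", "activity": "register"},
    {"out": None, "in": "s1", "resource": "Sue", "activity": "check"},
    {"out": "s2", "in": None, "resource": "Sue", "activity": "decide"},
    {"out": None, "in": "s2", "resource": "Mike", "activity": "notify"},
]

# Link every emitting event to the later events receiving the same signal;
# aggregate the nodes on the resource and the edges on the source activity.
edges = Counter()
for i, src in enumerate(events):
    for tgt in events[i + 1:]:
        if src["out"] is not None and src["out"] == tgt["in"]:
            edges[(src["resource"], tgt["resource"], src["activity"])] += 1
```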

pm4py.privacy module#

This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

PM4Py is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

PM4Py is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with PM4Py. If not, see <https://www.gnu.org/licenses/>.

pm4py.privacy.anonymize_differential_privacy(log: Union[EventLog, DataFrame], epsilon: float = 1.0, k: int = 10, p: int = 20) DataFrame[source]#

Protect event logs with differential privacy. Differential privacy is a guarantee that bounds the impact the data of one individual has on a query result.

Control-flow information is anonymized with SaCoFa. This algorithm inserts noise into a trace-variant count, through the step-wise construction of a prefix tree.

Contextual-information, like timestamps or resources, is anonymized with PRIPEL. This technique enriches a control-flow anonymized event log with contextual information from the original log, while still achieving differential privacy. PRIPEL anonymizes each event’s timestamp and other attributes, that are stored as strings, integers, floats, or booleans.

Please install diffprivlib https://diffprivlib.readthedocs.io/en/latest/ (pip install diffprivlib==0.5.2) to run our algorithm.

SaCoFa is described in: S. A. Fahrenkog-Petersen, M. Kabierski, F. Rösel, H. van der Aa and M. Weidlich, “SaCoFa: Semantics-aware Control-flow Anonymization for Process Mining,” 2021 3rd International Conference on Process Mining (ICPM), 2021, pp. 72-79. https://doi.org/10.48550/arXiv.2109.08501

PRIPEL is described in: Fahrenkrog-Petersen, S.A., van der Aa, H., Weidlich, M. (2020). PRIPEL: Privacy-Preserving Event Log Publishing Including Contextual Information. In: Fahland, D., Ghidini, C., Becker, J., Dumas, M. (eds) Business Process Management. BPM 2020. Lecture Notes in Computer Science, vol 12168. Springer, Cham. https://doi.org/10.1007/978-3-030-58666-9_7

Parameters:
  • log – event log / Pandas dataframe

  • epsilon (float) – the strength of the differential privacy guarantee. The smaller the value of epsilon, the stronger the privacy guarantee that is provided.

  • k (int) – the maximal length of considered traces in the prefix tree. We recommend setting k such that roughly 80% of all traces from the original event log are covered.

  • p (int) – the pruning parameter, which denotes the minimum count a prefix has to have in order not to be discarded. The pruning parameter mitigates the otherwise exponential runtime of the algorithm.

Return type:

pd.DataFrame

import pm4py

event_log = pm4py.read_xes("running-example.xes")
anonymized_event_log = pm4py.anonymize_differential_privacy(event_log, epsilon=1.0, k=10, p=20)

pm4py.read module#

The pm4py.read module contains all the functionality related to reading files/objects from disk.

pm4py.read.read_xes(file_path: str, variant: str = 'lxml', return_legacy_log_object: bool = False, **kwargs) Union[DataFrame, EventLog][source]#

Reads an event log stored in XES format (see xes-standard). Returns a table (pandas.DataFrame) view of the event log.

Parameters:
  • file_path (str) – file path of the event log (.xes file) on disk

  • variant (str) – the variant of the importer to use. “iterparse” => traditional XML parser; “line_by_line” => text-based line-by-line importer ; “chunk_regex” => chunk-of-bytes importer (default); “iterparse20” => XES 2.0 importer

  • return_legacy_log_object (bool) – boolean value enabling returning a log object (default: False)

Return type:

DataFrame

import pm4py

log = pm4py.read_xes("<path_to_xes_file>")
pm4py.read.read_pnml(file_path: str, auto_guess_final_marking: bool = False) Tuple[PetriNet, Marking, Marking][source]#

Reads a Petri net object from a .pnml file. The Petri net object returned is a triple containing the following objects:

  1. Petrinet Object, encoded as a PetriNet class

  2. Initial Marking

  3. Final Marking

Return type:

Tuple[PetriNet, Marking, Marking]

Parameters:

file_path (str) – file path of the Petri net model (.pnml file) on disk

import pm4py

pn = pm4py.read_pnml("<path_to_pnml_file>")
pm4py.read.read_ptml(file_path: str) ProcessTree[source]#

Reads a process tree object from a .ptml file

Parameters:

file_path (str) – file path of the process tree object on disk

Return type:

ProcessTree

import pm4py

process_tree = pm4py.read_ptml("<path_to_ptml_file>")
pm4py.read.read_dfg(file_path: str) Tuple[Dict[Tuple[str, str], int], Dict[str, int], Dict[str, int]][source]#

Reads a DFG object from a .dfg file. The DFG object returned is a triple containing the following objects:

  1. DFG Object, encoded as a Dict[Tuple[str,str],int], s.t. DFG[('a','b')]=k implies that activity 'a' is directly followed by activity 'b' a total of k times in the log

  2. Start activity dictionary, encoded as a Dict[str,int], s.t., S['a']=k implies that activity 'a' is starting k traces in the event log

  3. End activity dictionary, encoded as a Dict[str,int], s.t., E['z']=k implies that activity 'z' is ending k traces in the event log.

Return type:

Tuple[Dict[Tuple[str,str],int], Dict[str,int], Dict[str,int]]

Parameters:

file_path (str) – file path of the dfg model on disk

import pm4py

dfg = pm4py.read_dfg("<path_to_dfg_file>")
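The three structures stored in a .dfg file can also be computed directly from a toy log, which illustrates their meaning (the computation below is a sketch, not pm4py's importer):

```python
from collections import Counter

# Toy log: each trace is a sequence of activity labels.
log = [["a", "b", "c"], ["a", "b", "b", "c"]]

dfg = Counter()               # directly-follows counts
start_activities = Counter()  # how many traces start with each activity
end_activities = Counter()    # how many traces end with each activity
for trace in log:
    start_activities[trace[0]] += 1
    end_activities[trace[-1]] += 1
    for x, y in zip(trace, trace[1:]):
        dfg[(x, y)] += 1
```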
pm4py.read.read_bpmn(file_path: str) BPMN[source]#

Reads a BPMN model from a .bpmn file

Parameters:

file_path (str) – file path of the bpmn model

Return type:

BPMN

import pm4py

bpmn = pm4py.read_bpmn('<path_to_bpmn_file>')
pm4py.read.read_ocel(file_path: str, objects_path: Optional[str] = None) OCEL[source]#

Reads an object-centric event log from a file (see: http://www.ocel-standard.org/). The OCEL object is returned by this method

Parameters:
  • file_path (str) – file path of the object-centric event log

  • objects_path – [Optional] file path from which the objects dataframe should be read

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel("<path_to_ocel_file>")

Deprecated since version 2.3.0: This will be removed in 3.0.0. the read_ocel function is deprecated and replaced by read_ocel_csv, read_ocel_json and read_ocel_xml

pm4py.read.read_ocel_csv(file_path: str, objects_path: Optional[str] = None) OCEL[source]#

Reads an object-centric event log from a CSV file (see: http://www.ocel-standard.org/). The OCEL object is returned by this method

Parameters:
  • file_path (str) – file path of the object-centric event log (.csv)

  • objects_path – [Optional] file path from which the objects dataframe should be read

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel_csv("<path_to_ocel_file.csv>")
pm4py.read.read_ocel_json(file_path: str) OCEL[source]#

Reads an object-centric event log from a JSON-OCEL file (see: http://www.ocel-standard.org/). The OCEL object is returned by this method

Parameters:

file_path (str) – file path of the object-centric event log (.jsonocel)

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel_json("<path_to_ocel_file.jsonocel>")
pm4py.read.read_ocel_xml(file_path: str) OCEL[source]#

Reads an object-centric event log from a XML-OCEL file (see: http://www.ocel-standard.org/). The OCEL object is returned by this method

Parameters:

file_path (str) – file path of the object-centric event log (.xmlocel)

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel_xml("<path_to_ocel_file.xmlocel>")
pm4py.read.read_ocel_sqlite(file_path: str) OCEL[source]#

Reads an object-centric event log from a SQLite database (see: http://www.ocel-standard.org/). The OCEL object is returned by this method

Parameters:

file_path (str) – file path of the SQLite database (.sqlite)

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel_sqlite("<path_to_ocel_file.sqlite>")

pm4py.sim module#

The pm4py.sim module contains the simulation algorithms offered in pm4py

pm4py.sim.play_out(*args: Union[Tuple[PetriNet, Marking, Marking], dict, Counter, ProcessTree], **kwargs) EventLog[source]#

Performs the playout of the provided model, i.e., gets a set of traces from the model. The function takes either a Petri net with an initial and final marking, or a process tree, as input.

Parameters:
  • args – model (Petri net with initial and final marking, or process tree)

  • kwargs – dictionary containing the parameters of the playout

Return type:

EventLog

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
log = pm4py.play_out(net, im, fm)
pm4py.sim.generate_process_tree(**kwargs) ProcessTree[source]#

Generates a process tree

Reference paper: PTandLogGenerator: A Generator for Artificial Event Data

Parameters:

kwargs – dictionary containing the parameters of the process tree generator algorithm

Return type:

ProcessTree

import pm4py

process_tree = pm4py.generate_process_tree()

pm4py.stats module#

The pm4py.stats module contains the statistics offered in pm4py

pm4py.stats.get_start_activities(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, int][source]#

Returns the start activities from a log object

Parameters:
  • log – Log object

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[str, int]

import pm4py

start_activities = pm4py.get_start_activities(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
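A rough pandas equivalent on toy data: the start activity of a case is the activity of its chronologically first event, and the counts per label form the returned dictionary:

```python
import pandas as pd

df = pd.DataFrame({
    "case:concept:name": ["c1", "c1", "c2", "c2"],
    "concept:name": ["register", "decide", "register", "reject"],
    "time:timestamp": pd.to_datetime(
        ["2024-01-01 08:00", "2024-01-01 09:00", "2024-01-02 08:00", "2024-01-02 09:00"]
    ),
})

# Sort by timestamp, take the first event per case, count the labels.
first_events = df.sort_values("time:timestamp").groupby("case:concept:name").first()
start_activities = first_events["concept:name"].value_counts().to_dict()
```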
pm4py.stats.get_end_activities(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, int][source]#

Returns the end activities of a log

Parameters:
  • log – Log object

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[str, int]

import pm4py

end_activities = pm4py.get_end_activities(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.stats.get_event_attributes(log: Union[EventLog, DataFrame]) List[str][source]#

Returns the attributes at the event level of the log

Parameters:

log – Log object

Return type:

List[str]

import pm4py

event_attributes = pm4py.get_event_attributes(dataframe)
pm4py.stats.get_trace_attributes(log: Union[EventLog, DataFrame]) List[str][source]#

Gets the attributes at the trace level of a log object

Parameters:

log – Log object

Return type:

List[str]

import pm4py

trace_attributes = pm4py.get_trace_attributes(dataframe)
pm4py.stats.get_event_attribute_values(log: Union[EventLog, DataFrame], attribute: str, count_once_per_case=False, case_id_key: str = 'case:concept:name') Dict[str, int][source]#

Returns the values for a specified (event) attribute

Parameters:
  • log – Log object

  • attribute (str) – attribute

  • count_once_per_case (bool) – If True, consider only an occurrence of the given attribute value inside a case (if there are multiple events sharing the same attribute value, count only 1 occurrence)

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[str, int]

import pm4py

activities = pm4py.get_event_attribute_values(dataframe, 'concept:name', case_id_key='case:concept:name')
pm4py.stats.get_trace_attribute_values(log: Union[EventLog, DataFrame], attribute: str, case_id_key: str = 'case:concept:name') Dict[str, int][source]#

Returns the values for a specified trace attribute

Parameters:
  • log – Log object

  • attribute (str) – Attribute

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[str, int]

import pm4py

tr_attr_values = pm4py.get_trace_attribute_values(dataframe, 'case:attribute', case_id_key='case:concept:name')
pm4py.stats.get_variants(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[Tuple[str], List[Trace]][source]#

Gets the variants from the log

Parameters:
  • log – Event log

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[Tuple[str], List[Trace]]

import pm4py

variants = pm4py.get_variants(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
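
Conceptually, a variant is the sequence of activities of a case, and the variants dictionary groups cases by that sequence. A plain-Python sketch of the grouping (illustrative only, not the pm4py implementation; the case identifiers and traces are made up):

```python
from collections import defaultdict

def variants(case_traces):
    # Group case identifiers by their activity sequence (the "variant").
    result = defaultdict(list)
    for case_id, trace in case_traces.items():
        result[tuple(trace)].append(case_id)
    return dict(result)

v = variants({"c1": ["a", "b"], "c2": ["a", "b"], "c3": ["a", "c"]})
# v maps ("a", "b") to ["c1", "c2"] and ("a", "c") to ["c3"]
```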
pm4py.stats.get_variants_as_tuples(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[Tuple[str], List[Trace]][source]#

Gets the variants from the log (where the keys are tuples and not strings)

Parameters:
  • log – Event log

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[Tuple[str], List[Trace]]

import pm4py

variants = pm4py.get_variants_as_tuples(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.stats.get_stochastic_language(*args, **kwargs) Dict[List[str], float][source]#

Gets the stochastic language from the provided object

Parameters:
  • args – Pandas dataframe / event log / accepting Petri net / process tree

  • kwargs – keyword arguments

Return type:

Dict[List[str], float]

import pm4py

log = pm4py.read_xes('tests/input_data/running-example.xes')
language_log = pm4py.get_stochastic_language(log)
print(language_log)
net, im, fm = pm4py.read_pnml('tests/input_data/running-example.pnml')
language_model = pm4py.get_stochastic_language(net, im, fm)
print(language_model)
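
Conceptually, the stochastic language of a log maps each trace variant to its relative frequency. A plain-Python sketch of that mapping (a simplified illustration, not the pm4py implementation; the traces are made up):

```python
from collections import Counter

def stochastic_language(traces):
    # Map each trace variant (tuple of activities) to its relative frequency.
    counts = Counter(tuple(t) for t in traces)
    total = sum(counts.values())
    return {variant: c / total for variant, c in counts.items()}

traces = [["a", "b", "c"], ["a", "b", "c"], ["a", "c", "b"], ["a", "d"]]
language = stochastic_language(traces)
# ("a", "b", "c") occurs 2 of 4 times, so its probability is 0.5
```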
pm4py.stats.get_minimum_self_distances(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, int][source]#

This algorithm computes the minimum self-distance for each activity observed in an event log. The self-distance of a in <a> is infinity, of a in <a,a> is 0, and of a in <a,b,a> is 1, etc. The minimum self-distance is the minimal observed self-distance value in the event log.

Parameters:
  • log – event log (either pandas.DataFrame, EventLog or EventStream)

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[str, int]

import pm4py

msd = pm4py.get_minimum_self_distances(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
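
The definition above (the self-distance is the number of events strictly between two occurrences of the same activity) can be sketched in plain Python; activities that never repeat are omitted, mirroring an infinite self-distance (illustrative only, not the pm4py implementation):

```python
def minimum_self_distances(traces):
    # For each activity, the minimal number of events between two of its occurrences.
    msd = {}
    for trace in traces:
        last_seen = {}
        for i, act in enumerate(trace):
            if act in last_seen:
                d = i - last_seen[act] - 1  # events strictly in between
                msd[act] = min(msd.get(act, d), d)
            last_seen[act] = i
    return msd

# <a,b,a>: self-distance of 'a' is 1; <c,c>: self-distance of 'c' is 0
msd = minimum_self_distances([["a", "b", "a"], ["c", "c"]])
```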
pm4py.stats.get_minimum_self_distance_witnesses(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, Set[str]][source]#

This function derives the minimum self-distance witnesses. The self-distance of a in <a> is infinity, of a in <a,a> is 0, and of a in <a,b,a> is 1, etc. The minimum self-distance is the minimal observed self-distance value in the event log. A 'witness' is an activity that witnesses the minimum self-distance. For example, if the minimum self-distance of activity a in some log L is 2, then, if trace <a,b,c,a> is in log L, b and c are witnesses of a.

Parameters:
  • log – Event Log to use

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[str, Set[str]]

import pm4py

msd_wit = pm4py.get_minimum_self_distance_witnesses(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
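
Following the example above, the witnesses of an activity are the activities observed strictly between two occurrences that realize its minimum self-distance. A plain-Python sketch (illustrative only, not the pm4py implementation):

```python
def msd_witnesses(traces):
    # First pass: minimum self-distance per repeating activity.
    msd = {}
    for trace in traces:
        last = {}
        for i, act in enumerate(trace):
            if act in last:
                d = i - last[act] - 1
                msd[act] = min(msd.get(act, d), d)
            last[act] = i
    # Second pass: collect the in-between activities where that minimum is realized.
    witnesses = {a: set() for a in msd}
    for trace in traces:
        last = {}
        for i, act in enumerate(trace):
            if act in last and i - last[act] - 1 == msd[act]:
                witnesses[act].update(trace[last[act] + 1:i])
            last[act] = i
    return witnesses

wit = msd_witnesses([["a", "b", "c", "a"]])  # msd of 'a' is 2; b and c witness it
```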
pm4py.stats.get_case_arrival_average(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') float[source]#

Gets the average difference between the start times of two consecutive cases

Parameters:
  • log – log object

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

float

import pm4py

case_arr_avg = pm4py.get_case_arrival_average(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
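
The statistic above is the mean gap between the start timestamps of consecutive cases. A sketch with start times expressed in seconds (illustrative only, not the pm4py implementation; the values are made up):

```python
def case_arrival_average(case_start_times):
    # Average gap between start times of consecutive cases, sorted by start.
    starts = sorted(case_start_times)
    gaps = [b - a for a, b in zip(starts, starts[1:])]
    return sum(gaps) / len(gaps) if gaps else 0.0

# cases starting at t=0, 60 and 180 seconds -> gaps of 60 and 120 -> average 90
avg = case_arrival_average([0, 180, 60])
```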
pm4py.stats.get_rework_cases_per_activity(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, int][source]#

Finds the activities of the log for which rework occurs (i.e., the activity has more than one occurrence within a trace). The output is a dictionary associating each such activity with the number of cases in which the rework occurred.

Parameters:
  • log – Log object

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[str, int]

import pm4py

rework = pm4py.get_rework_cases_per_activity(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
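
The counting described above can be sketched in plain Python (illustrative only, not the pm4py implementation; the traces are made up):

```python
from collections import Counter

def rework_cases_per_activity(traces):
    # Number of cases in which each activity occurs more than once.
    rework = Counter()
    for trace in traces:
        for act, n in Counter(trace).items():
            if n > 1:
                rework[act] += 1
    return dict(rework)

rework = rework_cases_per_activity([["a", "b", "a"], ["a", "b"], ["b", "b", "b"]])
# 'a' is repeated within 1 case, 'b' within 1 case
```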
pm4py.stats.get_case_overlap(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') List[int][source]#

Associates each case in the log with the number of cases concurrently open.

Parameters:
  • log – Log object

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

List[int]

import pm4py

overlap = pm4py.get_case_overlap(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
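
The overlap count can be sketched as an interval-intersection check over the (start, end) times of the cases. In this sketch each case counts itself as open, which is an assumption of the illustration, not necessarily the pm4py behavior:

```python
def case_overlap(intervals):
    # For each case (start, end), count the cases open at the same time
    # (the case itself included -- an assumption of this sketch).
    return [
        sum(1 for s2, e2 in intervals if s2 <= e1 and s1 <= e2)
        for s1, e1 in intervals
    ]

# the first two cases overlap; the third is disjoint
overlap = case_overlap([(0, 10), (5, 15), (20, 30)])
```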

Deprecated since version 2.3.0: This will be removed in 3.0.0. the get_case_overlap function will be removed in a future release.

pm4py.stats.get_cycle_time(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') float[source]#

Calculates the cycle time of the event log.

The definition that has been followed is the one proposed in: https://www.presentationeze.com/presentations/lean-manufacturing-just-in-time/lean-manufacturing-just-in-time-full-details/process-cycle-time-analysis/calculate-cycle-time/#:~:text=Cycle%20time%20%3D%20Average%20time%20between,is%2024%20minutes%20on%20average.

So: Cycle time = Average time between completion of units.

Example taken from the website: Consider a manufacturing facility, which is producing 100 units of product per 40 hour week. The average throughput rate is 1 unit per 0.4 hours, which is one unit every 24 minutes. Therefore the cycle time is 24 minutes on average.

Parameters:
  • log – Log object

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

float

import pm4py

cycle_time = pm4py.get_cycle_time(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
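
The manufacturing example above can be verified with simple arithmetic (plain Python, unrelated to the pm4py API):

```python
# Worked version of the example above: 100 units produced in a 40-hour week.
units_per_week = 100
hours_per_week = 40

cycle_time_hours = hours_per_week / units_per_week   # average time between completions
cycle_time_minutes = cycle_time_hours * 60           # roughly 24 minutes
```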
pm4py.stats.get_all_case_durations(log: Union[EventLog, DataFrame], business_hours: bool = False, business_hour_slots=[(25200, 61200), (111600, 147600), (198000, 234000), (284400, 320400), (370800, 406800)], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') List[float][source]#

Gets the durations of the cases in the event log

Parameters:
  • log – Event log

  • business_hours (bool) – Enables/disables the computation based on the business hours (default: False)

  • business_hour_slots – work schedule of the company, provided as a list of tuples where each tuple represents one time slot of business hours. One slot, i.e. one tuple, consists of a start and an end time given in seconds since the start of the week, e.g. [(7 * 60 * 60, 17 * 60 * 60), ((24 + 7) * 60 * 60, (24 + 12) * 60 * 60), ((24 + 13) * 60 * 60, (24 + 17) * 60 * 60)], meaning that business hours are Mondays 07:00 - 17:00 and Tuesdays 07:00 - 12:00 and 13:00 - 17:00

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

List[float]

import pm4py

case_durations = pm4py.get_all_case_durations(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
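
The default business_hour_slots can be decoded with a short computation: they correspond to Monday-Friday, 07:00-17:00, expressed in seconds since the start of the week (Monday 00:00):

```python
HOUR = 60 * 60
DAY = 24 * HOUR

# One (start, end) slot per weekday, Monday (day 0) through Friday (day 4).
slots = [(day * DAY + 7 * HOUR, day * DAY + 17 * HOUR) for day in range(5)]
# slots[0] == (25200, 61200), matching the first default slot above
```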
pm4py.stats.get_case_duration(log: Union[EventLog, DataFrame], case_id: str, business_hours: bool = False, business_hour_slots=[(25200, 61200), (111600, 147600), (198000, 234000), (284400, 320400), (370800, 406800)], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: Optional[str] = None) float[source]#

Gets the duration of a specific case

Parameters:
  • log – Event log

  • case_id (str) – Case identifier

  • business_hours (bool) – Enables/disables the computation based on the business hours (default: False)

  • business_hour_slots – work schedule of the company, provided as a list of tuples where each tuple represents one time slot of business hours. One slot, i.e. one tuple, consists of a start and an end time given in seconds since the start of the week, e.g. [(7 * 60 * 60, 17 * 60 * 60), ((24 + 7) * 60 * 60, (24 + 12) * 60 * 60), ((24 + 13) * 60 * 60, (24 + 17) * 60 * 60)], meaning that business hours are Mondays 07:00 - 17:00 and Tuesdays 07:00 - 12:00 and 13:00 - 17:00

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key – attribute to be used as case identifier

Return type:

float

import pm4py

duration = pm4py.get_case_duration(dataframe, 'case 1', activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.stats.get_activity_position_summary(log: Union[EventLog, DataFrame], activity: str, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[int, int][source]#

Given an event log, returns a dictionary that summarizes the positions of the activities in the different cases of the event log. E.g., if an activity happens 1000 times in position 1 (the second event of a case) and 500 times in position 2 (the third event of a case), then the returned dictionary is: {1: 1000, 2: 500}

Parameters:
  • log – Event log object / Pandas dataframe

  • activity (str) – Activity to consider

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[int, int]

import pm4py

act_pos = pm4py.get_activity_position_summary(dataframe, 'Act. A', activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
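
The summary described above can be sketched in plain Python: count, over all cases, at which zero-based positions the activity occurs (illustrative only, not the pm4py implementation):

```python
from collections import Counter

def activity_position_summary(traces, activity):
    # Zero-based positions at which the activity occurs, with occurrence counts.
    positions = Counter()
    for trace in traces:
        for i, act in enumerate(trace):
            if act == activity:
                positions[i] += 1
    return dict(positions)

summary = activity_position_summary([["a", "b"], ["c", "b"], ["b"]], "b")
# 'b' occurs twice at position 1 and once at position 0
```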

pm4py.utils module#

pm4py.utils.format_dataframe(df: DataFrame, case_id: str = 'case:concept:name', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', start_timestamp_key: str = 'start_timestamp', timest_format: Optional[str] = None) DataFrame[source]#

Gives the appropriate format to the dataframe, for process mining purposes

Parameters:
  • df (DataFrame) – Dataframe

  • case_id (str) – Case identifier column

  • activity_key (str) – Activity column

  • timestamp_key (str) – Timestamp column

  • start_timestamp_key (str) – Start timestamp column

  • timest_format – Timestamp format that is provided to Pandas

Return type:

pd.DataFrame

import pandas as pd
import pm4py

dataframe = pd.read_csv('event_log.csv')
dataframe = pm4py.format_dataframe(dataframe, case_id='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp', start_timestamp_key='start_timestamp', timest_format='%Y-%m-%d %H:%M:%S')

Deprecated since version 2.3.0: This will be removed in 3.0.0. the format_dataframe function does not need application anymore.

pm4py.utils.rebase(log_obj: Union[EventLog, EventStream, DataFrame], case_id: str = 'case:concept:name', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', start_timestamp_key: str = 'start_timestamp') Union[EventLog, EventStream, DataFrame][source]#

Re-base the log object, changing the case ID, activity and timestamp attributes.

Parameters:
  • log_obj – Log object

  • case_id (str) – Case identifier

  • activity_key (str) – Activity

  • timestamp_key (str) – Timestamp

  • start_timestamp_key (str) – Start timestamp

Return type:

Union[EventLog, EventStream, pd.DataFrame]

import pm4py

rebased_dataframe = pm4py.rebase(dataframe, case_id='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
pm4py.utils.parse_process_tree(tree_string: str) ProcessTree[source]#

Parse a process tree from a string

Parameters:

tree_string (str) – String representing a process tree (e.g. "-> ( 'A', O ( 'B', 'C' ), 'D' )"). Operators are '->': sequence, '+': parallel, 'X': xor choice, '*': binary loop, 'O': or choice

Return type:

ProcessTree

import pm4py

process_tree = pm4py.parse_process_tree("-> ( 'A', O ( 'B', 'C' ), 'D' )")
pm4py.utils.serialize(*args) Tuple[str, bytes][source]#

Serialize a PM4Py object into a bytes string

Parameters:

args – A PM4Py object, among: an EventLog object; a Pandas dataframe object; a (PetriNet, Marking, Marking) tuple; a ProcessTree object; a BPMN object; a DFG, including the dictionary of the directly-follows relations, the start activities and the end activities

Return type:

Tuple[str, bytes]

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe)
serialization = pm4py.serialize(net, im, fm)
pm4py.utils.deserialize(ser_obj: Tuple[str, bytes]) Any[source]#

Deserialize a bytes string to a PM4Py object

Parameters:

ser_obj – Serialized object (a tuple consisting of a string denoting the type of the object, and a bytes string representing the serialization)

Return type:

Any

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe)
serialization = pm4py.serialize(net, im, fm)
net, im, fm = pm4py.deserialize(serialization)
pm4py.utils.get_properties(log, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', resource_key: str = 'org:resource', group_key: Optional[str] = None, **kwargs)[source]#

Gets the properties from a log object

Parameters:
  • log – Log object

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • resource_key (str) – (if provided) attribute to be used as resource

  • group_key – (if provided) attribute to be used as group identifier

Return type:

Dict

pm4py.utils.set_classifier(log, classifier, classifier_attribute='@@classifier')[source]#

Sets the specified classifier on an existing event log

Parameters:
  • log – Log object

  • classifier – Classifier that should be set: a list of event attributes, a single event attribute, or a classifier stored among the "classifiers" of the log object

  • classifier_attribute (str) – The attribute of the event that should store the concatenation of the attribute values for the given classifier

Return type:

Union[EventLog, pd.DataFrame]

Deprecated since version 2.3.0: This will be removed in 3.0.0. This method will be removed in a future release. Please use the method-specific arguments.

pm4py.utils.parse_event_log_string(traces: Collection[str], sep: str = ',', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') DataFrame[source]#

Parse a collection of traces expressed as strings (e.g., ["A,B,C,D", "A,C,B,D", "A,D"]) to a log object (Pandas dataframe)

Parameters:
  • traces – Collection of traces expressed as strings

  • sep (str) – Separator used to split the activities of a string trace

  • activity_key (str) – The attribute that should be used as activity

  • timestamp_key (str) – The attribute that should be used as timestamp

  • case_id_key (str) – The attribute that should be used as case identifier

Return type:

pd.DataFrame

import pm4py

dataframe = pm4py.parse_event_log_string(["A,B,C,D", "A,C,B,D", "A,D"])
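
The parsing described above can be sketched in plain Python: split each string trace on the separator and emit one (case id, activity) row per event. Using the list index as case id is an assumption of this sketch, not necessarily the pm4py behavior:

```python
def parse_traces(traces, sep=","):
    # Turn string traces into (case id, activity) rows, one per event.
    # The case id is the index of the trace in the collection (an assumption).
    rows = []
    for case_index, trace in enumerate(traces):
        for activity in trace.split(sep):
            rows.append((str(case_index), activity))
    return rows

rows = parse_traces(["A,B", "A,D"])
```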
pm4py.utils.project_on_event_attribute(log: Union[EventLog, DataFrame], attribute_key='concept:name') List[List[str]][source]#

Project the event log on a specified event attribute. The result is a list containing one list per case: each case is transformed into the list of values of the specified attribute.

Example:

pm4py.project_on_event_attribute(log, "concept:name")

[['register request', 'examine casually', 'check ticket', 'decide', 'reinitiate request', 'examine thoroughly', 'check ticket', 'decide', 'pay compensation'], ['register request', 'check ticket', 'examine casually', 'decide', 'pay compensation'], ['register request', 'examine thoroughly', 'check ticket', 'decide', 'reject request'], ['register request', 'examine casually', 'check ticket', 'decide', 'pay compensation'], ['register request', 'examine casually', 'check ticket', 'decide', 'reinitiate request', 'check ticket', 'examine casually', 'decide', 'reinitiate request', 'examine casually', 'check ticket', 'decide', 'reject request'], ['register request', 'check ticket', 'examine thoroughly', 'decide', 'reject request']]

Parameters:
  • log – Event log / Pandas dataframe

  • attribute_key (str) – The attribute to be used

Return type:

List[List[str]]

import pm4py

list_list_activities = pm4py.project_on_event_attribute(dataframe, 'concept:name')
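
The projection can be sketched in plain Python: group events by case (preserving order) and keep only the chosen attribute per event (illustrative only, not the pm4py implementation; the event dictionaries and keys are made up):

```python
def project_on_attribute(events, attribute, case_key="case"):
    # Group events by case (in order of appearance) and keep only the
    # given attribute per event.
    cases = {}
    for event in events:
        cases.setdefault(event[case_key], []).append(event[attribute])
    return list(cases.values())

events = [
    {"case": "c1", "activity": "register"},
    {"case": "c1", "activity": "decide"},
    {"case": "c2", "activity": "register"},
]
projected = project_on_attribute(events, "activity")
```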
pm4py.utils.sample_cases(log: Union[EventLog, DataFrame], num_cases: int, case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Randomly samples a given number of cases from the event log.

Parameters:
  • log – Event log / Pandas dataframe

  • num_cases (int) – Number of cases to sample

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

sampled_dataframe = pm4py.sample_cases(dataframe, 10, case_id_key='case:concept:name')
pm4py.utils.sample_events(log: Union[EventStream, OCEL], num_events: int) Union[EventStream, OCEL, DataFrame][source]#

Randomly samples a given number of events from the event log.

Parameters:
  • log – Event stream / OCEL / Pandas dataframe

  • num_events (int) – Number of events to sample

Return type:

Union[EventStream, OCEL, pd.DataFrame]

import pm4py

sampled_dataframe = pm4py.sample_events(dataframe, 100)

pm4py.vis module#

The pm4py.vis module contains the visualizations offered in pm4py

pm4py.vis.view_petri_net(petri_net: PetriNet, initial_marking: Optional[Marking] = None, final_marking: Optional[Marking] = None, format: str = 'png', bgcolor: str = 'white', decorations: Optional[Dict[Any, Any]] = None)[source]#

Views a (composite) Petri net

Parameters:
  • petri_net (PetriNet) – Petri net

  • initial_marking – Initial marking

  • final_marking – Final marking

  • format (str) – Format of the output picture (default: png)

  • bgcolor (str) – Background color of the visualization (default: white)

  • decorations – Decorations (color, label) associated to the elements of the Petri net

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.view_petri_net(net, im, fm, format='svg')
pm4py.vis.save_vis_petri_net(petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, file_path: str, bgcolor: str = 'white', decorations: Optional[Dict[Any, Any]] = None)[source]#

Saves a Petri net visualization to a file

Parameters:
  • petri_net (PetriNet) – Petri net

  • initial_marking (Marking) – Initial marking

  • final_marking (Marking) – Final marking

  • file_path (str) – Destination path

  • bgcolor (str) – Background color of the visualization (default: white)

  • decorations – Decorations (color, label) associated to the elements of the Petri net

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.save_vis_petri_net(net, im, fm, 'petri_net.png')
pm4py.vis.view_performance_dfg(dfg: dict, start_activities: dict, end_activities: dict, format: str = 'png', aggregation_measure='mean', bgcolor: str = 'white')[source]#

Views a performance DFG

Parameters:
  • dfg (dict) – DFG object

  • start_activities (dict) – Start activities

  • end_activities (dict) – End activities

  • format (str) – Format of the output picture (default: png)

  • aggregation_measure (str) – Aggregation measure (default: mean): mean, median, min, max, sum, stdev

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

performance_dfg, start_activities, end_activities = pm4py.discover_performance_dfg(dataframe, case_id_key='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
pm4py.view_performance_dfg(performance_dfg, start_activities, end_activities, format='svg')
pm4py.vis.save_vis_performance_dfg(dfg: dict, start_activities: dict, end_activities: dict, file_path: str, aggregation_measure='mean', bgcolor: str = 'white')[source]#

Saves the visualization of a performance DFG

Parameters:
  • dfg (dict) – DFG object

  • start_activities (dict) – Start activities

  • end_activities (dict) – End activities

  • file_path (str) – Destination path

  • aggregation_measure (str) – Aggregation measure (default: mean): mean, median, min, max, sum, stdev

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

performance_dfg, start_activities, end_activities = pm4py.discover_performance_dfg(dataframe, case_id_key='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
pm4py.save_vis_performance_dfg(performance_dfg, start_activities, end_activities, 'perf_dfg.png')
pm4py.vis.view_dfg(dfg: dict, start_activities: dict, end_activities: dict, format: str = 'png', bgcolor: str = 'white')[source]#

Views a (composite) DFG

Parameters:
  • dfg (dict) – DFG object

  • start_activities (dict) – Start activities

  • end_activities (dict) – End activities

  • format (str) – Format of the output picture (default: png)

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

dfg, start_activities, end_activities = pm4py.discover_dfg(dataframe, case_id_key='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
pm4py.view_dfg(dfg, start_activities, end_activities, format='svg')
pm4py.vis.save_vis_dfg(dfg: dict, start_activities: dict, end_activities: dict, file_path: str, bgcolor: str = 'white')[source]#

Saves a DFG visualization to a file

Parameters:
  • dfg (dict) – DFG object

  • start_activities (dict) – Start activities

  • end_activities (dict) – End activities

  • file_path (str) – Destination path

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

dfg, start_activities, end_activities = pm4py.discover_dfg(dataframe, case_id_key='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
pm4py.save_vis_dfg(dfg, start_activities, end_activities, 'dfg.png')
pm4py.vis.view_process_tree(tree: ProcessTree, format: str = 'png', bgcolor: str = 'white')[source]#

Views a process tree

Parameters:
  • tree (ProcessTree) – Process tree

  • format (str) – Format of the visualization (default: png)

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

process_tree = pm4py.discover_process_tree_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.view_process_tree(process_tree, format='svg')
pm4py.vis.save_vis_process_tree(tree: ProcessTree, file_path: str, bgcolor: str = 'white')[source]#

Saves the visualization of a process tree

Parameters:
  • tree (ProcessTree) – Process tree

  • file_path (str) – Destination path

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

process_tree = pm4py.discover_process_tree_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.save_vis_process_tree(process_tree, 'process_tree.png')
pm4py.vis.save_vis_bpmn(bpmn_graph: BPMN, file_path: str, bgcolor: str = 'white')[source]#

Saves the visualization of a BPMN graph

Parameters:
  • bpmn_graph (BPMN) – BPMN graph

  • file_path (str) – Destination path

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

bpmn_graph = pm4py.discover_bpmn_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.save_vis_bpmn(bpmn_graph, 'bpmn_graph.png')
pm4py.vis.view_bpmn(bpmn_graph: BPMN, format: str = 'png', bgcolor: str = 'white')[source]#

Views a BPMN graph

Parameters:
  • bpmn_graph (BPMN) – BPMN graph

  • format (str) – Format of the visualization (default: png)

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

bpmn_graph = pm4py.discover_bpmn_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.view_bpmn(bpmn_graph)
pm4py.vis.view_heuristics_net(heu_net: HeuristicsNet, format: str = 'png', bgcolor: str = 'white')[source]#

Views a heuristics net

Parameters:
  • heu_net (HeuristicsNet) – Heuristics net

  • format (str) – Format of the visualization (default: png)

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

heu_net = pm4py.discover_heuristics_net(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.view_heuristics_net(heu_net, format='svg')
pm4py.vis.save_vis_heuristics_net(heu_net: HeuristicsNet, file_path: str, bgcolor: str = 'white')[source]#

Saves the visualization of a heuristics net

Parameters:
  • heu_net (HeuristicsNet) – Heuristics net

  • file_path (str) – Destination path

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

heu_net = pm4py.discover_heuristics_net(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.save_vis_heuristics_net(heu_net, 'heu.png')
pm4py.vis.view_dotted_chart(log: Union[EventLog, DataFrame], format: str = 'png', attributes=None, bgcolor: str = 'white')[source]#

Displays the dotted chart

The dotted chart is a classic visualization of the events inside an event log across different dimensions. Each event of the event log corresponds to a point. The dimensions are projected on a graph having:
  • X-axis: the values of the first dimension.

  • Y-axis: the values of the second dimension.

  • Color: the values of the third dimension, represented as different colors for the points of the dotted chart.

The values can be string, numeric or date values, and are managed accordingly by the dotted chart. The dotted chart can be built on different attributes. A convenient choice is to visualize the distribution of cases and events over time, with the following choices:
  • X-axis: the timestamp of the event.

  • Y-axis: the index of the case inside the event log.

  • Color: the activity of the event.

This choice permits the visual identification of patterns such as:
  • Batches.

  • Variations in the case arrival rate.

  • Variations in the case finishing rate.

Parameters:
  • log – Event log

  • format (str) – Image format

  • attributes – Attributes that should be used to construct the dotted chart. If None, the default dotted chart is shown (X-axis: the timestamp of the event; Y-axis: the index of the case inside the event log; color: the activity of the event). For custom attributes, provide a list of the form [x-axis attribute, y-axis attribute, color attribute], e.g., ["concept:name", "org:resource", "concept:name"]

import pm4py

pm4py.view_dotted_chart(dataframe, format='svg')
pm4py.view_dotted_chart(dataframe, attributes=['time:timestamp', 'concept:name', 'org:resource'])

Deprecated since version 2.3.0: This will be removed in 3.0.0. the dotted chart visualization will be removed in a future release.

pm4py.vis.save_vis_dotted_chart(log: Union[EventLog, DataFrame], file_path: str, attributes=None, bgcolor: str = 'white')[source]#

Saves the visualization of the dotted chart

The dotted chart is a classic visualization of the events inside an event log across different dimensions. Each event of the event log corresponds to a point. The dimensions are projected on a graph having:
  • X-axis: the values of the first dimension.

  • Y-axis: the values of the second dimension.

  • Color: the values of the third dimension, represented as different colors for the points of the dotted chart.

The values can be string, numeric or date values, and are managed accordingly by the dotted chart. The dotted chart can be built on different attributes. A convenient choice is to visualize the distribution of cases and events over time, with the following choices:
  • X-axis: the timestamp of the event.

  • Y-axis: the index of the case inside the event log.

  • Color: the activity of the event.

This choice permits the visual identification of patterns such as:
  • Batches.

  • Variations in the case arrival rate.

  • Variations in the case finishing rate.

Parameters:
  • log – Event log

  • file_path (str) – Destination path

  • attributes – Attributes that should be used to construct the dotted chart (for example, [“concept:name”, “org:resource”])

import pm4py

pm4py.save_vis_dotted_chart(dataframe, 'dotted.png', attributes=['time:timestamp', 'concept:name', 'org:resource'])

Deprecated since version 2.3.0: This will be removed in 3.0.0. the dotted chart visualization will be removed in a future release.

pm4py.vis.view_sna(sna_metric: SNA)[source]#

Visualizes an SNA metric (rendered as a .html file)

Parameters:

sna_metric (SNA) – Values of the metric

import pm4py

metric = pm4py.discover_subcontracting_network(dataframe, resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.view_sna(metric)
pm4py.vis.save_vis_sna(sna_metric: SNA, file_path: str)[source]#

Saves the visualization of an SNA metric in a .html file

Parameters:
  • sna_metric (SNA) – Values of the metric

  • file_path (str) – Destination path

import pm4py

metric = pm4py.discover_subcontracting_network(dataframe, resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.save_vis_sna(metric, 'sna.html')
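
To make the kind of relation an SNA metric aggregates concrete, here is a minimal pandas sketch counting handovers of work, i.e. pairs of resources on consecutive events of the same case. The mini log is hypothetical and this is not the pm4py implementation (the subcontracting metric used in the example above counts a related but different pattern):

```python
import pandas as pd

# Hypothetical mini event log with XES-style column names.
events = pd.DataFrame({
    "case:concept:name": ["c1", "c1", "c1", "c2", "c2"],
    "org:resource": ["r1", "r2", "r1", "r1", "r2"],
    "time:timestamp": pd.to_datetime(
        ["2024-01-01 08:00", "2024-01-01 09:00", "2024-01-01 10:00",
         "2024-01-02 08:00", "2024-01-02 09:00"]),
})

events = events.sort_values(["case:concept:name", "time:timestamp"])
# Resource of the previous event within the same case.
prev_res = events.groupby("case:concept:name")["org:resource"].shift()
# Count directed (from, to) resource pairs across consecutive events.
handovers = (
    pd.DataFrame({"from": prev_res, "to": events["org:resource"]})
    .dropna()
    .value_counts()
)
```

The resulting counts are the edge weights of the social network that the .html visualization draws.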
pm4py.vis.view_case_duration_graph(log: Union[EventLog, DataFrame], format: str = 'png', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')[source]#

Visualizes the case duration graph

Parameters:
  • log – Log object

  • format (str) – Format of the visualization (png, svg, …)

  • activity_key (str) – attribute to be used as activity

  • case_id_key (str) – attribute to be used as case identifier

  • timestamp_key (str) – attribute to be used as timestamp

import pm4py

pm4py.view_case_duration_graph(dataframe, format='svg', activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.vis.save_vis_case_duration_graph(log: Union[EventLog, DataFrame], file_path: str, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')[source]#

Saves the case duration graph in the specified path

Parameters:
  • log – Log object

  • file_path (str) – Destination path

  • activity_key (str) – attribute to be used as activity

  • case_id_key (str) – attribute to be used as case identifier

  • timestamp_key (str) – attribute to be used as timestamp

import pm4py

pm4py.save_vis_case_duration_graph(dataframe, 'duration.png', activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
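
The quantity plotted by the case duration graph is the time from the first to the last event of each case. A minimal pandas sketch of that computation (hypothetical mini log, not the pm4py implementation):

```python
import pandas as pd

# Hypothetical mini event log with XES-style column names.
events = pd.DataFrame({
    "case:concept:name": ["c1", "c1", "c2", "c2"],
    "time:timestamp": pd.to_datetime(
        ["2024-01-01 08:00", "2024-01-01 10:00",
         "2024-01-02 08:00", "2024-01-02 09:30"]),
})

# Duration of each case: last event timestamp minus first event timestamp.
grouped = events.groupby("case:concept:name")["time:timestamp"]
durations = (grouped.max() - grouped.min()).dt.total_seconds()
```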
pm4py.vis.view_events_per_time_graph(log: Union[EventLog, DataFrame], format: str = 'png', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')[source]#

Visualizes the events per time graph

Parameters:
  • log – Log object

  • format (str) – Format of the visualization (png, svg, …)

  • activity_key (str) – attribute to be used as activity

  • case_id_key (str) – attribute to be used as case identifier

  • timestamp_key (str) – attribute to be used as timestamp

import pm4py

pm4py.view_events_per_time_graph(dataframe, format='svg', activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.vis.save_vis_events_per_time_graph(log: Union[EventLog, DataFrame], file_path: str, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')[source]#

Saves the events per time graph in the specified path

Parameters:
  • log – Log object

  • file_path (str) – Destination path

  • activity_key (str) – attribute to be used as activity

  • case_id_key (str) – attribute to be used as case identifier

  • timestamp_key (str) – attribute to be used as timestamp

import pm4py

pm4py.save_vis_events_per_time_graph(dataframe, 'ev_time.png', activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
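
The events per time graph shows how the events are spread along the timeline of the log. A minimal pandas sketch of the underlying counts, here bucketed per day (hypothetical timestamps, not the pm4py implementation):

```python
import pandas as pd

# Hypothetical event timestamps.
ts = pd.Series(pd.to_datetime(
    ["2024-01-01 08:10", "2024-01-01 08:45", "2024-01-02 09:30"]))

# Number of events per calendar day; the graph plots the density of
# events over a timeline built from such counts.
per_day = ts.dt.normalize().value_counts().sort_index()
```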
pm4py.vis.view_performance_spectrum(log: Union[EventLog, DataFrame], activities: List[str], format: str = 'png', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', bgcolor: str = 'white')[source]#

Displays the performance spectrum

The performance spectrum is a visualization of the performance of the process, based on the time elapsed between different activities in the process executions. The performance spectrum was initially described in:

Denisov, Vadim, et al. “The Performance Spectrum Miner: Visual Analytics for Fine-Grained Performance Analysis of Processes.” BPM (Dissertation/Demos/Industry). 2018.

Parameters:
  • log – Event log

  • activities – List of activities (in order) that is used to build the performance spectrum

  • format (str) – Format of the visualization (png, svg, …)

  • activity_key (str) – attribute to be used as activity

  • timestamp_key (str) – attribute to be used as timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

pm4py.view_performance_spectrum(dataframe, ['Act. A', 'Act. C', 'Act. D'], format='svg', activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')

Deprecated since version 2.3.0: This will be removed in 3.0.0. the performance spectrum visualization will be removed in a future release.
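
The quantity the performance spectrum draws, i.e. the time elapsed between the chosen activities within each case, can be sketched with plain pandas. The mini log below is hypothetical (one occurrence of each activity per case is assumed); this is not the pm4py implementation:

```python
import pandas as pd

# Hypothetical mini event log with XES-style column names.
events = pd.DataFrame({
    "case:concept:name": ["c1", "c1", "c2", "c2"],
    "concept:name": ["Act. A", "Act. C", "Act. A", "Act. C"],
    "time:timestamp": pd.to_datetime(
        ["2024-01-01 08:00", "2024-01-01 09:00",
         "2024-01-02 08:00", "2024-01-02 11:00"]),
})

# One timestamp column per activity, one row per case (assumes each
# activity occurs once per case).
pivot = events.pivot(index="case:concept:name",
                     columns="concept:name", values="time:timestamp")
# Elapsed seconds between the two chosen activities, per case; the
# spectrum draws one line segment per case from such values.
elapsed = (pivot["Act. C"] - pivot["Act. A"]).dt.total_seconds()
```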

pm4py.vis.save_vis_performance_spectrum(log: Union[EventLog, DataFrame], activities: List[str], file_path: str, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', bgcolor: str = 'white')[source]#

Saves the visualization of the performance spectrum to a file

The performance spectrum is a visualization of the performance of the process, based on the time elapsed between different activities in the process executions. The performance spectrum was initially described in:

Denisov, Vadim, et al. “The Performance Spectrum Miner: Visual Analytics for Fine-Grained Performance Analysis of Processes.” BPM (Dissertation/Demos/Industry). 2018.

Parameters:
  • log – Event log

  • activities – List of activities (in order) that is used to build the performance spectrum

  • file_path (str) – Destination path (including the extension)

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

pm4py.save_vis_performance_spectrum(dataframe, ['Act. A', 'Act. C', 'Act. D'], 'perf_spec.png', activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')

Deprecated since version 2.3.0: This will be removed in 3.0.0. the performance spectrum visualization will be removed in a future release.

pm4py.vis.view_events_distribution_graph(log: Union[EventLog, DataFrame], distr_type: str = 'days_week', format='png', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')[source]#

Shows the distribution of the events in the specified dimension

Observing the distribution of events over time makes it possible to infer useful information about the work shifts, the working days, and the periods of the year that are more or less busy.

Parameters:
  • log – Event log

  • distr_type (str) – Type of distribution (default: days_week):
    - days_month => Gets the distribution of the events among the days of a month (from 1 to 31)
    - months => Gets the distribution of the events among the months (from 1 to 12)
    - years => Gets the distribution of the events among the years of the event log
    - hours => Gets the distribution of the events among the hours of a day (from 0 to 23)
    - days_week => Gets the distribution of the events among the days of a week (from Monday to Sunday)
    - weeks => Gets the distribution of the events among the weeks of a year (from 0 to 52)

  • format (str) – Format of the visualization (default: png)

  • activity_key (str) – attribute to be used as activity

  • case_id_key (str) – attribute to be used as case identifier

  • timestamp_key (str) – attribute to be used as timestamp

import pm4py

pm4py.view_events_distribution_graph(dataframe, format='svg', distr_type='days_week', activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
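
What the distr_type options compute can be illustrated with plain pandas datetime accessors. The timestamps below are hypothetical and this is not the pm4py implementation:

```python
import pandas as pd

# Hypothetical event timestamps.
ts = pd.Series(pd.to_datetime([
    "2024-01-01 08:00",   # a Monday
    "2024-01-02 23:00",   # a Tuesday
    "2024-01-08 08:00",   # the following Monday
]))

# days_week: distribution among the days of a week.
by_day = ts.dt.day_name().value_counts()
# hours: distribution among the hours of a day (0 to 23).
by_hour = ts.dt.hour.value_counts()
```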
pm4py.vis.save_vis_events_distribution_graph(log: Union[EventLog, DataFrame], file_path: str, distr_type: str = 'days_week', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')[source]#

Saves the distribution of the events in a picture file

Observing the distribution of events over time makes it possible to infer useful information about the work shifts, the working days, and the periods of the year that are more or less busy.

Parameters:
  • log – Event log

  • file_path (str) – Destination path (including the extension)

  • distr_type (str) – Type of distribution (default: days_week):
    - days_month => Gets the distribution of the events among the days of a month (from 1 to 31)
    - months => Gets the distribution of the events among the months (from 1 to 12)
    - years => Gets the distribution of the events among the years of the event log
    - hours => Gets the distribution of the events among the hours of a day (from 0 to 23)
    - days_week => Gets the distribution of the events among the days of a week (from Monday to Sunday)

  • activity_key (str) – attribute to be used as activity

  • case_id_key (str) – attribute to be used as case identifier

  • timestamp_key (str) – attribute to be used as timestamp

import pm4py

pm4py.save_vis_events_distribution_graph(dataframe, 'ev_distr_graph.png', distr_type='days_week', activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.vis.view_ocdfg(ocdfg: Dict[str, Any], annotation: str = 'frequency', act_metric: str = 'events', edge_metric='event_couples', act_threshold: int = 0, edge_threshold: int = 0, performance_aggregation: str = 'mean', format: str = 'png', bgcolor: str = 'white')[source]#

Views an OC-DFG (object-centric directly-follows graph) with the provided configuration.

Object-centric directly-follows multigraphs are a composition of directly-follows graphs, one per object type, which can be annotated with different metrics considering the entities of an object-centric event log (i.e., events, unique objects, total objects).

Parameters:
  • ocdfg – Object-centric directly-follows graph

  • annotation (str) – The annotation to use for the visualization. Values: - “frequency”: frequency annotation - “performance”: performance annotation

  • act_metric (str) – The metric to use for the activities. Available values: - “events” => number of events (default) - “unique_objects” => number of unique objects - “total_objects” => number of total objects

  • edge_metric (str) – The metric to use for the edges. Available values: - “event_couples” => number of event couples (default) - “unique_objects” => number of unique objects - “total_objects” => number of total objects

  • act_threshold (int) – The threshold to apply on the activities frequency (default: 0). Only activities having a frequency >= this are kept in the graph.

  • edge_threshold (int) – The threshold to apply on the edges frequency (default: 0). Only edges having a frequency >= this are kept in the graph.

  • performance_aggregation (str) – The aggregation measure to use for the performance: mean, median, min, max, sum

  • format (str) – The format of the output visualization (default: “png”)

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

ocdfg = pm4py.discover_ocdfg(ocel)
pm4py.view_ocdfg(ocdfg, annotation='frequency', format='svg')
pm4py.vis.save_vis_ocdfg(ocdfg: Dict[str, Any], file_path: str, annotation: str = 'frequency', act_metric: str = 'events', edge_metric='event_couples', act_threshold: int = 0, edge_threshold: int = 0, performance_aggregation: str = 'mean', bgcolor: str = 'white')[source]#

Saves the visualization of an OC-DFG (object-centric directly-follows graph) with the provided configuration.

Object-centric directly-follows multigraphs are a composition of directly-follows graphs, one per object type, which can be annotated with different metrics considering the entities of an object-centric event log (i.e., events, unique objects, total objects).

Parameters:
  • ocdfg – Object-centric directly-follows graph

  • file_path (str) – Destination path (including the extension)

  • annotation (str) – The annotation to use for the visualization. Values: - “frequency”: frequency annotation - “performance”: performance annotation

  • act_metric (str) – The metric to use for the activities. Available values: - “events” => number of events (default) - “unique_objects” => number of unique objects - “total_objects” => number of total objects

  • edge_metric (str) – The metric to use for the edges. Available values: - “event_couples” => number of event couples (default) - “unique_objects” => number of unique objects - “total_objects” => number of total objects

  • act_threshold (int) – The threshold to apply on the activities frequency (default: 0). Only activities having a frequency >= this are kept in the graph.

  • edge_threshold (int) – The threshold to apply on the edges frequency (default: 0). Only edges having a frequency >= this are kept in the graph.

  • performance_aggregation (str) – The aggregation measure to use for the performance: mean, median, min, max, sum

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

ocdfg = pm4py.discover_ocdfg(ocel)
pm4py.save_vis_ocdfg(ocdfg, 'ocdfg.png', annotation='frequency')
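
The effect of act_threshold and edge_threshold is a simple frequency cut-off: only entries with a frequency greater than or equal to the threshold survive. A sketch with hypothetical edge counts (not the pm4py data structure):

```python
# Hypothetical directed-edge frequencies of a discovered graph.
edge_freq = {("A", "B"): 120, ("B", "C"): 35, ("B", "D"): 4}

# Keep only edges whose frequency is >= the threshold.
edge_threshold = 10
kept = {edge: n for edge, n in edge_freq.items() if n >= edge_threshold}
```

With a threshold of 10, the rare ("B", "D") edge is dropped while the two frequent edges remain.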
pm4py.vis.view_ocpn(ocpn: Dict[str, Any], format: str = 'png', bgcolor: str = 'white')[source]#

Visualizes the object-centric Petri net on the screen

Parameters:
  • ocpn – Object-centric Petri net

  • format (str) – Format of the visualization (default: png)

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

ocpn = pm4py.discover_oc_petri_net(ocel)
pm4py.view_ocpn(ocpn, format='svg')
pm4py.vis.save_vis_ocpn(ocpn: Dict[str, Any], file_path: str, bgcolor: str = 'white')[source]#

Saves the visualization of the object-centric Petri net into a file

Parameters:
  • ocpn – Object-centric Petri net

  • file_path (str) – Target path of the visualization

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

ocpn = pm4py.discover_oc_petri_net(ocel)
pm4py.save_vis_ocpn(ocpn, 'ocpn.png')
pm4py.vis.view_network_analysis(network_analysis: Dict[Tuple[str, str], Dict[str, Any]], variant: str = 'frequency', format: str = 'png', activity_threshold: int = 1, edge_threshold: int = 1, bgcolor: str = 'white')[source]#

Visualizes the network analysis

Parameters:
  • network_analysis – Network analysis

  • variant (str) – Variant of the visualization: - frequency (if the discovered network analysis contains the frequency of the interactions) - performance (if the discovered network analysis contains the performance of the interactions)

  • format (str) – Format of the visualization (default: png)

  • activity_threshold (int) – The minimum number of occurrences for an activity to be included (default: 1)

  • edge_threshold (int) – The minimum number of occurrences for an edge to be included (default: 1)

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

net_ana = pm4py.discover_network_analysis(dataframe, out_column='case:concept:name', in_column='case:concept:name', node_column_source='org:resource', node_column_target='org:resource', edge_column='concept:name')
pm4py.view_network_analysis(net_ana, format='svg')
pm4py.vis.save_vis_network_analysis(network_analysis: Dict[Tuple[str, str], Dict[str, Any]], file_path: str, variant: str = 'frequency', activity_threshold: int = 1, edge_threshold: int = 1, bgcolor: str = 'white')[source]#

Saves the visualization of the network analysis

Parameters:
  • network_analysis – Network analysis

  • file_path (str) – Target path of the visualization

  • variant (str) – Variant of the visualization: - frequency (if the discovered network analysis contains the frequency of the interactions) - performance (if the discovered network analysis contains the performance of the interactions)

  • activity_threshold (int) – The minimum number of occurrences for an activity to be included (default: 1)

  • edge_threshold (int) – The minimum number of occurrences for an edge to be included (default: 1)

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

net_ana = pm4py.discover_network_analysis(dataframe, out_column='case:concept:name', in_column='case:concept:name', node_column_source='org:resource', node_column_target='org:resource', edge_column='concept:name')
pm4py.save_vis_network_analysis(net_ana, 'net_ana.png')
pm4py.vis.view_transition_system(transition_system: TransitionSystem, format: str = 'png', bgcolor: str = 'white')[source]#

Views a transition system

Parameters:
  • transition_system (TransitionSystem) – Transition system

  • format (str) – Format of the visualization (png, svg, …)

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

transition_system = pm4py.discover_transition_system(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.view_transition_system(transition_system, format='svg')
pm4py.vis.save_vis_transition_system(transition_system: TransitionSystem, file_path: str, bgcolor: str = 'white')[source]#

Persists the visualization of a transition system

Parameters:
  • transition_system (TransitionSystem) – Transition system

  • file_path (str) – Destination path

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

transition_system = pm4py.discover_transition_system(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.save_vis_transition_system(transition_system, 'trans_system.png')
pm4py.vis.view_prefix_tree(trie: Trie, format: str = 'png', bgcolor: str = 'white')[source]#

Views a prefix tree

Parameters:
  • trie (Trie) – Prefix tree

  • format (str) – Format of the visualization (png, svg, …)

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

prefix_tree = pm4py.discover_prefix_tree(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.view_prefix_tree(prefix_tree, format='svg')
pm4py.vis.save_vis_prefix_tree(trie: Trie, file_path: str, bgcolor: str = 'white')[source]#

Persists the visualization of a prefix tree

Parameters:
  • trie (Trie) – Prefix tree

  • file_path (str) – Destination path

  • bgcolor (str) – Background color of the visualization (default: white)

import pm4py

prefix_tree = pm4py.discover_prefix_tree(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.save_vis_prefix_tree(prefix_tree, 'trie.png')
pm4py.vis.view_alignments(log: Union[EventLog, DataFrame], aligned_traces: List[Dict[str, Any]], format: str =