# pm4py package#

Process mining for Python

## pm4py.analysis module#

pm4py.analysis.construct_synchronous_product_net(trace: Trace, petri_net: PetriNet, initial_marking: Marking, final_marking: Marking) Tuple[PetriNet, Marking, Marking][source]#

constructs the synchronous product net between a trace and a Petri net process model.

Parameters:
Return type:

`Tuple[PetriNet, Marking, Marking]`

```import pm4py

sync_net, sync_im, sync_fm = pm4py.construct_synchronous_product_net(log[0], net, im, fm)
```

Deprecated since version 2.3.0: This will be removed in 3.0.0. this method will be removed in a future release.

pm4py.analysis.compute_emd(language1: Dict[List[str], float], language2: Dict[List[str], float]) float[source]#

Computes the earth mover distance between two stochastic languages (for example, the first extracted from the log, and the second extracted from the process model.

Parameters:
• language1 – (first) stochastic language

• language2 – (second) stochastic language

Return type:

`float`

```import pm4py

language_log = pm4py.get_stochastic_language(log)
print(language_log)
language_model = pm4py.get_stochastic_language(net, im, fm)
print(language_model)
emd_distance = pm4py.compute_emd(language_log, language_model)
print(emd_distance)
```
pm4py.analysis.solve_marking_equation(petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, cost_function: Optional[Dict[Transition, float]] = None) float[source]#

Solves the marking equation of a Petri net. The marking equation is solved as an ILP problem. An optional transition-based cost function to minimize can be provided as well.

Parameters:
Return type:

`float`

```import pm4py

heuristic = pm4py.solve_marking_equation(net, im, fm)
```
pm4py.analysis.solve_extended_marking_equation(trace: Trace, sync_net: PetriNet, sync_im: Marking, sync_fm: Marking, split_points: Optional[List[int]] = None) float[source]#

Gets an heuristics value (underestimation of the cost of an alignment) between a trace and a synchronous product net using the extended marking equation with the standard cost function (e.g. sync moves get cost equal to 0, invisible moves get cost equal to 1, other move on model / move on log get cost equal to 10000), with an optimal provisioning of the split points

Parameters:
Return type:

`float`

```import pm4py

ext_mark_eq_heu = pm4py.solve_extended_marking_equation(log[0], net, im, fm)
```

Deprecated since version 2.3.0: This will be removed in 3.0.0. this method will be removed in a future release.

pm4py.analysis.check_soundness(petri_net: PetriNet, initial_marking: Marking, final_marking: Marking) bool[source]#

Check if a given Petri net is a sound WF-net. A Petri net is a WF-net iff:

• it has a unique source place

• it has a unique end place

• every element in the WF-net is on a path from the source to the sink place

A WF-net is sound iff:
• it contains no live-locks

• we are able to always reach the final marking

For a formal definition of sound WF-net, consider: http://www.padsweb.rwth-aachen.de/wvdaalst/publications/p628.pdf

Parameters:
Return type:

`bool`

```import pm4py

is_sound = pm4py.check_soundness(net, im, fm)
```
pm4py.analysis.cluster_log(log: Union[EventLog, EventStream, DataFrame], sklearn_clusterer=None, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Generator[EventLog, None, None][source]#

Apply clustering to the provided event log (method based on the extraction of profiles for the traces of the event log) based on a Scikit-Learn clusterer (default: K-means with two clusters)

Parameters:
• log – log object

• sklearn_clusterer – the Scikit-Learn clusterer to be used (default: KMeans(n_clusters=2, random_state=0, n_init=”auto”))

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Generator[pd.DataFrame, None, None]`

```import pm4py

print(clust_log)
```
pm4py.analysis.insert_artificial_start_end(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Inserts the artificial start/end activities in an event log / Pandas dataframe

Parameters:
• log – event log / Pandas dataframe

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

dataframe = pm4py.insert_artificial_start_end(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.analysis.insert_case_service_waiting_time(log: Union[EventLog, DataFrame], service_time_column: str = '@@service_time', sojourn_time_column: str = '@@sojourn_time', waiting_time_column: str = '@@waiting_time', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', start_timestamp_key: str = 'time:timestamp') DataFrame[source]#

Inserts the service/waiting/sojourn times of the case in the dataframe.

Parameters:
• log – event log / Pandas dataframe

• service_time_column (`str`) – column to be used for the service time

• sojourn_time_column (`str`) – column to be used for the sojourn time

• waiting_time_column (`str`) – column to be used for the waiting time

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

• start_timestamp_key (`str`) – attribute to be used as start timestamp

Return type:

`pd.DataFrame`

```import pm4py

dataframe = pm4py.insert_case_service_waiting_time(dataframe, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name', start_timestamp_key='time:timestamp')
```
pm4py.analysis.insert_case_arrival_finish_rate(log: Union[EventLog, DataFrame], arrival_rate_column='@@arrival_rate', finish_rate_column='@@finish_rate', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', start_timestamp_key: str = 'time:timestamp') DataFrame[source]#

Inserts the arrival/finish rates of the case in the dataframe. The arrival rate is computed as the difference between the start time of the case and the start time of the previous case to start. The finish rate is computed as the difference between the end time of the case and the end time of the next case to end.

Parameters:
• log – event log / Pandas dataframe

• arrival_rate_column (`str`) – column to be used for the arrival rate

• finish_rate_column (`str`) – column to be used for the finish rate

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

• start_timestamp_key (`str`) – attribute to be used as start timestamp

Return type:

`pd.DataFrame`

```import pm4py

dataframe = pm4py.insert_case_arrival_finish_rate(dataframe, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name', start_timestamp_key='time:timestamp')
```
pm4py.analysis.check_is_workflow_net(net: PetriNet) bool[source]#

Checks if the input Petri net satisfies the WF-net conditions: 1. unique source place 2. unique sink place 3. every node is on a path from the source to the sink

Parameters:

net (`PetriNet`) – petri net

Return type:

`bool`

```import pm4py

is_wfnet = pm4py.check_is_workflow_net(net, im, fm)
```
pm4py.analysis.maximal_decomposition(net: PetriNet, im: Marking, fm: Marking) List[Tuple[PetriNet, Marking, Marking]][source]#

Calculate the maximal decomposition of an accepting Petri net.

Parameters:
Return type:

`List[Tuple[PetriNet, Marking, Marking]]`

```import pm4py

list_nets = pm4py.maximal_decomposition(net, im, fm)
for anet in list_nets:
subnet, subim, subfm = anet
pm4py.view_petri_net(subnet, subim, subfm, format='svg')
```
pm4py.analysis.generate_marking(net: PetriNet, place_or_dct_places: Union[str, Place, Dict[str, int], Dict[Place, int]]) [source]#

Generate a marking for a given Petri net

Parameters:
• net (`PetriNet`) – petri net

• place_or_dct_places – place, or dictionary of places, to be used in the marking. Possible values: single Place object for the marking; name of the place for the marking; dictionary associating to each place its number of tokens; dictionary associating to names of places a number of tokens.

Return type:

`Marking`

```import pm4py

marking = pm4py.generate_marking(net, {'source': 2})
```
pm4py.analysis.reduce_petri_net_invisibles(net: PetriNet) [source]#

Reduce the number of invisibles transitions in the provided Petri net.

Parameters:

net (`PetriNet`) – petri net

Return type:

`PetriNet`

```import pm4py

net = pm4py.reduce_petri_net_invisibles(net)
```
pm4py.analysis.reduce_petri_net_implicit_places(net: PetriNet, im: Marking, fm: Marking) Tuple[PetriNet, Marking, Marking][source]#

Reduce the number of invisibles transitions in the provided Petri net.

Parameters:
Return type:

`Tuple[PetriNet, Marking, Marking]`

```import pm4py

net = pm4py.reduce_petri_net_implicit_places(net, im, fm)
```

## pm4py.cli module#

PM4Py is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

PM4Py is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with PM4Py. If not, see <https://www.gnu.org/licenses/>.

pm4py.cli.cli_interface()[source]#

## pm4py.conformance module#

The `pm4py.conformance` module contains the conformance checking algorithms implemented in `pm4py`

pm4py.conformance.conformance_diagnostics_token_based_replay(log: Union[EventLog, DataFrame], petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', return_diagnostics_dataframe: bool = False) List[Dict[str, Any]][source]#

Apply token-based replay for conformance checking analysis. The methods return the full token-based-replay diagnostics.

Token-based replay matches a trace and a Petri net model, starting from the initial place, in order to discover which transitions are executed and in which places we have remaining or missing tokens for the given process instance. Token-based replay is useful for Conformance Checking: indeed, a trace is fitting according to the model if, during its execution, the transitions can be fired without the need to insert any missing token. If the reaching of the final marking is imposed, then a trace is fitting if it reaches the final marking without any missing or remaining tokens.

In PM4Py there is an implementation of a token replayer that is able to go across hidden transitions (calculating shortest paths between places) and can be used with any Petri net model with unique visible transitions and hidden transitions. When a visible transition needs to be fired and not all places in the preset are provided with the correct number of tokens, starting from the current marking it is checked if for some place there is a sequence of hidden transitions that could be fired in order to enable the visible transition. The hidden transitions are then fired and a marking that permits to enable the visible transition is reached. The approach is described in: Berti, Alessandro, and Wil MP van der Aalst. “Reviving Token-based Replay: Increasing Speed While Improving Diagnostics.” ATAED@ Petri Nets/ACSD. 2019.

The output of the token-based replay, stored in the variable replayed_traces, contains for each trace of the log:

• trace_is_fit: boolean value (True/False) that is true when the trace is according to the model.

• activated_transitions: list of transitions activated in the model by the token-based replay.

• reached_marking: marking reached at the end of the replay.

• missing_tokens: number of missing tokens.

• consumed_tokens: number of consumed tokens.

• remaining_tokens: number of remaining tokens.

• produced_tokens: number of produced tokens.

Parameters:
• log – event log

• petri_net (`PetriNet`) – petri net

• initial_marking (`Marking`) – initial marking

• final_marking (`Marking`) – final marking

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

• return_diagnostics_dataframe (`bool`) – if possible, returns a dataframe with the diagnostics (instead of the usual output)

Return type:

`List[Dict[str, Any]]`

```import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
tbr_diagnostics = pm4py.conformance_diagnostics_token_based_replay(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.conformance.conformance_diagnostics_alignments(log: Union[EventLog, DataFrame], *args, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', variant_str: Optional[str] = None, return_diagnostics_dataframe: bool = False) List[Dict[str, Any]][source]#

Apply the alignments algorithm between a log and a process model. The methods return the full alignment diagnostics.

Alignment-based replay aims to find one of the best alignment between the trace and the model. For each trace, the output of an alignment is a list of couples where the first element is an event (of the trace) or » and the second element is a transition (of the model) or ». For each couple, the following classification could be provided:

• Sync move: the classification of the event corresponds to the transition label; in this case, both the trace and the model advance in the same way during the replay.

• Move on log: for couples where the second element is », it corresponds to a replay move in the trace that is not mimicked in the model. This kind of move is unfit and signal a deviation between the trace and the model.

• Move on model: for couples where the first element is », it corresponds to a replay move in the model that is not mimicked in the trace. For moves on model, we can have the following distinction:
• Moves on model involving hidden transitions: in this case, even if it is not a sync move, the move is fit.

• Moves on model not involving hidden transitions: in this case, the move is unfit and signals a deviation between the trace and the model.

With each trace, a dictionary containing among the others the following information is associated:

alignment: contains the alignment (sync moves, moves on log, moves on model) cost: contains the cost of the alignment according to the provided cost function fitness: is equal to 1 if the trace is perfectly fitting.

Parameters:
• log – event log

• args – specification of the process model

• multi_processing (`bool`) – boolean value that enables the multiprocessing

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

• variant_str – variant specification (for Petri net alignments)

• return_diagnostics_dataframe (`bool`) – if possible, returns a dataframe with the diagnostics (instead of the usual output)

Return type:

`List[Dict[str, Any]]`

```import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
alignments_diagnostics = pm4py.conformance_diagnostics_alignments(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.conformance.fitness_token_based_replay(log: Union[EventLog, DataFrame], petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, float][source]#

Calculates the fitness using token-based replay. The fitness is calculated on a log-based level.

Token-based replay matches a trace and a Petri net model, starting from the initial place, in order to discover which transitions are executed and in which places we have remaining or missing tokens for the given process instance. Token-based replay is useful for Conformance Checking: indeed, a trace is fitting according to the model if, during its execution, the transitions can be fired without the need to insert any missing token. If the reaching of the final marking is imposed, then a trace is fitting if it reaches the final marking without any missing or remaining tokens.

In PM4Py there is an implementation of a token replayer that is able to go across hidden transitions (calculating shortest paths between places) and can be used with any Petri net model with unique visible transitions and hidden transitions. When a visible transition needs to be fired and not all places in the preset are provided with the correct number of tokens, starting from the current marking it is checked if for some place there is a sequence of hidden transitions that could be fired in order to enable the visible transition. The hidden transitions are then fired and a marking that permits to enable the visible transition is reached. The approach is described in: Berti, Alessandro, and Wil MP van der Aalst. “Reviving Token-based Replay: Increasing Speed While Improving Diagnostics.” ATAED@ Petri Nets/ACSD. 2019.

The calculation of the replay fitness aim to calculate how much of the behavior in the log is admitted by the process model. We propose two methods to calculate replay fitness, based on token-based replay and alignments respectively.

For token-based replay, the percentage of traces that are completely fit is returned, along with a fitness value that is calculated as indicated in the scientific contribution

Parameters:
• log – event log

• petri_net (`PetriNet`) – petri net

• initial_marking (`Marking`) – initial marking

• final_marking (`Marking`) – final marking

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Dict[str, float]`

```import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
fitness_tbr = pm4py.fitness_token_based_replay(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.conformance.fitness_alignments(log: Union[EventLog, DataFrame], petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, float][source]#

Calculates the fitness using alignments

Alignment-based replay aims to find one of the best alignment between the trace and the model. For each trace, the output of an alignment is a list of couples where the first element is an event (of the trace) or » and the second element is a transition (of the model) or ». For each couple, the following classification could be provided:

• Sync move: the classification of the event corresponds to the transition label; in this case, both the trace and the model advance in the same way during the replay.

• Move on log: for couples where the second element is », it corresponds to a replay move in the trace that is not mimicked in the model. This kind of move is unfit and signal a deviation between the trace and the model.

• Move on model: for couples where the first element is », it corresponds to a replay move in the model that is not mimicked in the trace. For moves on model, we can have the following distinction:
• Moves on model involving hidden transitions: in this case, even if it is not a sync move, the move is fit.

• Moves on model not involving hidden transitions: in this case, the move is unfit and signals a deviation between the trace and the model.

The calculation of the replay fitness aim to calculate how much of the behavior in the log is admitted by the process model. We propose two methods to calculate replay fitness, based on token-based replay and alignments respectively.

For alignments, the percentage of traces that are completely fit is returned, along with a fitness value that is calculated as the average of the fitness values of the single traces.

Parameters:
• log – event log

• petri_net (`PetriNet`) – petri net

• initial_marking (`Marking`) – initial marking

• final_marking (`Marking`) – final marking

• multi_processing (`bool`) – boolean value that enables the multiprocessing

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Dict[str, float]`

```import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
fitness_alignments = pm4py.fitness_alignments(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.conformance.precision_token_based_replay(log: Union[EventLog, DataFrame], petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') float[source]#

Calculates the precision precision using token-based replay

Token-based replay matches a trace and a Petri net model, starting from the initial place, in order to discover which transitions are executed and in which places we have remaining or missing tokens for the given process instance. Token-based replay is useful for Conformance Checking: indeed, a trace is fitting according to the model if, during its execution, the transitions can be fired without the need to insert any missing token. If the reaching of the final marking is imposed, then a trace is fitting if it reaches the final marking without any missing or remaining tokens.

In PM4Py there is an implementation of a token replayer that is able to go across hidden transitions (calculating shortest paths between places) and can be used with any Petri net model with unique visible transitions and hidden transitions. When a visible transition needs to be fired and not all places in the preset are provided with the correct number of tokens, starting from the current marking it is checked if for some place there is a sequence of hidden transitions that could be fired in order to enable the visible transition. The hidden transitions are then fired and a marking that permits to enable the visible transition is reached. The approach is described in: Berti, Alessandro, and Wil MP van der Aalst. “Reviving Token-based Replay: Increasing Speed While Improving Diagnostics.” ATAED@ Petri Nets/ACSD. 2019.

The reference paper for the TBR-based precision (ETConformance) is: Muñoz-Gama, Jorge, and Josep Carmona. “A fresh look at precision in process conformance.” International Conference on Business Process Management. Springer, Berlin, Heidelberg, 2010.

In this approach, the different prefixes of the log are replayed (whether possible) on the model. At the reached marking, the set of transitions that are enabled in the process model is compared with the set of activities that follow the prefix. The more the sets are different, the more the precision value is low. The more the sets are similar, the more the precision value is high.

Parameters:
• log – event log

• petri_net (`PetriNet`) – petri net

• initial_marking (`Marking`) – initial marking

• final_marking (`Marking`) – final marking

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`float`

```import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
precision_tbr = pm4py.precision_token_based_replay(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.conformance.precision_alignments(log: Union[EventLog, DataFrame], petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') float[source]#

Calculates the precision of the model w.r.t. the event log using alignments

Alignment-based replay aims to find one of the best alignment between the trace and the model. For each trace, the output of an alignment is a list of couples where the first element is an event (of the trace) or » and the second element is a transition (of the model) or ». For each couple, the following classification could be provided:

• Sync move: the classification of the event corresponds to the transition label; in this case, both the trace and the model advance in the same way during the replay.

• Move on log: for couples where the second element is », it corresponds to a replay move in the trace that is not mimicked in the model. This kind of move is unfit and signal a deviation between the trace and the model.

• Move on model: for couples where the first element is », it corresponds to a replay move in the model that is not mimicked in the trace. For moves on model, we can have the following distinction:
• Moves on model involving hidden transitions: in this case, even if it is not a sync move, the move is fit.

• Moves on model not involving hidden transitions: in this case, the move is unfit and signals a deviation between the trace and the model.

The reference paper for the alignments-based precision (Align-ETConformance) is: Adriansyah, Arya, et al. “Measuring precision of modeled behavior.” Information systems and e-Business Management 13.1 (2015): 37-67

In this approach, the different prefixes of the log are replayed (whether possible) on the model. At the reached marking, the set of transitions that are enabled in the process model is compared with the set of activities that follow the prefix. The more the sets are different, the more the precision value is low. The more the sets are similar, the more the precision value is high.

Parameters:
• log – event log

• petri_net (`PetriNet`) – petri net

• initial_marking (`Marking`) – initial marking

• final_marking (`Marking`) – final marking

• multi_processing (`bool`) – boolean value that enables the multiprocessing

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`float`

```import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
precision_alignments = pm4py.precision_alignments(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.conformance.conformance_diagnostics_footprints(*args) Union[List[Dict[str, Any]], Dict[str, Any]][source]#

Provide conformance checking diagnostics using footprints

Parameters:

args – provided arguments (the first argument is supposed to be an event log (or the footprints discovered from the event log); the other arguments are supposed to be the process model (or the footprints discovered from the process model).

Return type:

`Union[List[Dict[str, Any]], Dict[str, Any]]`

```import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
footprints_diagnostics = pm4py.conformance_diagnostics_footprints(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```

Deprecated since version 2.3.0: This will be removed in 3.0.0. conformance checking using footprints will not be exposed in a future release

pm4py.conformance.fitness_footprints(*args) Dict[str, float][source]#

Calculates fitness using footprints. The output is a dictionary containing two keys: - perc_fit_traces => percentage of fit traces (over the log) - log_fitness => the fitness value over the log

Parameters:

args – provided arguments (the first argument is supposed to be an event log (or the footprints discovered from the event log); the other arguments are supposed to be the process model (or the footprints discovered from the process model).

Return type:

`Dict[str, float]`

```import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
fitness_fp = pm4py.fitness_footprints(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```

Deprecated since version 2.3.0: This will be removed in 3.0.0. conformance checking using footprints will not be exposed in a future release

pm4py.conformance.precision_footprints(*args) float[source]#

Calculates precision using footprints

Parameters:

args – provided arguments (the first argument is supposed to be an event log (or the footprints discovered from the event log); the other arguments are supposed to be the process model (or the footprints discovered from the process model).

Return type:

`float`

```import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
precision_fp = pm4py.precision_footprints(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```

Deprecated since version 2.3.0: This will be removed in 3.0.0. conformance checking using footprints will not be exposed in a future release

pm4py.conformance.check_is_fitting(*args, activity_key='concept:name') bool[source]#

Checks if a trace object is fit against a process model

Parameters:

args – arguments (trace object; process model (process tree, petri net, BPMN))

Return type:

`bool`

Deprecated since version 2.3.0: This will be removed in 3.0.0. this method will be removed in a future release.

pm4py.conformance.conformance_temporal_profile(log: Union[EventLog, DataFrame], temporal_profile: Dict[Tuple[str, str], Tuple[float, float]], zeta: float = 1.0, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', return_diagnostics_dataframe: bool = False) List[List[Tuple[float, float, float, float]]][source]#

Performs conformance checking on the provided log with the provided temporal profile. The result is a list of time-based deviations for every case. E.g. if the log on top of which the conformance is applied is the following (1 case): A (timestamp: 2000-01) B (timestamp: 2002-01) The difference between the timestamps of A and B is two years. If the temporal profile: {(‘A’, ‘B’): (1.5 months, 0.5 months), (‘A’, ‘C’): (5 months, 0), (‘A’, ‘D’): (2 months, 0)} is specified, and zeta is set to 1, then the aforementioned case would be deviating (considering the couple of activities (‘A’, ‘B’)), because 2 years > 1.5 months + 0.5 months.

Parameters:
• log – log object

• temporal_profile – temporal profile. E.g., if the log has two cases: A (timestamp: 1980-01) B (timestamp: 1980-03) C (timestamp: 1980-06); A (timestamp: 1990-01) B (timestamp: 1990-02) D (timestamp: 1990-03); The temporal profile will contain: {(‘A’, ‘B’): (1.5 months, 0.5 months), (‘A’, ‘C’): (5 months, 0), (‘A’, ‘D’): (2 months, 0)}

• zeta (`float`) – number of standard deviations allowed from the average. E.g. zeta=1 allows every timestamp between AVERAGE-STDEV and AVERAGE+STDEV.

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

• return_diagnostics_dataframe (`bool`) – if possible, returns a dataframe with the diagnostics (instead of the usual output)

Return type:

`List[List[Tuple[float, float, float, float]]]`

```import pm4py

temporal_profile = pm4py.discover_temporal_profile(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
conformance_temporal_profile = pm4py.conformance_temporal_profile(dataframe, temporal_profile, zeta=1, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.conformance.conformance_log_skeleton(log: Union[EventLog, DataFrame], log_skeleton: Dict[str, Any], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', return_diagnostics_dataframe: bool = False) List[Set[Any]][source]#

Performs conformance checking using the log skeleton

Reference paper: Verbeek, H. M. W., and R. Medeiros de Carvalho. “Log skeletons: A classification approach to process discovery.” arXiv preprint arXiv:1806.08247 (2018).

A log skeleton is a declarative model which consists of six different constraints: - “directly_follows”: specifies for some activities some strict bounds on the activities directly-following. For example,

‘A should be directly followed by B’ and ‘B should be directly followed by C’.

• “always_before”: specifies that some activities may be executed only if some other activities are executed somewhen before

in the history of the case. For example, ‘C should always be preceded by A’

• “always_after”: specifies that some activities should always trigger the execution of some other activities

in the future history of the case. For example, ‘A should always be followed by C’

• “equivalence”: specifies that a given couple of activities should happen with the same number of occurrences inside

a case. For example, ‘B and C should always happen the same number of times’.

• “never_together”: specifies that a given couple of activities should never happen together in the history of the case.

For example, ‘there should be no case containing both C and D’.

• “activ_occurrences”: specifies the allowed number of occurrences per activity:

E.g. A is allowed to be executed 1 or 2 times, B is allowed to be executed 1 or 2 or 3 or 4 times.

Parameters:
• log – log object

• log_skeleton – log skeleton object, expressed as dictionaries of the six constraints (never_together, always_before …) along with the discovered rules.

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

• return_diagnostics_dataframe (`bool`) – if possible, returns a dataframe with the diagnostics (instead of the usual output)

Return type:

`List[Set[Any]]`

```import pm4py

log_skeleton = pm4py.discover_log_skeleton(dataframe, noise_threshold=0.1, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
conformance_lsk = pm4py.conformance_log_skeleton(dataframe, log_skeleton, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```

## pm4py.connectors module#

PM4Py is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

PM4Py is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with PM4Py. If not, see <https://www.gnu.org/licenses/>.

pm4py.connectors.extract_log_outlook_mails() DataFrame[source]#

Extracts the history of the conversations from the local instance of Microsoft Outlook running on the current computer.

CASE ID (case:concept:name) => identifier of the conversation ACTIVITY (concept:name) => activity that is performed in the current item (send e-mail, receive e-mail,

refuse meeting …)

TIMESTAMP (time:timestamp) => timestamp of creation of the item in Outlook RESOURCE (org:resource) => sender of the current item

Return type:

`pd.DataFrame`

pm4py.connectors.extract_log_outlook_calendar(email_user: Optional[str] = None, calendar_id: int = 9) DataFrame[source]#

Extracts the history of the calendar events (creation, update, start, end) in a Pandas dataframe from the local Outlook instance running on the current computer.

CASE ID (case:concept:name) => identifier of the meeting ACTIVITY (concept:name) => one between: Meeting Created, Last Change of Meeting, Meeting Started, Meeting Completed TIMESTAMP (time:timestamp) => the timestamp of the event case:subject => the subject of the meeting

Parameters:
• email_user – (optional) e-mail address from which the (shared) calendar should be extracted

• calendar_id (`int`) – identifier of the calendar for the given user (default: 9)

Return type:

`pd.DataFrame`

pm4py.connectors.extract_log_windows_events() DataFrame[source]#

Extract a process mining dataframe from all the events recorded in the Windows registry.

CASE ID (case:concept:name) => name of the computer emitting the events. ACTIVITY (concept:name) => concatenation of the source name of the event and the event identifier

TIMESTAMP (time:timestamp) => timestamp of generation of the event RESOURCE (org:resource) => username involved in the event

Return type:

`pd.DataFrame`

pm4py.connectors.extract_log_chrome_history(history_db_path: Optional[str] = None) DataFrame[source]#

CASE ID (case:concept:name) => an identifier of the profile that has been extracted ACTIVITY (concept:name) => the complete path of the website, minus the GET arguments TIMESTAMP (time:timestamp) => the timestamp of visit

Parameters:

history_db_path – path to the history DB path of Google Chrome (default: position of the Windows folder)

Return type:

`pd.DataFrame`

pm4py.connectors.extract_log_firefox_history(history_db_path: Optional[str] = None) DataFrame[source]#

Extracts a dataframe containing the navigation history of Mozilla Firefox. Please keep Google Chrome history closed when extracting.

CASE ID (case:concept:name) => an identifier of the profile that has been extracted ACTIVITY (concept:name) => the complete path of the website, minus the GET arguments TIMESTAMP (time:timestamp) => the timestamp of visit

Parameters:

history_db_path – path to the history DB path of Mozilla Firefox (default: position of the Windows folder)

Return type:

`pd.DataFrame`

pm4py.connectors.extract_log_github(owner: str = 'pm4py', repo: str = 'pm4py-core', auth_token: Optional[str] = None) DataFrame[source]#

Extracts a dataframe containing the history of the issues of a Github repository. According to the API limit rate of public/registered users, only a part of the events can be returned.

Parameters:
• owner (`str`) – owner of the repository (e.g., pm4py)

• repo (`str`) – name of the repository (e.g., pm4py-core)

• auth_token – authorization token

Return type:

`pd.DataFrame`

pm4py.connectors.extract_log_camunda_workflow(connection_string: str) DataFrame[source]#

Extracts a dataframe from the Camunda workflow system. Aside from the traditional columns, the processID of the process in Camunda is returned.

Parameters:

connection_string (`str`) – ODBC connection string to the Camunda database

Return type:

`pd.DataFrame`

pm4py.connectors.extract_log_sap_o2c(connection_string: str, prefix: str = '') DataFrame[source]#

Extracts a dataframe for the SAP O2C process.

Parameters:
• connection_string (`str`) – ODBC connection string to the SAP database

• prefix (`str`) – prefix for the tables (example: SAPSR3.)

Return type:

`pd.DataFrame`

pm4py.connectors.extract_log_sap_accounting(connection_string: str, prefix: str = '') DataFrame[source]#

Extracts a dataframe for the SAP Accounting process.

Parameters:
• connection_string (`str`) – ODBC connection string to the SAP database

• prefix (`str`) – prefix for the tables (example: SAPSR3.)

Return type:

`pd.DataFrame`

pm4py.connectors.extract_ocel_outlook_mails() OCEL[source]#

Extracts the history of the conversations from the local instance of Microsoft Outlook running on the current computer as an object-centric event log.

ACTIVITY (ocel:activity) => activity that is performed in the current item (send e-mail, receive e-mail,

refuse meeting …)

TIMESTAMP (ocel:timestamp) => timestamp of creation of the item in Outlook

Object types: - org:resource => the snder of the mail - recipients => the list of recipients of the mail - topic => the topic of the discussion

Return type:

`OCEL`

pm4py.connectors.extract_ocel_outlook_calendar(email_user: Optional[str] = None, calendar_id: int = 9) OCEL[source]#

Extracts the history of the calendar events (creation, update, start, end) as an object-centric event log from the local Outlook instance running on the current computer.

ACTIVITY (ocel:activity) => one between: Meeting Created, Last Change of Meeting, Meeting Started, Meeting Completed TIMESTAMP (ocel:timestamp) => the timestamp of the event

Object types: - case:concept:name => identifier of the meeting - case:subject => the subject of the meeting

Parameters:
• email_user – (optional) e-mail address from which the (shared) calendar should be extracted

• calendar_id (`int`) – identifier of the calendar for the given user (default: 9)

Return type:

`OCEL`

pm4py.connectors.extract_ocel_windows_events() OCEL[source]#

Extract a process mining dataframe from all the events recorded in the Windows registry as an object-centric event log.

ACTIVITY (concept:name) => concatenation of the source name of the event and the event identifier

TIMESTAMP (time:timestamp) => timestamp of generation of the event

Object types: - categoryString: translation of the subcategory. The translation is source-specific. - computerName: name of the computer that generated this event. - eventIdentifier: identifier of the event. This is specific to the source that generated the event log entry. - eventType: 1=Error; 2=Warning; 3=Information; 4=Security Audit Success;5=Security Audit Failure; - sourceName: name of the source (application, service, driver, or subsystem) that generated the entry. - user: user name of the logged-on user when the event occurred. If the user name cannot be determined, this will be NULL.

Return type:

`OCEL`

pm4py.connectors.extract_ocel_chrome_history(history_db_path: Optional[str] = None) OCEL[source]#

ACTIVITY (ocel:activity) => the complete path of the website, minus the GET arguments TIMESTAMP (ocel:timestamp) => the timestamp of visit

Object Types: - case:concept:name : the profile of Chrome that is used to visit the site - complete_url: the complete URL of the website - url_wo_parameters: complete URL minus the part after ? - domain: the domain of the website that is visited

Parameters:

history_db_path – path to the history DB path of Google Chrome (default: position of the Windows folder)

Return type:

`OCEL`

pm4py.connectors.extract_ocel_firefox_history(history_db_path: Optional[str] = None) OCEL[source]#

Extracts an object-centric event log containing the navigation history of Mozilla Firefox. Please keep Mozilla Firefox history closed when extracting.

ACTIVITY (ocel:activity) => the complete path of the website, minus the GET arguments TIMESTAMP (ocel:timestamp) => the timestamp of visit

Object Types: - case:concept:name : the profile of Firefox that is used to visit the site - complete_url: the complete URL of the website - url_wo_parameters: complete URL minus the part after ? - domain: the domain of the website that is visited

Parameters:

history_db_path – path to the history DB path of Mozilla Firefox (default: position of the Windows folder)

Return type:

`OCEL`

pm4py.connectors.extract_ocel_github(owner: str = 'pm4py', repo: str = 'pm4py-core', auth_token: Optional[str] = None) OCEL[source]#

Extracts a dataframe containing the history of the issues of a Github repository. According to the API limit rate of public/registered users, only a part of the events can be returned.

ACTIVITY (ocel:activity) => the event (created, commented, closed, subscribed …) TIMESTAMP (ocel:timestamp) => the timestamp of execution of the event

Object types: - case:concept:name => the URL of the events related to the issue - org:resource => the involved resource - case:repo => the repository in which the issue is created

Parameters:
• owner (`str`) – owner of the repository (e.g., pm4py)

• repo (`str`) – name of the repository (e.g., pm4py-core)

• auth_token – authorization token

Return type:

`OCEL`

pm4py.connectors.extract_ocel_camunda_workflow(connection_string: str) OCEL[source]#

Extracts an object-centric event log from the Camunda workflow system.

Parameters:

connection_string (`str`) – ODBC connection string to the Camunda database

Return type:

`pd.DataFrame`

pm4py.connectors.extract_ocel_sap_o2c(connection_string: str, prefix: str = '') OCEL[source]#

Extracts an object-centric event log for the SAP O2C process.

Parameters:
• connection_string (`str`) – ODBC connection string to the SAP database

• prefix (`str`) – prefix for the tables (example: SAPSR3.)

Return type:

`pd.DataFrame`

pm4py.connectors.extract_ocel_sap_accounting(connection_string: str, prefix: str = '') OCEL[source]#

Extracts an object-centric event log for the SAP Accounting process.

Parameters:
• connection_string (`str`) – ODBC connection string to the SAP database

• prefix (`str`) – prefix for the tables (example: SAPSR3.)

Return type:

`pd.DataFrame`

## pm4py.convert module#

The `pm4py.convert` module contains the cross-conversions implemented in `pm4py`

pm4py.convert.convert_to_event_log(obj: Union[DataFrame, EventStream], case_id_key: str = 'case:concept:name', **kwargs) [source]#

Converts a DataFrame/EventStream object to an event log object

Parameters:
• obj – DataFrame or EventStream object

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`EventLog`

```import pandas as pd
import pm4py

dataframe = pm4py.format_dataframe(dataframe, case_id_column='case:concept:name', activity_column='concept:name', timestamp_column='time:timestamp')
log = pm4py.convert_to_event_log(dataframe)
```
pm4py.convert.convert_to_event_stream(obj: Union[EventLog, DataFrame], case_id_key: str = 'case:concept:name', **kwargs) [source]#

Converts a log object to an event stream

Parameters:
• obj – log object

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`EventStream`

```import pm4py

event_stream = pm4py.convert_to_event_stream(log)
```
pm4py.convert.convert_to_dataframe(obj: Union[EventStream, EventLog], **kwargs) DataFrame[source]#

Converts a log object to a dataframe

Parameters:

obj – log object

Return type:

`pd.DataFrame`

```import pm4py

dataframe = pm4py.convert_to_dataframe(log)
```
pm4py.convert.convert_to_bpmn(*args: Union[Tuple[PetriNet, Marking, Marking], ProcessTree]) BPMN[source]#

Converts an object to a BPMN diagram. As an input, either a Petri net (with corresponding initial and final marking) or a process tree can be provided. A process tree can always be converted into a BPMN model and thus quality of the result object is guaranteed. For Petri nets, the quality of the converison largely depends on the net provided (e.g., sound WF-nets are likely to produce reasonable BPMN models)

Parameters:

args – petri net (with initial and final marking) or process tree

Return type:

`BPMN`

```import pm4py

# import a Petri net from a file
bpmn_graph = pm4py.convert_to_bpmn(net, im, fm)
```
pm4py.convert.convert_to_petri_net(*args: Union[BPMN, ProcessTree, HeuristicsNet, dict]) Tuple[PetriNet, Marking, Marking][source]#

Converts an input model to an (accepting) Petri net. The input objects can either be a process tree, BPMN model or a Heuristic net. The output is a triple, containing the Petri net and the initial and final markings. The markings are only returned if they can be reasonable derived from the input model.

Parameters:

args – process tree or BPMN

Return type:

`Tuple[PetriNet, Marking, Marking]`

```import pm4py

# imports a process tree from a PTML file
net, im, fm = pm4py.convert_to_petri_net(process_tree)
```
pm4py.convert.convert_to_process_tree(*args: Union[Tuple[PetriNet, Marking, Marking], BPMN]) [source]#

Converts an input model to a process tree. The input models can either be Petri nets (marked) or BPMN models. For both input types, the conversion is not guaranteed to work, hence, invocation of the method can yield an Exception.

Parameters:

args – petri net (along with initial and final marking) or BPMN

Return type:

`ProcessTree`

```import pm4py

# imports a BPMN file
# converts the BPMN to a process tree (through intermediate conversion to a Petri net)
process_tree = pm4py.convert_to_process_tree(bpmn_graph)
```
pm4py.convert.convert_to_reachability_graph(*args: Union[Tuple[PetriNet, Marking, Marking], BPMN, ProcessTree]) [source]#

Converts an input model to a reachability graph (transition system). The input models can either be Petri nets (with markings), BPMN models or process trees. The output is the state-space of the model (i.e., the reachability graph), enocdoed as a `TransitionSystem` object.

Parameters:

args – petri net (along with initial and final marking), process tree or BPMN

Return type:

`TransitionSystem`

```import pm4py

# reads a Petri net from a file
# converts it to reachability graph
reach_graph = pm4py.convert_to_reachability_graph(net, im, fm)
```
pm4py.convert.convert_log_to_ocel(log: Union[EventLog, EventStream, DataFrame], activity_column: str = 'concept:name', timestamp_column: str = 'time:timestamp', object_types: Optional[Collection[str]] = None, obj_separator: str = ' AND ', additional_event_attributes: Optional[Collection[str]] = None) OCEL[source]#

Converts an event log to an object-centric event log with one or more than one object types.

Parameters:
• log_obj – log object

• activity_column (`str`) – activity column

• timestamp_column (`str`) – timestamp column

• object_types – list of columns to consider as object types

• obj_separator (`str`) – separator between different objects in the same column

• additional_event_attributes – additional attributes to be considered as event attributes in the OCEL

Return type:

`OCEL`

pm4py.convert.convert_ocel_to_networkx(ocel: OCEL, variant: str = 'ocel_to_nx') DiGraph[source]#

Converts an OCEL to a NetworkX DiGraph object.

Parameters:
• ocel (`OCEL`) – object-centric event log

• variant (`str`) – variant of the conversion to use: “ocel_to_nx” -> graph containing event and object IDS and two type of relations (REL=related objects, DF=directly-follows); “ocel_features_to_nx” -> graph containing different types of interconnection at the object level

Return type:

`nx.DiGraph`

pm4py.convert.convert_log_to_networkx(log: Union[EventLog, EventStream, DataFrame], include_df: bool = True, case_id_key: str = 'concept:name', other_case_attributes_as_nodes: Optional[Collection[str]] = None, event_attributes_as_nodes: Optional[Collection[str]] = None) DiGraph[source]#

Converts an event log object to a NetworkX DiGraph object. The nodes of the graph are the events, the cases (and possibly the attributes of the log). The edges are: - Connecting each event to the corresponding case (BELONGS_TO type) - Connecting every event to the directly-following one (DF type, if enabled) - Connecting every case/event to the given attribute values (ATTRIBUTE_EDGE type)

Parameters:
• log – log object (EventLog, EventStream, Pandas dataframe)

• include_df (`bool`) – include the directly-follows graph relation in the graph (bool)

• case_id_attribute – specify which attribute at the case level should be considered the case ID (str)

• other_case_attributes_as_nodes – specify which attributes at the case level should be inserted in the graph as nodes (other than the caseID) (list, default empty)

• event_attributes_as_nodes – specify which attributes at the event level should be inserted in the graph as nodes (list, default empty)

Return type:

`nx.DiGraph`

pm4py.convert.convert_petri_net_to_networkx(net: PetriNet, im: Marking, fm: Marking) DiGraph[source]#

Converts a Petri net to a NetworkX DiGraph. Each place and transition is corresponding to a node in the graph.

Parameters:
Return type:

`nx.DiGraph`

pm4py.convert.convert_petri_net_type(net: PetriNet, im: Marking, fm: Marking, type: str = 'classic') Tuple[PetriNet, Marking, Marking][source]#

Changes the Petri net (internal) type

Parameters:
Return type:

`Tuple[PetriNet, Marking, Marking]`

## pm4py.discovery module#

The `pm4py.discovery` module contains the process discovery algorithms implemented in `pm4py`

pm4py.discovery.discover_dfg(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[dict, dict, dict][source]#

Discovers a Directly-Follows Graph (DFG) from a log.

This method returns a dictionary with the couples of directly-following activities (in the log) as keys and the frequency of relation as value.

Parameters:
• log – event log / Pandas dataframe

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Tuple[dict, dict, dict]`

```import pm4py

dfg, start_activities, end_activities = pm4py.discover_dfg(dataframe, case_id_key='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
```
pm4py.discovery.discover_directly_follows_graph(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[dict, dict, dict][source]#
pm4py.discovery.discover_dfg_typed(log: DataFrame, case_id_key: str = 'case:concept:name', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp') [source]#

Discovers a Directly-Follows Graph (DFG) from a log.

This method returns a typed DFG object, i.e., as specified in `pm4py.objects.dfg.obj.py` (`DirectlyFollowsGraph` Class) The DFG object describes a graph, start activities and end activities. The graph is a collection of triples of the form (a,b,f) representing an arc a->b with frequency f. The start activities are a collection of tuples of the form (a,f) representing that activity a starts f cases. The end activities are a collection of tuples of the form (a,f) representing that ativity a ends f cases.

This method replaces `pm4py.discover_dfg` and `pm4py.discover_directly_follows_graph`. In a future release, these functions will adopt the same behavior as this function.

Parameters:
• log (`DataFrame`) – `pandas.DataFrame`

• case_id_key (`str`) – attribute to be used as case identifier

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

Return type:

`DFG`

```import pm4py

dfg = pm4py.discover_dfg_typed(log, case_id_key='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
```
pm4py.discovery.discover_performance_dfg(log: Union[EventLog, DataFrame], business_hours: bool = False, business_hour_slots=[(25200, 61200), (111600, 147600), (198000, 234000), (284400, 320400), (370800, 406800)], workcalendar=None, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[dict, dict, dict][source]#

Discovers a performance directly-follows graph from an event log.

This method returns a dictionary with the couples of directly-following activities (in the log) as keys and the performance of relation as value.

Parameters:
• log – event log / Pandas dataframe

• business_hours (`bool`) – enables/disables the computation based on the business hours (default: False)

• business_hour_slots – work schedule of the company, provided as a list of tuples where each tuple represents one time slot of business hours. One slot i.e. one tuple consists of one start and one end time given in seconds since week start, e.g. [(7 * 60 * 60, 17 * 60 * 60), ((24 + 7) * 60 * 60, (24 + 12) * 60 * 60), ((24 + 13) * 60 * 60, (24 + 17) * 60 * 60),] meaning that business hours are Mondays 07:00 - 17:00 and Tuesdays 07:00 - 12:00 and 13:00 - 17:00

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Tuple[dict, dict, dict]`

```import pm4py

performance_dfg, start_activities, end_activities = pm4py.discover_performance_dfg(dataframe, case_id_key='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
```
pm4py.discovery.discover_petri_net_alpha(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[PetriNet, Marking, Marking][source]#

Discovers a Petri net using the Alpha Miner.

Parameters:
• log – event log / Pandas dataframe

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Tuple[PetriNet, Marking, Marking]`

```import pm4py

net, im, fm = pm4py.discover_petri_net_alpha(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.discovery.discover_petri_net_ilp(log: Union[EventLog, DataFrame], alpha: float = 1.0, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[PetriNet, Marking, Marking][source]#

Discovers a Petri net using the ILP Miner.

Parameters:
• log – event log / Pandas dataframe

• alpha (`float`) – noise threshold for the sequence encoding graph (1.0=no filtering, 0.0=greatest filtering)

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Tuple[PetriNet, Marking, Marking]`

```import pm4py

net, im, fm = pm4py.discover_petri_net_ilp(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.discovery.discover_petri_net_alpha_plus(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[PetriNet, Marking, Marking][source]#

Discovers a Petri net using the Alpha+ algorithm

Parameters:
• log – event log / Pandas dataframe

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Tuple[PetriNet, Marking, Marking]`

```import pm4py

net, im, fm = pm4py.discover_petri_net_alpha_plus(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```

Deprecated since version 2.3.0: This will be removed in 3.0.0. this method will be removed in a future release.

pm4py.discovery.discover_petri_net_inductive(log: Union[EventLog, DataFrame, DirectlyFollowsGraph], multi_processing: bool = False, noise_threshold: float = 0.0, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[PetriNet, Marking, Marking][source]#

Discovers a Petri net using the inductive miner algorithm.

The basic idea of Inductive Miner is about detecting a ‘cut’ in the log (e.g. sequential cut, parallel cut, concurrent cut and loop cut) and then recur on sublogs, which were found applying the cut, until a base case is found. The Directly-Follows variant avoids the recursion on the sublogs but uses the Directly Follows graph.

Inductive miner models usually make extensive use of hidden transitions, especially for skipping/looping on a portion on the model. Furthermore, each visible transition has a unique label (there are no transitions in the model that share the same label).

Parameters:
• log – event log / Pandas dataframe / typed DFG

• noise_threshold (`float`) – noise threshold (default: 0.0)

• multi_processing (`bool`) – boolean that enables/disables multiprocessing in inductive miner

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Tuple[PetriNet, Marking, Marking]`

```import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.discovery.discover_petri_net_heuristics(log: Union[EventLog, DataFrame], dependency_threshold: float = 0.5, and_threshold: float = 0.65, loop_two_threshold: float = 0.5, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[PetriNet, Marking, Marking][source]#

Discover a Petri net using the Heuristics Miner

Heuristics Miner is an algorithm that acts on the Directly-Follows Graph, providing way to handle with noise and to find common constructs (dependency between two activities, AND). The output of the Heuristics Miner is an Heuristics Net, so an object that contains the activities and the relationships between them. The Heuristics Net can be then converted into a Petri net. The paper can be visited by clicking on the upcoming link: this link).

Parameters:
• log – event log / Pandas dataframe

• dependency_threshold (`float`) – dependency threshold (default: 0.5)

• and_threshold (`float`) – AND threshold (default: 0.65)

• loop_two_threshold (`float`) – loop two threshold (default: 0.5)

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Tuple[PetriNet, Marking, Marking]`

```import pm4py

net, im, fm = pm4py.discover_petri_net_heuristics(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.discovery.discover_process_tree_inductive(log: Union[EventLog, DataFrame, DirectlyFollowsGraph], noise_threshold: float = 0.0, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') [source]#

Discovers a process tree using the inductive miner algorithm

The basic idea of Inductive Miner is about detecting a ‘cut’ in the log (e.g. sequential cut, parallel cut, concurrent cut and loop cut) and then recur on sublogs, which were found applying the cut, until a base case is found. The Directly-Follows variant avoids the recursion on the sublogs but uses the Directly Follows graph.

Inductive miner models usually make extensive use of hidden transitions, especially for skipping/looping on a portion on the model. Furthermore, each visible transition has a unique label (there are no transitions in the model that share the same label).

Parameters:
• log – event log / Pandas dataframe / typed DFG

• noise_threshold (`float`) – noise threshold (default: 0.0)

• activity_key (`str`) – attribute to be used for the activity

• multi_processing (`bool`) – boolean that enables/disables multiprocessing in inductive miner

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`ProcessTree`

```import pm4py

process_tree = pm4py.discover_process_tree_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.discovery.discover_heuristics_net(log: Union[EventLog, DataFrame], dependency_threshold: float = 0.5, and_threshold: float = 0.65, loop_two_threshold: float = 0.5, min_act_count: int = 1, min_dfg_occurrences: int = 1, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', decoration: str = 'frequency') [source]#

Discovers an heuristics net

Heuristics Miner is an algorithm that acts on the Directly-Follows Graph, providing way to handle with noise and to find common constructs (dependency between two activities, AND). The output of the Heuristics Miner is an Heuristics Net, so an object that contains the activities and the relationships between them. The Heuristics Net can be then converted into a Petri net. The paper can be visited by clicking on the upcoming link: this link).

Parameters:
• log – event log / Pandas dataframe

• dependency_threshold (`float`) – dependency threshold (default: 0.5)

• and_threshold (`float`) – AND threshold (default: 0.65)

• loop_two_threshold (`float`) – loop two threshold (default: 0.5)

• min_act_count (`int`) – minimum number of occurrences per activity in order to be included in the discovery

• min_dfg_occurrences (`int`) – minimum number of occurrences per arc in the DFG in order to be included in the discovery

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

• decoration (`str`) – the decoration that should be used (frequency, performance)

Return type:

`HeuristicsNet`

```import pm4py

heu_net = pm4py.discover_heuristics_net(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.discovery.derive_minimum_self_distance(log: Union[DataFrame, EventLog, EventStream], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, int][source]#

This algorithm computes the minimum self-distance for each activity observed in an event log. The self distance of a in <a> is infinity, of a in <a,a> is 0, in <a,b,a> is 1, etc. The activity key ‘concept:name’ is used.

Parameters:
• log – event log / Pandas dataframe

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Dict[str, int]`

```import pm4py

msd = pm4py.derive_minimum_self_distance(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.discovery.discover_footprints(*args: Union[EventLog, Tuple[PetriNet, Marking, Marking], ProcessTree]) Union[List[Dict[str, Any]], Dict[str, Any]][source]#

Discovers the footprints out of the provided event log / process model

Parameters:

args – event log / process model

Return type:

`Union[List[Dict[str, Any]], Dict[str, Any]]`

```import pm4py

footprints = pm4py.discover_footprints(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.discovery.discover_eventually_follows_graph(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[Tuple[str, str], int][source]#

Gets the eventually follows graph from a log object.

The eventually follows graph is a dictionary associating to every couple of activities which are eventually following each other the number of occurrences of this relation.

Parameters:
• log – event log / Pandas dataframe

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Dict[Tuple[str, str], int]`

```import pm4py

efg = pm4py.discover_eventually_follows_graph(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.discovery.discover_bpmn_inductive(log: Union[EventLog, DataFrame, DirectlyFollowsGraph], noise_threshold: float = 0.0, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') BPMN[source]#

Discovers a BPMN using the Inductive Miner algorithm

The basic idea of Inductive Miner is about detecting a ‘cut’ in the log (e.g. sequential cut, parallel cut, concurrent cut and loop cut) and then recur on sublogs, which were found applying the cut, until a base case is found. The Directly-Follows variant avoids the recursion on the sublogs but uses the Directly Follows graph.

Inductive miner models usually make extensive use of hidden transitions, especially for skipping/looping on a portion on the model. Furthermore, each visible transition has a unique label (there are no transitions in the model that share the same label).

Parameters:
• log – event log / Pandas dataframe / typed DFG

• noise_threshold (`float`) – noise threshold (default: 0.0)

• multi_processing (`bool`) – boolean that enables/disables multiprocessing in inductive miner

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`BPMN`

```import pm4py

bpmn_graph = pm4py.discover_bpmn_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.discovery.discover_transition_system(log: Union[EventLog, DataFrame], direction: str = 'forward', window: int = 2, view: str = 'sequence', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') [source]#

Discovers a transition system as described in the process mining book “Process Mining: Data Science in Action”

Parameters:
• log – event log / Pandas dataframe

• direction (`str`) – direction in which the transition system is built (forward, backward)

• window (`int`) – window (2, 3, …)

• view (`str`) – view to use in the construction of the states (sequence, set, multiset)

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`TransitionSystem`

```import pm4py

transition_system = pm4py.discover_transition_system(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.discovery.discover_prefix_tree(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Trie[source]#

Discovers a prefix tree from the provided log object.

Parameters:
• log – event log / Pandas dataframe

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Trie`

```import pm4py

prefix_tree = pm4py.discover_prefix_tree(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.discovery.discover_temporal_profile(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[Tuple[str, str], Tuple[float, float]][source]#

Discovers a temporal profile from a log object.

Implements the approach described in: Stertz, Florian, Jürgen Mangler, and Stefanie Rinderle-Ma. “Temporal Conformance Checking at Runtime based on Time-infused Process Models.” arXiv preprint arXiv:2008.07262 (2020).

The output is a dictionary containing, for every couple of activities eventually following in at least a case of the log, the average and the standard deviation of the difference of the timestamps.

E.g. if the log has two cases:

A (timestamp: 1980-01) B (timestamp: 1980-03) C (timestamp: 1980-06) A (timestamp: 1990-01) B (timestamp: 1990-02) D (timestamp: 1990-03)

The returned dictionary will contain: {(‘A’, ‘B’): (1.5 months, 0.5 months), (‘A’, ‘C’): (5 months, 0), (‘A’, ‘D’): (2 months, 0)}

Parameters:
• log – event log / Pandas dataframe

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Dict[Tuple[str, str], Tuple[float, float]]`

```import pm4py

temporal_profile = pm4py.discover_temporal_profile(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.discovery.discover_log_skeleton(log: Union[EventLog, DataFrame], noise_threshold: float = 0.0, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, Any][source]#

Discovers a log skeleton from an event log.

A log skeleton is a declarative model which consists of six different constraints: - “directly_follows”: specifies for some activities some strict bounds on the activities directly-following. For example,

‘A should be directly followed by B’ and ‘B should be directly followed by C’.

• “always_before”: specifies that some activities may be executed only if some other activities are executed somewhen before

in the history of the case. For example, ‘C should always be preceded by A’

• “always_after”: specifies that some activities should always trigger the execution of some other activities

in the future history of the case. For example, ‘A should always be followed by C’

• “equivalence”: specifies that a given couple of activities should happen with the same number of occurrences inside

a case. For example, ‘B and C should always happen the same number of times’.

• “never_together”: specifies that a given couple of activities should never happen together in the history of the case.

For example, ‘there should be no case containing both C and D’.

• “activ_occurrences”: specifies the allowed number of occurrences per activity:

E.g. A is allowed to be executed 1 or 2 times, B is allowed to be executed 1 or 2 or 3 or 4 times.

Reference paper: Verbeek, H. M. W., and R. Medeiros de Carvalho. “Log skeletons: A classification approach to process discovery.” arXiv preprint arXiv:1806.08247 (2018).

Parameters:
• log – event log / Pandas dataframe

• noise_threshold (`float`) – noise threshold, acting as described in the paper.

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Dict[str, Any]`

```import pm4py

log_skeleton = pm4py.discover_log_skeleton(dataframe, noise_threshold=0.1, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.discovery.discover_batches(log: Union[EventLog, DataFrame], merge_distance: int = 900, min_batch_size: int = 2, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', resource_key: str = 'org:resource') List[Tuple[Tuple[str, str], int, Dict[str, Any]]][source]#

Discover batches from the provided log object

We say that an activity is executed in batches by a given resource when the resource executes several times the same activity in a short period of time.

Identifying such activities may identify points of the process that can be automated, since the activity of the person may be repetitive.

The following categories of batches are detected: - Simultaneous (all the events in the batch have identical start and end timestamps) - Batching at start (all the events in the batch have identical start timestamp) - Batching at end (all the events in the batch have identical end timestamp) - Sequential batching (for all the consecutive events, the end of the first is equal to the start of the second) - Concurrent batching (for all the consecutive events that are not sequentially matched)

The approach has been described in the following paper: Martin, N., Swennen, M., Depaire, B., Jans, M., Caris, A., & Vanhoof, K. (2015, December). Batch Processing: Definition and Event Log Identification. In SIMPDA (pp. 137-140).

The output is a (sorted) list containing tuples. Each tuple contain:
• Index 0: the activity-resource for which at least one batch has been detected

• Index 1: the number of batches for the given activity-resource

• Index 2: a list containing all the batches. Each batch is described by:

# The start timestamp of the batch # The complete timestamp of the batch # The list of events that are executed in the batch

Parameters:
• log – event log / Pandas dataframe

• merge_distance (`int`) – the maximum time distance between non-overlapping intervals in order for them to be considered belonging to the same batch (default: 15*60 15 minutes)

• min_batch_size (`int`) – the minimum number of events for a batch to be considered (default: 2)

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

• resource_key (`str`) – attribute to be used as resource

Return type:

`List[Tuple[Tuple[str, str], int, Dict[str, Any]]]`

```import pm4py

batches = pm4py.discover_log_skeleton(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp', resource_key='org:resource')
```

## pm4py.filtering module#

The `pm4py.filtering` module contains the filtering features offered in `pm4py`

pm4py.filtering.filter_log_relative_occurrence_event_attribute(log: Union[EventLog, DataFrame], min_relative_stake: float, attribute_key: str = 'concept:name', level='cases', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filters the event log keeping only the events having an attribute value which occurs: - in at least the specified (min_relative_stake) percentage of events, when level=”events” - in at least the specified (min_relative_stake) percentage of cases, when level=”cases”

Parameters:
• log – event log / Pandas dataframe

• min_relative_stake (`float`) – minimum percentage of cases (expressed as a number between 0 and 1) in which the attribute should occur.

• attribute_key (`str`) – the attribute to filter

• level (`str`) – the level of the filter (if level=”events”, then events / if level=”cases”, then cases)

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_log_relative_occurrence_event_attribute(dataframe, 0.5, level='cases', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.filtering.filter_start_activities(log: Union[EventLog, DataFrame], activities: Union[Set[str], List[str]], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filter cases having a start activity in the provided list

Parameters:
• log – event log / Pandas dataframe

• activities – collection of start activities

• retain (`bool`) – if True, we retain the traces containing the given start activities, if false, we drop the traces

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_start_activities(dataframe, ['Act. A'], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.filtering.filter_end_activities(log: Union[EventLog, DataFrame], activities: Union[Set[str], List[str]], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filter cases having an end activity in the provided list

Parameters:
• log – event log / Pandas dataframe

• activities – collection of end activities

• retain (`bool`) – if True, we retain the traces containing the given end activities, if false, we drop the traces

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_end_activities(dataframe, ['Act. Z'], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.filtering.filter_event_attribute_values(log: Union[EventLog, DataFrame], attribute_key: str, values: Union[Set[str], List[str]], level: str = 'case', retain: bool = True, case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filter a log object on the values of some event attribute

Parameters:
• log – event log / Pandas dataframe

• attribute_key (`str`) – attribute to filter

• values – admitted (or forbidden) values

• level (`str`) – specifies how the filter should be applied (‘case’ filters the cases where at least one occurrence happens, ‘event’ filter the events eventually trimming the cases)

• retain (`bool`) – specifies if the values should be kept or removed

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_event_attribute_values(dataframe, 'concept:name', ['Act. A', 'Act. Z'], case_id_key='case:concept:name')
```
pm4py.filtering.filter_trace_attribute_values(log: Union[EventLog, DataFrame], attribute_key: str, values: Union[Set[str], List[str]], retain: bool = True, case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filter a log on the values of a trace attribute

Parameters:
• log – event log / Pandas dataframe

• attribute_key (`str`) – attribute to filter

• values – collection of values to filter

• retain (`bool`) – boolean value (keep/discard matching traces)

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_trace_attribute_values(dataframe, 'case:creator', ['Mike'], case_id_key='case:concept:name')
```
pm4py.filtering.filter_variants(log: Union[EventLog, DataFrame], variants: Union[Set[str], List[str]], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filter a log on a specified set of variants

Parameters:
• log – event log / Pandas dataframe

• variants – collection of variants to filter; A variant should be specified as a list of tuples of activity names, e.g., [(‘a’, ‘b’, ‘c’)]

• retain (`bool`) – boolean; if True all traces conforming to the specified variants are retained; if False, all those traces are removed

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_variants(dataframe, [('Act. A', 'Act. B', 'Act. Z'), ('Act. A', 'Act. C', 'Act. Z')], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.filtering.filter_directly_follows_relation(log: Union[EventLog, DataFrame], relations: List[str], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Retain traces that contain any of the specified ‘directly follows’ relations. For example, if relations == [(‘a’,’b’),(‘a’,’c’)] and log [<a,b,c>,<a,c,b>,<a,d,b>] the resulting log will contain traces describing [<a,b,c>,<a,c,b>].

Parameters:
• log – event log / Pandas dataframe

• relations – list of activity name pairs, which are allowed/forbidden paths

• retain (`bool`) – parameter that says whether the paths should be kept/removed

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_directly_follows_relation(dataframe, [('A','B'),('A','C')], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.filtering.filter_eventually_follows_relation(log: Union[EventLog, DataFrame], relations: List[str], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Retain traces that contain any of the specified ‘eventually follows’ relations. For example, if relations == [(‘a’,’b’),(‘a’,’c’)] and log [<a,b,c>,<a,c,b>,<a,d,b>] the resulting log will contain traces describing [<a,b,c>,<a,c,b>,<a,d,b>].

Parameters:
• log – event log / Pandas dataframe

• relations – list of activity name pairs, which are allowed/forbidden paths

• retain (`bool`) – parameter that says whether the paths should be kept/removed

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_eventually_follows_relation(dataframe, [('A','B'),('A','C')], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.filtering.filter_time_range(log: Union[EventLog, DataFrame], dt1: str, dt2: str, mode='events', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filter a log on a time interval

Parameters:
• log – event log / Pandas dataframe

• dt1 (`str`) – left extreme of the interval

• dt2 (`str`) – right extreme of the interval

• mode (`str`) – modality of filtering (events, traces_contained, traces_intersecting). events: any event that fits the time frame is retained; traces_contained: any trace completely contained in the timeframe is retained; traces_intersecting: any trace intersecting with the time-frame is retained.

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe1 = pm4py.filter_time_range(dataframe, '2010-01-01 00:00:00', '2011-01-01 00:00:00', mode='traces_contained', case_id_key='case:concept:name', timestamp_key='time:timestamp')
filtered_dataframe1 = pm4py.filter_time_range(dataframe, '2010-01-01 00:00:00', '2011-01-01 00:00:00', mode='traces_intersecting', case_id_key='case:concept:name', timestamp_key='time:timestamp')
filtered_dataframe1 = pm4py.filter_time_range(dataframe, '2010-01-01 00:00:00', '2011-01-01 00:00:00', mode='events', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.filtering.filter_between(log: Union[EventLog, DataFrame], act1: str, act2: str, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Finds all the sub-cases leading from an event with activity “act1” to an event with activity “act2” in the log, and returns a log containing only them.

Example:

Log A B C D E F A B E F C A B F C B C B E F C

act1 = B act2 = C

Returned sub-cases: B C (from the first case) B E F C (from the second case) B F C (from the third case) B C (from the third case) B E F C (from the third case)

Parameters:
• log – event log / Pandas dataframe

• act1 (`str`) – source activity

• act2 (`str`) – target activity

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_between(dataframe, 'A', 'D', activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.filtering.filter_case_size(log: Union[EventLog, DataFrame], min_size: int, max_size: int, case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filters the event log, keeping the cases having a length (number of events) included between min_size and max_size

Parameters:
• log – event log / Pandas dataframe

• min_size (`int`) – minimum allowed number of events

• max_size (`int`) – maximum allowed number of events

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_case_size(dataframe, 5, 10, case_id_key='case:concept:name')
```
pm4py.filtering.filter_case_performance(log: Union[EventLog, DataFrame], min_performance: float, max_performance: float, timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filters the event log, keeping the cases having a duration (the timestamp of the last event minus the timestamp of the first event) included between min_performance and max_performance

Parameters:
• log – event log / Pandas dataframe

• min_performance (`float`) – minimum allowed case duration

• max_performance (`float`) – maximum allowed case duration

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_case_performance(dataframe, 3600.0, 86400.0, timestamp_key='time:timestamp', case_id_key='case:concept:name')
```
pm4py.filtering.filter_activities_rework(log: Union[EventLog, DataFrame], activity: str, min_occurrences: int = 2, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filters the event log, keeping the cases where the specified activity occurs at least min_occurrences times.

Parameters:
• log – event log / Pandas dataframe

• activity (`str`) – activity

• min_occurrences (`int`) – minimum desidered number of occurrences

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_activities_rework(dataframe, 'Approve Order', 2, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
```
pm4py.filtering.filter_paths_performance(log: Union[EventLog, DataFrame], path: Tuple[str, str], min_performance: float, max_performance: float, keep=True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filters the event log, either: - (keep=True) keeping the cases having the specified path (tuple of 2 activities) with a duration included between min_performance and max_performance - (keep=False) discarding the cases having the specified path with a duration included between min_performance and max_performance

Parameters:
• log – event log / Pandas dataframe

• path – tuple of two activities (source_activity, target_activity)

• min_performance (`float`) – minimum allowed performance (of the path)

• max_performance (`float`) – maximum allowed performance (of the path)

• keep (`bool`) – keep/discard the cases having the specified path with a duration included between min_performance and max_performance

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_paths_performance(dataframe, ('A', 'D'), 3600.0, 86400.0, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
```
pm4py.filtering.filter_variants_top_k(log: Union[EventLog, DataFrame], k: int, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Keeps the top-k variants of the log

Parameters:
• log – event log / Pandas dataframe

• k (`int`) – number of variants that should be kept

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_variants_top_k(dataframe, 5, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
```
pm4py.filtering.filter_variants_by_coverage_percentage(log: Union[EventLog, DataFrame], min_coverage_percentage: float, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filters the variants of the log by a coverage percentage (e.g., if min_coverage_percentage=0.4, and we have a log with 1000 cases, of which 500 of the variant 1, 400 of the variant 2, and 100 of the variant 3, the filter keeps only the traces of variant 1 and variant 2).

Parameters:
• log – event log / Pandas dataframe

• min_coverage_percentage (`float`) – minimum allowed percentage of coverage

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_variants_by_coverage_percentage(dataframe, 0.1, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
```
pm4py.filtering.filter_variants_by_maximum_coverage_percentage(log: Union[EventLog, DataFrame], max_coverage_percentage: float, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filters the variants of the log by a maximum coverage percentage (e.g., if max_coverage_percentage=0.4, and we have a log with 1000 cases, of which 500 of the variant 1, 400 of the variant 2, and 100 of the variant 3, the filter keeps only the traces of variant 2 and variant 3).

Parameters:
• log – event log / Pandas dataframe

• max_coverage_percentage (`float`) – maximum allowed percentage of coverage

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_variants_by_maximum_coverage_percentage(dataframe, 0.1, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
```
pm4py.filtering.filter_prefixes(log: Union[EventLog, DataFrame], activity: str, strict=True, first_or_last='first', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filters the log, keeping the prefixes to a given activity. E.g., for a log with traces:

A,B,C,D A,B,Z,A,B,C,D A,B,C,D,C,E,C,F

The prefixes to “C” are respectively:

A,B A,B,Z,A,B A,B

Parameters:
• log – event log / Pandas dataframe

• activity (`str`) – target activity of the filter

• strict (`bool`) – applies the filter strictly (cuts the occurrences of the selected activity).

• first_or_last (`str`) – decides if the first or last occurrence of an activity should be selected as baseline for the filter.

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_prefixes(dataframe, 'Act. C', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
```
pm4py.filtering.filter_suffixes(log: Union[EventLog, DataFrame], activity: str, strict=True, first_or_last='first', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Filters the log, keeping the suffixes from a given activity. E.g., for a log with traces:

A,B,C,D A,B,Z,A,B,C,D A,B,C,D,C,E,C,F

The suffixes from “C” are respectively:

D D D,C,E,C,F

Parameters:
• log – event log / Pandas dataframe

• activity (`str`) – target activity of the filter

• strict (`bool`) – applies the filter strictly (cuts the occurrences of the selected activity).

• first_or_last (`str`) – decides if the first or last occurrence of an activity should be selected as baseline for the filter.

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_prefixes(dataframe, 'Act. C', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
```
pm4py.filtering.filter_ocel_event_attribute(ocel: OCEL, attribute_key: str, attribute_values: Collection[Any], positive: bool = True) OCEL[source]#

Filters the object-centric event log on the provided event attributes values

Parameters:
• ocel (`OCEL`) – object-centric event log

• attribute_key (`str`) – attribute at the event level

• attribute_values – collection of attribute values

• positive (`bool`) – decides if the values should be kept (positive=True) or removed (positive=False)

Return type:

`OCEL`

```import pm4py

filtered_ocel = pm4py.filter_ocel_event_attribute(ocel, 'ocel:activity', ['A', 'B', 'D'])
```
pm4py.filtering.filter_ocel_object_attribute(ocel: OCEL, attribute_key: str, attribute_values: Collection[Any], positive: bool = True) OCEL[source]#

Filters the object-centric event log on the provided object attributes values

Parameters:
• ocel (`OCEL`) – object-centric event log

• attribute_key (`str`) – attribute at the event level

• attribute_values – collection of attribute values

• positive (`bool`) – decides if the values should be kept (positive=True) or removed (positive=False)

Return type:

`OCEL`

```import pm4py

filtered_ocel = pm4py.filter_ocel_object_attribute(ocel, 'ocel:type', ['order'])
```
pm4py.filtering.filter_ocel_object_types_allowed_activities(ocel: OCEL, correspondence_dict: Dict[str, Collection[str]]) OCEL[source]#

Filters an object-centric event log keeping only the specified object types with the specified activity set (filters out the rest).

Parameters:
• ocel (`OCEL`) – object-centric event log

• correspondence_dict – dictionary containing, for every object type of interest, a collection of allowed activities. Example: {“order”: [“Create Order”], “element”: [“Create Order”, “Create Delivery”]}

Return type:

`OCEL`

```import pm4py

filtered_ocel = pm4py.filter_ocel_object_types_allowed_activities(ocel, {'order': ['create order', 'pay order'], 'item})
```
pm4py.filtering.filter_ocel_object_per_type_count(ocel: OCEL, min_num_obj_type: Dict[str, int]) OCEL[source]#

Filters the events of the object-centric logs which are related to at least the specified amount of objects per type.

E.g. pm4py.filter_object_per_type_count(ocel, {“order”: 1, “element”: 2})

Would keep the following events:

ocel:eid ocel:timestamp ocel:activity ocel:type:element ocel:type:order

0 e1 1980-01-01 Create Order [i4, i1, i3, i2] [o1] 1 e11 1981-01-01 Create Order [i6, i5] [o2] 2 e14 1981-01-04 Create Order [i8, i7] [o3]

Parameters:
• ocel (`OCEL`) – object-centric event log

• min_num_obj_type – minimum number of objects per type

Return type:

`OCEL`

```import pm4py

filtered_ocel = pm4py.filter_ocel_object_per_type_count(ocel, {'order': 1, 'element': 2})
```
pm4py.filtering.filter_ocel_start_events_per_object_type(ocel: OCEL, object_type: str) OCEL[source]#

Filters the events in which a new object for the given object type is spawn. (E.g. an event with activity “Create Order” might spawn new orders).

Parameters:
• ocel (`OCEL`) – object-centric event log

• object_type (`str`) – object type to consider

Return type:

`OCEL`

```import pm4py

filtered_ocel = pm4py.filter_ocel_start_events_per_object_type(ocel, 'delivery')
```
pm4py.filtering.filter_ocel_end_events_per_object_type(ocel: OCEL, object_type: str) OCEL[source]#

Filters the events in which an object for the given object type terminates its lifecycle. (E.g. an event with activity “Pay Order” might terminate an order).

Parameters:
• ocel (`OCEL`) – object-centric event log

• object_type (`str`) – object type to consider

Return type:

`OCEL`

```import pm4py

filtered_ocel = pm4py.filter_ocel_end_events_per_object_type(ocel, 'delivery')
```
pm4py.filtering.filter_ocel_events_timestamp(ocel: OCEL, min_timest: Union[datetime, str], max_timest: Union[datetime, str], timestamp_key: str = 'ocel:timestamp') OCEL[source]#

Filters the object-centric event log keeping events in the provided timestamp range

Parameters:
• ocel (`OCEL`) – object-centric event log

• min_timest – left extreme of the allowed timestamp interval (provided in the format: YYYY-mm-dd HH:MM:SS)

• max_timest – right extreme of the allowed timestamp interval (provided in the format: YYYY-mm-dd HH:MM:SS)

• timestamp_key (`str`) – the attribute to use as timestamp (default: ocel:timestamp)

Return type:

`OCEL`

```import pm4py

filtered_ocel = pm4py.filter_ocel_events_timestamp(ocel, '1990-01-01 00:00:00', '2010-01-01 00:00:00')
```
pm4py.filtering.filter_four_eyes_principle(log: Union[EventLog, DataFrame], activity1: str, activity2: str, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', resource_key: str = 'org:resource') Union[EventLog, DataFrame][source]#

Filter the cases of the log which violates the four eyes principle on the provided activities.

Parameters:
• log – event log

• activity1 (`str`) – first activity

• activity2 (`str`) – second activity

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

• resource_key (`str`) – attribute to be used as resource

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_four_eyes_principle(dataframe, 'Act. A', 'Act. B', activity_key='concept:name', resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
```
pm4py.filtering.filter_activity_done_different_resources(log: Union[EventLog, DataFrame], activity: str, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', resource_key: str = 'org:resource') Union[EventLog, DataFrame][source]#

Filters the cases where an activity is repeated by different resources.

Parameters:
• log – event log

• activity (`str`) – activity to consider

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

• resource_key (`str`) – attribute to be used as resource

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

filtered_dataframe = pm4py.filter_activity_done_different_resources(dataframe, 'Act. A', activity_key='concept:name', resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
```
pm4py.filtering.filter_ocel_object_types(ocel: OCEL, obj_types: Collection[str], positive: bool = True, level: int = 1) OCEL[source]#

Filters the object types of an object-centric event log.

Parameters:
• ocel (`OCEL`) – object-centric event log

• obj_types – object types to keep/remove

• positive (`bool`) – boolean value (True=keep, False=remove)

• level (`int`) – recursively expand the set of object identifiers until the specified level

Return type:

`OCEL`

```import pm4py

filtered_ocel = pm4py.filter_ocel_object_types(ocel, ['order'])
```
pm4py.filtering.filter_ocel_objects(ocel: OCEL, object_identifiers: Collection[str], positive: bool = True, level: int = 1) OCEL[source]#

Filters the object identifiers of an object-centric event log.

Parameters:
• ocel (`OCEL`) – object-centric event log

• object_identifiers – object identifiers to keep/remove

• positive (`bool`) – boolean value (True=keep, False=remove)

• level (`int`) – recursively expand the set of object identifiers until the specified level

Return type:

`OCEL`

```import pm4py

filtered_ocel = pm4py.filter_ocel_objects(ocel, ['o1'], level=1)
```
pm4py.filtering.filter_ocel_events(ocel: OCEL, event_identifiers: Collection[str], positive: bool = True) OCEL[source]#

Filters the event identifiers of an object-centric event log.

Parameters:
• ocel (`OCEL`) – object-centric event log

• event_identifiers – event identifiers to keep/remove

• positive (`bool`) – boolean value (True=keep, False=remove)

Return type:

`OCEL`

```import pm4py

filtered_ocel = pm4py.filter_ocel_events(ocel, ['e1'])
```
pm4py.filtering.filter_ocel_cc_object(ocel: OCEL, object_id: str) OCEL[source]#

Returns the connected component of the object-centric event log to which the object with the provided identifier belongs.

Parameters:
• ocel (`OCEL`) – object-centric event log

• object_id (`str`) – object identifier

Return type:

`OCEL`

```import pm4py

filtered_ocel = pm4py.filter_ocel_cc_object(ocel, 'order1')
```

## pm4py.hof module#

pm4py.hof.filter_log(f: Callable[[Any], bool], log: EventLog) Union[EventLog, EventStream][source]#

Filters the log according to a given (lambda) function.

Parameters:
• f – function that specifies the filter criterion, may be a lambda

• log (`EventLog`) – event log; either EventLog or EventStream Object

Return type:

`Union[log_inst.EventLog, log_inst.EventStream]`

Deprecated since version 2.3.0: This will be removed in 3.0.0. the EventLog class will be removed in a future release.

pm4py.hof.filter_trace(f: Callable[[Any], bool], trace: Trace) [source]#

Filters the trace according to a given (lambda) function.

Parameters:
• f – function that specifies the filter criterion, may be a lambda

• trace (`Trace`) – trace; PM4Py trace object

Return type:

`log_inst.Trace`

pm4py.hof.sort_log(log: EventLog, key, reverse: bool = False) Union[EventLog, EventStream][source]#

Sorts the event log according to a given key.

Parameters:
• log (`EventLog`) – event log object; either EventLog or EventStream

• key – sorting key

• reverse (`bool`) – indicates whether sorting should be reversed or not

Return type:

`Union[log_inst.EventLog, log_inst.EventStream]`

Deprecated since version 2.3.0: This will be removed in 3.0.0. the EventLog class will be removed in a future release.

pm4py.hof.sort_trace(trace: Trace, key, reverse: bool = False) [source]#

Sorts the events in a trace according to a given key.

Parameters:
• trace (`Trace`) – input trace

• key – sorting key

• reverse (`bool`) – indicates whether sorting should be reversed (default False)

Return type:

`log_inst.Trace`

Deprecated since version 2.3.0: This will be removed in 3.0.0. the EventLog class will be removed in a future release.

## pm4py.meta module#

Process mining for Python

## pm4py.ml module#

The `pm4py.ml` module contains the machine learning features offered in `pm4py`

pm4py.ml.split_train_test(log: Union[EventLog, DataFrame], train_percentage: float = 0.8, case_id_key='case:concept:name') Union[Tuple[EventLog, EventLog], Tuple[DataFrame, DataFrame]][source]#

Split an event log in a training log and a test log (for machine learning purposes). Returns the training and the test event log.

Parameters:
• log – event log / Pandas dataframe

• train_percentage (`float`) – fraction of traces to be included in the training log (from 0.0 to 1.0)

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[Tuple[EventLog, EventLog], Tuple[pd.DataFrame, pd.DataFrame]]`

```import pm4py

train_df, test_df = pm4py.split_train_test(dataframe, train_percentage=0.75)
```
pm4py.ml.get_prefixes_from_log(log: Union[EventLog, DataFrame], length: int, case_id_key: str = 'case:concept:name') Union[EventLog, DataFrame][source]#

Gets the prefixes of a log of a given length. The returned log object contain the prefixes: - if a trace has lower or identical length, it is included as-is - if a trace has greater length, it is cut

Parameters:
• log – event log / Pandas dataframe

• length (`int`) – length

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Union[EventLog, pd.DataFrame]`

```import pm4py

trimmed_df = pm4py.get_prefixes_from_log(dataframe, length=5, case_id_key='case:concept:name')
```
pm4py.ml.extract_outcome_enriched_dataframe(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', start_timestamp_key: str = 'time:timestamp') DataFrame[source]#

Inserts additional columns in the dataframe which are computed on the overall case, so they model the outcome of the case.

Parameters:
• log – event log / Pandas dataframe

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

• start_timestamp_key (`str`) – attribute to be used as start timestamp

Return type:

`pd.DataFrame`

```import pm4py

enriched_df = pm4py.extract_outcome_enriched_dataframe(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name', start_timestamp_key='time:timestamp')
```
pm4py.ml.extract_features_dataframe(log: Union[EventLog, DataFrame], str_tr_attr=None, num_tr_attr=None, str_ev_attr=None, num_ev_attr=None, str_evsucc_attr=None, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name', resource_key='org:resource', **kwargs) DataFrame[source]#

Extracts a dataframe containing the features of each case of the provided log object

Parameters:
• log – log object (event log / Pandas dataframe)

• str_tr_attr – (if provided) string attributes at the case level which should be extracted as features

• num_tr_attr – (if provided) numeric attributes at the case level which should be extracted as features

• str_ev_attr – (if provided) string attributes at the event level which should be extracted as features (one-hot encoding)

• num_ev_attr – (if provided) numeric attributes at the event level which should be extracted as features (last value per attribute in a case)

• activity_key (`str`) – the attribute to be used as activity

• timestamp_key (`str`) – the attribute to be used as timestamp

• case_id_key (`str`) – the attribute to be used as case identifier

• resource_key (`str`) – the attribute to be used as resource

Return type:

`pd.DataFrame`

```import pm4py

features_df = pm4py.extract_features_dataframe(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.ml.extract_ocel_features(ocel: OCEL, obj_type: str, enable_object_lifecycle_paths: bool = True, enable_object_work_in_progress: bool = False, object_str_attributes: Optional[Collection[str]] = None, object_num_attributes: Optional[Collection[str]] = None, include_obj_id: bool = False, debug: bool = False) DataFrame[source]#

Extracts from an object-centric event log a set of features (returned as dataframe) computed on the OCEL for the objects of a given object type.

Parameters:
• ocel (`OCEL`) – object-centric event log

• obj_type (`str`) – object type that should be considered

• enable_object_lifecycle_paths (`bool`) – enables the “lifecycle paths” feature

• enable_object_work_in_progress (`bool`) – enables the “work in progress” feature (which has an high computational cost)

• object_str_attributes – string attributes at the object level to one-hot encode during the feature extraction

• object_num_attributes – numeric attributes at the object level to one-hot encode during the feature extraction

• include_obj_id (`bool`) – includes the object identifier as column of the “features” dataframe

• debug (`bool`) – enables debugging mode (telling at which point of the feature extraction you are)

Return type:

`pd.DataFrame`

```import pm4py

fea_df = pm4py.extract_ocel_features(ocel, "item")
```
pm4py.ml.extract_temporal_features_dataframe(log: Union[EventLog, DataFrame], grouper_freq='W', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name', start_timestamp_key='time:timestamp', resource_key='org:resource') DataFrame[source]#

Extracts a dataframe containing the temporal features of the provided log object

Implements the approach described in the paper: Pourbafrani, Mahsa, Sebastiaan J. van Zelst, and Wil MP van der Aalst. “Supporting automatic system dynamics model generation for simulation in the context of process mining.” International Conference on Business Information Systems. Springer, Cham, 2020.

Parameters:
• log – log object (event log / Pandas dataframe)

• grouper_freq (`str`) – the grouping frequency (D, W, M, Y) to use

• activity_key (`str`) – the attribute to be used as activity

• timestamp_key (`str`) – the attribute to be used as timestamp

• case_id_key (`str`) – the attribute to be used as case identifier

• resource_key (`str`) – the attribute to be used as resource

• start_timestamp_key (`str`) – the attribute to be used as start timestamp

Return type:

`pd.DataFrame`

```import pm4py

temporal_features_df = pm4py.extract_temporal_features_dataframe(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.ml.extract_target_vector(log: Union[EventLog, DataFrame], variant: str, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name') Tuple[Any, List[str]][source]#

Extracts from a log object the target vector for a specific ML use case (next activity, next time, remaining time)

Parameters:
• log – log object (event log / Pandas dataframe)

• variant (`str`) – variant of the algorithm to be used: next_activity, next_time, remaining_time

• activity_key (`str`) – the attribute to be used as activity

• timestamp_key (`str`) – the attribute to be used as timestamp

• case_id_key (`str`) – the attribute to be used as case identifier

Return type:

`Tuple[Any, List[str]]`

```import pm4py

vector_next_act, class_next_act = pm4py.extract_target_vector(log, 'next_activity', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
vector_next_time, class_next_time = pm4py.extract_target_vector(log, 'next_time', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
vector_rem_time, class_rem_time = pm4py.extract_target_vector(log, 'remaining_time', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
```

## pm4py.ocel module#

The `pm4py.ocel` module contains the object-centric process mining features offered in `pm4py`

pm4py.ocel.ocel_get_object_types(ocel: OCEL) List[str][source]#

Gets the list of object types contained in the object-centric event log (e.g., [“order”, “item”, “delivery”]).

Parameters:

ocel (`OCEL`) – object-centric event log

Return type:

`List[str]`

```import pm4py

object_types = pm4py.ocel_get_object_types(ocel)
```
pm4py.ocel.ocel_get_attribute_names(ocel: OCEL) List[str][source]#

Gets the list of attributes at the event and the object level of an object-centric event log (e.g. [“cost”, “amount”, “name”])

Parameters:

ocel (`OCEL`) – object-centric event log

Return type:

`List[str]`

```import pm4py

attribute_names = pm4py.ocel_get_attribute_names(ocel)
```
pm4py.ocel.ocel_flattening(ocel: OCEL, object_type: str) DataFrame[source]#

Flattens the object-centric event log to a traditional event log with the choice of an object type. In the flattened log, the objects of a given object type are the cases, and each case contains the set of events related to the object.

Parameters:
• ocel (`OCEL`) – object-centric event log

• object_type (`str`) – object type

Return type:

`pd.DataFrame`

```import pm4py

event_log = pm4py.ocel_flattening(ocel, 'items')
```
pm4py.ocel.ocel_object_type_activities(ocel: OCEL) Dict[str, Collection[str]][source]#

Gets the set of activities performed for each object type

Parameters:

ocel (`OCEL`) – object-centric event log

Return type:

`Dict[str, Collection[str]]`

```import pm4py

ot_activities = pm4py.ocel_object_type_activities(ocel)
```
pm4py.ocel.ocel_objects_ot_count(ocel: OCEL) Dict[str, Dict[str, int]][source]#

Counts for each event the number of related objects per type

Parameters:

ocel (`OCEL`) – object-centric event log

Return type:

`Dict[str, Dict[str, int]]`

```import pm4py

objects_ot_count = pm4py.ocel_objects_ot_count(ocel)
```
pm4py.ocel.ocel_temporal_summary(ocel: OCEL) DataFrame[source]#

Returns the ``temporal summary’’ from an object-centric event log. The temporal summary aggregates all the events performed in the same timestamp, and reports the set of activities and the involved objects.

Parameters:

ocel (`OCEL`) – object-centric event log

Return type:

`pd.DataFrame`

```import pm4py

temporal_summary = pm4py.ocel_temporal_summary(ocel)
```
pm4py.ocel.ocel_objects_summary(ocel: OCEL) DataFrame[source]#

Gets the objects summary of an object-centric event log

Parameters:

ocel (`OCEL`) – object-centric event log

Return type:

`pd.DataFrame`

```import pm4py

objects_summary = pm4py.ocel_objects_summary(ocel)
```
pm4py.ocel.ocel_objects_interactions_summary(ocel: OCEL) DataFrame[source]#

Gets the objects interactions summary of an object-centric event log. The objects interactions summary has a row for every combination (event, related object, other related object). Properties such as the activity of the event, and the object types of the two related objects, are included.

Parameters:

ocel (`OCEL`) – object-centric event log

Return type:

`OCEL`

```import pm4py

interactions_summary = pm4py.ocel_objects_interactions_summary(ocel)
```
pm4py.ocel.discover_ocdfg(ocel: OCEL, business_hours=False, business_hour_slots=[(25200, 61200), (111600, 147600), (198000, 234000), (284400, 320400), (370800, 406800)]) Dict[str, Any][source]#

Discovers an OC-DFG from an object-centric event log.

Object-centric directly-follows multigraphs are a composition of directly-follows graphs for the single object type, which can be annotated with different metrics considering the entities of an object-centric event log (i.e., events, unique objects, total objects).

Reference paper: Berti, Alessandro, and Wil van der Aalst. “Extracting multiple viewpoint models from relational databases.” Data-Driven Process Discovery and Analysis. Springer, Cham, 2018. 24-51.

Parameters:
• ocel (`OCEL`) – object-centric event log

• business_hours (`bool`) – boolean value that enables the usage of the business hours

• business_hour_slots – work schedule of the company, provided as a list of tuples where each tuple represents one time slot of business hours. One slot i.e. one tuple consists of one start and one end time given in seconds since week start, e.g. [(7 * 60 * 60, 17 * 60 * 60), ((24 + 7) * 60 * 60, (24 + 12) * 60 * 60), ((24 + 13) * 60 * 60, (24 + 17) * 60 * 60),] meaning that business hours are Mondays 07:00 - 17:00 and Tuesdays 07:00 - 12:00 and 13:00 - 17:00

Return type:

`Dict[str, Any]`

```import pm4py

ocdfg = pm4py.discover_ocdfg(ocel)
```
pm4py.ocel.discover_oc_petri_net(ocel: OCEL, inductive_miner_variant: str = 'im') Dict[str, Any][source]#

Discovers an object-centric Petri net from the provided object-centric event log.

Reference paper: van der Aalst, Wil MP, and Alessandro Berti. “Discovering object-centric Petri nets.” Fundamenta informaticae 175.1-4 (2020): 1-40.

Parameters:

ocel (`OCEL`) – object-centric event log

Return type:

`Dict[str, Any]`

```import pm4py

ocpn = pm4py.discover_oc_petri_net(ocel)
```
pm4py.ocel.discover_objects_graph(ocel: OCEL, graph_type: str = 'object_interaction') Set[Tuple[str, str]][source]#

Discovers an object graph from the provided object-centric event log

Parameters:
• ocel (`OCEL`) – object-centric event log

• graph_type (`str`) – type of graph to consider (object_interaction, object_descendants, object_inheritance, object_cobirth, object_codeath)

Return type:

`Dict[str, Any]`

```import pm4py

obj_graph = pm4py.ocel_discover_objects_graph(ocel, graph_type='object_interaction')
```
pm4py.ocel.ocel_o2o_enrichment(ocel: OCEL, included_graphs: Optional[Collection[str]] = None) OCEL[source]#

Inserts the information inferred from the graph computations (pm4py.discover_objects_graph) in the list of O2O relations of the OCEL.

Parameters:
• ocel (`OCEL`) – object-centric event log

• included_graphs – types of graphs to include, provided as list/set of strings (object_interaction_graph, object_descendants_graph, object_inheritance_graph, object_cobirth_graph, object_codeath_graph)

Return type:

`OCEL`

```import pm4py

ocel = pm4py.ocel_o2o_enrichment(ocel)
print(ocel.o2o)
```
pm4py.ocel.ocel_e2o_lifecycle_enrichment(ocel: OCEL) OCEL[source]#

Inserts lifecycle-based information (when an object is created/terminated or other types of relations) in the list of E2O relations of the OCEL

Parameters:

ocel (`OCEL`) – object-centric event log

Return type:

`OCEL`

```import pm4py

ocel = pm4py.ocel_e2o_lifecycle_enrichment(ocel)
print(ocel.relations)
```
pm4py.ocel.sample_ocel_objects(ocel: OCEL, num_objects: int) OCEL[source]#

Given an object-centric event log, returns a sampled event log with a subset of the objects that is chosen in a random way. Only the events related to at least one of these objects are filtered from the event log. As a note, the relationships between the different objects are probably going to be ruined by this sampling.

Parameters:
• ocel (`OCEL`) – Object-centric event log

• num_objects (`int`) – Number of objects of the object-centric event log

Return type:

`OCEL`

```import pm4py

sampled_ocel = pm4py.sample_ocel_objects(ocel, 50) # keeps only 50 random objects
```
pm4py.ocel.sample_ocel_connected_components(ocel: OCEL, connected_components: int = 1) OCEL[source]#

Given an object-centric event log, returns a sampled event log with a subset of the executions. The number of considered connected components need to be specified by the user.

Parameters:
• ocel (`OCEL`) – Object-centric event log

• connected_components (`int`) – Number of connected components to pick from the OCEL

Return type:

`OCEL`

```import pm4py

sampled_ocel = pm4py.sample_ocel_connected_components(ocel, 5) # keeps only 5 connected components
```
pm4py.ocel.ocel_drop_duplicates(ocel: OCEL) OCEL[source]#

Drop relations between events and objects happening at the same time, with the same activity, to the same object identifier. This ends up cleaning the OCEL from duplicate events.

Parameters:

ocel (`OCEL`) – object-centric event log

Return type:

`OCEL`

```import pm4py

ocel = pm4py.ocel_drop_duplicates(ocel)
```
pm4py.ocel.ocel_merge_duplicates(ocel: OCEL, have_common_object: Optional[bool] = False) OCEL[source]#

Merge events in the OCEL that happen with the same activity at the same timestamp

Parameters:
• ocel (`OCEL`) – object-centric event log

• have_common_object – impose the additional merge condition that the two events should happen at the same timestamp.

Return type:

`OCEL`

```import pm4py

ocel = pm4py.ocel_merge_duplicates(ocel)
```

Sorts the OCEL not only based on the timestamp column and the index, but using an additional sorting column that further determines the order of the events happening at the same timestamp.

Parameters:
• ocel (`OCEL`) – object-centric event log

• additional_column (`str`) – additional column to use for the sorting

• primary_column (`str`) – primary column to be used for the sorting (default: ocel:timestamp)

Return type:

`OCEL`

```import pm4py

```

Adds a small time-delta to the timestamp column based on the current index of the event. This ensures the correct ordering of the events in any object-centric process mining solution.

Parameters:

ocel (`OCEL`) – object-centric event log

Return type:

`OCEL`

```import pm4py

```
pm4py.ocel.cluster_equivalent_ocel(ocel: OCEL, object_type: str, max_objs: int = 9223372036854775807) Dict[str, Collection[OCEL]][source]#

Perform a clustering of the object-centric event log, based on the ‘executions’ of a single object type. Equivalent ‘executions’ are grouped in the output dictionary.

Parameters:
• ocel (`OCEL`) – object-centric event log

• object_type (`str`) – reference object type

• max_objs (`int`) – maximum number of objects (of the given object type)

Return type:

`Dict[str, Collection[OCEL]]`

```import pm4py

clusters = pm4py.cluster_equivalent_ocel(ocel, "order")
```

## pm4py.openai module#

PM4Py is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

PM4Py is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with PM4Py. If not, see <https://www.gnu.org/licenses/>.

pm4py.openai.abstract_dfg(log_obj: Union[DataFrame, EventLog, EventStream], max_len: int = 10000, include_performance: bool = True, relative_frequency: bool = False, response_header: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') str[source]#

Obtains the DFG abstraction of a traditional event log

Parameters:
• log_obj – log object

• max_len (`int`) – maximum length of the (string) abstraction

• include_performance (`bool`) – (boolean) includes the performance of the paths in the abstraction

• relative_frequency (`bool`) – (boolean) uses the relative instead of the absolute frequency of the paths

• response_header (`bool`) – includes a short header before the paths, pointing to the description of the abstraction

• activity_key (`str`) – the column to be used as activity

• timestamp_key (`str`) – the column to be used as timestamp

• case_id_key (`str`) – the column to be used as case identifier

Return type:

`str`

```import pm4py

print(pm4py.openai.abstract_dfg(log))
```
pm4py.openai.abstract_variants(log_obj: Union[DataFrame, EventLog, EventStream], max_len: int = 10000, include_performance: bool = True, relative_frequency: bool = False, response_header: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') str[source]#

Obtains the variants abstraction of a traditional event log

Parameters:
• log_obj – log object

• max_len (`int`) – maximum length of the (string) abstraction

• include_performance (`bool`) – (boolean) includes the performance of the variants in the abstraction

• relative_frequency (`bool`) – (boolean) uses the relative instead of the absolute frequency of the variants

• response_header (`bool`) – includes a short header before the variants, pointing to the description of the abstraction

• activity_key (`str`) – the column to be used as activity

• timestamp_key (`str`) – the column to be used as timestamp

• case_id_key (`str`) – the column to be used as case identifier

Return type:

`str`

```import pm4py

print(pm4py.openai.abstract_variants(log))
```
pm4py.openai.abstract_ocel(ocel: OCEL, include_timestamps: bool = True) str[source]#

Obtains the abstraction of an object-centric event log, including the list of events and the objects of the OCEL

Parameters:
• ocel (`OCEL`) – object-centric event log

• include_timestamps (`bool`) – (boolean) includes the timestamp information in the abstraction

Return type:

`str`

```import pm4py

print(pm4py.openai.abstract_ocel(ocel))
```
pm4py.openai.abstract_ocel_ocdfg(ocel: OCEL, include_header: bool = True, include_timestamps: bool = True, max_len: int = 10000) str[source]#

Obtains the abstraction of an object-centric event log, representing in text the object-centric directly-follows graph

Parameters:
• ocel (`OCEL`) – object-centric event log

• include_header (`bool`) – (boolean) includes the header in the abstraction

• include_timestamps (`bool`) – (boolean) includes the timestamp information in the abstraction

• max_len (`int`) – maximum length of the abstraction

Return type:

`str`

```import pm4py

print(pm4py.openai.abstract_ocel_ocdfg(ocel))
```
pm4py.openai.abstract_ocel_features(ocel: OCEL, obj_type: str, include_header: bool = True, max_len: int = 10000) str[source]#

Obtains the abstraction of an object-centric event log, representing in text the features and their values.

Parameters:
• ocel (`OCEL`) – object-centric event log

• obj_type (`str`) – the object type that should be considered in the feature extraction

• include_header (`bool`) – (boolean) includes the header in the abstraction

• max_len (`int`) – maximum length of the abstraction

Return type:

`str`

```import pm4py

print(pm4py.openai.abstract_ocel_ocdfg(ocel))
```
pm4py.openai.abstract_event_stream(log_obj: Union[DataFrame, EventLog, EventStream], max_len: int = 10000, response_header: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') str[source]#

Obtains the event stream abstraction of a traditional event log

Parameters:
• log_obj – log object

• max_len (`int`) – maximum length of the (string) abstraction

• response_header (`bool`) – includes a short header before the variants, pointing to the description of the abstraction

• activity_key (`str`) – the column to be used as activity

• timestamp_key (`str`) – the column to be used as timestamp

• case_id_key (`str`) – the column to be used as case identifier

Return type:

`str`

```import pm4py

print(pm4py.openai.abstract_event_stream(log))
```
pm4py.openai.abstract_petri_net(net: PetriNet, im: Marking, fm: Marking, response_header: bool = True) str[source]#

Obtain an abstraction of a Petri net

Parameters:
Return type:

`str`

```import pm4py

print(pm4py.openai.abstract_petri_net(net, im, fm))
```
pm4py.openai.abstract_log_attributes(log_obj: Union[DataFrame, EventLog, EventStream], max_len: int = 10000, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') str[source]#

Abstracts the attributes of a log (reporting their name, their type, and the top values)

Parameters:
• log_obj – log object

• max_len (`int`) – maximum length of the (string) abstraction

• activity_key (`str`) – the column to be used as activity

• timestamp_key (`str`) – the column to be used as timestamp

• case_id_key (`str`) – the column to be used as case identifier

Return type:

`str`

```import pm4py

print(pm4py.openai.abstract_log_attributes(log))
```

## pm4py.org module#

The `pm4py.org` module contains the organizational analysis techniques offered in `pm4py`

pm4py.org.discover_handover_of_work_network(log: Union[EventLog, DataFrame], beta=0, resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') SNA[source]#

Calculates the handover of work network of the event log. The handover of work network is essentially the DFG of the event log, however, using the resource as a node of the graph, instead of the activity. As such, to use this, resource information should be present in the event log.

Return type:

`SNA`

Parameters:
• log – event log / Pandas dataframe

• beta (`int`) – beta parameter for Handover metric

• resource_key (`str`) – attribute to be used for the resource

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

```import pm4py

metric = pm4py.discover_handover_of_work_network(dataframe, resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
```
pm4py.org.discover_working_together_network(log: Union[EventLog, DataFrame], resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') SNA[source]#

Calculates the working together network of the process. Two nodes resources are connected in the graph if the resources collaborate on an instance of the process.

Return type:

`SNA`

Parameters:
• log – event log / Pandas dataframe

• resource_key (`str`) – attribute to be used for the resource

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

```import pm4py

metric = pm4py.discover_working_together_network(dataframe, resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
```
pm4py.org.discover_activity_based_resource_similarity(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') SNA[source]#

Calculates similarity between the resources in the event log, based on their activity profiles.

Return type:

`SNA`

Parameters:
• log – event log / Pandas dataframe

• activity_key (`str`) – attribute to be used for the activity

• resource_key (`str`) – attribute to be used for the resource

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

```import pm4py

act_res_sim = pm4py.discover_activity_based_resource_similarity(dataframe, resource_key='org:resource', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
```
pm4py.org.discover_subcontracting_network(log: Union[EventLog, DataFrame], n=2, resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') SNA[source]#

Calculates the subcontracting network of the process.

Return type:

`SNA`

Parameters:
• log – event log / Pandas dataframe

• n (`int`) – n parameter for Subcontracting metric

• resource_key (`str`) – attribute to be used for the resource

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

```import pm4py

metric = pm4py.discover_subcontracting_network(dataframe, resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
```
pm4py.org.discover_organizational_roles(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') List[Role][source]#

Mines the organizational roles

A role is a set of activities in the log that are executed by a similar (multi)set of resources. Hence, it is a specific function into organization. Grouping the activities in roles can help:

Reference paper: Burattin, Andrea, Alessandro Sperduti, and Marco Veluscek. “Business models enhancement through discovery of roles.” 2013 IEEE Symposium on Computational Intelligence and Data Mining (CIDM). IEEE, 2013.

Parameters:
• log – event log / Pandas dataframe

• activity_key (`str`) – attribute to be used for the activity

• resource_key (`str`) – attribute to be used for the resource

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

```import pm4py

roles = pm4py.discover_organizational_roles(dataframe, resource_key='org:resource', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
```
pm4py.org.discover_network_analysis(log: Union[DataFrame, EventLog, EventStream], out_column: str, in_column: str, node_column_source: str, node_column_target: str, edge_column: str, edge_reference: str = '_out', performance: bool = False, sorting_column: str = 'time:timestamp', timestamp_column: str = 'time:timestamp') Dict[Tuple[str, str], Dict[str, Any]][source]#

Performs a network analysis of the log based on the provided parameters.

The classical social network analysis methods are based on the order of the events inside a case. For example, the Handover of Work metric considers the directly-follows relationships between resources during the work of a case. An edge is added between the two resources if such relationships occurs.

Real-life scenarios may be more complicated. At first, is difficult to collect events inside the same case without having convergence/divergence issues (see first section of the OCEL part). At second, the type of relationship may also be important. Consider for example the relationship between two resources: this may be more efficient if the activity that is executed is liked by the resources, rather than disgusted.

The network analysis that we introduce here generalizes some existing social network analysis metrics, becoming independent from the choice of a case notion and permitting to build a multi-graph instead of a simple graph.

With this, we assume events to be linked by signals. An event emits a signal (that is contained as one attribute of the event) that is assumed to be received by other events (also, this is an attribute of these events) that follow the first event in the log. So, we assume there is an OUT attribute (of the event) that is identical to the IN attribute (of the other events).

When we collect this information, we can build the network analysis graph: - The source node of the relation is given by an aggregation over a node_column_source attribute. - The target node of the relation is given by an aggregation over a node_column_target attribute. - The type of edge is given by an aggregation over an edge_column attribute. - The network analysis graph can either be annotated with frequency or performance information.

The output is a multigraph. Two events EV1 and EV2 of the log are merged (indipendently from the case notion) based on having EV1.OUT_COLUMN = EV2.IN_COLUMN. Then, an aggregation is applied on the couple of events (NODE_COLUMN) to obtain the nodes that are connected. The edges between these nodes are aggregated based on some property of the source event (EDGE_COLUMN).

Parameters:
• log – event log / Pandas dataframe

• out_column (`str`) – the source column of the link (default: the case identifier; events of the same case are linked)

• in_column (`str`) – the target column of the link (default: the case identifier; events of the same case are linked)

• node_column_source (`str`) – the attribute to be used for the node definition of the source event (default: the resource of the log, org:resource)

• node_column_target (`str`) – the attribute to be used for the node definition of the target event (default: the resource of the log, org:resource)

• edge_column (`str`) – the attribute to be used for the edge definition (default: the activity of the log, concept:name)

• edge_reference (`str`) – decide if the edge attribute should be picked from the source event. Values: _out => the source event ; _in => the target event

• performance (`bool`) – boolean value that enables the performance calculation on the edges of the network analysis

• sorting_column (`str`) – the column that should be used to sort the log before performing the network analysis (default: time:timestamp)

• timestamp_column (`str`) – the column that should be used as timestamp for the performance-related analysis (default: time:timestamp)

Return type:

`Dict[Tuple[str, str], Dict[str, Any]]`

```import pm4py

net_ana = pm4py.discover_network_analysis(dataframe, out_column='case:concept:name', in_column='case:concept:name', node_column_source='org:resource', node_column_target='org:resource', edge_column='concept:name')
```

## pm4py.privacy module#

PM4Py is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

PM4Py is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with PM4Py. If not, see <https://www.gnu.org/licenses/>.

pm4py.privacy.anonymize_differential_privacy(log: Union[EventLog, DataFrame], epsilon: float = 1.0, k: int = 10, p: int = 20) DataFrame[source]#

Protect event logs with differential privacy. Differential privacy is a guarantee that bounds the impact the data of one individual has on a query result.

Control-flow information is anonymized with SaCoFa. This algorithm inserts noise into a trace-variant count, through the step-wise construction of a prefix tree.

Contextual-information, like timestamps or resources, is anonymized with PRIPEL. This technique enriches a control-flow anonymized event log with contextual information from the original log, while still achieving differential privacy. PRIPEL anonymizes each event’s timestamp and other attributes, that are stored as strings, integers, floats, or booleans.

SaCoFa is described in: S. A. Fahrenkog-Petersen, M. Kabierski, F. Rösel, H. van der Aa and M. Weidlich, “SaCoFa: Semantics-aware Control-flow Anonymization for Process Mining,” 2021 3rd International Conference on Process Mining (ICPM), 2021, pp. 72-79. https://doi.org/10.48550/arXiv.2109.08501

PRIPEL is described in: Fahrenkrog-Petersen, S.A., van der Aa, H., Weidlich, M. (2020). PRIPEL: Privacy-Preserving Event Log Publishing Including Contextual Information. In: Fahland, D., Ghidini, C., Becker, J., Dumas, M. (eds) Business Process Management. BPM 2020. Lecture Notes in Computer Science, vol 12168. Springer, Cham. https://doi.org/10.1007/978-3-030-58666-9_7

Parameters:
• log – event log / Pandas dataframe

• epsilon (`float`) – the strength of the differential privacy guarantee. The smaller the value of epsilon, the stronger the privacy guarantee that is provided.

• k (`int`) – the maximal length of considered traces in the prefix tree. We recommend setting k, that roughly 80% of all traces from the original event log are covered.

• p (`int`) – the pruning parameter, which denotes the minimum count a prefix has to have in order to not be discarded. The dependent exponential runtime of the algorithms is mitigated by the pruning parameter.

Return type:

`pd.DataFrame`

```import pm4py

anonymized_event_log = pm4py.anonymize_differential_privacy(event_log, epsilon=1.0, k=10, p=20)
```

The `pm4py.read` module contains all funcationality related to reading files/objects from disk.

pm4py.read.read_xes(file_path: str, variant: str = 'lxml', return_legacy_log_object: bool = False, **kwargs) Union[DataFrame, EventLog][source]#

Reads an event log stored in XES format (see xes-standard) Returns a table (`pandas.DataFrame`) view of the event log.

Parameters:
• file_path (`str`) – file path of the event log (`.xes` file) on disk

• variant (`str`) – the variant of the importer to use. “iterparse” => traditional XML parser; “line_by_line” => text-based line-by-line importer ; “chunk_regex” => chunk-of-bytes importer (default); “iterparse20” => XES 2.0 importer

• return_legacy_log_object (`bool`) – boolean value enabling returning a log object (default: False)

Return type:

`DataFrame`

```import pm4py

```

Reads a Petri net object from a .pnml file. The Petri net object returned is a triple containing the following objects:

1. Petrinet Object, encoded as a `PetriNet` class

2. Initial Marking

3. Final Marking

Return type:

`Tuple[PetriNet, Marking, Marking]`

Parameters:

file_path (`str`) – file path of the Petri net model (`.pnml` file) on disk

```import pm4py

```

Reads a process tree object from a .ptml file

Parameters:

file_path (`str`) – file path of the process tree object on disk

Return type:

`ProcessTree`

```import pm4py

```

Reads a DFG object from a .dfg file. The DFG object returned is a triple containing the following objects:

1. DFG Object, encoded as a `Dict[Tuple[str,str],int]`, s.t. `DFG[('a','b')]=k` implies that activity `'a'` is directly followed by activity `'b'` a total of `k` times in the log

2. Start activity dictionary, encoded as a `Dict[str,int]`, s.t., `S['a']=k` implies that activity `'a'` is starting `k` traces in the event log

3. End activity dictionary, encoded as a `Dict[str,int]`, s.t., `E['z']=k` implies that activity `'z'` is ending `k` traces in the event log.

Return type:

`Tuple[Dict[Tuple[str,str],int], Dict[str,int], Dict[str,int]]`

Parameters:

file_path (`str`) – file path of the dfg model on disk

```import pm4py

```

Reads a BPMN model from a .bpmn file

Parameters:

file_path (`str`) – file path of the bpmn model

Return type:

`BPMN`

```import pm4py

```

Reads an object-centric event log from a file (see: http://www.ocel-standard.org/). The `OCEL` object is returned by this method

Parameters:
• file_path (`str`) – file path of the object-centric event log

• objects_path – [Optional] file path from which the objects dataframe should be read

Return type:

`OCEL`

```import pm4py

```

Reads an object-centric event log from a CSV file (see: http://www.ocel-standard.org/). The `OCEL` object is returned by this method

Parameters:
• file_path (`str`) – file path of the object-centric event log (.csv)

• objects_path – [Optional] file path from which the objects dataframe should be read

Return type:

`OCEL`

```import pm4py

```

Reads an object-centric event log from a JSON-OCEL file (see: http://www.ocel-standard.org/). The `OCEL` object is returned by this method

Parameters:

file_path (`str`) – file path of the object-centric event log (.jsonocel)

Return type:

`OCEL`

```import pm4py

```

Reads an object-centric event log from a XML-OCEL file (see: http://www.ocel-standard.org/). The `OCEL` object is returned by this method

Parameters:

file_path (`str`) – file path of the object-centric event log (.xmlocel)

Return type:

`OCEL`

```import pm4py

```

Reads an object-centric event log from a SQLite database (see: http://www.ocel-standard.org/). The `OCEL` object is returned by this method

Parameters:

file_path (`str`) – file path of the SQLite database (.sqlite)

Return type:

`OCEL`

```import pm4py

```

Parameters:

file_path (`str`) – path to the OCEL2.0 event log

Return type:

`OCEL`

```import pm4py

```

Reads an OCEL2.0 event log from a SQLite database

Parameters:

file_path (`str`) – path to the OCEL2.0 database

Return type:

`OCEL`

```import pm4py

```

Reads an OCEL2.0 event log from an XML file

Parameters:

file_path (`str`) – path to the OCEL2.0 event log

Return type:

`OCEL`

```import pm4py

```

## pm4py.sim module#

The `pm4py.sim` module contains the simulation algorithms offered in `pm4py`

pm4py.sim.play_out(*args: Union[Tuple[PetriNet, Marking, Marking], dict, Counter, ProcessTree], **kwargs) [source]#

Performs the playout of the provided model, i.e., gets a set of traces from the model. The function either takes a petri net, initial and final marking, or, a process tree as an input.

Parameters:
• args – model (Petri net with initial and final marking, or process tree)

• kwargs – dictionary containing the parameters of the playout

Return type:

`EventLog`

```import pm4py

log = pm4py.play_out(net, im, fm)
```
pm4py.sim.generate_process_tree(**kwargs) [source]#

Generates a process tree

Reference paper: PTandLogGenerator: A Generator for Artificial Event Data

Parameters:

kwargs – dictionary containing the parameters of the process tree generator algorithm

Return type:

`ProcessTree`

```import pm4py

process_tree = pm4py.generate_process_tree()
```

## pm4py.stats module#

The `pm4py.stats` module contains the statistics offered in `pm4py`

pm4py.stats.get_start_activities(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, int][source]#

Returns the start activities from a log object

Parameters:
• log – Log object

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Dict[str, int]`

```import pm4py

start_activities = pm4py.get_start_activities(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.stats.get_end_activities(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, int][source]#

Returns the end activities of a log

Parameters:
• log – Log object

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Dict[str, int]`

```import pm4py

end_activities = pm4py.get_end_activities(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.stats.get_event_attributes(log: Union[EventLog, DataFrame]) List[str][source]#

Returns the attributes at the event level of the log

Parameters:

log – Log object

Return type:

`List[str]`

```import pm4py

event_attributes = pm4py.get_event_attributes(dataframe)
```
pm4py.stats.get_trace_attributes(log: Union[EventLog, DataFrame]) List[str][source]#

Gets the attributes at the trace level of a log object

Parameters:

log – Log object

Return type:

`List[str]`

```import pm4py

trace_attributes = pm4py.get_trace_attributes(dataframe)
```
pm4py.stats.get_event_attribute_values(log: Union[EventLog, DataFrame], attribute: str, count_once_per_case=False, case_id_key: str = 'case:concept:name') Dict[str, int][source]#

Returns the values for a specified (event) attribute

Parameters:
• log – Log object

• attribute (`str`) – attribute

• count_once_per_case (`bool`) – If True, consider only an occurrence of the given attribute value inside a case (if there are multiple events sharing the same attribute value, count only 1 occurrence)

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Dict[str, int]`

```import pm4py

activities = pm4py.get_event_attribute_values(dataframe, 'concept:name', case_id_key='case:concept:name')
```
pm4py.stats.get_trace_attribute_values(log: Union[EventLog, DataFrame], attribute: str, case_id_key: str = 'case:concept:name') Dict[str, int][source]#

Returns the values for a specified trace attribute

Parameters:
• log – Log object

• attribute (`str`) – Attribute

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Dict[str, int]`

```import pm4py

tr_attr_values = pm4py.get_trace_attribute_values(dataframe, 'case:attribute', case_id_key='case:concept:name')
```
pm4py.stats.get_variants(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[Tuple[str], List[Trace]][source]#

Gets the variants from the log

Parameters:
• log – Event log

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Dict[Tuple[str], List[Trace]]`

```import pm4py

variants = pm4py.get_variants(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.stats.get_variants_as_tuples(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[Tuple[str], List[Trace]][source]#

Gets the variants from the log (where the keys are tuples and not strings)

Parameters:
• log – Event log

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Dict[Tuple[str], List[Trace]]`

```import pm4py

variants = pm4py.get_variants_as_tuples(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.stats.get_stochastic_language(*args, **kwargs) Dict[List[str], float][source]#

Gets the stochastic language from the provided object

Parameters:
• args – Pandas dataframe / event log / accepting Petri net / process tree

• kwargs – keyword arguments

Return type:

`Dict[List[str], float]`

```import pm4py

language_log = pm4py.get_stochastic_language(log)
print(language_log)
language_model = pm4py.get_stochastic_language(net, im, fm)
print(language_model)
```
pm4py.stats.get_minimum_self_distances(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, int][source]#

This algorithm computes the minimum self-distance for each activity observed in an event log. The self distance of a in <a> is infinity, of a in <a,a> is 0, in <a,b,a> is 1, etc. The minimum self distance is the minimal observed self distance value in the event log.

Parameters:
• log – event log (either pandas.DataFrame, EventLog or EventStream)

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Dict[str, int]`

```import pm4py

msd = pm4py.get_minimum_self_distances(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.stats.get_minimum_self_distance_witnesses(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, Set[str]][source]#

This function derives the minimum self distance witnesses. The self distance of a in <a> is infinity, of a in <a,a> is 0, in <a,b,a> is 1, etc. The minimum self distance is the minimal observed self distance value in the event log. A ‘witness’ is an activity that witnesses the minimum self distance. For example, if the minimum self distance of activity a in some log L is 2, then, if trace <a,b,c,a> is in log L, b and c are a witness of a.

Parameters:
• log – Event Log to use

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Dict[str, Set[str]]`

```import pm4py

msd_wit = pm4py.get_minimum_self_distance_witnesses(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.stats.get_case_arrival_average(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') float[source]#

Gets the average difference between the start times of two consecutive cases

Parameters:
• log – log object

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`float`

```import pm4py

case_arr_avg = pm4py.get_case_arrival_average(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.stats.get_rework_cases_per_activity(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, int][source]#

Find out for which activities of the log the rework (more than one occurrence in the trace for the activity) occurs. The output is a dictionary associating to each of the aforementioned activities the number of cases for which the rework occurred.

Parameters:
• log – Log object

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Dict[str, int]`

```import pm4py

rework = pm4py.get_rework_cases_per_activity(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.stats.get_case_overlap(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') List[int][source]#

Associates to each case in the log the number of cases concurrently open

Parameters:
• log – Log object

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`List[int]`

```import pm4py

overlap = pm4py.get_case_overlap(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```

Deprecated since version 2.3.0: This will be removed in 3.0.0. the get_case_overlap function will be removed in a future release.

pm4py.stats.get_cycle_time(log: Union[EventLog, DataFrame], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') float[source]#

Calculates the cycle time of the event log.

So: Cycle time = Average time between completion of units.

Example taken from the website: Consider a manufacturing facility, which is producing 100 units of product per 40 hour week. The average throughput rate is 1 unit per 0.4 hours, which is one unit every 24 minutes. Therefore the cycle time is 24 minutes on average.

Parameters:
• log – Log object

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`float`

```import pm4py

cycle_time = pm4py.get_cycle_time(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.stats.get_all_case_durations(log: Union[EventLog, DataFrame], business_hours: bool = False, business_hour_slots=[(25200, 61200), (111600, 147600), (198000, 234000), (284400, 320400), (370800, 406800)], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') List[float][source]#

Gets the durations of the cases in the event log

Parameters:
• log – Event log

• business_hours (`bool`) – Enables/disables the computation based on the business hours (default: False)

• business_hour_slots – work schedule of the company, provided as a list of tuples where each tuple represents one time slot of business hours. One slot i.e. one tuple consists of one start and one end time given in seconds since week start, e.g. [(7 * 60 * 60, 17 * 60 * 60), ((24 + 7) * 60 * 60, (24 + 12) * 60 * 60), ((24 + 13) * 60 * 60, (24 + 17) * 60 * 60),] meaning that business hours are Mondays 07:00 - 17:00 and Tuesdays 07:00 - 12:00 and 13:00 - 17:00

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`List[float]`

```import pm4py

case_durations = pm4py.get_all_case_durations(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.stats.get_case_duration(log: Union[EventLog, DataFrame], case_id: str, business_hours: bool = False, business_hour_slots=[(25200, 61200), (111600, 147600), (198000, 234000), (284400, 320400), (370800, 406800)], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: Optional[str] = None) float[source]#

Gets the duration of a specific case

Parameters:
• log – Event log

• case_id (`str`) – Case identifier

• business_hours (`bool`) – Enables/disables the computation based on the business hours (default: False)

• business_hour_slots – work schedule of the company, provided as a list of tuples where each tuple represents one time slot of business hours. One slot i.e. one tuple consists of one start and one end time given in seconds since week start, e.g. [(7 * 60 * 60, 17 * 60 * 60), ((24 + 7) * 60 * 60, (24 + 12) * 60 * 60), ((24 + 13) * 60 * 60, (24 + 17) * 60 * 60),] meaning that business hours are Mondays 07:00 - 17:00 and Tuesdays 07:00 - 12:00 and 13:00 - 17:00

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key – attribute to be used as case identifier

Return type:

`float`

```import pm4py

duration = pm4py.get_case_duration(dataframe, 'case 1', activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```
pm4py.stats.get_activity_position_summary(log: Union[EventLog, DataFrame], activity: str, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[int, int][source]#

Given an event log, returns a dictionary which summarize the positions of the activities in the different cases of the event log. E.g., if an activity happens 1000 times in the position 1 (the second event of a case), and 500 times in the position 2 (the third event of a case), then the returned dictionary would be: {1: 1000, 2: 500}

Parameters:
• log – Event log object / Pandas dataframe

• activity (`str`) – Activity to consider

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

Return type:

`Dict[int, int]`

```import pm4py

act_pos = pm4py.get_activity_position_summary(dataframe, 'Act. A', activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
```

## pm4py.utils module#

pm4py.utils.format_dataframe(df: DataFrame, case_id: str = 'case:concept:name', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', start_timestamp_key: str = 'start_timestamp', timest_format: Optional[str] = None) DataFrame[source]#

Give the appropriate format on the dataframe, for process mining purposes

Parameters:
• df (`DataFrame`) – Dataframe

• case_id (`str`) – Case identifier column

• activity_key (`str`) – Activity column

• timestamp_key (`str`) – Timestamp column

• start_timestamp_key (`str`) – Start timestamp column

• timest_format – Timestamp format that is provided to Pandas

Return type:

`pd.DataFrame`

```import pandas as pd
import pm4py

dataframe = pm4py.format_dataframe(dataframe, case_id_key='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp', start_timestamp_key='start_timestamp', timest_format='%Y-%m-%d %H:%M:%S')
```

Deprecated since version 2.3.0: This will be removed in 3.0.0. the format_dataframe function does not need application anymore.

pm4py.utils.rebase(log_obj: Union[EventLog, EventStream, DataFrame], case_id: str = 'case:concept:name', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', start_timestamp_key: str = 'start_timestamp') Union[EventLog, EventStream, DataFrame][source]#

Re-base the log object, changing the case ID, activity and timestamp attributes.

Parameters:
• log_obj – Log object

• case_id (`str`) – Case identifier

• activity_key (`str`) – Activity

• timestamp_key (`str`) – Timestamp

• start_timestamp_key (`str`) – Start timestamp

Return type:

`Union[EventLog, EventStream, pd.DataFrame]`

```import pm4py

rebased_dataframe = pm4py.rebase(dataframe, case_id='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
```
pm4py.utils.parse_process_tree(tree_string: str) [source]#

Parse a process tree from a string

Parameters:

tree_string (`str`) – String representing a process tree (e.g. ‘-> ( ‘A’, O ( ‘B’, ‘C’ ), ‘D’ )’). Operators are ‘->’: sequence, ‘+’: parallel, ‘X’: xor choice, ‘*’: binary loop, ‘O’ or choice

Return type:

`ProcessTree`

```import pm4py

process_tree = pm4py.parse_process_tree('-> ( 'A', O ( 'B', 'C' ), 'D' )')
```
pm4py.utils.serialize(*args) Tuple[str, bytes][source]#

Serialize a PM4Py object into a bytes string

Parameters:

args – A PM4Py object, among: - an EventLog object - a Pandas dataframe object - a (Petrinet, Marking, Marking) tuple - a ProcessTree object - a BPMN object - a DFG, including the dictionary of the directly-follows relations, the start activities and the end activities

Return type:

`Tuple[str, bytes]`

```import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe)
serialization = pm4py.serialize(net, im, fm)
```
pm4py.utils.deserialize(ser_obj: Tuple[str, bytes]) Any[source]#

Deserialize a bytes string to a PM4Py object

Parameters:

ser_obj – Serialized object (a tuple consisting of a string denoting the type of the object, and a bytes string representing the serialization)

Return type:

`Any`

```import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe)
serialization = pm4py.serialize(net, im, fm)
net, im, fm = pm4py.deserialize(serialization)
```
pm4py.utils.get_properties(log, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', resource_key: str = 'org:resource', group_key: Optional[str] = None, **kwargs)[source]#

Gets the properties from a log object

Parameters:
• log – Log object

• activity_key (`str`) – attribute to be used for the activity

• timestamp_key (`str`) – attribute to be used for the timestamp

• case_id_key (`str`) – attribute to be used as case identifier

• resource_key (`str`) – (if provided) attribute to be used as resource

• group_key – (if provided) attribute to be used as group identifier

Return type:

`Dict`

pm4py.utils.set_classifier(log, classifier, classifier_attribute='@@classifier')[source]#

Methods to set the specified classifier on an existing event log

Parameters:
• log – Log object

• classifier – Classifier that should be set: - A list of event attributes can be provided - A single event attribute can be provided - A classifier stored between the “classifiers” of the log object can be provided

• classifier_attribute (`str`) – The attribute of the event that should store the concatenation of the attribute values for the given classifier

Return type:

`Union[EventLog, pd.DataFrame]`

Deprecated since version 2.3.0: This will be removed in 3.0.0. this method will be removed in a future release.Please use the method-specific arguments.

pm4py.utils.parse_event_log_string(traces: Collection[str], sep: str = ',', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') DataFrame[source]#

Parse a collection of traces expressed as strings (e.g., [“A,B,C,D”, “A,C,B,D”, “A,D”]) to a log object (Pandas dataframe)

Parameters:
• traces – Collection of traces expressed as strings

• sep (`str`) – Separator used to split the activities of a string trace

• activity_key (`str`) – The attribute that should be used as activity

• timestamp_key (`str`) – The attribute that should be used as timestamp

• case_id_key (`str`) – The attribute that should be used as case identifier

Return type:

`pd.DataFrame`

```import pm4py

dataframe = pm4py.parse_event_log_string(["A,B,C,D", "A,C,B,D", "A,D"])
```
pm4py.utils.project_on_event_attribute(log: Union[EventLog, DataFrame], attribute_key='concept:name') List[List[str]][source]#

Project the event log on a specified event attribute. The r