Supported/Described Version(s): PM4Py 1.5.X
This documentation assumes that the reader has a basic understanding of process mining and python concepts.

Handling Event Data

Since PM4Py 1.5.x, we offer a simplified interface to import/export event logs. This provides a restricted set of choices in comparison to the normal interface. Moreover, we provide a simplified interface to convert the formats of the log objects.



The methods highlighted by ** are both required when importing a CSV file.

Function Description
log = pm4py.read_xes(file_path) Reads an event log in the XES standard.

Parameters:
file_path - File path

Returns:
Event log
** dataframe = pm4py.read_csv(file_path, sep=',', quotechar=None, encoding=None, nrows=None, timest_format=None) Reads an event log in the CSV format (Pandas adapter).

Parameters:
file_path - File path
sep - Separator; default: ,
quotechar - Quote char; default: None
encoding - Encoding; default: default of Pandas
nrows - (If specified) number of rows
timest_format - Format of the timestamp columns

Returns:
Dataframe
** dataframe = pm4py.format_dataframe(df, case_id='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp') Gives the dataframe the appropriate format for process mining purposes.

Parameters:
df - Dataframe
case_id - Case identifier column
activity_key - Activity column
timestamp_key - Timestamp column

Returns:
Dataframe
pm4py.write_xes(log, file_path) Exports a XES log.

Parameters:
log - Event log
file_path - Destination path
pm4py.write_csv(log, file_path) Exports a CSV log.

Parameters:
log - Event log
file_path - Destination path
Function Description
log = pm4py.convert_to_event_log(obj) Converts a log object to an event log.

Parameters:
obj - Log object

Returns:
Event log object
stream = pm4py.convert_to_event_stream(obj) Converts a log object to an event stream.

Parameters:
obj - Log object

Returns:
Event stream object
dataframe = pm4py.convert_to_dataframe(obj) Converts a log object to a dataframe.

Parameters:
obj - Log object

Returns:
Dataframe
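
The following is a minimal sketch of how the simplified interface listed above can be chained together; the file paths and the column names case, activity and timestamp are placeholders for the columns of your own CSV file.

import pm4py

# read a CSV, give it the expected format, convert it to an event log and export it
dataframe = pm4py.read_csv('<path_to_csv_file.csv>', sep=',')
dataframe = pm4py.format_dataframe(dataframe, case_id='case', activity_key='activity',
                                   timestamp_key='timestamp')
event_log = pm4py.convert_to_event_log(dataframe)
pm4py.write_xes(event_log, '<path_to_exported_log.xes>')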

In this section, information about importing and exporting event logs, stored in various data formats, is presented. Before we dive into the details of importing and exporting the various types of files containing event data, we first briefly explain the two basic notions of event data used within PM4Py. We assume the reader to be familiar with the general concept of an event log. In general, we distinguish between two different event data object types:

  • Event Stream (objects.log.log.EventStream); Simply represents a sequence of events. Events themselves are simply an extension of the Mapping class of Python (collections.abc.Mapping), which allows us to use events as a dict. From a programming perspective, an Event Stream behaves exactly like a list object in Python. However, when applying lambda functions, the result needs to be explicitly cast to an EventStream object (see the sketch after this list).
  • Event Log (objects.log.log.EventLog); Represents a sequence of sequences of events. The concept of an event log is the more traditional view on event data, i.e., executions of a process are captured in traces of events. However, in PM4Py, the Event Log maintains an order of traces. In this way, sorting traces using some specific sorting criterion is supported naturally. Moreover, lambda functions and filters are easily applied on top of Event Logs as well.
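
The following is a minimal sketch of the explicit cast mentioned above; it assumes that stream is an existing Event Stream object and that the EventStream constructor accepts a list of events.

from pm4py.objects.log.log import EventStream

# filter() returns a plain Python iterator, so the result is explicitly cast
# back to an EventStream object; 'stream' is assumed to be an existing Event Stream
filtered_stream = EventStream(list(filter(lambda e: e["concept:name"] == "register request", stream)))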

Importing IEEE XES files

IEEE XES is a standard format describing how event logs are stored. For more information about the format, please consult the IEEE XES website. Simple synthetic event logs, as well as several real event logs made available over the past few years, are publicly available.


The example code below shows how to import an event log, stored in the IEEE XES format, given a file path to the log file. The code fragment uses the standard importer (iterparse, described in a later paragraph). Note that IEEE XES event logs are imported into an Event Log object, i.e., as described earlier.

from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply('<path_to_xes_file.xes>')

Event logs are stored as an extension of the Python list data structure. To access a trace in the log, it is enough to provide its index in the event log. Consider the example below on how to access the different objects stored in the imported log.

print(log[0]) #prints the first trace of the log
print(log[0][0]) #prints the first event of the first trace

The apply() method of the xes_importer (located in pm4py.objects.log.importer.xes.importer.py) accepts two optional parameters: variant and parameters. The variant parameter indicates which variant of the importer to use. The parameters parameter is a Python dictionary, specifying the specific parameters of choice.

from pm4py.objects.log.importer.xes import importer as xes_importer
variant = xes_importer.Variants.ITERPARSE
parameters = {variant.value.Parameters.TIMESTAMP_SORT: True}
log = xes_importer.apply('<path_to_xes_file>',
                         variant=variant, parameters=parameters)

This method invocation style is used throughout PM4Py in the various algorithms implemented, i.e., by wrapping around the different implementations, new variants of algorithms are easily called using previously written PM4Py code. W.r.t. XES importers, two variants are provided. One implementation is based on the iterparse() function of xml.etree. The other variant is a line-by-line, custom parser (for improved performance); it does not fully follow the standard and is only able to import traces, simple trace attributes, events, and simple event attributes. To specify a variant, we add the following argument to the call to the importer: variant=xes_importer.Variants.ITERPARSE (note that, in the example code, this is encapsulated in the local variable variant). The xes_importer.Variants.ITERPARSE value actually maps onto the underlying Python module implementing the iterparse-based importer. We are able to access that reference by accessing the value property, e.g., xes_importer.Variants.ITERPARSE.value. That module contains a parameter definition, i.e., Parameters, containing all possible parameters for the iterparse variant. As an example, the parameter TIMESTAMP_SORT is one of those, accessed by xes_importer.Variants.ITERPARSE.value.Parameters.TIMESTAMP_SORT. The table below lists all variants and corresponding parameters defined for importing IEEE XES files.

Variant Parameter Key Type Default Description
Iterparse (ITERPARSE) TIMESTAMP_SORT boolean False If True, the log is sorted by timestamp.
TIMESTAMP_KEY string 'time:timestamp' If TIMESTAMP_SORT is True, this event attribute key is used to read the timestamps.
REVERSE_SORT boolean False If True, the sorting is inverted.
INSERT_TRACE_INDICES boolean False If True, trace indices are added as an event attribute for each event
MAX_TRACES integer 1000000000 Maximum number of traces to import from the log
Line-By-Line (LINE_BY_LINE) TIMESTAMP_SORT boolean False (Same as Iterparse)
TIMESTAMP_KEY string 'time:timestamp' (Same as Iterparse)
REVERSE_SORT boolean False (Same as Iterparse)
INSERT_TRACE_INDICES boolean False (Same as Iterparse)
MAX_TRACES integer 1000000000 (Same as Iterparse)
MAX_BYTES integer 100000000000 Maximum number of bytes to read
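
As a small sketch of the parameter mechanism described above, the line-by-line variant can be selected and restricted to the first 1000 traces as follows (the variant and parameter names are taken from the table above):

from pm4py.objects.log.importer.xes import importer as xes_importer

variant = xes_importer.Variants.LINE_BY_LINE
parameters = {variant.value.Parameters.MAX_TRACES: 1000}
log = xes_importer.apply('<path_to_xes_file>', variant=variant, parameters=parameters)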

Importing CSV files

Apart from the IEEE XES standard, a lot of event logs are actually stored in CSV files. In general, there are two ways to deal with CSV files in PM4Py:

  • Import the CSV into a pandas DataFrame; In general, most existing algorithms in PM4Py are coded to be flexible in terms of their input, i.e., if a certain event log object is provided that is not in the right form, we translate it to the appropriate form for you. Hence, after importing a dataframe, most algorithms are directly able to work with the data frame.
  • Convert the CSV into an event log object (similar to the result of the IEEE XES importer presented in the previous section); In this case, the first step is to import the CSV file using pandas (similar to the previous bullet) and subsequently converting it to the event log object. In the remainder of this section, we briefly highlight how to convert a pandas DataFrame to an event log. Note that most algorithms use the same type of conversion, in case a given event data object is not of the right type.

To convert objects in PM4Py, there is a dedicated package, i.e., objects.conversion. The conversion package allows one to convert an object of a certain type to a new object of a different type (if such a conversion is applicable). Within the conversion package, a standard naming convention is applied, i.e., the type of the input object defines the package in which the code resides. Thus, since we assume that the imported DataFrame represents an event log, we find the appropriate conversion in the objects.conversion.log package.


The example code below shows how to convert a CSV file into the PM4Py internal event data object types. By default, the converter converts the dataframe to an Event Log object (i.e., not an Event Stream).

We suggest sorting the dataframe by its timestamp column. In the example below, '<timestamp_column>' is a placeholder for the name of the timestamp column. This ensures that events are sorted by their timestamp.

import pandas as pd
from pm4py.objects.log.util import dataframe_utils
from pm4py.objects.conversion.log import converter as log_converter

log_csv = pd.read_csv('<path_to_csv_file.csv>', sep=',')
log_csv = dataframe_utils.convert_timestamp_columns_in_df(log_csv)
log_csv = log_csv.sort_values('<timestamp_column>')
event_log = log_converter.apply(log_csv)

Note that the example code above does not directly work in a lot of cases. There are a few reasons for this. First of all, a CSV file, by definition, is closer to an Event Stream, i.e., it represents a sequence of events. Since an event log 'glues' together the events that belong to the same case, i.e., into a trace of events, we need to specify to the converter which attribute to use for this. The parameter to set in the converter is the CASE_ID_KEY parameter. Its default value is 'case:concept:name'. Hence, when the input event data stored in a CSV file has a column named case:concept:name, that column is used to define traces.


Therefore, let us consider a very simple example event log and assume it is stored as a csv-file:

case activity timestamp clientID
1 register request 20200422T0455 1337
2 register request 20200422T0457 1479
1 submit payment 20200422T0503 1337

In this small example table, we observe four columns, i.e., case, activity, timestamp and clientID. Clearly, when importing the data and converting it to an Event Log object, we aim to combine all rows (events) with the same value for the case column together. Hence, the default value of the CASE_ID_KEY parameter is not set to the right value. Another interesting phenomenon in the example data is the fourth column, i.e., clientID. In fact, the client ID is an attribute that does not change over the course of the execution of a process instance, i.e., it is a case-level attribute. PM4Py allows us to specify that a column actually describes a case-level attribute (under the assumption that the attribute does not change during the execution of a process). However, for this, we need to specify an additional parameter, i.e., the CASE_ATTRIBUTE_PREFIX parameter, with default value 'case:'.


The example code below shows how to convert the previously exemplified csv data file. After loading the csv file of the example table, we rename the clientID column to case:clientID (this is a standard pandas operation). Then, we specify that the column identifying the case identifier attribute is the column with name 'case'. Note that the full parameter path is log_converter.Variants.TO_EVENT_LOG.value.Parameters.CASE_ID_KEY.

import pandas as pd
from pm4py.objects.conversion.log import converter as log_converter

log_csv = pd.read_csv('<path_to_csv_file.csv>', sep=',')
log_csv.rename(columns={'clientID': 'case:clientID'}, inplace=True)
parameters = {log_converter.Variants.TO_EVENT_LOG.value.Parameters.CASE_ID_KEY: 'case'}
event_log = log_converter.apply(log_csv, parameters=parameters, variant=log_converter.Variants.TO_EVENT_LOG)

In case we would like to use a different prefix for the case-level attributes, e.g., 'caseAttr', we can do so by mapping the CASE_ATTRIBUTE_PREFIX (full path: log_converter.Variants.TO_EVENT_LOG.value.Parameters.CASE_ATTRIBUTE_PREFIX) to the value 'caseAttr'. Note that in the call to the converter, in this case, we explicitly set the variant to be used, e.g., log_converter.Variants.TO_EVENT_LOG. Finally, note that any type of data format that can be parsed to a Pandas DataFrame is supported by PM4Py.
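
The following is a minimal sketch of this, assuming the case-level columns of the dataframe have been renamed to use the caseAttr prefix (e.g., clientID renamed to caseAttrclientID); both parameter paths are the ones mentioned above.

from pm4py.objects.conversion.log import converter as log_converter

parameters = {
    log_converter.Variants.TO_EVENT_LOG.value.Parameters.CASE_ID_KEY: 'case',
    log_converter.Variants.TO_EVENT_LOG.value.Parameters.CASE_ATTRIBUTE_PREFIX: 'caseAttr'
}
event_log = log_converter.apply(log_csv, parameters=parameters, variant=log_converter.Variants.TO_EVENT_LOG)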

Converting Event Data

In this section, we describe how to convert event log objects from one object type to another object type. As mentioned in the previous section, the conversion functionality of event logs is located in pm4py.objects.conversion.log.converter. There are three objects, which we are able to 'switch' between, i.e., Event Log, Event Stream and Data Frame objects. Please refer to the previous code snippet for an example of applying log conversion (applied when importing a CSV object). Finally, note that most algorithms internally use the converters, in order to be able to handle an input event data object of any form. In such a case, the default parameters are used.

Variant Parameter Key Type Default Description
TO_EVENT_LOG STREAM_POST_PROCESSING boolean False Removes events that have no type information.
CASE_ATTRIBUTE_PREFIX string 'case:' Any attribute (column in case of DF) with the prefix 'case:' is stored as a trace attribute.
CASE_ID_KEY string 'case:concept:name' Attribute (column in case of DF) that needs to be used to define traces.
DEEP_COPY boolean False If set to True objects will be created using a deep-copy (if applicable). Avoids side-effects (specifically when converting an Event Stream to an Event Log).
TO_EVENT_STREAM STREAM_POST_PROCESSING boolean False (Same as TO_EVENT_LOG)
CASE_ATTRIBUTE_PREFIX string 'case:' Any trace attribute (in case of converting an Event Log to an Event Stream object) will get this prefix. Not applicable if we translate a DataFrame to an Event Stream object.
DEEP_COPY boolean False (Same as TO_EVENT_LOG)
TO_DATA_FRAME CASE_ATTRIBUTE_PREFIX string 'case:' (Same as TO_EVENT_STREAM; will only be applied if input is an Event Log object, i.e., which will first be translated to an Event Stream Object.)
DEEP_COPY boolean False (Same as TO_EVENT_STREAM)
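
As a minimal sketch of the three conversion targets listed above (using default parameters), where obj is assumed to be any of the three supported log object types:

from pm4py.objects.conversion.log import converter as log_converter

event_log = log_converter.apply(obj, variant=log_converter.Variants.TO_EVENT_LOG)
event_stream = log_converter.apply(obj, variant=log_converter.Variants.TO_EVENT_STREAM)
dataframe = log_converter.apply(obj, variant=log_converter.Variants.TO_DATA_FRAME)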

Exporting IEEE XES files

Exporting an Event Log object to an IEEE XES file is fairly straightforward in PM4Py. Consider the example code fragment below, which depicts this functionality.

from pm4py.objects.log.exporter.xes import exporter as xes_exporter
xes_exporter.apply(log, '<path_to_exported_log.xes>')

In the example, the log object is assumed to be an Event Log object. The exporter also accepts an Event Stream or DataFrame object as an input. However, the exporter will first convert the given input object into an Event Log. Hence, in this case, standard parameters for the conversion are used. Thus, if the user wants more control, it is advisable to apply the conversion to Event Log, prior to exporting.

Variant Parameter Key Type Default Description
ETree (ETREE) COMPRESS boolean False If True, the log is stored as a 'xes.gz' file.
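
A sketch of a compressed export follows; it assumes that the COMPRESS parameter of the table above is exposed as Variants.ETREE.value.Parameters.COMPRESS, following the usual PM4Py parameter convention.

from pm4py.objects.log.exporter.xes import exporter as xes_exporter

# assumption: COMPRESS is reachable via the ETREE variant's Parameters definition
parameters = {xes_exporter.Variants.ETREE.value.Parameters.COMPRESS: True}
xes_exporter.apply(log, '<path_to_exported_log.xes>', variant=xes_exporter.Variants.ETREE, parameters=parameters)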

Exporting logs to CSV

To export an event log to a csv-file, PM4Py uses Pandas. Hence, an event log is first converted to a Pandas Data Frame, after which it is written to disk.

import pandas as pd
from pm4py.objects.conversion.log import converter as log_converter
dataframe = log_converter.apply(log, variant=log_converter.Variants.TO_DATA_FRAME)
dataframe.to_csv('<path_to_csv_file.csv>')
                                

In case an event log object is provided that is not a dataframe, i.e., an Event Log or Event Stream, the conversion is applied using the default parameter values, i.e., as presented in the Converting Event Data section. Note that exporting event data to a csv file has no parameters. In case more control over the conversion is needed, please apply a conversion to dataframe first, prior to exporting to csv.

I/O with Other File Types

At this moment, I/O of any format supported by Pandas (dataframes) is implicitly supported. As long as data can be loaded into a Pandas dataframe, PM4Py is reasonably able to work with such files.
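
As a hypothetical example, a Parquet file (or any other format Pandas can load) can be imported and then converted with the standard log converter:

import pandas as pd
from pm4py.objects.conversion.log import converter as log_converter

# '<path_to_file.parquet>' is a placeholder; any Pandas-supported format works the same way
dataframe = pd.read_parquet('<path_to_file.parquet>')
event_log = log_converter.apply(dataframe, variant=log_converter.Variants.TO_EVENT_LOG)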

Generic Event Data Manipulation

Since Event Logs and Event Streams are iterables (note: this does not apply to dataframes), they can be used in combination with lambda functions. However, as they contain more information (such as log-level attributes), directly applying, e.g., a filter does not work. Therefore, a utility package is available that wraps around filtering/mapping/sorting in order to combine this functionality with Event Logs. The code is located in pm4py.objects.log.util.func.

Consider the code fragment below, which first imports an event log and then filters out each trace with a length shorter than three. The func.filter_ function mimics the built-in Python function filter(). However, it returns the filtered list of traces, wrapped in an Event Log (or Event Stream) object.

from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.objects.log.util import func

log = xes_importer.apply('<path_to_imported_log.xes>')
log = func.filter_(lambda t: len(t) > 2, log)

Apart from the filter_-function, the pm4py.objects.log.util.func package provides a map_ and a sort_ function.

Filtering Event Data

PM4Py also has various specific methods to filter an event log.

Since PM4Py 1.5.x, we offer a simplified interface to filter logs. This provides a restricted set of choices in comparison to the normal interface.



Function Description
log = pm4py.filter_attribute_values(log, attribute, values, how='cases', positive=True) Filter a log object on the values of some attribute

Parameters:
log - Log object
attribute - Attribute to filter
values - List containing the admitted (or forbidden) values
how - Specifies how the filter should be applied ('cases' keeps the cases where at least one occurrence of the given values happens; 'events' keeps the matching events, eventually trimming the cases)
positive - Boolean specifying if the values should be kept or removed

Returns:
Filtered log object
log = pm4py.filter_end_activities(log, admitted_end_activities) Filter cases having an end activity in the provided list.

Parameters:
log - Log object
admitted_end_activities - List of admitted end activities

Returns:
Filtered log object
log = pm4py.filter_start_activities(log, admitted_start_activities) Filter cases having a start activity in the provided list.

Parameters:
log - Log object
admitted_start_activities - List of admitted start activities

Returns:
Filtered log object
log = pm4py.filter_timestamp(log, dt1, dt2, how='events') Filter events/cases on a time interval.

Parameters:
log - Log object
dt1 - Left extreme of the interval
dt2 - Right extreme of the interval
how - Modality of filtering (events, traces_contained, traces_intersecting)

Returns:
Filtered log object
log = pm4py.filter_paths(log, allowed_paths, positive=True) Filter a log on a specified list of paths.

Parameters:
log - Log object
allowed_paths - List of allowed/forbidden paths (list of tuples containing two activities)
positive - Parameter that says whether the paths should be kept/removed

Returns:
Filtered log object
log = pm4py.filter_variants(log, admitted_variants) Filter a log on a specified set of variants.

Parameters:
log - Log object
admitted_variants - List of variants to filter

Returns:
Filtered log object
log = pm4py.filter_variants_percentage(log, percentage=0.8) Filter a log keeping only the most frequent variants, up to the specified percentage.

Parameters:
log - Log object
percentage - Percentage of admitted variants

Returns:
Filtered log object
Function Description
attributes_list = pm4py.get_attributes(log) Returns the attributes at the event level of the log.

Parameters:
log - Log object

Returns:
List of attributes contained in the log.
attribute_values = pm4py.get_attribute_values(log, attribute) Returns the values for a specified attribute.

Parameters:
log - Log object
attribute - Attribute

Returns:
Dictionary of values along with their count.
end_activities = pm4py.get_end_activities(log) Returns the end activities of a log.

Parameters:
log - Log object

Returns:
Dictionary of end activities along with their count.
start_activities = pm4py.get_start_activities(log) Returns the start activities from a log object.

Parameters:
log - Log object

Returns:
Dictionary of start activities along with their count.
variants = pm4py.get_variants(log) Gets the variants from the log.

Parameters:
log - Log object

Returns:
Dictionary of variants along with their count.
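
A minimal sketch based on the simplified interface listed above, combining a statistics function with a filter ("pay compensation" is used later in this section as an example end activity):

import pm4py

log = pm4py.read_xes('<path_to_xes_file.xes>')
end_activities = pm4py.get_end_activities(log)   # dictionary of end activities with counts
filtered_log = pm4py.filter_end_activities(log, ["pay compensation"])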

Filtering on timeframe

In the following paragraphs, various methods for filtering on time frames are presented. For each of the methods, both the event log and the Pandas dataframe variants are shown.

One might be interested in keeping only the traces that are contained in a specific interval, e.g., between 09 March 2011 and 18 January 2012. The first code snippet works for a log object, the second one for a dataframe object.

from pm4py.algo.filtering.log.timestamp import timestamp_filter
filtered_log = timestamp_filter.filter_traces_contained(log, "2011-03-09 00:00:00", "2012-01-18 23:59:59")
from pm4py.algo.filtering.pandas.timestamp import timestamp_filter
df_timest_intersecting = timestamp_filter.filter_traces_intersecting
               (dataframe, "2011-03-09 00:00:00", "2012-01-18 23:59:59",
                                          parameters={timestamp_filter.Parameters.CASE_ID_KEY: "case:concept:name",
                                                      timestamp_filter.Parameters.TIMESTAMP_KEY: "time:timestamp"})

However, it is also possible to keep the traces that are intersecting with a time interval. The first example is again for log objects, the second one for dataframe objects.

from pm4py.algo.filtering.log.timestamp import timestamp_filter
filtered_log = timestamp_filter.filter_traces_intersecting(log, "2011-03-09 00:00:00", "2012-01-18 23:59:59")
from pm4py.algo.filtering.pandas.timestamp import timestamp_filter
df_timest_intersecting = timestamp_filter.filter_traces_intersecting(dataframe, "2011-03-09 00:00:00", "2012-01-18 23:59:59",
                                          parameters={timestamp_filter.Parameters.CASE_ID_KEY: "case:concept:name",
                                                      timestamp_filter.Parameters.TIMESTAMP_KEY: "time:timestamp"})

Until now, only trace-based techniques have been discussed. However, there is also a method to keep the events that are contained in a specific timeframe. As previously mentioned, the first code snippet shows how to apply this technique on log objects, while the second snippet shows how to apply it on dataframe objects.

from pm4py.algo.filtering.log.timestamp import timestamp_filter
filtered_log_events = timestamp_filter.apply_events(log, "2011-03-09 00:00:00", "2012-01-18 23:59:59")
from pm4py.algo.filtering.pandas.timestamp import timestamp_filter
df_timest_events = timestamp_filter.apply_events(dataframe, "2011-03-09 00:00:00", "2012-01-18 23:59:59",
                                          parameters={timestamp_filter.Parameters.CASE_ID_KEY: "case:concept:name",
                                                      timestamp_filter.Parameters.TIMESTAMP_KEY: "time:timestamp"})

Filter on case performance

This filter keeps only the traces whose duration lies inside a specified interval. In the examples, traces lasting between 1 and 10 days are kept. Note that the time parameters are given in seconds. The first code snippet applies this technique on a log object, the second one on a dataframe object.

from pm4py.algo.filtering.log.cases import case_filter
filtered_log = case_filter.filter_case_performance(log, 86400, 864000)
from pm4py.algo.filtering.pandas.cases import case_filter
df_cases = case_filter.filter_case_performance(dataframe, min_case_performance=86400, max_case_performance=864000,
                                          parameters={case_filter.Parameters.CASE_ID_KEY: "case:concept:name",
                                                      case_filter.Parameters.TIMESTAMP_KEY: "time:timestamp"})

Filter on start activities

In general, PM4Py offers two methods to filter a log or a dataframe on start activities. In the first method, a list of start activities has to be specified, and the filter keeps only the cases starting with one of these activities. In the second method, a decreasing factor is used; an explanation is provided below.

Suppose the following start activities and their respective numbers of occurrences.
Activity Number of occurrences
A 1000
B 700
C 300
D 50
Assume DECREASING_FACTOR to be 0.6. The most frequent start activity is kept, A in this case. Then, the number of occurrences of the next frequent activity is divided by the number of occurrences of this activity. Therefore, the computation is 700/1000=0.7. Since 0.7>0.6, B is kept as admissible start activity. In the next step, the number of occurrences of activity C and B are compared. In this case 300/700≈0.43. Since 0.43<0.6, C is not accepted as admissible start activity and the method stops here.

First of all, it might be necessary to know the start activities; therefore, corresponding code snippets are provided. Subsequently, an example of filtering is given. The first snippet works with a log object, the second one with a dataframe. log_start is a dictionary that contains the activities as keys and their numbers of occurrences as values.

from pm4py.algo.filtering.log.start_activities import start_activities_filter
log_start = start_activities_filter.get_start_activities(log)
filtered_log = start_activities_filter.apply(log, ["S1"]) #suppose "S1" is the start activity you want to filter on
from pm4py.algo.filtering.pandas.start_activities import start_activities_filter
log_start = start_activities_filter.get_start_activities(dataframe)
df_start_activities = start_activities_filter.apply(dataframe, ["S1"],
                                          parameters={start_activities_filter.Parameters.CASE_ID_KEY: "case:concept:name",
                                                      start_activities_filter.Parameters.ACTIVITY_KEY: "concept:name"}) #suppose "S1" is the start activity you want to filter on
						

As mentioned earlier, there is also a method that aims to keep the frequent start activities. Again, the first snippet is about a log object, the second is about a dataframe object. The default value for DECREASING_FACTOR is 0.6.

from pm4py.algo.filtering.log.start_activities import start_activities_filter
log_af_sa = start_activities_filter.apply_auto_filter(log, parameters={start_activities_filter.Parameters.DECREASING_FACTOR: 0.6})
from pm4py.algo.filtering.pandas.start_activities import start_activities_filter
df_auto_sa = start_activities_filter.apply_auto_filter(dataframe, parameters={start_activities_filter.Parameters.DECREASING_FACTOR: 0.6})

Filter on end activities

In general, PM4Py offers two methods to filter a log or a dataframe on end activities. In the first method, a list of end activities has to be specified, and the filter keeps only the cases ending with one of these activities. In the second method, a decreasing factor is used; an explanation can be found in the start activities section.

This filter keeps only the traces whose end activity is among a set of specified activities. First of all, it might be necessary to know the end activities; therefore, code snippets are provided. Subsequently, an example of filtering is given. Here, for the dataframe filtering, a further attribute specification is possible: case:concept:name is in this case the column of the dataframe containing the case ID, and concept:name is the column containing the activity.

from pm4py.algo.filtering.log.end_activities import end_activities_filter
end_activities = end_activities_filter.get_end_activities(log)
filtered_log = end_activities_filter.apply(log, ["pay compensation"])
from pm4py.algo.filtering.pandas.end_activities import end_activities_filter
end_activities = end_activities_filter.get_end_activities(df)
filtered_df = end_activities_filter.apply(df, ["pay compensation"],
                                          parameters={end_activities_filter.Parameters.CASE_ID_KEY: "case:concept:name",
                                                      end_activities_filter.Parameters.ACTIVITY_KEY: "concept:name"})

Filter on variants

A variant is a set of cases that share the same control-flow perspective, i.e., a set of cases that share the same classified events (activities) in the same order. In this section, for each method we first focus on log objects and then continue with the dataframe.

To get the list of variants contained in a given log, the following code can be used. The first snippet is for a log object, the second for a dataframe. The result is expressed as a dictionary having the variant as key and the list of cases sharing the variant as value.

from pm4py.algo.filtering.log.variants import variants_filter
variants = variants_filter.get_variants(log)
from pm4py.statistics.traces.pandas import case_statistics
variants = case_statistics.get_variants_df(df,
                                          parameters={case_statistics.Parameters.CASE_ID_KEY: "case:concept:name",
                                                      case_statistics.Parameters.ACTIVITY_KEY: "concept:name"})

If the number of occurrences of the variants is of interest, the following code retrieves a list of variants along with their count (i.e., a list of dictionaries, each reporting a variant and its number of occurrences).

from pm4py.statistics.traces.log import case_statistics
variants_count = case_statistics.get_variant_statistics(log)
variants_count = sorted(variants_count, key=lambda x: x['count'], reverse=True)
from pm4py.statistics.traces.pandas import case_statistics
variants_count = case_statistics.get_variant_statistics(df,
                                          parameters={case_statistics.Parameters.CASE_ID_KEY: "case:concept:name",
                                                      case_statistics.Parameters.ACTIVITY_KEY: "concept:name",
                                                      case_statistics.Parameters.TIMESTAMP_KEY: "time:timestamp"})
variants_count = sorted(variants_count, key=lambda x: x['case:concept:name'], reverse=True)

To filter based on variants, assume that variants is a list, whereby each element is a variant (expressed in the same way as in the variants retrieval method). The first method can be applied on log objects, the second on dataframe objects. Note that the variants given in variants are kept.

from pm4py.algo.filtering.log.variants import variants_filter
filtered_log1 = variants_filter.apply(log, variants)
from pm4py.algo.filtering.pandas.variants import variants_filter
filtered_df1 = variants_filter.apply(df, variants,
                                          parameters={variants_filter.Parameters.CASE_ID_KEY: "case:concept:name",
                                                      variants_filter.Parameters.ACTIVITY_KEY: "concept:name"})

Contrary to the previous example, suppose you want to filter the given variants out. Again, let variants be a list, whereby each element is a variant.

filtered_log2 = variants_filter.apply(log, variants, parameters={variants_filter.Parameters.POSITIVE: False})
filtered_df2 = variants_filter.apply(df, variants,
                                          parameters={variants_filter.Parameters.POSITIVE: False, variants_filter.Parameters.CASE_ID_KEY: "case:concept:name",
                                                      variants_filter.Parameters.ACTIVITY_KEY: "concept:name"})

A filter that automatically keeps the most common variants can be applied through the apply_auto_filter method. This method accepts a parameter called DECREASING_FACTOR (default value 0.6; further details are provided in the start activities filter).

auto_filtered_log = variants_filter.apply_auto_filter(log)
auto_filtered_df = variants_filter.apply_auto_filter(df,
                                          parameters={variants_filter.Parameters.POSITIVE: False, variants_filter.Parameters.CASE_ID_KEY: "case:concept:name",
                                                      variants_filter.Parameters.ACTIVITY_KEY: "concept:name"})
On the event log objects, a filter on the variants percentage can be applied as shown in the following example. The percentage of variants to keep must be specified in the percentage parameter as a number between 0 (only the most frequent variant is kept) and 1 (all the variants are kept).

from pm4py.algo.filtering.log.variants import variants_filter

filtered_log = variants_filter.filter_log_variants_percentage(log, percentage=0.5)
                            

Filter on attributes values

Filtering on attribute values permits, alternatively, to:

  • Keep cases that contain at least one event with one of the given attribute values
  • Remove cases that contain an event with one of the given attribute values
  • Keep events (trimming traces) that have one of the given attribute values
  • Remove events (trimming traces) that have one of the given attribute values

Examples of attributes are the resource (generally contained in the org:resource attribute) and the activity (generally contained in the concept:name attribute). As noted before, the first method can be applied on log objects, the second on dataframe objects.

To get the list of resources and activities contained in the log, the following code could be used.

from pm4py.algo.filtering.log.attributes import attributes_filter
activities = attributes_filter.get_attribute_values(log, "concept:name")
resources = attributes_filter.get_attribute_values(log, "org:resource")
from pm4py.algo.filtering.pandas.attributes import attributes_filter
activities = attributes_filter.get_attribute_values(df, attribute_key="concept:name")
resources = attributes_filter.get_attribute_values(df, attribute_key="org:resource")

To filter traces containing/not containing a given list of resources, the following code could be used.


tracefilter_log_pos = attributes_filter.apply(log, ["Resource10"],
                                          parameters={attributes_filter.Parameters.ATTRIBUTE_KEY: "org:resource", attributes_filter.Parameters.POSITIVE: True})
tracefilter_log_neg = attributes_filter.apply(log, ["Resource10"],
                                          parameters={attributes_filter.Parameters.ATTRIBUTE_KEY: "org:resource", attributes_filter.Parameters.POSITIVE: False})

df_traces_pos = attributes_filter.apply(df, ["Resource10"],
                                          parameters={attributes_filter.Parameters.CASE_ID_KEY: "case:concept:name", attributes_filter.Parameters.ATTRIBUTE_KEY: "org:resource", attributes_filter.Parameters.POSITIVE: True})
df_traces_neg = attributes_filter.apply(df, ["Resource10"],
                                          parameters={attributes_filter.Parameters.CASE_ID_KEY: "case:concept:name", attributes_filter.Parameters.ATTRIBUTE_KEY: "org:resource", attributes_filter.Parameters.POSITIVE: False})

To automatically apply a filter on event attributes (trimming traces and keeping only events whose attribute has a frequent value), the apply_auto_filter method is provided. The method accepts as parameters the attribute name and the DECREASING_FACTOR (default 0.6; an explanation can be found in the start activities filter section).

from pm4py.algo.filtering.log.attributes import attributes_filter
filtered_log = attributes_filter.apply_auto_filter(log, parameters={
    attributes_filter.Parameters.ATTRIBUTE_KEY: "concept:name", attributes_filter.Parameters.DECREASING_FACTOR: 0.6})
from pm4py.algo.filtering.pandas.attributes import attributes_filter
filtered_df = attributes_filter.apply_auto_filter(df, parameters={
    attributes_filter.Parameters.CASE_ID_KEY: "case:concept:name", attributes_filter.Parameters.ATTRIBUTE_KEY: "concept:name", attributes_filter.Parameters.DECREASING_FACTOR: 0.6})

Filter on numeric attribute values

Filtering on numeric attribute values provides options that are similar to filtering on string attribute values (which we already considered).

First, we import the log. Subsequently, we want to keep only the events with an amount between 34 and 36. An additional filter aims to keep only the cases with at least one event satisfying the specified amount. The filter on cases provides the option to specify up to two attributes that are checked on the events that shall satisfy the numeric range. For example, if we are interested in cases having an event with activity Add penalty and an amount between 34 and 500, a code snippet is also provided.

import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests", "input_data", "roadtraffic100traces.xes"))

from pm4py.algo.filtering.log.attributes import attributes_filter
filtered_log_events = attributes_filter.apply_numeric_events(log, 34, 36,
                                             parameters={attributes_filter.Parameters.ATTRIBUTE_KEY: "amount"})

filtered_log_cases = attributes_filter.apply_numeric(log, 34, 36,
                                             parameters={attributes_filter.Parameters.ATTRIBUTE_KEY: "amount"})

filtered_log_cases = attributes_filter.apply_numeric(log, 34, 500,
                                             parameters={attributes_filter.Parameters.ATTRIBUTE_KEY: "amount",
                                                         attributes_filter.Parameters.STREAM_FILTER_KEY1: "concept:name",
                                                         attributes_filter.Parameters.STREAM_FILTER_VALUE1: "Add penalty"})

The same methods can also be applied on dataframes.

import os
from pm4py.objects.log.adapters.pandas import csv_import_adapter
df = csv_import_adapter.import_dataframe_from_path(os.path.join("tests", "input_data", "roadtraffic100traces.csv"))

from pm4py.algo.filtering.pandas.attributes import attributes_filter
filtered_df_events = attributes_filter.apply_numeric_events(df, 34, 36,
                                             parameters={attributes_filter.Parameters.CASE_ID_KEY: "case:concept:name", attributes_filter.Parameters.ATTRIBUTE_KEY: "amount"})

filtered_df_cases = attributes_filter.apply_numeric(df, 34, 36,
                                             parameters={attributes_filter.Parameters.CASE_ID_KEY: "case:concept:name", attributes_filter.Parameters.ATTRIBUTE_KEY: "amount"})

filtered_df_cases = attributes_filter.apply_numeric(df, 34, 500,
                                             parameters={attributes_filter.Parameters.CASE_ID_KEY: "case:concept:name", attributes_filter.Parameters.ATTRIBUTE_KEY: "amount",
                                                         attributes_filter.Parameters.STREAM_FILTER_KEY1: "concept:name",
                                                         attributes_filter.Parameters.STREAM_FILTER_VALUE1: "Add penalty"})

Process Discovery

Since PM4Py 1.5.x, we offer a simplified interface for process discovery. This provides a restricted set of choices in comparison to the normal interface. Moreover, we offer a simplified interface to visualize and save the visualizations of the process models.





Function Description
petri_net, initial_marking, final_marking = pm4py.discover_petri_net_alpha(log) Discovers a Petri net using the Alpha Miner.

Parameters:
log - Event log

Returns:
  • petri_net: Petri net
  • initial_marking: Initial marking
  • final_marking: Final marking
petri_net, initial_marking, final_marking = pm4py.discover_petri_net_alpha_plus(log) Discovers a Petri net using the Alpha+ algorithm.

Parameters:
log - Event log

Returns:
  • petri_net: Petri net
  • initial_marking: Initial marking
  • final_marking: Final marking
petri_net, initial_marking, final_marking = pm4py.discover_petri_net_heuristics(log) Discovers a Petri net using the Heuristics Miner.

Parameters:
log - Event log
dependency_threshold - Dependency threshold (default: 0.5)
and_threshold - AND threshold (default: 0.65)
loop_two_threshold - Loop two threshold (default: 0.5)

Returns:
  • petri_net: Petri net
  • initial_marking: Initial marking
  • final_marking: Final marking
petri_net, initial_marking, final_marking = pm4py.discover_petri_net_inductive(log) Discovers a Petri net using the Inductive Miner algorithm.

Parameters:
log - Event log
noise_threshold - Noise threshold (default: 0.0)

Returns:
  • petri_net: Petri net
  • initial_marking: Initial marking
  • final_marking: Final marking
process_tree = pm4py.discover_tree_inductive(log) Discovers a process tree using the Inductive Miner algorithm.

Parameters:
log - Event log
noise_threshold - Noise threshold (default: 0.0)

Returns:
Process tree
dfg, start_activities, end_activities = pm4py.discover_dfg(log) Discovers a DFG from a log.

Parameters:
log - Event log

Returns:
  • dfg: DFG
  • start_activities: Start activities
  • end_activities: End activities
heuristics_net = pm4py.discover_heuristics_net(log, dependency_threshold=0.5, and_threshold=0.65, loop_two_threshold=0.5) Discovers a heuristics net.

Parameters:
log - Event log
dependency_threshold - Dependency threshold (default: 0.5)
and_threshold - AND threshold (default: 0.65)
loop_two_threshold - Loop two threshold (default: 0.5)

Returns:
Heuristics net
Function Description
pm4py.view_petri_net(petri_net, initial_marking, final_marking, format='png') Visualize a Petri net.

Parameters:
petri_net - Petri net
initial_marking - Initial marking
final_marking - Final marking
format - Format of the output picture (default: png)
pm4py.view_process_tree(tree, format='png') Visualize a process tree.

Parameters:
tree - Process tree
format - Format of the visualization (default: png)
pm4py.view_heuristics_net(heu_net, format='png') Visualize a heuristics net.

Parameters:
heu_net - Heuristics net
format - Format of the visualization (default: png)
pm4py.view_dfg(dfg, start_activities, end_activities, format='png', log=None) Visualize a (composite) DFG.

Parameters:
dfg - DFG object
start_activities - Start activities
end_activities - End activities
format - Format of the visualization (default: png)
log - Log object (if provided, is used to decorate the frequency of activities)
Function Description
pm4py.save_vis_petri_net(petri_net, initial_marking, final_marking, file_path) Saves a Petri net visualization to a file.

Parameters:
petri_net - Petri net
initial_marking - Initial marking
final_marking - Final marking
file_path - Destination path
pm4py.save_vis_process_tree(tree, file_path) Saves the visualization of a process tree.

Parameters:
tree - Process tree
file_path - Destination path
pm4py.save_vis_heuristics_net(heu_net, file_path) Saves the visualization of a heuristics net.

Parameters:
heu_net - Heuristics net
file_path - Destination path
pm4py.save_vis_dfg(dfg, start_activities, end_activities, file_path, log=None) Saves a DFG visualization to a file.

Parameters:
dfg - DFG object
start_activities - Start activities
end_activities - End activities
file_path - Destination path
log - Log object (if provided, is used to decorate the frequency of activities)
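
A minimal sketch of the simplified discovery and visualization interface listed above (file paths are placeholders):

import pm4py

log = pm4py.read_xes('<path_to_xes_file.xes>')
net, initial_marking, final_marking = pm4py.discover_petri_net_inductive(log)
pm4py.view_petri_net(net, initial_marking, final_marking, format='png')
pm4py.save_vis_petri_net(net, initial_marking, final_marking, 'inductive_model.png')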

Process discovery algorithms aim to find a suitable process model that describes the order of events/activities that are executed during a process execution.

In the following, we provide an overview of the advantages and disadvantages of the mining algorithms.

Alpha
  • Cannot handle loops of length one and length two
  • Invisible and duplicated tasks cannot be discovered
  • Discovered model might not be sound
  • Weak against noise

Alpha+
  • Can handle loops of length one and length two
  • Invisible and duplicated tasks cannot be discovered
  • Discovered model might not be sound
  • Weak against noise

Heuristic
  • Takes frequency into account
  • Detects short loops
  • Does not guarantee a sound model

Inductive
  • Can handle invisible tasks
  • Model is sound
  • Most used process mining algorithm

Alpha Miner

The Alpha Miner is one of the best-known process discovery algorithms and is able to find:

  • A Petri net model where all the transitions are visible, unique, and correspond to classified events (for example, to activities).
  • An initial marking that describes the status of the Petri net model when an execution starts.
  • A final marking that describes the status of the Petri net model when an execution ends.

We provide an example where a log is read, the Alpha algorithm is applied and the Petri net along with the initial and the final marking are found. The log we take as input is the running-example.xes.

First, the log has to be imported.

import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests","input_data","running-example.xes"))

Subsequently, the Alpha Miner is applied.

from pm4py.algo.discovery.alpha import algorithm as alpha_miner
net, initial_marking, final_marking = alpha_miner.apply(log)
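
The resulting Petri net can be visualized with the standard Petri net visualizer (also used later in this documentation); a minimal sketch:

from pm4py.visualization.petrinet import visualizer as pn_visualizer

gviz = pn_visualizer.apply(net, initial_marking, final_marking)
pn_visualizer.view(gviz)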

Inductive Miner

In PM4Py, we offer an implementation of the inductive miner (IM), of the inductive miner infrequent (IMf), and of the inductive miner directly-follows (IMd) algorithm.

The basic idea of the Inductive Miner is to detect a 'cut' in the log (e.g., sequential cut, exclusive choice cut, parallel cut, loop cut) and then recurse on the sublogs obtained by applying the cut, until a base case is found. The Directly-Follows variant avoids the recursion on the sublogs and uses the Directly-Follows graph instead.

Inductive Miner models usually make extensive use of hidden transitions, especially for skipping/looping over a portion of the model. Furthermore, each visible transition has a unique label (there are no transitions in the model that share the same label).

Two process models can be derived: Petri Net and Process Tree.

To mine a Petri Net, we provide an example. A log is read, the inductive miner is applied and the Petri net along with the initial and the final marking are found. The log we take as input is the running-example.xes. First, the log is read, then the inductive miner algorithm is applied.

import os
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.algo.discovery.inductive import algorithm as inductive_miner

log = xes_importer.apply(os.path.join("tests","input_data","running-example.xes"))
net, initial_marking, final_marking = inductive_miner.apply(log)

To obtain a process tree, the provided code snippet can be used. The last two lines of code are responsible for the visualization of the process tree.

from pm4py.algo.discovery.inductive import algorithm as inductive_miner
from pm4py.visualization.process_tree import visualizer as pt_visualizer

tree = inductive_miner.apply_tree(log)

gviz = pt_visualizer.apply(tree)
pt_visualizer.view(gviz)

It is also possible to convert a process tree into a petri net.

from pm4py.objects.conversion.process_tree import converter as pt_converter
net, initial_marking, final_marking = pt_converter.apply(tree, variant=pt_converter.Variants.TO_PETRI_NET)

As mentioned in the introduction of this approach, three variants are available in PM4Py:

Variant Description
Variants.IM Produces a model with perfect replay fitness.
Variants.IMf Produces a more precise model, without fitness guarantees, by eliminating some behavior.
Variants.IMd A variant of inductive miner that considers only the directly-follows graph, for maximum performance. However, replay fitness guarantees are lost.

The list of parameters for such variants are:

Variant Parameter Description
Variants.IM Parameters.ACTIVITY_KEY The name of the attribute to be used as activity for process discovery.
Variants.IMf Parameters.ACTIVITY_KEY The name of the attribute to be used as activity for process discovery.
Variants.IMf Parameters.NOISE_THRESHOLD The noise threshold (between 0.0 and 1.0) to be used. Default: 0.2
Variants.IMd Parameters.ACTIVITY_KEY The name of the attribute to be used as activity for process discovery.
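
The following is a sketch of applying the IMf variant with a noise threshold; it assumes the parameter is exposed as Variants.IMf.value.Parameters.NOISE_THRESHOLD, following the parameter convention used elsewhere in this documentation.

from pm4py.algo.discovery.inductive import algorithm as inductive_miner

# assumption: NOISE_THRESHOLD is reachable via the IMf variant's Parameters definition
parameters = {inductive_miner.Variants.IMf.value.Parameters.NOISE_THRESHOLD: 0.2}
net, initial_marking, final_marking = inductive_miner.apply(log, variant=inductive_miner.Variants.IMf,
                                                            parameters=parameters)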

Heuristic Miner

Heuristics Miner is an algorithm that acts on the Directly-Follows Graph, providing ways to handle noise and to find common constructs (dependency between two activities, AND). The output of the Heuristics Miner is a Heuristics Net, i.e., an object that contains the activities and the relationships between them. The Heuristics Net can then be converted into a Petri net.

It is possible to obtain both a Heuristics Net and a Petri net.

To apply the Heuristics Miner and discover a Heuristics Net, it is necessary to first import a log. Then, a Heuristics Net can be found. The numerous possible parameters are listed in the table below.

from pm4py.objects.log.importer.xes import importer as xes_importer
import os
log_path = os.path.join("tests", "compressed_input_data", "09_a32f0n00.xes.gz")
log = xes_importer.apply(log_path)

from pm4py.algo.discovery.heuristics import algorithm as heuristics_miner
heu_net = heuristics_miner.apply_heu(log, parameters={heuristics_miner.Variants.CLASSIC.value.Parameters.DEPENDENCY_THRESH: 0.99})
Parameter name Meaning
DEPENDENCY_THRESH dependency threshold of the Heuristics Miner (default: 0.5)
AND_MEASURE_THRESH AND measure threshold of the Heuristics Miner (default: 0.65)
MIN_ACT_COUNT minimum number of occurrences of an activity to be considered (default: 1)
MIN_DFG_OCCURRENCES minimum number of occurrences of an edge to be considered (default: 1)
DFG_PRE_CLEANING_NOISE_THRESH cleaning threshold of the DFG (in order to remove weaker edges, default 0.05)
LOOP_LENGTH_TWO_THRESH threshold for the loops of length two

To visualize the Heuristics Net, code is also provided below.

from pm4py.visualization.heuristics_net import visualizer as hn_visualizer
gviz = hn_visualizer.apply(heu_net)
hn_visualizer.view(gviz)

To obtain a Petri net based on the Heuristics Miner, the code below can be used. This Petri net can also be visualized.

from pm4py.algo.discovery.heuristics import algorithm as heuristics_miner
net, im, fm = heuristics_miner.apply(log, parameters={heuristics_miner.Variants.CLASSIC.value.Parameters.DEPENDENCY_THRESH: 0.99})

from pm4py.visualization.petrinet import visualizer as pn_visualizer
gviz = pn_visualizer.apply(net, im, fm)
pn_visualizer.view(gviz)

Directly-Follows Graph

Process models modeled using Petri nets have well-defined semantics: a process execution starts from the places included in the initial marking and finishes at the places included in the final marking. In this section, another class of process models, Directly-Follows Graphs, is introduced. Directly-Follows Graphs are graphs where the nodes represent the events/activities in the log, and a directed edge connects two nodes if there is at least one trace in the log where the source event/activity is followed by the target event/activity. On top of these directed edges, it is easy to represent metrics like frequency (counting the number of times the source event/activity is followed by the target event/activity) and performance (some aggregation, for example the mean, of the time elapsed between the two events/activities).

First, we have to import the log. Subsequently, we can extract the Directly-Follows Graph. In addition, code is provided to visualize the Directly-Follows Graph. This visualization is a colored visualization of the Directly-Follows graph that is decorated with the frequency of activities.

import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests","input_data","running-example.xes"))

from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
dfg = dfg_discovery.apply(log)

from pm4py.visualization.dfg import visualizer as dfg_visualization
gviz = dfg_visualization.apply(dfg, log=log, variant=dfg_visualization.Variants.FREQUENCY)
dfg_visualization.view(gviz)

To get a Directly-Follows graph decorated with the performance between the edges, two parameters of the previous code have to be replaced.

from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
from pm4py.visualization.dfg import visualizer as dfg_visualization

dfg = dfg_discovery.apply(log, variant=dfg_discovery.Variants.PERFORMANCE)
gviz = dfg_visualization.apply(dfg, log=log, variant=dfg_visualization.Variants.PERFORMANCE)
dfg_visualization.view(gviz)

To save the obtained DFG, for instance in the SVG format, code is also provided below.

from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
from pm4py.visualization.dfg import visualizer as dfg_visualization

dfg = dfg_discovery.apply(log, variant=dfg_discovery.Variants.PERFORMANCE)
parameters = {dfg_visualization.Variants.PERFORMANCE.value.Parameters.FORMAT: "svg"}
gviz = dfg_visualization.apply(dfg, log=log, variant=dfg_visualization.Variants.PERFORMANCE, parameters=parameters)
dfg_visualization.save(gviz, "dfg.svg")

Convert Directly-Follows Graph to a Workflow Net

The Directly-Follows Graph is the representation of a process provided by many commercial tools. An idea proposed by Sander Leemans is to convert the DFG into a workflow net that perfectly mimics the DFG, in order to be able to perform alignments between the behavior described in the model and the behavior described in the log. This is called DFG mining. The following steps are useful to load the log, calculate the DFG, convert it into a workflow net and perform alignments.

First, we have to import the log. Subsequently, we have to mine the Directly-Follows Graph. This DFG can then be converted to a workflow net.

from pm4py.objects.log.importer.xes import importer as xes_importer
import os
log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))

from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
dfg = dfg_discovery.apply(log)

from pm4py.objects.conversion.dfg import converter as dfg_mining
net, im, fm = dfg_mining.apply(dfg)
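
To complete the steps described above, alignments can then be computed between the log and the mined workflow net; a hedged sketch, assuming the alignments algorithm located in pm4py.algo.conformance.alignments:

from pm4py.algo.conformance.alignments import algorithm as alignments

aligned_traces = alignments.apply_log(log, net, im, fm)
print(aligned_traces[0])  # alignment of the first trace against the workflow net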

Adding information about Frequency/Performance

Similar to the Directly-Follows graph, it is also possible to decorate the Petri net with frequency or performance information. This is done by using a replay technique on the model and then assigning frequency/performance to the paths. The variant parameter of the visualizer specifies which annotation should be used. The values for the variant parameter are the following:

  • pn_visualizer.Variants.WO_DECORATION: This is the default value and indicates that the Petri net is not decorated.
  • pn_visualizer.Variants.FREQUENCY: This indicates that the model should be decorated according to frequency information obtained by applying replay.
  • pn_visualizer.Variants.PERFORMANCE: This indicates that the model should be decorated according to performance (aggregated by mean) information obtained by applying replay.

If the frequency or performance decoration is chosen, it is required to pass the log as a parameter of the visualization (it needs to be replayed).

The code below can be used to obtain the Petri net mined by the Inductive Miner, decorated with frequency information.

from pm4py.visualization.petrinet import visualizer as pn_visualizer
parameters = {pn_visualizer.Variants.FREQUENCY.value.Parameters.FORMAT: "png"}
gviz = pn_visualizer.apply(net, initial_marking, final_marking, parameters=parameters, variant=pn_visualizer.Variants.FREQUENCY, log=log)
pn_visualizer.save(gviz, "inductive_frequency.png")

Classifier

Algorithms implemented in PM4Py assume that events are classified based on their activity name, which is usually reported inside the concept:name event attribute. In some contexts, it is useful to use another event attribute as the activity:

  • Importing an event log from a CSV does not guarantee the presence of a concept:name event attribute
  • Multiple events in a case may refer to different lifecycles of the same activity

The example on the right-hand side shows the specification of an activity key for the Alpha Miner algorithm.

import os
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.algo.discovery.alpha import algorithm as alpha_miner
log = xes_importer.apply(os.path.join("tests","input_data","running-example.xes"))
parameters = {alpha_miner.Variants.ALPHA_CLASSIC.value.Parameters.ACTIVITY_KEY: "concept:name"}
net, initial_marking, final_marking = alpha_miner.apply(log, parameters=parameters)

For logs imported from XES format, a list of fields that could be used in order to classify events and apply Process Mining algorithms is usually reported in the classifiers section. The Standard classifier usually includes the activity name (the concept:name attribute) and the lifecycle (the lifecycle:transition attribute); the Event name classifier includes only the activity name.

In PM4Py, it is assumed that algorithms work on a single activity key. In order to use multiple fields, a new attribute should be inserted for each event as the concatenation of the two.

In the following, retrieval and insertion of a corresponding attribute regarding classifiers are discussed.

The example on the right-hand side demonstrates the retrieval of the classifiers inside a log file, using the receipt.xes log. The print command returns a dictionary, whereby the corresponding classifier attribute is revealed.

import os
from pm4py.objects.log.importer.xes import importer as xes_importer

log = xes_importer.apply(os.path.join("tests","input_data","receipt.xes"))
print(log.classifiers)

To use the classifier Activity classifier and write a new attribute for each event in the log, the following code can be used.

from pm4py.objects.log.util import insert_classifier
log, activity_key = insert_classifier.insert_activity_classifier_attribute(log, "Activity classifier")

Then, as before, the Alpha Miner can be applied on the log specifying the newly inserted activity key.

from pm4py.algo.discovery.alpha import algorithm as alpha_miner
parameters = {alpha_miner.Variants.ALPHA_CLASSIC.value.Parameters.ACTIVITY_KEY: activity_key}
net, initial_marking, final_marking = alpha_miner.apply(log, parameters=parameters)

In the following, a technique is shown to insert a new attribute manually.

In the case the XES specifies no classifiers, and a different field should be used as the activity key, there is the option to specify it manually. For example, in this piece of code we read the receipt.xes log and create a new attribute called customClassifier that is the activity name plus the transition. Subsequently, the Alpha Miner can be applied on this new classifier.

import os
from pm4py.objects.log.importer.xes import importer as xes_importer

log = xes_importer.apply(os.path.join("tests","input_data","receipt.xes"))
for trace in log:
 for event in trace:
  event["customClassifier"] = event["concept:name"] + event["lifecycle:transition"]

from pm4py.algo.discovery.alpha import algorithm as alpha_miner
parameters = {alpha_miner.Variants.ALPHA_CLASSIC.value.Parameters.ACTIVITY_KEY: "customClassifier"}
net, initial_marking, final_marking = alpha_miner.apply(log, parameters=parameters)

Correlation Miner

In Process Mining, we are used to having logs containing at least:

  • A case identifier
  • An activity
  • A timestamp

The case identifier associates an event, happening in a system, to a particular execution of the process. This permits the application of algorithms such as process discovery, conformance checking, …

However, in some systems (for example, data collected from IoT systems), it may be difficult to associate a case identifier. On top of such logs, performing classic process mining is impossible. Correlation mining was born as a response to the challenge of extracting a process model from event logs without a case identifier, i.e., logs that contain only:

  • An activity column
  • A timestamp column

In this description, we assume that there is a total order on the events (i.e., no two events happen at the same timestamp). Situations where a total order is not defined are more complicated.

The Correlation Miner is an approach proposed in:

Pourmirza, Shaya, Remco Dijkman, and Paul Grefen. “Correlation miner: mining business process models and event correlations without case identifiers.” International Journal of Cooperative Information Systems 26.02 (2017): 1742002.

The approach aims to solve this problem by solving an (integer) linear problem defined on top of:

  • The P/S matrix: expressing the relationship of order between the activities as recorded in the log.
  • The Duration matrix: expressing an aggregation of the duration between two activities, obtained by solving an optimization problem

The solution of this problem provides a set of couples of activities that are, according to the approach, in directly-follows relationship, along with the strength of the relationship. This is the “frequency” DFG.

A “performance” DFG can be obtained by the duration matrix, keeping only the entries that appear in the solution of the problem (i.e., the couples of activities that appear in the “frequency” DFG).

This can be then visualized (using for example the PM4Py DFG visualization).

To have a “realistic” example (for which we know the “real” DFG), we can take an existing log, simply remove the case ID column, and then try to reconstruct the DFG without it.

Let’s try an example of that. First, we load a CSV file into a Pandas dataframe, keeping only the concept:name and the time:timestamp columns:


from pm4py.objects.log.adapters.pandas import csv_import_adapter

df = csv_import_adapter.import_dataframe_from_path("tests/input_data/receipt.csv")
df = df[["concept:name", "time:timestamp"]]
                                
Then, we can apply the Correlation Miner approach:

from pm4py.algo.discovery.correlation_mining import algorithm as correlation_miner

frequency_dfg, performance_dfg = correlation_miner.apply(df, parameters={correlation_miner.Variants.CLASSIC.value.Parameters.ACTIVITY_KEY: "concept:name",
                                correlation_miner.Variants.CLASSIC.value.Parameters.TIMESTAMP_KEY: "time:timestamp"})
                                
To better visualize the DFG, we can retrieve the frequency of the activities:

activities_freq = dict(df["concept:name"].value_counts())
                                
And then perform the visualization of the DFG:

from pm4py.visualization.dfg import visualizer as dfg_visualizer
gviz_freq = dfg_visualizer.apply(frequency_dfg, variant=dfg_visualizer.Variants.FREQUENCY, activities_count=activities_freq, parameters={"format": "svg"})
gviz_perf = dfg_visualizer.apply(performance_dfg, variant=dfg_visualizer.Variants.PERFORMANCE, activities_count=activities_freq, parameters={"format": "svg"})
dfg_visualizer.view(gviz_freq)
dfg_visualizer.view(gviz_perf)
                                

Visualizing the DFGs, we can say that the correlation miner was able to discover a visualization where the main path is clear. Different variants of the correlation miner are available:

Variant Description
Variants.CLASSIC Calculates the P/S matrix and the duration matrix in the classic way (the entire list of events is used)
Variants.TRACE_BASED Calculates the P/S matrix and the duration matrix on a classic event log, trace-by-trace, and merges the results. The resolution of the linear problem permits obtaining a model that is more understandable than the classic DFG calculated on top of the log.
Variants.CLASSIC_SPLIT Calculates the P/S matrix and the duration matrix on the entire list of events, as in the classic version, but splits it into chunks to speed up the computation. Hence, the generated model is less accurate (in comparison to the CLASSIC version), but the calculation is faster. The default chunk size is 100000 events.

Petri Net management

Petri nets are one of the most common formalisms to express a process model. A Petri net is a directed bipartite graph, in which the nodes represent transitions and places. Arcs connect places to transitions and transitions to places, and have an associated weight. A transition can fire if each of its input places contains a number of tokens that is at least equal to the weight of the arc connecting the place to the transition. When a transition is fired, tokens are removed from the input places according to the weight of the input arcs, and are added to the output places according to the weight of the output arcs.

A marking is a state in the Petri net that associates each place to a number of tokens and is uniquely associated to a set of enabled transitions that could be fired according to the marking.

Process Discovery algorithms implemented in pm4py return a Petri net along with an initial marking and a final marking. An initial marking is the initial state of execution of a process; a final marking is a state that should be reached at the end of the execution of the process.

Importing and exporting

Petri nets, along with their initial and final marking, can be imported/exported from the PNML file format. The code on the right-hand side can be used to import a Petri net along with the initial and final marking.

First, we have to import the Petri net. Subsequently, the Petri net is visualized by using the Petri net visualizer. In addition, the Petri net is exported with its initial marking, or with its initial marking and final marking.

import os
from pm4py.objects.petri.importer import importer as pnml_importer
net, initial_marking, final_marking = pnml_importer.apply(os.path.join("tests","input_data","running-example.pnml"))

from pm4py.visualization.petrinet import visualizer as pn_visualizer
gviz = pn_visualizer.apply(net, initial_marking, final_marking)
pn_visualizer.view(gviz)

from pm4py.objects.petri.exporter import exporter as pnml_exporter
pnml_exporter.apply(net, initial_marking, "petri.pnml")

pnml_exporter.apply(net, initial_marking, "petri_final.pnml", final_marking=final_marking)

Petri Net properties

This section is about how to get the properties of a Petri net. A property of the net is, for example, the set of transitions enabled in a particular marking. However, also the lists of places, transitions and arcs can be inspected.

The list of transitions enabled in a particular marking can be obtained using the right-hand code.

from pm4py.objects.petri import semantics
transitions = semantics.enabled_transitions(net, initial_marking)

The command print(transitions) reports that only the transition register request is enabled in the initial marking of the given Petri net. To obtain all places, transitions, and arcs of the Petri net, the code on the right-hand side can be used.

places = net.places
transitions = net.transitions
arcs = net.arcs

Each place has a name and a set of input/output arcs (connected at source/target to a transition). Each transition has a name, a label and a set of input/output arcs (connected at source/target to a place). The code on the right-hand side prints, for each place, its name and, for each input arc of the place, the name and the label of the corresponding transition. Analogously, trans.name, trans.label and arc.target.name are also available.

for place in places:
 print("\nPLACE: "+place.name)
 for arc in place.in_arcs:
  print(arc.source.name, arc.source.label)
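
The semantics module also allows firing an enabled transition to obtain the successor marking, in line with the firing rule described at the beginning of this section. The following is a minimal sketch, assuming the net and initial_marking imported above and that the execute function of the semantics module returns the marking reached after firing the transition.

from pm4py.objects.petri import semantics
# take one transition enabled in the initial marking (the 'register request' transition in this example)
enabled = semantics.enabled_transitions(net, initial_marking)
if enabled:
    t = list(enabled)[0]
    # firing removes tokens from the input places and adds tokens to the output places
    new_marking = semantics.execute(t, net, initial_marking)
    print(t, new_marking)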

Creating a new Petri Net

In this section, an overview of the code necessary to create a new Petri net with places, transitions, and arcs is provided. A Petri net object in pm4py should be created with a name.

The code on the right-hand side creates a Petri Net with the name new_petri_net.

# creating an empty Petri net
from pm4py.objects.petri.petrinet import PetriNet, Marking
net = PetriNet("new_petri_net")

In addition, three places are created, namely source, sink, and p_1. These places are added to the previously created Petri Net.

# creating source, p_1 and sink place
source = PetriNet.Place("source")
sink = PetriNet.Place("sink")
p_1 = PetriNet.Place("p_1")
# add the places to the Petri Net
net.places.add(source)
net.places.add(sink)
net.places.add(p_1)

Similar to the places, transitions can be created. However, they need to be assigned a name and a label.

# Create transitions
t_1 = PetriNet.Transition("name_1", "label_1")
t_2 = PetriNet.Transition("name_2", "label_2")
# Add the transitions to the Petri Net
net.transitions.add(t_1)
net.transitions.add(t_2)

Arcs connecting places with transitions or transitions with places are needed to complete the net. To add arcs, the code on the right-hand side can be used. The first parameter specifies the source of the arc, the second parameter its target, and the last parameter the Petri net it belongs to.

# Add arcs
from pm4py.objects.petri import utils
utils.add_arc_from_to(source, t_1, net)
utils.add_arc_from_to(t_1, p_1, net)
utils.add_arc_from_to(p_1, t_2, net)
utils.add_arc_from_to(t_2, sink, net)

To complete the Petri net, an initial and possibly a final marking need to be defined. To accomplish this, we define the initial marking to contain 1 token in the source place and the final marking to contain 1 token in the sink place.

# Adding tokens
initial_marking = Marking()
initial_marking[source] = 1
final_marking = Marking()
final_marking[sink] = 1

The resulting Petri net along with the initial and final marking can be exported, or visualized.

from pm4py.objects.petri.exporter import exporter as pnml_exporter
pnml_exporter.apply(net, initial_marking, "createdPetriNet1.pnml", final_marking=final_marking)

from pm4py.visualization.petrinet import visualizer as pn_visualizer
gviz = pn_visualizer.apply(net, initial_marking, final_marking)
pn_visualizer.view(gviz)

To obtain a specific output format (e.g. svg or png) a format parameter should be provided to the algorithm. The code snippet explains how to obtain an SVG representation of the Petri net. The last lines provide an option to save the visualization of the model.

from pm4py.visualization.petrinet import visualizer as pn_visualizer
parameters = {pn_visualizer.Variants.WO_DECORATION.value.Parameters.FORMAT:"svg"}
gviz = pn_visualizer.apply(net, initial_marking, final_marking, parameters=parameters)
pn_visualizer.view(gviz)

from pm4py.visualization.petrinet import visualizer as pn_visualizer
parameters = {pn_visualizer.Variants.WO_DECORATION.value.Parameters.FORMAT: "svg"}
gviz = pn_visualizer.apply(net, initial_marking, final_marking, parameters=parameters)
pn_visualizer.save(gviz, "alpha.svg")

Maximal Decomposition

The decomposition technique proposed in this section is useful for conformance checking purposes. Indeed, splitting the overall model into smaller models can reduce the size of the state space, hence increasing the performance of the conformance checking operation. We propose to use the decomposition technique (maximal decomposition of a Petri net) described in:

Van der Aalst, Wil MP. “Decomposing Petri nets for process mining: A generic approach.” Distributed and Parallel Databases 31.4 (2013): 471-507.

We can see an example of maximal decomposition applied to the Petri net extracted by the Alpha Miner from the Running Example log.

Let’s first load the running example log and apply the Alpha Miner.


import os
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.algo.discovery.alpha import algorithm as alpha_miner

log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))
net, im, fm = alpha_miner.apply(log)
                                

Then, the decomposition can be found using:


from pm4py.objects.petri.decomposition import decompose

list_nets = decompose(net, im, fm)
                                

If we want to visualize each one of the Petri nets, we can use a for loop:


from pm4py.visualization.petrinet import visualizer
gviz = []
for index, model in enumerate(list_nets):
    subnet, s_im, s_fm = model

    gviz.append(visualizer.apply(subnet, s_im, s_fm, parameters={visualizer.Variants.WO_DECORATION.value.Parameters.FORMAT: "png"}))
    visualizer.save(gviz[-1], str(index)+".png")
                                

A log that is fit according to the original model is also fit (projecting on the activities of the net) for these nets. Conversely, any deviation on top of these models represents a deviation also on the original model.

Reachability Graph

A reachability graph is a transition system that can be constructed on any Petri net along with an initial marking, and is the graph of all the markings reachable in the Petri net. Two markings are connected by as many edges as there are transitions leading from one marking to the other.

The main goal of the reachability graph is to provide an understanding of the state space of the Petri net. Usually, Petri nets containing a lot of concurrency have an extremely large reachability graph, whose computation may be infeasible for such models.

The calculation of the reachability graph, having the Petri net and the initial marking, can be done with the following code:

from pm4py.objects.petri import reachability_graph

ts = reachability_graph.construct_reachability_graph(net, im)
                                
The visualization of the reachability graph is then possible through the code snippet:

from pm4py.visualization.transition_system import visualizer as ts_visualizer

gviz = ts_visualizer.apply(ts, parameters={ts_visualizer.Variants.VIEW_BASED.value.Parameters.FORMAT: "svg"})
ts_visualizer.view(gviz)
                                

Conformance Checking

Conformance checking is a technique to compare a process model with an event log of the same process. The goal is to check whether the event log conforms to the model, and vice versa.

In PM4Py, two fundamental techniques are implemented: token-based replay and alignments.

Since PM4Py 1.5.x, we offer a simplified interface for conformance checking. This provides a restricted set of choices in comparison to the normal interface. A usage sketch is provided after the table below.

Function Description
aligned_traces = pm4py.conformance_alignments(log, petri_net, initial_marking, final_marking) Apply the alignments algorithm between a log and a Petri net.

Parameters:
log - Event log
petri_net - Petri net
initial_marking - Initial marking
final_marking - Final marking

Returns:
A list of alignments for each trace of the log
replay_results = pm4py.conformance_tbr(log, petri_net, initial_marking, final_marking) Apply token-based replay.

Parameters:
log - Event log
petri_net - Petri net
initial_marking - Initial marking
final_marking - Final marking

Returns:
A list of replay results for each trace of the log
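
A minimal usage sketch of these two functions follows; the model here is discovered with the Alpha Miner, as in other examples of this document.

import pm4py
from pm4py.algo.discovery.alpha import algorithm as alpha_miner

log = pm4py.read_xes("tests/input_data/running-example.xes")
net, initial_marking, final_marking = alpha_miner.apply(log)

# token-based replay and alignments through the simplified interface
replay_results = pm4py.conformance_tbr(log, net, initial_marking, final_marking)
aligned_traces = pm4py.conformance_alignments(log, net, initial_marking, final_marking)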

Token-based replay

Token-based replay matches a trace and a Petri net model, starting from the initial place, in order to discover which transitions are executed and in which places we have remaining or missing tokens for the given process instance. Token-based replay is useful for Conformance Checking: indeed, a trace is fitting according to the model if, during its execution, the transitions can be fired without the need to insert any missing token. If the reaching of the final marking is imposed, then a trace is fitting if it reaches the final marking without any missing or remaining tokens.

For each trace, there are four values which have to be determined: produced tokens (p), remaining tokens (r), missing tokens (m), and consumed tokens (c). Based on these, a formula can be derived, whereby a Petri net (n) and a trace (t) are given as input:

fitness(n, t) = 1/2 * (1 - r/p) + 1/2 * (1 - m/c)

To apply the formula to the whole event log, p, r, m, and c are calculated for each trace, summed up over all traces, and the sums are finally placed into the formula above.
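
As an illustration of the formula, with hypothetical token counts (not taken from a real replay): suppose that summing over all traces gives p=100 produced, c=100 consumed, m=5 missing and r=3 remaining tokens.

# hypothetical summed token counts over all traces (illustrative values only)
p, c, m, r = 100, 100, 5, 3
fitness = 0.5 * (1 - r / p) + 0.5 * (1 - m / c)
print(fitness)  # 0.5 * 0.97 + 0.5 * 0.95 = 0.96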

In PM4Py there is an implementation of a token replayer that is able to go across hidden transitions (calculating shortest paths between places) and can be used with any Petri net model with unique visible transitions and hidden transitions. When a visible transition needs to be fired and not all places in the preset are provided with the correct number of tokens, starting from the current marking it is checked if for some place there is a sequence of hidden transitions that could be fired in order to enable the visible transition. The hidden transitions are then fired and a marking that permits to enable the visible transition is reached.

The example on the right shows how to apply token-based replay on a log and a Petri net. First, the log is loaded. Then, the Alpha Miner is applied in order to discover a Petri net. Finally, the token-based replay is applied. The output of the token-based replay, stored in the variable replayed_traces, contains for each trace of the log the following fields; a sketch of inspecting them follows the example code:
  • trace_is_fit: boolean value (True/False) that is True when the trace conforms to the model.
  • activated_transitions: list of transitions activated in the model by the token-based replay.
  • reached_marking: marking reached at the end of the replay.
  • missing_tokens: number of missing tokens.
  • consumed_tokens: number of consumed tokens.
  • remaining_tokens: number of remaining tokens.
  • produced_tokens: number of produced tokens.
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.algo.discovery.alpha import algorithm as alpha_miner

log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))

net, initial_marking, final_marking = alpha_miner.apply(log)

from pm4py.algo.conformance.tokenreplay import algorithm as token_replay
replayed_traces = token_replay.apply(log, net, initial_marking, final_marking)
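
The fields listed above can be inspected trace by trace, for instance as in the following sketch (assuming the replayed_traces variable computed above):

for trace_result in replayed_traces:
    # each element is a dictionary with the fields described in the list above
    print(trace_result["trace_is_fit"], trace_result["missing_tokens"], trace_result["remaining_tokens"])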

The token-based replay supports the parameters listed below; a usage sketch follows the table.


Parameter Meaning
Parameters.CONSIDER_REMAINING_IN_FITNESS If the parameter is set to True (default), a trace is considered fit only if it does not lead to any missing/remaining tokens in the model.
Parameters.TRY_TO_REACH_FINAL_MARKING_THROUGH_HIDDEN When the replay of the trace is completed, try to reach the final marking through invisible transitions (default: True)
Parameters.STOP_IMMEDIATELY_UNFIT Stops the replay immediately when a missing token needs to be inserted (default: False)
Parameters.WALK_THROUGH_HIDDEN_TRANS Enables, in general, the traversal of invisible transitions (default: True)
Parameters.CLEANING_TOKEN_FLOOD Limits the token flooding problem by an apposite algorithm (default: False)
Parameters.RETURN_NAMES Returns the transition names, instead of the transition objects, in the list of activated transitions for a trace.
Parameters.ACTIVITY_KEY Establishes the attribute to be used as activity during the replay (default: concept:name).
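
For example, the replay can be configured to stop as soon as a trace becomes unfit. The following sketch (assuming the log and model from the example above) passes the corresponding parameter through the variant's Parameters enum, as also done in the diagnostics example later in this section.

from pm4py.algo.conformance.tokenreplay import algorithm as token_replay
# stop the replay of a trace as soon as a missing token would be needed
parameters_tbr = {token_replay.Variants.TOKEN_REPLAY.value.Parameters.STOP_IMMEDIATELY_UNFIT: True}
replayed_traces = token_replay.apply(log, net, initial_marking, final_marking, parameters=parameters_tbr)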

Diagnostics (TBR)

The execution of token-based replay in PM4Py permits obtaining detailed information about transitions that did not execute correctly, or activities that are in the log but not in the model. In particular, executions that do not match the model are expected to take a longer throughput time.

The diagnostics that are provided by PM4Py are the following:

  • Throughput analysis on the transitions that are executed in an unfit way according to the process model (the Petri net).
  • Throughput analysis on the activities that are not contained in the model.
  • Root Cause Analysis on the causes that lead to an unfit execution of the transitions.
  • Root Cause Analysis on the causes that lead to executing activities that are not contained in the process model.

To provide an execution context for the examples, a log must be loaded, and a model that is not perfectly fitting is required. To load the log, the following instructions can be used:


import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests", "input_data", "receipt.xes"))
                                

To create an unfit model, a filtering operation producing a log where only part of the behavior is kept can be executed:


from pm4py.algo.filtering.log.auto_filter.auto_filter import apply_auto_filter
filtered_log = apply_auto_filter(log)
                                

Then, applying the Inductive Miner algorithm:


from pm4py.algo.discovery.inductive import algorithm as inductive_miner
net, initial_marking, final_marking = inductive_miner.apply(filtered_log)
                                

We then apply the token-based replay with special settings. In particular, with disable_variants set to True we avoid replaying only one case per variant; with enable_pltr_fitness set to True we tell the algorithm to return localized conformance checking information (fitness at the level of places and transitions).


from pm4py.algo.conformance.tokenreplay import algorithm as token_based_replay
parameters_tbr = {token_based_replay.Variants.TOKEN_REPLAY.value.Parameters.DISABLE_VARIANTS: True, token_based_replay.Variants.TOKEN_REPLAY.value.Parameters.ENABLE_PLTR_FITNESS: True}
replayed_traces, place_fitness, trans_fitness, unwanted_activities = token_based_replay.apply(log, net,
                                                                                              initial_marking,
                                                                                              final_marking,
                                                                                              parameters=parameters_tbr)
                                

Then, we move on to the diagnostics information.

Throughput analysis (unfit execution)

To perform throughput analysis on the transitions that were executed in an unfit way, and then print the result on the console, the following code could be used:


from pm4py.algo.conformance.tokenreplay.diagnostics import duration_diagnostics
trans_diagnostics = duration_diagnostics.diagnose_from_trans_fitness(log, trans_fitness)
for trans in trans_diagnostics:
    print(trans, trans_diagnostics[trans])
                                

We obtain an output where it is clear that unfit executions lead to much higher throughput times (from 126 to 146 times higher).

Throughput analysis (activities)

To perform throughput analysis on the process executions containing activities that are not in the model, and then print the result on the screen, the following code could be used:


from pm4py.algo.conformance.tokenreplay.diagnostics import duration_diagnostics
act_diagnostics = duration_diagnostics.diagnose_from_notexisting_activities(log, unwanted_activities)
for act in act_diagnostics:
    print(act, act_diagnostics[act])
                                

Root Cause Analysis

The output of root cause analysis in the diagnostics context is a decision tree that permits understanding the causes of a deviation. In the following examples, for each deviation, a different decision tree is built and visualized.

In the following examples, which consider the Receipt log, the decision trees will be built on the following choice of attributes (i.e., only the org:group attribute will be considered).


# build decision trees
string_attributes = ["org:group"]
numeric_attributes = []
parameters = {"string_attributes": string_attributes, "numeric_attributes": numeric_attributes}
                                

Root Cause Analysis (unfit execution)

To perform root cause analysis on the transitions that are executed in an unfit way, the following code could be used:


from pm4py.algo.conformance.tokenreplay.diagnostics import root_cause_analysis
trans_root_cause = root_cause_analysis.diagnose_from_trans_fitness(log, trans_fitness, parameters=parameters)
                                

To visualize the decision trees obtained by root cause analysis, the following code could be used:


from pm4py.visualization.decisiontree import visualizer as dt_vis
for trans in trans_root_cause:
    clf = trans_root_cause[trans]["clf"]
    feature_names = trans_root_cause[trans]["feature_names"]
    classes = trans_root_cause[trans]["classes"]
    # visualization could be called
    gviz = dt_vis.apply(clf, feature_names, classes)
    dt_vis.view(gviz)
                                

Root Cause Analysis (activities that are not in the model)

To perform root cause analysis on activities that are executed but are not in the process model, the following code could be used:


from pm4py.algo.conformance.tokenreplay.diagnostics import root_cause_analysis
act_root_cause = root_cause_analysis.diagnose_from_notexisting_activities(log, unwanted_activities,
                                                                          parameters=parameters)
                                

To visualize the decision trees obtained by root cause analysis, the following code could be used:


from pm4py.visualization.decisiontree import visualizer as dt_vis
for act in act_root_cause:
    clf = act_root_cause[act]["clf"]
    feature_names = act_root_cause[act]["feature_names"]
    classes = act_root_cause[act]["classes"]
    # visualization could be called
    gviz = dt_vis.apply(clf, feature_names, classes)
    dt_vis.view(gviz)
                                

Alignments

PM4Py comes with the following set of linear solvers: PuLP (available for any platform), CVXOPT (available for the most widely used platforms including Windows/Linux for Python 3.6/3.7). Alternatively, ORTools can also be used and installed from PIP.

Alignment-based replay aims to find one of the best alignments between the trace and the model. For each trace, the output of an alignment is a list of couples where the first element is an event (of the trace) or » and the second element is a transition (of the model) or ». For each couple, the following classification can be provided:

  • Sync move: the classification of the event corresponds to the transition label; in this case, both the trace and the model advance in the same way during the replay.
  • Move on log: for couples where the second element is », it corresponds to a replay move in the trace that is not mimicked in the model. This kind of move is unfit and signals a deviation between the trace and the model.
  • Move on model: for couples where the first element is », it corresponds to a replay move in the model that is not mimicked in the trace. For moves on model, we can have the following distinction:
    • Moves on model involving hidden transitions: in this case, even if it is not a sync move, the move is fit.
    • Moves on model not involving hidden transitions: in this case, the move is unfit and signals a deviation between the trace and the model.

First, we have to import the log. Subsequently, we apply the Inductive Miner on the imported log. In addition, we compute the alignments.

import os
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.algo.discovery.inductive import algorithm as inductive_miner

log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))

net, initial_marking, final_marking = inductive_miner.apply(log)

from pm4py.algo.conformance.alignments import algorithm as alignments
aligned_traces = alignments.apply_log(log, net, initial_marking, final_marking)

To inspect the alignments, the code snippet below can be used. The output (a list) reports for each trace the corresponding alignment along with its statistics. With each trace, a dictionary containing, among others, the following information is associated; a sketch of inspecting these fields follows the print statement:

  • alignment: contains the alignment (sync moves, moves on log, moves on model)
  • cost: contains the cost of the alignment according to the provided cost function
  • fitness: is equal to 1 if the trace is perfectly fitting
print(aligned_traces)
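
For example, the per-trace statistics can be accessed as in the following sketch (assuming the aligned_traces variable computed above):

for trace_alignment in aligned_traces:
    # each element contains, among others, the alignment, its cost and its fitness
    print(trace_alignment["fitness"], trace_alignment["cost"])
    print(trace_alignment["alignment"])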

To use a different classifier, we refer to the Classifier section. In the following, a custom classifier is defined for each event of each trace in the log.

for trace in log:
 for event in trace:
  event["customClassifier"] = event["concept:name"] + event["concept:name"]

A parameters dictionary containing the activity key can be formed.


# define the activity key in the parameters
parameters = {inductive_miner.Variants.DFG_BASED.value.Parameters.ACTIVITY_KEY: "customClassifier", alignments.Variants.VERSION_STATE_EQUATION_A_STAR.value.Parameters.ACTIVITY_KEY: "customClassifier"}

Then, a process model is computed, and alignments are also calculated. Moreover, the fitness value is calculated and the resulting values are printed.

# calculate process model using the given classifier
net, initial_marking, final_marking = inductive_miner.apply(log, parameters=parameters)
aligned_traces = alignments.apply_log(log, net, initial_marking, final_marking, parameters=parameters)

from pm4py.evaluation.replay_fitness import evaluator as replay_fitness
log_fitness = replay_fitness.evaluate(aligned_traces, variant=replay_fitness.Variants.ALIGNMENT_BASED)

print(log_fitness) 

It is also possible to select other parameters for the alignments.

  • Model cost function: associating to each transition in the Petri net the corresponding cost of a move-on-model.
  • Sync cost function: associating to each visible transition in the Petri net the cost of a sync move.

On the right-hand side, an implementation of a custom model cost function and sync cost function can be seen. The model cost function and sync cost function then have to be inserted in the parameters. Subsequently, the replay is done.

model_cost_function = dict()
sync_cost_function = dict()
for t in net.transitions:
 # if the label is not None, we have a visible transition
 if t.label is not None:
  # associate cost 1000 to each move-on-model associated to visible transitions
  model_cost_function[t] = 1000
  # associate cost 0 to each move-on-log
  sync_cost_function[t] = 0
 else:
  # associate cost 1 to each move-on-model associated to hidden transitions
  model_cost_function[t] = 1

parameters = {}
parameters[alignments.Variants.VERSION_STATE_EQUATION_A_STAR.value.Parameters.PARAM_MODEL_COST_FUNCTION] = model_cost_function
parameters[alignments.Variants.VERSION_STATE_EQUATION_A_STAR.value.Parameters.PARAM_SYNC_COST_FUNCTION] = sync_cost_function

aligned_traces = alignments.apply_log(log, net, initial_marking, final_marking, parameters=parameters)

Different variants of the alignments are available:

Variant Description
Variants.STATE_EQUATION_A_STAR (Default) uses ILP-based heuristics to prune the state space.
Variants.VERSION_DIJKSTRA_NO_HEURISTICS Uses a Dijkstra-based state-space exploration (without the computation of any heuristics). This is faster on models without much concurrency, or when a small number of deviations are contained in the process execution.
Variants.STATE_EQUATION_LESS_MEMORY A variant of the ILP-based replayer that requires less memory to host the states.
Variants.DIJKSTRA_LESS_MEMORY A variant of the Dijkstra replayer that requires less memory to host the spaces.

Among the common parameters of these variants, we have the following (a usage sketch follows the table):

Parameter Meaning
Parameters.PARAM_MAX_ALIGN_TIME_TRACE Establishes the maximum available amount of time to complete the alignment of a trace (returns None if the alignment is not finished) (default: infinity).
Parameters.PARAM_MAX_ALIGN_TIME When aligning a log, establishes the maximum available amount of time to complete all the alignments (the alignments of traces not completed within this time default to None) (default: infinity).
Parameters.PARAM_ALIGNMENT_RESULT_IS_SYNC_PROD_AWARE Establishes whether the result of an alignment should contain also the name of the transition, not only the label (default: False).
Parameters.ACTIVITY_KEY Establishes the attribute at the event level that should be used to compute the alignment (default: concept:name).
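
For example, a maximum alignment time per trace can be set as in the following sketch (the time is assumed here to be expressed in seconds; traces not aligned within this budget get None as result). The log and model are assumed to be the ones from the examples above.

from pm4py.algo.conformance.alignments import algorithm as alignments
# limit the time spent aligning each single trace
parameters = {alignments.Variants.VERSION_STATE_EQUATION_A_STAR.value.Parameters.PARAM_MAX_ALIGN_TIME_TRACE: 10}
aligned_traces = alignments.apply_log(log, net, initial_marking, final_marking, parameters=parameters)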

Decomposition of Alignments

Alignments represent a computationally expensive problem on models that contain a lot of concurrency. Yet, they are the conformance checking technique that provides the best results in terms of finding a match between the process execution(s) and the model. To overcome the difficulties related to the size of the state space, various attempts have been made to decompose the model into “smaller” pieces, on which the alignment is easier and which still permit diagnosing problems.

We have seen how to obtain a maximal decomposition of the Petri net model. Now we can see how to perform the decomposition of alignments (that is based on a maximal decomposition of the Petri net model). The approach described here has been published in:

Lee, Wai Lam Jonathan, et al. “Recomposing conformance: Closing the circle on decomposed alignment-based conformance checking in process mining.” Information Sciences 466 (2018): 55-91.

The recomposition permits understanding whether each step of the process has been executed in a sync way or whether some deviations happened. First, an alignment is performed on top of the decomposed Petri nets.

Then, the agreement between the activities at the border is checked. If a disagreement is found, the two components that are disagreeing are merged and the alignment is repeated on them.

When the steps agree between the different alignments of the components, these can be merged into a single alignment. The order of recomposition is based on the Petri net graph. Despite that, in the case of concurrency, the “recomposed” alignment contains a valid list of moves that may not be in the correct order.

To perform alignments through decomposition/recomposition, the following code can be used. A maximum number of border disagreements can be provided to the algorithm. If the number of border disagreements is reached, then the alignment is interrupted and None is returned as the alignment of the specific trace.


from pm4py.algo.conformance.decomp_alignments import algorithm as decomp_alignments

conf = decomp_alignments.apply(log, net, im, fm, parameters={decomp_alignments.Variants.RECOMPOS_MAXIMAL.value.Parameters.PARAM_THRESHOLD_BORDER_AGREEMENT: 2})

Since decomposed models are expected to have less concurrency, the components are aligned using a Dijkstra approach. In the case of border disagreements, this can degrade the performance of the algorithm.

It should be noted that this is not an approximation technique; according to the authors, it should provide the same fitness as the original alignments.

Since the alignment is recomposed, we can use the fitness evaluator to evaluate the fitness (that is not related to the computation of fitness described in the paper).

from pm4py.evaluation.replay_fitness import evaluator as rp_fitness_evaluator

fitness = rp_fitness_evaluator.evaluate(conf, variant=rp_fitness_evaluator.Variants.ALIGNMENT_BASED)
                                

Footprints

Footprints are a very basic (but scalable) conformance checking technique to compare entities (such as event logs, DFGs, Petri nets, process trees, or any other kind of model). Essentially, a relationship between each couple of activities of the log/model is inferred. This can include:

  • Directly-Follows Relationships: in the log/model, it is possible that the activity A is directly followed by B.
  • Directly-Before Relationships: in the log/model, it is possible that the activity B is directly preceded by A.
  • Parallel behavior: it is possible that A is followed by B and B is followed by A

A footprints matrix can be calculated, describing for each couple of activities the footprint relationship. It can be calculated for different types of models and for the entire event log, but also trace-by-trace (if the local behavior is important).

Let’s assume that the running-example.xes event log is loaded:
from pm4py.objects.log.importer.xes import importer as xes_importer
import os
log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))
And the inductive miner is applied on such log:

from pm4py.algo.discovery.inductive import algorithm as inductive_miner

net, im, fm = inductive_miner.apply(log)
To calculate the footprints for the entire log, the following code can be used:

from pm4py.algo.discovery.footprints import algorithm as footprints_discovery

fp_log = footprints_discovery.apply(log, variant=footprints_discovery.Variants.ENTIRE_EVENT_LOG)

The footprints of the entire log are:

{‘sequence’: {(‘examine casually’, ‘decide’), (‘decide’, ‘pay compensation’), (‘register request’, ‘examine thoroughly’), (‘reinitiate request’, ‘examine casually’), (‘check ticket’, ‘decide’), (‘register request’, ‘examine casually’), (‘reinitiate request’, ‘examine thoroughly’), (‘decide’, ‘reject request’), (‘examine thoroughly’, ‘decide’), (‘reinitiate request’, ‘check ticket’), (‘register request’, ‘check ticket’), (‘decide’, ‘reinitiate request’)}, ‘parallel’: {(‘examine casually’, ‘check ticket’), (‘check ticket’, ‘examine casually’), (‘check ticket’, ‘examine thoroughly’), (‘examine thoroughly’, ‘check ticket’)}, ‘start_activities’: {‘register request’}, ‘end_activities’: {‘pay compensation’, ‘reject request’}, ‘activities’: {‘reject request’, ‘register request’, ‘check ticket’, ‘decide’, ‘pay compensation’, ‘examine thoroughly’, ‘examine casually’, ‘reinitiate request’}}

The data structure is a dictionary with, as keys, sequence (expressing directly-follows relationships) and parallel (expressing the parallel behavior that can happen in either way).

The footprints of the log, trace-by-trace, can be calculated as follows, and are a list of footprints for each trace:

from pm4py.algo.discovery.footprints import algorithm as footprints_discovery

fp_trace_by_trace = footprints_discovery.apply(log, variant=footprints_discovery.Variants.TRACE_BY_TRACE)
The footprints of the Petri net model can be calculated as follows:

fp_net = footprints_discovery.apply(net, im, fm)

And are the following:

{‘sequence’: {(‘check ticket’, ‘decide’), (‘reinitiate request’, ‘examine casually’), (‘register request’, ‘examine thoroughly’), (‘decide’, ‘reject request’), (‘register request’, ‘check ticket’), (‘register request’, ‘examine casually’), (‘decide’, ‘reinitiate request’), (‘reinitiate request’, ‘examine thoroughly’), (‘decide’, ‘pay compensation’), (‘reinitiate request’, ‘check ticket’), (‘examine casually’, ‘decide’), (‘examine thoroughly’, ‘decide’)}, ‘parallel’: {(‘check ticket’, ‘examine thoroughly’), (‘examine thoroughly’, ‘check ticket’), (‘check ticket’, ‘examine casually’), (‘examine casually’, ‘check ticket’)}, ‘activities’: {‘decide’, ‘examine casually’, ‘reinitiate request’, ‘check ticket’, ‘examine thoroughly’, ‘register request’, ‘reject request’, ‘pay compensation’}, ‘start_activities’: {‘register request’}}

The data structure is a dictionary with, as keys, sequence (expressing directly-follows relationships) and parallel (expressing the parallel behavior that can happen in either way).

It is possible to visualize a comparison between the footprints of the (entire) log and the footprints of the (entire) model.

First of all, let’s see how to visualize a single footprints table, for example the one of the model. The following code can be used:

from pm4py.visualization.footprints import visualizer as fp_visualizer

gviz = fp_visualizer.apply(fp_net, parameters={fp_visualizer.Variants.SINGLE.value.Parameters.FORMAT: "svg"})
fp_visualizer.view(gviz)
To compare the two footprints tables, the following code can be used. Please note that the visualization will look the same if no deviations are discovered. If deviations are found, they are colored in red.

from pm4py.visualization.footprints import visualizer as fp_visualizer

gviz = fp_visualizer.apply(fp_log, fp_net, parameters={fp_visualizer.Variants.COMPARISON.value.Parameters.FORMAT: "svg"})
fp_visualizer.view(gviz)
To actually find some deviations, let’s repeat the procedure on the receipt.xes log, applying a heavy filter on the log to discover a simpler model:

from pm4py.objects.log.importer.xes import importer as xes_importer
import os
from copy import deepcopy
from pm4py.algo.filtering.log.variants import variants_filter

log = xes_importer.apply(os.path.join("tests", "input_data", "receipt.xes"))
filtered_log = variants_filter.apply_auto_filter(deepcopy(log))

from pm4py.algo.discovery.inductive import algorithm as inductive_miner
net, im, fm = inductive_miner.apply(filtered_log)
With a conformance checking operation, we want instead to compare the behavior of the traces of the log against the footprints of the model. This can be done using the following code:

from pm4py.algo.conformance.footprints import algorithm as footprints_conformance

conf_fp = footprints_conformance.apply(fp_trace_by_trace, fp_net)

The result contains, for each trace of the log, a set with the deviations. An extract of the list for some traces:

{(‘T06 Determine necessity of stop advice’, ‘T04 Determine confirmation of receipt’), (‘T02 Check confirmation of receipt’, ‘T06 Determine necessity of stop advice’)}
set()
{(‘T19 Determine report Y to stop indication’, ‘T20 Print report Y to stop indication’), (‘T10 Determine necessity to stop indication’, ‘T16 Report reasons to hold request’), (‘T16 Report reasons to hold request’, ‘T17 Check report Y to stop indication’), (‘T17 Check report Y to stop indication’, ‘T19 Determine report Y to stop indication’)}
set()
set()
{(‘T02 Check confirmation of receipt’, ‘T06 Determine necessity of stop advice’), (‘T10 Determine necessity to stop indication’, ‘T04 Determine confirmation of receipt’), (‘T04 Determine confirmation of receipt’, ‘T03 Adjust confirmation of receipt’), (‘T03 Adjust confirmation of receipt’, ‘T02 Check confirmation of receipt’)}
set()

We can see that for the first trace that contains deviations, there are two deviations, the first related to T06 Determine necessity of stop advice being executed before T04 Determine confirmation of receipt; the second related to T02 Check confirmation of receipt being followed by T06 Determine necessity of stop advice.

The traces for which the conformance returns nothing are fit (at least according to the footprints).

Footprints conformance checking is a way to identify obvious deviations, behavior of the log that is not allowed by the model.

On the log side, the computation of footprints scales very well. The calculation of footprints for a Petri net model may instead be more expensive.

If we change the underlying model from a Petri net to a process tree, it is possible to exploit its bottom-up structure in order to calculate the footprints almost instantaneously.

Let’s open a log, calculate a process tree and then apply the discovery of the footprints. We open the running-example log:

from pm4py.objects.log.importer.xes import importer as xes_importer

log = xes_importer.apply("tests/input_data/running-example.xes")
And apply the inductive miner to discover a process tree:

from pm4py.algo.discovery.inductive import algorithm as inductive_miner

tree = inductive_miner.apply_tree(log)
Then, the footprints can be discovered. We discover the footprints on the entire log, we discover the footprints trace-by-trace in the log, and then we discover the footprints on the process tree:

from pm4py.algo.discovery.footprints import algorithm as fp_discovery

fp_log = fp_discovery.apply(log, variant=fp_discovery.Variants.ENTIRE_EVENT_LOG)
fp_trace_trace = fp_discovery.apply(log, variant=fp_discovery.Variants.TRACE_BY_TRACE)
fp_tree = fp_discovery.apply(tree, variant=fp_discovery.Variants.PROCESS_TREE)

Each one of these contains:

  • A list of sequential footprints contained in the log/allowed by the model
  • A list of parallel footprints contained in the log/allowed by the model
  • A list of activities contained in the log/allowed by the model
  • A list of start activities contained in the log/allowed by the model
  • A list of end activities contained in the log/allowed by the model
It is possible to execute an enhanced conformance checking between the footprints of the (entire) log, and the footprints of the model, by doing:

from pm4py.algo.conformance.footprints import algorithm as fp_conformance

conf_result = fp_conformance.apply(fp_log, fp_tree, variant=fp_conformance.Variants.LOG_EXTENSIVE)

The result contains, for each item of the previous list, the violations.

Given the result of conformance checking, it is possible to calculate the footprints-based fitness and precision of the process model, by doing:

from pm4py.algo.conformance.footprints.util import evaluation

fitness = evaluation.fp_fitness(fp_log, fp_tree, conf_result)
precision = evaluation.fp_precision(fp_log, fp_tree)

These values are both included in the interval [0,1].

Log Skeleton

The concept of log skeleton has been described in the contribution

Verbeek, H. M. W., and R. Medeiros de Carvalho. “Log skeletons: A classification approach to process discovery.” arXiv preprint arXiv:1806.08247 (2018).

It is claimed to be the most accurate classification approach to decide whether a trace belongs to (the language of) a log or not.

For a log, an object containing a list of relations is calculated.

  • Equivalence: contains the couples of activities that happen ALWAYS with the same frequency inside a trace.
  • Always-after: contains the couples of activities (A,B) such that an occurrence of A is ALWAYS followed, at some later point of the trace, by an occurrence of B.
  • Always-before: contains the couples of activities (B,A) such that an occurrence of B is ALWAYS preceded, at some earlier point of the trace, by an occurrence of A.
  • Never-together: contains the couples of activities (A,B) that NEVER happen together in the history of the trace.
  • Directly-follows: contains the list of directly-follows relations of the log.
  • For each activity, the number of possible occurrences per trace.

It is also possible to provide a noise threshold. In that case, more relations are found since the conditions are relaxed.

Let's take the running-example.xes log:

from pm4py.objects.log.importer.xes import importer as xes_importer
import os
log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))
Then, we can calculate the log skeleton:

from pm4py.algo.discovery.log_skeleton import algorithm as lsk_discovery
skeleton = lsk_discovery.apply(log, parameters={lsk_discovery.Variants.CLASSIC.value.Parameters.NOISE_THRESHOLD: 0.0})

We can also print that:

{‘equivalence’: {(‘pay compensation’, ‘register request’), (‘examine thoroughly’, ‘register request’), (‘reject request’, ‘register request’), (‘pay compensation’, ‘examine casually’)}, ‘always_after’: {(‘register request’, ‘check ticket’), (‘examine thoroughly’, ‘decide’), (‘register request’, ‘decide’)}, ‘always_before’: {(‘pay compensation’, ‘register request’), (‘pay compensation’, ‘decide’), (‘pay compensation’, ‘check ticket’), (‘reject request’, ‘decide’), (‘pay compensation’, ‘examine casually’), (‘reject request’, ‘check ticket’), (‘examine thoroughly’, ‘register request’), (‘reject request’, ‘register request’)}, ‘never_together’: {(‘pay compensation’, ‘reject request’), (‘reject request’, ‘pay compensation’)}, ‘directly_follows’: set(), ‘activ_freq’: {‘register request’: {1}, ‘examine casually’: {0, 1, 3}, ‘check ticket’: {1, 2, 3}, ‘decide’: {1, 2, 3}, ‘reinitiate request’: {0, 1, 2}, ‘examine thoroughly’: {0, 1}, ‘pay compensation’: {0, 1}, ‘reject request’: {0, 1}}}

We can see the relations (equivalence, always_after, always_before, never_together, directly_follows, activ_freq) as keys of the object; the values are the activities/couples of activities that follow each pattern.

To see how the log skeleton really works for classification/conformance purposes, let's change to another log (the receipt.xes log) and calculate a heavily filtered version of it (to have less behavior):

from pm4py.objects.log.importer.xes import importer as xes_importer
import os
log = xes_importer.apply(os.path.join("tests", "input_data", "receipt.xes"))
from copy import deepcopy
from pm4py.algo.filtering.log.variants import variants_filter
filtered_log = variants_filter.apply_auto_filter(deepcopy(log))
Calculate the log skeleton on top of the filtered log, and then apply the classification as follows:

from pm4py.algo.discovery.log_skeleton import algorithm as lsk_discovery
skeleton = lsk_discovery.apply(filtered_log, parameters={lsk_discovery.Variants.CLASSIC.value.Parameters.NOISE_THRESHOLD: 0.0})

from pm4py.algo.conformance.log_skeleton import algorithm as lsk_conformance
conf_result = lsk_conformance.apply(log, skeleton)
for trace_result in conf_result:
    print(trace_result)

In such a way, we can get for each trace whether it has been classified as belonging to the filtered log or not. When deviations are found, the trace does not belong to the language of the filtered log.

We can also calculate a log skeleton on the original log, for example providing 0.03 as noise threshold, and see the effects on the classification:

from pm4py.algo.discovery.log_skeleton import algorithm as lsk_discovery
skeleton = lsk_discovery.apply(log, parameters={lsk_discovery.Variants.CLASSIC.value.Parameters.NOISE_THRESHOLD: 0.03})

from pm4py.algo.conformance.log_skeleton import algorithm as lsk_conformance
conf_result = lsk_conformance.apply(log, skeleton)
for trace_result in conf_result:
    print(trace_result)

We can see that some traces are classified as non-conforming even when the log skeleton is calculated on the original log, if a noise threshold is provided.

Process Trees

In PM4Py, we offer support for process trees: visualization, conversion to a Petri net, generation of a log from the tree, importing/exporting, and a functionality to generate them randomly. In this section, these functionalities are examined.

Importing/Exporting Process Trees

In PM4Py, we offer support for importing/exporting process trees in the PTML format.

The following code can be used to import a process tree from a PTML file.


from pm4py.objects.process_tree.importer import importer as ptml_importer

tree = ptml_importer.apply("tests/input_data/running-example.ptml")
                                

The following code can be used to export a process tree into a PTML file.


from pm4py.objects.process_tree.exporter import exporter as ptml_exporter

ptml_exporter.apply(tree, "running-example.ptml")
                                

Generation of process trees

The approach 'PTAndLogGenerator', described by the scientific paper 'PTandLogGenerator: A Generator for Artificial Event Data', has been implemented in the PM4Py library.

The code snippet can be used to generate a process tree.


from pm4py.simulation.tree_generator import simulator as tree_gen
parameters = {}
tree = tree_gen.apply(parameters=parameters)
The following parameters are supported by the generator (a usage sketch follows the table).
Parameter Meaning
MODE most frequent number of visible activities (default 20)
MIN minimum number of visible activities (default 10)
MAX maximum number of visible activities (default 30)
SEQUENCE probability to add a sequence operator to tree (default 0.25)
CHOICE probability to add a choice operator to tree (default 0.25)
PARALLEL probability to add a parallel operator to tree (default 0.25)
LOOP probability to add a loop operator to tree (default 0.25)
OR probability to add an or operator to tree (default 0)
SILENT probability to add silent activity to a choice or loop operator (default 0.25)
DUPLICATE probability to duplicate an activity label (default 0)
LT_DEPENDENCY probability to add a random dependency to the tree (default 0)
INFREQUENT probability to make a choice have infrequent paths (default 0.25)
NO_MODELS number of trees to generate from model population (default 10)
UNFOLD

whether or not to unfold loops in order to include choices underneath in dependencies: 0=False, 1=True

if lt_dependency <= 0: this should always be 0 (False)

if lt_dependency > 0: this can be 1 or 0 (True or False) (default 10)

MAX_REPEAT maximum number of repetitions of a loop (only used when unfolding is True) (default 10)
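
A sketch of passing some of these parameters follows. The values are only illustrative, and it is assumed here that the parameters can be passed as plain lowercase dictionary keys matching the names in the table above.

from pm4py.simulation.tree_generator import simulator as tree_gen
# illustrative values; "sequence" + "choice" + "parallel" + "loop" (+ "or") should sum to 1
parameters = {"min": 10, "mode": 20, "max": 30, "sequence": 0.3, "choice": 0.3, "parallel": 0.2, "loop": 0.2}
tree = tree_gen.apply(parameters=parameters)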

Generation of a log out of a process tree

The code snippet can be used to generate a log, with 100 cases, out of the process tree.
from pm4py.objects.process_tree import semantics
log = semantics.generate_log(tree, no_traces=100)

Conversion into Petri net

The code snippet can be used to convert the process tree into a Petri net.
from pm4py.objects.conversion.process_tree import converter as pt_converter
net, im, fm = pt_converter.apply(tree)

Visualize a Process Tree

A process tree can be printed, as revealed on the right side.
print(tree)
A process tree can also be visualized, as revealed on the right side.
from pm4py.visualization.process_tree import visualizer as pt_visualizer
gviz = pt_visualizer.apply(tree, parameters={pt_visualizer.Variants.WO_DECORATION.value.Parameters.FORMAT: "png"})
pt_visualizer.view(gviz)

Feature Selection

An operation of feature selection permits representing the event log in a tabular way. This is important for operations such as prediction and anomaly detection.

Automatic Feature Selection

In PM4Py, we offer ways to perform an automatic feature selection. As an example, let us import the receipt log and perform an automatic feature selection on top of it.

First, we import the receipt log:

from pm4py.objects.log.importer.xes import importer as xes_importer

log = xes_importer.apply("tests/input_data/receipt.xes")
                                
Then, let’s perform the automatic feature selection:

from pm4py.objects.log.util import get_log_representation

data, feature_names = get_log_representation.get_default_representation(log)
print(feature_names)
                                

Printing the value feature_names, we see that the following attributes were selected:

  • The attribute channel at the trace level (this assumes values Desk, Intern, Internet, Post, e-mail)
  • The attribute department at the trace level (this assumes values Customer contact, Experts, General)
  • The attribute group at the event level (this assumes values EMPTY, Group 1, Group 12, Group 13, Group 14, Group 15, Group 2, Group 3, Group 4, Group 7).

No numeric attribute value is selected. If we print feature_names, we get the following representation:

[‘trace:channel@Desk’, ‘trace:channel@Intern’, ‘trace:channel@Internet’, ‘trace:channel@Post’, ‘trace:channel@e-mail’, ‘trace:department@Customer contact’, ‘trace:department@Experts’, ‘trace:department@General’, ‘event:org:group@EMPTY’, ‘event:org:group@Group 1’, ‘event:org:group@Group 12’, ‘event:org:group@Group 13’, ‘event:org:group@Group 14’, ‘event:org:group@Group 15’, ‘event:org:group@Group 2’, ‘event:org:group@Group 3’, ‘event:org:group@Group 4’, ‘event:org:group@Group 7’]

So, we see that a distinct feature is created for each value of an attribute. This is called one-hot encoding: a case is assigned the value 0 for a feature if it does not contain any event with the given attribute value, and the value 1 if it contains at least one such event.

If we represent the features as a dataframe:

import pandas as pd
df = pd.DataFrame(data, columns=feature_names)
print(df)
                                

We can see the features assigned to each different case.

Manual feature selection

Manual feature selection permits specifying which attributes should be included in the feature set. These may include, for example:

  • The activities performed in the process execution (usually contained in the event attribute concept:name).
  • The resources that perform the process execution (usually contained in the event attribute org:resource).
  • Some numeric attributes, at the discretion of the user.

To do so, we have to call the method get_log_representation. The types of features that can be considered by a manual feature selection are:

Name Description
str_ev_attr String attributes at the event level: these are hot-encoded into features that may assume value 0 or value 1.
str_tr_attr String attributes at the trace level: these are hot-encoded into features that may assume value 0 or value 1.
num_ev_attr Numeric attributes at the event level: these are encoded by including the last value of the attribute among the events of the trace.
num_tr_attr Numeric attributes at trace level: these are encoded by including the numerical value.
str_evsucc_attr Successions related to the string attributes values at the event level: for example, if we have a trace [A,B,C], it might be important to include not only the presence of the single values A, B and C as features; but also the presence of the directly-follows couples (A,B) and (B,C).

Let’s consider, for example, a feature selection where we are interested in:

  • Whether a process execution contains a given activity.
  • Whether a process execution contains a given resource.
  • Whether a process execution contains a directly-follows path between two given activities.
  • Whether a process execution contains a directly-follows path between two given resources.
We will see that the number of features is considerably larger in this setting:

from pm4py.objects.log.util import get_log_representation

data, feature_names = get_log_representation.get_representation(log, str_ev_attr=["concept:name", "org:resource"],
                                                                str_tr_attr=[], num_ev_attr=[], num_tr_attr=[],
                                                                str_evsucc_attr=["concept:name", "org:resource"])
print(len(feature_names))
                                

Calculating useful features

Other useful features are, for example, the cycle time and the lead time associated with a case.

Here, we may suppose we have:

  • A log with lifecycles, where each event is instantaneous,
  • OR an interval log, where events may be associated with two timestamps (start and end timestamp).
The lead/cycle time can be calculated on top of interval logs. If we have a lifecycle log, we need to convert it with:

from pm4py.objects.log.util import interval_lifecycle
log = interval_lifecycle.to_interval(log)
                                
Then, features such as the lead/cycle time can be inserted through the instructions:

from pm4py.objects.log.util import interval_lifecycle
from pm4py.util import constants

log = interval_lifecycle.assign_lead_cycle_time(log, parameters={
    constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY: "start_timestamp",
    constants.PARAMETER_CONSTANT_TIMESTAMP_KEY: "time:timestamp"})
                                

After the provision of the start timestamp attribute (in this case, start_timestamp) and of the timestamp attribute (in this case, time:timestamp), the following features are returned by the method:

  • @@approx_bh_partial_cycle_time => incremental cycle time associated to the event (the cycle time of the last event is the cycle time of the instance)
  • @@approx_bh_partial_lead_time => incremental lead time associated to the event
  • @@approx_bh_overall_wasted_time => difference between the partial lead time and the partial cycle time values
  • @@approx_bh_this_wasted_time => wasted time ONLY with regards to the activity described by the ‘interval’ event
  • @@approx_bh_ratio_cycle_lead_time => measures the incremental Flow Rate (between 0 and 1).
These are all numerical attributes, hence we can refine the feature extraction by doing:

from pm4py.objects.log.util import get_log_representation

data, feature_names = get_log_representation.get_representation(log, str_ev_attr=["concept:name", "org:resource"],
                                                                str_tr_attr=[],
                                                                num_ev_attr=["@@approx_bh_partial_cycle_time",
                                                                             "@@approx_bh_partial_lead_time",
                                                                             "@@approx_bh_overall_wasted_time",
                                                                             "@@approx_bh_this_wasted_time",
                                                                             "@approx_bh_ratio_cycle_lead_time"],
                                                                num_tr_attr=[],
                                                                str_evsucc_attr=["concept:name", "org:resource"])
                                

PCA – Reducing the number of features

Some techniques (such as clustering, prediction and anomaly detection) suffer if the dimensionality of the dataset is too high. Hence, a dimensionality reduction technique (such as PCA) helps to cope with the complexity of the data.

Having a Pandas dataframe out of the features extracted from the log:

import pandas as pd

df = pd.DataFrame(data, columns=feature_names)
                                

It is possible to reduce the number of features using a technique such as PCA.

Let’s create the PCA with a number of components equal to 5, and apply the PCA to the dataframe.

from sklearn.decomposition import PCA

pca = PCA(n_components=5)
df2 = pd.DataFrame(pca.fit_transform(df))
                                

So, from more than 400 columns, we pass to 5 columns that contain most of the variance.
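To check how much of the original variance is actually retained by the 5 components, the explained variance ratio of the fitted PCA object (a standard scikit-learn attribute) can be inspected:

# fraction of the variance captured by each of the 5 retained components
print(pca.explained_variance_ratio_)
# total fraction of variance retained
print(pca.explained_variance_ratio_.sum())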

Anomaly Detection

In this section, we consider the calculation of an anomaly score for the different cases. This is based on the extracted features and, to work well, requires the application of a dimensionality reduction technique (such as the PCA of the previous section).

Let’s apply a method called IsolationForest to the dataframe. This permits adding a column scores that is lower than or equal to 0 when the case should be considered anomalous, and greater than 0 otherwise.

from sklearn.ensemble import IsolationForest
model=IsolationForest()
model.fit(df2)
df2["scores"] = model.decision_function(df2)
                                
To see which cases are most anomalous, we can insert an index column and sort the dataframe by score. Then, the print will show the most anomalous cases first:

df2["@@index"] = df2.index
df2 = df2[["scores", "@@index"]]
df2 = df2.sort_values("scores")
print(df2)
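For example, to extract only the cases flagged as anomalous (score lower than or equal to 0) together with their indices in the log, a small sketch on the previous dataframe is the following:

# indices (in the original log) of the cases considered anomalous
anomalous_cases = df2[df2["scores"] <= 0]["@@index"].tolist()
print(len(anomalous_cases), anomalous_cases)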
                                

Decision tree about the ending activity of a process

Decision trees are objects that help in understanding the conditions leading to a particular outcome. In this section, several examples related to the construction of decision trees are provided.

Ideas behind the construction of decision trees are provided in the scientific paper: de Leoni, Massimiliano, Wil MP van der Aalst, and Marcus Dees. 'A general process mining framework for correlating, predicting and clustering dynamic behavior based on event logs.'

The general scheme is the following:

  • A representation of the log, on a given set of features, is obtained (for example, using one-hot encoding on string attributes and keeping numeric attributes as-they-are)
  • A representation of the target classes is constructed
  • The decision tree is calculated
  • The decision tree is represented in some ways

A process instance may potentially finish with different activities, signaling different outcomes of the process instance. A decision tree may help to understand the reasons behind each outcome.

First, a log can be loaded. Then, a representation of the log on a given set of features can be obtained.

import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests", "input_data", "roadtraffic50traces.xes"))

from pm4py.objects.log.util import get_log_representation
str_trace_attributes = []
str_event_attributes = ["concept:name"]
num_trace_attributes = []
num_event_attributes = ["amount"]
data, feature_names = get_log_representation.get_representation(
                           log, str_trace_attributes, str_event_attributes,
                           num_trace_attributes, num_event_attributes)
Or an automatic representation (automatic selection of the attributes) could be obtained:
data, feature_names = get_log_representation.get_default_representation(log)
(Optional) The features that are extracted by those methods can be represented as a Pandas dataframe:

import pandas as pd
dataframe = pd.DataFrame(data, columns=feature_names)
                                
(Optional) The dataframe can then be exported as a CSV file.

dataframe.to_csv("features.csv", index=False)
                                
Then, the target classes are formed. Each endpoint of the process belongs to a different class.
from pm4py.objects.log.util import get_class_representation
target, classes = get_class_representation.get_class_representation_by_str_ev_attr_value_value(log, "concept:name")
The decision tree could be then calculated and visualized.
from sklearn import tree
clf = tree.DecisionTreeClassifier()
clf.fit(data, target)

from pm4py.visualization.decisiontree import visualizer as dectree_visualizer
gviz = dectree_visualizer.apply(clf, feature_names, classes)

Decision tree about the duration of a case (Root Cause Analysis)

A decision tree about the duration of a case helps to understand the reasons behind a high case duration (or, at least, a case duration that is above a given threshold).

First, a log has to be loaded. A representation of a log on a given set of features could be obtained.

import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests", "input_data", "roadtraffic50traces.xes"))

from pm4py.objects.log.util import get_log_representation
str_trace_attributes = []
str_event_attributes = ["concept:name"]
num_trace_attributes = []
num_event_attributes = ["amount"]

data, feature_names = get_log_representation.get_representation(log, str_trace_attributes, str_event_attributes,
                                                             num_trace_attributes, num_event_attributes)
Or an automatic representation (automatic selection of the attributes) could be obtained:
data, feature_names = get_log_representation.get_default_representation(log)
Then, the target classes are formed. There are two classes: traces with a duration below the specified threshold (here, 200 days; note that the duration is expressed in seconds), and traces with a duration above the threshold.
from pm4py.objects.log.util import get_class_representation
target, classes = get_class_representation.get_class_representation_by_trace_duration(log, 2 * 8640000)
The decision tree could be then calculated and visualized.
from sklearn import tree
clf = tree.DecisionTreeClassifier()
clf.fit(data, target)

from pm4py.visualization.decisiontree import visualizer as dectree_visualizer
gviz = dectree_visualizer.apply(clf, feature_names, classes)

Decision Mining

Decision mining permits, provided:

  • An event log
  • A process model (an accepting Petri net)
  • A decision point

to retrieve the features of the cases that go in the different directions. This permits, for example, calculating a decision tree that explains the decisions.

Let’s start by importing a XES log:

from pm4py.objects.log.importer.xes import importer as xes_importer

log = xes_importer.apply("tests/input_data/running-example.xes")
                                
Calculating a model using the inductive miner:

from pm4py.algo.discovery.inductive import algorithm as inductive_miner

net, im, fm = inductive_miner.apply(log)
                                
A visualization of the model can be obtained in the following way:

from pm4py.visualization.petrinet import visualizer

gviz = visualizer.apply(net, im, fm, parameters={visualizer.Variants.WO_DECORATION.value.Parameters.DEBUG: True})
visualizer.view(gviz)
                                

For this example, we choose the decision point p_10. There, a decision is made between the activities examine casually and examine thoroughly.

To execute the decision mining algorithm, once we have a log, model and a decision point, the following code can be used:

from pm4py.algo.enhancement.decision import algorithm as decision_mining

X, y, class_names = decision_mining.apply(log, net, im, fm, decision_point="p_10")
                                

As we see, the outputs of the apply method are the following:

  • X: a Pandas dataframe containing the features associated to the cases leading to a decision.
  • y: a single-column Pandas dataframe, containing the number of the class that is the outcome of the decision (in this case, the possible values are 0 and 1, since we have two target classes).
  • class_names: the names of the output classes of the decision (in this case, examine casually and examine thoroughly).

These outputs can be used in a generic way with any classification or comparison technique.
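For instance, any scikit-learn classifier can be trained directly on X and y. A minimal sketch (the classifier and its settings are just an example) is the following:

from sklearn.linear_model import LogisticRegression

# train a generic classifier on the features (X) and the decision outcomes (y);
# ravel() passes the target as a flat array
classifier = LogisticRegression(max_iter=1000)
classifier.fit(X, y.values.ravel())
print(classifier.score(X, y.values.ravel()))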

In particular, decision trees can be useful. We provide a function to automate the discovery of decision trees out of the decision mining technique.

The code that should be applied is the following:

from pm4py.algo.enhancement.decision import algorithm as decision_mining

clf, feature_names, classes = decision_mining.get_decision_tree(log, net, im, fm, decision_point="p_10")
                                
Then, a visualization of the decision tree can be obtained in the following way:

from pm4py.visualization.decisiontree import visualizer as tree_visualizer

gviz = tree_visualizer.apply(clf, feature_names, classes)
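As with the other visualizers, the obtained object can then be displayed or saved (the file name below is just an example):

tree_visualizer.view(gviz)
tree_visualizer.save(gviz, "decision_tree.png")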
                                

Statistics

In PM4Py, it is possible to calculate different statistics on top of classic event logs and dataframes.

Throughput Time

Given an event log, it is possible to retrieve the list of all the durations of the cases (expressed in seconds). The only parameter that is needed is the timestamp key. The following code can be used.

from pm4py.statistics.traces.log import case_statistics
all_case_durations = case_statistics.get_all_casedurations(log, parameters={
    case_statistics.Parameters.TIMESTAMP_KEY: "time:timestamp"})
                                    
It is also possible to retrieve, for example, the median case duration (which can also be calculated on top of the previous list):

from pm4py.statistics.traces.log import case_statistics
median_case_duration = case_statistics.get_median_caseduration(log, parameters={
    case_statistics.Parameters.TIMESTAMP_KEY: "time:timestamp"
})
                                    

Case Arrival/Dispersion Ratio

Given an event log, it is possible to retrieve the case arrival ratio, that is the average distance between the arrival of two consecutive cases in the log.

from pm4py.statistics.traces.log import case_arrival
case_arrival_ratio = case_arrival.get_case_arrival_avg(log, parameters={
    case_arrival.Parameters.TIMESTAMP_KEY: "time:timestamp"})
                                    
It is also possible to calculate the case dispersion ratio, that is the average distance between the finishing of two consecutive cases in the log.

from pm4py.statistics.traces.log import case_arrival
case_dispersion_ratio = case_arrival.get_case_dispersion_avg(log, parameters={
    case_arrival.Parameters.TIMESTAMP_KEY: "time:timestamp"})
                                    

Performance Spectrum

The performance spectrum is a powerful tool to analyse the time that passes between the different activities of the process. The input of the performance spectrum is a list of activities of the log (for which we want to consider the spectrum). The output of the performance spectrum is a list of lists, each one containing the timestamps at which the activities happened inside the cases.

An example application of the performance spectrum on the running-example log, providing as list of activities for the spectrum the list containing register request and decide, is the following:

from pm4py.statistics.performance_spectrum import algorithm as performance_spectrum
ps = performance_spectrum.apply(log, ["register request", "decide"],
                                parameters={performance_spectrum.Parameters.ACTIVITY_KEY: "concept:name",
                                            performance_spectrum.Parameters.TIMESTAMP_KEY: "time:timestamp"})
                                    

The only parameters of the performance spectrum are the activity key and the timestamp key.

In such setting, the output of the performance spectrum is the following:


{'list_activities': ['register request', 'decide'], 'points': [[1293703320.0, 1294309080.0], [1293705120.0, 1294222920.0], [1293715920.0, 1294301880.0], [1294300920.0, 1294662480.0], [1294322520.0, 1294415520.0], [1294322520.0, 1294570920.0]]}
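Since each list in points contains the timestamps at which the chosen activities occurred inside a case, the time passed between register request and decide can, for example, be derived from this output with a small post-processing step:

# duration (in seconds) between the first and the last activity of the spectrum, per case
durations = [point[-1] - point[0] for point in ps["points"]]
print(durations)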
                                    

Business Hours

Given an interval event log (an EventLog object where each event is characterised by two timestamps, a start timestamp usually contained in the start_timestamp attribute and a completion timestamp usually contained in the time:timestamp attribute), the duration of the event is the difference between the completion timestamp and the start timestamp.

This may be distorted by nights (when an activity is not actively worked), weekends (when the workers may not be at the workplace) and other kinds of pauses. In PM4Py, a way to consider only the time in which the activity could actually be worked (so, excluding time outside of the working hours and the weekends) is provided.

Given a start and end timestamp (expressed as UNIX timestamps), the business hours calculation method could be called as follows:

from pm4py.util.business_hours import BusinessHours
from datetime import datetime

st = datetime.fromtimestamp(100000000)
et = datetime.fromtimestamp(200000000)
bh_object = BusinessHours(st, et)
worked_time = bh_object.getseconds()
print(worked_time)
                                    
To provide specific shifts and weekends (for example, a short week with 4 working days, each going from 10 to 16), the following code can be used:

bh_object = BusinessHours(st, et, worktiming=[10, 16], weekends=[5, 6, 7])
worked_time = bh_object.getseconds()
print(worked_time)
                                    

Cycle Time and Waiting Time

Two important KPIs for process executions are:

  • The Lead Time: the overall time in which the instance was worked, from the start to the end, regardless of whether it was actively worked or not.
  • The Cycle Time: the overall time in which the instance was worked, from the start to the end, considering only the time in which it was actively worked.

For these concepts, it is important to consider only the business hours (so, excluding nights and weekends). Indeed, in those periods the machinery and the workforce are not available, so the instance cannot be worked, and the time “lost” there is not recoverable.

Within ‘interval’ event logs (that have a start and an end timestamp), it is possible to calculate the lead time and the cycle time incrementally (event by event). The lead time and the cycle time reported on the last event of the case are the ones related to the whole process execution. With this, it is easy to understand which activities of the process have caused a bottleneck (e.g. the lead time increases significantly more than the cycle time).

The algorithm implemented in PM4Py starts by sorting the events of each case by the start timestamp (so, activities started earlier are reported earlier in the log), and is able to calculate the lead and cycle time in all situations, including complex ones where activities of the case overlap in time.

In the following, we aim to insert the following attributes to events inside a log:

Attribute Description
@@approx_bh_partial_cycle_time Incremental cycle time associated to the event (the cycle time of the last event is the cycle time of the instance)
@@approx_bh_partial_lead_time Incremental lead time associated to the event
@@approx_bh_overall_wasted_time Difference between the partial lead time and the partial cycle time values
@@approx_bh_this_wasted_time Wasted time ONLY with regards to the activity described by the ‘interval’ event
@@approx_bh_ratio_cycle_lead_time Measures the incremental Flow Rate (between 0 and 1).

The method that calculates the lead and the cycle time could accept the following optional parameters:

Name Description
worktiming The work timing (e.g. [7, 17])
weekends The specification of the weekends (e.g. [6, 7])
The method can be applied with the following code:

from pm4py.objects.log.util import interval_lifecycle
enriched_log = interval_lifecycle.assign_lead_cycle_time(log)
                                    

With this, an enriched log that contains for each event the corresponding attributes for lead/cycle time is obtained.
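As a quick check, the inserted attributes can be read from the last event of a case, which carries the values for the whole process execution (a minimal sketch on the first case of the enriched log):

last_event = enriched_log[0][-1]
print(last_event["@@approx_bh_partial_lead_time"])
print(last_event["@@approx_bh_partial_cycle_time"])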

Displaying Graphs

Graphs permit the understanding of several aspects of the current log (for example, the distribution of a numeric attribute, the distribution of case durations, or the events over time).

Distribution of case duration

In the following example, the distribution of case durations is shown in two different graphs: a simple plot and a semi-logarithmic (on the X-axis) plot. The semi-logarithmic plot is less sensitive to possible outliers. First, the Receipt log is loaded. Then, the distribution related to case durations is obtained, and both the simple plot and the semi-logarithmic (on the X-axis) plot are produced.

import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log_path = os.path.join("tests","input_data","receipt.xes")
log = xes_importer.apply(log_path)

from pm4py.util import constants
from pm4py.statistics.traces.log import case_statistics
x, y = case_statistics.get_kde_caseduration(log, parameters={constants.PARAMETER_CONSTANT_TIMESTAMP_KEY: "time:timestamp"})

from pm4py.visualization.graphs import visualizer as graphs_visualizer

gviz = graphs_visualizer.apply_plot(x, y, variant=graphs_visualizer.Variants.CASES)
graphs_visualizer.view(gviz)

gviz = graphs_visualizer.apply_semilogx(x, y, variant=graphs_visualizer.Variants.CASES)
graphs_visualizer.view(gviz)
                                    

Distribution of events over time

In the following example, a graph representing the distribution of events over time is obtained. This is particularly important because it helps to understand in which time intervals the greatest number of events is recorded.

from pm4py.algo.filtering.log.attributes import attributes_filter

x, y = attributes_filter.get_kde_date_attribute(log, attribute="time:timestamp")

from pm4py.visualization.graphs import visualizer as graphs_visualizer

gviz = graphs_visualizer.apply_plot(x, y, variant=graphs_visualizer.Variants.DATES)
graphs_visualizer.view(gviz)
                                    

Distribution of a numeric attribute

In the following example, two graphs related to the distribution of a numeric attribute are obtained: a normal plot and a semi-logarithmic (on the X-axis) plot (which is less sensitive to outliers). First, a filtered version of the Road Traffic log is loaded. Then, the distribution of the numeric attribute amount is obtained, and both the standard graph and the semi-logarithmic graph are produced.

import os
from pm4py.objects.log.importer.xes import importer as xes_importer

log_path = os.path.join("tests", "input_data", "roadtraffic100traces.xes")
log = xes_importer.apply(log_path)

from pm4py.algo.filtering.log.attributes import attributes_filter

x, y = attributes_filter.get_kde_numeric_attribute(log, "amount")

from pm4py.visualization.graphs import visualizer as graphs_visualizer

gviz = graphs_visualizer.apply_plot(x, y, variant=graphs_visualizer.Variants.ATTRIBUTES)
graphs_visualizer.view(gviz)

from pm4py.visualization.graphs import visualizer as graphs_visualizer

gviz = graphs_visualizer.apply_semilogx(x, y, variant=graphs_visualizer.Variants.ATTRIBUTES)
graphs_visualizer.view(gviz)
                                    

Log-Model Evaluation

In PM4Py, it is possible to compare the behavior contained in the log and the behavior contained in the model, in order to see if and how they match. In process mining, four different evaluation dimensions exist: replay fitness, precision, generalization and simplicity.

Since PM4Py 1.5.x, we offer a simplified interface for log-model evaluation. This provides a restricted set of choices in comparison to the normal interface.

Function Description
fitness_align_dictio = pm4py.evaluate_fitness_alignments(log, petri_net, initial_marking, final_marking) Calculates the fitness using alignments.

Parameters:
log - Event log
petri_net - Petri net
initial_marking - Initial marking
final_marking - Final marking

Returns:
Fitness dictionary (from alignments)
fitness_tbr_dictio = pm4py.evaluate_fitness_tbr(log, petri_net, initial_marking, final_marking) Calculates the fitness using token-based replay.

Parameters:
log - Event log
petri_net - Petri net
initial_marking - Initial marking
final_marking - Final marking

Returns:
Fitness dictionary (from TBR)
prec_align_dictio = pm4py.evaluate_precision_alignments(log, petri_net, initial_marking, final_marking) Calculates the precision using alignments.

Parameters:
log - Event log
petri_net - Petri net
initial_marking - Initial marking
final_marking - Final marking

Returns:
Precision dictionary (from alignments)
prec_tbr_dictio = pm4py.evaluate_precision_tbr(log, petri_net, initial_marking, final_marking) Calculates the precision using token-based replay.

Parameters:
log - Event log
petri_net - Petri net
initial_marking - Initial marking
final_marking - Final marking

Returns:
Precision dictionary (from TBR)

Replay Fitness

The calculation of replay fitness aims to assess how much of the behavior in the log is admitted by the process model. We propose two methods to calculate replay fitness, based on token-based replay and alignments respectively.

For token-based replay, the percentage of traces that are completely fit is returned, along with a fitness value that is calculated as indicated in the scientific contribution:

Berti, Alessandro, and Wil MP van der Aalst. "Reviving Token-based Replay: Increasing Speed While Improving Diagnostics." ATAED@ Petri Nets/ACSD. 2019.

For alignments, the percentage of traces that are completely fit is returned, along with a fitness value that is calculated as the average of the fitness values of the single traces.

The two variants of replay fitness are implemented as Variants.TOKEN_BASED and Variants.ALIGNMENT_BASED respectively.

To calculate the replay fitness between an event log and a Petri net model, using the token-based replay method, the following code can be used. The resulting value is a number between 0 and 1.

from pm4py.evaluation.replay_fitness import evaluator as replay_fitness_evaluator
fitness = replay_fitness_evaluator.apply(log, net, im, fm, variant=replay_fitness_evaluator.Variants.TOKEN_BASED)
                                    
To calculate the replay fitness between an event log and a Petri net model, using the alignments method, the following code can be used. The resulting value is a number between 0 and 1.

from pm4py.evaluation.replay_fitness import evaluator as replay_fitness_evaluator
fitness = replay_fitness_evaluator.apply(log, net, im, fm, variant=replay_fitness_evaluator.Variants.ALIGNMENT_BASED)
                                    

Precision

We propose two approaches for the measurement of precision in PM4Py:
  • ETConformance (using token-based replay): the reference paper is Muñoz-Gama, Jorge, and Josep Carmona. "A fresh look at precision in process conformance." International Conference on Business Process Management. Springer, Berlin, Heidelberg, 2010.
  • Align-ETConformance (using alignments): the reference paper is Adriansyah, Arya, et al. "Measuring precision of modeled behavior." Information systems and e-Business Management 13.1 (2015): 37-67.

The idea underlying the two approaches is the same: the different prefixes of the log are replayed (whenever possible) on the model. At the reached marking, the set of transitions that are enabled in the process model is compared with the set of activities that follow the prefix. The more the sets differ, the lower the precision value; the more similar they are, the higher the precision value.

This works only if the replay of the prefix on the process model works: if the replay does not produce a result, the prefix is not considered for the computation of precision. Hence, the precision calculated on top of unfit processes is not really meaningful.

The main difference between the approaches is the replay method. Token-based replay is faster but based on heuristics (hence the result of the replay might not be exact). Alignments are exact, work on any kind of relaxed sound nets, but can be slow if the state-space is huge.

The two variants, ETConformance and Align-ETConformance, are available as Variants.ETCONFORMANCE_TOKEN and Variants.ALIGN_ETCONFORMANCE in the implementation respectively.

To calculate the precision between an event log and a Petri net model, using the ETConformance method, the following code can be used. The resulting value is a number between 0 and 1.

from pm4py.evaluation.precision import evaluator as precision_evaluator
prec = precision_evaluator.apply(log, net, im, fm, variant=precision_evaluator.Variants.ETCONFORMANCE_TOKEN)
                                    
To calculate the precision between an event log and a Petri net model, using the Align-ETConformance method, the following code can be used. The resulting value is a number between 0 and 1.

from pm4py.evaluation.precision import evaluator as precision_evaluator
prec = precision_evaluator.apply(log, net, im, fm, variant=precision_evaluator.Variants.ALIGN_ETCONFORMANCE)
                                    

Generalization

Generalization is the third dimension to analyse how the log and the process model match. In particular, we propose the generalization measure described in the following research paper:

Buijs, Joos CAM, Boudewijn F. van Dongen, and Wil MP van der Aalst. "Quality dimensions in process discovery: The importance of fitness, precision, generalization and simplicity." International Journal of Cooperative Information Systems 23.01 (2014): 1440001.

Basically, a model is general if the elements of the model are visited often enough during a replay operation (of the log on the model). A model may perfectly fit the log and be perfectly precise (for example, a model that reports the traces of the log as separate sequential paths from the initial marking to the final marking, with a choice operated at the initial marking), and yet generalize poorly. Hence, to measure generalization, a token-based replay operation is performed, and the generalization is calculated as 1 - avg_t(sqrt(1.0 / freq(t))), where avg_t is the average of the inner value over all the transitions, sqrt is the square root, and freq(t) is the frequency of t after the replay.
To calculate the generalization between an event log and a Petri net model, using the generalization method proposed in this section, the following code can be used. The resulting value is a number between 0 and 1.

from pm4py.evaluation.generalization import evaluator as generalization_evaluator
gen = generalization_evaluator.apply(log, net, im, fm)
                                    

Simplicity

Simplicity is the fourth dimension to analyse a process model. In this case, we define simplicity taking into account only the Petri net model. The criterion that we use for simplicity is the inverse arc degree, as described in the following research paper:

Blum, Fabian Rojas. Metrics in process discovery. Technical Report TR/DCC-2015-6, Computer Science Department, University of Chile, 2015.

First of all, we consider the average degree of a place/transition of the Petri net, which is defined as the sum of the number of input arcs and output arcs. If all the places have at least one input arc and one output arc, the number is at least 2. Choosing a number k between 0 and infinity, the simplicity based on the inverse arc degree is then defined as 1.0 / (1.0 + max(mean_degree - k, 0)).
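As a small numeric illustration of the formula (the values of mean_degree and k are hypothetical):

# hypothetical mean degree of 2.5 with k = 2
mean_degree = 2.5
k = 2
simplicity = 1.0 / (1.0 + max(mean_degree - k, 0))
print(simplicity)  # 1.0 / 1.5 = 0.666...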

To calculate the simplicity on a Petri net model, using the inverse arc degree, the following code can be used. The resulting value is a number between 0 and 1.

from pm4py.evaluation.simplicity import evaluator as simplicity_evaluator
simp = simplicity_evaluator.apply(net)
                                    

Earth Mover Distance

The Earth Mover Distance as introduced in:

Leemans, Sander JJ, Anja F. Syring, and Wil MP van der Aalst. “Earth movers’ stochastic conformance checking.” International Conference on Business Process Management. Springer, Cham, 2019.

provides a way to calculate the distance between two different stochastic languages.

Generally, one language is extracted from the event log, and one language is extracted from the process model. With language, we mean a set of traces that is weighted according to its probability.

For the event log, the language is obtained trivially: the set of variants of the log is taken, and the number of occurrences of each variant is divided by the total number of traces in the log.

Let’s see how the language of the log can be obtained. We can import an event log and calculate its language:

from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.statistics.variants.log import get as variants_module

log = xes_importer.apply("tests/input_data/running-example.xes")
language = variants_module.get_language(log)
print(language)
                                    

Obtaining the following probability distribution:

{(‘register request’, ‘examine casually’, ‘check ticket’, ‘decide’, ‘reinitiate request’, ‘examine thoroughly’, ‘check ticket’, ‘decide’, ‘pay compensation’): 0.16666666666666666, (‘register request’, ‘check ticket’, ‘examine casually’, ‘decide’, ‘pay compensation’): 0.16666666666666666, (‘register request’, ‘examine thoroughly’, ‘check ticket’, ‘decide’, ‘reject request’): 0.16666666666666666, (‘register request’, ‘examine casually’, ‘check ticket’, ‘decide’, ‘pay compensation’): 0.16666666666666666, (‘register request’, ‘examine casually’, ‘check ticket’, ‘decide’, ‘reinitiate request’, ‘check ticket’, ‘examine casually’, ‘decide’, ‘reinitiate request’, ‘examine casually’, ‘check ticket’, ‘decide’, ‘reject request’): 0.16666666666666666, (‘register request’, ‘check ticket’, ‘examine thoroughly’, ‘decide’, ‘reject request’): 0.16666666666666666}

The same does not happen naturally for the process model. In order to calculate a language for the process model, a scalable (but non-deterministic) approach is to play out the model in order to obtain an event log.

Let’s first apply the Alpha Miner. Then, we do the playout of the Petri net. We choose the STOCHASTIC_PLAYOUT variant.

from pm4py.algo.discovery.alpha import algorithm as alpha_miner
net, im, fm = alpha_miner.apply(log)
from pm4py.simulation.playout import simulator
playout_log = simulator.apply(net, im, fm, variant=simulator.Variants.STOCHASTIC_PLAYOUT)
                                    
We can then calculate the language of the played-out log (providing the original log to the STOCHASTIC_PLAYOUT variant):

from pm4py.simulation.playout import simulator
from pm4py.statistics.variants.log import get as variants_module
playout_log = simulator.apply(net, im, fm, parameters={simulator.Variants.STOCHASTIC_PLAYOUT.value.Parameters.LOG: log},
                              variant=simulator.Variants.STOCHASTIC_PLAYOUT)
model_language = variants_module.get_language(playout_log)
                                    

This provides the language of the model. Then, the Earth Mover Distance is calculated:

  • It is ensured that the two languages contain the same words: if a language does not contain a word, the corresponding probability is set to 0.
  • A common ordering (for example, alphabetical ordering) is decided among the keys of the languages.
  • The distance between the different keys is calculated (using a string distance function such as the Levenshtein distance).

This permits obtaining a number greater than or equal to 0 that expresses the distance between the language of the log and the language of the model. This is an alternative measure of precision. To calculate the Earth Mover Distance, the Python package pyemd should be installed (pip install pyemd).

The code to apply the Earth Mover Distance is the following:

from pm4py.evaluation.earth_mover_distance import evaluator
emd = evaluator.apply(model_language, language)
print(emd)
                                    

If the running-example log is chosen along with the Alpha Miner model, a value similar or equal to 0.1733 is obtained.

WOFLAN

WOFLAN is a popular approach for soundness checking on workflow nets, which is able to provide meaningful statistics to the final user. WOFLAN is described in the following PhD thesis:

http://www.processmining.org/_media/publications/everbeek_phdthesis.pdf

The definition of workflow net and soundness can also be found at:

https://en.wikipedia.org/wiki/Petri_net

WOFLAN is applied to an accepting Petri net (a Petri net with an initial and final marking) and applies the following steps (the meaning of the steps is found in the thesis):

  • Checking if the Petri net and the markings are valid.
  • Checking if the Petri net is a workflow net.
  • Checking if all the places are covered by S-components.
  • Checking if there are not well-handled pairs.
  • Checking if there are places that are uncovered in uniform invariants.
  • Checking if there are places that are uncovered in weighted invariants.
  • Checking if the WPD is proper.
  • Checking for substates in the MCG.
  • Checking if there are unbounded sequences.
  • Checking for dead tasks.
  • Checking for live tasks.
  • Checking for non-live tasks.
  • Checking for sequences leading to deadlocks.

The steps are applied in a fixed order; the (positive or negative) outcome of each step determines which check is performed next.

Let's see how WOFLAN can be applied. First, we open a XES log:

from pm4py.objects.log.importer.xes import importer as xes_importer

log = xes_importer.apply("tests/input_data/running-example.xes")
                                    
And we discover a model using the Heuristics Miner

from pm4py.algo.discovery.heuristics import algorithm as heuristics_miner

net, im, fm = heuristics_miner.apply(log)
                                    
Then, the soundness can be checked by doing:

from pm4py.evaluation.soundness.woflan import algorithm as woflan

is_sound = woflan.apply(net, im, fm, parameters={woflan.Parameters.RETURN_ASAP_WHEN_NOT_SOUND: True,
                                                 woflan.Parameters.PRINT_DIAGNOSTICS: False,
                                                 woflan.Parameters.RETURN_DIAGNOSTICS: False})
                                    

In this case, is_sound contains a boolean value (True if the Petri net is a sound workflow net; False otherwise).

The list of parameters is:

Parameter Description
PRINT_DIAGNOSTICS Enables the printing of the diagnostics on the Petri net, when WOFLAN is executed.
RETURN_DIAGNOSTICS Returns a dictionary containing the diagnostics.
RETURN_ASAP_WHEN_NOT_SOUND Stops the execution of WOFLAN when a condition determining that the Petri net is not a sound workflow net is found.

On the provided Petri net, that is not sound, the output of the technique is False.

To know why such a Petri net is not sound, we repeat the execution of the script, setting PRINT_DIAGNOSTICS to True and RETURN_ASAP_WHEN_NOT_SOUND to False (to get more diagnostics). We get the following messages during the execution:

Input is ok.
Petri Net is a workflow net.
The following places are not covered by an s-component: [splace_in_decide_check ticket_0, splace_in_check ticket_0, pre_check ticket, splace_in_check ticket_1].
Not well-handled pairs are: [(1, 6), (5, 6), (17, 82), (1, 20), (25, 20), (39, 82), (1, 46), (5, 46), (25, 46), (35, 46), (25, 56), (35, 56), (1, 62), (5, 62), (5, 74), (35, 74), (89, 82)].
The following places are uncovered in uniform invariants: [splace_in_decide_check ticket_0, splace_in_check ticket_0, pre_check ticket, splace_in_check ticket_1]
The following places are uncovered in weighted invariants: [splace_in_decide_check ticket_0, splace_in_check ticket_0, pre_check ticket, splace_in_check ticket_1]
Improper WPD. The following are the improper conditions: [0, 176, 178, 179, 186, 190, 193, 196, 199, 207, 214, 215, 216, 217, 222, 233, 235].
The following sequences are unbounded: [[register request, hid_10, hid_3, check ticket, hid_1, examine casually, hid_7, decide, hid_13], [register request, hid_9, hid_5, examine thoroughly, hid_8, decide, hid_13], [register request, hid_9, hid_5, examine thoroughly, hid_8, decide, hid_14, reinitiate request, hid_16], [register request, hid_9, hid_3, hid_5, check ticket, examine thoroughly, hid_8, decide, hid_13], [register request, hid_9, hid_3, hid_5, check ticket, examine thoroughly, hid_8, decide, hid_14, reinitiate request, hid_16], [register request, hid_9, hid_3, hid_5, check ticket, examine thoroughly, hid_8, decide, hid_14, reinitiate request, hid_17, hid_2, hid_4, examine casually, hid_7, decide, hid_13], [register request, hid_9, hid_3, hid_5, check ticket, examine thoroughly, hid_8, decide, hid_14, reinitiate request, hid_17, hid_2, hid_4, examine casually, hid_7, decide, hid_14, reinitiate request, hid_16], [register request, hid_9, hid_3, hid_5, check ticket, examine thoroughly, hid_8, decide, hid_14, reinitiate request, hid_17, hid_2, hid_4, examine casually, hid_7, decide, hid_14, reinitiate request, hid_17, hid_2, examine casually, check ticket, hid_7, decide, hid_13], [register request, hid_9, hid_3, hid_5, check ticket, examine thoroughly, hid_8, decide, hid_14, reinitiate request, hid_17, hid_2, hid_4, examine casually, hid_7, decide, hid_14, reinitiate request, hid_17, hid_2, examine casually, check ticket, hid_7, decide, hid_14, reinitiate request, hid_16]]

From there, we can read that:

  • There are places not covered in an S-component.
  • There are pairs that are not well-handled.
  • There are places uncovered in uniform and weighted invariants.
  • It is an improper WPD.
  • Some sequences are unbounded.
To get the diagnostics in a dictionary, the execution can be repeated with:

from pm4py.evaluation.soundness.woflan import algorithm as woflan

is_sound, dictio_diagnostics = woflan.apply(net, im, fm, parameters={woflan.Parameters.RETURN_ASAP_WHEN_NOT_SOUND: False,
                                                 woflan.Parameters.PRINT_DIAGNOSTICS: False,
                                                 woflan.Parameters.RETURN_DIAGNOSTICS: True})
                                    

The dictionary dictio_diagnostics may contain the following keys (if the computation reaches the corresponding step):

Key Description
S_C_NET
PLACE_INVARIANTS
UNIFORM_PLACE_INVARIANTS
S_COMPONENTS
UNCOVERED_PLACES_S_COMPONENT
NOT_WELL_HANDLED_PAIRS
LEFT
UNCOVERED_PLACES_UNIFORM
WEIGHTED_PLACE_INVARIANTS
UNCOVERED_PLACES_WEIGHTED
MCG
DEAD_TASKS
R_G_S_C
R_G
LOCKING_SCENARIOS
RESTRICTED_COVERABILITY_TREE
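Since only the diagnostics computed before the procedure stopped are returned, a small sketch to inspect which of the keys above are actually present in the dictionary is the following:

# list the diagnostics that were actually computed for this Petri net
print(list(dictio_diagnostics.keys()))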

Simulation

In PM4Py, we offer different simulation algorithms that, starting from a model, are able to produce an output that follows the model and the rules provided by the user.

Playout of a Petri Net

A playout of a Petri net takes as input a Petri net along with an initial marking, and returns a list of process executions that are allowed from the process model.

We offer different types of playouts:

Variant Description
Variants.BASIC_PLAYOUT A basic playout that accepts a Petri net along with an initial marking, and returns a specified number of process executions (repetitions may be possible).
Variants.EXTENSIVE A playout that accepts a Petri net along with an initial marking, and returns all the executions that are possible according to the model, up to a provided length of trace (may be computationally expensive).

The list of parameters for such variants are:

Variant Parameter Description
Variants.BASIC_PLAYOUT Parameters.ACTIVITY_KEY The name of the attribute to use as activity in the playout log.
Parameters.TIMESTAMP_KEY The name of the attribute to use as timestamp in the playout log.
Parameters.CASE_ID_KEY The trace attribute that should be used as case identifier in the playout log.
Parameters.NO_TRACES The number of traces that the playout log should contain.
Parameters.MAX_TRACE_LENGTH The maximum trace length (after which, the playout of the trace is stopped).
Variants.EXTENSIVE Parameters.ACTIVITY_KEY The name of the attribute to use as activity in the playout log.
Parameters.TIMESTAMP_KEY The name of the attribute to use as timestamp in the playout log.
Parameters.CASE_ID_KEY The trace attribute that should be used as case identifier in the playout log.
Parameters.MAX_TRACE_LENGTH The maximum trace length (after which, the extensive playout is stopped).
An example application of the basic playout, given a Petri net, to get a log of 50 traces, is the following:

from pm4py.simulation.playout import simulator

simulated_log = simulator.apply(net, im, variant=simulator.Variants.BASIC_PLAYOUT, parameters={simulator.Variants.BASIC_PLAYOUT.value.Parameters.NO_TRACES: 50})
                                    
An example application of the extensive playout, given a Petri net, to get the log containing all the executions of length <= 7:

from pm4py.simulation.playout import simulator

simulated_log = simulator.apply(net, im, variant=simulator.Variants.EXTENSIVE, parameters={simulator.Variants.EXTENSIVE.value.Parameters.MAX_TRACE_LENGTH: 7})
                                    

Monte Carlo Simulation

A time-related simulation permits knowing how probable it is that a process execution terminates after a given amount of time. This leads to a better estimation of Service Level Agreements, or a better identification of the process instances that are most likely to have a high throughput time.
All this starts from a performance DFG, for example the one discovered from the running-example log:

import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))
from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
dfg_perf = dfg_discovery.apply(log, variant=dfg_discovery.Variants.PERFORMANCE)
from pm4py.statistics.start_activities.log import get as start_activities
from pm4py.statistics.end_activities.log import get as end_activities
sa = start_activities.get_start_activities(log)
ea = end_activities.get_end_activities(log)
                                    
and the knowledge of the case arrival ratio. The case arrival ratio is the amount of time that passes (on average, or as a median) between the arrival of two consecutive cases. It can be provided by the user or inferred from the event log. The inference from the event log is done by using the following command:

from pm4py.statistics.traces.log import case_arrival
ratio = case_arrival.get_case_arrival_avg(log)
print(ratio)
                                    

Using the DFG mining approach, it is possible to retrieve a Petri net model from the DFG. This kind of model is the “default” one for Monte Carlo simulation (because its execution semantics is very clear). Moreover, the Petri net extracted by the DFG mining approach is a sound workflow net (which gives other good properties to the model).

The DFG mining approach can be applied in the following way:

from pm4py.objects.conversion.dfg import converter
net, im, fm = converter.apply(dfg_perf, variant=converter.Variants.VERSION_TO_PETRI_NET_ACTIVITY_DEFINES_PLACE,
                              parameters={converter.Variants.VERSION_TO_PETRI_NET_ACTIVITY_DEFINES_PLACE.value.Parameters.START_ACTIVITIES: sa,
                                          converter.Variants.VERSION_TO_PETRI_NET_ACTIVITY_DEFINES_PLACE.value.Parameters.END_ACTIVITIES: ea})
                                    
To perform a basic Monte Carlo simulation, the following code can be used. This is a sort of resource-constrained simulation, where it is assumed that a place can hold at most 1 token at a time. Later, we will see how to provide a higher number of tokens that can be hosted by a place.

from pm4py.simulation.montecarlo import simulator as montecarlo_simulation
from pm4py.algo.conformance.tokenreplay.algorithm import Variants
parameters = {}
parameters[
    montecarlo_simulation.Variants.PETRI_SEMAPH_FIFO.value.Parameters.TOKEN_REPLAY_VARIANT] = Variants.BACKWARDS
parameters[montecarlo_simulation.Variants.PETRI_SEMAPH_FIFO.value.Parameters.PARAM_CASE_ARRIVAL_RATIO] = 10800
simulated_log, res = montecarlo_simulation.apply(log, net, im, fm, parameters=parameters)
                                    

During the replay operation, some debug messages are written to the screen. The main outputs of the simulation process are:

Key Description of the value
simulated_log The traces that have been simulated during the simulation.
res The result of the simulation (Python dictionary).

Among res, that is the result of the simulation, we have the following keys:

Key Description of the value
places_interval_trees an interval tree for each place, that hosts an interval for each time when it was “full” according to the specified maximum amount of tokens per place.
transitions_interval_trees an interval tree for each transition, that contains all the time intervals in which the transition was enabled but not yet fired (so, the time between a transition was fully enabled and the consumption of the tokens from the input places)
cases_ex_time a list containing the throughput times for all the cases of the log
median_cases_ex_time the median throughput time of the cases in the simulated log
input_case_arrival_ratio the case arrival ratio that was provided by the user, or automatically calculated from the event log.
total_cases_time the difference between the last timestamp of the log, and the first timestamp of the simulated log.
The last four items of the previous list are simple Python objects (specifically, floats and lists). The interval tree objects can be used in the following way to get time-specific information. For example, the following code snippet prints, for a random transition in the model, the number of overlapping intervals at 10 points, starting from the minimum timestamp of the log and uniformly distributed across the time interval of the log.

import random
last_timestamp = max(event["time:timestamp"] for trace in log for event in trace).timestamp()
first_timestamp = min(event["time:timestamp"] for trace in log for event in trace).timestamp()
pick_trans = random.choice(list(res["transitions_interval_trees"]))
print(pick_trans)
n_div = 10
i = 0
while i < n_div:
    timestamp = first_timestamp + (last_timestamp - first_timestamp)/n_div * i
    print("\t", timestamp, len(res["transitions_interval_trees"][pick_trans][timestamp]))
    i = i + 1
                                    
The following code snippet instead prints, for a random place in the model, the number of overlapping intervals at 10 points, starting from the minimum timestamp of the log and uniformly distributed across the time interval of the log:

import random
last_timestamp = max(event["time:timestamp"] for trace in log for event in trace).timestamp()
first_timestamp = min(event["time:timestamp"] for trace in log for event in trace).timestamp()
pick_place = random.choice(list(res["places_interval_trees"]))
print(pick_place)
n_div = 10
i = 0
while i < n_div:
    timestamp = first_timestamp + (last_timestamp - first_timestamp)/n_div * i
    print("\t", timestamp, len(res["places_interval_trees"][pick_place][timestamp]))
    i = i + 1
                                    

This information can be used to build graphs (using external programs such as Microsoft Excel).

The simulation process can be summarized as follows:

  • An event log and a model (DFG) are considered.
  • Internally in the simulation, a replay operation is done between the log and the model.
  • The replay operation leads to the construction of a stochastic map that associates each transition with a probability distribution (for example, a normal distribution, an exponential distribution, …). The probability distribution that maximizes the likelihood of the values observed during the replay is chosen. The user can force a specific distribution type (such as exponential) if desired.
  • Moreover, during the replay operation, the frequency of each transition is found. That helps in picking, in a “weighted” way, one of the transitions enabled in a marking when the simulation occurs.
  • The simulation process occurs. For each trace that is generated (the time between the starts of consecutive traces is fixed), a thread is spawned and stochastic choices are made. The possibility to use a given place (depending on the maximum number of resources that can be used) is controlled by a semaphore object in Python.
  • A maximum amount of time is specified for the simulation. If one or more threads exceed that amount of time, the threads are killed and the corresponding trace is not added to the simulation log.

Hence, several parameters are important in order to perform a Monte Carlo simulation. These parameters, which belong to the petri_semaph_fifo variant, are the following (ordered by importance).

Variant Parameter Description
Variants.PETRI_SEMAPH_FIFO Parameters.PARAM_NUM_SIMULATIONS Number of simulations that are performed (the goal is to have such number of traces in the model)
Parameters.PARAM_CASE_ARRIVAL_RATIO The case arrival ratio that is specified by the user.
Parameters.PARAM_MAP_RESOURCES_PER_PLACE A map containing for each place of the Petri net the maximum amount of tokens
Parameters.PARAM_DEFAULT_NUM_RESOURCES_PER_PLACE If the map of resources per place is not specified, then use the specified maximum number of resources per place.
Parameters.PARAM_MAX_THREAD_EXECUTION_TIME Specifies the maximum execution time of the simulation (for example, 60 seconds).
Parameters.PARAM_SMALL_SCALE_FACTOR Specifies the ratio between the “real” time scale and the simulation time scale. A higher ratio means that the simulation runs faster but is generally less accurate; a lower ratio means that the simulation runs slower but is generally more accurate (in providing detailed diagnostics). The default value is 864000 seconds (10 days), meaning that one second in the simulation corresponds to 10 days in the real log.
Parameters.PARAM_ENABLE_DIAGNOSTICS Enables the printing of simulation diagnostics through the “logging” module of Python.
Parameters.ACTIVITY_KEY The attribute of the log that should be used as activity
Parameters.TIMESTAMP_KEY The attribute of the log that should be used as timestamp
Parameters.TOKEN_REPLAY_VARIANT The variant of the token-based replay to use: token_replay, the classic variant, which cannot handle duplicate transitions; backwards, the backwards token-based replay, which is slower but can handle duplicate transitions.
Parameters.PARAM_FORCE_DISTRIBUTION If specified, the distribution that is forced for the transitions (normal, exponential, ...)
Parameters.PARAM_DIAGN_INTERVAL The time interval in which diagnostics should be printed (for example, diagnostics should be printed every 10 seconds).
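
As an illustration of how these parameters can be passed, the following sketch shows a hypothetical invocation of the Monte Carlo simulator. The module path (pm4py.simulation.montecarlo) and the objects net, im, fm (a previously discovered Petri net with its initial and final marking) are assumptions and may differ depending on the PM4Py version:

from pm4py.simulation.montecarlo import simulator as montecarlo_simulation

variant = montecarlo_simulation.Variants.PETRI_SEMAPH_FIFO
param = variant.value.Parameters

# hypothetical call: simulate 100 cases, with a new case arriving every 3 hours (10800 s),
# killing simulation threads that run for more than 60 seconds
simulated_log, res = montecarlo_simulation.apply(
    log, net, im, fm, variant=variant,
    parameters={param.PARAM_NUM_SIMULATIONS: 100,
                param.PARAM_CASE_ARRIVAL_RATIO: 10800,
                param.PARAM_MAX_THREAD_EXECUTION_TIME: 60})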

CTMC Simulation (DFG)

A time-related simulation permits to know how probable it is that a process execution terminates within a given amount of time. This leads to a better estimation of Service Level Agreements, and to a better identification of the process instances that are likely to have a high throughput time.

All this starts from a performance DFG, for example the one discovered from the running-example log:

import os
from pm4py.objects.log.importer.xes import importer as xes_importer
# import the running-example log
log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))
# discover the performance DFG (edges annotated with average durations)
from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
dfg_perf = dfg_discovery.apply(log, variant=dfg_discovery.Variants.PERFORMANCE)
# retrieve the start and end activities of the log
from pm4py.statistics.start_activities.log import get as start_activities
from pm4py.statistics.end_activities.log import get as end_activities
sa = start_activities.get_start_activities(log)
ea = end_activities.get_end_activities(log)
                                    

For the simulation model, a CTMC (Continuous Time Markov Chain) is built from the DFG. This model works well under the assumption that the frequencies of the edges outgoing from an activity are similar. If that is not the case (for example, when one outgoing edge has frequency 1 and another has frequency 10000), the model works less well.

In order to ensure that the DFG contains, as much as possible, only frequent arcs, a filtering operation needs to be applied. For example, it is possible to use variants-based filtering on the log. An example of the application of the automatic variants filter is:

from pm4py.algo.filtering.log.variants import variants_filter
log = variants_filter.apply_auto_filter(log)
                                    

Given that each edge carries the average time between the corresponding states, the CTMC assumes that the distribution of times follows an exponential distribution with that average.
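
As a plain-Python illustration of this assumption (the numbers below are hypothetical and not taken from the running example): if an edge carries an average time of 3600 seconds, the probability that the corresponding step completes within 7200 seconds is 1 - exp(-7200/3600) ≈ 0.86.

import math

mean_time = 3600.0   # hypothetical average time on an edge, in seconds
t = 7200.0           # time horizon of interest, in seconds
# CDF of an exponential distribution with the given mean
prob_within_t = 1 - math.exp(-t / mean_time)
print(prob_within_t)  # ~0.8647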

The simulation model can be easily constructed by doing:

from pm4py.objects.stochastic_petri import ctmc
reach_graph, tang_reach_graph, stochastic_map, q_matrix = ctmc.get_tangible_reachability_and_q_matrix_from_dfg_performance(dfg_perf, parameters={"start_activities": sa, "end_activities": ea})
print(tang_reach_graph.states)
                                    

The last line prints the states of the model, which are:

{reinitiaterequest1, sink1, source1, paycompensation1, registerrequest1, decide1, rejectrequest1, examinethoroughly1, checkticket1, examinecasually1}

“source1” is the source state of the model (implicitly connected to the start activity “register request”). “sink1” is the terminal state of the model (implicitly connected to the end activities “pay compensation” and “reject request”). The other states of the model are the ones reached after executing the corresponding activity (for example, “decide1” is the state reached after a “decide” activity).

Starting from “source1”, we would like to know how probable it is that a process execution is already over after 2 days. To do that, we perform a transient analysis starting from the state “source1”, specifying 172800 (2 days) as the number of seconds:

# pick the source state
state = [x for x in tang_reach_graph.states if x.name == "source1"][0]
# analyse the distribution over the states of the system starting from the source after 172800.0 seconds (2 days)
transient_result = ctmc.transient_analysis_from_tangible_q_matrix_and_single_state(tang_reach_graph, q_matrix, state,
                                                                                 172800.0)
                                    

We get the following dictionary as output:

Counter({examinethoroughly1: 0.2962452842059696, sink1: 0.22722259091795302, decide1: 0.21581166958939804, checkticket1: 0.14875795862276098, examinecasually1: 0.10303611911547725, reinitiaterequest1: 0.008919293375341046, registerrequest1: 7.082354169889813e-06, rejectrequest1: 1.399193916663259e-09, paycompensation1: 4.1973640443858166e-10, source1: 9.448511222571739e-23})

That means that we have a 22.72% probability of having already finished the process execution (being in the “sink” state) after 2 days. Let’s calculate that for 100 days (8640000 seconds):
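
This can be computed by reusing the same transient analysis call as above, with 8640000.0 seconds as the time horizon:

transient_result = ctmc.transient_analysis_from_tangible_q_matrix_and_single_state(tang_reach_graph, q_matrix, state,
                                                                                   8640000.0)
print(transient_result)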

Counter({sink1: 0.9999999999459624, examinethoroughly1: 2.439939596108015e-11, decide1: 1.773740724895241e-11, checkticket1: 7.016711086804287e-12, examinecasually1: 4.146293819445992e-12, reinitiaterequest1: 7.377366868580615e-13, rejectrequest1: 1.1499875063552587e-19, paycompensation1: 3.449783588381732e-20, registerrequest1: 3.227610398476988e-258, source1: 1.3702507295618392e-274})

According to the model, we have a probability of 99.999999995% of having finished the process after 100 days! That is practically 100%.

Suppose we want to know how probable it is that, after a decision, the end of the process is reached within 10 days. This can be done as follows:

state = [x for x in tang_reach_graph.states if x.name == "decide1"][0]
transient_result = ctmc.transient_analysis_from_tangible_q_matrix_and_single_state(tang_reach_graph, q_matrix, state,
                                                                                   864000.0)
print(transient_result)
                                    

The result is:

Counter({sink1: 0.9293417034466963, examinethoroughly1: 0.03190639167001194, decide1: 0.02319465799554023, checkticket1: 0.009172805258533999, examinecasually1: 0.005419723629171662, reinitiaterequest1: 0.0009647178045538061, rejectrequest1: 1.5038030381048534e-10, paycompensation1: 4.511175132201897e-11, registerrequest1: 0.0, source1: 0.0})

So, we have a 92.9% probability of reaching the “sink” state within 10 days after a decision.
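
If only the probability of the “sink” state is of interest, it can be read directly from the returned Counter, which is keyed by the state objects of the tangible reachability graph (a small sketch based on the calls shown above):

# pick the sink state and print its transient probability
sink_state = [x for x in tang_reach_graph.states if x.name == "sink1"][0]
print(transient_result[sink_state])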

Extensive Playout of a Process Tree

An extensive playout operation permits to obtain (up to the provided limits) the entire language of the process model. Doing an extensive playout operation on a Petri net can be incredibly expensive (the reachability graph needs to be explored). Process trees, with their bottom-up structure, permit to obtain the entire language of the model in a much easier way, starting from the language of the leaves (which is trivial) and then following specific merge rules for the operators.

However, since the language of a process tree can be incredibly vast (when parallel operators are involved) or even infinite (when loops are involved), the extensive playout is possible only up to some limits:

  • If the tree contains a loop, the maximum number of occurrences of the loop must be specified. This stops the extensive playout at the given number of occurrences.
  • Since the number of different executions, when loops are involved, is still incredibly big, it is possible to specify the maximum length of a trace to be returned. Traces longer than the maximum length are automatically discarded.
  • To further limit the number of different executions, the maximum number of traces returned by the algorithm can be provided.

Moreover, from the structure of the process tree, it is easy to infer the minimum length of a trace allowed by the process model (always following the bottom-up approach).

Some reasonable settings for the extensive playout are the following:

  • Overall, the maximum number of traces returned by the algorithm is set to 100000.
  • The maximum length of a trace that is an output of the playout is, by default, set to the minimum length of a trace accepted by a process tree.
  • The maximum number of loops is set to be the minimum length of a trace divided by two.

The list of parameters is:

Parameter Description
MAX_LIMIT_NUM_TRACES Maximum number of traces that are returned by the algorithm.
MAX_TRACE_LENGTH Maximum length of a trace that is output of the algorithm.
MAX_LOOP_OCC Maximum number of times we enter in a loop.
In the following, we see how the playout can be executed. First, a log can be imported:

from pm4py.objects.log.importer.xes import importer as xes_importer
import os

log = xes_importer.apply(os.path.join("tests", "input_data", "receipt.xes"))
                                    
Then, a process tree can be discovered using the inductive miner algorithm.

from pm4py.algo.discovery.inductive import algorithm as inductive_miner

tree = inductive_miner.apply_tree(log)
                                    
We specify that we want to retrieve traces of length at most 3, and at most 100000 traces overall.

from pm4py.simulation.tree_playout import algorithm as tree_playout

playout_variant = tree_playout.Variants.EXTENSIVE
param = tree_playout.Variants.EXTENSIVE.value.Parameters

simulated_log = tree_playout.apply(tree, variant=playout_variant,
                                   parameters={param.MAX_TRACE_LENGTH: 3, param.MAX_LIMIT_NUM_TRACES: 100000})
print(len(simulated_log))
                                    

At this point, the extensive playout operation is done.
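
The resulting object is an event log, whose traces can be inspected, for example, by printing the first few of them as lists of activity names (a small sketch, assuming the standard concept:name attribute):

for trace in list(simulated_log)[:5]:
    print([event["concept:name"] for event in trace])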

Social Network Analysis

In PM4Py we offer support for different Social Network Analysis metrics, and support for the discovery of roles.

Handover of Work

The Handover of Work metric measures how many times an individual is followed by another individual in the execution of a business process. To calculate the Handover of Work metric, the following code could be used:

from pm4py.algo.enhancement.sna import algorithm as sna
hw_values = sna.apply(log, variant=sna.Variants.HANDOVER_LOG)
                                    
Then, a visualization could be obtained through NetworkX or through Pyvis:

from pm4py.visualization.sna import visualizer as sna_visualizer
gviz_hw_py = sna_visualizer.apply(hw_values, variant=sna_visualizer.Variants.PYVIS)
sna_visualizer.view(gviz_hw_py, variant=sna_visualizer.Variants.PYVIS)
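
Alternatively, a NetworkX-based visualization can be obtained; the following sketch assumes that the visualizer exposes a NETWORKX variant analogous to the PYVIS one:

from pm4py.visualization.sna import visualizer as sna_visualizer
gviz_hw_nx = sna_visualizer.apply(hw_values, variant=sna_visualizer.Variants.NETWORKX)
sna_visualizer.view(gviz_hw_nx, variant=sna_visualizer.Variants.NETWORKX)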
                                    

Subcontracting

The subcontracting metric calculates how many times the work of an individual is interleaved by the work of some other individual, only to eventually “return” to the original individual. To measure the subcontracting metric, the following code could be used:

from pm4py.algo.enhancement.sna import algorithm as sna
sub_values = sna.apply(log, variant=sna.Variants.SUBCONTRACTING_LOG)
                                    
Then, a visualization could be obtained through NetworkX or through Pyvis:

from pm4py.visualization.sna import visualizer as sna_visualizer
gviz_sub_py = sna_visualizer.apply(sub_values, variant=sna_visualizer.Variants.PYVIS)
sna_visualizer.view(gviz_sub_py, variant=sna_visualizer.Variants.PYVIS)
                                    

Working Together

The Working together metric calculates how many times two individuals work together for resolving a process instance. To measure the Working Together metric, the following code could be used:

from pm4py.algo.enhancement.sna import algorithm as sna
wt_values = sna.apply(log, variant=sna.Variants.WORKING_TOGETHER_LOG)
                                    
Then, a visualization could be obtained through NetworkX or through Pyvis:

from pm4py.visualization.sna import visualizer as sna_visualizer
gviz_wt_py = sna_visualizer.apply(wt_values, variant=sna_visualizer.Variants.PYVIS)
sna_visualizer.view(gviz_wt_py, variant=sna_visualizer.Variants.PYVIS)
                                    

Similar Activities

The Similar Activities metric calculates how similar the work patterns of two individuals are. To measure the Similar Activities metric, the following code could be used:

from pm4py.algo.enhancement.sna import algorithm as sna
ja_values = sna.apply(log, variant=sna.Variants.JOINTACTIVITIES_LOG)
                                    
Then, a visualization could be obtained through NetworkX or through Pyvis:

from pm4py.visualization.sna import visualizer as sna_visualizer
gviz_ja_py = sna_visualizer.apply(ja_values, variant=sna_visualizer.Variants.PYVIS)
sna_visualizer.view(gviz_ja_py, variant=sna_visualizer.Variants.PYVIS)
                                    

Roles Discovery

A role is a set of activities in the log that are executed by a similar (multi)set of resources; hence, it corresponds to a specific function within the organization. Grouping the activities into roles can help:

  • in understanding which activities are executed by which roles;
  • in understanding the roles themselves (the number of resources involved in a single activity may not provide enough explanation).

An article on roles detection, that has inspired the technique implemented in PM4Py, is:

Burattin, Andrea, Alessandro Sperduti, and Marco Veluscek. “Business models enhancement through discovery of roles.” 2013 IEEE Symposium on Computational Intelligence and Data Mining (CIDM). IEEE, 2013.

Initially, each activity corresponds to a different role and is associated to the multiset of its originators. After that, roles are merged according to their similarity, until no more merges are possible.

First, you need to import a log:

from pm4py.objects.log.importer.xes import importer as xes_importer
import os
log = xes_importer.apply(os.path.join("tests", "input_data", "receipt.xes"))
                                    
After that, the role detection algorithm can be applied:

from pm4py.algo.enhancement.roles import algorithm as roles_discovery
roles = roles_discovery.apply(log)
                                    

We can print the sets of activities that are grouped in roles by doing print([x[0] for x in roles]).
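
For example (reusing the roles object computed above):

# print, for each discovered role, the set of activities grouped in it
print([x[0] for x in roles])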