Supported/Described Version(s): PM4Py 2.2.1
This documentation assumes that the reader has a basic understanding of process mining and Python concepts.
Handling Event Data
Since PM4Py 1.5.x, we offer a simplified interface to import/export event logs.
This provides a restricted set of choices in comparison to the normal interface.
Moreover, we provide a simplified interface to convert the formats of the
log objects.
In this section, information about importing and exporting event logs, stored in various
data formats, is presented. Before we dive into the details of importing and exporting
various different types of files containing event data, we first briefly explain the two
basic notions of event data used within PM4Py.
We assume the reader to be familiar with the general concept of an event log.
In general, we distinguish between two different event data object types:
Event Stream (objects.log.log.EventStream); Simply represents a sequence
of events.
Events themselves are simply an extension of the Mapping class of python (collections.abc.Mapping),
which allows
us to use events as a dict. From a programming perspective, an Event
Stream behaves exactly like a list object in Python.
However, when applying lambda functions, the result needs to be explicitly cast to
an EventStream object (see the short sketch after this list).
Event Log (objects.log.log.EventLog); Represents a sequence of
sequences of events.
The concept of an event log is the more traditional view on event data, i.e.,
executions of a process are captured in traces of events.
However, in PM4Py, the Event Log maintains an order of traces. In this way, sorting
traces using some specific sorting criterion is supported naturally, and lambda
functions and filters are easily applied on top of Event Logs as well.
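To illustrate the casting mentioned above, a minimal sketch (assuming an existing event_stream object, the standard concept:name attribute, and that the EventStream constructor accepts an iterable of events):
from pm4py.objects.log.log import EventStream

# filter() returns a plain iterator, so the result is wrapped back into an EventStream
filtered_stream = EventStream(filter(lambda e: e["concept:name"] == "register request", event_stream))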
Importing IEEE XES files
IEEE XES is a standard format describing how event logs are stored.
For more information about the format, please study the IEEE XES Website.
A simple synthetic event log can be downloaded from here.
Note that several real event logs have been made available, over the past few
years.
You can find them here.
The example code on the right shows how to import an event log, stored in the IEEE
XES format, given a file path to the log file.
The code fragment uses the standard importer (iterparse, described in a later
paragraph).
Note that IEEE XES Event Logs are imported into an Event Log object, i.e., as
described earlier.
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply('<path_to_xes_file.xes>')
Event logs are stored as an extension of the Python
list data structure. To access a trace in the log, it is enough to provide its
index in
the event log. Consider the example on the right on how to access the different
objects stored in the imported log.
print(log[0]) #prints the first trace of the log
print(log[0][0]) #prints the first event of the first trace
The apply() method of the xes_importer, located in
pm4py.objects.log.importer.xes.importer.py, accepts two optional
parameters: variant and parameters. The
variant parameter indicates which variant of the importer to use.
The parameters parameter is a Python dictionary,
specifying the parameters of choice.
from pm4py.objects.log.importer.xes import importer as xes_importer
variant = xes_importer.Variants.ITERPARSE
parameters = {variant.value.Parameters.TIMESTAMP_SORT: True}
log = xes_importer.apply('<path_to_xes_file>',
variant=variant, parameters=parameters)
This method invocation style is used
throughout PM4Py in the various different algorithms implemented, i.e., by wrapping
around
the different implementations, new variants of algorithms are easily called, using
previously written
PM4Py code.
W.r.t. XES importers, two variants are provided.
One implementation is based on the iterparse() function of xml.etree.
The other variant is a line-by-line, custom parser (for improved
performance). It does not follow the standard and is able to import traces,
simple trace attributes, events, and simple event attributes.
To specify a variant, we add the following argument to the call to the importer:
variant=xes_importer.Variants.ITERPARSE (note that, in the example
code, this is encapsulated in the local variable variant).
The xes_importer.Variants.ITERPARSE value actually maps to the
underlying Python module implementing the iterparse-based importer.
We are able to access that reference by accessing the value property,
e.g., xes_importer.Variants.ITERPARSE.value.
That module contains a parameter definition, i.e., Parameters,
containing all possible parameters for the iterparse variant.
As an example, parameter TIMESTAMP_SORT is one of those, accessed by
xes_importer.Variants.ITERPARSE.value.Parameters.TIMESTAMP_SORT.
The overview below lists all variants and corresponding parameters defined
for importing IEEE XES files.
Iterparse (ITERPARSE):
- TIMESTAMP_SORT (boolean, default False): If True, the log is sorted by timestamp.
- TIMESTAMP_KEY (string, default 'time:timestamp'): If TIMESTAMP_SORT is True, this event-attribute key is used to read the timestamps.
- REVERSE_SORT (boolean, default False): If True, the sorting is inverted.
- INSERT_TRACE_INDICES (boolean, default False): If True, trace indices are added as an event attribute for each event.
- MAX_TRACES (integer, default 1000000000): Maximum number of traces to import from the log.

Line-By-Line (LINE_BY_LINE):
- TIMESTAMP_SORT, TIMESTAMP_KEY, REVERSE_SORT, INSERT_TRACE_INDICES, MAX_TRACES: same as Iterparse.
- MAX_BYTES (integer, default 100000000000): Maximum number of bytes to read.
Importing CSV files
Apart from the IEEE XES standard, a lot of event logs are actually stored in a CSV
file.
In general, there are two ways to deal with CSV files in PM4Py:
Import the CSV into a pandas DataFrame;
In general, most existing algorithms in PM4Py are coded to be flexible in terms
of their
input, i.e., if a certain event log object is provided that is not in the right
form, we
translate it to the appropriate form for you.
Hence, after importing a dataframe, most algorithms are directly able to work
with the
data frame.
Convert the CSV into an event log object (similar to the result of the IEEE XES
importer
presented in the previous section);
In this case, the first step is to import the CSV file using pandas (similar to
the
previous bullet) and subsequently converting it to the event log object.
In the remainder of this section, we briefly highlight how to convert a pandas
DataFrame
to an event log.
Note that most algorithms use the same type of conversion, in case a given
event data
object is not of the right type.
To convert objects in PM4Py, there is a dedicated package, i.e.,
objects.conversion.
The conversion package allows one to convert an object of a certain type to a new
object of
a different type (if such a conversion is applicable).
Within the conversion package, a standard naming convention is applied, i.e., the
type of
the input object defines the package in which the code resides.
Thus, since we assume that the imported DataFrame represents an event log, we find
the
appropriate conversion in the objects.conversion.log package.
The example code on the right shows how to convert a CSV file into the PM4Py
internal event data object types.
By default, the converter converts the dataframe to an Event Log object (i.e., not
an Event Stream).
We suggest sorting the dataframe by its timestamp column. In the example on
the right, it is assumed that the timestamp column is called timestamp. This ensures
that events are sorted by their timestamp.
import pandas as pd
from pm4py.objects.log.util import dataframe_utils
from pm4py.objects.conversion.log import converter as log_converter
log_csv = pd.read_csv('<path_to_csv_file.csv>', sep=',')
log_csv = dataframe_utils.convert_timestamp_columns_in_df(log_csv)
log_csv = log_csv.sort_values('<timestamp_column>')
event_log = log_converter.apply(log_csv)
Note that the example code above does not directly work in a lot of cases.
There are a few reasons for this.
First of all, a CSV file, by definition, is closer to an Event Stream, i.e., it
represents a sequence of events.
Since an event log 'glues' events together that belong to the same case, i.e., into
a trace of events, we need to specify to the converter what attribute to use for
this.
The parameter we need to set for this, i.e., in the converter is the
CASE_ID_KEY
parameter.
Its default value is 'case:concept:name'.
Hence, when our input event data, stored in a csv-file has a column
with the name
case:concept:name, that column is used to define traces.
Therefore, let us consider a very simple example event log and assume it is stored
as a csv-file:

case,activity,timestamp,clientID
1,register request,20200422T0455,1337
2,register request,20200422T0457,1479
1,submit payment,20200422T0503,1337
In this small example table, we observe four columns, i.e., case,
activity,
timestamp and clientID.
Clearly, when importing the data and converting it to an Event Log object, we aim to
combine all rows (events) with the same value for the case column
together.
Hence, the default value of the CASE_ID_KEY parameter is not set to the
right value.
Another interesting phenomenon in the example data is the fourth column, i.e.,
clientID.
In fact, the client ID is an attribute that will not change over the course of the
execution of a process instance, i.e., it is a case-level attribute.
PM4Py allows us to specify that a column actually describes a case-level attribute
(under the assumption that the attribute does not change during the execution of a
process).
However, for this, we need to specify an additional parameter, i.e., the CASE_ATTRIBUTE_PREFIX
parameter, with default value 'case:'.
The example code on the right shows how to convert the previously exemplified csv
data file.
After loading the csv file of the example table, we rename the clientID
column to case:clientID (this is a specific operation provided by
pandas!).
Then, we specify that the column identifying the case identifier attribute is the
column with name 'case'.
Note that the full parameter path is log_converter.Variants.TO_EVENT_LOG.value.Parameters.CASE_ID_KEY
import pandas as pd
from pm4py.objects.conversion.log import converter as log_converter
log_csv = pd.read_csv('<path_to_csv_file.csv>', sep=',')
log_csv.rename(columns={'clientID': 'case:clientID'}, inplace=True)
parameters = {log_converter.Variants.TO_EVENT_LOG.value.Parameters.CASE_ID_KEY: 'case'}
event_log = log_converter.apply(log_csv, parameters=parameters, variant=log_converter.Variants.TO_EVENT_LOG)
In case we would like to use a different prefix for the case-level attributes, e.g.,
'caseAttr',
we can do so by mapping the CASE_ATTRIBUTE_PREFIX (full path: log_converter.Variants.TO_EVENT_LOG.value.Parameters.CASE_ATTRIBUTE_PREFIX)
to the value 'caseAttr'.
Note that in the call to the converter, in this case, we explicitly set the variant
to be used, e.g., log_converter.Variants.TO_EVENT_LOG.
Finally, note that any type of data format that can be parsed to a Pandas
DataFrame is supported by PM4Py.
Converting Event Data
In this section, we describe how to convert event log objects from one object type
to another object type.
As mentioned in the previous section, the conversion functionality of event logs is
located in pm4py.objects.conversion.log.converter.
There are three objects, which we are able to 'switch' between, i.e., Event Log,
Event Stream and Data Frame objects.
Please refer to the previous code snippet for an example of applying log conversion
(applied when importing a CSV object).
Finally, note that most algorithms internally use the converters, in order to be
able to handle an input event data object of any form.
In such a case, the default parameters are used.
TO_EVENT_LOG:
- STREAM_POST_PROCESSING (boolean, default False): Removes events that have no type information.
- CASE_ATTRIBUTE_PREFIX (string, default 'case:'): Any attribute (column, in case of a DataFrame) with the prefix 'case:' is stored as a trace attribute.
- CASE_ID_KEY (string, default 'case:concept:name'): Attribute (column, in case of a DataFrame) that is used to define traces.
- DEEP_COPY (boolean, default False): If set to True, objects are created using a deep copy (if applicable). Avoids side-effects (specifically when converting an Event Stream to an Event Log).

TO_EVENT_STREAM:
- STREAM_POST_PROCESSING (boolean, default False): Same as TO_EVENT_LOG.
- CASE_ATTRIBUTE_PREFIX (string, default 'case:'): Any trace attribute (in case of converting an Event Log to an Event Stream object) gets this prefix. Not applicable if we translate a DataFrame to an Event Stream object.
- DEEP_COPY (boolean, default False): Same as TO_EVENT_LOG.

TO_DATA_FRAME:
- CASE_ATTRIBUTE_PREFIX (string, default 'case:'): Same as TO_EVENT_STREAM; only applied if the input is an Event Log object, which is first translated to an Event Stream object.
- DEEP_COPY (boolean, default False): Same as TO_EVENT_STREAM.
Exporting IEEE XES files
Exporting an Event Log object to an IEEE XES file is fairly straightforward in PM4Py.
Consider the example code fragment on the right, which depicts this
functionality.
from pm4py.objects.log.exporter.xes import exporter as xes_exporter
xes_exporter.apply(log, '<path_to_exported_log.xes>')
In the example, the log object is assumed to be an Event Log object.
The exporter also accepts an Event Stream or DataFrame object as an input.
However, the exporter will first convert the given input object into an Event Log.
Hence, in this case, standard parameters for the conversion are used.
Thus, if the user wants more control, it is advisable to apply the conversion to
Event Log, prior to exporting.
ETree (ETREE):
- COMPRESS (boolean, default False): If True, the log is stored as a 'xes.gz' file.
Exporting logs to CSV
To export an event log to a csv-file, PM4Py uses Pandas.
Hence, an event log is first converted to a Pandas Data Frame, after which it is
written to disk.
import pandas as pd
from pm4py.objects.conversion.log import converter as log_converter
dataframe = log_converter.apply(log, variant=log_converter.Variants.TO_DATA_FRAME)
dataframe.to_csv('<path_to_csv_file.csv>')
In case an event log object is provided that is not a dataframe, i.e., an Event Log
or Event Stream, the conversion is applied, using the default parameter values,
i.e., as presented in the Converting
Event Data section.
Note that exporting event data to a csv file has no parameters.
In case more control over the conversion is needed, please apply a conversion to
dataframe first, prior to exporting to csv.
I/O with Other File Types
At this moment, I/O of any format supported by Pandas (dataframes) is implicitly
supported.
As long as data can be loaded into a Pandas dataframe, PM4Py is reasonably able to work
with such files.
Generic Event Data Manipulation
Since Event Logs and Event Streams are iterables (note: this does not apply for
dataframes), they are applicable to be used in combination with lambda
functions.
However, as they contain more information (such as log-level attributes), directly
applying, e.g., a filter, does not work.
Therefore, a utility package is available that wraps around filtering/maps/sorting
in order to combine this functionality with Event Logs.
The code is located in pm4py.objects.log.util.func.
Consider the code fragment on the right, which first imports an event log and then
filters out each trace with a length
shorter than three.
The func.filter_ function mimics the built-in Python function filter().
However, it returns the filtered list of traces, included in an Event Log (or Event
Stream) object.
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.objects.log.util import func
log = xes_importer.apply('<path_to_imported_log.xes>')
log = func.filter_(lambda t: len(t) > 2, log)
Apart from the filter_-function, the
pm4py.objects.log.util.func package provides a map_ and a
sort_ function.
Filtering Event Data
PM4Py also has various specific methods to filter an event log.
Since PM4Py 1.5.x, we offer a simplified interface to filter logs.
This provides a restricted set of choices in comparison to the normal interface.
Filter a log object on the values of some attribute.
Parameters:
- log: Log object
- attribute: Attribute to filter
- values: List containing the admitted (or forbidden) values
- how: Specifies how the filter should be applied ('cases' keeps the cases where at least one occurrence happens, 'events' keeps the matching events, eventually trimming the cases)
- positive: Boolean specifying whether the values should be kept or removed

Filter cases having an end activity in the provided list.

Filter a log object on a time interval.
Parameters:
- log: Log object
- dt1: Left extreme of the interval
- dt2: Right extreme of the interval
- how: Modality of filtering (events, traces_contained, traces_intersecting)

Filter a log object on the paths between activities.
Parameters:
- log: Log object
- allowed_paths: List of allowed/forbidden paths (list of tuples containing two activities)
- positive: Parameter that says whether the paths should be kept or removed

Get the start activities from the log.
Returns:
Dictionary of start activities along with their count.

variants = pm4py.get_variants(log)
Gets the variants from the log.
Parameters:
- log: Log object
Returns:
Dictionary of variants along with their count.
Filtering on timeframe
In the following paragraph, various methods regarding filtering with time
frames are present. For each of the methods, the log and Pandas
Dataframe methods are revealed.
One might be interested in only keeping the traces that are contained in
a specific interval, e.g. 09 March 2011 and 18 January 2012. The first code snippet
works for a log object, the second one for a dataframe object.
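A possible sketch of both snippets, using the timestamp filter module (the dates follow the example above; an existing log and dataframe are assumed, with the default PM4Py attribute/column names):
from pm4py.algo.filtering.log.timestamp import timestamp_filter
filtered_log = timestamp_filter.filter_traces_contained(log, "2011-03-09 00:00:00", "2012-01-18 23:59:59")

from pm4py.algo.filtering.pandas.timestamp import timestamp_filter as df_timestamp_filter
df_timest_contained = df_timestamp_filter.filter_traces_contained(dataframe, "2011-03-09 00:00:00", "2012-01-18 23:59:59")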
However, it is also possible to keep the traces that are intersecting with a
time interval. The first example is again for log objects, the second one for
dataframe objects.
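Under the same assumptions, the intersecting variant could look as follows (filter_traces_intersecting instead of filter_traces_contained):
from pm4py.algo.filtering.log.timestamp import timestamp_filter
filtered_log = timestamp_filter.filter_traces_intersecting(log, "2011-03-09 00:00:00", "2012-01-18 23:59:59")

from pm4py.algo.filtering.pandas.timestamp import timestamp_filter as df_timestamp_filter
df_timest_intersecting = df_timestamp_filter.filter_traces_intersecting(dataframe, "2011-03-09 00:00:00", "2012-01-18 23:59:59")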
Until now, only trace-based techniques have been discussed. However,
there is also a method to keep the events that are contained in a specific
timeframe. As previously mentioned, the first code snippet shows
how to apply this technique on log objects, while the second snippet
shows how to apply it on dataframe objects.
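A sketch of the event-level filter (apply_events trims the traces, keeping only the events in the range; same assumptions as before):
from pm4py.algo.filtering.log.timestamp import timestamp_filter
filtered_log_events = timestamp_filter.apply_events(log, "2011-03-09 00:00:00", "2012-01-18 23:59:59")

from pm4py.algo.filtering.pandas.timestamp import timestamp_filter as df_timestamp_filter
df_timest_events = df_timestamp_filter.apply_events(dataframe, "2011-03-09 00:00:00", "2012-01-18 23:59:59")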
This filter keeps only the traces whose duration lies inside a specified
interval. In the examples, traces between 1 and 10 days are kept.
Note that the time parameters are given in seconds. The first code snippet applies
this technique on a log object, the second one on a dataframe object.
from pm4py.algo.filtering.log.cases import case_filter
filtered_log = case_filter.filter_case_performance(log, 86400, 864000)
In general, PM4Py offers two methods to filter a log or a dataframe on start activities. In
the first method, a list of start activities has to be specified, and the filter is applied
on the activities contained in that list. In the second method, a decreasing factor
is used. An explanation is given below.
Suppose the following start activities and their respective numbers of occurrences:

Activity  Number of occurrences
A         1000
B         700
C         300
D         50
Assume DECREASING_FACTOR to be 0.6. The most frequent start activity is
kept, A in this
case. Then, the number of
occurrences of the next frequent activity is divided by the number of
occurrences of this activity. Therefore, the computation is 700/1000=0.7. Since
0.7>0.6, B is kept as admissible start activity. In the next step, the number of
occurrences of activity C and B are compared. In this case 300/700≈0.43.
Since 0.43<0.6, C is not accepted as admissible start activity and the method
stops here.
First of all, it might be necessary to know the start activities. Therefore, code
snippets are provided. Subsequently, an example of filtering is provided. The first
snippet works with a log object, the second one with a dataframe.
log_start is a dictionary that contains as key the activity and as
value the number of occurrences.
from pm4py.algo.filtering.log.start_activities import start_activities_filter
log_start = start_activities_filter.get_start_activities(log)
filtered_log = start_activities_filter.apply(log, ["S1"]) #suppose "S1" is the start activity you want to filter on
from pm4py.algo.filtering.pandas.start_activities import start_activities_filter
log_start = start_activities_filter.get_start_activities(dataframe)
df_start_activities = start_activities_filter.apply(dataframe, ["S1"],
parameters={start_activities_filter.Parameters.CASE_ID_KEY: "case:concept:name",
start_activities_filter.Parameters.ACTIVITY_KEY: "concept:name"}) #suppose "S1" is the start activity you want to filter on
As mentioned earlier, there is also a method that aims to keep the frequent start
activities. Again, the first snippet is about a log object, the second is about a
dataframe object. The default value for DECREASING_FACTOR is 0.6.
from pm4py.algo.filtering.log.start_activities import start_activities_filter
log_af_sa = start_activities_filter.apply_auto_filter(log, parameters={start_activities_filter.Parameters.DECREASING_FACTOR: 0.6})
from pm4py.algo.filtering.pandas.start_activities import start_activities_filter
df_auto_sa = start_activities_filter.apply_auto_filter(dataframe, parameters={start_activities_filter.Parameters.DECREASING_FACTOR: 0.6})
Filter on end activities
In general, PM4Py offers two methods to filter a log or a dataframe on end activities. In
the first method, a list of end activities has to be specified, and the filter is applied
on the activities contained in that list. In the second method, a decreasing factor
is used. An explanation can be found in the start activity section.
This filter permits to keep only traces with an end activity among a set of specified
activities. First of all, it might be necessary to know the end activities.
Therefore, code snippets are provided. Subsequently, an example of
filtering is provided. Here, for the dataframe filtering, a further attribute
specification is possible: case:concept:name is in this case the column
of the dataframe that is the Case ID, concept:name is the column of the
dataframe that is the activity.
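A sketch of these steps, mirroring the start-activities code above (the end activity "pay compensation" is only an example and depends on the log at hand):
from pm4py.algo.filtering.log.end_activities import end_activities_filter
end_activities = end_activities_filter.get_end_activities(log)
filtered_log = end_activities_filter.apply(log, ["pay compensation"])

from pm4py.algo.filtering.pandas.end_activities import end_activities_filter as df_end_activities_filter
end_activities_df = df_end_activities_filter.get_end_activities(dataframe)
filtered_df = df_end_activities_filter.apply(dataframe, ["pay compensation"],
    parameters={df_end_activities_filter.Parameters.CASE_ID_KEY: "case:concept:name",
                df_end_activities_filter.Parameters.ACTIVITY_KEY: "concept:name"})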
A variant is a set of cases that share the same control-flow perspective, so a set of cases
that share the same classified events (activities) in the same order. In this section, we
will focus for all methods first on log objects, then we will continue with the
dataframe.
To get the list of variants contained in a given log, the following code can be
used. The first snippet is for a log object, the second for a dataframe. The result is
expressed as a dictionary having as key the variant and as value the list of cases that
share the variant.
from pm4py.algo.filtering.log.variants import variants_filter
variants = variants_filter.get_variants(log)
If the number of occurrences of the variants is of interest, the following code
retrieves a list of variants along with their count (i.e., a dictionary whose key is
the variant and whose value is the number of occurrences).
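One way to obtain this is the simplified-interface function pm4py.get_variants mentioned earlier in this section (which, per that description, returns the variants along with their count):
import pm4py

# dictionary: variant -> count (in some PM4Py versions: variant -> list of cases)
variants_count = pm4py.get_variants(log)
print(variants_count)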
To filter based on variants, assume that variants is a list, whereby
each element is a variant (expressed in the same way as in the variants retrieval
method).
The first method can be applied on log objects, the
second can be applied on dataframe objects. Note that the variants given in variants
are kept.
from pm4py.algo.filtering.log.variants import variants_filter
filtered_log1 = variants_filter.apply(log, variants)
A filter to keep automatically the most common variants could be applied through the
apply_auto_filter method. This method accepts a parameter called
DECREASING_FACTOR
(default value is 0.6; further details are provided in the start activities
filter).
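A minimal sketch of such a call, assuming the variants filter exposes apply_auto_filter and a Parameters.DECREASING_FACTOR entry analogous to the start-activities filter:
from pm4py.algo.filtering.log.variants import variants_filter
auto_filtered_log = variants_filter.apply_auto_filter(log,
    parameters={variants_filter.Parameters.DECREASING_FACTOR: 0.6})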
On the event log objects, a filter on the variants percentage can be applied as shown in the
following example.
The percentage of variants to keep must be specified in the percentage parameter as a
number between
0 (only the most frequent variant is kept) and 1 (all the variants are kept).
from pm4py.algo.filtering.log.variants import variants_filter
filtered_log = variants_filter.filter_log_variants_percentage(log, percentage=0.5)
To be more explicit, the variants filter on percentage works as follows, given a percentage P:
The variants of the log are found along with their number of occurrences.
A number N is chosen such that, if we take all the variants with at least N occurrences, we
include a percentage of cases that is at least P, while if we chose N+1 we would include a
percentage of cases that is below P.
As an example, consider a log with 20 cases. With percentage=1, all the 20 cases would be kept.
If we choose percentage=0.1 and N=1, then we include all the cases, while choosing N=2 we
would include only the cases of the first variant (that are 5% of the log); hence N=2 is not
valid according to the above principle.
If we choose percentage=0.05 and N=2, then we include exactly 5% of the cases of the log,
which is the minimum requirement.
Filter on attributes values
Filtering on attribute values permits, alternatively, to:
Keep cases that contain at least one event with one of the given attribute values
Remove cases that contain an event with one of the given attribute values
Keep events (trimming traces) that have one of the given attribute values
Remove events (trimming traces) that have one of the given attribute values
Example of attributes are the resource (generally contained in org:resource attribute) and
the activity (generally contained in concept:name attribute). As noted before, the first
method can be applied on log objects, the second on dataframe objects.
To get the list of resources and activities contained in the log, the following code
could be used.
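One possible sketch, using the attributes filter module (org:resource and concept:name are the standard XES keys mentioned above; an existing log and dataframe are assumed):
from pm4py.algo.filtering.log.attributes import attributes_filter
activities = attributes_filter.get_attribute_values(log, "concept:name")
resources = attributes_filter.get_attribute_values(log, "org:resource")

from pm4py.algo.filtering.pandas.attributes import attributes_filter as df_attributes_filter
activities_df = df_attributes_filter.get_attribute_values(dataframe, "concept:name")
resources_df = df_attributes_filter.get_attribute_values(dataframe, "org:resource")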
To automatically apply a filter on event attributes (trimming traces and keeping
only events containing the attribute with a frequent value), the apply_auto_filter
method is provided. The method accepts as parameters the attribute name and the
DECREASING_FACTOR (default 0.6; an explanation can be found in the start activities
filter).
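A sketch of that invocation, assuming the attributes filter exposes apply_auto_filter with the parameter keys shown (the attribute key here is the default activity attribute):
from pm4py.algo.filtering.log.attributes import attributes_filter
auto_filtered_log = attributes_filter.apply_auto_filter(log,
    parameters={attributes_filter.Parameters.ATTRIBUTE_KEY: "concept:name",
                attributes_filter.Parameters.DECREASING_FACTOR: 0.6})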
Filtering on numeric attribute values provides options that are similar to filtering on string
attribute values (which we already considered).
First, we import the log. Subsequently, we want to keep only the events satisfying
an amount comprised between 34 and 36. An additional filter aims to keep only
cases with at least one event satisfying the specified amount. The filter on cases
provides the option to specify up to two attributes that are checked on the events
that shall satisfy the numeric range. For example, if we are interested in cases
having an event with activity Add penalty that has an amount between 34 and 500, a
code snippet is also provided.
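A sketch of these numeric filters (the attribute key "amount" and the activity "Add penalty" follow the example in the text; the STREAM_FILTER_* keys are assumed to restrict which events are checked against the range):
from pm4py.algo.filtering.log.attributes import attributes_filter

# keep only the events with an amount between 34 and 36 (trims the traces)
filtered_log_events = attributes_filter.apply_numeric_events(log, 34, 36,
    parameters={attributes_filter.Parameters.ATTRIBUTE_KEY: "amount"})

# keep only the cases with at least one event whose amount is between 34 and 36
filtered_log_cases = attributes_filter.apply_numeric(log, 34, 36,
    parameters={attributes_filter.Parameters.ATTRIBUTE_KEY: "amount"})

# cases having an event with activity "Add penalty" and an amount between 34 and 500
filtered_log_cases2 = attributes_filter.apply_numeric(log, 34, 500,
    parameters={attributes_filter.Parameters.ATTRIBUTE_KEY: "amount",
                attributes_filter.Parameters.STREAM_FILTER_KEY1: "concept:name",
                attributes_filter.Parameters.STREAM_FILTER_VALUE1: "Add penalty"})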
Since PM4Py 1.5.x, we offer a simplified interface for process discovery.
This provides a restricted set of choices in comparison to the normal interface.
Moreover, we offer a simplified interface to visualize and save the visualizations of the
process models.
Visualizing a DFG. Parameters:
- dfg: DFG object
- start_activities: Start activities
- end_activities: End activities
- format: Format of the visualization (default: png)
- log: Log object (if provided, it is used to decorate the frequency of activities)
Saving a DFG visualization. Parameters:
- dfg: DFG object
- start_activities: Start activities
- end_activities: End activities
- file_path: Destination path
- log: Log object (if provided, it is used to decorate the frequency of activities)
Process Discovery algorithms aim to find a suitable process model that describes the
order of events/activities that are executed during a process execution.
In the following, we provide an overview of the advantages and disadvantages of
the mining algorithms.
Alpha:
- Cannot handle loops of length one and length two
- Invisible and duplicated tasks cannot be discovered
- Discovered model might not be sound
- Weak against noise

Alpha+:
- Can handle loops of length one and length two
- Invisible and duplicated tasks cannot be discovered
- Discovered model might not be sound
- Weak against noise

Heuristic:
- Takes frequency into account
- Detects short loops
- Does not guarantee a sound model

Inductive:
- Can handle invisible tasks
- Model is sound
- Most used process mining algorithm
Alpha Miner
The alpha miner is one of the best-known Process Discovery algorithms and is able to find:
A Petri net model where all the transitions are visible and unique and correspond to
classified events (for example, to activities).
An initial marking that describes the status of the Petri net model when an execution
starts.
A final marking that describes the status of the Petri net model when an execution
ends.
We provide an example where a log is read, the Alpha algorithm is applied and the Petri net
along with the initial and the final marking are found. The log we take as input is the
running-example.xes.
First, the log has to be imported.
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests","input_data","running-example.xes"))
Subsequently, the Alpha Miner is applied.
from pm4py.algo.discovery.alpha import algorithm as alpha_miner
net, initial_marking, final_marking = alpha_miner.apply(log)
Inductive Miner
In PM4Py, we offer an implementation of the inductive miner (IM), of the inductive miner
infrequent (IMf), and of the inductive miner directly-follows (IMd) algorithm.
The basic idea of
Inductive Miner is about detecting a 'cut' in the log (e.g. sequential cut, parallel cut,
concurrent cut and loop cut) and then recur on sublogs, which were found applying the cut,
until a base case is found. The Directly-Follows variant avoids the recursion on the sublogs
but uses the Directly Follows graph.
Inductive miner models usually make extensive use of hidden transitions, especially for
skipping/looping over a portion of the model. Furthermore, each visible transition has a
unique label (there are no transitions in the model that share the same label).
Two process models can be derived: Petri Net and Process Tree.
To mine a Petri Net, we provide an example. A log is read, the inductive miner is applied
and the
Petri net along with the initial and the final marking are found. The log we take as
input is the running-example.xes.
First, the log is read, then the inductive miner algorithm is applied.
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
log = xes_importer.apply(os.path.join("tests","input_data","running-example.xes"))
net, initial_marking, final_marking = inductive_miner.apply(log)
To obtain a process tree, the provided code snippet can be used. The last two lines
of code are responsible for the visualization of the process tree.
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
from pm4py.visualization.process_tree import visualizer as pt_visualizer
tree = inductive_miner.apply_tree(log)
gviz = pt_visualizer.apply(tree)
pt_visualizer.view(gviz)
It is also possible to convert a process tree into a petri net.
from pm4py.objects.conversion.process_tree import converter as pt_converter
net, initial_marking, final_marking = pt_converter.apply(tree, variant=pt_converter.Variants.TO_PETRI_NET)
As mentioned in the introduction of this approach, three variants are available in
PM4Py:
Variants.IM: Produces a model with perfect replay fitness.
Variants.IMf: Produces a more precise model, without fitness guarantees, by eliminating some behavior.
Variants.IMd: A variant of the inductive miner that considers only the directly-follows graph, for maximum performance. However, replay fitness guarantees are lost.
The parameters for these variants are:
Variants.IM - Parameters.ACTIVITY_KEY: the name of the attribute to be used as activity for process discovery.
Variants.IMf - Parameters.ACTIVITY_KEY: the name of the attribute to be used as activity for process discovery.
Variants.IMf - Parameters.NOISE_THRESHOLD: the noise threshold (between 0.0 and 1.0) to be used. Default: 0.2.
Variants.IMd - Parameters.ACTIVITY_KEY: the name of the attribute to be used as activity for process discovery.
Heuristic Miner
Heuristics Miner is an algorithm that acts on the Directly-Follows Graph, providing a way to
handle noise and to find common constructs (dependency between two activities, AND).
The output of the Heuristics Miner is a Heuristics Net, i.e., an object that contains the
activities and the relationships between them. The Heuristics Net can then be converted into
a Petri net.
It is possible to obtain a Heuristic Net and a Petri Net.
To apply the Heuristics Miner to discover a Heuristics Net, it is necessary to
import a log. Then, a Heuristics Net can be found. The possible
parameters are listed below.
from pm4py.objects.log.importer.xes import importer as xes_importer
import os
log_path = os.path.join("tests", "compressed_input_data", "09_a32f0n00.xes.gz")
log = xes_importer.apply(log_path)
from pm4py.algo.discovery.heuristics import algorithm as heuristics_miner
heu_net = heuristics_miner.apply_heu(log, parameters={heuristics_miner.Variants.CLASSIC.value.Parameters.DEPENDENCY_THRESH: 0.99})
DEPENDENCY_THRESH: dependency threshold of the Heuristics Miner (default: 0.5)
AND_MEASURE_THRESH: AND measure threshold of the Heuristics Miner (default: 0.65)
MIN_ACT_COUNT: minimum number of occurrences of an activity to be considered (default: 1)
MIN_DFG_OCCURRENCES: minimum number of occurrences of an edge to be considered (default: 1)
DFG_PRE_CLEANING_NOISE_THRESH: cleaning threshold of the DFG, in order to remove weaker edges (default: 0.05)
LOOP_LENGTH_TWO_THRESH: threshold for the loops of length 2
To visualize the Heuristic Net, code is also provided on the right-hand side.
from pm4py.visualization.heuristics_net import visualizer as hn_visualizer
gviz = hn_visualizer.apply(heu_net)
hn_visualizer.view(gviz)
To obtain a Petri Net that is based on the Heuristics Miner, the code on the right
hand side can be used. Also this Petri Net can be visualized.
from pm4py.algo.discovery.heuristics import algorithm as heuristics_miner
net, im, fm = heuristics_miner.apply(log, parameters={heuristics_miner.Variants.CLASSIC.value.Parameters.DEPENDENCY_THRESH: 0.99})
from pm4py.visualization.petrinet import visualizer as pn_visualizer
gviz = pn_visualizer.apply(net, im, fm)
pn_visualizer.view(gviz)
Directly-Follows Graph
Process models modeled using Petri nets have well-defined semantics: a process execution
starts from the places included in the initial marking and finishes at the places included
in the final marking. In this section, another class of process models, Directly-Follows
Graphs, are introduced. Directly-Follows graphs are graphs where the nodes represent the
events/activities in the log and directed edges are present between nodes if there is at
least a trace in the log where the source event/activity is followed by the target
event/activity. On top of these directed edges, it is easy to represent metrics like
frequency (counting the number of times the source event/activity is followed by the target
event/activity) and performance (some aggregation, for example the mean, of the time
elapsed between the two events/activities).
First, we have to import the log. Subsequently, we can extract the Directly-Follows
Graph. In addition, code is provided to visualize the Directly-Follows
Graph. This visualization is a colored visualization of the Directly-Follows graph
that is
decorated with the frequency of activities.
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests","input_data","running-example.xes"))
from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
dfg = dfg_discovery.apply(log)
from pm4py.visualization.dfg import visualizer as dfg_visualization
gviz = dfg_visualization.apply(dfg, log=log, variant=dfg_visualization.Variants.FREQUENCY)
dfg_visualization.view(gviz)
To get a Directly-Follows graph decorated with the performance between the edges, two
parameters of the previous code have to be replaced.
from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
from pm4py.visualization.dfg import visualizer as dfg_visualization
dfg = dfg_discovery.apply(log, variant=dfg_discovery.Variants.PERFORMANCE)
gviz = dfg_visualization.apply(dfg, log=log, variant=dfg_visualization.Variants.PERFORMANCE)
dfg_visualization.view(gviz)
To save the obtained DFG, for instance in the SVG format, code is also provided on
the right-hand side.
from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
from pm4py.visualization.dfg import visualizer as dfg_visualization
dfg = dfg_discovery.apply(log, variant=dfg_discovery.Variants.PERFORMANCE)
parameters = {dfg_visualization.Variants.PERFORMANCE.value.Parameters.FORMAT: "svg"}
gviz = dfg_visualization.apply(dfg, log=log, variant=dfg_visualization.Variants.PERFORMANCE, parameters=parameters)
dfg_visualization.save(gviz, "dfg.svg")
Adding information about Frequency/Performance
Similar to the Directly-Follows graph, it is also possible to decorate the Petri net with
frequency or performance information. This is done by using a replay technique on the model
and then assigning frequency/performance to the paths. The variant parameter of the visualizer
specifies which annotation should be used. The values for the variant parameter are the
following:
pn_visualizer.Variants.WO_DECORATION: This is the default value and indicates that the Petri
net is not
decorated.
pn_visualizer.Variants.FREQUENCY: This indicates that the model should be decorated
according to frequency
information obtained by applying replay.
pn_visualizer.Variants.PERFORMANCE: This indicates that the model should be decorated
according to performance
(aggregated by mean) information obtained by applying replay.
In case the frequency or performance decoration is chosen, it is required to pass the
log as a parameter of the visualization (it needs to be replayed).
The code on the right-hand side can be used to obtain the Petri net mined by the
Inductive Miner decorated with frequency information.
from pm4py.visualization.petrinet import visualizer as pn_visualizer
parameters = {pn_visualizer.Variants.FREQUENCY.value.Parameters.FORMAT: "png"}
gviz = pn_visualizer.apply(net, initial_marking, final_marking, parameters=parameters, variant=pn_visualizer.Variants.FREQUENCY, log=log)
pn_visualizer.save(gviz, "inductive_frequency.png")
Classifier
Algorithms implemented in pm4py assume that events are classified based on their activity name, which
is usually reported inside the concept:name event attribute. In some contexts, it is useful
to use another event attribute as activity:
Importing an event log from a CSV does not guarantee the presence of a concept:name event attribute
Multiple events in a case may refer to different lifecycles of the same activity
The example on the right-hand side shows the specification of an activity key for the
Alpha Miner algorithm.
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.algo.discovery.alpha import algorithm as alpha_miner
log = xes_importer.apply(os.path.join("tests","input_data","running-example.xes"))
parameters = {alpha_miner.Variants.ALPHA_CLASSIC.value.Parameters.ACTIVITY_KEY: "concept:name"}
net, initial_marking, final_marking = alpha_miner.apply(log, parameters=parameters)
For logs imported from XES format, a list of fields that could be used in order to classify
events and apply Process Mining algorithms is usually reported in the classifiers
section.
The Standard classifier usually includes the activity name (the concept:name
attribute) and
the lifecycle (the lifecycle:transition attribute); the Event name classifier
includes only
the activity name.
In PM4Py, it is assumed that algorithms work on a single activity key. In order to use
multiple fields, a new attribute should be inserted for each event as the concatenation of
the two.
In the following, retrieval and insertion of a corresponding attribute regarding classifiers
are discussed.
The example on the right-hand side demonstrates the retrieval of the classifiers
inside a log file, using the receipt.xes log. The print command returns a
dictionary, whereby the corresponding classifier attribute is revealed.
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests","input_data","receipt.xes"))
print(log.classifiers)
To use the classifier Activity classifier and write a new attribute for each event in
the log, the following code can be used.
from pm4py.objects.log.util import insert_classifier
log, activity_key = insert_classifier.insert_activity_classifier_attribute(log, "Activity classifier")
Then, as before, the Alpha Miner can be applied on the log specifying the newly
inserted activity key.
from pm4py.algo.discovery.alpha import algorithm as alpha_miner
parameters = {alpha_miner.Variants.ALPHA_CLASSIC.value.Parameters.ACTIVITY_KEY: activity_key}
net, initial_marking, final_marking = alpha_miner.apply(log, parameters=parameters)
In the following, a technique is shown to insert a new attribute manually.
In the case, the XES specifies no classifiers, and a different field should be used
as activity key, there is the option to specify it manually. For example, in this
piece of code we read the receipt.xes log and create a new attribute called
customClassifier that is the activity name plus the transition. Subsequently, the
Alpha Miner can be applied on this new classifier.
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests","input_data","receipt.xes"))
for trace in log:
    for event in trace:
        event["customClassifier"] = event["concept:name"] + event["lifecycle:transition"]
from pm4py.algo.discovery.alpha import algorithm as alpha_miner
parameters = {alpha_miner.Variants.ALPHA_CLASSIC.value.Parameters.ACTIVITY_KEY: "customClassifier"}
net, initial_marking, final_marking = alpha_miner.apply(log, parameters=parameters)
Correlation Miner
In Process Mining, we are used to logs containing at least:
A case identifier
An activity
A timestamp
The case identifier associates an event, happening in a system, to a particular execution of the
process. This makes it possible to apply algorithms such as process discovery, conformance checking, etc.
However, in some systems (for example, the data collected from IoT systems), it may be difficult
to associate a case identifier. On top of such logs, performing classic process mining is
impossible. Correlation mining was born as a response to the challenge of extracting a process model
from event logs without a case identifier, i.e., logs that contain only:
An activity column
A timestamp column
In this description, we assume there is a total order on events (that means that no two events happen
at the same timestamp). Situations where a total order is not defined are more complicated.
The Correlation Miner is an approach proposed in:
Pourmirza, Shaya, Remco Dijkman, and Paul Grefen. “Correlation miner: mining business process
models and event correlations without case identifiers.” International Journal of Cooperative
Information Systems 26.02 (2017): 1742002.
The approach aims to solve this problem by solving an (integer) linear problem defined on top of:
The P/S matrix: expressing the relationship of order between the activities as recorded in
the log.
The Duration matrix: expressing an aggregation of the duration between two activities,
obtained by solving an optimization problem
The solution of this problem provides a set of couples of activities that are, according to the
approach, in directly-follows relationship, along with the strength of the relationship. This is
the “frequency” DFG.
A “performance” DFG can be obtained by the duration matrix, keeping only the entries that appear
in the solution of the problem (i.e., the couples of activities that appear in the “frequency”
DFG).
This can be then visualized (using for example the PM4Py DFG visualization).
To have a “realistic” example (for which we know the “real” DFG), we can take an existing log,
simply remove the case ID column, and then try to reconstruct the DFG without it.
Let’s try an example of that. First, we load a CSV file into a Pandas dataframe, keeping
only the concept:name and the time:timestamp columns:
import os
import pandas as pd
from pm4py.objects.log.util import dataframe_utils
df = pd.read_csv(os.path.join("tests", "input_data", "receipt.csv"))
df = dataframe_utils.convert_timestamp_columns_in_df(df)
df = df[["concept:name", "time:timestamp"]]
Then, we can apply the Correlation Miner approach:
from pm4py.algo.discovery.correlation_mining import algorithm as correlation_miner
frequency_dfg, performance_dfg = correlation_miner.apply(df, parameters={correlation_miner.Variants.CLASSIC.value.Parameters.ACTIVITY_KEY: "concept:name",
correlation_miner.Variants.CLASSIC.value.Parameters.TIMESTAMP_KEY: "time:timestamp"})
To better visualize the DFG, we can retrieve the frequency of activities.
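A sketch of that step, computing the activity frequencies with pandas and passing them to the DFG visualizer (assuming the visualizer accepts an activities_count argument, as in the official correlation miner example):
from pm4py.visualization.dfg import visualizer as dfg_visualization

# frequency of each activity, taken directly from the dataframe column
activities_freq = dict(df["concept:name"].value_counts())
gviz_freq = dfg_visualization.apply(frequency_dfg, activities_count=activities_freq,
                                    variant=dfg_visualization.Variants.FREQUENCY)
dfg_visualization.view(gviz_freq)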
Visualizing the DFGs, we can see that the correlation miner was able to discover a visualization
where the main path is clear.
Different variants of the correlation miner are available:
Variants.CLASSIC: Calculates the P/S matrix and the duration matrix in the classic way (the entire list of events is used).
Variants.TRACE_BASED: Calculates the P/S matrix and the duration matrix on a classic event log, trace-by-trace, and merges the results. The resolution of the linear problem makes it possible to obtain a model that is more understandable than the classic DFG calculated on top of the log.
Variants.CLASSIC_SPLIT: Calculates the P/S matrix and the duration matrix on the entire list of events, as in the classic version, but splits them into chunks to speed up the computation. Hence, the generated model is less accurate (in comparison to the CLASSIC version) but the calculation is faster. The default chunk size is 100000 events.
Temporal Profile
We propose in PM4Py an implementation of the temporal profile model. This has been described in:
Stertz, Florian, Jürgen Mangler, and Stefanie Rinderle-Ma. "Temporal Conformance Checking at Runtime based on Time-infused Process Models." arXiv preprint arXiv:2008.07262 (2020).
A temporal profile measures, for every couple of activities in the log, the average time and the standard deviation between events having the
provided activities. The time is measured between the completion of the first event and the start of the second event. Hence, it is assumed to work with an interval log
where the events have two timestamps. The output of the temporal profile discovery is a dictionary where each couple of activities (expressed as a tuple)
is associated to a couple of numbers: the first is the average and the second is the standard deviation.
We provide an example of discovery for the temporal profile.
We can load an event log, and apply the discovery algorithm.
import pm4py
from pm4py.algo.discovery.temporal_profile import algorithm as temporal_profile_discovery
log = pm4py.read_xes("tests/input_data/running-example.xes")
parameters = {}
temporal_profile = temporal_profile_discovery.apply(log, parameters=parameters)
Some parameters can be used in order to customize the execution of the temporal profile:
Parameters.ACTIVITY_KEY (string, default concept:name): The attribute to use as activity.
Parameters.START_TIMESTAMP_KEY (string, default start_timestamp): The attribute to use as start timestamp.
Parameters.TIMESTAMP_KEY (string, default time:timestamp): The attribute to use as (completion) timestamp.
Petri Net management
Petri nets are one of the most common formalisms to express a process model. A Petri net
is a directed bipartite graph, in which the nodes represent transitions and places. Arcs
are connecting places to transitions and transitions to places, and have an associated
weight. A transition can fire if each of its input places contains a number of tokens
that is at least equal to the weight of the arc connecting the place to the transition.
When a transition is fired, then tokens are removed from the input places according to
the weight of the input arc, and are added to the output places according to the weight
of the output arc.
A marking is a state in the Petri net that associates each place to a number of tokens
and is uniquely associated to a set of enabled transitions that could be fired according
to the marking.
Process Discovery algorithms implemented in pm4py return a Petri net along with an
initial marking and a final marking. An initial marking is the initial state of
execution of a process; a final marking is a state that should be reached at the end of
the execution of the process.
Importing and exporting
Petri nets, along with their initial and final marking, can be imported/exported from the
PNML file format. The code on the right-hand side can be used to import a Petri net along
with the
initial and final marking.
First, we have to import the Petri net. Subsequently, the Petri net is visualized by using
the Petri Net visualizer. In addition, the Petri net is exported with its initial
marking, or with its initial and final marking.
import os
from pm4py.objects.petri.importer import importer as pnml_importer
net, initial_marking, final_marking = pnml_importer.apply(os.path.join("tests","input_data","running-example.pnml"))
from pm4py.visualization.petrinet import visualizer as pn_visualizer
gviz = pn_visualizer.apply(net, initial_marking, final_marking)
pn_visualizer.view(gviz)
from pm4py.objects.petri.exporter import exporter as pnml_exporter
pnml_exporter.apply(net, initial_marking, "petri.pnml")
pnml_exporter.apply(net, initial_marking, "petri_final.pnml", final_marking=final_marking)
Petri Net properties
This section is about how to get the properties of a Petri net. A property of the net is, for
example, the set of transitions enabled in a particular marking. However, also the lists of places,
transitions, or arcs can be inspected.
The list of transitions enabled in a particular marking can be obtained using the
right-hand code.
from pm4py.objects.petri import semantics
transitions = semantics.enabled_transitions(net, initial_marking)
The function print(transitions) reports that only the transition
register request is
enabled in the initial marking of the given Petri net. To obtain all places,
transitions, and arcs of the Petri net, the following code can be used.
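The collections of places, transitions, and arcs are available directly as attributes of the Petri net object:
places = net.places
transitions = net.transitions
arcs = net.arcs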
Each place has a name and a set of input/output arcs (connected at source/target to a
transition). Each transition has a name and a label and a set of input/output arcs
(connected at source/target to a place). The code below prints, for
each place, the name, and for each input arc of the place the name and the label of
the corresponding transition. However, there also exist trans.name,
trans.label, and arc.target.name.
for place in places:
    print("\nPLACE: " + place.name)
    for arc in place.in_arcs:
        print(arc.source.name, arc.source.label)
Creating a new Petri Net
In this section, an overview of the code necessary to create a new Petri net with places,
transitions, and arcs is provided. A Petri net object in pm4py should be created with a
name.
The code on the right-hand side creates a Petri Net with the name
new_petri_net.
# creating an empty Petri net
from pm4py.objects.petri.petrinet import PetriNet, Marking
net = PetriNet("new_petri_net")
In addition, three places are created, namely source,
sink, and p_1. These places are added to the previously
created Petri Net.
# creating source, p_1 and sink place
source = PetriNet.Place("source")
sink = PetriNet.Place("sink")
p_1 = PetriNet.Place("p_1")
# add the places to the Petri Net
net.places.add(source)
net.places.add(sink)
net.places.add(p_1)
Similar to the places, transitions can be created. However, they need to be assigned
a name and a label.
# Create transitions
t_1 = PetriNet.Transition("name_1", "label_1")
t_2 = PetriNet.Transition("name_2", "label_2")
# Add the transitions to the Petri Net
net.transitions.add(t_1)
net.transitions.add(t_2)
Arcs that connect places with transitions or transitions with places are also
necessary. To add arcs, the code below can be used. The first parameter specifies the
starting point of the arc, the second parameter its target and the last parameter
states the Petri net it belongs to.
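A sketch of adding the arcs via the add_arc_from_to utility, connecting source, t_1, p_1, t_2 and sink created above (the utility module path is assumed for the version described here):
# add arcs (place -> transition and transition -> place)
from pm4py.objects.petri import utils
utils.add_arc_from_to(source, t_1, net)
utils.add_arc_from_to(t_1, p_1, net)
utils.add_arc_from_to(p_1, t_2, net)
utils.add_arc_from_to(t_2, sink, net)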
To complete the Petri net, an initial and possibly a final marking need to be
defined.
To accomplish this, we define the initial marking to contain 1 token in the source
place and the final marking to contain 1 token in the sink place.
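Since Marking (imported above together with PetriNet) behaves like a counter of places, this can be written as:
# initial marking: one token in the source place
initial_marking = Marking()
initial_marking[source] = 1
# final marking: one token in the sink place
final_marking = Marking()
final_marking[sink] = 1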
The resulting Petri net along with the initial and final marking can be exported, or
visualized.
from pm4py.objects.petri.exporter import exporter as pnml_exporter
pnml_exporter.apply(net, initial_marking, "createdPetriNet1.pnml", final_marking=final_marking)
from pm4py.visualization.petrinet import visualizer as pn_visualizer
gviz = pn_visualizer.apply(net, initial_marking, final_marking)
pn_visualizer.view(gviz)
To obtain a specific output format (e.g. svg or png) a format parameter should be
provided to the algorithm. The code snippet explains how to obtain an SVG
representation of the Petri net. The last lines provide an option to save the
visualization of the model.
from pm4py.visualization.petrinet import visualizer as pn_visualizer
parameters = {pn_visualizer.Variants.WO_DECORATION.value.Parameters.FORMAT:"svg"}
gviz = pn_visualizer.apply(net, initial_marking, final_marking, parameters=parameters)
pn_visualizer.view(gviz)
from pm4py.visualization.petrinet import visualizer as pn_visualizer
parameters = {pn_visualizer.Variants.WO_DECORATION.value.Parameters.FORMAT: "svg"}
gviz = pn_visualizer.apply(net, initial_marking, final_marking, parameters=parameters)
pn_visualizer.save(gviz, "alpha.svg")
Maximal Decomposition
The decomposition technique proposed in this section
is useful for conformance checking purposes. Indeed, splitting
the overall model into smaller models can reduce the size of the
state space, hence increasing the performance of the conformance checking operation.
We propose to use the decomposition technique (maximal decomposition of a Petri net) described
in:
Van der Aalst, Wil MP. “Decomposing Petri nets for process mining: A generic approach.”
Distributed and Parallel Databases 31.4 (2013): 471-507.
We can see an example of maximal decomposition on top of the Petri net extracted by
the Alpha Miner on top of the Running Example log.
Let’s first load the running example log and apply the Alpha Miner.
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.algo.discovery.alpha import algorithm as alpha_miner
log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))
net, im, fm = alpha_miner.apply(log)
Then, the decomposition can be found using:
from pm4py.objects.petri.decomposition import decompose
list_nets = decompose(net, im, fm)
If we want to represent each one of the Petri nets, we can use a FOR loop:
from pm4py.visualization.petrinet import visualizer
gviz = []
for index, model in enumerate(list_nets):
    subnet, s_im, s_fm = model
    gviz.append(visualizer.apply(subnet, s_im, s_fm, parameters={visualizer.Variants.WO_DECORATION.value.Parameters.FORMAT: "png"}))
    visualizer.save(gviz[-1], str(index)+".png")
A log that is fit according to the original model is also fit (projecting on the activities of
the net) for these nets. Conversely, any deviation on top of these models represents a deviation
also on the original model.
Reachability Graph
A reachability graph is a transition system that can be constructed on any
Petri net along with an initial marking, and is the graph of all the
markings of the Petri net. These markings are connected by as many edges
as there are transitions connecting the two different markings.
The main goal of the reachability graph is to provide an understanding of the state space
of the Petri net. Usually, Petri nets containing a lot of concurrency have
an extremely large reachability graph, and the very computation of the reachability
graph may be unfeasible for such models.
The calculation of the reachability graph, having the Petri net
and the initial marking, can be done with the
following code:
from pm4py.objects.petri import reachability_graph
ts = reachability_graph.construct_reachability_graph(net, im)
The visualization of the reachability graph is then possible
through the code snippet:
from pm4py.visualization.transition_system import visualizer as ts_visualizer
gviz = ts_visualizer.apply(ts, parameters={ts_visualizer.Variants.VIEW_BASED.value.Parameters.FORMAT: "svg"})
ts_visualizer.view(gviz)
Conformance Checking
Conformance checking is a technique to compare a process model with an event log of the
same process. The goal is to check whether the event log conforms to the model, and vice
versa.
In PM4Py, two fundamental techniques are implemented: token-based replay and alignments.
Since PM4Py 1.5.x, we offer a simplified interface for conformance checking.
This provides a restricted set of choices in comparison to the normal interface.
The simplified methods take as parameters the event log (log), the Petri net (petri_net),
the initial marking (initial_marking) and the final marking (final_marking), and return
a list of replay results, one for each trace of the log.
Token-based replay
Token-based replay matches a trace and a Petri net model, starting from the initial place, in
order to discover which transitions are executed and in which places we have remaining or
missing tokens for the given process instance. Token-based replay is useful for Conformance
Checking: indeed, a trace is fitting according to the model if, during its execution, the
transitions can be fired without the need to insert any missing token. If the reaching of
the final marking is imposed, then a trace is fitting if it reaches the final marking
without any missing or remaining tokens.
For each trace, there are four values which have to be determined: produced
tokens, remaining tokens, missing tokens, and consumed tokens.
Based on that, a formula can be derived, whereby a Petri net (n) and a trace (t) are
given as input:
fitness(n, t) = 1/2 * (1 - r/p) + 1/2 * (1 - m/c)
where p is the number of produced tokens, c the consumed tokens, m the missing tokens and r the remaining tokens.
To apply the formula on the whole event log, p, r, m, and c are calculated for each
trace, summed up, and finally placed into the formula above.
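As a quick sanity check, the following minimal sketch (with made-up token counts for a single trace) shows how the formula is evaluated:
# hypothetical token counts for a single trace
p, c, m, r = 10, 10, 1, 1  # produced, consumed, missing, remaining
fitness = 0.5 * (1 - r / p) + 0.5 * (1 - m / c)
print(fitness)  # 0.9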
In PM4Py there is an implementation of a token replayer that is able to go across hidden
transitions (calculating shortest paths between places) and can be used with any Petri net
model with unique visible transitions and hidden transitions. When a visible transition
needs to be fired and not all places in the preset are provided with the correct number of
tokens, starting from the current marking it is checked if for some place there is a
sequence of hidden transitions that could be fired in order to enable the visible
transition. The hidden transitions are then fired and a marking that permits to enable the
visible transition is reached.
The example on the right shows how to apply token-based replay
on a log and a Petri net. First, the log is loaded. Then, the Alpha
Miner is applied in order to discover a Petri net.
Eventually, the token-based replay is applied. The output of the token-based replay,
stored in the variable replayed_traces, contains for each trace of the log:
trace_is_fit: boolean value (True/False) that is true when
the trace conforms to the model.
activated_transitions: list of transitions activated in the model
by the token-based replay.
reached_marking: marking reached at the end of the replay.
missing_tokens: number of missing tokens.
consumed_tokens: number of consumed tokens.
remaining_tokens: number of remaining tokens.
produced_tokens: number of produced tokens.
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.algo.discovery.alpha import algorithm as alpha_miner
log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))
net, initial_marking, final_marking = alpha_miner.apply(log)
from pm4py.algo.conformance.tokenreplay import algorithm as token_replay
replayed_traces = token_replay.apply(log, net, initial_marking, final_marking)
The token-based replay supports different parameters.
Parameters.CONSIDER_REMAINING_IN_FITNESS: if the parameter is set to True (default), a trace is considered fit only if it does not lead to any missing/remaining token in the model.
Parameters.TRY_TO_REACH_FINAL_MARKING_THROUGH_HIDDEN: when the replay of the trace is completed, try to reach the final marking through invisible transitions (default: True).
Parameters.STOP_IMMEDIATELY_UNFIT: stops the replay immediately when a missing token needs to be inserted (default: False).
Parameters.WALK_THROUGH_HIDDEN_TRANS: enables, in general, the traversal of invisible transitions (default: True).
Parameters.CLEANING_TOKEN_FLOOD: limits the token flooding problem by an apposite algorithm (default: False).
Parameters.RETURN_NAMES: returns the transition names, not the transition objects, in the list of activated transitions for a trace.
Parameters.ACTIVITY_KEY: establishes the event attribute to be used during the replay (default: concept:name).
Diagnostics (TBR)
The execution of token-based replay in PM4Py permits to obtain detailed information about
transitions that did not execute correctly, or activities that are in the log and not in the
model. In particular, executions that do not match the model are expected to take longer
throughput time.
The diagnostics that are provided by PM4Py are the following:
Throughput analysis on the transitions that are executed in an unfit way according to the
process model (the Petri net).
Throughput analysis on the activities that are not contained in the model.
Root Cause Analysis on the causes that lead to an unfit execution of the transitions.
Root Cause Analysis on the causes that lead to executing activities that are not contained
in the process model.
To provide an execution context for the examples, a log must be loaded, and a model that
is not perfectly fitting is required. To load the log, the following instructions could
be used:
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests", "input_data", "receipt.xes"))
To create an unfit model, a filtering operation producing a log where only part of the
behavior is kept can be executed:
from pm4py.algo.filtering.log.auto_filter.auto_filter import apply_auto_filter
filtered_log = apply_auto_filter(log)
Then, applying the Inductive Miner algorithm:
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
net, initial_marking, final_marking = inductive_miner.apply(filtered_log)
We then apply the token-based replay with special settings. In particular, with
disable_variants set to True we force the replay of every case (and not just one case per variant); with
enable_pltr_fitness set to True we tell the algorithm to return localized conformance
checking information (fitness of places and transitions, along with the activities that are not in the model).
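The snippet below is a sketch of such a call. It assumes the DISABLE_VARIANTS and ENABLE_PLTR_FITNESS parameter keys of the token-based replay and its four return values in this setting (the replayed traces, the place fitness, the transition fitness, and the activities not contained in the model); the trans_fitness and unwanted_activities objects are the ones used by the diagnostics below.
from pm4py.algo.conformance.tokenreplay import algorithm as token_replay
# assumed parameter keys of the token-based replay
parameters_tbr = {
    token_replay.Variants.TOKEN_REPLAY.value.Parameters.DISABLE_VARIANTS: True,
    token_replay.Variants.TOKEN_REPLAY.value.Parameters.ENABLE_PLTR_FITNESS: True}
replayed_traces, place_fitness, trans_fitness, unwanted_activities = token_replay.apply(
    log, net, initial_marking, final_marking, parameters=parameters_tbr)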
To perform throughput analysis on the transitions that were executed unfit, and then
print on the console the result, the following code could be used:
from pm4py.algo.conformance.tokenreplay.diagnostics import duration_diagnostics
trans_diagnostics = duration_diagnostics.diagnose_from_trans_fitness(log, trans_fitness)
for trans in trans_diagnostics:
    print(trans, trans_diagnostics[trans])
The output clearly shows that unfit executions lead to much higher throughput times
(from 126 to 146 times higher throughput time).
Throughput analysis (activities)
To perform throughput analysis on the process executions containing activities that are
not in the model, and then print the result on the screen, the following code could be
used:
from pm4py.algo.conformance.tokenreplay.diagnostics import duration_diagnostics
act_diagnostics = duration_diagnostics.diagnose_from_notexisting_activities(log, unwanted_activities)
for act in act_diagnostics:
    print(act, act_diagnostics[act])
Root Cause Analysis
The output of root cause analysis in the diagnostics context is a decision tree that permits to
understand the causes of a deviation. In the following examples, for each deviation, a different
decision tree is built and visualized.
In the following examples, which consider the Receipt log, the decision trees will be
built on the following choice of attributes (i.e., only the org:group attribute will be
considered).
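A possible way to build such a parameters dictionary is sketched below; the string_attributes and numeric_attributes keys are an assumption about what the root cause analysis accepts.
# assumed parameter keys: only the org:group string attribute is considered
string_attributes = ["org:group"]
numeric_attributes = []
parameters = {"string_attributes": string_attributes, "numeric_attributes": numeric_attributes}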
To perform root cause analysis on the transitions that are executed in an unfit way, the
following code could be used:
from pm4py.algo.conformance.tokenreplay.diagnostics import root_cause_analysis
trans_root_cause = root_cause_analysis.diagnose_from_trans_fitness(log, trans_fitness, parameters=parameters)
To visualize the decision trees obtained by root cause analysis, the following code
could be used:
from pm4py.visualization.decisiontree import visualizer as dt_vis
for trans in trans_root_cause:
    clf = trans_root_cause[trans]["clf"]
    feature_names = trans_root_cause[trans]["feature_names"]
    classes = trans_root_cause[trans]["classes"]
    # visualization could be called
    gviz = dt_vis.apply(clf, feature_names, classes)
    dt_vis.view(gviz)
Root Cause Analysis (activities that are not in the model)
To perform root cause analysis on activities that are executed but are not in the
process model, the following code could be used:
from pm4py.algo.conformance.tokenreplay.diagnostics import root_cause_analysis
act_root_cause = root_cause_analysis.diagnose_from_notexisting_activities(log, unwanted_activities,
parameters=parameters)
To visualize the decision trees obtained by root cause analysis, the following code
could be used:
from pm4py.visualization.decisiontree import visualizer as dt_vis
for act in act_root_cause:
    clf = act_root_cause[act]["clf"]
    feature_names = act_root_cause[act]["feature_names"]
    classes = act_root_cause[act]["classes"]
    # visualization could be called
    gviz = dt_vis.apply(clf, feature_names, classes)
    dt_vis.view(gviz)
Alignments
PM4Py comes with the following set of linear solvers: PuLP (available for any platform),
CVXOPT (available for the most widely used platforms including Windows/Linux for Python
3.6/3.7).
Alternatively, ORTools can also be used and installed from PIP.
Alignment-based replay aims to find one of the best alignments between the trace and the
model. For each trace, the output of an alignment is a list of couples where the first
element is an event (of the trace) or ">>" and the second element is a transition (of the
model) or ">>". For each couple, the following classification can be provided:
Sync move: the classification of the event corresponds to the transition label; in this
case, both the trace and the model advance in the same way during the replay.
Move on log: for couples where the second element is ">>", it corresponds to a replay move
in the trace that is not mimicked in the model. This kind of move is unfit and signals a
deviation between the trace and the model.
Move on model: for couples where the first element is ">>", it corresponds to a replay move
in the model that is not mimicked in the trace. For moves on model, we can have the
following distinction:
Moves on model involving hidden transitions: in this case, even if it is not a
sync move, the move is fit.
Moves on model not involving hidden transitions: in this case, the move is unfit
and signals a deviation between the trace and the model.
First, we have to import the log. Subsequently, we apply the Inductive Miner on the
imported log. In addition, we compute the alignments.
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))
net, initial_marking, final_marking = inductive_miner.apply(log)
from pm4py.algo.conformance.alignments import algorithm as alignments
aligned_traces = alignments.apply_log(log, net, initial_marking, final_marking)
To inspect the alignments, the following code snippet can be used. The output (a list)
reports, for each trace, the corresponding alignment along with its statistics. With
each trace, a dictionary containing, among others, the following information is
associated:
alignment: contains the alignment (sync moves, moves on log, moves on model)
cost: contains the cost of the alignment according to the provided cost
function
fitness: is equal to 1 if the trace is perfectly fitting
print(aligned_traces)
To use a different classifier, we refer to the Classifier
section. However, the following code defines a
custom classifier for each
event of each trace in the log.
for trace in log:
    for event in trace:
        event["customClassifier"] = event["concept:name"] + event["concept:name"]
A parameters dictionary containing the activity key can be formed.
# define the activity key in the parameters
parameters = {inductive_miner.Variants.DFG_BASED.value.Parameters.ACTIVITY_KEY: "customClassifier", alignments.Variants.VERSION_STATE_EQUATION_A_STAR.value.Parameters.ACTIVITY_KEY: "customClassifier"}
Then, a process model is computed, and alignments are also calculated. Besides, the
fitness value is calculated and the resulting values are printed.
# calculate process model using the given classifier
net, initial_marking, final_marking = inductive_miner.apply(log, parameters=parameters)
aligned_traces = alignments.apply_log(log, net, initial_marking, final_marking, parameters=parameters)
from pm4py.evaluation.replay_fitness import evaluator as replay_fitness
log_fitness = replay_fitness.evaluate(aligned_traces, variant=replay_fitness.Variants.ALIGNMENT_BASED)
print(log_fitness)
It is also possible to select other parameters for the alignments.
Model cost function: associating to each transition in the Petri net the corresponding
cost of a move-on-model.
Sync cost function: associating to each visible transition in the Petri net the cost of
a sync move.
On the right-hand side, an implementation of a custom model cost function and sync
cost function can be noted. The model cost function and sync cost function have
to be inserted later in the parameters. Subsequently, the replay is done.
model_cost_function = dict()
sync_cost_function = dict()
for t in net.transitions:
    # if the label is not None, we have a visible transition
    if t.label is not None:
        # associate cost 1000 to each move-on-model associated to visible transitions
        model_cost_function[t] = 1000
        # associate cost 0 to each sync move
        sync_cost_function[t] = 0
    else:
        # associate cost 1 to each move-on-model associated to hidden transitions
        model_cost_function[t] = 1
parameters = {}
parameters[alignments.Variants.VERSION_STATE_EQUATION_A_STAR.value.Parameters.PARAM_MODEL_COST_FUNCTION] = model_cost_function
parameters[alignments.Variants.VERSION_STATE_EQUATION_A_STAR.value.Parameters.PARAM_SYNC_COST_FUNCTION] = sync_cost_function
aligned_traces = alignments.apply_log(log, net, initial_marking, final_marking, parameters=parameters)
Different variants of the alignments are available:
Variants.STATE_EQUATION_A_STAR: (default) uses ILP-based heuristics to prune the state space.
Variants.VERSION_DIJKSTRA_NO_HEURISTICS: uses a Dijkstra-based state-space exploration (without the computation of any heuristics). This is faster on models without much concurrency, or when a small number of deviations are contained in the process execution.
Variants.STATE_EQUATION_LESS_MEMORY: a variant of the ILP-based replayer that requires less memory to host the states.
Variants.DIJKSTRA_LESS_MEMORY: a variant of the Dijkstra replayer that requires less memory to host the states.
Among the common parameters of these variants, we have:
Parameters.PARAM_MAX_ALIGN_TIME_TRACE: establishes the maximum amount of time available to complete the alignment of a trace (None is returned if the alignment is not finished in time) (default: infinity).
Parameters.PARAM_MAX_ALIGN_TIME: when aligning a log, establishes the maximum amount of time available to complete the alignments (the traces that were not aligned within this time get None as alignment) (default: infinity).
A further parameter establishes whether the result of an alignment should also contain the name of the transition, not only the label (default: False).
Parameters.ACTIVITY_KEY: establishes the attribute at the event level that should be used to compute the alignment (default: concept:name).
Decomposition of Alignments
Alignments represent a computationally expensive problem on models that contain a lot of
concurrency. Yet, they are the conformance checking technique that provides the best results in
terms of finding a match between the process execution(s) and the model. To overcome the
difficulties related to the size of the state space, various attempts have been made to decompose the model
into "smaller" pieces, on which the alignment is easier and which still permit to diagnose problems.
We have seen how to obtain a maximal decomposition of the Petri net model. Now we can see
how to perform the decomposition of alignments (that is based on a maximal decomposition
of the Petri net model). The approach described here has been published in:
Lee, Wai Lam Jonathan, et al. “Recomposing conformance: Closing the circle on decomposed
alignment-based conformance checking in process mining.” Information Sciences 466 (2018):
55-91.
The recomposition permits to understand whether each step of the process has been executed in a
sync way or some deviations happened. First, an alignment is performed on top of the decomposed
Petri nets.
Then, the agreement between the activities at the border is checked. If a disagreement is found,
the two components that are disagreeing are merged and the alignment is repeated on them.
When the steps are agreeing between the different alignments of the components, these can be
merged in a single alignment. The order of recomposition is based on the Petri net graph.
Despite that, in the case of concurrency, the “recomposed” alignment contains a valid list of
moves that may not be in the correct order.
To perform alignments through decomposition/recomposition, the following code can be
used. A maximum number of border disagreements can be provided to the algorithm. If the
number of border disagreements is reached, the alignment is interrupted and None is
returned as the alignment of the specific trace.
from pm4py.algo.conformance.decomp_alignments import algorithm as decomp_alignments
conf = decomp_alignments.apply(log, net, im, fm, parameters={decomp_alignments.Variants.RECOMPOS_MAXIMAL.value.Parameters.PARAM_THRESHOLD_BORDER_AGREEMENT: 2})
Since decomposed models are expected to have less concurrency, the components are aligned using
a Dijkstra approach. In the case of border disagreements, this can degrade the performance of
the algorithm.
It should be noted that this is not an approximation technique;
according to the authors, it should provide the same fitness
as the original alignments.
Since the alignment is recomposed, we can use the fitness evaluator to evaluate
the fitness (that is not related to the computation of fitness described in the paper).
from pm4py.evaluation.replay_fitness import evaluator as rp_fitness_evaluator
fitness = rp_fitness_evaluator.evaluate(conf, variant=rp_fitness_evaluator.Variants.ALIGNMENT_BASED)
Footprints
Footprints are a very basic (but scalable) conformance checking technique to compare entities
(such as event logs, DFGs, Petri nets, process trees, and any other kind of model).
Essentially, a relationship between any couple of activities of the log/model is inferred. This
can include:
Directly-Follows Relationships: in the log/model, it is possible that the activity A is
directly followed by B.
Directly-Before Relationships: in the log/model, it is possible that the activity B is
directly preceded by A.
Parallel behavior: it is possible that A is followed by B and B is followed by A
A footprints matrix can be calculated, that describes for each couple of activities the
footprint relationship.
It is possible to calculate that for different types of models and for the entire event log,
but also trace-by-trace (if the local behavior is important).
Let’s assume that the running-example.xes event log is loaded:
from pm4py.objects.log.importer.xes import importer as xes_importer
import os
log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))
And the inductive miner is applied on such log:
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
net, im, fm = inductive_miner.apply(log)
To calculate the footprints for the entire log, the following code can be used:
from pm4py.algo.discovery.footprints import algorithm as footprints_discovery
fp_log = footprints_discovery.apply(log, variant=footprints_discovery.Variants.ENTIRE_EVENT_LOG)
The data structure is a dictionary with, as keys, sequence (expressing directly-follows
relationships) and parallel (expressing the parallel behavior that can happen in either way).
The footprints of the log, trace-by-trace, can be calculated as follows, and are a list of
footprints for each trace:
from pm4py.algo.discovery.footprints import algorithm as footprints_discovery
fp_trace_by_trace = footprints_discovery.apply(log, variant=footprints_discovery.Variants.TRACE_BY_TRACE)
The footprints of the Petri net model can be calculated as follows:
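A sketch of this computation re-uses the footprints discovery algorithm, this time applied to the accepting Petri net discovered above (fp_net is the object used in the comparison later on):
from pm4py.algo.discovery.footprints import algorithm as footprints_discovery
fp_net = footprints_discovery.apply(net, im, fm)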
The data structure is a dictionary with, as keys, sequence (expressing directly-follows
relationships) and parallel (expressing the parallel behavior that can happen in either way).
It is possible to visualize a comparison between the footprints of the (entire) log and the
footprints of the (entire) model.
First of all, let’s see how to visualize a single footprints table, for example the one of
the model. The following code can be used:
from pm4py.visualization.footprints import visualizer as fp_visualizer
gviz = fp_visualizer.apply(fp_net, parameters={fp_visualizer.Variants.SINGLE.value.Parameters.FORMAT: "svg"})
fp_visualizer.view(gviz)
To compare the two footprints tables, the following code can be used. Please note that the
visualization will look the same if no deviations are discovered. If deviations are found,
they are colored red.
from pm4py.visualization.footprints import visualizer as fp_visualizer
gviz = fp_visualizer.apply(fp_log, fp_net, parameters={fp_visualizer.Variants.COMPARISON.value.Parameters.FORMAT: "svg"})
fp_visualizer.view(gviz)
To actually find some deviations, let’s repeat the procedure on the receipt.xes log,
applying a heavy filter on the log to discover a simpler model:
from pm4py.objects.log.importer.xes import importer as xes_importer
import os
from copy import deepcopy
from pm4py.algo.filtering.log.variants import variants_filter
log = xes_importer.apply(os.path.join("tests", "input_data", "receipt.xes"))
filtered_log = variants_filter.apply_auto_filter(deepcopy(log))
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
net, im, fm = inductive_miner.apply(filtered_log)
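Before the comparison, the footprints have to be recomputed on the newly loaded log and on the newly discovered model; a sketch, re-using the calls shown earlier, is the following:
from pm4py.algo.discovery.footprints import algorithm as footprints_discovery
fp_trace_by_trace = footprints_discovery.apply(log, variant=footprints_discovery.Variants.TRACE_BY_TRACE)
fp_net = footprints_discovery.apply(net, im, fm)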
With a conformance checking operation, we want instead to compare the behavior of the traces
of the log against the footprints of the model.
This can be done using the following code:
from pm4py.algo.conformance.footprints import algorithm as footprints_conformance
conf_fp = footprints_conformance.apply(fp_trace_by_trace, fp_net)
And will contain, for each trace of the log, a set with the deviations. Extract of the list for
some traces:
{('T06 Determine necessity of stop advice', 'T04 Determine confirmation of receipt'), ('T02 Check confirmation of receipt', 'T06 Determine necessity of stop advice')}
set()
{('T19 Determine report Y to stop indication', 'T20 Print report Y to stop indication'), ('T10 Determine necessity to stop indication', 'T16 Report reasons to hold request'), ('T16 Report reasons to hold request', 'T17 Check report Y to stop indication'), ('T17 Check report Y to stop indication', 'T19 Determine report Y to stop indication')}
set()
set()
{('T02 Check confirmation of receipt', 'T06 Determine necessity of stop advice'), ('T10 Determine necessity to stop indication', 'T04 Determine confirmation of receipt'), ('T04 Determine confirmation of receipt', 'T03 Adjust confirmation of receipt'), ('T03 Adjust confirmation of receipt', 'T02 Check confirmation of receipt')}
set()
We can see that for the first trace that contains deviations, there are two deviations, the
first related to T06 Determine necessity of stop advice being executed before T04 Determine
confirmation of receipt; the second related to T02 Check confirmation of receipt being followed
by T06 Determine necessity of stop advice.
The traces for which the conformance returns nothing are fit (at least according to the
footprints).
Footprints conformance checking is a way to identify obvious deviations, i.e., behavior of the log
that is not allowed by the model.
On the log side, the computation of footprints is very scalable. The calculation of footprints for a Petri net
model may instead be more expensive.
If we change the underlying model from Petri nets to process trees, it is possible to exploit
the bottom-up structure of the tree in order to calculate the footprints almost instantaneously.
Let’s open a log, calculate a process tree and then apply the discovery of the footprints.
We open the running-example log:
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply("tests/input_data/running-example.xes")
And apply the inductive miner to discover a process tree:
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
tree = inductive_miner.apply_tree(log)
Then, the footprints can be discovered. We discover the footprints on the entire log, we
discover the footprints trace-by-trace in the log, and then we discover the footprints on
the process tree:
from pm4py.algo.discovery.footprints import algorithm as fp_discovery
fp_log = fp_discovery.apply(log, variant=fp_discovery.Variants.ENTIRE_EVENT_LOG)
fp_trace_trace = fp_discovery.apply(log, variant=fp_discovery.Variants.TRACE_BY_TRACE)
fp_tree = fp_discovery.apply(tree, variant=fp_discovery.Variants.PROCESS_TREE)
Each one of these contains:
A list of sequential footprints contained in the log/allowed by the model
A list of parallel footprints contained in the log/allowed by the model
A list of activities contained in the log/allowed by the model
A list of start activities contained in the log/allowed by the model
A list of end activities contained in the log/allowed by the model
It is possible to execute an enhanced conformance checking between the footprints of the
(entire) log, and the footprints of the model, by doing:
from pm4py.algo.conformance.footprints import algorithm as fp_conformance
conf_result = fp_conformance.apply(fp_log, fp_tree, variant=fp_conformance.Variants.LOG_EXTENSIVE)
The result contains, for each item of the previous list, the violations.
Given the result of conformance checking, it is possible to calculate the footprints-based
fitness and precision of the process model, by doing:
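A minimal sketch of this computation, assuming the fp_fitness and fp_precision functions of the footprints evaluation utilities, is the following:
from pm4py.algo.conformance.footprints.util import evaluation
# assumed evaluation helpers for footprints-based fitness and precision
fitness = evaluation.fp_fitness(fp_log, fp_tree, conf_result)
precision = evaluation.fp_precision(fp_log, fp_tree)
print(fitness, precision)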
These values are both included in the interval [0,1]
Log Skeleton
The concept of log skeleton has been described in the contribution
Verbeek, H. M. W., and R. Medeiros de Carvalho. “Log skeletons: A classification approach to
process discovery.” arXiv preprint arXiv:1806.08247 (2018).
It is claimed to be the most accurate classification approach to decide whether a trace belongs to
(the language of) a log or not.
For a log, an object containing a list of relations is calculated.
Equivalence: contains the couples of activities that always happen with the same
frequency inside a trace.
Always-after: contains the couples of activities (A,B) such that an occurrence of
A is always followed, at some later point of the trace, by an occurrence of B.
Always-before: contains the couples of activities (B,A) such that an occurrence
of B is always preceded, at some earlier point of the trace, by an occurrence of A.
Never-together: contains the couples of activities (A,B) that never happen
together in the history of the trace.
Directly-follows: contains the list of directly-follows relations of the log.
Activity frequency: for each activity, the number of possible occurrences per trace.
It is also possible to provide a noise threshold. In that case, more relations are found since
the conditions are relaxed.
Let’s suppose to take the running-example.xes log:
from pm4py.objects.log.importer.xes import importer as xes_importer
import os
log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))
Then, we can calculate the log skeleton:
from pm4py.algo.discovery.log_skeleton import algorithm as lsk_discovery
skeleton = lsk_discovery.apply(log, parameters={lsk_discovery.Variants.CLASSIC.value.Parameters.NOISE_THRESHOLD: 0.0})
We can see the relations (equivalence, always_after, always_before, never_together,
directly_follows, activ_freq) as keys of the object; the values are the activities/couples of
activities that follow each pattern.
To see how the log skeleton really works for classification/conformance purposes, let's
change to another log (the receipt.xes log), and calculate a heavily filtered version of
it (to have less behavior):
from pm4py.objects.log.importer.xes import importer as xes_importer
import os
log = xes_importer.apply(os.path.join("tests", "input_data", "receipt.xes"))
from copy import deepcopy
from pm4py.algo.filtering.log.variants import variants_filter
filtered_log = variants_filter.apply_auto_filter(deepcopy(log))
Calculate the log skeleton on top of the filtered log, and then apply the classification as
follows:
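A sketch of the first step, re-using the log skeleton discovery call shown above (here applied to the filtered log), is the following:
from pm4py.algo.discovery.log_skeleton import algorithm as lsk_discovery
skeleton = lsk_discovery.apply(filtered_log, parameters={lsk_discovery.Variants.CLASSIC.value.Parameters.NOISE_THRESHOLD: 0.0})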
from pm4py.algo.conformance.log_skeleton import algorithm as lsk_conformance
conf_result = lsk_conformance.apply(log, skeleton)
for trace_result in conf_result:
    print(trace_result)
In such way, we can get for each trace whether it has been classified as belonging to the
filtered log, or not. When deviations are found, the trace does not belong to the language of
the original log.
We can also calculate a log skeleton on the original log, for example providing 0.03 as
noise threshold, and see which are the effects on the classification:
from pm4py.algo.discovery.log_skeleton import algorithm as lsk_discovery
skeleton = lsk_discovery.apply(log, parameters={lsk_discovery.Variants.CLASSIC.value.Parameters.NOISE_THRESHOLD: 0.03})
from pm4py.algo.conformance.log_skeleton import algorithm as lsk_conformance
conf_result = lsk_conformance.apply(log, skeleton)
for trace_result in conf_result:
    print(trace_result)
We can see that some traces are classified as non-conforming even when the log skeleton is calculated on the
original log, if a noise threshold is provided.
Alignments between Logs
In some situations, performing an optimal alignment between an event log and a process model might
be unfeasible. Hence, getting an approximated alignment that highlights the main points of deviation
is an option. In PM4Py, we offer support for alignments between two event logs. Such alignment
operation is based on the edit distance, i.e., for a trace of the first log, the trace of the second log
which has the least edit distance is found. In the following example, we see how to perform
alignments between an event log and the simulated log obtained by performing a playout operation
on the process model.
We can load an example log and discover a process model using the inductive miner:
import pm4py
log = pm4py.read_xes("tests/input_data/running-example.xes")
net, im, fm = pm4py.discover_petri_net_inductive(log)
Then, perform a playout operation on the process model:
from pm4py.simulation.playout import simulator
simulated_log = simulator.apply(net, im, fm)
Then, the alignments between the two logs are performed:
from pm4py.algo.conformance.logs_alignments import algorithm as logs_alignments
parameters = {}
alignments = logs_alignments.apply(log, simulated_log, parameters=parameters)
The result is a list of alignments, each of which contains a list of moves (sync move, move on log n.1, move on log n.2).
With this utility, it's also possible to perform anti-alignments. In this case, an anti-alignment corresponds to
a trace of the second log that has the largest edit distance from the given trace of the first log.
To perform anti-alignments, the following code can be used:
from pm4py.algo.conformance.logs_alignments import algorithm as logs_alignments
parameters = {logs_alignments.Variants.EDIT_DISTANCE.value.Parameters.PERFORM_ANTI_ALIGNMENT: True}
alignments = logs_alignments.apply(log, simulated_log, parameters=parameters)
Temporal Profile
We propose in PM4Py an implementation of the temporal profile model. This has been described in:
Stertz, Florian, Jürgen Mangler, and Stefanie Rinderle-Ma. "Temporal Conformance Checking at Runtime based on Time-infused Process Models." arXiv preprint arXiv:2008.07262 (2020).
A temporal profile measures, for every couple of activities in the log, the average time and the standard deviation of the times elapsed between events having the
given activities. The time is measured between the completion of the first event and the start of the second event. Hence, it is assumed to work with an interval log
where the events have two timestamps. The output of the temporal profile discovery is a dictionary where each couple of activities (expressed as a tuple)
is associated with a couple of numbers: the first is the average and the second is the standard deviation.
It is possible to use a temporal profile to perform conformance checking on an event log.
The times between the couple of activities in the log are assessed against the numbers stored in the temporal profile. Specifically,
a value is calculated that shows how many standard deviations the value is different from the average. If that value exceeds a threshold (by default set to 6,
according to the six-sigma principles), then the couple of activities is signaled.
The output of conformance checking based on a temporal profile is a list containing the deviations for each case in the log.
Each deviation is expressed as a couple of activities, along with the calculated value and the distance (based on number of standard deviations)
from the average.
We provide an example of conformance checking based on a temporal profile.
First, we can load an event log, and apply the discovery algorithm.
import pm4py
from pm4py.algo.discovery.temporal_profile import algorithm as temporal_profile_discovery
log = pm4py.read_xes("tests/input_data/receipt.xes")
parameters = {}
temporal_profile = temporal_profile_discovery.apply(log, parameters=parameters)
Then, we can apply conformance checking based on the temporal profile.
from pm4py.algo.conformance.temporal_profile import algorithm as temporal_profile_conformance
parameters = {}
results = temporal_profile_conformance.apply(log, temporal_profile, parameters=parameters)
Some parameters can be used in order to customize the conformance checking of the temporal profile:
Parameters.ACTIVITY_KEY (string, default: concept:name): the attribute to use as activity.
Parameters.START_TIMESTAMP_KEY (string, default: start_timestamp): the attribute to use as start timestamp.
Parameters.TIMESTAMP_KEY (string, default: time:timestamp): the attribute to use as timestamp.
Parameters.ZETA (int, default: 6): multiplier for the standard deviation. Couples of events that are more distant than this are signaled by the temporal profile.
Process Trees
In PM4Py we offer support for process trees (visualization, conversion to Petri nets and
generation of a log), for importing/exporting them, and a functionality to generate them. In this
section, these functionalities are examined.
Importing/Exporting Process Trees
In PM4Py, we offer support for importing/exporting process trees in the PTML format.
The following code can be used to import a process tree from a PTML file.
from pm4py.objects.process_tree.importer import importer as ptml_importer
tree = ptml_importer.apply("tests/input_data/running-example.ptml")
The following code can be used to export a process tree into a PTML file.
from pm4py.objects.process_tree.exporter import exporter as ptml_exporter
ptml_exporter.apply(tree, "running-example.ptml")
Generation of process trees
The approach 'PTAndLogGenerator', described by the scientific paper 'PTandLogGenerator: A
Generator for Artificial Event Data', has been implemented in the PM4Py library.
The code snippet can be used to generate a process tree.
from pm4py.simulation.tree_generator import simulator as tree_gen
parameters = {}
tree = tree_gen.apply(parameters=parameters)
The parameters accepted by the tree generator, along with their meaning and default values, are the following:
MODE: most frequent number of visible activities (default 20)
MIN: minimum number of visible activities (default 10)
MAX: maximum number of visible activities (default 30)
SEQUENCE: probability to add a sequence operator to the tree (default 0.25)
CHOICE: probability to add a choice operator to the tree (default 0.25)
PARALLEL: probability to add a parallel operator to the tree (default 0.25)
LOOP: probability to add a loop operator to the tree (default 0.25)
OR: probability to add an or operator to the tree (default 0)
SILENT: probability to add a silent activity to a choice or loop operator (default 0.25)
DUPLICATE: probability to duplicate an activity label (default 0)
LT_DEPENDENCY: probability to add a random dependency to the tree (default 0)
INFREQUENT: probability to make a choice have infrequent paths (default 0.25)
NO_MODELS: number of trees to generate from the model population (default 10)
UNFOLD: whether or not to unfold loops in order to include choices underneath in dependencies: 0=False, 1=True; if lt_dependency <= 0 this should always be 0 (False); if lt_dependency > 0 this can be 1 or 0 (True or False) (default 10)
MAX_REPEAT: maximum number of repetitions of a loop (only used when unfolding is True) (default 10)
Generation of a log out of a process tree
The code snippet can be used to generate a log, with 100 cases, out of the process tree.
from pm4py.objects.process_tree import semantics
log = semantics.generate_log(tree, no_traces=100)
Conversion into Petri net
The code snippet can be used to convert the process tree into a Petri net.
from pm4py.objects.conversion.process_tree import converter as pt_converter
net, im, fm = pt_converter.apply(tree)
Visualize a Process Tree
A process tree can be printed, as revealed on the right side.
print(tree)
A process tree can also be visualized, as revealed on the right side.
from pm4py.visualization.process_tree import visualizer as pt_visualizer
gviz = pt_visualizer.apply(tree, parameters={pt_visualizer.Variants.WO_DECORATION.value.Parameters.FORMAT: "png"})
pt_visualizer.view(gviz)
Converting a Petri net to a Process Tree
We propose an approach to convert a block-structured accepting Petri net into a process
tree. The implemented approach is described in:
van Zelst, Sebastiaan J. "Translating Workflow Nets to Process Trees: An Algorithmic
Approach." arXiv preprint arXiv:2004.08213 (2020).
The approach, given an accepting Petri net, returns a process tree if the Petri net
is block-structured, while it raises an exception if the Petri net is not block-structured.
We propose an example of application. First, we load a XES log and we discover an accepting
Petri net
using the Alpha Miner algorithm.
import pm4py
import os
log = pm4py.read_xes(os.path.join("tests", "input_data", "running-example.xes"))
net, im, fm = pm4py.discover_petri_net_alpha(log)
Then, we convert that to a process tree.
from pm4py.objects.conversion.wf_net import converter as wf_net_converter
tree = wf_net_converter.apply(net, im, fm)
print(tree)
The method succeeds, since the accepting Petri net is block-structured, and discovers a process
tree
(incidentally, the same process tree as if the inductive miner was applied).
Feature Selection
An operation of feature selection permits to represent the event log in a tabular way.
This is important for operations such as prediction and anomaly detection.
Automatic Feature Selection
In PM4Py, we offer ways to perform an automatic feature selection. As example, let us import the
receipt log and perform an automatic feature selection on top of it.
First, we import the receipt log:
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply("tests/input_data/receipt.xes")
Then, let’s perform the automatic feature selection:
from pm4py.objects.log.util import get_log_representation
data, feature_names = get_log_representation.get_default_representation(log)
print(feature_names)
Printing the value feature_names, we see that the following attributes were selected:
The attribute channel at the trace level (this assumes values Desk, Intern, Internet,
Post, e-mail)
The attribute department at the trace level (this assumes values Customer contact,
Experts, General)
The attribute group at the event level (this assumes values EMPTY, Group 1, Group 12,
Group 13, Group 14, Group 15, Group 2, Group 3, Group 4, Group 7).
No numeric attribute is selected. Inspecting feature_names, we see that we have a different
feature for each value of each selected attribute. This is called one-hot encoding: a case is
assigned 0 for a feature if it does not contain an event with the given value for
the attribute, and 1 if it contains at least one event with that value.
If we represent the features as a dataframe:
import pandas as pd
df = pd.DataFrame(data, columns=feature_names)
print(df)
We can see the features assigned to each different case.
Manual feature selection
The manual feature selection permits to specify which attributes should be included in the
feature selection. These may include for example:
The activities performed in the process execution (contained usually in the event attribute
concept:name ).
The resources that perform the process execution (contained usually in the event attribute
org:resource ).
Some numeric attributes, at discretion of the user.
To do so, we have to call the method get_log_representation.get_representation.
The types of features that can be considered by a manual feature selection are:
str_ev_attr: string attributes at the event level; these are one-hot encoded into features that may assume value 0 or value 1.
str_tr_attr: string attributes at the trace level; these are one-hot encoded into features that may assume value 0 or value 1.
num_ev_attr: numeric attributes at the event level; these are encoded by including the last value of the attribute among the events of the trace.
num_tr_attr: numeric attributes at the trace level; these are encoded by including the numerical value.
str_evsucc_attr: successions related to the string attribute values at the event level; for example, if we have a trace [A,B,C], it might be important to include not only the presence of the single values A, B and C as features, but also the presence of the directly-follows couples (A,B) and (B,C).
Let's consider for example a feature selection where we are interested in:
whether a process execution contains a given activity;
whether a process execution contains a given resource;
whether a process execution contains a directly-follows path between two given activities;
whether a process execution contains a directly-follows path between two given resources.
A possible selection of this kind is sketched below.
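The following sketch assumes that activities are recorded in the concept:name attribute, resources in org:resource, and that get_representation accepts a str_evsucc_attr argument for the directly-follows features:
from pm4py.objects.log.util import get_log_representation
# str_tr_attr, str_ev_attr, num_tr_attr, num_ev_attr passed positionally as in the earlier example
data, feature_names = get_log_representation.get_representation(
    log, [], ["concept:name", "org:resource"], [], [],
    str_evsucc_attr=["concept:name", "org:resource"])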
We see that the number of features is considerably larger in this setting.
Other features are for example the cycle and the lead time associated to a case.
Here, we may suppose to have:
A log with lifecycles, where each event is instantaneous
OR an interval log, where events may be associated to two timestamps (start and end
timestamp).
The lead/cycle time can be calculated on top of interval logs. If we have a lifecycle log,
we need to convert that with:
from pm4py.objects.log.util import interval_lifecycle
log = interval_lifecycle.to_interval(log)
Then, features such as the lead/cycle time can be inserted through the instructions:
from pm4py.objects.log.util import interval_lifecycle
from pm4py.util import constants
log = interval_lifecycle.assign_lead_cycle_time(log, parameters={
constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY: "start_timestamp",
constants.PARAMETER_CONSTANT_TIMESTAMP_KEY: "time:timestamp"})
After the provision of the start timestamp attribute (in this case, start_timestamp) and
of the timestamp attribute (in this case, time:timestamp),
the following features are returned by the method:
@@approx_bh_partial_cycle_time => incremental cycle time associated to the event (the
cycle time of the last event is the cycle time of the instance)
@@approx_bh_partial_lead_time => incremental lead time associated to the event
@@approx_bh_overall_wasted_time => difference between the partial lead time and the
partial cycle time values
@@approx_bh_this_wasted_time => wasted time ONLY with regards to the activity
described by the ‘interval’ event
@@approx_bh_ratio_cycle_lead_time => measures the incremental Flow Rate (between 0
and 1).
These are all numerical attributes, hence we can refine the feature extraction by doing:
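A sketch of such a refinement, passing the lead/cycle time attributes listed above as numeric event-level features (re-using the get_representation call from the manual feature selection):
from pm4py.objects.log.util import get_log_representation
data, feature_names = get_log_representation.get_representation(
    log, [], [], [],
    ["@@approx_bh_partial_cycle_time", "@@approx_bh_partial_lead_time",
     "@@approx_bh_overall_wasted_time", "@@approx_bh_this_wasted_time",
     "@@approx_bh_ratio_cycle_lead_time"])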
Some techniques (such as clustering, prediction, and anomaly detection) suffer if the
dimensionality of the dataset is too high. Hence, a dimensionality reduction technique (such as PCA)
helps to cope with the complexity of the data.
Having a Pandas dataframe out of the features extracted from the log:
import pandas as pd
df = pd.DataFrame(data, columns=feature_names)
It is possible to reduce the number of features using a technique like PCA.
Let’s create the PCA with a number of components equal to 5, and apply the PCA to the
dataframe.
from sklearn.decomposition import PCA
pca = PCA(n_components=5)
df2 = pd.DataFrame(pca.fit_transform(df))
So, from more than 400 columns, we pass to 5 columns that contain most of the variance.
Anomaly Detection
In this section, we consider the calculation of an anomaly score for the different cases. This is
based on the features extracted; and to work better requires the application of a dimensionality
reduction technique (such as the PCA in the previous section).
Let's apply a method called IsolationForest to the dataframe. This permits to add a
column scores that is lower than or equal to 0 when the case is to be considered anomalous,
and greater than 0 when the case is not to be considered anomalous.
from sklearn.ensemble import IsolationForest
model=IsolationForest()
model.fit(df2)
df2["scores"] = model.decision_function(df2)
To see which cases are more anomalous, we can sort the dataframe by the score after inserting
an index column. Printing the result then shows which cases are most anomalous:
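A minimal sketch of this step, using standard pandas operations (the @@index column name is just an example):
# keep track of the original case position, then sort by the anomaly score
df2["@@index"] = range(len(df2))
df2 = df2.sort_values("scores")
print(df2)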
Decision tree about the ending activity of a process
Decision trees are objects that help the understanding of the conditions leading to a
particular outcome. In this section, several examples related to the construction of
decision trees are provided.
Ideas behind the building of decision trees are provided in scientific paper: de Leoni,
Massimiliano, Wil MP van der Aalst, and Marcus Dees. 'A general process mining framework
for correlating, predicting and clustering dynamic behavior based on event logs.'
The general scheme is the following:
A representation of the log, on a given set of features, is obtained (for example,
using one-hot encoding on string attributes and keeping numeric attributes
as-they-are)
A representation of the target classes is constructed
The decision tree is calculated
The decision tree is represented in some ways
A process instance may potentially finish with different activities, signaling different
outcomes of the process instance. A decision tree may help to understand the reasons behind
each outcome.
First, a log could be loaded. Then, a representation of a log on a given set of
features could be obtained.
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests", "input_data", "roadtraffic50traces.xes"))
from pm4py.objects.log.util import get_log_representation
str_trace_attributes = []
str_event_attributes = ["concept:name"]
num_trace_attributes = []
num_event_attributes = ["amount"]
data, feature_names = get_log_representation.get_representation(
log, str_trace_attributes, str_event_attributes,
num_trace_attributes, num_event_attributes)
Or an automatic representation (automatic selection of the attributes) could be
obtained:
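This mirrors the automatic feature selection shown earlier:
from pm4py.objects.log.util import get_log_representation
data, feature_names = get_log_representation.get_default_representation(log)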
(Optional) The features that are extracted by those methods can be represented as a
Pandas dataframe:
import pandas as pd
dataframe = pd.DataFrame(data, columns=feature_names)
(Optional) And the dataframe can be exported then as a CSV file.
dataframe.to_csv("features.csv", index=False)
Then, the target classes are formed. Each endpoint of the process belongs to a different
class.
from pm4py.objects.log.util import get_class_representation
target, classes = get_class_representation.get_class_representation_by_str_ev_attr_value_value(log, "concept:name")
The decision tree could be then calculated and visualized.
from sklearn import tree
clf = tree.DecisionTreeClassifier()
clf.fit(data, target)
from pm4py.visualization.decisiontree import visualizer as dectree_visualizer
gviz = dectree_visualizer.apply(clf, feature_names, classes)
Decision tree about the duration of a case (Root Cause
Analysis)
A decision tree about the duration of a case helps to understand the reasons behind a high
case duration (or, at least, a case duration that is above a given threshold).
First, a log has to be loaded. A representation of a log on a given set of features
could be obtained.
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests", "input_data", "roadtraffic50traces.xes"))
from pm4py.objects.log.util import get_log_representation
str_trace_attributes = []
str_event_attributes = ["concept:name"]
num_trace_attributes = []
num_event_attributes = ["amount"]
data, feature_names = get_log_representation.get_representation(log, str_trace_attributes, str_event_attributes,
num_trace_attributes, num_event_attributes)
Or an automatic representation (automatic selection of the attributes) could be
obtained:
Then, the target classes are formed. There are two classes: First, traces that are below
the specified threshold (here, 200 days). Note that the time is given in seconds.
Second, traces that are above the specified
threshold.
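A sketch of this step, assuming the get_class_representation_by_trace_duration helper of the class representation utilities, and expressing the 200-day threshold in seconds:
from pm4py.objects.log.util import get_class_representation
# 200 days expressed in seconds
target, classes = get_class_representation.get_class_representation_by_trace_duration(log, 200 * 86400)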
The decision tree could be then calculated and visualized.
from sklearn import tree
clf = tree.DecisionTreeClassifier()
clf.fit(data, target)
from pm4py.visualization.decisiontree import visualizer as dectree_visualizer
gviz = dectree_visualizer.apply(clf, feature_names, classes)
Decision Mining
Decision mining permits, provided:
An event log
A process model (an accepting Petri net)
A decision point
To retrieve the features of the cases that go in the different directions. This permits, for
example, to calculate a decision tree that explains the decisions.
Let’s start by importing a XES log:
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply("tests/input_data/running-example.xes")
Calculating a model using the inductive miner:
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
net, im, fm = inductive_miner.apply(log)
A visualization of the model can be obtained in the following way:
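A sketch of such a visualization, re-using the Petri net visualizer shown earlier in this document:
from pm4py.visualization.petrinet import visualizer as pn_visualizer
gviz = pn_visualizer.apply(net, im, fm, parameters={pn_visualizer.Variants.WO_DECORATION.value.Parameters.FORMAT: "png"})
pn_visualizer.view(gviz)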
For this example, we choose the decision point p_10. There, a decision is made between
the activities examine casually and examine thoroughly.
To execute the decision mining algorithm, once we have a log, model and a decision point,
the following code can be used:
from pm4py.algo.enhancement.decision import algorithm as decision_mining
X, y, class_names = decision_mining.apply(log, net, im, fm, decision_point="p_10")
As we see, the outputs of the apply method are the following:
X: a Pandas dataframe containing the features associated to the cases leading to a
decision.
y: a Pandas dataframe, that is a single column, containing the number of the class
that is the output of the decision (in this case, the values possible are 0 and 1, since we
have two target classes)
class_names: the names of the output classes of the decision (in this case, examine
casually and examine thoroughly).
These outputs can be used in a generic way with any classification or comparison technique.
In particular, decision trees can be useful. We provide a function to automate the discovery of
decision trees out of the decision mining technique.
The code that should be applied is the following:
from pm4py.algo.enhancement.decision import algorithm as decision_mining
clf, feature_names, classes = decision_mining.get_decision_tree(log, net, im, fm, decision_point="p_10")
Then, a visualization of the decision tree can be obtained in the following way:
from pm4py.visualization.decisiontree import visualizer as tree_visualizer
gviz = tree_visualizer.apply(clf, feature_names, classes)
Statistics
In PM4Py, it is possible to calculate different statistics on top of classic
event logs and dataframes.
Throughput Time
Given an event log, it is possible to retrieve the list of all the durations of the cases
(expressed in seconds).
The only parameter that is needed is the timestamp. The code on the right can be used.
from pm4py.statistics.traces.log import case_statistics
all_case_durations = case_statistics.get_all_casedurations(log, parameters={
case_statistics.Parameters.TIMESTAMP_KEY: "time:timestamp"})
It is also possible to retrieve, for example, the median case duration (which can also be
calculated on top of the previous list).
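A sketch of this call, assuming the get_median_caseduration function of the case statistics module:
from pm4py.statistics.traces.log import case_statistics
median_case_duration = case_statistics.get_median_caseduration(log, parameters={
    case_statistics.Parameters.TIMESTAMP_KEY: "time:timestamp"})
print(median_case_duration)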
Given an event log, it is possible to retrieve the case arrival ratio, that is the average
distance between the arrival of two consecutive cases in the log.
from pm4py.statistics.traces.log import case_arrival
case_arrival_ratio = case_arrival.get_case_arrival_avg(log, parameters={
case_arrival.Parameters.TIMESTAMP_KEY: "time:timestamp"})
It is also possible to calculate the case dispersion ratio, that is the average
distance between the finishing of two consecutive cases in the log.
from pm4py.statistics.traces.log import case_arrival
case_dispersion_ratio = case_arrival.get_case_dispersion_avg(log, parameters={
case_arrival.Parameters.TIMESTAMP_KEY: "time:timestamp"})
Performance Spectrum
The performance spectrum is a powerful tool to analyse the time that is passed
between the different activities of the process.
The input of the performance spectrum is a list of activities of the log
(for which we want to consider the spectrum).
The output of the performance spectrum is a list of lists, each one containing the
timestamps in which the activities happened inside the cases.
An example application of the performance spectrum on the running-example
log, providing as list of activities for the spectrum the list containing
register request and decide, is the following:
from pm4py.statistics.performance_spectrum import algorithm as performance_spectrum
ps = performance_spectrum.apply(log, ["register request", "decide"],
parameters={performance_spectrum.Parameters.ACTIVITY_KEY: "concept:name",
performance_spectrum.Parameters.TIMESTAMP_KEY: "time:timestamp"})
The only parameters of the performance spectrum are the activity key and the timestamp key.
In such setting, the output of the performance spectrum is the following:
Given an interval event log (an EventLog object where each event is characterised by two
timestamps, a start timestamp usually contained in the
start_timestamp attribute and a completion timestamp usually contained in the time:timestamp
attribute),
the duration of the event is the difference between the completion timestamp and the start
timestamp.
This may be affected by nights (when an activity is not actively worked on), weekends (when the
workers may not be at the workplace) and other kinds of pauses.
In PM4Py, a way to consider only the time in which the activity could actually be worked (so,
excluding time outside of the working hours and weekends) is provided.
Given a start and end timestamp (expressed as UNIX timestamps), the business hours
calculation method could be called as follows:
from pm4py.util.business_hours import BusinessHours
from datetime import datetime
st = datetime.fromtimestamp(100000000)
et = datetime.fromtimestamp(200000000)
bh_object = BusinessHours(st, et)
worked_time = bh_object.getseconds()
print(worked_time)
To provide specific shifts and weekends (for example, always short weeks with 4 working days
and work days from 10 to 16) the following code could be used:
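A sketch of such a call, assuming the worktiming and weekends keyword arguments of the BusinessHours class (here: working hours from 10 to 16, and a 4-day working week where days 5, 6 and 7 are treated as weekend):
# assumed keyword arguments of BusinessHours
bh_object = BusinessHours(st, et, worktiming=[10, 16], weekends=[5, 6, 7])
worked_time = bh_object.getseconds()
print(worked_time)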
Two related measures can be computed on process instances:
The Lead Time: the overall time in which the instance was worked, from the start to the end,
without considering whether it was actively worked or not.
The Cycle Time: the overall time in which the instance was worked, from the start to the
end, considering only the times in which it was actively worked.
For these concepts, it is important to consider only business hours (so, excluding nights and
weekends). Indeed, in those periods the machinery and the workforce are not working, so the
instance cannot progress, and the time "wasted" there is not recoverable.
Within ‘interval’ event logs (that have a start and an end timestamp), it is possible to
calculate incrementally the lead time and the cycle time (event per event). The lead time and
the cycle time that are reported on the last event of the case are the ones related to the
process execution. With this, it is easy to understand which activities of the process have
caused a bottleneck (e.g. the lead time increases significantly more than the cycle time).
The algorithm implemented in PM4Py starts by sorting each case by the start timestamp (so,
activities started earlier are reported earlier in the log), and is able to calculate the lead
and cycle time in all situations, including complex ones with overlapping activities.
In the following, we aim to insert the following attributes to events inside a log:
Attribute
Description
@@approx_bh_partial_cycle_time
Incremental cycle time associated to the event (the cycle time of the last event is
the cycle time of the instance)
@@approx_bh_partial_lead_time
Incremental lead time associated to the event
@@approx_bh_overall_wasted_time
Difference between the partial lead time and the partial cycle time values
@@approx_bh_this_wasted_time
Wasted time ONLY with regards to the activity described by the ‘interval’ event
@@approx_bh_ratio_cycle_lead_time
Measures the incremental Flow Rate (between 0 and 1).
The method that calculates the lead and the cycle time could accept the following optional
parameters:
Name
Description
worktiming
The work timing (e.g. [7, 17])
weekends
The specification of the weekends (e.g. [6, 7])
And could be applied with the following line of code:
from pm4py.objects.log.util import interval_lifecycle
enriched_log = interval_lifecycle.assign_lead_cycle_time(log)
With this, an enriched log that contains for each event the corresponding attributes for
lead/cycle time is obtained.
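As a sketch, assuming that assign_lead_cycle_time accepts a parameters dictionary and that the parameter keys are the plain strings "worktiming" and "weekends" (matching the names in the table above), custom shifts could be provided as follows:
from pm4py.objects.log.util import interval_lifecycle
# "worktiming" and "weekends" are assumed parameter keys, named as in the table above
parameters = {"worktiming": [10, 16], "weekends": [6, 7]}
enriched_log = interval_lifecycle.assign_lead_cycle_time(log, parameters=parameters)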
Sojourn Time
This statistic works only with interval event logs, i.e., event logs where each
event has a start timestamp and a completion timestamp.
The average sojourn time statistic permits knowing, for each activity, how much time
was spent executing the activity. This is calculated as the average of the time passed
between the start timestamp and the completion timestamp of the activity's events.
We provide an example. First, we import an interval event log.
import pm4py
import os
log = pm4py.read_xes(os.path.join("tests", "input_data", "interval_event_log.xes"))
Then, we calculate the statistic, which requires specifying the attribute containing the
start timestamp and the attribute containing the completion timestamp.
from pm4py.statistics.sojourn_time.log import get as soj_time_get
soj_time = soj_time_get.apply(log, parameters={soj_time_get.Parameters.TIMESTAMP_KEY: "time:timestamp", soj_time_get.Parameters.START_TIMESTAMP_KEY: "start_timestamp"})
print(soj_time)
The same statistic can be applied seamlessly on Pandas dataframes. We provide an alternative class
for doing so:
pm4py.statistics.sojourn_time.pandas.get
Concurrent Activities
This statistic works only with interval event logs, i.e., event logs where each
event has a start timestamp and a completion timestamp.
In an interval event log, the order between events is defined more weakly.
Two events of a case can intersect in different ways:
An event whose start timestamp is greater than or equal to the completion timestamp of the
other.
An event whose start timestamp is greater than or equal to the start timestamp of the other
event, but lower than the completion timestamp of the other event.
In particular, the latter case defines an event-based concurrency, where several events are
actively executed at the same time.
We might be interested in retrieving the set of activities for which such concurrent execution
happens,
and the frequency of such occurrence. We offer this type of calculation in PM4Py.
We provide an example. First, we import an interval event log.
import pm4py
import os
log = pm4py.read_xes(os.path.join("tests", "input_data", "interval_event_log.xes"))
Then, we calculate the statistic, which requires specifying the attribute containing the
start timestamp and the attribute containing the completion timestamp.
from pm4py.statistics.concurrent_activities.log import get as conc_act_get
conc_act = conc_act_get.apply(log, parameters={conc_act_get.Parameters.TIMESTAMP_KEY: "time:timestamp", conc_act_get.Parameters.START_TIMESTAMP_KEY: "start_timestamp"})
print(conc_act)
The same statistic can be applied seamlessly on Pandas dataframes. We provide an alternative class
for doing so:
pm4py.statistics.concurrent_activities.pandas.get
Eventually-Follows Graph
We provide an approach for the calculation of the eventually-follows graph.
The eventually-follows graph (EFG) is a graph that represents the partial order of the events
inside the process executions of the log.
Our implementation can be applied both to lifecycle logs, i.e., logs where each event
has only one timestamp, and to interval logs, where each event has a start and
a completion timestamp. In the latter case, the start timestamp is actively considered for the
definition of the EFG / partial order.
In particular, the method assumes to work with lifecycle logs when a start timestamp is NOT
passed in the parameters, and with interval logs when a start timestamp
is passed in the parameters.
We provide an example. First, we import an interval event log.
import pm4py
import os
log = pm4py.read_xes(os.path.join("tests", "input_data", "interval_event_log.xes"))
Then, we calculate the statistic, which requires specifying the attribute containing the
completion timestamp, and possibly the attribute containing the start timestamp.
from pm4py.statistics.eventually_follows.log import get as efg_get
efg_graph = efg_get.apply(log)
print(efg_graph)
The same statistic can be applied seamlessly on Pandas dataframes. We provide an alternative class
for doing so:
pm4py.statistics.eventually_follows.pandas.get
Displaying Graphs
Graphs permit understanding several aspects of the current log (for example, the distribution of
a numeric attribute, the distribution of case duration, or the events over time).
Distribution of case duration
In the following example, the distribution of case duration is shown in two different
graphs: a simple plot and a semi-logarithmic (on the X-axis) plot.
The semi-logarithmic plot is less sensitive to possible outliers.
First, the Receipt log is loaded. Then, the distribution related to case duration is
obtained, and either the simple plot or the semi-logarithmic (on the X-axis) plot can be produced.
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log_path = os.path.join("tests","input_data","receipt.xes")
log = xes_importer.apply(log_path)
from pm4py.util import constants
from pm4py.statistics.traces.log import case_statistics
x, y = case_statistics.get_kde_caseduration(log, parameters={constants.PARAMETER_CONSTANT_TIMESTAMP_KEY: "time:timestamp"})
from pm4py.visualization.graphs import visualizer as graphs_visualizer
gviz = graphs_visualizer.apply_plot(x, y, variant=graphs_visualizer.Variants.CASES)
graphs_visualizer.view(gviz)
gviz = graphs_visualizer.apply_semilogx(x, y, variant=graphs_visualizer.Variants.CASES)
graphs_visualizer.view(gviz)
Distribution of events over time
In the following example, a graph representing the distribution of events over time is
obtained.
This is particularly important because it helps to understand in which time intervals the
greatest number of events is recorded.
The distribution of events over time is obtained, and the corresponding graph can be produced.
from pm4py.algo.filtering.log.attributes import attributes_filter
x, y = attributes_filter.get_kde_date_attribute(log, attribute="time:timestamp")
from pm4py.visualization.graphs import visualizer as graphs_visualizer
gviz = graphs_visualizer.apply_plot(x, y, variant=graphs_visualizer.Variants.DATES)
graphs_visualizer.view(gviz)
Distribution of a numeric attribute
In the following example, two graphs related to the distribution of a numeric attribute will
be obtained, a normal plot and a semilogarithmic (on the X-axis) plot (that is less
sensitive to outliers).
First, a filtered version of the Road Traffic log is loaded.
Then, the distribution of the numeric attribute amount is obtained.
The standard graph can then be obtained, or, alternatively, the semi-logarithmic graph.
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log_path = os.path.join("tests", "input_data", "roadtraffic100traces.xes")
log = xes_importer.apply(log_path)
from pm4py.algo.filtering.log.attributes import attributes_filter
x, y = attributes_filter.get_kde_numeric_attribute(log, "amount")
from pm4py.visualization.graphs import visualizer as graphs_visualizer
gviz = graphs_visualizer.apply_plot(x, y, variant=graphs_visualizer.Variants.ATTRIBUTES)
graphs_visualizer.view(gviz)
from pm4py.visualization.graphs import visualizer as graphs_visualizer
gviz = graphs_visualizer.apply_semilogx(x, y, variant=graphs_visualizer.Variants.ATTRIBUTES)
graphs_visualizer.view(gviz)
Log-Model Evaluation
In PM4Py, it is possible to compare the behavior contained in the log and the behavior
contained in the model, in order to see if and how they match.
Four different dimensions exist in process mining: the measurement of
replay fitness, precision, generalization, and simplicity.
Since PM4Py 1.5.x, we offer a simplified interface for log-model evaluation.
This provides a restricted set of choices in comparison to the normal interface.
Calculates the precision using token-based replay.
Parameters: log (event log), petri_net (Petri net), initial_marking (initial marking), final_marking (final marking).
Returns: the precision dictionary (from TBR).
Replay Fitness
The calculation of replay fitness aims to assess how much of the behavior in the log
is admitted by the process model. We propose two methods to calculate replay fitness,
based on token-based replay and alignments respectively.
For token-based replay, the percentage of traces that are completely fit is returned,
along with a fitness value that is calculated as indicated in the scientific contribution:
Berti, Alessandro, and Wil MP van der Aalst. "Reviving Token-based Replay: Increasing
Speed While Improving Diagnostics." ATAED@ Petri Nets/ACSD. 2019.
For alignments, the percentage of traces that are completely fit is returned,
along with a fitness value that is calculated as the average of the fitness values
of the single traces.
The two variants of replay fitness are implemented as Variants.TOKEN_BASED
and Variants.ALIGNMENT_BASED respectively.
To calculate the replay fitness between an event log and a Petri net model, using the
token-based replay method, the code on the right side
can be used.
The resulting value is a number between 0 and 1.
from pm4py.evaluation.replay_fitness import evaluator as replay_fitness_evaluator
fitness = replay_fitness_evaluator.apply(log, net, im, fm, variant=replay_fitness_evaluator.Variants.TOKEN_BASED)
To calculate the replay fitness between an event log and a Petri net model, using the
alignments method, the code on the right side
can be used.
The resulting value is a number between 0 and 1.
from pm4py.evaluation.replay_fitness import evaluator as replay_fitness_evaluator
fitness = replay_fitness_evaluator.apply(log, net, im, fm, variant=replay_fitness_evaluator.Variants.ALIGNMENT_BASED)
Precision
We propose two approaches for the measurement of precision in PM4Py:
ETConformance (using token-based replay): the reference paper is
Muñoz-Gama, Jorge, and Josep Carmona. "A fresh look at precision in process
conformance." International Conference on Business Process Management. Springer,
Berlin, Heidelberg, 2010.
Align-ETConformance (using alignments): the reference paper is
Adriansyah, Arya, et al. "Measuring precision of modeled behavior." Information
systems and e-Business Management 13.1 (2015): 37-67.
The idea underlying the two approaches is the same: the different prefixes of the log are
replayed (where possible) on the model. At the reached marking, the set of transitions
that are enabled in the process model is compared with the set of activities that follow the prefix.
The more the two sets differ, the lower the precision value; the more similar they are,
the higher the precision value.
This works only if the replay of the prefix on the process model works: if the replay does
not produce a result, the prefix is not considered
for the computation of precision. Hence, the precision calculated on top of unfit processes
is not really meaningful.
The main difference between the approaches is the replay method. Token-based replay is faster
but based on heuristics (hence the result of the replay might not be exact).
Alignments are exact, work on any kind of relaxed sound nets, but can be slow if the
state-space is huge.
The two variants, ETConformance and Align-ETConformance, are available as Variants.ETCONFORMANCE_TOKEN
and Variants.ALIGN_ETCONFORMANCE
in the implementation respectively.
To calculate the precision between an event log and a Petri net model, using the
ETConformance method, the code on the right side
can be used.
The resulting value is a number between 0 and 1.
from pm4py.evaluation.precision import evaluator as precision_evaluator
prec = precision_evaluator.apply(log, net, im, fm, variant=precision_evaluator.Variants.ETCONFORMANCE_TOKEN)
To calculate the precision between an event log and a Petri net model, using the
Align-ETConformance method, the code on the right side
can be used.
The resulting value is a number between 0 and 1.
from pm4py.evaluation.precision import evaluator as precision_evaluator
prec = precision_evaluator.apply(log, net, im, fm, variant=precision_evaluator.Variants.ALIGN_ETCONFORMANCE)
Generalization
Generalization is the third dimension to analyse how the log and the process model match.
In particular, we propose the generalization measure described in the following research
paper:
Buijs, Joos CAM, Boudewijn F. van Dongen, and Wil MP van der Aalst. "Quality dimensions
in process discovery:
The importance of fitness, precision, generalization and simplicity."
International Journal of Cooperative Information Systems 23.01 (2014): 1440001.
Basically, a model is general if the elements of the model are visited often enough during
a replay operation (of the log on the model). A model may be perfectly fitting the log and
perfectly precise (for example, a model reporting the traces of the log as sequential paths
going from the initial marking to the final marking, where a choice is operated at the initial
marking), yet not be general, since most of its elements are visited only a few times.
Hence, to measure generalization, a token-based replay operation is performed, and the
generalization is calculated as
1 - avg_t(sqrt(1.0 / freq(t)))
where avg_t is the average of the inner value over all the transitions, sqrt is
the square root, and freq(t) is the frequency of t after the replay.
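As a purely illustrative calculation: if a model has two transitions that are visited 1 and 4 times respectively during the replay, the generalization is 1 - (sqrt(1/1) + sqrt(1/4)) / 2 = 1 - (1 + 0.5) / 2 = 0.25.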
To calculate the generalization between an event log and a Petri net model, using the
generalization method proposed in this section, the code on the right side
can be used.
The resulting value is a number between 0 and 1.
from pm4py.evaluation.generalization import evaluator as generalization_evaluator
gen = generalization_evaluator.apply(log, net, im, fm)
Simplicity
Simplicity is the fourth dimension to analyse a process model.
In this case, we define simplicity taking into account only the Petri net model.
The criteria that we use for simplicity is the inverse arc degree
as described in the following research paper
Blum, Fabian Rojas. Metrics in process discovery. Technical Report TR/DCC-2015-6,
Computer Science Department, University of Chile, 2015.
First of all, we consider the average degree of a place/transition of the Petri net,
which is defined as the sum of the number of its input arcs and output arcs.
If all the places have at least one input arc and one output arc, this number is at least 2.
Choosing a number k between 0 and infinity, the simplicity based on the inverse
arc degree is then defined as 1.0 / (1.0 + max(mean_degree - k, 0)).
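As a purely illustrative calculation: choosing k = 2 for a net whose mean degree is 2.5, the simplicity is 1.0 / (1.0 + max(2.5 - 2, 0)) = 1.0 / 1.5 ≈ 0.67.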
To calculate the simplicity on a Petri net model, using the inverse arc degree, the
following code
can be used.
The resulting value is a number between 0 and 1.
from pm4py.evaluation.simplicity import evaluator as simplicity_evaluator
simp = simplicity_evaluator.apply(net)
Earth Mover Distance
The Earth Mover Distance as introduced in:
Leemans, Sander JJ, Anja F. Syring, and Wil MP van der Aalst.
“Earth movers’ stochastic conformance checking.”
International Conference on Business Process Management.
Springer, Cham, 2019.
provides a way to calculate the distance between two different stochastic languages.
Generally, one language is extracted from the event log, and one language is extracted from
the process model.
By language, we mean a set of traces that is weighted according to its probability.
For the event log, taking the set of variants of the log and dividing the occurrences of each
variant by the total number of traces trivially provides the language of the log.
We can see how the language of the log can be obtained. We can import an event log
and calculate its language:
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.statistics.variants.log import get as variants_module
log = xes_importer.apply("tests/input_data/running-example.xes")
language = variants_module.get_language(log)
print(language)
The same thing does not happen in a natural way for the process model. In order to calculate
a language for the process model, a scalable approach (but non deterministic) is to playout
the model in order to obtain an event log.
Let’s first apply the Alpha Miner.
Then, we do the playout of the Petri net. We choose the STOCHASTIC_PLAYOUT variant.
from pm4py.algo.discovery.alpha import algorithm as alpha_miner
net, im, fm = alpha_miner.apply(log)
from pm4py.simulation.playout import simulator
playout_log = simulator.apply(net, im, fm, variant=simulator.Variants.STOCHASTIC_PLAYOUT)
We can then calculate the language of the playout log, obtaining the language of the model:
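A sketch of this step, reusing the variants module already shown above on the playout log (the resulting model_language is used in the distance calculation below):
from pm4py.statistics.variants.log import get as variants_module
model_language = variants_module.get_language(playout_log)
print(model_language)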
Then, the earth mover distance is calculated as follows:
It is ensured that the two languages contain the same words: if a language does not
contain a word, its probability is set to 0.
A common ordering (for example, alphabetical ordering) is decided among the keys of the
languages.
The distance between the different keys is calculated (using a string distance function
such as the Levenshtein distance).
This permits obtaining a number greater than or equal to 0 that expresses the distance between
the language of the log and the language of the model. This is an alternative measure for
precision. To calculate the Earth Mover Distance, the Python package pyemd should
be installed (pip install pyemd).
The code to apply the Earth Mover Distance is the following:
from pm4py.evaluation.earth_mover_distance import evaluator
emd = evaluator.apply(model_language, language)
print(emd)
If the running-example log is chosen along with the Alpha Miner model, a value similar or equal
to 0.1733 is obtained.
WOFLAN
WOFLAN is a popular approach for soundness checking on workflow nets that is able to provide
meaningful diagnostics to the final user. WOFLAN is described in a dedicated PhD thesis.
WOFLAN is applied to an accepting Petri net (a Petri net with an initial and final marking)
and applies the following steps (the meaning of the steps is found in the thesis):
Checking if the Petri net and the markings are valid.
Checking if the Petri net is a workflow net.
Checking if all the places are covered by S-components.
Checking whether there are not-well-handled pairs.
Checking if there are places that are uncovered in uniform invariants.
Checking if there are places that are uncovered in weighted invariants.
Checking if the WPD is proper.
Checking for substates in the MCG.
Checking if there are unbounded sequences.
Checking for dead tasks.
Checking for live tasks.
Checking for non-live tasks.
Checking for sequences leading to deadlocks.
The order of application is described by the picture at the following link.
If the step has a positive outcome, a Yes is written on the corresponding edge. If the step
has a negative outcome, a No is written on the corresponding edge.
Let's see how Woflan can be applied. First, we open a XES log
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply("tests/input_data/running-example.xes")
And we discover a model using the Heuristics Miner
from pm4py.algo.discovery.heuristics import algorithm as heuristics_miner
net, im, fm = heuristics_miner.apply(log)
Then, the soundness can be checked by doing:
from pm4py.evaluation.soundness.woflan import algorithm as woflan
is_sound = woflan.apply(net, im, fm, parameters={woflan.Parameters.RETURN_ASAP_WHEN_NOT_SOUND: True,
woflan.Parameters.PRINT_DIAGNOSTICS: False,
woflan.Parameters.RETURN_DIAGNOSTICS: False})
In this case, is_sound contains a boolean value (True if the Petri net is a sound workflow
net; False otherwise).
The list of parameters is:
Parameter
Description
PRINT_DIAGNOSTICS
Enables the printing of the diagnostics on the Petri net, when WOFLAN is
executed.
RETURN_DIAGNOSTICS
Returns a dictionary containing the diagnostics.
RETURN_ASAP_WHEN_NOT_SOUND
Stops the execution of WOFLAN when a condition determining that the Petri net
is not a sound workflow net is found.
On the provided Petri net, which is not sound, the output of the technique is False.
To know why such a Petri net is not sound, we repeat the execution of the script setting
PRINT_DIAGNOSTICS to True and RETURN_ASAP_WHEN_NOT_SOUND to False (to get more
diagnostics). We get the following messages during the execution:
There are places uncovered in uniform and weighted invariants.
It is an improper WPD.
Some sequences are unbounded.
To get the diagnostics in a dictionary, the execution can be repeated with:
from pm4py.evaluation.soundness.woflan import algorithm as woflan
is_sound, dictio_diagnostics = woflan.apply(net, im, fm, parameters={woflan.Parameters.RETURN_ASAP_WHEN_NOT_SOUND: False,
woflan.Parameters.PRINT_DIAGNOSTICS: False,
woflan.Parameters.RETURN_DIAGNOSTICS: True})
The dictionary dictio_diagnostics may contain the following keys (if the computation reaches
the corresponding step):
Key
Description
S_C_NET
PLACE_INVARIANTS
UNIFORM_PLACE_INVARIANTS
S_COMPONENTS
UNCOVERED_PLACES_S_COMPONENT
NOT_WELL_HANDLED_PAIRS
LEFT
UNCOVERED_PLACES_UNIFORM
WEIGHTED_PLACE_INVARIANTS
UNCOVERED_PLACES_WEIGHTED
MCG
DEAD_TASKS
R_G_S_C
R_G
LOCKING_SCENARIOS
RESTRICTED_COVERABILITY_TREE
Simulation
In PM4Py, we offer different simulation algorithms that, starting from a model,
are able to produce an output that follows the model and the rules
provided by the user.
Playout of a Petri Net
A playout of a Petri net takes as input a Petri net along with an initial marking,
and returns a list of process executions that are allowed from the process model.
We offer different types of playouts:
Variant
Description
Variants.BASIC_PLAYOUT
A basic playout that accepts a Petri net along with an initial marking, and returns a
specified number of process executions (repetitions may be possible).
Variants.EXTENSIVE
A playout that accepts a Petri net along with an initial marking, and returns all the
executions that are possible according to the model, up to a provided
length of trace (may be computationally expensive).
The list of parameters for such variants is:
Variant
Parameter
Description
Variants.BASIC_PLAYOUT
Parameters.ACTIVITY_KEY
The name of the attribute to use as activity in the playout log.
Parameters.TIMESTAMP_KEY
The name of the attribute to use as timestamp in the playout log.
Parameters.CASE_ID_KEY
The trace attribute that should be used as case identifier in the playout log.
Parameters.NO_TRACES
The number of traces that the playout log should contain.
Parameters.MAX_TRACE_LENGTH
The maximum trace length (after which, the playout of the trace is stopped).
Variants.EXTENSIVE
Parameters.ACTIVITY_KEY
The name of the attribute to use as activity in the playout log.
Parameters.TIMESTAMP_KEY
The name of the attribute to use as timestamp in the playout log.
Parameters.CASE_ID_KEY
The trace attribute that should be used as case identifier in the playout log.
Parameters.MAX_TRACE_LENGTH
The maximum trace length (after which, the extensive playout is stopped).
An example application of the basic playout, given a Petri net, to get a log of 50 traces,
is the following:
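A minimal sketch of such a call, assuming the basic playout variant exposes the NO_TRACES parameter listed in the table above:
from pm4py.simulation.playout import simulator
simulated_log = simulator.apply(net, im, variant=simulator.Variants.BASIC_PLAYOUT,
                                parameters={simulator.Variants.BASIC_PLAYOUT.value.Parameters.NO_TRACES: 50})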
Monte Carlo Simulation
A time-related simulation permits knowing how probable it is that a process execution terminates
within a given amount of time. This leads to a better estimation of Service Level Agreements, or a
better identification of the process instances that are most likely to have a high throughput time.
All this starts from a performance DFG, for example the one discovered from the
running-example log
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))
from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
dfg_perf = dfg_discovery.apply(log, variant=dfg_discovery.Variants.PERFORMANCE)
from pm4py.statistics.start_activities.log import get as start_activities
from pm4py.statistics.end_activities.log import get as end_activities
sa = start_activities.get_start_activities(log)
ea = end_activities.get_end_activities(log)
and the knowledge of the case arrival ratio. The case arrival ratio is the amount of time
that passes (on average, or as a median) between the arrival of two consecutive cases. It can be
provided by the user or inferred from the event log. The inference from the event log is
done by using the following command:
from pm4py.statistics.traces.log import case_arrival
ratio = case_arrival.get_case_arrival_avg(log)
print(ratio)
Using the DFG mining approach, it is possible to retrieve a Petri net model from the DFG. This
kind of model is the “default” one for Monte Carlo simulation (because its execution semantics
is very clear). Moreover, the Petri net extracted by the DFG mining approach is a sound workflow
net (which gives other good properties to the model).
The DFG mining approach can be applied in the following way:
from pm4py.objects.conversion.dfg import converter
net, im, fm = converter.apply(dfg_perf, variant=converter.Variants.VERSION_TO_PETRI_NET_ACTIVITY_DEFINES_PLACE,
parameters={converter.Variants.VERSION_TO_PETRI_NET_ACTIVITY_DEFINES_PLACE.value.Parameters.START_ACTIVITIES: sa,
converter.Variants.VERSION_TO_PETRI_NET_ACTIVITY_DEFINES_PLACE.value.Parameters.END_ACTIVITIES: ea})
To perform a basic Monte Carlo simulation, the following code can be used. It is a
sort of resource-constrained simulation, where it is assumed that a place can hold at most 1
token at a time. Later, we will see how to specify a higher number of tokens that can be
hosted by a place.
from pm4py.simulation.montecarlo import simulator as montecarlo_simulation
from pm4py.algo.conformance.tokenreplay.algorithm import Variants
parameters = {}
parameters[
montecarlo_simulation.Variants.PETRI_SEMAPH_FIFO.value.Parameters.TOKEN_REPLAY_VARIANT] = Variants.BACKWARDS
parameters[montecarlo_simulation.Variants.PETRI_SEMAPH_FIFO.value.Parameters.PARAM_CASE_ARRIVAL_RATIO] = 10800
simulated_log, res = montecarlo_simulation.apply(log, net, im, fm, parameters=parameters)
During the replay operation, some debug messages are written to the screen. The main outputs of
the simulation process are:
Key
Description of the value
simulated_log
The traces that have been simulated during the simulation.
res
The result of the simulation (Python dictionary).
Within res, which is the result of the simulation, we have the following keys:
Key
Description of the value
places_interval_trees
an interval tree for each place, that hosts an interval for each time when it was
“full” according to the specified maximum amount of tokens per place.
transitions_interval_trees
an interval tree for each transition, that contains all the time intervals in which
the transition was enabled but not yet fired (so, the time between a transition was
fully enabled and the consumption of the tokens from the input places)
cases_ex_time
a list containing the throughput times for all the cases of the log
median_cases_ex_time
the median throughput time of the cases in the simulated log
input_case_arrival_ratio
the case arrival ratio that was provided by the user, or automatically calculated
from the event log.
total_cases_time
the difference between the last timestamp of the log, and the first timestamp of the
simulated log.
The last four items of the previous list are simple Python objects (specifically, floats and
lists). The interval tree objects can be used in the following way to get time-specific
information. For example, the following code snippet
prints, for a random transition in the model, the number of overlapping intervals at 10 points,
uniformly distributed across the time interval of the log (starting from the minimum timestamp).
import random
last_timestamp = max(event["time:timestamp"] for trace in log for event in trace).timestamp()
first_timestamp = min(event["time:timestamp"] for trace in log for event in trace).timestamp()
pick_trans = random.choice(list(res["transitions_interval_trees"]))
print(pick_trans)
n_div = 10
i = 0
while i < n_div:
    timestamp = first_timestamp + (last_timestamp - first_timestamp) / n_div * i
    print("\t", timestamp, len(res["transitions_interval_trees"][pick_trans][timestamp]))
    i = i + 1
The following code snippet instead prints, for a random place in the model, the number
of overlapping intervals at 10 points, uniformly distributed across the time interval of the
log (starting from the minimum timestamp):
import random
last_timestamp = max(event["time:timestamp"] for trace in log for event in trace).timestamp()
first_timestamp = min(event["time:timestamp"] for trace in log for event in trace).timestamp()
pick_place = random.choice(list(res["places_interval_trees"]))
print(pick_place)
n_div = 10
i = 0
while i < n_div:
    timestamp = first_timestamp + (last_timestamp - first_timestamp) / n_div * i
    print("\t", timestamp, len(res["places_interval_trees"][pick_place][timestamp]))
    i = i + 1
The information can be used to build some graphs like these (using external programs such as
Microsoft Excel).
The simulation process can be summarized as follows:
An event log and a model (DFG) are considered.
Internally in the simulation, a replay operation is done between the log and the model.
The replay operation leads to the construction of a stochastic map that associates to each
transition a probability distribution (for example, a normal distribution, an exponential
distribution, ...). The probability distribution that maximizes the likelihood of the observed
values during the replay is chosen. The user can force a specific distribution (e.g.,
exponential) if desired.
Moreover, during the replay operation, the frequency of each transition is found. That helps
in picking in a “weighted” way one of the transitions enabled in a marking, when the
simulation occurs.
The simulation process occurs. For each one of the traces that are generated (the distance
between their start times is fixed), a thread is spawned and stochastic choices are made. The
possibility to use a given place (depending on the maximum number of resources that can be
used) is regulated by a semaphore object in Python.
A maximum amount of time is specified for the simulation. If one or more threads exceed that
amount of time, the threads are killed and the corresponding trace is not added to the
simulation log.
Hence, several parameters are important in order to perform a Monte Carlo simulation. These
parameters, which belong to the PETRI_SEMAPH_FIFO variant, are the following (ordered by importance):
Variant
Parameter
Description
Variants.PETRI_SEMAPH_FIFO
Parameters.PARAM_NUM_SIMULATIONS
Number of simulations that are performed (the goal is to have such number of traces
in the model)
Parameters.PARAM_CASE_ARRIVAL_RATIO
The case arrival ratio that is specified by the user.
Parameters.PARAM_MAP_RESOURCES_PER_PLACE
A map containing for each place of the Petri net the maximum amount of tokens
Parameters.PARAM_DEFAULT_NUM_RESOURCES_PER_PLACE
If the map of resources per place is not specified, then use the specified maximum
number of resources per place.
Parameters.PARAM_MAX_THREAD_EXECUTION_TIME
Specifies the maximum execution time of the simulation (for example, 60 seconds).
Parameters.PARAM_SMALL_SCALE_FACTOR
Specifies the ratio between the “real” time scale and the simulation time scale. A
higher ratio means that the simulation goes faster but is in general less accurate.
A lower ratio means that the simulation goes slower and is in general more accurate
(in providing detailed diagnostics). The default choice is 864000 seconds (10 days),
meaning that one second in the simulation corresponds to 10 days of the real
log.
Parameters.PARAM_ENABLE_DIAGNOSTICS
Enables the printing of the simulation diagnostics through the usage of the
“logging” class of Python
Parameters.ACTIVITY_KEY
The attribute of the log that should be used as activity
Parameters.TIMESTAMP_KEY
The attribute of the log that should be used as timestamp
Parameters.TOKEN_REPLAY_VARIANT
The variant of the token-based replay to use: token_replay,
the classic variant, that cannot handle duplicate transitions;
backwards, the backwards token-based replay, that is slower but can handle
invisible transitions.
Parameters.PARAM_FORCE_DISTRIBUTION
If specified, the distribution that is forced for the transitions (normal,
exponential, ...)
Parameters.PARAM_DIAGN_INTERVAL
The time interval in which diagnostics should be printed (for example, diagnostics
should be printed every 10 seconds).
Extensive Playout of a Process Tree
An extensive playout operation permits obtaining (up to the provided limits) the entire language
of the process model. Doing an extensive playout operation on a Petri net can be incredibly
expensive (the reachability graph needs to be explored). Process trees, with their bottom-up
structure, permit obtaining the entire language of the model in a much easier way, starting
from the language of the leaves (which is trivial) and then following specific merge rules for the
operators.
However, since the language of a process tree can be incredibly vast (when parallel operators are
involved) or even infinite (when loops are involved), the extensive playout is possible only up to
some limits:
The maximum number of occurrences for a loop must be specified, if a loop is
present. This stops the extensive playout at the given number of occurrences.
Since the number of different executions, when loops are involved, is still incredibly big,
it is possible to specify the maximum length of a trace to be returned. So, traces that are
above the maximum length are automatically discarded.
For further limiting the number of different executions, the maximum number of traces
returned by the algorithm might be provided.
Moreover, from the structure of the process tree, it is easy to infer the minimum length of a
trace allowed by the process model (always following the bottom-up approach).
Some reasonable settings for the extensive playout are the following:
Overall, the maximum number of traces returned by the algorithm is set to 100000.
The maximum length of a trace that is an output of the playout is, by default, set to the
minimum length of a trace accepted by a process tree.
The maximum number of loops is set to be the minimum length of a trace divided by two.
The list of parameters is:
Parameter
Description
MAX_LIMIT_NUM_TRACES
Maximum number of traces that are returned by the algorithm.
MAX_TRACE_LENGTH
Maximum length of a trace that is output of the algorithm.
MAX_LOOP_OCC
Maximum number of times we enter in a loop.
In the following, we see how the playout can be executed. First, a log can be imported:
from pm4py.objects.log.importer.xes import importer as xes_importer
import os
log = xes_importer.apply(os.path.join("tests", "input_data", "receipt.xes"))
Then, a process tree can be discovered using the inductive miner algorithm.
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
tree = inductive_miner.apply_tree(log)
We specify that we want to retrieve traces of length at most 3, and at most
100000 traces.
At this point, the extensive playout operation can be executed as shown below.
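A sketch of the call, assuming the extensive playout of process trees lives in the pm4py.simulation.tree_playout module and exposes the parameters listed above:
from pm4py.simulation.tree_playout import algorithm as tree_playout
playout_variant = tree_playout.Variants.EXTENSIVE
param = tree_playout.Variants.EXTENSIVE.value.Parameters
simulated_log = tree_playout.apply(tree, variant=playout_variant,
                                   parameters={param.MAX_TRACE_LENGTH: 3,
                                               param.MAX_LIMIT_NUM_TRACES: 100000})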
Social Network Analysis
In PM4Py we offer support for different Social Network Analysis metrics, and support for the
discovery of roles.
Handover of Work
The Handover of Work metric measures how many times an individual is followed by another
individual in the execution of a business process.
To calculate the Handover of Work metric, the following code could be used:
from pm4py.algo.enhancement.sna import algorithm as sna
hw_values = sna.apply(log, variant=sna.Variants.HANDOVER_LOG)
Then, a visualization could be obtained through NetworkX or through Pyvis:
from pm4py.visualization.sna import visualizer as sna_visualizer
gviz_hw_py = sna_visualizer.apply(hw_values, variant=sna_visualizer.Variants.PYVIS)
sna_visualizer.view(gviz_hw_py, variant=sna_visualizer.Variants.PYVIS)
Subcontracting
The subcontracting metric calculates how many times the work of an individual is interleaved
by the work of some other individual, only to eventually “return” to the original
individual. To measure the subcontracting metric, the following code could be used:
from pm4py.algo.enhancement.sna import algorithm as sna
sub_values = sna.apply(log, variant=sna.Variants.SUBCONTRACTING_LOG)
Then, a visualization could be obtained through NetworkX or through Pyvis:
from pm4py.visualization.sna import visualizer as sna_visualizer
gviz_sub_py = sna_visualizer.apply(sub_values, variant=sna_visualizer.Variants.PYVIS)
sna_visualizer.view(gviz_sub_py, variant=sna_visualizer.Variants.PYVIS)
Working Together
The Working together metric calculates how many times two individuals work together for
resolving a process instance. To measure the Working Together metric, the following code
could be used:
from pm4py.algo.enhancement.sna import algorithm as sna
wt_values = sna.apply(log, variant=sna.Variants.WORKING_TOGETHER_LOG)
Then, a visualization could be obtained through NetworkX or through Pyvis:
from pm4py.visualization.sna import visualizer as sna_visualizer
gviz_wt_py = sna_visualizer.apply(wt_values, variant=sna_visualizer.Variants.PYVIS)
sna_visualizer.view(gviz_wt_py, variant=sna_visualizer.Variants.PYVIS)
Similar Activities
The Similar Activities metric calculates how similar the work patterns of two
individuals are. To measure the Similar Activities metric, the following code could be used:
from pm4py.algo.enhancement.sna import algorithm as sna
ja_values = sna.apply(log, variant=sna.Variants.JOINTACTIVITIES_LOG)
Then, a visualization could be obtained through NetworkX or through Pyvis:
from pm4py.visualization.sna import visualizer as sna_visualizer
gviz_ja_py = sna_visualizer.apply(ja_values, variant=sna_visualizer.Variants.PYVIS)
sna_visualizer.view(gviz_ja_py, variant=sna_visualizer.Variants.PYVIS)
Roles Discovery
A role is a set of activities in the log that are executed by a similar (multi)set of resources.
Hence, it is a specific function within the organization. Grouping the activities into roles can help:
In understanding which activities are executed by which roles.
In understanding the roles themselves (the number of resources of a single activity may not provide
enough explanation).
An article on role detection, which has inspired the technique implemented in PM4Py, is:
Burattin, Andrea, Alessandro Sperduti, and Marco Veluscek. “Business models enhancement
through discovery of roles.” 2013 IEEE Symposium on Computational Intelligence and Data
Mining (CIDM). IEEE, 2013.
Initially, each activity corresponds to a different role and is associated with the multiset of
its originators. After that, roles are merged according to their similarity, until no more
merges are possible.
First, you need to import a log:
from pm4py.objects.log.importer.xes import importer as xes_importer
import os
log = xes_importer.apply(os.path.join("tests", "input_data", "receipt.xes"))
After that, the role detection algorithm can be applied:
from pm4py.algo.enhancement.roles import algorithm as roles_discovery
roles = roles_discovery.apply(log)
We can print the sets of activities that are grouped in roles by doing
print([x[0] for x in roles]).
BPMN Support
In PM4Py, we offer support for importing/exporting/layouting BPMN diagrams. The support is
limited to the following BPMN elements:
Events (start / end events)
Tasks
Gateways (exclusive, parallel, inclusive)
Moreover, we offer support for conversion between BPMN diagrams and other process models
implemented in PM4Py (such as Petri nets and process trees).
BPMN 2.0 – Importing
The BPMN 2.0 XML files can be imported using the following instructions:
import pm4py
import os
bpmn_graph = pm4py.read_bpmn(os.path.join("tests", "input_data", "running-example.bpmn"))
BPMN 2.0 – Exporting
The BPMN models can be exported using the following instructions (here, bpmn_graph is
the Python object hosting the model).
import pm4py
import os
pm4py.write_bpmn(bpmn_graph, "ru.bpmn", enable_layout=True)
The enable_layout, set to True, enables the automatic layouting of the BPMN model before the
export.
BPMN 2.0 – Layouting
A layouting operation tries to give a good position to the nodes and the edges of the BPMN
diagram. For our purposes, we chose an octilinear edges layout.
The following commands perform the layouting:
from pm4py.objects.bpmn.layout import layouter
bpmn_graph = layouter.apply(bpmn_graph)
BPMN 2.0 – Conversion to Petri net
A conversion of a BPMN model into a Petri net model enables different PM4Py algorithms
(such as conformance checking and simulation algorithms), hence is a particularly important
operation.
To convert a BPMN model into an (accepting) Petri net, the following code can be used:
from pm4py.objects.conversion.bpmn import converter as bpmn_converter
net, im, fm = bpmn_converter.apply(bpmn_graph)
BPMN 2.0 – Conversion from a process tree
Process trees are an important class of block-structured process models (and the output of the
inductive miner algorithm). These models can be easily converted to BPMN models.
Let’s see an example. First, we import a XES event log, and we discover a model using the
inductive miner:
import pm4py
import os
log = pm4py.read_xes(os.path.join("tests", "input_data", "running-example.xes"))
tree = pm4py.discover_tree_inductive(log)
Then, we can convert that to a BPMN graph:
from pm4py.objects.conversion.process_tree import converter
bpmn_graph = converter.apply(tree, variant=converter.Variants.TO_BPMN)
Directly-Follows Graphs
Directly-follows graphs are one of the simplest classes of process models.
The nodes are the activities of the log. The edges report the number of times
two activities follow each other. In PM4Py, we offer support for advanced
operations on top of directly-follows graphs.
The simplified interface of PM4Py returns the DFG along with the start and end
activities of the log (and their occurrences). For some of the following operations,
knowing the total number of occurrences of each activity is also needed.
In particular, the discovery of the directly-follows graph,
along with the start and end activities of the log, can be done using
the command:
import pm4py
dfg, sa, ea = pm4py.discover_dfg(log)
Instead, the discovery of the activities of the log, along with their number of occurrences,
can be done, assuming that concept:name is the attribute reporting the activity,
using:
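A minimal sketch, using the same simplified-interface call that also appears in the filtering example below:
activities_count = pm4py.get_attribute_values(log, "concept:name")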
Directly-follows graphs can contain a huge number of activities and paths, with some of them
being outliers. In this section, we will see how to filter on the activities and paths of the
graph, keeping a subset of its behavior.
We can load an example log and calculate the directly-follows graph.
import pm4py
log = pm4py.read_xes("tests/input_data/running-example.xes")
dfg, sa, ea = pm4py.discover_dfg(log)
activities_count = pm4py.get_attribute_values(log, "concept:name")
The filtering on the activities percentage is applied as in the following snippet.
The most frequent activities according to the percentage are kept, along with
all the activities that keep the graph connected. If a percentage of 0 % is specified,
then the most frequent activity (and the activities keeping the graph connected)
is retrieved.
Specifying 0.2 as in the example, we keep 20% of the activities.
The filter is applied concurrently to the DFG, to the start activities,
to the end activities, and to the dictionary containing the activity occurrences. In such
way, consistency is kept.
from pm4py.objects.dfg.filtering import dfg_filtering
dfg, sa, ea, activities_count = dfg_filtering.filter_dfg_on_activities_percentage(dfg, sa, ea, activities_count, 0.2)
The filtering on the paths percentage is applied as in the following snippet.
The most frequent paths according to the percentage are kept, along with
all the paths that are necessary to keep the graph connected. If a percentage of 0 % is specified,
then the most frequent path (and the paths keeping the graph connected)
is retrieved.
Specifying 0.2 as in the example, we keep 20% of the paths.
The filter is applied concurrently to the DFG, to the start activities,
to the end activities, and to the dictionary containing the activity occurrences. In such
way, consistency is kept.
from pm4py.objects.dfg.filtering import dfg_filtering
dfg, sa, ea, activities_count = dfg_filtering.filter_dfg_on_paths_percentage(dfg, sa, ea, activities_count, 0.2)
Playout of a DFG
A playout operation on a directly-follows graph is useful to retrieve the traces
that are allowed by the directly-follows graph. In this case, a trace is a sequence of activities
visited in the DFG from the start node to the end node. We can assign a probability to each
trace (assuming that the DFG represents a Markov chain). In particular, we are interested in
getting the most likely traces. In this section, we will see how to perform the playout of
a directly-follows graph.
We can load an example log and calculate the directly-follows graph.
import pm4py
log = pm4py.read_xes("tests/input_data/running-example.xes")
dfg, sa, ea = pm4py.discover_dfg(log)
activities_count = pm4py.get_attribute_values(log, "concept:name")
Then, we can perform the playout operation.
from pm4py.objects.dfg.utils import dfg_playout
parameters = {}
simulated_log = dfg_playout.apply(dfg, sa, ea, parameters=parameters)
The playout operation can be customized using some parameters.
Parameter Key
Type
Default
Description
Parameters.ACTIVITY_KEY
string
concept:name
The activity key of the simulated log.
Parameters.TIMESTAMP_KEY
string
time:timestamp
The timestamp key of the simulated log.
Parameters.MAX_NO_VARIANTS
integer
3000
The maximum number of variants generated by the method.
Parameters.MIN_WEIGHTED_PROBABILITY
integer
1
The minimum overall weighted probability that makes the method stop.
Parameters.MAX_NO_OCC_PER_ACTIVITY
integer
2
The maximum number of occurrences per activity in the traces of the log.
Parameters.INTERRUPT_SIMULATION_WHEN_DFG_COMPLETE
boolean
False
Interrupts the simulation when the DFG of the simulated
log has the same keys to the DFG of the original log.
Parameters.ADD_TRACE_IF_TAKES_NEW_ELS_TO_DFG
boolean
False
Adds a simulated trace to the simulated log only if it adds
elements to the simulated DFG, e.g., it adds behavior;
skip insertion otherwise.
Parameters.RETURN_VARIANTS
boolean
False
Returns the traces as variants with a likely number of occurrences.
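As a sketch, assuming the dfg_playout module exposes a Parameters enumeration with the keys listed in the table above, the playout could be customized, for example, to stop after a given number of variants and to return variants instead of an event log:
from pm4py.objects.dfg.utils import dfg_playout
# assumed parameter keys, matching the names in the table above
parameters = {dfg_playout.Parameters.MAX_NO_VARIANTS: 100,
              dfg_playout.Parameters.RETURN_VARIANTS: True}
variants = dfg_playout.apply(dfg, sa, ea, parameters=parameters)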
Alignments on a DFG
A popular conformance checking technique is the one of alignments. Alignments are usually
performed on Petri nets; however, this could take time, since the state space of Petri nets
can be huge. It is also possible to perform alignments on a directly-follows graph.
Since the state space of a directly-follows graph is small, the result is a very efficient
computation of alignments. This permits to get quick diagnostics on the activities and paths
that are executed in a wrong way. In this section, we will show an example on how to perform
alignments between a process execution and a DFG.
We can load an example log and calculate the directly-follows graph.
import pm4py
log = pm4py.read_xes("tests/input_data/running-example.xes")
dfg, sa, ea = pm4py.discover_dfg(log)
activities_count = pm4py.get_attribute_values(log, "concept:name")
Then, we can perform alignments between the process executions of the log
and the DFG:
from pm4py.objects.dfg.utils import dfg_alignment
alignments = dfg_alignment.apply(simulated_log, dfg, sa, ea)
The output of the alignments is equivalent to the one obtained against Petri nets.
In particular, the output is a list containing for each trace the result of the alignment.
Each alignment consists of some moves from the start to the end of both the trace and the DFG.
We can have sync moves, moves on log (when a move in the process execution is not mimicked by the DFG) and moves on model
(when a move is needed in the model that is not supported by the process execution).
Convert Directly-Follows Graph to a Workflow Net
The Directly-Follows Graph is the representation of a process provided by many commercial
tools. An idea of Sander Leemans is to convert the DFG into a workflow net that
perfectly mimics the DFG. This is called DFG mining.
The following steps are useful to load the log, calculate the DFG, convert it into a
workflow net and perform alignments.
First, we have to import the log. Subsequently, we have to mine the Directly-Follows
graph. This DFG can then be converted to a workflow net.
from pm4py.objects.log.importer.xes import importer as xes_importer
import os
log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))
from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
dfg = dfg_discovery.apply(log)
from pm4py.objects.conversion.dfg import converter as dfg_mining
net, im, fm = dfg_mining.apply(dfg)
CTMC Simulation (DFG)
A time-related simulation permits knowing how probable it is that a process execution terminates
within a given amount of time.
This leads to a better estimation of Service Level Agreements, or a better identification of the
process instances that are most likely to have a high throughput time.
All this starts from a performance DFG, for example the one discovered from the
running-example log
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))
from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
dfg_perf = dfg_discovery.apply(log, variant=dfg_discovery.Variants.PERFORMANCE)
from pm4py.statistics.start_activities.log import get as start_activities
from pm4py.statistics.end_activities.log import get as end_activities
sa = start_activities.get_start_activities(log)
ea = end_activities.get_end_activities(log)
For the simulation model, a CTMC (Continuous Time Markov Chain) is built from the DFG. This
model is very powerful, assuming that the frequencies of the edges outgoing from an activity are
similar. If that is not the case (for example, when one outgoing edge has frequency 1 and
another 10000), the model works less well.
In order to ensure that the DFG contains, as much as possible, only frequent arcs, a
filtering operation needs to be applied. For example, it is possible to use the
variants-based filtering on the log. An example of application of variants-based filtering
is:
from pm4py.algo.filtering.log.variants import variants_filter
log = variants_filter.apply_auto_filter(log)
Given that the edge contains the average of time between the states, it is assumed by the CTMC
that the distribution of times follows an exponential distribution with the given average.
The simulation model can be easily constructed by doing:
from pm4py.objects.stochastic_petri import ctmc
reach_graph, tang_reach_graph, stochastic_map, q_matrix = ctmc.get_tangible_reachability_and_q_matrix_from_dfg_performance(dfg_perf, parameters={"start_activities": sa, "end_activities": ea})
print(tang_reach_graph.states)
The last line prints the states of the model.
“source1” is the source state of the model (that is implicitly connected to the start activity
“register request”). “sink1” is the terminal state of the model (that is implicitly connected to
the end activities “pay compensation” and “reject request”). The other states of the model are
the ones in which you go after executing the corresponding activity (for example, “decide1” is
the state in which you sink after a “Decide” activity).
Starting from “source1”, we would like to know how probable it is that a process execution
is already over after 2 days. To do that, we perform a transient analysis starting from the
state “source1”, specifying 172800 seconds (2 days) as the time horizon:
# pick the source state
state = [x for x in tang_reach_graph.states if x.name == "source1"][0]
# analyse the distribution over the states of the system starting from the source after 172800.0 seconds (2 days)
transient_result = ctmc.transient_analysis_from_tangible_q_matrix_and_single_state(tang_reach_graph, q_matrix, state,
172800.0)
That means that we have a 22.72% probability of having already finished the process execution
(being in the “sink” state) after 2 days. Let’s calculate that for 100 days (8640000 seconds):
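Repeating the same transient analysis with 8640000.0 seconds gives the probability after 100 days:
transient_result = ctmc.transient_analysis_from_tangible_q_matrix_and_single_state(tang_reach_graph, q_matrix, state,
                                                                                    8640000.0)
print(transient_result)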
According to the model, we have a 99.999999995% probability of having finished the process after
100 days! That is practically 100%.
Suppose we want to know how probable it is that, after a decision, the end of the process is
reached within 10 days.
This can be done:
state = [x for x in tang_reach_graph.states if x.name == "decide1"][0]
transient_result = ctmc.transient_analysis_from_tangible_q_matrix_and_single_state(tang_reach_graph, q_matrix, state,
864000.0)
print(transient_result)
So, we have a 92.9% probability of reaching the “sink” state within 10 days after a decision.
Streaming Process Mining
Streaming Package General Structure
In PM4Py, we offer support for streaming process mining functionalities, including:
Streaming process discovery (DFG)
Streaming conformance checking (footprints and TBR)
Streaming importing of XES/CSV files
The management of the stream of events is done by the pm4py.streaming.stream.live_event_stream.LiveEventStream
class.
This class provides access to two methods:
register(algo): registers a new algorithm to the live event stream (the algorithm will be
notified when an event is added to the stream).
append(event): adds an event to the live event stream.
The LiveEventStream processes the incoming events using a thread pool. This helps to
manage a “flood” of events using a given number of different threads.
For the streaming algorithms, that are registered to the LiveEventStream, we provide an
interface that should be implemented. The following methods should be implemented inside each
streaming algorithm:
_process(event): a method that accepts and processes an incoming event.
_current_result(): a method that returns the current state of the streaming
algorithm.
Streaming Process Discovery (Directly-Follows Graph)
The following example will show how to discover a DFG from a stream of events.
Let’s first define the (live) event stream:
from pm4py.streaming.stream.live_event_stream import LiveEventStream
live_event_stream = LiveEventStream()
Then, create the streaming DFG discovery object (that will contain the list of activities
and relationships inside the DFG):
from pm4py.streaming.algo.discovery.dfg import algorithm as dfg_discovery
streaming_dfg = dfg_discovery.apply()
Then, we need to register the streaming DFG discovery to the stream:
live_event_stream.register(streaming_dfg)
And start the stream:
live_event_stream.start()
To put some known event log in the stream, we need to import a XES log:
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))
And then convert that to a static event stream:
from pm4py.objects.conversion.log import converter as stream_converter
static_event_stream = stream_converter.apply(log, variant=stream_converter.Variants.TO_EVENT_STREAM)
Then, we can add all the events to the live stream:
for ev in static_event_stream:
    live_event_stream.append(ev)
Then, stopping the stream, we make sure that the events in the queue are fully processed:
live_event_stream.stop()
At the end, we can get the directly-follows graph, along with the activities of the graph,
the set of start and end activities, by doing:
dfg, activities, sa, ea = streaming_dfg.get()
If we do print(dfg) on the running-example.xes log, we obtain a dictionary associating each directly-follows couple of activities with the number of times it occurs in the stream.
Streaming Conformance Checking (TBR)
The following examples show how to check conformance against a stream of events with the footprints and token-based replay algorithms. For both examples, we assume to work with the running-example.xes log and with a process model discovered using the inductive miner infrequent variant with the default noise threshold (0.2).
The following code can be used to import the running-example.xes log:
import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))
And convert that to a static stream of events:
from pm4py.objects.conversion.log import converter as log_converter
static_event_stream = log_converter.apply(log, variant=log_converter.Variants.TO_EVENT_STREAM)
Then, the following code can be used to discover a process tree using the inductive miner:
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
tree = inductive_miner.apply_tree(log, variant=inductive_miner.Variants.IMf)
And convert that to a Petri net:
from pm4py.objects.conversion.process_tree import converter as pt_converter
net, im, fm = pt_converter.apply(tree)
Now, we can apply the streaming TBR. First, we create a live event stream:
from pm4py.streaming.stream.live_event_stream import LiveEventStream
live_event_stream = LiveEventStream()
And the streaming token-based replay algorithm:
from pm4py.streaming.algo.conformance.tbr import algorithm as tbr_algorithm
streaming_tbr = tbr_algorithm.apply(net, im, fm)
Moreover, we can register that to the live event stream:
live_event_stream.register(streaming_tbr)
And start the live event stream:
live_event_stream.start()
After that, we can add each event of the log to the live event stream:
for ev in static_event_stream:
    live_event_stream.append(ev)
And then, stop the event stream:
live_event_stream.stop()
And get statistics on the execution of the replay (how many missing tokens were needed?) as
a Pandas dataframe. This method can be called throughout the lifecycle of the stream,
providing the “picture” of the replay up to that point:
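A minimal sketch of that call, assuming the streaming TBR object exposes a get() method analogous to the one of the streaming DFG discovery:
# retrieve the current replay diagnostics as a Pandas dataframe
replay_diagnostics = streaming_tbr.get()
print(replay_diagnostics)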
In addition to this, the following methods are available inside the streaming TBR that print some warnings during the replay. The methods can be overridden easily (for example, to send the message via e-mail):
message_case_or_activity_not_in_event
message_activity_not_possible
message_missing_tokens
message_case_not_in_dictionary
message_final_marking_not_reached
Streaming Conformance Checking (footprints)
Footprints is another conformance checking method offered in PM4Py, which can be implemented in
the context of streaming events. In the following, we see an application of the streaming
footprints.
First of all, we can discover the footprints from the process model:
from pm4py.algo.discovery.footprints import algorithm as fp_discovery
footprints = fp_discovery.apply(tree)
Then, we can create the live event stream:
from pm4py.streaming.stream.live_event_stream import LiveEventStream
live_event_stream = LiveEventStream()
Then, we can create the streaming footprints object:
from pm4py.streaming.algo.conformance.footprints import algorithm as fp_conformance
streaming_footprints = fp_conformance.apply(footprints)
And register that to the stream:
live_event_stream.register(streaming_footprints)
After that, we can start the live event stream:
live_event_stream.start()
And append every event of the original log to this live event stream:
for ev in static_event_stream:
    live_event_stream.append(ev)
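To conclude the example, the stream can then be stopped, making sure that all queued events are processed; assuming the streaming footprints object exposes a get() method like the other streaming algorithms, the current conformance picture can also be retrieved:
live_event_stream.stop()
# assumption: get() returns the current footprints conformance results
footprints_results = streaming_footprints.get()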
In addition to this, the following methods are available inside the streaming footprints that print some warnings during the replay. The methods can be overridden easily (for example, to send the message via e-mail):
message_case_or_activity_not_in_event
message_activity_not_possible
message_footprints_not_possible
message_start_activity_not_possible
message_end_activity_not_possible
message_case_not_in_dictionary
Streaming Conformance Checking (Temporal Profile)
We propose in PM4Py an implementation of the temporal profile model. This has been described in:
Stertz, Florian, Jürgen Mangler, and Stefanie Rinderle-Ma. "Temporal Conformance Checking at Runtime based on Time-infused Process Models." arXiv preprint arXiv:2008.07262 (2020).
A temporal profile measures, for every couple of activities in the log, the average time and the standard deviation of the times between events having the provided activities. The time is measured between the completion of the first event and the start of the second event. Hence, it is assumed to work with an interval log, where the events carry two timestamps. The output of the temporal profile discovery is a dictionary where each couple of activities (expressed as a tuple) is associated with a couple of numbers: the first is the average and the second is the standard deviation.
It is possible to use a temporal profile to perform conformance checking on an event log. The times between the couples of activities in the log are assessed against the numbers stored in the temporal profile. Specifically, a value is calculated that expresses how many standard deviations the observed time differs from the average. If that value exceeds a threshold (by default set to 6, according to the six-sigma principles), the couple of activities is signaled.
In PM4Py, we provide a streaming conformance checking algorithm based on the temporal profile. The algorithm checks an incoming event against every event that happened previously in the same case, identifying deviations according to the temporal profile. This section provides an example where a temporal profile is discovered, the streaming conformance checking is set up, and a log is replayed on the stream.
We can load an event log, and apply the discovery algorithm.
import pm4py
from pm4py.algo.discovery.temporal_profile import algorithm as temporal_profile_discovery
log = pm4py.read_xes("tests/input_data/running-example.xes")
parameters = {}
temporal_profile = temporal_profile_discovery.apply(log, parameters=parameters)
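For illustration, the discovered temporal profile can be inspected as follows; as described above, the keys are couples of activities and the values are couples (average, standard deviation):
# iterate over the discovered temporal profile:
# keys are (activity, activity) tuples, values are (average, standard deviation) tuples
for (act1, act2), (avg, stdev) in temporal_profile.items():
    print(act1, "->", act2, "average:", avg, "standard deviation:", stdev)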
We create the stream, register the temporal conformance checking algorithm and start the stream. The conformance checker can be created with the parameters described in the table below.
from pm4py.streaming.stream.live_event_stream import LiveEventStream
from pm4py.streaming.algo.conformance.temporal import algorithm as temporal_conformance_checker
stream = LiveEventStream()
parameters = {}
temp_cc = temporal_conformance_checker.apply(temporal_profile, parameters=parameters)
stream.register(temp_cc)
stream.start()
Parameter Key | Type | Default | Description
Parameters.CASE_ID_KEY | string | case:concept:name | The attribute to use as case ID.
Parameters.ACTIVITY_KEY | string | concept:name | The attribute to use as activity.
Parameters.START_TIMESTAMP_KEY | string | start_timestamp | The attribute to use as start timestamp.
Parameters.TIMESTAMP_KEY | string | time:timestamp | The attribute to use as timestamp.
Parameters.ZETA | int | 6 | Multiplier for the standard deviation. Couples of events that are more distant than this are signaled by the temporal profile.
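As an illustration, a stricter checker could be configured by lowering the ZETA multiplier. The exact location of the Parameters enum may differ between PM4Py versions; the following sketch assumes it is exposed directly on the algorithm module of the temporal conformance checker:
from pm4py.streaming.algo.conformance.temporal import algorithm as temporal_conformance_checker

# hypothetical parameterization: signal couples of events that deviate by more than 2 standard deviations
# (assumption: the Parameters enum is reachable as temporal_conformance_checker.Parameters)
parameters = {temporal_conformance_checker.Parameters.ZETA: 2}
strict_cc = temporal_conformance_checker.apply(temporal_profile, parameters=parameters)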
We send the events of the log to the stream:
static_stream = pm4py.convert_to_event_stream(log)
for event in static_stream:
    stream.append(event)
During the execution of the streaming temporal profile conformance checker, some warnings are printed if a couple of events violates the temporal profile. Moreover, it is also possible to get a dictionary associating the cases containing deviations with all of their deviations.
The following code is useful to get the results of the streaming temporal profile conformance
checking.
stream.stop()
res = temp_cc.get()
Streaming Importer (XES trace-by-trace)
In order to be able to process the traces of an XES event log that might not fit in memory, or when a sample of a big log is needed, the XES trace-by-trace streaming importer helps to cope with the situation.
The importer can be used in a natural way, providing the path to the log:
import os
from pm4py.streaming.importer.xes import importer as xes_importer
streaming_log_object = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"), variant=xes_importer.Variants.XES_TRACE_STREAM)
And it is possible to iterate over the traces of this log (that are read trace-by-trace):
for trace in streaming_log_object:
    print(trace)
Streaming Importer (XES event-by-event)
In order to be able to process the events of an XES event log that might not fit in memory, or when a sample of a big log is needed, the XES event-by-event streaming importer helps to cope with the situation. In this case, the single events inside the traces are picked during the iteration.
The importer can be used in a natural way, providing the path to the log:
import os
from pm4py.streaming.importer.xes import importer as xes_importer
streaming_ev_object = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"), variant=xes_importer.Variants.XES_EVENT_STREAM)
And it is possible to iterate over the single events of this log (that are read during the
iteration):
for event in streaming_ev_object:
    print(event)
Streaming Importer (CSV event-by-event)
In order to be able to process the events of a CSV event log that might not fit in memory, or when a sample of a big log is needed, Pandas might not be feasible. In this case, the single rows of the CSV file are parsed during the iteration.
The importer can be used in a natural way, providing the path to a CSV log:
import os
from pm4py.streaming.importer.csv import importer as csv_importer
log_object = csv_importer.apply(os.path.join("tests", "input_data", "running-example.csv"))
And it is possible to iterate over the single events of this log (that are read during the
iteration):
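for event in log_object:
    print(event)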