Supported/Described Version(s): pm4py 2.2.32
This documentation assumes that the reader has a basic understanding of process mining and python concepts.

Handling Event Data

In this section, information about importing and exporting event logs, stored in various data formats, is presented. Before we dive into the details of importing and exporting the various types of files containing event data, we first briefly explain the two basic notions of event data used within pm4py. We assume the reader to be familiar with the general concept of an event log. In general, we distinguish between two different event data object types:

  • Event Stream (objects.log.obj.EventStream); Simply represents a sequence of events. Events themselves are an extension of the Mapping class of Python (collections.abc.Mapping), which allows us to use events as a dict. From a programming perspective, an Event Stream behaves exactly like a list object in Python. However, when applying lambda functions, the result needs to be explicitly cast to an EventStream object.
  • Event Log (objects.log.obj.EventLog); Represents a sequence of sequences of events. The concept of an event log is the more traditional view on event data, i.e., executions of a process are captured in traces of events. However, in pm4py, the Event Log maintains an order of traces. In this way, sorting traces using some specific sorting criterion is supported naturally, and lambda functions and filters are easily applied on top of Event Logs as well (a small construction sketch follows this list).
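
As a small illustration of these two notions, the following sketch manually builds an event log with a single trace of two events (the attribute values are made up for the example; the classes live in pm4py.objects.log.obj):

import datetime
from pm4py.objects.log.obj import EventLog, Trace, Event

if __name__ == "__main__":
    # events behave like dictionaries
    e1 = Event({"concept:name": "register request", "time:timestamp": datetime.datetime(2020, 4, 22, 4, 55)})
    e2 = Event({"concept:name": "submit payment", "time:timestamp": datetime.datetime(2020, 4, 22, 5, 3)})

    # a trace is a sequence of events; an event log is a sequence of traces
    trace = Trace([e1, e2], attributes={"concept:name": "1"})
    log = EventLog([trace])

    print(log[0][0]["concept:name"])  # prints the activity of the first event of the first trace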

Importing IEEE XES files

IEEE XES is a standard format describing how event logs are stored. For more information about the format, please study the IEEE XES Website. A simple synthetic event log (running-example.xes) can be downloaded from here. Note that several real event logs have been made available over the past few years. You can find them here.


The example code on the right shows how to import an event log, stored in the IEEE XES format, given a file path to the log file. The code fragment uses the standard importer (iterparse, described in a later paragraph). Note that IEEE XES Event Logs are imported into an Event Log object, i.e., as described earlier.

import pm4py
if __name__ == "__main__":
    log = pm4py.read_xes('tests/input_data/running-example.xes')

Event logs are stored as an extension of the Python list data structure. To access a trace in the log, it is enough to provide its index in the event log. Consider the example on the right, showing how to access the different objects stored in the imported log.


if __name__ == "__main__":
    print(log[0]) #prints the first trace of the log
    print(log[0][0]) #prints the first event of the first trace

Importing CSV files

Apart from the IEEE XES standard, a lot of event logs are actually stored in a CSV file. In general, there are two ways to deal with CSV files in pm4py:

  • Import the CSV into a pandas DataFrame; In general, most existing algorithms in pm4py are coded to be flexible in terms of their input, i.e., if a certain event log object is provided that is not in the right form, we translate it to the appropriate form for you. Hence, after importing a dataframe, most algorithms are directly able to work with the data frame.
  • Convert the CSV into an event log object (similar to the result of the IEEE XES importer presented in the previous section); In this case, the first step is to import the CSV file using pandas (similar to the previous bullet) and subsequently converting it to the event log object. In the remainder of this section, we briefly highlight how to convert a pandas DataFrame to an event log. Note that most algorithms use the same type of conversion, in case a given event data object is not of the right type.

The example code on the right shows how to convert a CSV file into the pm4py internal event data object types. By default, the converter converts the dataframe to an Event Log object (i.e., not an Event Stream).


import pandas as pd
import pm4py

if __name__ == "__main__":
    dataframe = pd.read_csv('tests/input_data/running-example.csv', sep=',')
    dataframe = pm4py.format_dataframe(dataframe, case_id='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
    event_log = pm4py.convert_to_event_log(dataframe)

Note that the example code above does not directly work in a lot of cases. Let us consider a very simple example event log, and, assume it is stored as a csv-file:

CaseID  Activity          Timestamp      clientID
1       register request  20200422T0455  1337
2       register request  20200422T0457  1479
1       submit payment    20200422T0503  1337

In this small example table, we observe four columns, i.e., CaseID, Activity, Timestamp and clientID. Clearly, when importing the data and converting it to an Event Log object, we aim to combine all rows (events) with the same value for the CaseID column together. Another interesting phenomenon in the example data is the fourth column, i.e., clientID. In fact, the client ID is an attribute that will not change over the course of the execution of a process instance, i.e., it is a case-level attribute. pm4py allows us to specify that a column actually describes a case-level attribute (under the assumption that the attribute does not change during the execution of a process).


The example code on the right shows how to convert the previously exemplified csv data file. After loading the csv file of the example table, we rename the clientID column to case:clientID (this is a specific operation provided by pandas!).

import pandas as pd
import pm4py

if __name__ == "__main__":
    dataframe = pd.read_csv('tests/input_data/running-example-transformed.csv', sep=',')
    dataframe = dataframe.rename(columns={'clientID': 'case:clientID'})
    dataframe = pm4py.format_dataframe(dataframe, case_id='CaseID', activity_key='Activity', timestamp_key='Timestamp')
    event_log = pm4py.convert_to_event_log(dataframe)

Converting Event Data

In this section, we describe how to convert event log objects from one object type to another object type. There are three objects, which we are able to 'switch' between, i.e., Event Log, Event Stream and Data Frame objects. Please refer to the previous code snippet for an example of applying log conversion (applied when importing a CSV object). Finally, note that most algorithms internally use the converters, in order to be able to handle an input event data object of any form. In such a case, the default parameters are used.

To convert from any object to an event log, the following method can be used:

import pm4py
if __name__ == "__main__":
    event_log = pm4py.convert_to_event_log(dataframe)

To convert from any object to an event stream, the following method can be used:

import pm4py
if __name__ == "__main__":
    event_stream = pm4py.convert_to_event_stream(dataframe)

To convert from any object to a dataframe, the following method can be used:

import pm4py
if __name__ == "__main__":
    dataframe = pm4py.convert_to_dataframe(dataframe)

Exporting IEEE XES files

Exporting an Event Log object to an IEEE XES file is fairly straightforward in pm4py. Consider the example code fragment on the right, which depicts this functionality.

import pm4py
if __name__ == "__main__":
    pm4py.write_xes(log, 'exported.xes')

In the example, the log object is assumed to be an Event Log object. The exporter also accepts an Event Stream or DataFrame object as an input. However, the exporter will first convert the given input object into an Event Log. Hence, in this case, the standard parameters for the conversion are used. Thus, if the user wants more control, it is advisable to apply the conversion to an Event Log prior to exporting.
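
For instance, a minimal sketch (reusing the CSV import shown earlier) that performs the conversion explicitly before exporting:

import pandas as pd
import pm4py

if __name__ == "__main__":
    dataframe = pd.read_csv('tests/input_data/running-example.csv', sep=',')
    dataframe = pm4py.format_dataframe(dataframe, case_id='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
    # convert explicitly, so that the conversion is not left to the exporter's defaults
    event_log = pm4py.convert_to_event_log(dataframe)
    pm4py.write_xes(event_log, 'exported.xes')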

Exporting logs to CSV

To export an event log to a csv-file, pm4py uses Pandas. Hence, an event log is first converted to a Pandas Data Frame, after which it is written to disk.

import pandas as pd
import pm4py

if __name__ == "__main__":
    dataframe = pm4py.convert_to_dataframe(log)
    dataframe.to_csv('exported.csv')
                                

In case an event log object is provided that is not a dataframe, i.e., an Event Log or Event Stream, the conversion is applied, using the default parameter values, i.e., as presented in the Converting Event Data section. Note that exporting event data to a csv file has no parameters. In case more control over the conversion is needed, please apply a conversion to dataframe first, prior to exporting to csv.

I/O with Other File Types

At this moment, I/O of any format supported by Pandas (dataframes) is implicitly supported. As long as data can be loaded into a Pandas dataframe, pm4py is reasonably able to work with such files.
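
For instance, a minimal sketch (assuming a hypothetical Parquet file with the same columns as the CSV used earlier) that loads the data via pandas and then hands it over to pm4py:

import pandas as pd
import pm4py

if __name__ == "__main__":
    # any format pandas can read (Parquet, Excel, ...) can be used as a starting point
    dataframe = pd.read_parquet('tests/input_data/running-example.parquet')
    dataframe = pm4py.format_dataframe(dataframe, case_id='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
    event_log = pm4py.convert_to_event_log(dataframe)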

Filtering Event Data

pm4py also has various specific methods to filter an event log.

Filtering on timeframe

In the following paragraphs, various methods for filtering on time frames are presented. The same functions can be applied both to event log objects and to pandas dataframes.

One might be interested in keeping only the traces that are contained in a specific time interval, e.g., between 09 March 2011 and 18 January 2012.

import pm4py
if __name__ == "__main__":
    filtered_log = pm4py.filter_time_range(log, "2011-03-09 00:00:00", "2012-01-18 23:59:59", mode='traces_contained')

However, it is also possible to keep the traces that are intersecting with a time interval.

import pm4py
if __name__ == "__main__":
    filtered_log = pm4py.filter_time_range(log, "2011-03-09 00:00:00", "2012-01-18 23:59:59", mode='traces_intersecting')

Until now, only trace-based techniques have been discussed. However, there is also a method to keep the events that are contained in a specific timeframe.

import pm4py
if __name__ == "__main__":
    filtered_log = pm4py.filter_time_range(log, "2011-03-09 00:00:00", "2012-01-18 23:59:59", mode='events')

Filter on case performance

This filter keeps only the traces whose duration lies inside a specified interval. In the example, traces between 1 and 10 days are kept. Note that the time parameters are given in seconds.

import pm4py
if __name__ == "__main__":
    filtered_log = pm4py.filter_case_performance(log, 86400, 864000)

Filter on start activities

In general, pm4py is able to filter a log or a dataframe on start activities.


First of all, it might be necessary to know the start activities; therefore, a code snippet is provided, followed by an example of filtering. log_start is a dictionary that contains the activities as keys and their number of occurrences as values. The same functions can also be applied to a pandas dataframe (see the sketch after the snippet).

import pm4py
if __name__ == "__main__":
    log_start = pm4py.get_start_activities(log)
    filtered_log = pm4py.filter_start_activities(log, ["S1"]) #suppose "S1" is the start activity you want to filter on
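
The same functions also accept a pandas dataframe. A minimal sketch, reusing the CSV and formatting shown in the importing section ("register request" is just an example start activity):

import pandas as pd
import pm4py

if __name__ == "__main__":
    dataframe = pd.read_csv('tests/input_data/running-example.csv', sep=',')
    dataframe = pm4py.format_dataframe(dataframe, case_id='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
    df_start = pm4py.get_start_activities(dataframe)
    filtered_df = pm4py.filter_start_activities(dataframe, ["register request"])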

Filter on end activities

In general, pm4py offers the possibility to filter a log or a dataframe on end activities.

This filter keeps only the traces with an end activity among a set of specified activities. First of all, it might be necessary to know the end activities; therefore, a code snippet is provided.

import pm4py
if __name__ == "__main__":
    end_activities = pm4py.get_end_activities(log)
    filtered_log = pm4py.filter_end_activities(log, ["pay compensation"])

Filter on variants

A variant is a set of cases that share the same control-flow perspective, i.e., a set of cases that share the same sequence of classified events (activities). The functions in this section can be applied both to log objects and to dataframes.

To retrieve the variants from the log, the following code snippet can be used:

import pm4py
if __name__ == "__main__":
    variants = pm4py.get_variants(log)

To filter on a given collection of variants, the following code snippet can be used:

import pm4py
if __name__ == "__main__":
    filtered_log = pm4py.filter_variants(log, ["A,B,C,D", "A,E,F,G", "A,C,D"])

Other variants-based filters are offered.

The filter on the top-k variants keeps in the log only the cases following one of the k most frequent variants:

import pm4py
if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/receipt.xes")
    k = 2
    filtered_log = pm4py.filter_variants_top_k(log, k)

The filter on variants coverage keeps the cases following the top variants of the log, under the condition that each kept variant covers at least the specified percentage of cases in the log. For example, if min_coverage_percentage=0.4 and we have a log with 1000 cases, of which 500 belong to variant 1, 400 to variant 2, and 100 to variant 3, the filter keeps only the traces of variant 1 and variant 2.

import pm4py
if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/receipt.xes")
    perc = 0.1
    filtered_log = pm4py.filter_variants_by_coverage_percentage(log, perc)
                            

Filter on attributes values

Filtering on attribute values permits, alternatively, to:

  • Keep cases that contain at least one event with one of the given attribute values
  • Remove cases that contain an event with one of the given attribute values
  • Keep events (trimming traces) that have one of the given attribute values
  • Remove events (trimming traces) that have one of the given attribute values

Examples of attributes are the resource (generally contained in the org:resource attribute) and the activity (generally contained in the concept:name attribute). As noted before, the same functions can be applied both to log objects and to dataframe objects.

To get the list of resources and activities contained in the log, the following code could be used.

import pm4py
if __name__ == "__main__":
    activities = pm4py.get_event_attribute_values(log, "concept:name")
    resources = pm4py.get_event_attribute_values(log, "org:resource")

To filter traces containing/not containing a given list of resources, the following code could be used.


if __name__ == "__main__":
    tracefilter_log_pos = pm4py.filter_event_attribute_values(log, "org:resource", ["Resource10"], level="case", retain=True)
    tracefilter_log_neg = pm4py.filter_event_attribute_values(log, "org:resource", ["Resource10"], level="case", retain=False)

It is also possible to keep only the events performed by a given list of resources (trimming the cases). The following code can be used.


if __name__ == "__main__":
    tracefilter_log_pos = pm4py.filter_event_attribute_values(log, "org:resource", ["Resource10"], level="event", retain=True)
    tracefilter_log_neg = pm4py.filter_event_attribute_values(log, "org:resource", ["Resource10"], level="event", retain=False)

Filter on numeric attribute values

Filtering on numeric attribute values provides options that are similar to filtering on string attribute values (which we already considered).

First, we import the log. Subsequently, we want to keep only the events with an amount between 34 and 36. An additional filter aims to keep only cases with at least one event satisfying the specified amount. The filter on cases provides the option to specify up to two attributes that are checked on the events that shall satisfy the numeric range. For example, if we are interested in cases having an event with activity Add penalty that has an amount between 34 and 500, a code snippet is also provided.

import os
import pandas as pd
import pm4py

if __name__ == "__main__":
    df = pd.read_csv(os.path.join("tests", "input_data", "roadtraffic100traces.csv"))
    df = pm4py.format_dataframe(df)

    from pm4py.algo.filtering.pandas.attributes import attributes_filter
    filtered_df_events = attributes_filter.apply_numeric_events(df, 34, 36,
                                                 parameters={attributes_filter.Parameters.CASE_ID_KEY: "case:concept:name", attributes_filter.Parameters.ATTRIBUTE_KEY: "amount"})

    filtered_df_cases = attributes_filter.apply_numeric(df, 34, 36,
                                                 parameters={attributes_filter.Parameters.CASE_ID_KEY: "case:concept:name", attributes_filter.Parameters.ATTRIBUTE_KEY: "amount"})

    filtered_df_cases = attributes_filter.apply_numeric(df, 34, 500,
                                                 parameters={attributes_filter.Parameters.CASE_ID_KEY: "case:concept:name", attributes_filter.Parameters.ATTRIBUTE_KEY: "amount",
                                                             attributes_filter.Parameters.STREAM_FILTER_KEY1: "concept:name",
                                                             attributes_filter.Parameters.STREAM_FILTER_VALUE1: "Add penalty"})

Between Filter

The between filter transforms the event log by identifying, in the current set of cases, all the subcases going from a source activity to a target activity.

This is useful to analyse in detail the behavior in the log between such a pair of activities (e.g., the throughput time, which activities are included, the level of conformance).

The between filter between two activities is applied as follows.


import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")

    filtered_log = pm4py.filter_between(log, "check ticket", "decide")
                                

Case Size Filter

The case size filter keeps only the cases in the log with a number of events included in a range that is specified by the user.

This can have two purposes: eliminating cases that are too short (which are likely incomplete or outliers), or cases that are too long (which may indicate too much rework).

The case size filter can be applied as follows:


import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")

    filtered_log = pm4py.filter_case_size(log, 5, 10)
                                

Rework Filter

The filter described in this section has the purpose of identifying the cases where a given activity has been repeated.

The rework filter is applied as follows. In this case, we search for all the cases having at least 2 occurrences of the activity reinitiate request.


import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")

    filtered_log = pm4py.filter_activities_rework(log, "reinitiate request", 2)
                                

Paths Performance Filter

The paths performance filter identifies the cases in which a given path between two activities takes a duration that is included in a range that is specified by the user.

This can be useful to identify the cases in which a large amount of time is passed between two activities.

The paths filter is applied as follows. In this case, we are looking for cases containing at least one occurrence of the path between decide and pay compensation having a duration included between 2 days and 10 days (where each day has a duration of 86400 seconds).


import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")

    filtered_log = pm4py.filter_paths_performance(log, ("decide", "pay compensation"), 2*86400, 10*86400)
                                

Object-Centric Event Logs

In pm4py we offer support for object-centric event logs (importing/exporting).

Motivation

Traditional event logs, used by mainstream process mining techniques, require the events to be related to a case. A case is a set of events for a particular purpose. A case notion is a criterion to assign the events to cases.

However, in real processes this leads to two problems:

  • If we consider the Order-to-Cash process, an order could be related to many different deliveries. If we consider the delivery as case notion, the same event of Create Order needs to be replicated in different cases (all the deliveries involving the order). This is called the convergence problem.
  • If we consider the Order-to-Cash process, an order could contain different order items, each one with a different lifecycle. If we consider the order as case notion, several instances of the activities for the single items may be contained in the case, and this makes the frequency/performance annotation of the process problematic. This is called the divergence problem.

Object-centric event logs relax the assumption that an event is related to exactly one case. Indeed, an event can be related to different objects of different object types.

Essentially, we can describe the different components of an object-centric event log as:

  • Events, having an identifier, an activity, a timestamp, a list of related objects and a dictionary of other attributes.
  • Objects, having an identifier, a type and a dictionary of other attributes.
  • Attribute names, e.g., the possible keys for the attributes of the event/object attribute map.
  • Object types, e.g., the possible types for the objects.

Supported Formats

Several historical formats (OpenSLEX, XOC) have been proposed for the storage of object-centric event logs. In particular, the OCEL standard proposes lean and intercompatible formats for the storage of object-centric event logs. These include:

  • XML-OCEL: a storage format based on XML for object-centric event logs. An example of XML-OCEL event log is reported here.
  • JSON-OCEL: a storage format based on JSON for object-centric event logs. An example of JSON-OCEL event log is reported here.

Among the commonalities of these formats, the event/object identifier is ocel:id, the activity identifier is ocel:activity, the timestamp of the event is ocel:timestamp, the type of the object is ocel:type. Moreover, the list of related objects for the events is identified by ocel:omap, the attribute map for the events is identified by ocel:vmap, the attribute map for the objects is identified by ocel:ovmap.

Ignoring the attributes at the object level, we can also represent the object-centric event log in a CSV format (an example is reported here). There, a row represents an event, where the event identifier is ocel:eid, and the related objects for a given type OTYPE are reported as a list under the column ocel:type:OTYPE.

Importing/Exporting OCELs

For all the supported formats, an OCEL event log can be read by doing:


import pm4py

if __name__ == "__main__":
    path = "tests/input_data/ocel/example_log.jsonocel"
    ocel = pm4py.read_ocel(path)
                                

An OCEL can also be exported easily by doing (ocel is assumed to be an object-centric event log):


import pm4py

if __name__ == "__main__":
    path = "./output.jsonocel"
    pm4py.write_ocel(ocel, path)
                                

Basic Statistics on OCELs

We offer some basic statistics that can be calculated on OCELs.

The simplest way of obtaining some statistics on an OCEL is by printing the OCEL object:


if __name__ == "__main__":
    print(ocel)
                                

In the previous case, some statistics will be printed as follows:

Object-Centric Event Log (number of events: 23, number of objects: 15, number of activities: 15, number of object types: 3, events-objects relationships: 39)
Activities occurrences: {'Create Order': 3, 'Create Delivery': 3, 'Delivery Successful': 3, 'Invoice Sent': 2, 'Payment Reminder': 2, 'Confirm Order': 1, 'Item out of Stock': 1, 'Item back in Stock': 1, 'Delivery Failed': 1, 'Retry Delivery': 1, 'Pay Order': 1, 'Remove Item': 1, 'Cancel Order': 1, 'Add Item to Order': 1, 'Send for Credit Collection': 1}
Object types occurrences: {'element': 9, 'order': 3, 'delivery': 3}
Please use ocel.get_extended_table() to get a dataframe representation of the events related to the objects.
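
As suggested by the printout, a flat dataframe representation (one row per event, with the related objects per type reported in dedicated columns) can be obtained as follows:

if __name__ == "__main__":
    extended_table = ocel.get_extended_table()
    print(extended_table)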

The names of the attributes in the log can be retrieved as follows:


if __name__ == "__main__":
    attribute_names = pm4py.ocel_get_attribute_names(ocel)
                                

The object types contained in the event log can be retrieved as follows:


if __name__ == "__main__":
    object_types = pm4py.ocel_get_object_types(ocel)
                                

The retrieval of a dictionary containing the set of activities for each object type can be obtained using the command on the right. In this case, the key of the dictionary will be the object type, and the value the set of activities which appears for the object type.


if __name__ == "__main__":
    object_type_activities = pm4py.ocel_object_type_activities(ocel)
                                

It is possible to obtain, for each event identifier and object type, the number of objects of that type related to the event. The output will be a dictionary where the first key is the event identifier, the second key is the object type, and the value is the number of related objects per type.


if __name__ == "__main__":
    ocel_objects_ot_count = pm4py.ocel_objects_ot_count(ocel)
                                

It is possible to calculate the so-called temporal summary of the object-centric event log. The temporal summary is a table (dataframe) in which the different timestamps occurring in the log are reported along with the set of activities happening at that point in time and the objects involved in them.


if __name__ == "__main__":
    temporal_summary = pm4py.ocel_temporal_summary(ocel)
                                

It is possible to calculate the so-called objects summary of the object-centric event log. The objects summary is a table (dataframe) in which the different objects occurring in the log are reported along with the list of activities of the events related to the object, the start/end timestamps of the lifecycle, the duration of the lifecycle and the other objects related to the given object in the interaction graph.


if __name__ == "__main__":
    objects_summary = pm4py.ocel_objects_summary(ocel)
                                

Internal Data Structure

In this section, we describe the data structure used in pm4py to store object-centric event logs. We have in total three Pandas dataframes:

  • The events dataframe: this stores a row for each event. Each row contains the event identifier (ocel:eid), the activity (ocel:activity), the timestamp (ocel:timestamp), and the values for the other event attributes (one per column).
  • The objects dataframe: this stores a row for each object. Each row contains the object identifier (ocel:oid), the type (ocel:type), and the values for the object attributes (one per column).
  • The relations dataframe: this stores a row for every relation event->object. Each row contains the event identifier (ocel:eid), the object identifier (ocel:oid), the type of the related object (ocel:type).

These dataframes can be accessed as properties of the OCEL object (e.g., ocel.events, ocel.objects, ocel.relations), and can obviously be used for any purpose (filtering, discovery).
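
A minimal sketch of accessing these structures (assuming the example log used earlier in this section):

import pm4py

if __name__ == "__main__":
    ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")
    print(ocel.events)     # one row per event
    print(ocel.objects)    # one row per object
    print(ocel.relations)  # one row per event->object relation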

Filtering Object-Centric Event Logs

In this section, we describe some filtering operations available in pm4py and specific for object-centric event logs. There are filters at three levels:

  • Filters at the event level (operating first at the ocel.events structure and then propagating the result to the other parts of the object-centric log).
  • Filters at the object level (operating first at the ocel.objects structure and then propagating the result to the other parts of the object-centric log).
  • Filters at the relations level (operating first at the ocel.relations structure and then propagating the result to the other parts of the object-centric log).

Filter on Event Attributes

We can keep the events with a given attribute falling inside a specified list of values by using pm4py.filter_ocel_event_attribute. An example, filtering on the ocel:activity (the activity) attribute, is reported on the right. The positive boolean parameter specifies whether to keep the events with an activity falling in the list (positive=True) or to keep the events with an activity NOT falling in the list (positive=False).


if __name__ == "__main__":
    filtered_ocel = pm4py.filter_ocel_event_attribute(ocel, "ocel:activity", ["Create Fine", "Send Fine"], positive=True)
                                

Filter on Object Attributes

We can keep the objects with a given attribute falling inside a specified list of values by using pm4py.filter_ocel_object_attribute. An example, filtering on the ocel:type (the object type) attribute, is reported on the right. The positive boolean parameter specifies whether to keep the objects with a type falling in the list (positive=True) or to keep the objects with a type NOT falling in the list (positive=False).


if __name__ == "__main__":
    filtered_ocel = pm4py.filter_ocel_object_attribute(ocel, "ocel:type", ["order", "delivery"], positive=True)
                                

Filter on Allowed Activities per Object Type

Sometimes, object-centric event logs include more relations between events and objects than is legitimate. This could lead back to the divergence problem. We introduce a filter on the allowed activities per object type. This helps in keeping, for each activity, only the meaningful object types, excluding the others. An example application of the filter is reported on the right. In this case, we keep for the order object type only the Create Order activity, and for the item object type only the Create Order and Create Delivery activities.


if __name__ == "__main__":
    filtered_ocel = pm4py.filter_ocel_object_types_allowed_activities(ocel, {"order": ["Create Order"], "item": ["Create Order", "Create Delivery"]})
                                

Filter on the Number of Objects per Type

With this filter, we want to search for some patterns in the log (for example, the events related to at least 1 order and 2 items). This helps in identifying exceptional patterns (e.g., an exceptional number of related objects per event). An example is reported on the right.


if __name__ == "__main__":
    filtered_ocel = pm4py.filter_ocel_object_per_type_count(ocel, {"order": 1, "element": 2})
                                

Filter on Start/End Events per Object

In some contexts, we may want to identify the events in which an object of a given type starts/completes its lifecycle. This may pinpoint some incompleteness in the recordings. Examples are reported on the right.


if __name__ == "__main__":
    filtered_ocel = pm4py.filter_ocel_start_events_per_object_type(ocel, "order")
    filtered_ocel = pm4py.filter_ocel_end_events_per_object_type(ocel, "order")
                                

Filter on Event Timestamp

A useful filter, to restrict the behavior of the object-centric event log to a specific time interval, is the timestamp filter (analogous to its traditional counterpart). An example is reported on the right.


if __name__ == "__main__":
    filtered_ocel = pm4py.filter_ocel_events_timestamp(ocel, "1981-01-01 00:00:00", "1982-01-01 00:00:00", timestamp_key="ocel:timestamp")
                                

Filter on Object Types

In this filter, we want to keep a limited set of object types of the log by manually specifying the object types to retain. Only the events related to at least one object of a provided object type are kept.


if __name__ == "__main__":
    filtered_ocel = pm4py.filter_ocel_object_types(ocel, ['order', 'element'])
                                

Filter on Event Identifiers

In this filter, we want to keep some events of the object-centric event log by explicitly specifying their identifiers.


if __name__ == "__main__":
    filtered_ocel = pm4py.filter_ocel_events(ocel, ['e1', 'e2'])
                                

Filter on Connected Components

In this filter, we want to keep the events related to the connected component of a provided object in the objects interaction graph. So, a subset of events of the original log, loosely interconnected, is kept in the filtered log.


if __name__ == "__main__":
    filtered_ocel = pm4py.filter_ocel_cc_object(ocel, 'o1')
                                

Filter on Object Identifiers

In this filter, we want to keep a subset of the objects (identifiers) of the original object-centric event log. Therefore, only the events related to at least one of these objects are kept in the object-centric event log.


if __name__ == "__main__":
    filtered_ocel = pm4py.filter_ocel_objects(ocel, ['o1', 'i1'])
                                

It is also possible to iteratively expand the set of objects considered by the filter to the objects that are interconnected with the given objects in the objects interaction graph. This is done with the level parameter. An example is provided in which the set of objects is expanded to the 'nearest' ones:


if __name__ == "__main__":
    filtered_ocel = pm4py.filter_ocel_objects(ocel, ['o1'], level=2)
                                

Sampling on the Events

It is possible to keep a random subset of the events of the original object-centric event log. In this case, the interactions between the objects are likely to be lost.


if __name__ == "__main__":
    filtered_ocel = pm4py.sample_events(ocel, num_events=100)
                                

Flattening to a Traditional Log

Flattening permits the conversion of an object-centric event log to a traditional event log, given the specification of an object type. This allows for the application of traditional process mining techniques to the flattened log.

An example in which an event log is imported, and a flattening operation is applied on the order object type, is the following:


import pm4py

if __name__ == "__main__":
    ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")
    flattened_log = pm4py.ocel_flattening(ocel, "order")
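
Since the flattened log is a traditional event log, any classical technique can be applied to it. For instance, a minimal sketch discovering a directly-follows graph on the flattened log (pm4py.discover_dfg is used here just as an example):

import pm4py

if __name__ == "__main__":
    ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")
    flattened_log = pm4py.ocel_flattening(ocel, "order")
    dfg, start_activities, end_activities = pm4py.discover_dfg(flattened_log)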
                                

Timestamp-Based Interleavings

The situation in which an object-centric event log is produced directly at the extraction phase from the information systems is uncommon; extractors for this setting are still quite rare nowadays.

More frequent is the situation where some event logs can be extracted from the system and then their cases are related. So we can use the classical extractors to extract the event logs, and additionally extract only the relationships between the cases.

This information can be used to mine the relationships between events. In particular, the method of timestamp-based interleavings can be used. These consider the temporal flow between the different processes, based on the provided case relations: you can go from the left-process to the right-process, and from the right-process to the left-process.

In the following, we will assume the cases to be Pandas dataframes (with the classical pm4py naming convention, e.g., case:concept:name, concept:name and time:timestamp) and a case relations dataframe to be defined between them (with the related cases being expressed respectively as case:concept:name_LEFT and case:concept:name_RIGHT).

In this example, we load two event logs, and a dataframe containing the relationships between them. Then, we apply the timestamp-based interleaved miner.


import pandas as pd
import pm4py

if __name__ == "__main__":
    dataframe1 = pd.read_csv("tests/input_data/interleavings/receipt_even.csv")
    dataframe1 = pm4py.format_dataframe(dataframe1)
    dataframe2 = pd.read_csv("tests/input_data/interleavings/receipt_odd.csv")
    dataframe2 = pm4py.format_dataframe(dataframe2)
    case_relations = pd.read_csv("tests/input_data/interleavings/case_relations.csv")

    from pm4py.algo.discovery.ocel.interleavings import algorithm as interleavings_discovery
    interleavings = interleavings_discovery.apply(dataframe1, dataframe2, case_relations)
                                

The resulting interleavings dataframe will contain several columns, including for each row (that is, a pair of related events, the first belonging to the first dataframe, the second belonging to the second dataframe):

  • All the columns of the event (of the interleaving) of the first dataframe (with prefix LEFT).
  • All the columns of the event (of the interleaving) of the second dataframe (with prefix RIGHT).
  • The column @@direction indicating the direction of the interleaving (with LR we go left-to-right so from the first dataframe to the second dataframe; with RL we go right-to-left, so from the second dataframe to the first dataframe).
  • The columns @@source_activity and @@target_activity contain respectively the source and target activity of the interleaving.
  • The columns @@source_timestamp and @@target_timestamp contain respectively the source and target timestamp of the interleaving.
  • The column @@left_index contains the index of the event of the first of the two dataframes.
  • The column @@right_index contains the index of the event of the second of the two dataframes.
  • The column @@timestamp_diff contains the difference between the two timestamps (this can be useful for time-based aggregations; see the sketch after this list).
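
For instance, a minimal sketch (repeating the discovery from the previous snippet) that computes the average time difference per source/target activity pair, using the column names listed above:

import pandas as pd
import pm4py

if __name__ == "__main__":
    dataframe1 = pd.read_csv("tests/input_data/interleavings/receipt_even.csv")
    dataframe1 = pm4py.format_dataframe(dataframe1)
    dataframe2 = pd.read_csv("tests/input_data/interleavings/receipt_odd.csv")
    dataframe2 = pm4py.format_dataframe(dataframe2)
    case_relations = pd.read_csv("tests/input_data/interleavings/case_relations.csv")

    from pm4py.algo.discovery.ocel.interleavings import algorithm as interleavings_discovery
    interleavings = interleavings_discovery.apply(dataframe1, dataframe2, case_relations)

    # average time difference per (source activity, target activity) pair
    avg_diff = interleavings.groupby(["@@source_activity", "@@target_activity"])["@@timestamp_diff"].mean()
    print(avg_diff)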

We provide a visualization of the interleavings between the two logs. The visualization considers the DFG of the two logs and shows the interleavings between them (decorated by the frequency/performance of the relationship).

An example of frequency-based interleavings visualization is reported on the right.


import pandas as pd
import pm4py

if __name__ == "__main__":
    dataframe1 = pd.read_csv("tests/input_data/interleavings/receipt_even.csv")
    dataframe1 = pm4py.format_dataframe(dataframe1)
    dataframe2 = pd.read_csv("tests/input_data/interleavings/receipt_odd.csv")
    dataframe2 = pm4py.format_dataframe(dataframe2)
    case_relations = pd.read_csv("tests/input_data/interleavings/case_relations.csv")

    from pm4py.algo.discovery.ocel.interleavings import algorithm as interleavings_discovery
    interleavings = interleavings_discovery.apply(dataframe1, dataframe2, case_relations)

    from pm4py.visualization.ocel.interleavings import visualizer as interleavings_visualizer

    # visualizes the frequency of the interleavings
    gviz_freq = interleavings_visualizer.apply(dataframe1, dataframe2, interleavings, parameters={"annotation": "frequency", "format": "svg"})
    interleavings_visualizer.view(gviz_freq)
                                

An example of performance-based interleavings visualization is reported on the right.


import pandas as pd
import pm4py

if __name__ == "__main__":
    dataframe1 = pd.read_csv("tests/input_data/interleavings/receipt_even.csv")
    dataframe1 = pm4py.format_dataframe(dataframe1)
    dataframe2 = pd.read_csv("tests/input_data/interleavings/receipt_odd.csv")
    dataframe2 = pm4py.format_dataframe(dataframe2)
    case_relations = pd.read_csv("tests/input_data/interleavings/case_relations.csv")

    from pm4py.algo.discovery.ocel.interleavings import algorithm as interleavings_discovery
    interleavings = interleavings_discovery.apply(dataframe1, dataframe2, case_relations)

    from pm4py.visualization.ocel.interleavings import visualizer as interleavings_visualizer

    # visualizes the performance of the interleavings
    gviz_perf = interleavings_visualizer.apply(dataframe1, dataframe2, interleavings, parameters={"annotation": "performance", "aggregation_measure": "median", "format": "svg"})
    interleavings_visualizer.view(gviz_perf)
                                

The parameters offered by the visualization of the interleavings are the following:

  • Parameters.FORMAT: the format of the visualization (svg, png).
  • Parameters.BGCOLOR: background color of the visualization (default: transparent).
  • Parameters.RANKDIR: the direction of visualization of the diagram (LR, TB).
  • Parameters.ANNOTATION: the annotation to be used (frequency, performance).
  • Parameters.AGGREGATION_MEASURE: the aggregation to be used (mean, median, min, max).
  • Parameters.ACTIVITY_PERCENTAGE: the percentage of activities that shall be included in the two DFGs and the interleavings visualization.
  • Parameters.PATHS_PERCENTAGE: the percentage of paths that shall be included in the two DFGs and the interleavings visualization.
  • Parameters.DEPENDENCY_THRESHOLD: the dependency threshold that shall be used to filter the edges of the DFG.
  • Parameters.MIN_FACT_EDGES_INTERLEAVINGS: parameter that regulates the fraction of interleavings that is shown in the diagram.

Creating an OCEL out of the Interleavings

Given two logs having related cases, we saw how to calculate the interleavings between the logs.

In this section, we want to exploit the information contained in the two logs and in their interleavings to create an object-centric event log (OCEL). This will contain the events of the two event logs and the connections between them. The OCEL can be used with any object-centric process mining technique.

An example is reported on the right.


import pandas as pd
import pm4py

if __name__ == "__main__":
    dataframe1 = pd.read_csv("tests/input_data/interleavings/receipt_even.csv")
    dataframe1 = pm4py.format_dataframe(dataframe1)
    dataframe2 = pd.read_csv("tests/input_data/interleavings/receipt_odd.csv")
    dataframe2 = pm4py.format_dataframe(dataframe2)
    case_relations = pd.read_csv("tests/input_data/interleavings/case_relations.csv")

    from pm4py.algo.discovery.ocel.interleavings import algorithm as interleavings_discovery
    interleavings = interleavings_discovery.apply(dataframe1, dataframe2, case_relations)

    from pm4py.objects.ocel.util import log_ocel
    ocel = log_ocel.from_interleavings(dataframe1, dataframe2, interleavings)
                                

Merging Related Logs (Case Relations)

If two event logs of two inter-related processes are considered, it may make sense for some analyses to merge them. The resulting log will contain cases which contain events of the first and the second event log.

This happens when popular enterprise processes such as the P2P and the O2C are considered. If a sales order is placed which requires a material that is not available, a purchase order can be placed with a supplier in order to get the material and fulfill the sales order.

For the merge operation, we will need to consider:

  • A reference event log (whose cases will be enriched by the events of the other event log).
  • An event log to be merged (its events end up in the cases of the reference event log).
  • A set of case relationships between them.

An example is reported on the right. The result is a traditional event log.


import pandas as pd
import pm4py
from pm4py.algo.merging.case_relations import algorithm as case_relations_merging
import os

if __name__ == "__main__":
    dataframe1 = pd.read_csv(os.path.join("tests", "input_data", "interleavings", "receipt_even.csv"))
    dataframe1 = pm4py.format_dataframe(dataframe1)
    dataframe2 = pd.read_csv(os.path.join("tests", "input_data", "interleavings", "receipt_odd.csv"))
    dataframe2 = pm4py.format_dataframe(dataframe2)
    case_relations = pd.read_csv(os.path.join("tests", "input_data", "interleavings", "case_relations.csv"))
    merged = case_relations_merging.apply(dataframe1, dataframe2, case_relations)
                                

Network Analysis

The classical social network analysis methods (such as the ones described in the later sections of this page) are based on the order of the events inside a case. For example, the Handover of Work metric considers the directly-follows relationships between resources during the work of a case. An edge is added between two resources if such a relationship occurs.

Real-life scenarios may be more complicated. First, it is difficult to collect events inside the same case without having convergence/divergence issues (see the first section of the OCEL part). Second, the type of relationship may also be important. Consider for example the relationship between two resources: this may be more effective if the activity that is executed is liked by the resources, rather than disliked.

The network analysis that we introduce in this section generalizes some existing social network analysis metrics, becoming independent of the choice of a case notion and permitting the construction of a multigraph instead of a simple graph.

With this, we assume events to be linked by signals. An event emits a signal (that is contained as one attribute of the event) that is assumed to be received by other events (also, this is an attribute of these events) that follow the first event in the log. So, we assume there is an OUT attribute (of the event) that is identical to the IN attribute (of the other events).

When we collect this information, we can build the network analysis graph:

  • The source node of the relation is given by an aggregation over a node_column_source attribute.
  • The target node of the relation is given by an aggregation over a node_column_target attribute.
  • The type of edge is given by an aggregation over an edge_column attribute.
  • The network analysis graph can either be annotated with frequency or performance information.

On the right, an example of network analysis, producing a multigraph annotated with frequency information, and performing a visualization of the same, is reported.


import os
import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "receipt.xes"))

    frequency_edges = pm4py.discover_network_analysis(log, out_column="case:concept:name", in_column="case:concept:name", node_column_source="org:group", node_column_target="org:group", edge_column="concept:name", performance=False)
    pm4py.view_network_analysis(frequency_edges, variant="frequency", format="svg", edge_threshold=10)
                                

In the previous example, we have loaded one traditional event log (the receipt.xes event log), and performed the network analysis with the following choice of parameters:

  • The OUT-column is set to case:concept:name and the IN-column is set also to case:concept:name (that means, succeeding events of the same case are connected).
  • The node_column_source and node_column_target attributes are set to org:group (we want to see the network of relations between the different organizational groups).
  • The edge_column attribute is set to concept:name (we want to see the frequency/performance of edges between groups, depending on the activity, so we can evaluate advantageous exchanges).

Note that in the previous case, we resorted to using the case identifier as OUT/IN column, but that's just a specific example (the OUT and IN columns can be different, and differ from the case identifier).

On the right, an example of network analysis, producing a multigraph annotated with performance information, and performing a visualization of the same, is reported.


import os
import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "receipt.xes"))

    performance_edges = pm4py.discover_network_analysis(log, out_column="case:concept:name", in_column="case:concept:name", node_column_source="org:group", node_column_target="org:group", edge_column="concept:name", performance=True)
    pm4py.view_network_analysis(performance_edges, variant="performance", format="svg", edge_threshold=10)
                                

The visualization supports the following parameters:

  • format: the format of the visualization (default: png).
  • bgcolor: the background color of the produced picture.
  • activity_threshold: the minimum number of occurrences for an activity to be included (default: 1).
  • edge_threshold: the minimum number of occurrences for an edge to be included (default: 1).

Link Analysis

While the goal of the network analysis is to provide an aggregated visualization of the links between different events, the goal of link analysis is just the discovery of the links between the events, to be able to reason about them.

In the examples that follow, we are going to consider the document flow table VBFA of SAP. This table contains some properties and the connections between sales order documents (e.g., the order document itself, the delivery documents, the invoice documents). Reasoning on the properties of the links could help to understand anomalous situations (e.g., the currency/price is changed during the order's lifecycle).

A link analysis starts from the production of a link analysis dataframe. This contains the linked events according to the provided specification of the attributes. First, we load a CSV containing the information from a VBFA table extracted from an educational instance of SAP. Then, we do some pre-processing to ensure the consistency of the data contained in the dataframe. Then, we discover the link analysis dataframe.


import pandas as pd
from pm4py.algo.discovery.ocel.link_analysis import algorithm as link_analysis
import os

if __name__ == "__main__":
    dataframe = pd.read_csv(os.path.join("tests", "input_data", "ocel", "VBFA.zip"), compression="zip", dtype="str")
    dataframe["time:timestamp"] = dataframe["ERDAT"] + " " + dataframe["ERZET"]
    dataframe["time:timestamp"] = pd.to_datetime(dataframe["time:timestamp"], format="%Y%m%d %H%M%S")
    dataframe["RFWRT"] = dataframe["RFWRT"].astype(float)
    dataframe = link_analysis.apply(dataframe, parameters={"out_column": "VBELN", "in_column": "VBELV",
                                                           "sorting_column": "time:timestamp", "propagate": True})

                                

At this point, several analyses could be performed. For example, finding the interconnected documents for which the currency differs between the two documents can be done as follows.


if __name__ == "__main__":
    df_currency = dataframe[(dataframe["WAERS_out"] != " ") & (dataframe["WAERS_in"] != " ") & (
                dataframe["WAERS_out"] != dataframe["WAERS_in"])]
    print(df_currency[["WAERS_out", "WAERS_in"]].value_counts())
                                

It is also possible to evaluate the amount of the documents, in order to identify discrepancies.


if __name__ == "__main__":
    df_amount = dataframe[(dataframe["RFWRT_out"] > 0) & (dataframe["RFWRT_out"] < dataframe["RFWRT_in"])]
    print(df_amount[["RFWRT_out", "RFWRT_in"]])
                                

The parameters of the link analysis algorithm are listed below; a small usage sketch follows the list.

  • Parameters.OUT_COLUMN: the column of the dataframe that is used to link the source events to the target events.
  • Parameters.IN_COLUMN: the column of the dataframe that is used to link the target events to the source events.
  • Parameters.SORTING_COLUMN: the attribute which is used preliminarily to sort the dataframe.
  • Parameters.INDEX_COLUMN: the name of the column of the dataframe that should be used to store the incremental event index.
  • Parameters.LOOK_FORWARD: merge an event e1 with an event e2 (e1.OUT = e2.IN) only if the index in the dataframe of e1 is lower than the index of the dataframe of e2.
  • Parameters.KEEP_FIRST_OCCURRENCE: if several events e21, e22 are such that e1.OUT = e21.IN = e22.IN, keep only the relationship between e1 and e21.
  • Parameters.PROPAGATE: propagate the discovered relationships. If e1, e2, e3 are such that e1.OUT = e2.IN and e2.OUT = e3.IN, then consider e1 to be in relationship also with e3.
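
As a sketch of how some of these parameters can be combined (reusing the VBFA preparation from above, and assuming the dictionary keys mirror the lowercase parameter names used in the earlier snippet):

import pandas as pd
from pm4py.algo.discovery.ocel.link_analysis import algorithm as link_analysis
import os

if __name__ == "__main__":
    dataframe = pd.read_csv(os.path.join("tests", "input_data", "ocel", "VBFA.zip"), compression="zip", dtype="str")
    dataframe["time:timestamp"] = pd.to_datetime(dataframe["ERDAT"] + " " + dataframe["ERZET"], format="%Y%m%d %H%M%S")

    # keep only the first occurrence of each link, without propagating the relationships
    df_links = link_analysis.apply(dataframe, parameters={"out_column": "VBELN", "in_column": "VBELV",
                                                          "sorting_column": "time:timestamp",
                                                          "keep_first_occurrence": True, "propagate": False})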

OC-DFG discovery

Object-centric directly-follows multigraphs are a composition of directly-follows graphs for the single object types, which can be annotated with different metrics considering the entities of an object-centric event log (i.e., events, unique objects, total objects).

We provide both the discovery of the OC-DFG (which produces a generic object allowing for many different choices of the metrics), and the visualization of the same.

An example, in which an object-centric event log is loaded, an object-centric directly-follows multigraph is discovered, and visualized with frequency annotation on the screen, is provided on the right.


import pm4py
import os

if __name__ == "__main__":
    ocel = pm4py.read_ocel(os.path.join("tests", "input_data", "ocel", "example_log.jsonocel"))
    ocdfg = pm4py.discover_ocdfg(ocel)
    # views the model with the frequency annotation
    pm4py.view_ocdfg(ocdfg, format="svg")
                                

An example, in which an object-centric event log is loaded, an object-centric directly-follows multigraph is discovered, and visualized with performance annotation on the screen, is provided on the right.


import pm4py
import os

if __name__ == "__main__":
    ocel = pm4py.read_ocel(os.path.join("tests", "input_data", "ocel", "example_log.jsonocel"))
    ocdfg = pm4py.discover_ocdfg(ocel)
    # views the model with the performance annotation
    pm4py.view_ocdfg(ocdfg, format="svg", annotation="performance", performance_aggregation="median")
                                

The visualization supports the following parameters (a usage sketch combining some of them follows the list):

  • annotation: The annotation to use for the visualization. Values: frequency (the frequency annotation), performance (the performance annotation).
  • act_metric: The metric to use for the activities. Available values: events (number of events), unique_objects (number of unique objects), total_objects (number of total objects).
  • edge_metric: The metric to use for the edges. Available values: event_couples (number of event couples), unique_objects (number of unique objects), total_objects (number of total objects).
  • act_threshold: The threshold to apply on the activities frequency (default: 0). Only activities having a frequency >= this threshold are kept in the graph.
  • edge_threshold: The threshold to apply on the edges frequency (default: 0). Only edges having a frequency >= this threshold are kept in the graph.
  • performance_aggregation: The aggregation measure to use for the performance: mean, median, min, max, sum
  • format: The format of the output visualization (default: png)
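
For instance, a minimal sketch (reusing the example log from above; the metric and threshold values are arbitrary) that annotates the model with the number of unique objects and hides infrequent activities and edges:

import pm4py
import os

if __name__ == "__main__":
    ocel = pm4py.read_ocel(os.path.join("tests", "input_data", "ocel", "example_log.jsonocel"))
    ocdfg = pm4py.discover_ocdfg(ocel)
    # unique-objects annotation, keeping only activities/edges with frequency >= 2
    pm4py.view_ocdfg(ocdfg, annotation="frequency", act_metric="unique_objects", edge_metric="unique_objects",
                     act_threshold=2, edge_threshold=2, format="svg")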

OC-PN discovery

Object-centric Petri Nets (OC-PN) are formal models, discovered on top of the object-centric event logs, using an underlying process discovery algorithm (such as the Inductive Miner). They have been described in the scientific paper:

van der Aalst, Wil MP, and Alessandro Berti. "Discovering object-centric Petri nets." Fundamenta informaticae 175.1-4 (2020): 1-40.

In pm4py, we offer a basic implementation of object-centric Petri nets (without any additional decoration).

An example, in which an object-centric event log is loaded, the discovery algorithm is applied, and the OC-PN is visualized, is reported on the right.


import pm4py
import os

if __name__ == "__main__":
    ocel = pm4py.read_ocel(os.path.join("tests", "input_data", "ocel", "example_log.jsonocel"))
    model = pm4py.discover_oc_petri_net(ocel)
    pm4py.view_ocpn(model, format="svg")
                                

Object Graphs on OCEL

It is possible to capture the interaction between the different objects of an OCEL in different ways. In pm4py, we offer support for the computation of some object-based graphs:

  • The objects interaction graph connects two objects if they are related in some event of the log.
  • The objects descendants graph connects an object, which is related to an event but does not start its lifecycle with the given event, to all the objects that start their lifecycle with the given event.
  • The objects inheritance graph connects an object, which terminates its lifecycle with the given event, to all the objects that start their lifecycle with the given event.
  • The objects cobirth graph connects objects which start their lifecycle within the same event.
  • The objects codeath graph connects objects which complete their lifecycle within the same event.

The object interactions graph can be computed as follows:


import pm4py

if __name__ == "__main__":
    ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")
    from pm4py.algo.transformation.ocel.graphs import object_interaction_graph
    graph = object_interaction_graph.apply(ocel)
                                

The object descendants graph can be computed as follows:


import pm4py

if __name__ == "__main__":
    ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")
    from pm4py.algo.transformation.ocel.graphs import object_descendants_graph
    graph = object_descendants_graph.apply(ocel)
                                

The object inheritance graph can be computed as follows:


import pm4py

if __name__ == "__main__":
    ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")
    from pm4py.algo.transformation.ocel.graphs import object_inheritance_graph
    graph = object_inheritance_graph.apply(ocel)
                                

The object cobirth graph can be computed as follows:


import pm4py

if __name__ == "__main__":
    ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")
    from pm4py.algo.transformation.ocel.graphs import object_cobirth_graph
    graph = object_cobirth_graph.apply(ocel)
                                

The object codeath graph can be computed as follows:


import pm4py

if __name__ == "__main__":
    ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")
    from pm4py.algo.transformation.ocel.graphs import object_codeath_graph
    graph = object_codeath_graph.apply(ocel)
                                

Feature Extraction on OCEL - Object-Based

For machine learning purposes, we might want to create a feature matrix, which contains a row for every object of the object-centric event log.

The dimensions which can be considered for the computation of features are different:

  • The lifecycle of an object (the sequence of events in the log which are related to the object). From this dimension, several features, such as the length and the duration of the lifecycle, can be computed. Moreover, the sequence of the activities inside the lifecycle can be encoded; for example, the one-hot encoding of the activities can be considered (every activity is associated to a different column, and the number of events of the lifecycle having the given activity is reported).
  • Features extracted from the graphs computed on the OCEL (objects interaction graph, objects descendants graph, objects inheritance graph, objects cobirth/codeath graph). For every one of these, the number of objects connected to a given object is considered as a feature.
  • The number of objects having a lifecycle intersecting (on the time dimension) with the current object.
  • The one-hot-encoding of a specified collection of string attributes.
  • The encoding of the values of a specified collection of numeric attributes.

To compute the object-based features, the following command can be used (we would like to consider oattr1 as the only string attribute to one-hot-encode, and oattr2 as the only numeric attribute to encode). If no string/numeric attributes should be included, the parameters can be omitted.


import pm4py

if __name__ == "__main__":
    ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")
    from pm4py.algo.transformation.ocel.features.objects import algorithm
    data, feature_names = algorithm.apply(ocel,
                                        parameters={"str_obj_attr": ["oattr1"], "num_obj_attr": ["oattr2"]})
                                

Feature Extraction on OCEL - Event-Based

For machine learning purposes, we might want to create a feature matrix, which contains a row for every event of the object-centric event log.

The dimensions which can be considered for the computation of features are different:

  • The timestamp of the event. This can be encoded in different ways (absolute timestamp, hour of the day, day of the week, month).
  • The activity of the event. A one-hot encoding of the activity values can be performed.
  • The objects related to the event. Features such as the total number of related objects, the number of related objects per type, the number of objects which start their lifecycle with the current event, and the number of objects which complete their lifecycle with the current event can be considered.
  • The one-hot-encoding of a specified collection of string attributes.
  • The encoding of the values of a specified collection of numeric attributes.

To compute the event-based features, the following command can be used (we would like to consider prova as the only string attribute to one-hot-encode, and prova2 as the only numeric attribute to encode). If no string/numeric attributes should be included, the parameters can be omitted.


import pm4py

if __name__ == "__main__":
    ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")
    from pm4py.algo.transformation.ocel.features.events import algorithm
    data, feature_names = algorithm.apply(ocel,
                                        parameters={"str_obj_attr": ["prova"], "num_obj_attr": ["prova2"]})
                                

OCEL validation

The validation process permits recognising valid JSON-OCEL/XML-OCEL files before parsing them. This is done against a schema which describes the basic structure that JSON-OCEL and XML-OCEL files should follow.

The validation of a JSON-OCEL file is done as follows:


from pm4py.objects.ocel.validation import jsonocel

if __name__ == "__main__":
    validation_result = jsonocel.apply("tests/input_data/ocel/example_log.jsonocel", "tests/input_data/ocel/schema.json")
    print(validation_result)
                                

The validation of a XML-OCEL file is done as follows:


from pm4py.objects.ocel.validation import xmlocel

if __name__ == "__main__":
    validation_result = xmlocel.apply("tests/input_data/ocel/example_log.xmlocel", "tests/input_data/ocel/schema.xml")
    print(validation_result)
                                

Process Discovery

Process Discovery algorithms aim to find a suitable process model that describes the order of events/activities that are executed during a process execution.

In the following, we provide an overview of the advantages and disadvantages of the mining algorithms.

  • Alpha: cannot handle loops of length one and length two; invisible and duplicated tasks cannot be discovered; the discovered model might not be sound; weak against noise.
  • Alpha+: can handle loops of length one and length two; invisible and duplicated tasks cannot be discovered; the discovered model might not be sound; weak against noise.
  • Heuristic: takes frequency into account; detects short loops; does not guarantee a sound model.
  • Inductive: can handle invisible tasks; the discovered model is sound; the most used process mining algorithm.

Alpha Miner

The Alpha Miner is one of the best-known Process Discovery algorithms and is able to find:

  • A Petri net model where all the transitions are visible and unique and correspond to classified events (for example, to activities).
  • An initial marking that describes the status of the Petri net model when an execution starts.
  • A final marking that describes the status of the Petri net model when an execution ends.

We provide an example where a log is read, the Alpha algorithm is applied and the Petri net along with the initial and the final marking are found. The log we take as input is the running-example.xes.

First, the log has to be imported.

import os
import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests","input_data","running-example.xes"))

Subsequently, the Alpha Miner is applied.


if __name__ == "__main__":
    net, initial_marking, final_marking = pm4py.discover_petri_net_alpha(log)

Inductive Miner

In pm4py, we offer an implementation of the inductive miner (IM), of the inductive miner infrequent (IMf), and of the inductive miner directly-follows (IMd) algorithm.

The basic idea of the Inductive Miner is to detect a 'cut' in the log (e.g. sequential cut, parallel cut, concurrent cut and loop cut) and then recur on the sublogs obtained by applying the cut, until a base case is found. The Directly-Follows variant avoids the recursion on the sublogs and uses the Directly-Follows graph instead.

Inductive miner models usually make extensive use of hidden transitions, especially for skipping/looping on a portion of the model. Furthermore, each visible transition has a unique label (there are no transitions in the model that share the same label).

Two process models can be derived: Petri Net and Process Tree.

To mine a Petri Net, we provide an example. A log is read, the inductive miner is applied and the Petri net along with the initial and the final marking are found. The log we take as input is the running-example.xes. First, the log is read, then the inductive miner algorithm is applied.

import os
import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests","input_data","running-example.xes"))
    net, initial_marking, final_marking = pm4py.discover_petri_net_inductive(log)

To obtain a process tree, the provided code snippet can be used. The last line of code is responsible for the visualization of the process tree.


import pm4py

if __name__ == "__main__":
    tree = pm4py.discover_process_tree_inductive(log)

    pm4py.view_process_tree(tree)

It is also possible to convert a process tree into a Petri net.


import pm4py

if __name__ == "__main__":
    net, initial_marking, final_marking = pm4py.convert_to_petri_net(tree)

Heuristic Miner

Heuristics Miner is an algorithm that acts on the Directly-Follows Graph, providing a way to handle noise and to find common constructs (e.g., the dependency between two activities, AND constructs). The output of the Heuristics Miner is a Heuristics Net, i.e., an object that contains the activities and the relationships between them. The Heuristics Net can then be converted into a Petri net.

It is possible to obtain a Heuristic Net and a Petri Net.

To apply the Heuristics Miner to discover a Heuristics Net, it is necessary to import a log. Then, a Heuristics Net can be found. The most relevant parameters are listed in the table below.

import pm4py
import os

if __name__ == "__main__":
    log_path = os.path.join("tests", "compressed_input_data", "09_a32f0n00.xes.gz")
    log = pm4py.read_xes(log_path)

    heu_net = pm4py.discover_heuristics_net(log, dependency_threshold=0.99)
Parameter name Meaning
dependency_threshold dependency threshold of the Heuristics Miner (default: 0.5)
and_threshold AND measure threshold of the Heuristics Miner (default: 0.65)
loop_two_threshold threshold for the loops of length 2 (default: 0.5)

To visualize the Heuristic Net, code is also provided on the right-hand side.


import pm4py

if __name__ == "__main__":
    pm4py.view_heuristics_net(heu_net)
                                

To obtain a Petri net that is based on the Heuristics Miner, the code on the right-hand side can be used. This Petri net can also be visualized.

import pm4py

if __name__ == "__main__":
    net, im, fm = pm4py.discover_petri_net_heuristics(log, dependency_threshold=0.99)

    pm4py.view_petri_net(net, im, fm)

Directly-Follows Graph

Process models modeled using Petri nets have well-defined semantics: a process execution starts from the places included in the initial marking and finishes at the places included in the final marking. In this section, another class of process models, Directly-Follows Graphs, is introduced. Directly-Follows Graphs are graphs where the nodes represent the events/activities in the log and a directed edge is present between two nodes if there is at least one trace in the log where the source event/activity is followed by the target event/activity. On top of these directed edges, it is easy to represent metrics like frequency (counting the number of times the source event/activity is followed by the target event/activity) and performance (some aggregation, for example the mean, of the time elapsed between the two events/activities).

First, we have to import the log. Subsequently, we can extract the Directly-Follows Graph. In addition, code is provided to visualize the Directly-Follows Graph. This visualization is a colored visualization of the Directly-Follows graph that is decorated with the frequency of activities.

import os
import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests","input_data","running-example.xes"))
    dfg, start_activities, end_activities = pm4py.discover_dfg(log)
    pm4py.view_dfg(dfg, start_activities, end_activities)

To get a Directly-Follows graph decorated with the performance between the edges, two parameters of the previous code have to be replaced.

import os
import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests","input_data","running-example.xes"))
    performance_dfg, start_activities, end_activities = pm4py.discover_performance_dfg(log)
    pm4py.view_performance_dfg(performance_dfg, start_activities, end_activities)

To save the obtained DFG, for instance in the SVG format, code is also provided on the right-hand side.


import os
import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests","input_data","running-example.xes"))
    performance_dfg, start_activities, end_activities = pm4py.discover_performance_dfg(log)
    pm4py.save_vis_performance_dfg(performance_dfg, start_activities, end_activities, 'perf_dfg.svg')
                                

Adding information about Frequency/Performance

Similar to the Directly-Follows graph, it is also possible to decorate the Petri net with frequency or performance information. This is done by using a replay technique on the model and then assigning frequency/performance to the paths. The variant parameter of the visualizer specifies which annotation should be used. The values for the variant parameter are the following:

  • pn_visualizer.Variants.WO_DECORATION: This is the default value and indicates that the Petri net is not decorated.
  • pn_visualizer.Variants.FREQUENCY: This indicates that the model should be decorated according to frequency information obtained by applying replay.
  • pn_visualizer.Variants.PERFORMANCE: This indicates that the model should be decorated according to performance (aggregated by mean) information obtained by applying replay.

In case the frequency or performance decoration is chosen, it is required to pass the log as a parameter of the visualization (it needs to be replayed).

The code on the right-hand side can be used to obtain the Petri net mined by the Inductive Miner decorated with frequency information.


from pm4py.visualization.petri_net import visualizer as pn_visualizer

if __name__ == "__main__":
    parameters = {pn_visualizer.Variants.FREQUENCY.value.Parameters.FORMAT: "png"}
    gviz = pn_visualizer.apply(net, initial_marking, final_marking, parameters=parameters, variant=pn_visualizer.Variants.FREQUENCY, log=log)
    pn_visualizer.save(gviz, "inductive_frequency.png")

Correlation Miner

In Process Mining, we are used to having logs containing at least:

  • A case identifier
  • An activity
  • A timestamp

The case identifier associates an event, happening in a system, to a particular execution of the process. This makes it possible to apply algorithms such as process discovery, conformance checking, etc.

However, in some systems (for example, the data collected from IoT systems), it may be difficult to associate a case identifier to the events. On top of such logs, performing classic process mining is impossible. Correlation mining was born as a response to the challenge of extracting a process model from event logs without a case identifier, in order to read the useful information that they contain; such logs contain only:

  • An activity column
  • A timestamp column

In this description, we assume there is a total order on the events (i.e., no two events happen at the same timestamp). Situations where a total order is not defined are more complicated.

The Correlation Miner is an approach proposed in:

Pourmirza, Shaya, Remco Dijkman, and Paul Grefen. “Correlation miner: mining business process models and event correlations without case identifiers.” International Journal of Cooperative Information Systems 26.02 (2017): 1742002.

The approach aims to resolve this problem by solving an (integer) linear problem defined on top of:

  • The P/S matrix: expressing the relationship of order between the activities as recorded in the log.
  • The Duration matrix: expressing an aggregation of the duration between two activities, obtained by solving an optimization problem.

The solution of this problem provides a set of couples of activities that are, according to the approach, in directly-follows relationship, along with the strength of the relationship. This is the “frequency” DFG.

A “performance” DFG can be obtained by the duration matrix, keeping only the entries that appear in the solution of the problem (i.e., the couples of activities that appear in the “frequency” DFG).

This can be then visualized (using for example the pm4py DFG visualization).

To have a “realistic” example (for which we know the “real” DFG), we can take an existing log and simply remove the case ID column, trying then to reconstruct the DFG without it.

Let’s try an example of that. First, we load a CSV file into a Pandas dataframe, keeping only the concept:name and the time:timestamp columns:


import os
import pandas as pd
import pm4py

if __name__ == "__main__":
    df = pd.read_csv(os.path.join("tests", "input_data", "receipt.csv"))
    df = pm4py.format_dataframe(df)
    df = df[["concept:name", "time:timestamp"]]
                                
Then, we can apply the Correlation Miner approach:

from pm4py.algo.discovery.correlation_mining import algorithm as correlation_miner

if __name__ == "__main__":
    frequency_dfg, performance_dfg = correlation_miner.apply(df, parameters={"pm4py:param:activity_key": "concept:name",
                                    "pm4py:param:timestamp_key": "time:timestamp"})
                                
To better visualize the DFG, we can retrieve the frequency of the activities:

if __name__ == "__main__":
    activities_freq = dict(df["concept:name"].value_counts())
                                
And then perform the visualization of the DFG:

from pm4py.visualization.dfg import visualizer as dfg_visualizer

if __name__ == "__main__":
    gviz_freq = dfg_visualizer.apply(frequency_dfg, variant=dfg_visualizer.Variants.FREQUENCY, activities_count=activities_freq, parameters={"format": "svg"})
    gviz_perf = dfg_visualizer.apply(performance_dfg, variant=dfg_visualizer.Variants.PERFORMANCE, activities_count=activities_freq, parameters={"format": "svg"})
    dfg_visualizer.view(gviz_freq)
    dfg_visualizer.view(gviz_perf)
                                

Visualizing the DFGs, we can see that the correlation miner was able to discover a model where the main path is clear. Different variants of the correlation miner are available:

Variant Description
Variants.CLASSIC Calculates the P/S matrix and the duration matrix in the classic way (the entire list of events is used)
Variants.TRACE_BASED Calculates the P/S matrix and the duration matrix on a classic event log, trace-by-trace, and merges the results. The resolution of the linear problem permits obtaining a model that is more understandable than the classic DFG calculated on top of the log.
Variants.CLASSIC_SPLIT Calculates the P/S matrix and the duration matrix on the entire list of events, as in the classic version, but splits it into chunks to speed up the computation. Hence, the generated model is less accurate (in comparison to the CLASSIC version) but the calculation is faster. The default chunk size is 100000 events.
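
As a sketch, a different variant can be selected through the variant parameter of the apply method (reusing the dataframe from the previous example; the parameter keys are the same as above):

from pm4py.algo.discovery.correlation_mining import algorithm as correlation_miner

if __name__ == "__main__":
    # use the split-based variant, which trades some accuracy for speed
    frequency_dfg, performance_dfg = correlation_miner.apply(
        df,
        variant=correlation_miner.Variants.CLASSIC_SPLIT,
        parameters={"pm4py:param:activity_key": "concept:name",
                    "pm4py:param:timestamp_key": "time:timestamp"})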

Temporal Profile

We propose in pm4py an implementation of the temporal profile model. This has been described in:

Stertz, Florian, Jürgen Mangler, and Stefanie Rinderle-Ma. "Temporal Conformance Checking at Runtime based on Time-infused Process Models." arXiv preprint arXiv:2008.07262 (2020).

A temporal profile measures, for every couple of activities in the log, the average time and the standard deviation of the times between events having the provided activities. The time is measured between the completion of the first event and the start of the second event. Hence, it is assumed to work with an interval log where the events have two timestamps. The output of the temporal profile discovery is a dictionary where each couple of activities (expressed as a tuple) is associated to a couple of numbers: the first is the average and the second is the standard deviation.

We provide an example of discovery for the temporal profile.

We can load an event log, and apply the discovery algorithm.

import pm4py
from pm4py.algo.discovery.temporal_profile import algorithm as temporal_profile_discovery

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")
    temporal_profile = temporal_profile_discovery.apply(log)
                                

Some parameters can be used in order to customize the execution of the temporal profile:

Parameter Key Type Default Description
Parameters.ACTIVITY_KEY string concept:name The attribute to use as activity.
Parameters.START_TIMESTAMP_KEY string start_timestamp The attribute to use as start timestamp.
Parameters.TIMESTAMP_KEY string time:timestamp The attribute to use as timestamp.
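
For instance, a minimal sketch in which custom attribute keys are passed through the parameters dictionary (here we assume that the standard pm4py parameter constants from pm4py.util.constants correspond to the parameters listed above, and we reuse the completion timestamp as start timestamp for purely atomic events):

import pm4py
from pm4py.util import constants
from pm4py.algo.discovery.temporal_profile import algorithm as temporal_profile_discovery

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")
    parameters = {constants.PARAMETER_CONSTANT_ACTIVITY_KEY: "concept:name",
                  constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY: "time:timestamp",
                  constants.PARAMETER_CONSTANT_TIMESTAMP_KEY: "time:timestamp"}
    temporal_profile = temporal_profile_discovery.apply(log, parameters=parameters)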

Petri Net management

Petri nets are one of the most common formalisms to express a process model. A Petri net is a directed bipartite graph, in which the nodes represent transitions and places. Arcs connect places to transitions and transitions to places, and have an associated weight. A transition can fire if each of its input places contains a number of tokens that is at least equal to the weight of the arc connecting the place to the transition. When a transition fires, tokens are removed from the input places according to the weight of the input arcs, and are added to the output places according to the weight of the output arcs.

A marking is a state in the Petri net that associates each place to a number of tokens and is uniquely associated to a set of enabled transitions that could be fired according to the marking.

Process Discovery algorithms implemented in pm4py return a Petri net along with an initial marking and a final marking. The initial marking is the initial state of execution of a process; the final marking is a state that should be reached at the end of the execution of the process.

Importing and exporting

Petri nets, along with their initial and final marking, can be imported/exported from the PNML file format. The code on the right-hand side can be used to import a Petri net along with the initial and final marking.

First, the Petri net, along with its initial and final marking, is imported. Subsequently, the Petri net is visualized by using the Petri net visualizer. In addition, the Petri net is exported with its initial marking and final marking.

import os
import pm4py

if __name__ == "__main__":
    net, initial_marking, final_marking = pm4py.read_pnml(os.path.join("tests","input_data","running-example.pnml"))
    pm4py.view_petri_net(net, initial_marking, final_marking)

    pm4py.write_pnml(net, initial_marking, final_marking, "petri.pnml")

Petri Net properties

This section is about how to get the properties of a Petri net. A property of the Petri net is, for example, the set of transitions enabled in a particular marking. In addition, the lists of places, transitions, and arcs of the net can be inspected.

The list of transitions enabled in a particular marking can be obtained using the right-hand code.


from pm4py.objects.petri_net import semantics

if __name__ == "__main__":
    transitions = semantics.enabled_transitions(net, initial_marking)

Printing transitions reports that only the transition register request is enabled in the initial marking of the given Petri net. To obtain all places, transitions, and arcs of the Petri net, the code on the right-hand side can be used.


if __name__ == "__main__":
    places = net.places
    transitions = net.transitions
    arcs = net.arcs

Each place has a name and a set of input/output arcs (connected at source/target to a transition). Each transition has a name, a label and a set of input/output arcs (connected at source/target to a place). The code on the right-hand side prints, for each place, the name, and, for each input arc of the place, the name and the label of the corresponding transition. Analogously, trans.name, trans.label and arc.target.name are also available.


if __name__ == "__main__":
    for place in places:
     print("\nPLACE: "+place.name)
     for arc in place.in_arcs:
      print(arc.source.name, arc.source.label)
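
Beyond inspecting the enabled transitions, the semantics module can also fire an enabled transition to obtain the resulting marking. A minimal sketch, reusing the net and initial marking imported above:

from pm4py.objects.petri_net import semantics

if __name__ == "__main__":
    # fire one of the transitions enabled in the initial marking
    enabled = semantics.enabled_transitions(net, initial_marking)
    if enabled:
        t = list(enabled)[0]
        new_marking = semantics.execute(t, net, initial_marking)
        print(new_marking)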

Creating a new Petri Net

In this section, an overview of the code necessary to create a new Petri net with places, transitions, and arcs is provided. A Petri net object in pm4py should be created with a name.

The code on the right-hand side creates a Petri Net with the name new_petri_net.


# creating an empty Petri net
from pm4py.objects.petri_net.obj import PetriNet, Marking

if __name__ == "__main__":
    net = PetriNet("new_petri_net")

In addition, three places are created, namely source, sink, and p_1. These places are added to the previously created Petri Net.


if __name__ == "__main__":
    # creating source, p_1 and sink place
    source = PetriNet.Place("source")
    sink = PetriNet.Place("sink")
    p_1 = PetriNet.Place("p_1")
    # add the places to the Petri Net
    net.places.add(source)
    net.places.add(sink)
    net.places.add(p_1)

Similar to the places, transitions can be created. However, they need to be assigned a name and a label.


if __name__ == "__main__":
    # Create transitions
    t_1 = PetriNet.Transition("name_1", "label_1")
    t_2 = PetriNet.Transition("name_2", "label_2")
    # Add the transitions to the Petri Net
    net.transitions.add(t_1)
    net.transitions.add(t_2)

Arcs that connect places with transitions or transitions with places might be necessary. To add arcs, code is provided. The first parameter specifies the starting point of the arc, the second parameter its target and the last parameter states the Petri net it belongs to.

# Add arcs
if __name__ == "__main__":
    from pm4py.objects.petri_net.utils import petri_utils
    petri_utils.add_arc_from_to(source, t_1, net)
    petri_utils.add_arc_from_to(t_1, p_1, net)
    petri_utils.add_arc_from_to(p_1, t_2, net)
    petri_utils.add_arc_from_to(t_2, sink, net)

To complete the Petri net, an initial and possibly a final marking need to be defined. To accomplish this, we define the initial marking to contain 1 token in the source place and the final marking to contain 1 token in the sink place.

# Adding tokens
if __name__ == "__main__":
    initial_marking = Marking()
    initial_marking[source] = 1
    final_marking = Marking()
    final_marking[sink] = 1

The resulting Petri net along with the initial and final marking can be exported, or visualized.


import pm4py
if __name__ == "__main__":
    pm4py.write_pnml(net, initial_marking, final_marking, "createdPetriNet1.pnml")

    pm4py.view_petri_net(net, initial_marking, final_marking)

To obtain a specific output format (e.g. svg or png) a format parameter should be provided to the algorithm. The code snippet explains how to obtain an SVG representation of the Petri net. The last lines provide an option to save the visualization of the model.


import pm4py
if __name__ == "__main__":
    pm4py.view_petri_net(net, initial_marking, final_marking, format="svg")
    pm4py.save_vis_petri_net(net, initial_marking, final_marking, "net.svg")

Maximal Decomposition

The decomposition technique proposed in this section is useful for conformance checking purposes. Indeed, splitting the overall model into smaller models can reduce the size of the state space, hence increasing the performance of the conformance checking operation. We propose to use the decomposition technique (maximal decomposition of a Petri net) described in:

Van der Aalst, Wil MP. “Decomposing Petri nets for process mining: A generic approach.” Distributed and Parallel Databases 31.4 (2013): 471-507.

We can see an example of maximal decomposition applied to the Petri net extracted by the Alpha Miner from the Running Example log.

Let’s first load the running example log and apply the Alpha Miner.


import os
import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "running-example.xes"))
    net, im, fm = pm4py.discover_petri_net_alpha(log)
                                

Then, the decomposition can be found using:


from pm4py.objects.petri_net.utils.decomposition import decompose

if __name__ == "__main__":
    list_nets = decompose(net, im, fm)
                                

If we want to visualize each one of the Petri nets, we can use a for loop:


import pm4py

if __name__ == "__main__":
    for index, model in enumerate(list_nets):
        subnet, s_im, s_fm = model

        pm4py.save_vis_petri_net(subnet, s_im, s_fm, str(index)+".png")
                                

A log that is fit according to the original model is also fit (projecting on the activities of the net) for these nets. Conversely, any deviation on top of these models represents a deviation also on the original model.

Reachability Graph

A reachability graph is a transition system that can be constructed on any Petri net along with an initial marking, and is the graph of all the markings reachable in the Petri net. Two markings are connected by one edge for every transition whose firing leads from the first marking to the second.

The main goal of the reachability graph is to provide an understanding of the state space of the Petri net. Usually, Petri nets containing a lot of concurrency have an incredibly big reachability graph; for such models, the computation of the reachability graph may even be infeasible.

The calculation of the reachability graph, having the Petri net and the initial marking, can be done with the following code:

from pm4py.objects.petri_net.utils import reachability_graph

if __name__ == "__main__":
    ts = reachability_graph.construct_reachability_graph(net, im)
                                
The visualization of the reachability graph is then possible through the code snippet:

from pm4py.visualization.transition_system import visualizer as ts_visualizer

if __name__ == "__main__":
    gviz = ts_visualizer.apply(ts, parameters={ts_visualizer.Variants.VIEW_BASED.value.Parameters.FORMAT: "svg"})
    ts_visualizer.view(gviz)
                                

Petri Nets with Reset / Inhibitor arcs

The support for Petri nets with reset / inhibitor arcs is provided through the arctype property of a PetriNet.Arc object. In particular, the arctype property can assume two different values:

  • inhibitor: defines an inhibitor arc. An inhibitor arc blocks the firing of the transition to which it is connected whenever there is a token in its source place.
  • reset: defines a reset arc. A reset arc removes all the tokens from its source place whenever the target transition is fired.

The corresponding semantics, which is identical in signature to the classic semantics of Petri nets, is defined in pm4py.objects.petri_net.inhibitor_reset.semantics.
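
A minimal sketch of how such a net could be built (here we assume that the arctype is stored in the properties dictionary of the arc, and we use the inhibitor/reset semantics module mentioned above):

from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.objects.petri_net.utils import petri_utils
from pm4py.objects.petri_net.inhibitor_reset import semantics as ir_semantics

if __name__ == "__main__":
    net = PetriNet("inhibitor_example")
    p_in = PetriNet.Place("p_in")
    p_out = PetriNet.Place("p_out")
    p_block = PetriNet.Place("p_block")
    t = PetriNet.Transition("t", "activity A")
    net.places.update({p_in, p_out, p_block})
    net.transitions.add(t)
    petri_utils.add_arc_from_to(p_in, t, net)
    petri_utils.add_arc_from_to(t, p_out, net)
    # assumption: the arctype is set through the properties dictionary of the arc
    inhibitor_arc = petri_utils.add_arc_from_to(p_block, t, net)
    inhibitor_arc.properties["arctype"] = "inhibitor"

    marking = Marking()
    marking[p_in] = 1
    # t is enabled here because p_block is empty; a token in p_block would block it
    print(ir_semantics.enabled_transitions(net, marking))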

Data Petri nets

Data Petri nets include the execution context in the marking object, in such a way that the execution of a transition may depend on the value of this execution context, and not only on the tokens. Data Petri nets are defined extensively in the following scientific contribution:

Mannhardt, Felix, et al. "Balanced multi-perspective checking of process conformance." Computing 98.4 (2016): 407-437.

The semantics of a data Petri net requires the specification of the execution context (as a dictionary associating attribute keys to values), and is defined in pm4py.objects.petri_net.data_petri_nets.semantics. In particular, the following methods require the execution context:

  • semantics.enabled_transitions(pn, m, e): checks the enabled transitions in the provided Petri net pn and marking m when the execution context is updated with the information coming from the current event.
  • semantics.execute(t, pn, m, e): executes (where possible) the transition t in the marking m where the execution context is updated with the information coming from the current event.
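
A minimal sketch of how the execution context could be provided (assuming a data Petri net net with an initial marking initial_marking is available; the attribute names in the execution context are purely illustrative):

from pm4py.objects.petri_net.data_petri_nets import semantics as dpn_semantics

if __name__ == "__main__":
    # illustrative execution context: the current values of the data attributes
    execution_context = {"amount": 500, "resource": "Mike"}
    enabled = dpn_semantics.enabled_transitions(net, initial_marking, execution_context)
    if enabled:
        t = list(enabled)[0]
        new_marking = dpn_semantics.execute(t, net, initial_marking, execution_context)
        print(new_marking)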

Conformance Checking

Conformance checking is a technique to compare a process model with an event log of the same process. The goal is to check whether the event log conforms to the model, and vice versa.

In pm4py, two fundamental techniques are implemented: token-based replay and alignments.

Token-based replay

Token-based replay matches a trace and a Petri net model, starting from the initial marking, in order to discover which transitions are executed and in which places we have remaining or missing tokens for the given process instance. Token-based replay is useful for conformance checking: indeed, a trace is fitting according to the model if, during its execution, the transitions can be fired without the need to insert any missing token. If reaching the final marking is imposed, then a trace is fitting if it reaches the final marking without any missing or remaining tokens.

For each trace, four values have to be determined: produced tokens (p), remaining tokens (r), missing tokens (m), and consumed tokens (c). Based on these, a formula can be derived, whereby a Petri net (n) and a trace (t) are given as input:

fitness(n, t) = 0.5 * (1 - r/p) + 0.5 * (1 - m/c)

To apply the formula to the whole event log, p, r, m, and c are calculated for each trace, summed up, and finally placed into the formula above.
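
As a purely illustrative computation with made-up totals:

if __name__ == "__main__":
    # made-up totals over all traces of a log
    p, r, m, c = 100, 5, 3, 98
    fitness = 0.5 * (1 - r / p) + 0.5 * (1 - m / c)
    print(fitness)  # 0.5 * 0.95 + 0.5 * (95 / 98), roughly 0.96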

In pm4py there is an implementation of a token replayer that is able to go across hidden transitions (calculating shortest paths between places) and can be used with any Petri net model with unique visible transitions and hidden transitions. When a visible transition needs to be fired and not all places in the preset are provided with the correct number of tokens, starting from the current marking it is checked if for some place there is a sequence of hidden transitions that could be fired in order to enable the visible transition. The hidden transitions are then fired and a marking that permits to enable the visible transition is reached.

The example on the right shows how to apply token-based replay on a log and a Petri net. First, the log is loaded. Then, the Alpha Miner is applied in order to discover a Petri net. Eventually, the token-based replay is applied. The output of the token-based replay, stored in the variable replayed_traces, contains for each trace of the log:
  • trace_is_fit: boolean value (True/False) that is True when the trace conforms to the model.
  • activated_transitions: list of transitions activated in the model by the token-based replay.
  • reached_marking: marking reached at the end of the replay.
  • missing_tokens: number of missing tokens.
  • consumed_tokens: number of consumed tokens.
  • remaining_tokens: number of remaining tokens.
  • produced_tokens: number of produced tokens.
import os
import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "running-example.xes"))

    net, initial_marking, final_marking = pm4py.discover_petri_net_alpha(log)

    replayed_traces = pm4py.conformance_diagnostics_token_based_replay(log, net, initial_marking, final_marking)
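
A sketch of how the per-trace diagnostics listed above can be inspected (the dictionary keys correspond to the fields described in this section):

if __name__ == "__main__":
    for tr in replayed_traces:
        # for every trace: whether it fits, and the token counts of the replay
        print(tr["trace_is_fit"], tr["missing_tokens"], tr["remaining_tokens"],
              tr["produced_tokens"], tr["consumed_tokens"])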

Diagnostics (TBR)

The execution of token-based replay in pm4py permits obtaining detailed information about transitions that did not execute correctly, or activities that are in the log but not in the model. In particular, executions that do not match the model are expected to have a longer throughput time.

The diagnostics that are provided by pm4py are the following:

  • Throughput analysis on the transitions that are executed in an unfit way according to the process model (the Petri net).
  • Throughput analysis on the activities that are not contained in the model.
  • Root Cause Analysis on the causes that lead to an unfit execution of the transitions.
  • Root Cause Analysis on the causes that lead to executing activities that are not contained in the process model.

To provide an execution context for the examples, a log must be loaded, and a model that is not perfectly fitting is required. To load the log, the following instructions could be used:


import os
import pm4py
if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "receipt.xes"))
                                

To create an unfit model, a filtering operation producing a log where only part of the behavior is kept can be executed:


import pm4py
if __name__ == "__main__":
    filtered_log = pm4py.filter_variants_top_k(log, 3)
                                

Then, applying the Inductive Miner algorithm:


import pm4py
if __name__ == "__main__":
    net, initial_marking, final_marking = pm4py.discover_petri_net_inductive(filtered_log)
                                

We then apply the token-based replay with special settings. In particular, with disable_variants set to True we avoid replaying only one case per variant; with enable_pltr_fitness set to True we tell the algorithm to return localized conformance checking information (fitness at the level of places and transitions).


from pm4py.algo.conformance.tokenreplay import algorithm as token_based_replay
if __name__ == "__main__":
    parameters_tbr = {token_based_replay.Variants.TOKEN_REPLAY.value.Parameters.DISABLE_VARIANTS: True, token_based_replay.Variants.TOKEN_REPLAY.value.Parameters.ENABLE_PLTR_FITNESS: True}
    replayed_traces, place_fitness, trans_fitness, unwanted_activities = token_based_replay.apply(log, net,
                                                                                                  initial_marking,
                                                                                                  final_marking,
                                                                                                  parameters=parameters_tbr)
                                

Then, we move on to the diagnostics information.

Throughput analysis (unfit execution)

To perform throughput analysis on the transitions that were executed in an unfit way, and then print the result on the console, the following code could be used:


from pm4py.algo.conformance.tokenreplay.diagnostics import duration_diagnostics
if __name__ == "__main__":
    trans_diagnostics = duration_diagnostics.diagnose_from_trans_fitness(log, trans_fitness)
    for trans in trans_diagnostics:
        print(trans, trans_diagnostics[trans])
                                

The output makes clear that unfit executions lead to much higher throughput times (from 126 to 146 times higher).

Throughput analysis (activities)

To perform throughput analysis on the process executions containing activities that are not in the model, and then print the result on the screen, the following code could be used:


from pm4py.algo.conformance.tokenreplay.diagnostics import duration_diagnostics
if __name__ == "__main__":
    act_diagnostics = duration_diagnostics.diagnose_from_notexisting_activities(log, unwanted_activities)
    for act in act_diagnostics:
        print(act, act_diagnostics[act])
                                

Root Cause Analysis

The output of root cause analysis in the diagnostics context is a decision tree that permits understanding the causes of a deviation. In the following examples, for each deviation, a different decision tree is built and visualized.

The following examples, which consider the Receipt log, build the decision trees on the following choice of attributes (i.e., only the org:group attribute is considered).


if __name__ == "__main__":
    # build decision trees
    string_attributes = ["org:group"]
    numeric_attributes = []
    parameters = {"string_attributes": string_attributes, "numeric_attributes": numeric_attributes}
                                

Root Cause Analysis (unfit execution)

To perform root cause analysis on the transitions that are executed in an unfit way, the following code could be used:


from pm4py.algo.conformance.tokenreplay.diagnostics import root_cause_analysis
if __name__ == "__main__":
    trans_root_cause = root_cause_analysis.diagnose_from_trans_fitness(log, trans_fitness, parameters=parameters)
                                

To visualize the decision trees obtained by root cause analysis, the following code could be used:


from pm4py.visualization.decisiontree import visualizer as dt_vis

if __name__ == "__main__":
    for trans in trans_root_cause:
        clf = trans_root_cause[trans]["clf"]
        feature_names = trans_root_cause[trans]["feature_names"]
        classes = trans_root_cause[trans]["classes"]
        # visualization could be called
        gviz = dt_vis.apply(clf, feature_names, classes)
        dt_vis.view(gviz)
                                

Root Cause Analysis (activities that are not in the model)

To perform root cause analysis on activities that are executed but are not in the process model, the following code could be used:


from pm4py.algo.conformance.tokenreplay.diagnostics import root_cause_analysis
if __name__ == "__main__":
    act_root_cause = root_cause_analysis.diagnose_from_notexisting_activities(log, unwanted_activities,
                                                                              parameters=parameters)
                                

To visualize the decision trees obtained by root cause analysis, the following code could be used:


from pm4py.visualization.decisiontree import visualizer as dt_vis
if __name__ == "__main__":
    for act in act_root_cause:
        clf = act_root_cause[act]["clf"]
        feature_names = act_root_cause[act]["feature_names"]
        classes = act_root_cause[act]["classes"]
        # visualization could be called
        gviz = dt_vis.apply(clf, feature_names, classes)
        dt_vis.view(gviz)
                                

Alignments

pm4py comes with the following set of linear solvers: Scipy (available for any platform) and CVXOPT (available for the most widely used platforms, including Windows/Linux). Alternatively, ORTools can also be installed from pip and used.

Alignment-based replay aims to find one of the best alignments between the trace and the model. For each trace, the output of an alignment is a list of couples where the first element is an event (of the trace) or » and the second element is a transition (of the model) or ». For each couple, the following classification can be provided:

  • Sync move: the classification of the event corresponds to the transition label; in this case, both the trace and the model advance in the same way during the replay.
  • Move on log: for couples where the second element is », it corresponds to a replay move in the trace that is not mimicked in the model. This kind of move is unfit and signals a deviation between the trace and the model.
  • Move on model: for couples where the first element is », it corresponds to a replay move in the model that is not mimicked in the trace. For moves on model, we can have the following distinction:
    • Moves on model involving hidden transitions: in this case, even if it is not a sync move, the move is fit.
    • Moves on model not involving hidden transitions: in this case, the move is unfit and signals a deviation between the trace and the model.

First, we have to import the log. Subsequently, we apply the Inductive Miner on the imported log. In addition, we compute the alignments.

import os
import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "running-example.xes"))

    net, initial_marking, final_marking = pm4py.discover_petri_net_inductive(log)

    import pm4py
    aligned_traces = pm4py.conformance_diagnostics_alignments(log, net, initial_marking, final_marking)

To inspect the alignments, a code snippet is provided. The output (a list) reports for each trace the corresponding alignment along with its statistics. With each trace, a dictionary containing, among others, the following information is associated:

  • alignment: contains the alignment (sync moves, moves on log, moves on model)
  • cost: contains the cost of the alignment according to the provided cost function
  • fitness: is equal to 1 if the trace is perfectly fitting
print(aligned_traces)
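
For instance, the fitness and the list of moves of each aligned trace can be printed as follows (the dictionary keys correspond to the fields described above):

if __name__ == "__main__":
    for aligned in aligned_traces:
        # fitness is equal to 1 for perfectly fitting traces
        print(aligned["fitness"], aligned["alignment"])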

To use a different classifier, we refer to the Classifier section. However, the following code defines a custom classifier for each event of each trace in the log.


if __name__ == "__main__":
    for trace in log:
     for event in trace:
      event["customClassifier"] = event["concept:name"] + event["concept:name"]

A parameters dictionary containing the activity key can be formed.


# define the activity key in the parameters
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
from pm4py.algo.conformance.alignments.petri_net import algorithm as alignments
from pm4py.objects.conversion.process_tree import converter as process_tree_converter
parameters = {"pm4py:param:activity_key": "customClassifier"}

Then, a process model is computed, and alignments are also calculated. Besides, the fitness value is calculated and the resulting values are printed.

# calculate process model using the given classifier
if __name__ == "__main__":
    process_tree = inductive_miner.apply_tree(log, parameters=parameters)
    net, initial_marking, final_marking = process_tree_converter.apply(process_tree)
    aligned_traces = alignments.apply_log(log, net, initial_marking, final_marking, parameters=parameters)

    from pm4py.algo.evaluation.replay_fitness import algorithm as replay_fitness
    log_fitness = replay_fitness.evaluate(aligned_traces, variant=replay_fitness.Variants.ALIGNMENT_BASED)

    print(log_fitness)

It is also possible to select other parameters for the alignments.

  • Model cost function: associating to each transition in the Petri net the corresponding cost of a move-on-model.
  • Sync cost function: associating to each visible transition in the Petri net the cost of a sync move.

On the right-hand side, an implementation of a custom model cost function and sync cost function can be noted. The model cost function and the sync cost function have to be inserted later in the parameters. Subsequently, the replay is done.


if __name__ == "__main__":
    model_cost_function = dict()
    sync_cost_function = dict()
    for t in net.transitions:
     # if the label is not None, we have a visible transition
     if t.label is not None:
      # associate cost 1000 to each move-on-model associated to visible transitions
      model_cost_function[t] = 1000
      # associate cost 0 to each move-on-log
      sync_cost_function[t] = 0
     else:
      # associate cost 1 to each move-on-model associated to hidden transitions
      model_cost_function[t] = 1

    parameters = {}
    parameters[alignments.Variants.VERSION_STATE_EQUATION_A_STAR.value.Parameters.PARAM_MODEL_COST_FUNCTION] = model_cost_function
    parameters[alignments.Variants.VERSION_STATE_EQUATION_A_STAR.value.Parameters.PARAM_SYNC_COST_FUNCTION] = sync_cost_function

    aligned_traces = alignments.apply_log(log, net, initial_marking, final_marking, parameters=parameters)

Decomposition of Alignments

Alignments represent a computationally expensive problem on models that contain a lot of concurrency. Yet, they are the conformance checking technique that provides the best results in terms of finding a match between the process execution(s) and the model. To overcome the difficulties related to the size of the state space, various attempts have been made to decompose the model into “smaller” pieces, on which the alignment is easier and which still permit diagnosing problems.

We have seen how to obtain a maximal decomposition of the Petri net model. Now we can see how to perform the decomposition of alignments (that is based on a maximal decomposition of the Petri net model). The approach described here has been published in:

Lee, Wai Lam Jonathan, et al. “Recomposing conformance: Closing the circle on decomposed alignment-based conformance checking in process mining.” Information Sciences 466 (2018): 55-91.

The recomposition permits understanding whether each step of the process has been executed in a sync way or whether some deviations happened. First, an alignment is performed on top of the decomposed Petri nets.

Then, the agreement between the activities at the border is checked. If a disagreement is found, the two components that are disagreeing are merged and the alignment is repeated on them.

When the steps are agreeing between the different alignments of the components, these can be merged in a single alignment. The order of recomposition is based on the Petri net graph. Despite that, in the case of concurrency, the “recomposed” alignment contains a valid list of moves that may not be in the correct order.

To perform alignments through decomposition/recomposition, the following code can be used. A maximum number of border disagreements can be provided to the algorithm. If the maximum number of border disagreements is reached, then the alignment is interrupted and None is returned as the alignment of the specific trace.


from pm4py.algo.conformance.alignments.decomposed import algorithm as decomp_alignments

if __name__ == "__main__":
    conf = decomp_alignments.apply(log, net, initial_marking, final_marking, parameters={decomp_alignments.Variants.RECOMPOS_MAXIMAL.value.Parameters.PARAM_THRESHOLD_BORDER_AGREEMENT: 2})

Since decomposed models are expected to have less concurrency, the components are aligned using a Dijkstra approach. In the case of border disagreements, this can degrade the performance of the algorithm.

It should be noted that this is not an approximation technique; according to the authors, it should provide the same fitness as the original alignments.

Since the alignment is recomposed, we can use the fitness evaluator to evaluate the fitness (that is not related to the computation of fitness described in the paper).

from pm4py.algo.evaluation.replay_fitness import algorithm as rp_fitness_evaluator

if __name__ == "__main__":
    fitness = rp_fitness_evaluator.evaluate(conf, variant=rp_fitness_evaluator.Variants.ALIGNMENT_BASED)
                                

Footprints

Footprints are a very basic (but scalable) conformance checking technique to compare entities (such as event logs, DFGs, Petri nets, process trees, and other kinds of models). Essentially, a relationship between any couple of activities of the log/model is inferred. This can include:

  • Directly-Follows Relationships: in the log/model, it is possible that the activity A is directly followed by B.
  • Directly-Before Relationships: in the log/model, it is possible that the activity B is directly preceded by A.
  • Parallel behavior: it is possible that A is followed by B and B is followed by A

A footprints matrix can be calculated, which describes for each couple of activities the footprint relationship. It is possible to calculate it for different types of models and for the entire event log, but also trace-by-trace (if the local behavior is important).

Let’s assume that the running-example.xes event log is loaded:

import pm4py
import os
if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "running-example.xes"))
And the inductive miner is applied on such log:

if __name__ == "__main__":
    net, im, fm = pm4py.discover_petri_net_inductive(log)
To calculate the footprints for the entire log, the following code can be used:

from pm4py.algo.discovery.footprints import algorithm as footprints_discovery

if __name__ == "__main__":
    fp_log = footprints_discovery.apply(log, variant=footprints_discovery.Variants.ENTIRE_EVENT_LOG)

The footprints of the entire log are:

{‘sequence’: {(‘examine casually’, ‘decide’), (‘decide’, ‘pay compensation’), (‘register request’, ‘examine thoroughly’), (‘reinitiate request’, ‘examine casually’), (‘check ticket’, ‘decide’), (‘register request’, ‘examine casually’), (‘reinitiate request’, ‘examine thoroughly’), (‘decide’, ‘reject request’), (‘examine thoroughly’, ‘decide’), (‘reinitiate request’, ‘check ticket’), (‘register request’, ‘check ticket’), (‘decide’, ‘reinitiate request’)}, ‘parallel’: {(‘examine casually’, ‘check ticket’), (‘check ticket’, ‘examine casually’), (‘check ticket’, ‘examine thoroughly’), (‘examine thoroughly’, ‘check ticket’)}, ‘start_activities’: {‘register request’}, ‘end_activities’: {‘pay compensation’, ‘reject request’}, ‘activities’: {‘reject request’, ‘register request’, ‘check ticket’, ‘decide’, ‘pay compensation’, ‘examine thoroughly’, ‘examine casually’, ‘reinitiate request’}}

The data structure is a dictionary with, as keys, sequence (expressing directly-follows relationships) and parallel (expressing the parallel behavior that can happen in either way).

The footprints of the log, trace-by-trace, can be calculated as follows, and are a list of footprints for each trace:

from pm4py.algo.discovery.footprints import algorithm as footprints_discovery

if __name__ == "__main__":
    fp_trace_by_trace = footprints_discovery.apply(log, variant=footprints_discovery.Variants.TRACE_BY_TRACE)
The footprints of the Petri net model can be calculated as follows:

if __name__ == "__main__":
    fp_net = footprints_discovery.apply(net, im, fm)

And are the following:

{‘sequence’: {(‘check ticket’, ‘decide’), (‘reinitiate request’, ‘examine casually’), (‘register request’, ‘examine thoroughly’), (‘decide’, ‘reject request’), (‘register request’, ‘check ticket’), (‘register request’, ‘examine casually’), (‘decide’, ‘reinitiate request’), (‘reinitiate request’, ‘examine thoroughly’), (‘decide’, ‘pay compensation’), (‘reinitiate request’, ‘check ticket’), (‘examine casually’, ‘decide’), (‘examine thoroughly’, ‘decide’)}, ‘parallel’: {(‘check ticket’, ‘examine thoroughly’), (‘examine thoroughly’, ‘check ticket’), (‘check ticket’, ‘examine casually’), (‘examine casually’, ‘check ticket’)}, ‘activities’: {‘decide’, ‘examine casually’, ‘reinitiate request’, ‘check ticket’, ‘examine thoroughly’, ‘register request’, ‘reject request’, ‘pay compensation’}, ‘start_activities’: {‘register request’}}

The data structure is a dictionary with, as keys, sequence (expressing directly-follows relationships) and parallel (expressing the parallel behavior that can happen in either way).

It is possible to visualize a comparison between the footprints of the (entire) log and the footprints of the (entire) model.

First of all, let’s see how to visualize a single footprints table, for example the one of the model. The following code can be used:

from pm4py.visualization.footprints import visualizer as fp_visualizer

if __name__ == "__main__":
    gviz = fp_visualizer.apply(fp_net, parameters={fp_visualizer.Variants.SINGLE.value.Parameters.FORMAT: "svg"})
    fp_visualizer.view(gviz)
To compare the two footprints tables, the following code can be used. Please note that the visualization will look the same if no deviations are discovered. If deviations are found, they are colored in red.

from pm4py.visualization.footprints import visualizer as fp_visualizer

if __name__ == "__main__":
    gviz = fp_visualizer.apply(fp_log, fp_net, parameters={fp_visualizer.Variants.COMPARISON.value.Parameters.FORMAT: "svg"})
    fp_visualizer.view(gviz)
To actually find some deviations, let’s repeat the procedure on the receipt.xes log, applying a heavy filter on the log to discover a simpler model:

import pm4py
import os
from copy import deepcopy

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "receipt.xes"))
    filtered_log = pm4py.filter_variants_top_k(log, 3)

    net, im, fm = pm4py.discover_petri_net_inductive(filtered_log)
After recomputing, as shown before, the footprints of the filtered log (trace-by-trace) and of the new model, we can use a conformance checking operation to compare the behavior of the traces of the log against the footprints of the model. This can be done using the following code:

if __name__ == "__main__":
    conf_fp = pm4py.conformance_diagnostics_footprints(fp_trace_by_trace, fp_net)

The result will contain, for each trace of the log, a set with the deviations. An extract of the list for some traces:

{(‘T06 Determine necessity of stop advice’, ‘T04 Determine confirmation of receipt’), (‘T02 Check confirmation of receipt’, ‘T06 Determine necessity of stop advice’)}
set()
{(‘T19 Determine report Y to stop indication’, ‘T20 Print report Y to stop indication’), (‘T10 Determine necessity to stop indication’, ‘T16 Report reasons to hold request’), (‘T16 Report reasons to hold request’, ‘T17 Check report Y to stop indication’), (‘T17 Check report Y to stop indication’, ‘T19 Determine report Y to stop indication’)}
set()
set()
{(‘T02 Check confirmation of receipt’, ‘T06 Determine necessity of stop advice’), (‘T10 Determine necessity to stop indication’, ‘T04 Determine confirmation of receipt’), (‘T04 Determine confirmation of receipt’, ‘T03 Adjust confirmation of receipt’), (‘T03 Adjust confirmation of receipt’, ‘T02 Check confirmation of receipt’)}
set()

We can see that for the first trace that contains deviations, there are two deviations, the first related to T06 Determine necessity of stop advice being executed before T04 Determine confirmation of receipt; the second related to T02 Check confirmation of receipt being followed by T06 Determine necessity of stop advice.

The traces for which the conformance returns nothing are fit (at least according to the footprints).

Footprints conformance checking is a way to identify obvious deviations, i.e., behavior of the log that is not allowed by the model.

On the log side, the computation of the footprints scales very well. The calculation of the footprints for a Petri net model, instead, may be more expensive.

If we change the underlying model from a Petri net to a process tree, it is possible to exploit its bottom-up structure in order to calculate the footprints almost instantaneously.

Let’s open a log, calculate a process tree and then apply the discovery of the footprints. We open the running-example log:

import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")
And apply the inductive miner to discover a process tree:

if __name__ == "__main__":
    tree = pm4py.discover_process_tree_inductive(log)
Then, the footprints can be discovered. We discover the footprints on the entire log, we discover the footprints trace-by-trace in the log, and then we discover the footprints on the process tree:

from pm4py.algo.discovery.footprints import algorithm as fp_discovery

if __name__ == "__main__":
    fp_log = fp_discovery.apply(log, variant=fp_discovery.Variants.ENTIRE_EVENT_LOG)
    fp_trace_trace = fp_discovery.apply(log, variant=fp_discovery.Variants.TRACE_BY_TRACE)
    fp_tree = fp_discovery.apply(tree, variant=fp_discovery.Variants.PROCESS_TREE)

Each one of these contains:

  • A list of sequential footprints contained in the log/allowed by the model
  • A list of parallel footprints contained in the log/allowed by the model
  • A list of activities contained in the log/allowed by the model
  • A list of start activities contained in the log/allowed by the model
  • A list of end activities contained in the log/allowed by the model
It is possible to execute an enhanced conformance checking between the footprints of the (entire) log, and the footprints of the model, by doing:

from pm4py.algo.conformance.footprints import algorithm as fp_conformance

if __name__ == "__main__":
    conf_result = fp_conformance.apply(fp_log, fp_tree, variant=fp_conformance.Variants.LOG_EXTENSIVE)

The result contains, for each item of the previous list, the violations.

Given the result of conformance checking, it is possible to calculate the footprints-based fitness and precision of the process model, by doing:

from pm4py.algo.conformance.footprints.util import evaluation

if __name__ == "__main__":
    fitness = evaluation.fp_fitness(fp_log, fp_tree, conf_result)
    precision = evaluation.fp_precision(fp_log, fp_tree)

These values are both included in the interval [0,1].

Log Skeleton

The concept of log skeleton has been described in the contribution

Verbeek, H. M. W., and R. Medeiros de Carvalho. “Log skeletons: A classification approach to process discovery.” arXiv preprint arXiv:1806.08247 (2018).

It is claimed to be the most accurate classification approach to decide whether a trace belongs to (the language of) a log or not.

For a log, an object containing the following relations is calculated:

  • Equivalence: contains the couples of activities that ALWAYS happen with the same frequency inside a trace.
  • Always-after: contains the couples of activities (A,B) such that an occurrence of A is ALWAYS followed, at some point in the future of the trace, by an occurrence of B.
  • Always-before: contains the couples of activities (B,A) such that an occurrence of B is ALWAYS preceded, at some point in the past of the trace, by an occurrence of A.
  • Never-together: contains the couples of activities (A,B) that NEVER happen together in the history of the trace.
  • Directly-follows: contains the list of directly-follows relations of the log.
  • For each activity, the number of possible occurrences per trace.

It is also possible to provide a noise threshold. In that case, more relations are found since the conditions are relaxed.

Let’s suppose to take the running-example.xes log:

import pm4py
import os
if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "running-example.xes"))
Then, we can calculate the log skeleton:

from pm4py.algo.discovery.log_skeleton import algorithm as lsk_discovery
if __name__ == "__main__":
    skeleton = lsk_discovery.apply(log, parameters={lsk_discovery.Variants.CLASSIC.value.Parameters.NOISE_THRESHOLD: 0.0})

We can also print that:

{‘equivalence’: {(‘pay compensation’, ‘register request’), (‘examine thoroughly’, ‘register request’), (‘reject request’, ‘register request’), (‘pay compensation’, ‘examine casually’)}, ‘always_after’: {(‘register request’, ‘check ticket’), (‘examine thoroughly’, ‘decide’), (‘register request’, ‘decide’)}, ‘always_before’: {(‘pay compensation’, ‘register request’), (‘pay compensation’, ‘decide’), (‘pay compensation’, ‘check ticket’), (‘reject request’, ‘decide’), (‘pay compensation’, ‘examine casually’), (‘reject request’, ‘check ticket’), (‘examine thoroughly’, ‘register request’), (‘reject request’, ‘register request’)}, ‘never_together’: {(‘pay compensation’, ‘reject request’), (‘reject request’, ‘pay compensation’)}, ‘directly_follows’: set(), ‘activ_freq’: {‘register request’: {1}, ‘examine casually’: {0, 1, 3}, ‘check ticket’: {1, 2, 3}, ‘decide’: {1, 2, 3}, ‘reinitiate request’: {0, 1, 2}, ‘examine thoroughly’: {0, 1}, ‘pay compensation’: {0, 1}, ‘reject request’: {0, 1}}}

We can see the relations (equivalence, always_after, always_before, never_together, directly_follows, activ_freq) as keys of the object, and the values are the activities/couples of activities that follow such patterns.
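For example, a single relation of the log skeleton can be accessed through its key:

if __name__ == "__main__":
    # couples of activities in the always-after relation
    print(skeleton["always_after"])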

To see how the log skeleton really works for classification/conformance purposes, let's change to another log (the receipt.xes log) and calculate a heavily filtered version of it (to have less behavior):

import pm4py
import os
if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "receipt.xes"))
    filtered_log = pm4py.filter_variants_top_k(log, 3)
Calculate the log skeleton on top of the filtered log, and then apply the classification as follows:

from pm4py.algo.discovery.log_skeleton import algorithm as lsk_discovery
from pm4py.algo.conformance.log_skeleton import algorithm as lsk_conformance
if __name__ == "__main__":
    skeleton = lsk_discovery.apply(filtered_log, parameters={lsk_discovery.Variants.CLASSIC.value.Parameters.NOISE_THRESHOLD: 0.0})
    conf_result = lsk_conformance.apply(log, skeleton)

In this way, we can get, for each trace, whether it has been classified as belonging to the filtered log or not. When deviations are found, the trace does not belong to the language of the filtered log.
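For instance, the diagnostics of the first trace can be inspected by printing the first element of the result (a sketch; the exact content of the returned dictionaries depends on the implementation):

if __name__ == "__main__":
    print(conf_result[0])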

We can also calculate a log skeleton on the original log, for example providing 0.03 as the noise threshold, and see the effects on the classification:

from pm4py.algo.discovery.log_skeleton import algorithm as lsk_discovery
from pm4py.algo.conformance.log_skeleton import algorithm as lsk_conformance

if __name__ == "__main__":
    skeleton = lsk_discovery.apply(log, parameters={lsk_discovery.Variants.CLASSIC.value.Parameters.NOISE_THRESHOLD: 0.03})

    conf_result = lsk_conformance.apply(log, skeleton)

We can see that some traces are classified as incorrect even when the log skeleton is calculated on the original log, if a noise threshold is provided.

Alignments between Logs

In some situations, performing an optimal alignment between an event log and a process model might be infeasible. Hence, getting an approximated alignment that highlights the main points of deviation is an option. In pm4py, we offer support for alignments between two event logs. Such an alignment operation is based on the edit distance, i.e., for a trace of the first log, the trace of the second log which has the least edit distance is found. In the following example, we see how to perform alignments between an event log and the simulated log obtained by performing a playout operation on the process model.

We can load an example log and discover a process model using the inductive miner:

import pm4py
if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")
    net, im, fm = pm4py.discover_petri_net_inductive(log)
Then, perform a playout operation on the process model:

if __name__ == "__main__":
    simulated_log = pm4py.play_out(net, im, fm)
Then, the alignments between the two logs are performed:

from pm4py.algo.conformance.alignments.edit_distance import algorithm as logs_alignments
if __name__ == "__main__":
    alignments = logs_alignments.apply(log, simulated_log)

The result is a list of alignments, each of which contains a list of moves (synchronous move, move on log n.1, move on log n.2).
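For instance, the diagnostics computed for the first trace (including its list of moves) can be inspected as follows:

if __name__ == "__main__":
    # inspect the alignment of the first trace of the log
    print(alignments[0])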

With this utility, it is also possible to perform anti-alignments. In this case, an anti-alignment corresponds to the trace of the second log that has the biggest edit distance from the given trace of the first log.

To perform anti-alignments, the following code can be used:

from pm4py.algo.conformance.alignments.edit_distance import algorithm as logs_alignments
if __name__ == "__main__":
    parameters = {logs_alignments.Variants.EDIT_DISTANCE.value.Parameters.PERFORM_ANTI_ALIGNMENT: True}
    alignments = logs_alignments.apply(log, simulated_log, parameters=parameters)

Temporal Profile

We propose in pm4py an implementation of the temporal profile model. This has been described in:

Stertz, Florian, Jürgen Mangler, and Stefanie Rinderle-Ma. "Temporal Conformance Checking at Runtime based on Time-infused Process Models." arXiv preprint arXiv:2008.07262 (2020).

A temporal profile measures, for every couple of activities in the log, the average time and the standard deviation between events having the provided activities. The time is measured between the completion of the first event and the start of the second event. Hence, it is assumed to work with an interval log, where the events have two timestamps. The output of the temporal profile discovery is a dictionary where each couple of activities (expressed as a tuple) is associated with a couple of numbers: the first is the average and the second is the standard deviation.

It is possible to use a temporal profile to perform conformance checking on an event log. The times between the couples of activities in the log are assessed against the values stored in the temporal profile. Specifically, a value is calculated that shows how many standard deviations the observed time differs from the average. If that value exceeds a threshold (by default set to 6, according to the six-sigma principle), the couple of activities is signaled.

The output of conformance checking based on a temporal profile is a list containing the deviations for each case in the log. Each deviation is expressed as a couple of activities, along with the calculated value and the distance (based on number of standard deviations) from the average.

We provide an example of conformance checking based on a temporal profile.

First, we can load an event log, and apply the discovery algorithm.

import pm4py
from pm4py.algo.discovery.temporal_profile import algorithm as temporal_profile_discovery

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/receipt.xes")
    temporal_profile = temporal_profile_discovery.apply(log)
                                
Then, we can apply conformance checking based on the temporal profile.

from pm4py.algo.conformance.temporal_profile import algorithm as temporal_profile_conformance
if __name__ == "__main__":
    results = temporal_profile_conformance.apply(log, temporal_profile)
                                

Some parameters can be used in order to customize the conformance checking of the temporal profile:

Parameter Key Type Default Description
Parameters.ACTIVITY_KEY string concept:name The attribute to use as activity.
Parameters.START_TIMESTAMP_KEY string start_timestamp The attribute to use as start timestamp.
Parameters.TIMESTAMP_KEY string time:timestamp The attribute to use as timestamp.
Parameters.ZETA int 6 Multiplier for the standard deviation. Couples of events that are more distant than this are signaled by the temporal profile.
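As a sketch, a stricter deviation threshold can be passed to the conformance checking; here we assume that the string key behind Parameters.ZETA is "zeta", and the value 2 is purely illustrative:

from pm4py.algo.conformance.temporal_profile import algorithm as temporal_profile_conformance

if __name__ == "__main__":
    # "zeta" is assumed to be the string key of Parameters.ZETA; 2 is an illustrative multiplier
    results = temporal_profile_conformance.apply(log, temporal_profile, parameters={"zeta": 2})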

LTL Checking

LTL Checking is a form of filtering/conformance checking in which some rules are verified against the process executions contained in the log.

This permits checking more complex patterns, such as:

  • Four eyes principle: two given activities should be executed by two different people. For example, the approval of an expense refund should generally be done by a different person than the insertion of the expense refund.
  • Activity repeated by different people: the same activity in a process execution is repeated (that means rework) by different people.

The verification of LTL rules requires providing the parameters of the specific rule. Hence, this form of conformance checking is not fully automatic.

The LTL rules that are implemented in pm4py are found in the following table:

LTL rule Description
ltl.ltl_checker.A_eventually_B(log, A, B) Applies the A eventually B rule,
finding the cases in which an event with activity A was followed in the future by an event with activity B.
Parameters:
log: event log
A: the activity A of the rule (an activity of the log)
B: the activity B of the rule (an activity of the log)
Returns:
Filtered log object (containing the cases which respect the rule)
ltl.ltl_checker.A_eventually_B_eventually_C(log, A, B, C) Applies the A eventually B eventually C rule,
finding the cases in which an event with activity A was followed in the future by an event with activity B which was followed by an event with activity C.
Parameters:
log: event log
A: the activity A of the rule (an activity of the log)
B: the activity B of the rule (an activity of the log)
C: the activity C of the rule (an activity of the log)
Returns:
Filtered log object (containing the cases which respect the rule)
ltl.ltl_checker.A_eventually_B_eventually_C_eventually_D(log, A, B, C, D) Applies the A eventually B eventually C eventually D rule,
finding the cases in which an event with activity A was followed in the future by an event with activity B which was followed by an event with activity C which was followed by an event with activity D.
Parameters:
log: event log
A: the activity A of the rule (an activity of the log)
B: the activity B of the rule (an activity of the log)
C: the activity C of the rule (an activity of the log)
D: the activity D of the rule (an activity of the log)
Returns:
Filtered log object (containing the cases which respect the rule)
ltl.ltl_checker.A_next_B_next_C(log, A, B, C) Applies the A next B next C rule,
finding the cases in which an event with activity A was directly followed by an event with activity B which was directly followed by an event with activity C.
Parameters:
log: event log
A: the activity A of the rule (an activity of the log)
B: the activity B of the rule (an activity of the log)
C: the activity C of the rule (an activity of the log)
Returns:
Filtered log object (containing the cases which respect the rule)
ltl.ltl_checker.four_eyes_principle(log, A, B) Applies the four eyes principle on the activities A and B.
Parameters:
log: event log
A: the activity A of the rule (an activity of the log)
B: the activity B of the rule (an activity of the log)
Returns:
Filtered log object (containing the cases which have A and B done by the same person)
ltl.ltl_checker.attr_value_different_persons(log, A) Finds the process executions in which the activity A is repeated by different people.
Parameters:
log: event log
A: the activity A of the rule (an activity of the log)
Returns:
Filtered log object (containing the cases which have A repeated by different people)

The rules can be applied on both traditional event logs (XES) and Pandas dataframes, by looking at the packages pm4py.algo.filtering.log.ltl and pm4py.algo.filtering.pandas.ltl respectively.
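As a small sketch of the usage on a traditional event log, the A eventually B rule of the table above can be applied as follows; the activity names are taken from the running example and are purely illustrative:

import pm4py
from pm4py.algo.filtering.log.ltl import ltl_checker

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")
    # keep the cases in which "check ticket" is eventually followed by "decide"
    filtered_log = ltl_checker.A_eventually_B(log, "check ticket", "decide")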

Process Trees

In pm4py, we offer support for process trees (visualization, conversion to a Petri net and generation of a log), for importing/exporting them, and a functionality to generate them randomly. In this section, these functionalities are examined.

Importing/Exporting Process Trees

In pm4py, we offer support for importing/exporting process trees in the PTML format.

The following code can be used to import a process tree from a PTML file.


import pm4py

if __name__ == "__main__":
    tree = pm4py.read_ptml("tests/input_data/running-example.ptml")
                                

The following code can be used to export a process tree into a PTML file.


import pm4py

if __name__ == "__main__":
    pm4py.write_ptml(tree, "running-example.ptml")
                                

Generation of process trees

The approach 'PTAndLogGenerator', described by the scientific paper 'PTandLogGenerator: A Generator for Artificial Event Data', has been implemented in the pm4py library.

The following code snippet can be used to generate a process tree.


import pm4py
if __name__ == "__main__":
    tree = pm4py.generate_process_tree()
The generation of the process tree can be customized through the following parameters:
Parameter Meaning
MODE most frequent number of visible activities (default 20)
MIN minimum number of visible activities (default 10)
MAX maximum number of visible activities (default 30)
SEQUENCE probability to add a sequence operator to tree (default 0.25)
CHOICE probability to add a choice operator to tree (default 0.25)
PARALLEL probability to add a parallel operator to tree (default 0.25)
LOOP probability to add a loop operator to tree (default 0.25)
OR probability to add an or operator to tree (default 0)
SILENT probability to add silent activity to a choice or loop operator (default 0.25)
DUPLICATE probability to duplicate an activity label (default 0)
LT_DEPENDENCY probability to add a random dependency to the tree (default 0)
INFREQUENT probability to make a choice have infrequent paths (default 0.25)
NO_MODELS number of trees to generate from model population (default 10)
UNFOLD whether or not to unfold loops in order to include choices underneath in dependencies: 0=False, 1=True; if lt_dependency <= 0, this should always be 0 (False); if lt_dependency > 0, this can be 1 or 0 (True or False) (default 10)
MAX_REPEAT maximum number of repetitions of a loop (only used when unfolding is True) (default 10)
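As a sketch, the parameters can also be provided directly to the underlying generator algorithm; here we assume that the string keys of the parameters are the lowercase counterparts of the names in the table above, and the values are purely illustrative:

from pm4py.algo.simulation.tree_generator import algorithm as tree_gen

if __name__ == "__main__":
    # assumed parameter keys (lowercase names of the table above) with illustrative values
    tree = tree_gen.apply(parameters={"min": 10, "mode": 20, "max": 30,
                                      "sequence": 0.3, "choice": 0.3, "parallel": 0.2, "loop": 0.2})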

Generation of a log out of a process tree

The code snippet can be used to generate a log, with 100 cases, out of the process tree.
import pm4py
if __name__ == "__main__":
    log = pm4py.play_out(tree)
    print(len(log))

Conversion into Petri net

The code snippet can be used to convert the process tree into a Petri net.
import pm4py
if __name__ == "__main__":
    net, im, fm = pm4py.convert_to_petri_net(tree)

Visualize a Process Tree

A process tree can be printed, as shown on the right side.

if __name__ == "__main__":
    print(tree)
A process tree can also be visualized, as shown on the right side.
import pm4py
if __name__ == "__main__":
    pm4py.view_process_tree(tree, format='png')

Converting a Petri net to a Process Tree

We propose an approach to convert a block-structured accepting Petri net to a process tree. The implemented approach is:

van Zelst, Sebastiaan J. "Translating Workflow Nets to Process Trees: An Algorithmic Approach." arXiv preprint arXiv:2004.08213 (2020).

The approach, given an accepting Petri net, returns a process tree if the Petri net is block-structured, while it raises an exception if the Petri net is not block-structured.

We propose an example of application. First, we load a XES log and we discover an accepting Petri net using the Alpha Miner algorithm.

import pm4py
import os

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "running-example.xes"))
    net, im, fm = pm4py.discover_petri_net_alpha(log)
Then, we convert that to a process tree.

import pm4py

if __name__ == "__main__":
    tree = pm4py.convert_to_process_tree(net, im, fm)
    print(tree)
                                

The method succeeds, since the accepting Petri net is block-structured, and discovers a process tree (incidentally, the same process tree as if the inductive miner was applied).

Frequency Annotation of a Process Tree

A process tree does not include any frequency/performance annotation by default.

A log can be matched against a process tree in an optimal way using the alignments algorithm. The results of the alignments algorithm contain the list of leaves/operators visited during the replay. This can be used to infer the frequency, at the case/event level, of every node of the process tree.

The following code can be used to decorate the frequency of the nodes of a process tree:

import pm4py
from pm4py.algo.conformance.alignments.process_tree.util import search_graph_pt_frequency_annotation
if __name__ == "__main__":
    aligned_traces = pm4py.conformance_diagnostics_alignments(log, tree)
    tree = search_graph_pt_frequency_annotation.apply(tree, aligned_traces)
                                
A frequency-based visualization of the process tree is also available:

from pm4py.visualization.process_tree import visualizer as pt_visualizer
if __name__ == "__main__":
    gviz = pt_visualizer.apply(tree, parameters={"format": "svg"}, variant=pt_visualizer.Variants.FREQUENCY_ANNOTATION)
    pt_visualizer.view(gviz)
                                

Feature Selection

A feature selection operation permits representing the event log in a tabular way. This is important for operations such as prediction and anomaly detection.

Automatic Feature Selection

In pm4py, we offer ways to perform an automatic feature selection. As an example, let us import the receipt log and perform an automatic feature selection on top of it.

First, we import the receipt log:

import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/receipt.xes")
                                
Then, let’s perform the automatic feature selection:

from pm4py.algo.transformation.log_to_features import algorithm as log_to_features

if __name__ == "__main__":
    data, feature_names = log_to_features.apply(log)
    print(feature_names)
                                

Printing the value feature_names, we see that the following attributes were selected:

  • The attribute channel at the trace level (this assumes values Desk, Intern, Internet, Post, e-mail)
  • The attribute department at the trace level (this assumes values Customer contact, Experts, General)
  • The attribute group at the event level (this assumes values EMPTY, Group 1, Group 12, Group 13, Group 14, Group 15, Group 2, Group 3, Group 4, Group 7).

No numeric attribute value is selected. If we print feature_names, we get the following representation:

['trace:channel@Desk', 'trace:channel@Intern', 'trace:channel@Internet', 'trace:channel@Post', 'trace:channel@e-mail', 'trace:department@Customer contact', 'trace:department@Experts', 'trace:department@General', 'event:org:group@EMPTY', 'event:org:group@Group 1', 'event:org:group@Group 12', 'event:org:group@Group 13', 'event:org:group@Group 14', 'event:org:group@Group 15', 'event:org:group@Group 2', 'event:org:group@Group 3', 'event:org:group@Group 4', 'event:org:group@Group 7']

So, we see that we have different features for different values of the attribute. This is called one-hot encoding. A case is assigned the value 0 for a feature if it does not contain an event with the given value for the attribute, and the value 1 if it contains at least one such event.

If we represent the features as a dataframe:

import pandas as pd
if __name__ == "__main__":
    df = pd.DataFrame(data, columns=feature_names)
    print(df)
                                

We can see the features assigned to each different case.

Manual feature selection

The manual feature selection permits specifying which attributes should be included in the feature selection. These may include, for example:

  • The activities performed in the process execution (contained usually in the event attribute concept:name ).
  • The resources that perform the process execution (contained usually in the event attribute org:resource ).
  • Some numeric attributes, at discretion of the user.

To do so, we have to call the method log_to_features.apply. The types of features that can be considered by a manual feature selection are:

Name Description
str_ev_attr String attributes at the event level: these are hot-encoded into features that may assume value 0 or value 1.
str_tr_attr String attributes at the trace level: these are hot-encoded into features that may assume value 0 or value 1.
num_ev_attr Numeric attributes at the event level: these are encoded by including the last value of the attribute among the events of the trace.
num_tr_attr Numeric attributes at trace level: these are encoded by including the numerical value.
str_evsucc_attr Successions related to the string attributes values at the event level: for example, if we have a trace [A,B,C], it might be important to include not only the presence of the single values A, B and C as features; but also the presence of the directly-follows couples (A,B) and (B,C).

Let’s consider, for example, a feature selection where we are interested in:

  • Whether a process execution contains, or not, a given activity.
  • Whether a process execution contains, or not, a given resource.
  • Whether a process execution contains, or not, a directly-follows path between two given activities.
  • Whether a process execution contains, or not, a directly-follows path between two given resources.
The number of features is much bigger in this setting:

from pm4py.algo.transformation.log_to_features import algorithm as log_to_features

if __name__ == "__main__":
    data, feature_names = log_to_features.apply(log, parameters={"str_ev_attr": ["concept:name", "org:resource"], "str_tr_attr": [], "num_ev_attr": [], "num_tr_attr": [], "str_evsucc_attr": ["concept:name", "org:resource"]})
    print(len(feature_names))
                                

Calculating useful features

Other useful features are, for example, the cycle time and the lead time associated with a case.

Here, we may suppose to have:

  • A log with lifecycles, where each event is instantaneous;
  • OR an interval log, where events may be associated with two timestamps (start and end timestamp).
The lead/cycle time can be calculated on top of interval logs. If we have a lifecycle log, we need to convert it first with:

from pm4py.objects.log.util import interval_lifecycle
if __name__ == "__main__":
    log = interval_lifecycle.to_interval(log)
                                
Then, features such as the lead/cycle time can be inserted through the instructions:

from pm4py.objects.log.util import interval_lifecycle
from pm4py.util import constants

if __name__ == "__main__":
    log = interval_lifecycle.assign_lead_cycle_time(log, parameters={
        constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY: "start_timestamp",
        constants.PARAMETER_CONSTANT_TIMESTAMP_KEY: "time:timestamp"})
                                

After the provision of the start timestamp attribute (in this case, start_timestamp) and of the timestamp attribute (in this case, time:timestamp), the following features are returned by the method:

  • @@approx_bh_partial_cycle_time => incremental cycle time associated to the event (the cycle time of the last event is the cycle time of the instance)
  • @@approx_bh_partial_lead_time => incremental lead time associated to the event
  • @@approx_bh_overall_wasted_time => difference between the partial lead time and the partial cycle time values
  • @@approx_bh_this_wasted_time => wasted time ONLY with regards to the activity described by the ‘interval’ event
  • @@approx_bh_ratio_cycle_lead_time => measures the incremental Flow Rate (between 0 and 1).
These are all numerical attributes, hence we can refine the feature extraction by doing:

from pm4py.algo.transformation.log_to_features import algorithm as log_to_features

if __name__ == "__main__":
    data, feature_names = log_to_features.apply(log, parameters={"str_ev_attr": ["concept:name", "org:resource"], "str_tr_attr": [], "num_ev_attr": ["@@approx_bh_partial_cycle_time", "@@approx_bh_partial_lead_time", "@@approx_bh_overall_wasted_time", "@@approx_bh_this_wasted_time", "@@approx_bh_ratio_cycle_lead_time"], "num_tr_attr": [], "str_evsucc_attr": ["concept:name", "org:resource"]})
                                

We provide also the calculation of additional intra/inter case features, which can be enabled as additional boolean parameters of the log_to_features.apply method. These include:

  • ENABLE_CASE_DURATION: enables the case duration as additional feature.
  • ENABLE_TIMES_FROM_FIRST_OCCURRENCE: enables the addition of the times from start of the case, to the end of the case, from the first occurrence of an activity of a case.
  • ENABLE_TIMES_FROM_LAST_OCCURRENCE: enables the addition of the times from start of the case, to the end of the case, from the last occurrence of an activity of a case.
  • ENABLE_DIRECT_PATHS_TIMES_LAST_OCC: add the duration of the last occurrence of a directed (i, i+1) path in the case as feature.
  • ENABLE_INDIRECT_PATHS_TIMES_LAST_OCC: add the duration of the last occurrence of an indirect (i, j) path in the case as feature.
  • ENABLE_WORK_IN_PROGRESS: enables the work in progress (number of concurrent cases) as a feature.
  • ENABLE_RESOURCE_WORKLOAD: enables the resource workload as a feature.
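A sketch of enabling some of these features follows; it assumes that the string keys of the boolean parameters are the lowercase versions of the names listed above:

from pm4py.algo.transformation.log_to_features import algorithm as log_to_features

if __name__ == "__main__":
    # assumed string keys (lowercase counterparts of the parameter names above)
    data, feature_names = log_to_features.apply(log, parameters={"enable_case_duration": True,
                                                                 "enable_work_in_progress": True})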

PCA – Reducing the number of features

Some techniques (such as clustering, prediction, and anomaly detection) suffer if the dimensionality of the dataset is too high. Hence, a dimensionality reduction technique (such as PCA) helps to cope with the complexity of the data.

Having a Pandas dataframe out of the features extracted from the log:

import pandas as pd

if __name__ == "__main__":
    df = pd.DataFrame(data, columns=feature_names)
                                

It is possible to reduce the number of features using a technique like PCA.

Let’s create the PCA with a number of components equal to 5, and apply the PCA to the dataframe.

from sklearn.decomposition import PCA

if __name__ == "__main__":
    pca = PCA(n_components=5)
    df2 = pd.DataFrame(pca.fit_transform(df))
                                

So, from more than 400 columns, we pass to 5 columns that contain most of the variance.
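To check how much of the original variance is actually retained, the explained variance ratio of the fitted PCA can be inspected (this uses the standard scikit-learn attribute):

if __name__ == "__main__":
    # fraction of the variance explained by each of the 5 retained components
    print(pca.explained_variance_ratio_)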

Anomaly Detection

In this section, we consider the calculation of an anomaly score for the different cases. This is based on the extracted features and, to work better, requires the application of a dimensionality reduction technique (such as the PCA of the previous section).

Let’s apply a method called IsolationForest to the dataframe. This permits adding a scores column that is lower than or equal to 0 when the case should be considered anomalous, and greater than 0 when the case should not be considered anomalous.

from sklearn.ensemble import IsolationForest
if __name__ == "__main__":
    model=IsolationForest()
    model.fit(df2)
    df2["scores"] = model.decision_function(df2)
                                
To see which cases are the most anomalous, we can insert an index column and sort the dataframe by the scores. Then, printing it shows the most anomalous cases first:

if __name__ == "__main__":
    df2["@@index"] = df2.index
    df2 = df2[["scores", "@@index"]]
    df2 = df2.sort_values("scores")
    print(df2)
                                

Evolution of the Features

We may be interested in evaluating the evolution of the features over time, to identify the positions of the event log with a behavior that is different from the mainstream behavior.

In pm4py, we provide a method to graph the evolution of features over time.

This can be done as in the following example:

import os
import pm4py
from pm4py.algo.transformation.log_to_features.util import locally_linear_embedding
from pm4py.visualization.graphs import visualizer

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "receipt.xes"))
    x, y = locally_linear_embedding.apply(log)
    gviz = visualizer.apply(x, y, variant=visualizer.Variants.DATES,
                            parameters={"title": "Locally Linear Embedding", "format": "svg", "y_axis": "Intensity"})
    visualizer.view(gviz)
                                

Event-based Feature Extraction

Some machine learning methods (for example, LSTM-based deep learning) do not require a specification of the features at the case level (in which every case is transformed into a single vector of numerical features), but rather require, for each event of the case, a numerical row containing the features of the given event.

We can do a default extraction of the event-based features. In this case, the features are selected automatically.

from pm4py.algo.transformation.log_to_features import algorithm as log_to_features

if __name__ == "__main__":
    data, features = log_to_features.apply(log, variant=log_to_features.Variants.EVENT_BASED)
                                
We can also manually specify the set of features to be extracted. The meaning of the parameters (str_ev_attr and num_ev_attr) is the same as explained in the previous sections.

from pm4py.algo.transformation.log_to_features import algorithm as log_to_features

if __name__ == "__main__":
    data, features = log_to_features.apply(log, variant=log_to_features.Variants.EVENT_BASED, parameters={"str_ev_attr": ["concept:name"], "num_ev_attr": []})
                                

Decision tree about the ending activity of a process

Decision trees are objects that help the understanding of the conditions leading to a particular outcome. In this section, several examples related to the construction of decision trees are provided.

Ideas behind the building of decision trees are provided in the scientific paper: de Leoni, Massimiliano, Wil M. P. van der Aalst, and Marcus Dees. 'A general process mining framework for correlating, predicting and clustering dynamic behavior based on event logs.'

The general scheme is the following:

  • A representation of the log, on a given set of features, is obtained (for example, using one-hot encoding on string attributes and keeping numeric attributes as-they-are)
  • A representation of the target classes is constructed
  • The decision tree is calculated
  • The decision tree is represented in some ways

A process instance may potentially finish with different activities, signaling different outcomes of the process instance. A decision tree may help to understand the reasons behind each outcome.

First, a log is loaded. Then, a representation of the log on a given set of features is obtained:

import os
import pm4py

from pm4py.algo.transformation.log_to_features import algorithm as log_to_features

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "roadtraffic50traces.xes"))
    data, feature_names = log_to_features.apply(log, parameters={"str_tr_attr": [], "str_ev_attr": ["concept:name"], "num_tr_attr": [], "num_ev_attr": ["amount"]})
Or an automatic representation (automatic selection of the attributes) could be obtained:
data, feature_names = log_to_features.apply(log)
(Optional) The features that are extracted by those methods can be represented as a Pandas dataframe:

import pandas as pd
if __name__ == "__main__":
    dataframe = pd.DataFrame(data, columns=feature_names)
                                
(Optional) And the dataframe can be exported then as a CSV file.

if __name__ == "__main__":
    dataframe.to_csv("features.csv", index=False)
                                
Then, the target classes are formed. Each endpoint of the process belongs to a different class.
from pm4py.objects.log.util import get_class_representation
if __name__ == "__main__":
    target, classes = get_class_representation.get_class_representation_by_str_ev_attr_value_value(log, "concept:name")
The decision tree can then be calculated and visualized.
from sklearn import tree
if __name__ == "__main__":
    clf = tree.DecisionTreeClassifier()
    clf.fit(data, target)

    from pm4py.visualization.decisiontree import visualizer as dectree_visualizer
    gviz = dectree_visualizer.apply(clf, feature_names, classes)
    dectree_visualizer.view(gviz)

Decision tree about the duration of a case (Root Cause Analysis)

A decision tree about the duration of a case helps to understand the reasons behind a high case duration (or, at least, a case duration that is above a given threshold).

First, a log has to be loaded. Then, a representation of the log on a given set of features is obtained:

import os
import pm4py
if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "roadtraffic50traces.xes"))

    from pm4py.algo.transformation.log_to_features import algorithm as log_to_features

    data, feature_names = log_to_features.apply(log, parameters={"str_tr_attr": [], "str_ev_attr": ["concept:name"], "num_tr_attr": [], "num_ev_attr": ["amount"]})
Or an automatic representation (automatic selection of the attributes) could be obtained:
data, feature_names = log_to_features.apply(log)
Then, the target classes are formed. There are two classes: first, the traces whose duration is below the specified threshold (here, 200 days; note that the duration is given in seconds); second, the traces whose duration is above the specified threshold.
from pm4py.objects.log.util import get_class_representation
if __name__ == "__main__":
    target, classes = get_class_representation.get_class_representation_by_trace_duration(log, 2 * 8640000)
The decision tree can then be calculated and visualized.
from sklearn import tree
if __name__ == "__main__":
    clf = tree.DecisionTreeClassifier()
    clf.fit(data, target)

    from pm4py.visualization.decisiontree import visualizer as dectree_visualizer
    gviz = dectree_visualizer.apply(clf, feature_names, classes)
    dectree_visualizer.view(gviz)

Decision Mining

Decision mining permits, provided:

  • An event log
  • A process model (an accepting Petri net)
  • A decision point

to retrieve the features of the cases that go in the different directions. This permits, for example, calculating a decision tree that explains the decisions.

Let’s start by importing a XES log:

import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")
                                
Calculating a model using the inductive miner:

if __name__ == "__main__":
    net, im, fm = pm4py.discover_petri_net_inductive(log)
                                
A visualization of the model can be obtained in the following way:

from pm4py.visualization.petri_net import visualizer

if __name__ == "__main__":
    gviz = visualizer.apply(net, im, fm, parameters={visualizer.Variants.WO_DECORATION.value.Parameters.DEBUG: True})
    visualizer.view(gviz)
                                

For this example, we choose the decision point p_10. There, a decision is made between the activities examine casually and examine thoroughly.

To execute the decision mining algorithm, once we have a log, a model and a decision point, the following code can be used:

from pm4py.algo.decision_mining import algorithm as decision_mining

if __name__ == "__main__":
    X, y, class_names = decision_mining.apply(log, net, im, fm, decision_point="p_10")
                                

As we see, the outputs of the apply method are the following:

  • X: a Pandas dataframe containing the features associated to the cases leading to a decision.
  • y: a Pandas dataframe with a single column, containing the number of the class that is the output of the decision (in this case, the possible values are 0 and 1, since we have two target classes).
  • class_names: the names of the output classes of the decision (in this case, examine casually and examine thoroughly).

These outputs can be used in a generic way with any classification or comparison technique.
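For instance, a scikit-learn decision tree can be trained directly on X and y (a minimal sketch; the max_depth value is purely illustrative):

from sklearn import tree

if __name__ == "__main__":
    # fit a decision tree on the features (X) and the target classes (y) of the decision point
    clf = tree.DecisionTreeClassifier(max_depth=7)
    clf.fit(X, y)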

In particular, decision trees can be useful. We provide a function to automate the discovery of decision trees out of the decision mining technique.

The code that should be applied is the following:

from pm4py.algo.decision_mining import algorithm as decision_mining

if __name__ == "__main__":
    clf, feature_names, classes = decision_mining.get_decision_tree(log, net, im, fm, decision_point="p_10")
                                
Then, a visualization of the decision tree can be obtained in the following way:

from pm4py.visualization.decisiontree import visualizer as tree_visualizer

if __name__ == "__main__":
    gviz = tree_visualizer.apply(clf, feature_names, classes)
    tree_visualizer.view(gviz)
                                

Feature Extraction on Dataframes

While the feature extraction described in the previous sections is generic, it may not be the optimal choice (in terms of performance of the feature extraction) when working on Pandas dataframes.

We also offer the possibility to extract a feature table, which requires providing the dataframe and a set of columns to extract as features, and outputs another dataframe having the following columns:

  • The case identifier.
  • For each string column that has been provided as attribute, a one-hot encoding (counting the number of occurrences of the given attribute value) for all the possible values.
  • For every numeric column that has been provided as attribute, the last value of the attribute in the case.

An example of such feature extraction, keeping the concept:name (activity) and the amount (cost) as features in the table, can be calculated as follows:


import pm4py
import pandas as pd
from pm4py.objects.log.util import dataframe_utils

if __name__ == "__main__":
    dataframe = pd.read_csv("tests/input_data/roadtraffic100traces.csv")
    dataframe = pm4py.format_dataframe(dataframe)
    feature_table = dataframe_utils.get_features_df(dataframe, ["concept:name", "amount"])
                                

The feature table will contain, in the aforementioned example, the following columns:

['case:concept:name', 'concept:name_CreateFine', 'concept:name_SendFine', 'concept:name_InsertFineNotification', 'concept:name_Addpenalty', 'concept:name_SendforCreditCollection', 'concept:name_Payment', 'concept:name_InsertDateAppealtoPrefecture', 'concept:name_SendAppealtoPrefecture', 'concept:name_ReceiveResultAppealfromPrefecture', 'concept:name_NotifyResultAppealtoOffender', 'amount']

Discovery of a Data Petri net

Given a Petri net, discovered by a classical process discovery algorithm (e.g., the Alpha Miner or the Inductive Miner), we can transform it into a data Petri net by applying decision mining at every decision point and transforming the resulting decision tree into a guard. An example follows.

An event log is loaded, the inductive miner algorithm is applied, and then decision mining is used to discover a data Petri net.

import pm4py
if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes")
    net, im, fm = pm4py.discover_petri_net_inductive(log)
    from pm4py.algo.decision_mining import algorithm as decision_mining
    net, im, fm = decision_mining.create_data_petri_nets_with_decisions(log, net, im, fm)
                                
The guards that are discovered for every transition can be printed as follows. They are boolean conditions, which can therefore be interpreted by an execution engine.

if __name__ == "__main__":
    for t in net.transitions:
        if "guard" in t.properties:
            print("")
            print(t)
            print(t.properties["guard"])
                                

Statistics

In pm4py, it is possible to calculate different statistics on top of classic event logs and dataframes.

Throughput Time

Given an event log, it is possible to retrieve the list of all the durations of the cases (expressed in seconds). The only parameter that is needed is the timestamp. The code on the right can be used.

import pm4py
if __name__ == "__main__":
    all_case_durations = pm4py.get_all_case_durations(log)
                                    

Case Arrival/Dispersion Ratio

Given an event log, it is possible to retrieve the case arrival ratio, that is, the average time distance between the arrival of two consecutive cases in the log.

import pm4py
if __name__ == "__main__":
    case_arrival_ratio = pm4py.get_case_arrival_average(log)
                                    
It is also possible to calculate the case dispersion ratio, that is, the average time distance between the completion of two consecutive cases in the log.

from pm4py.statistics.traces.generic.log import case_arrival
if __name__ == "__main__":
    case_dispersion_ratio = case_arrival.get_case_dispersion_avg(log, parameters={
        case_arrival.Parameters.TIMESTAMP_KEY: "time:timestamp"})
                                    

Performance Spectrum

The performance spectrum is a novel visualization of the performance of the process, based on the time elapsed between different activities in the process executions. The performance spectrum has initially been described in:

Denisov, Vadim, et al. "The Performance Spectrum Miner: Visual Analytics for Fine-Grained Performance Analysis of Processes." BPM (Dissertation/Demos/Industry). 2018.

The performance spectrum assumes to work with an event log and a list of activities that are considered to build the spectrum. In the following example, the performance spectrum is built on the receipt event log including the Confirmation of receipt, T04 Determine confirmation of receipt and T10 Determine necessity to stop indication activities.

The event log is loaded, and the performance spectrum (containing the timestamps at which the different activities happened inside the process execution) is computed and visualized:

import pm4py
import os

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "receipt.xes"))
    pm4py.view_performance_spectrum(log, ["Confirmation of receipt", "T04 Determine confirmation of receipt",
                                         "T10 Determine necessity to stop indication"], format="svg")
                                    

In the aforementioned example, we see three horizontal lines, corresponding to the activities included in the spectrum, and many oblique lines that represent the elapsed times between two activities. The more oblique lines are highlighted by a different color.

This permits identifying the time periods in which the execution was more bottlenecked, and possible patterns (FIFO, LIFO).

Business Hours

Given an interval event log (an EventLog object where each event is characterised by two timestamps, a start timestamp usually contained in the start_timestamp attribute and a completion timestamp usually contained in the time:timestamp attribute), the duration of the event is the difference between the completion timestamp and the start timestamp.

This may be affected by nights (when an activity is not actively worked on), weekends (when the workers may not be at the workplace) and other kinds of pauses. In pm4py, a way to consider only the time in which the activity could actually be worked on (so, excluding the time outside of the working hours and the weekends) is provided.

Given a start and end timestamp (expressed as UNIX timestamps), the business hours calculation method could be called as follows:

from pm4py.util.business_hours import BusinessHours
from datetime import datetime

if __name__ == "__main__":
    st = datetime.fromtimestamp(100000000)
    et = datetime.fromtimestamp(200000000)
    bh_object = BusinessHours(st, et)
    worked_time = bh_object.getseconds()
    print(worked_time)
                                    
To provide specific shifts and weekends (for example, short weeks with 4 working days and working hours from 10 to 16), the following code could be used:

if __name__ == "__main__":
    bh_object = BusinessHours(st, et, worktiming=[10, 16], weekends=[5, 6, 7])
    worked_time = bh_object.getseconds()
    print(worked_time)
                                    

Cycle Time and Waiting Time

Two important KPIs for process executions are:

  • The Lead Time: the overall time in which the instance was worked, from the start to the end, without considering if it was actively worked or not.
  • The Cycle Time: the overall time in which the instance was worked, from the start to the end, considering only the times where it was actively worked.

For these concepts, it is important to consider only the business hours (so, excluding nights and weekends). Indeed, in those periods the machinery and the workforce are at home, so the instance cannot be worked on, and the time “wasted” there is not recoverable.

Within ‘interval’ event logs (that have a start and an end timestamp), it is possible to calculate incrementally the lead time and the cycle time (event per event). The lead time and the cycle time that are reported on the last event of the case are the ones related to the process execution. With this, it is easy to understand which activities of the process have caused a bottleneck (e.g. the lead time increases significantly more than the cycle time).

The algorithm implemented in pm4py starts by sorting each case by the start timestamp (so, activities started earlier are reported earlier in the log), and is able to calculate the lead and cycle time in all situations, also complex ones.

In the following, we aim to insert the following attributes to events inside a log:

Attribute Description
@@approx_bh_partial_cycle_time Incremental cycle time associated to the event (the cycle time of the last event is the cycle time of the instance)
@@approx_bh_partial_lead_time Incremental lead time associated to the event
@@approx_bh_overall_wasted_time Difference between the partial lead time and the partial cycle time values
@@approx_bh_this_wasted_time Wasted time ONLY with regards to the activity described by the ‘interval’ event
@@approx_bh_ratio_cycle_lead_time Measures the incremental Flow Rate (between 0 and 1).

The method that calculates the lead and the cycle time could accept the following optional parameters:

Name Description
worktiming The work timing (e.g. [7, 17])
weekends The specification of the weekends (e.g. [6, 7])
And could be applied with the following line of code:

from pm4py.objects.log.util import interval_lifecycle
if __name__ == "__main__":
    enriched_log = interval_lifecycle.assign_lead_cycle_time(log)
                                    

With this, an enriched log that contains for each event the corresponding attributes for lead/cycle time is obtained.

Sojourn Time

This statistic works only with interval event logs, i.e., event logs where each event has a start timestamp and a completion timestamp.

The average sojourn time statistic permits knowing, for each activity, how much time was spent executing the activity. This is calculated as the average of the time passed between the start timestamp and the completion timestamp over the activity's events.

We provide an example. First, we import an interval event log.

import pm4py
import os

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "interval_event_log.xes"))
                                    
Then, we calculate the statistic, which requires specifying the attribute that contains the start timestamp and the attribute that contains the completion timestamp.

from pm4py.statistics.sojourn_time.log import get as soj_time_get

if __name__ == "__main__":
    soj_time = soj_time_get.apply(log, parameters={soj_time_get.Parameters.TIMESTAMP_KEY: "time:timestamp", soj_time_get.Parameters.START_TIMESTAMP_KEY: "start_timestamp"})
    print(soj_time)
                                    

The same statistic can be applied seamlessly on Pandas dataframes. We provide an alternative module for doing so: pm4py.statistics.sojourn_time.pandas.get

Concurrent Activities

This statistic works only with interval event logs, i.e., event logs where each event has a start timestamp and a completion timestamp.

In an interval event log, the definition of an order between the events is weaker. Different intersections between a couple of events in a case can happen:

  • An event where the start timestamp is greater than or equal to the completion timestamp of the other event.
  • An event where the start timestamp is greater than or equal to the start timestamp of the other event, but lower than the completion timestamp of the other event.

In particular, the latter case defines an event-based concurrency, where several events are actively executed at the same time.

We might be interested in retrieving the set of activities for which such concurrent execution happens, and the frequency of such occurrence. We offer this type of calculation in pm4py.

We provide an example. First, we import an interval event log.

import pm4py
import os

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "interval_event_log.xes"))
                                    
Then, we calculate the statistic, which requires specifying the attribute that contains the start timestamp and the attribute that contains the completion timestamp.

from pm4py.statistics.concurrent_activities.log import get as conc_act_get

if __name__ == "__main__":
    conc_act = conc_act_get.apply(log, parameters={conc_act_get.Parameters.TIMESTAMP_KEY: "time:timestamp", conc_act_get.Parameters.START_TIMESTAMP_KEY: "start_timestamp"})
    print(conc_act)
                                    

The same statistic can be applied seamlessly on Pandas dataframes. We provide an alternative module for doing so: pm4py.statistics.concurrent_activities.pandas.get

Eventually-Follows Graph

We provide an approach for the calculation of the eventually-follows graph. The eventually-follows graph (EFG) is a graph that represents the partial order of the events inside the process executions of the log.

Our implementation can be applied both to lifecycle logs, i.e., logs where each event has only one timestamp, and to interval logs, where each event has a start and a completion timestamp. In the latter case, the start timestamp is actively considered for the definition of the EFG / partial order.

In particular, the method assumes to work with lifecycle logs when a start timestamp is NOT passed in the parameters, while it assumes to work with interval logs when a start timestamp is passed in the parameters.

We provide an example. First, we import an interval event log.

import pm4py
import os

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "interval_event_log.xes"))
                                    
Then, we calculate the statistic, which requires specifying the attribute that contains the completion timestamp and, possibly, the attribute that contains the start timestamp.

import pm4py

if __name__ == "__main__":
    efg_graph = pm4py.discover_eventually_follows_graph(log)
                                    

Displaying Graphs

Graphs permit understanding several aspects of the current log (for example, the distribution of a numeric attribute, the distribution of case durations, or the distribution of events over time).

Distribution of case duration

In the following example, the distribution of case duration is shown in two different graphs: a simple plot and a semi-logarithmic (on the X-axis) plot. The semi-logarithmic plot is less sensitive to possible outliers. First, the Receipt log is loaded. Then, the distribution related to case duration is obtained, and both the simple plot and the semi-logarithmic (on the X-axis) plot are generated.

import os
import pm4py

if __name__ == "__main__":
    log_path = os.path.join("tests","input_data","receipt.xes")
    log = pm4py.read_xes(log_path)

    from pm4py.util import constants
    from pm4py.statistics.traces.generic.log import case_statistics
    x, y = case_statistics.get_kde_caseduration(log, parameters={constants.PARAMETER_CONSTANT_TIMESTAMP_KEY: "time:timestamp"})

    from pm4py.visualization.graphs import visualizer as graphs_visualizer

    gviz = graphs_visualizer.apply_plot(x, y, variant=graphs_visualizer.Variants.CASES)
    graphs_visualizer.view(gviz)

    gviz = graphs_visualizer.apply_semilogx(x, y, variant=graphs_visualizer.Variants.CASES)
    graphs_visualizer.view(gviz)
                                    

Distribution of events over time

In the following example, a graph representing the distribution of events over time is obtained. This is particularly important because it helps to understand in which time intervals the greatest number of events is recorded. Then, the distribution is computed and the corresponding graph is generated.

from pm4py.algo.filtering.log.attributes import attributes_filter

if __name__ == "__main__":
    x, y = attributes_filter.get_kde_date_attribute(log, attribute="time:timestamp")

    from pm4py.visualization.graphs import visualizer as graphs_visualizer

    gviz = graphs_visualizer.apply_plot(x, y, variant=graphs_visualizer.Variants.DATES)
    graphs_visualizer.view(gviz)
                                    

Distribution of a numeric attribute

In the following example, two graphs related to the distribution of a numeric attribute are obtained: a normal plot and a semi-logarithmic (on the X-axis) plot, the latter being less sensitive to outliers. First, a filtered version of the Road Traffic log is loaded. Then, the distribution of the numeric attribute amount is obtained, either as the standard graph or as the semi-logarithmic graph.

import os
import pm4py

log_path = os.path.join("tests", "input_data", "roadtraffic100traces.xes")

if __name__ == "__main__":
    log = pm4py.read_xes(log_path)

    from pm4py.algo.filtering.log.attributes import attributes_filter

    x, y = attributes_filter.get_kde_numeric_attribute(log, "amount")

    from pm4py.visualization.graphs import visualizer as graphs_visualizer

    gviz = graphs_visualizer.apply_plot(x, y, variant=graphs_visualizer.Variants.ATTRIBUTES)
    graphs_visualizer.view(gviz)

    gviz = graphs_visualizer.apply_semilogx(x, y, variant=graphs_visualizer.Variants.ATTRIBUTES)
    graphs_visualizer.view(gviz)
                                    

Dotted Chart

The dotted chart is a classic visualization of the events inside an event log across different dimensions. Each event of the event log corresponds to a point. The dimensions are projected on a graph having:

  • X-axis: the values of the first dimension are represented there.
  • Y-axis: the values of the second dimension are represented there.
  • Color: the values of the third dimension are represented as different colors for the points of the dotted chart.

The values can be either string, numeric or date values, and are managed accordingly by the dotted chart.

The dotted chart can be built on different attributes. A convenient choice for the dotted chart is to visualize the distribution of cases and events over time, with the following choices:

  • X-axis: the timestamp of the event.
  • Y-axis: the index of the case inside the event log.
  • Color: the activity of the event.

The aforementioned choice permits to visually identify patterns such as:

  • Batches.
  • Variations in the case arrival rate.
  • Variations in the case finishing rate.

In the following examples, we will build and visualize the dotted chart based on different selections of the attributes (default and custom).

To build the default dotted chart on the receipt event log, the following code can be used:

import pm4py
import os

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "receipt.xes"))
    pm4py.view_dotted_chart(log, format="svg")
                                    
To build the dotted chart on the receipt event log, using as the different dimensions the concept:name (activity), the org:resource (organizational resource) and the org:group (organizational group) attributes, the following code can be used:

import pm4py
import os

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "receipt.xes"))
    pm4py.view_dotted_chart(log, format="svg", attributes=["concept:name", "org:resource", "org:group"])
                                    

Events Distribution

Observing the distribution of events over time permits to infer useful information about the work shifts, the working days, and the periods of the year that are more or less busy.

The distribution of events over time can be visualized as follows. An event log is loaded, and the distribution over the hours of day / days of a week / days of a month / months / years is calculated.

import pm4py
import os

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "receipt.xes"))
    pm4py.view_events_distribution_graph(log, distr_type="days_week", format="svg")
                                    

The possible values for the parameter distr_type are:

  • hours: plots the distribution over the hours of a day.
  • days_week: plots the distribution over the days of a week.
  • days_month: plots the distribution over the days of a month.
  • months: plots the distribution over the months of a year.
  • years: plots the distribution over the different years of the log.
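
As a usage sketch, one can iterate over all the supported values of distr_type listed above to produce the corresponding views:

import pm4py
import os

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "receipt.xes"))
    # iterate over the distribution types listed above
    for distr_type in ["hours", "days_week", "days_month", "months", "years"]:
        pm4py.view_events_distribution_graph(log, distr_type=distr_type, format="svg")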

Detection of Batches

We say that an activity is executed in batches by a given resource when the resource executes the same activity several times in a short period of time.

Identifying such activities may reveal points of the process that can be automated, since the activity of the person may be repetitive.

An example calculation on an event log follows.

import pm4py
import os

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "receipt.xes"))
    from pm4py.algo.discovery.batches import algorithm
    batches = algorithm.apply(log)
                        
The results can be printed on the screen as follows:

if __name__ == "__main__":
    for act_res in batches:
        print("")
        print("activity: "+act_res[0][0]+" resource: "+act_res[0][1])
        print("number of distinct batches: "+str(act_res[1]))
        for batch_type in act_res[2]:
            print(batch_type, len(act_res[2][batch_type]))
                                    

There are indeed different types of batches that are detected by our method:

  • Simultaneous: all the events in the batch have identical start and end timestamps.
  • Batching at start: all the events in the batch have identical start timestamp.
  • Batching at end: all the events in the batch have identical end timestamp.
  • Sequential batching: for all the consecutive events, the end of the first is equal to the start of the second.
  • Concurrent batching: for all the consecutive events that are not sequentially matched.

Rework (activities)

The rework statistic permits to identify the activities that have been repeated during the same process execution. This reveals underlying inefficiencies in the process.

In our implementation, the rework takes into account an event log / Pandas dataframe and returns a dictionary associating to each activity the number of cases containing the rework for the given activity.

An example calculation on an event log follows.

import pm4py
import os

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "receipt.xes"))
    rework = pm4py.get_rework_cases_per_activity(log)
                                    

Rework (cases)

We define as rework at the case level the number of events of a case having an activity which has appeared previously in the case.

For example, if a case contains the following activities: A,B,A,B,C,D; the rework is 2, since the events in positions 3 and 4 refer to activities that have already been included previously.

The rework statistic can be useful to identify the cases in which many events are repetitions of activities that have already been performed.

An example calculation on an event log follows. At the end of the computation, dictio will contain the following entries for the six cases of the running example log:

{'3': {'number_activities': 9, 'rework': 2}, '2': {'number_activities': 5, 'rework': 0}, '1': {'number_activities': 5, 'rework': 0}, '6': {'number_activities': 5, 'rework': 0}, '5': {'number_activities': 13, 'rework': 7}, '4': {'number_activities': 5, 'rework': 0}}


import pm4py
from pm4py.statistics.rework.cases.log import get as cases_rework_get

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")

    dictio = cases_rework_get.apply(log)
                                    

Query Structure - Paths over Time

We provide a feature to organize the information about the paths contained in the event log in a data structure that is convenient to query at a specific point in time or over an interval. This is done using an interval tree data structure.

This can be useful to compute quickly the workload of the resources in a given interval of time, or to measure the number of open cases in a time interval.

To transform the event log to an interval tree, the following code can be used:

import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/receipt.xes")

    from pm4py.algo.transformation.log_to_interval_tree import algorithm as log_to_interval_tree

    it = log_to_interval_tree.apply(log)
                                    
The following example uses the data structure to compute the workload (number of events) for every resource in the specified interval.

from collections import Counter
if __name__ == "__main__":
    intersecting_events = it[1318333540:1318333540+30*86400]
    res_workload = Counter(x.data["target_event"]["org:resource"] for x in intersecting_events)
                                    
The following example uses the data structure to compute, for each directly-follows path, the number of cases that are open in the path.

from collections import Counter
if __name__ == "__main__":
    intersecting_events = it[1318333540:1318333540+30*86400]
    open_paths = Counter((x.data["source_event"]["concept:name"], x.data["target_event"]["concept:name"]) for x in intersecting_events)
                                    

Log-Model Evaluation

In pm4py, it is possible to compare the behavior contained in the log and the behavior contained in the model, in order to see if and how they match. Four different dimensions exist in process mining for this: the measurement of replay fitness, precision, generalization, and simplicity.

Replay Fitness

The calculation of the replay fitness aims to quantify how much of the behavior in the log is admitted by the process model. We propose two methods to calculate replay fitness, based on token-based replay and alignments respectively.

For token-based replay, the percentage of traces that are completely fit is returned, along with a fitness value that is calculated as indicated in the scientific contribution:

Berti, Alessandro, and Wil MP van der Aalst. "Reviving Token-based Replay: Increasing Speed While Improving Diagnostics." ATAED@ Petri Nets/ACSD. 2019.

For alignments, the percentage of traces that are completely fit is returned, along with a fitness value that is calculated as the average of the fitness values of the single traces.

The two variants of replay fitness are implemented as Variants.TOKEN_BASED and Variants.ALIGNMENT_BASED respectively.
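
As a sketch of how a variant can be selected explicitly at the algorithm level (assuming the replay fitness evaluator follows the same algorithm-module pattern as the generalization and simplicity evaluators used later in this section, i.e., pm4py.algo.evaluation.replay_fitness.algorithm), the following could be used:

import pm4py
from pm4py.algo.evaluation.replay_fitness import algorithm as replay_fitness_evaluator

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")
    net, im, fm = pm4py.discover_petri_net_inductive(log)
    # explicit selection of the token-based variant; Variants.ALIGNMENT_BASED is the alternative
    fitness = replay_fitness_evaluator.apply(log, net, im, fm, variant=replay_fitness_evaluator.Variants.TOKEN_BASED)
    print(fitness)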

To calculate the replay fitness between an event log and a Petri net model, using the token-based replay method, the code on the right side can be used. The resulting value is a number between 0 and 1.

import pm4py

if __name__ == "__main__":
    fitness = pm4py.fitness_token_based_replay(log, net, im, fm)
                                    
To calculate the replay fitness between an event log and a Petri net model, using the alignments method, the code on the right side can be used. The resulting value is a number between 0 and 1.

import pm4py

if __name__ == "__main__":
    fitness = pm4py.fitness_alignments(log, net, im, fm)
                                    

Precision

We propose two approaches for the measurement of precision in pm4py:
  • ETConformance (using token-based replay): the reference paper is Muñoz-Gama, Jorge, and Josep Carmona. "A fresh look at precision in process conformance." International Conference on Business Process Management. Springer, Berlin, Heidelberg, 2010.
  • Align-ETConformance (using alignments): the reference paper is Adriansyah, Arya, et al. "Measuring precision of modeled behavior." Information systems and e-Business Management 13.1 (2015): 37-67.

The idea underlying the two approaches is the same: the different prefixes of the log are replayed (where possible) on the model. At the reached marking, the set of transitions that are enabled in the process model is compared with the set of activities that follow the prefix. The more the sets differ, the lower the precision value; the more similar they are, the higher the precision value.

This works only if the replay of the prefix on the process model succeeds: if the replay does not produce a result, the prefix is not considered for the computation of precision. Hence, the precision calculated on top of unfit processes is not really meaningful.

The main difference between the approaches is the replay method. Token-based replay is faster but based on heuristics (hence the result of the replay might not be exact). Alignments are exact, work on any kind of relaxed sound nets, but can be slow if the state-space is huge.

The two variants, ETConformance and Align-ETConformance, are available as Variants.ETCONFORMANCE_TOKEN and Variants.ALIGN_ETCONFORMANCE in the implementation respectively.
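
As a sketch of how the variant can be selected at the algorithm level (assuming the precision evaluator follows the same algorithm-module pattern as the generalization and simplicity evaluators used later in this section, i.e., pm4py.algo.evaluation.precision.algorithm), the following could be used:

import pm4py
from pm4py.algo.evaluation.precision import algorithm as precision_evaluator

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")
    net, im, fm = pm4py.discover_petri_net_inductive(log)
    # explicit selection of the alignments-based variant; Variants.ETCONFORMANCE_TOKEN is the alternative
    prec = precision_evaluator.apply(log, net, im, fm, variant=precision_evaluator.Variants.ALIGN_ETCONFORMANCE)
    print(prec)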

To calculate the precision between an event log and a Petri net model, using the ETConformance method, the code on the right side can be used. The resulting value is a number between 0 and 1.

import pm4py

if __name__ == "__main__":
    prec = pm4py.precision_token_based_replay(log, net, im, fm)
                                    
To calculate the precision between an event log and a Petri net model, using the Align-ETConformance method, the code on the right side can be used. The resulting value is a number between 0 and 1.

import pm4py

if __name__ == "__main__":
    prec = pm4py.precision_alignments(log, net, im, fm)
                                    

Generalization

Generalization is the third dimension to analyse how the log and the process model match. In particular, we propose the generalization measure described in the following research paper:

Buijs, Joos CAM, Boudewijn F. van Dongen, and Wil MP van der Aalst. "Quality dimensions in process discovery: The importance of fitness, precision, generalization and simplicity." International Journal of Cooperative Information Systems 23.01 (2014): 1440001.

Basically, a model is general if the elements of the model are visited often enough during a replay operation (of the log on the model). A model may be perfectly fitting the log and perfectly precise (for example, a model reporting the traces of the log as separate sequential paths going from the initial marking to the final marking, with a choice operated at the initial marking), yet such a model would not be general, since each of its elements is visited only by few traces. Hence, to measure generalization, a token-based replay operation is performed, and the generalization is calculated as 1 - avg_t(sqrt(1.0 / freq(t))), where avg_t is the average of the inner value over all the transitions, sqrt is the square root, and freq(t) is the frequency of t after the replay.

To calculate the generalization between an event log and a Petri net model, using the generalization method proposed in this section, the code on the right side can be used. The resulting value is a number between 0 and 1.

from pm4py.algo.evaluation.generalization import algorithm as generalization_evaluator

if __name__ == "__main__":
    gen = generalization_evaluator.apply(log, net, im, fm)
                                    

Simplicity

Simplicity is the fourth dimension to analyse a process model. In this case, we define simplicity taking into account only the Petri net model. The criterion that we use for simplicity is the inverse arc degree, as described in the following research paper:

Blum, Fabian Rojas. Metrics in process discovery. Technical Report TR/DCC-2015-6, Computer Science Department, University of Chile, 2015.

First of all, we consider the average degree of a place/transition of the Petri net, defined as the sum of the number of input arcs and output arcs. If all the places have at least one input arc and one output arc, the number is at least 2. Choosing a number k between 0 and infinity, the simplicity based on the inverse arc degree is then defined as 1.0 / (1.0 + max(mean_degree - k, 0)). For example, with mean_degree = 3 and k = 2, the simplicity is 1.0 / (1.0 + 1.0) = 0.5.

To calculate the simplicity on a Petri net model, using the inverse arc degree, the following code can be used. The resulting value is a number between 0 and 1.

from pm4py.algo.evaluation.simplicity import algorithm as simplicity_evaluator

if __name__ == "__main__":
    simp = simplicity_evaluator.apply(net)
                                    

Earth Mover Distance

The Earth Mover Distance as introduced in:

Leemans, Sander JJ, Anja F. Syring, and Wil MP van der Aalst. “Earth movers’ stochastic conformance checking.” International Conference on Business Process Management. Springer, Cham, 2019.

provides a way to calculate the distance between two different stochastic languages.

Generally, one language is extracted from the event log, and one language is extracted from the process model. With language, we mean a set of traces that is weighted according to its probability.

For the event log, trivially taking the set of variants of the log, and dividing the number of occurrences of each variant by the total number of traces, provides the language of the log.

Let's see how the language of the log can be obtained. We can import an event log and calculate its language:

import pm4py
from pm4py.statistics.variants.log import get as variants_module

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")
    language = variants_module.get_language(log)
    print(language)
                                    

Obtaining the following probability distribution:

{('register request', 'examine casually', 'check ticket', 'decide', 'reinitiate request', 'examine thoroughly', 'check ticket', 'decide', 'pay compensation'): 0.16666666666666666, ('register request', 'check ticket', 'examine casually', 'decide', 'pay compensation'): 0.16666666666666666, ('register request', 'examine thoroughly', 'check ticket', 'decide', 'reject request'): 0.16666666666666666, ('register request', 'examine casually', 'check ticket', 'decide', 'pay compensation'): 0.16666666666666666, ('register request', 'examine casually', 'check ticket', 'decide', 'reinitiate request', 'check ticket', 'examine casually', 'decide', 'reinitiate request', 'examine casually', 'check ticket', 'decide', 'reject request'): 0.16666666666666666, ('register request', 'check ticket', 'examine thoroughly', 'decide', 'reject request'): 0.16666666666666666}

The same thing does not happen in a natural way for the process model. In order to calculate a language for the process model, a scalable (but non-deterministic) approach is to playout the model in order to obtain an event log.

Let’s first apply the Alpha Miner. Then, we do the playout of the Petri net. We choose the STOCHASTIC_PLAYOUT variant.

if __name__ == "__main__":
    net, im, fm = pm4py.discover_petri_net_alpha(log)
                                    
We can then calculate the language of the model from the playout log:

from pm4py.algo.simulation.playout.petri_net import algorithm as simulator
if __name__ == "__main__":
    playout_log = simulator.apply(net, im, fm, parameters={simulator.Variants.STOCHASTIC_PLAYOUT.value.Parameters.LOG: log},
                                  variant=simulator.Variants.STOCHASTIC_PLAYOUT)
    model_language = variants_module.get_language(playout_log)
                                    

This provides the language of the model. Then, the Earth Mover Distance is calculated:

  • It is assured that the two languages contain the same words: if a language does not contain a word, its probability in that language is set to 0.
  • A common ordering (for example, alphabetical ordering) is decided among the keys of the languages.
  • The distance between the different keys is calculated (using a string distance function such as the Levenshtein distance).

This permits to obtain a number greater than or equal to 0 that expresses the distance between the language of the log and the language of the model. This is an alternative measure for the precision. To calculate the Earth Mover Distance, the Python package pyemd should be installed (pip install pyemd).

The code to apply the Earth Mover Distance is the following:

from pm4py.algo.evaluation.earth_mover_distance import algorithm as emd_evaluator
if __name__ == "__main__":
    emd = emd_evaluator.apply(model_language, language)
    print(emd)
                                    

If the running-example log is chosen along with the Alpha Miner model, a value similar/equal to 0.1733 is obtained.

WOFLAN

WOFLAN is a popular approach for soundness checking on workflow nets, which is able to provide meaningful statistics to the final user. WOFLAN is described in this PhD thesis:

http://www.processmining.org/_media/publications/everbeek_phdthesis.pdf

The definition of workflow net and soundness can also be found at:

https://en.wikipedia.org/wiki/Petri_net

WOFLAN is applied to an accepting Petri net (a Petri net with an initial and final marking) and applies the following steps (the meaning of the steps is found in the thesis):

  • Checking if the Petri net and the markings are valid.
  • Checking if the Petri net is a workflow net.
  • Checking if all the places are covered by S-components.
  • Checking if there are not well-handled pairs.
  • Checking if there are places that are uncovered in uniform invariants.
  • Checking if there are places that are uncovered in weighted invariants.
  • Checking if the WPD is proper.
  • Checking for substates in the MCG.
  • Checking if there are unbounded sequences.
  • Checking for dead tasks.
  • Checking for live tasks.
  • Checking for non-live tasks.
  • Checking for sequences leading to deadlocks.

The order of application of the steps follows a decision diagram: if a step has a positive outcome, a Yes is written on the corresponding edge; if the step has a negative outcome, a No is written on the corresponding edge.

Let's see how Woflan can be applied. First, we open a XES log

import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")
                                    
And we discover a model using the Heuristics Miner

import pm4py

if __name__ == "__main__":
    net, im, fm = pm4py.discover_petri_net_heuristics(log)
                                    
Then, the soundness can be checked by doing:

from pm4py.algo.analysis.woflan import algorithm as woflan

if __name__ == "__main__":
    is_sound = woflan.apply(net, im, fm, parameters={woflan.Parameters.RETURN_ASAP_WHEN_NOT_SOUND: True,
                                                     woflan.Parameters.PRINT_DIAGNOSTICS: False,
                                                     woflan.Parameters.RETURN_DIAGNOSTICS: False})
                                    

In this case, is_sound contains a boolean value (True if the Petri net is a sound workflow net; False otherwise).

The list of parameters is:

  • PRINT_DIAGNOSTICS: enables the printing of the diagnostics on the Petri net, when WOFLAN is executed.
  • RETURN_DIAGNOSTICS: returns a dictionary containing the diagnostics.
  • RETURN_ASAP_WHEN_NOT_SOUND: stops the execution of WOFLAN when a condition determining that the Petri net is not a sound workflow net is found.

On the provided Petri net, that is not sound, the output of the technique is False.

To know why such a Petri net is not sound, we repeat the execution of the script, setting PRINT_DIAGNOSTICS to True and RETURN_ASAP_WHEN_NOT_SOUND to False (to get more diagnostics).
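
This corresponds to re-applying WOFLAN with the adjusted flags; the call is the same as before, with only the parameter values changed:

from pm4py.algo.analysis.woflan import algorithm as woflan

if __name__ == "__main__":
    is_sound = woflan.apply(net, im, fm, parameters={woflan.Parameters.RETURN_ASAP_WHEN_NOT_SOUND: False,
                                                     woflan.Parameters.PRINT_DIAGNOSTICS: True,
                                                     woflan.Parameters.RETURN_DIAGNOSTICS: False})

We get the following messages during the execution: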

Input is ok.
Petri Net is a workflow net.
The following places are not covered by an s-component: [splace_in_decide_check ticket_0, splace_in_check ticket_0, pre_check ticket, splace_in_check ticket_1].
Not well-handled pairs are: [(1, 6), (5, 6), (17, 82), (1, 20), (25, 20), (39, 82), (1, 46), (5, 46), (25, 46), (35, 46), (25, 56), (35, 56), (1, 62), (5, 62), (5, 74), (35, 74), (89, 82)].
The following places are uncovered in uniform invariants: [splace_in_decide_check ticket_0, splace_in_check ticket_0, pre_check ticket, splace_in_check ticket_1]
The following places are uncovered in weighted invariants: [splace_in_decide_check ticket_0, splace_in_check ticket_0, pre_check ticket, splace_in_check ticket_1]
Improper WPD. The following are the improper conditions: [0, 176, 178, 179, 186, 190, 193, 196, 199, 207, 214, 215, 216, 217, 222, 233, 235].
The following sequences are unbounded: [[register request, hid_10, hid_3, check ticket, hid_1, examine casually, hid_7, decide, hid_13], [register request, hid_9, hid_5, examine thoroughly, hid_8, decide, hid_13], [register request, hid_9, hid_5, examine thoroughly, hid_8, decide, hid_14, reinitiate request, hid_16], [register request, hid_9, hid_3, hid_5, check ticket, examine thoroughly, hid_8, decide, hid_13], [register request, hid_9, hid_3, hid_5, check ticket, examine thoroughly, hid_8, decide, hid_14, reinitiate request, hid_16], [register request, hid_9, hid_3, hid_5, check ticket, examine thoroughly, hid_8, decide, hid_14, reinitiate request, hid_17, hid_2, hid_4, examine casually, hid_7, decide, hid_13], [register request, hid_9, hid_3, hid_5, check ticket, examine thoroughly, hid_8, decide, hid_14, reinitiate request, hid_17, hid_2, hid_4, examine casually, hid_7, decide, hid_14, reinitiate request, hid_16], [register request, hid_9, hid_3, hid_5, check ticket, examine thoroughly, hid_8, decide, hid_14, reinitiate request, hid_17, hid_2, hid_4, examine casually, hid_7, decide, hid_14, reinitiate request, hid_17, hid_2, examine casually, check ticket, hid_7, decide, hid_13], [register request, hid_9, hid_3, hid_5, check ticket, examine thoroughly, hid_8, decide, hid_14, reinitiate request, hid_17, hid_2, hid_4, examine casually, hid_7, decide, hid_14, reinitiate request, hid_17, hid_2, examine casually, check ticket, hid_7, decide, hid_14, reinitiate request, hid_16]]

From there, we can read that:

  • There are places not covered in an S-component.
  • There are pairs that are not well-handled.
  • There are places uncovered in uniform and weighted invariants.
  • It is an improper WPD.
  • Some sequences are unbounded.

To get the diagnostics in a dictionary, the execution can be repeated with:

from pm4py.algo.analysis.woflan import algorithm as woflan

if __name__ == "__main__":
    is_sound, dictio_diagnostics = woflan.apply(net, im, fm, parameters={woflan.Parameters.RETURN_ASAP_WHEN_NOT_SOUND: False,
                                                     woflan.Parameters.PRINT_DIAGNOSTICS: False,
                                                     woflan.Parameters.RETURN_DIAGNOSTICS: True})
                                    

The dictionary dictio_diagnostics may contain the following keys (if the computation reaches the corresponding step):

  • S_C_NET
  • PLACE_INVARIANTS
  • UNIFORM_PLACE_INVARIANTS
  • S_COMPONENTS
  • UNCOVERED_PLACES_S_COMPONENT
  • NOT_WELL_HANDLED_PAIRS
  • LEFT
  • UNCOVERED_PLACES_UNIFORM
  • WEIGHTED_PLACE_INVARIANTS
  • UNCOVERED_PLACES_WEIGHTED
  • MCG
  • DEAD_TASKS
  • R_G_S_C
  • R_G
  • LOCKING_SCENARIOS
  • RESTRICTED_COVERABILITY_TREE

Simulation

In pm4py, we offer different simulation algorithms that, starting from a model, are able to produce an output that follows the model and the rules that have been provided by the user.

Playout of a Petri Net

A playout of a Petri net takes as input a Petri net along with an initial marking, and returns a list of process executions that are allowed from the process model.

We offer different types of playouts:

  • Variants.BASIC_PLAYOUT: a basic playout that accepts a Petri net along with an initial marking, and returns a specified number of process executions (repetitions may be possible).
  • Variants.EXTENSIVE: a playout that accepts a Petri net along with an initial marking, and returns all the executions that are possible according to the model, up to a provided length of trace (may be computationally expensive).

The list of parameters for such variants is:

Variants.BASIC_PLAYOUT:
  • Parameters.ACTIVITY_KEY: the name of the attribute to use as activity in the playout log.
  • Parameters.TIMESTAMP_KEY: the name of the attribute to use as timestamp in the playout log.
  • Parameters.CASE_ID_KEY: the trace attribute that should be used as case identifier in the playout log.
  • Parameters.NO_TRACES: the number of traces that the playout log should contain.
  • Parameters.MAX_TRACE_LENGTH: the maximum trace length (after which, the playout of the trace is stopped).

Variants.EXTENSIVE:
  • Parameters.ACTIVITY_KEY: the name of the attribute to use as activity in the playout log.
  • Parameters.TIMESTAMP_KEY: the name of the attribute to use as timestamp in the playout log.
  • Parameters.CASE_ID_KEY: the trace attribute that should be used as case identifier in the playout log.
  • Parameters.MAX_TRACE_LENGTH: the maximum trace length (after which, the extensive playout is stopped).

An example application of the basic playout, given a Petri net, to get a log of 50 traces, is the following:

from pm4py.algo.simulation.playout.petri_net import algorithm as simulator

if __name__ == "__main__":
    simulated_log = simulator.apply(net, im, variant=simulator.Variants.BASIC_PLAYOUT, parameters={simulator.Variants.BASIC_PLAYOUT.value.Parameters.NO_TRACES: 50})
                                    
An example application of the extensive playout, given a Petri net, to get the log containing all the executions of length <= 7:

from pm4py.algo.simulation.playout.petri_net import algorithm as simulator

if __name__ == "__main__":
    simulated_log = simulator.apply(net, im, variant=simulator.Variants.EXTENSIVE, parameters={simulator.Variants.EXTENSIVE.value.Parameters.MAX_TRACE_LENGTH: 7})
                                    

Monte Carlo Simulation

A time-related simulation permits to know how probable it is that a process execution terminates within a given amount of time. This leads to a better estimation of Service Level Agreements, or a better identification of the process instances that are most likely to have a high throughput time.
All this starts from a performance DFG, for example the one discovered from the running-example log:

import os
import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "running-example.xes"))
    dfg_perf, sa, ea = pm4py.discover_performance_dfg(log)
                                    
and the knowledge of the case arrival ratio. The case arrival ratio is the amount of time that passes (on average, or as a median) between the arrival of two consecutive cases. It can be provided by the user or inferred from the event log. The inference from the event log can be done using the following command:

from pm4py.statistics.traces.generic.log import case_arrival

if __name__ == "__main__":
    # average time between the arrival of two consecutive cases
    ratio = case_arrival.get_case_arrival_avg(log)
    print(ratio)
                                    

Using the DFG mining approach, it is possible to retrieve a Petri net model from the DFG. This kind of model is the “default” one for Monte Carlo simulation (because its execution semantics is very clear). Moreover, the Petri net extracted by the DFG mining approach is a sound workflow net (which gives other good properties to the model).

The DFG mining approach can be applied in the following way:

import pm4py

if __name__ == "__main__":
    net, im, fm = pm4py.convert_to_petri_net(dfg_perf, sa, ea)
                                    
To perform a basic Monte Carlo simulation, the following code can be used. The following is a sort of resource-constrained simulation, where it is assumed that a place can hold at most one token at a time. Later, we will see how to provide a higher number of tokens that can be hosted by a place.

from pm4py.algo.simulation.montecarlo import algorithm as montecarlo_simulation
from pm4py.algo.conformance.tokenreplay.algorithm import Variants

if __name__ == "__main__":
    parameters = {}
    parameters[
        montecarlo_simulation.Variants.PETRI_SEMAPH_FIFO.value.Parameters.TOKEN_REPLAY_VARIANT] = Variants.BACKWARDS
    parameters[montecarlo_simulation.Variants.PETRI_SEMAPH_FIFO.value.Parameters.PARAM_CASE_ARRIVAL_RATIO] = 10800
    simulated_log, res = montecarlo_simulation.apply(log, net, im, fm, parameters=parameters)
                                    

During the replay operation, some debug messages are written to the screen. The main outputs of the simulation process are:

  • simulated_log: the traces that have been simulated during the simulation.
  • res: the result of the simulation (Python dictionary).

Within res, which is the result of the simulation, we have the following keys:

  • places_interval_trees: an interval tree for each place, that hosts an interval for each time when it was “full” according to the specified maximum amount of tokens per place.
  • transitions_interval_trees: an interval tree for each transition, that contains all the time intervals in which the transition was enabled but not yet fired (so, the time between a transition was fully enabled and the consumption of the tokens from the input places).
  • cases_ex_time: a list containing the throughput times for all the cases of the log.
  • median_cases_ex_time: the median throughput time of the cases in the simulated log.
  • input_case_arrival_ratio: the case arrival ratio that was provided by the user, or automatically calculated from the event log.
  • total_cases_time: the difference between the last timestamp of the log, and the first timestamp of the simulated log.

The last four items of the previous list are simple Python objects (floats and lists, specifically). The interval tree objects can instead be used as follows to get time-specific information. For example, the following code snippet prints, for a random transition in the model, the number of intervals that are overlapping at a set of points (between the minimum and the maximum timestamp in the log) that are uniformly distributed across the time interval of the log.

import random
if __name__ == "__main__":
    last_timestamp = max(event["time:timestamp"] for trace in log for event in trace).timestamp()
    first_timestamp = min(event["time:timestamp"] for trace in log for event in trace).timestamp()
    pick_trans = random.choice(list(res["transitions_interval_trees"]))
    print(pick_trans)
    n_div = 10
    i = 0
    while i < n_div:
        timestamp = first_timestamp + (last_timestamp - first_timestamp)/n_div * i
        print("\t", timestamp, len(res["transitions_interval_trees"][pick_trans][timestamp]))
        i = i + 1
                                    
The following code snippet instead prints, for a random place in the model, the number of intervals that are overlapping at a set of points (between the minimum and the maximum timestamp of the log) that are uniformly distributed across the time interval of the log:

import random
if __name__ == "__main__":
    last_timestamp = max(event["time:timestamp"] for trace in log for event in trace).timestamp()
    first_timestamp = min(event["time:timestamp"] for trace in log for event in trace).timestamp()
    pick_place = random.choice(list(res["places_interval_trees"]))
    print(pick_place)
    n_div = 10
    i = 0
    while i < n_div:
        timestamp = first_timestamp + (last_timestamp - first_timestamp)/n_div * i
        print("\t", timestamp, len(res["places_interval_trees"][pick_place][timestamp]))
        i = i + 1
                                    

The information can be used to build graphs of the evolution over time (using external programs such as Microsoft Excel).

The simulation process can be summarized as follows:

  • An event log and a model (DFG) are considered.
  • Internally in the simulation, a replay operation is done between the log and the model.
  • The replay operation leads to the construction of a stochastic map that associates to each transition a probability distribution (for example, a normal distribution, an exponential distribution, ...). The probability distribution that maximizes the likelihood of the observed values during the replay is chosen. The user can force a specific distribution (for example, exponential) if desired.
  • Moreover, during the replay operation, the frequency of each transition is found. That helps in picking, in a “weighted” way, one of the transitions enabled in a marking, when the simulation occurs.
  • The simulation process occurs. For each one of the traces that are generated (the distance between their start times is fixed), a thread is spawned and stochastic choices are made. The possibility to use a given place (depending on the maximum number of resources that it is possible to use) is governed by a semaphore object in Python.
  • A maximum amount of time is specified for the simulation. If one or more threads exceed that amount of time, the threads are killed and the corresponding trace is not added to the simulation log.

Hence, several parameters are important in order to perform a Monte Carlo simulation. These parameters, which are defined inside the petri_semaph_fifo class, are listed below (ordered by importance); a sketch showing how some of them can be set follows the list.

Variants.PETRI_SEMAPH_FIFO:
  • Parameters.PARAM_NUM_SIMULATIONS: number of simulations that are performed (the goal is to have such a number of traces in the simulated log).
  • Parameters.PARAM_CASE_ARRIVAL_RATIO: the case arrival ratio that is specified by the user.
  • Parameters.PARAM_MAP_RESOURCES_PER_PLACE: a map containing, for each place of the Petri net, the maximum amount of tokens.
  • Parameters.PARAM_DEFAULT_NUM_RESOURCES_PER_PLACE: if the map of resources per place is not specified, then use the specified maximum number of resources per place.
  • Parameters.PARAM_MAX_THREAD_EXECUTION_TIME: specifies the maximum execution time of the simulation (for example, 60 seconds).
  • Parameters.PARAM_SMALL_SCALE_FACTOR: specifies the ratio between the “real” time scale and the simulation time scale. A higher ratio means that the simulation goes faster but is in general less accurate; a lower ratio means that the simulation goes slower and is in general more accurate (in providing detailed diagnostics). The default choice is 864000 seconds (10 days), meaning that a second in the simulation corresponds to 10 days of real log.
  • Parameters.PARAM_ENABLE_DIAGNOSTICS: enables the printing of the simulation diagnostics through the usage of the “logging” class of Python.
  • Parameters.ACTIVITY_KEY: the attribute of the log that should be used as activity.
  • Parameters.TIMESTAMP_KEY: the attribute of the log that should be used as timestamp.
  • Parameters.TOKEN_REPLAY_VARIANT: the variant of the token-based replay to use: token_replay, the classic variant, that cannot handle duplicate transitions; backwards, the backwards token-based replay, that is slower but can handle invisible transitions.
  • Parameters.PARAM_FORCE_DISTRIBUTION: if specified, the distribution that is forced for the transitions (normal, exponential).
  • Parameters.PARAM_DIAGN_INTERVAL: the time interval in which diagnostics should be printed (for example, every 10 seconds).
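
As a sketch, reusing the log and the model obtained above and only parameter names from the list (the concrete values are illustrative assumptions), a more customized simulation setup could look as follows:

from pm4py.algo.simulation.montecarlo import algorithm as montecarlo_simulation

if __name__ == "__main__":
    variant = montecarlo_simulation.Variants.PETRI_SEMAPH_FIFO
    parameters = {
        # number of traces to simulate (illustrative value)
        variant.value.Parameters.PARAM_NUM_SIMULATIONS: 100,
        # a case arrives every 3 hours (illustrative value)
        variant.value.Parameters.PARAM_CASE_ARRIVAL_RATIO: 10800,
        # each place hosts at most two tokens at the same time (illustrative value)
        variant.value.Parameters.PARAM_DEFAULT_NUM_RESOURCES_PER_PLACE: 2,
        # kill simulation threads exceeding 60 seconds (illustrative value)
        variant.value.Parameters.PARAM_MAX_THREAD_EXECUTION_TIME: 60
    }
    simulated_log, res = montecarlo_simulation.apply(log, net, im, fm, variant=variant, parameters=parameters)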

Extensive Playout of a Process Tree

An extensive playout operation permits to obtain (up to the provided limits) the entire language of the process model. Doing an extensive playout operation on a Petri net can be incredibly expensive (the reachability graph needs to be explored). Process trees, with their bottom-up structure, permit to obtain the entire language of the model in a much easier way, starting from the language of the leaves (which is obvious) and then following specific merge rules for the operators.

However, since the language of a process tree can be incredibly vast (when parallel operators are involved) or also infinite (when loops are involved), the extensive playout is possible only up to some limits:

  • A specification of the maximum number of occurrences for a loop must be provided, if a loop is present. This stops an extensive playout operation at the given number of occurrences.
  • Since the number of different executions, when loops are involved, is still incredibly big, it is possible to specify the maximum length of a trace to be returned. So, traces that are above the maximum length are automatically discarded.
  • For further limiting the number of different executions, the maximum number of traces returned by the algorithm might be provided.

Moreover, from the structure of the process tree, it is easy to infer the minimum length of a trace allowed by the process model (always following the bottom-up approach).

Some reasonable settings for the extensive playout are the following:

  • Overall, the maximum number of traces returned by the algorithm is set to 100000.
  • The maximum length of a trace that is an output of the playout is, by default, set to the minimum length of a trace accepted by a process tree.
  • The maximum number of loops is set to be the minimum length of a trace divided by two.

The list of parameters is:

  • MAX_LIMIT_NUM_TRACES: maximum number of traces that are returned by the algorithm.
  • MAX_TRACE_LENGTH: maximum length of a trace that is output of the algorithm.
  • MAX_LOOP_OCC: maximum number of times we enter in a loop.

In the following, we see how the playout can be executed. First, a log can be imported:

import pm4py
import os

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "receipt.xes"))
                                    
Then, a process tree can be discovered using the inductive miner algorithm.

if __name__ == "__main__":
    tree = pm4py.discover_process_tree_inductive(log)
                                    
We specify to retrieve traces of length at most equal to 3, and we want to retrieve at most 100000 traces.

from pm4py.algo.simulation.playout.process_tree import algorithm as tree_playout

if __name__ == "__main__":
    playout_variant = tree_playout.Variants.EXTENSIVE
    param = tree_playout.Variants.EXTENSIVE.value.Parameters

    simulated_log = tree_playout.apply(tree, variant=playout_variant,
                                       parameters={param.MAX_TRACE_LENGTH: 3, param.MAX_LIMIT_NUM_TRACES: 100000})
    print(len(simulated_log))
                                    

At this point, the extensive playout operation is done.

Social Network Analysis

In pm4py we offer support for different Social Network Analysis metrics, and support for the discovery of roles.

Handover of Work

The Handover of Work metric measures how many times an individual is followed by another individual in the execution of a business process. To calculate the Handover of Work metric, the following code could be used:

import pm4py

if __name__ == "__main__":
    hw_values = pm4py.discover_handover_of_work_network(log)
                                    
Then, a visualization could be obtained through NetworkX or through Pyvis:

import pm4py

if __name__ == "__main__":
    pm4py.view_sna(hw_values)
                                    

Subcontracting

The subcontracting metric calculates how many times the work of an individual is interleaved by the work of some other individual, only to eventually “return” to the original individual. To measure the subcontracting metric, the following code could be used:

import pm4py

if __name__ == "__main__":
    sub_values = pm4py.discover_subcontracting_network(log)
                                    
Then, a visualization could be obtained through NetworkX or through Pyvis:

import pm4py

if __name__ == "__main__":
    pm4py.view_sna(sub_values)
                                    

Working Together

The Working together metric calculates how many times two individuals work together for resolving a process instance. To measure the Working Together metric, the following code could be used:

import pm4py

if __name__ == "__main__":
    wt_values = pm4py.discover_working_together_network(log)
                                    
Then, a visualization could be obtained through NetworkX or through Pyvis:

import pm4py

if __name__ == "__main__":
    pm4py.view_sna(wt_values)
                                    

Similar Activities

The Similar Activities metric calculates how similar the work patterns of two individuals are. To measure the Similar Activities metric, the following code could be used:

import pm4py

if __name__ == "__main__":
    ja_values = pm4py.discover_activity_based_resource_similarity(log)
                                    
Then, a visualization could be obtained through NetworkX or through Pyvis:

import pm4py

if __name__ == "__main__":
    pm4py.view_sna(ja_values)
                                    

Roles Discovery

A role is a set of activities in the log that are executed by a similar (multi)set of resources. Hence, it is a specific function inside the organization. Grouping the activities in roles can help:

  • In understanding which activities are executed by which roles.
  • In understanding the roles themselves (the number of resources doing a single activity may not provide enough explanation).

An article on roles detection, which has inspired the technique implemented in pm4py, is:

Burattin, Andrea, Alessandro Sperduti, and Marco Veluscek. “Business models enhancement through discovery of roles.” 2013 IEEE Symposium on Computational Intelligence and Data Mining (CIDM). IEEE, 2013.

Initially, each activity corresponds to a different role, and is associated to the multiset of its originators. After that, roles are merged according to their similarity, until no more merges are possible.

First, you need to import a log:

import pm4py
import os
if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "receipt.xes"))
                                    
After that, the role detection algorithm can be applied:

import pm4py

if __name__ == "__main__":
    roles = pm4py.discover_organizational_roles(log)
                                    

We can print the sets of activities that are grouped in roles by doing print([x[0] for x in roles]).
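
In code:

if __name__ == "__main__":
    # print, for each discovered role, the set of activities that compose it
    print([x[0] for x in roles])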

Clustering (SNA results)

Given the results of applying a SNA metric, a clustering operation permits to group the resources that are connected by a meaningful connection in the given metric. For example:

  • By clustering the results of the working together metric, individuals that often work together are inserted in the same group.
  • By clustering the results of the similar activities metric, individuals that work on the same tasks are inserted in the same group.

We provide a baseline method to get a list of groups (where each group is a list of resources) from the specification of the values of a SNA metric. This can be applied as follows on the running-example log and the results of the similar activities metric:


import pm4py
import os

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "running-example.xes"))

    sa_metric = pm4py.discover_activity_based_resource_similarity(log)

    from pm4py.algo.organizational_mining.sna import util
    clustering = util.cluster_affinity_propagation(sa_metric)
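
The resulting clustering is a list of groups, where each group is a list of resources; as a minimal usage sketch, it can be inspected as follows:

if __name__ == "__main__":
    # each group is a list of resources that are strongly connected according to the chosen metric
    for group in clustering:
        print(group)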

Resource Profiles

The profiling of resources from event logs is also possible. We implement the approach described in:

Pika, Anastasiia, et al. "Mining resource profiles from event logs." ACM Transactions on Management Information Systems (TMIS) 8.1 (2017): 1-30.

Basically, the behavior of a resource can be measured over a period of time with different metrics presented in the paper:

  • RBI 1.1 (number of distinct activities): Number of distinct activities done by a resource in a given time interval [t1, t2)
  • RBI 1.3 (activity frequency): Fraction of completions of a given activity a, by a given resource r, during a given time slot, [t1, t2), with respect to the total number of activity completions by resource r during [t1, t2)
  • RBI 2.1 (activity completions): The number of activity instances completed by a given resource during a given time slot.
  • RBI 2.2 (case completions): The number of cases completed during a given time slot in which a given resource was involved.
  • RBI 2.3 (fraction case completion): The fraction of cases completed during a given time slot in which a given resource was involved with respect to the total number of cases completed during the time slot.
  • RBI 2.4 (average workload): The average number of activities started by a given resource but not completed at a moment in time.
  • RBI 3.1 (multitasking): The fraction of active time during which a given resource is involved in more than one activity with respect to the resource's active time.
  • RBI 4.3 (average duration activity): The average duration of instances of a given activity completed during a given time slot by a given resource.
  • RBI 4.4 (average case duration): The average duration of cases completed during a given time slot in which a given resource was involved.
  • RBI 5.1 (interaction two resources): The number of cases completed during a given time slot in which two given resources were involved.
  • RBI 5.2 (social position): The fraction of resources involved in the same cases with a given resource during a given time slot with respect to the total number of resources active during the time slot.

The following example calculates these metrics starting from the running-example XES event log:


import os
from pm4py.algo.organizational_mining.resource_profiles import algorithm
import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "running-example.xes"))
    # Metric RBI 1.1: Number of distinct activities done by a resource in a given time interval [t1, t2)
    print(algorithm.distinct_activities(log, "2010-12-30 00:00:00", "2011-01-25 00:00:00", "Sara"))
    # Metric RBI 1.3: Fraction of completions of a given activity a, by a given resource r,
    # during a given time slot, [t1, t2), with respect to the total number of activity completions by resource r
    # during [t1, t2)
    print(algorithm.activity_frequency(log, "2010-12-30 00:00:00", "2011-01-25 00:00:00", "Sara", "decide"))
    # Metric RBI 2.1: The number of activity instances completed by a given resource during a given time slot.
    print(algorithm.activity_completions(log, "2010-12-30 00:00:00", "2011-01-25 00:00:00", "Sara"))
    # Metric RBI 2.2: The number of cases completed during a given time slot in which a given resource was involved.
    print(algorithm.case_completions(log, "2010-12-30 00:00:00", "2011-01-25 00:00:00", "Pete"))
    # Metric RBI 2.3: The fraction of cases completed during a given time slot in which a given resource was involved
    # with respect to the total number of cases completed during the time slot.
    print(algorithm.fraction_case_completions(log, "2010-12-30 00:00:00", "2011-01-25 00:00:00", "Pete"))
    # Metric RBI 2.4: The average number of activities started by a given resource but not completed at a moment in time.
    print(algorithm.average_workload(log, "2010-12-30 00:00:00", "2011-01-15 00:00:00", "Mike"))
    # Metric RBI 3.1: The fraction of active time during which a given resource is involved in more than one activity
    # with respect to the resource's active time.
    print(algorithm.multitasking(log, "2010-12-30 00:00:00", "2011-01-25 00:00:00", "Mike"))
    # Metric RBI 4.3: The average duration of instances of a given activity completed during a given time slot by
    # a given resource.
    print(algorithm.average_duration_activity(log, "2010-12-30 00:00:00", "2011-01-25 00:00:00", "Sue", "examine thoroughly"))
    # Metric RBI 4.4: The average duration of cases completed during a given time slot in which a given resource was involved.
    print(algorithm.average_case_duration(log, "2010-12-30 00:00:00", "2011-01-25 00:00:00", "Sue"))
    # Metric RBI 5.1: The number of cases completed during a given time slot in which two given resources were involved.
    print(algorithm.interaction_two_resources(log, "2010-12-30 00:00:00", "2011-01-25 00:00:00", "Mike", "Pete"))
    # Metric RBI 5.2: The fraction of resources involved in the same cases with a given resource during a given time slot
    # with respect to the total number of resources active during the time slot.
    print(algorithm.social_position(log, "2010-12-30 00:00:00", "2011-01-25 00:00:00", "Sue"))
                                    

Organizational Mining

With event logs, we are able to identify groups of resources doing similar activities. As we have seen in the previous sections, there are different ways to automatically detect these groups from event logs:

  • Discovering the Similar Activities metric and applying a clustering algorithm to find the groups.
  • Applying the roles discovery algorithm (Burattin et al.)

As a third option, an attribute may be present in the events, describing the group that performed the event.

With the term organizational mining, we mean the discovery of behavior-related information specific to an organizational group (e.g. which activities are done by the group?).

We provide an implementation of the approach described in:

Yang, Jing, et al. 'OrgMining 2.0: A Novel Framework for Organizational Model Mining from Event Logs.' arXiv preprint arXiv:2011.12445 (2020).

The approach provides the description of some group-related metrics (local diagnostics). Among these, we have:

  • Group Relative Focus: (on a given type of work) specifies how much a resource group performed this type of work compared to the overall workload of the group. It can be used to measure how the workload of a resource group is distributed over different types of work, i.e., work diversification of the group.
  • Group Relative Stake: (in a given type of work) specifies how much this type of work was performed by a certain resource group among all groups. It can be used to measure how the workload devoted to a certain type of work is distributed over resource groups in an organizational model, i.e., work participation by different groups.
  • Group Coverage: with respect to a given type of work specifies the proportion of members of a resource group that performed this type of work.
  • Group Member Contribution: of a member of a resource group with respect to the given type of work specifies how much of this type of work by the group was performed by the member. It can be used to measure how the workload of the entire group devoted to a certain type of work is distributed over the group members.

The following example calculates these metrics starting from the receipt XES event log, showing how the information can be exploited from an attribute that specifies the group performing each task:


import pm4py
import os
from pm4py.algo.organizational_mining.local_diagnostics import algorithm as local_diagnostics

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "receipt.xes"))
    # this applies the organizational mining from an attribute that is in each event, describing the group that is performing the task.
    ld = local_diagnostics.apply_from_group_attribute(log, parameters={local_diagnostics.Parameters.GROUP_KEY: "org:group"})
    # GROUP RELATIVE FOCUS (on a given type of work) specifies how much a resource group performed this type of work
    # compared to the overall workload of the group. It can be used to measure how the workload of a resource group
    # is distributed over different types of work, i.e., work diversification of the group.
    print("\ngroup_relative_focus")
    print(ld["group_relative_focus"])
    # GROUP RELATIVE STAKE (in a given type of work) specifies how much this type of work was performed by a certain
    # resource group among all groups. It can be used to measure how the workload devoted to a certain type of work is
    # distributed over resource groups in an organizational model, i.e., work participation by different groups.
    print("\ngroup_relative_stake")
    print(ld["group_relative_stake"])
    # GROUP COVERAGE with respect to a given type of work specifies the proportion of members of a resource group that
    # performed this type of work.
    print("\ngroup_coverage")
    print(ld["group_coverage"])
    # GROUP MEMBER CONTRIBUTION of a member of a resource group with respect to the given type of work specifies how
    # much of this type of work by the group was performed by the member. It can be used to measure how the workload
    # of the entire group devoted to a certain type of work is distributed over the group members.
    print("\ngroup_member_contribution")
    print(ld["group_member_contribution"])
                    

Alternatively, the apply_from_clustering_or_roles method of the same class can be used, providing the log as first argument, and the results of the clustering as second argument.
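
A minimal sketch of this alternative (the clustering is obtained as in the clustering section above; the positional signature, log first and clustering second, is assumed from the description above):

import pm4py
from pm4py.algo.organizational_mining.local_diagnostics import algorithm as local_diagnostics
from pm4py.algo.organizational_mining.sna import util

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/receipt.xes")
    # clustering obtained from the similar activities metric, as in the clustering section above
    sa_metric = pm4py.discover_activity_based_resource_similarity(log)
    clustering = util.cluster_affinity_propagation(sa_metric)
    # assumed call: log as first argument, clustering as second argument
    ld = local_diagnostics.apply_from_clustering_or_roles(log, clustering)
    print(ld["group_relative_focus"])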

BPMN Support

In pm4py, we offer support for importing/exporting/layouting BPMN diagrams. The support is limited to the following BPMN elements:

  • Events (start / end events)
  • Tasks
  • Gateways (exclusive, parallel, inclusive)

Moreover, we offer support for conversion from/to other process models implemented in pm4py (such as Petri nets and process trees).

BPMN 2.0 – Importing

The BPMN 2.0 XML files can be imported using the following instructions:

import pm4py
import os

if __name__ == "__main__":
    bpmn_graph = pm4py.read_bpmn(os.path.join("tests", "input_data", "running-example.bpmn"))
                                    

BPMN 2.0 – Exporting

The BPMN models can be exported using the following instructions (here, bpmn_graph is the Python object hosting the model).

import pm4py

if __name__ == "__main__":
    pm4py.write_bpmn(bpmn_graph, "ru.bpmn")
                                    

BPMN 2.0 – Layouting

A layouting operation tries to give a good position to the nodes and the edges of the BPMN diagram. For our purposes, we chose an octilinear edge layout.

The following commands perform the layouting:

from pm4py.objects.bpmn.layout import layouter

if __name__ == "__main__":
    bpmn_graph = layouter.apply(bpmn_graph)
                                    

BPMN 2.0 – Conversion to Petri net

A conversion of a BPMN model into a Petri net enables different pm4py algorithms (such as conformance checking and simulation algorithms); hence, it is a particularly important operation.

To convert a BPMN model into an (accepting) Petri net, the following code can be used:

import pm4py

if __name__ == "__main__":
    net, im, fm = pm4py.convert_to_petri_net(bpmn_graph)
                                    

BPMN 2.0 – Conversion from a process tree

Process trees are an important class of block-structured process models (and the output of the inductive miner algorithm). These models can be easily converted to BPMN models.

Let’s see an example. First, we import a XES event log, and we discover a model using the inductive miner:

import pm4py
import os

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "running-example.xes"))
    tree = pm4py.discover_process_tree_inductive(log)
                                    
Then, we can convert that to a BPMN graph:

import pm4py

if __name__ == "__main__":
    bpmn_graph = pm4py.convert_to_bpmn(tree)
                                    

Directly-Follows Graphs

Directly-follows graphs are one of the simplest classes of process models. The nodes are the activities of the log. The edges report the number of times two activities follow each other. In pm4py, we offer support for advanced operations on top of directly-follows graphs.

In particular, the discovery of the directly-follows graph, along with the start and end activities of the log, can be done using the command:

import pm4py

if __name__ == "__main__":
    dfg, sa, ea = pm4py.discover_directly_follows_graph(log)
                                    
Instead, the activities of the log, along with their number of occurrences, can be retrieved (assuming that concept:name is the attribute reporting the activity) using:

import pm4py

if __name__ == "__main__":
    activities_count = pm4py.get_event_attribute_values(log, "concept:name")
                                    

Filtering activities/paths

Directly-follows graphs can contain a huge number of activities and paths, with some of them being outliers. In this section, we will see how to filter on the activities and paths of the graph, keeping a subset of its behavior.

We can load an example log and calculate the directly-follows graph.

import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")
    dfg, sa, ea = pm4py.discover_directly_follows_graph(log)
    activities_count = pm4py.get_event_attribute_values(log, "concept:name")
                                    
The filtering on the activities percentage is applied as in the following snippet. The most frequent activities according to the percentage are kept, along with all the activities that keep the graph connected. If a percentage of 0% is specified, then only the most frequent activity (and the activities keeping the graph connected) is retained. Specifying 0.2 as in the example, we keep 20% of the activities. The filter is applied at the same time to the DFG, to the start activities, to the end activities, and to the dictionary containing the activity occurrences, so that consistency is kept.

from pm4py.algo.filtering.dfg import dfg_filtering
if __name__ == "__main__":
    dfg, sa, ea, activities_count = dfg_filtering.filter_dfg_on_activities_percentage(dfg, sa, ea, activities_count, 0.2)
                                    

The filtering on the paths percentage is applied as in the following snippet. The most frequent paths according to the percentage are kept, along with all the paths that are necessary to keep the graph connected. If a percentage of 0% is specified, then only the most frequent path (and the paths keeping the graph connected) is retained. Specifying 0.2 as in the example, we keep 20% of the paths. The filter is applied at the same time to the DFG, to the start activities, to the end activities, and to the dictionary containing the activity occurrences, so that consistency is kept.

from pm4py.algo.filtering.dfg import dfg_filtering
if __name__ == "__main__":
    dfg, sa, ea, activities_count = dfg_filtering.filter_dfg_on_paths_percentage(dfg, sa, ea, activities_count, 0.2)
                                    

Playout of a DFG

A playout operation on a directly-follows graph is useful to retrieve the traces that are allowed by the directly-follows graph. In this case, a trace is a sequence of activities visited in the DFG from the start node to the end node. We can assign a probability to each trace (assuming that the DFG represents a Markov chain). In particular, we are interested in getting the most likely traces. In this section, we will see how to perform the playout of a directly-follows graph.

We can load an example log and calculate the directly-follows graph.

import pm4py
if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")
    dfg, sa, ea = pm4py.discover_directly_follows_graph(log)
    activities_count = pm4py.get_event_attribute_values(log, "concept:name")
                                    
Then, we can perform the playout operation.

if __name__ == "__main__":
    simulated_log = pm4py.play_out(dfg, sa, ea)
                                    

Alignments on a DFG

A popular conformance checking technique is that of alignments. Alignments are usually performed on Petri nets; however, this can take time, since the state space of a Petri net can be huge. It is also possible to perform alignments on a directly-follows graph. Since the state space of a directly-follows graph is small, the result is a very efficient computation of alignments. This permits getting quick diagnostics on the activities and paths that are executed in a wrong way. In this section, we will show an example on how to perform alignments between a process execution and a DFG.

We can load an example log and calculate the directly-follows graph.

import pm4py
if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")
    dfg, sa, ea = pm4py.discover_directly_follows_graph(log)
    activities_count = pm4py.get_event_attribute_values(log, "concept:name")
                                    
Then, we can perform alignments between the process executions of the log and the DFG:

if __name__ == "__main__":
    alignments = pm4py.conformance_diagnostics_alignments(log, dfg, sa, ea)
                                    

The output of the alignments is equivalent to the one obtained against Petri nets. In particular, the output is a list containing, for each trace, the result of the alignment. Each alignment consists of some moves from the start to the end of both the trace and the DFG. We can have synchronous moves, moves on log (when a move in the process execution is not mimicked by the DFG) and moves on model (when a move is needed in the model that is not supported by the process execution).
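
As an illustration, the result could be inspected as in the following sketch; the keys assumed here (alignment, fitness) are the ones commonly returned by the pm4py alignment algorithms.

if __name__ == "__main__":
    for index, trace_alignment in enumerate(alignments):
        # 'alignment' contains the list of moves, while 'fitness' summarizes
        # how well the trace matches the DFG
        print(index, trace_alignment["fitness"], trace_alignment["alignment"])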

Convert Directly-Follows Graph to a Workflow Net

The Directly-Follows Graph is the representation of a process provided by many commercial tools. An idea proposed by Sander Leemans is to convert the DFG into a workflow net that perfectly mimics the DFG. This is called DFG mining. The following steps are useful to load the log, calculate the DFG, convert it into a workflow net, and perform alignments.

First, we have to import the log. Subsequently, we have to mine the Directly-Follows graph. This DFG can then be converted to a workflow net.

import pm4py
import os
if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "running-example.xes"))

    from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
    dfg = dfg_discovery.apply(log)

    from pm4py.objects.conversion.dfg import converter as dfg_mining
    net, im, fm = dfg_mining.apply(dfg)
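
The steps above mention performing alignments as the final part of the DFG mining workflow. A minimal sketch of that last step, using the simplified pm4py interface on the workflow net obtained from the conversion, is the following:

if __name__ == "__main__":
    # align the original log against the workflow net obtained from the DFG
    aligned_traces = pm4py.conformance_diagnostics_alignments(log, net, im, fm)
    print(aligned_traces[0])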

Streaming Process Mining

Streaming Package General Structure

In pm4py, we offer support for streaming process mining functionalities, including:

  • Streaming process discovery (DFG)
  • Streaming conformance checking (footprints and TBR)
  • Streaming importing of XES/CSV files
The management of the stream of events is done by the pm4py.streaming.stream.live_event_stream.LiveEventStream class. This class provides access to two methods:
  • register(algo): registers a new algorithm to the live event stream (it will be notified when an event is added to the stream).
  • append(event): adds an event to the live event stream.

The LiveEventStream processes the incoming events using a thread pool. This helps to manage a “flood” of events using a given number of different threads.

For the streaming algorithms that are registered to the LiveEventStream, we provide an interface that should be implemented. The following methods should be implemented inside each streaming algorithm (a minimal sketch follows the list):

  • _process(event): a method that accepts and processes an incoming event.
  • _current_result(): a method that returns the current state of the streaming algorithm.
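
A minimal sketch of such a custom streaming algorithm is shown below. It assumes that the StreamingAlgorithm base class is exposed in pm4py.streaming.algo.interface; the ActivityCounter class itself is purely illustrative and simply counts the incoming events per activity.

from pm4py.streaming.stream.live_event_stream import LiveEventStream
from pm4py.streaming.algo.interface import StreamingAlgorithm

class ActivityCounter(StreamingAlgorithm):
    # illustrative sketch (assumption: StreamingAlgorithm is the base class of streaming algorithms)
    def __init__(self):
        self.counter = {}
        StreamingAlgorithm.__init__(self)

    def _process(self, event):
        # accepts and processes a single incoming event
        activity = event.get("concept:name")
        self.counter[activity] = self.counter.get(activity, 0) + 1

    def _current_result(self):
        # returns the current state of the streaming algorithm
        return dict(self.counter)

if __name__ == "__main__":
    live_event_stream = LiveEventStream()
    live_event_stream.register(ActivityCounter())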

Streaming Process Discovery (Directly-Follows Graph)

The following example will show how to discover a DFG from a stream of events.

Let’s first define the (live) event stream:

from pm4py.streaming.stream.live_event_stream import LiveEventStream

if __name__ == "__main__":
    live_event_stream = LiveEventStream()
                                    
Then, create the streaming DFG discovery object (that will contain the list of activities and relationships inside the DFG):

from pm4py.streaming.algo.discovery.dfg import algorithm as dfg_discovery

if __name__ == "__main__":
    streaming_dfg = dfg_discovery.apply()
                                    
Then, we need to register the streaming DFG discovery to the stream:

if __name__ == "__main__":
    live_event_stream.register(streaming_dfg)
                                    
And start the stream:

if __name__ == "__main__":
    live_event_stream.start()
                                    
To put a known event log in the stream, we need to import an XES log:

import os
import pm4py

if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "running-example.xes"))
                                    
And then convert that to a static event stream:

import pm4py

if __name__ == "__main__":
    static_event_stream = pm4py.convert_to_event_stream(log)
                                    
Then, we can add all the events to the live stream:

if __name__ == "__main__":
    for ev in static_event_stream:
        live_event_stream.append(ev)
                                    
Then, by stopping the stream, we make sure that the events in the queue are fully processed:

if __name__ == "__main__":
    live_event_stream.stop()
                                    
At the end, we can get the directly-follows graph, along with the activities of the graph and the sets of start and end activities, by doing:

if __name__ == "__main__":
    dfg, activities, sa, ea = streaming_dfg.get()
                                    

If we do print(dfg) on the running-example.xes log we obtain:

{('register request', 'examine casually'): 3, ('examine casually', 'check ticket'): 4, ('check ticket', 'decide'): 6, ('decide', 'reinitiate request'): 3, ('reinitiate request', 'examine thoroughly'): 1, ('examine thoroughly', 'check ticket'): 2, ('decide', 'pay compensation'): 3, ('register request', 'check ticket'): 2, ('check ticket', 'examine casually'): 2, ('examine casually', 'decide'): 2, ('register request', 'examine thoroughly'): 1, ('decide', 'reject request'): 3, ('reinitiate request', 'check ticket'): 1, ('reinitiate request', 'examine casually'): 1, ('check ticket', 'examine thoroughly'): 1, ('examine thoroughly', 'decide'): 1}

Streaming Conformance Checking (TBR)

The following examples will show how to check conformance against a stream of events with the footprints and token-based replay algorithms. For both of the examples that follow, we assume to work with the running-example.xes log and with a model discovered using the inductive miner infrequent variant with the default noise threshold (0.2).

The following code can be used to import the running-example.xes log:

import os
import pm4py
if __name__ == "__main__":
    log = pm4py.read_xes(os.path.join("tests", "input_data", "running-example.xes"))
                                    
And convert that to a static stream of events:

import pm4py
if __name__ == "__main__":
    static_event_stream = pm4py.convert_to_event_stream(log)
                                    
Then, the following code can be used to discover a process tree using the inductive miner:

import pm4py
if __name__ == "__main__":
    tree = pm4py.discover_process_tree_inductive(log)
                                    
And convert that to a Petri net:

import pm4py
if __name__ == "__main__":
    net, im, fm = pm4py.convert_to_petri_net(tree)
                                    

Now, we can apply the streaming TBR.

Then, we create a live event stream:

from pm4py.streaming.stream.live_event_stream import LiveEventStream
if __name__ == "__main__":
    live_event_stream = LiveEventStream()
                                    
And the streaming token-based replay algorithm:

from pm4py.streaming.algo.conformance.tbr import algorithm as tbr_algorithm
if __name__ == "__main__":
    streaming_tbr = tbr_algorithm.apply(net, im, fm)
                                    
Moreover, we can register that to the live event stream:

if __name__ == "__main__":
    live_event_stream.register(streaming_tbr)
                                    
And start the live event stream:

if __name__ == "__main__":
    live_event_stream.start()
                                    
After that, we can add each event of the log to the live event stream:

if __name__ == "__main__":
    for ev in static_event_stream:
        live_event_stream.append(ev)
                                    
And then, stop the event stream:

if __name__ == "__main__":
    live_event_stream.stop()
                                    
And get statistics on the execution of the replay (how many missing tokens were needed?) as a Pandas dataframe. This method can be called throughout the lifecycle of the stream, providing the “picture” of the replay up to that point:

if __name__ == "__main__":
    conf_stats = streaming_tbr.get()
    print(conf_stats)
                                    

In addition to this, the following methods are available inside the streaming TBR; they print some warnings during the replay. The methods can easily be overridden (for example, to send the message via email):

  • message_case_or_activity_not_in_event
  • message_activity_not_possible
  • message_missing_tokens
  • message_case_not_in_dictionary
  • message_final_marking_not_reached

Streaming Conformance Checking (footprints)

Footprints is another conformance checking method offered in pm4py, which can be implemented in the context of streaming events. In the following, we see an application of the streaming footprints.

First of all, we can discover the footprints from the process model:

if __name__ == "__main__":
    from pm4py.algo.discovery.footprints import algorithm as fp_discovery
    footprints = fp_discovery.apply(tree)
                                    
Then, we can create the live event stream:

if __name__ == "__main__":
    from pm4py.streaming.stream.live_event_stream import LiveEventStream
    live_event_stream = LiveEventStream()
                                    
Then, we can create the streaming footprints object:

if __name__ == "__main__":
    from pm4py.streaming.algo.conformance.footprints import algorithm as fp_conformance
    streaming_footprints = fp_conformance.apply(footprints)
                                    
And register that to the stream:

if __name__ == "__main__":
    live_event_stream.register(streaming_footprints)
                                    
After that, we can start the live event stream:

if __name__ == "__main__":
    live_event_stream.start()
                                    
And append every event of the original log to this live event stream:

if __name__ == "__main__":
    for ev in static_event_stream:
        live_event_stream.append(ev)
                                    
Eventually, we can stop the live event stream:

if __name__ == "__main__":
    live_event_stream.stop()
                                    
And get the statistics of conformance checking:

if __name__ == "__main__":
    conf_stats = streaming_footprints.get()
    print(conf_stats)
                                    

In addition to this, the following methods are available inside the streaming footprints; they print some warnings during the replay. The methods can easily be overridden (for example, to send the message via email):

  • message_case_or_activity_not_in_event
  • message_activity_not_possible
  • message_footprints_not_possible
  • message_start_activity_not_possible
  • message_end_activity_not_possible
  • message_case_not_in_dictionary

Streaming Conformance Checking (Temporal Profile)

We propose in pm4py an implementation of the temporal profile model. This has been described in:

Stertz, Florian, Jürgen Mangler, and Stefanie Rinderle-Ma. "Temporal Conformance Checking at Runtime based on Time-infused Process Models." arXiv preprint arXiv:2008.07262 (2020).

A temporal profile measures, for every couple of activities in the log, the average time and the standard deviation between events having the given activities. The time is measured between the completion of the first event and the start of the second event. Hence, it is assumed to work with an interval log, where the events have two timestamps. The output of the temporal profile discovery is a dictionary where each couple of activities (expressed as a tuple) is associated with a pair of numbers: the first is the average and the second is the standard deviation.

It is possible to use a temporal profile to perform conformance checking on an event log. The times between the couples of activities in the log are assessed against the values stored in the temporal profile. Specifically, a value is calculated that expresses by how many standard deviations the observed time differs from the average. If that value exceeds a threshold (by default set to 6, according to the six-sigma principles), then the couple of activities is signaled.
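
As a plain illustration of this check (not pm4py internals), the number of standard deviations can be computed as follows:

def is_deviating(observed_time, average, stdev, zeta=6):
    # illustrative sketch: signal a couple of activities whose observed time differs
    # from the temporal profile average by more than zeta standard deviations
    if stdev == 0:
        return observed_time != average
    return abs(observed_time - average) / stdev > zeta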

In pm4py, we provide a streaming conformance checking algorithm based on the temporal profile. The algorithm checks an incoming event against every event that happened previously in the same case, identifying deviations according to the temporal profile. This section provides an example where a temporal profile is discovered, the streaming conformance checking is set up, and a log is replayed on the stream.

We can load an event log, and apply the discovery algorithm.

import pm4py
from pm4py.algo.discovery.temporal_profile import algorithm as temporal_profile_discovery

if __name__ == "__main__":
    log = pm4py.read_xes("tests/input_data/running-example.xes")
    temporal_profile = temporal_profile_discovery.apply(log)
                                
We create the stream, register the temporal conformance checking algorithm and start the stream. The conformance checker can be created with some parameters.

from pm4py.streaming.stream.live_event_stream import LiveEventStream
from pm4py.streaming.algo.conformance.temporal import algorithm as temporal_conformance_checker

if __name__ == "__main__":
    stream = LiveEventStream()
    temp_cc = temporal_conformance_checker.apply(temporal_profile)
    stream.register(temp_cc)
    stream.start()
                                
The conformance checker accepts the following parameters (a usage sketch follows the list):

  • Parameters.CASE_ID_KEY (string, default case:concept:name): the attribute to use as case ID.
  • Parameters.ACTIVITY_KEY (string, default concept:name): the attribute to use as activity.
  • Parameters.START_TIMESTAMP_KEY (string, default start_timestamp): the attribute to use as start timestamp.
  • Parameters.TIMESTAMP_KEY (string, default time:timestamp): the attribute to use as timestamp.
  • Parameters.ZETA (int, default 6): multiplier for the standard deviation; couples of events that are more distant than this are signaled by the temporal profile.
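
A minimal sketch of how such parameters could be provided when creating the conformance checker is shown below; it assumes that the Parameters enum is exposed at the level of the algorithm module, as is customary for other pm4py algorithms.

from pm4py.streaming.algo.conformance.temporal import algorithm as temporal_conformance_checker

if __name__ == "__main__":
    # assumption: Parameters is accessible from the algorithm module
    temp_cc = temporal_conformance_checker.apply(
        temporal_profile,
        parameters={temporal_conformance_checker.Parameters.ZETA: 4})
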
We send the events of the log to the stream:

if __name__ == "__main__":
    static_stream = pm4py.convert_to_event_stream(log)
    for event in static_stream:
        stream.append(event)
                                

During the execution of the streaming temporal profile conformance checker, some warnings are printed if a couple of events violates the temporal profile. Moreover, it is also possible to get a dictionary associating each deviating case with all of its deviations.

The following code is useful to get the results of the streaming temporal profile conformance checking.

if __name__ == "__main__":
    stream.stop()
    res = temp_cc.get()
                                

Streaming Importer (XES trace-by-trace)

To process the traces of an XES event log that might not fit in memory, or when only a sample of a big log is needed, the XES trace-by-trace streaming importer helps to cope with the situation.

The importer can be used in a natural way, providing the path to the log:

import os
from pm4py.streaming.importer.xes import importer as xes_importer

if __name__ == "__main__":
    streaming_log_object = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"), variant=xes_importer.Variants.XES_TRACE_STREAM)
                                    
And it is possible to iterate over the traces of this log (that are read trace-by-trace):

if __name__ == "__main__":
    for trace in streaming_log_object:
        print(trace)
                                    

Streaming Importer (XES event-by-event)

To process the events of an XES event log that might not fit in memory, or when only a sample of a big log is needed, the XES event-by-event streaming importer helps to cope with the situation. In this case, the single events inside the traces are picked up during the iteration.

The importer can be used in a natural way, providing the path to the log:

import os
from pm4py.streaming.importer.xes import importer as xes_importer

if __name__ == "__main__":
    streaming_ev_object = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"), variant=xes_importer.Variants.XES_EVENT_STREAM)
                                    
And it is possible to iterate over the single events of this log (that are read during the iteration):

if __name__ == "__main__":
    for event in streaming_ev_object:
        print(event)
                                    

Streaming Importer (CSV event-by-event)

To process the events of a CSV event log that might not fit in memory, or when only a sample of a big log is needed, Pandas might not be feasible. In this case, the single rows of the CSV file are parsed during the iteration.

The importer can be used in a natural way, providing the path to a CSV log:

import os
from pm4py.streaming.importer.csv import importer as csv_importer
if __name__ == "__main__":
    log_object = csv_importer.apply(os.path.join("tests", "input_data", "running-example.csv"))
                                    
And it is possible to iterate over the single events of this log (that are read during the iteration):

if __name__ == "__main__":
    for ev in log_object:
        print(ev)
                                    

OCEL streaming

We offer support for streaming on OCEL. The support is currently limited to:

  • Iterating over the events of an OCEL.
  • Listening to an OCEL in order to direct its events to traditional event listeners.
One can iterate over the events of an OCEL as follows:

import pm4py
import os
from pm4py.objects.ocel.util import ocel_iterator

if __name__ == "__main__":
    ocel = pm4py.read_ocel(os.path.join("tests", "input_data", "ocel", "example_log.jsonocel"))
    for ev in ocel_iterator.apply(ocel):
        print(ev)
                                    
A complete example, in which we take an OCEL, instantiate two event streams for the order and element object types respectively, and push to them the flattened events of the OCEL, is reported below. Each of the two event streams registers a printer, such that the flattened events are printed on the screen as they are received.

import pm4py
import os
from pm4py.streaming.stream import live_event_stream
from pm4py.streaming.util import event_stream_printer
from pm4py.streaming.conversion import ocel_flatts_distributor
from pm4py.objects.ocel.util import ocel_iterator

if __name__ == "__main__":
    ocel = pm4py.read_ocel(os.path.join("tests", "input_data", "ocel", "example_log.jsonocel"))
    # we want to use the traditional algorithms for streaming also on object-centric event logs.
    # for this purpose, first we create two different event streams, one for the "order" object type
    # and one for the "element" object type.
    order_stream = live_event_stream.LiveEventStream()
    element_stream = live_event_stream.LiveEventStream()
    # Then, we register an algorithm for every one of them, which is a simple printer of the received events.
    order_stream_printer = event_stream_printer.EventStreamPrinter()
    element_stream_printer = event_stream_printer.EventStreamPrinter()
    order_stream.register(order_stream_printer)
    element_stream.register(element_stream_printer)
    # Then, we create the distributor object.
    # This registers different event streams for different object types.
    flatts_distributor = ocel_flatts_distributor.OcelFlattsDistributor()
    flatts_distributor.register("order", order_stream)
    flatts_distributor.register("element", element_stream)
    order_stream.start()
    element_stream.start()
    # in this way, we iterate over the events of an OCEL
    for ev in ocel_iterator.apply(ocel):
        # and the OCEL event is sent to all the "flattened" event streams.
        flatts_distributor.append(ev)
        # since the "flattened" event streams register a printer each, what we get is a print
        # of all the events that reach these instances.
    order_stream.stop()
    element_stream.stop()