Create Predicted Eventlog: Difference between revisions
No edit summary |
|||
(18 intermediate revisions by 2 users not shown) | |||
Line 2: | Line 2: | ||
* Event attribute '''Predicted''' denotes whether the event is from the source data (''false'') or whether it's predicted (''true''). | * Event attribute '''Predicted''' denotes whether the event is from the source data (''false'') or whether it's predicted (''true''). | ||
* Case attribute '''Generated''' denotes whether the case is in the source data (''false'') or whether the prediction generated it as a new case (''true''). | * Case attribute '''Generated''' denotes whether the case is in the source data (''false'') or whether the prediction generated it as a new case (''true''). | ||
== Prerequisites for prediction == | == Prerequisites for prediction == | ||
Line 36: | Line 37: | ||
1. Go to QPR ProcessAnalyzer and create the following expression script with name '''ML library''': | 1. Go to QPR ProcessAnalyzer and create the following expression script with name '''ML library''': | ||
<pre> | <pre> | ||
/** | |||
* Name of the stored procedure in Snowflake used for ML model training and related event generation. | |||
*/ | |||
let predictionProcedureName = "qprpa_sp_prediction"; | let predictionProcedureName = "qprpa_sp_prediction"; | ||
function GetDataTableConnection(sourceModel) | |||
{ | |||
// The following line is a temporary solution replacing the actual line (commented line below) | |||
// until defect D-14142 is fixed. | |||
// NOTE: If OdbcConnectionString or OdbcConnectionStringKey needs to be used, they Should | |||
// be given as parameters of CreateSnowflakeConnection-call. | |||
return CreateSnowflakeConnection(); | |||
// return sourceModel.EventDataTable.DataSourceConnection; | |||
} | |||
function ModifyColumnTypes(dataTable, columnTypesToRestore) | |||
{ | |||
let currentColumnTypes = dataTable.ColumnTypes; | |||
let currentColumnTypesDict = #{}; | |||
currentColumnTypes.{ | |||
let ct = _; | |||
currentColumnTypesDict.Set(ct.Name, ct.DataType); | |||
}; | |||
let columnsToModify = []; | |||
columnTypesToRestore.{ | |||
let ct = _; | |||
let dataTypeToRestore = ct.DataType; | |||
let currentDataType = currentColumnTypesDict.TryGetValue(ct.Name); | |||
if (!IsNullTop(currentDataType) && currentDataType != dataTypeToRestore) { | |||
columnsToModify = Concat(columnsToModify, [ct]); | |||
} | |||
}; | |||
dataTable | |||
.AddColumns(columnsToModify.{ let ct = _; #{"Name": `__Restore_${ct.Name}`, "DataType": ct.DataType}}); | |||
columnsToModify.{ | |||
let ct = _; | |||
dataTable | |||
.UpdateRows(null, `__Restore_${ct.Name}`, Cast(Column(#expr{ct.Name}), #expr{ct.DataType})); | |||
}; | |||
dataTable | |||
.RemoveColumns(columnsToModify.Name) | |||
.RenameColumns(columnsToModify.{ let ct = _; (ct.Name : `__Restore_${ct.Name}`)}); | |||
dataTable.Synchronize(); | |||
} | |||
/** | |||
* @name RunFunctionWithParallelLogging | |||
* @descripion | |||
* Runs given function that generates logging information into given data table in a way that all the logging will be included into the | |||
* generated script run log as well. | |||
* @param logTable | |||
* A DataTable having two columns: | |||
* - Datetime-column for the log message's timestamp. | |||
* - Log message column. | |||
* @param callbackFunc | |||
* Function that uses given data table for logging it's current status. | |||
* @returns | |||
* The result of the callback function. | |||
*/ | |||
function RunFunctionWithParallelLogging(logTable, callbackFunc) | function RunFunctionWithParallelLogging(logTable, callbackFunc) | ||
{ | { | ||
Line 55: | Line 119: | ||
() => { | () => { | ||
let readRows = 0; | let readRows = 0; | ||
function UpdateLog() | |||
{ | |||
try { | try { | ||
let rowsDf = logTable.Synchronize().SqlDataFrame.Skip(readRows).Collect(); | let rowsDf = logTable.Synchronize().SqlDataFrame.Skip(readRows).Collect(); | ||
readRows = readRows + rowsDf.NRows; | readRows = readRows + rowsDf.NRows; | ||
Line 78: | Line 137: | ||
} | } | ||
} | } | ||
Sleep(5000); | |||
while (true) { | |||
UpdateLog(); | |||
let finished = state.TryGetValue("Finished"); | |||
if (finished == true) { | |||
UpdateLog(); | |||
WriteLog(`Work finished. Log written to table ${logTable.Name} (id: ${logTable.Id})`); | |||
break; | |||
} | |||
Sleep(5000); | |||
} | |||
} | } | ||
]); | ]); | ||
Line 84: | Line 153: | ||
} | } | ||
/** | |||
* @name Train | |||
* @descripion | |||
* Train a ML model using given parameters and log table. | |||
* @param params | |||
* String dictionary containing the training configuration. | |||
* @param logTable | |||
* DataTable to be used for logging. | |||
* @returns | |||
* Result returned by the ML model training. | |||
*/ | |||
function Train(params, logTable) | function Train(params, logTable) | ||
{ | { | ||
Line 90: | Line 170: | ||
WriteLog("Starting training..."); | WriteLog("Starting training..."); | ||
let connection = logTable. | let connection = logTable.DataSourceConnection; | ||
let finalParams = params.Clone(); | let finalParams = params.Clone(); | ||
finalParams.Set("_typed", params["_typed"].Extend(#{ | finalParams.Set("_typed", params["_typed"].Extend(#{ | ||
Line 103: | Line 183: | ||
} | } | ||
/** | |||
* @name Generate | |||
* @descripion | |||
* Generate new events using a trained ML model and log table. | |||
* @param params | |||
* String dictionary containing the training configuration. | |||
* @param logTable | |||
* DataTable to be used for logging. | |||
* @returns | |||
* Result returned by the generation. | |||
*/ | |||
function Generate(params, logTable) | function Generate(params, logTable) | ||
{ | { | ||
Line 109: | Line 200: | ||
WriteLog("Starting generation...") | WriteLog("Starting generation...") | ||
let connection = logTable. | let connection = logTable.DataSourceConnection; | ||
let typedParams = params["_typed"]; | let typedParams = params["_typed"]; | ||
let resultEventsTable = typedParams | let resultEventsTable = typedParams.TryGetValue("target_event_data_table"); | ||
let resultCasesTable = typedParams.TryGetValue("target_case_data_table"); | let resultCasesTable = typedParams.TryGetValue("target_case_data_table"); | ||
Line 137: | Line 228: | ||
} | } | ||
/** | |||
* @name GetSampledEvents | |||
* @descripion | |||
* Returns a SqlDataFrame containing sampled events of given model where given filter is first applied. | |||
* @param sourceModel | |||
* ProcessAnalyzer model object of the model whose event data is to be filtered and sampled. | |||
* @param sampledCaseCount | |||
* The maximum number of cases to return (or null if all cases should be returned). | |||
* @param filter | |||
* JSON filter to be applied on the event data of the source model prior to performing the sampling. | |||
* @returns | |||
* SqlDataFrame containing sampled events of given model where given filter is first applied. | |||
*/ | |||
function GetSampledEvents(sourceModel, sampledCaseCount, filter) | function GetSampledEvents(sourceModel, sampledCaseCount, filter) | ||
{ | { | ||
if (IsNull(sampledCaseCount) || sampledCaseCount < 1) | if (IsNull(sampledCaseCount) || sampledCaseCount < 1) | ||
return sourceModel. | return sourceModel.CacheTableSqlDataFrame(filter); | ||
let sampleFilterRule = sampledCaseCount == null | let sampleFilterRule = sampledCaseCount == null | ||
? null | ? null | ||
Line 170: | Line 274: | ||
} | } | ||
/** | |||
* @name TrainMLModelForModel | |||
* @descripion | |||
* Generates a new ML model based on the data found in given source model. | |||
* @param modelName | |||
* Name of the ProcessAnalyzer model to create. The ML model will be named using the template: "decision_intelligence/<targetProject.Id>-<sourceModel.Id>-<modelName>.pamlmodel". | |||
* @param sourceModel | |||
* ProcessAnalyzer model object of the model which is to be trained to the ML model. | |||
* @param targetProject | |||
* Project ín whose context the ML model is to be trained. | |||
* @param trainingConfiguration | |||
* String dictionary containing the training configuration. | |||
* @param recreatePredictionModel | |||
* Should a prediction model be overwritten if one already exists for this source model and target model name combination? | |||
* @param trainingDataFilter | |||
* Filter JSON applied on top of all the source model events to find all the events to be used for training. | |||
* @param logTable | |||
* DataTable to be used for logging. | |||
* @param trainingCaseSampleSize | |||
* Maximum number of cases to use from the source model (random sampled). | |||
* @returns | |||
* Result returned by the ML model training. | |||
*/ | |||
function TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize) | function TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize) | ||
{ | { | ||
let sanitizedModelName = `${sourceModel.Id}-${modelName}`; | let sanitizedModelName = `${sourceModel.Id}-${modelName}`; | ||
let eventDataCaseIdColumn = sourceModel.EventsDataTable.ColumnMappings["CaseId"]; | |||
let | let eventDataTimeStampColumn = sourceModel.EventsDataTable.ColumnMappings["TimeStamp"]; | ||
let originalCaseCount = sourceModel.EventsDataTable.SqlDataFrame.SelectDistinct([eventDataCaseIdColumn]).NRows; | |||
let trainEventDataSdf = GetSampledEvents(sourceModel, trainingCaseSampleSize, trainingDataFilter); | let trainEventDataSdf = GetSampledEvents(sourceModel, trainingCaseSampleSize, trainingDataFilter); | ||
let actualTrainingCaseSampleSize = trainEventDataSdf.SelectDistinct([ | let actualTrainingCaseSampleSize = trainEventDataSdf.SelectDistinct([eventDataCaseIdColumn]).NRows; | ||
let startTimesSdf = sourceModel.CacheTableSqlDataFrame(trainingDataFilter).GroupBy([eventDataCaseIdColumn]).Aggregate([eventDataTimeStampColumn], ["Min"]).Select([eventDataTimeStampColumn]); | |||
WriteLog(`Starting to train a prediction model using ${ToString(100*actualTrainingCaseSampleSize/originalCaseCount, "F")}% of the original cases (${actualTrainingCaseSampleSize}/${originalCaseCount}) found in source model ${sourceModel.Name}.`) | WriteLog(`Starting to train a prediction model using ${ToString(100*actualTrainingCaseSampleSize/originalCaseCount, "F")}% of the original cases (${actualTrainingCaseSampleSize}/${originalCaseCount}) found in source model ${sourceModel.Name}.`) | ||
Line 188: | Line 317: | ||
trainingConfiguration = trainingConfiguration.Extend(#{ | trainingConfiguration = trainingConfiguration.Extend(#{ | ||
"_typed": #{ | "_typed": #{ | ||
"event_data": trainEventDataSdf, | "event_data": trainEventDataSdf, | ||
"start_times": startTimesSdf, | |||
"project": targetProject | "project": targetProject | ||
}, | }, | ||
Line 194: | Line 324: | ||
"column_mappings": columnMappings, | "column_mappings": columnMappings, | ||
"overwrite_model": recreatePredictionModel, | "overwrite_model": recreatePredictionModel, | ||
" | "case_sample_percentage": originalCaseCount / actualTrainingCaseSampleSize | ||
}); | }); | ||
Line 204: | Line 334: | ||
} | } | ||
function | function CreateTargetTables(modelName, sourceModel, targetProject) | ||
{ | { | ||
function GetTable(conn, tableName) | |||
{ | |||
let tableConfiguration = #{ | |||
"Name": tableName, | |||
"Type": "Snowflake", | |||
"Connection": conn | |||
}; | |||
let resultTable = targetProject.DataTableByName(tableName); | |||
if (resultTable == null) | |||
{ | |||
resultTable = targetProject.CreateDataTable(tableConfiguration) | |||
.Modify(#{"NameInDataSource": null}) | |||
.Synchronize(); | |||
} | |||
return resultTable; | |||
} | |||
let sanitizedModelName = `${sourceModel.Id}-${modelName}`; | let sanitizedModelName = `${sourceModel.Id}-${modelName}`; | ||
let eventsTableName = `${modelName} - events`; | let eventsTableName = `${modelName} - events`; | ||
let casesTableName = `${modelName} - cases`; | let casesTableName = `${modelName} - cases`; | ||
let | let targetModel = targetProject.ModelByName(modelName); | ||
let eventsTable = null; | let eventsTable, casesTable = null; | ||
if ( | if (targetModel != null) | ||
{ | { | ||
eventsTable = | eventsTable = targetModel.EventsDataTable; | ||
casesTable = | casesTable = targetModel.CasesDataTable; | ||
} | } | ||
else { | else { | ||
let | let conn = GetDataTableConnection(sourceModel); | ||
eventsTable = GetTable(conn, eventsTableName); | |||
eventsTable = | |||
if (sourceModel.CasesDataTable != null) { | if (sourceModel.CasesDataTable != null) { | ||
casesTable = GetTable(conn, casesTableName); | |||
casesTable = | |||
} | } | ||
} | } | ||
Line 235: | Line 375: | ||
casesTable?.Truncate(); | casesTable?.Truncate(); | ||
let generationParams, result; | return #{ | ||
"TargetModel": targetModel, | |||
"Events": eventsTable, | |||
"Cases": casesTable, | |||
"ModelName": sanitizedModelName | |||
}; | |||
} | |||
function CreateResultModel(targetProject, modelName, eventColumnMappings, caseColumnMappings) | |||
{ | |||
WriteLog(`Creating model...`); | |||
let eventsTableName = `${modelName} - events`; | |||
let casesTableName = `${modelName} - cases`; | |||
let timestampMapping = eventColumnMappings["TimeStamp"]; | |||
eventColumnMappings.Remove("TimeStamp"); | |||
eventColumnMappings.Set("Timestamp", timestampMapping); | |||
let modelConfiguration = #{ | |||
"DataSource": #{ | |||
"Events":#{ | |||
"DataSourceType": "datatable", | |||
"DataTableName": eventsTableName, | |||
"Columns": eventColumnMappings | |||
} | |||
} | |||
}; | |||
if (caseColumnMappings != null) { | |||
modelConfiguration["DataSource"].Set("Cases", #{ | |||
"DataSourceType": "datatable", | |||
"DataTableName": casesTableName, | |||
"Columns": caseColumnMappings | |||
}); | |||
} | |||
targetProject.CreateModel(#{"Name": modelName, "Configuration": modelConfiguration}); | |||
} | |||
/** | |||
* @name GenerateNewModel | |||
* @descripion | |||
* Generates a new ProcessAnalyzer model using already trained ML model. Generation consists of the following phases: | |||
* - Generate new cases with new events. | |||
* - Predict new events for incomplete cases. | |||
* - Append original source model data. | |||
* - Create ProcessAnalyzer model if it does not already exist. | |||
* @param modelName | |||
* Name of the ProcessAnalyzer model to create. The used ML model must have name: "decision_intelligence/<targetProject.Id>-<sourceModel.Id>-<modelName>.pamlmodel". | |||
* @param sourceModel | |||
* ProcessAnalyzer model object of the model based on which the new model will be created. | |||
* @param targetProject | |||
* Project ínto which the new ProcessAnalyzer model is to be created. | |||
* @param generationConfiguration | |||
* String dictionary containing the generation configuration. | |||
* @param incompleteCasesFilter | |||
* Filter JSON applied on top of all the source model events to find all the events belonging to incomplete cases. | |||
* @param logTable | |||
* DataTable to be used for logging. | |||
* @returns | |||
* ProcessAnalyzer model object of the containing the results of the generation. | |||
*/ | |||
function GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable) | |||
{ | |||
let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject); | |||
let eventsTable = tablesDict["Events"]; | |||
let casesTable = tablesDict["Cases"]; | |||
let targetModel = tablesDict["TargetModel"]; | |||
let sanitizedModelName = tablesDict["ModelName"]; | |||
let generationParams, result, overwrite = true; | |||
if (generationConfiguration.TryGetValue("cases_to_generate") != 0) { | if (generationConfiguration.TryGetValue("cases_to_generate") != 0) { | ||
Line 246: | Line 457: | ||
}, | }, | ||
"model_name": sanitizedModelName, | "model_name": sanitizedModelName, | ||
"overwrite": | "overwrite": overwrite | ||
}); | }); | ||
if (casesTable != null) { | if (casesTable != null) { | ||
generationParams["_typed"].Set("target_case_data_table", casesTable); | |||
} | } | ||
Line 259: | Line 470: | ||
if (result["result"] != "success") | if (result["result"] != "success") | ||
throw result["exception"] | throw result["exception"] | ||
overwrite = false; | |||
} | } | ||
Line 272: | Line 485: | ||
}, | }, | ||
"model_name": sanitizedModelName, | "model_name": sanitizedModelName, | ||
"overwrite": | "overwrite": overwrite, | ||
"temperature": 0 | "temperature": 0 | ||
}); | }); | ||
if (casesTable != null) { | if (casesTable != null) { | ||
generationParams["_typed"].Set("case_data", sourceModel.CasesDataTable.SqlDataFrame); | |||
} | } | ||
result = Generate(generationParams, logTable); | result = Generate(generationParams, logTable); | ||
overwrite = false; | |||
WriteLog(`Generation results:\r\n${result}`); | WriteLog(`Generation results:\r\n${result}`); | ||
Line 329: | Line 543: | ||
); | ); | ||
if ( | ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes); | ||
return | if (casesTable != null) | ||
ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes); | |||
if (targetModel != null) | |||
return targetModel; | |||
CreateResultModel(targetProject, modelName, result["column_mappings"]["events"], casesTable != null ? result["column_mappings"]["cases"] : null); | |||
} | |||
/** | |||
* @name GeneratePredictionModel | |||
* @descripion | |||
* Generates a ProcessAnalyzer model by first training a machine learning model and then using that to generate new cases and events. | |||
* @param configuration | |||
* A string dictionary configuring the model prediction process. Supported keys are: | |||
* "Name": Name of the PA model to generate to the target project. | |||
* "SourceModel": Snowflake-based PA model used for training the prediction model. | |||
* "TargetProject": Target project to create the model into. | |||
* "TrainingConfiguration": Training parameters as string dictionary. | |||
* "GenerationConfiguration": Event generation parameters as string dictionary. | |||
* "TrainingDataFilter": Filter JSON for events to be used for training. | |||
* "IncompleteCasesFilter": Filter JSON for events belonging to unfinished cases for which new events are to be predicted. | |||
* "RecreatePredictionModel": Should a prediction model be overwritten if one already exists for this source model and target model name combination? | |||
* "TrainingCaseSampleSize": Maximum number of cases to use from the source model (random sampled). | |||
*/ | |||
function GeneratePredictionModel(configuration) | |||
{ | |||
let modelName = configuration["Name"]; | |||
let sourceModel = configuration["SourceModel"]; | |||
let targetProject = configuration["TargetProject"]; | |||
let trainingConfiguration = configuration["TrainingConfiguration"]; | |||
let generationConfiguration = configuration["GenerationConfiguration"]; | |||
let trainingDataFilter = configuration["TrainingDataFilter"]; | |||
let incompleteCasesFilter = configuration["IncompleteCasesFilter"]; | |||
let recreatePredictionModel = configuration["RecreatePredictionModel"]; | |||
let trainingCaseSampleSize = configuration["TrainingCaseSampleSize"]; | |||
let result = null; | |||
let logTableName = `_MLLog:${Now}`; | |||
let conn = GetDataTableConnection(sourceModel); | |||
let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake", "Connection": conn}).Modify(#{"NameInDataSource": null}).Synchronize(); | |||
try { | |||
WriteLog(`Training a prediction model based on model: ${sourceModel.Name} (id=${sourceModel.Id})`); | |||
result = TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize); | |||
WriteLog(`Model training completed. Training result:\r\n${result}`); | |||
result = ParseJson(result); | |||
if (generationConfiguration != null) { | |||
WriteLog(`Generating a new model named ${modelName} based on trained model ${modelName})`); | |||
let newModel = GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable); | |||
WriteLog(`Model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`); | |||
result.Set("Model", newModel); | |||
" | |||
} | } | ||
} | |||
finally { | |||
targetProject.DataTableByName(logTableName)?.DeletePermanently(); | |||
} | |||
return result; | |||
} | |||
function TransformModel(modelName, sourceModel, targetProject, configurations, logTable) | |||
{ | |||
let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject); | |||
let eventsTable = tablesDict["Events"]; | |||
let casesTable = tablesDict["Cases"]; | |||
let targetModel = tablesDict["TargetModel"]; | |||
let eventColumnMappings = sourceModel.EventsDataTable.ColumnMappings; | |||
let caseColumnMappings = sourceModel.CasesDataTable?.ColumnMappings; | |||
let result; | |||
WriteLog(`Performing transformations...`); | |||
let configuration = #{ | |||
"_typed": #{ | |||
"event_data": sourceModel.EventsDataTable.SqlDataFrame, | |||
"project": targetProject, | |||
"target_event_data_table": eventsTable | |||
}, | |||
"column_mappings": eventColumnMappings, | |||
"overwrite": true, | |||
"transformations": configurations | |||
}; | }; | ||
if (casesTable != null) { | |||
configuration["_typed"].Set("target_case_data_table", casesTable); | |||
} | |||
result = Generate(configuration, logTable); | |||
WriteLog(`Transformation results:\r\n${result}`); | |||
result = ParseJson(result); | |||
if (result["result"] != "success") | |||
throw result["exception"] | |||
if (casesTable != null) { | if (casesTable != null) { | ||
WriteLog(`Importing case data data...`); | |||
casesTable.Import(sourceModel.CasesDataTable.SqlDataFrame, #{"Append": true}); | |||
ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes); | |||
} | |||
}); | |||
ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes); | |||
if (targetModel != null) | |||
return targetModel; | |||
CreateResultModel(targetProject, modelName, eventColumnMappings, casesTable != null ? caseColumnMappings : null); | |||
} | |||
function ApplyTransformations(configuration) | |||
{ | |||
let modelName = configuration["Name"]; | |||
let sourceModel = configuration["SourceModel"]; | |||
let targetProject = configuration["TargetProject"]; | |||
let transformationConfigurations = configuration["Transformations"]; | |||
let result = null; | |||
let logTableName = `_MLLog:${Now}`; | |||
let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake"}).Synchronize(); | |||
try { | |||
WriteLog(`Applying transformations on model:\r\n${sourceModel.Name} (id=${sourceModel.Id})`); | |||
let newModel = TransformModel(modelName, sourceModel, targetProject, transformationConfigurations, logTable); | |||
WriteLog(`Model transformations completed. New model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`); | |||
return newModel; | |||
} | |||
finally { | |||
targetProject.DataTableByName(logTableName)?.DeletePermanently(); | |||
} | } | ||
} | |||
targetProject. | function GenerateNewModelWithNewCaseAttributes(modelName, sourceModel, targetProject, generationConfiguration, attributesToGenerate, incompleteCasesFilter, logTable) | ||
{ | |||
let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject); | |||
let eventsTable = tablesDict["Events"]; | |||
let casesTable = tablesDict["Cases"]; | |||
let targetModel = tablesDict["TargetModel"]; | |||
let sanitizedModelName = tablesDict["ModelName"]; | |||
let generationParams, result, overwrite = true; | |||
WriteLog(`Generating case attribute values for running cases...`); | |||
let incompleteCasesSqlDataFrame = sourceModel.EventsDataTable.SqlDataFrame; | |||
if (incompleteCasesFilter != null) | |||
incompleteCasesSqlDataFrame = incompleteCasesSqlDataFrame.ApplyFilter(incompleteCasesFilter, sourceModel.CasesDataTable?.SqlDataFrame); | |||
generationParams = generationConfiguration.Extend(#{ | |||
"_typed": #{ | |||
"event_data": incompleteCasesSqlDataFrame, | |||
"case_data": sourceModel.CasesDataTable.SqlDataFrame, | |||
"target_case_data_table": casesTable, | |||
"project": targetProject | |||
}, | |||
"model_name": sanitizedModelName, | |||
"overwrite": overwrite, | |||
"temperature": 0 | |||
}); | |||
result = Generate(generationParams, logTable); | |||
overwrite = false; | |||
WriteLog(`Generation results:\r\n${result}`); | |||
result = ParseJson(result); | |||
if (result["result"] != "success") | |||
throw result["exception"] | |||
if (!("Predicted".In(casesTable.ColumnTypes.Name))) | |||
casesTable | |||
.AddColumn("Predicted", "Boolean"); | |||
casesTable | |||
.UpdateRows( | |||
null, | |||
"Predicted", true | |||
); | |||
WriteLog(`Appending original model data...`); | |||
eventsTable.Import(sourceModel.EventsDataTable.SqlDataFrame, #{"Append": true}).Synchronize(); | |||
attributesToGenerate.{ | |||
let col = _; | |||
let newCol = `Predicted_${col}`; | |||
casesTable.AddColumn(newCol, "String"); | |||
casesTable.UpdateRows(null, newCol, Column(#expr{col})); | |||
}; | |||
let createdColumns = casesTable.ColumnTypes.Name; | |||
sourceModel.CasesDataTable.ColumnTypes.Name | |||
.Where(!_.In(createdColumns)).{ | |||
let col = _; | |||
casesTable.AddColumn(col, "String"); | |||
}; | |||
casesTable.Merge(sourceModel.CasesDataTable.SqlDataFrame, sourceModel.CasesDataTable.ColumnMappings["CaseId"]); | |||
casesTable | |||
.UpdateRows( | |||
Column("Predicted") == null, | |||
"Predicted", false | |||
); | |||
WriteLog(`Restoring original column types...`); | |||
ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes); | |||
ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes); | |||
if (targetModel != null) | |||
return targetModel; | |||
CreateResultModel(targetProject, modelName, result["column_mappings"]["events"], casesTable != null ? result["column_mappings"]["cases"] : null); | |||
} | } | ||
function | function GenerateCaseAttributePredictionModel(configuration) | ||
{ | { | ||
let modelName = configuration["Name"]; | let modelName = configuration["Name"]; | ||
Line 373: | Line 767: | ||
let result = null; | let result = null; | ||
let logTableName = `_MLLog:${Now}`; | let logTableName = `_MLLog:${Now}`; | ||
let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake"}); | let conn = GetDataTableConnection(sourceModel); | ||
let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake", "Connection": conn}).Modify(#{"NameInDataSource": null}).Synchronize(); | |||
try { | try { | ||
WriteLog(`Training a prediction model based on model: | let attributesToGenerate = trainingConfiguration["output_case_attribute_groups"][0]["attributes"]; | ||
WriteLog(`Training a case attribute prediction model based on model: ${sourceModel.Name} (id=${sourceModel.Id}). Generated attributes: ${ToJson(attributesToGenerate)}`); | |||
result = TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize); | result = TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize); | ||
WriteLog(`Model training completed. Training result:\r\n${result}`); | WriteLog(`Model training completed. Training result:\r\n${result}`); | ||
result = ParseJson(result); | |||
if (generationConfiguration != null) { | if (generationConfiguration != null) { | ||
WriteLog(`Generating a new model named ${modelName} based on trained model ${modelName}) | WriteLog(`Generating a new model named ${modelName} based on trained case attribute prediction model ${modelName})`); | ||
let newModel = | let newModel = GenerateNewModelWithNewCaseAttributes(modelName, sourceModel, targetProject, generationConfiguration, attributesToGenerate, incompleteCasesFilter, logTable); | ||
WriteLog(`Model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name} | WriteLog(`Model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`); | ||
result.Set("Model", newModel); | |||
} | } | ||
} | } | ||
Line 389: | Line 788: | ||
targetProject.DataTableByName(logTableName)?.DeletePermanently(); | targetProject.DataTableByName(logTableName)?.DeletePermanently(); | ||
} | } | ||
return result; | |||
} | } | ||
return #{ | return #{ | ||
"GeneratePredictionModel": GeneratePredictionModel, | "GeneratePredictionModel": GeneratePredictionModel, | ||
" | "GenerateCaseAttributePredictionModel": GenerateCaseAttributePredictionModel, | ||
" | "ApplyTransformations": ApplyTransformations, | ||
" | "ModifyColumnTypes": ModifyColumnTypes | ||
}; | }; | ||
</pre> | </pre> | ||
2. Create the following expression script (e.g., with name '''Create prediction model'''): | 2. Create the following example expression script (e.g., with name '''Create prediction model'''): | ||
<pre> | <pre> | ||
let lib = First(Project.Scripts.Where(Name == "ML library")).Run(#{}); | let lib = First(Project.Scripts.Where(Name == "ML library")).Run(#{}); | ||
Line 407: | Line 808: | ||
"SourceModel": ModelById(1), | "SourceModel": ModelById(1), | ||
"TargetProject": targetProject, | "TargetProject": targetProject, | ||
"TrainingConfiguration": #{}, | "RecreatePredictionModel": true, | ||
"TrainingConfiguration": #{ | |||
"num_epochs_to_train": 500 | |||
}, | |||
"GenerationConfiguration": #{ | "GenerationConfiguration": #{ | ||
"cases_to_generate": 1000 | "cases_to_generate": 1000 | ||
}, | }, | ||
"TrainingDataFilter": | "TrainingDataFilter": #{ | ||
"Items": [ | |||
#{ | |||
"TrainingCaseSampleSize": 1000 | "Type": "IncludeCases", | ||
"Items": [ | |||
#{ | |||
"Type": "EventAttributeValue", | |||
"Attribute": "Event type", | |||
"StringifiedValues": [ | |||
"0Invoice Payment" | |||
] | |||
} | |||
] | |||
} | |||
] | |||
}, | |||
"TrainingCaseSampleSize": 1000, | |||
"IncompleteCasesFilter": #{ | |||
"Items": [ | |||
#{ | |||
"Type": "ExcludeCases", | |||
"Items": [ | |||
#{ | |||
"Type": "EventAttributeValue", | |||
"Attribute": "Event type", | |||
"StringifiedValues": [ | |||
"0Invoice Payment" | |||
] | |||
} | |||
] | |||
} | |||
] | |||
}, | |||
}); | }); | ||
</pre> | </pre> | ||
Line 426: | Line 859: | ||
* '''RecreatePredictionModel''': When ''true'', a new ML model is trained when the script is run. When ''false'', the prediction is run using possibly pre-existing ML model. | * '''RecreatePredictionModel''': When ''true'', a new ML model is trained when the script is run. When ''false'', the prediction is run using possibly pre-existing ML model. | ||
* '''TrainingConfiguration''': Training parameters. | * '''TrainingConfiguration''': Training parameters. | ||
* '''GenerationConfiguration''': Event generation parameters. When null, no generation is done. | ** '''num_epochs_to_train''': How many times the whole training data is used in training. The more there are epochs, the better the model usually is, but the training will take more time. The best performing model out of all the iterations will be selected. | ||
* '''TrainingDataFilter''': | ** '''max_num_case_clusters''': Maximum number of clusters to divide the case attribute values into. Default is 20. | ||
* '''GenerationConfiguration''': Event generation parameters. When null, no generation is done. For example, following parameters are supported: | |||
** '''cases_to_generate''': Maximum number cases to create. The number of created cases is further limited by the capabilities of the trained model and the ''case_generation_start_time'' and ''case_generation_end_time'' parameters. | |||
** '''case_generation_start_time''': If defined, new cases will be generated starting from this timestamp. If not defined, the latest start event timestamp used in the training data. This parameter is given as ISO datetime format. | |||
** '''case_generation_end_time''': If defined, the new cases generation will stop when reaching this timestamp, and no cases will be generated after it. This parameter is given as ISO datetime format. | |||
** '''generate_debug_event_attributes''': If true, additional columns will be added containing, e.g., probabilities of the selected activity and other activities. | |||
** '''min_prediction_probability ''': Minimum probability of any activity name in next activity prediction. If the probability of an activity is lower than this, it will never be picked. Default value is 0.01. | |||
** '''temperature''': If 0, the predicted event type will always be the most probable one. If 1, the next event type is randomly selected based on probabilities of each event type. This behavior is interpolated when using values between 0 and 1. | |||
* '''TrainingDataFilter''': [[Filtering_in_QPR_ProcessAnalyzer_Queries|Filter]] to select specific cases that are used to train the prediction model. This filter is required to train the model only using the completed cases. Uncompleted cases should not be used for the training, so the model doesn't incorrectly learn that cases should end like that. | |||
* '''TrainingCaseSampleSize''': Maximum number of cases to take from the source model (cases are selected randomly). Use a lower setting to speed up the ML model training. The greater the value, the more subtle phenomena the prediction can learn from the data. | * '''TrainingCaseSampleSize''': Maximum number of cases to take from the source model (cases are selected randomly). Use a lower setting to speed up the ML model training. The greater the value, the more subtle phenomena the prediction can learn from the data. | ||
* '''IncompleteCasesFilter''': Optional [[Filtering_in_QPR_ProcessAnalyzer_Queries|filter]] to select which | * '''IncompleteCasesFilter''': Optional [[Filtering_in_QPR_ProcessAnalyzer_Queries|filter]] to select which cases the prediction is made for. To improve performance of the prediction, it's recommended to include only the incomplete cases for which new events might appear, and skip the completed cases for which new events are not expected anymore. | ||
Latest revision as of 13:03, 29 October 2024
This article has instructions how to install, configure and use eventlog predictions. The prediction creates a new model that contains the source model data and the predictions. It's able to predict case attributes for the generated new cases and event attributes for the predicted events. To distinguish the real (source data) and predicted events and cases, there are following attributes in the model:
- Event attribute Predicted denotes whether the event is from the source data (false) or whether it's predicted (true).
- Case attribute Generated denotes whether the case is in the source data (false) or whether the prediction generated it as a new case (true).
Prerequisites for prediction
Following prerequisites need to be fulfilled to run the eventlog prediction:
- QPR ProcessAnalyzer 2024.2 or later in use
- Snowflake connection is configured
- Source models are stored to Snowflake
Install prediction to Snowflake
To install the eventlog prediction to Snowflake:
- Go to Snowflake, and create a Snowflake-managed stage with name PREDICTION to the same schema configured to QPR ProcessAnalyzer (in the Snowflake connection string). Use settings in the following image:
- Open the created stage and upload the predict.pyz file into the stage (ask the file from your QPR representative).
- Create the following procedure to the same schema:
CREATE OR REPLACE PROCEDURE QPRPA_SP_PREDICTION("CONFIGURATION" OBJECT) RETURNS OBJECT LANGUAGE PYTHON STRICT RUNTIME_VERSION = '3.8' PACKAGES = ('nltk','numpy','pandas==1.5.3','scikit-learn','snowflake-snowpark-python','tensorflow','dill','prophet','holidays==0.18','python-kubernetes','docker-py') HANDLER = 'main' EXECUTE AS OWNER AS ' import sys def main(session, parameters_in: dict) -> dict: session.file.get(''@decision_intelligence/predict.pyz'', ''/tmp'') sys.path.append(''/tmp/predict.pyz'') import predict return predict.main(session, parameters_in) ';
Create prediction script in QPR ProcessAnalyzer
1. Go to QPR ProcessAnalyzer and create the following expression script with name ML library:
/** * Name of the stored procedure in Snowflake used for ML model training and related event generation. */ let predictionProcedureName = "qprpa_sp_prediction"; function GetDataTableConnection(sourceModel) { // The following line is a temporary solution replacing the actual line (commented line below) // until defect D-14142 is fixed. // NOTE: If OdbcConnectionString or OdbcConnectionStringKey needs to be used, they Should // be given as parameters of CreateSnowflakeConnection-call. return CreateSnowflakeConnection(); // return sourceModel.EventDataTable.DataSourceConnection; } function ModifyColumnTypes(dataTable, columnTypesToRestore) { let currentColumnTypes = dataTable.ColumnTypes; let currentColumnTypesDict = #{}; currentColumnTypes.{ let ct = _; currentColumnTypesDict.Set(ct.Name, ct.DataType); }; let columnsToModify = []; columnTypesToRestore.{ let ct = _; let dataTypeToRestore = ct.DataType; let currentDataType = currentColumnTypesDict.TryGetValue(ct.Name); if (!IsNullTop(currentDataType) && currentDataType != dataTypeToRestore) { columnsToModify = Concat(columnsToModify, [ct]); } }; dataTable .AddColumns(columnsToModify.{ let ct = _; #{"Name": `__Restore_${ct.Name}`, "DataType": ct.DataType}}); columnsToModify.{ let ct = _; dataTable .UpdateRows(null, `__Restore_${ct.Name}`, Cast(Column(#expr{ct.Name}), #expr{ct.DataType})); }; dataTable .RemoveColumns(columnsToModify.Name) .RenameColumns(columnsToModify.{ let ct = _; (ct.Name : `__Restore_${ct.Name}`)}); dataTable.Synchronize(); } /** * @name RunFunctionWithParallelLogging * @descripion * Runs given function that generates logging information into given data table in a way that all the logging will be included into the * generated script run log as well. * @param logTable * A DataTable having two columns: * - Datetime-column for the log message's timestamp. * - Log message column. * @param callbackFunc * Function that uses given data table for logging it's current status. * @returns * The result of the callback function. */ function RunFunctionWithParallelLogging(logTable, callbackFunc) { let state = #{}; logTable.Truncate(); _system.Parallel.Run([ () => { try { state.Set("Result", callbackFunc()); } finally { state.Set("Finished", true); } }, () => { let readRows = 0; function UpdateLog() { try { let rowsDf = logTable.Synchronize().SqlDataFrame.Skip(readRows).Collect(); readRows = readRows + rowsDf.NRows; rowsDf.Rows.{ let row = _; WriteLog(`${row[0]} \t${row[1]}`); } } catch (ex) { if (ex.InnerExceptions?[0]?.Type == "UnauthorizedAccess") { WriteLog(`Log data table has disappeared. The generated log will not be complete!`); break; } WriteLog(`Log has not yet been created.`); } } while (true) { UpdateLog(); let finished = state.TryGetValue("Finished"); if (finished == true) { UpdateLog(); WriteLog(`Work finished. Log written to table ${logTable.Name} (id: ${logTable.Id})`); break; } Sleep(5000); } } ]); return state.TryGetValue("Result"); } /** * @name Train * @descripion * Train a ML model using given parameters and log table. * @param params * String dictionary containing the training configuration. * @param logTable * DataTable to be used for logging. * @returns * Result returned by the ML model training. */ function Train(params, logTable) { function TrainCallback() { WriteLog("Starting training..."); let connection = logTable.DataSourceConnection; let finalParams = params.Clone(); finalParams.Set("_typed", params["_typed"].Extend(#{ "log_table": logTable })); let result = connection.CallStoredProcedure(predictionProcedureName, #{"configuration": finalParams}); WriteLog("Finished training...") return result; } return RunFunctionWithParallelLogging(logTable, TrainCallback); } /** * @name Generate * @descripion * Generate new events using a trained ML model and log table. * @param params * String dictionary containing the training configuration. * @param logTable * DataTable to be used for logging. * @returns * Result returned by the generation. */ function Generate(params, logTable) { function GenerateCallback() { WriteLog("Starting generation...") let connection = logTable.DataSourceConnection; let typedParams = params["_typed"]; let resultEventsTable = typedParams.TryGetValue("target_event_data_table"); let resultCasesTable = typedParams.TryGetValue("target_case_data_table"); if (params["overwrite"]) { resultEventsTable.Truncate(); resultCasesTable?.Truncate(); } let finalParams = params.Clone(); finalParams.Set("_typed", typedParams.Extend(#{ "log_table": logTable })); let result = connection.CallStoredProcedure(predictionProcedureName, #{"configuration": finalParams}); resultEventsTable.Synchronize(); resultCasesTable?.Synchronize(); WriteLog("Finished generation...") return result; } return RunFunctionWithParallelLogging(logTable, GenerateCallback); } /** * @name GetSampledEvents * @descripion * Returns a SqlDataFrame containing sampled events of given model where given filter is first applied. * @param sourceModel * ProcessAnalyzer model object of the model whose event data is to be filtered and sampled. * @param sampledCaseCount * The maximum number of cases to return (or null if all cases should be returned). * @param filter * JSON filter to be applied on the event data of the source model prior to performing the sampling. * @returns * SqlDataFrame containing sampled events of given model where given filter is first applied. */ function GetSampledEvents(sourceModel, sampledCaseCount, filter) { if (IsNull(sampledCaseCount) || sampledCaseCount < 1) return sourceModel.CacheTableSqlDataFrame(filter); let sampleFilterRule = sampledCaseCount == null ? null : #{ "Type":"IncludeCases", "Items":[#{ "Type": "SqlExpressionValue", "Configuration": #{ "Root": `Cases.WithColumn("_Random", Rand()).OrderByColumns(["_Random"], [true]).Head(${sampledCaseCount})` } }] }; let filterRules = filter?["Items"]; if (!IsNullTop(filterRules)) { if (!IsNullTop(sampleFilterRule)) filterRules = Concat(filterRules, sampleFilterRule); } else { if (!IsNullTop(sampleFilterRule)) filterRules = [sampleFilterRule]; else filterRules = []; } let finalFilter = #{ "Items":filterRules }; sourceModel.CacheTableSqlDataFrame(finalFilter); } /** * @name TrainMLModelForModel * @descripion * Generates a new ML model based on the data found in given source model. * @param modelName * Name of the ProcessAnalyzer model to create. The ML model will be named using the template: "decision_intelligence/<targetProject.Id>-<sourceModel.Id>-<modelName>.pamlmodel". * @param sourceModel * ProcessAnalyzer model object of the model which is to be trained to the ML model. * @param targetProject * Project ín whose context the ML model is to be trained. * @param trainingConfiguration * String dictionary containing the training configuration. * @param recreatePredictionModel * Should a prediction model be overwritten if one already exists for this source model and target model name combination? * @param trainingDataFilter * Filter JSON applied on top of all the source model events to find all the events to be used for training. * @param logTable * DataTable to be used for logging. * @param trainingCaseSampleSize * Maximum number of cases to use from the source model (random sampled). * @returns * Result returned by the ML model training. */ function TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize) { let sanitizedModelName = `${sourceModel.Id}-${modelName}`; let eventDataCaseIdColumn = sourceModel.EventsDataTable.ColumnMappings["CaseId"]; let eventDataTimeStampColumn = sourceModel.EventsDataTable.ColumnMappings["TimeStamp"]; let originalCaseCount = sourceModel.EventsDataTable.SqlDataFrame.SelectDistinct([eventDataCaseIdColumn]).NRows; let trainEventDataSdf = GetSampledEvents(sourceModel, trainingCaseSampleSize, trainingDataFilter); let actualTrainingCaseSampleSize = trainEventDataSdf.SelectDistinct([eventDataCaseIdColumn]).NRows; let startTimesSdf = sourceModel.CacheTableSqlDataFrame(trainingDataFilter).GroupBy([eventDataCaseIdColumn]).Aggregate([eventDataTimeStampColumn], ["Min"]).Select([eventDataTimeStampColumn]); WriteLog(`Starting to train a prediction model using ${ToString(100*actualTrainingCaseSampleSize/originalCaseCount, "F")}% of the original cases (${actualTrainingCaseSampleSize}/${originalCaseCount}) found in source model ${sourceModel.Name}.`) let trainCaseDataSdf = sourceModel.CasesDataTable?.SqlDataFrame; let columnMappings = trainEventDataSdf.ColumnMappings; if (trainCaseDataSdf != null) { columnMappings = columnMappings.Extend(#{"Case_CaseId": trainCaseDataSdf.ColumnMappings["CaseId"]}); } trainingConfiguration = trainingConfiguration.Extend(#{ "_typed": #{ "event_data": trainEventDataSdf, "start_times": startTimesSdf, "project": targetProject }, "model_name": sanitizedModelName, "column_mappings": columnMappings, "overwrite_model": recreatePredictionModel, "case_sample_percentage": originalCaseCount / actualTrainingCaseSampleSize }); if (trainCaseDataSdf != null) { trainingConfiguration["_typed"].Set("case_data", trainCaseDataSdf); } Train(trainingConfiguration, logTable); } function CreateTargetTables(modelName, sourceModel, targetProject) { function GetTable(conn, tableName) { let tableConfiguration = #{ "Name": tableName, "Type": "Snowflake", "Connection": conn }; let resultTable = targetProject.DataTableByName(tableName); if (resultTable == null) { resultTable = targetProject.CreateDataTable(tableConfiguration) .Modify(#{"NameInDataSource": null}) .Synchronize(); } return resultTable; } let sanitizedModelName = `${sourceModel.Id}-${modelName}`; let eventsTableName = `${modelName} - events`; let casesTableName = `${modelName} - cases`; let targetModel = targetProject.ModelByName(modelName); let eventsTable, casesTable = null; if (targetModel != null) { eventsTable = targetModel.EventsDataTable; casesTable = targetModel.CasesDataTable; } else { let conn = GetDataTableConnection(sourceModel); eventsTable = GetTable(conn, eventsTableName); if (sourceModel.CasesDataTable != null) { casesTable = GetTable(conn, casesTableName); } } eventsTable.Truncate(); casesTable?.Truncate(); return #{ "TargetModel": targetModel, "Events": eventsTable, "Cases": casesTable, "ModelName": sanitizedModelName }; } function CreateResultModel(targetProject, modelName, eventColumnMappings, caseColumnMappings) { WriteLog(`Creating model...`); let eventsTableName = `${modelName} - events`; let casesTableName = `${modelName} - cases`; let timestampMapping = eventColumnMappings["TimeStamp"]; eventColumnMappings.Remove("TimeStamp"); eventColumnMappings.Set("Timestamp", timestampMapping); let modelConfiguration = #{ "DataSource": #{ "Events":#{ "DataSourceType": "datatable", "DataTableName": eventsTableName, "Columns": eventColumnMappings } } }; if (caseColumnMappings != null) { modelConfiguration["DataSource"].Set("Cases", #{ "DataSourceType": "datatable", "DataTableName": casesTableName, "Columns": caseColumnMappings }); } targetProject.CreateModel(#{"Name": modelName, "Configuration": modelConfiguration}); } /** * @name GenerateNewModel * @descripion * Generates a new ProcessAnalyzer model using already trained ML model. Generation consists of the following phases: * - Generate new cases with new events. * - Predict new events for incomplete cases. * - Append original source model data. * - Create ProcessAnalyzer model if it does not already exist. * @param modelName * Name of the ProcessAnalyzer model to create. The used ML model must have name: "decision_intelligence/<targetProject.Id>-<sourceModel.Id>-<modelName>.pamlmodel". * @param sourceModel * ProcessAnalyzer model object of the model based on which the new model will be created. * @param targetProject * Project ínto which the new ProcessAnalyzer model is to be created. * @param generationConfiguration * String dictionary containing the generation configuration. * @param incompleteCasesFilter * Filter JSON applied on top of all the source model events to find all the events belonging to incomplete cases. * @param logTable * DataTable to be used for logging. * @returns * ProcessAnalyzer model object of the containing the results of the generation. */ function GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable) { let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject); let eventsTable = tablesDict["Events"]; let casesTable = tablesDict["Cases"]; let targetModel = tablesDict["TargetModel"]; let sanitizedModelName = tablesDict["ModelName"]; let generationParams, result, overwrite = true; if (generationConfiguration.TryGetValue("cases_to_generate") != 0) { WriteLog(`Generating new cases with new events...`); generationParams = generationConfiguration.Extend(#{ "_typed": #{ "target_event_data_table": eventsTable, "project": targetProject }, "model_name": sanitizedModelName, "overwrite": overwrite }); if (casesTable != null) { generationParams["_typed"].Set("target_case_data_table", casesTable); } result = Generate(generationParams, logTable); WriteLog(`Generation results:\r\n${result}`); result = ParseJson(result); if (result["result"] != "success") throw result["exception"] overwrite = false; } if (incompleteCasesFilter != null) { WriteLog(`Generating new events for running cases...`); let incompleteCasesSqlDataFrame = sourceModel.EventsDataTable.SqlDataFrame.ApplyFilter(incompleteCasesFilter, sourceModel.CasesDataTable?.SqlDataFrame); generationParams = generationConfiguration.Extend(#{ "_typed": #{ "event_data": incompleteCasesSqlDataFrame, "target_event_data_table": eventsTable, "project": targetProject }, "model_name": sanitizedModelName, "overwrite": overwrite, "temperature": 0 }); if (casesTable != null) { generationParams["_typed"].Set("case_data", sourceModel.CasesDataTable.SqlDataFrame); } result = Generate(generationParams, logTable); overwrite = false; WriteLog(`Generation results:\r\n${result}`); result = ParseJson(result); if (result["result"] != "success") throw result["exception"] } if (!("Predicted".In(eventsTable.ColumnTypes.Name))) eventsTable .AddColumn("Predicted", "Boolean"); eventsTable .DeleteRows( Column(result["column_mappings"]["events"]["EventType"]) == "__FINISH__" ) .UpdateRows( 1==1, "Predicted", true ); if (casesTable != null) { if (!("Generated".In(casesTable.ColumnTypes.Name))) casesTable .AddColumn("Generated", "Boolean"); casesTable .UpdateRows( 1==1, "Generated", true ); } WriteLog(`Appending original model data...`); eventsTable.Import(sourceModel.EventsDataTable.SqlDataFrame, #{"Append": true}); if (casesTable != null) casesTable.Import(sourceModel.CasesDataTable.SqlDataFrame, #{"Append": true}); eventsTable .UpdateRows( Column("Predicted") == null, "Predicted", false ); casesTable ?.UpdateRows( Column("Generated") == null, "Generated", false ); ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes); if (casesTable != null) ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes); if (targetModel != null) return targetModel; CreateResultModel(targetProject, modelName, result["column_mappings"]["events"], casesTable != null ? result["column_mappings"]["cases"] : null); } /** * @name GeneratePredictionModel * @descripion * Generates a ProcessAnalyzer model by first training a machine learning model and then using that to generate new cases and events. * @param configuration * A string dictionary configuring the model prediction process. Supported keys are: * "Name": Name of the PA model to generate to the target project. * "SourceModel": Snowflake-based PA model used for training the prediction model. * "TargetProject": Target project to create the model into. * "TrainingConfiguration": Training parameters as string dictionary. * "GenerationConfiguration": Event generation parameters as string dictionary. * "TrainingDataFilter": Filter JSON for events to be used for training. * "IncompleteCasesFilter": Filter JSON for events belonging to unfinished cases for which new events are to be predicted. * "RecreatePredictionModel": Should a prediction model be overwritten if one already exists for this source model and target model name combination? * "TrainingCaseSampleSize": Maximum number of cases to use from the source model (random sampled). */ function GeneratePredictionModel(configuration) { let modelName = configuration["Name"]; let sourceModel = configuration["SourceModel"]; let targetProject = configuration["TargetProject"]; let trainingConfiguration = configuration["TrainingConfiguration"]; let generationConfiguration = configuration["GenerationConfiguration"]; let trainingDataFilter = configuration["TrainingDataFilter"]; let incompleteCasesFilter = configuration["IncompleteCasesFilter"]; let recreatePredictionModel = configuration["RecreatePredictionModel"]; let trainingCaseSampleSize = configuration["TrainingCaseSampleSize"]; let result = null; let logTableName = `_MLLog:${Now}`; let conn = GetDataTableConnection(sourceModel); let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake", "Connection": conn}).Modify(#{"NameInDataSource": null}).Synchronize(); try { WriteLog(`Training a prediction model based on model: ${sourceModel.Name} (id=${sourceModel.Id})`); result = TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize); WriteLog(`Model training completed. Training result:\r\n${result}`); result = ParseJson(result); if (generationConfiguration != null) { WriteLog(`Generating a new model named ${modelName} based on trained model ${modelName})`); let newModel = GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable); WriteLog(`Model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`); result.Set("Model", newModel); } } finally { targetProject.DataTableByName(logTableName)?.DeletePermanently(); } return result; } function TransformModel(modelName, sourceModel, targetProject, configurations, logTable) { let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject); let eventsTable = tablesDict["Events"]; let casesTable = tablesDict["Cases"]; let targetModel = tablesDict["TargetModel"]; let eventColumnMappings = sourceModel.EventsDataTable.ColumnMappings; let caseColumnMappings = sourceModel.CasesDataTable?.ColumnMappings; let result; WriteLog(`Performing transformations...`); let configuration = #{ "_typed": #{ "event_data": sourceModel.EventsDataTable.SqlDataFrame, "project": targetProject, "target_event_data_table": eventsTable }, "column_mappings": eventColumnMappings, "overwrite": true, "transformations": configurations }; if (casesTable != null) { configuration["_typed"].Set("target_case_data_table", casesTable); } result = Generate(configuration, logTable); WriteLog(`Transformation results:\r\n${result}`); result = ParseJson(result); if (result["result"] != "success") throw result["exception"] if (casesTable != null) { WriteLog(`Importing case data data...`); casesTable.Import(sourceModel.CasesDataTable.SqlDataFrame, #{"Append": true}); ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes); } ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes); if (targetModel != null) return targetModel; CreateResultModel(targetProject, modelName, eventColumnMappings, casesTable != null ? caseColumnMappings : null); } function ApplyTransformations(configuration) { let modelName = configuration["Name"]; let sourceModel = configuration["SourceModel"]; let targetProject = configuration["TargetProject"]; let transformationConfigurations = configuration["Transformations"]; let result = null; let logTableName = `_MLLog:${Now}`; let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake"}).Synchronize(); try { WriteLog(`Applying transformations on model:\r\n${sourceModel.Name} (id=${sourceModel.Id})`); let newModel = TransformModel(modelName, sourceModel, targetProject, transformationConfigurations, logTable); WriteLog(`Model transformations completed. New model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`); return newModel; } finally { targetProject.DataTableByName(logTableName)?.DeletePermanently(); } } function GenerateNewModelWithNewCaseAttributes(modelName, sourceModel, targetProject, generationConfiguration, attributesToGenerate, incompleteCasesFilter, logTable) { let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject); let eventsTable = tablesDict["Events"]; let casesTable = tablesDict["Cases"]; let targetModel = tablesDict["TargetModel"]; let sanitizedModelName = tablesDict["ModelName"]; let generationParams, result, overwrite = true; WriteLog(`Generating case attribute values for running cases...`); let incompleteCasesSqlDataFrame = sourceModel.EventsDataTable.SqlDataFrame; if (incompleteCasesFilter != null) incompleteCasesSqlDataFrame = incompleteCasesSqlDataFrame.ApplyFilter(incompleteCasesFilter, sourceModel.CasesDataTable?.SqlDataFrame); generationParams = generationConfiguration.Extend(#{ "_typed": #{ "event_data": incompleteCasesSqlDataFrame, "case_data": sourceModel.CasesDataTable.SqlDataFrame, "target_case_data_table": casesTable, "project": targetProject }, "model_name": sanitizedModelName, "overwrite": overwrite, "temperature": 0 }); result = Generate(generationParams, logTable); overwrite = false; WriteLog(`Generation results:\r\n${result}`); result = ParseJson(result); if (result["result"] != "success") throw result["exception"] if (!("Predicted".In(casesTable.ColumnTypes.Name))) casesTable .AddColumn("Predicted", "Boolean"); casesTable .UpdateRows( null, "Predicted", true ); WriteLog(`Appending original model data...`); eventsTable.Import(sourceModel.EventsDataTable.SqlDataFrame, #{"Append": true}).Synchronize(); attributesToGenerate.{ let col = _; let newCol = `Predicted_${col}`; casesTable.AddColumn(newCol, "String"); casesTable.UpdateRows(null, newCol, Column(#expr{col})); }; let createdColumns = casesTable.ColumnTypes.Name; sourceModel.CasesDataTable.ColumnTypes.Name .Where(!_.In(createdColumns)).{ let col = _; casesTable.AddColumn(col, "String"); }; casesTable.Merge(sourceModel.CasesDataTable.SqlDataFrame, sourceModel.CasesDataTable.ColumnMappings["CaseId"]); casesTable .UpdateRows( Column("Predicted") == null, "Predicted", false ); WriteLog(`Restoring original column types...`); ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes); ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes); if (targetModel != null) return targetModel; CreateResultModel(targetProject, modelName, result["column_mappings"]["events"], casesTable != null ? result["column_mappings"]["cases"] : null); } function GenerateCaseAttributePredictionModel(configuration) { let modelName = configuration["Name"]; let sourceModel = configuration["SourceModel"]; let targetProject = configuration["TargetProject"]; let trainingConfiguration = configuration["TrainingConfiguration"]; let generationConfiguration = configuration["GenerationConfiguration"]; let trainingDataFilter = configuration["TrainingDataFilter"]; let incompleteCasesFilter = configuration["IncompleteCasesFilter"]; let recreatePredictionModel = configuration["RecreatePredictionModel"]; let trainingCaseSampleSize = configuration["TrainingCaseSampleSize"]; let result = null; let logTableName = `_MLLog:${Now}`; let conn = GetDataTableConnection(sourceModel); let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake", "Connection": conn}).Modify(#{"NameInDataSource": null}).Synchronize(); try { let attributesToGenerate = trainingConfiguration["output_case_attribute_groups"][0]["attributes"]; WriteLog(`Training a case attribute prediction model based on model: ${sourceModel.Name} (id=${sourceModel.Id}). Generated attributes: ${ToJson(attributesToGenerate)}`); result = TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize); WriteLog(`Model training completed. Training result:\r\n${result}`); result = ParseJson(result); if (generationConfiguration != null) { WriteLog(`Generating a new model named ${modelName} based on trained case attribute prediction model ${modelName})`); let newModel = GenerateNewModelWithNewCaseAttributes(modelName, sourceModel, targetProject, generationConfiguration, attributesToGenerate, incompleteCasesFilter, logTable); WriteLog(`Model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`); result.Set("Model", newModel); } } finally { targetProject.DataTableByName(logTableName)?.DeletePermanently(); } return result; } return #{ "GeneratePredictionModel": GeneratePredictionModel, "GenerateCaseAttributePredictionModel": GenerateCaseAttributePredictionModel, "ApplyTransformations": ApplyTransformations, "ModifyColumnTypes": ModifyColumnTypes };
2. Create the following example expression script (e.g., with name Create prediction model):
let lib = First(Project.Scripts.Where(Name == "ML library")).Run(#{}); let targetProject = Project; lib.GeneratePredictionModel(#{ "Name": "My prediction model", "SourceModel": ModelById(1), "TargetProject": targetProject, "RecreatePredictionModel": true, "TrainingConfiguration": #{ "num_epochs_to_train": 500 }, "GenerationConfiguration": #{ "cases_to_generate": 1000 }, "TrainingDataFilter": #{ "Items": [ #{ "Type": "IncludeCases", "Items": [ #{ "Type": "EventAttributeValue", "Attribute": "Event type", "StringifiedValues": [ "0Invoice Payment" ] } ] } ] }, "TrainingCaseSampleSize": 1000, "IncompleteCasesFilter": #{ "Items": [ #{ "Type": "ExcludeCases", "Items": [ #{ "Type": "EventAttributeValue", "Attribute": "Event type", "StringifiedValues": [ "0Invoice Payment" ] } ] } ] }, });
3. Configure prediction for the previously created script as instructed in the next chapter.
Configure prediction
Prediction script has the following settings in the GeneratePredictionModel call:
- Name: Name of the QPR ProcessAnalyzer model that is created to the target project. The model will contain the source model content and the predictions.
- SourceModel: Source model for which the prediction is made. Model can be selected for example based on id with ModelById function or by name with ModelByName function.
- TargetProject: Target project to create the new model into.
- RecreatePredictionModel: When true, a new ML model is trained when the script is run. When false, the prediction is run using possibly pre-existing ML model.
- TrainingConfiguration: Training parameters.
- num_epochs_to_train: How many times the whole training data is used in training. The more there are epochs, the better the model usually is, but the training will take more time. The best performing model out of all the iterations will be selected.
- max_num_case_clusters: Maximum number of clusters to divide the case attribute values into. Default is 20.
- GenerationConfiguration: Event generation parameters. When null, no generation is done. For example, following parameters are supported:
- cases_to_generate: Maximum number cases to create. The number of created cases is further limited by the capabilities of the trained model and the case_generation_start_time and case_generation_end_time parameters.
- case_generation_start_time: If defined, new cases will be generated starting from this timestamp. If not defined, the latest start event timestamp used in the training data. This parameter is given as ISO datetime format.
- case_generation_end_time: If defined, the new cases generation will stop when reaching this timestamp, and no cases will be generated after it. This parameter is given as ISO datetime format.
- generate_debug_event_attributes: If true, additional columns will be added containing, e.g., probabilities of the selected activity and other activities.
- min_prediction_probability : Minimum probability of any activity name in next activity prediction. If the probability of an activity is lower than this, it will never be picked. Default value is 0.01.
- temperature: If 0, the predicted event type will always be the most probable one. If 1, the next event type is randomly selected based on probabilities of each event type. This behavior is interpolated when using values between 0 and 1.
- TrainingDataFilter: Filter to select specific cases that are used to train the prediction model. This filter is required to train the model only using the completed cases. Uncompleted cases should not be used for the training, so the model doesn't incorrectly learn that cases should end like that.
- TrainingCaseSampleSize: Maximum number of cases to take from the source model (cases are selected randomly). Use a lower setting to speed up the ML model training. The greater the value, the more subtle phenomena the prediction can learn from the data.
- IncompleteCasesFilter: Optional filter to select which cases the prediction is made for. To improve performance of the prediction, it's recommended to include only the incomplete cases for which new events might appear, and skip the completed cases for which new events are not expected anymore.