Create Predicted Eventlog
Prerequisites
The following prerequisites need to be fulfilled to run the eventlog prediction:
- QPR ProcessAnalyzer 2024.2 or later
- Snowflake is configured
- The source model is stored in Snowflake
Prediction installation
Follow these steps to install the eventlog prediction:
- Create a Snowflake-managed stage named PREDICTION in the same schema that is configured for QPR ProcessAnalyzer (in the Snowflake connection string).
- Open the created stage and upload the predict.pyz file to the stage (request the file from your QPR representative). Alternatively, this and the previous step can be done with SQL; see the sketch after these steps.
- Create the following stored procedure in the same schema. The procedure downloads predict.pyz from the stage to the local file system, adds it to the Python path, and forwards the call to predict.main:
CREATE OR REPLACE PROCEDURE QPRPA_SP_PREDICTION("CONFIGURATION" OBJECT)
RETURNS OBJECT
LANGUAGE PYTHON
STRICT
RUNTIME_VERSION = '3.8'
PACKAGES = ('nltk','numpy','pandas==1.5.3','scikit-learn','snowflake-snowpark-python','tensorflow','dill','prophet','holidays==0.18','python-kubernetes','docker-py')
HANDLER = 'main'
EXECUTE AS OWNER
AS '
import sys

def main(session, parameters_in: dict) -> dict:
    session.file.get(''@prediction/predict.pyz'', ''/tmp'')
    sys.path.append(''/tmp/predict.pyz'')
    import predict
    return predict.main(session, parameters_in)
';
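The stage creation and file upload (steps 1 and 2) can also be done with SQL instead of the Snowflake UI. The following is a minimal sketch, not taken from the product documentation: MYDB and MYSCHEMA are placeholders for the database and schema in your Snowflake connection string, and the local file path is an example. The PUT command must be run from a client that supports file uploads, such as SnowSQL.

-- Sketch: create an internal (Snowflake-managed) stage named PREDICTION in the
-- schema configured for QPR ProcessAnalyzer (MYDB and MYSCHEMA are placeholders).
CREATE STAGE IF NOT EXISTS MYDB.MYSCHEMA.PREDICTION
  ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');

-- Upload predict.pyz from the local machine (example path shown).
-- AUTO_COMPRESS = FALSE keeps the original file name in the stage.
PUT file:///tmp/predict.pyz @MYDB.MYSCHEMA.PREDICTION AUTO_COMPRESS = FALSE;

-- Verify that the file is now in the stage.
LIST @MYDB.MYSCHEMA.PREDICTION;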
- Go to QPR ProcessAnalyzer and create the following expression script:
let predictionProcedureName = "qprpa_sp_prediction";

// Runs callbackFunc in one parallel task while another task polls logTable and
// relays new log rows written by the Snowflake stored procedure to the script log.
function RunFunctionWithParallelLogging(logTable, callbackFunc) {
  let state = #{};
  logTable.Truncate();
  _system.Parallel.Run([
    () => {
      try {
        state.Set("Result", callbackFunc());
      } finally {
        state.Set("Finished", true);
      }
    },
    () => {
      let readRows = 0;
      while (true) {
        try {
          let finished = state.TryGetValue("Finished");
          if (finished == true) {
            WriteLog(`Work finished. Log written to table ${logTable.Name} (id: ${logTable.Id})`);
            break;
          }
          let rowsDf = logTable.Synchronize().SqlDataFrame.Skip(readRows).Collect();
          readRows = readRows + rowsDf.NRows;
          rowsDf.Rows.{
            let row = _;
            WriteLog(`${row[0]} \t${row[1]}`);
          }
        } catch (ex) {
          if (ex.InnerExceptions?[0]?.Type == "UnauthorizedAccess") {
            WriteLog(`Log data table has disappeared. The generated log will not be complete!`);
            break;
          }
          WriteLog(`Log has not yet been created.`);
        }
        Sleep(5000);
      }
    }
  ]);
  return state.TryGetValue("Result");
}

// Calls the prediction stored procedure to train a model, relaying its log output.
function Train(params, logTable) {
  function TrainCallback() {
    WriteLog("Starting training...");
    let connection = logTable.Project.CreateSnowflakeConnection();
    let finalParams = params.Clone();
    finalParams.Set("_typed", params["_typed"].Extend(#{ "log_table": logTable }));
    let result = connection.CallStoredProcedure(predictionProcedureName, #{ "configuration": finalParams });
    WriteLog("Finished training...");
    return result;
  }
  return RunFunctionWithParallelLogging(logTable, TrainCallback);
}

// Calls the prediction stored procedure to generate events (and optionally cases)
// into the target data tables, relaying its log output.
function Generate(params, logTable) {
  function GenerateCallback() {
    WriteLog("Starting generation...");
    let connection = logTable.Project.CreateSnowflakeConnection();
    let typedParams = params["_typed"];
    let resultEventsTable = typedParams["target_event_data_table"];
    let resultCasesTable = typedParams.TryGetValue("target_case_data_table");
    if (params["overwrite"]) {
      resultEventsTable.Truncate();
      resultCasesTable?.Truncate();
    }
    let finalParams = params.Clone();
    finalParams.Set("_typed", typedParams.Extend(#{ "log_table": logTable }));
    let result = connection.CallStoredProcedure(predictionProcedureName, #{ "configuration": finalParams });
    resultEventsTable.Synchronize();
    resultCasesTable?.Synchronize();
    WriteLog("Finished generation...");
    return result;
  }
  return RunFunctionWithParallelLogging(logTable, GenerateCallback);
}

// Returns the source model events as an SqlDataFrame, restricted to the given
// filter and to a random sample of at most sampledCaseCount cases.
function GetSampledEvents(sourceModel, sampledCaseCount, filter) {
  if (IsNull(sampledCaseCount) || sampledCaseCount < 1)
    return sourceModel.EventsDataTable.SqlDataFrame;
  let sampleFilterRule = sampledCaseCount == null
    ? null
    : #{
        "Type": "IncludeCases",
        "Items": [#{
          "Type": "SqlExpressionValue",
          "Configuration": #{
            "Root": `Cases.WithColumn("_Random", Rand()).OrderByColumns(["_Random"], [true]).Head(${sampledCaseCount})`
          }
        }]
      };
  let filterRules = filter?["Items"];
  if (!IsNullTop(filterRules)) {
    if (!IsNullTop(sampleFilterRule))
      filterRules = Concat(filterRules, sampleFilterRule);
  } else {
    if (!IsNullTop(sampleFilterRule))
      filterRules = [sampleFilterRule];
    else
      filterRules = [];
  }
  let finalFilter = #{ "Items": filterRules };
  return sourceModel.CacheTableSqlDataFrame(finalFilter);
}

// Trains the prediction model from (a sample of) the source model's eventlog.
function TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize) {
  let sanitizedModelName = `${sourceModel.Id}-${modelName}`;
  let originalCaseCount = sourceModel.EventsDataTable.SqlDataFrame.SelectDistinct([sourceModel.EventsDataTable.ColumnMappings["CaseId"]]).NRows;
  let trainEventDataSdf = GetSampledEvents(sourceModel, trainingCaseSampleSize, trainingDataFilter);
  let actualTrainingCaseSampleSize = trainEventDataSdf.SelectDistinct([trainEventDataSdf.ColumnMappings["CaseId"]]).NRows;
  WriteLog(`Starting to train a prediction model using ${ToString(100 * actualTrainingCaseSampleSize / originalCaseCount, "F")}% of the original cases (${actualTrainingCaseSampleSize}/${originalCaseCount}) found in source model ${sourceModel.Name}.`);
  let trainCaseDataSdf = sourceModel.CasesDataTable?.SqlDataFrame;
  let columnMappings = trainEventDataSdf.ColumnMappings;
  if (trainCaseDataSdf != null) {
    columnMappings = columnMappings.Extend(#{ "Case_CaseId": trainCaseDataSdf.ColumnMappings["CaseId"] });
  }
  trainingConfiguration = trainingConfiguration.Extend(#{
    "_typed": #{
      "event_data": trainEventDataSdf,
      "project": targetProject
    },
    "model_name": sanitizedModelName,
    "column_mappings": columnMappings,
    "overwrite_model": recreatePredictionModel,
    "case_start_time_probability_multiplier": originalCaseCount / actualTrainingCaseSampleSize
  });
  if (trainCaseDataSdf != null) {
    trainingConfiguration["_typed"].Set("case_data", trainCaseDataSdf);
  }
  Train(trainingConfiguration, logTable);
}

// Generates a new eventlog with the trained model and creates (or updates) a
// QPR ProcessAnalyzer model combining the generated and the original data.
function GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable) {
  let sanitizedModelName = `${sourceModel.Id}-${modelName}`;
  let eventsTableName = `${modelName} - events`;
  let casesTableName = `${modelName} - cases`;
  let m = targetProject.ModelByName(modelName);
  let eventsTable = null;
  let casesTable = null;
  if (m != null) {
    eventsTable = m.EventsDataTable;
    casesTable = m.CasesDataTable;
  } else {
    let eventsTableConfiguration = #{ "Name": eventsTableName, "Type": "Snowflake" };
    eventsTable = targetProject.DataTableByName(eventsTableName) ?? targetProject.CreateDataTable(eventsTableConfiguration).Synchronize();
    if (sourceModel.CasesDataTable != null) {
      let casesTableConfiguration = #{ "Name": casesTableName, "Type": "Snowflake" };
      casesTable = targetProject.DataTableByName(casesTableName) ?? targetProject.CreateDataTable(casesTableConfiguration).Synchronize();
    }
  }
  eventsTable.Truncate();
  casesTable?.Truncate();
  let generationParams, result;
  if (generationConfiguration.TryGetValue("cases_to_generate") != 0) {
    WriteLog(`Generating new cases with new events...`);
    generationParams = generationConfiguration.Extend(#{
      "_typed": #{
        "target_event_data_table": eventsTable,
        "project": targetProject
      },
      "model_name": sanitizedModelName,
      "overwrite": false
    });
    if (casesTable != null) {
      generationParams["_typed"].Set("target_case_data_table", casesTable);
    }
    result = Generate(generationParams, logTable);
    WriteLog(`Generation results:\r\n${result}`);
    result = ParseJson(result);
    if (result["result"] != "success")
      throw result["exception"];
  }
  if (incompleteCasesFilter != null) {
    WriteLog(`Generating new events for running cases...`);
    let incompleteCasesSqlDataFrame = sourceModel.EventsDataTable.SqlDataFrame.ApplyFilter(incompleteCasesFilter, sourceModel.CasesDataTable?.SqlDataFrame);
    generationParams = generationConfiguration.Extend(#{
      "_typed": #{
        "event_data": incompleteCasesSqlDataFrame,
        "target_event_data_table": eventsTable,
        "project": targetProject
      },
      "model_name": sanitizedModelName,
      "overwrite": false,
      "temperature": 0
    });
    if (casesTable != null) {
      generationParams["_typed"].Set("case_data", sourceModel.CasesDataTable.SqlDataFrame);
    }
    result = Generate(generationParams, logTable);
    WriteLog(`Generation results:\r\n${result}`);
    result = ParseJson(result);
    if (result["result"] != "success")
      throw result["exception"];
  }
  // Mark the generated rows and drop the synthetic end-of-case events.
  if (!("Predicted".In(eventsTable.ColumnTypes.Name)))
    eventsTable.AddColumn("Predicted", "Boolean");
  eventsTable
    .DeleteRows(Column(result["column_mappings"]["events"]["EventType"]) == "__FINISH__")
    .UpdateRows(1 == 1, "Predicted", true);
  if (casesTable != null) {
    if (!("Generated".In(casesTable.ColumnTypes.Name)))
      casesTable.AddColumn("Generated", "Boolean");
    casesTable.UpdateRows(1 == 1, "Generated", true);
  }
  WriteLog(`Appending original model data...`);
  eventsTable.Import(sourceModel.EventsDataTable.SqlDataFrame, #{ "Append": true });
  if (casesTable != null)
    casesTable.Import(sourceModel.CasesDataTable.SqlDataFrame, #{ "Append": true });
  eventsTable.UpdateRows(Column("Predicted") == null, "Predicted", false);
  casesTable?.UpdateRows(Column("Generated") == null, "Generated", false);
  if (m != null)
    return m;
  WriteLog(`Creating model...`);
  let eventColumnMappings = result["column_mappings"]["events"];
  let timestampMapping = eventColumnMappings["TimeStamp"];
  eventColumnMappings.Remove("TimeStamp");
  eventColumnMappings.Set("Timestamp", timestampMapping);
  let modelConfiguration = #{
    "DataSource": #{
      "Events": #{
        "DataSourceType": "datatable",
        "DataTableName": eventsTableName,
        "Columns": eventColumnMappings
      }
    }
  };
  if (casesTable != null) {
    modelConfiguration["DataSource"].Set("Cases", #{
      "DataSourceType": "datatable",
      "DataTableName": casesTableName,
      "Columns": result["column_mappings"]["cases"]
    });
  }
  targetProject.CreateModel(#{ "Name": modelName, "Configuration": modelConfiguration });
}

// Entry point: trains a prediction model and, when a generation configuration is
// given, generates a new model from it. A temporary Snowflake data table relays
// the log and is deleted at the end.
function GeneratePredictionModel(configuration) {
  let modelName = configuration["Name"];
  let sourceModel = configuration["SourceModel"];
  let targetProject = configuration["TargetProject"];
  let trainingConfiguration = configuration["TrainingConfiguration"];
  let generationConfiguration = configuration["GenerationConfiguration"];
  let trainingDataFilter = configuration["TrainingDataFilter"];
  let incompleteCasesFilter = configuration["IncompleteCasesFilter"];
  let recreatePredictionModel = configuration["RecreatePredictionModel"];
  let trainingCaseSampleSize = configuration["TrainingCaseSampleSize"];
  let result = null;
  let logTableName = `_MLLog:${Now}`;
  let logTable = targetProject.CreateDataTable(#{ "Name": logTableName, "Type": "Snowflake" });
  try {
    WriteLog(`Training a prediction model based on model:\r\n${sourceModel.Name} (id=${sourceModel.Id})`);
    result = TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize);
    WriteLog(`Model training completed. Training result:\r\n${result}`);
    if (generationConfiguration != null) {
      WriteLog(`Generating a new model named ${modelName} based on the trained model:\r\n${result}`);
      let newModel = GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable);
      WriteLog(`Model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}:\r\n${result}`);
    }
  } finally {
    targetProject.DataTableByName(logTableName)?.DeletePermanently();
  }
}

return #{
  "GeneratePredictionModel": GeneratePredictionModel,
  "RunFunctionWithParallelLogging": RunFunctionWithParallelLogging,
  "Train": Train,
  "Generate": Generate
};
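For reference, a minimal usage sketch of the script above. It assumes (these are assumptions, not part of the documented setup) that the returned function dictionary has been stored in a variable named prediction, and that sourceModel and targetProject already refer to an existing model and project. The configuration keys are the ones read by GeneratePredictionModel above; all values are illustrative examples.

// Hedged usage sketch: `prediction` is assumed to hold the dictionary returned
// by the script above; `sourceModel` and `targetProject` are assumed to be
// resolved beforehand. All parameter values are illustrative.
let GeneratePredictionModel = prediction["GeneratePredictionModel"];
GeneratePredictionModel(#{
  "Name": "Predicted model",        // name of the generated model and its data tables
  "SourceModel": sourceModel,       // model whose eventlog is used for training
  "TargetProject": targetProject,   // project receiving the generated tables and model
  "TrainingConfiguration": #{},     // extended with "_typed", "model_name" etc. by TrainMLModelForModel
  "GenerationConfiguration": #{
    "cases_to_generate": 1000       // 0 skips generating completely new cases
  },
  "TrainingDataFilter": null,       // optional filter rules applied to the training data
  "IncompleteCasesFilter": null,    // when set, events are also generated for running cases
  "RecreatePredictionModel": true,  // overwrite a previously trained model with the same name
  "TrainingCaseSampleSize": 1000    // train on at most this many randomly sampled cases
});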