|
|
Line 35: |
Line 35: |
|
| |
|
| == Create prediction script in QPR ProcessAnalyzer == | | == Create prediction script in QPR ProcessAnalyzer == |
| 1. Go to QPR ProcessAnalyzer and create the following expression script with name '''ML library''': | | 1. Create the following example expression script (e.g., with name '''Create prediction model'''): |
| <pre> | | <pre> |
| /**
| | let sourceModel = ProjectByName("<project name>").ModelByName("<model name>"); |
| * Name of the stored procedure in Snowflake used for ML model training and related event generation.
| | let completeCaseEventTypeName = "<event type name found only in complete cases>"; |
| */
| |
| let predictionProcedureName = "qprpa_sp_prediction"; | |
|
| |
|
| function GetDataTableConnection(sourceModel)
| | let targetProject = Project; |
| {
| | let eventTypeColumnName = sourceModel.EventsDataTable.ColumnMappings["EventType"]; |
| // The following line is a temporary solution replacing the actual line (commented line below)
| |
| // until defect D-14142 is fixed.
| |
| // NOTE: If OdbcConnectionString or OdbcConnectionStringKey needs to be used, they Should
| |
| // be given as parameters of CreateSnowflakeConnection-call.
| |
| return CreateSnowflakeConnection();
| |
| // return sourceModel.EventDataTable.DataSourceConnection;
| |
| }
| |
| | |
| function ModifyColumnTypes(dataTable, columnTypesToRestore)
| |
| {
| |
| let currentColumnTypes = dataTable.ColumnTypes;
| |
| let currentColumnTypesDict = #{};
| |
| currentColumnTypes.{
| |
| let ct = _;
| |
| currentColumnTypesDict.Set(ct.Name, ct.DataType);
| |
| };
| |
|
| |
| let columnsToModify = [];
| |
|
| |
| columnTypesToRestore.{
| |
| let ct = _;
| |
| let dataTypeToRestore = ct.DataType;
| |
| let currentDataType = currentColumnTypesDict.TryGetValue(ct.Name);
| |
| if (!IsNullTop(currentDataType) && currentDataType != dataTypeToRestore) {
| |
| columnsToModify = Concat(columnsToModify, [ct]);
| |
| }
| |
| };
| |
|
| |
| dataTable
| |
| .AddColumns(columnsToModify.{ let ct = _; #{"Name": `__Restore_${ct.Name}`, "DataType": ct.DataType}});
| |
| | |
| columnsToModify.{
| |
| let ct = _;
| |
| dataTable
| |
| .UpdateRows(null, `__Restore_${ct.Name}`, Cast(Column(#expr{ct.Name}), #expr{ct.DataType}));
| |
| };
| |
|
| |
| dataTable
| |
| .RemoveColumns(columnsToModify.Name)
| |
| .RenameColumns(columnsToModify.{ let ct = _; (ct.Name : `__Restore_${ct.Name}`)});
| |
| | |
| dataTable.Synchronize();
| |
| }
| |
| | |
| /**
| |
| * @name RunFunctionWithParallelLogging
| |
| * @descripion
| |
| * Runs given function that generates logging information into given data table in a way that all the logging will be included into the
| |
| * generated script run log as well.
| |
| * @param logTable
| |
| * A DataTable having two columns:
| |
| * - Datetime-column for the log message's timestamp.
| |
| * - Log message column.
| |
| * @param callbackFunc
| |
| * Function that uses given data table for logging it's current status.
| |
| * @returns
| |
| * The result of the callback function.
| |
| */
| |
| function RunFunctionWithParallelLogging(logTable, callbackFunc)
| |
| {
| |
| let state = #{};
| |
| | |
| logTable.Truncate();
| |
| | |
| _system.Parallel.Run([
| |
| () => {
| |
| try {
| |
| state.Set("Result", callbackFunc());
| |
| }
| |
| finally {
| |
| state.Set("Finished", true);
| |
| }
| |
| },
| |
| () => {
| |
| let readRows = 0;
| |
| function UpdateLog()
| |
| {
| |
| try {
| |
| let rowsDf = logTable.Synchronize().SqlDataFrame.Skip(readRows).Collect();
| |
| readRows = readRows + rowsDf.NRows;
| |
| rowsDf.Rows.{
| |
| let row = _;
| |
| WriteLog(`${row[0]} \t${row[1]}`);
| |
| }
| |
| }
| |
| catch (ex) {
| |
| if (ex.InnerExceptions?[0]?.Type == "UnauthorizedAccess") {
| |
| WriteLog(`Log data table has disappeared. The generated log will not be complete!`);
| |
| break;
| |
| }
| |
| WriteLog(`Log has not yet been created.`);
| |
| }
| |
| }
| |
| | |
| while (true) {
| |
| UpdateLog();
| |
| let finished = state.TryGetValue("Finished");
| |
| if (finished == true) {
| |
| UpdateLog();
| |
| WriteLog(`Work finished. Log written to table ${logTable.Name} (id: ${logTable.Id})`);
| |
| break;
| |
| }
| |
| Sleep(5000);
| |
| }
| |
| }
| |
| ]);
| |
| return state.TryGetValue("Result");
| |
| }
| |
| | |
| /**
| |
| * @name Train
| |
| * @descripion
| |
| * Train a ML model using given parameters and log table.
| |
| * @param params
| |
| * String dictionary containing the training configuration.
| |
| * @param logTable
| |
| * DataTable to be used for logging.
| |
| * @returns
| |
| * Result returned by the ML model training.
| |
| */
| |
| function Train(params, logTable)
| |
| {
| |
| function TrainCallback()
| |
| {
| |
| WriteLog("Starting training...");
| |
| | |
| let connection = logTable.DataSourceConnection;
| |
| let finalParams = params.Clone();
| |
| finalParams.Set("_typed", params["_typed"].Extend(#{
| |
| "log_table": logTable
| |
| }));
| |
| let result = connection.CallStoredProcedure(predictionProcedureName, #{"configuration": finalParams});
| |
| | |
| WriteLog("Finished training...")
| |
| return result;
| |
| }
| |
| return RunFunctionWithParallelLogging(logTable, TrainCallback);
| |
| }
| |
| | |
| /**
| |
| * @name Generate
| |
| * @descripion
| |
| * Generate new events using a trained ML model and log table.
| |
| * @param params
| |
| * String dictionary containing the training configuration.
| |
| * @param logTable
| |
| * DataTable to be used for logging.
| |
| * @returns
| |
| * Result returned by the generation.
| |
| */
| |
| function Generate(params, logTable)
| |
| {
| |
| function GenerateCallback()
| |
| {
| |
| WriteLog("Starting generation...")
| |
| | |
| let connection = logTable.DataSourceConnection;
| |
| | |
| let typedParams = params["_typed"];
| |
| let resultEventsTable = typedParams.TryGetValue("target_event_data_table");
| |
| let resultCasesTable = typedParams.TryGetValue("target_case_data_table");
| |
|
| |
| if (params["overwrite"]) {
| |
| resultEventsTable.Truncate();
| |
| resultCasesTable?.Truncate();
| |
| }
| |
| | |
| let finalParams = params.Clone();
| |
| finalParams.Set("_typed", typedParams.Extend(#{
| |
| "log_table": logTable
| |
| }));
| |
| | |
| let result = connection.CallStoredProcedure(predictionProcedureName, #{"configuration": finalParams});
| |
| | |
| resultEventsTable.Synchronize();
| |
| resultCasesTable?.Synchronize();
| |
|
| |
| WriteLog("Finished generation...")
| |
| return result;
| |
| }
| |
| | |
| return RunFunctionWithParallelLogging(logTable, GenerateCallback);
| |
| }
| |
|
| |
|
| /**
| | _system.ML.GeneratePredictionModel(#{ |
| * @name GetSampledEvents
| | "Name": "My prediction model", // Name of the PA model to generate ti the target project. |
| * @descripion
| | "SourceModel": sourceModel, // Snowflake-based PA model used for training the prediction model. |
| * Returns a SqlDataFrame containing sampled events of given model where given filter is first applied.
| | "TargetProject": targetProject, // Target project to create the model into. |
| * @param sourceModel
| | "TrainingConfiguration": #{ // Training parameters. |
| * ProcessAnalyzer model object of the model whose event data is to be filtered and sampled.
| | "num_epochs_to_train": 200 |
| * @param sampledCaseCount
| |
| * The maximum number of cases to return (or null if all cases should be returned).
| |
| * @param filter
| |
| * JSON filter to be applied on the event data of the source model prior to performing the sampling.
| |
| * @returns
| |
| * SqlDataFrame containing sampled events of given model where given filter is first applied.
| |
| */
| |
| function GetSampledEvents(sourceModel, sampledCaseCount, filter)
| |
| {
| |
| if (IsNull(sampledCaseCount) || sampledCaseCount < 1)
| |
| return sourceModel.CacheTableSqlDataFrame(filter);
| |
| let sampleFilterRule = sampledCaseCount == null
| |
| ? null
| |
| : #{
| |
| "Type":"IncludeCases",
| |
| "Items":[#{
| |
| "Type": "SqlExpressionValue",
| |
| "Configuration": #{
| |
| "Root": `Cases.WithColumn("_Random", Rand()).OrderByColumns(["_Random"], [true]).Head(${sampledCaseCount})`
| |
| }
| |
| }]
| |
| };
| |
| let filterRules = filter?["Items"]; | |
| if (!IsNullTop(filterRules)) {
| |
| if (!IsNullTop(sampleFilterRule))
| |
| filterRules = Concat(filterRules, sampleFilterRule);
| |
| }
| |
| else {
| |
| if (!IsNullTop(sampleFilterRule))
| |
| filterRules = [sampleFilterRule];
| |
| else
| |
| filterRules = [];
| |
| }
| |
| | |
| let finalFilter = #{
| |
| "Items":filterRules
| |
| };
| |
| sourceModel.CacheTableSqlDataFrame(finalFilter);
| |
| }
| |
| | |
| /**
| |
| * @name TrainMLModelForModel
| |
| * @descripion
| |
| * Generates a new ML model based on the data found in given source model.
| |
| * @param modelName
| |
| * Name of the ProcessAnalyzer model to create. The ML model will be named using the template: "decision_intelligence/<targetProject.Id>-<sourceModel.Id>-<modelName>.pamlmodel".
| |
| * @param sourceModel
| |
| * ProcessAnalyzer model object of the model which is to be trained to the ML model.
| |
| * @param targetProject
| |
| * Project ín whose context the ML model is to be trained.
| |
| * @param trainingConfiguration
| |
| * String dictionary containing the training configuration.
| |
| * @param recreatePredictionModel
| |
| * Should a prediction model be overwritten if one already exists for this source model and target model name combination?
| |
| * @param trainingDataFilter
| |
| * Filter JSON applied on top of all the source model events to find all the events to be used for training.
| |
| * @param logTable
| |
| * DataTable to be used for logging.
| |
| * @param trainingCaseSampleSize
| |
| * Maximum number of cases to use from the source model (random sampled).
| |
| * @returns
| |
| * Result returned by the ML model training.
| |
| */
| |
| function TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize)
| |
| {
| |
| let sanitizedModelName = `${sourceModel.Id}-${modelName}`;
| |
| let eventDataCaseIdColumn = sourceModel.EventsDataTable.ColumnMappings["CaseId"];
| |
| let eventDataTimeStampColumn = sourceModel.EventsDataTable.ColumnMappings["TimeStamp"];
| |
| let originalCaseCount = sourceModel.EventsDataTable.SqlDataFrame.SelectDistinct([eventDataCaseIdColumn]).NRows;
| |
| let trainEventDataSdf = GetSampledEvents(sourceModel, trainingCaseSampleSize, trainingDataFilter);
| |
| let actualTrainingCaseSampleSize = trainEventDataSdf.SelectDistinct([eventDataCaseIdColumn]).NRows;
| |
| let startTimesSdf = sourceModel.CacheTableSqlDataFrame(trainingDataFilter).GroupBy([eventDataCaseIdColumn]).Aggregate([eventDataTimeStampColumn], ["Min"]).Select([eventDataTimeStampColumn]);
| |
| WriteLog(`Starting to train a prediction model using ${ToString(100*actualTrainingCaseSampleSize/originalCaseCount, "F")}% of the original cases (${actualTrainingCaseSampleSize}/${originalCaseCount}) found in source model ${sourceModel.Name}.`)
| |
| | |
| let trainCaseDataSdf = sourceModel.CasesDataTable?.SqlDataFrame;
| |
| | |
| let columnMappings = trainEventDataSdf.ColumnMappings;
| |
| if (trainCaseDataSdf != null) {
| |
| columnMappings = columnMappings.Extend(#{"Case_CaseId": trainCaseDataSdf.ColumnMappings["CaseId"]});
| |
| }
| |
| | |
| trainingConfiguration = trainingConfiguration.Extend(#{
| |
| "_typed": #{
| |
| "event_data": trainEventDataSdf,
| |
| "start_times": startTimesSdf,
| |
| "project": targetProject
| |
| },
| |
| "model_name": sanitizedModelName,
| |
| "column_mappings": columnMappings,
| |
| "overwrite_model": recreatePredictionModel,
| |
| "case_sample_percentage": originalCaseCount / actualTrainingCaseSampleSize
| |
| });
| |
| | |
| if (trainCaseDataSdf != null) {
| |
| trainingConfiguration["_typed"].Set("case_data", trainCaseDataSdf);
| |
| }
| |
| | |
| Train(trainingConfiguration, logTable);
| |
| }
| |
| | |
| function CreateTargetTables(modelName, sourceModel, targetProject)
| |
| {
| |
| function GetTable(conn, tableName)
| |
| {
| |
| let tableConfiguration = #{
| |
| "Name": tableName,
| |
| "Type": "Snowflake",
| |
| "Connection": conn
| |
| };
| |
| let resultTable = targetProject.DataTableByName(tableName);
| |
| if (resultTable == null)
| |
| {
| |
| resultTable = targetProject.CreateDataTable(tableConfiguration)
| |
| .Modify(#{"NameInDataSource": null})
| |
| .Synchronize();
| |
| }
| |
| return resultTable;
| |
| }
| |
| | |
| let sanitizedModelName = `${sourceModel.Id}-${modelName}`;
| |
| let eventsTableName = `${modelName} - events`;
| |
| let casesTableName = `${modelName} - cases`;
| |
| let targetModel = targetProject.ModelByName(modelName);
| |
| let eventsTable, casesTable = null;
| |
| | |
| if (targetModel != null)
| |
| {
| |
| eventsTable = targetModel.EventsDataTable;
| |
| casesTable = targetModel.CasesDataTable;
| |
| }
| |
| else {
| |
| let conn = GetDataTableConnection(sourceModel);
| |
| eventsTable = GetTable(conn, eventsTableName);
| |
| if (sourceModel.CasesDataTable != null) {
| |
| casesTable = GetTable(conn, casesTableName);
| |
| }
| |
| }
| |
| | |
| eventsTable.Truncate();
| |
| casesTable?.Truncate();
| |
| | |
| return #{
| |
| "TargetModel": targetModel,
| |
| "Events": eventsTable,
| |
| "Cases": casesTable,
| |
| "ModelName": sanitizedModelName
| |
| };
| |
| }
| |
| | |
| function CreateResultModel(targetProject, modelName, eventColumnMappings, caseColumnMappings)
| |
| {
| |
| WriteLog(`Creating model...`);
| |
| | |
| let eventsTableName = `${modelName} - events`;
| |
| let casesTableName = `${modelName} - cases`;
| |
| | |
| let timestampMapping = eventColumnMappings["TimeStamp"];
| |
| eventColumnMappings.Remove("TimeStamp");
| |
| eventColumnMappings.Set("Timestamp", timestampMapping);
| |
| | |
| let modelConfiguration = #{
| |
| "DataSource": #{
| |
| "Events":#{
| |
| "DataSourceType": "datatable",
| |
| "DataTableName": eventsTableName,
| |
| "Columns": eventColumnMappings
| |
| }
| |
| }
| |
| };
| |
| | |
| if (caseColumnMappings != null) {
| |
| modelConfiguration["DataSource"].Set("Cases", #{
| |
| "DataSourceType": "datatable",
| |
| "DataTableName": casesTableName,
| |
| "Columns": caseColumnMappings
| |
| });
| |
| }
| |
| | |
| targetProject.CreateModel(#{"Name": modelName, "Configuration": modelConfiguration});
| |
| }
| |
| | |
| /** | |
| * @name GenerateNewModel
| |
| * @descripion
| |
| * Generates a new ProcessAnalyzer model using already trained ML model. Generation consists of the following phases:
| |
| * - Generate new cases with new events.
| |
| * - Predict new events for incomplete cases.
| |
| * - Append original source model data.
| |
| * - Create ProcessAnalyzer model if it does not already exist.
| |
| * @param modelName
| |
| * Name of the ProcessAnalyzer model to create. The used ML model must have name: "decision_intelligence/<targetProject.Id>-<sourceModel.Id>-<modelName>.pamlmodel".
| |
| * @param sourceModel
| |
| * ProcessAnalyzer model object of the model based on which the new model will be created.
| |
| * @param targetProject
| |
| * Project ínto which the new ProcessAnalyzer model is to be created.
| |
| * @param generationConfiguration
| |
| * String dictionary containing the generation configuration.
| |
| * @param incompleteCasesFilter
| |
| * Filter JSON applied on top of all the source model events to find all the events belonging to incomplete cases.
| |
| * @param logTable
| |
| * DataTable to be used for logging.
| |
| * @returns
| |
| * ProcessAnalyzer model object of the containing the results of the generation.
| |
| */
| |
| function GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable)
| |
| {
| |
| let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject);
| |
| let eventsTable = tablesDict["Events"];
| |
| let casesTable = tablesDict["Cases"];
| |
| let targetModel = tablesDict["TargetModel"];
| |
| let sanitizedModelName = tablesDict["ModelName"];
| |
| | |
| let generationParams, result, overwrite = true;
| |
| | |
| if (generationConfiguration.TryGetValue("cases_to_generate") != 0) {
| |
| WriteLog(`Generating new cases with new events...`);
| |
| | |
| generationParams = generationConfiguration.Extend(#{
| |
| "_typed": #{
| |
| "target_event_data_table": eventsTable,
| |
| "project": targetProject
| |
| },
| |
| "model_name": sanitizedModelName,
| |
| "overwrite": overwrite
| |
| });
| |
| if (casesTable != null) {
| |
| generationParams["_typed"].Set("target_case_data_table", casesTable);
| |
| }
| |
| | |
| result = Generate(generationParams, logTable);
| |
| | |
| WriteLog(`Generation results:\r\n${result}`);
| |
| | |
| result = ParseJson(result);
| |
| if (result["result"] != "success")
| |
| throw result["exception"]
| |
| | |
| overwrite = false;
| |
| } | |
| | |
| if (incompleteCasesFilter != null) {
| |
| WriteLog(`Generating new events for running cases...`);
| |
| let incompleteCasesSqlDataFrame = sourceModel.EventsDataTable.SqlDataFrame.ApplyFilter(incompleteCasesFilter, sourceModel.CasesDataTable?.SqlDataFrame);
| |
| | |
| generationParams = generationConfiguration.Extend(#{
| |
| "_typed": #{
| |
| "event_data": incompleteCasesSqlDataFrame,
| |
| "target_event_data_table": eventsTable,
| |
| "project": targetProject
| |
| },
| |
| "model_name": sanitizedModelName,
| |
| "overwrite": overwrite,
| |
| "temperature": 0
| |
| });
| |
| if (casesTable != null) {
| |
| generationParams["_typed"].Set("case_data", sourceModel.CasesDataTable.SqlDataFrame);
| |
| }
| |
| | |
| result = Generate(generationParams, logTable);
| |
| overwrite = false;
| |
| WriteLog(`Generation results:\r\n${result}`);
| |
| | |
| result = ParseJson(result);
| |
| if (result["result"] != "success")
| |
| throw result["exception"]
| |
| }
| |
| | |
| if (!("Predicted".In(eventsTable.ColumnTypes.Name)))
| |
| eventsTable
| |
| .AddColumn("Predicted", "Boolean");
| |
| | |
| eventsTable
| |
| .DeleteRows(
| |
| Column(result["column_mappings"]["events"]["EventType"]) == "__FINISH__"
| |
| )
| |
| .UpdateRows(
| |
| 1==1,
| |
| "Predicted", true
| |
| );
| |
| | |
| if (casesTable != null) {
| |
| if (!("Generated".In(casesTable.ColumnTypes.Name)))
| |
| casesTable
| |
| .AddColumn("Generated", "Boolean"); | |
| | |
| casesTable
| |
| .UpdateRows(
| |
| 1==1,
| |
| "Generated", true
| |
| );
| |
| }
| |
| | |
| WriteLog(`Appending original model data...`);
| |
| eventsTable.Import(sourceModel.EventsDataTable.SqlDataFrame, #{"Append": true});
| |
| if (casesTable != null)
| |
| casesTable.Import(sourceModel.CasesDataTable.SqlDataFrame, #{"Append": true});
| |
| | |
| eventsTable
| |
| .UpdateRows(
| |
| Column("Predicted") == null,
| |
| "Predicted", false
| |
| );
| |
| | |
| casesTable
| |
| ?.UpdateRows(
| |
| Column("Generated") == null,
| |
| "Generated", false
| |
| );
| |
| | |
| ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes);
| |
| if (casesTable != null)
| |
| ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes);
| |
| | |
| if (targetModel != null)
| |
| return targetModel;
| |
| | |
| CreateResultModel(targetProject, modelName, result["column_mappings"]["events"], casesTable != null ? result["column_mappings"]["cases"] : null);
| |
| }
| |
| | |
| /** | |
| * @name GeneratePredictionModel
| |
| * @descripion
| |
| * Generates a ProcessAnalyzer model by first training a machine learning model and then using that to generate new cases and events.
| |
| * @param configuration
| |
| * A string dictionary configuring the model prediction process. Supported keys are:
| |
| * "Name": Name of the PA model to generate to the target project.
| |
| * "SourceModel": Snowflake-based PA model used for training the prediction model.
| |
| * "TargetProject": Target project to create the model into.
| |
| * "TrainingConfiguration": Training parameters as string dictionary.
| |
| * "GenerationConfiguration": Event generation parameters as string dictionary.
| |
| * "TrainingDataFilter": Filter JSON for events to be used for training.
| |
| * "IncompleteCasesFilter": Filter JSON for events belonging to unfinished cases for which new events are to be predicted.
| |
| * "RecreatePredictionModel": Should a prediction model be overwritten if one already exists for this source model and target model name combination?
| |
| * "TrainingCaseSampleSize": Maximum number of cases to use from the source model (random sampled).
| |
| */
| |
| function GeneratePredictionModel(configuration)
| |
| {
| |
| let modelName = configuration["Name"];
| |
| let sourceModel = configuration["SourceModel"];
| |
| let targetProject = configuration["TargetProject"];
| |
| let trainingConfiguration = configuration["TrainingConfiguration"];
| |
| let generationConfiguration = configuration["GenerationConfiguration"];
| |
| let trainingDataFilter = configuration["TrainingDataFilter"];
| |
| let incompleteCasesFilter = configuration["IncompleteCasesFilter"];
| |
| let recreatePredictionModel = configuration["RecreatePredictionModel"];
| |
| let trainingCaseSampleSize = configuration["TrainingCaseSampleSize"];
| |
| | |
| let result = null;
| |
| let logTableName = `_MLLog:${Now}`;
| |
| let conn = GetDataTableConnection(sourceModel);
| |
| let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake", "Connection": conn}).Modify(#{"NameInDataSource": null}).Synchronize();
| |
| try {
| |
| WriteLog(`Training a prediction model based on model: ${sourceModel.Name} (id=${sourceModel.Id})`);
| |
| result = TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize);
| |
| WriteLog(`Model training completed. Training result:\r\n${result}`);
| |
| result = ParseJson(result);
| |
| | |
| if (generationConfiguration != null) {
| |
| WriteLog(`Generating a new model named ${modelName} based on trained model ${modelName})`);
| |
| let newModel = GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable);
| |
| | |
| WriteLog(`Model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`);
| |
| result.Set("Model", newModel);
| |
| }
| |
| }
| |
| finally {
| |
| targetProject.DataTableByName(logTableName)?.DeletePermanently();
| |
| }
| |
| return result;
| |
| }
| |
| | |
| function TransformModel(modelName, sourceModel, targetProject, configurations, logTable)
| |
| {
| |
| let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject);
| |
| let eventsTable = tablesDict["Events"];
| |
| let casesTable = tablesDict["Cases"];
| |
| let targetModel = tablesDict["TargetModel"];
| |
| | |
| let eventColumnMappings = sourceModel.EventsDataTable.ColumnMappings;
| |
| let caseColumnMappings = sourceModel.CasesDataTable?.ColumnMappings;
| |
| | |
| let result;
| |
| | |
| WriteLog(`Performing transformations...`);
| |
| | |
| let configuration = #{
| |
| "_typed": #{
| |
| "event_data": sourceModel.EventsDataTable.SqlDataFrame,
| |
| "project": targetProject,
| |
| "target_event_data_table": eventsTable
| |
| },
| |
| "column_mappings": eventColumnMappings,
| |
| "overwrite": true,
| |
| "transformations": configurations
| |
| };
| |
| if (casesTable != null) {
| |
| configuration["_typed"].Set("target_case_data_table", casesTable);
| |
| }
| |
| | |
| result = Generate(configuration, logTable);
| |
| | |
| WriteLog(`Transformation results:\r\n${result}`);
| |
| | |
| result = ParseJson(result);
| |
| if (result["result"] != "success")
| |
| throw result["exception"]
| |
| | |
| if (casesTable != null) {
| |
| WriteLog(`Importing case data data...`);
| |
| casesTable.Import(sourceModel.CasesDataTable.SqlDataFrame, #{"Append": true});
| |
| ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes);
| |
| }
| |
| | |
| ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes);
| |
| | |
| if (targetModel != null)
| |
| return targetModel;
| |
| | |
| CreateResultModel(targetProject, modelName, eventColumnMappings, casesTable != null ? caseColumnMappings : null);
| |
| }
| |
| | |
| function ApplyTransformations(configuration)
| |
| {
| |
| let modelName = configuration["Name"];
| |
| let sourceModel = configuration["SourceModel"];
| |
| let targetProject = configuration["TargetProject"];
| |
| let transformationConfigurations = configuration["Transformations"];
| |
| let result = null;
| |
| let logTableName = `_MLLog:${Now}`;
| |
| let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake"}).Synchronize();
| |
| | |
| try {
| |
| WriteLog(`Applying transformations on model:\r\n${sourceModel.Name} (id=${sourceModel.Id})`);
| |
| let newModel = TransformModel(modelName, sourceModel, targetProject, transformationConfigurations, logTable);
| |
| WriteLog(`Model transformations completed. New model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`);
| |
| return newModel;
| |
| }
| |
| finally {
| |
| targetProject.DataTableByName(logTableName)?.DeletePermanently();
| |
| }
| |
| }
| |
| | |
| function GenerateNewModelWithNewCaseAttributes(modelName, sourceModel, targetProject, generationConfiguration, attributesToGenerate, incompleteCasesFilter, logTable)
| |
| {
| |
| let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject);
| |
| let eventsTable = tablesDict["Events"];
| |
| let casesTable = tablesDict["Cases"];
| |
| let targetModel = tablesDict["TargetModel"];
| |
| let sanitizedModelName = tablesDict["ModelName"];
| |
| | |
| let generationParams, result, overwrite = true;
| |
| | |
| WriteLog(`Generating case attribute values for running cases...`);
| |
| let incompleteCasesSqlDataFrame = sourceModel.EventsDataTable.SqlDataFrame;
| |
| if (incompleteCasesFilter != null)
| |
| incompleteCasesSqlDataFrame = incompleteCasesSqlDataFrame.ApplyFilter(incompleteCasesFilter, sourceModel.CasesDataTable?.SqlDataFrame);
| |
| | |
| generationParams = generationConfiguration.Extend(#{
| |
| "_typed": #{
| |
| "event_data": incompleteCasesSqlDataFrame,
| |
| "case_data": sourceModel.CasesDataTable.SqlDataFrame,
| |
| "target_case_data_table": casesTable,
| |
| "project": targetProject
| |
| },
| |
| "model_name": sanitizedModelName,
| |
| "overwrite": overwrite,
| |
| "temperature": 0
| |
| });
| |
| | |
| result = Generate(generationParams, logTable);
| |
| overwrite = false;
| |
| WriteLog(`Generation results:\r\n${result}`);
| |
| | |
| result = ParseJson(result);
| |
| if (result["result"] != "success")
| |
| throw result["exception"]
| |
| | |
| if (!("Predicted".In(casesTable.ColumnTypes.Name)))
| |
| casesTable
| |
| .AddColumn("Predicted", "Boolean");
| |
| | |
| casesTable
| |
| .UpdateRows(
| |
| null,
| |
| "Predicted", true
| |
| );
| |
| | |
| WriteLog(`Appending original model data...`);
| |
| eventsTable.Import(sourceModel.EventsDataTable.SqlDataFrame, #{"Append": true}).Synchronize();
| |
| attributesToGenerate.{
| |
| let col = _;
| |
| let newCol = `Predicted_${col}`;
| |
| casesTable.AddColumn(newCol, "String");
| |
| casesTable.UpdateRows(null, newCol, Column(#expr{col}));
| |
| };
| |
| let createdColumns = casesTable.ColumnTypes.Name;
| |
| sourceModel.CasesDataTable.ColumnTypes.Name
| |
| .Where(!_.In(createdColumns)).{
| |
| let col = _;
| |
| casesTable.AddColumn(col, "String");
| |
| };
| |
| casesTable.Merge(sourceModel.CasesDataTable.SqlDataFrame, sourceModel.CasesDataTable.ColumnMappings["CaseId"]);
| |
| | |
| casesTable
| |
| .UpdateRows(
| |
| Column("Predicted") == null,
| |
| "Predicted", false
| |
| );
| |
| | |
| WriteLog(`Restoring original column types...`);
| |
| ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes);
| |
| ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes);
| |
| | |
| if (targetModel != null)
| |
| return targetModel;
| |
| | |
| CreateResultModel(targetProject, modelName, result["column_mappings"]["events"], casesTable != null ? result["column_mappings"]["cases"] : null);
| |
| }
| |
| | |
| function GenerateCaseAttributePredictionModel(configuration)
| |
| {
| |
| let modelName = configuration["Name"];
| |
| let sourceModel = configuration["SourceModel"];
| |
| let targetProject = configuration["TargetProject"];
| |
| let trainingConfiguration = configuration["TrainingConfiguration"];
| |
| let generationConfiguration = configuration["GenerationConfiguration"];
| |
| let trainingDataFilter = configuration["TrainingDataFilter"];
| |
| let incompleteCasesFilter = configuration["IncompleteCasesFilter"];
| |
| let recreatePredictionModel = configuration["RecreatePredictionModel"];
| |
| let trainingCaseSampleSize = configuration["TrainingCaseSampleSize"];
| |
| | |
| let result = null;
| |
| let logTableName = `_MLLog:${Now}`;
| |
| let conn = GetDataTableConnection(sourceModel);
| |
| let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake", "Connection": conn}).Modify(#{"NameInDataSource": null}).Synchronize();
| |
| try {
| |
| let attributesToGenerate = trainingConfiguration["output_case_attribute_groups"][0]["attributes"];
| |
| | |
| WriteLog(`Training a case attribute prediction model based on model: ${sourceModel.Name} (id=${sourceModel.Id}). Generated attributes: ${ToJson(attributesToGenerate)}`);
| |
| result = TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize);
| |
| WriteLog(`Model training completed. Training result:\r\n${result}`);
| |
| result = ParseJson(result);
| |
| | |
| if (generationConfiguration != null) {
| |
| WriteLog(`Generating a new model named ${modelName} based on trained case attribute prediction model ${modelName})`);
| |
| let newModel = GenerateNewModelWithNewCaseAttributes(modelName, sourceModel, targetProject, generationConfiguration, attributesToGenerate, incompleteCasesFilter, logTable);
| |
| | |
| WriteLog(`Model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`);
| |
| result.Set("Model", newModel);
| |
| }
| |
| }
| |
| finally {
| |
| targetProject.DataTableByName(logTableName)?.DeletePermanently();
| |
| }
| |
| return result;
| |
| }
| |
| | |
| return #{
| |
| "GeneratePredictionModel": GeneratePredictionModel,
| |
| "GenerateCaseAttributePredictionModel": GenerateCaseAttributePredictionModel,
| |
| "ApplyTransformations": ApplyTransformations,
| |
| "ModifyColumnTypes": ModifyColumnTypes
| |
| };
| |
| | |
| </pre>
| |
| | |
| 2. Create the following example expression script (e.g., with name '''Create prediction model'''):
| |
| <pre>
| |
| let lib = First(Project.Scripts.Where(Name == "ML library")).Run(#{});
| |
| let targetProject = Project;
| |
| lib.GeneratePredictionModel(#{
| |
| "Name": "My prediction model",
| |
| "SourceModel": ModelById(1),
| |
| "TargetProject": targetProject,
| |
| "RecreatePredictionModel": true,
| |
| "TrainingConfiguration": #{
| |
| "num_epochs_to_train": 500 | |
| }, | | }, |
| "GenerationConfiguration": #{ | | "GenerationConfiguration": #{ // Model generation parameters. |
| "cases_to_generate": 1000 | | "cases_to_generate": 1000 |
| }, | | }, |
Line 822: |
Line 60: |
| #{ | | #{ |
| "Type": "EventAttributeValue", | | "Type": "EventAttributeValue", |
| "Attribute": "Event type", | | "Attribute": eventTypeColumnName, |
| "StringifiedValues": [ | | "StringifiedValues": [ |
| "0Invoice Payment" | | `0${completeCaseEventTypeName}` |
| ] | | ] |
| } | | } |
Line 831: |
Line 69: |
| ] | | ] |
| }, | | }, |
| "TrainingCaseSampleSize": 1000,
| |
| "IncompleteCasesFilter": #{ | | "IncompleteCasesFilter": #{ |
| "Items": [ | | "Items": [ |
Line 839: |
Line 76: |
| #{ | | #{ |
| "Type": "EventAttributeValue", | | "Type": "EventAttributeValue", |
| "Attribute": "Event type", | | "Attribute": eventTypeColumnName, |
| "StringifiedValues": [ | | "StringifiedValues": [ |
| "0Invoice Payment" | | `0${completeCaseEventTypeName}` |
| ] | | ] |
| } | | } |
Line 848: |
Line 85: |
| ] | | ] |
| }, | | }, |
| | "RecreatePredictionModel": true, // Should a prediction model be overwritten if one already exists for this source model and target model name combination. |
| | "TrainingCaseSampleSize": 10000 // Maximum number of cases to use from the source model (random sampled). |
| }); | | }); |
| </pre> | | </pre> |
| 3. Configure prediction for the previously created script as instructed in the next chapter.
| | 2. Configure prediction for the previously created script as instructed in the next chapter. At minimum, replace the tags listed below with some suitable values: |
| | |
| | * '''<project name>''': Name of the project in which the source model is located. |
| | * '''<model name>''': Name of the model to be used as source model. This data in this source model will be used to train the prediction model so that it can generate new cases and continuations for incomplete existing cases. |
| | * '''<event type name found only in complete cases>''': This example script has been hard-coded to determine whether a case is complete or incomplete based on the existence of this event type. |
|
| |
|
| == Configure prediction == | | == Configure prediction == |
Line 869: |
Line 112: |
| ** '''temperature''': If 0, the predicted event type will always be the most probable one. If 1, the next event type is randomly selected based on probabilities of each event type. This behavior is interpolated when using values between 0 and 1. | | ** '''temperature''': If 0, the predicted event type will always be the most probable one. If 1, the next event type is randomly selected based on probabilities of each event type. This behavior is interpolated when using values between 0 and 1. |
| * '''TrainingDataFilter''': [[Filtering_in_QPR_ProcessAnalyzer_Queries|Filter]] to select specific cases that are used to train the prediction model. This filter is required to train the model only using the completed cases. Uncompleted cases should not be used for the training, so the model doesn't incorrectly learn that cases should end like that. | | * '''TrainingDataFilter''': [[Filtering_in_QPR_ProcessAnalyzer_Queries|Filter]] to select specific cases that are used to train the prediction model. This filter is required to train the model only using the completed cases. Uncompleted cases should not be used for the training, so the model doesn't incorrectly learn that cases should end like that. |
| * '''TrainingCaseSampleSize''': Maximum number of cases to take from the source model (cases are selected randomly). Use a lower setting to speed up the ML model training. The greater the value, the more subtle phenomena the prediction can learn from the data.
| |
| * '''IncompleteCasesFilter''': Optional [[Filtering_in_QPR_ProcessAnalyzer_Queries|filter]] to select which cases the prediction is made for. To improve performance of the prediction, it's recommended to include only the incomplete cases for which new events might appear, and skip the completed cases for which new events are not expected anymore. | | * '''IncompleteCasesFilter''': Optional [[Filtering_in_QPR_ProcessAnalyzer_Queries|filter]] to select which cases the prediction is made for. To improve performance of the prediction, it's recommended to include only the incomplete cases for which new events might appear, and skip the completed cases for which new events are not expected anymore. |
| | * '''TrainingCaseSampleSize''': Maximum number of cases to take from the source model (cases are selected randomly). Use a lower setting to speed up the ML model training. The greater the value, the more subtle phenomena the prediction can learn from the data. |