Create Predicted Eventlog: Difference between revisions

From QPR ProcessAnalyzer Wiki
Jump to navigation Jump to search
No edit summary
(Added latest version of example prediction script)
Line 35: Line 35:


== Create prediction script in QPR ProcessAnalyzer ==
== Create prediction script in QPR ProcessAnalyzer ==
1. Go to QPR ProcessAnalyzer and create the following expression script with name '''ML library''':
1. Create the following example expression script (e.g., with name '''Create prediction model'''):
<pre>
<pre>
/**
let sourceModel = ProjectByName("<project name>").ModelByName("<model name>");
* Name of the stored procedure in Snowflake used for ML model training and related event generation.
let completeCaseEventTypeName = "<event type name found only in complete cases>";
*/
let predictionProcedureName = "qprpa_sp_prediction";


function GetDataTableConnection(sourceModel)
let targetProject = Project;
{
let eventTypeColumnName = sourceModel.EventsDataTable.ColumnMappings["EventType"];
  // The following line is a temporary solution replacing the actual line (commented line below)
  // until defect D-14142 is fixed.
  // NOTE: If OdbcConnectionString or OdbcConnectionStringKey needs to be used, they Should
  // be given as parameters of CreateSnowflakeConnection-call.
  return CreateSnowflakeConnection();
//    return sourceModel.EventDataTable.DataSourceConnection;
}
 
function ModifyColumnTypes(dataTable, columnTypesToRestore)
{
  let currentColumnTypes = dataTable.ColumnTypes;
  let currentColumnTypesDict = #{};
  currentColumnTypes.{
    let ct = _;
    currentColumnTypesDict.Set(ct.Name, ct.DataType);
  };
 
  let columnsToModify = [];
 
  columnTypesToRestore.{
    let ct = _;
    let dataTypeToRestore = ct.DataType;
    let currentDataType = currentColumnTypesDict.TryGetValue(ct.Name);
    if (!IsNullTop(currentDataType) && currentDataType != dataTypeToRestore) {
      columnsToModify = Concat(columnsToModify, [ct]);
    }
  };
 
  dataTable
    .AddColumns(columnsToModify.{ let ct = _; #{"Name": `__Restore_${ct.Name}`, "DataType": ct.DataType}});
 
  columnsToModify.{
    let ct = _;
    dataTable
      .UpdateRows(null, `__Restore_${ct.Name}`, Cast(Column(#expr{ct.Name}), #expr{ct.DataType}));
  };
 
  dataTable
    .RemoveColumns(columnsToModify.Name)
    .RenameColumns(columnsToModify.{ let ct = _; (ct.Name : `__Restore_${ct.Name}`)});
 
  dataTable.Synchronize();
}
 
/**
* @name RunFunctionWithParallelLogging
* @descripion
* Runs given function that generates logging information into given data table in a way that all the logging will be included into the
* generated script run log as well.
* @param logTable
* A DataTable having two columns:
* - Datetime-column for the log message's timestamp.
* - Log message column.
* @param callbackFunc
* Function that uses given data table for logging it's current status.
* @returns
* The result of the callback function.
*/
function RunFunctionWithParallelLogging(logTable, callbackFunc)
{
  let state = #{};
 
  logTable.Truncate();
 
  _system.Parallel.Run([
    () => {
      try {
        state.Set("Result", callbackFunc());
      }
      finally {
        state.Set("Finished", true);
      }
    },
    () => {
      let readRows = 0;
      function UpdateLog()
      {
        try {
          let rowsDf = logTable.Synchronize().SqlDataFrame.Skip(readRows).Collect();
          readRows = readRows + rowsDf.NRows;
          rowsDf.Rows.{
            let row = _;
            WriteLog(`${row[0]} \t${row[1]}`);
          }
        }
        catch (ex) {
          if (ex.InnerExceptions?[0]?.Type == "UnauthorizedAccess") {
            WriteLog(`Log data table has disappeared. The generated log will not be complete!`);
            break;
          }
          WriteLog(`Log has not yet been created.`);
        }
      }
 
      while (true) {
        UpdateLog();
        let finished = state.TryGetValue("Finished");
        if (finished == true) {
          UpdateLog();
          WriteLog(`Work finished. Log written to table ${logTable.Name} (id: ${logTable.Id})`);
          break;
        }
        Sleep(5000);
      }
    }
  ]);
  return state.TryGetValue("Result");
}
 
/**
* @name Train
* @descripion
* Train a ML model using given parameters and log table.
* @param params
* String dictionary containing the training configuration.
* @param logTable
* DataTable to be used for logging.
* @returns
* Result returned by the ML model training.
*/
function Train(params, logTable)
{
  function TrainCallback()
  {
    WriteLog("Starting training...");
 
    let connection = logTable.DataSourceConnection;
    let finalParams = params.Clone();
    finalParams.Set("_typed", params["_typed"].Extend(#{
      "log_table": logTable
    }));
    let result = connection.CallStoredProcedure(predictionProcedureName, #{"configuration": finalParams});
 
    WriteLog("Finished training...")
    return result;
  }
  return RunFunctionWithParallelLogging(logTable, TrainCallback);
}
 
/**
* @name Generate
* @descripion
* Generate new events using a trained ML model and log table.
* @param params
* String dictionary containing the training configuration.
* @param logTable
* DataTable to be used for logging.
* @returns
* Result returned by the generation.
*/
function Generate(params, logTable)
{
  function GenerateCallback()
  {
    WriteLog("Starting generation...")
 
    let connection = logTable.DataSourceConnection;
 
    let typedParams = params["_typed"];
    let resultEventsTable = typedParams.TryGetValue("target_event_data_table");
    let resultCasesTable = typedParams.TryGetValue("target_case_data_table");
   
    if (params["overwrite"]) {
      resultEventsTable.Truncate();
      resultCasesTable?.Truncate();
    }
 
    let finalParams = params.Clone();
    finalParams.Set("_typed", typedParams.Extend(#{
      "log_table": logTable
    }));
 
    let result = connection.CallStoredProcedure(predictionProcedureName, #{"configuration": finalParams});
 
    resultEventsTable.Synchronize();
    resultCasesTable?.Synchronize();
   
    WriteLog("Finished generation...")
    return result;
  }
 
  return RunFunctionWithParallelLogging(logTable, GenerateCallback);
}


/**
_system.ML.GeneratePredictionModel(#{
* @name GetSampledEvents
   "Name": "My prediction model",      // Name of the PA model to generate ti the target project.
* @descripion
   "SourceModel": sourceModel,        // Snowflake-based PA model used for training the prediction model.
* Returns a SqlDataFrame containing sampled events of given model where given filter is first applied.  
   "TargetProject": targetProject,    // Target project to create the model into.
* @param sourceModel
   "TrainingConfiguration": #{         // Training parameters.
* ProcessAnalyzer model object of the model whose event data is to be filtered and sampled.
     "num_epochs_to_train": 200
* @param sampledCaseCount
* The maximum number of cases to return (or null if all cases should be returned).
* @param filter
* JSON filter to be applied on the event data of the source model prior to performing the sampling.
* @returns
* SqlDataFrame containing sampled events of given model where given filter is first applied.
*/
function GetSampledEvents(sourceModel, sampledCaseCount, filter)
{
  if (IsNull(sampledCaseCount) || sampledCaseCount < 1)
    return sourceModel.CacheTableSqlDataFrame(filter);
  let sampleFilterRule = sampledCaseCount == null
    ? null
    : #{
        "Type":"IncludeCases",
        "Items":[#{
          "Type": "SqlExpressionValue",
          "Configuration": #{
            "Root": `Cases.WithColumn("_Random", Rand()).OrderByColumns(["_Random"], [true]).Head(${sampledCaseCount})`
          }
        }]
      };
   let filterRules = filter?["Items"];
  if (!IsNullTop(filterRules)) {
    if (!IsNullTop(sampleFilterRule))
      filterRules = Concat(filterRules, sampleFilterRule);
  }
  else {
    if (!IsNullTop(sampleFilterRule))
      filterRules = [sampleFilterRule];
    else
      filterRules = [];
  }
 
  let finalFilter = #{
    "Items":filterRules
  };
  sourceModel.CacheTableSqlDataFrame(finalFilter);
}
 
/**
* @name TrainMLModelForModel
* @descripion
* Generates a new ML model based on the data found in given source model.
* @param modelName
* Name of the ProcessAnalyzer model to create. The ML model will be named using the template: "decision_intelligence/<targetProject.Id>-<sourceModel.Id>-<modelName>.pamlmodel".
* @param sourceModel
* ProcessAnalyzer model object of the model which is to be trained to the ML model.
* @param targetProject
* Project ín whose context the ML model is to be trained.
* @param trainingConfiguration
* String dictionary containing the training configuration.
* @param recreatePredictionModel
* Should a prediction model be overwritten if one already exists for this source model and target model name combination?
* @param trainingDataFilter
* Filter JSON applied on top of all the source model events to find all the events to be used for training.
* @param logTable
* DataTable to be used for logging.
* @param trainingCaseSampleSize
* Maximum number of cases to use from the source model (random sampled).
* @returns
* Result returned by the ML model training.
*/
function TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize)
{
  let sanitizedModelName = `${sourceModel.Id}-${modelName}`;
  let eventDataCaseIdColumn = sourceModel.EventsDataTable.ColumnMappings["CaseId"];
  let eventDataTimeStampColumn = sourceModel.EventsDataTable.ColumnMappings["TimeStamp"];
  let originalCaseCount = sourceModel.EventsDataTable.SqlDataFrame.SelectDistinct([eventDataCaseIdColumn]).NRows;
  let trainEventDataSdf = GetSampledEvents(sourceModel, trainingCaseSampleSize, trainingDataFilter);
  let actualTrainingCaseSampleSize = trainEventDataSdf.SelectDistinct([eventDataCaseIdColumn]).NRows;
  let startTimesSdf = sourceModel.CacheTableSqlDataFrame(trainingDataFilter).GroupBy([eventDataCaseIdColumn]).Aggregate([eventDataTimeStampColumn], ["Min"]).Select([eventDataTimeStampColumn]);
  WriteLog(`Starting to train a prediction model using ${ToString(100*actualTrainingCaseSampleSize/originalCaseCount, "F")}% of the original cases (${actualTrainingCaseSampleSize}/${originalCaseCount}) found in source model ${sourceModel.Name}.`)
 
  let trainCaseDataSdf = sourceModel.CasesDataTable?.SqlDataFrame;
 
  let columnMappings = trainEventDataSdf.ColumnMappings;
  if (trainCaseDataSdf != null) {
    columnMappings = columnMappings.Extend(#{"Case_CaseId": trainCaseDataSdf.ColumnMappings["CaseId"]});
  }
 
  trainingConfiguration = trainingConfiguration.Extend(#{
    "_typed": #{
      "event_data": trainEventDataSdf,
      "start_times": startTimesSdf,       
      "project": targetProject
    },
    "model_name": sanitizedModelName,
    "column_mappings": columnMappings,
    "overwrite_model": recreatePredictionModel,
    "case_sample_percentage": originalCaseCount / actualTrainingCaseSampleSize
  });
 
  if (trainCaseDataSdf != null) {
    trainingConfiguration["_typed"].Set("case_data", trainCaseDataSdf);
  }
 
  Train(trainingConfiguration, logTable);
}
 
function CreateTargetTables(modelName, sourceModel, targetProject)
{
  function GetTable(conn, tableName)
  {
    let tableConfiguration = #{
      "Name": tableName,
      "Type": "Snowflake",
      "Connection": conn
    };
    let resultTable = targetProject.DataTableByName(tableName);
    if (resultTable == null)
    {
      resultTable = targetProject.CreateDataTable(tableConfiguration)
        .Modify(#{"NameInDataSource": null})
        .Synchronize();
    }
    return resultTable;
  }
 
  let sanitizedModelName = `${sourceModel.Id}-${modelName}`;
  let eventsTableName = `${modelName} - events`;
  let casesTableName = `${modelName} - cases`;
  let targetModel = targetProject.ModelByName(modelName);
  let eventsTable, casesTable = null;
 
  if (targetModel != null)
  {
    eventsTable = targetModel.EventsDataTable;
    casesTable = targetModel.CasesDataTable;
  }
  else {
    let conn = GetDataTableConnection(sourceModel);
    eventsTable = GetTable(conn, eventsTableName);
    if (sourceModel.CasesDataTable != null) {
      casesTable = GetTable(conn, casesTableName);
    }
  }
 
  eventsTable.Truncate();
  casesTable?.Truncate();
 
  return #{
    "TargetModel": targetModel,
    "Events": eventsTable,
    "Cases": casesTable,
    "ModelName": sanitizedModelName
  };
}
 
function CreateResultModel(targetProject, modelName, eventColumnMappings, caseColumnMappings)
{
  WriteLog(`Creating model...`);
 
  let eventsTableName = `${modelName} - events`;
  let casesTableName = `${modelName} - cases`;
 
  let timestampMapping = eventColumnMappings["TimeStamp"];
  eventColumnMappings.Remove("TimeStamp");
  eventColumnMappings.Set("Timestamp", timestampMapping);
 
  let modelConfiguration = #{
    "DataSource": #{
      "Events":#{
        "DataSourceType": "datatable",
        "DataTableName": eventsTableName,
        "Columns": eventColumnMappings
      }
    }
  };
 
  if (caseColumnMappings != null) {
    modelConfiguration["DataSource"].Set("Cases", #{
      "DataSourceType": "datatable",
      "DataTableName": casesTableName,
      "Columns": caseColumnMappings
    });
  }
 
  targetProject.CreateModel(#{"Name": modelName, "Configuration": modelConfiguration});
}
 
/**
* @name GenerateNewModel
* @descripion
* Generates a new ProcessAnalyzer model using already trained ML model. Generation consists of the following phases:
* - Generate new cases with new events.
* - Predict new events for incomplete cases.
* - Append original source model data.
* - Create ProcessAnalyzer model if it does not already exist.
* @param modelName
* Name of the ProcessAnalyzer model to create. The used ML model must have name: "decision_intelligence/<targetProject.Id>-<sourceModel.Id>-<modelName>.pamlmodel".
* @param sourceModel
* ProcessAnalyzer model object of the model based on which the new model will be created.
* @param targetProject
* Project ínto which the new ProcessAnalyzer model is to be created.
* @param generationConfiguration
* String dictionary containing the generation configuration.
* @param incompleteCasesFilter
* Filter JSON applied on top of all the source model events to find all the events belonging to incomplete cases.
* @param logTable
* DataTable to be used for logging.
* @returns
* ProcessAnalyzer model object of the containing the results of the generation.
*/
function GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable)
{
  let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject);
  let eventsTable = tablesDict["Events"];
  let casesTable = tablesDict["Cases"];
  let targetModel = tablesDict["TargetModel"];
  let sanitizedModelName = tablesDict["ModelName"];
 
  let generationParams, result, overwrite = true;
 
  if (generationConfiguration.TryGetValue("cases_to_generate") != 0) {
    WriteLog(`Generating new cases with new events...`);
 
    generationParams = generationConfiguration.Extend(#{
      "_typed": #{
        "target_event_data_table": eventsTable,
        "project": targetProject
      },
      "model_name": sanitizedModelName,
      "overwrite": overwrite
    });
    if (casesTable != null) {
      generationParams["_typed"].Set("target_case_data_table", casesTable);
    }
 
    result = Generate(generationParams, logTable);
 
    WriteLog(`Generation results:\r\n${result}`);
 
    result = ParseJson(result);
    if (result["result"] != "success")
      throw result["exception"]
 
    overwrite = false;
   }
 
  if (incompleteCasesFilter != null) {
    WriteLog(`Generating new events for running cases...`);
    let incompleteCasesSqlDataFrame = sourceModel.EventsDataTable.SqlDataFrame.ApplyFilter(incompleteCasesFilter, sourceModel.CasesDataTable?.SqlDataFrame);
 
    generationParams = generationConfiguration.Extend(#{
      "_typed": #{
        "event_data": incompleteCasesSqlDataFrame, 
        "target_event_data_table": eventsTable,
        "project": targetProject
      },
      "model_name": sanitizedModelName,
      "overwrite": overwrite,
      "temperature": 0
    });
    if (casesTable != null) {
      generationParams["_typed"].Set("case_data", sourceModel.CasesDataTable.SqlDataFrame);
    }
 
    result = Generate(generationParams, logTable);
    overwrite = false;
    WriteLog(`Generation results:\r\n${result}`);
 
    result = ParseJson(result);
    if (result["result"] != "success")
      throw result["exception"]
  }
 
  if (!("Predicted".In(eventsTable.ColumnTypes.Name)))
    eventsTable
      .AddColumn("Predicted", "Boolean");
 
  eventsTable
    .DeleteRows(
      Column(result["column_mappings"]["events"]["EventType"]) == "__FINISH__"
    )
    .UpdateRows(
      1==1,
      "Predicted", true
    );
 
  if (casesTable != null) {
    if (!("Generated".In(casesTable.ColumnTypes.Name)))
      casesTable
         .AddColumn("Generated", "Boolean");
 
    casesTable
      .UpdateRows(
        1==1,
        "Generated", true
      );
  }
 
  WriteLog(`Appending original model data...`);
  eventsTable.Import(sourceModel.EventsDataTable.SqlDataFrame, #{"Append": true});
  if (casesTable != null)
    casesTable.Import(sourceModel.CasesDataTable.SqlDataFrame, #{"Append": true});
 
  eventsTable
    .UpdateRows(
      Column("Predicted") == null,
      "Predicted", false
    );
 
  casesTable
    ?.UpdateRows(
      Column("Generated") == null,
      "Generated", false
    );
 
  ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes);
  if (casesTable != null)
    ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes);
 
  if (targetModel != null)
    return targetModel;
 
  CreateResultModel(targetProject, modelName, result["column_mappings"]["events"], casesTable != null ? result["column_mappings"]["cases"] : null);
}
 
/**
* @name GeneratePredictionModel
* @descripion
* Generates a ProcessAnalyzer model by first training a machine learning model and then using that to generate new cases and events.
* @param configuration
* A string dictionary configuring the model prediction process. Supported keys are:
*  "Name": Name of the PA model to generate to the target project.
*  "SourceModel": Snowflake-based PA model used for training the prediction model.
*   "TargetProject": Target project to create the model into.
*  "TrainingConfiguration": Training parameters as string dictionary.
*   "GenerationConfiguration": Event generation parameters as string dictionary.
*  "TrainingDataFilter": Filter JSON for events to be used for training.
*  "IncompleteCasesFilter": Filter JSON for events belonging to unfinished cases for which new events are to be predicted.
*  "RecreatePredictionModel": Should a prediction model be overwritten if one already exists for this source model and target model name combination?
*  "TrainingCaseSampleSize": Maximum number of cases to use from the source model (random sampled).
*/
function GeneratePredictionModel(configuration)
{
  let modelName = configuration["Name"];
  let sourceModel = configuration["SourceModel"];
  let targetProject = configuration["TargetProject"];
  let trainingConfiguration = configuration["TrainingConfiguration"];
  let generationConfiguration = configuration["GenerationConfiguration"];
  let trainingDataFilter = configuration["TrainingDataFilter"];
  let incompleteCasesFilter = configuration["IncompleteCasesFilter"];
  let recreatePredictionModel = configuration["RecreatePredictionModel"];
  let trainingCaseSampleSize = configuration["TrainingCaseSampleSize"];
 
  let result = null;
  let logTableName = `_MLLog:${Now}`;
  let conn = GetDataTableConnection(sourceModel);
  let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake", "Connection": conn}).Modify(#{"NameInDataSource": null}).Synchronize();
  try {
    WriteLog(`Training a prediction model based on model: ${sourceModel.Name} (id=${sourceModel.Id})`);
    result = TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize);
    WriteLog(`Model training completed. Training result:\r\n${result}`);
    result = ParseJson(result);
 
    if (generationConfiguration != null) {
      WriteLog(`Generating a new model named ${modelName} based on trained model ${modelName})`);
      let newModel = GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable);
 
      WriteLog(`Model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`);
      result.Set("Model", newModel);
    }
  }
  finally {
    targetProject.DataTableByName(logTableName)?.DeletePermanently();
  }
  return result;
}
 
function TransformModel(modelName, sourceModel, targetProject, configurations, logTable)
{
  let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject);
  let eventsTable = tablesDict["Events"];
  let casesTable = tablesDict["Cases"];
  let targetModel = tablesDict["TargetModel"];
 
  let eventColumnMappings = sourceModel.EventsDataTable.ColumnMappings;
  let caseColumnMappings = sourceModel.CasesDataTable?.ColumnMappings;
 
  let result;
 
  WriteLog(`Performing transformations...`);
 
  let configuration = #{
    "_typed": #{
      "event_data": sourceModel.EventsDataTable.SqlDataFrame, 
      "project": targetProject,
      "target_event_data_table": eventsTable
    },
    "column_mappings": eventColumnMappings,
    "overwrite": true,
    "transformations": configurations
  };
  if (casesTable != null) {
    configuration["_typed"].Set("target_case_data_table", casesTable);
  }
 
  result = Generate(configuration, logTable);
 
  WriteLog(`Transformation results:\r\n${result}`);
 
  result = ParseJson(result);
  if (result["result"] != "success")
    throw result["exception"]
 
  if (casesTable != null) {
    WriteLog(`Importing case data data...`);
    casesTable.Import(sourceModel.CasesDataTable.SqlDataFrame, #{"Append": true});
    ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes);
  }
 
  ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes);
 
  if (targetModel != null)
    return targetModel;
 
  CreateResultModel(targetProject, modelName, eventColumnMappings, casesTable != null ? caseColumnMappings : null);
}
 
function ApplyTransformations(configuration)
{
  let modelName = configuration["Name"];
  let sourceModel = configuration["SourceModel"];
  let targetProject = configuration["TargetProject"];
  let transformationConfigurations = configuration["Transformations"];
  let result = null;
  let logTableName = `_MLLog:${Now}`;
  let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake"}).Synchronize();
 
  try {
    WriteLog(`Applying transformations on model:\r\n${sourceModel.Name} (id=${sourceModel.Id})`);
    let newModel = TransformModel(modelName, sourceModel, targetProject, transformationConfigurations, logTable);
    WriteLog(`Model transformations completed. New model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`);
    return newModel;
  }
  finally {
    targetProject.DataTableByName(logTableName)?.DeletePermanently();
  }
}
 
function GenerateNewModelWithNewCaseAttributes(modelName, sourceModel, targetProject, generationConfiguration, attributesToGenerate, incompleteCasesFilter, logTable)
{
  let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject);
  let eventsTable = tablesDict["Events"];
  let casesTable = tablesDict["Cases"];
  let targetModel = tablesDict["TargetModel"];
  let sanitizedModelName = tablesDict["ModelName"];
 
  let generationParams, result, overwrite = true;
 
  WriteLog(`Generating case attribute values for running cases...`);
  let incompleteCasesSqlDataFrame = sourceModel.EventsDataTable.SqlDataFrame;
  if (incompleteCasesFilter != null)
    incompleteCasesSqlDataFrame = incompleteCasesSqlDataFrame.ApplyFilter(incompleteCasesFilter, sourceModel.CasesDataTable?.SqlDataFrame);
 
  generationParams = generationConfiguration.Extend(#{
    "_typed": #{
      "event_data": incompleteCasesSqlDataFrame,
      "case_data": sourceModel.CasesDataTable.SqlDataFrame,
      "target_case_data_table": casesTable,
      "project": targetProject
    },
    "model_name": sanitizedModelName,
    "overwrite": overwrite,
    "temperature": 0
  });
 
  result = Generate(generationParams, logTable);
  overwrite = false;
  WriteLog(`Generation results:\r\n${result}`);
 
  result = ParseJson(result);
  if (result["result"] != "success")
    throw result["exception"]
 
  if (!("Predicted".In(casesTable.ColumnTypes.Name)))
    casesTable
      .AddColumn("Predicted", "Boolean");
 
  casesTable
    .UpdateRows(
      null,
      "Predicted", true
    );
 
  WriteLog(`Appending original model data...`);
  eventsTable.Import(sourceModel.EventsDataTable.SqlDataFrame, #{"Append": true}).Synchronize();
  attributesToGenerate.{
    let col = _;
    let newCol = `Predicted_${col}`;
    casesTable.AddColumn(newCol, "String");
    casesTable.UpdateRows(null, newCol, Column(#expr{col}));
  };
  let createdColumns = casesTable.ColumnTypes.Name;
  sourceModel.CasesDataTable.ColumnTypes.Name
    .Where(!_.In(createdColumns)).{
      let col = _;
      casesTable.AddColumn(col, "String");
    };
  casesTable.Merge(sourceModel.CasesDataTable.SqlDataFrame, sourceModel.CasesDataTable.ColumnMappings["CaseId"]);
 
  casesTable
    .UpdateRows(
      Column("Predicted") == null,
      "Predicted", false
    );
 
  WriteLog(`Restoring original column types...`);
  ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes);
  ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes);
 
  if (targetModel != null)
    return targetModel;
 
  CreateResultModel(targetProject, modelName, result["column_mappings"]["events"], casesTable != null ? result["column_mappings"]["cases"] : null);
}
 
function GenerateCaseAttributePredictionModel(configuration)
{
  let modelName = configuration["Name"];
  let sourceModel = configuration["SourceModel"];
  let targetProject = configuration["TargetProject"];
  let trainingConfiguration = configuration["TrainingConfiguration"];
  let generationConfiguration = configuration["GenerationConfiguration"];
  let trainingDataFilter = configuration["TrainingDataFilter"];
  let incompleteCasesFilter = configuration["IncompleteCasesFilter"];
  let recreatePredictionModel = configuration["RecreatePredictionModel"];
  let trainingCaseSampleSize = configuration["TrainingCaseSampleSize"];
 
  let result = null;
  let logTableName = `_MLLog:${Now}`;
  let conn = GetDataTableConnection(sourceModel);
  let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake", "Connection": conn}).Modify(#{"NameInDataSource": null}).Synchronize();
  try {
    let attributesToGenerate = trainingConfiguration["output_case_attribute_groups"][0]["attributes"];
 
    WriteLog(`Training a case attribute prediction model based on model: ${sourceModel.Name} (id=${sourceModel.Id}). Generated attributes: ${ToJson(attributesToGenerate)}`);
    result = TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize);
    WriteLog(`Model training completed. Training result:\r\n${result}`);
    result = ParseJson(result);
 
    if (generationConfiguration != null) {
      WriteLog(`Generating a new model named ${modelName} based on trained case attribute prediction model ${modelName})`);
      let newModel = GenerateNewModelWithNewCaseAttributes(modelName, sourceModel, targetProject, generationConfiguration, attributesToGenerate, incompleteCasesFilter, logTable);
 
      WriteLog(`Model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`);
      result.Set("Model", newModel);
    }
  }
  finally {
    targetProject.DataTableByName(logTableName)?.DeletePermanently();
  }
  return result;
}
 
return #{
  "GeneratePredictionModel": GeneratePredictionModel,
  "GenerateCaseAttributePredictionModel": GenerateCaseAttributePredictionModel,
  "ApplyTransformations": ApplyTransformations,
  "ModifyColumnTypes": ModifyColumnTypes
};
 
</pre>
 
2. Create the following example expression script (e.g., with name '''Create prediction model'''):
<pre>
let lib = First(Project.Scripts.Where(Name == "ML library")).Run(#{});
let targetProject = Project;
lib.GeneratePredictionModel(#{
  "Name": "My prediction model",
  "SourceModel": ModelById(1),
  "TargetProject": targetProject,
  "RecreatePredictionModel": true,
  "TrainingConfiguration": #{
     "num_epochs_to_train": 500
   },
   },
   "GenerationConfiguration": #{
   "GenerationConfiguration": #{       // Model generation parameters.
     "cases_to_generate": 1000
     "cases_to_generate": 1000
   },
   },
Line 822: Line 60:
           #{
           #{
             "Type": "EventAttributeValue",
             "Type": "EventAttributeValue",
             "Attribute": "Event type",
             "Attribute": eventTypeColumnName,
             "StringifiedValues": [
             "StringifiedValues": [
               "0Invoice Payment"
               `0${completeCaseEventTypeName}`
             ]
             ]
           }
           }
Line 831: Line 69:
     ]
     ]
   },
   },
  "TrainingCaseSampleSize": 1000,
   "IncompleteCasesFilter": #{
   "IncompleteCasesFilter": #{
     "Items": [
     "Items": [
Line 839: Line 76:
           #{
           #{
             "Type": "EventAttributeValue",
             "Type": "EventAttributeValue",
             "Attribute": "Event type",
             "Attribute": eventTypeColumnName,
             "StringifiedValues": [
             "StringifiedValues": [
               "0Invoice Payment"
               `0${completeCaseEventTypeName}`
             ]
             ]
           }
           }
Line 848: Line 85:
     ]
     ]
   },
   },
  "RecreatePredictionModel": true,    // Should a prediction model be overwritten if one already exists for this source model and target model name combination.
  "TrainingCaseSampleSize": 10000    // Maximum number of cases to use from the source model (random sampled).
});
});
</pre>
</pre>
3. Configure prediction for the previously created script as instructed in the next chapter.
2. Configure prediction for the previously created script as instructed in the next chapter. At minimum, replace the tags listed below with some suitable values:
 
* '''<project name>''': Name of the project in which the source model is located.
* '''<model name>''': Name of the model to be used as source model. This data in this source model will be used to train the prediction model so that it can generate new cases and continuations for incomplete existing cases.
* '''<event type name found only in complete cases>''': This example script has been hard-coded to determine whether a case is complete or incomplete based on the existence of this event type.


== Configure prediction ==
== Configure prediction ==
Line 869: Line 112:
** '''temperature''': If 0, the predicted event type will always be the most probable one. If 1, the next event type is randomly selected based on probabilities of each event type. This behavior is interpolated when using values between 0 and 1.
** '''temperature''': If 0, the predicted event type will always be the most probable one. If 1, the next event type is randomly selected based on probabilities of each event type. This behavior is interpolated when using values between 0 and 1.
* '''TrainingDataFilter''': [[Filtering_in_QPR_ProcessAnalyzer_Queries|Filter]] to select specific cases that are used to train the prediction model. This filter is required to train the model only using the completed cases. Uncompleted cases should not be used for the training, so the model doesn't incorrectly learn that cases should end like that.
* '''TrainingDataFilter''': [[Filtering_in_QPR_ProcessAnalyzer_Queries|Filter]] to select specific cases that are used to train the prediction model. This filter is required to train the model only using the completed cases. Uncompleted cases should not be used for the training, so the model doesn't incorrectly learn that cases should end like that.
* '''TrainingCaseSampleSize''': Maximum number of cases to take from the source model (cases are selected randomly). Use a lower setting to speed up the ML model training. The greater the value, the more subtle phenomena the prediction can learn from the data. 
* '''IncompleteCasesFilter''': Optional [[Filtering_in_QPR_ProcessAnalyzer_Queries|filter]] to select which cases the prediction is made for. To improve performance of the prediction, it's recommended to include only the incomplete cases for which new events might appear, and skip the completed cases for which new events are not expected anymore.
* '''IncompleteCasesFilter''': Optional [[Filtering_in_QPR_ProcessAnalyzer_Queries|filter]] to select which cases the prediction is made for. To improve performance of the prediction, it's recommended to include only the incomplete cases for which new events might appear, and skip the completed cases for which new events are not expected anymore.
* '''TrainingCaseSampleSize''': Maximum number of cases to take from the source model (cases are selected randomly). Use a lower setting to speed up the ML model training. The greater the value, the more subtle phenomena the prediction can learn from the data.

Revision as of 10:22, 15 November 2024

This article has instructions how to install, configure and use eventlog predictions. The prediction creates a new model that contains the source model data and the predictions. It's able to predict case attributes for the generated new cases and event attributes for the predicted events. To distinguish the real (source data) and predicted events and cases, there are following attributes in the model:

  • Event attribute Predicted denotes whether the event is from the source data (false) or whether it's predicted (true).
  • Case attribute Generated denotes whether the case is in the source data (false) or whether the prediction generated it as a new case (true).


Prerequisites for prediction

Following prerequisites need to be fulfilled to run the eventlog prediction:

  • QPR ProcessAnalyzer 2024.2 or later in use
  • Snowflake connection is configured
  • Source models are stored to Snowflake

Install prediction to Snowflake

To install the eventlog prediction to Snowflake:

  1. Go to Snowflake, and create a Snowflake-managed stage with name PREDICTION to the same schema configured to QPR ProcessAnalyzer (in the Snowflake connection string). Use settings in the following image: Create Snowflake stage.png
  2. Open the created stage and upload the predict.pyz file into the stage (ask the file from your QPR representative).
  3. Create the following procedure to the same schema:
CREATE OR REPLACE PROCEDURE QPRPA_SP_PREDICTION("CONFIGURATION" OBJECT)
RETURNS OBJECT
LANGUAGE PYTHON
STRICT
RUNTIME_VERSION = '3.8'
PACKAGES = ('nltk','numpy','pandas==1.5.3','scikit-learn','snowflake-snowpark-python','tensorflow','dill','prophet','holidays==0.18','python-kubernetes','docker-py')
HANDLER = 'main'
EXECUTE AS OWNER
AS '
import sys
def main(session, parameters_in: dict) -> dict:
	session.file.get(''@decision_intelligence/predict.pyz'', ''/tmp'')
	sys.path.append(''/tmp/predict.pyz'')
	import predict
	return predict.main(session, parameters_in)
';

Create prediction script in QPR ProcessAnalyzer

1. Create the following example expression script (e.g., with name Create prediction model):

let sourceModel = ProjectByName("<project name>").ModelByName("<model name>");
let completeCaseEventTypeName = "<event type name found only in complete cases>";

let targetProject = Project;
let eventTypeColumnName = sourceModel.EventsDataTable.ColumnMappings["EventType"];

_system.ML.GeneratePredictionModel(#{
  "Name": "My prediction model",      // Name of the PA model to generate ti the target project.
  "SourceModel": sourceModel,         // Snowflake-based PA model used for training the prediction model.
  "TargetProject": targetProject,     // Target project to create the model into.
  "TrainingConfiguration": #{         // Training parameters.
    "num_epochs_to_train": 200
  },
  "GenerationConfiguration": #{       // Model generation parameters.
    "cases_to_generate": 1000
  },
  "TrainingDataFilter": #{
    "Items": [
      #{
        "Type": "IncludeCases",
        "Items": [
          #{
            "Type": "EventAttributeValue",
            "Attribute": eventTypeColumnName,
            "StringifiedValues": [
              `0${completeCaseEventTypeName}`
            ]
          }
        ]
      }
    ]
  },
  "IncompleteCasesFilter": #{
    "Items": [
      #{
        "Type": "ExcludeCases",
        "Items": [
          #{
            "Type": "EventAttributeValue",
            "Attribute": eventTypeColumnName,
            "StringifiedValues": [
              `0${completeCaseEventTypeName}`
            ]
          }
        ]
      }
    ]
  },
  "RecreatePredictionModel": true,    // Should a prediction model be overwritten if one already exists for this source model and target model name combination.
  "TrainingCaseSampleSize": 10000     // Maximum number of cases to use from the source model (random sampled).
});

2. Configure prediction for the previously created script as instructed in the next chapter. At minimum, replace the tags listed below with some suitable values:

  • <project name>: Name of the project in which the source model is located.
  • <model name>: Name of the model to be used as source model. This data in this source model will be used to train the prediction model so that it can generate new cases and continuations for incomplete existing cases.
  • <event type name found only in complete cases>: This example script has been hard-coded to determine whether a case is complete or incomplete based on the existence of this event type.

Configure prediction

Prediction script has the following settings in the GeneratePredictionModel call:

  • Name: Name of the QPR ProcessAnalyzer model that is created to the target project. The model will contain the source model content and the predictions.
  • SourceModel: Source model for which the prediction is made. Model can be selected for example based on id with ModelById function or by name with ModelByName function.
  • TargetProject: Target project to create the new model into.
  • RecreatePredictionModel: When true, a new ML model is trained when the script is run. When false, the prediction is run using possibly pre-existing ML model.
  • TrainingConfiguration: Training parameters.
    • num_epochs_to_train: How many times the whole training data is used in training. The more there are epochs, the better the model usually is, but the training will take more time. The best performing model out of all the iterations will be selected.
    • max_num_case_clusters: Maximum number of clusters to divide the case attribute values into. Default is 20.
  • GenerationConfiguration: Event generation parameters. When null, no generation is done. For example, following parameters are supported:
    • cases_to_generate: Maximum number cases to create. The number of created cases is further limited by the capabilities of the trained model and the case_generation_start_time and case_generation_end_time parameters.
    • case_generation_start_time: If defined, new cases will be generated starting from this timestamp. If not defined, the latest start event timestamp used in the training data. This parameter is given as ISO datetime format.
    • case_generation_end_time: If defined, the new cases generation will stop when reaching this timestamp, and no cases will be generated after it. This parameter is given as ISO datetime format.
    • generate_debug_event_attributes: If true, additional columns will be added containing, e.g., probabilities of the selected activity and other activities.
    • min_prediction_probability : Minimum probability of any activity name in next activity prediction. If the probability of an activity is lower than this, it will never be picked. Default value is 0.01.
    • temperature: If 0, the predicted event type will always be the most probable one. If 1, the next event type is randomly selected based on probabilities of each event type. This behavior is interpolated when using values between 0 and 1.
  • TrainingDataFilter: Filter to select specific cases that are used to train the prediction model. This filter is required to train the model only using the completed cases. Uncompleted cases should not be used for the training, so the model doesn't incorrectly learn that cases should end like that.
  • IncompleteCasesFilter: Optional filter to select which cases the prediction is made for. To improve performance of the prediction, it's recommended to include only the incomplete cases for which new events might appear, and skip the completed cases for which new events are not expected anymore.
  • TrainingCaseSampleSize: Maximum number of cases to take from the source model (cases are selected randomly). Use a lower setting to speed up the ML model training. The greater the value, the more subtle phenomena the prediction can learn from the data.