Create Predicted Eventlog
Jump to navigation
Jump to search
Prerequisites
The following prerequisites need to be fulfilled to run the eventlog prediction:
- QPR ProcessAnalyzer 2024.2 or later
- Snowflake is configured
- Source model is stored to Snowflake
Prediction installation
Follow these steps to install the eventlog prediction:
- Create a Snowflake-managed stage with name PREDICTION to the same schema that is configured for QPR ProcessAnalyzer (in the Snowflake connection string).
- Open the created stage and upload the predict.pyz file to the stage (ask the file from your QPR representative).
- Create the following procedure to the same schema.
-- Wrapper procedure that fetches predict.pyz from the PREDICTION stage at call time
-- and delegates to its main() entry point, so the Python package can be updated by
-- re-uploading the file without recreating the procedure.
-- EXECUTE AS OWNER: callers run with the procedure owner's privileges.
CREATE OR REPLACE PROCEDURE QPRPA_SP_PREDICTION("CONFIGURATION" OBJECT)
RETURNS OBJECT
LANGUAGE PYTHON
STRICT
RUNTIME_VERSION = '3.8'
PACKAGES = ('nltk','numpy','pandas==1.5.3','scikit-learn','snowflake-snowpark-python','tensorflow','dill','prophet','holidays==0.18','python-kubernetes','docker-py')
HANDLER = 'main'
EXECUTE AS OWNER
AS '
import sys
def main(session, parameters_in: dict) -> dict:
    session.file.get(''@prediction/predict.pyz'', ''/tmp'')
    sys.path.append(''/tmp/predict.pyz'')
    import predict
    return predict.main(session, parameters_in)
';
- Go to QPR ProcessAnalyzer and create the following expression script:
// Name of the Snowflake stored procedure (created during installation) that performs
// both prediction-model training and eventlog generation.
let predictionProcedureName = "qprpa_sp_prediction";
// Runs callbackFunc while a second parallel task tails logTable, echoing every new
// row with WriteLog until the callback signals completion via the shared state
// dictionary. Returns the callback's result (null if the callback threw before
// setting it). The log table is truncated up front so only rows written by this
// run are echoed.
function RunFunctionWithParallelLogging(logTable, callbackFunc)
{
    let state = #{};
    logTable.Truncate();
    _system.Parallel.Run([
        () => {
            try {
                state.Set("Result", callbackFunc());
            }
            finally {
                // Always signal the logging task to stop, even when the callback throws.
                state.Set("Finished", true);
            }
        },
        () => {
            let readRows = 0;
            while (true) {
                try {
                    let finished = state.TryGetValue("Finished");
                    if (finished == true) {
                        WriteLog(`Work finished. Log written to table ${logTable.Name} (id: ${logTable.Id})`);
                        break;
                    }
                    // Read only the rows appended since the previous poll.
                    let rowsDf = logTable.Synchronize().SqlDataFrame.Skip(readRows).Collect();
                    readRows = readRows + rowsDf.NRows;
                    rowsDf.Rows.{
                        let row = _;
                        WriteLog(`${row[0]} \t${row[1]}`);
                    }
                }
                catch (ex) {
                    if (ex.InnerExceptions?[0]?.Type == "UnauthorizedAccess") {
                        WriteLog(`Log data table has disappeared. The generated log will not be complete!`);
                        break;
                    }
                    WriteLog(`Log has not yet been created.`);
                }
                // BUG FIX: this sleep was previously placed after the while loop, so the
                // poller busy-spun without any delay. Throttle polling to every 5 seconds.
                Sleep(5000);
            }
        }
    ]);
    return state.TryGetValue("Result");
}
// Trains a prediction model by calling the Snowflake stored procedure, while streaming
// the procedure's log output (written to logTable) back to this script's log.
// params: configuration dictionary; its "_typed" sub-dictionary carries object-typed
// parameters and is extended with the log table before the call.
// Returns the stored procedure's result object.
function Train(params, logTable)
{
function TrainCallback()
{
WriteLog("Starting training...");
let connection = logTable.Project.CreateSnowflakeConnection();
// Clone so the caller's params dictionary is not mutated.
let finalParams = params.Clone();
finalParams.Set("_typed", params["_typed"].Extend(#{
"log_table": logTable
}));
let result = connection.CallStoredProcedure(predictionProcedureName, #{"configuration": finalParams});
WriteLog("Finished training...")
return result;
}
return RunFunctionWithParallelLogging(logTable, TrainCallback);
}
// Generates eventlog data by calling the Snowflake stored procedure, while streaming
// the procedure's log output (written to logTable) back to this script's log.
// params: configuration dictionary; "_typed" must contain "target_event_data_table"
// and may contain "target_case_data_table". When params["overwrite"] is true, the
// target tables are truncated before generation.
// Returns the stored procedure's result object.
function Generate(params, logTable)
{
function GenerateCallback()
{
WriteLog("Starting generation...")
let connection = logTable.Project.CreateSnowflakeConnection();
let typedParams = params["_typed"];
let resultEventsTable = typedParams["target_event_data_table"];
// Case table is optional: TryGetValue yields null when it was not provided.
let resultCasesTable = typedParams.TryGetValue("target_case_data_table");
if (params["overwrite"]) {
resultEventsTable.Truncate();
resultCasesTable?.Truncate();
}
// Clone so the caller's params dictionary is not mutated.
let finalParams = params.Clone();
finalParams.Set("_typed", typedParams.Extend(#{
"log_table": logTable
}));
let result = connection.CallStoredProcedure(predictionProcedureName, #{"configuration": finalParams});
// Refresh table metadata so rows written by the procedure are visible.
resultEventsTable.Synchronize();
resultCasesTable?.Synchronize();
WriteLog("Finished generation...")
return result;
}
return RunFunctionWithParallelLogging(logTable, GenerateCallback);
}
// Returns a SqlDataFrame of the source model's events, limited to a random sample of
// sampledCaseCount cases combined with the (optional) extra filter. When no valid
// sample size is given (null or < 1), all events are returned unfiltered.
function GetSampledEvents(sourceModel, sampledCaseCount, filter)
{
    if (IsNull(sampledCaseCount) || sampledCaseCount < 1)
        return sourceModel.EventsDataTable.SqlDataFrame;
    // sampledCaseCount is known to be non-null here (early return above), so the
    // sampling rule can be built unconditionally. It picks sampledCaseCount cases
    // uniformly at random by ordering on a random column.
    let sampleFilterRule = #{
        "Type":"IncludeCases",
        "Items":[#{
            "Type": "SqlExpressionValue",
            "Configuration": #{
                "Root": `Cases.WithColumn("_Random", Rand()).OrderByColumns(["_Random"], [true]).Head(${sampledCaseCount})`
            }
        }]
    };
    // Append the sampling rule to any caller-supplied filter rules.
    let filterRules = filter?["Items"];
    if (!IsNullTop(filterRules))
        filterRules = Concat(filterRules, sampleFilterRule);
    else
        filterRules = [sampleFilterRule];
    let finalFilter = #{
        "Items":filterRules
    };
    // BUG FIX: the result was previously computed but not returned, so callers
    // always received null when a sample size was given.
    return sourceModel.CacheTableSqlDataFrame(finalFilter);
}
// Trains the ML prediction model from sourceModel's event (and optional case) data,
// optionally using only a random sample of trainingCaseSampleSize cases filtered by
// trainingDataFilter. Progress is streamed through logTable.
// Returns the training result from the stored procedure.
function TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize)
{
    // Prefix with the source model id so different models can reuse the same name.
    let sanitizedModelName = `${sourceModel.Id}-${modelName}`;
    let originalCaseCount = sourceModel.EventsDataTable.SqlDataFrame.SelectDistinct([sourceModel.EventsDataTable.ColumnMappings["CaseId"]]).NRows;
    let trainEventDataSdf = GetSampledEvents(sourceModel, trainingCaseSampleSize, trainingDataFilter);
    let actualTrainingCaseSampleSize = trainEventDataSdf.SelectDistinct([trainEventDataSdf.ColumnMappings["CaseId"]]).NRows;
    WriteLog(`Starting to train a prediction model using ${ToString(100*actualTrainingCaseSampleSize/originalCaseCount, "F")}% of the original cases (${actualTrainingCaseSampleSize}/${originalCaseCount}) found in source model ${sourceModel.Name}.`)
    let trainCaseDataSdf = sourceModel.CasesDataTable?.SqlDataFrame;
    let columnMappings = trainEventDataSdf.ColumnMappings;
    if (trainCaseDataSdf != null) {
        // Case-level case id mapping is passed with a "Case_" prefix to distinguish
        // it from the event-level case id mapping.
        columnMappings = columnMappings.Extend(#{"Case_CaseId": trainCaseDataSdf.ColumnMappings["CaseId"]});
    }
    trainingConfiguration = trainingConfiguration.Extend(#{
        "_typed": #{
            "event_data": trainEventDataSdf,
            "project": targetProject
        },
        "model_name": sanitizedModelName,
        "column_mappings": columnMappings,
        "overwrite_model": recreatePredictionModel,
        // Compensates case start-time frequencies for the sampling ratio.
        "case_start_time_probability_multiplier": originalCaseCount / actualTrainingCaseSampleSize
    });
    if (trainCaseDataSdf != null) {
        trainingConfiguration["_typed"].Set("case_data", trainCaseDataSdf);
    }
    // BUG FIX: the training result was previously discarded; the caller logs it.
    return Train(trainingConfiguration, logTable);
}
// Generates a new model named modelName into targetProject using the previously
// trained prediction model. Optionally generates completely new cases, continues
// incomplete (running) cases, marks generated rows with Predicted/Generated flag
// columns, appends the original model data, and finally creates the model unless
// it already existed. Returns the (existing or newly created) model.
function GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable)
{
    let sanitizedModelName = `${sourceModel.Id}-${modelName}`;
    let eventsTableName = `${modelName} - events`;
    let casesTableName = `${modelName} - cases`;
    let m = targetProject.ModelByName(modelName);
    let eventsTable = null;
    let casesTable = null;
    if (m != null)
    {
        // Reuse the existing model's data tables.
        eventsTable = m.EventsDataTable;
        casesTable = m.CasesDataTable;
    }
    else {
        let eventsTableConfiguration = #{
            "Name": eventsTableName,
            "Type": "Snowflake"
        };
        eventsTable = targetProject.DataTableByName(eventsTableName) ?? targetProject.CreateDataTable(eventsTableConfiguration).Synchronize();
        // A case table is only needed when the source model has one.
        if (sourceModel.CasesDataTable != null) {
            let casesTableConfiguration = #{
                "Name": casesTableName,
                "Type": "Snowflake"
            };
            casesTable = targetProject.DataTableByName(casesTableName) ?? targetProject.CreateDataTable(casesTableConfiguration).Synchronize();
        }
    }
    eventsTable.Truncate();
    casesTable?.Truncate();
    let generationParams, result;
    // NOTE: if cases_to_generate is 0 AND incompleteCasesFilter is null, "result"
    // stays null and the column-mapping accesses below would fail; callers are
    // expected to request at least one of the two generation modes.
    if (generationConfiguration.TryGetValue("cases_to_generate") != 0) {
        WriteLog(`Generating new cases with new events...`);
        generationParams = generationConfiguration.Extend(#{
            "_typed": #{
                "target_event_data_table": eventsTable,
                "project": targetProject
            },
            "model_name": sanitizedModelName,
            "overwrite": false
        });
        if (casesTable != null) {
            // BUG FIX: was generationConfiguration["_typed"], but "_typed" only exists
            // in the extended generationParams, so the case table was never passed.
            generationParams["_typed"].Set("target_case_data_table", casesTable);
        }
        result = Generate(generationParams, logTable);
        WriteLog(`Generation results:\r\n${result}`);
        result = ParseJson(result);
        if (result["result"] != "success")
            throw result["exception"]
    }
    if (incompleteCasesFilter != null) {
        WriteLog(`Generating new events for running cases...`);
        let incompleteCasesSqlDataFrame = sourceModel.EventsDataTable.SqlDataFrame.ApplyFilter(incompleteCasesFilter, sourceModel.CasesDataTable?.SqlDataFrame);
        generationParams = generationConfiguration.Extend(#{
            "_typed": #{
                "event_data": incompleteCasesSqlDataFrame,
                "target_event_data_table": eventsTable,
                "project": targetProject
            },
            "model_name": sanitizedModelName,
            "overwrite": false,
            // Temperature 0 makes the continuation of running cases deterministic.
            "temperature": 0
        });
        if (casesTable != null) {
            // BUG FIX: was generationConfiguration["_typed"] (see above).
            generationParams["_typed"].Set("case_data", sourceModel.CasesDataTable.SqlDataFrame);
        }
        result = Generate(generationParams, logTable);
        WriteLog(`Generation results:\r\n${result}`);
        result = ParseJson(result);
        if (result["result"] != "success")
            throw result["exception"]
    }
    // Flag all generated events as Predicted and drop the synthetic __FINISH__
    // terminator events produced by the generator.
    if (!("Predicted".In(eventsTable.ColumnTypes.Name)))
        eventsTable
            .AddColumn("Predicted", "Boolean");
    eventsTable
        .DeleteRows(
            Column(result["column_mappings"]["events"]["EventType"]) == "__FINISH__"
        )
        .UpdateRows(
            1==1,
            "Predicted", true
        );
    if (casesTable != null) {
        if (!("Generated".In(casesTable.ColumnTypes.Name)))
            casesTable
                .AddColumn("Generated", "Boolean");
        casesTable
            .UpdateRows(
                1==1,
                "Generated", true
            );
    }
    WriteLog(`Appending original model data...`);
    eventsTable.Import(sourceModel.EventsDataTable.SqlDataFrame, #{"Append": true});
    if (casesTable != null)
        casesTable.Import(sourceModel.CasesDataTable.SqlDataFrame, #{"Append": true});
    // Appended original rows have null flags; mark them as not predicted/generated.
    eventsTable
        .UpdateRows(
            Column("Predicted") == null,
            "Predicted", false
        );
    casesTable
        ?.UpdateRows(
            Column("Generated") == null,
            "Generated", false
        );
    if (m != null)
        return m;
    WriteLog(`Creating model...`);
    // The procedure reports the timestamp mapping as "TimeStamp"; model
    // configuration expects the key "Timestamp".
    let eventColumnMappings = result["column_mappings"]["events"];
    let timestampMapping = eventColumnMappings["TimeStamp"];
    eventColumnMappings.Remove("TimeStamp");
    eventColumnMappings.Set("Timestamp", timestampMapping);
    let modelConfiguration = #{
        "DataSource": #{
            "Events":#{
                "DataSourceType": "datatable",
                "DataTableName": eventsTableName,
                "Columns": eventColumnMappings
            }
        }
    };
    if (casesTable != null) {
        modelConfiguration["DataSource"].Set("Cases", #{
            "DataSourceType": "datatable",
            "DataTableName": casesTableName,
            "Columns": result["column_mappings"]["cases"]
        });
    }
    // BUG FIX: the newly created model was not returned, so the caller's
    // newModel.Name/newModel.Id accesses failed for freshly created models.
    return targetProject.CreateModel(#{"Name": modelName, "Configuration": modelConfiguration});
}
// Entry point: trains a prediction model from configuration["SourceModel"] and,
// when a GenerationConfiguration is given, generates a new model into
// configuration["TargetProject"]. A temporary Snowflake log table is used to relay
// progress from the stored procedure and is always deleted afterwards.
function GeneratePredictionModel(configuration)
{
    let modelName = configuration["Name"];
    let sourceModel = configuration["SourceModel"];
    let targetProject = configuration["TargetProject"];
    let trainingConfiguration = configuration["TrainingConfiguration"];
    let generationConfiguration = configuration["GenerationConfiguration"];
    let trainingDataFilter = configuration["TrainingDataFilter"];
    let incompleteCasesFilter = configuration["IncompleteCasesFilter"];
    let recreatePredictionModel = configuration["RecreatePredictionModel"];
    let trainingCaseSampleSize = configuration["TrainingCaseSampleSize"];
    let result = null;
    // Timestamped name keeps concurrent runs from sharing a log table.
    let logTableName = `_MLLog:${Now}`;
    let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake"});
    try {
        WriteLog(`Training a prediction model based on model:\r\n${sourceModel.Name} (id=${sourceModel.Id})`);
        result = TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize);
        WriteLog(`Model training completed. Training result:\r\n${result}`);
        if (generationConfiguration != null) {
            // BUG FIX: removed a stray ")" that previously followed ${modelName} in
            // this log message.
            WriteLog(`Generating a new model named ${modelName} based on trained model ${modelName}:\r\n${result}`);
            let newModel = GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable);
            WriteLog(`Model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}:\r\n${result}`);
        }
    }
    finally {
        // Best-effort cleanup of the temporary log table, even on failure.
        targetProject.DataTableByName(logTableName)?.DeletePermanently();
    }
}
// Export the public functions of this expression script as a dictionary.
return #{
"GeneratePredictionModel": GeneratePredictionModel,
"RunFunctionWithParallelLogging": RunFunctionWithParallelLogging,
"Train": Train,
"Generate": Generate
};