Create Object-centric Model from Relational Schema

From QPR ProcessAnalyzer Wiki
Revision as of 08:22, 10 June 2025 by Ollvihe (talk | contribs)
Jump to navigation Jump to search

This page contains instructions on how to create an object-centric model from a relational data schema.

Steps:

  1. Create the object tables.
  2. Define relations between the object tables using foreign keys.
  3. Run the script to update the rest of the object-centric model datatables.

Define foreign keys

// Link "Sales Order Item" rows to "Sales Order Header" rows: the VBELN column
// of the item table is matched against the VBELN column of the header table.
let salesOrderItemForeignKeys = [
  #{
    "TargetDatatable": "Sales Order Header",
    "Columns": [
      #{ "Source": "VBELN", "Target": "VBELN" }
    ]
  }
];
Project.DatatableByName("Sales Order Item").SetForeignKeys(salesOrderItemForeignKeys);

// "Delivery Item" gets two foreign keys:
//  - to "Delivery Header", matched on the VBELN column of both tables, and
//  - to "Sales Order Item", matched on a two-column key
//    (VGBEL -> VBELN and VGPOS -> POSNR).
let deliveryItemToDeliveryHeader = #{
  "TargetDatatable": "Delivery Header",
  "Columns": [
    #{ "Source": "VBELN", "Target": "VBELN" }
  ]
};
let deliveryItemToSalesOrderItem = #{
  "TargetDatatable": "Sales Order Item",
  "Columns": [
    #{ "Source": "VGBEL", "Target": "VBELN" },
    #{ "Source": "VGPOS", "Target": "POSNR" }
  ]
};
Project.DatatableByName("Delivery Item").SetForeignKeys([
  deliveryItemToDeliveryHeader,
  deliveryItemToSalesOrderItem
]);

Update object-centric model tables

The following script uses the object type tables to create the events, objects, event-to-object, and object-to-object tables. Note that the event type tables are not needed at all in this approach. The script takes the datatable names (for the object type tables and for the events, objects, event-to-object, and object-to-object tables) from the configuration of the object-centric model.

// Locate the object-centric model of the project. The script works on exactly
// one such model, so finding none or more than one is reported as an error.
let ocelModels = Project.Models.Where(IsOcelModel);
let ocelModelCount = Count(ocelModels);
if (ocelModelCount == 0) {
  throw "There is no object-centric model in the project.";
}
if (ocelModelCount > 1) {
  throw "There are several object-centric models in the project.";
}
let modelObject = ocelModels[0];
// Keys of the ObjectTypes setting of the model; the values stored under these
// keys are the names of the object type datatables.
let objectTypeNames = modelObject.Configuration.OcelDataSource.ObjectTypes.Keys;

// Returns the datatable of the project having the given name. When the lookup
// by name yields null, a new datatable with that name is created (with type
// "Snowflake") and returned instead.
function getDatatableAndCreateIfNeeded(datatableName) {
  let existingDatatable = Project.DatatableByName(datatableName);
  if (existingDatatable != null) {
    return existingDatatable;
  }
  return Project.CreateDatatable(#{
    "Name": datatableName,
    "Type": "Snowflake"
  });
}

// Create Objects table
// Collects the object ids of every object type table into one dataframe with
// the columns OcelObjectId and OcelObjectType, then replaces the contents of
// the model's Objects datatable with it.
let allObjectsDataFrame;
let objectTypeDatatableNames = modelObject.Configuration.OcelDataSource.ObjectTypes;
for (let typeIndex = 0; typeIndex < Count(objectTypeNames); typeIndex++) {
  let objectTypeName = objectTypeNames[typeIndex];
  let objectTypeDatatable = getDatatableAndCreateIfNeeded(objectTypeDatatableNames[objectTypeName]);
  // Keep only the object id and tag every row with the name of its object type.
  let typedObjectsDataFrame = objectTypeDatatable.SqlDataFrame
    .Select(["OcelObjectTypeObjectId"])
    .RenameColumns(["OcelObjectId": "OcelObjectTypeObjectId"])
    .WithColumn("OcelObjectType", #expr{objectTypeName})
    .Select(["OcelObjectId", "OcelObjectType"]);
  // The accumulator has no value until the first object type has been handled.
  if (allObjectsDataFrame == _EMPTY) {
    allObjectsDataFrame = typedObjectsDataFrame;
  } else {
    allObjectsDataFrame = allObjectsDataFrame.Append(typedObjectsDataFrame);
  }
}
// Empty the target datatable first so that it ends up holding only the new rows.
let objectsTargetDatatable = Project.DatatableByName(modelObject.Configuration.OcelDataSource.Objects);
objectsTargetDatatable
  .Truncate()
  .Import(allObjectsDataFrame, #{ "ImportExistingColumnOnly": false });

// Create Events table
// Every DateTime column of every object type table is turned into events:
// each row with a non-null value in the column yields one event whose
//  - OcelEventId is "<object id>_<column name>",
//  - OcelEventType is the column name,
//  - OcelEventTime is the value of the column,
// plus the "Object Type" and "Object ID" columns identifying the originating object.
// The combined result replaces the contents of the model's Events datatable.
let allEventsDataFrame;
for (let i = 0; i < Count(objectTypeNames); i++) {
  // Fixed: the original line had a stray ")" before the semicolon, which made the script unparsable.
  let objectsDatatableName = modelObject.Configuration.OcelDataSource.ObjectTypes[objectTypeNames[i]];
  let objectsDatatable = getDatatableAndCreateIfNeeded(objectsDatatableName);
  let dateColumns = objectsDatatable.Columns.Where(DataType == "DateTime").Name;
  for (let j = 0; j < Count(dateColumns); j++) {
    let dateColumn = dateColumns[j];
    let eventsDataFrame = objectsDatatable
      .SqlDataFrame
      // Rows without a timestamp in this column do not produce an event.
      .Where(Column(#expr{dateColumn}) != null)
      .Select(["OcelObjectTypeObjectId", #expr{dateColumn}])
      .WithColumn("OcelEventId", Concat(Column("OcelObjectTypeObjectId"), "_", #expr{dateColumn}))
      .WithColumn("OcelEventType", #expr{dateColumn})
      .RenameColumns(["OcelEventTime": #expr{dateColumn}])
      .WithColumn("Object Type", #expr{objectTypeNames[i]})
      .WithColumn("Object ID", Column("OcelObjectTypeObjectId"))
      .Select(["OcelEventId", "OcelEventType", "OcelEventTime", "Object Type", "Object ID"]);
    // The accumulator has no value until the first dataframe has been produced.
    if (allEventsDataFrame == _EMPTY) {
      allEventsDataFrame = eventsDataFrame;
    } else {
      allEventsDataFrame = allEventsDataFrame.Append(eventsDataFrame);
    }
  }
}
// Empty the target datatable first so that it ends up holding only the new rows.
Project.DatatableByName(modelObject.Configuration.OcelDataSource.Events)
  .Truncate()
  .Import(allEventsDataFrame, #{ "ImportExistingColumnOnly": false });

// Create Event-to-Object table
// For every DateTime column of every object type table, each row with a
// non-null value in the column yields one relation row that links the event
// "<object id>_<column name>" (OcelEventToObjectSourceId) to the object the
// row belongs to (OcelEventToObjectTargetId). The qualifier is left empty.
// The combined result replaces the contents of the model's EventToObject datatable.
let allEventToObjectDataFrame;
for (let i = 0; i < Count(objectTypeNames); i++) {
  // Fixed: the original line had a stray ")" before the semicolon, which made the script unparsable.
  let objectsDatatableName = modelObject.Configuration.OcelDataSource.ObjectTypes[objectTypeNames[i]];
  let objectsDatatable = getDatatableAndCreateIfNeeded(objectsDatatableName);
  let dateColumns = objectsDatatable.Columns.Where(DataType == "DateTime").Name;
  for (let j = 0; j < Count(dateColumns); j++) {
    let dateColumn = dateColumns[j];
    let eventToObjectDataFrame = objectsDatatable
      .SqlDataFrame
      // Rows without a timestamp in this column have no event to link.
      .Where(Column(#expr{dateColumn}) != null)
      .Select(["OcelObjectTypeObjectId"])
      // The source id is built with the same expression as OcelEventId when the Events table is created.
      .WithColumn("OcelEventToObjectSourceId", Concat(Column("OcelObjectTypeObjectId"), "_", #expr{dateColumn}))
      .RenameColumns(["OcelEventToObjectTargetId": "OcelObjectTypeObjectId"])
      .WithColumn("OcelEventToObjectQualifier", "")
      .Select(["OcelEventToObjectSourceId", "OcelEventToObjectTargetId", "OcelEventToObjectQualifier"]);
    // The accumulator has no value until the first dataframe has been produced.
    if (allEventToObjectDataFrame == _EMPTY) {
      allEventToObjectDataFrame = eventToObjectDataFrame;
    } else {
      allEventToObjectDataFrame = allEventToObjectDataFrame.Append(eventToObjectDataFrame);
    }
  }
}
// Empty the target datatable first so that it ends up holding only the new rows.
Project.DatatableByName(modelObject.Configuration.OcelDataSource.EventToObject)
  .Truncate()
  .Import(allEventToObjectDataFrame, #{ "ImportExistingColumnOnly": false });

// Create Object-to-object table
// For every foreign key defined on an object type table, the table is
// inner-joined with the foreign key's target datatable on the key columns.
// Each matching pair of rows yields one relation row from the object of the
// source table (OcelObjectToObjectSourceId) to the object of the target table
// (OcelObjectToObjectTargetId). The qualifier is left empty.
// The combined result replaces the contents of the model's ObjectToObject datatable.
let allObjectToObjectDataFrame;
for (let i = 0; i < Count(objectTypeNames); i++) {
  // Fixed: the original line had a stray ")" before the semicolon, which made the script unparsable.
  let objectsDatatableName = modelObject.Configuration.OcelDataSource.ObjectTypes[objectTypeNames[i]];
  let objectsDatatable = getDatatableAndCreateIfNeeded(objectsDatatableName);
  let foreignKeys = objectsDatatable.ForeignKeys;
  for (let j = 0; j < Count(foreignKeys); j++) {
    let joinedColumns = foreignKeys[j].Columns;
    // Take the object id and the key columns of the target table and rename
    // each of them with a "Target" prefix; both tables have a column named
    // OcelObjectTypeObjectId, so the prefix keeps the two sides apart.
    let targetDataFrame = Project.DatatableByName(foreignKeys[j].TargetDatatable).SqlDataFrame
      .Select(Flatten(["OcelObjectTypeObjectId", joinedColumns.Target]))
      .RenameColumns(Flatten(["OcelObjectTypeObjectId", joinedColumns.Target]).{let col = _; [("Target" + col): col]}[0]);
    // Join each source key column to the corresponding prefixed target key column.
    let objectToObjectDataFrame = objectsDatatable.SqlDataFrame
      .Select(Flatten(["OcelObjectTypeObjectId", joinedColumns.Source]))
      .Join(targetDataFrame, joinedColumns.{let col = _; [col.Source: ("Target" + col.Target)]}[0], "inner")
      .WithColumn("OcelObjectToObjectSourceId", Column("OcelObjectTypeObjectId"))
      .WithColumn("OcelObjectToObjectTargetId", Column("TargetOcelObjectTypeObjectId"))
      .WithColumn("OcelObjectToObjectQualifier", "")
      .Select(["OcelObjectToObjectSourceId", "OcelObjectToObjectTargetId", "OcelObjectToObjectQualifier"]);
    // The accumulator has no value until the first dataframe has been produced.
    if (allObjectToObjectDataFrame == _EMPTY) {
      allObjectToObjectDataFrame = objectToObjectDataFrame;
    } else {
      allObjectToObjectDataFrame = allObjectToObjectDataFrame.Append(objectToObjectDataFrame);
    }
  }
}
// Empty the target datatable first so that it ends up holding only the new rows.
Project.DatatableByName(modelObject.Configuration.OcelDataSource.ObjectToObject)
  .Truncate()
  .Import(allObjectToObjectDataFrame, #{ "ImportExistingColumnOnly": false });