Exemple de processus scalable
Sommaire |
Dans cet exemple nous allons montrer comment écrire un processus scalable capable :
- D'être exécuté manuellement par l'utilisateur
- D'être exécuté simultanément sur plusieurs instances de l'Application
- D'être planifié dans un automate
- D'être exécuté au fil de l'eau.
Le processus transforme les objets d'une classe métier en changeant l'état d'un attribut énuméré.
- WFClasseA est la classe des objets métiers à traiter
- MyQueue est une file d'attente des objets prêt a être traité
- MyProcess est le processus de traitement
Principe d'exécution
L'exécution du processus est séparée en deux étapes :
- Dans une première étape le processus sélectionne les objets à traiter (doInQueue) et les place dans une file d'attente (MyQueue).
- Dans une seconde étape le processus consomme les objets de la file d'attente (doDeQueue) et les traite (doProcess).
Le sujet de la file d'attente (queueTopic) reflète les variables d'état des objets traités.
Code du processus
Le code du processus est le suivant :
unit TestSYFREWF; interface Type MyProcessus = Class(TitObject) public Procedure doDeQueue; Procedure doInQueue; Procedure doProcess(obj:TQueueObject); Procedure doTask; Procedure Execute; end; Implementation {MyProcessus} Procedure MyProcessus.doDeQueue; //Procedure doDeQueue; var obj:MyQueue; indx:Integer; aTopic:string; begin // In this step we dequeue and process the ready objects. // Request objects in initial state in case of the queue contains multiple states aTopic := 'wfclassea/0/%'; // This is just an estimation of the count, could be false if several processes run in parallel ProgressMax(MyQueue.CountWhere('queueTopic like %1',true,[aTopic])); // Get an enumerator from the queue foreach obj in MyQueue.Topic(aTopic) index indx do try // Progress indicator ProgressCount(indx); // Process this queue object // Separate the code to support straight through processing doProcess(obj); // delete the queue object obj.DeleteThisObject; except // on error requeue the object on an error topic // do not requeue on the same topic if the processus must be used in a straight through mode obj.ReQueue(StringReplace(obj.queueTopic,'/0/','/99/')); end; // ProgressFull(); end;; Procedure MyProcessus.doInQueue; //Procedure doInQueue; var sel:TSelector; begin // In this step we InQueue all objects which match the process criteria. // In this example the process criteria is objects in initial state // Use a selector for objects in initial state sel := WFClasseA.CreateSelector('unEtat=%1','',true,[WFCAState_Initial]); // Insert the objects in the queue using the selector, this is a transactional operation. // Topic is //classname/state/code/oid sel.InQueue('MyQueue','wfclassea/@unEtat/@unCode/@oid'); end;; Procedure MyProcessus.doProcess(obj:TQueueObject); //Procedure doProcess(obj:TQueueObject); var inst:WFClasseA; begin // One transaction by object withP transaction do begin // Get the actual object inst := obj.queueRef as WFClasseA; if Assigned(inst) then begin // this is the process main code where business transformations are applied. // here we change the state of the object inst.unEtat.value := WFCAState_Etat2; ProgressMessage(obj.queueTopic); end; end; end;; Procedure MyProcessus.doTask; //Procedure doTask; begin // Step One : inqueue object to be processed. doInQueue; // Step two : Process objects from queue. doDeQueue; end;; Procedure MyProcessus.Execute; //Procedure Execute; var aMsg:TdbmQueueMessage; aTopic:string; idx:Integer; begin // Main code of the processus if Assigned(UserContext.TaskContext) then begin // The processus has been started by the scheduler // UserContext.TaskContext.AddMessage('MyProcessus.Execute'); if Assigned(UserContext.TaskContext.EventContext.receivedMsg) then begin // The scheduler task has consumed one object from the queue. // aMsg := UserContext.TaskContext.EventContext.receivedMsg; UserContext.TaskContext.AddMessage(Format('Message :%s',[aMsg.queueTopic]]); // a TdbmQueueMessage is also a TQueueObject // Original properties of the MyQueue object has been copied into aMsg doProcess(aMsg); end else begin // The scheduler task is not queue based. // UserContext.TaskContext.AddMessage('No message, process as a regular task'); doTask; end; end else doTask; // The processus has been started by the UI end;
Exécution par l'interface utilisateur
L'interface du processus est standard, l'exécution appelle la méthode Execute du processus.
Lorsque le processus est exécuté manuellement, ou bien dans une tâche d'automate planifié, il enchaine ces deux étapes (doTask).
Exécution au fil de l'eau
Lorsque le processus est exécuté par une tâche d'automate déclenchée par un évènement file d'attente il extrait du contexte de l'automate le message reçu de la file d'attente (Execute) et exécute directement le code de traitement (doProcess).
L'automate est configuré ainsi :
Le paramétrage de la file d'attente utilisée par l'évènement file d'attente :
La file d'attente est définie sur la classe file d'attente utilisée par le processus (MyQueue) et filtre sur le sujet correspondant aux objets à traiter (wfclassea/0/%)
Le code de la méthode Execute teste le contexte d'exécution de l'automate pour extraire le message à traiter :
Procedure MyProcessus.Execute; //Procedure Execute; var aMsg:TdbmQueueMessage; aTopic:string; idx:Integer; begin if Assigned(UserContext.TaskContext) then begin UserContext.TaskContext.AddMessage('MyProcessus.Execute'); if Assigned(UserContext.TaskContext.EventContext.receivedMsg) then begin aMsg := UserContext.TaskContext.EventContext.receivedMsg; UserContext.TaskContext.AddMessage(Format('Message :%s',[aMsg.queueTopic]]); // a TdbmQueueMessage is also a TQueueObject // Original properties of the MyQueue object has been copied into aMsg doProcess(aMsg); end else begin UserContext.TaskContext.AddMessage('No message, process as a regular task'); doTask; end; end else doTask; // started by the UI end;
Notez que le message reçu dans le contexte de la tâche d'automate n'est pas directement du type MyQueue mais du type TdbmSoredQueueMessage. C'est pour cette raison que le type de l'objet message passé à dbProcess n'est pas MyQueue mais TQueueObject.
Gestion des erreurs
Erreur en traitement par énumérateur
Lorsque une erreur de traitement se produit, le code de deQueue réactive l'objet de la file tout en modifiant son sujet.
//Procedure doDeQueue; begin ... foreach obj in MyQueue.Topic(aTopic) index indx do try .... except // on error requeue the object on an error topic // do not requeue on the same topic if the processus must be used in a straight through mode obj.ReQueue(StringReplace(obj.queueTopic,'/0/','/99/')); end; end;
Réactiver l'objet de la file d'attente est un choix d'implémentation du processus, il serait aussi possible de supprimer cet objet.
En réactivant l'objet de la file d'attente sur un sujet différent il est possible de traiter ce objet dans un processus dédié aux erreurs.
En traitement au fil de l'eau
Dans ce cas c'est l'évènement file d'attente de la tâche d'automate qui va gérer l'erreur suivant le paramétrage défini dans la file d'attente.
- Si RequeueTopicXXXPattern sont définis l'objet sera réactivé dans la file d'attente (ici MyQueue) avec le nouveau sujet défini par
StringReplace(queueTopic,RequeueTopicOldPattern,RequeueTopicNewPattern)
- Si RequeueTopicXXXPattern ne sont pas définis l'objet de la file d'attente sera supprimé
Traitement des objets avec regroupement
Dans certain cas il est souhaitable de regrouper les objets suivant un critère particulier pour les traiter en un lot.
Pour se faire le critère de regroupement doit être présent dans le sujet associé à chaque objet traité.
Le principe est le suivant :
- On interroge la file d'attente pour obtenir le premier objet traitable (GetFirst).
- On extrait du sujet obtenu le critère de regroupement souhaité (ici en utilisant StringPart).
- On obtient de la file d'attente tous les objets vérifiant ce sujet (DeQueueList).
L'exemple précédent est modifié en remplaçant la procédure DeQueue :
//Procedure doDeQueueWithGroupBy; var obj:MyQueue; aList:TQueueObjectList; indx:Integer; aTopic,aGroupByTopic:string; begin // In this step we dequeue and process the ready objects. // The objects are dequeued group by unCode // Request objects in initial state in case of the queue contains multiple states aTopic := 'wfclassea/0/%'; // This is just an estimation of the count, could be false if several processes run in parallel ProgressMax(MyQueue.CountWhere('queueTopic like %1',true,[aTopic])); obj := MyQueue.GetFirst(aTopic); Repeat if Assigned(obj) then begin // Progress indicator ProgressCount(indx); // Get the topic on which to group by aGroupByTopic := 'wfclassea/0/'+StringPart(obj.queueTopic,'/',2)+'/%'; // Get a list of queue objects with this topic aList := MyQueue.deQueueList(aGroupByTopic); // the objects may have been consumed by an other process, so we have to check it again. if Assigned(aList) and (aList.Count>0) then try // process the list of objects doProcessList(aList); aList.DeleteObjects; except // On error ReQueue the list MyQueue.ReQueueList(aList,'/0/','/99/'); end; end; obj := MyQueue.GetNext(aTopic,obj); Until not Assigned(obj); // ProgressFull(); end; //Procedure doProcessList(aList:TQueueObjectList); var inst:WFClasseA; idx:Integer; begin // One transaction by object withP transaction do begin for idx:=0 to aList.Count-1 do begin // Get the actual object inst := aList.Refs[idx].queueRef as WFClasseA; if Assigned(inst) then begin // this is the process core where business transformations are apply. // here we change the state of the object inst.unEtat.value := WFCAState_Etat2; ProgressMessage(aList.Refs[idx].queueTopic); end; end; end; end;
En cas d'erreur on replace dans la file les objets de la liste (ReQueueList) en une opération en prenant soin de changer le sujet pour éviter de créer des objets poisons.