Exemple de processus scalable

De Wiki1000
(Différences entre les versions)
 
Ligne 323 : Ligne 323 :
 
En cas d'erreur on replace dans la file les objets de la liste (ReQueueList) en une opération en prenant soin de changer le sujet pour éviter de créer des objets poisons.
 
En cas d'erreur on replace dans la file les objets de la liste (ReQueueList) en une opération en prenant soin de changer le sujet pour éviter de créer des objets poisons.
  
[[Category:Architecture]]
 
 
[[Category:File d'attente]]
 
[[Category:File d'attente]]
[[Category:Latest]]
+
[[Category:Cluster]]
 +
[[Category:Version700]]

Version actuelle en date du 20 janvier 2015 à 14:02

Sommaire


Dans cet exemple nous allons montrer comment écrire un processus scalable capable :

  • D'être exécuté manuellement par l'utilisateur
  • D'être exécuté simultanément sur plusieurs instances de l'Application
  • D'être planifié dans un automate
  • D'être exécuté au fil de l'eau.

Le processus transforme les objets d'une classe métier en changeant l'état d'un attribut énuméré.

  • WFClasseA est la classe des objets métiers à traiter
  • MyQueue est une file d'attente des objets prêt a être traité
  • MyProcess est le processus de traitement

Principe d'exécution

L'exécution du processus est séparée en deux étapes :

  • Dans une première étape le processus sélectionne les objets à traiter (doInQueue) et les place dans une file d'attente (MyQueue).
  • Dans une seconde étape le processus consomme les objets de la file d'attente (doDeQueue) et les traite (doProcess).

Le sujet de la file d'attente (queueTopic) reflète les variables d'état des objets traités.

Code du processus

Le code du processus est le suivant :

unit TestSYFREWF;
interface
 
Type
  MyProcessus = Class(TitObject)
  public
    Procedure doDeQueue;
    Procedure doInQueue;
    Procedure doProcess(obj:TQueueObject);
    Procedure doTask;
    Procedure Execute;
  end;
 
Implementation
 
{MyProcessus}
 
Procedure MyProcessus.doDeQueue;
//Procedure doDeQueue;
var obj:MyQueue; indx:Integer; aTopic:string;
begin
  // In this step we dequeue and process the ready objects.
 
  // Request objects in initial state in case of the queue contains multiple states
  aTopic := 'wfclassea/0/%';
 
  // This is just an estimation of the count, could be false if several processes run in parallel
  ProgressMax(MyQueue.CountWhere('queueTopic like %1',true,[aTopic]));
 
  // Get an enumerator from the queue
  foreach obj in MyQueue.Topic(aTopic) index indx do
   try
     // Progress indicator
     ProgressCount(indx);
 
     // Process this queue object
     // Separate the code to support straight through processing
     doProcess(obj);
 
     // delete the queue object
     obj.DeleteThisObject;
   except
     // on error requeue the object on an error topic
     // do not requeue on the same topic if the processus must be used in a straight through mode
     obj.ReQueue(StringReplace(obj.queueTopic,'/0/','/99/'));
   end;
  //
  ProgressFull();
end;;
 
Procedure MyProcessus.doInQueue;
//Procedure doInQueue;
var sel:TSelector;
begin
  // In this step we InQueue all objects which match the process criteria.
 
  // In this example the process criteria is objects in initial state
  // Use a selector for objects in initial state
  sel := WFClasseA.CreateSelector('unEtat=%1','',true,[WFCAState_Initial]);
  // Insert the objects in the queue using the selector, this is a transactional operation.
  // Topic is //classname/state/code/oid
  sel.InQueue('MyQueue','wfclassea/@unEtat/@unCode/@oid');
end;;
 
Procedure MyProcessus.doProcess(obj:TQueueObject);
//Procedure doProcess(obj:TQueueObject);
var inst:WFClasseA;
begin
  // One transaction by object
  withP transaction do
   begin
     // Get the actual object
     inst := obj.queueRef as WFClasseA;
     if Assigned(inst) then
      begin
        // this is the process main code where business transformations are applied.
        // here we change the state of the object
        inst.unEtat.value := WFCAState_Etat2;
        ProgressMessage(obj.queueTopic);
      end;
   end;
end;;
 
Procedure MyProcessus.doTask;
//Procedure doTask;
begin
  // Step One : inqueue object to be processed.
  doInQueue;
  // Step two : Process objects from queue.
  doDeQueue;
end;;
 
Procedure MyProcessus.Execute;
//Procedure Execute;
var aMsg:TdbmQueueMessage; aTopic:string; idx:Integer;
begin
  // Main code of the processus
  if Assigned(UserContext.TaskContext) then
   begin
     // The processus has been started by the scheduler
     //
     UserContext.TaskContext.AddMessage('MyProcessus.Execute');
     if Assigned(UserContext.TaskContext.EventContext.receivedMsg) then
       begin
         // The scheduler task has consumed one object from the queue.
         //
         aMsg := UserContext.TaskContext.EventContext.receivedMsg;
         UserContext.TaskContext.AddMessage(Format('Message :%s',[aMsg.queueTopic]]);
 
         // a TdbmQueueMessage is also a TQueueObject
         // Original properties of the MyQueue object has been copied into aMsg
         doProcess(aMsg);
       end
       else
       begin
         // The scheduler task is not queue based. 
         //
         UserContext.TaskContext.AddMessage('No message, process as a regular task');
         doTask;
       end;
   end
   else doTask; // The processus has been started by the UI
end;

Exécution par l'interface utilisateur

L'interface du processus est standard, l'exécution appelle la méthode Execute du processus.

image1.png

Lorsque le processus est exécuté manuellement, ou bien dans une tâche d'automate planifié, il enchaine ces deux étapes (doTask).

Exécution au fil de l'eau

Lorsque le processus est exécuté par une tâche d'automate déclenchée par un évènement file d'attente il extrait du contexte de l'automate le message reçu de la file d'attente (Execute) et exécute directement le code de traitement (doProcess).

L'automate est configuré ainsi :

image2.png

Le paramétrage de la file d'attente utilisée par l'évènement file d'attente :

image3.png

La file d'attente est définie sur la classe file d'attente utilisée par le processus (MyQueue) et filtre sur le sujet correspondant aux objets à traiter (wfclassea/0/%)

Le code de la méthode Execute teste le contexte d'exécution de l'automate pour extraire le message à traiter :

Procedure MyProcessus.Execute;
//Procedure Execute;
var aMsg:TdbmQueueMessage; aTopic:string; idx:Integer;
begin
  if Assigned(UserContext.TaskContext) then
   begin
     UserContext.TaskContext.AddMessage('MyProcessus.Execute');
     if Assigned(UserContext.TaskContext.EventContext.receivedMsg) then
       begin
         aMsg := UserContext.TaskContext.EventContext.receivedMsg;
         UserContext.TaskContext.AddMessage(Format('Message :%s',[aMsg.queueTopic]]);
 
         // a TdbmQueueMessage is also a TQueueObject
         // Original properties of the MyQueue object has been copied into aMsg
         doProcess(aMsg);
       end
       else
       begin
         UserContext.TaskContext.AddMessage('No message, process as a regular task');
         doTask;
       end;
   end
   else doTask; // started by the UI
end;

Notez que le message reçu dans le contexte de la tâche d'automate n'est pas directement du type MyQueue mais du type TdbmSoredQueueMessage. C'est pour cette raison que le type de l'objet message passé à dbProcess n'est pas MyQueue mais TQueueObject.

Gestion des erreurs

Erreur en traitement par énumérateur

Lorsque une erreur de traitement se produit, le code de deQueue réactive l'objet de la file tout en modifiant son sujet.

//Procedure doDeQueue;
begin
  ...
  foreach obj in MyQueue.Topic(aTopic) index indx do
   try
     ....
   except
     // on error requeue the object on an error topic
     // do not requeue on the same topic if the processus must be used in a straight through mode
     obj.ReQueue(StringReplace(obj.queueTopic,'/0/','/99/'));
   end;
end;
Tip-20px.png Tip : Dans la gestion des erreurs il faut être attentif à ne pas créer des situations de boucle sans fin. Dans le cadre d'une consommation par un énumérateur ce n'est pas possible car l'énumération est ordonnée sur queueID.

Réactiver l'objet de la file d'attente est un choix d'implémentation du processus, il serait aussi possible de supprimer cet objet.

En réactivant l'objet de la file d'attente sur un sujet différent il est possible de traiter ce objet dans un processus dédié aux erreurs.

En traitement au fil de l'eau

Dans ce cas c'est l'évènement file d'attente de la tâche d'automate qui va gérer l'erreur suivant le paramétrage défini dans la file d'attente.

image3.png

  • Si RequeueTopicXXXPattern sont définis l'objet sera réactivé dans la file d'attente (ici MyQueue) avec le nouveau sujet défini par
  StringReplace(queueTopic,RequeueTopicOldPattern,RequeueTopicNewPattern)
  • Si RequeueTopicXXXPattern ne sont pas définis l'objet de la file d'attente sera supprimé

Traitement des objets avec regroupement

Dans certain cas il est souhaitable de regrouper les objets suivant un critère particulier pour les traiter en un lot.

Pour se faire le critère de regroupement doit être présent dans le sujet associé à chaque objet traité.

Le principe est le suivant :

  • On interroge la file d'attente pour obtenir le premier objet traitable (GetFirst).
  • On extrait du sujet obtenu le critère de regroupement souhaité (ici en utilisant StringPart).
  • On obtient de la file d'attente tous les objets vérifiant ce sujet (DeQueueList).

L'exemple précédent est modifié en remplaçant la procédure DeQueue :

//Procedure doDeQueueWithGroupBy;
var obj:MyQueue; aList:TQueueObjectList; indx:Integer; aTopic,aGroupByTopic:string;
begin
  // In this step we dequeue and process the ready objects.
  // The objects are dequeued group by unCode
 
  // Request objects in initial state in case of the queue contains multiple states
  aTopic := 'wfclassea/0/%';
 
  // This is just an estimation of the count, could be false if several processes run in parallel
  ProgressMax(MyQueue.CountWhere('queueTopic like %1',true,[aTopic]));
 
  obj := MyQueue.GetFirst(aTopic);
  Repeat
    if Assigned(obj) then
     begin
       // Progress indicator
       ProgressCount(indx);
 
       // Get the topic on which to group by
       aGroupByTopic := 'wfclassea/0/'+StringPart(obj.queueTopic,'/',2)+'/%';
 
       // Get a list of queue objects with this topic
       aList := MyQueue.deQueueList(aGroupByTopic);
       // the objects may have been consumed by an other process, so we have to check it again.
       if Assigned(aList) and (aList.Count>0) then
         try
           // process the list of objects
           doProcessList(aList);
           aList.DeleteObjects;
         except
         // On error ReQueue the list
         MyQueue.ReQueueList(aList,'/0/','/99/');
         end;
     end;
    obj := MyQueue.GetNext(aTopic,obj);
  Until not Assigned(obj);
  //
  ProgressFull();
end;
 
//Procedure doProcessList(aList:TQueueObjectList);
var inst:WFClasseA; idx:Integer;
begin
  // One transaction by object
  withP transaction do
   begin
     for idx:=0 to aList.Count-1 do
      begin
        // Get the actual object
        inst := aList.Refs[idx].queueRef as WFClasseA;
        if Assigned(inst) then
         begin
           // this is the process core where business transformations are apply.
           // here we change the state of the object
           inst.unEtat.value := WFCAState_Etat2;
           ProgressMessage(aList.Refs[idx].queueTopic);
         end;
      end;
   end;
end;

En cas d'erreur on replace dans la file les objets de la liste (ReQueueList) en une opération en prenant soin de changer le sujet pour éviter de créer des objets poisons.

Outils personnels