Exemple de processus scalable

De Wiki1000

Sommaire


Dans cet exemple nous allons montrer comment écrire un processus scalable capable :

  • D'être exécuté manuellement par l'utilisateur
  • D'être exécuté simultanément sur plusieurs instances de l'Application
  • D'être planifié dans un automate
  • D'être exécuté au fil de l'eau.

Le processus transforme les objets d'une classe métier en changeant l'état d'un attribut énuméré.

  • WFClasseA est la classe des objets métiers à traiter
  • MyQueue est une file d'attente des objets prêt a être traité
  • MyProcess est le processus de traitement

Principe d'exécution

L'exécution du processus est séparée en deux étapes :

  • Dans une première étape le processus sélectionne les objets à traiter (doInQueue) et les place dans une file d'attente (MyQueue).
  • Dans une seconde étape le processus consomme les objets de la file d'attente (doDeQueue) et les traite (doProcess).

Le sujet de la file d'attente (queueTopic) reflète les variables d'état des objets traités.

Code du processus

Le code du processus est le suivant :

unit TestSYFREWF;
interface
 
Type
  MyProcessus = Class(TitObject)
  public
    Procedure doDeQueue;
    Procedure doInQueue;
    Procedure doProcess(obj:TQueueObject);
    Procedure doTask;
    Procedure Execute;
  end;
 
Implementation
 
{MyProcessus}
 
Procedure MyProcessus.doDeQueue;
//Procedure doDeQueue;
var obj:MyQueue; indx:Integer; aTopic:string;
begin
  // In this step we dequeue and process the ready objects.
 
  // Request objects in initial state in case of the queue contains multiple states
  aTopic := 'wfclassea/0/%';
 
  // This is just an estimation of the count, could be false if several processes run in parallel
  ProgressMax(MyQueue.CountWhere('queueTopic like %1',true,[aTopic]));
 
  // Get an enumerator from the queue
  foreach obj in MyQueue.Topic(aTopic) index indx do
   try
     // Progress indicator
     ProgressCount(indx);
 
     // Process this queue object
     // Separate the code to support straight through processing
     doProcess(obj);
 
     // delete the queue object
     obj.DeleteThisObject;
   except
     // on error requeue the object on an error topic
     // do not requeue on the same topic if the processus must be used in a straight through mode
     obj.ReQueue(StringReplace(obj.queueTopic,'/0/','/99/'));
   end;
  //
  ProgressFull();
end;;
 
Procedure MyProcessus.doInQueue;
//Procedure doInQueue;
var sel:TSelector;
begin
  // In this step we InQueue all objects which match the process criteria.
 
  // In this example the process criteria is objects in initial state
  // Use a selector for objects in initial state
  sel := WFClasseA.CreateSelector('unEtat=%1','',true,[WFCAState_Initial]);
  // Insert the objects in the queue using the selector, this is a transactional operation.
  // Topic is //classname/state/code/oid
  sel.InQueue('MyQueue','wfclassea/@unEtat/@unCode/@oid');
end;;
 
Procedure MyProcessus.doProcess(obj:TQueueObject);
//Procedure doProcess(obj:TQueueObject);
var inst:WFClasseA;
begin
  // One transaction by object
  withP transaction do
   begin
     // Get the actual object
     inst := obj.queueRef as WFClasseA;
     if Assigned(inst) then
      begin
        // this is the process core where business transformations are apply.
        // here we change the state of the object
        inst.unEtat.value := WFCAState_Etat2;
        ProgressMessage(obj.queueTopic);
      end;
   end;
end;;
 
Procedure MyProcessus.doTask;
//Procedure doTask;
begin
  // Step One : inqueue object to be processed.
  doInQueue;
  // Step two : Process objects from queue.
  doDeQueue;
end;;
 
Procedure MyProcessus.Execute;
//Procedure Execute;
var aMsg:TdbmQueueMessage; aTopic:string; idx:Integer;
begin
  if Assigned(UserContext.TaskContext) then
   begin
     UserContext.TaskContext.AddMessage('MyProcessus.Execute');
     if Assigned(UserContext.TaskContext.EventContext.receivedMsg) then
       begin
         aMsg := UserContext.TaskContext.EventContext.receivedMsg;
         UserContext.TaskContext.AddMessage(Format('Message :%s',[aMsg.queueTopic]]);
 
         // a TdbmQueueMessage is also a TQueueObject
         // Original properties of the MyQueue object has been copied into aMsg
         doProcess(aMsg);
       end
       else
       begin
         UserContext.TaskContext.AddMessage('No message, process as a regular task');
         doTask;
       end;
   end
   else doTask; // started by the UI
end;

Exécution par l'interface utilisateur

L'interface du processus est standard, l'exécution appelle la méthode Execute du processus.

image1.png

Lorsque le processus est exécuté manuellement, ou bien dans une tâche d'automate planifié, il enchaine ces deux étapes (doTask).

Exécution au fil de l'eau

Lorsque le processus est exécuté par une tâche d'automate déclenché par un évènement file d'attente il extrait du contexte de l'automate le message reçu de la file d'attente (Execute) et exécute directement le code de traitement (doProcess).

L'automate est configuré ainsi :

image2.png

Le paramétrage de la file d'attente utilisée par l'évènement file d'attente :

image3.png

La file d'attente est définie sur la classe file d'attente utilisée par le processus (MyQueue) et filtre sur le sujet correspondant aux objets à traiter (wfclassea/0/%)

Le code de la méthode Execute teste le contexte d'exécution de l'automate pour extraire le message à traiter :

Procedure MyProcessus.Execute;
//Procedure Execute;
var aMsg:TdbmQueueMessage; aTopic:string; idx:Integer;
begin
  if Assigned(UserContext.TaskContext) then
   begin
     UserContext.TaskContext.AddMessage('MyProcessus.Execute');
     if Assigned(UserContext.TaskContext.EventContext.receivedMsg) then
       begin
         aMsg := UserContext.TaskContext.EventContext.receivedMsg;
         UserContext.TaskContext.AddMessage(Format('Message :%s',[aMsg.queueTopic]]);
 
         // a TdbmQueueMessage is also a TQueueObject
         // Original properties of the MyQueue object has been copied into aMsg
         doProcess(aMsg);
       end
       else
       begin
         UserContext.TaskContext.AddMessage('No message, process as a regular task');
         doTask;
       end;
   end
   else doTask; // started by the UI
end;

Notez que le message reçu dans le contexte de la tâche d'automate n'est pas directement du type MyQueue mais du type TdbmSoredQueueMessage. C'est pour cette raison que le type de l'objet message passé à dbProcess n'est pas MyQueue mais TQueueObject.

Gestion des erreurs

Erreur en traitement par lot

Lorsque une erreur de traitement se produit, le code de deQueue réactive l'objet de la file tout en modifiant son sujet.

//Procedure doDeQueue;
begin
  ...
  foreach obj in MyQueue.Topic(aTopic) index indx do
   try
     ....
   except
     // on error requeue the object on an error topic
     // do not requeue on the same topic if the processus must be used in a straight through mode
     obj.ReQueue(StringReplace(obj.queueTopic,'/0/','/99/'));
   end;
end;
Tip-20px.png Tip : Dans la gestion des erreurs il faut être attentif à ne pas créer des situations de boucle sans fin. Dans le cadre d'une consommation par un énumérateur ce n'est pas possible car l'énumération est ordonnée sur queueID.

Réactiver l'objet de la file d'attente est un choix d'implémentation du processus, il serait aussi possible de supprimer cet objet.

En réactivant l'objet de la file d'attente sur un sujet différent il est possible de traiter ce objet dans un processus dédié aux erreurs.

En traitement au fil de l'eau

Dans ce cas c'est l'évènement file d'attente de la tâche d'automate qui va gérer l'erreur suivant le paramétrage défini dans la file d'attente.

image3.png

Outils personnels