KSquare Utilities
KKJobManager.h
Go to the documentation of this file.
1 #ifndef _JOBMANAGER_
2 #define _JOBMANAGER_
3 
4 namespace KKJobManagment
5 {
6 }
7 
8 
9 #include "KKJob.h"
10 
11 
12 #include "RunLog.h"
13 
14 
15 /*!
16  \namespace KKJobManagment
17  \brief Namespace for the Job Management library.
18  */
19 
20 namespace KKJobManagment
21 {
22  /*!
23  \class KKJobManager
24  \brief Responsible for keeping track of a list of jobs.
25  \details See the application called RandomSplitsJobManager. It is the first application to use
26  this library; any class that is derived from this class is required to implement the
27  methods "StatusFileProcessLine", "StatusFileProcessLineJobStatusChange", and "ToStatusStr". The pure virtual members declared below ('Duplicate', 'JobsCreateInitialSet', 'JobsExpandNextSetOfJobs', 'GenerateFinalResultsReport', 'LoadRunTimeData' and 'StatusFileInitialize (ostream&)') must also be overridden.
28  */
29  class KKJobManager: public KKJob
30  {
31  public:
33 
34  KKJobManager (const KKJobManager& j);
35 
36  KKJobManager (JobManagerPtr _manager, /*!< Ptr to job that is managing this 'KKJobManager' */
37  kkint32 _jobId,
38  kkint32 _parentId,
39  kkint32 _numPorcessesAllowed,
40  const KKStr& _managerName, /*!< Name of this 'KKJobManager'; status and lock file will be based on it. */
41  kkint32 _numJobsAtATime, /*!< The number of jobs that can be allocated at one time for a single process to execute. */
42  RunLog& _log
43  );
44 
45  ~KKJobManager ();
46 
47 
48  /*!
49  \brief Initialize the KKJobManager object.
50  \details This is the first method that needs to be called right after the object is constructed.
51  */
52  void InitilizeJobManager (bool& successful);
53 
54 
55  virtual KKJobPtr Duplicate () const = 0; /*!< Create a duplicate instance. */
56 
57  virtual const char* JobType () const; /*!< Allows us to know which specific implementation of 'KKJob' an instance really is. */
58 
59 
60  bool SupportCompletedJobData () const {return supportCompletedJobData;}
61 
62  void SupportCompletedJobData (bool _supportCompletedJobData) {supportCompletedJobData = _supportCompletedJobData;}
63 
64 
65  double CpuTimeTotalUsed () const {return cpuTimeTotalUsed;}
66  kkint32 ExpansionCount () const {return expansionCount;}
67  KKJobListPtr Jobs () const {return jobs;}
68  const KKStr& ManagerName () const {return managerName;}
69  kkint32 NextJobId () const {return nextJobId;}
70  bool QuitRunning () const {return quitRunning;}
71 
72 
73  kkint32 AllocateNextJobId (); // Used to get the next available KKJob Id.
74 
75  bool AreAllJobsDone ();
76 
77  void Block (); // See the 'blockLevel' member, which is documented as incrementing with each call to 'Block'.
78 
79  void EndBlock (); // See the 'blockLevel' member, which is documented as decrementing with each call to 'EndBlock'.
80 
81  kkint32 GetNextJobId (); // Returns 'nextJobId' then increments it by '1'.
82 
83  void Restart ();
84 
85  void Run ();
86 
87  void SetQuitRunningFlag ();
88 
89  void Update (JobManagerPtr p);
90 
91  private:
92  virtual
93  KKJobListPtr JobsCreateInitialSet () = 0; // Derived classes use this method to create the initial set of jobs.
94 
95 
96  /**
97  *@brief KKJob manager calls this method whenever it is ready to start processing another group of jobs.
98  *@details Derived classes need to implement this method; it will be called when all jobs have been completed. The
99  * variable 'jobs' will contain a list of all jobs.
100  *@param[in,out] jobsJustCompletd - Contains a list of jobs that have been completed since the last time this
101  * method was called or 'JobsCreateInitialSet' was called.
102  *@returns List of new jobs that are to be processed. A NULL signifies that there are no more jobs to process.
103  */
104  virtual KKJobListPtr JobsExpandNextSetOfJobs (const KKJobListPtr jobsJustCompletd) = 0;
105 
106 
107  virtual void GenerateFinalResultsReport () = 0;
108 
109 
110 
111  /**
112  *@brief Load any run time data that will be needed.
113  *@details 'KKJobManager' will call this method just before it calls 'StatusFileInitialize' if
114  * there is no existing StatusFile; otherwise it will call it just after it loads the StatusFile.
115  */
116  virtual void LoadRunTimeData () = 0;
117 
118 
119  /**
120  *@brief In one (Block - EndBlock) sequence will update completed jobs and select the next set of jobs to process.
121  */
122  KKJobListPtr GetNextSetOfJobs (KKJobListPtr completedJobs);
123 
124 
125 
126  //****************************************************************************************
127  //* Status File Routines.
128 
129  void ReportCpuTimeUsed (ofstream* statusFile);
130 
131  void StatusFileInitialize ();
132 
133  virtual
134  void StatusFileInitialize (ostream& o) = 0;
135 
136 
137  ofstream* StatusFileOpen (ios::openmode openMode);
138 
139  void StatusFileLoad (); /*!< To load initial contents of status file. */
140 
141  void StatusFileRefresh (); /*!< Will read in any changes from status file since last call. */
142 
143  void StatusFileWrite (); /*!< Will rewrite the status file from scratch; the old one will be deleted. */
144 
145 
146  protected:
147  // The next 3 methods need to be implemented by any derived classes. Those derived classes also need to
148  // call the base class version of the method they override.
149 
150  virtual void StatusFileProcessLine (const KKStr& ln,
151  istream& statusFile
152  );
153 
154  virtual void StatusFileProcessLineJobStatusChange (KKStr& statusLineStr);
155 
156  virtual KKStr ToStatusStr (); /**< Derived classes should implement this method. They need to return a tab
157  * delimited string with all their parameters
158  * <FieldName1> <\t> <FieldValue1> <\t> <fieldName2> <\t> <FieldValue2> etc etc etc ...
159  * They should call the base class version of this method and return that string first.
160  * ex: return KKJobManager::ToStatusStr () + "\t" + "FieldName1" + "\t" + "Value1" + "\t" + "FieldName2" + "\t" + "Value2";
161  */
162 
163  private:
164  //****************************************************************************************
165  void ProcessNextExpansion (ostream& o);
166 
167  void ProcessRestart ();
168 
169  void ProcessJobXmlBlockOfText (const KKStr& startStr,
170  istream& i
171  );
172 
173 
174  double cpuTimeLastReported;
175  double cpuTimeTotalUsed;
176  KKB::DateTime dateTimeStarted;
177  KKB::DateTime dateTimeEnded;
178  bool dateTimeFirstOneFound; /**< Flag that indicates whether we have read the first
179  * instance of a 'CurrentDateTime' in the Status file.
180  */
181 
182 
183  KKJobListPtr jobs; /**< List of jobs that we are working with. We
184  * will be maintaining consistency with other
185  * parallel running processes through the
186  * 'status' file. As we make changes to each
187  * individual 'BinaryJob' we write them out to
188  * the status file. We periodically read from
189  * the status file to see if there have been
190  * changes made by other processors.
191  */
192 
193  kkint32 blockLevel; /**< Starts at 0; increments with each call to 'Block' and decrements with 'EndBlock' */
194  kkint32 lockFile;
195  KKStr lockFileName;
196  bool lockFileOpened;
197 
198  KKStr managerName;
199  bool supportCompletedJobData; /**< If set to 'true' will call 'WriteCompletedJobData' after each job is completed. */
200 
201  kkint32 expansionCount;
202  kkint32 expansionFirstJobId; /**< jobId of first job in the current expansion. */
203  kkint32 nextJobId;
204  kkint32 numJobsAtATime;
205  kkint32 procId; /**< Processor Id that O/S assigns to this thread. */
206  bool quitRunning; /**< If this flag is set true we are to terminate processing as soon as possible. */
207 
208  bool restart;
209 
210  KKStr statusFileName;
211  long statusFileNextByte; /**< Byte offset of next byte to read from status file. */
212 
213  }; /* KKJobManager */
214 
215 
217 
218  #define _JobManagerDefined_
219 
220 
221 
223  {
224  public:
225  JobManagerList (const KKStr& _rootDir);
226  ~JobManagerList ();
227 
228  private:
229  };
230 
231 } /* KKJobManagment */
232 
233 
234 
235 #endif
kkint32 NextJobId() const
Definition: KKJobManager.h:69
KKJob * KKJobPtr
Definition: KKJob.h:75
void InitilizeJobManager(bool &successful)
Initialize the KKJobManager object.
__int32 kkint32
Definition: KKBaseTypes.h:88
virtual void StatusFileProcessLine(const KKStr &ln, istream &statusFile)
KKJobListPtr Jobs() const
Definition: KKJobManager.h:67
kkint32 ExpansionCount() const
Definition: KKJobManager.h:66
KKJobManager * JobManagerPtr
Definition: KKJob.h:63
void Update(JobManagerPtr p)
void AddPrerequisites(VectorInt _prerequisites)
virtual KKJobPtr Duplicate() const =0
virtual const char * JobType() const
virtual void StatusFileProcessLineJobStatusChange(KKStr &statusLineStr)
bool SupportCompletedJobData() const
Definition: KKJobManager.h:60
KKTHread * KKTHreadPtr
Responsible for keeping track of a list of jobs.
Definition: KKJobManager.h:29
void SupportCompletedJobData(bool _supportCompletedJobData)
Definition: KKJobManager.h:62
KKJobManager(JobManagerPtr _manager, kkint32 _jobId, kkint32 _parentId, kkint32 _numPorcessesAllowed, const KKStr &_managerName, kkint32 _numJobsAtATime, RunLog &_log)
static KKStr Concat(const std::vector< std::string > &values)
Concatenates the list of 'std::string' strings.
Definition: KKStr.cpp:1082
const KKStr & ManagerName() const
Definition: KKJobManager.h:68
JobManagerList(const KKStr &_rootDir)
KKJobManager(const KKJobManager &j)
KKJobList * KKJobListPtr
Definition: KKJob.h:58
Used for logging messages.
Definition: RunLog.h:49
A framework for managing a large number of processes(Jobs) in a multi-cpu/ multi-o/s environment...
Definition: KKJob.h:56
KKJobManager * JobManagerPtr
Definition: KKJobManager.h:32
double CpuTimeTotalUsed() const
Definition: KKJobManager.h:65