KSquare Utilities
KKJob.cpp
Go to the documentation of this file.
1 #include "FirstIncludes.h"
2 
3 #include <stdlib.h>
4 #include <stdio.h>
5 
6 #include <fstream>
7 #include <iostream>
8 #include <map>
9 #include <vector>
10 
11 #ifdef WIN32
12 #include <io.h>
13 #include <windows.h>
14 #else
15 //#include <sys/loadavg.h>
16 #include <unistd.h>
17 #endif
18 
19 
20 #include "MemoryDebug.h"
21 
22 using namespace std;
23 
24 
25 
26 #include "KKBaseTypes.h"
27 #include "KKException.h"
28 #include "KKQueue.h"
29 #include "OSservices.h"
30 #include "RunLog.h"
31 #include "KKStr.h"
32 using namespace KKB;
33 
34 
35 
36 #include "KKJob.h"
37 #include "KKJobManager.h"
38 using namespace KKJobManagment;
39 
40 
41 
42 KKJob::KKJob (const KKJob& j):
43 
44  log (j.log),
45  manager (j.manager),
46  jobId (j.jobId),
51  status (j.status)
52 
53 {
54 }
55 
56 
57 
58 // To create a brand new job that has not been proceesed yet.
59 KKJob::KKJob (JobManagerPtr _manager,
60  kkint32 _jobId,
61  kkint32 _parentId,
62  kkint32 _numPorcessesAllowed,
63  RunLog& _log
64  ):
65 
66  manager (_manager),
67  jobId (_jobId),
68  parentId (_parentId),
69  numProcessors (0),
70  numPorcessesAllowed (_numPorcessesAllowed),
71  prerequisites (),
72  status (jsOpen),
73  log (_log)
74 
75 {
76 }
77 
78 
79 
80 
81 
83  manager (_manager),
84  jobId (-1),
85  parentId (-1),
86  numProcessors (0),
88  prerequisites (),
89  status (jsNULL),
90  log (_manager->Log ())
91 {
92 }
93 
94 
95 
96 
98 {
99 }
100 
101 
102 
103 
104 const char* KKJob::JobType () const
105 {
106  return "KKJob";
107 }
108 
109 
110 
111 
112 KKJobPtr KKJob::Duplicate () const
113 {
114  return new KKJob (*this);
115 }
116 
117 
118 
120 {
121  // This metjod should have been overridden by a derived class; if we end up here then
122  // no actual work will get done.
123  throw KKException ("KKJob::ProcessNode ***ERROR*** ProcessNode was not defined bydecendent class.");
124 }
125 
126 
127 
128 
129 typedef KKJobPtr (*ConstructorPtr)(const KKStr&);
130 
132 
133 
134 
135 void KKJob::RegisterConstructor (const KKStr& _name,
136  ConstructorPtr _constructor
137  )
138 {
139  registeredConstructors.insert (pair<KKStr, ConstructorPtr>(_name, _constructor));
140 }
141 
142 
143 
144 
146  const KKStr& _jobTypeName,
147  const KKStr& _statusStr
148  )
149 {
150  ConstructorIndex::iterator idx;
151  idx = registeredConstructors.find (_jobTypeName);
152  if (idx == registeredConstructors.end ())
153  return NULL;
154  KKJobPtr j = idx->second (_manager);
155  j->ProcessStatusStr (_statusStr);
156  return j;
157 } /* CallAppropriateConstructor */
158 
159 
160 
161 
162 
163 void KKJob::ReFresh (KKJob& j)
164 {
165  jobId = j.jobId;
166  parentId = j.parentId;
167  manager = j.manager;
168  status = j.status;
169 
173  status = j.status;
174 } /* ReFresh */
175 
176 
177 
178 KKStr KKJob::JobStatusToStr (JobStatus status)
179 {
180  if (status == jsOpen)
181  return "Open";
182 
183  if (status == jsStarted)
184  return "Started";
185 
186  if (status == jsDone)
187  return "Done";
188 
189  if (status == jsExpanded)
190  return "Expanded";
191 
192  return "";
193 } /* JobStatusToStr */
194 
195 
196 
197 
198 KKJob::JobStatus KKJob::JobStatusFromStr (const KKStr& statusStr)
199 {
200  if (statusStr.CompareIgnoreCase ("OPEN") == 0)
201  return jsOpen;
202 
203  if (statusStr.CompareIgnoreCase ("STARTED") == 0)
204  return jsStarted;
205 
206  if (statusStr.CompareIgnoreCase ("DONE") == 0)
207  return jsDone;
208 
209  if (statusStr.CompareIgnoreCase ("Expanded") == 0)
210  return jsExpanded;
211 
212  return jsNULL;
213 } /* JobStatusToStr */
214 
215 
216 
217 
219 {
220  return JobStatusToStr (status);
221 } /* StatusStr */
222 
223 
224 
226 {
227  if (prerequisites.size () < 1)
228  return "None";
229 
230  KKStr s (5 + prerequisites.size () * 5);
231  for (kkuint32 x = 0; x < prerequisites.size (); ++x)
232  {
233  if (x > 0)
234  s << ",";
235  s << prerequisites[x];
236  }
237  return s;
238 } /* PrerequisitesToStr */
239 
240 
241 
242 
244 {
245  prerequisites.clear ();
246 
247  if (s.CompareIgnoreCase ("None") != 0)
248  {
249  VectorKKStr fields = s.Split (',');
250  for (kkuint32 x = 0; x < fields.size (); ++x)
251  {
252  kkint32 p = fields[x].ToInt ();
253  prerequisites.push_back (p);
254  }
255  }
256 } /* PrerequisitesFromStr */
257 
258 
259 
260 
262 {
263  KKStr statusStr (200); // Preallocating 200 bytes.
264 
265  statusStr << "JobId" << "\t" << jobId << "\t"
266  << "ParentId" << "\t" << parentId << "\t"
267  << "Status" << "\t" << StatusStr () << "\t"
268  << "NumProcessors" << "\t" << numProcessors << "\t"
269  << "NumPorcessesAllowed" << "\t" << numPorcessesAllowed << "\t"
270  << "Prerequisites" << "\t" << PrerequisitesToStr ();
271 
272  return statusStr;
273 } /* ToStatusStr */
274 
275 
276 
277 
278 void KKJob::ProcessStatusStr (const KKStr& statusStr)
279 {
280  log.Level (30) << "KKJob::ProcessStatusStr[" << statusStr << "]" << endl;
281  KKStr fieldName;
282  KKStr fieldValue;
283 
284  VectorKKStr fields = statusStr.Split ('\t');
285  kkuint32 fieldNum = 0;
286 
287  while (fieldNum < fields.size ())
288  {
289  fieldName = fields[fieldNum];
290  fieldNum++;
291  if (fieldNum < fields.size ())
292  {
293  fieldValue = fields[fieldNum];
294  fieldNum++;
295  }
296  else
297  {
298  fieldValue = "";
299  }
300 
301  fieldName.Upper ();
302  fieldValue.TrimLeft ("\n\r\t ");
303  fieldValue.TrimRight ("\n\r\t ");
304 
305  if (fieldName.CompareIgnoreCase ("JOBID") == 0)
306  jobId = atoi (fieldValue.Str ());
307 
308  else if (fieldName.CompareIgnoreCase ("PARENTID") == 0)
309  parentId = atoi (fieldValue.Str ());
310 
311  else if (fieldName.CompareIgnoreCase ("STATUS") == 0)
312  status = JobStatusFromStr (fieldValue);
313 
314  else if (fieldName.CompareIgnoreCase ("NumProcessors") == 0)
315  numProcessors = fieldValue.ToInt ();
316 
317  else if (fieldName.CompareIgnoreCase ("NumPorcessesAllowed") == 0)
318  numPorcessesAllowed = fieldValue.ToInt ();
319 
320  else if (fieldName.CompareIgnoreCase ("Prerequisites") == 0)
321  PrerequisitesFromStr (fieldValue);
322 
323  else
324  {
325  ProcessStatusField (fieldName, fieldValue);
326  }
327  }
328 } /* ProcessStatusStr */
329 
330 
331 
332 
333 
334 void KKJob::ProcessStatusField (const KKStr& fieldName,
335  const KKStr& fieldValue
336  )
337 {
338  log.Level (-1) << "KKJob::ProcessStatusField Invalid Field Name[" << fieldName << "]." << endl;
339 } /* ProcessStatusField */
340 
341 
342 
343 
344 void KKJob::CompletedJobDataWrite (ostream& o)
345 {
346 }
347 
348 
349 
350 // Works with 'WriteCompletedJobData'; You use this to load in data written by 'WriteCompletedJobData'
351 void KKJob::CompletedJobDataRead (istream& i)
352 {
353 }
354 
355 
356 
357 
358 
360 
361  KKQueue<KKJob> (true),
362  log (_manager->Log ()),
363  manager (_manager)
364 
365 {
366 }
367 
368 
369 
371  KKQueue<KKJob> (jobs.Owner ()),
372  jobIdLookUpTable (),
374  log (jobs.log),
375  manager (jobs.manager)
376 
377 {
378  KKJobList::const_iterator idx;
379  for (idx = jobs.begin (); idx != jobs.end (); idx++)
380  {
381  KKJobPtr j = *idx;
382  if (Owner ())
383  PushOnBack (j->Duplicate ());
384  else
385  PushOnBack (j);
386  }
387 }
388 
389 
390 
391 KKJobPtr KKJobList::LookUpByJobId (kkint32 jobId)
392 {
393  jobIdLookUpTableIdx = jobIdLookUpTable.find (jobId);
394  if (jobIdLookUpTableIdx == jobIdLookUpTable.end ())
395  return NULL;
396 
397  return jobIdLookUpTableIdx->second;
398 } /* LookUpByJobId */
399 
400 
401 
402 
403 
404 bool KKJobList::AllPrequisitesDone (KKJobPtr job)
405 {
406  const VectorInt& p = job->Prerequisites ();
407  for (kkuint32 z = 0; z < p.size (); ++z)
408  {
409  kkint32 jobId = p[z];
410  KKJobPtr pj = LookUpByJobId (jobId);
411  if ((pj->Status () != KKJob::jsDone) && (pj->Status () != KKJob::jsExpanded))
412  return false;
413  }
414  return true;
415 } /* AllPrequisitesDone */
416 
417 
418 
419 
421 {
422  kkint32 x;
423  for (x = 0; x < QueueSize (); x++)
424  {
425  KKJobPtr j = IdxToPtr (x);
426  if (j->Status () == KKJob::jsOpen)
427  return j;
428  }
429 
430  return NULL;
431 } /* LocateOpenJob */
432 
433 
434 
435 
437 {
438  kkint32 x;
439  for (x = 0; x < QueueSize (); ++x)
440  {
441  KKJobPtr j = IdxToPtr (x);
443  return false;
444  }
445 
446  return true;
447 } /* AreAllJobsAreDone */
448 
449 
450 
451 
452 
453 
454 
455 
456 void KKJobList::PushOnBack (KKJobPtr j)
457 {
458  jobIdLookUpTableIdx = jobIdLookUpTable.find (j->JobId ());
459  if (jobIdLookUpTableIdx != jobIdLookUpTable.end ())
460  {
461  log.Level (-1) << endl
462  << endl
463  << "KKJobList::PushOnBack ***ERROR***" << endl
464  << endl
465  << "KKJobList::PushOnBack Duplicate JobId[" << j->JobId () << "]" << endl
466  << endl;
467  osWaitForEnter ();
468  exit (-1);
469  }
470 
471  KKQueue<KKJob>::PushOnBack (j);
472  jobIdLookUpTable.insert (JobIdLookUpTablePair (j->JobId (), j));
473 
474  return;
475 } /* PushOnBack */
476 
477 
478 
479 
480 
481 
483 {
484  KKJobList::iterator idx;
485  for (idx = begin (); idx != end (); ++idx)
486  {
487  KKJobPtr j = *idx;
488  if (j->Status () == KKJob::jsStarted)
489  return true;
490  }
491  return false;
492 } /* JobsStillRunning */
KKStr(kkint32 size)
Creates a KKStr object that pre-allocates space for 'size' characters.
Definition: KKStr.cpp:655
JobManagerPtr manager
Definition: KKJob.h:211
KKJob * KKJobPtr
Definition: KKJob.h:75
kkint32 numPorcessesAllowed
Definition: KKJob.h:216
static KKStr JobStatusToStr(JobStatus status)
Definition: KKJob.cpp:178
bool AllPrequisitesDone(KKJobPtr job)
Definition: KKJob.cpp:404
virtual void ProcessNode()
Definition: KKJob.cpp:119
__int32 kkint32
Definition: KKBaseTypes.h:88
void PrerequisitesFromStr(const KKStr &s)
Definition: KKJob.cpp:243
KKJobList(JobManagerPtr _manager)
Definition: KKJob.cpp:359
const VectorInt & Prerequisites() const
Definition: KKJob.h:121
std::vector< int > VectorInt
Definition: KKBaseTypes.h:138
KKJobManager * JobManagerPtr
Definition: KKJob.h:63
static void RegisterConstructor(const KKStr &_name, ConstructorPtr _constructor)
Definition: KKJob.cpp:135
KKStr StatusStr() const
Definition: KKJob.cpp:218
virtual KKJobPtr Duplicate() const
Definition: KKJob.cpp:112
static JobStatus JobStatusFromStr(const KKStr &statusStr)
Definition: KKJob.cpp:198
void AddPrerequisites(VectorInt _prerequisites)
virtual void CompletedJobDataWrite(ostream &o)
Write out completed job results to status file.
Definition: KKJob.cpp:344
void ProcessStatusStr(const KKStr &statusStr)
Definition: KKJob.cpp:278
unsigned __int32 kkuint32
Definition: KKBaseTypes.h:89
static KKJobPtr CallAppropriateConstructor(JobManagerPtr _manager, const KKStr &_jobTypeName, const KKStr &_statusStr)
Definition: KKJob.cpp:145
KKTHread * KKTHreadPtr
virtual void CompletedJobDataRead(istream &i)
Works with 'CompletedJobDataWrite'; you use this to load in data written by 'CompletedJobDataWrite'.
Definition: KKJob.cpp:351
KKJob(JobManagerPtr _manager, kkint32 _jobId, kkint32 _parentId, kkint32 _numPorcessesAllowed, RunLog &_log)
JobStatus status
Definition: KKJob.h:224
virtual void ProcessStatusField(const KKStr &fieldName, const KKStr &fieldValue)
Implementation specific field processing.
Definition: KKJob.cpp:334
KKJobPtr LocateOpenJob()
Definition: KKJob.cpp:420
static KKStr Concat(const std::vector< std::string > &values)
Concatenates the list of 'std::string' strings.
Definition: KKStr.cpp:1082
kkint32 parentId
Definition: KKJob.h:210
KKStr PrerequisitesToStr() const
Definition: KKJob.cpp:225
JobStatus Status() const
Definition: KKJob.h:123
virtual void ReFresh(KKJob &j)
Definition: KKJob.cpp:163
void PushOnBack(KKJobPtr j)
Definition: KKJob.cpp:456
virtual const char * JobType() const
Definition: KKJob.cpp:104
VectorInt prerequisites
Definition: KKJob.h:220
KKException(const char *_exceptionStr)
Definition: KKException.cpp:38
Used for logging messages.
Definition: RunLog.h:49
KKJob(const KKJob &j)
Definition: KKJob.cpp:42
VectorKKStr Split(char del) const
Splits the string up into tokens using 'del' as the separator, returning them in a vector...
Definition: KKStr.cpp:3500
A framework for managing a large number of processes(Jobs) in a multi-cpu/ multi-o/s environment...
Definition: KKJob.h:56
KKJob(JobManagerPtr _manager)
Definition: KKJob.cpp:82
kkint32 numProcessors
Definition: KKJob.h:212
RunLog & Log()
Definition: KKJob.h:122
kkint32 CompareIgnoreCase(const char *s2) const
Compares two strings, ignoring case, and returns -1, 0, or 1, indicating less than, equal, or greater.
Definition: KKStr.cpp:955
KKJobList(const KKJobList &jobs)
Definition: KKJob.cpp:370
virtual KKStr ToStatusStr()
Definition: KKJob.cpp:261
static ConstructorIndex registeredConstructors
Definition: KKJob.h:100
KKJobPtr LookUpByJobId(kkint32 jobId)
Definition: KKJob.cpp:391