KSquare Utilities
KKJobManager.cpp
Go to the documentation of this file.
1 #include "FirstIncludes.h"
2 
3 #include <fcntl.h>
4 #include <math.h>
5 #include <stdlib.h>
6 #include <stdio.h>
7 #include <cmath>
8 #include <fstream>
9 #include <iomanip>
10 #include <iostream>
11 #include <map>
12 #include <string>
13 #include <strstream>
14 #include <vector>
15 
16 #include "MemoryDebug.h"
17 
18 using namespace std;
19 
20 #include <sys/types.h>
21 #ifdef WIN32
22 #include <io.h>
23 #include <windows.h>
24 #else
25 //#include <sys/loadavg.h>
26 #include <unistd.h>
27 #endif
28 
29 
30 #include "KKBaseTypes.h"
31 #include "KKQueue.h"
32 #include "OSservices.h"
33 #include "RunLog.h"
34 #include "KKStr.h"
35 using namespace KKB;
36 
37 
38 
39 #include "KKJobManager.h"
40 using namespace KKJobManagment;
41 
42 
44  KKJob (j),
45 
46  cpuTimeLastReported (j.cpuTimeLastReported),
47  cpuTimeTotalUsed (j.cpuTimeTotalUsed),
48  dateTimeStarted (j.dateTimeStarted),
49  dateTimeEnded (j.dateTimeEnded),
50  dateTimeFirstOneFound (j.dateTimeFirstOneFound),
51  jobs (NULL),
52 
53  blockLevel (j.blockLevel),
54  lockFile (j.lockFile),
55  lockFileName (j.lockFileName),
56  lockFileOpened (j.lockFileOpened),
57  managerName (j.managerName),
58  expansionCount (j.expansionCount),
59  expansionFirstJobId (j.expansionFirstJobId),
60  nextJobId (j.nextJobId),
61  numJobsAtATime (j.numJobsAtATime),
62  procId (j.procId),
63  quitRunning (j.quitRunning),
64  restart (j.restart),
65  statusFileName (j.statusFileName),
66  statusFileNextByte (j.statusFileNextByte),
67  supportCompletedJobData (j.supportCompletedJobData)
68 
69 {
70  jobs = new KKJobList (*j.jobs);
71 }
72 
73 
74 
75 
76 
77 // Make sure the the _summaryResultsFileName is deleted before we start processing.
78 KKJobManager::KKJobManager (JobManagerPtr _manager, // Ptr to job that is managing this 'KKJobManager'
79  kkint32 _jobId,
80  kkint32 _parentId,
81  kkint32 _numPorcessesAllowed,
82  const KKStr& _managerName, // Name of this 'KKJobManager' ; sttaus and lock file will be based on it.
83  kkint32 _numJobsAtATime, // The number of jobs that can be allocatd at one time for a single process to execute.
84  RunLog& _log
85  ):
86  KKJob (_manager, _jobId, _parentId, _numPorcessesAllowed, _log),
87 
88  blockLevel (0),
89  cpuTimeLastReported (0.0),
90  cpuTimeTotalUsed (0.0),
91  dateTimeStarted (),
92  dateTimeEnded (),
93  dateTimeFirstOneFound (false),
94  expansionCount (0),
95  expansionFirstJobId (0),
96  jobs (NULL),
97  lockFile (-1),
98  lockFileName (),
99  lockFileOpened (false),
100  managerName (_managerName),
101  nextJobId (0),
102  numJobsAtATime (_numJobsAtATime),
103  procId (-1),
104  quitRunning (false),
105  restart (false),
106  statusFileName (),
107  statusFileNextByte (0),
108  supportCompletedJobData (false)
109 
110 {
111  log.Level (10) << "KKJobManager::KKJobManager ManagerId[" << JobId () << "]" << endl;
112  procId = osGetProcessId ();
113 
114  lockFileName = osGetRootName (managerName) + ".Lock";
115  statusFileName = osGetRootName (managerName) + ".Status";
116 
117  jobs = new KKJobList (this);
118 
119  log.Level (10) << "KKJobManager::KKJobManager Exiting" << endl;
120 }
121 
122 
123 
124 
125 
127 {
128  delete jobs;
129  //EndBlock ();
130 }
131 
132 
133 
134 void KKJobManager::InitilizeJobManager (bool& successful)
135 {
136  successful = true;
137  Block ();
138 
139  cpuTimeLastReported = osGetSystemTimeUsed ();
140 
141  if (!osFileExists (statusFileName))
142  {
143  try
144  {
145  LoadRunTimeData ();
146  }
147  catch (KKStr errorMsg)
148  {
149  log.Level (-1) << endl << endl
150  << "KKJobManager::InitilizeJobManager ***ERROR*** Load of 'LoadRunTimeData' Failed." << endl
151  << endl
152  << errorMsg << endl
153  << endl;
154  successful = false;
155  return;
156  }
157  StatusFileInitialize ();
158  }
159  else
160  {
161  StatusFileLoad ();
162  try
163  {
164  LoadRunTimeData ();
165  }
166  catch (KKStr errorMsg)
167  {
168  log.Level (-1) << endl << endl
169  << "KKJobManager::InitilizeJobManager ***ERROR*** Load of 'LoadRunTimeData' Failed." << endl
170  << endl
171  << errorMsg << endl
172  << endl;
173  successful = false;
174  return;
175  }
176  }
177 
178  EndBlock ();
179 } /* InitilizeJobManager */
180 
181 
182 
183 
184 const char* KKJobManager::JobType () const
185 {
186  return "KKJobManager";
187 }
188 
189 
190 
192 {
193  blockLevel++;
194  log.Level (20) << "KKJobManager::Block blockLevel[" << blockLevel << "]" << endl;
195  if (blockLevel > 1)
196  {
197  // We already have a Block established; so there is no need to establish it now.
198  return;
199  }
200 
201 
202  if (lockFileOpened)
203  {
204  // We have out Lock and EndLock calls out of order.
205  log.Level (-1) << endl << endl
206  << "KKJobManager::Block *** WE ALREADY HAVE A BLOCK ESTABLISHED ***." << endl
207  << endl;
208  return;
209  }
210 
211  kkint32 count = 0;
212 
213  do {
214  lockFile = open (lockFileName.Str (), O_WRONLY | O_CREAT | O_EXCL);
215  if (lockFile < 0)
216  {
217  count++;
218  float zed = (float)((procId + rand ()) % 10) + 2.0f;
219  log.Level (10) << "KKJobManager::Block - We are locked out[" << count << "] for [" << zed << "] secs." << endl;
220  osSleep (zed);
221  }
222  } while (lockFile < 0);
223 
224  if (count > 0)
225  log.Level (10) << "KKJobManager::Block Lock has been established." << endl;
226 
227  lockFileOpened = true;
228 
229  log.Level (20) << "KKJobManager::Block - Lock is Established." << endl;
230 } /* Block */
231 
232 
233 
234 
236 {
237  blockLevel--;
238 
239  #ifndef WIN32
240  kkint32 returnCd;
241  #endif
242 
243 
244  log.Level (20) << "KKJobManager::EndBlock - Ending Block blockLevel[" << blockLevel << "]" << endl;
245 
246  if (blockLevel > 0)
247  {
248  // We are still nested in Block Levels. must reach 'blockLevel == 0' before we can UnBlock.
249  return;
250  }
251 
252  if (!lockFileOpened)
253  {
254  log.Level (-1) << endl << endl << endl
255  << "KKJobManager::EndBlock *** Lock file is not opened ***" << endl;
256  return;
257  }
258 
259  close (lockFile);
260  lockFileOpened = false;
261 
262  #ifdef WIN32
263  if (!DeleteFile (lockFileName.Str ()))
264  {
265  DWORD fileAttributes = GetFileAttributes (lockFileName.Str ());
266  fileAttributes = FILE_ATTRIBUTE_NORMAL;
267  if (!SetFileAttributes (lockFileName.Str (), fileAttributes))
268  {
269  DWORD lastErrorNum = GetLastError ();
270  log.Level (-1) << "KKJobManager::EndBlock - *** ERROR *** Could not set Lock File to Normal." << endl;
271  }
272  else
273  {
274  if (!DeleteFile (lockFileName.Str ()))
275  {
276  DWORD lastErrorNum = GetLastError ();
277  log.Level (-1) << "KKJobManager::EndBlock - Error[" << (kkuint32)lastErrorNum << "] deleting Lock File." << endl;
278  }
279  }
280  }
281  #else
282  returnCd = unlink (lockFileName.Str ());
283  #endif
284 
285  log.Level (20) << "EndBlock - Unlocking" << endl;
286  return;
287 } /* EndBlock */
288 
289 
290 
291 
293 {
294  kkint32 jobIdToReturn = nextJobId;
295  nextJobId++;
296  return jobIdToReturn;
297 }
298 
299 
300 
301 
303 {
304  kkint32 expandedJobId = statusLineStr.ExtractTokenInt ("\t");
305  KKJobPtr j = jobs->LookUpByJobId (expandedJobId);
306  if (!j)
307  {
308  log.Level (-1) << endl << endl << endl
309  << "ProcessStatusLineJobStatusChange ***Error*** Could not locate Expanded" << endl
310  << endl
311  << " JobId[" << expandedJobId << "]" << endl
312  << endl;
313  EndBlock ();
315  exit (-1);
316  }
317 
318  KKStr statusStr = statusLineStr.ExtractToken2 ("\t");
319  statusStr.TrimLeft ();
320  statusStr.TrimRight ();
321  KKJob::JobStatus status = KKJob::JobStatusFromStr (statusStr);
322  if (status == jsNULL)
323  {
324  log.Level (-1) << endl << endl << endl
325  << "ProcessStatusLineJobStatusChange ***Error*** Invalid Status Specified" << endl
326  << endl
327  << " JobId[" << expandedJobId << "]" << endl
328  << " Status[" << statusStr << "]" << endl
329  << endl;
330  EndBlock ();
332  exit (-1);
333  }
334 
335  j->Status (status);
336 } /* ProcessStatusLineJobStatusChange */
337 
338 
339 
340 
342  istream& statusFile
343  )
344 {
345  if (ln.SubStrPart (0, 1) == "//")
346  {
347  // A coment line; we can ignore it.
348  return;
349  }
350 
351  KKStr statusStr (ln);
352  KKStr fieldName = statusStr.ExtractToken2 ("\t");
353  if (fieldName.Empty ())
354  {
355  // A empty line we will ignore it.
356  return;
357  }
358 
359  statusStr.TrimLeft ("\n\r\t ");
360  statusStr.TrimRight ("\n\r\t ");
361 
362  if (fieldName.CompareIgnoreCase ("JOB") == 0)
363  {
364  // We have a KKJob entr line; the next field determines JobType fllowed by parameters for that JobType constructor.
365  KKStr jobTypeName = fieldName = statusStr.ExtractToken2 ("\t");
366 
367  KKJobPtr j = KKJob::CallAppropriateConstructor (this, jobTypeName, statusStr);
368  KKJobPtr existingJob = jobs->LookUpByJobId (j->JobId ());
369  if (existingJob)
370  {
371  existingJob->ReFresh (*j);
372  delete j; j = NULL;
373  }
374  else
375  {
376  jobs->PushOnBack (j);
377  }
378  }
379 
380  else if (fieldName.EqualIgnoreCase ("CPUTIMEUSED"))
381  {
382  double cpuTimeUsed = statusStr.ExtractTokenDouble ("\t");
383  cpuTimeTotalUsed += cpuTimeUsed;
384  }
385 
386  else if (fieldName.EqualIgnoreCase ("CURRENTDATETIME"))
387  {
388  KKB::DateTime dateTime = KKB::DateTime (statusStr);
389  if (!dateTimeFirstOneFound)
390  {
391  dateTimeFirstOneFound = true;
392  dateTimeStarted = dateTime;
393  }
394  dateTimeEnded = dateTime;
395  }
396 
397  else if (fieldName.EqualIgnoreCase ("ExpansionCount"))
398  expansionCount = statusStr.ToInt ();
399 
400  else if (fieldName.EqualIgnoreCase ("ExpansionFirstJobId"))
401  expansionFirstJobId = statusStr.ToInt ();
402 
403  else if (fieldName.EqualIgnoreCase ("JobStatusChange"))
405 
406  else if (fieldName.EqualIgnoreCase ("NextJobId"))
407  nextJobId = statusStr.ExtractTokenInt ("\t");
408 
409  else if (fieldName.EqualIgnoreCase ("QuitRunning"))
410  quitRunning = true;
411 
412  else if (fieldName.EqualIgnoreCase ("Restart"))
413  restart = false;
414 
415  else if (fieldName.EqualIgnoreCase ("Status"))
416  status = KKJob::JobStatusFromStr (statusStr);
417 
418  else
419  {
420  log.Level (-1) << "KKJobManager::StatusFileProcessLine Invalid Field Name[" << fieldName << "]." << endl;
421  }
422 } /* StatusFileProcessLine */
423 
424 
425 
426 
427 void KKJobManager::StatusFileLoad ()
428 {
429  log.Level (10) << "KKJobManager::StatusFileLoad." << endl;
430 
431  statusFileNextByte = 0;
432  StatusFileRefresh ();
433 
434  log.Level (20) << "KKJobManager::StatusFileLoad Exiting." << endl;
435 } /* StatusFileLoad */
436 
437 
438 
439 void KKJobManager::StatusFileRefresh ()
440 {
441  // we only want to read in any new changes to the status file.
442  log.Level (10) << "KKJobManager::StatusFileRefresh statusFileName[" << statusFileName << "]" << endl;
443 
444 
445  ifstream* statusFile = new ifstream (statusFileName.Str ());
446  if (!statusFile->is_open ())
447  {
448  log.Level (-1) << endl
449  << "KKJobManager::LoadCurrentStatusFile ***ERROR*** Can not open Status File["
450  << statusFileName << "]."
451  << endl;
452  EndBlock ();
453  delete statusFile; statusFile = NULL;
454  osWaitForEnter ();
455  exit (-1);
456  }
457 
458  char buff[20480];
459  KKStr statusStr (512);
460 
461  if (statusFileNextByte >= 0)
462  statusFile->seekg (statusFileNextByte);
463 
464  //fseek (statusFile, statusFileNextByte, SEEK_SET);
465 
466  statusFile->getline (buff, sizeof (buff));
467  while (!statusFile->eof ())
468  {
469  statusStr = buff;
470  if (statusStr.SubStrPart (0, 4) == "<KKJob ")
471  {
472  ProcessJobXmlBlockOfText (statusStr, *statusFile);
473  }
474  else
475  {
476  StatusFileProcessLine (statusStr, *statusFile);
477  }
478 
479  long zed = statusFile->tellg ();
480  if (zed >= 0)
481  statusFileNextByte = zed;
482 
483  if (!statusFile->eof ())
484  statusFile->getline (buff, sizeof (buff));
485  }
486 
487  statusFile->close ();
488  delete statusFile;
489  statusFile = NULL;
490 
491  log.Level (20) << "KKJobManager::StatusFileRefresh Exiting." << endl;
492 } /* StatusFileRefresh */
493 
494 
495 
496 
497 void KKJobManager::ProcessJobXmlBlockOfText (const KKStr& startStr,
498  istream& i
499  )
500 {
501  if ((startStr.SubStrPart (0, 4) != "<KKJob ") || (startStr.LastChar () != '>'))
502  {
503  log.Level (-1) << endl
504  << "KKJobManager::ProcessJobXmlBlockOfText ***ERROR*** StartStr[" << startStr << "] is not a KKJob String." << endl
505  << endl;
506  return;
507  }
508 
509  KKStr s = startStr.SubStrPart (5);
510  s.TrimLeft ();
511  s.ChopLastChar ();
512 
513  KKStr jobTypeStr = "";
514  kkint32 jobId = -1;
515 
516 
517  VectorKKStr parameters = s.Split (',');
518  for (kkuint32 x = 0; x < parameters.size (); ++x)
519  {
520  KKStr parameterStr = parameters[x];
521  parameterStr.TrimLeft ();
522  parameterStr.TrimRight ();
523 
524  KKStr fieldName = parameterStr.ExtractToken2 ("=");
525  fieldName.TrimLeft (); fieldName.TrimRight ();
526 
527  KKStr fieldValue = parameterStr.ExtractToken2 ("=");
528  fieldValue.TrimLeft (); fieldValue.TrimRight ();
529 
530  if (fieldName.CompareIgnoreCase ("JobType") == 0)
531  jobTypeStr = fieldValue;
532 
533  else if (fieldName.CompareIgnoreCase ("JobId") == 0)
534  jobId = fieldValue.ToInt ();
535  }
536 
537 
538  if (jobTypeStr.Empty () || (jobId < 0))
539  {
540  log.Level (-1) << endl
541  << "KKJobManager::ProcessJobXmlBlockOfText ***ERROR*** StartStr[" << startStr << "]." << endl
542  << " JobType and/or JobId were not provided." << endl
543  << endl;
544  return;
545  }
546 
547 
548  KKJobPtr j = jobs->LookUpByJobId (jobId);
549  if (j == NULL)
550  {
551  // We do not have this job in memory yet. We will have to create it now.
552  KKStr emptyStatusStr = "JobId\t" + StrFormatInt (jobId, "ZZZZ0");
553  j = KKJob::CallAppropriateConstructor (this, jobTypeStr, emptyStatusStr);
554  }
555 
556 
558 } /* ProcessJobXmlBlockOfText */
559 
560 
561 
562 
563 
564 
565 
566 
567 ofstream* KKJobManager::StatusFileOpen (ios::openmode openMode)
568 {
569  log.Level (20) << "KKJobManager::StatusFileOpen." << endl;
570 
571  kkint32 openAttempts = 0;
572 
573  ofstream* statusFile = new ofstream (statusFileName.Str (), openMode);
574 
575  while ((!statusFile->is_open ()) && (openAttempts < 4))
576  {
577  openAttempts++;
578  osSleep (2.0f);
579  log.Level (10) << "StatusFileOpen Open Attempt[" << openAttempts << "]." << endl;
580  statusFile = new ofstream (statusFileName.Str (), openMode);
581  if (!statusFile->is_open ())
582  {
583  log.Level (0) << "StatusFileOpen - *** WARNING *** Can Not Open Status File["
584  << statusFileName.Str () << "] -Will Retry-."
585  << endl;
586  }
587  }
588 
589  if (!statusFile->is_open ())
590  {
591  log.Level (-1) << "StatusFileOpen ***ERROR*** Can not Create Status File[" << statusFileName.Str () << "]." << endl;
592  EndBlock ();
593  osWaitForEnter ();
594  exit (-1);
595  }
596 
597  log.Level (20) << "KKJobManager::StatusFileOpen Exiting" << endl;
598 
599  return statusFile;
600 } /* StatusFileOpen */
601 
602 
603 
604 
605 void KKJobManager::StatusFileWrite ()
606 {
607  log.Level (10) << "KKJobManager::StatusFileWrite" << endl;
608 
609  ofstream* statusFile = StatusFileOpen (ios::out);
610 
611  *statusFile << "// Date/Time [" << osGetLocalDateTime () << "]." << endl
612  << "//" << endl
613  << endl;
614 
615  *statusFile << "Status" << "\t" << KKJob::JobStatusToStr (status) << endl
616  << "NextJobId" << "\t" << nextJobId << endl
617  << "CurrentDateTime" << "\t" << osGetLocalDateTime () << endl
618  << "ExpansionCount" << "\t" << expansionCount << endl
619  << "ExpansionFirstJobId" << "\t" << expansionFirstJobId << endl
620  << endl;
621 
622  kkint32 x;
623  for (x = 0; x < jobs->QueueSize (); x++)
624  {
625  KKJobPtr j = jobs->IdxToPtr (x);
626  *statusFile << "KKJob" << "\t" << j->JobType () << "\t" << j->ToStatusStr () << endl;
627  }
628 
629  statusFile->flush ();
630  statusFile->close ();
631  delete statusFile;
632 
633  log.Level (10) << "KKJobManager::StatusFileWrite Exiting" << endl;
634 } /* StatusFileWrite */
635 
636 
637 
638 
639 
640 void KKJobManager::ReportCpuTimeUsed (ofstream* statusFile)
641 {
642  // While we have the status file open lets report CPU time used so far
643  double currentCpuTime = osGetSystemTimeUsed ();
644  double cpuTimeUsed = currentCpuTime - cpuTimeLastReported;
645  cpuTimeLastReported = currentCpuTime;
646  *statusFile << "CpuTimeUsed" << "\t" << cpuTimeUsed << "\t"
647  << "ProcId" << "\t" << procId << "\t"
648  << endl
649  << "CurrentDateTime" << "\t" << osGetLocalDateTime () << endl;
650 } /* ReportCpuTimeUsed */
651 
652 
653 
654 
655 
656 
657 void KKJobManager::StatusFileInitialize ()
658 {
659  log.Level (10) << "KKJobManager::InializeStatusFile" << endl;
660 
661  delete jobs;
662  jobs = new KKJobList (this);
663 
664  ofstream* statusFile = StatusFileOpen (ios::out);
665 
666  *statusFile << "// Date/Time [" << osGetLocalDateTime () << "]." << endl
667  << "//" << endl
668  << "//" << endl;
669 
670 
671  *statusFile << "Status" << "\t" << KKJob::JobStatusToStr (status) << endl
672  << "NextJobId" << "\t" << nextJobId << endl
673  << "CurrentDateTime" << "\t" << osGetLocalDateTime () << endl
674  << "ExpansionCount" << "\t" << expansionCount << endl
675  << "ExpansionFirstJobId" << "\t" << expansionFirstJobId << endl;
676 
677 
678  StatusFileInitialize (*statusFile); // Have derived classes do there initilization.
679 
680  delete jobs; jobs = NULL;
681  jobs = JobsCreateInitialSet ();
682 
683  if (jobs)
684  {
685  KKJobList::iterator idx;
686  for (idx = jobs->begin (); idx != jobs->end (); idx++)
687  {
688  KKJobPtr j = *idx;
689  *statusFile << "KKJob" << "\t" << j->JobType () << "\t" << j->ToStatusStr () << endl;
690  }
691  }
692  else
693  {
694  jobs = new KKJobList (this);
695  }
696 
697  statusFile->flush ();
698  statusFile->close ();
699  delete statusFile;
700 
701  log.Level (10) << "KKJobManager::InializeStatusFile Exiting" << endl;
702 } /* StatusFileInitialize */
703 
704 
705 
706 
707 
708 
710 {
711  kkint32 jobId = nextJobId;
712  nextJobId++;
713  return jobId;
714 }
715 
716 
717 
718 
720 {
721  status = p->status;
722 }
723 
724 
725 
726 
727 
729 {
730  KKStr statusStr (200); // PreAllocate 200 bytes
731 
732  statusStr << KKJob::ToStatusStr () << "\t"
733  << "Status" << "\t" << KKJob::JobStatusToStr (status) << "\t"
734  << "NextJobId" << "\t" << nextJobId << "\t"
735  << "ExpansionCount" << "\t" << expansionCount << "\t"
736  << "ExpansionFirstJobId" << "\t" << expansionFirstJobId;
737 
738  return statusStr;
739 } /* ToStatusStr */
740 
741 
742 
743 
744 
745 
746 
747 
748 void KKJobManager::ProcessRestart ()
749 {
750  log.Level (10) << "ProcessRestart" << endl;
751 
752  Block ();
753 
754  StatusFileLoad ();
755 
756  if (this->Status () != jsDone)
757  {
758  ofstream* statusFile = StatusFileOpen (ios::app);
759 
761  *statusFile << "ReStart" << endl;
762  *statusFile << "Status" << "\t" << StatusStr () << endl;
763 
764  KKJobList::iterator idx;
765  for (idx = jobs->begin (); idx != jobs->end (); idx++)
766  {
767  KKJobPtr j = *idx;
768  if (j->Status () == jsStarted)
769  {
771  *statusFile << "JobStatusChange" << "\t" << j->JobId () << "\t" << j->StatusStr () << endl;
772  }
773  }
774 
775  statusFile->flush ();
776  statusFile->close ();
777  delete statusFile;
778  }
779 
780  EndBlock ();
781 } /* ProcessRestart */
782 
783 
784 
786 {
787  Block ();
788  ofstream* statusFile = StatusFileOpen (ios::app);
789 
790  *statusFile << "QuitRunning" << endl;
791  quitRunning = true;
792 
793  statusFile->flush ();
794  statusFile->close ();
795  delete statusFile; statusFile = NULL;
796 
797  EndBlock ();
798 } /* SetQuitRunningFlag */
799 
800 
801 
802 
803 void KKJobManager::ProcessNextExpansion (ostream& o)
804 {
805  KKJobListPtr jobsJustCompleted = new KKJobList (this);
806  jobsJustCompleted->Owner (false);
807  {
808  // Add jobs completed since last expansion to this list.
809  KKJobList::iterator idx;
810  for (idx = jobs->begin (); idx != jobs->end (); idx++)
811  {
812  KKJobPtr j = *idx;
813  if (j->JobId () >= expansionFirstJobId)
814  jobsJustCompleted->PushOnBack (j);
815  }
816  }
817 
818  // Derived class will now peform expansion.
819  KKJobListPtr expandedJobs = JobsExpandNextSetOfJobs (jobsJustCompleted);
820  {
821  if (expandedJobs)
822  {
823  expandedJobs->Owner (false);
824  KKJobList::iterator idx;
825  for (idx = expandedJobs->begin (); idx != expandedJobs->end (); idx++)
826  {
827  KKJobPtr j = *idx;
828  if (j->JobId () < expansionFirstJobId)
829  expansionFirstJobId = j->JobId ();
830 
831  jobs->PushOnBack (j);
832  o << "KKJob" << "\t" << j->JobType () << "\t" << j->ToStatusStr () << endl;
833  }
834  }
835  delete expandedJobs;
836  expandedJobs = NULL;
837  }
838 
839  delete jobsJustCompleted;
840  jobsJustCompleted = NULL;
841 
842  o << "NextJobId" << "\t" << nextJobId << endl
843  << "ExpansionCount" << "\t" << expansionCount << endl
844  << "ExpansionFirstJobId" << "\t" << expansionFirstJobId << endl;
845 
846 } /* ProcessNextExpansion */
847 
848 
849 
850 
851 KKJobListPtr KKJobManager::GetNextSetOfJobs (KKJobListPtr completedJobs)
852 {
853  log.Level (20) << "KKJobManager::GetNextSetOfJobs." << endl;
854 
855  Block ();
856 
857  StatusFileRefresh ();
858 
859  if (completedJobs)
860  {
861  // We will first write out results of jobs that have been completed,
862  ofstream* statusFile = StatusFileOpen (ios::app);
863 
864  KKJobList::iterator idx;
865  for (idx = completedJobs->begin (); idx != completedJobs->end (); idx++)
866  {
867  KKJobPtr j = *idx;
868 
869  *statusFile << "KKJob" << "\t" << j->JobType () << "\t" << j->ToStatusStr () << endl;
870  if (supportCompletedJobData)
871  {
872  *statusFile << "<KKJob JobType=" << j->JobType () << ", " << "JobId=" << j->JobId () << ">" << endl;
873  j->CompletedJobDataWrite (*statusFile);
874  *statusFile << "</job>" << endl;
875  }
876 
877  KKJobPtr existingJob = jobs->LookUpByJobId (j->JobId ());
878  if (existingJob)
879  {
880  existingJob->ReFresh (*j);
881  }
882  else
883  {
884  log.Level (-1) << endl << endl << endl
885  << "GetNextSetOfJobs *** ERROR ***" << endl
886  << endl
887  << " Could not locate KKJob[" << j->JobId () << "]" << endl
888  << endl;
889  return NULL;
890  }
891  }
892 
893  statusFile->close ();
894  delete statusFile; statusFile = NULL;
895  }
896 
897 
898  KKJobListPtr jobsToExecute = new KKJobList (this);
899  jobsToExecute->Owner (false);
900 
901  if (!quitRunning)
902  {
903  ofstream* statusFile = StatusFileOpen (ios::app);
904 
905  KKJobPtr nextJob = jobs->LocateOpenJob ();
906 
907  if (!nextJob)
908  {
909  if (jobs->AreAllJobsDone ())
910  {
911  // There are no jobs to do; we will have to expand some existing jobs then
912  ProcessNextExpansion (*statusFile);
913  nextJob = jobs->LocateOpenJob ();
914  }
915  else
916  {
917  // There are still some jobs that are running. We are going to go to
918  // for now and try again later.
919  //
920  // By leaving "nextJob = NULL" we will drop strait through the rest of
921  // this method and return to the caller with 'jobsToExecute' empty
922  // signaling that it will need to sleep for a while before calling
923  // us again.
924  }
925  }
926 
927  while (nextJob && (jobsToExecute->QueueSize () < numJobsAtATime))
928  {
929  jobsToExecute->PushOnBack (nextJob);
930  nextJob->Status (jsStarted);
931  *statusFile << "JobStatusChange" << "\t" << nextJob->JobId () << "\t" << nextJob->StatusStr () << endl;
932  nextJob = jobs->LocateOpenJob ();
933  }
934 
935  statusFile->close ();
936  }
937 
938  if (jobsToExecute->QueueSize () < 1)
939  {
940  delete jobsToExecute;
941  jobsToExecute = NULL;
942  }
943 
944  EndBlock ();
945 
946  log.Level (20) << "KKJobManager::GetNextSetOfJobs Exiting." << endl;
947 
948  return jobsToExecute;
949 } /* GetNextSetOfJobs */
950 
951 
952 
953 
955 {
956  if (jobs)
957  {
958  return jobs->AreAllJobsDone ();
959  }
960  else
961  {
962  // Will assume false for now.
963  return false;
964  }
965 } /* AreAllJobsAreDone */
966 
967 
968 
969 
971 {
972  log.Level (10) << "KKJobManager::Run." << endl;
973 
974  bool keepOnRunning = true;
975 
976  KKJobListPtr executedJobs = NULL;
977  KKJobListPtr jobsToExecute = GetNextSetOfJobs (NULL);
978 
979  while (keepOnRunning && (!quitRunning))
980  {
981  if (jobsToExecute)
982  {
983  delete executedJobs; executedJobs = NULL;
984 
985  executedJobs = new KKJobList (this);
986  executedJobs->Owner (true);
987 
988  KKJobList::iterator idx;
989  for (idx = jobsToExecute->begin (); idx != jobsToExecute->end (); idx++)
990  {
991  KKJobPtr j = *idx;
992  j->ProcessNode ();
993  executedJobs->PushOnBack (j->Duplicate ());
994  }
995  delete jobsToExecute; jobsToExecute = NULL;
996  }
997  else
998  {
999  if (!(jobs->JobsStillRunning ()))
1000  {
1001  keepOnRunning = false;
1002  }
1003  else
1004  {
1005  // We will sleep for a bit until there are more jobs to run
1006  log.Level (10) << "KKJobManager::Run No jobs avaialble to run; will sleep a bit." << endl;
1007  osSleep ((float)(30 + rand () % 10));
1008  }
1009  }
1010 
1011  if (keepOnRunning)
1012  jobsToExecute = GetNextSetOfJobs (executedJobs);
1013 
1014  delete executedJobs; executedJobs = NULL;
1015  }
1016 
1017 
1018  Block ();
1019  StatusFileRefresh ();
1020  if ((!quitRunning) && (status != KKJob::jsDone))
1021  {
1022  if (status != KKJob::jsDone)
1023  {
1024  GenerateFinalResultsReport ();
1025 
1027 
1028  ofstream* statusFile = StatusFileOpen (ios::app);
1029 
1030  *statusFile << "Status" << "\t" << StatusStr () << endl;
1031 
1032  ReportCpuTimeUsed (statusFile);
1033 
1034  statusFile->close ();
1035  delete statusFile;
1036  }
1037  }
1038  EndBlock ();
1039 
1040 
1041  log.Level (10) << "KKJobManager::Run Exiting." << endl;
1042 } /* Run */
KKStr(kkint32 size)
Creates a KKStr object that pre-allocates space for &#39;size&#39; characters.
Definition: KKStr.cpp:655
KKJob * KKJobPtr
Definition: KKJob.h:75
static KKStr JobStatusToStr(JobStatus status)
Definition: KKJob.cpp:178
bool EqualIgnoreCase(const char *s2) const
Definition: KKStr.cpp:1257
void InitilizeJobManager(bool &successful)
Initialize the KKJobManager object.
virtual void ProcessNode()
Definition: KKJob.cpp:119
__int32 kkint32
Definition: KKBaseTypes.h:88
virtual void StatusFileProcessLine(const KKStr &ln, istream &statusFile)
KKStr & TrimRight(const char *whiteSpaceChars="\n\r\t ")
Definition: KKStr.cpp:1695
KKStr ExtractToken2(const char *delStr="\n\t\r ")
Extract first Token from the string.
Definition: KKStr.cpp:3026
void osWaitForEnter()
void ChopLastChar()
Definition: KKStr.cpp:1668
KKJobList(JobManagerPtr _manager)
Definition: KKJob.cpp:359
DateTime(const KKStr &s)
Definition: DateTime.cpp:1043
kkint32 ToInt() const
Definition: KKStr.cpp:3565
bool operator==(const char *rtStr) const
Definition: KKStr.cpp:1588
void Status(JobStatus _status)
Definition: KKJob.h:127
virtual KKJobPtr Duplicate() const
Definition: KKJob.cpp:112
void Update(JobManagerPtr p)
static JobStatus JobStatusFromStr(const KKStr &statusStr)
Definition: KKJob.cpp:198
void AddPrerequisites(VectorInt _prerequisites)
KKStr operator+(const char *right) const
Definition: KKStr.cpp:3986
unsigned __int32 kkuint32
Definition: KKBaseTypes.h:89
void osSleep(float numOfSecs)
static KKJobPtr CallAppropriateConstructor(JobManagerPtr _manager, const KKStr &_jobTypeName, const KKStr &_statusStr)
Definition: KKJob.cpp:145
KKStr & operator=(KKStr &&src)
Definition: KKStr.cpp:1369
bool operator!=(const char *rtStr) const
Definition: KKStr.cpp:1596
virtual const char * JobType() const
virtual void StatusFileProcessLineJobStatusChange(KKStr &statusLineStr)
char LastChar() const
Definition: KKStr.cpp:2007
KKTHread * KKTHreadPtr
KKStr operator+(const char *left, const KKStr &right)
Definition: KKStr.cpp:3976
double ExtractTokenDouble(const char *delStr)
Definition: KKStr.cpp:3180
virtual void CompletedJobDataRead(istream &i)
Works with &#39;WriteCompletedJobData&#39;; You use this to load in data written by &#39;WriteCompletedJobData&#39;.
Definition: KKJob.cpp:351
KKStr(const KKStr &str)
Copy Constructor.
Definition: KKStr.cpp:561
KKJob(JobManagerPtr _manager, kkint32 _jobId, kkint32 _parentId, kkint32 _numPorcessesAllowed, RunLog &_log)
kkint32 osGetProcessId()
JobStatus status
Definition: KKJob.h:224
void TrimLeft(const char *whiteSpaceChars="\n\r\t ")
Definition: KKStr.cpp:1745
bool Empty() const
Definition: KKStr.h:241
Responsable for keeping track of a list of jobs.
Definition: KKJobManager.h:29
KKStr SubStrPart(kkint32 firstChar, kkint32 lastChar) const
returns a SubString consisting of all characters starting at index &#39;firstChar&#39; and ending at &#39;lastInd...
Definition: KKStr.cpp:2802
KKJobPtr LocateOpenJob()
Definition: KKJob.cpp:420
kkint32 JobId() const
Definition: KKJob.h:118
KKJobManager(JobManagerPtr _manager, kkint32 _jobId, kkint32 _parentId, kkint32 _numPorcessesAllowed, const KKStr &_managerName, kkint32 _numJobsAtATime, RunLog &_log)
static KKStr Concat(const std::vector< std::string > &values)
Concatenates the list of &#39;std::string&#39; strings.
Definition: KKStr.cpp:1082
JobStatus Status() const
Definition: KKJob.h:123
DateTime(const DateTime &dateTime)
Definition: DateTime.cpp:1006
virtual void ReFresh(KKJob &j)
Definition: KKJob.cpp:163
KKStr StrFormatInt(kkint32 val, const char *mask)
Definition: KKStr.cpp:5004
void PushOnBack(KKJobPtr j)
Definition: KKJob.cpp:456
KKJobManager(const KKJobManager &j)
KKJobList * KKJobListPtr
Definition: KKJob.h:58
kkint32 ExtractTokenInt(const char *delStr)
Definition: KKStr.cpp:3129
double osGetSystemTimeUsed()
Returns the number of CPU seconds used by current process.
bool osFileExists(const KKStr &_fileName)
Definition: OSservices.cpp:568
KKStr & operator=(const KKStr &src)
Definition: KKStr.cpp:1390
Used for logging messages.
Definition: RunLog.h:49
KKJob(const KKJob &j)
Definition: KKJob.cpp:42
VectorKKStr Split(char del) const
Splits the string up into tokens using &#39;del&#39; as the separator returning them in a vector...
Definition: KKStr.cpp:3500
A framework for managing a large number of processes(Jobs) in a multi-cpu/ multi-o/s environment...
Definition: KKJob.h:56
KKStr SubStrPart(kkint32 firstChar) const
returns a SubString consisting of all characters starting at index &#39;firstChar&#39; until the end of the s...
Definition: KKStr.cpp:2780
kkint32 CompareIgnoreCase(const char *s2) const
summary>Compares to Strings and returns -1, 0, or 1, indicating if less than, equal, or greater.
Definition: KKStr.cpp:955
KKJobList(const KKJobList &jobs)
Definition: KKJob.cpp:370
KKJobManager * JobManagerPtr
Definition: KKJobManager.h:32
virtual KKStr ToStatusStr()
Definition: KKJob.cpp:261
KKStr osGetRootName(const KKStr &fullFileName)
DateTime & operator=(const DateTime &right)
Definition: DateTime.cpp:1231
KKJobPtr LookUpByJobId(kkint32 jobId)
Definition: KKJob.cpp:391