20 #include <sys/types.h> 46 cpuTimeLastReported (j.cpuTimeLastReported),
47 cpuTimeTotalUsed (j.cpuTimeTotalUsed),
48 dateTimeStarted
(j.dateTimeStarted
),
49 dateTimeEnded
(j.dateTimeEnded
),
50 dateTimeFirstOneFound (j.dateTimeFirstOneFound),
53 blockLevel (j.blockLevel),
54 lockFile (j.lockFile),
55 lockFileName
(j.lockFileName
),
56 lockFileOpened (j.lockFileOpened),
57 managerName
(j.managerName
),
58 expansionCount (j.expansionCount),
59 expansionFirstJobId (j.expansionFirstJobId),
60 nextJobId (j.nextJobId),
61 numJobsAtATime (j.numJobsAtATime),
63 quitRunning (j.quitRunning),
65 statusFileName
(j.statusFileName
),
66 statusFileNextByte (j.statusFileNextByte),
67 supportCompletedJobData (j.supportCompletedJobData)
82 const KKStr& _managerName,
86 KKJob (_manager
, _jobId
, _parentId
, _numPorcessesAllowed
, _log
),
89 cpuTimeLastReported (0.0),
90 cpuTimeTotalUsed (0.0),
93 dateTimeFirstOneFound (
false),
95 expansionFirstJobId (0),
99 lockFileOpened (
false),
100 managerName
(_managerName
),
102 numJobsAtATime (_numJobsAtATime),
107 statusFileNextByte (0),
108 supportCompletedJobData (
false)
111 log.Level (10) <<
"KKJobManager::KKJobManager ManagerId[" << JobId () <<
"]" << endl;
119 log.Level (10) <<
"KKJobManager::KKJobManager Exiting" << endl;
147 catch (
KKStr errorMsg)
149 log.Level (-1) << endl << endl
150 <<
"KKJobManager::InitilizeJobManager ***ERROR*** Load of 'LoadRunTimeData' Failed." << endl
157 StatusFileInitialize ();
166 catch (
KKStr errorMsg)
168 log.Level (-1) << endl << endl
169 <<
"KKJobManager::InitilizeJobManager ***ERROR*** Load of 'LoadRunTimeData' Failed." << endl
186 return "KKJobManager";
194 log.Level (20) <<
"KKJobManager::Block blockLevel[" << blockLevel <<
"]" << endl;
205 log.Level (-1) << endl << endl
206 <<
"KKJobManager::Block *** WE ALREADY HAVE A BLOCK ESTABLISHED ***." << endl
214 lockFile = open (lockFileName.Str (), O_WRONLY | O_CREAT | O_EXCL);
218 float zed = (
float)((procId + rand ()) % 10) + 2.0f;
219 log.Level (10) <<
"KKJobManager::Block - We are locked out[" << count <<
"] for [" << zed <<
"] secs." << endl;
222 }
while (lockFile < 0);
225 log.Level (10) <<
"KKJobManager::Block Lock has been established." << endl;
227 lockFileOpened =
true;
229 log.Level (20) <<
"KKJobManager::Block - Lock is Established." << endl;
244 log.Level (20) <<
"KKJobManager::EndBlock - Ending Block blockLevel[" << blockLevel <<
"]" << endl;
254 log.Level (-1) << endl << endl << endl
255 <<
"KKJobManager::EndBlock *** Lock file is not opened ***" << endl;
260 lockFileOpened =
false;
263 if (!DeleteFile (lockFileName.Str ()))
265 DWORD fileAttributes = GetFileAttributes (lockFileName.Str ());
266 fileAttributes = FILE_ATTRIBUTE_NORMAL;
267 if (!SetFileAttributes (lockFileName.Str (), fileAttributes))
269 DWORD lastErrorNum = GetLastError ();
270 log.Level (-1) <<
"KKJobManager::EndBlock - *** ERROR *** Could not set Lock File to Normal." << endl;
274 if (!DeleteFile (lockFileName.Str ()))
276 DWORD lastErrorNum = GetLastError ();
277 log.Level (-1) <<
"KKJobManager::EndBlock - Error[" << (kkuint32)lastErrorNum <<
"] deleting Lock File." << endl;
282 returnCd = unlink (lockFileName.Str ());
285 log.Level (20) <<
"EndBlock - Unlocking" << endl;
294 kkint32 jobIdToReturn = nextJobId;
296 return jobIdToReturn;
308 log.Level (-1) << endl << endl << endl
309 <<
"ProcessStatusLineJobStatusChange ***Error*** Could not locate Expanded" << endl
311 <<
" JobId[" << expandedJobId <<
"]" << endl
324 log.Level (-1) << endl << endl << endl
325 <<
"ProcessStatusLineJobStatusChange ***Error*** Invalid Status Specified" << endl
327 <<
" JobId[" << expandedJobId <<
"]" << endl
328 <<
" Status[" << statusStr <<
"]" << endl
383 cpuTimeTotalUsed += cpuTimeUsed;
389 if (!dateTimeFirstOneFound)
391 dateTimeFirstOneFound =
true;
392 dateTimeStarted
= dateTime;
394 dateTimeEnded
= dateTime;
398 expansionCount = statusStr
.ToInt ();
401 expansionFirstJobId = statusStr
.ToInt ();
420 log.Level (-1) <<
"KKJobManager::StatusFileProcessLine Invalid Field Name[" << fieldName <<
"]." << endl;
429 log.Level (10) <<
"KKJobManager::StatusFileLoad." << endl;
431 statusFileNextByte = 0;
432 StatusFileRefresh ();
434 log.Level (20) <<
"KKJobManager::StatusFileLoad Exiting." << endl;
442 log.Level (10) <<
"KKJobManager::StatusFileRefresh statusFileName[" << statusFileName <<
"]" << endl;
445 ifstream* statusFile =
new ifstream (statusFileName.Str ());
446 if (!statusFile->is_open ())
448 log.Level (-1) << endl
449 <<
"KKJobManager::LoadCurrentStatusFile ***ERROR*** Can not open Status File[" 450 << statusFileName <<
"]." 453 delete statusFile; statusFile = NULL;
461 if (statusFileNextByte >= 0)
462 statusFile->seekg (statusFileNextByte);
466 statusFile->getline (buff,
sizeof (buff));
467 while (!statusFile->eof ())
470 if (statusStr.SubStrPart (0, 4) ==
"<KKJob ")
472 ProcessJobXmlBlockOfText (statusStr, *statusFile);
476 StatusFileProcessLine (statusStr, *statusFile);
479 long zed = statusFile->tellg ();
481 statusFileNextByte = zed;
483 if (!statusFile->eof ())
484 statusFile->getline (buff,
sizeof (buff));
487 statusFile->close ();
491 log.Level (20) <<
"KKJobManager::StatusFileRefresh Exiting." << endl;
503 log.Level (-1) << endl
504 <<
"KKJobManager::ProcessJobXmlBlockOfText ***ERROR*** StartStr[" << startStr <<
"] is not a KKJob String." << endl
513 KKStr jobTypeStr =
"";
518 for (
kkuint32 x = 0; x < parameters.size (); ++x)
520 KKStr parameterStr = parameters[x];
531 jobTypeStr
= fieldValue;
538 if (jobTypeStr
.Empty () || (jobId < 0))
540 log.Level (-1) << endl
541 <<
"KKJobManager::ProcessJobXmlBlockOfText ***ERROR*** StartStr[" << startStr <<
"]." << endl
542 <<
" JobType and/or JobId were not provided." << endl
567 ofstream*
KKJobManager::StatusFileOpen (ios::openmode openMode)
569 log.Level (20) <<
"KKJobManager::StatusFileOpen." << endl;
573 ofstream* statusFile =
new ofstream (statusFileName.Str (), openMode);
575 while ((!statusFile->is_open ()) && (openAttempts < 4))
579 log.Level (10) <<
"StatusFileOpen Open Attempt[" << openAttempts <<
"]." << endl;
580 statusFile =
new ofstream (statusFileName.Str (), openMode);
581 if (!statusFile->is_open ())
583 log.Level (0) <<
"StatusFileOpen - *** WARNING *** Can Not Open Status File[" 584 << statusFileName.Str () <<
"] -Will Retry-." 589 if (!statusFile->is_open ())
591 log.Level (-1) <<
"StatusFileOpen ***ERROR*** Can not Create Status File[" << statusFileName.Str () <<
"]." << endl;
597 log.Level (20) <<
"KKJobManager::StatusFileOpen Exiting" << endl;
607 log.Level (10) <<
"KKJobManager::StatusFileWrite" << endl;
609 ofstream* statusFile = StatusFileOpen (ios::out);
611 *statusFile <<
"// Date/Time [" << osGetLocalDateTime () <<
"]." << endl
615 *statusFile <<
"Status" <<
"\t" << KKJob::JobStatusToStr (status) << endl
616 <<
"NextJobId" <<
"\t" << nextJobId << endl
617 <<
"CurrentDateTime" <<
"\t" << osGetLocalDateTime () << endl
618 <<
"ExpansionCount" <<
"\t" << expansionCount << endl
619 <<
"ExpansionFirstJobId" <<
"\t" << expansionFirstJobId << endl
623 for (x = 0; x < jobs->QueueSize (); x++)
626 *statusFile <<
"KKJob" <<
"\t" << j->JobType () <<
"\t" << j->ToStatusStr () << endl;
629 statusFile->flush ();
630 statusFile->close ();
633 log.Level (10) <<
"KKJobManager::StatusFileWrite Exiting" << endl;
640 void KKJobManager::ReportCpuTimeUsed (ofstream* statusFile)
644 double cpuTimeUsed = currentCpuTime - cpuTimeLastReported;
645 cpuTimeLastReported = currentCpuTime;
646 *statusFile <<
"CpuTimeUsed" <<
"\t" << cpuTimeUsed <<
"\t" 647 <<
"ProcId" <<
"\t" << procId <<
"\t" 649 <<
"CurrentDateTime" <<
"\t" << osGetLocalDateTime () << endl;
659 log.Level (10) <<
"KKJobManager::InializeStatusFile" << endl;
664 ofstream* statusFile = StatusFileOpen (ios::out);
666 *statusFile <<
"// Date/Time [" << osGetLocalDateTime () <<
"]." << endl
671 *statusFile <<
"Status" <<
"\t" << KKJob::JobStatusToStr (status) << endl
672 <<
"NextJobId" <<
"\t" << nextJobId << endl
673 <<
"CurrentDateTime" <<
"\t" << osGetLocalDateTime () << endl
674 <<
"ExpansionCount" <<
"\t" << expansionCount << endl
675 <<
"ExpansionFirstJobId" <<
"\t" << expansionFirstJobId << endl;
678 StatusFileInitialize (*statusFile);
680 delete jobs; jobs = NULL;
681 jobs = JobsCreateInitialSet ();
685 KKJobList::iterator idx;
686 for (idx = jobs->begin (); idx != jobs->end (); idx++)
689 *statusFile <<
"KKJob" <<
"\t" << j->JobType () <<
"\t" << j->ToStatusStr () << endl;
697 statusFile->flush ();
698 statusFile->close ();
701 log.Level (10) <<
"KKJobManager::InializeStatusFile Exiting" << endl;
734 <<
"NextJobId" <<
"\t" << nextJobId <<
"\t" 735 <<
"ExpansionCount" <<
"\t" << expansionCount <<
"\t" 736 <<
"ExpansionFirstJobId" <<
"\t" << expansionFirstJobId;
750 log.Level (10) <<
"ProcessRestart" << endl;
758 ofstream* statusFile = StatusFileOpen (ios::app);
761 *statusFile <<
"ReStart" << endl;
762 *statusFile <<
"Status" <<
"\t" << StatusStr () << endl;
764 KKJobList::iterator idx;
765 for (idx = jobs->begin (); idx != jobs->end (); idx++)
771 *statusFile <<
"JobStatusChange" <<
"\t" << j->JobId () <<
"\t" << j->StatusStr () << endl;
775 statusFile->flush ();
776 statusFile->close ();
788 ofstream* statusFile = StatusFileOpen (ios::app);
790 *statusFile <<
"QuitRunning" << endl;
793 statusFile->flush ();
794 statusFile->close ();
795 delete statusFile; statusFile = NULL;
806 jobsJustCompleted->Owner (
false);
809 KKJobList::iterator idx;
810 for (idx = jobs->begin (); idx != jobs->end (); idx++)
819 KKJobListPtr expandedJobs = JobsExpandNextSetOfJobs (jobsJustCompleted);
823 expandedJobs->Owner (
false);
824 KKJobList::iterator idx;
825 for (idx = expandedJobs->begin (); idx != expandedJobs->end (); idx++)
832 o <<
"KKJob" <<
"\t" << j->JobType () <<
"\t" << j->ToStatusStr () << endl;
839 delete jobsJustCompleted;
840 jobsJustCompleted = NULL;
842 o <<
"NextJobId" <<
"\t" << nextJobId << endl
843 <<
"ExpansionCount" <<
"\t" << expansionCount << endl
844 <<
"ExpansionFirstJobId" <<
"\t" << expansionFirstJobId << endl;
853 log.Level (20) <<
"KKJobManager::GetNextSetOfJobs." << endl;
857 StatusFileRefresh ();
862 ofstream* statusFile = StatusFileOpen (ios::app);
864 KKJobList::iterator idx;
865 for (idx = completedJobs->begin (); idx != completedJobs->end (); idx++)
869 *statusFile <<
"KKJob" <<
"\t" << j->JobType () <<
"\t" << j->ToStatusStr () << endl;
870 if (supportCompletedJobData)
872 *statusFile <<
"<KKJob JobType=" << j->JobType () <<
", " <<
"JobId=" << j->JobId () <<
">" << endl;
873 j->CompletedJobDataWrite (*statusFile);
874 *statusFile <<
"</job>" << endl;
884 log.Level (-1) << endl << endl << endl
885 <<
"GetNextSetOfJobs *** ERROR ***" << endl
887 <<
" Could not locate KKJob[" << j->JobId () <<
"]" << endl
893 statusFile->close ();
894 delete statusFile; statusFile = NULL;
899 jobsToExecute->Owner (
false);
903 ofstream* statusFile = StatusFileOpen (ios::app);
912 ProcessNextExpansion (*statusFile);
927 while (nextJob && (jobsToExecute->QueueSize () < numJobsAtATime))
929 jobsToExecute->PushOnBack (nextJob);
930 nextJob->Status (jsStarted);
931 *statusFile <<
"JobStatusChange" <<
"\t" << nextJob->JobId () <<
"\t" << nextJob->StatusStr () << endl;
932 nextJob = jobs->LocateOpenJob ();
935 statusFile->close ();
938 if (jobsToExecute->QueueSize () < 1)
940 delete jobsToExecute;
941 jobsToExecute = NULL;
946 log.Level (20) <<
"KKJobManager::GetNextSetOfJobs Exiting." << endl;
948 return jobsToExecute;
972 log.Level (10) <<
"KKJobManager::Run." << endl;
974 bool keepOnRunning =
true;
979 while (keepOnRunning && (!quitRunning))
983 delete executedJobs; executedJobs = NULL;
986 executedJobs->Owner (
true);
988 KKJobList::iterator idx;
989 for (idx = jobsToExecute->begin (); idx != jobsToExecute->end (); idx++)
995 delete jobsToExecute; jobsToExecute = NULL;
1001 keepOnRunning =
false;
1006 log.Level (10) <<
"KKJobManager::Run No jobs avaialble to run; will sleep a bit." << endl;
1007 osSleep ((
float)(30 + rand () % 10));
1012 jobsToExecute = GetNextSetOfJobs (executedJobs);
1014 delete executedJobs; executedJobs = NULL;
1019 StatusFileRefresh ();
1024 GenerateFinalResultsReport ();
1028 ofstream* statusFile = StatusFileOpen (ios::app);
1030 *statusFile <<
"Status" <<
"\t" << StatusStr () << endl;
1032 ReportCpuTimeUsed (statusFile);
1034 statusFile->close ();
1041 log.Level (10) <<
"KKJobManager::Run Exiting." << endl;
KKStr(kkint32 size)
Creates a KKStr object that pre-allocates space for 'size' characters.
static KKStr JobStatusToStr(JobStatus status)
bool EqualIgnoreCase(const char *s2) const
void InitilizeJobManager(bool &successful)
Initialize the KKJobManager object.
virtual void ProcessNode()
virtual void StatusFileProcessLine(const KKStr &ln, istream &statusFile)
KKStr & TrimRight(const char *whiteSpaceChars="\n\r\t ")
KKStr ExtractToken2(const char *delStr="\n\t\r ")
Extract first Token from the string.
KKJobList(JobManagerPtr _manager)
bool operator==(const char *rtStr) const
void Status(JobStatus _status)
kkint32 AllocateNextJobId()
virtual KKJobPtr Duplicate() const
void SetQuitRunningFlag()
void Update(JobManagerPtr p)
static JobStatus JobStatusFromStr(const KKStr &statusStr)
void AddPrerequisites(VectorInt _prerequisites)
KKStr operator+(const char *right) const
unsigned __int32 kkuint32
void osSleep(float numOfSecs)
static KKJobPtr CallAppropriateConstructor(JobManagerPtr _manager, const KKStr &_jobTypeName, const KKStr &_statusStr)
KKStr & operator=(KKStr &&src)
bool operator!=(const char *rtStr) const
virtual const char * JobType() const
virtual void StatusFileProcessLineJobStatusChange(KKStr &statusLineStr)
KKStr operator+(const char *left, const KKStr &right)
double ExtractTokenDouble(const char *delStr)
virtual void CompletedJobDataRead(istream &i)
Works with 'WriteCompletedJobData'; You use this to load in data written by 'WriteCompletedJobData'.
KKStr(const KKStr &str)
Copy Constructor.
KKJob(JobManagerPtr _manager, kkint32 _jobId, kkint32 _parentId, kkint32 _numPorcessesAllowed, RunLog &_log)
void TrimLeft(const char *whiteSpaceChars="\n\r\t ")
Responsable for keeping track of a list of jobs.
KKStr SubStrPart(kkint32 firstChar, kkint32 lastChar) const
returns a SubString consisting of all characters starting at index 'firstChar' and ending at 'lastInd...
KKJobManager(JobManagerPtr _manager, kkint32 _jobId, kkint32 _parentId, kkint32 _numPorcessesAllowed, const KKStr &_managerName, kkint32 _numJobsAtATime, RunLog &_log)
static KKStr Concat(const std::vector< std::string > &values)
Concatenates the list of 'std::string' strings.
DateTime(const DateTime &dateTime)
virtual void ReFresh(KKJob &j)
KKStr StrFormatInt(kkint32 val, const char *mask)
void PushOnBack(KKJobPtr j)
KKJobManager(const KKJobManager &j)
kkint32 ExtractTokenInt(const char *delStr)
virtual KKStr ToStatusStr()
double osGetSystemTimeUsed()
Returns the number of CPU seconds used by current process.
bool osFileExists(const KKStr &_fileName)
KKStr & operator=(const KKStr &src)
Used for logging messages.
VectorKKStr Split(char del) const
Splits the string up into tokens using 'del' as the separator returning them in a vector...
A framework for managing a large number of processes(Jobs) in a multi-cpu/ multi-o/s environment...
KKStr SubStrPart(kkint32 firstChar) const
returns a SubString consisting of all characters starting at index 'firstChar' until the end of the s...
kkint32 CompareIgnoreCase(const char *s2) const
summary>Compares to Strings and returns -1, 0, or 1, indicating if less than, equal, or greater.
KKJobList(const KKJobList &jobs)
KKJobManager * JobManagerPtr
virtual KKStr ToStatusStr()
KKStr osGetRootName(const KKStr &fullFileName)
DateTime & operator=(const DateTime &right)
KKJobPtr LookUpByJobId(kkint32 jobId)