KSquare Utilities
MsgQueue.cpp
Go to the documentation of this file.
1 /* MsgQueue.cpp -- Implements a message queue that allows multiple threads to add and remove messages
2  * (KKStr instances) without corrupting memory.
3  * Copyright (C) 2011-2014 Kurt Kramer
4  * For conditions of distribution and use, see copyright notice in KKB.h
5  */
6 #include "FirstIncludes.h"
7 #if defined(WIN32)
8 #include <windows.h>
9 #else
10 #include <fcntl.h>
11 #include <semaphore.h>
12 #endif
13 
14 #include <errno.h>
15 #include <istream>
16 #include <iostream>
17 #include <queue>
18 #include <vector>
19 using namespace std;
20 
21 
22 #include "MsgQueue.h"
23 #include "KKException.h"
24 #include "GoalKeeperSimple.h"
25 #include "OSservices.h"
26 using namespace KKB;
27 
28 
29 MsgQueue::MsgQueue (const KKStr& _name): /* Name of buffer, must be unique */
30  gateKeeper (NULL),
31  memoryConsumed (0),
32  name (_name),
33  queue ()
34 {
35  GoalKeeperSimple::Create ("MsgQueue_" + name, gateKeeper);
36  memoryConsumed = sizeof (MsgQueue) + gateKeeper->MemoryConsumedEstimated ();
37 }
38 
39 
40 
42 {
43  while (this->queue.size () > 0)
44  {
45  KKStrPtr m = this->queue.front ();
46  this->queue.pop ();
47  delete m;
48  m = NULL;
49  }
50 
51  GoalKeeperSimple::Destroy (gateKeeper);
52  gateKeeper = NULL;
53 }
54 
55 
56 
57 
58 void MsgQueue::AddMsg (KKStrPtr msg)
59 {
60  if (msg == NULL)
61  {
62  KKStr errMsg;
63  errMsg << "MsgQueue::AddMsg == NULL";
64  cerr << std::endl << std::endl << errMsg << std::endl << std::endl;
65  throw KKException (errMsg);
66  }
67 
68  gateKeeper->StartBlock ();
69  queue.push (msg);
70  memoryConsumed = memoryConsumed + msg->MemoryConsumedEstimated ();
71  gateKeeper->EndBlock ();
72 } /* AddFrame */
73 
74 
75 
76 void MsgQueue::AddMsg (const KKStr& msg)
77 {
78  gateKeeper->StartBlock ();
79  queue.push (new KKStr (msg));
80  memoryConsumed = memoryConsumed + msg.MemoryConsumedEstimated ();
81  gateKeeper->EndBlock ();
82 } /* AddFrame */
83 
84 
85 
86 void MsgQueue::AddMsgs (const KKStrListPtr msgs,
87  bool takeOwnership
88  )
89 {
90  gateKeeper->StartBlock ();
91  KKStrList::iterator idx;
92  for (idx = msgs->begin (); idx != msgs->end (); ++idx)
93  {
94  KKStrPtr msg = *idx;
95  if (takeOwnership)
96  queue.push (msg);
97  else
98  queue.push (new KKStr (*msg));
99 
100  memoryConsumed = memoryConsumed + msg->MemoryConsumedEstimated ();
101  }
102  gateKeeper->EndBlock ();
103 }
104 
105 
106 
107 KKStrPtr MsgQueue::GetNextMsg ()
108 {
109  gateKeeper->StartBlock ();
110 
111  KKStrPtr msg = NULL;
112  if (queue.size () > 0)
113  {
114  msg = queue.front ();
115  queue.pop ();
116  memoryConsumed = memoryConsumed - msg->MemoryConsumedEstimated ();
117  }
118 
119  gateKeeper->EndBlock ();
120  return msg;
121 } /* GetNextMsg */
122 
123 
124 
125 KKStrListPtr MsgQueue::GetAllMsgs ()
126 {
127  KKStrListPtr result = new KKStrList ();
128  gateKeeper->StartBlock ();
129 
130  KKStrPtr msg = NULL;
131  while (queue.size () > 0)
132  {
133  msg = queue.front ();
134  queue.pop ();
135  result->PushOnBack (msg);
136  memoryConsumed = memoryConsumed - msg->MemoryConsumedEstimated ();
137  }
138 
139  gateKeeper->EndBlock ();
140 
141  return result;
142 } /* GetAllMsgs */
143 
144 
145 
147 {
148  kkint32 result = 0;
149  gateKeeper->StartBlock ();
150  result = memoryConsumed;
151  gateKeeper->EndBlock ();
152  return result;
153 } /* MemoryConsumedEstimated */
154 
155 
156 
158 {
159  KKStrPtr msg = NULL;
160  gateKeeper->StartBlock ();
161  if (queue.size () > 0)
162  {
163  msg = new KKStr (*(queue.back ()));
164  }
165  gateKeeper->EndBlock ();
166  return msg;
167 }
void AddMsg(KKStrPtr msg)
Take ownership of 'msg' and add to end of the queue.
Definition: MsgQueue.cpp:58
kkint32 MemoryConsumedEstimated() const
Definition: KKStr.cpp:766
__int32 kkint32
Definition: KKBaseTypes.h:88
kkint32 MemoryConsumedEstimated()
Returns an estimate of the amount of memory consumed in bytes by this instance.
Definition: MsgQueue.cpp:146
void AddMsg(const KKStr &msg)
Definition: MsgQueue.cpp:76
void AddMsgs(const KKStrListPtr msgs, bool takeOwnership)
Adds the contents of 'msgs' to the message queue and depending on 'takeOwnership' will either assume ...
Definition: MsgQueue.cpp:86
static void Destroy(volatile GoalKeeperSimplePtr &_goalKeeperInstance)
Destroys an existing instance of GoalKeeperSimple.
A simple/ light-weight implementation of critical section blocking.
void EndBlock()
Ends the block and allows other threads to pass through StartBlock.
KKStrPtr GetNextMsg()
Removes from the queue the oldest message added to the queue that has not been removed.
Definition: MsgQueue.cpp:107
KKStrListPtr GetAllMsgs()
Returns all messages that are currently in the queue.
Definition: MsgQueue.cpp:125
KKTHread * KKTHreadPtr
KKStr operator+(const char *left, const KKStr &right)
Definition: KKStr.cpp:3976
KKStr(const KKStr &str)
Copy Constructor.
Definition: KKStr.cpp:561
Will manage a buffer that will allow multiple threads to add and remove messages to a queue...
Definition: MsgQueue.h:19
static KKStr Concat(const std::vector< std::string > &values)
Concatenates the list of 'std::string' strings.
Definition: KKStr.cpp:1082
MsgQueue(const KKStr &_name)
Definition: MsgQueue.cpp:29
static void Create(const KKStr &_name, volatile GoalKeeperSimplePtr &_newGoalKeeper)
Create a GoalKeeperSimple object and avoid a race condition doing it.
void StartBlock()
Initiates a Block as long as another thread has not already locked this object.
KKStrPtr GetCopyOfLastMsg()
Returns a duplicate of the last string added to the message queue.
Definition: MsgQueue.cpp:157
KKException(const KKStr &_exceptionStr)
Definition: KKException.cpp:45
kkint32 MemoryConsumedEstimated() const