AWS SDK for C++  1.8.154
AWS SDK for C++
Queue.h
Go to the documentation of this file.
1 
5 #pragma once
6 
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>
#include <utility>
12 
13 namespace Aws
14 {
15  namespace Queues
16  {
17  static const char* MEM_TAG = "Aws::Queues::Queue";
18 
23  template<typename MESSAGE_TYPE>
24  class Queue
25  {
26  typedef std::function<void(const Queue*, const MESSAGE_TYPE&, bool&)> MessageReceivedEventHandler;
27  typedef std::function<void(const Queue*, const MESSAGE_TYPE&)> MessageDeleteFailedEventHandler;
28  typedef std::function<void(const Queue*, const MESSAGE_TYPE&)> MessageDeleteSuccessEventHandler;
29  typedef std::function<void(const Queue*, const MESSAGE_TYPE&)> MessageSendFailedEventHandler;
30  typedef std::function<void(const Queue*, const MESSAGE_TYPE&)> MessageSendSuccessEventHandler;
31 
32  public:
39  Queue(unsigned pollingFrequency) :
40  m_continue(true), m_pollingFrequencyMs(pollingFrequency), m_pollingThread(nullptr)
41  {
42  }
43 
44  virtual ~Queue()
45  {
46  StopPolling();
47  }
48 
49  virtual MESSAGE_TYPE Top() const = 0;
50  virtual void Delete(const MESSAGE_TYPE&) = 0;
51  virtual void Push(const MESSAGE_TYPE&) = 0;
52 
58  void StartPolling()
59  {
60  if(!m_pollingThread)
61  {
62  m_continue = true;
63  m_pollingThread = Aws::MakeUnique<std::thread>(MEM_TAG, &Queue::Main, this);
64  }
65  }
66 
73  void StopPolling()
74  {
75  m_continue = false;
76  if(m_pollingThread)
77  {
78  m_pollingThread->join();
79  m_pollingThread = nullptr;
80  }
81  }
82 
83  inline void SetMessageReceivedEventHandler(const MessageReceivedEventHandler& messageHandler) { m_messageReceivedHandler = messageHandler; }
84  inline void SetMessageDeleteFailedEventHandler(const MessageDeleteFailedEventHandler& messageHandler) { m_messageDeleteFailedHandler = messageHandler; }
85  inline void SetMessageDeleteSuccessEventHandler(const MessageDeleteSuccessEventHandler& messageHandler) { m_messageDeleteSuccessHandler = messageHandler; }
86  inline void SetMessageSendFailedEventHandler(const MessageSendFailedEventHandler& messageHandler) { m_messageSendFailedHandler = messageHandler; }
87  inline void SetMessageSendSuccessEventHandler(const MessageSendSuccessEventHandler& messageHandler) { m_messageSendSuccessHandler = messageHandler; }
88 
89  inline void SetMessageReceivedEventHandler(MessageReceivedEventHandler&& messageHandler) { m_messageReceivedHandler = messageHandler; }
90  inline void SetMessageDeleteFailedEventHandler(MessageDeleteFailedEventHandler&& messageHandler) { m_messageDeleteFailedHandler = messageHandler; }
91  inline void SetMessageDeleteSuccessEventHandler(MessageDeleteSuccessEventHandler&& messageHandler) { m_messageDeleteSuccessHandler = messageHandler; }
92  inline void SetMessageSendFailedEventHandler(MessageSendFailedEventHandler&& messageHandler) { m_messageSendFailedHandler = messageHandler; }
93  inline void SetMessageSendSuccessEventHandler(MessageSendSuccessEventHandler&& messageHandler) { m_messageSendSuccessHandler = messageHandler; }
94 
95  inline const MessageReceivedEventHandler& GetMessageReceivedEventHandler() const { return m_messageReceivedHandler; }
96  inline const MessageDeleteFailedEventHandler& GetMessageDeleteFailedEventHandler() const { return m_messageDeleteFailedHandler; }
97  inline const MessageDeleteSuccessEventHandler& GetMessageDeleteSuccessEventHandler() const { return m_messageDeleteSuccessHandler; }
98  inline const MessageSendFailedEventHandler& GetMessageSendFailedEventHandler() const { return m_messageSendFailedHandler; }
99  inline const MessageSendSuccessEventHandler& GetMessageSendSuccessEventHandler() const { return m_messageSendSuccessHandler; }
100 
101  protected:
102  std::atomic<bool> m_continue;
103 
104  private:
105  void Main()
106  {
107  while(m_continue)
108  {
109  auto start = std::chrono::system_clock::now();
110  MESSAGE_TYPE topMessage = Top();
111  bool deleteMessage = false;
112 
113  auto& receivedHandler = GetMessageReceivedEventHandler();
114  if (receivedHandler)
115  {
116  receivedHandler(this, topMessage, deleteMessage);
117  }
118 
119  if (deleteMessage)
120  {
121  Delete(topMessage);
122  }
123 
124  if(m_continue)
125  {
126  auto stop = std::chrono::system_clock::now();
127  auto timeTaken = std::chrono::duration_cast<std::chrono::milliseconds>(stop - start);
128 
129  if (m_pollingFrequencyMs >= timeTaken.count())
130  {
131  std::this_thread::sleep_for(std::chrono::milliseconds(m_pollingFrequencyMs - timeTaken.count()));
132  }
133  }
134  }
135  }
136 
137  unsigned m_pollingFrequencyMs;
138  Aws::UniquePtr<std::thread> m_pollingThread;
139 
140  // Handlers
141  MessageReceivedEventHandler m_messageReceivedHandler;
142  MessageDeleteFailedEventHandler m_messageDeleteFailedHandler;
143  MessageDeleteSuccessEventHandler m_messageDeleteSuccessHandler;
144  MessageSendFailedEventHandler m_messageSendFailedHandler;
145  MessageSendSuccessEventHandler m_messageSendSuccessHandler;
146  };
147  }
148 }
Aws::Queues::Queue::Queue
Queue(unsigned pollingFrequency)
Definition: Queue.h:39
Queues_EXPORTS.h
Aws::Queues::Queue::SetMessageDeleteFailedEventHandler
void SetMessageDeleteFailedEventHandler(const MessageDeleteFailedEventHandler &messageHandler)
Definition: Queue.h:84
Aws::Queues::Queue::m_continue
std::atomic< bool > m_continue
Definition: Queue.h:102
Aws::Queues::Queue::SetMessageDeleteSuccessEventHandler
void SetMessageDeleteSuccessEventHandler(const MessageDeleteSuccessEventHandler &messageHandler)
Definition: Queue.h:85
Aws::Queues::Queue::SetMessageSendSuccessEventHandler
void SetMessageSendSuccessEventHandler(const MessageSendSuccessEventHandler &messageHandler)
Definition: Queue.h:87
Aws::Queues::Queue::SetMessageDeleteFailedEventHandler
void SetMessageDeleteFailedEventHandler(MessageDeleteFailedEventHandler &&messageHandler)
Definition: Queue.h:90
Aws::Queues::Queue::SetMessageReceivedEventHandler
void SetMessageReceivedEventHandler(MessageReceivedEventHandler &&messageHandler)
Definition: Queue.h:89
Aws::Queues::Queue::Push
virtual void Push(const MESSAGE_TYPE &)=0
Aws::Queues::Queue::GetMessageDeleteSuccessEventHandler
const MessageDeleteSuccessEventHandler & GetMessageDeleteSuccessEventHandler() const
Definition: Queue.h:97
Aws::Queues::Queue::GetMessageSendSuccessEventHandler
const MessageSendSuccessEventHandler & GetMessageSendSuccessEventHandler() const
Definition: Queue.h:99
Aws::Queues::Queue::Top
virtual MESSAGE_TYPE Top() const =0
Aws::Queues::Queue::SetMessageDeleteSuccessEventHandler
void SetMessageDeleteSuccessEventHandler(MessageDeleteSuccessEventHandler &&messageHandler)
Definition: Queue.h:91
Aws::Queues::Queue::SetMessageSendSuccessEventHandler
void SetMessageSendSuccessEventHandler(MessageSendSuccessEventHandler &&messageHandler)
Definition: Queue.h:93
Aws::Queues::Queue::~Queue
virtual ~Queue()
Definition: Queue.h:44
Aws::Queues::Queue
Definition: Queue.h:25
Aws::Queues::Queue::GetMessageSendFailedEventHandler
const MessageSendFailedEventHandler & GetMessageSendFailedEventHandler() const
Definition: Queue.h:98
Aws::Queues::Queue::StartPolling
void StartPolling()
Definition: Queue.h:58
Aws::UniquePtr
std::unique_ptr< T, Deleter< T > > UniquePtr
Definition: AWSMemory.h:249
Aws::Queues::Queue::GetMessageReceivedEventHandler
const MessageReceivedEventHandler & GetMessageReceivedEventHandler() const
Definition: Queue.h:95
Aws::Queues::MEM_TAG
static const char * MEM_TAG
Definition: Queue.h:17
Aws
Definition: AccessManagementClient.h:15
Aws::Queues::Queue::GetMessageDeleteFailedEventHandler
const MessageDeleteFailedEventHandler & GetMessageDeleteFailedEventHandler() const
Definition: Queue.h:96
Aws::Queues::Queue::SetMessageSendFailedEventHandler
void SetMessageSendFailedEventHandler(MessageSendFailedEventHandler &&messageHandler)
Definition: Queue.h:92
Aws::Queues::Queue::Delete
virtual void Delete(const MESSAGE_TYPE &)=0
Aws::Queues::Queue::SetMessageSendFailedEventHandler
void SetMessageSendFailedEventHandler(const MessageSendFailedEventHandler &messageHandler)
Definition: Queue.h:86
ClientConfiguration.h
Aws::Queues::Queue::StopPolling
void StopPolling()
Definition: Queue.h:73
Aws::Queues::Queue::SetMessageReceivedEventHandler
void SetMessageReceivedEventHandler(const MessageReceivedEventHandler &messageHandler)
Definition: Queue.h:83