AWS SDK for C++  0.14.3
AWS SDK for C++
Queue.h
Go to the documentation of this file.
1 /*
2  * Copyright 2010-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 #pragma once
16 
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>
#include <utility>
22 
23 namespace Aws
24 {
25  namespace Queues
26  {
// Allocation tag handed to Aws::MakeUnique when Queue::StartPolling creates the polling thread.
27  static const char* MEM_TAG = "Aws::Queues::Queue";
28 
33  template<typename MESSAGE_TYPE>
34  class Queue
35  {
36  typedef std::function<void(const Queue*, const MESSAGE_TYPE&, bool&)> MessageReceivedEventHandler;
37  typedef std::function<void(const Queue*, const MESSAGE_TYPE&)> MessageDeleteFailedEventHandler;
38  typedef std::function<void(const Queue*, const MESSAGE_TYPE&)> MessageDeleteSuccessEventHandler;
39  typedef std::function<void(const Queue*, const MESSAGE_TYPE&)> MessageSendFailedEventHandler;
40  typedef std::function<void(const Queue*, const MESSAGE_TYPE&)> MessageSendSuccessEventHandler;
41 
42  public:
49  Queue(unsigned pollingFrequency) :
50  m_continue(true), m_pollingFrequencyMs(pollingFrequency), m_pollingThread(nullptr)
51  {
52  }
53 
54  virtual ~Queue()
55  {
56  StopPolling();
57  }
58 
59  virtual MESSAGE_TYPE Top() const = 0;
60  virtual void Delete(const MESSAGE_TYPE&) = 0;
61  virtual void Push(const MESSAGE_TYPE&) = 0;
62 
68  void StartPolling()
69  {
70  if(!m_pollingThread)
71  {
72  m_continue = true;
73  m_pollingThread = Aws::MakeUnique<std::thread>(MEM_TAG, &Queue::Main, this);
74  }
75  }
76 
83  void StopPolling()
84  {
85  m_continue = false;
86  if(m_pollingThread)
87  {
88  m_pollingThread->join();
89  m_pollingThread = nullptr;
90  }
91  }
92 
93  inline void SetMessageReceivedEventHandler(const MessageReceivedEventHandler& messageHandler) { m_messageReceivedHandler = messageHandler; }
94  inline void SetMessageDeleteFailedEventHandler(const MessageDeleteFailedEventHandler& messageHandler) { m_messageDeleteFailedHandler = messageHandler; }
95  inline void SetMessageDeleteSuccessEventHandler(const MessageDeleteSuccessEventHandler& messageHandler) { m_messageDeleteSuccessHandler = messageHandler; }
96  inline void SetMessageSendFailedEventHandler(const MessageSendFailedEventHandler& messageHandler) { m_messageSendFailedHandler = messageHandler; }
97  inline void SetMessageSendSuccessEventHandler(const MessageSendSuccessEventHandler& messageHandler) { m_messageSendSuccessHandler = messageHandler; }
98 
99  inline void SetMessageReceivedEventHandler(MessageReceivedEventHandler&& messageHandler) { m_messageReceivedHandler = messageHandler; }
100  inline void SetMessageDeleteFailedEventHandler(MessageDeleteFailedEventHandler&& messageHandler) { m_messageDeleteFailedHandler = messageHandler; }
101  inline void SetMessageDeleteSuccessEventHandler(MessageDeleteSuccessEventHandler&& messageHandler) { m_messageDeleteSuccessHandler = messageHandler; }
102  inline void SetMessageSendFailedEventHandler(MessageSendFailedEventHandler&& messageHandler) { m_messageSendFailedHandler = messageHandler; }
103  inline void SetMessageSendSuccessEventHandler(MessageSendSuccessEventHandler&& messageHandler) { m_messageSendSuccessHandler = messageHandler; }
104 
105  inline const MessageReceivedEventHandler& GetMessageReceivedEventHandler() const { return m_messageReceivedHandler; }
106  inline const MessageDeleteFailedEventHandler& GetMessageDeleteFailedEventHandler() const { return m_messageDeleteFailedHandler; }
107  inline const MessageDeleteSuccessEventHandler& GetMessageDeleteSuccessEventHandler() const { return m_messageDeleteSuccessHandler; }
108  inline const MessageSendFailedEventHandler& GetMessageSendFailedEventHandler() const { return m_messageSendFailedHandler; }
109  inline const MessageSendSuccessEventHandler& GetMessageSendSuccessEventHandler() const { return m_messageSendSuccessHandler; }
110 
111  protected:
112  std::atomic<bool> m_continue;
113 
114  private:
115  void Main()
116  {
117  while(m_continue)
118  {
119  auto start = std::chrono::system_clock::now();
120  MESSAGE_TYPE topMessage = Top();
121  bool deleteMessage = false;
122 
123  auto& receivedHandler = GetMessageReceivedEventHandler();
124  if (receivedHandler)
125  {
126  receivedHandler(this, topMessage, deleteMessage);
127  }
128 
129  if (deleteMessage)
130  {
131  Delete(topMessage);
132  }
133 
134  if(m_continue)
135  {
136  auto stop = std::chrono::system_clock::now();
137  auto timeTaken = std::chrono::duration_cast<std::chrono::milliseconds>(stop - start);
138 
139  if (m_pollingFrequencyMs >= timeTaken.count())
140  {
141  std::this_thread::sleep_for(std::chrono::milliseconds(m_pollingFrequencyMs - timeTaken.count()));
142  }
143  }
144  }
145  }
146 
147  unsigned m_pollingFrequencyMs;
148  Aws::UniquePtr<std::thread> m_pollingThread;
149 
150  // Handlers
151  MessageReceivedEventHandler m_messageReceivedHandler;
152  MessageDeleteFailedEventHandler m_messageDeleteFailedHandler;
153  MessageDeleteSuccessEventHandler m_messageDeleteSuccessHandler;
154  MessageSendFailedEventHandler m_messageSendFailedHandler;
155  MessageSendSuccessEventHandler m_messageSendSuccessHandler;
156  };
157  }
158 }
const MessageReceivedEventHandler & GetMessageReceivedEventHandler() const
Definition: Queue.h:105
void StopPolling()
Definition: Queue.h:83
void SetMessageSendSuccessEventHandler(MessageSendSuccessEventHandler &&messageHandler)
Definition: Queue.h:103
void StartPolling()
Definition: Queue.h:68
void SetMessageReceivedEventHandler(const MessageReceivedEventHandler &messageHandler)
Definition: Queue.h:93
void SetMessageDeleteFailedEventHandler(MessageDeleteFailedEventHandler &&messageHandler)
Definition: Queue.h:100
void SetMessageSendFailedEventHandler(MessageSendFailedEventHandler &&messageHandler)
Definition: Queue.h:102
const MessageDeleteFailedEventHandler & GetMessageDeleteFailedEventHandler() const
Definition: Queue.h:106
void SetMessageSendSuccessEventHandler(const MessageSendSuccessEventHandler &messageHandler)
Definition: Queue.h:97
virtual ~Queue()
Definition: Queue.h:54
void SetMessageDeleteFailedEventHandler(const MessageDeleteFailedEventHandler &messageHandler)
Definition: Queue.h:94
virtual void Push(const MESSAGE_TYPE &)=0
const MessageDeleteSuccessEventHandler & GetMessageDeleteSuccessEventHandler() const
Definition: Queue.h:107
const MessageSendSuccessEventHandler & GetMessageSendSuccessEventHandler() const
Definition: Queue.h:109
void SetMessageSendFailedEventHandler(const MessageSendFailedEventHandler &messageHandler)
Definition: Queue.h:96
virtual void Delete(const MESSAGE_TYPE &)=0
static const char * MEM_TAG
Definition: Queue.h:27
const MessageSendFailedEventHandler & GetMessageSendFailedEventHandler() const
Definition: Queue.h:108
void SetMessageReceivedEventHandler(MessageReceivedEventHandler &&messageHandler)
Definition: Queue.h:99
Queue(unsigned pollingFrequency)
Definition: Queue.h:49
void SetMessageDeleteSuccessEventHandler(MessageDeleteSuccessEventHandler &&messageHandler)
Definition: Queue.h:101
std::unique_ptr< T, Deleter< T > > UniquePtr
Definition: AWSMemory.h:203
virtual MESSAGE_TYPE Top() const =0
void SetMessageDeleteSuccessEventHandler(const MessageDeleteSuccessEventHandler &messageHandler)
Definition: Queue.h:95
JSON (JavaScript Object Notation).
std::atomic< bool > m_continue
Definition: Queue.h:112