AWS SDK for C++  0.12.9
AWS SDK for C++
FairBoundedResourceManager.h
Go to the documentation of this file.
1 
2 /*
3  * Copyright 2010-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License").
6  * You may not use this file except in compliance with the License.
7  * A copy of the License is located at
8  *
9  * http://aws.amazon.com/apache2.0
10  *
11  * or in the "license" file accompanying this file. This file is distributed
12  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13  * express or implied. See the License for the specific language governing
14  * permissions and limitations under the License.
15  */
16 
17 #pragma once
18 
20 
22 
23 #include <algorithm>
24 #include <atomic>
25 #include <condition_variable>
26 #include <functional>
27 #include <mutex>
28 
29 namespace Aws
30 {
31 namespace Transfer
32 {
33 
34 class WaitingBufferRequester;
35 
/*
Policy deciding when a resource request counts as fulfilled.

NOTE(review): the enum head and its enumerators were missing from this listing and have been reconstructed from the
names used elsewhere in the file; the enumerator order is an assumption - confirm against the original header.
*/
enum class ResourceWaitPolicy
{
    AT_LEAST_ONE_AVAILABLE, // partially-fillable: presumably a request may complete with fewer resources than it asked for
    ALL_AVAILABLE           // default policy of FairBoundedResourceManager's constructor
};
41 
// Allocation tag handed to Aws::MakeShared when AcquireResourcesInternal creates a waiter record.
42 static const char* RESOURCE_MANAGER_ALLOCATION_TAG = "FairBoundedResourceManager";
43 
44 /*
45 A thread-safe resource manager that guarantees fairness and allows several different wait policies.
46 
47 C++ concurrency primitives do not provide fairness by default, so we have to implement it ourselves
48 
49 type T requirements: copyable, self-cleaning
50 
51 The copyable requirement could be relaxed to just movable
52 */
// NOTE(review): this listing appears to have lost the class-head line that belongs between the template header and
// the opening brace, as well as the declarations of the ResourceListType and WaiterListType aliases that the members
// below rely on - confirm against the original header before compiling.
53 template< typename T >
55 {
56  public:
57 
 // Callable that produces one new resource; invoked once per resource when the pool is filled or grown.
58  using ResourceFactoryType = std::function< T(void) >;
60 
 // Creates the pool. resourceCount is raised to at least 1 by the constructor body.
61  FairBoundedResourceManager(ResourceFactoryType resourceFactory, uint32_t resourceCount, ResourceWaitPolicy waitPolicy = ResourceWaitPolicy::ALL_AVAILABLE);
62  virtual ~FairBoundedResourceManager();
63 
 // Both entry points forward to AcquireResourcesInternal; the last argument (returnInsteadOfWait) is false for
 // AcquireResources, so it waits when the request cannot be served, and true for TryAcquireResources, so it returns.
64  virtual void AcquireResources(uint32_t resourceCount, ResourceListType& acquiredResources) override { AcquireResourcesInternal(resourceCount, acquiredResources, false); }
65  virtual void TryAcquireResources(uint32_t resourceCount, ResourceListType& acquiredResources) override { AcquireResourcesInternal(resourceCount, acquiredResources, true); }
66 
 // Returns resources to the pool (the passed list is emptied) and wakes any waiters that became fulfilled.
67  virtual void ReleaseResources(ResourceListType& resources) override;
68 
 // Requests a new pool size; a value of 0 is ignored by the implementation.
69  virtual void AdjustResourceCount(uint32_t m_resourceCount) override;
70 
71  // Testing interface
 // Number of requests currently queued in m_waiters (read under m_resourcesMutex).
72  size_t GetWaiterCount();
73 
74  private:
75 
 // Bookkeeping record for one blocked acquire call; queued in m_waiters until it is fulfilled.
76  class WaitingResourceRequester
77  {
78  public:
79  WaitingResourceRequester(uint32_t resourceCount, ResourceListType& acquiredResources) :
80  m_resourceList(&acquiredResources),
81  m_requestedCount(resourceCount),
82  m_readySignal(),
83  m_done(false)
84  {}
85 
 // Points at the caller's output list; resources are pushed into it while the caller is blocked.
86  ResourceListType *m_resourceList;
 // Target size for *m_resourceList; may be lowered when the pool is shrunk.
87  uint32_t m_requestedCount;
 // Signalled by FulfillRequests once m_done has been set.
88  std::condition_variable m_readySignal;
89  std::atomic<bool> m_done; // atomic so that changes to m_done are guaranteed visible to threads waiting on m_readySignal
90  };
91 
93  using WaiterQueueType = Aws::Deque< std::shared_ptr< WaitingResourceRequester > >; // A regular queue can't be used by std:: algorithms since it lacks iterators
94 
 // Discards surplus free resources while m_currentResourceCount exceeds m_desiredResourceCount; caller holds the mutex.
95  void ShrinkResourcePool();
96 
 // Evaluates m_waitPolicy for a request that went from initialResourceCount to acquiredResourceCount resources.
97  bool IsRequestFulfilled(uint32_t initialResourceCount, uint32_t acquiredResourceCount, uint32_t desiredResourceCount);
98 
99  void AcquireResourcesInternal(uint32_t resourceCount, ResourceListType& acquiredResources, bool returnInsteadOfWait);
 // Lock-free core of ReleaseResources: caller holds the mutex; fulfilled waiters are appended to fulfilledRequests.
100  void ReleaseResourcesInternal(ResourceListType& resources, WaiterListType& fulfilledRequests);
101 
 // Marks each request done and notifies its condition variable; callers invoke it after releasing the mutex.
102  void FulfillRequests(const WaiterListType& requests);
103 
104  ResourceFactoryType m_resourceFactory;
105 
106  ResourceWaitPolicy m_waitPolicy;
107 
 // Target pool size, and the number of resources created and not yet discarded; they differ while a shrink is pending.
108  uint32_t m_desiredResourceCount;
109  uint32_t m_currentResourceCount;
110 
 // Resources currently not handed out to any caller.
111  ResourceListType m_freeResources;
112 
 // Guards the counters, m_freeResources and m_waiters.
113  std::mutex m_resourcesMutex;
 // Blocked requests in arrival order; the front entry is served first.
114  WaiterQueueType m_waiters;
115 };
116 
118 // Implementation
119 
120 template< typename T >
122  m_resourceFactory(resourceFactory),
123  m_waitPolicy(waitPolicy),
124  m_desiredResourceCount(0),
125  m_currentResourceCount(0),
126  m_freeResources(),
127  m_resourcesMutex(),
128  m_waiters()
129 {
130  // for now, require that at least one resource be present; this helps ensure the invariant that calling Acquire with a positive amount always returns with at least one
131  // resource
132  uint32_t modifiedResourceCount = std::max(resourceCount, 1U);
133  m_desiredResourceCount = modifiedResourceCount;
134  m_currentResourceCount = modifiedResourceCount;
135 
136  std::lock_guard<std::mutex> lock(m_resourcesMutex);
137 
138  for(uint32_t i = 0; i < m_desiredResourceCount; ++i)
139  {
140  m_freeResources.push_back(m_resourceFactory());
141  }
142 }
143 
144 template< typename T >
146 {
147  std::lock_guard<std::mutex> lock(m_resourcesMutex);
148 
149  m_freeResources.clear();
150  m_currentResourceCount = 0;
151  m_desiredResourceCount = 0;
152 }
153 
154 // assumes m_resourcesMutex has been locked by the caller
155 template< typename T >
156 void FairBoundedResourceManager< T >::ReleaseResourcesInternal(ResourceListType& resources, WaiterListType& fulfilledRequests)
157 {
158  // add the resources back to the pool
159  std::for_each(resources.begin(), resources.end(), [&](const T& resource){ m_freeResources.push_back(resource); });
160  resources.clear();
161 
162  // lazy dynamic pool resizing
163  ShrinkResourcePool();
164 
165  // is anyone waiting for resources?
166  while(m_freeResources.size() > 0 && m_waiters.size() > 0)
167  {
168  auto firstWaiter = m_waiters.front();
169  auto initialResourceCount = firstWaiter->m_resourceList->size();
170 
171  // add resources to the oldest waiter
172  while(m_freeResources.size() > 0 && firstWaiter->m_resourceList->size() < firstWaiter->m_requestedCount)
173  {
174  firstWaiter->m_resourceList->push_back(m_freeResources.back());
175  m_freeResources.pop_back();
176  }
177 
178  if (IsRequestFulfilled(static_cast<uint32_t>(initialResourceCount), static_cast<uint32_t>(firstWaiter->m_resourceList->size()), firstWaiter->m_requestedCount))
179  {
180  // remove from queue and push it into a local array so that we can signal the notification when we're out of the overall resource pool lock
181  fulfilledRequests.push_back(firstWaiter);
182  m_waiters.pop_front();
183  }
184  else
185  {
186  break;
187  }
188  }
189 }
190 
191 template< typename T >
193 {
194  WaiterListType fulfilledRequests;
195 
196  { // begin resource lock
197  std::lock_guard<std::mutex> lock(m_resourcesMutex);
198 
199  ReleaseResourcesInternal(resources, fulfilledRequests);
200  } // end resource lock
201 
202  FulfillRequests(fulfilledRequests);
203 }
204 
205 // This was surprisingly unpleasant and made things quite a bit more complex; I mildly regret doing it
206 template< typename T >
208 {
209  // don't allow this for now because it breaks an invariant that I'd like to maintain (Acquire always returns with at least one resource)
210  if (resourceCount == 0)
211  {
212  return;
213  }
214 
215  std::unique_lock<std::mutex> lock(m_resourcesMutex);
216 
217  if(resourceCount > m_currentResourceCount)
218  {
219  // Grow request
220 
221  // add a corresponding amount of resources
222  for(uint32_t i = 0; i < resourceCount - m_currentResourceCount; ++i)
223  {
224  m_freeResources.push_back(m_resourceFactory());
225  }
226 
227  // if anyone's waiting, pass the resources on to them by calling a zero-length release
228  // This is what forced ReleaseResources to be split into an internal function that did not touch m_resourcesMutex, and a wrapper function that did
229  // If we unlocked our lock before calling the top-level ReleaseResources function, someone could sneak in and take what we just gave back; this would break fairness
230  if(m_waiters.size() > 0)
231  {
232  WaiterListType fulfilledRequests;
233  ResourceListType emptyResources;
234  ReleaseResourcesInternal(emptyResources, fulfilledRequests);
235 
236  lock.unlock();
237  FulfillRequests(fulfilledRequests);
238  return;
239  }
240  }
241  else if (resourceCount < m_currentResourceCount)
242  {
243  // Shrink request
244 
245  m_desiredResourceCount = resourceCount;
246 
247  // empty the resource pool of any excess; if the pool empties before we reach the desired amount, we'll remove the remaining items as users release resources in the future
248  ShrinkResourcePool();
249 
250  // clamp each request's desired resource count by the new maximum amount
251  std::for_each(m_waiters.begin(), m_waiters.end(), [&](const std::shared_ptr< WaitingResourceRequester > &waiter){ waiter->m_requestedCount = std::min(waiter->m_requestedCount, m_desiredResourceCount); });
252 
253  // If we're using the partially-fillable wait policy (AT_LEAST_ONE_AVAILABLE) then something worse can happen:
254  // If the overall pool size drops <= a request's existing resource allocation, we need to just force-return the request because it has become impossible to fulfill progressively
255  // Do this by partitioning the queue elements by whether or not they violate this property and then pop-and-notify all violators off the queue
256  // Use a stable partition to preserve fairness
257  std::stable_partition(m_waiters.begin(), m_waiters.end(), [&](const std::shared_ptr< WaitingResourceRequester > &waiter){ return waiter->m_resourceList->size() >= m_desiredResourceCount; });
258 
259  // pull off the impossible-to-fulfill violators
261  while(m_waiters.size() > 0 && m_waiters.front()->m_resourceList->size() >= m_desiredResourceCount)
262  {
263  forceWakes.push_back(m_waiters.front());
264  m_waiters.pop_front();
265  }
266 
267  lock.unlock();
268 
269  FulfillRequests(forceWakes);
270  }
271 }
272 
273 // Unprotected; assumes caller has locked m_resourcesMutex
274 template< typename T >
276 {
277  if(m_currentResourceCount <= m_desiredResourceCount)
278  {
279  return;
280  }
281 
282  size_t freeableSize = std::min( m_freeResources.size(), static_cast< size_t >(m_currentResourceCount - m_desiredResourceCount) );
283  if (freeableSize > 0)
284  {
285  m_freeResources.erase(m_freeResources.begin() + m_freeResources.size() - freeableSize, m_freeResources.end());
286  m_currentResourceCount -= static_cast<uint32_t>(freeableSize);
287  }
288 }
289 
290 template< typename T >
291 bool FairBoundedResourceManager< T >::IsRequestFulfilled(uint32_t initialResourceCount, uint32_t acquiredResourceCount, uint32_t desiredResourceCount)
292 {
293  switch(m_waitPolicy)
294  {
296  return desiredResourceCount == 0 || acquiredResourceCount > initialResourceCount;
297 
299  return acquiredResourceCount >= desiredResourceCount;
300 
301  default:
302  return true;
303  }
304 }
305 
306 template< typename T >
307 void FairBoundedResourceManager< T >::AcquireResourcesInternal(uint32_t resourceCount, ResourceListType& acquiredResources, bool returnInsteadOfWait)
308 {
309  std::unique_lock<std::mutex> lock(m_resourcesMutex);
310 
311  size_t acquiredSize = acquiredResources.size();
312  resourceCount = std::min(resourceCount, m_desiredResourceCount);
313  if (acquiredSize >= resourceCount)
314  {
315  return;
316  }
317 
318  bool shouldWait = m_waiters.size() > 0 ||
319  m_freeResources.size() == 0 ||
320  !IsRequestFulfilled(static_cast<uint32_t>(acquiredSize), static_cast<uint32_t>(acquiredSize + m_freeResources.size()), resourceCount);
321  if (shouldWait)
322  {
323  if (returnInsteadOfWait)
324  {
325  return;
326  }
327 
328  auto waitMemento = Aws::MakeShared< WaitingResourceRequester >(RESOURCE_MANAGER_ALLOCATION_TAG, resourceCount, acquiredResources);
329  m_waiters.push_back(waitMemento);
330 
331  waitMemento->m_readySignal.wait(lock, [&](){ return waitMemento->m_done.load(); } );
332 
333  return;
334  }
335 
336  size_t copyCount = std::min(m_freeResources.size(), static_cast<size_t>(resourceCount - acquiredSize));
337  size_t startIndex = m_freeResources.size() - copyCount;
338  auto rangeStart = m_freeResources.begin() + startIndex;
339  auto rangeEnd = m_freeResources.end();
340  std::copy(rangeStart, rangeEnd, std::back_inserter(acquiredResources));
341  m_freeResources.erase(rangeStart, rangeEnd);
342 }
343 
344 template< typename T >
345 void FairBoundedResourceManager< T >::FulfillRequests(const Aws::Vector< std::shared_ptr< WaitingResourceRequester > >& requests)
346 {
347  for(uint32_t i = 0; i < requests.size(); ++i)
348  {
349  requests[i]->m_done = true;
350  requests[i]->m_readySignal.notify_one();
351  }
352 }
353 
354 template< typename T >
356 {
357  std::lock_guard< std::mutex > lock(m_resourcesMutex);
358 
359  return m_waiters.size();
360 }
361 
362 } // namespace Transfer
363 } // namespace Aws
virtual void TryAcquireResources(uint32_t resourceCount, ResourceListType &acquiredResources) override
std::vector< T, Aws::Allocator< T > > Vector
Definition: AWSVector.h:27
virtual void AcquireResources(uint32_t resourceCount, ResourceListType &acquiredResources) override
static const char * RESOURCE_MANAGER_ALLOCATION_TAG
virtual void ReleaseResources(ResourceListType &resources) override
std::deque< T, Aws::Allocator< T > > Deque
Definition: AWSDeque.h:27
FairBoundedResourceManager(ResourceFactoryType resourceFactory, uint32_t resourceCount, ResourceWaitPolicy waitPolicy=ResourceWaitPolicy::ALL_AVAILABLE)
virtual void AdjustResourceCount(uint32_t m_resourceCount) override
JSON (JavaScript Object Notation).