25 #include <condition_variable> 34 class WaitingBufferRequester;
// Class-declaration excerpt of the resource manager template.
// NOTE(review): the leading integers are line numbers carried over from the
// original listing; the class head, braces and access specifiers are not
// visible in this extract.
53 template<
typename T >
// Acquire entry point: forwards to AcquireResourcesInternal with
// returnInsteadOfWait = false.
64 virtual void AcquireResources(uint32_t resourceCount,
ResourceListType& acquiredResources)
override { AcquireResourcesInternal(resourceCount, acquiredResources,
false); }
// NOTE(review): the parameter carries the member-style "m_" prefix although it
// is a plain argument; the resize logic later in this listing reads a variable
// named resourceCount - presumably the same parameter, TODO confirm.
69 virtual void AdjustResourceCount(uint32_t m_resourceCount)
override;
// Presumably reports how many requests are queued in m_waiters - confirm
// against the definition.
72 size_t GetWaiterCount();
// Per-request record for a caller that is parked waiting for resources.
76 class WaitingResourceRequester
// Stores a pointer to the caller's list (not a copy), plus the number of
// resources the caller asked for.
79 WaitingResourceRequester(uint32_t resourceCount,
ResourceListType& acquiredResources) :
80 m_resourceList(&acquiredResources),
81 m_requestedCount(resourceCount),
87 uint32_t m_requestedCount;
// Notified (notify_one) once the request has been marked done.
88 std::condition_variable m_readySignal;
// Completion flag; it is the predicate the waiting thread checks.
89 std::atomic<bool> m_done;
// Drops surplus free resources while the pool is above its desired size.
95 void ShrinkResourcePool();
// Decides whether a request counts as satisfied given the sizes before and
// after a hand-off and the amount requested.
97 bool IsRequestFulfilled(uint32_t initialResourceCount, uint32_t acquiredResourceCount, uint32_t desiredResourceCount);
// Shared implementation behind the acquire entry points; returnInsteadOfWait
// selects between waiting and returning (branch body not visible here).
99 void AcquireResourcesInternal(uint32_t resourceCount,
ResourceListType& acquiredResources,
bool returnInsteadOfWait);
// Returns resources to the pool and reports, through fulfilledRequests, the
// waiters that became satisfied as a result.
100 void ReleaseResourcesInternal(
ResourceListType& resources, WaiterListType& fulfilledRequests);
// Marks each given request done and notifies its condition variable.
102 void FulfillRequests(
const WaiterListType& requests);
// Target pool size versus the size the pool currently has; the two can differ
// while a shrink is still pending (see ShrinkResourcePool).
108 uint32_t m_desiredResourceCount;
109 uint32_t m_currentResourceCount;
// Taken by the visible code before it touches the free list or the waiter queue.
113 std::mutex m_resourcesMutex;
// Pending requests; served from the front, appended at the back.
114 WaiterQueueType m_waiters;
// Constructor excerpt (signature and braces are not visible in this listing).
120 template<
typename T >
122 m_resourceFactory(resourceFactory),
123 m_waitPolicy(waitPolicy),
124 m_desiredResourceCount(0),
125 m_currentResourceCount(0),
// A requested size of 0 is raised to 1, so the pool never starts empty.
132 uint32_t modifiedResourceCount = std::max(resourceCount, 1U);
133 m_desiredResourceCount = modifiedResourceCount;
134 m_currentResourceCount = modifiedResourceCount;
136 std::lock_guard<std::mutex> lock(m_resourcesMutex);
// Pre-populate the free list by calling the factory once per resource.
138 for(uint32_t i = 0; i < m_desiredResourceCount; ++i)
140 m_freeResources.push_back(m_resourceFactory());
// Presumably the destructor body (signature not visible - TODO confirm):
// under the mutex, drops every free resource and zeroes both size counters.
144 template<
typename T >
147 std::lock_guard<std::mutex> lock(m_resourcesMutex);
149 m_freeResources.clear();
150 m_currentResourceCount = 0;
151 m_desiredResourceCount = 0;
// Presumably the body of ReleaseResourcesInternal (signature not visible).
// No lock is taken in the visible lines; the visible call sites acquire
// m_resourcesMutex before calling it.
155 template<
typename T >
// Put every released resource back on the free list.
159 std::for_each(resources.begin(), resources.end(), [&](
const T& resource){ m_freeResources.push_back(resource); });
// Discard surplus first, in case the pool is above its desired size.
163 ShrinkResourcePool();
// Serve waiters strictly from the front of the queue while anything is free.
166 while(m_freeResources.size() > 0 && m_waiters.size() > 0)
168 auto firstWaiter = m_waiters.front();
// Remember the starting size so IsRequestFulfilled can see what was gained.
169 auto initialResourceCount = firstWaiter->m_resourceList->size();
// Move resources from the back of the free list straight into the waiter's
// own list until it has what it asked for or the free list runs dry.
172 while(m_freeResources.size() > 0 && firstWaiter->m_resourceList->size() < firstWaiter->m_requestedCount)
174 firstWaiter->m_resourceList->push_back(m_freeResources.back());
175 m_freeResources.pop_back();
// A satisfied waiter leaves the queue and is handed back to the caller, which
// is responsible for waking it.
178 if (IsRequestFulfilled(static_cast<uint32_t>(initialResourceCount), static_cast<uint32_t>(firstWaiter->m_resourceList->size()), firstWaiter->m_requestedCount))
181 fulfilledRequests.push_back(firstWaiter);
182 m_waiters.pop_front();
// Presumably the body of ReleaseResources (signature not visible): returns the
// resources under the mutex, then wakes whichever waiters that satisfied.
// NOTE(review): the braces are missing from this listing, so whether the lock
// is released before FulfillRequests runs cannot be confirmed from here.
191 template<
typename T >
194 WaiterListType fulfilledRequests;
197 std::lock_guard<std::mutex> lock(m_resourcesMutex);
199 ReleaseResourcesInternal(resources, fulfilledRequests);
202 FulfillRequests(fulfilledRequests);
// Presumably the body of AdjustResourceCount (signature not visible): grows or
// shrinks the pool to resourceCount.
206 template<
typename T >
// Zero is special-cased; what happens in that branch is not visible here.
210 if (resourceCount == 0)
215 std::unique_lock<std::mutex> lock(m_resourcesMutex);
// --- Grow ---
217 if(resourceCount > m_currentResourceCount)
// Create the missing resources with the factory.
// NOTE(review): the bound re-reads m_currentResourceCount on every iteration;
// this is only correct if that counter is not updated inside the loop -
// the relevant lines are elided, TODO confirm.
222 for(uint32_t i = 0; i < resourceCount - m_currentResourceCount; ++i)
224 m_freeResources.push_back(m_resourceFactory());
// Hand the fresh resources to anyone already waiting by running the release
// path with an empty list, then wake the satisfied waiters.
230 if(m_waiters.size() > 0)
232 WaiterListType fulfilledRequests;
234 ReleaseResourcesInternal(emptyResources, fulfilledRequests);
237 FulfillRequests(fulfilledRequests);
// --- Shrink ---
241 else if (resourceCount < m_currentResourceCount)
245 m_desiredResourceCount = resourceCount;
248 ShrinkResourcePool();
// No waiter may keep asking for more than the new pool size.
251 std::for_each(m_waiters.begin(), m_waiters.end(), [&](
const std::shared_ptr< WaitingResourceRequester > &waiter){ waiter->m_requestedCount = std::min(waiter->m_requestedCount, m_desiredResourceCount); });
// Bring waiters that already hold at least the new pool size to the front,
// keeping relative order within each group.
257 std::stable_partition(m_waiters.begin(), m_waiters.end(), [&](
const std::shared_ptr< WaitingResourceRequester > &waiter){
return waiter->m_resourceList->size() >= m_desiredResourceCount; });
// Pop exactly that leading group and wake it.
261 while(m_waiters.size() > 0 && m_waiters.front()->m_resourceList->size() >= m_desiredResourceCount)
263 forceWakes.push_back(m_waiters.front());
264 m_waiters.pop_front();
269 FulfillRequests(forceWakes);
// Presumably the body of ShrinkResourcePool (signature not visible): trims the
// free list toward the desired pool size.
274 template<
typename T >
// Nothing to trim when the pool is not above target (branch body not visible;
// presumably an early return).
277 if(m_currentResourceCount <= m_desiredResourceCount)
// Only free resources can be dropped, so the amount removed is capped by the
// free-list size; any remaining excess stays counted in m_currentResourceCount.
282 size_t freeableSize = std::min( m_freeResources.size(),
static_cast< size_t >(m_currentResourceCount - m_desiredResourceCount) );
283 if (freeableSize > 0)
// Erase the last freeableSize entries of the free list.
285 m_freeResources.erase(m_freeResources.begin() + m_freeResources.size() - freeableSize, m_freeResources.end());
286 m_currentResourceCount -=
static_cast<uint32_t
>(freeableSize);
// Presumably the body of IsRequestFulfilled (signature not visible). Two
// alternative rules are visible; the condition selecting between them is
// elided - presumably the wait policy, TODO confirm.
290 template<
typename T >
// Rule A: satisfied if nothing was requested or at least one resource was gained.
296 return desiredResourceCount == 0 || acquiredResourceCount > initialResourceCount;
// Rule B: satisfied only once the full requested amount is held.
299 return acquiredResourceCount >= desiredResourceCount;
// Presumably the body of AcquireResourcesInternal (signature not visible).
306 template<
typename T >
309 std::unique_lock<std::mutex> lock(m_resourcesMutex);
311 size_t acquiredSize = acquiredResources.size();
// A request can never exceed the desired pool size.
312 resourceCount = std::min(resourceCount, m_desiredResourceCount);
// Caller already holds enough (branch body not visible; presumably returns).
313 if (acquiredSize >= resourceCount)
// Queue up if others are already waiting, if nothing is free, or if what is
// free would not satisfy the request under IsRequestFulfilled.
318 bool shouldWait = m_waiters.size() > 0 ||
319 m_freeResources.size() == 0 ||
320 !IsRequestFulfilled(static_cast<uint32_t>(acquiredSize), static_cast<uint32_t>(acquiredSize + m_freeResources.size()), resourceCount);
// Non-waiting mode (branch body not visible).
323 if (returnInsteadOfWait)
// Join the back of the queue and sleep until the request is marked done; the
// predicate form of wait() re-checks m_done on every wake-up.
329 m_waiters.push_back(waitMemento);
331 waitMemento->m_readySignal.wait(lock, [&](){
return waitMemento->m_done.load(); } );
// Direct path: take up to the still-missing amount from the back of the free
// list, append it to the caller's list, then remove it from the free list.
336 size_t copyCount = std::min(m_freeResources.size(),
static_cast<size_t>(resourceCount - acquiredSize));
337 size_t startIndex = m_freeResources.size() - copyCount;
338 auto rangeStart = m_freeResources.begin() + startIndex;
339 auto rangeEnd = m_freeResources.end();
340 std::copy(rangeStart, rangeEnd, std::back_inserter(acquiredResources));
341 m_freeResources.erase(rangeStart, rangeEnd);
// Presumably the body of FulfillRequests (signature not visible): completes
// each request in the list.
344 template<
typename T >
347 for(uint32_t i = 0; i < requests.size(); ++i)
// Set the flag before notifying so the waiter's predicate sees it as true.
349 requests[i]->m_done =
true;
350 requests[i]->m_readySignal.notify_one();
// Presumably the body of GetWaiterCount (signature not visible): returns the
// current length of the waiter queue, read under the mutex.
354 template<
typename T >
357 std::lock_guard< std::mutex > lock(m_resourcesMutex);
359 return m_waiters.size();
virtual void TryAcquireResources(uint32_t resourceCount, ResourceListType &acquiredResources) override
virtual ~FairBoundedResourceManager()
std::function< T(void) > ResourceFactoryType
std::vector< T, Aws::Allocator< T > > Vector
virtual void AcquireResources(uint32_t resourceCount, ResourceListType &acquiredResources) override
static const char * RESOURCE_MANAGER_ALLOCATION_TAG
virtual void ReleaseResources(ResourceListType &resources) override
std::deque< T, Aws::Allocator< T > > Deque
Aws::Vector< T > ResourceListType
FairBoundedResourceManager(ResourceFactoryType resourceFactory, uint32_t resourceCount, ResourceWaitPolicy waitPolicy=ResourceWaitPolicy::ALL_AVAILABLE)
virtual void AdjustResourceCount(uint32_t m_resourceCount) override
JSON (JavaScript Object Notation).