AWS SDK for C++  0.14.3
AWS SDK for C++
DefaultRateLimiter.h
Go to the documentation of this file.
1 /*
2  * Copyright 2010-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #pragma once
17 
18 #include <aws/core/Core_EXPORTS.h>
19 
22 
23 #include <algorithm>
24 #include <mutex>
25 #include <thread>
26 
27 namespace Aws
28 {
29  namespace Utils
30  {
31  namespace RateLimits
32  {
36  template<typename CLOCK = std::chrono::high_resolution_clock, typename DUR = std::chrono::seconds, bool RENORMALIZE_RATE_CHANGES = true>
38  {
39  public:
41 
42  using InternalTimePointType = std::chrono::time_point<CLOCK>;
43  using ElapsedTimeFunctionType = std::function< InternalTimePointType() >;
44 
48  DefaultRateLimiter(int64_t maxRate, ElapsedTimeFunctionType elapsedTimeFunction = AWS_BUILD_FUNCTION(CLOCK::now)) :
49  m_elapsedTimeFunction(elapsedTimeFunction),
50  m_maxRate(0),
51  m_accumulatorLock(),
52  m_accumulator(0),
53  m_accumulatorFraction(0),
54  m_accumulatorUpdated(),
55  m_replenishNumerator(0),
56  m_replenishDenominator(0),
57  m_delayNumerator(0),
58  m_delayDenominator(0)
59  {
60  // verify we're not going to divide by zero due to goofy type parameterization
61  static_assert(DUR::period::num > 0, "Rate duration must have positive numerator");
62  static_assert(DUR::period::den > 0, "Rate duration must have positive denominator");
63  static_assert(CLOCK::duration::period::num > 0, "RateLimiter clock duration must have positive numerator");
64  static_assert(CLOCK::duration::period::den > 0, "RateLimiter clock duration must have positive denominator");
65 
66  SetRate(maxRate, true);
67  }
68 
69  virtual ~DefaultRateLimiter() = default;
70 
74  virtual DelayType ApplyCost(int64_t cost) override
75  {
76  std::lock_guard<std::recursive_mutex> lock(m_accumulatorLock);
77 
78  auto now = m_elapsedTimeFunction();
79  auto elapsedTime = (now - m_accumulatorUpdated).count();
80 
81  // replenish the accumulator based on how much time has passed
82  auto temp = elapsedTime * m_replenishNumerator + m_accumulatorFraction;
83  m_accumulator += temp / m_replenishDenominator;
84  m_accumulatorFraction = temp % m_replenishDenominator;
85 
86  // the accumulator is capped based on the maximum rate
87  m_accumulator = std::min(m_accumulator, m_maxRate);
88  if (m_accumulator == m_maxRate)
89  {
90  m_accumulatorFraction = 0;
91  }
92 
93  // if the accumulator is still negative, then we'll have to wait
94  DelayType delay(0);
95  if (m_accumulator < 0)
96  {
97  delay = DelayType(-m_accumulator * m_delayDenominator / m_delayNumerator);
98  }
99 
100  // apply the cost to the accumulator after the delay has been calculated; the next call will end up paying for our cost
101  m_accumulator -= cost;
102  m_accumulatorUpdated = now;
103 
104  return delay;
105  }
106 
110  virtual void ApplyAndPayForCost(int64_t cost) override
111  {
112  std::this_thread::sleep_for(ApplyCost(cost));
113  }
114 
118  virtual void SetRate(int64_t rate, bool resetAccumulator = false) override
119  {
120  std::lock_guard<std::recursive_mutex> lock(m_accumulatorLock);
121 
122  // rate must always be positive
123  rate = std::max(static_cast<int64_t>(1), rate);
124 
125  if (resetAccumulator)
126  {
127  m_accumulator = rate;
128  m_accumulatorFraction = 0;
129  m_accumulatorUpdated = m_elapsedTimeFunction();
130  }
131  else
132  {
133  // sync the accumulator to current time
134  ApplyCost(0); // this call is why we need a recursive mutex
135 
136  if (ShouldRenormalizeAccumulatorOnRateChange())
137  {
138  // now renormalize the accumulator and its fractional part against the new rate
139  // the idea here is we want to preserve the desired wait based on the previous rate
140  //
141  // As an example:
142  // Say we had a rate of 100/s and our accumulator was -500 (ie the next ApplyCost would incur a 5 second delay)
143  // If we change the rate to 1000/s and want to preserve that delay, we need to scale the accumulator to -5000
144  m_accumulator = m_accumulator * rate / m_maxRate;
145  m_accumulatorFraction = m_accumulatorFraction * rate / m_maxRate;
146  }
147  }
148 
149  m_maxRate = rate;
150 
151  // Helper constants that represent the amount replenished per CLOCK time period; use the gcd to reduce them in order to try and minimize the chance of integer overflow
152  m_replenishNumerator = m_maxRate * DUR::period::den * CLOCK::duration::period::num;
153  m_replenishDenominator = DUR::period::num * CLOCK::duration::period::den;
154  auto gcd = ComputeGCD(m_replenishNumerator, m_replenishDenominator);
155  m_replenishNumerator /= gcd;
156  m_replenishDenominator /= gcd;
157 
158  // Helper constants that represent the delay per unit of costAccumulator; use the gcd to reduce them in order to try and minimize the chance of integer overflow
159  m_delayNumerator = m_maxRate * DelayType::period::num * DUR::period::den;
160  m_delayDenominator = DelayType::period::den * DUR::period::num;
161  gcd = ComputeGCD(m_delayNumerator, m_delayDenominator);
162  m_delayNumerator /= gcd;
163  m_delayDenominator /= gcd;
164  }
165 
166  private:
167 
168  int64_t ComputeGCD(int64_t num1, int64_t num2) const
169  {
170  // Euclid's
171  while (num2 != 0)
172  {
173  int64_t rem = num1 % num2;
174  num1 = num2;
175  num2 = rem;
176  }
177 
178  return num1;
179  }
180 
181  bool ShouldRenormalizeAccumulatorOnRateChange() const { return RENORMALIZE_RATE_CHANGES; }
182 
184  ElapsedTimeFunctionType m_elapsedTimeFunction;
185 
187  int64_t m_maxRate;
188 
190  std::recursive_mutex m_accumulatorLock;
191 
194  int64_t m_accumulator;
195 
197  int64_t m_accumulatorFraction;
198 
200  InternalTimePointType m_accumulatorUpdated;
201 
203  int64_t m_replenishNumerator;
204  int64_t m_replenishDenominator;
205  int64_t m_delayNumerator;
206  int64_t m_delayDenominator;
207  };
208 
209  } // namespace RateLimits
210  } // namespace Utils
211 } // namespace Aws
virtual void SetRate(int64_t rate, bool resetAccumulator=false) override
DefaultRateLimiter(int64_t maxRate, ElapsedTimeFunctionType elapsedTimeFunction=AWS_BUILD_FUNCTION(CLOCK::now))
virtual void ApplyAndPayForCost(int64_t cost) override
std::function< InternalTimePointType() > ElapsedTimeFunctionType
#define AWS_BUILD_FUNCTION(func)
Definition: AWSFunction.h:79
virtual DelayType ApplyCost(int64_t cost) override
std::chrono::time_point< CLOCK > InternalTimePointType
JSON (JavaScript Object Notation).