AWS SDK for C++  1.9.129
AWS SDK for C++
DefaultRateLimiter.h
Go to the documentation of this file.
1 
6 #pragma once
7 
9 
11 
12 #include <algorithm>
13 #include <mutex>
14 #include <thread>
15 #include <functional>
16 
17 namespace Aws
18 {
19  namespace Utils
20  {
21  namespace RateLimits
22  {
26  template<typename CLOCK = std::chrono::high_resolution_clock, typename DUR = std::chrono::seconds, bool RENORMALIZE_RATE_CHANGES = true>
28  {
29  public:
31 
32  using InternalTimePointType = std::chrono::time_point<CLOCK>;
33  using ElapsedTimeFunctionType = std::function< InternalTimePointType() >;
34 
38  DefaultRateLimiter(int64_t maxRate, ElapsedTimeFunctionType elapsedTimeFunction = CLOCK::now) :
39  m_elapsedTimeFunction(elapsedTimeFunction),
40  m_maxRate(0),
41  m_accumulatorLock(),
42  m_accumulator(0),
43  m_accumulatorFraction(0),
44  m_accumulatorUpdated(),
45  m_replenishNumerator(0),
46  m_replenishDenominator(0),
47  m_delayNumerator(0),
48  m_delayDenominator(0)
49  {
50  // verify we're not going to divide by zero due to goofy type parameterization
51  static_assert(DUR::period::num > 0, "Rate duration must have positive numerator");
52  static_assert(DUR::period::den > 0, "Rate duration must have positive denominator");
53  static_assert(CLOCK::duration::period::num > 0, "RateLimiter clock duration must have positive numerator");
54  static_assert(CLOCK::duration::period::den > 0, "RateLimiter clock duration must have positive denominator");
55 
56  DefaultRateLimiter::SetRate(maxRate, true);
57  }
58 
59  virtual ~DefaultRateLimiter() = default;
60 
64  virtual DelayType ApplyCost(int64_t cost) override
65  {
66  std::lock_guard<std::recursive_mutex> lock(m_accumulatorLock);
67 
68  auto now = m_elapsedTimeFunction();
69  auto elapsedTime = (now - m_accumulatorUpdated).count();
70 
71  // replenish the accumulator based on how much time has passed
72  auto temp = elapsedTime * m_replenishNumerator + m_accumulatorFraction;
73  m_accumulator += temp / m_replenishDenominator;
74  m_accumulatorFraction = temp % m_replenishDenominator;
75 
76  // the accumulator is capped based on the maximum rate
77  m_accumulator = (std::min)(m_accumulator, m_maxRate);
78  if (m_accumulator == m_maxRate)
79  {
80  m_accumulatorFraction = 0;
81  }
82 
83  // if the accumulator is still negative, then we'll have to wait
84  DelayType delay(0);
85  if (m_accumulator < 0)
86  {
87  delay = DelayType(-m_accumulator * m_delayDenominator / m_delayNumerator);
88  }
89 
90  // apply the cost to the accumulator after the delay has been calculated; the next call will end up paying for our cost
91  m_accumulator -= cost;
92  m_accumulatorUpdated = now;
93 
94  return delay;
95  }
96 
100  virtual void ApplyAndPayForCost(int64_t cost) override
101  {
102  auto costInMilliseconds = ApplyCost(cost);
103  if(costInMilliseconds.count() > 0)
104  {
105  std::this_thread::sleep_for(costInMilliseconds);
106  }
107  }
108 
112  virtual void SetRate(int64_t rate, bool resetAccumulator = false) override
113  {
114  std::lock_guard<std::recursive_mutex> lock(m_accumulatorLock);
115 
116  // rate must always be positive
117  rate = (std::max)(static_cast<int64_t>(1), rate);
118 
119  if (resetAccumulator)
120  {
121  m_accumulator = rate;
122  m_accumulatorFraction = 0;
123  m_accumulatorUpdated = m_elapsedTimeFunction();
124  }
125  else
126  {
127  // sync the accumulator to current time
128  ApplyCost(0); // this call is why we need a recursive mutex
129 
130  if (ShouldRenormalizeAccumulatorOnRateChange())
131  {
132  // now renormalize the accumulator and its fractional part against the new rate
133  // the idea here is we want to preserve the desired wait based on the previous rate
134  //
135  // As an example:
136  // Say we had a rate of 100/s and our accumulator was -500 (ie the next ApplyCost would incur a 5 second delay)
137  // If we change the rate to 1000/s and want to preserve that delay, we need to scale the accumulator to -5000
138  m_accumulator = m_accumulator * rate / m_maxRate;
139  m_accumulatorFraction = m_accumulatorFraction * rate / m_maxRate;
140  }
141  }
142 
143  m_maxRate = rate;
144 
145  // Helper constants that represent the amount replenished per CLOCK time period; use the gcd to reduce them in order to try and minimize the chance of integer overflow
146  m_replenishNumerator = m_maxRate * DUR::period::den * CLOCK::duration::period::num;
147  m_replenishDenominator = DUR::period::num * CLOCK::duration::period::den;
148  auto gcd = ComputeGCD(m_replenishNumerator, m_replenishDenominator);
149  m_replenishNumerator /= gcd;
150  m_replenishDenominator /= gcd;
151 
152  // Helper constants that represent the delay per unit of costAccumulator; use the gcd to reduce them in order to try and minimize the chance of integer overflow
153  m_delayNumerator = m_maxRate * DelayType::period::num * DUR::period::den;
154  m_delayDenominator = DelayType::period::den * DUR::period::num;
155  gcd = ComputeGCD(m_delayNumerator, m_delayDenominator);
156  m_delayNumerator /= gcd;
157  m_delayDenominator /= gcd;
158  }
159 
160  private:
161 
162  int64_t ComputeGCD(int64_t num1, int64_t num2) const
163  {
164  // Euclid's
165  while (num2 != 0)
166  {
167  int64_t rem = num1 % num2;
168  num1 = num2;
169  num2 = rem;
170  }
171 
172  return num1;
173  }
174 
175  bool ShouldRenormalizeAccumulatorOnRateChange() const { return RENORMALIZE_RATE_CHANGES; }
176 
178  ElapsedTimeFunctionType m_elapsedTimeFunction;
179 
181  int64_t m_maxRate;
182 
184  std::recursive_mutex m_accumulatorLock;
185 
188  int64_t m_accumulator;
189 
191  int64_t m_accumulatorFraction;
192 
194  InternalTimePointType m_accumulatorUpdated;
195 
197  int64_t m_replenishNumerator;
198  int64_t m_replenishDenominator;
199  int64_t m_delayNumerator;
200  int64_t m_delayDenominator;
201  };
202 
203  } // namespace RateLimits
204  } // namespace Utils
205 } // namespace Aws
Aws::Utils::RateLimits::DefaultRateLimiter
Definition: DefaultRateLimiter.h:28
Aws::Utils::RateLimits::DefaultRateLimiter::InternalTimePointType
std::chrono::time_point< CLOCK > InternalTimePointType
Definition: DefaultRateLimiter.h:32
Aws::Utils::RateLimits::DefaultRateLimiter::ApplyAndPayForCost
virtual void ApplyAndPayForCost(int64_t cost) override
Definition: DefaultRateLimiter.h:100
RateLimiterInterface.h
Aws::Utils::RateLimits::DefaultRateLimiter::~DefaultRateLimiter
virtual ~DefaultRateLimiter()=default
Aws::Utils::RateLimits::DefaultRateLimiter::ElapsedTimeFunctionType
std::function< InternalTimePointType() > ElapsedTimeFunctionType
Definition: DefaultRateLimiter.h:33
Aws::Utils::RateLimits::DefaultRateLimiter::ApplyCost
virtual DelayType ApplyCost(int64_t cost) override
Definition: DefaultRateLimiter.h:64
Aws::Utils::RateLimits::DefaultRateLimiter::DefaultRateLimiter
DefaultRateLimiter(int64_t maxRate, ElapsedTimeFunctionType elapsedTimeFunction=CLOCK::now)
Definition: DefaultRateLimiter.h:38
Aws::Utils::RateLimits::DefaultRateLimiter::SetRate
virtual void SetRate(int64_t rate, bool resetAccumulator=false) override
Definition: DefaultRateLimiter.h:112
Core_EXPORTS.h
Aws::Utils::RateLimits::RateLimiterInterface::DelayType
std::chrono::milliseconds DelayType
Definition: RateLimiterInterface.h:26
Aws
Definition: AccessManagementClient.h:15
Aws::Utils::RateLimits::RateLimiterInterface
Definition: RateLimiterInterface.h:24
count
int count
Definition: cJSON.h:228