8#include <aws/core/Core_EXPORTS.h>
9#include <aws/core/utils/ratelimiter/RateLimiterInterface.h>
// Template parameters of the rate limiter:
//   CLOCK  - clock type; its duration period converts elapsed clock ticks
//            into replenished units when the rate is set, and CLOCK::now is
//            the default time source for the constructor.
//   DUR    - duration type whose period is the time unit the rate is
//            expressed in (rate units per one DUR).
//   RENORMALIZE_RATE_CHANGES - value returned by
//            ShouldRenormalizeAccumulatorOnRateChange(); when true, SetRate
//            rescales the current accumulator to the new rate.
22template <
typename CLOCK = std::chrono::steady_clock,
typename DUR = std::chrono::seconds,
bool RENORMALIZE_RATE_CHANGES = true>
// Constructor member-initializer list: stores the caller-supplied time
// source and zero-/value-initializes the bookkeeping members listed here.
// NOTE(review): this listing skips some original lines, so further
// initializers and the rest of the constructor body may exist but are not
// visible.
34 : m_elapsedTimeFunction(elapsedTimeFunction),
38 m_accumulatorFraction(0),
39 m_accumulatorUpdated(),
40 m_replenishNumerator(0),
41 m_replenishDenominator(0),
43 m_delayDenominator(0) {
// Compile-time checks: the period numerators and denominators of DUR and of
// the clock's duration are used as factors and divisors when SetRate builds
// the replenish and delay ratios, so they must all be strictly positive.
45 static_assert(DUR::period::num > 0,
"Rate duration must have positive numerator");
46 static_assert(DUR::period::den > 0,
"Rate duration must have positive denominator");
47 static_assert(CLOCK::duration::period::num > 0,
"RateLimiter clock duration must have positive numerator");
48 static_assert(CLOCK::duration::period::den > 0,
"RateLimiter clock duration must have positive denominator");
// NOTE(review): this appears to be the body of ApplyCost(int64_t cost); the
// signature, the declaration of `delay`, the `else` between the two
// replenish branches and the return statement are not visible here.
//
// Hold the accumulator lock for the rest of the scope.
59 std::lock_guard<std::recursive_mutex> lock(m_accumulatorLock);
// Sample the time source once and measure the ticks elapsed since the
// accumulator was last updated.
61 auto now = m_elapsedTimeFunction();
62 auto elapsedTime = (now - m_accumulatorUpdated).count();
// Overflow guard: if elapsedTime * m_replenishNumerator, or that product plus
// the carried fraction, would exceed int64_t's maximum, skip the arithmetic
// and refill the accumulator to m_maxRate. The product on the third line is
// only evaluated after the division test on the second line has passed,
// thanks to short-circuit evaluation of ||.
65 if (m_replenishNumerator != 0 && elapsedTime > 0 &&
66 (elapsedTime > std::numeric_limits<int64_t>::max() / m_replenishNumerator ||
67 m_accumulatorFraction > std::numeric_limits<int64_t>::max() - (elapsedTime * m_replenishNumerator))) {
68 m_accumulator = m_maxRate;
69 m_accumulatorFraction = 0;
// Regular replenish (presumably the else branch): add the whole units earned
// during elapsedTime and carry the division remainder to the next call.
72 auto temp = elapsedTime * m_replenishNumerator + m_accumulatorFraction;
73 m_accumulator += temp / m_replenishDenominator;
74 m_accumulatorFraction = temp % m_replenishDenominator;
// Cap the accumulator at m_maxRate; once capped, any carried fraction is
// discarded.
77 m_accumulator = (std::min)(m_accumulator, m_maxRate);
78 if (m_accumulator == m_maxRate) {
79 m_accumulatorFraction = 0;
// A negative accumulator is a deficit; convert it to a delay using the
// precomputed delay ratio (deficit * denominator / numerator).
85 if (m_accumulator < 0) {
86 delay =
DelayType(-m_accumulator * m_delayDenominator / m_delayNumerator);
// The cost is charged after the delay has been computed, so this call's cost
// only affects the delay seen by later calls. Record the sample time as the
// new update time.
90 m_accumulator -= cost;
91 m_accumulatorUpdated = now;
// NOTE(review): this appears to be the body of ApplyAndPayForCost(int64_t
// cost); its signature and closing braces are not visible here.
// Charge the cost via ApplyCost and, if that yields a positive delay, block
// the calling thread for that long.
100 auto costInMilliseconds =
ApplyCost(cost);
101 if (costInMilliseconds.count() > 0) {
102 std::this_thread::sleep_for(costInMilliseconds);
// Changes the limiter's rate and recomputes the replenish and delay ratios.
//   rate             - requested rate; values below 1 are raised to 1.
//   resetAccumulator - when true, the accumulator is set to the new rate, the
//                      carried fraction is cleared and the update time is
//                      re-stamped from the time source.
// NOTE(review): closing braces, any `else`, and the assignment of m_maxRate
// are not visible in this listing; the comments below describe only the
// statements shown.
109 virtual void SetRate(int64_t rate,
bool resetAccumulator =
false)
override {
// Hold the accumulator lock for the rest of the scope.
110 std::lock_guard<std::recursive_mutex> lock(m_accumulatorLock);
// Clamp the requested rate to a minimum of 1.
113 rate = (std::max)(
static_cast<int64_t
>(1), rate);
115 if (resetAccumulator) {
116 m_accumulator = rate;
117 m_accumulatorFraction = 0;
118 m_accumulatorUpdated = m_elapsedTimeFunction();
// Renormalization: scale the accumulator and its fraction by
// rate / m_maxRate. Presumably m_maxRate still holds the previous rate at
// this point - TODO confirm against the full source.
123 if (ShouldRenormalizeAccumulatorOnRateChange()) {
130 m_accumulator = m_accumulator * rate / m_maxRate;
131 m_accumulatorFraction = m_accumulatorFraction * rate / m_maxRate;
// Replenish ratio = units earned per clock tick:
//   m_maxRate * (DUR den / DUR num) * (clock num / clock den),
// kept as a numerator/denominator pair reduced by their GCD.
139 m_replenishNumerator = m_maxRate * DUR::period::den * CLOCK::duration::period::num;
140 m_replenishDenominator = DUR::period::num * CLOCK::duration::period::den;
141 auto gcd = ComputeGCD(m_replenishNumerator, m_replenishDenominator);
142 m_replenishNumerator /= gcd;
143 m_replenishDenominator /= gcd;
// Delay ratio = units earned per DelayType tick, built the same way with
// DelayType's period in place of the clock's; ApplyCost divides a deficit by
// this ratio to obtain a delay.
147 m_delayNumerator = m_maxRate * DelayType::period::num * DUR::period::den;
148 m_delayDenominator = DelayType::period::den * DUR::period::num;
149 gcd = ComputeGCD(m_delayNumerator, m_delayDenominator);
150 m_delayNumerator /= gcd;
151 m_delayDenominator /= gcd;
// Helper used by SetRate to reduce its numerator/denominator pairs; the name
// indicates it returns the greatest common divisor of num1 and num2.
// NOTE(review): only the first statement of the body is visible here. It
// takes num1 % num2, so num2 must be non-zero; the visible call sites pass
// denominators built from duration period constants.
155 int64_t ComputeGCD(int64_t num1, int64_t num2)
const {
158 int64_t rem = num1 % num2;
// Returns the RENORMALIZE_RATE_CHANGES template argument; SetRate consults
// this to decide whether to rescale the accumulator when the rate changes.
166 bool ShouldRenormalizeAccumulatorOnRateChange()
const {
return RENORMALIZE_RATE_CHANGES; }
// Recursive mutex locked around accumulator updates and rate changes.
175 std::recursive_mutex m_accumulatorLock;
// Current balance of available units: replenished over time, capped at
// m_maxRate, and reduced by each applied cost (it may go negative).
179 int64_t m_accumulator;
// Remainder left over from the last replenish division, carried into the
// next replenish so fractional units are not lost.
183 int64_t m_accumulatorFraction;
// Reduced fraction of units replenished per clock tick (set in SetRate).
189 int64_t m_replenishNumerator;
190 int64_t m_replenishDenominator;
// Reduced fraction of units per DelayType tick (set in SetRate); used to turn
// a negative accumulator into a delay.
191 int64_t m_delayNumerator;
192 int64_t m_delayDenominator;
virtual void SetRate(int64_t rate, bool resetAccumulator=false) override
virtual ~DefaultRateLimiter()=default
virtual DelayType ApplyCost(int64_t cost) override
virtual void ApplyAndPayForCost(int64_t cost) override
DefaultRateLimiter(int64_t maxRate, ElapsedTimeFunctionType elapsedTimeFunction=CLOCK::now)
std::function< InternalTimePointType()> ElapsedTimeFunctionType
std::chrono::time_point< CLOCK > InternalTimePointType
std::chrono::milliseconds DelayType