Leaky bucket

We saw how to build a rate limiter in the previous chapters: using time.Ticker to force a client to await its turn in order to get served. There is another take on rate limiting of services and libraries known as the leaky bucket. The name evokes the image of a bucket with a few holes in it: if you are filling it, you have to be careful not to pour in too much water, otherwise it's going to overflow. Before adding more water, you need to wait for the level to drop; the speed at which this happens depends on the size of the bucket and the number of holes it has. The analogy maps onto this concurrency pattern as follows:

  • The water going through the holes represents requests that have been completed.
  • The water overflowing from the bucket represents requests that have been discarded.

The bucket will be defined by two attributes:

  • Rate: The ideal number of requests per unit of time; as long as requests arrive at a lower frequency than this, they are all served.
  • Capacity: The number of requests that can be made at once before the resource temporarily becomes unresponsive.

The bucket has a maximum capacity, so when requests arrive more frequently than the specified rate, the available capacity starts dropping, just like when you pour in too much water and the bucket starts to overflow. If the frequency is zero, or lower than the rate, the bucket slowly regains its capacity, just as the water slowly drains away.

The data structure of the leaky bucket holds the capacity and a counter of the requests that are still available. This counter equals the capacity on creation and drops each time a request is executed. The rate specifies how often the status needs to be reset to the capacity:

type bucket struct {
	capacity uint64
	status   uint64
}

When creating a new bucket, we should also take care of the status reset. We can use a goroutine for this and use a context to terminate it correctly. We can create a ticker using the rate and then use these ticks to reset the status. We need to use the atomic package to ensure it is thread-safe:

func newBucket(ctx context.Context, cap uint64, rate time.Duration) *bucket {
	b := bucket{capacity: cap, status: cap}
	go func() {
		t := time.NewTicker(rate)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				// Restore full capacity on every tick.
				atomic.StoreUint64(&b.status, b.capacity)
			case <-ctx.Done():
				return
			}
		}
	}()
	return &b
}

When we're adding to the bucket, we can check the status and act accordingly:

  • If the status is 0, we cannot add anything.
  • If the amount to add is higher than the availability, we add only what's available.
  • Otherwise, we add the full amount:
func (b *bucket) Add(n uint64) uint64 {
	for {
		r := atomic.LoadUint64(&b.status)
		if r == 0 {
			return 0
		}
		if n > r {
			n = r
		}
		if !atomic.CompareAndSwapUint64(&b.status, r, r-n) {
			continue
		}
		return n
	}
}

We retry in a loop until the compare-and-swap (CAS) succeeds: if another goroutine changes the status between our Load and our CAS, the CAS fails, and we load the fresh value and try again.

The bucket can be used in a client that will try to add a random amount to the bucket and will log its result:

type client struct {
	name  string
	max   int
	b     *bucket
	sleep time.Duration
}

func (c client) Run(ctx context.Context, start time.Time) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
			n := 1 + rand.Intn(c.max-1)
			time.Sleep(c.sleep)
			e := time.Since(start).Seconds()
			a := c.b.Add(uint64(n))
			log.Printf("%s tries to take %d after %.02fs, takes %d", c.name, n, e, a)
		}
	}
}

Running several clients concurrently, so that they compete for the same bucket, produces the following behavior:

  • Some goroutines will add the full amount they expect to the bucket.
  • One goroutine will eventually fill the bucket by adding a quantity equal to the remaining capacity, even if the amount it is trying to add is higher.
  • The other goroutines will not be able to add to the bucket until its capacity is reset:
func main() {
	ctx, canc := context.WithTimeout(context.Background(), time.Second)
	defer canc()
	start := time.Now()
	b := newBucket(ctx, 10, time.Second/5)
	t := time.Second / 10
	for i := 0; i < 5; i++ {
		c := client{
			name:  fmt.Sprint(i),
			b:     b,
			sleep: t,
			max:   5,
		}
		go c.Run(ctx, start)
	}
	<-ctx.Done()
}