Combining Semaphore And Time Limiting In Python-trio With Asks Http Request
Solution 1:
One of the ways to achieve your goal would be using a mutex acquired by a worker before sending a request and released in a separate task after some interval:
import asks
import trio
from typing import Iterator

asks.init('trio')

async def fetch_urls(urls: Iterator, responses, n_workers, throttle):
    # Using a binary `trio.Semaphore` to be able
    # to release it from a separate task.
    mutex = trio.Semaphore(1)

    async def tick():
        await trio.sleep(throttle)
        mutex.release()

    async def worker():
        for url in urls:
            await mutex.acquire()
            nursery.start_soon(tick)
            response = await asks.get(url)
            responses.append(response)

    async with trio.open_nursery() as nursery:
        for _ in range(n_workers):
            nursery.start_soon(worker)
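A hypothetical driver for the function above (continuing from that snippet; the url list and parameter values are assumptions chosen to mirror the log further down):

async def main():
    # Assumed sample inputs: 10 httpbin urls, 5 workers, 1 s throttle.
    urls = iter(['https://httpbin.org/delay/2'] * 10)
    responses = []
    await fetch_urls(urls, responses, n_workers=5, throttle=1)
    print(f'collected {len(responses)} responses')

trio.run(main)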
If a worker gets a response sooner than after throttle seconds, it will block on await mutex.acquire(). Otherwise the mutex will be released by tick and another worker will be able to acquire it.
This is similar to how the leaky bucket algorithm works:
- Workers waiting for the mutex are like water in a bucket.
- Each tick is like the bucket leaking at a constant rate.
If you add a bit of logging just before sending a request, you should get output similar to this:
0.00169 started
0.001821 n_workers: 5
0.001833 throttle: 1
0.002152 fetching https://httpbin.org/delay/4
1.012 fetching https://httpbin.org/delay/2
2.014 fetching https://httpbin.org/delay/2
3.017 fetching https://httpbin.org/delay/3
4.02 fetching https://httpbin.org/delay/0
5.022 fetching https://httpbin.org/delay/2
6.024 fetching https://httpbin.org/delay/2
7.026 fetching https://httpbin.org/delay/3
8.029 fetching https://httpbin.org/delay/0
9.031 fetching https://httpbin.org/delay/0
10.61 finished
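The logging itself is not shown in the answer; a minimal sketch of how such timestamps could be produced (the log helper and the start-time bookkeeping are assumptions, not part of the original code):

import trio

def make_logger(start_time: float):
    # Returns a closure that prints seconds elapsed since start_time.
    def log(message: str) -> None:
        print(round(trio.current_time() - start_time, 4), message)
    return log

# Inside fetch_urls, before starting the workers:
#     log = make_logger(trio.current_time())
#     log('started')
# and inside worker(), just before asks.get(url):
#     log(f'fetching {url}')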
Solution 2:
Using trio.current_time() for this is much too complicated IMHO.
The easiest way to do rate limiting is a rate limiter, i.e. a separate task that basically does this:
async def ratelimit(queue, tick, task_status=trio.TASK_STATUS_IGNORED):
    # Note: trio.Queue and trio.open_cancel_scope date from older trio
    # releases (see the updated sketch further below).
    with trio.open_cancel_scope() as scope:
        task_status.started(scope)
        while True:
            await queue.get()
            await trio.sleep(tick)
Example use:
async with trio.open_nursery() as nursery:
    q = trio.Queue(0)
    limiter = await nursery.start(ratelimit, q, 1)
    while whatever:
        await q.put(None)  # will return at most once per second
        do_whatever()
    limiter.cancel()
In other words, you start that task with q = trio.Queue(0); limiter = await nursery.start(ratelimit, q, 1), and then you can be sure that at most one call of await q.put(None) per second will return, as the zero-length queue acts as a rendezvous point. When you're done, call limiter.cancel() to stop the rate-limiting task, otherwise your nursery won't exit.
If your use case includes starting sub-tasks which you need to finish before the limiter gets cancelled, the easiest way to do that is to run them in another nursery, i.e. instead of
while whatever:
    await q.put(None)  # will return at most once per second
    do_whatever()
limiter.cancel()
you'd use something like
async with trio.open_nursery() as inner_nursery:
    await start_tasks(inner_nursery, q)
limiter.cancel()
which would wait for the tasks to finish before touching the limiter.
NB: You can easily adapt this for "burst" mode, i.e. allow a certain number of requests before the rate limiting kicks in, by simply increasing the queue's length.
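Note that this answer uses APIs from older trio releases: trio.Queue was later removed in favour of memory channels, and trio.open_cancel_scope() became trio.CancelScope(). A sketch of the same pattern against current trio (the names are mine, not from the answer):

import trio

async def ratelimit(receiver: trio.MemoryReceiveChannel, tick: float,
                    task_status=trio.TASK_STATUS_IGNORED):
    with trio.CancelScope() as scope:
        task_status.started(scope)
        # Each received token is followed by a sleep, so senders
        # rendezvous with this loop at most once per `tick` seconds.
        async for _ in receiver:
            await trio.sleep(tick)

async def main():
    # Buffer size 0 keeps the rendezvous behaviour; a larger buffer
    # allows that many "burst" sends before throttling kicks in.
    sender, receiver = trio.open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        limiter = await nursery.start(ratelimit, receiver, 1)
        for i in range(3):
            await sender.send(None)  # returns at most once per second
            print(f'request slot {i} at {trio.current_time():.2f}')
        limiter.cancel()

trio.run(main)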
Solution 3:
Motivation and origin of this solution
Some months have passed since I asked this question. Python has improved since then, and so has trio (and my knowledge of them). So I thought it was time for a little update using Python 3.6 with type annotations and trio-0.10 memory channels.
I developed my own improvement of the original version, but after reading @Roman Novatorov's great solution I adapted it again, and this is the result. Kudos to him for the main structure of the function (and the idea to use httpbin.org for illustration purposes). I chose memory channels instead of a mutex to keep any token re-release logic out of the worker.
Explanation of solution
I can rephrase the original problem like this:
- I want to have a number of workers that start the request independently of each other (thus, they will be realized as asynchronous functions).
- There is zero or one token released at any point; any worker starting a request to the server consumes a token, and the next token will not be issued until a minimum time has passed. In my solution, I use trio's memory channels to coordinate between the token issuer and the token consumers (workers).
In case you're not familiar with memory channels and their syntax, you can read about them in the trio docs. I think the logic of async with memory_channel and memory_channel.clone() can be confusing at first.
from typing import List, Iterator
import asks
import trio

asks.init('trio')

links: List[str] = [
    'https://httpbin.org/delay/7',
    'https://httpbin.org/delay/6',
    'https://httpbin.org/delay/4'
] * 3

async def fetch_urls(urls: List[str], number_workers: int, throttle_rate: float):

    async def token_issuer(token_sender: trio.abc.SendChannel, number_tokens: int):
        async with token_sender:
            for _ in range(number_tokens):
                await token_sender.send(None)
                await trio.sleep(1 / throttle_rate)

    async def worker(url_iterator: Iterator, token_receiver: trio.abc.ReceiveChannel):
        async with token_receiver:
            for url in url_iterator:
                await token_receiver.receive()
                print(f'[{round(trio.current_time(), 2)}] Start loading link: {url}')
                response = await asks.get(url)
                # print(f'[{round(trio.current_time(), 2)}] Loaded link: {url}')
                responses.append(response)

    responses = []
    url_iterator = iter(urls)
    token_send_channel, token_receive_channel = trio.open_memory_channel(0)

    async with trio.open_nursery() as nursery:
        async with token_receive_channel:
            nursery.start_soon(token_issuer, token_send_channel.clone(), len(urls))
            for _ in range(number_workers):
                nursery.start_soon(worker, url_iterator, token_receive_channel.clone())

    return responses

responses = trio.run(fetch_urls, links, 5, 1.)
Example of logging output:
As you see, the minimum time between all page requests is one second:
[177878.99] Start loading link: https://httpbin.org/delay/7
[177879.99] Start loading link: https://httpbin.org/delay/6
[177880.99] Start loading link: https://httpbin.org/delay/4
[177881.99] Start loading link: https://httpbin.org/delay/7
[177882.99] Start loading link: https://httpbin.org/delay/6
[177886.20] Start loading link: https://httpbin.org/delay/4
[177887.20] Start loading link: https://httpbin.org/delay/7
[177888.20] Start loading link: https://httpbin.org/delay/6
[177889.44] Start loading link: https://httpbin.org/delay/4
Comments on the solution
As is not untypical for asynchronous code, this solution does not maintain the original order of the requested urls. One way to solve this is to associate an id with each original url, e.g. via a tuple structure, put the responses into a response dictionary, and later grab them one after the other to build a response list (this avoids sorting and has linear complexity); see the sketch below.
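A minimal sketch of that idea, assuming the workers are fed (index, url) pairs; the names responses_by_id and ordered_responses are illustrative, not from the answer:

from typing import Dict, Iterator, Tuple
import asks
import trio

# Hypothetical order-preserving variant of worker(): responses_by_id
# maps each url's original position to its response.
async def worker(url_iterator: Iterator[Tuple[int, str]],
                 token_receiver: trio.abc.ReceiveChannel,
                 responses_by_id: Dict[int, object]):
    async with token_receiver:
        for index, url in url_iterator:
            await token_receiver.receive()
            responses_by_id[index] = await asks.get(url)

# Setup: url_iterator = iter(enumerate(urls)); responses_by_id = {}
# Teardown, in linear time and with no sorting:
#     ordered_responses = [responses_by_id[i] for i in range(len(urls))]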
Solution 4:
You need to increment next_request_at by 1 every time you come into async_load_page. Try using next_request_at = max(trio.current_time() + 1, next_request_at + 1). Also, I think you only need to set it once. You may get into trouble if you're setting it around awaits, where you're giving other tasks the opportunity to change it before examining it again.
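A minimal sketch of that suggestion, assuming a module-level next_request_at and an async_load_page like the one referenced above (both are assumptions about the question's code, which is not shown here):

import asks
import trio

next_request_at = 0.0  # absolute trio-clock time of the next free slot

async def async_load_page(url: str):
    global next_request_at
    # Claim a slot and advance the shared deadline in one step, with no
    # await in between, so other tasks cannot interleave and race on it.
    my_slot = max(trio.current_time(), next_request_at)
    next_request_at = my_slot + 1  # i.e. max(now + 1, next_request_at + 1)
    await trio.sleep_until(my_slot)
    return await asks.get(url)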