Concurrent HTTP requests without opening too many connections

Hannes Van De Vreken

I’ve been giving a talk on PSR-7 at PHP meetups and conferences. In that talk I demo the use of Guzzle to perform concurrent HTTP requests.

Now what happens if you need to perform a large number of concurrent requests? If you don’t limit the number of requests in flight, you might end up with a dangerously large number of open TCP sockets to a server. You’ll need some sort of dispatching that allows only a limited number of concurrent requests at any given time.

Let’s show you how to do that.

Guzzle uses Promises to handle async requests.


use Psr\Http\Message\ResponseInterface;

$client = new GuzzleHttp\Client();

$promise = $client->requestAsync('GET', 'https://api.github.com/users/hannesvdvreken');

$promise->then(function (ResponseInterface $response) {
    $profile = json_decode($response->getBody(), true);
    // Do something with the profile.
});

$promise->wait();

If you make multiple requests at the same time:


$promises = [];

foreach ($usernames as $username) {
    $promises[] = $client->requestAsync('GET', 'https://api.github.com/users/'.$username);
}

GuzzleHttp\Promise\all($promises)->then(function (array $responses) {
    foreach ($responses as $response) {
        $profile = json_decode($response->getBody(), true);
        // Do something with the profile.
    }
})->wait();

You can also use GuzzleHttp\Promise\each($promises) instead of all. The benefit of all is that the responses are returned in the same order in which the request promises were added to the promises array.

Now we are making all requests at the same time, no matter how big the $usernames array is. Note first that the all and each functions also accept an iterator of promises, not just an array. So we’ll create a generator that only starts an async request when its promise is pulled from the generator. For that we will use the yield keyword and, for brevity, an anonymous function.

<?php
$promises = (function () use ($usernames, $client) {
    foreach ($usernames as $username) {
        yield $client->requestAsync('GET', 'https://api.github.com/users/'.$username);
    }
})(); // Self-invoking anonymous function (PHP 7 and later), use call_user_func on older PHP versions.

GuzzleHttp\Promise\all($promises)->then(function (array $responses) {
    foreach ($responses as $response) {
        $profile = json_decode($response->getBody(), true);
        // Do something with the profile.
    }
})->wait();
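To see why the generator matters, here is a minimal sketch in plain PHP, without Guzzle. The $startRequest closure is a hypothetical stand-in for $client->requestAsync(): it only records that a request was started. The sketch suggests that building an array starts everything up front, while a generator starts nothing until it is iterated.

```php
<?php
// Hypothetical stand-in for $client->requestAsync(): it only records the start.
$started = [];
$startRequest = function (string $username) use (&$started): string {
    $started[] = $username;

    return 'promise for '.$username;
};

$usernames = ['alice', 'bob', 'carol'];

// Eager: every request starts while the array is being built.
$eager = [];
foreach ($usernames as $username) {
    $eager[] = $startRequest($username);
}
$startedEagerly = count($started);

// Lazy: creating the generator does not run its body yet.
$started = [];
$lazy = (function () use ($usernames, $startRequest) {
    foreach ($usernames as $username) {
        yield $startRequest($username);
    }
})();
$startedBeforeIteration = count($started);

// Asking for the current item runs the body up to the first yield only.
$first = $lazy->current();
$startedAfterFirst = count($started);
```

Whoever consumes the generator decides when the next request starts, which is what makes a concurrency limit possible in the first place.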

Next, to protect ourselves from starting all requests at the same time, we will use EachPromise directly. It is the class that the each and all functions in the GuzzleHttp\Promise namespace use behind the scenes.

<?php
use GuzzleHttp\Promise\EachPromise;
use Psr\Http\Message\ResponseInterface;

$promises = (function () use ($usernames, $client) {
    foreach ($usernames as $username) {
        yield $client->requestAsync('GET', 'https://api.github.com/users/'.$username);
    }
})();

// The parentheses around "new" are required before chaining a method call.
(new EachPromise($promises, [
    'concurrency' => 4,
    'fulfilled' => function (ResponseInterface $response) {
        $profile = json_decode($response->getBody(), true);
        // Do something with the profile.
    },
]))->promise()->wait();

EachPromise takes a few options: 'concurrency', 'fulfilled' and 'rejected'. When the concurrency option is passed, it will not keep more than the given number of promises pending at the same time. When a fulfilled callback is given, that callback is passed to each promise’s then method.
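The effect of the concurrency option can be illustrated with a small simulation in plain PHP. This is only a sketch of the idea, not Guzzle’s implementation: runWithLimit() is a hypothetical helper, and each "request" is a closure that counts as started when the generator yields it and as finished when it is called.

```php
<?php
// Hypothetical helper: pulls tasks from a generator, never more than
// $concurrency at a time, and hands each result to $onFulfilled.
function runWithLimit(Generator $tasks, int $concurrency, callable $onFulfilled): void
{
    if ($concurrency < 1) {
        throw new InvalidArgumentException('Concurrency must be at least 1.');
    }

    $inFlight = [];
    $advance = false; // Advance the generator only right before pulling a task.

    while (true) {
        // Top up: pull new tasks only while there is a free slot.
        while (count($inFlight) < $concurrency) {
            if ($advance) {
                $tasks->next();
            }
            $advance = true;

            if (!$tasks->valid()) {
                break;
            }
            $inFlight[] = $tasks->current();
        }

        if ($inFlight === []) {
            break; // Nothing pending and the generator is exhausted.
        }

        // Simulate the oldest pending task completing.
        $task = array_shift($inFlight);
        $onFulfilled($task());
    }
}

$active = 0;
$peak = 0;
$results = [];

$tasks = (function () use (&$active, &$peak) {
    foreach (['a', 'b', 'c', 'd', 'e'] as $name) {
        // Reaching this point stands in for starting a request.
        $active++;
        $peak = max($peak, $active);

        yield function () use ($name, &$active): string {
            $active--; // The simulated request has finished.

            return strtoupper($name);
        };
    }
})();

runWithLimit($tasks, 2, function (string $result) use (&$results) {
    $results[] = $result;
});
```

With a limit of 2, the generator is only advanced when a slot frees up, so $peak never exceeds 2 even though five tasks are processed.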

This is what you see in the PHP Debugbar’s timeline tab when you profile the requests made with Guzzle (for all madewithlove employees):

[Image: profiling requests]

Bonus:

What if you add caching? The responses could be cached to limit the total amount of data sent over HTTP. Let’s take a look at what the generator would look like. Note: we’re using objects that implement the PSR-6 interfaces to handle the caching.

<?php
use GuzzleHttp\Promise\EachPromise;
use Psr\Http\Message\ResponseInterface;

/** @var \Psr\Cache\CacheItemPoolInterface $cache */

$promises = (function () use ($usernames, $client, $cache) {
    foreach ($usernames as $username) {
        if ($cache->hasItem($username)) {
            $value = $cache->getItem($username)->get();

            // Cache hit: yield an already resolved promise, no request needed.
            yield new GuzzleHttp\Promise\FulfilledPromise($value);
            continue;
        }

        yield $client->requestAsync('GET', 'https://api.github.com/users/'.$username)
            ->then(function (ResponseInterface $response) use ($cache, $username) {
                $profile = json_decode($response->getBody(), true);
                $cache->save($cache->getItem($username)->set($profile));

                return $profile;
            });
    }
})();

(new EachPromise($promises, [
    'concurrency' => 4,
    'fulfilled' => function (array $profile) {
        // Do something with the profile.
    },
]))->promise()->wait();

Right now every item yielded by the generator is an object that implements PromiseInterface. That makes it explicit that the generator always yields objects of the same type. Less code is needed if we add a middleware to the Guzzle client that caches every GET request and returns a FulfilledPromise from there.

Granted, if we dive into the EachPromise implementation, we see that every item yielded by the generator that doesn’t implement PromiseInterface is wrapped in a promise using the GuzzleHttp\Promise\promise_for function. But I’d rather have a robust generator that always yields objects of the same type.
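The caching middleware mentioned earlier could follow the usual "function that wraps the next handler" shape. The sketch below shows that shape in plain PHP only. It is a simplification under stated assumptions: a real Guzzle handler receives a PSR-7 request and returns a promise, whereas here a request is just a method and a URL, the result is a string, and cacheGetRequests() and $transport are hypothetical names.

```php
<?php
// Hypothetical, simplified middleware: takes the next handler and returns a
// new handler that answers repeated GET requests from an in-memory cache.
function cacheGetRequests(ArrayObject $cache): callable
{
    return function (callable $next) use ($cache): callable {
        return function (string $method, string $url) use ($next, $cache) {
            if ($method !== 'GET') {
                return $next($method, $url); // Only GET responses are cached.
            }

            if (!isset($cache[$url])) {
                $cache[$url] = $next($method, $url);
            }

            return $cache[$url];
        };
    };
}

$calls = 0;

// Hypothetical innermost handler that would perform the actual HTTP call.
$transport = function (string $method, string $url) use (&$calls): string {
    $calls++;

    return $method.' '.$url;
};

$handler = cacheGetRequests(new ArrayObject())($transport);

$first = $handler('GET', 'https://api.github.com/users/hannesvdvreken');
$second = $handler('GET', 'https://api.github.com/users/hannesvdvreken'); // Served from the cache.
```

In a real client the cached branch would return a FulfilledPromise instead of a plain value, so that callers always receive a promise.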

Find more demo code in my PSR-7 demo repository.

Cheers!
