Monitor network endpoints with Python asyncio and aiohttp

My motivation – I wanted to make a network monitoring service in Python. Python isn’t known for its async abilities, but with asyncio it’s possible. I wanted to include it in a larger Django app, GlitchTip, since keeping everything in a monolithic code base makes it easier to maintain and deploy. Go and Node handle concurrent IO a little more naturally, but neither has a web framework anywhere near as feature-complete as Django.

How asyncio works compared to JavaScript

I’m used to synchronous Python and asynchronous JavaScript, so asyncio is a little strange at first. It’s far more verbose than stringing along a few JS promises. Let’s compare equivalent examples in JS and Python.

fetch('http://example.com/example.json')
  .then(response => response.json())
  .then(data => console.log(data));

import asyncio

import aiohttp

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://example.com/example.json') as response:
            html = await response.json()
            print(html)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

There’s more boilerplate in Python: aiohttp requires three chained async calls, while fetching JSON in JS requires just two chained promises. Let’s break these differences down a bit.

  • An async call to GET/POST/etc. the resource. At this point, we don’t have the body of the response yet. fetch vs session.get are about the same here.
  • An async call to get the body contents (and perhaps process them in some manner, such as converting a JSON payload to an object or dictionary). If we only need, say, the status code, there is no need to spend time doing this. Both have async text() and json() functions that work similarly.
  • aiohttp has a ClientSession context manager that closes the connection. The only async IO occurs when closing the connection. It’s possible to reuse a session for some performance benefit (see the sketch after this list). This is often useful in Python, as our async code block will often live nested inside synchronous code. fetch does not have this (as far as I’m aware at the time of this writing).
  • get_event_loop and run_until_complete allow us to run async functions from synchronous code. Python is synchronous by default, so this is necessary. When running Django, Celery, or a plain Python script, everything blocks until explicitly run async. JavaScript, on the other hand, lets you run async code with zero boilerplate.
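
For example, here’s a minimal sketch of reusing one session across several concurrent requests (the URLs are placeholders):

import asyncio

import aiohttp

async def fetch_status(session, url):
    # Each request reuses the shared session's connection pool
    async with session.get(url) as response:
        return response.status

async def main():
    urls = ["http://example.com", "http://example.org"]  # placeholder URLs
    async with aiohttp.ClientSession() as session:
        # Run all requests concurrently on the single session
        return await asyncio.gather(*[fetch_status(session, url) for url in urls])

loop = asyncio.get_event_loop()
print(loop.run_until_complete(main()))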

One other thing to note is that both Python and JavaScript run your code in a single thread. While you can “do multiple things” while waiting on IO, you cannot use multiple CPU cores without starting multiple processes, for example by running several uwsgi workers. Hence the name asyncio: the concurrency applies to IO, not computation.

Source: docs.aiohttp.org

Network Monitoring with aiohttp

Network monitoring can start as a couple-line script or grow into a very complex, massive service depending on scale. I won’t claim that my method is the best mega-scale method ever, but I think it’s quite sufficient for small to medium scale projects. Let’s start with the requirements:

  • Must handle 1 million network checks per minute
  • Must run at least every 30 seconds (at smaller scale, this interval could probably go much shorter)
  • Must run only Python and embed into a Django code base
  • Must not require anything other than a Celery-compatible message broker and a Django-compatible database

And a few non-functional requirements that I believe will help it scale:

  • Must scale to run from many servers (Celery workers)
  • Must batch database writes as efficiently as possible to avoid bottlenecks

Overview of architecture

A Celery beat scheduler will run a “dispatch_checks” task every 30 seconds. dispatch_checks determines which “monitors” are due for a check based on their interval frequency and last check time. It then groups these into batches and dispatches further, parallel Celery tasks called “perform_checks” to actually perform the network checks. Each perform_checks task fetches the additional monitor data in one query, asynchronously checks each network asset, and saves the results to the database using the standard Django ORM.

By batching inserts, we should be able to improve scalability. It also means we don’t need a massive number of Celery tasks, which would be unnecessary overhead. In real life we may only have a few Celery workers at “small or medium scale,” so it would waste resources to dispatch 1 million Celery tasks. If we batch in groups of 1000 and really hit our maximum target of 1 million monitors, then we would want 1000 Celery workers. Another variable is the timeout for each check: making it lower means our workers finish faster instead of waiting on the slowest request.
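
For reference, the periodic dispatch can be expressed as a Celery beat schedule entry. Here’s a minimal sketch, assuming a task module path that may not match GlitchTip’s actual layout:

from celery import Celery

app = Celery("uptime")  # app name is illustrative

# Run dispatch_checks every 30 seconds via Celery beat
app.conf.beat_schedule = {
    "dispatch-checks-every-30-seconds": {
        "task": "monitors.tasks.dispatch_checks",  # assumed module path
        "schedule": 30.0,  # seconds
    },
}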

See the full code on GlitchTip’s GitLab.

Celery Tasks

import asyncio
from typing import List

from celery import shared_task
from django.db.models import (
    DateTimeField,
    ExpressionWrapper,
    F,
    OuterRef,
    Subquery,
)
from django.utils import timezone

# App-local imports; module paths are assumptions
from .models import Monitor, MonitorCheck
from .utils import fetch_all  # fetch_all is shown in the aiohttp section below

@shared_task
def dispatch_checks():
    now = timezone.now()
    # Correlated subquery: the most recent check time for each monitor
    latest_check = Subquery(
        MonitorCheck.objects.filter(monitor_id=OuterRef("id"))
        .order_by("-start_check")
        .values("start_check")[:1]
    )
    # Monitors whose latest check is older than (now - interval) are due
    monitor_ids = (
        Monitor.objects.filter(organization__is_accepting_events=True)
        .annotate(
            last_min_check=ExpressionWrapper(
                now - F("interval"), output_field=DateTimeField()
            ),
            latest_check=latest_check,
        )
        .filter(latest_check__lte=F("last_min_check"))
        .values_list("id", flat=True)
    )
    # Dispatch due monitors in batches to limit task count and enable bulk inserts
    batch_size = 1000
    batch_ids = []
    for i, monitor_id in enumerate(monitor_ids.iterator(), 1):
        batch_ids.append(monitor_id)
        if i % batch_size == 0:
            perform_checks.delay(batch_ids, now)
            batch_ids = []
    if len(batch_ids) > 0:
        perform_checks.delay(batch_ids, now)

@shared_task
def perform_checks(monitor_ids: List[int], now=None):
    if now is None:
        now = timezone.now()
    # Convert queryset to raw list[dict] for asyncio operations
    monitors = list(Monitor.objects.filter(pk__in=monitor_ids).values())
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(fetch_all(monitors, loop))
    # One bulk insert per batch keeps database writes efficient
    MonitorCheck.objects.bulk_create(
        [
            MonitorCheck(
                monitor_id=result["id"],
                is_up=result["is_up"],
                start_check=now,
                reason=result.get("reason", None),
                response_time=result.get("response_time", None),
            )
            for result in results
        ]
    )

The fancy Django ORM subquery ensures we can determine which monitors need to be checked while staying as performant as possible. While some may prefer complex queries in raw SQL, I prefer the ORM, and I’m impressed to see how many use cases Django can cover these days. Anything to avoid writing lots of join table SQL 🤣️
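
If you’re curious what the ORM compiles this to, you can print the query before iterating it; a quick inspection trick, not part of the task code:

# Print an approximation of the SQL Django will run for the queryset
print(str(monitor_ids.query))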

aiohttp code

import asyncio
import time
from datetime import timedelta
from ssl import SSLError

import aiohttp

# App-local enums; the module path is an assumption.
# PING_AIOHTTP_TIMEOUT and DEFAULT_AIOHTTP_TIMEOUT are timeout settings
# defined elsewhere in the project.
from .constants import MonitorCheckReason, MonitorType

async def process_response(monitor, response):
    # Mark the monitor as up only if the status code (and optional
    # expected body substring) match what the monitor expects
    if response.status == monitor["expected_status"]:
        if monitor["expected_body"]:
            if monitor["expected_body"] in await response.text():
                monitor["is_up"] = True
            else:
                monitor["reason"] = MonitorCheckReason.BODY
        else:
            monitor["is_up"] = True
    else:
        monitor["reason"] = MonitorCheckReason.STATUS

async def fetch(session, monitor):
    # Check one monitor, recording is_up, reason, and response_time on it
    url = monitor["url"]
    monitor["is_up"] = False
    start = time.monotonic()
    try:
        if monitor["monitor_type"] == MonitorType.PING:
            async with session.head(url, timeout=PING_AIOHTTP_TIMEOUT):
                monitor["is_up"] = True
        elif monitor["monitor_type"] == MonitorType.GET:
            async with session.get(url, timeout=DEFAULT_AIOHTTP_TIMEOUT) as response:
                await process_response(monitor, response)
        elif monitor["monitor_type"] == MonitorType.POST:
            async with session.post(url, timeout=DEFAULT_AIOHTTP_TIMEOUT) as response:
                await process_response(monitor, response)
        monitor["response_time"] = timedelta(seconds=time.monotonic() - start)
    except SSLError:
        monitor["reason"] = MonitorCheckReason.SSL
    except asyncio.TimeoutError:
        monitor["reason"] = MonitorCheckReason.TIMEOUT
    except OSError:
        monitor["reason"] = MonitorCheckReason.UNKNOWN
    return monitor

async def fetch_all(monitors, loop):
    # One shared session per batch; gather runs all checks concurrently
    async with aiohttp.ClientSession(loop=loop) as session:
        results = await asyncio.gather(
            *[fetch(session, monitor) for monitor in monitors], return_exceptions=True
        )
        return results
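
One caveat worth noting: with return_exceptions=True, any exception that fetch doesn’t catch is returned as an exception object inside results. A defensive filter before the bulk insert (my suggestion, not part of the original code) could look like:

# Keep only results that are monitor dicts, dropping raw exceptions
results = [result for result in results if isinstance(result, dict)]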

That’s it. Ignoring my models and plenty of Django boilerplate, we have the core of a reasonably performant uptime monitoring system in about 120 lines of code. GlitchTip is MIT licensed, so feel free to use it as you see fit. I also run a small SaaS service at app.glitchtip.com which helps fund development.

On testing

I greatly prefer testing in Python over JavaScript. I’m pretty sure this 15-line integration test would require some fairly complex Jasmine boilerplate and run approximately infinitely slower in CI. I will gladly put up with some asyncio boilerplate to avoid testing anything in JavaScript. In my experience, there are Python test-driven development fans, and there are JS developers who intended to write tests.

from aioresponses import aioresponses
from django.test import TestCase
from freezegun import freeze_time
from model_bakery import baker

# App-local imports; module paths are assumptions
from .constants import MonitorType
from .models import Monitor
from .tasks import dispatch_checks

class UptimeTestCase(TestCase):  # class name is illustrative
    @aioresponses()
    def test_monitor_checks_integration(self, mocked):
        test_url = "https://example.com"
        mocked.get(test_url, status=200)
        with freeze_time("2020-01-01"):
            mon = baker.make(Monitor, url=test_url, monitor_type=MonitorType.GET)
        self.assertEqual(mon.checks.count(), 1)

        mocked.get(test_url, status=200)
        # Same frozen time: the monitor is not due yet, so no new check runs
        with freeze_time("2020-01-01"):
            dispatch_checks()
        self.assertEqual(mon.checks.count(), 1)

        with freeze_time("2020-01-02"):
            dispatch_checks()
        self.assertEqual(mon.checks.count(), 2)

There’s a lot going on in very little code. I use aioresponses to mock network requests, model-bakery’s baker to quickly generate DB test data, freezegun to simulate time changes, and assertEqual from Django’s TestCase. And, not seen here, CELERY_ALWAYS_EAGER in settings.py forces Celery to run synchronously for convenience. I didn’t write any async test code, yet I have a pretty decent test covering the core functionality, from having monitors in the DB to ensuring they were checked properly.
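
For reference, the test settings amount to a couple of lines (the propagate setting is my addition, a common companion so task errors fail the test):

# settings.py: run Celery tasks synchronously, in-process, during tests
CELERY_ALWAYS_EAGER = True
# Raise task exceptions in the test process instead of swallowing them
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True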

JS equivalent

describe("test uptime", function() {
  it("should work", function() {
    // TODO
  });
});

Joking aside, I find it quite hard to find a Node-based task runner like Celery, an ORM, and a test framework that really work well together. There are many little niceties, like running Celery in always-eager mode, that make testing a joy in Python. Let me know in a comment if you disagree and have any JavaScript-based solutions you like.

By David

I am a supporter of free software and run Burke Software and Consulting LLC. I am always looking for contract work, especially for non-profits and open source projects.

Open Source Contributions: I maintain a number of Django related projects including GlitchTip, Passit, and django-report-builder. You can view my work on GitLab.

Academic papers: Incorporating Gaming in Software Engineering Projects: Case of RMU Monopoly in the Journal of Systemics, Cybernetics and Informatics (2008)
