Real-time notifications on Django using gevent-socketio and RabbitMQ

A GeoDjango based site to track incidents in real-time

Posted by Agustín Bartó 2 years, 1 month ago

Motivation

A few months ago we posted a similar article that presented a way to implement real-time notifications on Django using Node.js, socket.io and Redis. It got quite a few comments asking us why we used Node.js instead of a gevent-based solution. Our response was that we had hands-on experience with the Node.js solution, and that we would try to write another article about a gevent-based solution in the future. This is that article.

This time we’ll replace Node.js with a 100% Python implementation based on gevent-socketio, and Redis with RabbitMQ. We also didn’t want to bore you with the same vanilla notifications site, so we’re going to build something different. Something useful.

This time we’re going to build a complete GeoDjango-based site to report geo-located incidents in real-time using Google Maps.

Update

Given the problems that we mentioned regarding gevent-socketio, some users asked us for a Node.js+socket.io implementation (similar to the one in the old article). We created a new branch on GitHub named node_js to hold the new implementation.

The Application

The application is a Django 1.7 site that uses GeoDjango (backed by PostGIS) to track and report, in real-time, geo-located incidents that occur in certain areas of interest around the world. It provides views to manage incidents and areas of interest, a view to monitor the occurrence of incidents in real-time, and a view to report incidents that uses the geolocator library to detect the user’s location.

Whenever an incident is saved (or updated), a message is sent to a RabbitMQ broadcast queue. At this point, the system also checks whether the incident occurred in an area of interest, and sends a special alert message if necessary. Every subscriber to the queue (one is created each time a client connects to the notifications socket.io namespace) receives the message and sends a packet down the socket’s channel. It is up to the client’s JavaScript code to update the maps and generate notifications and alerts if necessary.
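To make the broadcast behaviour concrete, here is a minimal standard-library sketch of the fan-out idea: every connected client gets its own queue, and a published message is copied to all of them. It is only a toy model of what the broker does for us; the FanoutExchange class and its method names are made up for illustration and are not part of the site's code or of any library.

```python
import queue


class FanoutExchange:
    """Toy stand-in for a broadcast exchange: every bound queue gets a copy."""

    def __init__(self):
        self._queues = []

    def bind(self):
        """Create a private queue for a newly connected client and bind it."""
        client_queue = queue.Queue()
        self._queues.append(client_queue)
        return client_queue

    def unbind(self, client_queue):
        """Drop a client's queue when it disconnects."""
        self._queues.remove(client_queue)

    def publish(self, message):
        """Deliver the message to every queue bound at publish time."""
        for client_queue in self._queues:
            client_queue.put(message)


exchange = FanoutExchange()
browser_a = exchange.bind()
browser_b = exchange.bind()

# Saving an incident would publish something like this.
exchange.publish({"type": "post_save", "created": True})

# Both clients receive their own copy of the same message.
received_a = browser_a.get_nowait()
received_b = browser_b.get_nowait()
```

In the real site RabbitMQ plays the role of the exchange, and each socket.io connection consumes from its own subscription.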

Although simple, the site has all the basic functionality and can be used as a basis for similar projects. The source is available on GitHub.

The model

To represent the incidents and the areas of interest, we’re going to use the following model:

from django.contrib.gis.db import models


class Incident(models.Model):
    objects = models.GeoManager()

    URGENT = 'UR'
    HIGH = 'HI'
    MEDIUM = 'ME'
    LOW = 'LO'
    INFO = 'IN'

    SEVERITY_CHOICES = (
        (URGENT, 'Urgent'),
        (HIGH, 'High'),
        (MEDIUM, 'Medium'),
        (LOW, 'Low'),
        (INFO, 'Info'),
    )

    name = models.CharField(max_length=150)
    description = models.TextField(max_length=1000)
    severity = models.CharField(max_length=2, choices=SEVERITY_CHOICES, default=MEDIUM)
    closed = models.BooleanField(default=False)
    location = models.PointField()
    created = models.DateTimeField(editable=False, auto_now_add=True)


class AreaOfInterest(models.Model):
    objects = models.GeoManager()

    name = models.CharField(max_length=150)
    severity = models.CharField(max_length=2, choices=Incident.SEVERITY_CHOICES, default=Incident.MEDIUM)
    polygon = models.PolygonField()

The Incident class represents the occurrence of an event around a specific geographic point, specified by the location field. The AreaOfInterest is used to define a region for which the user is going to be alerted if an incident is reported within it. The polygon field specifies the geographic area.

Alerts are sent only when the incident’s severity is above the area’s target severity and its location is within the area’s polygon. This is done using spatial QuerySets within the Incident post_save signal handler:

areas_of_interest = [
    area_of_interest.geojson_feature for area_of_interest in AreaOfInterest.objects.filter(
        polygon__contains=kwargs['instance'].location,
        severity__in=kwargs['instance'].alert_severities,
    )
]
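The alert_severities attribute used in the filter is not shown in this post. One possible way to derive it, as a plain-Python sketch, is to order the severity codes from most to least severe and return the incident's own level plus every lower one. This assumes that an area is alerted by any incident at least as severe as the area's target severity; the function below and the SEVERITY_ORDER name are illustrative, not the project's actual code.

```python
# Severity codes from the Incident model, ordered from most to least severe.
SEVERITY_ORDER = ['UR', 'HI', 'ME', 'LO', 'IN']


def alert_severities(incident_severity):
    """Return the area severities that an incident of this severity alerts.

    Assumption: an area with target severity T is alerted by any incident
    that is at least as severe as T, so the result is the incident's own
    level followed by every less severe level.
    """
    index = SEVERITY_ORDER.index(incident_severity)
    return SEVERITY_ORDER[index:]


high_alerts = alert_severities('HI')
info_alerts = alert_severities('IN')
```

With this ordering, a 'HI' incident matches areas whose target severity is 'HI', 'ME', 'LO' or 'IN', while an 'IN' incident only matches areas that ask for 'IN'.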

Sending notifications

Once a notification has been constructed, we connect to a RabbitMQ broadcast queue (using Kombu) and publish the notification:

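from django.conf import settings
from kombu import BrokerConnection
from kombu.common import maybe_declare
from kombu.pools import producers

from .queues import notifications_exchange  # assumed to live next to notifications_queue


def send_notification(notification):
    with BrokerConnection(settings.AMPQ_URL) as connection:
        with producers[connection].acquire(block=True) as producer:
            # Make sure the exchange exists before publishing to it.
            maybe_declare(notifications_exchange, producer.channel)
            producer.publish(
                notification,
                exchange='notifications',
                routing_key='notifications'
            )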

When a user accesses the site’s home view, it connects to a socket.io namespace:

from django.conf import settings
from kombu import BrokerConnection
from kombu.mixins import ConsumerMixin
from socketio.namespace import BaseNamespace
from socketio.sdjango import namespace

from .queues import notifications_queue


@namespace('/notifications')
class NotificationsNamespace(BaseNamespace):
    def __init__(self, *args, **kwargs):
        super(NotificationsNamespace, self).__init__(*args, **kwargs)

    def get_initial_acl(self):
        return ['recv_connect']

    def recv_connect(self):
        if self.request.user.is_authenticated():
            self.lift_acl_restrictions()
            self.spawn(self._dispatch)
        else:
            self.disconnect(silent=True)

    def _dispatch(self):
        with BrokerConnection(settings.AMPQ_URL) as connection:
            NotificationsConsumer(connection, self.socket, self.ns_name).run()

When a connection is established (and authentication is verified), a new greenlet is spawned, passing control to a NotificationsConsumer instance:

class NotificationsConsumer(ConsumerMixin):
    def __init__(self, connection, socket, ns_name):
        self.connection = connection
        self.socket = socket
        self.ns_name = ns_name

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[notifications_queue], callbacks=[self.process_notification])]

    def process_notification(self, body, message):
        self.socket.send_packet(dict(
            type='event',
            name='notification',
            args=(body,),
            endpoint=self.ns_name
        ))
        message.ack()

Each message sent to the broadcast queue is handled by the process_notification callback, which sends a new packet down the socket’s channel with the body of the notification object.

/static/media/uploads/home_screenshot.png

The image above is a screenshot of the site’s home page while an alert is being shown to the user. The client side of the communication is quite simple:

var socket = io.connect(
    "/notifications",
    {
        "reconnectionDelay": 5000,
        "timeout": 10000,
        "resource": "socket.io"
    }
);

socket.on('connect', function(){
    console.log('connect', socket);
});

The client connects to the socket, and hooks the appropriate callbacks. Socket.io hides away the complexities of choosing a transport layer and handling retries and reconnects. The notification callback handles most of the client’s logic.

socket.on('notification', function(notification){
    console.log('notification', notification);

    if (notification.type === "post_save") {
        if (notification.created) {
            map.data.addGeoJson(notification.feature);
        } else {
            var feature = map.data.getFeatureById(notification.feature.id);
            map.data.remove(feature);

            if (! notification.feature.properties.closed) {
                map.data.addGeoJson(notification.feature);
            }
        }
    } else if (notification.type === "post_delete") {
        var feature = map.data.getFeatureById(notification.feature.id);
        map.data.remove(feature);
    } else if (notification.type === "alert") {
        showAlert(buildAlertModalBodyHtml(notification));
    } else {
        console.log(notification);
    }
});
socket.on('disconnect', function(){
    console.log('disconnect', socket);
});

Upon receiving a notification we make use of the Google Maps Data Layer API to draw onto the map. Notice that all we had to do was give the GeoJSON representation of the objects (which is generated by GeoDjango) to the map, and the rest is taken care of for us.
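Judging from the client callback, a notification carries at least a type, a GeoJSON feature and, for saves, a created flag. The server-side code that builds it is not shown in this post, so the following is only a sketch of what such a payload could look like; the build_notification function, its signature and the sample feature are hypothetical, and the shape of "alert" notifications is left out because the post does not show it.

```python
import json


def build_notification(kind, feature, created=False):
    """Build a notification dict with the fields the client callback reads.

    `kind` is "post_save" or "post_delete"; `feature` is a GeoJSON Feature
    dict. The name and signature of this helper are hypothetical.
    """
    notification = {"type": kind, "feature": feature}
    if kind == "post_save":
        # The client uses this flag to choose between adding and replacing.
        notification["created"] = created
    return notification


# A hand-written GeoJSON Feature standing in for what GeoDjango would produce.
feature = {
    "type": "Feature",
    "id": 1,
    "geometry": {"type": "Point", "coordinates": [-64.18, -31.42]},
    "properties": {"name": "Example incident", "severity": "HI", "closed": False},
}

notification = build_notification("post_save", feature, created=True)
payload = json.dumps(notification)  # serialized form that would cross the broker
```

The top-level id is what the client passes to getFeatureById, and properties.closed is what it checks before re-adding an updated incident to the map.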

Managing events and areas of interest

The site provides views to manage incidents and areas of interest that use the Google Maps JavaScript API to manipulate the objects graphically within maps. The GeoJSON format is supported by both GeoDjango and Google Maps, so we use it as the exchange format in the forms.

/static/media/uploads/report_incident_screenshot.png

We also provide a simple incident report view (depicted above) that uses the geolocator JavaScript library to detect the user’s location.

Conclusions

Although the exercise proved to be really interesting (especially the spatial features), we didn’t find any significant advantages over our previous solution based on Node.js. In fact, we had to tackle several complications related to the restrictions that gevent places on the packages that can be used. First of all, we had to make sure that the libraries that run inside the greenlets were either gevent-specific or compatible with monkey patching (kombu is). We also had problems running the site using Gunicorn, so we had to switch to Chaussette. There was also the matter of gevent-socketio only supporting version 0.9 of the socket.io protocol (hence the bower dependency that points to the 0.9 branch of the client repo).

We hope that you find the information presented in this post useful. As usual, feel free to leave comments or suggestions on how to improve the solution.

Acknowledgements

The bulk of the notifications architecture is based on the solution presented by Jeremy West in his blog post Django, Gevent, and Socket.io. His tutorial is a great way to understand how gevent-socketio works and how to integrate it into Django.

