Django (/ˈdʒæŋɡoʊ/ jang-goh) is a free, open source web application framework written in Python. Here you can find a tutorial from Django, if you don't know it yet and want to try it out: https://docs.djangoproject.com/en/2.1/intro/tutorial01/. The next article is aimed at experienced Django developers.

Django and Microservices

I have been using Django together with the task engine Celery for some time for asynchronous and distributed processing over several systems. Recently I am working on the topic "Microservices" and how they can be realized together with Django. (more about Microservices: [1] Medium, [2] microservices.io).
We want to divide the existing application into smaller services and each service is an independent Django application. One of the standards of the Microservice architecture is that each service has its own data (database, schema, etc.) (see "[3] Database per Service"), where the service is the sole administrator.
One of the challenges now is that the cross-service data is consistent. For example, an order in the "Order" service is only valid once the transaction has been validated in the "Customer" service.

One solution is the Saga Pattern, which sends events when data is updated. Other services can subscribe to these events and update their data in response.

Publish/Subscribe with Django

The prerequisite to use Celery is a message broker like RabbitMQ with the AMQP protocol. This protocol is also very good for publishing or subscribing messages for certain topics. Although Celery is well suited as a task engine, it is less suited as an event bus, since Celery assumes that each message is a particular task that is to be executed. The coordination between the task engine and the broker is done by "Kombu" a messaging library. This library supports many different brokers and protocols, we are interested in those which support the message type "Topic", such as AMQP.

Exchange: Topic
A good overview of the different routing types of RabbitMQ is given by cloudamp.

Messages are forwarded to one or more queues based on a match between a message routing key and the routing pattern.

Topic Exchange

Example:
A subscriber of animals.# gets all messages that start with the routing key animals., like animals.rabbit or animals.turtle.

Yosun

Yosun is a great little pub/sub utility in Python that uses the Kombu library. With it news (as events) can be published and consumed by the subscriber. We can use Yosun in Django to publish events and in another Django service to subscribe certain events, which then trigger further processes in the service. For example, required data can be fetched via REST API.

Installation

pip install yosun

Subscribe

from kombu import Connection, Exchange
from yosun import Yosun

# Define Conneciton
connection = Connection('amqp://guest:guest@localhost:5672//')

# RabbitMQ Exchange definition
exchange = Exchange('events', type='topic')

# Yosun init
yosun = Yosun(connection, exchange)

def on_rabbit(body, message):
    print('Look, a rabbit!')
    print(body)

def on_animal(body, message):
    print('Look, an animal!')
    print(body)

sub = yosun.subscribe('animals.#')

# from now on when a animals.rabbit message arrives, on_rabbit will be called
sub.on('animals.rabbit', on_rabbit)

# when any message matching animals.# arrives, on_animal will be called
sub.all(on_animal)

Publish

from kombu import Connection, Exchange
from yosun import Yosun

# Define Connection
connection = Connection('amqp://guest:guest@localhost:5672//')

# RabbitMQ Exchange definition
exchange = Exchange('events', type='topic')

# Yosun initialize
yosun = Yosun(connection, exchange)

yosun.publish('animals.rabbit', {'name' : 'McTwisp'})

Example: Django with Signals and Events

We use the post_save Signal which Django sends when updating the data in Service "A" to publish an event with Yosun.

# Service A

from serviceA.models import Animals
from django.db.models.signals import post_save
from django.dispatch import receiver

from kombu import Connection, Exchange
from yosun import Yosun

connection = Connection('amqp://guest:guest@localhost:5672//')
exchange = Exchange('events', type='topic')
yosun = Yosun(connection, exchange)

@receiver(post_save, sender=Animals)
def publish_event(sender, instance, **kwargs):
    yosun.publish('animals.rabbit', {'id': instance.id})

And in Service B we use the event animals.rabbit to start the Celery Task on_rabbit asynchronously:

 # Service B

 from serviceB.celery import app

 @app.task
 def on_rabbit(body, message):
     # do something with the rabbit event :)

from kombu import Connection, Exchange
from yosun import Yosun

connection = Connection('amqp://guest:guest@localhost:5672//')
exchange = Exchange('events', type='topic')
yosun = Yosun(connection, exchange)

sub = yosun.subscribe('animals.#')

# from now on when a animals.rabbit message arrives, on_rabbit will be called async as celery task
sub.on('animals.rabbit', on_rabbit.delay)

Previous Post

Add a comment