Author – Simran Singh, Full Stack Developer
While making a custom flow-engine from scratch can initially appear intimidating, it’s well worth the challenges. With your own unique flow-engine, you have the ability to change its components to meet the needs of your project and to expand its capabilities to accommodate any new requirements.
We'll mainly look at automating marketing tasks with a flow-engine, although flow-engines apply to many other domains. You will also find examples to help you create your own flow-engine.
For our marketing campaigns, every client had their own custom needs.
Some wanted to send marketing messages after a custom delay, some to send them only to a specified group of users, and some wanted to send different marketing messages based on the products that the user purchased.
When our client pool was small, we used to hard-code every client request and code the logic required for their business manually.
This was clearly not a scalable solution and it became evident as our client pool grew. We needed something that gives us the power to have configurable and complex business logic without actually having to code.
This is why we decided to build a flow system from scratch, one that lets us configure business logic easily.
Typically, a flow system is represented in the form of a directed graph, consisting of nodes and edges.
Imagine that each node represents a specific task. After that task is executed, the next node(s) in the graph are executed, and so on until the whole flow has run.
Let’s get familiar with some terms before moving on:
Since the flow is essentially a graph, you can represent it in many ways, like an adjacency matrix, edge list, adjacency list, etc.
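To make these options concrete, here is a minimal sketch of the same three-node flow in each representation. The node names (`start`, `wait`, `send`) are made up for illustration and are not part of our system.

```python
# A small flow: start -> wait -> send (node names are hypothetical)
nodes = ["start", "wait", "send"]

# Edge list: one (source, target) pair per edge
edge_list = [("start", "wait"), ("wait", "send")]

# Adjacency list: each node maps to the list of nodes it points to
adjacency_list = {node: [] for node in nodes}
for source, target in edge_list:
    adjacency_list[source].append(target)

# Adjacency matrix: matrix[i][j] is 1 when there is an edge nodes[i] -> nodes[j]
index = {node: i for i, node in enumerate(nodes)}
adjacency_matrix = [[0] * len(nodes) for _ in nodes]
for source, target in edge_list:
    adjacency_matrix[index[source]][index[target]] = 1
```

The adjacency list is usually the most convenient of the three for a flow, because executing a node only requires looking up the nodes it points to.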
We will be going forward with something very simple for this example. We will assume that each node can have a maximum of one edge to the next node and thus, we can simply represent a node in this format:
{
    "$NODE_ID": {
        "type": "$NODE_TYPE",
        "props": {
            $NODE_PROPERTIES..
        },
        "nextNode": "$NEXT_NODE_ID"
    },
    "$NODE_ID_2" : {
        ...
    },
    ...
}
This is a rather uncommon way of representing a graph, but it is simple to understand.
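As an illustration, a two-node flow in this format might look like the sketch below. The node IDs, the `delay_node` type and the props are placeholders chosen for the example, and the helper `node_order` is hypothetical. It assumes the flow has no cycles.

```python
# A hypothetical flow: wait an hour, then send a message
flow_graph = {
    "node_1": {
        "type": "delay_node",            # assumed node type, for illustration only
        "props": {"delay_seconds": 3600},
        "nextNode": "node_2",
    },
    "node_2": {
        "type": "message_node",
        "props": {"template_id": "welcome"},
        "nextNode": None,                # last node of the flow
    },
}


def node_order(flow_graph, start_node_id):
    """Follow the nextNode links from the start node and return the IDs visited."""
    order = []
    node_id = start_node_id
    while node_id is not None:
        order.append(node_id)
        node_id = flow_graph[node_id].get("nextNode")
    return order
```

Because every node has at most one outgoing edge, walking the flow is just a matter of following `nextNode` until it is empty.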
A quality implementation of an extendable flow-engine has these 3 qualities.
Alright, now that we know what we need to build, the question is: how do we build it while satisfying the requirements stated above?
The answer is the **SOLID principles of OOP**.
We will be using Python’s Django for this particular example.
To follow along, you need to understand OOP and the SOLID principles. Let's start by defining the base classes.
To meet our requirement of extendable API validation logic, we will be using nested validators (called serializers in Django REST Framework).
First, let’s see what the base class of a node will look like:
from abc import ABC, abstractmethod
from typing import Dict, Optional

from rest_framework import serializers


class NodeConfigSerializer(serializers.Serializer):
    """Node config serializer"""
    # DRF fields take allow_null (not null); nextNode may be null or absent on the last node
    nextNode = serializers.CharField(allow_null=True, required=False)
    type = serializers.CharField()
    props = serializers.JSONField()


class Node(ABC):
    """Node base class"""

    def __init__(self, context, config):
        """
        context: A single object to access all the information regarding the current context.
            It also carries the data returned from the previous node.
        config: Node configuration
        """
        self.context = context
        self.config = config

    @abstractmethod
    def execute(self) -> Optional[Dict]:
        """Perform the task.
        Returns data that can be passed to the next node in its context if needed."""

    @abstractmethod
    def get_next_node_id(self) -> Optional[str]:
        """Get the ID of the next node to execute"""
We define the validator (serializer) in the same file as the node base class. This increases cohesion in our code structure and makes it easier to maintain and extend.
Let's see how you can define a sample node that sends a message. We will assume that its config requires a template_id, which identifies the template to be sent in the message.
class MessageNodeSerializer(NodeConfigSerializer):
    """We inherit NodeConfigSerializer and extend it"""

    def validate_props(self, props):
        """Custom logic for validating props"""
        if "template_id" not in props:
            raise serializers.ValidationError("template_id missing")
        return props


class MessageNode(Node):
    def execute(self):
        props = self.config['props']
        user_phone = self.context.user.phone
        # send_message is our messaging helper (defined elsewhere); it sends the
        # template configured in the flow to user_phone
        send_message(user_phone, props['template_id'])

    def get_next_node_id(self):
        # nextNode may be absent or null on the last node of a flow
        return self.config.get('nextNode')
The factory method is an OOP design pattern for creating an object of a specified class without the caller having to know the concrete class. We use it here to map a node type to its node class and its serializer.
# Mapping $NODE_TYPE to Node classes
# We map it to Tuple(NodeClass, NodeSerializerClass)
NODES = {
    'message_node': (MessageNode, MessageNodeSerializer),
    'conditional_node': (ConditionalNode, ConditionalNodeSerializer),
    # ...
}


def get_node_config_serializer(config) -> NodeConfigSerializer:
    """
    Factory method for getting node config serializer
    config: JSON config for a node
    example:
    {
        "type": "$NODE_TYPE",
        "props": {
            $NODE_PROPERTIES..
        },
        "nextNode": "$NEXT_NODE_ID"
    }
    """
    node_type = config.get('type')
    if node_type not in NODES:
        raise ValueError("Invalid node type")
    # Index 1 of the tuple is the serializer class
    return NODES[node_type][1](data=config)


def get_node(context, config) -> Node:
    """Factory method for getting node instance"""
    node_type = config.get('type')
    if node_type not in NODES:
        raise ValueError("Invalid node type")
    # Index 0 of the tuple is the node class
    return NODES[node_type][0](context, config)
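One possible variation on this mapping, shown here only as a sketch and not as the approach we used, is to let each node class register itself with a decorator. Adding a node type then means writing one new class and touching nothing else. The names `NODE_REGISTRY`, `register_node`, `BaseNode`, `LogNode` and `build_node` are hypothetical, and the example uses plain Python so it runs without Django.

```python
NODE_REGISTRY = {}


def register_node(node_type):
    """Class decorator that stores a node class in the registry under node_type."""
    def decorator(cls):
        NODE_REGISTRY[node_type] = cls
        return cls
    return decorator


class BaseNode:
    """Simplified stand-in for the Node base class."""
    def __init__(self, context, config):
        self.context = context
        self.config = config


@register_node("log_node")  # hypothetical node type
class LogNode(BaseNode):
    def execute(self):
        return {"logged": self.config["props"]["text"]}


def build_node(context, config):
    """Factory: look up the class for config['type'] and instantiate it."""
    node_type = config.get("type")
    if node_type not in NODE_REGISTRY:
        raise ValueError(f"Invalid node type: {node_type!r}")
    return NODE_REGISTRY[node_type](context, config)
```

The trade-off is that the module defining each node must be imported before the factory is used, otherwise the class never registers itself.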
The flow engine is the heart of our flow system. Let's see how it ties together everything we have implemented so far.
This is a very simplified version of the flow engine, for easier understanding:
def execute_node(node_id, flow_graph, context):
    """
    node_id: Node ID to execute
    flow_graph: JSON representation of our graph
    context: context object to be passed to node
    """
    node_config = flow_graph[node_id]
    node = get_node(context, node_config)
    result = node.execute()
    # Make this node's result available to the next node
    context.previous_node_result = result
    next_node_id = node.get_next_node_id()
    # Stop at the last node of the flow, which has no next node
    if next_node_id is not None:
        execute_node(next_node_id, flow_graph, context)
To trigger the flow initially you have to call the execute_node() function with the start_node_id which we store separately.
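The same engine can also be written as a loop, which avoids Python's recursion limit on long flows and makes it easy to guard against a flow that accidentally points back to itself. The sketch below is one way to do that under a few assumptions: `EchoNode` and the `get_node` defined here are stand-ins for the real node classes and factory, and `run_flow` and `max_steps` are hypothetical names.

```python
from types import SimpleNamespace


class EchoNode:
    """Stand-in node: returns its props and follows nextNode."""
    def __init__(self, context, config):
        self.context = context
        self.config = config

    def execute(self):
        return self.config.get("props")

    def get_next_node_id(self):
        return self.config.get("nextNode")


def get_node(context, config):
    """Stand-in for the node factory; always builds an EchoNode."""
    return EchoNode(context, config)


def run_flow(start_node_id, flow_graph, context, max_steps=1000):
    """Execute nodes one after another, starting at start_node_id."""
    executed = []
    node_id = start_node_id
    while node_id is not None:
        if len(executed) >= max_steps:
            raise RuntimeError("Flow exceeded max_steps; possible cycle")
        node = get_node(context, flow_graph[node_id])
        context.previous_node_result = node.execute()
        executed.append(node_id)
        node_id = node.get_next_node_id()
    return executed


flow_graph = {
    "a": {"type": "echo", "props": {"n": 1}, "nextNode": "b"},
    "b": {"type": "echo", "props": {"n": 2}, "nextNode": None},
}
context = SimpleNamespace(previous_node_result=None)
executed = run_flow("a", flow_graph, context)
```

Here `"a"` plays the role of the separately stored start_node_id, and `executed` records the order in which the nodes ran.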
Now, let’s see how we tackle the problem of making API validation logic easily extendable by using our implementation strategy specified above.
We defined a validator (serializer) for each of our nodes along with the Node classes, and we also defined a factory method for getting it. We will use that factory method in our API validation logic, so that the validation logic for the graph does not need to change when a new node type is added.
Here’s the snippet we used:
# In the API validation logic:
flow_graph = request.data["flow_graph"]  # request.data is the parsed request body in Django REST Framework
for node_id, node_config in flow_graph.items():
    node_config_serializer = get_node_config_serializer(node_config)
    if not node_config_serializer.is_valid():  # the serializer's built-in validation method
        raise Exception(f"Invalid configuration for node {node_id}")
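Per-node validation does not check that the nodes fit together. A possible extra step, sketched below as an assumption rather than something taken from our codebase, is to confirm that the start node exists and that every `nextNode` refers to a node defined in the graph. The function name `find_dangling_references` is hypothetical.

```python
def find_dangling_references(flow_graph, start_node_id):
    """Return a list of human-readable problems with node references."""
    problems = []
    if start_node_id not in flow_graph:
        problems.append(f"start node {start_node_id!r} is not defined")
    for node_id, node_config in flow_graph.items():
        next_id = node_config.get("nextNode")
        # A missing or null nextNode marks the end of the flow and is fine
        if next_id is not None and next_id not in flow_graph:
            problems.append(f"node {node_id!r} points to unknown node {next_id!r}")
    return problems
```

An empty list means every reference resolves; otherwise the API can reject the flow and return the problems to the caller.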
We now have a system that can reasonably be called scalable.
This system is:
To keep the flow engine robust, we will need to keep optimizing it for ever-changing business requirements and for scale. Here are a few requirements we will focus on next.
Stay tuned for Part 2!
Looking for more tech deep dives? Check out How We Scaled our Django Server and Celery Workers to Handle 10X Load.