How to de-couple your business logic by writing a simple EventBus module
One of the exciting moments in the life of a software application is evolution: the moment you have to add yet another feature. Adding a new business capability is a sign that the running software is expected to see more usage. However, depending on how long the software has been running, adding new capabilities can be challenging. In a code base that isn’t well thought out, the result is a lot of procedural calls or tightly coupled if/then/else statements.
I am usually fascinated by clean and elegant code that is easy on the eyes and provides room for extensibility. This leads to me reading up on design patterns, or discovering libraries that preach the gospel of extensibility.
In this blog post, I want to demonstrate writing a simple EventBus library that helps decouple pieces of business logic from one another. We will use a fictitious application called Deal Finder. If you are savvy with money, finding a good deal on purchases means having more savings left at the end of the month, which matters given the current economy and cost of living. I will focus on a few workflows: *User Registration*, *User Clicked Deal Link* and *Deal Recommendations*.
System Description
In the Deal Finder application, a user registers in order to be recommended product deals from across the internet. Once a user registers, the system uses a couple of traits about the user’s likes and generates a list of product recommendations that offer a couple of discounts.
Caveat: we won’t actually implement the discount fetching and recommendation bits; we will assume that, by software magic, this all works. Below is a description of the flow of events across the **User** module and the **Recommender** module.
A simple and non-thorough Event modelling of what happens within Deal Finder is as below.
The diagram above demonstrates the flow of Commands and Events within the system. The commands will simply be function calls, while Events will be outputs of a Command indicating something happened. Multiple commands can respond to an event being triggered. To facilitate this level of communication across modules, we will need a message bus, and thus, we will be writing a simple EventBus to achieve the above goals.
EventBus
The event bus serves as the main transport network and interconnection between commands and events across the different modules within the system. It has a global view of the mapping between all registered events and their commands and, once called, is responsible for making sure every party interested in an event is notified that the event happened.
Characteristics of the EventBus.
- Keep a list of Events and associated Handlers (commands)
- Notify handlers that an event happened.
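from typing import Callable, Dict, List, Type

class EventBus:
    # Maps an event name to the handlers (commands) interested in it
    events: Dict[str, List[Callable]]

    def register(self, event: Type['Event']):
        # decorator for registering a handler for an event
        ...

    def publish(self, event: 'Event'):
        # notify every handler registered for this event
        ...

    def get_handlers(self, event: 'Event') -> List[Callable]:
        # look up the handlers registered for this event
        ...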
An event is a simple data class. This contains all the facts about the event that just happened.
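from dataclasses import dataclass, field
from typing import List
from uuid import UUID, uuid4

@dataclass(frozen=True, kw_only=True)
class Event:
    # Frozen like its subclasses (a frozen dataclass cannot extend a non-frozen one).
    # Keyword-only with a generated default, so subclasses may add required fields (Python 3.10+).
    id: UUID = field(default_factory=uuid4)

@dataclass(frozen=True)
class UserRegistered(Event):
    user_id: UUID
    email: str
    interests: List[str]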
To register a handler, we will simply use the register method on the EventBus class.
bus = EventBus()

@bus.register(UserRegistered)
def send_welcome_email(event: UserRegistered):
    ...
To demonstrate publishing an event, we will pretend our complicated recommender system has run some really smart logic and found deals the user will like.
bus = EventBus()

@dataclass(frozen=True)
class DealFound(Event):
    user_id: UUID
    deals: List[Deal]

def find_deal(user_id: UUID) -> DealFound | None:
    # .. run complicated logic to find deals; raw_deals stands in for its output
    raw_deals: List[dict] = []
    deals = [Deal(**deal) for deal in raw_deals]
    if not deals:
        return None
    event = DealFound(user_id=user_id, deals=deals)
    bus.publish(event)
    return event
By a stroke of technology magic, all systems interested in the DealFound event will respond accordingly, including sending the found deals as emails to the user.
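The "magic" is less mysterious than it sounds. Below is a minimal, self-contained sketch of the publish flow, assuming a stripped-down bus that I call `TinyBus` (a hypothetical stand-in, not the final implementation) and a simplified `DealFound` that carries an email address instead of a user id.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, List, Tuple, Type


@dataclass(frozen=True)
class DealFound:
    user_email: str
    deals: Tuple[str, ...]


class TinyBus:
    """Hypothetical, stripped-down stand-in for the EventBus."""

    def __init__(self) -> None:
        self.handlers: DefaultDict[str, List[Callable]] = defaultdict(list)

    def register(self, event_type: Type):
        def decorator(func: Callable) -> Callable:
            # Handlers are keyed by the event class name.
            self.handlers[event_type.__name__].append(func)
            return func
        return decorator

    def publish(self, event) -> None:
        # Every handler registered for this event type gets the same event.
        for handler in self.handlers[type(event).__name__]:
            handler(event)


bus = TinyBus()
outbox: List[str] = []
audit_log: List[str] = []


@bus.register(DealFound)
def email_deals(event: DealFound) -> None:
    outbox.append(f"{event.user_email}: {len(event.deals)} deal(s)")


@bus.register(DealFound)
def record_audit(event: DealFound) -> None:
    audit_log.append("DealFound")


bus.publish(DealFound(user_email="ada@example.com", deals=("50% off headphones",)))
```

The publisher never mentions `email_deals` or `record_audit`; adding a third reaction to `DealFound` means adding one more decorated function and nothing else.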
Implementing the EventBus
For the highly impatient here is a gist with all the code. Because this can become a lengthy post, I will focus on what I think are the key aspects of the EventBus.
Registering Listeners
import importlib
import inspect
import os
from functools import wraps
from typing import List, Callable, Dict, Type, ClassVar

class EventBus:
    listeners: Dict[str, List[Callable]]

    def __init__(self):
        self.listeners = {}

    def register(self, event: str | Type['Event']):
        def decorator(func: Callable):
            from .events import Event
            name = event.__name__ if isinstance(event, type) and issubclass(event, Event) else event
            self.listeners.setdefault(name, []).append(func)
            setattr(func, '_is_registered_listener', event)
            return func
        return decorator

    def register_listener(self, event: str | Type['Event']):
        def decorator(func: Callable):
            func._is_registered_listener = event
            return func
        return decorator
In order to use the register_listener method, we first need to create an instance of EventBus and use the decorator bus.register_listener(Event). The method register_listener simply marks a function with an attribute that indicates the event it is configured for. The method bus.register is later used to associate the function with an Event.
A concrete example is below.
@bus.register_listener(UserRegistered)
def handle_user_register(event: Event):
    ...
One of the key tricks we are using here is setting the attribute func._is_registered_listener = event (equivalent to setattr(func, '_is_registered_listener', event)). This will come in handy later when we want to scan and discover all event handlers in the project.
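To see the two phases in isolation, here is a small sketch, assuming a module-level `register_listener` function and a `MARKER` constant (both hypothetical simplifications of the method above). Marking a function adds nothing to any registry; a later pass uses the marker to tell listeners apart from ordinary functions.

```python
from typing import Callable, Dict, List

MARKER = "_is_registered_listener"


def register_listener(event_name: str):
    """Only tag the function; nothing is added to any registry yet."""
    def decorator(func: Callable) -> Callable:
        setattr(func, MARKER, event_name)
        return func
    return decorator


@register_listener("UserRegistered")
def send_welcome_email(event):
    return "welcome sent"


def unrelated_helper():
    return "not a listener"


# Later, a discovery step builds the registry from whatever carries the marker.
listeners: Dict[str, List[Callable]] = {}
for candidate in (send_welcome_email, unrelated_helper):
    event_name = getattr(candidate, MARKER, None)
    if event_name is not None:
        listeners.setdefault(event_name, []).append(candidate)
```

Storing the event itself in the marker, rather than a plain `True`, is what lets the discovery step know which event each function belongs to.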
Dispatching Events
Now that we have registered our event listeners, it is only natural to find ways to trigger them. A single Event can be processed by as many handlers as you care to register; the only practical limit is available memory.
class EventBus:
    ...

    def dispatch(self, event: 'str | Event', **kwargs):
        from deal_finder.events import Event

        event_data = {"event": event}
        # Look handlers up by class name for Event instances, or by the given string name
        handlers = self.get_handlers(event.__class__.__name__ if isinstance(event, Event) else event)
        for listener in handlers:
            print(f"Dispatching event {event} to {listener}")
            listener(**event_data, **kwargs)
Dispatching an event is really simple. Based on the event, which can be an instance of Event or a string name, we look up our listener registry and call each listener (subscriber) in turn. Listeners run in the order they were registered, but that order depends on how modules are imported and discovered, so you should not rely on it. If you need a sequence of handlers to run in a particular order, you will do better to write code that enforces that order explicitly.
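One way to enforce an order explicitly is to let a single listener own the sequence instead of registering each step separately. The sketch below assumes a tiny module-level registry and a hypothetical `DealClicked` event with `reserve_deal` and `notify_user` steps; none of these names come from the implementation above.

```python
from typing import Callable, Dict, List

listeners: Dict[str, List[Callable]] = {}
calls: List[str] = []


def register(name: str):
    def decorator(func: Callable) -> Callable:
        listeners.setdefault(name, []).append(func)
        return func
    return decorator


def dispatch(name: str, **kwargs) -> None:
    # Call every listener registered under this event name.
    for listener in listeners.get(name, []):
        listener(**kwargs)


def reserve_deal(user: str) -> None:
    calls.append(f"reserve:{user}")


def notify_user(user: str) -> None:
    calls.append(f"notify:{user}")


@register("DealClicked")
def handle_deal_clicked(user: str) -> None:
    # One listener owns the ordering instead of relying on the bus.
    reserve_deal(user)
    notify_user(user)


dispatch("DealClicked", user="ada")
```

The bus still only knows about one listener, and the ordering lives in plain, readable code next to the steps it governs.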
Discovering Listeners
The main brain of the EventBus is discovering handlers within the project. This requires scanning files, inspecting every callable and checking whether it is a registered listener. Because a file can contain any function, we attach a special attribute called _is_registered_listener to decorated functions. Any function with this attribute counts as a listener in my book.
class EventBus:
    @staticmethod
    def scan_for_listeners():
        global bus
        # Scan the current directory for Python files
        for root, _, files in os.walk(os.getcwd()):
            for file in files:
                try:
                    if file.endswith('handlers.py'):
                        # Note: importing by bare module name only works if the file's folder is on sys.path
                        module_name = os.path.splitext(file)[0]
                        module = importlib.import_module(module_name)
                        for name, obj in inspect.getmembers(module):
                            if inspect.isfunction(obj) and hasattr(obj, '_is_registered_listener'):
                                bus.register(getattr(obj, '_is_registered_listener'))(obj)
                except Exception as e:
                    # Errors are swallowed here, so a failed import silently skips that file's handlers
                    ...
In order to keep things simple, we adopted a naming convention for handler files. This just makes it easy to know where to look. We start by calling the os.walk(...) function, which walks the project’s folder hierarchy and yields each directory together with the files it contains. We are only interested in files whose names end in handlers.py, and only then do we bother to scan the inside of each file to discover event handlers.
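For handler files that live in nested folders, one possible variation is to load each file by its path instead of by module name. The sketch below is an assumption on my part rather than the approach used in this post: it takes the root folder as a parameter, returns the discovered handlers instead of registering them on a global bus, and builds a throwaway project in a temporary directory to show it working.

```python
import importlib.util
import inspect
import os
import tempfile
from typing import Callable, Dict, List

MARKER = "_is_registered_listener"


def scan_for_listeners(root_dir: str) -> Dict[str, List[Callable]]:
    """Import every *handlers.py under root_dir and collect marked functions."""
    found: Dict[str, List[Callable]] = {}
    for folder, _, files in os.walk(root_dir):
        for file_name in sorted(files):
            if not file_name.endswith("handlers.py"):
                continue
            path = os.path.join(folder, file_name)
            # Load by file path so nested folders work without touching sys.path.
            spec = importlib.util.spec_from_file_location(file_name[:-3], path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            for _, obj in inspect.getmembers(module, inspect.isfunction):
                event_name = getattr(obj, MARKER, None)
                if event_name is not None:
                    found.setdefault(event_name, []).append(obj)
    return found


# Demo: a throwaway project with one nested handlers file.
with tempfile.TemporaryDirectory() as project:
    os.makedirs(os.path.join(project, "users"))
    with open(os.path.join(project, "users", "handlers.py"), "w") as fh:
        fh.write(
            "def send_welcome_email(event):\n"
            "    return 'welcome ' + event\n"
            "send_welcome_email._is_registered_listener = 'UserRegistered'\n"
            "def helper():\n"
            "    return None\n"
        )
    discovered = scan_for_listeners(project)
```

A trade-off worth knowing: files loaded this way are not part of a package, so a handlers file that relies on relative imports would need a different loading strategy.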
Activating and Using the EventBus
The most important step to kickstart handler discovery is to call the scan_for_listeners method. Once that is done, all handlers in the project will have been duly registered. The code below demonstrates this.
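# handlers.py (the file name must end in handlers.py to be discovered)
@bus.register_listener(DealFound)
def send_deal_email(event: DealFound, **kwargs):
    print(f"Deal found email sent to user {event.user_id}")

# main.py
from uuid import uuid4

def run_find_deal(bus: EventBus):
    event = find_deal(user_id=uuid4())  # placeholder user id
    if event is not None:  # find_deal may come back empty-handed
        bus.dispatch(event)

EventBus.scan_for_listeners()

def main():
    # Run find deal
    run_find_deal(bus)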
The function find_deal doesn’t exist, but let’s imagine it did. It does a bit of machine calculation and finds a couple of deals. In this context, it returns a DealFound event, which we defined earlier in the post. We then dispatch that output, which we hope is a DealFound, to any interested listener. In our case we have a send_deal_email handler that will email the user the deal we found.
Possible Improvements
- It would be good to have a way to discover event handlers elegantly without the need to call EventBus.scan_for_listeners explicitly.
- Introducing good logging to the EventBus to record the events fired and the corresponding listeners triggered.
- I could write about how to unit test this, but I am sure you can figure it out. If you still need me to, then drop a comment.
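As a starting point for the logging and testing ideas, here is a rough sketch. It assumes a hypothetical `LoggingBus` that only supports string event names and returns the number of listeners it called; the real EventBus would need the same changes applied to its own dispatch method.

```python
import logging
from typing import Callable, Dict, List

logger = logging.getLogger("eventbus")


class LoggingBus:
    """Hypothetical minimal bus; the logging inside dispatch is the point."""

    def __init__(self) -> None:
        self.listeners: Dict[str, List[Callable]] = {}

    def register(self, name: str):
        def decorator(func: Callable) -> Callable:
            self.listeners.setdefault(name, []).append(func)
            return func
        return decorator

    def dispatch(self, name: str, **kwargs) -> int:
        handlers = self.listeners.get(name, [])
        if not handlers:
            logger.warning("No listeners for event %s", name)
        for handler in handlers:
            logger.info("Dispatching %s to %s", name, handler.__name__)
            handler(**kwargs)
        return len(handlers)


def test_dispatch_calls_registered_listener() -> None:
    bus = LoggingBus()  # a fresh bus per test keeps tests isolated
    received: List[str] = []

    @bus.register("DealFound")
    def fake_listener(email: str) -> None:
        received.append(email)

    assert bus.dispatch("DealFound", email="ada@example.com") == 1
    assert received == ["ada@example.com"]
    assert bus.dispatch("Unknown") == 0


test_dispatch_calls_registered_listener()
```

Creating the bus inside the test, rather than importing a global one, avoids listeners from one test leaking into another.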
In conclusion, implementing an EventBus module can significantly enhance the modularity and scalability of your software application. By decoupling business logic, you can facilitate better communication between different modules, making it easier to add new features without creating a tangled web of procedural calls. The EventBus serves as a central hub for events and commands, ensuring that all interested parties are notified when an event occurs. This approach not only promotes clean and elegant code but also provides room for extensibility and future improvements. Overall, the EventBus module is a valuable tool for maintaining a well-structured and efficient codebase.