
10. Backend Jobs

Basically we are talking about microservices. A microservice is a small, independent service that does one specific job in an application. Such jobs can take a long time to complete, so we use a job queue to run them in the background. Instead of implementing these tasks in the main server of the application, we create a small independent application to serve them. It can keep running even when your main application is not running.

Note The following things are required to create a job microservice:

1. Celery - Distributed Task Queue (Python library)
2. Redis - In-Memory Data Store (message broker)

Let's configure and set up Celery.

Installation

We will install celery with redis as a message broker.


shell
pip install celery[redis]

Note We have already installed redis in the previous step.

Configuration

We will create a separate file where we will push our main flask application context to the celery application, so that it will read all the configuration from the flask application.

We will create a folder backendjobs and inside this folder we will create a file named workers.py and inside this file we will write the following code.


python filename=backendjobs/workers.py
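from celery import Celery
from flask import current_app as app

# Shared Celery instance; broker and backend settings are applied in app.py
celery = Celery("Application Jobs")


class ContextTask(celery.Task):
    """Task base class that runs every task inside the Flask app context."""

    def __call__(self, *args, **kwargs):
        # current_app is a proxy, so an app context must already be pushed
        # (see app.py) before app.app_context() can be resolved here
        with app.app_context():
            return self.run(*args, **kwargs)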
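
The pattern behind ContextTask is simply "enter a context, then call run". The following sketch shows that pattern in plain Python without Celery or Flask. The names `app_context`, `active_context`, `Task`, `ContextWrappedTask` and `NeedsContext` are all made up for this illustration and only mimic the real classes.

```python
from contextlib import contextmanager

active_context = []  # hypothetical stand-in for Flask's context stack


@contextmanager
def app_context():
    """Mark the context as active for the duration of the with-block."""
    active_context.append("app")
    try:
        yield
    finally:
        active_context.pop()


class Task:
    """Stand-in for a task base class: calling the task calls run()."""

    def run(self, *args, **kwargs):
        raise NotImplementedError

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)


class ContextWrappedTask(Task):
    """Same idea as ContextTask: wrap run() in the context."""

    def __call__(self, *args, **kwargs):
        with app_context():
            return self.run(*args, **kwargs)


class NeedsContext(ContextWrappedTask):
    def run(self):
        # Reports whether a context was active while the task body ran
        return bool(active_context)


inside = NeedsContext()()
print(inside)          # True
print(active_context)  # []
```

Because the wrapping lives in the base class, individual tasks do not need to repeat the `with` block themselves.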

Now let's write the configuration for the celery application. In the app.py file we will add the following code.


python filename=app.py
...
from backendjobs import workers
...
app.config['BROKER_CONNECTION_RETRY_ON_STARTUP'] = True
app.config['CELERY_BROKER_URL'] = "redis://localhost:6379/1"
app.config['CELERY_RESULT_BACKEND'] = "redis://localhost:6379/2"
app.config['CELERY_TIMEZONE'] = "Asia/Kolkata"
...
# Create celery
celery = workers.celery

# Update with configuration
celery.conf.update(
    broker_url=app.config["CELERY_BROKER_URL"],
    result_backend=app.config["CELERY_RESULT_BACKEND"],
    timezone=app.config["CELERY_TIMEZONE"],
    broker_connection_retry_on_startup=app.config[
        "BROKER_CONNECTION_RETRY_ON_STARTUP"]
)

celery.Task = workers.ContextTask
app.app_context().push()
...

Note You may face a "Working outside of application context" error. To fix it, you can manually push the app context, as done with app.app_context().push() above.

Let's create all the mentioned tasks in a file named tasks.py in the backendjobs folder. But before creating these tasks, I will configure and set up Flask-Mail. Since one of the tasks is a daily reminder sent through mail, we need mail to be configured first.

Flask Mail

Let's create a separate file for flask mail in the backendjobs folder.


python filename=backendjobs/send_mail.py
from flask_mail import Mail

# Flask app configuration for MailHog
# http://localhost:8025/


def mail_factory(app):
    app.config['MAIL_SERVER'] = 'localhost'
    app.config['MAIL_PORT'] = 1025
    app.config['MAIL_USE_TLS'] = False
    app.config['MAIL_USE_SSL'] = False
    app.config['MAIL_DEFAULT_SENDER'] = "me@example.com"
    mail = Mail()
    mail.init_app(app)
    return mail

You can change the default sender to your own email address.
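
If you want to check the MailHog settings independently of Flask, a message can also be built with the standard library. This is only a sketch for testing the mail server, not a replacement for Flask-Mail; `build_reminder` and the recipient address are assumptions made for this example.

```python
from email.message import EmailMessage


def build_reminder(recipient, sender="me@example.com"):
    """Build a small multipart message with plain-text and HTML bodies."""
    msg = EmailMessage()
    msg["Subject"] = "Grocery App V2 Reminder"
    msg["From"] = sender
    msg["To"] = recipient
    msg.set_content("Reminder: visit the app today.")
    msg.add_alternative(
        "<h1>Reminder</h1><p>Visit the app today.</p>", subtype="html"
    )
    return msg


msg = build_reminder("user@example.com")
print(msg["To"], msg.get_content_type())

# To actually deliver it to MailHog (assuming it listens on localhost:1025):
# import smtplib
# with smtplib.SMTP("localhost", 1025) as smtp:
#     smtp.send_message(msg)
```

If the message shows up at http://localhost:8025/, the host and port in mail_factory should work as well.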

Tasks

Now let's create a task to send the mail to the user.


python filename=backendjobs/tasks.py
from backendjobs.workers import celery
from datetime import datetime
from database import (
    User,
    Product
)
from jinja2 import Template
from flask_sse import sse
from celery.schedules import crontab
import csv
from backendjobs.send_mail import mail_factory
from flask_mail import Message


# scheduled task
@celery.task()
def daily_reminder_to_user():
    users = User.query.all()
    for user in users:
        flag = True
        for order in user.purchased:
            # compare the full date so orders from earlier years do not count
            if order.order_date.strftime("%Y-%m-%d") == \
                    datetime.now().strftime("%Y-%m-%d"):
                flag = False
                break
        if flag and user.role == 'user':
            from flask import current_app as app
            mail = mail_factory(app)
            with mail.connect() as conn:
                subject = "Grocery App V2 Reminder"
                message = """
                        <div style="max-width: 600px; margin: 20px auto; padding: 20px; background-color: #fff; border-radius: 8px; box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);">
                            <h1 style="color: #28a745;">Reminder: Visit Eat Fresh App</h1>
                            <p>This is a friendly reminder to visit Eat Fresh App and explore our latest offerings. We have exciting
                                products and categories waiting for you!</p>
                            <p>Don't miss out on the freshest and tastiest options. Click the link below to start your Eat Fresh
                                experience:</p>
                            <a href="http://127.0.0.1:5000/" style="display: inline-block; padding: 10px 20px; background-color: #28a745; color: #fff; text-decoration: none; border-radius: 5px;">Visit Eat Fresh App</a>
                            <p>If you have any questions or need assistance, feel free to reach out to our support team.</p>
                            <p>Thank you for choosing Eat Fresh!</p>
                            <p>Best regards,<br>Eat Fresh</p>
                        </div>
                        """
                msg = Message(recipients=[user.email],
                              html=message, subject=subject)
                conn.send(msg)
    return {"status": "success"}

# scheduled task


@celery.task()
def monthly_entertainment_report_to_users():
    users = User.query.all()
    for user in users:
        if user.role == 'user':
            from flask import current_app as app
            mail = mail_factory(app)
            with mail.connect() as conn:
                subject = "Grocery App V2 Monthly Report"
                template = Template("""
                        <div style="max-width: 600px; margin: 20px auto; padding: 20px; background-color: #fff; border-radius: 8px; box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);">
                            <h1 style="color: #007bff;">Order Report</h1>
                            <p>Dear {{ name }},</p>
                            <p>Here is the order report for the specified date range.</p>

                            <!-- Add your report content here -->
                            <table style="width: 100%; border-collapse: collapse; margin-top: 20px;">
                                <thead>
                                    <tr style="background-color: #007bff; color: #fff;">
                                        <th style="padding: 10px; text-align: left;">Product Name</th>
                                        <th style="padding: 10px; text-align: left;">Quantity</th>
                                        <th style="padding: 10px; text-align: left;">Total</th>
                                        <th style="padding: 10px; text-align: left;">Order Date</th>
                                    </tr>
                                </thead>
                                <tbody>
                                    {% for order in orders %}
                                    <tr>
                                        <td style="padding: 10px; border-bottom: 1px solid #ddd;">{{ order.product_name }}</td>
                                        <td style="padding: 10px; border-bottom: 1px solid #ddd;">{{ order.quantity }}</td>
                                        <td style="padding: 10px; border-bottom: 1px solid #ddd;">${{ order.total }}</td>
                                        <td style="padding: 10px; border-bottom: 1px solid #ddd;">{{ order.order_date.strftime('%Y-%m-%d %H:%M:%S') }}</td>
                                    </tr>
                                    {% endfor %}
                                </tbody>
                            </table>

                            <p>If you have any questions or need further details, please don't hesitate to contact us.</p>
                            <p>Thank you for your attention!</p>
                            <p>Best regards,<br>Eat Fresh</p>
                        </div>
                        """)
                message = template.render(
                    name=user.name, orders=user.purchased)
                msg = Message(recipients=[user.email],
                              html=message, subject=subject)
                conn.send(msg)
    return {"status": "success"}


celery.conf.beat_schedule = {
    'my_monthly_task': {
        'task': "backendjobs.tasks.monthly_entertainment_report_to_users",
        # Sending report to users on first day of each month at 13:50 (1:50pm)
        'schedule': crontab(
            hour=13, minute=50, day_of_month=1, month_of_year='*/1'
        ),
    },
    'my_daily_task': {
        'task': "backendjobs.tasks.daily_reminder_to_user",
        # Sending email and notification for inactive users
        'schedule': crontab(hour=21, minute=0),
    },
    'my_quick_check_task': {
        'task': "backendjobs.tasks.daily_reminder_to_user",
        # Same reminder task every minute, for testing only
        'schedule': crontab(minute='*/1'),
    },
}


@celery.task()
def user_triggered_async_job():
    header = ["Product Name", "Product Quantity",
              "Product Manufacturing Date", "Product Expiry Date",
              "Product RPU"]

    with open('product_report.csv', 'w', newline='') as f:
        csvwriter = csv.writer(f)
        csvwriter.writerow(header)
        content = []
        for product in Product.query.all():
            csvwriter.writerow([
                product.name,
                product.quantity,
                product.manufacture.strftime('%Y-%m-%d'),
                product.expiry.strftime('%Y-%m-%d'),
                product.rpu,
            ])
            item = {
                'name': product.name,
                'quantity': product.quantity,
                'manufacture': product.manufacture.strftime('%Y-%m-%d'),
                'expiry': product.expiry.strftime('%Y-%m-%d'),
                'description': product.description,
                'rpu': product.rpu,
            }
            content.append(item)
    return {'header': header, 'content': content}

In this code we pick out the inactive users (those who have not placed an order today) and send them a daily reminder at a specified time. We also send the monthly report to the users on the first day of each month at a specified time. All the scheduling is done in the celery.conf.beat_schedule dictionary.
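
The "inactive user" check can be tried in isolation with plain values. The sketch below is an illustration only: `ordered_today` and `needs_reminder` are hypothetical helper names, and it assumes order dates are datetime objects. It compares the full calendar date, so an order placed on the same day of an earlier year does not count as today's order.

```python
from datetime import date, datetime


def ordered_today(order_dates, today):
    """True if any order was placed on `today` (year, month and day)."""
    return any(d.date() == today for d in order_dates)


def needs_reminder(role, order_dates, today):
    """Only regular users without an order today get the reminder."""
    return role == "user" and not ordered_today(order_dates, today)


today = date(2024, 3, 15)
orders = [datetime(2023, 3, 15, 10, 0), datetime(2024, 3, 14, 18, 30)]

print(needs_reminder("user", orders, today))   # True
print(needs_reminder("admin", orders, today))  # False
```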

Note: crontab(hour=21, minute=0) schedules the task to run at 9pm every day, and crontab(minute='*/1') schedules the task to run every minute.
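
To get a feel for when these two schedules fire, here is a rough sketch using plain datetime arithmetic. It is not Celery's scheduler and it ignores timezones; `next_daily_run` and `next_minutely_run` are hypothetical helpers written only for this example.

```python
from datetime import datetime, timedelta


def next_daily_run(now, hour, minute):
    """Next time a daily hour:minute schedule fires strictly after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


def next_minutely_run(now):
    """Next time an every-minute schedule fires after `now`."""
    return now.replace(second=0, microsecond=0) + timedelta(minutes=1)


now = datetime(2024, 3, 15, 21, 30, 12)
print(next_daily_run(now, 21, 0))  # 2024-03-16 21:00:00
print(next_minutely_run(now))      # 2024-03-15 21:31:00
```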

The every-minute schedule is for testing purposes only. You can remove it when you are done testing (before submitting).

Server-Sent Events (SSE) Implementation

We want to send notifications to the users of our application when certain events happen on the server side. A real-life example makes this clearer: when you subscribe to a news channel on your phone, you get a notification whenever a new article is published. In the same way, we want to notify the Admin, Manager and User when a particular event happens in the application.
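
Under the hood, an SSE stream is plain text sent over a long-lived HTTP response, where each message consists of an optional `event:` line and a `data:` line followed by a blank line. The sketch below builds one such frame by hand to show the format. `format_sse` is a hypothetical helper for illustration; in this application the flask_sse library takes care of this.

```python
import json


def format_sse(data, event=None):
    """Serialise one message in the text/event-stream format."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    lines.append(f"data: {json.dumps(data)}")
    # A blank line terminates the message
    return "\n".join(lines) + "\n\n"


frame = format_sse({"message": "Monthly Report sent"}, event="notifyadmin")
print(frame, end="")
```

The `event` name is what the browser matches against the first argument of addEventListener.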

Frontend Implementation

Admin Dashboard Update


js filename=static/views/AdminApp.js
const AdminApp = {
    ...
    mounted() {
        const source = new EventSource("/stream");
        source.addEventListener(
            "notifyadmin",
            (event) => {
                let data = JSON.parse(event.data);
                alert(data.message);
            },
            false
        );
        ...
    },
    ...
};
export default AdminApp;

Here the frontend subscribes to the notifyadmin event from the server, and whenever this event is triggered the notification is shown in the frontend. We will implement the same thing on the frontend for Manager and User.

Manager Dashboard Update


js filename=static/views/ManagerApp.js
const ManagerApp = {
    ...
    mounted() {
        const source = new EventSource("/stream");
        source.addEventListener(
            "notifymanager",
            (event) => {
                let data = JSON.parse(event.data);
                alert(data.message);
            },
            false
        );
        ...
    },
    ...
};
export default ManagerApp;

User Dashboard Update


js filename=static/views/UserApp.js
const UserApp = {
    ...
    mounted() {
        const source = new EventSource("/stream");
        source.addEventListener(
            this.$store.state.authenticatedUser.email,
            (event) => {
                let data = JSON.parse(event.data);
                alert(data.message);
            },
            false
        );
        ...
    },
    ...
};
export default UserApp;

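Note One thing to notice: in the User Dashboard we have set the event type to the email of the user. The reason is that these notifications are meant for one specific user, and not every user should receive every event. On the backend the event is published with type=user.email, so it reaches only that user.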
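
The effect of using the event type as an address can be shown with a tiny in-memory model. This is only a sketch of the routing idea, not of flask_sse or EventSource; `EventBus`, its methods and the example email addresses are invented for this illustration.

```python
from collections import defaultdict


class EventBus:
    """Delivers each published event only to listeners of the same type."""

    def __init__(self):
        self.listeners = defaultdict(list)

    def add_event_listener(self, event_type, callback):
        self.listeners[event_type].append(callback)

    def publish(self, data, event_type):
        for callback in self.listeners.get(event_type, []):
            callback(data)


bus = EventBus()
inbox = {"alice": [], "bob": [], "admin": []}

# Each user listens on their own email; the admin listens on "notifyadmin"
bus.add_event_listener("alice@example.com", inbox["alice"].append)
bus.add_event_listener("bob@example.com", inbox["bob"].append)
bus.add_event_listener("notifyadmin", inbox["admin"].append)

bus.publish({"message": "You have not placed any order"}, "alice@example.com")
bus.publish({"message": "Monthly Report sent"}, "notifyadmin")

print(len(inbox["alice"]), len(inbox["bob"]), len(inbox["admin"]))  # 1 0 1
```

Only the listener registered under the matching type receives the message, which is why the reminder for one user does not pop up for another.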

Backend Implementation


python filename=backendjobs/tasks.py
...
# scheduled task
@celery.task()
def daily_reminder_to_user():
    users = User.query.all()
    for user in users:
        ...
        if flag and user.role == 'user':
            ...
            sse.publish(
                {"message": "You have not placed any order, please place now!",
                 "color": "alert alert-primary"},
                type=user.email)
    return {"status": "success"}

# scheduled task


@celery.task()
def monthly_entertainment_report_to_users():
    users = User.query.all()
    for user in users:
        if user.role == 'user':
            ...
            sse.publish({"message": "Monthly Report sent"}, type='notifyadmin')
            sse.publish({"message": "Monthly Report sent"}, type='notifymanager')
    return {"status": "success"}

...

User Triggered Task

We will also add a user-triggered task in the tasks.py file. As mentioned in the requirements of the application, the admin or manager can trigger this task whenever they want a report. The task will help them see the overall data.


python filename=backendjobs/tasks.py
...
@celery.task()
def user_triggered_async_job():
    header = ["Product Name", "Product Quantity",
              "Product Manufacturing Date", "Product Expiry Date",
              "Product RPU"]

    with open('product_report.csv', 'w', newline='') as f:
        csvwriter = csv.writer(f)
        csvwriter.writerow(header)
        content = []
        for product in Product.query.all():
            csvwriter.writerow([
                product.name,
                product.quantity,
                product.manufacture.strftime('%Y-%m-%d'),
                product.expiry.strftime('%Y-%m-%d'),
                product.rpu,
            ])
            item = {
                'name': product.name,
                'quantity': product.quantity,
                'manufacture': product.manufacture.strftime('%Y-%m-%d'),
                'expiry': product.expiry.strftime('%Y-%m-%d'),
                'description': product.description,
                'rpu': product.rpu,
            }
            content.append(item)
    return {'header': header, 'content': content}
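
As a side note, the CSV-writing step can be tried without a database or Celery by writing to an in-memory buffer. The sketch below assumes products are plain dicts with the same fields as above; `products_to_csv` and the sample data are hypothetical and exist only for this example.

```python
import csv
import io
from datetime import date

HEADER = ["Product Name", "Product Quantity",
          "Product Manufacturing Date", "Product Expiry Date",
          "Product RPU"]


def products_to_csv(products):
    """Return the product report as CSV text instead of writing a file."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(HEADER)
    for product in products:
        writer.writerow([
            product["name"],
            product["quantity"],
            product["manufacture"].strftime("%Y-%m-%d"),
            product["expiry"].strftime("%Y-%m-%d"),
            product["rpu"],
        ])
    return buffer.getvalue()


sample = [{"name": "Apple", "quantity": 10,
           "manufacture": date(2024, 1, 1), "expiry": date(2024, 2, 1),
           "rpu": 2.5}]
report = products_to_csv(sample)
print(report.splitlines()[1])  # Apple,10,2024-01-01,2024-02-01,2.5
```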

This task will help to get the basic report of the products. Now we will add a route in the admin.py file so that