04. Role Based Authentication

With authentication in place, we now move on to authorization. By default, flask_jwt_extended does not provide a mechanism to verify a user's role, so we will create a custom decorator that does.

Note: To learn more about decorators, you can read this article: www.geeksforgeeks.org/decorators-in-python/.

Role Required

Let's create a new decorator named role_required:

python filename=main.py

...
from functools import wraps #[tl! add]
...
# ------- Custom Decorator for role verification ----# #[tl! add:start]
def role_required(roles: list):
    def decorator(f):
        @wraps(f)
        @jwt_required()
        def decorated_function(*args, **kwargs):
            if current_user is None or current_user.role not in roles:
                return {'msg': 'Your role is not authorized'}, 401
            return f(*args, **kwargs)
        return decorated_function
    return decorator # [tl! add:end]
...

You can now use the role_required decorator to restrict your API endpoints by role, which gives us authorization.
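
To see how the decorator behaves without starting Flask, here is a minimal stand-alone sketch of the same pattern. The `FakeUser` class, the module-level `current_user` variable and the `update_category` function are stand-ins invented for this illustration. In the real app `current_user` comes from flask_jwt_extended and `@jwt_required()` runs before the role check.

```python
from functools import wraps


class FakeUser:
    """Hypothetical stand-in for the User model; only the role matters here."""

    def __init__(self, role):
        self.role = role


# Stand-in for flask_jwt_extended.current_user (an assumption for this demo).
current_user = None


def role_required(roles):
    def decorator(f):
        @wraps(f)  # keeps the wrapped function's name and docstring
        def decorated_function(*args, **kwargs):
            # Reject the call when nobody is logged in or the role is not allowed.
            if current_user is None or current_user.role not in roles:
                return {'msg': 'Your role is not authorized'}, 401
            return f(*args, **kwargs)
        return decorated_function
    return decorator


@role_required(['admin', 'manager'])
def update_category(name):
    return {'msg': f'Category {name} updated'}, 200


current_user = FakeUser('user')
denied = update_category('Fruits')    # role "user" is not in the allowed list

current_user = FakeUser('admin')
allowed = update_category('Fruits')   # role "admin" passes the check
```

The outer function receives the allowed roles, the middle one receives the function to protect, and the innermost one runs on every call, which is why the check always sees the user of the current request.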

Next, we will create the API resources for the frontend application.

Let's add the Category and Product tables to our models.py file:

python filename=models.py

...
class Category(db.Model):
    __tablename__ = 'category'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    name = db.Column(db.String, nullable=False)
    products = db.relationship('Product', backref='category', lazy=True)


class Product(db.Model):
    __tablename__ = 'product'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    quantity = db.Column(db.Integer, nullable=False)
    avg_rate = db.Column(db.Integer)
    name = db.Column(db.String, nullable=False)
    description = db.Column(db.String(1000))
    manufacture = db.Column(db.DateTime, nullable=False)
    expiry = db.Column(db.DateTime, nullable=False)
    rpu = db.Column(db.Float, nullable=False)
    unit = db.Column(db.String, nullable=False)
    image = db.Column(db.BLOB, nullable=False)
    category_id = db.Column(db.Integer, db.ForeignKey('category.id'))

Here we use a one-to-many relationship between Category and Product, since a category can contain multiple products.

Note: To store the image we use the BLOB data type. BLOB stands for binary large object, and it is used to store binary data such as media files.
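
The one-to-many link and the BLOB column can also be pictured at the SQL level. The sketch below uses Python's built-in sqlite3 module with a trimmed-down version of the two tables. It keeps only a few columns, and the sample rows are made up for illustration, so it is not the schema that SQLAlchemy generates for the models above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # throwaway in-memory database
conn.executescript("""
    CREATE TABLE category (
        id   INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL
    );
    CREATE TABLE product (
        id          INTEGER PRIMARY KEY AUTOINCREMENT,
        name        TEXT NOT NULL,
        image       BLOB,
        category_id INTEGER REFERENCES category(id)
    );
""")

# One category ...
cur = conn.execute("INSERT INTO category (name) VALUES (?)", ("Fruits",))
fruits_id = cur.lastrowid

# ... with many products pointing back at it through category_id.
conn.executemany(
    "INSERT INTO product (name, image, category_id) VALUES (?, ?, ?)",
    [("Apple", b"\x89PNG", fruits_id), ("Mango", None, fruits_id)],
)

rows = conn.execute(
    "SELECT name FROM product WHERE category_id = ? ORDER BY name",
    (fruits_id,),
).fetchall()
product_names = [name for (name,) in rows]

# A BLOB column comes back as raw bytes.
image_bytes = conn.execute(
    "SELECT image FROM product WHERE name = ?", ("Apple",)
).fetchone()[0]
```

In the SQLAlchemy models, `category.products` performs the equivalent of the `SELECT ... WHERE category_id = ?` query for you.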

Now we will create the resources for login, registration, logout, and CRUD operations on categories and products.

APIs

To keep the layout of the application simple, we will create all the APIs in the main.py file.

To build the RESTful APIs we will use the Flask-RESTful library.

Installation

Let's install the flask-restful library:

pip install flask-restful

Configuration

Now configure Flask-RESTful with the Flask app:

python filename=main.py

...
from flask_restful import Resource, Api #[tl! add]
...
app, jwt = None, None # [tl! remove]
app, api, jwt = None, None, None # [tl! add]


def create_app():
    app = Flask(__name__)
    ...
    api = Api(app)
    ...


...

Flask-RESTful is now configured with the Flask app.

Now let's create the API resources for the application.

Register

Let's create the register resource:

python filename=main.py

...
from werkzeug.security import generate_password_hash, check_password_hash
...
# ------- My flask-restful api resources will start from here --------#


# ------- The authentication and authorization part will start from here ----#
class Signup(Resource):
    def post(self):
        data = request.get_json()
        user = User.query.filter_by(email=data["email"]).first()
        exist_req = RequestResponse.query.filter_by(
            sender=data["email"]).first()

        admin = User.query.filter_by(role='admin').first()
        if user or exist_req:  #
            return {'error': 'User already exists, Try with another email'}, 409
        if data["role"] == 'manager':
            # Fields are comma-separated so they can be split apart later.
            # NOTE: the raw password is kept in the request message here;
            # consider storing a hash instead.
            message = (f"{data['email']},{data['name']},"
                       f"{data['role']},{data['password']}")
            requested = RequestResponse(status='pending',
                                        type='manager',
                                        message=message,
                                        sender=data['email'],
                                        receiver=admin.email,
                                        timestamp=datetime.now())
            db.session.add(requested)
            db.session.commit()
            return {'msg': 'Request created, the result will be '
                           'sent by email'}, 201
        else:
            new_user = User(email=data["email"], name=data["name"],
                            role=data["role"], password=generate_password_hash(
                data["password"], method='scrypt'), doj=datetime.now())
            db.session.add(new_user)
            db.session.commit()
            inserted_data = {
                "email": data["email"],
                "name": data["name"],
                "role": data["role"],
                "auth-token": "hello"
            }
            return {'msg': 'User registered successfully',
                    'data': inserted_data}, 201

# --------------- AUTH END HERE ------------------------###

...
api.add_resource(Signup, '/signup')
...

During registration we check the role the user is asking for. If the user registers as a manager, a request is created for the admin to approve. Otherwise a new user is created in the database right away.

Since manager registration must be approved by the admin, we store the request in the database, so we need a table for it:

python filename=models.py

...
class RequestResponse(db.Model):
    __tablename__ = 'request_response'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    status = db.Column(db.String, nullable=False)
    type = db.Column(db.String, nullable=False)
    message = db.Column(db.String, nullable=False)
    image = db.Column(db.BLOB)
    sender = db.Column(db.Integer, db.ForeignKey('user.id'))
    receiver = db.Column(db.Integer, db.ForeignKey('user.id'))
    timestamp = db.Column(db.DateTime, nullable=False)
...

The message attribute stores the data of the request in CSV format so that we can easily extract it later. For example, a message looks like "data1,data2,data3", which can be split back into its fields.

The sender and receiver refer to users, which is why both columns are declared as foreign keys to the user table. Note that the resources in main.py pass email addresses for these two fields, so make sure the column types match what you actually store.
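
One caveat of a comma-separated message is that a field may itself contain a comma. As a hedged alternative to building the string by hand, the sketch below packs and unpacks a message with Python's built-in csv module, which quotes such fields. The helper names `pack_message` and `unpack_message` and the sample values are made up for this illustration and are not part of the application code.

```python
import csv
import io


def pack_message(*fields):
    """Join the fields into one CSV line, quoting any field that needs it."""
    buffer = io.StringIO()
    csv.writer(buffer).writerow(fields)
    return buffer.getvalue().rstrip("\r\n")  # drop the trailing line ending


def unpack_message(message):
    """Split a stored CSV line back into a list of strings."""
    return next(csv.reader([message]))


# The name contains a comma, so a plain str.split(",") would break it apart.
message = pack_message("jane@example.com", "Doe, Jane", "manager")
fields = unpack_message(message)
naive_fields = message.split(",")
```

Here `fields` has three entries again, while `naive_fields` ends up with four pieces because the comma inside the name is treated as a separator.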

Login

Now let's create the login resource:

python filename=main.py

...
# ------- My flask-restful api resources will start from here --------#


# ------- The authentication and authorization part will start from here ----#
class LoginResource(Resource):
    def post(self):
        data = request.get_json()
        email = data.get("email", None)
        password = data.get("password", None)
        user = User.query.filter_by(email=email).first()
        if user:
            if check_password_hash(user.password, password):
                access_token = create_access_token(identity=user)
                return {
                    'id': user.id,
                    'role': user.role,
                    'email': user.email,
                    'access_token': access_token,
                    'image': (base64.b64encode(user.image).decode('utf-8') if user.image else None)
                }
            return {'error': "Authentication failed"}, 401
        return {'error': "wrong credentials"}, 404

# --------------- AUTH END HERE ------------------------###

...
api.add_resource(LoginResource, '/api/login', '/logout')
...
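
The login response returns the user's image as a base64 string because raw bytes from a BLOB column cannot be placed in JSON directly. The short sketch below shows the round trip with the standard library only. The sample bytes are just the PNG file signature, used as a made-up stand-in for a real image.

```python
import base64
import json

# Made-up sample: the 8-byte PNG file signature instead of a full image.
image_bytes = b"\x89PNG\r\n\x1a\n"

# Server side: bytes -> base64 text, which is safe to embed in JSON.
encoded = base64.b64encode(image_bytes).decode("utf-8")
payload = json.dumps({"id": 1, "image": encoded})

# Client side: parse the JSON and turn the text back into the original bytes.
decoded = base64.b64decode(json.loads(payload)["image"])
```

A frontend can typically use the encoded string in a data URL or decode it back to bytes as shown in the last line.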

Let's also create resources for Category and Product.

Category

python filename=main.py

...
# ------- The CRUD operations or the business logic part will start from here ----#

class CatListResource(Resource):
    @cache.cached(timeout=50, key_prefix="get_category")
    def get(self):
        categories = Category.query.all()
        categories_list = []
        for category in categories:
            cat = {
                'id': category.id,
                'name': category.name,
            }
            categories_list.append(cat)
        return categories_list


class CategoryResource(Resource):
    def get(self, id):
        category = Category.query.get(id)
        if category:
            return {
                'id': category.id,
                'name': category.name
            }
        else:
            return {'error': 'Category not found'}, 404

    @role_required(['admin', 'manager'])  # already applies jwt_required()
    def put(self, id):
        data = request.get_json()
        category = Category.query.filter_by(id=id).first()
        if category is None:
            return {'error': 'Category not found'}, 404
        if current_user.role == 'manager':
            admin = User.query.filter_by(role='admin').first()
            message = f"{id},{data['name']}"
            requested = RequestResponse(status='pending',
                                        type='category update',
                                        message=message,
                                        sender=current_user.email,
                                        receiver=admin.email,
                                        timestamp=datetime.now())
            db.session.add(requested)
            db.session.commit()
            noti_data = {
                'id': requested.id,
                'state': requested.status,
                'msg': requested.message,
                'sender': requested.sender,
                'timestamp': requested.timestamp.strftime("%Y-%m-%d"),
            }
            return {'msg': 'Created request', 'resource': noti_data}, 201
        else:
            category.name = data['name']
            db.session.commit()
            return {
                'msg': f"Category {data['name']} updated successfully in database",
                'resource': {
                    'id': category.id,
                    'name': category.name}
            }, 200

    @role_required(['admin', 'manager'])  # a plain user must not delete categories
    def delete(self, id):
        category = Category.query.filter_by(id=id).first()
        if category:
            if current_user.role == 'manager':
                admin = User.query.filter_by(role='admin').first()
                message = f"{id},{category.name}"
                requested = RequestResponse(status='pending',
                                            type='category delete',
                                            message=message,
                                            sender=current_user.email,
                                            receiver=admin.email,
                                            timestamp=datetime.now())
                db.session.add(requested)
                db.session.commit()
                noti_data = {
                    'id': requested.id,
                    'state': requested.status,
                    'msg': requested.message,
                    'sender': requested.sender,
                    'timestamp': requested.timestamp.strftime("%Y-%m-%d"),
                }
                return {'msg': 'Created request', 'resource': noti_data}, 201
            else:
                products = Product.query.filter_by(
                    category_id=int(id)).all()
                for product in products:
                    carts = Cart.query.filter_by(
                        product_id=product.id).all()
                    for cart in carts:
                        db.session.delete(cart)
                        db.session.commit()
                    db.session.delete(product)
                    db.session.commit()
                # Capture the fields before the row is deleted.
                resource = {'id': category.id, 'name': category.name}
                db.session.delete(category)
                db.session.commit()
                return {
                    'msg': f"Category {resource['name']} deleted "
                           "successfully from database",
                    'resource': resource
                }, 200
        return {'msg': "Not found"}, 404

    @role_required(['admin', 'manager'])  # a plain user must not create categories
    def post(self):
        data = request.get_json()
        if data:
            if not Category.query.filter_by(name=data['name']).first():
                if current_user.role == 'manager':
                    admin = User.query.filter_by(role='admin').first()
                    message = data['name']
                    requested = RequestResponse(status='pending',
                                                type='category',
                                                message=message,
                                                sender=current_user.email,
                                                receiver=admin.email,
                                                timestamp=datetime.now())
                    db.session.add(requested)
                    db.session.commit()
                    noti_data = {
                        'id': requested.id,
                        'state': requested.status,
                        'msg': requested.message,
                        'sender': requested.sender,
                        'timestamp': requested.timestamp.strftime("%Y-%m-%d"),
                    }
                    return {
                        'msg': 'Created request',
                        'resource': noti_data}, 201
                else:
                    category = Category(name=data['name'])
                    db.session.add(category)
                    db.session.commit()
                    return {
                        'msg': f"Category {data['name']} created successfully",
                        'resource': {
                            'id': category.id,
                            'name': category.name}
                    }, 201
            abort(409, message="Resource already exists")
        else:
            abort(404, message="Not found")

Above we created an API resource to get the list of categories, followed by the CRUD operations for a single category. The product resources follow the same approach, and we will use this pattern for all the remaining resources.
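
Since the same `noti_data` dictionary is built in every manager branch, one option is to move it into a small helper. The sketch below is only a suggestion: `serialize_request` is a hypothetical name, and the `SimpleNamespace` object with made-up values stands in for a stored RequestResponse row so that the example runs without a database.

```python
from datetime import datetime
from types import SimpleNamespace


def serialize_request(requested):
    """Build the 'resource' payload returned after a manager's request is stored."""
    return {
        'id': requested.id,
        'state': requested.status,
        'msg': requested.message,
        'sender': requested.sender,
        'timestamp': requested.timestamp.strftime("%Y-%m-%d"),
    }


# Stand-in object with the same attributes as a RequestResponse row.
sample = SimpleNamespace(
    id=7,
    status='pending',
    message='3,Fruits',
    sender='manager@example.com',
    timestamp=datetime(2024, 1, 15, 10, 30),
)
noti_data = serialize_request(sample)
```

Each manager branch could then return `{'msg': 'Created request', 'resource': serialize_request(requested)}, 201` instead of repeating the dictionary.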

So far we have set up a basic frontend and a basic backend. Next we will move on to the integration part of the development.

Testing it out

You can start the backend server and test all the API endpoints using any API testing tool (for example, Thunder Client).
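
If you prefer a script over a GUI tool, the login endpoint can also be exercised with Python's standard library. The sketch below is written under assumptions: `BASE_URL` assumes Flask's default development address, the credentials are made up, and `build_login_request` and `send` are hypothetical helper names. Nothing is sent until you call `send` yourself with the server running.

```python
import json
import urllib.request

# Assumption: Flask's default development address; adjust to your setup.
BASE_URL = "http://127.0.0.1:5000"


def build_login_request(email, password):
    """Prepare a JSON POST request for the /api/login resource."""
    body = json.dumps({"email": email, "password": password}).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/api/login",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(req):
    """Send the request and return the decoded JSON body (needs a running server)."""
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Made-up credentials; building the request does not contact the server.
login_request = build_login_request("admin@example.com", "secret")
```

With the server running, `send(login_request)` should return the login payload for valid credentials. For 4xx responses `urlopen` raises `urllib.error.HTTPError`, so wrap the call in a try/except when you test the failure cases.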

To start the backend server run the following command:

python3 main.py

Now commit the changes to your local Git repository.

git add .
git commit -m "Added RBAC"

Continue to Integration...