04. Role Based Authentication
Let's add authentication and authorization to our grocery app. We'll implement token-based authentication using flask-jwt-extended
for simplicity. While flask-security-too
offers extensive features, flask-jwt-extended
is more straightforward and suitable for our needs.
Installation
First, activate the .env
virtual environment (if it is not already active) and install the flask-jwt-extended package:
source .env/bin/activate
pip install flask-jwt-extended
We will be using the Automatic User Loading (flask-jwt-extended.readthedocs.io/en/stable/automatic_user_loading.html)
feature of flask-jwt-extended. This feature gives us easy access to the currently logged-in user. The most common way to use it is through the current_user
object, which is available inside any route protected with jwt_required().
Next, we need to update the code in app.py
to configure flask-jwt-extended
and add the necessary imports:
python filename=app.py
...
# Hashing passwords is standard practice.
from werkzeug.security import generate_password_hash, check_password_hash
# We will use the following functions and classes from flask-jwt-extended.
from flask_jwt_extended import (  # [tl! add:start]
    JWTManager,
    verify_jwt_in_request,
    create_access_token,
    current_user,
    get_jwt,
    get_jwt_identity,
    jwt_required
)  # [tl! add:end]
app = Flask(__name__)
# basic configuration
app.config["JWT_SECRET_KEY"] = "super-secret" # Change this! [tl! add:start]
# CSRF protection applies only to JWTs stored in cookies. It is enabled by default; we disable it here.
app.config["JWT_COOKIE_CSRF_PROTECT"] = False # [tl! add:end]
...
db.init_app(app)
jwt = JWTManager(app)# [tl! add]
app.app_context().push()
db.create_all()
# Insert the admin credentials if no admin exists yet.
exist_admin = User.query.filter_by(role='admin').first()
if not exist_admin:
    the_admin = User(email="sachin@gmail.com", name="Sachin Kumar",
                     role="admin", password=generate_password_hash(
                         "password"))
    db.session.add(the_admin)
    db.session.commit()
# Register a callback function that takes whatever object is passed in as the #[tl! add:start]
# identity when creating JWTs and converts it to a JSON serializable format.
@jwt.user_identity_loader
def user_identity_lookup(user):
    # The identity passed to create_access_token() is the user's email,
    # which is already a string, so it is returned unchanged.
    return user


# Register a callback function that loads a user from your database whenever
# a protected route is accessed. This should return any python object on a
# successful lookup, or None if the lookup failed for any reason (for example
# if the user has been deleted from the database).
@jwt.user_lookup_loader
def user_lookup_callback(_jwt_header, jwt_data):
    identity = jwt_data["sub"]
    return User.query.filter_by(email=identity).one_or_none()  # [tl! add:end]
...
Note In this project, we will be using Flask routes as API endpoints. We will return the data in the response as JSON.
With Flask JWT initialized, our authentication mechanism is set up. We'll proceed to create two API endpoints for login and registration. To maintain simplicity, we'll define most of these routes within the same file.
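To make the jwt_data["sub"] lookup above more concrete, here is a minimal standard-library sketch of how an HS256-signed token can be built and verified. It is not how flask-jwt-extended works internally (that library delegates token handling to PyJWT). The helper names encode_token and decode_token, the demo secret, and the one-hour lifetime are assumptions made for illustration only.

```python
import base64
import hashlib
import hmac
import json
import time
import uuid


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, the encoding JWT segments use."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Restore the stripped padding, then decode."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()


def encode_token(identity: str, secret: str, lifetime_s: int = 3600) -> str:
    """Hypothetical helper: build header.payload.signature for an identity."""
    now = int(time.time())
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {
        "sub": identity,           # the identity, read back as jwt_data["sub"]
        "jti": str(uuid.uuid4()),  # unique token id, useful for revoking
        "iat": now,
        "exp": now + lifetime_s,
    }
    signing_input = (_b64url(json.dumps(header).encode()) + "."
                     + _b64url(json.dumps(payload).encode()))
    return signing_input + "." + _b64url(_sign(signing_input, secret))


def decode_token(token: str, secret: str) -> dict:
    """Hypothetical helper: verify signature and expiry, return the claims."""
    header_b64, payload_b64, signature_b64 = token.split(".")
    expected = _sign(f"{header_b64}.{payload_b64}", secret)
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    if payload["exp"] < time.time():
        raise ValueError("token expired")
    return payload


token = encode_token("sachin@gmail.com", "super-secret")
claims = decode_token(token, "super-secret")
print(claims["sub"])
```

The payload is only encoded, not encrypted, so anyone holding the token can read the claims. The signature only guarantees that they were not altered, which is why the secret key must be changed from the demo value.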
API Endpoints
To implement registration and login, we will use the werkzeug.security
package to hash and verify passwords. We will also use Flask Blueprints to organize the routes. For more information, refer to the Flask Blueprints documentation (flask.palletsprojects.com/en/2.2.x/blueprints/).
python filename=app.py
...
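import base64  # used to encode the user image in the login response
from datetime import datetime  # [tl! add]
from flask import Blueprint, jsonify, request  # needed by the routes below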
...
# -------------------authentication and authorization-----------------# [tl! add:start]
auth = Blueprint('auth', __name__)


@auth.route('/api/login', methods=['POST'])
def login_post():
    data = request.get_json()
    user = User.query.filter_by(email=data["email"]).first()
    if not user or not check_password_hash(user.password, data["password"]):
        # The user doesn't exist or the password is wrong: return an error.
        return jsonify({'error': 'wrong credentials'}), 404
    else:
        access_token = create_access_token(identity=user.email)
        user_data = {
            'id': user.id,
            'role': user.role,
            'email': user.email,
            'access_token': access_token,
            # The image is returned as a base64-encoded string.
            'image': (base64.b64encode(user.image).decode('utf-8')
                      if user.image else None)
        }
        # Record the time of this login.
        user.loginAt = datetime.now()
        db.session.commit()
        return jsonify({'message': 'User logged in successfully',
                        'resource': user_data}), 200


@auth.route('/signup', methods=['POST'])
def signup_post():
    data = request.get_json()
    # If this returns a user, the email already exists in the database.
    user = User.query.filter_by(email=data["email"]).first()
    # If this returns a row, a signup request from this email already exists.
    exist_req = RequestResponse.query.filter_by(sender=data["email"]).first()
    # The admin is the receiver of manager signup requests.
    admin = User.query.filter_by(role='admin').first()
    if user or exist_req:
        return jsonify({'error': 'User already exists'}), 409
    if data["role"] == 'manager':
        # Manager signups are stored as a pending request addressed to the
        # admin instead of creating the user directly.
        # Caution: this message carries the plaintext password.
        message = (f"{data['email']},{data['name']},"
                   f"{data['role']},{data['password']}")
        requested = RequestResponse(status='pending',
                                    type='manager',
                                    message=message,
                                    sender=data['email'],
                                    receiver=admin.email,
                                    timestamp=datetime.now())
        db.session.add(requested)
        db.session.commit()
        return jsonify(
            {'message': 'Request created; the result will be sent by email'}), 201
    else:
        # Create a new user with the form data. Hash the password so the
        # plaintext version isn't saved.
        new_user = User(
            email=data["email"],
            name=data["name"],
            role=data["role"],
            password=generate_password_hash(
                data["password"]),
            doj=datetime.now())
        db.session.add(new_user)
        db.session.commit()
        verified_data = {
            "email": data["email"],
            "name": data["name"],
            "role": data["role"],
        }
        return jsonify(
            {'message': 'User registered successfully',
             'data': verified_data}), 201


app.register_blueprint(auth)  # [tl! add:end]
...
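The routes above rely on generate_password_hash and check_password_hash from werkzeug.security. As a rough illustration of the idea behind such functions (a random salt plus a slow one-way hash, compared in constant time), here is a minimal standard-library sketch. It is not Werkzeug's implementation. The helper names hash_password and verify_password, the stored string layout, and the iteration count are assumptions for illustration.

```python
import hashlib
import hmac
import os


def hash_password(password: str, iterations: int = 100_000) -> str:
    """Hypothetical helper: return 'method$iterations$salt$digest'."""
    salt = os.urandom(16)  # a fresh random salt for every password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
    return f"pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}"


def verify_password(stored: str, candidate: str) -> bool:
    """Hypothetical helper: re-hash the candidate with the stored salt."""
    _method, iterations, salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", candidate.encode(), bytes.fromhex(salt_hex), int(iterations)
    )
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(digest.hex(), digest_hex)


stored = hash_password("password")
print(verify_password(stored, "password"))
print(verify_password(stored, "not-the-password"))
```

Because the salt is random, hashing the same password twice gives two different stored strings, so identical passwords cannot be spotted by comparing database rows.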
API Calls
Now that we have created the API endpoints for login and registration, we will make API calls to them. We will use the fetch
API method to perform these calls. fetch
is a built-in JavaScript method available in all modern browsers, so there is no need to import or install it manually.
js filename=static/views/RegisterCompo.js
...
async submitForm() { // [tl! add:start]
  try {
    const response = await fetch('http://127.0.0.1:5000/signup', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        Authorization: `Bearer ${localStorage.getItem('token')}`
      },
      body: JSON.stringify({
        "email": this.email,
        "name": this.name,
        "password": this.password,
        "role": this.role
      }),
    });
    const data = await response.json();
    if (response.status === 201) {
      alert(data.message);
      if (this.$route.path != '/login') {
        this.$router.push('/login')
        this.closeCard()
      }
    } else if (response.status === 409) {
      alert(data.error);
    }
  } catch (error) {
    console.error(error);
  }
},// [tl! add:end]
...
Updating LoginCompo.js
js filename=static/views/LoginCompo.js
...
submitForm() { // [tl! add:start]
  fetch('http://127.0.0.1:5000/api/login', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      Authorization: `Bearer ${localStorage.getItem('token')}`
    },
    body: JSON.stringify({
      "email": this.email,
      "password": this.password,
      "remember": this.remember
    }),
  })
    .then(response => {
      if (response.status == 200) {
        return response.json(); // Return the promise for the next then block
      } else if (response.status == 404) {
        this.message = "Wrong credentials";
      } else if (response.status == 409) {
        this.message = "User already exists";
      } else if (response.status == 500) {
        this.message = "Internal server error";
      }
    })
    .then(data => {
      // data is undefined when the login failed, so nothing is stored.
      if (data && data.resource.role) {
        localStorage.setItem('token', data.resource.access_token);
        if (data.resource.role === 'admin') {
          this.$store.commit('setAuthenticatedUser', data.resource)
          if (this.$route.path !== '/app/admin') {
            this.$router.push('/app/admin');
          }
        } else if (data.resource.role === 'user') {
          this.$store.commit('setAuthenticatedUser', data.resource)
          if (this.$route.path !== '/app/user') {
            this.$router.push('/app/user');
          }
        } else if (data.resource.role === 'manager') {
          this.$store.commit('setAuthenticatedUser', data.resource)
          if (this.$route.path !== '/app/manager') {
            this.$router.push('/app/manager');
          }
        } else {
          if (this.$route.path !== '/app') {
            this.$router.push('/app');
          }
        }
      }
      console.log('Server response:', data);
    })
    .catch(error => {
      console.error('Error sending data:', error);
    });
}// [tl! add:end]
...
Authorization
With authentication in place, we will now implement authorization. flask-jwt-extended does not ship a built-in role-based authorization mechanism, so we have to add our own. To do this, we will use decorators, a Python feature that Flask itself relies on heavily (for example, to register routes). We will create the decorator in a separate Python file so that it can be imported anywhere in the application without causing circular imports.
Now, let's create a separate Python file (role_auth.py
) for our authorization decorator.
python filename=role_auth.py
from functools import wraps
from flask import render_template
from flask_jwt_extended import (
    jwt_required,
    current_user
)


def role_required(roles: list):
    def decorator(f):
        @wraps(f)
        @jwt_required()
        def decorated_function(*args, **kwargs):
            # Fall back to the index page when the user's role is not allowed.
            if current_user is None or current_user.role not in roles:
                return render_template('index.html')
            return f(*args, **kwargs)
        return decorated_function
    return decorator
Note I recommend watching tutorials on Python decorators to understand their usage. In this case, the decorator takes a list of roles because some routes may be accessed by more than one role. The
@jwt_required()
decorator is applied because we access current_user, which is only available once the JWT has been verified.
Now you can import this role_required
decorator to protect your API endpoints by role.
We will be using it later in this project.
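Since role_required is a decorator factory (a function that returns a decorator), the mechanics can be easier to follow without Flask in the way. The sketch below is a simplified stand-in, not the project's code: the session object replaces flask_jwt_extended.current_user, the JWT check is left out, a 403 tuple is returned instead of rendering index.html, and list_pending_requests is a hypothetical endpoint.

```python
from functools import wraps
from types import SimpleNamespace

# Stand-in for flask_jwt_extended.current_user (assumption for this sketch).
# In the real app the user is loaded by the user_lookup_loader callback.
session = SimpleNamespace(current_user=None)


def role_required(roles):
    """Outer function: receives the allowed roles and returns a decorator."""
    def decorator(f):
        @wraps(f)  # keep the wrapped function's name and docstring
        def decorated_function(*args, **kwargs):
            user = session.current_user
            if user is None or user.role not in roles:
                return {"error": "forbidden"}, 403
            return f(*args, **kwargs)
        return decorated_function
    return decorator


@role_required(["admin", "manager"])
def list_pending_requests():
    """Hypothetical endpoint that only admins and managers may call."""
    return {"requests": []}, 200


session.current_user = SimpleNamespace(role="user")
denied = list_pending_requests()

session.current_user = SimpleNamespace(role="admin")
allowed = list_pending_requests()

print(denied, allowed)
```

In the Flask app, the route decorator would normally sit above @role_required([...]) so that the function Flask registers is the one that already includes the role check.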
Logout
We use the browser's localStorage
to store the authentication token. While we have implemented the login feature, we now need to implement logout functionality. Since this is token-based authentication rather than session-based, the recommended way to implement logout is to revoke the token, as suggested by flask-jwt-extended
.
Note To implement logout, we need to store the revoked tokens somewhere. There are several ways to do this; I will use Redis, so a running Redis server is required. Please go through the flask-jwt-extended tutorial on blocklisting tokens here:
flask-jwt-extended.readthedocs.io/en/stable/blocklist_and_token_revoking.html#
Let's configure and set it up:
python filename=app.py
...
import redis # [tl! add]
from datetime import timedelta # [tl! add]
ACCESS_EXPIRES = timedelta(hours=1) # [tl! add]
...
app.config["JWT_ACCESS_TOKEN_EXPIRES"] = ACCESS_EXPIRES # [tl! add]
...
# Setup our redis connection for storing the blocklisted tokens. You will probably
# want your redis instance configured to persist data to disk, so that a restart
# does not cause your application to forget that a JWT was revoked.
jwt_redis_blocklist = redis.StrictRedis(  # [tl! add:start]
    host="localhost", port=6379, db=0, decode_responses=True
)


# Callback function to check if a JWT exists in the redis blocklist
@jwt.token_in_blocklist_loader
def check_if_token_is_revoked(jwt_header, jwt_payload: dict):
    jti = jwt_payload["jti"]
    token_in_redis = jwt_redis_blocklist.get(jti)
    return token_in_redis is not None  # [tl! add:end]
...
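The configuration above only checks the blocklist; something still has to put a token's jti into it at logout. In the pattern described in the flask-jwt-extended documentation, a route protected with @jwt_required() reads get_jwt()["jti"] and stores it with jwt_redis_blocklist.set(jti, "", ex=ACCESS_EXPIRES). The sketch below imitates that flow without Flask or Redis so it can run anywhere: InMemoryBlocklist, revoke, and ACCESS_EXPIRES_SECONDS are hypothetical stand-ins, not part of either library.

```python
import time

ACCESS_EXPIRES_SECONDS = 3600  # mirrors ACCESS_EXPIRES = timedelta(hours=1)


class InMemoryBlocklist:
    """Tiny stand-in for the Redis client: keys vanish after their TTL."""

    def __init__(self):
        self._entries = {}  # key -> (value, deadline)

    def set(self, key, value, ex):
        # `ex` is the time-to-live in seconds, like the redis-py argument.
        self._entries[key] = (value, time.monotonic() + ex)

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, deadline = entry
        if deadline <= time.monotonic():
            # Expired: forget the key, just as Redis would.
            del self._entries[key]
            return None
        return value


blocklist = InMemoryBlocklist()


def revoke(jwt_payload: dict) -> None:
    """What a logout route would do with the current token's payload."""
    blocklist.set(jwt_payload["jti"], "", ex=ACCESS_EXPIRES_SECONDS)


def check_if_token_is_revoked(jwt_header: dict, jwt_payload: dict) -> bool:
    """Same logic as the token_in_blocklist_loader callback above."""
    return blocklist.get(jwt_payload["jti"]) is not None


payload = {"jti": "demo-jti", "sub": "sachin@gmail.com"}
before = check_if_token_is_revoked({}, payload)
revoke(payload)
after = check_if_token_is_revoked({}, payload)
print(before, after)
```

Giving each blocklist entry the same lifetime as the access token keeps the store small: once the token would have expired anyway, there is no need to remember that it was revoked. On the front end, logging out would presumably also remove the token from localStorage.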
Testing it out
Time to test it out! Open a new WSL terminal and start the Redis server, then start the application server in another terminal.
sudo service redis-server start
To start the app
python3 app.py
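Once both servers are running, the login endpoint can also be exercised without the browser. The following is a minimal sketch using only the standard library. It assumes the app listens on http://127.0.0.1:5000 (the address used in the fetch calls above) and that the admin account created at startup still has the demo password. build_login_request and login are hypothetical helper names.

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:5000"  # assumed address of the running app


def build_login_request(email, password, base_url=BASE_URL):
    """Build (but do not send) the POST request the login form would make."""
    body = json.dumps({"email": email, "password": password}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/api/login",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def login(email, password):
    """Send the request; this only works while app.py is running."""
    with urllib.request.urlopen(build_login_request(email, password)) as response:
        return json.load(response)


request = build_login_request("sachin@gmail.com", "password")
print(request.get_method(), request.full_url)

# With the server up, the token could then be read like this:
# token = login("sachin@gmail.com", "password")["resource"]["access_token"]
```

Note that urlopen raises urllib.error.HTTPError for non-2xx responses, so wrong credentials (which the login route answers with status 404) surface as an exception rather than a return value.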
Commit the changes in the local repo using the following command:
git add .
git commit -m "added RBAC features"