from passlib.hash import bcrypt_sha256
from sqlalchemy import (
Column,
ForeignKey,
Integer,
String,
Boolean,
Text,
UniqueConstraint,
ForeignKeyConstraint,
)
from sqlalchemy.orm import relationship, aliased
from sqlalchemy.ext.hybrid import hybrid_property
from flask_sqlalchemy import SQLAlchemy
from saraki.utility import import_into_sqla_object, export_from_sqla_object
database = SQLAlchemy()
BaseModel = database.Model
class ModelMixin:
def import_data(self, data):
return import_into_sqla_object(self, data)
def export_data(self, include=(), exclude=()):
return export_from_sqla_object(self, include, exclude)
[docs]class Model(BaseModel, ModelMixin):
"""Abstract class from which all your model classes should extend."""
__abstract__ = True
[docs]class Plan(Model):
"""Available plans in your application."""
__tablename__ = "plan"
#: Primary key
id = Column(Integer, primary_key=True)
#: A name for the plan. For instance, Pro, Business, Personal, etc.
name = Column(String(100), nullable=False, unique=True)
#: The amount of members that an organization can have.
amount_of_members = Column(Integer, nullable=False, default=1)
#: Price of the plan.
price = Column(Integer, nullable=False, unique=True)
[docs]class User(Model):
"""User accounts."""
__tablename__ = "user"
#: Primary key
id = Column(Integer, primary_key=True)
#: Email associated with the account. Must be unique.
email = Column(String(128), unique=True, nullable=False)
_username = Column("username", String(20), unique=True, nullable=False)
@hybrid_property
def username(self):
"""Username associated with the account. Must be unique."""
return self._username
@username.setter
def username(self, value):
self._username = value
self._canonical_username = value.lower()
_canonical_username = Column(
"canonical_username", String(20), unique=True, nullable=False
)
@hybrid_property
def canonical_username(self):
""" Lowercase version of the username used for authentication.
Don't set this column directly. This column is filled automatically
when the :attr:`username` column is assigned with a value.
"""
return self._canonical_username
#: Don't set the value of this property directly, instead use the hybrid
#: property ``password`` that will encrypt it for you.
_password = Column("password", String(255), nullable=False)
@hybrid_property
def password(self):
""" The password is hashed under the hood, so set this with the
original/unhashed password directly.
"""
return self._password
@password.setter
def password(self, value):
self._password = bcrypt_sha256.hash(value)
#: This property defines if the user account is activated or not. To use
#: when the user verifies its account through an email for instance.
active = Column(Boolean(), default=False, nullable=False)
def verify_password(self, value):
return bcrypt_sha256.verify(value, self.password)
def import_data(self, data):
super(User, self).import_data(data)
# .import_data does not work with hybrid_property, so we set these
# manually.
if "username" in data:
self.username = data["username"]
if "password" in data:
self.password = data["password"]
def export_data(self, include=(), exclude=("id", "password", "canonical_username")):
return super(User, self).export_data(include, exclude)
def __str__(self):
return f"<User {self.username}>"
[docs]class Org(Model):
"""Organization accounts.
This table registers all organizations being managed by the application and
owned by at least one user account registered in the :class:`Membership`
table.
"""
__tablename__ = "org"
#: Primary Key.
id = Column(Integer, primary_key=True)
#: The organization account name.
orgname = Column(String(20), unique=True, nullable=False)
#: The name of the organization.
name = Column(String(80), unique=True, nullable=False)
#: The primary key of the user that created the organization account. But,
#: this account not necessarily is the owner of the organization account,
#: just the user that registered the organization. See the table
#: :class:`Member` for more information.
user_id = Column(Integer, ForeignKey("user.id"), nullable=False)
#: Plan selected from the :class:`Plan` table.
plan_id = Column(Integer, ForeignKey("plan.id"))
#
# - - - Relationships - - -
#
created_by = relationship("User", uselist=False)
plan = relationship("Plan", uselist=False)
[docs]class Membership(Model):
"""Users accounts that are members of an Organization.
Application users who belong to an organization are considered members,
including the owner of the account. This table is a many to many
relationship between the tables :class:`User` and :class:`Org`.
"""
__tablename__ = "membership"
#: The ID of a user account in the table :class:`User`.
user_id = Column(Integer, ForeignKey("user.id"), primary_key=True)
#: The ID of an organization account in the table :class:`Org`.
org_id = Column(Integer, ForeignKey("org.id"), primary_key=True)
#: If this is True, this member is the/an owner of this organization. One
#: or more members can be owner at the same time.
is_owner = Column(Boolean, default=False, server_default="FALSE", nullable=False)
#: Enable or disable a member from an organization.
enabled = Column(Boolean, default=False, nullable=False)
#
# -- Relationships --
#
org = relationship("Org", uselist=False)
user = relationship("User", uselist=False)
def export_data(self, include=(), exclude=("org_id", "user_id")):
return super(Membership, self).export_data(include, exclude)
[docs]class Action(Model, ModelMixin):
"""Actions performed across the application like manage, create, read,
update, delete, follow, etc.
This table stores all actions registered using :func:`~saraki.auth.require_auth`.
"""
__tablename__ = "action"
id = Column(Integer, primary_key=True)
name = Column(String(80), nullable=False, unique=True)
description = Column(Text)
def import_data(self, data):
super(Action, self).import_data(data)
[docs]class Resource(Model, ModelMixin):
"""Application resources."""
__tablename__ = "resource"
#: Primary Key.
id = Column(Integer, primary_key=True)
#: The name of the resource.
name = Column(String(80), nullable=False, unique=True)
#: A useful description, please.
description = Column(Text)
#: Parent resource.
parent_id = Column(Integer, ForeignKey("resource.id"))
#
# -- Relationships --
#
parent = relationship("Resource", uselist=False, remote_side=id)
def _persist_actions(actions):
"""Saves a list of actions in the database. Actions in the list that
already exist in the database are ignored.
Note that this doesn't commit the current session.
:param actions: list, tuple or set of action names.
"""
actions = set(actions)
persisted = {action.name for action in Action.query.all()}
new_actions = actions - persisted
for name in new_actions:
action = Action(name=name)
database.session.add(action)
def _persist_resources(resources, parent=None):
"""Save resources in the database.
Note that this doesn't commit the current session.
"""
persisted = {r.name: r for r in Resource.query.all()}
for key, value in resources.items():
if key not in persisted:
resource = Resource(name=key)
resource.parent = parent
database.session.add(resource)
else:
resource = persisted[key]
if value:
_persist_resources(value, resource)
[docs]class Ability(Model, ModelMixin):
"""An ability represents the capacity to perform an action (create, read,
update, delete) on a resource/module/service of an application. In other
words is an action/resource pair.
This table is used to define those pairs, give them a name and a useful
description.
"""
__tablename__ = "ability"
#: Foreign key. References to the column :attr:`~Action.id` of the table
#: :class:`Action`.
action_id = Column(Integer, ForeignKey("action.id"), primary_key=True)
#: Foreign key. References to the column :attr:`~Resource.id` of the table
#: :class:`Resource`.
resource_id = Column(Integer, ForeignKey("resource.id"), primary_key=True)
#: A name for the ability. For instance. Create Products.
name = Column(String(80), nullable=False, unique=True)
#: A long text that describes what this ability does.
description = Column(Text)
def _persist_abilities():
"""This function generates **abilities** (action/resource pair) and inserts
them into the database.
Note that this doesn't commit the current session.
"""
persisted = [(item.action_id, item.resource_id) for item in Ability.query.all()]
actions = Action.query.all()
resources = Resource.query.all()
for resource in resources:
for action in actions:
if (action.id, resource.id) not in persisted:
ability = Ability(
name=f"{action.name}:{resource.name}",
action_id=action.id,
resource_id=resource.id,
)
database.session.add(ability)
[docs]class Role(Model):
"""A Role is a set of **abilities** that can be assigned to organization
members, for example, Seller, Cashier, Driver, Manager, etc.
This table holds all roles of all organizations accounts, determining the
organization that owns the role by the :class:`Org` identifier in the
column :attr:`org_id`.
Since the roles of all organizations reside in this table, the column
:attr:`name` can have repeated values. But a role name must be unique
in each organization.
"""
__tablename__ = "role"
#: Primary Key.
id = Column(Integer, primary_key=True)
#: A name for the role, Cashier for example.
name = Column(String(80), nullable=False)
#: A long text that describes what this role does.
description = Column(Text, nullable=False)
#: The :attr:`~Org.id` of the organization account to which this role belongs.
org_id = Column(Integer, ForeignKey("org.id"))
#
# -- Relationships --
#
abilities = relationship("RoleAbility", passive_deletes=True)
__table_args__ = (
# MemberRole has a composite foreign key referencing those columns.
UniqueConstraint(id, org_id),
# The name must be unique to each organization.
UniqueConstraint(name, org_id),
)
[docs]class RoleAbility(Model, ModelMixin):
__tablename__ = "role_ability"
role_id = Column(
Integer, ForeignKey("role.id", ondelete="CASCADE"), primary_key=True
)
action_id = Column(Integer, nullable=False, primary_key=True)
resource_id = Column(Integer, nullable=False, primary_key=True)
__table_args__ = (
ForeignKeyConstraint(
[action_id, resource_id], [Ability.action_id, Ability.resource_id]
),
)
#
# -- Relationships --
#
ability = relationship("Ability", uselist=False)
role = relationship("Role", uselist=False)
[docs]class MemberRole(Model, ModelMixin):
"""All the roles that a user has in an organization.
This table have two composite foreign keys:
* (:attr:`org_id`, :attr:`user_id`) references to
:class:`Membership` (:attr:`~Membership.org_id`, :attr:`~Membership.user_id`).
* (:attr:`org_id`, :attr:`role_id`) references to
:class:`Role` (:attr:`~Role.org_id`, :attr:`~Role.user_id`).
Those two composite foreign keys ensure that the user to which a role is
assigned indeed is a member of the organization.
"""
__tablename__ = "member_role"
#: Foreign key. Must be present in the tables :class:`Membership`
#: and :class:`Role`.
org_id = Column(Integer, nullable=False, primary_key=True)
#: Foreign key with :attr:`~Membership.user_id` from the table
#: :class:`Membership`.
user_id = Column(Integer, nullable=False, primary_key=True)
#: Foreign key. :attr:`Role.id` from the table :class:`Role`.
role_id = Column(Integer, nullable=False, primary_key=True)
#
# -- Relationships --
#
role = relationship("Role", uselist=False)
__table_args__ = (
ForeignKeyConstraint(
[user_id, org_id],
[Membership.user_id, Membership.org_id],
ondelete="CASCADE",
),
ForeignKeyConstraint(
[role_id, org_id], [Role.id, Role.org_id], ondelete="CASCADE"
),
)
def get_member_privileges(org, user):
member = Membership.query.filter_by(user_id=user.id, org_id=org.id).one()
if member.is_owner:
return {"org": ["manage"]}
query = database.session.query
# Get all roles from a specific member
roles_subquery = (
query(MemberRole.role_id)
.filter_by(org_id=member.org_id, user_id=member.user_id)
.subquery()
)
# Get abilities from the list of roles from the subquery above
abilities_subquery = (
query(RoleAbility).filter(RoleAbility.role_id.in_(roles_subquery)).subquery()
)
action = aliased(Action)
resource = aliased(Resource)
# Get all action and resource names from the abilities in the subquery above
action_resource_query = (
query(action.name.label("action"), resource.name.label("resource"))
.select_from(abilities_subquery)
.join(action, action.id == abilities_subquery.c.action_id)
.join(resource, resource.id == abilities_subquery.c.resource_id)
)
abilities = action_resource_query.all()
privileges = {}
for ability in abilities:
resource_name = ability.resource
if resource_name not in privileges:
privileges[resource_name] = []
privileges[resource_name].append(ability.action)
return privileges