Skip to main content
All queries in BackAnt live in repository files (api/repositories/). The DBSession wrapper provides the methods for executing queries, and SQLAlchemy’s select() API builds the statements.

DBSession methods

| Method | Description |
| --- | --- |
| myDB.execute(stmt) | Execute a SELECT statement and return a Result |
| myDB.execute_commit(stmt) | Execute an UPDATE/DELETE and commit |
| myDB.add(item) | Insert a model instance and commit |
| myDB.add_all(items) | Bulk insert a list of model instances |
| myDB.delete(item) | Delete a model instance and commit |
All methods roll back the session automatically on failure.

Repository base class

All repositories extend Repository, which provides add(), add_all(), and delete():
class Repository:
    """Common base for repositories: thin persistence helpers over a DBSession.

    Each helper forwards to the same-named method on the wrapped session
    object; ``add`` and ``delete`` also write an info-level log line once the
    session call has returned.
    """

    def __init__(self, db: DBSession, logger: Logger):
        """Keep the session wrapper and the logger used by the helpers."""
        self.logger = logger
        self.db = db

    def add(self, item: Base):
        """Pass *item* to ``db.add`` and log that it was added."""
        self.db.add(item)
        # Format after the session call so the message shows the item as it
        # looks once the session has handled it.
        message = f"{item} added."
        self.logger.info(message)

    def add_all(self, items: List[Base]):
        """Pass the whole list to ``db.add_all``; nothing is logged here."""
        self.db.add_all(items)

    def delete(self, item: Base):
        """Pass *item* to ``db.delete`` and log that it was removed."""
        self.db.delete(item)
        message = f"{item} removed"
        self.logger.info(message)

SELECT queries

from sqlalchemy import select
from helper.DBSession import myDB
from models.Users_model import Users

# Fetch all rows
def get_all_users(self):
    """Return every ``Users`` row as model instances."""
    result = myDB.execute(select(Users))
    # scalars() unwraps each one-column row into the Users object itself.
    return result.scalars().all()

# Fetch one by primary key
def get_by_id(self, user_id: int):
    """Look up one user by primary key; ``None`` when no row matches."""
    by_primary_key = Users.id == user_id
    rows = myDB.execute(select(Users).where(by_primary_key)).scalars()
    return rows.first()

# Filter by field
def get_by_email(self, email: str):
    """Return the first user whose email equals *email*, or ``None``."""
    query = select(Users).where(Users.email == email)
    matches = myDB.execute(query).scalars()
    return matches.first()

# Multiple conditions
def get_active_users(self, role: str):
    """Return all users that are active and have the given *role*."""
    # Conditions passed together to where() are combined with AND.
    # "== True" is deliberate: SQLAlchemy overloads == to build the SQL
    # comparison, so a plain truthiness test or "is True" would not work.
    conditions = (Users.active == True, Users.role == role)  # noqa: E712
    stmt = select(Users).where(*conditions)
    return myDB.execute(stmt).scalars().all()

INSERT

def add_user(self, name: str, email: str):
    """Create a ``Users`` instance from *name* and *email* and persist it.

    Returns the new instance. An ``IntegrityError`` raised while saving is
    logged as a warning and then re-raised to the caller.
    """
    user = Users(name=name, email=email)
    try:
        self.add(user)
    except IntegrityError as e:
        # NOTE(review): assuming this is sqlalchemy.exc.IntegrityError, where
        # ``detail`` is a list of extra message strings that is often empty;
        # ``e.orig`` carries the driver's error. Confirm which one is wanted.
        self.logger.warning(f"Could not create user: {e.detail}")
        # Bare raise re-raises the active exception without adding an extra
        # traceback entry for this line.
        raise
    return user

UPDATE

from sqlalchemy import update

def update_user_name(self, user_id: int, name: str):
    """Set the name of the user with id *user_id* using one UPDATE statement."""
    target = update(Users).where(Users.id == user_id)
    # execute_commit runs the statement and commits in one step.
    myDB.execute_commit(target.values(name=name))
Alternatively, use the fetch-then-modify pattern:
def update_user(self, user_id: int, data: dict):
    """Load a user, copy every *data* entry onto it as an attribute, and save.

    Returns the updated instance, or ``None`` when no user has that id.
    """
    user = self.get_by_id(user_id)
    if user is not None:
        # Each key in data is used as an attribute name on the model.
        for field, new_value in data.items():
            setattr(user, field, new_value)
        myDB.add(user)
    return user

DELETE

def delete_user(self, user_id: int):
    """Delete the user with id *user_id* and return the removed instance.

    Returns ``None`` without touching the database when no such user exists.
    An ``IntegrityError`` raised during the delete is logged as a warning and
    then re-raised to the caller.
    """
    user = self.get_by_id(user_id)
    if user is None:
        # Nothing to delete; passing None on to delete() would fail with an
        # unrelated error instead of signalling "not found".
        return None
    try:
        self.delete(user)
    except IntegrityError as e:
        self.logger.warning(f"Could not delete user: {e.detail}")
        # Bare raise re-raises the active exception without adding an extra
        # traceback entry for this line.
        raise
    return user

Ordering and limiting

from sqlalchemy import select, asc, desc

# Newest users first; limit/offset select the page (here: the first 20 rows).
stmt = (
    select(Users)
    .order_by(desc(Users.created_at))
    .limit(20)
    .offset(0)
)

Joins

from sqlalchemy import select
from models.Users_model import Users
from models.Orders_model import Orders

# Build the statement step by step: orders, joined to their user, filtered
# by that user's email address.
stmt = select(Orders)
stmt = stmt.join(Users, Orders.user_id == Users.id)
stmt = stmt.where(Users.email == email)
orders = myDB.execute(stmt).scalars().all()

Raw SQL (when needed)

from sqlalchemy import text

# text() wraps a hand-written SQL string so the session can execute it.
result = myDB.session.execute(
    text("SELECT COUNT(*) FROM users")
)
# scalar() pulls the single COUNT(*) value out of the result.
count = result.scalar()
Use raw SQL sparingly — prefer the SQLAlchemy query API.