Documentation Index
Fetch the complete documentation index at: https://docs.backant.io/llms.txt
Use this file to discover all available pages before exploring further.
All queries in BackAnt live in repository files (api/repositories/). The DBSession wrapper provides the methods for executing queries, and SQLAlchemy’s select() API builds the statements.
DBSession methods
| Method | Description |
|---|---|
| `myDB.execute(stmt)` | Execute a SELECT statement; returns a Result |
| `myDB.execute_commit(stmt)` | Execute an UPDATE/DELETE and commit |
| `myDB.add(item)` | Insert a model instance and commit |
| `myDB.add_all(items)` | Bulk insert a list of model instances |
| `myDB.delete(item)` | Delete a model instance and commit |
All methods roll back the session automatically on failure.
Repository base class
All repositories extend Repository, which provides add(), add_all(), and delete():
class Repository:
    """Base class for repositories: thin persistence helpers over a DBSession.

    Each helper delegates to the matching method on ``self.db``; ``add`` and
    ``delete`` additionally write an info-level log line afterwards.
    """

    def __init__(self, db: DBSession, logger: Logger):
        """Store the session wrapper and the logger used by the helpers."""
        self.logger = logger
        self.db = db

    def add(self, item: Base):
        """Persist a single model instance via ``self.db.add`` and log it."""
        self.db.add(item)
        # Format the message only after the insert, so the logged
        # representation reflects the item as it is once db.add() returns.
        message = f"{item} added."
        self.logger.info(message)

    def add_all(self, items: List[Base]):
        """Persist a list of model instances via ``self.db.add_all``."""
        self.db.add_all(items)

    def delete(self, item: Base):
        """Remove a model instance via ``self.db.delete`` and log it."""
        self.db.delete(item)
        message = f"{item} removed"
        self.logger.info(message)
SELECT queries
from sqlalchemy import select
from helper.DBSession import myDB
from models.Users_model import Users
# Fetch all rows
def get_all_users(self):
    """Return every row of ``Users`` as model instances."""
    query = select(Users)
    # scalars() unwraps each single-entity row into the Users object itself.
    users = myDB.execute(query).scalars()
    return users.all()
# Fetch one by primary key
def get_by_id(self, user_id: int):
    """Return the ``Users`` row whose ``id`` equals ``user_id``.

    Returns ``None`` when no row matches (``first()`` on an empty result).
    """
    id_matches = Users.id == user_id
    query = select(Users).where(id_matches)
    return myDB.execute(query).scalars().first()
# Filter by field
def get_by_email(self, email: str):
    """Return the first ``Users`` row whose ``email`` equals ``email``.

    Returns ``None`` when no row matches (``first()`` on an empty result).
    """
    email_matches = Users.email == email
    query = select(Users).where(email_matches)
    return myDB.execute(query).scalars().first()
# Multiple conditions
def get_active_users(self, role: str):
    """Return all ``Users`` rows that are active and have the given role."""
    # `== True` is intentional: on a column it builds a SQL comparison,
    # whereas `is True` would be evaluated by Python. Multiple criteria
    # passed to where() are combined with AND.
    criteria = (
        Users.active == True,  # noqa: E712
        Users.role == role,
    )
    query = select(Users).where(*criteria)
    return myDB.execute(query).scalars().all()
INSERT
def add_user(self, name: str, email: str):
    """Create a ``Users`` row from ``name`` and ``email`` and persist it.

    Returns the newly created model instance.

    Raises:
        IntegrityError: re-raised unchanged after a warning is logged
            (presumably ``sqlalchemy.exc.IntegrityError`` -- confirm the
            import used by the repository module).
    """
    new_user = Users(name=name, email=email)
    try:
        self.add(new_user)
    except IntegrityError as exc:
        self.logger.warning(f"Could not create user: {exc.detail}")
        raise
    else:
        return new_user
UPDATE
from sqlalchemy import update
def update_user_name(self, user_id: int, name: str):
    """Set ``name`` on the user with id ``user_id`` using one UPDATE statement.

    The row is not loaded first; the statement is executed and committed
    through ``myDB.execute_commit``. Nothing is returned.
    """
    target_row = Users.id == user_id
    rename_stmt = update(Users).where(target_row).values(name=name)
    myDB.execute_commit(rename_stmt)
Alternatively, use the fetch-then-modify pattern:
def update_user(self, user_id: int, data: dict):
    """Load a user, overwrite the attributes named in ``data``, and save it.

    Args:
        user_id: primary key of the user to modify.
        data: mapping of attribute name -> new value; every pair is applied
            with ``setattr`` (keys are not validated here).

    Returns:
        The modified model instance, or ``None`` when no user has that id.
    """
    existing = self.get_by_id(user_id)
    if existing is not None:
        for field, new_value in data.items():
            setattr(existing, field, new_value)
        # Hand the modified instance back to the session wrapper to persist it.
        myDB.add(existing)
    return existing
DELETE
def delete_user(self, user_id: int):
    """Delete the user with the given primary key.

    Args:
        user_id: primary key of the user to delete.

    Returns:
        The deleted model instance, or ``None`` when no user with that id
        exists (in which case nothing is deleted).

    Raises:
        IntegrityError: re-raised unchanged after a warning is logged,
            e.g. when other rows still reference the user.
    """
    user = self.get_by_id(user_id)
    # Guard the not-found case: passing None on to delete() would fail with
    # an unrelated error instead of signalling "no such user".
    if user is None:
        return None
    try:
        self.delete(user)
    except IntegrityError as e:
        self.logger.warning(f"Could not delete user: {e.detail}")
        raise
    return user
Ordering and limiting
from sqlalchemy import select, asc, desc
# Sort by created_at descending, then take a page of 20 rows starting at
# offset 0.
stmt = (
    select(Users)
    .order_by(desc(Users.created_at))
    .limit(20)
    .offset(0)
)
Joins
from sqlalchemy import select
from models.Users_model import Users
from models.Orders_model import Orders
# Select Orders joined to their Users row on the user id, keeping only the
# orders whose user has the given email address.
stmt = (
    select(Orders)
    .join(Users, Orders.user_id == Users.id)
    .where(Users.email == email)
)
result = myDB.execute(stmt)
orders = result.scalars().all()
Raw SQL (when needed)
from sqlalchemy import text
# text() wraps a literal SQL string so it can be executed on the session.
count_query = text("SELECT COUNT(*) FROM users")
result = myDB.session.execute(count_query)
# scalar() returns the first column of the first row: the COUNT(*) value.
count = result.scalar()
Use raw SQL sparingly — prefer the SQLAlchemy query API.