opendata-nm-exporter/src/sql/crud.py

163 lines
4.9 KiB
Python

from . import models, schemas
from .database import Database
from sqlalchemy import delete, update, select, text
from sqlalchemy.orm import sessionmaker
# Module-level database handle; its engine is what the session factory below
# is bound to.
db = Database()
# Session factory shared by the CRUD helpers in this module; each helper opens
# its own session from it via ``with Session() as session``.
Session = sessionmaker(db.engine)
def create_boucle(boucle: schemas.Boucle):
    """Insert a new ``Boucle`` row built from the given schema object.

    Args:
        boucle: Object providing the ``id``, ``name`` and ``geolocalisation``
            values copied onto the new row.

    Returns:
        ``True`` once the insert has been committed. Nothing is caught here,
        so any error raised by the session propagates to the caller.
    """
    with Session() as session:
        column_values = {
            "id": boucle.id,
            "name": boucle.name,
            "geolocalisation": boucle.geolocalisation,
        }
        new_row = models.Boucle(**column_values)
        session.add(new_row)
        session.commit()
        return True
def update_boucle(boucle: schemas.Boucle):
    """Update the name and geolocalisation of the ``Boucle`` row with a given id.

    Args:
        boucle: Object whose ``id`` selects the row and whose ``name`` and
            ``geolocalisation`` are written to it.

    Returns:
        ``True`` once the UPDATE has been committed. Nothing is caught here,
        so any error raised by the session propagates to the caller.
    """
    with Session() as session:
        stmt = (
            update(models.Boucle)
            .where(models.Boucle.id == boucle.id)
            .values(name=boucle.name, geolocalisation=boucle.geolocalisation)
        )
        session.execute(stmt)
        # Leaving the ``with`` block only closes the session; it does not
        # commit. Without this call the UPDATE would be discarded, unlike the
        # other write helpers in this module, which all commit explicitly.
        session.commit()
        return True
def get_unaccounted_record_with_exist_boucle():
    """Return the ``id_boucle`` values in ``unaccounted_comptage`` that exist in ``boucles``.

    The query groups by ``id_boucle``, so each matching id is yielded once.

    Returns:
        A scalar result over the ``id_boucle`` column. The rows are fully
        buffered before the session is closed, so the result can be consumed
        by the caller after this function returns.
    """
    with Session() as session:
        textual_sql = text(
            "SELECT id_boucle FROM unaccounted_comptage "
            "WHERE EXISTS (SELECT id FROM boucles WHERE id = unaccounted_comptage.id_boucle) "
            "GROUP BY id_boucle;"
        )
        # The result outlives the ``with`` block, so fetch every row up front
        # instead of handing the caller a cursor tied to a closed session.
        result = session.execute(
            textual_sql, execution_options={"prebuffer_rows": True}
        )
        return result.scalars()
def create_comptage(comptage: schemas.ComptageBase, unaccounted_table):
    """Insert a new count row into either the regular or the unaccounted model.

    Args:
        comptage: Object providing ``id_boucle``, ``datetime``, ``count``,
            ``week_day`` and ``holiday`` for the new row.
        unaccounted_table: When truthy the row is created with
            ``models.unaccounted_Comptage``; otherwise with ``models.Comptage``.

    Returns:
        ``True`` once the insert has been committed.
    """
    with Session() as session:
        # Both mapped classes receive the same keyword arguments here, so only
        # the class itself depends on the flag.
        model_cls = (
            models.unaccounted_Comptage if unaccounted_table else models.Comptage
        )
        new_row = model_cls(
            id_boucle=comptage.id_boucle,
            datetime=comptage.datetime,
            count=comptage.count,
            week_day=comptage.week_day,
            holiday=comptage.holiday,
        )
        session.add(new_row)
        session.commit()
        return True
def is_boucle_id_exist(id):
    """Tell whether a ``Boucle`` row with the given id exists.

    Args:
        id: Value compared against ``Boucle.id``.

    Returns:
        ``True`` if the query finds at least one row, ``False`` otherwise.
    """
    with Session() as session:
        first_match = session.query(models.Boucle.id).filter_by(id=id).first()
        return first_match is not None
def is_comptage_exist(date, id_boucle, unaccounted_table):
    """Tell whether a count row exists for the given datetime and boucle id.

    Args:
        date: Value matched against the row's ``datetime`` attribute.
        id_boucle: Value matched against the row's ``id_boucle`` attribute.
        unaccounted_table: When truthy ``models.unaccounted_Comptage`` is
            queried; otherwise ``models.Comptage``.

    Returns:
        ``True`` if the query finds at least one row, ``False`` otherwise.
    """
    with Session() as session:
        model_cls = (
            models.unaccounted_Comptage if unaccounted_table else models.Comptage
        )
        query = session.query(model_cls).filter_by(
            datetime=date, id_boucle=id_boucle
        )
        first_match = query.first()
        return first_match is not None
def get_comptage_by_date_and_boucle(date, id_boucle, unaccounted_table):
    """Fetch the first count row matching the given datetime and boucle id.

    Args:
        date: Value matched against the row's ``datetime`` attribute.
        id_boucle: Value matched against the row's ``id_boucle`` attribute.
        unaccounted_table: When truthy ``models.unaccounted_Comptage`` is
            queried; otherwise ``models.Comptage``.

    Returns:
        The first matching mapped object, or ``None`` when no row matches.
    """
    with Session() as session:
        model_cls = (
            models.unaccounted_Comptage if unaccounted_table else models.Comptage
        )
        query = session.query(model_cls).filter_by(
            datetime=date, id_boucle=id_boucle
        )
        return query.first()
def update_comptage(comptage: schemas.ComptageBase, unaccounted_table):
    """Update ``holiday``, ``week_day`` and ``count`` of the count row with a given id.

    Args:
        comptage: Object whose ``id`` selects the row and whose ``holiday``,
            ``week_day`` and ``count`` are written to it.
        unaccounted_table: When truthy the UPDATE targets
            ``models.unaccounted_Comptage``; otherwise ``models.Comptage``.

    Returns:
        ``True`` once the UPDATE has been committed.
    """
    with Session() as session:
        model_cls = (
            models.unaccounted_Comptage if unaccounted_table else models.Comptage
        )
        stmt = (
            update(model_cls)
            .where(model_cls.id == comptage.id)
            .values(
                holiday=comptage.holiday,
                week_day=comptage.week_day,
                count=comptage.count,
            )
        )
        session.execute(stmt)
        session.commit()
        return True
def delete_comptage(comptage_id, unaccounted_table):
    """Delete the count row with the given id.

    Args:
        comptage_id: Value compared against the row's ``id`` attribute.
        unaccounted_table: When truthy the DELETE targets
            ``models.unaccounted_Comptage``; otherwise ``models.Comptage``.

    Returns:
        ``True`` once the DELETE has been committed.
    """
    with Session() as session:
        model_cls = (
            models.unaccounted_Comptage if unaccounted_table else models.Comptage
        )
        stmt = delete(model_cls).where(model_cls.id == comptage_id)
        session.execute(stmt)
        session.commit()
        return True
def select_comptage(id_boucle, unaccounted_table):
    """Return every count row belonging to the given boucle id.

    Args:
        id_boucle: Value compared against the row's ``id_boucle`` attribute.
        unaccounted_table: When truthy ``models.unaccounted_Comptage`` is
            queried; otherwise ``models.Comptage``.

    Returns:
        A list of the matching mapped objects (empty when nothing matches).
    """
    with Session() as session:
        model_cls = (
            models.unaccounted_Comptage if unaccounted_table else models.Comptage
        )
        # Filter on the same model that is being selected. The regular-table
        # branch previously filtered on ``unaccounted_Comptage.id_boucle``,
        # which pulled the other table into the query.
        stmt = select(model_cls).where(model_cls.id_boucle == id_boucle)
        return (
            session.execute(stmt, execution_options={"prebuffer_rows": True})
            .scalars()
            .all()
        )