163 lines
4.9 KiB
Python
163 lines
4.9 KiB
Python
from . import models, schemas
|
|
from .database import Database
|
|
from sqlalchemy import delete, update, select, text
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
# Module-wide database handle; its engine backs the session factory below.
db = Database()

# Session factory bound to that engine; call ``Session()`` to open a session.
Session = sessionmaker(bind=db.engine)
|
|
|
|
|
|
def create_boucle(boucle: schemas.Boucle):
    """Persist a new ``models.Boucle`` built from the given schema object.

    Args:
        boucle: Source of the ``id``, ``name`` and ``geolocalisation`` values.

    Returns:
        ``True`` once the new row has been added and committed. Database
        errors are not caught here and propagate to the caller.
    """
    with Session() as session:
        column_values = {
            "id": boucle.id,
            "name": boucle.name,
            "geolocalisation": boucle.geolocalisation,
        }
        session.add(models.Boucle(**column_values))
        session.commit()
    return True
|
|
|
|
|
|
def update_boucle(boucle: schemas.Boucle):
    """Update the name and geolocalisation of the boucle matching ``boucle.id``.

    Args:
        boucle: Schema object carrying the target ``id`` and the new ``name``
            and ``geolocalisation`` values.

    Returns:
        ``True`` after the UPDATE has been committed. The return value does
        not indicate whether any row actually matched ``boucle.id``.
    """
    with Session() as session:
        stmt = (
            update(models.Boucle)
            .where(models.Boucle.id == boucle.id)
            .values(name=boucle.name, geolocalisation=boucle.geolocalisation)
        )
        session.execute(stmt)
        # Leaving the ``with`` block only closes the session; without an
        # explicit commit the UPDATE is rolled back and never persisted.
        session.commit()
        return True
|
|
|
|
|
|
def get_unaccounted_record_with_exist_boucle():
    """Return the ``id_boucle`` values of unaccounted records whose boucle exists.

    Runs a raw SQL query over ``unaccounted_comptage`` that keeps only rows
    whose ``id_boucle`` has a matching ``id`` in ``boucles``; the ``GROUP BY``
    makes each ``id_boucle`` appear once.

    Returns:
        The scalar result of the query (one ``id_boucle`` per element). The
        rows are buffered before the session is closed, so the result can be
        iterated safely by the caller.
    """
    with Session() as session:
        textual_sql = text(
            "SELECT id_boucle FROM unaccounted_comptage WHERE EXISTS (SELECT id FROM boucles WHERE id = unaccounted_comptage.id_boucle) GROUP BY id_boucle;"
        )
        # The result object leaves this function after the session (and its
        # connection) has been closed. Pre-buffering fetches every row up
        # front so that later iteration does not depend on a live cursor.
        result = session.execute(
            textual_sql, execution_options={"prebuffer_rows": True}
        )
        return result.scalars()
|
|
|
|
|
|
def create_comptage(comptage: schemas.ComptageBase, unaccounted_table):
    """Insert a comptage row into the regular or the unaccounted model.

    Args:
        comptage: Source of ``id_boucle``, ``datetime``, ``count``,
            ``week_day`` and ``holiday``.
        unaccounted_table: When truthy the row is created as a
            ``models.unaccounted_Comptage``; otherwise as a ``models.Comptage``.

    Returns:
        ``True`` once the row has been added and committed.
    """
    with Session() as session:
        # Both models take the same constructor arguments, so only the class
        # differs between the two cases.
        if unaccounted_table:
            model_cls = models.unaccounted_Comptage
        else:
            model_cls = models.Comptage
        new_row = model_cls(
            id_boucle=comptage.id_boucle,
            datetime=comptage.datetime,
            count=comptage.count,
            week_day=comptage.week_day,
            holiday=comptage.holiday,
        )
        session.add(new_row)
        session.commit()
    return True
|
|
|
|
|
|
def is_boucle_id_exist(id):
    """Tell whether a ``models.Boucle`` row with the given ``id`` exists.

    Args:
        id: Identifier to look up. The name shadows the builtin but is kept
            because it is part of the function's signature.

    Returns:
        ``True`` if the lookup finds a row, ``False`` otherwise.
    """
    with Session() as session:
        # Only the id column is queried; the first match is enough.
        lookup = session.query(models.Boucle.id).filter_by(id=id)
        first_match = lookup.first()
    return first_match is not None
|
|
|
|
|
|
def is_comptage_exist(date, id_boucle, unaccounted_table):
    """Tell whether a comptage exists for the given datetime and boucle.

    Args:
        date: Value compared against the model's ``datetime`` column.
        id_boucle: Value compared against the model's ``id_boucle`` column.
        unaccounted_table: When truthy ``models.unaccounted_Comptage`` is
            searched; otherwise ``models.Comptage``.

    Returns:
        ``True`` if at least one matching row is found, ``False`` otherwise.
    """
    with Session() as session:
        if unaccounted_table:
            model_cls = models.unaccounted_Comptage
        else:
            model_cls = models.Comptage
        lookup = session.query(model_cls).filter_by(
            datetime=date, id_boucle=id_boucle
        )
        first_match = lookup.first()
    return first_match is not None
|
|
|
|
|
|
def get_comptage_by_date_and_boucle(date, id_boucle, unaccounted_table):
    """Fetch the first comptage matching the given datetime and boucle.

    Args:
        date: Value compared against the model's ``datetime`` column.
        id_boucle: Value compared against the model's ``id_boucle`` column.
        unaccounted_table: When truthy ``models.unaccounted_Comptage`` is
            searched; otherwise ``models.Comptage``.

    Returns:
        The first matching model instance, or ``None`` when nothing matches.
        The session used for the lookup is closed before the caller receives
        the object.
    """
    with Session() as session:
        if unaccounted_table:
            model_cls = models.unaccounted_Comptage
        else:
            model_cls = models.Comptage
        lookup = session.query(model_cls).filter_by(
            datetime=date, id_boucle=id_boucle
        )
        first_match = lookup.first()
    return first_match
|
|
|
|
|
|
def update_comptage(comptage: schemas.ComptageBase, unaccounted_table):
    """Overwrite holiday, week_day and count of the comptage matching ``comptage.id``.

    Args:
        comptage: Carries the target ``id`` and the new ``holiday``,
            ``week_day`` and ``count`` values.
        unaccounted_table: When truthy ``models.unaccounted_Comptage`` is
            updated; otherwise ``models.Comptage``.

    Returns:
        ``True`` once the UPDATE has been executed and committed.
    """
    with Session() as session:
        # The statement is identical for both models apart from the target.
        if unaccounted_table:
            model_cls = models.unaccounted_Comptage
        else:
            model_cls = models.Comptage
        by_id = update(model_cls).where(model_cls.id == comptage.id)
        stmt = by_id.values(
            holiday=comptage.holiday,
            week_day=comptage.week_day,
            count=comptage.count,
        )
        session.execute(stmt)
        session.commit()
    return True
|
|
|
|
|
|
def delete_comptage(comptage_id, unaccounted_table):
    """Delete the comptage row whose ``id`` equals ``comptage_id``.

    Args:
        comptage_id: Identifier of the row to remove.
        unaccounted_table: When truthy the row is deleted from
            ``models.unaccounted_Comptage``; otherwise from ``models.Comptage``.

    Returns:
        ``True`` once the DELETE has been executed and committed.
    """
    with Session() as session:
        if unaccounted_table:
            model_cls = models.unaccounted_Comptage
        else:
            model_cls = models.Comptage
        stmt = delete(model_cls).where(model_cls.id == comptage_id)
        session.execute(stmt)
        session.commit()
    return True
|
|
|
|
|
|
def select_comptage(id_boucle, unaccounted_table):
    """Return every comptage row recorded for one boucle.

    Args:
        id_boucle: Value compared against the model's ``id_boucle`` column.
        unaccounted_table: When truthy ``models.unaccounted_Comptage`` is
            read; otherwise ``models.Comptage``.

    Returns:
        A list of instances of the selected model (empty when nothing
        matches).
    """
    with Session() as session:
        if unaccounted_table:
            model_cls = models.unaccounted_Comptage
        else:
            model_cls = models.Comptage
        # The filter must use the column of the model being selected.
        # Filtering ``Comptage`` rows on ``unaccounted_Comptage.id_boucle``
        # would pull the other table into the FROM clause and return rows
        # unrelated to ``id_boucle``.
        stmt = select(model_cls).where(model_cls.id_boucle == id_boucle)
        return (
            # Rows are buffered up front because the list is built from a
            # result whose session is closed when this function returns.
            session.execute(stmt, execution_options={"prebuffer_rows": True})
            .scalars()
            .all()
        )
|