Beautify code

This commit is contained in:
Dryusdan 2023-10-15 21:12:00 +02:00
parent 4d2d96ea9e
commit 562cb67ee7
5 changed files with 181 additions and 64 deletions

View file

@ -15,11 +15,10 @@ app = FastAPI()
async def root():
return {"message": "Hello World"}
@app.get("/geojson")
async def geojson(response: Response):
with open("/var/www/opendata_nm/www/streets/sorinieres.geojson", "r") as f:
data = json.load(f)
response.headers["Access-Control-Allow-Origin"] = "*"
return data

View file

@ -26,12 +26,12 @@ app = typer.Typer()
def create_db():
db = database.Database()
# schemas.Categories.__table__.create(db.engine)
#schemas.Points.__table__.create(db.engine)
# schemas.Points.__table__.create(db.engine)
# schemas.VigiloCategories.__table__.create(db.engine)
# schemas.VigiloObservations.__table__.create(db.engine)
# schemas.UnMetre.__table__.create(db.engine)
#schemas.Onisr_2021_Processed.__table__.create(db.engine)
#schemas.Onisr_2022_Processed.__table__.create(db.engine)
# schemas.Onisr_2021_Processed.__table__.create(db.engine)
# schemas.Onisr_2022_Processed.__table__.create(db.engine)
@app.command()
@ -84,7 +84,9 @@ def populate_full_vigilo_observation():
@app.command()
def populate_vigilo_observation(days: Annotated[int,typer.Option(help="Number of days to get occurences")] = 14):
def populate_vigilo_observation(
days: Annotated[int, typer.Option(help="Number of days to get occurences")] = 14
):
url = f"https://vigilo.placeauvelo-nantes.fr//get_issues.php?scope=44_nantesmetropole&format=json&count=40000&approved=1&since={days}&since_unit=day"
req = requests.get(url=url)
data_req = req.json()
@ -132,6 +134,7 @@ def populate_geojson():
@app.command()
def import_unmetre():
import gdaltools
geojson = [
{
"name": "1metre",
@ -152,13 +155,14 @@ def import_unmetre():
ogr.set_encoding("UTF-8")
log.info("Setting")
ogr.set_input("/var/www/opendata_nm/www/nantes_unmetre.geojson")
conn = gdaltools.PgConnectionString(host=db["host"],
port=db["port"],
dbname=db["dbname"],
user=db["user"],
password=db["password"]
)
ogr.set_output(conn, table_name="un_metre")
conn = gdaltools.PgConnectionString(
host=db["host"],
port=db["port"],
dbname=db["dbname"],
user=db["user"],
password=db["password"],
)
ogr.set_output(conn, table_name="un_metre")
log.info("OGR Execute")
ogr.execute()
@ -184,9 +188,11 @@ def process_raw_onisr_value():
)
crud.add_onisr_processed(onisr_processed)
@app.command()
def get_all_streets():
import overpass
communes = crud.get_all_communes()
log.info("Get all cities")
insee_code = []
@ -204,10 +210,10 @@ def get_all_streets():
);
"""
log.info("Run query")
r = api.get(query, verbosity='geom', responseformat="geojson")
r = api.get(query, verbosity="geom", responseformat="geojson")
log.info(f"Write geojson /var/www/opendata_nm/www/streets/{insee}.geojson")
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson",mode="w") as f:
geojson.dump(r,f)
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson", mode="w") as f:
geojson.dump(r, f)
log.info(f"Geojson writen. Wait 10 seconds")
time.sleep(10)
log.info("Sleep well. Next city !")
@ -215,15 +221,30 @@ def get_all_streets():
@app.command()
def analyze_unmetre_streets(onlyscore: Annotated[bool,typer.Option(help="Run only score calculation")] = False, reset: Annotated[bool,typer.Option(help="Reset count")] = False,):
def analyze_unmetre_streets(
onlyscore: Annotated[bool, typer.Option(help="Run only score calculation")] = False,
reset: Annotated[bool, typer.Option(help="Reset count")] = False,
):
category = "unmetre"
log.info("Get all cities")
communes = crud.get_all_communes()
log.info(f"Analyze streets with {category} data")
data_categories = [
{"type": f"{category}_legal", "where": "WHERE distance_overtaker >= 1", "score": 0},
{"type": f"{category}_illegal", "where": "WHERE distance_overtaker > 0.5 AND distance_overtaker < 1", "score": 1},
{"type": f"{category}_dangerous", "where": "WHERE distance_overtaker <= 0.5", "score": 2},
{
"type": f"{category}_legal",
"where": "WHERE distance_overtaker >= 1",
"score": 0,
},
{
"type": f"{category}_illegal",
"where": "WHERE distance_overtaker > 0.5 AND distance_overtaker < 1",
"score": 1,
},
{
"type": f"{category}_dangerous",
"where": "WHERE distance_overtaker <= 0.5",
"score": 2,
},
]
if not onlyscore:
for data_category in data_categories:
@ -235,15 +256,34 @@ def analyze_unmetre_streets(onlyscore: Annotated[bool,typer.Option(help="Run onl
point = shapely.get_geometry(multipoint, 0)
points_list.append(point)
if reset:
reset_streets(category=category, data_type=data_category['type'], communes=communes)
analyze_streets(category=category, data_type=data_category['type'], points=points_list, communes=communes)
reset_streets(
category=category,
data_type=data_category["type"],
communes=communes,
)
analyze_streets(
category=category,
data_type=data_category["type"],
points=points_list,
communes=communes,
)
for data_category in data_categories:
score_streets(category=category, data_type=data_category['type'], category_score=data_category["score"], communes=communes, reset=reset)
score_streets(
category=category,
data_type=data_category["type"],
category_score=data_category["score"],
communes=communes,
reset=reset,
)
log.info(f"End of analyze streets with {category} data")
@app.command()
def analyze_onisr_streets(onlyscore: Annotated[bool,typer.Option(help="Run only score calculation")] = False, reset: Annotated[bool,typer.Option(help="Reset count")] = False,):
def analyze_onisr_streets(
onlyscore: Annotated[bool, typer.Option(help="Run only score calculation")] = False,
reset: Annotated[bool, typer.Option(help="Reset count")] = False,
):
log.info("Get all cities")
communes = crud.get_all_communes()
category = "onisr"
@ -255,10 +295,10 @@ def analyze_onisr_streets(onlyscore: Annotated[bool,typer.Option(help="Run only
points_list = []
for data_category in data_categories:
log.info(f"Get all points for {data_category['type']}")
if data_category['type'] == f"{category}_2021":
if data_category["type"] == f"{category}_2021":
log.info(f"Use {category}_2021")
onisr_datas = crud.get_all_onisr_2021()
elif data_category['type'] == f"{category}_2022":
elif data_category["type"] == f"{category}_2022":
log.info(f"Use {category}_2022")
onisr_datas = crud.get_all_onisr_2022()
for onisr_data in onisr_datas:
@ -266,14 +306,34 @@ def analyze_onisr_streets(onlyscore: Annotated[bool,typer.Option(help="Run only
points_list.append(point)
log.debug(f"{data_category['type']} have {len(points_list)} points")
if reset:
reset_streets(category=category, data_type=data_category['type'], communes=communes)
analyze_streets(category=category, data_type=data_category['type'], points=points_list, communes=communes, precision=1e-4)
reset_streets(
category=category,
data_type=data_category["type"],
communes=communes,
)
analyze_streets(
category=category,
data_type=data_category["type"],
points=points_list,
communes=communes,
precision=1e-4,
)
for data_category in data_categories:
score_streets(category=category, data_type=data_category['type'], category_score=data_category["score"], communes=communes, reset=reset)
score_streets(
category=category,
data_type=data_category["type"],
category_score=data_category["score"],
communes=communes,
reset=reset,
)
@app.command()
def analyze_vigilo_streets(onlyscore: Annotated[bool,typer.Option(help="Run only score calculation")] = False, reset: Annotated[bool,typer.Option(help="Reset count")] = False,):
def analyze_vigilo_streets(
onlyscore: Annotated[bool, typer.Option(help="Run only score calculation")] = False,
reset: Annotated[bool, typer.Option(help="Reset count")] = False,
):
log.info("Get all cities")
communes = crud.get_all_communes()
category = "vigilo"
@ -286,26 +346,45 @@ def analyze_vigilo_streets(onlyscore: Annotated[bool,typer.Option(help="Run only
points_list = []
for data_category in data_categories:
log.info(f"Get all points for {data_category['type']}")
vigilo_datas = crud.get_vigilo(data_category['category_id'])
vigilo_datas = crud.get_vigilo(data_category["category_id"])
for vigilo in vigilo_datas:
point = shapely.get_geometry(to_shape(vigilo.geolocalisation), 0)
points_list.append(point)
log.debug(f"{data_category['type']} have {len(points_list)} points")
if reset:
reset_streets(category=category, data_type=data_category['type'], communes=communes)
analyze_streets(category=category, data_type=data_category['type'], points=points_list, communes=communes, precision=1e-4)
reset_streets(
category=category,
data_type=data_category["type"],
communes=communes,
)
analyze_streets(
category=category,
data_type=data_category["type"],
points=points_list,
communes=communes,
precision=1e-4,
)
for data_category in data_categories:
score_streets(category=category, data_type=data_category['type'], category_score=data_category["score"], communes=communes, reset=reset)
score_streets(
category=category,
data_type=data_category["type"],
category_score=data_category["score"],
communes=communes,
reset=reset,
)
def analyze_streets(category: str, data_type: str, points: list, communes, precision: float=1e-7):
def analyze_streets(
category: str, data_type: str, points: list, communes, precision: float = 1e-7
):
log.info(f"Process category {data_type}")
for commune in communes:
insee = int(commune.identifiant_insee)
name = commune.toponyme
log.info(f"Load geojson for {name} ({insee})")
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson",mode="r") as f:
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson", mode="r") as f:
data = geojson.load(f)
for feature in data['features']:
for feature in data["features"]:
if feature["properties"].get(f"{category}_all") is None:
feature["properties"][f"{category}_all"] = 0
linestring = shape(feature["geometry"])
@ -317,36 +396,47 @@ def analyze_streets(category: str, data_type: str, points: list, communes, preci
street_name = "unknown"
if feature["properties"].get(f"{data_type}") is None:
log.info(f"[{data_type}] Point {point} found in a new street ({street_name})")
log.info(
f"[{data_type}] Point {point} found in a new street ({street_name})"
)
feature["properties"][f"{data_type}"] = 1
else:
feature["properties"][f"{data_type}"] += 1
log.info(f"[{data_type}]Point {point} found in an already known street ({street_name}).")
log.info(
f"[{data_type}]Point {point} found in an already known street ({street_name})."
)
log.debug("Adding one to {}".format(feature["properties"][f"{category}_all"]))
log.debug(
"Adding one to {}".format(
feature["properties"][f"{category}_all"]
)
)
feature["properties"][f"{category}_all"] += 1
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson",mode="w") as f:
geojson.dump(data,f)
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson", mode="w") as f:
geojson.dump(data, f)
log.info(f"Check streets complete for {data_type}")
def reset_streets(category: str, data_type: str, communes):
log.info(f"Process category {data_type}")
for commune in communes:
insee = int(commune.identifiant_insee)
name = commune.toponyme
log.info(f"Load geojson for {name} ({insee})")
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson",mode="r") as f:
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson", mode="r") as f:
data = geojson.load(f)
for feature in data['features']:
for feature in data["features"]:
feature["properties"][f"{category}_all"] = 0
feature["properties"][f"{data_type}"] = 0
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson",mode="w") as f:
geojson.dump(data,f)
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson", mode="w") as f:
geojson.dump(data, f)
log.info(f"Check streets complete for {data_type}")
def score_streets(category: str, data_type: str, category_score: int, communes, reset: bool = False):
def score_streets(
category: str, data_type: str, category_score: int, communes, reset: bool = False
):
if category_score == 0:
log.debug("category_score is 0")
return
@ -354,30 +444,41 @@ def score_streets(category: str, data_type: str, category_score: int, communes,
insee = int(commune.identifiant_insee)
name = commune.toponyme
log.info(f"[{data_type}] Load geojson for {name} ({insee})")
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson",mode="r") as f:
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson", mode="r") as f:
data = geojson.load(f)
for feature in data['features']:
for feature in data["features"]:
score = 0
if reset:
feature["properties"][f"{category}_score"] = 0
if feature["properties"].get(f"{category}_score") is None:
feature["properties"][f"{category}_score"] = 0
if feature["properties"].get(f"{data_type}") is not None and feature["properties"][f"{category}_all"] != 0:
score = feature["properties"][f"{data_type}"] / feature["properties"][f"{category}_all"] * 100 * category_score
#if feature["properties"].get("name") is not None and feature["properties"]["name"] == "Rue de Strasbourg":
if (
feature["properties"].get(f"{data_type}") is not None
and feature["properties"][f"{category}_all"] != 0
):
score = (
feature["properties"][f"{data_type}"]
/ feature["properties"][f"{category}_all"]
* 100
* category_score
)
# if feature["properties"].get("name") is not None and feature["properties"]["name"] == "Rue de Strasbourg":
# print(score)
# print(category_score)
# print(feature["properties"][f"{data_type}"])
# print(feature["properties"][f"{category}_all"])
#log.debug("In {} ctg there are {} with total of {}. Score of {}".format(data_type, feature["properties"][f"{data_type}"], feature["properties"][f"{category}_all"], category_score))
# log.debug("In {} ctg there are {} with total of {}. Score of {}".format(data_type, feature["properties"][f"{data_type}"], feature["properties"][f"{category}_all"], category_score))
if score > 0:
feature["properties"][f"{category}_score"] += score
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson",mode="w") as f:
geojson.dump(data,f)
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson", mode="w") as f:
geojson.dump(data, f)
log.info("Process complete")
@app.command()
def calculate_score_streets(reset: Annotated[bool,typer.Option(help="Reset result")] = False,):
def calculate_score_streets(
reset: Annotated[bool, typer.Option(help="Reset result")] = False,
):
data_categories = [
{"score_name": "onisr_score", "ponderate": 0.30},
{"score_name": "unmetre_score", "ponderate": 0.60},
@ -389,24 +490,27 @@ def calculate_score_streets(reset: Annotated[bool,typer.Option(help="Reset resul
insee = int(commune.identifiant_insee)
name = commune.toponyme
log.info(f"Load geojson for {name} ({insee})")
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson",mode="r") as f:
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson", mode="r") as f:
data = geojson.load(f)
for feature in data['features']:
for feature in data["features"]:
score = 0
if feature["properties"].get("score") is None or reset:
feature["properties"]["score"] = 0
for data_category in data_categories:
if feature["properties"].get(data_category["score_name"]) is None:
feature["properties"][data_category["score_name"]] = 0
score += feature["properties"][data_category["score_name"]] * data_category["ponderate"]
score += (
feature["properties"][data_category["score_name"]]
* data_category["ponderate"]
)
if score < 0:
feature["properties"]["score"] = -1
else:
else:
feature["properties"]["score"] = score
if feature["properties"]["score"] == 0:
feature["properties"]["score"] = -1
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson",mode="w") as f:
geojson.dump(data,f)
with open(f"/var/www/opendata_nm/www/streets/{insee}.geojson", mode="w") as f:
geojson.dump(data, f)
log.info("Process complete")

View file

@ -70,7 +70,7 @@ def is_vigilo_observation_exists(observation: models.VigiloObservations):
def get_vigilo(category: int):
today = pendulum.today('Europe/Paris')
today = pendulum.today("Europe/Paris")
six_months_ago = today.subtract(months=6)
with Session() as session:
return (

View file

@ -15,19 +15,24 @@ class PointsBase(BaseModel):
class Categories(CategoriesBase):
pass
class PointsCreate(PointsBase):
pass
class Points(PointsBase):
id: int
class VigiloCategoriesBase(BaseModel):
id: int
name: str
class VigiloCategories(VigiloCategoriesBase):
pass
class VigiloObservations(BaseModel):
id: str
vigilo_categories_id: int
@ -35,6 +40,7 @@ class VigiloObservations(BaseModel):
datetime: datetime
comment: str
class OnisrProcessedBase(BaseModel):
accident_id: int
gravite: int
@ -45,8 +51,10 @@ class OnisrProcessedBase(BaseModel):
datetime: datetime
geolocalisation: str
class OnisrProcessedCreate(OnisrProcessedBase):
pass
class OnisrProcessed(OnisrProcessedBase):
id: int

View file

@ -30,6 +30,7 @@ class Points(Base):
id_categories = Column(Integer, ForeignKey("categories.id"))
geolocalisation = Column(Geometry("POINT"))
class CommuneNM(Base):
__tablename__ = "commune_nm"
objectid = Column("OBJECTID", Numeric)
@ -40,11 +41,13 @@ class CommuneNM(Base):
toponyme = Column("Toponyme", String)
id_comm = Column("id_comm", Numeric, primary_key=True)
class VigiloCategories(Base):
__tablename__ = "vigilo_categories"
id = Column(Integer, primary_key=True)
name = Column(String)
class VigiloObservations(Base):
__tablename__ = "vigilo_observations"
id = Column(String, primary_key=True)
@ -53,12 +56,14 @@ class VigiloObservations(Base):
datetime = Column("datetime", DateTime(timezone=True), default=func.now())
comment = Column(String)
class UnMetre(Base):
__tablename__ = "un_metre"
id = Column(Integer, primary_key=True)
geolocalisation = Column(Geometry("POINT"))
datetime = Column("datetime", DateTime(timezone=True), default=func.now())
class Onisr_2021_Processed(Base):
__tablename__ = "onisr_2021_processed"
id = Column(BigInteger, primary_key=True)
@ -71,6 +76,7 @@ class Onisr_2021_Processed(Base):
datetime = Column("datetime", DateTime(timezone=True), default=func.now())
geolocalisation = Column(Geometry("POINT"))
class Onisr_2022_Processed(Base):
__tablename__ = "onisr_2022_processed"
id = Column(BigInteger, primary_key=True)