Write it pylama friendly
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Dryusdan 2022-04-09 00:16:58 +02:00
parent b368f62f7f
commit 2e0dbaa3ce
1 changed files with 47 additions and 44 deletions

View File

@ -1,14 +1,11 @@
#!/usr/bin/env python3
import json
import ovh
import pendulum
import requests
import typer
import yaml
import pprint
from pathlib import Path
@ -21,16 +18,17 @@ with open(f"{home}/.config/ovh-bills-pdf/secrets.yaml", "r") as stream:
except yaml.YAMLError as exc:
print(exc)
@app.command()
def download_all():
typer.echo(f"Download all bills")
typer.echo("Download all bills")
params = {}
download(params)
#/me/bill
@app.command()
def download_week():
typer.echo(f"Download week")
typer.echo("Download week")
now = pendulum.now(f"{secrets['timezone']}")
week_start = now.start_of('week')
week_end = now.end_of('week')
@ -42,46 +40,51 @@ def download_week():
def download(params):
if Path(secrets['download_path']).is_dir():
for account in secrets['accounts']:
typer.echo(f"Processing of {account['name']}")
client = ovh.Client(
endpoint=f"{account['region']}",
application_key=f"{account['app_key']}",
application_secret=f"{account['app_secret']}",
consumer_key=f"{account['consumer_key']}",
)
billIds = client.get('/me/bill', **params)
if billIds:
for billId in billIds:
bill = client.get(f"/me/bill/{billId}")
try:
r = requests.get(bill['pdfUrl'])
r.raise_for_status()
except requests.exceptions.Timeout:
typer.echo("URL timeout")
continue
except requests.exceptions.TooManyRedirects:
typer.echo("To many redirect. URL is good ?")
continue
except requests.exceptions.RequestException as e:
raise SystemExit(e)
except requests.exceptions.HTTPError as err:
typer.echo(f"Error when getting PDF : {err}")
continue
typer.echo(f"Write {bill['billId']}.pdf")
try:
with open(f"{secrets['download_path']}/{bill['billId']}.pdf", 'wb') as f:
f.write(r.content)
except OSError:
typer.echo(f"Can't open {secrets['download_path']}/{bill['billId']}.pdf).")
typer.echo("Folder exist ?")
else:
typer.echo("No bill to download")
else:
if not Path(secrets['download_path']).is_dir():
typer.echo(f"{secrets['download_path']} don't exist. Please create it")
raise SystemExit()
for account in secrets['accounts']:
typer.echo(f"Processing of {account['name']}")
client = ovh.Client(
endpoint=f"{account['region']}",
application_key=f"{account['app_key']}",
application_secret=f"{account['app_secret']}",
consumer_key=f"{account['consumer_key']}",
)
billIds = client.get('/me/bill', **params)
if billIds:
_processBill(client, billIds)
else:
typer.echo("No bill to download")
def _processBill(client, billIds):
for billId in billIds:
bill = client.get(f"/me/bill/{billId}")
try:
r = requests.get(bill['pdfUrl'])
r.raise_for_status()
except requests.exceptions.Timeout:
typer.echo("URL timeout")
continue
except requests.exceptions.TooManyRedirects:
typer.echo("To many redirect. URL is good ?")
continue
except requests.exceptions.RequestException as e:
raise SystemExit(e)
except requests.exceptions.HTTPError as err:
typer.echo(f"Error when getting PDF : {err}")
continue
typer.echo(f"Write {bill['billId']}.pdf")
try:
with open(f"{secrets['download_path']}/{bill['billId']}.pdf", 'wb') as f:
f.write(r.content)
except OSError:
typer.echo(f"Can't open {secrets['download_path']}/{bill['billId']}.pdf).")
typer.echo("Folder exist ?")
if __name__ == "__main__":