#!./python_env/bin/python3
"""Import consumption data exported from voedingscentrum.nl into PostgreSQL.

The script logs in to mijn.voedingscentrum.nl, requests the XML export of the
"eetmeter" overview for a date range, and stores the products and consumptions
it contains in the ``producten`` and ``consumpties_<dbuser>`` tables.
"""
from argparse import ArgumentParser
from datetime import date
from getpass import getuser
import re
import sys
from xml.etree import ElementTree

from bs4 import BeautifulSoup
import requests
import psycopg2

LOGIN_URL = "https://mijn.voedingscentrum.nl/nl/login/"
OVERVIEW_URL = "https://mijn.voedingscentrum.nl/nl/dashboard/eetmeter/overzicht/"

parser = ArgumentParser(description="Tool om data uit voedingscentrum.nl te importeren in PostgreSQL.")
parser.add_argument('--email', type=str, required=True,
                    help="Je emailadres voor voedingscentrum.nl")
parser.add_argument('--password', type=str, required=True,
                    help="Je wachtwoord voor voedingscentrum.nl")
parser.add_argument('--dbhost', type=str, default='localhost',
                    help="Je PostgreSQL hostname (standaard localhost)")
parser.add_argument('--dbuser', type=str, default=getuser(),
                    help=f"Je PostgreSQL username (standaard {getuser()})")
parser.add_argument('--dbpassword', type=str, default=None,
                    help="Je PostgreSQL password")
parser.add_argument('--dbport', type=int, default=5432,
                    help="Je PostgreSQL poort (standaard 5432)")
parser.add_argument('--dbname', type=str, default='voeding',
                    help="De naam van je PostgreSQL database (standaard voeding)")
parser.add_argument('--from-date', type=str,
                    help="Datum vanaf wanneer je gegevens wilt ophalen")
parser.add_argument('--to-date', type=str,
                    help="Datum tot wanneer je gegevens wilt ophalen")
args = parser.parse_args()

# Per-user consumption table; built once so every query uses the same name.
CONSUMPTIES_TABLE = f"consumpties_{args.dbuser}"


def parse_date(date_string):
    """Parse a ``YYYY-MM-DD`` or ``D-M-YYYY`` string into a ``date``.

    Only the start of the string has to match; trailing characters are
    ignored.

    Raises:
        ValueError: if the string matches neither format or does not denote
            an existing calendar date.
    """
    matches = re.match(r'(\d{4})-(\d{2})-(\d{2})', date_string)
    if matches:
        year, month, day = map(int, matches.groups())
        return date(year, month, day)
    matches = re.match(r'(\d{1,2})-(\d{1,2})-(\d{4})', date_string)
    if matches:
        day, month, year = map(int, matches.groups())
        return date(year, month, day)
    # Fail loudly instead of returning None, which would only surface later
    # as an unrelated TypeError/AttributeError.
    raise ValueError(f"Unrecognised date {date_string!r}; expected YYYY-MM-DD or DD-MM-YYYY")


def snake_case(s):
    """Convert a camelCase/PascalCase name to lower snake_case."""
    s = re.sub(r'([a-z])([A-Z])', r'\1_\2', s)
    return s.lower()


def _form_fields(html):
    """Return ``{name: value}`` for every named ``<input>`` element in *html*."""
    soup = BeautifulSoup(html, 'html.parser')
    # Inputs without a name attribute cannot be submitted, so they are skipped
    # rather than raising KeyError.
    return {element['name']: element.get('value', None)
            for element in soup.find_all('input')
            if element.has_attr('name')}


def download_data():
    """Log in and download the XML export for DATE_FROM..DATE_TO.

    Reads the module-level ``args``, ``DATE_FROM`` and ``DATE_TO``.

    Returns:
        The root ``Element`` of the parsed XML, or ``None`` when the export
        request did not return HTTP 200.
    """
    with requests.Session() as s:
        # Load the login page and collect the hidden form data
        r = s.get(LOGIN_URL)
        data = _form_fields(r.text)

        # Add the personal credentials needed to log in
        data['ctl00$ctl18$txtUser'] = args.email
        data['ctl00$ctl18$txtPassword'] = args.password
        data['__EVENTTARGET'] = 'ctl00$ctl18$lbLogin'
        # Presumably dropped so the "remember me" box is submitted unchecked;
        # tolerate the field being absent from the page.
        data.pop('ctl00$ctl18$cbxRemember', None)

        # Log in
        s.post(LOGIN_URL, data=data)

        # Load the overview page and collect the hidden form data
        r = s.get(OVERVIEW_URL)
        data = _form_fields(r.text)
        data['__EVENTTARGET'] = "ctl00$ctl20$lbXML"
        data['ctl00$ctl20$txtDateStart'] = DATE_FROM.strftime("%d-%m-%Y")
        data['ctl00$ctl20$txtDateEnd'] = DATE_TO.strftime("%d-%m-%Y")

        # Download the XML
        r = s.post(OVERVIEW_URL, data=data)
        if r.status_code != 200:
            print(f"No data was available between {DATE_FROM} and {DATE_TO} (error code {r.status_code}).")
            return None

        r.encoding = 'utf-8'
        return ElementTree.fromstring(r.text)


def extract_consumpties(consumpties):
    """Yield a ``(product_dict, consumptie_dict)`` pair per consumption element.

    ``product_dict`` holds ``guid``, ``naam`` and one ``<nutrient>_<unit>``
    entry per child of ``Nutrienten`` (value per 100 gram, 0 when the
    attribute is missing). ``consumptie_dict`` holds ``datum``,
    ``product_guid``, ``periode`` and ``hoeveelheid`` (units times grams per
    unit).
    """
    for consumptie in consumpties:
        product = consumptie.find('Product')
        product_guid = product.find('Guid').text
        product_naam = product.find('Naam').text

        nutrienten = consumptie.find('Nutrienten')
        product_nutrients = {
            snake_case(n.tag) + "_" + n.attrib['Eenheid']: float(n.attrib.get('WaardePer100Gram', 0))
            for n in nutrienten
        }
        product_dict = {'guid': product_guid, 'naam': product_naam}
        product_dict.update(product_nutrients)

        eenheid = product.find('Eenheid')
        consumptie_eenheden = float(eenheid.find('Aantal').text)
        consumptie_grampereenheid = float(eenheid.attrib['GramPerEenheid'])
        consumptie_hoeveelheid = consumptie_eenheden * consumptie_grampereenheid

        datum = consumptie.find('Datum')
        consumptie_datum = date(*(int(datum.find(part).text) for part in ['Jaar', 'Maand', 'Dag']))
        consumptie_periode = consumptie.attrib['Periode']
        consumptie_dict = {
            'datum': consumptie_datum,
            'product_guid': product_guid,
            'periode': consumptie_periode,
            'hoeveelheid': consumptie_hoeveelheid,
        }
        yield product_dict, consumptie_dict


def _checked_column(name):
    """Return *name* if it is safe to splice into SQL as a column name.

    Column names are derived from the downloaded XML, so anything other than
    word characters is rejected. The name is deliberately not quoted: quoting
    would make it case-sensitive and change which column PostgreSQL resolves.
    """
    if not re.fullmatch(r'\w+', name):
        raise ValueError(f"Unsafe column name derived from XML: {name!r}")
    return name


def insert_product(cursor, guid, naam, **nutrients):
    """Insert a product row unless a row with the same ``guid`` exists.

    Each keyword in *nutrients* is used as a column name of ``producten``.

    Raises:
        ValueError: if a nutrient name is not a plain word-character name.
    """
    cursor.execute("SELECT * FROM producten WHERE guid = %s", (guid,))
    if cursor.rowcount > 0:
        return
    # Join the full column list at once so an empty nutrient set does not
    # leave a dangling comma in the statement.
    columns = ['guid', 'naam', *(_checked_column(col) for col in nutrients)]
    column_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    cursor.execute(f"INSERT INTO producten ({column_list}) VALUES ({placeholders})",
                   (guid, naam, *nutrients.values()))


def insert_consumptie(cursor, datum, product_guid, periode, hoeveelheid):
    """Insert one consumption row into the per-user consumption table."""
    cursor.execute(
        f"INSERT INTO {CONSUMPTIES_TABLE} (datum, product_guid, periode, hoeveelheid_g) VALUES (%s, %s, %s, %s)",
        (datum, product_guid, periode, hoeveelheid))


DATABASE = {'host': args.dbhost, 'user': args.dbuser, 'password': args.dbpassword,
            'dbname': args.dbname, 'port': args.dbport}

# Validate the requested dates before touching the database or the network,
# and report bad input as a normal command-line usage error.
try:
    requested_from = parse_date(args.from_date) if args.from_date else None
    requested_to = parse_date(args.to_date) if args.to_date else None
except ValueError as err:
    parser.error(str(err))

conn = psycopg2.connect(**DATABASE)
try:
    cursor = conn.cursor()

    # Check the most recent date for which we already have data
    cursor.execute(f"SELECT MAX(datum) FROM {CONSUMPTIES_TABLE}")
    row = cursor.fetchone()
    last_known_date = row[0] if row else None
    # End the read-only transaction so none stays open during the download.
    conn.rollback()

    # Determine the date range to fetch
    if requested_from:
        DATE_FROM = requested_from
    elif last_known_date and last_known_date <= date.today():
        DATE_FROM = last_known_date
    else:
        DATE_FROM = date.today()
    DATE_TO = requested_to if requested_to else date.today()
    if DATE_TO < DATE_FROM:
        DATE_TO = DATE_FROM

    data = download_data()
    # Explicit check: truth-testing an Element is deprecated. An export with
    # no child elements is treated as "nothing to import".
    if data is None or len(data) == 0:
        sys.exit(0)

    # Replace whatever is stored for the range with the fresh export; the
    # delete and the inserts are committed together.
    cursor.execute(f"DELETE FROM {CONSUMPTIES_TABLE} WHERE datum BETWEEN %s AND %s",
                   (DATE_FROM, DATE_TO))
    for product, consumptie in extract_consumpties(data):
        insert_product(cursor, **product)
        insert_consumptie(cursor, **consumptie)
    conn.commit()
finally:
    # Closing without a commit discards any partial work.
    conn.close()