Jeg er ved at bygge min egen aula-klient:
Tag: api
Hent data om dit elforbrug fra eloverblik.dk
For et stykke tid siden blev jeg kontaktet af en, der havde brug for hjælp til at hive data ud fra https://eloverblik.dk ved hjælp af sidens API. Derfor har jeg (ligesom de fleste andre danskere) skrevet et program i Python, der kan hjælpe med at hive data ud.
Det var sjovt at lave, for jeg fik øvet mig i at skrive et program, hvor man ved hjælp af argumenter i kommandolinjen, kan få programmet til at gøre nogle forskellige ting, fx at udtrække en liste over ens målere, kun hente data fra en bestemt elmåler, hente data med forskellig opløsning (time, dag, uge, osv.), og et par andre ting.
Jeg synes selv, at det er blevet ret brugervenligt af et kommandolinjeprogram af være.
Måske kan du bruge det? Du finder koden på GitHub og lige herunder:
# A utility to interact with the api from eloverblik.dk
# By Morten Helmstedt, https://helmstedt.dk
# API documentation:
# https://api.eloverblik.dk/CustomerApi/swagger/index.html
# https://www.niras.dk/media/4vbbvkig/eloverblik-adgang-til-egne-data-via-api-kald-forkortet-1.pdf
import argparse
import csv
from datetime import date, datetime, timedelta
import os
from os.path import exists
import pickle
import requests
import sys
import time
from zoneinfo import ZoneInfo
# Set token filename
token_filename = 'eloverblik.token'
data_access_token_filename = 'eloverblik_data_access.token'
# Number of API retries (API often returns 503 errors)
api_retries = 10
# API base url
base_url = 'https://api.eloverblik.dk/CustomerApi/api/'
# Get today's date
today = date.today()
# Prepare session for requests
session = requests.Session()
# Set session headers
session.headers = {
'Accept': 'application/json',
'Accept-Encoding': 'gzip, deflate, br',
'Host': 'api.eloverblik.dk',
'User-Agent': 'Eloverblik-Python'
}
# Gets a saved data token if it is not too old, alternatively gets a new token
def get_or_set_data_access_token(token):
# If an existing data access token is less than 12 hours old, use it and return
if exists(data_access_token_filename):
with open(data_access_token_filename, 'rb') as data_access_token_file:
save_time_and_token = pickle.load(data_access_token_file)
if not datetime.now() - save_time_and_token[0] > timedelta(hours=12):
print('Existing data access token found. Using this token.')
session.headers['Authorization'] = 'Bearer ' + save_time_and_token[1]
return
# Data access token does not exist or is too old
# Check whether API is alive
print('Checking API status...')
get_api_status = get_endpoint('isalive')
if get_api_status == True:
print('API reports that it is up')
# Get data access token for subsequent requests
print('Getting data access token...')
session.headers['Authorization'] = 'Bearer ' + token
token_get_time = datetime.now()
get_data_access_token = get_endpoint('token')
# Request succesful
if get_data_access_token:
print('Got data access token')
data_access_token = get_data_access_token['result']
# Save token to file with get time
with open(data_access_token_filename, 'wb') as data_access_token_file:
pickle.dump([token_get_time, data_access_token], data_access_token_file)
session.headers['Authorization'] = 'Bearer ' + data_access_token
# Request failed
else:
sys.exit('Error: Unable to get data access token. Exiting.')
# API is down
else:
sys.exit('Error: API is down. Exiting.')
# Request an endpoint and return data
def get_endpoint(endpoint, json=None):
tries = 1
while tries <= api_retries:
if not json:
response = session.get(base_url + endpoint, timeout=10)
else:
response = session.post(base_url + endpoint, json=json, timeout=10)
# Succesful request
if response.status_code == 200:
return response.json()
# Unsuccesful request, try again after 1 second
elif response.status_code == 429 or response.status_code == 503:
tries += 1
time.sleep(1)
elif response.status_code == 403:
print(f'API reports a 403 forbidden error. Please check your token is correct')
else:
print(f'API returned an unknown status code')
print(f'Latest API response status code was: {response.status_code}')
print(f'Latest API response content was: {response.text}')
sys.exit('API request failed. Exiting.')
if tries > api_retries:
print(f'API request did not succeed after {api_retries} attempts')
print(f'Latest API response status code was: {response.status_code}')
print(f'Latest API response content was: {response.text}')
sys.exit('API request failed. Exiting.')
# Lists all metering points
def list_meters():
print('Getting list of meters...')
get_metering_points = get_endpoint('meteringpoints/meteringpoints')
print(f'Found {len(get_metering_points["result"])} meter(s)')
print('Printing list of meter(s)...\n')
for meter in get_metering_points['result']:
meter_count = 1
print(f'--- Meter {meter_count} ---')
for key, value in meter.items():
print(key, ':', value)
print('---')
meter_count += 1
sys.exit('All meters printed. Exiting.')
# Gets and saves metering point electricity usage data as a csv file
def get_usage_data(meter_ids, args, periods):
print('Starting to save usage data...')
# Prepare csv file for writing
with open('eloverblik_usage_data.csv', 'w', newline='') as csvfile:
fieldnames = ['meter_id', 'resolution', 'timestart_utc', 'timestart_denmark', 'timeend_utc', 'timeend_denmark', 'point_position', 'point_out_quantity', 'point_out_quality']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for meter_id in meter_ids:
print(f'Getting and saving usage data for meter id {meter_id}...')
meter_json = {
"meteringPoints": {
"meteringPoint": [
meter_id
]
}
}
for date_period in periods:
print(f'Saving usage date for period {date_period[0]} to {date_period[1]}...')
usage_data_endpoint = 'meterdata/gettimeseries/' + date_period[0] + '/' + date_period[1] + '/' + args.aggregation
get_meter_usage_data = get_endpoint(usage_data_endpoint, meter_json)
for result in get_meter_usage_data['result']:
for time_serie in result['MyEnergyData_MarketDocument']['TimeSeries']:
for period in time_serie['Period']:
resolution = period['resolution']
timestart_utc = period['timeInterval']['start']
timestart_datetime = datetime.strptime(timestart_utc, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=ZoneInfo('UTC'))
timestart_denmark = timestart_datetime.astimezone(ZoneInfo('Europe/Copenhagen'))
timestart_denmark_str = datetime.strftime(timestart_denmark, '%Y-%m-%dT%H:%M:%S')
timeend_utc = period['timeInterval']['end']
timeend_datetime = datetime.strptime(timeend_utc, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=ZoneInfo('UTC'))
timeend_denmark = timeend_datetime.astimezone(ZoneInfo('Europe/Copenhagen'))
timeend_denmark_str = datetime.strftime(timeend_denmark, '%Y-%m-%dT%H:%M:%S')
period_rows = [
{
'meter_id': meter_id,
'resolution': resolution,
'timestart_utc': timestart_utc,
'timestart_denmark': timestart_denmark_str,
'timeend_utc': timeend_utc,
'timeend_denmark': timeend_denmark_str,
'point_position': point['position'],
'point_out_quantity': str(point['out_Quantity.quantity']).replace('.',','),
'point_out_quality': point['out_Quantity.quality']
}
for point in period['Point']
]
writer.writerows(period_rows)
print(f'Saved usage date for period {date_period[0]} to {date_period[1]}')
print(f'Saved usage data for meter {meter_id}')
print(f'Saved usage data for meter(s)')
# Gets and saves metering point electricity charges data as a csv file
def get_charges_data(meter_ids):
print('Starting to save charges data...')
# Prepare csv file for writing
with open('eloverblik_charges_data.csv', 'w', newline='') as csvfile:
fieldnames = ['meter_id', 'chargetype', 'name', 'description', 'owner', 'validfromdate', 'validtodate', 'periodtype', 'position', 'price', 'quantity']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for meter_id in meter_ids:
print(f'Getting and saving charges data for meter id {meter_id}...')
meter_json = {
"meteringPoints": {
"meteringPoint": [
meter_id
]
}
}
charges_data_endpoint = 'meteringpoints/meteringpoint/getcharges'
get_meter_charges_data = get_endpoint(charges_data_endpoint, meter_json)
for result in get_meter_charges_data['result']:
for item in result['result']['fees']:
chargetype = 'fee'
subscription_row = {
'meter_id': meter_id,
'chargetype': chargetype,
'name': item['name'],
'description': item['description'],
'owner': item['owner'],
'validfromdate': item['validFromDate'],
'validtodate': item['validToDate'],
'periodtype': item['periodType'],
'position': '',
'price': str(item['price']).replace('.',','),
'quantity': item['quantity']
}
writer.writerow(subscription_row)
for item in result['result']['subscriptions']:
chargetype = 'subscription'
subscription_row = {
'meter_id': meter_id,
'chargetype': chargetype,
'name': item['name'],
'description': item['description'],
'owner': item['owner'],
'validfromdate': item['validFromDate'],
'validtodate': item['validToDate'],
'periodtype': item['periodType'],
'position': '',
'price': str(item['price']).replace('.',','),
'quantity': item['quantity']
}
writer.writerow(subscription_row)
for item in result['result']['tariffs']:
chargetype = 'tariff'
name = item['name']
description = item['description']
owner = item['owner']
validfromdate = item['validFromDate']
validtodate = item['validToDate']
periodtype = item['periodType']
tariff_rows = [
{
'meter_id': meter_id,
'chargetype': chargetype,
'name': name,
'description': description,
'owner': owner,
'validfromdate': validfromdate,
'validtodate': validtodate,
'periodtype': periodtype,
'position': point['position'],
'price': str(point['price']).replace('.',','),
'quantity': ''
}
for point in item['prices']
]
writer.writerows(tariff_rows)
print(f'Saved charges data for meter {meter_id}')
print(f'Saved charges data for meter(s)')
# Main program logic
def main():
# Define and load parser arguments
parser = argparse.ArgumentParser(description='Get data on electricity usage from eloverblik.dk')
parser.add_argument('-m', '--mode', help='Mode: List meters or get data ', type=str, choices=['list', 'get'], required=True)
parser.add_argument('-n', '--meterid', help='Get data from this specific meter in get mode', type=str)
parser.add_argument('-a', '--aggregation', help='Get timeseries data with this aggregation in get mode', choices=['Actual', 'Quarter', 'Hour', 'Day', 'Month', 'Year'], default='Actual', type=str)
parser.add_argument('-f', '--fromdate', help='Get data from this date in get mode, format yyyy-mm-dd', type=str)
parser.add_argument('-t', '--todate', help='Get data to and including this date in get mode, format yyyy-mm-dd', type=str)
parser.add_argument('-d', '--deletetoken', help='Delete existing token file', action='store_true')
parser.add_argument('-r', '--refreshdatatoken', help='Force refresh of data access token by deleting token file', action='store_true')
args = parser.parse_args()
# Delete token file if set as argument
if args.deletetoken:
print('Deleting existing token file if it exists')
os.remove(token_filename)
# Delete data token file if set as argument
if args.refreshdatatoken:
print('Deleting existing data access token file if it exists')
os.remove(data_access_token_filename)
# Load or save token
if not exists(token_filename):
print('No token from eloverblik.dk saved. Paste your token here.')
token = str(input('Token: '))
with open(token_filename, 'wb') as token_file:
pickle.dump(token, token_file)
else:
with open(token_filename, 'rb') as token_file:
token = pickle.load(token_file)
# If mode is list meters, get a list of meters
if args.mode == 'list':
print('Listing available meters...')
# Get data access token
get_or_set_data_access_token(token)
# List meters
list_meters()
# If mode is get data, get data
elif args.mode == 'get':
# Date argument validation
if args.fromdate and not args.todate or args.todate and not args.fromdate:
sys.exit('Error: You must specify both a from date and a to date. Exiting.')
try:
from_date = datetime.strptime(args.fromdate, '%Y-%m-%d').date()
to_date = datetime.strptime(args.todate, '%Y-%m-%d').date()
if from_date > to_date:
sys.exit('Error: Your from date cannot be after your to date. Exiting.')
elif from_date == to_date:
sys.exit('Error: Your from date cannot be the same as your to date. Exiting.')
elif from_date > today:
sys.exit('Error: Your from date cannot be after today. Exiting.')
elif to_date > today + timedelta(days=1):
sys.exit('Error: Your to date cannot be later than one day after today. Exiting.')
except ValueError:
sys.exit('Error: From or to date in invalid format. Format must be yyyy-mm-dd with no quotes. Exiting.')
# Periods must be a maximum of 730 days, so longer periods are sliced into smaller pieces
if to_date > from_date + timedelta(days=730):
periods = []
start_of_period = from_date
slice_finished = False
while slice_finished == False:
end_of_period = start_of_period + timedelta(days=730)
if end_of_period <= to_date:
periods.append([datetime.strftime(start_of_period, '%Y-%m-%d'), datetime.strftime(end_of_period, '%Y-%m-%d')])
start_of_period = end_of_period + timedelta(days=1)
else:
end_of_period = to_date
periods.append([datetime.strftime(start_of_period, '%Y-%m-%d'), datetime.strftime(end_of_period, '%Y-%m-%d')])
slice_finished = True
# Smaller periods are saved as a list in a list to use the same for loop later
else:
periods = [[args.fromdate, args.todate]]
print('Getting data...')
# Get data access token
get_or_set_data_access_token(token)
# Specifik meter id is set by user
if args.meterid:
meter_ids = [args.meterid]
# Meter id argument is not set, so list of meters is fetched and listed
else:
# Get ids of meters
print('Getting list of meters...')
get_metering_points = get_endpoint('meteringpoints/meteringpoints')
print(f'Found {len(get_metering_points["result"])} meters')
meter_ids = [meter['meteringPointId'] for meter in get_metering_points['result']]
if meter_ids:
# Get data from meters
print('Getting and saving usage and charges data for meter(s)...')
# Get usage data
get_usage_data(meter_ids, args, periods)
# Get charges data
get_charges_data(meter_ids)
# Print status
print('Saved usage and charges data for meter(s)')
else:
sys.exit('Error: Did not find any meters, so no data to fetch. Exiting.')
if __name__ == '__main__':
main()
På jagt efter danske domænenavne
For lang tid siden fik jeg fat i domænet wishlist.dk til min ønskeseddelservice ved at klage over, at domænet så ud til at være registreret alene med henblik på at sælge det videre. Det må man nemlig ikke gøre med .dk-domæner.
Noget tid efter blev jeg kontaktet af en, der hedder Jakob, der havde læst mit indlæg og havde brugt samme metode til at få fat i et domænenavn, han havde drømt om.
Et nyt dataprojekt
Den oplevelse gav mig den idé, at jeg kunne tjene det godes sag, hvis jeg på en eller anden måde kunne lave en liste over domæner registreret med henblik på videresalg og offentliggøre listen. Jeg tænkte, jeg kunne:
- Hente en liste over danske domænenavne
- Lave en robot, der henter noget data om hvert domænenavn og måske tager et skærmbillede af siden
- Kategorisere et par tusind domæner efter hvad de i mine øjne bliver brugt til
- Bruge noget smart maskinlærings-AI-hokus-pokus til at kategorisere de resterende domænenavne
Men, men, men:
Ingen ville hjælpe med en domænenavnsliste
Jeg troede, at første trin i min plan om at finde en liste over .dk-domænenavne ville være det nemmeste. Jeg vidste, at jeg kan slå oplysninger op om domæner hos DK Hostmaster, der administrerer .dk-domæner (domænerne er ejet af staten). Og jeg havde også en anelse om, at der i Domæneloven står noget i retning af:
WHOIS-databasen
§ 18. Administrator skal oprette og vedligeholde en database indeholdende oplysninger om registranternes navn, adresse og telefonnummer.
Stk. 2. Administrator skal sikre, at oplysningerne nævnt i stk. 1 er retvisende, opdaterede og offentligt tilgængelige.
Jeg skrev til DK Hostmaster og fik nej. Jeg skrev til Dansk Internet Forum og fik intet svar. Jeg spurgte Klagenævnet for Domænenavne om jeg kunne klage over DK Hostmaster og fik nej. Så prøvede jeg at få aktindsigt i oplysningerne:

Det gik heller ikke så godt.
De oplysninger, jeg troede var offentlige, bliver åbenbart holdt tæt ind til kroppen. For at beskytte mig (og dig) mod spam!(?)
Efter afslaget klagede jeg til Erhvervsstyrelsen (nej!), forsøgte med aktindsigt hos Det Kongelige Bibliotek, som jeg fandt ud af var i besiddelse af listen (nej!), og skrev også til et par legitime domæneregistrationssælgere, som får listen af DK Hostmaster, om de var indstillet på at dele (nej!).
Jeg havde været ihærdig, men spildt så meget af min egen og andres tid, at jeg besluttede mig for at ændre kurs.
På datahøst
Det gik op for mig, at jeg ikke behøvede at kende alle .dk-domæner til mit hobbyprojekt. Nogle hundrede tusinde eller en million ville sikkert være rigeligt (der er registreret ca. 1,4 millioner i skrivende stund).
- Jeg fandt en lang liste over danske ord og lavede en lille robot til at slå ordene op som domæner hos DK Hostmaster.
- Jeg fandt en lignende liste med de mest brugte engelske ord og slog ordene op.
- Jeg søgte på ord fra ordlisterne hos en kendt søgemaskine og fik søgeresultater tilbage med danske domænenavne.
- Jeg fik API-adgang til CVR-registeret (som sjovt nok er rigtigt offentligt) og hentede domænenavne for alle danske virksomheder.
Disse metoder gav mig de første ca. 350.000 .dk-domæner og gav mig lejlighed til at skrive en masse små Python-scripts til at automatisere det meste.
Guldminen
Så fandt jeg guldminen. Apple, Google, Facebook, Cloudflare og andre hæderkronede virksomheder har et lidt andet syn på sikkerhed end DK Hostmaster, der jo gerne vil hemmeligholde danske domænenavne for at forhindre spam.
For at bekæmpe snyd med certifikater til sikker kommunikation på nettet (hængelåsen i browseren, du ved), logger de udstedelsen af certikater og har indtil videre logget lige under 8,5 milliard certifikater. I stedet for hemmeligholdelse: transparens.
Aha! Så når en ejer af et .dk-domæne får udstedt et certifikat til sin fine hjemmeside, bliver udstedelsen logget.
Jeg satte et smart program, der hedder Axeman, til at begynde at downloade logs, filtrerede for .dk-domænenavne og begyndte at tilføje dem til min database.
Det går langsomt, men det giver resultater.
På https://wallnot.dk/dotdk/ har jeg nu samlet 639.485 .dk-domænenavne til fri download og videredistribution. Og der er mange, mange flere på vej.
Tag den!
…Men hvad med projektet?
Jeg har faktisk fået kategoriseret nogle domænenavne efter hvordan jeg vurderer, de bliver brugt. Og taget en masse skærmbilleder, som jeg håber jeg kan bruge noget maskinlæring på. Men listen over .dk-domæner, jeg mener er registreret med henblik på videresalg, har lange udsigter.
Til gengæld håber jeg at listen over .dk-domæner i sig selv kan bruges af andre til et eller andet. Vi får se.
Udvikling i portefølje og på konti hos Nordnet
Opdatering 18/02/2023: Nordnet ændrer tit på deres ting. På https://github.com/helmstedt/nordnet-utilities forsøger jeg at følge med, så hent gerne din kode der, hvis koden neden for ikke virker længere.
I Nordnets porteføljerapport og kontooversigter, kan man se nogle flotte grafer over udviklingen i ens portefølje og/eller konti.

Jeg blev spurgt, om jeg ikke ville hjælpe med, hvordan man kan hive den slags ud af Nordnet til eget brug. Det er lidt nemmere til et hurtigt overblik, end hvis man skal hive alle sine transaktioner og kurser ud i Excel og tilrettelægge data der.
Her er nogle eksempler på mulighederne. Du kan også finde eksemplerne på GitHub:
# This program provides two examples of logging into a Nordnet account
# and extracting account performance as json data. One is based on standard
# intervals. The other is based on a user-defined interval.
# Storing and processing of returned data is left to you.
import requests
from nordnet_configuration import accounts
from nordnet_login import nordnet_login
session = requests.Session()
session = nordnet_login(session)
accounts_list = [value for value in accounts.values()]
### Nordnet standard intervals (one month, three months, six months, ytd, 1 year, 3 years and 5 years)
accounts_string = ','.join(accounts_list)
url = 'https://www.nordnet.dk/api/2/accounts/' + accounts_string + '/returns/performance'
period_options = ['m1','m3','m6','ty','y1','y3','y5']
standard_graph_data = {}
for period in period_options:
params = {
'period': period,
'start_at_zero': False
}
graph = session.get(url, params=params)
standard_graph_data[period] = graph.json()
# Store and process graph_data as needed
### User defined date intervals
start_date = '2019-01-30' # Edit as needed
end_date = '2019-05-14' # Edit as needed
user_defined_graph_data = {}
for account in accounts_list:
url = 'https://www.nordnet.dk/api/2/accounts/' + account + '/returns/performance'
params = {
'from': start_date,
'to': end_date
}
user_defined_graph = session.get(url, params=params)
user_defined_graph_data[account] = user_defined_graph.json()
# Store and process user_defined_graph_data as needed
Opdateret program til at logge på Nordnet med Powershell
Opdatering 18/02/2023: Nordnet ændrer tit på deres ting. På https://github.com/helmstedt/nordnet-utilities forsøger jeg at følge med, så hent gerne din kode der, hvis koden neden for ikke virker længere.
Jeg fik at vide, at mit eksempelprogram til at logge på Nordnet med Powershell ikke længere virkede. Nu har jeg lavet en ny udgave, der virker. Her er den:
[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12
$url = 'https://www.nordnet.dk/logind'
$r1 = iwr $url -SessionVariable cookies
$body = @{'username'=''; 'password'=''}
$headers = @{'Accept' = '*/*'; 'client-id' = 'NEXT'; 'sub-client-id' = 'NEXT'}
$url = 'https://www.nordnet.dk/api/2/authentication/basic/login'
$r2 = iwr $url -method 'POST' -Body $body -Headers $headers -WebSession $cookies
$url = 'https://www.nordnet.dk/mediaapi/transaction/csv/filtered?locale=da-DK&account_id=1&from=2019-08-01&to=2019-10-01'
$r3 = iwr $url -WebSession $cookies
$content = $r3.Content
$encoding = [System.Text.Encoding]::unicode
$bytes = $encoding.GetBytes($content)
$decoded_content = [System.Text.Encoding]::utf32.GetString($bytes)
$decoded_content = $decoded_content.Substring(1,$decoded_content.length-1)
Write-Host $decoded_content
Nordnet-redskaber på Github
Du kan nu finde opdaterede udgaver af mine forskellige værktøjer til at hente data på Nordnet på https://github.com/helmstedt/nordnet-utilities. God fornøjelse.
Digital Post fra mit.dk til din e-mail
På https://github.com/helmstedt/digitalpost-utilities er jeg gået i luften med et program, der gør det muligt for dig at slippe for at logge ind på mit.dk hver gang du har fået ny Digital Post.
Jeg brugte https://github.com/dk/Net-MitDK til at forstå metodikken og Fiddler til at overvåge trafikken til og fra https://mit.dk og aflure sidens API.
De to hovedkomponenter i programmet er a) et program til at gennemføre første login på mit.dk i en browser med NemID/MitID og b) et program til at forny adgangstokens til siden, forespørge API’et om ny post og sende e-mails af sted.
Program til at gennemføre første login på mit.dk i en browser med NemID/MitId
# Logs in to mit.dk og saves tokens needed for further requests.
# Method from https://github.com/dk/Net-MitDK/. Thank you.
from seleniumwire import webdriver
import requests
from bs4 import BeautifulSoup
import http.cookies
import gzip
import json
import base64
from hashlib import sha256
import string
import secrets
from mit_dk_configuration import tokens_filename
def random_string(size):
letters = string.ascii_lowercase+string.ascii_uppercase+string.digits+string.punctuation+string.whitespace
random_string = ''.join(secrets.choice(letters) for i in range(size))
encoded_string = random_string.encode(encoding="ascii")
url_safe_string = base64.urlsafe_b64encode(encoded_string).decode()
url_safe_string_no_padding = url_safe_string.replace('=','')
return url_safe_string_no_padding
def save_tokens(response):
with open(tokens_filename, "wt", encoding="utf8") as token_file:
token_file.write(response)
state = random_string(23)
nonce = random_string(93)
code_verifier = random_string(93)
code_challenge = base64.urlsafe_b64encode(sha256(code_verifier.encode('ascii')).digest()).decode().replace('=','')
login_url = 'https://gateway.mit.dk/view/client/authorization/login?client_id=view-client-id-mobile-prod-1-id&response_type=code&scope=openid&state=' + state + '&code_challenge=' + code_challenge + '&code_challenge_method=S256&response_mode=query&nonce=' + nonce + '&redirect_uri=com.netcompany.mitdk://nem-callback&deviceName=digitalpost-utilities&deviceId=pc&lang=en_US'
options = webdriver.ChromeOptions()
options.add_argument("--log-level=3")
driver = webdriver.Chrome(chrome_options=options)
login = driver.get(login_url)
print("Opening browser window. Log in to mit.dk using MitID or NemID in the browser.")
print("When you see a blank page in your browser at https://nemlog-in.mitid.dk/LoginOption.aspx, you're finished.")
input("Press ENTER once you're finished.")
session = requests.Session()
for request in driver.requests:
session.cookies.set('cookiecheck', 'Test', domain='nemlog-in.mitid.dk')
session.cookies.set('loginMethod', 'noeglekort', domain='nemlog-in.mitid.dk')
for request in driver.requests:
if '/api/mailboxes' in request.url and request.method == 'GET' and request.response.status_code == 200:
cookies = request.headers['Cookie'].split("; ")
for cookie in cookies:
if 'LoggedInBorgerDk' in cookie or 'CorrelationId' in cookie:
key_value = cookie.split('=')
session.cookies.set(key_value[0], key_value[1], domain='.post.borger.dk')
if request.response:
headers_string = str(request.response.headers)
headers_list = headers_string.split('\n')
for header in headers_list:
if 'set-cookie' in header:
cookie_string = header.replace('set-cookie: ','')
cookie = http.cookies.BaseCookie(cookie_string)
for key in cookie.keys():
# Requests is picky about dashes in cookie expiration dates. Fix.
if 'expires' in cookie[key]:
expiry = cookie[key]['expires']
if expiry:
expiry_list = list(expiry)
expiry_list[7] = '-'
expiry_list[11] = '-'
cookie[key]['expires'] = ''.join(expiry_list)
session.cookies.update(cookie)
if request.method == 'POST' and request.url == 'https://nemlog-in.mitid.dk/LoginOption.aspx' and request.response.status_code == 200:
if request.response.headers['content-encoding'] == 'gzip':
response = gzip.decompress(request.response.body).decode()
else:
response = request.response.body.decode()
soup = BeautifulSoup(response, "html.parser")
input = soup.find_all('input', {"name":"SAMLResponse"})
samlresponse = input[0]["value"]
driver.close()
request_code_part_one = session.post('https://gateway.digitalpost.dk/auth/s9/nemlogin/ssoack', data={'SAMLResponse': samlresponse}, allow_redirects=False)
request_code_part_one_redirect_location = request_code_part_one.headers['Location']
request_code_part_two = session.get(request_code_part_one_redirect_location, allow_redirects=False)
request_code_part_two_redirect_location = request_code_part_two.headers['Location']
request_code_part_three = session.get(request_code_part_two_redirect_location, allow_redirects=False)
request_code_part_three_redirect_location = request_code_part_three.headers['Location']
code_start = request_code_part_three_redirect_location.index('code=') + 5
code_end = request_code_part_three_redirect_location.index('&', code_start)
code = request_code_part_three_redirect_location[code_start:code_end]
redirect_url = 'com.netcompany.mitdk://nem-callback'
token_url = 'https://gateway.mit.dk/view/client/authorization/token?grant_type=authorization_code&redirect_uri=' + redirect_url + '&client_id=view-client-id-mobile-prod-1-id&code=' + code + '&code_verifier=' + code_verifier
request_tokens = session.post(token_url)
save_tokens(request_tokens.text)
print('Login to mit.dk went fine.')
print(f'Tokens saved to {tokens_filename}.')
Program til at forny adgangstokens til siden, forespørge API’et om ny post og sende e-mails af sted
# Sends unread messages from mit.dk to an e-mail.
import requests
import json
import smtplib # Sending e-mails
from email.mime.multipart import MIMEMultipart # Creating multipart e-mails
from email.mime.text import MIMEText # Attaching text to e-mails
from email.mime.application import MIMEApplication # Attaching files to e-mails
from email.utils import formataddr # Used for correct encoding of senders with special characters in name (e.g. Københavns Kommune)
from mit_dk_configuration import email_data, tokens_filename
base_url = 'https://gateway.mit.dk/view/client/'
session = requests.Session()
def open_tokens():
try:
with open(tokens_filename, "r", encoding="utf8") as token_file:
tokens = json.load(token_file)
return tokens
except:
return print('Unable to open and parse token file. Did you run mit_dk_first_login.py?')
def revoke_old_tokens(mitdkToken, ngdpToken, dppRefreshToken, ngdpRefreshToken):
endpoint = 'authorization/revoke?client_id=view-client-id-mobile-prod-1-id'
json_data = {
'dpp': {
'token': mitdkToken,
'token_type_hint': 'access_token'
},
'ngdp': {
'token': ngdpToken,
'token_type_hint': 'access_token'
},
}
revoke_access_tokens = session.post(base_url + endpoint, json=json_data)
if not revoke_access_tokens.status_code == 200:
print("Something went wrong when trying to revoke old access tokens. Here is the response:")
print(revoke_access_tokens.text)
json_data = {
'dpp': {
'refresh_token': dppRefreshToken,
'token_type_hint': 'refresh_token'
},
'ngdp': {
'refresh_token': ngdpRefreshToken,
'token_type_hint': 'refresh_token'
},
}
revoke_refresh_tokens = session.post(base_url + endpoint, json=json_data)
if not revoke_refresh_tokens.status_code == 200:
print("Something went wrong when trying to revoke old refresh tokens. Here is the response:")
print(revoke_refresh_tokens.text)
def refresh_and_save_tokens(dppRefreshToken, ngdpRefreshToken):
endpoint = 'authorization/refresh?client_id=view-client-id-mobile-prod-1-id'
json_data = {
'dppRefreshToken': dppRefreshToken,
'ngdpRefreshToken': ngdpRefreshToken,
}
refresh = session.post(base_url + endpoint, json=json_data)
if not refresh.status_code == 200:
print("Something went wrong trying to fetch new tokens.")
refresh_json = refresh.json()
if 'code' in refresh_json:
print("Something went wrong trying to fetch new tokens. Here's the response:")
print(refresh_json)
return False
else:
with open(tokens_filename, "wt", encoding="utf8") as token_file:
token_file.write(refresh.text)
return refresh_json
def get_fresh_tokens_and_revoke_old_tokens():
tokens = open_tokens()
try:
if 'dpp' in tokens:
dppRefreshToken = tokens['dpp']['refresh_token']
mitdkToken = tokens['dpp']['access_token']
else:
dppRefreshToken = tokens['refresh_token']
mitdkToken = tokens['access_token']
ngdpRefreshToken = tokens['ngdp']['refresh_token']
ngdpToken = tokens['ngdp']['access_token']
fresh_tokens = refresh_and_save_tokens(dppRefreshToken, ngdpRefreshToken)
if fresh_tokens:
revoke_old_tokens(mitdkToken, ngdpToken, dppRefreshToken, ngdpRefreshToken)
return fresh_tokens
except Exception as error:
print(error)
print('Unable to find tokens in token file. Try running mit_dk_first_login.py again.')
def get_simple_endpoint(endpoint):
response = session.get(base_url + endpoint)
return response.json()
def get_inbox_folders_and_build_query(mailbox_ids):
endpoint = 'folders/query'
json_data = {
'mailboxes': {}
}
for mailbox in mailbox_ids:
json_data['mailboxes'][mailbox['dataSource']] = mailbox['mailboxId']
response = session.post(base_url + endpoint, json=json_data)
try:
response_json = response.json()
except:
print('Unable to convert response to json. Here is the response:')
print(response.text)
folders = []
for folder in response_json['folders']['INBOX']:
folder_info = {
'dataSource': folder['dataSource'],
'foldersId': [folder['id']],
'mailboxId': folder['mailboxId'],
'startIndex': 0
}
folders.append(folder_info)
return folders
def get_messages(folders):
endpoint = 'messages/query'
json_data = {
'any': [],
'folders': folders,
'size': 20,
'sortFields': ['receivedDateTime:DESC']
}
response = session.post(base_url + endpoint, json=json_data)
return response.json()
def get_content(message):
content = []
endpoint = message['dataSource'] + '/mailboxes/' + message['mailboxId'] + '/messages/' + message['id']
for document in message['documents']:
doc_url = '/documents/' + document['id']
for file in document['files']:
encoding_format = file['encodingFormat']
file_name = file['filename']
file_url = '/files/' + file['id'] + '/content'
file_content = session.get(base_url + endpoint + doc_url + file_url)
content.append({
'file_name': file_name,
'encoding_format': encoding_format,
'file_content': file_content
})
return content
def mark_as_read(message):
endpoint = message['dataSource'] + '/mailboxes/' + message['mailboxId'] + '/messages/' + message['id']
session.headers['If-Match'] = str(message['version'])
json_data = {
'read': True
}
mark_as_read = session.patch(base_url + endpoint, json=json_data)
mailserver_connect = False
tokens = get_fresh_tokens_and_revoke_old_tokens()
if tokens:
session.headers['mitdkToken'] = tokens['dpp']['access_token']
session.headers['ngdpToken'] = tokens['ngdp']['access_token']
session.headers['platform'] = 'web'
mailboxes = get_simple_endpoint('mailboxes')
mailbox_ids = []
for mailboxes in mailboxes['groupedMailboxes']:
for mailbox in mailboxes['mailboxes']:
mailbox_info = {
'dataSource': mailbox['dataSource'],
'mailboxId': mailbox['id']
}
mailbox_ids.append(mailbox_info)
folders = get_inbox_folders_and_build_query(mailbox_ids)
messages = get_messages(folders)
for message in messages['results']:
if message['read'] == False:
if mailserver_connect == False:
server = smtplib.SMTP(email_data['emailserver'], email_data['emailserverport'])
server.ehlo()
server.starttls()
server.login(email_data['emailusername'], email_data['emailpassword'])
mailserver_connect = True
label = message['label']
sender = message['sender']['label']
message_content = get_content(message)
msg = MIMEMultipart('alternative')
msg['From'] = formataddr((sender, email_data['emailfrom']))
msg['To'] = email_data['emailto']
msg['Subject'] = "mit.dk: " + label
for content in message_content:
if content['encoding_format'] == 'text/plain':
body = content['file_content'].text
msg.attach(MIMEText(body, 'plain'))
part = MIMEApplication(content['file_content'].content)
part.add_header('Content-Disposition', 'attachment', filename=content['file_name'])
msg.attach(part)
elif content['encoding_format'] == 'text/html':
body = content['file_content'].text
msg.attach(MIMEText(body, 'html'))
part = MIMEApplication(content['file_content'].content)
part.add_header('Content-Disposition', 'attachment', filename=content['file_name'])
msg.attach(part)
elif content['encoding_format'] == 'application/pdf':
part = MIMEApplication(content['file_content'].content)
part.add_header('Content-Disposition', 'attachment', filename=content['file_name'])
msg.attach(part)
else:
encoding_format = content['encoding_format']
print(f'Ny filtype {encoding_format}')
part = MIMEApplication(content['file_content'].content)
part.add_header('Content-Disposition', 'attachment', filename=content['file_name'])
msg.attach(part)
print(f'Sender en mail fra mit.dk fra {sender} med emnet {label}')
server.sendmail(email_data['emailfrom'], email_data['emailto'], msg.as_string())
mark_as_read(message)
if mailserver_connect:
server.quit()
Eldata fra eloverblik.dk med Python
Opdateret den 10. maj 2023: Jeg har skrevet en ny, forbedret udgave af programmet. Find det her.
Siden jeg fandt ud af at hente elforbrugsdata fra Ørsted har jeg fået nyt elselskab og portalen eloverblik.dk, som omfatter alle elkunder i Danmark, har fået et API. Du kan finde dokumentation til API’et her: https://api.eloverblik.dk/CustomerApi/swagger/index.html.
For at bruge API’et skal du oprette et token på eloverblik.dk. Det har Niras lavet en fin guide til.
Jeg er først lige kommet i gang, men her er et program, der beder om et token til at hente data fra API’et og derefter henter data om en måler, forbrugsdata og prisdata. Indsæt dit token fra eloverblik.dk i token-variablen øverst for at bruge programmet:
# https://api.eloverblik.dk/CustomerApi/swagger/index.html
import requests
token = ''
# Get data access token for subsequent requests
get_data_access_token_url = 'https://api.eloverblik.dk/CustomerApi/api/token'
headers = {
'accept': 'application/json',
'Authorization': 'Bearer ' + token,
}
response = requests.get(get_data_access_token_url, headers=headers)
data_access_token = response.json()['result']
# Get id of first meter - edit if you have more than one meter
metering_points_url = 'https://api.eloverblik.dk/CustomerApi/api/meteringpoints/meteringpoints'
headers = {
'accept': 'application/json',
'Authorization': 'Bearer ' + data_access_token,
}
meters = requests.get(metering_points_url, headers=headers)
first_meter = meters.json()['result'][0]['meteringPointId']
#Try to get data
meter_data = 'https://api.eloverblik.dk/CustomerApi/api/meterdata/gettimeseries/'
timeseries_data = {
'dateFrom': '2021-01-01',
'dateTo': '2021-01-31',
'aggregation': 'Actual'
}
meter_data_url = meter_data + timeseries_data['dateFrom'] + '/' + timeseries_data['dateTo'] + '/' + timeseries_data['aggregation']
meter_json = {
"meteringPoints": {
"meteringPoint": [
first_meter
]
}
}
meter_data_request = requests.post(meter_data_url, headers=headers, json=meter_json)
#Charges
charges_data = 'https://api.eloverblik.dk/CustomerApi/api/meteringpoints/meteringpoint/getcharges'
charges_data_request = requests.post(charges_data, headers=headers, json=meter_json)
breakpoint()
Besked når der åbnes for reservationer på [hypet restaurant i København]
En restaurant, jeg gerne vil prøve, er fuldstændig booket op og har endnu ikke åbnet op for reservationer i april. Hvordan kan jeg komme først til fadet?
Jeg besøgte reservationssystemet og iagttog hvordan det interne API spurgte om ledige borde.
Dette billede viser forespørgslen:

Dette billede viser svaret fra API’et:

For april så svaret sådan her ud:

API’et svarer altså med en tom ‘data’-nøgle, når der ikke er åbnet for reservationer endnu.
Jeg skrev et lille program, som jeg har sat til at køre hvert 5. minut, for at tjekke om jeg kan komme til at reservere. Programmet tjekker, om der er kommet noget indhold i ‘data’-nøglen i svaret fra API’et. Hvis der er, sender det mig en besked om, at jeg godt kan komme i gang med at reservere bord.

Næste skridt kunne være at udvide programmet, sådan det også reserverer bordet for mig. Men i første omgang prøver jeg at gøre den del af arbejdet selv.
Hvordan udvikler antal underskrifter sig på Borgerforslag.dk?
På Twitter skrev Peter Brodersen:

Peters idé er sjov, synes jeg, så jeg er så småt begyndt at bygge et eller andet, der monitorerer hvordan antallet af underskrifter på borgerforslag udvikler sig over tid.
Så nu tygger min webserver sig igennem nedenstående script hvert 10. minut og gemmer det aktuelle antal underskrifter på hvert borgerforslag. Når der er gået nogle uger, vil jeg se om jeg kan lave nogle interessante visualiseringer af data.
import requests
from datetime import datetime
import locale
import psycopg2
from psycopg2 import Error
### PREPARATION ###
# Locale is set to Danish to be able to parse dates from Borgerforslag
locale.setlocale(locale.LC_TIME, ('da_DK', 'UTF-8'))
# API url and request parameters
url = 'https://www.borgerforslag.dk/api/proposals/search'
suggestions_per_request = 300
params_json = {
"filter": "active",
"sortOrder": "NewestFirst",
"searchQuery":"",
"pageNumber":0,
"pageSize": suggestions_per_request
}
# Connect to database
try:
connection = psycopg2.connect(user = "",
password = "",
host = "",
port = "",
database = "")
cursor = connection.cursor()
except (Exception, psycopg2.Error) as error:
print ("Error while connecting to PostgreSQL", error)
now = datetime.utcnow()
# Insert into database function
def insert_suggestion_and_votes(connection, suggestion):
with connection:
with connection.cursor() as cur:
try:
# See if suggestion already exists
sql = '''SELECT * FROM borgerforslag_suggestion WHERE unique_id = %s'''
cur.execute(sql, (suggestion['externalId'],))
suggestion_records = cur.fetchone()
# If not, add suggestion
if not suggestion_records:
suggestion_data = (suggestion['externalId'],suggestion['title'],suggestion['date'],suggestion['url'],suggestion['status'])
sql = '''INSERT INTO borgerforslag_suggestion(unique_id,title,suggested_date,url,status) VALUES(%s,%s,%s,%s,%s) RETURNING id'''
cur.execute(sql, suggestion_data)
id = cur.fetchone()[0]
# If yes, get id
else:
id = suggestion_records[0]
# Add votes
sql = '''INSERT INTO borgerforslag_vote(suggestion_id,timestamp,votes)
VALUES(%s,%s,%s)'''
cur.execute(sql, (id,now,suggestion['votes']))
except Error as e:
print(e, suggestion)
# Loop preparation
requested_results = 0
number_of_results = requested_results + 1
number_of_loops = 0
# Loop to get suggestions and add them to database
while requested_results < number_of_results and number_of_loops < 10:
response = requests.post(url, json=params_json)
json_response = response.json()
number_of_results = json_response['resultCount']
requested_results += suggestions_per_request
number_of_loops += 1
params_json['pageNumber'] += 1
for suggestion in json_response['data']:
suggestion['date'] = datetime.strptime(suggestion['date'], '%d. %B %Y') # convert date to datetime
insert_suggestion_and_votes(connection, suggestion)