Blokering af onde robotter med fail2ban

På min lille virtuelle private server kører bl.a. en række WordPress-sider (som fx denne blog). En ret stor del af indholdet på nettet leveres af WordPress, og derfor er det måske ikke så mærkeligt, at der også er en del aktivitet på nettet, der handler om at udnytte huller i og gætte brugernavne og kodeord til WordPress-sider.

I min log kunne jeg se en masse forespørgsler til WordPress’ loginside, fx:

xxx.xxx.xxx.xxx - - [25/Feb/2024:08:41:54 +0100] "POST https://[helmstedt].dk/wp-login.php HTTP/1.1" 200 240 "-" "Mozilla/5.0 (X11; Fedora; Linux x86_64; rv:94.0) Gecko/20100101 Firefox/95.0"

Den slags forespørgsler er ret uskadelige, hvis man bruger et kodeord, der er svært at gætte, men jeg bryder mig ikke om tanken om, at en masse robotter forsøger at overtage mine sider.

Derfor installerede jeg fail2ban på min server. Fail2ban kan monitorere en logfil og blokere for IP-adresser, der gør noget, man ikke vil have. Det fungerer ved, at man opsætter en regel for, hvad fail2ban skal kigge efter, og når programmet så støder på et mønster i log-filen, der opfylder reglen, sættes IP-adresser “i fængsel” (blokeres i en periode).

Jeg startede med at flytte adressen på mine login-formularer fra /wp-login.php til en anden adresse (jeg brugte en plugin til WordPress til formålet), sådan at fremtidige besøg til denne URL ville give en 404-fejl.

Så definerede jeg mit “fængsel”. Ét enkelt besøg, der opfylder filterreglen “wordpress” betyder 96 timers blokering i min servers firewall. Jeg sørgede for at undtage min egen IP-adresse fra reglen, så jeg ikke ved en fejl kommer til at lukke mig selv ude:

[DEFAULT]
bantime = 1h
ignoreip = 127.0.0.1/8 ::1 xxx.xxx.xxx.xxx [min hjemme-ip]

[sshd]
enabled = false

[wordpress]
iptables-multiport[name=wordpress, port="http,https", protocol=tcp]
enabled = true
filter = wordpress
port = http,https
logpath = /var/log/nginx/access.log
maxretry = 1
bantime = 96h

Så skrev jeg (med hjælp fra en søgning på internettet) filter-reglen. En GET- eller POST-forespørgsel til på “wp-login.php” med HTTP-kode 404 eller 403 eller en forespørgsel til “xmlrpc.php”, sætter den IP-adresse (“<HOST>”), der har forespurgt i fængsel:

[Definition]
failregex = ^<HOST> .*"(GET|POST).*\/wp-login.php.*(404|403).*$
			^<HOST> .*"(GET|POST).*\/xmlrpc.php.*$

fail2ban har nu kørt i et par døgn, og der er allerede 1418 forskellige IP-adresser, der er havnet i mit nye fængsel.

Det må være en ret omfattende industri at bryde ind i folks WordPress-installationer.

Dengang jeg ikke blev stævnet af Ønskeskyen

Prolog

Alt, hvad jeg skriver her, skriver jeg som privatperson.

Kapitel 1: Opkaldet

For lidt over et år siden, hen mod slutningen af januar 2023, får min chef i Datatilsynet, hvor jeg arbejder som dataspecialist, et overraskende opkald. I telefonen er en advokat fra Mazanti-Andersen med titel af partner, som siger, han repræsenterer Ønskeskyen. Han vil gerne tale med min chef. Om mig.

Nogle dage før min chef tager telefonen, offentliggjorde jeg indlægget 2-0 til wishlist.dk i kampen mod Ønskeskyen på min blog. Indlægget er humoristisk. Når jeg omtaler ukommercielle wishlist.dk som en konkurrent til ønskeskyen.dk, er det faktisk ikke noget, jeg mener alvorligt. Jeg håber og tror, at den gennemsnitlige læser vil forstå indlægget på samme måde, som det var tænkt, nemlig som en satire over iværksætterlingo krydret med en fortælling om en teknisk udfordring, jeg havde fundet en løsning på.

Iværksætterbullshit og et totalt intetsigende diagram, som jeg ikke forestillede mig, at nogen ville tage alvorligt, (hvilket der også indtil videre kun har været én, der gjorde.)

Min ambition med wishlist.dk, var (og er) at udvikle en brugbar ønskeseddelservice, ligesom mange andre før mig. Jeg var stolt af resultatet, så jeg stillede siden til rådighed for den forsvindende lille del af verdens befolkning, der både kan dansk og kan finde min side blandt de mange lignende sider. Siden har ingen forretningsmodel, ingen vækststrategi, ingen ansatte, ingen værdisætning, nul exitplan, nul indtjening. Kun beskedne udgifter: et domænenavn og en smule server. Et con amore-projekt.

Ønskeskyen har 46 ansatte (november 2023), og er et kommercielt foretagende, som er ejet af ambitiøst- og valutaklingende Dotcom Capital ApS, som igen har ejere i fine byer som fx Marbella og Køge. Det er den type virksomhed, mine børn ser reklamer for i sekunderne før dagens afsnit af Valdes Jul på TV2. Ønskeskyen ønsker sig at blive DEN STØRSTE ØNSKESEDDELSERVICE I VERDEN. Den præcise forretningsmodel kender jeg ikke, men jeg tror, at det er noget med, at Ønskeskyen får en del af kagen, når din mor køber den Airfryer, du ønsker dig, fra en virksomhed, Ønskeskyen samarbejder med. Der er åbenbart mange, der ønsker sig en Airfryer.

Ønskeskyens advokat har (med stor sikkerhed) fundet ud af, at jeg arbejder i Datatilsynet ved at læse mit CV, for jeg har ikke skrevet om det andre steder. Jeg vil gerne have, at det, jeg ytrer som privatperson på min blog, Twitter, Aula, etc., ikke misforstås som min arbejdsplads’ holdninger, og derfor er mit arbejde ikke noget, jeg går og skilter med. Hvad jeg skriver på min blog som privatperson, har intet med min arbejdsplads at gøre.

Som min chef opfatter samtalen, er advokaten på Ønskeskyens vegne blevet betænkelig ved, at en medarbejder i Datatilsynet, som er tilsynsmyndighed for Ønskeskyens behandling af persondata, udtaler de ting om Ønskeskyen, som Ønskeskyen opfatter, at jeg skriver i mit blogindlæg.

Min chef betrygger advokaten ved at oplyse, at jeg ikke sidder med sagsbehandling. Det er ikke nogen nyhed, hvis man allerede har brugt tid på at finde og læse mit CV.

Derudover forstår min chef på advokaten, at Ønskeskyen har tænkt sig at stævne mig for noget i retning af “overtrædelse af markedsføringsloven” og “injurier” på baggrund af blogindlægget. Det er oplysninger, som slet ikke kommer min chef ved.

Kapitel 2: Chefens kontor

Da jeg bliver kaldt ind på min chefs kontor og hører om opkaldet, bliver jeg – for at sige det mildt – overrasket.

Jeg havde nemlig ikke selv hørt noget fra Ønskeskyen om mit blogindlæg. Min chef fortæller mig, at en virksomhed, jeg har skrevet om, er så utilfredse med det, jeg har skrevet, at de bruger dyre advokattimer på at reagere ved at kontakte min arbejdsplads, og at de oven i købet har tænkt sig at stævne mig.

Kontakt via wishlist.dk. To klik.
Kontakt via helmstedt.dk. To klik.

Det havde været både nemt og billigt at kontakte mig direkte. I stedet valgte advokaten at bruge af Ønskeskyens og skatteborgernes penge på først at finde frem til min arbejdsplads og min chef, og dernæst at ringe til hende. Jeg er helt sikker på, at hun kunne have brugt sin og min arbejdstid på noget mere nyttigt.

Min arbejdsplads er et fornuftigt sted, men alligevel har jeg det dårligt med, at den er blevet involveret i noget, jeg går og pusler med som privat-Morten. Jeg er ikke sådan rigtigt bange for konsekvenser på arbejdet, men i januar 2023 er jeg forholdsvis nyansat og væsentligt mere optaget af at løse problemer end at være årsag til dem.

På arbejdet sker der ikke mere. Henvendelsen bliver, som DJØF’ere kalder det, “taget til efterretning”. Det betyder, at den arkiveres, og at der formentlig aldrig er nogen, der vender tilbage til den.

Kapitel 3: Hjemme igen

“Stævnet”, “markedsføringsloven” og “injurier” er ikke mine favoritord (jeg kan bedre lide ord som “negroni” og “gedefeta”). Som privat-Morten er jeg på en og samme tid både meget principfast og ret konfliktsky, så lad os bare sige, at mine følelser er blandede. Jeg går i gang med at prøve at omsætte følelser til handling:

Næste dag får jeg kontaktet min forsikring for at høre, om min retshjælp dækker den slags. Jeg ringer til nogle hjælpsomme og empatiske advokater hos Bird & Bird, som har ry for at have forstand på teknologi. De flinke advokater beroliger mig (helt uden at fakturere) og anbefaler:

  • At vise mig samarbejdsvillig
  • At slå koldt vand i blodet, indtil jeg rent faktisk modtager noget på skrift fra Mazanti-Andersen og Ønskeskyen, hvorefter de gerne (mod betaling, forstår jeg), vil hjælpe mig så godt de kan

Jeg forsøger at ringe til advokaten fra Mazanti-Andersen et par gange, men han tager ikke telefonen. Jeg sender en e-mail og forsøger at udvise samarbejdsvillighed:

Jeg forsøger at gøre det endnu nemmere for advokaten at få fat i mig.

Jeg snakker med venner, familie og mine kolleger (i frokostpausen) om sagen. De udviser forståelse for min situation. Vi griner af det absurde og bizarre ved det. Det hjælper.

Epilog

Jeg hører aldrig tilbage fra advokaten. Der kommer ikke nogen stævning i postkassen. Tiden går.

Det, som Ønskeskyens advokat opnåede ved at involvere min arbejdsplads i, at virksomheden var utilfredse med noget, jeg havde skrevet som privatperson, var at intimidere mig ved kynisk at bruge det middel, der gav mig allermindst lyst til igen at ytre mig om Ønskeskyen. Det virkede, for der er gået over et år, til jeg har taget mig sammen til at fortælle historien, selv om jeg var så fortørnet over fremgangsmåden, at jeg havde lyst til at gøre det med det samme. I året, der er gået, har jeg ytret mig meget lidt om wishlist.dk og slet ikke om Ønskeskyen.

P.S.

For nogle måneder siden var der en der skrev noget vredt i /r/Denmark på Reddit (et sarkastisk diskussions- og selvhjælpsforum for yngre, liberale mænd) om, at Ønskeskyen ændrede på gavelinks.

En anden anbefalede wishlist.dk, og en tredje skrev, at mit 2-0 til wishlist.dk i kampen mod techgiganten Ønskeskyen” indlæg er ret morsomt”.

Jeg blev, af hvad jeg tror er forskellige, rigtige mennesker, kaldt:

  • Absolut madlad
  • Kæmpe chef
  • Ok Cool fyr

Jeg fik at vide, at jeg:

  • lyder alt for nice! Gode projekter han har! 10/10 ville købe øl til ham i byen!
  • virker som en super fyr. Hans projekt med at skaffe danske domæner har skaffet mig god værdi.
(Muligvis) forskellige (muligvis) mennesker på Reddit, der synes jeg gør en positiv forskel for menneskeheden.

Jeg blev endda kontaktet af en journalist, der havde læst Reddit-tråden, og som overvejede at nævne mine internetprojekter i en podcast.

Det føltes som en anerkendelse af, at mine hjemmesider og kode, som jeg stiller gratis til rådighed, er nyttige for andre. Det var meget livsbekræftende for mig at blive omtalt på den måde.

Hver eneste gang, jeg bliver kontaktet af forskellige privatpersoner, der har gode idéer til mine websider, eller problemer/projekter, som de håber jeg kan hjælpe dem med, bliver jeg glad. Jeg ønsker mig et internet, hvor vi hjælper hinanden.

Tak for hjælpen.

Forbedret script til udtræk af transaktioner hos Saxo Bank

Mit testpanel (hej Jess!) havde problemer med at få mit nye bud på et script til at logge ind og udtrække transaktioner hos Saxo Bank til at virke.

Det forstår jeg godt.

Men nu har jeg lavet en forbedret udgave, som jeg synes er mere brugervenlig. Den er herunder og er også at finde på GitHub.

God fornøjelse!

# -*- coding: utf-8 -*-
# Author: Morten Helmstedt. E-mail: helmstedt@gmail.com
"""This script logs into a Saxo Bank account and performs a query to extract transactions.
For the script to work, make sure that you have less than six already approved devices for
Saxo Bank. Also, you should be receiving two factor codes by phone text message."""

import argparse
import base64
from datetime import datetime
from datetime import date
import json
import pickle
import requests
import secrets
import string
import sys

### INITIALIZE AND VALIDATE PARSER ARGUMENTS ###

IDENTITY_FILENAME = 'identity.pickle'
TODAY = date.today()
TODAY_STRING = datetime.strftime(TODAY, '%Y-%m-%d')

# Initialize requests session and set user agent
# I suspect that Saxo Bank requires a recent browser. If the program fails, try visiting a site
# in an updated browser with developer tools open and paste the user agent value from your browser below.
session = requests.Session()
session.headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; rv:122.0) Gecko/20100101 Firefox/122.0'

# Define parser arguments
parser = argparse.ArgumentParser(description='Fetch transaction data from Saxo Bank')
parser.add_argument('-f', '--firstrun', help='Run the script for the first time and login using two factor login', action='store_true')
parser.add_argument('-u', '--user', help='Saxo Bank user name', type=str, required=True)
parser.add_argument('-p', '--password', help='Saxo bank password', type=str, required=True)
parser.add_argument('-s', '--startdate', help='Get data from this date, format yyyy-mm-dd', type=str, required=True)
parser.add_argument('-e', '--enddate', help='Get data to and including this date, format yyyy-mm-dd, default is today', type=str, default=TODAY_STRING)
parser.add_argument('-d', '--devicename', help='Device name, default is SaxoPython', type=str, default='SaxoPython')
args = parser.parse_args()

# Set arguments as variables
user = args.user
password = args.password
device_name = args.devicename
startdate = args.startdate
enddate = args.enddate

# Date argument validation
try:
    start_date_date = datetime.strptime(startdate, '%Y-%m-%d').date()
    end_date_date = datetime.strptime(enddate, '%Y-%m-%d').date()
    if start_date_date > TODAY:
        sys.exit('Error: Your start date cannot be after today. Exiting.')
    elif end_date_date > TODAY:
        sys.exit('Error: Your end date cannot not be after today. Exiting.')
    elif start_date_date > end_date_date:
        sys.exit('Error: Your start date must not be after your end date. Exiting.')
except ValueError:
    sys.exit('Error: Start or end date in invalid format. Format must be yyyy-mm-dd with no quotes. Exiting.')

# If the first run parameter is set, create (minimal) unique identity string in order
# to later remember the device and avoid two factor authentification. The identity 
# string is pickled and saved.
if args.firstrun:
    print('You are running the script for the first time. Creating and saving identity file.')
    identifier = ''
    for i in range(3):
        identifier += ''.join(secrets.choice(string.digits) for i in range(10))
        if i < 2:
            identifier += '-'

    identity = '{\"identifier\":\"' + identifier + '\",\"metadata\":\"\"}'

    with open(IDENTITY_FILENAME, 'wb') as identity_file:
        pickle.dump(identity, identity_file)
        print(f'Identity file saved in script directory as {IDENTITY_FILENAME}.')
# If the first run parameter is not set, try to load and unpickle the identity file.
else:
    try:
        with open(IDENTITY_FILENAME, 'rb') as identity_file:
            identity = pickle.load(identity_file)
    except FileNotFoundError:        
        sys.exit('Error: The script was launched without the -f/--firstrun parameter, but no identity file was found. Use the -f parameter and try again.')


### LOGIN PROCEDURE ###
print('Starting login procedure...')

# Step one: Fetch login page and get correlation id from page source
step_one = session.get('https://www.saxoinvestor.dk/Login/da')
step_one_text = step_one.text
search_string = '"correlationId":"'
start_of_correlation_id = step_one_text.index(search_string)+len(search_string)
end_of_correlation_id = step_one_text.index('"', start_of_correlation_id)
correlation_id = step_one_text[start_of_correlation_id:end_of_correlation_id]

# Insert correlation id in state string and encode it as base 64 for later use
state_string = '{"appId":"investor","correlationId":"' + correlation_id + '"}'
state_string_b64_encoded = base64.b64encode(bytes(state_string, 'utf-8')).decode()

# Step two: Start authentication part one of X
login_url = 'https://www.saxoinvestor.dk/am/json/realms/root/realms/dca/authenticate?authIndexType=service&authIndexValue=authn-web-v6'

step_two = session.post(login_url)
step_two_json = step_two.json()

# Step three: Prepare data structure and perform authentication part two of a possible five
step_three_json_struct = step_two_json
step_two_json['callbacks'][0]['input'][0]['value'] = user
step_two_json['callbacks'][1]['input'][0]['value'] = "https://www.saxoinvestor.dk/investor"
step_two_json['callbacks'][2]['input'][0]['value'] = "SaxoInvestor"
step_two_json['callbacks'][3]['input'][0]['value'] = identity

step_three = session.post(login_url, json=step_two_json)
step_three_json = step_three.json()

# Step four: Prepare data structure and perform authentication part three of a possible five
step_three_json['callbacks'][1]['input'][0]['value'] = user
step_three_json['callbacks'][2]['input'][0]['value'] = password

step_four = session.post(login_url, json=step_three_json)
step_four_json = step_four.json()

# With an unknown device, the user is asked for a two factor login code
if args.firstrun:
    print('As this is your first run, you must enter a two factor code from your phone.')
    # Step five: Enter two factor code and perform authentication part four of a possible five
    two_factor_code = input('Enter two factor code (six digits): ')
    step_four_json['callbacks'][0]['input'][0]['value'] = two_factor_code

    step_five = session.post(login_url, json=step_four_json)
    step_five_json = step_five.json()

    # Step six: Save device to perform authentication part five of a possible five
    step_five_json['callbacks'][0]['input'][0]['value'] = device_name

    step_six = session.post(login_url, json=step_five_json)
    step_six_json = step_six.json()
    token_id = step_six_json['tokenId']
else:
    token_id = step_four_json['tokenId']

# Authenticate
auth_url = 'https://www.saxoinvestor.dk/am/oauth2/realms/root/realms/dca/authorize'

auth_data = f'csrf={token_id}&scope=openid%20profile%20openapi%20fr%3Aidm%3A*&response_type=code&client_id=SaxoInvestorPlatform&redirect_uri=https%3A%2F%2Fwww.saxoinvestor.dk%2Fapi%2Flogin%2Fcode&decision=allow&state={state_string_b64_encoded}'
session.headers['Content-Type'] = 'application/x-www-form-urlencoded'

authenticate = session.post(auth_url, data=auth_data)
authenticate_json = authenticate.json()

del session.headers['Content-Type']

# Open app website and extract API bearer token
beater_token_code = authenticate_json['code']
bearer_token_url = f'https://www.saxoinvestor.dk/showapp?code={beater_token_code}&state={state_string_b64_encoded}'

get_bearer_token = session.get(bearer_token_url)
get_bearer_token_text = get_bearer_token.text

search_string = ',"idToken":"'
start_of_bearer_token = get_bearer_token_text.index(search_string)+len(search_string)
end_of_bearer_token = get_bearer_token_text.index('"', start_of_bearer_token)
bearer_token = get_bearer_token_text[start_of_bearer_token:end_of_bearer_token]
bearer_token_string = 'Bearer ' + bearer_token

### PERFORM API CALLS ###
# Documentation at https://www.developer.saxo/openapi/learn
print('Login successful (I think). Getting transaction data.')

# Set bearer token as header
session.headers['Authorization'] = bearer_token_string

# API request to get Client Key which is used for most API calls
# See https://www.developer.saxo/openapi/learn/the-tutorial for expected return data
url = 'https://www.saxoinvestor.dk/openapi/port/v1/clients/me'
get_client = session.get(url)
clientdata = get_client.json()
clientkey = clientdata['ClientKey']

# Extract transactions
url = f'https://www.saxotrader.com/openapi/hist/v1/transactions?ClientKey={clientkey}&FromDate={startdate}&ToDate={enddate}'
saxo_transactions = session.get(url)
if saxo_transactions.status_code == 200:
    print('Looks like your transactions were extracted. Edit the script to process your data.')
    saxo_transactions_json = saxo_transactions.json()
else:
    print('Extracting your transactions failed for some reason. Sorry about that.')

Udtræk af transaktioner hos Saxo Bank efter indførsel af tofaktor-login

Redigeret den 3. februar 2024: Hent den nye, forbedrede udgave her!

Kapitalisterne hos Saxo Bank har indført tofaktor-login til deres selvbetjeningsløsning, hvilket er godt for sikkerheden, men desværre har ødelagt min tidligere løsning til udtræk af data fra deres API.

I dette indlæg fortæller jeg om, hvordan jeg undersøgte loginløsningen hos Saxo Bank og udviklede et nyt program, så jeg igen kan trække data ud. Hele det nye program finder du nederst.

Sådan virker den nye loginløsning

Saxo Banks nye login-løsning virker ved at bede om en kode, som sendes med SMS, første gang, man logger på med et nyt device/en ny browser. Når man først er logget ind én gang og har sagt ja til at huske ens enhed, bliver man ikke bedt om en tofaktor-kode igen.

Første del af login
Anden del af login

Hvad sker der bag kulisserne?

Når det kan lade sig gøre at huske, at jeg en gang har brugt tofaktor-kode i en browser, er cookies et godt bud på teknologien til at huske min browser.

For at undersøge, hvad der sker, trykker jeg F12 i Firefox, besøger login-siden og logger ind. I fanen Network får jeg en liste over alle de forespørgsler, min browser foretager, for at jeg kan logge ind. Ved at indkredse til to slags forespørgsler: HTML og XHR (XHR er forespørgsler, som scripts foretager), får jeg et nogenlunde overblik over proceduren fra login-siden frem til der foretages det første kald til Saxo Banks API.

Jeg har markeret de vigtige forespørgsler med rødt:

De forespørgsler, min browser foretager, når jeg logger ind på Saxo Bank.

For at kunne reproducere det, min browser gør, i et Python-script, undersøger jeg, hvad min browser sender af sted og modtager i hvert enkelt trin.

Trin 0: Klargøring af script

Inden jeg går i gang, importerer jeg nogle biblioteker, som jeg ved jeg skal bruge. Jeg indtaster også mine loginoplysninger og et par datoer, og til sidst gør jeg en session klar i biblioteket requests. Ved at bruge en session, bibeholder biblioteket de http-headers og cookies, som bliver sat undervejs i loginproceduren.

import requests 
from datetime import datetime
from datetime import date
import json

# Saxo user account credentials
user = 'USERNAME'
password = 'PASSWORD'

# Start date (start of period for transactions) and date today used for extraction of transactions
startdate = '2019-01-01'
today = date.today()
enddate = datetime.strftime(today, '%Y-%m-%d')

# Initialize requests session
session = requests.Session()

Trin 1: Besøg på loginsiden

Det første, jeg har brug for, er at kunne overbevise Saxo Banks hjemmeside om, at Python-biblioteket requests er den samme browser, som jeg netop er logget ind.

Jeg kan se, at den cookie min browser sender til login-siden indeholder nogle værdier, som ser tilpas tilfældige ud til, at de må udgøre en eller anden form for identifikation.

Jeg gør mit script klar til at imitere browser ved at reproducere en enkel header (den der siger noget om browseren) og alle de cookie-værdier, jeg kan se:

session.headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; rv:122.0) Gecko/20100101 Firefox/122.0'
session.cookies['LoginLanguage']='da'
session.cookies['UseNewLoginExperience']='True'
session.cookies['Theme_investor']='white'
session.cookies['stgo-V4_PROD_DMZ_SAXOTRADER_GO_LVE_81']='KOPIERET_FRA_BROWSER'
session.cookies['ASP.NET_SessionId']='KOPIERET_FRA_BROWSER'
session.cookies['stgo-SSO_LIVE_LOGIN_50201']='KOPIERET_FRA_BROWSER'
session.cookies['stgo-V4_ENT_DMZ_LVE_OA_CORE_8080']='KOPIERET_FRA_BROWSER'
session.cookies['lb']='KOPIERET_FRA_BROWSER'

For en sikkerheds skyld (jeg ved strengt taget ikke om det er nødvendigt endnu), besøger mit script login-siden:

step_one = session.get('https://www.saxoinvestor.dk/Login/da')

For at undersøge, om dette er nok til at imitere min browser, går jeg til næste trin i login-proceduren:

Trin 2: Authenticate (1/3)

Den næste interessante forespørgel går til:

https://www.saxoinvestor.dk/am/json/realms/root/realms/dca/authenticate?authIndexType=service&authIndexValue=authn-web-v6

I min browser kan jeg se, at forespørgslen er af typen POST, og at der ikke sendes noget data med forespørgslen, andet end nogle headers (som cookies er en del af).

Ved at kigge på response-fanen i browseren, kan jeg se hvad jeg gerne skulle få tilbage, hvis forespørgslen går godt:

En succesfuld forespørgsel returnerer en json-struktur med et langt “authId”, noget om hvor jeg er nået til i processen (“stage”) og en liste af “callbacks”, der kunne minde om noget, der beskriver felterne i en login-formular.

Jeg forsøger mig:

login_url = 'https://www.saxoinvestor.dk/am/json/realms/root/realms/dca/authenticate?authIndexType=service&authIndexValue=authn-web-v6'
step_two = session.post(login_url)
step_two_json = step_two.json()

Ved at inspicere “step_two” og “step_two_json”, kan jeg se, at forespørgslen er gået godt: Jeg får samme datastruktur tilbage, som jeg lige har set på i min browser.

Videre til næste trin!

Trin 3: Authenticate (2/3)

I browseren går jeg videre til næste forespørgsel til:

https://www.saxoinvestor.dk/am/json/realms/root/realms/dca/authenticate?authIndexType=service&authIndexValue=authn-web-v6

Ved at undersøge forespørgslen fra min browser under “Request”, kan jeg se at der sendes en JSON-datastruktur af sted, der minder meget om den, jeg lige har modtaget i trinnet før. Der er dog kommet lidt ekstra på. I listen med “callbacks” finder jeg mit brugernavn og nogle ret detaljerede oplysninger om mit system (fx styresystem, browser, et id-nummer og den slags).

Forespørgsel der svarer til svaret på forespørgslen i trin 2, bare med lidt ekstra oplysninger om blandt andet mit brugernavn.

I mit script reproducerer jeg datastrukturen ved hjælp af kopier/sæt ind fra browseren. Det bliver lidt kluntet, for jeg har svært ved at få værdien i det fjerde callback (som er en streng, men ligner JSON til forveksling) til at spille pænt med. Læg mærke til, at jeg indsætter variablen “user” i JSON-strukturen (step_three_json_struct) og at benytter det AuthId, som jeg fik tilbage på forespørgslen i trin 2:

identity = "{\"identifier\":\"BORTREDIGERET\",\"metadata\":{\"hardware\":{\"cpuClass\":null,\"deviceMemory\":null,\"hardwareConcurrency\":2,\"maxTouchPoints\":0,\"oscpu\":\"Windows NT 10.0; Win64; x64\",\"display\":{\"width\":2560,\"height\":771,\"pixelDepth\":24,\"angle\":0}},\"browser\":{\"userAgent\":\"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0\",\"appName\":\"Netscape\",\"appCodeName\":\"Mozilla\",\"appVersion\":\"5.0 (Windows)\",\"appMinorVersion\":null,\"buildID\":\"20181001000000\",\"product\":\"Gecko\",\"productSub\":\"20100101\",\"vendor\":\"\",\"vendorSub\":\"\",\"browserLanguage\":null,\"plugins\":\"internal-pdf-viewer;internal-pdf-viewer;internal-pdf-viewer;internal-pdf-viewer;internal-pdf-viewer;\"},\"platform\":{\"language\":\"en\",\"platform\":\"Win32\",\"userLanguage\":null,\"systemLanguage\":null,\"deviceName\":\"Windows (Browser)\",\"fonts\":\"BORTREDIGERET;\",\"timezone\":0}}}"

step_three_json_struct = json.loads('{"authId":"","callbacks":[{"type":"NameCallback","output":[{"name":"prompt","value":"User Name"}],"input":[{"name":"IDToken1","value":"' + user +'"}],"_id":0},{"type":"HiddenValueCallback","output":[{"name":"value","value":""},{"name":"id","value":"Enter SP"}],"input":[{"name":"IDToken2","value":"https://www.saxoinvestor.dk/investor"}],"_id":1},{"type":"HiddenValueCallback","output":[{"name":"value","value":""},{"name":"id","value":"Enter Application"}],"input":[{"name":"IDToken3","value":"SaxoInvestor"}],"_id":2},{"type":"DeviceProfileCallback","output":[{"name":"metadata","value":true},{"name":"location","value":false},{"name":"message","value":""}],"input":[{"name":"IDToken4","value":""}],"_id":3}],"stage":"loginIdPage","status":200,"ok":true}')

step_three_json_struct['authId'] = step_two_json['authId']
step_three_json_struct['callbacks'][3]['input'][0]['value'] = identity

Med data på plads, sender jeg forespørgslen med mit brugernavn af sted:

step_three = session.post(login_url, json=step_three_json_struct)
step_three_json = step_three.json()

Ligesom før inspicerer jeg step_three og step_three_json og konstaterer at min forespørgsel (efter et par forsøg på at få JSON-strukturen til at blive helt rigtig), lykkes.

Jeg får endnu en JSON-struktur tilbage med et nyt “authId” og nogle nye callbacks. Mon ikke den struktur skal bruges til næste trin i login-proceduren?

Svaret på forespørgslen i trin 3

Trin 4: Authenticate (3/3)

I trin 4 gentager jeg proceduren fra trin 3 med at gøre en JSON-datastruktur klar med copy/paste fra browseren, indsætte mit password i strengen, indsætte “authId” fra trinnet lige før og fyre min forespørgsel af sted:

step_four_json_struct = json.loads('{"authId":"","callbacks":[{"type":"MetadataCallback","output":[{"name":"data","value":{"authenticationAttributes":{"type":"loginid-password"}}}],"_id":4},{"type":"NameCallback","output":[{"name":"prompt","value":"User Name"}],"input":[{"name":"IDToken2","value":"' + user + '"}],"_id":5},{"type":"PasswordCallback","output":[{"name":"prompt","value":"Password"}],"input":[{"name":"IDToken3","value":"' + password + '"}],"_id":6},{"type":"ChoiceCallback","output":[{"name":"prompt","value":"LoginId or password?"},{"name":"choices","value":["loginid","password"]},{"name":"defaultChoice","value":1}],"input":[{"name":"IDToken4","value":1}],"_id":7}],"stage":"loginIdOrPasswordPage","status":200,"ok":true}')

step_four_json_struct['authId'] = step_three_json['authId']

step_four = session.post(login_url, json=step_four_json_struct)
step_four_json = step_four.json()

Når det lykkes, ser svaret fra serveren ca. sådan her ud:

Svaret på forespørgslen indeholder et “tokenId”, “successUrl” og “realms”.

Videre til trin 5!

Trin 5: Authorize

Trin 5 i login-proceduren går til en ny URL:

https://www.saxoinvestor.dk/am/oauth2/realms/root/realms/dca/authorize

Når jeg kigger på forespørgslen i min browser, kan jeg se at der både er kommet:

  • En ny header, der hedder “x-forgerock-transaction-id”
  • En ny cookie, der hedder “butterAndStocks” (hvem har godkendt DEN navngivning?), som blev sat i trin 4.
  • Noget formulardata, hvor især “crsf” og “state” ser ud til at have nogle ikke-generiske værdier.
Formulardata fra trin 5

For at undersøge, hvor alle de værdier kommer fra, søger jeg i min forespørgselshistorik i browseren på de strenge, der ikke er generiske, fx værdien fra “crsf”.

Jeg finder ud af:

  • Værdien af “crsf” er værdien af “tokenId”, som jeg fik i trin 4-forespørgslen

Værdien af “x-forgerock-transaction-id” finder jeg helt tilbage i den allerførste forespørgsel til:

https://www.saxoinvestor.dk/login/

I sidens kilde (Response > Raw i Developer Tools) finder jeg et script med en variabel, der hedder “window.config”, der indeholder en JSON-struktur. I den struktur er en nøgle, der hedder “correlationId”, og værdien af den nøgle er den samme som i “x-forgerock-transaction-id”. Helt tilbage mellem trin 0 og trin 1 indsætter jeg et besøg på login-siden, hvor jeg udtrækker værdien:

step_one = session.get('https://www.saxoinvestor.dk/Login/da')
step_one_text = step_one.text
search_string = '"correlationId":"'
start_of_correlation_id = step_one_text.index(search_string)+len(search_string)
end_of_correlation_id = step_one_text.index('"', start_of_correlation_id)
correlation_id = step_one_text[start_of_correlation_id:end_of_correlation_id]

Jeg har straks sværere ved at finde “state”-værdien tidligere i forespørgselshistorikken. Faktisk så svært, jeg slet ikke kan finde den.

Jeg søger på internettet og finder ud af, at det er en sikkerhedsparameter, og at det typisk er tilfældigt genereret. I Googles eksempler kan jeg se, at det typisk bliver kodet til fx hex-værdier, og det giver mig idéen om, at jeg måske kan dekode state-værdien. Jeg gætter på, at der er tale om base64-kodning, forsøger at dekode værdien og går fra:

eyJhcHBJZCI6ImludmVzdG8yIiwiY29ycmVsYXRpb25JZCI6IjA1NMNkNjE3LTM5YzQtNGRjNS02OGEwLWY2Y2ViMDZlNDMzNi1yNjEzNiJ9 (lettere redigeret)

Til:

{"appId":"investor","correlationId":"054cf617-39c4-4dc5-28a0-f6ceb03e4336-26136"} (lettere redigeret)

I den dekodede streng, kan jeg se at denne værdi også svarer til “correlationId” tilbage fra besøget på loginsiden i trin 1. Jeg kan derfor generere en værdi til “state” sådan her:

state_string = '{"appId":"investor","correlationId":"' + correlation_id + '"}'
state_string_b64_encoded = base64.b64encode(bytes(state_string, 'utf-8')).decode()

Med det på plads er jeg klar til at kode trin 5. I browseren kan jeg se, at jeg gerne først skulle få en viderestilling (http-kode 302) og dernæst endnu et JSON-svar:

I trin 5 bliver jeg viderestillet og får et JSON-svar med “code” og “state”

Den del af mit script, der gør klar til at udføre forespørgslen og udfører forespørgslen i trin 5, ser sådan her ud:

auth_url = 'https://www.saxoinvestor.dk/am/oauth2/realms/root/realms/dca/authorize'

data = 'csrf=' + step_four_json['tokenId'] + '&scope=openid%20profile%20openapi%20fr%3Aidm%3A*&response_type=code&client_id=SaxoInvestorPlatform&redirect_uri=https%3A%2F%2Fwww.saxoinvestor.dk%2Fapi%2Flogin%2Fcode&decision=allow&state=' + state_string_b64_encoded

session.headers['X-ForgeRock-TransactionId'] = correlation_id
session.headers['Content-Type'] = 'application/x-www-form-urlencoded'

step_five = session.post(auth_url, data=data)
step_five_json = step_five.json()

…Men vi skal videre.

Trin 6: Bearer token

Fra mit tidligere script – og ved at gå lidt frem i forespørgselshistorikken – kan jeg se, at jeg har brug for et “Bearer token” for at lave forespørgsler i API’et. Det bliver sendt i en header, der hedder “Authorization” og består af stringen “Bearer”, et mellemrum, og en lang streng af en masse forskellige tegn.

Forespørgsel til API’et med et Bearer token

Jeg søger i strengen fra “Bearer token” i forespørgselshistorikken i browseren, og finder ud af, at det først dukker op i kildekoden for siden:

https://www.saxoinvestor.dk/showapp

I forespørgselshistorikken kan jeg se, at denne side besøges umiddelbart efter trin 5. Min browser laver en tom forespørgsel af typen GET med nogle parametre, nemlig “code”, som jeg har fået fat i fra trin 5, og state, som jeg fik fat i helt tilbage i trin 1.

Forespørgslen til “showapp” med parametrene “code” og “state”.

Selve mit “Bearer token” befinder sig i (endnu) en window.config-variabel i et script i kildekoden og hedder “idToken”.

I min kode sletter jeg mine headers fra trin 5, laver en url til “showapp”-siden, besøger siden, udtrækker mit “Bearer token” af kildekoden og genererer en streng til brug for API-headeren:

del session.headers['X-ForgeRock-TransactionId']
del session.headers['Content-Type']

step_six_code = step_five_json['code']
step_six_url = f'https://www.saxoinvestor.dk/showapp?code={step_six_code}&state={state_string_b64_encoded}'

step_six = session.get(step_six_url)
step_six_text = step_six.text

search_string = ',"idToken":"'
start_of_bearer_token = step_six_text.index(search_string)+len(search_string)
end_of_bearer_token = step_six_text.index('"', start_of_bearer_token)
bearer_token = step_six_text[start_of_bearer_token:end_of_bearer_token]
bearer_string = 'Bearer ' + bearer_token

Trin 7: Hente data

Med loginproceduren overstået, er jeg klar til at hente mine transaktioner. Her kan jeg genbruge kode fra tidligere. Jeg bruger en frisk session, da jeg ikke længere har brug for diverse cookies:

# Start session and set bearer token as header
api_session = requests.Session()
api_session.headers['Authorization'] = bearer_string

# First API request gets Client Key which is used for most API calls
# See https://www.developer.saxo/openapi/learn/the-tutorial for expected return data
url = 'https://www.saxoinvestor.dk/openapi/port/v1/clients/me'
api_query = api_session.get(url)
clientdata = api_query.json()
clientkey = clientdata['ClientKey']

# Extract transactions
url = f'https://www.saxotrader.com/openapi/hist/v1/transactions?ClientKey={clientkey}&FromDate={startdate}&ToDate={enddate}'
transactions = api_session.get(url)
transactions_json = transactions.json()

Det færdige program

Det færdige script finder du her. Jeg har ryddet lidt op i det ift. gennemgangen oven for. Før du bruger scriptet, skal du logge ind i Saxo Bank og bruge udviklerværktøjerne i din browser, til at kopiere nogle værdier ind i scriptet. Jeg har forsøgt at hjælpe dig på vej i kommentarerne.

God fornøjelse.

# -*- coding: utf-8 -*-
# Author: Morten Helmstedt. E-mail: helmstedt@gmail.com
"""This script logs into a Saxo Bank account and performs a query to extract transactions.
In order to use the program, first log in to Saxo Bank in a browser. When prompted for a two
factor code, enter code and select "trust this device". Next, log out and open developer tools
in your browser (F12). Visit the Saxo Bank login page again and log in. This time you should not
be prompted for a two factor login code. Go through the request history in the network tab of
developer tools and insert the relevant values in the script below. I have marked the parts 
you need to edit with "EDIT THIS PART". Once you're done, run the script."""

import requests 
from datetime import datetime
from datetime import date
import json
import base64

# CONFIGURATION #

# EDIT THIS PART
# Saxo user account credentials
user = 'USERNAME'
password = 'PASSWORD'

# EDIT THIS PART (AS NEEDED)
# Start date (start of period for transactions) and date today used for extraction of transactions
startdate = '2019-01-01'
today = date.today()
enddate = datetime.strftime(today, '%Y-%m-%d')


# LOGIN TO SAXO BANK #

# Start requests session and set user agent
session = requests.Session()
# EDIT THIS PART
# Copy the user agent header value from your browser
session.headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; rv:122.0) Gecko/20100101 Firefox/122.0'

# Step one: Fetch login page and get correlation id from page source
step_one = session.get('https://www.saxoinvestor.dk/Login/da')
step_one_text = step_one.text
search_string = '"correlationId":"'
start_of_correlation_id = step_one_text.index(search_string)+len(search_string)
end_of_correlation_id = step_one_text.index('"', start_of_correlation_id)
correlation_id = step_one_text[start_of_correlation_id:end_of_correlation_id]

# Insert correlation id in state string and encode it as base 64 for later use
state_string = '{"appId":"investor","correlationId":"' + correlation_id + '"}'
state_string_b64_encoded = base64.b64encode(bytes(state_string, 'utf-8')).decode()

# Step two: Start authentication part one of three

login_url = 'https://www.saxoinvestor.dk/am/json/realms/root/realms/dca/authenticate?authIndexType=service&authIndexValue=authn-web-v6'

step_two = session.post(login_url)
step_two_json = step_two.json()

# Step three: Prepare data structure and perform authentication part two of three

# EDIT THIS PART
# Copy this value from the second request to login_url in your browser. Use the
# "Raw" request payload to copy from. Your string should look similar to the one here
identity = "{\"identifier\":\"EDITED\",\"metadata\":{\"hardware\":{\"cpuClass\":null,\"deviceMemory\":null,\"hardwareConcurrency\":2,\"maxTouchPoints\":0,\"oscpu\":\"Windows NT 10.0; Win64; x64\",\"display\":{\"width\":2560,\"height\":771,\"pixelDepth\":24,\"angle\":0}},\"browser\":{\"userAgent\":\"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0\",\"appName\":\"Netscape\",\"appCodeName\":\"Mozilla\",\"appVersion\":\"5.0 (Windows)\",\"appMinorVersion\":null,\"buildID\":\"20181001000000\",\"product\":\"Gecko\",\"productSub\":\"20100101\",\"vendor\":\"\",\"vendorSub\":\"\",\"browserLanguage\":null,\"plugins\":\"internal-pdf-viewer;internal-pdf-viewer;internal-pdf-viewer;internal-pdf-viewer;internal-pdf-viewer;\"},\"platform\":{\"language\":\"en\",\"platform\":\"Win32\",\"userLanguage\":null,\"systemLanguage\":null,\"deviceName\":\"Windows (Browser)\",\"fonts\":\"EDITED;\",\"timezone\":0}}}"

step_three_json_struct = json.loads('{"authId":"","callbacks":[{"type":"NameCallback","output":[{"name":"prompt","value":"User Name"}],"input":[{"name":"IDToken1","value":"' + user +'"}],"_id":0},{"type":"HiddenValueCallback","output":[{"name":"value","value":""},{"name":"id","value":"Enter SP"}],"input":[{"name":"IDToken2","value":"https://www.saxoinvestor.dk/investor"}],"_id":1},{"type":"HiddenValueCallback","output":[{"name":"value","value":""},{"name":"id","value":"Enter Application"}],"input":[{"name":"IDToken3","value":"SaxoInvestor"}],"_id":2},{"type":"DeviceProfileCallback","output":[{"name":"metadata","value":true},{"name":"location","value":false},{"name":"message","value":""}],"input":[{"name":"IDToken4","value":""}],"_id":3}],"stage":"loginIdPage","status":200,"ok":true}')

step_three_json_struct['authId'] = step_two_json['authId']
step_three_json_struct['callbacks'][3]['input'][0]['value'] = identity

step_three = session.post(login_url, json=step_three_json_struct)
step_three_json = step_three.json()

# Step four: Prepare data structure and perform authentication part three of three
step_four_json_struct = json.loads('{"authId":"","callbacks":[{"type":"MetadataCallback","output":[{"name":"data","value":{"authenticationAttributes":{"type":"loginid-password"}}}],"_id":4},{"type":"NameCallback","output":[{"name":"prompt","value":"User Name"}],"input":[{"name":"IDToken2","value":"' + user + '"}],"_id":5},{"type":"PasswordCallback","output":[{"name":"prompt","value":"Password"}],"input":[{"name":"IDToken3","value":"' + password + '"}],"_id":6},{"type":"ChoiceCallback","output":[{"name":"prompt","value":"LoginId or password?"},{"name":"choices","value":["loginid","password"]},{"name":"defaultChoice","value":1}],"input":[{"name":"IDToken4","value":1}],"_id":7}],"stage":"loginIdOrPasswordPage","status":200,"ok":true}')

step_four_json_struct['authId'] = step_three_json['authId']

step_four = session.post(login_url, json=step_four_json_struct)
step_four_json = step_four.json()

# Step five: Authenticate
auth_url = 'https://www.saxoinvestor.dk/am/oauth2/realms/root/realms/dca/authorize'

data = 'csrf=' + step_four_json['tokenId'] + '&scope=openid%20profile%20openapi%20fr%3Aidm%3A*&response_type=code&client_id=SaxoInvestorPlatform&redirect_uri=https%3A%2F%2Fwww.saxoinvestor.dk%2Fapi%2Flogin%2Fcode&decision=allow&state=' + state_string_b64_encoded

session.headers['X-ForgeRock-TransactionId'] = correlation_id
session.headers['Content-Type'] = 'application/x-www-form-urlencoded'

step_five = session.post(auth_url, data=data)
step_five_json = step_five.json()

del session.headers['X-ForgeRock-TransactionId']
del session.headers['Content-Type']

# Step six: Open app website and extract API bearer token
step_six_code = step_five_json['code']
step_six_url = f'https://www.saxoinvestor.dk/showapp?code={step_six_code}&state={state_string_b64_encoded}'

step_six = session.get(step_six_url)
step_six_text = step_six.text

search_string = ',"idToken":"'
start_of_bearer_token = step_six_text.index(search_string)+len(search_string)
end_of_bearer_token = step_six_text.index('"', start_of_bearer_token)
bearer_token = step_six_text[start_of_bearer_token:end_of_bearer_token]
bearer_string = 'Bearer ' + bearer_token


# Perform API calls #
# Documentation at https://www.developer.saxo/openapi/learn

# Start new session and set bearer token as header
api_session = requests.Session()
api_session.headers['Authorization'] = bearer_string

# First API request gets Client Key which is used for most API calls
# See https://www.developer.saxo/openapi/learn/the-tutorial for expected return data
url = 'https://www.saxoinvestor.dk/openapi/port/v1/clients/me'
api_query = api_session.get(url)
clientdata = api_query.json()
clientkey = clientdata['ClientKey']

# Extract transactions
url = f'https://www.saxotrader.com/openapi/hist/v1/transactions?ClientKey={clientkey}&FromDate={startdate}&ToDate={enddate}'
transactions = api_session.get(url)
transactions_json = transactions.json()

Hvordan gik det egentlig med alle de domæner?

Jeg har flere gange skrevet om min jagt på en nogenlunde opdateret og komplet liste over danske domænenavne.

Her kommer en status.

https://wallnot.dk/dotdk/ har jeg samlet en liste over 1.3 millioner danske domænenavne, som allesammen har været aktive og registreret i perioden fra sådan cirka sommeren 2022 til starten af 2023.

Nogle er udløbet, nye er kommet til, men om ikke andet er der tale om den største offentligt tilgængelige database over danske domænenavne, som frit kan downloades.

Jeg har planer om at sætte noget i gang, der kan holde oversigten opdateret, men jeg har ikke haft tid til at bygge det. Og jeg vil stadigvæk gerne forsøge en form for maskinlæringsstøttet klassifikation af domænerne, der kan gøre det nemt at finde de domæner, der ikke rigtig bliver brugt til noget, og som måske kunne bruges til dine (og mine) gode idéer…

Hent data om dit elforbrug fra eloverblik.dk

For et stykke tid siden blev jeg kontaktet af en, der havde brug for hjælp til at hive data ud fra https://eloverblik.dk ved hjælp af sidens API. Derfor har jeg (ligesom de fleste andre danskere) skrevet et program i Python, der kan hjælpe med at hive data ud.

Det var sjovt at lave, for jeg fik øvet mig i at skrive et program, hvor man ved hjælp af argumenter i kommandolinjen, kan få programmet til at gøre nogle forskellige ting, fx at udtrække en liste over ens målere, kun hente data fra en bestemt elmåler, hente data med forskellig opløsning (time, dag, uge, osv.), og et par andre ting.

Jeg synes selv, at det er blevet ret brugervenligt af et kommandolinjeprogram af være.

Måske kan du bruge det? Du finder koden på GitHub og lige herunder:

# A utility to interact with the api from eloverblik.dk
# By Morten Helmstedt, https://helmstedt.dk
# API documentation:
# https://api.eloverblik.dk/CustomerApi/swagger/index.html
# https://www.niras.dk/media/4vbbvkig/eloverblik-adgang-til-egne-data-via-api-kald-forkortet-1.pdf
import argparse
import csv
from datetime import date, datetime, timedelta
import os
from os.path import exists
import pickle
import requests
import sys
import time
from zoneinfo import ZoneInfo

# Set token filename
token_filename = 'eloverblik.token'
data_access_token_filename = 'eloverblik_data_access.token'

# Number of API retries (API often returns 503 errors)
api_retries = 10

# API base url
base_url = 'https://api.eloverblik.dk/CustomerApi/api/'

# Get today's date
today = date.today()

# Prepare session for requests
session = requests.Session()

# Set session headers
session.headers = {
    'Accept': 'application/json',
    'Accept-Encoding': 'gzip, deflate, br',
    'Host': 'api.eloverblik.dk',
    'User-Agent': 'Eloverblik-Python'
}

# Gets a saved data token if it is not too old, alternatively gets a new token
def get_or_set_data_access_token(token):
    # If an existing data access token is less than 12 hours old, use it and return
    if exists(data_access_token_filename):
        with open(data_access_token_filename, 'rb') as data_access_token_file:
            save_time_and_token = pickle.load(data_access_token_file)
            if not datetime.now() - save_time_and_token[0] > timedelta(hours=12):
                print('Existing data access token found. Using this token.')
                session.headers['Authorization'] = 'Bearer ' + save_time_and_token[1]
                return
    # Data access token does not exist or is too old
    # Check whether API is alive
    print('Checking API status...')
    get_api_status = get_endpoint('isalive')
    if get_api_status == True:
        print('API reports that it is up')
        # Get data access token for subsequent requests
        print('Getting data access token...')
        session.headers['Authorization'] = 'Bearer ' + token
        token_get_time = datetime.now()
        get_data_access_token = get_endpoint('token')
        # Request succesful
        if get_data_access_token:
            print('Got data access token')
            data_access_token = get_data_access_token['result']
            # Save token to file with get time
            with open(data_access_token_filename, 'wb') as data_access_token_file:
                pickle.dump([token_get_time, data_access_token], data_access_token_file)
            session.headers['Authorization'] = 'Bearer ' + data_access_token
        # Request failed
        else:
            sys.exit('Error: Unable to get data access token. Exiting.')
    # API is down
    else:
        sys.exit('Error: API is down. Exiting.')

# Request an endpoint and return data
def get_endpoint(endpoint, json=None):
    tries = 1
    while tries <= api_retries:
        if not json:
            response = session.get(base_url + endpoint, timeout=10)
        else:
            response = session.post(base_url + endpoint, json=json, timeout=10)
        # Succesful request
        if response.status_code == 200:
            return response.json()
        # Unsuccesful request, try again after 1 second
        elif response.status_code == 429 or response.status_code == 503:
            tries += 1
            time.sleep(1)
        elif response.status_code == 403:
            print(f'API reports a 403 forbidden error. Please check your token is correct')
        else:
            print(f'API returned an unknown status code')
            print(f'Latest API response status code was: {response.status_code}')
            print(f'Latest API response content was: {response.text}')
            sys.exit('API request failed. Exiting.')
    if tries > api_retries:
        print(f'API request did not succeed after {api_retries} attempts')
        print(f'Latest API response status code was: {response.status_code}')
        print(f'Latest API response content was: {response.text}')
        sys.exit('API request failed. Exiting.')

# Lists all metering points
def list_meters():
    print('Getting list of meters...')
    get_metering_points = get_endpoint('meteringpoints/meteringpoints')
    print(f'Found {len(get_metering_points["result"])} meter(s)')
    print('Printing list of meter(s)...\n')
    for meter in get_metering_points['result']:
        meter_count = 1
        print(f'--- Meter {meter_count} ---')
        for key, value in meter.items():
            print(key, ':', value)
        print('---')
        meter_count += 1
    sys.exit('All meters printed. Exiting.')

# Gets and saves metering point electricity usage data as a csv file
def get_usage_data(meter_ids, args, periods):
    print('Starting to save usage data...')
    # Prepare csv file for writing
    with open('eloverblik_usage_data.csv', 'w', newline='') as csvfile:
        fieldnames = ['meter_id', 'resolution', 'timestart_utc', 'timestart_denmark', 'timeend_utc', 'timeend_denmark', 'point_position', 'point_out_quantity', 'point_out_quality']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for meter_id in meter_ids:
            print(f'Getting and saving usage data for meter id {meter_id}...')
            meter_json = {
                "meteringPoints": {
                    "meteringPoint": [
                        meter_id
                    ]
                }
            }
            for date_period in periods:
                print(f'Saving usage date for period {date_period[0]} to {date_period[1]}...')
                usage_data_endpoint = 'meterdata/gettimeseries/' + date_period[0] + '/' + date_period[1] + '/' + args.aggregation
                get_meter_usage_data = get_endpoint(usage_data_endpoint, meter_json)
                for result in get_meter_usage_data['result']:
                    for time_serie in result['MyEnergyData_MarketDocument']['TimeSeries']:
                        for period in time_serie['Period']:
                            resolution = period['resolution']
                            timestart_utc = period['timeInterval']['start']
                            timestart_datetime = datetime.strptime(timestart_utc, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=ZoneInfo('UTC'))
                            timestart_denmark = timestart_datetime.astimezone(ZoneInfo('Europe/Copenhagen'))
                            timestart_denmark_str = datetime.strftime(timestart_denmark, '%Y-%m-%dT%H:%M:%S')
                            timeend_utc = period['timeInterval']['end']
                            timeend_datetime = datetime.strptime(timeend_utc, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=ZoneInfo('UTC'))
                            timeend_denmark = timeend_datetime.astimezone(ZoneInfo('Europe/Copenhagen'))
                            timeend_denmark_str = datetime.strftime(timeend_denmark, '%Y-%m-%dT%H:%M:%S')
                            period_rows = [
                                {
                                    'meter_id': meter_id,
                                    'resolution': resolution,
                                    'timestart_utc': timestart_utc,
                                    'timestart_denmark': timestart_denmark_str,
                                    'timeend_utc': timeend_utc,
                                    'timeend_denmark': timeend_denmark_str,
                                    'point_position': point['position'],
                                    'point_out_quantity': str(point['out_Quantity.quantity']).replace('.',','),
                                    'point_out_quality': point['out_Quantity.quality']
                                }
                                for point in period['Point']
                            ]
                            writer.writerows(period_rows)
                print(f'Saved usage date for period {date_period[0]} to {date_period[1]}')
            print(f'Saved usage data for meter {meter_id}')
        print(f'Saved usage data for meter(s)')    

# Gets and saves metering point electricity charges data as a csv file
def get_charges_data(meter_ids):
    print('Starting to save charges data...')
    # Prepare csv file for writing
    with open('eloverblik_charges_data.csv', 'w', newline='') as csvfile:
        fieldnames = ['meter_id', 'chargetype', 'name', 'description', 'owner', 'validfromdate', 'validtodate', 'periodtype', 'position', 'price', 'quantity']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for meter_id in meter_ids:
            print(f'Getting and saving charges data for meter id {meter_id}...')
            meter_json = {
                "meteringPoints": {
                    "meteringPoint": [
                        meter_id
                    ]
                }
            }
            charges_data_endpoint = 'meteringpoints/meteringpoint/getcharges'
            get_meter_charges_data = get_endpoint(charges_data_endpoint, meter_json)
            for result in get_meter_charges_data['result']:
                for item in result['result']['fees']:
                    chargetype = 'fee'
                    subscription_row = {
                        'meter_id': meter_id,
                        'chargetype': chargetype,
                        'name': item['name'],
                        'description': item['description'],
                        'owner': item['owner'],
                        'validfromdate': item['validFromDate'],
                        'validtodate': item['validToDate'],
                        'periodtype': item['periodType'],
                        'position': '',
                        'price': str(item['price']).replace('.',','),
                        'quantity': item['quantity']
                    }
                    writer.writerow(subscription_row)
                for item in result['result']['subscriptions']:
                    chargetype = 'subscription'
                    subscription_row = {
                        'meter_id': meter_id,
                        'chargetype': chargetype,
                        'name': item['name'],
                        'description': item['description'],
                        'owner': item['owner'],
                        'validfromdate': item['validFromDate'],
                        'validtodate': item['validToDate'],
                        'periodtype': item['periodType'],
                        'position': '',
                        'price': str(item['price']).replace('.',','),
                        'quantity': item['quantity']
                    }
                    writer.writerow(subscription_row)
                for item in result['result']['tariffs']:
                    chargetype = 'tariff'
                    name = item['name']
                    description = item['description']
                    owner = item['owner']
                    validfromdate = item['validFromDate']
                    validtodate = item['validToDate']
                    periodtype = item['periodType']
                    tariff_rows = [
                        {
                            'meter_id': meter_id,
                            'chargetype': chargetype,
                            'name': name,
                            'description': description,
                            'owner': owner,
                            'validfromdate': validfromdate,
                            'validtodate': validtodate,
                            'periodtype': periodtype,
                            'position': point['position'],
                            'price': str(point['price']).replace('.',','),
                            'quantity': ''
                        }
                        for point in item['prices']
                    ]
                    writer.writerows(tariff_rows)
            print(f'Saved charges data for meter {meter_id}')
        print(f'Saved charges data for meter(s)')      

# Main program logic
def main():
    # Define and load parser arguments
    parser = argparse.ArgumentParser(description='Get data on electricity usage from eloverblik.dk')
    parser.add_argument('-m', '--mode', help='Mode: List meters or get data ', type=str, choices=['list', 'get'], required=True)
    parser.add_argument('-n', '--meterid', help='Get data from this specific meter in get mode', type=str)
    parser.add_argument('-a', '--aggregation', help='Get timeseries data with this aggregation in get mode', choices=['Actual', 'Quarter', 'Hour', 'Day', 'Month', 'Year'], default='Actual', type=str)
    parser.add_argument('-f', '--fromdate', help='Get data from this date in get mode, format yyyy-mm-dd', type=str)
    parser.add_argument('-t', '--todate', help='Get data to and including this date in get mode, format yyyy-mm-dd', type=str)
    parser.add_argument('-d', '--deletetoken', help='Delete existing token file', action='store_true')
    parser.add_argument('-r', '--refreshdatatoken', help='Force refresh of data access token by deleting token file', action='store_true')
    args = parser.parse_args()

    # Delete token file if set as argument
    if args.deletetoken:
        print('Deleting existing token file if it exists')
        os.remove(token_filename)

    # Delete data token file if set as argument
    if args.refreshdatatoken:
        print('Deleting existing data access token file if it exists')
        os.remove(data_access_token_filename)

    # Load or save token
    if not exists(token_filename):
        print('No token from eloverblik.dk saved. Paste your token here.')
        token = str(input('Token: '))
        with open(token_filename, 'wb') as token_file:
            pickle.dump(token, token_file)
    else:
        with open(token_filename, 'rb') as token_file:
            token = pickle.load(token_file)
  
    # If mode is list meters, get a list of meters
    if args.mode == 'list':
        print('Listing available meters...')
        # Get data access token
        get_or_set_data_access_token(token)
        # List meters
        list_meters()
    # If mode is get data, get data
    elif args.mode == 'get':
        # Date argument validation
        if args.fromdate and not args.todate or args.todate and not args.fromdate:
            sys.exit('Error: You must specify both a from date and a to date. Exiting.')
        try:
            from_date = datetime.strptime(args.fromdate, '%Y-%m-%d').date()
            to_date = datetime.strptime(args.todate, '%Y-%m-%d').date()
            if from_date > to_date:
                sys.exit('Error: Your from date cannot be after your to date. Exiting.')
            elif from_date == to_date:
                sys.exit('Error: Your from date cannot be the same as your to date. Exiting.')
            elif from_date > today:
                sys.exit('Error: Your from date cannot be after today. Exiting.')
            elif to_date > today + timedelta(days=1):
                sys.exit('Error: Your to date cannot be later than one day after today. Exiting.')
        except ValueError:
            sys.exit('Error: From or to date in invalid format. Format must be yyyy-mm-dd with no quotes. Exiting.')

        # Periods must be a maximum of 730 days, so longer periods are sliced into smaller pieces
        if to_date > from_date + timedelta(days=730):
            periods = []
            start_of_period = from_date
            slice_finished = False
            while slice_finished == False:
                end_of_period = start_of_period + timedelta(days=730)
                if end_of_period <= to_date:
                    periods.append([datetime.strftime(start_of_period, '%Y-%m-%d'), datetime.strftime(end_of_period, '%Y-%m-%d')])
                    start_of_period = end_of_period + timedelta(days=1)
                else:
                    end_of_period = to_date
                    periods.append([datetime.strftime(start_of_period, '%Y-%m-%d'), datetime.strftime(end_of_period, '%Y-%m-%d')])
                    slice_finished = True
        # Smaller periods are saved as a list in a list to use the same for loop later
        else:
            periods = [[args.fromdate, args.todate]]

        print('Getting data...')

        # Get data access token
        get_or_set_data_access_token(token)

        # Specifik meter id is set by user
        if args.meterid:
            meter_ids = [args.meterid]
        # Meter id argument is not set, so list of meters is fetched and listed
        else:
            # Get ids of meters
            print('Getting list of meters...')
            get_metering_points = get_endpoint('meteringpoints/meteringpoints')
            print(f'Found {len(get_metering_points["result"])} meters')
            meter_ids = [meter['meteringPointId'] for meter in get_metering_points['result']]

        if meter_ids:
            # Get data from meters
            print('Getting and saving usage and charges data for meter(s)...')
            # Get usage data
            get_usage_data(meter_ids, args, periods)
            # Get charges data
            get_charges_data(meter_ids)
            # Print status
            print('Saved usage and charges data for meter(s)')
        else:
            sys.exit('Error: Did not find any meters, so no data to fetch. Exiting.')

if __name__ == '__main__':
    main()

Høst domænenavne med certifikatlogs

I På jagt efter danske domænenavne skrev jeg om en smart metode, jeg har fundet til at finde de .dk-domænenavne, som autoriteterne ikke ville dele med mig.

Måske er der andre end mig, der er interesseret i at holde lidt øje med nye steder på internettet?

Her er i hvert fald en lille opskrift på, hvordan man gør:

1. Hent Axeman

Axeman er et program, der hjælper med at automatisere og parallellisere downloads af certifikatlogs. Jeg måtte rette det en lille smule for at få det til at køre, da det vist ikke vedligeholdes aktivt. “Min” udgave finder du her:

https://github.com/helmstedt/Axeman

2. Download en masse logs

Axeman gemmer certifikatlogs som csv-filer. Skriv fx…

axeman -u 'ct.googleapis.com/logs/argon2022'

…for at hente Googles 2022-logs. Det gik ikke specielt hurtigt hos mig, men det virkede.

3. Gennemgå log-filer for .dk-domæner

Jeg skrev et lille program, der søger logfiler igennem for .dk-domæner (der kommer også nogle andre domæner med engang imellem, hvis der er “.dk” et sted i domænet). Det ser sådan her ud:

import os
from pathlib import Path
import csv
 
csv_dir = 'PATH_TO_LOG_FILES'
paths = Path(csv_dir).iterdir()

for file_path in paths:
    # Open and process csv file
    with open(file_path, 'rt') as csv_input:
        print('Processing: ', file_path)
        reader = csv.reader(csv_input)
        for row in reader:
            host = row[4]
            if '.dk' in host:
                print('Found: ', host)
                with open('PATH_TO_OUTPUT_FILE.CSV', 'a') as dotdk_output:
                    dotdk_output.write(host + '\n')
    # Delete csv file after processing
    os.remove(file_path)

Det eneste lidt obskure i programmet er måske hvad der gemmer sig i row[4]? Den indeholder en liste over de domæner og subdomæner, det enkelte certifikat er udstedt til. Elementerne i listen er adskilt med mellemrum.

4. Filtrer listen, så der kun kommer ægte, unikke .dk-domæner med

Efter grovsorteringen, skrev jeg endnu et lille program, der finsorterer. Det ser sådan her ud:

domains_in_file = set()
with open('PATH_TO_INPUT_FILE.CSV', 'rt') as file_input:
    for index, row in enumerate(file_input):
        items = row.split(' ')
        for item in items:
            item = item.replace('*.','').replace('\n','')
            matches = re.findall(r'([^.]*$)', item)
            dk_domain = ''
            if matches[0] == 'dk':
                dk_domain = item.split('.')
                dk_domain = dk_domain[-2] + '.' + dk_domain[-1]
                dk_domain = dk_domain.lower()
    with open('PATH_TO_OUTPUT_FILE.BIN', 'wb') as unique_domains_file:
        pickle.dump(domains_in_file, unique_domains_file)

Her er koden nok lidt mere obskur. Jeg tager hvert element i hver række fra CSV-filen i sidste trin og:

  • Fjerner evt. wildcard (*.) og linjeskift (\n) fra værtsnavnet
  • Finder domænesuffixet med et regulær udtryk
  • Hvis domænesuffixet er .dk, splitter jeg hvert enkelt element i værstnavnet op i en liste
  • Konstruerer selve domænenavnet ved at sætte det næstsidste (fx helmstedt) og sidste element (.dk) sammen
  • Sørger for en sikkerheds skyld for, at konvertere domænenavnet til små bogstaver

5. Nyd dit kendskab til nye og gamle domænenavne (der har fået udstedt SSL-certifikater)