Hent data om dit elforbrug fra eloverblik.dk

For et stykke tid siden blev jeg kontaktet af en, der havde brug for hjælp til at hive data ud fra https://eloverblik.dk ved hjælp af sidens API. Derfor har jeg (ligesom de fleste andre danskere) skrevet et program i Python, der kan hjælpe med at hive data ud.

Det var sjovt at lave, for jeg fik øvet mig i at skrive et program, hvor man ved hjælp af argumenter i kommandolinjen, kan få programmet til at gøre nogle forskellige ting, fx at udtrække en liste over ens målere, kun hente data fra en bestemt elmåler, hente data med forskellig opløsning (time, dag, uge, osv.), og et par andre ting.

Jeg synes selv, at det er blevet ret brugervenligt af et kommandolinjeprogram af være.

Måske kan du bruge det? Du finder koden på GitHub og lige herunder:

# A utility to interact with the api from eloverblik.dk
# By Morten Helmstedt, https://helmstedt.dk
# API documentation:
# https://api.eloverblik.dk/CustomerApi/swagger/index.html
# https://www.niras.dk/media/4vbbvkig/eloverblik-adgang-til-egne-data-via-api-kald-forkortet-1.pdf
import argparse
import csv
from datetime import date, datetime, timedelta
import os
from os.path import exists
import pickle
import requests
import sys
import time
from zoneinfo import ZoneInfo

# Set token filename
token_filename = 'eloverblik.token'
data_access_token_filename = 'eloverblik_data_access.token'

# Number of API retries (API often returns 503 errors)
api_retries = 10

# API base url
base_url = 'https://api.eloverblik.dk/CustomerApi/api/'

# Get today's date
today = date.today()

# Prepare session for requests
session = requests.Session()

# Set session headers
session.headers = {
    'Accept': 'application/json',
    'Accept-Encoding': 'gzip, deflate, br',
    'Host': 'api.eloverblik.dk',
    'User-Agent': 'Eloverblik-Python'
}

# Gets a saved data token if it is not too old, alternatively gets a new token
def get_or_set_data_access_token(token):
    # If an existing data access token is less than 12 hours old, use it and return
    if exists(data_access_token_filename):
        with open(data_access_token_filename, 'rb') as data_access_token_file:
            save_time_and_token = pickle.load(data_access_token_file)
            if not datetime.now() - save_time_and_token[0] > timedelta(hours=12):
                print('Existing data access token found. Using this token.')
                session.headers['Authorization'] = 'Bearer ' + save_time_and_token[1]
                return
    # Data access token does not exist or is too old
    # Check whether API is alive
    print('Checking API status...')
    get_api_status = get_endpoint('isalive')
    if get_api_status == True:
        print('API reports that it is up')
        # Get data access token for subsequent requests
        print('Getting data access token...')
        session.headers['Authorization'] = 'Bearer ' + token
        token_get_time = datetime.now()
        get_data_access_token = get_endpoint('token')
        # Request succesful
        if get_data_access_token:
            print('Got data access token')
            data_access_token = get_data_access_token['result']
            # Save token to file with get time
            with open(data_access_token_filename, 'wb') as data_access_token_file:
                pickle.dump([token_get_time, data_access_token], data_access_token_file)
            session.headers['Authorization'] = 'Bearer ' + data_access_token
        # Request failed
        else:
            sys.exit('Error: Unable to get data access token. Exiting.')
    # API is down
    else:
        sys.exit('Error: API is down. Exiting.')

# Request an endpoint and return data
def get_endpoint(endpoint, json=None):
    tries = 1
    while tries <= api_retries:
        if not json:
            response = session.get(base_url + endpoint, timeout=10)
        else:
            response = session.post(base_url + endpoint, json=json, timeout=10)
        # Succesful request
        if response.status_code == 200:
            return response.json()
        # Unsuccesful request, try again after 1 second
        elif response.status_code == 429 or response.status_code == 503:
            tries += 1
            time.sleep(1)
        elif response.status_code == 403:
            print(f'API reports a 403 forbidden error. Please check your token is correct')
        else:
            print(f'API returned an unknown status code')
            print(f'Latest API response status code was: {response.status_code}')
            print(f'Latest API response content was: {response.text}')
            sys.exit('API request failed. Exiting.')
    if tries > api_retries:
        print(f'API request did not succeed after {api_retries} attempts')
        print(f'Latest API response status code was: {response.status_code}')
        print(f'Latest API response content was: {response.text}')
        sys.exit('API request failed. Exiting.')

# Lists all metering points
def list_meters():
    print('Getting list of meters...')
    get_metering_points = get_endpoint('meteringpoints/meteringpoints')
    print(f'Found {len(get_metering_points["result"])} meter(s)')
    print('Printing list of meter(s)...\n')
    for meter in get_metering_points['result']:
        meter_count = 1
        print(f'--- Meter {meter_count} ---')
        for key, value in meter.items():
            print(key, ':', value)
        print('---')
        meter_count += 1
    sys.exit('All meters printed. Exiting.')

# Gets and saves metering point electricity usage data as a csv file
def get_usage_data(meter_ids, args, periods):
    print('Starting to save usage data...')
    # Prepare csv file for writing
    with open('eloverblik_usage_data.csv', 'w', newline='') as csvfile:
        fieldnames = ['meter_id', 'resolution', 'timestart_utc', 'timestart_denmark', 'timeend_utc', 'timeend_denmark', 'point_position', 'point_out_quantity', 'point_out_quality']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for meter_id in meter_ids:
            print(f'Getting and saving usage data for meter id {meter_id}...')
            meter_json = {
                "meteringPoints": {
                    "meteringPoint": [
                        meter_id
                    ]
                }
            }
            for date_period in periods:
                print(f'Saving usage date for period {date_period[0]} to {date_period[1]}...')
                usage_data_endpoint = 'meterdata/gettimeseries/' + date_period[0] + '/' + date_period[1] + '/' + args.aggregation
                get_meter_usage_data = get_endpoint(usage_data_endpoint, meter_json)
                for result in get_meter_usage_data['result']:
                    for time_serie in result['MyEnergyData_MarketDocument']['TimeSeries']:
                        for period in time_serie['Period']:
                            resolution = period['resolution']
                            timestart_utc = period['timeInterval']['start']
                            timestart_datetime = datetime.strptime(timestart_utc, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=ZoneInfo('UTC'))
                            timestart_denmark = timestart_datetime.astimezone(ZoneInfo('Europe/Copenhagen'))
                            timestart_denmark_str = datetime.strftime(timestart_denmark, '%Y-%m-%dT%H:%M:%S')
                            timeend_utc = period['timeInterval']['end']
                            timeend_datetime = datetime.strptime(timeend_utc, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=ZoneInfo('UTC'))
                            timeend_denmark = timeend_datetime.astimezone(ZoneInfo('Europe/Copenhagen'))
                            timeend_denmark_str = datetime.strftime(timeend_denmark, '%Y-%m-%dT%H:%M:%S')
                            period_rows = [
                                {
                                    'meter_id': meter_id,
                                    'resolution': resolution,
                                    'timestart_utc': timestart_utc,
                                    'timestart_denmark': timestart_denmark_str,
                                    'timeend_utc': timeend_utc,
                                    'timeend_denmark': timeend_denmark_str,
                                    'point_position': point['position'],
                                    'point_out_quantity': str(point['out_Quantity.quantity']).replace('.',','),
                                    'point_out_quality': point['out_Quantity.quality']
                                }
                                for point in period['Point']
                            ]
                            writer.writerows(period_rows)
                print(f'Saved usage date for period {date_period[0]} to {date_period[1]}')
            print(f'Saved usage data for meter {meter_id}')
        print(f'Saved usage data for meter(s)')    

# Gets and saves metering point electricity charges data as a csv file
def get_charges_data(meter_ids):
    print('Starting to save charges data...')
    # Prepare csv file for writing
    with open('eloverblik_charges_data.csv', 'w', newline='') as csvfile:
        fieldnames = ['meter_id', 'chargetype', 'name', 'description', 'owner', 'validfromdate', 'validtodate', 'periodtype', 'position', 'price', 'quantity']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for meter_id in meter_ids:
            print(f'Getting and saving charges data for meter id {meter_id}...')
            meter_json = {
                "meteringPoints": {
                    "meteringPoint": [
                        meter_id
                    ]
                }
            }
            charges_data_endpoint = 'meteringpoints/meteringpoint/getcharges'
            get_meter_charges_data = get_endpoint(charges_data_endpoint, meter_json)
            for result in get_meter_charges_data['result']:
                for item in result['result']['fees']:
                    chargetype = 'fee'
                    subscription_row = {
                        'meter_id': meter_id,
                        'chargetype': chargetype,
                        'name': item['name'],
                        'description': item['description'],
                        'owner': item['owner'],
                        'validfromdate': item['validFromDate'],
                        'validtodate': item['validToDate'],
                        'periodtype': item['periodType'],
                        'position': '',
                        'price': str(item['price']).replace('.',','),
                        'quantity': item['quantity']
                    }
                    writer.writerow(subscription_row)
                for item in result['result']['subscriptions']:
                    chargetype = 'subscription'
                    subscription_row = {
                        'meter_id': meter_id,
                        'chargetype': chargetype,
                        'name': item['name'],
                        'description': item['description'],
                        'owner': item['owner'],
                        'validfromdate': item['validFromDate'],
                        'validtodate': item['validToDate'],
                        'periodtype': item['periodType'],
                        'position': '',
                        'price': str(item['price']).replace('.',','),
                        'quantity': item['quantity']
                    }
                    writer.writerow(subscription_row)
                for item in result['result']['tariffs']:
                    chargetype = 'tariff'
                    name = item['name']
                    description = item['description']
                    owner = item['owner']
                    validfromdate = item['validFromDate']
                    validtodate = item['validToDate']
                    periodtype = item['periodType']
                    tariff_rows = [
                        {
                            'meter_id': meter_id,
                            'chargetype': chargetype,
                            'name': name,
                            'description': description,
                            'owner': owner,
                            'validfromdate': validfromdate,
                            'validtodate': validtodate,
                            'periodtype': periodtype,
                            'position': point['position'],
                            'price': str(point['price']).replace('.',','),
                            'quantity': ''
                        }
                        for point in item['prices']
                    ]
                    writer.writerows(tariff_rows)
            print(f'Saved charges data for meter {meter_id}')
        print(f'Saved charges data for meter(s)')      

# Main program logic
def main():
    # Define and load parser arguments
    parser = argparse.ArgumentParser(description='Get data on electricity usage from eloverblik.dk')
    parser.add_argument('-m', '--mode', help='Mode: List meters or get data ', type=str, choices=['list', 'get'], required=True)
    parser.add_argument('-n', '--meterid', help='Get data from this specific meter in get mode', type=str)
    parser.add_argument('-a', '--aggregation', help='Get timeseries data with this aggregation in get mode', choices=['Actual', 'Quarter', 'Hour', 'Day', 'Month', 'Year'], default='Actual', type=str)
    parser.add_argument('-f', '--fromdate', help='Get data from this date in get mode, format yyyy-mm-dd', type=str)
    parser.add_argument('-t', '--todate', help='Get data to and including this date in get mode, format yyyy-mm-dd', type=str)
    parser.add_argument('-d', '--deletetoken', help='Delete existing token file', action='store_true')
    parser.add_argument('-r', '--refreshdatatoken', help='Force refresh of data access token by deleting token file', action='store_true')
    args = parser.parse_args()

    # Delete token file if set as argument
    if args.deletetoken:
        print('Deleting existing token file if it exists')
        os.remove(token_filename)

    # Delete data token file if set as argument
    if args.refreshdatatoken:
        print('Deleting existing data access token file if it exists')
        os.remove(data_access_token_filename)

    # Load or save token
    if not exists(token_filename):
        print('No token from eloverblik.dk saved. Paste your token here.')
        token = str(input('Token: '))
        with open(token_filename, 'wb') as token_file:
            pickle.dump(token, token_file)
    else:
        with open(token_filename, 'rb') as token_file:
            token = pickle.load(token_file)
  
    # If mode is list meters, get a list of meters
    if args.mode == 'list':
        print('Listing available meters...')
        # Get data access token
        get_or_set_data_access_token(token)
        # List meters
        list_meters()
    # If mode is get data, get data
    elif args.mode == 'get':
        # Date argument validation
        if args.fromdate and not args.todate or args.todate and not args.fromdate:
            sys.exit('Error: You must specify both a from date and a to date. Exiting.')
        try:
            from_date = datetime.strptime(args.fromdate, '%Y-%m-%d').date()
            to_date = datetime.strptime(args.todate, '%Y-%m-%d').date()
            if from_date > to_date:
                sys.exit('Error: Your from date cannot be after your to date. Exiting.')
            elif from_date == to_date:
                sys.exit('Error: Your from date cannot be the same as your to date. Exiting.')
            elif from_date > today:
                sys.exit('Error: Your from date cannot be after today. Exiting.')
            elif to_date > today + timedelta(days=1):
                sys.exit('Error: Your to date cannot be later than one day after today. Exiting.')
        except ValueError:
            sys.exit('Error: From or to date in invalid format. Format must be yyyy-mm-dd with no quotes. Exiting.')

        # Periods must be a maximum of 730 days, so longer periods are sliced into smaller pieces
        if to_date > from_date + timedelta(days=730):
            periods = []
            start_of_period = from_date
            slice_finished = False
            while slice_finished == False:
                end_of_period = start_of_period + timedelta(days=730)
                if end_of_period <= to_date:
                    periods.append([datetime.strftime(start_of_period, '%Y-%m-%d'), datetime.strftime(end_of_period, '%Y-%m-%d')])
                    start_of_period = end_of_period + timedelta(days=1)
                else:
                    end_of_period = to_date
                    periods.append([datetime.strftime(start_of_period, '%Y-%m-%d'), datetime.strftime(end_of_period, '%Y-%m-%d')])
                    slice_finished = True
        # Smaller periods are saved as a list in a list to use the same for loop later
        else:
            periods = [[args.fromdate, args.todate]]

        print('Getting data...')

        # Get data access token
        get_or_set_data_access_token(token)

        # Specifik meter id is set by user
        if args.meterid:
            meter_ids = [args.meterid]
        # Meter id argument is not set, so list of meters is fetched and listed
        else:
            # Get ids of meters
            print('Getting list of meters...')
            get_metering_points = get_endpoint('meteringpoints/meteringpoints')
            print(f'Found {len(get_metering_points["result"])} meters')
            meter_ids = [meter['meteringPointId'] for meter in get_metering_points['result']]

        if meter_ids:
            # Get data from meters
            print('Getting and saving usage and charges data for meter(s)...')
            # Get usage data
            get_usage_data(meter_ids, args, periods)
            # Get charges data
            get_charges_data(meter_ids)
            # Print status
            print('Saved usage and charges data for meter(s)')
        else:
            sys.exit('Error: Did not find any meters, so no data to fetch. Exiting.')

if __name__ == '__main__':
    main()

Høst domænenavne med certifikatlogs

I På jagt efter danske domænenavne skrev jeg om en smart metode, jeg har fundet til at finde de .dk-domænenavne, som autoriteterne ikke ville dele med mig.

Måske er der andre end mig, der er interesseret i at holde lidt øje med nye steder på internettet?

Her er i hvert fald en lille opskrift på, hvordan man gør:

1. Hent Axeman

Axeman er et program, der hjælper med at automatisere og parallellisere downloads af certifikatlogs. Jeg måtte rette det en lille smule for at få det til at køre, da det vist ikke vedligeholdes aktivt. “Min” udgave finder du her:

https://github.com/helmstedt/Axeman

2. Download en masse logs

Axeman gemmer certifikatlogs som csv-filer. Skriv fx…

axeman -u 'ct.googleapis.com/logs/argon2022'

…for at hente Googles 2022-logs. Det gik ikke specielt hurtigt hos mig, men det virkede.

3. Gennemgå log-filer for .dk-domæner

Jeg skrev et lille program, der søger logfiler igennem for .dk-domæner (der kommer også nogle andre domæner med engang imellem, hvis der er “.dk” et sted i domænet). Det ser sådan her ud:

import os
from pathlib import Path
import csv
 
csv_dir = 'PATH_TO_LOG_FILES'
paths = Path(csv_dir).iterdir()

for file_path in paths:
    # Open and process csv file
    with open(file_path, 'rt') as csv_input:
        print('Processing: ', file_path)
        reader = csv.reader(csv_input)
        for row in reader:
            host = row[4]
            if '.dk' in host:
                print('Found: ', host)
                with open('PATH_TO_OUTPUT_FILE.CSV', 'a') as dotdk_output:
                    dotdk_output.write(host + '\n')
    # Delete csv file after processing
    os.remove(file_path)

Det eneste lidt obskure i programmet er måske hvad der gemmer sig i row[4]? Den indeholder en liste over de domæner og subdomæner, det enkelte certifikat er udstedt til. Elementerne i listen er adskilt med mellemrum.

4. Filtrer listen, så der kun kommer ægte, unikke .dk-domæner med

Efter grovsorteringen, skrev jeg endnu et lille program, der finsorterer. Det ser sådan her ud:

domains_in_file = set()
with open('PATH_TO_INPUT_FILE.CSV', 'rt') as file_input:
    for index, row in enumerate(file_input):
        items = row.split(' ')
        for item in items:
            item = item.replace('*.','').replace('\n','')
            matches = re.findall(r'([^.]*$)', item)
            dk_domain = ''
            if matches[0] == 'dk':
                dk_domain = item.split('.')
                dk_domain = dk_domain[-2] + '.' + dk_domain[-1]
                dk_domain = dk_domain.lower()
    with open('PATH_TO_OUTPUT_FILE.BIN', 'wb') as unique_domains_file:
        pickle.dump(domains_in_file, unique_domains_file)

Her er koden nok lidt mere obskur. Jeg tager hvert element i hver række fra CSV-filen i sidste trin og:

  • Fjerner evt. wildcard (*.) og linjeskift (\n) fra værtsnavnet
  • Finder domænesuffixet med et regulær udtryk
  • Hvis domænesuffixet er .dk, splitter jeg hvert enkelt element i værstnavnet op i en liste
  • Konstruerer selve domænenavnet ved at sætte det næstsidste (fx helmstedt) og sidste element (.dk) sammen
  • Sørger for en sikkerheds skyld for, at konvertere domænenavnet til små bogstaver

5. Nyd dit kendskab til nye og gamle domænenavne (der har fået udstedt SSL-certifikater)

2-0 til wishlist.dk i kampen mod techgiganten Ønskeskyen

På min ukommercielle, overvågnings- og loginfrie ønskeseddelservice wishlist.dk, tilbyder jeg brugere at importere deres ønskesedler direkte fra min største konkurrent, den overvågningskapitalistiske monopolistiske techgigant Ønskeskyen.

Hvis du tror jeg overdriver det med overvågningen, så se lige dette lille bitte udsnit af vejen fra at klikke på et link til en såkaldt “Airfryer” hos Ønskeskyen og komme hen hos Elgiganten og kunne købe den… GDPR much?

Med wishlist.dks gigantiske scale-up-unicorn-succes-vækst vidste jeg, at det kun var et spørgsmål om tid, før Ønskeskyen ville begynde at bruge ufine metoder for at hindre den frie konkurrence (og brugernes ret til dataportabilitet):

Et let aflæseligt diagram over den eksplosive udvikling i brugere på wishlist.dk over tid.

Pludselig holdt min import-funktion op med at virke!

For et par dage siden holdt min import-funktion fra Ønskeskyen op med at virke.

Jeg undersøgte straks min kode og fyrede op for mit lokale testmiljø på min hjemmecomputer. Måske havde Ønskeskyen ændret på sit interne API, eller også havde jeg bare skrevet noget dårligt kode?

Alt fungerede præcis som det skulle i testmiljøet.

Så forsøgte jeg at sende en forespørgsel til ønskeskyen.dk fra min ydmyge start-up-webserver, der er placeret i Tyskland et sted. Forbindelsen timede ud.

Der var altså ingen kontakt overhovedet mellem min maskine og den maskine hos Ønskeskyen, hvis data brugerne af wishlist.dk forsøgte at rekvirere, for at undslippe overvågningskapitalismens skarpe kløer!

Har Ønskeskyen blokeret wishlist.dk?

Da jeg havde konstateret, at jeg slet ingen kontakt havde til Ønskeskyens server, gik det op for mig:

Måske føler Ønskeskyen sig truet og har blokeret for, at deres brugere emigrerer til wishlist.dk?

Jeg kontaktede Ønskeskyen:

Ønskeskyen føler sig slet ikke truet af wishlist.dks eksplosive vækst og roser endda min “fine” hjemmeside.

Ud over at blive glad for det hjertevarme “rigtig god dag”, følte jeg mig betrygget af mailen fra Ønskeskyens flinke supportperson.

Ønskeskyen er da alt for store og ædle til at blokere for lille iværksætter-wishlist.dk…

Plottet tykner!

Eller er de?

Med min nyfundte optimisme og tro på den fri konkurrence sendte jeg en opfølgende mail til Ønskeskyen. Jeg spurgte, om de ville undersøge deres teknik for fejl? Hvis de ikke havde blokeret wishlist.dk, var der måske noget i deres ende, der ikke fungerede som det skulle?

Når man har ambitioner om at erobre hele verden dur det jo ikke, at man ved et uheld kommer til at lukke for forespørgsler fra en server placeret hos Danmarks vigtigste handelspartner:

THE LARGEST WISH LIST APP IN THE WORLD!!!111!

Her er min opfølgende mail til Ønskeskyen:

Jeg har gjort et menneske fortræd ved at beskrive forretningsmodellen for personens arbejdsplads.

Mailen var nedtrykkende:

Desværre kan Ønskeskyens teknikere ikke hjælpe med mit problem, da de, (ligesom jeg selv), har travlt med at bygge en verdensdominerende ønskeseddelservice og slet ikke har tid til at beskæftige sig med negative vibes fra konkurrenter.

Æv! Og så fik jeg oven i købet gjort supporteren ked af det med min kritik.

Der er noget i tonen i svaret fra Ønskeskyen, der alligevel får mig til at konkludere:

Ønskeskyen har blokeret wishlist.dk!

Hvad gjorde jeg så?

Jeg spurgte de sociale medier om råd, og fik anbefalet noget, der hedder Tor, der gør det muligt at hente indhold fra internettet anonymt ved at sende sine forespørgsler gennem andre menneskers computere.

At installere Tor på min server tog ca. 10 sekunder, og at tilføjere et par linjers kode til mit Python-script, der importerer ønsker fra Ønskeskyen, tog ikke meget længere.

Nu virker importfunktionen igen.

2-0 til wishlist.dk. (Det første point er for at lave en meget bedre ønskeseddelservice.)

Hvad bliver Ønskeskyens næste træk?

På jagt efter danske domænenavne

For lang tid siden fik jeg fat i domænet wishlist.dk til min ønskeseddelservice ved at klage over, at domænet så ud til at være registreret alene med henblik på at sælge det videre. Det må man nemlig ikke gøre med .dk-domæner.

Noget tid efter blev jeg kontaktet af en, der hedder Jakob, der havde læst mit indlæg og havde brugt samme metode til at få fat i et domænenavn, han havde drømt om.

Et nyt dataprojekt

Den oplevelse gav mig den idé, at jeg kunne tjene det godes sag, hvis jeg på en eller anden måde kunne lave en liste over domæner registreret med henblik på videresalg og offentliggøre listen. Jeg tænkte, jeg kunne:

  • Hente en liste over danske domænenavne
  • Lave en robot, der henter noget data om hvert domænenavn og måske tager et skærmbillede af siden
  • Kategorisere et par tusind domæner efter hvad de i mine øjne bliver brugt til
  • Bruge noget smart maskinlærings-AI-hokus-pokus til at kategorisere de resterende domænenavne

Men, men, men:

Ingen ville hjælpe med en domænenavnsliste

Jeg troede, at første trin i min plan om at finde en liste over .dk-domænenavne ville være det nemmeste. Jeg vidste, at jeg kan slå oplysninger op om domæner hos DK Hostmaster, der administrerer .dk-domæner (domænerne er ejet af staten). Og jeg havde også en anelse om, at der i Domæneloven står noget i retning af:

WHOIS-databasen
§ 18. Administrator skal oprette og vedligeholde en database indeholdende oplysninger om registranternes navn, adresse og telefonnummer.
Stk. 2. Administrator skal sikre, at oplysningerne nævnt i stk. 1 er retvisende, opdaterede og offentligt tilgængelige.

Jeg skrev til DK Hostmaster og fik nej. Jeg skrev til Dansk Internet Forum og fik intet svar. Jeg spurgte Klagenævnet for Domænenavne om jeg kunne klage over DK Hostmaster og fik nej. Så prøvede jeg at få aktindsigt i oplysningerne:

Det gik heller ikke så godt.

De oplysninger, jeg troede var offentlige, bliver åbenbart holdt tæt ind til kroppen. For at beskytte mig (og dig) mod spam!(?)

Efter afslaget klagede jeg til Erhvervsstyrelsen (nej!), forsøgte med aktindsigt hos Det Kongelige Bibliotek, som jeg fandt ud af var i besiddelse af listen (nej!), og skrev også til et par legitime domæneregistrationssælgere, som får listen af DK Hostmaster, om de var indstillet på at dele (nej!).

Jeg havde været ihærdig, men spildt så meget af min egen og andres tid, at jeg besluttede mig for at ændre kurs.

På datahøst

Det gik op for mig, at jeg ikke behøvede at kende alle .dk-domæner til mit hobbyprojekt. Nogle hundrede tusinde eller en million ville sikkert være rigeligt (der er registreret ca. 1,4 millioner i skrivende stund).

  • Jeg fandt en lang liste over danske ord og lavede en lille robot til at slå ordene op som domæner hos DK Hostmaster.
  • Jeg fandt en lignende liste med de mest brugte engelske ord og slog ordene op.
  • Jeg søgte på ord fra ordlisterne hos en kendt søgemaskine og fik søgeresultater tilbage med danske domænenavne.
  • Jeg fik API-adgang til CVR-registeret (som sjovt nok er rigtigt offentligt) og hentede domænenavne for alle danske virksomheder.

Disse metoder gav mig de første ca. 350.000 .dk-domæner og gav mig lejlighed til at skrive en masse små Python-scripts til at automatisere det meste.

Guldminen

Så fandt jeg guldminen. Apple, Google, Facebook, Cloudflare og andre hæderkronede virksomheder har et lidt andet syn på sikkerhed end DK Hostmaster, der jo gerne vil hemmeligholde danske domænenavne for at forhindre spam.

For at bekæmpe snyd med certifikater til sikker kommunikation på nettet (hængelåsen i browseren, du ved), logger de udstedelsen af certikater og har indtil videre logget lige under 8,5 milliard certifikater. I stedet for hemmeligholdelse: transparens.

Aha! Så når en ejer af et .dk-domæne får udstedt et certifikat til sin fine hjemmeside, bliver udstedelsen logget.

Jeg satte et smart program, der hedder Axeman, til at begynde at downloade logs, filtrerede for .dk-domænenavne og begyndte at tilføje dem til min database.

Det går langsomt, men det giver resultater.

https://wallnot.dk/dotdk/ har jeg nu samlet 639.485 .dk-domænenavne til fri download og videredistribution. Og der er mange, mange flere på vej.

Tag den!

…Men hvad med projektet?

Jeg har faktisk fået kategoriseret nogle domænenavne efter hvordan jeg vurderer, de bliver brugt. Og taget en masse skærmbilleder, som jeg håber jeg kan bruge noget maskinlæring på. Men listen over .dk-domæner, jeg mener er registreret med henblik på videresalg, har lange udsigter.

Til gengæld håber jeg at listen over .dk-domæner i sig selv kan bruges af andre til et eller andet. Vi får se.

Udvikling i portefølje og på konti hos Nordnet

Opdatering 18/02/2023: Nordnet ændrer tit på deres ting. På https://github.com/helmstedt/nordnet-utilities forsøger jeg at følge med, så hent gerne din kode der, hvis koden neden for ikke virker længere.

I Nordnets porteføljerapport og kontooversigter, kan man se nogle flotte grafer over udviklingen i ens portefølje og/eller konti.

Et eksempel på porteføljeudvikling hos Nordnet. I dette tilfælde en nedadgående graf.

Jeg blev spurgt, om jeg ikke ville hjælpe med, hvordan man kan hive den slags ud af Nordnet til eget brug. Det er lidt nemmere til et hurtigt overblik, end hvis man skal hive alle sine transaktioner og kurser ud i Excel og tilrettelægge data der.

Her er nogle eksempler på mulighederne. Du kan også finde eksemplerne på GitHub:

# This program provides two examples of logging into a Nordnet account
# and extracting account performance as json data. One is based on standard
# intervals. The other is based on a user-defined interval.
# Storing and processing of returned data is left to you.
import requests
from nordnet_configuration import accounts
from nordnet_login import nordnet_login

session = requests.Session()
session = nordnet_login(session)
accounts_list = [value for value in accounts.values()]

### Nordnet standard intervals (one month, three months, six months, ytd, 1 year, 3 years and 5 years)
accounts_string = ','.join(accounts_list)
url = 'https://www.nordnet.dk/api/2/accounts/' + accounts_string + '/returns/performance'
period_options = ['m1','m3','m6','ty','y1','y3','y5']

standard_graph_data = {}
for period in period_options:
    params = {
        'period': period,
        'start_at_zero': False
    }
    graph = session.get(url, params=params)
    standard_graph_data[period] = graph.json()
# Store and process graph_data as needed

### User defined date intervals
start_date = '2019-01-30'   # Edit as needed
end_date = '2019-05-14'    # Edit as needed
user_defined_graph_data = {}
for account in accounts_list:
    url = 'https://www.nordnet.dk/api/2/accounts/' + account + '/returns/performance'
    params = {
        'from': start_date,
        'to': end_date
    }
    user_defined_graph = session.get(url, params=params)
    user_defined_graph_data[account] = user_defined_graph.json()
# Store and process user_defined_graph_data as needed

Digital Post fra mit.dk til din e-mail

https://github.com/helmstedt/digitalpost-utilities er jeg gået i luften med et program, der gør det muligt for dig at slippe for at logge ind på mit.dk hver gang du har fået ny Digital Post.

Jeg brugte https://github.com/dk/Net-MitDK til at forstå metodikken og Fiddler til at overvåge trafikken til og fra https://mit.dk og aflure sidens API.

De to hovedkomponenter i programmet er a) et program til at gennemføre første login på mit.dk i en browser med NemID/MitID og b) et program til at forny adgangstokens til siden, forespørge API’et om ny post og sende e-mails af sted.

Program til at gennemføre første login på mit.dk i en browser med NemID/MitId

# Logs in to mit.dk og saves tokens needed for further requests.
# Method from https://github.com/dk/Net-MitDK/. Thank you.
from seleniumwire import webdriver
import requests
from bs4 import BeautifulSoup
import http.cookies
import gzip
import json 
import base64
from hashlib import sha256
import string
import secrets
from mit_dk_configuration import tokens_filename

def random_string(size):        
    letters = string.ascii_lowercase+string.ascii_uppercase+string.digits+string.punctuation+string.whitespace           
    random_string = ''.join(secrets.choice(letters) for i in range(size))
    encoded_string = random_string.encode(encoding="ascii")
    url_safe_string = base64.urlsafe_b64encode(encoded_string).decode()
    url_safe_string_no_padding = url_safe_string.replace('=','')
    return url_safe_string_no_padding

def save_tokens(response):
    with open(tokens_filename, "wt", encoding="utf8") as token_file:
        token_file.write(response)

state = random_string(23)
nonce = random_string(93)
code_verifier = random_string(93)
code_challenge = base64.urlsafe_b64encode(sha256(code_verifier.encode('ascii')).digest()).decode().replace('=','')
 
login_url = 'https://gateway.mit.dk/view/client/authorization/login?client_id=view-client-id-mobile-prod-1-id&response_type=code&scope=openid&state=' + state + '&code_challenge=' + code_challenge + '&code_challenge_method=S256&response_mode=query&nonce=' + nonce + '&redirect_uri=com.netcompany.mitdk://nem-callback&deviceName=digitalpost-utilities&deviceId=pc&lang=en_US' 

options = webdriver.ChromeOptions()
options.add_argument("--log-level=3")
driver = webdriver.Chrome(chrome_options=options)
login = driver.get(login_url)

print("Opening browser window. Log in to mit.dk using MitID or NemID in the browser.")
print("When you see a blank page in your browser at https://nemlog-in.mitid.dk/LoginOption.aspx, you're finished.")
input("Press ENTER once you're finished.")

session = requests.Session()

for request in driver.requests:
    session.cookies.set('cookiecheck', 'Test', domain='nemlog-in.mitid.dk')
    session.cookies.set('loginMethod', 'noeglekort', domain='nemlog-in.mitid.dk')
    for request in driver.requests:
        if '/api/mailboxes' in request.url and request.method == 'GET' and request.response.status_code == 200:
            cookies = request.headers['Cookie'].split("; ")
            for cookie in cookies:
                if 'LoggedInBorgerDk' in cookie or 'CorrelationId' in cookie:
                    key_value = cookie.split('=')
                    session.cookies.set(key_value[0], key_value[1], domain='.post.borger.dk')
        if request.response:
            headers_string = str(request.response.headers)
            headers_list = headers_string.split('\n')
            for header in headers_list:
                if 'set-cookie' in header:
                    cookie_string = header.replace('set-cookie: ','')
                    cookie = http.cookies.BaseCookie(cookie_string)
                    for key in cookie.keys():
                        # Requests is picky about dashes in cookie expiration dates. Fix.
                        if 'expires' in cookie[key]:
                            expiry = cookie[key]['expires']
                            if expiry:
                                expiry_list = list(expiry)
                                expiry_list[7] = '-'
                                expiry_list[11] = '-'
                                cookie[key]['expires'] = ''.join(expiry_list)
                    session.cookies.update(cookie)

        if request.method == 'POST' and request.url == 'https://nemlog-in.mitid.dk/LoginOption.aspx' and request.response.status_code == 200:
            if request.response.headers['content-encoding'] == 'gzip':
                response = gzip.decompress(request.response.body).decode()
            else:
                response = request.response.body.decode()
            soup = BeautifulSoup(response, "html.parser")
            input = soup.find_all('input', {"name":"SAMLResponse"})
            samlresponse = input[0]["value"]

driver.close()

request_code_part_one = session.post('https://gateway.digitalpost.dk/auth/s9/nemlogin/ssoack', data={'SAMLResponse': samlresponse}, allow_redirects=False)
request_code_part_one_redirect_location = request_code_part_one.headers['Location']
request_code_part_two = session.get(request_code_part_one_redirect_location, allow_redirects=False)
request_code_part_two_redirect_location = request_code_part_two.headers['Location']
request_code_part_three = session.get(request_code_part_two_redirect_location, allow_redirects=False)
request_code_part_three_redirect_location = request_code_part_three.headers['Location']
code_start = request_code_part_three_redirect_location.index('code=') + 5
code_end = request_code_part_three_redirect_location.index('&', code_start)
code = request_code_part_three_redirect_location[code_start:code_end]
redirect_url = 'com.netcompany.mitdk://nem-callback'
token_url = 'https://gateway.mit.dk/view/client/authorization/token?grant_type=authorization_code&redirect_uri=' + redirect_url + '&client_id=view-client-id-mobile-prod-1-id&code=' + code + '&code_verifier=' + code_verifier
request_tokens = session.post(token_url)
save_tokens(request_tokens.text)    
print('Login to mit.dk went fine.')
print(f'Tokens saved to {tokens_filename}.')

Program til at forny adgangstokens til siden, forespørge API’et om ny post og sende e-mails af sted

# Sends unread messages from mit.dk to an e-mail.
import requests
import json 
import smtplib										# Sending e-mails
from email.mime.multipart import MIMEMultipart		# Creating multipart e-mails
from email.mime.text import MIMEText				# Attaching text to e-mails
from email.mime.application import MIMEApplication	# Attaching files to e-mails
from email.utils import formataddr					# Used for correct encoding of senders with special characters in name (e.g. Københavns Kommune)
from mit_dk_configuration import email_data, tokens_filename

base_url = 'https://gateway.mit.dk/view/client/'
session = requests.Session()

def open_tokens():
    try:
        with open(tokens_filename, "r", encoding="utf8") as token_file:
            tokens = json.load(token_file)
            return tokens
    except:
        return print('Unable to open and parse token file. Did you run mit_dk_first_login.py?')
    
def revoke_old_tokens(mitdkToken, ngdpToken, dppRefreshToken, ngdpRefreshToken):
    endpoint = 'authorization/revoke?client_id=view-client-id-mobile-prod-1-id'
    json_data = {
        'dpp': {
            'token': mitdkToken,
            'token_type_hint': 'access_token'
        },
        'ngdp': {
            'token': ngdpToken,
            'token_type_hint': 'access_token'
        },
    }
    revoke_access_tokens = session.post(base_url + endpoint, json=json_data)
    if not revoke_access_tokens.status_code == 200:
        print("Something went wrong when trying to revoke old access tokens. Here is the response:")
        print(revoke_access_tokens.text)
    json_data = {
        'dpp': {
            'refresh_token': dppRefreshToken,
            'token_type_hint': 'refresh_token'
        },
        'ngdp': {
            'refresh_token': ngdpRefreshToken,
            'token_type_hint': 'refresh_token'
        },
    }        
    revoke_refresh_tokens = session.post(base_url + endpoint, json=json_data)
    if not revoke_refresh_tokens.status_code == 200:
        print("Something went wrong when trying to revoke old refresh tokens. Here is the response:")
        print(revoke_refresh_tokens.text)


def refresh_and_save_tokens(dppRefreshToken, ngdpRefreshToken):
    endpoint = 'authorization/refresh?client_id=view-client-id-mobile-prod-1-id'
    json_data = {
        'dppRefreshToken': dppRefreshToken,
        'ngdpRefreshToken': ngdpRefreshToken,
    }
    refresh = session.post(base_url + endpoint, json=json_data)    
    if not refresh.status_code == 200:
        print("Something went wrong trying to fetch new tokens.")
    refresh_json = refresh.json()
    if 'code' in refresh_json:
        print("Something went wrong trying to fetch new tokens. Here's the response:")
        print(refresh_json)
        return False
    else:
        with open(tokens_filename, "wt", encoding="utf8") as token_file:
            token_file.write(refresh.text)
        return refresh_json
        
def get_fresh_tokens_and_revoke_old_tokens():
    tokens = open_tokens()
    try:
        if 'dpp' in tokens:
            dppRefreshToken = tokens['dpp']['refresh_token']
            mitdkToken = tokens['dpp']['access_token']
        else:
            dppRefreshToken = tokens['refresh_token']
            mitdkToken = tokens['access_token']
        ngdpRefreshToken = tokens['ngdp']['refresh_token']
        ngdpToken = tokens['ngdp']['access_token']
        fresh_tokens = refresh_and_save_tokens(dppRefreshToken, ngdpRefreshToken)
        if fresh_tokens:
            revoke_old_tokens(mitdkToken, ngdpToken, dppRefreshToken, ngdpRefreshToken)
        return fresh_tokens
    except Exception as error:
        print(error)
        print('Unable to find tokens in token file. Try running mit_dk_first_login.py again.')
    
def get_simple_endpoint(endpoint):
    response = session.get(base_url + endpoint)
    return response.json()

def get_inbox_folders_and_build_query(mailbox_ids):
    endpoint = 'folders/query'
    json_data = {
        'mailboxes': {}
    }
    for mailbox in mailbox_ids:
        json_data['mailboxes'][mailbox['dataSource']] = mailbox['mailboxId']
    response = session.post(base_url + endpoint, json=json_data)    
    try:
        response_json = response.json()
    except:
        print('Unable to convert response to json. Here is the response:')
        print(response.text)
    folders = []
    for folder in response_json['folders']['INBOX']:
        folder_info = {
            'dataSource': folder['dataSource'],
            'foldersId': [folder['id']],
            'mailboxId': folder['mailboxId'],
            'startIndex': 0
        }
        folders.append(folder_info)
    return folders

def get_messages(folders):
    endpoint = 'messages/query'
    json_data = {
        'any': [],
        'folders': folders,
        'size': 20,
        'sortFields': ['receivedDateTime:DESC']
    }
    response = session.post(base_url + endpoint, json=json_data)    
    return response.json()   

def get_content(message):
    content = []
    endpoint = message['dataSource'] + '/mailboxes/' + message['mailboxId'] + '/messages/' + message['id']
    for document in message['documents']:
        doc_url = '/documents/' + document['id']
        for file in document['files']:
            encoding_format = file['encodingFormat']
            file_name = file['filename']
            file_url = '/files/' + file['id'] + '/content'
            file_content = session.get(base_url + endpoint + doc_url + file_url)
            content.append({
                'file_name': file_name,
                'encoding_format': encoding_format,
                'file_content': file_content
            })
    return content

def mark_as_read(message):
    endpoint = message['dataSource'] + '/mailboxes/' + message['mailboxId'] + '/messages/' + message['id']
    session.headers['If-Match'] = str(message['version'])
    json_data = {
        'read': True
    }
    mark_as_read = session.patch(base_url + endpoint, json=json_data)

mailserver_connect = False            
tokens = get_fresh_tokens_and_revoke_old_tokens()
if tokens:
    session.headers['mitdkToken'] = tokens['dpp']['access_token']
    session.headers['ngdpToken'] = tokens['ngdp']['access_token']
    session.headers['platform'] = 'web'
    mailboxes = get_simple_endpoint('mailboxes')
    mailbox_ids = []
    for mailboxes in mailboxes['groupedMailboxes']:
        for mailbox in mailboxes['mailboxes']:
            mailbox_info = {
                'dataSource': mailbox['dataSource'],
                'mailboxId': mailbox['id']
            }
            mailbox_ids.append(mailbox_info)
    folders = get_inbox_folders_and_build_query(mailbox_ids)
    messages = get_messages(folders)
    for message in messages['results']:
        if message['read'] == False:
            if mailserver_connect == False:
                server = smtplib.SMTP(email_data['emailserver'], email_data['emailserverport'])
                server.ehlo()
                server.starttls()
                server.login(email_data['emailusername'], email_data['emailpassword'])
                mailserver_connect  = True               
            label = message['label']
            sender = message['sender']['label']
            message_content = get_content(message)

            msg = MIMEMultipart('alternative')
            msg['From'] = formataddr((sender, email_data['emailfrom']))
            msg['To'] = email_data['emailto']
            msg['Subject'] = "mit.dk: " + label

            for content in message_content:
                if content['encoding_format'] == 'text/plain':
                    body = content['file_content'].text
                    msg.attach(MIMEText(body, 'plain')) 
                    part = MIMEApplication(content['file_content'].content)
                    part.add_header('Content-Disposition', 'attachment', filename=content['file_name'])
                    msg.attach(part) 
                elif content['encoding_format'] == 'text/html':
                    body = content['file_content'].text
                    msg.attach(MIMEText(body, 'html'))
                    part = MIMEApplication(content['file_content'].content)
                    part.add_header('Content-Disposition', 'attachment', filename=content['file_name'])
                    msg.attach(part) 
                elif content['encoding_format'] == 'application/pdf':   
                    part = MIMEApplication(content['file_content'].content)
                    part.add_header('Content-Disposition', 'attachment', filename=content['file_name'])
                    msg.attach(part)
                else:    
                    encoding_format = content['encoding_format']
                    print(f'Ny filtype {encoding_format}')
                    part = MIMEApplication(content['file_content'].content)
                    part.add_header('Content-Disposition', 'attachment', filename=content['file_name'])
                    msg.attach(part)
            print(f'Sender en mail fra mit.dk fra {sender} med emnet {label}')
            server.sendmail(email_data['emailfrom'], email_data['emailto'], msg.as_string())
            mark_as_read(message)
    if mailserver_connect:
        server.quit()

Besked når der åbnes for reservationer på [hypet restaurant i København]

En restaurant, jeg gerne vil prøve, er fuldstændig booket op og har endnu ikke åbnet op for reservationer i april. Hvordan kan jeg komme først til fadet?

Jeg besøgte reservationssystemet og iagttog hvordan det interne API spurgte om ledige borde.

Dette billede viser forespørgslen:

Forespørgslen om ledige borde til 2 personer

Dette billede viser svaret fra API’et:

Svar fra API’et: Ingen ledige borde (hvilket billedet også viser)

For april så svaret sådan her ud:

Hverken ledige eller optagede borde i april.

API’et svarer altså med en tom ‘data’-nøgle, når der ikke er åbnet for reservationer endnu.

Jeg skrev et lille program, som jeg har sat til at køre hvert 5. minut, for at tjekke om jeg kan komme til at reservere. Programmet tjekker, om der er kommet noget indhold i ‘data’-nøglen i svaret fra API’et. Hvis der er, sender det mig en besked om, at jeg godt kan komme i gang med at reservere bord.

Mit lille program, der gerne skulle give mig en fordel i kampen.

Næste skridt kunne være at udvide programmet, sådan det også reserverer bordet for mig. Men i første omgang prøver jeg at gøre den del af arbejdet selv.

Folkets wallnot.dk

En bruger på wallnot.dk skrev til mig og foreslog at lade brugerne på siden vurdere kvaliteten af de artikler, siden linker til, ligesom på fx Hacker News. Idéen er at gode artikler så kan ligge øverst, mens metervaren synker ned i bunden – hvis altså folket har forstand på at vurdere den slags.

Jeg forsøgte at lave en sådan løsning, og den kan du nu prøve af.

Dynamik med JavaScript

For at gøre det helt smart og dynamisk, havde jeg brug for noget JavaScript, der kan fyre en stemme af sted, så snart en bruger klikker på ▲ eller ▼.

Jeg er ikke helt ferm til JavaScript, men jeg begynder at forstå det, og med god hjælp og lidt copy/paste fra forskellige kilder, landede jeg til sidst på noget kode, der ser ud til at virke.

Den første del henter en såkaldt CRSF-cookie, der sørger for, at man er nødt til at besøge Wallnot, inden man kan stemme på artikler, og at man ikke kan stemme på vegne af andre fra andre hjemmesider.

Den anden del sender en forespørgsel af sted med cookie-værdien og selve stemmen og opdaterer stemmeantallet på siden, når forespørgslen er behandlet.

function getCookie(name) {
	let cookieValue = null;
	if (document.cookie && document.cookie !== '') {
		const cookies = document.cookie.split(';');
		for (let i = 0; i < cookies.length; i++) {
			const cookie = cookies[i].trim();
			if (cookie.substring(0, name.length + 1) === (name + '=')) {
				cookieValue = decodeURIComponent(cookie.substring(name.length + 1));
				break;
			}
		}
	}
	return cookieValue;
}
const csrftoken = getCookie('csrftoken');

document.querySelectorAll('.vote').forEach(function(el){
  el.addEventListener('click', function() {
		article_votes_id = this.id.substring(0, this.id.indexOf('_')) + '_votes';
		votes_to_replace = document.getElementById(article_votes_id)
		fetch('/process_vote', {
			method: "POST",
			headers: {
				"X-CSRFToken": csrftoken,
			},
			body: JSON.stringify({
				vote: this.id
			})
		}).then(function (response) {
        return response.json();
    })
        .then(function (data) {
			votes_to_replace.innerHTML = data.votes;
        })
        .catch(function (err) {
            console.log(err);
        });
  });
});

Behandling af forespørgslen

Forespørgslen sender et artikel-id af sted sammen med information om der er tale om en ▲-stemme eller en ▼-stemme.

I Djangos views.py skriver jeg en funktion, der kan modtage forespørgslen og returnerer stemmeantallet efter forespørgslen er behandlet. Funktionen sender JSON-data tilbage til mit JavaScript, hvis (og kun hvis) stemmen har et eksisterende artikel-id efterfulgt af enten “_up” eller “_down”. For alt andet svarer funktionen tilbage, at den er en tepotte og derfor ikke kan hjælpe:

def process_vote(request):
	if request.method == "POST":
		try:
			vote = json.loads(request.body.decode())['vote']
			article_id = vote[:vote.index('_')]
			article = Article.objects.get(id=article_id)
			if '_up' in vote:
				article.votes += 1
			elif '_down' in vote:
				article.votes -= 1
			else:
				return HttpResponse(status=418)
			article.save()
			votes = {'votes': article.votes}
			return JsonResponse(votes)
		except:
			return HttpResponse(status=418)
	return HttpResponse(status=418)

En sorteringsalgoritme

Som det allersidste havde jeg brug for at udvikle en sorteringsalgoritme, der tog højde for artiklers alder, som jeg kunne bruge i mit view. Den tog lidt tid at skrive, fordi det nogle gange kan være svært at regne ud, hvordan man med Djangos databaseforespørgselssyntaks kan lave de beregninger, man har brug for, direkte med forespørgslen til databasen.

Algoritmen gør sådan her:

  • Tager antal stemmer og lægger 1 til. Hvis alle artikler starter på 1, forhindrer jeg at artikler med et positivt antal stemmer altid vil ligge over artikler uden stemmer overhovedet.
  • Deler dette tal med 1 plus antal timer siden artiklens offentliggørelsestidspunkt.
  • Antal timer udregnes ved at tage antal dage siden offentliggørelsestidspunktet og gange med 24 og dertil lægge det yderligere antal timer fra det samlede interval i dage og timer siden offentliggørelsestidspunktet.
  • For at undgå at komme til at dele med 0, lægger jeg 1 til antal timer og tager den absolutte værdi af antal timer siden offentliggørelsestidspunktet. Det er nødvendigt, fordi medierne engang imellem offentliggør artikler med et publiceringstidspunkt i fremtiden.
  • Fordi jeg deler stemmer med antal timer siden offentliggørelse, vil en nyhed hurtigt miste sin “værdi”. Hvis Folkets Wallnot ikke bliver en kæmpe succes, kan det være at jeg skal dele med antal dage i stedet, sådan “straffen” for at være en gammel artikel ikke bliver ligeså mærkbar.

Her er algoritmen skrevet som forespørgsel i Django:

articles = Article.objects.filter(paywall_detected=False)
    .annotate(score=ExpressionWrapper((F('votes') + 1) /
    (1+Abs(ExtractDay(Now()-F('date'))*24 + ExtractHour(Now()-F('date')))),output_field=FloatField()))
    .order_by('-score','-date')

Min egen private eReolen

Jeg kan godt lide at læse bøger, og mange af dem låner jeg på eReolen. Men for en bognørd er eReolen ikke særlig brugervenlig. Der er godt nok en masse mærkelige søgninger, man kan lave, hvis man er teknisk nok, men sådan noget som at se, hvad der rent faktisk er nytilføjede bøger, er svært at følge med i.

eReolen har godt nok en sektion, de kalder “nyheder”, med en søgning der i skrivende stund (februar 2022) hedder noget i retning af:

(dkcclterm.op=202112* OR dkcclterm.op=202201*) AND term.type=ebog and facet.category=voksenmaterialer

Slår man op i beskrivelsen af brøndindekser, kan man se at “dkcclterm.op” dækker over:

dkcclterm.opopOprettelsesdato

Men hvordan kan det være, at en visning af nyheder søger på oprettelsesdatoer i december og januar? Det er februar nu.

Fordi “Oprettelsesdato” for en titel ikke er det samme som dato for titlens tilføjelse på eReolen. Hvad det betyder, ved jeg ikke med sikkerhed, men i hvert fald ikke titlens tilføjelse på eReolen.

Og det betyder, at der løbende kan dukke spændende bøger op, hvis “dkcclterm.op”-værdi ligger langt tilbage i tiden.

Og det betyder, at jeg kan risikere at misse noget, jeg gerne vil læse.

Hvad gjorde jeg så?

Jeg byggede min egen eReolen! Med en robot, der hver nat monitorerer, hvilke titler der rent faktisk er nye. Hver morgen ligger der en mail til mig om, hvor mange titler robotten har fundet, og hvis jeg har tid og kaffe til det, kan jeg kigge de nye titler igennem over morgenkaffen.

Det fungerer sådan her:

I Django byggede jeg en datamodel over titler med forskellige metadata:

from django.db import models
from isbn_field import ISBNField

class Author(models.Model):
	full_name = models.CharField('Forfatter', max_length=200, unique=True)
	birth_year = models.DateField(null=True)
	def __str__(self):
		return self.full_name

class Publisher(models.Model):
	publisher = models.CharField('Udgiver', max_length=200, unique=True)
	def __str__(self):
		return self.publisher

class Keyword(models.Model):
	keyword = models.CharField('Nøgleord', max_length=200, unique=True)
	def __str__(self):
		return self.keyword
		
class TitleType(models.Model):
	title_type = models.CharField('Type', max_length=200, unique=True)
	def __str__(self):
		return self.title_type

class Language(models.Model):
	language = models.CharField('Sprog', max_length=50, unique=True)
	def __str__(self):
		return self.language

class Isbn(models.Model):
	isbn = ISBNField(null=True, blank=True)
	def __str__(self):
		return self.isbn

class Audience(models.Model):
	audience = models.CharField('Målgruppe', max_length=200, unique=True)
	def __str__(self):
		return self.audience
	
class TitleFormat(models.Model):
	title_format = models.CharField('Format', max_length=50, unique=True)
	def __str__(self):
		return self.title_format

class Title(models.Model):
	added = models.DateField()
	object_id = models.CharField('Ereolen-id', max_length=50, unique=True)
	title = models.CharField('Titel', max_length=500)
	original_title = models.CharField('Originaltitel', max_length=500, default="")
	publish_date = models.DateField(null=True)
	dk5 = models.CharField('DK5-kode', max_length=10, default="")
	cover_url = models.URLField('Cover-url', max_length=500, null=True)
	ereolen_url = models.URLField('Ereolen-url', max_length=500)
	abstract = models.TextField(blank=True)
	dkcclterm_op = models.DateField()
	publisher = models.ForeignKey(Publisher, on_delete=models.CASCADE)
	language = models.ForeignKey(Language, on_delete=models.CASCADE)
	title_type = models.ForeignKey(TitleType, on_delete=models.CASCADE)
	title_format = models.ForeignKey(TitleFormat, on_delete=models.CASCADE)
	author = models.ManyToManyField(Author)
	keyword = models.ManyToManyField(Keyword)
	audience = models.ManyToManyField(Audience)
	isbn = models.ManyToManyField(Isbn)

	def __str__(self):
		return self.title	
	def get_authors(self):
		return " & ".join([author.full_name for author in self.author.all()])
	get_authors.short_description = "Author(s)"
	def get_isbns(self):
		return ", ".join([isbn.isbn for isbn in self.isbn.all()])
	get_isbns.short_description = "ISBN(s)"	
	def get_keywords(self):
		return ", ".join([keyword.keyword for keyword in self.keyword.all()])
	get_keywords.short_description = "Keyword(s)"		
	def get_audiences(self):
		return ", ".join([audience.audience for audience in self.audience.all()])
	get_audiences.short_description = "Audience(s)"

I Python skrev jeg en robot, der søger eReolen igennem, tilføjer nye titler til min database og ignorerer titler, der allerede er i databasen. Robotten satte jeg op til at køre hver nat på min server:

# -*- coding: utf-8 -*-
# Author: Morten Helmstedt. E-mail: helmstedt@gmail.com
""" This program saves ebooks, audiobooks and podcasts from ereolen.dk to a local database
that can be used to detect new titles better than ereolen.dk's own search options """

import requests										# make http requests
from bs4 import BeautifulSoup						# parse html responses
from datetime import date							# create date objects
from dateutil.relativedelta import relativedelta	# adding and subtracting months to dates
import re											# regex for publish year parsing
import psycopg2										# work with postgresql databases
from psycopg2 import Error							# database error handling

# Connect to database
try:
	connection = psycopg2.connect(user = "",
									password = "",
									host = "",
									port = "",
									database = "")
	cursor = connection.cursor()
except (Exception, psycopg2.Error) as error:
	print("Error while connecting to PostgreSQL", error)

# Set configuration options and global variables
base_url = 'https://ereolen.dk'
term_types = ['ebog','lydbog','podcast']
added = date.today()
number_of_months_to_search = 200
start_month = added - relativedelta(months=number_of_months_to_search-2)

# Search period list goes from current month plus one month and back to start_month
search_period = []
for i in reversed(range(0,number_of_months_to_search)):
	year_month_date = start_month + relativedelta(months=+i)
	year_month = [year_month_date.year, year_month_date.month]
	search_period.append(year_month)

# Crawl loop
title_counter = 0
for year_month in search_period:
	for term_type in term_types:
		start_date = date(year_month[0],year_month[1],1)
		dkcclterm_op_search = start_date.strftime("%Y%m")
		page = 0
		pages_left = True
		while pages_left == True:
			# Search for hits
			search_url = base_url + '/search/ting/dkcclterm.op%3D' + dkcclterm_op_search + '*%20AND%20term.type%3D' + term_type + '?page=' + str(page) + '&sort=date_descending'
			request = requests.get(search_url)
			result = request.text
			# If an error message is returned in the search, either no results are left, or ereolen.dk is down for some reason
			# In this case, the while loop is broken to try next item type and/or next year-month combination
			if 'Vi kan desværre ikke finde noget, der matcher din søgning' in result or 'The website encountered an unexpected error. Please try again later.' in result:
				pages_left = False
				break
			# Parse hits and get all item links
			soup = BeautifulSoup(result, "lxml")
			links = soup.find_all('a', href=True)
			item_links = {link['href'] for link in links if "/ting/collection/" in link['href']}
			# Go through item link
			for link in item_links:
				# Get id and check if link is already in databse
				object_id = link[link.rfind('/')+1:].replace('%3A',':')
				search_sql = '''SELECT * from ereolen_title WHERE object_id = %s'''
				cursor.execute(search_sql, (object_id, ))
				item_hit = cursor.fetchone()
				# No hits means item is not in database and should be added
				if not item_hit:
					### ADD SEQUENCE ###
					
					# Set full url for item					
					ereolen_url = base_url + link
					
					# Request item and parse html
					title_request = requests.get(ereolen_url)
					title_result = title_request.text
					title_soup = BeautifulSoup(title_result, "lxml")
					
					# TITLE FIELDS #
					
					# TITLE
					try:
						title = title_soup.find('div', attrs={'class':'field-name-ting-title'}).text.replace(" : ",": ")
					except:
						print("Ingen titel på:", ereolen_url)
						break	

					# ORIGINAL TITLE
					try:
						original_title = title_soup.find('div', attrs={'class':'field-label'}, string=re.compile("Original titel:")).next.next.text
					except:
						original_title = ''
					
					# PUBLISHED
					try:
						published = title_soup.find('div', class_={"field-name-ting-author"}).get_text()
						published = int(re.search("[(]\d\d\d\d[)]", published).group()[1:5])
						publish_date = date(published,1,1)
					except:
						publish_date = None

					# COVER URL
					try:
						cover_url = title_soup.find('div', class_={"ting-cover"}).img['src']
					except:
						try:
							data = {
							  'coverData[0][id]': object_id,
							  'coverData[0][image_style]': 'ding_primary_large'
							}
							response = requests.post('https://ereolen.dk/ting/covers', data=data)
							response_json = response.json()
							cover_url = response_json[0]['url']
						except:
							cover_url = ''
		
					# ABSTRACT
					abstract = title_soup.find('div', attrs={'class':'field-name-ting-abstract'}).text

					# DKCCLTERM_OP
					dkcclterm_op = start_date
					
					# FOREIGN KEY FIELDS #
					
					# LANGUAGE
					try:
						ereolen_language = title_soup.find('div', attrs={'class':'field-label'}, string=re.compile("Sprog:")).next.next.text
					except:
						ereolen_language = 'Ukendt'
					language_sql = '''SELECT * from ereolen_language WHERE language = %s'''
					cursor.execute(language_sql, (ereolen_language, ))
					try:
						language = cursor.fetchone()[0]
					except:
						language_insert = '''INSERT INTO ereolen_language(language) VALUES(%s) RETURNING id'''
						cursor.execute(language_insert, (ereolen_language, ))
						language = cursor.fetchone()[0]
	
					# PUBLISHER
					try:
						ereolen_publisher = title_soup.find('div', attrs={'class':'field-label'}, string=re.compile("Forlag:")).next.next.text
					except:
						ereolen_publisher = 'Ukendt'
					publisher_sql = '''SELECT * from ereolen_publisher WHERE publisher = %s'''
					cursor.execute(publisher_sql, (ereolen_publisher, ))
					try:
						publisher = cursor.fetchone()[0]
					except:
						publisher_insert = '''INSERT INTO ereolen_publisher(publisher) VALUES(%s) RETURNING id'''
						cursor.execute(publisher_insert, (ereolen_publisher, ))
						publisher = cursor.fetchone()[0]

					# TYPE
					try:
						ereolen_type = title_soup.find('div', attrs={'class':'field-label'}, string=re.compile("Type:")).next.next.text
					except:
						ereolen_type = 'Ukendt'
					type_sql = '''SELECT * from ereolen_titletype WHERE title_type = %s'''
					cursor.execute(type_sql, (ereolen_type, ))
					try:
						title_type = cursor.fetchone()[0]
					except:
						title_type_insert = '''INSERT INTO ereolen_titletype(title_type) VALUES(%s) RETURNING id'''
						cursor.execute(title_type_insert, (ereolen_type, ))
						title_type = cursor.fetchone()[0]

					# FORMAT
					try:
						ereolen_format = title_soup.find('div', attrs={'class':'field-label'}, string=re.compile("Ebogsformat:")).next.next.text
					except:
						ereolen_format = "Ukendt"
					format_sql = '''SELECT * from ereolen_titleformat WHERE title_format = %s'''
					cursor.execute(format_sql, (ereolen_format, ))
					try:
						title_format = cursor.fetchone()[0]
					except:
						title_format_insert = '''INSERT INTO ereolen_titleformat(title_format) VALUES(%s) RETURNING id'''
						cursor.execute(title_format_insert, (ereolen_format, ))
						title_format = cursor.fetchone()[0]				
					
					# DK5 - TODO: Not done yet
					dk5 = ""
					
					### SAVE BEFORE ADDING MANY-TO-MANY FIELDS ###
					title_data = (added,title_type,title,original_title,publisher,object_id,language,publish_date,cover_url,ereolen_url,title_format,abstract,dkcclterm_op,dk5)
					
					title_insert = '''INSERT INTO ereolen_title(added,title_type_id,title,original_title,publisher_id,object_id,language_id,publish_date,cover_url,ereolen_url,title_format_id,abstract,dkcclterm_op,dk5) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) RETURNING id'''
					cursor.execute(title_insert, title_data)
					title_id = cursor.fetchone()[0]
					connection.commit()
					
					# MANY-TO-MANY FIELDS #

					# AUDIENCE(S)
					try:
						audience_div = title_soup.find('div', attrs={'class':'field-label'}, string=re.compile("Målgruppe:")).next.next
						audiences = audience_div.find_all('span')
						audiences_list = [aud.text for aud in audiences]
					except:
						audiences_list = ['Ukendt']
					for audience in audiences_list:
						audience_sql = '''SELECT * from ereolen_audience WHERE audience = %s'''
						cursor.execute(audience_sql, (audience, ))
						try:
							audience_id = cursor.fetchone()[0]
						except:
							audience_insert = '''INSERT INTO ereolen_audience(audience) VALUES(%s) RETURNING id'''
							cursor.execute(audience_insert, (audience, ))
							audience_id = cursor.fetchone()[0]
						audience_relation_sql = '''INSERT INTO ereolen_title_audience (title_id, audience_id) VALUES (%s,%s)'''
						try:
							cursor.execute(audience_relation_sql, (title_id,audience_id))
						except:
							connection.rollback()
					
					# ISBN(S)
					try:
						isbn_div = title_soup.find('div', attrs={'class':'field-label'}, string=re.compile("ISBN:")).next.next
						isbns = isbn_div.find_all('span')
						isbns_list = [isb.text for isb in isbns]
						for isbn in isbns_list:
							isbn_sql = '''SELECT * from ereolen_isbn WHERE isbn = %s'''
							cursor.execute(isbn_sql, (isbn, ))
							try:
								isbn_id = cursor.fetchone()[0]
							except:
								isbn_insert = '''INSERT INTO ereolen_isbn(isbn) VALUES(%s) RETURNING id'''
								cursor.execute(isbn_insert, (isbn, ))
								isbn_id = cursor.fetchone()[0]
							isbn_relation_sql = '''INSERT INTO ereolen_title_isbn (title_id, isbn_id) VALUES (%s,%s)'''
							try:
								cursor.execute(isbn_relation_sql, (title_id,isbn_id))
							except:	
								connection.rollback()							
					except:
						pass
					
					# KEYWORDS(S)
					keywords_div = title_soup.find('div', attrs={'class':'field-name-ting-subjects'})
					if keywords_div:
						keywords = [link.text for link in keywords_div.find_all('a')]
						for keyword in keywords:
							keyword_sql = '''SELECT * from ereolen_keyword WHERE keyword = %s'''
							cursor.execute(keyword_sql, (keyword, ))
							try:
								keyword_id = cursor.fetchone()[0]
							except:
								keyword_insert = '''INSERT INTO ereolen_keyword(keyword) VALUES(%s) RETURNING id'''
								cursor.execute(keyword_insert, (keyword, ))
								keyword_id = cursor.fetchone()[0]
							keyword_relation_sql = '''INSERT INTO ereolen_title_keyword (title_id, keyword_id) VALUES (%s,%s)'''
							try:
								cursor.execute(keyword_relation_sql, (title_id,keyword_id))						
							except:
								connection.rollback()	

					# AUTHOR(S)
					creator_full = title_soup.find('div', attrs={'class':'field-name-ting-author'}).text.replace("Af ","")
					# Remove date of book
					creator = creator_full[:creator_full.rfind("(")-1]
					
					authors = creator.split(",")
					
					for author in authors:
						birth_year = None
						if ' (f. ' in author and not len(author) < 7:
							if 'ca. ' in author:
								author = author.replace('ca. ','')
							birth_year_string = author[author.index("(f.")+4:author.index("(f.")+8]
							if ')' in birth_year_string:
								birth_year_string = birth_year_string.replace(')','')
							birth_year = date(int(birth_year_string),1,1)
							author = author[:author.index(" (f.")]
						elif ' (f. ' in author:
							breakpoint()
							
						# Some times there are no authors, but still a published year
						if len(author) == 5 and "(" in author:
							author = ""
						
						if author:
							author = author.strip()
							author_sql = '''SELECT * from ereolen_author WHERE full_name = %s'''
							cursor.execute(author_sql, (author, ))
							try:
								author_id = cursor.fetchone()[0]
							except:
								if birth_year:
									author_insert = '''INSERT INTO ereolen_author(full_name,birth_year) VALUES(%s,%s) RETURNING id'''
									cursor.execute(author_insert, (author,birth_year))
								else:
									author_insert = '''INSERT INTO ereolen_author(full_name) VALUES(%s) RETURNING id'''
									cursor.execute(author_insert, (author, ))
								author_id = cursor.fetchone()[0]
							author_relation_sql = '''INSERT INTO ereolen_title_author (title_id, author_id) VALUES (%s,%s)'''
							try:
								cursor.execute(author_relation_sql, (title_id,author_id))
							except:
								connection.rollback()
	
					### SAVE ###
					connection.commit()
					title_counter += 1
			page += 1
connection.close()
print('Ereolen crawl ran')
if title_counter > 0:
	print('Added titles on ereolen:', title_counter)

Og i Djangos indbyggede administrationsinterface, kan jeg med fint overblik og gode søgnings-, sorterings- og filtreringsmuligheder få øje på en novellesamling af Georg Metz, der netop er dukket op i eReolen med en “dkcclterm.op”-værdi fra september 2013!

Min egen private eReolen. Beklager det bliver lidt småt.

Må jeg prøve?

Jeg ville gerne dele mit værktøj med andre, men det er ikke helt lige til at afklare, hvilke dele af eReolens bogdata, der er frie og offentlige, og hvilke der ejes af en (i mine øjne) lidt underlig konstruktion, der hedder DBC. Et KL-ejet firma (Kommunernes Landsforening), der tjener penge på at sælge data om bøger til – kommuner (og nogle andre aktører, som jeg gætter på næsten udelukkende er offentlige).

Jeg er ved at undersøge, hvad jeg kan offentliggøre uden at genere nogen eller bryde ophavsretsloven. Det kan godt være, det tager lidt tid.