Høst domænenavne med certifikatlogs

I På jagt efter danske domænenavne skrev jeg om en smart metode, jeg har fundet til at finde de .dk-domænenavne, som autoriteterne ikke ville dele med mig.

Måske er der andre end mig, der er interesseret i at holde lidt øje med nye steder på internettet?

Her er i hvert fald en lille opskrift på, hvordan man gør:

1. Hent Axeman

Axeman er et program, der hjælper med at automatisere og parallellisere downloads af certifikatlogs. Jeg måtte rette det en lille smule for at få det til at køre, da det vist ikke vedligeholdes aktivt. “Min” udgave finder du her:

https://github.com/helmstedt/Axeman

2. Download en masse logs

Axeman gemmer certifikatlogs som csv-filer. Skriv fx…

axeman -u 'ct.googleapis.com/logs/argon2022'

…for at hente Googles 2022-logs. Det gik ikke specielt hurtigt hos mig, men det virkede.

3. Gennemgå log-filer for .dk-domæner

Jeg skrev et lille program, der søger logfiler igennem for .dk-domæner (der kommer også nogle andre domæner med engang imellem, hvis der er “.dk” et sted i domænet). Det ser sådan her ud:

import os
from pathlib import Path
import csv
 
csv_dir = 'PATH_TO_LOG_FILES'
paths = Path(csv_dir).iterdir()

for file_path in paths:
    # Open and process csv file
    with open(file_path, 'rt') as csv_input:
        print('Processing: ', file_path)
        reader = csv.reader(csv_input)
        for row in reader:
            host = row[4]
            if '.dk' in host:
                print('Found: ', host)
                with open('PATH_TO_OUTPUT_FILE.CSV', 'a') as dotdk_output:
                    dotdk_output.write(host + '\n')
    # Delete csv file after processing
    os.remove(file_path)

Det eneste lidt obskure i programmet er måske hvad der gemmer sig i row[4]? Den indeholder en liste over de domæner og subdomæner, det enkelte certifikat er udstedt til. Elementerne i listen er adskilt med mellemrum.

4. Filtrer listen, så der kun kommer ægte, unikke .dk-domæner med

Efter grovsorteringen, skrev jeg endnu et lille program, der finsorterer. Det ser sådan her ud:

domains_in_file = set()
with open('PATH_TO_INPUT_FILE.CSV', 'rt') as file_input:
    for index, row in enumerate(file_input):
        items = row.split(' ')
        for item in items:
            item = item.replace('*.','').replace('\n','')
            matches = re.findall(r'([^.]*$)', item)
            dk_domain = ''
            if matches[0] == 'dk':
                dk_domain = item.split('.')
                dk_domain = dk_domain[-2] + '.' + dk_domain[-1]
                dk_domain = dk_domain.lower()
    with open('PATH_TO_OUTPUT_FILE.BIN', 'wb') as unique_domains_file:
        pickle.dump(domains_in_file, unique_domains_file)

Her er koden nok lidt mere obskur. Jeg tager hvert element i hver række fra CSV-filen i sidste trin og:

  • Fjerner evt. wildcard (*.) og linjeskift (\n) fra værtsnavnet
  • Finder domænesuffixet med et regulær udtryk
  • Hvis domænesuffixet er .dk, splitter jeg hvert enkelt element i værstnavnet op i en liste
  • Konstruerer selve domænenavnet ved at sætte det næstsidste (fx helmstedt) og sidste element (.dk) sammen
  • Sørger for en sikkerheds skyld for, at konvertere domænenavnet til små bogstaver

5. Nyd dit kendskab til nye og gamle domænenavne (der har fået udstedt SSL-certifikater)

Udvikling i portefølje og på konti hos Nordnet

Opdatering 18/02/2023: Nordnet ændrer tit på deres ting. På https://github.com/helmstedt/nordnet-utilities forsøger jeg at følge med, så hent gerne din kode der, hvis koden neden for ikke virker længere.

I Nordnets porteføljerapport og kontooversigter, kan man se nogle flotte grafer over udviklingen i ens portefølje og/eller konti.

Et eksempel på porteføljeudvikling hos Nordnet. I dette tilfælde en nedadgående graf.

Jeg blev spurgt, om jeg ikke ville hjælpe med, hvordan man kan hive den slags ud af Nordnet til eget brug. Det er lidt nemmere til et hurtigt overblik, end hvis man skal hive alle sine transaktioner og kurser ud i Excel og tilrettelægge data der.

Her er nogle eksempler på mulighederne. Du kan også finde eksemplerne på GitHub:

# This program provides two examples of logging into a Nordnet account
# and extracting account performance as json data. One is based on standard
# intervals. The other is based on a user-defined interval.
# Storing and processing of returned data is left to you.
import requests
from nordnet_configuration import accounts
from nordnet_login import nordnet_login

session = requests.Session()
session = nordnet_login(session)
accounts_list = [value for value in accounts.values()]

### Nordnet standard intervals (one month, three months, six months, ytd, 1 year, 3 years and 5 years)
accounts_string = ','.join(accounts_list)
url = 'https://www.nordnet.dk/api/2/accounts/' + accounts_string + '/returns/performance'
period_options = ['m1','m3','m6','ty','y1','y3','y5']

standard_graph_data = {}
for period in period_options:
    params = {
        'period': period,
        'start_at_zero': False
    }
    graph = session.get(url, params=params)
    standard_graph_data[period] = graph.json()
# Store and process graph_data as needed

### User defined date intervals
start_date = '2019-01-30'   # Edit as needed
end_date = '2019-05-14'    # Edit as needed
user_defined_graph_data = {}
for account in accounts_list:
    url = 'https://www.nordnet.dk/api/2/accounts/' + account + '/returns/performance'
    params = {
        'from': start_date,
        'to': end_date
    }
    user_defined_graph = session.get(url, params=params)
    user_defined_graph_data[account] = user_defined_graph.json()
# Store and process user_defined_graph_data as needed

Alle danskeres CPR-numre til fri download

Her er et Python-script, der genererer en liste over alle kombinationer af datoer i formatet ddmmyy med alle kombinationer af tallene fra 0 (0000) til 9999:

from datetime import datetime, date, timedelta

# Credits to https://stackoverflow.com/a/62248100, https://creativecommons.org/licenses/by-sa/4.0/
start = '2000-01-01'    # First date is in year 2000, not 1900, since 1900 was not a leap year.
end = '2100-01-01'      # Last date in range will be 2099-12-31.
start_date = date.fromisoformat(start)
end_date = date.fromisoformat(end)
date_range = [start_date + timedelta(days=i) for i in range((end_date - start_date).days)]

with open("cpr.txt", "a") as cprfile:
    for i in range(0,10000):
        print(i)
        for date in date_range:
            i_formated = f"{i:04d}"
            date_formated = datetime.strftime(date, '%d%m%y')
            cpr = date_formated + '-' + i_formated + '\n'
            cprfile.write(cpr)

Hele listen fylder 4 GB, men kan heldigvis komprimeres. Med 7zip fik jeg den ned på under 40 MB.

Download den komprimerede liste her.

Til kamp mod phishing på lnk.dk

Der er desværre nogle kriminelle, der har opdaget min kortlink-service lnk.dk og bruger siden til at lave korte links, der peger på forskellige phishing-formularer. De fleste på fransk, enkelte på dansk.

Jeg vil helst kun have, at min side bruges til lovlige formål, og derfor har jeg i første omgang lavet et kontrolspørgsmål i formularen til at oprette links. Jeg håber, at det kun er ærlige mennesker, der kan svare på spørgsmålet, og at det er relativt nemt for dem:

Nyt kontrolspørgsmål om en kendt dansk cykelrytter på lnk.dk

For at implementere det nye felt, redigerede jeg min Django-applikations forms.py med feltet og krav til validering:

from django.forms import ModelForm
from django import forms
from .models import Link
from django.core.exceptions import ValidationError

class LinkForm(ModelForm):
    everyoneknows = forms.CharField(label='Hvad er fornavnet på cykelrytteren, der vandt Tour de France for mænd i 2022?', error_messages={'required': 'Indtast cykelrytterens fornavn'})

    def clean_everyoneknows(self):
        answer = self.cleaned_data['everyoneknows'].lower()
        if answer != 'jonas':
            raise ValidationError("Det fornavn, du har indtastet, er forkert.")
        return answer
    
    def __init__(self, *args, **kwargs):
        super(LinkForm, self).__init__(*args, **kwargs)
        self.fields['destination'].widget.attrs['placeholder'] = 'https://eksempel.dk/meget/lang/url'
        self.fields['shortlink'].widget.attrs['placeholder'] = 'eksempel'
        self.fields['shortlink'].label_suffix = ""  # Remove colon after label
        self.fields['shortlink'].required = False   # Not required in form

    def clean_shortlink(self):
        shortlink = self.cleaned_data['shortlink']
        return shortlink.lower()
    
    class Meta:
        model = Link
        fields = ['destination', 'shortlink']
        labels = {
            'shortlink': ('Evt. selvvalgt kort link:'),
        }
        error_messages = {
            'destination': {
                'max_length': ('Din destinationsurl er for lang til denne kortlinkservice.'),
                'invalid': ('Din destinationsurl er ikke en gyldig adresse. Husk http://, https:// eller ftp:// foran dit link, hvis du har glemt det.'),
            },
            'shortlink': {
                'unique': ('Det selvvalgte link, du har valgt, er allerede i brug. Find på et andet.'),
                'max_length': ('Dit selvvalgte link må maksimalt være 100 tegn langt.'),
                'invalid': ('Du kan kun bruge bogstaver (dog ikke æ, ø, å - kun ASCII-tegnsættet), tal, bindestreg og understreg i din selvvalgte adresse.'),
            }
        }

Det bliver spændende at se, om ændringen har nogen effekt.

Digital Post fra mit.dk til din e-mail

https://github.com/helmstedt/digitalpost-utilities er jeg gået i luften med et program, der gør det muligt for dig at slippe for at logge ind på mit.dk hver gang du har fået ny Digital Post.

Jeg brugte https://github.com/dk/Net-MitDK til at forstå metodikken og Fiddler til at overvåge trafikken til og fra https://mit.dk og aflure sidens API.

De to hovedkomponenter i programmet er a) et program til at gennemføre første login på mit.dk i en browser med NemID/MitID og b) et program til at forny adgangstokens til siden, forespørge API’et om ny post og sende e-mails af sted.

Program til at gennemføre første login på mit.dk i en browser med NemID/MitId

# Logs in to mit.dk og saves tokens needed for further requests.
# Method from https://github.com/dk/Net-MitDK/. Thank you.
from seleniumwire import webdriver
import requests
from bs4 import BeautifulSoup
import http.cookies
import gzip
import json 
import base64
from hashlib import sha256
import string
import secrets
from mit_dk_configuration import tokens_filename

def random_string(size):        
    letters = string.ascii_lowercase+string.ascii_uppercase+string.digits+string.punctuation+string.whitespace           
    random_string = ''.join(secrets.choice(letters) for i in range(size))
    encoded_string = random_string.encode(encoding="ascii")
    url_safe_string = base64.urlsafe_b64encode(encoded_string).decode()
    url_safe_string_no_padding = url_safe_string.replace('=','')
    return url_safe_string_no_padding

def save_tokens(response):
    with open(tokens_filename, "wt", encoding="utf8") as token_file:
        token_file.write(response)

state = random_string(23)
nonce = random_string(93)
code_verifier = random_string(93)
code_challenge = base64.urlsafe_b64encode(sha256(code_verifier.encode('ascii')).digest()).decode().replace('=','')
 
login_url = 'https://gateway.mit.dk/view/client/authorization/login?client_id=view-client-id-mobile-prod-1-id&response_type=code&scope=openid&state=' + state + '&code_challenge=' + code_challenge + '&code_challenge_method=S256&response_mode=query&nonce=' + nonce + '&redirect_uri=com.netcompany.mitdk://nem-callback&deviceName=digitalpost-utilities&deviceId=pc&lang=en_US' 

options = webdriver.ChromeOptions()
options.add_argument("--log-level=3")
driver = webdriver.Chrome(chrome_options=options)
login = driver.get(login_url)

print("Opening browser window. Log in to mit.dk using MitID or NemID in the browser.")
print("When you see a blank page in your browser at https://nemlog-in.mitid.dk/LoginOption.aspx, you're finished.")
input("Press ENTER once you're finished.")

session = requests.Session()

for request in driver.requests:
    session.cookies.set('cookiecheck', 'Test', domain='nemlog-in.mitid.dk')
    session.cookies.set('loginMethod', 'noeglekort', domain='nemlog-in.mitid.dk')
    for request in driver.requests:
        if '/api/mailboxes' in request.url and request.method == 'GET' and request.response.status_code == 200:
            cookies = request.headers['Cookie'].split("; ")
            for cookie in cookies:
                if 'LoggedInBorgerDk' in cookie or 'CorrelationId' in cookie:
                    key_value = cookie.split('=')
                    session.cookies.set(key_value[0], key_value[1], domain='.post.borger.dk')
        if request.response:
            headers_string = str(request.response.headers)
            headers_list = headers_string.split('\n')
            for header in headers_list:
                if 'set-cookie' in header:
                    cookie_string = header.replace('set-cookie: ','')
                    cookie = http.cookies.BaseCookie(cookie_string)
                    for key in cookie.keys():
                        # Requests is picky about dashes in cookie expiration dates. Fix.
                        if 'expires' in cookie[key]:
                            expiry = cookie[key]['expires']
                            if expiry:
                                expiry_list = list(expiry)
                                expiry_list[7] = '-'
                                expiry_list[11] = '-'
                                cookie[key]['expires'] = ''.join(expiry_list)
                    session.cookies.update(cookie)

        if request.method == 'POST' and request.url == 'https://nemlog-in.mitid.dk/LoginOption.aspx' and request.response.status_code == 200:
            if request.response.headers['content-encoding'] == 'gzip':
                response = gzip.decompress(request.response.body).decode()
            else:
                response = request.response.body.decode()
            soup = BeautifulSoup(response, "html.parser")
            input = soup.find_all('input', {"name":"SAMLResponse"})
            samlresponse = input[0]["value"]

driver.close()

request_code_part_one = session.post('https://gateway.digitalpost.dk/auth/s9/nemlogin/ssoack', data={'SAMLResponse': samlresponse}, allow_redirects=False)
request_code_part_one_redirect_location = request_code_part_one.headers['Location']
request_code_part_two = session.get(request_code_part_one_redirect_location, allow_redirects=False)
request_code_part_two_redirect_location = request_code_part_two.headers['Location']
request_code_part_three = session.get(request_code_part_two_redirect_location, allow_redirects=False)
request_code_part_three_redirect_location = request_code_part_three.headers['Location']
code_start = request_code_part_three_redirect_location.index('code=') + 5
code_end = request_code_part_three_redirect_location.index('&', code_start)
code = request_code_part_three_redirect_location[code_start:code_end]
redirect_url = 'com.netcompany.mitdk://nem-callback'
token_url = 'https://gateway.mit.dk/view/client/authorization/token?grant_type=authorization_code&redirect_uri=' + redirect_url + '&client_id=view-client-id-mobile-prod-1-id&code=' + code + '&code_verifier=' + code_verifier
request_tokens = session.post(token_url)
save_tokens(request_tokens.text)    
print('Login to mit.dk went fine.')
print(f'Tokens saved to {tokens_filename}.')

Program til at forny adgangstokens til siden, forespørge API’et om ny post og sende e-mails af sted

# Sends unread messages from mit.dk to an e-mail.
import requests
import json 
import smtplib										# Sending e-mails
from email.mime.multipart import MIMEMultipart		# Creating multipart e-mails
from email.mime.text import MIMEText				# Attaching text to e-mails
from email.mime.application import MIMEApplication	# Attaching files to e-mails
from email.utils import formataddr					# Used for correct encoding of senders with special characters in name (e.g. Københavns Kommune)
from mit_dk_configuration import email_data, tokens_filename

base_url = 'https://gateway.mit.dk/view/client/'
session = requests.Session()

def open_tokens():
    try:
        with open(tokens_filename, "r", encoding="utf8") as token_file:
            tokens = json.load(token_file)
            return tokens
    except:
        return print('Unable to open and parse token file. Did you run mit_dk_first_login.py?')
    
def revoke_old_tokens(mitdkToken, ngdpToken, dppRefreshToken, ngdpRefreshToken):
    endpoint = 'authorization/revoke?client_id=view-client-id-mobile-prod-1-id'
    json_data = {
        'dpp': {
            'token': mitdkToken,
            'token_type_hint': 'access_token'
        },
        'ngdp': {
            'token': ngdpToken,
            'token_type_hint': 'access_token'
        },
    }
    revoke_access_tokens = session.post(base_url + endpoint, json=json_data)
    if not revoke_access_tokens.status_code == 200:
        print("Something went wrong when trying to revoke old access tokens. Here is the response:")
        print(revoke_access_tokens.text)
    json_data = {
        'dpp': {
            'refresh_token': dppRefreshToken,
            'token_type_hint': 'refresh_token'
        },
        'ngdp': {
            'refresh_token': ngdpRefreshToken,
            'token_type_hint': 'refresh_token'
        },
    }        
    revoke_refresh_tokens = session.post(base_url + endpoint, json=json_data)
    if not revoke_refresh_tokens.status_code == 200:
        print("Something went wrong when trying to revoke old refresh tokens. Here is the response:")
        print(revoke_refresh_tokens.text)


def refresh_and_save_tokens(dppRefreshToken, ngdpRefreshToken):
    endpoint = 'authorization/refresh?client_id=view-client-id-mobile-prod-1-id'
    json_data = {
        'dppRefreshToken': dppRefreshToken,
        'ngdpRefreshToken': ngdpRefreshToken,
    }
    refresh = session.post(base_url + endpoint, json=json_data)    
    if not refresh.status_code == 200:
        print("Something went wrong trying to fetch new tokens.")
    refresh_json = refresh.json()
    if 'code' in refresh_json:
        print("Something went wrong trying to fetch new tokens. Here's the response:")
        print(refresh_json)
        return False
    else:
        with open(tokens_filename, "wt", encoding="utf8") as token_file:
            token_file.write(refresh.text)
        return refresh_json
        
def get_fresh_tokens_and_revoke_old_tokens():
    tokens = open_tokens()
    try:
        if 'dpp' in tokens:
            dppRefreshToken = tokens['dpp']['refresh_token']
            mitdkToken = tokens['dpp']['access_token']
        else:
            dppRefreshToken = tokens['refresh_token']
            mitdkToken = tokens['access_token']
        ngdpRefreshToken = tokens['ngdp']['refresh_token']
        ngdpToken = tokens['ngdp']['access_token']
        fresh_tokens = refresh_and_save_tokens(dppRefreshToken, ngdpRefreshToken)
        if fresh_tokens:
            revoke_old_tokens(mitdkToken, ngdpToken, dppRefreshToken, ngdpRefreshToken)
        return fresh_tokens
    except Exception as error:
        print(error)
        print('Unable to find tokens in token file. Try running mit_dk_first_login.py again.')
    
def get_simple_endpoint(endpoint):
    response = session.get(base_url + endpoint)
    return response.json()

def get_inbox_folders_and_build_query(mailbox_ids):
    endpoint = 'folders/query'
    json_data = {
        'mailboxes': {}
    }
    for mailbox in mailbox_ids:
        json_data['mailboxes'][mailbox['dataSource']] = mailbox['mailboxId']
    response = session.post(base_url + endpoint, json=json_data)    
    try:
        response_json = response.json()
    except:
        print('Unable to convert response to json. Here is the response:')
        print(response.text)
    folders = []
    for folder in response_json['folders']['INBOX']:
        folder_info = {
            'dataSource': folder['dataSource'],
            'foldersId': [folder['id']],
            'mailboxId': folder['mailboxId'],
            'startIndex': 0
        }
        folders.append(folder_info)
    return folders

def get_messages(folders):
    endpoint = 'messages/query'
    json_data = {
        'any': [],
        'folders': folders,
        'size': 20,
        'sortFields': ['receivedDateTime:DESC']
    }
    response = session.post(base_url + endpoint, json=json_data)    
    return response.json()   

def get_content(message):
    content = []
    endpoint = message['dataSource'] + '/mailboxes/' + message['mailboxId'] + '/messages/' + message['id']
    for document in message['documents']:
        doc_url = '/documents/' + document['id']
        for file in document['files']:
            encoding_format = file['encodingFormat']
            file_name = file['filename']
            file_url = '/files/' + file['id'] + '/content'
            file_content = session.get(base_url + endpoint + doc_url + file_url)
            content.append({
                'file_name': file_name,
                'encoding_format': encoding_format,
                'file_content': file_content
            })
    return content

def mark_as_read(message):
    endpoint = message['dataSource'] + '/mailboxes/' + message['mailboxId'] + '/messages/' + message['id']
    session.headers['If-Match'] = str(message['version'])
    json_data = {
        'read': True
    }
    mark_as_read = session.patch(base_url + endpoint, json=json_data)

mailserver_connect = False            
tokens = get_fresh_tokens_and_revoke_old_tokens()
if tokens:
    session.headers['mitdkToken'] = tokens['dpp']['access_token']
    session.headers['ngdpToken'] = tokens['ngdp']['access_token']
    session.headers['platform'] = 'web'
    mailboxes = get_simple_endpoint('mailboxes')
    mailbox_ids = []
    for mailboxes in mailboxes['groupedMailboxes']:
        for mailbox in mailboxes['mailboxes']:
            mailbox_info = {
                'dataSource': mailbox['dataSource'],
                'mailboxId': mailbox['id']
            }
            mailbox_ids.append(mailbox_info)
    folders = get_inbox_folders_and_build_query(mailbox_ids)
    messages = get_messages(folders)
    for message in messages['results']:
        if message['read'] == False:
            if mailserver_connect == False:
                server = smtplib.SMTP(email_data['emailserver'], email_data['emailserverport'])
                server.ehlo()
                server.starttls()
                server.login(email_data['emailusername'], email_data['emailpassword'])
                mailserver_connect  = True               
            label = message['label']
            sender = message['sender']['label']
            message_content = get_content(message)

            msg = MIMEMultipart('alternative')
            msg['From'] = formataddr((sender, email_data['emailfrom']))
            msg['To'] = email_data['emailto']
            msg['Subject'] = "mit.dk: " + label

            for content in message_content:
                if content['encoding_format'] == 'text/plain':
                    body = content['file_content'].text
                    msg.attach(MIMEText(body, 'plain')) 
                    part = MIMEApplication(content['file_content'].content)
                    part.add_header('Content-Disposition', 'attachment', filename=content['file_name'])
                    msg.attach(part) 
                elif content['encoding_format'] == 'text/html':
                    body = content['file_content'].text
                    msg.attach(MIMEText(body, 'html'))
                    part = MIMEApplication(content['file_content'].content)
                    part.add_header('Content-Disposition', 'attachment', filename=content['file_name'])
                    msg.attach(part) 
                elif content['encoding_format'] == 'application/pdf':   
                    part = MIMEApplication(content['file_content'].content)
                    part.add_header('Content-Disposition', 'attachment', filename=content['file_name'])
                    msg.attach(part)
                else:    
                    encoding_format = content['encoding_format']
                    print(f'Ny filtype {encoding_format}')
                    part = MIMEApplication(content['file_content'].content)
                    part.add_header('Content-Disposition', 'attachment', filename=content['file_name'])
                    msg.attach(part)
            print(f'Sender en mail fra mit.dk fra {sender} med emnet {label}')
            server.sendmail(email_data['emailfrom'], email_data['emailto'], msg.as_string())
            mark_as_read(message)
    if mailserver_connect:
        server.quit()

Eldata fra eloverblik.dk med Python

Siden jeg fandt ud af at hente elforbrugsdata fra Ørsted har jeg fået nyt elselskab og portalen eloverblik.dk, som omfatter alle elkunder i Danmark, har fået et API. Du kan finde dokumentation til API’et her: https://api.eloverblik.dk/CustomerApi/swagger/index.html.

For at bruge API’et skal du oprette et token på eloverblik.dk. Det har Niras lavet en fin guide til.

Jeg er først lige kommet i gang, men her er et program, der beder om et token til at hente data fra API’et og derefter henter data om en måler, forbrugsdata og prisdata. Indsæt dit token fra eloverblik.dk i token-variablen øverst for at bruge programmet:

# https://api.eloverblik.dk/CustomerApi/swagger/index.html
import requests
token = ''

# Get data access token for subsequent requests
get_data_access_token_url = 'https://api.eloverblik.dk/CustomerApi/api/token'
headers = {
    'accept': 'application/json',
    'Authorization': 'Bearer ' + token,
}

response = requests.get(get_data_access_token_url, headers=headers)
data_access_token = response.json()['result']

# Get id of first meter - edit if you have more than one meter
metering_points_url = 'https://api.eloverblik.dk/CustomerApi/api/meteringpoints/meteringpoints'
headers = {
    'accept': 'application/json',
    'Authorization': 'Bearer ' + data_access_token,
}
meters = requests.get(metering_points_url, headers=headers)
first_meter = meters.json()['result'][0]['meteringPointId']

#Try to get data
meter_data = 'https://api.eloverblik.dk/CustomerApi/api/meterdata/gettimeseries/'
timeseries_data = {
    'dateFrom': '2021-01-01',
    'dateTo': '2021-01-31',
    'aggregation': 'Actual'
}

meter_data_url = meter_data + timeseries_data['dateFrom'] + '/' + timeseries_data['dateTo'] + '/' + timeseries_data['aggregation']

meter_json = {
    "meteringPoints": {
        "meteringPoint": [
            first_meter
        ]
    }
}

meter_data_request = requests.post(meter_data_url, headers=headers, json=meter_json)

#Charges
charges_data = 'https://api.eloverblik.dk/CustomerApi/api/meteringpoints/meteringpoint/getcharges'
charges_data_request = requests.post(charges_data, headers=headers, json=meter_json)
breakpoint()

Besked når der åbnes for reservationer på [hypet restaurant i København]

En restaurant, jeg gerne vil prøve, er fuldstændig booket op og har endnu ikke åbnet op for reservationer i april. Hvordan kan jeg komme først til fadet?

Jeg besøgte reservationssystemet og iagttog hvordan det interne API spurgte om ledige borde.

Dette billede viser forespørgslen:

Forespørgslen om ledige borde til 2 personer

Dette billede viser svaret fra API’et:

Svar fra API’et: Ingen ledige borde (hvilket billedet også viser)

For april så svaret sådan her ud:

Hverken ledige eller optagede borde i april.

API’et svarer altså med en tom ‘data’-nøgle, når der ikke er åbnet for reservationer endnu.

Jeg skrev et lille program, som jeg har sat til at køre hvert 5. minut, for at tjekke om jeg kan komme til at reservere. Programmet tjekker, om der er kommet noget indhold i ‘data’-nøglen i svaret fra API’et. Hvis der er, sender det mig en besked om, at jeg godt kan komme i gang med at reservere bord.

Mit lille program, der gerne skulle give mig en fordel i kampen.

Næste skridt kunne være at udvide programmet, sådan det også reserverer bordet for mig. Men i første omgang prøver jeg at gøre den del af arbejdet selv.

Folkets wallnot.dk

En bruger på wallnot.dk skrev til mig og foreslog at lade brugerne på siden vurdere kvaliteten af de artikler, siden linker til, ligesom på fx Hacker News. Idéen er at gode artikler så kan ligge øverst, mens metervaren synker ned i bunden – hvis altså folket har forstand på at vurdere den slags.

Jeg forsøgte at lave en sådan løsning, og den kan du nu prøve af.

Dynamik med JavaScript

For at gøre det helt smart og dynamisk, havde jeg brug for noget JavaScript, der kan fyre en stemme af sted, så snart en bruger klikker på ▲ eller ▼.

Jeg er ikke helt ferm til JavaScript, men jeg begynder at forstå det, og med god hjælp og lidt copy/paste fra forskellige kilder, landede jeg til sidst på noget kode, der ser ud til at virke.

Den første del henter en såkaldt CRSF-cookie, der sørger for, at man er nødt til at besøge Wallnot, inden man kan stemme på artikler, og at man ikke kan stemme på vegne af andre fra andre hjemmesider.

Den anden del sender en forespørgsel af sted med cookie-værdien og selve stemmen og opdaterer stemmeantallet på siden, når forespørgslen er behandlet.

function getCookie(name) {
	let cookieValue = null;
	if (document.cookie && document.cookie !== '') {
		const cookies = document.cookie.split(';');
		for (let i = 0; i < cookies.length; i++) {
			const cookie = cookies[i].trim();
			if (cookie.substring(0, name.length + 1) === (name + '=')) {
				cookieValue = decodeURIComponent(cookie.substring(name.length + 1));
				break;
			}
		}
	}
	return cookieValue;
}
const csrftoken = getCookie('csrftoken');

document.querySelectorAll('.vote').forEach(function(el){
  el.addEventListener('click', function() {
		article_votes_id = this.id.substring(0, this.id.indexOf('_')) + '_votes';
		votes_to_replace = document.getElementById(article_votes_id)
		fetch('/process_vote', {
			method: "POST",
			headers: {
				"X-CSRFToken": csrftoken,
			},
			body: JSON.stringify({
				vote: this.id
			})
		}).then(function (response) {
        return response.json();
    })
        .then(function (data) {
			votes_to_replace.innerHTML = data.votes;
        })
        .catch(function (err) {
            console.log(err);
        });
  });
});

Behandling af forespørgslen

Forespørgslen sender et artikel-id af sted sammen med information om der er tale om en ▲-stemme eller en ▼-stemme.

I Djangos views.py skriver jeg en funktion, der kan modtage forespørgslen og returnerer stemmeantallet efter forespørgslen er behandlet. Funktionen sender JSON-data tilbage til mit JavaScript, hvis (og kun hvis) stemmen har et eksisterende artikel-id efterfulgt af enten “_up” eller “_down”. For alt andet svarer funktionen tilbage, at den er en tepotte og derfor ikke kan hjælpe:

def process_vote(request):
	if request.method == "POST":
		try:
			vote = json.loads(request.body.decode())['vote']
			article_id = vote[:vote.index('_')]
			article = Article.objects.get(id=article_id)
			if '_up' in vote:
				article.votes += 1
			elif '_down' in vote:
				article.votes -= 1
			else:
				return HttpResponse(status=418)
			article.save()
			votes = {'votes': article.votes}
			return JsonResponse(votes)
		except:
			return HttpResponse(status=418)
	return HttpResponse(status=418)

En sorteringsalgoritme

Som det allersidste havde jeg brug for at udvikle en sorteringsalgoritme, der tog højde for artiklers alder, som jeg kunne bruge i mit view. Den tog lidt tid at skrive, fordi det nogle gange kan være svært at regne ud, hvordan man med Djangos databaseforespørgselssyntaks kan lave de beregninger, man har brug for, direkte med forespørgslen til databasen.

Algoritmen gør sådan her:

  • Tager antal stemmer og lægger 1 til. Hvis alle artikler starter på 1, forhindrer jeg at artikler med et positivt antal stemmer altid vil ligge over artikler uden stemmer overhovedet.
  • Deler dette tal med 1 plus antal timer siden artiklens offentliggørelsestidspunkt.
  • Antal timer udregnes ved at tage antal dage siden offentliggørelsestidspunktet og gange med 24 og dertil lægge det yderligere antal timer fra det samlede interval i dage og timer siden offentliggørelsestidspunktet.
  • For at undgå at komme til at dele med 0, lægger jeg 1 til antal timer og tager den absolutte værdi af antal timer siden offentliggørelsestidspunktet. Det er nødvendigt, fordi medierne engang imellem offentliggør artikler med et publiceringstidspunkt i fremtiden.
  • Fordi jeg deler stemmer med antal timer siden offentliggørelse, vil en nyhed hurtigt miste sin “værdi”. Hvis Folkets Wallnot ikke bliver en kæmpe succes, kan det være at jeg skal dele med antal dage i stedet, sådan “straffen” for at være en gammel artikel ikke bliver ligeså mærkbar.

Her er algoritmen skrevet som forespørgsel i Django:

articles = Article.objects.filter(paywall_detected=False)
    .annotate(score=ExpressionWrapper((F('votes') + 1) /
    (1+Abs(ExtractDay(Now()-F('date'))*24 + ExtractHour(Now()-F('date')))),output_field=FloatField()))
    .order_by('-score','-date')