En restaurant, jeg gerne vil prøve, er fuldstændig booket op og har endnu ikke åbnet op for reservationer i april. Hvordan kan jeg komme først til fadet?
Jeg besøgte reservationssystemet og iagttog hvordan det interne API spurgte om ledige borde.
Dette billede viser forespørgslen:
Forespørgslen om ledige borde til 2 personer
Dette billede viser svaret fra API’et:
Svar fra API’et: Ingen ledige borde (hvilket billedet også viser)
For april så svaret sådan her ud:
Hverken ledige eller optagede borde i april.
API’et svarer altså med en tom ‘data’-nøgle, når der ikke er åbnet for reservationer endnu.
Jeg skrev et lille program, som jeg har sat til at køre hvert 5. minut, for at tjekke om jeg kan komme til at reservere. Programmet tjekker, om der er kommet noget indhold i ‘data’-nøglen i svaret fra API’et. Hvis der er, sender det mig en besked om, at jeg godt kan komme i gang med at reservere bord.
Mit lille program, der gerne skulle give mig en fordel i kampen.
Næste skridt kunne være at udvide programmet, sådan det også reserverer bordet for mig. Men i første omgang prøver jeg at gøre den del af arbejdet selv.
En bruger på wallnot.dk skrev til mig og foreslog at lade brugerne på siden vurdere kvaliteten af de artikler, siden linker til, ligesom på fx Hacker News. Idéen er at gode artikler så kan ligge øverst, mens metervaren synker ned i bunden – hvis altså folket har forstand på at vurdere den slags.
For at gøre det helt smart og dynamisk, havde jeg brug for noget JavaScript, der kan fyre en stemme af sted, så snart en bruger klikker på ▲ eller ▼.
Jeg er ikke helt ferm til JavaScript, men jeg begynder at forstå det, og med god hjælp og lidt copy/paste fra forskellige kilder, landede jeg til sidst på noget kode, der ser ud til at virke.
Den første del henter en såkaldt CRSF-cookie, der sørger for, at man er nødt til at besøge Wallnot, inden man kan stemme på artikler, og at man ikke kan stemme på vegne af andre fra andre hjemmesider.
Den anden del sender en forespørgsel af sted med cookie-værdien og selve stemmen og opdaterer stemmeantallet på siden, når forespørgslen er behandlet.
Forespørgslen sender et artikel-id af sted sammen med information om der er tale om en ▲-stemme eller en ▼-stemme.
I Djangos views.py skriver jeg en funktion, der kan modtage forespørgslen og returnerer stemmeantallet efter forespørgslen er behandlet. Funktionen sender JSON-data tilbage til mit JavaScript, hvis (og kun hvis) stemmen har et eksisterende artikel-id efterfulgt af enten “_up” eller “_down”. For alt andet svarer funktionen tilbage, at den er en tepotte og derfor ikke kan hjælpe:
Som det allersidste havde jeg brug for at udvikle en sorteringsalgoritme, der tog højde for artiklers alder, som jeg kunne bruge i mit view. Den tog lidt tid at skrive, fordi det nogle gange kan være svært at regne ud, hvordan man med Djangos databaseforespørgselssyntaks kan lave de beregninger, man har brug for, direkte med forespørgslen til databasen.
Algoritmen gør sådan her:
Tager antal stemmer og lægger 1 til. Hvis alle artikler starter på 1, forhindrer jeg at artikler med et positivt antal stemmer altid vil ligge over artikler uden stemmer overhovedet.
Deler dette tal med 1 plus antal timer siden artiklens offentliggørelsestidspunkt.
Antal timer udregnes ved at tage antal dage siden offentliggørelsestidspunktet og gange med 24 og dertil lægge det yderligere antal timer fra det samlede interval i dage og timer siden offentliggørelsestidspunktet.
For at undgå at komme til at dele med 0, lægger jeg 1 til antal timer og tager den absolutte værdi af antal timer siden offentliggørelsestidspunktet. Det er nødvendigt, fordi medierne engang imellem offentliggør artikler med et publiceringstidspunkt i fremtiden.
Fordi jeg deler stemmer med antal timer siden offentliggørelse, vil en nyhed hurtigt miste sin “værdi”. Hvis Folkets Wallnot ikke bliver en kæmpe succes, kan det være at jeg skal dele med antal dage i stedet, sådan “straffen” for at være en gammel artikel ikke bliver ligeså mærkbar.
Her er algoritmen skrevet som forespørgsel i Django:
Men hvordan kan det være, at en visning af nyheder søger på oprettelsesdatoer i december og januar? Det er februar nu.
Fordi “Oprettelsesdato” for en titel ikke er det samme som dato for titlens tilføjelse på eReolen. Hvad det betyder, ved jeg ikke med sikkerhed, men i hvert fald ikke titlens tilføjelse på eReolen.
Og det betyder, at der løbende kan dukke spændende bøger op, hvis “dkcclterm.op”-værdi ligger langt tilbage i tiden.
Og det betyder, at jeg kan risikere at misse noget, jeg gerne vil læse.
Hvad gjorde jeg så?
Jeg byggede min egen eReolen! Med en robot, der hver nat monitorerer, hvilke titler der rent faktisk er nye. Hver morgen ligger der en mail til mig om, hvor mange titler robotten har fundet, og hvis jeg har tid og kaffe til det, kan jeg kigge de nye titler igennem over morgenkaffen.
Det fungerer sådan her:
I Django byggede jeg en datamodel over titler med forskellige metadata:
from django.db import models
from isbn_field import ISBNField
class Author(models.Model):
full_name = models.CharField('Forfatter', max_length=200, unique=True)
birth_year = models.DateField(null=True)
def __str__(self):
return self.full_name
class Publisher(models.Model):
publisher = models.CharField('Udgiver', max_length=200, unique=True)
def __str__(self):
return self.publisher
class Keyword(models.Model):
keyword = models.CharField('Nøgleord', max_length=200, unique=True)
def __str__(self):
return self.keyword
class TitleType(models.Model):
title_type = models.CharField('Type', max_length=200, unique=True)
def __str__(self):
return self.title_type
class Language(models.Model):
language = models.CharField('Sprog', max_length=50, unique=True)
def __str__(self):
return self.language
class Isbn(models.Model):
isbn = ISBNField(null=True, blank=True)
def __str__(self):
return self.isbn
class Audience(models.Model):
audience = models.CharField('Målgruppe', max_length=200, unique=True)
def __str__(self):
return self.audience
class TitleFormat(models.Model):
title_format = models.CharField('Format', max_length=50, unique=True)
def __str__(self):
return self.title_format
class Title(models.Model):
added = models.DateField()
object_id = models.CharField('Ereolen-id', max_length=50, unique=True)
title = models.CharField('Titel', max_length=500)
original_title = models.CharField('Originaltitel', max_length=500, default="")
publish_date = models.DateField(null=True)
dk5 = models.CharField('DK5-kode', max_length=10, default="")
cover_url = models.URLField('Cover-url', max_length=500, null=True)
ereolen_url = models.URLField('Ereolen-url', max_length=500)
abstract = models.TextField(blank=True)
dkcclterm_op = models.DateField()
publisher = models.ForeignKey(Publisher, on_delete=models.CASCADE)
language = models.ForeignKey(Language, on_delete=models.CASCADE)
title_type = models.ForeignKey(TitleType, on_delete=models.CASCADE)
title_format = models.ForeignKey(TitleFormat, on_delete=models.CASCADE)
author = models.ManyToManyField(Author)
keyword = models.ManyToManyField(Keyword)
audience = models.ManyToManyField(Audience)
isbn = models.ManyToManyField(Isbn)
def __str__(self):
return self.title
def get_authors(self):
return " & ".join([author.full_name for author in self.author.all()])
get_authors.short_description = "Author(s)"
def get_isbns(self):
return ", ".join([isbn.isbn for isbn in self.isbn.all()])
get_isbns.short_description = "ISBN(s)"
def get_keywords(self):
return ", ".join([keyword.keyword for keyword in self.keyword.all()])
get_keywords.short_description = "Keyword(s)"
def get_audiences(self):
return ", ".join([audience.audience for audience in self.audience.all()])
get_audiences.short_description = "Audience(s)"
I Python skrev jeg en robot, der søger eReolen igennem, tilføjer nye titler til min database og ignorerer titler, der allerede er i databasen. Robotten satte jeg op til at køre hver nat på min server:
# -*- coding: utf-8 -*-
# Author: Morten Helmstedt. E-mail: helmstedt@gmail.com
""" This program saves ebooks, audiobooks and podcasts from ereolen.dk to a local database
that can be used to detect new titles better than ereolen.dk's own search options """
import requests # make http requests
from bs4 import BeautifulSoup # parse html responses
from datetime import date # create date objects
from dateutil.relativedelta import relativedelta # adding and subtracting months to dates
import re # regex for publish year parsing
import psycopg2 # work with postgresql databases
from psycopg2 import Error # database error handling
# Connect to database
try:
connection = psycopg2.connect(user = "",
password = "",
host = "",
port = "",
database = "")
cursor = connection.cursor()
except (Exception, psycopg2.Error) as error:
print("Error while connecting to PostgreSQL", error)
# Set configuration options and global variables
base_url = 'https://ereolen.dk'
term_types = ['ebog','lydbog','podcast']
added = date.today()
number_of_months_to_search = 200
start_month = added - relativedelta(months=number_of_months_to_search-2)
# Search period list goes from current month plus one month and back to start_month
search_period = []
for i in reversed(range(0,number_of_months_to_search)):
year_month_date = start_month + relativedelta(months=+i)
year_month = [year_month_date.year, year_month_date.month]
search_period.append(year_month)
# Crawl loop
title_counter = 0
for year_month in search_period:
for term_type in term_types:
start_date = date(year_month[0],year_month[1],1)
dkcclterm_op_search = start_date.strftime("%Y%m")
page = 0
pages_left = True
while pages_left == True:
# Search for hits
search_url = base_url + '/search/ting/dkcclterm.op%3D' + dkcclterm_op_search + '*%20AND%20term.type%3D' + term_type + '?page=' + str(page) + '&sort=date_descending'
request = requests.get(search_url)
result = request.text
# If an error message is returned in the search, either no results are left, or ereolen.dk is down for some reason
# In this case, the while loop is broken to try next item type and/or next year-month combination
if 'Vi kan desværre ikke finde noget, der matcher din søgning' in result or 'The website encountered an unexpected error. Please try again later.' in result:
pages_left = False
break
# Parse hits and get all item links
soup = BeautifulSoup(result, "lxml")
links = soup.find_all('a', href=True)
item_links = {link['href'] for link in links if "/ting/collection/" in link['href']}
# Go through item link
for link in item_links:
# Get id and check if link is already in databse
object_id = link[link.rfind('/')+1:].replace('%3A',':')
search_sql = '''SELECT * from ereolen_title WHERE object_id = %s'''
cursor.execute(search_sql, (object_id, ))
item_hit = cursor.fetchone()
# No hits means item is not in database and should be added
if not item_hit:
### ADD SEQUENCE ###
# Set full url for item
ereolen_url = base_url + link
# Request item and parse html
title_request = requests.get(ereolen_url)
title_result = title_request.text
title_soup = BeautifulSoup(title_result, "lxml")
# TITLE FIELDS #
# TITLE
try:
title = title_soup.find('div', attrs={'class':'field-name-ting-title'}).text.replace(" : ",": ")
except:
print("Ingen titel på:", ereolen_url)
break
# ORIGINAL TITLE
try:
original_title = title_soup.find('div', attrs={'class':'field-label'}, string=re.compile("Original titel:")).next.next.text
except:
original_title = ''
# PUBLISHED
try:
published = title_soup.find('div', class_={"field-name-ting-author"}).get_text()
published = int(re.search("[(]\d\d\d\d[)]", published).group()[1:5])
publish_date = date(published,1,1)
except:
publish_date = None
# COVER URL
try:
cover_url = title_soup.find('div', class_={"ting-cover"}).img['src']
except:
try:
data = {
'coverData[0][id]': object_id,
'coverData[0][image_style]': 'ding_primary_large'
}
response = requests.post('https://ereolen.dk/ting/covers', data=data)
response_json = response.json()
cover_url = response_json[0]['url']
except:
cover_url = ''
# ABSTRACT
abstract = title_soup.find('div', attrs={'class':'field-name-ting-abstract'}).text
# DKCCLTERM_OP
dkcclterm_op = start_date
# FOREIGN KEY FIELDS #
# LANGUAGE
try:
ereolen_language = title_soup.find('div', attrs={'class':'field-label'}, string=re.compile("Sprog:")).next.next.text
except:
ereolen_language = 'Ukendt'
language_sql = '''SELECT * from ereolen_language WHERE language = %s'''
cursor.execute(language_sql, (ereolen_language, ))
try:
language = cursor.fetchone()[0]
except:
language_insert = '''INSERT INTO ereolen_language(language) VALUES(%s) RETURNING id'''
cursor.execute(language_insert, (ereolen_language, ))
language = cursor.fetchone()[0]
# PUBLISHER
try:
ereolen_publisher = title_soup.find('div', attrs={'class':'field-label'}, string=re.compile("Forlag:")).next.next.text
except:
ereolen_publisher = 'Ukendt'
publisher_sql = '''SELECT * from ereolen_publisher WHERE publisher = %s'''
cursor.execute(publisher_sql, (ereolen_publisher, ))
try:
publisher = cursor.fetchone()[0]
except:
publisher_insert = '''INSERT INTO ereolen_publisher(publisher) VALUES(%s) RETURNING id'''
cursor.execute(publisher_insert, (ereolen_publisher, ))
publisher = cursor.fetchone()[0]
# TYPE
try:
ereolen_type = title_soup.find('div', attrs={'class':'field-label'}, string=re.compile("Type:")).next.next.text
except:
ereolen_type = 'Ukendt'
type_sql = '''SELECT * from ereolen_titletype WHERE title_type = %s'''
cursor.execute(type_sql, (ereolen_type, ))
try:
title_type = cursor.fetchone()[0]
except:
title_type_insert = '''INSERT INTO ereolen_titletype(title_type) VALUES(%s) RETURNING id'''
cursor.execute(title_type_insert, (ereolen_type, ))
title_type = cursor.fetchone()[0]
# FORMAT
try:
ereolen_format = title_soup.find('div', attrs={'class':'field-label'}, string=re.compile("Ebogsformat:")).next.next.text
except:
ereolen_format = "Ukendt"
format_sql = '''SELECT * from ereolen_titleformat WHERE title_format = %s'''
cursor.execute(format_sql, (ereolen_format, ))
try:
title_format = cursor.fetchone()[0]
except:
title_format_insert = '''INSERT INTO ereolen_titleformat(title_format) VALUES(%s) RETURNING id'''
cursor.execute(title_format_insert, (ereolen_format, ))
title_format = cursor.fetchone()[0]
# DK5 - TODO: Not done yet
dk5 = ""
### SAVE BEFORE ADDING MANY-TO-MANY FIELDS ###
title_data = (added,title_type,title,original_title,publisher,object_id,language,publish_date,cover_url,ereolen_url,title_format,abstract,dkcclterm_op,dk5)
title_insert = '''INSERT INTO ereolen_title(added,title_type_id,title,original_title,publisher_id,object_id,language_id,publish_date,cover_url,ereolen_url,title_format_id,abstract,dkcclterm_op,dk5) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) RETURNING id'''
cursor.execute(title_insert, title_data)
title_id = cursor.fetchone()[0]
connection.commit()
# MANY-TO-MANY FIELDS #
# AUDIENCE(S)
try:
audience_div = title_soup.find('div', attrs={'class':'field-label'}, string=re.compile("Målgruppe:")).next.next
audiences = audience_div.find_all('span')
audiences_list = [aud.text for aud in audiences]
except:
audiences_list = ['Ukendt']
for audience in audiences_list:
audience_sql = '''SELECT * from ereolen_audience WHERE audience = %s'''
cursor.execute(audience_sql, (audience, ))
try:
audience_id = cursor.fetchone()[0]
except:
audience_insert = '''INSERT INTO ereolen_audience(audience) VALUES(%s) RETURNING id'''
cursor.execute(audience_insert, (audience, ))
audience_id = cursor.fetchone()[0]
audience_relation_sql = '''INSERT INTO ereolen_title_audience (title_id, audience_id) VALUES (%s,%s)'''
try:
cursor.execute(audience_relation_sql, (title_id,audience_id))
except:
connection.rollback()
# ISBN(S)
try:
isbn_div = title_soup.find('div', attrs={'class':'field-label'}, string=re.compile("ISBN:")).next.next
isbns = isbn_div.find_all('span')
isbns_list = [isb.text for isb in isbns]
for isbn in isbns_list:
isbn_sql = '''SELECT * from ereolen_isbn WHERE isbn = %s'''
cursor.execute(isbn_sql, (isbn, ))
try:
isbn_id = cursor.fetchone()[0]
except:
isbn_insert = '''INSERT INTO ereolen_isbn(isbn) VALUES(%s) RETURNING id'''
cursor.execute(isbn_insert, (isbn, ))
isbn_id = cursor.fetchone()[0]
isbn_relation_sql = '''INSERT INTO ereolen_title_isbn (title_id, isbn_id) VALUES (%s,%s)'''
try:
cursor.execute(isbn_relation_sql, (title_id,isbn_id))
except:
connection.rollback()
except:
pass
# KEYWORDS(S)
keywords_div = title_soup.find('div', attrs={'class':'field-name-ting-subjects'})
if keywords_div:
keywords = [link.text for link in keywords_div.find_all('a')]
for keyword in keywords:
keyword_sql = '''SELECT * from ereolen_keyword WHERE keyword = %s'''
cursor.execute(keyword_sql, (keyword, ))
try:
keyword_id = cursor.fetchone()[0]
except:
keyword_insert = '''INSERT INTO ereolen_keyword(keyword) VALUES(%s) RETURNING id'''
cursor.execute(keyword_insert, (keyword, ))
keyword_id = cursor.fetchone()[0]
keyword_relation_sql = '''INSERT INTO ereolen_title_keyword (title_id, keyword_id) VALUES (%s,%s)'''
try:
cursor.execute(keyword_relation_sql, (title_id,keyword_id))
except:
connection.rollback()
# AUTHOR(S)
creator_full = title_soup.find('div', attrs={'class':'field-name-ting-author'}).text.replace("Af ","")
# Remove date of book
creator = creator_full[:creator_full.rfind("(")-1]
authors = creator.split(",")
for author in authors:
birth_year = None
if ' (f. ' in author and not len(author) < 7:
if 'ca. ' in author:
author = author.replace('ca. ','')
birth_year_string = author[author.index("(f.")+4:author.index("(f.")+8]
if ')' in birth_year_string:
birth_year_string = birth_year_string.replace(')','')
birth_year = date(int(birth_year_string),1,1)
author = author[:author.index(" (f.")]
elif ' (f. ' in author:
breakpoint()
# Some times there are no authors, but still a published year
if len(author) == 5 and "(" in author:
author = ""
if author:
author = author.strip()
author_sql = '''SELECT * from ereolen_author WHERE full_name = %s'''
cursor.execute(author_sql, (author, ))
try:
author_id = cursor.fetchone()[0]
except:
if birth_year:
author_insert = '''INSERT INTO ereolen_author(full_name,birth_year) VALUES(%s,%s) RETURNING id'''
cursor.execute(author_insert, (author,birth_year))
else:
author_insert = '''INSERT INTO ereolen_author(full_name) VALUES(%s) RETURNING id'''
cursor.execute(author_insert, (author, ))
author_id = cursor.fetchone()[0]
author_relation_sql = '''INSERT INTO ereolen_title_author (title_id, author_id) VALUES (%s,%s)'''
try:
cursor.execute(author_relation_sql, (title_id,author_id))
except:
connection.rollback()
### SAVE ###
connection.commit()
title_counter += 1
page += 1
connection.close()
print('Ereolen crawl ran')
if title_counter > 0:
print('Added titles on ereolen:', title_counter)
Og i Djangos indbyggede administrationsinterface, kan jeg med fint overblik og gode søgnings-, sorterings- og filtreringsmuligheder få øje på en novellesamling af Georg Metz, der netop er dukket op i eReolen med en “dkcclterm.op”-værdi fra september 2013!
Min egen private eReolen. Beklager det bliver lidt småt.
Må jeg prøve?
Jeg ville gerne dele mit værktøj med andre, men det er ikke helt lige til at afklare, hvilke dele af eReolens bogdata, der er frie og offentlige, og hvilke der ejes af en (i mine øjne) lidt underlig konstruktion, der hedder DBC. Et KL-ejet firma (Kommunernes Landsforening), der tjener penge på at sælge data om bøger til – kommuner (og nogle andre aktører, som jeg gætter på næsten udelukkende er offentlige).
Jeg er ved at undersøge, hvad jeg kan offentliggøre uden at genere nogen eller bryde ophavsretsloven. Det kan godt være, det tager lidt tid.
Her er et eksempel på et lille program, der logger ind på politiken.dk. Det kan (sikkert) nemt tilpasses til Jyllands-Posten og evt. andre steder, der bruger samme loginløsning:
import requests
from bs4 import BeautifulSoup
def check_login_wall_presence(session):
# For verification purposes a shared article with a passage behind loginwall with
# is specified
login_wall_article_url = 'https://politiken.dk/del/_gCmczAApUpA'
passage_from_article = 'varieret kost og begrænset vægtøgning'
check_loginwall = session.get(login_wall_article_url)
if not passage_from_article in check_loginwall.text:
return print('Loginwall is on')
else:
return print('Loginwall is off')
# Initiate a requests session
session = requests.Session()
# Check login wall status
check_login_wall_presence(session)
# Medielogin/Politiken username and password
username = "" # ENTER E-MAIL
password = "" # ENTER PASSWORD
# STEP ONE OF LOGIN: Visit login page in order to set cookies and process form fields
login_page_url = 'https://politiken.dk/medielogin/login'
login_page = session.get(login_page_url)
login_page_soup = BeautifulSoup(login_page.text, "lxml")
login_information = {}
login_page_inputs = login_page_soup.find_all('input')
for input in login_page_inputs:
try:
login_information[input['name']] = input['value']
except:
pass
login_information['Username'] = username
login_information['Password'] = password
# STEP TWO OF LOGIN: Post form data from login page
process_login_url = 'https://medielogin.dk/politiken/login'
step_two_login = session.post(process_login_url, data=login_information)
step_two_login_soup = BeautifulSoup(step_two_login.text, "lxml")
# Get form destination
login_form = step_two_login_soup.find('form')
login_form_destination = login_form['action']
# Process form fields
step_two_information = {}
login_inputs = step_two_login_soup.find_all('input')
for input in login_inputs:
try:
step_two_information[input['name']] = input['value']
except:
pass
# STEP THREE OF LOGIN: Post form data to form destination
complete_login = session.post(login_form_destination, data=step_two_information)
# Check login wall status
check_login_wall_presence(session)
Koden bag består af to views i Django. En forside (index), der viser en tabel med en liste over borgerforslag, og en side (forslag), der viser data om det enkelte borgerforslag:
def index(request):
context = {}
today = date.today()
context['today'] = today
all_suggestions = Suggestion.objects.annotate(votes=Max('vote__votes'))
# Sort logic
sort = request.GET.get('sort')
if sort == "-date" or not sort:
all_suggestions = all_suggestions.order_by('-suggested_date')
elif sort == "date":
all_suggestions = all_suggestions.order_by('suggested_date')
elif sort == "title":
all_suggestions = all_suggestions.order_by('title')
elif sort == "-title":
all_suggestions = all_suggestions.order_by('-title')
elif sort == "votes":
all_suggestions = all_suggestions.order_by('votes')
elif sort == "-votes":
all_suggestions = all_suggestions.order_by('-votes')
context['all_suggestions'] = all_suggestions
return render(request, 'borgerforslag/index.html', context)
def forslag(request, id):
context = {}
today = date.today()
start_of_data_collection_date = date(2021, 10, 19)
context['start_of_data_collection_date'] = start_of_data_collection_date
# Votes per day is used to display the aggregated number of votes per day
votes_per_day = Suggestion.objects.filter(pk=id).annotate(date=TruncDate('vote__timestamp')).order_by('id', 'date').annotate(number_of_votes=Max('vote__votes'))
context['votes_per_day'] = votes_per_day
suggestion = Suggestion.objects.get(pk=id)
context['suggestion'] = suggestion
votes = suggestion.vote_set
# The number of votes per day since suggestion was made
votes_max = votes.aggregate(Max('votes'))['votes__max']
context['votes_max'] = votes_max
votes_per_day_average = int(votes_max / (today-suggestion.suggested_date).days)
context['votes_per_day_average'] = votes_per_day_average
# The number of votes per day for the last 7 days
votes_7_days = votes.filter(timestamp__gt=today-timedelta(days=7))
votes_max_min_7_days = votes_7_days.aggregate(Max('votes'), Min('votes'))
try:
votes_per_7_days_average = int((votes_max_min_7_days['votes__max'] - votes_max_min_7_days['votes__min']) / 7)
except:
votes_per_7_days_average = 0
context['votes_per_7_days_average'] = votes_per_7_days_average
days_left_of_suggestion = (suggestion.end_date-today).days
likely_to_succeed_votes_per_day = votes_max + days_left_of_suggestion * votes_per_day_average
context['likely_to_succeed_votes_per_day'] = likely_to_succeed_votes_per_day
likely_to_succeed_votes_per_7_days = votes_max + days_left_of_suggestion * votes_per_7_days_average
context['likely_to_succeed_votes_per_7_days'] = likely_to_succeed_votes_per_7_days
return render(request, 'borgerforslag/forslag.html', context)
Forsiden består af følgende skabelon:
{% extends "borgerforslag/base.html" %}
{% load static %}
{% block title %}Borgerforslag{% endblock %}
{% block content %}{% spaceless %}
<h1 class="display-4 mb-4">Borgerforslag</h1>
<table class="table table-striped"">
<caption>Liste over borgerforslag fra borgerforslag.dk</caption>
<tr>
<th><a title="Sorter efter stillet dato" href="{% url 'borgerforslag_index' %}?sort={% if request.GET.sort == "date" %}-date{% else %}date{% endif %}">Stillet dato</a></th>
<th><a title="Sorter efter forslagets titel" href="{% url 'borgerforslag_index' %}?sort={% if request.GET.sort == "title" %}-title{% else %}title{% endif %}">Titel</a></th>
<th><a title="Sorter efter antal støtter" href="{% url 'borgerforslag_index' %}?sort={% if request.GET.sort == "votes" %}-votes{% else %}votes{% endif %}">Støtter</a></th>
</tr>
{% for suggestion in all_suggestions %}
<tr>
<td class="text-nowrap">{{ suggestion.suggested_date }}</td>
<td><a href="{% url 'borgerforslag_forslag' suggestion.id %}">{{ suggestion.title }}{% if suggestion.votes < 50000 %} ({% if suggestion.end_date < today %}udløb{% else %}udløber{% endif %} {{ suggestion.end_date }}){% endif %}</td>
<td>{{ suggestion.votes }}</td>
</tr>
{% endfor %}
</table>
{% endspaceless %}{% endblock %}
Og forslagssiden genereres af denne skabelon:
{% extends "borgerforslag/base.html" %}
{% load static %}
{% block title %}Borgerforslag: {{ suggestion.title }}{% endblock %}
{% block content %}{% spaceless %}
<h1 class="display-4 mb-4">{{ suggestion.title }}</h1>
<p><a href="https://borgerforslag.dk{{ suggestion.url }}">Læs om forslaget på borgerforslag.dk</a></p>
<p><strong>Startdato:</strong> {{ suggestion.suggested_date }}</p>
<p><strong>Slutdato:</strong> {{ suggestion.end_date }}</p>
<p><strong>Støtter i alt:</strong> {{ votes_max }}{% if votes_max >= 50000 %} - forslaget vil blive fremsat som beslutningsforslag i Folketinget!{% endif %}</p>
<p>I gennemsnit {{ votes_per_day_average }} støtter per dag siden forslaget blev stillet. Hvis trenden fortsætter, opnår forslaget ca. {{ likely_to_succeed_votes_per_day }} støtter inden slutdatoen{% if likely_to_succeed_votes_per_day >= 50000 %} og vil blive fremsat som beslutningsforslag i Folketinget.{% else %}. Det er ikke nok til at blive fremsat som beslutningsforslag i Folketinget.{% endif %}</p>
<p>I gennemsnit {{ votes_per_7_days_average }} støtter per dag de sidste 7 dage. Hvis trenden fortsætter, opnår forslaget ca. {{ likely_to_succeed_votes_per_7_days }} støtter inden slutdatoen{% if likely_to_succeed_votes_per_7_days >= 50000 %} og vil blive fremsat som beslutningsforslag i Folketinget.{% else %}. Det er ikke nok til at blive fremsat som beslutningsforslag i Folketinget.{% endif %}</p>
<h2 class="mb-4">Udvikling i støtter for forslaget</h2>
{% if suggestion.suggested_date < start_of_data_collection_date %}<p>Obs! Forslaget blev fremsat før dataindsamlingen til denne side fra borgerforslag.dk startede den 19. oktober 2021. Der vises derfor ikke en komplet graf over udviklingen i støtter.</p>{% endif %}
<div id="chart"></div>
<script>
var options = {
title: {
text: 'Antal støtter over tid for forslaget {{ suggestion.title }}',
align: 'left'
},
chart: {
type: 'line',
locales: [{
"name": "da",
"options": {
"months": ["Januar", "Februar", "Marts", "April", "Maj", "Juni", "Juli", "August", "September", "Oktober", "November", "December"],
"shortMonths": ["Jan", "Feb", "Mar", "Apr", "Maj", "Jun", "Jul", "Aug", "Sep", "Okt", "Nov", "Dec"],
"days": ["Søndag", "Mandag", "Tirsdag", "Onsdag", "Torsdag", "Fredag", "Lørdag"],
"shortDays": ["Søn", "Man", "Tir", "Ons", "Tor", "Fre", "Lør"],
"toolbar": {
"exportToSVG": "Download SVG",
"exportToPNG": "Download PNG",
"exportToCSV": "Download CSV",
"menu": "Menu",
"selection": "Valg",
"selectionZoom": "Zoom til valg",
"zoomIn": "Zoom ind",
"zoomOut": "Zoom ud",
"pan": "Panorer",
"reset": "Nulstil zoom"
}
}
}],
defaultLocale: "da",
},
series: [{
name: 'Støtter',
data: [{% for suggestion in votes_per_day %}{x: new Date('{{ suggestion.date|date:"Y-m-d" }}').getTime(), y: {{ suggestion.number_of_votes }}},{% endfor %}]
}],
xaxis: {
type: 'datetime',
title: {
text: 'Dato'
}
},
yaxis: {
title: {
text: 'Støtter'
}
},
}
var chart = new ApexCharts(document.querySelector("#chart"), options);
chart.render();
</script>
{% endspaceless %}{% endblock %}
Jeg bruger ApexCharts.js til at vise den fine graf over udviklingen i støtter for hvert enkelt forslag.
Første udgave af robotten gemte det aktuelle stemmeantal for hvert aktivt borgerforslag hvert 10. minut, og da der både er en del borgerforslag og en del minutter, blev det ret hurtigt til ret mange registreringer i min database.
Jeg kom i tanke om, at det kun er nødvendigt at gemme stemmeantallet, når stemmeantallet har ændret sig siden sidste registrering. Hvis et forslag er viralt, registreres stemmeantallet stadigvæk hvert 10. minut. Hvis et forslag er døende, kan der gå meget længere tid mellem hver registrering.
Her er den nye udgave af robotten, som tjekker om der findes andre registreringer af samme forslag med samme stemmeantal, og kun gemmer antal stemmer, hvis der ikke gør:
import requests
from datetime import datetime
import locale
import psycopg2
from psycopg2 import Error
# Locale is set to Danish to parse dates correctly
locale.setlocale(locale.LC_TIME, ('da_DK', 'UTF-8'))
# API url
url = 'https://www.borgerforslag.dk/api/proposals/search'
# Query parameters
suggestions_per_request = 300
params_json = {
"filter": "active",
"sortOrder": "NewestFirst",
"searchQuery":"",
"pageNumber":0,
"pageSize": suggestions_per_request
}
# Connect to database
try:
connection = psycopg2.connect(user = "",
password = "",
host = "",
port = "",
database = "")
cursor = connection.cursor()
except (Exception, psycopg2.Error) as error:
print ("Error while connecting to PostgreSQL", error)
now = datetime.utcnow()
# Insert into database function
def insert_suggestion_and_votes(connection, suggestion):
with connection:
with connection.cursor() as cur:
try:
# By default, votes are inserted, except when no new votes have been added
# This variable is used to keep track of whether votes should be inserted
insert_votes = True
# See if suggestion already exists in table table borgerforslag_suggestion
sql = '''SELECT * FROM borgerforslag_suggestion WHERE unique_id = %s'''
cur.execute(sql, (suggestion['externalId'],))
suggestion_records = cur.fetchone()
# If suggestion does not already exist, add suggestion to table borgerforslag_suggestion
if not suggestion_records:
suggestion_data = (suggestion['externalId'],suggestion['title'],suggestion['date'],suggestion['url'],suggestion['status'])
sql = '''INSERT INTO borgerforslag_suggestion(unique_id,title,suggested_date,url,status) VALUES(%s,%s,%s,%s,%s) RETURNING id'''
cur.execute(sql, suggestion_data)
id = cur.fetchone()[0]
# If yes, get id of already added suggestion
else:
id = suggestion_records[0]
# Check in table table borgerforslag_vote whether a record with the same number of votes exists.
# If it does, no need to save votes
sql = '''SELECT * FROM borgerforslag_vote WHERE suggestion_id = %s AND votes = %s'''
cur.execute(sql, (id,suggestion['votes']))
vote_record = cur.fetchone()
if vote_record:
insert_votes = False
# Add votes to table borgerforslag_vote (if suggestion is new or vote count has changed since last run)
if insert_votes == True:
sql = '''INSERT INTO borgerforslag_vote(suggestion_id,timestamp,votes)
VALUES(%s,%s,%s)'''
cur.execute(sql, (id,now,suggestion['votes']))
except Error as e:
print(e, suggestion)
# Loop preparation
requested_results = 0
number_of_results = requested_results + 1
number_of_loops = 0
# Loop to get suggestions and add them to database
while requested_results < number_of_results and number_of_loops < 10:
response = requests.post(url, json=params_json)
json_response = response.json()
number_of_results = json_response['resultCount']
requested_results += suggestions_per_request
number_of_loops += 1
params_json['pageNumber'] += 1
for suggestion in json_response['data']:
suggestion['date'] = datetime.strptime(suggestion['date'], '%d. %B %Y') # convert date to datetime
insert_suggestion_and_votes(connection, suggestion)
Oprydning
Nu hvor jeg fik gjort tempoet, min database vokser med, lidt langsommere, ville jeg også gerne rydde lidt op i de gamle registreringer, hvor jeg jo havde gemt antal stemmer hvert 10. minut, uanset om antallet havde ændret sig.
Det skrev jeg også et lille script til. Her er logikken at jeg henter alle stemmeregistreringer sorteret efter hvilket borgerforslag, de hører til, og dernæst efter tidspunkt for registreringen.
Med rækkefølgen på plads, kan jeg for hver registrering tjekke, om den både vedrører samme borgerforslag som den tidligere registrering, og at stemmeantallet er det samme som den tidligere registrering. Hvis begge dele er sandt, er registreringen overflødig og kan slettes:
import psycopg2
from psycopg2 import Error
# Connect to database
try:
connection = psycopg2.connect(user = "",
password = "",
host = "",
port = "",
database = "")
cursor = connection.cursor()
except (Exception, psycopg2.Error) as error:
print ("Error while connecting to PostgreSQL", error)
with connection:
with connection.cursor() as cur:
sql = '''SELECT "borgerforslag_vote"."id", "borgerforslag_vote"."suggestion_id", "borgerforslag_vote"."timestamp", "borgerforslag_vote"."votes" FROM "borgerforslag_vote" ORDER BY "borgerforslag_vote"."suggestion_id" ASC, "borgerforslag_vote"."timestamp" ASC'''
cur.execute(sql)
rows = cur.fetchall()
previous_vote_number = -1
previous_vote_suggestion = -1000
for row in rows:
votes = row[3]
suggestion = row[1]
id = row[0]
if votes == previous_vote_number and previous_vote_suggestion == suggestion:
sql = '''DELETE FROM "borgerforslag_vote" WHERE "borgerforslag_vote"."id" = %s'''
cur.execute(sql, (id, ))
previous_vote_number = row[3]
previous_vote_suggestion = row[1]
For tiden er jeg, i arbejdssammenhæng, ved at bestå en basiseksamen i en projektledelsesmetode, der hedder PRINCE2.
Eksamen er en multiple choice-eksamen med 60 spørgsmål. Hvert spørgsmål har 4 svarmuligheder. Og for at bestå skal man svare rigtigt på mindst 33 spørgsmål ud af de 60. Det kunne umiddelbart lyde som om, at man ikke skal vide/huske/kunne særligt meget for at bestå.
Den sandsynlighedsregning, jeg lige kan huske, siger at man i gennemsnit svarer rigtigt på 15 ud af 60 spørgsmål, men hvor tit kan man være heldig at svare rigtigt på mindst 33 spørgsmål?
Det satte jeg 1 million virtuelle aber til at undersøge for mig:
from random import randint
correct_answers = [randint(1,4) for i in range(60)]
correct_answers_times = {}
for times in range(1000000):
guesses = [randint(1,4) for i in range(60)]
number_of_correct_answers = 0
for i in range(60):
if correct_answers[i] == guesses[i]:
number_of_correct_answers += 1
if not number_of_correct_answers in correct_answers_times:
correct_answers_times[number_of_correct_answers] = 1
else:
correct_answers_times[number_of_correct_answers] += 1
print(times)
sorted_correct_answers_times = dict(sorted(correct_answers_times.items()))
print(sorted_correct_answers_times)
>> {2: 3, 3: 49, 4: 192, 5: 679, 6: 2177, 7: 5778, 8: 12346, 9: 23999,
10: 41084, 11: 61815, 12: 84219, 13: 103438, 14: 115277, 15: 117968,
16: 110145, 17: 95734, 18: 76522, 19: 56094, 20: 38622, 21: 24321,
22: 14233, 23: 7867, 24: 4060, 25: 1901, 26: 863, 27: 388, 28: 151,
29: 54, 30: 14, 31: 4, 32: 1, 33: 1, 34: 1}
Output nederst i programmet viser, hvor mange gange X rigtige svar forekom ud af de 1 million gange, mine aber tog testen (“2: 3” læses sådan at det forekom 3 gange, at kun 2 svar var rigtige, osv.)
Det lykkedes kun at bestå eksamen ved hjælp af tilfældige besvarelser i alt 2 ud af 1 million gange. Én gang med 33 rigtige svar. Én gang med 34 rigtige svar.
Hvis mit program ellers er rigtigt, konkluderer jeg at det kræver en god portion held – eller rigtig mange aber til at arbejde for sig – at bestå PRINCE2 uden at vide et eller andet om metoden på forhånd.
Twitterspørgsmål og -svar om skævheder i kandidattests
Tobias var først med at komme med et bud, men jeg kunne ikke lade være med selv at prøve at teste hypotesen. Det kom der dette lille program ud af, der fyrer 25.000 tilfældige svar på testen af sted og ser hvad Danmarks Radio svarer tilbage. Læg mærke til at programmet ikke generer 25.000 forskellige tilfældige kombinationer men blot genererer tilfældige kombinationer 25.000 gange. (Det vil sige, at den samme kombination kan forekomme flere gange ud af de 25.000 gange.):
import requests
import random
base_url = 'https://www.dr.dk/nyheder/politik/api/kandidattest/GetMunicipalityMatch?municipality=124&answers='
stats = {}
for i in range(25000):
try:
sequence = ",".join([str(random.choice([1,2,4,5])) for i in range(18)])
response = requests.get(base_url + sequence)
json = response.json()
candidate_one_party = json['TopMatches'][0]['CandidateBasic']['Party']
if candidate_one_party not in stats:
stats[candidate_one_party] = 1
else:
stats[candidate_one_party] += 1
print(i)
except:
pass
with open('stats.txt', 'w') as output:
output.write(str(stats))
Det kom der følgende rangliste ud af for Københavns Kommune. Tabellen viser hvor mange gange en kandidat fra partiet var den kandidat, man var mest enig med på baggrund af sine svar i testen:
Parti
Antal
Procent
Socialdemokratiet
4210
17%
Det Konservative Folkeparti
3680
15%
Radikale Venstre
3262
13%
Venstre, Danmarks Liberale Parti
2632
11%
SF – Socialistisk Folkeparti
2034
8%
Alternativet
1528
6%
Kristendemokraterne
1231
5%
Frihedslisten
1120
4%
Nye Borgerlige
1109
4%
Enhedslisten – De Rød-Grønne
963
4%
Dansk Folkeparti
960
4%
Kommunisterne
606
2%
Veganerpartiet
421
2%
Københavnerlisten
285
1%
Hampepartiet
273
1%
Liberal Alliance
193
1%
Kommunistisk Parti
190
1%
Danmark for Alle
170
1%
Det Demokratiske Parti
67
0%
Bæredygtigt Samfund
43
0%
Rolig Revolution
21
0%
Total
24998
100%
Tabel der viser udfald af partifarve for mest enige kandidater ved 24998 tilfældige udfyldninger af DRs kandidattest til kommunalvalget for Københavns Kommune
Prøv selv, hvis du gider! Og husk: De fleste, der udfylder kandidattests i virkeligheden, slår nok ikke med terning når de vælger svar.
Peters idé er sjov, synes jeg, så jeg er så småt begyndt at bygge et eller andet, der monitorerer hvordan antallet af underskrifter på borgerforslag udvikler sig over tid.
Så nu tygger min webserver sig igennem nedenstående script hvert 10. minut og gemmer det aktuelle antal underskrifter på hvert borgerforslag. Når der er gået nogle uger, vil jeg se om jeg kan lave nogle interessante visualiseringer af data.
import requests
from datetime import datetime
import locale
import psycopg2
from psycopg2 import Error
### PREPARATION ###
# Locale is set to Danish to be able to parse dates from Borgerforslag
locale.setlocale(locale.LC_TIME, ('da_DK', 'UTF-8'))
# API url and request parameters
url = 'https://www.borgerforslag.dk/api/proposals/search'
suggestions_per_request = 300
params_json = {
"filter": "active",
"sortOrder": "NewestFirst",
"searchQuery":"",
"pageNumber":0,
"pageSize": suggestions_per_request
}
# Connect to database
try:
connection = psycopg2.connect(user = "",
password = "",
host = "",
port = "",
database = "")
cursor = connection.cursor()
except (Exception, psycopg2.Error) as error:
print ("Error while connecting to PostgreSQL", error)
now = datetime.utcnow()
# Insert into database function
def insert_suggestion_and_votes(connection, suggestion):
with connection:
with connection.cursor() as cur:
try:
# See if suggestion already exists
sql = '''SELECT * FROM borgerforslag_suggestion WHERE unique_id = %s'''
cur.execute(sql, (suggestion['externalId'],))
suggestion_records = cur.fetchone()
# If not, add suggestion
if not suggestion_records:
suggestion_data = (suggestion['externalId'],suggestion['title'],suggestion['date'],suggestion['url'],suggestion['status'])
sql = '''INSERT INTO borgerforslag_suggestion(unique_id,title,suggested_date,url,status) VALUES(%s,%s,%s,%s,%s) RETURNING id'''
cur.execute(sql, suggestion_data)
id = cur.fetchone()[0]
# If yes, get id
else:
id = suggestion_records[0]
# Add votes
sql = '''INSERT INTO borgerforslag_vote(suggestion_id,timestamp,votes)
VALUES(%s,%s,%s)'''
cur.execute(sql, (id,now,suggestion['votes']))
except Error as e:
print(e, suggestion)
# Loop preparation
requested_results = 0
number_of_results = requested_results + 1
number_of_loops = 0
# Loop to get suggestions and add them to database
while requested_results < number_of_results and number_of_loops < 10:
response = requests.post(url, json=params_json)
json_response = response.json()
number_of_results = json_response['resultCount']
requested_results += suggestions_per_request
number_of_loops += 1
params_json['pageNumber'] += 1
for suggestion in json_response['data']:
suggestion['date'] = datetime.strptime(suggestion['date'], '%d. %B %Y') # convert date to datetime
insert_suggestion_and_votes(connection, suggestion)
Opdatering 18/02/2023: Nordnet ændrer tit på deres ting. På https://github.com/helmstedt/nordnet-utilities forsøger jeg at følge med, så hent gerne din kode der, hvis koden neden for ikke virker længere.
Nordnet har opdateret deres loginprocedure, så her er et dugfrist program til at hente kurser hos Nordnet – eller Morningstar, hvis Nordnet skulle fejle:
# -*- coding: utf-8 -*-
# Author: Morten Helmstedt. E-mail: helmstedt@gmail.com
""" This program extracts historical stock prices from Nordnet (and Morningstar as a fallback) """
import requests
from datetime import datetime
from datetime import date
# Nordnet user account credentials
user = ''
password = ''
# DATE AND STOCK DATA. SHOULD BE EDITED FOR YOUR NEEDS #
# Start date (start of historical price period)
startdate = '2013-01-01'
# List of shares to look up prices for.
# Format is: Name, Morningstar id, Nordnet stock identifier
# See e.g. https://www.nordnet.dk/markedet/aktiekurser/16256554-novo-nordisk-b
# (identifier is 16256554)
# All shares must have a name (whatever you like). To get prices they must
# either have a Nordnet identifier or a Morningstar id
sharelist = [
["Maj Invest Danske Obligationer","F0GBR064UX",16099874],
["Novo Nordisk B A/S","0P0000A5BQ",16256554],
]
# A variable to store historical prices before saving to csv
finalresult = ""
finalresult += '"date";"price";"instrument"' + '\n'
# LOGIN TO NORDNET #
session = requests.Session()
# Setting cookies prior to login by visiting login page
url = 'https://www.nordnet.dk/logind'
request = session.get(url)
# Update headers for login
session.headers['client-id'] = 'NEXT'
session.headers['sub-client-id'] = 'NEXT'
# Actual login
url = 'https://www.nordnet.dk/api/2/authentication/basic/login'
request = session.post(url, data = {'username': user, 'password': password})
# LOOPS TO REQUEST HISTORICAL PRICES AT NORDNET AND MORNINGSTAR #
# Nordnet loop to get historical prices
nordnet_fail = []
for share in sharelist:
# Nordnet stock identifier and market number must both exist
if share[2]:
url = "https://www.nordnet.dk/api/2/instruments/historical/prices/" + str(share[2])
payload = {"from": startdate, "fields": "last"}
data = session.get(url, params=payload)
jsondecode = data.json()
# Sometimes the final date is returned twice. A list is created to check for duplicates.
datelist = []
if jsondecode[0]['prices']:
try:
for value in jsondecode[0]['prices']:
if 'last' in value:
price = str(value['last'])
elif 'close_nav' in value:
price = str(value['close_nav'])
price = price.replace(".",",")
date = datetime.fromtimestamp(value['time'] / 1000)
date = datetime.strftime(date, '%Y-%m-%d')
# Only adds a date if it has not been added before
if date not in datelist:
datelist.append(date)
finalresult += '"' + date + '"' + ";" + '"' + price + '"' + ";" + '"' + share[0] + '"' + "\n"
except Exception as error:
print(error)
breakpoint()
# No price data returned! Try another method!
else:
nordnet_fail.append(share)
if nordnet_fail:
print(nordnet_fail)
# Morningstar loop to get historical prices
for share in nordnet_fail:
# Only runs for one specific fund in this instance
payload = {"id": share[1], "currencyId": "DKK", "idtype": "Morningstar", "frequency": "daily", "startDate": startdate, "outputType": "COMPACTJSON"}
data = requests.get("http://tools.morningstar.dk/api/rest.svc/timeseries_price/nen6ere626", params=payload)
jsondecode = data.json()
for lists in jsondecode:
price = str(lists[1])
price = price.replace(".",",")
date = datetime.fromtimestamp(lists[0] / 1000)
date = datetime.strftime(date, '%Y-%m-%d')
finalresult += '"' + date + '"' + ";" + '"' + price + '"' + ";" + '"' + share[0] + '"' + "\n"
# WRITE CSV OUTPUT TO FILE #
with open("kurser.csv", "w", newline='', encoding='utf8') as fout:
fout.write(finalresult)