Kodejulekalender

Ovre på https://adventofcode.com/ kan man hver dag finde to kodeopgaver og prøve at løse dem. Det er nærmest som kryds-og-tværs eller sudoko, bare med kode i stedet. Her er mine løsninger.

Dag 5, del 2:

straight_lines = []
diagonal_lines = []
x_values = []
y_values = []
with open("input_day5.txt", "r", encoding="utf8") as fin:
	for f in fin:
		f = f.replace("\n","")
		x1 = int(f[:f.index(",")])
		y1 = int(f[f.index(",")+1:f.index(" -> ")])
		x2 = int(f[f.index(" -> ")+4:f.index(",",f.index(" -> ")+4)])
		y2 = int(f[f.index(",",f.index(" -> "))+1:])
		x_values.append(x1)
		x_values.append(x2)
		y_values.append(y1)
		y_values.append(y2)
		if x1 == x2 or y1 == y2:
			straight_lines.append([(x1, y1), (x2, y2)])
		elif x1 != x2 and y1 != y2:
			diagonal_lines.append([(x1, y1), (x2, y2)])

coordinates = {}
for x in range(max(x_values)+1):
	for y in range(max(y_values)+1):
		coordinates[str(x) + "," + str(y)] = 0

def add_line_to_coordinates(x,y):
	key = str(x) + "," + str(y)
	coordinates[key] += 1

for line in diagonal_lines:
	if line[0][0] > line[1][0] and line[0][1] > line[1][1]:
		number_of_coordinates = line[0][0] - line[1][0]
		for i in range(number_of_coordinates + 1):
			add_line_to_coordinates(line[1][0] + i, line[1][1] + i)
	elif line[0][0] > line[1][0] and line[0][1] < line[1][1]:
		number_of_coordinates = line[0][0] - line[1][0]
		for i in range(number_of_coordinates + 1):
			add_line_to_coordinates(line[1][0] + i, line[1][1] - i)
	elif line[0][0] < line[1][0] and line[0][1] > line[1][1]:
		number_of_coordinates = line[1][0] - line[0][0]
		for i in range(number_of_coordinates + 1):
			add_line_to_coordinates(line[0][0] + i, line[0][1] - i)
	elif line[0][0] < line[1][0] and line[0][1] < line[1][1]:
		number_of_coordinates = line[1][0] - line[0][0]
		for i in range(number_of_coordinates + 1):
			add_line_to_coordinates(line[0][0] + i, line[0][1] + i)	
	
for line in straight_lines:
	if line[0][0] == line[1][0]:		# x values are equal
		if line[0][1] < line[1][1]:		# first y value is lowest
			for i in range(line[0][1], line[1][1]+1):
				add_line_to_coordinates(line[0][0], i)
		elif line[0][1] > line[1][1]:	# second y value is lowest
			for i in range(line[1][1], line[0][1]+1):
				add_line_to_coordinates(line[0][0], i)
		else:							# y values are equal, so only one point
			add_line_to_coordinates(line[0][0], line[0][1])
	else:								# y values are equeal
		if line[0][0] < line[1][0]:		# first x value is lowest
			for i in range(line[0][0], line[1][0]+1):
				add_line_to_coordinates(i, line[0][1])
		elif line[0][0] > line[1][0]:	# second x value is lowest
			for i in range(line[1][0], line[0][0]+1):
				add_line_to_coordinates(i, line[0][1])
		else:
			add_line_to_coordinates(line[0][0], line[0][1])

double_hits = 0
for coordinate, number_of_hits in coordinates.items():
	if number_of_hits > 1:
		double_hits += 1
print(double_hits)

Dag 5, del 1:

straight_lines = []
x_values = []
y_values = []
with open("input_day5.txt", "r", encoding="utf8") as fin:
	for f in fin:
		f = f.replace("\n","")
		x1 = int(f[:f.index(",")])
		y1 = int(f[f.index(",")+1:f.index(" -> ")])
		x2 = int(f[f.index(" -> ")+4:f.index(",",f.index(" -> ")+4)])
		y2 = int(f[f.index(",",f.index(" -> "))+1:])
		if x1 == x2 or y1 == y2:
			x_values.append(x1)
			x_values.append(x2)
			y_values.append(y1)
			y_values.append(y2)
			straight_lines.append([(x1, y1), (x2, y2)])

coordinates = {}
for x in range(max(x_values)+1):
	for y in range(max(y_values)+1):
		coordinates[str(x) + "," + str(y)] = 0

def add_line_to_coordinates(x,y):
	key = str(x) + "," + str(y)
	coordinates[key] += 1

for line in straight_lines:
	if line[0][0] == line[1][0]:		# x values are equal
		if line[0][1] < line[1][1]:		# first y value is lowest
			for i in range(line[0][1], line[1][1]+1):
				add_line_to_coordinates(line[0][0], i)
		elif line[0][1] > line[1][1]:	# second y value is lowest
			for i in range(line[1][1], line[0][1]+1):
				add_line_to_coordinates(line[0][0], i)
		else:							# y values are equal, so only one point
			add_line_to_coordinates(line[0][0], line[0][1])
	else:								# y values are equeal
		if line[0][0] < line[1][0]:		# first x value is lowest
			for i in range(line[0][0], line[1][0]+1):
				add_line_to_coordinates(i, line[0][1])
		elif line[0][0] > line[1][0]:	# second x value is lowest
			for i in range(line[1][0], line[0][0]+1):
				add_line_to_coordinates(i, line[0][1])
		else:
			add_line_to_coordinates(line[0][0], line[0][1])

double_hits = 0
for coordinate, number_of_hits in coordinates.items():
	if number_of_hits > 1:
		double_hits += 1
print(double_hits)

Dag 4, del 2:

boards = []
with open("input_day4.txt", "r", encoding="utf8") as fin:
	counter = 0
	sub_counter = 0
	board = []
	for f in fin:
		if counter == 0:
			f = f.replace("\n","")
			drawn_numbers = f.split(",")
		elif f == '\n':
			sub_counter = 0
			board = []
		else:
			if f[0] == " ":
				f = f[1:]
			if "  " in f:
				f = f.replace("  "," ")
			f = f.replace("\n","")
			board.extend(f.split(" "))
			sub_counter += 1
		if sub_counter == 5:
			board = [b.replace("\n","") for b in board]
			boards.append(board)
		counter = 1

def check_board(board):
	i = 0
	while i < 26:
		d_count = 0
		for number in board[i:i+5]:
			if 'd' in number:
				d_count += 1
				if d_count == 5:
					return True
		i += 5	
	row = 0
	while row < 5:
		i = row
		d_count = 0
		while i < row + 25:
			if 'd' in board[i]:
				d_count += 1
				if d_count == 5:
					return True
			i += 5
		row += 1	

def calculate_score(board, draw):
	sum_of_numbers = 0
	for number in board:
		if not 'd' in number:
			sum_of_numbers += int(number)
	return sum_of_numbers * int(draw)

winning_boards = []
scores = []

draw_counter = 0
end_loop = False
for draw in drawn_numbers:
	board_counter = 0
	for board in boards:
		if board not in winning_boards:
			number_counter = 0
			for number in board:
				if number == draw:
					boards[board_counter][number_counter] += 'd'
				number_counter += 1
			bingo = check_board(board)
			if bingo == True:
				winning_boards.append(board)
				scores.append(calculate_score(board, draw))
		board_counter += 1	
	draw_counter += 1

print(scores[-1])

Dag 4, del 1:

boards = []
with open("input_day4.txt", "r", encoding="utf8") as fin:
	counter = 0
	sub_counter = 0
	board = []
	for f in fin:
		if counter == 0:
			f = f.replace("\n","")
			drawn_numbers = f.split(",")
		elif f == '\n':
			sub_counter = 0
			board = []
		else:
			if f[0] == " ":
				f = f[1:]
			if "  " in f:
				f = f.replace("  "," ")
			f = f.replace("\n","")
			board.extend(f.split(" "))
			sub_counter += 1
		if sub_counter == 5:
			board = [b.replace("\n","") for b in board]
			boards.append(board)
		counter = 1

def check_board(board):
	i = 0
	while i < 26:
		d_count = 0
		for number in board[i:i+5]:
			if 'd' in number:
				d_count += 1
				if d_count == 5:
					return True
		i += 5	
	row = 0
	while row < 5:
		i = row
		d_count = 0
		while i < row + 25:
			if 'd' in board[i]:
				d_count += 1
				if d_count == 5:
					return True
			i += 5
		row += 1	

def calculate_score(board, draw):
	sum_of_numbers = 0
	for number in board:
		if not 'd' in number:
			sum_of_numbers += int(number)
	return sum_of_numbers * int(draw)

def play_bingo():
	draw_counter = 0
	end_loop = False
	for draw in drawn_numbers:
		board_counter = 0
		for board in boards:
			number_counter = 0
			for number in board:
				if number == draw:
					boards[board_counter][number_counter] += 'd'
				number_counter += 1
			bingo = check_board(board)
			if bingo == True:
				return calculate_score(board, draw)
			board_counter += 1	
		draw_counter += 1

score = play_bingo()
print(score)

Dag 3, del 2:

rates = []
with open("input_day3", "r", encoding="utf8") as fin:
	for f in fin:
		rates.append(f)

oxygen_rates = rates
co2_rates = rates

for i in range(len(rates[0])-1):
	zero_count = 0
	one_count = 0
	for rate in oxygen_rates:
		if rate[i] == '0':
			zero_count += 1
		elif rate[i] == '1':
			one_count += 1
	if zero_count > one_count:
		oxygen_rates = [rate for rate in oxygen_rates if rate[i] == '0']
	elif one_count > zero_count or one_count == zero_count:
		oxygen_rates = [rate for rate in oxygen_rates if rate[i] == '1']
	
	if len(oxygen_rates) == 1:
		oxygen = oxygen_rates[0]
	
	zero_count = 0
	one_count = 0
	for rate in co2_rates:
		if rate[i] == '0':
			zero_count += 1
		elif rate[i] == '1':
			one_count += 1
	if zero_count > one_count:
		co2_rates = [rate for rate in co2_rates if rate[i] == '1']
	elif one_count > zero_count or one_count == zero_count:
		co2_rates = [rate for rate in co2_rates if rate[i] == '0']

	if len(co2_rates) == 1:
		co2 = co2_rates[0]
	
print(int(oxygen,2)*int(co2,2))

Dag 3, del 1:

rates = []
with open("input_day3", "r", encoding="utf8") as fin:
	for f in fin:
		rates.append(f)

gamma_rate = ""
epsilon_rate = ""
for i in range(len(rates[0])-1):
	zero_count = 0
	one_count = 0
	for rate in rates:
		if int(rate[i]) == 0:
			zero_count += 1
		elif int(rate[i]) == 1:
			one_count += 1
	if zero_count > one_count:
		gamma_rate += '0'
		epsilon_rate += '1'
	else:
		gamma_rate += '1'
		epsilon_rate += '0'	
print(int(gamma_rate,2) * int(epsilon_rate,2))

Dag 2, del 2:

increase_count = 0
loop_count = 0
position_list = []
with open("input_day2.txt", "r", encoding="utf8") as fin:
	for f in fin:
		position_list.append(f)

horisontal_position = 0
aim = 0
depth = 0
for move in position_list:
	if 'forward ' in move:
		number = int(move.replace('forward ',''))
		horisontal_position += number
		if aim > 0:
			depth += number * aim
	elif 'down ' in move:
		number = int(move.replace('down ',''))
		aim += number
	elif 'up ' in move:
		number = int(move.replace('up ',''))
		aim -= number
	else:
		breakpoint()
print(horisontal_position*depth)

Dag 2, del 1:

increase_count = 0
loop_count = 0
position_list = []
with open("input_day2.txt", "r", encoding="utf8") as fin:
	for f in fin:
		position_list.append(f)

forward_position = 0
depth = 0
for move in position_list:
	if 'forward ' in move:
		number = int(move.replace('forward ',''))
		forward_position += number
	elif 'down ' in move:
		number = int(move.replace('down ',''))
		depth += number
	elif 'up ' in move:
		number = int(move.replace('up ',''))
		depth -= number
	else:
		breakpoint()
print(forward_position*depth)		

Dag 1, del 2:

increase_count = 0
loop_count = 0
number_list = []
with open("input.txt", "r", encoding="utf8") as fin:
	for f in fin:
		number_list.append(int(f))

index_start = 0
index_end = 3
while index_end <= len(number_list):
	print(number_list[index_start:index_end])
	if loop_count == 0:
		last_sum = sum(number_list[index_start:index_end])
		loop_count += 1
		index_start += 1
		index_end += 1
	else:
		new_sum = sum(number_list[index_start:index_end])
		if new_sum > last_sum:
			increase_count += 1
		index_start += 1
		index_end += 1
		last_sum = new_sum
print(increase_count)

Dag 1, del 1:

increase_count = 0
loop_count = 0
with open("input.txt", "r", encoding="utf8") as fin:
	for f in fin:
		number = int(f)
		if loop_count == 0:
			last_number = number
			loop_count += 1
			continue
		else:
			if number > last_number:
				increase_count += 1
			last_number = number
print(increase_count)	

Borgerforslag i version 1

Jeg har endelig haft til til at vise mine nyindsamlede data fra borgerforslag.dk frem for verden.

Jeg har tidligere skrevet om, hvordan jeg har sat en robot til at indhente støttedata om borgerforslag, og hvordan jeg optimerede indsamlingen af data.

Nu har jeg bygget første udgave af en Django-app, der fremviser mine skønne data.

Du kan prøve den af på wallnot.dk/borgerforslag.

Koden bag består af to views i Django. En forside (index), der viser en tabel med en liste over borgerforslag, og en side (forslag), der viser data om det enkelte borgerforslag:

def index(request):
	context = {}
	today = date.today()
	context['today'] = today
	all_suggestions = Suggestion.objects.annotate(votes=Max('vote__votes'))
	
	# Sort logic
	sort = request.GET.get('sort')

	if sort == "-date" or not sort:
		all_suggestions = all_suggestions.order_by('-suggested_date')
	elif sort == "date":
		all_suggestions = all_suggestions.order_by('suggested_date')
	elif sort == "title":
		all_suggestions = all_suggestions.order_by('title')	
	elif sort == "-title":
		all_suggestions = all_suggestions.order_by('-title')	
	elif sort == "votes":
		all_suggestions = all_suggestions.order_by('votes')
	elif sort == "-votes":
		all_suggestions = all_suggestions.order_by('-votes')

	context['all_suggestions'] = all_suggestions
	return render(request, 'borgerforslag/index.html', context)

def forslag(request, id):
	context = {}
	today = date.today()
	start_of_data_collection_date = date(2021, 10, 19)
	context['start_of_data_collection_date'] = start_of_data_collection_date

	# Votes per day is used to display the aggregated number of votes per day
	votes_per_day = Suggestion.objects.filter(pk=id).annotate(date=TruncDate('vote__timestamp')).order_by('id', 'date').annotate(number_of_votes=Max('vote__votes'))
	context['votes_per_day'] = votes_per_day
	
	suggestion = Suggestion.objects.get(pk=id)
	context['suggestion'] = suggestion
	votes = suggestion.vote_set

	# The number of votes per day since suggestion was made
	votes_max = votes.aggregate(Max('votes'))['votes__max']
	context['votes_max'] = votes_max
	votes_per_day_average = int(votes_max / (today-suggestion.suggested_date).days)
	context['votes_per_day_average'] = votes_per_day_average
	
	# The number of votes per day for the last 7 days
	votes_7_days = votes.filter(timestamp__gt=today-timedelta(days=7))
	votes_max_min_7_days = votes_7_days.aggregate(Max('votes'), Min('votes'))
	try:
		votes_per_7_days_average = int((votes_max_min_7_days['votes__max'] - votes_max_min_7_days['votes__min']) / 7)
	except:
		votes_per_7_days_average = 0
	context['votes_per_7_days_average'] = votes_per_7_days_average
	
	days_left_of_suggestion = (suggestion.end_date-today).days
	likely_to_succeed_votes_per_day = votes_max + days_left_of_suggestion * votes_per_day_average
	context['likely_to_succeed_votes_per_day'] = likely_to_succeed_votes_per_day
	likely_to_succeed_votes_per_7_days = votes_max + days_left_of_suggestion * votes_per_7_days_average
	context['likely_to_succeed_votes_per_7_days'] = likely_to_succeed_votes_per_7_days
	
	return render(request, 'borgerforslag/forslag.html', context)

Forsiden består af følgende skabelon:

{% extends "borgerforslag/base.html" %}
{% load static %}
{% block title %}Borgerforslag{% endblock %}
{% block content %}{% spaceless %}

<h1 class="display-4 mb-4">Borgerforslag</h1>

<table class="table table-striped"">
	<caption>Liste over borgerforslag fra borgerforslag.dk</caption>
	<tr>
		<th><a title="Sorter efter stillet dato" href="{% url 'borgerforslag_index' %}?sort={% if request.GET.sort == "date" %}-date{% else %}date{% endif %}">Stillet dato</a></th>
		<th><a title="Sorter efter forslagets titel" href="{% url 'borgerforslag_index' %}?sort={% if request.GET.sort == "title" %}-title{% else %}title{% endif %}">Titel</a></th>
		<th><a title="Sorter efter antal støtter" href="{% url 'borgerforslag_index' %}?sort={% if request.GET.sort == "votes" %}-votes{% else %}votes{% endif %}">Støtter</a></th>
	</tr>
	{% for suggestion in all_suggestions %}
	<tr>
		<td class="text-nowrap">{{ suggestion.suggested_date }}</td>
		<td><a href="{% url 'borgerforslag_forslag' suggestion.id  %}">{{ suggestion.title }}{% if suggestion.votes < 50000 %} ({% if suggestion.end_date < today %}udløb{% else %}udløber{% endif %} {{ suggestion.end_date }}){% endif %}</td>
		<td>{{ suggestion.votes }}</td>
	</tr>
	{% endfor %}
</table>
{% endspaceless %}{% endblock %}

Og forslagssiden genereres af denne skabelon:

{% extends "borgerforslag/base.html" %}
{% load static %}
{% block title %}Borgerforslag: {{ suggestion.title }}{% endblock %}
{% block content %}{% spaceless %}

<h1 class="display-4 mb-4">{{ suggestion.title }}</h1>
<p><a href="https://borgerforslag.dk{{ suggestion.url }}">Læs om forslaget på borgerforslag.dk</a></p>
<p><strong>Startdato:</strong> {{ suggestion.suggested_date }}</p>
<p><strong>Slutdato:</strong> {{ suggestion.end_date }}</p>
<p><strong>Støtter i alt:</strong> {{ votes_max }}{% if votes_max >= 50000 %} - forslaget vil blive fremsat som beslutningsforslag i Folketinget!{% endif %}</p>

<p>I gennemsnit {{ votes_per_day_average }} støtter per dag siden forslaget blev stillet. Hvis trenden fortsætter, opnår forslaget ca. {{ likely_to_succeed_votes_per_day }} støtter inden slutdatoen{% if likely_to_succeed_votes_per_day >= 50000 %} og vil blive fremsat som beslutningsforslag i Folketinget.{% else %}. Det er ikke nok til at blive fremsat som beslutningsforslag i Folketinget.{% endif %}</p>
<p>I gennemsnit {{ votes_per_7_days_average }} støtter per dag de sidste 7 dage. Hvis trenden fortsætter, opnår forslaget ca. {{ likely_to_succeed_votes_per_7_days }} støtter inden slutdatoen{% if likely_to_succeed_votes_per_7_days >= 50000 %} og vil blive fremsat som beslutningsforslag i Folketinget.{% else %}. Det er ikke nok til at blive fremsat som beslutningsforslag i Folketinget.{% endif %}</p>

<h2 class="mb-4">Udvikling i støtter for forslaget</h2>
{% if suggestion.suggested_date < start_of_data_collection_date %}<p>Obs! Forslaget blev fremsat før dataindsamlingen til denne side fra borgerforslag.dk startede den 19. oktober 2021. Der vises derfor ikke en komplet graf over udviklingen i støtter.</p>{% endif %}

<div id="chart"></div>

<script>
var options = {
	title: {
	  text: 'Antal støtter over tid for forslaget {{ suggestion.title }}',
	  align: 'left'
	},
  chart: {
    type: 'line',
    locales: [{
      "name": "da",
      "options": {
        "months": ["Januar", "Februar", "Marts", "April", "Maj", "Juni", "Juli", "August", "September", "Oktober", "November", "December"],
        "shortMonths": ["Jan", "Feb", "Mar", "Apr", "Maj", "Jun", "Jul", "Aug", "Sep", "Okt", "Nov", "Dec"],
        "days": ["Søndag", "Mandag", "Tirsdag", "Onsdag", "Torsdag", "Fredag", "Lørdag"],
        "shortDays": ["Søn", "Man", "Tir", "Ons", "Tor", "Fre", "Lør"],
        "toolbar": {
            "exportToSVG": "Download SVG",
            "exportToPNG": "Download PNG",
			"exportToCSV": "Download CSV",
            "menu": "Menu",
            "selection": "Valg",
            "selectionZoom": "Zoom til valg",
            "zoomIn": "Zoom ind",
            "zoomOut": "Zoom ud",
            "pan": "Panorer",
            "reset": "Nulstil zoom"
        }
      }
    }],
	defaultLocale: "da",
  },
	series: [{
	  name: 'Støtter',
	  data: [{% for suggestion in votes_per_day %}{x: new Date('{{ suggestion.date|date:"Y-m-d" }}').getTime(), y: {{ suggestion.number_of_votes }}},{% endfor %}]
	}], 
	xaxis: {
	  type: 'datetime',
	  title: {
		text: 'Dato'
	  }	  
	},  
	yaxis: {
	  title: {
		text: 'Støtter'
	  }
	},  
}

var chart = new ApexCharts(document.querySelector("#chart"), options);

chart.render();
</script>
{% endspaceless %}{% endblock %}

Jeg bruger ApexCharts.js til at vise den fine graf over udviklingen i støtter for hvert enkelt forslag.

Optimering af indsamling af Borgerforslagsdata

For et par uger siden skrev jeg om en lille robot, jeg har lavet, der tjekker antallet af stemmer per borgerforslagborgerforslag.dk.

Første udgave af robotten gemte det aktuelle stemmeantal for hvert aktivt borgerforslag hvert 10. minut, og da der både er en del borgerforslag og en del minutter, blev det ret hurtigt til ret mange registreringer i min database.

Jeg kom i tanke om, at det kun er nødvendigt at gemme stemmeantallet, når stemmeantallet har ændret sig siden sidste registrering. Hvis et forslag er viralt, registreres stemmeantallet stadigvæk hvert 10. minut. Hvis et forslag er døende, kan der gå meget længere tid mellem hver registrering.

Her er den nye udgave af robotten, som tjekker om der findes andre registreringer af samme forslag med samme stemmeantal, og kun gemmer antal stemmer, hvis der ikke gør:

import requests
from datetime import datetime
import locale
import psycopg2
from psycopg2 import Error

# Locale is set to Danish to parse dates correctly
locale.setlocale(locale.LC_TIME, ('da_DK', 'UTF-8'))

# API url
url = 'https://www.borgerforslag.dk/api/proposals/search'

# Query parameters
suggestions_per_request = 300
params_json = {
	"filter": "active",
	"sortOrder": "NewestFirst",
	"searchQuery":"",
	"pageNumber":0,
	"pageSize": suggestions_per_request
}

# Connect to database
try:
	connection = psycopg2.connect(user = "",
									password = "",
									host = "",
									port = "",
									database = "")
	cursor = connection.cursor()
except (Exception, psycopg2.Error) as error:
	print ("Error while connecting to PostgreSQL", error)

now = datetime.utcnow()

# Insert into database function
def insert_suggestion_and_votes(connection, suggestion):
	with connection:
		with connection.cursor() as cur:
			try:
				# By default, votes are inserted, except when no new votes have been added
				# This variable is used to keep track of whether votes should be inserted
				insert_votes = True
				
				# See if suggestion already exists in table table borgerforslag_suggestion
				sql = '''SELECT * FROM borgerforslag_suggestion WHERE unique_id = %s'''
				cur.execute(sql, (suggestion['externalId'],))
				suggestion_records = cur.fetchone()
				# If suggestion does not already exist, add suggestion to table borgerforslag_suggestion
				if not suggestion_records:
					suggestion_data = (suggestion['externalId'],suggestion['title'],suggestion['date'],suggestion['url'],suggestion['status'])
					sql = '''INSERT INTO borgerforslag_suggestion(unique_id,title,suggested_date,url,status) VALUES(%s,%s,%s,%s,%s) RETURNING id'''
					cur.execute(sql, suggestion_data)
					id = cur.fetchone()[0]
				# If yes, get id of already added suggestion
				else:
					id = suggestion_records[0]
					# Check in table table borgerforslag_vote whether a record with the same number of votes exists.
					# If it does, no need to save votes
					sql = '''SELECT * FROM borgerforslag_vote WHERE suggestion_id = %s AND votes = %s'''
					cur.execute(sql, (id,suggestion['votes']))
					vote_record = cur.fetchone()
					if vote_record:
						insert_votes = False

				# Add votes to table borgerforslag_vote (if suggestion is new or vote count has changed since last run)
				if insert_votes == True:
					sql = '''INSERT INTO borgerforslag_vote(suggestion_id,timestamp,votes)
					VALUES(%s,%s,%s)'''
					cur.execute(sql, (id,now,suggestion['votes']))
			except Error as e:
				print(e, suggestion)

# Loop preparation
requested_results = 0
number_of_results = requested_results + 1
number_of_loops = 0

# Loop to get suggestions and add them to database
while requested_results < number_of_results and number_of_loops < 10:
	response = requests.post(url, json=params_json)
	json_response = response.json()
	number_of_results = json_response['resultCount']
	requested_results += suggestions_per_request
	number_of_loops += 1
	params_json['pageNumber'] += 1
	for suggestion in json_response['data']:
		suggestion['date'] = datetime.strptime(suggestion['date'], '%d. %B %Y')	# convert date to datetime
		insert_suggestion_and_votes(connection, suggestion)

Oprydning

Nu hvor jeg fik gjort tempoet, min database vokser med, lidt langsommere, ville jeg også gerne rydde lidt op i de gamle registreringer, hvor jeg jo havde gemt antal stemmer hvert 10. minut, uanset om antallet havde ændret sig.

Det skrev jeg også et lille script til. Her er logikken at jeg henter alle stemmeregistreringer sorteret efter hvilket borgerforslag, de hører til, og dernæst efter tidspunkt for registreringen.

Med rækkefølgen på plads, kan jeg for hver registrering tjekke, om den både vedrører samme borgerforslag som den tidligere registrering, og at stemmeantallet er det samme som den tidligere registrering. Hvis begge dele er sandt, er registreringen overflødig og kan slettes:

import psycopg2
from psycopg2 import Error

# Connect to database
try:
	connection = psycopg2.connect(user = "",
									password = "",
									host = "",
									port = "",
									database = "")
	cursor = connection.cursor()
except (Exception, psycopg2.Error) as error:
	print ("Error while connecting to PostgreSQL", error)

with connection:
	with connection.cursor() as cur:
		sql = '''SELECT "borgerforslag_vote"."id", "borgerforslag_vote"."suggestion_id", "borgerforslag_vote"."timestamp", "borgerforslag_vote"."votes" FROM "borgerforslag_vote" ORDER BY "borgerforslag_vote"."suggestion_id" ASC, "borgerforslag_vote"."timestamp" ASC'''
		cur.execute(sql)
		rows = cur.fetchall()

		previous_vote_number = -1
		previous_vote_suggestion = -1000
		for row in rows:
			votes = row[3]
			suggestion = row[1]
			id = row[0]
			if votes == previous_vote_number and previous_vote_suggestion == suggestion:
				sql = '''DELETE FROM "borgerforslag_vote" WHERE "borgerforslag_vote"."id" = %s'''
				cur.execute(sql, (id, ))
			previous_vote_number = row[3]
			previous_vote_suggestion = row[1]

Hvor mange aber skal der til at bestå PRINCE2?

For tiden er jeg, i arbejdssammenhæng, ved at bestå en basiseksamen i en projektledelsesmetode, der hedder PRINCE2.

Eksamen er en multiple choice-eksamen med 60 spørgsmål. Hvert spørgsmål har 4 svarmuligheder. Og for at bestå skal man svare rigtigt på mindst 33 spørgsmål ud af de 60. Det kunne umiddelbart lyde som om, at man ikke skal vide/huske/kunne særligt meget for at bestå.

Den sandsynlighedsregning, jeg lige kan huske, siger at man i gennemsnit svarer rigtigt på 15 ud af 60 spørgsmål, men hvor tit kan man være heldig at svare rigtigt på mindst 33 spørgsmål?

Det satte jeg 1 million virtuelle aber til at undersøge for mig:

from random import randint

correct_answers = [randint(1,4) for i in range(60)]
correct_answers_times = {}

for times in range(1000000):
	guesses = [randint(1,4) for i in range(60)]
	number_of_correct_answers = 0
	for i in range(60):
		if correct_answers[i] == guesses[i]:
			number_of_correct_answers += 1
	if not number_of_correct_answers in correct_answers_times:
		correct_answers_times[number_of_correct_answers] = 1
	else:
		correct_answers_times[number_of_correct_answers] += 1
	print(times)
sorted_correct_answers_times = dict(sorted(correct_answers_times.items()))

print(sorted_correct_answers_times)	
>> {2: 3, 3: 49, 4: 192, 5: 679, 6: 2177, 7: 5778, 8: 12346, 9: 23999,
10: 41084, 11: 61815, 12: 84219, 13: 103438, 14: 115277, 15: 117968,
16: 110145, 17: 95734, 18: 76522, 19: 56094, 20: 38622, 21: 24321,
22: 14233, 23: 7867, 24: 4060, 25: 1901, 26: 863, 27: 388, 28: 151,
29: 54, 30: 14, 31: 4, 32: 1, 33: 1, 34: 1}

Output nederst i programmet viser, hvor mange gange X rigtige svar forekom ud af de 1 million gange, mine aber tog testen (“2: 3” læses sådan at det forekom 3 gange, at kun 2 svar var rigtige, osv.)

Det lykkedes kun at bestå eksamen ved hjælp af tilfældige besvarelser i alt 2 ud af 1 million gange. Én gang med 33 rigtige svar. Én gang med 34 rigtige svar.

Hvis mit program ellers er rigtigt, konkluderer jeg at det kræver en god portion held – eller rigtig mange aber til at arbejde for sig – at bestå PRINCE2 uden at vide et eller andet om metoden på forhånd.

Test af kandidattest hos DR med Python

Hos Danmarks Radio kan man forsøge at afklare sine holdninger og se, hvilke politikeres holdninger, der ligner mest. Rækken af metodiske problemer med sådan nogle kandidattests er lang, men på Twitter skrev Søren om en mulig skævhed alene i kraft af, at der typisk er “flere kandidater i posen” fra store partier end små:

Twitterspørgsmål og -svar om skævheder i kandidattests

Tobias var først med at komme med et bud, men jeg kunne ikke lade være med selv at prøve at teste hypotesen. Det kom der dette lille program ud af, der fyrer 25.000 tilfældige svar på testen af sted og ser hvad Danmarks Radio svarer tilbage. Læg mærke til at programmet ikke generer 25.000 forskellige tilfældige kombinationer men blot genererer tilfældige kombinationer 25.000 gange. (Det vil sige, at den samme kombination kan forekomme flere gange ud af de 25.000 gange.):

import requests
import random

base_url = 'https://www.dr.dk/nyheder/politik/api/kandidattest/GetMunicipalityMatch?municipality=124&answers='
stats = {}

for i in range(25000):
	try:
		sequence = ",".join([str(random.choice([1,2,4,5])) for i in range(18)])
		response = requests.get(base_url + sequence)
		json = response.json()
		candidate_one_party = json['TopMatches'][0]['CandidateBasic']['Party']
		if candidate_one_party not in stats:
			stats[candidate_one_party] = 1
		else:
			stats[candidate_one_party] += 1
		print(i)	
	except:
		pass

with open('stats.txt', 'w') as output:
	output.write(str(stats))

Det kom der følgende rangliste ud af for Københavns Kommune. Tabellen viser hvor mange gange en kandidat fra partiet var den kandidat, man var mest enig med på baggrund af sine svar i testen:

Parti Antal Procent
Socialdemokratiet421017%
Det Konservative Folkeparti368015%
Radikale Venstre326213%
Venstre, Danmarks Liberale Parti263211%
SF – Socialistisk Folkeparti20348%
Alternativet15286%
Kristendemokraterne12315%
Frihedslisten11204%
Nye Borgerlige11094%
Enhedslisten – De Rød-Grønne9634%
Dansk Folkeparti9604%
Kommunisterne6062%
Veganerpartiet4212%
Københavnerlisten2851%
Hampepartiet2731%
Liberal Alliance1931%
Kommunistisk Parti1901%
Danmark for Alle1701%
Det Demokratiske Parti670%
Bæredygtigt Samfund430%
Rolig Revolution210%
Total 24998 100%
Tabel der viser udfald af partifarve for mest enige kandidater ved 24998 tilfældige udfyldninger af DRs kandidattest til kommunalvalget for Københavns Kommune

Prøv selv, hvis du gider! Og husk: De fleste, der udfylder kandidattests i virkeligheden, slår nok ikke med terning når de vælger svar.

Hvordan udvikler antal underskrifter sig på Borgerforslag.dk?

På Twitter skrev Peter Brodersen:

Peters idé er sjov, synes jeg, så jeg er så småt begyndt at bygge et eller andet, der monitorerer hvordan antallet af underskrifter på borgerforslag udvikler sig over tid.

Så nu tygger min webserver sig igennem nedenstående script hvert 10. minut og gemmer det aktuelle antal underskrifter på hvert borgerforslag. Når der er gået nogle uger, vil jeg se om jeg kan lave nogle interessante visualiseringer af data.

import requests
from datetime import datetime
import locale
import psycopg2
from psycopg2 import Error

### PREPARATION ###
# Locale is set to Danish to be able to parse dates from Borgerforslag
locale.setlocale(locale.LC_TIME, ('da_DK', 'UTF-8'))

# API url and request parameters
url = 'https://www.borgerforslag.dk/api/proposals/search'
suggestions_per_request = 300
params_json = {
	"filter": "active",
	"sortOrder": "NewestFirst",
	"searchQuery":"",
	"pageNumber":0,
	"pageSize": suggestions_per_request
}

# Connect to database
try:
	connection = psycopg2.connect(user = "",
									password = "",
									host = "",
									port = "",
									database = "")
	cursor = connection.cursor()
except (Exception, psycopg2.Error) as error:
	print ("Error while connecting to PostgreSQL", error)

now = datetime.utcnow()

# Insert into database function
def insert_suggestion_and_votes(connection, suggestion):
	with connection:
		with connection.cursor() as cur:
			try:
				# See if suggestion already exists
				sql = '''SELECT * FROM borgerforslag_suggestion WHERE unique_id = %s'''
				cur.execute(sql, (suggestion['externalId'],))
				suggestion_records = cur.fetchone()
				# If not, add suggestion
				if not suggestion_records:
					suggestion_data = (suggestion['externalId'],suggestion['title'],suggestion['date'],suggestion['url'],suggestion['status'])
					sql = '''INSERT INTO borgerforslag_suggestion(unique_id,title,suggested_date,url,status) VALUES(%s,%s,%s,%s,%s) RETURNING id'''
					cur.execute(sql, suggestion_data)
					id = cur.fetchone()[0]
				# If yes, get id
				else:
					id = suggestion_records[0]
			
				# Add votes
				sql = '''INSERT INTO borgerforslag_vote(suggestion_id,timestamp,votes)
				VALUES(%s,%s,%s)'''
				cur.execute(sql, (id,now,suggestion['votes']))
			except Error as e:
				print(e, suggestion)

# Loop preparation
requested_results = 0
number_of_results = requested_results + 1
number_of_loops = 0

# Loop to get suggestions and add them to database
while requested_results < number_of_results and number_of_loops < 10:
	response = requests.post(url, json=params_json)
	json_response = response.json()
	number_of_results = json_response['resultCount']
	requested_results += suggestions_per_request
	number_of_loops += 1
	params_json['pageNumber'] += 1
	for suggestion in json_response['data']:
		suggestion['date'] = datetime.strptime(suggestion['date'], '%d. %B %Y')	# convert date to datetime
		insert_suggestion_and_votes(connection, suggestion)

Opdateret program til at hent kurser på dine værdipapirer hos Nordnet

Nordnet har opdateret deres loginprocedure, så her er et dugfrist program til at hente kurser hos Nordnet – eller Morningstar, hvis Nordnet skulle fejle:

# -*- coding: utf-8 -*-
# Author: Morten Helmstedt. E-mail: helmstedt@gmail.com
""" This program extracts historical stock prices from Nordnet (and Morningstar as a fallback) """

import requests
from datetime import datetime
from datetime import date

# Nordnet user account credentials
user = ''
password = ''

# DATE AND STOCK DATA. SHOULD BE EDITED FOR YOUR NEEDS #

# Start date (start of historical price period)
startdate = '2013-01-01'

# List of shares to look up prices for.
# Format is: Name, Morningstar id, Nordnet stock identifier
# See e.g. https://www.nordnet.dk/markedet/aktiekurser/16256554-novo-nordisk-b
# (identifier is 16256554)
# All shares must have a name (whatever you like). To get prices they must
# either have a Nordnet identifier or a Morningstar id
sharelist = [
	["Maj Invest Danske Obligationer","F0GBR064UX",16099874],
	["Novo Nordisk B A/S","0P0000A5BQ",16256554],
]

# A variable to store historical prices before saving to csv	
finalresult = ""
finalresult += '"date";"price";"instrument"' + '\n'

# LOGIN TO NORDNET #

session = requests.Session()

# Setting cookies prior to login by visiting login page
url = 'https://www.nordnet.dk/logind'
request = session.get(url)

# Update headers for login
session.headers['client-id'] = 'NEXT'
session.headers['sub-client-id'] = 'NEXT'

# Actual login
url = 'https://www.nordnet.dk/api/2/authentication/basic/login'
request = session.post(url, data = {'username': user, 'password': password})

# LOOPS TO REQUEST HISTORICAL PRICES AT NORDNET AND MORNINGSTAR #

# Nordnet loop to get historical prices
nordnet_fail = []

for share in sharelist:
	# Nordnet stock identifier and market number must both exist
	if share[2]:
		url = "https://www.nordnet.dk/api/2/instruments/historical/prices/" + str(share[2])
		payload = {"from": startdate, "fields": "last"}
		data = session.get(url, params=payload)
		jsondecode = data.json()
		
		# Sometimes the final date is returned twice. A list is created to check for duplicates.
		datelist = []
		if jsondecode[0]['prices']:
			try:
				for value in jsondecode[0]['prices']:
					if 'last' in value:
						price = str(value['last'])
					elif 'close_nav' in value:
						price = str(value['close_nav'])
					price = price.replace(".",",")
					date = datetime.fromtimestamp(value['time'] / 1000)
					date = datetime.strftime(date, '%Y-%m-%d')
					# Only adds a date if it has not been added before
					if date not in datelist:
						datelist.append(date)
						finalresult += '"' + date + '"' + ";" + '"' + price + '"' + ";" + '"' + share[0] + '"' + "\n"
			except Exception as error:
				print(error)
				breakpoint()
		# No price data returned! Try another method!
		else:
			nordnet_fail.append(share)

if nordnet_fail:
	print(nordnet_fail)
	# Morningstar loop to get historical prices			
	for share in nordnet_fail:
		# Only runs for one specific fund in this instance
		payload = {"id": share[1], "currencyId": "DKK", "idtype": "Morningstar", "frequency": "daily", "startDate": startdate, "outputType": "COMPACTJSON"}
		data = requests.get("http://tools.morningstar.dk/api/rest.svc/timeseries_price/nen6ere626", params=payload)
		jsondecode = data.json()
		
		for lists in jsondecode:
			price = str(lists[1])
			price = price.replace(".",",")
			date = datetime.fromtimestamp(lists[0] / 1000)
			date = datetime.strftime(date, '%Y-%m-%d')
			finalresult += '"' + date + '"' + ";" + '"' + price + '"' + ";" + '"' + share[0] + '"' + "\n"
		
# WRITE CSV OUTPUT TO FILE #			

with open("kurser.csv", "w", newline='', encoding='utf8') as fout:
	fout.write(finalresult)

Opdateret program til at hente transaktioner hos Nordnet

Nordnet har opdateret deres login-procedure, så jeg har også opdateret mit Python-script til at logge ind på Nordnet og hente transaktioner. Her er den nye version.

# -*- coding: utf-8 -*-
# Author: Morten Helmstedt. E-mail: helmstedt@gmail.com
""" This program logs into a Nordnet account and extracts transactions as a csv file.
Handy for exporting to Excel with as few manual steps as possible """

import requests 
from datetime import datetime
from datetime import date

# USER ACCOUNT, PORTFOLIO AND PERIOD DATA. SHOULD BE EDITED FOR YOUR NEEDS #

# Nordnet user account credentials and accounts/portfolios names (choose yourself) and numbers.
# To get account numbers go to https://www.nordnet.dk/transaktioner and change
# between accounts. The number after "accid=" in the new URL is your account number.
# If you have only one account, your account number is 1.
user = ''
password = ''
accounts = {
	'Nordnet: Frie midler': '1',
	'Nordnet: Ratepension': '3',
}

# Start date (start of period for transactions) and date today used for extraction of transactions
startdate = '2013-01-01'
today = date.today()
enddate = datetime.strftime(today, '%Y-%m-%d')

# Manual data lines. These can be used if you have portfolios elsewhere that you would
# like to add manually to the data set. If no manual data the variable manualdataexists
# should be set to False
manualdataexists = True
manualdata = '''
Id;Bogføringsdag;Handelsdag;Valørdag;Depot;Transaktionstype;Værdipapirer;Værdipapirtype;ISIN;Antal;Kurs;Rente;Samlede afgifter;Samlede afgifter Valuta ;Beløb;Valuta;Indkøbsværdi;Resultat;Totalt antal;Saldo;Vekslingskurs;Transaktionstekst;Makuleringsdato;Notanummer;Verifikationsnummer;Kurtage;Kurtage Valuta;Depotnavn
;30-09-2013;30-09-2013;30-09-2013;;KØBT;Obligationer 3,5%;Obligationer;;72000;;;;;-69.891,54;DKK;;;;;;;;;;;;Frie midler: Finansbanken
'''

# A variable to store transactions before saving to csv
transactions = ''

# LOGIN TO NORDNET #
session = requests.Session()

# Setting cookies prior to login by visiting login page
url = 'https://www.nordnet.dk/logind'
request = session.get(url)

# Update headers for login
session.headers['client-id'] = 'NEXT'
session.headers['sub-client-id'] = 'NEXT'

# Actual login
url = 'https://www.nordnet.dk/api/2/authentication/basic/login'
request = session.post(url, data = {'username': user, 'password': password})


# GET ACCOUNT(S) TRANSACTION DATA #

# Payload and url for transaction requests
payload = {
	'locale': 'da-DK',
	'from': startdate,
	'to': enddate,
}

url = 'https://www.nordnet.dk/mediaapi/transaction/csv/filtered'

firstaccount = True
for portfolioname, id in accounts.items():
	payload['account_id'] = id
	data = session.get(url, params=payload)
	result = data.content.decode('utf-16')
	result = result.replace('\t',';')
	result = result.splitlines()
	
	firstline = True
	for line in result:
		# For first account and first line, we use headers and add an additional column
		if line and firstline == True and firstaccount == True:
			transactions += line + ';' + 'Depotnavn' + '\n'
			firstaccount = False
			firstline = False
		# First lines of additional accounts are discarded
		elif line and firstline == True and firstaccount == False:
			firstline = False
		# Content lines are added
		elif line and firstline == False:
			# Fix because Nordnet sometimes adds one empty column too many
			if line.count(';') == 27:
				line = line.replace('; ',' ')
			transactions += line + ';' + portfolioname + '\n'

# ADD MANUAL LINES IF ANY #
if manualdataexists == True:
	manualdata = manualdata.split("\n",2)[2]
	transactions += manualdata				

# WRITE CSV OUTPUT TO FILE #
with open("transactions.csv", "w", encoding='utf8') as fout:
	fout.write(transactions)

En lille kodeforbedring på Wallnot

I takt med at wallnot.dk har fået flere og flere funktionaliteter, og dermed også flere databasefelter, er koden bag blevet rodet.

Her er et eksempel på, hvordan min kode blev svær for mig selv at forstå, og hvad jeg gjorde for at gøre den lidt bedre.

For at hente artikler til Wallnot, besøger en robot avisers hjemmesider og finder gratis-artikler. Logikken er lidt forskellig fra medie til medie, for det er forskelligt, hvordan medierne afslører, om en artikel er bag en betalingsmur eller ej.

Det er også meget forskelligt, om aviserne benytter sig af “gratis artikler, der kræver login” (Politiken, Ingeniøren, Jyllands-Posten), og det er forskelligt, hvad risikoen er for dubletter (Ritzau-artikler går igen mange steder, men ikke alle. Jyllands-Posten og Finans kopierer artikler til og fra hinanden.)

Her er for eksempel den gamle logik for Danmarks Radio:

def dr():
	# Define medium
	medium = "dr"
	
	# Request site
	data = requests.get("https://www.dr.dk/nyheder/service/feeds/allenyheder/")
	result = data.text
	
	# Soup site and create a list of links and their titles
	soup = BeautifulSoup(result, "xml")
	
	# List of unique urls
	urllist = {link.text for link in soup.find_all('link') if "/nyheder/" in link.text and not any(term in link.text for term in excluded) and not any(term == link.text for term in mustnotbe)}
	
	# Loop that requests all article links, soups them and checks whether they have a paywall and generates a list with current free articles. Also gets titles of links.	
	for url in urllist:
		if not 'http' in url:
			url = 'https://dr.dk' + url	
		if url not in lastbatch and url not in newbatch:
			try:
				data = requests.get(url)
				result = data.text
				soup = BeautifulSoup(result, "lxml")
				node_id = soup.find('meta', attrs={'name':'ensighten:urn'})
				if node_id:
					id = node_id['content'][node_id['content'].find("article:")+8:]
					urlid = "dr_" + id
					if urlid not in newbatch and urlid not in lastbatch:
						title = soup.find('meta', attrs={'property':'og:title'})
						title = title['content']
						title = title.strip(" ")						
						#api_url = "https://www.dr.dk/tjenester/urd/tms/urn:dr:drupal:article:" + id
						#api_request = requests.get(api_url)
						timestamp = soup.find('meta', attrs={'property':'article:published_time'})['content']
						dateofarticle = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S%z')
						if '<p class="dre-article-body__paragraph dre-variables">/ritzau/</p>' in result:
							ritzau = True
							body = soup.find('div', attrs={'class': 'dre-article-body'})
							firstparagraph = body.find('p').get_text()
							firstparagraph = firstparagraph.strip(" ")
						else:
							ritzau = False

						if ritzau == True:	
							article = (title, urlid, dateofarticle, medium, url, datetime.now(), ritzau, firstparagraph)
						else:	
							article = (title, urlid, dateofarticle, medium, url, datetime.now(), ritzau)

						insert_unique_id_article(connection, article)
						newbatch.append(url)
						newbatch.append(urlid)				
			except Exception as e:
				print(url)
				print(e)
		else:
			newbatch.append(url)

Hvert medie har altså sin egen funktion til at hente artikler, og som du måske kan læse af koden, kalder hver funktionen en anden funktion, som indsætter artikler i databasen:

insert_unique_id_article(connection, article)
newbatch.append(url)

Problemet

Funktionen insert_unique_id_article var mildest talt blevet rodet.

Først blev indsat nogle ekstra værdier til article for at understøtte, at jeg for et tidspunkt valgte at artikler, der senere får en paywall, ikke slettes fra databasen, men blot markeres som med paywall.

Så havde den en overordnet logik, der skilte artikler, der skal tjekkes for dubletter fra artikler, der ikke skal.

Og under denne logik nogle forskellige variationer for, hvor mange ekstra variable og hvilke værdier, disse variable har, der tilføjes til hver article. Og mange forskellige sql-sætninger i forgreningen til rent faktisk at tilføje hver artikel til databasen.

Her er den uoverskuelige kode:

def insert_unique_id_article(connection, article):
	with connection:
		with connection.cursor() as cur:
			try:
				article += (False, "")	# Not currently behind a paywall, no current archive url, 
				if not article[6] and not article[3] == "jyllandsposten" and not article[3] == "finansdk":	# Ritzau is false and not jyllandsposten and not finansdk
					if len(article) == 10:	# article has loginwall variable inserted already
						article += (False, ) # Not a duplicate
						sql = ''' INSERT INTO wall_article(title,unique_id,date,medium,url,created_at,ritzau,loginwall,paywall_detected,archive_url,duplicate)
						VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) '''
						cur.execute(sql, article)
					else:
						article += (False, False)	# Not behind loginwall, not a duplicate
						sql = ''' INSERT INTO wall_article(title,unique_id,date,medium,url,created_at,ritzau,paywall_detected,archive_url,loginwall,duplicate)
						VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) '''
						cur.execute(sql, article)
				else:	# Ritzau is true or jyllandsposten or finans
					sql = ''' SELECT * from wall_article WHERE title ILIKE %s OR excerpt ILIKE %s'''
					article_title = "%"+article[0]+"%"
					article_excerpt = "%"+article[7]+"%"
					cur.execute(sql, (article_title, article_excerpt))
					results = cur.fetchall()
					if results:
						duplicate = False
						for result in results:
							timestamp = result[3].astimezone(pytz.utc)
							# Time in minutes between suspected duplicates is calculated
							if article[2] < timestamp:
								difference_in_minutes = (timestamp-article[2]).seconds/60
							elif article[2] > timestamp:
								difference_in_minutes = (article[2]-timestamp).seconds/60
							elif article[2] == timestamp:
								difference_in_minutes = 0
							
							# Less than or 5 hours between, mark duplicate and insert as duplicate
							if difference_in_minutes <= 300:
								duplicate = True				# important to avoid duplicate insert
								article += (True, False)		# a duplicate, no loginwall
								sql = ''' INSERT INTO wall_article(title,unique_id,date,medium,url,created_at,ritzau,excerpt,paywall_detected,archive_url,duplicate,loginwall)
								VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) '''
								cur.execute(sql, article)
								break
						# ONLY FOR NON-duplicates
						if not duplicate:
							article += (False, False) # Not a duplicate, no loginwall
							sql = ''' INSERT INTO wall_article(title,unique_id,date,medium,url,created_at,ritzau,excerpt,paywall_detected,archive_url,duplicate,loginwall)
								VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) '''
							cur.execute(sql, article)
					else:
						article += (False, False) # Not a duplicate, no loginwall
						sql = ''' INSERT INTO wall_article(title,unique_id,date,medium,url,created_at,ritzau,excerpt,paywall_detected,archive_url,duplicate,loginwall)
						VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) '''
						cur.execute(sql, article)
			except Error as e:
				# If unique_id is already in database and the article is a politiken share link,
				# the original URL is updated with the share url
				if e.pgcode == "23505" and "politiken.dk/del" in article[4]:
					connection.rollback()
					sql = ''' Update wall_article set url = %s, loginwall = %s where unique_id = %s '''
					cur.execute(sql, (article[4], True, article[1]))
				elif not e.pgcode == "23505":
					print(e, article)

Løsning

Jeg løste – eller måske formindskede – problemet ved at ensarte, hvor mange variable hvert medies funktion sender til funktionen insert_unique_id_article, sådan at medierne uanset om de fx bruger “login-mure” eller ej, sender en article af samme længde med samme variabelrækkefølge af sted til insert_unique_id_article.

Så nu går disse kodelinjer igen og er identiske, uanset hvilket medie, der er tale om:

article = (title, urlid, dateofarticle, medium, url, datetime.now(), ritzau, excerpt, loginwall)
insert_unique_id_article(connection, article)

Det har gjort insert_unique_id_article en del kortere og mere læsbar. Der er nu kun én mulig sql-sætning, færre mulige forgreninger og det er ensartet, hvor mange variable der tilføjes til article i hver forgrening:

def insert_unique_id_article(connection, article):
	with connection:
		with connection.cursor() as cur:
			try:
				sql = ''' INSERT INTO wall_article(title,unique_id,date,medium,url,created_at,ritzau,excerpt,loginwall,paywall_detected,archive_url,duplicate)
				VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) '''
				article += (False, "")	# Add defaults: Not currently behind a paywall, no current archive url, 
				if not article[6] and not article[3] == "jyllandsposten" and not article[3] == "finansdk":	# Ritzau is false and not jyllandsposten and not finansdk so no duplicate risk based on current newspaper practices
					article += (False, ) # Not a duplicate
					cur.execute(sql, article)
				else:	# Ritzau is true or jyllandsposten or finans, so run duplicate check
					search_sql = ''' SELECT * from wall_article WHERE title ILIKE %s OR excerpt ILIKE %s'''
					article_title = "%"+article[0]+"%"
					article_excerpt = "%"+article[7]+"%"
					cur.execute(search_sql, (article_title, article_excerpt))
					results = cur.fetchall()
					if results:
						duplicate = False
						for result in results:
							timestamp = result[3].astimezone(pytz.utc)
							# Time in minutes between suspected duplicates is calculated
							if article[2] < timestamp:
								difference_in_minutes = (timestamp-article[2]).seconds/60
							elif article[2] > timestamp:
								difference_in_minutes = (article[2]-timestamp).seconds/60
							elif article[2] == timestamp:
								difference_in_minutes = 0
							
							# Less than or 5 hours between, mark duplicate and insert as duplicate
							if difference_in_minutes <= 300:
								duplicate = True		# important to avoid duplicate insert
								break
						article += (duplicate, ) # Add duplicate status
						cur.execute(sql, article)
					else:
						article += (False, ) # Not a duplicate
						cur.execute(sql, article)
			except Error as e:
				# If unique_id is already in database and the article is a politiken share link,
				# the original URL is updated with the share url
				if e.pgcode == "23505" and "politiken.dk/del" in article[4]:
					connection.rollback()
					sql = ''' Update wall_article set url = %s, loginwall = %s where unique_id = %s '''
					cur.execute(sql, (article[4], True, article[1]))
				elif not e.pgcode == "23505":
					print(e, article)

En bedre løsning?

Min problematik handler rigtigt meget om vægtningen mellem graden af identisk kode, der går igen flere steder, og graden af abstraktion.

Det er rart og logisk for mig, at kunne arbejde med og rette fejl i hvert enkelt medie hver for sig. Men der er meget kode, der går igen for hvert medie, og også forskellig kode, der gør det samme, afhængig af hvornår jeg lige har haft fat i koden sidst.

Det er besværligt, at jeg, hvis jeg på et tidspunkt indfører en ny funktionalitet, der kræver et nyt databasefelt, hvis værdi kan være forskellig fra medie til medie, er nødt til at opdatere hvert enkelt medies funktion, for at sikre mig at længden på den artikel, der sendes til insert_unique_id_article er den samme for alle medier.

En løsning kunne være et lidt højere abstraktionsniveau, hvor jeg:

  • Laver article om til en ordbog (dictionary) i hvert medies funktion.
  • Sørger for at hvert medie sender article til en hjælpefunktion, der gør artiklen klar til at indsætte i databasen, ved at gennemgå ordbogens nøgler og tilføje de nøgler, der evt. mangler, for at artiklen har de nødvendige variable til at kunne indsættes. Funktionen kunne hedder prepare_article.
  • Sætter prepare_article til at sende artiklen til en endnu mere forenklet insert_unique_id_article.

En anden mulig løsning kunne være at kigge nærmere på modulet psycopg2, som er det modul, der lader mit Python-program tale med min database. Lige nu bruger jeg datatypen tuple når jeg indsætter data i databasen, og hvis der mangler værdier i min tuple i forhold til min datamodel for artikler, fungerer mit program ikke. Måske kan psycopg2 forstå dictionaries i stedet og fodres med standardværdier, der kan indsættes, hvis en artikel mangler felter fra datamodellen?

Det må jeg finde ud af, når jeg har tid, eller et problem der kræver, at jeg finder en bedre løsning.

Jeg fik Saxo til at lukke for adgang til ebøger for gratister

Jeg er både interesseret i at udforske API’er og i at læse bøger. Jeg har fx tidligere vist, hvordan jeg ved hjælp af en Android-emulator og programmet Charles fandt en løsning til at få videresendt E-boksbeskeder til min e-mail.

Og så skete der det, at jeg fik et prøveabonnement til Saxo Premium.

Derfor gav jeg mig i kast med at udforske boghandelen Saxos app til streaming af bøger og opdagede nogle sikkerhedsproblemer.

Jeg fandt ud af, at enhver med lidt teknisk snilde kunne skaffe sig adgang til Saxos ebøger, uden først at købe et streaming-abonnement.

Efter først at have kontaktet Saxo anonymt for at blive forsikret om, at de ikke ville melde mig til politiet, så længe jeg ikke havde udnyttet sikkerhedshullerne, fik jeg indrapporteret problemerne.

Saxo var både flinke og professionelle i dialogen, og nu har de lanceret en mere sikker app. Derfor vil og kan jeg nu fortælle om, hvordan jeg undersøgte Saxos app, og hvad jeg opdagede.

Om streaming og at sikre sig mod kopiering af data

Men først en sidebemærkning: Det er meget svært at give midlertidigt adgang til data (bøger, film, musik), som Saxo, Netflix, Spotify osv., gør, og være helt sikker på, at adgangen altid og i alle tilfælde kun er midlertidig.

Hvis noget kan ses på en skærm eller lyttes til på højttalere, skal der meget til at forhindre ihærdige brugere, (der måske endda er villige til at bryde ophavsretsloven – gisp!), i at få gemt en kopi af materialet.

Spørgsmålet er mere, hvor svært og tidskrævende man gør det.

Problemet hos Saxo var altså ikke, efter min mening, at en abonnement kunne vælge at misbruge deres streamingabonnement til at tage kopier af ebøger. Problemet var, at selv en ikke-abonnement kunne gøre det.

Om at undersøge hvordan apps virker og opdage sikkerhedshuller

Hvis du tænker på selv at give dig i kast med at undersøge apps, API’er og eventuelle sikkerhedshuller, er det vigtigt, du har hjertet på det rette sted.

Brancheorganisationen IT-Branchen har udarbejdet et kodeks, som handler om hvordan privatpersoner og virksomheder bør opføre sig, når der opdages og indrapporteres sikkerhedshuller.

Som privatperson er det vigtigt, at:

  • Du ikke udnytter en utilsigtet adgang til data. Straffeloven siger, du ikke uberettiget må skaffe dig adgang til andres data. Ophavsretsloven siger, du ikke må dele ophavsretsbeskyttede værker med andre. Du kan også (måske uden selv at have opdaget det) have accepteret nogle vilkår for brug af en app, som ejeren af appen måske kan bruge til at anlægge en sag mod dig, som de måske/måske ikke kan vinde.
  • Du hurtigst muligt giver virksomheden besked om sikkerhedshullerne, så den kan rette dem.
  • Du ikke deler din viden om sikkerhedshuller med andre, fx dine venner eller medierne. Det ville medføre, at nogle kunne udnytte hullerne og begå ulovligheder.

Læs IT-Branchens glimrende vejledning til anmeldere af sikkerhedsbrister.

Til gengæld for din ædelhed bør virksomheden:

  • Behandle henvendelsen fra dig fortroligt
  • Handle på henvendelsen og løse sikkerhedsproblemerne inden for rimelig tid
  • Undlade at melde dig til politiet, hvis du (anmelderen) har handlet i overensstemmelse med kodekset

Du kan ikke være sikker på, at virksomheden handler ædelt. Derfor valgte jeg selv at kontakte Saxo anonymt, for at høre om deres ædelhed, inden jeg fortalte om det sikkerhedshul, jeg havde opdaget.

Forberedelsesfasen

Android-emulator

Det første man skal bruge for at kunne undersøge mobilapps til Android er en emulator, så apps kan køre på ens PC og man har mulighed for at overvåge trafikken.

Jeg bruger en emulator, der hedder Nox. Den er mest lavet til at kunne spille spil og er fyldt med reklamer og sikkert også overvågning. Men: Nox gør det let at hente, installere og eksportere apps, og at begynde at overvåge trafikken til og fra dem.

For at overvåge trafikken på moderne Android-versioner er det nødvendigt først at pille lidt ved den app, hvis trafik man vil overvåge.

(Nogle apps med højere sikkerhedsniveau, bruger noget der hedder “SSL pinning” til at forhindre trafikovervågning. Det gør det væsentligt sværere at overvåge trafikken, end hvad jeg beskriver her.)

Rette i appen for at tillade traffikovervågning

I Nox installerede jeg Saxos app og eksporterede appen som apk-fil.

Det er nemt at eksportere en apk-fil fra en app ud af emulatoren Nox

Derefter brugte jeg værktøjet Apktool til at dekompilere appen. Værktøjet er gratis, kræver Java og er ret nemt at bruge.

Sådan ser Saxos app ud, når Apktool har haft fat i den.

For at kunne overvåge trafikken skal appen tillade, at jeg bruger andre SSL-certifikater end dem, der kommer med Android.

I mappen res/xml oprettede jeg filen network_security_config.xml (nogle apps har den allerede) og satte det her indhold ind. Det fortæller, at jeg både stoler på systemcertifikater og brugerinstallerede certifikater:

<?xml version="1.0" encoding="utf-8"?>
<network-security-config>
    <base-config>
        <trust-anchors>
            <certificates src="system" />
            <certificates src="user" />
        </trust-anchors>
    </base-config>
</network-security-config>

Derefter åbnede jeg AndroidManifest.xml og indsatte stien på den nye fil i tagget “application”.

Før:

<application android:allowBackup="false" android:appComponentFactory="androidx.core.app.CoreComponentFactory" android:hardwareAccelerated="true" android:icon="@mipmap/ic_launcher" android:label="@string/app_name" android:largeHeap="true" android:name="com.saxo.read.saxoread.application.SaxoReadApplication" android:resizeableActivity="false" android:roundIcon="@mipmap/ic_launcher_round" android:supportsRtl="true" android:theme="@style/SaxoTheme">

Efter:

<application android:networkSecurityConfig="@xml/network_security_config" android:allowBackup="false" android:appComponentFactory="androidx.core.app.CoreComponentFactory" android:hardwareAccelerated="true" android:icon="@mipmap/ic_launcher" android:label="@string/app_name" android:largeHeap="true" android:name="com.saxo.read.saxoread.application.SaxoReadApplication" android:resizeableActivity="false" android:roundIcon="@mipmap/ic_launcher_round" android:supportsRtl="true" android:theme="@style/SaxoTheme">

Med disse små rettelser på plads, brugte jeg Apktool til at pakke en ny apk-fil og signerede filen med et værktøj, der hedder Uber Apk Signer.

Min modificerede app var klar, og jeg trak-og-slap den over i Nox for at installere den.

Overvågning af app-trafik

Jeg brugte programmet Charles til at overvåge trafikken fra Saxos app. Kort fortalt installerer man Charles’ SSL-certifikat og sætter Nox op til at bruge Charles som proxy for internettrafik.

Hul igennem

Hvis alt er gået godt, kan det se sådan her ud, når Charles trafikovervåger. Her er fx oprettelsen af en bruger og de første par handlinger i Saxos app:

Hul igennem til traffikovervågning med Charles

Undersøgelsesfasen

Når først trafikken til og fra en app kan overvåges, er det bare at begynde at bruge app’en for at finde ud af, hvordan dens API virker.

Det jeg fandt ud af, var et problem ved Saxos app, var:

  1. Når man søger med Saxos app, returneres en række id-numre på bøger
  2. Søgning i appen kræver ikke en Premium-konto, alle kan oprette en konto og søge for at se udvalget af bøger
  3. Download-adressen på en bog til offline-læsning kunne regnes ud alene ud fra en bogs id-nummer
  4. Downloadede bøger var krypterede, men:
  5. Appens nøgle til at dekryptere downloadede bøger, så de kunne læses offline, var meget nem at finde

Problemet betød, at brugere uden abonnement kunne få adgang til bøger uden abonnement.

Brugere med abonnement kunne få adgang til bøger uden at deres download blev registreret gennem Saxos API og dermed, formoder jeg, uden at Saxo kunne honorere bogens forlag og i sidste ende bogens forfatter.

Muligvis – det har jeg ikke testet – gjorde hullet også, at eventuelle check, som Saxo har af, hvor mange bøger en bruger kan hente, blev sat ud af spillet.

I de næste afsnit prøver jeg at tage dig med gennem undersøgelsesfasen.

Download af bøger til offline-læsning

Når jeg downloade en bog hos Saxo, kunne jeg se, at der blev spurgt efter en fil herfra:

Når jeg forsøgte at tilgå adressen i en browser, kunne jeg downloade filen. Men, som adressen afslører, er filerne krypteret (“encrypted-base-files”).

Nøglesammenfald mellem søgeresultater og download-adresse

Så opdagede jeg, at en bogs id gik igen i download-adressen. Her har jeg klikket mig ind på den novellesamling af Jens Blendstrup, som jeg downloadede til offlinebrug. Læg mærke til id:

Id:

a3519df0-182a-4f77-90f7-52a82c5bacf9/user/ee3b4be0-fcab-48b8-944f-e2bffe372f45

Bliver til download-url:

https://readcontentprdcryptbase.blob.core.windows.net/encrypted-base-files/a3/51/9d/f0/a3519df0-182a-4f77-90f7-52a82c5bacf9/a3519df0-182a-4f77-90f7-52a82c5bacf9

Mapperne “a3“, “51“, “9d” og “F0” i stien kommer fra de første otte tegn i id’et: a3519df0.

Altså: Kender du id på en bog, kender du også downloadadressen på den krypterede bog!

Søgeresultater tilgængelige for alle

Jeg fandt også ud af, at søgning i appen, som er åben for både premium-medlemmer og ikke-betalende brugere på appen, udstiller bøgers id. Her er et eksempel, hvor jeg leder efter Puk Damsgårds Arabica:

Næsten alle kan downloade alt!

Når søgningen udstiller bøgers id, og download-adressen til bøger kan findes alene ud fra en bogs id, kunne en ondsindet bruger med god tid, eller med evnen til at programmere en robot, have:

  • Hentet alle ISBN-numre på alle bøger i Saxos streaming-katalog
  • Søgt på alle ISBN-numrene ved hjælp af Saxos app-api og fundet bøgernes id
  • Downloadet alle bøgerne i kataloget

Det kan være, at Saxo havde implementeret noget, der fx blokerede en bruger, der foretog rigtig mange søgninger, eller downloadede rigtigt meget fra downloadserveren. Hvis de havde det, havde det gjort øvelsen med at kopiere hele kataloget lidt sværere. Men kun en lille smule.

Sidste brik: Dekryptering af downloadede bøger

Efter at have fundet muligheden for at downloade krypterede bøger hos Saxo uden at være abonnent, ledte jeg efter en krypteringsnøgle.

Når Saxos app kan læse de downloadede, krypterede bøger, må krypteringsnøglen jo befinde sig i – eller blive leveret til – Saxos app på en eller anden måde.

Jeg fandt nøglen i mine dekompilerede filer fra appen på min harddisk efter at have lavet forskellige søgninger i filerne, som jeg dårligt nok kan huske og heller ikke vil afsløre. Nøglen viste sig at ligge meget dårligt skjult, og med en lille bid Python-kode og modulet Cryptography skrev jeg et lille program, der kunne dekryptere bøgerne:

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend

filename = ""	# Filnavn på krypteret bog
key = ""		# Fra Saxos app
iv = ""			# Fra Saxos app
encoded_key = key.encode('utf-8')
encoded_iv = iv.encode('utf-8')

backend = default_backend()
cipher = Cipher(algorithms.AES(encoded_key), modes.CBC(encoded_iv), backend=backend)

with open(filename, "rb") as encrypted_book:
	encrypted = encrypted_book.read()
	decryptor = cipher.decryptor()
	decrypted = decryptor.update(encrypted) + decryptor.finalize()
	with open(filename + '.epub', "wb") as fout:
		fout.write(decrypted)

Saxos rettelser

Efter at Saxo har opdateret deres app, har jeg genbesøgt appen. Jeg kan se, at:

  • Krypteringsnøglen til Saxos ebøger er blevet skjult en hel del bedre end tidligere.
  • Det er ikke længere nok at have internetadressen på en bog for at downloade den. For at brugeren kan downloade en bog, genererer appen nu en unik nøgle per bog, som downloadserveren vil have for at sende en bog tilbage.

Den gamle version af appen tillod en bruger at springe det trin over, hvor Saxo registererer, at en bog er blevet hentet. I den nye er det kun muligt at downloade en bog, hvis man inden da har spurgt API’et om downloadadressen:

Downloadadresserne på bøger har fået nogle ekstra parametre på, bl.a. en unik “signatur” (“sig”), som API’et returnerer for hver bog, brugeren downloader. Jeg har censureret nogle få bytes her:

De nye downloadadresser er mere komplicerede end tidligere. Jeg ved ikke, hvad alle de nye parametre gør, men mon ikke datoparametrene har noget at gøre med en udløbsdato, hvorefter det ikke længere er muligt at bruge linket?

Derudover har Saxo, har de fortalt mig, lavet andre sikkerhedsopstramninger bag kulisserne.

Skønt!

Hvad betyder ændringerne i Saxos app?

Ændringerne betyder, at ikke-betalende brugere af Saxos app, så vidt jeg kan se, nu er effektivt afskåret fra at kunne tilgå bøger.

Betalende brugere har ikke længere mulighed for at downloade bøger, uden at Saxo kan registrere det og sørge for passende honorering af forfatter og forlag.

Tidsforløb og Saxos kommentarer

Jeg gjorde opmærksom på sikkerhedsproblemet den 22. marts 2021. Jeg kontaktede først Saxos kundeservice den 12. marts og opdagede, så vidt jeg husker, problemet den 11. marts. Saxos opdaterede app blev rullet ud den 17. maj 2021.

Jeg har slettet de bøger, jeg selv har downloadet og dekrypteret i forbindelse med undersøgelsen af Saxos app.

Saxo har haft mulighed for at foreslå rettelser og kommentere dette blogindlæg.

Saxo foreslog at gøre ekstra opmærksom på lovgivningen omkring at få uberettiget adgang til og dele data. Det synes jeg var en god idé, så det har jeg gjort i afsnittet om at undersøge hvordan apps virker og at opdage sikkerhedshuller.

De foreslog også at gøre opmærksom på, at det at dekompilere apps, ændre dem og bygge dem igen, efter deres mening er en juridisk gråzone. Her har din hensigt med at undersøge en app betydning: Hvis du blot er ude på at undersøge, hvordan ting virker, kender jeg ikke noget lovgivning, der siger, at du gør noget forkert (hvis du gør, vil jeg gerne høre om det). Hvis du er ude på at få uberettiget adgang til data eller ophavsretsbeskyttet materiale er det en anden sag – så er du ude på at bryde loven.