Kodejulekalender

Ovre på https://adventofcode.com/ kan man hver dag finde to kodeopgaver og prøve at løse dem. Det er nærmest som kryds-og-tværs eller sudoku, bare med kode i stedet. Her er mine løsninger.

Dag 5, del 2:

# Read the vent lines and sort them into axis-aligned and diagonal ones.
# The x and y values are collected to find out how large the grid has to be.
straight_lines = []
diagonal_lines = []
x_values = []
y_values = []
with open("input_day5.txt", "r", encoding="utf8") as fin:
	for raw_line in fin:
		# Every line has the form "x1,y1 -> x2,y2"
		start, end = raw_line.replace("\n","").split(" -> ")
		x1, y1 = (int(part) for part in start.split(","))
		x2, y2 = (int(part) for part in end.split(","))
		x_values.extend((x1, x2))
		y_values.extend((y1, y2))
		segment = [(x1, y1), (x2, y2)]
		if x1 == x2 or y1 == y2:
			straight_lines.append(segment)
		else:
			diagonal_lines.append(segment)

# One counter per grid point, keyed as "x,y" and starting at zero
coordinates = {
	str(x) + "," + str(y): 0
	for x in range(max(x_values)+1)
	for y in range(max(y_values)+1)
}

def add_line_to_coordinates(x,y):
	"""Count one more line passing through grid point (x, y).

	Increments the entry keyed "x,y" in the module-level coordinates dict;
	raises KeyError if that point has no entry.
	"""
	coordinates[f"{x},{y}"] += 1

# Mark every point on the diagonal lines. Each line is walked from the end
# point with the lowest x value, one step right and one step up or down at a time.
for line in diagonal_lines:
	(left_x, left_y), (right_x, right_y) = sorted(line)
	y_step = 1 if right_y > left_y else -1
	for offset in range(right_x - left_x + 1):
		add_line_to_coordinates(left_x + offset, left_y + offset * y_step)

# Mark every point on the horizontal and vertical lines
for (x1, y1), (x2, y2) in straight_lines:
	if x1 == x2:
		# x values are equal: vertical line (a single point if y values are equal too)
		for y in range(min(y1, y2), max(y1, y2) + 1):
			add_line_to_coordinates(x1, y)
	else:
		# y values are equal: horizontal line
		for x in range(min(x1, x2), max(x1, x2) + 1):
			add_line_to_coordinates(x, y1)

# Count the points that are covered by more than one line
double_hits = sum(1 for number_of_hits in coordinates.values() if number_of_hits > 1)
print(double_hits)

Dag 5, del 1:

# Read the vent lines, keeping only the horizontal and vertical ones.
# The x and y values of the kept lines decide how large the grid has to be.
straight_lines = []
x_values = []
y_values = []
with open("input_day5.txt", "r", encoding="utf8") as fin:
	for raw_line in fin:
		# Every line has the form "x1,y1 -> x2,y2"
		start, end = raw_line.replace("\n","").split(" -> ")
		x1, y1 = (int(part) for part in start.split(","))
		x2, y2 = (int(part) for part in end.split(","))
		# Diagonal lines are ignored completely in part 1
		if x1 != x2 and y1 != y2:
			continue
		x_values.extend((x1, x2))
		y_values.extend((y1, y2))
		straight_lines.append([(x1, y1), (x2, y2)])

# One counter per grid point, keyed as "x,y" and starting at zero
coordinates = {
	str(x) + "," + str(y): 0
	for x in range(max(x_values)+1)
	for y in range(max(y_values)+1)
}

def add_line_to_coordinates(x,y):
	"""Count one more line passing through grid point (x, y).

	Increments the entry keyed "x,y" in the module-level coordinates dict;
	raises KeyError if that point has no entry.
	"""
	coordinates[f"{x},{y}"] += 1

# Mark every point on the horizontal and vertical lines
for (x1, y1), (x2, y2) in straight_lines:
	if x1 == x2:
		# x values are equal: vertical line (a single point if y values are equal too)
		for y in range(min(y1, y2), max(y1, y2) + 1):
			add_line_to_coordinates(x1, y)
	else:
		# y values are equal: horizontal line
		for x in range(min(x1, x2), max(x1, x2) + 1):
			add_line_to_coordinates(x, y1)

# Count the points that are covered by more than one line
double_hits = sum(1 for number_of_hits in coordinates.values() if number_of_hits > 1)
print(double_hits)

Dag 4, del 2:

# Read the draw order (first line of the file) and the bingo boards.
# Boards are separated by blank lines; each board is stored as one flat
# list holding the number strings of its five rows.
boards = []
with open("input_day4.txt", "r", encoding="utf8") as fin:
	rows_read = 0
	board = []
	for line_number, text in enumerate(fin):
		if line_number == 0:
			drawn_numbers = text.replace("\n","").split(",")
		elif text == '\n':
			# A blank line means a new board starts
			rows_read = 0
			board = []
		else:
			# Remove the padding in front of single digit numbers so the
			# row can be split on single spaces
			if text[0] == " ":
				text = text[1:]
			text = text.replace("  "," ").replace("\n","")
			board.extend(text.split(" "))
			rows_read += 1
		if rows_read == 5:
			board = [cell.replace("\n","") for cell in board]
			boards.append(board)

def check_board(board):
	"""Return True if a row or a column of the board is completely marked.

	board is a flat list of 25 strings holding a 5x5 board row by row.
	A number counts as marked when its string contains the letter 'd'.
	Returns False when no row or column is complete.
	"""
	# The five rows followed by the five columns
	lines = [board[start:start + 5] for start in range(0, 25, 5)]
	lines += [board[column:25:5] for column in range(5)]
	return any(
		len(line) == 5 and all('d' in number for number in line)
		for line in lines
	)

def calculate_score(board, draw):
	"""Return the sum of the unmarked numbers on board multiplied by draw.

	Numbers whose string contains 'd' are marked and left out of the sum.
	"""
	unmarked = [int(number) for number in board if 'd' not in number]
	return sum(unmarked) * int(draw)

# Play all draws to the end and record the boards in the order they win
winning_boards = []
scores = []

for draw in drawn_numbers:
	for board in boards:
		# Boards that already have bingo are out of the game
		if board in winning_boards:
			continue
		# Mark the drawn number on the board by appending 'd' to it
		for position, number in enumerate(board):
			if number == draw:
				board[position] = number + 'd'
		if check_board(board):
			winning_boards.append(board)
			scores.append(calculate_score(board, draw))

# The score of the board that wins last
print(scores[-1])

Dag 4, del 1:

# Read the draw order (first line of the file) and the bingo boards.
# Boards are separated by blank lines; each board is stored as one flat
# list holding the number strings of its five rows.
boards = []
with open("input_day4.txt", "r", encoding="utf8") as fin:
	rows_read = 0
	board = []
	for line_number, text in enumerate(fin):
		if line_number == 0:
			drawn_numbers = text.replace("\n","").split(",")
		elif text == '\n':
			# A blank line means a new board starts
			rows_read = 0
			board = []
		else:
			# Remove the padding in front of single digit numbers so the
			# row can be split on single spaces
			if text[0] == " ":
				text = text[1:]
			text = text.replace("  "," ").replace("\n","")
			board.extend(text.split(" "))
			rows_read += 1
		if rows_read == 5:
			board = [cell.replace("\n","") for cell in board]
			boards.append(board)

def check_board(board):
	"""Return True if a row or a column of the board is completely marked.

	board is a flat list of 25 strings holding a 5x5 board row by row.
	A number counts as marked when its string contains the letter 'd'.
	Returns False when no row or column is complete.
	"""
	# The five rows followed by the five columns
	lines = [board[start:start + 5] for start in range(0, 25, 5)]
	lines += [board[column:25:5] for column in range(5)]
	return any(
		len(line) == 5 and all('d' in number for number in line)
		for line in lines
	)

def calculate_score(board, draw):
	"""Return the sum of the unmarked numbers on board multiplied by draw.

	Numbers whose string contains 'd' are marked and left out of the sum.
	"""
	unmarked = [int(number) for number in board if 'd' not in number]
	return sum(unmarked) * int(draw)

def play_bingo():
	"""Draw the numbers in order and return the score of the first winning board.

	Uses the module-level drawn_numbers and boards. Drawn numbers are marked
	on the boards in place by appending 'd' to them. Returns None if no
	board gets bingo before the draws run out.
	"""
	for draw in drawn_numbers:
		for board in boards:
			for position, number in enumerate(board):
				if number == draw:
					board[position] = number + 'd'
			if check_board(board):
				return calculate_score(board, draw)
	return None

# Play the game and print the score of the first board that gets bingo
score = play_bingo()
print(score)

Dag 3, del 2:

# Read the diagnostic report: one binary number per line, blank lines skipped
with open("input_day3", "r", encoding="utf8") as fin:
	rates = [line.strip() for line in fin if line.strip()]


def _find_rating(candidates, keep_most_common):
	"""Narrow candidates down bit by bit and return the one rate that is left.

	For each bit position the candidates are filtered on the most common
	bit (keep_most_common=True, ties keep '1') or on the least common bit
	(keep_most_common=False, ties keep '0'). Filtering stops as soon as a
	single candidate remains.
	"""
	position = 0
	while len(candidates) > 1 and position < len(candidates[0]):
		zero_count = sum(1 for rate in candidates if rate[position] == '0')
		one_count = sum(1 for rate in candidates if rate[position] == '1')
		if zero_count > one_count:
			wanted = '0' if keep_most_common else '1'
		else:
			wanted = '1' if keep_most_common else '0'
		remaining = [rate for rate in candidates if rate[position] == wanted]
		# When all candidates share the same bit, filtering on the other bit
		# would throw every candidate away, so the list is left as it is
		if remaining:
			candidates = remaining
		position += 1
	return candidates[0]


oxygen = _find_rating(rates, keep_most_common=True)
co2 = _find_rating(rates, keep_most_common=False)

print(int(oxygen,2)*int(co2,2))

Dag 3, del 1:

# Read the diagnostic report, one binary number per line (newline included)
with open("input_day3", "r", encoding="utf8") as fin:
	rates = list(fin)

# Gamma is built from the most common bit in each position, epsilon from
# the opposite bit. The last character of a line is the newline, hence -1.
gamma_bits = []
epsilon_bits = []
for position in range(len(rates[0])-1):
	digits = [int(rate[position]) for rate in rates]
	if digits.count(0) > digits.count(1):
		gamma_bits.append('0')
		epsilon_bits.append('1')
	else:
		gamma_bits.append('1')
		epsilon_bits.append('0')
gamma_rate = "".join(gamma_bits)
epsilon_rate = "".join(epsilon_bits)
print(int(gamma_rate,2) * int(epsilon_rate,2))

Dag 2, del 2:

# Read the commands, one per line, skipping blank lines
with open("input_day2.txt", "r", encoding="utf8") as fin:
	position_list = [line for line in fin if line.strip()]

horisontal_position = 0
aim = 0
depth = 0
for move in position_list:
	# A command looks like "forward 5", "down 3" or "up 2"
	command, amount = move.split()
	number = int(amount)
	if command == 'forward':
		horisontal_position += number
		# Moving forward always changes depth by aim * distance
		depth += number * aim
	elif command == 'down':
		aim += number
	elif command == 'up':
		aim -= number
	else:
		raise ValueError("Unknown command: " + move.strip())
print(horisontal_position*depth)

Dag 2, del 1:

# Read the commands, one per line, skipping blank lines
with open("input_day2.txt", "r", encoding="utf8") as fin:
	position_list = [line for line in fin if line.strip()]

forward_position = 0
depth = 0
for move in position_list:
	# A command looks like "forward 5", "down 3" or "up 2"
	command, amount = move.split()
	number = int(amount)
	if command == 'forward':
		forward_position += number
	elif command == 'down':
		depth += number
	elif command == 'up':
		depth -= number
	else:
		raise ValueError("Unknown command: " + move.strip())
print(forward_position*depth)

Dag 1, del 2:

# Read the depth measurements, one number per line
with open("input.txt", "r", encoding="utf8") as fin:
	number_list = [int(line) for line in fin]

# Sum of every window of three consecutive measurements
window_sums = [
	sum(number_list[start:start + 3])
	for start in range(len(number_list) - 2)
]

# Count the windows whose sum is larger than the sum of the window before
increase_count = sum(
	1
	for previous_sum, new_sum in zip(window_sums, window_sums[1:])
	if new_sum > previous_sum
)
print(increase_count)

Dag 1, del 1:

# Read the depth measurements, one number per line
with open("input.txt", "r", encoding="utf8") as fin:
	depths = [int(line) for line in fin]

# Count the measurements that are larger than the measurement before
increase_count = sum(
	1
	for last_number, number in zip(depths, depths[1:])
	if number > last_number
)
print(increase_count)

Optimering af indsamling af Borgerforslagsdata

For et par uger siden skrev jeg om en lille robot, jeg har lavet, der tjekker antallet af stemmer per borgerforslag på borgerforslag.dk.

Første udgave af robotten gemte det aktuelle stemmeantal for hvert aktivt borgerforslag hvert 10. minut, og da der både er en del borgerforslag og en del minutter, blev det ret hurtigt til ret mange registreringer i min database.

Jeg kom i tanke om, at det kun er nødvendigt at gemme stemmeantallet, når stemmeantallet har ændret sig siden sidste registrering. Hvis et forslag er viralt, registreres stemmeantallet stadigvæk hvert 10. minut. Hvis et forslag er døende, kan der gå meget længere tid mellem hver registrering.

Her er den nye udgave af robotten, som tjekker om der findes andre registreringer af samme forslag med samme stemmeantal, og kun gemmer antal stemmer, hvis der ikke gør det:

import requests
from datetime import datetime
import locale
import psycopg2
from psycopg2 import Error

# Locale is set to Danish to parse dates correctly
locale.setlocale(locale.LC_TIME, ('da_DK', 'UTF-8'))

# API url
url = 'https://www.borgerforslag.dk/api/proposals/search'

# Query parameters
suggestions_per_request = 300
params_json = {
	"filter": "active",
	"sortOrder": "NewestFirst",
	"searchQuery":"",
	"pageNumber":0,
	"pageSize": suggestions_per_request
}

# Connect to database
try:
	connection = psycopg2.connect(user = "",
									password = "",
									host = "",
									port = "",
									database = "")
	cursor = connection.cursor()
except psycopg2.Error as error:
	print ("Error while connecting to PostgreSQL", error)
	# Nothing below works without a connection, so stop here instead of
	# failing later with a NameError
	raise SystemExit(1)

# One shared timestamp (naive datetime in UTC) for all votes saved in this run
now = datetime.utcnow()

# Insert into database function
def insert_suggestion_and_votes(connection, suggestion):
	"""Save a suggestion and, if its vote count has changed, the new count.

	connection is the psycopg2 connection and suggestion is one entry from
	the API response with the keys 'externalId', 'title', 'date', 'url',
	'status' and 'votes'.

	The suggestion is added to borgerforslag_suggestion if its externalId
	is not there yet. A row is added to borgerforslag_vote when the
	suggestion is new or when the vote count differs from the most recent
	registration for the suggestion. Database errors are printed together
	with the suggestion and are not re-raised.
	"""
	with connection:
		with connection.cursor() as cur:
			try:
				# See if suggestion already exists in table borgerforslag_suggestion
				sql = '''SELECT id FROM borgerforslag_suggestion WHERE unique_id = %s'''
				cur.execute(sql, (suggestion['externalId'],))
				suggestion_record = cur.fetchone()
				if not suggestion_record:
					# New suggestion: add it, and always save its first vote count
					suggestion_data = (suggestion['externalId'],suggestion['title'],suggestion['date'],suggestion['url'],suggestion['status'])
					sql = '''INSERT INTO borgerforslag_suggestion(unique_id,title,suggested_date,url,status) VALUES(%s,%s,%s,%s,%s) RETURNING id'''
					cur.execute(sql, suggestion_data)
					suggestion_id = cur.fetchone()[0]
					insert_votes = True
				else:
					suggestion_id = suggestion_record[0]
					# Compare with the most recent registration only. An older
					# registration with the same count must not block saving
					# when the count has moved away and come back since.
					sql = '''SELECT votes = %s FROM borgerforslag_vote WHERE suggestion_id = %s ORDER BY "timestamp" DESC LIMIT 1'''
					cur.execute(sql, (suggestion['votes'], suggestion_id))
					latest_vote = cur.fetchone()
					# No registration yet, or a different count: save the votes
					insert_votes = not (latest_vote and latest_vote[0])

				# Add votes to table borgerforslag_vote (if suggestion is new or vote count has changed since last run)
				if insert_votes:
					sql = '''INSERT INTO borgerforslag_vote(suggestion_id,timestamp,votes)
					VALUES(%s,%s,%s)'''
					cur.execute(sql, (suggestion_id,now,suggestion['votes']))
			except Error as e:
				print(e, suggestion)

# Loop preparation: the number of available results is unknown until the
# first response arrives, so it starts at 1 to let the loop run once
max_requests = 10
requests_made = 0
results_requested = 0
results_available = results_requested + 1

# Fetch one page of suggestions at a time and add them to the database,
# until all results have been requested or the request limit is reached
while requests_made < max_requests and results_requested < results_available:
	page = requests.post(url, json=params_json).json()
	results_available = page['resultCount']
	results_requested += suggestions_per_request
	requests_made += 1
	params_json['pageNumber'] += 1
	for suggestion in page['data']:
		# convert date to datetime
		suggestion['date'] = datetime.strptime(suggestion['date'], '%d. %B %Y')
		insert_suggestion_and_votes(connection, suggestion)

Oprydning

Nu hvor jeg fik gjort tempoet, min database vokser med, lidt langsommere, ville jeg også gerne rydde lidt op i de gamle registreringer, hvor jeg jo havde gemt antal stemmer hvert 10. minut, uanset om antallet havde ændret sig.

Det skrev jeg også et lille script til. Her er logikken at jeg henter alle stemmeregistreringer sorteret efter hvilket borgerforslag, de hører til, og dernæst efter tidspunkt for registreringen.

Med rækkefølgen på plads, kan jeg for hver registrering tjekke, om den både vedrører samme borgerforslag som den tidligere registrering, og at stemmeantallet er det samme som den tidligere registrering. Hvis begge dele er sandt, er registreringen overflødig og kan slettes:

import psycopg2
from psycopg2 import Error

# Connect to database
try:
	connection = psycopg2.connect(user = "",
									password = "",
									host = "",
									port = "",
									database = "")
	cursor = connection.cursor()
except psycopg2.Error as error:
	print ("Error while connecting to PostgreSQL", error)
	# Nothing below works without a connection, so stop here instead of
	# failing later with a NameError
	raise SystemExit(1)

with connection:
	with connection.cursor() as cur:
		# All vote registrations, grouped by suggestion and oldest first
		sql = '''SELECT "borgerforslag_vote"."id", "borgerforslag_vote"."suggestion_id", "borgerforslag_vote"."timestamp", "borgerforslag_vote"."votes" FROM "borgerforslag_vote" ORDER BY "borgerforslag_vote"."suggestion_id" ASC, "borgerforslag_vote"."timestamp" ASC'''
		cur.execute(sql)

		# (suggestion, votes) of the row before; starts with values that
		# are not expected to match any real row
		previous = (-1000, -1)
		for vote_id, suggestion, _registered, votes in cur.fetchall():
			current = (suggestion, votes)
			# Same suggestion and same vote count as the row before:
			# the registration adds nothing and is deleted
			if current == previous:
				sql = '''DELETE FROM "borgerforslag_vote" WHERE "borgerforslag_vote"."id" = %s'''
				cur.execute(sql, (vote_id, ))
			previous = current

Test af kandidattest hos DR med Python

Hos Danmarks Radio kan man forsøge at afklare sine holdninger og se, hvilke politikeres holdninger, der ligner mest. Rækken af metodiske problemer med sådan nogle kandidattests er lang, men på Twitter skrev Søren om en mulig skævhed alene i kraft af, at der typisk er “flere kandidater i posen” fra store partier end små:

Twitterspørgsmål og -svar om skævheder i kandidattests

Tobias var først med at komme med et bud, men jeg kunne ikke lade være med selv at prøve at teste hypotesen. Det kom der dette lille program ud af, der fyrer 25.000 tilfældige svar på testen af sted og ser hvad Danmarks Radio svarer tilbage. Læg mærke til at programmet ikke genererer 25.000 forskellige tilfældige kombinationer men blot genererer tilfældige kombinationer 25.000 gange. (Det vil sige, at den samme kombination kan forekomme flere gange ud af de 25.000 gange.):

import requests
import random

base_url = 'https://www.dr.dk/nyheder/politik/api/kandidattest/GetMunicipalityMatch?municipality=124&answers='

# Number of times each party had the best matching candidate
stats = {}

for attempt in range(25000):
	# 18 random answers, each one of the values 1, 2, 4 or 5
	sequence = ",".join(str(random.choice([1,2,4,5])) for _ in range(18))
	try:
		response = requests.get(base_url + sequence)
		result = response.json()
		candidate_one_party = result['TopMatches'][0]['CandidateBasic']['Party']
	except (requests.RequestException, ValueError, LookupError, TypeError) as error:
		# A failed request or an unexpected answer only costs this one
		# attempt; Ctrl-C and programming errors are no longer swallowed
		print("Skipping attempt", attempt, error)
		continue
	stats[candidate_one_party] = stats.get(candidate_one_party, 0) + 1
	print(attempt)

with open('stats.txt', 'w') as output:
	output.write(str(stats))

Det kom der følgende rangliste ud af for Københavns Kommune. Tabellen viser hvor mange gange en kandidat fra partiet var den kandidat, man var mest enig med på baggrund af sine svar i testen:

Parti | Antal | Procent
Socialdemokratiet | 4210 | 17%
Det Konservative Folkeparti | 3680 | 15%
Radikale Venstre | 3262 | 13%
Venstre, Danmarks Liberale Parti | 2632 | 11%
SF – Socialistisk Folkeparti | 2034 | 8%
Alternativet | 1528 | 6%
Kristendemokraterne | 1231 | 5%
Frihedslisten | 1120 | 4%
Nye Borgerlige | 1109 | 4%
Enhedslisten – De Rød-Grønne | 963 | 4%
Dansk Folkeparti | 960 | 4%
Kommunisterne | 606 | 2%
Veganerpartiet | 421 | 2%
Københavnerlisten | 285 | 1%
Hampepartiet | 273 | 1%
Liberal Alliance | 193 | 1%
Kommunistisk Parti | 190 | 1%
Danmark for Alle | 170 | 1%
Det Demokratiske Parti | 67 | 0%
Bæredygtigt Samfund | 43 | 0%
Rolig Revolution | 21 | 0%
Total | 24998 | 100%
Tabel der viser udfald af partifarve for mest enige kandidater ved 24998 tilfældige udfyldninger af DRs kandidattest til kommunalvalget for Københavns Kommune

Prøv selv, hvis du gider! Og husk: De fleste, der udfylder kandidattests i virkeligheden, slår nok ikke med terning når de vælger svar.

Hvordan udvikler antal underskrifter sig på Borgerforslag.dk?

På Twitter skrev Peter Brodersen:

Peters idé er sjov, synes jeg, så jeg er så småt begyndt at bygge et eller andet, der monitorerer hvordan antallet af underskrifter på borgerforslag udvikler sig over tid.

Så nu tygger min webserver sig igennem nedenstående script hvert 10. minut og gemmer det aktuelle antal underskrifter på hvert borgerforslag. Når der er gået nogle uger, vil jeg se om jeg kan lave nogle interessante visualiseringer af data.

import requests
from datetime import datetime
import locale
import psycopg2
from psycopg2 import Error

### PREPARATION ###
# Locale is set to Danish to be able to parse dates from Borgerforslag
locale.setlocale(locale.LC_TIME, ('da_DK', 'UTF-8'))

# API url and request parameters
url = 'https://www.borgerforslag.dk/api/proposals/search'
suggestions_per_request = 300
params_json = {
	"filter": "active",
	"sortOrder": "NewestFirst",
	"searchQuery":"",
	"pageNumber":0,
	"pageSize": suggestions_per_request
}

# Connect to database
try:
	connection = psycopg2.connect(user = "",
									password = "",
									host = "",
									port = "",
									database = "")
	cursor = connection.cursor()
except psycopg2.Error as error:
	print ("Error while connecting to PostgreSQL", error)
	# Nothing below works without a connection, so stop here instead of
	# failing later with a NameError
	raise SystemExit(1)

# One shared timestamp (naive datetime in UTC) for all votes saved in this run
now = datetime.utcnow()

# Insert into database function
def insert_suggestion_and_votes(connection, suggestion):
	"""Save a suggestion (if it is new) and its current number of votes.

	The suggestion is looked up in borgerforslag_suggestion by its
	externalId and inserted when it is missing. A row with the module-level
	timestamp now and the vote count is then always added to
	borgerforslag_vote. Database errors are printed together with the
	suggestion and are not re-raised.
	"""
	with connection:
		with connection.cursor() as cur:
			try:
				# Look the suggestion up by its external id
				sql = '''SELECT * FROM borgerforslag_suggestion WHERE unique_id = %s'''
				cur.execute(sql, (suggestion['externalId'],))
				existing = cur.fetchone()
				if existing:
					# Already known: reuse the id from the first column
					suggestion_id = existing[0]
				else:
					# Unknown: add the suggestion and use the id of the new row
					fields = ('externalId', 'title', 'date', 'url', 'status')
					sql = '''INSERT INTO borgerforslag_suggestion(unique_id,title,suggested_date,url,status) VALUES(%s,%s,%s,%s,%s) RETURNING id'''
					cur.execute(sql, tuple(suggestion[field] for field in fields))
					suggestion_id = cur.fetchone()[0]

				# Add votes
				sql = '''INSERT INTO borgerforslag_vote(suggestion_id,timestamp,votes)
				VALUES(%s,%s,%s)'''
				cur.execute(sql, (suggestion_id,now,suggestion['votes']))
			except Error as e:
				print(e, suggestion)

# Loop preparation: the number of available results is unknown until the
# first response arrives, so it starts at 1 to let the loop run once
max_requests = 10
requests_made = 0
results_requested = 0
results_available = results_requested + 1

# Fetch one page of suggestions at a time and add them to the database,
# until all results have been requested or the request limit is reached
while requests_made < max_requests and results_requested < results_available:
	page = requests.post(url, json=params_json).json()
	results_available = page['resultCount']
	results_requested += suggestions_per_request
	requests_made += 1
	params_json['pageNumber'] += 1
	for suggestion in page['data']:
		# convert date to datetime
		suggestion['date'] = datetime.strptime(suggestion['date'], '%d. %B %Y')
		insert_suggestion_and_votes(connection, suggestion)

Opdateret program til at hente kurser på dine værdipapirer hos Nordnet

Nordnet har opdateret deres loginprocedure, så her er et dugfriskt program til at hente kurser hos Nordnet – eller Morningstar, hvis Nordnet skulle fejle:

# -*- coding: utf-8 -*-
# Author: Morten Helmstedt. E-mail: helmstedt@gmail.com
""" This program extracts historical stock prices from Nordnet (and Morningstar as a fallback) """

import requests
from datetime import datetime
from datetime import date

# Nordnet user account credentials (fill in before running)
user = ''
password = ''

# DATE AND STOCK DATA. SHOULD BE EDITED FOR YOUR NEEDS #

# Start date (start of historical price period), format YYYY-MM-DD
startdate = '2013-01-01'

# List of shares to look up prices for.
# Format is: Name, Morningstar id, Nordnet stock identifier
# See e.g. https://www.nordnet.dk/markedet/aktiekurser/16256554-novo-nordisk-b
# (identifier is 16256554)
# All shares must have a name (whatever you like). To get prices they must
# either have a Nordnet identifier or a Morningstar id
sharelist = [
	["Maj Invest Danske Obligationer","F0GBR064UX",16099874],
	["Novo Nordisk B A/S","0P0000A5BQ",16256554],
]

# A variable to store historical prices before saving to csv.
# It starts with the header row; one semicolon separated row is added per price.
finalresult = ""
finalresult += '"date";"price";"instrument"' + '\n'

# LOGIN TO NORDNET #

# The same session is used for all Nordnet requests so cookies are kept
session = requests.Session()

# Setting cookies prior to login by visiting login page
url = 'https://www.nordnet.dk/logind'
request = session.get(url)

# Update headers for login
session.headers['client-id'] = 'NEXT'
session.headers['sub-client-id'] = 'NEXT'

# Actual login
# NOTE(review): the login response is stored but never checked for success
url = 'https://www.nordnet.dk/api/2/authentication/basic/login'
request = session.post(url, data = {'username': user, 'password': password})

# LOOPS TO REQUEST HISTORICAL PRICES AT NORDNET AND MORNINGSTAR #

# Nordnet loop to get historical prices.
# Shares that Nordnet cannot deliver prices for are collected in nordnet_fail
# so they can be looked up at Morningstar afterwards.
nordnet_fail = []

for share in sharelist:
	# Without a Nordnet stock identifier the share can only be looked up at
	# Morningstar, and only if it has a Morningstar id
	if not share[2]:
		if share[1]:
			nordnet_fail.append(share)
		continue

	url = "https://www.nordnet.dk/api/2/instruments/historical/prices/" + str(share[2])
	payload = {"from": startdate, "fields": "last"}
	data = session.get(url, params=payload)
	jsondecode = data.json()

	# No price data returned! Try another method!
	if not jsondecode[0]['prices']:
		nordnet_fail.append(share)
		continue

	# Sometimes the final date is returned twice. The dates already added
	# are remembered to check for duplicates.
	seen_dates = set()
	try:
		for value in jsondecode[0]['prices']:
			if 'last' in value:
				price = str(value['last'])
			elif 'close_nav' in value:
				price = str(value['close_nav'])
			else:
				# No price for this date; skip it instead of repeating
				# the price of the date before
				continue
			price = price.replace(".",",")
			# 'time' is divided by 1000 to get seconds for fromtimestamp
			price_date = datetime.fromtimestamp(value['time'] / 1000)
			price_date = datetime.strftime(price_date, '%Y-%m-%d')
			# Only adds a date if it has not been added before
			if price_date not in seen_dates:
				seen_dates.add(price_date)
				finalresult += '"' + price_date + '"' + ";" + '"' + price + '"' + ";" + '"' + share[0] + '"' + "\n"
	except Exception as error:
		# Report the problem and move on to the next share
		print(share[0], error)

if nordnet_fail:
	print(nordnet_fail)

# Morningstar loop to get historical prices for the shares Nordnet failed on
for share in nordnet_fail:
	payload = {"id": share[1], "currencyId": "DKK", "idtype": "Morningstar", "frequency": "daily", "startDate": startdate, "outputType": "COMPACTJSON"}
	data = requests.get("http://tools.morningstar.dk/api/rest.svc/timeseries_price/nen6ere626", params=payload)
	jsondecode = data.json()

	# Each entry holds a timestamp (divided by 1000 before conversion) and a price
	for entry in jsondecode:
		price = str(entry[1]).replace(".",",")
		day = datetime.strftime(datetime.fromtimestamp(entry[0] / 1000), '%Y-%m-%d')
		finalresult += '"' + '";"'.join((day, price, share[0])) + '"\n'

# WRITE CSV OUTPUT TO FILE #

with open("kurser.csv", "w", newline='', encoding='utf8') as fout:
	fout.write(finalresult)

Opdateret program til at hente transaktioner hos Nordnet

Nordnet har opdateret deres login-procedure, så jeg har også opdateret mit Python-script til at logge ind på Nordnet og hente transaktioner. Her er den nye version.

# -*- coding: utf-8 -*-
# Author: Morten Helmstedt. E-mail: helmstedt@gmail.com
""" This program logs into a Nordnet account and extracts transactions as a csv file.
Handy for exporting to Excel with as few manual steps as possible """

import requests 
from datetime import datetime
from datetime import date

# USER ACCOUNT, PORTFOLIO AND PERIOD DATA. SHOULD BE EDITED FOR YOUR NEEDS #

# Nordnet user account credentials and accounts/portfolios names (choose yourself) and numbers.
# To get account numbers go to https://www.nordnet.dk/transaktioner and change
# between accounts. The number after "accid=" in the new URL is your account number.
# If you have only one account, your account number is 1.
user = ''
password = ''
# Maps the portfolio name written to the csv file to the Nordnet account number
accounts = {
	'Nordnet: Frie midler': '1',
	'Nordnet: Ratepension': '3',
}

# Start date (start of period for transactions) and date today used for extraction of transactions
# Both dates are strings in the format YYYY-MM-DD
startdate = '2013-01-01'
today = date.today()
enddate = datetime.strftime(today, '%Y-%m-%d')

# Manual data lines. These can be used if you have portfolios elsewhere that you would
# like to add manually to the data set. If no manual data the variable manualdataexists
# should be set to False
# The text must start with an empty line followed by the header line; both are
# cut away before the remaining lines are added to the output.
manualdataexists = True
manualdata = '''
Id;Bogføringsdag;Handelsdag;Valørdag;Depot;Transaktionstype;Værdipapirer;Værdipapirtype;ISIN;Antal;Kurs;Rente;Samlede afgifter;Samlede afgifter Valuta ;Beløb;Valuta;Indkøbsværdi;Resultat;Totalt antal;Saldo;Vekslingskurs;Transaktionstekst;Makuleringsdato;Notanummer;Verifikationsnummer;Kurtage;Kurtage Valuta;Depotnavn
;30-09-2013;30-09-2013;30-09-2013;;KØBT;Obligationer 3,5%;Obligationer;;72000;;;;;-69.891,54;DKK;;;;;;;;;;;;Frie midler: Finansbanken
'''

# A variable to store transactions before saving to csv
transactions = ''

# LOGIN TO NORDNET #
# The same session is used for all Nordnet requests so cookies are kept
session = requests.Session()

# Setting cookies prior to login by visiting login page
url = 'https://www.nordnet.dk/logind'
request = session.get(url)

# Update headers for login
session.headers['client-id'] = 'NEXT'
session.headers['sub-client-id'] = 'NEXT'

# Actual login
# NOTE(review): the login response is stored but never checked for success
url = 'https://www.nordnet.dk/api/2/authentication/basic/login'
request = session.post(url, data = {'username': user, 'password': password})


# GET ACCOUNT(S) TRANSACTION DATA #

# Payload and url for transaction requests
payload = {
	'locale': 'da-DK',
	'from': startdate,
	'to': enddate,
}

url = 'https://www.nordnet.dk/mediaapi/transaction/csv/filtered'

# The header line is only written once, for the first account that returns one
header_written = False
for portfolioname, account_id in accounts.items():
	payload['account_id'] = account_id
	data = session.get(url, params=payload)
	# The export is tab separated UTF-16 text; it is turned into semicolon separated lines
	rows = data.content.decode('utf-16').replace('\t',';').splitlines()

	header_seen = False
	for line in rows:
		# Empty lines are ignored
		if not line:
			continue
		# The first non-empty line of every account is the header line
		if not header_seen:
			header_seen = True
			# For the first account we use the headers and add an additional column.
			# Header lines of additional accounts are discarded.
			if not header_written:
				transactions += line + ';' + 'Depotnavn' + '\n'
				header_written = True
			continue
		# Content lines are added
		# Fix because Nordnet sometimes adds one empty column too many
		if line.count(';') == 27:
			line = line.replace('; ',' ')
		transactions += line + ';' + portfolioname + '\n'

# ADD MANUAL LINES IF ANY #
if manualdataexists == True:
	# Cut away the leading empty line and the header line of the manual data
	transactions += manualdata.split("\n",2)[2]

# WRITE CSV OUTPUT TO FILE #
with open("transactions.csv", "w", encoding='utf8') as fout:
	fout.write(transactions)

RSS-feeds med Django

En Wallnot-bruger spurgte mig om ikke Wallnot burde have et RSS-feed? Jo da.

Det viste sig at den slags er indbygget i Django og meget nemt at lave.

Jeg oprettede feeds.py med to forskellige feeds. Et for alle artikler fra Wallnot, et for artikler for bestemte medier:

from django.contrib.syndication.views import Feed
from wall.models import Article
from django.urls import reverse

class RssFeed(Feed):
	"""RSS feed with the newest articles across all media."""
	title = "Nyeste artikler fra wallnot.dk"
	link = "/rss/"
	description = "De allernyeste artikler uden paywall fra wallnot.dk"
	def items(self):
		"""Return the 20 newest articles without a detected paywall, newest first."""
		return Article.objects.filter(paywall_detected=False).order_by('-date')[:20]

	def item_title(self, item):
		"""Return the title of the article."""
		return item.title

	def item_description(self, item):
		"""Return a short text naming the medium of the article."""
		return "Artikel fra " + item.get_medium_display()

	def item_link(self, item):
		"""Return the url stored on the article."""
		return item.url

class RssFeedMedium(Feed):
	"""RSS feed with the newest articles from one medium."""
	title = "Nyeste artikler fra wallnot.dk"
	description = "De allernyeste artikler uden paywall fra wallnot.dk"
	def get_object(self, request, medium):
		"""Return the medium argument unchanged; it is the obj passed to link() and items()."""
		return medium
	
	def link(self, obj):
		"""Return the feed link for the medium: "/rss/" followed by the medium."""
		return "/rss/" + obj
	
	def items(self, obj):
		"""Return the 20 newest articles from the medium without a detected paywall, newest first."""
		return Article.objects.filter(paywall_detected=False, medium=obj).order_by('-date')[:20]

	def item_title(self, item):
		"""Return the title of the article."""
		return item.title

	def item_description(self, item):
		"""Return a short text naming the medium of the article."""
		return "Artikel fra " + item.get_medium_display()

	def item_link(self, item):
		"""Return the url stored on the article."""
		return item.url

Og pegede på dem fra urls.py:

from django.urls import path
from . import views
from . import feeds

# Routes of the app: four regular views followed by the two RSS feeds.
# Feed instances are passed directly as the view for their path.
urlpatterns = [
	path('', views.index, name='index'),
	path('links', views.linkindex, name='linkindex'),
	path('privatliv', views.privacy, name='privacy'),
	path('om', views.about, name='wabout'),
	# Feed with articles from all media
	path('rss', feeds.RssFeed(), name='rssfeed'),
	# Feed for a single medium; the captured string is passed on as "medium"
	path('rss/<str:medium>', feeds.RssFeedMedium(), name='rssfeed_medium'),
]

Og vupti!

Du finder RSS-feeds på wallnot.dk lige ved siden af Twitter-logoet.

ETF’er og fonde med aktiebeskatning 2021

For et par år siden blev det muligt at købe og tjene/tabe penge på aktiebaserede ETF’er og udenlandske investeringsfonde som aktieindkomst og ikke længere som kapitalindkomst.

Det eneste problem er/var, at det velmenende regneark på skat.dk, der viser aktiebaserede investeringsselskaber, som er godkendt til den lavere beskatning, er en lille smule svært at bruge, når man gerne vil sammenligne værdipapirerne og finde ud af, hvor de kan købes.

Derfor har jeg lavet https://wallnot.dk/stocks.

Her kan du læse om, hvordan jeg gjorde.

  1. Jeg downloadede excelarket fra skat.dk
  2. Jeg tilføjede nogle kolonner og gemte som CSV-fil
  3. Jeg brugte Python til at hente data og links til værdipapirer hos Saxo Bank, Nordnet og Morningstar
  4. Jeg oprettede en app i Django og definerede en datamodel tilsvarende excelarket
  5. Jeg importerede data til Django
  6. Jeg byggede visningen

Nogle timers arbejde for mig. Forhåbentlig nogle sparede timer for dig.

Download af excelark

https://skat.dk/getfile.aspx?id=145013&type=xlsx

Tilføje nogle kolonner og gemme som CSV-fil

Lidt upædagogisk, men hvad:

Registreringsland/Skattemæssigt hjemsted;ISIN-kode;Navn;LEI kode;ASIDENT;CVR/SE/TIN;Venligt navn;Første registreringsår;Morningstar_id;Saxo_id;Nordnet_url;Nordnet_id;Nordnet_ÅOP;Nordnet_udbyttepolitik;Nordnet_prospekt;Saxo_url;Morningstar_prospekt;Morningstar_url;Morningstar_ÅOP

Hente data og links til værdipapirer

Ret sjusket Python-program. Men fungerer OK:

import csv
import requests
import re
import json
from bs4 import BeautifulSoup

def nordnet_cookies():
	"""Walk through the Nordnet login sequence and return the collected cookies.

	The account credentials are the two empty local variables at the top of
	the function and must be filled in before use.

	Returns:
		dict: Cookie names mapped to values ('LOL', 'TUX-COOKIE', 'NOW',
		'xsrf' and 'NEXT').
	"""
	# Nordnet user account credentials
	user = ''
	password = ''

	# Step 1: the login start page hands out the first two cookies
	start_page = requests.get('https://classic.nordnet.dk/mux/login/start.html?cmpi=start-loggain&state=signin')
	cookies = {
		'LOL': start_page.cookies['LOL'],
		'TUX-COOKIE': start_page.cookies['TUX-COOKIE'],
	}

	# Step 2: an anonymous login adds the NOW cookie
	anonymous_login = requests.post('https://classic.nordnet.dk/api/2/login/anonymous', cookies=cookies)
	cookies['NOW'] = anonymous_login.cookies['NOW']

	# Step 3: the real login replaces NOW and adds the xsrf cookie
	credentials = {'username': user, 'password': password}
	login = requests.post(
		"https://classic.nordnet.dk/api/2/authentication/basic/login",
		cookies=cookies,
		data=credentials,
	)
	cookies['NOW'] = login.cookies['NOW']
	cookies['xsrf'] = login.cookies['xsrf']

	# Step 4: the NEXT cookie is read from the second response in the redirect history
	authorize = requests.get(
		"https://classic.nordnet.dk/oauth2/authorize?client_id=NEXT&response_type=code&redirect_uri=https://www.nordnet.dk/oauth2/",
		cookies=cookies,
	)
	cookies['NEXT'] = authorize.history[1].cookies['NEXT']

	return cookies

def saxo_headers():
	"""Log in to Saxo Bank's web login and return the header needed for API requests.

	The function follows the login flow step by step: it fetches the login
	page, posts the credentials together with the AuthnRequest token,
	passes a disclaimer page if one is shown, and finally extracts a bearer
	token from the redirect of the last request.

	The account credentials are the two empty local variables at the top of
	the function and must be filled in before use.

	Returns:
		dict: Headers dictionary with a single 'Authorization' entry.

	Raises:
		IndexError, KeyError: If an expected form field, cookie or redirect
			is missing from one of the responses.
	"""
	# Saxo user account credentials
	user = ''
	password = ''

	# Visit login page and get AuthnRequest token value from input form
	url = 'https://www.saxoinvestor.dk/Login/da/'
	request = requests.get(url)
	soup = BeautifulSoup(request.text, "html.parser")
	authn_fields = soup.find_all('input', {"id":"AuthnRequest"})
	authnrequest = authn_fields[0]["value"]

	# Login step 1: Submit username, password and token and get another token back
	url = 'https://www.saxoinvestor.dk/Login/da/'
	request = requests.post(url, data = {'field_userid': user, 'field_password': password, 'AuthnRequest': authnrequest})
	soup = BeautifulSoup(request.text, "html.parser")
	saml_fields = soup.find_all('input', {"name":"SAMLResponse"})
	# Most of the time this works
	if saml_fields:
		samlresponse = saml_fields[0]["value"]
	# But sometimes there's a disclaimer that Saxo Bank would like you to accept
	else:
		form_fields = soup.find_all('input')
		inputs = {}
		# Copy the form fields; as before, collection stops at the first field
		# that lacks a name or value attribute. Only that specific error is
		# ignored, so e.g. Ctrl-C is no longer swallowed here.
		try:
			for field in form_fields:
				inputs[field['name']] = field['value']
		except KeyError:
			pass
		url = 'https://www.saxotrader.com/disclaimer'
		request = requests.post(url, data=inputs)
		cook = request.cookies['DisclaimerApp']
		# The cookie value contains the return url between "ReturnUrl" and "&IsClientStation"
		returnurl = cook[cook.find("ReturnUrl")+10:cook.find("&IsClientStation")]
		url = 'https://live.logonvalidation.net/complete-app-consent/' + returnurl[returnurl.find("complete-app-consent/")+21:]
		request = requests.get(url)
		soup = BeautifulSoup(request.text, "html.parser")
		saml_fields = soup.find_all('input', {"name":"SAMLResponse"})
		samlresponse = saml_fields[0]["value"]

	# Login step 2: Get bearer token necessary for API requests
	url = 'https://www.saxoinvestor.dk/investor/login.sso.ashx'
	r = requests.post(url, data = {'SAMLResponse': samlresponse})

	# The token is part of the Location header of the first redirect,
	# between "BEARER" and "/exp/", with spaces encoded as %20
	bearer = r.history[0].headers['Location']
	bearer = bearer[bearer.find("BEARER"):bearer.find("/exp/")]
	bearer = bearer.replace("%20"," ")

	# START API CALLS
	# Documentation at https://www.developer.saxo/openapi/learn

	# Set bearer token as header
	headers = {'Authorization': bearer}

	return headers
	

# Log in to both brokers once, before any lookups are made.
# NOTE: these assignments rebind the function names to the returned
# values, so the two login functions cannot be called again afterwards.
nordnet_cookies = nordnet_cookies()
saxo_headers = saxo_headers()

# Input file (the list saved as CSV with the extra columns) and output file
filename = 'Copy of ABIS liste 2021 - opdateret den 11-01-2021.csv'
output_file = 'stocks.csv'

# Switches for turning the lookup at each data source on or off
get_nordnet = True
get_saxo = True
get_morningstar = True


# Read the input list row by row, add data from Morningstar, Saxo Bank and
# Nordnet to every row that has an ISIN code, and write all rows to the output file.
# Errors while parsing a page are caught with "except Exception" so that a
# keyboard interrupt still stops the run.
with open(output_file, 'w', newline='') as output_csv:
	paperwriter = csv.writer(output_csv, delimiter=';', quotechar ='"', quoting = csv.QUOTE_MINIMAL)

	with open(filename) as csvfile:
		paperreader = csv.reader(csvfile, delimiter=';')
		for row in paperreader:
			# Rows without a usable ISIN code (e.g. the header row) are written unchanged
			if row[1] != '0' and row[1] != 'ISIN-kode' and row[1] != '':
				isin = row[1]
				if get_morningstar:
					morningstar = requests.get('https://www.morningstar.dk/dk/util/SecuritySearch.ashx?q=' + isin)
					morningstar_text = morningstar.text
					if morningstar_text:
						# The first {...} object in the search response is used as the hit
						first_hit = morningstar_text[morningstar_text.index("{"):morningstar_text.index("}")+1]
						first_hit_json = json.loads(first_hit)
						morningstar_id = first_hit_json['i']
						morningstar_url = 'https://www.morningstar.dk/dk/funds/snapshot/snapshot.aspx?id=' + morningstar_id
						morningstar_info = requests.get(morningstar_url)
						
						# Ongoing charge: found relative to the label text on the snapshot page
						soup = BeautifulSoup(morningstar_info.text, "lxml")
						try:
							aop = soup.find(text=re.compile('Løbende omkostning'))
							aop_value = aop.parent.next.next.next.next.next.next.next.string
							if aop_value:
								cleaned_aop = aop_value.replace(",",".").replace("%","")
							else:
								cleaned_aop = ''
						except Exception:
							cleaned_aop = ''
						
						# Document link: look for the 'CI' entry first, then fall back to 'Prospekt'
						morningstar_documents = requests.get('https://www.morningstar.dk/dk/funds/snapshot/snapshot.aspx?id=' + morningstar_id + '&tab=12')
						document_soup = BeautifulSoup(morningstar_documents.text, "lxml")
						try:
							prospect = document_soup.find(text=re.compile('CI'))
							prospect_link = prospect.parent.next.next.next.next.next.next.next.next.a['href']
							document_id = prospect_link[prospect_link.index("Id=")+3:prospect_link.rfind("&")]
							document_url = 'https://doc.morningstar.com/document/' + document_id + '.msdoc'
						except Exception:
							try:
								prospect = document_soup.find(text=re.compile('Prospekt'))
								prospect_link = prospect.parent.next.next.next.next.next.next.next.next.a['href']
								document_id = prospect_link[prospect_link.index("Id=")+3:prospect_link.rfind("&")]
								document_url = 'https://doc.morningstar.com/document/' + document_id + '.msdoc'
							except Exception:
								document_url = ''
						
						row[8] = morningstar_id
						row[16] = document_url
						row[17] = morningstar_url
						row[18] = cleaned_aop

				if get_saxo:
					saxo = requests.get('https://www.saxotrader.com/openapi/ref/v1/instruments/?$top=201&$skip=0&includeNonTradable=true&AssetTypes=Stock,Bond,MutualFund,Etf,Etc,Etn,Fund,Rights,CompanyWarrant,StockIndex&keywords=' + isin + '&OrderBy=', headers=saxo_headers)
					try:
						saxo_json = saxo.json()
						if saxo_json and saxo.status_code == 200:
							try:
								data = saxo_json['Data']
								if data:
									# The first search result is used
									identifier = data[0]['Identifier']
									assettype = data[0]['AssetType']
									saxo_url = 'https://www.saxotrader.com/d/trading/product-overview?assetType=' + assettype + '&uic=' + str(identifier)
									row[9] = identifier
									row[15] = saxo_url
							except Exception as e:
								# Unexpected response structure: show the error and stop in the debugger
								print(e)
								breakpoint()
					except Exception:
						pass
				if get_nordnet:
					nordnet = requests.get('https://www.nordnet.dk/api/2/main_search?query=' + isin + '&search_space=ALL&limit=60', cookies=nordnet_cookies)
					nordnet_json = nordnet.json()
					if nordnet_json and nordnet.status_code == 200:
						try:
							display_types = [hit['display_group_type'] for hit in nordnet_json]
						except Exception:
							breakpoint()
						# "wait" means that no usable result group has been found
						good_hit = "wait"
						# Result groups are tried in order of preference: ETF, PINV, FUND, then EQUITY.
						# If a NEWS group is present (and none of the first three), the row gets no Nordnet data.
						try:
							good_hit = display_types.index('ETF')
							base_url = 'https://www.nordnet.dk/markedet/etf-lister/'
						except Exception:
							try:
								good_hit = display_types.index('PINV')
								base_url = 'https://www.nordnet.dk/markedet/investeringsforeninger-liste/'
							except Exception:
								try:
									good_hit = display_types.index('FUND')
									base_url = 'https://www.nordnet.dk/markedet/fondslister/'
								except Exception:
									try:
										bad_hit = display_types.index('NEWS')
									except Exception:
										try:
											good_hit = display_types.index('EQUITY')
											base_url = 'https://www.nordnet.dk/markedet/aktiekurser/'
										except Exception:
											breakpoint()
						if good_hit != 'wait':
							results = nordnet_json[good_hit]['results']
							instrument_id = results[0]['instrument_id']
							display_name = results[0]['display_name']

							# Build the url ending from the display name: lower case, the first
							# two spaces become hyphens and everything from the third space on is dropped
							space_counter = 0
							paper_url = ''
							for letter in display_name:
								if letter == " ":
									space_counter += 1
									if space_counter > 2:
										break
									letter = '-'
									paper_url += letter
								else:
									letter = letter.lower()
									paper_url += letter
							full_url = base_url + str(instrument_id) + '-' + paper_url
							if "&" in full_url:
								full_url = full_url.replace("&","")
							
							# Request the constructed url; the url of the final response is what gets saved
							check_full_url = requests.get(full_url)
							
							soup = BeautifulSoup(check_full_url.text, "lxml")
							try:
								policy = soup.find('span', text=re.compile('Udbyttepolitik'))
								policy_value = policy.next.next.string
							except Exception:
								policy_value = "Ukendt"
							try:
								prospectus = soup.find('span', text=re.compile('Faktaark'))
								prospectus_value = prospectus.next.next.a['href']
								# Drop the query string, but keep the whole link when there is none
								# (a plain slice up to rfind("?") would cut off the last character)
								link_without_query, question_mark, _ = prospectus_value.rpartition("?")
								cleaned_prospectus = link_without_query if question_mark else prospectus_value
								# Upgrade plain http links only; a blanket replace of 'http'
								# would turn links that are already https into 'httpss'
								if cleaned_prospectus.startswith('http://'):
									cleaned_prospectus = 'https://' + cleaned_prospectus[len('http://'):]
							except Exception:
								cleaned_prospectus = "Ukendt"
							try:
								aop = soup.find('span', text=re.compile('Årlig omkostning'))
								aop_value = aop.next.next.get_text()
								cleaned_aop = aop_value.replace(",",".").replace("%","")
							except Exception:
								cleaned_aop = "Ukendt"							
							
							row[10] = check_full_url.url
							row[11] = instrument_id
							row[12] = cleaned_aop
							row[13] = policy_value
							row[14] = cleaned_prospectus
			# Every row is printed and written, whether or not it was enriched
			print(row)
			paperwriter.writerow(row)

Datamodel i Django

Her er models.py:

from django.db import models

class Stock(models.Model):
	"""One security: identification fields plus ids, links and costs per data source."""

	# Identification fields
	country = models.CharField(verbose_name='Registreringsland', max_length=2)
	isin = models.CharField(verbose_name='ISIN-kode', max_length=20, blank=True)
	name = models.CharField(verbose_name='Navn', max_length=200, blank=True)
	lei = models.CharField(verbose_name='LEI-kode', max_length=20, blank=True)
	asident = models.CharField(verbose_name='ASIDENT', max_length=20, blank=True)
	cvr = models.CharField(verbose_name='CVR/SE/TIN', max_length=20, blank=True)
	friendly_name = models.CharField(verbose_name='Venligt navn', max_length=200, blank=True)
	first_registration_year = models.CharField(verbose_name='Første registreringsår', max_length=4, blank=True)
	# Ids at the three data sources
	morningstar_id = models.CharField(verbose_name='Morningstar: Id', max_length=20, blank=True)
	saxo_id = models.CharField(verbose_name='Saxo Bank: Id', max_length=20, blank=True)
	nordnet_id = models.CharField(verbose_name='Nordnet: Id', max_length=20, blank=True)
	# Links to the security at the three data sources
	morningstar_url = models.URLField(verbose_name='Morningstar: Url', max_length=200, blank=True)
	saxo_url = models.URLField(verbose_name='Saxo Bank: Url', max_length=200, blank=True)
	nordnet_url = models.URLField(verbose_name='Nordnet: Url', max_length=200, blank=True)
	# Costs are optional numbers; empty means no value is stored
	morningstar_aop = models.FloatField(verbose_name='Morningstar: Løbende omkostninger', null=True, blank=True)
	nordnet_aop = models.FloatField(verbose_name='Nordnet: Løbende omkostninger', null=True, blank=True)
	# Dividend policy and links to investor information documents
	nordnet_dividend = models.CharField(verbose_name='Nordnet: Udbyttepolitik', max_length=20, blank=True)
	nordnet_prospect = models.URLField(verbose_name='Nordnet: Investorinformation', max_length=200, blank=True)
	morningstar_prospect = models.URLField(verbose_name='Morningstar: Investorinformation', max_length=200, blank=True)

Importere data til Django

Her brugte jeg Django’s databasehåndtering i stedet for selv at skrive SQL-sætninger:

import csv
# CSV column header -> attribute on Stock, in the order the values are assigned
column_to_attribute = {
	'ISIN-kode': 'isin',
	'Navn': 'name',
	'LEI kode': 'lei',
	'ASIDENT': 'asident',
	'CVR/SE/TIN': 'cvr',
	'Venligt navn': 'friendly_name',
	'Første registreringsår': 'first_registration_year',
	'Morningstar_id': 'morningstar_id',
	'Saxo_id': 'saxo_id',
	'Nordnet_id': 'nordnet_id',
	'Morningstar_url': 'morningstar_url',
	'Saxo_url': 'saxo_url',
	'Nordnet_url': 'nordnet_url',
	'Morningstar_ÅOP': 'morningstar_aop',
	'Nordnet_ÅOP': 'nordnet_aop',
	'Nordnet_udbyttepolitik': 'nordnet_dividend',
	'Nordnet_prospekt': 'nordnet_prospect',
	'Morningstar_prospekt': 'morningstar_prospect',
}
# Placeholder texts in the Nordnet_ÅOP column that are not stored
nordnet_aop_placeholders = ('-', 'Ukendt')

with open('stocks.csv', newline='\n') as csvfile:
	reader = csv.DictReader(csvfile, delimiter=";")
	# One Stock is saved per row; the running row number is printed as progress
	for count, row in enumerate(reader, start=1):
		stock = Stock(country = row['Registreringsland/Skattemæssigt hjemsted'])
		for column, attribute in column_to_attribute.items():
			value = row[column]
			# Empty cells leave the attribute at its default
			if not value:
				continue
			if column == 'Nordnet_ÅOP' and value in nordnet_aop_placeholders:
				continue
			setattr(stock, attribute, value)

		stock.save()
		print(count)

Bygge visningen

Her er views.py:

from django.shortcuts import render
from .models import Stock

def index(request):
	"""Render the list of securities, filtered and sorted by GET parameters.

	GET parameters:
		filter: 'nordnetsaxo', 'nordnet', 'saxo', 'ikkenordnetsaxo' or 'alle'.
			A missing, empty or unknown value selects securities that have
			a Nordnet or a Saxo Bank url (same as 'nordnetsaxo'). Previously
			an unknown value raised an error because no selection was made.
		sort: one of the field names in allowed_sorts, optionally prefixed
			with '-' for descending order. A missing or empty value sorts by
			name; an unknown value leaves the selection unsorted.

	Returns:
		The rendered 'stocks/index.html' template with the selection
		available as 'stocks'.
	"""
	#FILTER LOGIC
	selected_filter = request.GET.get('filter')
	if selected_filter == 'nordnet':
		stocks = Stock.objects.exclude(nordnet_url='')
	elif selected_filter == 'saxo':
		stocks = Stock.objects.exclude(saxo_url='')
	elif selected_filter == 'ikkenordnetsaxo':
		stocks = Stock.objects.filter(nordnet_url='').filter(saxo_url='')
	elif selected_filter == 'alle':
		stocks = Stock.objects.all()
	else:
		# 'nordnetsaxo', no filter at all, or a value that is not recognised
		stocks = Stock.objects.exclude(nordnet_url='') | Stock.objects.exclude(saxo_url='')
	
	#SORT LOGIC
	# Only these values are passed on to order_by
	allowed_sorts = (
		'name', '-name',
		'isin', '-isin',
		'morningstar_aop', '-morningstar_aop',
		'nordnet_aop', '-nordnet_aop',
	)
	sort = request.GET.get('sort')
	if not sort:
		stocks = stocks.order_by('name')
	elif sort in allowed_sorts:
		stocks = stocks.order_by(sort)
		
	context = {'stocks': stocks}
	return render(request, 'stocks/index.html', context)

Og her er så skabelonen index.html:

{% extends "stocks/base.html" %}
{% load static %}
{% block title %}ETF'er og fonde med aktiebeskatning 2021{% endblock %}
{% block content %}{% spaceless %}

<h1>ETF'er og fonde med aktiebeskatning 2021</h1>

<p>Du har læst om, <a href="https://www.nordnet.dk/blog/nye-regler-for-beskatning-af-investeringsfonde/">at aktiebaserede ETF'er og udenlandske investeringsfonde fra 2020 beskattes som aktieindkomst og ikke længere som kapitalindkomst</a>.</p>

<p>Du har endda fundet <a href="https://skat.dk/getfile.aspx?id=145013&type=xlsx">det fine regneark, der viser aktiebaserede investeringsselskaber</a> på <a href="https://skat.dk/skat.aspx?oid=2244641">skat.dk</a>.</p>

<p>Men det er godt nok svært for dig at få overblik over, hvilke af papirerne du overhovedet kan købe som almindelig hobby-/cryptoinvestor, og at sammenligne omkostninger, ÅOP og hvad det ellers hedder, for at finde det rigtige køb.</p>

<p>Her er et forsøg på at løse dit (og mit) problem. Data kommer fra <a href="https://skat.dk/getfile.aspx?id=145013&type=xlsx">det fine regneark</a> og har samme fejl og mangler, men er suppleret med nyttige informationer og links.</p>

<p><a href="#forbehold">Du kan læse om forbehold nederst på siden</a> og du kan <a href="https://helmstedt.dk/2021/03/etfer-og-fonde-med-aktiebeskatning-2021/">læse om hvordan siden er lavet på min blog</a>.</p>

{# Radio group that sets the "filter" GET parameter; "nordnetsaxo" is pre-checked when no filter is given #}
{# NOTE(review): no submit button is visible in this form - presumably it is submitted by script elsewhere; TODO confirm #}
<p><strong>Vis til salg hos:</strong>
<form id="prefs">

	<input type="radio" id="nordnetsaxo" name="filter" value="nordnetsaxo"{% if request.GET.filter == "nordnetsaxo" or not request.GET.filter %} checked{% endif %}>
	<label title="Værdipapirer til salg hos Nordnet, Saxo Bank eller begge steder" for="nordnetsaxo">Nordnet og/eller Saxo Bank</label>
	<input type="radio" id="nordnet" name="filter" value="nordnet"{% if request.GET.filter == "nordnet" %} checked{% endif %}>
	<label title="Værdipapirer til salg hos Nordnet" for="nordnet">Nordnet</label>		
	<input type="radio" id="saxo" name="filter" value="saxo"{% if request.GET.filter == "saxo" %} checked{% endif %}>
	<label title="Værdipapirer til salg hos Saxo Bank" for="saxo">Saxo Bank</label>
	<input type="radio" id="ikkenordnetsaxo" name="filter" value="ikkenordnetsaxo"{% if request.GET.filter == "ikkenordnetsaxo" %} checked{% endif %}>
	<label title="Værdipapirer, der hverken er til salg hos Nordnet eller Saxo Bank" for="ikkenordnetsaxo">Ikke Nordnet og/eller Saxo</label>
	<input type="radio" id="alle" name="filter" value="alle"{% if request.GET.filter == "alle" %} checked{% endif %}>
	<label title="Alle værdipapirer, både dem der kan købes hos Nordnet/Saxo Bank og de, der ikke kan" for="alle">Hele pivtøjet</label>
</form>
</p>

{# Table with one row per security in "stocks" #}
{# Each sortable header links to the same page with a "sort" parameter and switches to the opposite direction when that column is the active sort #}
{# NOTE(review): the header links carry only the sort parameter, so a chosen filter is not part of the link #}
<table>
	<tr>
		<th><a href="{% url 'stocks_index' %}?sort={% if request.GET.sort == "-name" %}name{% else %}-name{% endif %}">Navn</a></th>
		<th><a href="{% url 'stocks_index' %}?sort={% if request.GET.sort == "isin" %}-isin{% else %}isin{% endif %}">Isin</a></th>
		<th><a href="{% url 'stocks_index' %}?sort={% if request.GET.sort == "morningstar_aop" %}-morningstar_aop{% else %}morningstar_aop{% endif %}">Løbende omkostninger</a></th>
		<th><a href="{% url 'stocks_index' %}?sort={% if request.GET.sort == "nordnet_aop" %}-nordnet_aop{% else %}nordnet_aop{% endif %}">ÅOP</a></th>
		<th>Investorinformation</th>
		<th>Morningstar</th>
		<th>Nordnet</th>
		<th>Saxo</th>
	</tr>
	{# Cost cells and link cells are left empty when the value is missing #}
	{# For investor information the Nordnet document is preferred, Morningstar's is the fallback #}
	{% for stock in stocks %}
	<tr>
		<td>{{ stock.name }}</td>
		<td>{{ stock.isin }}</td>
		<td>{% if stock.morningstar_aop %}{{ stock.morningstar_aop }}%{% endif %}</td>
		<td>{% if stock.nordnet_aop %}{{ stock.nordnet_aop }}%{% endif %}</td>
		<td>{% if stock.nordnet_prospect %}<a href="{{ stock.nordnet_prospect }}">Info</a>{% elif stock.morningstar_prospect %}<a href="{{ stock.morningstar_prospect }}">Info</a>{% endif %}</td>
		<td>{% if stock.morningstar_url %}<a href="{{ stock.morningstar_url }}">Link</a>{% endif %}</td>
		<td>{% if stock.nordnet_url %}<a href="{{ stock.nordnet_url }}">Link</a>{% endif %}</td>
		<td>{% if stock.saxo_url %}<a href="{{ stock.saxo_url }}">Link</a>{% endif %}</td>
	</tr>
	{% endfor %}
</table>

<a name="forbehold"></a>
<h2>Forbehold</h2>
<p>Alt hvad du læser på denne side er løgn og fiktion fra ende til anden og har ingen relation til virkeligheden. Hvis du kunne finde på at læse indholdet, som om det omhandlede værdipapirer, eller at købe, sælge eller tage dig af din personlige hygiejne med værdipapirer på grund af indholdet på denne side, er det fuldstændig et hundrede procent på eget ansvar. Alt hvad der findes på siden er fejlbehæftet, forældet og lavet af en uduelig amatør uden forstand på noget som helst. Du skal regne med, at alle links fører til nogle andre værdipapirer, end man skulle tro, og at de værdipapirer som står til salg et sted sikkert ikke sælges der - og omvendt. Alle oplysninger om løbende omkostninger og ÅOP er fundet ved hjælp af hønebingo og dermed så godt som tilfældige.</p>
{% endspaceless %}{% endblock %}

Wallnots Twitterbot, version 3

Wallnots Twitter-bot finder delte artikler fra Politiken og Zetland på Twitter og deler dem med verden. Det fungerer sådan her:

# Author: Morten Helmstedt. E-mail: helmstedt@gmail.com

import requests
from bs4 import BeautifulSoup
from datetime import datetime
from datetime import date
from datetime import timedelta
import json
import time
import random
from TwitterAPI import TwitterAPI
from nested_lookup import nested_lookup

# CONFIGURATION #
# List to store articles to post to Twitter; filled by both the Politiken
# and the Zetland section and emptied by nobody
articlestopost = []

# Search tweets from last 3 hours
# The start time is formatted as a UTC timestamp ending in "Z"
now = datetime.utcnow()
since_hours = 3
since = now - timedelta(hours=since_hours)
since_string = since.strftime("%Y-%m-%dT%H:%M:%SZ")

# Search configuration
# Each value is a ready-made "name=value" pair for the search url
# https://developer.twitter.com/en/docs/twitter-api/tweets/search/api-reference/get-tweets-search-recent
# https://github.com/twitterdev/Twitter-API-v2-sample-code/tree/master/Recent-Search
tweet_fields = "tweet.fields=entities"
media_fields = "media.fields=url"
max_results = "max_results=100"
start_time = "start_time=" + since_string

# Twitter API login
# The four keys below are used for posting through TwitterAPI and must be filled in
client_key = ''
client_secret = ''
access_token = ''
access_secret = ''
api = TwitterAPI(client_key, client_secret, access_token, access_secret)

# Bearer token used for the search requests; must be filled in
bearer_token = ''

# POLITIKEN #
# Finds shared Politiken links in recent tweets, looks up the article id
# behind each new link and queues articles that have not been posted before.
# Run search
query = 'politiken.dk/del'

url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}&{}".format(
	query, tweet_fields, media_fields, max_results, start_time
)
headers = {"Authorization": "Bearer {}".format(bearer_token)}
response = requests.request("GET", url, headers=headers)
json_response = response.json()

# Collect every expanded url found anywhere in the response, without duplicates
urllist = list(set(nested_lookup('expanded_url', json_response)))

# Only process urls that were not in our last Twitter query
proceslist = []
with open("./pol_lastbatch.json", "r", encoding="utf8") as fin:
	lastbatch = list(json.load(fin))
	for url in urllist:
		if url not in lastbatch and query in url:
			proceslist.append(url)
# Save current query to use for next time
with open("./pol_lastbatch.json", "wt", encoding="utf8") as fout:
	lastbatch = json.dumps(urllist)
	fout.write(lastbatch)

# Request each shared link and find the unique id of the article it redirects to
articlelist = []

pol_therewasanerror = False
for url in proceslist:
	try:
		# Links wrapped in a Google redirect: use the value of the url parameter
		if 'https://www.google.com' in url:
			start = url.find('url=')+4
			end = url.find('&', start)
			url = url[start:end]	
		# Cut off anything after the first 37 characters of the share link
		if not len(url) == 37:
			url = url[:37]
		data = requests.get(url)
		result = data.text
		# Only articles that are not marked as freely accessible are collected
		if '"isAccessibleForFree": "True"' not in result:
			# The real article url is the target of the first redirect
			realurl = data.history[0].headers['Location']
			has_article = "/article" in realurl
			has_ece = ".ece" in realurl
			if not has_article and not has_ece:
				# Url style with the id as a path segment starting with "art"
				start_of_unique_id = realurl.index("/art")+1
				end_of_unique_id = realurl[start_of_unique_id:].index("/")
				unique_id = realurl[start_of_unique_id:start_of_unique_id+end_of_unique_id]
			elif has_article and has_ece:
				# Url style "/article<id>.ece"
				start_of_unique_id = realurl.index("/article")+1
				end_of_unique_id = realurl[start_of_unique_id:].index(".ece")
				unique_id = realurl[start_of_unique_id:start_of_unique_id+end_of_unique_id]
			else:
				# Neither known url style matches. Skip the link instead of
				# storing it under the id left over from a previous article.
				print(url)
				print("Unrecognised article url format: " + realurl)
				continue
			articlelist.append({"id": unique_id, "url": url})
	except Exception as e:
		print(url)
		print(e)
		pol_therewasanerror = True

#If something fails, we'll process everything again next time			
if pol_therewasanerror == True:
	with open("./pol_lastbatch.json", "wt", encoding="utf8") as fout:
		urllist = []
		lastbatch = json.dumps(urllist)
		fout.write(lastbatch)
	
# Check if article is already posted and update list of posted articles
with open("./pol_published_v2.json", "r", encoding="utf8") as fin:
	alreadypublished = list(json.load(fin))
	# Articles are compared by id; new ones are both remembered and queued for posting
	for article in articlelist:
		hasbeenpublished = False
		for published_article in alreadypublished:
			if article['id'] == published_article['id']:
				hasbeenpublished = True
				break
		if hasbeenpublished == False:
			alreadypublished.append(article)
			articlestopost.append(article)
	# Save updated already published links
	with open("./pol_published_v2.json", "wt", encoding="utf8") as fout:
		alreadypublishedjson = json.dumps(alreadypublished)
		fout.write(alreadypublishedjson)

# ZETLAND #
# Finds shared Zetland links in recent tweets, reads title and publication
# time from each new article and queues articles whose title has not been posted before.
# Run search
query = 'zetland.dk/historie'

url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}&{}".format(
	query, tweet_fields, media_fields, max_results, start_time
)
headers = {"Authorization": "Bearer {}".format(bearer_token)}
response = requests.request("GET", url, headers=headers)
json_response = response.json()

# Collect every expanded url found anywhere in the response, without duplicates
urllist = list(set(nested_lookup('expanded_url', json_response)))

# Only process urls that were not in our last Twitter query
proceslist = []
with open("./zet_lastbatch.json", "r", encoding="utf8") as fin:
	lastbatch = list(json.load(fin))
	for url in urllist:
		if url not in lastbatch and query in url:
			proceslist.append(url)
# Save current query to use for next time
with open("./zet_lastbatch.json", "wt", encoding="utf8") as fout:
	lastbatch = json.dumps(urllist)
	fout.write(lastbatch)

# Request articles and get titles and dates
articlelist = []
# Titles seen in this run, so the same article shared through different urls is only added once
titlecheck = []

zet_therewasanerror = False
for url in proceslist:
	try:
		# Links wrapped in a Google redirect: use the value of the url parameter
		if 'https://www.google.com' in url:
			start = url.find('url=')+4
			end = url.find('&', start)
			url = url[start:end]		
		data = requests.get(url)
		result = data.text
		soup = BeautifulSoup(result, "lxml")
		# Title and publication time are read from the page's meta tags
		title = soup.find('meta', attrs={'property':'og:title'})
		title = title['content']
		timestamp = soup.find('meta', attrs={'property':'article:published_time'})
		timestamp = timestamp['content']
		# Keep the part before the "+" so the value matches the date format below
		# NOTE(review): assumes the timestamp always contains a "+" offset - TODO confirm
		timestamp = timestamp[:timestamp.find("+")]
		dateofarticle = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%f')
		if title not in titlecheck:
			articlelist.append({"title": title, "url": url, "date": dateofarticle})
			titlecheck.append(title)
	except Exception as e:
		print(url)
		print(e)
		zet_therewasanerror = True

#If something fails, we'll process everything again next time
if zet_therewasanerror == True:
	with open("./zet_lastbatch.json", "wt", encoding="utf8") as fout:
		urllist = []
		lastbatch = json.dumps(urllist)
		fout.write(lastbatch)


			
# Oldest article first, so articles are queued in order of publication
articlelist_sorted = sorted(articlelist, key=lambda k: k['date']) 

# Check if article is already posted and update list of posted articles
with open("./zet_published.json", "r", encoding="utf8") as fin:
	alreadypublished = list(json.load(fin))
	# Articles are compared by title; only the title is remembered in the file
	for art in articlelist_sorted:
		title = art['title']
		if title not in alreadypublished:
			alreadypublished.append(title)
			articlestopost.append(art)
	# Save updated already published links
	with open("./zet_published.json", "wt", encoding="utf8") as fout:
		alreadypublishedjson = json.dumps(alreadypublished, ensure_ascii=False)
		fout.write(alreadypublishedjson)


# POST TO TWITTER #
# Word lists from which one adjective and one closing remark are drawn per tweet
friendlyterms = ["flink","rar","gavmild","velinformeret","intelligent","sød","afholdt","bedårende","betagende","folkekær","godhjertet","henrivende","smagfuld","tækkelig","hjertensgod","graciøs","galant","tiltalende","prægtig","kær","godartet","human","indtagende","fortryllende","nydelig","venlig","udsøgt","klog","kompetent","dygtig","ejegod","afholdt","omsorgsfuld","elskværdig","prægtig","skattet","feteret"]
enjoyterms = ["God fornøjelse!", "Nyd den!", "Enjoy!", "God læsning!", "Interessant!", "Spændende!", "Vidunderligt!", "Fantastisk!", "Velsignet!", "Glæd dig!", "Læs den!", "Godt arbejde!", "Wauv!"]

# One tweet per queued article; an empty queue simply means nothing is posted
for art in articlestopost:
	# The medium's Twitter handle is chosen from the article url
	medium = "@ZetlandMagasin" if "zetland" in art['url'] else "@politiken"
	friendlyterm = random.choice(friendlyterms)
	enjoyterm = random.choice(enjoyterms)
	status = "".join(["En ", friendlyterm, " abonnent på ", medium, " har delt en artikel. ", enjoyterm])
	twitterstatus = " ".join([status, art['url']])
	# A failed post is printed and does not stop the remaining posts
	try:
		twitterupdate = api.request('statuses/update', {'status': twitterstatus})
	except Exception as e:
		print(e)
	# Pause between posts
	time.sleep(15)

En crawler til mappe-visninger på nettet

Hvis du har været på internettet, er du sikkert en gang stødt på sådan ét her:

Mange webadministratorer vælger at skjule disse oversigter over filer på en webserver, som webserversoftwaren Apache kan generere automatisk.

Men jeg opdagede ved et tilfælde, at jeg kunne se, hvad fotoagenturet Magnum havde lagt op i deres WordPress-installation.

Jeg besluttede at forsøge at lave en lokal kopi, så jeg kunne kigge på flotte fotografier uden at skulle vente på downloads fra internettet.

Først forsøgte jeg med Wget, som er et lille program, der er designet til at dublere websteder lokalt. Men Wget havde problemer med at hente og tygge sig igennem de lange lister med filer. En af dem fyldte fx 36 megabytes. Det er altså rigtig mange links.

Derfor lavede jeg et lille Python-program, der kan tygge sig igennem denne type mappe- og filoversigter og downloade dem lokalt.

Her er det:

# apache-directory-downloader.py
# Author: Morten Helmstedt. E-mail: helmstedt@gmail.com
'''A program to fetch files from standard apache directory listings on the internet.
See https://duckduckgo.com/?t=ffab&q=apache%2Bdirectory%2Blisting&ia=images&iax=images
for examples of what this is.'''

import requests					# Send http requests and receive responses
from bs4 import BeautifulSoup	# Parse HTML data structures, e.g. to search for links
import os						# Used to create directories at local destination
import shutil					# Used to copy binary files from http response to local destination
import re						# Regex parser and search functions

# Terms to exclude: links containing any of these strings are neither
# followed nor downloaded
# NOTE: "-overlay" is listed twice; the second entry has no effect
exclude = [
	"-medium",
	"-overlay",
	"-teaser-",
	"-overlay",
	"-thumbnail",
	"-collaboration",
	"-scaled",
	"-photographer-featured",
	"-photographer-listing",
	"-full-on-mobile",
	"-theme-small-teaser",
	"-post",
	"-large",
	"-breaker",
	]

# Takes an url and collects all links
def request(url, save_location):
	"""Fetch a directory listing and hand its links on to scrape().

	Args:
		url: Address of the directory listing to fetch.
		save_location: Local directory that corresponds to the url.

	Returns:
		None. If the response status is not 200 the url is added to
		errorlist and the page is not parsed for links.
	"""
	# Print status to let user know that something is going on
	print("Requesting:", url)
	# Fetch url
	response = requests.get(url)
	# If status code is not 200 (OK), add url to list of errors.
	# An error page is not a directory listing, so its links are not followed.
	if not response.status_code == 200:
		errorlist.append(url)
		return None
	# Parse response
	soup = BeautifulSoup(response.text, "lxml")
	# Two digits, an "x" and two digits, as found in names that contain dimensions
	size_pattern = re.compile(r"\d\d[x]\d\d")
	# Search for all links and exclude certain strings and patterns from links
	urllist = []
	for a in soup.find_all('a', href=True):
		href = a['href']
		# Empty links point nowhere
		if not href:
			continue
		# Skip sorting links of the listing and links that start at the server root
		if '?C=' in href or href.startswith("/"):
			continue
		# Skip links containing one of the excluded terms
		if any(term in href for term in exclude):
			continue
		# Skip links matching the size pattern
		if size_pattern.search(href):
			continue
		urllist.append(href)
	# Send current url, list of links and current local save collection to scrape function
	return scrape(url, urllist, save_location)

def scrape(path, content, save_location):
	"""Go through the links of one directory listing.

	Links containing a slash are treated as directories: a local directory
	is created and the remote directory is fetched with request(). All
	other links are treated as files and downloaded unless a local file
	with that name already exists.

	Args:
		path: Url of the directory listing the links come from.
		content: List of links (relative to path) to process.
		save_location: Local directory that corresponds to path.

	Returns:
		None. Directories that cannot be created and files that are not
		answered with status 200 are added to errorlist and skipped.
	"""
	# Loop through all links
	for url in content:
		# Print status to let user know that something is going on
		print("Parsing/downloading:", path+url)
		# If there's a slash ("/") in the link, it is a directory
		if "/" in url:
			# Create local directory if it doesn't exist
			try:
				os.makedirs(save_location+url, exist_ok=True)
			except OSError as e:
				# Without a local directory nothing below it can be saved
				print("Could not create directory:", save_location+url, e)
				errorlist.append(path+url)
				continue
			# Run request function to fetch contents of directory
			request(path+url, save_location+url)
		# If the link doesn't contain a slash, it's a file and is saved
		else:
			# Check if file already exists, e.g. has been downloaded in a prior run
			if not os.path.isfile(save_location+url):
				# If file doesn't exist, fetch it from remote location.
				# The with statement closes the connection when the download is done.
				with requests.get(path+url, stream=True) as file:
					# Don't save an error page under the file's name
					if not file.status_code == 200:
						errorlist.append(path+url)
						continue
					# Print status to let user know that something is going on
					print("Saving file:", save_location+url)
					# Save file to local destination
					with open(save_location+url, 'wb') as f:
						# Decodes file if received compressed from server
						file.raw.decode_content = True
						# Copies binary file to local destination
						shutil.copyfileobj(file.raw, f)

# List to collect crawling errors; request() appends urls that were not answered with status 200
errorlist = []
# Local destination, e.g. 'C:/Downloads/' for Windows
# Must end with a slash, because local paths are built by appending link names directly
save_location = "C:/Downloads/"
# Remote location, e.g. https://example.com/files/
# Must end with a slash for the same reason
url = "https://content.magnumphotos.com/wp-content/uploads/"
# Call function to start crawling
request(url, save_location)
# Print any crawling errors
print(errorlist)