Digital Post fra mit.dk til din e-mail

https://github.com/helmstedt/digitalpost-utilities er jeg gået i luften med et program, der gør det muligt for dig at slippe for at logge ind på mit.dk hver gang du har fået ny Digital Post.

Jeg brugte https://github.com/dk/Net-MitDK til at forstå metodikken og Fiddler til at overvåge trafikken til og fra https://mit.dk og aflure sidens API.

De to hovedkomponenter i programmet er a) et program til at gennemføre første login på mit.dk i en browser med NemID/MitID og b) et program til at forny adgangstokens til siden, forespørge API’et om ny post og sende e-mails af sted.

Program til at gennemføre første login på mit.dk i en browser med NemID/MitId

# Logs in to mit.dk og saves tokens needed for further requests.
# Method from https://github.com/dk/Net-MitDK/. Thank you.
from seleniumwire import webdriver
import requests
from bs4 import BeautifulSoup
import http.cookies
import gzip
import json 
import base64
from hashlib import sha256
import string
import secrets
from mit_dk_configuration import tokens_filename

def random_string(size):        
    letters = string.ascii_lowercase+string.ascii_uppercase+string.digits+string.punctuation+string.whitespace           
    random_string = ''.join(secrets.choice(letters) for i in range(size))
    encoded_string = random_string.encode(encoding="ascii")
    url_safe_string = base64.urlsafe_b64encode(encoded_string).decode()
    url_safe_string_no_padding = url_safe_string.replace('=','')
    return url_safe_string_no_padding

def save_tokens(response):
    with open(tokens_filename, "wt", encoding="utf8") as token_file:
        token_file.write(response)

state = random_string(23)
nonce = random_string(93)
code_verifier = random_string(93)
code_challenge = base64.urlsafe_b64encode(sha256(code_verifier.encode('ascii')).digest()).decode().replace('=','')
 
login_url = 'https://gateway.mit.dk/view/client/authorization/login?client_id=view-client-id-mobile-prod-1-id&response_type=code&scope=openid&state=' + state + '&code_challenge=' + code_challenge + '&code_challenge_method=S256&response_mode=query&nonce=' + nonce + '&redirect_uri=com.netcompany.mitdk://nem-callback&deviceName=digitalpost-utilities&deviceId=pc&lang=en_US' 

options = webdriver.ChromeOptions()
options.add_argument("--log-level=3")
driver = webdriver.Chrome(chrome_options=options)
login = driver.get(login_url)

print("Opening browser window. Log in to mit.dk using MitID or NemID in the browser.")
print("When you see a blank page in your browser at https://nemlog-in.mitid.dk/LoginOption.aspx, you're finished.")
input("Press ENTER once you're finished.")

session = requests.Session()

for request in driver.requests:
    session.cookies.set('cookiecheck', 'Test', domain='nemlog-in.mitid.dk')
    session.cookies.set('loginMethod', 'noeglekort', domain='nemlog-in.mitid.dk')
    for request in driver.requests:
        if '/api/mailboxes' in request.url and request.method == 'GET' and request.response.status_code == 200:
            cookies = request.headers['Cookie'].split("; ")
            for cookie in cookies:
                if 'LoggedInBorgerDk' in cookie or 'CorrelationId' in cookie:
                    key_value = cookie.split('=')
                    session.cookies.set(key_value[0], key_value[1], domain='.post.borger.dk')
        if request.response:
            headers_string = str(request.response.headers)
            headers_list = headers_string.split('\n')
            for header in headers_list:
                if 'set-cookie' in header:
                    cookie_string = header.replace('set-cookie: ','')
                    cookie = http.cookies.BaseCookie(cookie_string)
                    for key in cookie.keys():
                        # Requests is picky about dashes in cookie expiration dates. Fix.
                        if 'expires' in cookie[key]:
                            expiry = cookie[key]['expires']
                            if expiry:
                                expiry_list = list(expiry)
                                expiry_list[7] = '-'
                                expiry_list[11] = '-'
                                cookie[key]['expires'] = ''.join(expiry_list)
                    session.cookies.update(cookie)

        if request.method == 'POST' and request.url == 'https://nemlog-in.mitid.dk/LoginOption.aspx' and request.response.status_code == 200:
            if request.response.headers['content-encoding'] == 'gzip':
                response = gzip.decompress(request.response.body).decode()
            else:
                response = request.response.body.decode()
            soup = BeautifulSoup(response, "html.parser")
            input = soup.find_all('input', {"name":"SAMLResponse"})
            samlresponse = input[0]["value"]

driver.close()

request_code_part_one = session.post('https://gateway.digitalpost.dk/auth/s9/nemlogin/ssoack', data={'SAMLResponse': samlresponse}, allow_redirects=False)
request_code_part_one_redirect_location = request_code_part_one.headers['Location']
request_code_part_two = session.get(request_code_part_one_redirect_location, allow_redirects=False)
request_code_part_two_redirect_location = request_code_part_two.headers['Location']
request_code_part_three = session.get(request_code_part_two_redirect_location, allow_redirects=False)
request_code_part_three_redirect_location = request_code_part_three.headers['Location']
code_start = request_code_part_three_redirect_location.index('code=') + 5
code_end = request_code_part_three_redirect_location.index('&', code_start)
code = request_code_part_three_redirect_location[code_start:code_end]
redirect_url = 'com.netcompany.mitdk://nem-callback'
token_url = 'https://gateway.mit.dk/view/client/authorization/token?grant_type=authorization_code&redirect_uri=' + redirect_url + '&client_id=view-client-id-mobile-prod-1-id&code=' + code + '&code_verifier=' + code_verifier
request_tokens = session.post(token_url)
save_tokens(request_tokens.text)    
print('Login to mit.dk went fine.')
print(f'Tokens saved to {tokens_filename}.')

Program til at forny adgangstokens til siden, forespørge API’et om ny post og sende e-mails af sted

# Sends unread messages from mit.dk to an e-mail.
import requests
import json 
import smtplib										# Sending e-mails
from email.mime.multipart import MIMEMultipart		# Creating multipart e-mails
from email.mime.text import MIMEText				# Attaching text to e-mails
from email.mime.application import MIMEApplication	# Attaching files to e-mails
from email.utils import formataddr					# Used for correct encoding of senders with special characters in name (e.g. Københavns Kommune)
from mit_dk_configuration import email_data, tokens_filename

base_url = 'https://gateway.mit.dk/view/client/'
session = requests.Session()

def open_tokens():
    try:
        with open(tokens_filename, "r", encoding="utf8") as token_file:
            tokens = json.load(token_file)
            return tokens
    except:
        return print('Unable to open and parse token file. Did you run mit_dk_first_login.py?')
    
def revoke_old_tokens(mitdkToken, ngdpToken, dppRefreshToken, ngdpRefreshToken):
    endpoint = 'authorization/revoke?client_id=view-client-id-mobile-prod-1-id'
    json_data = {
        'dpp': {
            'token': mitdkToken,
            'token_type_hint': 'access_token'
        },
        'ngdp': {
            'token': ngdpToken,
            'token_type_hint': 'access_token'
        },
    }
    revoke_access_tokens = session.post(base_url + endpoint, json=json_data)
    if not revoke_access_tokens.status_code == 200:
        print("Something went wrong when trying to revoke old access tokens. Here is the response:")
        print(revoke_access_tokens.text)
    json_data = {
        'dpp': {
            'refresh_token': dppRefreshToken,
            'token_type_hint': 'refresh_token'
        },
        'ngdp': {
            'refresh_token': ngdpRefreshToken,
            'token_type_hint': 'refresh_token'
        },
    }        
    revoke_refresh_tokens = session.post(base_url + endpoint, json=json_data)
    if not revoke_refresh_tokens.status_code == 200:
        print("Something went wrong when trying to revoke old refresh tokens. Here is the response:")
        print(revoke_refresh_tokens.text)


def refresh_and_save_tokens(dppRefreshToken, ngdpRefreshToken):
    endpoint = 'authorization/refresh?client_id=view-client-id-mobile-prod-1-id'
    json_data = {
        'dppRefreshToken': dppRefreshToken,
        'ngdpRefreshToken': ngdpRefreshToken,
    }
    refresh = session.post(base_url + endpoint, json=json_data)    
    if not refresh.status_code == 200:
        print("Something went wrong trying to fetch new tokens.")
    refresh_json = refresh.json()
    if 'code' in refresh_json:
        print("Something went wrong trying to fetch new tokens. Here's the response:")
        print(refresh_json)
        return False
    else:
        with open(tokens_filename, "wt", encoding="utf8") as token_file:
            token_file.write(refresh.text)
        return refresh_json
        
def get_fresh_tokens_and_revoke_old_tokens():
    tokens = open_tokens()
    try:
        if 'dpp' in tokens:
            dppRefreshToken = tokens['dpp']['refresh_token']
            mitdkToken = tokens['dpp']['access_token']
        else:
            dppRefreshToken = tokens['refresh_token']
            mitdkToken = tokens['access_token']
        ngdpRefreshToken = tokens['ngdp']['refresh_token']
        ngdpToken = tokens['ngdp']['access_token']
        fresh_tokens = refresh_and_save_tokens(dppRefreshToken, ngdpRefreshToken)
        if fresh_tokens:
            revoke_old_tokens(mitdkToken, ngdpToken, dppRefreshToken, ngdpRefreshToken)
        return fresh_tokens
    except Exception as error:
        print(error)
        print('Unable to find tokens in token file. Try running mit_dk_first_login.py again.')
    
def get_simple_endpoint(endpoint):
    response = session.get(base_url + endpoint)
    return response.json()

def get_inbox_folders_and_build_query(mailbox_ids):
    endpoint = 'folders/query'
    json_data = {
        'mailboxes': {}
    }
    for mailbox in mailbox_ids:
        json_data['mailboxes'][mailbox['dataSource']] = mailbox['mailboxId']
    response = session.post(base_url + endpoint, json=json_data)    
    try:
        response_json = response.json()
    except:
        print('Unable to convert response to json. Here is the response:')
        print(response.text)
    folders = []
    for folder in response_json['folders']['INBOX']:
        folder_info = {
            'dataSource': folder['dataSource'],
            'foldersId': [folder['id']],
            'mailboxId': folder['mailboxId'],
            'startIndex': 0
        }
        folders.append(folder_info)
    return folders

def get_messages(folders):
    endpoint = 'messages/query'
    json_data = {
        'any': [],
        'folders': folders,
        'size': 20,
        'sortFields': ['receivedDateTime:DESC']
    }
    response = session.post(base_url + endpoint, json=json_data)    
    return response.json()   

def get_content(message):
    content = []
    endpoint = message['dataSource'] + '/mailboxes/' + message['mailboxId'] + '/messages/' + message['id']
    for document in message['documents']:
        doc_url = '/documents/' + document['id']
        for file in document['files']:
            encoding_format = file['encodingFormat']
            file_name = file['filename']
            file_url = '/files/' + file['id'] + '/content'
            file_content = session.get(base_url + endpoint + doc_url + file_url)
            content.append({
                'file_name': file_name,
                'encoding_format': encoding_format,
                'file_content': file_content
            })
    return content

def mark_as_read(message):
    endpoint = message['dataSource'] + '/mailboxes/' + message['mailboxId'] + '/messages/' + message['id']
    session.headers['If-Match'] = str(message['version'])
    json_data = {
        'read': True
    }
    mark_as_read = session.patch(base_url + endpoint, json=json_data)

mailserver_connect = False            
tokens = get_fresh_tokens_and_revoke_old_tokens()
if tokens:
    session.headers['mitdkToken'] = tokens['dpp']['access_token']
    session.headers['ngdpToken'] = tokens['ngdp']['access_token']
    session.headers['platform'] = 'web'
    mailboxes = get_simple_endpoint('mailboxes')
    mailbox_ids = []
    for mailboxes in mailboxes['groupedMailboxes']:
        for mailbox in mailboxes['mailboxes']:
            mailbox_info = {
                'dataSource': mailbox['dataSource'],
                'mailboxId': mailbox['id']
            }
            mailbox_ids.append(mailbox_info)
    folders = get_inbox_folders_and_build_query(mailbox_ids)
    messages = get_messages(folders)
    for message in messages['results']:
        if message['read'] == False:
            if mailserver_connect == False:
                server = smtplib.SMTP(email_data['emailserver'], email_data['emailserverport'])
                server.ehlo()
                server.starttls()
                server.login(email_data['emailusername'], email_data['emailpassword'])
                mailserver_connect  = True               
            label = message['label']
            sender = message['sender']['label']
            message_content = get_content(message)

            msg = MIMEMultipart('alternative')
            msg['From'] = formataddr((sender, email_data['emailfrom']))
            msg['To'] = email_data['emailto']
            msg['Subject'] = "mit.dk: " + label

            for content in message_content:
                if content['encoding_format'] == 'text/plain':
                    body = content['file_content'].text
                    msg.attach(MIMEText(body, 'plain')) 
                    part = MIMEApplication(content['file_content'].content)
                    part.add_header('Content-Disposition', 'attachment', filename=content['file_name'])
                    msg.attach(part) 
                elif content['encoding_format'] == 'text/html':
                    body = content['file_content'].text
                    msg.attach(MIMEText(body, 'html'))
                    part = MIMEApplication(content['file_content'].content)
                    part.add_header('Content-Disposition', 'attachment', filename=content['file_name'])
                    msg.attach(part) 
                elif content['encoding_format'] == 'application/pdf':   
                    part = MIMEApplication(content['file_content'].content)
                    part.add_header('Content-Disposition', 'attachment', filename=content['file_name'])
                    msg.attach(part)
                else:    
                    encoding_format = content['encoding_format']
                    print(f'Ny filtype {encoding_format}')
                    part = MIMEApplication(content['file_content'].content)
                    part.add_header('Content-Disposition', 'attachment', filename=content['file_name'])
                    msg.attach(part)
            print(f'Sender en mail fra mit.dk fra {sender} med emnet {label}')
            server.sendmail(email_data['emailfrom'], email_data['emailto'], msg.as_string())
            mark_as_read(message)
    if mailserver_connect:
        server.quit()