Merge pull request 'API 1.2.0' (#9) from authorization_tokens into master
continuous-integration/drone/push Build is passing Details

Reviewed-on: #9
pull/10/head
Inex Code 2022-02-16 16:13:37 +02:00
commit 72a9b11541
24 changed files with 1515 additions and 69 deletions

View File

@ -1,5 +1,10 @@
{
"python.formatting.provider": "black",
"python.linting.pylintEnabled": true,
"python.linting.enabled": true
"python.linting.enabled": true,
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}

View File

@ -9,6 +9,7 @@ flask-swagger-ui
pytz
huey
gevent
mnemonic
pytest
coverage

View File

@ -13,11 +13,14 @@ from selfprivacy_api.resources.users import User, Users
from selfprivacy_api.resources.common import ApiVersion
from selfprivacy_api.resources.system import api_system
from selfprivacy_api.resources.services import services as api_services
from selfprivacy_api.resources.api_auth import auth as api_auth
from selfprivacy_api.restic_controller.tasks import huey, init_restic
from selfprivacy_api.migrations import run_migrations
from selfprivacy_api.utils.auth import is_token_valid
swagger_blueprint = get_swaggerui_blueprint(
"/api/docs", "/api/swagger.json", config={"app_name": "SelfPrivacy API"}
)
@ -29,9 +32,6 @@ def create_app(test_config=None):
api = Api(app)
if test_config is None:
app.config["AUTH_TOKEN"] = os.environ.get("AUTH_TOKEN")
if app.config["AUTH_TOKEN"] is None:
raise ValueError("AUTH_TOKEN is not set")
app.config["ENABLE_SWAGGER"] = os.environ.get("ENABLE_SWAGGER", "0")
app.config["B2_BUCKET"] = os.environ.get("B2_BUCKET")
else:
@ -40,14 +40,20 @@ def create_app(test_config=None):
# Check bearer token
@app.before_request
def check_auth():
# Exclude swagger-ui
if not request.path.startswith("/api"):
# Exclude swagger-ui, /auth/new_device/authorize, /auth/recovery_token/use
if request.path.startswith("/api"):
pass
elif request.path.startswith("/auth/new_device/authorize"):
pass
elif request.path.startswith("/auth/recovery_token/use"):
pass
else:
auth = request.headers.get("Authorization")
if auth is None:
return jsonify({"error": "Missing Authorization header"}), 401
# Check if token is valid
if auth != "Bearer " + app.config["AUTH_TOKEN"]:
# Strip Bearer from auth header
auth = auth.replace("Bearer ", "")
if not is_token_valid(auth):
return jsonify({"error": "Invalid token"}), 401
api.add_resource(ApiVersion, "/api/version")
@ -56,12 +62,13 @@ def create_app(test_config=None):
app.register_blueprint(api_system)
app.register_blueprint(api_services)
app.register_blueprint(api_auth)
@app.route("/api/swagger.json")
def spec():
if app.config["ENABLE_SWAGGER"] == "1":
swag = swagger(app)
swag["info"]["version"] = "1.1.1"
swag["info"]["version"] = "1.2.0"
swag["info"]["title"] = "SelfPrivacy API"
swag["info"]["description"] = "SelfPrivacy API"
swag["securityDefinitions"] = {

View File

@ -1,7 +1,18 @@
"""Migrations module.
Migrations module is introduced in v1.1.1 and provides one-shot
migrations which cannot be performed from the NixOS configuration file changes.
These migrations are checked and ran before every start of the API.
You can disable certain migrations if needed by creating an array
at api.skippedMigrations in userdata.json and populating it
with IDs of the migrations to skip.
Adding DISABLE_ALL to that array disables the migrations module entirely.
"""
from selfprivacy_api.utils import ReadUserData
from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch
from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson
migrations = [FixNixosConfigBranch()]
migrations = [FixNixosConfigBranch(), CreateTokensJson()]
def run_migrations():
@ -25,7 +36,7 @@ def run_migrations():
try:
if migration.is_migration_needed():
migration.migrate()
except Exception as e:
except Exception as err:
print(f"Error while migrating {migration.get_migration_name()}")
print(e)
print(err)
print("Skipping this migration")

View File

@ -0,0 +1,58 @@
from datetime import datetime
import os
import json
from pathlib import Path
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.utils import TOKENS_FILE, ReadUserData
class CreateTokensJson(Migration):
def get_migration_name(self):
return "create_tokens_json"
def get_migration_description(self):
return """Selfprivacy API used a single token in userdata.json for authentication.
This migration creates a new tokens.json file with the old token in it.
This migration runs if the tokens.json file does not exist.
Old token is located at ["api"]["token"] in userdata.json.
tokens.json path is declared in TOKENS_FILE imported from utils.py
tokens.json must have the following format:
{
"tokens": [
{
"token": "token_string",
"name": "Master Token",
"date": "current date from str(datetime.now())",
}
]
}
tokens.json must have 0600 permissions.
"""
def is_migration_needed(self):
return not os.path.exists(TOKENS_FILE)
def migrate(self):
try:
print(f"Creating tokens.json file at {TOKENS_FILE}")
with ReadUserData() as userdata:
token = userdata["api"]["token"]
# Touch tokens.json with 0600 permissions
Path(TOKENS_FILE).touch(mode=0o600)
# Write token to tokens.json
structure = {
"tokens": [
{
"token": token,
"name": "primary_token",
"date": str(datetime.now()),
}
]
}
with open(TOKENS_FILE, "w", encoding="utf-8") as tokens:
json.dump(structure, tokens, indent=4)
print("Done")
except Exception as e:
print(e)
print("Error creating tokens.json")

View File

@ -24,10 +24,7 @@ class FixNixosConfigBranch(Migration):
["git", "rev-parse", "--abbrev-ref", "HEAD"], start_new_session=True
)
os.chdir(current_working_directory)
if nixos_config_branch.decode("utf-8").strip() == "rolling-testing":
return True
else:
return False
return nixos_config_branch.decode("utf-8").strip() == "rolling-testing"
except subprocess.CalledProcessError:
os.chdir(current_working_directory)
return False

View File

@ -1,16 +1,16 @@
from abc import ABC, abstractmethod
"""
Abstract Migration class
This class is used to define the structure of a migration
Migration has a function is_migration_needed() that returns True or False
Migration has a function migrate() that does the migration
Migration has a function get_migration_name() that returns the migration name
Migration has a function get_migration_description() that returns the migration description
"""
class Migration(ABC):
"""
Abstract Migration class
This class is used to define the structure of a migration
Migration has a function is_migration_needed() that returns True or False
Migration has a function migrate() that does the migration
Migration has a function get_migration_name() that returns the migration name
Migration has a function get_migration_description() that returns the migration description
"""
@abstractmethod
def get_migration_name(self):
pass

View File

@ -0,0 +1,14 @@
#!/usr/bin/env python3
"""API authentication module"""
from flask import Blueprint
from flask_restful import Api
auth = Blueprint("auth", __name__, url_prefix="/auth")
api = Api(auth)
from . import (
new_device,
recovery_token,
app_tokens,
)

View File

@ -0,0 +1,118 @@
#!/usr/bin/env python3
"""App tokens management module"""
from flask import request
from flask_restful import Resource, reqparse
from selfprivacy_api.resources.api_auth import api
from selfprivacy_api.utils.auth import (
delete_token,
get_tokens_info,
is_token_name_exists,
is_token_name_pair_valid,
refresh_token,
get_token_name,
)
class Tokens(Resource):
"""Token management class
GET returns the list of active devices.
DELETE invalidates token unless it is the last one or the caller uses this token.
POST refreshes the token of the caller.
"""
def get(self):
"""
Get current device tokens
---
tags:
- Tokens
security:
- bearerAuth: []
responses:
200:
description: List of tokens
400:
description: Bad request
"""
caller_name = get_token_name(request.headers.get("Authorization").split(" ")[1])
tokens = get_tokens_info()
# Retrun a list of tokens and if it is the caller's token
# it will be marked with a flag
return [
{
"name": token["name"],
"date": token["date"],
"is_caller": token["name"] == caller_name,
}
for token in tokens
]
def delete(self):
"""
Delete token
---
tags:
- Tokens
security:
- bearerAuth: []
parameters:
- in: body
name: token
required: true
description: Token's name to delete
schema:
type: object
properties:
token:
type: string
description: Token name to delete
required: true
responses:
200:
description: Token deleted
400:
description: Bad request
404:
description: Token not found
"""
parser = reqparse.RequestParser()
parser.add_argument(
"token_name", type=str, required=True, help="Token to delete"
)
args = parser.parse_args()
token_name = args["token_name"]
if is_token_name_pair_valid(
token_name, request.headers.get("Authorization").split(" ")[1]
):
return {"message": "Cannot delete caller's token"}, 400
if not is_token_name_exists(token_name):
return {"message": "Token not found"}, 404
delete_token(token_name)
return {"message": "Token deleted"}, 200
def post(self):
"""
Refresh token
---
tags:
- Tokens
security:
- bearerAuth: []
responses:
200:
description: Token refreshed
400:
description: Bad request
404:
description: Token not found
"""
# Get token from header
token = request.headers.get("Authorization").split(" ")[1]
new_token = refresh_token(token)
if new_token is None:
return {"message": "Token not found"}, 404
return {"token": new_token}, 200
api.add_resource(Tokens, "/tokens")

View File

@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""New device auth module"""
from flask_restful import Resource, reqparse
from selfprivacy_api.resources.api_auth import api
from selfprivacy_api.utils.auth import (
get_new_device_auth_token,
use_new_device_auth_token,
delete_new_device_auth_token,
)
class NewDevice(Resource):
"""New device auth class
POST returns a new token for the caller.
"""
def post(self):
"""
Get new device token
---
tags:
- Tokens
security:
- bearerAuth: []
responses:
200:
description: New device token
400:
description: Bad request
"""
token = get_new_device_auth_token()
return {"token": token}
def delete(self):
"""
Delete new device token
---
tags:
- Tokens
security:
- bearerAuth: []
responses:
200:
description: New device token deleted
400:
description: Bad request
"""
delete_new_device_auth_token()
return {"token": None}
class AuthorizeDevice(Resource):
"""Authorize device class
POST authorizes the caller.
"""
def post(self):
"""
Authorize device
---
tags:
- Tokens
parameters:
- in: body
name: data
required: true
description: Who is authorizing
schema:
type: object
properties:
token:
type: string
description: Mnemonic token to authorize
device:
type: string
description: Device to authorize
responses:
200:
description: Device authorized
400:
description: Bad request
404:
description: Token not found
"""
parser = reqparse.RequestParser()
parser.add_argument(
"token", type=str, required=True, help="Mnemonic token to authorize"
)
parser.add_argument(
"device", type=str, required=True, help="Device to authorize"
)
args = parser.parse_args()
auth_token = args["token"]
device = args["device"]
token = use_new_device_auth_token(auth_token, device)
if token is None:
return {"message": "Token not found"}, 404
return {"message": "Device authorized", "token": token}, 200
api.add_resource(NewDevice, "/new_device")
api.add_resource(AuthorizeDevice, "/new_device/authorize")

View File

@ -0,0 +1,196 @@
#!/usr/bin/env python3
"""Recovery token module"""
from datetime import datetime
from flask_restful import Resource, reqparse
from selfprivacy_api.resources.api_auth import api
from selfprivacy_api.utils.auth import (
is_recovery_token_exists,
is_recovery_token_valid,
get_recovery_token_status,
generate_recovery_token,
use_mnemonic_recoverery_token,
)
class RecoveryToken(Resource):
"""Recovery token class
GET returns the status of the recovery token.
POST generates a new recovery token.
"""
def get(self):
"""
Get recovery token status
---
tags:
- Tokens
security:
- bearerAuth: []
responses:
200:
description: Recovery token status
schema:
type: object
properties:
exists:
type: boolean
description: Recovery token exists
valid:
type: boolean
description: Recovery token is valid
date:
type: string
description: Recovery token date
expiration:
type: string
description: Recovery token expiration date
uses_left:
type: integer
description: Recovery token uses left
400:
description: Bad request
"""
if not is_recovery_token_exists():
return {
"exists": False,
"valid": False,
"date": None,
"expiration": None,
"uses_left": None,
}
status = get_recovery_token_status()
if not is_recovery_token_valid():
return {
"exists": True,
"valid": False,
"date": status["date"],
"expiration": status["expiration"],
"uses_left": status["uses_left"],
}
return {
"exists": True,
"valid": True,
"date": status["date"],
"expiration": status["expiration"],
"uses_left": status["uses_left"],
}
def post(self):
"""
Generate recovery token
---
tags:
- Tokens
security:
- bearerAuth: []
parameters:
- in: body
name: data
required: true
description: Token data
schema:
type: object
properties:
expiration:
type: string
description: Token expiration date
uses:
type: integer
description: Token uses
responses:
200:
description: Recovery token generated
schema:
type: object
properties:
token:
type: string
description: Mnemonic recovery token
400:
description: Bad request
"""
parser = reqparse.RequestParser()
parser.add_argument(
"expiration", type=str, required=False, help="Token expiration date"
)
parser.add_argument("uses", type=int, required=False, help="Token uses")
args = parser.parse_args()
# Convert expiration date to datetime and return 400 if it is not valid
if args["expiration"]:
try:
expiration = datetime.strptime(
args["expiration"], "%Y-%m-%dT%H:%M:%S.%fZ"
)
# Retrun 400 if expiration date is in the past
if expiration < datetime.now():
return {"message": "Expiration date cannot be in the past"}, 400
except ValueError:
return {
"error": "Invalid expiration date. Use YYYY-MM-DDTHH:MM:SS.SSSZ"
}, 400
else:
expiration = None
if args["uses"] != None and args["uses"] < 1:
return {"message": "Uses must be greater than 0"}, 400
# Generate recovery token
token = generate_recovery_token(expiration, args["uses"])
return {"token": token}
class UseRecoveryToken(Resource):
"""Use recovery token class
POST uses the recovery token.
"""
def post(self):
"""
Use recovery token
---
tags:
- Tokens
parameters:
- in: body
name: data
required: true
description: Token data
schema:
type: object
properties:
token:
type: string
description: Mnemonic recovery token
device:
type: string
description: Device to authorize
responses:
200:
description: Recovery token used
schema:
type: object
properties:
token:
type: string
description: Device authorization token
400:
description: Bad request
404:
description: Token not found
"""
parser = reqparse.RequestParser()
parser.add_argument(
"token", type=str, required=True, help="Mnemonic recovery token"
)
parser.add_argument(
"device", type=str, required=True, help="Device to authorize"
)
args = parser.parse_args()
# Use recovery token
token = use_mnemonic_recoverery_token(args["token"], args["device"])
if token is None:
return {"error": "Token not found"}, 404
return {"token": token}
api.add_resource(RecoveryToken, "/recovery_token")
api.add_resource(UseRecoveryToken, "/recovery_token/use")

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python3
"""Unassigned views"""
import subprocess
from flask_restful import Resource, reqparse
from flask_restful import Resource
class ApiVersion(Resource):
@ -24,4 +23,4 @@ class ApiVersion(Resource):
401:
description: Unauthorized
"""
return {"version": "1.1.1"}
return {"version": "1.2.0"}

View File

@ -212,17 +212,16 @@ class SSHKeys(Resource):
if "sshKeys" not in data:
data["sshKeys"] = []
return data["sshKeys"]
else:
if "users" not in data:
data["users"] = []
for user in data["users"]:
if user["username"] == username:
if "sshKeys" not in user:
user["sshKeys"] = []
return user["sshKeys"]
return {
"error": "User not found",
}, 404
if "users" not in data:
data["users"] = []
for user in data["users"]:
if user["username"] == username:
if "sshKeys" not in user:
user["sshKeys"] = []
return user["sshKeys"]
return {
"error": "User not found",
}, 404
def post(self, username):
"""

View File

@ -283,6 +283,8 @@ class PythonVersion(Resource):
class PullRepositoryChanges(Resource):
"""Pull NixOS config repository changes"""
def get(self):
"""
Pull Repository Changes
@ -324,12 +326,11 @@ class PullRepositoryChanges(Resource):
"message": "Update completed successfully",
"data": data,
}
elif git_pull_process_descriptor.returncode > 0:
return {
"status": git_pull_process_descriptor.returncode,
"message": "Something went wrong",
"data": data,
}, 500
return {
"status": git_pull_process_descriptor.returncode,
"message": "Something went wrong",
"data": data,
}, 500
api.add_resource(Timezone, "/configuration/timezone")

View File

@ -105,10 +105,7 @@ class ResticController:
"--json",
]
if (
self.state == ResticStates.BACKING_UP
or self.state == ResticStates.RESTORING
):
if self.state in (ResticStates.BACKING_UP, ResticStates.RESTORING):
return
with subprocess.Popen(
backup_listing_command,
@ -129,10 +126,9 @@ class ResticController:
if "Is there a repository at the following location?" in snapshots_list:
self.state = ResticStates.NOT_INITIALIZED
return
else:
self.state = ResticStates.ERROR
self.error_message = snapshots_list
return
self.state = ResticStates.ERROR
self.error_message = snapshots_list
return
def initialize_repository(self):
"""
@ -195,10 +191,7 @@ class ResticController:
"""
backup_status_check_command = ["tail", "-1", "/var/backup.log"]
if (
self.state == ResticStates.NO_KEY
or self.state == ResticStates.NOT_INITIALIZED
):
if self.state in (ResticStates.NO_KEY, ResticStates.NOT_INITIALIZED):
return
# If the log file does not exists

View File

@ -1,13 +1,22 @@
#!/usr/bin/env python3
"""Various utility functions"""
from enum import Enum
import json
import portalocker
USERDATA_FILE = "/etc/nixos/userdata/userdata.json"
TOKENS_FILE = "/etc/nixos/userdata/tokens.json"
DOMAIN_FILE = "/var/domain"
class UserDataFiles(Enum):
"""Enum for userdata files"""
USERDATA = 0
TOKENS = 1
def get_domain():
"""Get domain from /var/domain without trailing new line"""
with open(DOMAIN_FILE, "r", encoding="utf-8") as domain_file:
@ -18,8 +27,13 @@ def get_domain():
class WriteUserData(object):
"""Write userdata.json with lock"""
def __init__(self):
self.userdata_file = open(USERDATA_FILE, "r+", encoding="utf-8")
def __init__(self, file_type=UserDataFiles.USERDATA):
if file_type == UserDataFiles.USERDATA:
self.userdata_file = open(USERDATA_FILE, "r+", encoding="utf-8")
elif file_type == UserDataFiles.TOKENS:
self.userdata_file = open(TOKENS_FILE, "r+", encoding="utf-8")
else:
raise ValueError("Unknown file type")
portalocker.lock(self.userdata_file, portalocker.LOCK_EX)
self.data = json.load(self.userdata_file)
@ -38,8 +52,13 @@ class WriteUserData(object):
class ReadUserData(object):
"""Read userdata.json with lock"""
def __init__(self):
self.userdata_file = open(USERDATA_FILE, "r", encoding="utf-8")
def __init__(self, file_type=UserDataFiles.USERDATA):
if file_type == UserDataFiles.USERDATA:
self.userdata_file = open(USERDATA_FILE, "r", encoding="utf-8")
elif file_type == UserDataFiles.TOKENS:
self.userdata_file = open(TOKENS_FILE, "r", encoding="utf-8")
else:
raise ValueError("Unknown file type")
portalocker.lock(self.userdata_file, portalocker.LOCK_SH)
self.data = json.load(self.userdata_file)

View File

@ -0,0 +1,317 @@
#!/usr/bin/env python3
"""Token management utils"""
import secrets
from datetime import datetime, timedelta
import re
from mnemonic import Mnemonic
from . import ReadUserData, UserDataFiles, WriteUserData
"""
Token are stored in the tokens.json file.
File contains device tokens, recovery token and new device auth token.
File structure:
{
"tokens": [
{
"token": "device token",
"name": "device name",
"date": "date of creation",
}
],
"recovery_token": {
"token": "recovery token",
"date": "date of creation",
"expiration": "date of expiration",
"uses_left": "number of uses left"
},
"new_device": {
"token": "new device auth token",
"date": "date of creation",
"expiration": "date of expiration",
}
}
Recovery token may or may not have expiration date and uses_left.
There may be no recovery token at all.
Device tokens must be unique.
"""
def _get_tokens():
"""Get all tokens as list of tokens of every device"""
with ReadUserData(UserDataFiles.TOKENS) as tokens:
return [token["token"] for token in tokens["tokens"]]
def _get_token_names():
"""Get all token names"""
with ReadUserData(UserDataFiles.TOKENS) as tokens:
return [t["name"] for t in tokens["tokens"]]
def _validate_token_name(name):
"""Token name must be an alphanumeric string and not empty.
Replace invalid characters with '_'
If token name exists, add a random number to the end of the name until it is unique.
"""
if not re.match("^[a-zA-Z0-9]*$", name):
name = re.sub("[^a-zA-Z0-9]", "_", name)
if name == "":
name = "Unknown device"
while name in _get_token_names():
name += str(secrets.randbelow(10))
return name
def is_token_valid(token):
"""Check if token is valid"""
if token in _get_tokens():
return True
return False
def is_token_name_exists(token_name):
"""Check if token name exists"""
with ReadUserData(UserDataFiles.TOKENS) as tokens:
return token_name in [t["name"] for t in tokens["tokens"]]
def is_token_name_pair_valid(token_name, token):
"""Check if token name and token pair exists"""
with ReadUserData(UserDataFiles.TOKENS) as tokens:
for t in tokens["tokens"]:
if t["name"] == token_name and t["token"] == token:
return True
return False
def get_token_name(token):
"""Return the name of the token provided"""
with ReadUserData(UserDataFiles.TOKENS) as tokens:
for t in tokens["tokens"]:
if t["token"] == token:
return t["name"]
return None
def get_tokens_info():
"""Get all tokens info without tokens themselves"""
with ReadUserData(UserDataFiles.TOKENS) as tokens:
return [
{"name": token["name"], "date": token["date"]} for token in tokens["tokens"]
]
def _generate_token():
"""Generates new token and makes sure it is unique"""
token = secrets.token_urlsafe(32)
while token in _get_tokens():
token = secrets.token_urlsafe(32)
return token
def create_token(name):
"""Create new token"""
token = _generate_token()
name = _validate_token_name(name)
with WriteUserData(UserDataFiles.TOKENS) as tokens:
tokens["tokens"].append(
{
"token": token,
"name": name,
"date": str(datetime.now()),
}
)
return token
def delete_token(token_name):
"""Delete token"""
with WriteUserData(UserDataFiles.TOKENS) as tokens:
tokens["tokens"] = [t for t in tokens["tokens"] if t["name"] != token_name]
def refresh_token(token):
"""Change the token field of the existing token"""
new_token = _generate_token()
with WriteUserData(UserDataFiles.TOKENS) as tokens:
for t in tokens["tokens"]:
if t["token"] == token:
t["token"] = new_token
return new_token
return None
def is_recovery_token_exists():
"""Check if recovery token exists"""
with ReadUserData(UserDataFiles.TOKENS) as tokens:
return "recovery_token" in tokens
def is_recovery_token_valid():
"""Check if recovery token is valid"""
with ReadUserData(UserDataFiles.TOKENS) as tokens:
if "recovery_token" not in tokens:
return False
recovery_token = tokens["recovery_token"]
if "uses_left" in recovery_token and recovery_token["uses_left"] is not None:
if recovery_token["uses_left"] <= 0:
return False
if "expiration" not in recovery_token or recovery_token["expiration"] is None:
return True
return datetime.now() < datetime.strptime(
recovery_token["expiration"], "%Y-%m-%dT%H:%M:%S.%fZ"
)
def get_recovery_token_status():
"""Get recovery token date of creation, expiration and uses left"""
with ReadUserData(UserDataFiles.TOKENS) as tokens:
if "recovery_token" not in tokens:
return None
recovery_token = tokens["recovery_token"]
return {
"date": recovery_token["date"],
"expiration": recovery_token["expiration"]
if "expiration" in recovery_token
else None,
"uses_left": recovery_token["uses_left"]
if "uses_left" in recovery_token
else None,
}
def _get_recovery_token():
"""Get recovery token"""
with ReadUserData(UserDataFiles.TOKENS) as tokens:
if "recovery_token" not in tokens:
return None
return tokens["recovery_token"]["token"]
def generate_recovery_token(expiration=None, uses_left=None):
"""Generate a 24 bytes recovery token and return a mneomnic word list.
Write a string representation of the recovery token to the tokens.json file.
"""
# expires must be a date or None
# uses_left must be an integer or None
if expiration is not None:
if not isinstance(expiration, datetime):
raise TypeError("expires must be a datetime object")
if uses_left is not None:
if not isinstance(uses_left, int):
raise TypeError("uses_left must be an integer")
if uses_left <= 0:
raise ValueError("uses_left must be greater than 0")
recovery_token = secrets.token_bytes(24)
recovery_token_str = recovery_token.hex()
with WriteUserData(UserDataFiles.TOKENS) as tokens:
tokens["recovery_token"] = {
"token": recovery_token_str,
"date": str(datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")),
"expiration": expiration.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
if expiration is not None
else None,
"uses_left": uses_left if uses_left is not None else None,
}
return Mnemonic(language="english").to_mnemonic(recovery_token)
def use_mnemonic_recoverery_token(mnemonic_phrase, name):
"""Use the recovery token by converting the mnemonic word list to a byte array.
If the recovery token if invalid itself, return None
If the binary representation of phrase not matches
the byte array of the recovery token, return None.
If the mnemonic phrase is valid then generate a device token and return it.
Substract 1 from uses_left if it exists.
mnemonic_phrase is a string representation of the mnemonic word list.
"""
if not is_recovery_token_valid():
return None
recovery_token_str = _get_recovery_token()
if recovery_token_str is None:
return None
recovery_token = bytes.fromhex(recovery_token_str)
if not Mnemonic(language="english").check(mnemonic_phrase):
return None
phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase)
if phrase_bytes != recovery_token:
return None
token = _generate_token()
name = _validate_token_name(name)
with WriteUserData(UserDataFiles.TOKENS) as tokens:
tokens["tokens"].append(
{
"token": token,
"name": name,
"date": str(datetime.now()),
}
)
if "recovery_token" in tokens:
if (
"uses_left" in tokens["recovery_token"]
and tokens["recovery_token"]["uses_left"] is not None
):
tokens["recovery_token"]["uses_left"] -= 1
return token
def get_new_device_auth_token():
"""Generate a new device auth token which is valid for 10 minutes
and return a mnemonic phrase representation
Write token to the new_device of the tokens.json file.
"""
token = secrets.token_bytes(16)
token_str = token.hex()
with WriteUserData(UserDataFiles.TOKENS) as tokens:
tokens["new_device"] = {
"token": token_str,
"date": str(datetime.now()),
"expiration": str(datetime.now() + timedelta(minutes=10)),
}
return Mnemonic(language="english").to_mnemonic(token)
def _get_new_device_auth_token():
"""Get new device auth token. If it is expired, return None"""
with ReadUserData(UserDataFiles.TOKENS) as tokens:
if "new_device" not in tokens:
return None
new_device = tokens["new_device"]
if "expiration" not in new_device:
return None
if datetime.now() > datetime.strptime(
new_device["expiration"], "%Y-%m-%d %H:%M:%S.%f"
):
return None
return new_device["token"]
def delete_new_device_auth_token():
"""Delete new device auth token"""
with WriteUserData(UserDataFiles.TOKENS) as tokens:
if "new_device" in tokens:
del tokens["new_device"]
def use_new_device_auth_token(mnemonic_phrase, name):
"""Use the new device auth token by converting the mnemonic string to a byte array.
If the mnemonic phrase is valid then generate a device token and return it.
New device auth token must be deleted.
"""
token_str = _get_new_device_auth_token()
if token_str is None:
return None
token = bytes.fromhex(token_str)
if not Mnemonic(language="english").check(mnemonic_phrase):
return None
phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase)
if phrase_bytes != token:
return None
token = create_token(name)
with WriteUserData(UserDataFiles.TOKENS) as tokens:
if "new_device" in tokens:
del tokens["new_device"]
return token

View File

@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name="selfprivacy_api",
version="1.1.0",
version="1.2.0",
packages=find_packages(),
scripts=[
"selfprivacy_api/app.py",

30
shell.nix Normal file
View File

@ -0,0 +1,30 @@
{ pkgs ? import <nixpkgs> {} }:
let
sp-python = pkgs.python39.withPackages (p: with p; [
flask
flask-restful
setuptools
portalocker
flask-swagger
flask-swagger-ui
pytz
pytest
pytest-mock
pytest-datadir
huey
gevent
mnemonic
coverage
pylint
]);
in
pkgs.mkShell {
buildInputs = [
sp-python
pkgs.black
];
shellHook = ''
PYTHONPATH=${sp-python}/${sp-python.sitePackages}
# maybe set more env-vars
'';
}

View File

@ -3,12 +3,19 @@ from flask import testing
from selfprivacy_api.app import create_app
@pytest.fixture
def tokens_file(mocker, shared_datadir):
mock = mocker.patch(
"selfprivacy_api.utils.TOKENS_FILE", shared_datadir / "tokens.json"
)
return mock
@pytest.fixture
def app():
app = create_app(
{
"AUTH_TOKEN": "TEST_TOKEN",
"ENABLE_SWAGGER": "0",
"ENABLE_SWAGGER": "1",
}
)
@ -16,7 +23,7 @@ def app():
@pytest.fixture
def client(app):
def client(app, tokens_file):
return app.test_client()
@ -45,17 +52,17 @@ class WrongAuthClient(testing.FlaskClient):
@pytest.fixture
def authorized_client(app):
def authorized_client(app, tokens_file):
app.test_client_class = AuthorizedClient
return app.test_client()
@pytest.fixture
def wrong_auth_client(app):
def wrong_auth_client(app, tokens_file):
app.test_client_class = WrongAuthClient
return app.test_client()
@pytest.fixture
def runner(app):
def runner(app, tokens_file):
return app.test_cli_runner()

14
tests/data/tokens.json Normal file
View File

@ -0,0 +1,14 @@
{
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314"
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314"
}
]
}

View File

@ -0,0 +1,9 @@
{
"tokens": [
{
"token": "TEST_TOKEN",
"name": "Test Token",
"date": "2022-01-14 08:31:10.789314"
}
]
}

528
tests/test_auth.py Normal file
View File

@ -0,0 +1,528 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
import datetime
import json
import re
import pytest
from mnemonic import Mnemonic
TOKENS_FILE_CONTETS = {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314",
},
]
}
def read_json(file_path):
with open(file_path, "r", encoding="utf-8") as file:
return json.load(file)
def write_json(file_path, data):
with open(file_path, "w", encoding="utf-8") as file:
json.dump(data, file, indent=4)
def test_get_tokens_info(authorized_client, tokens_file):
response = authorized_client.get("/auth/tokens")
assert response.status_code == 200
assert response.json == [
{"name": "test_token", "date": "2022-01-14 08:31:10.789314", "is_caller": True},
{
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314",
"is_caller": False,
},
]
def test_get_tokens_unauthorized(client, tokens_file):
response = client.get("/auth/tokens")
assert response.status_code == 401
def test_delete_token_unauthorized(client, tokens_file):
response = client.delete("/auth/tokens")
assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_delete_token(authorized_client, tokens_file):
response = authorized_client.delete(
"/auth/tokens", json={"token_name": "test_token2"}
)
assert response.status_code == 200
assert read_json(tokens_file) == {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
}
]
}
def test_delete_self_token(authorized_client, tokens_file):
response = authorized_client.delete(
"/auth/tokens", json={"token_name": "test_token"}
)
assert response.status_code == 400
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_delete_nonexistent_token(authorized_client, tokens_file):
response = authorized_client.delete(
"/auth/tokens", json={"token_name": "test_token3"}
)
assert response.status_code == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_refresh_token_unauthorized(client, tokens_file):
response = client.post("/auth/tokens")
assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_refresh_token(authorized_client, tokens_file):
response = authorized_client.post("/auth/tokens")
assert response.status_code == 200
new_token = response.json["token"]
assert read_json(tokens_file)["tokens"][0]["token"] == new_token
# new device
def test_get_new_device_auth_token_unauthorized(client, tokens_file):
response = client.get("/auth/new_device")
assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_get_new_device_auth_token(authorized_client, tokens_file):
response = authorized_client.post("/auth/new_device")
assert response.status_code == 200
assert "token" in response.json
token = Mnemonic(language="english").to_entropy(response.json["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
def test_get_and_delete_new_device_token(authorized_client, tokens_file):
response = authorized_client.post("/auth/new_device")
assert response.status_code == 200
assert "token" in response.json
token = Mnemonic(language="english").to_entropy(response.json["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
response = authorized_client.delete(
"/auth/new_device", json={"token": response.json["token"]}
)
assert response.status_code == 200
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_delete_token_unauthenticated(client, tokens_file):
response = client.delete("/auth/new_device")
assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_get_and_authorize_new_device(client, authorized_client, tokens_file):
response = authorized_client.post("/auth/new_device")
assert response.status_code == 200
assert "token" in response.json
token = Mnemonic(language="english").to_entropy(response.json["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
response = client.post(
"/auth/new_device/authorize",
json={"token": response.json["token"], "device": "new_device"},
)
assert response.status_code == 200
assert read_json(tokens_file)["tokens"][2]["token"] == response.json["token"]
assert read_json(tokens_file)["tokens"][2]["name"] == "new_device"
def test_authorize_new_device_with_invalid_token(client, tokens_file):
response = client.post(
"/auth/new_device/authorize",
json={"token": "invalid_token", "device": "new_device"},
)
assert response.status_code == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_get_and_authorize_used_token(client, authorized_client, tokens_file):
response = authorized_client.post("/auth/new_device")
assert response.status_code == 200
assert "token" in response.json
token = Mnemonic(language="english").to_entropy(response.json["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
response = client.post(
"/auth/new_device/authorize",
json={"token": response.json["token"], "device": "new_device"},
)
assert response.status_code == 200
assert read_json(tokens_file)["tokens"][2]["token"] == response.json["token"]
assert read_json(tokens_file)["tokens"][2]["name"] == "new_device"
response = client.post(
"/auth/new_device/authorize",
json={"token": response.json["token"], "device": "new_device"},
)
assert response.status_code == 404
def test_get_and_authorize_token_after_12_minutes(
client, authorized_client, tokens_file
):
response = authorized_client.post("/auth/new_device")
assert response.status_code == 200
assert "token" in response.json
token = Mnemonic(language="english").to_entropy(response.json["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
file_data = read_json(tokens_file)
file_data["new_device"]["expiration"] = str(
datetime.datetime.now() - datetime.timedelta(minutes=13)
)
write_json(tokens_file, file_data)
response = client.post(
"/auth/new_device/authorize",
json={"token": response.json["token"], "device": "new_device"},
)
assert response.status_code == 404
def test_authorize_without_token(client, tokens_file):
response = client.post(
"/auth/new_device/authorize",
json={"device": "new_device"},
)
assert response.status_code == 400
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
# Recovery tokens
# GET /auth/recovery_token returns token status
# - if token is valid, returns 200 and token status
# - token status:
# - exists (boolean)
# - valid (boolean)
# - date (string)
# - expiration (string)
# - uses_left (int)
# - if token is invalid, returns 400 and empty body
# POST /auth/recovery_token generates a new token
# has two optional parameters:
# - expiration (string in datetime format)
# - uses_left (int)
# POST /auth/recovery_token/use uses the token
# required arguments:
# - token (string)
# - device (string)
# - if token is valid, returns 200 and token
# - if token is invalid, returns 404
# - if request is invalid, returns 400
def test_get_recovery_token_status_unauthorized(client, tokens_file):
response = client.get("/auth/recovery_token")
assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_get_recovery_token_when_none_exists(authorized_client, tokens_file):
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json == {
"exists": False,
"valid": False,
"date": None,
"expiration": None,
"uses_left": None,
}
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_generate_recovery_token(authorized_client, client, tokens_file):
# Generate token without expiration and uses_left
response = authorized_client.post("/auth/recovery_token")
assert response.status_code == 200
assert "token" in response.json
mnemonic_token = response.json["token"]
token = Mnemonic(language="english").to_entropy(mnemonic_token).hex()
assert read_json(tokens_file)["recovery_token"]["token"] == token
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
# Assert that the token was generated near the current time
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
# Try to get token status
response = client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json == {
"exists": True,
"valid": True,
"date": time_generated,
"expiration": None,
"uses_left": None,
}
# Try to use the token
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json["token"]
assert read_json(tokens_file)["tokens"][2]["token"] == new_token
assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device"
# Try to use token again
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device2"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json["token"]
assert read_json(tokens_file)["tokens"][3]["token"] == new_token
assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2"
def test_generate_recovery_token_with_expiration_date(
authorized_client, client, tokens_file
):
# Generate token with expiration date
# Generate expiration date in the future
# Expiration date format is YYYY-MM-DDTHH:MM:SS.SSSZ
expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5)
expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
response = authorized_client.post(
"/auth/recovery_token",
json={"expiration": expiration_date_str},
)
assert response.status_code == 200
assert "token" in response.json
mnemonic_token = response.json["token"]
token = Mnemonic(language="english").to_entropy(mnemonic_token).hex()
assert read_json(tokens_file)["recovery_token"]["token"] == token
assert read_json(tokens_file)["recovery_token"]["expiration"] == expiration_date_str
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
# Assert that the token was generated near the current time
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
# Try to get token status
response = client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json == {
"exists": True,
"valid": True,
"date": time_generated,
"expiration": expiration_date_str,
"uses_left": None,
}
# Try to use the token
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json["token"]
assert read_json(tokens_file)["tokens"][2]["token"] == new_token
assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device"
# Try to use token again
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device2"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json["token"]
assert read_json(tokens_file)["tokens"][3]["token"] == new_token
assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2"
# Try to use token after expiration date
new_data = read_json(tokens_file)
new_data["recovery_token"]["expiration"] = datetime.datetime.now().strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
)
write_json(tokens_file, new_data)
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device3"},
)
assert recovery_response.status_code == 404
# Assert that the token was not created in JSON
assert read_json(tokens_file)["tokens"] == new_data["tokens"]
# Get the status of the token
response = client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json == {
"exists": True,
"valid": False,
"date": time_generated,
"expiration": new_data["recovery_token"]["expiration"],
"uses_left": None,
}
def test_generate_recovery_token_with_expiration_in_the_past(
authorized_client, client, tokens_file
):
# Server must return 400 if expiration date is in the past
expiration_date = datetime.datetime.now() - datetime.timedelta(minutes=5)
expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
response = authorized_client.post(
"/auth/recovery_token",
json={"expiration": expiration_date_str},
)
assert response.status_code == 400
assert "recovery_token" not in read_json(tokens_file)
def test_generate_recovery_token_with_invalid_time_format(
authorized_client, client, tokens_file
):
# Server must return 400 if expiration date is in the past
expiration_date = "invalid_time_format"
response = authorized_client.post(
"/auth/recovery_token",
json={"expiration": expiration_date},
)
assert response.status_code == 400
assert "recovery_token" not in read_json(tokens_file)
def test_generate_recovery_token_with_limited_uses(
authorized_client, client, tokens_file
):
# Generate token with limited uses
response = authorized_client.post(
"/auth/recovery_token",
json={"uses": 2},
)
assert response.status_code == 200
assert "token" in response.json
mnemonic_token = response.json["token"]
token = Mnemonic(language="english").to_entropy(mnemonic_token).hex()
assert read_json(tokens_file)["recovery_token"]["token"] == token
assert read_json(tokens_file)["recovery_token"]["uses_left"] == 2
# Get the date of the token
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
# Try to get token status
response = client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json == {
"exists": True,
"valid": True,
"date": time_generated,
"expiration": None,
"uses_left": 2,
}
# Try to use the token
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json["token"]
assert read_json(tokens_file)["tokens"][2]["token"] == new_token
assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device"
assert read_json(tokens_file)["recovery_token"]["uses_left"] == 1
# Get the status of the token
response = client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json == {
"exists": True,
"valid": True,
"date": time_generated,
"expiration": None,
"uses_left": 1,
}
# Try to use token again
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device2"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json["token"]
assert read_json(tokens_file)["tokens"][3]["token"] == new_token
assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2"
# Get the status of the token
response = client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json == {
"exists": True,
"valid": False,
"date": time_generated,
"expiration": None,
"uses_left": 0,
}
# Try to use token after limited uses
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device3"},
)
assert recovery_response.status_code == 404
assert read_json(tokens_file)["recovery_token"]["uses_left"] == 0
def test_generate_recovery_token_with_negative_uses(
authorized_client, client, tokens_file
):
# Generate token with limited uses
response = authorized_client.post(
"/auth/recovery_token",
json={"uses": -2},
)
assert response.status_code == 400
assert "recovery_token" not in read_json(tokens_file)
def test_generate_recovery_token_with_zero_uses(authorized_client, client, tokens_file):
# Generate token with limited uses
response = authorized_client.post(
"/auth/recovery_token",
json={"uses": 0},
)
assert response.status_code == 400
assert "recovery_token" not in read_json(tokens_file)

View File

@ -3,6 +3,8 @@
import json
import pytest
from selfprivacy_api.utils import WriteUserData, ReadUserData
def test_get_api_version(authorized_client):
response = authorized_client.get("/api/version")
@ -14,3 +16,21 @@ def test_get_api_version_unauthorized(client):
response = client.get("/api/version")
assert response.status_code == 200
assert "version" in response.get_json()
def test_get_swagger_json(authorized_client):
response = authorized_client.get("/api/swagger.json")
assert response.status_code == 200
assert "swagger" in response.get_json()
def test_read_invalid_user_data():
with pytest.raises(ValueError):
with ReadUserData("invalid") as user_data:
pass
def test_write_invalid_user_data():
with pytest.raises(ValueError):
with WriteUserData("invalid") as user_data:
pass