diff --git a/selfprivacy_api/app.py b/selfprivacy_api/app.py index 594217b..6deba7c 100644 --- a/selfprivacy_api/app.py +++ b/selfprivacy_api/app.py @@ -23,7 +23,7 @@ from selfprivacy_api.migrations import run_migrations from selfprivacy_api.utils.auth import is_token_valid -from selfprivacy_api.graphql.query import schema +from selfprivacy_api.graphql import schema swagger_blueprint = get_swaggerui_blueprint( "/api/docs", "/api/swagger.json", config={"app_name": "SelfPrivacy API"} diff --git a/selfprivacy_api/graphql/__init__.py b/selfprivacy_api/graphql/__init__.py index e69de29..762a324 100644 --- a/selfprivacy_api/graphql/__init__.py +++ b/selfprivacy_api/graphql/__init__.py @@ -0,0 +1,37 @@ +"""GraphQL API for SelfPrivacy.""" +# pylint: disable=too-few-public-methods +import typing +import strawberry +from strawberry.permission import BasePermission +from strawberry.types import Info +from flask import request + +from selfprivacy_api.graphql.queries.api import Api +from selfprivacy_api.graphql.queries.system import System +from selfprivacy_api.utils.auth import is_token_valid + +class IsAuthenticated(BasePermission): + """Is authenticated permission""" + message = "You must be authenticated to access this resource." + + def has_permission(self, source: typing.Any, info: Info, **kwargs) -> bool: + auth = request.headers.get("Authorization") + if auth is None: + return False + # Strip Bearer from auth header + auth = auth.replace("Bearer ", "") + if not is_token_valid(auth): + return False + return True + + +@strawberry.type +class Query: + """Root schema for queries""" + system: System + @strawberry.field(permission_classes=[IsAuthenticated]) + def api(self) -> Api: + """API access status""" + return Api() + +schema = strawberry.Schema(query=Query) diff --git a/selfprivacy_api/graphql/queries/api.py b/selfprivacy_api/graphql/queries/api.py index dbea5a1..a79222a 100644 --- a/selfprivacy_api/graphql/queries/api.py +++ b/selfprivacy_api/graphql/queries/api.py @@ -1,11 +1,74 @@ """API access status""" # pylint: disable=too-few-public-methods +import datetime import typing +from flask import request import strawberry +from selfprivacy_api.utils import parse_date -from selfprivacy_api.graphql.queries.api_fields import ApiDevice, ApiRecoveryKeyStatus -from selfprivacy_api.resolvers.api import get_api_version, get_devices, get_recovery_key_status +from selfprivacy_api.utils.auth import ( + get_recovery_token_status, + get_tokens_info, + is_recovery_token_exists, + is_recovery_token_valid, + is_token_name_exists, + is_token_name_pair_valid, + refresh_token, + get_token_name, +) +def get_api_version() -> str: + """Get API version""" + return "1.2.7" + +@strawberry.type +class ApiDevice: + """A single device with SelfPrivacy app installed""" + name: str + creation_date: datetime.datetime + is_caller: bool + +def get_devices() -> typing.List[ApiDevice]: + """Get list of devices""" + caller_name = get_token_name(request.headers.get("Authorization").split(" ")[1] if request.headers.get("Authorization") is not None else None) + tokens = get_tokens_info() + return [ + ApiDevice( + name=token["name"], + creation_date=parse_date(token["date"]), + is_caller=token["name"] == caller_name, + ) + for token in tokens + ] + + +@strawberry.type +class ApiRecoveryKeyStatus: + """Recovery key status""" + exists: bool + valid: bool + creation_date: typing.Optional[datetime.datetime] + expiration_date: typing.Optional[datetime.datetime] + uses_left: typing.Optional[int] + +def get_recovery_key_status() -> ApiRecoveryKeyStatus: + """Get recovery key status""" + if not is_recovery_token_exists(): + return ApiRecoveryKeyStatus( + exists=False, valid=False, creation_date=None, expiration_date=None, uses_left=None + ) + status = get_recovery_token_status() + if status is None: + return ApiRecoveryKeyStatus( + exists=False, valid=False, creation_date=None, expiration_date=None, uses_left=None + ) + return ApiRecoveryKeyStatus( + exists=True, + valid=is_recovery_token_valid(), + creation_date=parse_date(status["date"]), + expiration_date=parse_date(status["expiration"]) if status["expiration"] is not None else None, + uses_left=status["uses_left"] if status["uses_left"] is not None else None, + ) @strawberry.type class Api: diff --git a/selfprivacy_api/graphql/queries/api_fields.py b/selfprivacy_api/graphql/queries/api_fields.py deleted file mode 100644 index 2e46d50..0000000 --- a/selfprivacy_api/graphql/queries/api_fields.py +++ /dev/null @@ -1,22 +0,0 @@ -"""API access status""" -# pylint: disable=too-few-public-methods -import datetime -import typing -import strawberry - - -@strawberry.type -class ApiDevice: - """A single device with SelfPrivacy app installed""" - name: str - creation_date: datetime.datetime - is_caller: bool - -@strawberry.type -class ApiRecoveryKeyStatus: - """Recovery key status""" - exists: bool - valid: bool - creation_date: typing.Optional[datetime.datetime] - expiration_date: typing.Optional[datetime.datetime] - uses_left: typing.Optional[int] diff --git a/selfprivacy_api/graphql/query.py b/selfprivacy_api/graphql/query.py deleted file mode 100644 index 80b3d78..0000000 --- a/selfprivacy_api/graphql/query.py +++ /dev/null @@ -1,18 +0,0 @@ -"""GraphQL API for SelfPrivacy.""" -# pylint: disable=too-few-public-methods -import typing -import strawberry -from selfprivacy_api.graphql.queries.api import Api - -from selfprivacy_api.graphql.queries.system import System - -@strawberry.type -class Query: - """Root schema for queries""" - system: System - @strawberry.field - def api(self) -> Api: - """API access status""" - return Api() - -schema = strawberry.Schema(query=Query) diff --git a/selfprivacy_api/resolvers/__init__.py b/selfprivacy_api/resolvers/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/selfprivacy_api/resolvers/api.py b/selfprivacy_api/resolvers/api.py deleted file mode 100644 index c6068b8..0000000 --- a/selfprivacy_api/resolvers/api.py +++ /dev/null @@ -1,58 +0,0 @@ -"""Resolvers for API module""" -import datetime -import typing -from flask import request - -from selfprivacy_api.graphql.queries.api_fields import ApiDevice, ApiRecoveryKeyStatus - -from selfprivacy_api.utils.auth import ( - get_recovery_token_status, - get_tokens_info, - is_recovery_token_exists, - is_recovery_token_valid, - is_token_name_exists, - is_token_name_pair_valid, - refresh_token, - get_token_name, -) - -def parse_date(date_str: str) -> datetime.datetime: - """Parse date string which can be in - %Y-%m-%dT%H:%M:%S.%fZ or %Y-%m-%d %H:%M:%S.%f format""" - return datetime.datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%fZ") if date_str.endswith("Z") else datetime.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S.%f") - -def get_api_version() -> str: - """Get API version""" - return "1.2.7" - -def get_devices() -> typing.List[ApiDevice]: - """Get list of devices""" - caller_name = get_token_name(request.headers.get("Authorization").split(" ")[1] if request.headers.get("Authorization") is not None else None) - tokens = get_tokens_info() - return [ - ApiDevice( - name=token["name"], - creation_date=parse_date(token["date"]), - is_caller=token["name"] == caller_name, - ) - for token in tokens - ] - -def get_recovery_key_status() -> ApiRecoveryKeyStatus: - """Get recovery key status""" - if not is_recovery_token_exists(): - return ApiRecoveryKeyStatus( - exists=False, valid=False, creation_date=None, expiration_date=None, uses_left=None - ) - status = get_recovery_token_status() - if status is None: - return ApiRecoveryKeyStatus( - exists=False, valid=False, creation_date=None, expiration_date=None, uses_left=None - ) - return ApiRecoveryKeyStatus( - exists=True, - valid=is_recovery_token_valid(), - creation_date=parse_date(status["date"]), - expiration_date=parse_date(status["expiration"]) if status["expiration"] is not None else None, - uses_left=status["uses_left"] if status["uses_left"] is not None else None, - ) diff --git a/selfprivacy_api/resources/common.py b/selfprivacy_api/resources/common.py index 7c8937b..93c1dde 100644 --- a/selfprivacy_api/resources/common.py +++ b/selfprivacy_api/resources/common.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """Unassigned views""" from flask_restful import Resource -from selfprivacy_api.resolvers.api import get_api_version +from selfprivacy_api.graphql.queries.api import get_api_version class ApiVersion(Resource): """SelfPrivacy API version""" diff --git a/selfprivacy_api/utils/__init__.py b/selfprivacy_api/utils/__init__.py index 5322fae..556ee60 100644 --- a/selfprivacy_api/utils/__init__.py +++ b/selfprivacy_api/utils/__init__.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 """Various utility functions""" +import datetime from enum import Enum import json import portalocker @@ -119,3 +120,8 @@ def is_username_forbidden(username): return True return False + +def parse_date(date_str: str) -> datetime.datetime: + """Parse date string which can be in + %Y-%m-%dT%H:%M:%S.%fZ or %Y-%m-%d %H:%M:%S.%f format""" + return datetime.datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%fZ") if date_str.endswith("Z") else datetime.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S.%f")