Merge pull request 'Input sanitization, added swagger' (#5) from more-RESTful-api into master

Reviewed-on: ilchub/selfprivacy-rest-api#5
pull/6/head
Inex Code 2021-11-17 10:27:37 +02:00
commit 82b7f97dce
19 changed files with 877 additions and 291 deletions

View File

@ -1,3 +1,5 @@
{
"python.formatting.provider": "black"
"python.formatting.provider": "black",
"python.linting.pylintEnabled": true,
"python.linting.enabled": true
}

View File

@ -1,3 +1,3 @@
[build-system]
requires = ["setuptools", "wheel", "portalocker"]
requires = ["setuptools", "wheel", "portalocker", "flask-swagger", "flask-swagger-ui"]
build-backend = "setuptools.build_meta"

View File

@ -4,3 +4,5 @@ flask_restful
flask_socketio
setuptools
portalocker
flask-swagger
flask-swagger-ui

View File

@ -1,40 +1,74 @@
#!/usr/bin/env python3
"""SelfPrivacy server management API"""
import os
from flask import Flask, request, jsonify
from flask_restful import Api
import os
from flask_swagger import swagger
from flask_swagger_ui import get_swaggerui_blueprint
from selfprivacy_api.resources.users import Users
from selfprivacy_api.resources.users import User, Users
from selfprivacy_api.resources.common import DecryptDisk
from selfprivacy_api.resources.system import api_system
from selfprivacy_api.resources.services import services as api_services
swagger_blueprint = get_swaggerui_blueprint(
"/api/docs", "/api/swagger.json", config={"app_name": "SelfPrivacy API"}
)
def create_app():
"""Initiate Flask app and bind routes"""
app = Flask(__name__)
api = Api(app)
app.config['AUTH_TOKEN'] = os.environ.get('AUTH_TOKEN')
app.config["AUTH_TOKEN"] = os.environ.get("AUTH_TOKEN")
app.config["ENABLE_SWAGGER"] = os.environ.get("ENABLE_SWAGGER", "0")
# Check bearer token
@app.before_request
def check_auth():
auth = request.headers.get("Authorization")
if auth is None:
return jsonify({"error": "Missing Authorization header"}), 401
# Exclude swagger-ui
if not request.path.startswith("/api"):
auth = request.headers.get("Authorization")
if auth is None:
return jsonify({"error": "Missing Authorization header"}), 401
# Check if token is valid
if auth != "Bearer " + app.config["AUTH_TOKEN"]:
return jsonify({"error": "Invalid token"}), 401
# Check if token is valid
if auth != "Bearer " + app.config['AUTH_TOKEN']:
return jsonify({"error": "Invalid token"}), 401
api.add_resource(Users, "/users")
api.add_resource(User, "/users/<string:username>")
api.add_resource(DecryptDisk, "/decryptDisk")
from selfprivacy_api.resources.system import api_system
from selfprivacy_api.resources.services import services as api_services
app.register_blueprint(api_system)
app.register_blueprint(api_services)
@app.route("/api/swagger.json")
def spec():
if app.config["ENABLE_SWAGGER"] == "1":
swag = swagger(app)
swag["info"]["version"] = "1.0"
swag["info"]["title"] = "SelfPrivacy API"
swag["info"]["description"] = "SelfPrivacy API"
swag["securityDefinitions"] = {
"bearerAuth": {
"type": "apiKey",
"name": "Authorization",
"in": "header",
}
}
swag["security"] = [{"bearerAuth": []}]
return jsonify(swag)
return jsonify({}), 404
if app.config["ENABLE_SWAGGER"] == "1":
app.register_blueprint(swagger_blueprint, url_prefix="/api/docs")
return app
if __name__ == "__main__":
app = create_app()
app.run(port=5050, debug=False)
created_app = create_app()
created_app.run(port=5050, debug=False)

View File

@ -1,20 +1,56 @@
#!/usr/bin/env python3
from flask import Flask, jsonify, request, json
from flask_restful import Resource
"""Unassigned views"""
import subprocess
from flask_restful import Resource, reqparse
from selfprivacy_api.utils import get_domain
# Decrypt disk
class DecryptDisk(Resource):
def post(self):
decryptionCommand = """
echo -n {0} | cryptsetup luksOpen /dev/sdb decryptedVar""".format(
request.headers.get("X-Decryption-Key")
)
"""Decrypt disk"""
decryptionService = subprocess.Popen(
decryptionCommand, shell=True, stdout=subprocess.PIPE
def post(self):
"""
Decrypt /dev/sdb using cryptsetup luksOpen
---
consumes:
- application/json
tags:
- System
security:
- bearerAuth: []
parameters:
- in: body
name: body
required: true
description: Provide a password for decryption
schema:
type: object
required:
- password
properties:
password:
type: string
description: Decryption password.
responses:
201:
description: OK
400:
description: Bad request
401:
description: Unauthorized
"""
parser = reqparse.RequestParser(bundle_errors=True)
parser.add_argument("password", type=str, required=True)
args = parser.parse_args()
decryption_command = ["cryptsetup", "luksOpen", "/dev/sdb", "decryptedVar"]
# TODO: Check if this works at all
decryption_service = subprocess.Popen(
decryption_command,
shell=False,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
decryptionService.communicate()
return {"status": decryptionService.returncode}
decryption_service.communicate(input=args["password"])
return {"status": decryption_service.returncode}, 201

View File

@ -1,3 +1,5 @@
#!/usr/bin/env python3
"""Services management module"""
from flask import Blueprint
from flask_restful import Api

View File

@ -1,25 +1,43 @@
#!/usr/bin/env python3
from flask_restful import Resource
import portalocker
"""Bitwarden management module"""
import json
import portalocker
from flask_restful import Resource
from selfprivacy_api.resources.services import api
# Enable Bitwarden
class EnableBitwarden(Resource):
"""Enable Bitwarden"""
def post(self):
with open("/etc/nixos/userdata/userdata.json", "r+", encoding="utf8") as f:
portalocker.lock(f, portalocker.LOCK_EX)
"""
Enable Bitwarden
---
tags:
- Bitwarden
security:
- bearerAuth: []
responses:
200:
description: Bitwarden enabled
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(f)
data = json.load(userdata_file)
if "bitwarden" not in data:
data["bitwarden"] = {}
data["bitwarden"]["enable"] = True
f.seek(0)
json.dump(data, f, indent=4)
f.truncate()
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(f)
portalocker.unlock(userdata_file)
return {
"status": 0,
@ -27,21 +45,37 @@ class EnableBitwarden(Resource):
}
# Disable Bitwarden
class DisableBitwarden(Resource):
"""Disable Bitwarden"""
def post(self):
with open("/etc/nixos/userdata/userdata.json", "r+", encoding="utf8") as f:
portalocker.lock(f, portalocker.LOCK_EX)
"""
Disable Bitwarden
---
tags:
- Bitwarden
security:
- bearerAuth: []
responses:
200:
description: Bitwarden disabled
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(f)
data = json.load(userdata_file)
if "bitwarden" not in data:
data["bitwarden"] = {}
data["bitwarden"]["enable"] = False
f.seek(0)
json.dump(data, f, indent=4)
f.truncate()
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(f)
portalocker.unlock(userdata_file)
return {
"status": 0,

View File

@ -1,25 +1,43 @@
#!/usr/bin/env python3
from flask_restful import Resource
import portalocker
"""Gitea management module"""
import json
import portalocker
from flask_restful import Resource
from selfprivacy_api.resources.services import api
# Enable Gitea
class EnableGitea(Resource):
"""Enable Gitea"""
def post(self):
with open("/etc/nixos/userdata/userdata.json", "r+", encoding="utf8") as f:
portalocker.lock(f, portalocker.LOCK_EX)
"""
Enable Gitea
---
tags:
- Gitea
security:
- bearerAuth: []
responses:
200:
description: Gitea enabled
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(f)
data = json.load(userdata_file)
if "gitea" not in data:
data["gitea"] = {}
data["gitea"]["enable"] = True
f.seek(0)
json.dump(data, f, indent=4)
f.truncate()
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(f)
portalocker.unlock(userdata_file)
return {
"status": 0,
@ -27,21 +45,37 @@ class EnableGitea(Resource):
}
# Disable Gitea
class DisableGitea(Resource):
"""Disable Gitea"""
def post(self):
with open("/etc/nixos/userdata/userdata.json", "r+", encoding="utf8") as f:
portalocker.lock(f, portalocker.LOCK_EX)
"""
Disable Gitea
---
tags:
- Gitea
security:
- bearerAuth: []
responses:
200:
description: Gitea disabled
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(f)
data = json.load(userdata_file)
if "gitea" not in data:
data["gitea"] = {}
data["gitea"]["enable"] = False
f.seek(0)
json.dump(data, f, indent=4)
f.truncate()
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(f)
portalocker.unlock(userdata_file)
return {
"status": 0,

View File

@ -1,20 +1,36 @@
#!/usr/bin/env python3
from flask_restful import Resource
"""Mail server management module"""
import base64
import subprocess
from flask_restful import Resource
from selfprivacy_api.resources.services import api
from selfprivacy_api.utils import get_domain
# Get DKIM key from file
class DKIMKey(Resource):
"""Get DKIM key from file"""
def get(self):
"""
Get DKIM key from file
---
tags:
- Email
security:
- bearerAuth: []
responses:
200:
description: DKIM key encoded in base64
401:
description: Unauthorized
"""
domain = get_domain()
catProcess = subprocess.Popen(
cat_process = subprocess.Popen(
["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE
)
dkim = catProcess.communicate()[0]
dkim = cat_process.communicate()[0]
dkim = base64.b64encode(dkim)
dkim = str(dkim, "utf-8")
return dkim

View File

@ -1,42 +1,83 @@
#!/usr/bin/env python3
from flask_restful import Resource, Api
"""Services status module"""
import subprocess
from flask_restful import Resource
from . import api
# Get service status
class ServiceStatus(Resource):
"""Get service status"""
def get(self):
imapService = subprocess.Popen(["systemctl", "status", "dovecot2.service"])
imapService.communicate()[0]
smtpService = subprocess.Popen(["systemctl", "status", "postfix.service"])
smtpService.communicate()[0]
httpService = subprocess.Popen(["systemctl", "status", "nginx.service"])
httpService.communicate()[0]
bitwardenService = subprocess.Popen(
"""
Get service status
---
tags:
- Services
responses:
200:
description: Service status
schema:
type: object
properties:
imap:
type: integer
description: Dovecot service status
smtp:
type: integer
description: Postfix service status
http:
type: integer
description: Nginx service status
bitwarden:
type: integer
description: Bitwarden service status
gitea:
type: integer
description: Gitea service status
nextcloud:
type: integer
description: Nextcloud service status
ocserv:
type: integer
description: OpenConnect VPN service status
pleroma:
type: integer
description: Pleroma service status
401:
description: Unauthorized
"""
imap_service = subprocess.Popen(["systemctl", "status", "dovecot2.service"])
imap_service.communicate()[0]
smtp_service = subprocess.Popen(["systemctl", "status", "postfix.service"])
smtp_service.communicate()[0]
http_service = subprocess.Popen(["systemctl", "status", "nginx.service"])
http_service.communicate()[0]
bitwarden_service = subprocess.Popen(
["systemctl", "status", "bitwarden_rs.service"]
)
bitwardenService.communicate()[0]
giteaService = subprocess.Popen(["systemctl", "status", "gitea.service"])
giteaService.communicate()[0]
nextcloudService = subprocess.Popen(
bitwarden_service.communicate()[0]
gitea_service = subprocess.Popen(["systemctl", "status", "gitea.service"])
gitea_service.communicate()[0]
nextcloud_service = subprocess.Popen(
["systemctl", "status", "phpfpm-nextcloud.service"]
)
nextcloudService.communicate()[0]
ocservService = subprocess.Popen(["systemctl", "status", "ocserv.service"])
ocservService.communicate()[0]
pleromaService = subprocess.Popen(["systemctl", "status", "pleroma.service"])
pleromaService.communicate()[0]
nextcloud_service.communicate()[0]
ocserv_service = subprocess.Popen(["systemctl", "status", "ocserv.service"])
ocserv_service.communicate()[0]
pleroma_service = subprocess.Popen(["systemctl", "status", "pleroma.service"])
pleroma_service.communicate()[0]
return {
"imap": imapService.returncode,
"smtp": smtpService.returncode,
"http": httpService.returncode,
"bitwarden": bitwardenService.returncode,
"gitea": giteaService.returncode,
"nextcloud": nextcloudService.returncode,
"ocserv": ocservService.returncode,
"pleroma": pleromaService.returncode,
"imap": imap_service.returncode,
"smtp": smtp_service.returncode,
"http": http_service.returncode,
"bitwarden": bitwarden_service.returncode,
"gitea": gitea_service.returncode,
"nextcloud": nextcloud_service.returncode,
"ocserv": ocserv_service.returncode,
"pleroma": pleroma_service.returncode,
}

View File

@ -1,25 +1,43 @@
#!/usr/bin/env python3
from flask_restful import Resource
import portalocker
"""Nextcloud management module"""
import json
import portalocker
from flask_restful import Resource
from selfprivacy_api.resources.services import api
# Enable Nextcloud
class EnableNextcloud(Resource):
"""Enable Nextcloud"""
def post(self):
with portalocker.Lock("/etc/nixos/userdata/userdata.json", "r+") as f:
portalocker.lock(f, portalocker.LOCK_EX)
"""
Enable Nextcloud
---
tags:
- Nextcloud
security:
- bearerAuth: []
responses:
200:
description: Nextcloud enabled
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(f)
data = json.load(userdata_file)
if "nextcloud" not in data:
data["nextcloud"] = {}
data["nextcloud"]["enable"] = True
f.seek(0)
json.dump(data, f, indent=4)
f.truncate()
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(f)
portalocker.unlock(userdata_file)
return {
"status": 0,
@ -27,21 +45,37 @@ class EnableNextcloud(Resource):
}
# Disable Nextcloud
class DisableNextcloud(Resource):
"""Disable Nextcloud"""
def post(self):
with portalocker.Lock("/etc/nixos/userdata/userdata.json", "r+") as f:
portalocker.lock(f, portalocker.LOCK_EX)
"""
Disable Nextcloud
---
tags:
- Nextcloud
security:
- bearerAuth: []
responses:
200:
description: Nextcloud disabled
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(f)
data = json.load(userdata_file)
if "nextcloud" not in data:
data["nextcloud"] = {}
data["nextcloud"]["enable"] = False
f.seek(0)
json.dump(data, f, indent=4)
f.truncate()
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(f)
portalocker.unlock(userdata_file)
return {
"status": 0,

View File

@ -1,25 +1,43 @@
#!/usr/bin/env python3
from flask_restful import Resource
import portalocker
"""OpenConnect VPN server management module"""
import json
import portalocker
from flask_restful import Resource
from selfprivacy_api.resources.services import api
# Enable OpenConnect VPN server
class EnableOcserv(Resource):
"""Enable OpenConnect VPN server"""
def post(self):
with portalocker.Lock("/etc/nixos/userdata/userdata.json", "r+") as f:
portalocker.lock(f, portalocker.LOCK_EX)
"""
Enable OCserv
---
tags:
- OCserv
security:
- bearerAuth: []
responses:
200:
description: OCserv enabled
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(f)
data = json.load(userdata_file)
if "ocserv" not in data:
data["ocserv"] = {}
data["ocserv"]["enable"] = True
f.seek(0)
json.dump(data, f, indent=4)
f.truncate()
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(f)
portalocker.unlock(userdata_file)
return {
"status": 0,
@ -27,21 +45,37 @@ class EnableOcserv(Resource):
}
# Disable OpenConnect VPN server
class DisableOcserv(Resource):
"""Disable OpenConnect VPN server"""
def post(self):
with portalocker.Lock("/etc/nixos/userdata/userdata.json", "r+") as f:
portalocker.lock(f, portalocker.LOCK_EX)
"""
Disable OCserv
---
tags:
- OCserv
security:
- bearerAuth: []
responses:
200:
description: OCserv disabled
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(f)
data = json.load(userdata_file)
if "ocserv" not in data:
data["ocserv"] = {}
data["ocserv"]["enable"] = False
f.seek(0)
json.dump(data, f, indent=4)
f.truncate()
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(f)
portalocker.unlock(userdata_file)
return {
"status": 0,

View File

@ -1,25 +1,43 @@
#!/usr/bin/env python3
from flask_restful import Resource
import portalocker
"""Pleroma management module"""
import json
import portalocker
from flask_restful import Resource
from selfprivacy_api.resources.services import api
# Enable Pleroma
class EnablePleroma(Resource):
"""Enable Pleroma"""
def post(self):
with portalocker.Lock("/etc/nixos/userdata/userdata.json", "r+") as f:
portalocker.lock(f, portalocker.LOCK_EX)
"""
Enable Pleroma
---
tags:
- Pleroma
security:
- bearerAuth: []
responses:
200:
description: Pleroma enabled
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(f)
data = json.load(userdata_file)
if "pleroma" not in data:
data["pleroma"] = {}
data["pleroma"]["enable"] = True
f.seek(0)
json.dump(data, f, indent=4)
f.truncate()
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(f)
portalocker.unlock(userdata_file)
return {
"status": 0,
@ -27,21 +45,37 @@ class EnablePleroma(Resource):
}
# Disable Pleroma
class DisablePleroma(Resource):
"""Disable Pleroma"""
def post(self):
with portalocker.Lock("/etc/nixos/userdata/userdata.json", "r+") as f:
portalocker.lock(f, portalocker.LOCK_EX)
"""
Disable Pleroma
---
tags:
- Pleroma
security:
- bearerAuth: []
responses:
200:
description: Pleroma disabled
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(f)
data = json.load(userdata_file)
if "pleroma" not in data:
data["pleroma"] = {}
data["pleroma"]["enable"] = False
f.seek(0)
json.dump(data, f, indent=4)
f.truncate()
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(f)
portalocker.unlock(userdata_file)
return {
"status": 0,

View File

@ -1,69 +1,131 @@
#!/usr/bin/env python3
"""Backups management module"""
import json
import subprocess
from flask import request
from flask_restful import Resource
import subprocess
import json
from selfprivacy_api.resources.services import api
# List all restic backups
class ListAllBackups(Resource):
def get(self):
backupListingCommand = """
restic -r b2:{0}:/sfbackup snapshots --password-file /var/lib/restic/rpass --json
""".format(
request.headers.get("X-Repository-Name")
)
backupListingProcessDescriptor = subprocess.Popen(
backupListingCommand,
shell=True,
class ListAllBackups(Resource):
"""List all restic backups"""
def get(self):
"""
Get all restic backups
---
tags:
- Backups
security:
- bearerAuth: []
responses:
200:
description: A list of snapshots
400:
description: Bad request
401:
description: Unauthorized
"""
repository_name = request.headers.get("X-Repository-Name")
backup_listing_command = [
"restic",
"-r",
f"b2:{repository_name}:/sfbackup",
"snapshots",
"--json",
]
with subprocess.Popen(
backup_listing_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
) as backup_listing_process_descriptor:
snapshots_list = backup_listing_process_descriptor.communicate()[0]
snapshotsList = backupListingProcessDescriptor.communicate()[0]
return snapshotsList.decode("utf-8")
return snapshots_list.decode("utf-8")
# Create a new restic backup
class AsyncCreateBackup(Resource):
def put(self):
backupCommand = """
restic -r b2:{0}:/sfbackup --verbose backup /var --password-file /var/lib/restic/rpass > /tmp/backup.log
""".format(
request.headers.get("X-Repository-Name")
)
"""Create a new restic backup"""
backupProcessDescriptor = subprocess.Popen(
backupCommand, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
def put(self):
"""
Initiate a new restic backup
---
tags:
- Backups
security:
- bearerAuth: []
responses:
200:
description: Backup creation has started
400:
description: Bad request
401:
description: Unauthorized
"""
repository_name = request.headers.get("X-Repository-Name")
backup_command = [
"restic",
"-r",
f"b2:{repository_name}:/sfbackup",
"--verbose",
"--json",
"backup",
"/var",
]
with open("/tmp/backup.log", "w", encoding="utf-8") as log_file:
subprocess.Popen(
backup_command, shell=False, stdout=log_file, stderr=subprocess.STDOUT
)
return {
"status": 0,
"message": "Backup creation has started",
}
class CheckBackupStatus(Resource):
"""Check current backup status"""
def get(self):
backupStatusCheckCommand = """
tail -1 /tmp/backup.log
"""
Get backup status
---
tags:
- Backups
security:
- bearerAuth: []
responses:
200:
description: Backup status
400:
description: Bad request
401:
description: Unauthorized
"""
backup_status_check_command = ["tail", "-1", "/tmp/backup.log"]
backupStatusCheckProcessDescriptor = subprocess.Popen(
backupStatusCheckCommand, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
backupProcessStatus = backupStatusCheckProcessDescriptor.communicate()[0].decode("utf-8")
with subprocess.Popen(
backup_status_check_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) as backup_status_check_process_descriptor:
backup_process_status = (
backup_status_check_process_descriptor.communicate()[0].decode("utf-8")
)
try:
json.loads(backupProcessStatus)
json.loads(backup_process_status)
except ValueError:
return {
"message": backupProcessStatus
}
return backupProcessStatus
return {"message": backup_process_status}
return backup_process_status
api.add_resource(ListAllBackups, "/restic/backup/list")

View File

@ -1,26 +1,43 @@
#!/usr/bin/env python3
from flask import Blueprint, request
from flask_restful import Resource, reqparse
import portalocker
"""SSH management module"""
import json
import portalocker
from flask_restful import Resource, reqparse
from selfprivacy_api.resources.services import api
# Enable SSH
class EnableSSH(Resource):
"""Enable SSH"""
def post(self):
with portalocker.Lock("/etc/nixos/userdata/userdata.json", "r+") as f:
portalocker.lock(f, portalocker.LOCK_EX)
"""
Enable SSH
---
tags:
- SSH
security:
- bearerAuth: []
responses:
200:
description: SSH enabled
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(f)
data = json.load(userdata_file)
if "ssh" not in data:
data["ssh"] = {}
data["ssh"]["enable"] = True
f.seek(0)
json.dump(data, f, indent=4)
f.truncate()
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(f)
portalocker.unlock(userdata_file)
return {
"status": 0,
@ -28,40 +45,75 @@ class EnableSSH(Resource):
}
# Write new SSH key
class WriteSSHKey(Resource):
"""Write new SSH key"""
def put(self):
"""
Add a SSH root key
---
consumes:
- application/json
tags:
- SSH
security:
- bearerAuth: []
parameters:
- in: body
name: body
required: true
description: Public key to add
schema:
type: object
required:
- public_key
properties:
public_key:
type: string
description: ssh-ed25519 public key.
responses:
201:
description: Key added
400:
description: Bad request
401:
description: Unauthorized
409:
description: Key already exists
"""
parser = reqparse.RequestParser()
parser.add_argument(
"public_key", type=str, required=True, help="Key cannot be blank!"
)
args = parser.parse_args()
publicKey = args["public_key"]
public_key = args["public_key"]
with portalocker.Lock("/etc/nixos/userdata/userdata.json", "r+") as f:
portalocker.lock(f, portalocker.LOCK_EX)
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(f)
data = json.load(userdata_file)
if "ssh" not in data:
data["ssh"] = {}
# Return 400 if key already in array
# Return 409 if key already in array
for key in data["ssh"]["rootSshKeys"]:
if key == publicKey:
if key == public_key:
return {
"error": "Key already exists",
}, 400
data["ssh"]["rootSshKeys"].append(publicKey)
f.seek(0)
json.dump(data, f, indent=4)
f.truncate()
}, 409
data["ssh"]["rootSshKeys"].append(public_key)
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(f)
portalocker.unlock(userdata_file)
return {
"status": 0,
"message": "New SSH key successfully written",
}
}, 201
api.add_resource(EnableSSH, "/ssh/enable")

View File

@ -1,38 +1,96 @@
#!/usr/bin/env python3
"""System management module"""
import subprocess
from flask import Blueprint
from flask_restful import Resource, Api
import subprocess
api_system = Blueprint("system", __name__, url_prefix="/system")
api = Api(api_system)
# Rebuild NixOS
class RebuildSystem(Resource):
"""Rebuild NixOS"""
def get(self):
rebuildResult = subprocess.Popen(["nixos-rebuild", "switch"])
rebuildResult.communicate()[0]
return rebuildResult.returncode
"""
Rebuild NixOS with nixos-rebuild switch
---
tags:
- System
security:
- bearerAuth: []
responses:
200:
description: System rebuild has started
401:
description: Unauthorized
"""
rebuild_result = subprocess.Popen(["nixos-rebuild", "switch"])
rebuild_result.communicate()[0]
return rebuild_result.returncode
# Rollback NixOS
class RollbackSystem(Resource):
"""Rollback NixOS"""
def get(self):
rollbackResult = subprocess.Popen(["nixos-rebuild", "switch", "--rollback"])
rollbackResult.communicate()[0]
return rollbackResult.returncode
"""
Rollback NixOS with nixos-rebuild switch --rollback
---
tags:
- System
security:
- bearerAuth: []
responses:
200:
description: System rollback has started
401:
description: Unauthorized
"""
rollback_result = subprocess.Popen(["nixos-rebuild", "switch", "--rollback"])
rollback_result.communicate()[0]
return rollback_result.returncode
# Upgrade NixOS
class UpgradeSystem(Resource):
"""Upgrade NixOS"""
def get(self):
upgradeResult = subprocess.Popen(["nixos-rebuild", "switch", "--upgrade"])
upgradeResult.communicate()[0]
return upgradeResult.returncode
"""
Upgrade NixOS with nixos-rebuild switch --upgrade
---
tags:
- System
security:
- bearerAuth: []
responses:
200:
description: System upgrade has started
401:
description: Unauthorized
"""
upgrade_result = subprocess.Popen(["nixos-rebuild", "switch", "--upgrade"])
upgrade_result.communicate()[0]
return upgrade_result.returncode
# Get system version from uname
class SystemVersion(Resource):
"""Get system version from uname"""
def get(self):
"""
Get system version from uname -a
---
tags:
- System
security:
- bearerAuth: []
responses:
200:
description: OK
401:
description: Unauthorized
"""
return {
"system_version": subprocess.check_output(["uname", "-a"])
.decode("utf-8")
@ -40,9 +98,23 @@ class SystemVersion(Resource):
}
# Get python version
class PythonVersion(Resource):
"""Get python version"""
def get(self):
"""
Get python version used by this API
---
tags:
- System
security:
- bearerAuth: []
responses:
200:
description: OK
401:
description: Unauthorized
"""
return subprocess.check_output(["python", "-V"]).decode("utf-8").strip()

View File

@ -1,88 +1,181 @@
#!/usr/bin/env python3
from flask import Blueprint, jsonify, request
from flask_restful import Resource, Api
"""Users management module"""
import subprocess
import portalocker
import json
import re
import portalocker
from flask_restful import Resource, reqparse
from selfprivacy_api import resources
api_users = Blueprint("api_users", __name__)
api = Api(api_users)
# Create a new user
class Users(Resource):
def post(self):
rawPassword = request.headers.get("X-Password")
hashingCommand = """
mkpasswd -m sha-512 {0}
""".format(
rawPassword
)
passwordHashProcessDescriptor = subprocess.Popen(
hashingCommand, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
hashedPassword = passwordHashProcessDescriptor.communicate()[0]
hashedPassword = hashedPassword.decode("ascii")
hashedPassword = hashedPassword.rstrip()
"""Users management"""
with open("/etc/nixos/userdata/userdata.json", "r+", encoding="utf8") as f:
portalocker.lock(f, portalocker.LOCK_EX)
def get(self):
"""
Get a list of users
---
tags:
- Users
security:
- bearerAuth: []
responses:
200:
description: A list of users
401:
description: Unauthorized
"""
with open(
"/etc/nixos/userdata/userdata.json", "r", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_SH)
try:
data = json.load(f)
# Return 400 if username is not provided
if request.headers.get("X-User") is None:
return {"error": "username is required"}, 400
# Return 400 if password is not provided
if request.headers.get("X-Password") is None:
return {"error": "password is required"}, 400
# Check is username passes regex
if not re.match(r"^[a-z_][a-z0-9_]+$", request.headers.get("X-User")):
return {"error": "username must be alphanumeric"}, 400
# Check if username less than 32 characters
if len(request.headers.get("X-User")) > 32:
return {"error": "username must be less than 32 characters"}, 400
data = json.load(userdata_file)
users = []
for user in data["users"]:
users.append(user["username"])
finally:
portalocker.unlock(userdata_file)
return users
def post(self):
"""
Create a new user
---
consumes:
- application/json
tags:
- Users
security:
- bearerAuth: []
parameters:
- in: body
name: user
required: true
description: User to create
schema:
type: object
required:
- username
- password
properties:
username:
type: string
description: Unix username. Must be alphanumeric and less than 32 characters
password:
type: string
description: Unix password.
responses:
201:
description: Created user
400:
description: Bad request
401:
description: Unauthorized
409:
description: User already exists
"""
parser = reqparse.RequestParser(bundle_errors=True)
parser.add_argument("username", type=str, required=True)
parser.add_argument("password", type=str, required=True)
args = parser.parse_args()
hashing_command = ["mkpasswd", "-m", "sha-512", args["password"]]
password_hash_process_descriptor = subprocess.Popen(
hashing_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
hashed_password = password_hash_process_descriptor.communicate()[0]
hashed_password = hashed_password.decode("ascii")
hashed_password = hashed_password.rstrip()
# Check is username passes regex
if not re.match(r"^[a-z_][a-z0-9_]+$", args["username"]):
return {"error": "username must be alphanumeric"}, 400
# Check if username less than 32 characters
if len(args["username"]) > 32:
return {"error": "username must be less than 32 characters"}, 400
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(userdata_file)
# Return 400 if user already exists
for user in data["users"]:
if user["username"] == request.headers.get("X-User"):
return {"error": "User already exists"}, 400
if user["username"] == args["username"]:
return {"error": "User already exists"}, 409
if "users" not in data:
data["users"] = []
data["users"].append(
{
"username": request.headers.get("X-User"),
"hashedPassword": hashedPassword,
"username": args["username"],
"hashedPassword": hashed_password,
}
)
f.seek(0)
json.dump(data, f, indent=4)
f.truncate()
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(f)
portalocker.unlock(userdata_file)
return {"result": 0}
return {"result": 0, "username": args["username"]}, 201
def delete(self):
with open("/etc/nixos/userdata/userdata.json", "r+", encoding="utf8") as f:
portalocker.lock(f, portalocker.LOCK_EX)
class User(Resource):
"""Single user managment"""
def delete(self, username):
"""
Delete a user
---
tags:
- Users
security:
- bearerAuth: []
parameters:
- in: path
name: username
required: true
description: User to delete
type: string
responses:
200:
description: Deleted user
400:
description: Bad request
401:
description: Unauthorized
404:
description: User not found
"""
with open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
) as userdata_file:
portalocker.lock(userdata_file, portalocker.LOCK_EX)
try:
data = json.load(f)
data = json.load(userdata_file)
# Return 400 if username is not provided
if request.headers.get("X-User") is None:
if username is None:
return {"error": "username is required"}, 400
if username == data["username"]:
return {"error": "Cannot delete root user"}, 400
# Return 400 if user does not exist
for user in data["users"]:
if user["username"] == request.headers.get("X-User"):
if user["username"] == username:
data["users"].remove(user)
break
else:
return {"error": "User does not exist"}, 400
return {"error": "User does not exist"}, 404
f.seek(0)
json.dump(data, f, indent=4)
f.truncate()
userdata_file.seek(0)
json.dump(data, userdata_file, indent=4)
userdata_file.truncate()
finally:
portalocker.unlock(f)
portalocker.unlock(userdata_file)
return {"result": 0}
return {"result": 0, "username": username}

View File

@ -1,7 +1,9 @@
#!/usr/bin/env python3
"""Various utility functions"""
# Get domain from /var/domain without trailing new line
def get_domain():
with open("/var/domain", "r") as f:
domain = f.readline().rstrip()
"""Get domain from /var/domain without trailing new line"""
with open("/var/domain", "r", encoding="utf-8") as domain_file:
domain = domain_file.readline().rstrip()
return domain

View File

@ -1,8 +1,10 @@
from setuptools import setup, find_packages
setup(
name='selfprivacy_api',
version='1.1.0',
name="selfprivacy_api",
version="1.1.0",
packages=find_packages(),
scripts=['selfprivacy_api/app.py',],
scripts=[
"selfprivacy_api/app.py",
],
)