From a4327fa66910fa4bfa51de8404ac1bdcaa7887d9 Mon Sep 17 00:00:00 2001 From: Inex Code Date: Fri, 1 Mar 2024 03:21:31 +0300 Subject: [PATCH 1/3] fix(dns): Ignore link-local IPv6 address --- .../graphql/common_types/service.py | 5 +- selfprivacy_api/services/__init__.py | 20 ++++--- .../services/bitwarden/__init__.py | 35 ++++-------- selfprivacy_api/services/gitea/__init__.py | 34 ++++-------- .../services/jitsimeet/__init__.py | 36 ++++--------- .../services/mailserver/__init__.py | 38 +++++++------ .../services/nextcloud/__init__.py | 34 ++++-------- selfprivacy_api/services/ocserv/__init__.py | 27 ++-------- selfprivacy_api/services/pleroma/__init__.py | 34 ++++-------- selfprivacy_api/services/service.py | 54 ++++++++++++++----- .../services/test_service/__init__.py | 24 ++------- selfprivacy_api/utils/network.py | 8 +-- tests/test_network_utils.py | 36 +++++++++++-- tests/test_services.py | 4 +- 14 files changed, 172 insertions(+), 217 deletions(-) diff --git a/selfprivacy_api/graphql/common_types/service.py b/selfprivacy_api/graphql/common_types/service.py index 56e12b1..314e6b6 100644 --- a/selfprivacy_api/graphql/common_types/service.py +++ b/selfprivacy_api/graphql/common_types/service.py @@ -8,6 +8,7 @@ from selfprivacy_api.graphql.common_types.dns import DnsRecord from selfprivacy_api.services import get_service_by_id, get_services_by_location from selfprivacy_api.services import Service as ServiceInterface from selfprivacy_api.utils.block_devices import BlockDevices +import selfprivacy_api.utils.network as network_utils def get_usages(root: "StorageVolume") -> list["StorageUsageInterface"]: @@ -141,7 +142,9 @@ def service_to_graphql_service(service: ServiceInterface) -> Service: priority=record.priority, display_name=record.display_name, ) - for record in service.get_dns_records() + for record in service.get_dns_records( + network_utils.get_ip4(), network_utils.get_ip6() + ) ], ) diff --git a/selfprivacy_api/services/__init__.py b/selfprivacy_api/services/__init__.py index dd0a5b4..f9dfac2 100644 --- a/selfprivacy_api/services/__init__.py +++ b/selfprivacy_api/services/__init__.py @@ -56,14 +56,18 @@ def get_all_required_dns_records() -> list[ServiceDnsRecord]: ttl=3600, display_name="SelfPrivacy API", ), - ServiceDnsRecord( - type="AAAA", - name="api", - content=ip6, - ttl=3600, - display_name="SelfPrivacy API (IPv6)", - ), ] + + if ip6 is not None: + dns_records.append( + ServiceDnsRecord( + type="AAAA", + name="api", + content=ip6, + ttl=3600, + display_name="SelfPrivacy API (IPv6)", + ) + ) for service in get_enabled_services(): - dns_records += service.get_dns_records() + dns_records += service.get_dns_records(ip4, ip6) return dns_records diff --git a/selfprivacy_api/services/bitwarden/__init__.py b/selfprivacy_api/services/bitwarden/__init__.py index 1590729..f04381d 100644 --- a/selfprivacy_api/services/bitwarden/__init__.py +++ b/selfprivacy_api/services/bitwarden/__init__.py @@ -1,15 +1,14 @@ """Class representing Bitwarden service""" import base64 import subprocess -import typing +from typing import Optional, List from selfprivacy_api.jobs import Job, Jobs from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service from selfprivacy_api.services.generic_status_getter import get_service_status -from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus -from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain +from selfprivacy_api.services.service import Service, ServiceStatus +from selfprivacy_api.utils import get_domain from selfprivacy_api.utils.block_devices import BlockDevice -import selfprivacy_api.utils.network as network_utils from selfprivacy_api.services.bitwarden.icon import BITWARDEN_ICON @@ -41,11 +40,15 @@ class Bitwarden(Service): return "vaultwarden" @staticmethod - def get_url() -> typing.Optional[str]: + def get_url() -> Optional[str]: """Return service url.""" domain = get_domain() return f"https://password.{domain}" + @staticmethod + def get_subdomain() -> Optional[str]: + return "password" + @staticmethod def is_movable() -> bool: return True @@ -96,29 +99,9 @@ class Bitwarden(Service): return "" @staticmethod - def get_folders() -> typing.List[str]: + def get_folders() -> List[str]: return ["/var/lib/bitwarden", "/var/lib/bitwarden_rs"] - @staticmethod - def get_dns_records() -> typing.List[ServiceDnsRecord]: - """Return list of DNS records for Bitwarden service.""" - return [ - ServiceDnsRecord( - type="A", - name="password", - content=network_utils.get_ip4(), - ttl=3600, - display_name="Bitwarden", - ), - ServiceDnsRecord( - type="AAAA", - name="password", - content=network_utils.get_ip6(), - ttl=3600, - display_name="Bitwarden (IPv6)", - ), - ] - def move_to_volume(self, volume: BlockDevice) -> Job: job = Jobs.add( type_id="services.bitwarden.move", diff --git a/selfprivacy_api/services/gitea/__init__.py b/selfprivacy_api/services/gitea/__init__.py index 9b6f80f..bf3f5d2 100644 --- a/selfprivacy_api/services/gitea/__init__.py +++ b/selfprivacy_api/services/gitea/__init__.py @@ -1,15 +1,14 @@ """Class representing Bitwarden service""" import base64 import subprocess -import typing +from typing import Optional, List from selfprivacy_api.jobs import Job, Jobs from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service from selfprivacy_api.services.generic_status_getter import get_service_status -from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus -from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain +from selfprivacy_api.services.service import Service, ServiceStatus +from selfprivacy_api.utils import get_domain from selfprivacy_api.utils.block_devices import BlockDevice -import selfprivacy_api.utils.network as network_utils from selfprivacy_api.services.gitea.icon import GITEA_ICON @@ -37,11 +36,15 @@ class Gitea(Service): return base64.b64encode(GITEA_ICON.encode("utf-8")).decode("utf-8") @staticmethod - def get_url() -> typing.Optional[str]: + def get_url() -> Optional[str]: """Return service url.""" domain = get_domain() return f"https://git.{domain}" + @staticmethod + def get_subdomain() -> Optional[str]: + return "git" + @staticmethod def is_movable() -> bool: return True @@ -91,28 +94,9 @@ class Gitea(Service): return "" @staticmethod - def get_folders() -> typing.List[str]: + def get_folders() -> List[str]: return ["/var/lib/gitea"] - @staticmethod - def get_dns_records() -> typing.List[ServiceDnsRecord]: - return [ - ServiceDnsRecord( - type="A", - name="git", - content=network_utils.get_ip4(), - ttl=3600, - display_name="Gitea", - ), - ServiceDnsRecord( - type="AAAA", - name="git", - content=network_utils.get_ip6(), - ttl=3600, - display_name="Gitea (IPv6)", - ), - ] - def move_to_volume(self, volume: BlockDevice) -> Job: job = Jobs.add( type_id="services.gitea.move", diff --git a/selfprivacy_api/services/jitsimeet/__init__.py b/selfprivacy_api/services/jitsimeet/__init__.py index 30663f9..3531181 100644 --- a/selfprivacy_api/services/jitsimeet/__init__.py +++ b/selfprivacy_api/services/jitsimeet/__init__.py @@ -1,16 +1,15 @@ """Class representing Jitsi Meet service""" import base64 import subprocess -import typing +from typing import Optional, List from selfprivacy_api.jobs import Job from selfprivacy_api.services.generic_status_getter import ( get_service_status_from_several_units, ) -from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus -from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain +from selfprivacy_api.services.service import Service, ServiceStatus +from selfprivacy_api.utils import get_domain from selfprivacy_api.utils.block_devices import BlockDevice -import selfprivacy_api.utils.network as network_utils from selfprivacy_api.services.jitsimeet.icon import JITSI_ICON @@ -38,11 +37,15 @@ class JitsiMeet(Service): return base64.b64encode(JITSI_ICON.encode("utf-8")).decode("utf-8") @staticmethod - def get_url() -> typing.Optional[str]: + def get_url() -> Optional[str]: """Return service url.""" domain = get_domain() return f"https://meet.{domain}" + @staticmethod + def get_subdomain() -> Optional[str]: + return "meet" + @staticmethod def is_movable() -> bool: return False @@ -98,29 +101,8 @@ class JitsiMeet(Service): return "" @staticmethod - def get_folders() -> typing.List[str]: + def get_folders() -> List[str]: return ["/var/lib/jitsi-meet"] - @staticmethod - def get_dns_records() -> typing.List[ServiceDnsRecord]: - ip4 = network_utils.get_ip4() - ip6 = network_utils.get_ip6() - return [ - ServiceDnsRecord( - type="A", - name="meet", - content=ip4, - ttl=3600, - display_name="Jitsi", - ), - ServiceDnsRecord( - type="AAAA", - name="meet", - content=ip6, - ttl=3600, - display_name="Jitsi (IPv6)", - ), - ] - def move_to_volume(self, volume: BlockDevice) -> Job: raise NotImplementedError("jitsi-meet service is not movable") diff --git a/selfprivacy_api/services/mailserver/__init__.py b/selfprivacy_api/services/mailserver/__init__.py index 536b444..5f7365f 100644 --- a/selfprivacy_api/services/mailserver/__init__.py +++ b/selfprivacy_api/services/mailserver/__init__.py @@ -2,7 +2,7 @@ import base64 import subprocess -import typing +from typing import Optional, List from selfprivacy_api.jobs import Job, Jobs from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service @@ -12,7 +12,6 @@ from selfprivacy_api.services.generic_status_getter import ( from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus from selfprivacy_api import utils from selfprivacy_api.utils.block_devices import BlockDevice -import selfprivacy_api.utils.network as network_utils from selfprivacy_api.services.mailserver.icon import MAILSERVER_ICON @@ -40,10 +39,14 @@ class MailServer(Service): return "virtualMail" @staticmethod - def get_url() -> typing.Optional[str]: + def get_url() -> Optional[str]: """Return service url.""" return None + @staticmethod + def get_subdomain() -> str | None: + return None + @staticmethod def is_movable() -> bool: return True @@ -102,20 +105,18 @@ class MailServer(Service): return "" @staticmethod - def get_folders() -> typing.List[str]: + def get_folders() -> List[str]: return ["/var/vmail", "/var/sieve"] - @staticmethod - def get_dns_records() -> typing.List[ServiceDnsRecord]: + @classmethod + def get_dns_records(cls, ip4: str, ip6: Optional[str]) -> List[ServiceDnsRecord]: domain = utils.get_domain() dkim_record = utils.get_dkim_key(domain) - ip4 = network_utils.get_ip4() - ip6 = network_utils.get_ip6() if dkim_record is None: return [] - return [ + dns_records = [ ServiceDnsRecord( type="A", name=domain, @@ -123,13 +124,6 @@ class MailServer(Service): ttl=3600, display_name="Root Domain", ), - ServiceDnsRecord( - type="AAAA", - name=domain, - content=ip6, - ttl=3600, - display_name="Root Domain (IPv6)", - ), ServiceDnsRecord( type="MX", name=domain, @@ -161,6 +155,18 @@ class MailServer(Service): ), ] + if ip6 is not None: + dns_records.append( + ServiceDnsRecord( + type="AAAA", + name=domain, + content=ip6, + ttl=3600, + display_name="Root Domain (IPv6)", + ), + ) + return dns_records + def move_to_volume(self, volume: BlockDevice) -> Job: job = Jobs.add( type_id="services.email.move", diff --git a/selfprivacy_api/services/nextcloud/__init__.py b/selfprivacy_api/services/nextcloud/__init__.py index 0da6dd9..17e72d7 100644 --- a/selfprivacy_api/services/nextcloud/__init__.py +++ b/selfprivacy_api/services/nextcloud/__init__.py @@ -1,14 +1,13 @@ """Class representing Nextcloud service.""" import base64 import subprocess -import typing +from typing import Optional, List from selfprivacy_api.jobs import Job, Jobs from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service from selfprivacy_api.services.generic_status_getter import get_service_status -from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus -from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain +from selfprivacy_api.services.service import Service, ServiceStatus +from selfprivacy_api.utils import get_domain from selfprivacy_api.utils.block_devices import BlockDevice -import selfprivacy_api.utils.network as network_utils from selfprivacy_api.services.nextcloud.icon import NEXTCLOUD_ICON @@ -36,11 +35,15 @@ class Nextcloud(Service): return base64.b64encode(NEXTCLOUD_ICON.encode("utf-8")).decode("utf-8") @staticmethod - def get_url() -> typing.Optional[str]: + def get_url() -> Optional[str]: """Return service url.""" domain = get_domain() return f"https://cloud.{domain}" + @staticmethod + def get_subdomain() -> Optional[str]: + return "cloud" + @staticmethod def is_movable() -> bool: return True @@ -96,28 +99,9 @@ class Nextcloud(Service): return "" @staticmethod - def get_folders() -> typing.List[str]: + def get_folders() -> List[str]: return ["/var/lib/nextcloud"] - @staticmethod - def get_dns_records() -> typing.List[ServiceDnsRecord]: - return [ - ServiceDnsRecord( - type="A", - name="cloud", - content=network_utils.get_ip4(), - ttl=3600, - display_name="Nextcloud", - ), - ServiceDnsRecord( - type="AAAA", - name="cloud", - content=network_utils.get_ip6(), - ttl=3600, - display_name="Nextcloud (IPv6)", - ), - ] - def move_to_volume(self, volume: BlockDevice) -> Job: job = Jobs.add( type_id="services.nextcloud.move", diff --git a/selfprivacy_api/services/ocserv/__init__.py b/selfprivacy_api/services/ocserv/__init__.py index a28358d..a7cfa06 100644 --- a/selfprivacy_api/services/ocserv/__init__.py +++ b/selfprivacy_api/services/ocserv/__init__.py @@ -4,11 +4,9 @@ import subprocess import typing from selfprivacy_api.jobs import Job from selfprivacy_api.services.generic_status_getter import get_service_status -from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus -from selfprivacy_api.utils import ReadUserData, WriteUserData +from selfprivacy_api.services.service import Service, ServiceStatus from selfprivacy_api.utils.block_devices import BlockDevice from selfprivacy_api.services.ocserv.icon import OCSERV_ICON -import selfprivacy_api.utils.network as network_utils class Ocserv(Service): @@ -35,6 +33,10 @@ class Ocserv(Service): """Return service url.""" return None + @staticmethod + def get_subdomain() -> typing.Optional[str]: + return "vpn" + @staticmethod def is_movable() -> bool: return False @@ -79,25 +81,6 @@ class Ocserv(Service): def get_logs(): return "" - @staticmethod - def get_dns_records() -> typing.List[ServiceDnsRecord]: - return [ - ServiceDnsRecord( - type="A", - name="vpn", - content=network_utils.get_ip4(), - ttl=3600, - display_name="OpenConnect VPN", - ), - ServiceDnsRecord( - type="AAAA", - name="vpn", - content=network_utils.get_ip6(), - ttl=3600, - display_name="OpenConnect VPN (IPv6)", - ), - ] - @staticmethod def get_folders() -> typing.List[str]: return [] diff --git a/selfprivacy_api/services/pleroma/__init__.py b/selfprivacy_api/services/pleroma/__init__.py index 1aae50e..cd21178 100644 --- a/selfprivacy_api/services/pleroma/__init__.py +++ b/selfprivacy_api/services/pleroma/__init__.py @@ -1,15 +1,14 @@ """Class representing Nextcloud service.""" import base64 import subprocess -import typing +from typing import Optional, List from selfprivacy_api.jobs import Job, Jobs from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service from selfprivacy_api.services.generic_status_getter import get_service_status -from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus +from selfprivacy_api.services.service import Service, ServiceStatus from selfprivacy_api.services.owned_path import OwnedPath -from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain +from selfprivacy_api.utils import get_domain from selfprivacy_api.utils.block_devices import BlockDevice -import selfprivacy_api.utils.network as network_utils from selfprivacy_api.services.pleroma.icon import PLEROMA_ICON @@ -33,11 +32,15 @@ class Pleroma(Service): return base64.b64encode(PLEROMA_ICON.encode("utf-8")).decode("utf-8") @staticmethod - def get_url() -> typing.Optional[str]: + def get_url() -> Optional[str]: """Return service url.""" domain = get_domain() return f"https://social.{domain}" + @staticmethod + def get_subdomain() -> Optional[str]: + return "social" + @staticmethod def is_movable() -> bool: return True @@ -82,7 +85,7 @@ class Pleroma(Service): return "" @staticmethod - def get_owned_folders() -> typing.List[OwnedPath]: + def get_owned_folders() -> List[OwnedPath]: """ Get a list of occupied directories with ownership info pleroma has folders that are owned by different users @@ -100,25 +103,6 @@ class Pleroma(Service): ), ] - @staticmethod - def get_dns_records() -> typing.List[ServiceDnsRecord]: - return [ - ServiceDnsRecord( - type="A", - name="social", - content=network_utils.get_ip4(), - ttl=3600, - display_name="Pleroma", - ), - ServiceDnsRecord( - type="AAAA", - name="social", - content=network_utils.get_ip6(), - ttl=3600, - display_name="Pleroma (IPv6)", - ), - ] - def move_to_volume(self, volume: BlockDevice) -> Job: job = Jobs.add( type_id="services.pleroma.move", diff --git a/selfprivacy_api/services/service.py b/selfprivacy_api/services/service.py index f41c821..0cca38a 100644 --- a/selfprivacy_api/services/service.py +++ b/selfprivacy_api/services/service.py @@ -1,7 +1,7 @@ """Abstract class for a service running on a server""" from abc import ABC, abstractmethod from enum import Enum -import typing +from typing import List, Optional from pydantic import BaseModel from selfprivacy_api.jobs import Job @@ -12,7 +12,7 @@ from selfprivacy_api.services.generic_size_counter import get_storage_usage from selfprivacy_api.services.owned_path import OwnedPath from selfprivacy_api import utils from selfprivacy_api.utils.waitloop import wait_until_true -from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain +from selfprivacy_api.utils import ReadUserData, WriteUserData DEFAULT_START_STOP_TIMEOUT = 5 * 60 @@ -35,7 +35,7 @@ class ServiceDnsRecord(BaseModel): content: str ttl: int display_name: str - priority: typing.Optional[int] = None + priority: Optional[int] = None class Service(ABC): @@ -78,14 +78,22 @@ class Service(ABC): @staticmethod @abstractmethod - def get_url() -> typing.Optional[str]: + def get_url() -> Optional[str]: """ The url of the service if it is accessible from the internet browser. """ pass + @staticmethod + @abstractmethod + def get_subdomain() -> Optional[str]: + """ + The assigned primary subdomain for this service. + """ + pass + @classmethod - def get_user(cls) -> typing.Optional[str]: + def get_user(cls) -> Optional[str]: """ The user that owns the service's files. Defaults to the service's id. @@ -93,7 +101,7 @@ class Service(ABC): return cls.get_id() @classmethod - def get_group(cls) -> typing.Optional[str]: + def get_group(cls) -> Optional[str]: """ The group that owns the service's files. Defaults to the service's user. @@ -209,10 +217,32 @@ class Service(ABC): storage_used += get_storage_usage(folder) return storage_used - @staticmethod - @abstractmethod - def get_dns_records() -> typing.List[ServiceDnsRecord]: - pass + @classmethod + def get_dns_records(cls, ip4: str, ip6: Optional[str]) -> List[ServiceDnsRecord]: + subdomain = cls.get_subdomain() + display_name = cls.get_display_name() + if subdomain is None: + return [] + dns_records = [ + ServiceDnsRecord( + type="A", + name=subdomain, + content=ip4, + ttl=3600, + display_name=display_name, + ) + ] + if ip6 is not None: + dns_records.append( + ServiceDnsRecord( + type="AAAA", + name=subdomain, + content=ip6, + ttl=3600, + display_name=f"{display_name} (IPv6)", + ) + ) + return dns_records @classmethod def get_drive(cls) -> str: @@ -237,7 +267,7 @@ class Service(ABC): return root_device @classmethod - def get_folders(cls) -> typing.List[str]: + def get_folders(cls) -> List[str]: """ get a plain list of occupied directories Default extracts info from overriden get_owned_folders() @@ -249,7 +279,7 @@ class Service(ABC): return [owned_folder.path for owned_folder in cls.get_owned_folders()] @classmethod - def get_owned_folders(cls) -> typing.List[OwnedPath]: + def get_owned_folders(cls) -> List[OwnedPath]: """ Get a list of occupied directories with ownership info Default extracts info from overriden get_folders() diff --git a/selfprivacy_api/services/test_service/__init__.py b/selfprivacy_api/services/test_service/__init__.py index 1e315f5..803896b 100644 --- a/selfprivacy_api/services/test_service/__init__.py +++ b/selfprivacy_api/services/test_service/__init__.py @@ -65,6 +65,10 @@ class DummyService(Service): domain = "test.com" return f"https://password.{domain}" + @staticmethod + def get_subdomain() -> typing.Optional[str]: + return "password" + @classmethod def is_movable(cls) -> bool: return cls.movable @@ -185,26 +189,6 @@ class DummyService(Service): def get_folders(cls) -> List[str]: return cls.folders - @staticmethod - def get_dns_records() -> typing.List[ServiceDnsRecord]: - """Return list of DNS records for Bitwarden service.""" - return [ - ServiceDnsRecord( - type="A", - name="password", - content=network_utils.get_ip4(), - ttl=3600, - display_name="Test Service", - ), - ServiceDnsRecord( - type="AAAA", - name="password", - content=network_utils.get_ip6(), - ttl=3600, - display_name="Test Service (IPv6)", - ), - ] - def move_to_volume(self, volume: BlockDevice) -> Job: job = Jobs.add( type_id=f"services.{self.get_id()}.move", diff --git a/selfprivacy_api/utils/network.py b/selfprivacy_api/utils/network.py index c1b8a2b..e0ace0e 100644 --- a/selfprivacy_api/utils/network.py +++ b/selfprivacy_api/utils/network.py @@ -17,13 +17,15 @@ def get_ip4() -> str: return ip4.group(1) if ip4 else "" -def get_ip6() -> str: +def get_ip6() -> Optional[str]: """Get IPv6 address""" try: ip6 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode( "utf-8" ) - ip6 = re.search(r"inet6 (\S+)\/\d+", ip6) + # We ignore link-local addresses + ip6 = re.search(r"inet6 (?!fe80:\S+)(\S+)\/\d+", ip6) + except subprocess.CalledProcessError: ip6 = None - return ip6.group(1) if ip6 else "" + return ip6.group(1) if ip6 else None diff --git a/tests/test_network_utils.py b/tests/test_network_utils.py index 0662584..ed4df3d 100644 --- a/tests/test_network_utils.py +++ b/tests/test_network_utils.py @@ -8,6 +8,19 @@ import pytest from selfprivacy_api.utils.network import get_ip4, get_ip6 OUTPUT_STRING = b""" +2: eth0: mtu 1500 qdisc fq_codel state UP group default qlen 1000 + link/ether 96:00:00:f1:34:ae brd ff:ff:ff:ff:ff:ff + altname enp0s3 + altname ens3 + inet 157.90.247.192/32 brd 157.90.247.192 scope global dynamic eth0 + valid_lft 46061sec preferred_lft 35261sec + inet6 fe80::9400:ff:fef1:34ae/64 scope link + valid_lft forever preferred_lft forever + inet6 2a01:4f8:c17:7e3d::2/64 scope global + valid_lft forever preferred_lft forever +""" + +OUTPUT_STRING_WITOUT_IP6 = b""" 2: eth0: mtu 1500 qdisc fq_codel state UP group default qlen 1000 link/ether 96:00:00:f1:34:ae brd ff:ff:ff:ff:ff:ff altname enp0s3 @@ -31,6 +44,14 @@ def ip_process_mock(mocker): return mock +@pytest.fixture +def ip_process_mock_without_ip6(mocker): + mock = mocker.patch( + "subprocess.check_output", autospec=True, return_value=OUTPUT_STRING_WITOUT_IP6 + ) + return mock + + @pytest.fixture def failed_ip_process_mock(mocker): mock = mocker.patch( @@ -62,24 +83,29 @@ def test_get_ip4(ip_process_mock): def test_get_ip6(ip_process_mock): """Test get IPv6 address""" ip6 = get_ip6() - assert ip6 == "fe80::9400:ff:fef1:34ae" + assert ip6 == "2a01:4f8:c17:7e3d::2" def test_failed_get_ip4(failed_ip_process_mock): ip4 = get_ip4() - assert ip4 is "" + assert ip4 == "" def test_failed_get_ip6(failed_ip_process_mock): ip6 = get_ip6() - assert ip6 is "" + assert ip6 is None + + +def test_failed_get_ip6_when_none(ip_process_mock_without_ip6): + ip6 = get_ip6() + assert ip6 is None def test_failed_subprocess_get_ip4(failed_subprocess_call): ip4 = get_ip4() - assert ip4 is "" + assert ip4 == "" def test_failed_subprocess_get_ip6(failed_subprocess_call): ip6 = get_ip6() - assert ip6 is "" + assert ip6 is None diff --git a/tests/test_services.py b/tests/test_services.py index c5eff66..9dbecd8 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -169,12 +169,12 @@ def test_enabling_disabling_writes_json( # more detailed testing of this is in test_graphql/test_system.py def test_mailserver_with_dkim_returns_some_dns(dkim_file): - records = MailServer().get_dns_records() + records = MailServer().get_dns_records("203.0.113.3", "2001:db8::1") assert len(records) > 0 def test_mailserver_with_no_dkim_returns_no_dns(no_dkim_file): - assert MailServer().get_dns_records() == [] + assert MailServer().get_dns_records("203.0.113.3", "2001:db8::1") == [] def test_services_enabled_by_default(generic_userdata): From bbec9d9d33fe435581dd9f256c53d1ed55200ce6 Mon Sep 17 00:00:00 2001 From: Inex Code Date: Fri, 1 Mar 2024 14:58:28 +0300 Subject: [PATCH 2/3] refactor: use ipaddress library for ip validation --- selfprivacy_api/services/mailserver/__init__.py | 2 +- selfprivacy_api/utils/network.py | 11 ++++++----- tests/test_services.py | 5 +++-- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/selfprivacy_api/services/mailserver/__init__.py b/selfprivacy_api/services/mailserver/__init__.py index 5f7365f..b82a793 100644 --- a/selfprivacy_api/services/mailserver/__init__.py +++ b/selfprivacy_api/services/mailserver/__init__.py @@ -44,7 +44,7 @@ class MailServer(Service): return None @staticmethod - def get_subdomain() -> str | None: + def get_subdomain() -> Optional[str]: return None @staticmethod diff --git a/selfprivacy_api/utils/network.py b/selfprivacy_api/utils/network.py index e0ace0e..5ccbd37 100644 --- a/selfprivacy_api/utils/network.py +++ b/selfprivacy_api/utils/network.py @@ -2,6 +2,7 @@ """Network utils""" import subprocess import re +import ipaddress from typing import Optional @@ -23,9 +24,9 @@ def get_ip6() -> Optional[str]: ip6 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode( "utf-8" ) - # We ignore link-local addresses - ip6 = re.search(r"inet6 (?!fe80:\S+)(\S+)\/\d+", ip6) - + ip6 = re.findall(r"inet6 (\S+)\/\d+", ip6) + for address in ip6: + if ipaddress.IPv6Address(address).is_global: + return address except subprocess.CalledProcessError: - ip6 = None - return ip6.group(1) if ip6 else None + return None diff --git a/tests/test_services.py b/tests/test_services.py index 9dbecd8..de3665a 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -168,13 +168,14 @@ def test_enabling_disabling_writes_json( # more detailed testing of this is in test_graphql/test_system.py +# Using the same random global IPs as the test_network_utils def test_mailserver_with_dkim_returns_some_dns(dkim_file): - records = MailServer().get_dns_records("203.0.113.3", "2001:db8::1") + records = MailServer().get_dns_records("157.90.247.192", "2a01:4f8:c17:7e3d::2") assert len(records) > 0 def test_mailserver_with_no_dkim_returns_no_dns(no_dkim_file): - assert MailServer().get_dns_records("203.0.113.3", "2001:db8::1") == [] + assert MailServer().get_dns_records("157.90.247.192", "2a01:4f8:c17:7e3d::2") == [] def test_services_enabled_by_default(generic_userdata): From 5616dbe77a965800f13549ff7198ef92e1934409 Mon Sep 17 00:00:00 2001 From: Inex Code Date: Fri, 1 Mar 2024 15:06:32 +0300 Subject: [PATCH 3/3] style: rename ip6 addresses variable --- selfprivacy_api/utils/network.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/selfprivacy_api/utils/network.py b/selfprivacy_api/utils/network.py index 5ccbd37..b5d76ec 100644 --- a/selfprivacy_api/utils/network.py +++ b/selfprivacy_api/utils/network.py @@ -21,11 +21,11 @@ def get_ip4() -> str: def get_ip6() -> Optional[str]: """Get IPv6 address""" try: - ip6 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode( + ip6_addresses = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode( "utf-8" ) - ip6 = re.findall(r"inet6 (\S+)\/\d+", ip6) - for address in ip6: + ip6_addresses = re.findall(r"inet6 (\S+)\/\d+", ip6_addresses) + for address in ip6_addresses: if ipaddress.IPv6Address(address).is_global: return address except subprocess.CalledProcessError: