mirror of
https://github.com/Cian-H/invenio-config-iform.git
synced 2025-12-23 05:21:57 +00:00
perm: implement single-ip and ip-network
* with that addition it is possible to restrict records to an special ip or an ip network
This commit is contained in:
@@ -1,6 +1,6 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
#
|
#
|
||||||
# Copyright (C) 2020-2022 Graz University of Technology.
|
# Copyright (C) 2020-2024 Graz University of Technology.
|
||||||
#
|
#
|
||||||
# invenio-config-tugraz is free software; you can redistribute it and/or
|
# invenio-config-tugraz is free software; you can redistribute it and/or
|
||||||
# modify it under the terms of the MIT License; see LICENSE file for more
|
# modify it under the terms of the MIT License; see LICENSE file for more
|
||||||
@@ -9,7 +9,6 @@
|
|||||||
"""invenio module that adds tugraz configs."""
|
"""invenio module that adds tugraz configs."""
|
||||||
|
|
||||||
from .ext import InvenioConfigTugraz
|
from .ext import InvenioConfigTugraz
|
||||||
from .permissions.generators import RecordIp
|
|
||||||
from .utils import get_identity_from_user_by_email
|
from .utils import get_identity_from_user_by_email
|
||||||
|
|
||||||
__version__ = "0.12.1"
|
__version__ = "0.12.1"
|
||||||
@@ -17,6 +16,5 @@ __version__ = "0.12.1"
|
|||||||
__all__ = (
|
__all__ = (
|
||||||
"__version__",
|
"__version__",
|
||||||
"InvenioConfigTugraz",
|
"InvenioConfigTugraz",
|
||||||
"RecordIp",
|
|
||||||
"get_identity_from_user_by_email",
|
"get_identity_from_user_by_email",
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
#
|
#
|
||||||
# Copyright (C) 2020-2023 Graz University of Technology.
|
# Copyright (C) 2020-2024 Graz University of Technology.
|
||||||
#
|
#
|
||||||
# invenio-config-tugraz is free software; you can redistribute it and/or
|
# invenio-config-tugraz is free software; you can redistribute it and/or
|
||||||
# modify it under the terms of the MIT License; see LICENSE file for more
|
# modify it under the terms of the MIT License; see LICENSE file for more
|
||||||
@@ -10,23 +10,26 @@
|
|||||||
|
|
||||||
from invenio_i18n import gettext as _
|
from invenio_i18n import gettext as _
|
||||||
|
|
||||||
INVENIO_CONFIG_TUGRAZ_SHIBBOLETH = False
|
CONFIG_TUGRAZ_SHIBBOLETH = False
|
||||||
"""Set True if SAML is configured"""
|
"""Set True if SAML is configured"""
|
||||||
|
|
||||||
INVENIO_CONFIG_TUGRAZ_SINGLE_IP = []
|
CONFIG_TUGRAZ_SINGLE_IPS = []
|
||||||
"""Allows access to users whose IP address is listed.
|
"""Allows access to users whose IP address is listed.
|
||||||
|
|
||||||
INVENIO_CONFIG_TUGRAZ_SINGLE_IP =
|
INVENIO_CONFIG_TUGRAZ_SINGLE_IPS =
|
||||||
["127.0.0.1", "127.0.0.2"]
|
["127.0.0.1", "127.0.0.2"]
|
||||||
"""
|
"""
|
||||||
|
|
||||||
INVENIO_CONFIG_TUGRAZ_IP_RANGES = []
|
CONFIG_TUGRAZ_IP_RANGES = []
|
||||||
"""Allows access to users whose range of IP address is listed.
|
"""Allows access to users whose range of IP address is listed.
|
||||||
|
|
||||||
INVENIO_CONFIG_TUGRAZ_IP_RANGES =
|
INVENIO_CONFIG_TUGRAZ_IP_RANGES =
|
||||||
[["127.0.0.2", "127.0.0.99"], ["127.0.1.3", "127.0.1.5"]]
|
[["127.0.0.2", "127.0.0.99"], ["127.0.1.3", "127.0.1.5"]]
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
CONFIG_TUGRAZ_IP_NETWORK = ""
|
||||||
|
"""Allows access to users who are in the IP network."""
|
||||||
|
|
||||||
|
|
||||||
CONFIG_TUGRAZ_ROUTES = {
|
CONFIG_TUGRAZ_ROUTES = {
|
||||||
"guide": "/guide",
|
"guide": "/guide",
|
||||||
|
|||||||
15
invenio_config_tugraz/custom_fields/__init__.py
Normal file
15
invenio_config_tugraz/custom_fields/__init__.py
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
#
|
||||||
|
# Copyright (C) 2024 Graz University of Technology.
|
||||||
|
#
|
||||||
|
# invenio-config-tugraz is free software; you can redistribute it and/or
|
||||||
|
# modify it under the terms of the MIT License; see LICENSE file for more
|
||||||
|
# details.
|
||||||
|
|
||||||
|
"""Custom fields."""
|
||||||
|
|
||||||
|
|
||||||
|
from invenio_records_resources.services.custom_fields import BooleanCF
|
||||||
|
|
||||||
|
ip_network = BooleanCF(name="ip_network")
|
||||||
|
single_ip = BooleanCF(name="single_ip")
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
#
|
#
|
||||||
# Copyright (C) 2020-2022 Graz University of Technology.
|
# Copyright (C) 2020-2024 Graz University of Technology.
|
||||||
#
|
#
|
||||||
# invenio-config-tugraz is free software; you can redistribute it and/or
|
# invenio-config-tugraz is free software; you can redistribute it and/or
|
||||||
# modify it under the terms of the MIT License; see LICENSE file for more
|
# modify it under the terms of the MIT License; see LICENSE file for more
|
||||||
@@ -9,6 +9,7 @@
|
|||||||
"""invenio module that adds tugraz configs."""
|
"""invenio module that adds tugraz configs."""
|
||||||
|
|
||||||
from . import config
|
from . import config
|
||||||
|
from .custom_fields import ip_network, single_ip
|
||||||
|
|
||||||
|
|
||||||
class InvenioConfigTugraz(object):
|
class InvenioConfigTugraz(object):
|
||||||
@@ -22,6 +23,7 @@ class InvenioConfigTugraz(object):
|
|||||||
def init_app(self, app):
|
def init_app(self, app):
|
||||||
"""Flask application initialization."""
|
"""Flask application initialization."""
|
||||||
self.init_config(app)
|
self.init_config(app)
|
||||||
|
self.add_custom_fields(app)
|
||||||
app.extensions["invenio-config-tugraz"] = self
|
app.extensions["invenio-config-tugraz"] = self
|
||||||
|
|
||||||
def init_config(self, app):
|
def init_config(self, app):
|
||||||
@@ -30,6 +32,12 @@ class InvenioConfigTugraz(object):
|
|||||||
if k.startswith("INVENIO_CONFIG_TUGRAZ_"):
|
if k.startswith("INVENIO_CONFIG_TUGRAZ_"):
|
||||||
app.config.setdefault(k, getattr(config, k))
|
app.config.setdefault(k, getattr(config, k))
|
||||||
|
|
||||||
|
def add_custom_fields(self, app):
|
||||||
|
"""Add custom fields."""
|
||||||
|
app.config.setdefault("RDM_CUSTOM_FIELDS", [])
|
||||||
|
app.config["RDM_CUSTOM_FIELDS"].append(ip_network)
|
||||||
|
app.config["RDM_CUSTOM_FIELDS"].append(single_ip)
|
||||||
|
|
||||||
|
|
||||||
def finalize_app(app):
|
def finalize_app(app):
|
||||||
"""Finalize app."""
|
"""Finalize app."""
|
||||||
|
|||||||
@@ -45,7 +45,11 @@ method specifies those from the actor's point-of-view in search scenarios.
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from ipaddress import ip_address, ip_network
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
from flask import current_app, request
|
from flask import current_app, request
|
||||||
|
from flask_principal import Need
|
||||||
from invenio_access.permissions import any_user
|
from invenio_access.permissions import any_user
|
||||||
from invenio_records_permissions.generators import Generator
|
from invenio_records_permissions.generators import Generator
|
||||||
from invenio_search.engine import dsl
|
from invenio_search.engine import dsl
|
||||||
@@ -53,32 +57,27 @@ from invenio_search.engine import dsl
|
|||||||
from .roles import tugraz_authenticated_user
|
from .roles import tugraz_authenticated_user
|
||||||
|
|
||||||
|
|
||||||
class RecordIp(Generator):
|
class RecordSingleIP(Generator):
|
||||||
"""Allowed any user with accessing with the IP."""
|
"""Allowed any user with accessing with the IP."""
|
||||||
|
|
||||||
def needs(self, record=None, **kwargs):
|
def needs(self, record: dict | None = None, **__: dict) -> list[Need]:
|
||||||
"""Enabling Needs, Set of Needs granting permission."""
|
"""Set of Needs granting permission. Enabling Needs."""
|
||||||
if record is None:
|
if record is None:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# check if singleip is in the records restriction
|
|
||||||
is_single_ip = record.get("access", {}).get("access_right") == "singleip"
|
|
||||||
|
|
||||||
# check if the user ip is on list
|
|
||||||
visible = self.check_permission()
|
|
||||||
|
|
||||||
if not is_single_ip:
|
|
||||||
# if record does not have singleip - return any_user
|
# if record does not have singleip - return any_user
|
||||||
|
if not record.get("custom_fields", {}).get("single_ip", False):
|
||||||
return [any_user]
|
return [any_user]
|
||||||
# if record has singleip, then check the ip of user - if ip user is on list - return any_user
|
|
||||||
elif visible:
|
# if record has singleip, and the ip of the user matches the allowed ip
|
||||||
|
if self.check_permission():
|
||||||
return [any_user]
|
return [any_user]
|
||||||
else:
|
|
||||||
# non of the above - return empty
|
# non of the above - return empty
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def excludes(self, **kwargs):
|
def excludes(self, **kwargs: dict) -> list[Need]:
|
||||||
"""Preventing Needs, Set of Needs denying permission.
|
"""Set of Needs denying permission. Preventing Needs.
|
||||||
|
|
||||||
If ANY of the Needs are matched, permission is revoked.
|
If ANY of the Needs are matched, permission is revoked.
|
||||||
|
|
||||||
@@ -95,33 +94,116 @@ class RecordIp(Generator):
|
|||||||
If the same Need is returned by `needs` and `excludes`, then that
|
If the same Need is returned by `needs` and `excludes`, then that
|
||||||
Need provider is disallowed.
|
Need provider is disallowed.
|
||||||
"""
|
"""
|
||||||
|
try:
|
||||||
|
if (
|
||||||
|
kwargs["record"]["custom_fields"]["single_ip"]
|
||||||
|
and not self.check_permission()
|
||||||
|
):
|
||||||
|
return [any_user]
|
||||||
|
|
||||||
|
except KeyError:
|
||||||
|
return []
|
||||||
|
else:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def query_filter(self, *args, **kwargs):
|
def query_filter(self, *_: dict, **__: dict) -> Any: # noqa: ANN401
|
||||||
"""Filters for singleip records."""
|
"""Filter for singleip records."""
|
||||||
# check if the user ip is on list
|
if not self.check_permission():
|
||||||
visible = self.check_permission()
|
|
||||||
|
|
||||||
if not visible:
|
|
||||||
# If user ip is not on the list, and If the record contains 'singleip' will not be seen
|
# If user ip is not on the list, and If the record contains 'singleip' will not be seen
|
||||||
return ~dsl.Q("match", **{"access.access_right": "singleip"})
|
return ~dsl.Q("match", **{"custom_fields.single_ip": True})
|
||||||
|
|
||||||
# Lists all records
|
# Lists all records
|
||||||
return dsl.Q("match_all")
|
return dsl.Q("match_all")
|
||||||
|
|
||||||
def check_permission(self):
|
def check_permission(self) -> bool:
|
||||||
"""Check for User IP address in config variable."""
|
"""Check for User IP address in config variable.
|
||||||
# Get user IP
|
|
||||||
|
If the user ip is in the configured list return True.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
user_ip = request.remote_addr
|
user_ip = request.remote_addr
|
||||||
# Checks if the user IP is among single IPs
|
except RuntimeError:
|
||||||
if user_ip in current_app.config["INVENIO_CONFIG_TUGRAZ_SINGLE_IP"]:
|
return False
|
||||||
return True
|
|
||||||
|
single_ips = current_app.config["CONFIG_TUGRAZ_SINGLE_IPS"]
|
||||||
|
|
||||||
|
return user_ip in single_ips
|
||||||
|
|
||||||
|
|
||||||
|
class AllowedFromIPNetwork(Generator):
|
||||||
|
"""Allowed from ip range."""
|
||||||
|
|
||||||
|
def needs(self, record: dict | None = None, **__: dict) -> list[Need]:
|
||||||
|
"""Set of Needs granting permission. Enabling Needs."""
|
||||||
|
if record is None:
|
||||||
|
return []
|
||||||
|
|
||||||
|
# if the record doesn't have set the ip range allowance
|
||||||
|
if not record.get("custom_fields", {}).get("ip_network", False):
|
||||||
|
return [any_user]
|
||||||
|
|
||||||
|
# if the record has set the ip_range allowance and is in the range
|
||||||
|
if self.check_permission():
|
||||||
|
return [any_user]
|
||||||
|
|
||||||
|
# non of the above - return empty
|
||||||
|
return []
|
||||||
|
|
||||||
|
def excludes(self, **kwargs: dict) -> Need:
|
||||||
|
"""Set of Needs denying permission. Preventing Needs.
|
||||||
|
|
||||||
|
If ANY of the Needs are matched, permission is revoked.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
|
||||||
|
``_load_permissions()`` method from `Permission
|
||||||
|
<https://invenio-access.readthedocs.io/en/latest/api.html
|
||||||
|
#invenio_access.permissions.Permission>`_ adds by default the
|
||||||
|
``superuser_access`` Need (if tied to a User or Role) for us.
|
||||||
|
|
||||||
|
It also expands ActionNeeds into the Users/Roles that
|
||||||
|
provide them.
|
||||||
|
|
||||||
|
If the same Need is returned by `needs` and `excludes`, then that
|
||||||
|
Need provider is disallowed.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if (
|
||||||
|
kwargs["record"]["custom_fields"]["ip_network"]
|
||||||
|
and not self.check_permission()
|
||||||
|
):
|
||||||
|
return [any_user]
|
||||||
|
|
||||||
|
except KeyError:
|
||||||
|
return []
|
||||||
|
else:
|
||||||
|
return []
|
||||||
|
|
||||||
|
def query_filter(self, *_: dict, **__: dict) -> Any: # noqa: ANN401
|
||||||
|
"""Filter for ip range records."""
|
||||||
|
if not self.check_permission():
|
||||||
|
return ~dsl.Q("match", **{"custom_fields.ip_network": True})
|
||||||
|
|
||||||
|
return dsl.Q("match_all")
|
||||||
|
|
||||||
|
def check_permission(self) -> bool:
|
||||||
|
"""Check for User IP address in the configured network."""
|
||||||
|
try:
|
||||||
|
user_ip = request.remote_addr
|
||||||
|
except RuntimeError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
network = current_app.config["CONFIG_TUGRAZ_IP_NETWORK"]
|
||||||
|
|
||||||
|
try:
|
||||||
|
return ip_address(user_ip) in ip_network(network)
|
||||||
|
except ValueError:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
class TUGrazAuthenticatedUser(Generator):
|
class TUGrazAuthenticatedUser(Generator):
|
||||||
"""Generates the `tugraz_authenticated_user` role-need."""
|
"""Generates the `tugraz_authenticated_user` role-need."""
|
||||||
|
|
||||||
def needs(self, **__):
|
def needs(self, **__: dict) -> list[Need]:
|
||||||
"""Generate needs to be checked against current user identity."""
|
"""Generate needs to be checked against current user identity."""
|
||||||
return [tugraz_authenticated_user]
|
return [tugraz_authenticated_user]
|
||||||
|
|||||||
@@ -52,7 +52,7 @@ from invenio_records_permissions.generators import (
|
|||||||
from invenio_records_permissions.policies.records import RecordPermissionPolicy
|
from invenio_records_permissions.policies.records import RecordPermissionPolicy
|
||||||
from invenio_users_resources.services.permissions import UserManager
|
from invenio_users_resources.services.permissions import UserManager
|
||||||
|
|
||||||
from .generators import TUGrazAuthenticatedUser
|
from .generators import AllowedFromIPNetwork, RecordSingleIP, TUGrazAuthenticatedUser
|
||||||
|
|
||||||
|
|
||||||
class TUGrazRDMRecordPermissionPolicy(RecordPermissionPolicy):
|
class TUGrazRDMRecordPermissionPolicy(RecordPermissionPolicy):
|
||||||
@@ -87,11 +87,18 @@ class TUGrazRDMRecordPermissionPolicy(RecordPermissionPolicy):
|
|||||||
SubmissionReviewer(),
|
SubmissionReviewer(),
|
||||||
CommunityInclusionReviewers(),
|
CommunityInclusionReviewers(),
|
||||||
RecordCommunitiesAction("view"),
|
RecordCommunitiesAction("view"),
|
||||||
|
AllowedFromIPNetwork(),
|
||||||
|
RecordSingleIP(),
|
||||||
]
|
]
|
||||||
|
|
||||||
can_tugraz_authenticated = [TUGrazAuthenticatedUser(), SystemProcess()]
|
can_tugraz_authenticated = [TUGrazAuthenticatedUser(), SystemProcess()]
|
||||||
can_authenticated = can_tugraz_authenticated
|
can_authenticated = can_tugraz_authenticated
|
||||||
can_all = [AnyUser(), SystemProcess()]
|
can_all = [
|
||||||
|
AnyUser(),
|
||||||
|
SystemProcess(),
|
||||||
|
AllowedFromIPNetwork(),
|
||||||
|
RecordSingleIP(),
|
||||||
|
]
|
||||||
|
|
||||||
#
|
#
|
||||||
# Miscellaneous
|
# Miscellaneous
|
||||||
|
|||||||
@@ -15,6 +15,9 @@ from invenio_config_tugraz.permissions.policies import TUGrazRDMRecordPermission
|
|||||||
ALLOWED_DIFFERENCES = {
|
ALLOWED_DIFFERENCES = {
|
||||||
"can_authenticated",
|
"can_authenticated",
|
||||||
"can_create",
|
"can_create",
|
||||||
|
"can_search",
|
||||||
|
"can_view",
|
||||||
|
"can_all",
|
||||||
"can_search_drafts",
|
"can_search_drafts",
|
||||||
"can_tugraz_authenticated",
|
"can_tugraz_authenticated",
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user