feature(permissions): RecordIp generator #36

This commit is contained in:
Mojib Wali
2021-01-05 09:44:27 +01:00
committed by GitHub
parent e2b1c59c5d
commit ce97c5378c
8 changed files with 198 additions and 35 deletions

View File

@@ -43,3 +43,6 @@ recursive-include invenio_config_tugraz *.json
recursive-include invenio_config_tugraz *.key recursive-include invenio_config_tugraz *.key
recursive-include invenio_config_tugraz *.xml recursive-include invenio_config_tugraz *.xml
recursive-include invenio_config_tugraz *.gitkeep recursive-include invenio_config_tugraz *.gitkeep
# added by check-manifest
recursive-include invenio_config_tugraz *.csv

View File

@@ -8,6 +8,8 @@
"""invenio module that adds tugraz configs.""" """invenio module that adds tugraz configs."""
from os.path import abspath, dirname, join
from flask_babelex import gettext as _ from flask_babelex import gettext as _
INVENIO_CONFIG_TUGRAZ_SHIBBOLETH = True INVENIO_CONFIG_TUGRAZ_SHIBBOLETH = True
@@ -199,7 +201,22 @@ RECAPTCHA_PRIVATE_KEY = None
# ) # )
# #
# Uncomment these to enable overriding RDM permissions # Uncomment these to enable overriding RDM permissions
# RDM_RECORDS_BIBLIOGRAPHIC_SERVICE_CONFIG = ( RDM_RECORDS_BIBLIOGRAPHIC_SERVICE_CONFIG = (
# 'invenio_config_tugraz.rdm_permissions.TUGRAZBibliographicRecordServiceConfig' 'invenio_config_tugraz.rdm_permissions.TUGRAZBibliographicRecordServiceConfig'
# ) )
"""Access control configuration for records.""" """Access control configuration for records."""
# invenio-rdm-records
# =======
# See:
# https://invenio-rdm-records.readthedocs.io/en/latest/configuration.html
#
# Custom Access Right
RDM_RECORDS_CUSTOM_VOCABULARIES = {
'access_right': {
'path': join(
dirname(abspath(__file__)),
'restrictions', 'access_right', 'access_right.csv'
)
}
}

View File

@@ -153,28 +153,33 @@ The succinct encoding of the permissions for your instance gives you
from elasticsearch_dsl.query import Q from elasticsearch_dsl.query import Q
from flask import current_app, request from flask import current_app, request
from invenio_access.permissions import any_user, superuser_access
from invenio_records_permissions.generators import Generator from invenio_records_permissions.generators import Generator
class RecordIp(Generator): class RecordIp(Generator):
"""Allowed any user with accessing with the IP.""" """Allowed any user with accessing with the IP."""
# TODO: Implement def needs(self, record=None, **kwargs):
def needs(self, **kwargs): """Enabling Needs, Set of Needs granting permission."""
"""Enabling Needs, Set of Needs granting permission. if record is None:
return []
If ANY of the Needs are matched, permission is granted. # check if singleip is in the records restriction
is_single_ip = record.get("access", {}).get("access_right") == "singleip"
.. note:: # check if the user ip is on list
visible = self.check_permission()
``_load_permissions()`` method from `Permission if not is_single_ip:
<https://invenio-access.readthedocs.io/en/latest/api.html # if record does not have singleip - return any_user
#invenio_access.permissions.Permission>`_ adds by default the return [any_user]
``superuser_access`` Need (if tied to a User or Role) for us. # if record has singleip, then check the ip of user - if ip user is on list - return any_user
It also expands ActionNeeds into the Users/Roles that elif visible:
provide them. return [any_user]
""" else:
return [] # non of the above - return empty
return []
def excludes(self, **kwargs): def excludes(self, **kwargs):
"""Preventing Needs, Set of Needs denying permission. """Preventing Needs, Set of Needs denying permission.
@@ -196,19 +201,23 @@ class RecordIp(Generator):
""" """
return [] return []
def query_filter(self, **kwargs): def query_filter(self, *args, **kwargs):
"""Elasticsearch filters, List of ElasticSearch query filters. """Filters for singleip records."""
# check if the user ip is on list
visible = self.check_permission()
These filters consist of additive queries mapping to what the current if not visible:
user should be able to retrieve via search. # If user ip is not on the list, and If the record contains 'singleip' will not be seen
""" return ~Q("match", **{"access.access_right": "singleip"})
# Lists all records
return Q("match_all") return Q("match_all")
def check_permission(self): def check_permission(self):
"""Check for User IP address in config variable.""" """Check for User IP address in config variable."""
# Get user IP # Get user IP
user_ip = request.remote_addr # pragma: no cover user_ip = request.remote_addr
# Checks if the user IP is among single IPs # Checks if the user IP is among single IPs
if user_ip in current_app.config["INVENIO_CONFIG_TUGRAZ_SINGLE_IP"]: # pragma: no cover if user_ip in current_app.config["INVENIO_CONFIG_TUGRAZ_SINGLE_IP"]:
return True # pragma: no cover return True
return False # pragma: no cover return False

View File

@@ -60,7 +60,6 @@ from invenio_rdm_records.services import (
from invenio_records_permissions.generators import ( from invenio_records_permissions.generators import (
Admin, Admin,
AnyUser, AnyUser,
AnyUserIfPublic,
RecordOwners, RecordOwners,
SuperUser, SuperUser,
) )
@@ -69,15 +68,43 @@ from .generators import RecordIp
class TUGRAZPermissionPolicy(RDMRecordPermissionPolicy): class TUGRAZPermissionPolicy(RDMRecordPermissionPolicy):
"""Access control configuration for records. """Access control configuration for rdm records.
This overrides the /api/records endpoint. This overrides the origin:
https://github.com/inveniosoftware/invenio-rdm-records/blob/master/invenio_rdm_records/services/permissions.py.
""" """
# Create action given to no one (Not even superusers) bc Deposits should # Read access given to:
# be used. # TODO:
can_create = [SuperUser()] # AnyUserIfPublic : grant access if record is public
# RecordIp: grant access for single_ip
# RecordOwners: owner of records, enable once the deposit is allowed only for loged-in users.
# CURRENT:
# AnyUser
# RecordIp: grant access for single_ip
can_read = [AnyUser(), RecordIp()] # RecordOwners()
# Search access given to:
# AnyUser : grant access anyUser
# RecordIp: grant access for single_ip
can_search = [AnyUser(), RecordIp()]
# Update access given to record owners.
can_update = [RecordOwners()]
# Delete access given to admins only.
can_delete = [Admin()]
# TODO: create (AuthenticatedUser) generator
# Create action given to AuthenticatedUser
# UI - if user is loged in
# API - if user has be Access token (Bearer API-TOKEN)
# can_create = [AuthenticatedUser()]
# Associated files permissions (which are really bucket permissions)
# can_read_files = [AnyUserIfPublic(), RecordOwners()]
# can_update_files = [RecordOwners()]
class TUGRAZBibliographicRecordServiceConfig(BibliographicRecordServiceConfig): class TUGRAZBibliographicRecordServiceConfig(BibliographicRecordServiceConfig):

View File

@@ -0,0 +1,6 @@
access_right,access_right_name,icon,notes
open, Open Access, lock open
embargoed, Embargoed, ban
restricted, Restricted, key
closed, Private, lock
singleip, Single Ip, lock
1 access_right,access_right_name,icon,notes
2 open, Open Access, lock open
3 embargoed, Embargoed, ban
4 restricted, Restricted, key
5 closed, Private, lock
6 singleip, Single Ip, lock

View File

@@ -20,6 +20,7 @@ tests_require = [
"SQLAlchemy-Utils>=0.33.1,<0.36", "SQLAlchemy-Utils>=0.33.1,<0.36",
"invenio-rdm-records~=0.20.8", "invenio-rdm-records~=0.20.8",
"invenio-search[elasticsearch7]>=1.4.0", "invenio-search[elasticsearch7]>=1.4.0",
"psycopg2-binary>=2.8.6",
] ]
extras_require = { extras_require = {

View File

@@ -20,6 +20,7 @@ import pytest
from flask import Flask from flask import Flask
from flask_babelex import Babel from flask_babelex import Babel
from invenio_db import InvenioDB, db from invenio_db import InvenioDB, db
from sqlalchemy_utils.functions import create_database, database_exists, drop_database
from invenio_config_tugraz import InvenioConfigTugraz from invenio_config_tugraz import InvenioConfigTugraz
@@ -70,3 +71,93 @@ def create_app(request):
app.test_request_context().push() app.test_request_context().push()
return app return app
@pytest.fixture(scope='function')
def open_record():
"""Open record data as dict coming from the external world."""
return {
"access": {
"metadata": False,
"files": False,
"owned_by": [1],
"access_right": "open"
},
"metadata": {
"publication_date": "2020-06-01",
"resource_type": {
"type": "image",
"subtype": "image-photo"
},
# Technically not required
"creators": [{
"name": "Troy Brown",
"type": "personal"
}, {
"name": "Phillip Lester",
"type": "personal",
"identifiers": {"orcid": "0000-0002-1825-0097"},
"affiliations": [{
"name": "Carter-Morris",
"identifiers": {"ror": "03yrm5c26"}
}]
}, {
"name": "Steven Williamson",
"type": "personal",
"identifiers": {"orcid": "0000-0002-1825-0097"},
"affiliations": [{
"name": "Ritter and Sons",
"identifiers": {"ror": "03yrm5c26"}
}, {
"name": "Montgomery, Bush and Madden",
"identifiers": {"ror": "03yrm5c26"}
}]
}],
"title": "A Romans story"
}
}
@pytest.fixture(scope='function')
def singleip_record():
"""Single Ip record data as dict coming from the external world."""
return {
"access": {
"metadata": False,
"files": False,
"owned_by": [1],
"access_right": "singleip"
},
"metadata": {
"publication_date": "2020-06-01",
"resource_type": {
"type": "image",
"subtype": "image-photo"
},
# Technically not required
"creators": [{
"name": "Troy Brown",
"type": "personal"
}, {
"name": "Phillip Lester",
"type": "personal",
"identifiers": {"orcid": "0000-0002-1825-0097"},
"affiliations": [{
"name": "Carter-Morris",
"identifiers": {"ror": "03yrm5c26"}
}]
}, {
"name": "Steven Williamson",
"type": "personal",
"identifiers": {"orcid": "0000-0002-1825-0097"},
"affiliations": [{
"name": "Ritter and Sons",
"identifiers": {"ror": "03yrm5c26"}
}, {
"name": "Montgomery, Bush and Madden",
"identifiers": {"ror": "03yrm5c26"}
}]
}],
"title": "A Romans story"
}
}

View File

@@ -8,13 +8,22 @@
"""Test Generators.""" """Test Generators."""
from invenio_access.permissions import any_user
from invenio_config_tugraz.generators import RecordIp from invenio_config_tugraz.generators import RecordIp
def test_recordip(): def test_recordip(create_app, open_record, singleip_record):
"""Test Generator RecordIp.""" """Test Generator RecordIp."""
generator = RecordIp() generator = RecordIp()
open_record = open_record
singleiprec = singleip_record
assert generator.needs() == [] assert generator.needs(record=None) == []
assert generator.excludes() == [] assert generator.needs(record=open_record) == [any_user]
assert generator.query_filter().to_dict() == {"match_all": {}} assert generator.needs(record=singleiprec) == []
assert generator.excludes(record=open_record) == []
assert generator.excludes(record=open_record) == []
assert generator.query_filter().to_dict() == {'bool': {'must_not': [{'match': {'access.access_right': 'singleip'}}]}}