feat(be-auth): replace JWTMiddleware with auth class, update views, add token tracking

This commit is contained in:
Marc 2024-01-02 18:59:52 -05:00
parent db46e80bd7
commit 77314a4514
Signed by: marc
GPG key ID: 048E042F22B5DC79
14 changed files with 242 additions and 113 deletions

View file

@ -40,7 +40,6 @@ MIDDLEWARE = [
"corsheaders.middleware.CorsMiddleware", "corsheaders.middleware.CorsMiddleware",
"django.middleware.common.CommonMiddleware", "django.middleware.common.CommonMiddleware",
"django.middleware.csrf.CsrfViewMiddleware", "django.middleware.csrf.CsrfViewMiddleware",
"identity.middleware.JwtMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware", "django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware", "django.contrib.messages.middleware.MessageMiddleware",
"django.middleware.clickjacking.XFrameOptionsMiddleware", "django.middleware.clickjacking.XFrameOptionsMiddleware",
@ -84,8 +83,7 @@ REST_FRAMEWORK = {
"rest_framework.renderers.JSONRenderer", "rest_framework.renderers.JSONRenderer",
], ],
"DEFAULT_AUTHENTICATION_CLASSES": [ "DEFAULT_AUTHENTICATION_CLASSES": [
"rest_framework.authentication.BasicAuthentication", "identity.authentication_classes.JwtAuthentication"
"rest_framework.authentication.SessionAuthentication",
], ],
} }

View file

@ -2,7 +2,9 @@
Global fixtures Global fixtures
""" """
import django.contrib.auth
import django.test as django_test import django.test as django_test
import django.urls
import pytest import pytest
@ -19,7 +21,15 @@ def fixture_test_user_creds():
return {"username": "testuser", "password": "testpassword"} return {"username": "testuser", "password": "testpassword"}
@pytest.fixture(name="test_user", autouse=True) @pytest.fixture(name="test_user")
def fixture_test_user(test_user_credentials):
"""Fetches the test user record and returns it."""
AuthUser = django.contrib.auth.get_user_model()
return AuthUser.objects.get(username=test_user_credentials["username"])
@pytest.fixture(name="create_test_user", autouse=True)
def fixture_create_test_user(django_user_model, test_user_credentials): def fixture_create_test_user(django_user_model, test_user_credentials):
django_user_model.objects.create_user(**test_user_credentials) django_user_model.objects.create_user(**test_user_credentials)
@ -34,5 +44,8 @@ def fixture_no_auth_client() -> django_test.Client:
def fixture_auth_client(test_user_credentials) -> django_test.Client: def fixture_auth_client(test_user_credentials) -> django_test.Client:
"""Authenticated HTTP client.""" """Authenticated HTTP client."""
client = django_test.Client() client = django_test.Client()
assert client.login(**test_user_credentials) response = client.post(
django.urls.reverse("auth-session-list"), test_user_credentials
)
assert response.status_code == 201
return client return client

View file

@ -5,7 +5,7 @@ import django.conf as django_conf
import rest_framework.viewsets as drf_viewsets import rest_framework.viewsets as drf_viewsets
import rest_framework.status as drf_status import rest_framework.status as drf_status
import rest_framework.views as drf_views import rest_framework.views as drf_views
import rest_framework.permissions as drf_permissions from rest_framework.permissions import IsAuthenticated
import files.serializers as files_serializers import files.serializers as files_serializers
import files.models as files_models import files.models as files_models
@ -47,7 +47,7 @@ class FileViewSet(drf_viewsets.ModelViewSet):
queryset = files_models.File.objects.all() queryset = files_models.File.objects.all()
serializer_class = files_serializers.FileSerializer serializer_class = files_serializers.FileSerializer
permission_classes = [drf_permissions.IsAuthenticated] permission_classes = [IsAuthenticated]
def get_queryset(self): def get_queryset(self):
return self.queryset.filter(owner_id=self.request.user.id) return self.queryset.filter(owner_id=self.request.user.id)
@ -100,8 +100,9 @@ class FileViewSet(drf_viewsets.ModelViewSet):
class FileDataView(drf_views.APIView): class FileDataView(drf_views.APIView):
"""File downloads""" """File downloads"""
permission_classes = [IsAuthenticated]
queryset = files_models.File.objects.all() queryset = files_models.File.objects.all()
permission_classes = [drf_permissions.IsAuthenticated]
def get_queryset(self): def get_queryset(self):
return self.queryset.filter(owner_id=self.request.user.id) return self.queryset.filter(owner_id=self.request.user.id)

View file

@ -16,12 +16,12 @@ pytestmark = [pytest.mark.anyio, pytest.mark.django_db]
], ],
ids=["details-data", "list", "details"], ids=["details-data", "list", "details"],
) )
def test_files_views_return_401_if_unauthenticated( def test_files_views_return_403_if_not_authenticated(
no_auth_client, route_name, route_params no_auth_client, route_name, route_params
): ):
"""The files API requires authentication.""" """The files API requires authentication."""
response = no_auth_client.get(django_urls.reverse(route_name, kwargs=route_params)) response = no_auth_client.get(django_urls.reverse(route_name, kwargs=route_params))
assert response.status_code == drf_status.HTTP_401_UNAUTHORIZED assert response.status_code == drf_status.HTTP_403_FORBIDDEN
def test_file_downloads_404_if_does_not_exist(auth_client): def test_file_downloads_404_if_does_not_exist(auth_client):

View file

@ -0,0 +1,51 @@
import logging
import django.contrib.auth
from rest_framework import authentication
import identity.jwt
from identity.models import AuthenticationToken
logger = logging.getLogger(__name__)
AuthUser = django.contrib.auth.get_user_model()
class RevokedTokenException(Exception):
pass
class JwtAuthentication(authentication.BaseAuthentication):
"""
Authentication class handling JWTs attached to requests via cookies.
A JWT is only accepted if it's not expired (i.e. can be decoded) and if
it has not been revoked (as per AuthenticationToken records). A revoked
token is declined even if the token itself has not expired yet and would
otherwise be valid.
"""
def authenticate(self, request):
jwt_cookie = request.COOKIES.get("jwt")
# No JWT, no auth.
if jwt_cookie is None:
return None
try:
decoded_token = identity.jwt.decode_token(jwt_cookie)
logger.info("Token: %s\nDecoded token: %s", jwt_cookie, decoded_token)
user = AuthUser.objects.get(pk=decoded_token["user_id"])
auth_token = AuthenticationToken.objects.get(id=decoded_token["token_id"])
if auth_token.revoked:
raise RevokedTokenException("Revoked tokens cannot be used")
return user, None
except Exception as e: # pylint: disable=broad-exception-caught
logger.exception(e, extra={"authorization_provided": jwt_cookie})
return None, None

View file

@ -0,0 +1,67 @@
import datetime
import freezegun
import pytest
from django.http import HttpRequest
import identity.jwt
from identity.models import AuthenticationToken
from identity.authentication_classes import JwtAuthentication
pytestmark = pytest.mark.django_db
def test_jwt_authentication_accepts_valid_unrevoked_tokens(test_user):
# An AuthenticationToken record exists for each issued token.
auth_token, token_data = identity.jwt.generate_token_for_user(test_user.id)
AuthenticationToken.objects.create(
id=token_data["token_id"],
user_id=token_data["user_id"],
expires_at=datetime.datetime.fromtimestamp(token_data["exp"]),
)
request = HttpRequest()
request.COOKIES["jwt"] = auth_token
user, _ = JwtAuthentication().authenticate(request)
assert user is not None
assert user.id == token_data["user_id"]
def test_jwt_authentication_declines_expired_tokens(test_user):
# Generating an expired token.
with freezegun.freeze_time("2012-01-01"):
auth_token, _ = identity.jwt.generate_token_for_user(test_user.id)
request = HttpRequest()
request.COOKIES["jwt"] = auth_token
user, _ = JwtAuthentication().authenticate(request)
assert user is None
def test_jwt_authentication_declines_revoked_tokens(test_user):
auth_token, token_data = identity.jwt.generate_token_for_user(test_user.id)
AuthenticationToken.objects.create(
id=token_data["token_id"],
user_id=token_data["user_id"],
expires_at=datetime.datetime.fromtimestamp(token_data["exp"]),
revoked=True,
)
request = HttpRequest()
request.COOKIES["jwt"] = auth_token
user, _ = JwtAuthentication().authenticate(request)
assert user is None
def test_jwt_authentication_declines_invalid_tokens():
request = HttpRequest()
request.COOKIES["jwt"] = "notatoken"
user, _ = JwtAuthentication().authenticate(request)
assert user is None

View file

@ -1,15 +1,24 @@
import datetime import datetime
import uuid import uuid
import typing
import django.conf import django.conf
import jwt import jwt
def generate_token_for_user(user_id: int) -> str: class TokenData(typing.TypedDict):
exp: int
user_id: int
token_id: str
def generate_token_for_user(user_id: int) -> tuple[str, TokenData]:
""" """
Generates an identity token for a given user. Generates an identity token for a given user.
Returns both the token data (decoded) and the encoding token string.
The token expires in JWT_EXPIRATION seconds (defined in base.settings) and The token expires in JWT_EXPIRATION seconds (defined in base.settings) and
only contains the user's ID and a token ID that can be used to track the only contains the user's ID and a token ID that can be used to track the
token once emitted. token once emitted.
@ -24,8 +33,11 @@ def generate_token_for_user(user_id: int) -> str:
"token_id": str(uuid.uuid4()), "token_id": str(uuid.uuid4()),
} }
return jwt.encode( return (
token_data, django.conf.settings.JWT_SIGNING_SECRET, algorithm="HS256" jwt.encode(
token_data, django.conf.settings.JWT_SIGNING_SECRET, algorithm="HS256"
),
token_data,
) )

View file

@ -8,7 +8,7 @@ import identity.jwt
@freezegun.freeze_time("2012-01-01") @freezegun.freeze_time("2012-01-01")
def test_generates_and_decodes_token_token(): def test_generates_and_decodes_token_token():
MOCK_USER_ID = 1 MOCK_USER_ID = 1
token = identity.jwt.generate_token_for_user(MOCK_USER_ID) token, _ = identity.jwt.generate_token_for_user(MOCK_USER_ID)
assert token is not None assert token is not None
@ -21,7 +21,7 @@ def test_token_decode_fails_if_expired():
MOCK_USER_ID = 1 MOCK_USER_ID = 1
with freezegun.freeze_time("2012-01-01"): with freezegun.freeze_time("2012-01-01"):
token = identity.jwt.generate_token_for_user(MOCK_USER_ID) token, _ = identity.jwt.generate_token_for_user(MOCK_USER_ID)
assert token is not None assert token is not None

View file

@ -1,47 +0,0 @@
import logging
import django.http
import django.contrib.auth
import identity.jwt
logger = logging.getLogger(__name__)
AuthUser = django.contrib.auth.get_user_model()
class JwtMiddleware:
"""
Middleware that handles using credentials supplied via request cookies
carrying JWTs on requests to log users in seamlessly.
"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request: django.http.HttpRequest) -> django.http.HttpResponse:
"""
If a JWT cookie is attached to the request, its token value is
retrieved, tentatively decoded and authentication details are
added to the request data if available.
On failure, no details are added.
Views are expected to handle their own verification of
whether the user details are adequate.
"""
jwt_cookie = request.COOKIES.get("jwt")
if jwt_cookie is not None:
try:
decoded_token = identity.jwt.decode_token(jwt_cookie)
logger.info("Token: %s\nDecoded token: %s", jwt_cookie, decoded_token)
user = AuthUser.objects.get(pk=decoded_token["user_id"])
request.user = user
except Exception as e: # pylint: disable=broad-exception-caught
logger.exception(e, extra={"authorization_provided": jwt_cookie})
return self.get_response(request)

View file

@ -1,48 +0,0 @@
import pytest
import django.http
import django.contrib.auth
import identity.middleware
import identity.jwt
AuthUser = django.contrib.auth.get_user_model()
class HttpRequestWithUser(django.http.HttpRequest):
"""HttpRequest type after user is added by middleware."""
user: AuthUser
@pytest.fixture(name="jwt_middleware")
def fixture_jwt_middleware():
def _noop(_: django.http.HttpRequest):
return django.http.HttpResponse()
return identity.middleware.JwtMiddleware(_noop)
def test_middleware_does_not_append_user_details_to_request_if_invalid_credentials(
jwt_middleware,
):
"""If authorization headers are present but cannot be validated, no user details."""
mock_request = HttpRequestWithUser()
mock_request.COOKIES["jwt"] = "notatoken"
jwt_middleware(mock_request)
assert not hasattr(mock_request, "user")
def test_middleware_adds_user_to_request_in_if_valid_token(
jwt_middleware, test_user_credentials
):
"""If authorization headers are present and contain a valid JWT, sets user on request."""
mock_request = HttpRequestWithUser()
test_user = AuthUser.objects.get(username=test_user_credentials["username"])
token = identity.jwt.generate_token_for_user(test_user.id)
mock_request.COOKIES["jwt"] = token
jwt_middleware(mock_request)
assert mock_request.user == test_user

View file

@ -0,0 +1,43 @@
# Generated by Django 4.2.8 on 2024-01-02 06:13
import uuid
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name="AuthenticationToken",
fields=[
(
"id",
models.UUIDField(
default=uuid.uuid4,
editable=False,
primary_key=True,
serialize=False,
),
),
("refresh_token", models.UUIDField(default=uuid.uuid4)),
("revoked", models.BooleanField(default=False)),
("created_at", models.DateTimeField(auto_now_add=True)),
("updated_at", models.DateTimeField(auto_now=True)),
("expires_at", models.DateTimeField()),
(
"user",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to=settings.AUTH_USER_MODEL,
),
),
],
),
]

View file

@ -0,0 +1,30 @@
import uuid
import datetime
from django.db import models
from django.conf import settings
class AuthenticationToken(models.Model):
"""
Tracking record for authentication tokens generated
by the application and not invalidated.
Tokens contain their own expiration date (mirrored here with
`expiration`). A token can be invalidated by the application
by flipping the `revoked` flag - if truthy, the token bearing the id
of the revoked token cannot be used and will be refused even if
not expired yet.
"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
# User associated with the token.
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
# Refresh token associated with the token.
refresh_token: str = models.UUIDField(null=False, default=uuid.uuid4)
# Whether the token was revoked by the application.
revoked: bool = models.BooleanField(default=False)
created_at: datetime.datetime = models.DateTimeField(auto_now_add=True)
updated_at: datetime.datetime = models.DateTimeField(auto_now=True)
# Expiration date of the token, according to its content.
expires_at: datetime.datetime = models.DateTimeField()

View file

@ -1,5 +1,5 @@
import logging import logging
import uuid import datetime
import django.http import django.http
import django.contrib.auth import django.contrib.auth
@ -7,6 +7,7 @@ import rest_framework.views
import rest_framework.status import rest_framework.status
import identity.jwt import identity.jwt
from identity.models import AuthenticationToken
AuthUser = django.contrib.auth.get_user_model() AuthUser = django.contrib.auth.get_user_model()
@ -46,9 +47,17 @@ class SessionListView(rest_framework.views.APIView):
if user is not None: if user is not None:
django.contrib.auth.login(request, user) django.contrib.auth.login(request, user)
token = identity.jwt.generate_token_for_user(user_id=user.id) token, token_data = identity.jwt.generate_token_for_user(user_id=user.id)
response = django.http.HttpResponse(status=201) token_tracker = AuthenticationToken.objects.create(
id=token_data["token_id"],
user_id=token_data["user_id"],
expires_at=datetime.datetime.fromtimestamp(token_data["exp"]),
)
response = django.http.JsonResponse(
{"refresh_token": token_tracker.refresh_token}, status=201
)
response.set_cookie( response.set_cookie(
"jwt", value=token, secure=False, domain="localhost", httponly=False "jwt", value=token, secure=False, domain="localhost", httponly=False