websockets + chat app (django)

This commit is contained in:
David Bruno Vontor
2025-10-31 13:32:39 +01:00
parent 8dd4f6e731
commit 4791bbc92c
22 changed files with 398 additions and 31 deletions

View File

@@ -0,0 +1,23 @@
# Generated by Django 5.2.7 on 2025-10-31 07:36
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('account', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='customuser',
name='email_verification_sent_at',
field=models.DateTimeField(blank=True, null=True),
),
migrations.AddField(
model_name='customuser',
name='email_verification_token',
field=models.CharField(blank=True, db_index=True, max_length=128, null=True),
),
]

View File

@@ -121,4 +121,32 @@ def send_email_test_task(email):
template_name="email/test.txt",
html_template_name="email/test.html",
context=context,
)
@shared_task
def send_password_reset_email_task(user_id):
try:
user = CustomUser.objects.get(pk=user_id)
except CustomUser.DoesNotExist:
logger.info(f"Task send_password_reset_email has failed. Invalid User ID was sent.")
return 0
uid = urlsafe_base64_encode(force_bytes(user.pk))
token = password_reset_token.make_token(user)
reset_url = f"{settings.FRONTEND_URL}/reset-password/{uid}/{token}"
context = {
"user": _build_user_template_ctx(user),
"action_url": reset_url,
"frontend_url": settings.FRONTEND_URL,
"cta_label": "Obnovit heslo",
}
send_email_with_context(
recipients=user.email,
subject="Obnova hesla",
template_name="email/password_reset.txt",
html_template_name="email/password_reset.html",
context=context,
)

View File

@@ -1,3 +1,28 @@
from django.test import TestCase
from django.contrib.auth import get_user_model
from rest_framework.test import APIClient
# Create your tests here.
class UserViewAnonymousTests(TestCase):
def setUp(self):
self.client = APIClient()
User = get_user_model()
self.target_user = User.objects.create_user(
username="target",
email="target@example.com",
password="pass1234",
is_active=True,
)
def test_anonymous_update_user_is_forbidden_and_does_not_crash(self):
url = f"/api/account/users/{self.target_user.id}/"
payload = {"username": "newname", "email": self.target_user.email}
resp = self.client.put(url, data=payload, format="json")
# Expect 403 Forbidden (permission denied), but most importantly no 500 error
self.assertEqual(resp.status_code, 403, msg=f"Unexpected status: {resp.status_code}, body={getattr(resp, 'data', resp.content)}")
def test_anonymous_retrieve_user_is_unauthorized(self):
url = f"/api/account/users/{self.target_user.id}/"
resp = self.client.get(url)
# Retrieve requires authentication per view; expect 401 Unauthorized
self.assertEqual(resp.status_code, 401, msg=f"Unexpected status: {resp.status_code}, body={getattr(resp, 'data', resp.content)}")

View File

@@ -229,13 +229,17 @@ class UserView(viewsets.ModelViewSet):
# Only admin or the user themselves can update or delete
elif self.action in ['update', 'partial_update', 'destroy']:
if self.request.user.role == 'admin':
user = getattr(self, 'request', None) and getattr(self.request, 'user', None)
# Admins can modify any user
if user and getattr(user, 'is_authenticated', False) and getattr(user, 'role', None) == 'admin':
return [OnlyRolesAllowed("admin")()]
elif self.kwargs.get('pk') and str(self.request.user.id) == self.kwargs['pk']:
# Users can modify their own record
if user and getattr(user, 'is_authenticated', False) and self.kwargs.get('pk') and str(getattr(user, 'id', '')) == self.kwargs['pk']:
return [IsAuthenticated()]
else:
# fallback - deny access
return [OnlyRolesAllowed("admin")()]
# Fallback - deny access (prevents AttributeError for AnonymousUser)
return [OnlyRolesAllowed("admin")()]
# Any authenticated user can retrieve (view) any user's profile
elif self.action == 'retrieve':