import json
import logging

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from django.core.exceptions import ValidationError
from django.utils import timezone

logger = logging.getLogger(__name__)


def _get_user_notification(notification_id, user):
    """Return the notification with primary key *notification_id* owned by *user*.

    Synchronous helper; call it only from code already wrapped in
    ``database_sync_to_async``.

    Returns ``None`` when no such row exists, or when *notification_id*
    cannot be used as a primary key at all. The id comes straight from a
    client frame, so it may be missing or of the wrong type.
    """
    # Imported lazily, as the original helpers did.
    from .models import Notification

    if notification_id is None:
        return None
    try:
        return Notification.objects.get(pk=notification_id, user=user)
    except (Notification.DoesNotExist, TypeError, ValueError, ValidationError):
        # TypeError / ValueError / ValidationError: the value could not be
        # converted to the primary-key field's type (e.g. "abc" or {}).
        return None


@database_sync_to_async
def _mark_notification_read(notification_id, user):
    """Mark one of *user*'s notifications as read.

    Returns the notification's ``read_at`` timestamp as an ISO-8601 string,
    or ``None`` if the notification does not exist for this user (or the id
    is not a usable primary key).

    A notification that is already read is not modified and its existing
    timestamp is returned.
    """
    notification = _get_user_notification(notification_id, user)
    if notification is None:
        return None

    # Also backfill a missing timestamp on rows flagged as read, otherwise
    # ``read_at.isoformat()`` below would fail on ``None``.
    if not notification.is_read or notification.read_at is None:
        notification.is_read = True
        notification.read_at = timezone.now()
        notification.save(update_fields=["is_read", "read_at"])
    return notification.read_at.isoformat()


@database_sync_to_async
def _mark_all_notifications_read(user):
    """Mark every unread notification of *user* as read in one UPDATE.

    Returns the value of ``QuerySet.update()``, i.e. the number of rows
    matched by the update.
    """
    from .models import Notification

    now = timezone.now()
    unread = Notification.objects.filter(user=user, is_read=False)
    return unread.update(is_read=True, read_at=now)


@database_sync_to_async
def _delete_notification(notification_id, user):
    """Delete one of *user*'s notifications.

    Returns ``True`` if a notification was found and deleted, ``False`` if
    it does not exist for this user (or the id is not a usable primary key).
    """
    notification = _get_user_notification(notification_id, user)
    if notification is None:
        return False
    notification.delete()
    return True


class NotificationConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer for a user's notifications.

    Each authenticated connection joins the group ``notifications_<user pk>``.
    Client frames are JSON objects whose ``"type"`` is one of ``"mark_read"``,
    ``"mark_all_read"`` or ``"delete"``. ``notification.new`` events received
    from the channel layer are forwarded to the client.
    """

    async def connect(self):
        """Accept authenticated users and subscribe them to their group."""
        # ``scope`` has no "user" key when no auth middleware populated it;
        # treat that the same as an anonymous user.
        user = self.scope.get("user")
        if user is None or not user.is_authenticated:
            # 4401 lies in the 4000-4999 private-use close-code range.
            await self.close(code=4401)
            return

        self.group_name = f"notifications_{user.pk}"
        await self.channel_layer.group_add(self.group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        """Leave the notification group if this connection ever joined it."""
        # ``group_name`` is only set once authentication succeeded in connect().
        if hasattr(self, "group_name"):
            await self.channel_layer.group_discard(self.group_name, self.channel_name)

    async def receive(self, text_data=None, bytes_data=None):
        """Handle one client frame.

        Binary frames, malformed JSON, non-object payloads and unknown
        ``"type"`` values are ignored rather than raising. A reply is sent
        for ``mark_read`` / ``delete`` only when the notification was found.
        """
        if text_data is None:
            # Binary frame: this protocol is text-only.
            return

        try:
            data = json.loads(text_data)
        except (TypeError, ValueError):  # JSONDecodeError subclasses ValueError
            logger.warning("NotificationConsumer: ignoring malformed JSON frame")
            return
        if not isinstance(data, dict):
            logger.warning("NotificationConsumer: ignoring non-object JSON frame")
            return

        user = self.scope["user"]
        msg_type = data.get("type")

        if msg_type == "mark_read":
            notification_id = data.get("id")
            read_at = await _mark_notification_read(notification_id, user)
            if read_at:
                await self._send_payload({
                    "type": "notification.read",
                    "id": notification_id,
                    "read_at": read_at,
                })
        elif msg_type == "mark_all_read":
            count = await _mark_all_notifications_read(user)
            await self._send_payload({
                "type": "notification.read_all",
                "marked": count,
            })
        elif msg_type == "delete":
            notification_id = data.get("id")
            deleted = await _delete_notification(notification_id, user)
            if deleted:
                await self._send_payload({
                    "type": "notification.deleted",
                    "id": notification_id,
                })

    async def notification_new(self, event):
        """Forward a ``notification.new`` channel-layer event to the client.

        ``action_url`` is optional in the event; all other fields are
        required.
        """
        await self._send_payload({
            "type": "notification.new",
            "id": event["id"],
            "title": event["title"],
            "text": event["text"],
            "notification_type": event["notification_type"],
            "action_url": event.get("action_url"),
            "created_at": event["created_at"],
        })

    async def _send_payload(self, payload):
        """Serialize *payload* as JSON and send it as a text frame."""
        await self.send(text_data=json.dumps(payload))