# chat/consumers.py import json from channels.db import database_sync_to_async from channels.generic.websocket import AsyncWebsocketConsumer from django.utils import timezone from .models import Chat, ChatReadStatus, Message class ChatConsumer(AsyncWebsocketConsumer): # -- CONNECT -- async def connect(self): self.chat_id = self.scope["url_route"]["kwargs"]["chat_id"] self.chat_name = f"chat_{self.chat_id}" user = self.scope["user"] if not user.is_authenticated: await self.close(code=4401) return is_member = await _is_chat_member(self.chat_id, user) if not is_member: await self.close(code=4403) return await self.channel_layer.group_add(self.chat_name, self.channel_name) await self.accept() # -- DISCONNECT -- async def disconnect(self, close_code): await self.channel_layer.group_discard(self.chat_name, self.channel_name) # -- RECEIVE -- async def receive(self, text_data): data = json.loads(text_data) user = self.scope["user"] msg_type = data.get("type") if msg_type == "new_chat_message": message = await _create_message( chat_id=self.chat_id, sender=user, content=data["message"], ) await self.channel_layer.group_send(self.chat_name, { "type": "chat.message", "message_id": message.id, "message": message.content, "sender_id": user.id, "sender": user.username, "sender_avatar": user.avatar.url if user.avatar else None, }) elif msg_type == "new_reply_chat_message": message = await _create_message( chat_id=self.chat_id, sender=user, content=data["message"], reply_to_id=data.get("reply_to_id"), ) await self.channel_layer.group_send(self.chat_name, { "type": "reply.chat.message", "message_id": message.id, "message": message.content, "reply_to_id": data.get("reply_to_id"), "sender_id": user.id, "sender": user.username, "sender_avatar": user.avatar.url if user.avatar else None, }) elif msg_type == "reaction": action, reaction = await _toggle_reaction( message_id=data["message_id"], user=user, emoji=data["emoji"], ) await self.channel_layer.group_send(self.chat_name, { "type": "message.reaction", "message_id": data["message_id"], "emoji": data["emoji"], "user": user.username, "action": action, # 'added' | 'removed' | 'switched' }) elif msg_type == "typing": await self.channel_layer.group_send(self.chat_name, { "type": "typing.status", "user": user.username, "is_typing": data.get("is_typing", True), }) elif msg_type == "stop_typing": await self.channel_layer.group_send(self.chat_name, { "type": "stop.typing", "user": user.username, }) elif msg_type == "mark_read": await _mark_read(chat_id=self.chat_id, user=user) await self.channel_layer.group_send(self.chat_name, { "type": "read.status", "user": user.username, "chat_id": int(self.chat_id), }) else: await self.send(text_data=json.dumps({"error": "Unsupported message type."})) # -- GROUP EVENT HANDLERS -- # These are called by the channel layer when another part of the system # (consumer or view) calls group_send. async def chat_message(self, event): await self.send(text_data=json.dumps({ "type": "new_chat_message", "message_id": event["message_id"], "message": event["message"], "sender_id": event["sender_id"], "sender": event["sender"], "sender_avatar": event["sender_avatar"], })) async def reply_chat_message(self, event): await self.send(text_data=json.dumps({ "type": "new_reply_chat_message", "message_id": event["message_id"], "message": event["message"], "reply_to_id": event["reply_to_id"], "sender_id": event["sender_id"], "sender": event["sender"], "sender_avatar": event["sender_avatar"], })) async def edit_message(self, event): await self.send(text_data=json.dumps({ "type": "edit_chat_message", "message_id": event["message_id"], "content": event["content"], "is_edited": event.get("is_edited", True), })) async def delete_message(self, event): await self.send(text_data=json.dumps({ "type": "delete_chat_message", "message_id": event["message_id"], })) async def message_reaction(self, event): await self.send(text_data=json.dumps({ "type": "reaction", "message_id": event["message_id"], "emoji": event["emoji"], "user": event["user"], "action": event["action"], })) async def typing_status(self, event): await self.send(text_data=json.dumps({ "type": "typing", "user": event["user"], "is_typing": event["is_typing"], })) async def stop_typing(self, event): await self.send(text_data=json.dumps({ "type": "stop_typing", "user": event["user"], })) async def read_status(self, event): await self.send(text_data=json.dumps({ "type": "read_status", "user": event["user"], "chat_id": event["chat_id"], })) # --------------------------------------------------------------------------- # DB helpers (run in thread pool via database_sync_to_async) # --------------------------------------------------------------------------- @database_sync_to_async def _is_chat_member(chat_id, user): from django.db.models import Q return Chat.objects.filter(Q(pk=chat_id), Q(members=user) | Q(owner=user)).exists() @database_sync_to_async def _create_message(chat_id, sender, content, reply_to_id=None): return Message.objects.create( chat_id=chat_id, sender=sender, content=content, reply_to_id=reply_to_id, ) @database_sync_to_async def _toggle_reaction(message_id, user, emoji): message = Message.objects.get(pk=message_id) return message.toggle_reaction(user, emoji) @database_sync_to_async def _mark_read(chat_id, user): ChatReadStatus.objects.update_or_create( user=user, chat_id=chat_id, defaults={"last_read_at": timezone.now()}, )