# chat/consumers.py import json from channels.db import database_sync_to_async from channels.generic.websocket import AsyncWebsocketConsumer from .models import Chat, Message class ChatConsumer(AsyncWebsocketConsumer): # -- CONNECT -- async def connect(self): self.chat_id = self.scope["url_route"]["kwargs"]["chat_id"] self.chat_name = f"chat_{self.chat_id}" user = self.scope["user"] if not user.is_authenticated: await self.close(code=4401) return is_member = await _is_chat_member(self.chat_id, user) if not is_member: await self.close(code=4403) return await self.channel_layer.group_add(self.chat_name, self.channel_name) await self.accept() # -- DISCONNECT -- async def disconnect(self, close_code): await self.channel_layer.group_discard(self.chat_name, self.channel_name) # -- RECEIVE -- async def receive(self, text_data): data = json.loads(text_data) user = self.scope["user"] msg_type = data.get("type") if msg_type == "new_chat_message": message = await _create_message( chat_id=self.chat_id, sender=user, content=data["message"], ) await self.channel_layer.group_send(self.chat_name, { "type": "chat.message", "message_id": message.id, "message": message.content, "sender": user.username, }) elif msg_type == "new_reply_chat_message": message = await _create_message( chat_id=self.chat_id, sender=user, content=data["message"], reply_to_id=data.get("reply_to_id"), ) await self.channel_layer.group_send(self.chat_name, { "type": "reply.chat.message", "message_id": message.id, "message": message.content, "reply_to_id": data.get("reply_to_id"), "sender": user.username, }) elif msg_type == "reaction": action, reaction = await _toggle_reaction( message_id=data["message_id"], user=user, emoji=data["emoji"], ) await self.channel_layer.group_send(self.chat_name, { "type": "message.reaction", "message_id": data["message_id"], "emoji": data["emoji"], "user": user.username, "action": action, # 'added' | 'removed' | 'switched' }) elif msg_type == "typing": await self.channel_layer.group_send(self.chat_name, { "type": "typing.status", "user": user.username, "is_typing": data.get("is_typing", True), }) elif msg_type == "stop_typing": await self.channel_layer.group_send(self.chat_name, { "type": "stop.typing", "user": user.username, }) else: await self.send(text_data=json.dumps({"error": "Unsupported message type."})) # -- GROUP EVENT HANDLERS -- # These are called by the channel layer when another part of the system # (consumer or view) calls group_send. async def chat_message(self, event): await self.send(text_data=json.dumps({ "type": "new_chat_message", "message_id": event["message_id"], "message": event["message"], "sender": event["sender"], })) async def reply_chat_message(self, event): await self.send(text_data=json.dumps({ "type": "new_reply_chat_message", "message_id": event["message_id"], "message": event["message"], "reply_to_id": event["reply_to_id"], "sender": event["sender"], })) async def edit_message(self, event): await self.send(text_data=json.dumps({ "type": "edit_chat_message", "message_id": event["message_id"], "content": event["content"], "is_edited": event.get("is_edited", True), })) async def delete_message(self, event): await self.send(text_data=json.dumps({ "type": "delete_chat_message", "message_id": event["message_id"], })) async def message_reaction(self, event): await self.send(text_data=json.dumps({ "type": "reaction", "message_id": event["message_id"], "emoji": event["emoji"], "user": event["user"], "action": event["action"], })) async def typing_status(self, event): await self.send(text_data=json.dumps({ "type": "typing", "user": event["user"], "is_typing": event["is_typing"], })) async def stop_typing(self, event): await self.send(text_data=json.dumps({ "type": "stop_typing", "user": event["user"], })) # --------------------------------------------------------------------------- # DB helpers (run in thread pool via database_sync_to_async) # --------------------------------------------------------------------------- @database_sync_to_async def _is_chat_member(chat_id, user): return Chat.objects.filter(pk=chat_id, members=user).exists() @database_sync_to_async def _create_message(chat_id, sender, content, reply_to_id=None): return Message.objects.create( chat_id=chat_id, sender=sender, content=content, reply_to_id=reply_to_id, ) @database_sync_to_async def _toggle_reaction(message_id, user, emoji): message = Message.objects.get(pk=message_id) return message.toggle_reaction(user, emoji) class ChatConsumer(AsyncWebsocketConsumer): # -- CONNECT -- async def connect(self): self.chat_id = self.scope["url_route"]["kwargs"]["chat_id"] self.chat_name = f"chat_{self.chat_id}" user = self.scope["user"] if not user.is_authenticated: await self.close(code=4401) # unauthorized return #join chat group async_to_sync(self.channel_layer.group_add)( self.chat_name, ) await self.accept() # -- DISCONNECT -- async def disconnect(self, close_code): async_to_sync(self.channel_layer.group_discard)( self.chat_name ) self.disconnect() pass # -- RECIVE -- async def receive(self, data): if data["type"] == "new_chat_message": message = data["message"] # Send message to room group async_to_sync(self.channel_layer.group_send)( self.chat_name, {"type": "chat.message", "message": message} ) elif data["type"] == "new_reply_chat_message": message = data["message"] reply_to_id = data["reply_to_id"] # Send message to room group async_to_sync(self.channel_layer.group_send)( self.chat_name, {"type": "reply.chat.message", "message": message, "reply_to_id": reply_to_id} ) elif data["type"] == "edit_chat_message": message = data["message"] # Send message to room group async_to_sync(self.channel_layer.group_send)( self.chat_name, {"type": "edit.message", "message": message} ) elif data["type"] == "delete_chat_message": message_id = data["message_id"] # Send message to room group async_to_sync(self.channel_layer.group_send)( self.chat_name, {"type": "delete.message", "message_id": message_id} ) elif data["type"] == "typing": is_typing = data["is_typing"] # Send typing status to room group async_to_sync(self.channel_layer.group_send)( self.chat_name, {"type": "typing.status", "user": self.scope["user"].username, "is_typing": is_typing} ) elif data["type"] == "stop_typing": # Send stop typing status to room group async_to_sync(self.channel_layer.group_send)( self.chat_name, {"type": "stop.typing", "user": self.scope["user"].username} ) elif data["type"] == "reaction": message_id = data["message_id"] emoji = data["emoji"] # Send reaction to room group async_to_sync(self.channel_layer.group_send)( self.chat_name, {"type": "message.reaction", "message_id": message_id, "emoji": emoji, "user": self.scope["user"].username} ) elif data["type"] == "unreaction": message_id = data["message_id"] emoji = data["emoji"] # Send unreaction to room group async_to_sync(self.channel_layer.group_send)( self.chat_name, {"type": "message.unreaction", "message_id": message_id, "emoji": emoji, "user": self.scope["user"].username} ) else: self.close(reason="Unsupported message type") # -- CUSTOM METHODS -- def send_message_to_chat_group(self, event): message = event["message"] create_new_message() self.send(text_data=json.dumps({"message": message})) def edit_message_in_chat_group(self, event): message = event["message"] self.send(text_data=json.dumps({"message": message})) # -- MESSAGES -- @database_sync_to_async def create_new_message(): return None @database_sync_to_async def create_new_reply_message(): return None @database_sync_to_async def edit_message(): return None @database_sync_to_async def delete_message(): return None # -- REACTIONS -- @database_sync_to_async def react_to_message(): return None @database_sync_to_async def unreact_to_message(): return None