Files
vontor-cz/backend/account/tasks.py
David Bruno Vontor 2592a69790 Introduce notifications app and integrate emails
Add a new notifications app (models, serializers, views, admin, tasks, consumers, routing, urls, tests) with Notification.notify for in-app websocket pushes and optional email delivery. Centralize email sending in notifications.tasks.send_email_with_context and re-export it from account.tasks for compatibility. Update account and commerce and advertisement tasks to use Notification.notify/_notify_order and the new email helper; adjust order/notification tasks to consolidate logic. Wire notifications into ASGI routing, settings and URL conf. Misc: handle OSError when importing weasyprint, add tmp/ to .gitignore, add local .claude PowerShell checks, add social.blog skeleton, and remove legacy ews-component test files.
2026-06-09 16:18:41 +02:00

81 lines
2.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from celery import shared_task
from celery.utils.log import get_task_logger
from django.conf import settings
from django.utils.http import urlsafe_base64_encode
from django.utils.encoding import force_bytes
from .tokens import *
from .models import CustomUser
# Re-exported so existing imports from account.tasks still work
from notifications.tasks import send_email_with_context # noqa: F401
# Module-level logger obtained from Celery's get_task_logger helper; shared by
# every task defined in this module.
logger = get_task_logger(__name__)
@shared_task
def send_email_verification_task(user_id):
    """Notify a user that their e-mail address needs to be verified.

    Looks up the ``CustomUser`` with primary key ``user_id``, builds a
    frontend verification link from the base64-encoded primary key and a
    token produced by ``user.generate_email_verification_token()``, and hands
    everything to ``Notification.notify`` together with the
    ``email/email_verification.html`` template.

    Args:
        user_id: Primary key of the ``CustomUser`` to notify.

    Returns:
        None. When no such user exists the miss is logged at INFO level and
        the task ends without notifying anyone.
    """
    # Function-scope import, as in the original; presumably this avoids a
    # circular import between the account and notifications apps -- confirm.
    from notifications.models import Notification

    try:
        recipient = CustomUser.objects.get(pk=user_id)
    except CustomUser.DoesNotExist:
        logger.info("send_email_verification_task: invalid user_id %s", user_id)
        return

    # The link carries both the encoded primary key and the token as query
    # parameters.
    encoded_pk = urlsafe_base64_encode(force_bytes(recipient.pk))
    verification_token = recipient.generate_email_verification_token()
    frontend_url = settings.FRONTEND_URL
    verification_link = (
        f"{frontend_url}/email-verification/"
        f"?uidb64={encoded_pk}&token={verification_token}"
    )

    template_context = {
        "action_url": verification_link,
        "frontend_url": frontend_url,
        "cta_label": "Ověřit email",
    }
    Notification.notify(
        user=recipient,
        title="Ověření emailu",
        text="Prosím ověřte svou e-mailovou adresu kliknutím na odkaz v e-mailu.",
        action_url=verification_link,
        template_path="email/email_verification.html",
        email_context=template_context,
    )
@shared_task
def send_email_test_task(email):
    """Send the test e-mail rendered from ``email/test.html``.

    Args:
        email: Forwarded unchanged as the ``recipients`` argument of
            ``send_email_with_context``. Whether that helper expects a single
            address or a collection is not visible here -- confirm against
            ``notifications.tasks``.
    """
    frontend_url = settings.FRONTEND_URL
    # Both the call-to-action link and the base URL point at the frontend root.
    template_context = {
        "action_url": frontend_url,
        "frontend_url": frontend_url,
        "cta_label": "Otevřít aplikaci",
    }
    send_email_with_context(
        recipients=email,
        subject="Testovací email",
        template_path="email/test.html",
        context=template_context,
    )
@shared_task
def send_password_reset_email_task(user_id):
    """Notify a user that a password reset was requested for their account.

    Looks up the ``CustomUser`` with primary key ``user_id``, builds a
    frontend reset link from the base64-encoded primary key and a token from
    ``password_reset_token.make_token(user)``, and hands everything to
    ``Notification.notify`` together with the ``email/password_reset.html``
    template.

    Args:
        user_id: Primary key of the ``CustomUser`` who requested the reset.

    Returns:
        None. When no such user exists the miss is logged at INFO level and
        the task ends without notifying anyone.
    """
    # Function-scope import, as in the original; presumably this avoids a
    # circular import between the account and notifications apps -- confirm.
    from notifications.models import Notification

    try:
        recipient = CustomUser.objects.get(pk=user_id)
    except CustomUser.DoesNotExist:
        logger.info("send_password_reset_email_task: invalid user_id %s", user_id)
        return

    # The link carries the encoded primary key and the token as path segments.
    encoded_pk = urlsafe_base64_encode(force_bytes(recipient.pk))
    reset_token = password_reset_token.make_token(recipient)
    frontend_url = settings.FRONTEND_URL
    reset_link = f"{frontend_url}/reset-password/{encoded_pk}/{reset_token}"

    template_context = {
        "action_url": reset_link,
        "frontend_url": frontend_url,
        "cta_label": "Obnovit heslo",
    }
    # NOTE(review): reset_link embeds the reset token and is passed as
    # action_url. Confirm that Notification.notify does not store or broadcast
    # it anywhere other than the e-mail sent to the account owner.
    Notification.notify(
        user=recipient,
        title="Obnova hesla",
        text="Byl vyžádán reset vašeho hesla. Klikněte na odkaz pro nastavení nového hesla.",
        action_url=reset_link,
        template_path="email/password_reset.html",
        email_context=template_context,
    )