from celery import shared_task
from celery.utils.log import get_task_logger
from django.conf import settings
from django.utils.http import urlsafe_base64_encode
from django.utils.encoding import force_bytes
from .tokens import *
from .models import CustomUser
# Re-exported so existing imports from account.tasks still work
from notifications.tasks import send_email_with_context # noqa: F401
logger = get_task_logger(__name__)
@shared_task
def send_email_verification_task(user_id):
    """Send the e-mail verification notification to the user ``user_id``.

    Looks the user up, builds a front-end verification link from the
    base64-encoded primary key and a token obtained from
    ``user.generate_email_verification_token()``, and passes it to
    ``Notification.notify``. When no user with that primary key exists,
    the fact is logged and the task returns without notifying anyone.
    """
    from notifications.models import Notification

    try:
        account = CustomUser.objects.get(pk=user_id)
    except CustomUser.DoesNotExist:
        logger.info("send_email_verification_task: invalid user_id %s", user_id)
        return

    # Link components: encoded primary key first, then the token.
    encoded_pk = urlsafe_base64_encode(force_bytes(account.pk))
    verification_token = account.generate_email_verification_token()
    link = (
        f"{settings.FRONTEND_URL}/email-verification/"
        f"?uidb64={encoded_pk}&token={verification_token}"
    )

    # Values handed to the e-mail template.
    mail_context = {
        "action_url": link,
        "frontend_url": settings.FRONTEND_URL,
        "cta_label": "Ověřit e‑mail",
    }

    Notification.notify(
        user=account,
        title="Ověření e‑mailu",
        text="Prosím ověřte svou e-mailovou adresu kliknutím na odkaz v e-mailu.",
        action_url=link,
        template_path="email/email_verification.html",
        email_context=mail_context,
    )
@shared_task
def send_email_change_verification_task(user_id, new_email, verify_url):
    """Send the confirmation link to the NEW e-mail address.

    Args:
        user_id: Primary key of the ``CustomUser`` requesting the change.
        new_email: Address the confirmation message is sent to.
        verify_url: Confirmation link supplied by the caller; placed in the
            template context as ``action_url``.

    If no user with ``user_id`` exists, the event is logged (in the same
    format the other tasks in this module use) and nothing is sent.
    """
    from notifications.tasks import send_email_with_context

    try:
        user = CustomUser.objects.get(pk=user_id)
    except CustomUser.DoesNotExist:
        # Previously this returned silently, which made dropped e-mails
        # impossible to trace; log like the sibling tasks do.
        logger.info(
            "send_email_change_verification_task: invalid user_id %s", user_id
        )
        return

    send_email_with_context(
        recipients=new_email,
        subject="Potvrzení změny e-mailu — vontor.cz",
        template_path="email/action_confirm.html",
        context={
            "title": "Potvrďte novou e-mailovou adresu",
            "description": f"Obdrželi jsme žádost o změnu e-mailové adresy na účtu {user.username}. Klikněte na tlačítko níže pro potvrzení.",
            "action_url": verify_url,
            "cta_label": "Potvrdit nový e-mail",
            "note": "Pokud jste tuto změnu nepožadovali, ignorujte tento e-mail — vaše stávající adresa zůstane aktivní.",
        },
    )
@shared_task
def send_email_change_notification_task(user_id, old_email, new_email):
    """Notify the OLD e-mail address that a change was requested.

    Args:
        user_id: Primary key of the ``CustomUser`` whose address is changing.
        old_email: Address the notice is sent to.
        new_email: Requested new address; mentioned in the message text.

    The message carries no action link (``action_url`` and ``cta_label``
    are ``None``). If no user with ``user_id`` exists, the event is logged
    (in the same format the other tasks in this module use) and nothing is
    sent.
    """
    from notifications.tasks import send_email_with_context

    try:
        user = CustomUser.objects.get(pk=user_id)
    except CustomUser.DoesNotExist:
        # Previously this returned silently, which made dropped e-mails
        # impossible to trace; log like the sibling tasks do.
        logger.info(
            "send_email_change_notification_task: invalid user_id %s", user_id
        )
        return

    send_email_with_context(
        recipients=old_email,
        subject="Žádost o změnu e-mailu — vontor.cz",
        template_path="email/action_confirm.html",
        context={
            "title": "Vaše e-mailová adresa se mění",
            "description": f"Na účtu {user.username} byla zahájena změna e-mailové adresy na {new_email}.",
            "action_url": None,
            "cta_label": None,
            "note": "Pokud jste tuto akci nespustili, okamžitě kontaktujte podporu. Změna vstoupí v platnost až po potvrzení z nové adresy.",
        },
    )
@shared_task
def send_email_test_task(email):
    """Send the test e-mail template (``email/test.html``) to ``email``.

    The front-end URL from settings is used both as the action link and as
    the ``frontend_url`` template value.
    """
    app_url = settings.FRONTEND_URL
    test_context = {
        "action_url": app_url,
        "frontend_url": app_url,
        "cta_label": "Otevřít aplikaci",
    }
    send_email_with_context(
        recipients=email,
        subject="Testovací e‑mail",
        template_path="email/test.html",
        context=test_context,
    )
@shared_task
def send_password_reset_email_task(user_id):
    """Send the password-reset notification to the user ``user_id``.

    Looks the user up, builds a front-end reset link from the
    base64-encoded primary key and a token from
    ``password_reset_token.make_token``, and passes it to
    ``Notification.notify``. When no user with that primary key exists,
    the fact is logged and the task returns without notifying anyone.
    """
    from notifications.models import Notification

    try:
        account = CustomUser.objects.get(pk=user_id)
    except CustomUser.DoesNotExist:
        logger.info("send_password_reset_email_task: invalid user_id %s", user_id)
        return

    # Link components: encoded primary key first, then the token.
    encoded_pk = urlsafe_base64_encode(force_bytes(account.pk))
    reset_token = password_reset_token.make_token(account)
    link = f"{settings.FRONTEND_URL}/reset-password/{encoded_pk}/{reset_token}"

    # Values handed to the e-mail template.
    mail_context = {
        "action_url": link,
        "frontend_url": settings.FRONTEND_URL,
        "cta_label": "Obnovit heslo",
    }

    Notification.notify(
        user=account,
        title="Obnova hesla",
        text="Byl vyžádán reset vašeho hesla. Klikněte na odkaz pro nastavení nového hesla.",
        action_url=link,
        template_path="email/password_reset.html",
        email_context=mail_context,
    )