Files
vontor-cz/backend/thirdparty/zasilkovna/client.py
Brunobrno f14c09bf7a Refactor commerce models and add configuration app
Major refactor of commerce models: restructured Carrier, Payment, and DiscountCode models, improved order total calculation, and integrated Zasilkovna and Stripe logic. Added new configuration Django app for shop settings, updated Zasilkovna and Stripe models, and fixed Zasilkovna client WSDL URL. Removed unused serializers and views in commerce, and registered new apps in settings.
2025-11-14 02:21:20 +01:00

175 lines
5.8 KiB
Python

from zeep import Client
from zeep.exceptions import Fault
import base64
import logging
import os
logger = logging.getLogger(__name__)
WSDL_URL = os.getenv("PACKETA_WSDL_URL", "https://www.zasilkovna.cz/api/soap.wsdl")
PACKETA_API_PASSWORD = os.getenv("PACKETA_API_PASSWORD")
zeepZasClient = Client(wsdl=WSDL_URL)
class PacketaAPI:
# ---------- CREATE PACKET METHODS ----------
def create_packet(self, number: str, name: str, surname: str, email: str, phone: str,
address_id: int, value: float, currency: str = "CZK",
weight: float = 1.0, eshop: str = "MyEshop", send_email_to_customer: bool = False):
"""
Jednoduché uložení balíku. Vrací packetId, pod kterým lze později tahat veškeré info.
Parametry:
number: číslo objednávky
name, surname, email, phone: údaje zákazníka
address_id: ID výdejního místa
value: cena/částka zásilky
currency: měna
weight: hmotnost balíku
eshop: název e-shopu
send_email_to_customer: jestli poslat potvrzovací email
Návrat:
dict: {
"packetId": str,
"number": str,
"email": str,
"phone": str,
"value": float,
"currency": str
}
"""
attributes = {
"number": number,
"name": name,
"surname": surname,
"email": email,
"phone": phone,
"addressId": address_id,
"value": value,
"currency": currency,
"weight": weight,
"eshop": eshop,
"sendEmailToCustomer": send_email_to_customer
}
try:
# Použijeme createPacketClaimWithPassword, protože umožňuje ukládat email a telefon
result = zeepZasClient.service.createPacketClaimWithPassword(PACKETA_API_PASSWORD, attributes)
return result
except Fault as e:
logger.error(f"Packeta store_packet error: {e}")
raise Exception(f"Chyba při ukládání balíku: {e}")
# ---------- CANCEL SENDING FROM STORE PACKET METHODS ----------
def cancel_packet(self, packet_id: int):
"""
Zrušení zásilky (pokud ještě nebyla fyzicky odevzdána).
packet_id = 1234567890
"""
try:
zeepZasClient.service.cancelPacket(PACKETA_API_PASSWORD, packet_id)
return {"status": "ok", "message": f"Zásilka {packet_id} byla zrušena."}
except Fault as e:
logger.error(f"Packeta cancelPacket error: {e}")
raise Exception(f"Chyba při rušení zásilky: {e}")
# ---------- INFO PACKET METHODS ----------
def packet_state_ready_to_pickup(self, packet_id: int):
"""
Vrací datum do kdy je uložená zásilka.
"""
try:
request = zeepZasClient.service.packetGetStoredUntil(PACKETA_API_PASSWORD, packet_id)
if request is None:
raise Exception(f"Zásilka {packet_id} nebyla ještě doručena.")
else:
return request # vrací datum do kdy je zásilka uložena
except Fault as e:
logger.error(f"Packeta packetGetStoredUntil error: {e}")
raise Exception(f"Chyba při získávání data uložení zásilky: {e}")
def get_packet_label_pdf(self, packet_id: int, format: str = "A6 on A6", offset: int = 0):
"""
Získání PDF štítku k zásilce.
Parametry:
packet_id (int): ID zásilky (např. 1234567890)
format (str): jeden z formátů:
- "A6 on A6"
- "A7 on A7"
- "A6 on A4"
- "A7 on A4"
- "A8 on A8"
offset (int): pozice na stránce (0 = vlevo nahoře)
Návrat:
bytes: PDF soubor (base64 dekódovaný)
"""
try:
pdf_base64 = zeepZasClient.service.packetLabelPdf(
PACKETA_API_PASSWORD, packet_id, format, offset
)
return base64.b64decode(pdf_base64)
except Fault as e:
logger.error(f"Packeta packetLabelPdf error: {e}")
raise Exception(f"Chyba při získávání štítku: {e}")
# ---------- RETURNING PACKET METHODS ----------
def get_return_routing(self, sender_label: str):
"""
Získá dva návratové routing stringy pro vrácení zásilky.
Args:
sender_label (str): Čárový kód původní zásilky (např. 'Z123456789')
Returns:
list[str]: Dva řetězce, které se mají vytisknout pro návratovou zásilku.
"""
response = self.client.service.senderGetReturnRouting(sender_label)
return list(response)
# --------- SHIPMENT METHODS ---------
def create_shipment(self, packet_ids: list):
"""
Vytvoření zásilky (shipment) z více balíků.
packet_ids = ["1234567890", "1234567891", "1234567892"]
"""
try:
result = zeepZasClient.service.createShipment(PACKETA_API_PASSWORD, packet_ids)
return result
except Fault as e:
logger.error(f"Packeta createShipment error: {e}")
raise Exception(f"Chyba při vytváření shipmentu: {e}")
def get_shipment_packets(self, shipment_id: str):
"""
Získá seznam balíků ve shipmentu podle jeho shipmentId.
"""
try:
result = zeepZasClient.service.shipmentPackets(PACKETA_API_PASSWORD, shipment_id)
return result
except Fault as e:
logger.error(f"Packeta shipmentPackets error: {e}")
raise Exception(f"Chyba při získávání balíků ve shipmentu: {e}")