Files
vontor-cz/backend/thirdparty/zasilkovna/client.py

182 lines
6.2 KiB
Python

from zeep import Client
from zeep.exceptions import Fault
import base64
import logging
import os
logger = logging.getLogger(__name__)
WSDL_URL = os.getenv("PACKETA_WSDL_URL", "https://www.zasilkovna.cz/api/soap.wsdl")
PACKETA_API_PASSWORD = os.getenv("PACKETA_API_PASSWORD")
zeepZasClient = Client(wsdl=WSDL_URL)
class PacketaAPI:
#TODO: zeptat se jestli nepřidat další checkovací parametry ohledně zásilkovny např: blokování podle nastavení webu
# popřemýšlet, jestli api klíče nenastavit přes configurator webu
def __getattribute__(self):
if PACKETA_API_PASSWORD is None:
raise Exception("Packeta API password is not set in environment variables.")
# ---------- CREATE PACKET METHODS ----------
def create_packet(self, number: str, name: str, surname: str, email: str, phone: str,
address_id: int, value: float, currency: str = "CZK",
weight: float = 1.0, eshop: str = "MyEshop", send_email_to_customer: bool = False):
"""
Jednoduché uložení balíku. Vrací packetId, pod kterým lze později tahat veškeré info.
Parametry:
number: číslo objednávky
name, surname, email, phone: údaje zákazníka
address_id: ID výdejního místa
value: cena/částka zásilky
currency: měna
weight: hmotnost balíku
eshop: název e-shopu
send_email_to_customer: jestli poslat potvrzovací email
Návrat:
dict: {
"packetId": str,
"number": str,
"email": str,
"phone": str,
"value": float,
"currency": str
}
"""
attributes = {
"number": number,
"name": name,
"surname": surname,
"email": email,
"phone": phone,
"addressId": address_id,
"value": value,
"currency": currency,
"weight": weight,
"eshop": eshop,
"sendEmailToCustomer": send_email_to_customer
}
try:
# Použijeme createPacketClaimWithPassword, protože umožňuje ukládat email a telefon
result = zeepZasClient.service.createPacketClaimWithPassword(PACKETA_API_PASSWORD, attributes)
return result
except Fault as e:
logger.error(f"Packeta store_packet error: {e}")
raise Exception(f"Chyba při ukládání balíku: {e}")
# ---------- CANCEL SENDING FROM STORE PACKET METHODS ----------
def cancel_packet(self, packet_id: int):
"""
Zrušení zásilky (pokud ještě nebyla fyzicky odevzdána).
packet_id = 1234567890
"""
try:
zeepZasClient.service.cancelPacket(PACKETA_API_PASSWORD, packet_id)
return {"status": "ok", "message": f"Zásilka {packet_id} byla zrušena."}
except Fault as e:
logger.error(f"Packeta cancelPacket error: {e}")
raise Exception(f"Chyba při rušení zásilky: {e}")
# ---------- INFO PACKET METHODS ----------
def packet_state_ready_to_pickup(self, packet_id: int):
"""
Vrací datum do kdy je uložená zásilka.
"""
try:
request = zeepZasClient.service.packetGetStoredUntil(PACKETA_API_PASSWORD, packet_id)
if request is None:
raise Exception(f"Zásilka {packet_id} nebyla ještě doručena.")
else:
return request # vrací datum do kdy je zásilka uložena
except Fault as e:
logger.error(f"Packeta packetGetStoredUntil error: {e}")
raise Exception(f"Chyba při získávání data uložení zásilky: {e}")
def get_packet_label_pdf(self, packet_id: int, format: str = "A6 on A6", offset: int = 0):
"""
Získání PDF štítku k zásilce.
Parametry:
packet_id (int): ID zásilky (např. 1234567890)
format (str): jeden z formátů:
- "A6 on A6"
- "A7 on A7"
- "A6 on A4"
- "A7 on A4"
- "A8 on A8"
offset (int): pozice na stránce (0 = vlevo nahoře)
Návrat:
bytes: PDF soubor (base64 dekódovaný)
"""
try:
pdf_base64 = zeepZasClient.service.packetLabelPdf(
PACKETA_API_PASSWORD, packet_id, format, offset
)
return base64.b64decode(pdf_base64)
except Fault as e:
logger.error(f"Packeta packetLabelPdf error: {e}")
raise Exception(f"Chyba při získávání štítku: {e}")
# ---------- RETURNING PACKET METHODS ----------
def get_return_routing(self, sender_label: str):
"""
Získá dva návratové routing stringy pro vrácení zásilky.
Args:
sender_label (str): Čárový kód původní zásilky (např. 'Z123456789')
Returns:
list[str]: Dva řetězce, které se mají vytisknout pro návratovou zásilku.
"""
response = self.client.service.senderGetReturnRouting(sender_label)
return list(response)
# --------- SHIPMENT METHODS ---------
def create_shipment(self, packet_ids: list):
"""
Vytvoření zásilky (shipment) z více balíků.
packet_ids = ["1234567890", "1234567891", "1234567892"]
"""
try:
result = zeepZasClient.service.createShipment(PACKETA_API_PASSWORD, packet_ids)
return result
except Fault as e:
logger.error(f"Packeta createShipment error: {e}")
raise Exception(f"Chyba při vytváření shipmentu: {e}")
def get_shipment_packets(self, shipment_id: str):
"""
Získá seznam balíků ve shipmentu podle jeho shipmentId.
"""
try:
result = zeepZasClient.service.shipmentPackets(PACKETA_API_PASSWORD, shipment_id)
return result
except Fault as e:
logger.error(f"Packeta shipmentPackets error: {e}")
raise Exception(f"Chyba při získávání balíků ve shipmentu: {e}")