from email.message import EmailMessage from urllib.parse import quote import streamlit as st import requests import pandas as pd from datetime import datetime import json from typing import Any, Dict, List, Optional import time import socket import sys import os import re import html import uuid import base64 import mimetypes # ============================================================================= # ЗАГРУЗКА ШАБЛОНОВ ПИСЕМ # ============================================================================= IMAGE_EXTS = ['png', 'jpg', 'jpeg', 'gif', 'bmp', 'tiff', 'webp'] SHIPPING_TYPES_FILE = "shipping_types(1).json" NO_INFO_TEXT = "Информация отсутствует" NO_INFO_TEXT_EN = "Information not available" _UI_DIR = os.path.dirname(os.path.abspath(__file__)) _CRITERIA_INTERFACE_LAYOUT_CACHE: Optional[Dict] = None try: from container_reference import door_clearance_summary except ImportError: def door_clearance_summary(shipment: Dict) -> str: return NO_INFO_TEXT shipping_templates = [] if os.path.exists(SHIPPING_TYPES_FILE): try: with open(SHIPPING_TYPES_FILE, "r", encoding="utf-8") as f: raw_templates = json.load(f) for t in raw_templates: if not isinstance(t, dict): continue normalized = {} for key, value in t.items(): clean_key = key.strip().strip('"').strip("'") if clean_key == "keywords" and isinstance(value, list): normalized[clean_key] = [ kw.strip().strip('"').strip("'") for kw in value if kw.strip() ] elif isinstance(value, str): normalized[clean_key] = value.strip() else: normalized[clean_key] = value shipping_templates.append(normalized) except Exception as e: st.error(f"Ошибка загрузки шаблонов писем: {e}") else: st.warning(f"Файл {SHIPPING_TYPES_FILE} не найден. Шаблоны писем недоступны.") # ============================================================================= # СЛОВАРЬ ДЛЯ ПРЕОБРАЗОВАНИЯ ТЕХНИЧЕСКИХ КЛЮЧЕЙ В ПОНЯТНЫЕ НАЗВАНИЯ # ============================================================================= def clean_email_text(text: str) -> str: """ Убирает множественные пустые строки, оставляя не более одной пустой строки между непустыми строками. """ if not text: return "" lines = text.splitlines() result = [] prev_empty = False for line in lines: stripped = line.strip() if stripped == "": if not prev_empty: result.append("") # одна пустая строка как разделитель абзацев prev_empty = True else: result.append(line.rstrip()) # убираем пробелы в конце строки prev_empty = False # Убираем пустые строки в начале и конце return "\n".join(result).strip() FIELD_LABELS = { "shipping_options": "Детали перевозки", "requested_shipping_type_names": "Запрошенные типы перевозки (модель)", "shipping_type": "Тип перевозки", "client_name": "Клиент", "incoterms": "Условия поставки Incoterms", "cargo_ready_date": "Дата готовности груза", "pickup_address": "Адрес забора груза", "cargo_value": "Стоимость груза", "package_count": "Количество грузовых мест", "total_weight_kg": "Вес груза", "dimensions": "Габариты", "total_volume_cbm": "Объём", "cargo_description": "Характер груза", "delivery_address": "Адрес доставки", "hs_code": "Код ТН ВЭД", "dangerous_goods": "Опасные свойства", "msds_required": "Требуется MSDS", "dgm_report_required": "Требуется DGM", "brand_name": "Бренд", "brand_authorization_letter": "Разрешение бренда", "transshipment_with_third_country": "Трансшипмент", "exporter_has_export_license": "Лицензия экспортёра", "additional_services": "Дополнительные услуги", "arrival_expediting_responsibility": "Кто осуществляет экспедирование в аэропорту", "special_transport_requirements": "Спецтребования к транспорту", "customs_clearance_required": "Таможенное оформление", "customs_clearance_place_export_rf": "Место оформления (экспорт РФ)", "fumigation_on_wooden_packaging": "Фумигация", "stackable_with_others": "Штабелирование с другими", "stackable_among_themselves": "Штабелирование между собой", "dangerous_goods_clarification": "Батарейки, газы, жидкости, аэрозоли в грузе (уточнение по категориям)", "batteries_packed_separately": "Батарейки упакованы отдельно", "estimated_cost": "Стоимость перевозки", "estimated_transit_time": "Срок доставки", "dimensions_str": "Габариты", "dangerous_goods_str": "Опасные свойства", "missing_fields": "Необходимая информация", "loading_port": "Порт погрузки", "discharge_port": "Порт выгрузки", "shipment_type": "Тип перевозки", "container_type": "Тип контейнера", "vehicle_type": "Тип транспорта", "temperature_range": "Температурный режим", "vehicle_dimensions": "Габариты машины", "vehicle_dimensions_str": "Габариты машины", } LABEL_TO_FIELD = {v: k for k, v in FIELD_LABELS.items()} # Заголовки секций письма контрагенту: русское имя типа -> English SHIPPING_TYPE_NAME_EN = { "Авиаперевозка": "Air freight", "Автомобильная перевозка (LTL)": "Road freight (LTL)", "Автомобильная перевозка (FTL)": "Road freight (FTL)", "Морская перевозка (LCL)": "Sea freight (LCL)", "Морская перевозка (FCL)": "Sea freight (FCL)", "Железнодорожная перевозка (LCL)": "Rail freight (LCL)", "Железнодорожная перевозка (FCL)": "Rail freight (FCL)", "Мультимодальная перевозка море + ж/д (LCL)": "Multimodal sea + rail (LCL)", "Мультимодальная перевозка море + ж/д (FCL)": "Multimodal sea + rail (FCL)", "Авиа перевозка": "Air freight", "Морская перевозка": "Sea freight", "Автомобильная перевозка": "Road freight", "ЖД перевозка": "Rail freight", } # Английские плейсхолдеры в confirmation_template: (Label) -> поле данных BRACKET_LABEL_ALIASES_EN: List[tuple[str, str]] = [ ("Pickup address", "pickup_address"), ("Delivery address", "delivery_address"), ("Port of loading", "loading_port"), ("Port of discharge", "discharge_port"), ("Cargo weight", "total_weight_kg"), ("Number of packages", "package_count"), ("Volume", "total_volume_cbm"), ("Dimensions", "dimensions_str"), ("Cargo description", "cargo_description"), ("HS code", "hs_code"), ("Dangerous properties", "dangerous_goods_str"), ("MSDS required", "msds_required"), ("Freight cost", "estimated_cost"), ("Transit time", "estimated_transit_time"), ("Shipment type", "shipment_type"), ("Container type", "container_type"), ("Vehicle type", "vehicle_type"), ("Vehicle dimensions", "vehicle_dimensions_str"), ("Temperature regime", "temperature_range"), ("Temperature range", "temperature_range"), ] GENERIC_LOCATION_WORDS = { "rf", "рф", "russia", "россия", "china", "китай", "kazakhstan", "казахстан", "belarus", "беларусь", "turkey", "турция", "india", "индия", "germany", "германия", "eu", "europe", "европа", "asia", "азия", } def is_informative_text(value: Optional[str]) -> bool: if value is None: return False text = str(value).strip() if not text: return False if text.lower() in {"n/a", "na", "none", "null", "-", "--", "нет", "unknown"}: return False return True def format_cargo_ready_date_for_display(shipment: Dict, *, empty: str = NO_INFO_TEXT) -> str: """Одна или несколько дат готовности — в одну строку через запятую.""" v = shipment.get("cargo_ready_date") if v is None: return empty if isinstance(v, list): parts = [str(x).strip() for x in v if x is not None and str(x).strip()] return ", ".join(parts) if parts else empty s = str(v).strip() return s if s else empty def is_specific_address(value: Optional[str]) -> bool: if not is_informative_text(value): return False text = str(value).strip() normalized = re.sub(r"[\s,.;:()\-_/]+", " ", text.lower()).strip() parts = [p for p in normalized.split(" ") if p] if len(parts) <= 1 and normalized in GENERIC_LOCATION_WORDS: return False return True def build_documents_summary(shipment: Dict) -> str: docs = shipment.get("documents_found", {}) or {} doc_names = { "msds": "MSDS", "dgm": "DGM report", "brand_authorization": "Авторизационное письмо бренда", } found_parts = [] for key, label in doc_names.items(): files = docs.get(key, []) or [] if files: filenames = [f.get("filename", "") for f in files if f.get("filename")] if filenames: found_parts.append(f"{label}: {', '.join(filenames)}") else: found_parts.append(f"{label}: документ найден") if not found_parts: return NO_INFO_TEXT return "; ".join(found_parts) # ============================================================================= # ОТОБРАЖЕНИЕ КОНТЕЙНЕРОВ (Nxтип без голых 40HC/20DC) # ============================================================================= _CONTAINER_BARE_ONLY = re.compile( r"^\s*(20|40|45)\s*['′'`´]?\s*(?:ft|feet|ф)?\s*" r"(dc|hc|hq|gp|dv|dry|rf|rh|reefer|ref|ot|fr|tk|tank)\s*$", re.IGNORECASE, ) _CONTAINER_NX = re.compile( r"(?\d{1,3})\s*[x×х**]\s*" r"(?P(?:20|40|45)\s*['′'`´]?\s*(?:ft|feet|ф)?\s*" r"(?:dc|hc|hq|gp|dv|dry|rf|rh|reefer|ref|ot|fr|tk|tank)\b)", re.IGNORECASE, ) def _normalize_container_token(n_str: str, type_part: str) -> str: n = int(n_str) t = re.sub(r"\s+", "", type_part.strip()) return f"{n}x{t}" def normalize_container_type_display(raw: Optional[str], *, empty: str = "") -> str: """ Из строки извлекает фрагменты вида 2x40HC; голые 40HC, 20DC не включает. Работает и без запятых между Nx-блоками в одной строке. """ if raw is None: return empty s = str(raw).strip() if not s: return empty tokens: List[str] = [] for m in _CONTAINER_NX.finditer(s): tokens.append(_normalize_container_token(m.group("n"), m.group("t"))) if tokens: return ", ".join(tokens) kept: List[str] = [] for part in re.split(r"[,;]+", s): p = part.strip() if not p: continue if _CONTAINER_BARE_ONLY.match(p): continue if _CONTAINER_NX.search(p): for m in _CONTAINER_NX.finditer(p): kept.append(_normalize_container_token(m.group("n"), m.group("t"))) continue if re.match(r"^\d+\s*[x×х**]", p, re.IGNORECASE): kept.append( re.sub(r"\s*([x×х**])\s*", "x", p, count=1, flags=re.IGNORECASE) ) return ", ".join(kept) if kept else empty def collect_extra_required_missing_for_template(shipment: Dict, template_obj: Dict) -> List[str]: """Поля из shipping_type.extra_required_fields — запрос клиенту при отсутствии данных.""" extra = template_obj.get("extra_required_fields") if isinstance(template_obj, dict) else None if not isinstance(extra, list): return [] out: List[str] = [] for field in extra: if not isinstance(field, str) or not field.strip(): continue field = field.strip() if field == "dangerous_goods_clarification": if format_dangerous_goods_summary(shipment) == NO_INFO_TEXT: out.append(field) continue if field == "stackable_with_others": if shipment.get("stackable_with_others") is None: out.append(field) continue if field == "hs_code": v = shipment.get("hs_code") if v is None or (isinstance(v, str) and not str(v).strip()): out.append(field) continue if field == "vehicle_type": v = shipment.get("vehicle_type") if v is None or (isinstance(v, str) and not str(v).strip()): out.append(field) continue if field == "container_type": v = shipment.get("container_type") if v is None or (isinstance(v, str) and not str(v).strip()): out.append(field) continue if field == "customs_clearance_place_export_rf": v = shipment.get("customs_clearance_place_export_rf") if v is None or (isinstance(v, str) and not str(v).strip()): out.append(field) continue if field == "total_volume_cbm": val = shipment.get("total_volume_cbm") if val is None or val == "": out.append(field) elif isinstance(val, (int, float)) and float(val) <= 0: out.append(field) continue return out def format_dangerous_goods_summary(shipment: Dict) -> str: """Единая строка для чек-листа, обзора и шаблонов писем.""" key_map = { "batteries": "батарейки", "gases": "газы", "liquids": "жидкости", "dry_ice": "сухой лёд", } dg_keys = ("batteries", "gases", "liquids", "dry_ice") dg = shipment.get("dangerous_goods", {}) if isinstance(dg, dict): yes = [key_map[k] for k in dg_keys if dg.get(k) is True] explicit_no = [key_map[k] for k in dg_keys if dg.get(k) is False] if yes: return "Да: " + ", ".join(yes) if len(explicit_no) == len(dg_keys): return "Нет (по всем категориям в перечне)" if explicit_no: return ( "Нет: " + ", ".join(explicit_no) + " — по остальным категориям информации нет" ) note = shipment.get("dangerous_goods_note") if isinstance(note, str) and note.strip(): return note.strip() return NO_INFO_TEXT def format_brand_authorization_summary(shipment: Dict) -> str: """ Для пункта про авторизационное письмо бренда: выводим текстовый контекст из письма (если есть), иначе явный fallback. """ info = shipment.get("brand_authorization_info") if isinstance(info, str) and info.strip(): return info.strip() flag = shipment.get("brand_authorization_letter") if flag is True: return "Требуется авторизационное письмо бренда, подробности в письме не найдены." if flag is False: return "Авторизационное письмо бренда не требуется (по переписке)." return NO_INFO_TEXT def format_dangerous_goods_summary_en(shipment: Dict) -> str: """English strings for confirmation_template (counterparty letter).""" key_map = { "batteries": "batteries", "gases": "gases", "liquids": "liquids", "dry_ice": "dry ice", } dg_keys = ("batteries", "gases", "liquids", "dry_ice") dg = shipment.get("dangerous_goods", {}) if isinstance(dg, dict): yes = [key_map[k] for k in dg_keys if dg.get(k) is True] explicit_no = [key_map[k] for k in dg_keys if dg.get(k) is False] if yes: return "Yes: " + ", ".join(yes) if len(explicit_no) == len(dg_keys): return "No (all categories in the list)" if explicit_no: return ( "No: " + ", ".join(explicit_no) + " — no information on the remaining categories" ) note = shipment.get("dangerous_goods_note") if isinstance(note, str) and note.strip(): if re.search(r"[а-яА-ЯёЁ]", note): return ( "The correspondence mentions hazardous cargo, hazard class or UN/IMDG; " "please clarify: batteries, gases, liquids, dry ice." ) return note.strip() return NO_INFO_TEXT_EN def _shipping_type_to_en(shipment: Dict) -> str: """FCL/LCL или пусто — не подменять названием вида перевозки (море/жд).""" raw = (shipment.get("shipment_type") or "").strip() if not raw: return NO_INFO_TEXT_EN u = raw.upper() if u in ("FCL", "LCL"): return u return raw # ============================================================================= # КЛАСС ДЛЯ АВТОЗАПОЛНЕНИЯ ШАБЛОНОВ # ============================================================================= class AutoFillDict(dict): def __init__(self, data: dict, field_labels: dict = None): super().__init__(data) self._data = data self._field_labels = field_labels or FIELD_LABELS self._label_to_field = {v: k for k, v in self._field_labels.items()} def __missing__(self, key): if key in self._data: val = self._data[key] return str(val) if val is not None else "" if key in self._label_to_field: field_key = self._label_to_field[key] if field_key in self._data: val = self._data[field_key] return str(val) if val is not None else "" return "" # ============================================================================= # ФУНКЦИЯ АВТОЗАПОЛНЕНИЯ ШАБЛОНА # ============================================================================= def auto_fill_template(template: str, data: Dict, field_labels: Dict = None) -> str: field_labels = field_labels or FIELD_LABELS label_to_field = {v: k for k, v in field_labels.items()} result = template auto_data = AutoFillDict(data, field_labels) try: result = result.format(**auto_data) except KeyError: pass merged_pairs = list(label_to_field.items()) + BRACKET_LABEL_ALIASES_EN for label, field_key in merged_pairs: if field_key in data: value = data[field_key] value_str = str(value).strip() if value is not None and str(value).strip() else NO_INFO_TEXT pattern = r'\(' + re.escape(label) + r'\)' result = re.sub(pattern, value_str, result, flags=re.IGNORECASE) return result # ============================================================================= # ФУНКЦИЯ ДЛЯ ГЕНЕРАЦИИ HTML-БЛОКА С КНОПКОЙ КОПИРОВАНИЯ # ============================================================================= def generate_copy_html(text: str, height: int = 500, theme: str = "light") -> str: text_html = text text_json = json.dumps(text) btn_id = f"copy_btn_{uuid.uuid4().hex}" palette = THEME_CONFIG.get(theme, THEME_CONFIG["light"]) return f"""
{text_html}
""" # ============================================================================= # ПАТЧЕР ДЛЯ ОТОБРАЖЕНИЯ ВНЕШНЕГО URL # ============================================================================= def get_external_ip(): try: external_ip = requests.get('https://api.ipify.org', timeout=3).text return external_ip except: try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) local_ip = s.getsockname()[0] s.close() return local_ip except: return None PUBLIC_WEB_URL = os.getenv("PUBLIC_WEB_URL", "https://nec.clients.septem.pro").rstrip("/") class StreamlitOutputPatcher: def __init__(self): self.external_ip = get_external_ip() self.original_stdout = sys.stdout def write(self, text): if self.external_ip and "Network URL:" in text: import re port_match = re.search(r':(\d+)', text) port = port_match.group(1) if port_match else "8501" # Use canonical public domain instead of raw external IP. external_url = ( f"\n External URL: {PUBLIC_WEB_URL}\n" f" Fallback IP URL: http://{self.external_ip}:{port}\n" ) self.original_stdout.write(text + external_url) else: self.original_stdout.write(text) def flush(self): self.original_stdout.flush() if "streamlit" in sys.argv[0]: sys.stdout = StreamlitOutputPatcher() # ============================================================================= # НАСТРОЙКА СТРАНИЦЫ # ============================================================================= st.set_page_config( page_title="SEPTEM Cargo Analytics", page_icon="🚚", layout="wide" ) THEME_CONFIG = { "light": { "app_bg": "#f5f7fb", "card_bg": "#ffffff", "text": "#111827", "muted": "#6b7280", "border": "#d0d7de", "code_bg": "#f6f8fa", "hover": "#e9ecef", "shadow": "0 2px 4px rgba(0,0,0,0.05)", }, "dark": { "app_bg": "#0f172a", "card_bg": "#111827", "text": "#e5e7eb", "muted": "#94a3b8", "border": "#334155", "code_bg": "#1f2937", "hover": "#374151", "shadow": "0 2px 8px rgba(0,0,0,0.45)", }, } def apply_ui_theme() -> None: theme_name = st.session_state.get("ui_theme", "light") colors = THEME_CONFIG.get(theme_name, THEME_CONFIG["light"]) st.markdown( f""" """, unsafe_allow_html=True, ) API_URL = os.getenv("PUBLIC_API_URL", f"{PUBLIC_WEB_URL}/llm") FRONTEND_URL = PUBLIC_WEB_URL # ============================================================================= # ИНИЦИАЛИЗАЦИЯ СОСТОЯНИЯ СЕССИИ # ============================================================================= if 'session_id' not in st.session_state: st.session_state.session_id = None if 'current_report' not in st.session_state: st.session_state.current_report = None if 'report_loaded' not in st.session_state: st.session_state.report_loaded = False if "ui_theme" not in st.session_state: st.session_state.ui_theme = "light" # ============================================================================= # НОВЫЕ ФУНКЦИИ ДЛЯ РАБОТЫ С ВЛОЖЕНИЯМИ # ============================================================================= def collect_attachment_data_for_shipment(report: Dict, shipment: Dict) -> List[Dict]: """ Собирает реальные вложения (с content_base64) из писем, относящихся к данной перевозке. """ sources = report.get("sources", []) email_ids = shipment.get("ID_emails", []) attachments_data = [] for src in sources: if src.get("id") not in email_ids: continue for att in src.get("attachments", []): if att.get("content_base64"): attachments_data.append({ "filename": att["filename"], "content_base64": att["content_base64"] }) return attachments_data def collect_attachment_data_for_shipments(report: Dict, shipments: List[Dict]) -> List[Dict]: """ Собирает реальные вложения (с content_base64) из писем, относящихся к любому из указанных shipments. Дедупликация по (filename, size). """ sources = report.get("sources", []) email_ids: set = set() for s in shipments or []: for eid in (s.get("ID_emails", []) or []): email_ids.add(eid) seen: set = set() attachments_data: List[Dict] = [] for src in sources: if src.get("id") not in email_ids: continue for att in src.get("attachments", []): if not att.get("content_base64"): continue key = (att.get("filename"), att.get("size")) if key in seen: continue seen.add(key) attachments_data.append({ "filename": att["filename"], "content_base64": att["content_base64"] }) return attachments_data def create_eml_with_attachments(letter_text: str, attachments_data: List[Dict]) -> bytes: """ Создаёт MIME-сообщение с текстом письма и вложениями. Возвращает байтовое представление .eml файла. """ msg = EmailMessage() msg["X-Unsent"] = "1" msg.set_content(letter_text) for att in attachments_data: filename = att["filename"] ext = filename.split(".")[-1].lower() # пропускаем изображения if ext in IMAGE_EXTS: continue content = base64.b64decode(att["content_base64"]) mime_type, encoding = mimetypes.guess_type(filename) if mime_type is None: mime_type = 'application/octet-stream' maintype, subtype = mime_type.split('/', 1) msg.add_attachment( content, maintype=maintype, subtype=subtype, filename=filename ) return msg.as_bytes() # ============================================================================= # ОСНОВНАЯ ФУНКЦИЯ # ============================================================================= def main(): title_col, theme_col = st.columns([8, 2]) with title_col: st.title("🚚 SEPTEM Cargo Analytics") with theme_col: theme_label = st.selectbox( "Тема", options=["Светлая", "Тёмная"], index=0 if st.session_state.ui_theme == "light" else 1, key="ui_theme_selector", ) selected_theme = "light" if theme_label == "Светлая" else "dark" if selected_theme != st.session_state.ui_theme: st.session_state.ui_theme = selected_theme st.rerun() apply_ui_theme() query_params = st.query_params session_id = query_params.get("session_id", [None]) if isinstance(session_id, list): session_id = session_id[0] if session_id else None status_placeholder = st.empty() if session_id and not st.session_state.report_loaded: st.session_state.session_id = session_id status_placeholder.markdown("◌ Обрабатываем данные цепочки писем...") load_report(session_id) st.session_state.report_loaded = True status_placeholder.markdown("✅ Обработка писем завершена.") elif session_id and st.session_state.report_loaded: status_placeholder.markdown( "✅ Обработка писем завершена." ) with st.sidebar: st.header("📋 Сессия") if st.session_state.session_id: st.success(f"ID сессии: {st.session_state.session_id[:8]}...") if st.button("🔄 Обновить отчёт", type="primary", use_container_width=True): with st.spinner("Обновляем..."): load_report(st.session_state.session_id) st.rerun() else: st.info("Нет активной сессии. Используйте надстройку Outlook для отправки писем.") st.divider() if st.session_state.current_report: report = st.session_state.current_report st.metric("Всего писем проанализировано", report.get('total_emails_analyzed', 0)) shipments = report.get('structured_data', {}).get('shipments', []) st.metric("Найдено перевозок", len(shipments)) if st.session_state.current_report: display_cargo_report(st.session_state.current_report) else: display_welcome() # ============================================================================= # ФУНКЦИИ ЗАГРУЗКИ И ОТОБРАЖЕНИЯ # ============================================================================= def load_report(session_id: str): try: response = requests.post( f"{API_URL}/generate-cargo-report", json={"session_id": session_id} ) if response.status_code == 200: st.session_state.current_report = response.json() else: st.error(f"Ошибка загрузки отчёта: {response.status_code} - {response.text}") except Exception as e: st.error(f"Не удалось соединиться с сервером: {e}") def display_welcome(): col1, col2, col3 = st.columns([1, 2, 1]) with col2: st.image("https://cdn-icons-png.flaticon.com/512/1995/1995571.png", width=200) st.markdown(""" Добро пожаловать в SEPTEM Cargo Analytics! Для начала работы: 1. Откройте Outlook 2. Выберите письма с информацией о грузоперевозках 3. Нажмите кнопку "Анализировать выбранные письма" в ленте 4. Данные автоматически загрузятся сюда Система проанализирует: - 📦 Типы грузов - 📍 Маршруты перевозок - ⚖️ Вес и объем - 📅 Сроки доставки - ⚠️ Особые условия """) def display_cargo_report(report: Dict): tab1, tab2, tab3 = st.tabs([ "📦 Обзор перевозок", "✉️ Подготовленный вариант письма", "📎 Все вложения" ]) with tab1: with st.container(key="Block_ship"): display_overview(report) with tab2: shipments = report.get('structured_data', {}).get('shipments', []) if not shipments: st.info("Нет перевозок для формирования письма") else: st.subheader("Письмо Контрагенту") letter_counterparty = generate_letter_for_shipments(shipments, always_confirmation=True) if letter_counterparty: st.components.v1.html( generate_copy_html( letter_counterparty, height=500, theme=st.session_state.get("ui_theme", "light"), ), height=550, ) else: st.info("Письмо Контрагенту не сгенерировано") # Вложения прикладываем ко всем письмам одинаково: всё, что относится к перевозкам attachments_data = collect_attachment_data_for_shipments(report, shipments) if st.session_state.session_id and letter_counterparty: st.divider() with st.container(): eml_cp = create_eml_with_attachments(letter_counterparty, attachments_data) st.download_button( label="📥 Скачать .eml", data=eml_cp, file_name="septem_cargo_counterparty.eml", mime="message/rfc822", key="download_eml", ) if attachments_data: st.success(f"✅ В .eml будет добавлено вложений: {len(attachments_data)}") with st.expander("Показать список прикреплённых файлов"): for att in attachments_data: st.write(f"- {att['filename']}") else: st.info("Вложения отсутствуют (или недоступны для прикрепления).") with tab3: display_all_attachments(report) # ============================================================================= # ФОРМАТИРОВАНИЕ ДАННЫХ ДЛЯ ШАБЛОНА # ============================================================================= def format_shipment_for_template( shipment: Dict, *, template_locale: str = "ru", ) -> Dict: """ template_locale: - \"ru\" — клиентские письма и русские шаблоны; - \"en\" — письмо контрагенту (confirmation_template): служебные фразы и метки на английском, факты из писем (адреса, описание груза) без перевода. """ empty = NO_INFO_TEXT_EN if template_locale == "en" else NO_INFO_TEXT def pick(*values, default=None): d = default if default is not None else empty for v in values: if is_informative_text(v): return str(v).strip() return d _ct_picked = pick( shipment.get("container_type"), shipment.get("container"), ) _ct_for_display = normalize_container_type_display( None if _ct_picked == empty else _ct_picked, empty=empty, ) data = { "client_name": pick(shipment.get("client_name")), "cargo_ready_date": format_cargo_ready_date_for_display( shipment, empty=empty ), "pickup_address": pick( shipment.get("pickup_address"), shipment.get("loading_port") ), "delivery_address": pick( shipment.get("delivery_address"), shipment.get("discharge_port") ), "total_weight_kg": pick( shipment.get("total_weight_kg"), shipment.get("weight"), shipment.get("gross_weight") ), "package_count": pick( shipment.get("package_count"), shipment.get("packages"), shipment.get("places") ), "total_volume_cbm": pick( shipment.get("total_volume_cbm"), shipment.get("volume") ), "cargo_description": pick( shipment.get("cargo_description"), shipment.get("cargo_name"), shipment.get("description") ), "hs_code": pick(shipment.get("hs_code")), "loading_port": pick( shipment.get("loading_port"), shipment.get("pickup_address") ), "discharge_port": pick( shipment.get("discharge_port"), shipment.get("delivery_address") ), "shipment_type": ( _shipping_type_to_en(shipment) if template_locale == "en" else pick(shipment.get("shipment_type")) ), "container_type": _ct_for_display, "vehicle_type": pick(shipment.get("vehicle_type")), "temperature_range": pick(shipment.get("temperature_range")), "msds_required": ( ( "Yes" if shipment.get("msds_required") is True else ("No" if shipment.get("msds_required") is False else empty) ) if template_locale == "en" else ( "✅ Да" if shipment.get("msds_required") is True else ("❌ Нет" if shipment.get("msds_required") is False else NO_INFO_TEXT) ) ), "estimated_cost": empty, "estimated_transit_time": empty, "dimensions_str": empty, "vehicle_dimensions_str": empty, "dangerous_goods_str": empty, "missing_fields": "" } dim_unit = "cm" if template_locale == "en" else "см" dims = shipment.get("dimensions", []) if isinstance(dims, list) and dims: dim_strings = [] for d in dims: if isinstance(d, dict): l = d.get("length_cm") w = d.get("width_cm") h = d.get("height_cm") if l and w and h: dim_strings.append(f"{l}×{w}×{h} {dim_unit}") if dim_strings: data["dimensions_str"] = "; ".join(dim_strings) vdims = shipment.get("vehicle_dimensions", []) if isinstance(vdims, list) and vdims: vdim_strings = [] for d in vdims: if isinstance(d, dict): l = d.get("length_cm") w = d.get("width_cm") h = d.get("height_cm") if l and w and h: vdim_strings.append(f"{l}×{w}×{h} {dim_unit}") if vdim_strings: data["vehicle_dimensions_str"] = "; ".join(vdim_strings) data["dangerous_goods_str"] = ( format_dangerous_goods_summary_en(shipment) if template_locale == "en" else format_dangerous_goods_summary(shipment) ) if data.get("cargo_description") == empty: dangerous_hint = data.get("dangerous_goods_str", "") hs = data.get("hs_code", "") if dangerous_hint and dangerous_hint not in (empty, NO_INFO_TEXT, NO_INFO_TEXT_EN): if template_locale == "en": data["cargo_description"] = f"Dangerous cargo ({dangerous_hint})" else: data["cargo_description"] = f"Опасный груз ({dangerous_hint})" elif hs and hs not in (empty, NO_INFO_TEXT, NO_INFO_TEXT_EN): if template_locale == "en": data["cargo_description"] = f"Cargo under HS code {hs}" else: data["cargo_description"] = f"Груз по коду ТН ВЭД {hs}" shipping_options = shipment.get("shipping_options", []) if isinstance(shipping_options, list) and shipping_options: opt = shipping_options[0] if isinstance(opt, dict): if is_informative_text(opt.get("cost")): data["estimated_cost"] = opt.get("cost") if is_informative_text(opt.get("transit_time")): data["estimated_transit_time"] = opt.get("transit_time") return data def collect_attachments_for_shipment(report: Dict, shipment: Dict): """ Собирает список вложений из писем, которые относятся к конкретной перевозке. """ attachments = [] sources = report.get("sources", []) email_ids = shipment.get("ID_emails", []) for src in sources: if src.get("id") not in email_ids: continue for att in src.get("attachments", []): name = att.get("filename") if name: attachments.append(name) return attachments # ============================================================================= # ГЕНЕРАЦИЯ ПИСЬМА ИЗ ШАБЛОНА # ============================================================================= def generate_letter_from_template( shipment: Dict, template_obj: Dict, always_confirmation: bool = False, *, overview_client_letter: bool = False, ) -> str: """ Генерирует письмо из шаблона. Args: shipment: Данные о перевозке template_obj: Объект шаблона с confirmation_template и info_request_template always_confirmation: Если True, всегда использовать confirmation_template (для вкладки 2) Если False, использовать условную логику (для вкладки 1) overview_client_letter: Если True и always_confirmation=False — только русское письмо клиенту (info_request_template), без английского confirmation_template («контрагенту»). """ required_fields = [ "client_name", "incoterms", "cargo_ready_date", "pickup_address", "cargo_value", "package_count", "total_weight_kg", "dimensions", "total_volume_cbm", "cargo_description", "delivery_address" ] missing = [] for field in required_fields: val = shipment.get(field) if field == "cargo_ready_date": if isinstance(val, list): if not any(str(x).strip() for x in val if x is not None): missing.append(field) elif val is None or (isinstance(val, str) and not val.strip()): missing.append(field) continue if field == "dimensions": if not val or not isinstance(val, list) or len(val) == 0: missing.append(field) elif field == "delivery_address": if not is_specific_address(val): missing.append(field) elif field == "pickup_address": if not is_specific_address(val): missing.append(field) elif ( val is None or (isinstance(val, (int, float)) and float(val) == 0) or ( isinstance(val, str) and val.strip() in {"", "0", "0.0", "0,0", "0,00"} ) ): missing.append(field) missing.extend(collect_extra_required_missing_for_template(shipment, template_obj)) missing = list(dict.fromkeys(missing)) # Собираем названия недостающих полей missing_labels = [FIELD_LABELS.get(f, f) for f in missing] # --- НОВЫЙ БЛОК: проверка наличия обязательных документов --- doc_found = shipment.get("documents_found", {}) # MSDS if shipment.get("msds_required") and not doc_found.get("msds"): missing_labels.append("MSDS (паспорт безопасности)") # DGM report if shipment.get("dgm_report_required") and not doc_found.get("dgm"): missing_labels.append("DGM report (декларация на опасный груз)") # Авторизационное письмо бренда if shipment.get("brand_authorization_letter") and not doc_found.get("brand_authorization"): missing_labels.append("Авторизационное письмо от бренда") # Лицензию агента больше не запрашиваем как отдельный документ. # ---------------------------------------------------------- if overview_client_letter: template_locale = "ru" elif always_confirmation: template_locale = "en" elif missing_labels: template_locale = "ru" else: template_locale = "en" data = format_shipment_for_template(shipment, template_locale=template_locale) empty_addr = NO_INFO_TEXT_EN if template_locale == "en" else NO_INFO_TEXT if not is_specific_address(data.get("delivery_address")): data["delivery_address"] = empty_addr if not is_specific_address(data.get("pickup_address")): data["pickup_address"] = empty_addr if always_confirmation: # Для вкладки 2 - всегда confirmation_template template = template_obj.get("confirmation_template", "") elif overview_client_letter: # Вкладка «Обзор»: только письмо клиенту (русский шаблон), не confirmation_template template = template_obj.get("info_request_template", "") if missing_labels: data["missing_fields"] = "\n".join(f"- {item}" for item in missing_labels) else: data["missing_fields"] = ( "- По имеющимся в переписке данным необходимая информация для предварительной оценки получена.\n" "- Мы подготовим расчёт и свяжемся с вами в ближайшее время." ) else: # Прочие случаи: запрос недостающего или confirmation при полном комплекте if missing_labels: template = template_obj.get("info_request_template", "") data["missing_fields"] = "\n".join(f"- {item}" for item in missing_labels) else: template = template_obj.get("confirmation_template", "") try: return auto_fill_template(template, data, FIELD_LABELS) except KeyError as e: return f"Ошибка в шаблоне: поле {{{e}}} не найдено в данных" except Exception as e: return f"Ошибка при формировании письма: {e}" def _extract_greeting_and_signature(letter_text: str) -> tuple[str, str, str]: """ Пытается вытащить из текста письма: - greeting: первый абзац (обычно "Уважаемый ...!") - signature: абзац начиная с "С уважением," - body: остальной текст между ними """ text = clean_email_text(letter_text or "") if not text: return "", "", "" lines = text.splitlines() # greeting = строки до первой пустой строки (включая её не берем) greeting_lines: List[str] = [] i = 0 while i < len(lines) and lines[i].strip() != "": greeting_lines.append(lines[i].rstrip()) i += 1 greeting = "\n".join(greeting_lines).strip() # signature = ищем "С уважением" (последний блок) sig_idx = None for idx in range(len(lines) - 1, -1, -1): low = lines[idx].strip().lower() if low.startswith("с уважением") or low.startswith("best regards"): sig_idx = idx break if sig_idx is not None: signature = "\n".join(lines[sig_idx:]).strip() body_lines = lines[i:sig_idx] else: signature = "" body_lines = lines[i:] body = clean_email_text("\n".join(body_lines)) return greeting, body, signature def _parse_bullet_lines(bullets_block: str) -> List[str]: """Строки списка вида «- …» из блока bullets.""" if not bullets_block or not str(bullets_block).strip(): return [] out: List[str] = [] for line in bullets_block.splitlines(): st = line.strip() if st.startswith("-"): out.append(st) return out def _bullet_dedup_key(line: str) -> str: s = line.strip() if s.startswith("-"): s = s[1:].strip() return re.sub(r"\s+", " ", s).lower() def generate_letter_for_shipments( shipments: List[Dict], always_confirmation: bool = False, *, overview_client_letter: bool = False, ) -> str: """ Генерирует ОДНО письмо по нескольким перевозкам: - секции «Перевозка (i) — <тип>» для каждой найденной комбинации перевозка + тип перевозки; - шаблон письма подбирается по названию типа перевозки; - одно приветствие и одна подпись (из первого сгенерированного блока). """ shipments = [s for s in (shipments or []) if isinstance(s, dict)] if not shipments or not shipping_templates: return "" def _candidates_for_letter(sh: Dict) -> List[Dict]: cands = sh.get("shipping_type_candidates") if isinstance(cands, list) and cands: return [c for c in cands if isinstance(c, dict)] st_name = sh.get("shipping_type") or "" return [{"shipping_type": st_name, "criteria": sh.get("criteria", "")}] default_template_obj = shipping_templates[0] sections: List[str] = [] greeting = "" signature = "" intro_part = "" outro_part = "" first_info_block_seen = False def _extract_intro_bullets_outro(body: str) -> tuple[str, str, str]: """ Для info_request_template: в body есть интро, затем маркированный список "- ...", затем продолжение (начинается обычно с "Пожалуйста, ..."). """ if not body: return "", "", "" # Начало буллетов: строка, начинающаяся с "- " m = re.search(r"(^|\n)-\s", body) if not m: # На всякий случай: если буллеты не найдены, считаем, что весь body - список return "", body.strip(), "" bullet_start = m.start() please_idx = body.find("Пожалуйста") bullet_end = please_idx if please_idx != -1 else len(body) intro = body[:bullet_start].rstrip() bullets = body[bullet_start:bullet_end].strip() outro = body[bullet_end:].strip() return intro, bullets, outro for idx, shipment in enumerate(shipments, start=1): variants = _candidates_for_letter(shipment) if always_confirmation: for variant in variants: type_name = (variant.get("shipping_type") or "").strip() or "Тип не указан" type_name_en = SHIPPING_TYPE_NAME_EN.get(type_name, type_name) template_obj = None if type_name and type_name != "Тип не указан": template_obj = next( (t for t in shipping_templates if t.get("name") == type_name), None, ) if not template_obj: template_obj = default_template_obj letter_one = generate_letter_from_template( shipment, template_obj, always_confirmation=True ) g, body, sig = _extract_greeting_and_signature(letter_one) if not greeting and g: greeting = g if not signature and sig: signature = sig header = f"Shipment ({idx}) — {type_name_en}" section_body = body if body else clean_email_text(letter_one) sections.append(f"{header}\n\n{section_body}".strip()) continue # Письмо-запрос: при нескольких способах доставки общие недостающие пункты — один раз variant_rows: List[Dict] = [] for variant in variants: type_name = (variant.get("shipping_type") or "").strip() or "Тип не указан" template_obj = None if type_name and type_name != "Тип не указан": template_obj = next( (t for t in shipping_templates if t.get("name") == type_name), None, ) if not template_obj: template_obj = default_template_obj letter_one = generate_letter_from_template( shipment, template_obj, always_confirmation=False, overview_client_letter=overview_client_letter, ) g, body, sig = _extract_greeting_and_signature(letter_one) if not greeting and g: greeting = g if not signature and sig: signature = sig intro_i, bullets_i, outro_i = _extract_intro_bullets_outro(body) if not first_info_block_seen: intro_part, outro_part = intro_i, outro_i first_info_block_seen = True variant_rows.append({ "type_name": type_name, "header": f"Перевозка ({idx}) — {type_name}", "bullets_raw": bullets_i, "body": body, "letter_one": letter_one, }) if len(variant_rows) <= 1: row = variant_rows[0] if row["bullets_raw"]: sections.append(f"{row['header']}\n{row['bullets_raw'].strip()}") elif row["body"].strip(): sections.append(f"{row['header']}\n{row['body'].strip()}".strip()) continue bullet_lists = [_parse_bullet_lines(row["bullets_raw"]) for row in variant_rows] if not any(bullet_lists): for row in variant_rows: if row["body"].strip(): sections.append(f"{row['header']}\n{row['body'].strip()}".strip()) continue type_names = [row["type_name"] for row in variant_rows] types_joined = ", ".join(type_names) merged_header = f"Перевозка ({idx}) — {types_joined}" # Если не у всех вариантов есть буллеты — полного пересечения нет: один объединённый список без дублей if not all(len(bl) > 0 for bl in bullet_lists): seen_u: set = set() merged_u: List[str] = [] for bl in bullet_lists: for l in bl: k = _bullet_dedup_key(l) if k not in seen_u: seen_u.add(k) merged_u.append(l) if merged_u: sections.append(f"{merged_header}\n\n" + "\n".join(merged_u).strip()) continue key_sets = [set(_bullet_dedup_key(l) for l in bl) for bl in bullet_lists] common_keys = set.intersection(*key_sets) first_nonempty = next((bl for bl in bullet_lists if bl), bullet_lists[0]) common_lines = [l for l in first_nonempty if _bullet_dedup_key(l) in common_keys] if common_lines: sections.append(f"{merged_header}\n\n" + "\n".join(common_lines).strip()) for i, row in enumerate(variant_rows): residual = [ l for l in bullet_lists[i] if _bullet_dedup_key(l) not in common_keys ] if residual: sections.append( f"{row['header']} (дополнительно по этому способу)\n" + "\n".join(residual) ) if not always_confirmation: combined_body = "\n\n".join(sections).strip() parts = [p for p in [greeting, intro_part, combined_body, outro_part, signature] if p] return clean_email_text("\n\n".join(parts)) combined_body = "\n\n".join(sections).strip() parts = [p for p in [greeting, combined_body, signature] if p] return clean_email_text("\n\n".join(parts)) def _load_criteria_interface_layout() -> Dict: """Разметка чек-листа как на листах Excel «… Интерфейс» (criteria_interface_layout.json).""" global _CRITERIA_INTERFACE_LAYOUT_CACHE if _CRITERIA_INTERFACE_LAYOUT_CACHE is not None: return _CRITERIA_INTERFACE_LAYOUT_CACHE path = os.path.join(_UI_DIR, "criteria_interface_layout.json") if not os.path.isfile(path): _CRITERIA_INTERFACE_LAYOUT_CACHE = {} return _CRITERIA_INTERFACE_LAYOUT_CACHE try: with open(path, "r", encoding="utf-8") as f: _CRITERIA_INTERFACE_LAYOUT_CACHE = json.load(f) except Exception: _CRITERIA_INTERFACE_LAYOUT_CACHE = {} return _CRITERIA_INTERFACE_LAYOUT_CACHE def _parse_criteria_by_number(criteria_text: str) -> Dict[str, str]: """Ключ — номер пункта из criteria (как в shipping_types), значение — текст критерия.""" out: Dict[str, str] = {} if not isinstance(criteria_text, str) or not criteria_text.strip(): return out for line in criteria_text.splitlines(): line = line.strip() if not line or "\t" not in line: continue num, crit = line.split("\t", 1) n = num.strip() if n: out[n] = crit.strip() return out def _shipping_type_variants_for_display(shipment: Dict) -> List[Dict]: cands = shipment.get("shipping_type_candidates") if isinstance(cands, list) and cands: return [c for c in cands if isinstance(c, dict)] name = shipment.get("shipping_type") or shipment.get("shipment_type") or "" return [{"shipping_type": name, "criteria": shipment.get("criteria", "")}] def render_shipment_criteria_checklist( shipment: Dict, criteria_text: str, shipping_type_name: str = "" ) -> None: """Критерии выбранного типа перевозки и ответы по данным перевозки.""" if not isinstance(criteria_text, str) or not criteria_text.strip(): st.info("Для этого типа перевозки критерии в конфигурации не заданы.") return docs = shipment.get("documents_found", {}) or {} def _yes_no_or_no_info(v) -> str: if v is None: return NO_INFO_TEXT return "✅ да" if bool(v) else "❌ нет" def _collect_text_blob(value) -> str: chunks: List[str] = [] def _walk(v): if isinstance(v, str): s = v.strip() if s: chunks.append(s) return if isinstance(v, dict): for vv in v.values(): _walk(vv) return if isinstance(v, list): for vv in v: _walk(vv) _walk(value) return " ".join(chunks) text_blob = _collect_text_blob(shipment) def _required_doc(v_required, doc_type: str) -> str: if v_required is None: return NO_INFO_TEXT if v_required is False: return "Не нужен" if not bool(v_required): return NO_INFO_TEXT found_docs = docs.get(doc_type, []) or [] if found_docs: filenames = [d.get("filename", "") for d in found_docs if d.get("filename")] files_str = ", ".join(filenames) if filenames else "документ найден" return f"Нужен (документ найден: {files_str})" return "Нужен (документ не найден)" def _format_dimensions(dims) -> str: parts: List[str] = [] seen = set() if isinstance(dims, list): for d in dims: if isinstance(d, dict): l = d.get("length_cm") w = d.get("width_cm") h = d.get("height_cm") if l and w and h: key = f"{l}x{w}x{h}".lower() if key not in seen: seen.add(key) parts.append(f"{l}×{w}×{h} см") return "; ".join(parts) if parts else NO_INFO_TEXT def _join_desc_and_hs(sh: Dict) -> str: desc = (sh.get("cargo_description") or "").strip() hs = (sh.get("hs_code") or "").strip() if not desc and not hs: return NO_INFO_TEXT desc_part = desc if desc else NO_INFO_TEXT hs_part = hs if hs else NO_INFO_TEXT return f"{desc_part} / Код ТН ВЭД: {hs_part}" def _find_temperature_range(sh: Dict) -> str: direct = (sh.get("temperature_range") or "").strip() if direct: return direct # Пытаемся вытащить диапазон из других текстовых полей, если парсер не выделил его в отдельный ключ. scan_fields = [ sh.get("special_transport_requirements"), sh.get("vehicle_type"), sh.get("cargo_description"), sh.get("shipment_type"), sh.get("shipping_type"), ] joined = " ".join(str(x) for x in scan_fields if isinstance(x, str) and x.strip()) if not joined: return NO_INFO_TEXT m = re.search(r"([+-]?\d{1,2}\s*(?:\.\.|-|до)\s*[+-]?\d{1,2})", joined, flags=re.IGNORECASE) if m: return m.group(1).replace(" ", "") return NO_INFO_TEXT def _infer_fcl_lcl(sh: Dict) -> str: shipment_mode = (sh.get("shipment_type") or "").strip() if shipment_mode: return shipment_mode blob = (text_blob + " " + str(sh.get("cargo_description") or "")).lower() if re.search(r"сборн\w*\s+трак|\bltl\b", blob): return NO_INFO_TEXT if re.search( r"\blcl\b|сборн\w*\s+контейнер|сборн\w*\s+груз|группаж|консолидац|" r"consolidat\w*|groupage|less\s+than\s+container", blob, ): return "LCL" if re.search( r"\bfcl\b|цельн\w*\s+контейнер|полн\w*\s+контейнер|" r"отдельн\w*\s+контейнер|full\s+container\s+load", blob, ): return "FCL" return NO_INFO_TEXT def _has_batteries(sh: Dict) -> Optional[bool]: dg = sh.get("dangerous_goods") if not isinstance(dg, dict): return None if dg.get("batteries") is True: return True if dg.get("batteries") is False: return False return None def _msds_negative_phrase() -> Optional[str]: pattern = ( r"(не\s+(?:предостав(?:ляет|им|ят)|дают|высыла(?:ет|ют)|имеется|имеют|прикладыв(?:ает|ают))\s+msds)" r"|(?:msds\s+(?:не\s+предостав(?:лен|ляется)|отсутствует|нет))" ) if re.search(pattern, text_blob, flags=re.IGNORECASE): return "Поставщик не предоставляет MSDS" return None def _answer_customs(criterion_lower: str) -> str: place = (shipment.get("customs_clearance_place_export_rf") or "").strip() needed = shipment.get("customs_clearance_required") if "место таможенного" in criterion_lower or ( "таможенн" in criterion_lower and "место" in criterion_lower ): return place or NO_INFO_TEXT if "таможенн" in criterion_lower: if needed is None and not place: return NO_INFO_TEXT if needed is False: return "не нужно" if needed is True: if place: return f"нужно (место: {place})" return f"нужно (место: {NO_INFO_TEXT.lower()})" if place: return place return NO_INFO_TEXT return NO_INFO_TEXT def _format_volume_cbm() -> str: v = shipment.get("total_volume_cbm") if v is None or (isinstance(v, (int, float)) and v == 0): return NO_INFO_TEXT return f"{v} м³" def _container_spec_drop_bare_types() -> str: """То же отображение контейнеров, что и в письме контрагенту.""" raw = (shipment.get("container_type") or "").strip() return normalize_container_type_display(raw or None, empty=NO_INFO_TEXT) def _answer_for_criterion(criterion_text: str) -> str: c = criterion_text.lower() if "название клиента" in c: return (shipment.get("client_name") or "").strip() or NO_INFO_TEXT if "условия поставки" in c and "incoterms" in c: return (shipment.get("incoterms") or "").strip() or NO_INFO_TEXT if "тип перевозки" in c and "fcl" in c and "lcl" in c: return _infer_fcl_lcl(shipment) if "контейнер" in c and ( "типоразмер" in c or ("количество" in c and "типоразмер" in c) or ("fcl" in c and ("тип" in c or "количеств" in c)) ): return _container_spec_drop_bare_types() if "дата готовности" in c: return format_cargo_ready_date_for_display(shipment) if ( "пункт отправления" in c or "адрес забора" in c or "порт отправления" in c or "порт погрузки" in c or ("станция" in c and ("отправлен" in c or "погруз" in c)) ): pickup = (shipment.get("pickup_address") or "").strip() return pickup if is_specific_address(pickup) else NO_INFO_TEXT if ( "пункт назначения" in c or "адрес доставки" in c or "порт назначения" in c or "порт выгрузки" in c or ("станция" in c and ("назнач" in c or "выгруз" in c)) ): delivery = (shipment.get("delivery_address") or "").strip() return delivery if is_specific_address(delivery) else NO_INFO_TEXT if "стоимость груза" in c: return (shipment.get("cargo_value") or "").strip() or NO_INFO_TEXT if "количество грузовых мест" in c and "вес" in c and ("объем" in c or "объём" in c): pkg = shipment.get("package_count") w = shipment.get("total_weight_kg") pkg_ok = pkg is not None and str(pkg).strip() not in ("", "0", "0.0") w_ok = w is not None and str(w).strip() not in ("", "0", "0.0") pkg_str = str(pkg).strip() if pkg_ok else "" w_str = f"{w} кг" if w_ok else "" vol_str = _format_volume_cbm() if vol_str == NO_INFO_TEXT: vol_str = "" return " / ".join([s for s in [pkg_str, w_str, vol_str] if s]) or NO_INFO_TEXT if "количество грузовых мест" in c and "вес" in c: pkg = shipment.get("package_count") w = shipment.get("total_weight_kg") pkg_ok = pkg is not None and str(pkg).strip() not in ("", "0", "0.0") w_ok = w is not None and str(w).strip() not in ("", "0", "0.0") pkg_str = str(pkg).strip() if pkg_ok else "" w_str = f"{w} кг" if w_ok else "" return " / ".join([s for s in [pkg_str, w_str] if s]) or NO_INFO_TEXT if "количество грузовых мест" in c: pkg = shipment.get("package_count") if pkg is None or str(pkg).strip() in ("", "0", "0.0"): return NO_INFO_TEXT return str(pkg).strip() if "общий объ" in c or "общий объём" in c or "общий объем" in c: return _format_volume_cbm() if "вес" in c and "количество" not in c: w = shipment.get("total_weight_kg") if w is None or str(w).strip() in ("", "0", "0.0"): return NO_INFO_TEXT return f"{w} кг" if ("проём" in c or "проем" in c) and ( "контейнер" in c or "20dv" in c or "40dv" in c or "40hc" in c or "проходимость" in c ): return door_clearance_summary(shipment) # Габариты грузовых мест (LTL, FCL «без контейнеров», и п.8 FTL: «если нет данных о количестве # машин — габариты каждого грузового места…»). Слово «машин» здесь не про ТС — иначе ниже # сработала бы ветка vehicle_dimensions и габариты груза не показались бы. if "габарит" in c and "грузов" in c and "мест" in c: return _format_dimensions(shipment.get("dimensions")) if "габарит" in c and ( "транспортн" in c or "машин" in c or "средств" in c or "кузов" in c or "прицеп" in c or "полуприцеп" in c ): return _format_dimensions(shipment.get("vehicle_dimensions")) if "габарит" in c: return _format_dimensions(shipment.get("dimensions")) if "характер груза" in c or "наименование груза" in c: return _join_desc_and_hs(shipment) if "код тн" in c: return (shipment.get("hs_code") or "").strip() or NO_INFO_TEXT # Перечень DG (авиа п.9, море/авто и т.д.) и формулировки интерфейса («Наличие батарейк, газов…»). if ( "содержатся ли в грузе" in c or "опасные вещества" in c or "опасные свойства" in c or ( ("батарейки" in c or "батарейк" in c) and ( "газы" in c or "газов" in c or "жидкост" in c or "аэрозол" in c or "хладоагент" in c or "сухой лёд" in c or "сухой лед" in c ) ) ): return format_dangerous_goods_summary(shipment) if "штабел" in c and "между собой" in c: return _yes_no_or_no_info(shipment.get("stackable_among_themselves")) if "штабел" in c and "с другими отправками" in c: return _yes_no_or_no_info(shipment.get("stackable_with_others")) if "msds" in c or "паспорт безопасности" in c: msds_negative = _msds_negative_phrase() if msds_negative: return msds_negative if "батаре" in c and _has_batteries(shipment) is not True: return NO_INFO_TEXT return _required_doc(shipment.get("msds_required"), "msds") if re.search(r"(? str: """Одна карточка для вертикального списка (без flex-строки).""" esc = html.escape prefix = f"{esc(str(num))}. " if str(num).strip() else "" return ( "
" f"
{prefix}{esc(crit)}
" f"
Ответ: {esc(answer)}
" "
" ) def _criterion_card_flex_cell( num: str, crit: str, answer: str, *, full_width: bool = False ) -> str: """Ячейка внутри flex-строки: одинаковая высота с соседями (align-items: stretch).""" esc = html.escape prefix = f"{esc(str(num))}. " if str(num).strip() else "" flex_basis = ( "0 0 auto;width:100%;min-width:0" if full_width else "1 1 0;min-width:0" ) return ( f'
' f'
{prefix}{esc(crit)}
' f'
' f'Ответ: {esc(answer)}
' "
" ) def _flex_row_equal_height_html( inner_cells: str, *, margin_bottom_px: int = 10 ) -> str: """Одна линия карточек: высота всех ячеек равна максимальной в строке.""" return ( f'
' f"{inner_cells}" "
" ) def _auto_ltl_ftl_top_section_html( nums: List[str], labels: List[str], r67: Dict[str, Any], ) -> str: """ Верхний блок как в Excel «Авто … интерфейс»: слева 1–3,5,8/9 (5 карточек) на всю высоту; справа сверху 4 и последний столбец (19/18), под ними на ширину правой колонки — таможня (20/19). """ left_parts: List[str] = [] for i in range(5): n = str(nums[i]).strip() crit = by_num.get(n, labels[i]) left_parts.append(_criterion_card_flex_cell(n, crit, _answer_for_criterion(crit))) left_html = "".join(left_parts) n4 = str(nums[5]).strip() crit4 = by_num.get(n4, labels[5]) n_last = str(nums[6]).strip() crit_last = by_num.get(n_last, labels[6]) top_pair = _flex_row_equal_height_html( _criterion_card_flex_cell(n4, crit4, _answer_for_criterion(crit4)) + _criterion_card_flex_cell(n_last, crit_last, _answer_for_criterion(crit_last)), margin_bottom_px=0, ) rn = str(r67.get("num", "")).strip() rlab = (r67.get("label") or "").strip() crit_r = by_num.get(rn, rlab) bottom = _criterion_card_flex_cell(rn, crit_r, _answer_for_criterion(crit_r), full_width=True) right_col = ( f'
' f"{top_pair}" f"{bottom}" "
" ) left_col = ( '
' f"{left_html}" "
" ) return ( '
' f"{left_col}{right_col}" "
" ) by_num = _parse_criteria_by_number(criteria_text) # Показываем верхние 5 пунктов всегда (независимо от layout JSON), # т.к. в некоторых деплоях header-блок может не приходить/не совпадать по формату. # Для FCL по требованию: 5-й верхний столбец должен быть пунктом 9. st_name_for_top = (shipping_type_name or "").strip() fcl_types = { "Морская перевозка (FCL)", "Железнодорожная перевозка (FCL)", "Мультимодальная перевозка море + ж/д (FCL)", } if st_name_for_top in fcl_types: top5_nums = ("1", "2", "3", "5", "9") else: top5_nums = ("1", "2", "3", "5", "8") top5_cells: List[str] = [] for n in top5_nums: crit = by_num.get(n) if isinstance(crit, str) and crit.strip(): top5_cells.append(_criterion_card_flex_cell(n, crit, _answer_for_criterion(crit))) if top5_cells: st.markdown(_flex_row_equal_height_html("".join(top5_cells)), unsafe_allow_html=True) layout_entry = _load_criteria_interface_layout().get("by_shipping_type", {}).get( (shipping_type_name or "").strip() ) blocks = layout_entry.get("blocks") if isinstance(layout_entry, dict) else None _AUTO_TYPES = frozenset( {"Автомобильная перевозка (LTL)", "Автомобильная перевозка (FTL)"} ) if isinstance(blocks, list) and blocks: bi = 0 while bi < len(blocks): block = blocks[bi] if not isinstance(block, dict): bi += 1 continue bt = block.get("type") st_name = (shipping_type_name or "").strip() merge_auto_top = ( st_name in _AUTO_TYPES and bt == "header" and bi + 1 < len(blocks) and isinstance(blocks[bi + 1], dict) and blocks[bi + 1].get("type") == "right_67" ) if merge_auto_top: nums = block.get("nums") or [] labels = block.get("labels") or [] if len(nums) == len(labels) == 7: st.markdown( _auto_ltl_ftl_top_section_html(nums, labels, blocks[bi + 1]), unsafe_allow_html=True, ) bi += 2 continue if bt == "header": nums = block.get("nums") or [] labels = block.get("labels") or [] if len(nums) == len(labels) and len(nums) > 0: cells: List[str] = [] for i in range(len(nums)): n = str(nums[i]).strip() if n in top5_nums: # Верхние 5 уже отрисованы принудительно единым блоком. continue crit = by_num.get(n, labels[i]) ans = _answer_for_criterion(crit) cells.append(_criterion_card_flex_cell(n, crit, ans)) if cells: st.markdown( _flex_row_equal_height_html("".join(cells)), unsafe_allow_html=True, ) bi += 1 continue if bt == "right_67": n = str(block.get("num", "")).strip() lab = (block.get("label") or "").strip() crit = by_num.get(n, lab) st.markdown( _criterion_card_html(n, crit, _answer_for_criterion(crit)), unsafe_allow_html=True, ) bi += 1 continue if bt == "pair": left = block.get("left") or {} right = block.get("right") or {} if isinstance(left, dict) and isinstance(right, dict): n = str(left.get("num", "")).strip() lab = (left.get("label") or "").strip() crit_l = by_num.get(n, lab) ans_l = _answer_for_criterion(crit_l) n2 = str(right.get("num", "")).strip() lab2 = (right.get("label") or "").strip() crit_r = by_num.get(n2, lab2) ans_r = _answer_for_criterion(crit_r) st.markdown( _flex_row_equal_height_html( _criterion_card_flex_cell(n, crit_l, ans_l) + _criterion_card_flex_cell(n2, crit_r, ans_r) ), unsafe_allow_html=True, ) bi += 1 continue if bt == "left_12": n = str(block.get("num", "")).strip() lab = (block.get("label") or "").strip() crit = by_num.get(n, lab) st.markdown( _criterion_card_html(n, crit, _answer_for_criterion(crit)), unsafe_allow_html=True, ) bi += 1 continue bi += 1 return criteria_lines = [l for l in criteria_text.splitlines() if l.strip()] criteria_items: List[tuple[str, str]] = [] for line in criteria_lines: if "\t" in line: num, crit = line.split("\t", 1) criteria_items.append((num.strip(), crit.strip())) else: criteria_items.append(("", line.strip())) for num, crit in criteria_items: answer = _answer_for_criterion(crit) st.markdown(_criterion_card_html(num, crit, answer), unsafe_allow_html=True) # ============================================================================= # ОТОБРАЖЕНИЕ ОБЗОРА ПЕРЕВОЗОК # ============================================================================= def display_overview(report: Dict): st.subheader("Детальная информация о грузоперевозке") data = report.get('structured_data', {}) shipments = data.get('shipments', []) sources = report.get('sources', []) if not shipments: st.info("Нет структурированных данных о перевозках") return for idx, shipment in enumerate(shipments): with st.expander(f"🚚 Перевозка #{idx+1}", expanded=(idx==0)): with st.container(border=True): display_emails(report, shipment.get("ID_emails", []), idx) variants = _shipping_type_variants_for_display(shipment) has_any_criteria = any( isinstance(v.get("criteria"), str) and str(v.get("criteria", "")).strip() for v in variants ) st.markdown("##### Отчёт по возможным типам перевозки") for v_idx, variant in enumerate(variants): type_title = (variant.get("shipping_type") or "").strip() or "Тип не указан" crit_raw = variant.get("criteria", "") crit = crit_raw if isinstance(crit_raw, str) else "" with st.expander(f"📋 {type_title}", expanded=(v_idx == 0)): render_shipment_criteria_checklist(shipment, crit, type_title) def should_show(key, value): if key in ( "shipping_options", "shipping_type", "shipping_type_candidates", "requested_shipping_type_names", ): return False if key == "criteria_preview": return False if has_any_criteria: return False if value is None: return False if isinstance(value, str) and value.strip() == "": return False if isinstance(value, list) and len(value) == 0: return False if isinstance(value, dict) and key == "dangerous_goods": return format_dangerous_goods_summary(shipment) != NO_INFO_TEXT return True fields_to_show = [] for key, value in shipment.items(): fields_to_show.append((key, value)) col1, col2 = st.columns(2) mid = len(fields_to_show) // 2 for i, (key, value) in enumerate(fields_to_show): if key in ["ID_emails", "documents_found"]: continue if not should_show(key, value): continue with col1 if i < mid else col2: label = FIELD_LABELS.get(key, key) if key in ("dimensions", "vehicle_dimensions"): dims = [] for d in value: if isinstance(d, dict) and d.get('length_cm') and d.get('width_cm') and d.get('height_cm'): dims.append(f"{d['length_cm']}×{d['width_cm']}×{d['height_cm']} см") value_str = "; ".join(dims) if dims else NO_INFO_TEXT elif key == "criteria" and isinstance(value, str): # criteria может быть: # - сырой: "1\tНазвание...\n2\t..." # - короткий: "Клиент, Код ТН ВЭД, Требуется MSDS;" raw = value.strip() if "\t" not in raw and (";" in raw or "," in raw) and "\n" not in raw: import re as _re parts = [p.strip() for p in _re.split(r";\\s*|,\\s*", raw) if p.strip()] value_str = "
".join(parts) if parts else NO_INFO_TEXT else: lines = [l.strip() for l in value.splitlines() if l.strip()] formatted: List[str] = [] for l in lines: if "\t" in l: num, text = l.split("\t", 1) formatted.append(f"{num.strip()}. {text.strip()}") else: formatted.append(l) value_str = "
".join(formatted) if formatted else NO_INFO_TEXT elif key == "dangerous_goods" and isinstance(value, dict): value_str = format_dangerous_goods_summary(shipment) elif key == "cargo_ready_date": value_str = format_cargo_ready_date_for_display(shipment) elif isinstance(value, bool): # Только для документов: "нужен/не нужен" doc_bool_keys = {"msds_required", "dgm_report_required", "brand_authorization_letter"} base_str = ("нужен" if value else "не нужен") if key in doc_bool_keys else ("✅ Да" if value else "❌ Нет") docs = shipment.get("documents_found", {}) doc_map = { "msds_required": "msds", "dgm_report_required": "dgm", "brand_authorization_letter": "brand_authorization", } if key in doc_map and value: doc_type = doc_map[key] found_docs = docs.get(doc_type, []) if found_docs: filenames = [d["filename"] for d in found_docs] value_str = f"{base_str} (документ найден: {', '.join(filenames)})" else: value_str = f"{base_str} (документ не найден)" else: value_str = base_str elif isinstance(value, list): value_str = ", ".join(str(v) for v in value) if value else NO_INFO_TEXT elif value == None or value == "": value_str = NO_INFO_TEXT else: value_str = str(value) if key == "total_weight_kg" and isinstance(value, (int, float)): value_str = f"{value} кг" elif key == "total_volume_cbm" and isinstance(value, (int, float)): value_str = f"{value} м³" if key == "shipping_options": continue render_val = ( value_str if key in ("criteria", "cargo_ready_date", "hs_code") else value_str.capitalize() ) st.markdown( f"
{label}: {render_val}
", unsafe_allow_html=True ) with st.container(border=True): st.markdown("#### 📋 Найденные документы") docs = shipment.get("documents_found", {}) for doc_type, files in docs.items(): if not files: continue st.markdown(f"**{doc_type.upper()}**") for f in files: st.markdown(f"- {f['filename']} (из письма: {f['email_subject']})") # Письмо клиенту — строго ОДНО, под всеми перевозками with st.expander("✉️ Письмо клиенту (под всеми перевозками)", expanded=False): with st.container(border=True): if shipping_templates: letter_text = generate_letter_for_shipments( shipments, always_confirmation=False, overview_client_letter=True, ) if letter_text: st.components.v1.html( generate_copy_html( letter_text, height=500, theme=st.session_state.get("ui_theme", "light"), ), height=550, ) attachments_client = collect_attachment_data_for_shipments(report, shipments) dl1, dl2 = st.columns(2) with dl1: st.download_button( label="📥 Скачать текст (.txt)", data=letter_text.encode("utf-8"), file_name="septem_pismo_klientu.txt", mime="text/plain; charset=utf-8", key="download_client_letter_txt", ) with dl2: eml_client = create_eml_with_attachments(letter_text, attachments_client) st.download_button( label="📥 Скачать .eml", data=eml_client, file_name="septem_pismo_klientu.eml", mime="message/rfc822", key="download_client_letter_eml", ) if attachments_client: st.caption( f"В .eml добавлено вложений (без изображений): {len(attachments_client)}" ) else: st.info("Письмо не сгенерировано") else: st.info("Шаблоны писем не загружены") # ============================================================================= # ОТОБРАЖЕНИЕ ПИСЕМ # ============================================================================= def display_emails(report: Dict, ship_id: List, num_ship: int): st.markdown("Письма") sources = report.get('sources', []) for idx, source in enumerate(sources): if source.get("id") not in ship_id: continue with st.expander(f"📧 {source.get('subject', 'Без темы')}"): attachments = source.get('attachments', []) # ---------- Фильтруем вложения (исключаем изображения) ---------- display_attachments = [] for orig_att_idx, att in enumerate(attachments): ext = att['filename'].split('.')[-1].lower() if ext not in IMAGE_EXTS: display_attachments.append((orig_att_idx, att)) if display_attachments: st.markdown("##### 📎 Вложения") cols = st.columns(len(display_attachments)) for col, (orig_att_idx, att) in zip(cols, display_attachments): with col: if st.session_state.session_id: url = f"{API_URL}/attachments/{st.session_state.session_id}/{idx}/{orig_att_idx}" ext = att['filename'].split('.')[-1].lower() if ext == 'pdf': icon = "📕" elif ext in ['doc', 'docx']: icon = "📘" elif ext in ['xls', 'xlsx']: icon = "📗" elif ext in IMAGE_EXTS: icon = "🖼️" else: icon = "📄" st.markdown( f"""
{icon} {att['filename']}
""", unsafe_allow_html=True ) st.markdown("---") # ---------- Содержимое письма: HTML или текст ---------- body_html = source.get('body_html') if body_html and body_html.strip(): # Экранируем для безопасной вставки в srcdoc (используем json.dumps) import json # json.dumps добавляет внешние кавычки и экранирует внутренние safe_html = json.dumps(body_html) # Убираем внешние кавычки, т.к. srcdoc сам будет в кавычках if safe_html.startswith('"') and safe_html.endswith('"'): safe_html = safe_html[1:-1] iframe_code = f''' ''' st.markdown(iframe_code, unsafe_allow_html=True) else: content = clean_email_text(source.get('content', '')) if content: st.text_area( "Содержание", value=content, height=300, disabled=True, key=f"content_{num_ship}_{idx}" ) else: st.info("Текст письма отсутствует") # ============================================================================= # ОТОБРАЖЕНИЕ ВСЕХ ВЛОЖЕНИЙ (ТРЕТЬЯ ВКЛАДКА) # ============================================================================= def display_all_attachments(report: Dict): sources = report.get('sources', []) if not sources: st.info("Нет писем с вложениями") return all_attachments = [] for email_idx, source in enumerate(sources): attachments = source.get('attachments', []) for att_idx, att in enumerate(attachments): ext = att['filename'].split('.')[-1].lower() if ext not in IMAGE_EXTS: # ← фильтр all_attachments.append({ 'email_idx': email_idx, 'att_idx': att_idx, 'filename': att['filename'], 'subject': source.get('subject', 'Без темы'), 'sender': source.get('sender_name', source.get('sender', '')), 'text_preview': att.get('text', '')[:200] + '...' if len(att.get('text', '')) > 200 else att.get('text', ''), 'size': att.get('size', 0), 'has_content': bool(att.get('content_base64')) }) if not all_attachments: st.info("Нет доступных вложений (изображения скрыты)") return st.info(f"Найдено вложений (без учёта изображений): {len(all_attachments)}") for att in all_attachments: with st.container(border=True): col1, col2, col3 = st.columns([3, 1, 1]) with col1: # Можно тоже добавить иконку ext = att['filename'].split('.')[-1].lower() if ext == 'pdf': icon = "📕" elif ext in ['doc', 'docx']: icon = "📘" elif ext in ['xls', 'xlsx']: icon = "📗" else: icon = "📄" st.markdown(f"**{icon} {att['filename']}**") st.caption(f"Из письма: {att['subject']} от {att['sender']}") if att['text_preview']: st.text(att['text_preview']) st.caption(f"Размер: {att['size']} байт") if att['has_content']: st.success("✅ Оригинал файла доступен") else: st.warning("⚠️ Только текст (оригинал не сохранён)") with col2: if st.session_state.session_id: url = f"{API_URL}/attachments/{st.session_state.session_id}/{att['email_idx']}/{att['att_idx']}" st.link_button("👁️ Просмотреть", url, use_container_width=True) else: st.button("👁️ Просмотреть", disabled=True, use_container_width=True) with col3: if st.session_state.session_id: url = f"{API_URL}/attachments/{st.session_state.session_id}/{att['email_idx']}/{att['att_idx']}" st.link_button("📥 Скачать", url, use_container_width=True) else: st.button("📥 Скачать", disabled=True, use_container_width=True) # ============================================================================= # ЗАПУСК ПРИЛОЖЕНИЯ # ============================================================================= if __name__ == "__main__": main()