NEKReport/web_ui(1).py

2707 lines
112 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from email.message import EmailMessage
from urllib.parse import quote
import streamlit as st
import requests
import pandas as pd
from datetime import datetime
import json
from typing import Any, Dict, List, Optional
import time
import socket
import sys
import os
import re
import html
import uuid
import base64
import mimetypes
# =============================================================================
# ЗАГРУЗКА ШАБЛОНОВ ПИСЕМ
# =============================================================================
IMAGE_EXTS = ['png', 'jpg', 'jpeg', 'gif', 'bmp', 'tiff', 'webp']
SHIPPING_TYPES_FILE = "shipping_types.json"
NO_INFO_TEXT = "Информация отсутствует"
NO_INFO_TEXT_EN = "Information not available"
_UI_DIR = os.path.dirname(os.path.abspath(__file__))
_CRITERIA_INTERFACE_LAYOUT_CACHE: Optional[Dict] = None
try:
from container_reference import door_clearance_summary
except ImportError:
def door_clearance_summary(shipment: Dict) -> str:
return NO_INFO_TEXT
shipping_templates = []
if os.path.exists(SHIPPING_TYPES_FILE):
try:
with open(SHIPPING_TYPES_FILE, "r", encoding="utf-8") as f:
raw_templates = json.load(f)
for t in raw_templates:
if not isinstance(t, dict):
continue
normalized = {}
for key, value in t.items():
clean_key = key.strip().strip('"').strip("'")
if clean_key == "keywords" and isinstance(value, list):
normalized[clean_key] = [
kw.strip().strip('"').strip("'")
for kw in value
if kw.strip()
]
elif isinstance(value, str):
normalized[clean_key] = value.strip()
else:
normalized[clean_key] = value
shipping_templates.append(normalized)
except Exception as e:
st.error(f"Ошибка загрузки шаблонов писем: {e}")
else:
st.warning(f"Файл {SHIPPING_TYPES_FILE} не найден. Шаблоны писем недоступны.")
# =============================================================================
# СЛОВАРЬ ДЛЯ ПРЕОБРАЗОВАНИЯ ТЕХНИЧЕСКИХ КЛЮЧЕЙ В ПОНЯТНЫЕ НАЗВАНИЯ
# =============================================================================
def clean_email_text(text: str) -> str:
"""
Убирает множественные пустые строки, оставляя не более одной пустой строки
между непустыми строками.
"""
if not text:
return ""
lines = text.splitlines()
result = []
prev_empty = False
for line in lines:
stripped = line.strip()
if stripped == "":
if not prev_empty:
result.append("") # одна пустая строка как разделитель абзацев
prev_empty = True
else:
result.append(line.rstrip()) # убираем пробелы в конце строки
prev_empty = False
# Убираем пустые строки в начале и конце
return "\n".join(result).strip()
FIELD_LABELS = {
"shipping_options": "Детали перевозки",
"requested_shipping_type_names": "Запрошенные типы перевозки (модель)",
"shipping_type": "Тип перевозки",
"client_name": "Клиент",
"incoterms": "Условия поставки Incoterms",
"cargo_ready_date": "Дата готовности груза",
"pickup_address": "Адрес забора груза",
"cargo_value": "Стоимость груза",
"package_count": "Количество грузовых мест",
"total_weight_kg": "Вес груза",
"dimensions": "Габариты",
"total_volume_cbm": "Объём",
"cargo_description": "Характер груза",
"delivery_address": "Адрес доставки",
"hs_code": "Код ТН ВЭД",
"dangerous_goods": "Опасные свойства",
"msds_required": "Требуется MSDS",
"dgm_report_required": "Требуется DGM",
"brand_name": "Бренд",
"brand_authorization_letter": "Разрешение бренда",
"transshipment_with_third_country": "Трансшипмент",
"exporter_has_export_license": "Лицензия экспортёра",
"additional_services": "Дополнительные услуги",
"arrival_expediting_responsibility": "Кто осуществляет экспедирование в аэропорту",
"special_transport_requirements": "Спецтребования к транспорту",
"customs_clearance_required": "Таможенное оформление",
"customs_clearance_place_export_rf": "Место оформления (экспорт РФ)",
"fumigation_on_wooden_packaging": "Фумигация",
"stackable_with_others": "Штабелирование с другими",
"stackable_among_themselves": "Штабелирование между собой",
"dangerous_goods_clarification": "Батарейки, газы, жидкости, аэрозоли в грузе (уточнение по категориям)",
"batteries_packed_separately": "Батарейки упакованы отдельно",
"estimated_cost": "Стоимость перевозки",
"estimated_transit_time": "Срок доставки",
"dimensions_str": "Габариты",
"dangerous_goods_str": "Опасные свойства",
"missing_fields": "Необходимая информация",
"loading_port": "Порт погрузки",
"discharge_port": "Порт выгрузки",
"shipment_type": "Тип перевозки",
"container_type": "Тип контейнера",
"vehicle_type": "Тип транспорта",
"temperature_range": "Температурный режим",
"vehicle_dimensions": "Габариты машины",
"vehicle_dimensions_str": "Габариты машины",
}
LABEL_TO_FIELD = {v: k for k, v in FIELD_LABELS.items()}
# Заголовки секций письма контрагенту: русское имя типа -> English
SHIPPING_TYPE_NAME_EN = {
"Авиаперевозка": "Air freight",
"Автомобильная перевозка (LTL)": "Road freight (LTL)",
"Автомобильная перевозка (FTL)": "Road freight (FTL)",
"Морская перевозка (LCL)": "Sea freight (LCL)",
"Морская перевозка (FCL)": "Sea freight (FCL)",
"Железнодорожная перевозка (LCL)": "Rail freight (LCL)",
"Железнодорожная перевозка (FCL)": "Rail freight (FCL)",
"Мультимодальная перевозка море + ж/д (LCL)": "Multimodal sea + rail (LCL)",
"Мультимодальная перевозка море + ж/д (FCL)": "Multimodal sea + rail (FCL)",
"Авиа перевозка": "Air freight",
"Морская перевозка": "Sea freight",
"Автомобильная перевозка": "Road freight",
"ЖД перевозка": "Rail freight",
}
# Английские плейсхолдеры в confirmation_template: (Label) -> поле данных
BRACKET_LABEL_ALIASES_EN: List[tuple[str, str]] = [
("Pickup address", "pickup_address"),
("Delivery address", "delivery_address"),
("Port of loading", "loading_port"),
("Port of discharge", "discharge_port"),
("Cargo weight", "total_weight_kg"),
("Number of packages", "package_count"),
("Volume", "total_volume_cbm"),
("Dimensions", "dimensions_str"),
("Cargo description", "cargo_description"),
("HS code", "hs_code"),
("Dangerous properties", "dangerous_goods_str"),
("MSDS required", "msds_required"),
("Freight cost", "estimated_cost"),
("Transit time", "estimated_transit_time"),
("Shipment type", "shipment_type"),
("Container type", "container_type"),
("Vehicle type", "vehicle_type"),
("Vehicle dimensions", "vehicle_dimensions_str"),
("Temperature regime", "temperature_range"),
("Temperature range", "temperature_range"),
]
GENERIC_LOCATION_WORDS = {
"rf", "рф", "russia", "россия", "china", "китай", "kazakhstan", "казахстан",
"belarus", "беларусь", "turkey", "турция", "india", "индия", "germany", "германия",
"eu", "europe", "европа", "asia", "азия",
}
def is_informative_text(value: Optional[str]) -> bool:
if value is None:
return False
text = str(value).strip()
if not text:
return False
if text.lower() in {"n/a", "na", "none", "null", "-", "--", "нет", "unknown"}:
return False
return True
def format_cargo_ready_date_for_display(shipment: Dict, *, empty: str = NO_INFO_TEXT) -> str:
"""Одна или несколько дат готовности — в одну строку через запятую."""
v = shipment.get("cargo_ready_date")
if v is None:
return empty
if isinstance(v, list):
parts = [str(x).strip() for x in v if x is not None and str(x).strip()]
return ", ".join(parts) if parts else empty
s = str(v).strip()
return s if s else empty
def is_specific_address(value: Optional[str]) -> bool:
if not is_informative_text(value):
return False
text = str(value).strip()
normalized = re.sub(r"[\s,.;:()\-_/]+", " ", text.lower()).strip()
parts = [p for p in normalized.split(" ") if p]
if len(parts) <= 1 and normalized in GENERIC_LOCATION_WORDS:
return False
return True
def build_documents_summary(shipment: Dict) -> str:
docs = shipment.get("documents_found", {}) or {}
doc_names = {
"msds": "MSDS",
"dgm": "DGM report",
"brand_authorization": "Авторизационное письмо бренда",
}
found_parts = []
for key, label in doc_names.items():
files = docs.get(key, []) or []
if files:
filenames = [f.get("filename", "") for f in files if f.get("filename")]
if filenames:
found_parts.append(f"{label}: {', '.join(filenames)}")
else:
found_parts.append(f"{label}: документ найден")
if not found_parts:
return NO_INFO_TEXT
return "; ".join(found_parts)
# =============================================================================
# ОТОБРАЖЕНИЕ КОНТЕЙНЕРОВ (Nxтип без голых 40HC/20DC)
# =============================================================================
_CONTAINER_BARE_ONLY = re.compile(
r"^\s*(20|40|45)\s*[''`´]?\s*(?:ft|feet|ф)?\s*"
r"(dc|hc|hq|gp|dv|dry|rf|rh|reefer|ref|ot|fr|tk|tank)\s*$",
re.IGNORECASE,
)
_CONTAINER_NX = re.compile(
r"(?<![\d.])(?P<n>\d{1,3})\s*[x×х*]\s*"
r"(?P<t>(?:20|40|45)\s*[''`´]?\s*(?:ft|feet|ф)?\s*"
r"(?:dc|hc|hq|gp|dv|dry|rf|rh|reefer|ref|ot|fr|tk|tank)\b)",
re.IGNORECASE,
)
def _normalize_container_token(n_str: str, type_part: str) -> str:
n = int(n_str)
t = re.sub(r"\s+", "", type_part.strip())
return f"{n}x{t}"
def normalize_container_type_display(raw: Optional[str], *, empty: str = "") -> str:
"""
Из строки извлекает фрагменты вида 2x40HC; голые 40HC, 20DC не включает.
Работает и без запятых между Nx-блоками в одной строке.
"""
if raw is None:
return empty
s = str(raw).strip()
if not s:
return empty
tokens: List[str] = []
for m in _CONTAINER_NX.finditer(s):
tokens.append(_normalize_container_token(m.group("n"), m.group("t")))
if tokens:
return ", ".join(tokens)
kept: List[str] = []
for part in re.split(r"[,;]+", s):
p = part.strip()
if not p:
continue
if _CONTAINER_BARE_ONLY.match(p):
continue
if _CONTAINER_NX.search(p):
for m in _CONTAINER_NX.finditer(p):
kept.append(_normalize_container_token(m.group("n"), m.group("t")))
continue
if re.match(r"^\d+\s*[x×х*]", p, re.IGNORECASE):
kept.append(
re.sub(r"\s*([x×х*])\s*", "x", p, count=1, flags=re.IGNORECASE)
)
return ", ".join(kept) if kept else empty
def collect_extra_required_missing_for_template(shipment: Dict, template_obj: Dict) -> List[str]:
"""Поля из shipping_type.extra_required_fields — запрос клиенту при отсутствии данных."""
extra = template_obj.get("extra_required_fields") if isinstance(template_obj, dict) else None
if not isinstance(extra, list):
return []
out: List[str] = []
for field in extra:
if not isinstance(field, str) or not field.strip():
continue
field = field.strip()
if field == "dangerous_goods_clarification":
if format_dangerous_goods_summary(shipment) == NO_INFO_TEXT:
out.append(field)
continue
if field == "stackable_with_others":
if shipment.get("stackable_with_others") is None:
out.append(field)
continue
if field == "hs_code":
v = shipment.get("hs_code")
if v is None or (isinstance(v, str) and not str(v).strip()):
out.append(field)
continue
if field == "vehicle_type":
v = shipment.get("vehicle_type")
if v is None or (isinstance(v, str) and not str(v).strip()):
out.append(field)
continue
if field == "container_type":
v = shipment.get("container_type")
if v is None or (isinstance(v, str) and not str(v).strip()):
out.append(field)
continue
if field == "customs_clearance_place_export_rf":
v = shipment.get("customs_clearance_place_export_rf")
if v is None or (isinstance(v, str) and not str(v).strip()):
out.append(field)
continue
if field == "total_volume_cbm":
val = shipment.get("total_volume_cbm")
if val is None or val == "":
out.append(field)
elif isinstance(val, (int, float)) and float(val) <= 0:
out.append(field)
continue
return out
def format_dangerous_goods_summary(shipment: Dict) -> str:
"""Единая строка для чек-листа, обзора и шаблонов писем."""
key_map = {
"batteries": "батарейки",
"gases": "газы",
"liquids": "жидкости",
"dry_ice": "сухой лёд",
}
dg_keys = ("batteries", "gases", "liquids", "dry_ice")
dg = shipment.get("dangerous_goods", {})
if isinstance(dg, dict):
yes = [key_map[k] for k in dg_keys if dg.get(k) is True]
explicit_no = [key_map[k] for k in dg_keys if dg.get(k) is False]
if yes:
return "Да: " + ", ".join(yes)
if len(explicit_no) == len(dg_keys):
return "Нет"
if explicit_no:
return (
"Нет: " + ", ".join(explicit_no) + " — по остальным категориям информации нет"
)
note = shipment.get("dangerous_goods_note")
if isinstance(note, str) and note.strip():
return note.strip()
return NO_INFO_TEXT
def format_brand_authorization_summary(shipment: Dict) -> str:
"""
Для пункта про авторизационное письмо бренда:
выводим текстовый контекст из письма (если есть), иначе явный fallback.
"""
info = shipment.get("brand_authorization_info")
if isinstance(info, str) and info.strip():
return info.strip()
flag = shipment.get("brand_authorization_letter")
if flag is True:
return "Требуется авторизационное письмо бренда, подробности в письме не найдены."
if flag is False:
return "Авторизационное письмо бренда не требуется (по переписке)."
return NO_INFO_TEXT
def format_dangerous_goods_summary_en(shipment: Dict) -> str:
"""English strings for confirmation_template (counterparty letter)."""
key_map = {
"batteries": "batteries",
"gases": "gases",
"liquids": "liquids",
"dry_ice": "dry ice",
}
dg_keys = ("batteries", "gases", "liquids", "dry_ice")
dg = shipment.get("dangerous_goods", {})
if isinstance(dg, dict):
yes = [key_map[k] for k in dg_keys if dg.get(k) is True]
explicit_no = [key_map[k] for k in dg_keys if dg.get(k) is False]
if yes:
return "Yes: " + ", ".join(yes)
if len(explicit_no) == len(dg_keys):
return "No"
if explicit_no:
return (
"No: " + ", ".join(explicit_no) + " — no information on the remaining categories"
)
note = shipment.get("dangerous_goods_note")
if isinstance(note, str) and note.strip():
if re.search(r"[а-яА-ЯёЁ]", note):
return (
"The correspondence mentions hazardous cargo, hazard class or UN/IMDG; "
"please clarify: batteries, gases, liquids, dry ice."
)
return note.strip()
return NO_INFO_TEXT_EN
def _shipping_type_to_en(shipment: Dict) -> str:
"""FCL/LCL или пусто — не подменять названием вида перевозки (море/жд)."""
raw = (shipment.get("shipment_type") or "").strip()
if not raw:
return NO_INFO_TEXT_EN
u = raw.upper()
if u in ("FCL", "LCL"):
return u
return raw
# =============================================================================
# КЛАСС ДЛЯ АВТОЗАПОЛНЕНИЯ ШАБЛОНОВ
# =============================================================================
class AutoFillDict(dict):
def __init__(self, data: dict, field_labels: dict = None):
super().__init__(data)
self._data = data
self._field_labels = field_labels or FIELD_LABELS
self._label_to_field = {v: k for k, v in self._field_labels.items()}
def __missing__(self, key):
if key in self._data:
val = self._data[key]
return str(val) if val is not None else ""
if key in self._label_to_field:
field_key = self._label_to_field[key]
if field_key in self._data:
val = self._data[field_key]
return str(val) if val is not None else ""
return ""
# =============================================================================
# ФУНКЦИЯ АВТОЗАПОЛНЕНИЯ ШАБЛОНА
# =============================================================================
def auto_fill_template(template: str, data: Dict, field_labels: Dict = None) -> str:
field_labels = field_labels or FIELD_LABELS
label_to_field = {v: k for k, v in field_labels.items()}
result = template
auto_data = AutoFillDict(data, field_labels)
try:
result = result.format(**auto_data)
except KeyError:
pass
merged_pairs = list(label_to_field.items()) + BRACKET_LABEL_ALIASES_EN
for label, field_key in merged_pairs:
if field_key in data:
value = data[field_key]
value_str = str(value).strip() if value is not None and str(value).strip() else NO_INFO_TEXT
pattern = r'\(' + re.escape(label) + r'\)'
result = re.sub(pattern, value_str, result, flags=re.IGNORECASE)
return result
# =============================================================================
# ФУНКЦИЯ ДЛЯ ГЕНЕРАЦИИ HTML-БЛОКА С КНОПКОЙ КОПИРОВАНИЯ
# =============================================================================
def generate_copy_html(text: str, height: int = 500, theme: str = "light") -> str:
text_html = text
text_json = json.dumps(text)
btn_id = f"copy_btn_{uuid.uuid4().hex}"
palette = THEME_CONFIG.get(theme, THEME_CONFIG["light"])
return f"""
<style>
.copy-wrapper {{
position: relative;
height: {height}px;
}}
.code-block {{
font-family: 'Courier New', Courier, monospace;
font-size: 0.95em;
background-color: {palette["code_bg"]};
border: 1px solid {palette["border"]};
border-radius: 6px;
padding: 16px;
padding-top: 40px;
white-space: pre-wrap;
word-break: break-word;
overflow-y: scroll;
box-shadow: {palette["shadow"]};
color: {palette["text"]};
height: 100%;
}}
.code-block table {{
border-collapse: collapse;
width: 100%;
}}
.code-block td, .code-block th {{
border: 1px solid {palette["border"]};
padding: 6px;
}}
.button-copy {{
position: absolute;
top: 8px;
right: 8px;
font-family: 'Courier New', Courier, monospace;
font-size: 0.9em;
background-color: {palette["card_bg"]};
border: 1px solid {palette["border"]};
border-radius: 6px;
padding: 6px 10px;
cursor: pointer;
color: {palette["text"]};
transition: background-color 0.2s;
}}
.button-copy:hover {{
background-color: {palette["hover"]};
}}
</style>
<div class="copy-wrapper">
<button id="{btn_id}" class="button-copy">📋 Копировать</button>
<div class="code-block">{text_html}</div>
</div>
<script>
(function() {{
const btn = document.getElementById("{btn_id}");
const textToCopy = {text_json};
btn.addEventListener('click', function() {{
navigator.clipboard.writeText(textToCopy).catch(err => {{
alert('❌ Ошибка копирования: ' + err);
}});
}});
}})();
</script>
"""
# =============================================================================
# ПАТЧЕР ДЛЯ ОТОБРАЖЕНИЯ ВНЕШНЕГО URL
# =============================================================================
def get_external_ip():
try:
external_ip = requests.get('https://api.ipify.org', timeout=3).text
return external_ip
except:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
s.close()
return local_ip
except:
return None
PUBLIC_WEB_URL = os.getenv("PUBLIC_WEB_URL", "https://nec.clients.septem.pro").rstrip("/")
class StreamlitOutputPatcher:
def __init__(self):
self.external_ip = get_external_ip()
self.original_stdout = sys.stdout
def write(self, text):
if self.external_ip and "Network URL:" in text:
import re
port_match = re.search(r':(\d+)', text)
port = port_match.group(1) if port_match else "8501"
# Use canonical public domain instead of raw external IP.
external_url = (
f"\n External URL: {PUBLIC_WEB_URL}\n"
f" Fallback IP URL: http://{self.external_ip}:{port}\n"
)
self.original_stdout.write(text + external_url)
else:
self.original_stdout.write(text)
def flush(self):
self.original_stdout.flush()
if "streamlit" in sys.argv[0]:
sys.stdout = StreamlitOutputPatcher()
# =============================================================================
# НАСТРОЙКА СТРАНИЦЫ
# =============================================================================
st.set_page_config(
page_title="SEPTEM Cargo Analytics",
page_icon="🚚",
layout="wide"
)
THEME_CONFIG = {
"light": {
"app_bg": "#f5f7fb",
"card_bg": "#ffffff",
"text": "#111827",
"muted": "#6b7280",
"border": "#d0d7de",
"code_bg": "#f6f8fa",
"hover": "#e9ecef",
"shadow": "0 2px 4px rgba(0,0,0,0.05)",
},
"dark": {
"app_bg": "#0f172a",
"card_bg": "#111827",
"text": "#e5e7eb",
"muted": "#94a3b8",
"border": "#334155",
"code_bg": "#1f2937",
"hover": "#374151",
"shadow": "0 2px 8px rgba(0,0,0,0.45)",
},
}
def apply_ui_theme() -> None:
theme_name = st.session_state.get("ui_theme", "light")
colors = THEME_CONFIG.get(theme_name, THEME_CONFIG["light"])
st.markdown(
f"""
<style>
:root {{
--septem-app-bg: {colors["app_bg"]};
--septem-card-bg: {colors["card_bg"]};
--septem-text: {colors["text"]};
--septem-muted: {colors["muted"]};
--septem-border: {colors["border"]};
--septem-code-bg: {colors["code_bg"]};
--septem-hover: {colors["hover"]};
--septem-shadow: {colors["shadow"]};
}}
div[data-testid="stAppViewContainer"] {{
background: var(--septem-app-bg);
}}
section.main > div {{
background: var(--septem-app-bg);
}}
header[data-testid="stHeader"] {{
background: var(--septem-app-bg);
}}
div[data-testid="stToolbar"] * {{
color: var(--septem-text) !important;
}}
div[data-testid="stAppViewContainer"], div[data-testid="stAppViewContainer"] * {{
color: var(--septem-text);
}}
div[data-testid="stSidebar"] {{
background: var(--septem-card-bg);
border-right: 1px solid var(--septem-border);
}}
div[data-testid="stSidebar"] > div:first-child {{
background: var(--septem-card-bg);
}}
button[kind], .stButton > button, .stDownloadButton > button, div[data-testid="stSidebarCollapseButton"] > button {{
background: var(--septem-card-bg) !important;
border: 1px solid var(--septem-border) !important;
color: var(--septem-text) !important;
}}
button[kind]:hover, .stButton > button:hover, .stDownloadButton > button:hover, div[data-testid="stSidebarCollapseButton"] > button:hover {{
background: var(--septem-hover) !important;
}}
div[data-testid="stSidebar"], div[data-testid="stSidebar"] * {{
color: var(--septem-text) !important;
}}
div[data-testid="stMetricValue"], div[data-testid="stMetricLabel"] {{
color: var(--septem-text) !important;
}}
button[data-baseweb="tab"] {{
color: var(--septem-muted) !important;
}}
button[data-baseweb="tab"][aria-selected="true"] {{
color: var(--septem-text) !important;
}}
div[data-baseweb="select"] > div,
div[data-baseweb="input"] > div,
div[data-baseweb="textarea"] > div {{
background: var(--septem-card-bg) !important;
border-color: var(--septem-border) !important;
}}
div[data-baseweb="select"] * {{
color: var(--septem-text) !important;
}}
div[data-testid="stExpander"] {{
border: 1px solid var(--septem-border);
border-radius: 10px;
background: var(--septem-card-bg);
}}
div[data-testid="stExpander"] details {{
background: var(--septem-card-bg) !important;
border-radius: 10px;
}}
div[data-testid="stExpander"] summary {{
background: var(--septem-card-bg) !important;
color: var(--septem-text) !important;
border-radius: 10px;
}}
div[data-testid="stExpander"] details[open] summary {{
background: var(--septem-card-bg) !important;
color: var(--septem-text) !important;
}}
div[data-testid="stExpander"] .streamlit-expanderContent {{
background: var(--septem-card-bg) !important;
}}
div[data-testid="stMarkdownContainer"] p {{
margin-top: 0.25rem;
margin-bottom: 0.25rem;
}}
.stMarkdown, .stText, .stExpander, .stContainer {{
word-wrap: break-word;
overflow-wrap: break-word;
}}
.streamlit-expanderContent {{
overflow-x: hidden;
}}
.element-container .stMarkdown p {{
white-space: normal;
}}
.button-code {{
max-width: 100%;
box-sizing: border-box;
}}
.ship-field {{
margin-bottom: 6px;
}}
.code-block {{
font-family: 'Courier New', Courier, monospace;
font-size: 0.95em;
background-color: var(--septem-code-bg);
border: 1px solid var(--septem-border);
border-radius: 6px;
padding: 16px;
padding-top: 40px;
overflow-y: scroll;
box-shadow: var(--septem-shadow);
height: 100%;
}}
.code-block table {{
border-collapse: collapse;
width: 100%;
}}
.code-block td, .code-block th {{
border: 1px solid var(--septem-border);
padding: 6px;
}}
.download-eml button {{
border: 1px solid var(--septem-border);
background: var(--septem-card-bg);
color: var(--septem-text);
padding: 6px 12px;
font-size: 13px;
border-radius: 6px;
}}
.download-eml button:hover {{
background: var(--septem-hover);
}}
</style>
""",
unsafe_allow_html=True,
)
API_URL = os.getenv("PUBLIC_API_URL", f"{PUBLIC_WEB_URL}/llm")
FRONTEND_URL = PUBLIC_WEB_URL
# =============================================================================
# ИНИЦИАЛИЗАЦИЯ СОСТОЯНИЯ СЕССИИ
# =============================================================================
if 'session_id' not in st.session_state:
st.session_state.session_id = None
if 'current_report' not in st.session_state:
st.session_state.current_report = None
if 'report_loaded' not in st.session_state:
st.session_state.report_loaded = False
if "ui_theme" not in st.session_state:
st.session_state.ui_theme = "light"
# =============================================================================
# НОВЫЕ ФУНКЦИИ ДЛЯ РАБОТЫ С ВЛОЖЕНИЯМИ
# =============================================================================
def collect_attachment_data_for_shipment(report: Dict, shipment: Dict) -> List[Dict]:
"""
Собирает реальные вложения (с content_base64) из писем,
относящихся к данной перевозке.
"""
sources = report.get("sources", [])
email_ids = shipment.get("ID_emails", [])
attachments_data = []
for src in sources:
if src.get("id") not in email_ids:
continue
for att in src.get("attachments", []):
if att.get("content_base64"):
attachments_data.append({
"filename": att["filename"],
"content_base64": att["content_base64"]
})
return attachments_data
def collect_attachment_data_for_shipments(report: Dict, shipments: List[Dict]) -> List[Dict]:
"""
Собирает реальные вложения (с content_base64) из писем,
относящихся к любому из указанных shipments. Дедупликация по (filename, size).
"""
sources = report.get("sources", [])
email_ids: set = set()
for s in shipments or []:
for eid in (s.get("ID_emails", []) or []):
email_ids.add(eid)
seen: set = set()
attachments_data: List[Dict] = []
for src in sources:
if src.get("id") not in email_ids:
continue
for att in src.get("attachments", []):
if not att.get("content_base64"):
continue
key = (att.get("filename"), att.get("size"))
if key in seen:
continue
seen.add(key)
attachments_data.append({
"filename": att["filename"],
"content_base64": att["content_base64"]
})
return attachments_data
def create_eml_with_attachments(letter_text: str, attachments_data: List[Dict]) -> bytes:
"""
Создаёт MIME-сообщение с текстом письма и вложениями.
Возвращает байтовое представление .eml файла.
"""
msg = EmailMessage()
msg["X-Unsent"] = "1"
msg.set_content(letter_text)
for att in attachments_data:
filename = att["filename"]
ext = filename.split(".")[-1].lower()
# пропускаем изображения
if ext in IMAGE_EXTS:
continue
content = base64.b64decode(att["content_base64"])
mime_type, encoding = mimetypes.guess_type(filename)
if mime_type is None:
mime_type = 'application/octet-stream'
maintype, subtype = mime_type.split('/', 1)
msg.add_attachment(
content,
maintype=maintype,
subtype=subtype,
filename=filename
)
return msg.as_bytes()
# =============================================================================
# ОСНОВНАЯ ФУНКЦИЯ
# =============================================================================
def main():
title_col, theme_col = st.columns([8, 2])
with title_col:
st.title("🚚 SEPTEM Cargo Analytics")
with theme_col:
theme_label = st.selectbox(
"Тема",
options=["Светлая", "Тёмная"],
index=0 if st.session_state.ui_theme == "light" else 1,
key="ui_theme_selector",
)
selected_theme = "light" if theme_label == "Светлая" else "dark"
if selected_theme != st.session_state.ui_theme:
st.session_state.ui_theme = selected_theme
st.rerun()
apply_ui_theme()
query_params = st.query_params
session_id = query_params.get("session_id", [None])
if isinstance(session_id, list):
session_id = session_id[0] if session_id else None
status_placeholder = st.empty()
if session_id and not st.session_state.report_loaded:
st.session_state.session_id = session_id
status_placeholder.markdown("◌ Обрабатываем данные цепочки писем...")
load_report(session_id)
st.session_state.report_loaded = True
status_placeholder.markdown("✅ Обработка писем завершена.")
elif session_id and st.session_state.report_loaded:
status_placeholder.markdown(
"✅ Обработка писем завершена."
)
with st.sidebar:
st.header("📋 Сессия")
if st.session_state.session_id:
st.success(f"ID сессии: {st.session_state.session_id[:8]}...")
if st.button("🔄 Обновить отчёт", type="primary", use_container_width=True):
with st.spinner("Обновляем..."):
load_report(st.session_state.session_id)
st.rerun()
else:
st.info("Нет активной сессии. Используйте надстройку Outlook для отправки писем.")
st.divider()
if st.session_state.current_report:
report = st.session_state.current_report
st.metric("Всего писем проанализировано", report.get('total_emails_analyzed', 0))
shipments = report.get('structured_data', {}).get('shipments', [])
st.metric("Найдено перевозок", len(shipments))
if st.session_state.current_report:
display_cargo_report(st.session_state.current_report)
else:
display_welcome()
# =============================================================================
# ФУНКЦИИ ЗАГРУЗКИ И ОТОБРАЖЕНИЯ
# =============================================================================
def load_report(session_id: str):
try:
response = requests.post(
f"{API_URL}/generate-cargo-report",
json={"session_id": session_id}
)
if response.status_code == 200:
st.session_state.current_report = response.json()
else:
st.error(f"Ошибка загрузки отчёта: {response.status_code} - {response.text}")
except Exception as e:
st.error(f"Не удалось соединиться с сервером: {e}")
def display_welcome():
col1, col2, col3 = st.columns([1, 2, 1])
with col2:
st.image("https://cdn-icons-png.flaticon.com/512/1995/1995571.png", width=200)
st.markdown("""
Добро пожаловать в SEPTEM Cargo Analytics!
Для начала работы:
1. Откройте Outlook
2. Выберите письма с информацией о грузоперевозках
3. Нажмите кнопку "Анализировать выбранные письма" в ленте
4. Данные автоматически загрузятся сюда
Система проанализирует:
- 📦 Типы грузов
- 📍 Маршруты перевозок
- ⚖️ Вес и объем
- 📅 Сроки доставки
- ⚠️ Особые условия
""")
def display_cargo_report(report: Dict):
tab1, tab2, tab3 = st.tabs([
"📦 Обзор перевозок",
"✉️ Подготовленный вариант письма",
"📎 Все вложения"
])
with tab1:
with st.container(key="Block_ship"):
display_overview(report)
with tab2:
shipments = report.get('structured_data', {}).get('shipments', [])
if not shipments:
st.info("Нет перевозок для формирования письма")
else:
st.subheader("Письмо Контрагенту")
letter_counterparty = generate_letter_for_shipments(shipments, always_confirmation=True)
if letter_counterparty:
st.components.v1.html(
generate_copy_html(
letter_counterparty,
height=500,
theme=st.session_state.get("ui_theme", "light"),
),
height=550,
)
else:
st.info("Письмо Контрагенту не сгенерировано")
# Вложения прикладываем ко всем письмам одинаково: всё, что относится к перевозкам
attachments_data = collect_attachment_data_for_shipments(report, shipments)
if st.session_state.session_id and letter_counterparty:
st.divider()
with st.container():
eml_cp = create_eml_with_attachments(letter_counterparty, attachments_data)
st.download_button(
label="📥 Скачать .eml",
data=eml_cp,
file_name="septem_cargo_counterparty.eml",
mime="message/rfc822",
key="download_eml",
)
if attachments_data:
st.success(f"В .eml будет добавлено вложений: {len(attachments_data)}")
with st.expander("Показать список прикреплённых файлов"):
for att in attachments_data:
st.write(f"- {att['filename']}")
else:
st.info("Вложения отсутствуют (или недоступны для прикрепления).")
with tab3:
display_all_attachments(report)
# =============================================================================
# ФОРМАТИРОВАНИЕ ДАННЫХ ДЛЯ ШАБЛОНА
# =============================================================================
def format_shipment_for_template(
shipment: Dict,
*,
template_locale: str = "ru",
) -> Dict:
"""
template_locale:
- \"ru\" — клиентские письма и русские шаблоны;
- \"en\" — письмо контрагенту (confirmation_template): служебные фразы и метки на английском,
факты из писем (адреса, описание груза) без перевода.
"""
empty = NO_INFO_TEXT_EN if template_locale == "en" else NO_INFO_TEXT
def pick(*values, default=None):
d = default if default is not None else empty
for v in values:
if is_informative_text(v):
return str(v).strip()
return d
_ct_picked = pick(
shipment.get("container_type"),
shipment.get("container"),
)
_ct_for_display = normalize_container_type_display(
None if _ct_picked == empty else _ct_picked,
empty=empty,
)
data = {
"client_name": pick(shipment.get("client_name")),
"cargo_ready_date": format_cargo_ready_date_for_display(
shipment, empty=empty
),
"pickup_address": pick(
shipment.get("pickup_address"),
shipment.get("loading_port")
),
"delivery_address": pick(
shipment.get("delivery_address"),
shipment.get("discharge_port")
),
"total_weight_kg": pick(
shipment.get("total_weight_kg"),
shipment.get("weight"),
shipment.get("gross_weight")
),
"package_count": pick(
shipment.get("package_count"),
shipment.get("packages"),
shipment.get("places")
),
"total_volume_cbm": pick(
shipment.get("total_volume_cbm"),
shipment.get("volume")
),
"cargo_description": pick(
shipment.get("cargo_description"),
shipment.get("cargo_name"),
shipment.get("description")
),
"hs_code": pick(shipment.get("hs_code")),
"loading_port": pick(
shipment.get("loading_port"),
shipment.get("pickup_address")
),
"discharge_port": pick(
shipment.get("discharge_port"),
shipment.get("delivery_address")
),
"shipment_type": (
_shipping_type_to_en(shipment)
if template_locale == "en"
else pick(shipment.get("shipment_type"))
),
"container_type": _ct_for_display,
"vehicle_type": pick(shipment.get("vehicle_type")),
"temperature_range": pick(shipment.get("temperature_range")),
"msds_required": (
(
"Yes"
if shipment.get("msds_required") is True
else ("No" if shipment.get("msds_required") is False else empty)
)
if template_locale == "en"
else (
"✅ Да"
if shipment.get("msds_required") is True
else ("❌ Нет" if shipment.get("msds_required") is False else NO_INFO_TEXT)
)
),
"estimated_cost": empty,
"estimated_transit_time": empty,
"dimensions_str": empty,
"vehicle_dimensions_str": empty,
"dangerous_goods_str": empty,
"missing_fields": ""
}
dim_unit = "cm" if template_locale == "en" else "см"
dims = shipment.get("dimensions", [])
if isinstance(dims, list) and dims:
dim_strings = []
for d in dims:
if isinstance(d, dict):
l = d.get("length_cm")
w = d.get("width_cm")
h = d.get("height_cm")
if l and w and h:
dim_strings.append(f"{l}×{w}×{h} {dim_unit}")
if dim_strings:
data["dimensions_str"] = "; ".join(dim_strings)
vdims = shipment.get("vehicle_dimensions", [])
if isinstance(vdims, list) and vdims:
vdim_strings = []
for d in vdims:
if isinstance(d, dict):
l = d.get("length_cm")
w = d.get("width_cm")
h = d.get("height_cm")
if l and w and h:
vdim_strings.append(f"{l}×{w}×{h} {dim_unit}")
if vdim_strings:
data["vehicle_dimensions_str"] = "; ".join(vdim_strings)
data["dangerous_goods_str"] = (
format_dangerous_goods_summary_en(shipment)
if template_locale == "en"
else format_dangerous_goods_summary(shipment)
)
if data.get("cargo_description") == empty:
dangerous_hint = data.get("dangerous_goods_str", "")
hs = data.get("hs_code", "")
if dangerous_hint and dangerous_hint not in (empty, NO_INFO_TEXT, NO_INFO_TEXT_EN):
if template_locale == "en":
data["cargo_description"] = f"Dangerous cargo ({dangerous_hint})"
else:
data["cargo_description"] = f"Опасный груз ({dangerous_hint})"
elif hs and hs not in (empty, NO_INFO_TEXT, NO_INFO_TEXT_EN):
if template_locale == "en":
data["cargo_description"] = f"Cargo under HS code {hs}"
else:
data["cargo_description"] = f"Груз по коду ТН ВЭД {hs}"
shipping_options = shipment.get("shipping_options", [])
if isinstance(shipping_options, list) and shipping_options:
opt = shipping_options[0]
if isinstance(opt, dict):
if is_informative_text(opt.get("cost")):
data["estimated_cost"] = opt.get("cost")
if is_informative_text(opt.get("transit_time")):
data["estimated_transit_time"] = opt.get("transit_time")
return data
def collect_attachments_for_shipment(report: Dict, shipment: Dict):
"""
Собирает список вложений из писем,
которые относятся к конкретной перевозке.
"""
attachments = []
sources = report.get("sources", [])
email_ids = shipment.get("ID_emails", [])
for src in sources:
if src.get("id") not in email_ids:
continue
for att in src.get("attachments", []):
name = att.get("filename")
if name:
attachments.append(name)
return attachments
# =============================================================================
# ГЕНЕРАЦИЯ ПИСЬМА ИЗ ШАБЛОНА
# =============================================================================
def generate_letter_from_template(
shipment: Dict,
template_obj: Dict,
always_confirmation: bool = False,
*,
overview_client_letter: bool = False,
) -> str:
"""
Генерирует письмо из шаблона.
Args:
shipment: Данные о перевозке
template_obj: Объект шаблона с confirmation_template и info_request_template
always_confirmation: Если True, всегда использовать confirmation_template (для вкладки 2)
Если False, использовать условную логику (для вкладки 1)
overview_client_letter: Если True и always_confirmation=False — только русское письмо клиенту
(info_request_template), без английского confirmation_template («контрагенту»).
"""
required_fields = [
"client_name", "incoterms", "cargo_ready_date", "pickup_address", "cargo_value",
"package_count", "total_weight_kg", "dimensions", "total_volume_cbm",
"cargo_description", "delivery_address"
]
missing = []
for field in required_fields:
val = shipment.get(field)
if field == "cargo_ready_date":
if isinstance(val, list):
if not any(str(x).strip() for x in val if x is not None):
missing.append(field)
elif val is None or (isinstance(val, str) and not val.strip()):
missing.append(field)
continue
if field == "dimensions":
if not val or not isinstance(val, list) or len(val) == 0:
missing.append(field)
elif field == "delivery_address":
if not is_specific_address(val):
missing.append(field)
elif field == "pickup_address":
if not is_specific_address(val):
missing.append(field)
elif (
val is None
or (isinstance(val, (int, float)) and float(val) == 0)
or (
isinstance(val, str)
and val.strip() in {"", "0", "0.0", "0,0", "0,00"}
)
):
missing.append(field)
missing.extend(collect_extra_required_missing_for_template(shipment, template_obj))
missing = list(dict.fromkeys(missing))
# Собираем названия недостающих полей
missing_labels = [FIELD_LABELS.get(f, f) for f in missing]
# --- НОВЫЙ БЛОК: проверка наличия обязательных документов ---
doc_found = shipment.get("documents_found", {})
# MSDS
if shipment.get("msds_required") and not doc_found.get("msds"):
missing_labels.append("MSDS (паспорт безопасности)")
# DGM report
if shipment.get("dgm_report_required") and not doc_found.get("dgm"):
missing_labels.append("DGM report (декларация на опасный груз)")
# Авторизационное письмо бренда
if shipment.get("brand_authorization_letter") and not doc_found.get("brand_authorization"):
missing_labels.append("Авторизационное письмо от бренда")
# Лицензию агента больше не запрашиваем как отдельный документ.
# ----------------------------------------------------------
if overview_client_letter:
template_locale = "ru"
elif always_confirmation:
template_locale = "en"
elif missing_labels:
template_locale = "ru"
else:
template_locale = "en"
data = format_shipment_for_template(shipment, template_locale=template_locale)
empty_addr = NO_INFO_TEXT_EN if template_locale == "en" else NO_INFO_TEXT
if not is_specific_address(data.get("delivery_address")):
data["delivery_address"] = empty_addr
if not is_specific_address(data.get("pickup_address")):
data["pickup_address"] = empty_addr
if always_confirmation:
# Для вкладки 2 - всегда confirmation_template
template = template_obj.get("confirmation_template", "")
elif overview_client_letter:
# Вкладка «Обзор»: только письмо клиенту (русский шаблон), не confirmation_template
template = template_obj.get("info_request_template", "")
if missing_labels:
data["missing_fields"] = "\n".join(f"- {item}" for item in missing_labels)
else:
data["missing_fields"] = (
"- По имеющимся в переписке данным необходимая информация для предварительной оценки получена.\n"
"- Мы подготовим расчёт и свяжемся с вами в ближайшее время."
)
else:
# Прочие случаи: запрос недостающего или confirmation при полном комплекте
if missing_labels:
template = template_obj.get("info_request_template", "")
data["missing_fields"] = "\n".join(f"- {item}" for item in missing_labels)
else:
template = template_obj.get("confirmation_template", "")
try:
return auto_fill_template(template, data, FIELD_LABELS)
except KeyError as e:
return f"Ошибка в шаблоне: поле {{{e}}} не найдено в данных"
except Exception as e:
return f"Ошибка при формировании письма: {e}"
def _extract_greeting_and_signature(letter_text: str) -> tuple[str, str, str]:
"""
Пытается вытащить из текста письма:
- greeting: первый абзац (обычно "Уважаемый ...!")
- signature: абзац начиная с "С уважением,"
- body: остальной текст между ними
"""
text = clean_email_text(letter_text or "")
if not text:
return "", "", ""
lines = text.splitlines()
# greeting = строки до первой пустой строки (включая её не берем)
greeting_lines: List[str] = []
i = 0
while i < len(lines) and lines[i].strip() != "":
greeting_lines.append(lines[i].rstrip())
i += 1
greeting = "\n".join(greeting_lines).strip()
# signature = ищем "С уважением" (последний блок)
sig_idx = None
for idx in range(len(lines) - 1, -1, -1):
low = lines[idx].strip().lower()
if low.startswith("с уважением") or low.startswith("best regards"):
sig_idx = idx
break
if sig_idx is not None:
signature = "\n".join(lines[sig_idx:]).strip()
body_lines = lines[i:sig_idx]
else:
signature = ""
body_lines = lines[i:]
body = clean_email_text("\n".join(body_lines))
return greeting, body, signature
def _parse_bullet_lines(bullets_block: str) -> List[str]:
"""Строки списка вида «- …» из блока bullets."""
if not bullets_block or not str(bullets_block).strip():
return []
out: List[str] = []
for line in bullets_block.splitlines():
st = line.strip()
if st.startswith("-"):
out.append(st)
return out
def _bullet_dedup_key(line: str) -> str:
s = line.strip()
if s.startswith("-"):
s = s[1:].strip()
return re.sub(r"\s+", " ", s).lower()
def generate_letter_for_shipments(
shipments: List[Dict],
always_confirmation: bool = False,
*,
overview_client_letter: bool = False,
) -> str:
"""
Генерирует ОДНО письмо по нескольким перевозкам:
- секции «Перевозка (i) — <тип>» для каждой найденной комбинации перевозка + тип перевозки;
- шаблон письма подбирается по названию типа перевозки;
- одно приветствие и одна подпись (из первого сгенерированного блока).
"""
shipments = [s for s in (shipments or []) if isinstance(s, dict)]
if not shipments or not shipping_templates:
return ""
def _candidates_for_letter(sh: Dict) -> List[Dict]:
cands = sh.get("shipping_type_candidates")
if isinstance(cands, list) and cands:
return [c for c in cands if isinstance(c, dict)]
st_name = sh.get("shipping_type") or ""
return [{"shipping_type": st_name, "criteria": sh.get("criteria", "")}]
default_template_obj = shipping_templates[0]
sections: List[str] = []
greeting = ""
signature = ""
intro_part = ""
outro_part = ""
first_info_block_seen = False
def _extract_intro_bullets_outro(body: str) -> tuple[str, str, str]:
"""
Для info_request_template: в body есть интро, затем маркированный список "- ...",
затем продолжение (начинается обычно с "Пожалуйста, ...").
"""
if not body:
return "", "", ""
# Начало буллетов: строка, начинающаяся с "- "
m = re.search(r"(^|\n)-\s", body)
if not m:
# На всякий случай: если буллеты не найдены, считаем, что весь body - список
return "", body.strip(), ""
bullet_start = m.start()
please_idx = body.find("Пожалуйста")
bullet_end = please_idx if please_idx != -1 else len(body)
intro = body[:bullet_start].rstrip()
bullets = body[bullet_start:bullet_end].strip()
outro = body[bullet_end:].strip()
return intro, bullets, outro
for idx, shipment in enumerate(shipments, start=1):
variants = _candidates_for_letter(shipment)
if always_confirmation:
for variant in variants:
type_name = (variant.get("shipping_type") or "").strip() or "Тип не указан"
type_name_en = SHIPPING_TYPE_NAME_EN.get(type_name, type_name)
template_obj = None
if type_name and type_name != "Тип не указан":
template_obj = next(
(t for t in shipping_templates if t.get("name") == type_name),
None,
)
if not template_obj:
template_obj = default_template_obj
letter_one = generate_letter_from_template(
shipment, template_obj, always_confirmation=True
)
g, body, sig = _extract_greeting_and_signature(letter_one)
if not greeting and g:
greeting = g
if not signature and sig:
signature = sig
header = f"Shipment ({idx}) — {type_name_en}"
section_body = body if body else clean_email_text(letter_one)
sections.append(f"{header}\n\n{section_body}".strip())
continue
# Письмо-запрос: при нескольких способах доставки общие недостающие пункты — один раз
variant_rows: List[Dict] = []
for variant in variants:
type_name = (variant.get("shipping_type") or "").strip() or "Тип не указан"
template_obj = None
if type_name and type_name != "Тип не указан":
template_obj = next(
(t for t in shipping_templates if t.get("name") == type_name),
None,
)
if not template_obj:
template_obj = default_template_obj
letter_one = generate_letter_from_template(
shipment,
template_obj,
always_confirmation=False,
overview_client_letter=overview_client_letter,
)
g, body, sig = _extract_greeting_and_signature(letter_one)
if not greeting and g:
greeting = g
if not signature and sig:
signature = sig
intro_i, bullets_i, outro_i = _extract_intro_bullets_outro(body)
if not first_info_block_seen:
intro_part, outro_part = intro_i, outro_i
first_info_block_seen = True
variant_rows.append({
"type_name": type_name,
"header": f"Перевозка ({idx}) — {type_name}",
"bullets_raw": bullets_i,
"body": body,
"letter_one": letter_one,
})
if len(variant_rows) <= 1:
row = variant_rows[0]
if row["bullets_raw"]:
sections.append(f"{row['header']}\n{row['bullets_raw'].strip()}")
elif row["body"].strip():
sections.append(f"{row['header']}\n{row['body'].strip()}".strip())
continue
bullet_lists = [_parse_bullet_lines(row["bullets_raw"]) for row in variant_rows]
if not any(bullet_lists):
for row in variant_rows:
if row["body"].strip():
sections.append(f"{row['header']}\n{row['body'].strip()}".strip())
continue
type_names = [row["type_name"] for row in variant_rows]
types_joined = ", ".join(type_names)
merged_header = f"Перевозка ({idx}) — {types_joined}"
# Если не у всех вариантов есть буллеты — полного пересечения нет: один объединённый список без дублей
if not all(len(bl) > 0 for bl in bullet_lists):
seen_u: set = set()
merged_u: List[str] = []
for bl in bullet_lists:
for l in bl:
k = _bullet_dedup_key(l)
if k not in seen_u:
seen_u.add(k)
merged_u.append(l)
if merged_u:
sections.append(f"{merged_header}\n\n" + "\n".join(merged_u).strip())
continue
key_sets = [set(_bullet_dedup_key(l) for l in bl) for bl in bullet_lists]
common_keys = set.intersection(*key_sets)
first_nonempty = next((bl for bl in bullet_lists if bl), bullet_lists[0])
common_lines = [l for l in first_nonempty if _bullet_dedup_key(l) in common_keys]
if common_lines:
sections.append(f"{merged_header}\n\n" + "\n".join(common_lines).strip())
for i, row in enumerate(variant_rows):
residual = [
l for l in bullet_lists[i]
if _bullet_dedup_key(l) not in common_keys
]
if residual:
sections.append(
f"{row['header']} (дополнительно по этому способу)\n" + "\n".join(residual)
)
if not always_confirmation:
combined_body = "\n\n".join(sections).strip()
parts = [p for p in [greeting, intro_part, combined_body, outro_part, signature] if p]
return clean_email_text("\n\n".join(parts))
combined_body = "\n\n".join(sections).strip()
parts = [p for p in [greeting, combined_body, signature] if p]
return clean_email_text("\n\n".join(parts))
def _load_criteria_interface_layout() -> Dict:
"""Разметка чек-листа как на листах Excel «… Интерфейс» (criteria_interface_layout.json)."""
global _CRITERIA_INTERFACE_LAYOUT_CACHE
if _CRITERIA_INTERFACE_LAYOUT_CACHE is not None:
return _CRITERIA_INTERFACE_LAYOUT_CACHE
path = os.path.join(_UI_DIR, "criteria_interface_layout.json")
if not os.path.isfile(path):
_CRITERIA_INTERFACE_LAYOUT_CACHE = {}
return _CRITERIA_INTERFACE_LAYOUT_CACHE
try:
with open(path, "r", encoding="utf-8") as f:
_CRITERIA_INTERFACE_LAYOUT_CACHE = json.load(f)
except Exception:
_CRITERIA_INTERFACE_LAYOUT_CACHE = {}
return _CRITERIA_INTERFACE_LAYOUT_CACHE
def _parse_criteria_by_number(criteria_text: str) -> Dict[str, str]:
"""Ключ — номер пункта из criteria (как в shipping_types), значение — текст критерия."""
out: Dict[str, str] = {}
if not isinstance(criteria_text, str) or not criteria_text.strip():
return out
for line in criteria_text.splitlines():
line = line.strip()
if not line or "\t" not in line:
continue
num, crit = line.split("\t", 1)
n = num.strip()
if n:
out[n] = crit.strip()
return out
def _shipping_type_variants_for_display(shipment: Dict) -> List[Dict]:
cands = shipment.get("shipping_type_candidates")
if isinstance(cands, list) and cands:
return [c for c in cands if isinstance(c, dict)]
name = shipment.get("shipping_type") or shipment.get("shipment_type") or ""
return [{"shipping_type": name, "criteria": shipment.get("criteria", "")}]
def render_shipment_criteria_checklist(
shipment: Dict, criteria_text: str, shipping_type_name: str = ""
) -> None:
"""Критерии выбранного типа перевозки и ответы по данным перевозки."""
if not isinstance(criteria_text, str) or not criteria_text.strip():
st.info("Для этого типа перевозки критерии в конфигурации не заданы.")
return
docs = shipment.get("documents_found", {}) or {}
def _yes_no_or_no_info(v) -> str:
if v is None:
return NO_INFO_TEXT
return "✅ да" if bool(v) else "❌ нет"
def _collect_text_blob(value) -> str:
chunks: List[str] = []
def _walk(v):
if isinstance(v, str):
s = v.strip()
if s:
chunks.append(s)
return
if isinstance(v, dict):
for vv in v.values():
_walk(vv)
return
if isinstance(v, list):
for vv in v:
_walk(vv)
_walk(value)
return " ".join(chunks)
text_blob = _collect_text_blob(shipment)
def _required_doc(v_required, doc_type: str) -> str:
if v_required is None:
return NO_INFO_TEXT
if v_required is False:
return "Не нужен"
if not bool(v_required):
return NO_INFO_TEXT
found_docs = docs.get(doc_type, []) or []
if found_docs:
filenames = [d.get("filename", "") for d in found_docs if d.get("filename")]
files_str = ", ".join(filenames) if filenames else "документ найден"
return f"Нужен (документ найден: {files_str})"
return "Нужен (документ не найден)"
def _format_dimensions(dims) -> str:
parts: List[str] = []
seen = set()
if isinstance(dims, list):
for d in dims:
if isinstance(d, dict):
l = d.get("length_cm")
w = d.get("width_cm")
h = d.get("height_cm")
if l and w and h:
key = f"{l}x{w}x{h}".lower()
if key not in seen:
seen.add(key)
parts.append(f"{l}×{w}×{h} см")
return "; ".join(parts) if parts else NO_INFO_TEXT
def _format_dimensions_any() -> str:
"""
Обязательный вывод для критериев с "габаритами":
сначала грузовые места, затем габариты ТС, если грузовых нет.
"""
cargo_dims = _format_dimensions(shipment.get("dimensions"))
if cargo_dims != NO_INFO_TEXT:
return cargo_dims
vehicle_dims = _format_dimensions(shipment.get("vehicle_dimensions"))
if vehicle_dims != NO_INFO_TEXT:
return vehicle_dims
return NO_INFO_TEXT
def _join_desc_and_hs(sh: Dict) -> str:
desc = (sh.get("cargo_description") or "").strip()
hs = (sh.get("hs_code") or "").strip()
if not desc and not hs:
return NO_INFO_TEXT
desc_part = desc if desc else NO_INFO_TEXT
hs_part = hs if hs else NO_INFO_TEXT
return f"{desc_part} / Код ТН ВЭД: {hs_part}"
def _find_temperature_range(sh: Dict) -> str:
direct = (sh.get("temperature_range") or "").strip()
if direct:
return direct
# Пытаемся вытащить диапазон из других текстовых полей, если парсер не выделил его в отдельный ключ.
scan_fields = [
sh.get("special_transport_requirements"),
sh.get("vehicle_type"),
sh.get("cargo_description"),
sh.get("shipment_type"),
sh.get("shipping_type"),
]
joined = " ".join(str(x) for x in scan_fields if isinstance(x, str) and x.strip())
if not joined:
return NO_INFO_TEXT
m = re.search(r"([+-]?\d{1,2}\s*(?:\.\.|-|до)\s*[+-]?\d{1,2})", joined, flags=re.IGNORECASE)
if m:
return m.group(1).replace(" ", "")
return NO_INFO_TEXT
def _infer_fcl_lcl(sh: Dict) -> str:
shipment_mode = (sh.get("shipment_type") or "").strip()
if shipment_mode:
return shipment_mode
blob = (text_blob + " " + str(sh.get("cargo_description") or "")).lower()
if re.search(r"сборн\w*\s+трак|\bltl\b", blob):
return NO_INFO_TEXT
if re.search(
r"\blcl\b|сборн\w*\s+контейнер|сборн\w*\s+груз|группаж|консолидац|"
r"consolidat\w*|groupage|less\s+than\s+container",
blob,
):
return "LCL"
if re.search(
r"\bfcl\b|цельн\w*\s+контейнер|полн\w*\s+контейнер|"
r"отдельн\w*\s+контейнер|full\s+container\s+load",
blob,
):
return "FCL"
return NO_INFO_TEXT
def _has_batteries(sh: Dict) -> Optional[bool]:
dg = sh.get("dangerous_goods")
if not isinstance(dg, dict):
return None
if dg.get("batteries") is True:
return True
if dg.get("batteries") is False:
return False
return None
def _msds_negative_phrase() -> Optional[str]:
pattern = (
r"(не\s+(?:предостав(?:ляет|им|ят)|дают|высыла(?:ет|ют)|имеется|имеют|прикладыв(?:ает|ают))\s+msds)"
r"|(?:msds\s+(?:не\s+предостав(?:лен|ляется)|отсутствует|нет))"
)
if re.search(pattern, text_blob, flags=re.IGNORECASE):
return "Поставщик не предоставляет MSDS"
return None
def _answer_customs(criterion_lower: str) -> str:
place = (shipment.get("customs_clearance_place_export_rf") or "").strip()
needed = shipment.get("customs_clearance_required")
if "место таможенного" in criterion_lower or (
"таможенн" in criterion_lower and "место" in criterion_lower
):
return place or NO_INFO_TEXT
if "таможенн" in criterion_lower:
if needed is None and not place:
return NO_INFO_TEXT
if needed is False:
return "не нужно"
if needed is True:
if place:
return f"нужно (место: {place})"
return f"нужно (место: {NO_INFO_TEXT.lower()})"
if place:
return place
return NO_INFO_TEXT
return NO_INFO_TEXT
def _format_volume_cbm() -> str:
v = shipment.get("total_volume_cbm")
if v is None or (isinstance(v, (int, float)) and v == 0):
return NO_INFO_TEXT
return f"{v} м³"
def _container_spec_drop_bare_types() -> str:
"""То же отображение контейнеров, что и в письме контрагенту."""
raw = (shipment.get("container_type") or "").strip()
return normalize_container_type_display(raw or None, empty=NO_INFO_TEXT)
def _answer_for_criterion(criterion_text: str) -> str:
c = criterion_text.lower()
# Любой критерий с "габарит" должен в приоритете возвращать размеры, а не перехватываться ветками по весу.
if "габарит" in c:
return _format_dimensions_any()
if "название клиента" in c:
return (shipment.get("client_name") or "").strip() or NO_INFO_TEXT
if "условия поставки" in c and "incoterms" in c:
return (shipment.get("incoterms") or "").strip() or NO_INFO_TEXT
if "тип перевозки" in c and "fcl" in c and "lcl" in c:
return _infer_fcl_lcl(shipment)
if "контейнер" in c and (
"типоразмер" in c
or ("количество" in c and "типоразмер" in c)
or ("fcl" in c and ("тип" in c or "количеств" in c))
):
return _container_spec_drop_bare_types()
if "дата готовности" in c:
return format_cargo_ready_date_for_display(shipment)
if (
"пункт отправления" in c
or "адрес забора" in c
or "порт отправления" in c
or "порт погрузки" in c
or ("станция" in c and ("отправлен" in c or "погруз" in c))
):
pickup = (shipment.get("pickup_address") or "").strip()
return pickup if is_specific_address(pickup) else NO_INFO_TEXT
if (
"пункт назначения" in c
or "адрес доставки" in c
or "порт назначения" in c
or "порт выгрузки" in c
or ("станция" in c and ("назнач" in c or "выгруз" in c))
):
delivery = (shipment.get("delivery_address") or "").strip()
return delivery if is_specific_address(delivery) else NO_INFO_TEXT
if "стоимость груза" in c:
return (shipment.get("cargo_value") or "").strip() or NO_INFO_TEXT
if "количество грузовых мест" in c and "вес" in c and ("объем" in c or "объём" in c):
pkg = shipment.get("package_count")
w = shipment.get("total_weight_kg")
pkg_ok = pkg is not None and str(pkg).strip() not in ("", "0", "0.0")
w_ok = w is not None and str(w).strip() not in ("", "0", "0.0")
pkg_str = str(pkg).strip() if pkg_ok else ""
w_str = f"{w} кг" if w_ok else ""
vol_str = _format_volume_cbm()
if vol_str == NO_INFO_TEXT:
vol_str = ""
return " / ".join([s for s in [pkg_str, w_str, vol_str] if s]) or NO_INFO_TEXT
if "количество грузовых мест" in c and "вес" in c:
pkg = shipment.get("package_count")
w = shipment.get("total_weight_kg")
pkg_ok = pkg is not None and str(pkg).strip() not in ("", "0", "0.0")
w_ok = w is not None and str(w).strip() not in ("", "0", "0.0")
pkg_str = str(pkg).strip() if pkg_ok else ""
w_str = f"{w} кг" if w_ok else ""
return " / ".join([s for s in [pkg_str, w_str] if s]) or NO_INFO_TEXT
if "количество грузовых мест" in c:
pkg = shipment.get("package_count")
if pkg is None or str(pkg).strip() in ("", "0", "0.0"):
return NO_INFO_TEXT
return str(pkg).strip()
if "общий объ" in c or "общий объём" in c or "общий объем" in c:
return _format_volume_cbm()
if "вес" in c and "количество" not in c:
w = shipment.get("total_weight_kg")
if w is None or str(w).strip() in ("", "0", "0.0"):
return NO_INFO_TEXT
return f"{w} кг"
if ("проём" in c or "проем" in c) and (
"контейнер" in c
or "20dv" in c
or "40dv" in c
or "40hc" in c
or "проходимость" in c
):
return door_clearance_summary(shipment)
# Для FTL/FCL-пунктов о габаритах палет/коробок приоритетно показываем размеры грузовых мест,
# не смешивая с проверкой проёмов контейнера.
if (
("палл" in c or "палет" in c or "короб" in c or "carton" in c or "box" in c)
and ("габарит" in c or "размер" in c or "size" in c or "dimension" in c)
):
return _format_dimensions_any()
# Габариты грузовых мест (LTL, FCL «без контейнеров», и п.8 FTL: «если нет данных о количестве
# машин — габариты каждого грузового места…»). Слово «машин» здесь не про ТС — иначе ниже
# сработала бы ветка vehicle_dimensions и габариты груза не показались бы.
if "габарит" in c and "грузов" in c and "мест" in c:
return _format_dimensions_any()
if "габарит" in c and (
"транспортн" in c
or "машин" in c
or "средств" in c
or "кузов" in c
or "прицеп" in c
or "полуприцеп" in c
):
return _format_dimensions_any()
if "габарит" in c:
return _format_dimensions_any()
if "характер груза" in c or "наименование груза" in c:
return _join_desc_and_hs(shipment)
if "код тн" in c:
return (shipment.get("hs_code") or "").strip() or NO_INFO_TEXT
# Перечень DG (авиа п.9, море/авто и т.д.) и формулировки интерфейса («Наличие батарейк, газов…»).
if (
"содержатся ли в грузе" in c
or "опасные вещества" in c
or "опасные свойства" in c
or (
("батарейки" in c or "батарейк" in c)
and (
"газы" in c
or "газов" in c
or "жидкост" in c
or "аэрозол" in c
or "хладоагент" in c
or "сухой лёд" in c
or "сухой лед" in c
)
)
):
return format_dangerous_goods_summary(shipment)
if "штабел" in c and "между собой" in c:
return _yes_no_or_no_info(shipment.get("stackable_among_themselves"))
if "штабел" in c and "с другими отправками" in c:
return _yes_no_or_no_info(shipment.get("stackable_with_others"))
if "msds" in c or "паспорт безопасности" in c:
msds_negative = _msds_negative_phrase()
if msds_negative:
return msds_negative
if "батаре" in c and _has_batteries(shipment) is not True:
# Если батареек нет, документ по батарейкам не нужен.
if shipment.get("msds_required") is False:
return "Не нужен"
return NO_INFO_TEXT
return _required_doc(shipment.get("msds_required"), "msds")
if re.search(r"(?<![a-z])dgm(?![a-z])", c):
if "батаре" in c and _has_batteries(shipment) is not True:
if shipment.get("dgm_report_required") is False:
return "Не нужен"
return NO_INFO_TEXT
return _required_doc(shipment.get("dgm_report_required"), "dgm")
if "замена" in c and "документ" in c:
rep = shipment.get("document_replacement_needed")
if rep is None:
return NO_INFO_TEXT
if rep is False:
return "Не требуется"
msds_r = shipment.get("msds_required")
dgm_r = shipment.get("dgm_report_required")
brand_r = shipment.get("brand_authorization_letter")
flags = [msds_r, dgm_r, brand_r]
if all(v is None for v in flags):
return NO_INFO_TEXT
missing_any = False
if msds_r is True and not docs.get("msds"):
missing_any = True
if dgm_r is True and not docs.get("dgm"):
missing_any = True
if brand_r is True and not docs.get("brand_authorization"):
missing_any = True
if missing_any:
return "Требуется"
if any(v is True for v in flags):
return "Не требуется"
if all(v is False for v in flags):
return "Не требуется"
return NO_INFO_TEXT
if "документ" in c and ("msds" not in c and "dgm" not in c):
return build_documents_summary(shipment)
if "бренд" in c and "авторизац" in c:
return format_brand_authorization_summary(shipment)
if "название бренда" in c or (
"бренд" in c
and "авторизац" not in c
and "зарегистрирован" not in c
and "таможенной системе" not in c
):
return (shipment.get("brand_name") or "").strip() or NO_INFO_TEXT
if "лицензия экспорт" in c:
return _yes_no_or_no_info(shipment.get("exporter_has_export_license"))
if "трансшипмент" in c:
return _yes_no_or_no_info(shipment.get("transshipment_with_third_country"))
if "дополнительный сервис" in c or "дополнительные услуги" in c or "дополнительные сервисы" in c or "сервисы" in c:
extras = shipment.get("additional_services")
lines: List[str] = []
if isinstance(extras, list):
lines.extend(str(x).strip() for x in extras if str(x).strip())
# Расширение доп.сервисов: показываем штрафы и сроки доставки, если они найдены в спецтребованиях.
req = (shipment.get("special_transport_requirements") or "").strip()
if req:
chunks = [p.strip() for p in req.split("|") if p.strip()]
for chunk in chunks:
cl = chunk.lower()
if any(k in cl for k in ("штраф", "неустой", "penalty", "срок", "transit", "delivery")):
lines.append(chunk)
if not lines:
return NO_INFO_TEXT
dedup: List[str] = []
seen = set()
for line in lines:
key = line.lower()
if key in seen:
continue
seen.add(key)
dedup.append(line)
return ", ".join(dedup) or NO_INFO_TEXT
if "спецтребования" in c:
return (shipment.get("special_transport_requirements") or "").strip() or NO_INFO_TEXT
if "фумигац" in c:
return _yes_no_or_no_info(shipment.get("fumigation_on_wooden_packaging"))
if "таможен" in c:
return _answer_customs(c)
if "экспедир" in c or "forwarding" in c or "клиент осуществляет экспедирование" in c:
resp = (shipment.get("arrival_expediting_responsibility") or "").strip()
return resp or NO_INFO_TEXT
if "найдена ли" in c and "письме" in c:
summary = build_documents_summary(shipment)
if summary != NO_INFO_TEXT:
return f"По вложениям и распознанным документам: {summary}"
return NO_INFO_TEXT
if "батарейки" in c and ("упаков" in c or "отдель" in c):
if _has_batteries(shipment) is not True:
return NO_INFO_TEXT
return _yes_no_or_no_info(shipment.get("batteries_packed_separately"))
if "тип транспорта" in c:
return (shipment.get("vehicle_type") or "").strip() or NO_INFO_TEXT
if "температурный режим" in c:
return _find_temperature_range(shipment)
return NO_INFO_TEXT
def _criterion_card_html(num: str, crit: str, answer: str) -> str:
"""Одна карточка для вертикального списка (без flex-строки)."""
esc = html.escape
prefix = f"{esc(str(num))}. " if str(num).strip() else ""
return (
"<div style='"
"border:1px solid var(--septem-border);"
"border-radius:10px;"
"padding:10px 12px;"
"margin-bottom:8px;"
"background:var(--septem-card-bg);'>"
f"<div style='font-weight:600; margin-bottom:6px;'>{prefix}{esc(crit)}</div>"
f"<div style='color:var(--septem-text);'><span style='color:var(--septem-muted);'>Ответ:</span> {esc(answer)}</div>"
"</div>"
)
def _criterion_card_flex_cell(
num: str, crit: str, answer: str, *, full_width: bool = False
) -> str:
"""Ячейка внутри flex-строки: одинаковая высота с соседями (align-items: stretch)."""
esc = html.escape
prefix = f"{esc(str(num))}. " if str(num).strip() else ""
flex_basis = (
"0 0 auto;width:100%;min-width:0" if full_width else "1 1 0;min-width:0"
)
return (
f'<div style="flex:{flex_basis};display:flex;flex-direction:column;'
'border:1px solid var(--septem-border);border-radius:10px;padding:10px 12px;'
'background:var(--septem-card-bg);box-sizing:border-box;">'
f'<div style="font-weight:600;margin-bottom:6px;">{prefix}{esc(crit)}</div>'
f'<div style="color:var(--septem-text);flex:1 1 auto;min-height:0;">'
f'<span style="color:var(--septem-muted);">Ответ:</span> {esc(answer)}</div>'
"</div>"
)
def _flex_row_equal_height_html(
inner_cells: str, *, margin_bottom_px: int = 10
) -> str:
"""Одна линия карточек: высота всех ячеек равна максимальной в строке."""
return (
f'<div style="display:flex;gap:8px;align-items:stretch;width:100%;'
f'margin-bottom:{margin_bottom_px}px;box-sizing:border-box;">'
f"{inner_cells}"
"</div>"
)
def _auto_ltl_ftl_top_section_html(
nums: List[str],
labels: List[str],
r67: Dict[str, Any],
) -> str:
"""
Верхний блок как в Excel «Авто … интерфейс»: слева 13,5,8/9 (5 карточек) на всю высоту;
справа сверху 4 и последний столбец (19/18), под ними на ширину правой колонки — таможня (20/19).
"""
left_parts: List[str] = []
for i in range(5):
n = str(nums[i]).strip()
crit = by_num.get(n, labels[i])
left_parts.append(_criterion_card_flex_cell(n, crit, _answer_for_criterion(crit)))
left_html = "".join(left_parts)
n4 = str(nums[5]).strip()
crit4 = by_num.get(n4, labels[5])
n_last = str(nums[6]).strip()
crit_last = by_num.get(n_last, labels[6])
top_pair = _flex_row_equal_height_html(
_criterion_card_flex_cell(n4, crit4, _answer_for_criterion(crit4))
+ _criterion_card_flex_cell(n_last, crit_last, _answer_for_criterion(crit_last)),
margin_bottom_px=0,
)
rn = str(r67.get("num", "")).strip()
rlab = (r67.get("label") or "").strip()
crit_r = by_num.get(rn, rlab)
bottom = _criterion_card_flex_cell(rn, crit_r, _answer_for_criterion(crit_r), full_width=True)
right_col = (
f'<div style="display:flex;flex-direction:column;gap:8px;flex:2 1 0;min-width:0;">'
f"{top_pair}"
f"{bottom}"
"</div>"
)
left_col = (
'<div style="display:flex;gap:6px;align-items:stretch;flex:5 1 0;min-width:0;">'
f"{left_html}"
"</div>"
)
return (
'<div style="display:flex;gap:10px;align-items:stretch;width:100%;margin-bottom:10px;'
'box-sizing:border-box;">'
f"{left_col}{right_col}"
"</div>"
)
by_num = _parse_criteria_by_number(criteria_text)
# Показываем верхние 5 пунктов всегда (независимо от layout JSON),
# т.к. в некоторых деплоях header-блок может не приходить/не совпадать по формату.
# Для FCL по требованию: 5-й верхний столбец должен быть пунктом 9.
st_name_for_top = (shipping_type_name or "").strip()
fcl_types = {
"Автомобильная перевозка (FTL)",
"Морская перевозка (FCL)",
"Железнодорожная перевозка (FCL)",
"Мультимодальная перевозка море + ж/д (FCL)",
}
if st_name_for_top in fcl_types:
top5_nums = ("1", "2", "3", "5", "9")
else:
top5_nums = ("1", "2", "3", "5", "8")
top5_cells: List[str] = []
for n in top5_nums:
crit = by_num.get(n)
if isinstance(crit, str) and crit.strip():
top5_cells.append(_criterion_card_flex_cell(n, crit, _answer_for_criterion(crit)))
if top5_cells:
st.markdown(_flex_row_equal_height_html("".join(top5_cells)), unsafe_allow_html=True)
layout_entry = _load_criteria_interface_layout().get("by_shipping_type", {}).get(
(shipping_type_name or "").strip()
)
blocks = layout_entry.get("blocks") if isinstance(layout_entry, dict) else None
_AUTO_TYPES = frozenset(
{"Автомобильная перевозка (LTL)", "Автомобильная перевозка (FTL)"}
)
if isinstance(blocks, list) and blocks:
bi = 0
while bi < len(blocks):
block = blocks[bi]
if not isinstance(block, dict):
bi += 1
continue
bt = block.get("type")
st_name = (shipping_type_name or "").strip()
merge_auto_top = (
st_name in _AUTO_TYPES
and bt == "header"
and bi + 1 < len(blocks)
and isinstance(blocks[bi + 1], dict)
and blocks[bi + 1].get("type") == "right_67"
)
if merge_auto_top:
nums = block.get("nums") or []
labels = block.get("labels") or []
if len(nums) == len(labels) == 7:
st.markdown(
_auto_ltl_ftl_top_section_html(nums, labels, blocks[bi + 1]),
unsafe_allow_html=True,
)
bi += 2
continue
if bt == "header":
nums = block.get("nums") or []
labels = block.get("labels") or []
if len(nums) == len(labels) and len(nums) > 0:
cells: List[str] = []
for i in range(len(nums)):
n = str(nums[i]).strip()
if n in top5_nums:
# Верхние 5 уже отрисованы принудительно единым блоком.
continue
crit = by_num.get(n, labels[i])
ans = _answer_for_criterion(crit)
cells.append(_criterion_card_flex_cell(n, crit, ans))
if cells:
st.markdown(
_flex_row_equal_height_html("".join(cells)),
unsafe_allow_html=True,
)
bi += 1
continue
if bt == "right_67":
n = str(block.get("num", "")).strip()
lab = (block.get("label") or "").strip()
crit = by_num.get(n, lab)
st.markdown(
_criterion_card_html(n, crit, _answer_for_criterion(crit)),
unsafe_allow_html=True,
)
bi += 1
continue
if bt == "pair":
left = block.get("left") or {}
right = block.get("right") or {}
if isinstance(left, dict) and isinstance(right, dict):
n = str(left.get("num", "")).strip()
lab = (left.get("label") or "").strip()
crit_l = by_num.get(n, lab)
ans_l = _answer_for_criterion(crit_l)
n2 = str(right.get("num", "")).strip()
lab2 = (right.get("label") or "").strip()
crit_r = by_num.get(n2, lab2)
ans_r = _answer_for_criterion(crit_r)
st.markdown(
_flex_row_equal_height_html(
_criterion_card_flex_cell(n, crit_l, ans_l)
+ _criterion_card_flex_cell(n2, crit_r, ans_r)
),
unsafe_allow_html=True,
)
bi += 1
continue
if bt == "left_12":
n = str(block.get("num", "")).strip()
lab = (block.get("label") or "").strip()
crit = by_num.get(n, lab)
st.markdown(
_criterion_card_html(n, crit, _answer_for_criterion(crit)),
unsafe_allow_html=True,
)
bi += 1
continue
bi += 1
return
criteria_lines = [l for l in criteria_text.splitlines() if l.strip()]
criteria_items: List[tuple[str, str]] = []
for line in criteria_lines:
if "\t" in line:
num, crit = line.split("\t", 1)
criteria_items.append((num.strip(), crit.strip()))
else:
criteria_items.append(("", line.strip()))
for num, crit in criteria_items:
answer = _answer_for_criterion(crit)
st.markdown(_criterion_card_html(num, crit, answer), unsafe_allow_html=True)
# =============================================================================
# ОТОБРАЖЕНИЕ ОБЗОРА ПЕРЕВОЗОК
# =============================================================================
def display_overview(report: Dict):
st.subheader("Детальная информация о грузоперевозке")
data = report.get('structured_data', {})
shipments = data.get('shipments', [])
sources = report.get('sources', [])
if not shipments:
st.info("Нет структурированных данных о перевозках")
return
for idx, shipment in enumerate(shipments):
with st.expander(f"🚚 Перевозка #{idx+1}", expanded=(idx==0)):
with st.container(border=True):
display_emails(report, shipment.get("ID_emails", []), idx)
variants = _shipping_type_variants_for_display(shipment)
has_any_criteria = any(
isinstance(v.get("criteria"), str) and str(v.get("criteria", "")).strip()
for v in variants
)
st.markdown("##### Отчёт по возможным типам перевозки")
for v_idx, variant in enumerate(variants):
type_title = (variant.get("shipping_type") or "").strip() or "Тип не указан"
crit_raw = variant.get("criteria", "")
crit = crit_raw if isinstance(crit_raw, str) else ""
with st.expander(f"📋 {type_title}", expanded=(v_idx == 0)):
render_shipment_criteria_checklist(shipment, crit, type_title)
def should_show(key, value):
if key in (
"shipping_options",
"shipping_type",
"shipping_type_candidates",
"requested_shipping_type_names",
):
return False
if key == "criteria_preview":
return False
if has_any_criteria:
return False
if value is None:
return False
if isinstance(value, str) and value.strip() == "":
return False
if isinstance(value, list) and len(value) == 0:
return False
if isinstance(value, dict) and key == "dangerous_goods":
return format_dangerous_goods_summary(shipment) != NO_INFO_TEXT
return True
fields_to_show = []
for key, value in shipment.items():
fields_to_show.append((key, value))
col1, col2 = st.columns(2)
mid = len(fields_to_show) // 2
for i, (key, value) in enumerate(fields_to_show):
if key in ["ID_emails", "documents_found"]:
continue
if not should_show(key, value):
continue
with col1 if i < mid else col2:
label = FIELD_LABELS.get(key, key)
if key in ("dimensions", "vehicle_dimensions"):
dims = []
for d in value:
if isinstance(d, dict) and d.get('length_cm') and d.get('width_cm') and d.get('height_cm'):
dims.append(f"{d['length_cm']}×{d['width_cm']}×{d['height_cm']} см")
value_str = "; ".join(dims) if dims else NO_INFO_TEXT
elif key == "criteria" and isinstance(value, str):
# criteria может быть:
# - сырой: "1\tНазвание...\n2\t..."
# - короткий: "Клиент, Код ТН ВЭД, Требуется MSDS;"
raw = value.strip()
if "\t" not in raw and (";" in raw or "," in raw) and "\n" not in raw:
import re as _re
parts = [p.strip() for p in _re.split(r";\\s*|,\\s*", raw) if p.strip()]
value_str = "<br/>".join(parts) if parts else NO_INFO_TEXT
else:
lines = [l.strip() for l in value.splitlines() if l.strip()]
formatted: List[str] = []
for l in lines:
if "\t" in l:
num, text = l.split("\t", 1)
formatted.append(f"{num.strip()}. {text.strip()}")
else:
formatted.append(l)
value_str = "<br/>".join(formatted) if formatted else NO_INFO_TEXT
elif key == "dangerous_goods" and isinstance(value, dict):
value_str = format_dangerous_goods_summary(shipment)
elif key == "cargo_ready_date":
value_str = format_cargo_ready_date_for_display(shipment)
elif isinstance(value, bool):
# Только для документов: "нужен/не нужен"
doc_bool_keys = {"msds_required", "dgm_report_required", "brand_authorization_letter"}
base_str = ("нужен" if value else "не нужен") if key in doc_bool_keys else ("✅ Да" if value else "❌ Нет")
docs = shipment.get("documents_found", {})
doc_map = {
"msds_required": "msds",
"dgm_report_required": "dgm",
"brand_authorization_letter": "brand_authorization",
}
if key in doc_map and value:
doc_type = doc_map[key]
found_docs = docs.get(doc_type, [])
if found_docs:
filenames = [d["filename"] for d in found_docs]
value_str = f"{base_str} (документ найден: {', '.join(filenames)})"
else:
value_str = f"{base_str} (документ не найден)"
else:
value_str = base_str
elif isinstance(value, list):
value_str = ", ".join(str(v) for v in value) if value else NO_INFO_TEXT
elif value == None or value == "":
value_str = NO_INFO_TEXT
else:
value_str = str(value)
if key == "total_weight_kg" and isinstance(value, (int, float)):
value_str = f"{value} кг"
elif key == "total_volume_cbm" and isinstance(value, (int, float)):
value_str = f"{value} м³"
if key == "shipping_options":
continue
render_val = (
value_str
if key in ("criteria", "cargo_ready_date", "hs_code")
else value_str.capitalize()
)
st.markdown(
f"<div class='ship-field'><b>{label}:</b> {render_val}</div>",
unsafe_allow_html=True
)
with st.container(border=True):
st.markdown("#### 📋 Найденные документы")
docs = shipment.get("documents_found", {})
for doc_type, files in docs.items():
if not files:
continue
st.markdown(f"**{doc_type.upper()}**")
for f in files:
st.markdown(f"- {f['filename']} (из письма: {f['email_subject']})")
# Письмо клиенту — строго ОДНО, под всеми перевозками
with st.expander("✉️ Письмо клиенту (под всеми перевозками)", expanded=False):
with st.container(border=True):
if shipping_templates:
letter_text = generate_letter_for_shipments(
shipments,
always_confirmation=False,
overview_client_letter=True,
)
if letter_text:
st.components.v1.html(
generate_copy_html(
letter_text,
height=500,
theme=st.session_state.get("ui_theme", "light"),
),
height=550,
)
attachments_client = collect_attachment_data_for_shipments(report, shipments)
dl1, dl2 = st.columns(2)
with dl1:
st.download_button(
label="📥 Скачать текст (.txt)",
data=letter_text.encode("utf-8"),
file_name="septem_pismo_klientu.txt",
mime="text/plain; charset=utf-8",
key="download_client_letter_txt",
)
with dl2:
eml_client = create_eml_with_attachments(letter_text, attachments_client)
st.download_button(
label="📥 Скачать .eml",
data=eml_client,
file_name="septem_pismo_klientu.eml",
mime="message/rfc822",
key="download_client_letter_eml",
)
if attachments_client:
st.caption(
f"В .eml добавлено вложений (без изображений): {len(attachments_client)}"
)
else:
st.info("Письмо не сгенерировано")
else:
st.info("Шаблоны писем не загружены")
# =============================================================================
# ОТОБРАЖЕНИЕ ПИСЕМ
# =============================================================================
def display_emails(report: Dict, ship_id: List, num_ship: int):
st.markdown("Письма")
sources = report.get('sources', [])
for idx, source in enumerate(sources):
if source.get("id") not in ship_id:
continue
with st.expander(f"📧 {source.get('subject', 'Без темы')}"):
attachments = source.get('attachments', [])
# ---------- Фильтруем вложения (исключаем изображения) ----------
display_attachments = []
for orig_att_idx, att in enumerate(attachments):
ext = att['filename'].split('.')[-1].lower()
if ext not in IMAGE_EXTS:
display_attachments.append((orig_att_idx, att))
if display_attachments:
st.markdown("##### 📎 Вложения")
cols = st.columns(len(display_attachments))
for col, (orig_att_idx, att) in zip(cols, display_attachments):
with col:
if st.session_state.session_id:
url = f"{API_URL}/attachments/{st.session_state.session_id}/{idx}/{orig_att_idx}"
ext = att['filename'].split('.')[-1].lower()
if ext == 'pdf':
icon = "📕"
elif ext in ['doc', 'docx']:
icon = "📘"
elif ext in ['xls', 'xlsx']:
icon = "📗"
elif ext in IMAGE_EXTS:
icon = "🖼️"
else:
icon = "📄"
st.markdown(
f"""
<div style="text-align: center;">
<a href="{url}" target="_blank" style="text-decoration:none;">
<div style="
display: inline-block;
cursor:pointer;
border:1px solid var(--septem-border);
padding:6px;
border-radius:8px;
font-size:12px;
background:var(--septem-code-bg);
color:var(--septem-text);
max-width: 100%;
word-wrap: break-word;">
{icon} {att['filename']}
</div>
</a>
</div>
""",
unsafe_allow_html=True
)
st.markdown("---")
# ---------- Содержимое письма: HTML или текст ----------
body_html = source.get('body_html')
if body_html and body_html.strip():
# Экранируем для безопасной вставки в srcdoc (используем json.dumps)
import json
# json.dumps добавляет внешние кавычки и экранирует внутренние
safe_html = json.dumps(body_html)
# Убираем внешние кавычки, т.к. srcdoc сам будет в кавычках
if safe_html.startswith('"') and safe_html.endswith('"'):
safe_html = safe_html[1:-1]
iframe_code = f'''
<iframe
srcdoc="{safe_html}"
sandbox=""
style="width:100%; height:400px; border:1px solid var(--septem-border); border-radius:4px; background:var(--septem-card-bg);"
></iframe>
'''
st.markdown(iframe_code, unsafe_allow_html=True)
else:
content = clean_email_text(source.get('content', ''))
if content:
st.text_area(
"Содержание",
value=content,
height=300,
disabled=True,
key=f"content_{num_ship}_{idx}"
)
else:
st.info("Текст письма отсутствует")
# =============================================================================
# ОТОБРАЖЕНИЕ ВСЕХ ВЛОЖЕНИЙ (ТРЕТЬЯ ВКЛАДКА)
# =============================================================================
def display_all_attachments(report: Dict):
sources = report.get('sources', [])
if not sources:
st.info("Нет писем с вложениями")
return
all_attachments = []
for email_idx, source in enumerate(sources):
attachments = source.get('attachments', [])
for att_idx, att in enumerate(attachments):
ext = att['filename'].split('.')[-1].lower()
if ext not in IMAGE_EXTS: # ← фильтр
all_attachments.append({
'email_idx': email_idx,
'att_idx': att_idx,
'filename': att['filename'],
'subject': source.get('subject', 'Без темы'),
'sender': source.get('sender_name', source.get('sender', '')),
'text_preview': att.get('text', '')[:200] + '...' if len(att.get('text', '')) > 200 else att.get('text', ''),
'size': att.get('size', 0),
'has_content': bool(att.get('content_base64'))
})
if not all_attachments:
st.info("Нет доступных вложений (изображения скрыты)")
return
st.info(f"Найдено вложений (без учёта изображений): {len(all_attachments)}")
for att in all_attachments:
with st.container(border=True):
col1, col2, col3 = st.columns([3, 1, 1])
with col1:
# Можно тоже добавить иконку
ext = att['filename'].split('.')[-1].lower()
if ext == 'pdf':
icon = "📕"
elif ext in ['doc', 'docx']:
icon = "📘"
elif ext in ['xls', 'xlsx']:
icon = "📗"
else:
icon = "📄"
st.markdown(f"**{icon} {att['filename']}**")
st.caption(f"Из письма: {att['subject']} от {att['sender']}")
if att['text_preview']:
st.text(att['text_preview'])
st.caption(f"Размер: {att['size']} байт")
if att['has_content']:
st.success("✅ Оригинал файла доступен")
else:
st.warning("⚠️ Только текст (оригинал не сохранён)")
with col2:
if st.session_state.session_id:
url = f"{API_URL}/attachments/{st.session_state.session_id}/{att['email_idx']}/{att['att_idx']}"
st.link_button("👁️ Просмотреть", url, use_container_width=True)
else:
st.button("👁️ Просмотреть", disabled=True, use_container_width=True)
with col3:
if st.session_state.session_id:
url = f"{API_URL}/attachments/{st.session_state.session_id}/{att['email_idx']}/{att['att_idx']}"
st.link_button("📥 Скачать", url, use_container_width=True)
else:
st.button("📥 Скачать", disabled=True, use_container_width=True)
# =============================================================================
# ЗАПУСК ПРИЛОЖЕНИЯ
# =============================================================================
if __name__ == "__main__":
main()