155 lines
5.8 KiB
Python
155 lines
5.8 KiB
Python
import io
import logging
from collections.abc import Iterable
from typing import Union

import openpyxl
from bs4 import BeautifulSoup
from docx import Document
from pypdf import PdfReader
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
class DocumentProcessor:
    """Extract plain text from PDF, Word, Excel and plain-text file contents.

    The public entry points are :meth:`extract_text`, which dispatches on the
    file extension, and :meth:`normalize_email_html`, which cleans an HTML
    fragment. Everything prefixed with an underscore is an internal helper.
    """

    # Extensions that are recognised but intentionally produce no text.
    _IMAGE_EXTENSIONS = frozenset(
        {"png", "jpg", "jpeg", "gif", "bmp", "tiff", "webp"}
    )
    _WORD_EXTENSIONS = frozenset({"docx", "doc"})
    _EXCEL_EXTENSIONS = frozenset({"xlsx", "xls"})

    @staticmethod
    def _cell_to_text(v) -> str:
        """Return the stripped string form of a cell value ("" for ``None``)."""
        if v is None:
            return ""
        return str(v).strip()

    @staticmethod
    def _non_empty_texts(values: Iterable[object]) -> list[str]:
        """Convert cell values to text and drop those that end up empty."""
        texts = (DocumentProcessor._cell_to_text(value) for value in values)
        return [text for text in texts if text]

    @staticmethod
    def _rows_to_lines(rows: Iterable[Iterable[object]]) -> list[str]:
        """Render each row with at least one non-empty cell as ``a | b | c``."""
        lines: list[str] = []
        for row in rows:
            cells = DocumentProcessor._non_empty_texts(row)
            if cells:
                lines.append(" | ".join(cells))
        return lines

    @staticmethod
    def _column_lines(label: object, cells: list[str]) -> list[str]:
        """Render one column's non-empty cells top-down and bottom-up."""
        if not cells:
            return []
        return [
            f"COL {label} TOP_DOWN: " + " || ".join(cells),
            f"COL {label} BOTTOM_UP: " + " || ".join(reversed(cells)),
        ]

    @staticmethod
    def _sheet_section(
        title: str, rows_text: list[str], cols_text: list[str]
    ) -> list[str]:
        """Build the output lines for one sheet: header, rows, then columns."""
        section = [f"=== SHEET: {title} ==="]
        if rows_text:
            section.append("[ROWS]")
            section.extend(rows_text)
        if cols_text:
            section.append("[COLUMNS]")
            section.extend(cols_text)
        return section

    @staticmethod
    def _sheet_rows_text_openpyxl(sheet) -> list[str]:
        """Return one ``" | "``-joined line per non-empty row of an openpyxl sheet."""
        return DocumentProcessor._rows_to_lines(
            sheet.iter_rows(values_only=True)
        )

    @staticmethod
    def _sheet_cols_text_openpyxl(sheet) -> list[str]:
        """Return TOP_DOWN/BOTTOM_UP lines per non-empty column of an openpyxl sheet.

        Columns are labelled with their spreadsheet letter (A, B, ...).
        """
        out: list[str] = []
        max_col = sheet.max_column or 0
        max_row = sheet.max_row or 0
        for col_idx in range(1, max_col + 1):
            cells = DocumentProcessor._non_empty_texts(
                sheet.cell(row_idx, col_idx).value
                for row_idx in range(1, max_row + 1)
            )
            if cells:
                col_letter = openpyxl.utils.get_column_letter(col_idx)
                out.extend(DocumentProcessor._column_lines(col_letter, cells))
        return out

    @staticmethod
    def _sheet_rows_text_xlrd(sheet) -> list[str]:
        """Return one ``" | "``-joined line per non-empty row of an xlrd sheet."""
        return DocumentProcessor._rows_to_lines(
            sheet.row_values(row_idx) for row_idx in range(sheet.nrows)
        )

    @staticmethod
    def _sheet_cols_text_xlrd(sheet) -> list[str]:
        """Return TOP_DOWN/BOTTOM_UP lines per non-empty column of an xlrd sheet.

        Columns are labelled with their 1-based index.
        """
        out: list[str] = []
        for col_idx in range(sheet.ncols):
            cells = DocumentProcessor._non_empty_texts(
                sheet.cell_value(row_idx, col_idx)
                for row_idx in range(sheet.nrows)
            )
            out.extend(DocumentProcessor._column_lines(col_idx + 1, cells))
        return out

    # Declared static so it can be called on the class and on an instance;
    # without the decorator an instance call would pass ``self`` as the HTML.
    @staticmethod
    def normalize_email_html(html_content: str) -> str:
        """Clean an e-mail HTML fragment and give its tables a uniform style.

        ``<script>`` and ``<style>`` elements are removed. Every ``<table>``
        and every ``<td>``/``<th>`` inside it gets a fixed inline ``style``
        attribute, replacing any style it had.

        Args:
            html_content: HTML markup to clean.

        Returns:
            The modified markup serialised back to a string.
        """
        soup = BeautifulSoup(html_content, "html.parser")

        # Drop scripts and stylesheets.
        for tag in soup(["script", "style"]):
            tag.decompose()

        # Normalise table presentation.
        for table in soup.find_all("table"):
            table["style"] = "border-collapse: collapse; width: 100%;"
            for cell in table.find_all(["td", "th"]):
                cell["style"] = "border:1px solid #ccc;padding:6px;"

        return str(soup)

    def extract_text(self, content: bytes, filename: str) -> str:
        """Extract text from ``content``, choosing a parser by file extension.

        Args:
            content: Raw bytes of the file.
            filename: File name; only the text after the last ``.`` is used,
                case-insensitively.

        Returns:
            The extracted text. Image files are not processed and yield "".

        Raises:
            ValueError: If the extension is not one of the supported formats.
            UnicodeDecodeError: If a ``.txt`` file is not valid UTF-8.
        """
        ext = filename.lower().split('.')[-1]

        # Images are not processed.
        if ext in self._IMAGE_EXTENSIONS:
            return ""

        if ext == 'pdf':
            return self._extract_pdf(content)
        if ext in self._WORD_EXTENSIONS:
            # NOTE(review): legacy ``.doc`` is routed to the same reader as
            # ``.docx``; presumably only OOXML content parses - confirm.
            return self._extract_docx(content)
        if ext in self._EXCEL_EXTENSIONS:
            return self._extract_excel(content)
        if ext == 'txt':
            # "utf-8-sig" decodes plain UTF-8 identically but also drops a
            # leading byte-order mark so it does not leak into the text.
            return content.decode('utf-8-sig')
        raise ValueError(f"Unsupported format: {ext}")

    def _extract_pdf(self, content: bytes) -> str:
        """Return the text of every PDF page, joined with newlines."""
        pdf = PdfReader(io.BytesIO(content))
        # Treat a falsy extraction result as empty so the join cannot fail.
        return "\n".join(page.extract_text() or "" for page in pdf.pages)

    def _extract_docx(self, content: bytes) -> str:
        """Return the text of ``doc.paragraphs``, joined with newlines."""
        doc = Document(io.BytesIO(content))
        return "\n".join(paragraph.text for paragraph in doc.paragraphs)

    def _extract_excel(self, content: bytes) -> str:
        """Extract text from a workbook, trying openpyxl first and then xlrd.

        Each sheet contributes a ``=== SHEET: <name> ===`` header followed by
        an optional ``[ROWS]`` section (row-wise reading) and an optional
        ``[COLUMNS]`` section (column-wise reading, top-down and bottom-up).

        Returns:
            The combined text, or "" if neither library could read the data.
        """
        try:
            return self._extract_with_openpyxl(content)
        except Exception as exc:
            # Best effort: the data may be a legacy .xls, so try xlrd, and
            # keep the openpyxl error so the root cause is not lost.
            logger.debug(
                "openpyxl could not read workbook, falling back to xlrd: %s",
                exc,
            )
            return self._extract_with_xlrd(content, exc)

    def _extract_with_openpyxl(self, content: bytes) -> str:
        """Render all worksheets of a workbook loaded through openpyxl."""
        workbook = openpyxl.load_workbook(io.BytesIO(content))
        lines: list[str] = []
        for sheet in workbook.worksheets:
            lines.extend(
                self._sheet_section(
                    sheet.title,
                    # Row-wise reading, kept for compatibility.
                    self._sheet_rows_text_openpyxl(sheet),
                    # Column-wise reading: shared data is often at the
                    # bottom of a column.
                    self._sheet_cols_text_openpyxl(sheet),
                )
            )
        return "\n".join(lines)

    def _extract_with_xlrd(
        self, content: bytes, primary_error: Exception
    ) -> str:
        """Render all sheets through xlrd; log and return "" on failure."""
        try:
            import xlrd
        except ImportError:
            logger.error("xlrd not installed, cannot parse .xls files")
            return ""

        try:
            workbook = xlrd.open_workbook(file_contents=content)
            lines: list[str] = []
            for sheet in workbook.sheets():
                lines.extend(
                    self._sheet_section(
                        sheet.name,
                        self._sheet_rows_text_xlrd(sheet),
                        self._sheet_cols_text_xlrd(sheet),
                    )
                )
            return "\n".join(lines)
        except Exception as exc:
            logger.error(
                "Failed to parse Excel with xlrd: %s (openpyxl error: %s)",
                exc,
                primary_error,
            )
            return ""
|