import os
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urlparse

# Put the directory three levels above this file on sys.path; presumably this
# is what lets the project-local packages imported below resolve.
BASE_DIR: str = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
if BASE_DIR not in sys.path:
    sys.path.insert(0, BASE_DIR)

import requests
from tqdm import tqdm

from logger_utils.logger_operations import LoggerOperations

app_logger: LoggerOperations = LoggerOperations()

from dms_utils.src.config.constants import (
    ANTZAPI_SERVER_URL,
    ANTZAPI_ACCESS_KEY
)


class AntzServerUtils:
    """Upload files to, and download files from, the configured Antz server.

    Every public method reports its outcome as a dict with a boolean
    ``status`` and an ``error`` message (``None`` on success) rather than
    raising for the failures it handles.
    """

    def __init__(self, chunk_size: int = 50 * 1024 * 1024) -> None:
        """Store the server URL, default access key and chunk size.

        Args:
            chunk_size: Size in bytes, stored on the instance and logged.
                NOTE(review): no method visible in this file reads it;
                ``download`` streams in fixed 8192-byte chunks.
        """
        self.server_url: str = ANTZAPI_SERVER_URL
        self.access_key: str = ANTZAPI_ACCESS_KEY
        self.chunk_size: int = chunk_size
        app_logger.info("AntzServerUtils initialized with chunk size of {} bytes.".format(self.chunk_size))

    def get_access_key(self, access_details: Optional[Dict[str, Any]] = None) -> str:
        """Return the access key to send with a request.

        Args:
            access_details: Optional dict; when it contains ``'accessKey'``
                that value replaces the instance default.

        Returns:
            The overriding key if one was supplied, else ``self.access_key``.
        """
        access_key: str = self.access_key
        if access_details and 'accessKey' in access_details:
            access_key = access_details['accessKey']
            app_logger.debug("Access key overridden via access_details.")
        return access_key

    def upload(self, data_details: Dict[str, Any], access_details: Optional[Dict[str, Any]] = None) -> Dict[str, Union[bool, Optional[str]]]:
        """Upload a single local file to the server.

        Args:
            data_details: Must contain ``'local_file_path'``. May contain
                ``'folder_path'`` (sent as the ``path`` form field); it
                defaults to today's date formatted as ``%d_%m_%y``.
            access_details: Optional dict that may override the access key
                via ``'accessKey'``.

        Returns:
            ``{"status": True, "pathUrl": <path>, "error": None}`` when the
            server answers 200 with a truthy ``success`` and a path in
            ``data[0]["path"]``; otherwise
            ``{"status": False, "pathUrl": None, "error": <message>}``.
        """
        local_file_path: Optional[str] = data_details.get('local_file_path')
        folder_path: str = data_details.get('folder_path', datetime.now().strftime("%d_%m_%y"))

        if not local_file_path:
            app_logger.error("local_file_path not provided in data_details.")
            return {"status": False, "pathUrl": None, "error": "local_file_path is required in data_details"}

        access_key: str = self.get_access_key(access_details)
        headers: Dict[str, str] = {'X-Access-Key': access_key}
        payload: Dict[str, str] = {'path': folder_path}
        file_name: str = os.path.basename(local_file_path)

        try:
            with open(local_file_path, 'rb') as file_obj:
                # NOTE(review): the content type is hard-coded to image/png
                # for every file regardless of its extension - confirm the
                # server does not depend on it before changing.
                files = [('files[]', (file_name, file_obj, 'image/png'))]
                app_logger.info(f"Uploading file {file_name} to {self.server_url} in folder {folder_path}.")
                response = requests.post(self.server_url, headers=headers, data=payload, files=files)

            if response.status_code == 200:
                response_data: Dict[str, Any] = response.json()
                success: bool = response_data.get("success", False)
                path: Optional[str] = (
                    response_data["data"][0]["path"]
                    if "data" in response_data and response_data["data"]
                    else None
                )

                if success and path:
                    app_logger.info(f"Upload successful. File path: {path}")
                    return {"status": True, "pathUrl": path, "error": None}

                error_msg: str = response_data.get("message", "Unknown error during upload")
                app_logger.warning(f"Upload failed with message: {error_msg}")
                return {"status": False, "pathUrl": None, "error": error_msg}

            app_logger.error(f"Upload failed. Status code: {response.status_code}")
            return {"status": False, "pathUrl": None, "error": "Failed to upload file"}

        except Exception as e:
            # Deliberately broad: file, network, JSON-decoding and
            # response-shape errors are all reported through the result dict.
            app_logger.error(f"Upload failed with exception: {str(e)}")
            return {"status": False, "pathUrl": None, "error": f"Upload failed with exception: {str(e)}"}

    def upload_folder(self, data_details: Dict[str, Any], access_details: Optional[Dict[str, Any]] = None) -> Dict[str, Union[bool, Optional[List[str]], Optional[str]]]:
        """Upload every regular file directly inside a local folder.

        Sub-directories are skipped (no recursion). The batch stops at the
        first file that fails to upload.

        Args:
            data_details: Must contain ``'localFolderPath'``. May contain
                ``'folder_path'``, forwarded to ``upload`` for each file; it
                defaults to today's date formatted as ``%d_%m_%y``.
            access_details: Optional dict forwarded to ``upload``.

        Returns:
            ``{"status": True, "pathUrls": [<path>, ...], "error": None}`` on
            success, or ``{"status": False, "pathUrls": None,
            "error": <message>}`` if the folder path is missing, the folder
            cannot be listed, or any file fails to upload.
        """
        local_folder_path: Optional[str] = data_details.get('localFolderPath')
        folder_path: str = data_details.get('folder_path', datetime.now().strftime("%d_%m_%y"))

        if not local_folder_path:
            app_logger.error("localFolderPath not provided in data_details.")
            return {"status": False, "pathUrls": None, "error": "localFolderPath is required in data_details"}

        try:
            entries: List[str] = os.listdir(local_folder_path)
        except OSError as e:
            # Missing folder, not a directory, permission denied, ...
            app_logger.error(f"Unable to list folder {local_folder_path}: {e}")
            return {"status": False, "pathUrls": None, "error": f"Unable to list folder: {e}"}

        uploaded_files: List[str] = []

        for filename in entries:
            local_file_path: str = os.path.join(local_folder_path, filename)
            if not os.path.isfile(local_file_path):
                continue

            app_logger.debug(f"Uploading file {filename} from folder {local_folder_path}.")
            response = self.upload({"local_file_path": local_file_path, "folder_path": folder_path}, access_details)
            if not response['status']:
                app_logger.warning(f"Failed to upload {filename}. Aborting batch upload.")
                # Re-shape the single-file result so this method always
                # returns the "pathUrls" key, never upload()'s "pathUrl".
                return {"status": False, "pathUrls": None, "error": response['error']}
            uploaded_files.append(response['pathUrl'])

        app_logger.info(f"Successfully uploaded {len(uploaded_files)} files from folder.")
        return {"status": True, "pathUrls": uploaded_files, "error": None}

    def download(self, data_details: Dict[str, Any], access_details: Optional[Dict[str, Any]] = None) -> Dict[str, Union[bool, Optional[str]]]:
        """Stream a file from a URL into a local directory.

        Args:
            data_details: Must contain ``'fileUrl'`` and ``'localPath'`` (the
                destination directory). May contain ``'fileName'``; when it
                is absent or ``None`` the name is the last segment of the URL
                path, ignoring any query string or fragment.
            access_details: Accepted for signature parity with the upload
                methods; this method does not read it.

        Returns:
            ``{"status": True, "local_file_path": <path>, "error": None}`` on
            success, or ``{"status": False, "local_file_path": None,
            "error": <message>}`` on missing arguments, an undeterminable
            file name, a request failure, or a local file-system error.
        """
        file_url: Optional[str] = data_details.get('fileUrl')
        local_path: Optional[str] = data_details.get('localPath')
        file_name: Optional[str] = data_details.get('fileName', None)

        if not file_url or not local_path:
            app_logger.error("fileUrl or localPath not provided in data_details.")
            return {"status": False, "local_file_path": None, "error": "fileUrl and localPath are required in data_details"}

        if file_name is None:
            # Use only the path component so "?query" / "#fragment" never end
            # up in the local file name.
            file_name = os.path.basename(urlparse(file_url).path)

        if not file_name:
            # E.g. a URL ending in "/": joining an empty name would point at
            # the directory itself.
            app_logger.error(f"Could not determine a file name for {file_url}.")
            return {
                "status": False,
                "local_file_path": None,
                "error": "Could not determine a file name from fileUrl; provide fileName in data_details",
            }

        output_path: str = os.path.join(local_path, file_name)

        try:
            app_logger.info(f"Starting download from {file_url}")
            # The context manager releases the streamed connection even if
            # writing the file fails part-way through.
            with requests.get(file_url, stream=True) as response:
                response.raise_for_status()

                try:
                    file_size: int = int(response.headers.get('Content-Length', 0))
                except ValueError:
                    # Malformed header; the size only feeds the progress bar.
                    file_size = 0

                # total=None tells tqdm the size is unknown.
                with open(output_path, 'wb') as file, tqdm(total=file_size or None, unit="B", unit_scale=True) as progress_bar:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            file.write(chunk)
                            progress_bar.update(len(chunk))

            app_logger.info(f"Successfully downloaded to {output_path}")
            return {"status": True, "local_file_path": output_path, "error": None}

        except (requests.exceptions.RequestException, OSError) as e:
            # OSError covers an unwritable or missing destination directory.
            app_logger.error(f"Error downloading file: {e}")
            return {"status": False, "local_file_path": None, "error": str(e)}