D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
lvestats
/
lib
/
Filename :
snapshot.py
back
Copy
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT import json import logging import os import pwd import shutil import sys import zlib from collections.abc import Iterator from contextlib import contextmanager from typing import BinaryIO SNAPSHOT_PATH = "/var/lve/snapshots" SNAPSHOT_EXT = ".snapshot" SNAPSHOT_EXT_LEN = len(SNAPSHOT_EXT) class Snapshot: """ Class responsible for loading and saving snapshot files for the interval. The files will be saved in a format of: `/var/lve/snapshots/[uid]/dump_time.snapshot` dump_time is the timestamp/integer. The directories `/var/lve/snapshots/[uid]` and the dump files themselves will be owned by user. They will not be readable by other users. """ def __init__(self, incident: dict, compresslevel: int = 1) -> None: self.compresslevel = compresslevel self.incident = incident self.log = logging.getLogger("lib-snapshot") def save(self, data: dict) -> str: dump_date = data["dump_time"] assert dump_date is not None # convert possible non-ascii data to unicode self._replace_unicode_data(data) json_compressed = zlib.compress(json.dumps(data).encode(), self.compresslevel) with self.create_file(dump_date) as f: f.write(json_compressed) self.log.debug("Snapshot dumped to file %s", f.name) return f.name @staticmethod def _to_unicode(obj): if isinstance(obj, bytes): return obj.decode("utf-8", "replace") return obj def _replace_unicode_data(self, data: dict) -> None: u_queries = [] for query in data.get("snap_sql", []): u_queries.append(list(map(self._to_unicode, query))) data["snap_sql"] = u_queries u_urls = [] for http in data.get("snap_http", []): u_urls.append(list(map(self._to_unicode, http))) data["snap_http"] = u_urls def get_file_list(self) -> list[str]: dir_ = self.get_dir() if os.path.isdir(dir_): return os.listdir(dir_) return [] def get_ts_list( self, from_ts: int | None, to_ts: int | None, ) -> list[int]: """ Return ordered list of timestamps when snapshots for this user were created. :param from_ts: Start timestamp for filtering snapshots (inclusive). If None, starts from the earliest snapshot. :param to_ts: End timestamp for filtering snapshots (inclusive). If None, includes all snapshots up to the latest. :return: List of timestamps ordered for the given period. """ return self.snapshot_filter(self.get_file_list(), from_ts, to_ts) def get_snapshots( self, from_ts: int | None, to_ts: int | None, ) -> Iterator[dict]: """ Get snapshots for a given period. Yields one snapshot at a time for memory efficiency. :param from_ts: Start timestamp for filtering snapshots (inclusive). If None, starts from the earliest snapshot. :param to_ts: End timestamp for filtering snapshots (inclusive). If None, includes all snapshots up to the latest. :return: Iterator of snapshot dictionaries. """ for ts in self.get_ts_list(from_ts, to_ts): try: filename = self.get_file_name(self.ts_to_name(ts)) if not os.geteuid(): with drop_privileges(self.incident["uid"]): content = self.read_file_content(filename) else: content = self.read_file_content(filename) snapshot_data = json.loads(content) yield snapshot_data except (ValueError, OSError) as ve: self.log.warning( "Corrupted file: %s (%s)", self.get_file_name(self.ts_to_name(ts)), str(ve), ) def read_file_content(self, filename: str) -> str: with open(filename, "rb") as f: content = f.read() try: content = zlib.decompress(content) except zlib.error: compressed_content = zlib.compress(content, self.compresslevel) with open(filename, "wb") as f: f.write(compressed_content) return content.decode() def get_incident_snapshots(self) -> list: """ Load all snapshots for given incident :return: list of snapshots """ return list(self.get_snapshots(self.incident["incident_start_time"], self.incident["incident_end_time"])) def get_dir(self) -> str: return os.path.join(SNAPSHOT_PATH, str(self.incident["uid"])) def get_file_name(self, name: str) -> str: return os.path.join(self.get_dir(), name) def create_file(self, dump_date: int) -> BinaryIO: """ create file, change its ownership & permissions if needed. Create directories if needed as well :param dump_date: int timestamp used as file name :return: open File object """ dir_ = self.get_dir() if not os.path.exists(dir_): try: # sacrifice security if we cannot setup ownership properly os.makedirs(dir_) os.chmod(dir_, 0o751) os.chown(dir_, self.incident["uid"], 0) except OSError as e: self.log.error("Unable to create dir %s (%s)", dir_, str(e)) file_name = self.get_file_name(self.ts_to_name(dump_date)) with drop_privileges(self.incident["uid"]): file_ = open(file_name, "wb") # pylint: disable=consider-using-with try: os.fchmod(file_.fileno(), 0o400) except OSError as e: self.log.error("Unable to set file permissions %s (%s)", file_name, str(e)) return file_ def delete_old(self, to_ts: int) -> None: """ Delete old snapshots. If there are no more :param to_ts: up to which timestamp to remove snapshots :return: None """ _dir = self.get_dir() files = os.listdir(_dir) all_snapshots = self.snapshot_filter(files) ts_to_remove = self.snapshot_filter(files, to_ts=to_ts) if all_snapshots == ts_to_remove: shutil.rmtree(_dir, ignore_errors=True) else: for ts in ts_to_remove: os.remove(self.get_file_name(self.ts_to_name(ts))) @staticmethod def get_ts(file_: str) -> int | None: if file_.endswith(SNAPSHOT_EXT): ts = file_[0:-SNAPSHOT_EXT_LEN] if ts.isdigit(): try: return int(ts) except ValueError: pass return None @staticmethod def snapshot_filter( files: list[str], from_ts: int | None = None, to_ts: int | None = None, ) -> list[int]: if from_ts is None: from_ts = 0 if to_ts is None: to_ts = sys.maxsize result = [] for filename in files: ts = Snapshot.get_ts(filename) if ts is not None and from_ts <= ts <= to_ts: result.append(ts) return sorted(result) @staticmethod def ts_to_name(ts: int) -> str: return str(ts) + SNAPSHOT_EXT @contextmanager def drop_privileges(uid: int): old_uid, old_gid, old_groups = os.getuid(), os.getgid(), os.getgroups() gid = pwd.getpwnam("nobody")[3] os.setgroups([]) os.setegid(gid) os.seteuid(uid) try: yield finally: os.seteuid(old_uid) os.setegid(old_gid) os.setgroups(old_groups)