URL: http://github.com/Dstack-TEE/dstack/pull/241.diff
= backup_dir + self.state_file = state_file + self.full_interval_seconds = full_interval_seconds + self.inc_interval_seconds = inc_interval_seconds + self.max_backups = max_backups + self.vm_filter = vm_filter + self.state = self._load_state() + + def _load_state(self) -> Dict: + """Load backup state from JSON file""" + if not self.state_file.exists(): + return {} + + try: + with open(self.state_file, 'r') as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + logger.warning( + "Failed to load state file, starting with empty state") + return {} + + def _save_state(self): + """Save backup state to JSON file""" + with open(self.state_file, 'w') as f: + json.dump(self.state, f, indent=2) + + def get_running_vms(self, vm_filter: Optional[str] = None) -> List[Dict[str, str]]: + """Returns a list of running VMs with their IDs and names""" + logger.info("Getting list of running VMs...") + vms = [] + + if not self.vms_dir.exists(): + logger.warning(f"VMs directory {self.vms_dir} does not exist") + return vms + + # Iterate through VM directories + for vm_dir in self.vms_dir.iterdir(): + if not vm_dir.is_dir(): + logger.debug(f"Skipping non-directory {vm_dir}") + continue + + vm_id = vm_dir.name + if vm_filter and vm_filter.strip() not in vm_id: + continue + pid_file = vm_dir / "qemu.pid" + + if not pid_file.exists(): + logger.debug(f"No PID file found for VM {vm_id}") + continue + + try: + pid = int(pid_file.read_text().strip()) + os.kill(pid, 0) + except (ValueError, ProcessLookupError): + logger.debug(f"No running process found for VM {vm_id}") + continue + except OSError as e: + logger.debug(f"Failed to check process for VM {vm_id}: {e}") + if e.errno == 1: # Operation not permitted + pass + else: + continue + + manifest_file = vm_dir / "vm-manifest.json" + if not manifest_file.exists(): + logger.debug(f"No manifest file found for VM {vm_id}") + continue + + qmp_socket = vm_dir / "qmp.sock" + if not qmp_socket.exists(): + logger.debug(f"No QMP socket 
found for VM {vm_id}") + continue + + try: + with open(manifest_file, 'r') as f: + manifest = json.load(f) + image = manifest.get('image') or "" + if not image.startswith("dstack-"): + logger.debug( + f"Image {image} is not a dstack image, skipping") + continue + version_tuple = parse_version(image) + if version_tuple < (0, 5, 0): + hd = "hd0" + else: + hd = "hd1" + vm_name = manifest.get( + 'name') or manifest.get('id') or vm_id + + vms.append({ + 'id': vm_id, + 'name': vm_name, + 'hd': hd + }) + logger.debug(f"Found running VM: {vm_name} ({vm_id})") + except (json.JSONDecodeError, IOError) as e: + logger.error(f"Failed to read manifest for VM {vm_id}: {e}") + + logger.info(f"Found {len(vms)} running VMs") + + return vms + + def get_last_backup_time(self, vm_id: str, backup_type: str) -> Optional[int]: + """Get the timestamp of the last backup of specified type for a VM""" + logger.debug( + f"Looking for {backup_type} backup timestamp for VM {vm_id}") + + if vm_id not in self.state: + logger.debug(f"VM {vm_id} not found in state file") + return None + + timestamp = self.state[vm_id].get(backup_type) + if timestamp: + logger.debug( + f"Retrieved {backup_type} backup timestamp for VM {vm_id}: {timestamp}") + + return timestamp + + def update_backup_time(self, vm_id: str, backup_type: str): + """Update the timestamp for a backup type""" + current_time = int(time.time()) + + if vm_id not in self.state: + self.state[vm_id] = {} + + self.state[vm_id][backup_type] = current_time + self._save_state() + + logger.debug( + f"Updated {backup_type} backup timestamp for VM {vm_id} to {datetime.fromtimestamp(current_time)}") + + def _backup_vm_configs(self, vm_dir: Path, backup_dir: Path) -> bool: + """Backup VM configuration files as tar.gz""" + vm_configs = ["vm-manifest.json", "shared/"] + config_archive = backup_dir / "vm-configs.tar.gz" + + logger.info(f"Creating VM config backup: {config_archive}") + + try: + with tarfile.open(config_archive, 'w:gz') as tar: + for 
config_item in vm_configs: + config_path = vm_dir / config_item + if config_path.exists(): + # Add to archive with relative path + tar.add(config_path, arcname=config_item) + logger.debug(f"Added {config_item} to config archive") + else: + logger.warning( + f"Config item {config_item} not found, skipping") + + logger.info(f"VM config backup completed: {config_archive}") + return True + except Exception as e: + logger.error(f"Failed to create VM config backup: {e}") + return False + + def perform_backup(self, vm_id: str, vm_name: str, backup_type: str, hd: str) -> bool: + """Perform a backup for the specified VM""" + logger.info(f"Performing {backup_type} backup...") + + # Convert to absolute paths + vm_dir = self.vms_dir.resolve() / vm_id + backup_dir = self.backup_dir.resolve() / vm_id / "backups" + qmp_socket = vm_dir / "qmp.sock" + backup_lock = vm_dir / "backup.lock" + + # Create backup directory if it doesn't exist + backup_dir.mkdir(parents=True, exist_ok=True) + + # Set backup level based on type + backup_level = "full" if backup_type == "full" else "inc" + + vm_configs = ["vm-manifest.json", "shared/"] + + # Create or update latest symlink + latest_dir = backup_dir / "latest" + + def do_backup(): + # For full backups, clear bitmaps first + if backup_level == "full": + logger.info( + f"Clearing bitmaps for full backup of VM {vm_name}") + if qmp_socket.exists(): + try: + # Use absolute path for qmp_socket + abs_qmp_socket = qmp_socket.resolve() + result = subprocess.Popen( + ["qmpbackup", "--debug", "--socket", + str(abs_qmp_socket), "cleanup", "--remove-bitmap"], + stdout=sys.stdout, + stderr=sys.stderr + ) + returncode = result.wait() + if returncode != 0: + logger.warning( + f"Failed to clear bitmaps for VM {vm_name} ({vm_id})") + # Continue anyway as this might be the first backup + except Exception as e: + logger.error(f"Error clearing bitmaps: {e}") + return False + else: + logger.error(f"QMP socket not found at {qmp_socket}") + return False + + # 
Perform the backup + logger.info(f"Running qmpbackup") + + # Convert to absolute paths for qmpbackup + abs_qmp_socket = qmp_socket.resolve() + abs_latest_dir = latest_dir.resolve() + + logger.debug( + f"Running: qmpbackup --socket {abs_qmp_socket} backup -i {hd} --no-subdir -t {abs_latest_dir} -l {backup_level}") + if qmp_socket.exists(): + try: + # Use Popen for real-time output + process = subprocess.Popen( + [ + "qmpbackup", + "--debug", + "--socket", str(abs_qmp_socket), + "backup", + "-i", hd, + "--no-subdir", + "-t", str(abs_latest_dir), + "-l", backup_level + ], + stdout=sys.stdout, + stderr=sys.stderr, + text=True, + bufsize=1 # Line buffered + ) + + # Get return code + returncode = process.wait() + if returncode == 0: + logger.info(f"Disk backup successful") + + # Backup VM configuration files + if not self._backup_vm_configs(vm_dir, abs_latest_dir): + logger.error("VM config backup failed") + return False + + self.update_backup_time(vm_id, backup_type) + + # Rotate backups if needed + if backup_type == "full": + self._rotate_backups(vm_id) + return True + else: + logger.error("Backup failed") + return False + except Exception as e: + logger.error(f"Error performing backup: {e}") + return False + else: + logger.error(f"QMP socket not found at {qmp_socket}") + return False + + if backup_level == "full": + # Create timestamped directory for this backup + timestamp = datetime.now().strftime("%Y%m%dT%H%M%S") + backup_timestamp_dir = backup_dir / f"{timestamp}" + logger.info(f"Creating backup directory: {backup_timestamp_dir}") + backup_timestamp_dir.mkdir(parents=True, exist_ok=True) + try: + latest_dir.unlink() + except FileNotFoundError: + pass + latest_dir.symlink_to(timestamp) + + try: + backup_lock.touch(exist_ok=False) + locked = True + except Exception as e: + logger.error(f"Error creating backup lock: {e}") + locked = False + if locked: + try: + suc = do_backup() + except Exception as e: + logger.error(f"Error performing backup: {e}") + suc = False + 
finally: + backup_lock.unlink() + else: + suc = False + if backup_type == "full": + if not suc: + logger.info( + f"Removing {os.path.basename(backup_timestamp_dir)}") + try: + shutil.rmtree(backup_timestamp_dir) + except Exception as e: + logger.error(f"Error removing old backup: {e}") + else: + pass + + return suc + + def needs_backup(self, vm_id: str) -> Optional[str]: + """Determine if a VM needs a backup and what type""" + current_time = int(time.time()) + last_full = self.get_last_backup_time(vm_id, "full") + last_full_ts = datetime.fromtimestamp(last_full) if last_full else None + last_incremental = self.get_last_backup_time(vm_id, "incremental") + last_incremental_ts = datetime.fromtimestamp( + last_incremental) if last_incremental else None + + logger.debug(f"Last full backup: {last_full_ts}") + logger.debug(f"Last incremental backup: {last_incremental_ts}") + + # Determine if we need a full backup based on configured interval + if not last_full or (current_time - last_full) > self.full_interval_seconds: + return "full" + # Determine if we need an incremental backup based on configured interval + elif not last_incremental or (current_time - last_incremental) > self.inc_interval_seconds: + return "incremental" + else: + return None + + def _rotate_backups(self, vm_id): + """Remove old backups to keep only max_backups""" + backup_dir = self.backup_dir.resolve() / vm_id / "backups" + if not backup_dir.exists(): + return + + # Get all backup directories (excluding 'latest' symlink) + backup_dirs = [d for d in backup_dir.iterdir() + if d.is_dir() and d.name != "latest"] + + # Sort by name (which is timestamp format) + backup_dirs.sort() + + # If we have more backups than max_backups, remove the oldest ones + if len(backup_dirs) > self.max_backups: + logger.info( + f"Rotating backups for VM {vm_id}, keeping {self.max_backups} most recent") + for old_dir in backup_dirs[:-self.max_backups]: + logger.info( + f"Removing old backup: {os.path.basename(old_dir)}") + 
try: + shutil.rmtree(old_dir) + except Exception as e: + logger.error(f"Failed to remove old backup {old_dir}: {e}") + + def run(self): + """Main entry point for the backup scheduler""" + logger.info("=" * 80) + logger.info(f"Starting backup scheduler") + logger.info(f"Using VMs directory: {self.vms_dir}") + logger.info(f"Using backup directory: {self.backup_dir}") + + # Get list of running VMs + vms = self.get_running_vms(self.vm_filter) + + if not vms: + logger.info("No running VMs found") + return + + total_vms = len(vms) + # Process each VM + for i, vm in enumerate(vms): + vm_id = vm['id'] + vm_name = vm['name'] + hd = vm['hd'] + + logger.info("-" * 50) + logger.info( + f"[{i+1}/{total_vms}] Processing VM: {vm_name} ({vm_id})") + + # Check if backup is needed + backup_type = self.needs_backup(vm_id) + + if not backup_type: + logger.info(f"No backup needed") + continue + + # Perform backup + start_time = time.time() + if self.perform_backup(vm_id, vm_name, backup_type, hd): + elapsed_time = time.time() - start_time + logger.info( + f"{backup_type} backup completed successfully (total time: {elapsed_time:.2f}s)") + else: + elapsed_time = time.time() - start_time + logger.error( + f"{backup_type} backup failed (time elapsed: {elapsed_time:.2f}s)") + + logger.info("-" * 50) + logger.info("Backup scheduler run completed") + + +def parse_interval(interval_str): + """Parse interval string like '7d' or '12h' into seconds""" + if not interval_str: + raise ValueError("Interval cannot be empty") + + # Get the unit (last character) and value (everything else) + unit = interval_str[-1].lower() + try: + value = int(interval_str[:-1]) + except ValueError: + raise ValueError( + f"Invalid interval format: {interval_str}. 
Expected format like '7d' or '12h'") + + # Convert to seconds based on unit + if unit == 'd': + return value * 24 * 60 * 60 # days to seconds + elif unit == 'h': + return value * 60 * 60 # hours to seconds + elif unit == 'm': + return value * 60 # minutes to seconds + elif unit == 's': + return value # already in seconds + else: + raise ValueError( + f"Unknown time unit: {unit}. Use d (days), h (hours), m (minutes), or s (seconds)") + + +def parse_args(): + """Parse command line arguments""" + # First parse just the vmm-work-dir to use it for defaults + temp_parser = argparse.ArgumentParser(add_help=False) + temp_parser.add_argument("--vmm-work-dir", type=Path, default=".") + temp_args, _ = temp_parser.parse_known_args() + vmm_work_dir = temp_args.vmm_work_dir + + # Now create the real parser with all arguments + parser = argparse.ArgumentParser( + description="Periodic backup script for dstack VMM") + + # Add all arguments with proper defaults + parser.add_argument("--vmm-work-dir", type=Path, + default=".", + help="dstack-vmm work directory") + parser.add_argument("--vms-dir", type=Path, + default=vmm_work_dir / "run" / "vm", + help="Directory containing VM run data") + parser.add_argument("--backup-dir", type=Path, + default=vmm_work_dir / "run" / "backup", + help="Directory for storing backups") + parser.add_argument("--log-file", type=Path, + default=vmm_work_dir / "logs" / "backup.log", + help="Log file path (with rotation enabled)") + parser.add_argument("--state-file", type=Path, + default=vmm_work_dir / "state" / "backup_state.json", + help="File for storing backup state") + parser.add_argument("--full-interval", type=str, + default="7d", + help="Interval for full backups (e.g., 7d for 7 days)") + parser.add_argument("--inc-interval", type=str, + default="1d", + help="Interval for incremental backups (e.g., 1d for 1 day)") + parser.add_argument("--max-backups", type=int, default=4, + help="Maximum number of full backups to keep per VM") + 
parser.add_argument("--log-level", type=str, default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + help="Set the logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)") + parser.add_argument("--vm-filter", type=str, help="Filter VMs by ID") + + # Parse all arguments + args = parser.parse_args() + + # Parse interval strings into seconds + args.full_interval_seconds = parse_interval(args.full_interval) + args.inc_interval_seconds = parse_interval(args.inc_interval) + + return args + + +def setup_logging(log_file: Path, log_level: str = "INFO"): + """Set up logging configuration""" + # Create logs directory if it doesn't exist + log_file.parent.mkdir(parents=True, exist_ok=True) + + # Map string log level to logging constants + log_level_map = { + "DEBUG": logging.DEBUG, + "INFO": logging.INFO, + "WARNING": logging.WARNING, + "ERROR": logging.ERROR, + "CRITICAL": logging.CRITICAL + } + + # Get the numeric log level (default to INFO if invalid) + numeric_level = log_level_map.get(log_level.upper(), logging.INFO) + + # Create a rotating file handler (10MB size limit, 3 backup files) + file_handler = RotatingFileHandler( + log_file, maxBytes=10*1024*1024, backupCount=3) + file_handler.setLevel(numeric_level) + file_handler.setFormatter(logging.Formatter( + "[%(asctime)s] %(levelname)s: %(message)s", "%Y-%m-%d %H:%M:%S")) + + # Create console handler + console_handler = logging.StreamHandler() + console_handler.setLevel(numeric_level) + console_handler.setFormatter(logging.Formatter( + "[%(asctime)s] %(levelname)s: %(message)s", "%Y-%m-%d %H:%M:%S")) + + # Configure root logger + root_logger = logging.getLogger() + root_logger.setLevel(numeric_level) + + # Remove any existing handlers + for handler in root_logger.handlers[:]: + root_logger.removeHandler(handler) + + # Add our handlers + root_logger.addHandler(file_handler) + root_logger.addHandler(console_handler) + + +def main(): + """Main entry point""" + try: + # Parse command line arguments + 
args = parse_args() + + # Set up logging with specified log file and log level + setup_logging(args.log_file, args.log_level) + + # Create directories if they don't exist + args.backup_dir.mkdir(parents=True, exist_ok=True) + args.state_file.parent.mkdir(parents=True, exist_ok=True) + + # Initialize and run scheduler + scheduler = BackupScheduler( + args.vms_dir, args.backup_dir, args.state_file, + args.full_interval_seconds, args.inc_interval_seconds, + args.max_backups, args.vm_filter) + scheduler.run() + except Exception as e: + logger.error(f"Error: {e}") + return 1 + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/vmm/Cargo.toml b/vmm/Cargo.toml index 6283f494c..a085bd98e 100644 --- a/vmm/Cargo.toml +++ b/vmm/Cargo.toml @@ -44,6 +44,8 @@ hex_fmt.workspace = true lspci.workspace = true base64.workspace = true serde-human-bytes.workspace = true +serde-duration.workspace = true +chrono.workspace = true [dev-dependencies] insta.workspace = true diff --git a/vmm/rpc/proto/vmm_rpc.proto b/vmm/rpc/proto/vmm_rpc.proto index ab51519ef..c2a70bf1c 100644 --- a/vmm/rpc/proto/vmm_rpc.proto +++ b/vmm/rpc/proto/vmm_rpc.proto @@ -31,6 +31,8 @@ message VmInfo { string shutdown_progress = 12; // Image version string image_version = 13; + // Backup in progress + bool backup_in_progress = 14; } message Id { @@ -223,6 +225,41 @@ message GpuInfo { bool is_free = 4; } +message BackupDiskRequest { + // vm id + string vm_id = 1; + // full or incremental + string level = 2; +} + +message BackupInfo { + // Group id + string backup_id = 1; + // id of the snapshot + string snapshot_id = 2; + // timestamp + string timestamp = 3; + // level: full or incremental + string level = 4; + // size of the backup in bytes + uint64 size = 5; +} + +message ListBackupsResponse { + repeated BackupInfo backups = 1; +} + +message DeleteBackupRequest { + string vm_id = 1; + string backup_id = 2; +} + +message RestoreBackupRequest { + string vm_id = 1; + string backup_id = 2; + 
string snapshot_id = 3; +} + // Service definition for dstack-vmm service Vmm { // RPC to create a VM @@ -261,4 +298,16 @@ service Vmm { // List GPUs rpc ListGpus(google.protobuf.Empty) returns (ListGpusResponse); + + // Backup a VM data disk + rpc BackupDisk(BackupDiskRequest) returns (google.protobuf.Empty); + + // List backups for a VM + rpc ListBackups(BackupDiskRequest) returns (ListBackupsResponse); + + // Delete a backup + rpc DeleteBackup(DeleteBackupRequest) returns (google.protobuf.Empty); + + // Restore a backup + rpc RestoreBackup(RestoreBackupRequest) returns (google.protobuf.Empty); } diff --git a/vmm/src/app.rs b/vmm/src/app.rs index be464fca4..bb32f71e9 100644 --- a/vmm/src/app.rs +++ b/vmm/src/app.rs @@ -6,7 +6,9 @@ use dstack_kms_rpc::kms_client::KmsClient; use dstack_types::shared_filenames::{ compat_v3, APP_COMPOSE, ENCRYPTED_ENV, INSTANCE_INFO, SYS_CONFIG, USER_CONFIG, }; -use dstack_vmm_rpc::{self as pb, GpuInfo, StatusRequest, StatusResponse, VmConfiguration}; +use dstack_vmm_rpc::{ + self as pb, BackupInfo, GpuInfo, StatusRequest, StatusResponse, VmConfiguration, +}; use fs_err as fs; use guest_api::client::DefaultClient as GuestClient; use id_pool::IdPool; @@ -16,9 +18,10 @@ use serde_json::json; use std::collections::{BTreeSet, HashMap}; use std::net::IpAddr; use std::path::{Path, PathBuf}; +use std::process::Command; use std::sync::{Arc, Mutex, MutexGuard}; use supervisor_client::SupervisorClient; -use tracing::{error, info}; +use tracing::{error, info, warn}; pub use image::{Image, ImageInfo}; pub use qemu::{VmConfig, VmWorkDir}; @@ -125,6 +128,18 @@ impl App { VmWorkDir::new(self.config.run_path.join(id)) } + fn backups_dir(&self, id: &str) -> PathBuf { + self.config.cvm.backup.path.join(id).join("backups") + } + + fn backup_dir(&self, id: &str, backup_id: &str) -> PathBuf { + self.backups_dir(id).join(backup_id) + } + + fn backup_file(&self, id: &str, backup_id: &str, snapshot_id: &str) -> PathBuf { + self.backup_dir(id, 
backup_id).join(snapshot_id) + } + pub fn new(config: Config, supervisor: SupervisorClient) -> Self { let cid_start = config.cvm.cid_start; let cid_end = cid_start.saturating_add(config.cvm.cid_pool_size); @@ -647,6 +662,258 @@ impl App { } Ok(()) } + + pub(crate) async fn backup_disk(&self, id: &str, level: &str) -> Result<()> { + if !self.config.cvm.backup.enabled { + bail!("Backup is not enabled"); + } + let work_dir = self.work_dir(id); + let backup_dir = self.backups_dir(id); + + // Determine backup level based on the backup_type + let backup_level = match level { + "full" => "full", + "incremental" => "inc", + _ => bail!("Invalid backup level: {level}"), + }; + + let qmp_socket = work_dir.qmp_socket(); + let _lock = BackupLock::try_lock(work_dir.backup_lock_file()) + .context("Failed to lock for backup")?; + + let id = id.to_string(); + tokio::task::spawn_blocking(move || { + let latest_dir = backup_dir.join("latest"); + if backup_level == "full" { + // clear the bitmaps + let output = Command::new("qmpbackup") + .arg("--socket") + .arg(&qmp_socket) + .arg("cleanup") + .arg("--remove-bitmap") + .output() + .context("Failed to clear bitmaps")?; + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + warn!("Failed to clear bitmaps for {id}: {stderr}"); + } + // Switch to new dir and symbol link the latest to it + let timestamp = chrono::Utc::now().format("%Y%m%dZ%H%M%S").to_string(); + let new_dir = backup_dir.join(&timestamp); + fs::create_dir_all(&new_dir).context("Failed to create backup directory")?; + if fs::symlink_metadata(&latest_dir).is_ok() { + fs::remove_file(&latest_dir) + .context("Failed to remove latest directory link")?; + } + fs::os::unix::fs::symlink(&timestamp, &latest_dir) + .context("Failed to create latest directory link")?; + } + let output = Command::new("qmpbackup") + .arg("--socket") + .arg(&qmp_socket) + .arg("backup") + .arg("-i") + .arg("hd1") + .arg("--no-subdir") + .arg("-t") + .arg(&latest_dir) + 
.arg("-l") + .arg(backup_level) + .output() + .context("Failed to execute qmpbackup command")?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + warn!("Failed to backup disk for {id}: {stderr}"); + } + Ok(()) + }) + .await + .context("Failed to execute backup task")? + } + + pub(crate) async fn list_backups(&self, id: &str) -> Result