Deleting All Snapshots
The following script removes all snapshots in a production volume. Use this script for operations that use SnapOPC+ to back up generation-managed differential data. For more information, see the description of Generation Managed Differential Data Backup with SnapOPC+.
The sequence of script processing is as follows.
Use GET /volume/{volume_id}/copysession API to get the business volume snapshot information.
Determine the Copy Session ID of the most recent Snapshot from the information in 1.
Using DELETE /copysession/{copysession_id} API to force the removal of the latest generation of SnapOPC+ Session for which the copy source is the business volume.
Use DELETE /volume API to remove all the Volumes that are used as the SnapOPC+ Copy destinations.
snapopcplus_delete.py
import sys
from datetime import datetime
from eternus_rest import EtdxBaseApi
# Change the following values for your environment.
storage_url = "https://192.168.0.1:5665"
storage_user_name = "username"
storage_password = "password"
# Change the following values as required.
# Volume ID of the source volume of SnapOPC+ to delete.
volume_id = 100001
class EtdxApi(EtdxBaseApi):
def __init__(self, base_url, boxid=None, proxies=None):
super().__init__(base_url, boxid=boxid, proxies=proxies)
def snapopcplus_get_snapshot_list(self, volume_id):
"""Get the SnapOPC+ snapshots about specified volume.
Args:
volume_id (int): Volume ID of the source volume of SnapOPC+ to get.
Returns:
list[obj]: A list of snapshot objects of SnapOPC+. Returns empty
list if fails to get information.
"""
r = super().get("/api/v1/volume/{0!s}/copysession?backup_type=snapshot"
"&is_manual_snapshot=false".format(volume_id))
if r.status_code != 200 or r.json()["backup_volume_list"] == []:
print("Failed to get snapshots or the volume has no "
"SnapOPC+ snapshots.")
print(r)
print(r.json())
return []
return r.json()["backup_volume_list"]
def snapopcplus_delete_copysession(self, copysession_id):
"""Delete the copy session.
Args:
copysession_id (int): Copy session ID to delete.
Returns:
bool: True if successful, False otherwise.
"""
r = super().delete("/api/v1/copysession/{0!s}?force=true"
.format(copysession_id))
if r.status_code != 202:
print("Failed to request the SnapOPC+ copy session deletion.")
print(r)
print(r.json())
return False
# Wait for the job completion
job_r = super().wait_job(r.json()["job_id"])
if job_r.json()["status"] != "Success":
print("Failed to delete the SnapOPC+ copy session.")
print(job_r)
print(job_r.json())
return False
return True
def delete_volumes(self, volume_id_list):
"""Delete the volumes.
Args:
volume_id_list (list[int]): List of Volume ID to delete.
Returns:
list[int]: Deleted Volume ID list.
"""
# If the number of volume_id_list is over 256 (maximum number of the
# volumes to delete at once), call the API command multiple times.
max_delete_volumes = 256
remain_list = list(set(volume_id_list))
deleted_volume_id_list = []
while len(remain_list) > 0:
delete_list = remain_list[:max_delete_volumes]
remain_list = list(set(remain_list) - set(delete_list))
body = {
"volume_id_list": delete_list
}
r = super().delete("/api/v1/volume", body)
if r.status_code != 202:
print("Failed to request the volumes deletion.")
print(r)
print(r.json())
continue
# Wait for the job completion
job_r = super().wait_job(r.json()["job_id"])
if job_r.json()["status"] != "Success":
print("Failed to delete the volumes.")
print(job_r)
print(job_r.json())
continue
deleted_volume_id_list.extend(delete_list)
return deleted_volume_id_list
def snapopcplus_delete(self, volume_id):
"""Delete the SnapOPC+ copy session and their volumes.
Args:
volume_id (int): Volume ID of the source volume of SnapOPC+
to delete.
Returns:
bool: True if successful, False otherwise.
"""
# Get snapshots of SnapOPC+
snapshot_list = self.snapopcplus_get_snapshot_list(volume_id)
if snapshot_list == []:
return False
snapshot_volume_id_list = \
[int(i["volume_href"].split("/")[4]) for i in snapshot_list]
# Get the latest SnapOPC+ copy session and delete it
sorted_snapshot_list = \
sorted(snapshot_list, key=lambda x: datetime.strptime(
x["backup_time"], '%Y-%m-%dT%H:%M:%SZ'))
latest_copysession_id = \
int(sorted_snapshot_list[-1]["copysession_href"].split("/")[4])
print("Deleting the SnapOPC+ copy session (ID: {0!s})."
.format(latest_copysession_id))
if not self.snapopcplus_delete_copysession(latest_copysession_id):
return False
print("The SnapOPC+ copy session was deleted.")
# Delete the snapshots of SnapOPC+
print("Deleting the volumes that had been used by SnapOPC+.")
deleted_volume_id_list = self.delete_volumes(snapshot_volume_id_list)
if set(deleted_volume_id_list) != set(snapshot_volume_id_list):
print("Failed to delete some volumes:",
list(set(snapshot_volume_id_list)
- set(deleted_volume_id_list)))
print("Deleted volumes:", deleted_volume_id_list)
return set(deleted_volume_id_list) == set(snapshot_volume_id_list)
def main():
storage = EtdxApi(storage_url)
# Login
if not storage.login(storage_user_name, storage_password):
return False
# Delete the SnapOPC+ sessions and snapshots.
if not storage.snapopcplus_delete(volume_id):
print("Failed to delete SnapOPC+ copy session and/or some snapshots.")
return False
print("Succeeded to delete SnapOPC+ copy session and all snapshots.")
# Logout
storage.logout()
return True
if __name__ == '__main__':
if not main():
sys.exit(1)

