Coverage for drivers/cowutil.py : 66%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python3
2#
3# Copyright (C) 2024 Vates SAS
4#
5# This program is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# (at your option) any later version.
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program. If not, see <https://www.gnu.org/licenses/>.
17from sm_typing import Any, Callable, Dict, Final, List, Optional, Sequence, Union, override
19from abc import ABC, abstractmethod
20from enum import IntEnum
22import errno
23import time
25import util
27from vditype import VdiType
29# ------------------------------------------------------------------------------
# Bit set in the numeric value of every image format that is a COW format.
IMAGE_FORMAT_COW_FLAG: Final = 1 << 8


class ImageFormat(IntEnum):
    """
    Supported on-disk image formats.

    VHD and QCOW2 carry IMAGE_FORMAT_COW_FLAG in their value; RAW does not.
    """
    RAW = 1
    VHD = IMAGE_FORMAT_COW_FLAG | 2
    QCOW2 = IMAGE_FORMAT_COW_FLAG | 3


# Image format -> textual name.
IMAGE_FORMAT_TO_STR: Final = {
    ImageFormat.RAW: "raw",
    ImageFormat.VHD: "vhd",
    ImageFormat.QCOW2: "qcow2"
}

# Textual name -> image format (inverse of IMAGE_FORMAT_TO_STR).
STR_TO_IMAGE_FORMAT: Final = {
    label: image_format for image_format, label in IMAGE_FORMAT_TO_STR.items()
}
46# ------------------------------------------------------------------------------
def parseImageFormats(str_formats: Optional[str], default_formats: List[ImageFormat]) -> List[ImageFormat]:
    """
    Parse a comma-separated list of image format names.

    Each entry is stripped of surrounding whitespace and looked up in
    STR_TO_IMAGE_FORMAT; entries that are not found are ignored.

    Returns `default_formats` itself (not a copy) when `str_formats` is empty
    or None, or when no entry is recognized. Otherwise returns the recognized
    formats in input order; repeated entries are kept.
    """
    if not str_formats:
        return default_formats

    looked_up = (STR_TO_IMAGE_FORMAT.get(token.strip()) for token in str_formats.split(","))
    # Same truthiness filter as a plain `if image_format:` test: unknown names yield None.
    recognized: List[ImageFormat] = [image_format for image_format in looked_up if image_format]

    return recognized or default_formats
65# ------------------------------------------------------------------------------
class CowImageInfo:
    """
    Plain record describing one image.

    Every field has a class-level default; the constructor only sets `uuid`.
    The remaining fields are expected to be filled in by whoever builds the
    record.
    """

    # Identity and location.
    uuid = ""
    path = ""

    # Sizes default to -1 (presumably meaning "not known yet" - confirm with producers).
    sizeVirt = -1
    sizePhys = -1
    sizeAllocated = -1

    hidden = False

    # Parent image, empty by default.
    parentUuid = ""
    parentPath = ""

    # Defaults to 0; typed Any, so the stored value is producer-defined.
    error: Any = 0

    def __init__(self, uuid):
        """Create a record for image `uuid`; all other fields keep their defaults."""
        self.uuid = uuid
81# ------------------------------------------------------------------------------
class CowUtil(ABC):
    """
    Abstract interface for manipulating images of a COW format (VHD, QCOW2).

    Concrete implementations are returned by getCowUtilFromImageFormat() and
    getCowUtil(). The one-line summaries on the abstract methods restate each
    method's name and signature only; units, error behaviour and side effects
    are defined by the concrete implementations.
    """

    class CheckResult(IntEnum):
        """Outcome values returned by check()."""
        Success = 0
        Fail = 1
        Unavailable = 2

    @abstractmethod
    def getMinImageSize(self) -> int:
        """Return the minimum image size for this format."""

    @abstractmethod
    def getMaxImageSize(self) -> int:
        """Return the maximum image size for this format."""

    @abstractmethod
    def getBlockSize(self, path: str) -> int:
        """Return the block size of the image at `path`."""

    @abstractmethod
    def getFooterSize(self) -> int:
        """Return the footer size for this format."""

    @abstractmethod
    def getDefaultPreallocationSizeVirt(self) -> int:
        """Return the default preallocation virtual size for this format."""

    @abstractmethod
    def getMaxChainLength(self) -> int:
        """Return the maximum chain length for this format."""

    @abstractmethod
    def calcOverheadEmpty(self, virtual_size: int, block_size: Optional[int] = None) -> int:
        """Return the overhead of an empty image of `virtual_size`, optionally for a given `block_size`."""

    @abstractmethod
    def calcOverheadBitmap(self, virtual_size: int) -> int:
        """Return the bitmap overhead for an image of `virtual_size`."""

    @abstractmethod
    def getInfo(
        self,
        path: str,
        extractUuidFunction: Callable[[str], str],
        includeParent: bool = True,
        resolveParent: bool = True,
        useBackupFooter: bool = False
    ) -> CowImageInfo:
        """Return a CowImageInfo for the image at `path`."""

    @abstractmethod
    def getInfoFromLVM(
        self, lvName: str, extractUuidFunction: Callable[[str], str], vgName: str
    ) -> Optional[CowImageInfo]:
        """Return a CowImageInfo for `lvName` in `vgName`, or None."""

    @abstractmethod
    def getAllInfoFromVG(
        self,
        pattern: str,
        extractUuidFunction: Callable[[str], str],
        vgName: Optional[str] = None,
        parents: bool = False,
        exitOnError: bool = False
    ) -> Dict[str, CowImageInfo]:
        """Return a dict of CowImageInfo for the images matching `pattern` (keys are used as UUIDs by getParentChain)."""

    @abstractmethod
    def getParent(self, path: str, extractUuidFunction: Callable[[str], str]) -> Optional[str]:
        """Return the parent of the image at `path`, or None."""

    @abstractmethod
    def getParentNoCheck(self, path: str) -> Optional[str]:
        """Return the parent of the image at `path`, or None (variant of getParent; difference is implementation-defined)."""

    @abstractmethod
    def hasParent(self, path: str) -> bool:
        """Return whether the image at `path` has a parent."""

    @abstractmethod
    def setParent(self, path: str, parentPath: str, parentRaw: bool) -> None:
        """Set `parentPath` as the parent of the image at `path`."""

    @abstractmethod
    def getHidden(self, path: str) -> bool:
        """Return the hidden flag of the image at `path`."""

    @abstractmethod
    def setHidden(self, path: str, hidden: bool = True) -> None:
        """Set the hidden flag of the image at `path`."""

    @abstractmethod
    def getSizeVirt(self, path: str) -> int:
        """Return the virtual size of the image at `path`."""

    @abstractmethod
    def setSizeVirt(self, path: str, size: int, jFile: str) -> None:
        """Set the virtual size of the image at `path` (`jFile` is presumably a journal file - confirm)."""

    @abstractmethod
    def setSizeVirtFast(self, path: str, size: int) -> None:
        """Set the virtual size of the image at `path` (variant of setSizeVirt without `jFile`)."""

    @abstractmethod
    def getMaxResizeSize(self, path: str) -> int:
        """Return the maximum resize size for the image at `path`."""

    @abstractmethod
    def getSizePhys(self, path: str) -> int:
        """Return the physical size of the image at `path`."""

    @abstractmethod
    def setSizePhys(self, path: str, size: int, debug: bool = True) -> None:
        """Set the physical size of the image at `path`."""

    @abstractmethod
    def getAllocatedSize(self, path: str) -> int:
        """Return the allocated size of the image at `path`."""

    @abstractmethod
    def getResizeJournalSize(self) -> int:
        """Return the resize journal size for this format."""

    @abstractmethod
    def killData(self, path: str) -> None:
        """Kill the data of the image at `path` (exact effect is implementation-defined)."""

    @abstractmethod
    def getDepth(self, path: str) -> int:
        """Return the depth of the image at `path`."""

    @abstractmethod
    def getBlockBitmap(self, path: str) -> bytes:
        """Return the block bitmap of the image at `path` as bytes."""

    @abstractmethod
    def coalesce(self, path: str) -> int:
        """Coalesce the image at `path`; the meaning of the returned int is implementation-defined."""

    @abstractmethod
    def create(self, path: str, size: int, static: bool, msize: int = 0, block_size: Optional[int] = None) -> None:
        """Create an image of `size` at `path`."""

    @abstractmethod
    def snapshot(
        self,
        path: str,
        parent: str,
        parentRaw: bool,
        msize: int = 0,
        checkEmpty: bool = True,
        is_mirror_image: bool = False
    ) -> None:
        """Create a snapshot image at `path` on top of `parent`."""

    @abstractmethod
    def canSnapshotRaw(self, size: int) -> bool:
        """Return whether a raw image of `size` can be snapshotted."""

    @abstractmethod
    def check(
        self,
        path: str,
        ignoreMissingFooter: bool = False,
        fast: bool = False
    ) -> 'CowUtil.CheckResult':
        """Check the image at `path` and return a CheckResult."""

    @abstractmethod
    def revert(self, path: str, jFile: str) -> None:
        """Revert the image at `path` using `jFile`."""

    @abstractmethod
    def repair(self, path: str) -> None:
        """Repair the image at `path`."""

    @abstractmethod
    def validateAndRoundImageSize(self, size: int) -> int:
        """Validate `size` and return it rounded."""

    @abstractmethod
    def getKeyHash(self, path: str) -> Optional[str]:
        """Return the key hash of the image at `path`, or None."""

    @abstractmethod
    def setKey(self, path: str, key_hash: str) -> None:
        """Set `key_hash` on the image at `path`."""

    # coalesceOnline and cancelCoalesceOnline are only available when
    # isCoalesceableOnRemote() returns True.
    # Otherwise, both functions should raise NotImplementedError.
    @abstractmethod
    def isCoalesceableOnRemote(self) -> bool:
        """Return whether coalesceOnline/cancelCoalesceOnline are available."""

    @abstractmethod
    def coalesceOnline(self, path: str) -> int:
        """Online variant of coalesce for the image at `path`."""

    @abstractmethod
    def cancelCoalesceOnline(self, path: str) -> None:
        """Cancel an online coalesce of the image at `path`."""

    def getParentChain(self, lvName: str, extractUuidFunction: Callable[[str], str], vgName: str) -> Dict[str, str]:
        """
        Return a dict mapping UUID to path for every image reported by
        getAllInfoFromVG(lvName, extractUuidFunction, vgName, True, True).

        While the result is empty, the query is repeated after a one-second
        sleep. Once more than 60 empty results have been seen, two errors are
        logged and an empty dict is returned.
        """
        found: Dict[str, CowImageInfo] = {}
        empty_results = 0
        while True:
            if empty_results > 60:
                util.SMlog('ERROR: getAllInfoFromVG returned 0 VDIs after %d retries' % empty_results)
                util.SMlog('ERROR: the image metadata might be corrupted')
                break
            found = self.getAllInfoFromVG(lvName, extractUuidFunction, vgName, True, True)
            if found:
                break
            empty_results += 1
            time.sleep(1)
        return {image_uuid: image.path for image_uuid, image in found.items()}

    @staticmethod
    def isCowImage(image_format: ImageFormat) -> bool:
        """Return True when `image_format` has IMAGE_FORMAT_COW_FLAG set."""
        return (image_format & IMAGE_FORMAT_COW_FLAG) != 0

    @staticmethod
    def _ioretry(cmd: Sequence[str], text: bool = True) -> Union[str, bytes]:
        """Run `cmd` through util.pread2 under util.ioretry, with EIO and EAGAIN as the retry error list."""
        def run_command():
            return util.pread2(cmd, text=text)

        return util.ioretry(run_command, errlist=[errno.EIO, errno.EAGAIN])
316# ------------------------------------------------------------------------------
def getImageFormatFromVdiType(vdi_type: str) -> ImageFormat:
    """
    Map a VDI type to its ImageFormat.

    VdiType.RAW and VdiType.PHY both map to ImageFormat.RAW; VdiType.VHD and
    VdiType.QCOW2 map to the format of the same name.

    Raises:
        AssertionError: if `vdi_type` is none of the types above.
    """
    if vdi_type == VdiType.RAW or vdi_type == VdiType.PHY:
        return ImageFormat.RAW
    if vdi_type == VdiType.VHD:
        return ImageFormat.VHD
    if vdi_type == VdiType.QCOW2:
        return ImageFormat.QCOW2

    # Raised explicitly instead of `assert False`: assert statements are
    # removed under `python -O`, which would make this function silently
    # return None for an unsupported type.
    raise AssertionError(f"Unsupported vdi type: {vdi_type}")
def getImageStringFromVdiType(vdi_type: str) -> str:
    """Return the IMAGE_FORMAT_TO_STR name of the image format matching `vdi_type`."""
    image_format = getImageFormatFromVdiType(vdi_type)
    return IMAGE_FORMAT_TO_STR[image_format]
def getVdiTypeFromImageFormat(image_format: ImageFormat) -> str:
    """
    Map an ImageFormat to its VDI type.

    ImageFormat.RAW, VHD and QCOW2 map to VdiType.RAW, VHD and QCOW2.

    Raises:
        AssertionError: if `image_format` is none of the formats above.
    """
    if image_format == ImageFormat.RAW:
        return VdiType.RAW
    if image_format == ImageFormat.VHD:
        return VdiType.VHD
    if image_format == ImageFormat.QCOW2:
        return VdiType.QCOW2

    # Only values missing from IMAGE_FORMAT_TO_STR can reach this point, so
    # indexing the table while building the message would raise KeyError and
    # hide the real error. Fall back to the raw value instead.
    # Raised explicitly instead of `assert False` so the check also survives
    # `python -O`.
    format_name = IMAGE_FORMAT_TO_STR.get(image_format, image_format)
    raise AssertionError(f"Unsupported image format: {format_name}")
341# ------------------------------------------------------------------------------
def getCowUtilFromImageFormat(image_format: ImageFormat) -> CowUtil:
    """
    Return a new CowUtil implementation for `image_format`.

    ImageFormat.RAW and ImageFormat.VHD both yield vhdutil.VhdUtil();
    ImageFormat.QCOW2 yields qcow2util.QCowUtil().

    Raises:
        AssertionError: if `image_format` is none of the formats above.
    """
    # Imported inside the function, as in the original code (presumably to
    # avoid a circular import with this module - confirm).
    import vhdutil
    import qcow2util

    if image_format in (ImageFormat.RAW, ImageFormat.VHD):
        return vhdutil.VhdUtil()

    if image_format == ImageFormat.QCOW2:
        return qcow2util.QCowUtil()

    # Raised explicitly instead of `assert False`: assert statements are
    # removed under `python -O`, which would make this function silently
    # return None for an unsupported format.
    raise AssertionError(f"Unsupported image format: {image_format}")
def getCowUtil(vdi_type: str) -> CowUtil:
    """Return the CowUtil implementation for the image format matching `vdi_type`."""
    image_format = getImageFormatFromVdiType(vdi_type)
    return getCowUtilFromImageFormat(image_format)