Coverage for drivers/cowutil.py : 66%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python3
2#
3# Copyright (C) 2024 Vates SAS
4#
5# This program is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# (at your option) any later version.
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program. If not, see <https://www.gnu.org/licenses/>.
17from sm_typing import Any, Callable, Dict, Final, List, Optional, Sequence, Union, override
19from abc import ABC, abstractmethod
20from enum import IntEnum
22import errno
23import time
25import util
27from vditype import VdiType
29# ------------------------------------------------------------------------------
# Bit carried by the numeric value of every copy-on-write image format.
IMAGE_FORMAT_COW_FLAG: Final = 1 << 8


class ImageFormat(IntEnum):
    """
    Image formats known to this module.

    VHD and QCOW2 have IMAGE_FORMAT_COW_FLAG OR-ed into their value;
    RAW does not.
    """
    RAW = 1
    VHD = IMAGE_FORMAT_COW_FLAG | 2
    QCOW2 = IMAGE_FORMAT_COW_FLAG | 3


# Textual name of each image format.
IMAGE_FORMAT_TO_STR: Final = {
    ImageFormat.RAW: "raw",
    ImageFormat.VHD: "vhd",
    ImageFormat.QCOW2: "qcow2",
}

# Reverse lookup of IMAGE_FORMAT_TO_STR: textual name -> image format.
STR_TO_IMAGE_FORMAT: Final = {
    name: image_format for image_format, name in IMAGE_FORMAT_TO_STR.items()
}
46# ------------------------------------------------------------------------------
def parseImageFormats(str_formats: Optional[str], default_formats: List[ImageFormat]) -> List[ImageFormat]:
    """
    Turn a comma-separated list of format names into ImageFormat values.

    Each entry is stripped of surrounding whitespace and looked up in
    STR_TO_IMAGE_FORMAT; entries that are not found are dropped.
    'default_formats' itself (not a copy) is returned when 'str_formats'
    is empty/None or when no entry is recognized.
    """
    if not str_formats:
        return default_formats

    looked_up = (
        STR_TO_IMAGE_FORMAT.get(token.strip())
        for token in str_formats.split(",")
    )
    # Unknown names come back as None from the lookup and are filtered out.
    recognized: List[ImageFormat] = [fmt for fmt in looked_up if fmt]

    return recognized if recognized else default_formats
65# ------------------------------------------------------------------------------
class CowImageInfo:
    """
    Simple record holding the properties of one image.

    All fields have class-level defaults; the constructor only sets
    'uuid' on the instance, the other fields are expected to be filled
    in by whoever builds the record.
    """
    # Class-level defaults, visible on an instance until it is assigned to.
    uuid: str = ""
    path: str = ""
    sizeVirt: int = -1
    sizePhys: int = -1
    sizeAllocated: int = -1
    hidden: bool = False
    parentUuid: str = ""
    parentPath: str = ""
    error: Any = 0

    def __init__(self, uuid: str) -> None:
        """Create a record for the image identified by 'uuid'."""
        self.uuid = uuid
81# ------------------------------------------------------------------------------
class CowUtil(ABC):
    """
    Abstract interface implemented by the per-format image utilities.

    Apart from getParentChain(), isCowImage() and _ioretry(), every method
    is abstract. The one-line summaries on the abstract methods are derived
    from their names and signatures only; refer to the concrete subclasses
    for the exact semantics (units, error handling, side effects).
    """

    class CheckResult(IntEnum):
        """Possible outcomes returned by check()."""
        Success = 0
        Fail = 1
        Unavailable = 2

    @abstractmethod
    def getMinImageSize(self) -> int:
        """Return the minimum image size."""

    @abstractmethod
    def getMaxImageSize(self) -> int:
        """Return the maximum image size."""

    @abstractmethod
    def getBlockSize(self, path: str) -> int:
        """Return the block size of the image at 'path'."""

    @abstractmethod
    def getFooterSize(self) -> int:
        """Return the footer size."""

    @abstractmethod
    def getDefaultPreallocationSizeVirt(self) -> int:
        """Return the default preallocation virtual size."""

    @abstractmethod
    def getMaxChainLength(self) -> int:
        """Return the maximum chain length."""

    @abstractmethod
    def calcOverheadEmpty(self, virtual_size: int) -> int:
        """Return the 'empty' overhead computed for 'virtual_size'."""

    @abstractmethod
    def calcOverheadBitmap(self, virtual_size: int) -> int:
        """Return the 'bitmap' overhead computed for 'virtual_size'."""

    @abstractmethod
    def getInfo(
        self,
        path: str,
        extractUuidFunction: Callable[[str], str],
        includeParent: bool = True,
        resolveParent: bool = True,
        useBackupFooter: bool = False
    ) -> CowImageInfo:
        """Return a CowImageInfo for the image at 'path'."""

    @abstractmethod
    def getInfoFromLVM(
        self, lvName: str, extractUuidFunction: Callable[[str], str], vgName: str
    ) -> Optional[CowImageInfo]:
        """Return a CowImageInfo for 'lvName' in 'vgName', or None."""

    @abstractmethod
    def getAllInfoFromVG(
        self,
        pattern: str,
        extractUuidFunction: Callable[[str], str],
        vgName: Optional[str] = None,
        parents: bool = False,
        exitOnError: bool = False
    ) -> Dict[str, CowImageInfo]:
        """Return a dict of CowImageInfo records selected by 'pattern'."""

    @abstractmethod
    def getParent(self, path: str, extractUuidFunction: Callable[[str], str]) -> Optional[str]:
        """Return the parent of the image at 'path', or None."""

    @abstractmethod
    def getParentNoCheck(self, path: str) -> Optional[str]:
        """Return the parent of the image at 'path', or None (no-check variant)."""

    @abstractmethod
    def hasParent(self, path: str) -> bool:
        """Tell whether the image at 'path' has a parent."""

    @abstractmethod
    def setParent(self, path: str, parentPath: str, parentRaw: bool) -> None:
        """Set 'parentPath' as the parent of the image at 'path'."""

    @abstractmethod
    def getHidden(self, path: str) -> bool:
        """Return the hidden flag of the image at 'path'."""

    @abstractmethod
    def setHidden(self, path: str, hidden: bool = True) -> None:
        """Set the hidden flag of the image at 'path'."""

    @abstractmethod
    def getSizeVirt(self, path: str) -> int:
        """Return the virtual size of the image at 'path'."""

    @abstractmethod
    def setSizeVirt(self, path: str, size: int, jFile: str) -> None:
        """Set the virtual size of the image at 'path', given 'jFile'."""

    @abstractmethod
    def setSizeVirtFast(self, path: str, size: int) -> None:
        """Set the virtual size of the image at 'path' (fast variant)."""

    @abstractmethod
    def getMaxResizeSize(self, path: str) -> int:
        """Return the maximum resize size of the image at 'path'."""

    @abstractmethod
    def getSizePhys(self, path: str) -> int:
        """Return the physical size of the image at 'path'."""

    @abstractmethod
    def setSizePhys(self, path: str, size: int, debug: bool = True) -> None:
        """Set the physical size of the image at 'path'."""

    @abstractmethod
    def getAllocatedSize(self, path: str) -> int:
        """Return the allocated size of the image at 'path'."""

    @abstractmethod
    def getResizeJournalSize(self) -> int:
        """Return the resize journal size."""

    @abstractmethod
    def killData(self, path: str) -> None:
        """Kill the data of the image at 'path'."""

    @abstractmethod
    def getDepth(self, path: str) -> int:
        """Return the depth of the image at 'path'."""

    @abstractmethod
    def getBlockBitmap(self, path: str) -> bytes:
        """Return the block bitmap of the image at 'path' as bytes."""

    @abstractmethod
    def coalesce(self, path: str) -> int:
        """Coalesce the image at 'path' and return an int result."""

    @abstractmethod
    def create(self, path: str, size: int, static: bool, msize: int = 0, block_size: Optional[int] = None) -> None:
        """Create an image at 'path' with the given 'size'."""

    @abstractmethod
    def snapshot(
        self,
        path: str,
        parent: str,
        parentRaw: bool,
        msize: int = 0,
        checkEmpty: bool = True
    ) -> None:
        """Create a snapshot image at 'path' on top of 'parent'."""

    @abstractmethod
    def canSnapshotRaw(self, size: int) -> bool:
        """Tell whether a raw image of 'size' can be snapshotted."""

    @abstractmethod
    def check(
        self,
        path: str,
        ignoreMissingFooter: bool = False,
        fast: bool = False
    ) -> 'CowUtil.CheckResult':
        """Check the image at 'path' and return a CheckResult."""

    @abstractmethod
    def revert(self, path: str, jFile: str) -> None:
        """Revert the image at 'path' using 'jFile'."""

    @abstractmethod
    def repair(self, path: str) -> None:
        """Repair the image at 'path'."""

    @abstractmethod
    def validateAndRoundImageSize(self, size: int) -> int:
        """Validate 'size' and return the rounded image size."""

    @abstractmethod
    def getKeyHash(self, path: str) -> Optional[str]:
        """Return the key hash of the image at 'path', or None."""

    @abstractmethod
    def setKey(self, path: str, key_hash: str) -> None:
        """Set 'key_hash' on the image at 'path'."""

    # coalesceOnline() and cancelCoalesceOnline() are only available when
    # isCoalesceableOnRemote() returns True. When it does not, both functions
    # should raise NotImplementedError.
    @abstractmethod
    def isCoalesceableOnRemote(self) -> bool:
        """Tell whether the online coalesce functions are available."""

    @abstractmethod
    def coalesceOnline(self, path: str) -> int:
        """Coalesce the image at 'path' online and return an int result."""

    @abstractmethod
    def cancelCoalesceOnline(self, path: str) -> None:
        """Cancel an online coalesce of the image at 'path'."""

    def getParentChain(self, lvName: str, extractUuidFunction: Callable[[str], str], vgName: str) -> Dict[str, str]:
        """
        Return a {uuid: path} dict built from getAllInfoFromVG().

        getAllInfoFromVG() is called with 'lvName' as pattern and both
        'parents' and 'exitOnError' set to True. While it returns an empty
        result the call is repeated after a one second sleep; once more than
        60 empty results have been seen, errors are logged and the (empty)
        last result is used.
        """
        found: Dict[str, CowImageInfo] = {}
        failures = 0
        while True:
            if failures > 60:
                util.SMlog('ERROR: getAllInfoFromVG returned 0 VDIs after %d retries' % failures)
                util.SMlog('ERROR: the image metadata might be corrupted')
                break
            found = self.getAllInfoFromVG(lvName, extractUuidFunction, vgName, True, True)
            if found:
                break
            # Nothing returned: wait a little before asking again.
            failures += 1
            time.sleep(1)
        return {uuid: vdi.path for uuid, vdi in found.items()}

    @staticmethod
    def isCowImage(image_format: ImageFormat) -> bool:
        """Tell whether 'image_format' has IMAGE_FORMAT_COW_FLAG set."""
        return (image_format & IMAGE_FORMAT_COW_FLAG) != 0

    @staticmethod
    def _ioretry(cmd: Sequence[str], text: bool = True) -> Union[str, bytes]:
        """
        Run 'cmd' with util.pread2() under util.ioretry(), passing EIO and
        EAGAIN as the error list.
        """
        def run_command():
            return util.pread2(cmd, text=text)

        return util.ioretry(run_command, errlist=[errno.EIO, errno.EAGAIN])
315# ------------------------------------------------------------------------------
def getImageFormatFromVdiType(vdi_type: str) -> ImageFormat:
    """
    Map a VDI type to its image format.

    VdiType.RAW and VdiType.PHY both map to ImageFormat.RAW, VdiType.VHD to
    ImageFormat.VHD and VdiType.QCOW2 to ImageFormat.QCOW2.

    Raises:
        AssertionError: if 'vdi_type' is none of the types above.
    """
    if vdi_type == VdiType.RAW or vdi_type == VdiType.PHY:
        return ImageFormat.RAW
    if vdi_type == VdiType.VHD:
        return ImageFormat.VHD
    if vdi_type == VdiType.QCOW2:
        return ImageFormat.QCOW2

    # Raised explicitly instead of `assert False`: assert statements are
    # removed under `python -O`, which would make this return None silently.
    raise AssertionError(f"Unsupported vdi type: {vdi_type}")
def getImageStringFromVdiType(vdi_type: str) -> str:
    """
    Return the IMAGE_FORMAT_TO_STR name of the image format that
    getImageFormatFromVdiType() associates with 'vdi_type'.
    """
    image_format = getImageFormatFromVdiType(vdi_type)
    return IMAGE_FORMAT_TO_STR[image_format]
def getVdiTypeFromImageFormat(image_format: ImageFormat) -> str:
    """
    Map an image format to its VDI type.

    ImageFormat.RAW maps to VdiType.RAW, ImageFormat.VHD to VdiType.VHD and
    ImageFormat.QCOW2 to VdiType.QCOW2.

    Raises:
        AssertionError: if 'image_format' is none of the formats above.
    """
    if image_format == ImageFormat.RAW:
        return VdiType.RAW
    if image_format == ImageFormat.VHD:
        return VdiType.VHD
    if image_format == ImageFormat.QCOW2:
        return VdiType.QCOW2

    # The value is formatted directly: anything reaching this point is not a
    # key of IMAGE_FORMAT_TO_STR, so indexing that table here would raise
    # KeyError instead of the intended error. Raised explicitly because
    # assert statements are removed under `python -O`.
    raise AssertionError(f"Unsupported image format: {image_format}")
340# ------------------------------------------------------------------------------
def getCowUtilFromImageFormat(image_format: ImageFormat) -> CowUtil:
    """
    Instantiate the CowUtil implementation for 'image_format'.

    ImageFormat.RAW and ImageFormat.VHD both yield a vhdutil.VhdUtil,
    ImageFormat.QCOW2 yields a qcow2util.QCowUtil.

    Raises:
        AssertionError: if 'image_format' is none of the formats above.
    """
    # Imported at call time rather than at module level; presumably to avoid
    # a circular import with these modules -- TODO confirm.
    import vhdutil
    import qcow2util

    if image_format in (ImageFormat.RAW, ImageFormat.VHD):
        return vhdutil.VhdUtil()

    if image_format == ImageFormat.QCOW2:
        return qcow2util.QCowUtil()

    # Raised explicitly instead of `assert False`: assert statements are
    # removed under `python -O`, which would make this return None silently.
    raise AssertionError(f"Unsupported image format: {image_format}")
def getCowUtil(vdi_type: str) -> CowUtil:
    """
    Return the CowUtil implementation for 'vdi_type', by converting it with
    getImageFormatFromVdiType() and passing the result to
    getCowUtilFromImageFormat().
    """
    image_format = getImageFormatFromVdiType(vdi_type)
    return getCowUtilFromImageFormat(image_format)