Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python3 

2# 

3# Copyright (C) 2024 Vates SAS 

4# 

5# This program is free software: you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation, either version 3 of the License, or 

8# (at your option) any later version. 

9# This program is distributed in the hope that it will be useful, 

10# but WITHOUT ANY WARRANTY; without even the implied warranty of 

11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

12# GNU General Public License for more details. 

13# 

14# You should have received a copy of the GNU General Public License 

15# along with this program. If not, see <https://www.gnu.org/licenses/>. 

16 

17from sm_typing import Any, Callable, Dict, Final, List, Optional, Sequence, Union, override 

18 

19from abc import ABC, abstractmethod 

20from enum import IntEnum 

21 

22import errno 

23import time 

24 

25import util 

26 

27from vditype import VdiType 

28 

29# ------------------------------------------------------------------------------ 

30 

31IMAGE_FORMAT_COW_FLAG: Final = 1 << 8 

32 

33class ImageFormat(IntEnum): 

34 RAW = 1 

35 VHD = 2 | IMAGE_FORMAT_COW_FLAG 

36 QCOW2 = 3 | IMAGE_FORMAT_COW_FLAG 

37 

38IMAGE_FORMAT_TO_STR: Final = { 

39 ImageFormat.RAW: "raw", 

40 ImageFormat.VHD: "vhd", 

41 ImageFormat.QCOW2: "qcow2" 

42} 

43 

44STR_TO_IMAGE_FORMAT: Final = {v: k for k, v in IMAGE_FORMAT_TO_STR.items()} 

45 

46# ------------------------------------------------------------------------------ 

47 

48def parseImageFormats(str_formats: Optional[str], default_formats: List[ImageFormat]) -> List[ImageFormat]: 

49 if not str_formats: 49 ↛ 52line 49 didn't jump to line 52, because the condition on line 49 was never false

50 return default_formats 

51 

52 entries = [entry.strip() for entry in str_formats.split(",")] 

53 

54 image_formats: List[ImageFormat] = [] 

55 for entry in entries: 

56 image_format = STR_TO_IMAGE_FORMAT.get(entry) 

57 if image_format: 

58 image_formats.append(image_format) 

59 

60 if image_formats: 

61 return image_formats 

62 

63 return default_formats 

64 

65# ------------------------------------------------------------------------------ 

66 

67class CowImageInfo(object): 

68 uuid = "" 

69 path = "" 

70 sizeVirt = -1 

71 sizePhys = -1 

72 sizeAllocated = -1 

73 hidden = False 

74 parentUuid = "" 

75 parentPath = "" 

76 error: Any = 0 

77 

78 def __init__(self, uuid): 

79 self.uuid = uuid 

80 

81# ------------------------------------------------------------------------------ 

82 

83class CowUtil(ABC): 

84 class CheckResult(IntEnum): 

85 Success = 0 

86 Fail = 1 

87 Unavailable = 2 

88 

89 @abstractmethod 

90 def getMinImageSize(self) -> int: 

91 pass 

92 

93 @abstractmethod 

94 def getMaxImageSize(self) -> int: 

95 pass 

96 

97 @abstractmethod 

98 def getBlockSize(self, path: str) -> int: 

99 pass 

100 

101 @abstractmethod 

102 def getFooterSize(self) -> int: 

103 pass 

104 

105 @abstractmethod 

106 def getDefaultPreallocationSizeVirt(self) -> int: 

107 pass 

108 

109 @abstractmethod 

110 def getMaxChainLength(self) -> int: 

111 pass 

112 

113 @abstractmethod 

114 def calcOverheadEmpty(self, virtual_size: int) -> int: 

115 pass 

116 

117 @abstractmethod 

118 def calcOverheadBitmap(self, virtual_size: int) -> int: 

119 pass 

120 

121 @abstractmethod 

122 def getInfo( 

123 self, 

124 path: str, 

125 extractUuidFunction: Callable[[str], str], 

126 includeParent: bool = True, 

127 resolveParent: bool = True, 

128 useBackupFooter: bool = False 

129 ) -> CowImageInfo: 

130 pass 

131 

132 @abstractmethod 

133 def getInfoFromLVM( 

134 self, lvName: str, extractUuidFunction: Callable[[str], str], vgName: str 

135 ) -> Optional[CowImageInfo]: 

136 pass 

137 

138 @abstractmethod 

139 def getAllInfoFromVG( 

140 self, 

141 pattern: str, 

142 extractUuidFunction: Callable[[str], str], 

143 vgName: Optional[str] = None, 

144 parents: bool = False, 

145 exitOnError: bool = False 

146 ) -> Dict[str, CowImageInfo]: 

147 pass 

148 

149 @abstractmethod 

150 def getParent(self, path: str, extractUuidFunction: Callable[[str], str]) -> Optional[str]: 

151 pass 

152 

153 @abstractmethod 

154 def getParentNoCheck(self, path: str) -> Optional[str]: 

155 pass 

156 

157 @abstractmethod 

158 def hasParent(self, path: str) -> bool: 

159 pass 

160 

161 @abstractmethod 

162 def setParent(self, path: str, parentPath: str, parentRaw: bool) -> None: 

163 pass 

164 

165 @abstractmethod 

166 def getHidden(self, path: str) -> bool: 

167 pass 

168 

169 @abstractmethod 

170 def setHidden(self, path: str, hidden: bool = True) -> None: 

171 pass 

172 

173 @abstractmethod 

174 def getSizeVirt(self, path: str) -> int: 

175 pass 

176 

177 @abstractmethod 

178 def setSizeVirt(self, path: str, size: int, jFile: str) -> None: 

179 pass 

180 

181 @abstractmethod 

182 def setSizeVirtFast(self, path: str, size: int) -> None: 

183 pass 

184 

185 @abstractmethod 

186 def getMaxResizeSize(self, path: str) -> int: 

187 pass 

188 

189 @abstractmethod 

190 def getSizePhys(self, path: str) -> int: 

191 pass 

192 

193 @abstractmethod 

194 def setSizePhys(self, path: str, size: int, debug: bool = True) -> None: 

195 pass 

196 

197 @abstractmethod 

198 def getAllocatedSize(self, path: str) -> int: 

199 pass 

200 

201 @abstractmethod 

202 def getResizeJournalSize(self) -> int: 

203 pass 

204 

205 @abstractmethod 

206 def killData(self, path: str) -> None: 

207 pass 

208 

209 @abstractmethod 

210 def getDepth(self, path: str) -> int: 

211 pass 

212 

213 @abstractmethod 

214 def getBlockBitmap(self, path: str) -> bytes: 

215 pass 

216 

217 @abstractmethod 

218 def coalesce(self, path: str) -> int: 

219 pass 

220 

221 @abstractmethod 

222 def create(self, path: str, size: int, static: bool, msize: int = 0, block_size: Optional[int] = None) -> None: 

223 pass 

224 

225 @abstractmethod 

226 def snapshot( 

227 self, 

228 path: str, 

229 parent: str, 

230 parentRaw: bool, 

231 msize: int = 0, 

232 checkEmpty: bool = True 

233 ) -> None: 

234 pass 

235 

236 @abstractmethod 

237 def canSnapshotRaw(self, size: int) -> bool: 

238 pass 

239 

240 @abstractmethod 

241 def check( 

242 self, 

243 path: str, 

244 ignoreMissingFooter: bool = False, 

245 fast: bool = False 

246 ) -> 'CowUtil.CheckResult': 

247 pass 

248 

249 @abstractmethod 

250 def revert(self, path: str, jFile: str) -> None: 

251 pass 

252 

253 @abstractmethod 

254 def repair(self, path: str) -> None: 

255 pass 

256 

257 @abstractmethod 

258 def validateAndRoundImageSize(self, size: int) -> int: 

259 pass 

260 

261 @abstractmethod 

262 def getKeyHash(self, path: str) -> Optional[str]: 

263 pass 

264 

265 @abstractmethod 

266 def setKey(self, path: str, key_hash: str) -> None: 

267 pass 

268 

269 # The availability of coalesceOnline and cancelCoalesceOnline are dependent on isCoalesceableOnRemote() returning True 

270 # If not, both function should raise NotImplementedError 

271 @abstractmethod 

272 def isCoalesceableOnRemote(self) -> bool: 

273 pass 

274 

275 @abstractmethod 

276 def coalesceOnline(self, path: str) -> int: 

277 pass 

278 

279 @abstractmethod 

280 def cancelCoalesceOnline(self, path: str) -> None: 

281 pass 

282 

283 def getParentChain(self, lvName: str, extractUuidFunction: Callable[[str], str], vgName: str) -> Dict[str, str]: 

284 """ 

285 Get the chain of all parents of 'path'. Safe to call for raw VDI's as well. 

286 """ 

287 chain = {} 

288 vdis: Dict[str, CowImageInfo] = {} 

289 retries = 0 

290 while (not vdis): 

291 if retries > 60: 291 ↛ 292line 291 didn't jump to line 292, because the condition on line 291 was never true

292 util.SMlog('ERROR: getAllInfoFromVG returned 0 VDIs after %d retries' % retries) 

293 util.SMlog('ERROR: the image metadata might be corrupted') 

294 break 

295 vdis = self.getAllInfoFromVG(lvName, extractUuidFunction, vgName, True, True) 

296 if (not vdis): 296 ↛ 297line 296 didn't jump to line 297, because the condition on line 296 was never true

297 retries = retries + 1 

298 time.sleep(1) 

299 for uuid, vdi in vdis.items(): 299 ↛ 300line 299 didn't jump to line 300, because the loop on line 299 never started

300 chain[uuid] = vdi.path 

301 #util.SMlog("Parent chain for %s: %s" % (lvName, chain)) 

302 return chain 

303 

304 @staticmethod 

305 def isCowImage(image_format: ImageFormat) -> bool: 

306 return bool(image_format & IMAGE_FORMAT_COW_FLAG) 

307 

308 @staticmethod 

309 def _ioretry(cmd: Sequence[str], text: bool = True) -> Union[str, bytes]: 

310 return util.ioretry( 

311 lambda: util.pread2(cmd, text=text), 

312 errlist=[errno.EIO, errno.EAGAIN] 

313 ) 

314 

315# ------------------------------------------------------------------------------ 

316 

317def getImageFormatFromVdiType(vdi_type: str) -> ImageFormat: 

318 if vdi_type == VdiType.RAW or vdi_type == VdiType.PHY: 

319 return ImageFormat.RAW 

320 if vdi_type == VdiType.VHD: 

321 return ImageFormat.VHD 

322 if vdi_type == VdiType.QCOW2: 322 ↛ 325line 322 didn't jump to line 325, because the condition on line 322 was never false

323 return ImageFormat.QCOW2 

324 

325 assert False, f"Unsupported vdi type: {vdi_type}" 

326 

327def getImageStringFromVdiType(vdi_type: str) -> str: 

328 return IMAGE_FORMAT_TO_STR[getImageFormatFromVdiType(vdi_type)] 

329 

330def getVdiTypeFromImageFormat(image_format: ImageFormat) -> str: 

331 if image_format == ImageFormat.RAW: 

332 return VdiType.RAW 

333 if image_format == ImageFormat.VHD: 

334 return VdiType.VHD 

335 if image_format == ImageFormat.QCOW2: 

336 return VdiType.QCOW2 

337 

338 assert False, f"Unsupported image format: {IMAGE_FORMAT_TO_STR[image_format]}" 

339 

340# ------------------------------------------------------------------------------ 

341 

342def getCowUtilFromImageFormat(image_format: ImageFormat) -> CowUtil: 

343 import vhdutil 

344 import qcow2util 

345 

346 if image_format in (ImageFormat.RAW, ImageFormat.VHD): 

347 return vhdutil.VhdUtil() 

348 

349 if image_format == ImageFormat.QCOW2: 349 ↛ 352line 349 didn't jump to line 352, because the condition on line 349 was never false

350 return qcow2util.QCowUtil() 

351 

352 assert False, f"Unsupported image format: {image_format}" 

353 

354def getCowUtil(vdi_type: str) -> CowUtil: 

355 return getCowUtilFromImageFormat(getImageFormatFromVdiType(vdi_type))