##############################################################################
#
# $RCSfile: win32disk.py,v $
#
# Copyright (C) 2006 John Popplewell
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Author : John Popplewell
# Email : john@johnnypops.demon.co.uk
# Website: http://www.johnnypops.demon.co.uk/
#
# $Id: win32disk.py,v 1.2 2006/08/21 23:23:58 jfp Exp $
#
##############################################################################
_FILE_REVISION = "$Revision: 1.2 $"

import ctypes


class File:
    """Read-only, sector-buffered access to a Windows device via kernel32.

    The device is opened with CreateFileA and read with ReadFile in whole
    sectors; read() and seek() present a byte-oriented position on top of
    that by keeping the most recently read run of sectors in self.block.

    Instance state:
      dev          handle returned by CreateFileA (0 once closed)
      sector_size  BytesPerSector reported by the drive geometry ioctl
      sector_mask  mask that rounds a byte position down to a sector start
      pos          logical byte position of the next read()
      sector       byte position of the start of the buffered block
      offset       pos - sector, i.e. index of pos inside the block
      block        bytes read at `sector`, or None when nothing is buffered
    """

    GENERIC_READ = 0x80000000
    FILE_SHARE_READ = 0x0001
    FILE_SHARE_WRITE = 0x0002
    OPEN_EXISTING = 3

    FILE_BEGIN = 0
    FILE_CURRENT = 1
    FILE_END = 2

    kernel32 = ctypes.windll.kernel32
    CreateFile = kernel32.CreateFileA
    CloseHandle = kernel32.CloseHandle
    DeviceIoControl = kernel32.DeviceIoControl
    ReadFile = kernel32.ReadFile
    SetFilePointer = kernel32.SetFilePointer
    GetLastError = kernel32.GetLastError
    FormatMessage = kernel32.FormatMessageA

    class DiskGeometry(ctypes.Structure):
        """Output buffer layout for IOCTL_DISK_GET_DRIVE_GEOMETRY."""
        _fields_ = [
            ('Cylinders', ctypes.c_longlong),
            ('MediaType', ctypes.c_ulong),
            ('TracksPerCylinder', ctypes.c_ulong),
            ('SectorsPerTrack', ctypes.c_ulong),
            ('BytesPerSector', ctypes.c_ulong),
        ]

    def __init__(self, name, mode):
        """Open the device `name` for reading and query its sector size.

        `mode` is accepted but not used: the device is always opened with
        GENERIC_READ / OPEN_EXISTING.

        Raises IOError if the device cannot be opened or its geometry
        cannot be read.
        """
        # NOTE(review): only FILE_SHARE_READ is requested (FILE_SHARE_WRITE
        # is defined above but unused); presumably fine for the devices this
        # is used with -- confirm against the CreateFile requirements for
        # volumes.
        # The handle comes back through ctypes' default int return type, so
        # INVALID_HANDLE_VALUE shows up as -1.
        self.dev = self.CreateFile(name, self.GENERIC_READ,
                                   self.FILE_SHARE_READ, None,
                                   self.OPEN_EXISTING, 0, None)
        if self.dev == -1:
            self._raise_last_error()
        try:
            self.sector_size = self._get_sector_size()
        except IOError:
            # Do not leak the handle when the geometry query fails.
            self.CloseHandle(self.dev)
            self.dev = 0
            raise
        self.sector_mask = ~(self.sector_size - 1)
        self.pos = self.sector = self.offset = 0
        self.block = None

    def read(self, size):
        """Return up to `size` bytes from the current position.

        Data is served from the buffered block when it already covers the
        request; otherwise at least 32 sectors are read starting at the
        sector that contains the current position. Fewer than `size` bytes
        are returned only when the device itself returned fewer bytes.
        """
        if self.block is None or self.offset + size > len(self.block):
            base = self.pos & self.sector_mask
            # Drop the stale block first so a failure below cannot leave a
            # block that no longer matches self.sector.
            self.block = None
            # Always reposition the device: an earlier _read() left the OS
            # file pointer past the end of the previous block, so relying on
            # "sector unchanged" would read from the wrong place.
            self._seek(base, self.FILE_BEGIN)
            self.sector = base
            self.offset = self.pos - base
            n_sectors = ((self.offset + size + self.sector_size)
                         // self.sector_size)
            self.block = self._read(max(n_sectors, 32))
        data = self.block[self.offset:self.offset + size]
        # Advance by what was actually delivered, not by what was asked for.
        self.pos += len(data)
        self.offset += len(data)
        return data

    def seek(self, pos):
        """Set the logical byte position for the next read() to `pos`.

        The buffered block is kept when `pos` rounds down to the same
        sector start as the block; otherwise the device is repositioned
        and the block is discarded.
        """
        base = pos & self.sector_mask
        if base != self.sector:
            self._seek(base, self.FILE_BEGIN)
            self.sector = base
            self.block = None
        self.pos = pos
        self.offset = pos - self.sector

    def close(self):
        """Close the device handle; calling it again is a no-op.

        Raises IOError if CloseHandle reports failure.
        """
        if not self.dev:
            return
        if not self.CloseHandle(self.dev):
            self._raise_last_error()
        self.dev = 0

    def _seek(self, sector, method=FILE_BEGIN):
        """Move the OS file pointer and return the resulting 64-bit position.

        `sector` is a byte offset (callers pass sector-aligned values) and
        `method` is one of FILE_BEGIN / FILE_CURRENT / FILE_END.
        """
        hi = ctypes.c_ulong((sector >> 32) & 0xFFFFFFFF)
        lo = self.SetFilePointer(self.dev, sector & 0xFFFFFFFF,
                                 ctypes.byref(hi), method)
        # The result arrives as a signed C int; fold it back to the unsigned
        # low dword so both the failure check and the sum below are correct.
        lo &= 0xFFFFFFFF
        if lo == 0xFFFFFFFF:
            # 0xFFFFFFFF is also a legitimate low dword, so it only signals
            # failure when the last-error code is non-zero.
            code = self.GetLastError()
            if code:
                self._raise_last_error(code)
        return (hi.value << 32) + lo

    def _read(self, n_sectors):
        """Read `n_sectors` sectors at the OS file pointer.

        Returns only the bytes ReadFile reported as read, which may be
        fewer than requested. Raises IOError if ReadFile fails.
        """
        bytes_read = ctypes.c_ulong()
        buff = ctypes.create_string_buffer(self.sector_size * n_sectors)
        if not self.ReadFile(self.dev, buff, len(buff),
                             ctypes.byref(bytes_read), None):
            self._raise_last_error()
        return ctypes.string_at(buff, bytes_read.value)

    def _get_sector_size(self):
        """Return BytesPerSector from the device's drive geometry.

        Raises IOError if the DeviceIoControl call fails.
        """
        IOCTL_DISK_GET_DRIVE_GEOMETRY = 0x00070000
        geometry = self.DiskGeometry()
        bytes_read = ctypes.c_ulong()
        res = self.DeviceIoControl(self.dev, IOCTL_DISK_GET_DRIVE_GEOMETRY,
                                   None, 0,
                                   ctypes.byref(geometry),
                                   ctypes.sizeof(geometry),
                                   ctypes.byref(bytes_read), None)
        if not res:
            self._raise_last_error()
        return geometry.BytesPerSector

    def _raise_last_error(self, code=None):
        """Raise IOError for `code`, or for GetLastError() when omitted.

        The code is captured once so the number and the text in the message
        always describe the same error.
        """
        if code is None:
            code = self.GetLastError()
        raise IOError("[Errno %d] %s" % (code, self._error_text(code)))

    def _error_text(self, code=None):
        """Return the system message text for `code`.

        When `code` is omitted the current GetLastError() value is used.
        """
        FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
        FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
        if code is None:
            code = self.GetLastError()
        buff = ctypes.create_string_buffer(1024)
        # IGNORE_INSERTS: no argument array is supplied, so insert sequences
        # in a system message must be left as-is rather than expanded.
        self.FormatMessage(
            FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
            None, code & 0xFFFF, 0, buff, len(buff), None)
        return buff.value.strip()