Direct disk access from Python


Today I'll describe how I tried to reach the hard drive interface from Python, and what came of it.

I periodically need to test a large number of hard drives. I usually boot the DOS version of Victoria over the network. It tests drives one at a time, which is not very convenient. On top of that, motherboards without an IDE mode have started to appear, which complicates the task further. My first idea was to take existing open-source software for Linux and add the ability to test several disks in parallel. A quick search revealed the depressing state of this area on Linux. Of the programs that collect statistics on sector access times and error types during testing, I found only whdd. My attempt to understand the whdd code ended in complete failure. To me, someone who is not a programmer at all, the code seemed very confusing. Besides, most of it has nothing to do with working with the hardware.

Having realized that no simple solution was coming, I decided to try writing a similar program myself. Knowing that I could not manage such a project in C, I began to look into working with disks directly from Python, which I often use for simple tasks and love for its simplicity and clarity. There was next to no information on the subject online, but I did find out that there is an fcntl module which, among other things, can send ioctl requests to a device. That gave me a way to send commands to a disk. In Linux, however, all disks are treated as SCSI disks, and testing requires sending ATA commands directly to the disk. It turned out that there is an ATA Command Pass-Through mechanism that wraps an ATA command in a SCSI request. Most of what I learned about using it came from the source code of the sg3_utils project.

To create structures in Python that mirror C structures, so that they can be passed to ioctl, there is the ctypes module. The amount of gray hair I gained while debugging strange glitches with these structures deserves a separate mention. That is how I learned about structure alignment in C. In the end, two structures were born:

Structure for ATA Pass-Through:

class ataCmd(ctypes.Structure):
    _pack_ = 1
    _fields_ = [
        ('opcode', ctypes.c_ubyte),
        ('protocol', ctypes.c_ubyte),
        ('flags', ctypes.c_ubyte),
        ('features', ctypes.c_ushort),
        ('sector_count', ctypes.c_ushort),
        ('lba_h_low', ctypes.c_ubyte),
        ('lba_low', ctypes.c_ubyte),
        ('lba_h_mid', ctypes.c_ubyte),
        ('lba_mid', ctypes.c_ubyte),
        ('lba_h_high', ctypes.c_ubyte),
        ('lba_high', ctypes.c_ubyte),
        ('device', ctypes.c_ubyte),
        ('command', ctypes.c_ubyte),
        ('control', ctypes.c_ubyte)]

And the structure for ioctl:

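class sgioHdr(ctypes.Structure):
    # No _pack_ here: the kernel's sg_io_hdr uses natural alignment, and
    # packing would shift usr_ptr and the fields after it on 64-bit systems.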
    _fields_ = [
        ('interface_id', ctypes.c_int),      # [i] 'S' for SCSI generic (required)
        ('dxfer_direction', ctypes.c_int),   # [i] data transfer direction
        ('cmd_len', ctypes.c_ubyte),         # [i] SCSI command length ( <= 16 bytes)
        ('mx_sb_len', ctypes.c_ubyte),       # [i] max length to write to sbp
        ('iovec_count', ctypes.c_ushort),    # [i] 0 implies no scatter gather
        ('dxfer_len', ctypes.c_uint),        # [i] byte count of data transfer
        ('dxferp', ctypes.c_void_p),         # [i], [*io] points to data transfer memory
        ('cmdp', ctypes.c_void_p),           # [i], [*i] points to command to perform
        ('sbp', ctypes.c_void_p),            # [i], [*o] points to sense_buffer memory
        ('timeout', ctypes.c_uint),          # [i] MAX_UINT->no timeout (unit: millisec)
        ('flags', ctypes.c_uint),            # [i] 0 -> default, see SG_FLAG...
        ('pack_id', ctypes.c_int),           # [i->o] unused internally (normally)
        ('usr_ptr', ctypes.c_void_p),        # [i->o] unused internally
        ('status', ctypes.c_ubyte),          # [o] scsi status
        ('masked_status', ctypes.c_ubyte),   # [o] shifted, masked scsi status
        ('msg_status', ctypes.c_ubyte),      # [o] messaging level data (optional)
        ('sb_len_wr', ctypes.c_ubyte),       # [o] byte count actually written to sbp
        ('host_status', ctypes.c_ushort),    # [o] errors from host adapter
        ('driver_status', ctypes.c_ushort),  # [o] errors from software driver
        ('resid', ctypes.c_int),             # [o] dxfer_len - actual_transferred
        ('duration', ctypes.c_uint),         # [o] time taken by cmd (unit: millisec)
        ('info', ctypes.c_uint)]             # [o] auxiliary information
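
The alignment problem mentioned above is easy to reproduce. The sketch below uses two hypothetical classes, PackedHead and PaddedHead, that copy only the first four fields of ataCmd; they exist purely for illustration and are not part of the library. Without _pack_ = 1, ctypes aligns the 16-bit features field on an even offset, as a C compiler would, and every later byte of the command block shifts by one.

```python
import ctypes

# First four fields of ataCmd, shared by both illustration classes.
HEAD_FIELDS = [
    ('opcode', ctypes.c_ubyte),
    ('protocol', ctypes.c_ubyte),
    ('flags', ctypes.c_ubyte),
    ('features', ctypes.c_ushort),
]


class PackedHead(ctypes.Structure):
    _pack_ = 1              # no padding between fields
    _fields_ = HEAD_FIELDS


class PaddedHead(ctypes.Structure):
    _fields_ = HEAD_FIELDS  # default (natural) alignment


def layout(struct):
    """Map each field name to its byte offset inside the structure."""
    return {name: getattr(struct, name).offset for name, _ in struct._fields_}


if __name__ == '__main__':
    for cls in (PackedHead, PaddedHead):
        print(cls.__name__, ctypes.sizeof(cls), layout(cls))
```

Comparing ctypes.sizeof() with the size the protocol expects (16 bytes for the full ATA PASS-THROUGH (16) command block) is a cheap check to run before sending anything to a disk.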

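Filling in these structures is required before every disk operation and takes a lot of space, so it is done in a separate function. Multi-byte values need their byte order swapped: the 16-bit fields of the command block are stored most significant byte first, while ctypes.c_ushort uses the host byte order, which is little-endian on x86.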

def prepareSgio(cmd, feature, count, lba, direction, sense, buf):
    if direction == SG_DXFER_FROM_DEV:
        buf_len = ctypes.sizeof(buf)
        buf_p = ctypes.cast(buf, ctypes.c_void_p)
        prot = 4 << 1  # PIO Data-In
    elif direction == SG_DXFER_TO_DEV:
        buf_len = ctypes.sizeof(buf)
        buf_p = ctypes.cast(buf, ctypes.c_void_p)
        prot = 5 << 1  # PIO Data-Out
    else:
        buf_len = 0
        buf_p = None
        prot = 3 << 1  # Non-data
    if cmd != 0xb0:  # not SMART COMMAND
        prot = prot | 1  # + EXTEND
    sector_lba = lba.to_bytes(6, byteorder='little')
    ata_cmd = ataCmd(opcode=0x85,  # ATA PASS-THROUGH (16)
                     protocol=prot,
                     # flags field
                     # OFF_LINE = 0 (0 seconds offline)
                     # CK_COND = 1 (copy sense data in response)
                     # T_DIR = 1 (transfer from the ATA device)
                     # BYT_BLOK = 1 (length is in blocks, not bytes)
                     # T_LENGTH = 2 (transfer length in the SECTOR_COUNT field)
                     flags=0x2e,
                     features=swap16(feature),
                     sector_count=swap16(count),
                     lba_h_low=sector_lba[3], lba_low=sector_lba[0],
                     lba_h_mid=sector_lba[4], lba_mid=sector_lba[1],
                     lba_h_high=sector_lba[5], lba_high=sector_lba[2],
                     device=0,
                     command=cmd,
                     control=0)
    sgio = sgioHdr(interface_id=ASCII_S, dxfer_direction=direction,
                   cmd_len=ctypes.sizeof(ata_cmd),
                   mx_sb_len=ctypes.sizeof(sense), iovec_count=0,
                   dxfer_len=buf_len,
                   dxferp=buf_p,
                   cmdp=ctypes.addressof(ata_cmd),
                   sbp=ctypes.cast(sense, ctypes.c_void_p), timeout=1000,
                   flags=0, pack_id=0, usr_ptr=None, status=0, masked_status=0,
                   msg_status=0, sb_len_wr=0, host_status=0, driver_status=0,
                   resid=0, duration=0, info=0)
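    # Keep the command block alive: sgio.cmdp holds only a raw address, so
    # without this reference ata_cmd could be freed once the function returns.
    sgio.ata_cmd = ata_cmd
    return sgio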
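
prepareSgio relies on several names that the article does not show. The constant values below are the ones defined in the Linux header scsi/sg.h. The body of swap16 is an assumption about what the helper does (a plain 16-bit byte swap), not the author's code.

```python
SG_IO = 0x2285            # ioctl request number of the SCSI generic interface
SG_DXFER_NONE = -1        # the command transfers no data
SG_DXFER_TO_DEV = -2      # data goes from the buffer to the disk
SG_DXFER_FROM_DEV = -3    # data comes from the disk into the buffer
ASCII_S = ord('S')        # interface_id value expected by SG_IO


def swap16(value):
    """Swap the two bytes of a 16-bit value (assumed behaviour of the helper)."""
    return ((value & 0xff) << 8) | ((value >> 8) & 0xff)
```

On a little-endian host, swap16(count) stored in a c_ushort puts the high byte of count first in memory, which is the order the command block expects.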

This function takes an ATA command, its parameters, and the buffers, and returns a structure ready for the ioctl request. The rest is simple. We create a buffer that will receive the command's completion status along with the contents of the ATA status and error registers. We create a buffer for the sector read from the disk. Then we fill in the structures and execute our first ATA command.

sense = ctypes.c_buffer(64)
identify = ctypes.c_buffer(512)
sgio = prepareSgio(0xec, 0, 0, 0, SG_DXFER_FROM_DEV, sense, identify)  # IDENTIFY
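with open(dev, 'r') as fd:
    # Pass the structure itself: ctypes objects expose a writable buffer, which
    # avoids squeezing a 64-bit address into ioctl's integer argument.
    if fcntl.ioctl(fd, SG_IO, sgio) != 0:
        return None  # ioctl failed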
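
Because the CK_COND flag is set, the drive's registers come back in the sense buffer. Here is a minimal sketch of reading them, assuming the kernel returns descriptor-format sense data (response code 0x72 or 0x73) whose first descriptor is the ATA Status Return descriptor (code 0x09). The function name parse_ata_status is hypothetical.

```python
def parse_ata_status(sense):
    """Return (error, status) register values, or None if the layout differs."""
    raw = bytes(sense)                      # accepts bytes or a ctypes buffer
    if len(raw) < 22:
        return None
    if raw[0] & 0x7f not in (0x72, 0x73):   # not descriptor-format sense data
        return None
    desc = raw[8:22]                        # first descriptor follows the 8-byte header
    if desc[0] != 0x09 or desc[1] != 0x0c:  # not an ATA Status Return descriptor
        return None
    return desc[3], desc[13]                # ERROR and STATUS registers
```

If bit 0 (ERR) of the returned status is set, the command failed and the error register gives the reason.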

In response, we get a sector with disk information:

0000000: 5a04 ff3f 37c8 1000 0000 0000 3f00 0000  Z..?7.......?...
0000010: 0000 0000 2020 2020 2020 4b4a 3131 3142  ....      KJ111B
0000020: 3942 5647 4142 4659 0300 5fea 3800 4b4a  9BVGABFY.._.8.KJ
0000030: 4f41 3341 4145 6948 6174 6863 2069 5548  OA3AAEiHathc iUH
0000040: 3741 3232 3230 4130 414c 3333 2030 2020  7A2220A0AL33 0  
0000050: 2020 2020 2020 2020 2020 2020 2020 1080                ..
0000060: 0040 002f 0040 0002 0002 0700 ff3f 1000  .@./.@.......?..
0000070: 3f00 10fc fb00 0001 ffff ff0f 0000 0700  ?...............
0000080: 0300 7800 7800 7800 7800 0000 0000 0000  ..x.x.x.x.......
0000090: 0000 0000 0000 1f00 0617 0000 5e00 4400  ............^.D.
00000a0: fc01 2900 6b34 697d 7347 6934 41bc 6347  ..).k4i}sGi4A.cG
00000b0: 7f40 0401 0000 0000 feff 0000 0000 0800  .@..............
00000c0: ca00 f900 1027 0000 b088 e0e8 0000 0000  .....'..........
00000d0: ca00 0000 0000 875a 0050 a2cc cb22 44fc  .......Z.P..."D.
00000e0: 0000 0000 0000 0000 0000 0000 0000 1440  ...............@
00000f0: 1440 0000 0000 0000 0000 0000 0000 0000  .@..............
0000100: 0100 0b00 0000 0000 8020 f10d 20fa 0100  ......... .. ...
0000110: 0040 0404 0403 0000 0000 0502 0604 0504  .@..............
0000120: 0506 0803 0506 0504 0505 0603 0505 0000  ................
0000130: 3741 3342 0000 0a78 0000 bd5d d3a1 0080  7A3B...x...]....
0000140: 0000 0000 0000 0000 0000 0000 0000 0000  ................
0000150: 0200 0000 0000 0000 0000 0000 0000 0000  ................
0000160: 0000 0000 0000 0000 0000 0000 0000 0000  ................
0000170: 0000 0000 0000 0000 0000 0000 0000 0000  ................
0000180: 0000 0000 0000 0000 0000 0000 0000 0000  ................
0000190: 0000 0000 0000 0000 0000 0000 3d00 0000  ............=...
00001a0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00001b0: 0000 201c 0000 0000 0000 0000 1f10 2100  .. ...........!.
00001c0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00001d0: 0000 0000 0100 e003 0000 0000 0000 0000  ................
00001e0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00001f0: 0000 0000 0000 0000 0000 0000 0000 a503  ................

It contains complete information about the disk; let's extract the main fields.

    # IDENTIFY strings: serial is words 10-19, firmware 23-26, model 27-46
    serial = swapString(identify[20:40])
    firmware = swapString(identify[46:54])
    model = swapString(identify[54:94])
    # 48-bit sector count: words 100-103, least significant byte first
    sectors = int.from_bytes(identify[200:208], byteorder='little')

As a result, we get:

model: Hitachi HUA722020ALA330; firmware: JKAOA3; serial number: JK11A1YAJE2N5V; number of sectors: 3907029168.
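
The swapString helper is not shown in the article. A minimal sketch follows, assuming it swaps each pair of bytes (IDENTIFY strings store two characters per 16-bit word, with the first character in the high byte) and strips the space padding. The function parse_identify is hypothetical. It reads the fields by their word numbers in the ATA specification: serial number in words 10 to 19, firmware revision in words 23 to 26, model in words 27 to 46, and the 48-bit sector count in words 100 to 103.

```python
def swapString(raw):
    """Swap adjacent bytes and strip padding (assumed behaviour of the helper)."""
    raw = bytes(raw)
    # Reverse every 2-byte chunk; a trailing single byte is kept as it is.
    pairs = [raw[i:i + 2][::-1] for i in range(0, len(raw), 2)]
    return b''.join(pairs).decode('ascii', errors='replace').strip()


def parse_identify(sector):
    """Extract the main fields from a 512-byte IDENTIFY DEVICE sector."""
    sector = bytes(sector)
    return {
        'serial': swapString(sector[20:40]),     # words 10-19
        'firmware': swapString(sector[46:54]),   # words 23-26
        'model': swapString(sector[54:94]),      # words 27-46
        'sectors': int.from_bytes(sector[200:208], byteorder='little'),
    }
```

With 512-byte sectors, the sector count above works out to 3907029168 * 512 = 2000398934016 bytes, that is, a 2 TB drive.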

Now we can send ATA commands to the disk and receive its responses. Little by little, the result of my work took shape as a library that implements a basic set of ATA commands, including reading SMART. Anyone interested can take a look at it here. Please don't judge the code quality too harshly: I'm no wizard, I'm still learning.

All that remains is to write a testing utility on top of it. I suspect many more discoveries await me.

Update:
On amarao's recommendation, I rewrote the library using classes and exceptions. I also decided that the name sgio was misleading about the library's purpose. The library is now called atapt and is available on GitHub and through pip. There is a usage example on GitHub.
