LinuxQuestions.org

LinuxQuestions.org (/questions/)
-   Programming (https://www.linuxquestions.org/questions/programming-9/)
-   -   Access raw device from multiple processes? (https://www.linuxquestions.org/questions/programming-9/access-raw-device-from-multiple-processes-4175508601/)

gwiesenekker 06-20-2014 05:01 AM

Access raw device from multiple processes?
 
Hi,

I want to use the physical sectors of an SSD partition as a cache for uncompressed data stored compressed on a SATA disk. I created a raw device for the SSD partition. This raw device will be written and read by multiple (OpenMPI) processes simultaneously using open(), write() and read().

My questions are:
Is writing to and reading from the same raw device from multiple processes supported?
Is writing to a physical sector on a raw device cached?
Is writing to a physical sector on a raw device atomic or do you have to use semaphores?
If it is atomic, is the behaviour of concurrent write() calls similar to that of concurrent creat(..., 0) calls, meaning that only one write() call will succeed and the other write() calls will fail?

Regards,
Gijsbert

gwiesenekker 06-21-2014 06:19 AM

Based on the test program below (which, 20% of the time, writes sectors with random data secured by a 64-bit checksum and, 80% of the time, reads the sectors back and validates the 64-bit checksum), the answers on 64-bit CentOS 6.5 seem to be:

Is writing to and reading from the same raw device from multiple processes supported?
Yes, because the open(), pwrite() and pread() calls never fail.
Is writing to a physical sector on a raw device cached?
Yes. The test program returned corrupt non-zero sectors. I found out I forgot the close(raw_fd) call, so it looks like these corrupt non-zero sectors were caused by the sectors written by format_raw() not being flushed to the raw device.
Is writing to a physical sector on a raw device atomic or do you have to use semaphores?
Although the writes are cached they are atomic because I do not use semaphores and I can interrupt the test program and launch it again and it never finds corrupt sectors.
If it is atomic is the behaviour of the concurrent write() calls similar to concurrent creat(..., 0) calls meaning that only one write() call will succeed and the other write() calls will fail?
The pwrite() calls never fail, so it looks like each write always completes, overruling any previous writes.

The output of the test program for 8 processes writing and reading 7 physical sectors is:

13:00:30-21/06/2014@ nraw_writes=12406 nraw_reads=50000 nraw_corrupt=0
13:00:30-21/06/2014@ iproc=0 nraw_reads_proc=6173 nraw_writes_proc=12401
13:00:30-21/06/2014@ iproc=1 nraw_reads_proc=5970 nraw_writes_proc=12545
13:00:30-21/06/2014@ iproc=2 nraw_reads_proc=7645 nraw_writes_proc=12545
13:00:30-21/06/2014@ iproc=3 nraw_reads_proc=3054 nraw_writes_proc=7424
13:00:30-21/06/2014@ iproc=4 nraw_reads_proc=5890 nraw_writes_proc=12462
13:00:30-21/06/2014@ iproc=5 nraw_reads_proc=7734 nraw_writes_proc=14920
13:00:30-21/06/2014@ iproc=6 nraw_reads_proc=7528 nraw_writes_proc=14813
13:00:30-21/06/2014@ iproc=7 nraw_reads_proc=6006 nraw_writes_proc=12491
13:00:31-21/06/2014@ nraw_writes=14970 nraw_reads=60000 nraw_corrupt=0
13:00:31-21/06/2014@ iproc=0 nraw_reads_proc=8011 nraw_writes_proc=14960
13:00:31-21/06/2014@ iproc=1 nraw_reads_proc=5970 nraw_writes_proc=12545
13:00:31-21/06/2014@ iproc=2 nraw_reads_proc=9791 nraw_writes_proc=15089
13:00:31-21/06/2014@ iproc=3 nraw_reads_proc=3054 nraw_writes_proc=7424
13:00:31-21/06/2014@ iproc=4 nraw_reads_proc=7953 nraw_writes_proc=14994
13:00:31-21/06/2014@ iproc=5 nraw_reads_proc=9723 nraw_writes_proc=17406
13:00:31-21/06/2014@ iproc=6 nraw_reads_proc=9492 nraw_writes_proc=17303
13:00:31-21/06/2014@ iproc=7 nraw_reads_proc=6006 nraw_writes_proc=12491

nraw_reads_proc is the number of times the test program read a sector written by a process. As these numbers are non-zero and increasing it shows that the test program can read sectors written by other processes.
nraw_writes_proc is the number of sectors that process had written at the time the sector was written. As these numbers are increasing, it shows that sectors have been updated.

Regards,
Gijsbert

Code:

#include "globals.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <linux/fs.h>
#include <sys/ioctl.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <malloc.h>
#include <stdlib.h>
#include <unistd.h>

//linux/fs.h defines BLOCK_SIZE!
#undef BLOCK_SIZE

//path of the raw device opened by init_raw
#define RAW_DEVICE "/dev/raw/raw1"
//value stored in the one-byte version field of every block header
#define RAW_VERSION 0

//Byte offsets of the header fields inside a block. The fields are packed
//back to back (no padding), which is why they are always accessed with
//memcpy. Field order:
//  crc32 of the rest of the header | version | mpi id | time stamp |
//  payload length | crc64 of the payload | payload
//NOTE(review): the concrete offsets depend on the ui32_t/byte_t/i16_t/ui64_t
//typedefs, which are not defined in this file - confirm their sizes.
#define CRC32_OFFSET      0
#define VERSION_OFFSET    (CRC32_OFFSET + sizeof(ui32_t))
#define MPI_ID_OFFSET    (VERSION_OFFSET + sizeof(byte_t))
#define TIME_STAMP_OFFSET (MPI_ID_OFFSET + sizeof(i16_t))
#define LENGTH_OFFSET    (TIME_STAMP_OFFSET + sizeof(ui64_t))
#define CRC64_OFFSET      (LENGTH_OFFSET + sizeof(i16_t))
#define DATA_OFFSET      (CRC64_OFFSET + sizeof(ui64_t))
//the header ends where the payload starts
#define HEADER_SIZE      (DATA_OFFSET)

//key handed to mpi_alloc_sem by format_raw
#define SEM_KEY 3
//number of locks requested from mpi_alloc_sem; format_raw uses
//lock (iblock % NLOCK) for block iblock
#define NLOCK  32

//file descriptor of RAW_DEVICE, opened by init_raw and closed by fin_raw
int raw_fd;
//number of blocks addressed on the device (set by init_raw to a prime)
i64_t raw_nblocks;
//RAW_BLOCK_SIZE-byte I/O buffer, aligned to RAW_BLOCK_SIZE by init_raw
byte_t *raw_buffer;
//number of blocks written by test_raw in this process
i64_t nraw_writes;
//number of blocks read by test_raw in this process
i64_t nraw_reads;
//non-zero blocks read by test_raw whose header crc32 did not match
i64_t nraw_header_corrupt;
//blocks read by test_raw with a valid header whose payload crc64 did not match
i64_t nraw_data_corrupt;
//the next two counters are only reset by init_raw and printed by fin_raw here
i64_t nraw_hits;
i64_t nraw_too_large;

//TRUE until init_raw has completed; the other functions assert on it
local int init = TRUE;

/*
 * Opens RAW_DEVICE, works out how many RAW_BLOCK_SIZE-byte blocks will be
 * addressed, allocates the aligned I/O buffer and resets all counters.
 *
 * Must run before the other *_raw functions (they assert on `init`).
 * Every failure is reported through BUG().
 */
void init_raw(void)
{
  unsigned long long nbytes;
  unsigned long ul;
  unsigned int pbsz;

  BUG(!init)

  //allow at least storage of a ui64_t
  BUG((RAW_BLOCK_SIZE - HEADER_SIZE) < sizeof(ui64_t))

  BUG((raw_fd = open(RAW_DEVICE, O_RDWR)) == -1)

  //BLKGETSIZE counts 512-byte units whatever the block size is, so it only
  //yields a block count when RAW_BLOCK_SIZE is 512. BLKGETSIZE64 returns the
  //size in bytes (as a 64-bit value), which gives the right count for any
  //RAW_BLOCK_SIZE.
  BUG(ioctl(raw_fd, BLKGETSIZE64, &nbytes) == -1)

  ul = (unsigned long) (nbytes / RAW_BLOCK_SIZE);

  //if the number of blocks is odd the last sector will not be accessible
  //for raw IO

  if ((ul % 2) == 1) --ul;
  BUG(ul < 3)
  raw_nblocks = first_prime_below(ul);

#ifdef DEBUG
  //keep debug runs short: only a small prime number of blocks is used
  ul = 999;
  raw_nblocks = first_prime_below(ul);
#endif

  //BLKPBSZGET stores an unsigned int; handing it an unsigned long would
  //leave the upper bytes of the variable untouched on LP64
  BUG(ioctl(raw_fd, BLKPBSZGET, &pbsz) == -1)
  BUG(pbsz != RAW_BLOCK_SIZE)

  //the buffer is aligned to the block size, as raw IO requires
  BUG((raw_buffer = memalign(RAW_BLOCK_SIZE, RAW_BLOCK_SIZE)) == NULL)

  nraw_writes = 0;
  nraw_reads = 0;
  nraw_header_corrupt = 0;
  nraw_data_corrupt = 0;
  nraw_hits = 0;
  nraw_too_large = 0;

  my_printf("raw_fd=%d\n", raw_fd);
  my_printf("raw_nblocks=%lld\n", raw_nblocks);

  init = FALSE;
}

//Fills all RAW_BLOCK_SIZE bytes of raw_buffer with zeros.
local void clear_raw_buffer(void)
{
  memset(raw_buffer, 0, RAW_BLOCK_SIZE);
}

void format_raw(void)
{
  int semid;
  i64_t nzero;
  i64_t nformat;
  i64_t nacquire_sem_failed;

  BUG(init)

  semid = mpi_alloc_sem(SEM_KEY, NLOCK, TRUE);

  my_printf("formatting %lld blocks..\n", raw_nblocks);

  nzero = 0;
  nformat = 0;
  nacquire_sem_failed = 0;

  for (i64_t iblock = 0; iblock < raw_nblocks; iblock++)
  {
    if ((iblock % mpi_nprocs) == mpi_id)
    {
      int i;

      //lock does not seem to be required
      //but as we are formatting we want to be sure
      while(mpi_acquire_sem(semid, iblock % NLOCK,
                            0.0, 0.0, FALSE) == FALSE)
      {
        ++nacquire_sem_failed;
      }

      BUG(pread(raw_fd, raw_buffer, RAW_BLOCK_SIZE,
                iblock * RAW_BLOCK_SIZE) != RAW_BLOCK_SIZE)

      for (i = 0; i < RAW_BLOCK_SIZE; i++)
        if (raw_buffer[i] != 0) break;
   
      if (i == RAW_BLOCK_SIZE)
      {
        ++nzero;
      }
      else
      {
        clear_raw_buffer();

        BUG(pwrite(raw_fd, raw_buffer, RAW_BLOCK_SIZE,
                  iblock * RAW_BLOCK_SIZE) != RAW_BLOCK_SIZE)
        ++nformat;
      }

      mpi_release_sem(semid, iblock % NLOCK);
    }
    if ((iblock % 100000) == 0) my_printf("%lld\n", iblock);
  }

  my_printf("..done\n");
  my_printf("nzero=%lld nformat=%lld nacquire_sem_failed=%lld\n",
    nzero, nformat, nacquire_sem_failed);

  my_mpi_barrier("after format_raw", MPI_COMM_WORLD, INVALID);

  MPI_Allreduce(MPI_IN_PLACE, &nzero, 1,
    MPI_LONG_LONG_INT, MPI_SUM, MPI_COMM_WORLD);

  MPI_Allreduce(MPI_IN_PLACE, &nformat, 1,
    MPI_LONG_LONG_INT, MPI_SUM, MPI_COMM_WORLD);

  BUG((nzero + nformat) != raw_nblocks)

  mpi_free_sem(semid);
}

//Returns the current CLOCK_REALTIME reading in nanoseconds.
local ui64_t return_time(void)
{
  struct timespec now;

  BUG(clock_gettime(CLOCK_REALTIME, &now) == -1)

  //widen before multiplying: tv_sec is a time_t, and where that is only
  //32 bits wide the product would overflow
  return((ui64_t) now.tv_sec * 1000000000ULL + (ui64_t) now.tv_nsec);
}

void test_raw(void)
{
  i16_t iproc;
  i64_t nraw_reads_proc[MPI_NPROCS_MAX];
  i64_t nraw_writes_proc[MPI_NPROCS_MAX];

  BUG(init)

  for (iproc = 0; iproc < mpi_nprocs; iproc++)
    nraw_reads_proc[iproc] = nraw_writes_proc[iproc] = 0;

  //exercise 20% writes 80% reads
 
  my_printf("%llu\n", rdrand64());

  while(TRUE)
  {
    if ((rdrand64() % 5) == 0)
    {
      int modulo;
      i16_t length;
      ui64_t time_stamp;
      ui64_t crc64;
      ui32_t crc32;
      i64_t iblock;

      clear_raw_buffer();

      //create data

      modulo = RAW_BLOCK_SIZE - HEADER_SIZE - sizeof(ui64_t);

      length = (rdrand64() % modulo) + sizeof(ui64_t);
      BUG(length < sizeof(ui64_t))
      BUG((HEADER_SIZE + length) > RAW_BLOCK_SIZE)

      for (int i = 0; i < length; i++)
      {
        BUG((DATA_OFFSET + i) > RAW_BLOCK_SIZE)
        raw_buffer[DATA_OFFSET + i] = rdrand64() % 256;
      }

      memcpy(raw_buffer + DATA_OFFSET, &nraw_writes, sizeof(i64_t));

      //create header
 
      raw_buffer[VERSION_OFFSET] = RAW_VERSION;

      iproc = mpi_id;

      memcpy(raw_buffer + MPI_ID_OFFSET, &iproc, sizeof(i16_t));

      time_stamp = return_time();

      memcpy(raw_buffer + TIME_STAMP_OFFSET, &time_stamp, sizeof(ui64_t));

      memcpy(raw_buffer + LENGTH_OFFSET, &length, sizeof(i16_t));

      crc64 = return_crc64(raw_buffer + DATA_OFFSET, length, 1);

      memcpy(raw_buffer + CRC64_OFFSET, &crc64, sizeof(ui64_t));

      //check if everything is still OK after the memcpy
      BUG(return_crc64(raw_buffer + DATA_OFFSET, length, 1) != crc64)

      crc32 = return_crc32(raw_buffer + VERSION_OFFSET,
                          HEADER_SIZE - sizeof(ui32_t));

      memcpy(raw_buffer, &crc32, sizeof(ui32_t));

      //check if everything is still OK after the memcpy
      BUG(return_crc32(raw_buffer + VERSION_OFFSET,
                      HEADER_SIZE - sizeof(ui32_t)) != crc32)
       
      iblock = rdrand64() % raw_nblocks;

      BUG(pwrite(raw_fd, raw_buffer, RAW_BLOCK_SIZE,
                iblock * RAW_BLOCK_SIZE) != RAW_BLOCK_SIZE)

      ++nraw_writes;

#ifdef DEBUG
      BUG(pread(raw_fd, raw_buffer, RAW_BLOCK_SIZE,
                iblock * RAW_BLOCK_SIZE) != RAW_BLOCK_SIZE)

      //check header
      memcpy(&crc32, raw_buffer, sizeof(ui32_t));

      BUG(return_crc32(raw_buffer + VERSION_OFFSET,
                      HEADER_SIZE - sizeof(ui32_t)) != crc32)

      BUG(raw_buffer[VERSION_OFFSET] != RAW_VERSION)

      memcpy(&iproc, raw_buffer + MPI_ID_OFFSET, sizeof(i16_t));

      BUG(iproc < 0)
      BUG(iproc >= MPI_NPROCS_MAX)

      memcpy(&length, raw_buffer + LENGTH_OFFSET, sizeof(i16_t));

      BUG(length < 0)
      BUG(length > (RAW_BLOCK_SIZE - HEADER_SIZE))

      memcpy(&crc64, raw_buffer + CRC64_OFFSET, sizeof(ui64_t));

      BUG(return_crc64(raw_buffer + DATA_OFFSET, length, 1) != crc64)
#endif 
    }
    else
    {
      i64_t iblock;
      int i;

      iblock = rdrand64() % raw_nblocks;

      BUG(pread(raw_fd, raw_buffer, RAW_BLOCK_SIZE,
                iblock * RAW_BLOCK_SIZE) != RAW_BLOCK_SIZE)

      ++nraw_reads;

      for (i = 0; i < RAW_BLOCK_SIZE; i++)
        if (raw_buffer[i] != 0) break;

      if (i < RAW_BLOCK_SIZE)
      {
        ui32_t crc32;
       
        //check header
        memcpy(&crc32, raw_buffer, sizeof(ui32_t));

        if (return_crc32(raw_buffer + VERSION_OFFSET,
                        HEADER_SIZE - sizeof(ui32_t)) != crc32)
        {
          ++nraw_header_corrupt;
        }
        else
        {
          i16_t length;
          ui64_t crc64;

          BUG(raw_buffer[VERSION_OFFSET] != RAW_VERSION)

          memcpy(&iproc, raw_buffer + MPI_ID_OFFSET, sizeof(i16_t));
 
          BUG(iproc < 0)
          BUG(iproc >= MPI_NPROCS_MAX)
 
          memcpy(&length, raw_buffer + LENGTH_OFFSET, sizeof(i16_t));
 
          BUG(length < 0)
          BUG(length > (RAW_BLOCK_SIZE - HEADER_SIZE))
 
          memcpy(&crc64, raw_buffer + CRC64_OFFSET, sizeof(ui64_t));
 
          if (return_crc64(raw_buffer + DATA_OFFSET, length, 1) != crc64)
          {
            ++nraw_data_corrupt;
          }
          else
          {
            ++nraw_reads_proc[iproc];

            memcpy(nraw_writes_proc + iproc, raw_buffer + DATA_OFFSET,
                  sizeof(i64_t));
          }
        }
      }
    }
    if ((nraw_reads % 10000) == 0)
    {
      my_printf("nraw_writes=%lld nraw_reads=%lld\n",
                nraw_writes, nraw_reads);

      MPI_Allreduce(MPI_IN_PLACE, &nraw_header_corrupt, 1,
        MPI_LONG_LONG_INT, MPI_MAX, MPI_COMM_WORLD);

      MPI_Allreduce(MPI_IN_PLACE, &nraw_data_corrupt, 1,
        MPI_LONG_LONG_INT, MPI_MAX, MPI_COMM_WORLD);

      my_printf("nraw_header_corrupt=%lld nraw_data_corrupt=%lld\n",
                nraw_header_corrupt, nraw_data_corrupt);

      //show that processors read each other's sectors (nraw_reads)
      //and read each other's data (nraw_writes)
      for (iproc = 0; iproc < mpi_nprocs; iproc++)
      {
        my_printf("iproc=%d nraw_reads_proc=%lld nraw_writes_proc=%lld\n",
          iproc, nraw_reads_proc[iproc], nraw_writes_proc[iproc]);
      }
    }
  }
}

void verify_raw(void)
{
  i64_t nzero;
  i64_t nok;
  i64_t ncorrupt;
  i64_t nheader_corrupt;
  i64_t ndata_corrupt;

  my_printf("verifying %lld blocks..\n", raw_nblocks);

  nzero = nok = 0;
  ncorrupt = nheader_corrupt = ndata_corrupt = 0;

  for (i64_t iblock = 0; iblock < raw_nblocks; iblock++)
  {
    if ((iblock % mpi_nprocs) == mpi_id)
    {
      int i;

      BUG(pread(raw_fd, raw_buffer, RAW_BLOCK_SIZE,
                iblock * RAW_BLOCK_SIZE) != RAW_BLOCK_SIZE)

      for (i = 0; i < RAW_BLOCK_SIZE; i++)
        if (raw_buffer[i] != 0) break;

      if (i < RAW_BLOCK_SIZE)
      {
        ui32_t crc32;

        //check header
        memcpy(&crc32, raw_buffer, sizeof(ui32_t));

        if (return_crc32(raw_buffer + VERSION_OFFSET,
                        HEADER_SIZE - sizeof(ui32_t)) != crc32)
        {
          ++ncorrupt;
          ++nheader_corrupt;
        }
        else
        {
          i16_t iproc;
          ui64_t crc64;
          i16_t length;

          BUG(raw_buffer[VERSION_OFFSET] != RAW_VERSION)

          memcpy(&iproc, raw_buffer + MPI_ID_OFFSET, sizeof(i16_t));

          BUG(iproc < 0)
          BUG(iproc >= MPI_NPROCS_MAX)
   
          memcpy(&length, raw_buffer + LENGTH_OFFSET, sizeof(i16_t));
   
          BUG(length < 0)
          BUG(length > (RAW_BLOCK_SIZE - HEADER_SIZE))

          memcpy(&crc64, raw_buffer + CRC64_OFFSET, sizeof(ui64_t));
   
          if (return_crc64(raw_buffer + DATA_OFFSET, length, 1) != crc64)
          {
            ++ncorrupt;
            ++ndata_corrupt;
          }
          else
          {
            ++nok;
          }
        }
      }
      else
      {
        ++nzero;
      }
    }
    if ((iblock % 100000) == 0) my_printf("%lld\n", iblock);
  }

  my_mpi_barrier("after verify_raw", MPI_COMM_WORLD, INVALID);

  MPI_Allreduce(MPI_IN_PLACE, &nzero, 1,
    MPI_LONG_LONG_INT, MPI_SUM, MPI_COMM_WORLD);

  MPI_Allreduce(MPI_IN_PLACE, &nok, 1,
    MPI_LONG_LONG_INT, MPI_SUM, MPI_COMM_WORLD);

  MPI_Allreduce(MPI_IN_PLACE, &ncorrupt, 1,
    MPI_LONG_LONG_INT, MPI_SUM, MPI_COMM_WORLD);

  MPI_Allreduce(MPI_IN_PLACE, &nheader_corrupt, 1,
    MPI_LONG_LONG_INT, MPI_SUM, MPI_COMM_WORLD);

  MPI_Allreduce(MPI_IN_PLACE, &ndata_corrupt, 1,
    MPI_LONG_LONG_INT, MPI_SUM, MPI_COMM_WORLD);

  my_printf("nzero=%lld nok=%lld\n", nzero, nok);
  my_printf("ncorrupt=%lld nheader_corrupt=%lld ndata_corrupt=%lld\n",
    ncorrupt, nheader_corrupt, ndata_corrupt);

  BUG((nzero + nok + ncorrupt) != raw_nblocks)

  if (ncorrupt > 0) my_printf("WARNING: CORRUPT SECTORS FOUND!\n");
}

void fin_raw(void)
{
  BUG(close(raw_fd) == -1)

  my_printf("nraw_writes=%lld\n", nraw_writes);
  my_printf("nraw_reads=%lld\n", nraw_reads);
  my_printf("nraw_header_corrupt=%lld\n", nraw_header_corrupt);
  my_printf("nraw_data_corrupt=%lld\n", nraw_data_corrupt);
  my_printf("nraw_hits=%lld\n", nraw_hits);
  my_printf("nraw_too_large=%lld\n", nraw_too_large);
}


gwiesenekker 06-27-2014 06:33 AM

Although format_raw is very thorough, it is much faster to format the raw device in blocks of 1 MiB:

$ time dd if=/dev/zero of=/dev/raw/raw1 bs=1048576

You can monitor the progress by sending the USR1 signal to the dd process:

166802+0 records in
166802+0 records out
174904573952 bytes (175 GB) copied, 410.168 s, 426 MB/s

Gijsbert


All times are GMT -5. The time now is 12:17 PM.