LinuxQuestions.org

LinuxQuestions.org (/questions/)
-   Linux - Newbie (https://www.linuxquestions.org/questions/linux-newbie-8/)
-   -   Splitting a real-time data stream into multiple files (https://www.linuxquestions.org/questions/linux-newbie-8/splitting-a-real-time-data-stream-into-multiple-files-929602/)

suicidaleggroll 02-15-2012 10:29 PM

Splitting a real-time data stream into multiple files
 
Long story short, I have a system that acquires data from an acquisition board at a rate of approx 2.7MB/s. The data rate is not the problem, the problem is it's continuous, 24/7. This comes out to over 200GB per day.

My problem is the data is coming out in binary format from a closed-source program that prints to stdout. I need a way to split the constant data stream out to a new file every ~10 minutes to keep the file sizes manageable, without losing anything. I also need to be able to then re-combine the data in some way for processing.

Typically, the two codes (acquisition and processing) are run as a pair, piping the output of acquisition straight into processing, i.e.:

Code:

./acquisition | ./process -i /dev/stdin
I need a way to split these two processes, so the output of acquisition is saved for post-processing, i.e.:

Code:

./acquisition > outfile
Then sometime later on...
Code:

./process -i outfile
The way I have described works fine, but the "outfile" becomes unusably large after just a couple of hours.

On the processing front I'm envisioning some sort of timed cat with a fifo, i.e.:
Code:

mkfifo myfifo
./process -i myfifo
cat file1 >> myfifo
sleep 30
cat file2 >> myfifo
sleep 30
etc.

But I'm not sure what to do on the acquisition side of things.

I have a decent amount of experience in BASH scripting and FORTRAN programming; I'm just not sure what the best approach is for this situation.
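For what it's worth, GNU coreutils split can already chunk a byte stream read from stdin ("-"). A stand-in for the acquisition feed shows the shape of the output; the 1 MiB test stream and 256 KiB chunk size below are purely illustrative (a real 10-minute chunk at 2.7MB/s would be around 1.6GB):

```shell
# Stand-in for ./acquisition: a fixed-size test stream from /dev/zero.
# -b: bytes per chunk, -d: numeric suffixes, -a 5: five-digit suffix width.
head -c 1048576 /dev/zero | split -b 262144 -d -a 5 - outfile_

ls outfile_*            # outfile_00000 .. outfile_00003
cat outfile_* | wc -c   # 1048576 -- the chunks recombine byte-for-byte
```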

rugdog 02-16-2012 11:03 AM

hi,

I assume you know where it's safe to cut the file for processing, given that it's a binary file. One approach (using text lines as the sample unit) would be something like this in Perl:

Code:

#!/usr/bin/perl
$file_count=0;
$size=0;
open(FH,"> outfile_$file_count");
while(<>){
        if ( $size == 10 ){
                close(FH);
                open(D,"> ready.outfile_$file_count");
                close(D);
                ++$file_count;
                open(FH," > outfile_$file_count");
                $size=0;
        }
        print FH $_;
        ++$size;
}
close(FH);

Say the above script is named splitter.pl; you'd call it:

./acquisition|./splitter.pl

and you'll get a list of files like this:

outfile_0
outfile_1
...
ready.outfile_0
ready.outfile_1
...

Then your process program would need to check whenever a new ready.* file appears, process the corresponding outfile, and remove the ready.* file.

As written, the $size check in the script splits every 10 lines, but the threshold is up to you, depending on how much data you want to process at a time.
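The same scheme can also be sketched in plain shell, reading fixed-size blocks from stdin instead of lines. The 256 KiB block size is illustrative, and the file names just mirror the perl version:

```shell
#!/bin/sh
# Read stdin in fixed-size pieces; drop an empty ready.* marker once each
# piece is complete, as in the perl version.
n=0
while head -c 262144 > "outfile_$n" && [ -s "outfile_$n" ]; do
  : > "ready.outfile_$n"
  n=$((n + 1))
done
rm -f "outfile_$n"   # at end of stream the final head writes an empty file
```

Saved as, say, splitter.sh, it would be invoked the same way: ./acquisition | ./splitter.sh, and produces matching outfile_N / ready.outfile_N pairs.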

suicidaleggroll 02-22-2012 11:15 AM

Thanks, this works great.
The only issue is that the processor usage is a bit high, since this is running on a small 2GHz Atom. The acquisition code alone is around 80% CPU, and this perl script adds another 10-15%.

Since I can barely tell my hand from my foot in perl, is there any way to reduce the overhead? Possibly by grabbing a block of lines at once? Ideally I'd like the splitting routine's overhead to be below 5%, since if the processor becomes saturated, it can start missing samples from the acquisition board.

Dark_Helmet 02-22-2012 01:49 PM

Two thoughts come to mind.

1. There may be a way to combine tee and split (or some other similar utilities) to accomplish what you need. Though, to be honest, I cannot think of a combination using those two that doesn't have a problem.

2. A "simple" C program. In fact, I've already got something coded up, but I haven't tested it.

EDIT:
Forgot to give a usage example earlier...
Code:

./acquisition | ./simple_c_program | ./process -i /dev/stdin
/EDIT

a. Save it as whatever.c
b. Build it with "gcc -o whatever whatever.c -lm"
c. Run it with "./whatever [PATTERN]" or place it somewhere in your PATH to get rid of the './'

Replace whatever with what you want the program to be named and whatever.c with whatever you save the source code as.

PATTERN is any text to use as a filename. PATTERN can include one and only one instance of '%d'. That '%d' will have a numeric value substituted for it as the program runs. It starts at 0 and increments by one for each file split. The numeric value is also zero-padded and 5-digits wide. Need a bigger pad? Modify the "#define SUBSTITUTION_WIDTH" line to be something greater. A default pattern of "acquisition-%d.dat" will be used if no PATTERN is supplied.

The numeric value will cycle back to 0 eventually. You will need to take steps to move any files (to save them from overwriting) if and when the numeric value cycles back to 0.
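Concretely, the padding the program arranges is the same thing printf's %05d conversion does, shown here with the default pattern from the code:

```shell
printf 'acquisition-%05d.dat\n' 0     # acquisition-00000.dat
printf 'acquisition-%05d.dat\n' 12    # acquisition-00012.dat
# After 10^5 files the counter wraps, so acquisition-00000.dat would be
# reused -- hence the note about moving files aside before that happens.
```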

The program attempts to read 4KiB chunks at a time. The files will break at 1.5GiB. The actual file could be up to 4KiB greater--depending on how the fread() calls return. Again, if you want bigger or smaller files, modify the "#define DEFAULT_BREAK_SIZE" to be whatever you prefer. The value is measured in bytes and probably shouldn't exceed 32 bits.

Most of the code deals with reading the filename pattern, checking the pattern for errors, and modifying the pattern (to provide the zero pad and fixed-width). The actual read-write loop is relatively short and straightforward.

Lastly, I don't know how this will fare performance-wise, but aside from the 4KiB buffering, I don't think the overhead will be significant.

Code:

#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define SUBSTITUTION_PATTERN                                        "%d"
#define SUBSTITUTION_WIDTH                                            5

#define DEFAULT_FILENAME_PATTERN                    "acquisition-%d.dat"

/* default file break size is 1.5GiB */
#define DEFAULT_BREAK_SIZE                                  0x60000000

int main( int argc, char *argv[] )
{
  FILE *outputFile;
  long unsigned byteCounter;
  size_t bytesRead;
  char buffer[4096];

  char *sourcePattern;
  char *filenamePattern;
  char *filename;
  char *literalPercent;
  char *nextPercent;
  unsigned substitutionCount;
  unsigned filenameLength;
  unsigned filenameCounter;
  unsigned counterMax = pow( 10, SUBSTITUTION_WIDTH );

  /* Given a filename pattern on the command line or use default? */
  if( argc > 1 )
    sourcePattern = argv[1];
  else
    sourcePattern = DEFAULT_FILENAME_PATTERN;

  /* Save a copy of the pattern to work with */
  filenameLength = strlen( sourcePattern );
  filenamePattern = malloc( sizeof( char ) * ( filenameLength + 1 ) );
  strncpy( filenamePattern, sourcePattern, filenameLength );
  filenamePattern[filenameLength] = '\0';

  /* Count the number of file number substitutions requested */
  substitutionCount = 0;
  sourcePattern = filenamePattern;
  do
  {
    /* Find first % -- used for error checking */
    nextPercent = strstr( sourcePattern, "%" );

    /* Find first %% -- ok because printf() will replace with a single
      '%' and that will not risk overflowing memory reserved for filename */
    literalPercent = strstr( sourcePattern, "%%" );
    sourcePattern = strstr( sourcePattern, SUBSTITUTION_PATTERN );
    if( nextPercent != sourcePattern &&
        nextPercent != literalPercent )
    {
      fprintf( stderr,
              "ERROR: Filename pattern uses a printf style substitution\n"
              "      other than %%d. This is not supported.\n" );
      return 1;
    }
    if( sourcePattern != NULL )
    {
      substitutionCount++;
      sourcePattern += 1;
    }
  } while( sourcePattern != NULL );

  /* Only a single substitution is supported */
  if( substitutionCount > 1 )
  {
    fprintf( stderr,
            "ERROR: Filename pattern uses %%d substitution more than\n"
            "      once. This is not supported.\n" );
      return 1;
  }

  /*
  * Now make a slight modification to the original pattern for zero
  * padded, fixed-width filenameCounter substitutions.
  * The net effect is to change "%d" to "%05d" (or equivalent based
  * on previous #defines
  */
  sourcePattern = filenamePattern;
  filenameLength = strlen( sourcePattern ) + 2;
  filenamePattern = malloc( sizeof( char ) * ( filenameLength + 1 ) );
  nextPercent = strstr( sourcePattern, SUBSTITUTION_PATTERN );
  *nextPercent = '\0';
  snprintf( filenamePattern, filenameLength + 1, "%s%%0%dd%s", sourcePattern,
            SUBSTITUTION_WIDTH, nextPercent + strlen( SUBSTITUTION_PATTERN ) );
  filenamePattern[filenameLength] = '\0';

  /* Can now free the original pattern because we have the zero padded,
    fixed-width version in filenamePattern */
  free( sourcePattern );

  /* Reserve memory large enough for the pattern and all requested
    substitutions */
  filenameLength = strlen( filenamePattern ) + SUBSTITUTION_WIDTH -
    ( strlen( SUBSTITUTION_PATTERN ) + 2 );
  filename = malloc( sizeof( char ) * ( filenameLength + 1 ) );
  filename[filenameLength] = '\0';


  /* start the main read-write loop; fread() returning 0 means EOF or error */
  byteCounter = 0;
  filenameCounter = 0;
  while( ( bytesRead = fread( buffer, sizeof( char ), 4096, stdin ) ) > 0 )
  {
    if( byteCounter == 0 )
    {
      snprintf( filename,
                filenameLength + 1,
                filenamePattern,
                filenameCounter );

      outputFile = fopen( filename, "w" );
      if( outputFile == NULL )
      {
        fprintf( stderr,
                "ERROR: Unable to open \"%s\" for writing.\n",
                filename );
        return 1;
      }
    }

    fwrite( buffer, sizeof( char ), bytesRead, stdout );
    fwrite( buffer, sizeof( char ), bytesRead, outputFile );

    byteCounter += bytesRead;
    if( byteCounter >= DEFAULT_BREAK_SIZE )
    {
      fclose( outputFile );
      byteCounter = 0;
      filenameCounter = ( filenameCounter + 1 ) % counterMax;
    }
  }

  /* close the last (possibly partial) output file */
  if( byteCounter > 0 )
    fclose( outputFile );

  free( filenamePattern );
  free( filename );

  return 0;
}


suicidaleggroll 02-22-2012 03:36 PM

Wow! Thanks

There were a few bugs with it at first, but I have it running now. I'm going to let it cycle through a few files to make sure the data can be recombined properly afterward. It's only using about 5% of the proc though, less than half of the perl version.

If you're interested, the bugs that I found were:
1) outputFile was opened for writing (not append) on every iteration of the loop, so it overwrote the data each time (at any given moment it only contained the most recent 4096 bytes). I moved the block of code that opens outputFile inside the "byteCounter >= DEFAULT_BREAK_SIZE" if statement, after filenameCounter is incremented. I also copied it to before the while loop to initialize the first file.

2) bytesRead += fread( buffer, sizeof( char ), 4096, stdin );
should be
bytesRead = fread( buffer, sizeof( char ), 4096, stdin );

otherwise bytesRead increases by 4096 on every iteration of the loop, and that running total then gets added to byteCounter each time. The result is that byteCounter grows quadratically with time, e.g. (0, 4096, 12288, 24576, 40960, ...) rather than linearly (0, 4096, 8192, 12288, 16384, 20480, ...).


Thanks again. Assuming the files recombine properly (which I see no reason why they shouldn't), this should work just fine.

Dark_Helmet 02-22-2012 03:41 PM

EDIT:
Geez... I originally scanned over your comments and thought I knew what you were talking about. I guess I was reading what I wanted/expected to read, but not what was actually there.

I changed the code to address what you mentioned... not what I thought you were saying originally :)

EDIT2:
Cleaned up the code a little more. Then again, maybe I added more bugs. That would be my luck. Anyway, glad it's working.

suicidaleggroll 02-22-2012 04:03 PM

Lol, I guess I missed your original reply. Sorry if my wording was ambiguous/confusing. Your modified code looks just like what I have running now.

It's on the 5th file now, should be good enough to test the recombination and processing.

Dark_Helmet 02-22-2012 04:10 PM

Quote:

Originally Posted by suicidaleggroll
Sorry if my wording was ambiguous/confusing.

Not at all. You were quite clear. The areas of the code you were referencing had other bugs in them when I originally posted the code. I fixed them in a later edit. So when you started talking about those areas, I immediately assumed you were talking about the pre-edit bugs.

Only after I looked at your response and the code a couple of times did it click that "wait... something isn't right."

suicidaleggroll 02-24-2012 10:04 AM

The recombination and processing went smoothly. It turns out that once the fifo fills up, it blocks the cat from appending more data until the processing command pulls it out, which means I can do something as simple as:

Code:

mkfifo myfifo
./process -i myfifo

then in another terminal

for i in acquisition*; do
  echo Processing $i
  cat $i >> myfifo
done

and it works beautifully
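One caveat on the loop above: each cat $i >> myfifo opens and closes the fifo, so a reader that treats end-of-file as end-of-stream would quit after the first file. If ./process ever behaves that way, redirecting the whole loop once keeps a single writer open for the duration. In the sketch below the plain cat reader is a stand-in for ./process, and the chunk_*/original.dat names are illustrative:

```shell
mkfifo myfifo
cat myfifo > recombined.dat &          # stand-in for ./process -i myfifo
for i in chunk_*; do cat "$i"; done > myfifo
wait
cmp recombined.dat original.dat        # exits 0: byte-for-byte identical
```

Because the > myfifo redirection is opened once for the whole loop, the reader sees a single continuous stream and only gets EOF after the last chunk.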


All times are GMT -5.