LinuxQuestions.org
Help answer threads with 0 replies.
Home Forums Tutorials Articles Register
Go Back   LinuxQuestions.org > Forums > Non-*NIX Forums > Programming
User Name
Password
Programming This forum is for all programming questions.
The question does not have to be directly related to Linux and any language is fair game.

Notices


Reply
  Search this Thread
Old 04-22-2012, 02:02 AM   #16
ta0kira
Senior Member
 
Registered: Sep 2004
Distribution: FreeBSD 9.1, Kubuntu 12.10
Posts: 3,078

Rep: Reputation: Disabled

Here is a threaded version of what I posted before (I skipped some error checking.) This is more of a practical solution that I'm sharing than a demonstration of what I've been talking about. Like Nominal Animal alluded to, each thread loads enough data into RAM to keep it processing for a while between I/O operations, which cuts down on I/O conflicts between threads.
Code:
#ifndef _BSD_SOURCE
#define _BSD_SOURCE
#endif

#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/mman.h>


typedef double m_value;

typedef struct
{
	unsigned int rows, cols, trans, map_size;
	m_value *data;
	pthread_mutex_t mutex;
} matrix;

typedef struct
{
	matrix *A, *B, *C;
	unsigned int P1, P2, threads, offset;
} thread_spec;


static int map_matrix_file(const char *specs, const char *file, int *descriptor, matrix *new_matrix);
static int create_new_matrix(matrix *A, matrix *B, const char *file, int * descriptor, matrix *new_matrix);

static inline unsigned int rows(matrix *M);
static inline unsigned int cols(matrix *M);
static inline int lock(matrix *M);
static inline int unlock(matrix *M);
static inline m_value *element(matrix *M, unsigned int row, unsigned int col);

static void *thread_multiply(void *spec);


int main(int argc, char *argv[])
{
	if (argc != 8)
	{
	fprintf(stderr, "%s (T)[A rows]x[A cols] [A filename] (T)[B rows]x[B cols] [B filename] [# input vectors](,[# output vectors]) [#threads] [C filename]\n", argv[0]);
	fprintf(stderr, "example: %s  1024x4096 A.dat 4096x2048 B.dat 256   4 C.dat\n", argv[0]);
	fprintf(stderr, "example: %s T4096x1024 A.dat 4096x2048 B.dat 256,4 4 C.dat\n", argv[0]);
	return 1;
	}

	const char *const A_file = argv[2], *const B_file = argv[4], *const C_file = argv[7],
	           *const A_spec = argv[1], *const B_spec = argv[3], *const vector_count = argv[5],
	           *const thread_count = argv[6];
	matrix A, B, C;
	int A_fd, B_fd, C_fd;
	unsigned int P1, P2 = 0, threads;


	if (!map_matrix_file(A_spec, A_file, &A_fd, &A)) return 1;
	if (!map_matrix_file(B_spec, B_file, &B_fd, &B)) return 1;
	if (!create_new_matrix(&A, &B, C_file, &C_fd, &C)) return 1;

	if (cols(&A) != rows(&B))
	{
	fprintf(stderr, "inner matrix dimensions are not equal: %u, %u\n", cols(&A), rows(&B));
	return 1;
	}

	if      (sscanf(vector_count, "%u,%u", &P1, &P2) == 2 && P1 > 0);
	else if (sscanf(vector_count, "%u", &P1)         == 1 && P1 > 0);
	else
	{
	fprintf(stderr, "invalid vector count '%s'\n", vector_count);
	return 1;
	}

	if (P2 == 0) P2 = 1;

	if (sscanf(thread_count, "%u", &threads) != 1 || threads < 1)
	{
	fprintf(stderr, "invalid vector count '%s'\n", thread_count);
	return 1;
	}


	fprintf(stderr, "matrix A: %ux%u (%s)\n", rows(&A), cols(&A), A_file);
	fprintf(stderr, "matrix B: %ux%u (%s)\n", rows(&B), cols(&B), B_file);
	fprintf(stderr, "matrix C: %ux%u (%s)\n", rows(&C), cols(&C), C_file);
	fprintf(stderr, "input vectors: %u (%luB for A, %luB for B)\n", P1,
	  P1 * rows(&A) * sizeof(m_value) * threads, P1 * cols(&B) * sizeof(m_value) * threads);
	fprintf(stderr, "output vectors: %u (%luB for C)\n", P2,
	  P2 * cols(&C) * sizeof(m_value) * threads);
	fprintf(stderr, "threads: %u\n", threads);


	pthread_t *all_threads = malloc(sizeof(pthread_t) * threads);

	for (int I = 0; I < threads; I++)
	{
	thread_spec *new_thread = (thread_spec*) malloc(sizeof(thread_spec));
	*new_thread = (thread_spec) { &A, &B, &C, P1, P2, threads, I };
	if (pthread_create(all_threads + I, NULL, &thread_multiply, new_thread) != 0)
	 {
	fprintf(stderr, "could not start thread: %s\n", strerror(errno));
	return 1;
	 }
	}

	for (int I = 0; I < threads; I++)
	pthread_join(all_threads[I], NULL);

	free(all_threads);


	munmap(A.data, A.map_size);
	munmap(B.data, B.map_size);
	munmap(C.data, C.map_size);
	close(A_fd);
	close(B_fd);
	close(C_fd);

	return 0;
}



static int map_matrix_file(const char *specs, const char *file, int *descriptor, matrix *new_matrix)
{
	if      (sscanf(specs, "T%ux%u", &new_matrix->rows, &new_matrix->cols) == 2) new_matrix->trans = 1;
	else if (sscanf(specs, "%ux%u",  &new_matrix->rows, &new_matrix->cols) == 2) new_matrix->trans = 0;
	else
	{
	fprintf(stderr, "invalid matrix specification '%s' (for '%s')\n", specs, file);
	return 0;
	}


	*descriptor = open(file, O_RDONLY);
	if (*descriptor < 0)
	{
	fprintf(stderr, "could not open matrix file '%s': %s\n", file, strerror(errno));
	return 0;
	}

	new_matrix->map_size = new_matrix->rows * new_matrix->cols * sizeof(m_value);
	unsigned int page_size = sysconf(_SC_PAGE_SIZE);
	if (new_matrix->map_size % page_size) new_matrix->map_size = ( (new_matrix->map_size / page_size) + 1 ) * page_size;

	new_matrix->data = (m_value*) mmap(NULL, new_matrix->map_size, PROT_READ, MAP_PRIVATE, *descriptor, 0);
	if (new_matrix->data < 0)
	{
	fprintf(stderr, "could not map matrix file '%s': %s\n", file, strerror(errno));
	close(*descriptor);
	return 0;
	}

	pthread_mutex_init(&new_matrix->mutex, NULL);

	return 1;
}


static int create_new_matrix(matrix *A, matrix *B, const char *file, int * descriptor,
  matrix *new_matrix)
{
	*descriptor = open(file, O_RDWR | O_TRUNC | O_CREAT, 0666);
	if (*descriptor < 0)
	{
	fprintf(stderr, "could not open matrix file '%s': %s\n", file, strerror(errno));
	return 0;
	}

	new_matrix->rows  = rows(A);
	new_matrix->cols  = cols(B);
	new_matrix->trans = 0;

	static const m_value zero = 0;

	lseek(*descriptor, ( new_matrix->rows * new_matrix->cols - 1 ) * sizeof(m_value), SEEK_SET);
	write(*descriptor, &zero, sizeof zero);
	lseek(*descriptor, 0, SEEK_SET);

	new_matrix->map_size = new_matrix->rows * new_matrix->cols * sizeof(m_value);
	unsigned int page_size = sysconf(_SC_PAGE_SIZE);
	if (new_matrix->map_size % page_size) new_matrix->map_size = ( (new_matrix->map_size / page_size) + 1 ) * page_size;

	new_matrix->data = (m_value*) mmap(NULL, new_matrix->map_size, PROT_READ | PROT_WRITE, MAP_SHARED, *descriptor, 0);
	if (new_matrix->data < 0)
	{
	fprintf(stderr, "could not map matrix file '%s': %s\n", file, strerror(errno));
	close(*descriptor);
	return 0;
	}

	for (int I = 0; I < (signed) new_matrix->rows * new_matrix->cols; I++)
	new_matrix->data[I] = 0;

	pthread_mutex_init(&new_matrix->mutex, NULL);

	return 1;
}


static inline unsigned int rows(matrix *M)
{ return (M->trans? M->cols : M->rows); }

static inline unsigned int cols(matrix *M)
{ return (M->trans? M->rows : M->cols); }

static inline int lock(matrix *M)
{ return pthread_mutex_lock(&M->mutex) == 0; }

static inline int unlock(matrix *M)
{ return pthread_mutex_unlock(&M->mutex) == 0; }

static inline m_value *element(matrix *M, unsigned int row, unsigned int col)
{ return M->data + ( M->trans? ( col * M->cols + row ) : ( col + row * M->cols ) ); }


static void multiply_matrices(matrix *A, matrix *B, matrix *C, unsigned int P1,
  unsigned int P2, unsigned int threads, unsigned int offset)
{
	unsigned int A_rows = rows(A), A_cols = cols(A),
	             B_rows = rows(B), B_cols = cols(B),
	             C_rows = rows(C), C_cols = cols(C);

	if (P1 > B_rows) P1 = B_rows;
	if (P1 == 0)     P1 = 1;

	if (P2 > C_rows) P2 = C_rows;
	if (P2 == 0)     P2 = 1;

	unsigned int P_i, P_j;

	unsigned int A_cache = A_rows * P1, B_cache = B_cols * P1, C_cache = C_cols * P2;

	m_value *A_temp = (m_value*) malloc(A_cache * sizeof(m_value));
	m_value *B_temp = (m_value*) malloc(B_cache * sizeof(m_value));
	m_value *C_temp = (m_value*) malloc(C_cache * sizeof(m_value));

	//attempt to lock the cached data in memory (ignore failure)

	mlock(A_temp, A_cache);
	mlock(B_temp, B_cache);
	mlock(C_temp, C_cache);


	for (int I = P1 * offset; P1 && I < B_rows; I += P1 * threads)
	{
	P_i = (P1 > B_rows - I)? (B_rows - I) : P1;

	//load the next sections of data

	lock(A);

	if (A->trans)
	 {
	for (int J = 0; J < P_i; J++)
	for (int K = 0; K < A_rows; K++)
	A_temp[A_rows * J + K] = *element(A, K, I + J);
	 }

	else
	 {
	for (int K = 0; K < A_rows; K++)
	for (int J = 0; J < P_i; J++)
	A_temp[A_rows * J + K] = *element(A, K, I + J);
	 }

	unlock(A);

	lock(B);

	if (B->trans)
	 {
	for (int K = 0; K < B_cols; K++)
	for (int J = 0; J < P_i; J++)
	B_temp[B_cols * J + K] = *element(B, I + J, K);
	 }

	else
	 {
	for (int J = 0; J < P_i; J++)
	for (int K = 0; K < B_cols; K++)
	B_temp[B_cols * J + K] = *element(B, I + J, K);
	 }

	unlock(B);


	//compute partial sums for output

	for (int J = 0; J < C_rows; J += P2)
	 {
	P_j = (P2 > C_rows - J)? (C_rows - J) : P2;

	//compute a partial sum for the current output rows

	if (P_j == 1)
	  {
	for (int L = 0; L < C_cols; L++)
	C_temp[L] = A_temp[J] * B_temp[L];

	for (int K = 1; K < P_i; K++)
	   {
	for (int L = 0; L < C_cols; L++)
	C_temp[L] += A_temp[A_rows * K + J] * B_temp[B_cols * K + L];
	   }
	  }

	else
	  {
	for (int U = 0; U < P_j; U++)
	   {
	for (int L = 0; L < C_cols; L++)
	C_temp[C_cols * U + L] = A_temp[J + U] * B_temp[L];
	   }

	for (int U = 0; U < P_j; U++)
	   {
	for (int K = 1; K < P_i; K++)
	    {
	for (int L = 0; L < C_cols; L++)
	C_temp[C_cols * U + L] += A_temp[A_rows * K + J + U] * B_temp[B_cols * K + L];
	    }
	   }
	  }

	//add the partial sum to the current output rows

	lock(C);

	if (P_j == 1)
	  {
	for (int L = 0; L < C_cols; L++)
	*element(C, J, L) += C_temp[L];
	  }

	else if (C->trans)
	  {
	for (int L = 0; L < C_cols; L++)
	for (int U = 0; U < P_j; U++)
	*element(C, J + U, L) += C_temp[C_cols * U + L];
	  }

	else
	  {
	for (int U = 0; U < P_j; U++)
	for (int L = 0; L < C_cols; L++)
	*element(C, J + U, L) += C_temp[C_cols * U + L];
	  }

	unlock(C);
	 }

	}


	munlock(A_temp, A_cache);
	munlock(B_temp, B_cache);
	munlock(C_temp, C_cache);
	free(A_temp);
	free(B_temp);
	free(C_temp);
}


static void *thread_multiply(void *spec)
{
	thread_spec *const self_specs = (thread_spec*) spec;

	multiply_matrices(self_specs->A, self_specs->B, self_specs->C, self_specs->P1,
	  self_specs->P2, self_specs->threads, self_specs->offset);

	free(spec);
	return NULL;
}
Compile with gcc -O2 --std=c99 matrix-times.c -o matrix-times -lpthread, and add a "# of threads" argument before the output filename, e.g. ./matrix-times 1024x1024 A.dat 1024x1024 B.dat 100 2 C.dat.

For the 4096x4096 multiplication, this finished in 79s with 4 threads and 128 vectors cached (per input matrix per thread.) For 10240x10240 with 4 threads and 256 vectors it finished in ~23 minutes.

I also added an output-vector cache to this one, which causes the threads to compute several rows of output before writing (specify output vectors after the input vectors, e.g. ./matrix-times 1024x1024 A.dat 1024x1024 B.dat 100,4 2 C.dat.) This is essentially just a generalization of the outer-product decomposition to block matrices; something of a compromise between what I proposed and what Nominal Animal did. Caching 32 output vectors for 4096x4096 increases the time from 79s to 84s. Caching 64 output vectors for 10240x10240 reduces the time from ~23 to ~21.5 minutes. (in both cases, less-than 10% of the I/O data is cached.) Because of those results, I think that caching output vectors is only helpful if the matrix is large enough that the partial sums are actually written to the disk between iterations.
Kevin Barry

Last edited by ta0kira; 04-24-2012 at 09:53 AM. Reason: lots of code changes, added results
 
Old 04-25-2012, 03:37 PM   #17
Nominal Animal
Senior Member
 
Registered: Dec 2010
Location: Finland
Distribution: Xubuntu, CentOS, LFS
Posts: 1,723
Blog Entries: 3

Rep: Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948
Quote:
Originally Posted by ta0kira View Post
I'm sorry, your post has too much extraneous information for me to fully understand your point. Please isolate your logical arguments or post example code.
I apologize. I wanted to make sure it was obvious to any reader of the thread that I was investigating the problem in real time, not stating something I already knew. Looking back, it looks more like incoherent babbling.

Strassen's algorithm for 2048x2048 block matrices takes an order of magnitude less time than the naive algorithm. While the blocks need to be read and written several times, ordering the operations will reduce the I/O needed to a point where CPU time used should dominate over time required for I/O. As this method will require only a fixed amount of RAM (which depends only on the number of threads used), it will scale to truly huge matrices pretty well.

While Strassen's is not numerically stable, it is nevertheless used in basically all math libraries, BLAS in particular.

If you need numerical stability, I think your approach is much better. You can even trade off some of the speed and/or larger memory use, by reordering the inner summation loops or adding temporary storage, for better stability by using Kahan summation algorithm. If there are lots of nearly cancelling elements, Kahan summation does help. (It turns out that on newer CPUs the extra additions and subtractions tend to be free or almost free, because most of the clock cycles taken would be needed for memory access anyway. But this requires reordering the inner loop to calculate a single sum at a time, so you don't need to save the overflow accumulator. Saving overflows in temporary storage will double the time the inner loop takes.)

I've been investigating libraries like OpenBLAS (in Debian), and it seems to me the special cases (especially power of two sizes) are not as efficient as they could be. There is also a very interesting special case often occurring in physics, sparse square matrices, that might be possible to handle very efficiently using recursive block matrices (of suitably aligned minimum size).

I started writing my own SSE2-based matrix code, but as I'm writing it from scratch and only when I'm bored, it'll probably be weeks before I have a sensible working example. If I do get it written, I'll post it in this thread (or a link to it, if it is too large).
 
Old 04-26-2012, 12:08 PM   #18
ta0kira
Senior Member
 
Registered: Sep 2004
Distribution: FreeBSD 9.1, Kubuntu 12.10
Posts: 3,078

Rep: Reputation: Disabled
Quote:
Originally Posted by Nominal Animal View Post
Strassen's algorithm for 2048x2048 block matrices takes an order of magnitude less time than the naive algorithm. While the blocks need to be read and written several times, ordering the operations will reduce the I/O needed to a point where CPU time used should dominate over time required for I/O. As this method will require only a fixed amount of RAM (which depends only on the number of threads used), it will scale to truly huge matrices pretty well.

While Strassen's is not numerically stable, it is nevertheless used in basically all math libraries, BLAS in particular.
I understand your point. My code has an O(N^3) inner loop, which I initially thought could be replaced by a call to cblas_dgemm in openblas after reading your post. Under some circumstances this works well; however, it seems like openblas uses a server with worker threads, meaning that you don't have control over the number of threads. So, despite having many threads that divide up the work, openblas seems to perform much worse when multiple threads need to multiply matrices (again, this is from what I can tell.) In theory, though, the same algorithm could be applied directly without these threading complications, and there could be an option to choose which algorithm to use.

Here is a revised version of what I posted before, with better threading control and with a macro to enable use of cblas_dgemm (relevant code highlighted.) This one is about twice as long, so it's probably better to put it in a file. I suppose that call could be replaced with any suitable library call. This minimizes read operations, balances write operations with memory usage, and provides 2 options for computing the intermediate matrix products. When having USE_BLAS defined it's essentially useless to specify more than 1 thread, whether or not openblas is built with multithreading.
Code:
#define USE_BLAS

#ifdef USE_BLAS 
#include <cblas.h>
#endif

#ifndef _BSD_SOURCE
#define _BSD_SOURCE
#endif

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/mman.h>
#include <sys/stat.h>


//global data for general use by functions

static const char *program_name = NULL;
static ssize_t page_size = 0;
static int show_progress = 0;


//matrix storage types

typedef double m_value;

typedef struct
{
    ssize_t rows, cols, trans, map_size;
    m_value *data;
    pthread_mutex_t mutex;
} matrix;


//thread data

typedef struct
{
    matrix *A, *B, *C;
    ssize_t P1, P2, threads, offset;
    pthread_barrier_t *barrier;
} thread_spec;


//other function declarations

static int map_matrix_file(const char *specs, const char *file, int *descriptor, matrix *new_matrix);
static int create_new_matrix(matrix *A, matrix *B, const char *file, int * descriptor, matrix *new_matrix);

static inline ssize_t rows(matrix *M);
static inline ssize_t cols(matrix *M);

static void *thread_multiply(void *spec);


//main function

int main(int argc, char *argv[])
{
    //set global variables

    program_name  = argv[0];
    page_size     = sysconf(_SC_PAGE_SIZE);
    show_progress = isatty(STDERR_FILENO);

    const char *const A_file = argv[2], *const B_file = argv[4], *const C_file = argv[7],
               *const A_spec = argv[1], *const B_spec = argv[3], *const vector_count = argv[5],
               *const thread_count = argv[6];
    matrix A, B, C;
    int A_fd, B_fd, C_fd;
    unsigned int P1, P2 = 0, threads;

    //check argument count

    if (argc != 8)
    {
    fprintf(stderr, "%s (T)[A rows]x[A cols] [A file] (T)[B rows]x[B cols] [B file] [# in vectors](,[# out vectors]) [# threads] [C file]\n", argv[0]);
    fprintf(stderr, "%s: example: %s  1024x4096 A.dat 4096x2048 B.dat 256   4 C.dat\n", argv[0], argv[0]);
    fprintf(stderr, "%s: example: %s T4096x1024 A.dat 4096x2048 B.dat 256,4 4 C.dat\n", argv[0], argv[0]);
    return 1;
    }


    //map matrix files

    if (!map_matrix_file(A_spec, A_file, &A_fd, &A)) return 1;
    if (!map_matrix_file(B_spec, B_file, &B_fd, &B)) return 1;
    if (!create_new_matrix(&A, &B, C_file, &C_fd, &C)) return 1;

    if (cols(&A) != rows(&B))
    {
    fprintf(stderr, "%s: inner matrix dimensions are not equal: %lu, %lu\n", program_name, cols(&A), rows(&B));
    return 1;
    }


    //parse general parameters

    char error;

    if      (sscanf(vector_count, "%u,%u%1c", &P1, &P2, &error) == 2 && P1 > 0);
    else if (sscanf(vector_count, "%u%1c", &P1, &error)         == 1 && P1 > 0);
    else
    {
    fprintf(stderr, "%s: invalid vector count '%s'\n", program_name, vector_count);
    return 1;
    }

    if (P2 == 0) P2 = 1;

    if (sscanf(thread_count, "%u", &threads) != 1 || threads < 1)
    {
    fprintf(stderr, "%s: invalid vector count '%s'\n", program_name, thread_count);
    return 1;
    }


    //display a summary of the computation

    fprintf(stderr, "matrix A: %lux%lu (%s)%s\n", rows(&A), cols(&A), A_file, A.trans? "^T" : "");
    fprintf(stderr, "matrix B: %lux%lu (%s)%s\n", rows(&B), cols(&B), B_file, B.trans? "^T" : "");
    fprintf(stderr, "matrix C: %lux%lu (%s)%s\n", rows(&C), cols(&C), C_file, C.trans? "^T" : "");
    fprintf(stderr, "threads: %u\n", threads);

    fprintf(stderr, "input cache: %u vectors (%.2f%% cached) [A: %luB; B: %luB]\n", P1,
      100. * (double) (P1 * (rows(&A) + cols(&B)) * threads) /
      (double) (rows(&A) * cols(&A) + rows(&B) * cols(&B)),
      P1 * rows(&A) * sizeof(m_value) * threads, P1 * cols(&B) * sizeof(m_value) * threads);

    fprintf(stderr, "output cache: %u vectors (%.2f%% cached) [C: %luB]\n", P2,
      100. * (double) (P2 * cols(&C) * threads) / (double) (rows(&C) * cols(&C)),
      P2 * cols(&C) * sizeof(m_value) * threads);

    fprintf(stderr, "total cached: %.2f%% [all: %luB]\n",
      100. * (double) ((P1 * (rows(&A) + cols(&B)) + P2 * cols(&C)) * threads) /
      (double) (rows(&A) * cols(&A) + rows(&B) * cols(&B) + rows(&C) * cols(&C)),
      ((P1 * (rows(&A) + cols(&B)) + P2 * cols(&C)) * threads) * sizeof(m_value));

    fprintf(stderr, "read cycles / write passes: %lu\n",
      (rows(&B) / P1) + ((rows(&B) % P1)? 1 : 0));

    fprintf(stderr, "write cycles per pass: %lu\n",
      (cols(&C) / P2) + ((cols(&C) % P2)? 1 : 0));


    //start the processing threads

    pthread_barrier_t barrier;
    pthread_t *all_threads = NULL;

    if (threads > 1)
    //multiple threads
    {
    if (pthread_barrier_init(&barrier, NULL, threads) != 0)
     {
    fprintf(stderr, "%s: could not create thread barrier: %s\n", program_name, strerror(errno));
    return 1;
     }

    all_threads = malloc(sizeof(pthread_t) * threads);
    assert(all_threads);
    }


    for (int I = 0; I < (signed) threads - 1; I++)
    {
    thread_spec *new_thread = (thread_spec*) malloc(sizeof(thread_spec));
    assert(new_thread);
    *new_thread = (thread_spec) { &A, &B, &C, P1, P2, threads, I + 1, &barrier };
    if (pthread_create(all_threads + I, NULL, &thread_multiply, new_thread) != 0)
     {
    fprintf(stderr, "%s: could not start thread: %s\n", program_name, strerror(errno));
    return 1;
     }
    }


    //(first thread is always this thread)

    thread_spec *new_thread = (thread_spec*) malloc(sizeof(thread_spec));
    assert(new_thread);
    *new_thread = (thread_spec) { &A, &B, &C, P1, P2, threads, 0, (threads > 1)? &barrier : NULL };
    thread_multiply(new_thread);


    if (threads > 1)
    {
    for (int I = 0; I < (signed) threads - 1; I++)
    pthread_join(all_threads[I], NULL);

    pthread_barrier_destroy(&barrier);

    free(all_threads);
    }


    //clean up

    munmap(A.data, A.map_size);
    munmap(B.data, B.map_size);
    munmap(C.data, C.map_size);
    close(A_fd);
    close(B_fd);
    close(C_fd);

    return 0;
}



static int map_matrix_file(const char *specs, const char *file, int *descriptor, matrix *new_matrix)
{
    //map an existing matrix file


    //parse dimensions

    char error;

    if      (sscanf(specs, "T%lux%lu%1c", &new_matrix->rows, &new_matrix->cols, &error) == 2) new_matrix->trans = 1;
    else if (sscanf(specs, "%lux%lu%1c",  &new_matrix->rows, &new_matrix->cols, &error) == 2) new_matrix->trans = 0;
    else
    {
    fprintf(stderr, "%s: invalid matrix specification '%s' (for '%s')\n", program_name, specs, file);
    return 0;
    }


    //open the file

    *descriptor = open(file, O_RDONLY);
    if (*descriptor < 0)
    {
    fprintf(stderr, "%s: could not open matrix file '%s': %s\n", program_name, file, strerror(errno));
    return 0;
    }


    //check the file size

    struct stat matrix_stat;
    if (fstat(*descriptor, &matrix_stat) != 0)
    fprintf(stderr, "%s: unable to verify size of '%s' (ignoring): %s\n", program_name, file, strerror(errno));

    else if (matrix_stat.st_size < new_matrix->rows * new_matrix->cols * sizeof(m_value))
    {
    fprintf(stderr, "%s: '%s' is too small for a %lux%lu matrix\n", program_name, file, new_matrix->rows, new_matrix->cols);
    return 0;
    }

    else if (matrix_stat.st_size != new_matrix->rows * new_matrix->cols * sizeof(m_value))
    fprintf(stderr, "%s: '%s' is too large for a %lux%lu matrix (ignoring)\n", program_name, file, new_matrix->rows, new_matrix->cols);


    //map the matrix

    new_matrix->map_size = new_matrix->rows * new_matrix->cols * sizeof(m_value);
    if (new_matrix->map_size % page_size) new_matrix->map_size = ( (new_matrix->map_size / page_size) + 1 ) * page_size;

    new_matrix->data = (m_value*) mmap(NULL, new_matrix->map_size, PROT_READ, MAP_PRIVATE, *descriptor, 0);
    if (new_matrix->data < 0)
    {
    fprintf(stderr, "%s: could not map matrix file '%s': %s\n", program_name, file, strerror(errno));
    close(*descriptor);
    return 0;
    }


    pthread_mutex_init(&new_matrix->mutex, NULL);

    return 1;
}


static int create_new_matrix(matrix *A, matrix *B, const char *file, int *descriptor,
  matrix *new_matrix)
{
    //create a new matrix file


    static const m_value zero = 0;

    //open the file

    *descriptor = open(file, O_RDWR | O_TRUNC | O_CREAT, 0666);
    if (*descriptor < 0)
    {
    fprintf(stderr, "%s: could not open matrix file '%s': %s\n", program_name, file, strerror(errno));
    return 0;
    }


    //determine the matrix and file sizes

    new_matrix->rows  = rows(A);
    new_matrix->cols  = cols(B);
    new_matrix->trans = 0;

    ssize_t new_size = (new_matrix->rows * new_matrix->cols - 1) * sizeof(m_value);


    //expand the file

    if (lseek(*descriptor, new_size, SEEK_SET) != new_size)
    {
    fprintf(stderr, "%s: could not expand matrix file '%s': %s\n", program_name, file, strerror(errno));
    close(*descriptor);
    return 0;
    }

    if (write(*descriptor, &zero, sizeof zero) != sizeof zero)
    {
    fprintf(stderr, "%s: could not expand matrix file '%s': %s\n", program_name, file, strerror(errno));
    close(*descriptor);
    return 0;
    }

    lseek(*descriptor, 0, SEEK_SET);


    //map the matrix

    new_matrix->map_size = new_matrix->rows * new_matrix->cols * sizeof(m_value);
    if (new_matrix->map_size % page_size) new_matrix->map_size = ( (new_matrix->map_size / page_size) + 1 ) * page_size;

    new_matrix->data = (m_value*) mmap(NULL, new_matrix->map_size, PROT_READ | PROT_WRITE, MAP_SHARED, *descriptor, 0);
    if (new_matrix->data < 0)
    {
    fprintf(stderr, "%s: could not map matrix file '%s': %s\n", program_name, file, strerror(errno));
    close(*descriptor);
    return 0;
    }


    pthread_mutex_init(&new_matrix->mutex, NULL);

    return 1;
}


//functions to advise kernel of future map access

static inline int advise_rows(matrix *M, ssize_t row, ssize_t col, ssize_t size);
static inline int advise_cols(matrix *M, ssize_t row, ssize_t col, ssize_t size);
static inline int advise_rows_done(matrix *M, ssize_t row, ssize_t col, ssize_t size);
static inline int advise_cols_done(matrix *M, ssize_t row, ssize_t col, ssize_t size);

static inline int lock(matrix *M);
static inline int unlock(matrix *M);
static inline m_value *element(matrix *M, ssize_t row, ssize_t col);


static void multiply_matrices(matrix *A, matrix *B, matrix *C, ssize_t P1,
  ssize_t P2, unsigned int threads, unsigned int offset, pthread_barrier_t *barrier)
{
    //mutex to control allocation access

    static pthread_mutex_t malloc_mutex = PTHREAD_MUTEX_INITIALIZER;
    static int mlock_failed = 0;
    static ssize_t total_done = 0;


    //mutex/condition for synchronizing write operations for threads

    static pthread_mutex_t first_pass_mutex = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  first_pass_cond  = PTHREAD_COND_INITIALIZER;
    static ssize_t first_pass = 0;


    //copy dimensions and check validity of cache values

    ssize_t A_rows = rows(A), A_cols = cols(A),
            B_rows = rows(B), B_cols = cols(B),
            C_rows = rows(C), C_cols = cols(C);

    if (P1 > B_rows) P1 = B_rows;
    if (P1 == 0)     P1 = 1;

    if (P2 > C_rows) P2 = C_rows;
    if (P2 == 0)     P2 = 1;


    ssize_t P_i, P_j;
    int barrier_outcome;
    int barrier_iterations = (int) B_rows / (P1 * threads) - ((B_rows % (P1 * threads))? 0 : 1);

    ssize_t A_cache = A_rows * P1, B_cache = B_cols * P1, C_cache = C_cols * P2;


    //allocate data caches

    if (pthread_mutex_lock(&malloc_mutex) != 0) abort();

    m_value *A_temp = NULL;
    posix_memalign((void**) &A_temp, page_size, A_cache * sizeof(m_value));
    assert(A_temp);
    m_value *B_temp = NULL;
    posix_memalign((void**) &B_temp, page_size, B_cache * sizeof(m_value));
    assert(B_temp);
    m_value *C_temp = NULL;
    posix_memalign((void**) &C_temp, page_size, C_cache * sizeof(m_value));
    assert(C_temp);

    //attempt to lock the cached data in memory (ignore failure)

    int mlocked = !mlock_failed;

    if (mlocked) mlocked = (mlock(A_temp, A_cache) == 0);
    if (mlocked) mlocked = (mlock(B_temp, B_cache) == 0);
    if (mlocked) mlocked = (mlock(C_temp, C_cache) == 0);

    if (!mlocked && !mlock_failed)
    {
    fprintf(stderr, "%s: unable to lock all caches in memory (ignoring): %s\n", program_name, strerror(errno));
    mlock_failed = 1;
    }

    if (pthread_mutex_unlock(&malloc_mutex) != 0) abort();


    //iterate through the input data

    for (int I = P1 * offset; I < B_rows; I += P1 * threads)
    {

    P_i = (P1 > B_rows - I)? (B_rows - I) : P1;

    //load the next sections of data

    lock(A);
    advise_cols(A, 0, I, P_i * A_rows);

    if (A->trans)
     {
    for (int J = 0; J < P_i; J++)
    for (int K = 0; K < A_rows; K++)
    A_temp[A_rows * J + K] = *element(A, K, I + J);
     }

    else
     {
    for (int K = 0; K < A_rows; K++)
    for (int J = 0; J < P_i; J++)
    A_temp[A_rows * J + K] = *element(A, K, I + J);
     }

    advise_cols_done(A, 0, I, P_i * A_rows);
    unlock(A);

    lock(B);
    advise_rows(B, I, 0, P_i * B_cols);

    if (B->trans)
     {
    for (int K = 0; K < B_cols; K++)
    for (int J = 0; J < P_i; J++)
    B_temp[B_cols * J + K] = *element(B, I + J, K);
     }

    else
     {
    for (int J = 0; J < P_i; J++)
    for (int K = 0; K < B_cols; K++)
    B_temp[B_cols * J + K] = *element(B, I + J, K);
     }

    advise_rows_done(B, I, 0, P_i * B_cols);
    unlock(B);


    //compute partial sums for output

    for (int J = 0; J < C_rows; J += P2)
     {
    P_j = (P2 > C_rows - J)? (C_rows - J) : P2;

    //compute a partial sum for the current output rows

    if (P_j == 1)
      {
    for (int L = 0; L < C_cols; L++)
    C_temp[L] = A_temp[J] * B_temp[L];

    for (int K = 1; K < P_i; K++)
    for (int L = 0; L < C_cols; L++)
    C_temp[L] += A_temp[A_rows * K + J] * B_temp[B_cols * K + L];
      }

    else
      {
#ifndef USE_BLAS

    for (int U = 0; U < P_j; U++)
    for (int L = 0; L < C_cols; L++)
    C_temp[C_cols * U + L] = A_temp[J + U] * B_temp[L];

    for (int U = 0; U < P_j; U++)
    for (int K = 1; K < P_i; K++)
    for (int L = 0; L < C_cols; L++)
    C_temp[C_cols * U + L] += A_temp[A_rows * K + J + U] * B_temp[B_cols * K + L];

#else

    cblas_dgemm(CblasRowMajor, CblasTrans, CblasNoTrans, P_j, C_cols, P_i, 1.0,
      A_temp + J, A_rows, B_temp, B_cols, 0.0, C_temp, C_cols);

#endif
      }


    //wait for the first write cycle for the current section (except for first thread)

    if (offset)
    {
    if (pthread_mutex_lock(&first_pass_mutex) != 0) abort();

    if (J + P_j <= first_pass)
     {
    if (pthread_mutex_unlock(&first_pass_mutex) != 0) abort();
     }
    else
     {
    if (pthread_cond_wait(&first_pass_cond, &first_pass_mutex) != 0) abort();
    if (pthread_mutex_unlock(&first_pass_mutex) != 0) abort();
     }
    }


    //add the partial sum to the current output rows

    lock(C);
    advise_rows(C, J, 0, P_j * C_cols);


    if (P_j == 1)
      {
    if (I == 0)
    for (int L = 0; L < C_cols; L++)
    *element(C, J, L) = C_temp[L];

    else
    for (int L = 0; L < C_cols; L++)
    *element(C, J, L) += C_temp[L];
      }

    else if (C->trans)
      {
    if (I == 0)
    for (int L = 0; L < C_cols; L++)
    for (int U = 0; U < P_j; U++)
    *element(C, J + U, L) = C_temp[C_cols * U + L];

    else
    for (int L = 0; L < C_cols; L++)
    for (int U = 0; U < P_j; U++)
    *element(C, J + U, L) += C_temp[C_cols * U + L];
      }

    else
      {
    if (I == 0)
    for (int U = 0; U < P_j; U++)
    for (int L = 0; L < C_cols; L++)
    *element(C, J + U, L) = C_temp[C_cols * U + L];

    else
    for (int U = 0; U < P_j; U++)
    for (int L = 0; L < C_cols; L++)
    *element(C, J + U, L) += C_temp[C_cols * U + L];
      }

    total_done += P_j * P_i;

    if (show_progress)
    fprintf(stderr, "%.2f%%\t\r", 100. * (double) total_done / (double) (B_rows * C_rows));

    //indicate completion of the first pass

    if (!offset && threads > 1)
      {
    if (pthread_mutex_lock(&first_pass_mutex) != 0) abort();
    first_pass = J + P_j;
    if (pthread_mutex_unlock(&first_pass_mutex) != 0) abort();
    if (pthread_cond_broadcast(&first_pass_cond) != 0) abort();
      }


    advise_rows_done(C, J, 0, P_j * C_cols);
    unlock(C);
     }


    //block until all threads finish the iteration

    if (barrier && barrier_iterations-- > 0)
     {
    barrier_outcome = barrier? pthread_barrier_wait(barrier) : 0;
    if (barrier_outcome && barrier_outcome != PTHREAD_BARRIER_SERIAL_THREAD) abort();
     }

    //reset the "pass" counter for the next iteration

    if (!offset && threads > 1)
     {
    if (pthread_mutex_lock(&first_pass_mutex) != 0) abort();
    if (I + P1 * threads < B_cols) first_pass = 0;
    if (pthread_mutex_unlock(&first_pass_mutex) != 0) abort();
    if (pthread_cond_broadcast(&first_pass_cond) != 0) abort();
     }

    }


    //clean up

    munlock(A_temp, A_cache);
    munlock(B_temp, B_cache);
    munlock(C_temp, C_cache);
    free(A_temp);
    free(B_temp);
    free(C_temp);
}


static void *thread_multiply(void *spec)
{
    thread_spec *const self_specs = (thread_spec*) spec;

    int barrier_outcome = self_specs->barrier? pthread_barrier_wait(self_specs->barrier) : 0;
    if (barrier_outcome && barrier_outcome != PTHREAD_BARRIER_SERIAL_THREAD) abort();

    multiply_matrices(self_specs->A, self_specs->B, self_specs->C, self_specs->P1,
      self_specs->P2, self_specs->threads, self_specs->offset, self_specs->barrier);

    free(spec);
    return NULL;
}


//helper functions

static inline ssize_t rows(matrix *M)
{ if (M->trans) return M->cols; else return M->rows; }

static inline ssize_t cols(matrix *M)
{ if (M->trans) return M->rows; else return M->cols; }

static inline int lock(matrix *M)
{ return pthread_mutex_lock(&M->mutex) == 0; }

static inline int unlock(matrix *M)
{ return pthread_mutex_unlock(&M->mutex) == 0; }

static inline m_value *element(matrix *M, ssize_t row, ssize_t col)
{
    if (M->trans) return M->data + (col * M->cols /*<-- not a mistake*/ + row);
    else          return M->data + (col + row * M->cols);
}


static inline ssize_t page_align_low(ssize_t size)
{ return (size / page_size) * page_size; }

static inline ssize_t page_align_high(ssize_t size)
{ if (size % page_size) return ((size / page_size) + 1) * page_size; else return size; }

static inline int advise(matrix *M, ssize_t x, ssize_t y, ssize_t size, int all,
  int advice1, int advice2)
{
    if (all)
    return madvise(M->data, M->map_size, MADV_RANDOM) == 0;
    else
    return madvise(M->data + page_align_low(y + x * M->cols),
      page_align_high(size + y + x * M->cols), MADV_SEQUENTIAL) == 0;
}


static inline int advise_rows(matrix *M, ssize_t row, ssize_t col, ssize_t size)
{ return advise(M, row, col, size, M->trans, MADV_RANDOM, MADV_SEQUENTIAL); }

static inline int advise_cols(matrix *M, ssize_t row, ssize_t col, ssize_t size)
{ return advise(M, col, row, size, !M->trans, MADV_RANDOM, MADV_SEQUENTIAL); }

static inline int advise_rows_done(matrix *M, ssize_t row, ssize_t col, ssize_t size)
{ return advise(M, row, col, size, M->trans, MADV_NORMAL, MADV_NORMAL); }

static inline int advise_cols_done(matrix *M, ssize_t row, ssize_t col, ssize_t size)
{ return advise(M, col, row, size, !M->trans, MADV_NORMAL, MADV_NORMAL); }
Build with gcc -Wall -O2 --std=c99 matrix-times.c -o matrix-times -lpthread -lopenblas.
Kevin Barry

Last edited by ta0kira; 04-26-2012 at 12:10 PM.
 
Old 04-26-2012, 02:24 PM   #19
Nominal Animal
Senior Member
 
Registered: Dec 2010
Location: Finland
Distribution: Xubuntu, CentOS, LFS
Posts: 1,723
Blog Entries: 3

Rep: Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948
Quote:
Originally Posted by ta0kira View Post
openblas uses a server with worker threads, meaning that you don't have control over the number of threads
Confirmed. Not all BLAS implementations do, fortunately; for example the one GSL ships with (libgslcblas) uses somewhat more total CPU time, but it does not use multiple threads. Hmm.. I should check current versions of ATLAS.

Using GSL_CBLAS, 2048x2048 matrix-matrix multiplication takes about 22 seconds CPU time, and I used that as a basis for my original calculations. I only just noticed it does 1024x1024 in just 3 seconds, which means it has a bug/feature: it is very slow on larger matrices. OpenBLAS seems to use two cores, totaling about two seconds CPU time for 2048x2048 matrix-matrix multiplication.

Assuming 8 cores of comparable CPU power (a mid-line server or node) to multiply two 10240x10240 matrices, the memory use would be about 1GB total (4*32=128MB per thread), bandwidth should be about four 2048x2048 matrices multiplied per second, and maximum I/O needed for 384 MB/s (read) + 128 MB/s (write).

If each thread were to compute one target block matrix, summing the products of a row of left block matrices and a column of right block matrices, the I/O would be cut down to 256MB/s (read) + a bit for writing at end. In theory, two 10240x10240 matrices could be multiplied in 31 seconds wall time (the target matrix being kept in memory, the summation would take very little time, a couple of seconds total overall) -- but the I/O at 200MB/s read-only would stretch that to about 40 seconds wall time. The total memory use would be about 160M per thread, or 1.2G for 8 threads total. However, you'd have to split the entire source matrices first, which at 100MB/s read + 100MB/s write would take about 17 seconds. Add another say 8 seconds for the write afterwards, and the total computation time would be about 70 seconds wall time. My CPU is roughly on par (0.5x - 2.0x) the speed of yours (I'm too lazy to run full 10240x10240 test right now; I apologize), so roughly speaking, this approach seems to be an order of magnitude faster. But keep in mind the much bigger memory use, the numerical stability issues with Strassen algorithm, and that I haven't verified this in real life yet!

(In my original post, I assumed the 22 seconds per 2048x2048 matrix-matrix multiplication, based on GSL_CBLAS. That would give ample time to do the I/O, no hurry. It seems that one tenth of that, or two seconds, should be attainable; that makes I/O again the bottleneck.)

It is rather frustrating to see OpenBLAS do the work efficiently for large enough matrices, but using multiple threads, while GSL_CBLAS has some kind of a brainfart with this large matrices -- but it'd use only one thread.

I don't really need matrix operations for anything myself, I just find it relaxing to work on the gritty details. Trying to come up with an efficient SSE2 implementation that would be easy to interface to. I've already decided on demanding strict alignment (each row of each matrix must start at a 32 byte boundary) and size (each row must be a multiple of four doubles -- thus also naturally aligning the all rows), allowing me to switch to AVX or its successor when I next upgrade my CPU.
 
Old 04-26-2012, 04:25 PM   #20
ta0kira
Senior Member
 
Registered: Sep 2004
Distribution: FreeBSD 9.1, Kubuntu 12.10
Posts: 3,078

Rep: Reputation: Disabled
Quote:
Originally Posted by Nominal Animal View Post
(In my original post, I assumed the 22 seconds per 2048x2048 matrix-matrix multiplication, based on GSL_CBLAS. That would give ample time to do the I/O, no hurry. It seems that one tenth of that, or two seconds, should be attainable; that makes I/O again the bottleneck.)
For some reason, using GSL with libgslcblas in my code runs about the same speed as it does with the O(N^3) loop (maybe slightly worse.) You can also link to libopenblas while using the GSL interface, which is convenient in some senses, but it doesn't allow you to easily choose between the two at run-time.

Anyway, at times I do need to multiply huge matrices, so this might be something that's useful for me in the future.
Kevin Barry
 
  


Reply



Posting Rules
You may not post new threads
You may not post replies
You may not post attachments
You may not edit your posts

BB code is On
Smilies are On
[IMG] code is Off
HTML code is Off



Similar Threads
Thread Thread Starter Forum Replies Last Post
Memory map or physical address of some memory allocated by malloc demon007 Programming 1 02-04-2012 06:17 AM
[SOLVED] gdb and memory map itz2000 Linux - Security 1 04-21-2010 10:08 PM
Linux Virtual memory mapping to Board memory map !rajkums! Linux - Kernel 4 10-19-2008 12:27 PM
Kernel virtual memory map TO Board memory map -----> Mapping !rajkums! Linux - Embedded & Single-board computer 0 10-18-2008 09:21 AM

LinuxQuestions.org > Forums > Non-*NIX Forums > Programming

All times are GMT -5. The time now is 12:58 PM.

Main Menu
Advertisement
My LQ
Write for LQ
LinuxQuestions.org is looking for people interested in writing Editorials, Articles, Reviews, and more. If you'd like to contribute content, let us know.
Main Menu
Syndicate
RSS1  Latest Threads
RSS1  LQ News
Twitter: @linuxquestions
Open Source Consulting | Domain Registration