ProgrammingThis forum is for all programming questions.
The question does not have to be directly related to Linux and any language is fair game.
Notices
Welcome to LinuxQuestions.org, a friendly and active Linux Community.
You are currently viewing LQ as a guest. By joining our community you will have the ability to post topics, receive our newsletter, use the advanced search, subscribe to threads and access many other special features. Registration is quick, simple and absolutely free. Join our community today!
Note that registered members see fewer ads, and ContentLink is completely disabled once you log in.
If you have any problems with the registration process or your account login, please contact us. If you need to reset your password, click here.
Having a problem logging in? Please visit this page to clear all LQ-related cookies.
Get a virtual cloud desktop with the Linux distro that you want in less than five minutes with Shells! With over 10 pre-installed distros to choose from, the worry-free installation life is here! Whether you are a digital nomad or just looking for flexibility, Shells can put your Linux machine on the device that you want to use.
Exclusive for LQ members, get up to 45% off per month. Click here for more info.
Here is a threaded version of what I posted before (I skipped some error checking.) This is more of a practical solution that I'm sharing than a demonstration of what I've been talking about. Like Nominal Animal alluded to, each thread loads enough data into RAM to keep it processing for a while between I/O operations, which cuts down on I/O conflicts between threads.
Code:
#ifndef _BSD_SOURCE
#define _BSD_SOURCE
#endif
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/mman.h>
typedef double m_value;
typedef struct
{
unsigned int rows, cols, trans, map_size;
m_value *data;
pthread_mutex_t mutex;
} matrix;
typedef struct
{
matrix *A, *B, *C;
unsigned int P1, P2, threads, offset;
} thread_spec;
static int map_matrix_file(const char *specs, const char *file, int *descriptor, matrix *new_matrix);
static int create_new_matrix(matrix *A, matrix *B, const char *file, int * descriptor, matrix *new_matrix);
static inline unsigned int rows(matrix *M);
static inline unsigned int cols(matrix *M);
static inline int lock(matrix *M);
static inline int unlock(matrix *M);
static inline m_value *element(matrix *M, unsigned int row, unsigned int col);
static void *thread_multiply(void *spec);
int main(int argc, char *argv[])
{
if (argc != 8)
{
fprintf(stderr, "%s (T)[A rows]x[A cols] [A filename] (T)[B rows]x[B cols] [B filename] [# input vectors](,[# output vectors]) [#threads] [C filename]\n", argv[0]);
fprintf(stderr, "example: %s 1024x4096 A.dat 4096x2048 B.dat 256 4 C.dat\n", argv[0]);
fprintf(stderr, "example: %s T4096x1024 A.dat 4096x2048 B.dat 256,4 4 C.dat\n", argv[0]);
return 1;
}
const char *const A_file = argv[2], *const B_file = argv[4], *const C_file = argv[7],
*const A_spec = argv[1], *const B_spec = argv[3], *const vector_count = argv[5],
*const thread_count = argv[6];
matrix A, B, C;
int A_fd, B_fd, C_fd;
unsigned int P1, P2 = 0, threads;
if (!map_matrix_file(A_spec, A_file, &A_fd, &A)) return 1;
if (!map_matrix_file(B_spec, B_file, &B_fd, &B)) return 1;
if (!create_new_matrix(&A, &B, C_file, &C_fd, &C)) return 1;
if (cols(&A) != rows(&B))
{
fprintf(stderr, "inner matrix dimensions are not equal: %u, %u\n", cols(&A), rows(&B));
return 1;
}
if (sscanf(vector_count, "%u,%u", &P1, &P2) == 2 && P1 > 0);
else if (sscanf(vector_count, "%u", &P1) == 1 && P1 > 0);
else
{
fprintf(stderr, "invalid vector count '%s'\n", vector_count);
return 1;
}
if (P2 == 0) P2 = 1;
if (sscanf(thread_count, "%u", &threads) != 1 || threads < 1)
{
fprintf(stderr, "invalid vector count '%s'\n", thread_count);
return 1;
}
fprintf(stderr, "matrix A: %ux%u (%s)\n", rows(&A), cols(&A), A_file);
fprintf(stderr, "matrix B: %ux%u (%s)\n", rows(&B), cols(&B), B_file);
fprintf(stderr, "matrix C: %ux%u (%s)\n", rows(&C), cols(&C), C_file);
fprintf(stderr, "input vectors: %u (%luB for A, %luB for B)\n", P1,
P1 * rows(&A) * sizeof(m_value) * threads, P1 * cols(&B) * sizeof(m_value) * threads);
fprintf(stderr, "output vectors: %u (%luB for C)\n", P2,
P2 * cols(&C) * sizeof(m_value) * threads);
fprintf(stderr, "threads: %u\n", threads);
pthread_t *all_threads = malloc(sizeof(pthread_t) * threads);
for (int I = 0; I < threads; I++)
{
thread_spec *new_thread = (thread_spec*) malloc(sizeof(thread_spec));
*new_thread = (thread_spec) { &A, &B, &C, P1, P2, threads, I };
if (pthread_create(all_threads + I, NULL, &thread_multiply, new_thread) != 0)
{
fprintf(stderr, "could not start thread: %s\n", strerror(errno));
return 1;
}
}
for (int I = 0; I < threads; I++)
pthread_join(all_threads[I], NULL);
free(all_threads);
munmap(A.data, A.map_size);
munmap(B.data, B.map_size);
munmap(C.data, C.map_size);
close(A_fd);
close(B_fd);
close(C_fd);
return 0;
}
static int map_matrix_file(const char *specs, const char *file, int *descriptor, matrix *new_matrix)
{
if (sscanf(specs, "T%ux%u", &new_matrix->rows, &new_matrix->cols) == 2) new_matrix->trans = 1;
else if (sscanf(specs, "%ux%u", &new_matrix->rows, &new_matrix->cols) == 2) new_matrix->trans = 0;
else
{
fprintf(stderr, "invalid matrix specification '%s' (for '%s')\n", specs, file);
return 0;
}
*descriptor = open(file, O_RDONLY);
if (*descriptor < 0)
{
fprintf(stderr, "could not open matrix file '%s': %s\n", file, strerror(errno));
return 0;
}
new_matrix->map_size = new_matrix->rows * new_matrix->cols * sizeof(m_value);
unsigned int page_size = sysconf(_SC_PAGE_SIZE);
if (new_matrix->map_size % page_size) new_matrix->map_size = ( (new_matrix->map_size / page_size) + 1 ) * page_size;
new_matrix->data = (m_value*) mmap(NULL, new_matrix->map_size, PROT_READ, MAP_PRIVATE, *descriptor, 0);
if (new_matrix->data < 0)
{
fprintf(stderr, "could not map matrix file '%s': %s\n", file, strerror(errno));
close(*descriptor);
return 0;
}
pthread_mutex_init(&new_matrix->mutex, NULL);
return 1;
}
static int create_new_matrix(matrix *A, matrix *B, const char *file, int * descriptor,
matrix *new_matrix)
{
*descriptor = open(file, O_RDWR | O_TRUNC | O_CREAT, 0666);
if (*descriptor < 0)
{
fprintf(stderr, "could not open matrix file '%s': %s\n", file, strerror(errno));
return 0;
}
new_matrix->rows = rows(A);
new_matrix->cols = cols(B);
new_matrix->trans = 0;
static const m_value zero = 0;
lseek(*descriptor, ( new_matrix->rows * new_matrix->cols - 1 ) * sizeof(m_value), SEEK_SET);
write(*descriptor, &zero, sizeof zero);
lseek(*descriptor, 0, SEEK_SET);
new_matrix->map_size = new_matrix->rows * new_matrix->cols * sizeof(m_value);
unsigned int page_size = sysconf(_SC_PAGE_SIZE);
if (new_matrix->map_size % page_size) new_matrix->map_size = ( (new_matrix->map_size / page_size) + 1 ) * page_size;
new_matrix->data = (m_value*) mmap(NULL, new_matrix->map_size, PROT_READ | PROT_WRITE, MAP_SHARED, *descriptor, 0);
if (new_matrix->data < 0)
{
fprintf(stderr, "could not map matrix file '%s': %s\n", file, strerror(errno));
close(*descriptor);
return 0;
}
for (int I = 0; I < (signed) new_matrix->rows * new_matrix->cols; I++)
new_matrix->data[I] = 0;
pthread_mutex_init(&new_matrix->mutex, NULL);
return 1;
}
static inline unsigned int rows(matrix *M)
{ return (M->trans? M->cols : M->rows); }
static inline unsigned int cols(matrix *M)
{ return (M->trans? M->rows : M->cols); }
static inline int lock(matrix *M)
{ return pthread_mutex_lock(&M->mutex) == 0; }
static inline int unlock(matrix *M)
{ return pthread_mutex_unlock(&M->mutex) == 0; }
static inline m_value *element(matrix *M, unsigned int row, unsigned int col)
{ return M->data + ( M->trans? ( col * M->cols + row ) : ( col + row * M->cols ) ); }
static void multiply_matrices(matrix *A, matrix *B, matrix *C, unsigned int P1,
unsigned int P2, unsigned int threads, unsigned int offset)
{
unsigned int A_rows = rows(A), A_cols = cols(A),
B_rows = rows(B), B_cols = cols(B),
C_rows = rows(C), C_cols = cols(C);
if (P1 > B_rows) P1 = B_rows;
if (P1 == 0) P1 = 1;
if (P2 > C_rows) P2 = C_rows;
if (P2 == 0) P2 = 1;
unsigned int P_i, P_j;
unsigned int A_cache = A_rows * P1, B_cache = B_cols * P1, C_cache = C_cols * P2;
m_value *A_temp = (m_value*) malloc(A_cache * sizeof(m_value));
m_value *B_temp = (m_value*) malloc(B_cache * sizeof(m_value));
m_value *C_temp = (m_value*) malloc(C_cache * sizeof(m_value));
//attempt to lock the cached data in memory (ignore failure)
mlock(A_temp, A_cache);
mlock(B_temp, B_cache);
mlock(C_temp, C_cache);
for (int I = P1 * offset; P1 && I < B_rows; I += P1 * threads)
{
P_i = (P1 > B_rows - I)? (B_rows - I) : P1;
//load the next sections of data
lock(A);
if (A->trans)
{
for (int J = 0; J < P_i; J++)
for (int K = 0; K < A_rows; K++)
A_temp[A_rows * J + K] = *element(A, K, I + J);
}
else
{
for (int K = 0; K < A_rows; K++)
for (int J = 0; J < P_i; J++)
A_temp[A_rows * J + K] = *element(A, K, I + J);
}
unlock(A);
lock(B);
if (B->trans)
{
for (int K = 0; K < B_cols; K++)
for (int J = 0; J < P_i; J++)
B_temp[B_cols * J + K] = *element(B, I + J, K);
}
else
{
for (int J = 0; J < P_i; J++)
for (int K = 0; K < B_cols; K++)
B_temp[B_cols * J + K] = *element(B, I + J, K);
}
unlock(B);
//compute partial sums for output
for (int J = 0; J < C_rows; J += P2)
{
P_j = (P2 > C_rows - J)? (C_rows - J) : P2;
//compute a partial sum for the current output rows
if (P_j == 1)
{
for (int L = 0; L < C_cols; L++)
C_temp[L] = A_temp[J] * B_temp[L];
for (int K = 1; K < P_i; K++)
{
for (int L = 0; L < C_cols; L++)
C_temp[L] += A_temp[A_rows * K + J] * B_temp[B_cols * K + L];
}
}
else
{
for (int U = 0; U < P_j; U++)
{
for (int L = 0; L < C_cols; L++)
C_temp[C_cols * U + L] = A_temp[J + U] * B_temp[L];
}
for (int U = 0; U < P_j; U++)
{
for (int K = 1; K < P_i; K++)
{
for (int L = 0; L < C_cols; L++)
C_temp[C_cols * U + L] += A_temp[A_rows * K + J + U] * B_temp[B_cols * K + L];
}
}
}
//add the partial sum to the current output rows
lock(C);
if (P_j == 1)
{
for (int L = 0; L < C_cols; L++)
*element(C, J, L) += C_temp[L];
}
else if (C->trans)
{
for (int L = 0; L < C_cols; L++)
for (int U = 0; U < P_j; U++)
*element(C, J + U, L) += C_temp[C_cols * U + L];
}
else
{
for (int U = 0; U < P_j; U++)
for (int L = 0; L < C_cols; L++)
*element(C, J + U, L) += C_temp[C_cols * U + L];
}
unlock(C);
}
}
munlock(A_temp, A_cache);
munlock(B_temp, B_cache);
munlock(C_temp, C_cache);
free(A_temp);
free(B_temp);
free(C_temp);
}
static void *thread_multiply(void *spec)
{
thread_spec *const self_specs = (thread_spec*) spec;
multiply_matrices(self_specs->A, self_specs->B, self_specs->C, self_specs->P1,
self_specs->P2, self_specs->threads, self_specs->offset);
free(spec);
return NULL;
}
Compile with gcc -O2 --std=c99 matrix-times.c -o matrix-times -lpthread, and add a "# of threads" argument before the output filename, e.g. ./matrix-times 1024x1024 A.dat 1024x1024 B.dat 100 2 C.dat.
For the 4096x4096 multiplication, this finished in 79s with 4 threads and 128 vectors cached (per input matrix per thread.) For 10240x10240 with 4 threads and 256 vectors it finished in ~23 minutes.
I also added an output-vector cache to this one, which causes the threads to compute several rows of output before writing (specify output vectors after the input vectors, e.g. ./matrix-times 1024x1024 A.dat 1024x1024 B.dat 100,4 2 C.dat.) This is essentially just a generalization of the outer-product decomposition to block matrices; something of a compromise between what I proposed and what Nominal Animal did. Caching 32 output vectors for 4096x4096 increases the time from 79s to 84s. Caching 64 output vectors for 10240x10240 reduces the time from ~23 to ~21.5 minutes. (in both cases, less-than 10% of the I/O data is cached.) Because of those results, I think that caching output vectors is only helpful if the matrix is large enough that the partial sums are actually written to the disk between iterations.
Kevin Barry
Last edited by ta0kira; 04-24-2012 at 09:53 AM.
Reason: lots of code changes, added results
I'm sorry, your post has too much extraneous information for me to fully understand your point. Please isolate your logical arguments or post example code.
I apologize. I wanted to make sure it was obvious to any reader of the thread that I was investigating the problem in real time, not stating something I already knew. Looking back, it looks more like incoherent babbling.
Strassen's algorithm for 2048x2048 block matrices takes an order of magnitude less time than the naive algorithm. While the blocks need to be read and written several times, ordering the operations will reduce the I/O needed to a point where CPU time used should dominate over time required for I/O. As this method will require only a fixed amount of RAM (which depends only on the number of threads used), it will scale to truly huge matrices pretty well.
While Strassen's is not numerically stable, it is nevertheless used in basically all math libraries, BLAS in particular.
If you need numerical stability, I think your approach is much better. You can even trade off some of the speed and/or larger memory use, by reordering the inner summation loops or adding temporary storage, for better stability by using Kahan summation algorithm. If there are lots of nearly cancelling elements, Kahan summation does help. (It turns out that on newer CPUs the extra additions and subtractions tend to be free or almost free, because most of the clock cycles taken would be needed for memory access anyway. But this requires reordering the inner loop to calculate a single sum at a time, so you don't need to save the overflow accumulator. Saving overflows in temporary storage will double the time the inner loop takes.)
I've been investigating libraries like OpenBLAS (in Debian), and it seems to me the special cases (especially power of two sizes) are not as efficient as they could be. There is also a very interesting special case often occurring in physics, sparse square matrices, that might be possible to handle very efficiently using recursive block matrices (of suitably aligned minimum size).
I started writing my own SSE2-based matrix code, but as I'm writing it from scratch and only when I'm bored, it'll probably be weeks before I have a sensible working example. If I do get it written, I'll post it in this thread (or a link to it, if it is too large).
Strassen's algorithm for 2048x2048 block matrices takes an order of magnitude less time than the naive algorithm. While the blocks need to be read and written several times, ordering the operations will reduce the I/O needed to a point where CPU time used should dominate over time required for I/O. As this method will require only a fixed amount of RAM (which depends only on the number of threads used), it will scale to truly huge matrices pretty well.
While Strassen's is not numerically stable, it is nevertheless used in basically all math libraries, BLAS in particular.
I understand your point. My code has an O(N^3) inner loop, which I initially thought could be replaced by a call to cblas_dgemm in openblas after reading your post. Under some circumstances this works well; however, it seems like openblas uses a server with worker threads, meaning that you don't have control over the number of threads. So, despite having many threads that divide up the work, openblas seems to perform much worse when multiple threads need to multiply matrices (again, this is from what I can tell.) In theory, though, the same algorithm could be applied directly without these threading complications, and there could be an option to choose which algorithm to use.
Here is a revised version of what I posted before, with better threading control and with a macro to enable use of cblas_dgemm (relevant code highlighted.) This one is about twice as long, so it's probably better to put it in a file. I suppose that call could be replaced with any suitable library call. This minimizes read operations, balances write operations with memory usage, and provides 2 options for computing the intermediate matrix products. When having USE_BLAS defined it's essentially useless to specify more than 1 thread, whether or not openblas is built with multithreading.
Code:
#define USE_BLAS
#ifdef USE_BLAS
#include <cblas.h>
#endif
#ifndef _BSD_SOURCE
#define _BSD_SOURCE
#endif
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/mman.h>
#include <sys/stat.h>
//global data for general use by functions
static const char *program_name = NULL;
static ssize_t page_size = 0;
static int show_progress = 0;
//matrix storage types
typedef double m_value;
typedef struct
{
ssize_t rows, cols, trans, map_size;
m_value *data;
pthread_mutex_t mutex;
} matrix;
//thread data
typedef struct
{
matrix *A, *B, *C;
ssize_t P1, P2, threads, offset;
pthread_barrier_t *barrier;
} thread_spec;
//other function declarations
static int map_matrix_file(const char *specs, const char *file, int *descriptor, matrix *new_matrix);
static int create_new_matrix(matrix *A, matrix *B, const char *file, int * descriptor, matrix *new_matrix);
static inline ssize_t rows(matrix *M);
static inline ssize_t cols(matrix *M);
static void *thread_multiply(void *spec);
//main function
int main(int argc, char *argv[])
{
//set global variables
program_name = argv[0];
page_size = sysconf(_SC_PAGE_SIZE);
show_progress = isatty(STDERR_FILENO);
const char *const A_file = argv[2], *const B_file = argv[4], *const C_file = argv[7],
*const A_spec = argv[1], *const B_spec = argv[3], *const vector_count = argv[5],
*const thread_count = argv[6];
matrix A, B, C;
int A_fd, B_fd, C_fd;
unsigned int P1, P2 = 0, threads;
//check argument count
if (argc != 8)
{
fprintf(stderr, "%s (T)[A rows]x[A cols] [A file] (T)[B rows]x[B cols] [B file] [# in vectors](,[# out vectors]) [# threads] [C file]\n", argv[0]);
fprintf(stderr, "%s: example: %s 1024x4096 A.dat 4096x2048 B.dat 256 4 C.dat\n", argv[0], argv[0]);
fprintf(stderr, "%s: example: %s T4096x1024 A.dat 4096x2048 B.dat 256,4 4 C.dat\n", argv[0], argv[0]);
return 1;
}
//map matrix files
if (!map_matrix_file(A_spec, A_file, &A_fd, &A)) return 1;
if (!map_matrix_file(B_spec, B_file, &B_fd, &B)) return 1;
if (!create_new_matrix(&A, &B, C_file, &C_fd, &C)) return 1;
if (cols(&A) != rows(&B))
{
fprintf(stderr, "%s: inner matrix dimensions are not equal: %lu, %lu\n", program_name, cols(&A), rows(&B));
return 1;
}
//parse general parameters
char error;
if (sscanf(vector_count, "%u,%u%1c", &P1, &P2, &error) == 2 && P1 > 0);
else if (sscanf(vector_count, "%u%1c", &P1, &error) == 1 && P1 > 0);
else
{
fprintf(stderr, "%s: invalid vector count '%s'\n", program_name, vector_count);
return 1;
}
if (P2 == 0) P2 = 1;
if (sscanf(thread_count, "%u", &threads) != 1 || threads < 1)
{
fprintf(stderr, "%s: invalid vector count '%s'\n", program_name, thread_count);
return 1;
}
//display a summary of the computation
fprintf(stderr, "matrix A: %lux%lu (%s)%s\n", rows(&A), cols(&A), A_file, A.trans? "^T" : "");
fprintf(stderr, "matrix B: %lux%lu (%s)%s\n", rows(&B), cols(&B), B_file, B.trans? "^T" : "");
fprintf(stderr, "matrix C: %lux%lu (%s)%s\n", rows(&C), cols(&C), C_file, C.trans? "^T" : "");
fprintf(stderr, "threads: %u\n", threads);
fprintf(stderr, "input cache: %u vectors (%.2f%% cached) [A: %luB; B: %luB]\n", P1,
100. * (double) (P1 * (rows(&A) + cols(&B)) * threads) /
(double) (rows(&A) * cols(&A) + rows(&B) * cols(&B)),
P1 * rows(&A) * sizeof(m_value) * threads, P1 * cols(&B) * sizeof(m_value) * threads);
fprintf(stderr, "output cache: %u vectors (%.2f%% cached) [C: %luB]\n", P2,
100. * (double) (P2 * cols(&C) * threads) / (double) (rows(&C) * cols(&C)),
P2 * cols(&C) * sizeof(m_value) * threads);
fprintf(stderr, "total cached: %.2f%% [all: %luB]\n",
100. * (double) ((P1 * (rows(&A) + cols(&B)) + P2 * cols(&C)) * threads) /
(double) (rows(&A) * cols(&A) + rows(&B) * cols(&B) + rows(&C) * cols(&C)),
((P1 * (rows(&A) + cols(&B)) + P2 * cols(&C)) * threads) * sizeof(m_value));
fprintf(stderr, "read cycles / write passes: %lu\n",
(rows(&B) / P1) + ((rows(&B) % P1)? 1 : 0));
fprintf(stderr, "write cycles per pass: %lu\n",
(cols(&C) / P2) + ((cols(&C) % P2)? 1 : 0));
//start the processing threads
pthread_barrier_t barrier;
pthread_t *all_threads = NULL;
if (threads > 1)
//multiple threads
{
if (pthread_barrier_init(&barrier, NULL, threads) != 0)
{
fprintf(stderr, "%s: could not create thread barrier: %s\n", program_name, strerror(errno));
return 1;
}
all_threads = malloc(sizeof(pthread_t) * threads);
assert(all_threads);
}
for (int I = 0; I < (signed) threads - 1; I++)
{
thread_spec *new_thread = (thread_spec*) malloc(sizeof(thread_spec));
assert(new_thread);
*new_thread = (thread_spec) { &A, &B, &C, P1, P2, threads, I + 1, &barrier };
if (pthread_create(all_threads + I, NULL, &thread_multiply, new_thread) != 0)
{
fprintf(stderr, "%s: could not start thread: %s\n", program_name, strerror(errno));
return 1;
}
}
//(first thread is always this thread)
thread_spec *new_thread = (thread_spec*) malloc(sizeof(thread_spec));
assert(new_thread);
*new_thread = (thread_spec) { &A, &B, &C, P1, P2, threads, 0, (threads > 1)? &barrier : NULL };
thread_multiply(new_thread);
if (threads > 1)
{
for (int I = 0; I < (signed) threads - 1; I++)
pthread_join(all_threads[I], NULL);
pthread_barrier_destroy(&barrier);
free(all_threads);
}
//clean up
munmap(A.data, A.map_size);
munmap(B.data, B.map_size);
munmap(C.data, C.map_size);
close(A_fd);
close(B_fd);
close(C_fd);
return 0;
}
static int map_matrix_file(const char *specs, const char *file, int *descriptor, matrix *new_matrix)
{
//map an existing matrix file
//parse dimensions
char error;
if (sscanf(specs, "T%lux%lu%1c", &new_matrix->rows, &new_matrix->cols, &error) == 2) new_matrix->trans = 1;
else if (sscanf(specs, "%lux%lu%1c", &new_matrix->rows, &new_matrix->cols, &error) == 2) new_matrix->trans = 0;
else
{
fprintf(stderr, "%s: invalid matrix specification '%s' (for '%s')\n", program_name, specs, file);
return 0;
}
//open the file
*descriptor = open(file, O_RDONLY);
if (*descriptor < 0)
{
fprintf(stderr, "%s: could not open matrix file '%s': %s\n", program_name, file, strerror(errno));
return 0;
}
//check the file size
struct stat matrix_stat;
if (fstat(*descriptor, &matrix_stat) != 0)
fprintf(stderr, "%s: unable to verify size of '%s' (ignoring): %s\n", program_name, file, strerror(errno));
else if (matrix_stat.st_size < new_matrix->rows * new_matrix->cols * sizeof(m_value))
{
fprintf(stderr, "%s: '%s' is too small for a %lux%lu matrix\n", program_name, file, new_matrix->rows, new_matrix->cols);
return 0;
}
else if (matrix_stat.st_size != new_matrix->rows * new_matrix->cols * sizeof(m_value))
fprintf(stderr, "%s: '%s' is too large for a %lux%lu matrix (ignoring)\n", program_name, file, new_matrix->rows, new_matrix->cols);
//map the matrix
new_matrix->map_size = new_matrix->rows * new_matrix->cols * sizeof(m_value);
if (new_matrix->map_size % page_size) new_matrix->map_size = ( (new_matrix->map_size / page_size) + 1 ) * page_size;
new_matrix->data = (m_value*) mmap(NULL, new_matrix->map_size, PROT_READ, MAP_PRIVATE, *descriptor, 0);
if (new_matrix->data < 0)
{
fprintf(stderr, "%s: could not map matrix file '%s': %s\n", program_name, file, strerror(errno));
close(*descriptor);
return 0;
}
pthread_mutex_init(&new_matrix->mutex, NULL);
return 1;
}
static int create_new_matrix(matrix *A, matrix *B, const char *file, int *descriptor,
matrix *new_matrix)
{
//create a new matrix file
static const m_value zero = 0;
//open the file
*descriptor = open(file, O_RDWR | O_TRUNC | O_CREAT, 0666);
if (*descriptor < 0)
{
fprintf(stderr, "%s: could not open matrix file '%s': %s\n", program_name, file, strerror(errno));
return 0;
}
//determine the matrix and file sizes
new_matrix->rows = rows(A);
new_matrix->cols = cols(B);
new_matrix->trans = 0;
ssize_t new_size = (new_matrix->rows * new_matrix->cols - 1) * sizeof(m_value);
//expand the file
if (lseek(*descriptor, new_size, SEEK_SET) != new_size)
{
fprintf(stderr, "%s: could not expand matrix file '%s': %s\n", program_name, file, strerror(errno));
close(*descriptor);
return 0;
}
if (write(*descriptor, &zero, sizeof zero) != sizeof zero)
{
fprintf(stderr, "%s: could not expand matrix file '%s': %s\n", program_name, file, strerror(errno));
close(*descriptor);
return 0;
}
lseek(*descriptor, 0, SEEK_SET);
//map the matrix
new_matrix->map_size = new_matrix->rows * new_matrix->cols * sizeof(m_value);
if (new_matrix->map_size % page_size) new_matrix->map_size = ( (new_matrix->map_size / page_size) + 1 ) * page_size;
new_matrix->data = (m_value*) mmap(NULL, new_matrix->map_size, PROT_READ | PROT_WRITE, MAP_SHARED, *descriptor, 0);
if (new_matrix->data < 0)
{
fprintf(stderr, "%s: could not map matrix file '%s': %s\n", program_name, file, strerror(errno));
close(*descriptor);
return 0;
}
pthread_mutex_init(&new_matrix->mutex, NULL);
return 1;
}
//functions to advise kernel of future map access
static inline int advise_rows(matrix *M, ssize_t row, ssize_t col, ssize_t size);
static inline int advise_cols(matrix *M, ssize_t row, ssize_t col, ssize_t size);
static inline int advise_rows_done(matrix *M, ssize_t row, ssize_t col, ssize_t size);
static inline int advise_cols_done(matrix *M, ssize_t row, ssize_t col, ssize_t size);
static inline int lock(matrix *M);
static inline int unlock(matrix *M);
static inline m_value *element(matrix *M, ssize_t row, ssize_t col);
static void multiply_matrices(matrix *A, matrix *B, matrix *C, ssize_t P1,
ssize_t P2, unsigned int threads, unsigned int offset, pthread_barrier_t *barrier)
{
//mutex to control allocation access
static pthread_mutex_t malloc_mutex = PTHREAD_MUTEX_INITIALIZER;
static int mlock_failed = 0;
static ssize_t total_done = 0;
//mutex/condition for synchronizing write operations for threads
static pthread_mutex_t first_pass_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t first_pass_cond = PTHREAD_COND_INITIALIZER;
static ssize_t first_pass = 0;
//copy dimensions and check validity of cache values
ssize_t A_rows = rows(A), A_cols = cols(A),
B_rows = rows(B), B_cols = cols(B),
C_rows = rows(C), C_cols = cols(C);
if (P1 > B_rows) P1 = B_rows;
if (P1 == 0) P1 = 1;
if (P2 > C_rows) P2 = C_rows;
if (P2 == 0) P2 = 1;
ssize_t P_i, P_j;
int barrier_outcome;
int barrier_iterations = (int) B_rows / (P1 * threads) - ((B_rows % (P1 * threads))? 0 : 1);
ssize_t A_cache = A_rows * P1, B_cache = B_cols * P1, C_cache = C_cols * P2;
//allocate data caches
if (pthread_mutex_lock(&malloc_mutex) != 0) abort();
m_value *A_temp = NULL;
posix_memalign((void**) &A_temp, page_size, A_cache * sizeof(m_value));
assert(A_temp);
m_value *B_temp = NULL;
posix_memalign((void**) &B_temp, page_size, B_cache * sizeof(m_value));
assert(B_temp);
m_value *C_temp = NULL;
posix_memalign((void**) &C_temp, page_size, C_cache * sizeof(m_value));
assert(C_temp);
//attempt to lock the cached data in memory (ignore failure)
int mlocked = !mlock_failed;
if (mlocked) mlocked = (mlock(A_temp, A_cache) == 0);
if (mlocked) mlocked = (mlock(B_temp, B_cache) == 0);
if (mlocked) mlocked = (mlock(C_temp, C_cache) == 0);
if (!mlocked && !mlock_failed)
{
fprintf(stderr, "%s: unable to lock all caches in memory (ignoring): %s\n", program_name, strerror(errno));
mlock_failed = 1;
}
if (pthread_mutex_unlock(&malloc_mutex) != 0) abort();
//iterate through the input data
for (int I = P1 * offset; I < B_rows; I += P1 * threads)
{
P_i = (P1 > B_rows - I)? (B_rows - I) : P1;
//load the next sections of data
lock(A);
advise_cols(A, 0, I, P_i * A_rows);
if (A->trans)
{
for (int J = 0; J < P_i; J++)
for (int K = 0; K < A_rows; K++)
A_temp[A_rows * J + K] = *element(A, K, I + J);
}
else
{
for (int K = 0; K < A_rows; K++)
for (int J = 0; J < P_i; J++)
A_temp[A_rows * J + K] = *element(A, K, I + J);
}
advise_cols_done(A, 0, I, P_i * A_rows);
unlock(A);
lock(B);
advise_rows(B, I, 0, P_i * B_cols);
if (B->trans)
{
for (int K = 0; K < B_cols; K++)
for (int J = 0; J < P_i; J++)
B_temp[B_cols * J + K] = *element(B, I + J, K);
}
else
{
for (int J = 0; J < P_i; J++)
for (int K = 0; K < B_cols; K++)
B_temp[B_cols * J + K] = *element(B, I + J, K);
}
advise_rows_done(B, I, 0, P_i * B_cols);
unlock(B);
//compute partial sums for output
for (int J = 0; J < C_rows; J += P2)
{
P_j = (P2 > C_rows - J)? (C_rows - J) : P2;
//compute a partial sum for the current output rows
if (P_j == 1)
{
for (int L = 0; L < C_cols; L++)
C_temp[L] = A_temp[J] * B_temp[L];
for (int K = 1; K < P_i; K++)
for (int L = 0; L < C_cols; L++)
C_temp[L] += A_temp[A_rows * K + J] * B_temp[B_cols * K + L];
}
else
{
#ifndef USE_BLAS
for (int U = 0; U < P_j; U++)
for (int L = 0; L < C_cols; L++)
C_temp[C_cols * U + L] = A_temp[J + U] * B_temp[L];
for (int U = 0; U < P_j; U++)
for (int K = 1; K < P_i; K++)
for (int L = 0; L < C_cols; L++)
C_temp[C_cols * U + L] += A_temp[A_rows * K + J + U] * B_temp[B_cols * K + L];
#else
cblas_dgemm(CblasRowMajor, CblasTrans, CblasNoTrans, P_j, C_cols, P_i, 1.0,
A_temp + J, A_rows, B_temp, B_cols, 0.0, C_temp, C_cols);
#endif
}
//wait for the first write cycle for the current section (except for first thread)
if (offset)
{
if (pthread_mutex_lock(&first_pass_mutex) != 0) abort();
if (J + P_j <= first_pass)
{
if (pthread_mutex_unlock(&first_pass_mutex) != 0) abort();
}
else
{
if (pthread_cond_wait(&first_pass_cond, &first_pass_mutex) != 0) abort();
if (pthread_mutex_unlock(&first_pass_mutex) != 0) abort();
}
}
//add the partial sum to the current output rows
lock(C);
advise_rows(C, J, 0, P_j * C_cols);
if (P_j == 1)
{
if (I == 0)
for (int L = 0; L < C_cols; L++)
*element(C, J, L) = C_temp[L];
else
for (int L = 0; L < C_cols; L++)
*element(C, J, L) += C_temp[L];
}
else if (C->trans)
{
if (I == 0)
for (int L = 0; L < C_cols; L++)
for (int U = 0; U < P_j; U++)
*element(C, J + U, L) = C_temp[C_cols * U + L];
else
for (int L = 0; L < C_cols; L++)
for (int U = 0; U < P_j; U++)
*element(C, J + U, L) += C_temp[C_cols * U + L];
}
else
{
if (I == 0)
for (int U = 0; U < P_j; U++)
for (int L = 0; L < C_cols; L++)
*element(C, J + U, L) = C_temp[C_cols * U + L];
else
for (int U = 0; U < P_j; U++)
for (int L = 0; L < C_cols; L++)
*element(C, J + U, L) += C_temp[C_cols * U + L];
}
total_done += P_j * P_i;
if (show_progress)
fprintf(stderr, "%.2f%%\t\r", 100. * (double) total_done / (double) (B_rows * C_rows));
//indicate completion of the first pass
if (!offset && threads > 1)
{
if (pthread_mutex_lock(&first_pass_mutex) != 0) abort();
first_pass = J + P_j;
if (pthread_mutex_unlock(&first_pass_mutex) != 0) abort();
if (pthread_cond_broadcast(&first_pass_cond) != 0) abort();
}
advise_rows_done(C, J, 0, P_j * C_cols);
unlock(C);
}
//block until all threads finish the iteration
if (barrier && barrier_iterations-- > 0)
{
barrier_outcome = barrier? pthread_barrier_wait(barrier) : 0;
if (barrier_outcome && barrier_outcome != PTHREAD_BARRIER_SERIAL_THREAD) abort();
}
//reset the "pass" counter for the next iteration
if (!offset && threads > 1)
{
if (pthread_mutex_lock(&first_pass_mutex) != 0) abort();
if (I + P1 * threads < B_cols) first_pass = 0;
if (pthread_mutex_unlock(&first_pass_mutex) != 0) abort();
if (pthread_cond_broadcast(&first_pass_cond) != 0) abort();
}
}
//clean up
munlock(A_temp, A_cache);
munlock(B_temp, B_cache);
munlock(C_temp, C_cache);
free(A_temp);
free(B_temp);
free(C_temp);
}
static void *thread_multiply(void *spec)
{
thread_spec *const self_specs = (thread_spec*) spec;
int barrier_outcome = self_specs->barrier? pthread_barrier_wait(self_specs->barrier) : 0;
if (barrier_outcome && barrier_outcome != PTHREAD_BARRIER_SERIAL_THREAD) abort();
multiply_matrices(self_specs->A, self_specs->B, self_specs->C, self_specs->P1,
self_specs->P2, self_specs->threads, self_specs->offset, self_specs->barrier);
free(spec);
return NULL;
}
//helper functions
static inline ssize_t rows(matrix *M)
{ if (M->trans) return M->cols; else return M->rows; }
static inline ssize_t cols(matrix *M)
{ if (M->trans) return M->rows; else return M->cols; }
static inline int lock(matrix *M)
{ return pthread_mutex_lock(&M->mutex) == 0; }
static inline int unlock(matrix *M)
{ return pthread_mutex_unlock(&M->mutex) == 0; }
static inline m_value *element(matrix *M, ssize_t row, ssize_t col)
{
if (M->trans) return M->data + (col * M->cols /*<-- not a mistake*/ + row);
else return M->data + (col + row * M->cols);
}
static inline ssize_t page_align_low(ssize_t size)
{ return (size / page_size) * page_size; }
static inline ssize_t page_align_high(ssize_t size)
{ if (size % page_size) return ((size / page_size) + 1) * page_size; else return size; }
static inline int advise(matrix *M, ssize_t x, ssize_t y, ssize_t size, int all,
int advice1, int advice2)
{
if (all)
return madvise(M->data, M->map_size, MADV_RANDOM) == 0;
else
return madvise(M->data + page_align_low(y + x * M->cols),
page_align_high(size + y + x * M->cols), MADV_SEQUENTIAL) == 0;
}
static inline int advise_rows(matrix *M, ssize_t row, ssize_t col, ssize_t size)
{ return advise(M, row, col, size, M->trans, MADV_RANDOM, MADV_SEQUENTIAL); }
static inline int advise_cols(matrix *M, ssize_t row, ssize_t col, ssize_t size)
{ return advise(M, col, row, size, !M->trans, MADV_RANDOM, MADV_SEQUENTIAL); }
static inline int advise_rows_done(matrix *M, ssize_t row, ssize_t col, ssize_t size)
{ return advise(M, row, col, size, M->trans, MADV_NORMAL, MADV_NORMAL); }
static inline int advise_cols_done(matrix *M, ssize_t row, ssize_t col, ssize_t size)
{ return advise(M, col, row, size, !M->trans, MADV_NORMAL, MADV_NORMAL); }
Build with gcc -Wall -O2 --std=c99 matrix-times.c -o matrix-times -lpthread -lopenblas.
Kevin Barry
openblas uses a server with worker threads, meaning that you don't have control over the number of threads
Confirmed. Not all BLAS implementations do, fortunately; for example the one GSL ships with (libgslcblas) uses somewhat more total CPU time, but it does not use multiple threads. Hmm.. I should check current versions of ATLAS.
Using GSL_CBLAS, 2048x2048 matrix-matrix multiplication takes about 22 seconds CPU time, and I used that as a basis for my original calculations. I only just noticed it does 1024x1024 in just 3 seconds, which means it has a bug/feature: it is very slow on larger matrices. OpenBLAS seems to use two cores, totaling about two seconds CPU time for 2048x2048 matrix-matrix multiplication.
Assuming 8 cores of comparable CPU power (a mid-line server or node) to multiply two 10240x10240 matrices, the memory use would be about 1GB total (4*32=128MB per thread), bandwidth should be about four 2048x2048 matrices multiplied per second, and maximum I/O needed for 384 MB/s (read) + 128 MB/s (write).
If each thread were to compute one target block matrix, summing the products of a row of left block matrices and a column of right block matrices, the I/O would be cut down to 256MB/s (read) + a bit for writing at end. In theory, two 10240x10240 matrices could be multiplied in 31 seconds wall time (the target matrix being kept in memory, the summation would take very little time, a couple of seconds total overall) -- but the I/O at 200MB/s read-only would stretch that to about 40 seconds wall time. The total memory use would be about 160M per thread, or 1.2G for 8 threads total. However, you'd have to split the entire source matrices first, which at 100MB/s read + 100MB/s write would take about 17 seconds. Add another say 8 seconds for the write afterwards, and the total computation time would be about 70 seconds wall time. My CPU is roughly on par (0.5x - 2.0x) the speed of yours (I'm too lazy to run full 10240x10240 test right now; I apologize), so roughly speaking, this approach seems to be an order of magnitude faster. But keep in mind the much bigger memory use, the numerical stability issues with Strassen algorithm, and that I haven't verified this in real life yet!
(In my original post, I assumed the 22 seconds per 2048x2048 matrix-matrix multiplication, based on GSL_CBLAS. That would give ample time to do the I/O, no hurry. It seems that one tenth of that, or two seconds, should be attainable; that makes I/O again the bottleneck.)
It is rather frustrating to see OpenBLAS do the work efficiently for large enough matrices, but using multiple threads, while GSL_CBLAS has some kind of a brainfart with this large matrices -- but it'd use only one thread.
I don't really need matrix operations for anything myself, I just find it relaxing to work on the gritty details. Trying to come up with an efficient SSE2 implementation that would be easy to interface to. I've already decided on demanding strict alignment (each row of each matrix must start at a 32 byte boundary) and size (each row must be a multiple of four doubles -- thus also naturally aligning the all rows), allowing me to switch to AVX or its successor when I next upgrade my CPU.
(In my original post, I assumed the 22 seconds per 2048x2048 matrix-matrix multiplication, based on GSL_CBLAS. That would give ample time to do the I/O, no hurry. It seems that one tenth of that, or two seconds, should be attainable; that makes I/O again the bottleneck.)
For some reason, using GSL with libgslcblas in my code runs about the same speed as it does with the O(N^3) loop (maybe slightly worse.) You can also link to libopenblas while using the GSL interface, which is convenient in some senses, but it doesn't allow you to easily choose between the two at run-time.
Anyway, at times I do need to multiply huge matrices, so this might be something that's useful for me in the future.
Kevin Barry
LinuxQuestions.org is looking for people interested in writing
Editorials, Articles, Reviews, and more. If you'd like to contribute
content, let us know.