LinuxQuestions.org
Download your favorite Linux distribution at LQ ISO.
Go Back   LinuxQuestions.org > Forums > Non-*NIX Forums > Programming
User Name
Password
Programming This forum is for all programming questions.
The question does not have to be directly related to Linux and any language is fair game.

Notices


Reply
  Search this Thread
Old 08-10-2011, 11:45 PM   #1
golden_boy615
Member
 
Registered: Dec 2008
Distribution: Ubuntu Fedora
Posts: 445

Rep: Reputation: 18
how to have fifo thread mutex in C


hello
I want to write a multithreaded program whose threads all access a shared resource, and I want a FIFO mutex on that resource: the thread that blocked on the mutex first should be the first one unblocked. Is this possible with threads?
 
Old 08-12-2011, 12:48 PM   #2
ta0kira
Senior Member
 
Registered: Sep 2004
Distribution: FreeBSD 9.1, Kubuntu 12.10
Posts: 3,078

Rep: Reputation: Disabled
I don't think it's directly possible with pthread mutexes; however, you can fake it with some pthread barriers, conditions, and mutexes. I was curious about the problem so I came up with a working solution.
Code:
#include <stdio.h>
#include <time.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>


//capacity of the waiting-thread ring buffer in 'thread_queue'
#define QUEUE_SIZE    32
//number of worker threads 'main' creates
#define TOTAL_THREADS 8
//number of times each worker thread asks for the resource
#define ITERATIONS    8


//a queued thread is identified by a pointer to a barrier
typedef pthread_barrier_t *waiting_thread;


//struct to simulate fifo-based mutex-------------------------------------------

//state of the simulated FIFO mutex: a fixed-size ring buffer of waiting
//threads together with the pthread primitives used to coordinate them
typedef struct
{
	int               init;                //non-zero once the queue is initialized
	waiting_thread    threads[QUEUE_SIZE]; //ring buffer of queued barriers
	pthread_mutex_t   mutex;               //guards 'threads', 'front' and 'count'
	pthread_mutex_t   change_mutex;        //second mutex, created and destroyed with the queue
	pthread_cond_t    queue_change;        //broadcast whenever a thread is queued
	pthread_barrier_t queuing_sync;        //two-party barrier: resource holder and queuing thread
	unsigned int      front;               //index of the oldest queued entry
	unsigned int      count;               //number of queued entries
	unsigned int      max;                 //ring capacity
} thread_queue;

//the single queue shared by every thread in this program
static thread_queue main_queue = { 0 };

//END struct to simulate fifo-based mutex---------------------------------------


//functions for dealing with queue----------------------------------------------

//initialize the queue
//returns 1 on success (or if the queue is already initialized) and 0 if any
//primitive could not be created; on failure everything created so far is
//destroyed again so that nothing is leaked and a later retry starts clean
static int init_main_queue()
{
	if (main_queue.init) return 1;

	memset(main_queue.threads, 0, sizeof main_queue.threads);

	if (pthread_mutex_init(&main_queue.mutex, NULL) != 0)
		goto fail;
	if (pthread_mutex_init(&main_queue.change_mutex, NULL) != 0)
		goto fail_mutex;
	if (pthread_cond_init(&main_queue.queue_change, NULL) != 0)
		goto fail_change_mutex;
	//two parties meet at this barrier: the resource holder and the queuing thread
	if (pthread_barrier_init(&main_queue.queuing_sync, NULL, 2) != 0)
		goto fail_queue_change;

	main_queue.front = 0;
	main_queue.count = 0;
	main_queue.max   = QUEUE_SIZE;
	main_queue.init  = 1;
	return 1;

	//unwind in reverse order of creation
fail_queue_change:
	pthread_cond_destroy(&main_queue.queue_change);
fail_change_mutex:
	pthread_mutex_destroy(&main_queue.change_mutex);
fail_mutex:
	pthread_mutex_destroy(&main_queue.mutex);
fail:
	return 0;
}


//clean up the queue
//destroys the queue's primitives and marks the queue as uninitialized so that
//later calls to the queue functions fail cleanly instead of touching destroyed
//objects, and so that 'init_main_queue' can set the queue up again
//always returns 1
static int fini_main_queue()
{
	//(there should probably be some sort of 'main_queue.threads' cleanup)

	if (!main_queue.init) return 1;

	//clear the flag first: every other queue function checks it on entry
	main_queue.init = 0;

	pthread_mutex_destroy(&main_queue.mutex);
	pthread_mutex_destroy(&main_queue.change_mutex);
	pthread_cond_destroy(&main_queue.queue_change);
	pthread_barrier_destroy(&main_queue.queuing_sync);
	return 1;
}


//append a thread's barrier to the tail of the queue (called by that thread)
//returns 1 if the thread was queued, 0 if the queue is uninitialized, could
//not be locked, or is already full
static int push_waiting_thread(waiting_thread tThread)
{
	int queued = 0;

	if (!main_queue.init) return 0;
	if (pthread_mutex_lock(&main_queue.mutex) != 0) return 0;

	if (main_queue.count != main_queue.max)
	{
		unsigned int slot;

		//announce the change first; the announcement is made while the
		//queue lock is still held
		pthread_cond_broadcast(&main_queue.queue_change);

		//the tail of the ring is 'count' entries past 'front'
		slot = (main_queue.front + main_queue.count) % main_queue.max;
		main_queue.threads[slot] = tThread;
		main_queue.count++;
		queued = 1;
	}

	pthread_mutex_unlock(&main_queue.mutex);
	return queued;
}


//block until it is this thread's turn to use the "resource"
//the thread queues its own barrier and then waits at it; returns 1 once the
//barrier wait succeeds and 0 if queuing or the barrier wait failed
static int wait_for_resource(waiting_thread tThread, int iIdentity)
{
	int status;

	if (!push_waiting_thread(tThread)) return 0;

	fprintf(stderr, "thread %i waiting\n", iIdentity);

	//both 0 and PTHREAD_BARRIER_SERIAL_THREAD are successful results
	status = pthread_barrier_wait(tThread);
	if (status == 0) return 1;
	return status == PTHREAD_BARRIER_SERIAL_THREAD;
}


//"unlock" the simulated mutex when finished with it
static int finish_with_resource()
{
	int status;

	if (!main_queue.init) return 0;

	//meet at the two-party barrier shared with the queuing thread
	//both 0 and PTHREAD_BARRIER_SERIAL_THREAD are successful results
	status = pthread_barrier_wait(&main_queue.queuing_sync);
	if (status == 0) return 1;
	return status == PTHREAD_BARRIER_SERIAL_THREAD;
}


//cancellation cleanup handler for 'pop_waiting_thread'
//'pthread_cond_wait' re-acquires the mutex before a cancellation request is
//acted on, so the queue lock has to be released here
static void pop_waiting_thread_unlock(void *pIgnore)
{
	(void) pIgnore;
	pthread_mutex_unlock(&main_queue.mutex);
}


//call the next thread in the queue (from the queuing thread)
//blocks while the queue is empty, then releases the oldest queued thread and
//waits for it to finish with the resource
//returns 1 on success and 0 if the queue is uninitialized, could not be
//locked, or one of the barrier waits failed
static int pop_waiting_thread()
{
	int outcome;
	waiting_thread next;

	//obtain a queue lock
	if (!main_queue.init || pthread_mutex_lock(&main_queue.mutex) != 0) return 0;

	pthread_cleanup_push(&pop_waiting_thread_unlock, NULL);

	//if the queue is empty, block until a thread is added
	//the wait uses the same mutex that 'push_waiting_thread' holds while it
	//broadcasts, so a wake-up cannot slip by unnoticed, and the loop
	//re-checks the count to cope with spurious wake-ups
	while (!main_queue.count)
		pthread_cond_wait(&main_queue.queue_change, &main_queue.mutex);

	next = main_queue.threads[main_queue.front];
	main_queue.front = (main_queue.front + 1) % main_queue.max;
	--main_queue.count;

	//run the handler to release the queue lock
	pthread_cleanup_pop(1);

	//continue the thread
	//(this will cause a segfault if 'next' has been destroyed already)
	outcome = pthread_barrier_wait(next);
	if (outcome && outcome != PTHREAD_BARRIER_SERIAL_THREAD) return 0;

	//wait for the thread to finish with the resource
	outcome = pthread_barrier_wait(&main_queue.queuing_sync);
	return !outcome || outcome == PTHREAD_BARRIER_SERIAL_THREAD;
}

//END functions for dealing with queue------------------------------------------


//threads-----------------------------------------------------------------------

//queuing thread
//hands the resource to the queued threads one at a time, pausing for a tenth
//of a second after every attempt; the loop never ends by itself
static void *queuing_thread(void *iIgnore)
{
	struct timespec interval;

	interval.tv_sec  = 0;
	interval.tv_nsec = 100 * 1000 * 1000;

	if (!main_queue.init) return NULL;

	fprintf(stderr, "queuing thread started\n");

	for (;;)
	{
		pop_waiting_thread();
		nanosleep(&interval, NULL);
	}

	//(never reached: nothing breaks out of the loop above)
	fprintf(stderr, "queuing thread finished\n");
	return NULL;
}


//worker threads
//each worker owns one two-party barrier and uses it to take the resource
//ITERATIONS times, or until a request for the resource fails
//'iIdentity' points to the int used to label this thread's output
static void *worker_thread(void *iIdentity)
{
	const int identity = *(int*) iIdentity;
	int remaining = ITERATIONS;

	fprintf(stderr, "thread %i started\n", identity);

	pthread_barrier_t turn;

	if (pthread_barrier_init(&turn, NULL, 2) != 0) return NULL;

	for (;;)
	{
		//the counter is decremented before it is printed below
		if (remaining-- <= 0) break;
		if (!wait_for_resource(&turn, identity)) break;

		fprintf(stderr, "thread %i has the resource (%i)\n", identity, remaining);
		finish_with_resource();
		fprintf(stderr, "thread %i is done with the resource (%i)\n", identity, remaining);
	}

	pthread_barrier_destroy(&turn);

	fprintf(stderr, "thread %i finished\n", identity);

	return NULL;
}

//END threads-------------------------------------------------------------------


//start the workers one second apart, start the queuing thread, wait for the
//workers to finish and then tear everything down
//returns 0 on success and 1 if the queue or the queuing thread could not be
//set up
int main()
{
	pthread_t queueing;
	pthread_t workers[TOTAL_THREADS];
	//every worker gets its own identity slot; passing the address of the
	//loop counter would let it change while the worker is still reading it
	int       identities[TOTAL_THREADS];
	int       started = 0;
	int       I;

	if (!init_main_queue())
	{
		fprintf(stderr, "unable to initialize the queue\n");
		return 1;
	}

	for (I = 0; I < TOTAL_THREADS; I++)
	{
		identities[I] = I;

		//only threads that were really created are joined later
		if (pthread_create(workers + started, NULL, &worker_thread, identities + I) != 0)
		{
			fprintf(stderr, "unable to create thread %i\n", I);
			continue;
		}
		started++;

		//create threads with a delay so they queue in order to start with
		sleep(1);
	}

	if (pthread_create(&queueing, NULL, &queuing_thread, NULL) != 0)
	{
		//without the queuing thread the workers would never be released,
		//so joining them would hang; leaving 'main' ends the process
		fprintf(stderr, "unable to create the queuing thread\n");
		return 1;
	}

	for (I = 0; I < started; I++)
		pthread_join(workers[I], NULL);

	//the queuing thread loops forever, so it has to be cancelled
	pthread_cancel(queueing);
	pthread_join(queueing, NULL);

	fini_main_queue();
	return 0;
}
As far as I know, the red comments are the only real problems with it, aside from the fact that it requires one pthread_barrier_t per worker thread. This would be an acceptable starting-point to a solution for me, but it might be a bit elaborate for your needs.
Kevin Barry

Last edited by ta0kira; 08-12-2011 at 06:05 PM. Reason: added mutex lock before pthread_cond_wait
 
Old 08-12-2011, 01:03 PM   #3
Nominal Animal
Senior Member
 
Registered: Dec 2010
Location: Finland
Distribution: Xubuntu, CentOS, LFS
Posts: 1,723
Blog Entries: 3

Rep: Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948
I believe futexes (mutexes provided by the Linux kernel) work that way by default -- the futex is granted in the same order it was requested.

You can also build a FIFO mutex using pthreads, if you really need such a type. (Why would you, really?) You'll need a normal mutex, a condition variable (to wake up all waiters via broadcast), and two counters. In C:
Code:
/* FIFO mutex: an ordinary mutex and condition variable plus two ticket
 * counters. 'waiter' is the last ticket handed out and 'worker' is the
 * ticket currently allowed to own the lock. The counters are unsigned so
 * that they wrap around with well-defined behaviour instead of overflowing.
 */
typedef struct fifo_mutex        fifo_mutex_t;
struct fifo_mutex {
        pthread_mutex_t          mutex;
        pthread_cond_t           cond;
        unsigned int             worker;
        unsigned int             waiter;
};
/* Ticket 1 is served first; no ticket has been handed out yet. */
#define FIFO_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER, \
                                 PTHREAD_COND_INITIALIZER, \
                                 1U, 0U }
To lock the FIFO mutex:
  1. First lock the mutex in the structure. This may block.
  2. Increase the waiter counter.
    Save the waiter counter value for later. If the mutex is contended, the saved value will tell when it is this thread's turn to own the mutex.
  3. If the worker counter matches the waiter counter, this thread has locked the FIFO mutex successfully.
    Otherwise:
  4. The thread will wait on the condition variable, releasing the mutex atomically. (The pthread_cond_wait() function does exactly this.)
    When woken, the thread will first obtain the mutex (pthread_cond_wait() does this automatically, too). If the worker counter matches the saved counter value, this thread has grabbed the mutex successfully.
    Otherwise, repeat this step until successful.
To release the FIFO mutex:
  1. Increase the worker counter.
  2. Broadcast on the condition variable, waking up all threads waiting on the condition variable. (The pthread_cond_broadcast() function does exactly this.)
  3. Release the mutex in the structure.
The above is best written into helper functions, say fifo_mutex_lock() and fifo_mutex_unlock(), that can be used similarly to pthread_mutex_lock and pthread_mutex_unlock . (fifo_mutex_trylock is trivial to implement should you need it.)

The above is not susceptible to thread starvation, although the "cost" of acquiring and releasing the mutex will increase as the number of threads blocking on it increases. (If heavily contended, it does suffer from the "thundering herd" problem, since every thread blocking on it will have to be briefly woken up to find the next thread, whenever the mutex changes state.) If uncontended, the above is just two additions "heavier" than a normal mutex.

If a thread is cancelled while blocking on the mutex (waiting on the condition variable), the mutex will end up deadlocking. You can avoid that by changing the thread cancelability state temporarily, before starting to wait on the condition variable, and restoring the state when the mutex has been grabbed.

There are of course totally different approaches, for example a queue or chain of threads, where each thread releasing the mutex will wake up only the next waiting thread. These tend to be much more complex to code correctly than the above, however.

Last edited by Nominal Animal; 08-12-2011 at 01:27 PM.
 
Old 08-12-2011, 02:16 PM   #4
ta0kira
Senior Member
 
Registered: Sep 2004
Distribution: FreeBSD 9.1, Kubuntu 12.10
Posts: 3,078

Rep: Reputation: Disabled
Quote:
Originally Posted by Nominal Animal View Post
I believe futexes (mutexes provided by the Linux kernel) work that way by default -- the futex is granted in the same order it was requested.
That doesn't sound like a simple solution to this problem. From man 2 futex:
Quote:
Callers of this function are expected to adhere to the semantics as set out in futex(7). As these semantics involve writing non-portable assembly instructions, this in turn probably means that most users will in fact be library authors and not general application developers.
Kevin Barry
 
Old 08-12-2011, 04:58 PM   #5
Nominal Animal
Senior Member
 
Registered: Dec 2010
Location: Finland
Distribution: Xubuntu, CentOS, LFS
Posts: 1,723
Blog Entries: 3

Rep: Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948
@ta0kira: No, but glibc pthread library uses futexes by default on Linux. But see the end of this post for a serious caveat.

Here's the implementation of fifo_mutex_t I was thinking about. Save this as fifo-mutex.h for example:
Code:
#ifndef   FIFO_MUTEX_H
#define   FIFO_MUTEX_H
#include <pthread.h>
#include <errno.h>

/* First in, first out mutex type and associated inline functions.
 * Note that the type is not async signal safe, and therefore should
 * not be used in signal handlers; this is due to condition variables
 * used in the structure internally.
*/

/* FIFO mutex state: 'waiter' is the most recently issued ticket and
 * 'worker' is the ticket that currently owns (or may take) the lock.
 */
typedef struct fifo_mutex {
	pthread_mutex_t		mutex;	/* protects 'worker' */
	pthread_cond_t		cond;	/* broadcast on every unlock */
	unsigned int		worker;	/* ticket being served */
	unsigned int		waiter;	/* last ticket handed out */
} fifo_mutex_t;

/* Static initializer: ticket 1 is served first, none handed out yet. */
#define FIFO_MUTEX_INITIALIZER \
	{ PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 1U, 0U }

/* Initialize a FIFO mutex at run time.
 *
 * fifo      - the FIFO mutex to initialize
 * mutexattr - attributes for the internal mutex, or NULL for the defaults
 * condattr  - attributes for the internal condition variable, or NULL
 *
 * Returns 0 on success, EINVAL if fifo is NULL, or the error code reported
 * by pthread_mutex_init() / pthread_cond_init(). On failure nothing is left
 * initialized.
 */
static inline int fifo_mutex_init(fifo_mutex_t *const fifo,
                                   pthread_mutexattr_t *const mutexattr,
                                   pthread_condattr_t *const condattr)
{
	int result;

	if (!fifo)
		return EINVAL;

	result = pthread_mutex_init(&(fifo->mutex), mutexattr);
	if (result)
		return result;

	result = pthread_cond_init(&(fifo->cond), condattr);
	if (result) {
		/* Do not leave a half-initialized structure behind. */
		pthread_mutex_destroy(&(fifo->mutex));
		return result;
	}

	/* Ticket 1 is served first; no ticket has been handed out yet. */
	fifo->worker = 1U;
	fifo->waiter = 0U;

	return 0;
}

/* Release a FIFO mutex held by the calling thread.
 *
 * Advances the 'worker' ticket, wakes every waiter so that the owner of the
 * next ticket can proceed, and releases the internal mutex. The internal
 * mutex is released even if the broadcast fails, so an error never leaves
 * the lock permanently held.
 *
 * Returns 0 on success, EINVAL if fifo is NULL, or the first error code
 * reported by pthread_cond_broadcast() / pthread_mutex_unlock().
 */
static inline int fifo_mutex_unlock(fifo_mutex_t *const fifo)
{
	int broadcast_result, unlock_result;

	if (!fifo)
		return EINVAL;

	fifo->worker++;

	broadcast_result = pthread_cond_broadcast(&(fifo->cond));
	unlock_result    = pthread_mutex_unlock(&(fifo->mutex));

	if (broadcast_result)
		return broadcast_result;

	return unlock_result;
}

/* Acquire a FIFO mutex, blocking until it is the calling thread's turn.
 *
 * The thread atomically takes the next ticket and then waits until the
 * 'worker' counter reaches that ticket. Tickets are unsigned, like the
 * counters they are compared against, so wrap-around is well defined.
 *
 * Cancellation is disabled for the duration of the call and the previous
 * state is restored before returning: a thread cancelled while waiting
 * would never serve its ticket, and every later ticket would block forever.
 *
 * Returns 0 on success, EINVAL if fifo is NULL, or the error code reported
 * by pthread_mutex_lock(). Note that in the last case the ticket has
 * already been taken and cannot be handed back.
 */
static inline int fifo_mutex_lock(fifo_mutex_t *const fifo)
{
	unsigned int ticket;
	int result, cancelstate;

	if (!fifo)
		return EINVAL;

	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancelstate);

	/* Atomic preincrement */
	ticket = __sync_add_and_fetch(&(fifo->waiter), 1U);

	/* Obtain the mutex */
	result = pthread_mutex_lock(&(fifo->mutex));
	if (!result) {
		/* Wait for our turn; the loop also absorbs wake-ups meant
		 * for other tickets and spurious wake-ups. */
		while (ticket != fifo->worker)
			pthread_cond_wait(&(fifo->cond), &(fifo->mutex));
	}

	pthread_setcancelstate(cancelstate, NULL);

	return result;
}

#endif /* FIFO_MUTEX_H */
Here is an example program you can use to test the type.
Code:
#include <pthread.h>
#include <string.h>
#include <errno.h>
#include <stdio.h>

#ifdef USE_PTHREAD_MUTEX_INSTEAD

#define MUTEX_TYPE "pthread_mutex_t"
#define fifo_mutex_t            pthread_mutex_t
#define fifo_mutex_lock(p)      pthread_mutex_lock(p)
#define fifo_mutex_unlock(p)    pthread_mutex_unlock(p)
#define FIFO_MUTEX_INITIALIZER  PTHREAD_MUTEX_INITIALIZER

#else

#define MUTEX_TYPE "fifo_mutex_t"
#include "fifo-mutex.h"

#endif

#ifndef   MAX_THREADS
#define   MAX_THREADS 65536
#endif

#ifndef   LOOPS
#define   LOOPS 10
#endif

/* The lock under test; its type is selected by USE_PTHREAD_MUTEX_INSTEAD. */
fifo_mutex_t	lock = FIFO_MUTEX_INITIALIZER;
/* Set to 1 by each worker as it starts; main() clears it before creating a
 * thread and spins until it is set again. */
volatile int	working = 0;

/* Worker thread body.
 *
 * payload carries the thread's number, cast to a pointer. The thread
 * announces itself, raises the 'working' flag, and then takes the lock
 * LOOPS + 1 times, reporting each time it holds it. The unlock and the next
 * lock are back to back. Always returns NULL.
 */
void *worker(void *payload)
{
	const long id   = (long)payload;
	long       pass = 0L;

	printf("Thread %ld waiting on the %s lock.\n", id, MUTEX_TYPE);
	fflush(stdout);

	/* Let main() go on to create the next thread. */
	working = 1;

	fifo_mutex_lock(&lock);

	while (++pass <= (long)LOOPS) {

		printf("Thread %ld is holding the %s lock; loop %ld of %ld.\n",
		       id, MUTEX_TYPE, pass, (long)LOOPS);
		fflush(stdout);

		/* Release and immediately ask for the lock again. */
		fifo_mutex_unlock(&lock);
		fifo_mutex_lock(&lock);
	}

	printf("Thread %ld is holding the %s lock for the final time.\n",
	       id, MUTEX_TYPE);
	fflush(stdout);

	fifo_mutex_unlock(&lock);

	return NULL;
}

/* Test driver: takes the lock, creates the requested number of worker
 * threads one at a time, releases the lock to start the race, and joins
 * every worker.
 *
 * Exit status: 0 on success, 1 on an invalid thread count, a failure to
 * take the lock or no worker threads at all, 2 on a usage error.
 */
int main(int argc, char *argv[])
{
	pthread_t	thread[MAX_THREADS];
	long		threads, asked, i;
	int		result;
	char		dummy;

	if (argc != 2) {
		fprintf(stderr, "\n"
		                "Usage: %s threads\n"
		                "\n"
		                "This is an example program for the %s type.\n"
		                "\n"
		              , argv[0], MUTEX_TYPE);
		return 2;
	}

	/* The trailing " %c" matches any non-whitespace text after the number,
	 * which makes the conversion count 2 and rejects the argument. */
	if (sscanf(argv[1], "%ld %c", &threads, &dummy) != 1) {
		fprintf(stderr, "%s: Invalid number of threads.\n", argv[1]);
		return 1;
	}
	if (threads < 1) {
		fprintf(stderr, "%s: Too few threads.\n", argv[1]);
		return 1;
	}
	if (threads > MAX_THREADS) {
		fprintf(stderr, "%s: Too many threads. Maximum is %d.\n", argv[1], MAX_THREADS);
		return 1;
	}

	/* Hold the lock while the workers are created, so that they all end up
	 * waiting for it. */
	result = fifo_mutex_lock(&lock);
	if (result) {
		char const *const error = strerror(result);
		fprintf(stderr, "Cannot initialize %s lock: %s [%d].\n", MUTEX_TYPE, error, result);
		return 1;
	}

	/* 'threads' is reused as the count of workers created so far. */
	asked = threads;
	threads = 0L;
	while (threads < asked) {

		/* Clear the working flag, so that we serialize the thread creation. */
		working = 0;

		/* The worker's number (counted from 1) travels in the pointer argument. */
		result = pthread_create(&(thread[threads]), NULL, worker, (void *)(1L + (long)threads));
		if (result) {
			char const *const error = strerror(result);
			fflush(stdout);
			fprintf(stderr, "Warning: Could not create worker thread %ld: %s [%d].\n", 1L + threads, error, result);
			fflush(stderr);
			/* Carry on with the workers created so far. */
			break;
		}

		/* Wait until the thread has set the working flag before continuing. */
		while (!working)
			/* pthread_yield() */
			;

		threads++;
	}

	if (threads < 1) {
		fprintf(stderr, "No worker threads.\n");
		return 1;
	}

	printf("%ld worker threads created.\n", threads);
	printf("Main thread is releasing the %s lock:\n", MUTEX_TYPE);
	fflush(stdout);

	/* Releasing the lock starts the race between the workers. */
	result = fifo_mutex_unlock(&lock);
	if (result) {
		char const *const error = strerror(result);
		fprintf(stderr, "Error releasing %s lock: %s [%d].\n", MUTEX_TYPE, error, result);
	}

	/* A failed join is reported but does not stop the remaining joins. */
	for (i = 0; i < threads; i++) {
		result = pthread_join(thread[i], NULL);
		if (result) {
			char const *const error = strerror(result);
			fprintf(stderr, "Error joining thread %ld of %ld: %s [%d].\n", i + 1L, threads, error, result);
			fflush(stderr);
		}
	}

	printf("All done.\n");
	fflush(stdout);

	return 0;
}
I use __sync_add_and_fetch() to atomically preincrement and fetch the waiter number. The built-in function is provided by most newer compilers (at least GCC and Intel CC). If it is not available, the structure needs a second internal mutex to protect the waiter counter. On architectures where preincrementing an int is atomic, you could use #define __sync_add_and_fetch(ptr, value) (*(ptr) += (value)) instead.

The volatile int working flag is used to synchronize the creation of the threads (and the first printf in the worker, so they are completed in the order the threads are created), and thus the order in which they try to lock the fifo_mutex_t lock. As you can see in the worker code, the other printfs in the worker are done in the order the threads obtain the fifo_mutex_t lock. In other words, the printfs should be reliable indicators of the locking order. (The main thread will initially keep the lock locked, to make the race start from when it releases the lock.)

Each worker thread will take the mutex LOOPS times, in a tight loop. You can see in the code that the unlock and lock are right next to each other. If the fifo_mutex_t works correctly (and the workers have enough work to do to keep at least one thread blocking on it at all times), the same order should be repeated each loop.

If you define preprocessor macro USE_PTHREAD_MUTEX_INSTEAD for example using -DUSE_PTHREAD_MUTEX_INSTEAD when compiling the test program, then the fifo_mutex_t type is replaced with pthread_mutex_t (and the fifo_ functions with the pthread_ equivalents).

On all of the machines I tried (various Linux and one Solaris 10), my fifo_mutex_t worked correctly: the fifo_mutex_t lock was handed to each thread in their calling order (i.e. FIFO, first in first out), even when unlocking and locking the fifo_mutex_t in a tight loop.

On a Solaris 10 machine, pthread_mutex_t lock was always handed in a scrambled order. This means that you cannot rely on FIFO behaviour for pthread_mutex_t on non-Linux machines at all.

On the Linux machines I tested with LOOPS=0, pthread_mutex_t lock was handed to each thread in the calling order (FIFO). However, if the same thread tries to acquire the pthread_mutex_t , i.e. LOOPS>0 in the example program above, the order gets scrambled. This means that you can rely on FIFO order for pthread_mutex_t locks on Linux only the first time a thread acquires that mutex (since the last time the mutex was free, with no threads blocking on it).

I hope you find this useful,

Last edited by Nominal Animal; 08-12-2011 at 04:59 PM.
 
1 members found this post helpful.
Old 08-12-2011, 06:13 PM   #6
ta0kira
Senior Member
 
Registered: Sep 2004
Distribution: FreeBSD 9.1, Kubuntu 12.10
Posts: 3,078

Rep: Reputation: Disabled
Quote:
Originally Posted by Nominal Animal View Post
Code:
        while (waiter != fifo->worker)
            pthread_cond_wait(&(fifo->cond), &(fifo->mutex));
You provided an interesting solution; however, only one thread can block on the pthread condition per mutex. Since you only have one mutex, you can only have one thread blocking on pthread_cond_wait at a time. That means all but one thread waiting for the futex will be eating up resources in a spinlock.

Have you looked at my solution? I'm not saying it's better, but I'd like to hear your comments. I'm sure some combination of our solutions would be ideal.
Kevin Barry
 
Old 08-12-2011, 07:52 PM   #7
ta0kira
Senior Member
 
Registered: Sep 2004
Distribution: FreeBSD 9.1, Kubuntu 12.10
Posts: 3,078

Rep: Reputation: Disabled
Quote:
Originally Posted by Nominal Animal View Post
If a thread is cancelled while blocking on the mutex (waiting on the condition variable), the mutex will end up deadlocking. You can avoid that by changing the thread cancelability state temporarily, before starting to wait on the condition variable, and restoring the state when the mutex has been grabbed.

There are of course totally different approaches, for example a queue or chain of threads, where each thread releasing the mutex will wake up only the next waiting thread. These tend to be much more complex to code correctly than the above, however.
I was inspired by your "chain" suggestion and I implemented a linked-list solution to the problem.
Code:
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>


#define TOTAL_THREADS 8
#define ITERATIONS    8


//one queue entry per thread that is waiting for (or holding) the fifo mutex
struct thread_cell
{
    int                 canceled;   //non-zero once the cell must be skipped
    struct thread_cell *next;       //following cell in the queue, or NULL
    pthread_mutex_t     mutex;      //mutex paired with 'condition'
    pthread_cond_t      condition;  //broadcast when the cell reaches the head
};


//fifo mutex: a linked queue of thread cells plus the two mutexes guarding it
struct fifo_mutex
{
    struct thread_cell *head;            //oldest cell in the queue, or NULL
    struct thread_cell *tail;            //newest cell in the queue, or NULL
    pthread_mutex_t     queue_mutex;     //guards 'head', 'tail' and the cells' links
    pthread_mutex_t     resource_mutex;  //held while a thread owns the resource
};


//the one fifo mutex shared by all worker threads
struct fifo_mutex global_mutex;


//prepare a freshly allocated cell for use in the queue
//returns 1 on success and 0 if the cell's mutex or condition could not be
//created; on failure nothing is left initialized
static int init_thread_cell(struct thread_cell *cCell)
{
    cCell->canceled = 0;
    cCell->next = NULL;

    if (pthread_mutex_init(&cCell->mutex, NULL) != 0) return 0;

    if (pthread_cond_init(&cCell->condition, NULL) != 0)
    {
        //do not leave a half-initialized cell behind
        pthread_mutex_destroy(&cCell->mutex);
        return 0;
    }

    return 1;
}


//retire a cell: destroy its primitives, mark it canceled and detach it
//the cell's memory is not released here; always returns 1
static int fini_thread_cell(struct thread_cell *cCell)
{
    //(return-val checks skipped for clarity)
    pthread_mutex_destroy(&cCell->mutex);
    pthread_cond_destroy(&cCell->condition);

    cCell->next     = NULL;
    cCell->canceled = 1;

    return 1;
}


//set up an empty fifo mutex
//returns 1 on success and 0 if either mutex could not be created; on failure
//nothing is left initialized
static int init_fifo_mutex(struct fifo_mutex *mMutex)
{
    mMutex->head = mMutex->tail = NULL;

    if (pthread_mutex_init(&mMutex->queue_mutex, NULL) != 0) return 0;

    if (pthread_mutex_init(&mMutex->resource_mutex, NULL) != 0)
    {
        //undo the first step so the structure is not half-initialized
        pthread_mutex_destroy(&mMutex->queue_mutex);
        return 0;
    }

    return 1;
}


//tear down a fifo mutex
//any cells still queued are finalized and their memory is released (cells are
//allocated with 'malloc' when a thread queues up), and the queue lock is
//released before it is destroyed, because destroying a locked mutex is
//undefined; no thread may be using the fifo mutex any more when this is called
//always returns 1
static int fini_fifo_mutex(struct fifo_mutex *mMutex)
{
    pthread_mutex_lock(&mMutex->queue_mutex);

    struct thread_cell *current_cell = mMutex->head;
    while (current_cell)
    {
        struct thread_cell *old_cell = current_cell;
        //fetch the successor first: finalizing clears the cell's link
        current_cell = current_cell->next;
        fini_thread_cell(old_cell);
        free(old_cell);
    }

    mMutex->head = mMutex->tail = NULL;

    pthread_mutex_unlock(&mMutex->queue_mutex);

    //(return-val checks skipped for clarity)
    pthread_mutex_destroy(&mMutex->queue_mutex);
    pthread_mutex_destroy(&mMutex->resource_mutex);
    return 1;
}


//argument bundle for the cancellation cleanup handler: which fifo mutex the
//thread was queuing on and which cell it had added
typedef struct
{
    struct fifo_mutex  *mutex;  //fifo mutex whose queue lock guards the cell
    struct thread_cell *cell;   //the cell to mark as canceled
} cleanup_specs;


//cancellation cleanup handler: flag the thread's cell as canceled, under the
//queue lock
static void fifo_wait_cleanup(cleanup_specs *sSpecs)
{
    pthread_mutex_t *const queue_lock = &sSpecs->mutex->queue_mutex;

    //unfortunately this is a potential deadlock if this thread is the next
    //in line for obtaining a lock and the unlock process has started
    pthread_mutex_lock(queue_lock);
    sSpecs->cell->canceled = 1;
    pthread_mutex_unlock(queue_lock);
}


//acquire the fifo mutex, waiting behind every thread that queued earlier
//a new cell is allocated and appended to the queue; unless the queue was
//empty, the thread then waits on its own cell's condition, and finally it
//takes the resource mutex
//returns 1 once the lock is held and 0 if the cell could not be allocated or
//initialized (in which case the queue is left untouched)
//NOTE(review): the wait below has no predicate loop and uses a different mutex
//than the one held by the broadcasting side, so a spurious or early wake-up is
//not handled; fixing that needs a coordinated change to 'fifo_mutex_unlock'
static int fifo_mutex_lock(struct fifo_mutex *mMutex)
{
    //(certain obvious checks skipped for clarity)

    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

    //allocate and set up the cell before touching the queue, so that a
    //failure can be reported without anything to undo
    struct thread_cell *new_cell = malloc(sizeof *new_cell);
    if (!new_cell)
    {
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
        return 0;
    }
    if (!init_thread_cell(new_cell))
    {
        free(new_cell);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
        return 0;
    }

    pthread_mutex_lock(&mMutex->queue_mutex);

    cleanup_specs cancel_specs = { mMutex, new_cell };
    pthread_cleanup_push((void(*)(void*)) &fifo_wait_cleanup, (void*) &cancel_specs);

    //an empty queue means nobody holds the lock: no waiting needed
    int first_cell = mMutex->tail == NULL;

    if (!first_cell) mMutex->tail->next = new_cell;
    else             mMutex->head = new_cell;
    mMutex->tail = new_cell;

    pthread_mutex_unlock(&mMutex->queue_mutex);
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_testcancel();


    pthread_mutex_lock(&new_cell->mutex);

    if (!first_cell)
    pthread_cond_wait(&new_cell->condition, &new_cell->mutex);

    //past the wait the thread is no longer removed from the queue on cancel
    pthread_cleanup_pop(0);

    pthread_mutex_lock(&mMutex->resource_mutex);

    //NOTE: no need to unlock 'new_cell->mutex' since destruction in
    //'fifo_mutex_unlock' will (or already has) done that

    return 1;
}


//release the fifo mutex and hand it to the next thread in the queue
//the resource mutex is released, the cell at the head of the queue is
//finalized and freed, and the first following cell that is not marked as
//canceled has its condition broadcast; canceled cells met on the way are
//finalized and freed as well
//cancellation is disabled while the queue is manipulated and is switched to
//enabled (not restored to its previous state) afterwards
//always returns 1
static int fifo_mutex_unlock(struct fifo_mutex *mMutex)
{
    //(certain obvious checks skipped for clarity)

    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
    pthread_mutex_lock(&mMutex->queue_mutex);

    pthread_mutex_unlock(&mMutex->resource_mutex);

    struct thread_cell *current = mMutex->head;

    while (current)
    {
    //drop the head cell and move on to its successor
    //NOTE(review): 'fini_thread_cell' destroys the cell's mutex, which
    //'fifo_mutex_lock' leaves locked; POSIX leaves destroying a locked mutex
    //undefined -- confirm this is acceptable on the target platform
    mMutex->head = mMutex->head->next;
    fini_thread_cell(current);
    free(current);
    current = mMutex->head;

    //the queue has run empty
    if (!mMutex->head) mMutex->tail = NULL;

    //a live successor: wake it up and stop; a canceled successor falls
    //through to the next pass of the loop and is dropped in turn
    else if (!mMutex->head->canceled)
     {
    pthread_cond_broadcast(&mMutex->head->condition);
    break;
     }
    }

    pthread_mutex_unlock(&mMutex->queue_mutex);
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_testcancel();

    return 1;
}


//worker thread: after a one second pause, take the fifo mutex ITERATIONS
//times (or until locking fails), holding it for a tenth of a second each time
//'iIdentity' points to the int used to label this thread's output
static void *worker_thread(void *iIdentity)
{
    struct timespec hold_time;
    const int identity = *(int*) iIdentity;
    int remaining = ITERATIONS;

    hold_time.tv_sec  = 0;
    hold_time.tv_nsec = 100 * 1000 * 1000;

    fprintf(stderr, "thread %i started\n", identity);

    sleep(1);

    for (;;)
    {
        //the counter is decremented before it is printed below
        if (remaining-- <= 0) break;
        if (!fifo_mutex_lock(&global_mutex)) break;

        fprintf(stderr, "thread %i has the resource (%i)\n", identity, remaining);
        nanosleep(&hold_time, NULL);
        fprintf(stderr, "thread %i is done with the resource (%i)\n", identity, remaining);
        fifo_mutex_unlock(&global_mutex);
    }

    fprintf(stderr, "thread %i finished\n", identity);

    return NULL;
}


//start the workers a hundredth of a second apart, wait for all of them and
//tear the fifo mutex down
//returns 0 when done
int main()
{
    struct timespec delay = { 0, 10 * 1000 * 1000 };

    pthread_t workers[TOTAL_THREADS];
    //every worker gets its own identity slot; passing the address of the
    //loop counter would let it change while the worker is still reading it
    int       identities[TOTAL_THREADS];
    int       started = 0;

    init_fifo_mutex(&global_mutex);

    int I;
    for (I = 0; I < TOTAL_THREADS; I++)
    {
        identities[I] = I;

        //only threads that were really created are joined later
        if (pthread_create(workers + started, NULL, &worker_thread, identities + I) != 0)
        {
            fprintf(stderr, "unable to create thread %i\n", I);
            continue;
        }
        started++;

        //create threads with a delay so they queue in order to start with
        nanosleep(&delay, NULL);
    }

    for (I = 0; I < started; I++)
        pthread_join(workers[I], NULL);

    fini_fifo_mutex(&global_mutex);
    return 0;
}
It should be safe to cancel a thread while it's waiting for a lock (it's been fine in my rudimentary tests so far). I'm not really sure what the costs are for creating and destroying mutexes and conditions for every lock, but it seems to be the only solution that allows thread cancellation and no queuing thread.
Kevin Barry

Last edited by ta0kira; 08-12-2011 at 08:14 PM. Reason: added cancelation points, added potential-deadlock point-out
 
Old 08-12-2011, 08:59 PM   #8
Nominal Animal
Senior Member
 
Registered: Dec 2010
Location: Finland
Distribution: Xubuntu, CentOS, LFS
Posts: 1,723
Blog Entries: 3

Rep: Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948
Quote:
Originally Posted by ta0kira View Post
You provided an interesting solution; however, only one thread can block on the pthread condition per mutex.
No, there is no such limit. (If there were, it would make no sense having separate pthread_cond_signal() and pthread_cond_broadcast() functions.)

Each thread hitting the pthread_cond_wait(cond,mutex) will atomically unlock the mutex and then wait on the condition variable. (The "atomic" just means it is not possible for a condition to slip in between unnoticed, as long as the condition signaler or broadcaster holds that same mutex.)

When the condition variable is broadcast, each and every thread waiting on it is woken up, internally automatically re-acquiring the mutex in turn. With futex-based pthread mutexes, that will happen in FIFO order, as long as none of the threads will try to re-acquire mutex (or there are no other threads blocking on the mutex).

In my implementation, each woken up thread will check if it is their turn, and if not, go back to waiting on the condition variable, again atomically releasing the mutex. Those threads that were woken up but did not get the mutex before the correct thread, will keep on blocking on the mutex until the correct thread releases it, and they get to run. They will eventually get to run, because the scheduler will give each thread some CPU time, eventually. If it is then still not their time to run, they will go back to waiting on the condition variable (again, releasing the mutex atomically).

Therefore, there may be any number of threads, with some blocking on the mutex, and others waiting on the condition variable. There are no limits as far as I can see. I did run tests with 300 threads (on both 32-bit and 64-bit unicore and multicore CPUs). For more threads I'd need to set a smaller per-thread stack.

Quote:
Originally Posted by ta0kira View Post
Have you looked at my solution? I'm not saying it's better, but I'd like to hear your comments. I'm sure some combination of our solutions would be ideal.
Not too closely; I thought it is more complex than really necessary. I'd need to draw a graph to understand the scheme well enough to comment on it -- and I was too lazy to do that, sorry.
It does look interesting, especially the queue approach. Is there a specific feature I should have noticed?
I dislike arbitrary limits, though. (But I do think it is possible to change the array to a dynamically grown one if necessary.)

Perhaps a third scheme would work better?

Use a queue (a circular buffer?) of mutexes, so that each thread blocks on the mutex belonging to its predecessor, with indices either accessed using atomic builtins or protected by a separate mutex. Each thread will first lock their own (new) mutex. If there is no predecessor mutex, the thread then owns the FIFO lock. Otherwise, the thread will lock the predecessor mutex, blocking on it. When the thread obtains the mutex, it will unlock and discard it. Then it will own the FIFO lock. To release the FIFO lock, the thread will just unlock its own mutex.

At any point, there will be at most one thread blocking on each mutex. The correct thread is thus always woken up, when the holding thread releases its mutex. (The locking and unlocking order in the code is critical, to avoid the possibility of deadlock.)

If the mutexes are stored in a circular buffer, the buffer cannot be reallocated while there are mutexes in it (because the kernel depends on the address when there is a thread blocked on a futex in Linux). A linked list would allow dynamic growth, but to eliminate malloc() overhead, one would have to use allocation pools; thus more complex code.

This third scheme should be able to rely only on mutexes, so it should be usable wherever mutexes are. Condition variables cannot be used in signal handlers, for example. None of the pthread_mutex functions are cancellation points, so this third scheme would have no pthread cancellation points either. (Both our implementations currently have cancellation points, although by default, pthreads cancellations are deferred, so it should not matter.) Unlike my implementation, this scheme should not suffer from the thundering herd problem: each mutex is only blocked by a single thread. The "cost" should also be fixed, not depend on the number of threads blocking on the structure. I believe it would be more efficient in general than either of our implementations.
 
Old 08-12-2011, 09:45 PM   #9
ta0kira
Senior Member
 
Registered: Sep 2004
Distribution: FreeBSD 9.1, Kubuntu 12.10
Posts: 3,078

Rep: Reputation: Disabled
Quote:
Originally Posted by Nominal Animal View Post
No, there is no such limit. (If there were, it would make no sense having separate pthread_cond_signal() and pthread_cond_broadcast() functions.)
Until now I was under the impression that each thread had its own mutex but there was only one condition, which would still have been conducive to separate signal/broadcast functions. I guess I misinterpreted the implications of this section of the manpage:
Quote:
pthread_cond_wait atomically unlocks the mutex (as per pthread_unlock_mutex) and waits for the condition variable cond to be signaled. The thread execution is suspended and does not consume any CPU time until the condition variable is signaled. The mutex must be locked by the calling thread on entrance to pthread_cond_wait. Before returning to the calling thread, pthread_cond_wait re-acquires mutex (as per pthread_lock_mutex).
It hadn't occurred to me that the mutex being unlocked during pthread_cond_wait allowed it to be locked by another thread joining the wait. I suppose I thought the mutex was what allowed each thread to block and then continue all at once.
Kevin Barry
 
Old 08-12-2011, 11:08 PM   #10
ta0kira
Senior Member
 
Registered: Sep 2004
Distribution: FreeBSD 9.1, Kubuntu 12.10
Posts: 3,078

Rep: Reputation: Disabled
Quote:
Originally Posted by Nominal Animal View Post
Perhaps a third scheme would work better?

Use a queue (a circular buffer?) of mutexes, so that each thread blocks on the mutex belonging to its predecessor, with indices either accessed using atomic builtins or protected by a separate mutex. Each thread will first lock their own (new) mutex. If there is no predecessor mutex, the thread then owns the FIFO lock. Otherwise, the thread will lock the predecessor mutex, blocking on it. When the thread obtains the mutex, it will unlock and discard it. Then it will own the FIFO lock. To release the FIFO lock, the thread will just unlock its own mutex.

At any point, there will be at most one thread blocking on each mutex. The correct thread is thus always woken up, when the holding thread releases its mutex. (The locking and unlocking order in the code is critical, to avoid the possibility of deadlock.)
This sounds like it would work. My first solution used a queue of barriers and an extra "queuing" thread to continue the next in line. The barriers made sure the queuing thread didn't get too far ahead, which wouldn't be a problem if each thread continued the next (as in your suggestion and in my second solution). Given the lack of possible cancellation you're suggesting, though, I don't think this is an improvement over your first solution. The main purpose of my second solution was to allow thread cancellation without consequences, and also to remove the extra thread.
Quote:
Originally Posted by Nominal Animal View Post
A linked list would allow dynamic growth, but to eliminate malloc() overhead, one would have to use allocation pools; thus more complex code.
I can only imagine an allocation pool here as a pair of singly-linked lists embedded in an array, which would either be a fixed size or have the overhead of list regeneration when resized. I'd expect a malloced array to be the better solution if dynamic sizing is to be possible, and % can facilitate circularity (which I used in my first solution). In any case, those details can be made sufficiently opaque, and perhaps even abstracted out with callbacks. The procedure used to pass the lock is more important than the method of sequencing the data structures.

You might as well just add this to the lock/unlock functions in your first solution and call it good:
Code:
//first line of function -->
int old_state = PTHREAD_CANCEL_ENABLE;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);

//function stuff here...

//before return -->
pthread_setcancelstate(old_state, NULL);
Kevin Barry

PS This might be nitpicking, but I'm wondering about idiosyncrasies related to possible integer overflow in the indexing used in your first solution. I haven't used an architecture that hasn't wrapped in such cases, however, and in fact I don't even know of one.

Last edited by ta0kira; 08-12-2011 at 11:10 PM.
 
Old 08-13-2011, 01:41 AM   #11
Nominal Animal
Senior Member
 
Registered: Dec 2010
Location: Finland
Distribution: Xubuntu, CentOS, LFS
Posts: 1,723
Blog Entries: 3

Rep: Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948
Quote:
Originally Posted by ta0kira View Post
This sounds like it would work. My first solution used a queue of barriers and an extra "queuing" thread to continue the next in line. The barriers made sure the queuing thread didn't get too far ahead, which wouldn't be a problem if each thread continued the next (as in your suggestion and in my second solution).
Right.

Quote:
Originally Posted by ta0kira View Post
The main purpose of my second solution was to allow thread cancellation without consequences, and also to remove the extra thread.
I can see how that might be useful.

On the other hand, I've personally never needed thread cancellation at all; my solutions tend to use thread pools that do a bit of work, and then either wait for new work or exit. In some cases it is useful to have many more threads than available CPU cores, in which case a FIFO mutex would likely be heavily contended. The suggested scheme should have all the benefits of my original solution, but not suffer from the "thundering herd" problem (where all threads must be briefly woken up whenever the mutex changes).

On the gripping hand, I really cannot imagine a real need for a FIFO mutex either. I suspect only Windows programmers might find it useful.. but I may be wrong.

Quote:
Originally Posted by ta0kira View Post
I can only imagine an allocation pool here as a pair of singly-linked lists embedded in an array, which would either be a fixed size or have the overhead of list regeneration when resized. I'd expect a malloced array to be the better solution if dynamic sizing is to be possible, and % can facilitate circularity (which I used in my first solution).
Yeah, except the arrays containing active mutexes must not be resized, because the kernel will get confused if their address suddenly changes. I was thinking of basically
Code:
/* One pool-allocated queue entry: a mutex plus its list linkage. */
struct mutex_list {
	pthread_mutex_t    lock;
	struct mutex_list *next;
	/* NOTE(review): presumably the pool this item belongs to, so that
	 * pfree_mutex() can find the right free bitmap -- confirm. */
	struct mutex_pool *pool;
};
   
/* One allocation pool of mutex_list items; pools are chained via `next`. */
struct mutex_pool {
	struct mutex_pool *next;
	unsigned long	   free;   /* Each bit set for a free item */
	/* Flexible array member holding the items themselves.
	 * NOTE(review): with a single unsigned long as the free bitmap, a pool
	 * can presumably hold at most as many items as that type has bits --
	 * confirm against the allocator. */
	struct mutex_list  item[];
};

/* Sketch only (body elided): intended to return the first free item of the
 * first pool in the chain *p that has free items. */
static inline struct mutex_list *palloc_mutex(struct mutex_pool **p) {  ...  }
/* Sketch only (body elided): intended to mark item m free, push its pool
 * further down the chain *p, and release the pool once it is entirely empty.
 * NOTE(review): the meaning of the returned pointer is not specified. */
static inline struct mutex_list *pfree_mutex(struct mutex_pool **p, struct mutex_list *m) {  ...  }
where palloc_mutex() would get the first free item in the first pool with free items, and pfree_mutex() would mark the item free, and push that pool further down the list, freeing the entire pool if it becomes empty. Pushing the pool down the list means it will be less likely to be used for new allocations; increasing the chance of being able to release a totally empty pool.

Quote:
Originally Posted by ta0kira View Post
You might as well just add this to the lock/unlock functions in your first solution and call it good: [pthread_setcancelstate()]
Yes, but the disabling call must be in the _lock() function, before the first pthread_cond_wait(), and the restoring call must be in the _unlock() function, after the last mutex is unlocked, so that the mutex is not left locked if the thread is canceled. I was too lazy to think about how to keep that extra state around.

Quote:
Originally Posted by ta0kira View Post
This might be nitpicking, but I'm wondering about idiosyncrasies related to possible integer overflow in the indexing used in your first solution.
C99 defines unsigned integer math to use modulo arithmetic (section 6.3.1.3, I believe, if you happen to have the standard), i.e. adding one to the largest possible unsigned value (of any unsigned integer type) will yield zero. So, using a C99 compiler, the indexing should work correctly, even when the overflow occurs.

On the other hand, if you use a circular buffer and atomic built-ins to modify the indices, you must use either a power of two buffer size (so that the index values can just be ANDed with (size-1) to get the actual indices), or do a load-adjust-compare-swap in a loop using e.g. __sync_bool_compare_and_swap(). It will still be faster than using a mutex. (I once wrote an atomic addition function for double-precision floating point around __sync_bool_compare_and_swap(), for testing purposes. It has surprisingly little overhead on most x86 CPUs I tested. Measurable, but less than using a mutex.)
 
Old 08-13-2011, 10:43 AM   #12
ta0kira
Senior Member
 
Registered: Sep 2004
Distribution: FreeBSD 9.1, Kubuntu 12.10
Posts: 3,078

Rep: Reputation: Disabled
Quote:
Originally Posted by Nominal Animal View Post
On the other hand, I've personally never needed thread cancellation at all; my solutions tend to use thread pools that do a bit of work, and then either wait for new work or exit. In some cases it is useful to have many more threads than available CPU cores, in which case a FIFO mutex would likely be heavily contended. The suggested scheme should have all the benefits of my original solution, but not suffer from the "thundering herd" problem (where all threads must be briefly woken up whenever the mutex changes).
I've only used thread cancellation where a blocking read was the cancellation point, but never anything mutex-related. I forgot about the herd thing. In that case, I think the solution we just discussed is appropriate.
Quote:
Originally Posted by Nominal Animal View Post
On the gripping hand, I really cannot imagine a real need for a FIFO mutex either. I suspect only Windows programmers might find it useful.. but I may be wrong.
Yeah, I can't think of a reason either, unless the operations performed while the mutex is locked are lengthy and "fairness" is an issue.
Quote:
Originally Posted by Nominal Animal View Post
Yeah, except the arrays containing active mutexes must not be resized, because the kernel will get confused if their address suddenly changes. I was thinking of basically
Code:
struct mutex_list {
    pthread_mutex_t    lock;
    struct mutex_list *next;
    struct mutex_pool *pool;
};
   
struct mutex_pool {
    struct mutex_pool *next;
    unsigned long       free;   /* Each bit set for a free item */
    struct mutex_list  item[];
};

static inline struct mutex_list *palloc_mutex(struct mutex_pool **p) {  ...  }
static inline struct mutex_list *pfree_mutex(struct mutex_pool **p, struct mutex_list *m) {  ...  }
where palloc_mutex() would get the first free item in the first pool with free items, and pfree_mutex() would mark the item free, and push that pool further down the list, freeing the entire pool if it becomes empty. Pushing the pool down the list means it will be less likely to be used for new allocations; increasing the chance of being able to release a totally empty pool.
I don't entirely understand your wording; however, it made me realize that this might be an acceptable method of pooling:
Code:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>


struct element;

// A heap-allocated mutex that can be parked in a pool's free list.
struct element
{
	// Next element in the pool's free list; cleared when the element is handed out.
	struct element  *next;
	// The pooled mutex itself; initialized when the element is first malloc()ed.
	pthread_mutex_t  mutex;
};


// A free list of reusable elements, guarded by its own mutex.
// Field order matters: POOL_INIT initializes the members positionally.
struct pool
{
	// Held by every function that reads or modifies `head`.
	pthread_mutex_t  mutex;
	// Singly-linked stack of idle elements; NULL when the pool is empty.
	struct element  *head;
};


// Static initializer for an empty struct pool (unlocked mutex, no free elements).
#define POOL_INIT { .mutex = PTHREAD_MUTEX_INITIALIZER, .head = NULL }


// Run-time initialization of a pool: empty free list, default-attribute mutex.
static void init_pool(struct pool *pPool)
{
	pPool->head = NULL;
	pthread_mutex_init(&pPool->mutex, NULL);
}


// Destroy a pool: every element still parked in the free list has its mutex
// destroyed and its memory released, then the pool's own mutex is destroyed.
// Elements that are currently handed out are not tracked and are not touched.
static void fini_pool(struct pool *pPool)
{
	struct element *victim;

	pthread_mutex_lock(&pPool->mutex);

	// Pop elements off the free list one at a time until it is empty.
	while ((victim = pPool->head) != NULL)
	{
		pPool->head = victim->next;
		pthread_mutex_destroy(&victim->mutex);
		free(victim);
	}

	pthread_mutex_unlock(&pPool->mutex);
	pthread_mutex_destroy(&pPool->mutex);
}


// Hand out an element with an initialized, unlocked mutex.
//
// An idle element is popped from the pool's free list when one is available;
// otherwise a fresh one is malloc()ed and its mutex initialized.
//
// Returns the element (with next == NULL), or NULL if memory allocation or
// mutex initialization fails.
static struct element *alloc_element(struct pool *pPool)
{
	struct element *new_element;

	// Only the free-list manipulation needs the pool lock.
	pthread_mutex_lock(&pPool->mutex);
	new_element = pPool->head;
	if (new_element) pPool->head = new_element->next;
	pthread_mutex_unlock(&pPool->mutex);

	if (!new_element)
	{
		new_element = malloc(sizeof *new_element);
		if (!new_element) return NULL;

		// Never hand out an element whose mutex could not be initialized.
		if (pthread_mutex_init(&new_element->mutex, NULL) != 0)
		{
			free(new_element);
			return NULL;
		}
	}

	new_element->next = NULL;
	return new_element;
}


// Return an element to the pool's free list for later reuse.
//
// The element's mutex is kept initialized (it is not destroyed here), so it
// must be unlocked when handed back. A NULL element is ignored.
static void free_element(struct pool *pPool, struct element *eElement)
{
	if (!eElement) return;

	pthread_mutex_lock(&pPool->mutex);

	// Push onto the head of the free list.
	eElement->next = pPool->head;
	pPool->head    = eElement;

	pthread_mutex_unlock(&pPool->mutex);
}


// Demo: take one mutex from the pool, lock and unlock it, then give it back
// and tear the pool down.
int main(void)
{
	struct pool mutex_pool = POOL_INIT;

	struct element *new_mutex = alloc_element(&mutex_pool);

	// alloc_element() reports failure with NULL; do not dereference it.
	if (!new_mutex)
	{
		fprintf(stderr, "unable to allocate a mutex\n");
		fini_pool(&mutex_pool);
		return EXIT_FAILURE;
	}

	if (pthread_mutex_lock(&new_mutex->mutex) == 0)
	{
		fprintf(stderr, "locked\n");
	}

	//some stuff...

	if (pthread_mutex_unlock(&new_mutex->mutex) == 0)
	{
		fprintf(stderr, "unlocked\n");
	}

	free_element(&mutex_pool, new_mutex);

	fini_pool(&mutex_pool);

	return EXIT_SUCCESS;
}
Is that closer to what you had in mind?
Kevin Barry

Last edited by ta0kira; 08-13-2011 at 10:48 AM. Reason: removed pthread_mutex_destroy when throwing elements into pool
 
Old 08-13-2011, 03:20 PM   #13
Nominal Animal
Senior Member
 
Registered: Dec 2010
Location: Finland
Distribution: Xubuntu, CentOS, LFS
Posts: 1,723
Blog Entries: 3

Rep: Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948
Quote:
Originally Posted by ta0kira View Post
I don't entirely understand your wording
Yeah, sorry.

The idea is that the allocation "pool" is actually a chain of pools.
Whenever an item is allocated, the first pool in the chain with free items is used. If there are no free items in the chain at all, an empty pool is prepended to the list. Whenever an item is deallocated, the pool is also pushed down the chain (unless it is the first or last pool in the chain). That will push sparse pools down the chain, and we should end up with mostly full pools with a few sparse ones.

... but some test coding indicates the above is not worth the effort. It gets pretty complex fast, but is not faster than malloc()/free().

I think it is more efficient to keep a separate linked list to reuse recently released items, with all items individually malloc()ed. When there are no items to reuse, just malloc() a new one. When releasing, just move the item to the separate list. After every N releases, one could free() excess elements in the separate list; note that N is also the maximum number of additional items the separate list could have.

The code is very similar to your latest example, except with the reuse/unused list added; very little added complexity. One could even let N vary based on contention (just add suitable counters to the structure): with low contention, the unused list would be kept small, but larger when there is more contention. I guess that would pretty much eliminate the malloc()/free() overhead, because in most situations the items would be just recycled endlessly, without any malloc()/free() calls. (malloc()/free() would get called only for "bursty" contention, i.e. whenever the contention on the lock changes.)
 
Old 08-13-2011, 05:56 PM   #14
Nominal Animal
Senior Member
 
Registered: Dec 2010
Location: Finland
Distribution: Xubuntu, CentOS, LFS
Posts: 1,723
Blog Entries: 3

Rep: Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948Reputation: 948
Here is an example implementation of the scheme I described in my post above. I'm not sure it is even interesting.
  1. It uses only mutexes (pthread_mutex_t), so it has the same semantics wrt. cancellation.
  2. Because memory is allocated dynamically (malloc()/free()), it cannot be used in a signal handler.
  3. Each blocking thread will dynamically allocate a mutex and a pointer, so the size of the structure will grow as the number of blocking threads increases.
  4. Unused mutex and pointer pairs will be cached in a separate list, to avoid excess malloc()/free() overhead.
    When the FIFO mutex structure is contended, the cache only grows.
    When the FIFO mutex is uncontended, the cache shrinks slowly.
  5. You can call fifo_mutex_trim(&lock) at any time to discard the unused cached entries.
    (You do not need to have the FIFO mutex locked.)
  6. Each thread will block on a pthread_mutex_t owned and locked by their predecessor.
    Thus, there should be no "thundering herd" issue even with a lot of threads blocking on the FIFO mutex.
  7. Interface is modeled to be compatible with pthread_mutex_() functions.
  8. It was lightly tested on some Linux distros and Solaris 10, with up to 300 threads without problems.
    Still, I expect there to be some bugs lurking around in it.
    I also have not benchmarked this against the other implementations.
Code:
#ifndef   FIFO_MUTEX_H
#define   FIFO_MUTEX_H
#include <pthread.h>

/* fifo_mutex_t lock = FIFO_MUTEX_INITIALIZER;
 *	This creates a FIFO mutex, with semantics similar to
 *	pthread_mutex_t, except that the lock is granted in
 *	the calling order (but not recursively by the same thread).
 *
 * int result = fifo_mutex_init(&lock, NULL);
 *	Initialize the FIFO mutex. Similar to pthread_mutex_init().
 *	The second parameter is the pthread_mutexattr_t pointer,
 *	which defines the attributes for the structure-locking mutex;
 *	it is recommended to always supply NULL.
 *
 * int result = fifo_mutex_destroy(&lock);
 *	Destroy the FIFO mutex. Similar to pthread_mutex_destroy().
 *
 * int result = fifo_mutex_lock(&lock);
 *	Lock the FIFO mutex. Similar to pthread_mutex_lock().
 *
 * int result = fifo_mutex_unlock(&lock);
 *	Unlock the FIFO mutex. Similar to pthread_mutex_unlock().
 *
 * int result = fifo_mutex_trim(&lock);
 *	Remove cached entries from the FIFO mutex.
 *	When contended, the FIFO mutex will grow automatically.
 *	When uncontended, the cached entries will be slowly released.
 *	You can use this function to remove all unused cached entries
 *	from the mutex. There should be no need to call this function
 *	during normal program operation, but it can be called at any
 *	time.
 *
 * int result = fifo_mutex_count_unused(&lock);
 *	Return the number of unused entries cached in the FIFO mutex.
 *
 * int result = fifo_mutex_count_list(&lock);
 *	Return the number of entries in the FIFO mutex list.
 *	Since cleanup is done by the next thread to lock the mutex,
 *	the result includes both the number of threads blocked on
 *	this FIFO mutex, plus the number of threads to clean up.
 *	If the current thread holds the FIFO mutex, then the count
 *	is 1 + the number of other threads blocking on the mutex.
*/
/* A mutex that is granted to callers in the order they asked for it.
 * Field order matters: FIFO_MUTEX_INITIALIZER initializes positionally.
*/
typedef struct fifo_mutex	 fifo_mutex_t;

struct fifo_mutex {
	/* Lock protecting all fields. It is held only while the structure is
	 * inspected or modified; a waiting thread releases it before blocking
	 * on its predecessor's entry. */
	pthread_mutex_t		 control;

	/* List of unused locks (allocation cache). Entries here are unlocked
	 * and are reused by later lock calls before anything is malloc()ed. */
	struct fifo_mutex_list	*unused;

	/* Mutex list: one entry per locker, newest arrival at the head.
	 * Each new arrival blocks on the entry that was the head before it. */
	struct fifo_mutex_list	*list;

	/* Pointer to the active mutex in the list (tail): the entry of the
	 * thread currently holding the FIFO mutex, or NULL when it is free. */
	struct fifo_mutex_list	*active;

	/* Owner identity; checked by unlock, and only while active is non-NULL. */
	pthread_t		 owner;
};

/* Static initializer for an unlocked, empty FIFO mutex.
 * The owner field is intentionally not named: pthread_t need not be an
 * arithmetic type, so no integer constant can portably be cast to it.
 * It is zero-initialized implicitly, which is sufficient because owner is
 * only examined while active is non-NULL.
*/
#define FIFO_MUTEX_INITIALIZER { .control = PTHREAD_MUTEX_INITIALIZER, \
                                 .unused  = NULL, \
                                 .list    = NULL, \
                                 .active  = NULL }

/*
 * Implementation.
*/

#include <stdlib.h>
#include <errno.h>

/* A singly-linked list of mutexes.
 * The same node type is used both for the queue of lockers and for the
 * cache of unused entries.
*/
struct fifo_mutex_list {
	/* Held by the thread that queued this entry until it releases the
	 * FIFO mutex; its successor blocks on it. */
	pthread_mutex_t		 lock;
	/* In the queue: the entry that arrived just before this one.
	 * In the cache: the next unused entry. */
	struct fifo_mutex_list	*next;
};

/* Initialize a FIFO mutex. The attribute only affects the control mutex in the structure.
 *
 * Returns EINVAL if m is NULL, otherwise the result of pthread_mutex_init()
 * on the control mutex (0 on success).
*/
static inline int fifo_mutex_init(fifo_mutex_t *const m, pthread_mutexattr_t const *const a)
{
	if (!m)
		return EINVAL;

	m->unused = NULL;
	m->list   = NULL;
	m->active = NULL;

	/* pthread_t is not guaranteed to be an arithmetic type, so an integer
	 * cannot portably be cast to it. A zero-initialized placeholder is
	 * enough: owner is only examined while active is non-NULL. */
	m->owner  = (pthread_t){0};

	return pthread_mutex_init(&(m->control), a);
}

/* Tear down a FIFO mutex, releasing every entry it still owns.
 *
 * Returns EINVAL for a NULL pointer, the pthread_mutex_lock() error if the
 * control mutex cannot be taken, EBUSY if the FIFO mutex is held or has
 * threads queued on it, and otherwise the result of destroying the control
 * mutex. Note that the cache of unused entries is released even when EBUSY
 * is returned.
*/
static inline int fifo_mutex_destroy(fifo_mutex_t *const m)
{
	struct fifo_mutex_list *node, *rest;
	int                     status;

	if (m == NULL)
		return EINVAL;

	status = pthread_mutex_lock(&m->control);
	if (status != 0)
		return status;

	/* Drop the allocation cache first; this happens unconditionally. */
	rest = m->unused;
	m->unused = NULL;
	while (rest != NULL) {
		node = rest;
		rest = node->next;
		pthread_mutex_destroy(&node->lock);
		free(node);
	}

	/* Busy if a thread holds the FIFO mutex, or if the newest queue entry
	 * cannot be locked immediately (its owner is still queued or active).
	 * The trylock is only attempted when there is no active holder. */
	if (m->active != NULL ||
	    (m->list != NULL && pthread_mutex_trylock(&m->list->lock) != 0)) {
		pthread_mutex_unlock(&m->control);
		return EBUSY;
	}

	if (m->list != NULL) {

		/* The probe above left the head entry locked; undo that. */
		pthread_mutex_unlock(&m->list->lock);

		/* Release every entry left in the queue. */
		rest = m->list;
		m->list = NULL;
		while (rest != NULL) {
			node = rest;
			rest = node->next;
			pthread_mutex_destroy(&node->lock);
			free(node);
		}
	}

	pthread_mutex_unlock(&m->control);
	return pthread_mutex_destroy(&m->control);
}

/* Discard every cached (unused) entry of a FIFO mutex.
 *
 * Returns EINVAL for a NULL pointer, the pthread_mutex_lock() error if the
 * control mutex cannot be taken, and 0 otherwise (also when the cache was
 * already empty).
*/
static inline int fifo_mutex_trim(fifo_mutex_t *const m)
{
	struct fifo_mutex_list *node, *rest;
	int                     status;

	if (m == NULL)
		return EINVAL;

	status = pthread_mutex_lock(&m->control);
	if (status != 0)
		return status;

	/* Detach the whole cache, then destroy and free it entry by entry. */
	rest = m->unused;
	m->unused = NULL;

	while (rest != NULL) {
		node = rest;
		rest = node->next;
		pthread_mutex_destroy(&node->lock);
		free(node);
	}

	pthread_mutex_unlock(&m->control);
	return 0;
}

/* Count the entries currently parked in the mutex's unused-entry cache.
 *
 * Returns the count, or -1 with errno set (EINVAL for a NULL pointer, or
 * the pthread_mutex_lock() error for the control mutex).
*/
static inline int fifo_mutex_count_unused(fifo_mutex_t *const m)
{
	const struct fifo_mutex_list *entry;
	int                           count = 0;
	int                           status;

	if (m == NULL) {
		errno = EINVAL;
		return -1;
	}

	status = pthread_mutex_lock(&m->control);
	if (status != 0) {
		errno = status;
		return -1;
	}

	for (entry = m->unused; entry != NULL; entry = entry->next)
		count++;

	pthread_mutex_unlock(&m->control);
	return count;
}

/* Count the entries currently linked into the mutex's queue list.
 *
 * Returns the count, or -1 with errno set (EINVAL for a NULL pointer, or
 * the pthread_mutex_lock() error for the control mutex).
*/
static inline int fifo_mutex_count_list(fifo_mutex_t *const m)
{
	const struct fifo_mutex_list *entry;
	int                           count = 0;
	int                           status;

	if (m == NULL) {
		errno = EINVAL;
		return -1;
	}

	status = pthread_mutex_lock(&m->control);
	if (status != 0) {
		errno = status;
		return -1;
	}

	for (entry = m->list; entry != NULL; entry = entry->next)
		count++;

	pthread_mutex_unlock(&m->control);
	return count;
}


/* Unlock the fifo mutex.
 * The cleanup is done by the next thread locking the mutex.
 *
 * Returns 0 on success, EINVAL for a NULL pointer, EPERM if the FIFO mutex
 * is not locked or is locked by another thread, or the error reported by
 * the underlying pthread_mutex_lock()/pthread_mutex_unlock() call.
*/
static inline int fifo_mutex_unlock(fifo_mutex_t *const m)
{
	int result;

	if (!m)
		return EINVAL;

	result = pthread_mutex_lock(&(m->control));
	if (result)
		return result;

	/* Verify this thread is the owner, and that the fifo mutex is locked.
	 * Thread IDs are opaque, so they must be compared with pthread_equal()
	 * rather than with == or !=. */
	if (!m->active || !pthread_equal(m->owner, pthread_self())) {
		pthread_mutex_unlock(&(m->control));
		return EPERM;
	}

	/* Release the lock, letting the next thread lock the fifo mutex. */
	result = pthread_mutex_unlock(&(m->active->lock));
	if (result) {
		pthread_mutex_unlock(&(m->control));
		return result;
	}

	/* A NULL active pointer is what marks the mutex as free. The owner is
	 * reset to a zero placeholder only for tidiness: pthread_t need not be
	 * an arithmetic type, so no integer sentinel can portably be cast to
	 * it, and owner is never examined while active is NULL. */
	m->active = NULL;
	m->owner  = (pthread_t){0};

	/* Cleanup is done by the next thread locking the mutex. */

	/* Done. */
	pthread_mutex_unlock(&(m->control));
	return 0;
}

/* Lock the fifo mutex.
 *
 * Each caller queues an entry holding its own locked mutex, then blocks on
 * the entry queued just before it, so the lock is granted in arrival order.
 * The mutex is not recursive.
 *
 * Returns 0 once the calling thread owns the FIFO mutex. Otherwise:
 *   EINVAL  m is NULL;
 *   ENOMEM  no memory for a new queue entry;
 *   other   the error reported by the failing pthread_mutex_*() call.
*/
static inline int fifo_mutex_lock(fifo_mutex_t *const m)
{
	struct fifo_mutex_list	*curr, *next, *temp;
	int			 result;

	if (!m)
		return EINVAL;

	result = pthread_mutex_lock(&(m->control));
	if (result)
		return result;

	/* Get a new locked mutex. */
	if (m->unused) {

		/* Reuse an unused one. */
		curr = m->unused;

		/* If it is free, it is immediately lockable. */
		result = pthread_mutex_trylock(&(curr->lock));
		if (result) {

			/* No. Fail gracefully. */
			pthread_mutex_unlock(&(m->control));
			return result;
		}

		/* Good, detach it from the list. */
		m->unused = curr->next;

	} else {

		/* Allocate a new one. */
		curr = malloc(sizeof *curr);
		if (!curr) {
			pthread_mutex_unlock(&(m->control));
			return ENOMEM;
		}

		/* Initialization can fail; never lock an uninitialized mutex. */
		result = pthread_mutex_init(&(curr->lock), NULL);
		if (result) {
			free(curr);
			pthread_mutex_unlock(&(m->control));
			return result;
		}

		result = pthread_mutex_trylock(&(curr->lock));
		if (result) {

			/* A new mutex is not lockable. Should not happen;
			 * destroy it properly before releasing the memory. */
			pthread_mutex_destroy(&(curr->lock));
			free(curr);
			pthread_mutex_unlock(&(m->control));
			return result;

		}
	}

	/* No previous owners? */
	if (!m->list) {

		/* We own the fifo mutex now. */
		m->owner = pthread_self();
		m->active = curr;

		/* Add self to the mutex list. */
		curr->next = NULL;
		m->list = curr;

		/* If there is more than one item in the free list,
		 * free the first one. */
		if (m->unused && m->unused->next) {

			temp      = m->unused;
			m->unused = temp->next;
			pthread_mutex_destroy(&(temp->lock));
			free(temp);
		}

		pthread_mutex_unlock(&(m->control));
		return 0;
	}

	/* There are previous owners. Add self to the list. */
	next       = m->list;
	curr->next = next;
	m->list    = curr;

	if (!pthread_mutex_trylock(&(next->lock))) {

		/* We obtained the lock immediately, so there were no
		 * waiting threads. If there is more than one item in
		 * the free list, free the first one.
		*/
		if (m->unused && m->unused->next) {

			temp      = m->unused;
			m->unused = temp->next;
			pthread_mutex_destroy(&(temp->lock));
			free(temp);
		}

	} else {

		/* Let other threads access the structure while we wait. */
		pthread_mutex_unlock(&(m->control));

		/* Take the previous owner mutex. This will block.
		 * If this fails, the previous owner has errored out somehow,
		 * and we own the mutex anyway. So ignore errors.
		*/
		pthread_mutex_lock(&(next->lock));

		result = pthread_mutex_lock(&(m->control));
		if (result) {

			/* We failed to obtain the mutex protecting the structure,
			 * so we're essentially screwed. This should never happen,
			 * fortunately. Release the predecessor's mutex first: left
			 * locked, it would later be recycled into the unused cache
			 * still held, and lock attempts reusing it would fail.
			 * Only then release our own mutex, since after that a
			 * successor may recycle both entries. Let a future thread
			 * worry about the rest of the cleanup. */
			pthread_mutex_unlock(&(next->lock));
			pthread_mutex_unlock(&(curr->lock));
			return result;
		}

		/* Okay, we're the owner. */
	}

	/* Mark self as the owner. */
	m->owner = pthread_self();
	m->active = curr;

	/* Release the predecessor lock; there is no-one else blocking on it. */
	pthread_mutex_unlock(&(next->lock));

	/* Discard locks left over by predecessors: everything queued before
	 * us is finished, so move those entries to the unused cache. */
	curr->next = NULL;
	while (next) {
		temp = next;
		next = next->next;

		temp->next = m->unused;
		m->unused = temp;
	}

	/* Done. */
	pthread_mutex_unlock(&(m->control));
	return 0;
}

#endif /* FIFO_MUTEX_H */
 
1 members found this post helpful.
Old 08-14-2011, 12:17 AM   #15
ta0kira
Senior Member
 
Registered: Sep 2004
Distribution: FreeBSD 9.1, Kubuntu 12.10
Posts: 3,078

Rep: Reputation: Disabled
You've certainly put a lot of work into something you don't need! Overall it seems like a good design. I wouldn't really call it a fifo mutex, but rather an access queue because of the growth and maintenance associated with larger numbers of waiting threads. As such, I'm curious about the possibility of making it a priority queue. That would probably result in non-constant access time due to resorting, though. Your solution is right on one of those lines where it's either a robust version of something primitive (i.e. fancy mutex) or it's a primitive version of what's normally more robust (i.e. primitive queuing system), so it seems natural to want to promote it to the higher domain and bog it down with features.

One portability problem is (pthread_t)-1. pthread_t doesn't have to be an integral type, so this might not compile on all OSes. I know I couldn't compile a cast from pthread_t to unsigned int on FreeBSD (for informational output); however, I don't have FreeBSD available at the moment and it appears online references show pthread_t as a pointer in FreeBSD. To get rid of all the warnings and errors I ended up with *(unsigned int*) (void*) &the_pthread_t.

It occurred to me earlier that the "cars entering an intersection" analogy for mutex usage is also a good example of when a fifo mutex would be needed. Whenever I've seen the cars/intersection example there was at most 1 car waiting to enter the intersection. If there were 1-4 cars waiting in real life, however, they should proceed in the order they arrived.
Kevin Barry
 
  


Reply


Thread Tools Search this Thread
Search this Thread:

Advanced Search

Posting Rules
You may not post new threads
You may not post replies
You may not post attachments
You may not edit your posts

BB code is On
Smilies are On
[IMG] code is Off
HTML code is Off



Similar Threads
Thread Thread Starter Forum Replies Last Post
Mutex v/s Semaphore jayadhanesh Linux - Software 3 12-07-2010 12:20 AM
Mutex new2lunix Programming 1 12-02-2008 08:12 PM
Mutex attributes? estratos Programming 12 12-24-2006 03:54 AM
Need FIFO thread scheduling policy enigma82 Programming 0 04-20-2005 07:35 AM
P-thread+race condition+mutex+Peterson's algorithm bangla_linux Programming 3 10-29-2003 03:01 AM

LinuxQuestions.org > Forums > Non-*NIX Forums > Programming

All times are GMT -5. The time now is 05:37 AM.

Main Menu
Advertisement
My LQ
Write for LQ
LinuxQuestions.org is looking for people interested in writing Editorials, Articles, Reviews, and more. If you'd like to contribute content, let us know.
Main Menu
Syndicate
RSS1  Latest Threads
RSS1  LQ News
Twitter: @linuxquestions
Open Source Consulting | Domain Registration