#define _GNU_SOURCE     /* for pthread_yield() on glibc */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sched.h>
#include <sys/types.h>
#include <sys/time.h>
#include <time.h>
#include <pthread.h>

#define MAX_THREAD 1000

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

typedef struct {
    int id;
} parm;

#define failed() do { printf("Errr... That shouldn't happen! Exiting...\n"); exit(1); } while (0)

static int x = 0;

void thread_delay(int millisec)
{
    int i;
    for (i = 0; i < (millisec / 10); i++) {
        usleep(8200); /* Assume the next thread will take a little time */
        sched_yield();
    }
}

void *runth(void *arg)
{
    pthread_mutex_lock(&mutex);
    x++;
    pthread_mutex_unlock(&mutex);
    pthread_yield();
    return NULL;
}

int main(int argc, char *argv[])
{
    int n, i;
    pthread_t *threads;
    pthread_attr_t pthread_custom_attr;
    parm *p;
    size_t stack;

    if (argc != 2) {
        printf("Usage: %s n\n  where n is the number of threads\n", argv[0]);
        exit(1);
    }
    n = atoi(argv[1]);
    if ((n < 1) || (n > MAX_THREAD)) {
        printf("The number of threads should be between 1 and %d.\n", MAX_THREAD);
        exit(1);
    }
    stack = 0xFFFF;
    threads = malloc(n * sizeof(*threads));
    if (threads == NULL) { failed(); }
    pthread_attr_init(&pthread_custom_attr);
    pthread_attr_setstacksize(&pthread_custom_attr, stack);
    p = malloc(n * sizeof(parm));
    if (p == NULL) { failed(); }

    /* Start up the threads */
    for (i = 0; i < n; i++) {
        p[i].id = i;
        pthread_create(&threads[i], &pthread_custom_attr, runth, (void *)(p + i));
    }
    while (1) {
        pthread_mutex_lock(&mutex);
        printf("%d ", x);
        pthread_mutex_unlock(&mutex);
        thread_delay(1500);
    }
    /* - Never reached, but the OS will clean up - */
    free(p);
    return 0;
}
It ~should~ output an ever-increasing count every 1500 ms, incremented by the thread(s). But it actually just sits on the terminal outputting nothing.
I do not particularly like multi-threading and do not understand why it does not work!
Thanks in advance.
Last edited by smeezekitty; 05-08-2011 at 02:43 PM.
Reason: Fixed mismatched tag
It does not work because the code is .. problematic.
First: I've never seen correct use of sched_yield() or pthread_yield(). Even when they do work, they invariably cause a performance regression on some workloads. Don't use them, ever; you'll be happier.
Second: All threads are created in a tight loop in your program. The thread function will increase x under the mutex immediately. The main function will not have time to see x change, since by the time the print loop runs, x has already grown to the maximum value. Depending on the scheduler, runth() may even do its work and exit before main() can call pthread_create() for the next thread.
Because of the above, I think it is pointless to analyze your code in further detail.
Consider this code instead:
Code:
#define _POSIX_C_SOURCE 199309L
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#include <time.h>

#define MODE_WAIT 0L
#define MODE_RUN  1L
#define MODE_STOP 2L

long mode    = 0L;
long started = 0L;
long running = 0L;
long changes = 0L;
long repeats = 0L;
long current = 0L;

#ifdef LOCKLESS_ATOMICS

static inline long atomic_setl(long *const ptr, long const newvalue)
{
    long oldvalue;
    do {
        oldvalue = __sync_fetch_and_or(ptr, 0);
    } while (!__sync_bool_compare_and_swap(ptr, oldvalue, newvalue));
    return oldvalue;
}

static inline long atomic_getl(long *const ptr)
{
    return __sync_fetch_and_or(ptr, 0);
}

static inline long atomic_addl(long *const ptr, long const value)
{
    return __sync_fetch_and_add(ptr, value);
}

static inline long atomic_subl(long *const ptr, long const value)
{
    return __sync_fetch_and_sub(ptr, value);
}

#else

pthread_mutex_t atomic_lock = PTHREAD_MUTEX_INITIALIZER;

static inline long atomic_setl(long *const ptr, long const newvalue)
{
    long oldvalue;
    pthread_mutex_lock(&atomic_lock);
    oldvalue = *ptr;
    *ptr = newvalue;
    pthread_mutex_unlock(&atomic_lock);
    return oldvalue;
}

static inline long atomic_getl(long *const ptr)
{
    long oldvalue;
    pthread_mutex_lock(&atomic_lock);
    oldvalue = *ptr;
    pthread_mutex_unlock(&atomic_lock);
    return oldvalue;
}

static inline long atomic_addl(long *const ptr, long const value)
{
    long oldvalue;
    pthread_mutex_lock(&atomic_lock);
    oldvalue = *ptr;
    *ptr = oldvalue + value;
    pthread_mutex_unlock(&atomic_lock);
    return oldvalue;
}

static inline long atomic_subl(long *const ptr, long const value)
{
    long oldvalue;
    pthread_mutex_lock(&atomic_lock);
    oldvalue = *ptr;
    *ptr = oldvalue - value;
    pthread_mutex_unlock(&atomic_lock);
    return oldvalue;
}

#endif

void *worker(void *data)
{
    long const id = (long)data;
    long old;
    struct timespec t;

    t.tv_sec  = 0;
    t.tv_nsec = 10000L; /* 10,000 ns = 10 microseconds */

    atomic_addl(&started, 1L);
    while (atomic_getl(&mode) == MODE_WAIT)
        nanosleep(&t, NULL);

    atomic_addl(&running, 1L);
    while (atomic_getl(&mode) == MODE_RUN) {
        old = atomic_setl(&current, id);
        if (old != id)
            atomic_addl(&changes, 1L);
        else
            atomic_addl(&repeats, 1L);
    }
    atomic_subl(&running, 1L);

    return NULL;
}

int main(int argc, char *argv[])
{
    pthread_t *thread;
    struct timespec ts;
    long threads, t;
    long curr, prev, count;
    char dummy;

    if (argc != 2) {
        fprintf(stderr, "Usage: %s number-of-threads\n", argv[0]);
        return 1;
    }
    if (sscanf(argv[1], "%ld %c", &threads, &dummy) != 1) {
        fprintf(stderr, "%s: Invalid number of threads.\n", argv[1]);
        return 1;
    }
    if (threads < 2L) {
        fprintf(stderr, "%s: Number of threads is too small.\n", argv[1]);
        return 1;
    }

    thread = malloc(sizeof(pthread_t) * (size_t)threads);
    if (!thread) {
        fprintf(stderr, "Not enough memory for %ld threads.\n", threads);
        return 1;
    }

    for (t = 0L; t < threads; t++)
        if (pthread_create(&thread[t], NULL, worker, (void *)t)) {
            fprintf(stderr, "Cannot create thread %ld of %ld. Aborted.\n", t + 1L, threads);
            return 1;
        }

    ts.tv_sec  = 0;
    ts.tv_nsec = 10000L; /* 10,000 ns = 10 microseconds */

    printf("Waiting for all threads to start .. ");
    fflush(stdout);
    while (atomic_getl(&started) < threads)
        nanosleep(&ts, NULL);
    printf("okay.\nRacing:\n");
    fflush(stdout);

    atomic_setl(&mode, MODE_RUN);

    curr = -1L;
    do {
        prev  = curr;
        curr  = atomic_getl(&current) + 1L;
        count = atomic_getl(&changes);
        if (curr != prev && count > 0L) {
            printf("%9ld. %ld\n", count, curr);
            fflush(stdout);
        }
    } while (count < 1000L && count < 2L * threads);

    atomic_setl(&mode, MODE_STOP);

    printf("Waiting for all threads to exit.. ");
    fflush(stdout);
    for (t = 0L; t < threads; t++)
        pthread_join(thread[t], NULL);

    printf("Done.\n\nThread statistics:\n");
    printf("%9ld threads started\n", started);
    printf("%9ld owner changes\n", changes);
    printf("%9ld owner repeats\n\n", repeats);

    free(thread);
    return 0;
}
If you compile it with LOCKLESS_ATOMICS defined (-DLOCKLESS_ATOMICS), it will use the __sync_...() built-ins supported by most current C compilers, instead of mutexes.
This program has the threads fighting over the value of current. changes counts the number of times a new thread got to set the value, and repeats counts the number of times the same thread set it again (in a row, without any other thread in between).
Note how main uses the mode variable to create all the threads, to let the threads start fighting at the same time, and to tell the threads to stop fighting. (This is very inefficient with regards to caches, since each core running a thread will need to get access to the cache line containing mode very often. As usual, it would be better to copy such global variables privately for each thread. The time taken by the extra memory accesses is still minuscule compared to the time taken by the cacheline ping-pong it eliminates.)
If you run this program on different machines, you'll see that main() gets to see very, very few of the changes. This is because the printf() family of functions is atrociously slow in comparison to what the worker() threads do.
It should be interesting to modify this code to use a structure for each thread (the pointer to which is cast to void pointer, just like the code now casts the long to/from a void pointer). Then, each thread could for example get the wall clock (via gettimeofday() or clock_gettime()), and update the running time and latencies each thread experiences.
I hope you find this useful. If you have any questions about the code, don't hesitate to ask.
It is unspecified in POSIX whether usleep() works correctly when multiple threads are calling it in parallel; I think it should work correctly on Linux, though. On POSIX systems, you should use nanosleep() instead; you need to define _POSIX_C_SOURCE as 199309L or later before including time.h to have it available.
The delay does nothing here. Try running that in a loop instead, e.g.
Code:
void *runth(void *arg)
{
    int i;
    for (i = 0; i < 100; i++) {
        pthread_mutex_lock(&mutex);
        x++;
        pthread_mutex_unlock(&mutex);
        thread_delay(5);
    }
    return NULL;
}
Running in a loop makes sure the threads are likely to run in parallel, and not be done before the next thread is started. Remember, main() starts the threads serially, and it does take a bit of time to start a new thread.
Quote:
Originally Posted by smeezekitty
Now it strangely prints the number of threads, not the increment.
That is to be expected, because even in your revised code the thread workers immediately increment x and then exit -- the delay before the exit is irrelevant. The threads will simply increase x to the final value, before the main has time to notice any changes.
If you put a loop with a delay in runth(), main() has a better chance of actually seeing the changes. Remember that printf() is very, very slow in comparison, so using a delay (or a long-running loop in runth()) is important.
Distribution: M$ Windows / Debian / Ubuntu / DSL / many others
Posts: 2,339
Original Poster
Rep:
It works now after wrapping the thread body in a while(1){} loop and increasing the delay to 20.
YAY! I still do not like threads. I think they are a pain to work with.
Thank you.
Last edited by smeezekitty; 05-09-2011 at 02:07 PM.
Quote:
Originally Posted by smeezekitty
It works now after wrapping the thread body in a while(1){} loop and increasing the delay to 20.
YAY! I still do not like threads. I think they are a pain to work with.
Good to hear!
I'm personally not too fond of the pthread interface either, but I like using the atomic built-ins and the __thread keyword (which makes a variable per-thread; each thread gets its own copy). I usually only need pthread_create() and pthread_join() from pthreads.
I've recently started using an alternative search engine to Google called Startpage, and it returns Wikipedia results as pointing to the secure.wikimedia.org versions.
Quote:
Originally Posted by MrCode
I've recently started using an alternative search engine to Google called Startpage, and it returns Wikipedia results as pointing to the secure.wikimedia.org versions.
I support dumping Google, since they just keep getting more evil.
I think that HTTPS is a bit overkill for Wikipedia, and there would be a performance hit, albeit a minor one.
----edit-----
Errr... I just tried it, and it returned non-HTTPS Wikipedia.
Last edited by smeezekitty; 05-09-2011 at 11:11 PM.
Atomic operations are instantaneous: either fully done, or not done at all. Others will see either the final state, or the original state, but never a mix of the two.
There are only 14 atomic operations you're likely to use (the comment shows the equivalent non-atomic operation):
Code:
result = __sync_fetch_and_add(&variable, value);   /* result = variable; variable += value; */
result = __sync_fetch_and_sub(&variable, value);   /* result = variable; variable -= value; */
result = __sync_fetch_and_or(&variable, value);    /* result = variable; variable |= value; */
result = __sync_fetch_and_and(&variable, value);   /* result = variable; variable &= value; */
result = __sync_fetch_and_xor(&variable, value);   /* result = variable; variable ^= value; */
result = __sync_fetch_and_nand(&variable, value);  /* result = variable; variable = ~(variable & value); */
result = __sync_add_and_fetch(&variable, value);   /* variable += value; result = variable; */
result = __sync_sub_and_fetch(&variable, value);   /* variable -= value; result = variable; */
result = __sync_or_and_fetch(&variable, value);    /* variable |= value; result = variable; */
result = __sync_and_and_fetch(&variable, value);   /* variable &= value; result = variable; */
result = __sync_xor_and_fetch(&variable, value);   /* variable ^= value; result = variable; */
result = __sync_nand_and_fetch(&variable, value);  /* variable = ~(variable & value); result = variable; */
result = __sync_bool_compare_and_swap(&variable, oldvalue, newvalue);  /* result = (variable == oldvalue); if (result) variable = newvalue; */
result = __sync_val_compare_and_swap(&variable, oldvalue, newvalue);   /* result = variable; if (result == oldvalue) variable = newvalue; */
GCC accepts any 1-, 2-, 4-, and 8-byte data types for variable, result, and value. (All three must have the same type, of course.)
ICC may require you to cast other types to an integer type of the same size.
The compare and swap operations are especially powerful. For example, if you wish to do an atomic floating-point addition, which normally requires a mutex or other lock, you can use this:
Code:
#include <stdint.h>

/* C89 has no "inline", so use plain "static" in strict ANSI mode. */
#ifdef __STRICT_ANSI__
#define STATIC_INLINE static
#else
#define STATIC_INLINE static inline
#endif

struct word64 {
    union {
        double   d;
        uint64_t u;
        int64_t  i;
    } type;
};

STATIC_INLINE double atomic_addd(double *const variable, double const value)
{
    struct word64 *const ptr = (struct word64 *)variable;
    struct word64 old, tmp;
    do {
        old.type.i = __sync_fetch_and_or(&(ptr->type.i), (int64_t)0);
        tmp.type.d = old.type.d + value;
    } while (!__sync_bool_compare_and_swap(&(ptr->type.i), old.type.i, tmp.type.i));
    return tmp.type.d;
}
The logic is simple: Get the old value atomically. Calculate new value. Atomically replace the old value with the new value, if still the same; otherwise, retry. The union is used to type-pun the double to an integer value of the same size, so that ICC too should be happy. (I only tested this code with GCC, though.)
Individual atomic operations are faster than taking any kind of a lock, but if you do significant work while holding the lock, with nobody waiting for the lock to be released, a lock will be faster than atomic operations. This is because atomic operations do incur a small overhead. If the lock is contended, or there are other threads blocking (waiting) on the lock, atomic operations are likely to save significant wall clock time.
The real benefits start to appear when you use the __thread keyword where necessary, to mark (global) variables per-thread. This is important because a significant drawback of most threaded code is the dreaded cacheline ping-pong: many CPU cores try to access the same area of memory, causing the cache lines to bounce between cores, slowing things down.
If you just convert a multi-process application to threads, and ignore these memory effects, you will notice that the multithreaded version is measurably slower. This is because separate processes have separate memory areas, and do not access each others' memory areas (except if using shared memory), and thus do not suffer from cacheline ping-pong. Interprocess communication overheads -- even if using OpenMPI, I've found -- are much smaller than the delays caused by cacheline ping-pong.
After mastering the above, there is a final, often ignored key paradigm that leads to efficient parallel and distributed code: The spice... uh, the data, must flow. Interleave preparation (especially communication) and CPU work, so that latencies are not translated to waiting. This means that your processing must be aware of what data it will need in the future, and take steps to make sure that the data is available when processing gets that far.
For example, most molecular dynamics simulation code I've seen still does the computation and communication in separate steps, keeping the CPU cores idle while communicating data. What a waste.
To those who have read this far: I apologize for yet another overly long post.
Quote:
Originally Posted by Nominal Animal
Atomic operations are instantaneous: either fully done, or not done at all. [...]
It looks non-portable.
I prefer my code to be portable across compilers (among other things) when possible.
Anyway, this thread is marked as solved.
Thank you.
I am off to start a new thread