LinuxQuestions.org
Old 03-16-2011, 01:57 AM   #1
saman_artorious
Member
 
Registered: Sep 2008
Posts: 78

Rep: Reputation: 1
multi-threading


In this piece I implemented the gossip method. The first thread is created from main() (the message is first sent from node -1 to node 0 in main()), and the other threads are created from inside the thread function itself. I used two mutexes and a condition variable to control the synchronization.
One mutex is declared inside the node structure to protect that node's fields. The second mutex protects the array of node structures. The thread function works as follows:

Quote:
When the thread function is called, it sets the properties of the node according to the parameter passed to it: the id of the node that sent the message to the current node, the message content, the msg_received flag, and so on.
Before the new node starts searching for its neighbours and sending them the message (by creating a thread for each one that hasn't received it yet), it first locks the mutex associated with the condition variable and checks whether the message has already been sent to all of the sender's neighbours. This way, I make sure redundant threads are not created.
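The "wait until the sender has reached all its neighbours" step described above can be sketched on its own. This is only a minimal illustration, not the code from this post: RoundGate, gate_init, gate_mark_sent, gate_wait, Receiver, receiver_main and demo_round are hypothetical names. The point it tries to show is that a thread blocked in pthread_cond_wait() only wakes up when some other thread calls pthread_cond_signal() or pthread_cond_broadcast(), and that the predicate is re-checked in a loop.

```cpp
#include <pthread.h>
#include <cassert>
#include <vector>

// Hypothetical helper: one gate per sender, counting neighbours not yet notified.
struct RoundGate {
    pthread_mutex_t lock;
    pthread_cond_t  ready;
    int             pending;   // neighbours the sender still has to notify
};

void gate_init(RoundGate *g, int pending) {
    pthread_mutex_init(&g->lock, NULL);
    pthread_cond_init(&g->ready, NULL);
    g->pending = pending;
}

// Called by the sender each time one neighbour has been handed the message.
void gate_mark_sent(RoundGate *g) {
    pthread_mutex_lock(&g->lock);
    if (--g->pending == 0)
        pthread_cond_broadcast(&g->ready);   // wake every waiting receiver
    pthread_mutex_unlock(&g->lock);
}

// Called by a receiver before it starts forwarding the message.
void gate_wait(RoundGate *g) {
    pthread_mutex_lock(&g->lock);
    while (g->pending > 0)                        // re-check: wakeups may be spurious
        pthread_cond_wait(&g->ready, &g->lock);   // releases the lock while asleep
    pthread_mutex_unlock(&g->lock);
}

struct Receiver { RoundGate *gate; int passed; };

void *receiver_main(void *arg) {
    Receiver *r = static_cast<Receiver *>(arg);
    gate_wait(r->gate);
    r->passed = 1;            // only reached after the gate has opened
    return NULL;
}

// Starts n receivers, lets the "sender" notify n neighbours, and returns
// how many receivers got past the gate.
int demo_round(int n) {
    RoundGate gate;
    gate_init(&gate, n);
    std::vector<Receiver>  rx(n);
    std::vector<pthread_t> tid(n);
    for (int i = 0; i < n; i++) {
        rx[i].gate = &gate;
        rx[i].passed = 0;
        int rc = pthread_create(&tid[i], NULL, receiver_main, &rx[i]);
        assert(rc == 0); (void)rc;
    }
    for (int i = 0; i < n; i++)
        gate_mark_sent(&gate);
    int passed = 0;
    for (int i = 0; i < n; i++) {
        pthread_join(tid[i], NULL);
        passed += rx[i].passed;
    }
    pthread_cond_destroy(&gate.ready);
    pthread_mutex_destroy(&gate.lock);
    return passed;
}
```

Calling demo_round(3) starts three receivers that all block until the third gate_mark_sent() call. The threads are joinable here only to keep the sketch easy to check.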

Code:

#include<iostream>
#include<vector>
#include<cstdlib>
#include<ctime>
#include<list>
#include<pthread.h>
#include<cstring>
#include<cstdio>     // printf
#include<unistd.h>   // sleep
using namespace std;

static int round;
#define checkResults(string, val){	\
   if(val){	\
       printf("failed with %d at %s", val, string);	\
       exit(1);	\
   }	\
}	\

#define _NODES 8

typedef struct{      // thread parameter
  int id;
  int rcvd_from;
  string msg;
}thread_param_t;

struct node          // node structure 
{
   pthread_mutex_t f_lock;
   thread_param_t  packet;
   vector<int> edges;
   bool msg_received;
} nodes[_NODES];

pthread_t callThd[_NODES];    //keeps track of all thread IDs
pthread_mutex_t list_lock;    //locks the array nodes[], which stores node structures
pthread_mutex_t path_mutex;   //locks the path array--we don't need it as we always read the path
pthread_cond_t  next_round_ready = PTHREAD_COND_INITIALIZER;  //statically initialized condition variable -- meant to ensure the message has been sent
// to all of the sender's neighbours before any one of them starts looking for its own neighbours


bool msg_sent_to_all(int , int );  // it looks whether the message is sent to all sender's surrounding neighbours

//---------------
int path[_NODES][_NODES] = {
      { 0, 1, 1, 1, 0, 0, 0, 0 },
      { 1, 0, 0, 1, 1, 0, 0, 0 },
      { 1, 0, 0, 1, 0, 1, 0, 0 },
      { 1, 1, 1, 0, 1, 0, 0, 0 },
      { 0, 1, 0, 1, 0, 1, 0, 1 },
      { 0, 0, 1, 0, 1, 0, 1, 1 },
      { 0, 0, 0, 0, 0, 1, 0, 1 },
      { 0, 0, 0, 0, 1, 1, 1, 0 }
                                 }; // adjacency matrix of the graph: path[i][j] == 1 means nodes i and j are neighbours
//---------------

void *funcThd(void *arg)   //Thread function, the argument is the structure defined above (thread_param..)
{
   cout << "void *funcThd(void *arg)\n";

   pthread_attr_t attr;
   string message;
   int index, rc, received_from, neighbours_size;
   pthread_t tid;

  rc = pthread_attr_init(&attr);
 	 checkResults("pthread_attr_init()\n", rc);
  rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // create the child threads detached,
// so nobody has to call pthread_join(..) on them
  	checkResults("pthread_attr_setdetachstate()\n", rc);

   thread_param_t *p = (thread_param_t *) arg;  //store the message properties temporarily
               index = p -> id;	
             message = p -> msg;
       received_from = p -> rcvd_from;

   printf("Thread %d Activated by %d\n", index, received_from);
  //pthread_mutex_lock(&path_mutex); we don't need to lock the path as we only ever read it

  pthread_mutex_lock(&list_lock);           //lock the list of nodes
  pthread_mutex_lock(&nodes[index].f_lock); //lock the node the message was sent to; every node structure has its own f_lock
  
  nodes[index].msg_received = true;      //node got the message
  nodes[index].packet.id    = index;     //index of the node
  nodes[index].packet.msg   = message;   //message content
  neighbours_size = nodes[received_from].edges.size();   //number of neighbours staying adjacent to the node 

  printf("Message Delivered to %d from %d : ", index, received_from); 
  cout << message << endl;

 //for optimality, in each round a receiving thread does not start looking for its neighbouring nodes BEFORE the message has been sent to all
//the neighbours of the sender. For example, if the message is sent from 0 to 1, 1 will not send the message to its neighbours 0, 3, 4
 // before 0 has sent the message to all its neighbours 1, 2, 3 -- so we wouldn't have redundant threads

  while(msg_sent_to_all(received_from, neighbours_size) == false && received_from != 0)
	pthread_cond_wait(&next_round_ready, &list_lock);   
//pthread_cond_wait() releases list_lock while the thread sleeps and re-acquires it before returning;
// the thread is supposed to be woken up once the message has been sent to all the neighbours


     for(int i = 0; i < _NODES; i++)
     {
        printf("loop %d\n", i);

         if(path[index][i])
         {
             nodes[index].edges.push_back(i);  //finding neighbours from path matrix and push them back to the nodes edge property

                if(nodes[i].msg_received == false)  // if the msg is not sent to the node, then try sending it
                {
                   p -> id = i; 
                   p -> msg = message;
                   p -> rcvd_from = index;
                   
                   printf("Sending Message to %d from %d \n", i, index);                 
                   pthread_create(&callThd[i], &attr, funcThd, (void *) p); // calling the thread
                   cout << "\n\n";
                }
         }
     }//end of for
     
  pthread_mutex_unlock(&nodes[index].f_lock);  //unlocking, node structure lock
  pthread_mutex_unlock(&list_lock);            //unlocking node list lock
  //pthread_mutex_unlock(&path_mutex);
  
  pthread_exit(NULL);

}
//-----------------
bool msg_sent_to_all(int index, int neighbours_size)  
{
   int temp;

   cout << "msg_send_to_all()..\n";   
   //pthread_mutex_lock(&list_lock);
   //pthread_mutex_lock(&nodes[index].f_lock);   
   printf("Node ID : %d  No. of neighbours : %d\n", index, neighbours_size);    

     for(int i = 0; i < neighbours_size; i++)
     {
           temp = nodes[index].edges[i];   cout << "temp =  " << temp;
           	if(nodes[temp].msg_received == false)      cout << "--> " << nodes[temp].msg_received << endl;
                cout << "returning false..\n";
              	return false; 
     }
   //pthread_mutex_unlock(&list_lock);
   //pthread_mutex_unlock(&nodes[index].f_lock);
   cout << "returning true..\n";
   return true;
}
//-----------------

int main(int argc, char **argv)
{
  thread_param_t	param;
  pthread_attr_t	attr;
  pthread_t		thread;  
  int 			rc = 0, detachstate;

  //START-->initializing mutexes
   if(!pthread_mutex_init(&list_lock, NULL))
        	cout << "mutex_nodes initialized..\n";

   if(!pthread_mutex_init(&path_mutex, NULL))
		cout << "path_mutex initialized..\n";

   	for(int index = 0; index < _NODES; index++)
       		if(!pthread_mutex_init(&nodes[index].f_lock, NULL))
                	printf("nodes[%d].f_lock initialized..\n", index);
  //END-->initializing mutex

  //START-->setting thread attributes

  // create a default thread attributes object
   rc = pthread_attr_init(&attr);
   checkResults("pthread_attr_init()\n", rc);
   //set the detach state thread attribute
   rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
   checkResults("pthread_attr_setdetachstate()\n", rc);
          
   //END-->setting thread attributes
  
  //Initializing the source node -- node 0

    param.id = 0;
    param.rcvd_from = -1;
    param.msg = "Purple";
    
    nodes[0].msg_received = true;
    pthread_create(&callThd[0], &attr, funcThd, (void *) &param); // starting the first round ;-)
    sleep(10);

    /*    
	for(int i = 0; i < _NODES; i++)
	{
    		cout << nodes[i].msg_received << " " << nodes[i].packet.id << " " << nodes[i].packet.msg << endl;
    		int cap = nodes[i].edges.size();
    				for(int j = 0;  j < cap; j++)
    				{
        				cout << nodes[0].edges[j] << "  ";
    				}
  		cout << "\n";
	}
*/	
   cout << "Exiting Main()..\n"; 
   pthread_exit(NULL);

  return 0;
}

//-------------------
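A note on the parameter passing in funcThd: every child thread is handed the same thread_param_t pointer p, and the loop overwrites p -> id for the next neighbour before the previous child has necessarily read it. That may be why the same id shows up in more than one thread. One possible way around it, sketched here under assumptions, is to give each thread its own heap-allocated copy that the thread frees itself. The names worker, seen_ids, seen_lock and spawn_with_private_params are hypothetical; only thread_param_t is taken from the post.

```cpp
#include <pthread.h>
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

struct thread_param_t {      // same fields as in the post
    int id;
    int rcvd_from;
    std::string msg;
};

pthread_mutex_t seen_lock = PTHREAD_MUTEX_INITIALIZER;
std::vector<int> seen_ids;   // ids observed by the workers, for checking only

void *worker(void *arg) {
    // Each worker owns the block it was given, so nobody can change it
    // behind its back.
    thread_param_t *p = static_cast<thread_param_t *>(arg);
    pthread_mutex_lock(&seen_lock);
    seen_ids.push_back(p->id);
    pthread_mutex_unlock(&seen_lock);
    delete p;
    return NULL;
}

// Starts one worker per id in [0, n), each with a private parameter block,
// and returns the sorted list of ids the workers actually saw.
std::vector<int> spawn_with_private_params(int n, int sender, const std::string &msg) {
    seen_ids.clear();
    std::vector<pthread_t> tid(n);
    for (int i = 0; i < n; i++) {
        thread_param_t *p = new thread_param_t;   // fresh copy per thread
        p->id = i;
        p->rcvd_from = sender;
        p->msg = msg;
        int rc = pthread_create(&tid[i], NULL, worker, p);
        assert(rc == 0); (void)rc;
    }
    for (int i = 0; i < n; i++)
        pthread_join(tid[i], NULL);
    std::sort(seen_ids.begin(), seen_ids.end());
    return seen_ids;
}
```

With a private block per thread, every id from 0 to n-1 is seen exactly once, whatever order the threads happen to run in.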

The output of the code is as follows:

Code:

user@user-desktop:~/Desktop/gossip today$ ./a.out
mutex_nodes initialized..
path_mutex initialized..
nodes[0].f_lock initialized..
nodes[1].f_lock initialized..
nodes[2].f_lock initialized..
nodes[3].f_lock initialized..
nodes[4].f_lock initialized..
nodes[5].f_lock initialized..
nodes[6].f_lock initialized..
nodes[7].f_lock initialized..
void *funcThd(void *arg)
Thread 0 Activated by -1
Message Delivered to 0 from -1 : Purple
msg_send_to_all()..
Node ID : -1  No. of neighbours : 0
returning true..
loop 0
loop 1
Sending Message to 1 from 0 


loop 2
Sending Message to 2 from 0 
void *funcThd(void *arg)
Thread 2 Activated by 0


void *funcThd(void *arg)
Thread 2 Activated by 0
loop 3
Sending Message to 3 from 0 
void *funcThd(void *arg)
Thread 3 Activated by 0


loop 4
loop 5
loop 6
loop 7
Message Delivered to 2 from 0 : Purple
msg_send_to_all()..
Node ID : 0  No. of neighbours : 3
temp =  1--> 0
returning false..
loop 0
loop 1
loop 2
loop 3
Sending Message to 3 from 2 
void *funcThd(void *arg)
Thread 3 Activated by 2


loop 4
loop 5
Sending Message to 5 from 2 
void *funcThd(void *arg)
Thread 5 Activated by 2


loop 6
loop 7
Message Delivered to 2 from 0 : Purple
msg_send_to_all()..
Node ID : 0  No. of neighbours : 3
temp =  1--> 0
returning false..
loop 0
loop 1
loop 2
loop 3
Sending Message to 3 from 2 
void *funcThd(void *arg)
Thread 3 Activated by 2


loop 4
loop 5
Sending Message to 5 from 2 
void *funcThd(void *arg)
Thread 5 Activated by 2


loop 6
loop 7
Message Delivered to 3 from 0 : Purple
msg_send_to_all()..
Node ID : 0  No. of neighbours : 3
temp =  1--> 0
returning false..
loop 0
loop 1
Sending Message to 1 from 3 
void *funcThd(void *arg)
Thread 1 Activated by 3


loop 2
loop 3
loop 4
Sending Message to 4 from 3 
void *funcThd(void *arg)
Thread 4 Activated by 3


loop 5
loop 6
loop 7
Message Delivered to 3 from 2 : Purple
msg_send_to_all()..
Node ID : 2  No. of neighbours : 6
temp =  0returning false..
Message Delivered to 5 from 2 : Purple
msg_send_to_all()..
Node ID : 2  No. of neighbours : 6
temp =  0returning false..
Exiting Main()..
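A note on the "returning false.." lines above: in msg_sent_to_all as posted, the if has no braces, so only the cout statement belongs to it and return false runs on the first loop iteration no matter what the flag says. A hedged sketch of the predicate with braces follows. It assumes a simplified node_state structure (hypothetical, without the mutex and packet fields) and treats the virtual sender -1 as having no neighbours instead of indexing nodes[-1].

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

const int NODES = 8;

struct node_state {           // simplified stand-in for the node struct in the post
    std::vector<int> edges;   // neighbours recorded for this node
    bool msg_received;
};

// Returns true only if every recorded neighbour of `sender` has the message.
// The caller is assumed to hold whatever lock protects `nodes`.
bool msg_sent_to_all(const node_state nodes[], int sender) {
    if (sender < 0 || sender >= NODES)
        return true;          // e.g. the virtual source -1: nothing to wait for
    const std::vector<int> &edges = nodes[sender].edges;
    for (std::size_t i = 0; i < edges.size(); i++) {
        assert(edges[i] >= 0 && edges[i] < NODES);
        if (!nodes[edges[i]].msg_received) {
            return false;     // inside the braces: only when a neighbour is missing
        }
    }
    return true;
}
```

Reading the neighbour list inside the function also removes the need to pass neighbours_size separately.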


Quote:
The problem in the output is that the same thread is executed several times (for example, thread 3 may run 3 times), and a node is not able to find its neighbours.
Any help would be appreciated.

Last edited by saman_artorious; 03-17-2011 at 01:59 AM.
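A note on nodes being activated more than once: in the posted code, msg_received for a neighbour is only set once the child thread actually runs, so two senders can both find the flag still false and both create a thread for the same node. One possible arrangement, given here as a minimal sketch rather than a definitive fix, is to mark the node as claimed under the lock before the thread is created, and to let main wait on a counter instead of sleep(10). The names Delivery, claimed, sender_of, running, spawn, node_main and flood are hypothetical; the path matrix is the one from the post. The round ordering with the condition variable is deliberately left out.

```cpp
#include <pthread.h>
#include <cassert>
#include <string>

const int NODES = 8;
const int path[NODES][NODES] = {          // adjacency matrix from the post
    { 0, 1, 1, 1, 0, 0, 0, 0 },
    { 1, 0, 0, 1, 1, 0, 0, 0 },
    { 1, 0, 0, 1, 0, 1, 0, 0 },
    { 1, 1, 1, 0, 1, 0, 0, 0 },
    { 0, 1, 0, 1, 0, 1, 0, 1 },
    { 0, 0, 1, 0, 1, 0, 1, 1 },
    { 0, 0, 0, 0, 0, 1, 0, 1 },
    { 0, 0, 0, 0, 1, 1, 1, 0 }
};

struct Delivery { int id; int rcvd_from; std::string msg; };

pthread_mutex_t state_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  all_done   = PTHREAD_COND_INITIALIZER;
bool claimed[NODES];     // true once a thread has been created for the node
int  sender_of[NODES];   // which node delivered the message
int  running;            // threads created but not yet finished

void spawn(int id, int from, const std::string &msg);   // needs state_lock held

void *node_main(void *arg) {
    Delivery *d = static_cast<Delivery *>(arg);          // private to this thread
    pthread_mutex_lock(&state_lock);
    for (int i = 0; i < NODES; i++)
        if (path[d->id][i] && !claimed[i])
            spawn(i, d->id, d->msg);
    delete d;
    if (--running == 0)
        pthread_cond_signal(&all_done);                  // last thread wakes flood()
    pthread_mutex_unlock(&state_lock);
    return NULL;
}

void spawn(int id, int from, const std::string &msg) {
    claimed[id]   = true;          // claim BEFORE the thread exists
    sender_of[id] = from;
    running++;
    Delivery *d = new Delivery;
    d->id = id;
    d->rcvd_from = from;
    d->msg = msg;
    pthread_t tid;
    int rc = pthread_create(&tid, NULL, node_main, d);
    assert(rc == 0); (void)rc;
    pthread_detach(tid);
}

// Floods the graph from node 0 and returns how many nodes were reached.
int flood(const std::string &msg) {
    pthread_mutex_lock(&state_lock);
    for (int i = 0; i < NODES; i++) { claimed[i] = false; sender_of[i] = -2; }
    running = 0;
    spawn(0, -1, msg);
    while (running > 0)
        pthread_cond_wait(&all_done, &state_lock);
    int reached = 0;
    for (int i = 0; i < NODES; i++)
        reached += claimed[i] ? 1 : 0;
    pthread_mutex_unlock(&state_lock);
    return reached;
}
```

Because a single lock is held for the whole body of node_main, the threads run one after another in this sketch; it trades concurrency for a simple guarantee that each node gets exactly one thread.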
 
Old 03-16-2011, 08:42 AM   #2
dugan
LQ Guru
 
Registered: Nov 2003
Location: Canada
Distribution: distro hopper
Posts: 11,235

Rep: Reputation: 5320
Why did you use such a ridiculously small and unreadable font? A code fragment inside a quote is not of that size by default.

Quote:
Code:
sample code
I suggest that people would be more likely to help you if they could read what you wrote.

Last edited by dugan; 03-16-2011 at 08:45 AM.
 
  

