I request a review on the way multiple producers and
consumers access multiple shared queues.
What I have done here requires the threads to check for
the space in queues minimum two times.
Of course, I can create another queue where these
queues will be kept in the decreasing order w.r.t space.
That way the check will be eliminated since only those
queues which have some space will be on that queue.
My question is - Is all this really worth the effort?
Will it really save a significant time as compared
to the current method?
Qt libraries have been used because I wrote this
program in QtCreator. Ignore them.
Code:
#include "mainwindow.h"
#include <QApplication>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <iostream>
#include <QDebug>
pthread_mutex_t mutexVariable = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t conditionVariable = PTHREAD_COND_INITIALIZER;
int numberOfActiveProducers;
int numberOfActiveConsumers;
QList <int> sharedQueueA;
QList <int> sharedQueueB;
/*
* Shared queues are supposed to be shared among four threads. Two producer, and two consumer threads.
* Producer threads will put the 1's in it, and Consumer threads will remove the 1's.
* Assumption: `sharedQueue` can contain only 10 elements at a time.
*/
int sizeOfSharedQueue;
void checkForSpaceAndPush (QList <int> & argList, int listId, pthread_t argTId)
{
if (argList.length () < 10)
{
std::cerr << "\nQueue " << listId << ", First check by Producer: " << argTId;
pthread_mutex_lock (&mutexVariable);
std::cerr << "\n\nQueue " << listId << ", Locked by Producer: " << argTId;
if (argList.length () < 10)
{
argList.push_back (1); std::cerr << "\nPushed by Producer " << argTId << ": " << "Length of queue " << listId << " is: " << argList.length ();
}
else
{
std::cerr << "\nProducer " << argTId << ". Queue " << listId << " is full. Length of queue is: " << argList.length ();
pthread_cond_wait (&conditionVariable, &mutexVariable);
}
std::cerr << "\n\nQueue " << listId << ", UnLocked by Producer: " << argTId;
pthread_mutex_unlock (&mutexVariable);
}
}
void checkForSpaceAndPop (QList <int> & argList, int listId, pthread_t argTId)
{
if (argList.length () > 0)
{
std::cerr << "\nQueue " << listId << ", First check by Consumer: " << argTId;
pthread_mutex_lock (&mutexVariable);
std::cerr << "\n\nQueue " << listId << ", Locked by Consumer: " << argTId;
if (argList.length () > 0)
{
argList.pop_front (); std::cerr << "\nRemoved by Consumer: " << argTId << ", Length of queue " << listId << " is: " << argList.length ();
}
else
{
pthread_cond_signal (&conditionVariable); std::cerr << "\nSignal issued by Consumer: " << argTId << ", Length of queue " << listId << " is: " << argList.length ();
}
std::cerr << "\n\nQueue " << listId << ", UnLocked by Consumer: " << argTId;
pthread_mutex_unlock (&mutexVariable);
}
}
// This function is run by the `Producer` threads.
void *producerThreadFunction (void *arg)
{
Q_UNUSED (arg);
while (1)
{
pthread_t tId = pthread_self(); std::cerr << "\nProducers: " << tId; std::cerr.flush();
checkForSpaceAndPush (sharedQueueA, 1, tId);
checkForSpaceAndPush (sharedQueueB, 2, tId);
}
return NULL;
}
// This function is run by the `Consumer` threads.
void *consumerThreadFunction (void *arg)
{
Q_UNUSED (arg);
while (1)
{
pthread_t tId = pthread_self (); std::cerr << "\nConsumer: " << tId; std::cerr.flush();
checkForSpaceAndPop (sharedQueueA, 1, tId);
checkForSpaceAndPop (sharedQueueB, 2, tId);
}
return NULL;
}
int main (int argc, char *argv[])
{
numberOfActiveProducers = 2;
numberOfActiveConsumers = 2;
sizeOfSharedQueue = 10;
// Producer threads creation
pthread_t producerA;
pthread_t producerB;
if (pthread_create (&producerA, NULL, producerThreadFunction, NULL)) {
fprintf (stderr, "Error creating thread Producer A\n");
return 1;
}
if (pthread_create (&producerB, NULL, producerThreadFunction, NULL)) {
fprintf (stderr, "Error creating thread Producer B\n");
return 1;
}
// Consumer threads creation
pthread_t consumerA;
pthread_t consumerB;
if (pthread_create (&consumerA, NULL, consumerThreadFunction, NULL)) {
fprintf (stderr, "Error creating thread Consumer A\n");
return 1;
}
if (pthread_create (&consumerB, NULL, consumerThreadFunction, NULL)) {
fprintf (stderr, "Error creating thread Consumer B\n");
return 1;
}
// Joining every thread
if (pthread_join (producerA, NULL)) {
fprintf (stderr, "Error joining thread Producer A\n");
return 2;
}
if (pthread_join (producerB, NULL)) {
fprintf (stderr, "Error joining thread Producer B\n");
return 2;
}
if (pthread_join (consumerB, NULL)) {
fprintf (stderr, "Error joining thread Consumer B\n");
return 2;
}
if (pthread_join (consumerA, NULL)) {
fprintf (stderr, "Error joining thread Consumer A\n");
return 2;
}
QApplication a (argc, argv);
MainWindow w;
w.show ();
return a.exec ();
}