LinuxQuestions.org
Welcome to the most active Linux Forum on the web.
Home Forums Tutorials Articles Register
Go Back   LinuxQuestions.org > Forums > Non-*NIX Forums > Programming
User Name
Password
Programming This forum is for all programming questions.
The question does not have to be directly related to Linux and any language is fair game.

Notices


Reply
  Search this Thread
Old 12-23-2015, 12:35 AM   #1
Aquarius_Girl
Senior Member
 
Registered: Dec 2008
Posts: 4,731
Blog Entries: 29

Rep: Reputation: 940Reputation: 940Reputation: 940Reputation: 940Reputation: 940Reputation: 940Reputation: 940Reputation: 940
Thumbs up Threads checking for space two times in queues VS keeping the queues in sorted order


I request a review on the way multiple producers and
consumers access multiple shared queues.

What I have done here requires the threads to check for
the space in queues minimum two times.

Of course, I can create another queue where these
queues will be kept in the decreasing order w.r.t space.

That way the check will be eliminated since only those
queues which have some space will be on that queue.

My question is - Is all this really worth the effort?

Will it really save a significant time as compared
to the current method?


Qt libraries have been used because I wrote this
program in QtCreator. Ignore them.


Code:
#include "mainwindow.h"
#include <QApplication>

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <iostream>
#include <QDebug>

pthread_mutex_t mutexVariable     = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  conditionVariable = PTHREAD_COND_INITIALIZER;

int numberOfActiveProducers;
int numberOfActiveConsumers;

QList <int> sharedQueueA;
QList <int> sharedQueueB;

/*
 * Shared queues are supposed to be shared among four threads. Two producer, and two consumer threads.
 * Producer threads will put the 1's in it, and Consumer threads will remove the 1's.
 * Assumption: `sharedQueue` can contain only 10 elements at a time.
 */

int sizeOfSharedQueue;

void checkForSpaceAndPush (QList <int> & argList, int listId, pthread_t argTId)
{
    if (argList.length () < 10)
    {
        std::cerr << "\nQueue " << listId << ", First check by Producer: " << argTId;
        pthread_mutex_lock (&mutexVariable);
        std::cerr << "\n\nQueue " << listId << ", Locked by Producer: " << argTId;

        if (argList.length () < 10)
        {
            argList.push_back (1); std::cerr << "\nPushed by Producer " << argTId << ": " << "Length of queue " << listId << " is: " << argList.length ();
        }
        else
        {
            std::cerr << "\nProducer " << argTId << ". Queue " << listId << " is full. Length of queue is: " << argList.length ();
            pthread_cond_wait (&conditionVariable, &mutexVariable);
        }

        std::cerr << "\n\nQueue " << listId << ", UnLocked by Producer: " << argTId;
        pthread_mutex_unlock (&mutexVariable);
    }
}

void checkForSpaceAndPop (QList <int> & argList, int listId, pthread_t argTId)
{
    if (argList.length () > 0)
    {
        std::cerr << "\nQueue " << listId << ", First check by Consumer: " << argTId;
        pthread_mutex_lock (&mutexVariable);
        std::cerr << "\n\nQueue " << listId << ", Locked by Consumer: " << argTId;

        if (argList.length () > 0)
        {
            argList.pop_front (); std::cerr << "\nRemoved by Consumer: " << argTId << ", Length of queue " << listId << " is: " << argList.length ();
        }
        else
        {
            pthread_cond_signal (&conditionVariable); std::cerr << "\nSignal issued by Consumer: " << argTId << ", Length of queue " << listId << " is: " << argList.length ();
        }

        std::cerr << "\n\nQueue " << listId << ", UnLocked by Consumer: " << argTId;
        pthread_mutex_unlock (&mutexVariable);
    }
}

//  This function is run by the `Producer` threads.
void *producerThreadFunction (void *arg)
{
    Q_UNUSED (arg);

    while (1)
    {
        pthread_t tId = pthread_self(); std::cerr << "\nProducers: " << tId; std::cerr.flush();
        checkForSpaceAndPush (sharedQueueA, 1, tId);
        checkForSpaceAndPush (sharedQueueB, 2, tId);
    }

    return NULL;
}

//  This function is run by the `Consumer` threads.
void *consumerThreadFunction (void *arg)
{
    Q_UNUSED (arg);

    while (1)
    {
        pthread_t tId = pthread_self (); std::cerr << "\nConsumer: " << tId; std::cerr.flush();
        checkForSpaceAndPop (sharedQueueA, 1, tId);
        checkForSpaceAndPop (sharedQueueB, 2, tId);
    }
    return NULL;
}

int main (int argc, char *argv[])
{
    numberOfActiveProducers = 2;
    numberOfActiveConsumers = 2;
    sizeOfSharedQueue       = 10;

    // Producer threads creation
    pthread_t producerA;
    pthread_t producerB;

    if (pthread_create (&producerA, NULL, producerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Producer A\n");
        return 1;
    }

    if (pthread_create (&producerB, NULL, producerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Producer B\n");
        return 1;
    }

    // Consumer threads creation
    pthread_t consumerA;
    pthread_t consumerB;

    if (pthread_create (&consumerA, NULL, consumerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Consumer A\n");
        return 1;
    }

    if (pthread_create (&consumerB, NULL, consumerThreadFunction, NULL)) {
        fprintf (stderr, "Error creating thread Consumer B\n");
        return 1;
    }

    // Joining every thread
    if (pthread_join (producerA, NULL)) {
        fprintf (stderr, "Error joining thread Producer A\n");
        return 2;
    }

    if (pthread_join (producerB, NULL)) {
        fprintf (stderr, "Error joining thread Producer B\n");
        return 2;
    }

    if (pthread_join (consumerB, NULL)) {
        fprintf (stderr, "Error joining thread Consumer B\n");
        return 2;
    }

    if (pthread_join (consumerA, NULL)) {
        fprintf (stderr, "Error joining thread Consumer A\n");
        return 2;
    }

    QApplication a (argc, argv);
    MainWindow w;
    w.show ();

    return a.exec ();
}

Last edited by Aquarius_Girl; 12-23-2015 at 12:36 AM.
 
Old 12-23-2015, 08:05 PM   #2
millgates
Member
 
Registered: Feb 2009
Location: 192.168.x.x
Distribution: Slackware
Posts: 852

Rep: Reputation: 389Reputation: 389Reputation: 389Reputation: 389
I'm not sure I understand entirely what you're trying to achieve.
Why do you have multiple queues? Are they supposed to store different kind of data?
Do the individual threads need to access BOTH (all) queues or ANY of them?
Why are you using the same mutex for both queues?
How exactly do you intend to keep the queues sorted (thread-safely)?
Personally, I wouldn't worry about that one extra check, but I might have missed something.
 
  


Reply



Posting Rules
You may not post new threads
You may not post replies
You may not post attachments
You may not edit your posts

BB code is On
Smilies are On
[IMG] code is Off
HTML code is Off



Similar Threads
Thread Thread Starter Forum Replies Last Post
Sys V Message Queues From Kernel Space jhwilliams Programming 1 08-29-2011 12:27 AM
POSIX Queues with message type facility of System V queues ehsan_haq98@yahoo.com Programming 2 12-27-2009 11:45 AM
POSIX message queues(Solaris) to SYS V message queues(Linux) devershetty Programming 1 01-22-2007 10:15 AM
Message queues for threads estratos Programming 2 12-19-2006 01:49 PM
Do the threads lock on Meesage queues before writing to it? asarkar Programming 2 09-22-2006 05:06 PM

LinuxQuestions.org > Forums > Non-*NIX Forums > Programming

All times are GMT -5. The time now is 01:47 PM.

Main Menu
Advertisement
My LQ
Write for LQ
LinuxQuestions.org is looking for people interested in writing Editorials, Articles, Reviews, and more. If you'd like to contribute content, let us know.
Main Menu
Syndicate
RSS1  Latest Threads
RSS1  LQ News
Twitter: @linuxquestions
Open Source Consulting | Domain Registration