How to wait on an empty queue in a multithreaded MPSC setup

Greetings everybody,

I am looking for a way for the consumer of a multi-producer/single-consumer (MPSC) queue to wait before popping when the queue is empty, where the waiting itself should consume as little CPU time as possible.

I frequently see something like

while (queue.isEmpty())
  sleep(0)

but I think this still puts quite a load on the CPU.

I am working on a multicore, multithreaded system where the consumer and producer threads can have different or equal priorities depending on the task they are used for.

Now, I just started with QNX programming, and afaik it offers condition variables as well as a sound asynchronous messaging system. Any idea if either of those helps, and if so, how I could easily recode the sample given above?

Thanks a lot for help,
AndyO

Hi Andy,

I guess my first question would be why is the consumer popping from the queue if it’s empty?

The simple code example you posted seems to indicate you are merely going to sleep and then checking the queue again. Is there any real reason to pop if the queue is empty (i.e. is some work going to be done), or was this simply designed to avoid a 100% CPU polling loop while waiting for data in the queue?

If it’s the latter then yes, QNX asynchronous messaging + a condition variable is the way to go.

Note: If your level of real-timeness isn’t very ‘hard’ then you could just use mqueue for your needs.

Tim

Greetings Tim,

thanks for your reply. The application that uses the queue is a logger which needs to be as real-time and quick as possible, running in a multithread/multicore environment with potentially large amounts of log data. As the consumer runs in a thread of its own, waiting for messages to be pushed into the queue, it can happen that the queue is empty, so the consumer cannot do anything until it is notified by one of (potentially many) producers. Hence I need an idle state that costs as little CPU time as possible.

Would you prefer messaging or condition variables (I think the latter have the problem of spurious wakeups etc…), and do you have any concrete commands I could check?

Thanks a lot,
AndyO

Hi Andy,

The following is from the online docs, in the major section entitled ‘processes & thread’, in the subsection on condition variables.

I’ve modified the comments in the original code example slightly to give you a better idea of what you’d do in your app (and incidentally what we do in our app).

/*
 * cp1.c
 */

#include <stdio.h>
#include <unistd.h>	// for sleep()
#include <pthread.h>

int data_ready = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  condvar = PTHREAD_COND_INITIALIZER;

// This is your logger thread
void *consumer (void *notused)
{
	printf ("In consumer thread...\n");
	while (1)
	{
		pthread_mutex_lock (&mutex);
		while (!data_ready)
		{
			pthread_cond_wait (&condvar, &mutex);
		}
		// process data
		printf("consumer:  got data from producer\n");
		data_ready = 0;
		pthread_cond_signal (&condvar);
		pthread_mutex_unlock (&mutex);

		// Now process/log your data.
	}
}

// This thread simply waits for data from any of your producer threads.
// It was called producer in the example but that's not what it really is in
// terms of your application. It's simply an intermediary thread that waits
// for data from any of your producer threads.
void *producer (void *notused)
{
	printf ("In producer thread...\n");
	while (1)
	{
		// get data from producer threads
		// we'll simulate this with a sleep (1) but would
		// really come from an async MsgReceive() call that
		// all your producer threads send to.
		sleep (1);
		printf ("producer:  got data to log. Add to queue\n");
		pthread_mutex_lock (&mutex);
		while (data_ready)
		{
			pthread_cond_wait (&condvar, &mutex);
		}

		// Note: This queue is 1 deep (data_ready = 0/1).
		// To handle a buildup of messages you need to ensure
		// that you have a queue/list to store multiple messages in
		// case the consumer can't process faster than they can arrive.
		// Adding/removing from that queue needs to take place while
		// the mutex is locked both here and in the consumer.
		data_ready = 1;

		pthread_cond_signal (&condvar);
		pthread_mutex_unlock (&mutex);
	}
}

int main (void)
{
	printf ("Starting consumer/producer example...\n");

	// You'd normally initialize the MsgReceive() here to receive data
	// from your real producer threads.

	// create the producer and consumer threads
	pthread_create (NULL, NULL, producer, NULL);
	pthread_create (NULL, NULL, consumer, NULL);

	// Now you start your real producer threads and have them
	// all connect to the MsgReceive you created above.

	// let the threads run for a bit
	sleep (20);
	return 0;
}

Tim

Hi Tim,

I ran into this example before but was unsure because condition variables seem to cause some problems,

(check here openqnx.com/PNphpBB2-viewtopic-t3338-.html )

or are often used incorrectly

(check here sendreceivereply.wordpress.com/2007/03/13/4/ )

I will give it a shot though!

Andy

Andy,

I wasn’t part of the initial discussion that had problems. They are talking about processes there, and sharing a mutex across processes requires a special initialization. Perhaps they didn’t do that; not all of the code was in the samples, so it is hard to tell.
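
For reference, a minimal sketch of what such a cross-process initialization can look like with plain POSIX calls. This assumes the "special initialization" refers to the process-shared attribute; the struct SharedSync and the function init_shared_sync are hypothetical names, and the struct would additionally have to be placed in shared memory, which is not shown here. None of this is needed when all threads live in a single process, as in the logger discussed in this thread.

```cpp
#include <cassert>
#include <pthread.h>

// Hypothetical container; for real cross-process use it must live in
// shared memory mapped by every participating process (not shown).
struct SharedSync
{
    pthread_mutex_t mutex;
    pthread_cond_t  condvar;
};

// Initializes both objects with the process-shared attribute.
// Returns 0 on success or the first pthread error code encountered.
int init_shared_sync(SharedSync *s)
{
    pthread_mutexattr_t mattr;
    int rc = pthread_mutexattr_init(&mattr);
    if (rc != 0)
        return rc;
    rc = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
    if (rc == 0)
        rc = pthread_mutex_init(&s->mutex, &mattr);
    pthread_mutexattr_destroy(&mattr);
    if (rc != 0)
        return rc;

    pthread_condattr_t cattr;
    rc = pthread_condattr_init(&cattr);
    if (rc != 0) {
        pthread_mutex_destroy(&s->mutex);
        return rc;
    }
    rc = pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
    if (rc == 0)
        rc = pthread_cond_init(&s->condvar, &cattr);
    pthread_condattr_destroy(&cattr);
    if (rc != 0)
        pthread_mutex_destroy(&s->mutex);
    return rc;
}
```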

I also trust you won’t use it incorrectly as in the second example :) If you do, you’ll see it quickly enough.

The thing to understand is that a condvar itself is not a mutex/semaphore. That’s why you need a mutex associated with the condvar. The other thing to understand is that you are responsible for correctly managing the data (in the example it’s the data_ready variable, in your final code it will be a queue/list of data to be logged) inside the mutex regions, so that every read and write happens while the mutex is held. Understand those 2 concepts and it’s really quite simple.

What I’d do is copy the example here, compile it and run it. Then gradually add a bit more to it (a 3rd thread, that really sends a message to the producer thread so you get familiar with the MsgSend/MsgReceive calls) to expand the sample program. Then add a simple linked list (a list<> if you are using C++) between the producer/consumer until you get it all working.
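
A minimal sketch of where that last step could end up, assuming a std::list of strings as the queue. The names queue_mutex, queue_condvar, log_queue, push_message and pop_message are made up for illustration. Note the while loop around pthread_cond_wait(): it rechecks the queue after every wakeup, which is what makes spurious wakeups harmless.

```cpp
#include <cassert>
#include <list>
#include <pthread.h>
#include <string>

// Hypothetical names, for illustration only.
static pthread_mutex_t queue_mutex   = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  queue_condvar = PTHREAD_COND_INITIALIZER;
static std::list<std::string> log_queue;

// Called by the push (producer) side: never waits on the condvar.
void push_message(const std::string &msg)
{
    pthread_mutex_lock(&queue_mutex);
    log_queue.push_back(msg);            // modify the list only under the mutex
    pthread_cond_signal(&queue_condvar); // wake the consumer if it is waiting
    pthread_mutex_unlock(&queue_mutex);
}

// Called by the pull (consumer) side: blocks without polling while empty.
std::string pop_message()
{
    pthread_mutex_lock(&queue_mutex);
    while (log_queue.empty())            // loop guards against spurious wakeups
        pthread_cond_wait(&queue_condvar, &queue_mutex);
    std::string msg = log_queue.front();
    log_queue.pop_front();
    pthread_mutex_unlock(&queue_mutex);
    return msg;
}
```

While the consumer sits in pthread_cond_wait() it is blocked in the kernel and the mutex is released, so it uses no CPU time and does not hold up the producers.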

Tim

I should have thought before posting it,
so I deleted it :)

It is a real nuisance that one cannot use class member functions in threads…
Regards,
Andy

Andy,

I have no idea what you mean by that statement.

You most certainly can use class member functions in threads. What exactly is it you are trying to do and aren’t able to do?

Tim

Hola Tim,

what cannot be done: passing a pointer to a class member function as the start routine in a thread create call, for instance, since those expect plain callback functions.

So here is the problem I am facing:
I am using the threads within a class (no idea yet how I am going to implement this, as static wrapper functions are not really my preferred way…). The problem itself: mutex and condvar variables should, according to the POSIX documentation and to target_nto.h (where the initializer macros are ultimately defined), be created statically. However, that won’t work as soon as you instantiate more than one object (in my case loggers), as they will definitely interfere with each other.

So I am stuck with this problem right now.

Any experiences?

Thanks a lot for help,
Andy

You can surely use class member functions in threads!!! You can’t really make a pointer to them (easily), but that has nothing to do with threads.

Check the Poco library, it has very nice tools to deal with threads: pocoproject.org/. You can even turn a class member function into a thread with 3 lines of code.

Hoi Mario,

I was just checking out Poco::Conditional for that reason :) Thanks for the hint, though!
And btw, thread functions in POSIX threads cannot be member functions: threads need a plain callback function, and a non-static member function does not work as one because the callback carries no information about the owning object. There are workarounds, however, using either static functions or global non-member functions. The latter do sort of destroy encapsulation…

I misunderstood what you meant. That being said you will see that with Poco you CAN have a method that is a thread!!! Check out Poco::ActiveMethod or Poco::Runnable

Thanks mario,

I read about the ActiveMethods and think they might be a solution to one of the problems. Do you, by any chance, have an example or a link to one for Poco’s condition variables?

Thanks for help,
Andy

Hi Andy,

I know you don’t want static wrapper functions but here is how I do mine.

class foo
{
public:
    static void *receiveMain(void *obj);  // For thread call
private:
    void workThread(void);
    // Lots of class variables/functions here
};

void *foo::receiveMain(void *obj)
{
    // Cast the void pointer to a foo object
    foo *f = static_cast<foo*>(obj);
	
    // Do the real thread work
    f->workThread();
	
    return NULL;
}

You then pass receiveMain() to the threadcreate call and the ‘this’ pointer as the only entry in the argument list for the spawned thread.

There isn’t anything exposed encapsulation-wise other than the static receiveMain() call. If you are really paranoid (which I am not) you could check in your workThread() function whether it has already been called once for that instance of the class and, if so, ignore any further calls. That way someone couldn’t accidentally call receiveMain() multiple times.
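
A minimal sketch of how the wrapper fits together with per-instance synchronization objects, which also addresses the worry about static initializers: each object calls pthread_mutex_init() and pthread_cond_init() in its constructor, so several instances do not share anything. The class Worker and its members are hypothetical names, and the work done in the thread is reduced to setting a flag.

```cpp
#include <cassert>
#include <pthread.h>

// Hypothetical class; all names here are illustrative only.
class Worker
{
public:
    Worker() : done_(false)
    {
        // Per-instance initialization instead of the static
        // PTHREAD_*_INITIALIZER macros.
        int rc = pthread_mutex_init(&mutex_, NULL);
        assert(rc == 0);
        rc = pthread_cond_init(&condvar_, NULL);
        assert(rc == 0);
        (void)rc;
    }

    ~Worker()
    {
        pthread_cond_destroy(&condvar_);
        pthread_mutex_destroy(&mutex_);
    }

    // Pass the static wrapper as start routine and 'this' as its argument.
    int start() { return pthread_create(&tid_, NULL, &Worker::threadMain, this); }
    int join()  { return pthread_join(tid_, NULL); }

    // Blocks until this instance's thread has finished its work.
    void waitUntilDone()
    {
        pthread_mutex_lock(&mutex_);
        while (!done_)
            pthread_cond_wait(&condvar_, &mutex_);
        pthread_mutex_unlock(&mutex_);
    }

private:
    static void *threadMain(void *obj)
    {
        static_cast<Worker *>(obj)->work();  // back into the member function
        return NULL;
    }

    void work()
    {
        pthread_mutex_lock(&mutex_);
        done_ = true;                        // stands in for the real thread work
        pthread_cond_signal(&condvar_);
        pthread_mutex_unlock(&mutex_);
    }

    Worker(const Worker &);                  // not copyable: owns a mutex/condvar
    Worker &operator=(const Worker &);

    pthread_mutex_t mutex_;
    pthread_cond_t  condvar_;
    pthread_t       tid_;
    bool            done_;
};
```

Typical use would be to create one object per logger, call start() on each, and call join() before the object goes out of scope.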

Tim

Hoi Tim and Mario,
thanks for your help. I think I will give Poco::Activity a shot, however I still need to see how condition variables work in Poco. Their site does not give that many examples, and I do not know exactly how the condition variables have to be linked to a mutex.
I will just try to implement it similarly to Tim’s source code sample above and hope it works :)

Regards,
Andy

Just check the source

Hi again, hi Tim,
I hope you can help me one more time:
My basic question is whether I need an explicit thread on the producer side (I decided for Poco btw), and especially whether I need to lock the mutex and check the condition variable in the producer (or rather push) function above. As there is no problem pushing new elements to my queue while others are popped, I do not see why I need all those things in the producer part.

Thanks in advance for help,
Andy

Hi Andy,

Yes you need an explicit thread on the producer side.

Here is what I assume your system is going to look like:

ProcessA-----|
ProcessB-----|---Logger
ProcessC-----|

In other words you’ll have multiple processes sending data to your logger. I am further assuming that you don’t want ProcessA to wait until the logger completely processes a log message from ProcessB.

To achieve this on the logger you have 2 choices:

  1. Have 1 receive thread per process. That means lots of threads in your logger and sorting out lots of thread interaction.
  2. Have one push(producer)/pull(consumer) thread in your logger. This is the condvar method.

We use option 2 in our system.

To minimize the wait for the sending processes (ProcessA-C) the push(producer) thread needs to basically just receive messages and put them in an internal queue and go back and wait for the next message. The pull(consumer) thread does the work. The condvar is just needed to signal the pull thread there is work to do which was what your original post was about.

OK, now back to the example code I posted far above. In that example code the condvar is being used to signal in BOTH directions: that the queue is empty (for the push thread) AND that the queue is non-empty (for the pull thread). You don’t need the empty part because you are implementing a linked list (probably a C++ list class), so the push thread can always add another item.

So on the push (producer) thread you can eliminate the condvar wait part so that it looks like:

void *producer (void *notused)
{
   printf ("In producer thread...\n");
   while (1)
   {
      // get data from producer threads
      // we'll simulate this with a sleep (1) but would
      // really come from an async MsgReceive() call that
      // all your producer processes send to.
      sleep (1);
      printf ("producer:  got data to log. Add to queue\n");

      // Lock the mutex. Must do this since we are going to manipulate
      // the linked list of data AND activate the condvar.
      pthread_mutex_lock (&mutex);

      // Add the message to the linked list
      data_ready = 1;

      // Signal the pull(consumer) thread the linked list has at least 1 item
      pthread_cond_signal (&condvar);

      // Unlock the mutex so the pull(consumer) thread can access the linked list
      pthread_mutex_unlock (&mutex);
   }
} 

This is exactly what my push(producer) thread looks like.

Note: In the pull(consumer) thread you no longer need the
pthread_cond_signal (&condvar);
line because you aren’t signaling back to the push(producer) thread to indicate empty list because the push(producer) thread doesn’t care.
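
A minimal sketch of such a pull(consumer) thread with a real list instead of data_ready. It swaps the whole list out while holding the mutex and does the slow work after unlocking, and it never signals back. Everything here is an assumption for illustration: the names pending, processed, stop_requested, enqueue and request_stop are hypothetical, the stop flag is only there so the loop can end cleanly, and pushing into the processed vector stands in for the actual logging.

```cpp
#include <cassert>
#include <list>
#include <pthread.h>
#include <string>
#include <vector>

// Hypothetical names, for illustration only.
static pthread_mutex_t mutex   = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  condvar = PTHREAD_COND_INITIALIZER;
static std::list<std::string>   pending;        // shared, guarded by mutex
static std::vector<std::string> processed;      // touched by the consumer only
static bool stop_requested = false;             // guarded by mutex

void *consumer(void *notused)
{
    (void)notused;
    for (;;) {
        std::list<std::string> batch;

        pthread_mutex_lock(&mutex);
        while (pending.empty() && !stop_requested)
            pthread_cond_wait(&condvar, &mutex);
        if (pending.empty()) {                  // stop requested, nothing left
            pthread_mutex_unlock(&mutex);
            return NULL;
        }
        batch.swap(pending);                    // take everything in one go
        pthread_mutex_unlock(&mutex);           // no signal back to the producer

        // Slow work happens outside the mutex so producers are not held up.
        for (const std::string &msg : batch)
            processed.push_back(msg);
    }
}

// What the push(producer) thread does for each received message.
void enqueue(const std::string &msg)
{
    pthread_mutex_lock(&mutex);
    pending.push_back(msg);
    pthread_cond_signal(&condvar);
    pthread_mutex_unlock(&mutex);
}

// Asks the consumer to exit once the list has been drained.
void request_stop()
{
    pthread_mutex_lock(&mutex);
    stop_requested = true;
    pthread_cond_signal(&condvar);
    pthread_mutex_unlock(&mutex);
}
```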

Does that make a bit more sense?

Tim