Expanding Data Processing Bandwidth — Automatically

Well-written software can typically deal with any performance requirement pretty easily as long as the requirement is constant. It’s when requirements change over time that things can get dicey. For example, if your test system generates a new data packet to process every second and it consistently takes 5 seconds for each packet to be processed, a little simple math will tell you how much processing bandwidth you need to keep up with the flow of data (in that case, at least five data processors running in parallel). But how are you to properly size things when variability is inserted into the process? What if the time between data packets can vary between 100 msec and several minutes? Or what if the data processing time can change dramatically due to things like network traffic?

These are the kinds of situations where the processing needs to be more than simply “flexible”; it has to be able to automatically maintain its own operation and reconfigure itself on the fly. To demonstrate one possible implementation of this “advanced” technique, we will build on the simple pieces that we have learned in the past. In some ways, good software design techniques are like Lego blocks. Each one by itself is not very impressive, but when you stick them together, magic happens. But before we can stick anything together, we need to understand…

…what we’re going to do.

You’ll notice that all of the bandwidth-management challenges that I mentioned earlier can be addressed either by adding more data processing clones or by removing existing ones that are being underutilized. Consequently, the question of how to implement this self-maintenance functionality really comes down to dynamically managing the number of data processing clones that are currently available, which in turn boils down to answering two very simple questions:

1. How do we know we need more?

Given that the whole point of the exercise is to manage a queue, the current state of that queue will give us all the information we need to answer this question. Specifically, we can tell when more processing bandwidth is needed by monitoring how many items are currently sitting in the queue awaiting processing. When the code sees the queue depth climbing steadily, it can launch additional processes to handle the data backlog. Of course, this functionality assumes that there is a process constantly monitoring the queue and managing that aspect of its operation, which we actually have already in the test code from last week (Data Processing Queue Handler.vi). All we have to do is repurpose this VI to be a permanent part of the final application.

2. How do we know a clone is no longer needed?

The one part of the system that knows whether a clone is being underutilized is, in fact, the clone itself. As part of its normal operation, it can keep track of how often it is being used. Having said that, there are (at least) a couple of ways to quantify how much a clone is being utilized. We could, for instance, consider how much time the clone spends processing data versus how much time it spends waiting to receive data to process. If that utilization percentage drops below a given limit, the clone could then shut itself down. However, for this demonstration, I’m going to use a much simpler criterion that, quite frankly, works pretty well: the code will simply keep track of how many times in a row it goes to the queue and doesn’t find any data. Both decisions are sketched below.
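Since the real implementation lives in the LabVIEW diagrams shown later in this post, here is a rough Python sketch of the overall shape of those two decisions, just to make them concrete. Everything in it is a hypothetical stand-in: the names, the thresholds and the one-second polling interval are mine, not values taken from the actual VIs.

```python
import queue
import threading
import time

# Hypothetical stand-ins for the configuration values discussed later in the post.
MAX_QUEUE_SIZE = 3      # launch another clone once the backlog exceeds this
CLONE_IDLE_COUNT = 10   # a clone shuts itself down after this many empty checks in a row

data_queue = queue.Queue()

def process(packet):
    time.sleep(0.5)                                # placeholder for the real analysis work

def clone():
    """Rough analog of one data processing clone."""
    empty_checks = 0
    while empty_checks < CLONE_IDLE_COUNT:
        try:
            packet = data_queue.get(timeout=1.0)   # "Check for Data"
            empty_checks = 0                       # got work, so reset the idle counter
            process(packet)                        # "Process Data"
        except queue.Empty:
            empty_checks += 1                      # came up empty, count the miss
    # dropping out of the loop plays the role of the "Self Shutdown" state

def queue_monitor():
    """Rough analog of the queue handler's size-check logic."""
    while True:
        if data_queue.qsize() > MAX_QUEUE_SIZE:    # backlog is building, add bandwidth
            threading.Thread(target=clone, daemon=True).start()
        time.sleep(1.0)                            # check the depth about once a second
```

The LabVIEW version spreads these two responsibilities across two separate VIs, which is exactly what the rest of this post walks through.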

Code Modifications

Before I start describing the changes that we will need to make in order to fashion this new ability, I want to consider for a moment the thing that won’t have to change: Queue Test.vi. You might be tempted to say, “Well, big deal. All it does is stuff some data into the queue every so often. Who cares if it doesn’t have to change? It’s not even deliverable code.”

While that is undoubtedly true, the fact of the matter is that this test routine is important, but not because of what it is or what it does. Rather we care about Queue Test.vi because in our little test environment, it represents the rest of our application — or at least that part of it that is generating data. Consequently, the fact that it doesn’t need modification means that your main application, likewise, won’t need modification if you decide to upgrade from a data processing environment that uses a fixed number of data processors to one that dynamically manages itself.

Data Processing Queue Handler.vi

First, note that previously this routine’s primary job was simply to report how deep the data queue was, a bit of functionality that would likely not be needed in a real application. Now, however, this routine is going to take an active part in the process, so I started the modifications by adding the error reporting VI that will transfer any errors it generates to the exception handler.

New Queue Handler Timeout Case

In addition, because the software will initially start only a single data processing clone, I also modified the timeout event handler that performs the VI’s initialization by removing the loop around the clone-launching subVI.

The remainder of the modifications to this routine occurs in the event handler for the Check Queue Size UDE. Previously, this event only reported how deep the queue was getting. While it still performs that function, the queue depth information now also drives the logic that determines whether we have enough processing bandwidth online.

New Queue Handler Queue Size Check Case

Note that the queue depth is compared to a new configuration value called Max Queue Size that defines how large the queue can grow before a new data processing clone is launched. Regardless of whether it launches a new clone, the event handler calls another new subVI that returns the number of clones that are currently running. As you will see in a moment, one of the modifications to the data processing VI is the addition of logic that keeps track of the names of the clones that are running. The subVI that we are calling here returns a count of the number of names that have been recorded so far.
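For readers following along without the VIs open, the behavior of this event case boils down to something like the following Python sketch. Max Queue Size, the function names, and the idea of representing a launch by appending to a list are all stand-ins of mine, not the real implementation.

```python
import queue

MAX_QUEUE_SIZE = 3               # hypothetical value for the new configuration item
running_clones = []              # the FGV in the real code tracks clones by name

def launch_clone():
    # In the real code this subVI launches another reentrant data processor;
    # here we just record that one more clone exists.
    running_clones.append(f"clone {len(running_clones) + 1}")

def handle_check_queue_size(data_queue):
    """Conceptual analog of the Check Queue Size event case."""
    depth = data_queue.qsize()
    if depth > MAX_QUEUE_SIZE:          # backlog is growing, so add processing bandwidth
        launch_clone()
    return depth, len(running_clones)   # both values get reported for display
```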

Data Processor.vi

Turning now to the data processing code itself, the first stop is the state machine’s Initialize state. Here we have all the same logic that existed before, but with a couple of minor additions.

New Data Processor Initialize

First, there is a new subVI that registers that a clone is starting up. This subVI writes the clone’s name to the FGV that is maintaining the clone count. Second, there is also a new shift register carrying a cluster of internal data that the clone will need to do its work. All that is needed during initialization is to set a timestamp value. The Check for Data state is next, and it has likewise seen some tweaks, the most significant of which is moving the logic for responding to the dequeue operation into a subVI.

New Data Processor Check for Data

The justification for this move lies in the fact that this logic is now also responsible for determining whether or not the clone is being adequately utilized. As I stated before, each time the clone goes to the queue and comes up empty, the logic will increment a counter that is carried in the new shift register’s data. If this count exceeds a new configuration value, Clone Idle Count, the code will branch to a new state that shuts down the clone. Likewise, any time the clone does get data to process, it will reset the count to 0. The changes to the Process Data state, which comes next, are pretty trivial.
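Before moving on to Process Data, here is a rough textual rendering of the decision logic that now lives in that subVI. The CloneState class stands in for the cluster carried in the shift register, and the names and the threshold value are placeholders of mine, not items from the actual code.

```python
from dataclasses import dataclass
import queue

CLONE_IDLE_COUNT = 10   # hypothetical value for the new configuration item

@dataclass
class CloneState:
    """Stand-in for the cluster of internal data carried in the clone's shift register."""
    last_timestamp: float = 0.0
    empty_checks: int = 0

def check_for_data(data_queue, state, timeout_sec):
    """Dequeue with a timeout and decide which state the clone should go to next."""
    try:
        packet = data_queue.get(timeout=timeout_sec)
        state.empty_checks = 0                      # found data, so reset the idle counter
        return "Process Data", packet
    except queue.Empty:
        state.empty_checks += 1
        if state.empty_checks > CLONE_IDLE_COUNT:   # consistently under-utilized
            return "Self Shutdown", None
        return "Check for Data", None               # nothing yet, try again next pass
```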

New Data Processor Process Data

All that happens here is that the timestamp extracted from the data to be “analyzed” updates the new cluster data — as well as the indicator on the front panel. Finally, there is the new state: Self Shutdown

New Data Processor Self Shutdown

…which simply calls a subVI that removes the clone’s name from the FGV maintaining the list of running clones, and then stops the event loop.
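Taken together, the registration call in the Initialize state, the count used by the queue handler, and this removal call behave like one small functional global. A Python approximation might look like the module below, with a lock standing in for the FGV’s naturally serialized access; the function names are mine, not the subVIs’.

```python
import threading

# Rough analog of the FGV that tracks running clones by name.
_clone_names = []
_lock = threading.Lock()

def register_clone(name):
    """Called from the clone's Initialize state."""
    with _lock:
        _clone_names.append(name)

def deregister_clone(name):
    """Called from the clone's Self Shutdown state."""
    with _lock:
        if name in _clone_names:
            _clone_names.remove(name)

def clone_count():
    """Called from the queue handler's Check Queue Size case."""
    with _lock:
        return len(_clone_names)
```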

Let’s talk about “Race Conditions”

All we have left to do now is test this work and see the differences that it makes, but before we can do that, we need to have a short conversation about race conditions. Very often developers and instructors (myself included) will talk about the necessity of avoiding race conditions. The dirty little secret is that as long as you have multiple things happening in parallel, race conditions will always be present. The real point that these admonitions attempt to make is that you should avoid the race conditions that are unrecognized and potentially problematic.

I bring this point up because as you do the following testing, you may get the chance to see this concept in action. The way it will appear is that the system will launch a new clone immediately after one kills itself off for being underutilized. The reason for this apparent logical lapse is that a race condition exists between the part of the code that is checking to see whether another clone is needed and the several places where the clones are deciding whether or not they are being used. There are two causes for this race condition: one we can ameliorate a bit, and one over which we have no control.

Starting with the cause we can’t control: a simple, immutable law of nature is that no matter how sophisticated our logic or algorithms might be, they cannot see so much as a nanosecond into the future. Consequently, the first source of the race condition is that when the queue-checking logic sees that there are three elements enqueued, it has no way of knowing that a currently active clone will become available in a few milliseconds. While it is true that under certain circumstances it might be possible to provide this logic with a bit of “foresight”, there is no generalized solution to this aspect of the problem, so it is an issue that we may just have to live with.

The news, however, is better for the second cause. Here the problem is that, with all the clones having the same timeout between data checks, it is probable that sooner or later one of the clones will become “synchronized” with the others such that it is always checking the queue just after it has been emptied by one of the others. Fortunately, the solution to this problem lies in its very definition: the cure is to see to it that the clones do not have constant timeouts from one check to the next. To implement this concept in our test code, I modified the routine that returns the delay so that it adds a small random difference that changes each time it’s called.
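Conceptually, the change amounts to nothing more than the sketch below; the base delay and the size of the random offset are made-up numbers, not the ones used in the test code.

```python
import random

def next_check_delay(base_delay_sec=1.0, max_jitter_sec=0.25):
    """Return the nominal delay plus a small random offset so the clones drift apart."""
    return base_delay_sec + random.uniform(0.0, max_jitter_sec)
```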

The bottom line is that while completely removing all race conditions is not possible, they can be managed such that their impacts are minimized.

New Tests for New Code

The testing of the modified code starts the same as it did before: open and launch Data Processing Queue Handler.vi and Queue Test.vi. The first difference that you will notice is that only 1 clone is initially launched, but at the default data rate, 1 clone is more than adequate.

Now decrease the delay between data packets to 2 seconds. Here the queue depth will bounce around a bit but the clone count will stabilize at 3 or 4.

Finally, take the delay all the way down to 1 second. Initially the clone count may shoot up to 8 or 9, but on my system the clone count eventually settled down to 6 or 7.

At this point, you can begin increasing the delay again, and slowly the clones will start dropping out from disuse. Before you shut down the test, however, you might want to set the delay back to 2 seconds and leave the code running while you go about whatever else you have to do today. It could be instructive to notice how other things you are doing on the same computer affect the queue operation. You might also want to rerun the test but start Queue Test.vi first and let it run for a minute or so before you start Data Processing Queue Handler.vi, just to see what happens.

Further Enhancements?

So we have our basic scalable system completed, but are there things we could still do to improve its operation? Of course. For example, we know that due to timing issues which we can only partially control, the number of clones running at any one time can vary a bit, even if the data rate is constant. One thing that could be done to improve efficiency would be to change the way clones are handled. Right now a data processor is either in memory and running, or it is closed. You could instead create a third state that a clone could be in: loaded into memory, but inactive. You could implement this zombie state by setting the event timeout to -1, thus effectively turning off the state machine.

It might also be helpful to soften the queue depth limit at which a new clone is created. Instead of launching a new clone any time the queue depth exceeds 3, it might be useful in some situations to maintain a running average and only create a new clone if the average queue depth over the last N checks is greater than 3.
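In Python terms, such a softer criterion might look something like the sketch below; the window size and the limit of 3 are arbitrary values chosen just for illustration.

```python
from collections import deque

class QueueDepthAverager:
    """Launch a new clone only when the average depth over the last N checks is too high."""

    def __init__(self, window=5, limit=3.0):
        self.readings = deque(maxlen=window)   # keeps only the N most recent readings
        self.limit = limit

    def needs_another_clone(self, current_depth):
        self.readings.append(current_depth)
        return sum(self.readings) / len(self.readings) > self.limit
```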

Who knows? Some of you might think of still other modifications and enhancements. The point is to experiment and see what works best for your specific application.

Parallel Data Processing — Release 2

The Big Tease

So what is in store for next time? Well, in the past we have discussed how to dynamically launch and use VIs that run as separate processes. But what if the code you want to access dynamically like this happens to exist in a process that you have already compiled into a standalone application? If that application is working, you don’t want to risk breaking something by modifying it. As it turns out, there are ways to manage and reuse that code as well, even if it was created in an older version of LabVIEW. Next time we’ll start exploring how to do it.

Until Next Time…

Mike…
