On the question of buffers (ring buffers)

    “If the cost of developing an architecture seems excessive to you, think how much the wrong architecture can cost you.”


    - I cannot remember the exact source

    Once, “a long time ago, in a galaxy far away,” I bought Charles Wetherell’s remarkable book, Etudes for Programmers, in which the author makes the case for studying worked examples and exercises before starting to program on your own. I strongly recommend that you find this book and read the preface (and, without stopping there, read the rest and solve the problems in it), since I cannot justify the need for such practice any better than he does. Even if you follow my recommendation and gain a lot of knowledge and practical skill from the book, you can come back and read this post, since it is devoted to somewhat different questions. And if you do not follow my recommendation, then all the more reason to read on.

    Not so long ago, in a post in which I expressed my opinion about (that is, scolded) one domestic RTOS, I mentioned that the ring buffer implementation in the well-known (and in some respects downright brilliant) mcucpp library cannot be considered ideal. I will try to explain my point of view and present an ideal (as far as that is possible in the real world) implementation. Note: this text sat among my unfinished drafts for quite a long time, and now the occasion has turned up.

    We continue to develop a library for working with peripheral devices, and next in line are memory management and buffering (yes, we are still doing preparatory work, but there is no way around it). Where does the need for buffers come from, and what kind of beast is a buffer? The point is that a significant part of the peripherals has limited speed, and a transfer, once started in one way or another, takes a certain time, sometimes quite significant compared with the time needed to produce the next piece of data to transmit. Naturally, until that time has elapsed the current transfer cannot complete and, accordingly, the next one cannot start.

    We have the classic case of a writer-reader pair working at different speeds. The problem cannot be solved in general, since “for an arbitrarily small but non-zero excess of the request flow over the service flow, the queue length tends to infinity,” and infinity is not implementable in principle. But a special case, where requests arrive in local bursts while on average the service flow copes with the load, can be solved with buffer memory of sufficient capacity. Note the phrase “sufficient capacity”: we will learn to calculate it later; for now it is enough that this is possible in principle.
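
    As a rough illustration of what “sufficient capacity” can mean in the very simplest case, assume a single burst of burst_items elements arriving at a constant rate rate_in while the reader drains them at a constant rate rate_out (the function and all three parameter names are assumptions of this sketch, not the sizing method promised above). The backlog grows at rate_in - rate_out for burst_items / rate_in seconds, so its peak is burst_items * (rate_in - rate_out) / rate_in, rounded up.

```c
#include <stdint.h>

/* Peak number of queued elements for one burst, under the constant-rate
 * assumptions stated above. Returns 0 when the reader keeps up. */
static uint32_t peak_backlog(uint32_t burst_items, uint32_t rate_in, uint32_t rate_out)
{
    if (rate_in == 0u || rate_out >= rate_in) {
        return 0u;
    }
    /* 64-bit intermediate so the product cannot overflow */
    uint64_t num = (uint64_t)burst_items * (rate_in - rate_out);
    return (uint32_t)((num + rate_in - 1u) / rate_in);   /* ceiling division */
}
```

    For example, a burst of 100 elements arriving twice as fast as they are consumed needs room for 50 of them.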

    Is buffer memory an absolute requirement? Of course not. For transmitted data you can use a blocking write, but with received data things are somewhat worse: it has to be stored somewhere until it is processed, unless you take appropriate measures in a higher-level protocol (the magic expression XON/XOFF did not appear out of nowhere), which is not always feasible and in any case usually limits the transfer rate significantly. There are also hardware buffers inside peripheral devices (at least for one element), but not every device has them, and their size is strictly bounded from above.

    Therefore we will implement a software buffer after all. It is natural to organize it as a FIFO (that is, a queue), and a queue, in turn, is best implemented on a circular buffer with two pointers. When I write “best”, it does not at all mean that other implementations (for example, a linked queue) are impossible, or that they have unavoidable shortcomings, apart from the fatal ones. It merely means that this implementation is not too complicated and is efficient enough, although others may have indisputable advantages over it, which will have to be paid for with something, because there is no such thing as a free lunch.

    Since it is highly unlikely that the microcontroller model you use has a hardware implementation of such a general-purpose device (individual peripheral modules may have their own ring buffers, but they are unrelated to the topic of this post), we will have to build the ring buffer in linear memory (implement it on a vector, which is, generally speaking, the only natural object in addressable memory), and this requires a buffer index (or maybe even two, but more on that later). In my opinion, a circular buffer with two pointers (indices) is the only acceptable way to implement a queue on a vector, but there are other points of view, and I have seen with my own eyes an implementation in the style of “x1 = x2; x2 = x3; ... x8 = new symbol”; if I may, I will not consider such exotica.

    Let us consider the correct implementation of a program module that manages the index, and to begin with, pay attention to the first word of that phrase. The difference between correct code and incorrect code is not only that correct code contains no errors, although that is an absolute requirement. Even code that fully performs its function may be incorrect if it is hard to understand, or if there is a variant that is just as clear but runs faster, or one that runs just as fast but is written more clearly, so the notion of correctness is somewhat relative. Our example buffer implementation will let us demonstrate the different degrees of correctness.

    Before getting to the point, one important remark about what follows. I assume that your compiler always runs at a non-zero optimization level (-O2), so we can stop thinking about minor improvements such as 1) prefix versus postfix modification, 2) reusing the result of a previous operation, or 3) the difference between incrementing and adding one, and so on: we assume the compiler will do a lot for us. Of course, this is not a rigorous assumption, but otherwise we would have to descend into the bowels of assembler, which is not mainstream these days.

    Let me remind you that our task is to implement the index (pointer) of a ring buffer, that is, a variable that sequentially runs through a series of values from some initial value to some final one. Let us assume right away that the initial value is zero (otherwise we would immediately have to write more or less correct code, which contradicts our educational goals, and we are in no hurry) and that the final value is Max.

    This behavior of the variable can be implemented using the following construction:

    volatile int Counter = 0;
    Counter = (++Counter) % (Max+1);

    and it is exactly this code that we see in a great many (that is, quite frequent) cases. What is wrong here? Well, first, for some time (between the increment and the assignment of the result) our variable exceeds the maximum allowed value, and if an interrupt that relies on the value of this variable arrives at exactly that moment, I personally would not dare predict the result. (Strictly speaking, in C this expression also modifies Counter twice without an intervening sequence point, which is undefined behavior.) Therefore, we rewrite the program:

    int Counter=0;
    Counter = (Counter + 1) % (Max + 1);

    We have eliminated one irregularity, and the code (here and below, “code” means the executable code generated by the compiler) has become neither longer nor slower (in fact it runs faster, but only because the first version used the word volatile, which is completely unnecessary in this case), nor less clear (rather, clearer, but that is a matter of taste).

    A necessary remark about volatile: this qualifier is needed when we want to prevent optimizations that lead to incorrect execution, and in this particular case (when the value of the variable does not change outside the module’s scope and there are no consecutive writes within it) it is completely unnecessary. I strongly recommend that you look at the generated code for both variants on godbolt.org. Why should volatile not be overused, in contrast to the static keyword, which should be used wherever possible? Well, first, we prohibit optimization, so the code will certainly not get faster (most likely it will get bigger and slower, but we prefer strict wording). Second, in this particular case the word is misleading, since as far as our program is concerned the value of the counter cannot change outside our control. In the program that reads its value, that is, in the ring buffer implementation, the counter may be considered to change outside the module, but even there it is debatable, so this attribute is simply not applicable to the counter. If one variable has to be interpreted differently in different modules, unions are at our service; and if we are talking about organizing a critical section, for example when implementing a transaction or atomic operations, then this qualifier gives us nothing at all.

    We return to the code and see that the program is still wrong. What is the matter? The point is that it does not do what we need (see the task description) but something entirely different (it computes the remainder of a division); the results merely coincide. Or rather, that is what we think (not I, of course, but the authors of such code): in general the results do not coincide, we were just lucky with the range of the variable (non-negative values). Moreover, this code takes longer to execute than it could, since at best we get an integer division instruction (if our architecture has one), and it does not execute in one processor cycle (a typical figure is 10 clocks on an 8-bit architecture).
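
    To see that the remainder only happens to agree with wrapping, it is enough to step outside the non-negative range. The sketch below compares the remainder with a bit mask for an 8-slot ring; RING_MAX and both function names are made up for this illustration. Since C99, integer division truncates toward zero, so the remainder of a negative value is negative, while the mask always lands inside 0..RING_MAX (on the usual two's-complement targets).

```c
/* Hypothetical ring with indices 0..7 */
#define RING_MAX 7

/* Wrap by remainder: matches the intended behavior only for x >= 0 */
static int wrap_rem(int x)
{
    return x % (RING_MAX + 1);
}

/* Wrap by mask: always yields a value in 0..RING_MAX */
static int wrap_mask(int x)
{
    return x & RING_MAX;
}
```

    For x = 8 both functions return 0, but for x = -1 the remainder gives -1, which is not a valid index, while the mask gives 7.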

    So why can such a completely wrong approach still be found so often? Here someone in the audience points out that when Max + 1 is a power of two, the compiler will figure out that it can replace the division with a bitwise AND with the corresponding mask (equal to Max), which executes very quickly, and everything will be fine.

    I would agree with this statement and take this approach, if not for the following circumstances:

    • this is possible only for a Max that is statically known at compile time,
    • this happens only when optimization is enabled,
    • this happens only for a Max that satisfies the condition above,
    • this does not happen for all integer types.

    Moreover, in this particular case (when the variable is declared signed), besides the AND with the mask, the compiler will also generate a comparison with zero and a branch for negative values, and although for our range this branch will never be taken, it occupies memory (more than once, if the function is inlined) and the comparison takes time. If you do not believe me, go to the same site again and see for yourself. This is one more argument in favor of unsigned integers, to which I recently devoted a whole post.
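
    For readers who want to repeat the comparison, here is a minimal pair of functions that can be pasted into Compiler Explorer with optimization enabled. The names and the value of RING_MAX are assumptions of this sketch; the functions are deliberately not static so that the compiler emits code for both. They return the same values for every valid index and differ only in the signedness of the argument, which is what the text above says changes the generated code.

```c
/* Hypothetical ring with indices 0..15 (RING_MAX + 1 is a power of two) */
#define RING_MAX 15

/* Signed variant: the compiler must also handle negative arguments */
int next_signed(int counter)
{
    return (counter + 1) % (RING_MAX + 1);
}

/* Unsigned variant: the remainder can reduce to a single AND with the mask */
unsigned next_unsigned(unsigned counter)
{
    return (counter + 1u) % (RING_MAX + 1u);
}
```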

    Therefore, if we want to use a bitwise AND with a mask (obtained by optimizing the remainder calculation), we should rewrite the module accordingly:

    #include <stdint.h>

    typedef uint8_t Counter_t;
    typedef int8_t sCounter_t;
    static inline Counter_t NextCounter(const Counter_t Counter) {
    #if IS_POWER2(Max + 1)
       return (Counter + 1) & Max;
    #else
       return (Counter + 1) % (Max + 1);
    #endif
    }

    In this version everything is completely clear and under control, and everything is true (a number of flaws remain, but now they are obvious rather than disguised), so it is correct, although there are more correct versions, and we will now look for them. The main drawback, in my opinion, is a violation of the KISS principle: using the remainder operation here plainly disregards it. Therefore we will now destroy all the shortcomings with one blow (do not worry about their fate, they will be reborn 100,500 times, because not all Arduino programmers read my posts).

    But first, a small digression. How do we implement the power-of-two check (in binary such a number looks like {0}1{0}) that we have just used?

    (don't peek)
    #define IS_POWER2(N) ((((N) - 1) & (N)) == 0)

    And how do we check that a number is a right-aligned run of ones, {0}1{1} in binary? One option is obvious:

    #define IsRightSequence(N) IS_POWER2((N) + 1)

    and the second is trivial

    #define IsRightSequence(N) ( (((N) + 1) & (N)) == 0)

    Note: I cannot help recalling the excellent theorem “A transcendental number raised to a transcendental power is always transcendental, unless the converse is obvious or trivial.”

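    And how do we check that a number is a run of ones, {0}1{1}{0}? Fill the zeros below the run with ones; adding one must then clear every bit of the original number:

    #define IsSequence(N) (((((N) | ((N) - 1)) + 1) & (N)) == 0)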

    And finally, how to isolate the lowest set bit of a number (I do not know why this might be needed, but it may come in handy):

    #define LowerBit(N) ((((N) - 1) ^ (N)) & (N))


    Although I have thought of a use for it after all:
    #define IsRightSequence(N) (IsSequence(N) && (LowerBit(N) == 1))

    A curious observation: these macros are not quite correct. It turns out that 0 is both a power of two and a right-aligned run (and, of course, a run as well), which is a bit strange. But 1 quite rightly is all of these things, so zero simply has to be handled separately. Another interesting property of these macros is that they make no assumptions about the width of the argument, that is, they work correctly with any unsigned integer type.
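
    The observations above are easy to check by hand or in a test. The block below collects the four checks in one place as a self-contained sketch; the run check is written here in the “fill the low zeros, then add one” form, which is this sketch's own formulation, and the other three follow the definitions given earlier.

```c
/* {0}1{0}: exactly one bit set (zero also passes, as noted above) */
#define IS_POWER2(N)        ((((N) - 1) & (N)) == 0)

/* {0}1{1}: run of ones starting at bit 0 */
#define IsRightSequence(N)  ((((N) + 1) & (N)) == 0)

/* {0}1{1}{0}: one contiguous run of ones anywhere in the word */
#define IsSequence(N)       (((((N) | ((N) - 1)) + 1) & (N)) == 0)

/* Isolates the lowest set bit, e.g. 12 (1100b) -> 4 (0100b) */
#define LowerBit(N)         ((((N) - 1) ^ (N)) & (N))
```

    With unsigned arguments, 8 is a power of two and 12 is not, 7 is a right-aligned run and 6 is not, 0x3C is a run and 5 is not, and zero passes all three checks, which is the quirk mentioned in the text.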

    There is a wonderful book, “Tricks for Programmers”, where you can find these macros and many other equally amusing and instructive problems; I highly recommend it, especially since there are not too many words in it.

    But back to our ring buffer index. We gave a correct solution but promised an even more correct one, so our last solution must have flaws (who would doubt it). One of them is that the buffer length must be statically known at compile time; the second is that for an unlucky length the execution time is very long; and there are still a fair number of irregularities in a relatively small piece of code, which brings to mind the old joke about four spelling mistakes in a single short word. Let us eliminate them all at once (leaving some for later), for which we finally write the solution to the original problem as it is stated:

    static inline Counter_t NextCounter(const Counter_t Counter) {
       if ((Counter + 1) > Max) {
          return 0;
       } else {
          return Counter + 1;
       }
    }

    (As you already understood, I am a supporter of the Egyptian brackets and there's nothing to be done about it).

    Note that we have simply rewritten the problem statement from natural language into the chosen programming language, so the result is extremely clear. Can it be improved? Undoubtedly, but only in terms of performance, since this solution simply has no other shortcomings (not really: they do exist, and we will successfully eliminate them).

    Let us estimate the computational cost of this solution: adding one (1) and comparing (2) always, then either assigning zero (1) (rarely) or adding again (1) (almost always), which gives 1 + 2 + 1 + Δ ≈ 4 elementary operations and zero memory. A good compiler in the right mode may well make certain optimizations and shorten the execution time, but we had better do it ourselves explicitly. Here is the next variant:

    static inline Counter_t NextCounter(const Counter_t Counter) {
       register sCounter_t Tmp;
       Tmp = (Counter + 1);
       if (Tmp > Max) {
          Tmp = 0;
       }
       return Tmp;
    }

    Let us estimate the cost: addition and comparison always, assignment of zero rarely, approximately 3 operations and one memory element. In fact the previous version also used one memory element (implicitly), so we have a net gain of one elementary operation. Moreover, the previous version had two more drawbacks: 1) it violated the DRY principle (it computed the increment twice) and 2) it had more than one exit point, which is not good. We lost nothing in clarity either, so we managed to kill a whole bunch of hares with a single shot without spending any ammunition, a story quite in the style of Baron Munchausen.

    Note that I did not use the construct if ( (Tmp = Counter + 1) > Max), although it explicitly tells the compiler to avoid redundant moves. This is taste in its most blatant form: I simply do not like using the value returned by the assignment operator and try to avoid it. I cannot explain the reason for this strong feeling; according to Freud, it is most likely a childhood trauma. Modern compilers are quite capable of such simple optimizations on their own, and besides, I added the register qualifier, so the code generated for my version and for the correct one (from the point of view of the C language) will be identical. Nevertheless, I in no way limit your freedom to use whichever form you prefer.

    We continue to improve, because there is no limit to perfection, and we have not reached it yet. To get there, we reformulate the original task slightly and keep only the requirement that the variable stay within the range of values, without specifying the direction of change. This approach allows us to rewrite the program as follows:

    static inline Counter_t NextCounter(const Counter_t Counter) {
       register sCounter_t Tmp;   /* signed, so that -1 is representable; Max must fit in sCounter_t */
       Tmp = (Counter - 1);
       if (Tmp < 0) {
          Tmp = Max;
       }
       return Tmp;
    }

    At first glance not much has changed, but nevertheless we gain time. Of course, not because decrementing is faster than incrementing (although I have heard that theory too), but because of how the comparison works. In the previous versions I counted the comparison as 2 elementary operations (first a subtraction, then the decision); here the decision uses the result of the previous operation directly, so the comparison takes one elementary operation. That gives two operations always plus one assignment (rarely): we have saved one operation without losing anything, “a trifle, but a nice one,” as they say. Is the resulting solution perfect? Unfortunately not.
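
    A quick way to convince yourself that both directions visit the same set of index values is to count the steps needed to return to zero. The sketch below is self-contained; the value of Max, the names NextUp, NextDown and Period are assumptions made for this illustration, and the decrementing variant uses a signed temporary, so Max has to fit in int8_t here.

```c
#include <stdint.h>

#define Max 9   /* hypothetical highest index; the ring has Max + 1 = 10 values */

typedef uint8_t Counter_t;
typedef int8_t  sCounter_t;

/* Increment with a check against the upper bound */
static inline Counter_t NextUp(const Counter_t Counter) {
    Counter_t Tmp = (Counter_t)(Counter + 1);
    if (Tmp > Max) {
        Tmp = 0;
    }
    return Tmp;
}

/* Decrement with a check against zero: the test can reuse the sign of the subtraction */
static inline Counter_t NextDown(const Counter_t Counter) {
    sCounter_t Tmp = (sCounter_t)(Counter - 1);
    if (Tmp < 0) {
        Tmp = Max;
    }
    return (Counter_t)Tmp;
}

/* Number of steps after which the index, starting from 0, returns to 0 */
static unsigned Period(Counter_t (*Step)(Counter_t)) {
    unsigned Steps = 0;
    Counter_t Index = 0;
    do {
        Index = Step(Index);
        Steps++;
    } while (Index != 0);
    return Steps;
}
```

    Both Period(NextUp) and Period(NextDown) come out as Max + 1, so for a queue the direction of travel is indeed irrelevant as long as both indices use the same function.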

    There is an even faster solution: simply increment (or decrement) the counter and do nothing else. But it is possible only in the single case where the maximum value coincides with the largest value representable in the chosen type. For an 8-bit counter (that is, the uint8_t type) this is 255; then we just write Counter = Counter + 1, and take my word for it that writing Counter += 1 or ++Counter is not necessary, although many people do write that and are perfectly entitled to. Unless we seriously consider the need to save characters (the first form being the longest), there is no point in it, at least when writing for the ARM or AVR architecture (I simply have not checked others, but I suspect the result will be the same) with the GCC compiler.

    Modern compilers are very, very advanced in terms of optimization and generate really good code, provided, of course, that the appropriate mode is enabled. I am ready to agree that such language constructs do no harm and may bring benefits under certain conditions; the one thing I will note is that the expression Counter++ (in this particular case, of course) should definitely be avoided, since it is intended for quite different situations and may generate slower code, though not necessarily.
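
    The 256-element case can be sketched in a few lines. Here the wrap-around is done by the type itself: converting 256 back to uint8_t yields 0, which is well-defined for unsigned types. The names Buffer256, Next256 and Store256 are invented for this illustration.

```c
#include <stdint.h>

#define BUFFER_SLOTS 256u

/* Every possible uint8_t value is a valid index into this array */
static uint8_t Buffer256[BUFFER_SLOTS];

/* No comparison and no mask: the conversion to uint8_t wraps 255 + 1 to 0 */
static inline uint8_t Next256(uint8_t Counter) {
    return (uint8_t)(Counter + 1u);
}

/* Store one value and advance the caller's write index */
static inline void Store256(uint8_t *WriteIndex, uint8_t Value) {
    Buffer256[*WriteIndex] = Value;
    *WriteIndex = Next256(*WriteIndex);
}
```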

    Another matter is that a buffer length of 256 elements is not always acceptable, but if you have enough memory, why not. With this implementation, if you can align the buffer on a page boundary, access to the elements can be made very fast by eliminating the index-to-pointer conversion (the keyword union will hint at how to do this; I will not show it, so as not to teach bad habits), but this is a very, very specific solution strongly tied to the architecture, dangerously close to tricks in the worst sense of the word, and that is not our style.

    Of course, nobody forbids us to write a wrapper that calls one method or another depending on the maximum (and minimum, since many of the methods simply do not work with a non-zero minimum) value of the counter. I have already outlined the basic principles of such a solution, so I leave it as an exercise.
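
    One possible shape for such a wrapper, offered only as a sketch and not as the intended solution of the exercise: a single inline function that takes the highest index as a parameter and picks the cheapest method that is valid for it. RingNext and MaxIndex are hypothetical names; the sketch assumes a zero minimum and an 8-bit index, and relies on the compiler folding the two conditions away when MaxIndex is a compile-time constant.

```c
#include <stdint.h>

static inline uint8_t RingNext(uint8_t Counter, uint8_t MaxIndex)
{
    if (MaxIndex == UINT8_MAX) {
        /* 256 values: the type wraps by itself */
        return (uint8_t)(Counter + 1u);
    }
    if (((MaxIndex + 1u) & MaxIndex) == 0u) {
        /* MaxIndex + 1 is a power of two: wrap with a mask */
        return (uint8_t)((Counter + 1u) & MaxIndex);
    }
    /* general case: compare and reset */
    return (Counter >= MaxIndex) ? (uint8_t)0 : (uint8_t)(Counter + 1u);
}
```

    For example, RingNext(7, 7) takes the mask path and RingNext(9, 9) takes the comparison path; both return 0.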

    Well, in conclusion, let us summarize: we bring together the different implementations of the ring index and evaluate their properties.

    Method    Versatility    Execution time
    ±         0 (1)          1
    ± %       1 (7)          2
    + if      3 (any)        3.x
    - if      3 (any)        2.x

    The number in parentheses in the second column is the number of buffer sizes (not exceeding 256) for which the implementation is available, it being understood that a buffer of size 0 does not interest us.

    As is easy to see from this table, there is no such thing as a free lunch (my favorite expression, as you might have noticed), and advantages are bought at the price of disadvantages. The only thing that can be stated unequivocally is that increment with a check has a more successful competitor in decrement with a check and does not make it to the next round under any circumstances.

    A necessary remark: there are programming languages in which we would not have to think about implementing the index at all and could simply use a range type. Unfortunately, I cannot call the code generated for these constructs optimal, since these constructs (and these languages) are not designed to be optimized for execution time at all, which is a pity.

    So, we have made a correct module (what a grand name for an inline function) for working with the index, and now we are ready to implement the ring buffer itself.

    To begin with, we should decide what we want from this program object. It is absolutely necessary to be able to put a data element into the buffer and to retrieve it: two main methods in total, a kind of getter and setter. Theoretically one can imagine a buffer without one of these methods, or even without both (purely theoretically one can imagine all sorts of things), but the practical value of such an implementation is highly questionable. The next necessary piece of functionality, checking whether there is any data, can be implemented either as a separate method or as a special value (or attribute) returned by the read. The first approach is usually preferred: it is clearer and not too expensive.
    Checking whether the buffer is full, however, is already a big question: this operation costs additional time that will always be spent on writing, although nobody forces us to use it, so let it be. We need nothing else from the buffer; remember this phrase for the future.

    We return to the implementation. We need a place to store the elements of the queue and two indices, one for writing to the buffer and one for reading from it. Exactly how we obtain this place (and these indices) is a topic for a separate conversation; for now we leave it out of the picture and assume that we simply have them. Some (including the authors of the book “Programming for Mathematicians”, which I respect and recommend) also use a counter of occupied slots, but we will not do so, and further on I will try to show why it is evil.

    First, about the indices. Note right away that these are indices, not pointers, although I have occasionally allowed myself to call them that. Why indices (which store the number of a queue element) rather than pointers (which store the element's location in memory) is a very difficult question; there are situations where pointers are more advantageous, but this is clearly not our case. Our queues will be short (we look even at 256 with caution), so the indices will take up less space, elementary operations on them will be faster, there will be no problems with atomicity (on a normal architecture there should not be any anyway, but with 8-bit indices they simply can never occur, unless of course you have a 4-bit controller), and the additional cost of converting an index to a pointer will not be too large.

    A side note
    For example, in the C implementation for the x51 architecture (byte-oriented at heart), pointers were 2 bytes long (with a certain pragma enabled) or even 3 bytes (by default), and the fact that the high byte was merely a memory-space tag that took no part in address arithmetic did not make our life much easier. By the way, I do not quite understand why GCC does not generate code for x51, although it works perfectly well for the rather similar AVR architecture.

    Moreover, many techniques that speed up obtaining the next value become unavailable when pointers are used. And if we take into account the opinion that pointers are harder to understand (not that this opinion seems correct to me, but it exists), the choice is unambiguous: indices.

    But what exactly the indices should point to is a question where the scope for imagination is limited only by the size of the buffer (and not even by that), although only a very small set of options makes practical sense. For the write index there are two possibilities: 1) point to the place where the last element was written, or 2) point to the place where the next element will be written. Since the first option looks a bit strange right after the queue is created, we choose the second, especially since it promises us a tangible gain later. For the read index we assume straight away that it points to the element that will be read by the next read. This immediately gives a simple (in the sense of checking) criterion that the queue is not empty: the indices are not equal. But then a second problem arises: if we keep writing until every slot of the buffer is occupied, the indices become equal again, and a full queue is indistinguishable from an empty one.

    The first solution to this problem (“the obvious, understandable and simple wrong solution”) has been used many times and consists in adding a counter of the elements stored in the buffer or, in the advanced case, a “full” flag. Why do I disapprove of it? Because of 1) the additional memory, 2) the time spent maintaining it (small, but not zero), 3) the fact that until the indices coincide the counter is redundant, because it equals the difference of the indices, 4) the fact that for a buffer of 256 elements the counter must be wider than the indices and may not be a native type, and 5) one more drawback (an almost fatal one), but more on that later. As mentioned above, these shortcomings can be partially mitigated by using a “full” flag instead of a counter, but there is a much better solution.

    All we have to do is never allow the indices to coincide after a write (obviously, they may coincide after a read), and this requires limiting the number of elements in the buffer to one less than the number of slots. Here is the implementation:

    #define YES 1                        /* assumed definition */
    #define NeedOverflowControl YES
    typedef uint8_t Data_t;
    static Data_t BufferData[Max + 1];   /* indices run from 0 to Max */
    static Counter_t BufferWriteCounter = 0, BufferReadCounter = 0;
    void BufferWrite(const Data_t Data) {
       BufferData[BufferWriteCounter] = Data;
       register Counter_t Tmp = NextCounter(BufferWriteCounter);
    #if (NeedOverflowControl == YES)
       if (Tmp == BufferReadCounter) { BufferOverflow(); } else
    #endif
       { BufferWriteCounter = Tmp; }
    }

    There is a slight irregularity in the function above; I suggest you find and eliminate it yourself, although ... there is one more, but let us continue:

    static inline int BufferIsEmpty(void) { return (BufferReadCounter == BufferWriteCounter); }
    static inline int BufferIsFull(void) { return (BufferReadCounter == NextCounter(BufferWriteCounter)); }
    /* Set to 1 when sizeof(Data_t) < sizeof(Counter_t); the preprocessor cannot evaluate sizeof */
    #define DataSizeIsSmaller 0
    Data_t BufferRead(void) {
    #if DataSizeIsSmaller
       register Data_t Tmp = BufferData[BufferReadCounter];
    #else
       register Counter_t Tmp = BufferReadCounter;
    #endif
       BufferReadCounter = NextCounter(BufferReadCounter);
    #if DataSizeIsSmaller
       return Tmp;
    #else
       return BufferData[Tmp];
    #endif
    }

    Note the situation in which we call the overflow handler (if the control flag is set): when we write into the last free slot of the buffer, we do not announce it by advancing the write index, and accordingly that element can never be read. As I warned, with this implementation the buffer capacity is reduced by one. Note also that we first put the element into the queue and only then announce it by moving the index; the reverse order could lead to very unpleasant consequences.
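
    To make the “capacity reduced by one” point tangible, here is a compact, self-contained variant of the same scheme wrapped in a struct. It is a sketch under its own assumptions, not the code above: the names Ring, ring_put, ring_get and ring_fill are invented, the fullness check is done before storing the element, and it is single-threaded, so it says nothing about interrupts or synchronization.

```c
#include <stdbool.h>
#include <stdint.h>

#define RING_MAX 7u                      /* highest index; the array has RING_MAX + 1 slots */

typedef struct {
    uint8_t data[RING_MAX + 1u];
    uint8_t write;                       /* slot the next element will be written to */
    uint8_t read;                        /* slot the next element will be read from  */
} Ring;

static uint8_t ring_next(uint8_t i) {
    return (i >= RING_MAX) ? (uint8_t)0 : (uint8_t)(i + 1u);
}

static bool ring_is_empty(const Ring *r) { return r->read == r->write; }
static bool ring_is_full(const Ring *r)  { return ring_next(r->write) == r->read; }

/* Returns false and stores nothing when the buffer is full */
static bool ring_put(Ring *r, uint8_t value) {
    uint8_t next = ring_next(r->write);
    if (next == r->read) {
        return false;
    }
    r->data[r->write] = value;           /* store the element first ...        */
    r->write = next;                     /* ... and only then move the index   */
    return true;
}

/* Returns false when the buffer is empty */
static bool ring_get(Ring *r, uint8_t *value) {
    if (ring_is_empty(r)) {
        return false;
    }
    *value = r->data[r->read];
    r->read = ring_next(r->read);
    return true;
}

/* Writes 0, 1, 2, ... until the buffer refuses; returns how many were accepted */
static unsigned ring_fill(Ring *r) {
    unsigned n = 0;
    while (ring_put(r, (uint8_t)n)) {
        n++;
    }
    return n;
}
```

    With 8 slots, ring_fill on an empty buffer accepts exactly 7 elements: one slot always stays unused so that “full” and “empty” remain distinguishable.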

    Before looking at the code with the flag, let us talk a little about overflow. It occurs when we cannot put another element into the buffer, and there are various ways of dealing with it, some good and some so-so.

    First of all, the (correct and good) way number 1) is to prevent such a situation in principle by choosing the right buffer size (there is a sub-variant of this method, increasing the buffer size when necessary, but it has not caught on in the embedded world and in general looks doubtful: if we had to enlarge the buffer once, where is the guarantee that we will not have to do it again and again?).

    The next (correct, but worse, although still good) way number 2) is to report the overflow through the return value and suspend writing to the buffer, the so-called blocking write, but it is not always implementable.

    Then come two wrong and bad ways, 3) and 4): ignore the problem and pretend that everything is fine (“smile and wave”). Why are they bad? Because we only pretend that everything is fine; in reality the Dirichlet (pigeonhole) principle (the problem of N cages and N + 1 birds) cannot be violated, and we lose a data element. And there are two ways because we can either

    3) lose the most recently written data element, or

    4) lose the oldest of the elements not yet transmitted.

    Which of these is worse? "Both are worse", although for a particular task one of them may be more acceptable; the main drawback is unavoidable: information is lost. Way 3 is used most often because it is easier to implement (it is enough to leave the write index unchanged), and that is what the previous example does when the overflow handler is empty.
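
    To see the difference between ways 3 and 4 side by side, here is a small sketch. All names in it (Q_MAX, q_write_drop_newest, q_write_drop_oldest and the rest) are made up for this illustration and synchronization is deliberately left out. Note that in way 4 the writer has to move the read index as well, so the read index is no longer changed by a single party.

```c
#include <stdint.h>

#define Q_MAX 4u                           /* hypothetical size: holds Q_MAX - 1 elements */

static uint8_t q_data[Q_MAX];
static uint8_t q_wr = 0, q_rd = 0;

static uint8_t q_next(uint8_t i) { return (uint8_t)((i + 1u) % Q_MAX); }

/* Way 3: on overflow the new element is discarded, the write index stays put. */
void q_write_drop_newest(uint8_t value)
{
    uint8_t next = q_next(q_wr);
    if (next == q_rd) { return; }          /* full: silently lose `value` */
    q_data[q_wr] = value;
    q_wr = next;
}

/* Way 4: on overflow the oldest unread element is discarded to make room. */
void q_write_drop_oldest(uint8_t value)
{
    uint8_t next = q_next(q_wr);
    if (next == q_rd) { q_rd = q_next(q_rd); }   /* full: lose the oldest element */
    q_data[q_wr] = value;
    q_wr = next;
}

int q_is_empty(void) { return q_rd == q_wr; }

/* The caller is expected to check q_is_empty() first. */
uint8_t q_read(void)
{
    uint8_t value = q_data[q_rd];
    q_rd = q_next(q_rd);
    return value;
}

void q_reset(void) { q_wr = 0; q_rd = 0; }
```

    Writing 1, 2, 3, 4 into an empty buffer and then reading it out gives 1, 2, 3 with the first function and 2, 3, 4 with the second; in both cases one element is gone.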

    There is one more way: not to control the situation at all (in our example, comment out not the line with the define but the line with the actual check). In that case we

    5) lose the entire full buffer. At first glance this looks like the worst option because the losses are large. That is not quite true, since any data loss is evil, and this way has an undoubted advantage: it is the fastest, because overflow is not checked at all.
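
    Why the whole buffer is lost, and not one element, is easy to show. In the sketch below (the names U_MAX, u_write and u_count are invented for the illustration) the write never checks anything, and the reader's view of the fill level is computed from the two indices.

```c
#include <stdint.h>

#define U_MAX 4u                           /* hypothetical buffer size */

static uint8_t u_data[U_MAX];
static uint8_t u_wr = 0, u_rd = 0;

static uint8_t u_next(uint8_t i) { return (uint8_t)((i + 1u) % U_MAX); }

/* Way 5: no overflow check at all, so the write is as cheap as it gets. */
void u_write(uint8_t value)
{
    u_data[u_wr] = value;
    u_wr = u_next(u_wr);
}

/* Number of elements the reader believes are waiting. */
unsigned u_count(void)
{
    return (unsigned)((u_wr + U_MAX - u_rd) % U_MAX);
}
```

    After three writes u_count() reports 3; the fourth write wraps the write index onto the read index, u_count() drops to 0, and the reader sees an empty buffer although nothing has been read.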

    An interesting observation: a quick Internet search turned up no algorithm for recovering data after the loss of an element, unlike the case of a corrupted element, where block codes work well.

    The variant with the overflow flag, surprisingly, loses very little speed if written correctly, but it does lose some. As for memory, we gain one element but must allocate space for the flag, so the saving is questionable. We will not consider the counter variant: I have already listed four of its shortcomings, and it is time to recall the fifth, which, as I promised, is fatal to boot. In the implementation proposed earlier, the indices have the MRSW (Multi-Reader Single-Writer) property in the classification of "The Art of Multiprocessor Programming" (highly recommended, an absolutely amazing work), and if changing an index is an atomic operation (as it is for a native type), no synchronization means are required.

    But the counter has the MRMW property, and working with it without synchronization is simply impossible, "absolutely" impossible (unless, of course, your goal is a "suddenly buggy" program). Since we are writing a module for working with peripherals, where a write or a read may be called from an interrupt, synchronization must be considered. Interestingly, the flag, which seems to have similar properties, can nevertheless be used without synchronization tools. (Funny, isn't it? There is a perfectly scientific explanation: the modification becomes atomic, and the logic of the flag allows, and even requires, overlapping writes.) This is illustrated by the following program fragment.
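
    Before moving on to that fragment, a side sketch of why the counter is hopeless without synchronization. This is a hand-made simulation, not real interrupt code: ElementCount, isr_write_one and simulate_lost_update are names invented here, and the "interrupt" is simply called by hand between the load and the store that a compiler may emit for a decrement.

```c
#include <stdint.h>

static volatile uint8_t ElementCount;      /* hypothetical shared element counter */

/* Stands in for the interrupt handler that writes one element: count + 1. */
static void isr_write_one(void)
{
    ElementCount = (uint8_t)(ElementCount + 1u);
}

/* Main-line read: count - 1, spelled out as load / modify / store,
   with the "interrupt" arriving between the load and the store. */
uint8_t simulate_lost_update(uint8_t initial)
{
    ElementCount = initial;
    uint8_t tmp = ElementCount;            /* load                               */
    tmp = (uint8_t)(tmp - 1u);             /* modify                             */
    isr_write_one();                       /* interrupt fires here               */
    ElementCount = tmp;                    /* store overwrites the increment     */
    return ElementCount;
}
```

    Starting from 5, one write and one read should leave the counter at 5, yet simulate_lost_update(5) returns 4: the increment made by the handler is lost. Both parties modify the same variable, which is the MRMW situation described above.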

    Note that this approach (a flag without synchronization tools) is possible only under certain conditions, which you should check carefully for your case. Details can be found in the literature; I will not give them, because I do not consider this way of organizing the buffer a very good one. I show it only to demonstrate that even a not very good solution can be implemented neatly, and to demonstrate another concept that I consider very useful and intend to adhere to.

    typedef uint8_t data_t;
    static data_t BufferData[Max];
    static counter_t BufferWriteCounter = 0, BufferReadCounter = 0;
    static int8_t BufferHaveDataFlag = 0;    /* 1 while the buffer holds data */
    void BufferWrite(const data_t Data) {
        if ((BufferWriteCounter == BufferReadCounter) && (BufferHaveDataFlag == 1)) {
            BufferOverflow();                /* indices equal and flag set: the buffer is full */
        } else {
            BufferData[BufferWriteCounter] = Data;
            BufferHaveDataFlag = 1;          /* set the flag first ... */
            BufferWriteCounter = NextCount(BufferWriteCounter);   /* ... then move the index */
        };
    };
    static inline int BufferIsEmpty(void) { return ((BufferReadCounter == BufferWriteCounter) && (BufferHaveDataFlag == 0)); };
    data_t BufferRead(void) {
        register counter_t Tmp;
        Tmp = BufferReadCounter;
        BufferReadCounter = NextCount(BufferReadCounter);
        /* the indices met after a read: the buffer is now empty, clear the flag */
        if (BufferReadCounter == BufferWriteCounter) { BufferHaveDataFlag = 0; };
        return BufferData[Tmp];
    };

    Note again that in the write procedure we first set the flag and only then modify the index, while in the read procedure we first move and compare the indices and only then touch the flag. This ordering again saves us from trouble and somewhat resembles acquiring resources in a fixed order to avoid deadlock.

    Generally speaking, this fragment should be rewritten in the proper style, eliminating the magic constants 0 and 1 (if you think these are not magic constants, you are mistaken), and if you use it, do so. I am hiding the corrected code under a spoiler, not because I am shy but so as not to provoke another round of holy war (senseless and merciless). Haters of enumerations should not open it,

    the rest may.
    typedef enum { NoBufferHaveData = 0, BufferHaveData = 1 } BufferHaveDataFlag_t;
    static BufferHaveDataFlag_t BufferHaveDataFlag;
    static inline void BufferHaveDataFlagSet(void) { BufferHaveDataFlag = BufferHaveData; };
    static inline void BufferHaveDataFlagClr(void) { BufferHaveDataFlag = NoBufferHaveData; };
    static inline int BufferHaveDataFlagIsSet(void) { return (int)(BufferHaveDataFlag == BufferHaveData); };


    Interestingly, the generated code for this approach is exactly the same as with the bare constants 0 and 1, yet everything is transparent and leaves no room for interpretation. I agree in advance that the example looks contrived, and that if only the functions are visible from outside, the constants 0 and 1 could be used inside. All true. The only thing I insist on is exactly this behavior of the flag: you may call it BufferFullFlag and change the logic accordingly, but in no case should it be called BufferIsNotEmptyFlag, with the mysterious logical operations that follow from such a name. Once again, the KISS principle has repeatedly proven its unconditional validity: if we want to know whether the buffer is empty, we should write exactly that in the program rather than ask whether it is "not full".

    In any case, I do not consider the implementation with the flag a good one, so I strongly recommend accepting that the buffer is not used to its full size and adopting the implementation with two indices and no additional fields.

    Unexpectedly, a rather long post came out of such a simple topic. I was also going to write about synchronization and critical sections, but that will wait until next time.

    P.S. And finally, here is what I did not like in the mentioned library, yet what the authors of the domestic RTOS took into their code without the slightest doubt:

    1. Two implementations of the buffer are provided: one for sizes that are a power of two (I hope I have shown that this is absolutely unnecessary), the other for all remaining cases. You have to choose between them by hand; of course, nobody will ever make a mistake, there are checks everywhere.
    2. Absolutely unnecessary methods are provided, such as deleting the last element or direct access to a buffer element.
    3. The data buffer is aligned to an integer.
    4. The power-of-two implementation has an error in the occupancy check.
    5. The arbitrary-size implementation uses a counter.
    6. Critical sections are not organized at all. In the correct implementation (with two indices) they are simply not needed, but here we cannot do without them; using atomic operations instead is not enough.
    7. Some carelessness in style, in lines like

      return ((_writeCount - Atomic::Fetch(&_readCount)) &
      	(size_type)~(_mask)) != 0;

      especially its second half. This is exactly what C is blamed for, yet the language itself is not at fault: it merely allows you to write something like this instead of the more understandable

      size_type(~(_mask))

      but does not force you to do so.

    P.P.S. I hope the author of the library will regard this criticism as constructive and make the appropriate corrections.
