Implement a small asynchronous server

    The purpose of publishing this topic is to present the audience of Habrahabr the code of a small asynchronous server written in Python using almost bare sockets.

    I had to write quite a lot of applications working as network services. These services were written in different languages, for different loads, and each time the implementation of a new service was somewhat different from the previous one. Under the habrakat, I give an example of a rather successful, in my opinion, implementation of the “training” server, accompanying the code with my comments as necessary.

    Introductory word


    So, in the beginning a few words about what the server actually does. Since it is a training example, its task is quite simple: at startup, create a socket listening on the port; when a client connects, start transmitting information to him. As information, clients receive an endless stream of digits of the number Pi calculated on the server. The server generates numbers only when at least one client is connected to it, remembering the sequence already generated. When all clients are disconnected, generation stops, the server waits for a new connection.

    The server implementation is single-threaded. The advantage of this approach is the lack of an additional overhead in the form of a separate thread or even a separate application instance that would arise when using other commonly used techniques.

    By and large, this difference can hardly be called decisive, because Python is still an interpreted language and introduces its limitations on the area of ​​use, but in the future we will be able to use the threads “saved” with this architecture in the application to perform any parallel computing tasks.

    Before continuing, I strongly recommend downloading the code and reading it fluently. Also, the code can be viewed online at the show code .
    To start the server, users of UNIX-like systems can run the following commands:
    chmod + x ./pypi.py
    ./pypi.py

    To connect to the server, you can use the following command:
    telnet 127.0.0.1 45067


    Code parsing


    The core of the application is the main () function, which starts the dispatch cycle of the asyncore module :
    def main ():
        try:
            asyncore.loop (0.1, True, Server ('127.0.0.1'))
        except KeyboardInterrupt:
            print '\ nBye: - *'
            sys.exit (0)
     
    if __name__ == '__main__':
        main ()
     

    The asyncore module provides a loop function that takes four optional arguments: (1) timeout, (2) preference flag of the poll mechanism over the usual select, (3) socket descriptor dictionary, (4) number of iterations. The third parameter is important for us, by which we passed the newly created server object to the function.

    Thanks to “special Python magic”, the Server class is inherited both from the dispatcher class from the asyncore module and from the dict dictionary class, which allows it to act both as a server socket and as a repository of socket descriptors for all connected clients.

    The start of a Server class declaration is as follows:
    class Server (dispatcher, dict):
        writable = lambda x: False
     
        def __init __ (self, host = None, port = 0xB00B):
            dispatcher .__ init __ (self)
     
            self.create_socket (AF_INET, SOCK_STREAM)
            dict .__ init __ (self, {self .fileno (): self})
     
            self.set_reuse_addr ()
            self.bind ((host, port))
            self.listen (0xA)
     
            self.dataSource = PiGenerator ()

    In the constructor, the object is first initialized as a server socket handler, and then as a dictionary consisting of a single record - the socket descriptor of the server itself, pointing to the server object. It is important that the socket is created by the create_socket function before the dictionary is initialized, because before creating the socket, we could not get its handle. Then, the server socket is bound to the port on the specified host, wiretapping is started and the Pi number generator is created, which will be further used to generate the data stream to the client.

    After the dispatch cycle is started, the bulk of the work falls on the asyncore module, which, when a new client is connected, will call the handle_accept method of the server object to process the incoming connection:
        def handle_accept (self):
            sock, (host, port) = self.accept ()
            print 'new client from% s:% d'% (host, port)
     
            stream = Stream (self.dataSource)        
            self [sock.fileno ( )] = Client (sock, self, stream)

    Inside the handler method, a new client is directly accepted using the accept function, which returns a newly created socket for communication with the client and a pair of host ports from which the connection occurred. Having received the client socket, the server creates a new data stream (implemented by the Stream class) for reading data from the generator. After that, a new client object is added to the list of clients, initialized by the newly created data stream.

    The client reads data from the stream inside the writable () method:
        def writable (self):
            data = self.stream.read ()
            if data == None:
                print 'client finished reading'
                self.close ()
                return False
     
            self.buffer + = data
            return len (self.buffer)> 0

    The writable method is called by the asyncore module for each socket before the next iteration of the dispatch cycle to find out whether it is necessary to check write access for this socket. We use this to try to read data from the stream and to report the need for writing if there is data in the stream. If the stream returns None, it means that there will be no more data in it and the socket is closed. In this example, this situation should not occur, because numbers are generated endlessly.

    After learning about the availability of the write operation for the client socket, asyncore calls the handle_write () method, which sends data previously read from the stream through the socket:
        def handle_write (self):                
            sent = self.send (self.buffer)
            self.buffer = self.buffer [sent:]

    The generator and the flow are closely interconnected realizing the observer pattern . The generator acts as an observable object and provides the subscribe and unsubscribe methods, respectively, for subscribing to and unsubscribing from events:
    class PiGenerator (list):    
        def subscribe (self, obj):  
            self.lock.acquire ()
            try:     
                self.append (obj)
                self._notify (obj = obj)
            finally:
                self.lock.release ()            
     
            if not self. calculator:
                self.calculator = PiCalcThread (self, self.lock)
                self.calculator.start ()
            else:
                if len (self)> 0:
                    self._resumeCalculator ()
     
        def unsubscribe (self, obj):
            self.lock.acquire ( )
            self.remove (obj)   
            self.lock.release ()
     
            if len (self) <= 0:
                self._pauseCalculator ()

    Directly generating numbers is implemented by a separate class PiCalcThread, which performs calculations in a separate thread, therefore, a synchronization mechanism is used to synchronize the operations of adding and deleting a subscription. Using the same blocking object, a thread is suspended in the absence of subscribed threads. When subscribing a new stream using the _notify () method, the numbers calculated and saved before that are sent to it, if it is not the first subscribed stream.

    The _notify () method runs through the signed streams and calls the update method passing new digits to the stream:
        def _notify (self, digits = None, obj = None):
            objs = [obj] if obj else self
            digits = digits or self.digits
     
            for obj in objs:
                obj.update (digits)

    The update () method of the stream simply adds new data to the existing one:
        def update (self, data):
            self.data + = data

    The class of the Pi digit generation stream uses the algorithm proposed by Jeremy Gibbons in the Unbounded Spigot Algorithm for the Digits of Pi :
    class PiCalcThread (Thread):
        def __init __ (self, buffer, lock):
            Thread .__ init __ (self)
            self.buffer = buffer
            self.lock = lock
     
        def run (self):
            q, r, t, k, n, l = 1,0,1,1,3,3
     
            while True:
                if 4 * q + rt <n * t:
                    self.lock.acquire ()
                    self.buffer.newDigits (str (n))
                    self.lock.release ()
     
                    q, r, t, k, n, l = (10 * q, 10 * (rn * t), t, k, (10 * (3 * q + r)) / t-10 * n, l)
                else :
                    q, r, t, k, n, l = (q * k, (2 * q + r) * l, t * l, k + 1, (q * (7 * k + 2) + r * l ) / (t * l), l + 2)
     
                time.sleep (0.001)

    The run () method infinitely calculates the next digit of the Pi number and, again, using the lock, reports it to the generator, which in turn passes it to the data streams subscribing to the generator. Artificial delay has been added so that the server does not generate too much data flow per unit time.

    To syntax highlight in the preparation of the material, the resource http://highlight.hohli.com/ was used . I really hope that this article will prove useful to someone, although the description turned out to be messy with a fairly large volume.

    Also popular now: