Restarting the daemon in PHP without losing connections to it

    At various conferences, we have repeatedly talked about our cloud for CLI scripts (video of the talk, slides). The cloud runs various PHP scripts on a schedule or via an API. As a rule, these scripts process queues, and the load is spread across roughly 100 servers. Previously we focused on the control logic, which is responsible for distributing the load evenly across so many servers and for generating scheduled tasks. But in addition to that, we needed a daemon that could run our PHP scripts in the CLI and monitor their execution status.

    It was originally written in C, like all the other daemons in our company. However, we found that a substantial share of CPU time (about 10%) was essentially wasted on starting the interpreter and loading the “core” of our framework. So, to initialize the interpreter and the framework only once, we decided to rewrite the daemon in PHP. We called it Phprocksyd (by analogy with Phproxyd, PHP Proxy Daemon, the C daemon we had before). It accepts requests to run individual classes, calls fork() for each request, and can report the execution status of each launch. This architecture is in many ways similar to the Apache web server model: all initialization is done once in the “master”, and the “children” only handle requests. As an additional bonus, we can enable the opcode cache in the CLI, and it works correctly, since all children inherit the same shared memory segment as the master process. To reduce latency when handling a launch request, one could fork() in advance (the prefork model), but in our case fork() takes about 1 ms, which suits us well.
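    A minimal sketch of this “initialize once, fork() per request” model, assuming the pcntl extension is available in the CLI. bootstrapFramework() and handleRequest() are hypothetical stand-ins for loading the framework core and for running a class; unlike the real daemon, the parent here simply blocks until the child exits instead of keeping its status for later queries.

```php
<?php
// Sketch: initialize once in the master, fork() a child for each request.
// Requires ext-pcntl. bootstrapFramework() and handleRequest() are
// hypothetical placeholders, not the real daemon's API.

function bootstrapFramework(): array
{
    // Stands in for the expensive part: loading the framework "core".
    return ['loaded_at' => microtime(true)];
}

function handleRequest(array $framework, string $class): int
{
    // The real daemon would instantiate and run $class here.
    return $class === 'TestClass1' ? 0 : 1;
}

function runInChild(array $framework, string $class): int
{
    $pid = pcntl_fork();
    if ($pid === -1) {
        throw new RuntimeException('fork() failed');
    }
    if ($pid === 0) {
        // Child: $framework is already in memory (copy-on-write pages),
        // so nothing is re-initialized. exit() passes the result as status.
        exit(handleRequest($framework, $class));
    }
    // Parent: for brevity, block until the child exits and return its code.
    pcntl_waitpid($pid, $status);
    return pcntl_wifexited($status) ? pcntl_wexitstatus($status) : -1;
}

$framework = bootstrapFramework();                 // once, in the master
echo runInChild($framework, 'TestClass1'), "\n";   // prints 0
echo runInChild($framework, 'NoSuchClass'), "\n";  // prints 1
```

    The child never repeats the initialization: it starts with everything the master had already set up, which is where the CPU savings come from.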

    However, since we update the code quite often, this daemon also has to be restarted often, otherwise the code loaded into it becomes outdated. Each restart would produce lots of “connection reset by peer” errors, including denial of service for end users (the daemon is used not only by the cloud but also by part of our site), so we looked for a way to restart the daemon without losing any already established connections. There is one popular technique for a graceful reload of daemons: the daemon does fork-exec and passes the descriptor of the listening socket to the new process. New connections are then accepted by the new version of the daemon, while the old ones are served to completion by the old version.
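    The classic hand-off can be illustrated in stock PHP (7.4+ for the array form of the command) with proc_open(), which forks, places a given stream at a chosen descriptor number in the child and then execs a new program. This is only a sketch under simplifying assumptions: a socket pair stands in for the listening socket to keep it self-contained, and descriptor 3 is an arbitrary number both sides agree on. Note that the new program gets a new PID, which is exactly what distinguishes this classic variant from the one this article builds.

```php
<?php
// Sketch of the classic hand-off: the new process receives an already-open
// socket at an agreed descriptor number (3 here) and keeps using it.
// A socket pair stands in for the listening socket.

function handOffToNewProcess(): string
{
    [$parentEnd, $childEnd] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

    // The "new version of the daemon": opens the inherited fd 3 by number.
    $code = 'fwrite(fopen("php://fd/3", "w"), "new process got fd 3");';

    // proc_open() forks, puts $childEnd at fd 3 in the child, then execs PHP.
    $proc = proc_open([PHP_BINARY, '-r', $code], [3 => $childEnd], $pipes);
    if (!is_resource($proc)) {
        throw new RuntimeException('proc_open() failed');
    }
    fclose($childEnd); // the parent no longer needs its copy

    $reply = stream_get_contents($parentEnd); // reads until the child exits
    fclose($parentEnd);
    proc_close($proc);
    return $reply;
}

echo handOffToNewProcess(), "\n";
```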

    In this article, we will consider a more complicated variant of graceful reload: old connections will continue to be served by the new version of the daemon. This is important in our case, since otherwise those connections would keep running the old code.

    Theory


    First, let's consider whether what we want is possible at all, and if so, how to achieve it.

    Since the daemon runs on Linux, which is POSIX-compatible, we can rely on the following facts:

    1. All open files and sockets are identified by integers, the numbers of their file descriptors. Standard input, output and error have descriptors 0, 1 and 2, respectively.
    2. There are no significant differences between an open file, a socket and a pipe (for example, you can work with sockets using both the read/write and the sendto/recvfrom system calls).
    3. On the fork() system call, all open descriptors are inherited with their numbers, and the read/write positions (for files) are preserved.
    4. On the execve() system call, all open descriptors are also inherited (unless they are marked close-on-exec), and in addition the process keeps its PID and, therefore, remains the parent of its children.
    5. The list of a process's open descriptors is available in the /dev/fd directory, which on Linux is a symlink to /proc/self/fd.

    Thus, we have every reason to believe that our task is feasible, and without much effort. So let's get started.
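    Fact 4 is the one the whole approach relies on, and it is easy to check. The sketch below, assuming the pcntl extension, forks a child that records its PID and then replaces itself with a fresh PHP interpreter via pcntl_exec() (which wraps execve()); the new program records its PID as well, and the original parent still reaps it with pcntl_waitpid(). The temporary files are just a convenient way to get the numbers out, and pidsAcrossExec() is a hypothetical name.

```php
<?php
// Sketch: execve() replaces the program but keeps the PID, so the exec'd
// process is still the same child of the same parent. Requires ext-pcntl.

function pidsAcrossExec(): array
{
    $before = tempnam(sys_get_temp_dir(), 'pid');
    $after = tempnam(sys_get_temp_dir(), 'pid');

    $pid = pcntl_fork();
    if ($pid === -1) {
        throw new RuntimeException('fork() failed');
    }
    if ($pid === 0) {
        file_put_contents($before, (string)getmypid());
        // Replace this process image with a brand-new PHP interpreter.
        $code = 'file_put_contents(' . var_export($after, true) . ', (string)getmypid());';
        pcntl_exec(PHP_BINARY, ['-r', $code]);
        exit(127); // only reached if exec failed
    }

    pcntl_waitpid($pid, $status); // the exec'd program is still our child
    $result = [
        'forked'      => $pid,
        'before_exec' => (int)file_get_contents($before),
        'after_exec'  => (int)file_get_contents($after),
    ];
    unlink($before);
    unlink($after);
    return $result;
}

print_r(pidsAcrossExec()); // all three values are the same PID
```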

    PHP patches


    Unfortunately, there is one small detail that complicates things: PHP has no way to get the file descriptor number of a stream, nor to open a file descriptor by its number (php://fd/N opens a copy of the descriptor instead). A copy is not suitable for our daemon, since we keep very careful track of open descriptors so as not to leak them during a restart or when starting child processes.
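    The copying is easy to observe on Linux with an unpatched PHP CLI (php://fd/ is only available in the CLI SAPI): opening php://fd/1 adds an entry to /proc/self/fd, because the wrapper calls dup(). countOpenFds() is an illustrative helper. With the patch described next, no new descriptor would appear, and fclose() would close descriptor N itself, which the restart code relies on when it closes unknown descriptors.

```php
<?php
// Sketch (Linux, stock PHP CLI): php://fd/N opens a *copy* of descriptor N.

function countOpenFds(): int
{
    // scandir() briefly holds its own directory fd, which is also listed;
    // that extra entry is present in every call, so differences are exact.
    return count(array_diff(scandir('/proc/self/fd'), ['.', '..']));
}

$before = countOpenFds();
$copy = fopen('php://fd/1', 'w');   // stock PHP: dup(1) under the hood
$during = countOpenFds();
fclose($copy);                      // closes the copy; fd 1 stays open
$after = countOpenFds();

printf("before=%d during=%d after=%d\n", $before, $during, $after);
```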

    To begin with, we will make a couple of small patches to the PHP source: add the ability to get the fd of a stream, and make fopen("php://fd/N") stop opening a copy of the descriptor (the second change is incompatible with the current PHP behavior, so you could add a new “address” instead, for example php://fdraw/):

    Patch code
    diff --git a/ext/standard/php_fopen_wrapper.c b/ext/standard/php_fopen_wrapper.c
    index f8d7bda..fee964c 100644
    --- a/ext/standard/php_fopen_wrapper.c
    +++ b/ext/standard/php_fopen_wrapper.c
    @@ -24,6 +24,7 @@
    #if HAVE_UNISTD_H
    #include <unistd.h>
    #endif
    +#include <fcntl.h>
    #include "php.h"
    #include "php_globals.h"
    @@ -296,11 +297,11 @@ php_stream * php_stream_url_wrap_php(php_stream_wrapper *wrapper, char *path, ch
    				"The file descriptors must be non-negative numbers smaller than %d", dtablesize);
    			return NULL;
    		}
    -		
    -		fd = dup(fildes_ori);
    -		if (fd == -1) {
    +
    +		fd = fildes_ori;
    +		if (fcntl(fildes_ori, F_GETFD) == -1) {
    			php_stream_wrapper_log_error(wrapper, options TSRMLS_CC,
    -				"Error duping file descriptor %ld; possibly it doesn't exist: "
    +				"File descriptor %ld invalid: "
    				"[%d]: %s", fildes_ori, errno, strerror(errno));
    			return NULL;
    		}
    diff --git a/ext/standard/streamsfuncs.c b/ext/standard/streamsfuncs.c
    index 0610ecf..14fd3b0 100644
    --- a/ext/standard/streamsfuncs.c
    +++ b/ext/standard/streamsfuncs.c
    @@ -24,6 +24,7 @@
    #include "ext/standard/flock_compat.h"
    #include "ext/standard/file.h"
    #include "ext/standard/php_filestat.h"
    +#include "ext/standard/php_fopen_wrappers.h"
    #include "php_open_temporary_file.h"
    #include "ext/standard/basic_functions.h"
    #include "php_ini.h"
    @@ -484,6 +485,7 @@ PHP_FUNCTION(stream_get_meta_data)
    	zval *arg1;
    	php_stream *stream;
    	zval *newval;
    +	int tmp_fd;
    	if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "r", &arg1) == FAILURE) {
    		return;
    @@ -502,6 +504,9 @@ PHP_FUNCTION(stream_get_meta_data)
    		add_assoc_string(return_value, "wrapper_type", (char *)stream->wrapper->wops->label, 1);
    	}
    	add_assoc_string(return_value, "stream_type", (char *)stream->ops->label, 1);
    +	if (SUCCESS == php_stream_cast(stream, PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL, (void*)&tmp_fd, 1) && tmp_fd != -1) {
    +		add_assoc_long(return_value, "fd", tmp_fd);
    +	}
    	add_assoc_string(return_value, "mode", stream->mode, 1);
    


    We added an fd field to the result of stream_get_meta_data() where it makes sense (for example, zlib streams will have no fd field). We also replaced the dup() call on the passed file descriptor with a simple validity check. Unfortunately, this code will not work on Windows without modifications, since fcntl() is POSIX-specific, so a complete patch must contain additional code branches for other OSes.

    Daemon without restart


    To begin with, let's write a small server that accepts requests in JSON format and sends back a response. For example, it will return the number of elements in the array that came in the request.

    The daemon listens on port 31337. The result should be something like this:
    $ telnet localhost 31337
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    {"hash":1} # user input
    "Request had 1 keys"
    {"hash":1,"cnt":2} # user input
    "Request had 2 keys"
    


    We will use stream_socket_server() to listen on the port and stream_select() to find out which descriptors are ready for reading/writing.
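    To see how the two calls fit together before reading the full daemon, here is a self-contained sketch that serves a single request within one process: it listens on an ephemeral loopback port (port 0 instead of 31337, so it cannot clash with a running daemon), connects to itself, and uses stream_select() first to wait for the pending connection and then for the request. oneShotRoundTrip() is a hypothetical name, and the reply mimics the one shown above.

```php
<?php
// Sketch: one request/response round trip with stream_socket_server()
// and stream_select(), all in one process.

function oneShotRoundTrip(string $request): string
{
    $server = stream_socket_server('tcp://127.0.0.1:0', $errno, $errstr);
    if (!$server) {
        throw new RuntimeException("stream_socket_server failed: $errno $errstr");
    }
    $address = stream_socket_get_name($server, false); // e.g. "127.0.0.1:54321"
    $client = stream_socket_client("tcp://$address", $errno, $errstr, 1);
    if (!$client) {
        throw new RuntimeException("connect failed: $errno $errstr");
    }
    fwrite($client, $request . "\n");

    // 1) The listening socket becomes readable when a connection is pending.
    $read = [$server];
    $write = null;
    $except = null;
    stream_select($read, $write, $except, 1);
    $conn = stream_socket_accept($server, 1);

    // 2) Wait until the accepted connection has data, then answer it.
    $read = [$conn];
    stream_select($read, $write, $except, 1);
    $data = json_decode(rtrim(fgets($conn)), true);
    $reply = is_array($data) ? 'Request had ' . count($data) . ' keys' : 'Invalid JSON';
    fwrite($conn, json_encode($reply) . "\n");

    $response = rtrim(fgets($client));
    fclose($conn);
    fclose($client);
    fclose($server);
    return $response;
}

echo oneShotRoundTrip('{"hash":1,"cnt":2}'), "\n"; // "Request had 2 keys"
```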

    Simple Implementation Code (Simple.php)
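    <?php
    class Simple
    {
       const PORT = 31337;
       const SERVER_KEY = 'server'; // must not collide with the integer client ids
       /** @var resource[] (client_id => stream) */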
       private $streams = [];
       /** @var string[] (client_id => read buffer) */
       private $read_buf = [];
       /** @var string[] (client_id => write buffer) */
       private $write_buf = [];
       /** @var resource[] (client_id => stream from which to read) */
       private $read = [];
       /** @var resource[] (client_id => stream where to write) */
       private $write = [];
       /** @var int Total connection count */
       private $conn_count = 0;
       public function run()
       {
           $this->listen();
           echo "Entering main loop\n";
           $this->mainLoop();
       }
       protected function listen()
       {
           $port = self::PORT;
           $ip_port = "0.0.0.0:$port";
           $address = "tcp://$ip_port";
           $server = stream_socket_server($address, $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
           if (!$server) {
               fwrite(STDERR, "stream_socket_server failed: $errno $errstr\n");
               exit(1);
           }
           $this->read[self::SERVER_KEY] = $server;
           echo "Listening on $address\n";
       }
       public function response($stream_id, $response)
       {
           $json_resp = json_encode($response);
           echo "stream$stream_id " . $json_resp . "\n";
           $this->write($stream_id, $json_resp . "\n");
       }
       public function write($stream_id, $buf)
       {
           $this->write_buf[$stream_id] .= $buf;
           if (!isset($this->write[$stream_id])) {
               $this->write[$stream_id] = $this->streams[$stream_id];
           }
       }
       public function accept($server)
       {
           echo "Accepting new connection\n";
           $client = stream_socket_accept($server, 1, $peername);
           $stream_id = ($this->conn_count++);
           if (!$client) {
               fwrite(STDERR, "Accept failed\n");
               return;
           }
           stream_set_read_buffer($client, 0);
           stream_set_write_buffer($client, 0);
           stream_set_blocking($client, 0);
           stream_set_timeout($client, 1);
           $this->read_buf[$stream_id] = '';
           $this->write_buf[$stream_id] = '';
           $this->read[$stream_id] = $this->streams[$stream_id] = $client;
           echo "Connected stream$stream_id: $peername\n";
       }
       private function disconnect($stream_id)
       {
           echo "Disconnect stream$stream_id\n";
           unset($this->read_buf[$stream_id], $this->write_buf[$stream_id]);
           unset($this->streams[$stream_id]);
           unset($this->write[$stream_id], $this->read[$stream_id]);
       }
       private function handleRead($stream_id)
       {
           $buf = fread($this->streams[$stream_id], 8192);
           if ($buf === false || $buf === '') {
               echo "got EOF from stream$stream_id\n";
               if (empty($this->write_buf[$stream_id])) {
                   $this->disconnect($stream_id);
               } else {
                   unset($this->read[$stream_id]);
               }
               return;
           }
           $this->read_buf[$stream_id] .= $buf;
           $this->processJSONRequests($stream_id);
       }
       private function processJSONRequests($stream_id)
       {
           if (strpos($this->read_buf[$stream_id], "\n") === false) return;
           $requests = explode("\n", $this->read_buf[$stream_id]);
           $this->read_buf[$stream_id] = array_pop($requests);
           foreach ($requests as $req) {
               $res = json_decode(rtrim($req), true);
               if (is_array($res)) {
                   $this->response($stream_id, "Request had " . count($res) . " keys");
               } else {
                   $this->response($stream_id, "Invalid JSON");
               }
           }
       }
       private function handleWrite($stream_id)
       {
           if (!isset($this->write_buf[$stream_id])) {
               return;
           }
           $wrote = fwrite($this->streams[$stream_id], substr($this->write_buf[$stream_id], 0, 65536));
           if ($wrote === false) {
               fwrite(STDERR, "write failed into stream #$stream_id\n");
               $this->disconnect($stream_id);
               return;
           }
           if ($wrote === strlen($this->write_buf[$stream_id])) {
               $this->write_buf[$stream_id] = '';
               unset($this->write[$stream_id]);
               if (empty($this->read[$stream_id])) {
                   $this->disconnect($stream_id);
               }
           } else {
               $this->write_buf[$stream_id] = substr($this->write_buf[$stream_id], $wrote);
           }
       }
       public function mainLoop()
       {
           while (true) {
               $read = $this->read;
               $write = $this->write;
               $except = null;
               echo "Selecting for " . count($read) . " reads, " . count($write) . " writes\n";
               $n = stream_select($read, $write, $except, NULL);
               if (!$n) {
                   fwrite(STDERR, "Could not stream_select()\n");
                   continue; // $read/$write are not reliable after a failed select
               }
               if (count($read)) {
                   echo "Can read from " . count($read) . " streams\n";
               }
               if (count($write)) {
                   echo "Can write to " . count($write) . " streams\n";
               }
               if (isset($read[self::SERVER_KEY])) {
                   $this->accept($read[self::SERVER_KEY]);
                   unset($read[self::SERVER_KEY]);
               }
               foreach ($read as $stream_id => $_) {
                   $this->handleRead($stream_id);
               }
               foreach ($write as $stream_id => $_) {
                   $this->handleWrite($stream_id);
               }
           }
       }
    }
    $instance = new Simple();
    $instance->run();
    


    The daemon code is fairly standard, but I would like to point out one implementation detail: we store all read and write buffers per connection, and we process requests right where we read them. This matters because one of these requests may be restart, and in that case the daemon never gets to the requests that follow it. However, requests that have not been read yet stay in the socket's kernel buffer, so after the restart stream_select() will report the same descriptors as readable again. Thus, we won't lose a single request if we restart directly from the command handler (except when several commands arrive at once on the same connection and one of them is restart).
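    The exception exists because explode() moves every buffered request into a local array, so whatever follows restart is gone once the process is replaced. A minimal sketch of one way around it, which is not what Simple.php does: take requests off the per-connection buffer one at a time, so that anything not yet handled is still in the buffer (and, in the restartable daemon, in read_buf, which goes into the restart file). popRequest() and processRequests() are hypothetical helpers; note the === false check, since a plain !strpos() treats a newline at offset 0 as “no newline”.

```php
<?php
// Sketch: take complete "\n"-terminated requests off a per-connection
// buffer one at a time, so unhandled requests stay in the buffer.
// popRequest() and processRequests() are hypothetical helpers.

function popRequest(string &$buf): ?string
{
    $pos = strpos($buf, "\n");
    if ($pos === false) { // `=== false`: a newline at offset 0 still counts
        return null;      // no complete request buffered yet
    }
    $line = substr($buf, 0, $pos);
    $buf = (string)substr($buf, $pos + 1);
    return rtrim($line, "\r"); // telnet terminates lines with \r\n
}

function processRequests(string &$buf, callable $handle): array
{
    $handled = [];
    while (($req = popRequest($buf)) !== null) {
        $handled[] = $req;
        if ($handle($req) === 'restart') {
            break; // the real daemon would exec() itself here
        }
    }
    return $handled;
}

$buf = "run {\"hash\":1}\nrestart\ncheck {\"hash\":1}\npartial";
$handled = processRequests($buf, function (string $req): string {
    return $req === 'restart' ? 'restart' : 'ok';
});
var_export($handled);
echo "\n";
var_export($buf); // the request after restart is still buffered
echo "\n";
```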

    So, how do we make it possible to restart the daemon?

    Daemon with restart and saving established connections


    Our simple example did not do anything useful, so let's now write the daemon described at the very beginning. We want something like the following (commands are sent to the daemon as “command_name [JSON data]”, and responses come back as JSON):
    $ telnet localhost 31337
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    # immediately ask the daemon to restart
    restart
    # the response is sent by the already restarted daemon
    "Restarted successfully"
    # run a test class
    run {"hash":1,"params":[1,2,3],"class":"TestClass1"}
    # started successfully
    {"error_text":"OK"}
    # restart the daemon once more (its child TestClass1 is still running)
    restart
    "Restarted successfully"
    # check the job status: still running
    check {"hash":1}
    {"error_text":"Still running"}
    # wait 5 seconds and check again: TestClass1 finished successfully
    check {"hash":1}
    {"retcode":0}
    # the daemon remembers all launches, so you need to call free
    check {"hash":1}
    {"retcode":0}
    free {"hash":1}
    {"error_text":"OK"}
    restart
    "Restarted successfully"
    # I updated the code, so the second time we see a different response to restart
    restart
    {"error_text":"Restarted successfully"}
    bye
    Connection closed by foreign host.
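    Each line of this protocol is a command name, optionally followed by a space and a JSON object. A minimal parser sketch under that assumption; parseCommand() is a hypothetical name, not taken from the daemon, and trim() takes care of the \r\n that telnet sends.

```php
<?php
// Sketch of parsing one protocol line: "command_name [JSON data]".
// parseCommand() is a hypothetical helper, not the daemon's actual code.

function parseCommand(string $line): array
{
    $line = trim($line); // telnet sends "\r\n"
    $parts = explode(' ', $line, 2);
    $name = $parts[0];
    $data = null;
    if (isset($parts[1]) && trim($parts[1]) !== '') {
        $data = json_decode($parts[1], true);
        if (!is_array($data)) {
            throw new InvalidArgumentException("Invalid JSON for command '$name'");
        }
    }
    return [$name, $data];
}

var_export(parseCommand("run {\"hash\":1,\"params\":[1,2,3],\"class\":\"TestClass1\"}\r\n"));
echo "\n";
var_export(parseCommand("restart\r\n"));
echo "\n";
```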
    


    The idea behind the restart is simple: before restarting, we create a file with all the necessary information, and at startup the daemon tries to read it and restore the open file descriptors.

    First, the code that writes the restart file:

    echo "Creating restart file...\n";
    if (!$res = $this->getFdRestartData()) {
       fwrite(STDERR, "Could not get restart FD data, exiting, graceful restart is not supported\n");
       exit(0);
    }
    /* Close all extra file descriptors that we do not know of, including opendir() descriptor :) */
    $dh = opendir("/proc/self/fd");
    $fds = [];
    while (false !== ($file = readdir($dh))) {
       if ($file[0] === '.') continue;
       $fds[] = $file;
    }
    foreach ($fds as $fd) {
       if (!isset($this->known_fds[$fd])) {
           fclose(fopen("php://fd/" . $fd, 'r+'));
       }
    }
    $contents = serialize($res);
    if (file_put_contents(self::RESTART_DIR . self::RESTART_FILENAME, $contents) !== strlen($contents)) {
       fwrite(STDERR, "Could not fully write restart file\n");
       unlink(self::RESTART_DIR . self::RESTART_FILENAME);
    }
    


    The code that builds the data array (the getFdRestartData() function) is given below:

    $res = [];
    foreach (self::$restart_fd_resources as $prop) {
       $res[$prop] = [];
       foreach ($this->$prop as $k => $v) {
           $meta = stream_get_meta_data($v);
           if (!isset($meta['fd'])) {
               fwrite(STDERR, "No fd in stream metadata for resource $v (key $k in $prop), got " . var_export($meta, true) . "\n");
               return false;
           }
           $res[$prop][$k] = $meta['fd'];
           $this->known_fds[$meta['fd']] = true;
       }
    }
    foreach (self::$restart_fd_props as $prop) {
       $res[$prop] = $this->$prop;
    }
    return $res;
    

    The code takes into account that we have two types of properties:
    1. Properties containing resources with connections: $restart_fd_resources = ['read', 'write', 'streams'].
    2. Properties containing buffers and other connection information that can be serialized as is: $restart_fd_props = ['read_buf', 'write_buf', 'conn_count'].

    We also remember every fd stored in the restart file and close all other descriptors (if any), since otherwise they would leak.
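    A minimal sketch of that bookkeeping on Linux, listing /proc/self/fd and comparing it with a set of known descriptors shaped like known_fds (fd => true). openFds() and unknownFds() are hypothetical helpers; the is_link() filter drops the descriptor that scandir() used while reading the directory and has already closed, whereas the daemon's code keeps its opendir() descriptor open and simply closes it along with the other unknown ones.

```php
<?php
// Sketch (Linux): list this process's open descriptors and find the ones
// that are not in a known set. $knownFds mirrors known_fds (fd => true).

function openFds(): array
{
    $fds = [];
    foreach (scandir('/proc/self/fd') as $entry) {
        if ($entry !== '.' && $entry !== '..') {
            $fds[] = (int)$entry;
        }
    }
    // The listing also contains the directory fd scandir() itself used,
    // which is closed by now; keep only descriptors that are still open.
    clearstatcache(); // is_link() results are cached between calls
    $fds = array_values(array_filter($fds, function (int $fd): bool {
        return is_link("/proc/self/fd/$fd");
    }));
    sort($fds);
    return $fds;
}

function unknownFds(array $knownFds): array
{
    return array_values(array_filter(openFds(), function (int $fd) use ($knownFds): bool {
        return !isset($knownFds[$fd]);
    }));
}

$known = array_fill_keys(openFds(), true); // everything open now is "known"
$leak = tmpfile();                          // a descriptor nobody recorded
print_r(unknownFds($known));                // the tmpfile() descriptor
fclose($leak);
print_r(unknownFds($known));                // empty again
```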

    Next, we need to load this file at startup and keep using the open descriptors as if nothing had happened :). The code of the two functions (loading the restart file and restoring the file descriptors) is given below:

    Loading the file:

    if (!file_exists(self::RESTART_DIR . self::RESTART_FILENAME)) {
       return;
    }
    echo "Restart file found, trying to adopt it\n";
    $contents = file_get_contents(self::RESTART_DIR . self::RESTART_FILENAME);
    unlink(self::RESTART_DIR . self::RESTART_FILENAME);
    if ($contents === false) {
       fwrite(STDERR, "Could not read restart file\n");
       return;
    }
    $res = unserialize($contents);
    if (!$res) {
       fwrite(STDERR, "Could not unserialize restart file contents\n");
       return;
    }
    foreach (self::$restart_props as $prop) {
       if (!array_key_exists($prop, $res)) {
           fwrite(STDERR, "No property $prop in restart file\n");
           continue;
       }
       $this->$prop = $res[$prop];
    }
    $this->loadFdRestartData($res);
    


    The loadFdRestartData() function, which turns the saved descriptor numbers back into streams:

    $fd_resources = [];
    foreach (self::$restart_fd_resources as $prop) {
       if (!isset($res[$prop])) {
           fwrite(STDERR, "Property '$prop' is not present in restart fd resources\n");
           continue;
       }
       $pp = [];
       foreach ($res[$prop] as $k => $v) {
           if (isset($fd_resources[$v])) {
               $pp[$k] = $fd_resources[$v];
           } else {
               $fp = fopen("php://fd/" . $v, 'r+');
               if (!$fp) {
                   fwrite(STDERR, "Failed to open fd = $v, exiting\n");
                   exit(1);
               }
               stream_set_read_buffer($fp, 0);
               stream_set_write_buffer($fp, 0);
               stream_set_blocking($fp, 0);
               stream_set_timeout($fp, self::CONN_TIMEOUT);
               $fd_resources[$v] = $fp;
               $pp[$k] = $fp;
           }
       }
       $this->$prop = $pp;
    }
    foreach (self::$restart_fd_props as $prop) {
       if (!isset($res[$prop])) {
           fwrite(STDERR, "Property '$prop' is not present in restart fd properties\n");
           continue;
       }
       $this->$prop = $res[$prop];
    }
    

    For each reopened descriptor we again disable read and write buffering, switch it to non-blocking mode and set the timeout. Oddly enough, after these manipulations PHP happily calls accept() on these descriptors and keeps reading from and writing to them normally, even though it does not know they are sockets.
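    One detail of loadFdRestartData() is worth isolating: a client socket is stored under 'streams', 'read' and possibly 'write', and the $fd_resources cache ensures that each descriptor number is reopened only once, so all those properties share a single stream. A sketch of just that logic, with the opener injected so it runs without the patched PHP; restoreStreams() and the stub opener are hypothetical, and in the daemon the opener would be fopen("php://fd/$fd", 'r+').

```php
<?php
// Sketch of the deduplication in loadFdRestartData(): each descriptor
// number is reopened once, and every property that referenced it gets
// the same stream. restoreStreams() is hypothetical.

function restoreStreams(array $saved, array $props, callable $open): array
{
    $byFd = [];      // fd number => reopened stream
    $restored = [];
    foreach ($props as $prop) {
        $restored[$prop] = [];
        foreach ($saved[$prop] ?? [] as $key => $fd) {
            if (!isset($byFd[$fd])) {
                $byFd[$fd] = $open($fd);
            }
            $restored[$prop][$key] = $byFd[$fd];
        }
    }
    return $restored;
}

// Example restart data: the server socket is fd 3, and one client on fd 5
// is waiting both for reading and for writing.
$saved = [
    'read'    => ['server' => 3, 0 => 5],
    'write'   => [0 => 5],
    'streams' => [0 => 5],
];
$opened = [];
$stub = function (int $fd) use (&$opened) {
    $opened[] = $fd;
    return (object)['fd' => $fd]; // stands in for a stream resource
};
$r = restoreStreams($saved, ['read', 'write', 'streams'], $stub);
print_r($opened); // each fd is opened once: 3, then 5
```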

    Finally, we need the logic for launching workers and tracking their execution status. Since it is not relevant to the topic of this article, the full implementation of the daemon is in the GitHub repository linked below.

    Conclusion


    So, this article described the implementation of a daemon that communicates over a JSON-based protocol and can run arbitrary classes in separate processes while monitoring their execution. Individual classes are run with a fork-per-request model, so handling a request requires neither starting the interpreter nor loading the framework, and the opcode cache can be used in the CLI. Since the daemon has to be restarted every time the code is updated, it needs a graceful restart mechanism (in our company, the code is sometimes updated every few minutes, in the form of “hotfixes”).

    The restart is done with the execve() system call, so all children remain children of the daemon (the process PID does not change across execve()). All open file descriptors are preserved too, which makes it possible to continue serving requests on already open connections. All network buffers, information about running children and open descriptors are saved to a separate restart file, which the new instance of the daemon reads, after which it continues working in the usual event loop.

    The full implementation code can be seen on GitHub at the following address: github.com/badoo/habr/tree/master/phprocksyd

    Questions, suggestions, clarifications are welcome.

    Sincerely,
    Yuri youROCK Nasretdinov
    Lead PHP developer
    Badoo
