How machine learning at YouDo makes it to production. A lecture at Yandex

    In large services, solving a problem with machine learning is only part of the work. Embedding ML models is not easy, and building CI/CD processes around them is even harder. Adam Eldarov, head of data science at YouDo, spoke at the Yandex Data & Science conference "a program by request" about how to manage the life cycle of models, set up retraining and additional training processes, develop scalable microservices, and much more.


    - Let's start with the basics. There is a data scientist: he writes code in a Jupyter Notebook, does feature engineering and cross-validation, and trains models. The score keeps growing.



    But at some point he realizes that to bring the company business value, he has to ship the solution to production, that mythical production that causes us so many problems. In most cases you can't just send the notebook you have been working in to production. So the question arises: how do we ship the code inside the notebook as a service? In most cases you need to write a service that exposes an API, or the services communicate through Pub/Sub, through queues.



    When we build recommendations, we often need to train models and retrain them, and this process must be monitored. At the same time we always need to cover both the code and the models with tests, so that at some point the model doesn't go crazy and start predicting zeros all the time. And it has to be checked on real users through A/B tests: did what we built turn out better, or at least not worse?

    How do we approach code? We have GitLab. All our code is split into many small libraries, each solving a specific domain problem. Each one is a separate GitLab project with Git version control and the GitFlow branching model. We use things like pre-commit hooks, so that code that fails our static checks cannot be committed. And there are the tests themselves, unit tests; for them we use the property-based testing approach.



    Usually, when you write tests, you have a function under test, arguments you build by hand as examples, and the values the function should return for them. That is inconvenient: the code gets bloated, and many people are simply too lazy to write it, so a lot of code ends up uncovered by tests. Property-based testing assumes that each argument has some distribution. We do fuzzing: many times over, we sample all the arguments from their distributions, call the function under test with them, and check certain properties of the result. We end up with much less code and, at the same time, many more test cases.
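
    For example, here is a minimal sketch of this approach with the Hypothesis library; the function under test and the properties are hypothetical, just to illustrate the idea.

```python
from hypothesis import given, strategies as st

def normalize(scores):
    """Hypothetical function under test: scale scores to the [0, 1] range."""
    lo, hi = min(scores), max(scores)
    if hi == lo:
        return [0.0 for _ in scores]
    return [(s - lo) / (hi - lo) for s in scores]

# Arguments are described as distributions instead of hand-written examples.
@given(st.lists(st.floats(min_value=-1e6, max_value=1e6), min_size=1))
def test_normalize_properties(scores):
    result = normalize(scores)
    # Properties that must hold for any sampled input.
    assert len(result) == len(scores)
    assert all(0.0 <= r <= 1.0 for r in result)
```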



    What is GitFlow? It is a branching model that assumes two main branches: develop and master, where the production-ready code lives. All development happens in the develop branch, into which new features arrive from feature branches. That is, each feature is a new feature branch; feature branches should be short-lived and, ideally, also covered by a feature toggle. When we make a release, we move the changes from develop into master and put a version tag of our library or service on it.



    We develop a feature, push it to GitLab, and create a merge request from the feature branch into develop. The triggers fire and the tests run; if everything is OK, the branch can be merged. But we do not merge it ourselves: someone else on the team does. They review the code, and that raises the bus factor, because this piece of code is now known by two people. So if someone gets hit by a bus, someone else already knows what the code does.



    Continuous integration for libraries usually amounts to running the tests on every change and, when we make a release, also publishing the package to our private PyPI server.



    Next, we can assemble all of this into pipelines. For that we use the Luigi library. It works with an entity called a task, which has an output where the artifact created by the task is saved. A task has parameters that parameterize the business logic it executes and identify the task and its output. A task also has requirements that point to other tasks. When we run a task, all of its dependencies are checked via their outputs: if an output exists, the dependency is not run; if the artifact is missing from storage, it is launched. This forms a pipeline, a directed acyclic graph.
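
    A minimal sketch of what such tasks might look like; the task names and paths are made up for illustration.

```python
import luigi


class PrepareDataset(luigi.Task):
    # Parameters identify both the task and its output artifact.
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget(f"artifacts/dataset_{self.date}.csv")

    def run(self):
        with self.output().open("w") as f:
            f.write("...")  # build and save the dataset here


class TrainModel(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        # A dependency: Luigi checks its output before deciding whether to run it.
        return PrepareDataset(date=self.date)

    def output(self):
        return luigi.LocalTarget(f"artifacts/model_{self.date}.bin")

    def run(self):
        with self.input().open() as f:
            data = f.read()
        with self.output().open("w") as f:
            f.write("...")  # train the model on `data` and serialize it
```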



    All parameters identify the business logic, and in doing so they also identify the artifact. There is always a date with some granularity: a week, a day, an hour, three hours. If we are training a model, the Luigi task always carries the hyperparameters of that training, and they leak into the artifact we produce: the hyperparameters are reflected in the artifact name. In this way we essentially version all intermediate datasets and final artifacts; they are never overwritten, only appended to storage. The storage is HDFS and a private S3, where the artifacts of the models, or whatever else, end up. And all the pipeline code lives in the repository of the service it belongs to.
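
    For instance, the hyperparameters of a training task can be reflected directly in the artifact path, so every combination of date and hyperparameters becomes a separate, immutable object; the bucket name and layout below are assumptions.

```python
import luigi
from luigi.contrib.s3 import S3Target


class TrainClassifier(luigi.Task):
    date = luigi.DateParameter()
    learning_rate = luigi.FloatParameter(default=0.1)
    num_leaves = luigi.IntParameter(default=31)

    def output(self):
        # Date granularity plus hyperparameters end up in the artifact name,
        # so artifacts are never overwritten, only appended to storage.
        return S3Target(
            f"s3://models/classifier/{self.date}/"
            f"lr={self.learning_rate}_leaves={self.num_leaves}/model.bin",
            format=luigi.format.Nop,
        )

    def run(self):
        with self.output().open("w") as f:
            f.write(b"...")  # the serialized model bytes would go here
```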



    All of this needs to be tied together somehow. The HashiCorp stack comes to the rescue: we use Terraform for declaring infrastructure as code and Vault for managing secrets; all the passwords and database credentials live there. Consul is service discovery and a distributed key-value store that can be used for configuration. Consul also runs health checks on your nodes and services, verifying their availability.
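
    A small sketch of how a service might read its secrets and configuration, assuming the hvac and python-consul client libraries; the addresses, paths, and keys are made up.

```python
import hvac
import consul

# Secrets (passwords, database credentials) come from Vault.
vault = hvac.Client(url="https://vault.example.local:8200", token="...")
secret = vault.secrets.kv.v2.read_secret_version(path="services/recommender/db")
db_password = secret["data"]["data"]["password"]

# Configuration comes from Consul's key-value store.
c = consul.Consul(host="consul.example.local")
index, item = c.kv.get("services/recommender/model_path")
model_path = item["Value"].decode() if item else None
```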

    And Nomad. This is a system for orchestrating and scheduling your services and batch jobs.



    How do we use it? We take the Luigi pipeline, pack it into a Docker container, and run it as a Nomad batch job or periodic batch job. A batch job is something that runs and finishes: if it succeeds, everything is fine, and we can rerun it manually later. If something goes wrong, Nomad retries it until it either exhausts the retry attempts or completes successfully.

    A periodic batch job is exactly the same, it just runs on a schedule.

    There is a problem. When we deploy a container to any orchestration system, we have to specify how much memory and CPU the container needs. Suppose we have a pipeline that runs for three hours: for two of those hours it consumes 10 GB of RAM, and for one hour it needs 70 GB. If we exceed the limit we set, the Docker daemon comes along and kills the container. We do not want to catch out-of-memory errors all the time, so we have to request the full 70 GB of peak memory. But then all 70 GB are allocated for the whole three hours and are unavailable to any other job.

    So we went a different way. Our Luigi pipeline does not run the business logic itself; it just launches a set of jobs in Nomad, so-called parameterized jobs. In essence this is an analogue of serverless functions, of AWS Lambda, for those who know it. When we build a library, we deploy all our code through CI in the form of parameterized jobs, that is, containers with parameters. Say, a LightGBM classifier: it has parameters for the path to the input training data, the hyperparameters of the model, and the path for the output artifact. All of this is registered in Nomad, and then from the Luigi pipeline we can trigger these Nomad jobs through the API, while Luigi takes care not to run the same task many times.
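
    A sketch of what dispatching such a parameterized job from a Luigi task might look like, using Nomad's HTTP API directly; the job name and parameters are hypothetical.

```python
import requests

NOMAD_ADDR = "http://nomad.example.local:4646"

def dispatch_training_job(train_path: str, model_path: str, learning_rate: float) -> str:
    """Dispatch a registered parameterized job and return the dispatched job ID."""
    resp = requests.post(
        f"{NOMAD_ADDR}/v1/job/lightgbm-classifier/dispatch",
        json={
            "Meta": {
                "train_path": train_path,
                "model_path": model_path,
                "learning_rate": str(learning_rate),
            }
        },
    )
    resp.raise_for_status()
    return resp.json()["DispatchedJobID"]
```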

    Suppose we have the same text preprocessing everywhere. There are, say, ten models, and we do not want to rerun the text processing for each of them. It runs only once, and the result is reused every time. And all of this runs in a distributed way, so we can run a giant grid search on a large cluster; we just have to keep adding hardware.



    Now we have an artifact, and we need to wrap it in a service. Services either expose an HTTP API or communicate through queues. In this example it is the HTTP API, the simplest case. Our service communicates with other services over HTTP. The JSON API is validated against a JSON schema: the service's API documentation always describes the JSON object and its schema. But not all fields of the JSON object are always needed, so there is consumer-driven contract validation of this schema, and communication goes through the circuit breaker pattern, to keep cascading failures from bringing down our distributed system.
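
    A rough sketch of both ideas, schema validation of a response plus a circuit breaker around the remote call, assuming the jsonschema and pybreaker libraries; the service URL and schema are made up.

```python
import requests
import pybreaker
from jsonschema import validate

# The consumer declares only the fields it actually needs (consumer-driven contract).
USER_FEATURES_SCHEMA = {
    "type": "object",
    "properties": {
        "user_id": {"type": "integer"},
        "features": {"type": "array", "items": {"type": "number"}},
    },
    "required": ["user_id", "features"],
}

# Open the circuit after 5 consecutive failures, try again after 30 seconds.
breaker = pybreaker.CircuitBreaker(fail_max=5, reset_timeout=30)

@breaker
def fetch_user_features(user_id: int) -> dict:
    resp = requests.get(f"http://user-service.local/users/{user_id}/features", timeout=1)
    resp.raise_for_status()
    payload = resp.json()
    validate(instance=payload, schema=USER_FEATURES_SCHEMA)  # fail fast on contract drift
    return payload
```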

    The service must also expose an HTTP health check so that Consul can come and check its availability. Nomad, in turn, can restart the service if, say, three health checks in a row fail, to help it recover. The service writes all its logs in JSON format. We use the JSON logging driver and the Elastic stack: on every node, Filebeat simply picks up all the JSON logs and ships them to Logstash, from there they end up in Elasticsearch, and we can analyze them in Kibana. At the same time, we do not use logs for collecting metrics and building dashboards, that is inefficient; for that we use the Prometheus monitoring system. We have a process for templating dashboards for each service, and we can analyze the technical metrics the service produces.
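
    A minimal sketch of the service side of this, a health-check endpoint plus Prometheus metrics, assuming Flask and the official prometheus_client library; the routes and metric names are made up.

```python
from flask import Flask, jsonify
from prometheus_client import Counter, Histogram, make_wsgi_app
from werkzeug.middleware.dispatcher import DispatcherMiddleware

app = Flask(__name__)

# Technical metrics scraped by Prometheus instead of being parsed out of logs.
PREDICTIONS = Counter("predictions_total", "Number of predictions served")
LATENCY = Histogram("prediction_latency_seconds", "Prediction latency")

@app.route("/health")
def health():
    # Consul hits this endpoint; Nomad restarts the service after repeated failures.
    return jsonify(status="ok")

@app.route("/predict/<int:user_id>")
def predict(user_id):
    with LATENCY.time():
        PREDICTIONS.inc()
        return jsonify(user_id=user_id, score=0.42)  # placeholder prediction

# Expose /metrics for Prometheus alongside the main app.
app.wsgi_app = DispatcherMiddleware(app.wsgi_app, {"/metrics": make_wsgi_app()})
```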

    Moreover, if something goes wrong, we get alerts, but in most cases that is not enough. Here Sentry comes to our aid; it is a tool for analyzing incidents. Essentially, we intercept all error logs with a Sentry handler and push them to Sentry. There you get a detailed traceback and all the information about the environment the service was running in: which version, which functions were called with which arguments, and which variables with which values were in scope. All the configs are visible, and it really helps to quickly understand what happened and fix the error.
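
    Hooking this up can be as simple as attaching Sentry to the logging system. A sketch assuming the sentry_sdk client; the DSN and release name are placeholders.

```python
import logging
import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration

# Breadcrumbs from INFO logs, Sentry events from ERROR logs.
sentry_logging = LoggingIntegration(level=logging.INFO, event_level=logging.ERROR)
sentry_sdk.init(
    dsn="https://<key>@sentry.example.local/1",
    release="recommender@1.4.2",  # hypothetical service version
    environment="production",
    integrations=[sentry_logging],
)

log = logging.getLogger(__name__)

try:
    raise ValueError("model file is missing")  # anything that goes wrong in the service
except ValueError:
    # This error log becomes a Sentry event with a full traceback and local variables.
    log.exception("failed to load model")
```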



    As a result, the service looks something like this: a separate GitLab project with the pipeline code, the test code, the service code itself, a bunch of different configs, the Nomad and CI configs, API documentation, commit hooks, and more.



    When we make a release, CI looks like this: build the container, run the tests, roll it out to staging, run the contract tests of our service there, and run load testing to verify that the prediction is not too slow and that the service holds the load it carries. If everything is OK, we deploy the service to production. Here there are two paths: we can deploy the pipeline, if it is a periodic batch job, and it runs somewhere in the background and produces artifacts; or we trigger a pipeline, it trains a model, and after that, once we see that everything is OK, we deploy the service.



    What else happens here? I said that in feature-branch development there is such a notion as feature toggles. Ideally, every feature should be covered by a toggle, so that it can simply be switched off in production if something goes wrong. We can then collect all the features into release trains and deploy them even if some features are unfinished: their feature toggles will simply be turned off. Since we are all data scientists, we also want to run A/B tests. Say we replaced LightGBM with CatBoost. We want to check this, and the A/B test is tied to some userID. The feature toggle binds to the userID, and that is how the A/B test is run. Then we need to check the metrics.
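
    A naive sketch of binding a toggle to a userID so that each user consistently lands in the same experiment arm; the toggle name and the 50/50 split are made up.

```python
import hashlib

def variant_for_user(user_id: int, toggle: str = "catboost_ranker", rollout: float = 0.5) -> str:
    """Deterministically assign a user to 'control' or 'treatment' for a given toggle."""
    digest = hashlib.md5(f"{toggle}:{user_id}".encode()).hexdigest()
    bucket = int(digest[:8], 16) / 0xFFFFFFFF  # stable number in [0, 1]
    return "treatment" if bucket < rollout else "control"

# The same user always gets the same variant for the same toggle.
if variant_for_user(12345) == "treatment":
    score = 0.9  # here we would call the CatBoost-based model
else:
    score = 0.8  # and here the old LightGBM-based one
```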

    All services are deployed in Nomad. We have two production Nomad clusters: one for batch jobs, the other for services.



    All the services push their business events to Kafka, and we can pick them up from there. In essence this is a lambda architecture: some services can subscribe to the events and write them to HDFS, we can do some real-time analytics, and at the same time we pour everything into ClickHouse and build dashboards to analyze all the business events of our services. We can analyze A/B tests, whatever we need.
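
    A sketch of a service pushing a business event to Kafka, assuming the kafka-python client; the topic name and event fields are hypothetical.

```python
import json
from datetime import datetime, timezone
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["kafka.example.local:9092"],
    value_serializer=lambda event: json.dumps(event).encode("utf-8"),
)

def emit_prediction_event(user_id: int, model_version: str, score: float) -> None:
    # Business events flow on to HDFS / ClickHouse for A/B analysis downstream.
    producer.send(
        "recommender.predictions",
        {
            "user_id": user_id,
            "model_version": model_version,
            "score": score,
            "ts": datetime.now(timezone.utc).isoformat(),
        },
    )

emit_prediction_event(12345, "catboost-v2", 0.87)
producer.flush()  # make sure the event actually leaves the process
```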



    And what if we did not change the code and do not use feature toggles? We just ran a pipeline and it trained a new model, so we have a new path to it. We simply change the path to the model in the Nomad config and release a new version of the service, and here the canary deployment paradigm comes to our aid; Nomad supports it out of the box.

    We have the current version of the service running in three instances. We say that we want three canaries: three more replicas of the new version are deployed without shutting down the old ones. As a result, traffic starts to split into two parts, and some of it lands on the new version of the service. All services push their business events to Kafka, so we can analyze the metrics in real time.

    If everything is OK, we say so and promote the deployment: Nomad gently shuts down all the old versions and scales up the new one.

    This model has a drawback: if we need to pin version routing to some entity, a user or an item, such a scheme does not work, because traffic is balanced round-robin. So we went another way and split the service into two parts.



    These are the gateway layer and the workers layer. The client communicates with the gateway layer over HTTP; all the logic of choosing a version and balancing traffic lives in the gateway. All the I/O-bound work needed to make a prediction is also in the gateway. Suppose the prediction request contains a userID that we need to enrich with additional information: we have to call other microservices or databases and collect all the features. All of that happens in the gateway. It then talks to the workers, which hold only the model and do exactly one thing, prediction: input in, output out.

    But since we cut the service into two parts, an overhead appeared due to the remote network call. How do we offset it? The gRPC framework from Google comes to the rescue: an RPC framework that runs on top of HTTP/2, so you get multiplexing and compression. gRPC uses protobuf, a strongly typed binary protocol with fast serialization and deserialization.

    As a result, we can also scale the gateway and the workers independently. Suppose we cannot hold the number of open HTTP connections: OK, we scale the gateway. The prediction is too slow and we cannot keep up with the load: OK, we scale the workers. This approach also fits multi-armed bandits very well. Since all the traffic-balancing logic lives in the gateway, it can go to external microservices, collect the statistics for each version from them, and decide how to balance the traffic, say, with the help of Thompson sampling.
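
    A toy sketch of how the gateway could pick a model version with Thompson sampling over per-version statistics; the notion of "success" and the counts are placeholders.

```python
import random

# Per-version statistics the gateway collects from external services:
# how many predictions were "successful" (say, led to a conversion) and how many were not.
stats = {
    "lightgbm-v1": {"successes": 420, "failures": 580},
    "catboost-v2": {"successes": 460, "failures": 540},
}

def pick_version() -> str:
    """Thompson sampling: draw from each version's Beta posterior, route to the best draw."""
    draws = {
        version: random.betavariate(s["successes"] + 1, s["failures"] + 1)
        for version, s in stats.items()
    }
    return max(draws, key=draws.get)

# The gateway would call this per request and forward the request to the chosen worker pool.
print(pick_version())
```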



    So far everything was fine: the models got trained somehow, and we wrote their paths into the Nomad config. But what if we have a recommendation model that manages to go stale while it is still training, and we need to retrain it constantly? Everything is done the same way, through periodic batch jobs: an artifact is produced, say, every three hours. At the end of its run, the pipeline writes the path to the new model into Consul, the key-value store used for configuration. Nomad can template configs, for example an environment variable, based on values from Consul's key-value store. It watches for changes, and as soon as a new path appears, it can go one of two ways. Either it downloads the artifact itself from the new link, mounts it into the service's Docker container as a volume and restarts it, doing it all so that there is no downtime, that is, gradually, one instance at a time. Or it renders the new config and tells the service about it. Or the service itself detects the change and can, on its own, live, swap in the new model. That's all, thank you.
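
    A sketch of the last option, a service watching a Consul key and hot-swapping the model when a new path appears, assuming the python-consul client; the key name and the loader are made up.

```python
import threading
import consul

c = consul.Consul(host="consul.example.local")
current_model = None

def load_model(path: str):
    print(f"loading model from {path}")  # here we would actually deserialize the artifact
    return object()

def watch_model_path(key: str = "services/recommender/model_path"):
    """Blocking watch on a Consul key: reload the model whenever the value changes."""
    global current_model
    index = None
    while True:
        # The call blocks until the key changes (or a timeout passes), then returns a new index.
        index, item = c.kv.get(key, index=index)
        if item is not None:
            current_model = load_model(item["Value"].decode())

threading.Thread(target=watch_model_path, daemon=True).start()
```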
