Freeing up our analysts' hands: the Livy API for automating routine banking tasks

    Hello, Habr!

    It is no secret that banks use data from various sources (credit bureaus, mobile operators, etc.) to assess the creditworthiness of their customers. The number of external partners can reach several dozen, while our team of analysts numbers only a few people. This raises the problem of optimizing the work of a small team and handing routine tasks over to computing systems.

    Let's look at how this data reaches the bank and how a team of analysts monitors the process.



    Let's take things in order.

    We call our Hadoop-based distributed system, together with all the processes built around it, SmartData for short. SmartData receives data from external agents via API. (For it, "agents" are both external partners and the bank's internal systems.) Naturally, it is useful to maintain an up-to-date "current profile" for every client, and that is what we do. Updated data from the sources lands in the Operprofile. The Operprofile implements the Customer 360 idea and is stored as HBase tables, which makes further work with the client convenient.

    Customer 360
    Customer 360 is an approach to operational storage that keeps all kinds of client data attributes used by every process in the organization that works with the client and his data, accessible by the client's key.
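
    Purely for illustration, here is a minimal sketch of how such a profile could be read from HBase by the client's key. The table name 'operprofile', the column names and the happybase library are my assumptions, not the bank's actual setup:

    import happybase
    # Hypothetical connection parameters and table name, for illustration only
    connection = happybase.Connection('hbase-host', port=9090)
    table = connection.table('operprofile')
    # Customer 360: all of the client's attributes are available by the client's key
    client_key = b'client_12345'
    profile = table.row(client_key)          # {b'cf:attribute': b'value', ...}
    for column, value in profile.items():
        print(column.decode(), '=', value.decode())
    connection.close()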

    Work with the agents is ongoing and needs to be monitored. To quickly check the quality of the interaction and the hit rate, and to easily share this information with other teams, we use visualization, for example reports in Tableau.

    The source data is sent to Kafka, pre-processed and placed in a DataLake built on top of HDFS. I had to come up with a way to organize the parsing of log files from HDFS, process them and upload the results daily to the analytics and visualization systems, and to combine all that with our analysts' love of Python notebooks.
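
    To make the task more concrete, here is a rough, hypothetical sketch of the kind of PySpark job that could be submitted to the cluster for this: it parses JSON logs from HDFS and counts a hit rate per agent. The paths, field names and aggregation are invented for illustration and are not the actual production code:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    spark = SparkSession.builder.appName('agent_logs_daily').getOrCreate()
    # Hypothetical HDFS path and log schema
    logs = spark.read.json('hdfs:///datalake/agent_logs/*/')
    # Example aggregation: request count and hit rate per external agent
    report = (logs
              .groupBy('agent')
              .agg(F.count('*').alias('requests'),
                   F.avg(F.col('hit').cast('int')).alias('hit_rate')))
    # Save the result for the visualization layer (e.g. Tableau)
    report.write.mode('overwrite').parquet('hdfs:///datalake/reports/agent_hit_rate')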

    That's enough about the internal kitchen; let's move on to practice.

    Our solution was to use the Livy API. Livy lets you submit code to the cluster directly from Jupyter. An HTTP request containing code written in Python (or Scala) plus metadata is sent to Livy. Livy starts a Spark session on the cluster, managed by the YARN resource manager. The requests module is well suited for sending the HTTP requests. Those who like scraping websites probably know it already (and if not, here is a chance to learn a little about it).

    We import the necessary modules and create a session. (We also store the address of our session right away; it will come in handy later.) In the parameters we pass the user's authorization data and the kind of code the cluster will execute.

    import json, requests, schedule, time
    host = 'http://***:8998'                       # Livy server address
    data = {'kind': 'spark', 'proxyUser': 'user'}  # session kind and the user to run it as
    headers = {'Content-Type': 'application/json'}
    # Create a new Livy session
    r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    session_id = r.json().get('id')
    print("session_id: " + str(session_id))
    # The Location header points at the session we have just created
    session_url = host + r.headers['location']
    r = requests.get(session_url, headers=headers)
    
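    If the session is created successfully, Livy's JSON response looks roughly like this (the exact set of fields depends on the Livy version), and the Location header holds the relative session address such as /sessions/0:

    {"id": 0, "proxyUser": "user", "state": "starting", "kind": "spark", "appId": null, "log": []}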

    We wait until the session status becomes idle. If the wait exceeds the configured timeout (wait_time, in seconds), we send an error message.

    timeout = time.time() + wait_time   # wait_time - allowed wait in seconds (see the final script)
    sess_state = ['starting', 'success', 'idle']   # states we consider normal
    while True:
        time.sleep(7)
        req_st = requests.get(session_url, headers=headers).json().get('state')
        if req_st != 'idle' and time.time() > timeout:
            # Waited too long: kill the session and report the error
            requests.delete(session_url, headers=headers)
            send_message("Scheduler_error", req_st)
            break
        if req_st == 'idle':
            break
        if req_st not in sess_state:
            # Unexpected state (e.g. 'error' or 'dead') - report it
            send_message("Scheduler_error", req_st)
            break
    print("Session_state: ", req_st)
    
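    If this check is needed in several places, the same logic can be wrapped into a small helper function. This is only a refactoring sketch of the loop above (send_message is the notification function shown at the end of the article):

    def wait_for_idle(session_url, wait_time, headers):
        # Poll the Livy session until it becomes 'idle', the timeout expires
        # or an unexpected state appears
        deadline = time.time() + wait_time
        while True:
            time.sleep(7)
            state = requests.get(session_url, headers=headers).json().get('state')
            if state == 'idle':
                return True
            if time.time() > deadline:
                requests.delete(session_url, headers=headers)   # kill the stuck session
                send_message("Scheduler_error", state)
                return False
            if state not in ('starting', 'success', 'idle'):
                send_message("Scheduler_error", state)
                return False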

    Now you can send the code to Livy.

    statements_url = session_url + '/statements'
    data = {'code': '1 + 1'}   # the code that the cluster will execute
    r = requests.post(statements_url, data=json.dumps(data), headers=headers)
    statement_url = host + r.headers['location']
    r = requests.get(statement_url, headers=headers)
    # Poll until the statement has finished (progress reaches 1)
    while (requests.get(statement_url, headers=headers).json()['progress'] != 1):
        time.sleep(15)
    r = requests.get(statement_url, headers=headers).json()['output']
    # Rebuild the session URL - we will need it to delete the session later
    session_url = 'http://***:8998/sessions/' + str(session_id)
    

    In the loop we wait for the code to finish executing, then we read the result of the processing:

    r.get('data').get('text/plain')

    The DELETE method removes the session:

    requests.delete(session_url, headers=headers) 
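
    The whole "submit code, wait, read the result" cycle can also be gathered into one helper. This is a sketch based on the snippets above (host and headers are the same variables as before), not a complete implementation with error handling:

    def run_statement(host, session_url, code, headers, poll_interval=15):
        # Submit a statement to the Livy session and wait for its result
        r = requests.post(session_url + '/statements',
                          data=json.dumps({'code': code}), headers=headers)
        statement_url = host + r.headers['location']
        # Poll until the statement has finished (progress reaches 1)
        while requests.get(statement_url, headers=headers).json()['progress'] != 1:
            time.sleep(poll_interval)
        output = requests.get(statement_url, headers=headers).json()['output']
        return output.get('data', {}).get('text/plain')

    For example, run_statement(host, session_url, '1 + 1', headers) returns the same 'text/plain' result as above.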

    For the daily run there are several options: cron has already been covered on Habr, but the user-friendly schedule module has not. You just add it to the code; it needs no explanation. And, for convenience, I will gather all the calculations in one place.

    The code
    import json, requests, schedule, time
    def job(wait_time):
        host = 'http://***:8998'
        data = {'kind': 'spark', 'proxyUser': 'user'}
        headers = {'Content-Type': 'application/json'}
        # Create a Livy session
        r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
        session_id = r.json().get('id')
        print("session_id: " + str(session_id))
        session_url = host + r.headers['location']
        r = requests.get(session_url, headers=headers)
        # Wait for the session to become idle (or give up after wait_time seconds)
        timeout = time.time() + wait_time
        sess_state = ['starting', 'success', 'idle']
        while True:
            time.sleep(7)
            req_st = requests.get(session_url, headers=headers).json().get('state')
            if req_st != 'idle' and time.time() > timeout:
                requests.delete(session_url, headers=headers)
                send_message("Scheduler_error", req_st)
                break
            if req_st == 'idle':
                break
            if req_st not in sess_state:
                send_message("Scheduler_error", req_st)
                break
        print("Session_state: ", req_st)
        # Submit the statement and wait for it to finish
        statements_url = session_url + '/statements'
        data = {'code': '1 + 1'}
        r = requests.post(statements_url, data=json.dumps(data), headers=headers)
        statement_url = host + r.headers['location']
        r = requests.get(statement_url, headers=headers)
        while (requests.get(statement_url, headers=headers).json()['progress'] != 1):
            time.sleep(15)
        r = requests.get(statement_url, headers=headers).json()['output']
        session_url = 'http://***:8998/sessions/' + str(session_id)
        print(r.get('data').get('text/plain'))
        #requests.delete(session_url, headers=headers)
    def send_message(subject, text):
        import smtplib
        from email.mime.multipart import MIMEMultipart
        from email.mime.text import MIMEText
        me = "my_email_address"
        you = "email_address"
        msg = MIMEMultipart('alternative')
        msg['Subject'] = subject
        msg['From'] = me
        msg['To'] = you
        part1 = MIMEText(text, 'plain')
        msg.attach(part1)
        s = smtplib.SMTP('domain.org')
        s.ehlo()
        s.starttls()
        s.login("user", "password")
        s.sendmail(me, you, msg.as_string())
        s.quit()
    # Schedule the daily run; the function definitions above must come first
    schedule.every().day.at("16:05").do(job, 300)
    while True:
        schedule.run_pending()
        time.sleep(1)   # avoid spinning the CPU between checks


    Conclusion:


    This solution may not claim to be the best, but it is transparent for the team of analysts. The advantages I see in it:

    • the ability to use the familiar Jupyter for automation
    • visual interaction
    • each team member is free to choose how to work with the files (the Spark "zoo"), so existing scripts do not have to be rewritten

    Of course, when launching a large number of jobs you will have to keep an eye on freeing up resources and coordinate the uploads with one another. These issues are resolved on a case-by-case basis and agreed with colleagues.

    It would be great if at least one team takes note of this solution.

    References


    Livy Documentation
