A small library for using AI in Telegram chat bots

    Good afternoon! With chatbots in particular, and conversational AI systems in general, attracting so much interest, I have spent some time working on projects in this area. Today I would like to open-source one of the libraries I wrote. A caveat: I specialize mainly in the algorithmic side of development, so I welcome constructive criticism of the engineering decisions from those more experienced in that area.


    The library provides an interface between an algorithm that returns a response to a text request and the Telegram messenger API. It is designed to make it easy to plug in machine learning algorithms. Incidentally, more experienced specialists may know a good way to unify the interfaces of different communication channels (instant messengers, web widgets, etc.) behind a single entry point to the response function. If so, I will be glad to discuss it in the comments. In the past, I used a separate module written in Flask that exposed the algorithm through HTTP requests. For each user interface (Telegram, Facebook, etc.) I had to write a separate program. That program communicated with the interface, translated each request into a unified format understood by the Flask API, and finally translated the response back into a format the interface understands. This design is somewhat awkward, so, again, I would be glad to discuss the issue in the comments with colleagues who have more experience here.
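    One possible way to unify channels behind a single entry point is the adapter pattern. The sketch below is a hypothetical design, not part of the library: each channel adapter converts its native update into a common request object, a single respond() function produces a channel-neutral reply, and the adapter translates that reply back. The names Request, Reply, TelegramAdapter, respond and handle are illustrative assumptions. Only the Telegram field names (message, chat, id, text, reply_markup, keyboard) come from the Bot API.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Request:
    """Channel-neutral incoming message (hypothetical unified format)."""
    channel: str
    chat_id: Any
    text: str
    username: Optional[str] = None


@dataclass
class Reply:
    """Channel-neutral outgoing message."""
    text: str
    buttons: List[str] = field(default_factory=list)


def respond(request: Request) -> Reply:
    """The single entry point: the only code that knows about the algorithm."""
    if request.text.strip().lower() in ('hi', 'hello'):
        return Reply('Hello, human!', buttons=['How are you'])
    return Reply("Don't know yet")


class TelegramAdapter:
    """Translates between Telegram update dicts and the unified format."""

    def to_request(self, update: Dict[str, Any]) -> Request:
        message = update['message']
        return Request(channel='telegram',
                       chat_id=message['chat']['id'],
                       text=message.get('text', ''),
                       username=message['chat'].get('username'))

    def from_reply(self, request: Request, reply: Reply) -> Dict[str, Any]:
        # Parameters for Telegram's sendMessage method; buttons become a reply keyboard
        params: Dict[str, Any] = {'chat_id': request.chat_id, 'text': reply.text}
        if reply.buttons:
            params['reply_markup'] = {
                'keyboard': [[{'text': button}] for button in reply.buttons],
                'resize_keyboard': True}
        return params


def handle(adapter, update, respond_func: Callable[[Request], Reply] = respond):
    """Works with any adapter object that provides to_request() and from_reply()."""
    request = adapter.to_request(update)
    return adapter.from_reply(request, respond_func(request))
```

    A web-widget adapter would implement the same two methods with its own payload format, leaving respond() untouched. Actually sending the resulting parameters over the network remains each adapter's job.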

    The main function, called from outside to start the library, is main_loop_webhooks(). It receives a function that generates a meta-model [1], a function that generates a response from the result of applying the model, update settings (for example, for regular updates of weather forecasts or prices of certain goods), and a list of resources in a specific format. The resources are currently used to send responses to Telegram and to regularly notify an external script that the bot is running and has not crashed.

    Generating the meta-model and generating the final answer are split into two separate functions. The split makes it more convenient to tune, validate and test the meta-model. It also keeps the UX part simple to write and test: that part translates the meta-model's output into a user-friendly format.
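    Because the two functions are separate, each can be tested without Telegram. The following is a minimal sketch. It assumes the signatures used in the example script later in the article, a model object with predict() and generate_response(instance, model, user_id, updating_data). KeywordModel and StubModel are hypothetical test doubles.

```python
class KeywordModel:
    """Hypothetical meta-model: maps a raw string to a 'meaning' label."""
    def predict(self, instance):
        text = instance.strip().lower()
        if text in ('hi', 'hello'):
            return 'hello'
        return 'unknown'


def generate_model():
    return KeywordModel()


def generate_response(instance, model, user_id, updating_data):
    # UX layer: turns the model's label into a user-facing reply
    replies = {'hello': 'Hello, human!'}
    return replies.get(model.predict(instance), "Don't know yet")


class StubModel:
    """Test double that always returns the same label."""
    def predict(self, instance):
        return 'hello'


# The model can be validated on its own...
model = generate_model()
assert model.predict(' Hello ') == 'hello'
assert model.predict('weather?') == 'unknown'

# ...and the response layer can be checked with a stub model, no network involved
user_id = {'channel': 'telegram', 'chat_id': 1}
assert generate_response('anything', StubModel(), user_id, {}) == 'Hello, human!'
```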

    # Author: Andrei Grinenko 
    # License: BSD 3 clause
    import os
    import json
    import time
    import urllib.request
    import urllib.parse
    import threading
    import traceback
    import datetime
    def get_safe_response_data(url):
      try:
        response = urllib.request.urlopen(url, timeout=1)
        response_data = json.loads(response.read())
        return response_data
      except KeyboardInterrupt:
        raise
      except:
        time.sleep(3)
        return {}
    def clear_updates(token, offset):
      urllib.request.urlopen('https://api.telegram.org/bot' + token + '/getupdates?offset='
                             + str(offset + 1))
    def write_message(message, chat_id, token):
      # A plain string: a single message without a keyboard
      if isinstance(message, str):
        http_request = to_request(message, chat_id, token)
        urllib.request.urlopen(http_request)
      # A dict with 'text' and 'reply_markup' (e.g. a keyboard), the newer format
      elif isinstance(message, dict):
        text = message['text']
        reply_markup = message['reply_markup']
        http_request = to_request(text, chat_id, token, reply_markup)
        urllib.request.urlopen(http_request)
      # A list: several messages (str or dict) sent one after another
      else:
        for str_message in message:
          write_message(str_message, chat_id, token)
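    def write_log(**kwargs):
      # kwargs.get('file_name', default) returns None when the caller passes
      # file_name=None explicitly, so fall back to the default path in that case too
      file_name = kwargs.get('file_name') or './data/log.txt'
      current_datetime = datetime.datetime.now()
      kwargs.update({'current_datetime': current_datetime})
      # ensure_ascii=False writes non-ASCII text as-is, so set the encoding explicitly
      with open(file_name, 'a', encoding='utf-8') as output_stream:
        try:
          output_stream.write(to_beautified_log_line(**kwargs))
        except Exception:
          print(kwargs)
          traceback.print_exc()  # prints the traceback itself and returns None
          output_stream.write(to_simple_log_line(**kwargs))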
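    def to_request(message, chat_id, token, reply_markup=None):
      # Percent-encode both the text and the keyboard JSON; otherwise spaces and quotes
      # from json.dumps end up raw in the URL (recent Python versions reject spaces)
      message_ = urllib.parse.quote(message)
      reply_markup_ = urllib.parse.quote(json.dumps(reply_markup or {}))
      return ('https://api.telegram.org/bot' + token + '/sendmessage?'
              + 'chat_id=' + str(chat_id) + '&parse_mode=Markdown'
              + '&text=' + message_ + '&reply_markup=' + reply_markup_)
    def to_request_old(message, chat_id, token, reply_markup=None):
      return to_request(message, chat_id, token, reply_markup)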
    def get_last_message_data(token):
      try:
        response = urllib.request.urlopen('https://api.telegram.org/bot' + token + '/getupdates')
        text_response = response.read().decode('utf-8')
        json_response = json.loads(text_response)
        return json_response
      except KeyboardInterrupt:
        raise
      except:
        traceback.print_exc()
        time.sleep(3)
        return None
    def update_data(updates_settings):
      result = dict()
      for key in updates_settings['data']:
        result[key] = updates_settings['data'][key]()
      return result
    # Updates the cache file with new data just obtained from the sources
    def update_cash_file(new_data, cash_file_name):
      """
      new_data: dict of dicts and values
      """
      if os.path.isfile(cash_file_name):
        with open(cash_file_name) as input_stream:
          cash_file_data = json.loads(input_stream.read())
      else:
        cash_file_data = {}
      for key in new_data:
        if type(new_data[key]) == dict:
          if not key in cash_file_data:
            cash_file_data[key] = dict()
          for subkey in new_data[key]:
            cash_file_data[key][subkey] = new_data[key][subkey]
        else:
          cash_file_data[key] = new_data[key]
      with open(cash_file_name, 'w') as output_stream:
        output_stream.write(json.dumps(cash_file_data))
    def to_simple_log_line(**kwargs):
      channel = kwargs.get('channel', None)
      chat_id = kwargs.get('chat_id', None)
      message_text = kwargs['message_text']
      username = kwargs.get('username', None)
      first_name = kwargs.get('first_name', None)
      last_name = kwargs.get('last_name', None)
      response = kwargs['response']
      current_datetime = kwargs.get('current_datetime', None)
      datetime_output = (str(current_datetime.year) + '-' + str(current_datetime.month) + '-'
                         + str(current_datetime.day) + '/' + str(current_datetime.hour) + ':'
                         + str(current_datetime.minute))
      output_data = {'datetime':datetime_output, 'channel':channel,
                     'chat_id':chat_id,
                     'message_text':message_text,
                     'username':username,
                     'first_name':first_name,
                     'last_name':last_name,
                     'response':response}
      return str(output_data) + '\n'
    def to_beautified_log_line(**kwargs):
      channel = kwargs.get('channel', None)
      chat_id = kwargs.get('chat_id', None)
      message_text = kwargs['message_text']
      username = kwargs.get('username', None)
      first_name = kwargs.get('first_name', None)
      last_name = kwargs.get('last_name', None)
      response = kwargs['response']
      current_datetime = kwargs.get('current_datetime', datetime.datetime(2000, 1, 1, 0, 0))
      datetime_output = (str(current_datetime.year) + '-' + str(current_datetime.month) + '-'
                         + str(current_datetime.day) + '/' + str(current_datetime.hour) + ':'
                         + str(current_datetime.minute))# .encode('utf-8')
      username_data = str(username)# .encode('utf-8')
      first_name_data = str(first_name)# .encode('utf-8')
      last_name_data = str(last_name)# .encode('utf-8')
      response_data = str(response)# .encode('utf-8')
      output_data = {'datetime':datetime_output, 'channel':str(channel), # .encode('utf-8'),
                     'chat_id':str(chat_id), 'message_text':message_text, # .encode('utf-8'),
                     'username':username_data,
                     'first_name':first_name_data,
                     'last_name':last_name_data,
                     'response':response_data}
      return json.dumps(output_data, ensure_ascii=False) + '\n'
    def message_to_input(message_data):
      input_data = dict()
      try:
        input_data['message_text'] = message_data['result'][-1]['message']['text']
      except:
        input_data['message_text'] = None
      try:
        input_data['update_id'] = message_data['result'][-1]['update_id']
      except:
        input_data['update_id'] = None
      try:
        input_data['chat_id'] = message_data['result'][-1]['message']['chat']['id']
      except:
        input_data['chat_id'] = None
      try:
        input_data['username'] = message_data['result'][-1]['message']['chat']['username']
      except:
        input_data['username'] = None
      try:
        input_data['first_name'] = message_data['result'][-1]['message']['chat']['first_name']
      except:
        input_data['first_name'] = None
      try:
        input_data['last_name'] = message_data['result'][-1]['message']['chat']['last_name']
      except:
        input_data['last_name'] = None
      return input_data
    def is_update_time(new_datetime, last_update_datetime, updates_settings):
      if updates_settings['frequency'] is None:
        return False
      else:
        return new_datetime - last_update_datetime > updates_settings['frequency']
    def update_data_thread_function(updates_settings, cash_file_name):
      global updating_data
      last_update_datetime = datetime.datetime.now() - datetime.timedelta(weeks=10)
      while True:
        new_datetime = datetime.datetime.now()
        if is_update_time(new_datetime, last_update_datetime, updates_settings):
          print('Updating data at datetime: {}'.format(new_datetime))
          last_update_datetime = datetime.datetime.now()
          try:
            new_data = update_data(updates_settings)
            if cash_file_name:
              update_cash_file(new_data, cash_file_name)
              with open(cash_file_name) as input_stream:
                updating_data = json.load(input_stream)
            else:
              updating_data = new_data
          except:
            traceback.print_exc()
            time.sleep(3)
            continue
        time.sleep(10)
    def main_loop_webhooks(generate_model_func, generate_response_func,
                           updates_settings=None, **kwargs):
      """
      updates_settings format:
      {frequency: datetime.timedelta or None,
       data: {first_field: first_function, second_field: second_function, ...}}
      kwargs:
      list_of_sources: list of dicts {'node': str, 'id': {'token': str}}, required
      cash_file_name: str, cache file name for caching updates data, None for no file
      log_file_name: str, path of the dialog log file
      """
      if updates_settings is None:
        updates_settings = {'frequency': None, 'data': {}}
      cash_file_name = kwargs.get('cash_file_name', None)
      log_file_name = kwargs.get('log_file_name', None)
      list_of_sources = kwargs['list_of_sources']
      model = generate_model_func()
      print('Model trained')
      global updating_data
      # os.path.isfile(None) can raise TypeError, so check for a file name first
      if cash_file_name and os.path.isfile(cash_file_name):
        with open(cash_file_name) as input_stream:
          updating_data = json.load(input_stream)
      else:
        updating_data = {}
      # Daemon thread: it does not keep the process alive once the main loop exits
      update_data_thread = threading.Thread(target=update_data_thread_function,
                                            args=(updates_settings, cash_file_name),
                                            daemon=True)
      update_data_thread.start()
      current_update_id = 0
      activity_status = None
      last_ping_time = datetime.datetime.now()
      while True:
        for source in list_of_sources:
          current_datetime = datetime.datetime.now()
          if source['node'] == 'https://api.telegram.org/' and activity_status != 'waiting':
            try:
              message_data = get_last_message_data(source['id']['token'])
            except KeyboardInterrupt:
              raise
            except:
              traceback.print_exc()
              time.sleep(3)
              continue
            input_data = message_to_input(message_data)
            try:
              if input_data['update_id'] != None:
                clear_updates(source['id']['token'], input_data['update_id'])
            except:
              traceback.print_exc()
            if (input_data['update_id'] != None and input_data['message_text'] != None
                and (current_update_id == None or input_data['update_id'] > current_update_id)):
              current_update_id = input_data['update_id']
              analizing_data = {'message_text':input_data['message_text'],
                                'username':input_data['username'],
                                'first_name':input_data['first_name'],
                                'last_name':input_data['last_name'],
                                'recipient':{'channel':'telegram',
                                             'token':source['id']['token'],
                                             'chat_id':input_data['chat_id']}}
              user_id = {'channel':analizing_data['recipient']['channel'],
                         'chat_id':analizing_data['recipient']['chat_id']}
              try:
                response = generate_response_func(input_data['message_text'], model,
                                                  user_id, updating_data)
              except:
                response = u'Sorry, can\'t reply'
                traceback.print_exc()
              write_attempts_number = 5
              write_attempts_counter = 0
              while write_attempts_counter < write_attempts_number:
                try:
                  write_message(response, analizing_data['recipient']['chat_id'],
                                          analizing_data['recipient']['token'])
                  write_attempts_counter = write_attempts_number # Success!
                except:
                  if write_attempts_counter == 1:
                    traceback.print_exc()
                  write_attempts_counter += 1
                  time.sleep(3)
              write_log(file_name=log_file_name,
                        message_text=analizing_data['message_text'],
                        response=response,
                        channel=analizing_data['recipient']['channel'],
                        chat_id=analizing_data['recipient']['chat_id'],
                        username=analizing_data['username'],
                        first_name=analizing_data['first_name'],
                        last_name=analizing_data['last_name'])
          else:
            current_time = datetime.datetime.now()
            if current_time - last_ping_time > datetime.timedelta(seconds=10):
              last_ping_time = current_time
              response_data = get_safe_response_data(source['node'] + 'ping' + source['id']['token']
                                                     + '/getupdates')
              if response_data.get('sysmsg') == 'ping':
                activity_status = response_data.get('status')
                analizing_data = {'node':source['node'],
                                  'recipient':source['id'], 'ping':True}
                http_address = (analizing_data['node'] + 'ping' + analizing_data['recipient']['token']
                                + '/sendmessage')
                try:
                  urllib.request.urlopen(http_address, timeout=1)
                except:
                  pass
    


    Let's briefly go over the functions contained in the library.

    main_loop_webhooks(generate_model_func, generate_response_func, updates_settings, **kwargs)
    

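    The main function, called from the script that uses the library. It loads the cache file and starts the data-updating function in a parallel thread. It then polls Telegram for new messages, sends responses, and periodically pings the service that monitors the bot's state. Despite its name, it does not register a Telegram webhook: new messages are fetched by polling the getUpdates method.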

    get_last_message_data(token)
    

    Fetches the pending updates (incoming messages) for the bot via the getUpdates method of the Telegram Bot API.
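    Note that the library requests getUpdates without an offset and then uses only the last update in the result. clear_updates then confirms every update up to that one. As a result, if several messages arrive between two polls, only the last one is answered. Below is a sketch of the offset-based alternative. It assumes Telegram's documented getUpdates parameters offset and timeout (long polling). The helper names are hypothetical, and the network call is kept out of the pure functions so they can be tested offline. Error handling is omitted for brevity.

```python
import json
import urllib.parse
import urllib.request


def get_updates_url(token, offset=None, timeout=30):
    """Build a getUpdates URL; `timeout` enables long polling (in seconds)."""
    params = {'timeout': timeout}
    if offset is not None:
        params['offset'] = offset
    return ('https://api.telegram.org/bot' + token + '/getUpdates?'
            + urllib.parse.urlencode(params))


def split_updates(response_json, offset):
    """Return all text messages in order and the offset to request next."""
    messages = []
    for update in response_json.get('result', []):
        offset = update['update_id'] + 1  # passing this offset confirms the update
        message = update.get('message')
        if message and 'text' in message:
            messages.append(message)
    return messages, offset


def poll_forever(token, handle_message):
    offset = None
    while True:
        url = get_updates_url(token, offset)
        # The socket timeout must exceed the long-polling timeout
        with urllib.request.urlopen(url, timeout=40) as response:
            data = json.loads(response.read().decode('utf-8'))
        messages, offset = split_updates(data, offset)
        for message in messages:
            handle_message(message)
```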

    message_to_input(message_data)
    

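    Takes the getUpdates response and extracts the fields needed later (message text, update_id, chat_id, username, first and last name) from the most recent update, using None for any field that is missing.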
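    The six try/except blocks in message_to_input all follow one pattern: read a nested key, fall back to None. The following is a more compact equivalent, shown as a sketch. The helper dig is hypothetical and not part of the library. It walks the key path in a loop and catches only the exceptions that a missing key, an empty list or a None value can raise.

```python
def dig(data, *path):
    """Follow a path of dict keys / list indices; return None if any step is missing."""
    for key in path:
        try:
            data = data[key]
        except (KeyError, IndexError, TypeError):
            return None
    return data


def message_to_input(message_data):
    last = dig(message_data, 'result', -1)  # the most recent update
    chat = dig(last, 'message', 'chat')
    return {
        'message_text': dig(last, 'message', 'text'),
        'update_id': dig(last, 'update_id'),
        'chat_id': dig(chat, 'id'),
        'username': dig(chat, 'username'),
        'first_name': dig(chat, 'first_name'),
        'last_name': dig(chat, 'last_name'),
    }
```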

    write_message(message, chat_id, token)
    

    Sends a message to a Telegram chat. For historical reasons, the message argument can take three forms. It can be a str (a single plain message), a list (a series of messages sent one after another), or a dict with text and reply_markup keys (a message with an attached keyboard).
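    The three accepted shapes of the message argument can be illustrated offline. The sketch below uses a hypothetical helper that mirrors write_message's dispatch but returns (text, reply_markup) pairs instead of sending them. The keyboard uses Telegram's ReplyKeyboardMarkup fields (keyboard, resize_keyboard, one_time_keyboard). A reply keyboard suits this library better than an inline keyboard. Pressing a reply-keyboard button sends an ordinary text message, whereas inline buttons produce callback_query updates, which message_to_input does not read.

```python
def flatten_message(message):
    """Mirror write_message's dispatch: return a list of (text, reply_markup) pairs."""
    if isinstance(message, str):
        return [(message, {})]
    if isinstance(message, dict):
        return [(message['text'], message['reply_markup'])]
    pairs = []
    for item in message:  # a list: several messages sent one after another
        pairs.extend(flatten_message(item))
    return pairs


single = 'Hello, human!'
thread = ['First line', 'Second line']
with_keyboard = {
    'text': 'How are you?',
    'reply_markup': {'keyboard': [[{'text': 'Fine'}, {'text': 'Bad'}]],
                     'resize_keyboard': True, 'one_time_keyboard': True},
}
```

    Any of single, thread or with_keyboard (or a list mixing them) could be returned from generate_response.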

    write_log(**kwargs)
    

    Logs the dialog. Each request and response is appended to the log file as one line, together with the channel, chat ID, user name and timestamp.
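    Each line written by to_beautified_log_line is a standalone JSON object, so the log can be read back line by line. A minimal reader sketch follows, assuming the log file is UTF-8-encoded. Lines produced by the fallback to_simple_log_line are Python dict reprs rather than JSON, so the sketch parses those with ast.literal_eval. read_log and messages_per_chat are hypothetical helpers.

```python
import ast
import json
from collections import Counter


def read_log(file_name):
    """Yield one dict per logged exchange (assumes a UTF-8 log file)."""
    with open(file_name, encoding='utf-8') as input_stream:
        for line in input_stream:
            line = line.strip()
            if not line:
                continue
            try:
                yield json.loads(line)        # to_beautified_log_line output
            except json.JSONDecodeError:
                yield ast.literal_eval(line)  # to_simple_log_line output (a dict repr)


def messages_per_chat(file_name):
    # chat_id is a str in beautified lines but may be an int in simple ones
    return Counter(str(record['chat_id']) for record in read_log(file_name))
```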

    to_request(message, chat_id, token, reply_markup)
    

    Builds a sendMessage request URL for the Telegram API from the message text, chat ID, token and reply markup (which usually describes an attached keyboard).
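    An alternative way to build the same request is urllib.parse.urlencode. It percent-encodes every query parameter, including the JSON of reply_markup, in a single call. The following is a sketch; build_send_message_url is a hypothetical name, and the parameter names are those of Telegram's sendMessage method.

```python
import json
import urllib.parse


def build_send_message_url(token, chat_id, text, reply_markup=None,
                           parse_mode='Markdown'):
    """Build a sendMessage GET URL with every query parameter percent-encoded."""
    params = {'chat_id': chat_id, 'text': text, 'parse_mode': parse_mode}
    if reply_markup:
        params['reply_markup'] = json.dumps(reply_markup)
    return ('https://api.telegram.org/bot' + token + '/sendMessage?'
            + urllib.parse.urlencode(params))


url = build_send_message_url('123456789:abc', 42, 'Hi & bye!',
                             {'keyboard': [[{'text': 'Yes'}, {'text': 'No'}]]})
```

    The Bot API also accepts the same parameters in the body of a POST request, which avoids URL length limits for long messages.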

    update_data(updates_settings)
    

    Updates data from third-party sources: calls each function listed in updates_settings['data'] and returns a dict of their results.
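    updates_settings maps field names to zero-argument functions. update_data calls each one and collects the results into a dict. When a cache file is configured, that dict is first merged into the cache file. The result becomes updating_data, which is passed to generate_response. Here is a sketch with hypothetical data sources; fixed values stand in for real API calls.

```python
import datetime


def update_data(updates_settings):
    # Same logic as the library's update_data, written as a dict comprehension
    return {key: func() for key, func in updates_settings['data'].items()}


def fetch_prices():
    # Hypothetical source; a real one would query an external API
    return {'bread': 1.5, 'milk': 0.9}


def fetch_datetime():
    return str(datetime.datetime.now())


updates_settings = {
    'frequency': datetime.timedelta(minutes=30),
    'data': {'prices': fetch_prices, 'datetime': fetch_datetime},
}

updating_data = update_data(updates_settings)
```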

    update_cash_file(new_data, cash_file_name)
    

    Updates the contents of the cache file. The cache matters when the data source is unavailable the next time the program starts, for example after a crash or when deploying a new version. The bot can then fall back on the cached values.
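    update_cache_file merges new data into the cache only one level deep. A dict value is merged key by key into the cached dict, while any other value replaces the cached one. The sketch below isolates that rule as a pure function so it can be checked without touching the file system; merge_cache is a hypothetical name. It is slightly more defensive than the library: a dict also replaces a cached non-dict value instead of failing. As an extra, not something the library does, save_cache_atomically writes through a temporary file and os.replace, so a crash mid-write cannot leave a truncated cache.

```python
import json
import os
import tempfile


def merge_cache(cache, new_data):
    """One-level merge: dict values are merged key by key, other values replaced."""
    merged = dict(cache)
    for key, value in new_data.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = {**merged[key], **value}
        else:
            merged[key] = value
    return merged


def save_cache_atomically(data, cash_file_name):
    """Write JSON to a temporary file in the same directory, then swap it in."""
    directory = os.path.dirname(os.path.abspath(cash_file_name))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix='.tmp')
    with os.fdopen(fd, 'w') as output_stream:
        json.dump(data, output_stream)
    os.replace(tmp_path, cash_file_name)
```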

    is_update_time(new_datetime, last_update_datetime, updates_settings)
    

    Checks if it is time to update data from third-party APIs.
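    The check is a plain timedelta comparison, and a frequency of None switches updates off entirely. Below is a small illustration with fixed, made-up timestamps. Note that update_data_thread_function sleeps 10 seconds between checks. In practice, then, updates happen at most roughly every 10 seconds even when frequency is smaller (as in the example script, which sets 5 seconds).

```python
import datetime


def is_update_time(new_datetime, last_update_datetime, updates_settings):
    if updates_settings['frequency'] is None:
        return False  # updates disabled
    return new_datetime - last_update_datetime > updates_settings['frequency']


settings = {'frequency': datetime.timedelta(minutes=30), 'data': {}}
last = datetime.datetime(2020, 1, 1, 12, 0)
```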

    update_data_thread_function(updates_settings, cash_file_name)
    

    A function that runs in a parallel thread and updates the third-party API data. I had to brush up on the basics of multithreaded programming here. Some APIs take tens of seconds to answer a request, and while they did, the main program hung and did not respond to users.
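    The same idea, moving slow API calls off the main thread, can be sketched as a small class; DataRefresher is a hypothetical name. Three design choices here belong to this sketch. The worker is a daemon thread, so it never keeps the process alive on its own. A threading.Event lets it be stopped cleanly instead of looping forever. A failing source keeps its previous value instead of aborting the whole update. The shared dict is replaced as a whole rather than mutated, so readers always see a complete snapshot.

```python
import threading
import time


class DataRefresher:
    """Refresh a dict of external data in a background thread."""

    def __init__(self, sources, interval_seconds):
        self.sources = sources            # {field_name: zero-argument function}
        self.interval = interval_seconds
        self.data = {}                    # replaced wholesale, never mutated in place
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _refresh(self):
        new_data = {}
        for key, func in self.sources.items():
            try:
                new_data[key] = func()
            except Exception as error:    # one broken API must not block the others
                print('Update of {!r} failed: {}'.format(key, error))
                if key in self.data:
                    new_data[key] = self.data[key]  # keep the previous value
        self.data = new_data

    def _run(self):
        while not self._stop.is_set():
            self._refresh()
            self._stop.wait(self.interval)  # sleeps, but wakes up at once on stop()

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()
```

    The main loop would read refresher.data when calling generate_response and never wait on the APIs itself.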

    The library is primarily intended for working with machine learning algorithms, but discussing them is far beyond the scope of this article. To show how the library works, let's write two simple functions. We will use them as a generator of an example model and as an interpreter of its output.

    An example script that uses the library:

    # Author: Andrei Grinenko 
    # License: BSD 3 clause
    import datetime
    import web3
    TOKEN = '123456789:abcdefghijklmnopqrstuvwxyzABCDEFGHI'
    REGISTER_TOKEN = '12:abcdef'
    class HelloWorldMetaModel(object):
      def __init__(self):
        pass
      def predict(self, instance):
        if instance == '/start':
          return 'start'
        elif instance in ['hi', 'hello']:
          return 'hello'
        elif instance == 'how are you':
          return 'how_are_you'
        else:
          return 'unknown'
    def generate_meta_model():
      return HelloWorldMetaModel()
    def generate_response(instance, model, user_id, updating_data):
      current_datetime = updating_data['datetime']
      meaning = model.predict(instance)
      if meaning == 'start':
        reply = current_datetime + ': ' + 'I am habr example bot'
      elif meaning == 'hello':
        reply = current_datetime + ': ' + 'Hello, human!'
      elif meaning == 'how_are_you':
        reply = current_datetime + ': ' + 'I\'m fine, thanks!'
      else:
        reply = current_datetime + ': ' + 'Don\'t know yet'
      return reply
    def update_datetime():
      return str(datetime.datetime.now())
    if __name__ == '__main__':
      web3.main_loop_webhooks(
                      generate_model_func=generate_meta_model,
                      generate_response_func=generate_response,
                      updates_settings={'frequency':datetime.timedelta(seconds=5),
                                        'data':{'datetime':update_datetime}},
                      list_of_sources=[{'node':'https://api.telegram.org/', 'id':{'token':TOKEN}},
                                       # {'node':'http://123.456.78.90/',
                                       #  'id':{'token':REGISTER_TOKEN}}
                                       ],
                      cash_file_name='./cash_file.txt',
                      log_file_name='./log.txt')
    


    TOKEN = '123456789:abcdefghijklmnopqrstuvwxyzABCDEFGHI'
    

    Token for authorizing a bot in Telegram. It needs to be replaced with the one issued by BotFather for a specific bot.

    REGISTER_TOKEN = '12:abcdef'
    

    Token used to notify the monitoring script that the bot is working. If this feature is not needed, you can replace it with any string; in the example, the monitoring source is commented out anyway.

    class HelloWorldMetaModel(object)
    

    A trivial stand-in for the class that holds the logic for processing incoming messages. Someday a complex AI will take its place, but, alas, not yet. At least not in this article. Readers who want to learn how to quickly write a machine learning algorithm and put it into production can read about it here.

    generate_meta_model()
    

    The function that generates the model.

    generate_response(instance, model, user_id, updating_data)
    

    The function that forms the answer based on the result of applying the model. In our simple example, each answer corresponds to exactly one of the possible meanings of the user's message. Those meanings are defined in the predict() method of the HelloWorldMetaModel class. The current date and time from updating_data is prepended to the answer.
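    Each meaning maps to exactly one reply, so the if/elif chain in generate_response can also be written as a lookup table. Below is a sketch with the same signature. The .get fallback for 'datetime' is an added assumption: it lets the function work even before the first data update has arrived. FixedModel is a hypothetical stand-in for HelloWorldMetaModel.

```python
REPLIES = {
    'start': 'I am habr example bot',
    'hello': 'Hello, human!',
    'how_are_you': "I'm fine, thanks!",
}


def generate_response(instance, model, user_id, updating_data):
    meaning = model.predict(instance)
    reply = REPLIES.get(meaning, "Don't know yet")
    current_datetime = updating_data.get('datetime', 'unknown time')
    return current_datetime + ': ' + reply


class FixedModel:
    """Stand-in for HelloWorldMetaModel: always returns the same meaning."""
    def __init__(self, meaning):
        self.meaning = meaning

    def predict(self, instance):
        return self.meaning
```

    Adding a new intent then takes one dictionary entry instead of another elif branch.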

    update_datetime()
    

    An example of a function that fetches external, changing data. Here it simply returns the current date and time as a string.

    [Screenshot: an example of the bot at work]

    The code is on GitHub here. The code is on Bitbucket here. After changing the value of the TOKEN constant, run the example simply with:

    python3 example.py

    Glossary

    (Trained) machine learning model: a function that takes as input a fixed-length array of numbers (for mathematicians, a point in n-dimensional space) and returns a class identifier (when solving a classification problem) or a number (when solving a regression problem).

    [1] Meta-model: a generalization of the concept of a machine learning model. It differs in that it takes an object of arbitrary nature (e.g. an array encoding an image, a string, a description of a website user, etc.) and returns an object of arbitrary nature (usually a class identifier, a number, a dictionary or a string). The abstraction is useful for encapsulating a machine learning pipeline into a single entity. Such a pipeline usually includes preprocessing, applying the model and postprocessing (the author's terminology).
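    The meta-model idea from footnote [1] can be sketched as a small wrapper. It chains preprocessing, a numeric model and postprocessing behind a single predict() method, the same interface HelloWorldMetaModel exposes. The word-count vectorizer and the nearest-centroid "model" below are toy stand-ins, chosen only to keep the sketch dependency-free; a real pipeline would use a trained model.

```python
class MetaModel:
    """Arbitrary object in, arbitrary object out: preprocess -> model -> postprocess."""

    def __init__(self, preprocess, model, postprocess):
        self.preprocess = preprocess
        self.model = model
        self.postprocess = postprocess

    def predict(self, instance):
        return self.postprocess(self.model(self.preprocess(instance)))


VOCABULARY = ['hi', 'hello', 'how', 'are', 'you']


def to_vector(text):
    """Preprocessing: string -> fixed-length vector of word counts."""
    words = text.lower().split()
    return [words.count(word) for word in VOCABULARY]


CENTROIDS = {0: [0.5, 0.5, 0, 0, 0], 1: [0, 0, 1, 1, 1]}  # class id -> centroid


def nearest_centroid(vector):
    """The ML model proper: fixed-length numbers in, class identifier out."""
    def distance(class_id):
        return sum((a - b) ** 2 for a, b in zip(vector, CENTROIDS[class_id]))
    return min(CENTROIDS, key=distance)


LABELS = {0: 'hello', 1: 'how_are_you'}  # postprocessing: class id -> meaning

meta_model = MetaModel(to_vector, nearest_centroid, LABELS.get)
```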
