Asterisk + AMI + Python

    Honestly, I have long thought whether to publish this material. For those who know how to work with AMI Asterisk, there is nothing interesting here. For those who are just starting to do something, they are unlikely to figure it out in my code (although I tried to write clearly). Vanguyu comments like: "Why use Habr for your notes?". On the other hand, the script given under the cut can be someone's starting point. The script does nothing except that it sends to the console all the events from AMI and is able to filter them. For example, I show all calls in the console that fall into any of the contexts “zadarma-in” or “sibseti_in”. If interested, please under the cat:

    There was a real-time task to look at which trunk the call came from, which buttons in ivr the user clicked, who answered the call, etc. I have long wanted to try working with AMI from Python, before that I had a little experience from Bash and then to organize a call back.

    Having rummaged with various ready-made libraries, it quickly came to the realization that none of them suits me. As a result, his “bicycle” was invented in the form of a script, which gives all the information from AMI to json. Uses standard Python libraries. Plus the fact that in this form it is easy to receive and parse any events and not to lose binding to a specific call.

    The first script prints only those events that fall into any of the “zadarma-in” or “sibseti_in” contexts.

    Script number 1
    import telnetlib
    import json
    import re
    ##
    HOST = "192.168.10.10"
    PORT = "5038"
    user = "zabbix"
    password = "password"##
    tn = telnetlib.Telnet(HOST,PORT)
    tn.write("Action: login".encode('ascii') + b"\n")
    username = "Username: " + user
    tn.write(username.encode('ascii') + b"\n")
    passWord = "Secret: " + password
    string_NOW = ''
    string_out = ''
    cd = 0
    tn.write(passWord.encode('ascii') + b"\n\n")
    deftelnet_for_string(string):global string_out
        string_out_def = ''for mes in string:
            try:
                if string[mes]['Context'] == 'zadarma-in'or string[mes]['Context'] == 'sibseti_in'or string[mes]['Context'] == 'IVR':
                    Uniqueid = string[mes]['Uniqueid']
                    CallerIDNum = string[mes]['CallerIDNum']
                    Exten = string[mes]['Exten']
                    CallerIDName = string[mes]['CallerIDName']
                    try:
                        Digit = string[mes]['Digit']
                    except KeyError:
                        Digit = ''# if Exten == 's' or Exten == 'h':#     Exten = ''# Context = string[mes]['Context']
                    string_out_def = json.dumps({'Uniqueid': Uniqueid,
                               'CallerIDNum':CallerIDNum,
                               'CallerIDName':CallerIDName,
                               'Exten':Exten,
                               'Digit':Digit})
                    # print(string_out_def)except UnboundLocalError:
                1+1except KeyError:
                1+1# print(string_out_def)if string_out_def:
            if string_out_def != string_out:
                print(string_out_def)
            string_out = string_out_def
    whileTrue:
        string = ''
        event_string = ''
        elements_string = ''
        c = 0
        read_some = tn.read_some()  # Получаем строчку из AMI
        string = read_some.decode('utf8', 'replace').replace('\r\n', '#')   # Декодируем строчки и заменяем переносы строк на ## print(string)# Отлавливаем начало строки и склеиваем строчкуifnot string.endswith('##'):
            string_NOW = string_NOW + string
            # print('1 --->',string_NOW)# Если строчка закончилась, то доклеиваем конец строки и# совершаем магию, которая двойной перенос строки в середине строки заменит на $,# а все одинарные переносы заменит на #, так-же удалим кавычки и обратные слешиif string.endswith('##'):
            string_NOW = string_NOW + string
            string_NOW = string_NOW.replace('##', '$')  # заменяем двойной перенос строки на $
            string_NOW = string_NOW.replace('\n', '#')  # Заменяем  перенос на #
            string_NOW = string_NOW.replace('\r', '#')  # Заменяем  перенос на #
            string_NOW = string_NOW.replace('"', '')    # Удаляем кавычки
            string_NOW = string_NOW.replace('\\', '')   # удаляем обратный слеш# print('string_NOW -->',string_NOW)# print()# Делим полученую строчку на Евенты т.к. двойной перенос как раз её так и делил
            events = re.findall(r'[A-Z][\w]+:\s[^$]+', string_NOW)
            for event in events:
                c+=1# print('event ---> ',event)
                event_elements = re.findall(r'[A-Z][\w]+:\s[^#]+', event)   # А тут делим евенты на елеменыfor element in event_elements:
                    element = '\"' + element.replace(': ', '\": "') + '\", '# Вручную делаем словарь# print('element', element)
                    elements_string = elements_string + element # Склеиваем строчки обратно, получаем словарь# event_string = event_string + '\"' + elements_string.split(':')[1].split(',')[0].replace('"','') + '\": ' + '{' + elements_string + '}'# print(elements_string)# print(str(elements_string.split(':')[1].split(',')[0]))# собираем обратно евенты попутно формирую json:
                event_string = event_string + '\"' + str(c) + '\": ' + '{' + elements_string + '}'
                event_string = event_string.replace('}{', '},{')    #   Добавляем запятую между евентами
                event_string = event_string.replace(', }', '}, ')   #
            event_string = '{' + event_string + '}'
            event_string = event_string.replace('}, }', '}}')
            # Превращаем полученую строчку в json, если вдруг есть ошибка в синтаксисе json, то выводим как сам невалидный# json, так и строчку  из которой не получилось его собрать.try:
                parsed_string = json.loads(event_string)
            except json.decoder.JSONDecodeError:
                print('#############################################', '\n\n\n')
                print(event_string, '\n\n\n')
                print(string_NOW, '\n\n\n')
                print('#############################################', '\n\n\n')
            # print(event_string)# print(parsed_string['1'])# Отправляем полученую строчку в функуию "telnet_for_string", в которой уже можно обработать полученую строчку.
            telnet_for_string(parsed_string)
            string_NOW = ''# Очищем строчку


    And the second script, which writes all events to the console, looking at both scripts, it becomes clear what needs to be changed in order to achieve the desired result. If it is not completely clear, then parsit needs json "string [mes]" in the function "def telnet_for_string (string)":

    Script number 2
    import telnetlib
    import time
    import json
    import re
    ##
    HOST = "192.168.10.10"
    PORT = "5038"
    user = "zabbix"
    password = "password"##
    tn = telnetlib.Telnet(HOST,PORT)
    tn.write("Action: login".encode('ascii') + b"\n")
    username = "Username: " + user
    tn.write(username.encode('ascii') + b"\n")
    passWord = "Secret: " + password
    string_NOW = ''
    string_out = ''
    tn.write(passWord.encode('ascii') + b"\n\n")
    deftelnet_for_string(string):for mes in string:
            print(string[mes])
    whileTrue:
        # time.sleep(0.1)
        string = ''
        event_string = ''
        elements_string = ''
        c = 0
        read_some = tn.read_some()
        string = read_some.decode('utf8', 'replace').replace('\r\n', '#')
        # print(string)ifnot string.endswith('##'):
            string_NOW = string_NOW + string
        if string.endswith('##'):
            string_NOW = string_NOW + string
            string_NOW = string_NOW.replace('##', '$')
            string_NOW = string_NOW.replace('\n', '#')
            string_NOW = string_NOW.replace('\r', '#')
            string_NOW = string_NOW.replace('"', '')
            string_NOW = string_NOW.replace('\\', '')
            events = re.findall(r'[A-Z][\w]+:\s[^$]+', string_NOW)
            for event in events:
                c+=1
                event_elements = re.findall(r'[A-Z][\w]+:\s[^#]+', event)
                for element in event_elements:
                    element = '\"' + element.replace(': ', '\": "') + '\", '
                    elements_string = elements_string + element
                event_string = event_string + '\"' + str(c) + '\": ' + '{' + elements_string + '}'
                event_string = event_string.replace('}{', '},{')
                event_string = event_string.replace(', }', '}, ')
            event_string = '{' + event_string + '}'
            event_string = event_string.replace('}, }', '}}')
            try:
                parsed_string = json.loads(event_string)
            except json.decoder.JSONDecodeError:
                print('#############################################', '\n\n\n')
                print(event_string, '\n\n\n')
                print(string_NOW, '\n\n\n')
                print('#############################################', '\n\n\n')
            telnet_for_string(parsed_string)
            string_NOW = ''


    Also popular now: