Small convenience in student life

A simple story about how I got embarrassed about constantly asking my groupmates for information I had missed, and decided to make our lives a little easier.


I suppose many of my peers know the situation: the general group chat, where important information often flashes by, has about 30 active participants who constantly load VKontakte's databases with their messages. Under these conditions it is unlikely that everyone will see the important information. It happens to me too. A year ago I decided to fix this.

If you are willing to put up with yet another article about a bot, read on.

Since I am a first-year student, the examples will be related to student life.
So, here is the task: make passing information from the headman (the group leader) to the students convenient for both sides. Thanks to a relatively new VKontakte feature (community messages), the solution was obvious right away. A bot sitting in a community must receive messages from the headman (or headmen, if there are several groups in the stream) and forward them to the interested people (the students).

The task is set, so let's proceed.

We will need:

  1. the vk_api library for working with the VK API
  2. the peewee ORM for working with the database
  3. built-in Python modules

Before reading, I also suggest brushing up on the Observer (habr, wiki) and Facade (habr, wiki) patterns.

Part 1. "Nice to meet you, comrade bot."

First we need to teach our bot to see itself as a community. Create a class called Group. It takes a session object and a database proxy object as arguments.

class Group(BaseCommunicateVK):
    def __init__(self, vksession, storage):
        super().__init__(vksession)
        self.storage = storage  # attribute name assumed; the left-hand side is missing from the source

BaseCommunicateVK? What's in there?

This functionality is moved into a separate class because some of you may later decide to extend the bot with other VKontakte features.
And, naturally, to keep the community abstraction from getting overloaded.

import logging

import vk_api
from vk_api.longpoll import VkLongPoll


class BaseCommunicateVK:
    longpoll = None  # one long-poll connection shared by all instances

    def __init__(self, vksession):
        self.session = vksession
        self.api = vksession.get_api()
        if BaseCommunicateVK.longpoll is None:
            BaseCommunicateVK.longpoll = VkLongPoll(self.session)

    def get_api(self):
        return self.api

    def get_longpoll(self):
        return self.longpoll

    def method(self, func, args):
        # Raw calls go through the session: VkApi.method(name, values)
        return self.session.method(func, args)

    @staticmethod
    def create_session(token=None, login=None, password=None, api_v='5.85'):
        try:
            if token:
                session = vk_api.VkApi(token=token, api_version=api_v)
            elif login and password:
                session = vk_api.VkApi(login, password, api_version=api_v)
            else:
                raise vk_api.AuthError("Define login and password or token.")
            return session
        except vk_api.ApiError as error:
            logging.error(error)  # assumed: the handler body is missing from the source

    def get_last_message(self, user_id):
        return self.api.messages.getHistory(
            peer_id=user_id, count=1)["items"][0]

    @staticmethod
    def get_attachments(last_message):
        if not last_message or "attachments" not in last_message:
            return ""  # assumed, by analogy with get_forwards
        attachments = last_message["attachments"]
        attach_strings = []
        for attach in attachments:
            attach_type = attach["type"]
            attach_info = attach[attach_type]
            attach_id = attach_info["id"]
            attach_owner_id = attach_info["owner_id"]
            if "access_key" in attach_info:
                access_key = attach_info["access_key"]
                attach_string = "{}{}_{}_{}".format(attach_type, attach_owner_id, attach_id, access_key)
            else:
                attach_string = "{}{}_{}".format(attach_type, attach_owner_id, attach_id)
            attach_strings.append(attach_string)
        return ",".join(attach_strings)  # assumed: VK expects a comma-separated list

    @staticmethod
    def get_forwards(attachments, last_message):
        if not attachments or "fwd_count" not in attachments:
            return ""
        if len(last_message["fwd_messages"]) == int(attachments["fwd_count"]):
            return last_message["id"]

    def send(self, user_id, message, attachments=None, **kwargs):
        send_to = int(user_id)
        last_message = kwargs.get("last_message")
        # assumed: the lines that define destroy / destroy_type are missing from the source
        destroy = kwargs.get("destroy", False)
        destroy_type = kwargs.get("destroy_type", 1)
        p_attachments = self.get_attachments(last_message)
        p_forward = self.get_forwards(attachments, last_message)
        if message or p_attachments or p_forward:
            self.api.messages.send(
                user_id=send_to, message=message,
                # assumed: the remaining arguments are missing from the source
                attachment=p_attachments, forward_messages=p_forward)
        if destroy:
            accept_msg_id = self.api.messages \
                .getHistory(peer_id=user_id, count=1) \
                ["items"][0]["id"]  # assumed: the continuation line is missing from the source
            self.delete(accept_msg_id, destroy_type=destroy_type)

    def delete(self, msg_id, destroy_type=1):
        # messages.delete takes the parameter message_ids
        self.api.messages.delete(message_ids=msg_id, delete_for_all=destroy_type)
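To make the attachment handling above easier to follow, here is a minimal standalone sketch of how the attachment identifiers are assembled. It does not touch the VK API at all; the function name attachment_strings and the sample message are made up for illustration, and only the "<type><owner_id>_<id>[_<access_key>]" layout is taken from the code above.

```python
def attachment_strings(message):
    """Build a comma-separated list of VK-style attachment identifiers."""
    parts = []
    for attach in message.get("attachments", []):
        kind = attach["type"]          # e.g. "photo" or "doc"
        info = attach[kind]            # the payload lives under the type name
        ident = "{}{}_{}".format(kind, info["owner_id"], info["id"])
        if "access_key" in info:       # private media carry an access key
            ident += "_" + info["access_key"]
        parts.append(ident)
    return ",".join(parts)


# Hypothetical message with one private photo and one community document
sample = {
    "attachments": [
        {"type": "photo", "photo": {"id": 2, "owner_id": 1, "access_key": "abc"}},
        {"type": "doc", "doc": {"id": 7, "owner_id": -5}},
    ]
}
print(attachment_strings(sample))
```

A message without attachments simply produces an empty string, which is why the send method can pass the result along without extra checks.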

Now create a method that updates the list of community members. It immediately splits them into administrators and regular members and saves them to the database.

  • self.api is configured when creating the base class Group (BaseCommunicateVK)

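def update_members(self):  # name assumed: the def line is missing from the source
    fields = 'domain, sex'
    admins = self.api.groups.getMembers(group_id=self.group_id, fields=fields, filter='managers')
    ...  # lines missing from the source (presumably configuring and saving the admins)
    members = self.api.groups.getMembers(group_id=self.group_id, fields=fields)
    ...  # lines missing from the source (presumably configuring and saving the members)
    return self

def save_members(self, members):
    ...  # body missing from the source

@staticmethod
def _configure_users(items, exclude=None):
    if exclude is None:
        exclude = []
    users = []
    for user in items.get('items'):
        if user.get('id') not in exclude:
            member = User()
            ...  # field assignments missing from the source
            users.append(member)
    return users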
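Here is a rough sketch of the "admins first, everyone else second" split, without the database or the real API. The response dictionaries only imitate the {"items": [...]} shape that groups.getMembers returns; the function configure_users and the plain-dict users are stand-ins for the article's _configure_users and User model, not the real implementation.

```python
def configure_users(response, exclude=None):
    """Turn an API-like response into user records, skipping excluded ids."""
    excluded = set(exclude or [])
    return [
        {"id": item["id"], "domain": item.get("domain"), "sex": item.get("sex")}
        for item in response.get("items", [])
        if item["id"] not in excluded
    ]


# Imitation of two responses: managers only, then all members
managers = {"items": [{"id": 1, "domain": "headman", "sex": 2}]}
everyone = {"items": [
    {"id": 1, "domain": "headman", "sex": 2},
    {"id": 2, "domain": "student_a", "sex": 1},
    {"id": 3, "domain": "student_b", "sex": 2},
]}

admins = configure_users(managers)
# Exclude the admins so nobody is stored twice
members = configure_users(everyone, exclude=[user["id"] for user in admins])
print([u["id"] for u in admins], [u["id"] for u in members])
```

The exclude parameter is what keeps the two sets disjoint: the full member list also contains the managers, so they have to be filtered out on the second pass.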

This class should also be able to send messages to recipients, so here comes the next method. Its parameters are the list of recipients, the message text, and the attachments. The whole thing runs in a separate thread so that the bot can keep receiving messages from other members while the broadcast is in progress.
Incoming messages are processed synchronously, so the response time will obviously get worse as the number of active users grows.

def broadcast(self, uids, message, attachments=None, **kwargs):
    report = BroadcastReport()

    def send_all():
        users_ids = uids
        if not isinstance(users_ids, list):
            users_ids = list(users_ids)
        report.should_be_sent = len(users_ids)
        for user_id in users_ids:
            try:
                self.send(user_id, message, attachments, **kwargs)
                if message or attachments:
                    report.sent += 1
            except vk_api.VkApiError as error:
                report.errors.append('{}: {}'.format(user_id, error))
            except ValueError:
                continue
        # When the broadcast is done, send the report to the admins and moderators
        for uid in self.get_member_ids(admins=True, moders=True):
            self.send(uid, str(report))

    broadcast_thread = Thread(target=send_all)
    broadcast_thread.start()  # assumed: the line that starts the thread is missing from the source

BroadcastReport is the report class:

class BroadcastReport:
    def __init__(self):
        self.should_be_sent = 0
        self.sent = 0
        self.errors = []

    def __str__(self):
        res = "# Отчет #"  # "# Report #"
        res += "\nПлан: {} сообщений ".format(self.should_be_sent)  # "Planned: {} messages"
        res += "\nРазослано: {} ".format(self.sent)  # "Sent: {}"
        if self.errors:
            res += "\nОшибки:"  # "Errors:"
            for i in self.errors:
                res += "\n- {}".format(i)
        return res
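The broadcast-in-a-thread idea can be tried out without VKontakte at all. Below is a small sketch under obvious assumptions: fake_send stands in for the real send method and fails for one user on purpose, RuntimeError stands in for vk_api.VkApiError, and the function returns the thread so that the caller can wait for it (the article's method does not do that).

```python
from threading import Thread


class Report:
    def __init__(self):
        self.should_be_sent = 0
        self.sent = 0
        self.errors = []


def broadcast(send, user_ids, message):
    """Send message to every user in a background thread and collect a report."""
    report = Report()

    def send_all():
        ids = list(user_ids)
        report.should_be_sent = len(ids)
        for uid in ids:
            try:
                send(uid, message)
                report.sent += 1
            except RuntimeError as error:  # stand-in for vk_api.VkApiError
                report.errors.append("{}: {}".format(uid, error))

    worker = Thread(target=send_all)
    worker.start()
    return worker, report


def fake_send(uid, message):
    # Hypothetical sender: user 2 has closed their private messages
    if uid == 2:
        raise RuntimeError("messages are closed")


worker, report = broadcast(fake_send, [1, 2, 3], "Lecture moved to 10:00")
worker.join()  # wait for the background thread before reading the report
print(report.should_be_sent, report.sent, report.errors)
```

One failed recipient does not stop the loop: the error lands in the report and the remaining users still get their message.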

That seems to be it for the group abstraction. We have met all the community members; now we need to learn to understand them.

Part 2. "Pshh... come in.."

Let's make the bot listen to all messages from the members of our community.
To do this, create a ChatHandler class that will take care of it. Its parameters:

  • group_manager is an instance of the community class we just wrote.
  • command_observer recognizes the connected commands (more on that in Part 3)

class ChatHandler(Handler):
    def __init__(self, group_manager, command_observer):
        self.longpoll = group_manager.get_longpoll()
        self.group = group_manager  # attribute name assumed; the left-hand side is missing from the source
        self.api = group_manager.get_api()
        self.command_observer = command_observer

Next we actually listen for messages from users and recognize the commands.

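def listen(self):  # name assumed: the def line is missing from the source
    try:
        for event in self.longpoll.listen():
            if event.user_id and event.type == VkEventType.MESSAGE_NEW and event.to_me:
                self.handle(event.user_id, event.text, event.attachments, message_id=event.message_id)
    except ConnectionError:
        logging.error("I HAVE BEEN DOWNED AT {}".format(
            ...  # argument missing from the source
        ))

def handle(self, user_id, message, attachments, **kwargs):
    member = ...  # the member lookup by user_id is missing from the source
    # the fourth argument is assumed to be the group stored in __init__
    self.command_observer.execute(member, message, attachments, self.group, **kwargs)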
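The filtering condition in the listening loop is the important part, so here is a tiny sketch of it with fake events instead of a real long-poll connection. Event, MESSAGE_NEW and dispatch are stand-ins invented for this example; only the condition itself (a known sender, a new message, addressed to the bot) mirrors the code above.

```python
from collections import namedtuple

# Stand-in for the long-poll event object; only the fields the filter needs
Event = namedtuple("Event", "type user_id text to_me")
MESSAGE_NEW = "message_new"  # stand-in for VkEventType.MESSAGE_NEW


def dispatch(events, handle):
    """Pass only new incoming messages from real users to the handler."""
    for event in events:
        if event.user_id and event.type == MESSAGE_NEW and event.to_me:
            handle(event.user_id, event.text)


received = []
events = [
    Event(MESSAGE_NEW, 10, ".help", True),         # incoming message: handled
    Event(MESSAGE_NEW, 10, "bot's reply", False),  # the bot's own outgoing message: skipped
    Event("user_online", 11, "", False),           # not a message at all: skipped
]
dispatch(events, lambda uid, text: received.append((uid, text)))
print(received)
```

Without the to_me check the bot would react to its own replies, which is an easy way to get an endless conversation with itself.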

Part 3. "What did you write about my ..?"

Command recognition is handled by a separate subsystem built on the Observer pattern.
Meet CommandObserver:

class CommandObserver(AbstractObserver):
    def execute(self, member, message, attachments, group, **kwargs):
        for command in self.commands:
            for trigger in command.triggers:
                body = command.get_body(trigger, message)
                if body is not None:
                    # user_id=member.id is assumed: the argument is missing from the source
                    group.api.messages.setActivity(user_id=member.id, type="typing")
                    if command.system:
                        # system commands also get the list of all connected commands
                        kwargs.update({"trigger": trigger, "commands": self.commands})
                    else:
                        kwargs.update({"trigger": trigger})
                    return command.proceed(member, body, attachments, group, **kwargs)


As for AbstractObserver: again, it is split out with possible future extension in mind.

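from abc import ABC, abstractmethod


class AbstractObserver(ABC):  # base class assumed: the class line is missing from the source
    def __init__(self):
        self.commands = []

    def add(self, *args):
        for arg in args:
            self.commands.append(arg)  # assumed: the loop body is missing from the source

    @abstractmethod
    def execute(self, *args, **kwargs):
        pass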
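If the observer idea feels abstract, here is a stripped-down sketch of it that runs on its own. PrefixCommand and FirstMatchObserver are invented for this example and are much simpler than the article's classes: a command is just a trigger plus a canned reply, and the observer returns the reply of the first command whose trigger starts the message.

```python
from abc import ABC, abstractmethod


class AbstractObserver(ABC):
    def __init__(self):
        self.commands = []

    def add(self, *commands):
        self.commands.extend(commands)

    @abstractmethod
    def execute(self, *args, **kwargs):
        """Subclasses decide how a message is matched against commands."""


class PrefixCommand:
    # Hypothetical command: a trigger and a fixed reply
    def __init__(self, trigger, reply):
        self.trigger = trigger
        self.reply = reply


class FirstMatchObserver(AbstractObserver):
    def execute(self, message):
        for command in self.commands:
            if message.startswith(command.trigger):
                return command.reply
        return None  # no command recognized


observer = FirstMatchObserver()
observer.add(PrefixCommand(".h", "help text"), PrefixCommand(".mb", "broadcast started"))
print(observer.execute(".mb Exam on Friday"))
print(observer.execute("just chatting"))
```

The base class knows nothing about matching rules, so a different strategy (for example, running every matching command instead of the first one) only needs another subclass.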

But what will this observer recognize?
So we have reached the most interesting part: the command.
Each command is an independent class derived from the base Command class.
All that is required of a command is to run its proceed() method if its keyword is found at the beginning of the user's message. Command keywords are defined in the triggers variable of the command class (a string or a list of strings).

import re


class Command:
    def __init__(self):
        self.triggers = []
        self.description = "Empty description."
        self.system = False
        self.privilege = False
        self.activate_times = []
        self.activate_days = set()
        self.autostart_func = self.proceed

    def proceed(self, member, message, attachments, group, **kwargs):
        raise NotImplementedError()

    @staticmethod
    def get_body(kw, message):
        if not isinstance(kw, list):
            kw = [kw, ]
        for i in kw:
            # Optional spaces, then the keyword with its first character escaped
            reg = '^ *(\\{}) *'.format(i)
            if re.search(reg, message):
                # Strip the keyword and return the rest of the message
                return re.sub(reg, '', message).strip(' ')
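To play with the trigger matching separately, here is a standalone variation of get_body. It is not the author's exact function: this sketch escapes the whole trigger with re.escape and additionally requires a space or the end of the message after the trigger, so that ".h" does not swallow the beginning of ".help". Both changes are my assumptions about what is convenient, not something the article prescribes.

```python
import re


def get_body(triggers, message):
    """Return the message without its leading trigger, or None if no trigger matches."""
    if not isinstance(triggers, list):
        triggers = [triggers]
    for trigger in triggers:
        # optional spaces, the escaped trigger, then spaces or end of message
        pattern = r"^ *{}(?: +|$)".format(re.escape(trigger))
        if re.search(pattern, message):
            return re.sub(pattern, "", message).strip(" ")
    return None


print(repr(get_body(".mb", "  .mb  Exam on Friday ")))
print(repr(get_body([".h", ".help"], ".help")))
print(repr(get_body(".mb", "hello .mb")))
```

Note the difference between an empty string and None: an empty body means the command was called without arguments, while None means the message is not a command at all.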

As the signature of the proceed() method shows, each command receives a reference to the group member instance, the member's message (with the keyword already stripped), the attachments, and a reference to the group instance. In other words, all interaction with the group member rests on the command's shoulders. I think this is the right decision, because it makes it possible to build a shell on top of it for more interactivity.
(To be honest, that would require either making the processing asynchronous, since it is currently synchronous, or handling each incoming message in a new thread, which is not worth it.)

Examples of command implementation:

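class BroadcastCommand(Command):  # class and __init__ lines assumed: they are missing from the source
    def __init__(self):
        super().__init__()
        self.triggers = ['.mb']
        self.privilege = True
        self.description = "Рассылка сообщения всем участникам сообщества."  # "Broadcast a message to all community members."

    def proceed(self, member, message, attachments, group, **kwargs):
        # member.id is assumed wherever the source lost the expression
        if member.id not in group.get_member_ids(admins=True, editors=True):
            group.send(member.id, "You cannot do this ^_^")
            return  # assumed: stop here for non-privileged users
        last_message = group.get_last_message(member.id)
        group.broadcast(group.get_member_ids(), message, attachments, last_message=last_message, **kwargs)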

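class HelpCommand(Command):  # class and __init__ lines assumed: they are missing from the source
    def __init__(self):
        super().__init__()
        self.commands = []
        self.triggers = ['.h', '.help']
        self.system = True
        self.description = "Показ этого сообщения."  # "Show this message."

    def proceed(self, member, message, attachments, group, **kwargs):
        commands = kwargs["commands"]
        help = "Реализованы следующие команды:\n\n"  # "The following commands are implemented:"
        admins = group.get_member_ids(admins=True, moders=True)
        i = 0
        for command in commands:
            # member.id is assumed wherever the source lost the expression
            if command.privilege and member.id not in admins:
                continue  # assumed: privileged commands are hidden from regular members
            help += "{}) {}\n\n".format(i + 1,
                                        ...)  # second argument missing from the source
            i += 1
        group.send(member.id, help)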

Part 4. "We are one big team."

Now all these modules and handlers need to be put together and configured.
One more class, please!
Let's create a facade that will configure our bot.

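class VKManage:
    def __init__(self, token=None, login=None, password=None):
        # The attribute names self.storage, self.group and self.chat are assumed:
        # the left-hand sides are missing from the source.
        self.session = BaseCommunicateVK.create_session(token, login, password, api_version)
        self.storage = DBProxy(DatabaseORM)
        self.group = Group(self.session, self.storage)
        self.chat = ChatHandler(self.group, CommandObserver.get_observer())

    def get_command(self, command_name):
        return {
            "рассылка участникам": BroadcastCommand(),    # "broadcast to members"
            "рассылка админам": AdminBroadcastCommand(),  # "broadcast to admins"
            "помощь": HelpCommand(),                      # "help"
            "учет прогулов": SkippedLectionsCommand(),    # "absence tracking"
            # "timetable"; the setup_account arguments are missing from the source
            "расписание": TopicTimetableCommand().setup_account(...),
        }.get(command_name)

    def connect_command(self, command_name):
        command = self.get_command(str(command_name).lower())
        if command:
            ...  # the call that registers the command is missing from the source
        return self

    def connect_commands(self, command_names):
        for i in command_names.split(','):
            self.connect_command(i.strip())
        return self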
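The facade's command lookup and its chainable methods can be shown in isolation. In the sketch below everything is hypothetical: BotFacade, Help and Broadcast are placeholder names, and the registry maps names to classes rather than to ready instances, so only the requested commands are created (the article's dictionary instantiates all of them on every lookup).

```python
class Help:
    pass


class Broadcast:
    pass


class BotFacade:
    def __init__(self, registry):
        self.registry = registry  # command name -> command class
        self.connected = []

    def connect_command(self, name):
        factory = self.registry.get(str(name).lower())
        if factory:               # unknown names are silently ignored
            self.connected.append(factory())
        return self               # returning self allows call chaining

    def connect_commands(self, names):
        for name in names.split(","):
            self.connect_command(name.strip())
        return self


bot = BotFacade({"help": Help, "broadcast": Broadcast}) \
    .connect_commands("Help, broadcast, unknown")
print([type(command).__name__ for command in bot.connected])
```

Because every method returns self, the whole configuration fits into one chained expression, which is exactly what the launch script relies on.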

The last stage is the launch. It is always the nastiest one, because any surprise can pop up. Not this time.

  • ConfigParser is imported from core.settings.ConfigParser. It simply reads the config.
  • project_path is imported from the settings module in the project root.

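if __name__ == '__main__':
    config = ConfigParser(project_path)
    (
        VKManage(token=config['token'], login=config['login'], password=config['password'])
        # "help, broadcast to members, broadcast to admins, absence tracking"
        .connect_commands("помощь, рассылка участникам, рассылка админам, учет прогулов")
        # the final chained call that starts the bot is missing from the source
    )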

This seems to be all.

So far this program has been useful to at least three groups, and I hope it will be useful to you too.

You can deploy for free on Heroku, but that's another story.

