Collecting Mikrotik firewall logs into a database

Good day.

I want to show how quickly and easily you can set up a server that collects network traffic metadata from Mikrotik routers.

Purpose: to store pre-processed (“chewed”) firewall logs in a database for further analysis.

Means: Any recent Linux distribution with rsyslogd v8 or higher will do; the syntax shown here may also work on v7. We also need a DBMS; I chose MariaDB. Database growth depends on the number of logged rules, so the disk size is up to you. In my case 30-40 rules are logged, which gives about 1,200 thousand rows per day. After a month of use the database, including indexes, had grown to 3.8 GB.

Mechanics: The router sends its log to the remote server over UDP. The rsyslog server uses regular expressions to strip unneeded information from each line, builds an SQL INSERT, and sends it to the DBMS. A BEFORE INSERT trigger in the DBMS then does the remaining cleanup and splits the fields that could not be parsed in rsyslog.

Configuring rsyslog

Edit the file /etc/rsyslog.conf and add the following lines:

module(load="imudp")
module(load="ommysql")
input(type="imudp" port="514")

This loads the necessary modules and opens UDP port 514.

The log line from Mikrotik looks like this:

20180927155341  BLOCKSMKNETS forward: in:ether6 - LocalTORF out:VLAN55 - RT_INET, src-mac 00:15:17:31:b8:d7, proto TCP (SYN),>, len 60

As you can see, the line carries a lot that is not worth storing in the database, and querying it as one string would be awkward.
Ideally, I would like to store data like this:

20180927155341 ether6 VLAN55 2457 65535 00:15:17:31:b8:d7 TCP SYN forward BLOCKSMKNETS 60

I could not get such a line with rsyslog alone. rsyslog regular expressions are POSIX ERE/BRE, so features such as lookahead and lookbehind are not available.

There is a tool for debugging these regular expressions; with it you may manage to separate the port from the address, and the interface name from the in: and out: prefixes. Just keep in mind that for some protocols sport and dport are absent.

In the end, my output looked like this:

20180927155341 in:ether6 out:VLAN55 00:15:17:31:b8:d7 TCP (SYN) forward BLOCKSMKNETS 60

The rsyslog documentation describes how to write these regular expressions.

In its final form, the configuration file for receiving logs from Mikrotik, /etc/rsyslog.d/20-remote.conf, looks like this:

$template tpl_traflog,"insert into traflog.traffic (datetime, inif, outif, src, dst, smac, proto, flags, chain, logpref, len) values ('%timereported:::date-mysql%', '%msg:R,ERE,0,DFLT,0:in:[a-zA-Z]+[0-9]+|in:<[a-zA-Z]+-[a-zA-Z]+>--end%', '%msg:R,ERE,0,BLANK,0:out:[a-zA-Z]+[0-9]+|out:<[a-zA-Z]+-[a-zA-Z]+>--end%', '%msg:R,ERE,0,DFLT,0:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end%', '%msg:R,ERE,0,DFLT,1:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end%', '%msg:R,ERE,0,BLANK:([0-f]+:){5}[0-f]+--end%', '%msg:R,ERE,0,BLANK:\b[A-X]{3,4}\b--end%', '%msg:R,ERE,0,BLANK:\([A-Z]+\)|\(([A-Z]+\,){1,3}[A-Z]+\)--end%', '%msg:R,ERE,0,DFLT:[a-x]+--end%', '%msg:F,32:2%', '%msg:R,ERE,0,DFLT:[0-9]+$--end%' )",SQL
if ($fromhost-ip == '') and ($syslogtag contains "firewall") then {action(type="ommysql" server="localhost" serverport="3306" db="traflog" uid="rsyslogger" pwd="rsyslogger" template="tpl_traflog") stop}

The first line defines the template: the SQL statement that will be passed to the database.
The second line is the condition under which the action, that is, the write to the DBMS, takes place.
The condition reads as follows: if the log comes from the router's address ( if ($fromhost-ip == '') ) and the syslog tag contains "firewall" ( and ($syslogtag contains "firewall") ), then use the ommysql module with the given connection parameters ( then {action(type="ommysql" server="localhost" serverport="3306" db="traflog" uid="rsyslogger" pwd="..." ) together with the tpl_traflog template ( template="tpl_traflog") ), and afterwards stop further processing of the line ( stop} ).
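
To get a feel for what the property replacer expressions in the template extract, the same patterns can be tried outside rsyslog. The following is only a minimal Python sketch and not part of the setup: the log message is hypothetical (its addresses come from documentation ranges), nth_match is an illustrative helper that imitates "take the n-th match, otherwise a default", and Python's re module is not POSIX ERE, although these simple patterns should behave alike. The MAC pattern is written here with an explicit [0-9a-f] class.

```python
import re

# Hypothetical firewall message; the addresses are documentation ranges,
# not values from the article.
MSG = ("BLOCKSMKNETS forward: in:ether6 out:VLAN55, "
       "src-mac 00:15:17:31:b8:d7, proto TCP (SYN), "
       "192.0.2.10:2457->198.51.100.7:65535, len 60")


def nth_match(pattern, text, n=0, default=""):
    """Return the n-th (0-based) whole match of pattern, or default if absent."""
    matches = [m.group(0) for m in re.finditer(pattern, text)]
    return matches[n] if n < len(matches) else default


# Address with an optional ":port" suffix, as in the template.
ADDR = r"([0-9]+\.){3}[0-9]+[:]?([0-9]+)?"

fields = {
    "inif": nth_match(r"in:[a-zA-Z]+[0-9]+", MSG),
    "outif": nth_match(r"out:[a-zA-Z]+[0-9]+", MSG),
    "src": nth_match(ADDR, MSG, 0),    # first address in the line
    "dst": nth_match(ADDR, MSG, 1),    # second address in the line
    "smac": nth_match(r"([0-9a-f]+:){5}[0-9a-f]+", MSG),
    "proto": nth_match(r"\b[A-X]{3,4}\b", MSG),
    "flags": nth_match(r"\([A-Z]+\)", MSG),
    "len": nth_match(r"[0-9]+$", MSG),
}

for name, value in fields.items():
    print(f"{name:6} {value}")
```

Note that src and dst still carry their ports and the interface names still carry the in: and out: prefixes; this is exactly the part that is left to the database trigger.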

Something may well go wrong in your case, for example because of interface names or log prefixes, or for some other reason. To debug, comment out the second line and add a new template and two new conditions:

$template tpl_traflog_test,"%timereported:::date-mysql% %msg:R,ERE,0,DFLT,0:in:[a-zA-Z]+[0-9]+|in:<[a-zA-Z]+-[a-zA-Z]+>--end% %msg:R,ERE,0,BLANK,0:out:[a-zA-Z]+[0-9]+|out:<[a-zA-Z]+-[a-zA-Z]+>--end% %msg:R,ERE,0,DFLT,0:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end% %msg:R,ERE,0,DFLT,1:([0-9]+\.){3}[0-9]+[:]?([0-9]+)?--end% %msg:R,ERE,0,BLANK:([0-f]+:){5}[0-f]+--end% %msg:R,ERE,0,BLANK:\b[A-X]{3,4}\b--end% %msg:R,ERE,0,BLANK:\([A-Z]+\)|\(([A-Z]+\,){1,3}[A-Z]+\)--end% %msg:R,ERE,0,DFLT:[a-x]+--end% %msg:F,32:2% %msg:R,ERE,0,DFLT:[0-9]+$--end%\n"
if ($fromhost-ip == '') then {action(type="omfile" file="/var/log/remote/" )}
if ($fromhost-ip == '') then {action(type="omfile" file="/var/log/remote/" template="tpl_traflog_test" ) stop}

Restart the logger.

The tpl_traflog_test template is the same as tpl_traflog, but without the SQL INSERT.

The first condition writes the unprocessed %msg% line to the /var/log/remote/ file, because no template is specified.

The second condition writes the processed line to the same file, which makes the two easy to compare.

Next, prepare the database.

Preparing the database

Installing and configuring the DBMS is omitted; everything is standard there.

Start the mysql console and run the following code:

-- create the database
create database traflog character set utf8 collate utf8_bin;
use traflog;
-- create the table
create table traffic (id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
datetime DATETIME,
inif VARCHAR(20),
outif VARCHAR(20),
src VARCHAR(21),
sport INT(5),
dst VARCHAR(21),
dport INT(5),
smac VARCHAR(17),
proto VARCHAR(4),
flags VARCHAR(11),
logpref VARCHAR(24),
-- the original listing breaks off here; chain and len are required by the
-- rsyslog template, but their types below are an assumption
chain VARCHAR(20),
len INT(5));
-- create the user
create user rsyslogger@localhost identified by '...';
grant all privileges on traflog.* to rsyslogger@localhost;

The table is ready, and so is the user.

Now add a trigger. It does what the logger could not: it separates the address from the port, cleans up the interface names, and removes the brackets from the flags:

-- add a trigger for traffic
delimiter //
create TRIGGER delim_ip_port_flags BEFORE insert ON traffic
FOR EACH ROW
begin
set NEW.inif = REGEXP_REPLACE ((NEW.inif), 'in:', '' );
set NEW.outif = REGEXP_REPLACE ((NEW.outif), 'out:', '' );
set NEW.sport = REGEXP_REPLACE ((NEW.src), '([0-9]+\\.){3}[0-9]+:|([0-9]+\\.){3}[0-9]+', '' );
set NEW.src = REGEXP_REPLACE ((NEW.src), ':[0-9]+', '' );
set NEW.dport = REGEXP_REPLACE ((NEW.dst), '([0-9]+\\.){3}[0-9]+:|([0-9]+\\.){3}[0-9]+', '' );
set NEW.dst = REGEXP_REPLACE ((NEW.dst), ':[0-9]+', '' );
set NEW.flags = REGEXP_REPLACE ((NEW.flags), '\\(|\\)', '' );
end //
delimiter ;

REGEXP_REPLACE searches its first argument for the regular expression given as the second argument and replaces every match with the third argument. In our case the third argument is an empty string, so whatever matches is simply removed.
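
The splitting done by the trigger can be checked outside the database as well. Below is a minimal Python sketch that mirrors the same replacements with re.sub; split_fields and the sample addresses (documentation ranges) are illustrative assumptions, not part of the original setup.

```python
import re

# Matches "a.b.c.d:" or a bare "a.b.c.d"; removing it leaves only the port.
ADDR_PART = r"([0-9]+\.){3}[0-9]+:|([0-9]+\.){3}[0-9]+"


def split_fields(inif, outif, src, dst, flags):
    """Mirror the trigger: strip prefixes, split address and port, drop brackets."""
    return {
        "inif": re.sub(r"in:", "", inif),
        "outif": re.sub(r"out:", "", outif),
        "sport": re.sub(ADDR_PART, "", src),   # what remains is the port
        "src": re.sub(r":[0-9]+", "", src),    # what remains is the address
        "dport": re.sub(ADDR_PART, "", dst),
        "dst": re.sub(r":[0-9]+", "", dst),
        "flags": re.sub(r"\(|\)", "", flags),
    }


row = split_fields("in:ether6", "out:VLAN55",
                   "192.0.2.10:2457", "198.51.100.7:65535", "(SYN)")
print(row)

# A protocol without ports leaves the port fields empty.
no_ports = split_fields("in:ether6", "out:VLAN55",
                        "192.0.2.10", "198.51.100.7", "")
print(no_ports["sport"] == "", no_ports["src"])
```

In the trigger the order of the statements matters: sport is computed from NEW.src before NEW.src itself is overwritten, and the same holds for dport and NEW.dst. The sketch avoids the issue by always reading the original arguments.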

Let's make a test insert, the same way the logger will:

-- insert a test row
insert into traffic (datetime, inif, outif, src, dst, smac, proto, flags, chain, logpref)
values (20180730075437, 'in:ether6', 'out:VLAN55', '', '', '00:15:17:31:b8:d7', 'TCP', '(SYN)', 'forward', 'BLOCKSMKNETS');

Let's see what happened:

select * from traffic;

If everything is correct, move on. If not, look for the error.

Add at least one index. I am no expert at designing indexes, but as I understand it, in MySQL it is better to have composite indexes on different column sets for different queries, since a single query can use only one index (or am I wrong?). If you know better, do as you see fit.
I often query by a specific prefix, so I added this index:
-- add an index
create index traffic_index on traffic (datetime, logpref, src);

Done.

Now start sending logs from the router: add the remote log server settings and a logging action for it, enable the log option on one of the firewall rules, and set a log prefix of no more than 24 characters.

In the Mikrotik console it looks like this:

/system logging action
set 3 remote= src-address=
add name=remote2 remote= syslog-facility=local6 target=remote
/system logging
add action=remote topics=error,account,critical,event,info
add action=remote2 topics=firewall
/ip firewall filter
add action=drop chain=input comment="drop ssh brute forcers" dst-port=22,8291 log=yes log-prefix=DROP_SSH_BRUTE protocol=tcp src-address-list=ssh_blacklist

Here src-address is the address of the router, the remote value of remote2 is the address of the log server for the firewall logs, and the remote value in set 3 is another log server that receives my Mikrotik system logs; we don't need it now. The setting we care about is remote2.

Next, watch what lands in the file:

tail -f /var/log/remote/

Lines from the router should start streaming into the file, provided of course that your rule fires often enough.

If some fields are missing, that is, the sequence datetime, inif, outif, src, dst, smac, proto, flags, chain, logpref, len is not respected, try replacing BLANK with DFLT in the debugging template. Then, instead of an empty field, a placeholder string will appear (I no longer remember exactly which one). If that happens, something is wrong with the regular expression and it needs to be corrected.

If everything went as it should, disable the test conditions and the test template.

Also, the default config in /etc/rsyslog.d/ has to be ordered after ours; I renamed it to 50-default.conf so that remote logs do not end up in the system log /var/log/messages.
Restart the logger.

Let's wait a bit until the database fills up. Then we can start querying.

A few example queries:

To see the size of the database and the number of rows:
MariaDB [traflog]> select table_schema as "database", round(sum(data_length + index_length)/1024/1024,2) as "size Mb", TABLE_ROWS as "count rows" from information_schema.tables group by table_schema;
| database           | size Mb | count rows |
| information_schema |    0.17 |       NULL |
| traflog            | 3793.39 |   21839553 |
2 rows in set (0.48 sec)

In a month it grew to almost 4 GB, but this depends on the number and properties of the logged firewall rules.
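
These two figures give a rough idea of the storage cost per logged packet. The small Python sketch below just does the arithmetic: it works out to roughly 182 bytes per row including indexes. It assumes that "size Mb" is in units of 1024*1024 bytes, as in the query, and it should be kept in mind that TABLE_ROWS may be only an estimate; estimated_size_mb and the 50 million row figure are purely illustrative.

```python
# Figures taken from the information_schema output for traflog.
size_mb = 3793.39       # data + indexes, in units of 1024*1024 bytes
row_count = 21839553    # rows reported for traflog

bytes_per_row = size_mb * 1024 * 1024 / row_count


def estimated_size_mb(rows, per_row=bytes_per_row):
    """Rough size estimate for a given number of rows at the same density."""
    return rows * per_row / 1024 / 1024


print(f"{bytes_per_row:.0f} bytes per row")
# Hypothetical target volume, only to show how the estimate is used.
print(f"{estimated_size_mb(50_000_000):.0f} MB for 50 million rows")
```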

Number of logged prefixes
The number of logged prefixes is not equal to the number of rules, because some rules share a prefix. Still, how many prefixes are there in total, and how many hits has each of them collected?

MariaDB [traflog]> select logpref, count(logpref) from traffic group by logpref order by count(logpref) desc;
| logpref              | count(logpref) |
| ACCEPT_TORF_INET     |       14582602 |
| ACCEPT_SMK_PPP       |        1085791 |
| DROP_FORWARD_INVALID |         982374 |
| REJECT_BNK01         |         961503 |
| ACCEPT_MMAX_TORF     |         802455 |
| ACCEPT_TORF_PPP      |         736803 |
| SMTP_DNAT            |         689533 |
| ACCEPT_SMK_INET      |         451411 |
| ACCEPT_INET_TORF     |         389857 |
| BLOCK_SMKNETS        |         335424 |
| DROP_SMTP_BRUTE      |         285850 |
| ACCEPT_ROZN_TORF     |         154811 |
| ACCEPT_TORF_MMAX     |         148393 |
| DROP_ETHALL_ETHALL   |          80679 |
| ACCEPT_SMTP          |          48921 |
| DROP_SMTP_DDOS       |          32190 |
| RDP_DNAT             |          28757 |
| ACCEPT_TORF_ROZN     |          18456 |
| SIP_DNAT             |          15494 |
| 1CWEB_DNAT           |           6406 |
| BLOCKSMKNETS         |           5789 |
| DROP_SSH_BRUTE       |           3162 |
| POP_DNAT             |           1997 |
| DROP_RDP_BRUTE       |            442 |
| DROP_BNK01           |            291 |
| DROPALL              |            138 |
| ACCEPT_RTP_FORWARD   |             90 |
| REJECT_SMTP_BRUTE    |             72 |
| L2TP_INPUT_ACCEPT    |             33 |
29 rows in set (2 min 51.03 sec)

ACCEPT_TORF_INET is in the lead. With this prefix you can find everyone who went out to the internet from our local network; protocols and ports are recorded, and the time will come when access is closed for certain people. This is baseline data for future corrective work.

The leader in poking at SMTP
Let's see who tried to get at the SMTP server today:

MariaDB [traflog]> select src, count(dport) from traffic where logpref='SMTP_DNAT' and datetime > '2018101600000000' group by src order by count(dport) desc limit 10;
| src            | count(dport) |
|  |        12440 |
|  |         4556 |
|  |         4537 |
| |         3119 |
|  |          226 |
|  |          216 |
| |          211 |
| |           40 |
|  |           32 |
| |           21 |
10 rows in set, 1 warning (21.36 sec)

So today's winner is clear. Let's see which other logged rules this host appeared in:

MariaDB [traflog]> select src, dport, count(dport), logpref from traffic where src='' group by logpref order by count(dport) desc;
| src           | dport | count(dport) | logpref         |
| |    25 |       226989 | SMTP_DNAT       |
| |    25 |       170714 | DROP_SMTP_BRUTE |
| |    25 |         2907 | DROP_SMTP_DDOS  |
| |    25 |         2061 | ACCEPT_SMTP     |
4 rows in set (10 min 44.21 sec)

This one specializes in SMTP only: about 1% of its hits got through, as attempts to guess a password or to send some junk, and the rest were dropped.

The query took 10 minutes, which is a lot. The current indexes do not suit it, or the query could be reformulated, but we won't go into that now.

In the future I plan to add a web interface with query forms.
The direction is set; I hope this article will be useful.

Thanks to all!


documentation Documentation for mysql
Documentation for Mikrotik logging

Thanks to the LOR community for hints

Added the flags field to the database, now you can track the connection duration by catching SYN, FIN.
Fixed some bugs in rsyslog regulars, as well as mysql triggers.

Curiously, the default defconf rule "drop invalid" drops all the final packets of a TCP connection, so every host that tries to close a connection fails and sends several FINs. Is that correct?

I added a rule allowing TCP passage with ACK, FIN flags.

Below is an SQL procedure, connections_list(), that shows the duration of TCP connections seen in the last five minutes:
delimiter //
CREATE PROCEDURE connections_list()
BEGIN
-- done, logid, datesyn, datefin, the NOT FOUND handler and the DROP statements are
-- missing from the original listing; they are reconstructed, types are an assumption
DECLARE done INT DEFAULT FALSE;
DECLARE logid BIGINT UNSIGNED;
DECLARE datesyn DATETIME;
DECLARE datefin DATETIME;
DECLARE conntime TIME;
DECLARE connsport INT;
DECLARE conndport INT;
DECLARE connsrc VARCHAR(21);
DECLARE conndst VARCHAR(21);
DECLARE cur CURSOR FOR SELECT id,datetime,src,sport,dst,dport FROM conn_syn_fin WHERE flags='SYN';
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
DROP TEMPORARY TABLE IF EXISTS connless;
DROP TEMPORARY TABLE IF EXISTS conn_syn_fin;
CREATE temporary TABLE connless(datestart DATETIME,dateend DATETIME,duration TIME,src VARCHAR(21),sport INT,dst VARCHAR(21),dport INT);
CREATE temporary TABLE conn_syn_fin (SELECT * from traffic WHERE datetime > now() - interval 5 minute and
 src in (select src from traffic where datetime > now() - interval 5 minute and logpref='TCP_FIN' and flags like '%FIN%') and (flags like '%SYN%' or flags like '%FIN%') order by id);
OPEN cur;
read_loop: LOOP
        FETCH cur INTO logid,datesyn,connsrc,connsport,conndst,conndport;
        IF done THEN
                LEAVE read_loop;
        END IF;
        set datefin=(SELECT datetime FROM conn_syn_fin WHERE id>logid and src=connsrc and sport=connsport and flags like '%FIN%' and dst=conndst and dport=conndport limit 1);
        set conntime=(SELECT timediff(datefin,datesyn));
        INSERT INTO connless (datestart,dateend,duration,src,sport,dst,dport) values (datesyn,datefin,conntime,connsrc,connsport,conndst,conndport);
END LOOP;
CLOSE cur;
select * from connless;
END //
delimiter ;

The procedure creates two temporary tables.
The conn_syn_fin table contains the log entries with SYN and FIN flags; a cursor then iterates over this table.
The connless table lists both open and completed connections: completed ones have a duration, open ones accordingly do not.
Note that the sample window is the last five minutes before the current time. My query is slow: the cursor loop processes only about 10 entries per second. I tried to speed it up in every way I could think of, but the execution time stays about the same.
Also note that this procedure is for demonstration only. If you need a sample for a specific src / sport / dst / dport, it is better to write a separate procedure along the same lines. If you know SQL well, you can probably write a better query.
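
For readers who find cursor code hard to follow, the pairing logic can be sketched in a few lines of Python. This is not the procedure itself, only an in-memory illustration of the same idea under stated assumptions: rows are already sorted by id, flags have had their brackets removed by the trigger, and the sample rows (documentation addresses) are made up.

```python
from datetime import datetime


def pair_connections(rows):
    """Pair each SYN row with the first later FIN row of the same 4-tuple.

    rows must be sorted by id; each row is a dict with the keys
    id, datetime, src, sport, dst, dport, flags.
    """
    connections = []
    for i, syn in enumerate(rows):
        if syn["flags"] != "SYN":
            continue
        key = (syn["src"], syn["sport"], syn["dst"], syn["dport"])
        fin_time = None
        for later in rows[i + 1:]:
            later_key = (later["src"], later["sport"], later["dst"], later["dport"])
            if later_key == key and "FIN" in later["flags"]:
                fin_time = later["datetime"]
                break
        # Connections that are still open have no end time and no duration.
        duration = fin_time - syn["datetime"] if fin_time else None
        connections.append({
            "datestart": syn["datetime"], "dateend": fin_time, "duration": duration,
            "src": key[0], "sport": key[1], "dst": key[2], "dport": key[3],
        })
    return connections


# Hypothetical sample rows: one closed connection and one still open.
rows = [
    {"id": 1, "datetime": datetime(2019, 3, 20, 14, 12, 19), "src": "192.0.2.10",
     "sport": 41868, "dst": "198.51.100.7", "dport": 443, "flags": "SYN"},
    {"id": 2, "datetime": datetime(2019, 3, 20, 14, 12, 25), "src": "192.0.2.10",
     "sport": 49311, "dst": "198.51.100.7", "dport": 443, "flags": "SYN"},
    {"id": 3, "datetime": datetime(2019, 3, 20, 14, 13, 14), "src": "192.0.2.10",
     "sport": 41868, "dst": "198.51.100.7", "dport": 443, "flags": "ACK,FIN"},
]

result = pair_connections(rows)
for conn in result:
    print(conn["datestart"], conn["dateend"], conn["duration"], conn["sport"])
```

Like the cursor loop, this scans forward from every SYN, so the work grows quickly with the number of rows in the window.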

Example call and output:

MariaDB [traflog]> call connections_list();
| datestart           | dateend             | duration | src           | sport | dst             | dport |
| 2019-03-20 14:12:19 | 2019-03-20 14:13:14 | 00:00:55 |  | 41868 |  |   443 |
| 2019-03-20 14:12:25 | NULL                | NULL     |  | 49311 |     |   443 |
| 2019-03-20 14:12:31 | 2019-03-20 14:12:51 | 00:00:20 | | 54433 |   |   443 |
| 2019-03-20 14:12:31 | 2019-03-20 14:12:51 | 00:00:20 | | 54434 |   |   443 |
| 2019-03-20 14:12:32 | NULL                | NULL     | | 37977 |   |   443 |
| 2019-03-20 14:17:12 | NULL                | NULL     | | 39331 |  |   443 |
| 2019-03-20 14:17:13 | NULL                | NULL     |  | 63388 |  |   443 |
399 rows in set (33.17 sec)
Query OK, 0 rows affected (33.18 sec)

After the procedure finishes, the temporary tables conn_syn_fin and connless remain, so you can look at them in more detail if something seems suspicious or unreliable. On the next run the old tables are dropped and new ones are created. Write if you find an error.
