Centralized collection of Windows event logs, without installing the agent, followed by visualization using ELK

    The task of centralized processing of logs is quite simply formulated and arises when it is required to monitor the operation of a large number of servers. I think it’s not worth mentioning that you can get a lot of information about the vital functions and well-being of systems from the logs. The fact that writing and reading logs is just as important as being able to write programs.

    Accordingly, for the implementation of such a system, the administrator is tasked: firstly, how to collect these logs, and secondly, how it is convenient and centralized to work with them. Thanks to the well-developed ELK bunch (Elasticsearch + Logstash + Kibana), which has been described on Habré more than once, the administrator has tools for convenient search and display of all information present in the logs. Therefore, the answer to the second problem is available initially, and it remains only to solve the problem of collecting logs.

    Since in my case the requirement for the system was the absence of a client on the servers, and the fact that the logs needed to be pulled from Windows servers, we selected powershell, the native one for Windows, as a collection tool.
    Based on this, the following model for collecting and displaying information from logs was compiled: logs are remotely collected from servers with a powershell script, and then they are added as files to the storage, then ELK (Elasticsearch + Logstash + Kibana) processes and displays them.

    An example of the operation of the entire bundle is presented in the image:

    Anticipating criticism, I will describe that this system does not set the task of collecting logs in real time, the goal is only statistics, which are collected over a certain period of time, and then displayed on the dashboard. It is used to come in the morning, see how the servers behaved at night, and compare with the results obtained, say, last week. In the proposed system, logs are collected once per hour, respectively, the lag between the current logs and what is displayed (to be precise, displayed and displayed on request) on the dashboard can be about an hour.

    There are currently two ways to get logs from a remote computer in powershell:

    • native powershell command: Get-EventLog -ComputerName $ computer –LogName System
    • getting logs through a WMI request: Get-WmiObject -Class win32_NTLogEvent -filter "logfile = 'System'" -ComputerName $ computer

    But in the first case, the file with the event logs of the system is completely uploaded and only then they are processed on the computer where the script is executed. As a result, such requests are processed unreasonably long. The ability to select logs (for example, only in the last day) does not work out very well here, since the whole file is initially pulled out and only then some work is being done with it.

    In the second case, a WMI request is sent to the server, processing occurs on the server side and the key point here is that it becomes possible to limit the interval of logs of interest to us already at the request stage (in the example below, the interval is set to 1 hour). Since this command fulfills much faster than the first one, and the query execution time directly depends on the requested log interval, the choice fell on Get-WmiObject.

    In the script below, there are several non-obvious and complex points:
    First, the logic for limiting the time interval for selecting logs when logs are required in the last hour, but in an hour not from the moment of request, but in the last full hour, i.e. starting at 00 min. and ending 59 min.
    The second point is that the time in WMI format is different from the usual format, therefore, conversion to WMI format of time and vice versa is constantly required.

    # импортируем список серверов из Active Directory (для этого в powershell должен быть дополнительно установлен модуль для Active Directory)
    import-module activedirectory
    $computers = Get-ADComputer -SearchBase "OU=Servers,DC=domain,DC=ru" -Filter * | ForEach-Object {$_.Name} | Sort-Object
    # определяем директорию для логирования 
    $logdir = "\\storage\Logs\ServersLog\" + $(Get-Date -UFormat "%Y_%m")
    # если директория отсутствует, то создаем 
    if((Test-Path $logdir) -eq 0) {
    	New-Item -ItemType directory $logdir -Force
    # указываем данные пользователя под которым будут выполнятся команды
    $domain = "domain"
    $username = "username" 
    $password = 'password'
    $account = "$domain"+"\"+$($username)
    $accountpwd = ConvertTo-SecureString $password -AsPlainText -Force
    $credential = New-Object System.Management.Automation.PsCredential($account, $accountpwd)
    # для того, чтобы делать выгрузку за предыдущий час, нужно ограничить время за которое лог был сформирован следующим образом: верхний предел - минус час, нижний предел - начало текущего часа.
    # получается примерно следующее:
    # BiginDate = 08/26/2014 12:00:00
    # EndDate = 08/26/2014 13:00:00
    # в результате будет выгружен лог созданый в пределах от BiginDate = 08/26/2014 12:00:00 до EndDate = 08/26/2014 13:00:00
    $date = Get-Date
    Write-Host "Date = $date"
    $m = $date.Minute
    $s = $date.Second
    $begindate = (($date.AddSeconds(-$s)).AddMinutes(-$m)).addHours(-1)
    Write-Host "BiginDate = $begindate"
    $enddate = ($date.AddSeconds(-$s)).AddMinutes(-$m)
    Write-Host "EndDate = $enddate"
    # перевод времени в формат WMI
    Write-Host "WMIBiginDate = $wmibegindate"
    Write-Host "WMIEndDate = $wmienddate"
    $logjournals = "System", "Application", "Security"
    foreach ($computer in $computers) {
    	Write-Host "Processing computer: $computer"
    	foreach ($logjournal in $logjournals) {
    		Write-Host "Processing log: $logjournal"
    		$systemlog = Get-WmiObject -Class win32_NTLogEvent -filter "logfile = '$logjournal' AND (TimeWritten>='$wmibegindate') AND (TimeWritten<'$wmienddate')" -computerName $computer -Credential $credential -ErrorAction SilentlyContinue
            foreach ($logstring in $systemlog) {
    			$wmitime = $logstring.TimeGenerated
    			$time = [System.Management.ManagementDateTimeconverter]::ToDateTime("$wmitime")
    			#Write-Host $logtime
    			$level = $logstring.Type
    			#Write-Host "$level"
    			$journal = $logstring.LogFile
    			#Write-Host "$journal"
    			$category = $logstring.CategoryString
    			#Write-Host "$category"
    			$source = $logstring.SourceName
    			#Write-Host "$source"
    			$message = $logstring.Message
    			#Write-Host "$message"
    			$code = $logstring.EventCode
    			#Write-Host "$code"
                @{Server="$computer";Time="$time";Level="$level";Journal="$journal";Category="$category";Source="$source";Message="$message";Code="$code"} | ConvertTo-Json -depth 10 -Compress | Out-File "$logdir\$computer-$logjournal.json" -Encoding utf8 -Append

    Upon completion of the script, the output produces files of the form: ComputerName-JournalName.json.
    The json format is somewhat inconsistent with the standard (there are no opening and closing brackets), but the Logstash parser normally digests and processes it. Three files are created for each server: ComputerName-System.json ComputerName-Application.json ComputerName-Security.json Since the files have the same format, their processing is ideal.

    You can limit the collection of logs to a specific log by simply editing the line: $ logjournals = "System", "Application", "Security"

    Next Logstash comes into action with the following configuration:

    input {
      file {
        type => "ServersLogs"
        discover_interval => 1800
        path => [ "//storage/Logs/ServersLog/*/*.json" ]
        codec => "json"
    filter {
      date {
        type => "ServersLogs"
        match => [ "Time", "MM/dd/YYYY HH:mm:ss" ]
        locale => "en"
        target => "Logtimestamp"
      mutate {
        gsub => [ "Level", "[ -]", "_" ]
        gsub => [ "Source", "[ -]", "_" ]
        gsub => [ "Server", "[ -]", "_" ]
        remove_field => ["message"]
        remove_field => ["host"] 
    output {
      elasticsearch {
        embedded => false
        host => "logserver"
        protocol => "http"
        cluster => "windowseventlogs"
        codec => "plain"
        index => "windowseventlogs-%{+YYYY.MM.dd}"

    Data is entered into Elasticsearch, from where it is subsequently displayed using Kibana.

    As a result, information is displayed on the screen (in my case for the last 2 days): about the most problematic servers, about the most problematic services; a graph is drawn, according to which you can immediately see an increase in the number of logs or errors at a certain point in time. You can always search by error text or username, or sort by level or error id.

    Also popular now: