Inhaltsverzeichnis

Logstash

„logstash“ ist ein Tool zur Verwaltung von Ereignissen und Protokollen.
Man kann es verwenden, um Protokolle zu sammeln, sie zu analysieren und zur späteren Verwendung (z.B. zum Suchen) aufzubewahren.

Installation

apt-get install default-jre

Boot Startscript für manuelle Installation

Wenn man nicht über das „deb“ Paket installiert hat, muss mann sich eine upstart-job Konfigurationsdatei anlegen, damit logstash beim booten gestartet wird.

vi /etc/init/logstash-agent.conf

Inhalt:

# logstash - agent instance
#

description     "logstash agent"

start on virtual-filesystems
stop on runlevel [06]

# Respawn it if the process exits
respawn

# We're setting high here, we'll re-limit below.
limit nofile 65550 65550

setuid logstash
setgid logstash

# You need to chdir somewhere writable because logstash needs to unpack a few
# temporary files on startup.
console log
script
  # Defaults
  PATH=/bin:/usr/bin
  LS_HOME=/var/lib/logstash
  LS_HEAP_SIZE="500m"
  LS_JAVA_OPTS="-Djava.io.tmpdir=${LS_HOME}"
  LS_LOG_FILE=/var/log/logstash/logstash.log
  LS_USE_GC_LOGGING=""
  LS_CONF_DIR=/etc/logstash/conf.d
  LS_OPEN_FILES=16384
  LS_NICE=19
  LS_OPTS=""

  # Override our defaults with user defaults:
  [ -f /etc/default/logstash ] && . /etc/default/logstash

  HOME="${HOME:-$LS_HOME}"
  JAVA_OPTS="${LS_JAVA_OPTS}"
  # Reset filehandle limit
  ulimit -n ${LS_OPEN_FILES}
  cd "${LS_HOME}"

  # Export variables
  export PATH HOME JAVA_OPTS LS_HEAP_SIZE LS_JAVA_OPTS LS_USE_GC_LOGGING
  test -n "${JAVACMD}" && export JAVACMD

  exec nice -n ${LS_NICE} /opt/logstash/bin/logstash agent -f "${LS_CONF_DIR}" -l "${LS_LOG_FILE}" ${LS_OPTS}
end script

Das Script wird dann noch nach /etc/init.d und in die Runlevel, zum autom. Start beim Booten, verlinkt.

ln -s /lib/init/upstart-job /etc/init.d/logstash-agent
update-rc.d logstash-agent defaults

Jetzt noch einige Verzeichnisse erstellen und die dazugehörigen Benutzer rechte setzen.

Service-Benutzer „logstash“, als Systembenutzer, ohne Anmeldeshell und ohne Home-Dir, anlegen.

useradd -r -s /bin/false -M logstash

Verzeichnisse anlegen

mkdir -p /etc/logstash/conf.d /var/log/logstash

Der logstash-Benutzer ist Besitzer folgender Verzeichnisse.

chown -R logstash.adm /var/log/logstash/
chown -R logstash.adm /opt/logstash

Startparameter

Hier die Startparameter für Logstash (Quelle: http://logstash.net/docs/1.4.0/flags)

Parameter Beschreibung
-f, –config CONFIGFILE Load the logstash config from a specific file, directory, or a wildcard. If given a directory or wildcard, config files will be read from the directory in alphabetical order.
-e CONFIGSTRING Use the given string as the configuration data. Same syntax as the config file. If not input is specified, 'stdin { type ⇒ stdin }' is default. If no output is specified, 'stdout { debug ⇒ true }}' is default.
-w, –filterworkers COUNT Run COUNT filter workers (default: 1)
–watchdog-timeout TIMEOUT Set watchdog timeout value in seconds. Default is 10.
-l, –log FILE Log to a given path. Default is to log to stdout
–verbose Increase verbosity to the first level, less verbose.
–debug Increase verbosity to the last level, more verbose.
-v *DEPRECATED: see –verbose/debug* Increase verbosity. There are multiple levels of verbosity available with '-vv' currently being the highest
–pluginpath PLUGIN_PATH A colon-delimted path to find other logstash plugins in Web
-a, –address ADDRESS Address on which to start webserver. Default is 0.0.0.0.
-p, –port PORT Port on which to start webserver. Default is 9292

Konfiguration

Hier eine kleine initiale Konfigurationsdatei (/etc/logstash/logstash.yml)

path:
  data: /var/lib/logstash
  config: /etc/logstash/conf.d/*.conf
  logs: /var/log/logstash
 
# Options for log.level:
#   * fatal
#   * error
#   * warn
#   * info (default)
#   * debug
#   * trace
#
# log.level: info
 
config:
  reload:
    automatic: true
    interval: 3s

Nach der Installation müssen nun noch drei Konfigurationsdateien erstellt werden.

touch /etc/logstash/conf.d/input.conf
touch /etc/logstash/conf.d/filter.conf
touch /etc/logstash/conf.d/output.conf

Hier eine Konfiguration wo Eingaben, die man in der Konsole (die gleiche wo man logstash gestartet hat) eingibt,
aufgefangen und verarbeitet.

Inhalt input.conf:

input {
    stdin { 
        type => "stdin-type"
    }
}

Inhalt filter.conf

Da hier erstmal Logstash nur getestet wird, kommt hier erstmal so nicht an Inhalt hinein.

Inhalt output.conf

output {
    stdout {
        codec => "json"
        workers => 2
    }
}

Starten und prüfen

Testweise kann logstash nun wie folgt gestartet werden:

Ohne den Parameter kann Logstash nach erfolgreichem Test gestartet werden.

/usr/share/logstash/bin/logstash --path.settings /etc/logstash

Nun kann man einfach in der Konsole, in der man logstash selbst gestartet hat (einige Sekunden nach dem Start warten), z.b. „test“ eingeben.
Es sollte etwa folgendes Ergebnis ausgegeben werden:

{"@source":"stdin://linsv01/","@tags":[],"@fields":{},"@timestamp":"2013-01-07T10:08:10.728Z","@source_host":"linsv01","@source_path":"/","@message":"test","@type":"stdin-type"}

Zum starten als Service gibt man folgendes ein:

initctl start logstash-agent

Reload der Konfiguration

Ab Version 2.3 kann logstash seine Konfigurationsdateien überwachen und bei Änderung diese neu laden.
Dazu startet man logstash mit diesem Parameter –config.reload.automatic

logstash –f <KONFIGURATIONSDATEI> --config.reload.automatic

Standardmäßig schaut logstash alle 3 Sekunden nach Konfigurationsänderungen. Man kann dem o.g. Parameter ein Interval hinzufügen, wenn dies geändert werden soll.
Hier noch ein Beispiel aus der logstash.yml

config:
  reload:
    automatic: true
    interval: 30s

Wenn logstash bereits läuft und der o.g. Parameter nicht gesetzt ist, kann man ihn mit dem kill SIGHUP (signal hangup) zum reloaden zwingen.

kill -1 <PID>

oder

kill -SIGHUP <PID>

Debugging

Um Nun zu sehen was logstash tatsächlich gerade macht, kann man das Logging höher schalten.
Dazu ändert man in der Datei /etc/logstash/log4j2.properties den Eintrag „status=error“ auf „status=debug“.
Dannach startet man Logstash auf der Konsole mit

/usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash" "--log.level=debug"

Das Log befindet sich dann unter /var/log/logstash/logstash-plain.log

Wenn man z.B. nur das Output Plugin für Elasticsearch debuggen möchte, kann man die log4j2.properties wie folgt ergänzen:

logger.elasticsearchoutput.name = logstash.outputs.elasticsearch
logger.elasticsearchoutput.level = debug

Wenn die Eigenschaft „status:“ weiterhin auf „error“ stehen bleibt, werden nur die Elasticsearch Meldungen mit Debug versehen.

Alles auf Konsole

Hiermit startet man Logstash mit den Minimalangaben im Verbose-Mode. Alle Eingaben die mit Enter bestätigt werden, werden übergeben.

/usr/share/logstash/bin/logstash -e 'input { stdin { } } output { elasticsearch { hosts => localhost } stdout { codec => rubydebug } }' --[verbose|debug]

Auf der Elasticsearch Seite schaut man im Webbrowser auf die Adresse 'http://localhost:9200/_search?pretty' oder setzt den curl Befehl ab:

curl 'http://localhost:9200/_search?pretty'

Konfigurationsbeispiele

Hier sind einige Beispiele wie die Konfiguration von logstash mit div. In- und Outputs sowie Filtern aussehen kann.
Um Filter mit grok nutzen zu können, benötigt man Patterns.
Bei dieser Installation findet man eine Liste möglicher Patterns in dem Verzeichniss /opt/grok/patterns/base
oder Online unter Github

Beim Grok Debugger kann man die einzelnen Patterns mit einer Syslog-Meldung testen.

Inputs

input {
       tcp {
               port => 10514
               type => "syslog_ssg_auth"
       }
       udp {
               port => 10514
               type => "syslog_ssg_auth"
       }
 
       file {
               codec => plain {
                        charset => "UTF-8"
                        }
               path => "<PFAD_ZU_DATEI>"
               sincedb_path => "/opt/logstash/sincedb"
               start_position => "end"
               type => "<BEZEICHNUNG>"
       }
 
       beats {
               host => "<IP_ADRESSE>"
               port => "5044"
  }
}

Filter

filter {
        if [fields][device] == "<BEZEICHNUNG_DES_TYPE_DES_INPUTS>" {
                grok {
                        match => {  "message" => "<HIER_NUR_EIN_BEISPIEL_FÜR_JUNIPER_FIREWALLS>%{SYSLOGTIMESTAMP:logTimestamp} %{HOST:host} %{WORD}: %{WORD} %{WORD}=%{HOST}  \[Root\]%{WORD}-%{WORD}-%{INT:MSG_ID}%{DATA}: %{GREEDYDATA:message}" }
                        overwrite => ["message"]
                }
 
                date {
                        match => [ "logTimestamp", "yyy-MM-dd HH:mm:ss", "dd/MMM/YYYY:HH:mm:ss Z" ]
                        timezone => "Europe/Berlin"
                        target => "@timestamp"
                        add_field => { "debug" => "timestampMatched" }
                }
        }
 
        if [fields][device] == "<BEZEICHNUNG_DES_TYPE_DES_INPUTS>" {        
                grok {
                        match => { "message" => "<HIER_NUR_EIN_BEISPIEL_FÜR_TRENDMICRO_IWSVA>%{SYSLOGTIMESTAMP} %{HOST:host} %{WORD}: \<%{DAY}\, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME}\,%{WORD}\> \[%{WORD:filterprog}\|%{WORD:loglevel}\] %{GREEDYDATA:message}" }
                        overwrite => ["message"]
                }
 
                date {
                        # match for TIMESTAMP_ISO8601
                        match => [ "logTimestamp", "yyy-MM-dd HH:mm:ssZ" ]
                        timezone => "Europe/Berlin"
                        target => "@timestamp"
                        add_field => { "debug" => "timestampMatched" }
                }
        }
 
        if [fields][device] == "<BEZEICHNUNG_DES_TYPE_DES_INPUTS>" {        
                grok {
                        match => { "message" => "<HIER_NUR_EIN_BEISPIEL_FÜR_HP_PROCURVE_SWITCHES>%{SYSLOGTIMESTAMP:logTimestamp} %{IPORHOST:host} %{WORD}: %{GREEDYDATA:message}" }
                        overwrite => ["message"]
                }
 
                grok {
                        match => { "path" => "%{YEAR:log_year}" }
                }
 
                mutate {
                        # adds Year in Timestamp
                        replace => [ "logTimestamp", "%{log_year} %{logTimestamp}" ]
                        remove_field => "log_year"
                }
 
                date {
#                       # match for mutated SYSLOGTIMESTAMP
                        match => [ "%{log_year} logTimestamp", "yyy MMM dd HH:mm:ss", "yyy MMM  d HH:mm:ss" ]
                        timezone => "Europe/Berlin"
                        target => "@timestamp"
                        add_field => { "debug" => "timestampMatched" }
                }
 
 
        }
 
        if [fields][device] == "<BEZEICHNUNG_DES_TYPE_DES_INPUTS>" {
 
                grok {
                        match => {  "message" => "%{SQUIDMKHLOG2}" }
                        overwrite => ["message"]
                }
 
                date {
                        match => [ "logTimestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
                        timezone => "Europe/Berlin"
                        target => "@timestamp"
                        add_field => { "debug" => "timestampMatched"}
                }
        }
}

Datum und @timestamp

Sinnvoll ist es das Datum der Nachricht und nicht das Eingangsdatum von Logstash als Timestamp in Elasticsearch zu verwenden. So werden die Indizes mit dem Datum angelegt, welches in der Nachricht verwendet wird. Wenn man z.B. alte Daten importieren möchte, dann ist das unumgänglich.
Oben sieht man in den Filter Beispielen einige Möglichkeiten da zu realisieren.
Bei Logs mit einem Timestamp im Sinne von Syslog (MMM dd HH:mm:ss), kann man mit Logstash diesen Timestamp so nicht als Timestamp im Elasticsearch verwenden, da hier das Jahr fehlt.
Dafür kann man z.B. den Dateinamen der Logdatei mit einem Datum oder nur dem Jahr in der diese Datei erstellt wurde versehen und diese Info dann im Logstash verwenden.
Das ist hier in dem Filter-Beispiel für <HP_PROCURVE_SWITCHES> dargestellt.
Evtl. muss z.B. dem Syslogserver mitgeteilt werden, dass dieser die Datei mit einem Jahr/Monat/Tag versieht.
Beispiele findet man hier in der Wiki unter DynamicFileName.

Eine andere Methode wäre die, vom Syslog-Server empfangene Nachricht, anzupassen in dem man dieser ein anderes Datums-Format verpasst.
Ein Beispiel hier für findet man auch hier in der Wiki unter MessageFormat.

Unter mapping-date-format und date-format-pattern findet man Infos zum Datumsformat in Elasticsearch.

Ein kleines Script für das zusammenstellen von komprimierten Dateien, welche vom Logrotation-Daemon erstellt wurden, findet man hier.

Outputs

output {
        elasticsearch_http {
                host => "localhost"
        #       index => "logstash-%{+YYYY.MM.dd}"
        #       user => "some Username"
        #       password => "some_Password"
        #       port => "9200" <-- this ist default
                workers => 2
        }
}

Upgradeverfahren

Logstash

systemctl stop logstash.service
 
dpkg -i ...deb
 
/usr/share/logstash/bin/logstash --config.test_and_exit --path.settings /etc/logstash
 
systemctl start logstash.service

Troubleshooting

Ab Version >=6.3.0 wird das Feld „host“ vom Filebeat selbst gefüllt und produziert dadurch ein mappingconflict mit dem Feld „host“ im Index.
Hier die Breaking Changes in 6.3, dies ist ein Auszug:

Starting with version 6.3, Beats provides an `add_host_metadata` processor for
adding fields, such as `host.name` and `host.id`, to Beats events. These fields
are grouped under a `host` prefix and conform to the
https://github.com/elastic/ecs[Elastic Common Schema (ECS)]. The `host` object
is defined in the Elasticsearch index template even if the processor is not used.
We've also added a `host.name` field to all events sent by Beats. This field
prevents the Beats input plugin in Logstash from adding a default `host` field.
(By default, the plugin adds a `host` field if the event doesn't already have
one. We don't want the plugin to add this field because it causes a mapping
conflict with the `host` object defined in the index template.)

Daher muss man in die Filter folgendes eintragen:

mutate {
      remove_field => [ "[host]" ]
    }
    mutate {
      add_field => {
    	"host" => "%{[beat][hostname]}"
      }
    }