Logstash
„logstash“ ist ein Tool zur Verwaltung von Ereignissen und Protokollen.
Man kann es verwenden, um Protokolle zu sammeln, sie zu analysieren und zur späteren Verwendung (z.B. zum Suchen) aufzubewahren.
Installation
apt-get install default-jre
- Über Installationspakete für Ubuntu
wget https://download.elastic.co/logstash/logstash/packages/debian/logstash_<VERSION>.deb dpkg -i logstash_<VERSION>.deb
Boot Startscript für manuelle Installation
Wenn man nicht über das „deb“ Paket installiert hat, muss mann sich eine upstart-job
Konfigurationsdatei anlegen, damit logstash beim booten gestartet wird.
vi /etc/init/logstash-agent.conf
Inhalt:
# logstash - agent instance # description "logstash agent" start on virtual-filesystems stop on runlevel [06] # Respawn it if the process exits respawn # We're setting high here, we'll re-limit below. limit nofile 65550 65550 setuid logstash setgid logstash # You need to chdir somewhere writable because logstash needs to unpack a few # temporary files on startup. console log script # Defaults PATH=/bin:/usr/bin LS_HOME=/var/lib/logstash LS_HEAP_SIZE="500m" LS_JAVA_OPTS="-Djava.io.tmpdir=${LS_HOME}" LS_LOG_FILE=/var/log/logstash/logstash.log LS_USE_GC_LOGGING="" LS_CONF_DIR=/etc/logstash/conf.d LS_OPEN_FILES=16384 LS_NICE=19 LS_OPTS="" # Override our defaults with user defaults: [ -f /etc/default/logstash ] && . /etc/default/logstash HOME="${HOME:-$LS_HOME}" JAVA_OPTS="${LS_JAVA_OPTS}" # Reset filehandle limit ulimit -n ${LS_OPEN_FILES} cd "${LS_HOME}" # Export variables export PATH HOME JAVA_OPTS LS_HEAP_SIZE LS_JAVA_OPTS LS_USE_GC_LOGGING test -n "${JAVACMD}" && export JAVACMD exec nice -n ${LS_NICE} /opt/logstash/bin/logstash agent -f "${LS_CONF_DIR}" -l "${LS_LOG_FILE}" ${LS_OPTS} end script
Das Script wird dann noch nach /etc/init.d
und in die Runlevel, zum autom. Start beim Booten, verlinkt.
ln -s /lib/init/upstart-job /etc/init.d/logstash-agent update-rc.d logstash-agent defaults
Jetzt noch einige Verzeichnisse erstellen und die dazugehörigen Benutzer rechte setzen.
Service-Benutzer „logstash“, als Systembenutzer, ohne Anmeldeshell und ohne Home-Dir, anlegen.
useradd -r -s /bin/false -M logstash
Verzeichnisse anlegen
mkdir -p /etc/logstash/conf.d /var/log/logstash
Der logstash-Benutzer ist Besitzer folgender Verzeichnisse.
chown -R logstash.adm /var/log/logstash/ chown -R logstash.adm /opt/logstash
Startparameter
Hier die Startparameter für Logstash (Quelle: http://logstash.net/docs/1.4.0/flags)
Parameter | Beschreibung |
---|---|
-f, –config CONFIGFILE | Load the logstash config from a specific file, directory, or a wildcard. If given a directory or wildcard, config files will be read from the directory in alphabetical order. |
-e CONFIGSTRING | Use the given string as the configuration data. Same syntax as the config file. If not input is specified, 'stdin { type ⇒ stdin }' is default. If no output is specified, 'stdout { debug ⇒ true }}' is default. |
-w, –filterworkers COUNT | Run COUNT filter workers (default: 1) |
–watchdog-timeout TIMEOUT | Set watchdog timeout value in seconds. Default is 10. |
-l, –log FILE | Log to a given path. Default is to log to stdout |
–verbose | Increase verbosity to the first level, less verbose. |
–debug | Increase verbosity to the last level, more verbose. |
-v | *DEPRECATED: see –verbose/debug* Increase verbosity. There are multiple levels of verbosity available with '-vv' currently being the highest |
–pluginpath PLUGIN_PATH | A colon-delimted path to find other logstash plugins in Web |
-a, –address ADDRESS | Address on which to start webserver. Default is 0.0.0.0. |
-p, –port PORT | Port on which to start webserver. Default is 9292 |
Konfiguration
Hier eine kleine initiale Konfigurationsdatei (/etc/logstash/logstash.yml
)
path: data: /var/lib/logstash config: /etc/logstash/conf.d/*.conf logs: /var/log/logstash # Options for log.level: # * fatal # * error # * warn # * info (default) # * debug # * trace # # log.level: info config: reload: automatic: true interval: 3s
Nach der Installation müssen nun noch drei Konfigurationsdateien erstellt werden.
touch /etc/logstash/conf.d/input.conf touch /etc/logstash/conf.d/filter.conf touch /etc/logstash/conf.d/output.conf
Hier eine Konfiguration wo Eingaben, die man in der Konsole (die gleiche wo man logstash gestartet hat) eingibt,
aufgefangen und verarbeitet.
Inhalt input.conf
:
input { stdin { type => "stdin-type" } }
Inhalt filter.conf
Da hier erstmal Logstash nur getestet wird, kommt hier erstmal so nicht an Inhalt hinein.
Inhalt output.conf
output { stdout { codec => "json" workers => 2 } }
Starten und prüfen
Testweise kann logstash nun wie folgt gestartet werden:
- Version 6.1:
/usr/share/logstash/bin/logstash --config.test_and_exit --path.settings /etc/logstash
- Version 1.4: gibt es den Parameter
–configtest
, welcher Logstash nicht startet sondern nur die Konfiguration testet.
Ohne den Parameter kann Logstash nach erfolgreichem Test gestartet werden.
/usr/share/logstash/bin/logstash --path.settings /etc/logstash
Nun kann man einfach in der Konsole, in der man logstash selbst gestartet hat (einige Sekunden nach dem Start warten), z.b. „test“ eingeben.
Es sollte etwa folgendes Ergebnis ausgegeben werden:
{"@source":"stdin://linsv01/","@tags":[],"@fields":{},"@timestamp":"2013-01-07T10:08:10.728Z","@source_host":"linsv01","@source_path":"/","@message":"test","@type":"stdin-type"}
Zum starten als Service gibt man folgendes ein:
initctl start logstash-agent
Reload der Konfiguration
Ab Version 2.3 kann logstash seine Konfigurationsdateien überwachen und bei Änderung diese neu laden.
Dazu startet man logstash mit diesem Parameter –config.reload.automatic
logstash –f <KONFIGURATIONSDATEI> --config.reload.automatic
Standardmäßig schaut logstash alle 3 Sekunden nach Konfigurationsänderungen. Man kann dem o.g. Parameter ein Interval hinzufügen, wenn dies geändert werden soll.
Hier noch ein Beispiel aus der logstash.yml
config: reload: automatic: true interval: 30s
Wenn logstash bereits läuft und der o.g. Parameter nicht gesetzt ist, kann man ihn mit dem kill SIGHUP (signal hangup) zum reloaden zwingen.
kill -1 <PID>
oder
kill -SIGHUP <PID>
Debugging
Um Nun zu sehen was logstash tatsächlich gerade macht, kann man das Logging höher schalten.
Dazu ändert man in der Datei /etc/logstash/log4j2.properties
den Eintrag „status=error“ auf „status=debug“.
Dannach startet man Logstash auf der Konsole mit
/usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash" "--log.level=debug"
Das Log befindet sich dann unter /var/log/logstash/logstash-plain.log
Wenn man z.B. nur das Output Plugin für Elasticsearch debuggen möchte, kann man die log4j2.properties wie folgt ergänzen:
logger.elasticsearchoutput.name = logstash.outputs.elasticsearch logger.elasticsearchoutput.level = debug
Wenn die Eigenschaft „status:“ weiterhin auf „error“ stehen bleibt, werden nur die Elasticsearch Meldungen mit Debug versehen.
Alles auf Konsole
Hiermit startet man Logstash mit den Minimalangaben im Verbose-Mode. Alle Eingaben die mit Enter bestätigt werden, werden übergeben.
/usr/share/logstash/bin/logstash -e 'input { stdin { } } output { elasticsearch { hosts => localhost } stdout { codec => rubydebug } }' --[verbose|debug]
Auf der Elasticsearch Seite schaut man im Webbrowser auf die Adresse 'http://localhost:9200/_search?pretty
' oder setzt den curl Befehl ab:
curl 'http://localhost:9200/_search?pretty'
Konfigurationsbeispiele
Hier sind einige Beispiele wie die Konfiguration von logstash mit div. In- und Outputs sowie Filtern aussehen kann.
Um Filter mit grok
nutzen zu können, benötigt man Patterns.
Bei dieser Installation findet man eine Liste möglicher Patterns in dem Verzeichniss /opt/grok/patterns/base
oder Online unter Github
Beim Grok Debugger kann man die einzelnen Patterns mit einer Syslog-Meldung testen.
Inputs
input { tcp { port => 10514 type => "syslog_ssg_auth" } udp { port => 10514 type => "syslog_ssg_auth" } file { codec => plain { charset => "UTF-8" } path => "<PFAD_ZU_DATEI>" sincedb_path => "/opt/logstash/sincedb" start_position => "end" type => "<BEZEICHNUNG>" } beats { host => "<IP_ADRESSE>" port => "5044" } }
Filter
filter { if [fields][device] == "<BEZEICHNUNG_DES_TYPE_DES_INPUTS>" { grok { match => { "message" => "<HIER_NUR_EIN_BEISPIEL_FÜR_JUNIPER_FIREWALLS>%{SYSLOGTIMESTAMP:logTimestamp} %{HOST:host} %{WORD}: %{WORD} %{WORD}=%{HOST} \[Root\]%{WORD}-%{WORD}-%{INT:MSG_ID}%{DATA}: %{GREEDYDATA:message}" } overwrite => ["message"] } date { match => [ "logTimestamp", "yyy-MM-dd HH:mm:ss", "dd/MMM/YYYY:HH:mm:ss Z" ] timezone => "Europe/Berlin" target => "@timestamp" add_field => { "debug" => "timestampMatched" } } } if [fields][device] == "<BEZEICHNUNG_DES_TYPE_DES_INPUTS>" { grok { match => { "message" => "<HIER_NUR_EIN_BEISPIEL_FÜR_TRENDMICRO_IWSVA>%{SYSLOGTIMESTAMP} %{HOST:host} %{WORD}: \<%{DAY}\, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME}\,%{WORD}\> \[%{WORD:filterprog}\|%{WORD:loglevel}\] %{GREEDYDATA:message}" } overwrite => ["message"] } date { # match for TIMESTAMP_ISO8601 match => [ "logTimestamp", "yyy-MM-dd HH:mm:ssZ" ] timezone => "Europe/Berlin" target => "@timestamp" add_field => { "debug" => "timestampMatched" } } } if [fields][device] == "<BEZEICHNUNG_DES_TYPE_DES_INPUTS>" { grok { match => { "message" => "<HIER_NUR_EIN_BEISPIEL_FÜR_HP_PROCURVE_SWITCHES>%{SYSLOGTIMESTAMP:logTimestamp} %{IPORHOST:host} %{WORD}: %{GREEDYDATA:message}" } overwrite => ["message"] } grok { match => { "path" => "%{YEAR:log_year}" } } mutate { # adds Year in Timestamp replace => [ "logTimestamp", "%{log_year} %{logTimestamp}" ] remove_field => "log_year" } date { # # match for mutated SYSLOGTIMESTAMP match => [ "%{log_year} logTimestamp", "yyy MMM dd HH:mm:ss", "yyy MMM d HH:mm:ss" ] timezone => "Europe/Berlin" target => "@timestamp" add_field => { "debug" => "timestampMatched" } } } if [fields][device] == "<BEZEICHNUNG_DES_TYPE_DES_INPUTS>" { grok { match => { "message" => "%{SQUIDMKHLOG2}" } overwrite => ["message"] } date { match => [ "logTimestamp", "dd/MMM/YYYY:HH:mm:ss Z" ] timezone => "Europe/Berlin" target => "@timestamp" add_field => { "debug" => "timestampMatched"} } } }
Datum und @timestamp
Sinnvoll ist es das Datum der Nachricht und nicht das Eingangsdatum von Logstash als Timestamp in Elasticsearch zu verwenden. So werden die Indizes mit dem Datum angelegt, welches in der Nachricht verwendet wird. Wenn man z.B. alte Daten importieren möchte, dann ist das unumgänglich.
Oben sieht man in den Filter Beispielen einige Möglichkeiten da zu realisieren.
Bei Logs mit einem Timestamp im Sinne von Syslog (MMM dd HH:mm:ss), kann man mit Logstash diesen Timestamp so nicht als Timestamp im Elasticsearch verwenden, da hier das Jahr fehlt.
Dafür kann man z.B. den Dateinamen der Logdatei mit einem Datum oder nur dem Jahr in der diese Datei erstellt wurde versehen und diese Info dann im Logstash verwenden.
Das ist hier in dem Filter-Beispiel für <HP_PROCURVE_SWITCHES> dargestellt.
Evtl. muss z.B. dem Syslogserver mitgeteilt werden, dass dieser die Datei mit einem Jahr/Monat/Tag versieht.
Beispiele findet man hier in der Wiki unter DynamicFileName.
Eine andere Methode wäre die, vom Syslog-Server empfangene Nachricht, anzupassen in dem man dieser ein anderes Datums-Format verpasst.
Ein Beispiel hier für findet man auch hier in der Wiki unter MessageFormat.
Unter mapping-date-format und date-format-pattern findet man Infos zum Datumsformat in Elasticsearch.
Ein kleines Script für das zusammenstellen von komprimierten Dateien, welche vom Logrotation-Daemon erstellt wurden, findet man hier.
Outputs
output { elasticsearch_http { host => "localhost" # index => "logstash-%{+YYYY.MM.dd}" # user => "some Username" # password => "some_Password" # port => "9200" <-- this ist default workers => 2 } }
Upgradeverfahren
Logstash
systemctl stop logstash.service dpkg -i ...deb /usr/share/logstash/bin/logstash --config.test_and_exit --path.settings /etc/logstash systemctl start logstash.service
Troubleshooting
Ab Version >=6.3.0 wird das Feld „host“ vom Filebeat selbst gefüllt und produziert dadurch ein mappingconflict mit dem Feld „host“ im Index.
Hier die Breaking Changes in 6.3, dies ist ein Auszug:
Starting with version 6.3, Beats provides an `add_host_metadata` processor for adding fields, such as `host.name` and `host.id`, to Beats events. These fields are grouped under a `host` prefix and conform to the https://github.com/elastic/ecs[Elastic Common Schema (ECS)]. The `host` object is defined in the Elasticsearch index template even if the processor is not used. We've also added a `host.name` field to all events sent by Beats. This field prevents the Beats input plugin in Logstash from adding a default `host` field. (By default, the plugin adds a `host` field if the event doesn't already have one. We don't want the plugin to add this field because it causes a mapping conflict with the `host` object defined in the index template.)
Daher muss man in die Filter folgendes eintragen:
mutate { remove_field => [ "[host]" ] } mutate { add_field => { "host" => "%{[beat][hostname]}" } }