====== Logstash ====== "logstash" ist ein Tool zur Verwaltung von Ereignissen und Protokollen. Man kann es verwenden, um Protokolle zu sammeln, sie zu analysieren und zur späteren Verwendung (z.B. zum Suchen) aufzubewahren. ===== Installation ===== apt-get install default-jre * **Über Installationspakete für Ubuntu** \\ wget https://download.elastic.co/logstash/logstash/packages/debian/logstash_.deb dpkg -i logstash_.deb ==== Boot Startscript für manuelle Installation==== Wenn man nicht über das "deb" Paket installiert hat, muss mann sich eine ''upstart-job'' Konfigurationsdatei anlegen, damit logstash beim booten gestartet wird. vi /etc/init/logstash-agent.conf Inhalt: # logstash - agent instance # description "logstash agent" start on virtual-filesystems stop on runlevel [06] # Respawn it if the process exits respawn # We're setting high here, we'll re-limit below. limit nofile 65550 65550 setuid logstash setgid logstash # You need to chdir somewhere writable because logstash needs to unpack a few # temporary files on startup. console log script # Defaults PATH=/bin:/usr/bin LS_HOME=/var/lib/logstash LS_HEAP_SIZE="500m" LS_JAVA_OPTS="-Djava.io.tmpdir=${LS_HOME}" LS_LOG_FILE=/var/log/logstash/logstash.log LS_USE_GC_LOGGING="" LS_CONF_DIR=/etc/logstash/conf.d LS_OPEN_FILES=16384 LS_NICE=19 LS_OPTS="" # Override our defaults with user defaults: [ -f /etc/default/logstash ] && . /etc/default/logstash HOME="${HOME:-$LS_HOME}" JAVA_OPTS="${LS_JAVA_OPTS}" # Reset filehandle limit ulimit -n ${LS_OPEN_FILES} cd "${LS_HOME}" # Export variables export PATH HOME JAVA_OPTS LS_HEAP_SIZE LS_JAVA_OPTS LS_USE_GC_LOGGING test -n "${JAVACMD}" && export JAVACMD exec nice -n ${LS_NICE} /opt/logstash/bin/logstash agent -f "${LS_CONF_DIR}" -l "${LS_LOG_FILE}" ${LS_OPTS} end script Das Script wird dann noch nach ''/etc/init.d'' und in die Runlevel, zum autom. Start beim Booten, verlinkt. ln -s /lib/init/upstart-job /etc/init.d/logstash-agent update-rc.d logstash-agent defaults Jetzt noch einige Verzeichnisse erstellen und die dazugehörigen Benutzer rechte setzen. Service-Benutzer "logstash", als Systembenutzer, ohne Anmeldeshell und ohne Home-Dir, anlegen. useradd -r -s /bin/false -M logstash Verzeichnisse anlegen mkdir -p /etc/logstash/conf.d /var/log/logstash Der logstash-Benutzer ist Besitzer folgender Verzeichnisse. chown -R logstash.adm /var/log/logstash/ chown -R logstash.adm /opt/logstash ===== Startparameter ===== Hier die Startparameter für Logstash (Quelle: http://logstash.net/docs/1.4.0/flags) ^ Parameter ^ Beschreibung ^ | -f, --config CONFIGFILE | Load the logstash config from a specific file, directory, or a wildcard. If given a directory or wildcard, config files will be read from the directory in alphabetical order. | | -e CONFIGSTRING | Use the given string as the configuration data. Same syntax as the config file. If not input is specified, 'stdin { type => stdin }' is default. If no output is specified, 'stdout { debug => true }}' is default. | | -w, --filterworkers COUNT | Run COUNT filter workers (default: 1) | | --watchdog-timeout TIMEOUT | Set watchdog timeout value in seconds. Default is 10. | | -l, --log FILE | Log to a given path. Default is to log to stdout | | --verbose | Increase verbosity to the first level, less verbose. | | --debug | Increase verbosity to the last level, more verbose. | | -v | *DEPRECATED: see --verbose/debug* Increase verbosity. There are multiple levels of verbosity available with '-vv' currently being the highest | | --pluginpath PLUGIN_PATH | A colon-delimted path to find other logstash plugins in Web | | -a, --address ADDRESS | Address on which to start webserver. Default is 0.0.0.0. | | -p, --port PORT | Port on which to start webserver. Default is 9292 | ===== Konfiguration ===== Hier eine kleine initiale Konfigurationsdatei (''/etc/logstash/logstash.yml'') path: data: /var/lib/logstash config: /etc/logstash/conf.d/*.conf logs: /var/log/logstash # Options for log.level: # * fatal # * error # * warn # * info (default) # * debug # * trace # # log.level: info config: reload: automatic: true interval: 3s Nach der Installation müssen nun noch drei Konfigurationsdateien erstellt werden. touch /etc/logstash/conf.d/input.conf touch /etc/logstash/conf.d/filter.conf touch /etc/logstash/conf.d/output.conf Hier eine Konfiguration wo Eingaben, die man in der Konsole (die gleiche wo man logstash gestartet hat) eingibt, aufgefangen und verarbeitet. Inhalt ''input.conf'': input { stdin { type => "stdin-type" } } Inhalt ''filter.conf'' Da hier erstmal Logstash nur getestet wird, kommt hier erstmal so nicht an Inhalt hinein. Inhalt ''output.conf'' output { stdout { codec => "json" workers => 2 } } ==== Starten und prüfen ==== Testweise kann logstash nun wie folgt gestartet werden: * Version 6.1: /usr/share/logstash/bin/logstash --config.test_and_exit --path.settings /etc/logstash * Version 1.4: gibt es den Parameter ''--configtest'', welcher Logstash nicht startet sondern nur die Konfiguration testet. Ohne den Parameter kann Logstash nach erfolgreichem Test gestartet werden. /usr/share/logstash/bin/logstash --path.settings /etc/logstash Nun kann man einfach in der Konsole, in der man logstash selbst gestartet hat (einige Sekunden nach dem Start warten), z.b. "test" eingeben. Es sollte etwa folgendes Ergebnis ausgegeben werden: {"@source":"stdin://linsv01/","@tags":[],"@fields":{},"@timestamp":"2013-01-07T10:08:10.728Z","@source_host":"linsv01","@source_path":"/","@message":"test","@type":"stdin-type"} Zum starten als Service gibt man folgendes ein: initctl start logstash-agent ==== Reload der Konfiguration ==== Ab Version 2.3 kann logstash seine Konfigurationsdateien überwachen und bei Änderung diese neu laden. Dazu startet man logstash mit diesem Parameter ''--config.reload.automatic'' logstash –f --config.reload.automatic Standardmäßig schaut logstash alle 3 Sekunden nach Konfigurationsänderungen. Man kann dem o.g. Parameter ein Interval hinzufügen, wenn dies geändert werden soll. Hier noch ein Beispiel aus der ''logstash.yml'' config: reload: automatic: true interval: 30s Wenn logstash bereits läuft und der o.g. Parameter nicht gesetzt ist, kann man ihn mit dem kill SIGHUP (signal hangup) zum reloaden zwingen. kill -1 oder kill -SIGHUP ==== Debugging ==== Um Nun zu sehen was logstash tatsächlich gerade macht, kann man das Logging höher schalten. Dazu ändert man in der Datei ''/etc/logstash/log4j2.properties'' den Eintrag "status=error" auf "status=debug". Dannach startet man Logstash auf der Konsole mit /usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash" "--log.level=debug" Das Log befindet sich dann unter ''/var/log/logstash/logstash-plain.log'' Wenn man z.B. nur das Output Plugin für Elasticsearch debuggen möchte, kann man die log4j2.properties wie folgt ergänzen: logger.elasticsearchoutput.name = logstash.outputs.elasticsearch logger.elasticsearchoutput.level = debug Wenn die Eigenschaft "status:" weiterhin auf "error" stehen bleibt, werden nur die Elasticsearch Meldungen mit Debug versehen. === Alles auf Konsole === Hiermit startet man Logstash mit den Minimalangaben im Verbose-Mode. Alle Eingaben die mit Enter bestätigt werden, werden übergeben. /usr/share/logstash/bin/logstash -e 'input { stdin { } } output { elasticsearch { hosts => localhost } stdout { codec => rubydebug } }' --[verbose|debug] Auf der Elasticsearch Seite schaut man im Webbrowser auf die Adresse '''http://localhost:9200/_search?pretty''' oder setzt den curl Befehl ab: curl 'http://localhost:9200/_search?pretty' ===== Konfigurationsbeispiele ===== Hier sind einige Beispiele wie die Konfiguration von logstash mit div. In- und Outputs sowie Filtern aussehen kann. Um Filter mit ''grok'' nutzen zu können, benötigt man Patterns. Bei dieser Installation findet man eine Liste möglicher Patterns in dem Verzeichniss ''/opt/grok/patterns/base'' oder Online unter [[https://github.com/logstash/logstash/blob/v1.2.1/patterns/grok-patterns|Github]] Beim [[http://grokdebug.herokuapp.com/|Grok Debugger]] kann man die einzelnen Patterns mit einer Syslog-Meldung testen. ==== Inputs ==== input { tcp { port => 10514 type => "syslog_ssg_auth" } udp { port => 10514 type => "syslog_ssg_auth" } file { codec => plain { charset => "UTF-8" } path => "" sincedb_path => "/opt/logstash/sincedb" start_position => "end" type => "" } beats { host => "" port => "5044" } } ==== Filter ==== filter { if [fields][device] == "" { grok { match => { "message" => "%{SYSLOGTIMESTAMP:logTimestamp} %{HOST:host} %{WORD}: %{WORD} %{WORD}=%{HOST} \[Root\]%{WORD}-%{WORD}-%{INT:MSG_ID}%{DATA}: %{GREEDYDATA:message}" } overwrite => ["message"] } date { match => [ "logTimestamp", "yyy-MM-dd HH:mm:ss", "dd/MMM/YYYY:HH:mm:ss Z" ] timezone => "Europe/Berlin" target => "@timestamp" add_field => { "debug" => "timestampMatched" } } } if [fields][device] == "" { grok { match => { "message" => "%{SYSLOGTIMESTAMP} %{HOST:host} %{WORD}: \<%{DAY}\, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME}\,%{WORD}\> \[%{WORD:filterprog}\|%{WORD:loglevel}\] %{GREEDYDATA:message}" } overwrite => ["message"] } date { # match for TIMESTAMP_ISO8601 match => [ "logTimestamp", "yyy-MM-dd HH:mm:ssZ" ] timezone => "Europe/Berlin" target => "@timestamp" add_field => { "debug" => "timestampMatched" } } } if [fields][device] == "" { grok { match => { "message" => "%{SYSLOGTIMESTAMP:logTimestamp} %{IPORHOST:host} %{WORD}: %{GREEDYDATA:message}" } overwrite => ["message"] } grok { match => { "path" => "%{YEAR:log_year}" } } mutate { # adds Year in Timestamp replace => [ "logTimestamp", "%{log_year} %{logTimestamp}" ] remove_field => "log_year" } date { # # match for mutated SYSLOGTIMESTAMP match => [ "%{log_year} logTimestamp", "yyy MMM dd HH:mm:ss", "yyy MMM d HH:mm:ss" ] timezone => "Europe/Berlin" target => "@timestamp" add_field => { "debug" => "timestampMatched" } } } if [fields][device] == "" { grok { match => { "message" => "%{SQUIDMKHLOG2}" } overwrite => ["message"] } date { match => [ "logTimestamp", "dd/MMM/YYYY:HH:mm:ss Z" ] timezone => "Europe/Berlin" target => "@timestamp" add_field => { "debug" => "timestampMatched"} } } } === Datum und @timestamp === Sinnvoll ist es das Datum der Nachricht und nicht das Eingangsdatum von Logstash als Timestamp in Elasticsearch zu verwenden. So werden die Indizes mit dem Datum angelegt, welches in der Nachricht verwendet wird. Wenn man z.B. alte Daten importieren möchte, dann ist das unumgänglich. Oben sieht man in den Filter Beispielen einige Möglichkeiten da zu realisieren. Bei Logs mit einem Timestamp im Sinne von Syslog (MMM dd HH:mm:ss), kann man mit Logstash diesen Timestamp so nicht als Timestamp im Elasticsearch verwenden, da hier das Jahr fehlt. Dafür kann man z.B. den Dateinamen der Logdatei mit einem Datum oder nur dem Jahr in der diese Datei erstellt wurde versehen und diese Info dann im Logstash verwenden. Das ist hier in dem Filter-Beispiel für dargestellt. Evtl. muss z.B. dem Syslogserver mitgeteilt werden, dass dieser die Datei mit einem Jahr/Monat/Tag versieht. Beispiele findet man hier in der Wiki unter [[linux:syslog#dynamicfilenameformat|DynamicFileName]]. Eine andere Methode wäre die, vom Syslog-Server empfangene Nachricht, anzupassen in dem man dieser ein anderes Datums-Format verpasst. Ein Beispiel hier für findet man auch hier in der Wiki unter [[linux:syslog#messageformat|MessageFormat]]. Unter [[https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html|mapping-date-format]] und [[https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-daterange-aggregation.html#date-format-pattern|date-format-pattern]] findet man Infos zum Datumsformat in Elasticsearch. Ein kleines Script für das zusammenstellen von komprimierten Dateien, welche vom Logrotation-Daemon erstellt wurden, findet man [[linux:scriptschnipsel#logrotation_files_to_1_file|hier]]. ==== Outputs ==== output { elasticsearch_http { host => "localhost" # index => "logstash-%{+YYYY.MM.dd}" # user => "some Username" # password => "some_Password" # port => "9200" <-- this ist default workers => 2 } } ===== Upgradeverfahren ===== Logstash systemctl stop logstash.service dpkg -i ...deb /usr/share/logstash/bin/logstash --config.test_and_exit --path.settings /etc/logstash systemctl start logstash.service ===== Troubleshooting ===== Ab Version >=6.3.0 wird das Feld "host" vom Filebeat selbst gefüllt und produziert dadurch ein mappingconflict mit dem Feld "host" im Index. Hier die [[https://github.com/elastic/beats/blob/master/libbeat/docs/breaking.asciidoc#breaking-changes-6.3|Breaking Changes in 6.3]], dies ist ein Auszug: Starting with version 6.3, Beats provides an `add_host_metadata` processor for adding fields, such as `host.name` and `host.id`, to Beats events. These fields are grouped under a `host` prefix and conform to the https://github.com/elastic/ecs[Elastic Common Schema (ECS)]. The `host` object is defined in the Elasticsearch index template even if the processor is not used. We've also added a `host.name` field to all events sent by Beats. This field prevents the Beats input plugin in Logstash from adding a default `host` field. (By default, the plugin adds a `host` field if the event doesn't already have one. We don't want the plugin to add this field because it causes a mapping conflict with the `host` object defined in the index template.) Daher muss man in die Filter folgendes eintragen: mutate { remove_field => [ "[host]" ] } mutate { add_field => { "host" => "%{[beat][hostname]}" } }