Logstash extractor

Overview

Creates records using the very powerful and flexible Logstash platform. This is currently the only extractor capable of generating records; all the other extractors create documents.

Currently this extractor type cannot be used in conjunction with any other elements - all other pipeline elements are ignored when this one is specified.

Format

The Infinit.e side format is very simple:

{
	"display": string,
	"logstash": {
		"config": string, // contains the Logstash configuration - there are some custom substitution variables available, see below
		"streaming": boolean, // defaults to true - data is stored only for a rolling 30 days; if false, data is stored forever (useful for demos, should be used with care)
		"distributed": boolean, // defaults to false - if true, then the logstash source is run on all available nodes (if in file/s3 mode, normally used in conjunction with substitution variables - see below)

		"testDebugOutput": boolean, // defaults to false - if true then running "test", eg from the source editor, will generate additional debug output from logstash
		"testInactivityTimeout_secs": integer // (default 10s) when running "test", eg from the source editor, controls how long logstash will wait between successive input records before exiting
	}
}
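
Putting this together, a minimal source fragment might look like the following (the syslog port shown is purely illustrative):

```json
{
	"display": "Syslog via Logstash",
	"logstash": {
		"config": "input {\n  syslog {\n    port => 5514\n  }\n}\n",
		"streaming": true
	}
}
```

Note that the "config" field is a single JSON string, so the newlines in the Logstash configuration must be escaped as "\n".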

Description

The most significant element of the Logstash configuration is the "config" string field. This contains the Logstash Domain Specific Language (see the external Logstash documentation linked below), using the input and filter elements listed in the limitations below.

When manually creating the Logstash configuration, the source editor should be used - there is an "LS" editor window (to the left, immediately below the main source form, whenever the Logstash "extractor type" is specified) providing a standard code editing experience. (The indentation doesn't quite work because the Logstash syntax doesn't map onto any existing languages!) There is also a "Logstash" template available from the "New Source" window.

The Infinit.e implementation imposes a number of limitations:

  • For non-admin users, only the following input elements can be used:
    • collectd, drupal_dblog, gelf, gemfire, imap, irc, lumberjack, s3, snmptrap, sqs, syslog, twitter, udp, xmpp, zenoss
  • For all users, only a single input element can be specified
  • For non-admin users, only the following filter elements can be specified:
    • advisor, alter, anonymize, checksum, cidr, cipher, clone, collate, csv, date, dns, drop, elapsed, extractnumbers, fingerprint, geoip, gelfify, grep, grok, grokdiscovery, l18n, json, json_encode, kv, metaevent, metrics, multiline, mutate, noop, prune, punct, railsparallelrequest, range, sleep, split, sumnumbers, syslog_pri, throttle, translate, unique, urldecode, useragent, uuid, wms, wmts, xml
  • For all users, no output element can be specified; one is inserted automatically by Infinit.e

System administrators can configure the set of allowed inputs and filters via two configuration parameters: "harvest.logstash.allowed_inputs" and "harvest.logstash.allowed_filters". These can be used either to ban elements considered unsafe, or to allow new or custom elements.
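
For example, assuming these parameters are set as comma-separated lists in the Infinit.e configuration properties file (the exact file location and list format are installation-dependent), an override might look like:

```
# Restrict non-admin sources to a smaller set of inputs,
# and allow a hypothetical custom filter ("my_custom_filter")
harvest.logstash.allowed_inputs=s3,sqs,syslog,udp
harvest.logstash.allowed_filters=csv,date,drop,geoip,grok,mutate,my_custom_filter
```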

Substitution variables

The following strings will be substituted within the config string:

  • "#LOGSTASH{host}" - resolves to the fully qualified hostname of the logstash on which the source is running (ie the master if non-distributed, each slave if distributed)
  • The standard Ikanow substitution (#IKANOW{xxx})
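
For example, a distributed file-based source might use the host substitution to tag each record with the node that ingested it. This is a sketch only - the file path is hypothetical, and the "file" input is restricted to administrators:

```
input {
  file {
    path => "/var/log/myapp/*.log"
    type => "myapp"
  }
}
filter {
  mutate {
    # "#LOGSTASH{host}" is replaced with the fully qualified hostname of
    # the node on which this logstash instance is running (each slave,
    # since this source would be run with "distributed": true)
    add_field => { "ingest_node" => "#LOGSTASH{host}" }
  }
}
```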

When using the "file" input (which only administrators can do), note that deleting the documents for the source does not appear to reset logstash's state for that file, and therefore it does not re-ingest the data. To work around this, it is currently necessary to suspend the source, wait 5 minutes, and then re-enable it (which causes the logstash process to restart).

Example

The example shows a logstash configuration set up to filter some Cisco netflow data.

In the example, Conditionals are used to tell the logstash filter to only output data under certain conditions.

In the conditional, mutate is used to remove some fields and perform some data type conversions.

Later in the example, logstash geoip is used to add the geographical location of IP addresses.

This is the code as it would appear in the "LS" code editor, which is then converted to a string and placed in "config" (eg "input\n{\n"):
input {
  s3 {
      credentials => ["ACCESSID","PASSWORD"]
      bucket => "import.bucket.name"
      prefix => "netflow_demo/"
      type => "netflow"
  }
  # NOTE: only one input block allowed
}
filter
{
        csv
        {
            columns=> [
"ts","te","td","sa","da","sp","dp","pr","flg","fwd","stos","ipkt","ibyt","opkt","obyt","in","out","sas","das","smk","dmk","dtos","dir","nh","nhb","svln",
"dvln","ismc","odmc","idmc","osmc","mpls1","mpls2","mpls3","mpls4","mpls5","mpls6","mpls7","mpls8","mpls9","mpls10","cl","sl","al","ra","eng","exid","tr"
                ]
        }
        if [ts] == "ts" {
            drop {}
        }
        date {
                match => [ "ts" , "yyyy-MM-dd HH:mm:ss" ]
        }
        mutate {
                remove_field => ["al","cl","das","dir","dmk","dtos","dvln","eng","exid","fwd","host","idmc","ismc","mpls1",
                    "mpls2","mpls3","mpls4","mpls5","mpls6","mpls7","mpls8","mpls9","mpls10",
                    "nh","nhb", "odmc","osmc","out","ra","sas", "sl","smk","stos","svln", "te","tr","ts"
                    ]
                convert => ["ibyt", "integer" ]
                convert => ["ipkt", "integer" ]
                convert => ["obyt", "integer" ]
                convert => ["opkt", "integer" ]
                convert => [ "td", "float" ]
        }
        if [sa] =~ "^172[.]16[.].*" {
                mutate {
                    add_field => { "ap" => "%{dp}" }
                    remove_field => ["dp", "sp" ]
                }
                geoip {
                    source => "da"
                    fields => ["timezone","location","latitude","longitude"]
                }
        }
        else {
             mutate {
                 add_field => { "ap" => "%{sp}" }
                 remove_field => ["dp", "sp" ]
             }
             geoip {
                    source => "sa"
                    fields => ["timezone","location","latitude","longitude"]
            }
       }
}
#NOTE: no output blocks allowed

Footnotes:

External documentation:

Logstash