...

Code Block
{
	"display": string,
	"logstash": {
		"config": string, // contains the logstash configuration - there are some custom substitution variables available, see below
		"streaming": boolean, // defaults to true - if true, data is stored only for a rolling 30 days; if false, data is stored forever (useful for demos, should be used with care)
		"distributed": boolean, // defaults to false - if true, then the logstash source is run on all available nodes (if in file/s3 mode, normally used in conjunction with substitution variables - see below)

		"testDebugOutput": boolean, // defaults to false - if true then when running "test", eg from the source editor, logstash will generate additional debug output
		"testInactivityTimeout_secs": integer // (default 10s) when running "test", eg from the source editor, controls how long logstash will wait between successive input records before exiting
	}
}

Description

The most significant element of the Logstash configuration is the "config" string field. This contains the Domain Specific Language described here, pointing to the various elements listed here.
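To illustrate, a minimal source object might look like the following sketch. The display name is a hypothetical placeholder, and the "config" string simply embeds the s3 input from the full example further down, with newlines escaped:

Code Block

```js
{
	"display": "Netflow demo import", // hypothetical display name
	"logstash": {
		"streaming": true,
		"config": "input {\n  s3 {\n    bucket => \"import.bucket.name\"\n    prefix => \"netflow_demo/\"\n    type => \"netflow\"\n  }\n}\nfilter {\n}\n"
	}
}
```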

...

Info

System administrators can configure the set of allowed inputs and filters via two configuration parameters: "harvest.logstash.allowed_inputs" and "harvest.logstash.allowed_filters". These can be used either to ban elements considered unsafe, or to add new or custom elements.
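As an illustrative sketch only (the element names below are examples, and the property file location depends on the install), each parameter would take a comma-separated list of logstash plugin names:

Code Block

```
harvest.logstash.allowed_inputs=s3,file
harvest.logstash.allowed_filters=csv,date,drop,mutate,geoip
```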

Info

SUBSTITUTION VARIABLES

The following strings will be substituted within the config string:

  • "#LOGSTASH{host}" - resolves to the fully qualified hostname of the logstash on which the source is running (ie the master if non-distributed, each slave if distributed)
  • The standard Ikanow substitution (#IKANOW{xxx})
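For example, a distributed "file" input might use the hostname substitution so that each node reads from its own directory (the path below is a hypothetical placeholder):

Code Block

```
input {
  file {
    # "#LOGSTASH{host}" resolves to each node's fully qualified hostname when distributed
    path => "/data/netflow/#LOGSTASH{host}/*.csv"
    type => "netflow"
  }
}
```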
Warning

When using the "file" input (which only administrators can do), note that deleting the documents for the source does not appear to reset logstash's state for that file, and therefore it does not re-ingest the data. To work around this, it is currently necessary to suspend the source, wait 5 minutes, and then re-enable it (which causes the logstash process to restart).

Example

The example below shows a logstash configuration set up to filter some Cisco netflow data.

In the example, conditionals are used to tell the logstash filter to output data only under certain conditions.

Elsewhere in the filter, mutate is used to remove some fields and perform some data type conversions; within each conditional branch, mutate adds the "ap" field and removes the port fields.

Later in the example, logstash geoip is used to add the geographical location of IP addresses.

Info
This is the code as it would be seen in the "LS" code editor, which then gets converted to a string and placed in the "config" field (e.g. "input\n{\n").
Code Block
input {
  s3 {
      credentials => ["ACCESSID","PASSWORD"]
      bucket => "import.bucket.name"
      prefix => "netflow_demo/"
      type => "netflow"
  }
  # NOTE: only one input block allowed
}
filter
{
        csv
        {
            columns=> [
"ts","te","td","sa","da","sp","dp","pr","flg","fwd","stos","ipkt","ibyt","opkt","obyt","in","out","sas","das","smk","dmk","dtos","dir","nh","nhb","svln",
"dvln","ismc","odmc","idmc","osmc","mpls1","mpls2","mpls3","mpls4","mpls5","mpls6","mpls7","mpls8","mpls9","mpls10","cl","sl","al","ra","eng","exid","tr"
                ]
        }
        if [ts] == "ts" {
            drop {}
        }
        date {
                match => [ "ts" , "yyyy-MM-dd HH:mm:ss" ]
        }
        mutate {
                remove_field => ["al","cl","das","dir","dmk","dtos","dvln","eng","exid","fwd","host","idmc","ismc","mpls1",
                    "mpls2","mpls3","mpls4","mpls5","mpls6","mpls7","mpls8","mpls9","mpls10",
                    "nh","nhb", "odmc","osmc","out","ra","sas", "sl","smk","stos","svln", "te","tr","ts"
                    ]
                convert => ["ibyt", "integer" ]
                convert => ["ipkt", "integer" ]
                convert => ["obyt", "integer" ]
                convert => ["opkt", "integer" ]
                convert => [ "td", "float" ]
        }
        if [sa] =~ "^172[.]16[.].*" {
                mutate {
                    add_field => { "ap" => "%{dp}" }
                    remove_field => ["dp", "sp" ]
                }
                geoip {
                    source => "da"
                    fields => ["timezone","location","latitude","longitude"]
                }
        }
        else {
                mutate {
                    add_field => { "ap" => "%{sp}" }
                    remove_field => ["dp", "sp" ]
                }
                geoip {
                    source => "sa"
                    fields => ["timezone","location","latitude","longitude"]
                }
        }
}
#NOTE: no output blocks allowed

...