...

Warning

When using the "file" input (which only administrators can do), note that deleting the documents for the source does not appear to reset Logstash's state for that file, so the data is not re-ingested. To work around this, it is currently necessary to suspend the source, wait 5 minutes, and then re-enable it (which causes the Logstash process to restart).

Example

The example shows a Logstash configuration set up to filter some Cisco NetFlow data.

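In the example, conditionals tell the Logstash filter to apply certain steps only when a condition holds: the CSV header row is dropped, and the fields that are added depend on whether the source address starts with 172.16.

mutate is used to remove some fields and perform some data type conversions. Inside the conditional, it also copies a port into the new "ap" field and removes "sp" and "dp".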

Later in the example, the Logstash geoip filter is used to add the geographical location of IP addresses.

 

Info
The Logstash configuration in the code block below is shown as it would be seen in the "LS" code editor. It is then converted to a string and placed in config (e.g. "input\n{\n").
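As a rough illustration of the string conversion mentioned in this note, the following Python sketch turns multi-line editor content into a single-line escaped string. It assumes JSON-style escaping, which matches the "input\n{\n" form shown above, but the exact mechanism used by the editor is not documented here. The function name to_config_string and the sample text are hypothetical.

```python
import json

# Hypothetical editor content: a shortened Logstash configuration.
editor_text = """input
{
  s3 {
    type => "netflow"
  }
}"""


def to_config_string(text: str) -> str:
    """Return `text` as a single-line, JSON-escaped string literal.

    Assumption: the config field stores the editor content as a JSON string,
    so newlines become \\n and double quotes become \\".
    """
    return json.dumps(text)


encoded = to_config_string(editor_text)
print(encoded)

# Decoding the string restores the original editor content.
assert json.loads(encoded) == editor_text
```

Going the other way (json.loads) is a convenient way to read a stored configuration back in its multi-line form.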
Code Block
input {
  s3 {
    # Placeholder credentials; replace with a real access ID and secret
    credentials => ["ACCESSID","PASSWORD"]
    bucket => "import.bucket.name"
    prefix => "netflow_demo/"
    type => "netflow"
  }
  # NOTE: only one input block allowed
}
filter {
  # Parse each line as CSV using these column names
  csv {
    columns => [
      "ts","te","td","sa","da","sp","dp","pr","flg","fwd","stos","ipkt","ibyt","opkt","obyt","in","out","sas","das","smk","dmk","dtos","dir","nh","nhb","svln",
      "dvln","ismc","odmc","idmc","osmc","mpls1","mpls2","mpls3","mpls4","mpls5","mpls6","mpls7","mpls8","mpls9","mpls10","cl","sl","al","ra","eng","exid","tr"
    ]
  }
  # Drop the CSV header row, where the "ts" column holds the literal "ts"
  if [ts] == "ts" {
    drop {}
  }
  # Use the "ts" column as the event timestamp
  date {
    match => [ "ts" , "yyyy-MM-dd HH:mm:ss" ]
  }
  # Remove fields that are not needed and convert counters to numbers
  mutate {
    remove_field => ["al","cl","das","dir","dmk","dtos","dvln","eng","exid","fwd","host","idmc","ismc","mpls1",
      "mpls2","mpls3","mpls4","mpls5","mpls6","mpls7","mpls8","mpls9","mpls10",
      "nh","nhb","odmc","osmc","out","ra","sas","sl","smk","stos","svln","te","tr","ts"
    ]
    convert => ["ibyt", "integer" ]
    convert => ["ipkt", "integer" ]
    convert => ["obyt", "integer" ]
    convert => ["opkt", "integer" ]
    convert => [ "td", "float" ]
  }
  # Source address starts with 172.16.: keep the destination port as "ap"
  # and geolocate the destination address
  if [sa] =~ "^172[.]16[.].*" {
    mutate {
      add_field => { "ap" => "%{dp}" }
      remove_field => ["dp", "sp" ]
    }
    geoip {
      source => "da"
      fields => ["timezone","location","latitude","longitude"]
    }
  }
  # Otherwise: keep the source port as "ap" and geolocate the source address
  else {
    mutate {
      add_field => { "ap" => "%{sp}" }
      remove_field => ["dp", "sp" ]
    }
    geoip {
      source => "sa"
      fields => ["timezone","location","latitude","longitude"]
    }
  }
}
#NOTE: no output blocks allowed
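
To make the effect of the conditional easier to check, here is a minimal Python sketch that approximates what the filter section above does to one parsed CSV row. It is not how Logstash is implemented. It omits the date parsing, the bulk remove_field list and the actual geoip lookup, and the function name process, the key geoip_source and the sample row are all hypothetical.

```python
import re
from typing import Optional

# Fields that the mutate filter converts to integers.
INT_FIELDS = ("ibyt", "ipkt", "obyt", "opkt")
# Same pattern as the conditional in the configuration.
INTERNAL_SA = re.compile(r"^172[.]16[.].*")


def process(event: dict) -> Optional[dict]:
    """Approximate the filter section for one parsed CSV row."""
    # if [ts] == "ts" { drop {} }: discard the CSV header row.
    if event.get("ts") == "ts":
        return None

    out = dict(event)

    # mutate convert: numeric fields arrive as strings from the csv filter.
    for name in INT_FIELDS:
        if name in out:
            out[name] = int(out[name])
    if "td" in out:
        out["td"] = float(out["td"])

    # Conditional: choose which port becomes "ap" and which address
    # would be passed to geoip.
    if INTERNAL_SA.search(out.get("sa", "")):
        out["ap"] = out.get("dp")
        out["geoip_source"] = "da"  # hypothetical marker, no lookup is done
    else:
        out["ap"] = out.get("sp")
        out["geoip_source"] = "sa"  # hypothetical marker, no lookup is done

    # remove_field => ["dp", "sp"] in both branches.
    out.pop("dp", None)
    out.pop("sp", None)
    return out


# Made-up sample row for illustration only.
row = {"ts": "2015-01-01 00:00:00", "sa": "172.16.0.5", "da": "8.8.8.8",
       "sp": "51000", "dp": "53", "ibyt": "120", "ipkt": "2", "td": "0.5"}
result = process(row)
print(result)
```

For this sample row the source address matches the 172.16. pattern, so "ap" takes the value of "dp" and the destination address would be the one passed to geoip.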

...