微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Logstash利用ruby将有用的日志放到一个ES_INDEX将无用的日志放到另一个ES_INDEX

input{
    kafka {
        bootstrap_servers => "127.0.0.1:9092"
        client_id => "Nginxlog"
        auto_offset_reset => "latest"
        consumer_threads => 5
        decorate_events => true
        topics => ["Nginx_log"]
        codec => "json"
        type => "Nginx_log"
    }
}
filter{
   mutate {
        gsub => ["message", "\\x22", '"']
        gsub => ["message", "\\x09", '']
    }
    json {
        source => "message"
        remove_field=>["message","beat","@version","@timestamp"]
    }
    if [type] == "Nginx_log" {
        ruby {
                code => '
                    -- 获取白名单
                    file = File.open("/usr/local/logstash/config/white.txt", "r")
                    text = file.read
                    file.close
                    -- 判断日志中request_uri属性是否在白名单中 -- 也可直接将不在白名单的日志排除 event.cancel if !text.include?(event.get("request_uri"))
                    if !text.include?(event.get("request_uri")) then
                        -- 如果不存在就增加一个属性es_flag=0表示该日志没用
                        event.set("es_flag","0")
                    else
                        -- 如果不存在就增加一个属性es_flag=1表示该日志有用
                        event.set("es_flag","1")
                    end
                '
            }
        }
    }
}
output {
    if [type] == "Nginx_log" {
        -- 判断es_flag=1放到nginx-log-yes索引中
        if [es_flag] =="1" {
            elasticsearch {
                    hosts => "127.0.0.1:9200"
                    index => "nginx-log-yes"
            }
        }
        -- 判断es_flag=0放到nginx-log-no索引中
        else {
            elasticsearch {
                    hosts => "127.0.0.1:9200"
                    index => "nginx-log-no"
            }
        }
    }
}

 

Lostash event API说明

除了基本的get和set外,还提供了丰富的接口。我们能用到的方法包括
删除事件:cancel
取消删除事件:uncancel
是否删除:cancelled?
是否包含字段:include?
删除字段:remove
事件转字符串:to_s
事件转hash字典(不含Metadata字段):to_hash
事件转hash字典(含Metadata字段):to_hash_with_Metadata
事件转json字符串:to_json
增加tag:tag
取事件时间戳:timestamp

测试配置文件

input{
    stdin{
        codec=>json
    }
}filter{
    ruby{
        code=>'
            event.cancel
            event.set("cancelled",event.cancelled?)
            event.uncancel
            event.set("include",event.include?("hello"))
            event.remove("hello")
            event.set("to_s",event.to_s)
            event.set("to_hash",event.to_hash)
            event.set("to_hash_with_Metadata",event.to_hash_with_Metadata)
            event.set("to_json",event.to_json)
            event.tag("_test_tag")
            event.set("timestamp",event.timestamp)
        '
    }
}output{
    stdout{
        codec=>rubydebug
    }
}

启动logstash,然后输入如下,查看结果

{"hello":"world"}

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐