input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    client_id => "Nginxlog"
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => true
    topics => ["Nginx_log"]
    codec => "json"
    type => "Nginx_log"
  }
}

filter {
  mutate {
    gsub => ["message", "\\x22", '"', "message", "\\x09", '']
  }
  json {
    source => "message"
    remove_field => ["message", "beat", "@version", "@timestamp"]
  }
  if [type] == "Nginx_log" {
    ruby {
      code => '
        # Load the whitelist
        file = File.open("/usr/local/logstash/config/white.txt", "r")
        text = file.read
        file.close
        # Check whether the request_uri field of the log is in the whitelist.
        # Alternatively, simply drop any log that is not whitelisted:
        # event.cancel if !text.include?(event.get("request_uri"))
        if !text.include?(event.get("request_uri")) then
          # Not in the whitelist: add es_flag=0 to mark the log as unwanted
          event.set("es_flag", "0")
        else
          # In the whitelist: add es_flag=1 to mark the log as wanted
          event.set("es_flag", "1")
        end
      '
    }
  }
}

output {
  if [type] == "Nginx_log" {
    # es_flag=1 goes into the nginx-log-yes index
    if [es_flag] == "1" {
      elasticsearch {
        hosts => "127.0.0.1:9200"
        index => "nginx-log-yes"
      }
    }
    # es_flag=0 goes into the nginx-log-no index
    else {
      elasticsearch {
        hosts => "127.0.0.1:9200"
        index => "nginx-log-no"
      }
    }
  }
}
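The whitelist test above uses plain `String#include?`, so each `request_uri` is matched as a substring of the whole file, not against whole lines. A minimal sketch of that flag logic in plain Ruby (the whitelist contents and URIs here are made up for illustration; a Tempfile stands in for white.txt):

```ruby
require "tempfile"

# Stand-in for /usr/local/logstash/config/white.txt (hypothetical contents)
whitelist_file = Tempfile.new("white")
whitelist_file.write("/index.html\n/api/login\n")
whitelist_file.close

# Same pattern as the ruby filter: read the file once, then test membership
text = File.read(whitelist_file.path)

def es_flag(text, request_uri)
  text.include?(request_uri) ? "1" : "0"
end

puts es_flag(text, "/api/login")  # whitelisted
puts es_flag(text, "/admin")      # not whitelisted
# Caveat: substring matching also accepts partial URIs such as "/api"
puts es_flag(text, "/api")
```

Because of the substring semantics, a request for `/api` is flagged "1" even though only `/api/login` is listed; matching against `text.lines.map(&:chomp)` instead would enforce exact matches.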
Logstash Event API
Besides the basic get and set, the Event API provides a rich set of methods. The ones we can use include:
Cancel (delete) the event: cancel
Undo the cancellation: uncancel
Check whether the event is cancelled: cancelled?
Check whether a field exists: include?
Remove a field: remove
Convert the event to a string: to_s
Convert the event to a hash (without metadata fields): to_hash
Convert the event to a hash (with metadata fields): to_hash_with_metadata
Convert the event to a JSON string: to_json
Add a tag: tag
Get the event timestamp: timestamp
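Outside Logstash, most of these map onto familiar Ruby Hash and JSON operations. A rough analogy using a plain Hash in place of the Event object (this is an illustration only, not the real Event API):

```ruby
require "json"

# A plain Hash standing in for a Logstash event
event = { "hello" => "world", "tags" => [] }

# include? / remove roughly correspond to key? / delete on a Hash
event.key?("hello")     # field exists => true
event.delete("hello")   # remove the field

# tag: append to the tags array
event["tags"] << "_test_tag"

# to_json counterpart: serialize the event
json_text = JSON.generate(event)
puts json_text
```

The real Event methods additionally handle nested field references (such as `[nginx][request_uri]`) and metadata, which a bare Hash does not.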
Test configuration
input {
  stdin {
    codec => json
  }
}

filter {
  ruby {
    code => '
      event.cancel
      event.set("cancelled", event.cancelled?)
      event.uncancel
      event.set("include", event.include?("hello"))
      event.remove("hello")
      event.set("to_s", event.to_s)
      event.set("to_hash", event.to_hash)
      event.set("to_hash_with_metadata", event.to_hash_with_metadata)
      event.set("to_json", event.to_json)
      event.tag("_test_tag")
      event.set("timestamp", event.timestamp)
    '
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
Start Logstash, then enter the following line and inspect the output:
{"hello":"world"}