Logstash简介及数据接入示例Kafka--->Influxdb
Logstash Kafka--->Influxdb
Logstash是一个类似于Flume的数据流采集工具,可以对接多种的数据输入源(如:各种服务器的日志文件,ES/KAFKA/HDFS等等,更多的输入插件请参照),以及将数据源数据到多种数据存储介质中(如:File/ES/Influxdb/HDFS/Mysql/Kafka等等,更多的输出插件请参照)
安装和使用:
可以从官网下载logstash,当前位置的logstash的版本是6.6.1,发布日期是2019年2月19日,可以根据自己的需求来下载适合自己需求的包,本次使用的是TAR.GZ文件
将下载的文件上传到linux的指定目录(自己喜欢的目录即可)
tar -zxvf logstash-6.6.1.tar.gz -C targetpath (targetpath为解压到自己的软件安装的目录)
vim xxx.conf(xxx是自己喜欢的名字 随意更换)
输入
input { stdin {} } output { stdout{ codec => json } }
$Logstash/bin/logstash -f xxx.conf 即可运行
Logstash的使用仅需玩家配置一个配置文件(如:xxx.conf)即可,logstash的安装也非常的简单
插件 : logstash-input-kafka (早期版本需要安装,当前版本已内置)
关于输入源kafka的相关配置,请参照,在logstash6.6.1中已经集成了logstash-input-kafka这个插件,所以不需要用户再自己去安装这个插件了,如果想查看已安装了哪些插件,可以使用命令:
$logstash/bin/logstash-plugin list
插件 : logstash-output-influxdb (没有内置,需要自己安装)
安装方式
一. 联网安装
$logstash/bin/logstash-plugin insatall logstash-output-influxdb
安装完成之后,执行
$logstash/bin/logstash-plugin list
即可查看是否安装成功
二. 离线安装
首先在一台能联网的电脑上安装logstash的需要的插件,然后在安装后的电脑上执行:
bin/logstash-plugin prepare-offline-pack logstash-output-influxdb
命令执行完后在logstash的根目录下会多出一个logstash-offline-plugins-6.6.1.zip文件
然后将logstash-offline-plugins-6.6.1.zip文件上传到需要离线安装插件的机子上,之后 执行命令:
$logstash/bin/logstash-plugin install file:///../../logstash-offline-plugins-6.6.1.zip
然后执行命令:
$logstash/bin/logstash-plugin list
查看是否已经成功安装
对于influxdb的插件的参数介绍,请以此插件github上项目下的目录 : logstash-output-influxdb/docs/index.asciidoc 文档为准,这里面的参数及使用介绍 ,是最完整的,官网及其他地方的版本可能不一样
还有一点需要注意的是,不同版本的插件参数也是不一样的,使用之前请严格查看自己插件版本对应的参数文档介绍
kafka及其他插件同理!
下面是示例配置文件及解释:
input{
kafka { #下面的这些配置信息谁在前谁在后无所谓 不同版本的插件这些配置的名字也不同 请注意
bootstrap_servers => "node01:9092,node02:9092,node03:9092" # kafka集群信息
codec => "json" # 将kafka的消息以json格式输入 这样的话 就可以直接在output和filter的部分使用"%{key}"来引用kafka消息中的对应字段的值
group_id => "logstash1" #消费者组id 为了记录消费的offset
consumer_threads => 8 # 消费者组中多少个消费者来消费这些topic中的消息 一般情况线程数对应topic的分区个数为最佳
topics => ["test"] # 需要消费的topic 可以传入多个topic
auto_offset_reset => "latest" #如果消费者组的offset丢失或者第一次加入 从什么位置开始消费
}
}
filter{
if ![host]{ #判断给定的消息中是否有host这个字段
drop {} #如果没有的话就跳过这条消息
} else {
date {#如果只有match 没有target的话 默认把匹配到的时间赋值给event的@timestamp
match => ["logdate","ISO8601"] #匹配 2019-02-26T09:30:51.55+8:00 这种带时区的日期格式
timezone => "Asia/Shanghai" #如果加时区的话 @timestamp的时间会是UTC时间 并不是该时区的时间
}
}
}
output {
influxdb {
host => "node01" # influx所在的ip port的话默认就是8086 如果没有修改默认端口的话 这个位置可以不用配置port
db => "demo" # 使用的数据库
measurement => "demo1" # 将数据插入到哪个表中
retention_policy => "autogen" # 使用哪个保留规则
send_as_tags => ['aaa'] # 这里面是标示着把哪些字段作为tag datapoints中的字段必须大于此集合中的字段数
allow_time_override => true #是否允许覆盖time字段 logstash在向influx插入消息时默认是将@timestamp作为time字段插入 ,所以上面的date过滤器中直接匹配赋值即可
flush_size => 100 #多少条消息提交到influx
idle_flush_time => 1 #多久提交一次消息到influx 这个和上面那个条件 满足其一即可
codec => json # 将消息以json格式输出
coerce_values => {"value" => "float"} #将某些字段的值类型强转
data_points => { # 字段之间不用加逗号 加了的话 报错
"aaa" => "%{aaa}"#获取tag字段的值
"value" => "%{value}"#获取field的值
}
}
stdout {codec => rubydebug}#将消息打印出来
}
如果参照上面的配置不能写入influx的话 请尝试将influx的版本降级