新网创想网站建设,新征程启航
为企业提供网站建设、域名注册、服务器等服务
网站建设哪家好,找创新互联!专注于网页设计、网站建设、微信开发、小程序定制开发、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了大箐山免费建站欢迎大家使用!
package main
import (
"fmt"
"time"
"os"
"bufio"
"io"
)
//读接口做抽象优化
type Reader interface {
Read(rc chan []byte)
}
//写接口做抽象优化
type Writer interface {
Write(wc chan []byte)
}
type LogProcess struct {
rc chan []byte //读chan
wc chan []byte //写chan
read Reader
write Writer
}
type ReadFromFile struct {
path string //读取文件的路径
}
type WriteToInfluxDB struct {
influxDBDsn string //influx data source
}
func (r *ReadFromFile) Read(rc chan []byte) {
//读取模块
//打开文件
f, err := os.Open(r.path)
if err != nil {
panic(fmt.Sprintf("open file error:%s", err.Error()))
}
//从文件末尾开始逐行读取文件内容
f.Seek(0, 2)//将文件字符指针移动到最后(0偏移量,2移动字符指针到末尾)
rd := bufio.NewReader(f)
for {
line, err := rd.ReadBytes('\n') //读取文件内容直到遇到换行符(即读取一行内容)
if err == io.EOF{ //读取到文件末尾也会返回err
time.Sleep(time.Second)
continue //当前循环不让继续向下执行
}else if err != nil {
panic(fmt.Sprintf("ReadBytes error:%s", err.Error()))
}
rc <- line[:len(line)-1]
}
}
func (l *LogProcess) Process() {
//解析模块
for v := range l.rc {
l.wc <- v
}
}
func (w *WriteToInfluxDB) Write(wc chan []byte) {
//写入模块
for v := range wc {
fmt.Println(string(v))
}
}
func main() {
r := &ReadFromFile{
path: "D:/go_work_dir/logs/access.log",
}
w := &WriteToInfluxDB{
influxDBDsn: "username&password..",
}
lp := &LogProcess{
rc: make(chan []byte),
wc: make(chan []byte),
read: r,
write: w,
}
go lp.read.Read(lp.rc)
go lp.Process()
go lp.write.Write(lp.wc)
time.Sleep(10000 * time.Second)
}
正则解析等方法未写...
注意,还有可以优化的地方,如:
正则解析会比较慢,可以for给它多开几个goroutine:
而chan没有buff,读写要同时完成才行,会有阻塞。所以给它加buff,也就是相当于加了缓存:
还要记住读取到了日志的哪个行数