源码走读: fasthttp 为什么这么快?

前戏 今天我们聊聊 golang 的 web 框架, fasthttp 和 gin 到底谁更丝滑? fasthttp 简介 安装 go get -u github.com/valyala/fasthttp fasthttp 性能吊打 net/http 简而言之,fasthttp服务器的速度比net/http快多达 10 倍, 具体参数可以查看官网https://github.com/valyala/fasthttp fasthttp 使用方式与 net/http 有所差异 // using the given handler. func ListenAndServe(addr string, handler RequestHandler) error { s := &Server{ Handler: handler, } return s.ListenAndServe(addr) } type RequestHandler func(ctx *RequestCtx) 可以这样使用: type MyHandler struct { foobar string } // request handler in net/http style, i.e. method bound to MyHandler struct. func (h *MyHandler) HandleFastHTTP(ctx *fasthttp.RequestCtx) { // notice that we may access MyHandler properties here - see h.foobar. fmt.Fprintf(ctx, "Hello, world! Requested path is %q. Foobar is %q", ctx.Path(), h.foobar) } // pass bound struct method to fasthttp myHandler := &MyHandler{ foobar: "foobar", } fasthttp.ListenAndServe(":8080", myHandler.HandleFastHTTP) 在多个handler的时候需要这样使用: ...

十二月 27, 2024 · 5 分钟 · 1000 字 · zhu733756

segmentio/kafka-go: 如何实现Kafka读写的极致性能与精准一次处理

前戏 小白: 老花, 我想学习下golang如何批量写入和消费 kafka? 老花: 可以的, 我这里要推荐一个golang的kafka库, github.com/segmentio/kafka-go, 在此之前, 我们先来一波八股文。 kafka 分区架构(八股文) 分区 分区(Partition)是 Kafka 中最基本的数据存储单元。每个主题(Topic)可以有多个分区, 每个分区都是一个有序的、不可变的消息序列。分区的设计使得 Kafka 能够实现数据的并行处理和负载均衡。 并行处理: 通过分区, Kafka 允许多个消费者并行处理不同分区的数据, 从而提高系统的处理能力。 数据本地性: 每个分区可以分布在不同的 Broker 上, 这样可以充分利用集群的存储能力, 并实现数据的分布式存储。 顺序保证: 在单个分区内, 消息是有序的, 这有助于确保一些需要顺序处理的场景, 如日志记录。 生产者与消费者操作分区 生产者发送消息到指定 Partition: 生产者可以根据需要将消息发送到特定的分区, 这可以通过指定消息的分区键来实现。 消费者订阅指定 Partition: 消费者可以选择订阅特定的分区, 也可以订阅整个 Topic, 以实现对特定数据流的处理。 负载均衡 Kafka 的生产者在将消息发送到分区时, 使用分区键的哈希函数来决定消息应该被分配到哪个分区, 这有助于实现负载均衡。 幂等性 enable.idempotence 被设置成 true 后, Producer 自动升级成幂等性, 在底层设计架构中引入了 ProducerID 和 SequenceNumber, 自动帮你做消息的重复去重。 事务 事务要求消息一起成功发送, 要么一起失败。简单来说,就像是个打包操作,把消息 Record1 和 Record2 看作一个整体,要么一起成功,要么一起失败。 ...

十二月 23, 2024 · 7 分钟 · 1464 字 · zhu733756

性能提升秘籍:Golang对象池与复用技术,GC不再是瓶颈!

前戏 小白: 老花, 我今天想了解下 golang 的对象复用是干啥用的? 老花: 对象复用, 顾名思义, 就是把对象进行复用, 而不是每次都重新创建一个对象, 这样可以减少内存的开销, 提高性能, 减少 GC 的次数。 对象复用如何使用? golang的对象复用主要是要sync.Pool实现, 它是并发安全的对象池。 首先, 声明对象池, 需要提供一个对象的构造函数 New: var pool = sync.Pool{ New: func() interface{} { return new(MyType) // MyType 是你想要复用的对象类型 }, } 当从对象池中获取对象时,如果池中没有可用对象,会调用 New 函数创建一个新的对象。 使用 Get 方法从对象池中获取对象,使用 Put 方法将对象放回对象池。 obj := pool.Get().(*MyType) pool.Put(obj) 不过项目中, 我们都会简单地进行封装使用: var dataPool sync.Pool type UserData struct { Data []byte Key string } func GetUserData() *UserData { si := dataPool.Get() if si == nil { return &UserData{} } return si.(*UserData) } func PutUserData(si *UserData) { if si == nil { return } dataPool.Put(si) } 这样, 我们只需要使用GetUserData 和 PutUserData 就可以了。 ...

十二月 20, 2024 · 5 分钟 · 924 字 · zhu733756

性能翻倍!Golang开发者必读的pprof全攻略

前戏 小白: golang 有哪些性能提升的技巧? 能分享一下吗? 老花: 当然可以, 我们先看下如何分析我们的程序有什么性能问题吧! pprof 简介 pprof 是一个用于分析 Go 程序性能的工具,它提供了丰富的性能分析功能,包括 CPU、内存、goroutine、锁、网络、HTTP、GC 等等。 pprof 是 Go 语言自带的性能分析工具,可以帮助我们监控和分析程序的性能。 它有两个包: runtime/pprof:适用于所有类型的代码,特别是非 Web 应用程序。 net/http/pprof:对 runtime/pprof 的简单封装,适合 Web 应用程序使用,通过 HTTP 端口暴露性能数据。 pprof 监控内容 pprof 可以监控以下内容: allocs:内存分配情况的采样信息。 blocks:阻塞操作情况的采样信息。 cmdline:显示程序启动命令及参数。 goroutine:当前所有协程的堆栈信息。 heap:堆上内存使用情况的采样信息。 mutex:锁争用情况的采样信息。 profile:CPU 占用情况的采样信息。 threadcreate:系统线程创建情况的采样信息。 如何使用 pprof 对于 golang 代码, 我们可以使用下面这个办法来引入在线 pprof 分析工具: import ( _ "net/http/pprof" "net/http" ) func main() { go func() { http.ListenAndServe("localhost:6060", nil) }() // 程序主逻辑 } 这样,你就可以通过 go tool pprof http://localhost:6060/debug/pprof/heap访问堆栈, 其中heap可以换成allocs、goroutines、threadcreate等。 ...

十二月 19, 2024 · 4 分钟 · 712 字 · zhu733756

聊聊MongoDB BSON数据格式

前戏 小白:老花,最近我在研究MongoDB,听说它使用BSON作为数据格式,这BSON到底是个啥玩意儿? 老花:哈哈,小白,BSON是Binary JSON的缩写,它是一种类JSON的二进制编码格式。BSON的设计目的, 是为了在MongoDB中存储和交换文档数据。它继承了JSON的灵活性和可读性,同时增加了一些额外的数据类型,比如日期、二进制数据和代码等。 小白:那为啥要用BSON这种数据存储格式呢? 老花:有几个原因。首先,BSON是二进制格式,这意味着它比JSON更紧凑,传输效率更高。其次,BSON支持更多的数据类型,这使得它能够存储更复杂的数据结构。再者,BSON是自描述的,每个字段都有类型信息,这使得解析BSON数据时更加方便。 小白:听起来挺酷的,那我们怎么解析BSON数据呢? BSON 数据格式 解析BSON数据其实很简单。 在 Go 语言中,MongoDB的官方驱动提供了很好的支持。你可以使用go.mongodb.org/mongo-driver/bson这个包来处理BSON数据。 比如,你可以这样解析一个BSON文档: var doc bson.M err := bson.Unmarshal(data, &doc) if err != nil { log.Fatal(err) } fmt.Println(doc) 这里的data是你从 MongoDB 获取的 BSON 格式的字节切片,bson.M是一个可以存储任何类型值的 map,非常适合用来解析BSON文档。 如何实现 BSON 模糊搜索? 小白:那如果我想对 BSON 数据进行模糊搜索,比如搜索名字中包含某个字符串的文档? 老花:这个也好办。在 MongoDB 中,你可以使用正则表达式来进行模糊搜索。在 Go 语言中,你可以这样构建查询: import "go.mongodb.org/mongo-driver/bson" // 假设我们要搜索名字中包含"haha"的文档 filter := bson.M{"name": bson.M{"$regex": "haha", "$options": "i"}} // "i"代表忽略大小写 // 然后使用这个 filter 来查询数据库 cursor, err := collection.Find(context.TODO(), filter) if err != nil { log.Fatal(err) } // 遍历结果 for cursor.Next(context.TODO()) { var result bson.M err := cursor.Decode(&result) if err != nil { log.Fatal(err) } fmt.Println(result) 这段代码会查询所有 name 字段中包含haha的文档,并且忽略大小写。 在 MongoDB 中,除了$regex 算子用于模糊搜索外,还有一些其他的算子可以用于不同的搜索场景: ...

十二月 13, 2024 · 5 分钟 · 935 字 · zhu733756

实战 | 用golang撸一个极简MongoDB测试客户端?

前戏 小白:你好,老花!最近有个刁民让我测试部署的分片集群是否可用, 除了上官方上下载mongosh, 还有其他思路? 老花:当然有的! 我们可以用golang直接撸一个! 我们先整理一个功能清单: mclient flags: --uri mongodb 连接地址 --user 测试创建随机用户,并返回用户和密码 --insert 测试写入数据, 打印写入的随机条数 --read 测试插入随机数据, 并读取改数据 --remove 测试插入随机数据, 并删除该数据, 返回删除条数 搭建golang环境 下载 Go 语言 SDK 工具包:访问 Go 官网下载适合Linux的安装包。 解压安装包到指定目录,例如/root/go。 配置环境变量: 使用root权限编辑/etc/profile文件。 添加export GOROOT=/root/go和export PATH=$PATH:$GOROOT/bin。 执行source /etc/profile刷新配置。 测试配置是否生效,运行go version。 配置 Go 语言开发工具 可以选择多种开发工具,如 Visual Studio Code、GoLand 等,并安装相应的 Go 语言插件以支持智能提示、编译运行等功能 。 配置国内代理 由于国内网络环境的特殊性,配置国内代理可以加速 Go 模块的下载。可以通过设置环境变量 GOPROXY 来实现: go env -w GOPROXY=https://goproxy.cn,direct go env -w GOPROXY=https://mirrors.aliyun.com/goproxy/ mclient 功能实现 连接数据库 client, err := mongo.NewClient(options.Client().ApplyURI(uri)) if err != nil { log.Fatalf("Failed to create MongoDB client: %v", err) } // 设置连接超时 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // 连接到MongoDB集群 err = client.Connect(ctx) if err != nil { log.Fatalf("Failed to connect to MongoDB cluster: %v", err) } defer client.Disconnect(ctx) // 确认连接成功,发送ping命令 err = client.Ping(ctx, readpref.Primary()) if err != nil { log.Fatalf("Failed to ping MongoDB cluster: %v", err) } fmt.Println("Connected to MongoDB cluster successfully!") 创建随机用户 func createRandomUser(client *mongo.Client) { collection := client.Database("admin").Collection("system.users") ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() rand.Seed(time.Now().UnixNano()) username := fmt.Sprintf("user%d", rand.Int()) password := fmt.Sprintf("password%d", rand.Int()) user := bson.M{"user": username, "pwd": password, "roles": []bson.M{{"role": "readWrite", "db": "testdb"}}} _, err := collection.InsertOne(ctx, user) if err != nil { log.Fatalf("Failed to create user: %v", err) } fmt.Printf("User created with username: %s and password: %s\n", username, password) } 插入随机数据 func insertRandomDocuments(client *mongo.Client, num int) { collection := client.Database("testdb").Collection("testdata") ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() var documents []interface{} for i := 0; i < num; i++ { documents = append(documents, bson.M{"name": fmt.Sprintf("hahahaa%d", i), "age": rand.Intn(100)}) } result, err := collection.InsertMany(ctx, documents) if err != nil { log.Fatalf("Failed to insert documents: %v", err) } fmt.Printf("Inserted %d documents with IDs: %v\n", len(result.InsertedIDs), result.InsertedIDs) } 读取和删除数据 func insertAndReadRandomDocuments(client *mongo.Client, num int) { collection := client.Database("testdb").Collection("testdata") ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() var documents []interface{} for i := 0; i < num; i++ { documents = append(documents, bson.M{"name": fmt.Sprintf("hahahaa%d", i), "age": rand.Intn(100)}) } result, err := collection.InsertMany(ctx, documents) if err != nil { log.Fatalf("Failed to insert documents: %v", err) } var results []bson.M cur, err := collection.Find(ctx, bson.M{}) if err != nil { log.Fatalf("Failed to read documents: %v", err) } defer cur.Close(ctx) for cur.Next(ctx) { var elem bson.M err := cur.Decode(&elem) if err != nil { log.Fatal(err) } results = append(results, elem) } if err := cur.Err(); err != nil { log.Fatal(err) } fmt.Println("Documents read from database:", results) } func insertAndRemoveRandomDocuments(client *mongo.Client, num int) { collection := client.Database("testdb").Collection("testdata") ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() var documents []interface{} for i := 0; i < num; i++ { documents = append(documents, bson.M{"name": fmt.Sprintf("hahahaa%d", i), "age": rand.Intn(100)}) } result, err := collection.InsertMany(ctx, documents) if err != nil { log.Fatalf("Failed to insert documents: %v", err) } filter := bson.M{"name": bson.M{"$in": []string{"hahahaa0"}}} deleteResult, err := collection.DeleteMany(ctx, filter) if err != nil { log.Fatalf("Failed to delete documents: %v", err) } fmt.Printf("Deleted %d documents\n", deleteResult.DeletedCount) } 完整代码 package main import ( "context" "flag" "fmt" "log" "math/rand" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/readpref" ) // 定义命令行参数 var uri string var createUserFlag bool var insertFlag bool var readFlag bool var removeFlag bool var numDocuments int func init() { // 默认URI设置为localhost,用户可以通过命令行参数--uri来覆盖 flag.StringVar(&uri, "uri", "mongodb://localhost:27017", "MongoDB connection URI") flag.BoolVar(&createUserFlag, "user", false, "Test creating a random user") flag.BoolVar(&insertFlag, "insert", false, "Test inserting random data") flag.BoolVar(&readFlag, "read", false, "Test inserting and reading random data") flag.BoolVar(&removeFlag, "remove", false, "Test inserting and removing random data") flag.IntVar(&numDocuments, "num", 1, "Number of random documents to insert") } func main() { flag.Parse() // 解析命令行参数 // 创建MongoDB客户端 client, err := mongo.NewClient(options.Client().ApplyURI(uri)) if err != nil { log.Fatalf("Failed to create MongoDB client: %v", err) } // 设置连接超时 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // 连接到MongoDB集群 err = client.Connect(ctx) if err != nil { log.Fatalf("Failed to connect to MongoDB cluster: %v", err) } defer client.Disconnect(ctx) // 确认连接成功,发送ping命令 err = client.Ping(ctx, readpref.Primary()) if err != nil { log.Fatalf("Failed to ping MongoDB cluster: %v", err) } fmt.Println("Connected to MongoDB cluster successfully!") if createUserFlag { createRandomUser(client) } if insertFlag { insertRandomDocuments(client, numDocuments) } if readFlag { insertAndReadRandomDocuments(client, numDocuments) } if removeFlag { insertAndRemoveRandomDocuments(client, numDocuments) } } func createRandomUser(client *mongo.Client) { collection := client.Database("admin").Collection("system.users") ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() rand.Seed(time.Now().UnixNano()) username := fmt.Sprintf("user%d", rand.Int()) password := fmt.Sprintf("password%d", rand.Int()) user := bson.M{"user": username, "pwd": password, "roles": []bson.M{{"role": "readWrite", "db": "testdb"}}} _, err := collection.InsertOne(ctx, user) if err != nil { log.Fatalf("Failed to create user: %v", err) } fmt.Printf("User created with username: %s and password: %s\n", username, password) } func insertRandomDocuments(client *mongo.Client, num int) { collection := client.Database("testdb").Collection("testdata") ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() var documents []interface{} for i := 0; i < num; i++ { documents = append(documents, bson.M{"name": fmt.Sprintf("hahahaa%d", i), "age": rand.Intn(100)}) } _, err := collection.InsertMany(ctx, documents) if err != nil { log.Fatalf("Failed to insert documents: %v", err) } fmt.Printf("Inserted %d documents with IDs: %v\n", len(result.InsertedIDs), result.InsertedIDs) } func insertAndReadRandomDocuments(client *mongo.Client, num int) { collection := client.Database("testdb").Collection("testdata") ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() var documents []interface{} for i := 0; i < num; i++ { documents = append(documents, bson.M{"name": fmt.Sprintf("hahahaa%d", i), "age": rand.Intn(100)}) } _, err := collection.InsertMany(ctx, documents) if err != nil { log.Fatalf("Failed to insert documents: %v", err) } var results []bson.M cur, err := collection.Find(ctx, bson.M{}) if err != nil { log.Fatalf("Failed to read documents: %v", err) } defer cur.Close(ctx) for cur.Next(ctx) { var elem bson.M err := cur.Decode(&elem) if err != nil { log.Fatal(err) } results = append(results, elem) } if err := cur.Err(); err != nil { log.Fatal(err) } fmt.Println("Documents read from database:", results) } func insertAndRemoveRandomDocuments(client *mongo.Client, num int) { collection := client.Database("testdb").Collection("testdata") ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() var documents []interface{} for i := 0; i < num; i++ { documents = append(documents, bson.M{"name": fmt.Sprintf("hahahaa%d", i), "age": rand.Intn(100)}) } _, err := collection.InsertMany(ctx, documents) if err != nil { log.Fatalf("Failed to insert documents: %v", err) } filter := bson.M{"name": bson.M{"$in": []string{"hahahaa0"}}} deleteResult, err := collection.DeleteMany(ctx, filter) if err != nil { log.Fatalf("Failed to delete documents: %v", err) } fmt.Printf("Deleted %d documents\n", deleteResult.DeletedCount) } 编译二进制 编译命令 编译单个文件:go build -o myapp myapp.go。 编译模块:go build -mod=readonly -o myapp ./...。 编译交叉平台:GOOS=windows GOARCH=amd64 go build -o myapp_windows_amd64 myapp.go。 编译优化 优化代码:避免不必要的包依赖,使用 Go 的性能分析工具(如 pprof),优化数据结构和算法。 优化编译选项:使用-gcflags="all=-N -l"禁用垃圾回收和内联优化,使用-buildmode=pie生成位置无关可执行文件。 优化构建工具:使用 Makefile 或构建系统(如 Bazel、Maven)自动化编译过程,使用并行编译提高编译速度。 测试效果 使用上一篇博客部署的集群测试, 拿到mongos的ip: 10.244.2.2: ...

十一月 29, 2024 · 7 分钟 · 1336 字 · zhu733756