在使用GO中的SQL数据库时,确保原子能和在多步骤交易期间管理回滚可能具有挑战性。在本文中,我将指导您创建一个可重复使用和可测试的框架,以使用gon中的SQL Transactions,使用仿制药进行灵活性。
为什么我们需要SQL交易的框架?
在现实世界应用程序中,很少隔离数据库操作。考虑以下方案:
与TXN管理一起工作。
如果您正在编写数据库TXN,则在编写核心逻辑之前,可能需要考虑几个锅炉板。尽管此TXN管理由Java的Spring Boot管理,而在Java编写代码时,您从来没有打扰过这些管理,但在Golang中并非如此。下面提供了一个简单的示例
func basictxn(db *sql.db)错误{
//开始交易
TX,err:= db.begin()
如果err!= nil {
返回错误
}
defer func(){
如果r:= recover(); r!= nil {
tx.rollback()
} else如果err!= nil {
tx.rollback()
} 别的 {
tx.commit()
}
}()
//将数据插入订单表
_,err = tx.exec(“插入订单(id,customer_name,order_date)值(1,'john doe','2022-01-01')”)”)”)
如果err!= nil {
返回错误
}
返回无
}
func testsqlwriteexec_createordertxn(t *testing.t){
db:= setupDatabase()
//创建一个新的SQL Write Executor
err:= dbutils.newsqltxnexec [orderRequest,orderProcessingResponse](context.todo(),db,nil和orderRequest {customername:“ customEra”:“ customId:productid:productid:1,tentity:10})。
statefulexec(insertorder)。
statefulexec(UpdateInventory)。
StateFulexec(插入)。
犯罪()
//检查交易是否成功完成
如果err!= nil {
T.Fatal(err)
返回
}
验证transactionsuccescessful(t,db)
T.Cleanup(
func(){
清理(DB)
db.close()
},,
)
}
func basicTxn(db *sql.DB) error { // start a transaction tx, err := db.Begin() if err != nil { return err } defer func() { if r := recover(); r != nil { tx.Rollback() } else if err != nil { tx.Rollback() } else { tx.Commit() } }() // insert data into the orders table _, err = tx.Exec("INSERT INTO orders (id, customer_name, order_date) VALUES (1, 'John Doe', '2022-01-01')") if err != nil { return err } return nil }
此代码将非常精确和简洁。
如何实现核心逻辑
func TestSqlWriteExec_CreateOrderTxn(t *testing.T) { db := setupDatabase() // create a new SQL Write Executor err := dbutils.NewSqlTxnExec[OrderRequest, OrderProcessingResponse](context.TODO(), db, nil, &OrderRequest{CustomerName: "CustomerA", ProductID: 1, Quantity: 10}). StatefulExec(InsertOrder). StatefulExec(UpdateInventory). StatefulExec(InsertShipment). Commit() // check if the transaction was committed successfully if err != nil { t.Fatal(err) return } verifyTransactionSuccessful(t, db) t.Cleanup( func() { cleanup(db) db.Close() }, ) }
func InsertOrder(ctx context.Context, txn *sql.Tx, order *OrderRequest, orderProcessing *OrderProcessingResponse) error { // Insert Order result, err := txn.Exec("INSERT INTO orders (customer_name, product_id, quantity) VALUES ($1, $2, $3)", order.CustomerName, order.ProductID, order.Quantity) if err != nil { return err } // Get the inserted Order ID orderProcessing.OrderID, err = result.LastInsertId() return err } func UpdateInventory(ctx context.Context, txn *sql.Tx, order *OrderRequest, orderProcessing *OrderProcessingResponse) error { // Update Inventory if it exists and the quantity is greater than the quantity check if it exists result, err := txn.Exec("UPDATE inventory SET product_quantity = product_quantity - $1 WHERE id = $2 AND product_quantity >= $1", order.Quantity, order.ProductID) if err != nil { return err } // Get the number of rows affected rowsAffected, err := result.RowsAffected() if rowsAffected == 0 { return errors.New("Insufficient inventory") } return err } func InsertShipment(ctx context.Context, txn *sql.Tx, order *OrderRequest, orderProcessing *OrderProcessingResponse) error { // Insert Shipment result, err := txn.Exec("INSERT INTO shipping_info (customer_name, shipping_address) VALUES ($1, 'Shipping Address')", order.CustomerName) if err != nil { return err } // Get the inserted Shipping ID orderProcessing.ShippingID, err = result.LastInsertId() return err }
这两种是函数类型,将在TXN中处理某些内容。现在,在数据层中实现了这样的函数,并将其传递给执行程序类,该类负责注入args并执行函数。
这是我们存储所有txn_fn详细信息的地方,我们将拥有commit()方法来尝试进行TXN。
type TxnFn[T any] func(ctx context.Context, txn *sql.Tx, processingReq *T) error type StatefulTxnFn[T any, R any] func(ctx context.Context, txn *sql.Tx, processingReq *T, processedRes *R) error
您可以在回购中找到更多示例和测试 -
https://github.com/mahadev-k/go-utils/tree/main/main/examples
// SQL Write Executor is responsible when executing write operations // For dependent writes you may need to add the dependent data to processReq and proceed to the next function call type SqlTxnExec[T any, R any] struct { db *sql.DB txn *sql.Tx txnFns []TxnFn[T] statefulTxnFns []StatefulTxnFn[T, R] processingReq *T processedRes *R ctx context.Context err error }
让我知道是否有人希望在此基础上做出贡献并建立!
感谢您阅读这篇文章!
https://in.linkedin.com/in/mahadev-k-934520223
https://x.com/mahadev_k_
func (s *SqlTxnExec[T, R]) Commit() (err error) { defer func() { if p := recover(); p != nil { s.txn.Rollback() panic(p) } else if err != nil { err = errors.Join(err, s.txn.Rollback()) } else { err = errors.Join(err, s.txn.Commit()) } return }() for _, writeFn := range s.txnFns { if err = writeFn(s.ctx, s.txn, s.processingReq); err != nil { return } } for _, statefulWriteFn := range s.statefulTxnFns { if err = statefulWriteFn(s.ctx, s.txn, s.processingReq, s.processedRes); err != nil { return } } return }
免责声明: 提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发到邮箱:[email protected] 我们会第一时间内为您处理。
Copyright© 2022 湘ICP备2022001581号-3