155 lines
3.9 KiB
Go
155 lines
3.9 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
)
|
|
|
|
const (
|
|
openAIHTTPRequestRetryMaxRetries = 3
|
|
openAIHTTPRequestRetryBaseDelay = 150 * time.Millisecond
|
|
)
|
|
|
|
type openAIUpstreamRequestBuilder func(context.Context) (*http.Request, error)
|
|
|
|
func (s *OpenAIGatewayService) doOpenAIUpstreamWithRequestRetry(
|
|
ctx context.Context,
|
|
c *gin.Context,
|
|
account *Account,
|
|
proxyURL string,
|
|
passthrough bool,
|
|
buildReq openAIUpstreamRequestBuilder,
|
|
) (*http.Response, error) {
|
|
if buildReq == nil {
|
|
return nil, errors.New("missing upstream request builder")
|
|
}
|
|
if account == nil {
|
|
return nil, errors.New("missing account")
|
|
}
|
|
|
|
attempts := openAIHTTPRequestRetryMaxRetries + 1
|
|
var lastErr error
|
|
startedAt := time.Now()
|
|
for attempt := 1; attempt <= attempts; attempt++ {
|
|
upstreamCtx, releaseUpstreamCtx := detachUpstreamContext(ctx)
|
|
req, err := buildReq(upstreamCtx)
|
|
releaseUpstreamCtx()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := s.httpUpstream.Do(req, proxyURL, account.ID, account.Concurrency)
|
|
SetOpsLatencyMs(c, OpsUpstreamLatencyMsKey, time.Since(startedAt).Milliseconds())
|
|
if err == nil {
|
|
return resp, nil
|
|
}
|
|
|
|
lastErr = err
|
|
s.recordOpenAIHTTPRequestErrorAttempt(c, account, passthrough, attempt, attempts, err)
|
|
if !isRetryableOpenAIHTTPRequestError(err) || attempt >= attempts {
|
|
break
|
|
}
|
|
time.Sleep(openAIHTTPRequestRetryDelay(attempt))
|
|
}
|
|
|
|
if isRetryableOpenAIHTTPRequestError(lastErr) {
|
|
return nil, newOpenAIHTTPRequestFailoverError(lastErr)
|
|
}
|
|
return nil, fmt.Errorf("upstream request failed: %s", sanitizeUpstreamErrorMessage(lastErr.Error()))
|
|
}
|
|
|
|
func (s *OpenAIGatewayService) recordOpenAIHTTPRequestErrorAttempt(c *gin.Context, account *Account, passthrough bool, attempt, attempts int, err error) {
|
|
if c == nil || err == nil {
|
|
return
|
|
}
|
|
safeErr := sanitizeUpstreamErrorMessage(err.Error())
|
|
setOpsUpstreamError(c, 0, safeErr, "")
|
|
kind := "request_error"
|
|
if attempt >= attempts {
|
|
kind = "request_error:retry_exhausted"
|
|
}
|
|
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
|
Platform: account.Platform,
|
|
AccountID: account.ID,
|
|
AccountName: account.Name,
|
|
UpstreamStatusCode: 0,
|
|
Passthrough: passthrough,
|
|
Kind: kind,
|
|
Message: safeErr,
|
|
Detail: fmt.Sprintf("attempt=%d max_retries=%d", attempt, openAIHTTPRequestRetryMaxRetries),
|
|
})
|
|
}
|
|
|
|
func openAIHTTPRequestRetryDelay(attempt int) time.Duration {
|
|
if attempt <= 0 {
|
|
return openAIHTTPRequestRetryBaseDelay
|
|
}
|
|
delay := openAIHTTPRequestRetryBaseDelay << (attempt - 1)
|
|
if delay > time.Second {
|
|
return time.Second
|
|
}
|
|
return delay
|
|
}
|
|
|
|
func isRetryableOpenAIHTTPRequestError(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
if errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) {
|
|
return true
|
|
}
|
|
if errors.Is(err, syscall.ECONNRESET) || errors.Is(err, syscall.ECONNREFUSED) || errors.Is(err, syscall.ETIMEDOUT) {
|
|
return true
|
|
}
|
|
var netErr net.Error
|
|
if errors.As(err, &netErr) && netErr.Timeout() {
|
|
return true
|
|
}
|
|
msg := strings.ToLower(err.Error())
|
|
retryableMarkers := []string{
|
|
"connection reset by peer",
|
|
"connection refused",
|
|
"unexpected eof",
|
|
"server closed idle connection",
|
|
"use of closed network connection",
|
|
"broken pipe",
|
|
"connection aborted",
|
|
"tls: bad record mac",
|
|
"tls: use of closed connection",
|
|
"http2: client connection lost",
|
|
}
|
|
for _, marker := range retryableMarkers {
|
|
if strings.Contains(msg, marker) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func newOpenAIHTTPRequestFailoverError(err error) *UpstreamFailoverError {
|
|
message := "Upstream request failed"
|
|
if err != nil {
|
|
message = sanitizeUpstreamErrorMessage(err.Error())
|
|
}
|
|
body, _ := json.Marshal(gin.H{
|
|
"error": gin.H{
|
|
"type": "upstream_error",
|
|
"message": message,
|
|
},
|
|
})
|
|
return &UpstreamFailoverError{
|
|
StatusCode: http.StatusBadGateway,
|
|
ResponseBody: body,
|
|
}
|
|
}
|