742 lines
25 KiB
Go
742 lines
25 KiB
Go
package service
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
mathrand "math/rand"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
kiropkg "github.com/Wei-Shaw/sub2api/internal/pkg/kiro"
|
|
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/tidwall/gjson"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// kiroEndpointConfig describes one Kiro upstream endpoint the gateway may call.
// URL is the full HTTPS address requests are POSTed to, AmzTarget is handed to
// newKiroJSONRequest (presumably as the X-Amz-Target header — TODO confirm) and
// Name is a human-readable label such as "AmazonQ".
type kiroEndpointConfig struct {
	URL, AmzTarget, Name string
}
|
|
|
|
// kiroInvalidModelTempUnschedDuration is how long an OAuth account stays
// rate-limited after Kiro rejects the mapped model as invalid.
const kiroInvalidModelTempUnschedDuration = 60 * time.Second
|
|
|
|
const (
|
|
kiroRetryBaseDelay = 200 * time.Millisecond
|
|
kiroRetryMaxDelay = 2 * time.Second
|
|
)
|
|
|
|
// kiroRetrySleep performs the backoff wait between Kiro retry attempts (see
// sleepKiroRetry). It defaults to sleepWithContext and is a package variable,
// presumably so tests can swap in a non-blocking sleep — TODO confirm.
var kiroRetrySleep = sleepWithContext
|
|
|
|
// kiroRetryBackoffDelay returns the wait before retry number attempt (0-based):
// kiroRetryBaseDelay doubled once per attempt, capped at kiroRetryMaxDelay,
// plus up to 25% random jitter. Negative attempts are treated as 0.
//
// The delay is doubled iteratively and stops growing once it reaches the cap.
// This keeps arbitrarily large attempt values from overflowing time.Duration:
// a direct 1<<attempt shift wraps around for attempt >= 36 with a 200ms base,
// which would yield a zero or negative delay.
func kiroRetryBackoffDelay(attempt int) time.Duration {
	if attempt < 0 {
		attempt = 0
	}
	delay := kiroRetryBaseDelay
	for i := 0; i < attempt && delay < kiroRetryMaxDelay; i++ {
		delay *= 2
	}
	if delay > kiroRetryMaxDelay {
		delay = kiroRetryMaxDelay
	}
	jitterMax := delay / 4
	if jitterMax <= 0 {
		return delay
	}
	return delay + time.Duration(mathrand.Int63n(int64(jitterMax)+1))
}
|
|
|
|
// sleepKiroRetry blocks for the backoff delay of the given retry attempt using
// kiroRetrySleep and returns whatever error that sleep reports.
func sleepKiroRetry(ctx context.Context, attempt int) error {
	wait := kiroRetryBackoffDelay(attempt)
	return kiroRetrySleep(ctx, wait)
}
|
|
|
|
// forwardKiroMessages forwards an Anthropic Messages request to the Kiro
// upstream on behalf of account and writes the client response through c.
//
// The request model is remapped with account.GetMappedModel before sending.
// Requests selected for web-search emulation are delegated to
// handleWebSearchEmulation. Streaming requests are proxied as Anthropic SSE.
// Non-streaming requests are decoded from Kiro's event stream into one JSON
// body. Non-streaming, web-search-only requests first try
// executeKiroWebSearch and fall back to the regular upstream call when it
// reports errKiroWebSearchFallback.
//
// When the caller should retry on another account, an *UpstreamFailoverError
// is returned and no response body is written to c.
func (s *GatewayService) forwardKiroMessages(ctx context.Context, c *gin.Context, account *Account, parsed *ParsedRequest, startTime time.Time) (*ForwardResult, error) {
	if account == nil || parsed == nil {
		return nil, fmt.Errorf("kiro forward: missing account or request")
	}

	originalModel := parsed.Model
	mappedModel := originalModel
	if next := account.GetMappedModel(originalModel); next != "" {
		mappedModel = next
	}
	body := parsed.Body
	if mappedModel != originalModel {
		body = s.replaceModelInBody(body, mappedModel)
	}
	logger.L().Debug("gateway forward_kiro_messages: request prepared",
		zap.Int64("account_id", account.ID),
		zap.String("auth_method", strings.TrimSpace(account.GetCredential("auth_method"))),
		zap.String("requested_model", originalModel),
		zap.String("mapped_model", mappedModel),
		zap.Bool("has_profile_arn", strings.TrimSpace(account.GetCredential("profile_arn")) != ""),
	)

	if s.shouldEmulateWebSearch(ctx, account, parsed.GroupID, body) {
		parsedForEmulation := *parsed
		parsedForEmulation.Body = body
		return s.handleWebSearchEmulation(ctx, c, account, &parsedForEmulation)
	}

	// Model name reported for pricing; the same for every response path below.
	upstreamModel := normalizeModelNameForPricing(kiropkg.MapModel(mappedModel))

	if parsed.Stream {
		resp, _, err := s.openKiroAnthropicStreamResponse(ctx, account, body, mappedModel, originalModel, c.Request.Header)
		if err != nil {
			return nil, respondKiroForwardRequestError(c, account, err)
		}
		defer func() { _ = resp.Body.Close() }()
		if resp.StatusCode >= 400 {
			return nil, s.handleKiroHTTPError(ctx, resp, c, account, mappedModel, body)
		}
		streamResult, err := s.handleStreamingResponse(ctx, resp, c, account, startTime, originalModel, mappedModel, false)
		if err != nil {
			return nil, err
		}
		usage := ClaudeUsage{}
		if streamResult.usage != nil {
			usage = *streamResult.usage
		}
		return &ForwardResult{
			RequestID:        resp.Header.Get("x-request-id"),
			Usage:            usage,
			Model:            originalModel,
			UpstreamModel:    upstreamModel,
			Stream:           true,
			Duration:         time.Since(startTime),
			FirstTokenMs:     streamResult.firstTokenMs,
			ClientDisconnect: streamResult.clientDisconnect,
		}, nil
	}

	token, tokenType, err := s.GetAccessToken(ctx, account)
	if err != nil {
		return nil, err
	}
	if tokenType != "oauth" {
		return nil, fmt.Errorf("kiro requires oauth token, got %s", tokenType)
	}

	if isOnlyWebSearchToolInBody(body) {
		webSearchResult, webSearchErr := s.executeKiroWebSearch(ctx, account, body, mappedModel, originalModel, token, c.Request.Header)
		switch {
		case errors.Is(webSearchErr, errKiroWebSearchFallback):
			// Fall through to the regular upstream call below.
		case webSearchErr == nil:
			c.Header("Content-Type", "application/json")
			if webSearchResult.RequestID != "" {
				c.Header("x-request-id", webSearchResult.RequestID)
			}
			c.Data(http.StatusOK, "application/json", webSearchResult.ResponseBody)
			return &ForwardResult{
				RequestID:     webSearchResult.RequestID,
				Usage:         webSearchResult.Usage,
				Model:         originalModel,
				UpstreamModel: upstreamModel,
				Stream:        false,
				Duration:      time.Since(startTime),
			}, nil
		default:
			var httpErr *kiroWebSearchHTTPError
			if errors.As(webSearchErr, &httpErr) && httpErr.Response != nil {
				// handleKiroHTTPError only reads the body; release it here since
				// nothing else in this path closes it.
				if httpErr.Response.Body != nil {
					defer func() { _ = httpErr.Response.Body.Close() }()
				}
				return nil, s.handleKiroHTTPError(ctx, httpErr.Response, c, account, mappedModel, body)
			}
			return nil, respondKiroForwardRequestError(c, account, webSearchErr)
		}
	}

	inputTokens := estimateKiroInputTokens(body)
	resp, requestCtx, err := s.executeKiroUpstream(ctx, account, body, mappedModel, originalModel, token, c.Request.Header)
	if err != nil {
		return nil, respondKiroForwardRequestError(c, account, err)
	}
	defer func() { _ = resp.Body.Close() }()
	if resp.StatusCode >= 400 {
		return nil, s.handleKiroHTTPError(ctx, resp, c, account, mappedModel, body)
	}

	parseResult, err := kiropkg.ParseNonStreamingEventStreamWithContext(resp.Body, mappedModel, requestCtx)
	if err != nil {
		c.JSON(http.StatusBadGateway, gin.H{
			"type": "error",
			"error": gin.H{
				"type":    "upstream_error",
				"message": "Failed to parse Kiro upstream response",
			},
		})
		return nil, err
	}

	requestID := resp.Header.Get("x-request-id")
	c.Header("Content-Type", "application/json")
	if requestID != "" {
		c.Header("x-request-id", requestID)
	}
	c.Data(http.StatusOK, "application/json", parseResult.ResponseBody)

	return &ForwardResult{
		RequestID:     requestID,
		Usage:         kiroUsageToClaude(parseResult.Usage, inputTokens),
		Model:         originalModel,
		UpstreamModel: upstreamModel,
		Stream:        false,
		Duration:      time.Since(startTime),
	}, nil
}

// respondKiroForwardRequestError handles a Kiro upstream call that failed
// before an HTTP response was available, identically for every forwarding path.
//
// An *UpstreamFailoverError is recorded as a "failover" ops event and returned
// unchanged so the caller can retry elsewhere. Any other error is recorded as
// an ops upstream error plus a "request_error" event, answered with a generic
// 502, and returned with a sanitized message.
func respondKiroForwardRequestError(c *gin.Context, account *Account, err error) error {
	safeErr := sanitizeUpstreamErrorMessage(err.Error())

	var failoverErr *UpstreamFailoverError
	if errors.As(err, &failoverErr) {
		appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
			Platform:           account.Platform,
			AccountID:          account.ID,
			AccountName:        account.Name,
			UpstreamStatusCode: failoverErr.StatusCode,
			Kind:               "failover",
			Message:            safeErr,
		})
		return failoverErr
	}

	setOpsUpstreamError(c, 0, safeErr, "")
	appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
		Platform:           account.Platform,
		AccountID:          account.ID,
		AccountName:        account.Name,
		UpstreamStatusCode: 0,
		Kind:               "request_error",
		Message:            safeErr,
	})
	c.JSON(http.StatusBadGateway, gin.H{
		"type": "error",
		"error": gin.H{
			"type":    "upstream_error",
			"message": "Upstream request failed",
		},
	})
	return fmt.Errorf("kiro upstream request failed: %s", safeErr)
}
|
|
|
|
func (s *GatewayService) openKiroAnthropicStreamResponse(ctx context.Context, account *Account, anthropicBody []byte, mappedModel, requestModel string, headers http.Header) (*http.Response, int, error) {
|
|
token, tokenType, err := s.GetAccessToken(ctx, account)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
if tokenType != "oauth" {
|
|
return nil, 0, fmt.Errorf("kiro requires oauth token, got %s", tokenType)
|
|
}
|
|
|
|
inputTokens := estimateKiroInputTokens(anthropicBody)
|
|
if isOnlyWebSearchToolInBody(anthropicBody) {
|
|
pr, pw := io.Pipe()
|
|
headers := make(http.Header)
|
|
headers.Set("Content-Type", "text/event-stream")
|
|
go func() {
|
|
streamErr := s.streamKiroWebSearchAsAnthropic(ctx, account, anthropicBody, mappedModel, requestModel, token, inputTokens, headers, pw)
|
|
if streamErr != nil {
|
|
_ = pw.CloseWithError(streamErr)
|
|
return
|
|
}
|
|
_ = pw.Close()
|
|
}()
|
|
return &http.Response{
|
|
StatusCode: http.StatusOK,
|
|
Header: headers,
|
|
Body: pr,
|
|
}, inputTokens, nil
|
|
}
|
|
|
|
resp, requestCtx, err := s.executeKiroUpstream(ctx, account, anthropicBody, mappedModel, requestModel, token, headers)
|
|
if err != nil {
|
|
var failoverErr *UpstreamFailoverError
|
|
if errors.As(err, &failoverErr) {
|
|
return nil, inputTokens, err
|
|
}
|
|
return nil, inputTokens, err
|
|
}
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
return resp, inputTokens, nil
|
|
}
|
|
|
|
pr, pw := io.Pipe()
|
|
wrappedHeaders := resp.Header.Clone()
|
|
wrappedHeaders.Set("Content-Type", "text/event-stream")
|
|
if requestID := buildKiroRequestID(resp); requestID != "" {
|
|
wrappedHeaders.Set("x-request-id", requestID)
|
|
}
|
|
|
|
go func() {
|
|
defer func() { _ = resp.Body.Close() }()
|
|
_, streamErr := kiropkg.StreamEventStreamAsAnthropicWithContext(ctx, resp.Body, pw, mappedModel, inputTokens, requestCtx)
|
|
if streamErr != nil {
|
|
_ = pw.CloseWithError(streamErr)
|
|
return
|
|
}
|
|
_ = pw.Close()
|
|
}()
|
|
|
|
return &http.Response{
|
|
StatusCode: resp.StatusCode,
|
|
Header: wrappedHeaders,
|
|
Body: pr,
|
|
}, inputTokens, nil
|
|
}
|
|
|
|
// executeKiroUpstream sends the Anthropic request body to Kiro for account and
// returns the first response worth handing back to the caller.
//
// It first waits out (or fails over on) an active per-account cooldown. The
// Kiro payload is built once and rebuilt only after a successful token refresh.
// Each endpoint gets up to maxRetries+1 attempts, with jittered backoff
// between them.
//
// Per-attempt status handling:
//   - transport error: retried on the same endpoint, then returned;
//   - 429: the account is marked; the next endpoint is tried, or on the last
//     endpoint the response is returned with an "x-kiro-cooldown" header;
//   - 408/5xx: retried, then the next endpoint is tried, then the response is
//     returned as-is;
//   - 402: returned as *UpstreamFailoverError; monthly-quota exhaustion also
//     rate-limits the account until the next UTC month;
//   - 401/403: suspended accounts are marked; token errors trigger a forced
//     refresh and retry; auth errors make the account temporarily
//     unschedulable;
//   - 400: classified and logged;
//   - 2xx: success is recorded for the account.
//
// Error bodies that were consumed here are buffered (at most 2 MiB) and put
// back as re-readable in-memory bodies. Responses that are discarded for a
// retry are drained before closing so the transport can reuse the connection.
// The returned KiroRequestContext matches the payload that produced the
// response.
func (s *GatewayService) executeKiroUpstream(ctx context.Context, account *Account, anthropicBody []byte, mappedModel, requestModel, token string, headers http.Header) (*http.Response, kiropkg.KiroRequestContext, error) {
	var requestCtx kiropkg.KiroRequestContext
	if err := s.checkAndWaitKiroCooldown(ctx, buildKiroAccountKey(account)); err != nil {
		if failoverErr := asKiroCooldownFailoverError(err); failoverErr != nil {
			return nil, requestCtx, failoverErr
		}
		return nil, requestCtx, err
	}

	modelID := kiropkg.MapModel(mappedModel)
	currentToken := token
	buildResult, err := buildKiroPayloadForAccountWithRepo(ctx, s.accountRepo, account, anthropicBody, modelID, currentToken, requestModel, headers)
	if err != nil {
		return nil, requestCtx, err
	}
	payload := buildResult.Payload
	requestCtx = buildResult.Context

	endpoints := buildKiroEndpoints(account)
	proxyURL := kiroProxyURL(account)
	tlsProfile := s.tlsFPProfileService.ResolveTLSProfile(account)
	accountKey := buildKiroAccountKey(account)
	const maxRetries = 2

	for idx, endpoint := range endpoints {
		hasNextEndpoint := idx+1 < len(endpoints)
		for attempt := 0; attempt <= maxRetries; attempt++ {
			req, err := newKiroJSONRequest(ctx, endpoint.URL, payload, currentToken, accountKey, buildKiroMachineID(account), endpoint.AmzTarget, account)
			if err != nil {
				return nil, requestCtx, err
			}

			resp, err := s.httpUpstream.DoWithTLS(req, proxyURL, account.ID, account.Concurrency, tlsProfile)
			if err != nil {
				if attempt < maxRetries {
					if sleepErr := sleepKiroRetry(ctx, attempt); sleepErr != nil {
						return nil, requestCtx, sleepErr
					}
					continue
				}
				return nil, requestCtx, err
			}

			if resp.StatusCode == http.StatusTooManyRequests {
				cooldown, markErr := s.markKiro429(ctx, accountKey)
				if markErr != nil {
					discardKiroResponse(resp)
					return nil, requestCtx, markErr
				}
				if hasNextEndpoint {
					discardKiroResponse(resp)
					if sleepErr := sleepKiroRetry(ctx, attempt); sleepErr != nil {
						return nil, requestCtx, sleepErr
					}
					break // move on to the next endpoint
				}
				resp.Header.Set("x-kiro-cooldown", cooldown.String())
				return resp, requestCtx, nil
			}

			if resp.StatusCode == http.StatusRequestTimeout || (resp.StatusCode >= 500 && resp.StatusCode < 600) {
				if attempt < maxRetries {
					discardKiroResponse(resp)
					if sleepErr := sleepKiroRetry(ctx, attempt); sleepErr != nil {
						return nil, requestCtx, sleepErr
					}
					continue
				}
				if hasNextEndpoint {
					discardKiroResponse(resp)
					if sleepErr := sleepKiroRetry(ctx, attempt); sleepErr != nil {
						return nil, requestCtx, sleepErr
					}
					break // move on to the next endpoint
				}
				return resp, requestCtx, nil
			}

			if resp.StatusCode == http.StatusPaymentRequired {
				respBody, readErr := readAndCloseKiroResponseBody(resp)
				if readErr != nil {
					return nil, requestCtx, readErr
				}
				if classifyKiroHTTPError(resp.StatusCode, string(respBody)).Category == kiroErrorMonthlyRequest {
					s.markKiroMonthlyRequestCountRateLimited(ctx, account, string(respBody))
				}
				return nil, requestCtx, &UpstreamFailoverError{
					StatusCode:      resp.StatusCode,
					ResponseBody:    respBody,
					ResponseHeaders: resp.Header.Clone(),
				}
			}

			if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
				respBody, readErr := readAndCloseKiroResponseBody(resp)
				if readErr != nil {
					return nil, requestCtx, readErr
				}

				if resp.StatusCode == http.StatusForbidden && isKiroSuspendedBody(respBody) {
					if _, markErr := s.markKiroSuspended(ctx, accountKey); markErr != nil {
						return nil, requestCtx, markErr
					}
					resetHTTPResponseBody(resp, respBody)
					return resp, requestCtx, nil
				}

				if s.kiroTokenProvider != nil && (resp.StatusCode == http.StatusUnauthorized || isKiroTokenErrorBody(respBody)) && attempt < maxRetries {
					refreshedToken, refreshErr := s.kiroTokenProvider.ForceRefreshAccessToken(ctx, account)
					if refreshErr == nil && strings.TrimSpace(refreshedToken) != "" {
						// The payload and account key may depend on the token, so
						// rebuild both before retrying.
						currentToken = refreshedToken
						accountKey = buildKiroAccountKey(account)
						rebuilt, buildErr := buildKiroPayloadForAccountWithRepo(ctx, s.accountRepo, account, anthropicBody, modelID, currentToken, requestModel, headers)
						if buildErr != nil {
							return nil, requestCtx, buildErr
						}
						payload = rebuilt.Payload
						requestCtx = rebuilt.Context
						if sleepErr := sleepKiroRetry(ctx, attempt); sleepErr != nil {
							return nil, requestCtx, sleepErr
						}
						continue
					}
					if refreshErr != nil && isNonRetryableRefreshError(refreshErr) {
						resetHTTPResponseBody(resp, respBody)
						return resp, requestCtx, nil
					}
				}

				if classifyKiroHTTPError(resp.StatusCode, string(respBody)).Category == kiroErrorAuthError {
					s.markKiroAuthTemporarilyUnavailable(ctx, account, resp.StatusCode, string(respBody))
				}
				resetHTTPResponseBody(resp, respBody)
				return resp, requestCtx, nil
			}

			if resp.StatusCode == http.StatusBadRequest {
				respBody, readErr := readAndCloseKiroResponseBody(resp)
				if readErr != nil {
					return nil, requestCtx, readErr
				}
				classification := classifyKiroHTTPError(resp.StatusCode, string(respBody))
				logKiroBadRequestClassification(classification, account, mappedModel, resp.Header, respBody)
				resetHTTPResponseBody(resp, respBody)
				return resp, requestCtx, nil
			}

			if resp.StatusCode >= 200 && resp.StatusCode < 300 {
				if markErr := s.markKiroSuccess(ctx, accountKey); markErr != nil {
					_ = resp.Body.Close()
					return nil, requestCtx, markErr
				}
			}
			return resp, requestCtx, nil
		}
	}
	return nil, requestCtx, fmt.Errorf("kiro upstream endpoints exhausted")
}

// readAndCloseKiroResponseBody buffers at most 2 MiB of resp's body (the same
// cap handleKiroHTTPError reads) and then closes it.
func readAndCloseKiroResponseBody(resp *http.Response) ([]byte, error) {
	const limit = 2 << 20
	defer func() { _ = resp.Body.Close() }()
	return io.ReadAll(io.LimitReader(resp.Body, limit))
}

// discardKiroResponse drains a bounded amount of resp's body and closes it, so
// the HTTP transport can reuse the connection for the retry that follows.
func discardKiroResponse(resp *http.Response) {
	const limit = 64 << 10
	_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, limit))
	_ = resp.Body.Close()
}
|
|
|
|
// buildKiroEndpoints lists, in the order they should be tried, the Kiro
// endpoints for account. Only the regional Amazon Q generateAssistantResponse
// endpoint is configured, with an empty AmzTarget.
func buildKiroEndpoints(account *Account) []kiroEndpointConfig {
	amazonQ := kiroEndpointConfig{Name: "AmazonQ"}
	amazonQ.URL = fmt.Sprintf("https://q.%s.amazonaws.com/generateAssistantResponse", kiroAPIRegion(account))
	return []kiroEndpointConfig{amazonQ}
}
|
|
|
|
// buildKiroPayloadForAccount is a narrow wrapper around
// buildKiroPayloadForAccountWithRepo that passes no repository and returns only
// the serialized payload, dropping the build context.
func buildKiroPayloadForAccount(ctx context.Context, account *Account, anthropicBody []byte, modelID, token, requestModel string, headers http.Header) ([]byte, error) { //nolint:unused // exercised by package tests as a narrow payload helper.
	built, err := buildKiroPayloadForAccountWithRepo(ctx, nil, account, anthropicBody, modelID, token, requestModel, headers)
	if err == nil {
		return built.Payload, nil
	}
	return nil, err
}
|
|
|
|
func buildKiroPayloadForAccountWithRepo(ctx context.Context, repo AccountRepository, account *Account, anthropicBody []byte, modelID, token, requestModel string, headers http.Header) (*kiropkg.KiroBuildResult, error) {
|
|
profileArn := resolveKiroPayloadProfileArn(account)
|
|
anthropicBody = prepareKiroPayloadBodyForRequestModel(anthropicBody, requestModel)
|
|
return kiropkg.BuildKiroPayloadWithContext(anthropicBody, modelID, profileArn, "AI_EDITOR", headers)
|
|
}
|
|
|
|
// prepareKiroPayloadBodyForRequestModel restores a "thinking" request model in
// the body. When requestModel (trimmed) mentions "thinking" case-insensitively
// and the body's "model" is non-empty, differs from it (case-insensitively) and
// does not itself mention "thinking", the body's "model" is replaced with the
// trimmed requestModel. In every other case, or when the JSON update fails, the
// body is returned unchanged.
func prepareKiroPayloadBodyForRequestModel(anthropicBody []byte, requestModel string) []byte {
	mentionsThinking := func(model string) bool {
		return strings.Contains(strings.ToLower(model), "thinking")
	}

	wanted := strings.TrimSpace(requestModel)
	if !mentionsThinking(wanted) {
		return anthropicBody
	}

	current := strings.TrimSpace(gjson.GetBytes(anthropicBody, "model").String())
	switch {
	case current == "", strings.EqualFold(current, wanted), mentionsThinking(current):
		return anthropicBody
	}

	updated, ok := setJSONValueBytes(anthropicBody, "model", wanted)
	if !ok {
		return anthropicBody
	}
	return updated
}
|
|
|
|
func (s *GatewayService) markKiroAuthTemporarilyUnavailable(ctx context.Context, account *Account, statusCode int, body string) {
|
|
if s == nil || s.accountRepo == nil || account == nil {
|
|
return
|
|
}
|
|
until := time.Now().Add(10 * time.Minute)
|
|
reason := fmt.Sprintf("kiro auth failure (%d): %s", statusCode, strings.TrimSpace(body))
|
|
_ = s.accountRepo.SetTempUnschedulable(ctx, account.ID, until, reason)
|
|
}
|
|
|
|
func (s *GatewayService) markKiroMonthlyRequestCountRateLimited(ctx context.Context, account *Account, body string) {
|
|
if s == nil || s.accountRepo == nil || account == nil {
|
|
return
|
|
}
|
|
resetAt := nextKiroMonthlyResetUTC(time.Now())
|
|
if err := s.accountRepo.SetRateLimited(ctx, account.ID, resetAt); err != nil {
|
|
logger.L().Warn("kiro monthly request count rate-limit failed",
|
|
zap.Int64("account_id", account.ID),
|
|
zap.Time("reset_at", resetAt),
|
|
zap.Error(err),
|
|
)
|
|
return
|
|
}
|
|
reason := "kiro monthly request count exhausted (402): MONTHLY_REQUEST_COUNT"
|
|
if trimmed := strings.TrimSpace(body); trimmed != "" {
|
|
reason = fmt.Sprintf("%s body=%s", reason, truncateForLog([]byte(trimmed), 512))
|
|
}
|
|
logger.L().Warn("kiro monthly request count rate-limited",
|
|
zap.Int64("account_id", account.ID),
|
|
zap.Time("reset_at", resetAt),
|
|
zap.String("reason", reason),
|
|
)
|
|
}
|
|
|
|
// nextKiroMonthlyResetUTC returns midnight UTC on the first day of the month
// after the UTC month that contains now (December rolls over into January of
// the next year).
func nextKiroMonthlyResetUTC(now time.Time) time.Time {
	inUTC := now.UTC()
	monthStart := time.Date(inUTC.Year(), inUTC.Month(), 1, 0, 0, 0, 0, time.UTC)
	return monthStart.AddDate(0, 1, 0)
}
|
|
|
|
// resetHTTPResponseBody replaces resp's body with an in-memory reader over body
// (whose Close is a no-op) and updates ContentLength to match. A nil resp is
// ignored.
func resetHTTPResponseBody(resp *http.Response, body []byte) {
	if resp != nil {
		replacement := bytes.NewReader(body)
		resp.Body = io.NopCloser(replacement)
		resp.ContentLength = int64(len(body))
	}
}
|
|
|
|
func estimateKiroInputTokens(body []byte) int {
|
|
if len(body) == 0 {
|
|
return 0
|
|
}
|
|
if tokens := gjson.GetBytes(body, "metadata.input_tokens").Int(); tokens > 0 {
|
|
return int(tokens)
|
|
}
|
|
tokens := len(body) / 4
|
|
if tokens == 0 {
|
|
return 1
|
|
}
|
|
return tokens
|
|
}
|
|
|
|
func kiroUsageToClaude(usage kiropkg.Usage, fallbackInput int) ClaudeUsage {
|
|
inputTokens := usage.InputTokens
|
|
if inputTokens == 0 {
|
|
inputTokens = fallbackInput
|
|
}
|
|
return ClaudeUsage{
|
|
InputTokens: inputTokens,
|
|
OutputTokens: usage.OutputTokens,
|
|
CacheReadInputTokens: usage.CacheReadInputTokens,
|
|
CacheCreationInputTokens: usage.CacheCreationInputTokens,
|
|
}
|
|
}
|
|
|
|
// markKiroInvalidModelRateLimited rate-limits an OAuth account for
// kiroInvalidModelTempUnschedDuration after Kiro rejected the mapped model.
// It does nothing for non-OAuth accounts or when no repository is configured.
// A repository failure is logged rather than returned.
func (s *GatewayService) markKiroInvalidModelRateLimited(ctx context.Context, account *Account, mappedModel string) {
	eligible := s != nil && s.accountRepo != nil && account != nil && account.Type == AccountTypeOAuth
	if !eligible {
		return
	}

	model := strings.TrimSpace(mappedModel)
	resetAt := time.Now().Add(kiroInvalidModelTempUnschedDuration)
	if err := s.accountRepo.SetRateLimited(ctx, account.ID, resetAt); err != nil {
		logger.L().Warn("kiro invalid model rate-limit failed",
			zap.Int64("account_id", account.ID),
			zap.String("mapped_model", model),
			zap.Time("reset_at", resetAt),
			zap.Error(err),
		)
		return
	}
	logger.L().Warn("kiro invalid model rate-limited",
		zap.Int64("account_id", account.ID),
		zap.String("mapped_model", model),
		zap.Time("reset_at", resetAt),
	)
}
|
|
|
|
// handleKiroHTTPError turns a non-success Kiro response into the gateway's
// error handling. It reads at most 2 MiB of resp.Body and does not close it;
// the caller owns the body.
//
// Outcomes:
//   - an invalid-model 400 on an OAuth account: the account is rate-limited
//     for kiroInvalidModelTempUnschedDuration and an *UpstreamFailoverError is
//     returned;
//   - 402, or a status accepted by shouldFailoverUpstreamError: rate-limit
//     bookkeeping runs and an *UpstreamFailoverError is returned;
//   - anything else: an Anthropic-style error JSON with the mapped status is
//     written to c and a plain error is returned.
//
// Monthly-quota exhaustion additionally rate-limits the account until the next
// UTC month. The message exposed to clients and ops events is always passed
// through sanitizeUpstreamErrorMessage, including when it falls back to the raw
// body.
func (s *GatewayService) handleKiroHTTPError(ctx context.Context, resp *http.Response, c *gin.Context, account *Account, mappedModel string, requestBody []byte) error {
	// Best-effort read: a short or failed read still yields a usable message.
	respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
	upstreamMsg := sanitizeUpstreamErrorMessage(strings.TrimSpace(extractUpstreamErrorMessage(respBody)))
	if upstreamMsg == "" {
		// Fall back to the raw body, but never skip sanitization: this text is
		// returned to the client and recorded in ops events.
		upstreamMsg = sanitizeUpstreamErrorMessage(strings.TrimSpace(string(respBody)))
	}

	classification := classifyKiroHTTPError(resp.StatusCode, string(respBody))
	if resp.StatusCode == http.StatusBadRequest {
		logKiroBadRequestClassification(classification, account, mappedModel, resp.Header, respBody)
	}
	if classification.Category == kiroErrorMonthlyRequest {
		s.markKiroMonthlyRequestCountRateLimited(ctx, account, string(respBody))
	}
	if classification.Category == kiroErrorBadRequestInvalidModel && account != nil && account.Type == AccountTypeOAuth {
		s.markKiroInvalidModelRateLimited(ctx, account, mappedModel)
		appendOpsUpstreamError(c, s.buildKiroInvalidModelUpstreamEvent(account, resp, upstreamMsg, mappedModel, requestBody, c))
		return &UpstreamFailoverError{
			StatusCode:      resp.StatusCode,
			ResponseBody:    respBody,
			ResponseHeaders: resp.Header.Clone(),
		}
	}

	if resp.StatusCode == http.StatusPaymentRequired || s.shouldFailoverUpstreamError(resp.StatusCode) {
		appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
			Platform:           account.Platform,
			AccountID:          account.ID,
			AccountName:        account.Name,
			UpstreamStatusCode: resp.StatusCode,
			UpstreamRequestID:  resp.Header.Get("x-request-id"),
			Kind:               "failover",
			Message:            upstreamMsg,
		})
		if s.rateLimitService != nil {
			s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
		}
		return &UpstreamFailoverError{
			StatusCode:      resp.StatusCode,
			ResponseBody:    respBody,
			ResponseHeaders: resp.Header.Clone(),
		}
	}

	setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, "")
	appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
		Platform:           account.Platform,
		AccountID:          account.ID,
		AccountName:        account.Name,
		UpstreamStatusCode: resp.StatusCode,
		UpstreamRequestID:  resp.Header.Get("x-request-id"),
		Kind:               "http_error",
		Message:            upstreamMsg,
	})
	c.JSON(mapUpstreamStatusCode(resp.StatusCode), gin.H{
		"type": "error",
		"error": gin.H{
			"type":    "upstream_error",
			"message": coalesceKiroErrorMessage(resp.StatusCode, upstreamMsg),
		},
	})
	return fmt.Errorf("kiro upstream error: %d %s", resp.StatusCode, upstreamMsg)
}
|
|
|
|
func (s *GatewayService) buildKiroInvalidModelUpstreamEvent(account *Account, resp *http.Response, upstreamMsg, mappedModel string, requestBody []byte, c *gin.Context) OpsUpstreamErrorEvent {
|
|
_ = s
|
|
requestedModel := strings.TrimSpace(gjson.GetBytes(requestBody, "model").String())
|
|
hasTools := gjson.GetBytes(requestBody, "tools").Exists()
|
|
hasAdaptiveThinking := strings.EqualFold(strings.TrimSpace(gjson.GetBytes(requestBody, "thinking.type").String()), "adaptive")
|
|
hasContext1MBeta := false
|
|
if c != nil {
|
|
hasContext1MBeta = strings.Contains(c.GetHeader("Anthropic-Beta"), "context-1m")
|
|
}
|
|
return OpsUpstreamErrorEvent{
|
|
Platform: account.Platform,
|
|
AccountID: account.ID,
|
|
AccountName: account.Name,
|
|
UpstreamStatusCode: resp.StatusCode,
|
|
UpstreamRequestID: resp.Header.Get("x-request-id"),
|
|
Kind: "failover",
|
|
Message: upstreamMsg,
|
|
RequestedModel: requestedModel,
|
|
MappedModel: strings.TrimSpace(mappedModel),
|
|
KiroModelID: kiropkg.MapModel(mappedModel),
|
|
HasTools: hasTools,
|
|
HasAdaptiveThinking: hasAdaptiveThinking,
|
|
HasContext1MBeta: hasContext1MBeta,
|
|
}
|
|
}
|
|
|
|
func logKiroBadRequestClassification(classification kiroErrorClassification, account *Account, model string, headers http.Header, body []byte) {
|
|
if classification.StatusCode != http.StatusBadRequest {
|
|
return
|
|
}
|
|
var accountID int64
|
|
if account != nil {
|
|
accountID = account.ID
|
|
}
|
|
logger.L().Warn("kiro upstream bad request classified",
|
|
zap.String("category", classification.Category),
|
|
zap.Int("status", classification.StatusCode),
|
|
zap.Int64("account_id", accountID),
|
|
zap.String("model", strings.TrimSpace(model)),
|
|
zap.String("request_id", headers.Get("x-request-id")),
|
|
zap.String("body_excerpt", truncateForLog(body, 512)),
|
|
)
|
|
}
|
|
|
|
// coalesceKiroErrorMessage returns upstreamMsg when it is non-empty. Otherwise
// it returns a generic client-facing message chosen by statusCode: 429, 403 and
// 401 get specific texts, and every other status falls back to
// "Upstream request failed".
func coalesceKiroErrorMessage(statusCode int, upstreamMsg string) string {
	if upstreamMsg != "" {
		return upstreamMsg
	}
	message := "Upstream request failed"
	switch statusCode {
	case http.StatusTooManyRequests:
		message = "Rate limit exceeded"
	case http.StatusForbidden:
		message = "Access denied"
	case http.StatusUnauthorized:
		message = "Authentication failed"
	}
	return message
}
|