Merge pull request #2572 from wucm667/fix/openai-silent-refusal-failover
fix(openai): 识别上游静默拒绝(空流+finish_reason=stop)并触发 failover
This commit is contained in:
commit
66fca3d598
@ -1325,6 +1325,11 @@ func (h *GatewayHandler) handleConcurrencyError(c *gin.Context, err error, slotT
|
|||||||
func (h *GatewayHandler) handleFailoverExhausted(c *gin.Context, failoverErr *service.UpstreamFailoverError, platform string, streamStarted bool) {
|
func (h *GatewayHandler) handleFailoverExhausted(c *gin.Context, failoverErr *service.UpstreamFailoverError, platform string, streamStarted bool) {
|
||||||
statusCode := failoverErr.StatusCode
|
statusCode := failoverErr.StatusCode
|
||||||
responseBody := failoverErr.ResponseBody
|
responseBody := failoverErr.ResponseBody
|
||||||
|
if service.IsOpenAISilentRefusalErrorBody(responseBody) {
|
||||||
|
service.SetOpsUpstreamError(c, statusCode, service.OpenAISilentRefusalClientMessage(), "")
|
||||||
|
h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", service.OpenAISilentRefusalClientMessage(), streamStarted)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// 先检查透传规则
|
// 先检查透传规则
|
||||||
if h.errorPassthroughService != nil && len(responseBody) > 0 {
|
if h.errorPassthroughService != nil && len(responseBody) > 0 {
|
||||||
|
|||||||
@ -335,5 +335,10 @@ func (h *GatewayHandler) handleCCFailoverExhausted(c *gin.Context, lastErr *serv
|
|||||||
if lastErr != nil && lastErr.StatusCode > 0 {
|
if lastErr != nil && lastErr.StatusCode > 0 {
|
||||||
statusCode = lastErr.StatusCode
|
statusCode = lastErr.StatusCode
|
||||||
}
|
}
|
||||||
|
if lastErr != nil && service.IsOpenAISilentRefusalErrorBody(lastErr.ResponseBody) {
|
||||||
|
service.SetOpsUpstreamError(c, statusCode, service.OpenAISilentRefusalClientMessage(), "")
|
||||||
|
h.chatCompletionsErrorResponse(c, http.StatusBadGateway, "upstream_error", service.OpenAISilentRefusalClientMessage())
|
||||||
|
return
|
||||||
|
}
|
||||||
h.chatCompletionsErrorResponse(c, statusCode, "server_error", "All available accounts exhausted")
|
h.chatCompletionsErrorResponse(c, statusCode, "server_error", "All available accounts exhausted")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -310,5 +310,10 @@ func (h *GatewayHandler) handleResponsesFailoverExhausted(c *gin.Context, lastEr
|
|||||||
if lastErr != nil && lastErr.StatusCode > 0 {
|
if lastErr != nil && lastErr.StatusCode > 0 {
|
||||||
statusCode = lastErr.StatusCode
|
statusCode = lastErr.StatusCode
|
||||||
}
|
}
|
||||||
|
if lastErr != nil && service.IsOpenAISilentRefusalErrorBody(lastErr.ResponseBody) {
|
||||||
|
service.SetOpsUpstreamError(c, statusCode, service.OpenAISilentRefusalClientMessage(), "")
|
||||||
|
h.responsesErrorResponse(c, http.StatusBadGateway, "upstream_error", service.OpenAISilentRefusalClientMessage())
|
||||||
|
return
|
||||||
|
}
|
||||||
h.responsesErrorResponse(c, statusCode, "server_error", "All available accounts exhausted")
|
h.responsesErrorResponse(c, statusCode, "server_error", "All available accounts exhausted")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -178,6 +178,7 @@ func (h *OpenAIGatewayHandler) ChatCompletions(c *gin.Context) {
|
|||||||
if channelMapping.Mapped {
|
if channelMapping.Mapped {
|
||||||
forwardBody = h.gatewayService.ReplaceModelInBody(body, channelMapping.MappedModel)
|
forwardBody = h.gatewayService.ReplaceModelInBody(body, channelMapping.MappedModel)
|
||||||
}
|
}
|
||||||
|
writerSizeBeforeForward := c.Writer.Size()
|
||||||
result, err := h.gatewayService.ForwardAsChatCompletions(c.Request.Context(), c, account, forwardBody, promptCacheKey, "")
|
result, err := h.gatewayService.ForwardAsChatCompletions(c.Request.Context(), c, account, forwardBody, promptCacheKey, "")
|
||||||
|
|
||||||
forwardDurationMs := time.Since(forwardStart).Milliseconds()
|
forwardDurationMs := time.Since(forwardStart).Milliseconds()
|
||||||
@ -203,6 +204,10 @@ func (h *OpenAIGatewayHandler) ChatCompletions(c *gin.Context) {
|
|||||||
} else {
|
} else {
|
||||||
var failoverErr *service.UpstreamFailoverError
|
var failoverErr *service.UpstreamFailoverError
|
||||||
if errors.As(err, &failoverErr) {
|
if errors.As(err, &failoverErr) {
|
||||||
|
if c.Writer.Size() != writerSizeBeforeForward {
|
||||||
|
h.handleFailoverExhausted(c, failoverErr, true)
|
||||||
|
return
|
||||||
|
}
|
||||||
h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, false, nil)
|
h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, false, nil)
|
||||||
// Pool mode: retry on the same account
|
// Pool mode: retry on the same account
|
||||||
if failoverErr.RetryableOnSameAccount {
|
if failoverErr.RetryableOnSameAccount {
|
||||||
|
|||||||
@ -332,6 +332,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
|
|||||||
if channelMapping.Mapped {
|
if channelMapping.Mapped {
|
||||||
forwardBody = h.gatewayService.ReplaceModelInBody(body, channelMapping.MappedModel)
|
forwardBody = h.gatewayService.ReplaceModelInBody(body, channelMapping.MappedModel)
|
||||||
}
|
}
|
||||||
|
writerSizeBeforeForward := c.Writer.Size()
|
||||||
result, err := h.gatewayService.Forward(c.Request.Context(), c, account, forwardBody)
|
result, err := h.gatewayService.Forward(c.Request.Context(), c, account, forwardBody)
|
||||||
forwardDurationMs := time.Since(forwardStart).Milliseconds()
|
forwardDurationMs := time.Since(forwardStart).Milliseconds()
|
||||||
if accountReleaseFunc != nil {
|
if accountReleaseFunc != nil {
|
||||||
@ -356,6 +357,10 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
|
|||||||
} else {
|
} else {
|
||||||
var failoverErr *service.UpstreamFailoverError
|
var failoverErr *service.UpstreamFailoverError
|
||||||
if errors.As(err, &failoverErr) {
|
if errors.As(err, &failoverErr) {
|
||||||
|
if c.Writer.Size() != writerSizeBeforeForward {
|
||||||
|
h.handleFailoverExhausted(c, failoverErr, true)
|
||||||
|
return
|
||||||
|
}
|
||||||
h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, false, nil)
|
h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, false, nil)
|
||||||
// 池模式:同账号重试
|
// 池模式:同账号重试
|
||||||
if failoverErr.RetryableOnSameAccount {
|
if failoverErr.RetryableOnSameAccount {
|
||||||
@ -1604,6 +1609,11 @@ func (h *OpenAIGatewayHandler) handleConcurrencyError(c *gin.Context, err error,
|
|||||||
func (h *OpenAIGatewayHandler) handleFailoverExhausted(c *gin.Context, failoverErr *service.UpstreamFailoverError, streamStarted bool) {
|
func (h *OpenAIGatewayHandler) handleFailoverExhausted(c *gin.Context, failoverErr *service.UpstreamFailoverError, streamStarted bool) {
|
||||||
statusCode := failoverErr.StatusCode
|
statusCode := failoverErr.StatusCode
|
||||||
responseBody := failoverErr.ResponseBody
|
responseBody := failoverErr.ResponseBody
|
||||||
|
if service.IsOpenAISilentRefusalErrorBody(responseBody) {
|
||||||
|
service.SetOpsUpstreamError(c, statusCode, service.OpenAISilentRefusalClientMessage(), "")
|
||||||
|
h.handleStreamingAwareError(c, http.StatusBadGateway, "upstream_error", service.OpenAISilentRefusalClientMessage(), streamStarted)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// 先检查透传规则
|
// 先检查透传规则
|
||||||
if h.errorPassthroughService != nil && len(responseBody) > 0 {
|
if h.errorPassthroughService != nil && len(responseBody) > 0 {
|
||||||
|
|||||||
@ -292,7 +292,7 @@ func (s *OpenAIGatewayService) ForwardAsChatCompletions(
|
|||||||
var result *OpenAIForwardResult
|
var result *OpenAIForwardResult
|
||||||
var handleErr error
|
var handleErr error
|
||||||
if clientStream {
|
if clientStream {
|
||||||
result, handleErr = s.handleChatStreamingResponse(resp, c, originalModel, billingModel, upstreamModel, includeUsage, startTime)
|
result, handleErr = s.handleChatStreamingResponse(resp, c, account, originalModel, billingModel, upstreamModel, includeUsage, startTime, len(body))
|
||||||
} else {
|
} else {
|
||||||
result, handleErr = s.handleChatBufferedStreamingResponse(resp, c, originalModel, billingModel, upstreamModel, startTime)
|
result, handleErr = s.handleChatBufferedStreamingResponse(resp, c, originalModel, billingModel, upstreamModel, startTime)
|
||||||
}
|
}
|
||||||
@ -414,22 +414,31 @@ func (s *OpenAIGatewayService) handleChatBufferedStreamingResponse(
|
|||||||
func (s *OpenAIGatewayService) handleChatStreamingResponse(
|
func (s *OpenAIGatewayService) handleChatStreamingResponse(
|
||||||
resp *http.Response,
|
resp *http.Response,
|
||||||
c *gin.Context,
|
c *gin.Context,
|
||||||
|
account *Account,
|
||||||
originalModel string,
|
originalModel string,
|
||||||
billingModel string,
|
billingModel string,
|
||||||
upstreamModel string,
|
upstreamModel string,
|
||||||
includeUsage bool,
|
includeUsage bool,
|
||||||
startTime time.Time,
|
startTime time.Time,
|
||||||
|
requestBodyLen int,
|
||||||
) (*OpenAIForwardResult, error) {
|
) (*OpenAIForwardResult, error) {
|
||||||
requestID := resp.Header.Get("x-request-id")
|
requestID := resp.Header.Get("x-request-id")
|
||||||
|
|
||||||
if s.responseHeaderFilter != nil {
|
headersWritten := false
|
||||||
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
|
writeStreamHeaders := func() {
|
||||||
|
if headersWritten {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
headersWritten = true
|
||||||
|
if s.responseHeaderFilter != nil {
|
||||||
|
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
|
||||||
|
}
|
||||||
|
c.Writer.Header().Set("Content-Type", "text/event-stream")
|
||||||
|
c.Writer.Header().Set("Cache-Control", "no-cache")
|
||||||
|
c.Writer.Header().Set("Connection", "keep-alive")
|
||||||
|
c.Writer.Header().Set("X-Accel-Buffering", "no")
|
||||||
|
c.Writer.WriteHeader(http.StatusOK)
|
||||||
}
|
}
|
||||||
c.Writer.Header().Set("Content-Type", "text/event-stream")
|
|
||||||
c.Writer.Header().Set("Cache-Control", "no-cache")
|
|
||||||
c.Writer.Header().Set("Connection", "keep-alive")
|
|
||||||
c.Writer.Header().Set("X-Accel-Buffering", "no")
|
|
||||||
c.Writer.WriteHeader(http.StatusOK)
|
|
||||||
|
|
||||||
state := apicompat.NewResponsesEventToChatState()
|
state := apicompat.NewResponsesEventToChatState()
|
||||||
state.Model = originalModel
|
state.Model = originalModel
|
||||||
@ -439,6 +448,9 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse(
|
|||||||
var firstTokenMs *int
|
var firstTokenMs *int
|
||||||
firstChunk := true
|
firstChunk := true
|
||||||
clientDisconnected := false
|
clientDisconnected := false
|
||||||
|
clientOutputStarted := false
|
||||||
|
pendingSSE := make([]string, 0, 4)
|
||||||
|
refusalDetector := newOpenAIChatSilentRefusalDetector(requestBodyLen)
|
||||||
|
|
||||||
scanner := bufio.NewScanner(resp.Body)
|
scanner := bufio.NewScanner(resp.Body)
|
||||||
maxLineSize := defaultMaxLineSize
|
maxLineSize := defaultMaxLineSize
|
||||||
@ -489,6 +501,7 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse(
|
|||||||
)
|
)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
refusalDetector.ObservePayload([]byte(payload))
|
||||||
|
|
||||||
// 仅按兼容转换器支持的终止事件提取 usage,避免无意扩大事件语义。
|
// 仅按兼容转换器支持的终止事件提取 usage,避免无意扩大事件语义。
|
||||||
isTerminalEvent := isOpenAICompatResponsesTerminalEvent(event.Type)
|
isTerminalEvent := isOpenAICompatResponsesTerminalEvent(event.Type)
|
||||||
@ -499,6 +512,7 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse(
|
|||||||
chunks := apicompat.ResponsesEventToChatChunks(&event, state)
|
chunks := apicompat.ResponsesEventToChatChunks(&event, state)
|
||||||
if !clientDisconnected {
|
if !clientDisconnected {
|
||||||
for _, chunk := range chunks {
|
for _, chunk := range chunks {
|
||||||
|
refusalDetector.ObserveChatChunk(chunk)
|
||||||
sse, err := apicompat.ChatChunkToSSE(chunk)
|
sse, err := apicompat.ChatChunkToSSE(chunk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.L().Warn("openai chat_completions stream: failed to marshal chunk",
|
logger.L().Warn("openai chat_completions stream: failed to marshal chunk",
|
||||||
@ -507,6 +521,27 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse(
|
|||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if !clientOutputStarted && !refusalDetector.ShouldReleaseClientOutput() {
|
||||||
|
pendingSSE = append(pendingSSE, sse)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !clientOutputStarted {
|
||||||
|
writeStreamHeaders()
|
||||||
|
for _, pending := range pendingSSE {
|
||||||
|
if _, err := fmt.Fprint(c.Writer, pending); err != nil {
|
||||||
|
clientDisconnected = true
|
||||||
|
logger.L().Info("openai chat_completions stream: client disconnected while flushing pending chunks",
|
||||||
|
zap.String("request_id", requestID),
|
||||||
|
)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pendingSSE = pendingSSE[:0]
|
||||||
|
clientOutputStarted = !clientDisconnected
|
||||||
|
if clientDisconnected {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
if _, err := fmt.Fprint(c.Writer, sse); err != nil {
|
if _, err := fmt.Fprint(c.Writer, sse); err != nil {
|
||||||
clientDisconnected = true
|
clientDisconnected = true
|
||||||
logger.L().Info("openai chat_completions stream: client disconnected, continuing to drain upstream for billing",
|
logger.L().Info("openai chat_completions stream: client disconnected, continuing to drain upstream for billing",
|
||||||
@ -516,7 +551,7 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(chunks) > 0 && !clientDisconnected {
|
if len(chunks) > 0 && !clientDisconnected && clientOutputStarted {
|
||||||
c.Writer.Flush()
|
c.Writer.Flush()
|
||||||
}
|
}
|
||||||
return isTerminalEvent
|
return isTerminalEvent
|
||||||
@ -525,10 +560,32 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse(
|
|||||||
finalizeStream := func() (*OpenAIForwardResult, error) {
|
finalizeStream := func() (*OpenAIForwardResult, error) {
|
||||||
if finalChunks := apicompat.FinalizeResponsesChatStream(state); len(finalChunks) > 0 && !clientDisconnected {
|
if finalChunks := apicompat.FinalizeResponsesChatStream(state); len(finalChunks) > 0 && !clientDisconnected {
|
||||||
for _, chunk := range finalChunks {
|
for _, chunk := range finalChunks {
|
||||||
|
refusalDetector.ObserveChatChunk(chunk)
|
||||||
sse, err := apicompat.ChatChunkToSSE(chunk)
|
sse, err := apicompat.ChatChunkToSSE(chunk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if !clientOutputStarted && !refusalDetector.ShouldReleaseClientOutput() {
|
||||||
|
pendingSSE = append(pendingSSE, sse)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !clientOutputStarted {
|
||||||
|
writeStreamHeaders()
|
||||||
|
for _, pending := range pendingSSE {
|
||||||
|
if _, err := fmt.Fprint(c.Writer, pending); err != nil {
|
||||||
|
clientDisconnected = true
|
||||||
|
logger.L().Info("openai chat_completions stream: client disconnected during pending final flush",
|
||||||
|
zap.String("request_id", requestID),
|
||||||
|
)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pendingSSE = pendingSSE[:0]
|
||||||
|
clientOutputStarted = !clientDisconnected
|
||||||
|
if clientDisconnected {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
if _, err := fmt.Fprint(c.Writer, sse); err != nil {
|
if _, err := fmt.Fprint(c.Writer, sse); err != nil {
|
||||||
clientDisconnected = true
|
clientDisconnected = true
|
||||||
logger.L().Info("openai chat_completions stream: client disconnected during final flush",
|
logger.L().Info("openai chat_completions stream: client disconnected during final flush",
|
||||||
@ -538,14 +595,35 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if !clientDisconnected && !clientOutputStarted {
|
||||||
|
if refusalDetector.IsSilentRefusal() {
|
||||||
|
return nil, newOpenAISilentRefusalFailoverError(c, account, requestID)
|
||||||
|
}
|
||||||
|
if len(pendingSSE) > 0 {
|
||||||
|
writeStreamHeaders()
|
||||||
|
for _, pending := range pendingSSE {
|
||||||
|
if _, err := fmt.Fprint(c.Writer, pending); err != nil {
|
||||||
|
clientDisconnected = true
|
||||||
|
logger.L().Info("openai chat_completions stream: client disconnected during final pending flush",
|
||||||
|
zap.String("request_id", requestID),
|
||||||
|
)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pendingSSE = pendingSSE[:0]
|
||||||
|
clientOutputStarted = !clientDisconnected
|
||||||
|
}
|
||||||
|
}
|
||||||
// Send [DONE] sentinel
|
// Send [DONE] sentinel
|
||||||
if !clientDisconnected {
|
if !clientDisconnected {
|
||||||
|
writeStreamHeaders()
|
||||||
if _, err := fmt.Fprint(c.Writer, "data: [DONE]\n\n"); err != nil {
|
if _, err := fmt.Fprint(c.Writer, "data: [DONE]\n\n"); err != nil {
|
||||||
clientDisconnected = true
|
clientDisconnected = true
|
||||||
logger.L().Info("openai chat_completions stream: client disconnected during done flush",
|
logger.L().Info("openai chat_completions stream: client disconnected during done flush",
|
||||||
zap.String("request_id", requestID),
|
zap.String("request_id", requestID),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
clientOutputStarted = !clientDisconnected
|
||||||
}
|
}
|
||||||
if !clientDisconnected {
|
if !clientDisconnected {
|
||||||
c.Writer.Flush()
|
c.Writer.Flush()
|
||||||
@ -702,10 +780,14 @@ func (s *OpenAIGatewayService) handleChatStreamingResponse(
|
|||||||
if clientDisconnected {
|
if clientDisconnected {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if refusalDetector.Enabled() && !clientOutputStarted {
|
||||||
|
continue
|
||||||
|
}
|
||||||
if time.Since(lastDataAt) < keepaliveInterval {
|
if time.Since(lastDataAt) < keepaliveInterval {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Send SSE comment as keepalive
|
// Send SSE comment as keepalive
|
||||||
|
writeStreamHeaders()
|
||||||
if _, err := fmt.Fprint(c.Writer, ":\n\n"); err != nil {
|
if _, err := fmt.Fprint(c.Writer, ":\n\n"); err != nil {
|
||||||
logger.L().Info("openai chat_completions stream: client disconnected during keepalive",
|
logger.L().Info("openai chat_completions stream: client disconnected during keepalive",
|
||||||
zap.String("request_id", requestID),
|
zap.String("request_id", requestID),
|
||||||
|
|||||||
@ -220,7 +220,7 @@ func (s *OpenAIGatewayService) forwardAsRawChatCompletions(
|
|||||||
|
|
||||||
// 8. Forward response
|
// 8. Forward response
|
||||||
if clientStream {
|
if clientStream {
|
||||||
return s.streamRawChatCompletions(c, resp, originalModel, billingModel, upstreamModel, reasoningEffort, serviceTier, startTime)
|
return s.streamRawChatCompletions(c, resp, account, originalModel, billingModel, upstreamModel, reasoningEffort, serviceTier, startTime, len(body))
|
||||||
}
|
}
|
||||||
return s.bufferRawChatCompletions(c, resp, originalModel, billingModel, upstreamModel, reasoningEffort, serviceTier, startTime)
|
return s.bufferRawChatCompletions(c, resp, originalModel, billingModel, upstreamModel, reasoningEffort, serviceTier, startTime)
|
||||||
}
|
}
|
||||||
@ -234,23 +234,32 @@ func (s *OpenAIGatewayService) forwardAsRawChatCompletions(
|
|||||||
func (s *OpenAIGatewayService) streamRawChatCompletions(
|
func (s *OpenAIGatewayService) streamRawChatCompletions(
|
||||||
c *gin.Context,
|
c *gin.Context,
|
||||||
resp *http.Response,
|
resp *http.Response,
|
||||||
|
account *Account,
|
||||||
originalModel string,
|
originalModel string,
|
||||||
billingModel string,
|
billingModel string,
|
||||||
upstreamModel string,
|
upstreamModel string,
|
||||||
reasoningEffort *string,
|
reasoningEffort *string,
|
||||||
serviceTier *string,
|
serviceTier *string,
|
||||||
startTime time.Time,
|
startTime time.Time,
|
||||||
|
requestBodyLen int,
|
||||||
) (*OpenAIForwardResult, error) {
|
) (*OpenAIForwardResult, error) {
|
||||||
requestID := resp.Header.Get("x-request-id")
|
requestID := resp.Header.Get("x-request-id")
|
||||||
|
|
||||||
if s.responseHeaderFilter != nil {
|
headersWritten := false
|
||||||
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
|
writeStreamHeaders := func() {
|
||||||
|
if headersWritten {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
headersWritten = true
|
||||||
|
if s.responseHeaderFilter != nil {
|
||||||
|
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
|
||||||
|
}
|
||||||
|
c.Writer.Header().Set("Content-Type", "text/event-stream")
|
||||||
|
c.Writer.Header().Set("Cache-Control", "no-cache")
|
||||||
|
c.Writer.Header().Set("Connection", "keep-alive")
|
||||||
|
c.Writer.Header().Set("X-Accel-Buffering", "no")
|
||||||
|
c.Writer.WriteHeader(http.StatusOK)
|
||||||
}
|
}
|
||||||
c.Writer.Header().Set("Content-Type", "text/event-stream")
|
|
||||||
c.Writer.Header().Set("Cache-Control", "no-cache")
|
|
||||||
c.Writer.Header().Set("Connection", "keep-alive")
|
|
||||||
c.Writer.Header().Set("X-Accel-Buffering", "no")
|
|
||||||
c.Writer.WriteHeader(http.StatusOK)
|
|
||||||
|
|
||||||
scanner := bufio.NewScanner(resp.Body)
|
scanner := bufio.NewScanner(resp.Body)
|
||||||
maxLineSize := defaultMaxLineSize
|
maxLineSize := defaultMaxLineSize
|
||||||
@ -262,9 +271,45 @@ func (s *OpenAIGatewayService) streamRawChatCompletions(
|
|||||||
var usage OpenAIUsage
|
var usage OpenAIUsage
|
||||||
var firstTokenMs *int
|
var firstTokenMs *int
|
||||||
clientDisconnected := false
|
clientDisconnected := false
|
||||||
|
clientOutputStarted := false
|
||||||
|
pendingLines := make([]string, 0, 8)
|
||||||
|
refusalDetector := newOpenAIChatSilentRefusalDetector(requestBodyLen)
|
||||||
|
|
||||||
|
writeLine := func(line string) {
|
||||||
|
if clientDisconnected {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !clientOutputStarted && !refusalDetector.ShouldReleaseClientOutput() {
|
||||||
|
pendingLines = append(pendingLines, line)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !clientOutputStarted {
|
||||||
|
writeStreamHeaders()
|
||||||
|
for _, pending := range pendingLines {
|
||||||
|
if _, werr := c.Writer.WriteString(pending + "\n"); werr != nil {
|
||||||
|
clientDisconnected = true
|
||||||
|
logger.L().Debug("openai chat_completions raw: client disconnected, continuing to drain upstream for billing",
|
||||||
|
zap.Error(werr),
|
||||||
|
zap.String("request_id", requestID),
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pendingLines = pendingLines[:0]
|
||||||
|
clientOutputStarted = true
|
||||||
|
}
|
||||||
|
if _, werr := c.Writer.WriteString(line + "\n"); werr != nil {
|
||||||
|
clientDisconnected = true
|
||||||
|
logger.L().Debug("openai chat_completions raw: client disconnected, continuing to drain upstream for billing",
|
||||||
|
zap.Error(werr),
|
||||||
|
zap.String("request_id", requestID),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
line := scanner.Text()
|
line := scanner.Text()
|
||||||
|
refusalDetector.ObserveSSELine(line)
|
||||||
if payload, ok := extractOpenAISSEDataLine(line); ok {
|
if payload, ok := extractOpenAISSEDataLine(line); ok {
|
||||||
trimmedPayload := strings.TrimSpace(payload)
|
trimmedPayload := strings.TrimSpace(payload)
|
||||||
if trimmedPayload != "[DONE]" {
|
if trimmedPayload != "[DONE]" {
|
||||||
@ -279,22 +324,14 @@ func (s *OpenAIGatewayService) streamRawChatCompletions(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !clientDisconnected {
|
writeLine(line)
|
||||||
if _, werr := c.Writer.WriteString(line + "\n"); werr != nil {
|
|
||||||
clientDisconnected = true
|
|
||||||
logger.L().Debug("openai chat_completions raw: client disconnected, continuing to drain upstream for billing",
|
|
||||||
zap.Error(werr),
|
|
||||||
zap.String("request_id", requestID),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if line == "" {
|
if line == "" {
|
||||||
if !clientDisconnected {
|
if !clientDisconnected && clientOutputStarted {
|
||||||
c.Writer.Flush()
|
c.Writer.Flush()
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !clientDisconnected {
|
if !clientDisconnected && clientOutputStarted {
|
||||||
c.Writer.Flush()
|
c.Writer.Flush()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -306,6 +343,27 @@ func (s *OpenAIGatewayService) streamRawChatCompletions(
|
|||||||
zap.String("request_id", requestID),
|
zap.String("request_id", requestID),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
} else if !clientDisconnected && !clientOutputStarted {
|
||||||
|
if refusalDetector.IsSilentRefusal() {
|
||||||
|
return nil, newOpenAISilentRefusalFailoverError(c, account, requestID)
|
||||||
|
}
|
||||||
|
if len(pendingLines) > 0 {
|
||||||
|
writeStreamHeaders()
|
||||||
|
for _, pending := range pendingLines {
|
||||||
|
if _, werr := c.Writer.WriteString(pending + "\n"); werr != nil {
|
||||||
|
clientDisconnected = true
|
||||||
|
logger.L().Debug("openai chat_completions raw: client disconnected during final flush",
|
||||||
|
zap.Error(werr),
|
||||||
|
zap.String("request_id", requestID),
|
||||||
|
)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !clientDisconnected {
|
||||||
|
c.Writer.Flush()
|
||||||
|
clientOutputStarted = true
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &OpenAIForwardResult{
|
return &OpenAIForwardResult{
|
||||||
|
|||||||
@ -5,6 +5,7 @@ package service
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
@ -120,6 +121,157 @@ func TestForwardAsRawChatCompletions_ForcesStreamUsageUpstreamAndPassesUsageDown
|
|||||||
require.Contains(t, rec.Body.String(), "data: [DONE]")
|
require.Contains(t, rec.Body.String(), "data: [DONE]")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestForwardAsRawChatCompletions_SilentRefusalTriggersFailover(t *testing.T) {
|
||||||
|
gin.SetMode(gin.TestMode)
|
||||||
|
|
||||||
|
body := largeRawChatCompletionsBody()
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
c, _ := gin.CreateTestContext(rec)
|
||||||
|
c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body))
|
||||||
|
c.Request.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
upstreamBody := strings.Join([]string{
|
||||||
|
`data: {"id":"chatcmpl_silent","object":"chat.completion.chunk","model":"gpt-5.5","choices":[{"index":0,"delta":{"role":"assistant"}}]}`,
|
||||||
|
"",
|
||||||
|
`data: {"id":"chatcmpl_silent","object":"chat.completion.chunk","model":"gpt-5.5","choices":[{"index":0,"delta":{"content":""},"finish_reason":"stop"}]}`,
|
||||||
|
"",
|
||||||
|
"data: [DONE]",
|
||||||
|
"",
|
||||||
|
}, "\n")
|
||||||
|
upstream := &httpUpstreamRecorder{resp: &http.Response{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_silent"}},
|
||||||
|
Body: io.NopCloser(strings.NewReader(upstreamBody)),
|
||||||
|
}}
|
||||||
|
|
||||||
|
svc := &OpenAIGatewayService{
|
||||||
|
cfg: rawChatCompletionsTestConfig(),
|
||||||
|
httpUpstream: upstream,
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := svc.forwardAsRawChatCompletions(context.Background(), c, rawChatCompletionsTestAccount(), body, "")
|
||||||
|
require.Nil(t, result)
|
||||||
|
var failoverErr *UpstreamFailoverError
|
||||||
|
require.True(t, errors.As(err, &failoverErr))
|
||||||
|
require.Equal(t, http.StatusBadGateway, failoverErr.StatusCode)
|
||||||
|
require.True(t, IsOpenAISilentRefusalErrorBody(failoverErr.ResponseBody))
|
||||||
|
require.False(t, c.Writer.Written(), "silent refusal must not commit a 200 response before failover")
|
||||||
|
require.Empty(t, rec.Body.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestForwardAsRawChatCompletions_SilentRefusalToolCallsExempt(t *testing.T) {
|
||||||
|
gin.SetMode(gin.TestMode)
|
||||||
|
|
||||||
|
body := largeRawChatCompletionsBody()
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
c, _ := gin.CreateTestContext(rec)
|
||||||
|
c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body))
|
||||||
|
c.Request.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
upstreamBody := strings.Join([]string{
|
||||||
|
`data: {"id":"chatcmpl_tool","object":"chat.completion.chunk","model":"gpt-5.5","choices":[{"index":0,"delta":{"role":"assistant"}}]}`,
|
||||||
|
"",
|
||||||
|
`data: {"id":"chatcmpl_tool","object":"chat.completion.chunk","model":"gpt-5.5","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"call_1","type":"function","function":{"name":"lookup","arguments":""}}]}}]}`,
|
||||||
|
"",
|
||||||
|
`data: {"id":"chatcmpl_tool","object":"chat.completion.chunk","model":"gpt-5.5","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}`,
|
||||||
|
"",
|
||||||
|
"data: [DONE]",
|
||||||
|
"",
|
||||||
|
}, "\n")
|
||||||
|
upstream := &httpUpstreamRecorder{resp: &http.Response{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_tool"}},
|
||||||
|
Body: io.NopCloser(strings.NewReader(upstreamBody)),
|
||||||
|
}}
|
||||||
|
|
||||||
|
svc := &OpenAIGatewayService{
|
||||||
|
cfg: rawChatCompletionsTestConfig(),
|
||||||
|
httpUpstream: upstream,
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := svc.forwardAsRawChatCompletions(context.Background(), c, rawChatCompletionsTestAccount(), body, "")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, result)
|
||||||
|
require.Contains(t, rec.Body.String(), `"tool_calls"`)
|
||||||
|
require.Contains(t, rec.Body.String(), `"finish_reason":"tool_calls"`)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHandleChatStreamingResponse_SilentRefusalReasoningSummaryExempt(t *testing.T) {
|
||||||
|
gin.SetMode(gin.TestMode)
|
||||||
|
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
c, _ := gin.CreateTestContext(rec)
|
||||||
|
c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil)
|
||||||
|
|
||||||
|
upstreamBody := strings.Join([]string{
|
||||||
|
`data: {"type":"response.created","response":{"id":"resp_reasoning","model":"gpt-5.5"}}`,
|
||||||
|
"",
|
||||||
|
`data: {"type":"response.reasoning_summary_text.delta","delta":"thinking only"}`,
|
||||||
|
"",
|
||||||
|
`data: {"type":"response.completed","response":{"id":"resp_reasoning","model":"gpt-5.5","status":"completed"}}`,
|
||||||
|
"",
|
||||||
|
}, "\n")
|
||||||
|
resp := &http.Response{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_reasoning"}},
|
||||||
|
Body: io.NopCloser(strings.NewReader(upstreamBody)),
|
||||||
|
}
|
||||||
|
svc := &OpenAIGatewayService{cfg: rawChatCompletionsTestConfig()}
|
||||||
|
|
||||||
|
result, err := svc.handleChatStreamingResponse(
|
||||||
|
resp,
|
||||||
|
c,
|
||||||
|
rawChatCompletionsTestAccount(),
|
||||||
|
"gpt-5.5",
|
||||||
|
"gpt-5.5",
|
||||||
|
"gpt-5.5",
|
||||||
|
false,
|
||||||
|
time.Now(),
|
||||||
|
openAISilentRefusalMinRequestBodyBytes,
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, result)
|
||||||
|
require.Contains(t, rec.Body.String(), `"reasoning_content":"thinking only"`)
|
||||||
|
require.Contains(t, rec.Body.String(), "data: [DONE]")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestForwardAsRawChatCompletions_SilentRefusalNormalContentExempt(t *testing.T) {
|
||||||
|
gin.SetMode(gin.TestMode)
|
||||||
|
|
||||||
|
body := largeRawChatCompletionsBody()
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
c, _ := gin.CreateTestContext(rec)
|
||||||
|
c.Request = httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body))
|
||||||
|
c.Request.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
upstreamBody := strings.Join([]string{
|
||||||
|
`data: {"id":"chatcmpl_ok","object":"chat.completion.chunk","model":"gpt-5.5","choices":[{"index":0,"delta":{"role":"assistant"}}]}`,
|
||||||
|
"",
|
||||||
|
`data: {"id":"chatcmpl_ok","object":"chat.completion.chunk","model":"gpt-5.5","choices":[{"index":0,"delta":{"content":"ok"}}]}`,
|
||||||
|
"",
|
||||||
|
`data: {"id":"chatcmpl_ok","object":"chat.completion.chunk","model":"gpt-5.5","choices":[{"index":0,"delta":{"content":""},"finish_reason":"stop"}]}`,
|
||||||
|
"",
|
||||||
|
"data: [DONE]",
|
||||||
|
"",
|
||||||
|
}, "\n")
|
||||||
|
upstream := &httpUpstreamRecorder{resp: &http.Response{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Header: http.Header{"Content-Type": []string{"text/event-stream"}, "x-request-id": []string{"rid_ok"}},
|
||||||
|
Body: io.NopCloser(strings.NewReader(upstreamBody)),
|
||||||
|
}}
|
||||||
|
|
||||||
|
svc := &OpenAIGatewayService{
|
||||||
|
cfg: rawChatCompletionsTestConfig(),
|
||||||
|
httpUpstream: upstream,
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := svc.forwardAsRawChatCompletions(context.Background(), c, rawChatCompletionsTestAccount(), body, "")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, result)
|
||||||
|
require.Contains(t, rec.Body.String(), `"content":"ok"`)
|
||||||
|
require.Contains(t, rec.Body.String(), "data: [DONE]")
|
||||||
|
}
|
||||||
|
|
||||||
func TestForwardAsRawChatCompletions_ClientDisconnectDrainsUsage(t *testing.T) {
|
func TestForwardAsRawChatCompletions_ClientDisconnectDrainsUsage(t *testing.T) {
|
||||||
gin.SetMode(gin.TestMode)
|
gin.SetMode(gin.TestMode)
|
||||||
|
|
||||||
@ -303,3 +455,9 @@ func rawChatCompletionsTestAccount() *Account {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func largeRawChatCompletionsBody() []byte {
|
||||||
|
return []byte(`{"model":"gpt-5.5","messages":[{"role":"user","content":"` +
|
||||||
|
strings.Repeat("x", openAISilentRefusalMinRequestBodyBytes) +
|
||||||
|
`"}],"stream":true}`)
|
||||||
|
}
|
||||||
|
|||||||
293
backend/internal/service/openai_silent_refusal.go
Normal file
293
backend/internal/service/openai_silent_refusal.go
Normal file
@ -0,0 +1,293 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/Wei-Shaw/sub2api/internal/pkg/apicompat"
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/tidwall/gjson"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
openAISilentRefusalMinRequestBodyBytes = 64 * 1024
|
||||||
|
openAISilentRefusalErrorCode = "openai_silent_refusal"
|
||||||
|
openAISilentRefusalUpstreamMessage = "OpenAI upstream returned an empty completion stream with finish_reason=stop and no usage"
|
||||||
|
openAISilentRefusalClientMessage = "Upstream returned an empty completion without usage; no fallback account was available"
|
||||||
|
)
|
||||||
|
|
||||||
|
type openAIChatSilentRefusalDetector struct {
|
||||||
|
enabled bool
|
||||||
|
sawContent bool
|
||||||
|
sawToolCall bool
|
||||||
|
sawFunctionCall bool
|
||||||
|
sawUsage bool
|
||||||
|
sawError bool
|
||||||
|
sawReasoning bool
|
||||||
|
sawFinish bool
|
||||||
|
finishReason string
|
||||||
|
}
|
||||||
|
|
||||||
|
func newOpenAIChatSilentRefusalDetector(requestBodyLen int) *openAIChatSilentRefusalDetector {
|
||||||
|
return &openAIChatSilentRefusalDetector{
|
||||||
|
enabled: requestBodyLen >= openAISilentRefusalMinRequestBodyBytes,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *openAIChatSilentRefusalDetector) Enabled() bool {
|
||||||
|
return d != nil && d.enabled
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *openAIChatSilentRefusalDetector) ObserveSSELine(line string) {
|
||||||
|
if d == nil || !d.enabled {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if eventType, ok := extractOpenAISSEEventLine(line); ok {
|
||||||
|
d.observeEventType(eventType)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if payload, ok := extractOpenAISSEDataLine(line); ok {
|
||||||
|
d.ObservePayload([]byte(payload))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *openAIChatSilentRefusalDetector) ObservePayload(payload []byte) {
|
||||||
|
if d == nil || !d.enabled {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
payload = bytes.TrimSpace(payload)
|
||||||
|
if len(payload) == 0 || bytes.Equal(payload, []byte("[DONE]")) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !gjson.ValidBytes(payload) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
eventType := strings.TrimSpace(gjson.GetBytes(payload, "type").String())
|
||||||
|
d.observeEventType(eventType)
|
||||||
|
|
||||||
|
if gjson.GetBytes(payload, "error").Exists() {
|
||||||
|
d.sawError = true
|
||||||
|
}
|
||||||
|
if usage := gjson.GetBytes(payload, "usage"); usage.Exists() && usage.IsObject() {
|
||||||
|
d.sawUsage = true
|
||||||
|
}
|
||||||
|
if usage := gjson.GetBytes(payload, "response.usage"); usage.Exists() && usage.IsObject() {
|
||||||
|
d.sawUsage = true
|
||||||
|
}
|
||||||
|
|
||||||
|
d.observeChatChoicesPayload(payload)
|
||||||
|
d.observeResponsesPayload(payload, eventType)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *openAIChatSilentRefusalDetector) ObserveChatChunk(chunk apicompat.ChatCompletionsChunk) {
|
||||||
|
if d == nil || !d.enabled {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if chunk.Usage != nil {
|
||||||
|
d.sawUsage = true
|
||||||
|
}
|
||||||
|
for _, choice := range chunk.Choices {
|
||||||
|
if choice.FinishReason != nil {
|
||||||
|
d.observeFinishReason(*choice.FinishReason)
|
||||||
|
}
|
||||||
|
delta := choice.Delta
|
||||||
|
if delta.Content != nil && *delta.Content != "" {
|
||||||
|
d.sawContent = true
|
||||||
|
}
|
||||||
|
if delta.ReasoningContent != nil {
|
||||||
|
d.sawReasoning = true
|
||||||
|
}
|
||||||
|
if len(delta.ToolCalls) > 0 {
|
||||||
|
d.sawToolCall = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *openAIChatSilentRefusalDetector) ShouldReleaseClientOutput() bool {
|
||||||
|
if d == nil || !d.enabled {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if d.sawContent || d.sawToolCall || d.sawFunctionCall || d.sawUsage || d.sawError || d.sawReasoning {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return d.sawFinish && d.finishReason != "" && d.finishReason != "stop"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *openAIChatSilentRefusalDetector) IsSilentRefusal() bool {
|
||||||
|
if d == nil || !d.enabled {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return !d.sawContent &&
|
||||||
|
!d.sawToolCall &&
|
||||||
|
!d.sawFunctionCall &&
|
||||||
|
!d.sawUsage &&
|
||||||
|
!d.sawError &&
|
||||||
|
!d.sawReasoning &&
|
||||||
|
d.sawFinish &&
|
||||||
|
d.finishReason == "stop"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *openAIChatSilentRefusalDetector) observeEventType(eventType string) {
|
||||||
|
eventType = strings.TrimSpace(eventType)
|
||||||
|
if eventType == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if eventType == "error" || eventType == "response.failed" {
|
||||||
|
d.sawError = true
|
||||||
|
}
|
||||||
|
if strings.Contains(eventType, "reasoning") || strings.Contains(eventType, "reasoning_summary") {
|
||||||
|
d.sawReasoning = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *openAIChatSilentRefusalDetector) observeFinishReason(reason string) {
|
||||||
|
reason = strings.TrimSpace(reason)
|
||||||
|
if reason == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
d.sawFinish = true
|
||||||
|
d.finishReason = reason
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *openAIChatSilentRefusalDetector) observeChatChoicesPayload(payload []byte) {
|
||||||
|
choices := gjson.GetBytes(payload, "choices")
|
||||||
|
if !choices.Exists() || !choices.IsArray() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, choice := range choices.Array() {
|
||||||
|
if finish := choice.Get("finish_reason"); finish.Exists() {
|
||||||
|
d.observeFinishReason(finish.String())
|
||||||
|
}
|
||||||
|
delta := choice.Get("delta")
|
||||||
|
if !delta.Exists() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if content := delta.Get("content"); content.Exists() && content.String() != "" {
|
||||||
|
d.sawContent = true
|
||||||
|
}
|
||||||
|
if delta.Get("tool_calls").Exists() {
|
||||||
|
d.sawToolCall = true
|
||||||
|
}
|
||||||
|
if delta.Get("function_call").Exists() {
|
||||||
|
d.sawFunctionCall = true
|
||||||
|
}
|
||||||
|
if delta.Get("reasoning").Exists() ||
|
||||||
|
delta.Get("reasoning_content").Exists() ||
|
||||||
|
delta.Get("reasoning_summary").Exists() {
|
||||||
|
d.sawReasoning = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *openAIChatSilentRefusalDetector) observeResponsesPayload(payload []byte, eventType string) {
|
||||||
|
switch eventType {
|
||||||
|
case "response.output_text.delta":
|
||||||
|
if gjson.GetBytes(payload, "delta").String() != "" {
|
||||||
|
d.sawContent = true
|
||||||
|
}
|
||||||
|
case "response.output_item.added":
|
||||||
|
switch strings.TrimSpace(gjson.GetBytes(payload, "item.type").String()) {
|
||||||
|
case "function_call":
|
||||||
|
d.sawToolCall = true
|
||||||
|
case "reasoning":
|
||||||
|
d.sawReasoning = true
|
||||||
|
}
|
||||||
|
case "response.function_call_arguments.delta":
|
||||||
|
d.sawToolCall = true
|
||||||
|
case "response.reasoning_summary_text.delta", "response.reasoning_summary_text.done":
|
||||||
|
d.sawReasoning = true
|
||||||
|
case "response.completed", "response.done":
|
||||||
|
d.observeFinishReason("stop")
|
||||||
|
case "response.incomplete":
|
||||||
|
d.observeFinishReason("length")
|
||||||
|
case "response.failed":
|
||||||
|
d.sawError = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if output := gjson.GetBytes(payload, "response.output"); output.Exists() && output.IsArray() {
|
||||||
|
for _, item := range output.Array() {
|
||||||
|
switch strings.TrimSpace(item.Get("type").String()) {
|
||||||
|
case "function_call":
|
||||||
|
d.sawToolCall = true
|
||||||
|
case "reasoning":
|
||||||
|
d.sawReasoning = true
|
||||||
|
case "message":
|
||||||
|
d.observeResponseMessageItem(item)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *openAIChatSilentRefusalDetector) observeResponseMessageItem(item gjson.Result) {
|
||||||
|
content := item.Get("content")
|
||||||
|
if !content.Exists() || !content.IsArray() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, part := range content.Array() {
|
||||||
|
if part.Get("text").String() != "" {
|
||||||
|
d.sawContent = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newOpenAISilentRefusalFailoverError(c *gin.Context, account *Account, upstreamRequestID string) *UpstreamFailoverError {
|
||||||
|
accountID := int64(0)
|
||||||
|
accountName := ""
|
||||||
|
platform := PlatformOpenAI
|
||||||
|
if account != nil {
|
||||||
|
accountID = account.ID
|
||||||
|
accountName = account.Name
|
||||||
|
platform = account.Platform
|
||||||
|
}
|
||||||
|
|
||||||
|
setOpsUpstreamError(c, http.StatusBadGateway, openAISilentRefusalUpstreamMessage, "")
|
||||||
|
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||||
|
Platform: platform,
|
||||||
|
AccountID: accountID,
|
||||||
|
AccountName: accountName,
|
||||||
|
UpstreamStatusCode: http.StatusBadGateway,
|
||||||
|
UpstreamRequestID: upstreamRequestID,
|
||||||
|
Kind: "failover",
|
||||||
|
Message: openAISilentRefusalUpstreamMessage,
|
||||||
|
})
|
||||||
|
|
||||||
|
headers := http.Header{}
|
||||||
|
if strings.TrimSpace(upstreamRequestID) != "" {
|
||||||
|
headers.Set("x-request-id", strings.TrimSpace(upstreamRequestID))
|
||||||
|
}
|
||||||
|
return &UpstreamFailoverError{
|
||||||
|
StatusCode: http.StatusBadGateway,
|
||||||
|
ResponseBody: openAISilentRefusalErrorBody(),
|
||||||
|
ResponseHeaders: headers,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func openAISilentRefusalErrorBody() []byte {
|
||||||
|
body, err := json.Marshal(map[string]any{
|
||||||
|
"error": map[string]any{
|
||||||
|
"type": "upstream_error",
|
||||||
|
"code": openAISilentRefusalErrorCode,
|
||||||
|
"message": openAISilentRefusalUpstreamMessage,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return []byte(`{"error":{"type":"upstream_error","code":"openai_silent_refusal","message":"OpenAI upstream returned an empty completion stream with finish_reason=stop and no usage"}}`)
|
||||||
|
}
|
||||||
|
return body
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsOpenAISilentRefusalErrorBody reports whether a failover body was produced
|
||||||
|
// by the OpenAI silent-refusal detector.
|
||||||
|
func IsOpenAISilentRefusalErrorBody(body []byte) bool {
|
||||||
|
return strings.TrimSpace(gjson.GetBytes(body, "error.code").String()) == openAISilentRefusalErrorCode
|
||||||
|
}
|
||||||
|
|
||||||
|
// OpenAISilentRefusalClientMessage returns the exhausted-failover client message
|
||||||
|
// for OpenAI silent refusals.
|
||||||
|
func OpenAISilentRefusalClientMessage() string {
|
||||||
|
return openAISilentRefusalClientMessage
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user