Merge pull request #2399 from gaoren002/fix/openai-image-upstream-errors
fix(openai): surface image moderation errors
This commit is contained in:
commit
bd3d4d9a24
@ -217,6 +217,18 @@ func (h *OpenAIGatewayHandler) Images(c *gin.Context) {
|
|||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
|
var imageUpstreamErr *service.OpenAIImagesUpstreamError
|
||||||
|
if errors.As(err, &imageUpstreamErr) {
|
||||||
|
h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, true, nil)
|
||||||
|
reqLog.Warn("openai.images.upstream_user_error",
|
||||||
|
zap.Int64("account_id", account.ID),
|
||||||
|
zap.Int("status_code", imageUpstreamErr.StatusCode),
|
||||||
|
zap.String("error_type", imageUpstreamErr.ErrorType),
|
||||||
|
zap.String("error_code", imageUpstreamErr.Code),
|
||||||
|
zap.Error(err),
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
var failoverErr *service.UpstreamFailoverError
|
var failoverErr *service.UpstreamFailoverError
|
||||||
if errors.As(err, &failoverErr) {
|
if errors.As(err, &failoverErr) {
|
||||||
h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, false, nil)
|
h.gatewayService.ReportOpenAIAccountScheduleResult(account.ID, false, nil)
|
||||||
|
|||||||
@ -29,6 +29,69 @@ type openAIResponsesImageResult struct {
|
|||||||
Model string
|
Model string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type OpenAIImagesUpstreamError struct {
|
||||||
|
StatusCode int
|
||||||
|
ErrorType string
|
||||||
|
Code string
|
||||||
|
Message string
|
||||||
|
Param string
|
||||||
|
UpstreamRequestID string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *OpenAIImagesUpstreamError) Error() string {
|
||||||
|
if e == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
code := strings.TrimSpace(e.Code)
|
||||||
|
if code == "" {
|
||||||
|
code = strings.TrimSpace(e.ErrorType)
|
||||||
|
}
|
||||||
|
message := strings.TrimSpace(e.Message)
|
||||||
|
if code != "" && message != "" {
|
||||||
|
return fmt.Sprintf("openai images upstream error: %s: %s", code, message)
|
||||||
|
}
|
||||||
|
if message != "" {
|
||||||
|
return "openai images upstream error: " + message
|
||||||
|
}
|
||||||
|
if code != "" {
|
||||||
|
return "openai images upstream error: " + code
|
||||||
|
}
|
||||||
|
return "openai images upstream error"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *OpenAIImagesUpstreamError) clientStatusCode() int {
|
||||||
|
if e == nil {
|
||||||
|
return http.StatusBadGateway
|
||||||
|
}
|
||||||
|
if e.StatusCode > 0 {
|
||||||
|
return e.StatusCode
|
||||||
|
}
|
||||||
|
return http.StatusBadGateway
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *OpenAIImagesUpstreamError) clientErrorType() string {
|
||||||
|
if e == nil {
|
||||||
|
return "upstream_error"
|
||||||
|
}
|
||||||
|
if trimmed := strings.TrimSpace(e.ErrorType); trimmed != "" {
|
||||||
|
return trimmed
|
||||||
|
}
|
||||||
|
return "upstream_error"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *OpenAIImagesUpstreamError) clientMessage() string {
|
||||||
|
if e == nil {
|
||||||
|
return "Upstream request failed"
|
||||||
|
}
|
||||||
|
if trimmed := strings.TrimSpace(e.Message); trimmed != "" {
|
||||||
|
return trimmed
|
||||||
|
}
|
||||||
|
if trimmed := strings.TrimSpace(e.Code); trimmed != "" {
|
||||||
|
return trimmed
|
||||||
|
}
|
||||||
|
return "Upstream request failed"
|
||||||
|
}
|
||||||
|
|
||||||
func openAIResponsesImageResultKey(itemID string, result openAIResponsesImageResult) string {
|
func openAIResponsesImageResultKey(itemID string, result openAIResponsesImageResult) string {
|
||||||
if strings.TrimSpace(result.Result) != "" {
|
if strings.TrimSpace(result.Result) != "" {
|
||||||
return strings.TrimSpace(result.OutputFormat) + "|" + strings.TrimSpace(result.Result)
|
return strings.TrimSpace(result.OutputFormat) + "|" + strings.TrimSpace(result.Result)
|
||||||
@ -465,6 +528,57 @@ func collectOpenAIImagesFromResponsesBody(body []byte) ([]openAIResponsesImageRe
|
|||||||
return nil, createdAt, usageRaw, openAIResponsesImageResult{}, foundFinal, nil
|
return nil, createdAt, usageRaw, openAIResponsesImageResult{}, foundFinal, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func extractOpenAIImagesUpstreamError(body []byte) *OpenAIImagesUpstreamError {
|
||||||
|
var upstreamErr *OpenAIImagesUpstreamError
|
||||||
|
forEachOpenAISSEDataPayload(string(body), func(payload []byte) {
|
||||||
|
if upstreamErr != nil || !gjson.ValidBytes(payload) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
upstreamErr = openAIImagesUpstreamErrorFromSSEPayload(payload)
|
||||||
|
})
|
||||||
|
return upstreamErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func openAIImagesUpstreamErrorFromSSEPayload(payload []byte) *OpenAIImagesUpstreamError {
|
||||||
|
if !gjson.ValidBytes(payload) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
switch gjson.GetBytes(payload, "type").String() {
|
||||||
|
case "error":
|
||||||
|
return openAIImagesUpstreamErrorFromGJSON(gjson.GetBytes(payload, "error"), "")
|
||||||
|
case "response.failed":
|
||||||
|
response := gjson.GetBytes(payload, "response")
|
||||||
|
return openAIImagesUpstreamErrorFromGJSON(response.Get("error"), response.Get("id").String())
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func openAIImagesUpstreamErrorFromGJSON(errorObj gjson.Result, upstreamRequestID string) *OpenAIImagesUpstreamError {
|
||||||
|
if !errorObj.Exists() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
code := strings.TrimSpace(errorObj.Get("code").String())
|
||||||
|
errType := strings.TrimSpace(errorObj.Get("type").String())
|
||||||
|
message := strings.TrimSpace(errorObj.Get("message").String())
|
||||||
|
param := strings.TrimSpace(errorObj.Get("param").String())
|
||||||
|
statusCode := http.StatusBadGateway
|
||||||
|
if strings.EqualFold(code, "moderation_blocked") || strings.EqualFold(errType, "image_generation_user_error") {
|
||||||
|
statusCode = http.StatusBadRequest
|
||||||
|
}
|
||||||
|
if message == "" {
|
||||||
|
message = "Upstream request failed"
|
||||||
|
}
|
||||||
|
return &OpenAIImagesUpstreamError{
|
||||||
|
StatusCode: statusCode,
|
||||||
|
ErrorType: errType,
|
||||||
|
Code: code,
|
||||||
|
Message: sanitizeUpstreamErrorMessage(message),
|
||||||
|
Param: param,
|
||||||
|
UpstreamRequestID: strings.TrimSpace(upstreamRequestID),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func buildOpenAIImagesAPIResponse(
|
func buildOpenAIImagesAPIResponse(
|
||||||
results []openAIResponsesImageResult,
|
results []openAIResponsesImageResult,
|
||||||
createdAt int64,
|
createdAt int64,
|
||||||
@ -531,6 +645,41 @@ func buildOpenAIImagesStreamErrorBody(message string) []byte {
|
|||||||
return body
|
return body
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func buildOpenAIImagesStreamErrorBodyFromUpstream(err *OpenAIImagesUpstreamError) []byte {
|
||||||
|
if err == nil {
|
||||||
|
return buildOpenAIImagesStreamErrorBody("")
|
||||||
|
}
|
||||||
|
body := buildOpenAIImagesStreamErrorBody(err.clientMessage())
|
||||||
|
body, _ = sjson.SetBytes(body, "error.type", err.clientErrorType())
|
||||||
|
if code := strings.TrimSpace(err.Code); code != "" {
|
||||||
|
body, _ = sjson.SetBytes(body, "error.code", code)
|
||||||
|
}
|
||||||
|
if param := strings.TrimSpace(err.Param); param != "" {
|
||||||
|
body, _ = sjson.SetBytes(body, "error.param", param)
|
||||||
|
}
|
||||||
|
return body
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeOpenAIImagesUpstreamErrorResponse(c *gin.Context, err *OpenAIImagesUpstreamError) bool {
|
||||||
|
if c == nil || c.Writer == nil || c.Writer.Written() || err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
errorObj := gin.H{
|
||||||
|
"type": err.clientErrorType(),
|
||||||
|
"message": err.clientMessage(),
|
||||||
|
}
|
||||||
|
if code := strings.TrimSpace(err.Code); code != "" {
|
||||||
|
errorObj["code"] = code
|
||||||
|
}
|
||||||
|
if param := strings.TrimSpace(err.Param); param != "" {
|
||||||
|
errorObj["param"] = param
|
||||||
|
}
|
||||||
|
c.JSON(err.clientStatusCode(), gin.H{
|
||||||
|
"error": errorObj,
|
||||||
|
})
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func (s *OpenAIGatewayService) writeOpenAIImagesStreamEvent(c *gin.Context, flusher http.Flusher, eventName string, payload []byte) error {
|
func (s *OpenAIGatewayService) writeOpenAIImagesStreamEvent(c *gin.Context, flusher http.Flusher, eventName string, payload []byte) error {
|
||||||
if strings.TrimSpace(eventName) != "" {
|
if strings.TrimSpace(eventName) != "" {
|
||||||
if _, err := fmt.Fprintf(c.Writer, "event: %s\n", eventName); err != nil {
|
if _, err := fmt.Fprintf(c.Writer, "event: %s\n", eventName); err != nil {
|
||||||
@ -588,6 +737,11 @@ func (s *OpenAIGatewayService) handleOpenAIImagesOAuthNonStreamingResponse(
|
|||||||
return OpenAIUsage{}, 0, nil, err
|
return OpenAIUsage{}, 0, nil, err
|
||||||
}
|
}
|
||||||
if len(results) == 0 {
|
if len(results) == 0 {
|
||||||
|
if upstreamErr := extractOpenAIImagesUpstreamError(body); upstreamErr != nil {
|
||||||
|
setOpsUpstreamError(c, upstreamErr.clientStatusCode(), upstreamErr.clientMessage(), "")
|
||||||
|
writeOpenAIImagesUpstreamErrorResponse(c, upstreamErr)
|
||||||
|
return OpenAIUsage{}, 0, nil, upstreamErr
|
||||||
|
}
|
||||||
return OpenAIUsage{}, 0, nil, fmt.Errorf("upstream did not return image output")
|
return OpenAIUsage{}, 0, nil, fmt.Errorf("upstream did not return image output")
|
||||||
}
|
}
|
||||||
if strings.TrimSpace(firstMeta.Model) == "" {
|
if strings.TrimSpace(firstMeta.Model) == "" {
|
||||||
@ -742,6 +896,16 @@ func (s *OpenAIGatewayService) handleOpenAIImagesOAuthStreamingResponse(
|
|||||||
imageCount = len(emitted)
|
imageCount = len(emitted)
|
||||||
imageOutputSizes = openAIResponsesImageResultSizes(finalResults)
|
imageOutputSizes = openAIResponsesImageResultSizes(finalResults)
|
||||||
processDataDone = true
|
processDataDone = true
|
||||||
|
case "error", "response.failed":
|
||||||
|
if upstreamErr := openAIImagesUpstreamErrorFromSSEPayload(dataBytes); upstreamErr != nil {
|
||||||
|
if !clientDisconnected {
|
||||||
|
s.tryWriteOpenAIImagesStreamEvent(c, flusher, &clientDisconnected, &lastDownstreamWriteAt, "error", buildOpenAIImagesStreamErrorBodyFromUpstream(upstreamErr))
|
||||||
|
}
|
||||||
|
setOpsUpstreamError(c, upstreamErr.clientStatusCode(), upstreamErr.clientMessage(), "")
|
||||||
|
processDataErr = upstreamErr
|
||||||
|
processDataDone = true
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -553,6 +553,59 @@ func TestOpenAIGatewayServiceForwardImages_OAuthPassesNAndReturnsAllImages(t *te
|
|||||||
require.Equal(t, "draw a cat 3", gjson.Get(rec.Body.String(), "data.2.revised_prompt").String())
|
require.Equal(t, "draw a cat 3", gjson.Get(rec.Body.String(), "data.2.revised_prompt").String())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestOpenAIGatewayServiceForwardImages_OAuthNonStreamModerationBlockedReturnsClientError(t *testing.T) {
|
||||||
|
gin.SetMode(gin.TestMode)
|
||||||
|
body := []byte(`{"model":"gpt-image-2","prompt":"draw blocked image","response_format":"b64_json"}`)
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/v1/images/generations", bytes.NewReader(body))
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
c, _ := gin.CreateTestContext(rec)
|
||||||
|
c.Request = req
|
||||||
|
c.Set("api_key", &APIKey{ID: 42})
|
||||||
|
|
||||||
|
svc := &OpenAIGatewayService{}
|
||||||
|
parsed, err := svc.ParseOpenAIImagesRequest(c, body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
svc.httpUpstream = &httpUpstreamRecorder{
|
||||||
|
resp: &http.Response{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Header: http.Header{
|
||||||
|
"Content-Type": []string{"text/event-stream"},
|
||||||
|
"X-Request-Id": []string{"req_img_blocked"},
|
||||||
|
},
|
||||||
|
Body: io.NopCloser(strings.NewReader(
|
||||||
|
"data: {\"type\":\"response.created\",\"response\":{\"created_at\":1710000020}}\n\n" +
|
||||||
|
"data: {\"type\":\"error\",\"error\":{\"type\":\"image_generation_user_error\",\"code\":\"moderation_blocked\",\"message\":\"Your request was rejected by the safety system. safety_violations=[sexual].\"}}\n\n" +
|
||||||
|
"data: {\"type\":\"response.failed\",\"response\":{\"id\":\"resp_blocked\",\"status\":\"failed\",\"error\":{\"type\":\"image_generation_user_error\",\"code\":\"moderation_blocked\",\"message\":\"Your request was rejected by the safety system. safety_violations=[sexual].\"}}}\n\n",
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
account := &Account{
|
||||||
|
ID: 1,
|
||||||
|
Name: "openai-oauth",
|
||||||
|
Platform: PlatformOpenAI,
|
||||||
|
Type: AccountTypeOAuth,
|
||||||
|
Credentials: map[string]any{
|
||||||
|
"access_token": "token-123",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := svc.ForwardImages(context.Background(), c, account, body, parsed, "")
|
||||||
|
require.Nil(t, result)
|
||||||
|
var upstreamErr *OpenAIImagesUpstreamError
|
||||||
|
require.ErrorAs(t, err, &upstreamErr)
|
||||||
|
require.Equal(t, http.StatusBadRequest, upstreamErr.StatusCode)
|
||||||
|
require.Equal(t, "moderation_blocked", upstreamErr.Code)
|
||||||
|
|
||||||
|
require.Equal(t, http.StatusBadRequest, rec.Code)
|
||||||
|
require.Equal(t, "image_generation_user_error", gjson.Get(rec.Body.String(), "error.type").String())
|
||||||
|
require.Equal(t, "moderation_blocked", gjson.Get(rec.Body.String(), "error.code").String())
|
||||||
|
require.Contains(t, gjson.Get(rec.Body.String(), "error.message").String(), "safety system")
|
||||||
|
}
|
||||||
|
|
||||||
func TestOpenAIGatewayServiceForwardImages_APIKeyGenerationUsesConfiguredV1BaseURL(t *testing.T) {
|
func TestOpenAIGatewayServiceForwardImages_APIKeyGenerationUsesConfiguredV1BaseURL(t *testing.T) {
|
||||||
gin.SetMode(gin.TestMode)
|
gin.SetMode(gin.TestMode)
|
||||||
body := []byte(`{"model":"gpt-image-2","prompt":"draw a cat","response_format":"b64_json"}`)
|
body := []byte(`{"model":"gpt-image-2","prompt":"draw a cat","response_format":"b64_json"}`)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user