login
API并发控制与显存检查方案
2025/06/25·StepFun can make mistakes, check important info.
c0f490aca2bc4e6b9aba46df5b1ee43a~tplv-9xxiciwj9y-image.image
同时接受2个请求显存不足会出问题, 现在希望 在 serving 入口加一个 pre_check 功能?如果负载高(或者在处理不止一个请求),就拒绝 serving 应该怎么做
Reason
来源
Python中的多线程并发提高程序性能的方法
简介: Python中的多线程并发是指允许多个线程同时执行,从而提高程序的性能。在 Python 中,可以使用 threading 模块创建和管理线程。示例代码: import threading import time def task(i):"""任务函数"""print(f"线程 { i } 开始执行")time.sleep(1)print(f"线程 { i } 执行结束")def main():"""主函数"""#创建 5 个线程 threads=[threading.Thread(target=t...
https://s2.zimgs.cn/ims?kt=url&at=smstruct&key=aHR0cHM6Ly9ndy5hbGljZG4uY29tL0wxLzcyMy8xNTg0MzQwMDIzL2Y0L2FkLzFmL2Y0YWQxZjc0Y2JhMjY4YWI4NWQ3NDRjNWZmYjk3MTg1Lmljbw==&sign=yx:pfbRprXvO3KHZoO4187qtN4aNjQ=&tv=400_400
哔哩哔哩
深入解析Python并发编程:异步与多线程并发技巧详解
本篇文章将深入探讨Python并发编程的基础概念与实用技巧,重点介绍如何通过异步编程和多线程实现高效并发,并附上丰富的代码示例,确保内容丰富且易于实践。await asyncio.gather(*(clie…
https://s2.zimgs.cn/ims?kt=url&at=smstruct&key=aHR0cHM6Ly9ndy5hbGljZG4uY29tL0wxLzcyMy8xNTg0MzI5MTIyL2ZhLzdiL2QwL2ZhN2JkMGM2MTlhODM2NmIwNzNkNWM3ZWQ0ZWFhNWUwLmljbw==&sign=yx:C1HunXUgWw_Hnydsxo7hNSZ08AU=&tv=400_400
搜狐网
深入理解Python中的多线程:并发编程实例详解
Python作为一种高效、易学的编程语言,其多线程功能为开发者提供了一种优雅的解决方案来实现并发操作。本文将深入探讨Python中的多线程编程,并通过多个完整代码实例展示如何在实际应用中使用多线程来提高程序的执行效率。多线程基础 创建和启动线程 python 复制代码 usbwt.xsjdyp.com/ import thread...
https://s2.zimgs.cn/ims?kt=url&at=smstruct&key=aHR0cHM6Ly9ndy5hbGljZG4uY29tL0wxLzcyMy8xNTg0MzI5MTIyL2ZhLzdiL2QwL2ZhN2JkMGM2MTlhODM2NmIwNzNkNWM3ZWQ0ZWFhNWUwLmljbw==&sign=yx:C1HunXUgWw_Hnydsxo7hNSZ08AU=&tv=400_400
搜狐网
6 web pages
哔哩哔哩
搜狐网
搜狐网
CSDN技术社区
cnblogs.com
CSDN技术社区
思考过程

嗯,用户的问题是关于如何在serving入口加一个pre_check功能,用来在负载高的时候拒绝新的请求。现在他们的代码里有一个视频处理的API端点,可能在处理多个请求时显存不足,所以需要防止同时处理太多请求。

首先,我需要理解用户当前的代码结构。他们展示的代码中有@app.post装饰器定义的api函数,这个函数直接调用self.process_video(request),然后返回结果。看起来这个process_video可能是阻塞的,因为注释里提到用线程池来运行阻塞函数,并且用await等待,但不阻塞主事件循环。不过代码里并没有看到异步处理的部分,比如线程池的创建或者async/await的正确使用,可能需要进一步确认。

用户提到同时接受两个请求会导致显存不足,所以需要控制并发量。可能的解决方案包括使用请求队列、限流、或者实时监控GPU显存。用户希望的是在serving入口加pre_check,也就是在处理请求之前检查当前系统状态,如果负载高(比如正在处理不止一个请求)就拒绝服务。

首先,可能需要一个计数器来跟踪当前正在处理的请求数量。每当有新请求进来时,先检查这个计数器,如果超过阈值(比如1个),就拒绝请求。但这个计数器需要是线程安全的,因为FastAPI可能在多线程环境下运行。

另外,用户可能需要考虑如何准确检测负载高的情况,不仅仅是请求的数量,还包括实际的显存使用情况。比如,即使只有一个请求,如果它的显存使用量特别大,也可能导致问题。所以可能需要同时检查显存的使用情况。

但用户的问题描述里可能更倾向于并发请求的数量控制,所以先从简单的并发控制开始考虑。比如,在进入处理流程之前,检查当前是否有正在处理的请求,如果有的话,返回拒绝。

接下来,如何实现这个pre_check呢?可以在路由处理函数中添加一个检查步骤。例如,在调用self.process_video之前,检查当前是否已经有任务在处理。如果是,则返回HTTP状态码如429(Too Many Requests)或者503(Service Unavailable)。

但这里需要考虑的是,如何跟踪正在处理的请求数量。可以使用一个原子计数器,每次请求进入时增加,处理完后减少。例如,在类中维护一个self.current_tasks变量,类型为int。在api函数中,先加锁,检查self.current_tasks是否超过阈值(比如0),如果超过则拒绝。否则,递增计数器,处理完成后递减。

不过这样可能存在问题,比如异步处理的情况下,如果process_video是异步的,可能需要确保计数器在异步函数完成后才递减。或者,如果使用线程池来处理阻塞函数,可能需要确保线程执行完毕后才会减少计数器。

另外,在FastAPI中,可以使用依赖注入系统来实现pre-check。例如,定义一个依赖函数,在进入路由处理函数之前执行检查,如果通过则继续,否则抛出异常。

具体步骤可能如下:

  1. 在类中添加一个计数器变量,比如self.current_requests,初始为0。
  2. 在进入api处理函数时,检查当前请求是否超过阈值(比如1)。如果超过,返回拒绝响应。
  3. 否则,增加计数器,执行处理逻辑。
  4. 处理完成后(包括异常情况),减少计数器。

需要注意的是,计数器的修改需要线程安全,所以可能需要使用锁(如threading.Lock)来防止竞态条件。

另外,如果process_video是异步的,或者在另一个线程中处理,需要确保在该任务完成之后才减少计数器。例如,在异步函数中使用finally块来确保递减。

另外,是否可以考虑使用信号量来控制并发量?比如在FastAPI中使用asyncio.Semaphore来限制同时处理的请求数量。但Semaphore是针对异步的,如果用户当前的代码是同步的或者使用多线程的话,可能需要调整。

假设用户的process_video是同步阻塞函数,并且他们使用线程池来执行它(比如用concurrent.futures.ThreadPoolExecutor),那么在FastAPI中可以这样处理:

  • 使用一个锁保护计数器。
  • 在收到请求时,检查当前counter,如果超过允许的数量(比如0),返回拒绝。
  • 否则,增加counter,并放入线程池处理。
  • 线程处理完毕后(无论成功或失败),减少counter。

但这样可能无法实时反映显存的情况,因为即使有一个请求在处理,它的显存占用可能很高,导致第二个请求即使被接受也会失败。这时候可能需要更精确的显存监控。

另一个思路是,在pre_check中检查当前显存的使用情况。例如,在每次请求进来时,调用一个函数检查GPU显存是否足够。如果足够则允许处理,否则拒绝。

检查显存的方法可以通过调用torch.cuda functions,比如torch.cuda.memory_allocated() 或者 torch.cuda.is_available() 来获取当前显存使用情况,并判断是否超过阈值。

但显存的使用情况可能比较复杂,因为不同的模型或处理步骤可能会有不同的内存需求。可能需要预先估算每个请求的大致显存消耗,并设置一个阈值。

综合来看,用户的场景可能更适合同时控制并发请求数量和显存使用情况。例如,当有正在处理的请求时(current_requests > 0),或者显存使用超过某个阈值时,拒绝新请求。

具体实现步骤:

  1. 在类中初始化计数器和锁:
    self.current_requests = 0
    self.lock = threading.Lock()

  2. 在api处理函数中添加pre-check逻辑:
    with self.lock:
    if self.current_requests >= 1: # 阈值设为1,只允许一个并发
    raise HTTPException(status_code=503, detail="Service busy")
    self.current_requests += 1

    try:
    # 处理逻辑
    result = await self.process_video(request)
    return MaskResponse(**result)
    finally:
    with self.lock:
    self.current_requests -= 1

不过这里需要注意异步和同步的问题。如果process_video是同步阻塞函数,并且被放入线程池中执行(比如用run_in_executor),那么在FastAPI中处理异步路由时,需要确保正确使用await。

例如,在FastAPI的异步端点中处理同步函数时,可以将其放入线程池:

from fastapi import FastAPI, HTTPException
import asyncio
import threading

class SomeClass:
def init(self):
self.current_requests = 0
self.lock = threading.Lock()
self.thread_pool = concurrent.futures.ThreadPoolExecutor()

text

这样可以保证即使process_video是阻塞的,在异步框架中仍然能正确处理,并且计数器会正确递增和递减。

不过用户原来的代码中似乎没有使用异步处理,注释里提到用线程池运行阻塞函数,并用await等待。所以可能他们的process_video已经被改造成异步的了?或者可能存在错误?

例如,在用户提供的代码中:

700 result = self.process_video(request)

如果process_video是同步函数,并且没有放入线程池中执行的话,这段代码会阻塞事件循环,导致并发问题。所以正确的做法应该是将同步函数放入线程池执行,并用await等待结果。

所以可能用户的实际代码应该像下面这样:

@app.post(...)
async def api(self, request: MaskRequest) -> MaskResponse:
# 这里需要将同步的process_video放入线程池
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, self.process_video, request)
return MaskResponse(**result)

这样在FastAPI中处理同步函数时不会阻塞事件循环。

回到用户的问题,在serving入口加pre_check。结合并发控制和显存检查:

可能需要同时检查:

  • 当前请求数量是否超过阈值(如1)
  • 当前显存使用是否超过阈值(如总显存的80%)

但实现显存检查需要实时查询显存情况,并且需要考虑不同GPU的情况。例如:

import torch

def check_gpu_memory():
if torch.cuda.is_available():
total = torch.cuda.get_device_properties(0).total_memory
allocated = torch.cuda.memory_allocated()
free = total - allocated
# 假设每个请求需要至少500MB显存
if free < 500 * 1024 * 1024: # 500MB
return False
else:
return True
else:
# 没有GPU的话可能返回其他判断逻辑?
return True # 或者根据CPU资源判断

然后在pre_check中同时检查请求数量和显存是否足够:

with self.lock:
if (self.current_requests >= 1) or not check_gpu_memory():
raise HTTPException(...)

不过这样的实时检查可能会影响性能,或者因为延迟导致问题。此外,显存的预估可能并不准确,不同处理阶段的显存占用可能不同。

另一个考虑是使用资源池或者队列来管理请求。例如,当有新请求到来时,只有队列未满时才接受请求,并将请求放入队列,由后台工作线程处理。但这样可能会引入队列等待机制,而不是立即拒绝。

但用户的需求是立即拒绝负载高的时候的请求,所以更倾向于快速判断拒绝。

综合来说,最简单的方案可能是限制并发请求数量为1,即同时只处理一个请求。这样即使显存足够的情况下可以处理多个请求,但用户的情况是同时两个就会显存不足,所以限制并发到1。

那么在代码中添加并发控制即可。例如:

class VideoService:
def init(self):
self.concurrent_counter = 0
self.lock = threading.Lock()

text

这样确保同一时间最多只能有一个请求被处理。当第二个请求到来时会被拒绝。

不过这样的做法比较粗暴,可能在某些情况下即使显存足够也无法处理多个请求。所以需要根据实际情况调整阈值。比如阈值设置为1,或者根据模型的具体显存消耗来定。

另外,在FastAPI中可以使用依赖注入的方式实现pre-check。例如:

from fastapi import Depends

@app.post(...)
async def api(self, request: MaskRequest, check: bool = Depends(pre_check)):
...

def pre_check():
with some_class_instance.lock:
if some_class_instance.concurrent_counter >= 1:
raise HTTPException(...)
return True

但这样需要确保some_class_instance是单例,并且pre_check依赖能够访问到它。

不过用户提供的代码可能是类中的方法,比如在某个类中包含app和路由的定义。假设他们的框架是类似FastAPI的应用,并且使用类来组织路由处理程序。

此外,还要考虑异常处理的情况。如果process_video在处理过程中抛出异常,必须确保finally块中的计数器递减被执行,否则会导致并发计数错误。

综上所述,解决方案步骤是:

  1. 在类中维护一个并发计数器和一个锁。
  2. 在路由处理函数开始时检查计数器,如果超过阈值则返回拒绝。
  3. 处理完成后递减计数器。
  4. 使用线程池来避免阻塞事件循环。
  5. 可选地加入显存检查逻辑。

现在需要将这些思路整理成具体的代码示例,并解释可能的注意事项。

为了在高负载时拒绝新请求,我们可以在serving入口添加并发控制机制。以下是具体实现步骤:

方案一:基于并发请求数量的控制

python

方案二:结合显存检查的增强版控制

python

关键注意事项:

  1. 线程安全:使用threading.Lock保证计数器修改的原子性
  2. 异步执行:通过loop.run_in_executor将阻塞IO/计算放到线程池
  3. 异常处理:确保finally块减少计数器(即使出现异常)
  4. 阈值调整
    • max_concurrent:根据显存实际容量调整(如RTX 3090可设为2-3)
    • min_required_mem:预留必要的显存给其他任务/系统开销
  5. 扩展性优化
    • 可结合队列实现缓冲机制(需权衡请求排队延迟)
    • 增加监控指标输出(如Prometheus暴露当前负载)

性能考量:

方案优点缺点
基础版实现简单、0延迟拒绝可能错过显存足够时的并行机会
增强版综合考虑资源利用率显存检测引入额外计算开销
队列缓冲平滑处理突发流量可能增加请求延迟

建议优先采用基础方案快速解决问题,并根据实际压力测试结果逐步优化阈值参数。