from __future__ import annotations

import time
from collections.abc import Iterable

from django.db import connections
from django.db.backends.utils import CursorWrapper
from django.db.utils import DEFAULT_DB_ALIAS
from django.utils.functional import SimpleLazyObject

from django_mysql.exceptions import TimeoutError


class BaseStatus:
    """
    Base class for the status classes.

    Subclasses set ``query`` to the SQL statement to run. Every method here
    executes that statement (optionally with a ``LIKE`` / ``WHERE`` suffix)
    and treats each result row as a ``(variable name, value)`` pair.
    """

    # SQL statement prefix; empty here, meant to be overridden by subclasses.
    query = ""

    def __init__(self, using: str | None = None) -> None:
        """Remember the database alias to query (default alias if ``None``)."""
        self.db = DEFAULT_DB_ALIAS if using is None else using

    def get_cursor(self) -> CursorWrapper:
        """Return a new cursor on the connection registered as ``self.db``."""
        return connections[self.db].cursor()

    def get(self, name: str) -> int | float | bool | str:
        """
        Fetch a single variable by name and return its cast value.

        Raises:
            ValueError: if *name* contains a ``%`` wildcard.
            KeyError: if the query matched no rows.
        """
        if "%" in name:
            raise ValueError(
                "get() is for fetching single variables, " "no % wildcards"
            )
        with self.get_cursor() as cursor:
            # Relies on execute() returning the number of matching rows.
            num_rows = cursor.execute(self.query + " LIKE %s", (name,))
            if num_rows == 0:
                raise KeyError(f"No such status variable {name!r}")
            # Rows are (name, value); only the value is wanted here.
            return self._cast(cursor.fetchone()[1])

    def get_many(
        self, names: Iterable[str]
    ) -> dict[str, int | float | bool | str]:
        """
        Fetch several variables at once, returned as ``{name: cast value}``.

        *names* may be any iterable (list, set, dict view, generator, ...).
        An empty iterable returns ``{}`` without touching the database.
        Names that the query does not return are simply absent from the
        result.

        Raises:
            ValueError: if any name contains a ``%`` wildcard.
        """
        # Materialise once: the names are needed for the wildcard check, the
        # placeholder list and the query parameters, and a one-shot iterator
        # would be exhausted after the first of those.
        names = list(names)
        if not names:
            return {}

        if any("%" in name for name in names):
            raise ValueError(
                "get_many() is for fetching named " "variables, no % wildcards"
            )

        placeholders = ", ".join(["%s"] * len(names))
        query = " ".join(
            [self.query, "WHERE Variable_name IN (", placeholders, ")"]
        )
        with self.get_cursor() as cursor:
            cursor.execute(query, names)
            return {
                name: self._cast(value) for name, value in cursor.fetchall()
            }

    def as_dict(
        self, prefix: str | None = None
    ) -> dict[str, int | float | bool | str]:
        """
        Return all variables as ``{name: cast value}``.

        When *prefix* is given, only names matching ``prefix%`` (SQL
        ``LIKE``) are fetched.
        """
        with self.get_cursor() as cursor:
            if prefix is None:
                cursor.execute(self.query)
            else:
                cursor.execute(self.query + " LIKE %s", (prefix + "%",))
            rows = cursor.fetchall()
        return {name: self._cast(value) for name, value in rows}

    def _cast(self, value: str) -> int | float | bool | str:
        """
        Convert a raw string value to ``int``, ``float`` or ``bool`` if it
        looks like one; otherwise return it unchanged.
        """
        # Many status variables are integers or floats but SHOW GLOBAL STATUS
        # returns them as strings
        try:
            return int(value)
        except ValueError:
            pass

        try:
            return float(value)
        except ValueError:
            pass

        # Flag-like values are reported as the literal strings ON / OFF.
        if value == "ON":
            return True
        if value == "OFF":
            return False
        return value
class GlobalStatus(BaseStatus):
    """Status reader that runs ``SHOW GLOBAL STATUS``."""

    query = "SHOW GLOBAL STATUS"

    def wait_until_load_low(
        self,
        thresholds: dict[str, int | float] | None = None,
        timeout: float = 60.0,
        sleep: float = 0.1,
    ) -> None:
        """
        Block until no monitored status variable exceeds its threshold.

        Args:
            thresholds: maps status variable names to the highest acceptable
                value. Defaults to ``{"Threads_running": 10}``.
            timeout: seconds to keep polling before giving up. A falsy value
                (e.g. ``0``) disables the timeout, so the call may poll
                forever.
            sleep: seconds to pause between polls.

        Raises:
            TimeoutError: if some variable is still above its threshold once
                more than *timeout* seconds have elapsed. The message lists
                the offending variables.

        Only variables actually returned by ``get_many()`` are compared;
        a name missing from the result is not treated as exceeding its
        threshold.
        """
        if thresholds is None:
            thresholds = {"Threads_running": 10}

        # Measure elapsed time on the monotonic clock so that wall-clock
        # adjustments cannot shorten or stretch the timeout.
        start = time.monotonic()
        names = list(thresholds)

        while True:
            current = self.get_many(names)

            higher = []
            for name, value in current.items():
                # Thresholds only make sense for numeric variables.
                assert isinstance(value, (int, float))
                if value > thresholds[name]:
                    higher.append(name)

            if not higher:
                return

            if timeout and time.monotonic() > start + timeout:
                raise TimeoutError(
                    "Span too long waiting for load to drop: "
                    + ",".join(
                        f"{name} > {thresholds[name]}" for name in higher
                    )
                )
            time.sleep(sleep)