Source code for astrosaber.utils.grogu

# THIS CODE IS TAKEN FROM THE BUNNY ADAPTATION OF TQDM
# check out https://github.com/bheinzerling/bunny

import sys
import multiprocessing as mp
import threading as th
from tqdm import tqdm
from tqdm import TqdmDeprecationWarning
from tqdm.utils import _term_move_up

up = _term_move_up()


try:
    mp_lock = mp.RLock()  # multiprocessing lock
except ImportError:  # pragma: no cover
    mp_lock = None
except OSError:  # pragma: no cover
    mp_lock = None
try:
    th_lock = th.RLock()  # thread lock
except OSError:  # pragma: no cover
    th_lock = None


[docs]class TqdmDefaultWriteLock(object): def __init__(self): global mp_lock, th_lock self.locks = [lk for lk in [mp_lock, th_lock] if lk is not None]
[docs] def acquire(self): for lock in self.locks: lock.acquire()
[docs] def release(self): for lock in self.locks[::-1]: lock.release()
def __enter__(self): self.acquire() def __exit__(self, *exc): self.release()
[docs]class yoda(tqdm): monitor_interval = 10 # set to 0 to disable the thread monitor = None _lock = TqdmDefaultWriteLock() def __init__(self, iterable, **kwargs): super().__init__(iterable, **kwargs) def __iter__(self): """Backward-compatibility to use: for x in tqdm(iterable)""" iterable = self.iterable # If the bar is disabled, then just walk the iterable # (note: keep this check outside the loop for performance) if self.disable: for obj in iterable: yield obj else: mininterval = self.mininterval maxinterval = self.maxinterval miniters = self.miniters dynamic_miniters = self.dynamic_miniters last_print_t = self.last_print_t last_print_n = self.last_print_n n = self.n smoothing = self.smoothing #avg_time = self.avg_time _time = self._time try: sp = self.sp except AttributeError: raise TqdmDeprecationWarning("""\ Please use `tqdm_gui(...)` instead of `tqdm(..., gui=True)` """, fp_write=getattr(self.fp, 'write', sys.stderr.write)) tqdm.write("\r" + " " * self.ncols + "\n" * 9) # make space for baby yoda for obj in iterable: yield obj # Update and possibly print the progressbar. # Note: does not call self.update(1) for speed optimisation. n += 1 # check counter first to avoid calls to time() if n - last_print_n >= self.miniters: miniters = self.miniters # watch monitoring thread changes delta_t = _time() - last_print_t if delta_t >= mininterval: cur_t = _time() delta_it = n - last_print_n self.n = n with self._lock: if self.pos: self.moveto(abs(self.pos)) # Print bar update sp(self.__repr__()) if self.pos: self.moveto(-abs(self.pos)) # If no `miniters` was specified, adjust automatically # to the max iteration rate seen so far between 2 prints if dynamic_miniters: if maxinterval and delta_t >= maxinterval: # Adjust miniters to time interval by rule of 3 if mininterval: # Set miniters to correspond to mininterval miniters = delta_it * mininterval / delta_t else: # Set miniters to correspond to maxinterval miniters = delta_it * maxinterval / delta_t elif smoothing: miniters = smoothing * delta_it * \ (mininterval / delta_t if mininterval and delta_t else 1) + \ (1 - smoothing) * miniters else: # Maximum nb of iterations between 2 prints miniters = max(miniters, delta_it) # Store old values for next call self.n = self.last_print_n = last_print_n = n self.last_print_t = last_print_t = cur_t self.miniters = miniters tqdm.write(up * 10) # move cursor up if self.total: # move baby yoda offset = " " * int(n / self.total * (self.ncols - 40)) else: offset = "" #frac = n / self.total #percentage = frac * 100 #tqdm.write(offset + ' | ̄ ̄ ̄ ̄ ̄|') #tqdm.write(offset + ' | TRAINING |') #tqdm.write(offset + ' | step |') #tqdm.write(offset + f' | {percentage:>4.0f}% |') #tqdm.write(offset + ' |_____|') tqdm.write(offset + ' ````') tqdm.write(offset + ' ` `') tqdm.write(offset + ' `` ``.````````') tqdm.write(offset + '` `` `') tqdm.write(offset + ' ` `') tqdm.write(offset + ' ````` `````') tqdm.write(offset + ' ﹏ ') tqdm.write(offset + ' \033[92m<´(\033[0m⬬ ⬬\033[92m)`> ') tqdm.write(offset + ' \033[92mʿ\033[0m/ \\\033[92mʾ\033[0m ') # Closing the progress bar. # Update some internal variables for close(). self.last_print_n = last_print_n self.n = n self.miniters = miniters self.close()