Skip to content

candle_manager

candle_manager

Classes:

Name Description
CandleManager

CandleManager

CandleManager(
    candles: Optional[List[Candle]] = None,
    candle_life: Optional[timedelta] = None,
    timeframe: Optional[timedelta] = None,
    timeframe_fill: bool = False,
    candlestick: Optional[CandlestickType] = None,
)

Methods:

Name Description
collapse_candles

Collapses the given list of candles into specific timeframe candles.

purge

Remove this indicator value from all Candles

sort_candles

Sorts Candles in order of timestamp, accounts for collapsing

Source code in hexital/core/candle_manager.py
def __init__(
    self,
    candles: Optional[List[Candle]] = None,
    candle_life: Optional[timedelta] = None,
    timeframe: Optional[timedelta] = None,
    timeframe_fill: bool = False,
    candlestick: Optional[CandlestickType] = None,
):
    if candles:
        self.candles = candles
    else:
        self.candles = []

    self.candle_life = candle_life
    self.timeframe = timeframe
    self.timeframe_fill = timeframe_fill
    self.candlestick = candlestick

    self._tasks(True)

collapse_candles

collapse_candles()

Collapses the given list of candles into specific timeframe candles. This can re-ran with same list to collapse latest candles. This method is destructive, returning a new list for the collapsed candles

Source code in hexital/core/candle_manager.py
def collapse_candles(self):
    """Collapses the given list of candles into specific timeframe candles.
    This can re-ran with same list to collapse latest candles.
    This method is destructive, returning a new list for the collapsed candles"""
    if not self.candles or not self.timeframe:
        return

    candles_ = [self.candles.pop(0)]
    init_candle = candles_[0]
    init_candle.timeframe = self.timeframe

    start_time = round_down_timestamp(init_candle.timestamp, self.timeframe)
    end_time = start_time + self.timeframe

    if not on_timeframe(init_candle.timestamp, self.timeframe):
        init_candle.set_collapsed_timestamp(end_time)

    while self.candles:
        candle = self.candles.pop(0)
        prev_candle = candles_[-1]

        next_end_time = end_time + self.timeframe
        candle.timestamp = clean_timestamp(candle.timestamp)
        candle.timeframe = self.timeframe

        if start_time < candle.timestamp <= end_time and prev_candle.timestamp == end_time:
            prev_candle.merge(candle)
        elif (
            start_time - self.timeframe < candle.timestamp <= start_time
            and prev_candle.timestamp == start_time
        ):
            prev_candle.merge(candle)
        elif start_time < candle.timestamp <= end_time:
            candle.set_collapsed_timestamp(end_time)
            candles_.append(candle)
        elif end_time < candle.timestamp <= next_end_time:
            candle.set_collapsed_timestamp(next_end_time)
            candles_.append(candle)
            start_time = end_time
            end_time = next_end_time
        elif start_time < candle.timestamp and on_timeframe(candle.timestamp, self.timeframe):
            start_time = round_down_timestamp(candle.timestamp, self.timeframe)
            end_time = start_time + self.timeframe
            candle.set_collapsed_timestamp(start_time)
            candles_.append(candle)
        elif next_end_time < candle.timestamp:
            start_time = round_down_timestamp(candle.timestamp, self.timeframe)
            end_time = start_time + self.timeframe
            candle.set_collapsed_timestamp(end_time)
            candles_.append(candle)
        else:
            # Shit's fucked yo
            raise InvalidCandleOrder(
                f"Failed to collapse_candles due to invalid candle order prev: [{prev_candle}] - current: [{candle}]",
            )

    if self.timeframe_fill:
        candles_ = self._fill_timeframe_candles(candles_, self.timeframe)

    self.candles.extend(candles_)

purge

purge(indicator: str | Set[str])

Remove this indicator value from all Candles

Source code in hexital/core/candle_manager.py
def purge(self, indicator: str | Set[str]):
    """Remove this indicator value from all Candles"""
    if isinstance(indicator, str):
        indicator = indicator

    for candle in self.candles:
        for name in indicator:
            candle.indicators.pop(name, None)
            candle.sub_indicators.pop(name, None)

sort_candles

sort_candles(candles: Optional[List[Candle]] = None)

Sorts Candles in order of timestamp, accounts for collapsing

Source code in hexital/core/candle_manager.py
def sort_candles(self, candles: Optional[List[Candle]] = None):
    """Sorts Candles in order of timestamp, accounts for collapsing"""
    if candles:
        candles.sort(key=cmp_to_key(self._sort_comparison))
    else:
        self.candles.sort(key=cmp_to_key(self._sort_comparison))