Market Data Pipeline
The Market Data Pipeline serves as the sensory layer of Signal.Engine. It is responsible for multi-market ticker discovery, historical data ingestion, and the transformation of raw OHLCV (Open, High, Low, Close, Volume) data into model-ready features.
The pipeline is optimized for the Nifty 500 (NSE) and S&P 500 (US) universes, utilizing a hybrid approach of web scraping, local registry management, and real-time API streaming.
📋 Ticker Discovery
The system uses src/ticker_utils.py to define the trading universe. These utilities handle the logic for identifying liquid assets across different exchanges.
| Function | Market | Source | Description |
| :--- | :--- | :--- | :--- |
| get_sp500_tickers() | US | Wikipedia (Live) | Scrapes the current S&P 500 list with automated Yahoo Finance symbol mapping (e.g., BF.B → BF-B). |
| get_nifty500_tickers() | India | Local CSV | Loads the broad Nifty 500 index from src/nifty500.csv. |
| get_nifty_total_market()| India | Hardcoded | Returns a curated list of the most liquid Nifty 100 stocks (Nifty 50 + Next 50). |
Usage Example:

```python
from src.ticker_utils import get_nifty_total_market

# Fetch the active liquid universe for the brain to process
tickers = get_nifty_total_market()
print(f"Scanning {len(tickers)} symbols...")
```
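The Yahoo Finance symbol mapping mentioned for get_sp500_tickers() (e.g., BF.B → BF-B) boils down to a dot-to-dash substitution. A minimal sketch of that normalization step, using a hypothetical helper name not present in the codebase:

```python
def to_yahoo_symbol(ticker: str) -> str:
    """Map an S&P 500 constituent symbol to Yahoo Finance format.

    Wikipedia lists class shares with a dot (e.g. "BF.B"), while
    Yahoo Finance expects a dash ("BF-B").
    """
    return ticker.strip().replace(".", "-")

print(to_yahoo_symbol("BF.B"))   # BF-B
print(to_yahoo_symbol("BRK.B"))  # BRK-B
```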
🧠 Data Loading & Feature Engineering
The MVPDataLoader (found in src/data_loader.py) is the primary interface for preparing data for both the LSTM (Sequence Modeling) and the PPO (Reinforcement Learning) agents.
Key Methods
1. fetch_data(ticker, period, interval)
Retrieves historical price data. In the current v3.0 architecture, this typically fetches daily or hourly intervals for sequence processing.
2. feature_engineering(df)
Transforms raw data into a stationary feature set. The pipeline currently generates:
- Log Returns: For price stationarity.
- RSI (Relative Strength Index): To measure momentum and overbought/oversold conditions.
- MACD (Moving Average Convergence Divergence): To capture trend shifts.
- Normalized Volume: To assess liquidity-backed moves.
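The four features above can be sketched with plain pandas. This is an illustrative implementation, not the exact code in feature_engineering(); the function name, the "Close"/"Volume" column names, and the specific lookbacks (14-period RSI, 12/26 MACD, 20-bar volume z-score) are assumptions:

```python
import numpy as np
import pandas as pd

def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Sketch of the stationary feature set on an OHLCV frame (assumed lookbacks)."""
    out = pd.DataFrame(index=df.index)
    # Log returns: stationary transform of the close price
    out["log_ret"] = np.log(df["Close"] / df["Close"].shift(1))
    # RSI: Wilder-style smoothing over an assumed 14-period window
    delta = df["Close"].diff()
    gain = delta.clip(lower=0).ewm(alpha=1 / 14, adjust=False).mean()
    loss = (-delta.clip(upper=0)).ewm(alpha=1 / 14, adjust=False).mean()
    out["rsi"] = 100 - 100 / (1 + gain / loss)
    # MACD: spread between the 12- and 26-period EMAs
    ema12 = df["Close"].ewm(span=12, adjust=False).mean()
    ema26 = df["Close"].ewm(span=26, adjust=False).mean()
    out["macd"] = ema12 - ema26
    # Normalized volume: z-score against a rolling 20-bar window
    vol = df["Volume"]
    out["vol_z"] = (vol - vol.rolling(20).mean()) / vol.rolling(20).std()
    return out.dropna()
```

All four outputs are scale-free, which is what makes them usable across assets with very different price levels.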
3. create_sequences(window_size=50)
Converts the tabular data into a 3D tensor shape (samples, window_size, features). The Brain requires the last 50 candles to understand the market context.
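The windowing itself is a standard sliding-window stack. A minimal sketch of the transformation (standalone function, not the actual create_sequences() source):

```python
import numpy as np

def create_sequences(features: np.ndarray, window_size: int = 50) -> np.ndarray:
    """Stack overlapping windows of a (rows, features) array into a 3D tensor.

    Returns shape (samples, window_size, features), where
    samples = rows - window_size + 1.
    """
    n_rows, n_feats = features.shape
    n_samples = n_rows - window_size + 1
    # Each sample is a contiguous slice of the last `window_size` rows
    return np.stack([features[i:i + window_size] for i in range(n_samples)])
```

For example, 60 rows of 4 features with window_size=50 yield a tensor of shape (11, 50, 4).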
Implementation Example:

```python
from src.data_loader import MVPDataLoader

# Initialize loader for a specific asset
loader = MVPDataLoader(ticker="RELIANCE.NS")

# 1. Fetch and process features
df = loader.fetch_data()
df_processed = loader.feature_engineering(df)

# 2. Generate training/inference splits
# This automatically fits scalers to prevent data leakage
train_data, val_data = loader.get_data_splits()
```
⚡ Live Execution Pipeline
During live trading or paper trading via src.trader_alpaca, the pipeline shifts from batch processing to real-time polling.
- Polling: The agent fetches the latest OHLCV bar from the Alpaca Market Data API.
- Context Assembly: The system appends the new bar to the in-memory window of the last window_size (50) bars.
- Scaling: The new sequence is normalized using the StandardScaler objects saved during the training phase (Supervised Fine-Tuning).
- Inference: The processed tensor is fed into the LSTMPredictor to generate a signal (Buy/Sell/Hold).
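The context-assembly, scaling, and inference steps can be sketched as a per-bar callback around a fixed-length ring buffer. This is a schematic, not the actual trader code; on_new_bar, and the scaler/model interfaces, are stand-ins (any object with transform() and __call__ works):

```python
from collections import deque

import numpy as np

WINDOW_SIZE = 50

# Rolling context: holds at most the last WINDOW_SIZE feature rows
context = deque(maxlen=WINDOW_SIZE)

def on_new_bar(feature_row, scaler, model):
    """Append the latest bar, rebuild the (1, 50, F) tensor, run inference."""
    context.append(feature_row)
    if len(context) < WINDOW_SIZE:
        return None  # not enough history to form a full window yet
    window = np.asarray(context)       # (50, F)
    window = scaler.transform(window)  # reuse the training-time scaler
    tensor = window[np.newaxis, ...]   # (1, 50, F) batch of one
    return model(tensor)               # Buy / Sell / Hold signal
```

Using deque(maxlen=...) means the oldest bar is evicted automatically, so the window never has to be re-sliced by hand.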
⚙️ Configuration
Data behavior is controlled via environment variables and class parameters:
- window_size: Default 50. Adjusting this requires retraining the Brain.
- APCA_API_BASE_URL: Determines whether data is pulled from Alpaca's live or paper endpoints.
- Scaling Persistence: Scalers are typically pickled during training to ensure that live inference features are normalized identically to the training "Golden Dataset."
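Scaler persistence is typically a pickle round-trip. A minimal sketch, assuming a hypothetical artifact path (the real location depends on the training run):

```python
import pickle
from pathlib import Path

# Hypothetical default; the real artifact path is set by the training run
SCALER_PATH = Path("artifacts/scaler.pkl")

def save_scaler(scaler, path: Path = SCALER_PATH) -> None:
    """Pickle the fitted scaler so live inference can reuse it verbatim."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "wb") as f:
        pickle.dump(scaler, f)

def load_scaler(path: Path = SCALER_PATH):
    """Restore the scaler fitted on the training 'Golden Dataset'."""
    with open(path, "rb") as f:
        return pickle.load(f)
```

Persisting the fitted object (rather than refitting on live data) is what guarantees identical normalization between training and inference.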
Note: The Market Data Pipeline includes internal rate-limiting and anti-bot headers (User-Agent spoofing) to ensure uninterrupted access to public data sources like Wikipedia and Yahoo Finance.