Python Integration Guide
Complete Python ecosystem for Vedicquant cosmic trading signals with production-ready implementations, advanced features, and comprehensive examples.
Overview
Python provides the most comprehensive Vedicquant integration with battle-tested libraries, robust error handling, and complete trading bot implementations. Our Python ecosystem includes:
- ✅ Production SSE Client - Robust connection with auto-reconnection
- ✅ Complete Trading Bots - Multi-exchange integration via CCXT
- ✅ Telegram Integration - Signal notifications and bot commands
- ✅ Advanced Analytics - Signal performance tracking and analysis
- ✅ Risk Management - Position sizing and portfolio protection
- ✅ Comprehensive Testing - Unit tests and integration examples
Installation
Quick Setup
# Install core dependencies
pip install requests python-dotenv ccxt pandas
# For Telegram integration
pip install python-telegram-bot
# For advanced analytics
pip install numpy matplotlib seaborn
# Full requirements (recommended)
pip install -r requirements.txt
Requirements File
Create requirements.txt:
# Core SSE and HTTP
requests>=2.31.0
python-dotenv>=1.0.0
# Trading and exchanges
ccxt>=4.1.0
pandas>=2.0.0
numpy>=1.24.0
# Telegram bot (optional)
python-telegram-bot>=20.0
# Analytics (optional)
matplotlib>=3.7.0
seaborn>=0.12.0
plotly>=5.17.0
# Async client (optional, used by the async implementation pattern)
aiohttp>=3.8.0
# Development (optional)
pytest>=7.4.0
black>=23.0.0
flake8>=6.0.0
Quick Start Examples
1. Basic Signal Monitoring
#!/usr/bin/env python3
from vedicquant_client import VedicquantSSEClient
# Simple signal printing
client = VedicquantSSEClient(api_key='free-week-demo123')
client.start()
2. Custom Signal Handler
#!/usr/bin/env python3
from vedicquant_client import VedicquantSSEClient


class MySignalHandler(VedicquantSSEClient):
    def default_signal_handler(self, signal_data):
        if signal_data.get('event_type') == 'new_signal':
            symbol = signal_data['symbol']
            signal_type = signal_data['signal_type']
            strength = signal_data['signal_strength']
            phase = signal_data['phase_name']
            print(f"COSMIC ALERT: {symbol} {signal_type}")
            print(f"  Strength: {strength}")
            print(f"  Phase: {phase}")
            print(f"  Entry: ${signal_data['price_at_signal']:,.2f}")
            # Your custom logic here
            if strength == 'strong' and phase in ['Amrita', 'Shubha']:
                self.execute_high_confidence_trade(signal_data)

    def execute_high_confidence_trade(self, signal_data):
        # Placeholder so the example runs: replace with your own order logic
        print(f"High-confidence trade: {signal_data['symbol']}")


handler = MySignalHandler(api_key='free-week-demo123')
handler.start()
3. Signal Filtering
#!/usr/bin/env python3
from vedicquant_client import VedicquantSSEClient


class FilteredSignalClient(VedicquantSSEClient):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Signal filters
        self.allowed_symbols = ['BTC', 'ETH', 'SOL', 'ADA']
        self.min_strength = 'medium'  # weak, medium, strong
        self.favorable_phases = ['Amrita', 'Shubha', 'Labha']

    def default_signal_handler(self, signal_data):
        if signal_data.get('event_type') != 'new_signal':
            return
        # Apply filters
        if not self.passes_filters(signal_data):
            return
        # Signal passed all filters - take action
        print(f"Approved Signal: {signal_data['symbol']} {signal_data['signal_type']}")
        self.process_approved_signal(signal_data)

    def passes_filters(self, signal):
        symbol = signal.get('symbol')
        strength = signal.get('signal_strength', 'weak')
        phase = signal.get('phase_name', '')
        # Check symbol allowlist
        if symbol not in self.allowed_symbols:
            self.logger.debug(f"Symbol {symbol} not in allowed list")
            return False
        # Check minimum strength
        strength_levels = {'weak': 1, 'medium': 2, 'strong': 3}
        min_level = strength_levels.get(self.min_strength, 2)
        signal_level = strength_levels.get(strength, 1)
        if signal_level < min_level:
            self.logger.debug(f"Signal strength {strength} below minimum")
            return False
        # Check Vedic phase
        phase_favorable = any(fav in phase for fav in self.favorable_phases)
        if not phase_favorable:
            self.logger.debug(f"Unfavorable phase: {phase}")
            return False
        return True

    def process_approved_signal(self, signal):
        # Your approved signal logic here
        print(f"Processing {signal['symbol']} {signal['signal_type']}")


client = FilteredSignalClient(api_key='free-week-demo123')
client.start()
Complete Implementations
1. Basic SSE Client
Production-ready SSE client with robust error handling, automatic reconnection, and comprehensive logging.
Key Features:
- ✅ Exponential backoff reconnection
- ✅ Signal validation and statistics
- ✅ Graceful shutdown handling
- ✅ Configurable logging levels
- ✅ Environment variable support

from vedicquant_client import VedicquantSSEClient

client = VedicquantSSEClient(
    api_key='free-week-demo123',
    log_level='INFO'
)
client.start()
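
The exponential backoff reconnection listed above can be sketched as follows. This is an illustration of the general technique rather than the client's actual code: the name `backoff_delays`, its parameters, and the use of "full jitter" (a random wait below a doubling ceiling) are all assumptions.

```python
import random
from typing import Iterator, Optional


def backoff_delays(base: float = 1.0, cap: float = 60.0,
                   max_retries: int = 8,
                   rng: Optional[random.Random] = None) -> Iterator[float]:
    """Yield one randomized wait (in seconds) per reconnection attempt.

    The ceiling doubles on each attempt (base, 2*base, 4*base, ...) until it
    reaches `cap`; the wait is drawn uniformly from [0, ceiling].
    """
    rng = rng or random.Random()
    for attempt in range(max_retries):
        ceiling = min(cap, base * (2 ** attempt))
        yield rng.uniform(0.0, ceiling)
```

A reconnect loop would sleep for each yielded delay before retrying and stop iterating once the connection succeeds. The random component keeps many clients from reconnecting at the same instant after an outage.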
2. Trading Bot Integration
Complete automated trading implementation with risk management, position tracking, and multi-exchange support.
Key Features:
- ✅ CCXT multi-exchange support
- ✅ Risk-based position sizing
- ✅ Portfolio management
- ✅ Real-time P&L tracking
- ✅ Vedic phase filtering

from trading_bot import VedicquantTradingBot

bot = VedicquantTradingBot(
    exchanges=['binance', 'bybit'],
    risk_per_trade=0.02,
    max_positions=5
)
bot.start()
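
To make the `risk_per_trade=0.02` setting concrete, here is one common way risk-based position sizing is computed. It is a sketch under assumptions: the function name and the `stop_price` parameter are hypothetical, and the bot may size positions differently.

```python
def position_size(equity: float, risk_per_trade: float,
                  entry_price: float, stop_price: float) -> float:
    """Units to trade so that hitting the stop loses about equity * risk_per_trade."""
    if not 0 < risk_per_trade < 1:
        raise ValueError("risk_per_trade must be a fraction between 0 and 1")
    risk_per_unit = abs(entry_price - stop_price)
    if risk_per_unit == 0:
        raise ValueError("stop_price must differ from entry_price")
    return (equity * risk_per_trade) / risk_per_unit


# 10,000 account, 2% risk, entry at 95,000 with a stop at 93,100:
# the risk budget is 200 and each unit risks 1,900.
size = position_size(10_000, 0.02, 95_000, 93_100)
```

With these numbers the budget of 200 divided by 1,900 of risk per unit gives roughly 0.105 units. A tighter stop increases the size and a wider stop reduces it, while the amount at risk stays the same.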
3. Telegram Bot
Advanced Telegram integration with signal notifications, portfolio tracking, and interactive commands.
Key Features:
- ✅ Real-time signal alerts
- ✅ Portfolio status commands
- ✅ Signal history and analytics
- ✅ User subscription management
- ✅ Custom notification settings

from telegram_bot import VedicquantTelegramBot

bot = VedicquantTelegramBot(
    telegram_token='your_bot_token',
    vedicquant_api_key='free-week-demo123'
)
bot.start()
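
For a sense of what a signal alert involves underneath, the sketch below builds the request for the Telegram Bot API `sendMessage` method from a signal dictionary. The helper name `build_telegram_request` and the message layout are assumptions and not part of `VedicquantTelegramBot`; the signal fields are the ones used elsewhere in this guide.

```python
from typing import Any, Dict, Tuple


def build_telegram_request(token: str, chat_id: str,
                           signal: Dict[str, Any]) -> Tuple[str, Dict[str, str]]:
    """Return the (url, payload) pair for a Telegram sendMessage call."""
    text = (
        f"{signal['symbol']} {signal['signal_type']}\n"
        f"Strength: {signal.get('signal_strength', 'n/a')}\n"
        f"Phase: {signal.get('phase_name', 'n/a')}\n"
        f"Entry: ${signal['price_at_signal']:,.2f}"
    )
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    return url, {"chat_id": chat_id, "text": text}
```

Sending the alert is then a single HTTP POST of the payload as JSON to the returned URL, for example with `requests.post(url, json=payload, timeout=10)`. Keeping the formatting separate from the network call makes the message layout easy to unit test.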
Implementation Patterns
Environment Configuration
Create a .env file for secure configuration:
# Vedicquant Configuration
VEDICQUANT_API_KEY=free-week-demo123
SSE_URL=https://sse.vedicquant.com
# Trading Configuration (optional)
EXCHANGE_API_KEY=your_exchange_api_key
EXCHANGE_SECRET=your_exchange_secret
RISK_PER_TRADE=0.02
MAX_POSITIONS=5
# Telegram Configuration (optional)
TELEGRAM_BOT_TOKEN=your_telegram_bot_token
TELEGRAM_CHAT_ID=your_chat_id
# Logging
LOG_LEVEL=INFO
LOG_FILE=vedicquant.log
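
One way to read these values in code is sketched below. The `Settings` class and its field names are hypothetical; the variable names and defaults mirror the `.env` example above. The sketch reads from `os.environ`, so it assumes the `.env` file has already been loaded, for instance by calling `load_dotenv()` from python-dotenv at startup.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    api_key: str
    sse_url: str
    risk_per_trade: float
    max_positions: int
    log_level: str

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "Settings":
        """Build settings from environment variables, failing fast if the key is missing."""
        env = os.environ if env is None else env
        api_key = env.get("VEDICQUANT_API_KEY")
        if not api_key:
            raise RuntimeError("VEDICQUANT_API_KEY is not set")
        return cls(
            api_key=api_key,
            sse_url=env.get("SSE_URL", "https://sse.vedicquant.com"),
            risk_per_trade=float(env.get("RISK_PER_TRADE", "0.02")),
            max_positions=int(env.get("MAX_POSITIONS", "5")),
            log_level=env.get("LOG_LEVEL", "INFO").upper(),
        )
```

Parsing everything once at startup means a malformed value such as `RISK_PER_TRADE=two` raises immediately instead of surfacing later in the middle of a trade.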
Error Handling Pattern
import logging
import time  # needed for the cool-down sleep below
from datetime import datetime
from typing import Dict


class RobustSignalHandler:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.error_count = 0
        self.last_error = None

    def handle_signal(self, signal_data: Dict):
        try:
            # Validate signal data
            if not self.validate_signal(signal_data):
                return
            # Process signal
            self.process_signal(signal_data)
            # Reset error counter on success
            self.error_count = 0
        except Exception as e:
            self.error_count += 1
            self.last_error = datetime.now()
            self.logger.error(f"Signal processing error: {e}")
            # Implement circuit breaker pattern
            if self.error_count > 5:
                self.logger.critical("Too many errors - pausing processing")
                time.sleep(60)  # Cool down period (blocks the calling thread)

    def validate_signal(self, signal: Dict) -> bool:
        required_fields = ['symbol', 'signal_type', 'price_at_signal']
        for field in required_fields:
            if field not in signal:
                self.logger.warning(f"Missing required field: {field}")
                return False
        return True

    def process_signal(self, signal: Dict):
        # Your signal processing logic
        pass
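
The handler above pauses with a blocking `time.sleep(60)`. Where blocking is undesirable, the same idea can be expressed as a small state object that the caller consults before processing each signal. The following is a minimal sketch; the class name, thresholds, and injectable `clock` are assumptions made for illustration.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Open after `max_failures` consecutive failures; close again after `cooldown` seconds."""

    def __init__(self, max_failures: int = 5, cooldown: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_failures = max_failures
        self.cooldown = cooldown
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None  # None means the breaker is closed

    def allow(self) -> bool:
        """Return True if a signal may be processed right now."""
        if self.opened_at is None:
            return True
        if self.clock() - self.opened_at >= self.cooldown:
            # Cooldown elapsed: close the breaker and start counting afresh
            self.opened_at = None
            self.failures = 0
            return True
        return False

    def record_success(self) -> None:
        self.failures = 0

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.max_failures:
            self.opened_at = self.clock()
```

A handler would call `allow()` first, skip the signal when it returns `False`, and report the outcome with `record_success()` or `record_failure()`. Passing the clock in as a parameter lets tests simulate the cooldown without waiting.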
Async Implementation Pattern
import asyncio
import json
from typing import AsyncGenerator, Dict

import aiohttp


class AsyncVedicquantClient:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.session = None

    async def connect(self) -> AsyncGenerator[Dict, None]:
        """Async generator for SSE signals"""
        headers = {
            'Accept': 'text/event-stream',
            'X-API-Key': self.api_key,
            'Cache-Control': 'no-cache'
        }
        # aiohttp applies a 5-minute total timeout by default, which would
        # cut off a long-lived stream; disable it for this session
        timeout = aiohttp.ClientTimeout(total=None)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            self.session = session
            async with session.get(
                'https://sse.vedicquant.com/api/signals/stream',
                headers=headers
            ) as response:
                response.raise_for_status()  # fail early on 4xx/5xx responses
                async for line in response.content:
                    line = line.decode('utf-8').strip()
                    if line.startswith('data: '):
                        try:
                            data = json.loads(line[6:])
                        except json.JSONDecodeError:
                            continue
                        yield data


async def handle_signal_async(signal: Dict):
    # Your async signal processing
    await asyncio.sleep(0.1)  # Simulate async work
    print(f"Processed {signal['symbol']}")


# Usage
async def process_signals():
    client = AsyncVedicquantClient('free-week-demo123')
    async for signal in client.connect():
        if signal.get('event_type') == 'new_signal':
            print(f"Async Signal: {signal['symbol']} {signal['signal_type']}")
            # Process signal asynchronously
            await handle_signal_async(signal)


# Run
if __name__ == '__main__':
    asyncio.run(process_signals())
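
The client above decodes each `data: ` line on its own, which works when every event fits on one line. The event-stream format also allows an event to span several `data:` lines, ends each event with a blank line, and treats lines starting with `:` as comments. A parser that follows those framing rules could look like the sketch below; `iter_sse_json` is a hypothetical helper, and whether the Vedicquant stream ever sends multi-line events is not stated in this guide.

```python
import json
from typing import Any, Dict, Iterable, Iterator, List


def iter_sse_json(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield one decoded JSON object per complete SSE event."""
    buffer: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line ends the event: join the data lines and decode
            if buffer:
                payload = "\n".join(buffer)
                buffer = []
                try:
                    event = json.loads(payload)
                except json.JSONDecodeError:
                    event = None  # skip malformed payloads
                if isinstance(event, dict):
                    yield event
            continue
        if line.startswith(":"):
            continue  # comment or keep-alive line
        field, _, value = line.partition(":")
        if field == "data":
            # A single leading space after the colon is not part of the value
            buffer.append(value[1:] if value.startswith(" ") else value)
```

An event that is still incomplete when the stream ends is dropped, which matches how browsers treat a truncated stream.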
Testing and Development
Unit Testing
# test_vedicquant.py
import unittest
from unittest.mock import Mock, patch

from vedicquant_client import VedicquantSSEClient


class TestVedicquantClient(unittest.TestCase):
    def setUp(self):
        self.client = VedicquantSSEClient(api_key='test-key')

    def test_signal_validation(self):
        # Test valid signal
        valid_signal = {
            'symbol': 'BTC',
            'signal_type': 'BUY',
            'price_at_signal': 95000
        }
        self.assertTrue(self.client.validate_signal(valid_signal))
        # Test invalid signal
        invalid_signal = {'symbol': 'BTC'}
        self.assertFalse(self.client.validate_signal(invalid_signal))

    @patch('requests.get')
    def test_connection(self, mock_get):
        # Mock SSE response
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.iter_lines.return_value = [
            b'data: {"event_type": "heartbeat"}',
            b'data: {"event_type": "new_signal", "symbol": "BTC", "signal_type": "BUY"}'
        ]
        mock_get.return_value = mock_response
        # Test connection
        self.client.connect_sse_stream()
        # Verify API call
        mock_get.assert_called_once()


if __name__ == '__main__':
    unittest.main()
Integration Testing
# test_integration.py
import threading

import pytest
from vedicquant_client import VedicquantSSEClient


@pytest.fixture
def client():
    return VedicquantSSEClient(api_key='free-week-demo123')


def test_real_connection(client):
    """Test actual connection to Vedicquant SSE"""
    signals_received = []

    def test_handler(signal_data):
        signals_received.append(signal_data)
        if len(signals_received) >= 3:  # Stop after 3 signals
            client.stop()

    client.on_signal = test_handler

    # Run for maximum 30 seconds: the timer stops the client if fewer than
    # 3 events arrive (assumes client.stop() may be called from another thread)
    watchdog = threading.Timer(30, client.stop)
    watchdog.daemon = True
    watchdog.start()
    try:
        client.start(max_retries=1)
    finally:
        watchdog.cancel()

    # Verify we received signals
    assert len(signals_received) > 0
    assert any(s.get('event_type') in ['heartbeat', 'new_signal'] for s in signals_received)


def test_signal_format(client):
    """Test signal data format validation"""
    # This would connect and validate signal structure
    pass
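
`test_signal_format` is left as a stub above. One way to fill it in without a live connection is to check a signal dictionary against the fields this guide uses. The sketch below is hypothetical: the helper name, the type rules, and the set of allowed strengths are inferred from the examples in this guide, not from a published schema.

```python
from numbers import Real
from typing import Any, Dict, List

REQUIRED_FIELDS = {"symbol": str, "signal_type": str, "price_at_signal": Real}
STRENGTHS = {"weak", "medium", "strong"}


def signal_format_errors(signal: Dict[str, Any]) -> List[str]:
    """Return a list of format problems; an empty list means the signal looks valid."""
    errors = []
    for name, expected in REQUIRED_FIELDS.items():
        if name not in signal:
            errors.append(f"missing field: {name}")
        elif isinstance(signal[name], bool) or not isinstance(signal[name], expected):
            errors.append(f"wrong type for {name}: {type(signal[name]).__name__}")
    price = signal.get("price_at_signal")
    if isinstance(price, Real) and not isinstance(price, bool) and price <= 0:
        errors.append("price_at_signal must be positive")
    strength = signal.get("signal_strength")
    if strength is not None and (not isinstance(strength, str) or strength not in STRENGTHS):
        errors.append(f"unknown signal_strength: {strength}")
    return errors


def test_signal_format_offline():
    good = {"symbol": "BTC", "signal_type": "BUY",
            "price_at_signal": 95000, "signal_strength": "strong"}
    assert signal_format_errors(good) == []
    assert signal_format_errors({"symbol": "BTC"}) == [
        "missing field: signal_type",
        "missing field: price_at_signal",
    ]
```

Returning a list of problems instead of a boolean makes a failing test report which field was wrong.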
Performance Monitoring
Signal Analytics
from collections import Counter
from datetime import datetime, timedelta
from typing import Dict

from vedicquant_client import VedicquantSSEClient


class SignalAnalytics:
    def __init__(self):
        self.signals = []
        # Counters support the `+= 1` updates below; a defaultdict(list) would not
        self.performance_data = {
            'signals_per_hour': [],
            'strength_distribution': Counter(),
            'phase_distribution': Counter(),
        }

    def track_signal(self, signal: Dict):
        """Track received signal for analysis"""
        signal['received_at'] = datetime.now()
        self.signals.append(signal)
        # Update performance metrics
        self.update_metrics(signal)

    def update_metrics(self, signal: Dict):
        """Update real-time performance metrics"""
        symbol = signal.get('symbol')
        strength = signal.get('signal_strength')
        phase = signal.get('phase_name')
        self.performance_data['signals_per_hour'].append({
            'timestamp': datetime.now(),
            'symbol': symbol
        })
        self.performance_data['strength_distribution'][strength] += 1
        self.performance_data['phase_distribution'][phase] += 1

    def get_hourly_stats(self) -> Dict:
        """Get signals per hour statistics"""
        now = datetime.now()
        hour_ago = now - timedelta(hours=1)
        recent_signals = [
            s for s in self.signals
            if s.get('received_at', now) > hour_ago
        ]
        if self.signals:
            # Average over the time since the first signal, counting at least one hour
            elapsed = now - self.signals[0]['received_at']
            hours = max(elapsed.total_seconds() / 3600, 1.0)
            avg_per_hour = len(self.signals) / hours
        else:
            avg_per_hour = 0.0
        # Events such as heartbeats carry no symbol, so skip missing values
        symbols = {s.get('symbol') for s in recent_signals} - {None}
        return {
            'signals_last_hour': len(recent_signals),
            'symbols_active': len(symbols),
            'avg_signals_per_hour': avg_per_hour
        }

    def generate_report(self) -> str:
        """Generate comprehensive performance report"""
        stats = self.get_hourly_stats()
        report = f"""
Vedicquant Signal Analytics Report
=====================================
Report Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

Signal Statistics:
• Total Signals: {len(self.signals)}
• Signals Last Hour: {stats['signals_last_hour']}
• Active Symbols: {stats['symbols_active']}

Phase Distribution:
{self._format_distribution(self.performance_data['phase_distribution'])}

Strength Distribution:
{self._format_distribution(self.performance_data['strength_distribution'])}
"""
        return report

    def _format_distribution(self, dist_dict: Dict) -> str:
        """Format distribution data for reporting"""
        lines = []
        # Sort by the key's string form so a missing (None) key cannot break sorting
        for key, count in sorted(dist_dict.items(), key=lambda kv: str(kv[0])):
            percentage = (count / len(self.signals)) * 100 if self.signals else 0
            lines.append(f"• {key}: {count} ({percentage:.1f}%)")
        return '\n'.join(lines)


# Usage in signal handler
analytics = SignalAnalytics()


class AnalyticsClient(VedicquantSSEClient):
    def default_signal_handler(self, signal_data):
        # Track signal for analytics
        analytics.track_signal(signal_data)
        # Generate report every 100 signals
        if len(analytics.signals) % 100 == 0:
            print(analytics.generate_report())
        # Continue with normal processing
        super().default_signal_handler(signal_data)
Next Steps
1. Start with Basic Client
Begin with the Basic SSE Client to understand signal structure and connection handling.
2. Build Trading Logic
Implement your trading strategy using the Trading Bot as a foundation.
3. Add Notifications
Set up Telegram alerts to stay informed of important signals.
4. Implement Analytics
Track performance and optimize your strategy with signal analytics.
5. Scale Your System
Deploy to production with proper monitoring, logging, and error handling.
Best Practices
Security
- ✅ Use environment variables for API keys
- ✅ Implement proper error handling
- ✅ Never log sensitive credentials
- ✅ Use HTTPS for all connections
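
As a small illustration of keeping credentials out of logs, the helper below masks a secret before it is written anywhere. The name `mask_secret` and the choice of showing the last four characters are assumptions.

```python
def mask_secret(value: str, visible: int = 4) -> str:
    """Replace all but the last `visible` characters of `value` with '*'."""
    if len(value) <= visible:
        return "*" * len(value)
    hidden = len(value) - visible
    return "*" * hidden + value[hidden:]
```

A log call would then look like `logger.info("Using API key %s", mask_secret(api_key))`, which still lets you tell keys apart while troubleshooting.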
Reliability
- ✅ Implement exponential backoff for reconnections
- ✅ Add circuit breaker patterns for error handling
- ✅ Monitor connection health with heartbeats
- ✅ Log all significant events
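
Heartbeat monitoring can be as simple as remembering when the last event arrived and treating a long silence as a dead connection. The sketch below is illustrative: the class name is hypothetical, and the 90-second default is an assumption because this guide does not state the heartbeat interval.

```python
import time
from typing import Callable


class HeartbeatMonitor:
    """Track the time of the last received event and flag long silences."""

    def __init__(self, timeout: float = 90.0,
                 clock: Callable[[], float] = time.monotonic):
        self.timeout = timeout
        self.clock = clock
        self.last_seen = clock()

    def beat(self) -> None:
        """Call for every event received, heartbeat or signal."""
        self.last_seen = self.clock()

    def is_stale(self) -> bool:
        """True when no event has arrived within `timeout` seconds."""
        return self.clock() - self.last_seen > self.timeout
```

A background task can poll `is_stale()` and trigger a reconnect when it returns `True`. Using a monotonic clock avoids false alarms when the system time is adjusted.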
Performance
- ✅ Use connection pooling for HTTP requests
- ✅ Implement async processing for high throughput
- ✅ Cache signal data appropriately
- ✅ Monitor memory usage in long-running processes
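
For caching and memory use in long-running processes, a bounded buffer keeps only the most recent signals so the process does not grow without limit. The following is a minimal sketch using `collections.deque`; the class name and the default capacity are assumptions.

```python
from collections import deque
from typing import Any, Deque, Dict, List


class RecentSignals:
    """Keep at most `maxlen` signals, discarding the oldest first."""

    def __init__(self, maxlen: int = 1000):
        self._items: Deque[Dict[str, Any]] = deque(maxlen=maxlen)

    def add(self, signal: Dict[str, Any]) -> None:
        self._items.append(signal)  # silently evicts the oldest entry when full

    def latest(self, n: int = 10) -> List[Dict[str, Any]]:
        """Return up to `n` most recent signals, oldest first."""
        if n <= 0:
            return []
        return list(self._items)[-n:]

    def __len__(self) -> int:
        return len(self._items)
```

Memory use is then bounded by the capacity times the size of one signal, regardless of how long the client runs.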
Testing
- ✅ Write unit tests for signal validation
- ✅ Test reconnection scenarios
- ✅ Validate signal processing logic
- ✅ Use testnet for trading bot development
Ready to harness cosmic trading intelligence with Python!