Source code for lightningchart_trader.instance

from __future__ import annotations
import os
import secrets
import sys
import time
import threading
from concurrent.futures import ThreadPoolExecutor
import socket
import queue
import uuid
import msgpack
import requests
import webbrowser
from IPython import get_ipython
from IPython.display import IFrame, display
from flask import Flask, request, render_template, send_from_directory, Response
from flask_socketio import SocketIO, join_room

# Host name used for the probe socket bind, the server bind and every URL
# built in this module.
LOCALHOST = 'localhost'
# Root directory under which the 'static' folder (assets and templates) is
# looked up.
base_dir = '.'
# NOTE(review): sys._MEIPASS is presumably the extraction directory of a
# PyInstaller bundle -- confirm. When present it replaces the default root.
if hasattr(sys, '_MEIPASS'):
    base_dir = os.path.join(sys._MEIPASS)


def get_free_port():
    """Return a TCP port number that is currently free on ``LOCALHOST``.

    A throw-away socket is bound to port 0 so the operating system picks an
    unused port; the socket is closed again before the number is returned.
    The port is therefore only *probably* free when the caller binds it.

    Returns:
        int: The port number chosen by the operating system.

    Raises:
        OSError: If the probe socket cannot be created or bound.
    """
    # The previous implementation set SO_REUSEADDR *after* bind() on a socket
    # that was closed right away, which had no effect; it is dropped here.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((LOCALHOST, 0))
        return sock.getsockname()[1]
class Instance:
    """Bridge between Python code and a chart page served from localhost.

    The instance owns a Flask application and a Flask-SocketIO server. Chart
    commands are msgpack-encoded and posted to the instance's own HTTP
    endpoints, which relay them over SocketIO to the clients that joined the
    room named after ``self.id``. Commands issued before a client is
    available are buffered and handed out through the ``/storage`` route.
    """

    def __init__(self, license_key: str):
        """Create the Flask/SocketIO application and register all handlers.

        The server itself is not started here; see ``open()``.

        Args:
            license_key: Value passed to the ``index.html`` template.
        """
        # Short room identifier: first group of a random UUID.
        self.id = str(uuid.uuid4()).split('-')[0]
        self.license_key = license_key
        self.items = []  # commands buffered while no server is running
        self.storage = {}  # room -> list of decoded commands
        self.pending_get_results = {}  # get_id -> queue awaiting the reply
        self.connected_clients = {}  # SocketIO sid -> joined room
        self.preserve_data = True
        self.server_is_open = False
        self.server_port = None
        self.event_handlers = {}  # callbackId -> callable
        self.seq_num = 0
        self._server_thread = None
        self._callback_executor = None

        # Initialize Flask and SocketIO
        self.session = None
        self._ensure_runtime_resources()
        self.app = Flask(
            __name__,
            static_folder=os.path.join(base_dir, 'static'),
            template_folder=os.path.join(base_dir, 'static'),
        )
        self.app.config['SECRET_KEY'] = secrets.token_hex(32)
        self.socketio = SocketIO(self.app, async_mode='gevent', ping_timeout=60)

        # HTTP routes
        self.app.route('/', methods=['GET'])(self._http_index)
        self.app.route('/send', methods=['POST'])(self._http_send)
        self.app.route('/get', methods=['POST'])(self._http_get)
        self.app.route('/storage', methods=['GET'])(self._http_storage)
        self.app.route('/static/<path:path>', methods=['GET'])(self._http_static)
        self.app.route('/event_callback', methods=['POST'])(self._http_event_callback)

        # SocketIO events
        self.socketio.on_event('connect', self._sio_connect)
        self.socketio.on_event('disconnect', self._sio_disconnect)
        self.socketio.on_event('join', self._sio_join)
        self.socketio.on_event('get_result', self._sio_get_result)

    # ----- Public methods -----

    def send(self, id: str, command: str, arguments: dict = None):
        """Send a fire-and-forget command to the chart.

        Args:
            id: Identifier of the target object; converted with ``str()``.
            command: Command name.
            arguments: Optional command arguments; ``None`` becomes ``{}``.

        Returns:
            bool: ``True`` if the command was posted to the running server
            successfully, ``False`` if it was buffered in ``self.items``
            instead (server not running, or the post failed).
        """
        data = {'seq': self.seq_num, 'id': str(id), 'command': command, 'args': arguments or {}}
        self.seq_num += 1
        if self.server_is_open and self._post_command(data):
            return True
        # NOTE(review): items buffered here while the server is already
        # running are only flushed by a later server start -- confirm that
        # this is intended.
        self.items.append(data)
        return False

    def _post_command(self, data):
        """POST ``data`` as msgpack to the ``/send`` route of this instance.

        Returns:
            bool: ``response.ok``, or ``False`` when the request raised a
            ``requests.RequestException`` (the error is printed).
        """
        self._ensure_runtime_resources()
        binary_data = msgpack.packb(data, use_bin_type=True)
        try:
            response = self.session.post(
                f'http://{LOCALHOST}:{self.server_port}/send?room={self.id}',
                data=binary_data,
                headers={'Content-Type': 'application/msgpack'},
                timeout=10,
            )
        except requests.RequestException as e:
            print(e)
            return False
        return response.ok

    def _ensure_runtime_resources(self):
        """Lazily (re)create the HTTP session and the callback thread pool.

        Both are released by ``close()``, so every user calls this first.
        """
        if self.session is None:
            self.session = requests.Session()
            retry_adapter = requests.adapters.HTTPAdapter(max_retries=5)
            self.session.mount('http://', retry_adapter)
        if self._callback_executor is None:
            self._callback_executor = ThreadPoolExecutor(max_workers=4)

    def _queue_initial_items(self):
        """Move all buffered commands into this instance's room storage."""
        if not self.items:
            return
        self.storage.setdefault(self.id, []).extend(self.items)
        self.items.clear()

    def _chart_url(self):
        """Return the URL of the chart page for this instance's room."""
        return f'http://{LOCALHOST}:{self.server_port}/?id={self.id}'

    @staticmethod
    def _running_in_notebook():
        """Return ``True`` when the IPython shell class is ``ZMQInteractiveShell``."""
        try:
            return get_ipython().__class__.__name__ == 'ZMQInteractiveShell'
        except NameError:
            return False

    def _wait_until_server_ready(self, attempts=20, delay=0.1):
        """Poll the index route until it answers successfully.

        Args:
            attempts: Maximum number of requests to try.
            delay: Seconds to sleep after each unsuccessful attempt.

        Returns:
            bool: ``True`` once a response is OK, ``False`` if every
            attempt failed.
        """
        self._ensure_runtime_resources()
        for _ in range(attempts):
            try:
                response = self.session.get(self._chart_url(), timeout=2)
                if response.ok:
                    return True
            except requests.RequestException:
                # Server not accepting connections yet; retry after the delay.
                pass
            time.sleep(delay)
        return False

    def _wait_until_room_joined(self, attempts=20, delay=0.5):
        """Poll until a client has joined this instance's room.

        Returns:
            bool: ``True`` as soon as ``_room_response()`` is true,
            ``False`` after ``attempts`` unsuccessful polls.
        """
        for _ in range(attempts):
            if self._room_response():
                return True
            time.sleep(delay)
        return False

    def _request_get_result(self, data, get_id):
        """POST a get request to the ``/get`` route and decode the reply.

        Args:
            data: Request payload (packed with msgpack).
            get_id: Correlation id, passed as a query parameter.

        Returns:
            The msgpack-decoded response body.

        Raises:
            RuntimeError: If the transfer fails, the chart reports an error,
                or the server answers with a non-OK status.
        """
        self._ensure_runtime_resources()
        binary_data = msgpack.packb(data, use_bin_type=True)
        try:
            response = self.session.post(
                f'http://{LOCALHOST}:{self.server_port}/get?room={self.id}&get_id={get_id}',
                data=binary_data,
                headers={'Content-Type': 'application/msgpack'},
                # Longer than the 60 s the /get route waits for the client.
                timeout=70,
            )
        except requests.RequestException as e:
            raise RuntimeError(f'Failed to transfer get request to chart server: {e}') from e

        if response.ok:
            result = msgpack.unpackb(response.content, raw=False)
            if isinstance(result, dict) and '__chart_get_error__' in result:
                raise RuntimeError(f'Chart command failed: {result["__chart_get_error__"]}')
            return result
        if response.status_code == 400:
            raise RuntimeError('Chart is not open, cannot execute command. Call open() method first.')
        if response.status_code == 500:
            raise RuntimeError('Timed out while waiting for chart response.')
        raise RuntimeError(f'Unexpected chart server response: HTTP {response.status_code}')

    def get(self, id: str, command: str = None, arguments: dict = None):
        """Run a command on the chart and return its result.

        If the server is not running, the chart is opened first (in the
        notebook when running under ``ZMQInteractiveShell``, otherwise in
        the browser).

        Args:
            id: Identifier of the target object; converted with ``str()``.
            command: Command name.
            arguments: Optional command arguments; ``None`` becomes ``{}``.

        Returns:
            The decoded result sent back by the chart client.

        Raises:
            RuntimeError: If the chart cannot be opened, no client joins
                the room, or the request fails (see ``_request_get_result``).
        """
        get_id = str(uuid.uuid4()).split('-')[0]
        data = {
            'get_id': get_id,
            'seq': self.seq_num,
            'id': str(id),
            'command': command,
            'args': arguments or {},
        }
        self.seq_num += 1

        if not self.server_is_open:
            try:
                if self._running_in_notebook():
                    self.open_in_notebook()
                else:
                    self.open_in_browser()
            except Exception as e:
                raise RuntimeError(f'Chart was not opened, and it failed to open automatically. Please open it manually. Error: {e}') from e

        if not self._wait_until_room_joined():
            raise RuntimeError('Chart is not open, cannot execute command. Call open() method first.')
        return self._request_get_result(data, get_id)

    def open(
        self,
        method: str = None,
        live: bool = False,
        width: int | str = '100%',
        height: int | str = 600,
    ):
        """Open the chart.

        Args:
            method: ``'browser'``, ``'notebook'`` or ``'link'``. Any other
                value selects ``'notebook'`` under ``ZMQInteractiveShell``
                and ``'browser'`` otherwise.
            live: When true, the server is started here if it is not
                running yet.
            width: IFrame width, used by the notebook method.
            height: IFrame height, used by the notebook method.

        Returns:
            The chart URL for ``'link'``; otherwise whatever
            ``open_in_notebook()`` / ``open_in_browser()`` returns.
        """
        if method not in ('browser', 'notebook', 'link'):
            method = 'notebook' if self._running_in_notebook() else 'browser'

        if (live or method == 'link') and not self.server_is_open:
            self._start_server()
            self._queue_initial_items()

        if method == 'notebook':
            return self.open_in_notebook(width=width, height=height)
        if method == 'link':
            return self._chart_url()
        return self.open_in_browser()

    def open_in_browser(self):
        """Open the chart page with ``webbrowser`` and wait for it to join.

        Starts the server first if necessary.

        Returns:
            bool: Result of ``_wait_until_room_joined()``; ``False`` if a
            ``requests`` connection error was caught.
        """
        if not self.server_is_open:
            self._start_server()
            self._queue_initial_items()
        try:
            webbrowser.open(self._chart_url())
            return self._wait_until_room_joined()
        except requests.exceptions.ConnectionError as e:
            print(e)
            return False

    def open_in_notebook(self, width: int | str = '100%', height: int | str = 600):
        """Display the chart page in an IPython ``IFrame``.

        Starts the server first if necessary.

        Args:
            width: IFrame width.
            height: IFrame height.

        Returns:
            The return value of ``IPython.display.display``; ``None`` if a
            ``requests`` connection error was caught.
        """
        if not self.server_is_open:
            self._start_server()
            self._queue_initial_items()
        try:
            return display(IFrame(src=self._chart_url(), width=width, height=height))
        except requests.exceptions.ConnectionError as e:
            print(e)

    def close(self):
        """Shut the server down and release the session and thread pool.

        Connected clients receive a ``shutdown`` event first. Problems
        reported by ``socketio.stop()`` are printed, not raised.
        """
        if self.server_is_open:
            # Iterate over a copy: disconnect handlers may mutate the dict.
            for client in list(self.connected_clients):
                self.socketio.emit('shutdown', to=client)
            try:
                self.socketio.stop()
            except (AttributeError, RuntimeError) as e:
                print(f'Chart server shutdown warning: {e}')
            self.server_is_open = False
            self.connected_clients.clear()
            self.pending_get_results.clear()

        if self._callback_executor is not None:
            self._callback_executor.shutdown(wait=False)
            self._callback_executor = None
        if self.session is not None:
            self.session.close()
            self.session = None

    def _start_server(self):
        """Start the SocketIO server on a free port in a daemon thread.

        Raises:
            RuntimeError: If the server cannot be started or does not
                answer before the readiness polling gives up.
        """
        try:
            self._ensure_runtime_resources()
            self.server_port = get_free_port()
            self._server_thread = threading.Thread(
                target=lambda: self.socketio.run(
                    self.app,
                    host=LOCALHOST,
                    port=self.server_port,
                    debug=False,
                    log_output=False,
                    use_reloader=False,
                ),
                daemon=True,
            )
            self._server_thread.start()
            if not self._wait_until_server_ready():
                raise RuntimeError('Server did not respond before startup timeout.')
            self.server_is_open = True
        except Exception as e:
            self.server_is_open = False
            raise RuntimeError(f'Failed to start server on port {self.server_port}. {e}') from e

    def _room_response(self):
        """Return ``True`` if some connected client joined this instance's room."""
        return self.id in self.connected_clients.values()

    def _wait_for_get_result(self, get_id, timeout=5):
        """Wait for the reply to the get request identified by ``get_id``.

        A queue already registered under ``get_id`` is reused so that a
        reply delivered before this call is not lost; otherwise a new queue
        is registered. The registration is always removed on exit.

        Args:
            get_id: Correlation id of the request.
            timeout: Maximum number of seconds to wait.

        Returns:
            The queued result, or ``None`` on timeout. A reply whose result
            is ``None`` is indistinguishable from a timeout.
        """
        result_queue = self.pending_get_results.get(get_id)
        if result_queue is None:
            result_queue = self.pending_get_results[get_id] = queue.Queue()
        try:
            deadline = time.monotonic() + timeout
            while time.monotonic() < deadline:
                try:
                    return result_queue.get_nowait()
                except queue.Empty:
                    # Cooperative sleep provided by the SocketIO server.
                    self.socketio.sleep(0.05)
            return None
        finally:
            self.pending_get_results.pop(get_id, None)

    # ----- HTTP Routes -----

    def _http_index(self):
        """Render ``index.html`` for the room given by the ``id`` query arg."""
        room = request.args.get('id')
        index_js_path = os.path.join(base_dir, 'static', 'index.js')
        # Version stamp passed to the template: mtime of index.js, or the
        # current time when the file cannot be stat'ed.
        try:
            static_version = int(os.path.getmtime(index_js_path))
        except OSError:
            static_version = int(time.time())
        return render_template('index.html', room=room, license_key=self.license_key, static_version=static_version)

    def _http_static(self, path):
        """Serve ``path`` from the ``static`` folder under ``base_dir``."""
        # Same folder as the Flask ``static_folder`` configured in __init__,
        # so it also follows ``base_dir`` when that is not '.'.
        return send_from_directory(os.path.join(base_dir, 'static'), path)

    def _http_send(self):
        """Relay a msgpack command to a room and/or store it.

        The raw payload is emitted as ``item`` when a client is in the
        room. The decoded payload is stored when ``preserve_data`` is set
        or when no client is in the room.

        Returns:
            ``('', 200)`` on success, or a 400 response for a missing room
            or an undecodable payload.
        """
        room = request.args.get('room')
        binary_data = request.data
        if not room:
            return Response('Missing room', status=400)
        try:
            data = msgpack.unpackb(binary_data, raw=False)
        except (msgpack.ExtraData, msgpack.FormatError, msgpack.StackError, ValueError) as e:
            return Response(f'Invalid msgpack payload: {e}', status=400)

        room_connected = room in self.connected_clients.values()
        if room_connected:
            self.socketio.emit('item', binary_data, to=room)
        if self.preserve_data or not room_connected:
            self.storage.setdefault(room, []).append(data)
        return '', 200

    def _http_get(self):
        """Forward a get request to the room and return the client's reply.

        Returns:
            A msgpack response with the result, 400 if no client is in the
            room, or 500 if no result arrived within 60 seconds.
        """
        room = request.args.get('room')
        get_id = request.args.get('get_id')
        binary_data = request.data
        if room not in self.connected_clients.values():
            return Response('', status=400)

        # Register the result queue BEFORE emitting: otherwise a reply that
        # arrives before the waiter is set up is dropped by _sio_get_result
        # and the request runs into the timeout.
        self.pending_get_results[get_id] = queue.Queue()
        try:
            self.socketio.emit('get_request', binary_data, to=room)
            result = self._wait_for_get_result(get_id, timeout=60)
        finally:
            self.pending_get_results.pop(get_id, None)

        if result is None:
            return Response('', status=500)
        return Response(msgpack.packb(result, use_bin_type=True), mimetype='application/msgpack')

    def _http_storage(self):
        """Return the stored commands of a room as msgpack.

        The room's storage is deleted afterwards unless ``preserve_data``
        is set. Responds with 404 when the room has no storage.
        """
        room = request.args.get('room')
        try:
            data = msgpack.packb(self.storage[room], use_bin_type=True)
            if not self.preserve_data:
                del self.storage[room]
            return Response(data, mimetype='application/msgpack')
        except KeyError:
            return Response('Room not found', status=404)

    # ----- SocketIO Events -----

    def _sio_connect(self):
        """Register a new client under the placeholder room ``'default'``."""
        self.connected_clients[request.sid] = 'default'

    def _sio_disconnect(self):
        """Forget a disconnected client."""
        self.connected_clients.pop(request.sid, None)

    def _sio_join(self, room):
        """Put the client into ``room``; emit ``exec`` if the room has storage."""
        join_room(room)
        self.connected_clients[request.sid] = room
        if room in self.storage:
            self.socketio.emit('exec', to=room)

    def _sio_get_result(self, binary_data):
        """Hand a client's get reply to the waiting request, if any.

        A payload with an ``error`` key is wrapped as
        ``{'__chart_get_error__': ...}``; otherwise its ``result`` value is
        delivered. Replies nobody is waiting for are discarded.
        """
        data = msgpack.unpackb(binary_data, raw=False)
        get_id = data['get_id']
        if 'error' in data:
            result = {'__chart_get_error__': data['error']}
        else:
            result = data.get('result')
        # Single lookup: the waiter may remove its queue at any time, so a
        # separate membership test followed by indexing could raise KeyError.
        waiting = self.pending_get_results.get(get_id)
        if waiting is not None:
            waiting.put(result)

    def _http_event_callback(self):
        """Dispatch a client event to the handler registered for it.

        The handler stored under the payload's ``callbackId`` is submitted
        to the callback thread pool with the payload's ``eventData``.

        Returns:
            ``('', 200)``, also when no handler is registered;
            ``('', 500)`` if anything raised (the error is printed).
        """
        try:
            binary_data = request.data
            data = msgpack.unpackb(binary_data, raw=False)
            callback_id = data.get('callbackId')
            event_data = data.get('eventData')
            handler = self.event_handlers.get(callback_id)
            if handler:
                self._ensure_runtime_resources()
                self._callback_executor.submit(handler, event_data)
            return '', 200
        except Exception as e:
            print(f'Event callback error: {e}')
            return '', 500