# Source code for lightningchart_trader.instance

from __future__ import annotations
import os
import sys
import time
import threading
import socket
import queue
import uuid
import msgpack
import requests
import webbrowser
from IPython import get_ipython
from IPython.display import IFrame, display
from flask import Flask, request, render_template, send_from_directory, Response
from flask_socketio import SocketIO, join_room

# Host name used when building URLs that point at the local server.
LOCALHOST = 'localhost'
# Interface passed as ``host`` when the server is started.
host_name = '0.0.0.0'
# Directory that contains the ``static`` folder: ``sys._MEIPASS`` when that
# attribute is present on ``sys``, otherwise the current directory.
base_dir = os.path.join(sys._MEIPASS) if hasattr(sys, '_MEIPASS') else '.'


def get_free_port():
    """Return a TCP port number that is currently free on ``LOCALHOST``.

    A temporary socket is bound to port 0 so the operating system picks an
    unused port; the socket is closed again before returning, so the port is
    only *likely* to still be free when the caller binds to it.

    Returns:
        int: The port number the operating system assigned.
    """
    # The context manager guarantees the socket is closed even if bind() or
    # getsockname() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((LOCALHOST, 0))
        return sock.getsockname()[1]
class Instance:
    """Local Flask/Socket.IO bridge between Python code and a chart page.

    Commands issued through :meth:`send` are buffered in ``self.items`` until
    the local server has been started. Afterwards they are msgpack-encoded and
    POSTed to the server's own ``/send`` route, which relays them to the
    Socket.IO room named after ``self.id`` and/or records them in
    ``self.storage``.
    """

    def __init__(self, license_key: str):
        """Create the Flask app, Socket.IO server object and HTTP session.

        The server itself is not started here; see :meth:`open`.

        Args:
            license_key: Value handed to the ``index.html`` template.
        """
        # First group of a random UUID; used as the Socket.IO room name.
        self.id = str(uuid.uuid4()).split('-')[0]
        self.license_key = license_key
        self.items = list()  # commands buffered before the server is open
        self.storage = dict()  # room -> list of decoded commands
        self.pending_get_results = dict()  # get_id -> queue.Queue
        self.connected_clients = dict()  # Socket.IO sid -> room name
        self.preserve_data = True
        self.server_is_open = False
        self.server_port = None
        # callback id -> callable; only read in this class (populated elsewhere).
        self.event_handlers = {}
        self.seq_num = 0

        # Initialize Flask and SocketIO
        self.session = requests.Session()
        retry_adapter = requests.adapters.HTTPAdapter(max_retries=5)
        self.session.mount('http://', retry_adapter)
        self.app = Flask(
            __name__,
            static_folder=os.path.join(base_dir, 'static'),
            template_folder=os.path.join(base_dir, 'static'),
        )
        # NOTE(review): hard-coded secret key; confirm nothing relies on
        # Flask session signing before treating this as harmless.
        self.app.config['SECRET_KEY'] = 'secret!'
        self.socketio = SocketIO(self.app, async_mode='gevent', ping_timeout=60)

        # HTTP routes
        self.app.route('/', methods=['GET'])(self._http_index)
        self.app.route('/send', methods=['POST'])(self._http_send)
        self.app.route('/get', methods=['POST'])(self._http_get)
        self.app.route('/storage', methods=['GET'])(self._http_storage)
        self.app.route('/static/<path:path>', methods=['GET'])(self._http_static)
        self.app.route('/event_callback', methods=['POST'])(self._http_event_callback)

        # SocketIO events
        self.socketio.on_event('connect', self._sio_connect)
        self.socketio.on_event('disconnect', self._sio_disconnect)
        self.socketio.on_event('join', self._sio_join)
        self.socketio.on_event('get_result', self._sio_get_result)

    # ----- Public methods -----

    def send(self, id: str, command: str, arguments: dict | None = None):
        """Queue or transmit a command for the chart.

        Args:
            id: Target identifier; converted with ``str()``.
            command: Command name.
            arguments: Optional command arguments; ``None`` becomes ``{}``.

        Returns:
            ``True`` when the server was open and answered with a success
            status; ``None`` otherwise (command buffered, request failed, or
            non-success status).
        """
        data = {
            'seq': self.seq_num,
            'id': str(id),
            'command': command,
            'args': arguments or {},
        }
        self.seq_num += 1

        if not self.server_is_open:
            # Buffered until the server is started.
            self.items.append(data)
            return None

        binary_data = msgpack.packb(data)
        try:
            response = self.session.post(
                f'http://{LOCALHOST}:{self.server_port}/send?room={self.id}',
                data=binary_data,
                headers={'Content-Type': 'application/msgpack'},
            )
        except requests.RequestException as e:
            print(e)
            return None
        if response.ok:
            return True
        return None

    def get(self, id: str, command: str | None = None, arguments: dict | None = None):
        """Send a request to the chart page and return its decoded answer.

        If the server is not running yet, the chart is opened automatically
        (notebook IFrame inside a ZMQ IPython shell, browser otherwise). If
        the server runs but no client has joined the room, this waits up to
        20 polls of 0.5 s for one.

        Args:
            id: Target identifier; converted with ``str()``.
            command: Command name.
            arguments: Optional command arguments; ``None`` becomes ``{}``.

        Returns:
            The msgpack-decoded response body, or ``None`` when the HTTP
            request fails or returns a status other than success/400/500.

        Raises:
            Exception: If automatic opening fails, or the server answers
                with status 400 or 500.
        """
        get_id = str(uuid.uuid4()).split('-')[0]
        payload = {
            'get_id': get_id,
            'id': str(id),
            'command': command,
            'args': arguments or {},
        }

        if not self.server_is_open:
            try:
                if self._running_in_notebook():
                    self.open_in_notebook()
                    self._wait_for_client()
                else:
                    # open_in_browser() does its own waiting.
                    self.open_in_browser()
            except Exception as e:
                raise Exception(
                    'Chart was not opened, and it failed to open automatically. '
                    f'Please open it manually. Error: {e}'
                ) from e
        else:
            # Returns immediately when a client is already in the room.
            self._wait_for_client()

        binary_data = msgpack.packb(payload)
        try:
            response = self.session.post(
                f'http://{LOCALHOST}:{self.server_port}/get?room={self.id}&get_id={get_id}',
                data=binary_data,
                headers={'Content-Type': 'application/msgpack'},
            )
        except requests.RequestException as e:
            print(e)
            return None

        if response.ok:
            return msgpack.unpackb(response.content, raw=False)
        if response.status_code == 400:
            raise Exception('Chart is not open, cannot execute command. Call open() method first.')
        if response.status_code == 500:
            raise Exception('Unexpected error occurred, cannot execute command.')
        return None

    def open(
        self,
        method: str | None = None,
        live: bool = False,
        width: int | str = '100%',
        height: int | str = 600,
    ):
        """Open the chart with the chosen method.

        Args:
            method: ``'browser'``, ``'notebook'`` or ``'link'``. Any other
                value selects ``'notebook'`` inside a ZMQ IPython shell and
                ``'browser'`` otherwise.
            live: When true, the server is started before dispatching.
            width: IFrame width (notebook method only).
            height: IFrame height (notebook method only).

        Returns:
            The result of :meth:`open_in_notebook` or :meth:`open_in_browser`,
            or the chart URL string for ``'link'``.
        """
        if method not in ('browser', 'notebook', 'link'):
            try:
                method = 'notebook' if self._running_in_notebook() else 'browser'
            except NameError:
                method = 'browser'

        if live or method == 'link':
            self._ensure_server()

        if method == 'notebook':
            return self.open_in_notebook(width=width, height=height)
        if method == 'link':
            return f'http://{LOCALHOST}:{self.server_port}/?id={self.id}'
        return self.open_in_browser()

    def open_in_browser(self):
        """Open the chart URL in the default web browser.

        Starts the server if needed, then waits up to 20 polls of 0.5 s for a
        client to join this instance's room.

        Returns:
            bool: ``True`` once a client has joined, ``False`` otherwise.
        """
        self._ensure_server()
        try:
            webbrowser.open(f'http://{LOCALHOST}:{self.server_port}/?id={self.id}')
            if self._wait_for_client():
                return True
        except requests.exceptions.ConnectionError as e:
            print(e)
        return False

    def open_in_notebook(self, width: int | str = '100%', height: int | str = 600):
        """Display the chart URL in an IPython ``IFrame``.

        Starts the server if needed.

        Args:
            width: IFrame width.
            height: IFrame height.

        Returns:
            Whatever ``IPython.display.display`` returns; ``None`` if a
            connection error was caught and printed.
        """
        self._ensure_server()
        try:
            return display(
                IFrame(
                    src=f'http://{LOCALHOST}:{self.server_port}/?id={self.id}',
                    width=width,
                    height=height,
                )
            )
        except requests.exceptions.ConnectionError as e:
            print(e)

    def close(self):
        """Emit ``'shutdown'`` to every known client and stop the server.

        Does nothing when the server is not open.
        """
        if not self.server_is_open:
            return
        # Iterate over a snapshot: a disconnect handler may remove entries
        # from connected_clients while the shutdown events are being emitted.
        for client in list(self.connected_clients):
            self.socketio.emit('shutdown', to=client)
        self.socketio.stop()
        self.server_is_open = False

    # ----- Internal helpers -----

    @staticmethod
    def _running_in_notebook():
        """Return True when the active IPython shell is a ZMQInteractiveShell."""
        return get_ipython().__class__.__name__ == 'ZMQInteractiveShell'

    def _ensure_server(self):
        """Start the server if it is not open and copy buffered items to storage."""
        if self.server_is_open:
            return
        self._start_server()
        self.storage.setdefault(self.id, []).extend(self.items)

    def _wait_for_client(self, attempts=20, interval=0.5):
        """Poll until a client has joined this instance's room.

        Returns:
            bool: ``True`` as soon as a client is present, ``False`` after
            ``attempts`` unsuccessful checks (sleeping ``interval`` seconds
            after each).
        """
        for _ in range(attempts):
            if self._room_response():
                return True
            time.sleep(interval)
        return False

    def _start_server(self):
        """Run the Socket.IO server in a new thread on a free port.

        After starting the thread, the index page is requested up to 20
        times (0.1 s apart) until it answers successfully.

        Raises:
            Exception: If anything in the start-up sequence raises.
        """
        try:
            self.server_port = get_free_port()
            # NOTE(review): debug=True together with host_name ('0.0.0.0')
            # deserves a security review; left unchanged to preserve behavior.
            server_thread = threading.Thread(
                target=lambda: self.socketio.run(
                    self.app,
                    host=host_name,
                    port=self.server_port,
                    debug=True,
                    log_output=False,
                    use_reloader=False,
                )
            )
            server_thread.start()
            self.server_is_open = True

            for _ in range(20):
                try:
                    response = self.session.get(f'http://{LOCALHOST}:{self.server_port}/?id={self.id}')
                    if response.ok:
                        break
                except requests.RequestException:
                    # Server not accepting connections yet; retry below.
                    pass
                time.sleep(0.1)
        except Exception as e:
            raise Exception(f'Failed to start server on port {self.server_port}. {e}') from e

    def _room_response(self):
        """Return True when some connected client has joined room ``self.id``."""
        return self.id in self.connected_clients.values()

    def _wait_for_get_result(self, get_id, timeout=5, poll_interval=0.5, max_polls=10):
        """Wait for the result that ``_sio_get_result`` files under ``get_id``.

        Polls a fresh queue up to ``max_polls`` times (sleeping
        ``poll_interval`` seconds via ``socketio.sleep``), then performs one
        ``Queue.get`` with ``timeout``.

        Returns:
            The queued result, or ``None`` if nothing arrived.
        """
        q = queue.Queue()
        self.pending_get_results[get_id] = q
        try:
            for _ in range(max_polls):
                if not q.empty():
                    break
                self.socketio.sleep(poll_interval)
            try:
                return q.get(timeout=timeout)
            except queue.Empty:
                return None
        finally:
            # Always unregister, even if polling raised.
            self.pending_get_results.pop(get_id, None)

    # ----- HTTP Routes -----

    def _http_index(self):
        """Render ``index.html`` with the room from ``?id=`` and the license key."""
        room = request.args.get('id')
        return render_template('index.html', room=room, license_key=self.license_key)

    def _http_static(self, path):
        """Serve ``path`` from the ``static`` directory under ``base_dir``."""
        # Same directory the Flask app is configured with in __init__.
        return send_from_directory(os.path.join(base_dir, 'static'), path)

    def _http_send(self):
        """Relay a msgpack command to its room and/or record it in storage.

        The command is emitted as ``'item'`` when a client is in the room. It
        is stored when ``preserve_data`` is set or when no client received it.
        """
        room = request.args.get('room')
        binary_data = request.data
        if room not in self.storage:
            self.storage[room] = []

        delivered = room in self.connected_clients.values()
        if delivered:
            self.socketio.emit('item', binary_data, to=room)

        if self.preserve_data or not delivered:
            self.storage[room].append(msgpack.unpackb(binary_data))
        return '', 200

    def _http_get(self):
        """Forward a ``get_request`` to the room and return the client's result.

        Responds 400 when no client is in the room and 500 when no result
        arrives.
        """
        room = request.args.get('room')
        get_id = request.args.get('get_id')
        binary_data = request.data

        if room not in self.connected_clients.values():
            return Response('', status=400)

        self.socketio.emit('get_request', binary_data, to=room)
        result = self._wait_for_get_result(get_id)
        if result is None:
            return Response('', status=500)
        return Response(msgpack.packb(result), mimetype='application/msgpack')

    def _http_storage(self):
        """Return the stored commands of ``?room=`` as msgpack, or 404.

        The room's storage is deleted after packing when ``preserve_data`` is
        false.
        """
        room = request.args.get('room')
        try:
            data = msgpack.packb(self.storage[room])
            if not self.preserve_data:
                del self.storage[room]
            return Response(data, mimetype='application/msgpack')
        except KeyError:
            return Response('Room not found', status=404)

    # ----- SocketIO Events -----

    def _sio_connect(self):
        """Register the connecting client under the placeholder room ``'default'``."""
        self.connected_clients[request.sid] = 'default'

    def _sio_disconnect(self):
        """Forget the disconnecting client; unknown sids are ignored."""
        self.connected_clients.pop(request.sid, None)

    def _sio_join(self, room):
        """Join ``room`` and emit ``'exec'`` to it when storage exists for it."""
        join_room(room)
        self.connected_clients[request.sid] = room
        if room in self.storage:
            self.socketio.emit('exec', to=room)

    def _sio_get_result(self, binary_data):
        """Hand a client's msgpack ``get_result`` to the waiting queue, if any."""
        data = msgpack.unpackb(binary_data)
        get_id = data['get_id']
        result = data['result']
        if get_id in self.pending_get_results:
            self.pending_get_results[get_id].put(result)

    def _http_event_callback(self):
        """Dispatch an event to the handler registered for its ``callbackId``.

        The handler runs in a daemon thread with ``eventData`` as its only
        argument. Responds 200, or 500 if decoding/dispatching raised.
        """
        try:
            binary_data = request.data
            data = msgpack.unpackb(binary_data, raw=False)
            callback_id = data.get('callbackId')
            event_data = data.get('eventData')

            handler = self.event_handlers.get(callback_id)
            if handler:
                threading.Thread(
                    target=handler,
                    args=(event_data,),
                    daemon=True,
                ).start()
            return '', 200
        except Exception as e:
            print(f"Event callback error: {e}")
            return '', 500