Source code for lightningchart.instance

from __future__ import annotations
import os
import queue
import sys
import threading
import time
import uuid
import json
import msgpack
import requests
import socket
import webbrowser
import pkgutil
from http.server import HTTPServer, BaseHTTPRequestHandler
from IPython import get_ipython
from IPython.display import IFrame, display
from flask import Flask, request, render_template, send_from_directory, Response, jsonify
from flask_socketio import SocketIO, join_room

from lightningchart.utils.utils import NumpyEncoder, msgpack_default

LOCALHOST = 'localhost'
host_name = '127.0.0.1'
base_dir = '.'
if hasattr(sys, '_MEIPASS'):
    base_dir = os.path.join(sys._MEIPASS)


[docs] def get_free_port(): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((LOCALHOST, 0)) port = sock.getsockname()[1] sock.close() return port
[docs] def display_html(html_content, notebook=False, width: int | str = '100%', height: int | str = 600): html_bytes = html_content.encode('utf-8') class Server(BaseHTTPRequestHandler): def do_GET(self): self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write(html_bytes) def log_message(self, format, *args): pass server_address = (LOCALHOST, 0) server = HTTPServer(server_address, Server) server_thread = threading.Thread(target=server.handle_request) server_thread.daemon = False server_thread.start() if notebook: return display( IFrame( src=f'http://{LOCALHOST}:{server.server_port}', width=width, height=height, ) ) else: webbrowser.open(f'http://{LOCALHOST}:{server.server_port}') server_thread.join()
[docs] def js_functions(): base_dir = '.' if hasattr(sys, '_MEIPASS'): base_dir = os.path.join(sys._MEIPASS) js_code = pkgutil.get_data(__name__, os.path.join(base_dir, 'static/lcpy.js')).decode() return js_code
[docs] def create_html(items): serialized_items = [] for i in items: serialized_items.append(json.dumps(i, cls=NumpyEncoder)) html = f"""<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <link rel="shortcut icon" href="#"> <title>LightningChart Python</title> <script src="https://cdn.jsdelivr.net/npm/@lightningchart/[email protected]/dist/lcjs.iife.js"></script> <script src="https://cdn.jsdelivr.net/npm/@lightningchart/[email protected]/dist/iife/lcjs-themes.iife.js"></script> <script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/msgpack.min.js"></script> <script src="https://cdn.jsdelivr.net/npm/[email protected]/client-dist/socket.io.min.js"></script> <style> body {{ height: 100%; margin: 0; }} </style> </head> <body> <script> {js_functions()} </script> <script> lcpy.initStatic({serialized_items}); </script> </body> </html> """ return html
[docs] class Instance: def __init__(self): self.id = str(uuid.uuid4()).split('-')[0] self.session = requests.Session() retry_adapter = requests.adapters.HTTPAdapter(max_retries=5) self.session.mount('http://', retry_adapter) self.items = list() self.pending_get_results = dict() self.connected_clients = dict() self.preserve_data = True self.server_is_open = False self.server_port = None self.seq_num = 0 self.event_handlers = {} self.send_method = 'http' self._cb_ctx = threading.local() self._cb_ctx.in_event = False # Initialize Flask and SocketIO self.app = Flask( __name__, static_folder=os.path.join(base_dir, 'static'), template_folder=os.path.join(base_dir, 'static'), ) self.app.config['SECRET_KEY'] = os.urandom(24).hex() self.socketio = SocketIO(self.app, async_mode='gevent', ping_timeout=60) # HTTP routes self.app.route('/', methods=['GET'])(self._http_index) self.app.route('/send', methods=['POST'])(self._http_send) self.app.route('/get', methods=['POST'])(self._http_get) self.app.route('/storage', methods=['GET'])(self._http_storage) self.app.route('/resources/<path:path>', methods=['GET'])(self._http_resources) self.app.route('/static/<path:path>', methods=['GET'])(self._http_static) # SocketIO events self.socketio.on_event('connect', self._sio_connect) self.socketio.on_event('disconnect', self._sio_disconnect) self.socketio.on_event('join', self._sio_join) self.socketio.on_event('get_result', self._sio_get_result) self.app.route('/event_callback', methods=['POST'])(self._http_event_callback) # ----- Public methods -----
[docs] def send(self, id: str, command: str, arguments: dict = None): data = { 'seq': self.seq_num, 'id': id, 'command': command, 'args': arguments, } self.seq_num += 1 if not self.server_is_open: self.items.append(data) return if getattr(self._cb_ctx, 'in_event', False): return self._send_direct(data) if self.send_method == 'http': return self._send_http(data) else: return self._send_direct(data)
[docs] def get(self, id: str, command: str = None, arguments: dict = None): get_id = str(uuid.uuid4()).split('-')[0] data = { 'get_id': get_id, 'id': str(id), 'command': command, 'args': arguments or {}, } if getattr(self._cb_ctx, 'in_event', False): return self._get_direct(data) if not self.server_is_open: self._start_server() try: ipy = get_ipython() if ipy is not None and ipy.__class__.__name__ == 'ZMQInteractiveShell': self._open_in_notebook() for _ in range(20): if self.id in self.connected_clients.values(): break time.sleep(0.5) else: self._open_in_browser() except Exception as e: raise Exception(f'Chart was not opened, and it failed to open automatically. Please open it manually.Error: {e}') binary_data = msgpack.packb(data, default=msgpack_default) try: response = self.session.post( f'http://{LOCALHOST}:{self.server_port}/get?room={self.id}&get_id={get_id}', data=binary_data, headers={'Content-Type': 'application/msgpack'}, timeout=30, ) if response.ok: data = msgpack.unpackb(response.content, raw=False) return data elif response.status_code == 400: raise Exception('Chart is not open, cannot execute command. Call open() method first.') elif response.status_code == 500: raise Exception('Unexpected error occurred, cannot execute command.') except requests.RequestException as e: print(e) return None
[docs] def open( self, method: str = None, live: bool = False, width: int | str = '100%', height: int | str = 600, ): if method not in ('browser', 'notebook', 'link'): ipy = get_ipython() method = 'notebook' if ipy is not None and ipy.__class__.__name__ == 'ZMQInteractiveShell' else 'browser' if (live or method == 'link') and not self.server_is_open: self._start_server() if self.id in self.connected_clients.values(): if method == 'link': return f'http://{LOCALHOST}:{self.server_port}/?id={self.id}' return None if method == 'notebook': return self._open_in_notebook(width=width, height=height) elif method == 'link': return f'http://{LOCALHOST}:{self.server_port}/?id={self.id}' else: self._open_in_browser() return None
[docs] def close(self): if self.server_is_open: for client in list(self.connected_clients.keys()): self.socketio.emit('shutdown', to=client) self.socketio.stop() self.server_is_open = False self.session.close()
[docs] def set_data_preservation(self, enabled: bool): self.preserve_data = enabled return self
# ----- Private methods ----- def _send_http(self, data: dict): binary_data = msgpack.packb(data, default=msgpack_default) try: response = self.session.post( f'http://{LOCALHOST}:{self.server_port}/send?id={self.id}', data=binary_data, headers={'Content-Type': 'application/msgpack'}, timeout=30, ) if response.ok: return True except requests.RequestException as e: raise Exception(f'Error sending data: {e}') def _send_direct(self, data: dict): binary_data = msgpack.packb(data, default=msgpack_default) try: save = False if self.id in self.connected_clients.values(): self.socketio.emit('item', binary_data, to=self.id) else: save = True if self.preserve_data or save: self.items.append(data) return True except Exception as e: raise Exception(f'Error sending data: {e}') def _get_direct(self, data: dict, timeout: float = 5.0): if self.id not in self.connected_clients.values(): raise Exception('Chart is not connected; cannot perform get.') get_id = data['get_id'] q = queue.Queue() self.pending_get_results[get_id] = q try: binary_data = msgpack.packb(data, default=msgpack_default) self.socketio.emit('get_request', binary_data, to=self.id) deadline = time.monotonic() + timeout while time.monotonic() < deadline: try: return q.get_nowait() except queue.Empty: pass self.socketio.sleep(0.05) return None finally: self.pending_get_results.pop(get_id, None) def _start_server(self): try: self.server_port = get_free_port() server_thread = threading.Thread( target=lambda: self.socketio.run( self.app, host=host_name, port=self.server_port, debug=False, log_output=False, use_reloader=False, ) ) server_thread.start() self.server_is_open = True except Exception as e: raise Exception(f'The server could not be started: {e}') def _wait_for_get_result(self, get_id, q=None, timeout=5, poll_interval=0.5, max_polls=10): if q is None: q = queue.Queue() self.pending_get_results[get_id] = q for _ in range(max_polls): if not q.empty(): break self.socketio.sleep(poll_interval) try: result = q.get(timeout=timeout) except queue.Empty: result = None finally: self.pending_get_results.pop(get_id, None) return result def _open_static(self): html = create_html(self.items) display_html(html) def _open_in_browser(self): if self.server_is_open: webbrowser.open(f'http://{LOCALHOST}:{self.server_port}/?id={self.id}') try: timeout = 10 interval = 0.1 waited = 0 while waited < timeout: if self.id in self.connected_clients.values(): break time.sleep(interval) waited += interval except Exception as e: print(e) else: self._open_static() def _open_in_notebook(self, width: int | str = '100%', height: int | str = 600): if self.server_is_open: return display( IFrame( src=f'http://{LOCALHOST}:{self.server_port}/?id={self.id}', width=width, height=height, ) ) else: html = create_html(self.items) return display_html(html, notebook=True, width=width, height=height) # ----- HTTP Routes ----- def _http_send(self): room = request.args.get('id') binary_data = request.data save = False if room in self.connected_clients.values(): self.socketio.emit('item', binary_data, to=room) else: save = True if self.preserve_data or save: data = msgpack.unpackb(binary_data) self.items.append(data) return '', 200 def _http_get(self): # print('inside _http_get') room = request.args.get('room') get_id = request.args.get('get_id') binary_data = request.data if room not in self.connected_clients.values(): return Response('', status=400) q = queue.Queue() self.pending_get_results[get_id] = q self.socketio.emit('get_request', binary_data, to=room) result = self._wait_for_get_result(get_id, q) if result is None: return Response('', status=500) return Response(msgpack.packb(result), mimetype='application/msgpack') def _http_storage(self): room = request.args.get('id') if room not in self.connected_clients.values(): return Response('Room not found', status=404) data = msgpack.packb(self.items) if not self.preserve_data: del self.items[:] return Response(data, mimetype='application/msgpack') def _http_resources(self, path): static_dir = os.path.join(os.path.dirname(__file__), 'static') return send_from_directory(os.path.join(static_dir, 'resources'), path) def _http_static(self, path): static_dir = os.path.join(os.path.dirname(__file__), 'static') return send_from_directory(static_dir, path) def _http_index(self): room = request.args.get('id') return render_template('index.html', room=room) # ----- SocketIO Events ----- def _sio_connect(self): self.connected_clients[request.sid] = 'default' def _sio_disconnect(self): self.connected_clients.pop(request.sid, None) def _sio_join(self, room): join_room(room) self.connected_clients[request.sid] = room self.socketio.emit('storage', to=room) def _sio_get_result(self, binary_data): data = msgpack.unpackb(binary_data, raw=False) get_id = data['get_id'] result = data['result'] if get_id in self.pending_get_results: self.pending_get_results[get_id].put(result) def _http_event_callback(self): try: ctype = (request.content_type or '').lower() if request.is_json or 'application/json' in ctype: data = request.get_json(force=True, silent=True) or {} else: binary_data = request.data data = msgpack.unpackb(binary_data, raw=False) callback_id = data.get('callbackId') event_data = data.get('eventData') handler = self.event_handlers.get(callback_id) if handler: prev = getattr(self._cb_ctx, 'in_event', False) self._cb_ctx.in_event = True try: result = handler(event_data) if result is not None: return jsonify(result), 200 finally: self._cb_ctx.in_event = prev else: print(f'[event_callback] Unknown callbackId: {callback_id}') return '', 200 except Exception as e: print(f'[event_callback] Error: {e}') return '', 500