from __future__ import annotations
import os
import queue
import sys
import threading
import time
import uuid
import json
import msgpack
import requests
import socket
import webbrowser
import pkgutil
from http.server import HTTPServer, BaseHTTPRequestHandler
from IPython import get_ipython
from IPython.display import IFrame, display
from flask import Flask, request, render_template, send_from_directory, Response, jsonify
from flask_socketio import SocketIO, join_room
from lightningchart.utils.utils import NumpyEncoder, msgpack_default
LOCALHOST = 'localhost'
host_name = '127.0.0.1'
base_dir = '.'
if hasattr(sys, '_MEIPASS'):
base_dir = os.path.join(sys._MEIPASS)
[docs]
def get_free_port():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((LOCALHOST, 0))
port = sock.getsockname()[1]
sock.close()
return port
[docs]
def display_html(html_content, notebook=False, width: int | str = '100%', height: int | str = 600):
html_bytes = html_content.encode('utf-8')
class Server(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(html_bytes)
def log_message(self, format, *args):
pass
server_address = (LOCALHOST, 0)
server = HTTPServer(server_address, Server)
server_thread = threading.Thread(target=server.handle_request)
server_thread.daemon = False
server_thread.start()
if notebook:
return display(
IFrame(
src=f'http://{LOCALHOST}:{server.server_port}',
width=width,
height=height,
)
)
else:
webbrowser.open(f'http://{LOCALHOST}:{server.server_port}')
server_thread.join()
[docs]
def js_functions():
base_dir = '.'
if hasattr(sys, '_MEIPASS'):
base_dir = os.path.join(sys._MEIPASS)
js_code = pkgutil.get_data(__name__, os.path.join(base_dir, 'static/lcpy.js')).decode()
return js_code
[docs]
def create_html(items):
serialized_items = []
for i in items:
serialized_items.append(json.dumps(i, cls=NumpyEncoder))
html = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link rel="shortcut icon" href="#">
<title>LightningChart Python</title>
<script src="https://cdn.jsdelivr.net/npm/@lightningchart/[email protected]/dist/lcjs.iife.js"></script>
<script src="https://cdn.jsdelivr.net/npm/@lightningchart/[email protected]/dist/iife/lcjs-themes.iife.js"></script>
<script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/msgpack.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/[email protected]/client-dist/socket.io.min.js"></script>
<style>
body {{
height: 100%;
margin: 0;
}}
</style>
</head>
<body>
<script>
{js_functions()}
</script>
<script>
lcpy.initStatic({serialized_items});
</script>
</body>
</html>
"""
return html
[docs]
class Instance:
def __init__(self):
self.id = str(uuid.uuid4()).split('-')[0]
self.session = requests.Session()
retry_adapter = requests.adapters.HTTPAdapter(max_retries=5)
self.session.mount('http://', retry_adapter)
self.items = list()
self.pending_get_results = dict()
self.connected_clients = dict()
self.preserve_data = True
self.server_is_open = False
self.server_port = None
self.seq_num = 0
self.event_handlers = {}
self.send_method = 'http'
self._cb_ctx = threading.local()
self._cb_ctx.in_event = False
# Initialize Flask and SocketIO
self.app = Flask(
__name__,
static_folder=os.path.join(base_dir, 'static'),
template_folder=os.path.join(base_dir, 'static'),
)
self.app.config['SECRET_KEY'] = os.urandom(24).hex()
self.socketio = SocketIO(self.app, async_mode='gevent', ping_timeout=60)
# HTTP routes
self.app.route('/', methods=['GET'])(self._http_index)
self.app.route('/send', methods=['POST'])(self._http_send)
self.app.route('/get', methods=['POST'])(self._http_get)
self.app.route('/storage', methods=['GET'])(self._http_storage)
self.app.route('/resources/<path:path>', methods=['GET'])(self._http_resources)
self.app.route('/static/<path:path>', methods=['GET'])(self._http_static)
# SocketIO events
self.socketio.on_event('connect', self._sio_connect)
self.socketio.on_event('disconnect', self._sio_disconnect)
self.socketio.on_event('join', self._sio_join)
self.socketio.on_event('get_result', self._sio_get_result)
self.app.route('/event_callback', methods=['POST'])(self._http_event_callback)
# ----- Public methods -----
[docs]
def send(self, id: str, command: str, arguments: dict = None):
data = {
'seq': self.seq_num,
'id': id,
'command': command,
'args': arguments,
}
self.seq_num += 1
if not self.server_is_open:
self.items.append(data)
return
if getattr(self._cb_ctx, 'in_event', False):
return self._send_direct(data)
if self.send_method == 'http':
return self._send_http(data)
else:
return self._send_direct(data)
[docs]
def get(self, id: str, command: str = None, arguments: dict = None):
get_id = str(uuid.uuid4()).split('-')[0]
data = {
'get_id': get_id,
'id': str(id),
'command': command,
'args': arguments or {},
}
if getattr(self._cb_ctx, 'in_event', False):
return self._get_direct(data)
if not self.server_is_open:
self._start_server()
try:
ipy = get_ipython()
if ipy is not None and ipy.__class__.__name__ == 'ZMQInteractiveShell':
self._open_in_notebook()
for _ in range(20):
if self.id in self.connected_clients.values():
break
time.sleep(0.5)
else:
self._open_in_browser()
except Exception as e:
raise Exception(f'Chart was not opened, and it failed to open automatically. Please open it manually.Error: {e}')
binary_data = msgpack.packb(data, default=msgpack_default)
try:
response = self.session.post(
f'http://{LOCALHOST}:{self.server_port}/get?room={self.id}&get_id={get_id}',
data=binary_data,
headers={'Content-Type': 'application/msgpack'},
timeout=30,
)
if response.ok:
data = msgpack.unpackb(response.content, raw=False)
return data
elif response.status_code == 400:
raise Exception('Chart is not open, cannot execute command. Call open() method first.')
elif response.status_code == 500:
raise Exception('Unexpected error occurred, cannot execute command.')
except requests.RequestException as e:
print(e)
return None
[docs]
def open(
self,
method: str = None,
live: bool = False,
width: int | str = '100%',
height: int | str = 600,
):
if method not in ('browser', 'notebook', 'link'):
ipy = get_ipython()
method = 'notebook' if ipy is not None and ipy.__class__.__name__ == 'ZMQInteractiveShell' else 'browser'
if (live or method == 'link') and not self.server_is_open:
self._start_server()
if self.id in self.connected_clients.values():
if method == 'link':
return f'http://{LOCALHOST}:{self.server_port}/?id={self.id}'
return None
if method == 'notebook':
return self._open_in_notebook(width=width, height=height)
elif method == 'link':
return f'http://{LOCALHOST}:{self.server_port}/?id={self.id}'
else:
self._open_in_browser()
return None
[docs]
def close(self):
if self.server_is_open:
for client in list(self.connected_clients.keys()):
self.socketio.emit('shutdown', to=client)
self.socketio.stop()
self.server_is_open = False
self.session.close()
[docs]
def set_data_preservation(self, enabled: bool):
self.preserve_data = enabled
return self
# ----- Private methods -----
def _send_http(self, data: dict):
binary_data = msgpack.packb(data, default=msgpack_default)
try:
response = self.session.post(
f'http://{LOCALHOST}:{self.server_port}/send?id={self.id}',
data=binary_data,
headers={'Content-Type': 'application/msgpack'},
timeout=30,
)
if response.ok:
return True
except requests.RequestException as e:
raise Exception(f'Error sending data: {e}')
def _send_direct(self, data: dict):
binary_data = msgpack.packb(data, default=msgpack_default)
try:
save = False
if self.id in self.connected_clients.values():
self.socketio.emit('item', binary_data, to=self.id)
else:
save = True
if self.preserve_data or save:
self.items.append(data)
return True
except Exception as e:
raise Exception(f'Error sending data: {e}')
def _get_direct(self, data: dict, timeout: float = 5.0):
if self.id not in self.connected_clients.values():
raise Exception('Chart is not connected; cannot perform get.')
get_id = data['get_id']
q = queue.Queue()
self.pending_get_results[get_id] = q
try:
binary_data = msgpack.packb(data, default=msgpack_default)
self.socketio.emit('get_request', binary_data, to=self.id)
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
return q.get_nowait()
except queue.Empty:
pass
self.socketio.sleep(0.05)
return None
finally:
self.pending_get_results.pop(get_id, None)
def _start_server(self):
try:
self.server_port = get_free_port()
server_thread = threading.Thread(
target=lambda: self.socketio.run(
self.app,
host=host_name,
port=self.server_port,
debug=False,
log_output=False,
use_reloader=False,
)
)
server_thread.start()
self.server_is_open = True
except Exception as e:
raise Exception(f'The server could not be started: {e}')
def _wait_for_get_result(self, get_id, q=None, timeout=5, poll_interval=0.5, max_polls=10):
if q is None:
q = queue.Queue()
self.pending_get_results[get_id] = q
for _ in range(max_polls):
if not q.empty():
break
self.socketio.sleep(poll_interval)
try:
result = q.get(timeout=timeout)
except queue.Empty:
result = None
finally:
self.pending_get_results.pop(get_id, None)
return result
def _open_static(self):
html = create_html(self.items)
display_html(html)
def _open_in_browser(self):
if self.server_is_open:
webbrowser.open(f'http://{LOCALHOST}:{self.server_port}/?id={self.id}')
try:
timeout = 10
interval = 0.1
waited = 0
while waited < timeout:
if self.id in self.connected_clients.values():
break
time.sleep(interval)
waited += interval
except Exception as e:
print(e)
else:
self._open_static()
def _open_in_notebook(self, width: int | str = '100%', height: int | str = 600):
if self.server_is_open:
return display(
IFrame(
src=f'http://{LOCALHOST}:{self.server_port}/?id={self.id}',
width=width,
height=height,
)
)
else:
html = create_html(self.items)
return display_html(html, notebook=True, width=width, height=height)
# ----- HTTP Routes -----
def _http_send(self):
room = request.args.get('id')
binary_data = request.data
save = False
if room in self.connected_clients.values():
self.socketio.emit('item', binary_data, to=room)
else:
save = True
if self.preserve_data or save:
data = msgpack.unpackb(binary_data)
self.items.append(data)
return '', 200
def _http_get(self):
# print('inside _http_get')
room = request.args.get('room')
get_id = request.args.get('get_id')
binary_data = request.data
if room not in self.connected_clients.values():
return Response('', status=400)
q = queue.Queue()
self.pending_get_results[get_id] = q
self.socketio.emit('get_request', binary_data, to=room)
result = self._wait_for_get_result(get_id, q)
if result is None:
return Response('', status=500)
return Response(msgpack.packb(result), mimetype='application/msgpack')
def _http_storage(self):
room = request.args.get('id')
if room not in self.connected_clients.values():
return Response('Room not found', status=404)
data = msgpack.packb(self.items)
if not self.preserve_data:
del self.items[:]
return Response(data, mimetype='application/msgpack')
def _http_resources(self, path):
static_dir = os.path.join(os.path.dirname(__file__), 'static')
return send_from_directory(os.path.join(static_dir, 'resources'), path)
def _http_static(self, path):
static_dir = os.path.join(os.path.dirname(__file__), 'static')
return send_from_directory(static_dir, path)
def _http_index(self):
room = request.args.get('id')
return render_template('index.html', room=room)
# ----- SocketIO Events -----
def _sio_connect(self):
self.connected_clients[request.sid] = 'default'
def _sio_disconnect(self):
self.connected_clients.pop(request.sid, None)
def _sio_join(self, room):
join_room(room)
self.connected_clients[request.sid] = room
self.socketio.emit('storage', to=room)
def _sio_get_result(self, binary_data):
data = msgpack.unpackb(binary_data, raw=False)
get_id = data['get_id']
result = data['result']
if get_id in self.pending_get_results:
self.pending_get_results[get_id].put(result)
def _http_event_callback(self):
try:
ctype = (request.content_type or '').lower()
if request.is_json or 'application/json' in ctype:
data = request.get_json(force=True, silent=True) or {}
else:
binary_data = request.data
data = msgpack.unpackb(binary_data, raw=False)
callback_id = data.get('callbackId')
event_data = data.get('eventData')
handler = self.event_handlers.get(callback_id)
if handler:
prev = getattr(self._cb_ctx, 'in_event', False)
self._cb_ctx.in_event = True
try:
result = handler(event_data)
if result is not None:
return jsonify(result), 200
finally:
self._cb_ctx.in_event = prev
else:
print(f'[event_callback] Unknown callbackId: {callback_id}')
return '', 200
except Exception as e:
print(f'[event_callback] Error: {e}')
return '', 500