from __future__ import annotations
import os
import secrets
import sys
import time
import threading
from concurrent.futures import ThreadPoolExecutor
import socket
import queue
import uuid
import msgpack
import requests
import webbrowser
from IPython import get_ipython
from IPython.display import IFrame, display
from flask import Flask, request, render_template, send_from_directory, Response
from flask_socketio import SocketIO, join_room
# Host name used both for binding the chart server and for building its URLs.
LOCALHOST = 'localhost'

# Root directory for the bundled `static` assets: `sys._MEIPASS` when the
# interpreter exposes it (presumably a frozen/bundled build), otherwise the
# current directory.
base_dir = os.path.join(sys._MEIPASS) if hasattr(sys, '_MEIPASS') else '.'
[docs]
def get_free_port():
    """Return a TCP port number that was free on LOCALHOST when probed.

    A throwaway socket is bound to port 0 so the OS assigns a port; the
    number is read back and the socket is closed before returning.

    Returns:
        int: The port number the OS assigned to the probe socket.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.bind((LOCALHOST, 0))
        # NOTE(review): this option is applied after bind(), so it presumably
        # has no influence on the port chosen above - confirm it is still wanted.
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        port = probe.getsockname()[1]
    return port
[docs]
class Instance:
def __init__(self, license_key: str):
    """Set up instance state and build the Flask/SocketIO application.

    Routes and SocketIO handlers are registered here; the server itself is
    not started by the constructor.

    Args:
        license_key: Value stored on the instance and handed to the index
            template when the page is rendered.
    """
    # Identity: first 8 hex characters of a random UUID.
    self.id = uuid.uuid4().hex[:8]
    self.license_key = license_key

    # Buffers and bookkeeping.
    self.items = []                 # commands not yet handed to the server
    self.storage = {}               # room -> list of stored commands
    self.pending_get_results = {}   # get_id -> queue awaiting a reply
    self.connected_clients = {}     # socket sid -> joined room
    self.preserve_data = True
    self.event_handlers = {}
    self.seq_num = 0

    # Server lifecycle state.
    self.server_is_open = False
    self.server_port = None
    self._server_thread = None

    # Lazily created helpers; both must exist as attributes before
    # _ensure_runtime_resources() runs.
    self._callback_executor = None
    self.session = None
    self._ensure_runtime_resources()

    # Initialize Flask and SocketIO
    static_dir = os.path.join(base_dir, 'static')
    self.app = Flask(__name__, static_folder=static_dir, template_folder=static_dir)
    self.app.config['SECRET_KEY'] = secrets.token_hex(32)
    self.socketio = SocketIO(self.app, async_mode='gevent', ping_timeout=60)

    # HTTP routes
    http_routes = (
        ('/', 'GET', self._http_index),
        ('/send', 'POST', self._http_send),
        ('/get', 'POST', self._http_get),
        ('/storage', 'GET', self._http_storage),
        ('/static/<path:path>', 'GET', self._http_static),
        ('/event_callback', 'POST', self._http_event_callback),
    )
    for rule, verb, view in http_routes:
        self.app.route(rule, methods=[verb])(view)

    # SocketIO events
    socket_events = (
        ('connect', self._sio_connect),
        ('disconnect', self._sio_disconnect),
        ('join', self._sio_join),
        ('get_result', self._sio_get_result),
    )
    for event_name, handler in socket_events:
        self.socketio.on_event(event_name, handler)
# ----- Public methods -----
[docs]
def send(self, id: str, command: str, arguments: dict = None):
data = {'seq': self.seq_num, 'id': str(id), 'command': command, 'args': arguments or {}}
self.seq_num += 1
if not self.server_is_open:
self.items.append(data)
return False
if self._post_command(data):
return True
self.items.append(data)
return False
def _post_command(self, data):
    """POST one msgpack-encoded command to the local ``/send`` endpoint.

    Args:
        data: Command payload to serialize with msgpack.

    Returns:
        bool: ``response.ok`` of the HTTP reply, or False when the request
        raised ``requests.RequestException`` (the error is printed).
    """
    self._ensure_runtime_resources()
    body = msgpack.packb(data, use_bin_type=True)
    endpoint = f'http://{LOCALHOST}:{self.server_port}/send?room={self.id}'
    try:
        reply = self.session.post(
            endpoint,
            data=body,
            headers={'Content-Type': 'application/msgpack'},
            timeout=10,
        )
    except requests.RequestException as e:
        print(e)
        return False
    return reply.ok
def _ensure_runtime_resources(self):
if self.session is None:
self.session = requests.Session()
retry_adapter = requests.adapters.HTTPAdapter(max_retries=5)
self.session.mount('http://', retry_adapter)
if self._callback_executor is None:
self._callback_executor = ThreadPoolExecutor(max_workers=4)
def _queue_initial_items(self):
if not self.items:
return
if self.id not in self.storage:
self.storage[self.id] = []
self.storage[self.id].extend(self.items)
self.items.clear()
def _wait_until_server_ready(self, attempts=20, delay=0.1):
    """Poll the index page until the server answers successfully.

    Args:
        attempts: Maximum number of GET probes.
        delay: Seconds to sleep after each unsuccessful probe.

    Returns:
        bool: True as soon as a probe returns an OK response, False when
        every attempt failed.
    """
    self._ensure_runtime_resources()
    probe_url = f'http://{LOCALHOST}:{self.server_port}/?id={self.id}'
    for _attempt in range(attempts):
        try:
            ready = self.session.get(probe_url, timeout=2).ok
        except requests.RequestException:
            # Connection problems count as "not ready yet".
            ready = False
        if ready:
            return True
        time.sleep(delay)
    return False
def _wait_until_room_joined(self, attempts=20, delay=0.5):
for _ in range(attempts):
if self._room_response():
return True
time.sleep(delay)
return False
def _request_get_result(self, data, get_id):
    """POST a get-request to the local ``/get`` endpoint and decode the reply.

    Args:
        data: Request payload to serialize with msgpack.
        get_id: Correlation id passed as the ``get_id`` query parameter.

    Returns:
        The msgpack-decoded response body.

    Raises:
        RuntimeError: When the HTTP request fails, the server answers with a
            non-OK status, or the decoded reply is a dict carrying a
            ``__chart_get_error__`` entry.
    """
    self._ensure_runtime_resources()
    body = msgpack.packb(data, use_bin_type=True)
    try:
        reply = self.session.post(
            f'http://{LOCALHOST}:{self.server_port}/get?room={self.id}&get_id={get_id}',
            data=body,
            headers={'Content-Type': 'application/msgpack'},
            timeout=70,
        )
    except requests.RequestException as e:
        raise RuntimeError(f'Failed to transfer get request to chart server: {e}') from e

    if not reply.ok:
        # Translate the status codes produced by the /get route into messages.
        known_failures = {
            400: 'Chart is not open, cannot execute command. Call open() method first.',
            500: 'Timed out while waiting for chart response.',
        }
        message = known_failures.get(reply.status_code)
        if message is None:
            message = f'Unexpected chart server response: HTTP {reply.status_code}'
        raise RuntimeError(message)

    result = msgpack.unpackb(reply.content, raw=False)
    if isinstance(result, dict) and '__chart_get_error__' in result:
        raise RuntimeError(f'Chart command failed: {result["__chart_get_error__"]}')
    return result
[docs]
def get(self, id: str, command: str = None, arguments: dict = None):
get_id = str(uuid.uuid4()).split('-')[0]
data = {
'get_id': get_id,
'seq': self.seq_num,
'id': str(id),
'command': command,
'args': arguments or {},
}
self.seq_num += 1
if not self.server_is_open:
try:
if get_ipython().__class__.__name__ == 'ZMQInteractiveShell':
self.open_in_notebook()
else:
self.open_in_browser()
except Exception as e:
raise RuntimeError(f'Chart was not opened, and it failed to open automatically. Please open it manually. Error: {e}') from e
if not self._wait_until_room_joined():
raise RuntimeError('Chart is not open, cannot execute command. Call open() method first.')
return self._request_get_result(data, get_id)
[docs]
def open(
self,
method: str = None,
live: bool = False,
width: int | str = '100%',
height: int | str = 600,
):
if method not in ('browser', 'notebook', 'link'):
try:
method = 'notebook' if get_ipython().__class__.__name__ == 'ZMQInteractiveShell' else 'browser'
except NameError:
method = 'browser'
if (live or method == 'link') and not self.server_is_open:
self._start_server()
self._queue_initial_items()
if method == 'notebook':
return self.open_in_notebook(width=width, height=height)
elif method == 'link':
return f'http://{LOCALHOST}:{self.server_port}/?id={self.id}'
else:
return self.open_in_browser()
[docs]
def open_in_browser(self):
    """Open the chart page with ``webbrowser`` and wait for a client to join.

    The server is started (and buffered commands queued) first when it is
    not open yet.

    Returns:
        bool: Result of ``_wait_until_room_joined()``, or False when a
        ``requests`` connection error is caught (the error is printed).
    """
    if not self.server_is_open:
        self._start_server()
        self._queue_initial_items()

    try:
        chart_url = f'http://{LOCALHOST}:{self.server_port}/?id={self.id}'
        webbrowser.open(chart_url)
        return self._wait_until_room_joined()
    except requests.exceptions.ConnectionError as e:
        print(e)
        return False
[docs]
def open_in_notebook(self, width: int | str = '100%', height: int | str = 600):
    """Display the chart page inside an IPython ``IFrame``.

    The server is started (and buffered commands queued) first when it is
    not open yet.

    Args:
        width: Width passed to the ``IFrame``.
        height: Height passed to the ``IFrame``.

    Returns:
        The value returned by ``display(...)``; None when a ``requests``
        connection error is caught (the error is printed).
    """
    if not self.server_is_open:
        self._start_server()
        self._queue_initial_items()

    try:
        frame = IFrame(
            src=f'http://{LOCALHOST}:{self.server_port}/?id={self.id}',
            width=width,
            height=height,
        )
        return display(frame)
    except requests.exceptions.ConnectionError as e:
        print(e)
[docs]
def close(self):
if self.server_is_open:
for client in list(self.connected_clients.keys()):
self.socketio.emit('shutdown', to=client)
try:
self.socketio.stop()
except (AttributeError, RuntimeError) as e:
print(f'Chart server shutdown warning: {e}')
self.server_is_open = False
self.connected_clients.clear()
self.pending_get_results.clear()
if self._callback_executor is not None:
self._callback_executor.shutdown(wait=False)
self._callback_executor = None
if self.session is not None:
self.session.close()
self.session = None
def _start_server(self):
    """Start the SocketIO server on a free port in a daemon thread.

    ``server_is_open`` is set to True only after the server answered the
    readiness probe.

    Raises:
        RuntimeError: When any step fails, including the server not
            responding before the startup timeout.
    """

    def _serve():
        # Thread body: runs the server with the port chosen below.
        self.socketio.run(
            self.app,
            host=LOCALHOST,
            port=self.server_port,
            debug=False,
            log_output=False,
            use_reloader=False,
        )

    try:
        self._ensure_runtime_resources()
        self.server_port = get_free_port()
        self._server_thread = threading.Thread(target=_serve, daemon=True)
        self._server_thread.start()
        if not self._wait_until_server_ready():
            raise RuntimeError('Server did not respond before startup timeout.')
        self.server_is_open = True
    except Exception as e:
        self.server_is_open = False
        raise RuntimeError(f'Failed to start server on port {self.server_port}. {e}') from e
def _room_response(self):
return self.id in self.connected_clients.values()
def _wait_for_get_result(self, get_id, timeout=5):
q = queue.Queue()
self.pending_get_results[get_id] = q
try:
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
return q.get_nowait()
except queue.Empty:
self.socketio.sleep(0.05)
return None
finally:
self.pending_get_results.pop(get_id, None)
# ----- HTTP Routes -----
def _http_index(self):
    """Render ``index.html`` for the room given in the ``id`` query parameter.

    ``static_version`` is the modification time of ``static/index.js`` under
    ``base_dir`` when that file exists, otherwise the current time; both are
    truncated to whole seconds.
    """
    room = request.args.get('id')
    script_path = os.path.join(base_dir, 'static', 'index.js')
    if os.path.exists(script_path):
        static_version = int(os.path.getmtime(script_path))
    else:
        static_version = int(time.time())
    return render_template(
        'index.html',
        room=room,
        license_key=self.license_key,
        static_version=static_version,
    )
def _http_static(self, path):
    """Serve a file from the bundled ``static`` directory.

    The directory is derived from ``base_dir`` so that it is the same
    location the Flask app uses as its static/template folder and that
    ``_http_index`` inspects, instead of a separately hard-coded path.

    Args:
        path: File path relative to the static directory, taken from the URL.

    Returns:
        The response produced by ``send_from_directory``.
    """
    return send_from_directory(os.path.join(base_dir, 'static'), path)
def _http_send(self):
    """Handle ``POST /send``: forward a command to its room and/or store it.

    The raw body is emitted as an ``item`` event when a client has joined
    the room. The decoded command is appended to the room's storage when
    ``preserve_data`` is set or when no client is in the room.

    Returns:
        ``('', 200)`` on success, or a 400 response when the ``room`` query
        parameter is missing or the body is not valid msgpack.
    """
    room = request.args.get('room')
    raw_body = request.data
    if not room:
        return Response('Missing room', status=400)

    try:
        item = msgpack.unpackb(raw_body, raw=False)
    except (msgpack.ExtraData, msgpack.FormatError, msgpack.StackError, ValueError) as e:
        return Response(f'Invalid msgpack payload: {e}', status=400)

    client_in_room = room in self.connected_clients.values()
    if client_in_room:
        self.socketio.emit('item', raw_body, to=room)
    if self.preserve_data or not client_in_room:
        self.storage.setdefault(room, []).append(item)
    return '', 200
def _http_get(self):
    """Handle ``POST /get``: relay a request to the room and wait for its reply.

    The reply queue is registered under ``get_id`` *before* the request is
    emitted, so a reply that arrives immediately is not dropped by
    ``_sio_get_result``. A sentinel marks "no reply yet", which keeps a
    legitimate ``None`` result distinguishable from a timeout.

    Returns:
        A msgpack response carrying the result; status 400 when no client
        has joined the room; status 500 when no reply arrived within
        60 seconds.
    """
    room = request.args.get('room')
    get_id = request.args.get('get_id')
    binary_data = request.data
    if room not in self.connected_clients.values():
        return Response('', status=400)

    no_reply = object()
    outcome = no_reply
    inbox = queue.Queue()
    self.pending_get_results[get_id] = inbox
    try:
        self.socketio.emit('get_request', binary_data, to=room)
        deadline = time.monotonic() + 60
        while time.monotonic() < deadline:
            try:
                outcome = inbox.get_nowait()
                break
            except queue.Empty:
                # Yield to the server between polls.
                self.socketio.sleep(0.05)
    finally:
        # Never leave a stale waiter behind, whatever happened above.
        self.pending_get_results.pop(get_id, None)

    if outcome is no_reply:
        return Response('', status=500)
    return Response(msgpack.packb(outcome, use_bin_type=True), mimetype='application/msgpack')
def _http_storage(self):
    """Handle ``GET /storage``: return the stored commands of a room as msgpack.

    When ``preserve_data`` is false the room's storage is deleted after it
    has been serialized.

    Returns:
        A msgpack response with the stored commands, or a 404 response when
        the room has no storage entry.
    """
    room = request.args.get('room')
    if room not in self.storage:
        return Response('Room not found', status=404)

    payload = msgpack.packb(self.storage[room], use_bin_type=True)
    if not self.preserve_data:
        del self.storage[room]
    return Response(payload, mimetype='application/msgpack')
# ----- SocketIO Events -----
def _sio_connect(self):
    """Record a newly connected socket under the placeholder room ``'default'``."""
    sid = request.sid
    self.connected_clients[sid] = 'default'
def _sio_disconnect(self):
    """Forget the disconnecting socket; unknown sids are ignored."""
    sid = request.sid
    self.connected_clients.pop(sid, None)
def _sio_join(self, room):
    """Put the calling socket into ``room`` and record the membership.

    When storage exists for the room, an ``exec`` event is emitted to it.

    Args:
        room: Name of the room the client asks to join.
    """
    join_room(room)
    self.connected_clients[request.sid] = room
    has_stored_items = room in self.storage
    if has_stored_items:
        self.socketio.emit('exec', to=room)
def _sio_get_result(self, binary_data):
    """Deliver a client's reply to the request waiting on its ``get_id``.

    A reply carrying an ``error`` entry is wrapped as
    ``{'__chart_get_error__': ...}``; otherwise its ``result`` entry (None
    when absent) is delivered. Replies without a registered waiter are
    dropped.

    Args:
        binary_data: msgpack-encoded reply containing at least ``get_id``.
    """
    data = msgpack.unpackb(binary_data, raw=False)
    get_id = data['get_id']
    if 'error' in data:
        result = {'__chart_get_error__': data['error']}
    else:
        result = data.get('result')

    # Single lookup: the waiter may be removed (e.g. on timeout) at any time,
    # so a separate membership test followed by indexing could raise KeyError.
    waiter = self.pending_get_results.get(get_id)
    if waiter is not None:
        waiter.put(result)
def _http_event_callback(self):
    """Handle ``POST /event_callback``: dispatch an event to its registered handler.

    The msgpack body supplies ``callbackId`` and ``eventData``. When a
    handler is registered for the id, it is submitted to the callback
    executor with the event data.

    Returns:
        ``('', 200)`` when processing succeeded (with or without a handler),
        ``('', 500)`` when any exception occurred (the error is printed).
    """
    try:
        payload = msgpack.unpackb(request.data, raw=False)
        callback_id = payload.get('callbackId')
        event_data = payload.get('eventData')
        handler = self.event_handlers.get(callback_id)
        if not handler:
            return '', 200
        self._ensure_runtime_resources()
        self._callback_executor.submit(handler, event_data)
        return '', 200
    except Exception as e:
        print(f'Event callback error: {e}')
        return '', 500