A Cryptocurrency and Stock Market Analysis with LightningChart Python
Tutorial
Assisted by AI
Learn how to conduct a cryptocurrency and stock market analysis using the LightningChart Python data visualization library.
Introduction
This project presents a focused analysis comparing cryptocurrency and stock market performance using structured financial datasets. The primary objective is to visualize and interpret differences in price behavior, trading volume, and overall market composition through high-performance visualizations developed with LightningChart Python.
Data from both markets were standardized to ensure consistency, focusing on key indicators such as Open, Close, Volume, and Market Capitalization. Additional derived metrics, including hourly price changes, log-transformed volumes, and asset class summaries, were created to enhance the analytical depth. The project ultimately aims to reveal how cryptocurrencies, which trade continuously, differ from traditional stock markets that operate within fixed trading sessions.
Project Overview
Develop a focused portfolio of LightningChart Python visualizations to explore how key financial indicators, such as price, volume, and volatility, differ between cryptocurrencies and stock market assets. The objective is to uncover behavioural contrasts, correlation structures, and temporal patterns that influence market dynamics and investor strategies.
Objectives
- Profile hourly price changes and volume fluctuations across crypto and stock assets.
- Compare log-scaled trading volumes using jittered strip and box plots for distribution insight.
- Analyze open-close price relationships through scatter plots and median references.
- Examine hourly volatility trends via spider charts representing normalized return variability.
- Summarize market structure using treemaps and stacked bars to show class proportions and top-performing assets.
- Investigate inter-variable relationships through side-by-side correlation heatmaps for crypto and stock markets.
Deliverables
- A comprehensive report with per-chart documentation (parameters, rationale, insights, and short analysis).
- Executable Jupyter Notebook cells for each chart with transparent preprocessing and axis/legend configuration.
- Final conclusions highlighting observed market behaviours and visualisation-driven insights.
Tools Used
Python 3.13.5, LightningChart Python, Jupyter Notebook, AI Assistance
About the Dataset
The files used were the Cryptocurrency dataset and the Stock market dataset available on Kaggle.
LightningChart Python
LightningChart Python is a fast, interactive charting library optimized for large and heterogeneous datasets. In this project, it powers all visuals: polar rose, histograms, strip (jittered) plots, box plots, scatter, spider (radar), treemap, stacked bars, correlation heatmaps, and a multi-panel dashboard. It delivers smooth zoom/pan interactions and presentation-ready styling.
Setting Up Python Environment
Before running the project, make sure Python is installed, then install the required libraries from a notebook cell using:
%pip install numpy pandas lightningchart
Setting Up Your Development Environment:
- Set up a virtual environment.
- Use Visual Studio Code (VSCode) for a streamlined development experience.
Loading and Preprocessing Data
Start by importing pandas, which is used to load and preprocess the datasets:
# Import necessary libraries (load pandas library to preprocess dataset)
import pandas as pd
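The standardization and derived metrics described in the introduction can be sketched as follows. This is a minimal illustration, not the project's actual preprocessing: the CSV text, the column names (`Ticker`, `Date`, `Open`, `Close`, `Volume`), and all values are hypothetical stand-ins for the Kaggle files.

```python
import io
import numpy as np
import pandas as pd

# Hypothetical CSV text standing in for one of the Kaggle files;
# the real files have their own column names and values.
raw = io.StringIO(
    "Ticker,Date,Open,Close,Volume\n"
    'AAA,2024-01-02 10:00,100,110,"1,000"\n'
    'AAA,2024-01-02 11:00,110,99,"10,000"\n'
    'BBB,2024-01-02 10:00,50,50.5,"1,000,000"\n'
    "BBB,2024-01-02 11:00,50.5,50.5,0\n"
)
df = pd.read_csv(raw)

# Standardize column names so both markets share one schema
df = df.rename(columns={"Ticker": "symbol", "Date": "timestamp",
                        "Open": "open", "Close": "close", "Volume": "volume"})

# Parse timestamps and strip thousands separators from volume
df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
df["volume"] = pd.to_numeric(
    df["volume"].astype(str).str.replace(",", "", regex=False), errors="coerce"
)
df = df.dropna(subset=["timestamp"]).sort_values(["symbol", "timestamp"])

# Derived metrics: hourly price change (%) and log10 volume (positive volumes only)
df["ret_1h"] = (df["close"] - df["open"]) / df["open"] * 100.0
df["log10_volume"] = np.log10(df["volume"].where(df["volume"] > 0))

print(df[["symbol", "timestamp", "ret_1h", "log10_volume"]])
```

Zero volumes are masked before the log transform, so they become NaN instead of negative infinity.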
Visualizing Data with LightningChart Python
A polar/rose plot is a radial histogram that visualizes dispersion, tail weight, and skew as lobes, allowing rapid shape comparison across assets. Normalizing each asset's bin counts to its own maximum neutralizes differences in sample count. The rose plots reveal that some crypto tickers exhibit fatter-tailed hourly return distributions than most stocks, indicating higher intra-hour risk.
# Chart 1A - Polar-style Rose per Asset (separate windows)
# Developed with AI assistance to demonstrate LightningChart Python
import lightningchart as lc
import re
import numpy as np
import pandas as pd
# License
with open("D:/HAMK/Internship/MyProjects/lc_license.txt", "r") as f:
    lc.set_license(f.read().strip())
try:
    ccd_clean
except NameError:
    ccd_clean = pd.read_csv("cryptocurrency.csv", encoding="latin1")
try:
    sd
except NameError:
    sd = pd.read_csv("stocks.csv", encoding="latin1")
def returns_from_frame(df, asset_col=None, time_col=None, price_col=None, open_col=None, close_col=None):
    d = df.copy()
    def norm(s): return re.sub(r'[^a-z0-9]', '', str(s).lower())
    colmap = {norm(c): c for c in d.columns}
    cols_norm = set(colmap.keys())
    def pick_exact(cands):
        return next((colmap[c] for c in cands if c in cols_norm), None)
    def pick_contains(subs):
        for cn in cols_norm:
            if any(sub in cn for sub in subs):
                return colmap[cn]
        return None
    asset_col = asset_col or pick_exact(['symbol','ticker']) or pick_contains(['symbol','ticker','asset','name'])
    time_col = time_col or pick_exact(['timestamp','datetime','date','time']) or pick_contains(['timestamp','datetime','date','time'])
    price_col = price_col or pick_exact(['priceusd','price','close','adjclose']) or pick_contains(['priceusd','closeprice','lastprice','last','close','price'])
    open_col = open_col or pick_exact(['open']) or pick_contains(['open'])
    close_col = close_col or pick_exact(['close','adjclose']) or pick_contains(['close','closing'])
    if asset_col is None or time_col is None:
        raise ValueError(f"Missing key columns. Found asset={asset_col}, time={time_col}")
    d[time_col] = pd.to_datetime(d[time_col], errors='coerce')
    d = d.dropna(subset=[time_col]).sort_values([asset_col, time_col])
    def to_num(x):
        return (x.astype(str).str.replace(r'[^0-9.\-]', '', regex=True).replace({'': np.nan}).astype(float))
    if price_col is not None:
        d[price_col] = to_num(d[price_col])
        d = d.dropna(subset=[price_col])
        d['ret_1h'] = d.groupby(asset_col, group_keys=False)[price_col].apply(lambda s: s.pct_change()*100.0)
    elif open_col is not None and close_col is not None:
        d[open_col], d[close_col] = to_num(d[open_col]), to_num(d[close_col])
        d = d.dropna(subset=[open_col, close_col])
        d['ret_1h'] = (d[close_col] - d[open_col]) / d[open_col] * 100.0
    else:
        raise ValueError("Provide a price column or both open & close columns.")
    d = d.dropna(subset=['ret_1h'])
    return d.rename(columns={asset_col:'symbol', time_col:'timestamp'})[['symbol','timestamp','ret_1h']]
crypto_ret = returns_from_frame(ccd_clean)
stock_ret = returns_from_frame(sd)
crypto_ret['asset_class'] = 'Crypto'
stock_ret['asset_class'] = 'Stock'
rets = pd.concat([crypto_ret, stock_ret], ignore_index=True)
N = 4
top_assets = (rets.groupby('symbol').size().sort_values(ascending=False).head(N).index.tolist())
plot_df = rets[rets['symbol'].isin(top_assets)].copy()
BIN_COUNT = 72
ret_min = plot_df['ret_1h'].quantile(0.01)
ret_max = plot_df['ret_1h'].quantile(0.99)
if not np.isfinite(ret_min) or not np.isfinite(ret_max) or ret_min == ret_max:
    ret_min, ret_max = plot_df['ret_1h'].min(), plot_df['ret_1h'].max()
    if ret_min == ret_max:
        ret_min, ret_max = -0.5, 0.5
bins = np.linspace(ret_min, ret_max, BIN_COUNT + 1)
def rose_polar_data(counts):
    counts = counts.astype(float)
    if counts.max() > 0:
        counts = counts / counts.max()
    angles = (np.arange(BIN_COUNT) + 0.5) * (360.0 / BIN_COUNT)
    angles = np.append(angles, angles[0])
    amplitudes = np.append(counts, counts[0])
    polar_data = [{'angle': float(angle), 'amplitude': float(amp)}
                  for angle, amp in zip(angles, amplitudes)]
    return polar_data
charts = []
for sym in top_assets:
    vals = plot_df.loc[plot_df['symbol'] == sym, 'ret_1h'].to_numpy()
    if vals.size == 0:
        continue
    counts, _ = np.histogram(vals, bins=bins)
    polar_data = rose_polar_data(counts)
    chart = lc.PolarChart(
        theme=lc.Themes.Light,
        html_text_rendering=True,
        title=f'Polar Rose - Hourly Return Distribution - {sym}\n(radius = relative frequency)'
    )
    for r in [1.0, 0.5]:
        angles_ref = np.linspace(0, 360, 360, endpoint=True)
        ref_data = [{'angle': float(angle), 'amplitude': float(r)} for angle in angles_ref]
        ref_series = chart.add_area_series()
        ref_series.set_name(f'Reference (r={r})')
        ref_series.set_data(ref_data)
        ref_series.set_stroke(thickness=1, color='#cccccc')
    series = chart.add_area_series()
    series.set_name(sym)
    series.set_data(polar_data)
    series.set_stroke(thickness=2)
    charts.append(chart)
for c in charts:
    c.open()
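One detail of the binning is worth illustrating. The bin edges span the 1st to 99th percentile of the pooled returns, and `np.histogram` ignores values outside the outermost edges, so the most extreme returns do not appear in the lobes. The sketch below uses synthetic heavy-tailed data (an assumption, not the Kaggle data) and shows one possible alternative: clipping values into the range so they are counted in the end bins.

```python
import numpy as np

rng = np.random.default_rng(1)
vals = rng.standard_t(3, 2000)  # synthetic heavy-tailed "returns" (assumption)

# Bin edges spanning the 1st-99th percentile, with 72 bins as in the rose chart
lo, hi = np.quantile(vals, [0.01, 0.99])
bins = np.linspace(lo, hi, 72 + 1)

# Values outside [lo, hi] are silently ignored by np.histogram
dropped, _ = np.histogram(vals, bins=bins)

# Clipping first places those values in the first and last bins instead
kept, _ = np.histogram(np.clip(vals, lo, hi), bins=bins)

print("counted without clipping:", dropped.sum())
print("counted with clipping:   ", kept.sum())
```

Whether to clip is a design choice. Clipping keeps every observation visible but inflates the end bins, while the unclipped version keeps the lobes readable at the cost of hiding roughly 2% of the sample.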
Histogram of Hourly Price Changes by Asset
A histogram is the most direct way to view the distribution of hourly returns. It exposes centre, spread, skew, and tail frequency in absolute terms, enabling apples-to-apples frequency comparisons across assets when using common bin edges. Histograms show crypto assets typically have broader and heavier-tailed hourly return distributions than most stocks, implying greater intrahour risk and larger potential drawdowns.
# Chart 1B - Histogram of Hourly Price Changes by Asset
# Developed with AI assistance to demonstrate LightningChart Python
import lightningchart as lc
import re
import numpy as np
import pandas as pd
# License
with open("D:/HAMK/Internship/MyProjects/lc_license.txt", "r") as f:
    lc.set_license(f.read().strip())
# Expect existing dataframes: ccd_clean (crypto) and sd (stocks)
try:
    ccd_clean
except NameError:
    ccd_clean = pd.read_csv("cryptocurrency.csv", encoding="latin1")
try:
    sd
except NameError:
    sd = pd.read_csv("stocks.csv", encoding="latin1")
# Helper: robust hourly returns
def returns_from_frame(df,
                       asset_col=None, time_col=None,
                       price_col=None, open_col=None, close_col=None
                       ):
    """Return ['symbol','timestamp','ret_1h'] from a generic OHLC/Close frame."""
    d = df.copy()
    def norm(s): return re.sub(r'[^a-z0-9]', '', str(s).lower())
    colmap = {norm(c): c for c in d.columns}
    cols_norm = set(colmap.keys())
    def pick_exact(cands):
        for c in cands:
            if c in cols_norm:
                return colmap[c]
        return None
    def pick_contains(subs):
        for cn in cols_norm:
            if any(sub in cn for sub in subs):
                return colmap[cn]
        return None
    if asset_col is None:
        asset_col = pick_exact(['symbol','ticker']) or pick_contains(['symbol','ticker','asset','name'])
    if time_col is None:
        time_col = pick_exact(['timestamp','datetime','date','time']) or pick_contains(['timestamp','datetime','date','time'])
    if price_col is None:
        price_col = (pick_exact(['priceusd','price','close','adjclose'])
                     or pick_contains(['priceusd','closeprice','lastprice','last','close','price']))
    if open_col is None:
        open_col = pick_exact(['open']) or pick_contains(['open'])
    if close_col is None:
        close_col = pick_exact(['close','adjclose']) or pick_contains(['close','closing'])
    if asset_col is None or time_col is None:
        raise ValueError(f"Missing key columns. Found asset={asset_col}, time={time_col}")
    d[time_col] = pd.to_datetime(d[time_col], errors='coerce')
    d = d.dropna(subset=[time_col]).sort_values([asset_col, time_col])
    def to_num(x):
        return (x.astype(str)
                .str.replace(r'[^0-9.\-]', '', regex=True)
                .replace({'': np.nan})
                .astype(float))
    if price_col is not None:
        d[price_col] = to_num(d[price_col])
        d = d.dropna(subset=[price_col])
        d['ret_1h'] = d.groupby(asset_col, group_keys=False)[price_col].apply(lambda s: s.pct_change()*100.0)
    elif open_col is not None and close_col is not None:
        d[open_col], d[close_col] = to_num(d[open_col]), to_num(d[close_col])
        d = d.dropna(subset=[open_col, close_col])
        d['ret_1h'] = (d[close_col] - d[open_col]) / d[open_col] * 100.0
    else:
        raise ValueError("Provide a price column or both open & close columns.")
    d = d.dropna(subset=['ret_1h'])
    return d.rename(columns={asset_col:'symbol', time_col:'timestamp'})[['symbol','timestamp','ret_1h']]
# Build returns
crypto_ret = returns_from_frame(ccd_clean)
stock_ret = returns_from_frame(sd)  # if this fails, override with your exact column names
# Examples if needed:
# stock_ret = returns_from_frame(sd, asset_col='Ticker', time_col='Date', price_col='Close')
# stock_ret = returns_from_frame(sd, asset_col='Ticker', time_col='Date', open_col='Open', close_col='Close')
crypto_ret['asset_class'] = 'Crypto'
stock_ret['asset_class'] = 'Stock'
rets = pd.concat([crypto_ret, stock_ret], ignore_index=True)
# Pick which assets to show
N = 4  # change as you like
top_assets = (rets.groupby('symbol').size()
              .sort_values(ascending=False)
              .head(N).index.tolist())
plot_df = rets[rets['symbol'].isin(top_assets)].copy()
# Build common bins
# Wider bins if stock returns are near 0; increase BIN_COUNT for more detail
BIN_COUNT = 80
ret_min = plot_df['ret_1h'].quantile(0.01)
ret_max = plot_df['ret_1h'].quantile(0.99)
if not np.isfinite(ret_min) or not np.isfinite(ret_max) or ret_min == ret_max:
    ret_min, ret_max = plot_df['ret_1h'].min(), plot_df['ret_1h'].max()
    if ret_min == ret_max:  # all zeros
        ret_min, ret_max = -0.5, 0.5
bins = np.linspace(ret_min, ret_max, BIN_COUNT + 1)
# Render a BarChart per asset
charts = []
for sym in top_assets:
    vals = plot_df.loc[plot_df['symbol'] == sym, 'ret_1h'].to_numpy()
    if vals.size == 0:
        continue
    counts, bin_edges = np.histogram(vals, bins=bins)
    # Nicer category labels: show every ~5th bin to avoid clutter
    bar_data = []
    for i, count in enumerate(counts):
        if BIN_COUNT > 60 and i % 5 != 0:
            label = ""  # sparse labels for readability
        else:
            label = f"{bin_edges[i]:.2f}–{bin_edges[i+1]:.2f}"
        bar_data.append({"category": label, "value": int(count)})
    chart = lc.BarChart(
        vertical=True,
        theme=lc.Themes.Light,
        title=f'Hourly Price Change (%) - {sym}\n'
              f'X: Hourly return (%) bins | Y: Frequency',
        legend={'visible': False},
        html_text_rendering=True
    )
    chart.set_data(bar_data)
    chart.set_sorting('disabled')  # keep bin order
    chart.set_bars_color('cyan')
    charts.append(chart)
for c in charts:
    c.open()
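The "heavier-tailed" reading of the histograms can be cross-checked numerically. The sketch below is one possible check, not part of the original notebook: it computes excess kurtosis and the share of observations more than three standard deviations from the mean. The data are synthetic (a normal sample versus a Student-t sample), and the symbols `THIN` and `FAT` and the helper `tail_summary` are hypothetical names.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)

# Synthetic stand-ins for hourly returns (assumptions, not the Kaggle data)
rets = pd.DataFrame({
    "symbol": ["THIN"] * 5000 + ["FAT"] * 5000,
    "ret_1h": np.concatenate([
        rng.normal(0.0, 1.0, 5000),   # light-tailed reference
        rng.standard_t(3, 5000),      # heavy-tailed sample
    ]),
})

def tail_summary(s, k=3.0):
    """Excess kurtosis and share of points more than k standard deviations from the mean."""
    z = (s - s.mean()) / s.std()
    return pd.Series({
        "excess_kurtosis": s.kurt(),
        "tail_share": (z.abs() > k).mean(),
    })

# One row per asset
summary = pd.DataFrame(
    {sym: tail_summary(g) for sym, g in rets.groupby("symbol")["ret_1h"]}
).T
print(summary)
```

Applied to the `rets` frame built in the chart code, the same helper would give one row per asset, which makes the visual comparison of tails easier to verify.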
Strip/Jitter Plot of Trading Volume by Asset
A strip plot (jittered scatter) on a log scale compresses large magnitude differences and shows the full distribution of volumes per asset (spread, clusters, and outliers) without hiding detail inside bins, as a bar chart or histogram would. Median lines provide a quick central tendency reference.
The jittered log-volume view reveals clear liquidity tiers across the top assets. Assets with higher medians and narrow dispersion are more liquid and predictable intraday, while those with lower medians and fat upper tails experience episodic bursts, relevant for slippage and order sizing.
# Chart 2A - Strip (Jittered Scatter) of Trading Volume by Asset (log10)
# Developed with AI assistance to demonstrate LightningChart Python
import lightningchart as lc
import re
import numpy as np
import pandas as pd
# License
with open("D:/HAMK/Internship/MyProjects/lc_license.txt", "r") as f:
    lc.set_license(f.read().strip())
try:
    ccd_clean
except NameError:
    ccd_clean = pd.read_csv("cryptocurrency.csv", encoding="latin1")
try:
    sd
except NameError:
    sd = pd.read_csv("stocks.csv", encoding="latin1")
# Helpers
def _norm(s):
    return re.sub(r'[^a-z0-9]', '', str(s).lower())
def _detect_col(df, exact=None, contains=None):
    exact = exact or []
    contains = contains or []
    cmap = {_norm(c): c for c in df.columns}
    keys = set(cmap.keys())
    for key in exact:
        if key in keys:
            return cmap[key]
    for k in keys:
        if any(sub in k for sub in contains):
            return cmap[k]
    return None
def build_volume_tidy(df, asset_label):
    """Return columns: symbol, volume (float), asset_class."""
    d = df.copy()
    sym_col = _detect_col(d, exact=['symbol','ticker'], contains=['symbol','ticker','name'])
    vol_col = (_detect_col(d, exact=['vol24h','totalvol','volume'],
                           contains=['vol24h','totalvol','volume'])
               or _detect_col(d, contains=['vol']))  # last resort
    if sym_col is None or vol_col is None:
        raise ValueError(f"Could not find symbol/volume in columns: {list(d.columns)}")
    # Coerce volume to numeric (strip commas, units)
    d[vol_col] = (d[vol_col].astype(str)
                  .str.replace(r'[^0-9.\-eE]', '', regex=True)
                  .replace({'': np.nan})
                  .astype(float))
    d = d.dropna(subset=[sym_col, vol_col])
    out = d[[sym_col, vol_col]].rename(columns={sym_col:'symbol', vol_col:'volume'})
    out['asset_class'] = asset_label
    return out
# Build tidy volumes
vol_crypto = build_volume_tidy(ccd_clean, 'Crypto')
vol_stocks = build_volume_tidy(sd, 'Stock')
vol = pd.concat([vol_crypto, vol_stocks], ignore_index=True)
# Focus on top-N assets by count for clarity
N = 8
top_assets = (vol.groupby('symbol').size()
              .sort_values(ascending=False).head(N).index.tolist())
plot_df = vol[vol['symbol'].isin(top_assets)].copy()
# Log-transform volume to compress scale differences
plot_df = plot_df[plot_df['volume'] > 0].copy()
plot_df['log10_volume'] = np.log10(plot_df['volume'])
# Map each symbol to an x-category with jitter
symbols = sorted(plot_df['symbol'].unique().tolist())
x_index = {s:i for i,s in enumerate(symbols)}
rng = np.random.default_rng(42)
plot_df['x'] = plot_df['symbol'].map(x_index).astype(float) + rng.uniform(-0.3, 0.3, size=len(plot_df))
plot_df['y'] = plot_df['log10_volume']
# Compute per-asset medians for reference lines
medians = plot_df.groupby('symbol')['y'].median()
# LightningChart rendering
chart = lc.ChartXY(theme=lc.Themes.Light, html_text_rendering=True,
                   title='Trading Volume by Asset - Strip (Jittered) Plot - log10 scale')
# Helper to add series (handles API name variations)
def add_points(ch):
    for fn in ('addPointSeries','add_point_series'):
        if hasattr(ch, fn): return getattr(ch, fn)()
    raise AttributeError("Point series adder not found on ChartXY.")
def add_line(ch):
    for fn in ('addLineSeries','add_line_series'):
        if hasattr(ch, fn): return getattr(ch, fn)()
    raise AttributeError("Line series adder not found on ChartXY.")
def add_data(series, arr2d):
    # try ndarray; fall back to list[dict] or list[tuple]
    try:
        series.add(arr2d); return
    except Exception:
        pass
    try:
        series.add([{'x': float(x), 'y': float(y)} for x,y in arr2d]); return
    except Exception:
        pass
    series.add([(float(x), float(y)) for x,y in arr2d])
def set_point_size(series, size=3.0):
    for fn in ('setPointSize','set_point_size'):
        if hasattr(series, fn):
            try: getattr(series, fn)(size)
            except Exception: pass
def set_stroke(series, thickness=2.0):
    for fn in ('setStrokeThickness','set_stroke_thickness'):
        if hasattr(series, fn):
            try: getattr(series, fn)(thickness)
            except Exception: pass
def set_name(series, name):
    for fn in ('setName','set_name','setLabel','set_label'):
        if hasattr(series, fn):
            try: getattr(series, fn)(name)
            except Exception: pass
# One point series per asset
for sym in symbols:
    s = add_points(chart)
    set_name(s, sym)
    set_point_size(s, 3.0)
    data = plot_df.loc[plot_df['symbol']==sym, ['x','y']].to_numpy()
    add_data(s, data)
# Median reference line per asset (short horizontal segment at x=category ±0.35)
for i, sym in enumerate(symbols):
    y = medians.loc[sym]
    x0, x1 = i-0.35, i+0.35
    seg = np.array([[x0, y],[x1, y]], dtype=float)
    l = add_line(chart)
    set_name(l, f'{sym} median')
    set_stroke(l, 2.5)
    add_data(l, seg)
# Nice viewing window
xmin, xmax = -0.8, len(symbols)-0.2
ymin, ymax = plot_df['y'].quantile(0.02), plot_df['y'].quantile(0.98)
if not np.isfinite(ymin) or not np.isfinite(ymax) or ymin==ymax:
    ymin, ymax = plot_df['y'].min(), plot_df['y'].max()
# Try to set intervals if your build exposes them (safe to ignore if missing)
for axis_fn, lo, hi in (('getDefaultAxisX', xmin, xmax), ('getDefaultAxisY', ymin, ymax)):
    if hasattr(chart, axis_fn):
        try:
            axis = getattr(chart, axis_fn)()
            if hasattr(axis, 'setInterval'):
                axis.setInterval(lo, hi)
        except Exception:
            pass
chart.open()
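The liquidity tiers read off the strip plot can also be tabulated. The following sketch ranks assets by median log10 volume and reports the interquartile range as a dispersion measure. The symbols and volumes are made up for illustration, and the `tiers` table is not part of the original notebook.

```python
import numpy as np
import pandas as pd

# Hypothetical volumes: AAA trades steadily, BBB is thin with one burst
vol = pd.DataFrame({
    "symbol": ["AAA"] * 5 + ["BBB"] * 5,
    "volume": [1e6, 2e6, 1.5e6, 1.2e6, 1.8e6,
               1e3, 5e3, 2e3, 1e6, 3e3],
})
vol["log10_volume"] = np.log10(vol["volume"])

# Median (liquidity level) and IQR (dispersion) per asset, most liquid first
g = vol.groupby("symbol")["log10_volume"]
tiers = pd.DataFrame({
    "median": g.median(),
    "iqr": g.quantile(0.75) - g.quantile(0.25),
}).sort_values("median", ascending=False)
print(tiers)
```

In this toy data, `AAA` has the higher median and the narrower IQR, which matches the "liquid and predictable" profile described above, while the single burst in `BBB` shows up as an upper-tail point rather than moving its median.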
Box Plot of Trading Volume by Asset
A box plot summarizes each asset’s volume distribution with five-number stats (median, quartiles, whiskers) and flags outliers. Using log10 compresses scale differences so cross-asset liquidity levels and variability are directly comparable.
On a log scale, high-liquidity assets exhibit higher medians and often tighter IQRs, implying consistent participation. Lower-liquidity assets show lower medians with wider IQRs and frequent high-side outliers, reflecting sporadic interest and potential execution risk (slippage).
# Chart 2B - Box Plot of Trading Volume by Asset (log10)
# Developed with AI assistance to demonstrate LightningChart Python
import lightningchart as lc
import re
import numpy as np
import pandas as pd
# License
with open("D:/HAMK/Internship/MyProjects/lc_license.txt", "r") as f:
    lc.set_license(f.read().strip())
# Ensure dataframes exist
try: ccd_clean
except NameError: ccd_clean = pd.read_csv("cryptocurrency.csv", encoding="latin1")
try: sd
except NameError: sd = pd.read_csv("stocks.csv", encoding="latin1")
# Helpers
def _norm(s): return re.sub(r'[^a-z0-9]', '', str(s).lower())
def _detect_col(df, exact=None, contains=None):
    exact, contains = exact or [], contains or []
    cmap = {_norm(c): c for c in df.columns}; keys = set(cmap.keys())
    for k in exact:
        if k in keys: return cmap[k]
    for kn in keys:
        if any(sub in kn for sub in contains): return cmap[kn]
    return None
def build_volume_tidy(df, asset_label):
    d = df.copy()
    sym_col = _detect_col(d, exact=['symbol','ticker'], contains=['symbol','ticker','name'])
    vol_col = (_detect_col(d, exact=['vol24h','totalvol','volume'],
                           contains=['vol24h','totalvol','volume'])
               or _detect_col(d, contains=['vol']))
    if sym_col is None or vol_col is None:
        raise ValueError(f"Could not find symbol/volume in: {list(d.columns)}")
    d[vol_col] = (d[vol_col].astype(str)
                  .str.replace(r'[^0-9.\-eE]', '', regex=True)
                  .replace({'': np.nan}).astype(float))
    d = d.dropna(subset=[sym_col, vol_col])
    out = d[[sym_col, vol_col]].rename(columns={sym_col:'symbol', vol_col:'volume'})
    out['asset_class'] = asset_label
    return out
# Assemble tidy volume data
vol = pd.concat([
    build_volume_tidy(ccd_clean, 'Crypto'),
    build_volume_tidy(sd, 'Stock')
], ignore_index=True)
# Keep only positive volumes and compute log10
vol = vol[vol['volume'] > 0].copy()
vol['log10_volume'] = np.log10(vol['volume'])
# Choose top-N assets by row count (keeps charts readable)
N = 10
symbols = (vol.groupby('symbol').size()
           .sort_values(ascending=False).head(N).index.tolist())
# Build category->data mapping (log10 volumes)
category_data = {sym: vol.loc[vol['symbol']==sym, 'log10_volume'].to_numpy()
                 for sym in symbols}
# Build Box Plot
chart = lc.ChartXY(theme=lc.Themes.Light,
                   title='Box Plot - Trading Volume by Asset (Y = log10(volume))',
                   html_text_rendering=True)
# Add box series (handle method naming differences)
box_series = chart.add_box_series() if hasattr(chart, 'add_box_series') else chart.addBoxSeries()
dataset = []
x_out, y_out = [], []
for i, sym in enumerate(symbols):
    data = category_data[sym]
    if data.size == 0:
        continue
    start = (i * 2) + 1
    end = start + 1
    q1 = float(np.percentile(data, 25))
    q3 = float(np.percentile(data, 75))
    med = float(np.median(data))
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
    non_out = data[(data >= lower_bound) & (data <= upper_bound)]
    if non_out.size == 0:
        # fallback if everything looks like an outlier
        non_out = data
    lower_ext = float(np.min(non_out))
    upper_ext = float(np.max(non_out))
    dataset.append({
        'start': start,
        'end': end,
        'lowerQuartile': q1,
        'upperQuartile': q3,
        'median': med,
        'lowerExtreme': lower_ext,
        'upperExtreme': upper_ext,
    })
    # collect outliers
    outliers = data[(data < lower_bound) | (data > upper_bound)]
    if outliers.size:
        x_out.extend([start + 0.5] * len(outliers))
        y_out.extend(outliers.tolist())
# Push all boxes at once
box_series.add_multiple(dataset)
# Add outliers (if any)
if len(y_out):
    pt_series = (chart.add_point_series(sizes=True, rotations=True, lookup_values=True)
                 if hasattr(chart, 'add_point_series') else chart.addPointSeries(sizes=True, rotations=True, lookup_values=True))
    pt_series.set_point_color('red')
    pt_series.append_samples(x_values=x_out, y_values=y_out, sizes=[9]*len(y_out))
# Optional: show asset names on X axis using categories (if supported)
try:
    axX = chart.getDefaultAxisX()
    if hasattr(lc, 'AxisTickStrategies'):
        axX.setTickStrategy(lc.AxisTickStrategies.Category)
    centers = [(i*2)+1.5 for i in range(len(symbols))]
    axX.setCategories([lc.Category(label=sym, value=centers[i]) for i, sym in enumerate(symbols)])
except Exception:
    pass
chart.open()
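To make the box statistics concrete, here is a small worked example of the 1.5 × IQR rule used in the chart code. The helper name `tukey_box_stats` and the input values are hypothetical. The logic mirrors the loop above: whiskers extend to the most extreme points inside the fences, and anything beyond the fences is reported as an outlier.

```python
import numpy as np

def tukey_box_stats(values, whisker=1.5):
    """Quartiles, whisker ends, and outliers using the 1.5 * IQR rule."""
    data = np.asarray(values, dtype=float)
    q1, med, q3 = np.percentile(data, [25, 50, 75])
    iqr = q3 - q1
    lower_fence = q1 - whisker * iqr
    upper_fence = q3 + whisker * iqr
    inside = data[(data >= lower_fence) & (data <= upper_fence)]
    return {
        "lowerQuartile": float(q1),
        "median": float(med),
        "upperQuartile": float(q3),
        "lowerExtreme": float(inside.min()),   # whisker ends at the last point inside the fences
        "upperExtreme": float(inside.max()),
        "outliers": data[(data < lower_fence) | (data > upper_fence)].tolist(),
    }

stats = tukey_box_stats([1, 2, 3, 4, 5, 6, 7, 8, 9, 100])
print(stats)
```

For this input the quartiles are 3.25 and 7.75, so the fences sit at -3.5 and 14.5. The whiskers therefore end at 1 and 9, and the value 100 is flagged as the only outlier.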
Scatter Plot of Open vs Close Prices (coloured by asset)
A scatter of Open vs. Close directly shows intrahour drift and volatility. Distance from the y = x line quantifies how much the price moved within the hour, and the tightness or spread of the cloud compares stability across assets. Among the top assets, stocks cluster close to the diagonal (smaller candles), while several crypto tickers scatter more widely, consistent with larger intrahour swings. Occasional distant points flag shock hours.
# Chart 3A - Scatter Plot of Open vs Close Prices (colored by asset)
# Developed with AI assistance to demonstrate LightningChart Python
import lightningchart as lc
import re
import numpy as np
import pandas as pd
# License
with open("D:/HAMK/Internship/MyProjects/lc_license.txt", "r") as f:
    lc.set_license(f.read().strip())
# Ensure dataframes exist
try:
    ccd_clean
except NameError:
    ccd_clean = pd.read_csv("cryptocurrency.csv", encoding="latin1")
try:
    sd
except NameError:
    sd = pd.read_csv("stocks.csv", encoding="latin1")
# Helpers
def _norm(s): return re.sub(r'[^a-z0-9]', '', str(s).lower())
def _detect(df, exact=None, contains=None):
    exact, contains = exact or [], contains or []
    cmap = {_norm(c): c for c in df.columns}; keys = set(cmap.keys())
    for k in exact:
        if k in keys: return next(c for c in df.columns if _norm(c)==k)
    for col in df.columns:
        if any(sub in _norm(col) for sub in contains or []):
            return col
    return None
def _to_num(s):
    return (s.astype(str)
            .str.replace(r'[^0-9.\-eE]', '', regex=True)
            .replace({'': np.nan})
            .astype(float))
def open_close_from_df(df, asset_label=None,
                       asset_col=None, time_col=None,
                       open_col=None, close_col=None, price_col=None):
    """Return tidy [symbol, timestamp, open, close, asset_class].
    If only a single price column exists, uses prev price as 'open' (close-to-close)."""
    d = df.copy()
    # Detect columns
    asset_col = asset_col or _detect(d, exact=['symbol','ticker'], contains=['symbol','ticker','name'])
    time_col = time_col or _detect(d, exact=['timestamp','datetime','date','time'],
                                   contains=['timestamp','datetime','date','time'])
    if open_col is None:
        open_col = _detect(d, exact=['open'])
    if close_col is None:
        close_col = _detect(d, exact=['close','adjclose'], contains=['close'])
    if price_col is None:
        price_col = _detect(d, exact=['priceusd','price'], contains=['priceusd','last','price'])
    if asset_col is None or time_col is None:
        raise ValueError(f"Missing key columns. Found asset={asset_col}, time={time_col}")
    # Parse + sort
    d[time_col] = pd.to_datetime(d[time_col], errors='coerce')
    d = d.dropna(subset=[time_col]).sort_values([asset_col, time_col])
    if close_col is not None and open_col is not None:
        d[open_col] = _to_num(d[open_col])
        d[close_col] = _to_num(d[close_col])
        out = d.dropna(subset=[open_col, close_col]).copy()
        out = out.rename(columns={asset_col:'symbol', time_col:'timestamp',
                                  open_col:'open', close_col:'close'})
    else:
        # Fallback: single price -> open = previous close within each asset
        pcol = close_col or price_col
        if pcol is None:
            raise ValueError("Could not find Open/Close/Price columns.")
        d[pcol] = _to_num(d[pcol])
        d = d.dropna(subset=[pcol]).copy()
        d['open'] = d.groupby(asset_col, group_keys=False)[pcol].shift(1)
        d['close'] = d[pcol]
        out = d.dropna(subset=['open','close']).rename(
            columns={asset_col:'symbol', time_col:'timestamp'}
        )[['symbol','timestamp','open','close']]
    if asset_label is not None:
        out['asset_class'] = asset_label
    return out
# Build tidy open/close pairs
oc_crypto = open_close_from_df(ccd_clean, asset_label='Crypto')
oc_stocks = open_close_from_df(sd, asset_label='Stock')
oc = pd.concat([oc_crypto, oc_stocks], ignore_index=True)
# Pick top-N assets by sample count for legible plot
N = 10
top_assets = (oc.groupby('symbol').size()
              .sort_values(ascending=False).head(N).index.tolist())
plot_df = oc[oc['symbol'].isin(top_assets)].copy()
# LightningChart scatter
chart = lc.ChartXY(theme=lc.Themes.White,
                   title='Open vs Close Prices - Colored by Asset',
                   html_text_rendering=True)
# Helper functions (handle API naming differences)
def add_points(ch):
    for fn in ('addPointSeries','add_point_series'):
        if hasattr(ch, fn): return getattr(ch, fn)()
    raise AttributeError("Point series adder not found.")
def add_line(ch):
    for fn in ('addLineSeries','add_line_series'):
        if hasattr(ch, fn): return getattr(ch, fn)()
    raise AttributeError("Line series adder not found.")
def add_xy(series, arr):
    for payload in (arr,
                    [{'x': float(x), 'y': float(y)} for x,y in arr],
                    [(float(x), float(y)) for x,y in arr]):
        try: series.add(payload); return
        except Exception: pass
def set_name(s, name):
    for fn in ('setName','set_name','setLabel','set_label'):
        if hasattr(s, fn):
            try: getattr(s, fn)(name)
            except Exception: pass
def set_point_size(s, size):
    for fn in ('setPointSize','set_point_size'):
        if hasattr(s, fn):
            try: getattr(s, fn)(size)
            except Exception: pass
# Add a y=x reference line (sessions finishing flat)
xy_min = float(min(plot_df[['open','close']].min()))
xy_max = float(max(plot_df[['open','close']].max()))
ref = add_line(chart)
add_xy(ref, np.array([[xy_min, xy_min], [xy_max, xy_max]], dtype=float))
try: ref.setStrokeThickness(1.5)
except Exception: pass
set_name(ref, 'y = x')
# Add a point series per asset (colored by asset automatically by LC theme)
for sym in top_assets:
    s = add_points(chart)
    set_name(s, sym)
    set_point_size(s, 2.2)
    pts = plot_df.loc[plot_df['symbol']==sym, ['open','close']].to_numpy(dtype=float)
    add_xy(s, pts)
# Axes titles (safe if supported)
try: chart.getDefaultAxisX().setTitle('Open Price')
except Exception: pass
try: chart.getDefaultAxisY().setTitle('Close Price')
except Exception: pass
# Make the view square-ish to read the y=x line fairly
try:
    # Pad a bit around range
    pad = 0.03 * (xy_max - xy_min if np.isfinite(xy_max-xy_min) else 1.0)
    x0, x1 = xy_min - pad, xy_max + pad
    y0, y1 = x0, x1
    axX, axY = chart.getDefaultAxisX(), chart.getDefaultAxisY()
    if hasattr(axX,'setInterval'): axX.setInterval(x0, x1)
    if hasattr(axY,'setInterval'): axY.setInterval(y0, y1)
except Exception:
    pass
chart.open()
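The "distance from the y = x line" reading can be made explicit. For a point (open, close), the perpendicular distance to the diagonal is |close - open| / √2, which is in price units and therefore grows with the price level. The percentage move is the scale-free counterpart. The sketch below uses made-up prices to show both.

```python
import numpy as np

# Hypothetical open/close pairs for three hourly bars
open_ = np.array([100.0, 50.0, 20.0])
close = np.array([103.0, 50.0, 19.0])

# Perpendicular distance from each point to the y = x line (price units)
perp_dist = np.abs(close - open_) / np.sqrt(2.0)

# Scale-free alternative: percentage move within the hour
pct_move = (close - open_) / open_ * 100.0

for o, c, d, p in zip(open_, close, perp_dist, pct_move):
    print(f"open={o:.2f} close={c:.2f} distance={d:.4f} move={p:+.2f}%")
```

The first bar sits farthest from the diagonal in absolute terms, yet the third bar has the largest percentage move. When high-priced and low-priced assets share one scatter plot, the percentage view is the fairer basis for comparing volatility.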
Spider Chart of Hour-of-Day Volatility for 4 assets
A spider/radar chart compactly maps the intraday volatility profile over the 24-hour cycle, exposing time-of-day peaks and troughs at a glance. The radar view reveals distinct intraday rhythms: stock symbols typically spike around opening/closing hours, while crypto shows broader or shifted peaks, reflecting 24/7 trading and global participation. These patterns can inform execution timing and risk windows for each asset.
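The volatility metric behind the spider chart (mean absolute return per hour of day, sampled every three hours and scaled to 0-100 across all assets and hours) can be sketched independently of the charting code. The frame below is hypothetical, and keeping only hours that fall exactly on the 3-hour grid is an assumption about how the axes are built.

```python
import numpy as np
import pandas as pd

# Hypothetical tidy frame of absolute hourly returns (%)
ar = pd.DataFrame({
    "symbol": ["AAA", "AAA", "AAA", "BBB", "BBB", "BBB"],
    "hour": [0, 0, 3, 0, 3, 3],
    "abs_ret_pct": [1.0, 3.0, 4.0, 0.5, 1.0, 1.0],
})

# One spoke every 3 hours: 0, 3, ..., 21
hours = list(range(0, 24, 3))

# Mean absolute return per (symbol, hour); hours with no data stay NaN
profile = (ar[ar["hour"].isin(hours)]
           .groupby(["symbol", "hour"])["abs_ret_pct"].mean()
           .unstack("hour")
           .reindex(columns=hours))

# Min-max scale to 0-100 across all assets and hours so spokes are comparable
lo = np.nanmin(profile.to_numpy())
hi = np.nanmax(profile.to_numpy())
scaled = (profile - lo) / (hi - lo) * 100.0 if hi > lo else profile * 0.0
print(scaled.round(1))
```

Scaling across all assets keeps relative magnitudes visible, so a quiet asset draws a small polygon. Scaling each asset separately would instead emphasize the shape of its intraday rhythm.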
# Chart 3B - Spider Chart of Hour-of-Day Volatility for 4 assets
# Metric: mean absolute return (%) per hour-of-day
# Axes: every 3 hours (0, 3, …, 21). Values normalized to 0–100 across all assets/hours.
# Developed with AI assistance to demonstrate LightningChart Python
import lightningchart as lc
import numpy as np
import pandas as pd
import re
# License
with open("D:/HAMK/Internship/MyProjects/lc_license.txt", "r") as f:
lc.set_license(f.read().strip())
try:
ccd_clean
except NameError:
ccd_clean = pd.read_csv("cryptocurrency.csv", encoding="latin1")
try:
sd
except NameError:
sd = pd.read_csv("stocks.csv", encoding="latin1")
# Helpers
def _norm(s): return re.sub(r'[^a-z0-9]', '', str(s).lower())
def _detect(df, exact=(), contains=()):
if df is None or df.empty: return None
cmap = {_norm(c): c for c in df.columns}
keys = set(cmap.keys())
for k in exact or ():
if k in keys: return cmap[k]
for c in df.columns:
if any(sub in _norm(c) for sub in (contains or ())): return c
return None
def _to_num(s):
return pd.to_numeric(
s.astype(str).str.replace(r'[^0-9.\-eE]', '', regex=True).replace({'': np.nan}),
errors='coerce'
)
def hourly_abs_returns(df, asset_label=None,
                       asset_col=None, time_col=None,
                       price_col=None, open_col=None, close_col=None):
    """Tidy DF: symbol, timestamp, hour, abs_ret_pct (+ asset_class). Returns empty DF if not derivable."""
    if df is None or df.empty:
        return pd.DataFrame(columns=['symbol','timestamp','hour','abs_ret_pct','asset_class'])
    d = df.copy()
    asset_col = asset_col or _detect(d, exact=('symbol','ticker'), contains=('symbol','ticker','name','asset'))
    time_col = time_col or _detect(d, exact=('timestamp','datetime','date','time'),
                                   contains=('timestamp','datetime','date','time'))
    if price_col is None:
        price_col = _detect(d, exact=('priceusd','price','close','adjclose'),
                            contains=('priceusd','price','last','close'))
    if open_col is None:
        open_col = _detect(d, exact=('open',), contains=('open',))
    if close_col is None:
        close_col = _detect(d, exact=('close','adjclose'), contains=('close',))
    if asset_col is None or time_col is None:
        return pd.DataFrame(columns=['symbol','timestamp','hour','abs_ret_pct','asset_class'])
    d[time_col] = pd.to_datetime(d[time_col], errors='coerce')
    d = d.dropna(subset=[time_col]).sort_values([asset_col, time_col])
    if open_col is not None and close_col is not None:
        # Preferred: open-to-close return within each bar
        d[open_col] = _to_num(d[open_col])
        d[close_col] = _to_num(d[close_col])
        d = d.dropna(subset=[open_col, close_col])
        if d.empty:
            return pd.DataFrame(columns=['symbol','timestamp','hour','abs_ret_pct','asset_class'])
        ret_pct = (d[close_col] - d[open_col]) / d[open_col] * 100.0
    elif price_col is not None:
        # Fallback: bar-to-bar percentage change per asset
        d[price_col] = _to_num(d[price_col])
        d = d.dropna(subset=[price_col])
        if d.empty:
            return pd.DataFrame(columns=['symbol','timestamp','hour','abs_ret_pct','asset_class'])
        ret_pct = d.groupby(asset_col, group_keys=False)[price_col].pct_change() * 100.0
    else:
        return pd.DataFrame(columns=['symbol','timestamp','hour','abs_ret_pct','asset_class'])
    # A zero base price yields +/-inf; treat those returns as missing
    ret_pct = ret_pct.replace([np.inf, -np.inf], np.nan)
    out = pd.DataFrame({
        'symbol': d[asset_col].values,
        'timestamp': d[time_col].values,
        'abs_ret_pct': np.abs(ret_pct.values)
    }).dropna(subset=['abs_ret_pct'])
    if out.empty:
        return pd.DataFrame(columns=['symbol','timestamp','hour','abs_ret_pct','asset_class'])
    out['hour'] = pd.to_datetime(out['timestamp']).dt.hour
    # Always create the column (None when no label is given) so the selection below cannot fail
    out['asset_class'] = asset_label
    return out[['symbol','timestamp','hour','abs_ret_pct','asset_class']]
# Build data
ar_crypto = hourly_abs_returns(ccd_clean, 'Crypto')
ar_stocks = hourly_abs_returns(sd, 'Stock')
ar = pd.concat([ar_crypto, ar_stocks], ignore_index=True)
if ar.empty:
    raise RuntimeError("Could not compute hourly returns from the provided datasets.")

# Pick 4 assets by sample count
N = 4
top_assets = (ar.groupby('symbol').size().sort_values(ascending=False).head(N).index.tolist())
plot = ar[ar['symbol'].isin(top_assets)].copy()

# Mean |return| % per hour-of-day per asset
hourly = (plot.groupby(['symbol','hour'])['abs_ret_pct']
          .mean()
          .reset_index())

# Spider axes: every 3 hours to keep it readable (8 axes)
hours = list(range(0, 24, 3))  # 0,3,6,9,12,15,18,21
categories = [f'{h:02d}h' for h in hours]

# Global normalization for comparability → 0..100
r_max = hourly.loc[hourly['hour'].isin(hours), 'abs_ret_pct'].max()
if not np.isfinite(r_max) or r_max <= 0:
    r_max = 1.0

def points_for_symbol(sym):
    vals = []
    for h in hours:
        v = hourly.loc[(hourly['symbol']==sym) & (hourly['hour']==h), 'abs_ret_pct'].mean()
        if not np.isfinite(v):
            v = 0.0  # no observations for this hour
        vals.append(float(v) / r_max * 100.0)  # scale to 0..100
    return [{'axis': cat, 'value': val} for cat, val in zip(categories, vals)]
# Spider Chart
chart = lc.SpiderChart(
    theme=lc.Themes.Dark,
    title='Hour-of-Day Volatility - Spider (radius = relative mean |return|, 0..100)',
    html_text_rendering=True
)

# Set the axes (categories) explicitly if the API supports it
if hasattr(chart, 'set_categories'):
    chart.set_categories(categories)

# Add 4 series (one per asset)
for sym in top_assets:
    series = chart.add_series()
    if hasattr(series, 'set_name'):
        series.set_name(sym)
    series.add_points(points_for_symbol(sym))

chart.open()
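The hour-of-day metric behind the radar view can also be checked in isolation, without the CSV files or a LightningChart license. The following is a minimal, dependency-free sketch under stated assumptions: the `bars` values are made up, the helper names `hour_profile` and `normalize_0_100` are hypothetical, and normalization is done for a single asset rather than globally across assets as in the listing above.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical hourly bars: (timestamp, open, close). Values are made up.
bars = [
    (datetime(2024, 1, 1, 9), 100.0, 102.0),
    (datetime(2024, 1, 1, 15), 100.0, 99.5),
    (datetime(2024, 1, 2, 9), 100.0, 99.0),
    (datetime(2024, 1, 2, 15), 100.0, 100.5),
]

def hour_profile(rows):
    """Mean absolute open-to-close return (%) per hour of day."""
    buckets = defaultdict(list)
    for ts, open_, close in rows:
        if open_ == 0:
            continue  # return is undefined for a zero open price
        buckets[ts.hour].append(abs((close - open_) / open_ * 100.0))
    return {h: sum(v) / len(v) for h, v in sorted(buckets.items())}

def normalize_0_100(profile):
    """Scale the profile so its largest value maps to 100."""
    peak = max(profile.values(), default=0.0)
    if peak <= 0:
        return {h: 0.0 for h in profile}
    return {h: v / peak * 100.0 for h, v in profile.items()}

profile = hour_profile(bars)
radar = normalize_0_100(profile)
# Same point shape that the listing above passes to series.add_points
points = [{'axis': f'{h:02d}h', 'value': v} for h, v in radar.items()]
print(points)
```

With these sample bars the 09h axis carries the largest mean absolute return, so it is scaled to 100 and the 15h axis is expressed relative to it.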
Treemap of Market Capitalization (or Proxy) by Class -> Asset
A treemap efficiently conveys hierarchical composition (here, asset class -> symbol) while encoding both relative size (area) and magnitude (color). The treemap highlights strong size inequality among assets: only a few command substantial value, while most remain minor.
# Chart 4A - Treemap of Market Capitalization (or Proxy) by Class --> Asset
# Area and color = market capitalization, or the first available proxy, per asset
# Developed with AI assistance to demonstrate LightningChart Python
import lightningchart as lc
import numpy as np
import pandas as pd
import re

# License
with open("D:/HAMK/Internship/MyProjects/lc_license.txt", "r") as f:
    lc.set_license(f.read().strip())

# Reuse the DataFrames if they are already defined; otherwise load them from CSV
try:
    ccd_clean
except NameError:
    ccd_clean = pd.read_csv("cryptocurrency.csv", encoding="latin1")
try:
    sd
except NameError:
    sd = pd.read_csv("stocks.csv", encoding="latin1")
def _norm(s):
    return re.sub(r'[^a-z0-9]', '', str(s).lower())

def _pick_col(df, *, exact=(), contains=(), exclude_contains=()):
    """Pick first column by exact name or substring; avoid excluded substrings."""
    normed = {_norm(c): c for c in df.columns}
    keys = set(normed.keys())

    def ok(name):
        n = _norm(name)
        return not any(ex in n for ex in exclude_contains)

    for k in exact:
        if k in keys and ok(normed[k]):
            return normed[k]
    for c in df.columns:
        cn = _norm(c)
        if any(sub in cn for sub in contains) and ok(c):
            return c
    return None

def _num(s):
    # robust numeric coercion
    x = (s.astype(str)
          .str.replace(r'[^0-9.\-eE]', '', regex=True)
          .replace({'': np.nan}))
    return pd.to_numeric(x, errors='coerce')

def _first(*vals):
    for v in vals:
        if v is not None:
            return v
    return None
def summarize_assets(df: pd.DataFrame, asset_label: str):
    d = df.copy()
    asset_col = _first(
        _pick_col(d, exact=('symbol','ticker','name','asset'),
                  contains=('symbol','ticker','name','asset','security','company'))
    )
    mcap_col = _first(
        _pick_col(d, exact=('marketcap','market_cap','marketcapitalization','marketcapitalisation'),
                  contains=('marketcap','marketcapital','mktcap','mktcapital'),
                  exclude_contains=('time','date','stamp'))
    )
    close_col = _first(
        _pick_col(d, exact=('close','adjclose','closeprice','price','last','closeusd','closeus'),
                  contains=('close','adjclose','last','price'),
                  exclude_contains=('time','date','stamp'))
    )
    open_col = _first(
        _pick_col(d, exact=('open','openprice'),
                  contains=('open',),
                  exclude_contains=('time','date','stamp'))
    )
    vol_col = _first(
        _pick_col(d, exact=('volume','vol','volumeusd','totalvolume','volumeto','sharestraded','turnover'),
                  contains=('volume','vol','turnover','shares'),
                  exclude_contains=('time','date','stamp'))
    )
    if asset_col is None:
        raise ValueError(f"[{asset_label}] No symbol/ticker column found in: {list(d.columns)}")
    for c in (mcap_col, close_col, open_col, vol_col):
        if c is not None:
            d[c] = _num(d[c])

    # Candidates are appended in priority order; the first one available is used
    candidates = []
    if mcap_col is not None:
        mc = d[[asset_col, mcap_col]].dropna()
        if not mc.empty and mc[mcap_col].gt(0).any():
            g = (mc.groupby(asset_col, as_index=False)[mcap_col].median()
                   .rename(columns={mcap_col: 'value'}))
            g['source'] = 'market_cap (median)'
            candidates.append(g)
    if close_col is not None and vol_col is not None:
        d['__liq__'] = d[close_col] * d[vol_col]
        liq = d[[asset_col, '__liq__']].replace([np.inf,-np.inf], np.nan).dropna()
        if not liq.empty and liq['__liq__'].gt(0).any():
            g = (liq.groupby(asset_col, as_index=False)['__liq__'].median()
                    .rename(columns={'__liq__': 'value'}))
            g['source'] = 'median(close×volume)'
            candidates.append(g)
    price_like = close_col if close_col is not None else open_col
    if price_like is not None:
        p = d[[asset_col, price_like]].dropna()
        if not p.empty and p[price_like].gt(0).any():
            g = (p.groupby(asset_col, as_index=False)[price_like].median()
                  .rename(columns={price_like: 'value'}))
            g['source'] = f'median({price_like}) price proxy'
            candidates.append(g)
    if not candidates:
        # Last resort: number of rows per asset
        g = (d.groupby(asset_col, as_index=False).size()
              .rename(columns={'size': 'value'}))
        g['source'] = 'row_count proxy'
        candidates.append(g)

    out = candidates[0].copy()
    out['asset_class'] = asset_label
    out.rename(columns={asset_col: 'symbol'}, inplace=True)
    # Keep only finite, strictly positive values
    out = out[out['value'].replace([np.inf,-np.inf], np.nan).fillna(0) > 0]
    return out.reset_index(drop=True)
# build summaries
crypto = summarize_assets(ccd_clean, 'Crypto')
stocks = summarize_assets(sd, 'Stock')
summary = pd.concat([crypto, stocks], ignore_index=True)

# Keep the TOP_N largest assets per class
TOP_N = 30
summary = (summary.sort_values('value', ascending=False)
                  .groupby('asset_class', group_keys=False)
                  .head(TOP_N)
                  .reset_index(drop=True))

# Two-level hierarchy: asset class -> symbol
nodes = []
for grp, g in summary.groupby('asset_class'):
    children = [{'name': sym, 'value': float(val)} for sym, val in zip(g['symbol'], g['value'])]
    src = g['source'].mode().iat[0] if 'source' in g and not g['source'].empty else ''
    nodes.append({'name': f"{grp} [{src}]", 'children': children})
# Treemap (with class-name fallback)
treemap = None
for cls in ('TreeMapChart','TreemapChart','TreeMap'):
    if hasattr(lc, cls):
        treemap = getattr(lc, cls)(theme=lc.Themes.Light,
                                   title=('Market Capitalization / Proxy - Crypto vs Stocks\n'
                                          '(priority: mcap --> median(close×volume) --> median(price) --> row_count)'),
                                   html_text_rendering=True)
        break
if treemap is None:
    raise RuntimeError("Treemap chart class not found in lightningchart.")

# Coloring (safe)
vmin, vmax = float(summary['value'].min()), float(summary['value'].max())
try:
    if not np.isfinite(vmin) or not np.isfinite(vmax) or vmin == vmax:
        treemap.set_node_coloring(steps=[{'value': 0, 'color': '#8ecae6'},
                                         {'value': 1, 'color': '#ffb703'}])
    else:
        treemap.set_node_coloring(steps=[
            {'value': vmin, 'color': '#8ecae6', 'label':'Min'},
            {'value': (vmin+vmax)/2, 'color': '#ffb703', 'label':' '},
            {'value': vmax, 'color': '#fb8500', 'label':'Max'},
        ])
except Exception:
    pass

for fn in ('set_data','setData'):
    if hasattr(treemap, fn):
        getattr(treemap, fn)(nodes)
        break

treemap.open()
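The class -> symbol hierarchy that the treemap listing passes to the chart is a plain nested structure, and it can be built and inspected without any charting library. The sketch below assumes made-up records and a hypothetical helper `build_nodes`; it mirrors the `{'name', 'children'}` / `{'name', 'value'}` node shape used above but is not taken from the LightningChart documentation.

```python
from collections import defaultdict

# Hypothetical per-asset values (e.g. market cap); all numbers are made up.
records = [
    {'asset_class': 'Crypto', 'symbol': 'AAA', 'value': 500.0},
    {'asset_class': 'Crypto', 'symbol': 'BBB', 'value': 50.0},
    {'asset_class': 'Crypto', 'symbol': 'CCC', 'value': 0.0},
    {'asset_class': 'Stock', 'symbol': 'XXX', 'value': 300.0},
    {'asset_class': 'Stock', 'symbol': 'YYY', 'value': 900.0},
    {'asset_class': 'Stock', 'symbol': 'ZZZ', 'value': 10.0},
]

def build_nodes(rows, top_n=30):
    """Group rows by class and keep the top_n largest positive values per class."""
    by_class = defaultdict(list)
    for r in rows:
        if r['value'] > 0:  # non-positive values have no area in a treemap
            by_class[r['asset_class']].append(r)
    nodes = []
    for cls in sorted(by_class):
        top = sorted(by_class[cls], key=lambda r: r['value'], reverse=True)[:top_n]
        children = [{'name': r['symbol'], 'value': float(r['value'])} for r in top]
        nodes.append({'name': cls, 'children': children})
    return nodes

nodes = build_nodes(records, top_n=2)
for node in nodes:
    print(node['name'], [c['name'] for c in node['children']])
```

Sorting before truncating matters here: taking the first `top_n` rows in file order would keep arbitrary assets rather than the largest ones.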
Bar Chart of Market Capitalization Distribution by Crypto vs Stocks
A stacked bar cleanly compares total class size (Crypto vs Stock) while showing the composition of that total: the Top-K contributors vs the long tail (“Others”). It’s ideal for assessing concentration and share of top names side-by-side. This view highlights where market weight sits: whether each class is top-heavy (few names drive most value) or distributed (substantial “Others”).
# Chart 4B - Stacked Bar of Class totals split by Top-K assets (+ Others)
# Auto unit scaling (K/M/B/T) for very large values
# Developed with AI assistance to demonstrate LightningChart Python
import lightningchart as lc
import numpy as np
import pandas as pd
import re

# License
with open("D:/HAMK/Internship/MyProjects/lc_license.txt", "r") as f:
    lc.set_license(f.read().strip())

# Ensure dataframes exist
try:
    ccd_clean
except NameError:
    ccd_clean = pd.read_csv("cryptocurrency.csv", encoding="latin1")
try:
    sd
except NameError:
    sd = pd.read_csv("stocks.csv", encoding="latin1")
# Helpers
def _norm(s):
    return re.sub(r'[^a-z0-9]', '', str(s).lower())

def _pick_col(df, *, exact=(), contains=(), exclude_contains=()):
    normed = {_norm(c): c for c in df.columns}
    keys = set(normed.keys())

    def ok(name):
        n = _norm(name)
        return not any(ex in n for ex in exclude_contains)

    for k in exact:
        if k in keys and ok(normed[k]):
            return normed[k]
    for c in df.columns:
        cn = _norm(c)
        if any(sub in cn for sub in contains) and ok(c):
            return c
    return None

def _num(s):
    x = (s.astype(str).str.replace(r'[^0-9.\-eE]', '', regex=True).replace({'': np.nan}))
    return pd.to_numeric(x, errors='coerce')

def _first(*vals):
    for v in vals:
        if v is not None:
            return v
    return None
def summarize_assets(df: pd.DataFrame, asset_label: str):
    d = df.copy()
    asset_col = _first(_pick_col(d, exact=('symbol','ticker','name','asset'),
                                 contains=('symbol','ticker','name','asset','security','company')))
    mcap_col = _first(_pick_col(d, exact=('marketcap','market_cap','marketcapitalization','marketcapitalisation'),
                                contains=('marketcap','marketcapital','mktcap','mktcapital'),
                                exclude_contains=('time','date','stamp')))
    close_col = _first(_pick_col(d, exact=('close','adjclose','closeprice','price','last','closeusd','closeus'),
                                 contains=('close','adjclose','last','price'),
                                 exclude_contains=('time','date','stamp')))
    open_col = _first(_pick_col(d, exact=('open','openprice'), contains=('open',),
                                exclude_contains=('time','date','stamp')))
    vol_col = _first(_pick_col(d, exact=('volume','vol','volumeusd','totalvolume','volumeto','sharestraded','turnover'),
                               contains=('volume','vol','turnover','shares'),
                               exclude_contains=('time','date','stamp')))
    if asset_col is None:
        raise ValueError(f"[{asset_label}] No symbol/ticker column found.")
    for c in (mcap_col, close_col, open_col, vol_col):
        if c is not None:
            d[c] = _num(d[c])

    # Candidates are appended in priority order; the first one available is used
    candidates = []
    if mcap_col is not None:
        mc = d[[asset_col, mcap_col]].dropna()
        if not mc.empty and mc[mcap_col].gt(0).any():
            g = (mc.groupby(asset_col, as_index=False)[mcap_col].median()
                   .rename(columns={mcap_col:'value'}))
            g['source'] = 'market_cap (median)'
            candidates.append(g)
    if close_col is not None and vol_col is not None:
        d['__liq__'] = d[close_col] * d[vol_col]
        liq = d[[asset_col,'__liq__']].replace([np.inf,-np.inf], np.nan).dropna()
        if not liq.empty and liq['__liq__'].gt(0).any():
            g = (liq.groupby(asset_col, as_index=False)['__liq__'].median()
                    .rename(columns={'__liq__':'value'}))
            g['source'] = 'median(close×volume)'
            candidates.append(g)
    price_like = close_col if close_col is not None else open_col
    if price_like is not None:
        p = d[[asset_col, price_like]].dropna()
        if not p.empty and p[price_like].gt(0).any():
            g = (p.groupby(asset_col, as_index=False)[price_like].median()
                  .rename(columns={price_like:'value'}))
            g['source'] = f'median({price_like}) price proxy'
            candidates.append(g)
    if not candidates:
        # Last resort: number of rows per asset
        g = (d.groupby(asset_col, as_index=False).size()
              .rename(columns={'size':'value'}))
        g['source'] = 'row_count proxy'
        candidates.append(g)

    out = candidates[0].copy()
    out['asset_class'] = asset_label
    out.rename(columns={asset_col:'symbol'}, inplace=True)
    # Keep only finite, strictly positive values
    out = out[out['value'].replace([np.inf,-np.inf], np.nan).fillna(0) > 0]
    return out.reset_index(drop=True)
# Build summary
crypto = summarize_assets(ccd_clean, 'Crypto')
stocks = summarize_assets(sd, 'Stock')
summary = pd.concat([crypto, stocks], ignore_index=True)
TOP_K = 5
categories = ['Crypto', 'Stock']

def topk_plus_others(df_class, label_others):
    # Top-K assets by value, plus one "Others" bucket holding the remainder
    dfc = df_class.sort_values('value', ascending=False)
    top = dfc.head(TOP_K)[['symbol','value']]
    total = dfc['value'].sum()
    others_val = float(total - top['value'].sum())
    out = list(zip(top['symbol'].tolist(), top['value'].astype(float).tolist()))
    if others_val > 0:
        out.append((label_others, others_val))
    return out, total

crypto_list, crypto_total = topk_plus_others(summary[summary['asset_class']=='Crypto'], 'Others (Crypto)')
stock_list, stock_total = topk_plus_others(summary[summary['asset_class']=='Stock'], 'Others (Stock)')

# Union of subcategories (so stacks align)
subcats = [name for name, _ in crypto_list] + [name for name, _ in stock_list]
subcats = list(dict.fromkeys(subcats))  # de-duplicate, keep order
val_map_crypto = dict(crypto_list)
val_map_stock = dict(stock_list)

# Auto unit scaling
max_val = max(sum(v for _, v in crypto_list), sum(v for _, v in stock_list))
scales = [(1e12, 'Trillions'), (1e9, 'Billions'), (1e6, 'Millions'), (1e3, 'Thousands'), (1.0, '')]
scale, unit = next(((s, u) for s, u in scales if max_val / s >= 1.0), (1.0, ''))

scaled_stack = []
for name in subcats:
    scaled_stack.append({
        'subCategory': name,
        'values': [
            float(val_map_crypto.get(name, 0.0)) / scale,
            float(val_map_stock.get(name, 0.0)) / scale,
        ]
    })
# Render
title = f"Stacked Bar - Class Totals split by Top {TOP_K} Assets (+ Others)"
if unit:
    title += f" [Values in {unit}]"
chart = lc.BarChart(
    vertical=True,
    theme=lc.Themes.Light,
    title=title,
    html_text_rendering=True,
    legend={'visible': True}
)

# Keep category order
for fn in ('set_sorting','setSorting'):
    if hasattr(chart, fn):
        getattr(chart, fn)('disabled')
        break

# Push data (scaled)
for fn in ('set_data_stacked','setDataStacked'):
    if hasattr(chart, fn):
        getattr(chart, fn)(categories, scaled_stack)
        break

# Cosmetics
try:
    chart.set_bars_padding(0.15)
except Exception:
    pass

chart.open()
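The Top-K plus "Others" split and the unit scaling used for the stacked bar can be tried on a handful of numbers to see how concentration is read off. This is a minimal sketch with made-up values; the helper names `topk_plus_others`, `shares`, and `pick_unit` are hypothetical and operate on a plain dictionary rather than the DataFrame used in the listing above.

```python
def topk_plus_others(values, k, label_others='Others'):
    """Return the k largest (name, value) pairs plus one bucket for the rest."""
    ranked = sorted(values.items(), key=lambda kv: kv[1], reverse=True)
    parts = ranked[:k]
    others = sum(v for _, v in ranked[k:])
    if others > 0:
        parts.append((label_others, others))
    return parts

def shares(parts):
    """Fraction of the class total contributed by each part."""
    total = sum(v for _, v in parts)
    return {name: v / total for name, v in parts} if total else {}

def pick_unit(max_val):
    """Choose the largest unit that keeps the biggest value at or above 1."""
    scales = [(1e12, 'Trillions'), (1e9, 'Billions'), (1e6, 'Millions'),
              (1e3, 'Thousands'), (1.0, '')]
    return next(((s, u) for s, u in scales if max_val >= s), (1.0, ''))

# Hypothetical class with four assets; numbers are made up.
values = {'A': 50.0, 'B': 30.0, 'C': 15.0, 'D': 5.0}
parts = topk_plus_others(values, k=2)
print(parts)                 # top 2 assets and the remainder
print(shares(parts))         # a large top share indicates a top-heavy class
print(pick_unit(2.5e9))
```

A class where the named assets hold most of the total is top-heavy; a large "Others" share indicates a more distributed class.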
Correlation Heatmap of All Numerical Parameters (Pearson)
Correlation heatmaps compactly summarize co-movement structure within each market (Crypto vs Stocks). Placing them side-by-side enables immediate comparison of price–volume dynamics and OHLC relationships across the two universes.
Stocks typically show tighter OHLC correlation blocks (more regular session behavior), while crypto can display looser or regime-dependent price-volume links. Where price-volume correlation is stronger, flows tend to move with price trends; weaker or mixed signs suggest liquidity responding to volatility rather than direction.
# Chart 5 - Side-by-Side Correlation Heatmaps of Crypto vs Stocks
# Developed with AI assistance to demonstrate LightningChart Python
import lightningchart as lc
import numpy as np
import pandas as pd
import re

# License
with open("D:/HAMK/Internship/MyProjects/lc_license.txt", "r") as f:
    lc.set_license(f.read().strip())

# Reuse the DataFrames if they are already defined; otherwise load them from CSV
try:
    ccd_clean
except NameError:
    ccd_clean = pd.read_csv("cryptocurrency.csv", encoding="latin1")
try:
    sd
except NameError:
    sd = pd.read_csv("stocks.csv", encoding="latin1")
# helpers
def _norm(s):
    return re.sub(r'[^a-z0-9]', '', str(s).lower())

def _pick(df, *, exact=(), contains=()):
    m = {_norm(c): c for c in df.columns}
    for k in exact:
        if k in m:  # exact normalized key found
            # map back to original
            for c in df.columns:
                if _norm(c) == k:
                    return c
    for c in df.columns:
        n = _norm(c)
        if any(sub in n for sub in contains):
            return c
    return None

def _to_num(s):
    x = (s.astype(str).str.replace(r'[^0-9.\-eE]', '', regex=True)
          .replace({'': np.nan}))
    return pd.to_numeric(x, errors='coerce')
def _crypto_vars(df):
    """Return DataFrame with columns price, volume, high, low for crypto (with proxies if needed)."""
    d = df.copy()
    price_col = (_pick(d, exact=('priceusd','price'), contains=('priceusd','price','last')) or
                 _pick(d, exact=('close','adjclose'), contains=('close',)))
    volume_col = (_pick(d, exact=('vol24h','totalvol','volume'),
                        contains=('vol24h','totalvol','volume','volumeto','turnover')))
    high_col = _pick(d, exact=('high',), contains=('high',))
    low_col = _pick(d, exact=('low',), contains=('low',))
    chg_col = _pick(d, exact=('chg24h','chg_24h','change24h'),
                    contains=('chg24','24h','pct','percent','change'))
    out = pd.DataFrame()
    if price_col is not None:
        price = _to_num(d[price_col])
        out['price'] = price
        # If no high/low, synthesize symmetric proxies from price & 24h change (bounded)
        if high_col is None or low_col is None:
            if chg_col is not None:
                dpct = _to_num(d[chg_col]).clip(-50, 50).abs().fillna(0.0) / 100.0
                # Make the synthetic band modest to avoid degenerate correlations
                band = (0.5 * dpct).fillna(0.0)
                out['high'] = price * (1.0 + band)
                out['low'] = price * (1.0 - band)
    # Real high/low columns, when present, override the synthetic proxies
    if high_col is not None:
        out['high'] = _to_num(d[high_col])
    if low_col is not None:
        out['low'] = _to_num(d[low_col])
    if volume_col is not None:
        out['volume'] = _to_num(d[volume_col])
    return out.dropna()

def _stock_vars(df):
    """Return DataFrame with columns price, volume, high, low for stocks."""
    d = df.copy()
    open_col = _pick(d, exact=('open','openprice'), contains=('open',))
    close_col = (_pick(d, exact=('close','adjclose'), contains=('close',)) or
                 _pick(d, exact=('price',), contains=('price',)))
    high_col = _pick(d, exact=('high','highprice'), contains=('high',))
    low_col = _pick(d, exact=('low','lowprice'), contains=('low',))
    volume_col = _pick(d, exact=('volume',), contains=('volume','turnover','sharestraded'))
    out = pd.DataFrame()
    if close_col is not None:
        out['price'] = _to_num(d[close_col])
    elif open_col is not None:
        out['price'] = _to_num(d[open_col])
    if volume_col is not None:
        out['volume'] = _to_num(d[volume_col])
    if high_col is not None:
        out['high'] = _to_num(d[high_col])
    if low_col is not None:
        out['low'] = _to_num(d[low_col])
    return out.dropna()
def _corr4(df):
    """Build 4×4 corr matrix in order [price, volume, high, low]. Missing entries filled sensibly."""
    labels = ['price','volume','high','low']
    if df.empty:
        M = np.zeros((4,4), dtype=float)
        np.fill_diagonal(M, 1.0)
        return M, labels
    # retain available columns in correct order
    cols = [c for c in labels if c in df.columns]
    sub = df[cols].dropna()
    if len(sub) < 2:
        M = np.zeros((4,4), dtype=float)
        np.fill_diagonal(M, 1.0)
        return M, labels
    c = sub.corr().to_dict()
    M = np.zeros((4,4), dtype=float)
    for i, a in enumerate(labels):
        for j, b in enumerate(labels):
            if a in c and b in c[a] and pd.notna(c[a][b]):
                M[i,j] = c[a][b]
            elif a == b:
                M[i,j] = 1.0
            else:
                M[i,j] = 0.0  # variable missing or correlation undefined
    return M, labels
# Build matrices
crypto_df = _crypto_vars(ccd_clean)
stock_df = _stock_vars(sd)
M_crypto, labels = _corr4(crypto_df)
M_stock, _ = _corr4(stock_df)

# render two heatmaps
chart = lc.ChartXY(
    theme=lc.Themes.Light,
    title='Correlation Heatmaps - Crypto vs Stocks (Price, Volume, High, Low)',
    html_text_rendering=True
)

def add_heatmap(ch, cols, rows):
    for fn in ('add_heatmap_grid_series','addHeatmapGridSeries'):
        if hasattr(ch, fn):
            return getattr(ch, fn)(columns=cols, rows=rows)
    raise AttributeError("No heatmap grid series method found.")

def apply_palette(series):
    # Diverging palette over the correlation range [-1, 1]
    palette = [
        {'value': -1.0, 'color': '#313695'},
        {'value': -0.5, 'color': '#74add1'},
        {'value': 0.0, 'color': '#ffffbf'},
        {'value': 0.5, 'color': '#f46d43'},
        {'value': 1.0, 'color': '#a50026'},
    ]
    series.set_palette_coloring(steps=palette, look_up_property='value', interpolate=True)

# Crypto (left)
h1 = add_heatmap(chart, 4, 4)
h1.set_start(x=0.0, y=0.0)
h1.set_end(x=4.2, y=4.0)
h1.set_step(x=1, y=1)
h1.set_intensity_interpolation(True)
h1.invalidate_intensity_values(M_crypto.tolist())
h1.hide_wireframe()
apply_palette(h1)

# Stocks (right)
h2 = add_heatmap(chart, 4, 4)
h2.set_start(x=4.8, y=0.0)
h2.set_end(x=9.0, y=4.0)
h2.set_step(x=1, y=1)
h2.set_intensity_interpolation(True)
h2.invalidate_intensity_values(M_stock.tolist())
h2.hide_wireframe()
apply_palette(h2)
# Name the heatmaps (the custom legend code further down is left disabled)
def _set_name(series, name):
    for fn in ('setName', 'set_name', 'setLabel', 'set_label'):
        if hasattr(series, fn):
            try:
                getattr(series, fn)(name)
                return
            except Exception:
                pass

_set_name(h1, 'Crypto')
_set_name(h2, 'Stock')
# Create a legend box
# lg = None
# for fn in ('addLegendBox', 'addLegend', 'add_legend_box', 'add_legend'):
# if hasattr(chart, fn):
# lg = getattr(chart, fn)()
# break
# if lg is not None:
# # Turn OFF automatic entries to avoid duplicates
# for fn in ('setAutoEntries', 'set_auto_entries', 'setAutomaticEntries', 'set_automatic_entries'):
# if hasattr(lg, fn):
# try: getattr(lg, fn)(False)
# except Exception: pass
# # If there’s a clear() API, use it
# for fn in ('clear', 'clearItems', 'clear_items'):
# if hasattr(lg, fn):
# try: getattr(lg, fn)()
# except Exception: pass
# # Ensure the series themselves won't auto-register elsewhere
# for s in (h1, h2):
# for fn in ('setShowInLegend', 'set_show_in_legend', 'setVisibleInLegend'):
# if hasattr(s, fn):
# try: getattr(s, fn)(False)
# except Exception: pass
# Now add exactly two entries
# try:
# lg.add(h1)
# lg.add(h2)
# except Exception:
# pass
chart.open()
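For readers who want to see what each heatmap cell contains, the Pearson coefficient can be computed by hand on a few values. The sketch below is dependency-free and uses made-up columns; `pearson` and `corr_matrix` are hypothetical helper names, and the listing above relies on pandas `DataFrame.corr()` instead.

```python
import math

def pearson(x, y):
    """Pearson correlation: covariance divided by the product of standard deviations."""
    n = len(x)
    if n != len(y) or n < 2:
        raise ValueError("need two equally long sequences with at least 2 values")
    mx, my = sum(x) / n, sum(y) / n
    cov = sum((a - mx) * (b - my) for a, b in zip(x, y))
    vx = sum((a - mx) ** 2 for a in x)
    vy = sum((b - my) ** 2 for b in y)
    if vx == 0 or vy == 0:
        return float('nan')  # undefined for a constant column
    return cov / math.sqrt(vx * vy)

def corr_matrix(columns):
    """Square matrix of pairwise correlations, in the order of the dict keys."""
    labels = list(columns)
    matrix = [[1.0 if a == b else pearson(columns[a], columns[b]) for b in labels]
              for a in labels]
    return labels, matrix

# Hypothetical columns; values are made up for illustration.
data = {
    'price': [1.0, 2.0, 3.0, 4.0],
    'high': [2.0, 4.0, 6.0, 8.0],    # moves exactly with price
    'volume': [4.0, 3.0, 2.0, 1.0],  # moves exactly against price
}
labels, matrix = corr_matrix(data)
for name, row in zip(labels, matrix):
    print(name, [round(v, 3) for v in row])
```

Because high and low values are usually close to the price itself, near-1 cells in the price/high/low block are expected; the price-volume cells are where the two markets are most likely to differ.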
Conclusion
- Crypto markets exhibit higher volatility and irregular trading patterns compared to stocks, driven by 24/7 global activity.
- Stocks show more stable, session-based behavior with clear volume peaks during market open and close hours.
- Volume and price correlations across both asset classes highlight that trading activity remains a strong indicator of market momentum.
