You are a senior Python developer joining the Global Supervisory and Surveillance technology team at a major investment bank. The team owns a Python-based alert pipeline that ingests trade data from multiple trading desks (equities, fixed income, derivatives), detects anomalous trading patterns, and generates compliance alerts for the surveillance team.
The system processes trade records during market hours across all asset classes. It runs on a monthly release cycle and must meet strict audit and compliance requirements.
An internal audit has flagged three issues with the current pipeline:
You have been given the current pipeline code (trade_surveillance_pipeline.py). Read it carefully — it may contain issues beyond the three primary audit findings.
Your task is to refactor this code to address the audit findings and improve the overall quality of the pipeline. You do not need to fix everything — focus on the issues that matter most and explain your reasoning for what you prioritized.
trade_surveillance_pipeline.py
Copy this file into your working environment. This is the code you are refactoring.
"""
trade_surveillance_pipeline.py
Global Supervisory and Surveillance — Alert Pipeline
This module processes trade data from multiple trading desks,
detects anomalous trading patterns, and generates compliance alerts.
"""
import copy
import time
import json
from threading import Thread


class TradeProcessor:
    """Base processor for trade data."""

    def process(self, trades):
        raise NotImplementedError

    def validate(self, trade):
        raise NotImplementedError


class EquityProcessor(TradeProcessor):
    """Processes equity trades."""

    def process(self, trades):
        results = []
        for trade in trades:
            if self.validate(trade):
                results.append(self._analyze(trade))
        return results

    def validate(self, trade):
        return trade.get("asset_class") == "equity"

    def _analyze(self, trade):
        return {"trade_id": trade["trade_id"], "status": "reviewed", "flags": []}


class SurveillanceProcessor(EquityProcessor):
    """Extends EquityProcessor for surveillance-specific logic.
    Handles validation for trades requiring surveillance review."""

    def validate(self, trade):
        return trade.get("surveillance_required", False)


# Alert thresholds by asset class
ALERT_THRESHOLDS = [
    ("equity", 1000000),
    ("fixed_income", 5000000),
    ("derivative", 2000000),
]


def get_threshold(asset_class):
    """Returns the alert threshold for a given asset class."""
    for ac, threshold in ALERT_THRESHOLDS:
        if ac == asset_class:
            return threshold
    return None


def create_alert(trade, flags, metadata={}):
    """Creates an alert record from a trade and its flags."""
    alert = copy.copy(trade)
    alert["flags"] = flags
    alert["metadata"] = metadata
    metadata["processed_at"] = time.time()
    return alert


def load_trades(desk_feeds):
    """Loads trade data from all desk feeds."""
    all_trades = []
    for feed in desk_feeds:
        trades = json.loads(feed)
        all_trades.extend(trades)
    return all_trades


def fetch_trade_feed(desk_id, max_retries=3):
    """Fetches trade data from a desk feed with retry logic."""
    retries = 0
    while retries < max_retries:
        try:
            start = time.time()
            result = _api_call(desk_id)
            elapsed = time.time() - start
            print(f"[LOG] fetch_trade_feed({desk_id}) took {elapsed:.2f}s")
            return result
        except Exception as e:
            retries += 1
            print(f"[LOG] fetch_trade_feed({desk_id}) retry {retries}: {e}")
            time.sleep(1)
    print(f"[LOG] fetch_trade_feed({desk_id}) FAILED after {max_retries} retries")
    return None


def transform_trade(trade, normalize_notional, convert_currency, add_timestamps):
    """Applies transformations to a trade record."""
    if normalize_notional:
        trade["notional"] = float(trade.get("notional", 0))
    if convert_currency:
        trade["notional_usd"] = trade["notional"] * get_fx_rate(
            trade.get("currency", "USD"))
    if add_timestamps:
        trade["processed_at"] = time.time()
    return trade


def process_all_desks(desk_ids):
    """Processes trades from multiple desks concurrently."""
    results = []
    threads = []
    for desk_id in desk_ids:
        t = Thread(
            target=lambda d: results.append(analyze_desk(d)),
            args=(desk_id,)
        )
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    return results


def detect_anomalies(trades, window_minutes=30, threshold_multiplier=3.0):
    """Detects anomalous trading patterns within a time window.
    Flags trader activity where combined notional exceeds threshold."""
    anomalies = []
    for i in range(len(trades)):
        for j in range(len(trades)):
            if i != j:
                time_diff = abs(
                    trades[i]["timestamp"] - trades[j]["timestamp"]
                )
                if time_diff <= window_minutes * 60:
                    if trades[i]["trader_id"] == trades[j]["trader_id"]:
                        combined = (
                            trades[i]["notional"] + trades[j]["notional"]
                        )
                        avg = get_threshold(trades[i]["asset_class"])
                        if avg and combined > avg * threshold_multiplier:
                            anomalies.append({
                                "trader_id": trades[i]["trader_id"],
                                "trades": [
                                    trades[i]["trade_id"],
                                    trades[j]["trade_id"],
                                ],
                                "combined_notional": combined,
                                "window_minutes": window_minutes,
                            })
    return anomalies


def _api_call(desk_id):
    """Stub: simulates API response from a desk feed."""
    return json.dumps([{
        "trade_id": f"T{desk_id}001",
        "asset_class": "equity",
        "notional": 1500000,
        "currency": "USD",
        "trader_id": "TR001",
        "timestamp": time.time(),
        "surveillance_required": True,
    }])


def get_fx_rate(currency):
    """Returns exchange rate to USD."""
    rates = {"USD": 1.0, "EUR": 1.08, "GBP": 1.27, "JPY": 0.0067}
    return rates.get(currency, 1.0)


def analyze_desk(desk_id):
    """Analyzes trades for a single desk."""
    feed = fetch_trade_feed(desk_id)
    if feed:
        trades = load_trades([feed])
        return detect_anomalies(trades)
    return []
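
As one illustration of the kind of issue the file contains, create_alert uses a mutable default argument (metadata={}) and a shallow copy.copy of the trade, so state can leak between alerts. The following is a minimal sketch of one possible fix, not the expected solution. The name create_alert_safe is hypothetical, and the choice to deep-copy the trade is an assumption about what callers need.

```python
import copy
import time


def create_alert_safe(trade, flags, metadata=None):
    """Hypothetical variant of create_alert that avoids shared state."""
    # A `metadata={}` default is built once and reused by every call,
    # so build a fresh dict per call and never mutate the caller's dict.
    meta = dict(metadata) if metadata is not None else {}
    meta["processed_at"] = time.time()
    # Assumption: alerts must not share nested values with the source trade.
    alert = copy.deepcopy(trade)
    alert["flags"] = list(flags)
    alert["metadata"] = meta
    return alert
```

With this shape, two alerts created without a metadata argument receive independent dictionaries, and a dictionary passed in by the caller is left unmodified.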
These constraints reflect the real operating environment. Honor them in your solution.
multiprocessing, concurrent.futures, collections, typing, abc). You cannot introduce new infrastructure (no Kafka, no Redis, no Celery, no Spark). Your solution must work within the existing Python runtime.
The _api_call stub represents an external service you do not control. Your solution must work with the data as it arrives.
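
Within these constraints, concurrent desk processing can be expressed with concurrent.futures alone. The sketch below is one possible shape, not a prescribed answer. The names process_desks, analyze, and max_workers are hypothetical, and the default of 8 workers is an arbitrary assumption rather than a tuned value.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_desks(desk_ids, analyze, max_workers=8):
    """Run analyze(desk_id) for each desk in a bounded thread pool.

    Returns (results, errors), both keyed by desk_id, so a failure on
    one desk is recorded instead of being lost inside a worker thread.
    """
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to its desk so outcomes stay attributable.
        futures = {pool.submit(analyze, desk_id): desk_id for desk_id in desk_ids}
        for future in as_completed(futures):
            desk_id = futures[future]
            try:
                results[desk_id] = future.result()
            except Exception as exc:  # assumption: any failure is recorded per desk
                errors[desk_id] = exc
    return results, errors
```

A call such as process_desks(desk_ids, analyze_desk) would keep the number of threads bounded and return per-desk outcomes, which may be easier to audit than appending to a shared list from unnamed threads.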
---
Identify and prioritize high-impact performance, memory, and code quality issues in a production Python pipeline
Refactor concurrent desk processing for improved throughput without introducing new infrastructure
Redesign anomaly detection logic to reduce false positives and improve computational efficiency
Write a production incident runbook for a latency-related alert pipeline failure
Demonstrate critical evaluation and iterative use of AI coding tools in a compliance context
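
For the anomaly detection objective, one candidate redesign replaces the pairwise comparison with a per-trader sliding window. The sketch below is illustrative only and rests on several assumptions that differ from the original detect_anomalies: trades are grouped by both trader and asset class, the whole window's notional is summed rather than a single pair's, and at least two trades must be in the window. The names detect_anomalies_windowed and THRESHOLDS are hypothetical.

```python
from collections import defaultdict, deque

# Same values as ALERT_THRESHOLDS, held in a dict for direct lookup.
THRESHOLDS = {"equity": 1_000_000, "fixed_income": 5_000_000, "derivative": 2_000_000}


def detect_anomalies_windowed(trades, window_minutes=30, threshold_multiplier=3.0):
    """Flag windows where a trader's total notional exceeds the limit."""
    window_seconds = window_minutes * 60
    groups = defaultdict(list)
    for trade in trades:
        groups[(trade["trader_id"], trade["asset_class"])].append(trade)

    anomalies = []
    for (trader_id, asset_class), group in groups.items():
        threshold = THRESHOLDS.get(asset_class)
        if threshold is None:
            continue  # assumption: unknown asset classes are skipped
        limit = threshold * threshold_multiplier
        window, total = deque(), 0.0
        for trade in sorted(group, key=lambda t: t["timestamp"]):
            window.append(trade)
            total += trade["notional"]
            # Evict trades older than the window that ends at this trade.
            while trade["timestamp"] - window[0]["timestamp"] > window_seconds:
                total -= window.popleft()["notional"]
            # At most one alert per incoming trade, never one per ordered pair.
            if len(window) >= 2 and total > limit:
                anomalies.append({
                    "trader_id": trader_id,
                    "asset_class": asset_class,
                    "trades": [t["trade_id"] for t in window],
                    "combined_notional": total,
                    "window_minutes": window_minutes,
                })
    return anomalies
```

Sorting each group once and sliding a window over it costs O(n log n) overall, compared with the nested loops over every pair of trades, and it avoids reporting the same pair twice as (i, j) and (j, i). Whether window totals or pairwise sums are the right business rule is a question for the surveillance team.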