
    © 2026 Provn Inc. All rights reserved.



    Collabera / Software Engineer

    Senior Python Developer Skills Challenge – Trade Surveillance

    Python, Trade Surveillance, OOP, Anomaly Detection, Performance, Financial Services, Compliance, Concurrency
    Estimated time: 45 minutes

    What You'll Be Doing

    The Scenario

    You are a senior Python developer joining the Global Supervisory and Surveillance technology team at a major investment bank. The team owns a Python-based alert pipeline that ingests trade data from multiple trading desks (equities, fixed income, derivatives), detects anomalous trading patterns, and generates compliance alerts for the surveillance team.

    The system processes trade records during market hours across all asset classes. It runs on a monthly release cycle and must meet strict audit and compliance requirements.

    The Problem

    An internal audit has flagged three issues with the current pipeline:

    • Performance: The pipeline cannot keep up with trade volume during peak hours. Processing trades from six desks concurrently takes 3–4x longer than expected. The compliance team is receiving alerts 45–60 minutes after the trades occurred, which is outside the bank's SLA.
    • Memory: The system loads all trade data into memory at once. During high-volume periods, the process exceeds its memory allocation and crashes, causing missed alerts — an audit finding.
    • Alert quality: The anomaly detection logic produces a high false-positive rate. The compliance team reports that fewer than 15% of generated alerts require action. The current detection approach is computationally expensive and does not scale.
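    The starter code's detect_anomalies compares every trade with every other trade, which is O(n²) per desk. It also reports a qualifying pair of same-asset-class trades twice, once as (i, j) and once as (j, i). The sketch below is a single-pass alternative. It assumes numeric epoch-second timestamps and numeric notionals. It also assumes that windows are kept per (trader_id, asset_class), a design choice, since the original uses only the first trade's asset class. The name detect_anomalies_windowed and the THRESHOLDS dict are hypothetical. A real refactor would keep the existing detect_anomalies signature as the entry point.

```python
from collections import defaultdict, deque

# Hypothetical lookup table mirroring ALERT_THRESHOLDS from the starter code;
# a dict gives O(1) lookups instead of scanning a list of tuples.
THRESHOLDS = {"equity": 1_000_000, "fixed_income": 5_000_000, "derivative": 2_000_000}


def detect_anomalies_windowed(trades, window_minutes=30, threshold_multiplier=3.0):
    """Single pass over time-ordered trades with one sliding window per
    (trader_id, asset_class). Emits at most one alert per incoming trade that
    leaves its window over the limit, instead of one per ordered pair."""
    window_seconds = window_minutes * 60
    windows = defaultdict(deque)  # key -> trades currently inside the window
    totals = defaultdict(float)   # key -> running notional sum of that window
    anomalies = []
    for trade in sorted(trades, key=lambda t: t["timestamp"]):
        limit = THRESHOLDS.get(trade["asset_class"])
        if limit is None:
            continue  # unknown asset class: skipped here (assumption)
        key = (trade["trader_id"], trade["asset_class"])
        window = windows[key]
        # Evict trades more than window_seconds older than the current trade.
        while window and trade["timestamp"] - window[0]["timestamp"] > window_seconds:
            totals[key] -= window.popleft()["notional"]
        window.append(trade)
        totals[key] += trade["notional"]
        # Require at least two trades, as the pairwise original did.
        if len(window) > 1 and totals[key] > limit * threshold_multiplier:
            anomalies.append({
                "trader_id": trade["trader_id"],
                "trades": [t["trade_id"] for t in window],
                "combined_notional": totals[key],
                "window_minutes": window_minutes,
            })
    return anomalies
```

    Sorting costs O(n log n). Apart from building alert payloads, the rest of the pass is linear, because each trade enters and leaves a window once. This removes the duplicated pair alerts. Whether it lowers the false-positive rate further depends on how well the thresholds are calibrated, which is a separate question.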

    You have been given the current pipeline code (trade_surveillance_pipeline.py). Read it carefully — it may contain issues beyond the three primary audit findings.

    Your task is to refactor this code to address the audit findings and improve the overall quality of the pipeline. You do not need to fix everything — focus on the issues that matter most and explain your reasoning for what you prioritized.


    Starter Code: trade_surveillance_pipeline.py

    Copy this file into your working environment. This is the code you are refactoring.

    """
    trade_surveillance_pipeline.py
    Global Supervisory and Surveillance — Alert Pipeline
    
    This module processes trade data from multiple trading desks,
    detects anomalous trading patterns, and generates compliance alerts.
    """
    
    import copy
    import time
    import json
    from threading import Thread
    
    
    class TradeProcessor:
        """Base processor for trade data."""
    
        def process(self, trades):
            raise NotImplementedError
    
        def validate(self, trade):
            raise NotImplementedError
    
    
    class EquityProcessor(TradeProcessor):
        """Processes equity trades."""
    
        def process(self, trades):
            results = []
            for trade in trades:
                if self.validate(trade):
                    results.append(self._analyze(trade))
            return results
    
        def validate(self, trade):
            return trade.get("asset_class") == "equity"
    
        def _analyze(self, trade):
            return {"trade_id": trade["trade_id"], "status": "reviewed", "flags": []}
    
    
    class SurveillanceProcessor(EquityProcessor):
        """Extends EquityProcessor for surveillance-specific logic.
        Handles validation for trades requiring surveillance review."""
    
        def validate(self, trade):
            return trade.get("surveillance_required", False)
    
    
    # Alert thresholds by asset class
    ALERT_THRESHOLDS = [
        ("equity", 1000000),
        ("fixed_income", 5000000),
        ("derivative", 2000000),
    ]
    
    
    def get_threshold(asset_class):
        """Returns the alert threshold for a given asset class."""
        for ac, threshold in ALERT_THRESHOLDS:
            if ac == asset_class:
                return threshold
        return None
    
    
    def create_alert(trade, flags, metadata={}):
        """Creates an alert record from a trade and its flags."""
        alert = copy.copy(trade)
        alert["flags"] = flags
        alert["metadata"] = metadata
        metadata["processed_at"] = time.time()
        return alert
    
    
    def load_trades(desk_feeds):
        """Loads trade data from all desk feeds."""
        all_trades = []
        for feed in desk_feeds:
            trades = json.loads(feed)
            all_trades.extend(trades)
        return all_trades
    
    
    def fetch_trade_feed(desk_id, max_retries=3):
        """Fetches trade data from a desk feed with retry logic."""
        retries = 0
        while retries < max_retries:
            try:
                start = time.time()
                result = _api_call(desk_id)
                elapsed = time.time() - start
                print(f"[LOG] fetch_trade_feed({desk_id}) took {elapsed:.2f}s")
                return result
            except Exception as e:
                retries += 1
                print(f"[LOG] fetch_trade_feed({desk_id}) retry {retries}: {e}")
                time.sleep(1)
        print(f"[LOG] fetch_trade_feed({desk_id}) FAILED after {max_retries} retries")
        return None
    
    
    def transform_trade(trade, normalize_notional, convert_currency, add_timestamps):
        """Applies transformations to a trade record."""
        if normalize_notional:
            trade["notional"] = float(trade.get("notional", 0))
        if convert_currency:
            trade["notional_usd"] = trade["notional"] * get_fx_rate(
                trade.get("currency", "USD"))
        if add_timestamps:
            trade["processed_at"] = time.time()
        return trade
    
    
    def process_all_desks(desk_ids):
        """Processes trades from multiple desks concurrently."""
        results = []
        threads = []
    
        for desk_id in desk_ids:
            t = Thread(
                target=lambda d: results.append(analyze_desk(d)),
                args=(desk_id,)
            )
            threads.append(t)
            t.start()
    
        for t in threads:
            t.join()
    
        return results
    
    
    def detect_anomalies(trades, window_minutes=30, threshold_multiplier=3.0):
        """Detects anomalous trading patterns within a time window.
        Flags trader activity where combined notional exceeds threshold."""
        anomalies = []
        for i in range(len(trades)):
            for j in range(len(trades)):
                if i != j:
                    time_diff = abs(
                        trades[i]["timestamp"] - trades[j]["timestamp"]
                    )
                    if time_diff <= window_minutes * 60:
                        if trades[i]["trader_id"] == trades[j]["trader_id"]:
                            combined = (
                                trades[i]["notional"] + trades[j]["notional"]
                            )
                            avg = get_threshold(trades[i]["asset_class"])
                            if avg and combined > avg * threshold_multiplier:
                                anomalies.append({
                                    "trader_id": trades[i]["trader_id"],
                                    "trades": [
                                        trades[i]["trade_id"],
                                        trades[j]["trade_id"],
                                    ],
                                    "combined_notional": combined,
                                    "window_minutes": window_minutes,
                                })
        return anomalies
    
    
    def _api_call(desk_id):
        """Stub — simulates API response from a desk feed."""
        return json.dumps([{
            "trade_id": f"T{desk_id}001",
            "asset_class": "equity",
            "notional": 1500000,
            "currency": "USD",
            "trader_id": "TR001",
            "timestamp": time.time(),
            "surveillance_required": True,
        }])
    
    
    def get_fx_rate(currency):
        """Returns exchange rate to USD."""
        rates = {"USD": 1.0, "EUR": 1.08, "GBP": 1.27, "JPY": 0.0067}
        return rates.get(currency, 1.0)
    
    
    def analyze_desk(desk_id):
        """Analyzes trades for a single desk."""
        feed = fetch_trade_feed(desk_id)
        if feed:
            trades = load_trades([feed])
            return detect_anomalies(trades)
        return []
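    For the memory finding, one option is to move the work of load_trades into a generator while keeping its public signature. Callers that can stream then consume the generator directly. This sketch rests on two assumptions. First, each desk feed remains a JSON array string, so each feed is still decoded whole, because the upstream format cannot change. Second, a missing feed should be skipped rather than crash json.loads; fetch_trade_feed returns None after its retries. The name iter_trades is hypothetical.

```python
import json
from collections.abc import Iterable, Iterator


def iter_trades(desk_feeds: Iterable[str]) -> Iterator[dict]:
    """Yields trades one feed at a time instead of concatenating every desk."""
    for feed in desk_feeds:
        if not feed:
            continue  # None (failed fetch) or empty payload: skip (assumption)
        # json.loads still decodes this whole feed, but only this one.
        yield from json.loads(feed)


def load_trades(desk_feeds):
    """Unchanged public signature: still returns a list for existing callers."""
    return list(iter_trades(desk_feeds))
```

    Suppose a caller passes a lazy iterable, such as a generator expression over fetch_trade_feed(desk_id) calls, and processes trades without accumulating them. That caller then holds roughly one raw feed and one decoded feed at a time.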
    

    Constraints

    These constraints reflect the real operating environment. Honor them in your solution.

    • Python standard library + common packages only. The production environment supports Python 3.10+, its standard library (e.g., multiprocessing, concurrent.futures, collections, typing, abc), and common packages. You cannot introduce new infrastructure (no Kafka, no Redis, no Celery, no Spark). Your solution must work within the existing Python runtime.
    • Do not rewrite from scratch. This is an existing codebase on a monthly release cycle. You are joining a team, not replacing one. Refactor the existing code — preserve the module's API surface (function signatures that other services call) while improving internals. Breaking API contracts would require a cross-team migration that is out of scope for a single release.
    • Upstream data feeds are a black box. You cannot modify the format, frequency, or schema of trade data feeds from trading desks. The _api_call stub represents an external service you do not control. Your solution must work with the data as it arrives.
    • Production ownership. Whatever you propose, your team owns in production. The on-call engineer will be paged when it fails at 2 AM. Design accordingly — your solution should be debuggable, observable, and recoverable.
    
    ---
    
    

    What You'll Accomplish

    Identify and prioritize high-impact performance, memory, and code quality issues in a production Python pipeline

    Refactor concurrent desk processing for improved throughput without introducing new infrastructure

    Redesign anomaly detection logic to reduce false positives and improve computational efficiency

    Write a production incident runbook for a latency-related alert pipeline failure

    Demonstrate critical evaluation and iterative use of AI coding tools in a compliance context

    How Your Work Will Be Scored

    • Resume & Background: 50%
    • Python Engineering & Code Quality: 20%
    • AI Fluency: 15%
    • Quantitative Analysis & Domain Application: 10%
    • Communication & Technical Reasoning: 5%

    What to Submit

    No submission guidelines provided.
