Chapter 18 Project: Generic Priority Queue


Overview

Generic APIs let us describe capabilities at compile time; priority queues are where those capabilities meet the realities of time-sensitive scheduling. In this project, we wrap std.PriorityQueue with rich comparators and context-aware policies that can be tested and tuned without sacrificing zero-cost abstractions. See 17 and priority_queue.zig.

We’ll build three artefacts: a foundational dispatcher that encodes ordering rules in a comparator, a fairness simulator that reuses the same queue while changing policy context, and an analytics wrapper that tracks the top offenders in a stream. Along the way, we revisit allocator choices, weighing strategies for draining, retuning, and introspecting heaps. See 10 and sort.zig.

Learning Goals

  • Translate business rules into compile-time comparator contracts that drive std.PriorityQueue ordering.
  • Model dynamic scheduling heuristics using the queue’s Context parameter while keeping memory churn predictable. 10
  • Derive streaming analytics (top-K, rolling statistics) from the same heap without copy-pasting logic or sacrificing stability. 47

Architecting a reusable queue core

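std.PriorityQueue takes three comptime parameters: the element type, a context type, and a comparator of the form fn (Context, T, T) std.math.Order. That single function decides which element rises to the front. Whichever element compares as .lt is removed first, so the queue is a min-heap with respect to the comparator. We'll treat the comparator as a contract backed by tests.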
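Here is a minimal sketch of that contract. It assumes plain u32 elements and uses the hypothetical comparator names minFirst and maxFirst. With std.math.order, the smallest value is removed first. Calling invert() on the resulting std.math.Order turns the same queue into a largest-first (max) heap, with no other change.

```zig
const std = @import("std");
const Order = std.math.Order;

// Returning .lt means `a` is removed before `b`, so the smallest value pops first.
fn minFirst(_: void, a: u32, b: u32) Order {
    return std.math.order(a, b);
}

// invert() swaps .lt and .gt, so the largest value pops first.
fn maxFirst(_: void, a: u32, b: u32) Order {
    return std.math.order(a, b).invert();
}

// Same element and context types; only the comparator differs.
const MinQueue = std.PriorityQueue(u32, void, minFirst);
const MaxQueue = std.PriorityQueue(u32, void, maxFirst);
```

Because the comparator is a comptime parameter, MinQueue and MaxQueue are distinct types even though they store the same elements.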

Comparator design as API surface

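Our first example is a simple build-and-release dispatcher. Urgency is the primary key. Submission time breaks ties, so tasks with the same urgency are dispatched oldest first and the order is deterministic. The comparator is a pure function passed to std.PriorityQueue as a comptime parameter, which makes it part of the queue type when that type is instantiated. It still runs at runtime, every time the heap compares two elements, and it is expressive enough to capture nuanced ordering logic. See math.zig.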

Zig
/// Demo: Using std.PriorityQueue to dispatch tasks by priority.
/// Lower urgency values mean higher priority; ties are broken by earlier submission time.
/// This example prints the order in which tasks would be processed.
///
/// Notes:
/// - The comparator returns `.lt` when `a` should be dispatched before `b`.
/// - We also order by `submitted_at_ms` to ensure deterministic order among equal urgencies.
const std = @import("std");
const Order = std.math.Order;

/// A single work item to schedule.
const Task = struct {
    /// Display name for the task.
    name: []const u8,
    /// Priority indicator: lower value = more urgent.
    urgency: u8,
    /// Monotonic timestamp in milliseconds used to break ties (earlier wins).
    submitted_at_ms: u64,
};

/// Comparator for the priority queue:
/// - Primary key: urgency (lower is dispatched first)
/// - Secondary key: submitted_at_ms (earlier is dispatched first)
fn taskOrder(_: void, a: Task, b: Task) Order {
    // Compare by urgency first.
    if (a.urgency < b.urgency) return .lt;
    if (a.urgency > b.urgency) return .gt;

    // Tie-breaker: earlier submission is higher priority.
    return std.math.order(a.submitted_at_ms, b.submitted_at_ms);
}

/// Program entry: builds a priority queue and prints dispatch order.
pub fn main() !void {
    // Use the General Purpose Allocator (GPA) for simplicity in examples.
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // Instantiate a priority queue of Task:
    // - Context type is `void` (no extra state needed by the comparator)
    // - `taskOrder` defines the ordering.
    var queue = std.PriorityQueue(Task, void, taskOrder).init(allocator, {});
    defer queue.deinit();

    // Enqueue tasks with varying urgency and submission times.
    // Expectation (by our ordering): lower urgency processed first;
    // within same urgency, earlier submitted_at_ms processed first.
    try queue.add(.{ .name = "compile pointer.zig", .urgency = 0, .submitted_at_ms = 1 });
    try queue.add(.{ .name = "run tests", .urgency = 1, .submitted_at_ms = 2 });
    try queue.add(.{ .name = "deploy preview", .urgency = 2, .submitted_at_ms = 3 });
    try queue.add(.{ .name = "prepare changelog", .urgency = 1, .submitted_at_ms = 4 });

    std.debug.print("Dispatch order:\n", .{});

    // Remove tasks in priority order until the queue is empty.
    // removeOrNull() yields the next Task or null when empty.
    while (queue.removeOrNull()) |task| {
        std.debug.print("  - {s} (urgency {d})\n", .{ task.name, task.urgency });
    }
}
Run
Shell
$ zig run task_queue_basics.zig
Output
Shell
Dispatch order:
  - compile pointer.zig (urgency 0)
  - run tests (urgency 1)
  - prepare changelog (urgency 1)
  - deploy preview (urgency 2)

Because the comparator returns std.math.Order, we can layer in secondary keys without changing the element or context types. The queue API stays the same, and the heap simply obeys the contract you encode.
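Because taskOrder is an ordinary function, you can pin down its contract with unit tests before building any queue. The sketch below repeats the Task type and comparator from the listing so the file compiles on its own. The test names and sample tasks are illustrative.

```zig
const std = @import("std");
const Order = std.math.Order;

// Same shape as the dispatcher's Task and taskOrder above.
const Task = struct {
    name: []const u8,
    urgency: u8,
    submitted_at_ms: u64,
};

fn taskOrder(_: void, a: Task, b: Task) Order {
    if (a.urgency < b.urgency) return .lt;
    if (a.urgency > b.urgency) return .gt;
    return std.math.order(a.submitted_at_ms, b.submitted_at_ms);
}

test "urgency dominates submission time" {
    const urgent_late: Task = .{ .name = "a", .urgency = 0, .submitted_at_ms = 99 };
    const relaxed_early: Task = .{ .name = "b", .urgency = 3, .submitted_at_ms = 1 };
    try std.testing.expectEqual(Order.lt, taskOrder({}, urgent_late, relaxed_early));
}

test "equal urgency falls back to earlier submission" {
    const first: Task = .{ .name = "a", .urgency = 1, .submitted_at_ms = 2 };
    const second: Task = .{ .name = "b", .urgency = 1, .submitted_at_ms = 4 };
    try std.testing.expectEqual(Order.lt, taskOrder({}, first, second));
    try std.testing.expectEqual(Order.gt, taskOrder({}, second, first));
}
```

Run it with zig test. The comparator is exercised directly, so no allocator or queue is involved.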

Growth and allocation strategy

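Every call to add may reallocate when the backing buffer is full. On hot paths, reserve capacity up front with ensureUnusedCapacity. Alternatively, build the queue from an allocator-owned buffer with fromOwnedSlice, which heapifies the elements in place. Removing items never shrinks the buffer, so a drained queue can be refilled up to its previous size without new allocations. deinit frees that buffer with the allocator passed to init, so that allocator must outlive the queue. This mirrors the memory hygiene practices from our allocator deep dive. 10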
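Here is a minimal sketch of both strategies for a u32 min-queue. reserveThenFill and adoptBuffer are hypothetical helper names. In both cases the returned queue must eventually be released with deinit.

```zig
const std = @import("std");

fn minFirst(_: void, a: u32, b: u32) std.math.Order {
    return std.math.order(a, b);
}

const Queue = std.PriorityQueue(u32, void, minFirst);

/// Reserve once, then add; each add stays within the reserved capacity.
fn reserveThenFill(allocator: std.mem.Allocator, values: []const u32) !Queue {
    var queue = Queue.init(allocator, {});
    errdefer queue.deinit();
    try queue.ensureUnusedCapacity(values.len);
    for (values) |v| try queue.add(v);
    return queue;
}

/// Hand an allocator-owned buffer to the queue, which heapifies it in place.
fn adoptBuffer(allocator: std.mem.Allocator, values: []const u32) !Queue {
    const buffer = try allocator.dupe(u32, values);
    // The queue now owns `buffer`; deinit frees it with the same allocator.
    return Queue.fromOwnedSlice(allocator, buffer, {});
}
```

fromOwnedSlice avoids a sequence of individual inserts when the elements already exist in memory. The trade-off is that the buffer must come from the allocator you pass alongside it.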

Policy-driven reprioritization

Next, we feed richer data through the same generic queue: service requests with SLAs, a current-time context, and VIP hints. std.PriorityQueue itself stays agnostic, and all nuance lives in the policy struct (the context) and the comparator. This design keeps the heap reusable even as we layer on fairness rules. 17

Aging and VIP weighting

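The comparator computes a scalar “score” for each request in four steps:

  • Start from the slack, the time remaining until the deadline.
  • Multiply negative slack by the overdue multiplier to escalate late requests.
  • Add the work estimate, so longer jobs sort slightly later.
  • Subtract a VIP bonus.

Lower scores pop first. The Context type (Policy) is fixed when the queue type is instantiated. The policy value itself is ordinary runtime data: the queue stores it and hands it to the comparator on every comparison. To try different weights, construct a new queue with a different Policy. We factor the scoring into small helper functions (slack and weightedScore) to keep the comparator readable and testable.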
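Here is what weightedScore computes, written as a formula. The symbol names are ours, not the code's:

  • d = submitted_at_ms + sla_ms
  • t = now_ms
  • m = overdue_multiplier
  • w = work_estimate_ms
  • b = vip_boost
  • v = 1 for VIP requests and 0 otherwise

The two worked lines reproduce entries in the program output below.

```latex
s = d - t, \qquad
s' = \begin{cases} s, & s \ge 0 \\ m\,s, & s < 0 \end{cases}, \qquad
\mathrm{score} = s' + w - v\,b

% INC-993 at t = 350 (mid-shift: m = 2, b = 250, VIP so v = 1)
d = 120 + 400 = 520, \quad s = s' = 170, \quad \mathrm{score} = 170 + 60 - 250 = -20

% INC-511 at t = 520 (escalation: m = 4, b = 100, v = 0)
d = 200 + 200 = 400, \quad s = -120, \quad s' = -480, \quad \mathrm{score} = -480 + 45 = -435
```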

Simulating operating modes

We run two scenarios: mid-shift triage and late escalation. The only difference is the policy struct we pass to init; everything else (tasks, queue type) stays the same. The printed order shows how overdue multiplication and VIP boosts change the pop sequence.

Zig
const std = @import("std");
const Order = std.math.Order;

/// Represents an incoming support request with SLA constraints.
const Request = struct {
    ticket: []const u8,
    submitted_at_ms: u64,
    sla_ms: u32,
    work_estimate_ms: u32,
    vip: bool,
};

/// Scheduling policy parameters that influence prioritization.
const Policy = struct {
    now_ms: u64,             // Current time reference for slack calculation
    vip_boost: i64,          // Score reduction (boost) for VIP requests
    overdue_multiplier: i64, // Penalty multiplier for overdue requests
};

/// Computes the time slack for a request: positive means time remaining, negative means overdue.
/// Overdue requests are amplified by the policy's overdue_multiplier to increase urgency.
fn slack(policy: Policy, request: Request) i64 {
    // Calculate absolute deadline from submission time + SLA window
    const deadline = request.submitted_at_ms + request.sla_ms;
    
    // Compute slack as deadline - now; use i128 to prevent overflow on subtraction
    const slack_signed = @as(i64, @intCast(@as(i128, deadline) - @as(i128, policy.now_ms)));
    
    if (slack_signed >= 0) {
        // Positive slack: request is still within SLA
        return slack_signed;
    }
    
    // Negative slack: request is overdue; amplify urgency by multiplying
    return slack_signed * policy.overdue_multiplier;
}

/// Computes a weighted score for prioritization.
/// Lower scores = higher priority (processed first by min-heap).
fn weightedScore(policy: Policy, request: Request) i64 {
    // Start with slack: negative (overdue) or positive (time remaining)
    var score = slack(policy, request);
    
    // Add work estimate: longer tasks get slightly lower priority (higher score)
    score += @as(i64, @intCast(request.work_estimate_ms));
    
    // VIP boost: reduce score to increase priority
    if (request.vip) score -= policy.vip_boost;
    
    return score;
}

/// Comparison function for the priority queue.
/// Returns Order.lt if 'a' should be processed before 'b' (lower score = higher priority).
fn requestOrder(policy: Policy, a: Request, b: Request) Order {
    const score_a = weightedScore(policy, a);
    const score_b = weightedScore(policy, b);
    return std.math.order(score_a, score_b);
}

/// Simulates a scheduling scenario by inserting all tasks into a priority queue,
/// then dequeuing and printing them in priority order.
fn simulateScenario(allocator: std.mem.Allocator, policy: Policy, label: []const u8) !void {
    // Define a set of incoming requests with varying SLA constraints and characteristics
    const tasks = [_]Request{
        .{ .ticket = "INC-482", .submitted_at_ms = 0, .sla_ms = 500, .work_estimate_ms = 120, .vip = false },
        .{ .ticket = "INC-993", .submitted_at_ms = 120, .sla_ms = 400, .work_estimate_ms = 60, .vip = true },
        .{ .ticket = "INC-511", .submitted_at_ms = 200, .sla_ms = 200, .work_estimate_ms = 45, .vip = false },
        .{ .ticket = "INC-742", .submitted_at_ms = 340, .sla_ms = 120, .work_estimate_ms = 30, .vip = false },
    };

    // Initialize priority queue with the given policy as context for comparison
    var queue = std.PriorityQueue(Request, Policy, requestOrder).init(allocator, policy);
    defer queue.deinit();

    // Add all tasks to the queue; they will be heap-ordered automatically
    try queue.addSlice(&tasks);

    // Print scenario header
    std.debug.print("{s} (now={d}ms)\n", .{ label, policy.now_ms });
    
    // Dequeue and print requests in priority order (lowest score first)
    while (queue.removeOrNull()) |request| {
        // Recalculate score and deadline for display
        const score = weightedScore(policy, request);
        const deadline = request.submitted_at_ms + request.sla_ms;
        
        std.debug.print(
            "  -> {s} score={d} deadline={d} vip={}\n",
            .{ request.ticket, score, deadline, request.vip },
        );
    }
    std.debug.print("\n", .{});
}

pub fn main() !void {
    // Set up general-purpose allocator with leak detection
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // Scenario 1: Mid-shift with moderate VIP boost and overdue penalty
    try simulateScenario(
        allocator,
        .{ .now_ms = 350, .vip_boost = 250, .overdue_multiplier = 2 },
        "Mid-shift triage",
    );

    // Scenario 2: Escalation window with reduced VIP boost but higher overdue penalty
    try simulateScenario(
        allocator,
        .{ .now_ms = 520, .vip_boost = 100, .overdue_multiplier = 4 },
        "Escalation window",
    );
}
Run
Shell
$ zig run sla_fairness.zig
Output
Shell
Mid-shift triage (now=350ms)
  -> INC-993 score=-20 deadline=520 vip=true
  -> INC-511 score=95 deadline=400 vip=false
  -> INC-742 score=140 deadline=460 vip=false
  -> INC-482 score=270 deadline=500 vip=false

Escalation window (now=520ms)
  -> INC-511 score=-435 deadline=400 vip=false
  -> INC-742 score=-210 deadline=460 vip=false
  -> INC-993 score=-40 deadline=520 vip=true
  -> INC-482 score=40 deadline=500 vip=false

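Changing the policy after items are enqueued requires rebuilding the heap, because the existing layout was ordered under the old weights. You have two options:

  • Drain the queue and reinsert the items into a queue built with the new policy.
  • Copy the items into an allocator-owned slice and pass it, with the new policy, to fromOwnedSlice, which re-heapifies under the updated context. 10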
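Here is a minimal sketch of the fromOwnedSlice route. Hypothetical, simplified Job and Weights types stand in for Request and Policy, chosen so that a weight change visibly reorders the heap. The sketch copies only the live items rather than reusing the queue's buffer. fromOwnedSlice treats the slice length as the capacity, and the old buffer may be larger than the item count.

```zig
const std = @import("std");
const Order = std.math.Order;

// Hypothetical, simplified stand-ins for Request and Policy.
const Job = struct { id: u32, deadline_ms: u64, vip: bool };
const Weights = struct { vip_boost: i64 };

fn jobScore(weights: Weights, job: Job) i64 {
    var s: i64 = @intCast(job.deadline_ms);
    if (job.vip) s -= weights.vip_boost;
    return s;
}

fn jobOrder(weights: Weights, a: Job, b: Job) Order {
    return std.math.order(jobScore(weights, a), jobScore(weights, b));
}

const JobQueue = std.PriorityQueue(Job, Weights, jobOrder);

/// Rebuild `old` under `new_weights`. On success `old` has been freed and
/// must not be used again; on error it is left untouched.
fn retune(allocator: std.mem.Allocator, old: *JobQueue, new_weights: Weights) !JobQueue {
    // Copy only the live items; old.items is exactly the enqueued elements.
    const buffer = try allocator.dupe(Job, old.items);
    old.deinit();
    // fromOwnedSlice takes ownership of `buffer` and re-heapifies it in place.
    return JobQueue.fromOwnedSlice(allocator, buffer, new_weights);
}
```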

Analytics and top-K reporting

Priority queues also make excellent rolling aggregators. Keep the “worst” elements in the heap and evict the rest as soon as the heap grows past its limit. That maintains a top-K view of latency spikes with minimal overhead. Sorting a copy of the current heap then lets us render results directly for dashboards or logs. 47

A composable wrapper

TopK wraps std.PriorityQueue with a comparator that forms a min-heap on scores, so the root is always the lowest-scoring sample retained. Each add pushes the new sample. If the count then exceeds the limit, add calls remove to evict that root, so only the K highest scorers survive. The snapshotDescending helper copies the heap's items into a freshly allocated buffer and sorts the copy with std.sort.heap. The queue stays intact for further inserts. 17
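Here is one possible refinement, sketched for plain u32 scores rather than the generic TopK below. Once the heap is full, peek at the root first and skip values that would be evicted immediately. This saves a push and a pop for the common case of unremarkable samples. TopKU32 is a hypothetical name. The check uses `<=`, so a value tied with the current minimum is dropped.

```zig
const std = @import("std");

fn minFirst(_: void, a: u32, b: u32) std.math.Order {
    return std.math.order(a, b);
}

const Heap = std.PriorityQueue(u32, void, minFirst);

/// Hypothetical bounded top-K tracker over raw u32 scores.
const TopKU32 = struct {
    heap: Heap,
    limit: usize,

    fn init(allocator: std.mem.Allocator, limit: usize) TopKU32 {
        std.debug.assert(limit > 0); // peek().? below relies on a non-empty heap
        return .{ .heap = Heap.init(allocator, {}), .limit = limit };
    }

    fn deinit(self: *TopKU32) void {
        self.heap.deinit();
    }

    fn add(self: *TopKU32, value: u32) !void {
        if (self.heap.count() < self.limit) {
            try self.heap.add(value);
            return;
        }
        // Heap is full: the root is the smallest score currently retained.
        const smallest = self.heap.peek().?;
        if (value <= smallest) return; // would be evicted straight away
        _ = self.heap.remove();
        // remove() freed a slot, so this add reuses existing capacity.
        try self.heap.add(value);
    }
};
```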

Zig
// Import the Zig standard library for allocator, sorting, debugging, etc.
const std = @import("std");

const Order = std.math.Order;

// A single latency measurement for an endpoint.
// Fields:
//  - endpoint: UTF-8 byte slice identifying the endpoint.
//  - duration_ms: observed latency in milliseconds.
//  - payload_bytes: size of the request/response payload in bytes.
const LatencySample = struct {
    endpoint: []const u8,
    duration_ms: u32,
    payload_bytes: u32,
};

// Compute a score for a latency sample.
// Higher scores represent more severe (worse) samples. The formula favors
// larger durations and subtracts a small penalty per payload byte, so slow
// responses that are explained by large payloads rank slightly lower.
//
// Returns an f64 so scores can be compared with fractional penalties.
fn score(sample: LatencySample) f64 {
    // Zig never coerces runtime integers to floats implicitly; @floatFromInt does the conversion.
    // The penalty factor 0.005 was chosen empirically to be small.
    return @as(f64, @floatFromInt(sample.duration_ms)) - (@as(f64, @floatFromInt(sample.payload_bytes)) * 0.005);
}

// TopK is a compile-time generic function that returns a bounded,
// score-driven top-K tracker for items of type T.
//
// Parameters:
//  - T: the element type stored in the tracker.
//  - scoreFn: a compile-time function that maps T -> f64 used to rank elements.
fn TopK(comptime T: type, comptime scoreFn: fn (T) f64) type {
    const Error = error{InvalidLimit};

    // Comparator helpers used by the PriorityQueue and for sorting snapshots.
    const Comparators = struct {
        // Comparator used by the PriorityQueue. The first parameter is the
        // user-provided context (unused here), hence the underscore name.
        // Returns an Order (.lt/.eq/.gt) based on the score function.
        fn heap(_: void, a: T, b: T) Order {
            return std.math.order(scoreFn(a), scoreFn(b));
        }

        // Boolean comparator used by the heap sort to produce descending order.
        // Returns true when `a` should come before `b` (i.e., a has higher score).
        fn desc(_: void, a: T, b: T) bool {
            return scoreFn(a) > scoreFn(b);
        }
    };

    return struct {
        // A priority queue specialized for T using our heap comparator.
        const Heap = std.PriorityQueue(T, void, Comparators.heap);
        const Self = @This();

        heap: Heap,
        limit: usize,

        // Initialize a TopK tracker with the provided allocator and positive limit.
        // Returns Error.InvalidLimit when limit == 0.
        pub fn init(allocator: std.mem.Allocator, limit: usize) Error!Self {
            if (limit == 0) return Error.InvalidLimit;
            return .{ .heap = Heap.init(allocator, {}), .limit = limit };
        }

        // Deinitialize the underlying heap and free its resources.
        pub fn deinit(self: *Self) void {
            self.heap.deinit();
        }

        // Add a single value into the tracker. If adding causes the internal
        // count to exceed `limit`, the priority queue will evict the item it
        // considers lowest priority according to our comparator, keeping the
        // top-K scored items.
        pub fn add(self: *Self, value: T) !void {
            try self.heap.add(value);
            if (self.heap.count() > self.limit) {
                // Evict the lowest-priority element (as defined by Comparators.heap).
                _ = self.heap.remove();
            }
        }

        // Add multiple values from a slice into the tracker.
        // This simply forwards each element to `add`.
        pub fn addSlice(self: *Self, values: []const T) !void {
            for (values) |value| try self.add(value);
        }

        // Produce a snapshot of the current tracked items in descending score order.
        //
        // The snapshot allocates a new array via `allocator` and copies the
        // internal heap's item storage into it. The result is then sorted
        // descending (highest score first) using Comparators.desc.
        //
        // Caller is responsible for freeing the returned slice.
        pub fn snapshotDescending(self: *Self, allocator: std.mem.Allocator) ![]T {
            const count = self.heap.count();
            const out = try allocator.alloc(T, count);
            // Copy the underlying items buffer into the newly allocated array.
            // This creates an independent snapshot so we can sort without mutating the heap.
            @memcpy(out, self.heap.items[0..count]);
            // Sort in-place so the highest-scored items appear first.
            std.sort.heap(T, out, @as(void, {}), Comparators.desc);
            return out;
        }
    };
}

// Example program demonstrating TopK usage with LatencySample.
pub fn main() !void {
    // Create a general-purpose allocator for example allocations.
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // Track the top 5 latency samples by computed score.
    var tracker = try TopK(LatencySample, score).init(allocator, 5);
    defer tracker.deinit();

    // Example samples. These are small, stack-allocated literal records.
    const samples = [_]LatencySample{
        .{ .endpoint = "/v1/users", .duration_ms = 122, .payload_bytes = 850 },
        .{ .endpoint = "/v1/orders", .duration_ms = 210, .payload_bytes = 1200 },
        .{ .endpoint = "/v1/users", .duration_ms = 188, .payload_bytes = 640 },
        .{ .endpoint = "/v1/payments", .duration_ms = 305, .payload_bytes = 1500 },
        .{ .endpoint = "/v1/orders", .duration_ms = 154, .payload_bytes = 700 },
        .{ .endpoint = "/v1/ledger", .duration_ms = 420, .payload_bytes = 540 },
        .{ .endpoint = "/v1/users", .duration_ms = 275, .payload_bytes = 980 },
        .{ .endpoint = "/v1/health", .duration_ms = 34, .payload_bytes = 64 },
        .{ .endpoint = "/v1/ledger", .duration_ms = 362, .payload_bytes = 480 },
    };

    // Bulk-add the sample slice into the tracker.
    try tracker.addSlice(&samples);

    // Capture the current top-K samples in descending order and print them.
    const worst = try tracker.snapshotDescending(allocator);
    defer allocator.free(worst);

    std.debug.print("Top latency offenders (descending by score):\n", .{});
    for (worst, 0..) |sample, idx| {
        // Compute the score again for display purposes (identical to the ordering key).
        const computed_score = score(sample);
        std.debug.print(
            "  {d:>2}. {s: <12} latency={d}ms payload={d}B score={d:.2}\n",
            .{ idx + 1, sample.endpoint, sample.duration_ms, sample.payload_bytes, computed_score },
        );
    }
}
Run
Shell
$ zig run topk_latency.zig
Output
Shell
Top latency offenders (descending by score):
   1. /v1/ledger   latency=420ms payload=540B score=417.30
   2. /v1/ledger   latency=362ms payload=480B score=359.60
   3. /v1/payments latency=305ms payload=1500B score=297.50
   4. /v1/users    latency=275ms payload=980B score=270.10
   5. /v1/orders   latency=210ms payload=1200B score=204.00

Snapshotting copies the heap's items, so sorting never disturbs the heap and later inserts stay O(log K). Each snapshot costs one allocation, though. For high-volume telemetry jobs, pass a scratch or arena allocator to avoid fragmenting long-lived heaps. 10
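For reporting loops that run frequently, a variant that writes into caller-owned scratch memory avoids the per-snapshot allocation entirely. This sketch works over a plain u32 min-heap. snapshotInto is a hypothetical helper, and it assumes the scratch buffer holds at least count() items. std.sort.desc(u32) supplies the descending comparator.

```zig
const std = @import("std");

fn minFirst(_: void, a: u32, b: u32) std.math.Order {
    return std.math.order(a, b);
}

const Heap = std.PriorityQueue(u32, void, minFirst);

/// Hypothetical allocation-free snapshot: copies the heap's live items into
/// `scratch` and sorts the copy, highest first. The heap itself is untouched.
/// Assumes scratch.len >= queue.count().
fn snapshotInto(queue: *const Heap, scratch: []u32) []u32 {
    const out = scratch[0..queue.count()];
    @memcpy(out, queue.items);
    std.sort.heap(u32, out, {}, std.sort.desc(u32));
    return out;
}
```

The returned slice aliases `scratch`, so it is only valid until the caller reuses that buffer.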

From queues to module boundaries

We now have reusable queue wrappers that can live in their own module. The next chapter formalizes that step, showing how to surface the queue as a package-level module and expose policies through @import boundaries. 19

Notes & Caveats

  • Define comparators in a dedicated helper so they can be unit-tested independently and reused across queue instances. 13
  • Policy structs are value types stored as the queue's context. If you change a policy, including by assigning to queue.context directly, rebuild the heap or create a new queue. Otherwise, the existing heap layout no longer matches what the comparator reports.
  • Copying heap contents for reporting allocates memory; recycle buffers or use arenas when integrating with telemetry services to keep GC-less Zig code predictable. 10

Exercises

  • Extend the dispatcher to respect “batch size” hints by tallying cumulative runtime in the comparator; add a test that asserts fairness across mixed priorities. 13
  • Modify the SLA simulator to write audit entries using std.log and compare the output against expectations under multiple policies. log.zig
  • Teach the TopK wrapper to return both the snapshot and the aggregate average; consider how you would expose that through an asynchronous metrics hook. 47

Alternatives & Edge Cases

  • If you need stable ordering for items with identical scores, wrap the payload in a struct that stores a monotonically increasing sequence number and include it in the comparator.
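  • For extremely large queues, consider bucketing items or using a different structure such as a pairing heap. std.PriorityQueue is an array-backed binary heap, and sifting through a million-item heap jumps between distant array slots, which can incur cache misses.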
  • When exposing queue factories across module boundaries, document allocator ownership and provide explicit destroy helpers to prevent leaks when callers change policies at runtime. 19
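Here is a sketch of the sequence-number approach from the first bullet. Entry, StableQueue, push, and pop are hypothetical names. Every entry gets a unique seq, so the comparator defines a total order and entries with equal scores leave the queue in insertion order.

```zig
const std = @import("std");
const Order = std.math.Order;

/// Hypothetical wrapper: the payload plus a monotonically increasing sequence number.
const Entry = struct {
    score: u32,
    label: []const u8,
    seq: u64,
};

fn stableOrder(_: void, a: Entry, b: Entry) Order {
    const by_score = std.math.order(a.score, b.score);
    if (by_score != .eq) return by_score;
    // Equal scores: whichever entry was pushed first is removed first.
    return std.math.order(a.seq, b.seq);
}

const Heap = std.PriorityQueue(Entry, void, stableOrder);

const StableQueue = struct {
    heap: Heap,
    next_seq: u64 = 0,

    fn init(allocator: std.mem.Allocator) StableQueue {
        return .{ .heap = Heap.init(allocator, {}) };
    }

    fn deinit(self: *StableQueue) void {
        self.heap.deinit();
    }

    fn push(self: *StableQueue, score: u32, label: []const u8) !void {
        try self.heap.add(.{ .score = score, .label = label, .seq = self.next_seq });
        self.next_seq += 1;
    }

    fn pop(self: *StableQueue) ?Entry {
        return self.heap.removeOrNull();
    }
};
```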
