Concurrent Queue and Semaphore Pattern

Advanced · 18 min read

The Image Upload That Crashed Chrome

A developer built a photo gallery upload feature. Users can drop 200 photos at once:

async function uploadAll(files) {
  await Promise.all(files.map(file => uploadFile(file)));
}

This starts all 200 uploads at once. Memory spikes to 2GB, and Chrome runs out of memory and crashes the tab. Even if the tab survived, starting everything together would gain nothing: over HTTP/1.1 the browser keeps only about 6 connections open per origin and queues the rest, and many servers throttle or reject a client that sends too many concurrent requests.

The fix: limit concurrency. Upload 5 files at a time, not 200. This is the fundamental problem concurrent queues and semaphores solve.

Why You Need Concurrency Control

Mental Model

Think of concurrency control like a theme park ride. The ride fits 5 people. A hundred people are in line. You don't let all 100 rush the ride at once. You let 5 in, and each time one person gets off, the next person in line gets on. A semaphore is the ride operator counting people in and out.

You need concurrency limits when:

  • Rate limits: APIs cap requests per time window (for example, 100 requests per second)
  • Memory: Each in-flight request holds data in memory (image uploads, large payloads)
  • Browser limits: Browsers cap concurrent HTTP connections per domain (~6 for HTTP/1.1)
  • Server health: Too many concurrent requests from one client can degrade service for everyone
  • User experience: Processing too many things at once starves the main thread

The Semaphore Pattern

A semaphore is a counter that controls access to a shared resource. It has two operations: acquire (decrement the counter, wait if zero) and release (increment the counter, wake a waiter).

class Semaphore {
  #permits;
  #queue = [];

  constructor(permits) {
    this.#permits = permits;
  }

  acquire() {
    if (this.#permits > 0) {
      this.#permits--;
      return Promise.resolve();
    }

    const { promise, resolve } = Promise.withResolvers();
    this.#queue.push(resolve);
    return promise;
  }

  release() {
    const next = this.#queue.shift();
    if (next) {
      next();
    } else {
      this.#permits++;
    }
  }
}

Usage:

const semaphore = new Semaphore(3);

async function limitedFetch(url) {
  await semaphore.acquire();
  try {
    return await fetch(url);
  } finally {
    semaphore.release();
  }
}

await Promise.all(urls.map(url => limitedFetch(url)));

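With 100 URLs and a semaphore of 3, at most 3 fetches are in flight at any time. When one settles, the next waiting caller proceeds. Note that fetch resolves as soon as the response headers arrive, so if downloading the body should also count against the limit, read the body (for example with response.json()) inside the try block before releasing.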
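The acquire/try/finally shape above is easy to get wrong by hand, so one option is to wrap it in a helper. The sketch below is an illustration under stated assumptions, not part of the original class: `withPermit` and `demo` are hypothetical names, and the semaphore is rewritten with `new Promise` so it also runs where `Promise.withResolvers` is unavailable. The uploads are simulated with timers.

```javascript
// Minimal semaphore, equivalent in behavior to the class above.
class Semaphore {
  #permits;
  #waiters = [];

  constructor(permits) {
    this.#permits = permits;
  }

  acquire() {
    if (this.#permits > 0) {
      this.#permits--;
      return Promise.resolve();
    }
    return new Promise(resolve => this.#waiters.push(resolve));
  }

  release() {
    const next = this.#waiters.shift();
    if (next) next();       // hand the permit straight to the next waiter
    else this.#permits++;   // nobody waiting: return the permit to the pool
  }

  // Hypothetical helper: run `fn` while holding a permit and release it
  // whether `fn` resolves or throws.
  async withPermit(fn) {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}

// Simulated work that records how many tasks overlap.
async function demo() {
  const semaphore = new Semaphore(3);
  let active = 0;
  let peak = 0;

  const task = n => semaphore.withPermit(async () => {
    active++;
    peak = Math.max(peak, active);
    await new Promise(r => setTimeout(r, 5));
    active--;
    if (n % 4 === 0) throw new Error(`task ${n} failed`); // failures still release
    return n;
  });

  const settled = await Promise.allSettled(
    Array.from({ length: 12 }, (_, n) => task(n))
  );
  return { peak, settled };
}
```

Calling `demo()` should report a peak of 3 overlapping tasks, and all 12 tasks settle even though three of them throw, because every permit is returned in the `finally` block.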

Quiz
What happens if you forget to call release() after acquiring a semaphore?

Concurrent Queue With Max Parallelism

The semaphore pattern works, but a purpose-built concurrent queue is often more ergonomic:

class ConcurrentQueue {
  #concurrency;
  #running = 0;
  #queue = [];

  constructor(concurrency) {
    this.#concurrency = concurrency;
  }

  add(task) {
    return new Promise((resolve, reject) => {
      this.#queue.push({ task, resolve, reject });
      this.#run();
    });
  }

  #run() {
    while (this.#running < this.#concurrency && this.#queue.length > 0) {
      const { task, resolve, reject } = this.#queue.shift();
      this.#running++;

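      // Calling the task inside an async function turns a synchronous throw
      // (or a non-promise return value) into a promise, so the slot is always freed.
      (async () => task())()
        .then(resolve, reject)
        .finally(() => {
          this.#running--;
          this.#run();
        });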
    }
  }

  get pending() {
    return this.#queue.length;
  }

  get active() {
    return this.#running;
  }
}

Usage:

const queue = new ConcurrentQueue(5);

async function uploadPhotos(files) {
  const results = await Promise.all(
    files.map(file =>
      queue.add(() => uploadFile(file))
    )
  );
  return results;
}

The queue ensures at most 5 uploads run simultaneously. Each caller gets back a promise that resolves when their specific task completes. The queue handles scheduling internally.

Quiz
In the ConcurrentQueue implementation, why does the #run() method use a while loop instead of just starting one task?

Adding Progress Tracking

For file uploads and batch operations, users need progress feedback:

class TrackedQueue extends ConcurrentQueue {
  #total = 0;
  #completed = 0;
  #failed = 0;
  #onProgress;

  constructor(concurrency, onProgress) {
    super(concurrency);
    this.#onProgress = onProgress;
  }

  add(task) {
    this.#total++;
    this.#notifyProgress();

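    // Two-argument then(): an exception thrown by the success handler
    // (for example inside onProgress) is not also counted as a failed task.
    return super.add(task).then(
      result => {
        this.#completed++;
        this.#notifyProgress();
        return result;
      },
      err => {
        this.#failed++;
        this.#notifyProgress();
        throw err;
      }
    );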
  }

  #notifyProgress() {
    this.#onProgress?.({
      total: this.#total,
      completed: this.#completed,
      failed: this.#failed,
      active: this.active,
      pending: this.pending,
    });
  }
}

const queue = new TrackedQueue(3, (progress) => {
  progressBar.value = (progress.completed + progress.failed) / progress.total;
  statusText.textContent =
    `${progress.completed}/${progress.total} complete, ${progress.failed} failed`;
});

Priority Queues for Async Tasks

Not all tasks are equally urgent. A priority queue processes high-priority items first:

class PriorityQueue {
  #concurrency;
  #running = 0;
  #queues = { high: [], normal: [], low: [] };

  constructor(concurrency) {
    this.#concurrency = concurrency;
  }

  add(task, priority = 'normal') {
    return new Promise((resolve, reject) => {
      this.#queues[priority].push({ task, resolve, reject });
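      // Defer scheduling by one microtask so tasks added in the same tick are
      // all queued before the first one is picked. Otherwise a free slot would
      // go to whichever task was added first, regardless of priority.
      queueMicrotask(() => this.#run());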
    });
  }

  #nextTask() {
    for (const priority of ['high', 'normal', 'low']) {
      if (this.#queues[priority].length > 0) {
        return this.#queues[priority].shift();
      }
    }
    return null;
  }

  #run() {
    while (this.#running < this.#concurrency) {
      const item = this.#nextTask();
      if (!item) break;

      const { task, resolve, reject } = item;
      this.#running++;

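      // Calling the task inside an async function turns a synchronous throw
      // (or a non-promise return value) into a promise, so the slot is always freed.
      (async () => task())()
        .then(resolve, reject)
        .finally(() => {
          this.#running--;
          this.#run();
        });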
    }
  }
}

Usage:

const queue = new PriorityQueue(3);

queue.add(() => fetch('/api/analytics'), 'low');
queue.add(() => fetch('/api/user-data'), 'high');
queue.add(() => fetch('/api/recommendations'), 'normal');
queue.add(() => fetch('/api/notifications'), 'high');

Whenever more tasks are waiting than there are free slots, high-priority tasks such as user data and notifications are picked ahead of recommendations and analytics. The user sees critical data first.

Quiz
A priority queue has concurrency 2 with these tasks queued: [low: A], [high: B], [normal: C], [high: D]. Both concurrency slots are open. Which tasks start first?

Real-World: Image Upload Queue

Here's a complete upload queue with retry, progress, and cancellation:

function createUploadQueue(options = {}) {
  const {
    concurrency = 3,
    maxRetries = 2,
    onProgress,
    signal,
  } = options;

  const queue = new ConcurrentQueue(concurrency);
  let completed = 0;
  let failed = 0;
  let total = 0;

  function uploadFile(file) {
    total++;

    return queue.add(async () => {
      signal?.throwIfAborted();

      let lastError;
      for (let attempt = 0; attempt <= maxRetries; attempt++) {
        try {
          const formData = new FormData();
          formData.append('file', file);

          const response = await fetch('/api/upload', {
            method: 'POST',
            body: formData,
            signal,
          });

          if (!response.ok) throw new Error(`Upload failed: ${response.status}`);

          const result = await response.json();
          completed++;
          onProgress?.({ completed, failed, total, file: file.name });
          return result;
        } catch (err) {
          lastError = err;
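          // Never retry a cancelled upload. Checking signal.aborted also covers
          // abort(reason) calls whose reason is not an AbortError.
          if (signal?.aborted || err.name === 'AbortError') throw err;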
        }
      }

      failed++;
      onProgress?.({ completed, failed, total, file: file.name });
      throw lastError;
    });
  }

  return { uploadFile };
}

const controller = new AbortController();

const uploader = createUploadQueue({
  concurrency: 5,
  onProgress: ({ completed, total }) => {
    console.log(`${completed}/${total} uploaded`);
  },
  signal: controller.signal,
});

const results = await Promise.allSettled(
  files.map(file => uploader.uploadFile(file))
);

Real-World: API Batch Processing

When you need to process thousands of items through an API with rate limits:

async function batchProcess(items, processFn, options = {}) {
  const {
    concurrency = 10,
    batchSize = 50,
    delayBetweenBatches = 1000,
  } = options;

  const queue = new ConcurrentQueue(concurrency);
  const results = [];

  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);

    const batchResults = await Promise.allSettled(
      batch.map(item => queue.add(() => processFn(item)))
    );

    results.push(...batchResults);

    const hasMore = i + batchSize < items.length;
    if (hasMore && delayBetweenBatches > 0) {
      await new Promise(r => setTimeout(r, delayBetweenBatches));
    }
  }

  return results;
}

const results = await batchProcess(
  userIds,
  async (userId) => {
    const response = await fetch(`/api/users/${userId}/sync`, { method: 'POST' });
    return response.json();
  },
  { concurrency: 5, batchSize: 100, delayBetweenBatches: 2000 }
);

The batchSize and delayBetweenBatches parameters serve different purposes than concurrency. Concurrency limits how many requests are in flight at once, which protects browser and server resources. Batch size groups items for progress reporting and checkpointing. The delay between batches acts as a coarse global rate limit that keeps you under API quotas. A well-tuned system uses all three. In the call above, a concurrency of 5 bounds the in-flight requests, batches of 100 give natural progress checkpoints, and the 2-second delay caps throughput at 100 requests per 2 seconds, which is at most 3000 requests per minute (less in practice, because each batch also takes time to complete).
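The rate-limit arithmetic can be checked with a few lines of code. This is a back-of-the-envelope sketch: both function names are hypothetical, and the second function assumes every request takes the same `avgLatencyMs`, which real traffic will not.

```javascript
// Ceiling on throughput: one batch of `batchSize` requests per pause of
// `delayMs`, ignoring the time the batch itself takes to complete.
function maxRequestsPerMinute(batchSize, delayMs) {
  if (delayMs <= 0) return Infinity; // no pause means batching imposes no ceiling
  return (batchSize * 60_000) / delayMs;
}

// Rough steady-state estimate. ASSUMPTION: every request takes `avgLatencyMs`,
// so a batch finishes in ceil(batchSize / concurrency) "waves" of requests.
function estimateRequestsPerMinute({ batchSize, concurrency, delayMs, avgLatencyMs }) {
  const wavesPerBatch = Math.ceil(batchSize / concurrency);
  const cycleMs = wavesPerBatch * avgLatencyMs + delayMs; // one batch plus one pause
  return (batchSize * 60_000) / cycleMs;
}

console.log(maxRequestsPerMinute(100, 2000)); // 3000
console.log(estimateRequestsPerMinute({
  batchSize: 100,
  concurrency: 5,
  delayMs: 2000,
  avgLatencyMs: 200, // assumed latency, for illustration only
})); // 1000
```

With an assumed 200 ms per request, the settings from the example land near 1000 requests per minute, well under the 3000 ceiling. The gap is the time each batch spends waiting on its own requests.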

The map-with-concurrency Utility

For simple cases where you just need Promise.all with a concurrency limit, here's the minimal utility:

async function mapConcurrent(items, fn, concurrency) {
  const results = new Array(items.length);
  let index = 0;

  async function worker() {
    while (index < items.length) {
      const i = index++;
      results[i] = await fn(items[i], i);
    }
  }

  const workers = Array.from({ length: concurrency }, () => worker());
  await Promise.all(workers);
  return results;
}

const thumbnails = await mapConcurrent(
  images,
  (image) => generateThumbnail(image),
  4
);

This spawns N workers that each pull from a shared index. Results are placed at the correct indices, so the output order matches the input order — just like Promise.all.

Common Trap

There's a subtle error handling gap here: if one worker throws, Promise.all rejects with that error, but the other workers keep running and pulling new items from the shared index. They'll continue processing (and potentially failing) even after the caller has moved on. For production use, add an aborted flag that workers check before picking up the next item, so a single failure stops the whole batch cleanly.
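A minimal sketch of that fix follows, assuming the same shape as mapConcurrent. The name `mapConcurrentFailFast` is hypothetical. Note that the flag only stops workers from picking up new items. Calls to `fn` that are already in flight still run to completion unless `fn` itself supports cancellation.

```javascript
async function mapConcurrentFailFast(items, fn, concurrency) {
  const results = new Array(items.length);
  let index = 0;
  let aborted = false; // set once any call to fn rejects

  async function worker() {
    // Check the flag before claiming the next item.
    while (!aborted && index < items.length) {
      const i = index++;
      try {
        results[i] = await fn(items[i], i);
      } catch (err) {
        aborted = true; // tell the other workers to stop picking up items
        throw err;      // Promise.all below rejects with the first error
      }
    }
  }

  // No point spawning more workers than there are items.
  const workerCount = Math.min(concurrency, items.length);
  const workers = Array.from({ length: workerCount }, () => worker());
  await Promise.all(workers);
  return results;
}
```

On success it behaves like mapConcurrent and preserves input order. On failure the caller sees the first error, and at most `concurrency - 1` other items finish afterwards.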

What developers do vs. what they should do

  • What developers do: Promise.all(items.map(fn)) with unbounded concurrency
    What they should do: Use a concurrent queue or semaphore to limit in-flight operations
    Why: Unbounded concurrency exhausts memory (each in-flight request holds data), hits browser connection limits, and can overwhelm servers or trigger rate limiting

  • What developers do: Forgetting to release semaphore permits on error
    What they should do: Always release in a finally block
    Why: If an error skips the release call, that permit is permanently consumed. After enough errors, the semaphore deadlocks: no new tasks can ever start

  • What developers do: Fixed concurrency for all environments
    What they should do: Adapt concurrency based on context (mobile: lower, desktop: higher, API rate limit: match it)
    Why: Mobile devices have less memory and slower connections. A concurrency of 10 that works on desktop may crash mobile browsers. Read API rate limit headers and adjust dynamically

  • What developers do: Using priority queues without starvation prevention
    What they should do: Add aging or ensure low-priority tasks eventually run
    Why: If high-priority tasks keep arriving, low-priority tasks wait forever. Add a maximum wait time after which tasks get promoted

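The aging advice can be sketched as a replacement for the task-selection step of a priority queue. This is one possible policy, not the only one: `takeNext`, `enqueuedAt`, and `maxWaitMs` are hypothetical names, and each queued item is assumed to carry the timestamp at which it was added.

```javascript
const PRIORITIES = ['high', 'normal', 'low'];

// Removes and returns the next item to run, or null if nothing is waiting.
// `queues` maps each priority to a FIFO array of items with an `enqueuedAt`
// timestamp in milliseconds. `now` is passed in so the policy is testable.
function takeNext(queues, now, maxWaitMs) {
  // 1. Aging: among queue heads that have waited at least maxWaitMs,
  //    take the one that has waited longest, whatever its priority.
  let overduePriority = null;
  for (const priority of PRIORITIES) {
    const head = queues[priority][0]; // FIFO, so the head is the oldest item
    if (!head || now - head.enqueuedAt < maxWaitMs) continue;
    if (overduePriority === null ||
        head.enqueuedAt < queues[overduePriority][0].enqueuedAt) {
      overduePriority = priority;
    }
  }
  if (overduePriority !== null) return queues[overduePriority].shift();

  // 2. Otherwise fall back to strict priority order.
  for (const priority of PRIORITIES) {
    if (queues[priority].length > 0) return queues[priority].shift();
  }
  return null;
}

// Example: a low-priority item that has waited 1000 ms beats fresh high-priority work.
const queues = {
  high: [{ id: 'H1', enqueuedAt: 900 }, { id: 'H2', enqueuedAt: 950 }],
  normal: [],
  low: [{ id: 'L1', enqueuedAt: 0 }],
};
const order = [];
let item;
while ((item = takeNext(queues, 1000, 500)) !== null) order.push(item.id);
console.log(order); // [ 'L1', 'H1', 'H2' ]
```

Passing `Date.now()` as `now` at the call site keeps the selection logic pure, which makes the promotion rule straightforward to unit test.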
Interview Question

Design a task scheduler for a web-based IDE that runs ESLint, TypeScript checking, and Prettier simultaneously across multiple files. The constraints: no more than 4 concurrent operations (to avoid pegging the CPU), TypeScript checking should have higher priority than linting, and the user should see real-time progress. Describe the data structures and scheduling algorithm you'd use.