A Dart implementation of p-limit for controlling the concurrency of async operations.
- Concurrency Control - Limit the number of concurrent async operations
- Dynamic Adjustment - Change concurrency limits on the fly
- Queue Management - Track active and pending operations
- Multiple Queue Strategies - FIFO, LIFO, and Priority-based execution
- High Performance - Efficient queue implementations
- Error Handling - Proper error propagation and handling
- Easy to Use - Simple and intuitive API
- Type Safe - Full Dart type safety support
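To make the concurrency-control idea concrete, here is a minimal sketch of a limiter in plain Dart. It is not f_limit's implementation: `SimpleLimiter` and `run` are hypothetical names used only for illustration, and the sketch supports FIFO ordering only.

```dart
import 'dart:async';
import 'dart:collection';

/// Hypothetical minimal limiter, for illustration only (not f_limit's code).
class SimpleLimiter {
  SimpleLimiter(this.maxConcurrent);

  final int maxConcurrent;
  final Queue<void Function()> _pending = Queue<void Function()>();
  int _active = 0;

  int get activeCount => _active;
  int get pendingCount => _pending.length;

  /// Runs [task] once fewer than [maxConcurrent] tasks are active.
  Future<T> run<T>(Future<T> Function() task) {
    final completer = Completer<T>();

    Future<void> start() async {
      _active++;
      try {
        completer.complete(await task());
      } catch (error, stackTrace) {
        // Forward the failure to the caller's future.
        completer.completeError(error, stackTrace);
      } finally {
        _active--;
        // Hand the freed slot to the oldest waiting task (FIFO).
        if (_pending.isNotEmpty) _pending.removeFirst()();
      }
    }

    if (_active < maxConcurrent) {
      start();
    } else {
      _pending.add(start);
    }
    return completer.future;
  }
}

Future<void> main() async {
  final limiter = SimpleLimiter(2);
  var running = 0;
  var peak = 0;

  final results = await Future.wait(List.generate(5, (i) {
    return limiter.run(() async {
      running++;
      if (running > peak) peak = running;
      await Future<void>.delayed(const Duration(milliseconds: 50));
      running--;
      return i * 2;
    });
  }));

  // prints "results: [0, 2, 4, 6, 8], peak concurrency: 2"
  print('results: $results, peak concurrency: $peak');
}
```

The caller always gets a future back immediately; only the moment the task starts is delayed, which is why the results still line up with the order in which tasks were submitted.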
Add this to your pubspec.yaml:

```yaml
dependencies:
  f_limit: ^1.0.0
```

Then run:

```bash
dart pub get
```

Basic usage:

```dart
import 'package:f_limit/f_limit.dart';

void main() async {
  // Create a limiter that allows only 2 concurrent operations
  final limit = fLimit(2);

  // Create some async tasks
  final tasks = List.generate(5, (i) => () async {
        print('Task $i started');
        await Future.delayed(Duration(seconds: 1));
        print('Task $i completed');
        return 'Result $i';
      });

  // Execute all tasks with the concurrency limit
  final futures = tasks.map((task) => limit(task));
  final results = await Future.wait(futures);

  print('All tasks completed: $results');
}
```

Limiting concurrent API calls:

```dart
import 'package:f_limit/f_limit.dart';

Future<String> fetchData(String url) async {
  // Simulate API call
  await Future.delayed(Duration(milliseconds: 200));
  return 'Data from $url';
}

void main() async {
  // Limit API calls to 3 concurrent requests
  final limit = fLimit(3);

  final urls = [
    'https://api.example.com/users',
    'https://api.example.com/posts',
    'https://api.example.com/comments',
    'https://api.example.com/albums',
    'https://api.example.com/photos',
  ];

  // Execute API calls, at most 3 at a time
  final futures = urls.map((url) => limit(() => fetchData(url)));
  final results = await Future.wait(futures);

  print('API Results: $results');
}
```

Adjusting concurrency at runtime:

```dart
void main() async {
  // Start with limited concurrency
  final limit = fLimit(1);

  final futures = <Future<String>>[];
  for (int i = 0; i < 10; i++) {
    futures.add(limit(() async {
      print('Task $i (concurrency: ${limit.concurrency})');
      await Future.delayed(Duration(milliseconds: 100));
      return 'Task $i done';
    }));
  }

  // Increase concurrency after some time
  Future.delayed(Duration(milliseconds: 300), () {
    print('Increasing concurrency to 5');
    limit.concurrency = 5;
  });

  await Future.wait(futures);
  print('All tasks completed');
}
```

FIFO (the default):

```dart
final limit = fLimit(2, queueStrategy: QueueStrategy.fifo);
// Tasks execute in the order they were added
```

LIFO:

```dart
final limit = fLimit(2, queueStrategy: QueueStrategy.lifo);
// Tasks execute in reverse order (stack-like behavior)
```

Priority:

```dart
final limit = fLimit(2, queueStrategy: QueueStrategy.priority);

// Add tasks with different priorities
limit(() async {
  print('Background task');
}, priority: 1);

limit(() async {
  print('Critical task');
}, priority: 10);

limit(() async {
  print('Important task');
}, priority: 5);

// Execution order: Critical (10), Important (5), Background (1)
```

A complete priority example:

```dart
void main() async {
  final limit = fLimit(1, queueStrategy: QueueStrategy.priority);

  final futures = <Future<void>>[];

  // Low priority
  futures.add(limit(() async {
    print('Background maintenance');
  }, priority: 1));

  // Medium priority
  futures.add(limit(() async {
    print('User notification');
  }, priority: 5));

  // High priority
  futures.add(limit(() async {
    print('Critical security update');
  }, priority: 10));

  await Future.wait(futures);
  // Output order: Critical security update, User notification, Background maintenance
}
```

`fLimit(concurrency, {queueStrategy})` creates a concurrency limiter.
Parameters:
- `concurrency` - Maximum number of concurrent operations (≥ 1)
- `queueStrategy` - Queue execution strategy (optional, defaults to FIFO)

Returns: `FLimit` instance
Queue execution strategies:
| Strategy | Description | Use Case |
|---|---|---|
| `fifo` | First In, First Out | Fair task execution |
| `lifo` | Last In, First Out | Stack-like processing |
| `priority` | Priority-based | Important tasks first |
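The difference between the three strategies comes down to which queued task is picked when a slot frees up. The sketch below shows that selection rule in plain Dart. `Strategy`, `Job`, `takeNext`, and `drain` are hypothetical names for illustration, not part of f_limit, and the tie-breaking rule (earlier job wins on equal priority) is an assumption of this sketch.

```dart
/// Hypothetical stand-in for the package's strategy enum.
enum Strategy { fifo, lifo, priority }

class Job {
  const Job(this.name, {this.priority = 0});
  final String name;
  final int priority;
}

/// Removes and returns the job that should run next under [strategy].
Job takeNext(List<Job> queue, Strategy strategy) {
  switch (strategy) {
    case Strategy.fifo:
      return queue.removeAt(0); // oldest job first
    case Strategy.lifo:
      return queue.removeLast(); // newest job first
    case Strategy.priority:
      // Highest priority wins; on a tie the earlier job is kept.
      var best = 0;
      for (var i = 1; i < queue.length; i++) {
        if (queue[i].priority > queue[best].priority) best = i;
      }
      return queue.removeAt(best);
  }
}

/// Returns the names of [jobs] in the order [strategy] would run them.
List<String> drain(List<Job> jobs, Strategy strategy) {
  final queue = List<Job>.of(jobs);
  final order = <String>[];
  while (queue.isNotEmpty) {
    order.add(takeNext(queue, strategy).name);
  }
  return order;
}

void main() {
  const jobs = [
    Job('background', priority: 1),
    Job('critical', priority: 10),
    Job('important', priority: 5),
  ];
  for (final strategy in Strategy.values) {
    print('${strategy.name}: ${drain(jobs, strategy)}');
  }
  // fifo: [background, critical, important]
  // lifo: [important, critical, background]
  // priority: [critical, important, background]
}
```

The linear scan keeps the sketch short; a real priority queue would more likely use a heap so that picking the next task does not cost O(n).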
Properties:
- `activeCount` - Number of currently executing operations
- `pendingCount` - Number of queued operations
- `concurrency` - Current concurrency limit (get/set)
- `queueStrategy` - Current queue strategy

Methods:
- `call(function, {priority})` - Execute function with concurrency limit
- `isolate(computation, {priority})` - Execute computation in a separate isolate
- `map(items, mapper)` - Map items concurrently
- `onIdle` - Wait for idle state
- `clearQueue()` - Clear all pending operations
Creates a limited version of a function.
Parameters:
- `function` - The function to limit
- `options` - `LimitOptions` with concurrency and queue strategy
Returns: Limited function wrapper
The limiter properly handles errors in async operations:
```dart
final limit = fLimit(2);

final future1 = limit(() async {
  throw Exception('Something went wrong');
});

final future2 = limit(() async {
  return 'Success';
});

try {
  await future1; // This will throw
} catch (e) {
  print('Caught error: $e');
}

final result = await future2; // This will succeed
print('Result: $result');
```

You can run computationally heavy tasks in a separate isolate while respecting the concurrency limit using `isolate`:

```dart
final limit = fLimit(2);

// This will run in a separate isolate!
final result = await limit.isolate(() {
  // Heavy computation here
  int sum = 0;
  for (int i = 0; i < 1000000; i++) {
    sum += i;
  }
  return sum;
});

print('Result: $result');
```

Note: The function passed to `isolate` must be a static function, a top-level function, or a closure that is sendable (i.e., it doesn't capture any non-sendable objects).
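The safest way to satisfy the sendability requirement is to pass a top-level function, which captures nothing. The sketch below shows this with `Isolate.run` from `dart:isolate` (available since Dart 2.19) so that it runs without the package. Whether f_limit uses `Isolate.run` internally is an assumption; `sumBelowOneMillion` is a hypothetical helper.

```dart
import 'dart:isolate';

/// Top-level function: it captures no surrounding state, so it can always
/// be sent to another isolate.
int sumBelowOneMillion() {
  var sum = 0;
  for (var i = 0; i < 1000000; i++) {
    sum += i;
  }
  return sum;
}

Future<void> main() async {
  // Runs the computation in a fresh isolate and returns its result.
  final result = await Isolate.run(sumBelowOneMillion);
  print('Result: $result'); // prints "Result: 499999500000"
}
```

Given the `isolate(computation, {priority})` signature listed above, the same function should be usable as `limit.isolate(sumBelowOneMillion)`.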
Process items in an iterable concurrently:
```dart
final limit = fLimit(2);
final items = [1, 2, 3, 4, 5];

// Map items to results with concurrency limit
final results = await limit.map(items, (item) async {
  await Future.delayed(Duration(seconds: 1));
  return item * 2;
});
```

Wait for all tasks to complete:

```dart
await limit.onIdle;
print('All tasks finished and queue is empty');
```

Monitor queue status:

```dart
final limit = fLimit(3);

// Monitor queue status
print('Active: ${limit.activeCount}');
print('Pending: ${limit.pendingCount}');
print('Strategy: ${limit.queueStrategy}');

// Add tasks and monitor
for (int i = 0; i < 10; i++) {
  limit(() async {
    print('Active: ${limit.activeCount}, Pending: ${limit.pendingCount}');
    await Future.delayed(Duration(milliseconds: 100));
  });
}
```

| JavaScript | Dart | Description |
|---|---|---|
| `const limit = pLimit(2)` | `final limit = fLimit(2)` | Create limiter |
| `limit(() => asyncTask())` | `limit(() => asyncTask())` | Execute task |
| `limit.activeCount` | `limit.activeCount` | Active count |
| `limit.pendingCount` | `limit.pendingCount` | Pending count |
| `limit.clearQueue()` | `limit.clearQueue()` | Clear queue |
Named priority levels:

```dart
enum TaskPriority {
  low(1),
  medium(5),
  high(10);

  const TaskPriority(this.value);
  final int value;
}

Future<void> processFiles() async {
  final limit = fLimit(3, queueStrategy: QueueStrategy.priority);

  // Critical system files
  limit(() => processFile('system.log'), priority: TaskPriority.high.value);

  // User documents
  limit(() => processFile('document.pdf'), priority: TaskPriority.medium.value);

  // Cache files
  limit(() => processFile('cache.tmp'), priority: TaskPriority.low.value);
}
```

Batch processing:

```dart
Future<void> batchProcess(List<String> items) async {
  final limit = fLimit(5);

  await Future.wait(
    items.map((item) => limit(() => processItem(item))),
  );

  print('Batch processing completed!');
}
```

We welcome contributions!
This project is licensed under the MIT License - see the LICENSE file for details.
- Inspired by p-limit by Sindre Sorhus
- Part of the FlutterCandies organization
Made with ❤️ by the FlutterCandies team