Reactive Extensions for Go with full Reactive Streams 1.0.4 compliance.
- Type-safe generics - Full Go generics support
- Reactive Streams 1.0.4 - Complete specification compliance
- Backpressure strategies - Buffer, Drop, Latest, Error
- Push & Pull Models - Observable API (push) and Reactive Streams (pull with backpressure)
- Retry and backoff - Fixed, Linear, Exponential backoff with configurable retry limits
- Context cancellation - Graceful shutdown
- Thread-safe - All APIs are safe for concurrent access with proper synchronization
Install the library:
go get github.com/droxer/RxGo@latestSimple and intuitive API for basic reactive programming:
package main import ( "context""fmt""github.com/droxer/RxGo/pkg/observable" ) funcmain(){// Basic usageobs:=observable.Just(1, 2, 3, 4, 5) obs.Subscribe(context.Background(), observable.NewSubscriber( func(vint){fmt.Printf("Got %d\n", v) }, func(){fmt.Println("Done") }, func(errerror){fmt.Printf("Error: %v\n", err) }, )) // Using new operatorsnumbers:=observable.Range(1, 10) firstFive:=observable.Take(numbers, 5) fmt.Println("\nFirst five numbers:") firstFive.Subscribe(context.Background(), observable.NewSubscriber( func(vint){fmt.Printf("%d ", v) }, func(){fmt.Println("\nCompleted") }, func(errerror){fmt.Printf("Error: %v\n", err) }, )) }Output:
Got 1 Got 2 Got 3 Got 4 Got 5 Done First five numbers: 1 2 3 4 5 Completed Full Reactive Streams 1.0.4 compliance with backpressure support:
package main import ( "context""fmt""github.com/droxer/RxGo/pkg/streams" ) funcmain(){publisher:=streams.NewCompliantRangePublisher(1, 5) publisher.Subscribe(context.Background(), streams.NewSubscriber( func(vint){fmt.Printf("Received: %d\n", v) }, func(errerror){fmt.Printf("Error: %v\n", err) }, func(){fmt.Println("Completed") }, )) // Using new processorsfmt.Println("\nUsing TakeProcessor:") numbers:=streams.NewCompliantRangePublisher(1, 10) takeProcessor:= streams.NewTakeProcessor[int](5) numbers.Subscribe(context.Background(), takeProcessor) takeProcessor.Subscribe(context.Background(), streams.NewSubscriber( func(vint){fmt.Printf("%d ", v) }, func(errerror){fmt.Printf("Error: %v\n", err) }, func(){fmt.Println("\nTake completed") }, )) }Output:
Received: 1 Received: 2 Received: 3 Received: 4 Received: 5 Completed Using TakeProcessor: 1 2 3 4 5 Take completed Handle producer/consumer speed mismatches with four strategies:
import"github.com/droxer/RxGo/pkg/streams"// Buffer - keep all items in bounded bufferpublisher:=streams.NewBufferedPublisher[int]( streams.WithBufferStrategy(streams.Buffer), streams.WithBufferSize(100), ) // Drop - discard new items when fullpublisher:=streams.NewBufferedPublisher[int]( streams.WithBufferStrategy(streams.Drop), streams.WithBufferSize(50), ) // Latest - keep only latest itempublisher:=streams.NewBufferedPublisher[int]( streams.WithBufferStrategy(streams.Latest), streams.WithBufferSize(1), ) // Error - signal error on overflowpublisher:=streams.NewBufferedPublisher[int]( streams.WithBufferStrategy(streams.Error), streams.WithBufferSize(10), )You can easily convert between the Observable and Publisher APIs using adapters. This is useful when you need to combine the simplicity of the observable package with the backpressure support of the streams package.
import ( "github.com/droxer/RxGo/pkg/adapters""github.com/droxer/RxGo/pkg/observable""github.com/droxer/RxGo/pkg/streams" ) // Convert an Observable to a Publisherobs:=observable.Just(1, 2, 3) publisher:=adapters.ObservablePublisherAdapter(obs) // Convert a Publisher to an Observablepub:=streams.NewCompliantRangePublisher(1, 5) observable:=adapters.PublisherToObservableAdapter(pub)- Architecture - Package structure and design decisions
- Observable API - Simple Observable API examples
- Reactive Streams - Full Reactive Streams 1.0.4 compliance
- Backpressure - Handle producer/consumer speed mismatches
- Push vs Pull Models - Understanding push and pull models with backpressure
- Retry and Backoff - Configurable retry with backoff strategies
- Transformations - Transform and process data streams with both Reactive Streams and Observable API
- Context Cancellation - Graceful cancellation using Go context
- Schedulers - Execution context control
We welcome contributions! Please see CONTRIBUTING.md for guidelines.