Description
Although I currently have no use for such multi-observables in my own work, the idea of higher-arity dataflows got me interested (scientifically) in this library. Here are a few points I see about such a library.
Approach
First, instead of creating Bi- and Tri- Observables by hand, you can explore the possibility of code templating and static code generation, similar to how many of the primitive-collections libraries (such as Trove or fastutil) do it.
Second, I suggest adopting the reactive-streams idioms instead of the RxJava 1.x idioms. I don't see much room for direct code reuse, and since you'll need wrappers and converters anyway, you have the opportunity to avoid a few less-than-optimal decisions the RxJava API has made.
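To make the templating idea from the first point a bit more concrete, here is a minimal sketch of a generator that emits the source of a subscriber interface for a given arity. The class name `ArityTemplate`, the method `subscriberSource`, and the `Bi`/`Tri` naming scheme are assumptions made for illustration; a real setup would more likely use template files processed at build time, as the primitive-collections libraries do.

```java
import java.util.StringJoiner;

/** Hypothetical generator, for illustration only. */
final class ArityTemplate {

    /** Maps an arity to a type-name prefix (assumed naming scheme). */
    static String prefix(int arity) {
        switch (arity) {
            case 2: return "Bi";
            case 3: return "Tri";
            default: throw new IllegalArgumentException("unsupported arity: " + arity);
        }
    }

    /** Returns the Java source of the subscriber interface for the given arity. */
    static String subscriberSource(int arity) {
        StringJoiner typeParams = new StringJoiner(", ", "<", ">");
        StringJoiner args = new StringJoiner(", ");
        for (int i = 1; i <= arity; i++) {
            typeParams.add("T" + i);          // T1, T2, ...
            args.add("T" + i + " t" + i);     // T1 t1, T2 t2, ...
        }
        return "public interface " + prefix(arity) + "Subscriber" + typeParams
                + " extends SubscriberBase {\n"
                + "    void onNext(" + args + ");\n"
                + "}\n";
    }

    public static void main(String[] args) {
        System.out.print(subscriberSource(2));
        System.out.print(subscriberSource(3));
    }
}
```

The same loop can drive the publisher, processor, and queue templates, so adding a fourth arity becomes a one-line change in `prefix` rather than another set of hand-written classes.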
Primitives
I'd start a new type hierarchy based on the reactive-streams idioms:
```java
interface {Arity}Publisher<Ti, ...> {
    void subscribe({Arity}Subscriber<Ti, ...> s);
}

interface SubscriberBase {
    void onStart(Subscription s);
    void onError(Throwable e);
    void onCompleted();
}

interface {Arity}Subscriber<Ti, ...> extends SubscriberBase {
    void onNext(T1 t1, ...);
}

interface Subscription { // or import reactive-streams directly
    void request(long n);
    void cancel();
}

interface {Arity}Processor<Ti, ..., Ri, ...>
        extends {Arity}Subscriber<Ti, ...>, {Arity}Publisher<Ri, ...> {
}
```
Queues
Since most operators need some SPSC queue to operate, and those queues are designed to transmit one element at a time, you'll need to extend the logic and API of the classical queues.
```java
interface {Arity}Queue {
    boolean offer(Ti, ...);
    boolean poll(Action{k}<Ti, ...> out); // true if element was available
    boolean peek(Action{k}<Ti, ...> out);
    boolean isEmpty();
    int size();
}
```
Note that using callbacks does add some overhead, but I think it is less than allocating tuples for the classical queues.
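As an illustration of the callback-based API, here is a minimal arity-2 sketch. It is deliberately single-threaded and not an SPSC queue: the names `BiQueue` and `FlatBiQueue` are hypothetical, `java.util.function.BiConsumer` stands in for `Action2`, and the elements are stored flattened in an `ArrayDeque` (which rejects `null`, so `null` elements are not supported here).

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Objects;
import java.util.function.BiConsumer;

/** Hypothetical arity-2 instance of the queue interface. */
interface BiQueue<T1, T2> {
    boolean offer(T1 t1, T2 t2);
    boolean poll(BiConsumer<? super T1, ? super T2> out); // true if a pair was available
    boolean peek(BiConsumer<? super T1, ? super T2> out);
    boolean isEmpty();
    int size();
}

/** Single-threaded sketch: pairs are stored as two consecutive deque entries. */
final class FlatBiQueue<T1, T2> implements BiQueue<T1, T2> {
    private final ArrayDeque<Object> items = new ArrayDeque<>();

    @Override
    public boolean offer(T1 t1, T2 t2) {
        // Validate both before storing so a pair is never half-written.
        Objects.requireNonNull(t1, "t1");
        Objects.requireNonNull(t2, "t2");
        items.addLast(t1);
        items.addLast(t2);
        return true;
    }

    @Override
    @SuppressWarnings("unchecked")
    public boolean poll(BiConsumer<? super T1, ? super T2> out) {
        if (items.isEmpty()) {
            return false;
        }
        T1 t1 = (T1) items.pollFirst();
        T2 t2 = (T2) items.pollFirst();
        out.accept(t1, t2); // hand both values over without allocating a tuple
        return true;
    }

    @Override
    @SuppressWarnings("unchecked")
    public boolean peek(BiConsumer<? super T1, ? super T2> out) {
        if (items.isEmpty()) {
            return false;
        }
        Iterator<Object> it = items.iterator();
        out.accept((T1) it.next(), (T2) it.next());
        return true;
    }

    @Override
    public boolean isEmpty() {
        return items.isEmpty();
    }

    @Override
    public int size() {
        return items.size() / 2; // two entries per pair
    }
}
```

A consumer drains it with `while (q.poll((a, b) -> handle(a, b))) { }`, where `handle` is whatever the operator does with the pair. A non-capturing callback can be reused across calls, so no per-element object is needed.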
In addition, the ring-buffer based SPSC queue implementation needs some slight changes regarding the element store: you can keep the current indexing logic, but each logical slot now maps to k consecutive entries of the underlying array (the power-of-2 capacity remains), roughly:
```java
Object[] array = new Object[powerOf2Capacity * 3];

void offer(T1 t1, T2 t2, T3 t3) {
    int offset = ((int) producerIndex & mask) * 3;
    array[offset] = t1;
    array[offset + 1] = t2;
    // pseudocode for an ordered store, e.g. AtomicReferenceArray.lazySet
    array[offset + 2].lazySet(t3); // memory order: release
    producerIndex++;
}

boolean poll(Action3<T1, T2, T3> out) {
    int offset = ((int) consumerIndex & mask) * 3;
    Object o3 = array[offset + 2]; // memory order: acquire
    if (o3 == null) {
        return false;
    }
    Object o2 = array[offset + 1]; // read in opposite order
    Object o1 = array[offset];
    array[offset] = null;
    array[offset + 1] = null;
    array[offset + 2].lazySet(null);
    consumerIndex++;
    out.call((T1) o1, (T2) o2, (T3) o3);
    return true;
}
```
Callbacks on the stream
Since Java functions can't return more than one result, one can either return a tuple for each 'column' and split the operators into many, or use the same trick as with the queues and pass an output callback:
```java
public final <R1, R2> BiObservable<R1, R2> map(
        Action3<? super T1, ? super T2, Action2<R1, R2>> mapper) {
    return lift(s -> {
        return new AbstractBiSubscriber<T1, T2>(s) {
            @Override
            public void onNext(T1 t1, T2 t2) {
                mapper.call(t1, t2, s::onNext);
            }
            // ...
        };
    });
}
```
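For something that compiles on its own, here is a stripped-down sketch of the same output-callback trick. All types in it (`BiSubscriber`, `BiPublisher`, `BiMapper`, `fromMap`, `BiMapDemo`) are hypothetical stand-ins rather than this library's API, `BiConsumer` replaces `Action2`, and `onStart`/backpressure is left out to keep it short.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

interface BiSubscriber<T1, T2> {
    void onNext(T1 t1, T2 t2);
    void onError(Throwable e);
    void onCompleted();
}

/** Mapper that "returns" two values by calling the output callback. */
@FunctionalInterface
interface BiMapper<T1, T2, R1, R2> {
    void call(T1 t1, T2 t2, BiConsumer<R1, R2> out);
}

interface BiPublisher<T1, T2> {
    void subscribe(BiSubscriber<T1, T2> s);

    default <R1, R2> BiPublisher<R1, R2> map(
            BiMapper<? super T1, ? super T2, R1, R2> mapper) {
        return downstream -> subscribe(new BiSubscriber<T1, T2>() {
            @Override
            public void onNext(T1 t1, T2 t2) {
                // The mapper pushes its two results straight into downstream.
                mapper.call(t1, t2, downstream::onNext);
            }

            @Override
            public void onError(Throwable e) {
                downstream.onError(e);
            }

            @Override
            public void onCompleted() {
                downstream.onCompleted();
            }
        });
    }

    /** Emits every key/value pair of the map, then completes. */
    static <K, V> BiPublisher<K, V> fromMap(Map<K, V> source) {
        return s -> {
            source.forEach(s::onNext);
            s.onCompleted();
        };
    }
}

final class BiMapDemo {
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        Map<String, Integer> source = new LinkedHashMap<>();
        source.put("a", 1);
        source.put("b", 2);

        BiPublisher.fromMap(source)
                // swap the columns and transform both in one step
                .<Integer, String>map((k, v, out) -> out.accept(v * 10, k.toUpperCase()))
                .subscribe(new BiSubscriber<Integer, String>() {
                    @Override
                    public void onNext(Integer n, String s) {
                        seen.add(n + ":" + s);
                    }

                    @Override
                    public void onError(Throwable e) {
                        seen.add("error");
                    }

                    @Override
                    public void onCompleted() {
                        seen.add("done");
                    }
                });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [10:A, 20:B, done]
    }
}
```

One side effect of this shape is that the mapper decides how many times it calls `out`, so the same signature could also express a filter (zero calls) or a flat-map-like expansion (several calls). Whether `map` should allow that is a design choice to make explicitly.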