openaire-library/utils/AdvancedAsyncSubject.ts

51 lines
1.1 KiB
TypeScript
Raw Normal View History

import {Subject, Subscriber, Subscription} from "rxjs";
export class AdvancedAsyncSubject<T> extends Subject<T>{
private _value: T = null;
private hasNext: boolean = false;
private hasFinished: boolean = false;
/** @deprecated This is an internal implementation detail, do not use. */
_subscribe(subscriber: Subscriber<any>): Subscription {
if (this.hasError) {
subscriber.error(this.thrownError);
return Subscription.EMPTY;
} else if (this.hasFinished && this.hasNext) {
subscriber.next(this._value);
}
return super._subscribe(subscriber);
}
next(value: T): void {
if (!this.hasFinished) {
this._value = value;
this.hasNext = true;
this.finish();
} else {
this._value = value;
super.next(value);
}
}
get value() {
return this.getValue();
}
getValue() {
return this._value;
}
error(error: any): void {
if (!this.hasFinished) {
super.error(error);
}
}
finish(): void {
this.hasFinished = true;
if (this.hasNext) {
super.next(this._value);
}
}
}