java - RxJava + JavaFX Property -
i have method called rxtoproperty() turns observable javafx property.
public static <t> readonlyobjectproperty<t> rxtoproperty(observable<t> obs) { readonlyobjectwrapper<t> property = new readonlyobjectwrapper<>(); obs.onbackpressurelatest().serialize().subscribe(v -> { synchronized(property) { property.set(v); } }); return property.getreadonlyproperty(); } how ensure returned property up-to-date when bound controls in javafx? have noticed bizarre, random behaviors seem indicate thread safety has been compromised schedulers , javafx.
like example, try use method show 1 source observable scheduled in several different ways, , results random , haphazard when used in tableview, cells alternating between having values , not having values, thread activity never end.
whenever try schedule on fx thread using platform.invokelater() makes behavior more crazy. wrong rxtoproperty() method?
public class reactivetableviewtest extends application { @override public void start(stage stage) throws exception { group root = new group(); scene scene = new scene(root); root.getchildren().add(new reactivetable()); stage.setscene(scene); stage.sizetoscene(); stage.show(); } private static final class reactiverecord { private final observable<number> obs = observable.just(10,20,30,40,50,60).cast(number.class); public observable<number> subscribeimmediate() { return obs; } public observable<number> subscribecomputation() { return obs.subscribeon(schedulers.computation()); } public observable<number> subscribenewthread() { return obs.subscribeon(schedulers.newthread()); } public observable<number> subscribeio() { return obs.subscribeon(schedulers.io()); } public observable<number> subscribeparallel() { return obs.flatmap(i -> observable.just(i).subscribeon(schedulers.computation())); } public observable<number> subscribetrampoline() { return obs.subscribeon(schedulers.trampoline()); } public immutablelist<observable<number>> getall() { return immutablelist.of(subscribeimmediate(),subscribecomputation(),subscribenewthread(),subscribeio(),subscribeparallel(),subscribetrampoline()); } public immutablelist<string> getheaders() { return immutablelist.of("immediate","computation","new","io","parallel","trampoline"); } } private static final class reactivetable extends tableview<reactiverecord> { private reactivetable() { reactiverecord record = new reactiverecord(); this.getitems().add(record); immutablelist<observable<number>> observables = record.getall(); immutablelist<string> headers = record.getheaders(); (int = 0; < observables.size(); i++) { tablecolumn<reactiverecord,number> col = new tablecolumn<>(headers.get(i)); final int index = i; col.setcellvaluefactory(cb -> rxtoproperty(observables.get(index))); this.getcolumns().add(col); } } } public static <t> readonlyobjectproperty<t> rxtoproperty(observable<t> obs) { readonlyobjectwrapper<t> property = new readonlyobjectwrapper<>(); obs.onbackpressurelatest().serialize().subscribe(v -> { synchronized(property) { system.out.println("emitting val " + v + " on " + thread.currentthread().getname()); property.set(v); } }); return property.getreadonlyproperty(); } public static void main(string[] args) { launch(args); } }
tomas mikula gave helpful insight behavior, solution, on reactfx github project.
Comments
Post a Comment