// @flow import { ReplaySubject, Observable } from "rxjs"; import { buildPlaceRecord } from "../records/PlaceRecord"; import { Map } from "immutable"; import { findNearbyPlaces, getPlaceDetails } from "../apis/GooglePlacesApi"; import { path } from "ramda"; import { type GooglePlaceObj } from "../records/PlaceRecord"; import { setById } from "../helpers/ImmutableHelpers"; import location$ from "./LocationStream"; import filter$ from "./FilterStream"; import FilterRecord from "../records/FilterRecord"; import foodItems$ from "./FoodItemsStream"; import PlaceRecord from "../records/PlaceRecord"; import geodist from "geodist"; const getGeoDist = begin => end => geodist(begin, end, { exact: true }); const placesSubject = new ReplaySubject(); export function emitter(val?: ?PlaceRecord) { placesSubject.next(val); } filter$.subscribe(() => emitter(null)); foodItems$ .mergeMap((foodItems = Map()) => Observable.from(foodItems.toArray())) .mergeMap(([foodItemId, foodItem]) => getPlaceDetails(foodItem)) .map(buildPlaceRecord) .subscribe(emitter); location$ .combineLatest(filter$) .debounceTime(200) .mergeMap(([location, filter]: [?Position, FilterRecord]) => { return findNearbyPlaces({ location, radius: filter.radius, search: filter.search }); }) .map((val: { location: ?Position, places: ?Array }) => { if (!val) { return; } const { places } = val; const coords = path(["location", "coords"], val) || {}; const getDist = getGeoDist({ lat: coords.latitude, lon: coords.longitude }); return (places || []).map((place: GooglePlaceObj) => buildPlaceRecord({ ...place, distance: getDist({ lat: place.geometry.location.lat, lon: place.geometry.location.lng }) }) ); }) .subscribe(places => places && places.map(emitter)); export default placesSubject.scan((places, place) => { if (!place) { return null; } return setById(places || new Map(), place); });