TaxiRideCleansing.java

package com.training.data.artisans.taxi;

import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.GeoUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

public class TaxiRideCleansing {

	public DataStream<TaxiRide> execute(DataStream<TaxiRide> rides) throws Exception {
		DataStream<TaxiRide> filteredRidesByNYC = rides
				.filter(new NewYorkTaxiFilter());

		return filteredRidesByNYC;
	}

	public static final class NewYorkTaxiFilter implements FilterFunction<TaxiRide> {

		@Override
		public boolean filter(TaxiRide taxiRide) {
			return GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat) &&
					GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat);
		}
	}
}