// PoupularPlacesMain.java

package com.training.data.artisans.taxi;


import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.GeoUtils;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema;
import com.google.common.collect.Iterables;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;


public class PoupularPlacesMain {

	// events are out of order by max 60 seconds
	private static final int MAX_EVENT_DELAY_DEFAULT = 60;

	// serving speed factor: 10 minutes of events (10 * 60 = 600s) are served per second
	private static final int SERVING_SPEED_FACTOR_DEFAULT = 600;

	private static final int POPULAR_PLACES_COUNTER_THRESHOLD = 20;

	/**
	 * Builds and runs the "popular places" streaming job.
	 *
	 * <p>Pipeline: read {@link TaxiRide} events from the Kafka topic {@code cleansedRides},
	 * keep the rides accepted by {@code TaxiRideCleansing.NewYorkTaxiFilter}, map each ride
	 * to a (grid cell, isStart) pair, count the rides per pair in sliding event-time windows
	 * (15 minutes long, evaluated every 5 minutes), keep the cells whose count reaches
	 * {@code POPULAR_PLACES_COUNTER_THRESHOLD}, convert the cell id back to coordinates and
	 * index the result into Elasticsearch.
	 *
	 * @param args command-line arguments; not read by this job
	 * @throws Exception if the Elasticsearch host cannot be resolved or the job fails
	 */
	public static void main(String[] args) throws Exception {

		// get an ExecutionEnvironment
		StreamExecutionEnvironment env =
				StreamExecutionEnvironment.getExecutionEnvironment();
		// configure event-time processing
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		// generate a Watermark every second
		env.getConfig().setAutoWatermarkInterval(1000);

		// configure Kafka consumer
		Properties props = new Properties();
		props.setProperty("zookeeper.connect", "localhost:2181"); // Zookeeper default host:port
		props.setProperty("bootstrap.servers", "localhost:9092"); // Broker default host:port
		props.setProperty("group.id", "myGroup");                 // Consumer group ID
		// Kafka's fallback when the group has no valid committed offset: start from the
		// earliest record. It does not by itself force a re-read on every run.
		props.setProperty("auto.offset.reset", "earliest");

		// create a Kafka consumer
		FlinkKafkaConsumer010<TaxiRide> consumer =
				new FlinkKafkaConsumer010<>(
						"cleansedRides",
						new TaxiRideSchema(),
						props);

		// assign a timestamp extractor to the consumer
		consumer.assignTimestampsAndWatermarks(new PopulatPlacesWatermarkOutOfOrdeness(MAX_EVENT_DELAY_DEFAULT));

		DataStream<TaxiRide> rides = env.addSource(consumer);

//		DataStream<TaxiRide> rides = env.addSource(
//				new TaxiRideSource("/Users/dineshat/solo/flink-java-project/nycTaxiRides.gz", MAX_EVENT_DELAY_DEFAULT, SERVING_SPEED_FACTOR_DEFAULT));

		// Output fields: (lon, lat, window end timestamp, isStart, ride count).
		DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
				.filter(new TaxiRideCleansing.NewYorkTaxiFilter())
				.map(new MapToGridCell())
				// key by (grid cell id, isStart); positional keys yield a generic Tuple key,
				// which is the key type RideCounterWindowFunction declares
				.keyBy(0, 1)
				.timeWindow(Time.minutes(15), Time.minutes(5))
				.apply(new RideCounterWindowFunction())
				.filter(new PopularPlaceThresholdFilter(POPULAR_PLACES_COUNTER_THRESHOLD))
				.map(new MapFromGridCellToLatLon());

		Map<String, String> config = new HashMap<>();
		config.put("bulk.flush.max.actions", "10");   // flush once 10 index requests are buffered
		config.put("cluster.name", "elasticsearch"); // default cluster name

		List<InetSocketAddress> transports = new ArrayList<>();
		// set default connection details
		transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));

		popularPlaces.addSink(
				new ElasticsearchSink<>(config, transports, new PopularPlaceInserter()))
//				.setParallelism(1)
				.name("ES_Sink");

//		popularPlaces.print();
		env.execute("Popular place task");
	}

	/**
	 * Maps a {@link TaxiRide} to a pair of (grid cell id, isStart).
	 *
	 * <p>The grid cell is obtained from {@code GeoUtils.mapToGridCell} using the start
	 * coordinates for start events and the end coordinates otherwise.
	 */
	public static class MapToGridCell implements MapFunction<TaxiRide, Tuple2<Integer, Boolean>> {

		/**
		 * @param taxiRide the ride event to locate
		 * @return a tuple of the grid cell id and the ride's {@code isStart} flag
		 */
		@Override
		public Tuple2<Integer, Boolean> map(TaxiRide taxiRide) throws Exception {
			final boolean departing = taxiRide.isStart;

			// pick the coordinates that belong to this kind of event
			final float longitude = departing ? taxiRide.startLon : taxiRide.endLon;
			final float latitude = departing ? taxiRide.startLat : taxiRide.endLat;

			final int cellId = GeoUtils.mapToGridCell(longitude, latitude);
			return Tuple2.of(cellId, departing);
		}
	}

	/**
	 * Counts the events of one key in one window.
	 *
	 * <p>For every (grid cell id, isStart) key and window it emits a single record
	 * (grid cell id, window end timestamp, isStart, number of events in the window).
	 */
	public static class RideCounterWindowFunction implements WindowFunction<
			// input type
			Tuple2<Integer, Boolean>,
			// output type
			Tuple4<Integer, Long, Boolean, Integer>,
			// key type
			Tuple,
			// window type
			TimeWindow>
	{

		/**
		 * @param key        the window key; expected to be a {@code Tuple2<Integer, Boolean>}
		 *                   of (grid cell id, isStart)
		 * @param timeWindow the window being evaluated; its end timestamp is emitted
		 * @param events     the events assigned to this key and window
		 * @param collector  receives the single count record for this key and window
		 * @throws ClassCastException if {@code key} is not a {@code Tuple2}
		 */
		@Override
		public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<Integer, Boolean>> events,
				Collector<Tuple4<Integer, Long, Boolean, Integer>> collector) throws Exception {

			// The type arguments cannot be verified at runtime (erasure), so the compiler
			// reports this cast as unchecked. It is sound as long as the stream is keyed by
			// the (Integer, Boolean) fields of the input tuple, as main does with keyBy(0, 1).
			@SuppressWarnings("unchecked")
			Tuple2<Integer, Boolean> castedKey = (Tuple2<Integer, Boolean>) key;
			int gridId = castedKey.f0;
			boolean isStart = castedKey.f1;
			long windowTime = timeWindow.getEnd();
			int rideCounter = Iterables.size(events);

			collector.collect(Tuple4.of(gridId, windowTime, isStart, rideCounter));
		}
	}

	/**
	 * Keeps only the count records whose ride count (field {@code f3}) is at least the
	 * threshold given at construction time.
	 */
	public static final class PopularPlaceThresholdFilter implements FilterFunction<Tuple4<Integer, Long, Boolean, Integer>> {

		// minimum ride count (inclusive) a record needs to pass the filter
		private final int minRideCount;

		/**
		 * @param popularPlacesCounterThreshold minimum ride count, inclusive
		 */
		public PopularPlaceThresholdFilter(int popularPlacesCounterThreshold) {
			minRideCount = popularPlacesCounterThreshold;
		}

		/**
		 * @param place record whose fourth field {@code f3} holds the ride count
		 * @return {@code true} if the ride count is not below the threshold
		 */
		@Override
		public boolean filter(Tuple4<Integer, Long, Boolean, Integer> place) {
			final int rideCount = place.f3;
			return !(rideCount < minRideCount);
		}
	}

	/**
	 * Replaces the grid cell id of a count record with the coordinates returned by
	 * {@code GeoUtils.getGridCellCenterLon} and {@code GeoUtils.getGridCellCenterLat}.
	 *
	 * <p>Input: (grid cell id, window timestamp, isStart, count).
	 * Output: (lon, lat, window timestamp, isStart, count).
	 */
	public static class MapFromGridCellToLatLon implements MapFunction<Tuple4<Integer, Long, Boolean, Integer>,
			Tuple5<Float, Float, Long, Boolean, Integer>> {

		/**
		 * @param cell the count record for one grid cell
		 * @return the same record with the cell id expanded to (lon, lat)
		 */
		@Override
		public Tuple5<Float, Float, Long, Boolean, Integer> map(Tuple4<Integer, Long, Boolean, Integer> cell) throws Exception {
			final int cellId = cell.f0;
			final float centerLon = GeoUtils.getGridCellCenterLon(cellId);
			final float centerLat = GeoUtils.getGridCellCenterLat(cellId);

			// the remaining fields are carried over unchanged
			final long windowTimestamp = cell.f1;
			final boolean departing = cell.f2;
			final int rideCount = cell.f3;

			return Tuple5.of(centerLon, centerLat, windowTimestamp, departing, rideCount);
		}
	}

	/**
	 * Timestamp extractor for {@link TaxiRide} events, built on
	 * {@link BoundedOutOfOrdernessTimestampExtractor} with a bound given in seconds.
	 */
	public static class PopulatPlacesWatermarkOutOfOrdeness extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

		/**
		 * @param maxOutOfOrderness out-of-orderness bound, in seconds
		 */
		public PopulatPlacesWatermarkOutOfOrdeness(int maxOutOfOrderness) {
			super(Time.seconds(maxOutOfOrderness));
		}

		/**
		 * @param ride the event to timestamp
		 * @return the start time of a start event, otherwise the end time, via {@code getMillis()}
		 */
		@Override
		public long extractTimestamp(TaxiRide ride) {
			return (ride.isStart ? ride.startTime : ride.endTime).getMillis();
		}
	}

	/**
	 * Turns a popular-place record (lon, lat, window timestamp, isStart, count) into an
	 * Elasticsearch index request for index {@code nyc-places}, type {@code popular-locations}.
	 */
	public static class PopularPlaceInserter implements ElasticsearchSinkFunction<Tuple5<Float, Float, Long, Boolean, Integer>> {

		/**
		 * @param record  the popular-place record to index
		 * @param ctx     runtime context supplied by the sink; not used here
		 * @param indexer receives the index request built from {@code record}
		 */
		@Override
		public void process(
				Tuple5<Float, Float, Long, Boolean, Integer> record,
				RuntimeContext ctx,
				RequestIndexer indexer) {

			// every value is sent as a string
			final Map<String, String> document = new HashMap<>();
			document.put("time", record.f2.toString());
			// f1 is the latitude and f0 the longitude, so this is "lat,lon"
			document.put("location", record.f1 + "," + record.f0);
			document.put("isStart", record.f3.toString());
			document.put("cnt", record.f4.toString());

			final IndexRequest request = Requests.indexRequest();
			request.index("nyc-places");         // index name
			request.type("popular-locations");   // mapping name
			request.source(document);

			indexer.add(request);
		}
	}
}