// TaxiRideCleansingRunner.java
package com.training.data.artisans.taxi;
import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
/**
 * Command-line entry point that wires up the taxi ride cleansing job:
 * a {@link TaxiRideSource} feeds {@link TaxiRideCleansing}, and the resulting
 * stream is written to a Kafka topic through {@link FlinkKafkaProducer010}.
 */
public class TaxiRideCleansingRunner {

    /** Maximum out-of-orderness of events, in seconds, handed to the source. */
    private static final int MAX_EVENT_DELAY_DEFAULT = 60;

    /**
     * Serving speed factor handed to the source: 600 means ten minutes of
     * events (10 * 60 s) are served per second.
     */
    private static final int SERVING_SPEED_FACTOR_DEFAULT = 600;

    /**
     * Parses the arguments, builds the streaming pipeline and executes it.
     *
     * @param args command-line arguments, forwarded to
     *             {@link TaxiRideCleansingParameterParser#parseParams}
     * @throws Exception declared by the signature; presumably raised when
     *                   building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // TODO: refactor this method
        TaxiRideCleansingParameterParser params = new TaxiRideCleansingParameterParser();

        // NOTE(review): the job is started only when parseParams(...) returns false.
        // Confirm against the parser that `false` is the "arguments are usable" outcome.
        if (params.parseParams(args)) {
            return;
        }

        final String dataFilePath = params.getDataFilePath();

        // Obtain the execution environment and switch it to event-time processing.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Source: taxi rides read from the configured data file.
        TaxiRideSource rideSource =
                new TaxiRideSource(dataFilePath, MAX_EVENT_DELAY_DEFAULT, SERVING_SPEED_FACTOR_DEFAULT);
        DataStream<TaxiRide> rides = env.addSource(rideSource);

        // Transformation: delegate to the cleansing step.
        DataStream<TaxiRide> filteredRides = new TaxiRideCleansing().execute(rides);

        // Sink: publish the cleansed rides to Kafka.
        FlinkKafkaProducer010<TaxiRide> kafkaSink = new FlinkKafkaProducer010<>(
                "localhost:9092",      // Kafka broker host:port
                "cleansedRides",       // topic to write to
                new TaxiRideSchema()); // serializer (provided as util)
        filteredRides.addSink(kafkaSink);

        // For local debugging the Kafka sink can be swapped for: filteredRides.print();

        env.execute("Running Taxi Ride Cleansing");
    }
}