Click here to Skip to main content
15,861,172 members
Articles / Programming Languages / Java

Madcap Idea 12: Getting the ‘Create Job’ to Work End–End

Rate me:
Please Sign up or sign in to vote.
5.00/5 (2 votes)
24 Oct 2017CPOL8 min read 7.7K   1
How to get the 'create job' to work end-end

Last Time

Last time we looked at finishing the “View Rating” page, which queried the Kafka Stream stores via a new endpoint in the play backend. This post will see us finish the workings of the “Create Job” page.

PreAmble

Just as a reminder, this is part of my ongoing set of posts which I talk about here, where we will be building up to a point where we have a full app using lots of different stuff, such as these:

  • WebPack
  • React.js
  • React Router
  • TypeScript
  • Babel.js
  • Akka
  • Scala
  • Play (Scala Http Stack)
  • MySql
  • SBT
  • Kafka
  • Kafka Streams

Ok, so now that we have the introductions out of the way, let's crack on with what we want to cover in this post.

Where is the Code?

As usual, the code is on GitHub here.

What Is This Post All About?

As stated above, this post deals with the “Create Job” functions:

  • Allowing the user to specify their geo location position on the map
  • Create a JSON Job object
  • Send it to a new Play backend endpoint
  • Published the Job out over a Kafka topic (using Akka Streams / Reactive Kafka)
  • Consume the Job over a Kafka Topic (using Akka Streams / Reactive Kafka)
  • Push the consumed Job out of the forever frame (Comet functionality in Play backend)
  • Have a new RxJs based Observable over the comet based forever frame, and ensure that is working

As you can see, this post actually covers a lot. In fact, this post can be thought of as the lynch pin to this entire project. With the code that is contained in this post, we now have everything we need to deal with the rather more complex visual aspects of the “View Job” page.

Initially, I was thinking I would create separate Kafka streams (topics) for all the different interactions such as:

  • Bid for job
  • Accept job
  • Complete job
  • Cancel job
  • Create new job geo location

But then, I came to my senses and realized this could all be achieved using a single stream, that is a Job stream. The idea being is that the JSON payload of that single stream would just hold slightly different state and different points in time.

To see what I mean by this, this is the actual Scala class that we will be turning to/from JSON that is sent out over the SINGLE Kafka topic:

Java
case class Job
(
  jobUUID: String,
  clientFullName: String,
  clientEmail: String,
  driverFullName: String,
  driverEmail: String,
  vehicleDescription: String,
  vehicleRegistrationNumber: String,
  isAssigned:Boolean,
  isCompleted:Boolean
)

With this simple Scala object, we can do everything we want, for example:

  • To determine if this is a new job that a driver may be assigned to: we just look for Job items that don’t yet have a “driverEmail” - this tells us this job is free and has no driver yet
  • To determine if this job has been accepted by a client: we just filter/examine the “isAssigned” property which may be true/false

Anyway, you get the idea.

So now that we know there is a single stream, let's proceed with the rest of the post, shall we.

What Are We Trying To Do in This Post?

It really is a continuous chain of a single process which follows these sequential steps:

image

Allowing the User to Specify their Geo Location Position on the Map

Before we send the actual job JSON payload, we need to allow the user to specify their position such that the position can be retrieved later (right now, the position is maintained in Local Storage not in the Job payload, I may include client/driver positions in the actual payload, we’ll see how that goes).

Once the client sets their OWN position, they are able to create a job, and push out a new job. If they already have a job in flight, the client is NOT able to create a new job.

Thanks to the React map component that was picked some time ago, the position update really just boils down to this code in the CreateJob.tsx file

Java
_handleMapClick = (event) => {
    const newState = Object.assign({}, this.state, {
        currentPosition: new Position(event.latLng.lat(), event.latLng.lng())
    })
    this.setState(newState)
}

To deal with the users' current position, I also created this simple service class:

JavaScript
export class Position {

    lat: number;
    lng: number;

    constructor(lat: number, lng: number) {
        this.lat = lat;
        this.lng = lng;
    }
}
JavaScript
import { injectable, inject } from "inversify";
import { Position } from "../domain/Position";

@injectable()
export class PositionService {

    constructor() {

    }

    clearUserPosition = (email: string): void => {
        let key = 'currentUserPosition_' + email;
        sessionStorage.removeItem(key);
    }

    storeUserPosition = (currentUser: any, position: Position): void => {

        if (currentUser == null || currentUser == undefined)
            return;

        if (position == null || position == undefined)
            return;

        let currentUsersPosition = {
            currentUser: currentUser,
            position: position
        }
        let key = 'currentUserPosition_' + currentUser.email;
        sessionStorage.setItem(key, JSON.stringify(currentUsersPosition));
    }

    currentPosition = (email: string): Position => {
        let key = 'currentUserPosition_' + email;
        var currentUsersPosition = JSON.parse(sessionStorage.getItem(key));
        return currentUsersPosition.position;
    }

    hasPosition = (email: string): boolean => {
        let key = 'currentUserPosition_' + email;
        var currentUsersPosition = JSON.parse(sessionStorage.getItem(key));
        return currentUsersPosition != null && currentUsersPosition != undefined;
    }
}

Create a JSON Job Object

The next step is to create a Job object that may be posted for the new Job to the Play backend. This is done via a standard JQuery POST, as follows:

JavaScript
_handleCreateJobClick = () => {


    var self = this;
    var currentUser = this._authService.user();

    var newJob = {

        clientFullName: currentUser.fullName,
        clientEmail: currentUser.email,
        driverFullName: '',
        driverEmail: '',
        vehicleDescription: '',
        vehicleRegistrationNumber: '',
        isAssigned: false,
        isCompleted: false
    }

    $.ajax({
        type: 'POST',
        url: 'job/submit',
        data: JSON.stringify(newJob),
        contentType: "application/json; charset=utf-8",
        dataType: 'json'
    })
    .done(function (jdata, textStatus, jqXHR) {

        self._jobService.storeUserIssuedJob(newJob);
        const newState = Object.assign({}, self.state, {
            hasIssuedJob: self._jobService.hasIssuedJob()
        });
        //self.setState(newState)
        self._positionService.storeUserPosition(currentUser, self.state.currentPosition);
        hashHistory.push('/viewjob');
    })
    .fail(function (jqXHR, textStatus, errorThrown) {
        const newState = Object.assign({}, self.state, {
            okDialogHeaderText: 'Error',
            okDialogBodyText: jqXHR.responseText,
            okDialogOpen: true,
            okDialogKey: Math.random()
        })
        self.setState(newState)
    });
}

This will also store the users' current position using the PositionService we just saw, and redirect the user (this page is only available to clients as it's all about creating new jobs, which drivers can't do). We also redirect to the “ViewJob” page on successfully sending a new job.

Send It to a New Play Backend Endpoint

There is a new route to support the Job creation, so obviously we need a new route entry:

JavaScript
POST  /job/submit                              controllers.JobController.submitJob()

Published the Job Out Over a Kafka Topic (using Akka Streams / Reactive Kafka)

Ok, so we now know that we have a new endpoint that can accept a “job” JSON object. What does it do with this Job JSON object. Well, quite simply, it does this:

  • Converts the JSON into a Scala object
  • Sends it out over Kafka using Reactive Kafka publisher

You may be asking yourself why we want to burden ourselves with Kafka here at all if all we are going to do is get a Job JSON payload in them send it out via Kafka only to have it come back in via Kafka. This seems weird, why bother. The reason we want to involve Kafka here is for the audit and commit log facility that it provided. We want a record of the events, that’s what kafka has given us, a nice append only log.

Anyway, what does the new endpoint code look like that accepts the job. Here it is:

Java
package controllers

import javax.inject.Inject

import entities.Job
import entities.JobJsonFormatters._
import entities._
import actors.job.{JobConsumerActor, JobProducerActor}
import akka.actor.{ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import play.api.http.ContentTypes
import play.api.libs.Comet
import play.api.libs.json._
import play.api.libs.json.Json
import play.api.libs.json.Format
import play.api.libs.json.JsSuccess
import play.api.libs.json.Writes
import play.api.mvc.{Action, Controller}
import utils.Errors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
import scala.concurrent.duration._

class JobController @Inject()
(
  implicit actorSystem: ActorSystem,
  ec: ExecutionContext
) extends Controller
{
  val rand = new Random()

  //Error handling for streams
  //http://doc.akka.io/docs/akka/2.5.2/scala/stream/stream-error.html
  val decider: Supervision.Decider = {
    case _ => Supervision.Restart
  }

  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))

  val (sink, source) =
    MergeHub.source[JsValue](perProducerBufferSize = 16)
      .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
      .run()

  //job producer
  val childJobProducerActorProps = Props(classOf[JobProducerActor],mat,ec)
  val jobProducerSupervisorProps = createBackoffSupervisor(childJobProducerActorProps,
    s"JobProducerActor_${rand.nextInt()}")
  val jobProducerSupervisorActorRef = actorSystem.actorOf(jobProducerSupervisorProps, 
    name = "jobProducerSupervisor")

  //job consumer
  val childJobConsumerActorProps = Props(new JobConsumerActor(sink)(mat,ec))
  val jobConsumerSupervisorProps = createBackoffSupervisor(childJobConsumerActorProps,
    s"JobConsumerActor_${rand.nextInt()}")
  val jobConsumerSupervisorActorRef = actorSystem.actorOf(jobConsumerSupervisorProps, 
    name = "jobConsumerSupervisor")
  jobConsumerSupervisorActorRef ! Init


  def streamedJob() = Action {
    Ok.chunked(source via Comet.json("parent.jobChanged")).as(ContentTypes.HTML)
  }

  def submitJob = Action.async(parse.json) { request =>
    Json.fromJson[Job](request.body) match {
      case JsSuccess(job, _) => {
        jobProducerSupervisorActorRef ! job
        Future.successful(Ok(Json.toJson(job.copy(clientEmail = job.clientEmail.toUpperCase))))
      }
      case JsError(errors) =>
        Future.successful(BadRequest("Could not build a Job from the json provided. " +
          Errors.show(errors)))
    }
  }

  private def createBackoffSupervisor(childProps:Props, actorChildName: String) : Props = {
    BackoffSupervisor.props(
      Backoff.onStop(
        childProps,
        childName = actorChildName,
        minBackoff = 3.seconds,
        maxBackoff = 30.seconds,
        randomFactor = 0.2
      ).withSupervisorStrategy(
        OneForOneStrategy() {
          case _ => SupervisorStrategy.Restart
        })
    )
  }
}

There is a fair bit going on in that code. Let's dissect it a bit.

  • We create a backoff supervisor for both the Kafka producer/consumer actors
  • We create a stream that is capable of writing to the Comet frame socket
  • We provide the sink side (MergeHub) of the stream to the consumer actor, such that when it reads a value from Kafka, it will be pumped into the sink which will then travel through the Akka stream back to the web page via the BroadcastHub and Comet forever frame back to the HTML (and ultimately RxJs Subject)

Push the Consumed Job Out of the Forever Frame (Comet Functionality in Play Backend)

Ok, so we just saw how the 2 actors are created under back off supervisors, and how the consumer (the one that reads from Kafka) gets the ability to essentially write back to the forever frame in the HTML.

So how does the job go out into Kafka land?

That part is quite simple, here it is:

Java
package actors.job

import kafka.topics.JobTopics
import serialization.JSONSerde
import akka.Done
import akka.actor.{Actor, PoisonPill}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Keep, MergeHub, Source}
import akka.stream.{ActorMaterializer, KillSwitches}
import entities.Job
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import utils.Settings

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

class JobProducerActor(
  implicit materializer: ActorMaterializer,
  ec: ExecutionContext
) extends Actor {

  val jSONSerde = new JSONSerde[Job]
  val jobProducerSettings = ProducerSettings(
    context.system,
    new StringSerializer,
    new ByteArraySerializer)
    .withBootstrapServers(s"${Settings.bootStrapServers}")

  val ((mergeHubSink, killswitch), kafkaSourceFuture) =
    MergeHub.source[Job](perProducerBufferSize = 16)
      .map(job => {
        val jobBytes = jSONSerde.serializer().serialize("", job)
        (job, jobBytes)
      })
      .map { jobWithBytes =>
        val (job, jobBytes) = jobWithBytes
        new ProducerRecord[String, Array[Byte]](
          JobTopics.JOB_SUBMIT_TOPIC, job.clientEmail, jobBytes)
      }
      .viaMat(KillSwitches.single)(Keep.both)
      .toMat(Producer.plainSink(jobProducerSettings))(Keep.both)
      .run()

  kafkaSourceFuture.onComplete {
    case Success(value) => println(s"Got the callback, value = $value")
    case Failure(e) => {
      self ! PoisonPill
    }
  }

  override def postStop(): Unit = {
    super.postStop()
    println(s"JobProducerActor seen 'Done'")
    killswitch.shutdown()
  }

  override def receive: Receive = {
    case (job: Job) => {
      println(s"JobProducerActor seen ${job}")
      Source.single(job).runWith(mergeHubSink)
    }
    case Done => {
      println(s"JobProducerActor seen 'Done'")
      killswitch.shutdown()
      self ! PoisonPill
    }
  }
}

We covered a lot of how this worked in the last post, when we talked about how to create a new Rating. The mechanism is essentially the same but this time for Job JSON data.

Consume the Job Over a Kafka Topic (using Akka Streams / Reactive Kafka)

Let's see the JobConsumerActor which takes this Sink (MergeHub from JobController) and pushes the value out to it, when it sees a new value from Kafka on the job topic “job-submit-topic”. This then travels through the Akka stream where it goes via the BroadcastHub out to the forever from in the HTML.

Here is the code, it may look scary, but really it's just reading a value of the Kafka topic and pushing it out via the Sink (MergeHub):

Java
package actors.job

import entities.{Job, Init}
import kafka.topics.JobTopics
import serialization.JSONSerde
import akka.{Done, NotUsed}
import akka.actor.{Actor, ActorSystem, PoisonPill}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.scaladsl.{Keep, MergeHub, Sink, Source}
import akka.stream.{ActorMaterializer, KillSwitches}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, 
                  ByteArraySerializer, StringDeserializer, StringSerializer}
import play.api.libs.json.{JsValue, Json}
import utils.Settings

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

//TODO : This actor should take in a way of pushing back to Websocket
class JobConsumerActor
  (val sink:Sink[JsValue, NotUsed])
  (implicit materializer: ActorMaterializer, ec: ExecutionContext
) extends Actor {

  val jSONSerde = new JSONSerde[Job]
  val jobConsumerSettings = ConsumerSettings(
    context.system,new StringDeserializer(),new ByteArrayDeserializer())
    .withBootstrapServers(s"${Settings.bootStrapServers}")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val ((_, killswitch), kafkaConsumerFuture) =
    Consumer.committableSource(jobConsumerSettings, Subscriptions.topics(JobTopics.JOB_SUBMIT_TOPIC))
      .mapAsync(1) { msg => {
        val jobBytes = msg.record.value
        val job = jSONSerde.deserializer().deserialize(JobTopics.JOB_SUBMIT_TOPIC,jobBytes)
        self ! job
        msg.committableOffset.commitScaladsl()
      }
      }
      .viaMat(KillSwitches.single)(Keep.both)
      .toMat(Sink.last)(Keep.both)
      .run()

  kafkaConsumerFuture.onComplete {
    case Success(value) => println(s"Got the callback, value = $value")
    case Failure(e) => {
      self ! PoisonPill
    }
  }

  override def postStop(): Unit = {
    super.postStop()
    println(s"JobConsumerActor seen 'Done'")
    killswitch.shutdown()
  }

  override def receive: Receive = {
    case (job: Job) => {
      println(s"JobConsumerActor seen ${job}")
      val finalJsonValue = Json.toJson(job)
      Source.single(finalJsonValue).runWith(sink)
    }
    case Done => {
      println(s"JobConsumerActor seen 'Done'")
      killswitch.shutdown()
      self ! PoisonPill
    }
    case Init => {
      println("JobConsumerActor saw init")
    }
  }
}

Have a New RxJs Based Observable Over the Comet Based forever Frame, and Ensure that is Working

So at the end of the pipeline, we have a forever frame in the browser (always available) that we wish to get events from. Ideally, we want to turn this rather bland event into a better RxJs Observable. So how do we do that. It's quite simple - we use this little service that is able to create a new Observable from the incoming event for us:

JavaScript
import { injectable, inject } from "inversify";
import { JobEventArgs } from "../domain/JobEventArgs";
import Rx from 'rx';

@injectable()
export class JobStreamService {

    private _jobSourceObservable: Rx.Observable<any>;

    constructor() {

    }

    init = (): void => {

        window['jobChanged'] = function (incomingJsonPayload: any) {
            let evt = new CustomEvent('onJobChanged', new JobEventArgs(incomingJsonPayload));
            window.dispatchEvent(evt);
        }

        this._jobSourceObservable = Rx.Observable.fromEvent(window, 'onJobChanged');
    }

    getJobStream = (): Rx.Observable<any> => {
        return this._jobSourceObservable;
    }
}

Where the JobEventArgs looks like this:

JavaScript
<pre class="brush: jscript; title: ; notranslate">
export class JobEventArgs {

    detail: any;

    constructor(detail: any) {
        this.detail = detail;
    }
}

We can use this service in other code and subscribe to this RxJs Observable that the above service exposes. Here is an example of subscribing to it. We will talk much more about this in the next post.

JavaScript
componentWillMount() {
    this._subscription =
        this._jobStreamService.getJobStream()
        .subscribe(
        jobArgs => {

                //TODO : 1. This should not be hard coded
                //TODO : 2. We should push out current job when we FIRST LOAD this page
                //          if we are a client, and we should enrich it if we are a driver
                //       3. The list of markers should be worked out again every time based
                //          on RX stream messages
                console.log('RX saw onJobChanged');
                console.log('RX x = ', jobArgs.detail);
            },
            error => {
                console.log('RX saw ERROR');
                console.log('RX error = ', error);
            },
            () => {
                console.log('RX saw COMPLETE');
            }
        );
}

Conclusion

I am aware that this post has taken a while to get out there. I had an issue in the middle of this one where I broke something and I had to unwind a whole bunch of commits and bring them back in one by one to see when it broke. This caused a bit of friction. The other reason this post took so long is that life just gets in the way sometimes. Stupid life, huh?

Next Time

Next time, we will focus our attention on the “View Job” page which is probably the most complex visual aspect of this project, but we now have all the plumbing to support it, so it's just a matter of getting it done. After that page is done, this project is pretty much there. Yay!

Image 2 Image 3

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
United Kingdom United Kingdom
I currently hold the following qualifications (amongst others, I also studied Music Technology and Electronics, for my sins)

- MSc (Passed with distinctions), in Information Technology for E-Commerce
- BSc Hons (1st class) in Computer Science & Artificial Intelligence

Both of these at Sussex University UK.

Award(s)

I am lucky enough to have won a few awards for Zany Crazy code articles over the years

  • Microsoft C# MVP 2016
  • Codeproject MVP 2016
  • Microsoft C# MVP 2015
  • Codeproject MVP 2015
  • Microsoft C# MVP 2014
  • Codeproject MVP 2014
  • Microsoft C# MVP 2013
  • Codeproject MVP 2013
  • Microsoft C# MVP 2012
  • Codeproject MVP 2012
  • Microsoft C# MVP 2011
  • Codeproject MVP 2011
  • Microsoft C# MVP 2010
  • Codeproject MVP 2010
  • Microsoft C# MVP 2009
  • Codeproject MVP 2009
  • Microsoft C# MVP 2008
  • Codeproject MVP 2008
  • And numerous codeproject awards which you can see over at my blog

Comments and Discussions

 
QuestionTags Question Pin
Rick York25-Oct-17 7:58
mveRick York25-Oct-17 7:58 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.