Let me describe my environment and what I need:
- A process, similar to the Linux cron service, that runs commands on a predetermined schedule.
- Objects that can process configuration files containing lists of commands and the times at which each one should be invoked.
- Some of these configuration files must be modifiable dynamically, that is, without stopping our application.
- For its configuration.
- For the scalability that we need.
- For the shell scripting complexity that we would need (in real life I have needed database connections, parsing thousands of URLs, parallel processing over the network, etc.).
- It is our OS configuration task.
All of this happens in an environment that makes intensive use of concurrency. I have to deal with this scenario, but the Akka libraries do not provide a cron-like solution for it. There are several options on the market, among them Apache Camel, Quartz, and Akka-Quartz Scheduler.
In my case I am going to use the last one, Akka-Quartz Scheduler, for several reasons. Apache Camel is, in my opinion, too complicated if all you want is its Timer. Quartz has the same problem and is too oriented to the Java community: every kind of listener has to be implemented, and we need to work in an Akka environment.
I will not tell you here which option is better or worse for my "akka cron environment". Schedulers and timers are very complicated aspects of programming, so if you want a well-founded opinion you will need a deep benchmark of the three. That is not the intention of this document.
What we are going to do in our example:
- We need to execute several tasks following a specific schedule (day/time), indefinitely, until something goes wrong.
- We should be able to replace that schedule with another one when needed, all without stopping any of the actors that are busy executing our tasks.
It is up to you not to configure the scheduler with PAST dates. I do not validate the dates that you configure in your schedule.
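Since the project does no such validation, here is a minimal sketch of how a caller could check a schedule entry before loading it. It is not part of the repository: the object and method names are hypothetical, and it only handles Quartz-style expressions (the format used for the schedules later in this post) whose first five fields (seconds, minutes, hours, day-of-month, month) are plain numbers. Anything with wildcards, ranges, or lists is simply treated as "not in the past".

```scala
import java.time.LocalDateTime
import scala.util.Try

// Hypothetical helper, not part of the AkkaScheduler project.
object PastDateCheck {

  /** This year's firing time, if the first five cron fields are plain numbers.
    * Returns None for wildcards, ranges, lists, or impossible dates. */
  def firingTimeThisYear(cron: String, now: LocalDateTime): Option[LocalDateTime] = {
    val fields = cron.trim.split("\\s+")
    if (fields.length < 6) None
    else Try {
      // Quartz order: seconds, minutes, hours, day-of-month, month
      val n = fields.take(5).map(_.toInt)
      LocalDateTime.of(now.getYear, n(4), n(3), n(2), n(1), n(0))
    }.toOption
  }

  /** True when the fixed date and time has already passed in the current year. */
  def isInThePast(cron: String, now: LocalDateTime): Boolean =
    firingTimeThisYear(cron, now).exists(_.isBefore(now))
}
```

Passing the current time as a parameter (for example LocalDateTime.now()) keeps the check easy to test. An entry that fails this check could be logged and skipped instead of being scheduled.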
As the Akka-Quartz Scheduler documentation explains, its goal is to provide Akka with a scheduling system that is closer to what one would expect for cron-type jobs.
This is the example that I bring to readers today:
We have a platform whose goal is to collect information from a website that publishes everything related to movies. I also know that every month the site publishes the dates on which new pages, with information about new movies, will be added. All our code is in the AkkaScheduler module on my GitHub. It is a multi-module project, so you can follow the instructions in the repository if you want to deploy it.
Our project has a very important aspect:
Configuration files: I have 2 configuration files.
The first one tells me that every month I have to do something. This configuration is internal and lives in an internal file, in our case the Akka configuration file (application.conf) on my GitHub:
```
quartz {
  defaultTimezone = "Europe/London"
  schedules {
    moviepages {
      description = "A cron job that fires off every month"
      expression = "0 30 2 2 * ? *"
    }
  }
}
```
See the CronExpression reference for the syntax. Because the information is published on the first day of each month, I check at the beginning of the second day of every month. This is an internal configuration (ref. code 1.0) because it never changes: every month I have to check for a new schedule of the pages about new films that will be added or updated.
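To make the expression easier to read, here is a small sketch that pairs each field with its name, using the field order given in the comment of the external configuration file (seconds, minutes, hours, day-of-month, month, day-of-week, optional year). The object and method names are hypothetical, and the sketch does no validation of the field values.

```scala
// Hypothetical helper for reading a Quartz-style cron expression.
object CronFields {
  // Field names in Quartz order; the seventh field (year) is optional.
  val names: List[String] =
    List("seconds", "minutes", "hours", "day-of-month", "month", "day-of-week", "year")

  /** Pairs each whitespace-separated field of the expression with its name. */
  def describe(expression: String): List[(String, String)] =
    names.zip(expression.trim.split("\\s+").toList)
}
```

For "0 30 2 2 * ? *" this yields seconds 0, minutes 30, hours 2, and day-of-month 2, with the remaining fields left open: 02:30 on the second day of every month.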
The second one tells me when each page with the information will be updated during the month. It is an external configuration file that indicates when any page will be updated or added throughout the month. On the day a page is updated or added, our actor collects the information about it. Every month this external file has to be replaced with a new schedule.
An example of my external file (cronmoviepages.conf), which you can find on my GitHub, is below:
```
## https://github.com/lightbend/config/blob/master/HOCON.md
## cronexpresssion = "Seconds Minutes Hours Day-of-month Month Day-of-Week Year(Optional)"
schedule {
  defaultTimezone = "Europe/London"
  moviepagemejorenvo = [
    {
      cronexpresssion = "0 4 2 20 9 ? *"
      moviepage = "40"
    }
    {
      cronexpresssion = "0 4 2 20 9 ? *"
      moviepage = "41"
    }
  ]
}
```
In code 1.1, cronexpresssion (the key as spelled in the file) indicates when the actor has to update the associated page from the movie website. Internally the software groups all moviepage entries that share the same cronexpresssion and fires one job per group.
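The grouping step can be sketched in plain Scala as follows. This is not the project's code: MoviePageEntry and groupPages are hypothetical names, the entries are assumed to have already been read from cronmoviepages.conf, and page numbers are converted to Int on the assumption that every moviepage value is numeric.

```scala
// Hypothetical model and helper, not taken from the repository.
object GroupByCron {
  // One entry of the moviepagemejorenvo list in cronmoviepages.conf
  final case class MoviePageEntry(cronexpresssion: String, moviepage: String)

  /** Groups page numbers by their cron expression, keeping file order inside each group. */
  def groupPages(entries: List[MoviePageEntry]): Map[String, List[Int]] =
    entries
      .groupBy(_.cronexpresssion)
      .map { case (cron, sameCron) => cron -> sameCron.map(_.moviepage.toInt) }
}
```

With the example file above, both entries share "0 4 2 20 9 ? *", so the result is a single group containing pages 40 and 41, and only one job would need to be scheduled for them.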
We have two main entry points:
- BootNotScheduled, to initialize the process. In this case we already know which pages we want to update, so we do not use the schedule.
- BootScheduled, to start the scheduled process. This is the main program that fires the scheduler.
```scala
object BootScheduled extends App {
  // https://github.com/lambdista/config
  val system = ActorSystem(ConstantsObject.ScheduledIoTDaemon)
  // This actor will control the whole System ("IoTDaemon_Scheduled")
  val daemonScheduled = system.actorOf(IoTAdmin.props, ConstantsObject.ScheduledIoTDaemonProcessing)
  // This actor will control the rescheduling of the timetable for updating new pages with films in original version
  val reeschedule = system.actorOf(ReSchedulingJob.props, ConstantsObject.DaemonReShedulingVoMoviePages)
  // Use system's dispatcher as ExecutionContext
  // QuartzSchedulerExtension is scoped to that ActorSystem and there will only ever be one instance of it per ActorSystem
  QuartzSchedulerExtension(system).schedule("moviepages", reeschedule, FireSchedule(daemonScheduled))
}
```
In the previous code 1.3, line 10 (the QuartzSchedulerExtension(system).schedule call) uses the internal configuration from our application.conf (ref. code 1.0). This line is responsible for firing, on the second day of every month at 2:30 am, a job that reads the new schedule from cronmoviepages.conf (ref. code 1.1), the external file that can be read periodically and that indicates which jobs must be re-scheduled each time.
```scala
.....................
      cronExpreesionMatches.foreach(
        cronExpreesionMatch => {
          val (cronExpreesion, listOfMoviePages) = cronExpreesionMatch

          log.info(
            "Programming Schedule for cronexpression: {} including matches: {}",
            cronExpreesion, listOfMoviePages.mkString("-")
          )

          val headList = listOfMoviePages.head

          // TODO it is important to kill all scheduled jobs that have been created ONCE the work is done
          /** This code generates a warning because the job to reschedule no longer exists:
            * the headList used to name it is never the same */
          QuartzSchedulerExtension(system).rescheduleJob(
            s"Movie-Page-Scheduler$headList",
            scheduledDaemon,
            ParseUrlFilms(listOfMoviePages),
            Option("Scheduling "),
            cronExpreesion,
            None,
            defaultTimezone)
        }
      )
      log.info("schedule get new MoviePages")
    }
.....................
```
In the previous code 1.4, on line 16 (the rescheduleJob call), we re-schedule an actor (IoTAdmin) to be fired according to the external configuration. It then launches, concurrently, as many ProcessMoviePage actors as there are pages to update. You can see that in the code below.
```scala
........
class IoTAdmin extends Actor with ActorLogging with MessageToTargetActors{
  override def preStart(): Unit = log.info("Start process in Akka Scheduler Example")
  override def postStop(): Unit = log.info("Stop process in Akka Schedule Example")
  // TODO checking performance: http://docs.scala-lang.org/overviews/collections/performance-characteristics.html
  var watched = Map.empty[ActorRef, Int]

  override def receive = {
    case ParseUrlFilms(urlPageNunmList, optActorRef) =>
      urlPageNunmList.foreach( urlpagenumber => {
        val currentActorRef: ActorRef = context.actorOf(ProcessMoviePage.props)
        watched += currentActorRef -> urlpagenumber
        // watch every actor that is created, because this actor needs to know when the process has finished
        context.watch(currentActorRef)
        // TODO urlspatterns<=varconf
        val processUrlFilmMessage = ProccessPage(
          s"http://mejorenvo.com/p${urlpagenumber}.html",
          optActorRef.fold(self)(ref=>ref)
        )
        sendParserUrlMessage(currentActorRef, processUrlFilmMessage)
      })
.............
```
On each day configured in the cronmoviepages.conf file, the IoTAdmin actor receives a ParseUrlFilms message to process the pages that should be updated.
This post and the project can serve as a base or skeleton if you want to create a process with the following specifications:
- You need to execute scheduled tasks that occur at a specific moment in time and do not need to be executed periodically.
- You need to execute several tasks following a customizable schedule (day/time), indefinitely, until something goes wrong.
This is the idea behind cron, taken further, because sometimes we need to do more complex things that are external to our OS.
I have used Akka-Quartz Scheduler in a Docker container and it works pretty well. I have NOT tested this library in an Akka Cluster environment, with the complexity that this kind of deployment entails. You can consult the Akka documentation to try configuring seed nodes on any PaaS and run it. You can do this manually, but when you are working on a PaaS you need to do it automatically. The explanation in the Akka documentation, and specifically the reference to Cluster Bootstrap, was not working at the time of writing this article. I will try to explain and implement it in a future post.