001// Licensed under the Apache License, Version 2.0 (the "License"); 002// you may not use this file except in compliance with the License. 003// You may obtain a copy of the License at 004// 005// http://www.apache.org/licenses/LICENSE-2.0 006// 007// Unless required by applicable law or agreed to in writing, software 008// distributed under the License is distributed on an "AS IS" BASIS, 009// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 010// See the License for the specific language governing permissions and 011// limitations under the License. 012 013package org.apache.tapestry5.ioc.internal.services.cron; 014 015import org.apache.tapestry5.commons.util.CollectionFactory; 016import org.apache.tapestry5.ioc.Invokable; 017import org.apache.tapestry5.ioc.annotations.PostInjection; 018import org.apache.tapestry5.ioc.services.ParallelExecutor; 019import org.apache.tapestry5.ioc.services.RegistryShutdownHub; 020import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor; 021import org.apache.tapestry5.ioc.services.cron.PeriodicJob; 022import org.apache.tapestry5.ioc.services.cron.Schedule; 023import org.slf4j.Logger; 024 025import java.util.HashSet; 026import java.util.List; 027import java.util.Set; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.concurrent.atomic.AtomicInteger; 030import java.util.concurrent.locks.Lock; 031import java.util.concurrent.locks.ReentrantLock; 032 033public class PeriodicExecutorImpl implements PeriodicExecutor, Runnable 034{ 035 private final ParallelExecutor parallelExecutor; 036 037 private final Logger logger; 038 039 // Synchronized by jobLock 040 private final List<Job> jobs = CollectionFactory.newList(); 041 042 private final Thread thread = new Thread(this, "Tapestry PeriodicExecutor"); 043 044 private transient boolean shutdown; 045 046 private static final long FIVE_MINUTES = 5 * 60 * 1000; 047 048 private final AtomicInteger jobIdAllocator = new AtomicInteger(); 049 050 private final AtomicBoolean started = new AtomicBoolean(); 051 052 private final Lock jobLock = new ReentrantLock(); 053 054 private class Job implements PeriodicJob, Invokable<Void> 055 { 056 final int jobId = jobIdAllocator.incrementAndGet(); 057 058 private final Schedule schedule; 059 060 private final String name; 061 062 private final Runnable runnableJob; 063 064 private boolean executing, canceled; 065 066 private long nextExecution; 067 068 public Job(Schedule schedule, String name, Runnable runnableJob) 069 { 070 this.schedule = schedule; 071 this.name = name; 072 this.runnableJob = runnableJob; 073 074 nextExecution = schedule.firstExecution(); 075 } 076 077 @Override 078 public String getName() 079 { 080 return name; 081 } 082 083 public long getNextExecution() 084 { 085 try 086 { 087 jobLock.lock(); 088 return nextExecution; 089 } finally 090 { 091 jobLock.unlock(); 092 } 093 } 094 095 096 @Override 097 public boolean isExecuting() 098 { 099 try 100 { 101 jobLock.lock(); 102 return executing; 103 } finally 104 { 105 jobLock.unlock(); 106 } 107 } 108 109 @Override 110 public boolean isCanceled() 111 { 112 try 113 { 114 jobLock.lock(); 115 return canceled; 116 } finally 117 { 118 jobLock.unlock(); 119 } 120 } 121 122 @Override 123 public void cancel() 124 { 125 try 126 { 127 jobLock.lock(); 128 129 canceled = true; 130 131 if (!executing) 132 { 133 removeJob(this); 134 } 135 136 // Otherwise, it will be caught when the job finishes execution. 137 } finally 138 { 139 jobLock.unlock(); 140 } 141 } 142 143 @Override 144 public String toString() 145 { 146 StringBuilder builder = new StringBuilder("PeriodicJob[#").append(jobId); 147 148 builder.append(", (").append(name).append(')'); 149 150 if (executing) 151 { 152 builder.append(", executing"); 153 } 154 155 if (canceled) 156 { 157 builder.append(", canceled"); 158 } else 159 { 160 builder.append(String.format(", next execution %Tk:%<TM:%<TS+%<TL", nextExecution)); 161 } 162 163 return builder.append(']').toString(); 164 } 165 166 /** 167 * Starts execution of the job; this sets the executing flag, calculates the next execution time, 168 * and uses the ParallelExecutor to run the job. 169 */ 170 void start() 171 { 172 try 173 { 174 jobLock.lock(); 175 executing = true; 176 177 // This is a bit naive; it assumes there will not be a delay waiting to execute. There's a lot of options 178 // here, such as basing the next execution on the actual start time, or event actual completion time, or allowing 179 // overlapping executions of the Job on a more rigid schedule. Use Quartz. 180 181 nextExecution = schedule.nextExecution(nextExecution); 182 183 parallelExecutor.invoke(this); 184 } finally 185 { 186 jobLock.unlock(); 187 } 188 189 if (logger.isTraceEnabled()) 190 { 191 logger.trace(this + " sent for execution"); 192 } 193 } 194 195 void cleanupAfterExecution() 196 { 197 try 198 { 199 if (logger.isTraceEnabled()) 200 { 201 logger.trace(this + " execution complete"); 202 } 203 204 executing = false; 205 206 if (canceled) 207 { 208 removeJob(this); 209 } else 210 { 211 // Again, naive but necessary. 212 thread.interrupt(); 213 } 214 } finally 215 { 216 jobLock.unlock(); 217 } 218 } 219 220 @Override 221 public Void invoke() 222 { 223 logger.debug("Executing job #{} ({})", jobId, name); 224 225 try 226 { 227 runnableJob.run(); 228 } finally 229 { 230 cleanupAfterExecution(); 231 } 232 233 return null; 234 } 235 236 } 237 238 public PeriodicExecutorImpl(ParallelExecutor parallelExecutor, Logger logger) 239 { 240 this.parallelExecutor = parallelExecutor; 241 this.logger = logger; 242 } 243 244 @PostInjection 245 public void start(RegistryShutdownHub hub) 246 { 247 hub.addRegistryShutdownListener(new Runnable() 248 { 249 @Override 250 public void run() 251 { 252 registryDidShutdown(); 253 } 254 }); 255 256 } 257 258 public void init() 259 { 260 if (!started.get()) 261 { 262 started.set(true); 263 thread.start(); 264 } 265 } 266 267 void removeJob(Job job) 268 { 269 if (logger.isDebugEnabled()) 270 { 271 logger.debug("Removing " + job); 272 } 273 274 try 275 { 276 jobLock.lock(); 277 jobs.remove(job); 278 } finally 279 { 280 jobLock.unlock(); 281 } 282 } 283 284 285 @Override 286 public PeriodicJob addJob(Schedule schedule, String name, Runnable job) 287 { 288 assert schedule != null; 289 assert name != null; 290 assert job != null; 291 292 Job periodicJob = new Job(schedule, name, job); 293 294 try 295 { 296 jobLock.lock(); 297 298 jobs.add(periodicJob); 299 } finally 300 { 301 jobLock.unlock(); 302 } 303 304 if (logger.isDebugEnabled()) 305 { 306 logger.debug("Added " + periodicJob); 307 } 308 309 // Wake the thread so that it can start the job, if necessary. 310 311 // Technically, this is only necessary if the new job is scheduled earlier 312 // than any job currently in the list of jobs, but this naive implementation 313 // is simpler. 314 thread.interrupt(); 315 316 return periodicJob; 317 } 318 319 @Override 320 public void run() 321 { 322 while (!shutdown) 323 { 324 long nextExecution = executeCurrentBatch(); 325 326 try 327 { 328 long delay = nextExecution - System.currentTimeMillis(); 329 330 if (logger.isTraceEnabled()) 331 { 332 logger.trace(String.format("Sleeping for %,d ms", delay)); 333 } 334 335 if (delay > 0) 336 { 337 Thread.sleep(delay); 338 } 339 } catch (InterruptedException 340 ex) 341 { 342 // Ignored; the thread is interrupted() to shut it down, 343 // or to have it execute a new batch. 344 345 logger.trace("Interrupted"); 346 } 347 } 348 } 349 350 private void registryDidShutdown() 351 { 352 shutdown = true; 353 354 thread.interrupt(); 355 } 356 357 /** 358 * Finds jobs and executes jobs that are ready to be executed. 359 * 360 * @return the next execution time (from the non-executing job that is scheduled earliest for execution). 361 */ 362 private long executeCurrentBatch() 363 { 364 long now = System.currentTimeMillis(); 365 long nextExecution = now + FIVE_MINUTES; 366 367 try 368 { 369 jobLock.lock(); 370 // TAP5-2455 371 Set<Job> jobsToCancel = null; 372 373 for (Job job : jobs) 374 { 375 if (job.isExecuting()) 376 { 377 continue; 378 } 379 380 long jobNextExecution = job.getNextExecution(); 381 382 if (jobNextExecution <= 0) 383 { 384 if (jobsToCancel == null) 385 { 386 jobsToCancel = new HashSet<PeriodicExecutorImpl.Job>(); 387 } 388 jobsToCancel.add(job); 389 } else if (jobNextExecution <= now) 390 { 391 job.start(); 392 } else 393 { 394 nextExecution = Math.min(nextExecution, jobNextExecution); 395 } 396 } 397 if (jobsToCancel != null) 398 { 399 for (Job job : jobsToCancel) 400 { 401 job.cancel(); 402 } 403 } 404 } finally 405 { 406 jobLock.unlock(); 407 } 408 409 return nextExecution; 410 } 411 412 413}