001/*
002 * Copyright 2009-2019 Ping Identity Corporation
003 * All Rights Reserved.
004 */
005/*
006 * Copyright (C) 2009-2019 Ping Identity Corporation
007 *
008 * This program is free software; you can redistribute it and/or modify
009 * it under the terms of the GNU General Public License (GPLv2 only)
010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
011 * as published by the Free Software Foundation.
012 *
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Public License for more details.
017 *
018 * You should have received a copy of the GNU General Public License
019 * along with this program; if not, see <http://www.gnu.org/licenses>.
020 */
021package com.unboundid.util;
022
023
024
025import java.io.Serializable;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.List;
029import java.util.logging.Level;
030
031
032
033/**
034 * Instances of this class are used to ensure that certain actions are performed
035 * at a fixed rate per interval (e.g. 10000 search operations per second).
036 * <p>
037 * Once a class is constructed with the duration of an interval and the target
038 * per interval, the {@link #await} method only releases callers at the
039 * specified number of times per interval.  This class is most useful when
040 * the target number per interval exceeds the limits of other approaches
041 * such as {@code java.util.Timer} or
042 * {@code java.util.concurrent.ScheduledThreadPoolExecutor}.  For instance,
043 * this does a good job of ensuring that something happens about 10000 times
044 * per second, but it's overkill to ensure something happens five times per
045 * hour.  This does come at a cost.  In the worst case, a single thread is
046 * tied up in a loop doing a small amount of computation followed by a
047 * Thread.yield().  Calling Thread.sleep() is not possible because many
048 * platforms sleep for a minimum of 10ms, and all platforms require sleeping
049 * for at least 1ms.
050 * <p>
051 * Testing has shown that this class is accurate for a "no-op"
052 * action up to two million per second, which vastly exceeds its
053 * typical use in tools such as {@code searchrate} and {@code modrate}.  This
054 * class is designed to be called by multiple threads, however, it does not
055 * make any fairness guarantee between threads; a single-thread might be
056 * released from the {@link #await} method many times before another thread
057 * that is blocked in that method.
058 * <p>
059 * This class attempts to smooth out the target per interval throughout each
060 * interval.  At a given ratio, R between 0 and 1, through the interval, the
061 * expected number of actions to have been performed in the interval at that
062 * time is R times the target per interval.  That is, 10% of the way through
063 * the interval, approximately 10% of the actions have been performed, and
064 * 80% of the way through the interval, 80% of the actions have been performed.
065 * <p>
066 * It's possible to wait for multiple "actions" in one call with
067 * {@link #await(int)}. An example use is rate limiting writing bytes out to
068 * a file. You could configure a FixedRateBarrier to only allow 1M bytes to
069 * be written per second, and then call {@link #await(int)} with the size of
070 * the byte buffer to write. The call to {@link #await(int)} would block until
071 * writing out the buffer would not exceed the desired rate.
072 */
073@ThreadSafety(level=ThreadSafetyLevel.COMPLETELY_THREADSAFE)
074public final class FixedRateBarrier
075       implements Serializable
076{
077  /**
078   * The minimum number of milliseconds that Thread.sleep() can handle
079   * accurately.  This varies from platform to platform, so we measure it
080   * once in the static initializer below.  When using a low rate (such as
081   * 100 per second), we can often sleep between iterations instead of having
082   * to spin calling Thread.yield().
083   */
084  private static final long minSleepMillis;
085  static
086  {
087    // Calibrate the minimum number of milliseconds that we can reliably
088    // sleep on this system.  We take several measurements and take the median,
089    // which keeps us from choosing an outlier.
090    //
091    // It varies from system to system.  Testing on three systems, yielded
092    // three different measurements Solaris x86 (10 ms), RedHat Linux (2 ms),
093    // Windows 7 (1 ms).
094
095    final List<Long> minSleepMillisMeasurements = new ArrayList<>(11);
096
097    for (int i = 0; i < 11; i++)
098    {
099      final long timeBefore = System.currentTimeMillis();
100      try
101      {
102        Thread.sleep(1);
103      }
104      catch (final InterruptedException e)
105      {
106        Debug.debugException(e);
107      }
108      final long sleepMillis = System.currentTimeMillis() - timeBefore;
109      minSleepMillisMeasurements.add(sleepMillis);
110    }
111
112    Collections.sort(minSleepMillisMeasurements);
113    final long medianSleepMillis = minSleepMillisMeasurements.get(
114            minSleepMillisMeasurements.size()/2);
115
116    minSleepMillis = Math.max(medianSleepMillis, 1);
117
118    final String message = "Calibrated FixedRateBarrier to use " +
119          "minSleepMillis=" + minSleepMillis + ".  " +
120          "Minimum sleep measurements = " + minSleepMillisMeasurements;
121    Debug.debug(Level.INFO, DebugType.OTHER, message);
122  }
123
124
125
126  /**
127   * The serial version UID for this serializable class.
128   */
129  private static final long serialVersionUID = -9048370191248737239L;
130
131
132
133  // This tracks when this class is shut down.  Calls to await() after
134  // shutdownRequested() is called, will return immediately with a value of
135  // true.
136  private volatile boolean shutdownRequested = false;
137
138
139  //
140  // The following class variables are guarded by synchronized(this).
141  //
142
143  // The duration of the target interval in nano-seconds.
144  private long intervalDurationNanos;
145
146  // This tracks the number of milliseconds between each iteration if they
147  // were evenly spaced.
148  //
149  // If intervalDurationMs=1000 and perInterval=100, then this is 100.
150  // If intervalDurationMs=1000 and perInterval=10000, then this is .1.
151  private double millisBetweenIterations;
152
153  // The target number of times to release a thread per interval.
154  private int perInterval;
155
156  // A count of the number of times that await has returned within the current
157  // interval.
158  private long countInThisInterval;
159
160  // The start of this interval in terms of System.nanoTime().
161  private long intervalStartNanos;
162
163  // The end of this interval in terms of System.nanoTime().
164  private long intervalEndNanos;
165
166
167
168  /**
169   * Constructs a new FixedRateBarrier, which is active until
170   * {@link #shutdownRequested} is called.
171   *
172   * @param  intervalDurationMs  The duration of the interval in milliseconds.
173   * @param  perInterval  The target number of times that {@link #await} should
174   *                      return per interval.
175   */
176  public FixedRateBarrier(final long intervalDurationMs, final int perInterval)
177  {
178    setRate(intervalDurationMs, perInterval);
179  }
180
181
182
183  /**
184   * Updates the rates associated with this FixedRateBarrier.  The new rate
185   * will be in effect when this method returns.
186   *
187   * @param  intervalDurationMs  The duration of the interval in milliseconds.
188   * @param  perInterval  The target number of times that {@link #await} should
189   *                      return per interval.
190   */
191  public synchronized void setRate(final long intervalDurationMs,
192                                   final int perInterval)
193  {
194    Validator.ensureTrue(intervalDurationMs > 0,
195         "FixedRateBarrier.intervalDurationMs must be at least 1.");
196    Validator.ensureTrue(perInterval > 0,
197         "FixedRateBarrier.perInterval must be at least 1.");
198
199    this.perInterval = perInterval;
200
201    intervalDurationNanos = 1000L * 1000L * intervalDurationMs;
202
203    millisBetweenIterations = (double)intervalDurationMs/(double)perInterval;
204
205    // Reset the intervals and all of the counters.
206    countInThisInterval = 0;
207    intervalStartNanos = 0;
208    intervalEndNanos = 0;
209  }
210
211
212
213  /**
214   * This method waits until it is time for the next 'action' to be performed
215   * based on the specified interval duration and target per interval.  This
216   * method can be called by multiple threads simultaneously.  This method
217   * returns immediately if shutdown has been requested.
218   *
219   * @return  {@code true} if shutdown has been requested and {@code} false
220   *          otherwise.
221   */
222  public synchronized boolean await()
223  {
224    return await(1);
225  }
226
227
228
229  /**
230   * This method waits until it is time for the next {@code count} 'actions'
231   * to be performed based on the specified interval duration and target per
232   * interval.  To achieve the target rate, it's recommended that on average
233   * {@code count} is small relative to {@code perInterval} (and the
234   * {@code count} must not be larger than {@code perInterval}).  A
235   * {@code count} value will not be split across intervals, and due to timing
236   * issues, it's possible that a {@code count} that barely fits in the
237   * current interval will need to wait until the next interval.  If it's not
238   * possible to use smaller 'count' values, then increase {@code perInterval}
239   * and {@code intervalDurationMs} by the same relative amount.  As an
240   * example, if {@code count} is on average 1/10 as big as
241   * {@code perInterval}, then you can expect to attain 90% of the target
242   * rate.  Increasing {@code perInterval} and {@code intervalDurationMs} by
243   * 10x means that 99% of the target rate can be achieved.
244   * <p>
245   * This method can be called by multiple threads simultaneously.  This method
246   * returns immediately if shutdown has been requested.
247   *
248   * @param  count  The number of 'actions' being performed.  It must be less
249   *                than or equal to {@code perInterval}, and is recommended to
250   *                be fairly small relative to {@code perInterval} so that it
251   *                is easier to achieve the desired rate and exhibit smoother
252   *                performance.
253   *
254   * @return  {@code true} if shutdown has been requested and {@code} false
255   *          otherwise.
256   */
257  public synchronized boolean await(final int count)
258  {
259    if (count > perInterval)
260    {
261      Validator.ensureTrue(false,
262           "FixedRateBarrier.await(int) count value " + count +
263                " exceeds perInterval value " + perInterval +
264                ".  The provided count value must be less than or equal to " +
265                "the perInterval value.");
266    }
267    else if (count <= 0)
268    {
269      return shutdownRequested;
270    }
271
272    // Loop forever until we are requested to shutdown or it is time to perform
273    // the next 'action' in which case we break from the loop.
274    while (!shutdownRequested)
275    {
276      final long now = System.nanoTime();
277
278      if ((intervalStartNanos == 0) ||   // Handles the first time we're called.
279          (now < intervalStartNanos))    // Handles a change in the clock.
280      {
281        intervalStartNanos = now;
282        intervalEndNanos = intervalStartNanos + intervalDurationNanos;
283      }
284      else if (now >= intervalEndNanos)  // End of an interval.
285      {
286        countInThisInterval = 0;
287
288        if (now < (intervalEndNanos + intervalDurationNanos))
289        {
290          // If we have already passed the end of the next interval, then we
291          // don't try to catch up.  Instead we just reset the start of the
292          // next interval to now.  This could happen if the system clock
293          // was set to the future, we're running in a debugger, or we have
294          // very short intervals and are unable to keep up.
295          intervalStartNanos = now;
296        }
297        else
298        {
299          // Usually we're some small fraction into the next interval, so
300          // we set the start of the current interval to the end of the
301          // previous one.
302          intervalStartNanos = intervalEndNanos;
303        }
304        intervalEndNanos = intervalStartNanos + intervalDurationNanos;
305      }
306
307      final long intervalRemaining = intervalEndNanos - now;
308      if (intervalRemaining <= 0)
309      {
310        // This shouldn't happen, but we're careful not to divide by 0.
311        continue;
312      }
313
314      final double intervalFractionRemaining =
315           (double) intervalRemaining / intervalDurationNanos;
316
317      final double expectedRemaining = intervalFractionRemaining * perInterval;
318      final long actualRemaining = perInterval - countInThisInterval;
319
320      final long countBehind =
321              (long)Math.ceil(actualRemaining - expectedRemaining);
322
323      if (count <= countBehind)
324      {
325        // We are on schedule or behind schedule so let the 'action(s)'
326        // happen.
327        countInThisInterval += count;
328        break;
329      }
330      else
331      {
332        // If we can sleep until it's time to leave this barrier, then do
333        // so to keep from spinning on a CPU doing Thread.yield().
334
335        final long countNeeded = count - countBehind;
336        final long remainingMillis =
337             (long) Math.floor(millisBetweenIterations * countNeeded);
338
339        if (remainingMillis >= minSleepMillis)
340        {
341          // Cap how long we sleep so that we can respond to a change in the
342          // rate without too much delay.
343          try
344          {
345            // We need to wait here instead of Thread.sleep so that we don't
346            // block setRate.  Also, cap how long we sleep so that we can
347            // respond to a change in the rate without too much delay.
348            final long waitTime = Math.min(remainingMillis, 10);
349            wait(waitTime);
350          }
351          catch (final InterruptedException e)
352          {
353            Debug.debugException(e);
354            Thread.currentThread().interrupt();
355            return shutdownRequested;
356          }
357        }
358        else
359        {
360          // We're ahead of schedule so yield to other threads, and then try
361          // again.  Note: this is the most costly part of the algorithm because
362          // we have to busy wait due to the lack of sleeping for very small
363          // amounts of time.
364          Thread.yield();
365        }
366      }
367    }
368
369    return shutdownRequested;
370  }
371
372
373
374  /**
375   * Retrieves information about the current target rate for this barrier.  The
376   * value returned will include a {@code Long} that specifies the duration of
377   * the current interval in milliseconds and an {@code Integer} that specifies
378   * the number of times that the {@link #await} method should return per
379   * interval.
380   *
381   * @return  Information about hte current target rate for this barrier.
382   */
383  public synchronized ObjectPair<Long,Integer> getTargetRate()
384  {
385    return new ObjectPair<>(
386         (intervalDurationNanos / (1000L * 1000L)),
387         perInterval);
388  }
389
390
391
392  /**
393   * Shuts down this barrier.  Future calls to await() will return immediately.
394   */
395  public void shutdownRequested()
396  {
397    shutdownRequested = true;
398  }
399
400
401
402  /**
403   * Returns {@code true} if shutdown has been requested.
404   *
405   * @return  {@code true} if shutdown has been requested and {@code false}
406   *          otherwise.
407   */
408  public boolean isShutdownRequested()
409  {
410    return shutdownRequested;
411  }
412}