programing

M 요구에 대한 N초 단위의 슬롯링 방식 호출

goodsources 2022. 7. 11. 21:31
반응형

M 요구에 대한 N초 단위의 슬롯링 방식 호출

일부 메서드의 실행을 N초 이내에 최대 M콜로 억제하는 컴포넌트/클래스가 필요합니다(또는 ms 또는 nanos는 상관없습니다).

즉, N초의 슬라이딩 윈도우에서 M회 이상 수행되지 않도록 해야 합니다.

기존 클래스를 모르는 경우 솔루션/아이디어를 자유롭게 게시해 주십시오.

타임스탬프의 링 버퍼를 M의 고정 사이즈로 사용합니다.메서드가 호출될 때마다 가장 오래된 엔트리를 체크하고 과거에 N초 미만이면 실행 및 다른 엔트리를 추가합니다.그렇지 않으면 시간차를 위해 sleep 상태가 됩니다.

바로 사용할 수 있었던 것은 Google Guava Rate Limiter였습니다.

// Allow one request per second
private RateLimiter throttle = RateLimiter.create(1.0);

private void someMethod() {
    throttle.acquire();
    // Do something
}

구체적으로는 를 사용하여 이 기능을 구현할 수 있어야 합니다.큐를 초기화하려면M Delayed처음에 지연이 0으로 설정된 인스턴스가 있습니다.메서드에 대한 요구가 들어오면 토큰이 수신되어 슬롯링 요건이 충족될 때까지 메서드가 차단됩니다.토큰이 취득되면 큐에 대한 새로운 토큰의 지연은N.

토큰 버킷알고리즘을 읽어주세요기본적으로 토큰이 들어 있는 버킷이 있습니다.메서드를 실행할 때마다 토큰을 가져옵니다.토큰이 더 이상 없으면 토큰을 얻을 때까지 차단합니다.한편, 일정한 간격으로 토큰을 보충하는 외부 액터가 있습니다.

이 작업을 수행할 수 있는 라이브러리가 없습니다(또는 이와 유사한 기능).이 논리를 코드에 쓰거나 AspectJ를 사용하여 동작을 추가할 수 있습니다.

분산 시스템 전체에서 동작하는 Java 기반의 슬라이딩 윈도우 환율리미터가 필요한 경우 https://github.com/mokies/ratelimitj 프로젝트를 참조해 주십시오.

IP별 요구를 분당 50으로 제한하는 Redis 백업 구성은 다음과 같습니다.

import com.lambdaworks.redis.RedisClient;
import es.moki.ratelimitj.core.LimitRule;

RedisClient client = RedisClient.create("redis://localhost");
Set<LimitRule> rules = Collections.singleton(LimitRule.of(1, TimeUnit.MINUTES, 50)); // 50 request per minute, per key
RedisRateLimit requestRateLimiter = new RedisRateLimit(client, rules);

boolean overLimit = requestRateLimiter.overLimit("ip:127.0.0.2");

Redis 설정의 상세한 것에 대하여는, https://github.com/mokies/ratelimitj/tree/master/ratelimitj-redis 를 참조해 주세요.

이것은 어플리케이션에 따라 다릅니다.

여러 스레드에서 토큰이 버스트가 허용되지 않는 글로벌 환율 제한 액션을 수행하도록 하는 경우를 상상해 보십시오(즉, 10초당 10개의 액션을 제한하지만 처음 1초 동안 10개의 액션이 발생한 후 9초 정지된 상태로 유지되지 않도록 해야 합니다).

DelayedQueue에는 단점이 있습니다.즉, 스레드가 토큰을 요구하는 순서는 요구를 실행하는 순서가 아닐 수 있습니다.토큰을 기다리는 동안 여러 스레드가 차단되면 다음 사용 가능한 토큰을 사용할 스레드가 명확하지 않습니다.내 생각에 당신은 영원히 기다릴 수도 있어요.

한 가지 해결책은 연속된 두 작업 사이에 최소 간격을 두고 요청된 순서대로 작업을 수행하는 것입니다.

실장은 다음과 같습니다.

public class LeakyBucket {
    protected float maxRate;
    protected long minTime;
    //holds time of last action (past or future!)
    protected long lastSchedAction = System.currentTimeMillis();

    public LeakyBucket(float maxRate) throws Exception {
        if(maxRate <= 0.0f) {
            throw new Exception("Invalid rate");
        }
        this.maxRate = maxRate;
        this.minTime = (long)(1000.0f / maxRate);
    }

    public void consume() throws InterruptedException {
        long curTime = System.currentTimeMillis();
        long timeLeft;

        //calculate when can we do the action
        synchronized(this) {
            timeLeft = lastSchedAction + minTime - curTime;
            if(timeLeft > 0) {
                lastSchedAction += minTime;
            }
            else {
                lastSchedAction = curTime;
            }
        }

        //If needed, wait for our time
        if(timeLeft <= 0) {
            return;
        }
        else {
            Thread.sleep(timeLeft);
        }
    }
}

요청하신 내용은 아니지만 N초 내에 M개의 요청이 아닌 M개의 동시 요청을 제한하도록 설계된 것도 유용할 수 있습니다.

간단한 슬롯링 알고리즘을 실장했습니다.이 링크(http://krishnaprasadas.blogspot.in/2012/05/throttling-algorithm.html)를 사용해 보십시오.

알고리즘에 대한 개요

이 알고리즘은 Java Delayed Queue 기능을 사용합니다.예상되는 지연을 가진 지연 개체를 만듭니다(밀리초의 TimeUnit의 경우 여기서 1000/M).지연된 큐에 같은 오브젝트를 넣어주세요.인턴이 우리에게 이동 창을 제공할 것입니다.다음으로 각 메서드콜이 큐에서 오브젝트를 가져오기 전에 take는 지정된 지연 후에만 반환되는 블로킹콜입니다.메서드콜 후에 오브젝트를 갱신된 시간(여기서는 현재 밀리초)으로 큐에 넣는 것을 잊지 마세요.

여기에서는 지연이 다른 여러 개의 지연 개체를 사용할 수도 있습니다.이 접근방식은 높은 throughput도 제공합니다.

다음 구현에서는 임의의 요청 시간 정밀도를 처리할 수 있으며, 각 요청에 대해 O(1) 시간 복잡도를 가지고 있으며, 추가 버퍼(예: O(1) 공간 복잡도)가 필요하지 않습니다.또한 토큰을 해방하기 위해 백그라운드 스레드가 필요하지 않으며, 마지막 요청 이후 경과된 시간에 따라 토큰이 해방됩니다.

class RateLimiter {
    int limit;
    double available;
    long interval;

    long lastTimeStamp;

    RateLimiter(int limit, long interval) {
        this.limit = limit;
        this.interval = interval;

        available = 0;
        lastTimeStamp = System.currentTimeMillis();
    }

    synchronized boolean canAdd() {
        long now = System.currentTimeMillis();
        // more token are released since last request
        available += (now-lastTimeStamp)*1.0/interval*limit; 
        if (available>limit)
            available = limit;

        if (available<1)
            return false;
        else {
            available--;
            lastTimeStamp = now;
            return true;
        }
    }
}

다음과 같은 간단한 방법을 사용해 보십시오.

public class SimpleThrottler {

private static final int T = 1; // min
private static final int N = 345;

private Lock lock = new ReentrantLock();
private Condition newFrame = lock.newCondition();
private volatile boolean currentFrame = true;

public SimpleThrottler() {
    handleForGate();
}

/**
 * Payload
 */
private void job() {
    try {
        Thread.sleep(Math.abs(ThreadLocalRandom.current().nextLong(12, 98)));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.err.print(" J. ");
}

public void doJob() throws InterruptedException {
    lock.lock();
    try {

        while (true) {

            int count = 0;

            while (count < N && currentFrame) {
                job();
                count++;
            }

            newFrame.await();
            currentFrame = true;
        }

    } finally {
        lock.unlock();
    }
}

public void handleForGate() {
    Thread handler = new Thread(() -> {
        while (true) {
            try {
                Thread.sleep(1 * 900);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                currentFrame = false;

                lock.lock();
                try {
                    newFrame.signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    });
    handler.start();
}

}

Apache Camel은 다음과 같은 슬롯러 메커니즘을 지원합니다.

from("seda:a").throttle(100).asyncDelayed().to("seda:b");

이것은 위의 LeakyBucket 코드에 대한 업데이트입니다.이는 초당 1000개 이상의 요구로 동작합니다.

import lombok.SneakyThrows;
import java.util.concurrent.TimeUnit;

class LeakyBucket {
  private long minTimeNano; // sec / billion
  private long sched = System.nanoTime();

  /**
   * Create a rate limiter using the leakybucket alg.
   * @param perSec the number of requests per second
   */
  public LeakyBucket(double perSec) {
    if (perSec <= 0.0) {
      throw new RuntimeException("Invalid rate " + perSec);
    }
    this.minTimeNano = (long) (1_000_000_000.0 / perSec);
  }

  @SneakyThrows public void consume() {
    long curr = System.nanoTime();
    long timeLeft;

    synchronized (this) {
      timeLeft = sched - curr + minTimeNano;
      sched += minTimeNano;
    }
    if (timeLeft <= minTimeNano) {
      return;
    }
    TimeUnit.NANOSECONDS.sleep(timeLeft);
  }
}

그리고 위의 경우 가장 짧습니다.

import com.google.common.base.Stopwatch;
import org.junit.Ignore;
import org.junit.Test;

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class LeakyBucketTest {
  @Test @Ignore public void t() {
    double numberPerSec = 10000;
    LeakyBucket b = new LeakyBucket(numberPerSec);
    Stopwatch w = Stopwatch.createStarted();
    IntStream.range(0, (int) (numberPerSec * 5)).parallel().forEach(
        x -> b.consume());
    System.out.printf("%,d ms%n", w.elapsed(TimeUnit.MILLISECONDS));
  }
}

여기 간단한 요금 제한기의 약간의 고급 버전이 있습니다.

/**
 * Simple request limiter based on Thread.sleep method.
 * Create limiter instance via {@link #create(float)} and call {@link #consume()} before making any request.
 * If the limit is exceeded cosume method locks and waits for current call rate to fall down below the limit
 */
public class RequestRateLimiter {

    private long minTime;

    private long lastSchedAction;
    private double avgSpent = 0;

    ArrayList<RatePeriod> periods;


    @AllArgsConstructor
    public static class RatePeriod{

        @Getter
        private LocalTime start;

        @Getter
        private LocalTime end;

        @Getter
        private float maxRate;
    }


    /**
     * Create request limiter with maxRate - maximum number of requests per second
     * @param maxRate - maximum number of requests per second
     * @return
     */
    public static RequestRateLimiter create(float maxRate){
        return new RequestRateLimiter(Arrays.asList( new RatePeriod(LocalTime.of(0,0,0),
                LocalTime.of(23,59,59), maxRate)));
    }

    /**
     * Create request limiter with ratePeriods calendar - maximum number of requests per second in every period
     * @param ratePeriods - rate calendar
     * @return
     */
    public static RequestRateLimiter create(List<RatePeriod> ratePeriods){
        return new RequestRateLimiter(ratePeriods);
    }

    private void checkArgs(List<RatePeriod> ratePeriods){

        for (RatePeriod rp: ratePeriods ){
            if ( null == rp || rp.maxRate <= 0.0f || null == rp.start || null == rp.end )
                throw new IllegalArgumentException("list contains null or rate is less then zero or period is zero length");
        }
    }

    private float getCurrentRate(){

        LocalTime now = LocalTime.now();

        for (RatePeriod rp: periods){
            if ( now.isAfter( rp.start ) && now.isBefore( rp.end ) )
                return rp.maxRate;
        }

        return Float.MAX_VALUE;
    }



    private RequestRateLimiter(List<RatePeriod> ratePeriods){

        checkArgs(ratePeriods);
        periods = new ArrayList<>(ratePeriods.size());
        periods.addAll(ratePeriods);

        this.minTime = (long)(1000.0f / getCurrentRate());
        this.lastSchedAction = System.currentTimeMillis() - minTime;
    }

    /**
     * Call this method before making actual request.
     * Method call locks until current rate falls down below the limit
     * @throws InterruptedException
     */
    public void consume() throws InterruptedException {

        long timeLeft;

        synchronized(this) {
            long curTime = System.currentTimeMillis();

            minTime = (long)(1000.0f / getCurrentRate());
            timeLeft = lastSchedAction + minTime - curTime;

            long timeSpent = curTime - lastSchedAction + timeLeft;
            avgSpent = (avgSpent + timeSpent) / 2;

            if(timeLeft <= 0) {
                lastSchedAction = curTime;
                return;
            }

            lastSchedAction = curTime + timeLeft;
        }

        Thread.sleep(timeLeft);
    }

    public synchronized float getCuRate(){
        return (float) ( 1000d / avgSpent);
    }
}

유닛 테스트

import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RequestRateLimiterTest {


    @Test(expected = IllegalArgumentException.class)
    public void checkSingleThreadZeroRate(){

        // Zero rate
        RequestRateLimiter limiter = RequestRateLimiter.create(0);
        try {
            limiter.consume();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void checkSingleThreadUnlimitedRate(){

        // Unlimited
        RequestRateLimiter limiter = RequestRateLimiter.create(Float.MAX_VALUE);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 1000; i++ ){

            try {
                limiter.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( ((ended - started) < 1000));
    }

    @Test
    public void rcheckSingleThreadRate(){

        // 3 request per minute
        RequestRateLimiter limiter = RequestRateLimiter.create(3f/60f);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 3; i++ ){

            try {
                limiter.consume();
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();

        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( ((ended - started) >= 60000 ) & ((ended - started) < 61000));
    }



    @Test
    public void checkSingleThreadRateLimit(){

        // 100 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 1000; i++ ){

            try {
                limiter.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();

        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ));
    }

    @Test
    public void checkMultiThreadedRateLimit(){

        // 100 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(10);
        ExecutorService exec = Executors.newFixedThreadPool(10);

        for ( int i = 0; i < 10; i++ ) {

            tasks.add( exec.submit(() -> {
                for (int i1 = 0; i1 < 100; i1++) {

                    try {
                        limiter.consume();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

    @Test
    public void checkMultiThreaded32RateLimit(){

        // 0,2 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(0.2f);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(8);
        ExecutorService exec = Executors.newFixedThreadPool(8);

        for ( int i = 0; i < 8; i++ ) {

            tasks.add( exec.submit(() -> {
                for (int i1 = 0; i1 < 2; i1++) {

                    try {
                        limiter.consume();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

    @Test
    public void checkMultiThreadedRateLimitDynamicRate(){

        // 100 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(10);
        ExecutorService exec = Executors.newFixedThreadPool(10);

        for ( int i = 0; i < 10; i++ ) {

            tasks.add( exec.submit(() -> {

                Random r = new Random();
                for (int i1 = 0; i1 < 100; i1++) {

                    try {
                        limiter.consume();
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

}

솔루션:단순한 util 메서드로 수정하여 래퍼 클래스를 만들 수 있습니다.

public static Runnable throttle (Runnable realRunner, long delay) {
    Runnable throttleRunner = new Runnable() {
        // whether is waiting to run
        private boolean _isWaiting = false;
        // target time to run realRunner
        private long _timeToRun;
        // specified delay time to wait
        private long _delay = delay;
        // Runnable that has the real task to run
        private Runnable _realRunner = realRunner;
        @Override
        public void run() {
            // current time
            long now;
            synchronized (this) {
                // another thread is waiting, skip
                if (_isWaiting) return;
                now = System.currentTimeMillis();
                // update time to run
                // do not update it each time since
                // you do not want to postpone it unlimited
                _timeToRun = now+_delay;
                // set waiting status
                _isWaiting = true;
            }
            try {
                Thread.sleep(_timeToRun-now);

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // clear waiting status before run
                _isWaiting = false;
                // do the real task
                _realRunner.run();
            }
        }};
    return throttleRunner;
}

JAVA 스레드 데바운스스로틀에서 가져오기

언급URL : https://stackoverflow.com/questions/1407113/throttling-method-calls-to-m-requests-in-n-seconds

반응형