/*
 * Decompiled with CFR 0.152.
 */
package discord4j.rest.request;

import discord4j.common.LogUtil;
import discord4j.rest.http.client.ClientException;
import discord4j.rest.http.client.ClientRequest;
import discord4j.rest.http.client.ClientResponse;
import discord4j.rest.http.client.DiscordWebClient;
import discord4j.rest.request.BucketKey;
import discord4j.rest.request.DiscardedRequestException;
import discord4j.rest.request.DiscordWebRequest;
import discord4j.rest.request.GlobalRateLimiter;
import discord4j.rest.request.RateLimitRetryOperator;
import discord4j.rest.request.RateLimitStrategy;
import discord4j.rest.request.RequestCorrelation;
import discord4j.rest.request.RequestQueue;
import discord4j.rest.request.RouterOptions;
import discord4j.rest.response.ResponseFunction;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.netty.http.client.HttpClientResponse;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.ContextView;
import reactor.util.retry.Retry;

/**
 * Consumes the requests queued for a single bucket, one at a time.
 *
 * <p>Requests are offered through {@link #push(RequestCorrelation)} and pulled from the underlying
 * {@link RequestQueue} by an inner subscriber that only asks for the next element once the
 * previous request has terminated and the delay computed by the {@link RateLimitStrategy} has
 * elapsed.
 */
class RequestStream {
    private static final Logger log = Loggers.getLogger(RequestStream.class);

    /** Identifier of the bucket served by this stream; only used for logging and context keys here. */
    private final BucketKey id;
    private final RequestQueue<RequestCorrelation<ClientResponse>> requestQueue;
    private final GlobalRateLimiter globalRateLimiter;
    /** Scheduler used for every timed operation in this class (retry backoff and the delay between requests). */
    private final Scheduler timedTaskScheduler;
    private final List<ResponseFunction> responseFunctions;
    private final DiscordWebClient httpClient;
    private final RequestSubscriber requestSubscriber;
    private final RateLimitRetryOperator rateLimitRetryOperator;
    /** Number of requests accepted by {@link #push} that have not yet been processed or discarded. */
    private final AtomicLong requestsInFlight = new AtomicLong(0L);
    /** Completed by {@link #stop()} to end the subscription created by {@link #start()}. */
    private final Sinks.Empty<?> stopCallback = Sinks.empty();

    /**
     * Creates a stream for the given bucket. Nothing is consumed until {@link #start()} is called.
     *
     * @param id the bucket this stream serves
     * @param routerOptions source of the request queue factory, global rate limiter, timer scheduler and
     * response transformers
     * @param httpClient the client used to exchange each request
     * @param rateLimitStrategy computes, from each HTTP response, how long to wait before the next request
     */
    RequestStream(BucketKey id, RouterOptions routerOptions, DiscordWebClient httpClient, RateLimitStrategy rateLimitStrategy) {
        this.id = id;
        this.requestQueue = routerOptions.getRequestQueueFactory().create();
        this.globalRateLimiter = routerOptions.getGlobalRateLimiter();
        this.timedTaskScheduler = routerOptions.getReactorResources().getTimerTaskScheduler();
        this.responseFunctions = routerOptions.getResponseTransformers();
        this.httpClient = httpClient;
        this.requestSubscriber = new RequestSubscriber(rateLimitStrategy, this.requestsInFlight::decrementAndGet);
        this.rateLimitRetryOperator = new RateLimitRetryOperator(this.timedTaskScheduler);
    }

    /**
     * Builds the retry policy applied to server errors: HTTP 500, 502, 503, 504 and 520 are retried
     * with exponential backoff and jitter, starting at 2 seconds and capped at 30 seconds.
     *
     * @return a retry spec backed by {@link #timedTaskScheduler}
     */
    private Retry serverErrorRetryFactory() {
        return Retry.withThrowable(
                reactor.retry.Retry.onlyIf(ClientException.isRetryContextStatusCode(500, 502, 503, 504, 520))
                        .withBackoffScheduler(this.timedTaskScheduler)
                        .exponentialBackoffWithJitter(Duration.ofSeconds(2L), Duration.ofSeconds(30L))
                        .doOnRetry(ctx -> {
                            if (log.isDebugEnabled()) {
                                log.debug("Retry {} in bucket {} due to {} for {}",
                                        ctx.iteration(), this.id.toString(), ctx.exception().toString(), ctx.backoff());
                            }
                        }));
    }

    /**
     * Parses a {@code Retry-After} header value expressed in seconds.
     *
     * <p>Whole numbers produce the same duration as {@code Duration.ofSeconds(Long.parseLong(value))}.
     * Fractional values are also accepted and rounded up to the next millisecond so that the
     * resulting wait is never shorter than requested.
     *
     * @param headerValue the raw header value, may be {@code null} when the header is absent
     * @return the parsed delay, or {@code null} if the value is absent or not a finite number
     */
    private static Duration parseRetryAfter(String headerValue) {
        if (headerValue == null) {
            return null;
        }
        try {
            double seconds = Double.parseDouble(headerValue);
            if (Double.isNaN(seconds) || Double.isInfinite(seconds)) {
                return null;
            }
            return Duration.ofMillis((long) Math.ceil(seconds * 1000.0));
        } catch (NumberFormatException ignored) {
            // An unparsable header is reported to the caller as "no usable value" instead of failing the request.
            return null;
        }
    }

    /**
     * Offers a request to the queue, keeping the in-flight counter in sync.
     *
     * @param request the request and its response sink
     * @return {@code true} if the queue accepted the request, {@code false} otherwise
     */
    boolean push(RequestCorrelation<ClientResponse> request) {
        // Count first so the request is already visible to countRequestsInFlight() once queued;
        // roll the increment back if the queue rejects it.
        this.requestsInFlight.incrementAndGet();
        boolean accepted = this.requestQueue.push(request);
        if (!accepted) {
            this.requestsInFlight.decrementAndGet();
        }
        return accepted;
    }

    /**
     * Subscribes to the request queue. The subscription lasts until {@link #stop()} completes the
     * stop callback; elements discarded by the pipeline are routed to {@link #onDiscard}.
     */
    void start() {
        this.requestQueue.requests()
                .doOnDiscard(RequestCorrelation.class, this::onDiscard)
                .takeUntilOther(this.stopCallback.asMono())
                .subscribe(this.requestSubscriber);
    }

    /** Signals the stop callback, which ends the sequence subscribed to in {@link #start()}. */
    void stop() {
        this.stopCallback.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
    }

    /**
     * @return the instant before which the subscriber will not pull another request;
     * {@link Instant#EPOCH} until a response has produced a non-zero delay
     */
    Instant getResetAt() {
        return this.requestSubscriber.getResetAt();
    }

    /** @return the current value of the in-flight request counter */
    long countRequestsInFlight() {
        return this.requestsInFlight.get();
    }

    /**
     * Handles a request dropped by the pipeline: releases its in-flight slot and fails its response
     * sink with a {@link DiscardedRequestException}.
     */
    private void onDiscard(RequestCorrelation<?> requestCorrelation) {
        this.requestsInFlight.decrementAndGet();
        requestCorrelation.getResponse().emitError(
                new DiscardedRequestException(requestCorrelation.getRequest()),
                Sinks.EmitFailureHandler.FAIL_FAST);
    }

    /**
     * Subscriber that executes queued requests strictly one after another: it requests a single
     * element on subscribe and one more each time {@link #next(SignalType)} fires.
     */
    private class RequestSubscriber
    extends BaseSubscriber<RequestCorrelation<ClientResponse>> {
        /** Earliest instant at which the next request may be pulled; written from the response function. */
        private volatile Instant resetAt = Instant.EPOCH;
        /** Post-processing applied to every HTTP response (rate limit bookkeeping and error mapping). */
        private final Function<ClientResponse, Mono<ClientResponse>> responseFunction;
        /** Invoked once per processed request, after the inter-request delay has finished. */
        private final Runnable processedCallback;

        public Instant getResetAt() {
            return this.resetAt;
        }

        /**
         * @param strategy computes the delay before the next request from an HTTP response
         * @param processedCallback run once for every request after its post-request delay terminates
         */
        public RequestSubscriber(RateLimitStrategy strategy, Runnable processedCallback) {
            this.processedCallback = processedCallback;
            this.responseFunction = response -> {
                HttpClientResponse httpResponse = response.getHttpResponse();
                if (log.isDebugEnabled()) {
                    Instant requestTimestamp = Instant.ofEpochMilli(
                            (Long) httpResponse.currentContextView().get("discord4j.request.timestamp"));
                    Duration responseTime = Duration.between(requestTimestamp, Instant.now());
                    LogUtil.traceDebug(log, trace -> LogUtil.format(httpResponse.currentContextView(),
                            "Read " + httpResponse.status() + " in " + responseTime
                                    + (trace ? " with headers: " + httpResponse.responseHeaders() : "")));
                }

                // Bucket-level delay: push resetAt forward only when the strategy asks for a wait.
                Duration resetAfter = strategy.apply(httpResponse);
                if (!resetAfter.isZero()) {
                    if (log.isDebugEnabled()) {
                        log.debug(LogUtil.format(httpResponse.currentContextView(), "Delaying next request by {}"),
                                resetAfter);
                    }
                    this.resetAt = Instant.now().plus(resetAfter);
                }

                // Global rate limit: notify the shared limiter before the result is propagated.
                boolean global = Boolean.parseBoolean(httpResponse.responseHeaders().get("X-RateLimit-Global"));
                Mono<Void> action = Mono.empty();
                if (global) {
                    String retryAfterHeader = httpResponse.responseHeaders().get("Retry-After");
                    Duration fixedBackoff = parseRetryAfter(retryAfterHeader);
                    if (fixedBackoff == null) {
                        // Without a usable delay the limiter cannot be armed; fall through so the
                        // response is still handled by the status check below instead of failing
                        // with a NumberFormatException.
                        log.warn(LogUtil.format(httpResponse.currentContextView(),
                                "Ignoring global rate limit with missing or invalid Retry-After header: {}"),
                                retryAfterHeader);
                    } else {
                        action = RequestStream.this.globalRateLimiter.rateLimitFor(fixedBackoff)
                                .doOnTerminate(() -> log.debug(LogUtil.format(httpResponse.currentContextView(),
                                        "Globally rate limited for {}"), fixedBackoff));
                    }
                }

                // Any status >= 400 is turned into an error signal built from the response.
                if (httpResponse.status().code() >= 400) {
                    return action.then(response.createException().flatMap(Mono::error));
                }
                return action.thenReturn(response);
            };
        }

        /** Starts consumption by asking for exactly one request. */
        protected void hookOnSubscribe(Subscription subscription) {
            this.request(1L);
        }

        /**
         * Executes one request and forwards its outcome (value, error or empty completion) to the
         * response sink carried by the correlation. When the pipeline terminates for any reason,
         * {@link #next(SignalType)} schedules the pull of the following request.
         */
        protected void hookOnNext(RequestCorrelation<ClientResponse> correlation) {
            DiscordWebRequest request = correlation.getRequest();
            ClientRequest clientRequest = new ClientRequest(request);
            Sinks.One<ClientResponse> callback = correlation.getResponse();
            Mono.just(clientRequest)
                    .flatMap(req -> Mono.deferContextual(ctx -> {
                        LogUtil.traceDebug(log, trace -> LogUtil.format(ctx,
                                trace ? req.toString() : req.getDescription()));
                        return RequestStream.this.globalRateLimiter
                                .withLimiter(RequestStream.this.httpClient.exchange(req).flatMap(this.responseFunction))
                                .next();
                    }))
                    // Adds the caller's context plus the request id and bucket id for the stages above.
                    .contextWrite(ctx -> ctx.putAll(correlation.getContext())
                            .put("discord4j.request", clientRequest.getId())
                            .put("discord4j.bucket", RequestStream.this.id.toString()))
                    // Order matters: rate limit retries wrap the exchange, response transformers are
                    // applied on top of that, and server error retries wrap everything.
                    .retryWhen(Retry.withThrowable(RequestStream.this.rateLimitRetryOperator::apply))
                    .transform(this.getResponseTransformers(request))
                    .retryWhen(RequestStream.this.serverErrorRetryFactory())
                    .doFinally(this::next)
                    .checkpoint("Request to " + clientRequest.getDescription() + " [RequestStream]")
                    .subscribe(
                            response -> callback.emitValue(response, Sinks.EmitFailureHandler.FAIL_FAST),
                            t -> {
                                log.trace("Error while processing {}: {}", request, t);
                                callback.emitError(t, Sinks.EmitFailureHandler.FAIL_FAST);
                            },
                            () -> callback.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST));
        }

        /**
         * Composes all configured response functions, in list order, into a single transformer for
         * the given request. Each step is followed by a checkpoint naming the function and the
         * request; with no response functions the identity transformer is returned.
         */
        private Function<Mono<ClientResponse>, Mono<ClientResponse>> getResponseTransformers(DiscordWebRequest discordRequest) {
            return RequestStream.this.responseFunctions.stream()
                    .map(rt -> rt.transform(discordRequest)
                            .andThen(mono -> mono.checkpoint("Apply " + rt + " to "
                                    + discordRequest.getDescription() + " [RequestStream]")))
                    .reduce(Function::andThen)
                    .orElse(mono -> mono);
        }

        /**
         * Waits until {@link #resetAt} (or not at all if it is already in the past), then requests
         * the next element. The processed callback runs when the wait terminates, whatever the outcome.
         *
         * @param signal the signal that terminated the previous request, used for logging only
         */
        private void next(SignalType signal) {
            Duration wait = Duration.between(Instant.now(), this.resetAt);
            Mono<Long> timer = wait.isNegative() || wait.isZero()
                    ? Mono.just(0L)
                    : Mono.delay(wait, RequestStream.this.timedTaskScheduler);
            timer.doFinally(__ -> this.processedCallback.run())
                    .subscribe(l -> {
                        if (log.isDebugEnabled()) {
                            log.debug("[B:{}] Ready to consume next request after {}",
                                    RequestStream.this.id.toString(), signal);
                        }
                        this.request(1L);
                    }, t -> log.error("[B:{}] Error while scheduling next request",
                            RequestStream.this.id.toString(), t));
        }

        /** Logs that the request sequence has completed. */
        protected void hookOnComplete() {
            log.debug("[B:{}] RequestStream completed", RequestStream.this.id.toString());
        }
    }
}

