package org.springframework.integration.endpoint;

import java.time.Duration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-core-5.5.4.jar:org/springframework/integration/endpoint/ReactiveMessageSourceProducer.class */
public class ReactiveMessageSourceProducer extends MessageProducerSupport {
    private final Flux<? extends Message<?>> messageFlux;
    private Duration delayWhenEmpty = IntegrationReactiveUtils.DEFAULT_DELAY_WHEN_EMPTY;

    public ReactiveMessageSourceProducer(MessageSource<?> messageSource) {
        Assert.notNull(messageSource, "'messageSource' must not be null");
        this.messageFlux = IntegrationReactiveUtils.messageSourceToFlux(messageSource).contextWrite(context -> {
            return context.put(IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY, this.delayWhenEmpty);
        });
    }

    public void setDelayWhenEmpty(Duration duration) {
        Assert.notNull(duration, "'delayWhenEmpty' must not be null");
        this.delayWhenEmpty = duration;
    }

    @Override // org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.endpoint.AbstractEndpoint
    protected void doStart() {
        subscribeToPublisher(this.messageFlux);
    }
}
