package org.springframework.integration.endpoint;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.IntegrationPattern;
import org.springframework.integration.IntegrationPatternType;
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.history.MessageHistory;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.management.TrackableComponent;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;

/* loaded from: input_file:WEB-INF/lib/spring-integration-core-5.5.20.jar:org/springframework/integration/endpoint/MessageProducerSupport.class */
public abstract class MessageProducerSupport extends AbstractEndpoint implements MessageProducer, TrackableComponent, SmartInitializingSingleton, IntegrationPattern {
    private MessageChannel outputChannel;
    private String outputChannelName;
    private MessageChannel errorChannel;
    private String errorChannelName;
    private volatile Subscription subscription;
    private final MessagingTemplate messagingTemplate = new MessagingTemplate();
    private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
    private boolean shouldTrack = false;

    /* JADX INFO: Access modifiers changed from: protected */
    public MessageProducerSupport() {
        setPhase(1073741823);
    }

    @Override // org.springframework.integration.core.MessageProducer
    public void setOutputChannel(MessageChannel messageChannel) {
        this.outputChannel = messageChannel;
    }

    @Override // org.springframework.integration.core.MessageProducer
    public void setOutputChannelName(String str) {
        Assert.hasText(str, "'outputChannelName' must not be null or empty");
        this.outputChannelName = str;
    }

    @Override // org.springframework.integration.core.MessageProducer
    public MessageChannel getOutputChannel() {
        String str = this.outputChannelName;
        if (str != null) {
            this.outputChannel = getChannelResolver().resolveDestination(str);
            this.outputChannelName = null;
        }
        return this.outputChannel;
    }

    public void setErrorChannel(MessageChannel messageChannel) {
        this.errorChannel = messageChannel;
    }

    public void setErrorChannelName(String str) {
        Assert.hasText(str, "'errorChannelName' must not be empty");
        this.errorChannelName = str;
    }

    @Nullable
    public MessageChannel getErrorChannel() {
        String str = this.errorChannelName;
        if (str != null) {
            this.errorChannel = getChannelResolver().resolveDestination(str);
            this.errorChannelName = null;
        }
        return this.errorChannel;
    }

    public void setSendTimeout(long j) {
        this.messagingTemplate.setSendTimeout(j);
    }

    @Override // org.springframework.integration.support.management.TrackableComponent
    public void setShouldTrack(boolean z) {
        this.shouldTrack = z;
    }

    public final void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
        Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' cannot be null");
        this.errorMessageStrategy = errorMessageStrategy;
    }

    protected MessagingTemplate getMessagingTemplate() {
        return this.messagingTemplate;
    }

    @Override // org.springframework.integration.IntegrationPattern
    public IntegrationPatternType getIntegrationPatternType() {
        return IntegrationPatternType.inbound_channel_adapter;
    }

    @Override // org.springframework.beans.factory.SmartInitializingSingleton
    public void afterSingletonsInstantiated() {
        Assert.state(this.outputChannel != null || StringUtils.hasText(this.outputChannelName), "'outputChannel' or 'outputChannelName' is required");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.endpoint.AbstractEndpoint, org.springframework.integration.context.IntegrationObjectSupport
    public void onInit() {
        super.onInit();
        BeanFactory beanFactory = getBeanFactory();
        if (beanFactory != null) {
            this.messagingTemplate.setBeanFactory(beanFactory);
        }
    }

    @Override // org.springframework.integration.endpoint.AbstractEndpoint
    protected void doStart() {
    }

    @Override // org.springframework.integration.endpoint.AbstractEndpoint
    protected void doStop() {
        Subscription subscription = this.subscription;
        if (subscription != null) {
            this.subscription = null;
            subscription.cancel();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendMessage(Message<?> message) {
        if (message == null) {
            throw new MessagingException("cannot send a null message");
        }
        Message<?> trackMessageIfAny = trackMessageIfAny(message);
        try {
            this.messagingTemplate.send((MessagingTemplate) getRequiredOutputChannel(), trackMessageIfAny);
        } catch (RuntimeException e) {
            if (!sendErrorMessageIfNecessary(trackMessageIfAny, e)) {
                throw e;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void subscribeToPublisher(Publisher<? extends Message<?>> publisher) {
        MessageChannel requiredOutputChannel = getRequiredOutputChannel();
        Flux doOnSubscribe = Flux.from(publisher).map(this::trackMessageIfAny).doOnComplete(this::stop).doOnCancel(this::stop).doOnSubscribe(subscription -> {
            this.subscription = subscription;
        });
        if (requiredOutputChannel instanceof ReactiveStreamsSubscribableChannel) {
            ((ReactiveStreamsSubscribableChannel) requiredOutputChannel).subscribeTo(doOnSubscribe);
        } else {
            doOnSubscribe.doOnNext(message -> {
                try {
                    sendMessage(message);
                } catch (Exception e) {
                    this.logger.error(e, () -> {
                        return "Error sending a message: " + message;
                    });
                }
            }).subscribe();
        }
    }

    protected final boolean sendErrorMessageIfNecessary(@Nullable Message<?> message, Exception exc) {
        MessageChannel errorChannel = getErrorChannel();
        if (errorChannel == null) {
            return false;
        }
        this.messagingTemplate.send((MessagingTemplate) errorChannel, (Message<?>) buildErrorMessage(message, exc));
        return true;
    }

    protected final ErrorMessage buildErrorMessage(@Nullable Message<?> message, Exception exc) {
        return this.errorMessageStrategy.buildErrorMessage(exc, getErrorMessageAttributes(message));
    }

    protected AttributeAccessor getErrorMessageAttributes(@Nullable Message<?> message) {
        return ErrorMessageUtils.getAttributeAccessor(message, null);
    }

    private MessageChannel getRequiredOutputChannel() {
        MessageChannel outputChannel = getOutputChannel();
        Assert.state(outputChannel != null, "The 'outputChannel' or `outputChannelName` must be configured");
        return outputChannel;
    }

    private Message<?> trackMessageIfAny(Message<?> message) {
        return this.shouldTrack ? MessageHistory.write(message, this, getMessageBuilderFactory()) : message;
    }
}
