/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.amqp.connection.impl;

import io.netty.handler.ssl.OpenSsl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.net.JdkSSLEngineOptions;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.net.OpenSSLEngineOptions;
import io.vertx.core.net.TrustOptions;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.eclipse.hono.client.amqp.config.ClientConfigProperties;
import org.eclipse.hono.client.amqp.connection.ConnectTimeoutException;
import org.eclipse.hono.client.amqp.connection.ConnectionFactory;
import org.eclipse.hono.client.amqp.connection.HonoProtonHelper;
import org.eclipse.hono.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ConnectionFactory} that establishes AMQP 1.0 connections by means of a
 * vertx-proton {@link ProtonClient}.
 * <p>
 * Host, port, credentials, time-outs and TLS settings are read from the
 * {@link ClientConfigProperties} passed in to the constructor. Explicitly passed
 * in arguments of the {@code connect} methods take precedence over the configured values.
 */
public final class ConnectionFactoryImpl implements ConnectionFactory {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionFactoryImpl.class);
    private static final String PROTOCOL_AMQP = "amqp";
    private static final String PROTOCOL_AMQPS = "amqps";
    /** Failure/log message used when the transport drops before the peer's open frame has been handled. */
    private static final String MSG_DISCONNECTED_WHILE_OPENING =
            "underlying connection was disconnected while opening AMQP connection";

    private final Vertx vertx;
    private final ClientConfigProperties config;
    /** Optional client to use instead of creating a new one per connection attempt; may be {@code null}. */
    private ProtonClient protonClient;

    /**
     * Creates a factory for a Vert.x instance and a set of client configuration properties.
     *
     * @param vertx The Vert.x instance used for creating clients and timers.
     * @param config The configuration to take connection parameters from.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public ConnectionFactoryImpl(Vertx vertx, ClientConfigProperties config) {
        this.vertx = Objects.requireNonNull(vertx);
        this.config = Objects.requireNonNull(config);
    }

    /**
     * Sets the client to use for all subsequent connection attempts.
     * <p>
     * If no client is set, a new one is created from the Vert.x instance on every
     * invocation of {@code connect}.
     *
     * @param protonClient The client to use.
     * @throws NullPointerException if the client is {@code null}.
     */
    public void setProtonClient(ProtonClient protonClient) {
        this.protonClient = Objects.requireNonNull(protonClient);
    }

    /** {@inheritDoc} */
    @Override
    public String getHost() {
        return this.config.getHost();
    }

    /** {@inheritDoc} */
    @Override
    public int getPort() {
        return this.config.getPort();
    }

    /** {@inheritDoc} */
    @Override
    public String getPathSeparator() {
        return this.config.getPathSeparator();
    }

    /** {@inheritDoc} */
    @Override
    public String getServerRole() {
        return this.config.getServerRole();
    }

    /**
     * Connects using the configured credentials and a generated container id.
     * <p>
     * Delegates to the variant accepting credentials, passing {@code null} for
     * user name and password.
     */
    @Override
    public Future<ProtonConnection> connect(ProtonClientOptions options, Handler<AsyncResult<ProtonConnection>> closeHandler, Handler<ProtonConnection> disconnectHandler) {
        return this.connect(options, null, null, closeHandler, disconnectHandler);
    }

    /**
     * Connects using the given credentials and a generated container id.
     * <p>
     * Delegates to the variant accepting a container id, passing {@code null} for the id.
     */
    @Override
    public Future<ProtonConnection> connect(ProtonClientOptions options, String username, String password, Handler<AsyncResult<ProtonConnection>> closeHandler, Handler<ProtonConnection> disconnectHandler) {
        return this.connect(options, username, password, null, closeHandler, disconnectHandler);
    }

    /**
     * Connects to the configured host and port and opens an AMQP connection.
     * <p>
     * Every {@code null} argument is replaced by a default: options are created from the
     * configuration, credentials are taken from the configuration and the container id
     * is generated. Note that TLS and SASL settings are added to the given options object
     * itself, i.e. options passed in by the caller are modified.
     * <p>
     * If the configured connect time-out is greater than zero, a timer is started which
     * fails the returned future with a {@link ConnectTimeoutException} when it fires before
     * the connection has been opened.
     *
     * @param options The client options to use or {@code null} to use default options.
     * @param username The user name to authenticate with or {@code null} to use the configured one.
     * @param password The password to authenticate with or {@code null} to use the configured one.
     * @param containerId The container id to announce to the peer or {@code null} to generate one.
     * @param closeHandler The handler to register on the opened connection for close events.
     * @param disconnectHandler The handler to register on the opened connection for disconnect events.
     * @return A future that is completed with the opened connection or failed with the cause
     *         of the failure to connect.
     */
    @Override
    public Future<ProtonConnection> connect(ProtonClientOptions options, String username, String password, String containerId, Handler<AsyncResult<ProtonConnection>> closeHandler, Handler<ProtonConnection> disconnectHandler) {
        final ProtonClientOptions clientOptions = Optional.ofNullable(options).orElseGet(this::createClientOptions);
        final String effectiveUsername = Optional.ofNullable(username).orElseGet(this.config::getUsername);
        final String effectivePassword = Optional.ofNullable(password).orElseGet(this.config::getPassword);
        this.addOptions(clientOptions, effectiveUsername, effectivePassword);
        final String effectiveContainerId = Optional.ofNullable(containerId).orElseGet(this::getContainerIdDefault);
        final ProtonClient client = Optional.ofNullable(this.protonClient).orElseGet(() -> ProtonClient.create(this.vertx));
        final Promise<ProtonConnection> connectionAttempt = Promise.promise();

        LOGGER.debug("connecting to AMQP 1.0 container [{}://{}:{}, role: {}]",
                scheme(clientOptions), this.config.getHost(), this.config.getPort(), this.config.getServerRole());

        // the flag is shared with all handlers so that results arriving after the
        // time-out are ignored instead of completing the promise a second time
        final AtomicBoolean connectionTimeoutReached = new AtomicBoolean(false);
        final Long connectionTimeoutTimerId;
        if (this.config.getConnectTimeout() > 0) {
            connectionTimeoutTimerId = Long.valueOf(this.vertx.setTimer(this.config.getConnectTimeout(), id -> {
                if (connectionTimeoutReached.compareAndSet(false, true)) {
                    this.failConnectionAttempt(clientOptions, connectionAttempt,
                            new ConnectTimeoutException("connection attempt timed out after " + this.config.getConnectTimeout() + "ms"));
                }
            }));
        } else {
            // no time-out configured, nothing to cancel later on
            connectionTimeoutTimerId = null;
        }

        client.connect(
                clientOptions,
                this.config.getHost(),
                this.config.getPort(),
                effectiveUsername,
                effectivePassword,
                conAttempt -> this.handleConnectionAttemptResult(
                        conAttempt,
                        effectiveContainerId,
                        connectionTimeoutTimerId,
                        connectionTimeoutReached,
                        clientOptions,
                        closeHandler,
                        disconnectHandler,
                        connectionAttempt));
        return connectionAttempt.future();
    }

    /**
     * Gets the URI scheme to use in log messages for the given options.
     *
     * @param clientOptions The options of the connection attempt.
     * @return {@code amqps} if SSL is enabled in the options, {@code amqp} otherwise.
     */
    private static String scheme(ProtonClientOptions clientOptions) {
        return clientOptions.isSsl() ? PROTOCOL_AMQPS : PROTOCOL_AMQP;
    }

    /**
     * Cancels the connect time-out timer, if one has been started.
     *
     * @param timerId The id of the timer or {@code null} if no timer has been started.
     */
    private void cancelTimer(Long timerId) {
        if (timerId != null) {
            this.vertx.cancelTimer(timerId);
        }
    }

    /**
     * Creates a container id from the configured client name, the configured server role
     * and a random UUID.
     */
    private String getContainerIdDefault() {
        return ConnectionFactory.createContainerId(this.config.getName(), this.config.getServerRole(), UUID.randomUUID());
    }

    /**
     * Logs a failed connection attempt and passes the failure on to the result handler.
     *
     * @param clientOptions The options used for the attempt (used for logging only).
     * @param connectionResultHandler The handler to notify about the failure.
     * @param cause The reason for the failure.
     */
    private void failConnectionAttempt(ProtonClientOptions clientOptions, Handler<AsyncResult<ProtonConnection>> connectionResultHandler, Throwable cause) {
        LOGGER.debug("can't connect to AMQP 1.0 container [{}://{}:{}, role: {}]: {}",
                scheme(clientOptions), this.config.getHost(), this.config.getPort(), this.config.getServerRole(), cause.getMessage());
        connectionResultHandler.handle(Future.failedFuture(cause));
    }

    /**
     * Processes the outcome of establishing the transport level connection.
     * <p>
     * If the attempt has already timed out, the outcome is only logged (and a successfully
     * established connection is closed again). If the attempt has failed, the timer is
     * canceled and the failure is reported. Otherwise the AMQP connection is opened and
     * the result handler is notified once the peer's open frame has been processed or the
     * transport has been disconnected.
     *
     * @param conAttempt The outcome of the connection attempt.
     * @param containerId The container id to set on the connection.
     * @param connectionTimeoutTimerId The id of the time-out timer or {@code null} if none has been started.
     * @param connectionTimeoutReached The flag indicating whether the time-out timer has already fired.
     * @param clientOptions The options used for the attempt (used for logging only).
     * @param closeHandler The close handler to register on the opened connection.
     * @param disconnectHandler The disconnect handler to register on the opened connection.
     * @param connectionResultHandler The handler to notify about the overall outcome.
     */
    private void handleConnectionAttemptResult(AsyncResult<ProtonConnection> conAttempt, String containerId, Long connectionTimeoutTimerId, AtomicBoolean connectionTimeoutReached, ProtonClientOptions clientOptions, Handler<AsyncResult<ProtonConnection>> closeHandler, Handler<ProtonConnection> disconnectHandler, Handler<AsyncResult<ProtonConnection>> connectionResultHandler) {
        if (connectionTimeoutReached.get()) {
            // the result handler has already been failed by the timer
            this.handleTimedOutConnectionAttemptResult(conAttempt, clientOptions);
            return;
        }
        if (conAttempt.failed()) {
            this.cancelTimer(connectionTimeoutTimerId);
            this.failConnectionAttempt(clientOptions, connectionResultHandler, conAttempt.cause());
            return;
        }

        LOGGER.debug("connected to AMQP 1.0 container [{}://{}:{}, role: {}], opening connection ...",
                scheme(clientOptions), this.config.getHost(), this.config.getPort(), this.config.getServerRole());
        final ProtonConnection downstreamConnection = conAttempt.result();
        downstreamConnection
            .setContainer(containerId)
            .setHostname(this.config.getAmqpHostname())
            .openHandler(openCon -> {
                this.cancelTimer(connectionTimeoutTimerId);
                // remove the temporary handler registered below, it only covers the opening phase
                downstreamConnection.disconnectHandler(null);
                if (connectionTimeoutReached.get()) {
                    this.logTimedOutOpenHandlerResult(openCon, downstreamConnection, clientOptions);
                    this.closeAndDisconnect(downstreamConnection);
                } else if (openCon.succeeded()) {
                    LOGGER.debug("connection to container [{}] at [{}://{}:{}, role: {}] open",
                            downstreamConnection.getRemoteContainer(), scheme(clientOptions),
                            this.config.getHost(), this.config.getPort(), this.config.getServerRole());
                    downstreamConnection.disconnectHandler(disconnectHandler);
                    downstreamConnection.closeHandler(closeHandler);
                    connectionResultHandler.handle(Future.succeededFuture(downstreamConnection));
                } else {
                    this.logFailedOpenHandlerResult(openCon, downstreamConnection, clientOptions);
                    this.closeAndDisconnect(downstreamConnection);
                    connectionResultHandler.handle(Future.failedFuture(openCon.cause()));
                }
            })
            .disconnectHandler(disconnectedCon -> {
                this.cancelTimer(connectionTimeoutTimerId);
                if (connectionTimeoutReached.get()) {
                    LOGGER.warn("ignoring error - connection attempt already timed out: can't open connection to container [{}] at [{}://{}:{}, role: {}]: {}",
                            downstreamConnection.getRemoteContainer(), scheme(clientOptions),
                            this.config.getHost(), this.config.getPort(), this.config.getServerRole(),
                            MSG_DISCONNECTED_WHILE_OPENING);
                } else {
                    LOGGER.warn("can't open connection to container [{}] at [{}://{}:{}, role: {}]: {}",
                            downstreamConnection.getRemoteContainer(), scheme(clientOptions),
                            this.config.getHost(), this.config.getPort(), this.config.getServerRole(),
                            MSG_DISCONNECTED_WHILE_OPENING);
                    connectionResultHandler.handle(Future.failedFuture(MSG_DISCONNECTED_WHILE_OPENING));
                }
            })
            .open();
    }

    /**
     * Closes the given connection using the configured close time-out.
     *
     * @param downstreamConnection The connection to close.
     * @return The future returned by {@code HonoProtonHelper.closeConnection}.
     */
    private Future<Void> closeAndDisconnect(ProtonConnection downstreamConnection) {
        return HonoProtonHelper.closeConnection(downstreamConnection, this.config.getCloseConnectionTimeout(), this.vertx.getOrCreateContext());
    }

    /**
     * Handles a connection attempt result that has arrived after the time-out has been reached.
     * <p>
     * A connection that has been established nevertheless is closed again, a failure is
     * logged only.
     *
     * @param conAttempt The (late) outcome of the connection attempt.
     * @param clientOptions The options used for the attempt (used for logging only).
     */
    private void handleTimedOutConnectionAttemptResult(AsyncResult<ProtonConnection> conAttempt, ProtonClientOptions clientOptions) {
        if (conAttempt.succeeded()) {
            LOGGER.debug("ignoring successful connection attempt to AMQP 1.0 container [{}://{}:{}, role: {}]: attempt already timed out",
                    scheme(clientOptions), this.config.getHost(), this.config.getPort(), this.config.getServerRole());
            this.closeAndDisconnect(conAttempt.result());
        } else {
            // the cause has no placeholder in the message, it is passed as trailing throwable argument
            LOGGER.debug("ignoring failed connection attempt to AMQP 1.0 container [{}://{}:{}, role: {}]: attempt already timed out",
                    scheme(clientOptions), this.config.getHost(), this.config.getPort(), this.config.getServerRole(),
                    conAttempt.cause());
        }
    }

    /**
     * Logs the outcome of opening the AMQP connection in case the attempt has already timed out.
     *
     * @param openConnectionResult The outcome passed in to the connection's open handler.
     * @param downstreamConnection The connection that the outcome belongs to.
     * @param clientOptions The options used for the attempt (used for logging only).
     */
    private void logTimedOutOpenHandlerResult(AsyncResult<ProtonConnection> openConnectionResult, ProtonConnection downstreamConnection, ProtonClientOptions clientOptions) {
        if (openConnectionResult.succeeded()) {
            LOGGER.debug("ignoring received open frame from container [{}] at [{}://{}:{}, role: {}]: connection attempt already timed out",
                    downstreamConnection.getRemoteContainer(), scheme(clientOptions),
                    this.config.getHost(), this.config.getPort(), this.config.getServerRole());
            return;
        }
        final ErrorCondition error = downstreamConnection.getRemoteCondition();
        if (error == null) {
            // no error condition sent by the peer, log the local cause as trailing throwable argument
            LOGGER.warn("ignoring failure to open connection to container [{}] at [{}://{}:{}, role: {}]: attempt already timed out",
                    downstreamConnection.getRemoteContainer(), scheme(clientOptions),
                    this.config.getHost(), this.config.getPort(), this.config.getServerRole(),
                    openConnectionResult.cause());
        } else {
            LOGGER.warn("ignoring failure to open connection to container [{}] at [{}://{}:{}, role: {}]: attempt already timed out; error: {} -{}",
                    downstreamConnection.getRemoteContainer(), scheme(clientOptions),
                    this.config.getHost(), this.config.getPort(), this.config.getServerRole(),
                    error.getCondition(), error.getDescription());
        }
    }

    /**
     * Logs a failure to open the AMQP connection.
     *
     * @param openCon The failed outcome passed in to the connection's open handler.
     * @param downstreamConnection The connection that the outcome belongs to.
     * @param clientOptions The options used for the attempt (used for logging only).
     */
    private void logFailedOpenHandlerResult(AsyncResult<ProtonConnection> openCon, ProtonConnection downstreamConnection, ProtonClientOptions clientOptions) {
        final ErrorCondition error = downstreamConnection.getRemoteCondition();
        if (error == null) {
            // no error condition sent by the peer, log the local cause as trailing throwable argument
            LOGGER.warn("can't open connection to container [{}] at [{}://{}:{}, role: {}]",
                    downstreamConnection.getRemoteContainer(), scheme(clientOptions),
                    this.config.getHost(), this.config.getPort(), this.config.getServerRole(),
                    openCon.cause());
        } else {
            LOGGER.warn("can't open connection to container [{}] at [{}://{}:{}, role: {}]: {} -{}",
                    downstreamConnection.getRemoteContainer(), scheme(clientOptions),
                    this.config.getHost(), this.config.getPort(), this.config.getServerRole(),
                    error.getCondition(), error.getDescription());
        }
    }

    /**
     * Adds TLS and authentication related settings to the given options.
     * <p>
     * SASL PLAIN is enabled if both a user name and a password are available. Otherwise
     * the configured key/certificate options (if any) are applied.
     *
     * @param clientOptions The options to modify.
     * @param username The effective user name, may be {@code null} or empty.
     * @param password The effective password, may be {@code null} or empty.
     */
    private void addOptions(ProtonClientOptions clientOptions, String username, String password) {
        this.addTlsTrustOptions(clientOptions);
        if (!Strings.isNullOrEmpty(username) && !Strings.isNullOrEmpty(password)) {
            clientOptions.addEnabledSaslMechanism("PLAIN");
        } else {
            this.addTlsKeyCertOptions(clientOptions);
        }
    }

    /**
     * Enables and configures TLS on the given options based on the configuration.
     * <p>
     * TLS is enabled if the configuration requests it explicitly or if trust options are
     * configured (and the given options do not contain trust options already). If TLS
     * is enabled, the SSL engine, hostname verification, the enabled protocols and the
     * enabled cipher suites are set as well.
     *
     * @param clientOptions The options to modify.
     */
    private void addTlsTrustOptions(ProtonClientOptions clientOptions) {
        if (this.config.isTlsEnabled()) {
            clientOptions.setSsl(true);
        }
        // trust options set by the caller take precedence over the configured ones
        if (clientOptions.getTrustOptions() == null) {
            final TrustOptions trustOptions = this.config.getTrustOptions();
            if (trustOptions != null) {
                clientOptions.setSsl(true).setTrustOptions(trustOptions);
            }
        }
        if (!clientOptions.isSsl()) {
            return;
        }

        final boolean isOpenSslAvailable = OpenSsl.isAvailable();
        final boolean supportsKeyManagerFactory = OpenSsl.supportsKeyManagerFactory();
        LOGGER.debug("OpenSSL [available: {}, supports KeyManagerFactory: {}]", isOpenSslAvailable, supportsKeyManagerFactory);
        if (isOpenSslAvailable && supportsKeyManagerFactory) {
            LOGGER.debug("using OpenSSL [version: {}] instead of JVM's default SSL engine", OpenSsl.versionString());
            clientOptions.setOpenSslEngineOptions(new OpenSSLEngineOptions());
        } else {
            LOGGER.debug("using JVM's default SSL engine");
            clientOptions.setJdkSslEngineOptions(new JdkSSLEngineOptions());
        }

        // NOTE(review): an empty algorithm name is expected to turn off hostname verification
        // in Vert.x - see the documentation of setHostnameVerificationAlgorithm
        clientOptions.setHostnameVerificationAlgorithm(this.config.isHostnameVerificationRequired() ? "HTTPS" : "");

        // a linked set preserves the order in which the protocols have been configured
        final Set<String> protocols = new LinkedHashSet<>(this.config.getSecureProtocols().size());
        this.config.getSecureProtocols().forEach(p -> {
            LOGGER.debug("enabling secure protocol [{}]", p);
            protocols.add(p);
        });
        clientOptions.setEnabledSecureTransportProtocols(protocols);

        this.config.getSupportedCipherSuites().forEach(suiteName -> {
            LOGGER.debug("adding supported cipher suite [{}]", suiteName);
            clientOptions.addEnabledCipherSuite(suiteName);
        });
    }

    /**
     * Applies the configured key/certificate options to the given options and enables
     * SASL EXTERNAL, unless the given options already contain key/certificate options or
     * none are configured.
     *
     * @param clientOptions The options to modify.
     */
    private void addTlsKeyCertOptions(ProtonClientOptions clientOptions) {
        if (clientOptions.getKeyCertOptions() != null) {
            // key/cert options set by the caller take precedence over the configured ones
            return;
        }
        final KeyCertOptions keyCertOptions = this.config.getKeyCertOptions();
        if (keyCertOptions != null) {
            clientOptions.setSsl(true).setKeyCertOptions(keyCertOptions);
            clientOptions.addEnabledSaslMechanism("EXTERNAL");
        }
    }

    /**
     * Creates default client options from the configured connect time-out, heartbeat
     * interval and max frame size. Reconnect attempts are set to zero.
     *
     * @return The options.
     */
    private ProtonClientOptions createClientOptions() {
        final ProtonClientOptions options = new ProtonClientOptions();
        options.setConnectTimeout(this.config.getConnectTimeout());
        options.setHeartbeat(this.config.getHeartbeatInterval());
        options.setMaxFrameSize(this.config.getMaxFrameSize());
        options.setReconnectAttempts(0);
        return options;
    }
}

