/*
 * Decompiled with CFR 0.152.
 */
package org.eclnt.ccee.mq.implactivemq;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.util.Hashtable;
import java.util.Map;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.eclnt.ccee.ICCEEConstants;
import org.eclnt.ccee.config.Config;
import org.eclnt.ccee.log.AppLog;
import org.eclnt.ccee.mq.IMessageQueue;
import org.eclnt.ccee.mq.IMessageQueueListener;
import org.eclnt.util.valuemgmt.ValueManager;

/**
 * {@link IMessageQueue} implementation that publishes to and consumes from
 * ActiveMQ topics through the Jakarta Messaging API.
 *
 * <p>Broker URL, user name and password are read from {@link Config} when the
 * instance is constructed. Every publish opens a fresh connection and closes it
 * again; every registered listener keeps its own connection and session open
 * until it is removed.
 */
public class AMQMessageQueue
implements IMessageQueue {
    String m_amqBrokerURL;
    String m_amqUserName;
    String m_amqPassword;
    // Created here but never started anywhere in this class.
    HealCheckerThread m_healthCheckerThread = new HealCheckerThread();
    // Keyed by the application listener only (not by topic).
    // NOTE(review): registering the same listener for a second topic replaces the
    // first entry, so the first subscription can no longer be closed through
    // removeMessageListener - confirm whether callers rely on this.
    Map<IMessageQueueListener, AMQMessageListener> m_messageListeners = new Hashtable<IMessageQueueListener, AMQMessageListener>();

    /**
     * Creates the queue and loads the broker settings via
     * {@link #initializeFromConfig()}.
     */
    public AMQMessageQueue() {
        this.initializeFromConfig();
    }

    /**
     * Sends a text message to the given topic using a short-lived connection.
     *
     * @param topic   name of the topic; must not be empty
     * @param message text to send; must not be empty
     * @throws Error if an argument is empty or if any step of sending fails
     *               (the original failure is attached as the cause)
     */
    @Override
    public void publishMessage(String topic, String message) {
        if (ValueManager.isEmpty(topic)) {
            throw new Error("Topic not set.");
        }
        if (ValueManager.isEmpty(message)) {
            throw new Error("Message not set.");
        }
        AppLog.L.log(ICCEEConstants.LL_INF, "MQ: publishing message: " + topic + ", " + message.length());
        Connection connection = null;
        Session session = null;
        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.m_amqBrokerURL);
            connection = connectionFactory.createConnection(this.m_amqUserName, this.m_amqPassword);
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic destination = session.createTopic(topic);
            MessageProducer producer = session.createProducer(destination);
            // 1 == jakarta.jms.DeliveryMode.NON_PERSISTENT
            producer.setDeliveryMode(1);
            TextMessage amqMessage = session.createTextMessage(message);
            producer.send(amqMessage);
            // Closed inside the try block so that a failing close is still reported to the caller.
            session.close();
            connection.close();
            AppLog.L.log(ICCEEConstants.LL_INF, "MQ: publishing message: finished");
        }
        catch (Throwable t) {
            AppLog.L.log(ICCEEConstants.LL_INF, "MQ: problem occurred when publishing message: " + topic, t);
            throw new Error("Problem sending message: " + topic, t);
        }
        finally {
            // Best-effort cleanup for the failure path; harmless if already closed.
            AMQMessageQueue.closeQuietly(session, connection);
        }
    }

    /**
     * Subscribes a listener to a topic. A dedicated connection and session are
     * opened and stay open until {@link #removeMessageListener} is called for
     * the listener.
     *
     * @param topic    name of the topic; must not be empty
     * @param listener receiver of incoming text messages; must not be null
     * @throws Error if an argument is missing or the subscription cannot be
     *               established (the original failure is attached as the cause)
     */
    @Override
    public void addMessageListener(String topic, IMessageQueueListener listener) {
        if (ValueManager.isEmpty(topic)) {
            throw new Error("Topic not set.");
        }
        if (listener == null) {
            throw new Error("Listener not set.");
        }
        AppLog.L.log(ICCEEConstants.LL_INF, "MQ: adding listener: " + topic + ", " + listener.getClass().getName());
        Connection connection = null;
        Session session = null;
        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.m_amqBrokerURL);
            connection = connectionFactory.createConnection(this.m_amqUserName, this.m_amqPassword);
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic destination = session.createTopic(topic);
            MessageConsumer consumer = session.createConsumer(destination);
            AMQMessageListener aml = new AMQMessageListener(topic, listener, connection, session);
            consumer.setMessageListener(aml);
            this.m_messageListeners.put(listener, aml);
        }
        catch (Throwable t) {
            AppLog.L.log(ICCEEConstants.LL_INF, "MQ: problem occurred when adding listener: " + topic, t);
            // Release whatever was opened before the failure.
            AMQMessageQueue.closeQuietly(session, connection);
            throw new Error("Problem adding message listener: " + topic, t);
        }
    }

    /**
     * Unsubscribes a listener: its entry is dropped from the internal map and
     * its session and connection are closed. Does nothing if the listener is
     * null or not registered.
     *
     * @param topic    topic name; only used for logging
     * @param listener the listener passed to {@link #addMessageListener}
     */
    @Override
    public void removeMessageListener(String topic, IMessageQueueListener listener) {
        if (listener == null) {
            // Hashtable rejects null keys; nothing can be registered under null.
            return;
        }
        // remove (not get) so that the closed entry does not linger in the map
        // and a repeated call does not close / notify a second time.
        AMQMessageListener aml = this.m_messageListeners.remove(listener);
        if (aml != null) {
            AppLog.L.log(ICCEEConstants.LL_INF, "MQ: removing listener: " + topic + ", " + listener.getClass().getName());
            aml.close();
        }
    }

    /**
     * Reads {@code activemq_brokerurl}, {@code activemq_username} and
     * {@code activemq_password} from {@link Config}. A field is only assigned
     * when the corresponding config value is not null.
     */
    protected void initializeFromConfig() {
        String brokerURL = Config.getConfigValue("activemq_brokerurl");
        if (brokerURL != null) {
            this.m_amqBrokerURL = brokerURL;
        }
        String userName = Config.getConfigValue("activemq_username");
        if (userName != null) {
            this.m_amqUserName = userName;
        }
        String password = Config.getConfigValue("activemq_password");
        if (password != null) {
            this.m_amqPassword = password;
        }
    }

    /**
     * Closes the session and then the connection, skipping null references and
     * ignoring any failure, so it is safe to call from cleanup paths.
     */
    private static void closeQuietly(Session session, Connection connection) {
        if (session != null) {
            try {
                session.close();
            }
            catch (Throwable ignored) {
                // best-effort cleanup
            }
        }
        if (connection != null) {
            try {
                connection.close();
            }
            catch (Throwable ignored) {
                // best-effort cleanup
            }
        }
    }

    /**
     * Empty {@link Thread} subclass; it adds no behavior of its own.
     */
    public class HealCheckerThread
    extends Thread {
    }

    /**
     * Bridges a JMS {@link MessageListener} to an {@link IMessageQueueListener}
     * and owns the connection and session of one subscription.
     */
    public static class AMQMessageListener
    implements MessageListener {
        String i_topic;
        IMessageQueueListener i_ccListener;
        Connection i_connection;
        Session i_session;

        /**
         * @param topic      topic the subscription belongs to
         * @param ccListener application listener that receives the message text
         * @param connection connection closed by {@link #close()}
         * @param session    session closed by {@link #close()}
         */
        public AMQMessageListener(String topic, IMessageQueueListener ccListener, Connection connection, Session session) {
            this.i_topic = topic;
            this.i_ccListener = ccListener;
            this.i_connection = connection;
            this.i_session = session;
        }

        /**
         * Forwards the text of a {@link TextMessage} to the application
         * listener. Any failure, including an unsupported message type, is
         * logged and not propagated.
         */
        @Override
        public void onMessage(Message message) {
            try {
                if (!(message instanceof TextMessage)) {
                    throw new Error("Not supported MessageType: " + message.getClass().getName());
                }
                // Read the text once and reuse it for logging and dispatch.
                String text = ((TextMessage)message).getText();
                AppLog.L.log(AppLog.LL_INF, "Processing message: " + this.i_topic + ", " + text.length());
                this.i_ccListener.processMessage(this.i_topic, text);
                AppLog.L.log(AppLog.LL_INF, "Processing message: finished");
            }
            catch (Throwable t) {
                AppLog.L.log(AppLog.LL_INF, "Problem when receiving message: " + message, t);
            }
        }

        /**
         * Closes session and connection (ignoring failures) and then notifies
         * the application listener via {@code reactOnClosed()}.
         */
        public void close() {
            AMQMessageQueue.closeQuietly(this.i_session, this.i_connection);
            this.i_ccListener.reactOnClosed();
        }
    }
}

