package org.dataone.cn.messaging;

import java.nio.charset.Charset;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/* loaded from: input_file:org/dataone/cn/messaging/QueueAccessIT.class */
public class QueueAccessIT {
    private static Logger logger = Logger.getLogger(QueueAccessIT.class.getName());
    static String testQueueName = "brokerTestQueue";
    CachingConnectionFactory connFactory = new CachingConnectionFactory("localhost");
    RabbitTemplate rabbitTemplate;

    @BeforeClass
    public static void beforeClassSetup() throws Exception {
        try {
            logger.info("****** declaring a durable queue for use in tests");
            new RabbitAdmin(new CachingConnectionFactory("localhost")).declareQueue(new Queue(testQueueName, true));
        } catch (Throwable th) {
            throw new Exception("Check that broker is started - Failed to (re)declare durable queue during test setup, due to: " + th.getClass().getCanonicalName() + ": " + th.getMessage(), th);
        }
    }

    @Before
    public void setUp() throws Exception {
        Message receive;
        this.connFactory.setUsername("guest");
        this.connFactory.setPassword("guest");
        this.connFactory.setPublisherConfirms(true);
        this.rabbitTemplate = new RabbitTemplate(this.connFactory);
        do {
            receive = this.rabbitTemplate.receive(testQueueName, 10L);
            logger.info("cleaned one item off the queue...");
        } while (receive != null);
        logger.info("******* end of test setup...");
    }

    @Test
    @Ignore("It doesn't look like an exceptionp throwing situation")
    public void testConfirmedPublishToNonExistingQueue_ShouldThrowAMQPException() {
        try {
            this.rabbitTemplate.convertAndSend("aNonExistingQueue", "foo");
            Assert.fail("publishing to a non-existing queue should throw exception");
        } catch (AmqpException e) {
            logger.info(e.getClass().getCanonicalName() + ": " + e.getMessage());
        }
    }

    @Test
    public void testConsumeFromNonExistingQueue_ShouldThrowAMQPException() {
        try {
            this.rabbitTemplate.receiveAndConvert("aNonExistingQueue", 500L);
            Assert.fail("Should not successfully retrieve from non existing queue");
        } catch (AmqpException e) {
            logger.info(e.getClass().getCanonicalName() + ": " + e.getMessage());
        }
    }

    void stopBroker() {
        logger.warn("*#*#*#*#*#*#*#*#*#*    PLEASE STOP BROKER while test sleeps for 15 seconds     *#*#*#*#*#*#*#*#*#*");
        try {
            Thread.sleep(15000L);
        } catch (InterruptedException e) {
            logger.warn("OK, moving on...");
        }
    }

    void startBroker() {
        logger.warn("*#*#*#*#*#*#*#*#*#*    PLEASE RESTART BROKER while test sleeps for 15 seconds    *#*#*#*#*#*#*#*#*#*");
        try {
            Thread.sleep(15000L);
        } catch (InterruptedException e) {
            logger.warn("OK, moving on...");
        }
    }

    @Test
    @Ignore("Can't automate broker stop yet")
    public void testMessageDurability_messageShouldSurviveBrokerRestart() {
        this.rabbitTemplate.convertAndSend(testQueueName, "testBrokerStop1");
        logger.info("   =======> received firstMessage, so broker is up and running: " + ((String) this.rabbitTemplate.receiveAndConvert(testQueueName, 250L)));
        this.rabbitTemplate.convertAndSend(testQueueName, "testBrokerStop2");
        stopBroker();
        try {
            Assert.fail("Should not be able to get a message, so the stop probably didn't happen. " + ((String) this.rabbitTemplate.receiveAndConvert(testQueueName, 250L)));
        } catch (AmqpException e) {
            logger.info("Broker down, as planned... :" + e.getClass().getCanonicalName());
        }
        startBroker();
        Assert.assertEquals("Message should have survived broker restart", "testBrokerStop2", (String) this.rabbitTemplate.receiveAndConvert(testQueueName, 2000L));
    }

    @Test
    @Ignore("Can't automate broker stop yet")
    public void testQueueDurability_queueDeclaredAsDurableShouldSurviveBrokerRestart() throws InterruptedException {
        new RabbitAdmin(this.connFactory).declareQueue(new Queue("aNonDurableQueue", true));
        stopBroker();
        startBroker();
        this.rabbitTemplate.convertAndSend("aNonDurableQueue", "a test message that could have been anything");
    }

    @Test
    public void testDelayedConsumeAcknowledgement() throws InterruptedException {
        this.rabbitTemplate.convertAndSend(testQueueName, "simpleMessage1");
        logger.warn("*#*#*#*#*#*#*#*#*#*   Please check broker for message count. should be 1");
        Thread.sleep(4000L);
        QueueAccess queueAccess = new QueueAccess(this.connFactory, testQueueName);
        queueAccess.registerAsynchronousMessageListener(1, new MessageListener() { // from class: org.dataone.cn.messaging.QueueAccessIT.1
            public void onMessage(Message message) {
                System.out.println(String.format("Thread: %s  -  date:  %s  :::::: %s", Thread.currentThread().getName(), new Date(), new String(message.getBody())));
                try {
                    System.out.println(String.format("Thread: %s  -  date:  %s  :::::: %s", Thread.currentThread().getName(), new Date(), "CHECK BROKER FOR MESSAGE COUNT, SHOULD STILL BE 1"));
                    Thread.sleep(4000L);
                } catch (InterruptedException e) {
                    throw new RuntimeException("a runtime execption");
                }
            }
        });
        logger.info("slept 8sec,...Going to sleep another 10 to match the onMessage execution pause...");
        Thread.sleep(10500L);
        logger.warn("*#*#*#*#*#*#*#*#*#*   Please check broker for message count. should now be 0");
        Thread.sleep(3000L);
        queueAccess.clearAsynchronousMessageListeners();
    }

    @Test
    public void testParallelConsumption_TimingShouldBeLikeOneMessage() throws InterruptedException {
        this.rabbitTemplate.convertAndSend(testQueueName, "simpleMessage1");
        this.rabbitTemplate.convertAndSend(testQueueName, "simpleMessage2");
        this.rabbitTemplate.convertAndSend(testQueueName, "simpleMessage3");
        this.rabbitTemplate.convertAndSend(testQueueName, "simpleMessage4");
        this.rabbitTemplate.convertAndSend(testQueueName, "simpleMessage5");
        this.rabbitTemplate.convertAndSend(testQueueName, "simpleMessage6");
        this.rabbitTemplate.convertAndSend(testQueueName, "simpleMessage7");
        this.rabbitTemplate.convertAndSend(testQueueName, "simpleMessage8");
        logger.warn("*#*#*#*#*#*#*#*#*#*   Please check broker for message count. should be 8");
        Thread.sleep(10000L);
        long currentTimeMillis = System.currentTimeMillis();
        final HashMap hashMap = new HashMap();
        QueueAccess queueAccess = new QueueAccess(this.connFactory, testQueueName);
        queueAccess.registerAsynchronousMessageListener(8, new MessageListener() { // from class: org.dataone.cn.messaging.QueueAccessIT.2
            public void onMessage(Message message) {
                System.out.println(String.format("Thread: %s  -  date:  %s  :::::: %s", Thread.currentThread().getName(), new Date(), new String(message.getBody())));
                try {
                    System.out.println(String.format("Thread: %s  -  date:  %s  :::::: %s", Thread.currentThread().getName(), new Date(), "CHECK BROKER FOR MESSAGE COUNT, SHOULD STILL BE 8"));
                    Thread.sleep(4000L);
                    hashMap.put(Thread.currentThread().getName(), Long.valueOf(System.currentTimeMillis()));
                } catch (InterruptedException e) {
                    throw new RuntimeException("a runtime execption");
                }
            }
        });
        logger.info("...Going to sleep while messages are consumed...");
        Thread.sleep(4500L);
        Assert.assertEquals("all threads should have reported back", 8L, hashMap.size());
        Long l = 0L;
        for (String str : hashMap.keySet()) {
            if (((Long) hashMap.get(str)).longValue() > l.longValue()) {
                l = (Long) hashMap.get(str);
            }
        }
        Assert.assertTrue("latest should be greater than start", l.longValue() > currentTimeMillis);
        logger.info("Total time:  " + String.valueOf(l.longValue() - currentTimeMillis));
        Assert.assertTrue("Total time to process should be less than or equal to size of one task sleep (plus small amount)", l.longValue() - currentTimeMillis < 4500);
        queueAccess.clearAsynchronousMessageListeners();
    }

    @Test
    public void exampleSynchronousConsumer_Pipeline() throws InterruptedException {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(this.connFactory);
        Queue queue = new Queue("pipe1", false);
        rabbitAdmin.declareQueue(queue);
        Queue queue2 = new Queue("pipe2", false);
        rabbitAdmin.declareQueue(queue2);
        Queue queue3 = new Queue("pipe3", false);
        rabbitAdmin.declareQueue(queue3);
        Queue queue4 = new Queue("pipe4", false);
        rabbitAdmin.declareQueue(queue4);
        this.rabbitTemplate.convertAndSend(testQueueName, "demoMessage");
        processStep(testQueueName, queue.getName());
        processStep(queue.getName(), queue2.getName());
        processStep(queue2.getName(), queue3.getName());
        processStep(queue3.getName(), queue4.getName());
    }

    private void processStep(String str, String str2) throws InterruptedException {
        logger.warn("*#*#*#*#*#*#*#*#*#*   PAUSING:  please list_queues");
        Thread.sleep(8000L);
        Message nextDelivery = new QueueAccess(this.connFactory, str).getNextDelivery(2000L);
        String str3 = "null";
        if (nextDelivery != null && nextDelivery.getBody() != null) {
            str3 = new String(nextDelivery.getBody());
        }
        logger.info(String.format("Got message from queue %s: %s", str, str3));
        new QueueAccess(this.connFactory, str2).publish(nextDelivery);
    }

    @Test
    @Ignore
    public void simpleroundTripTest() throws Exception {
        logger.info("********** running the test");
        Thread.sleep(2000L);
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("localhost");
        cachingConnectionFactory.setPassword("guest");
        cachingConnectionFactory.setUsername("guest");
        cachingConnectionFactory.setPublisherConfirms(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        rabbitTemplate.convertAndSend(testQueueName, "hello world message!");
        Thread.sleep(6000L);
        logger.info("************ message sent / trying to retrieve...");
        String str = (String) rabbitTemplate.receiveAndConvert(testQueueName, 10000L);
        logger.info("************* message received");
        Assert.assertEquals("Message in should match message out.", "hello world message!", str);
        logger.info("************* done.");
    }

    @Test
    public void testPlaceHolderUntilCanGetMockBroker() {
    }
}
