package com.rabbitmq.examples.perf;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/* loaded from: classes2.dex */
/**
 * Performance-test consumer. When run, it subscribes to a queue on the given
 * channel, acknowledges and/or commits deliveries according to its
 * configuration, reports every processed delivery to {@link Stats}, and blocks
 * until a message limit is reached, the channel shuts down, or the optional
 * time limit elapses.
 *
 * <p>{@code rateLimit}, {@code msgCount}, {@code lastStatsTime} and
 * {@code delay(long)} are inherited from {@link ProducerConsumerBase}, which is
 * not visible in this file; presumably they implement rate limiting — confirm
 * against the base class.
 */
public class Consumer extends ProducerConsumerBase implements Runnable {
    private final boolean autoAck;
    private final Channel channel;
    private final String id;
    /** Released when the message limit is reached or the channel shuts down. */
    private final CountDownLatch latch = new CountDownLatch(1);
    /** Maximum number of deliveries to process; 0 means no limit. */
    private final int msgLimit;
    /** 0 = ack each delivery individually; otherwise multi-ack every N deliveries. */
    private final int multiAckEvery;
    /** The active consumer callback; assigned in {@link #run()} before subscribing. */
    private ConsumerImpl q;
    private final String queueName;
    private final Stats stats;
    /** Maximum time {@link #run()} waits, in milliseconds; 0 means wait indefinitely. */
    private final long timeLimit;
    /** Commit the transaction every N deliveries; 0 disables commits. */
    private final int txSize;

    /**
     * Channel callback that performs the per-delivery work on behalf of the
     * enclosing {@link Consumer}.
     */
    private class ConsumerImpl extends DefaultConsumer {
        /** Wall-clock time (ms) of the most recently processed delivery. */
        long now;
        /** Deliveries seen by this callback, including those past the message limit. */
        int totalMsgCount;

        private ConsumerImpl(Channel channel) {
            super(channel);
            this.totalMsgCount = 0;
            long currentTimeMillis = System.currentTimeMillis();
            this.now = currentTimeMillis;
            // Reset the counters inherited by the enclosing Consumer.
            Consumer.this.lastStatsTime = currentTimeMillis;
            Consumer.this.msgCount = 0;
        }

        /**
         * Re-subscribes the current consumer callback to the same queue after
         * the broker cancels the subscription.
         *
         * @param str the consumer tag of the cancelled subscription (unused)
         * @throws IOException if re-subscribing fails
         */
        @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
        public void handleCancel(String str) throws IOException {
            System.out.println("Consumer cancelled by broker. Re-consuming.");
            Consumer.this.channel.basicConsume(Consumer.this.queueName, Consumer.this.autoAck, Consumer.this.q);
        }

        /**
         * Processes one delivery: acks and commits as configured, records the
         * receipt in {@link Stats}, and releases the latch once the message
         * limit has been reached. Deliveries arriving after the limit are
         * counted but otherwise ignored (no ack, no stats).
         *
         * @param str the consumer tag (unused)
         * @param envelope delivery metadata; supplies the delivery tag and routing key
         * @param basicProperties message properties (unused)
         * @param bArr message body; must start with an {@code int} followed by a {@code long}
         * @throws IOException if the body is too short to decode, or an ack/commit fails
         */
        @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
            this.totalMsgCount++;
            Consumer.this.msgCount++;
            if (Consumer.this.msgLimit == 0 || Consumer.this.msgCount <= Consumer.this.msgLimit) {
                DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bArr));
                // The leading int is read only to advance past it; its value is not used here.
                dataInputStream.readInt();
                // Presumably the producer's System.nanoTime() at send time — confirm against the producer.
                long readLong = dataInputStream.readLong();
                long nanoTime = System.nanoTime();
                if (!Consumer.this.autoAck) {
                    if (Consumer.this.multiAckEvery == 0) {
                        Consumer.this.channel.basicAck(envelope.getDeliveryTag(), false);
                    } else if (this.totalMsgCount % Consumer.this.multiAckEvery == 0) {
                        // multiple=true: acknowledges every delivery up to and including this tag.
                        Consumer.this.channel.basicAck(envelope.getDeliveryTag(), true);
                    }
                }
                if (Consumer.this.txSize != 0 && this.totalMsgCount % Consumer.this.txSize == 0) {
                    Consumer.this.channel.txCommit();
                }
                this.now = System.currentTimeMillis();
                // A latency is reported only when the routing key equals this consumer's id; otherwise 0.
                Consumer.this.stats.handleRecv(Consumer.this.id.equals(envelope.getRoutingKey()) ? nanoTime - readLong : 0L);
                Consumer.this.delay(this.now);
            }
            if (Consumer.this.msgLimit != 0 && Consumer.this.msgCount >= Consumer.this.msgLimit) {
                Consumer.this.latch.countDown();
            }
        }

        /**
         * Releases the latch so that {@link Consumer#run()} stops waiting when
         * the channel or connection shuts down.
         *
         * @param str the consumer tag (unused)
         * @param shutdownSignalException the shutdown cause (unused)
         */
        @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
        public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
            Consumer.this.latch.countDown();
        }
    }

    /**
     * Creates a consumer; nothing is subscribed until {@link #run()} is called.
     *
     * @param channel channel used to consume, ack and commit
     * @param str this consumer's id; compared with each delivery's routing key
     *            to decide whether a latency value is reported
     * @param str2 name of the queue to consume from
     * @param i transaction size: commit every {@code i} deliveries, 0 to never commit
     * @param z {@code true} to consume in auto-ack mode (no explicit acks are sent)
     * @param i2 multi-ack interval: 0 to ack every delivery individually,
     *           otherwise send one multiple-ack every {@code i2} deliveries
     * @param stats sink notified of every processed delivery
     * @param f value stored in the inherited {@code rateLimit} field
     * @param i3 message limit, 0 for unlimited
     * @param i4 time limit in seconds, 0 to wait indefinitely
     */
    public Consumer(Channel channel, String str, String str2, int i, boolean z, int i2, Stats stats, float f, int i3, int i4) {
        this.channel = channel;
        this.id = str;
        this.queueName = str2;
        this.rateLimit = f;
        this.txSize = i;
        this.autoAck = z;
        this.multiAckEvery = i2;
        this.stats = stats;
        this.msgLimit = i3;
        // Multiply as long: int arithmetic would overflow for limits above ~2.1 million seconds.
        this.timeLimit = i4 * 1000L;
    }

    /**
     * Subscribes to the queue and blocks until the latch is released (message
     * limit reached or channel shutdown) or, when a time limit is configured,
     * until that limit elapses.
     *
     * @throws RuntimeException wrapping any {@link IOException},
     *         {@link ShutdownSignalException} or {@link InterruptedException};
     *         in the interrupted case the thread's interrupt status is restored
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            ConsumerImpl consumerImpl = new ConsumerImpl(this.channel);
            this.q = consumerImpl;
            this.channel.basicConsume(this.queueName, this.autoAck, consumerImpl);
            if (this.timeLimit == 0) {
                this.latch.await();
            } else {
                this.latch.await(this.timeLimit, TimeUnit.MILLISECONDS);
            }
        } catch (ShutdownSignalException e) {
            throw new RuntimeException(e);
        } catch (IOException e2) {
            throw new RuntimeException(e2);
        } catch (InterruptedException e3) {
            // Restore the interrupt flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e3);
        }
    }
}
