Spring AMQP AcknowledgeMode.AUTO works slow

java spring amqp spring-amqp

521 观看

1回复

304 作者的声誉

I have a producer which sends to RabbitMQ 20 messages per second and also I have a consumer, which should receive message with the same speed as they are produced.

There are some conditions I have to implement:

  1. Produce and consume 20 messages per second.
  2. Save the producing order.
  3. The messages shouldn't be lost (that's why I use AcknowledgeMode.AUTO).

When I use Spring AMQP realization (org.springframework.amqp.rabbit), my consumer processes maximum 6 messages per second. But if I use the native AMQP library (com.rabbitmq.client) it does all 20 messages per second with both ack - auto and manual.

The question is:

Why Spring implementation in Consumer case works so slow and how can I fix this?

If I set prefetchCount(20) it works as needed, but I can't use prefetch because it can break the order in a reject case.

Spring amqp:

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitMqServer);
    connectionFactory.setUsername(rabbitMqUsername);
    connectionFactory.setPassword(rabbitMqPassword);
    return connectionFactory;
}

...

private SimpleMessageListenerContainer createContainer(Queue queue, Receiver receiver, AcknowledgeMode acknowledgeMode) {
    SimpleMessageListenerContainer persistentListenerContainer = new SimpleMessageListenerContainer();
    persistentListenerContainer.setConnectionFactory(connectionFactory());
    persistentListenerContainer.setQueues(queue);
    persistentListenerContainer.setMessageListener(receiver);
    persistentListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
    return persistentListenerContainer;
}

...

@Override
public void onMessage(Message message) {saveToDb}
作者: AlexSmith 的来源 发布者: 2017 年 12 月 27 日

回应 1


1

95410 作者的声誉

决定

Spring AMQP (before 2.0) defaults prefetch to 1 which, as you say, guarantees order even after a rejection.

The native client applies no basicQos() by default which effectively means it has infinite prefetch.

So you are not comparing apples and apples.

Try channel.basicQos(1) with the native client and you should see similar results to the default spring amqp setup.

EDIT

When comparing apples with apples, I get similar results with/without the framework...

@SpringBootApplication
public class So47995535Application {

    public static void main(String[] args) {
        SpringApplication.run(So47995535Application.class, args).close();
    }

    private final CountDownLatch latch = new CountDownLatch(100);

    private int nativeCount;

    private int rlCount;

    @Bean
    public ApplicationRunner runner(ConnectionFactory factory, RabbitTemplate template,
            SimpleMessageListenerContainer container) {
        return args -> {
            for (int i = 0; i < 100; i++) {
                template.convertAndSend("foo", "foo" + i);
            }
            container.start();
            Connection conn = factory.createConnection();
            Channel channel = conn.createChannel(false);
            channel.basicQos(1);
            channel.basicConsume("foo", new DefaultConsumer(channel) {

                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    System.out.println("native " + new String(body));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    nativeCount++;
                    latch.countDown();
                }

            });
            latch.await(60, TimeUnit.SECONDS);
            System.out.println("Native: " + this.nativeCount + " LC: " + this.rlCount);
            channel.close();
            conn.close();
            container.stop();
        };
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("foo");
        container.setPrefetchCount(1);
        container.setAutoStartup(false);
        container.setMessageListener((MessageListener) m -> {
            System.out.println("LC " + new String(m.getBody()));
            this.rlCount++;
            this.latch.countDown();
        });
        return container;
    }

}

and

Native: 50 LC: 50
作者: Gary Russell 发布者: 2017 年 12 月 27 日
32x32