Blueprint - Golang SQS Consumer Performance Tunning

Blueprint -  Golang SQS Consumer Performance Tunning

Blueprints são pequenas documentações que eu estruturo em forma de um breve estudo a respeito de algum tema ou ideia específica. Não tem objetivo de ser um artigo estruturado, somente uma breve documentação com exemplos para consultas futuras.

O objetivo deste blueprint é documentar uma evolução incremental de um consumidor SQS em termos de performance.

Cenário

  • Objetivo: Otimizar gradualmente um consumidor SQS em Golang.
  • Documentar o TPS (Transações por segundo) de cada alternativa
  • Documentar os tempos de processamento
  • Restrições: o objetivo dos exemplos é apenas metrificar a dinâmica do SQS, sem processamentos adicionais além de Receber e Deletar da fila.

Produtor de Exemplo

Documentando o exemplo do produtor. Ferramenta usada apenas para gerar massa de testes.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"sync"
	"sync/atomic"
	"time"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
	"github.com/aws/aws-sdk-go/aws"
)

const (
	queueURL = "https://sqs.us-east-1.amazonaws.com/181560427716/nutrition-mock"
	// totalMessages = 100_000
	totalMessages = 10_000
	batchSize     = 10
	workerCount   = 20
)

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatalf("falha ao carregar AWS config: %v", err)
	}
	client := sqs.NewFromConfig(cfg)

	start := time.Now()

	var sentBatches int64
	var sentMessages int64
	var errBatches int64

	jobs := make(chan []types.SendMessageBatchRequestEntry, workerCount)

	var wg sync.WaitGroup

	for w := 0; w < workerCount; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for batch := range jobs {
				input := &sqs.SendMessageBatchInput{
					QueueUrl: aws.String(queueURL),
					Entries:  batch,
				}
				resp, err := client.SendMessageBatch(ctx, input)
				if err != nil || len(resp.Failed) > 0 {
					atomic.AddInt64(&errBatches, 1)
					log.Printf("[worker %d] erro em batch: %v – falhas: %d", id, err, len(resp.Failed))
				} else {
					atomic.AddInt64(&sentBatches, 1)
					atomic.AddInt64(&sentMessages, int64(len(batch)))
					if sentBatches%100 == 0 {
						log.Printf("✔ %d batches enviados (%d mensagens)", sentBatches, sentMessages)
					}
				}
			}
		}(w)
	}

	go func() {
		defer close(jobs)
		var batch []types.SendMessageBatchRequestEntry
		for i := 1; i <= totalMessages; i++ {
			msg := types.SendMessageBatchRequestEntry{
				Id:          aws.String(fmt.Sprintf("msg-%d", i)),
				MessageBody: aws.String(fmt.Sprintf(`{"index":%d}`, i)),
			}
			batch = append(batch, msg)
			if len(batch) == batchSize {
				jobs <- batch
				batch = nil
			}
		}
		if len(batch) > 0 {
			jobs <- batch
		}
	}()

	wg.Wait()

	elapsed := time.Since(start)
	log.Println("================================================")
	log.Printf("Mensagens enviadas: %d", sentMessages)
	log.Printf("Batches enviados:   %d", sentBatches)
	log.Printf("Batches falhados:   %d", errBatches)
	log.Printf("Tempo total:        %s", elapsed)
	log.Println("================================================")

	os.Exit(0)

}

Output

1
2
3
4
5
6
2025/05/16 16:18:39 ================================================
2025/05/16 16:18:39 Mensagens enviadas: 100000
2025/05/16 16:18:39 Batches enviados:   10000
2025/05/16 16:18:39 Batches falhados:   0
2025/05/16 16:18:39 Tempo total:        1m24.708619542s
2025/05/16 16:18:39 ================================================


SQS Consumer Simples

O objetivo é experimentar a performance da implementação de um consumidor SQS padrão, o exemplo inicial da documentação. A dinâmica estruturada apenas em receber e deletar a mensagem de forma unitária. Esse exemplo tem o objetivo apenas de fornecer um comparativo para as implementações subsequentes.

Na versão mais básica do consumidor, optou-se pelo fluxo unitário: em um único loop, a aplicação faz chamadas de long-polling ao SQS solicitando apenas uma mensagem por vez (MaxNumberOfMessages=1). Quando uma mensagem é recebida, ela é imediatamente apagada (DeleteMessage) e um contador incrementa o número de mensagens processadas para controle, sem necessidade de atomicidade para manipular o contador em memória. Esse exemplo serve como baseline para comparação, expondo o custo elevado de múltiplas chamadas HTTP unitárias ao SQS.

SQS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main

import (
	"context"
	"log"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

const (
	queueURL       = "https://sqs.us-east-1.amazonaws.com/181560427716/nutrition-mock"
	totalMessages  = 1_000
	progressPeriod = 100 // log a cada 100 mensagens
)

func main() {
	ctx := context.Background()

	// Carrega credenciais e região
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatalf("falha ao carregar config AWS: %v", err)
	}
	client := sqs.NewFromConfig(cfg)

	var count int
	start := time.Now()

	for count < totalMessages {

		resp, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
			QueueUrl:            aws.String(queueURL),
			MaxNumberOfMessages: 1,
			WaitTimeSeconds:     10,
		})
		if err != nil {
			log.Printf("erro no ReceiveMessage: %v", err)
			continue
		}
		if len(resp.Messages) == 0 {
			continue
		}

		_, err = client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
			QueueUrl:      aws.String(queueURL),
			ReceiptHandle: resp.Messages[0].ReceiptHandle,
		})
		if err != nil {
			log.Printf("falha ao deletar msg %s: %v", aws.ToString(resp.Messages[0].MessageId), err)
		}

		// Incrementa o contador de controle
		count++

		if count%progressPeriod == 0 {
			log.Printf("progresso: %d/%d msgs", count, totalMessages)
		}
	}

	elapsed := time.Since(start)
	log.Printf("Total de mensagens: %d", count)
	log.Printf("Tempo total: %s", elapsed)
	log.Printf("TPS Médio: %d", int(count)/int(elapsed.Seconds()))
}

Esse exemplo serve como baseline para comparação, expondo o custo elevado de múltiplas chamadas HTTP unitárias ao SQS.

Outputs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
❯ go run consumer.go
2025/05/16 19:28:24 progresso: 100/1000 msgs
2025/05/16 19:28:58 progresso: 200/1000 msgs
2025/05/16 19:29:31 progresso: 300/1000 msgs
2025/05/16 19:30:05 progresso: 400/1000 msgs
2025/05/16 19:30:39 progresso: 500/1000 msgs
2025/05/16 19:31:13 progresso: 600/1000 msgs
2025/05/16 19:31:47 progresso: 700/1000 msgs
2025/05/16 19:32:21 progresso: 800/1000 msgs
2025/05/16 19:32:56 progresso: 900/1000 msgs
2025/05/16 19:33:31 progresso: 1000/1000 msgs

2025/05/16 19:33:31 Total de mensagens: 1000
2025/05/16 19:33:31 Tempo total: 5m40.611940792s
2025/05/16 19:33:31 TPS Médio: 2
  • 1_000 mensagens
  • Tempo total: 340s
  • TPS Médio: 2 rps por répica do consumidor


Batch Consumer

Para reduzir o número de requisições de polling, a segunda versão aumenta MaxNumberOfMessages para 10, fazendo long-polling de até 10 mensagens por chamada. Apesar de reduzirmos em até dez vezes as operações de leitura, o delete ainda ocorre individualmente para cada mensagem dentro do batch. Esse ajuste diminui de forma significativa a latência total de consulta, mas mantém o overhead de confirmações unitárias. Os resultados mostram um ganho consistente de throughput (TPS) em relação ao consumidor simples, comprovando o benefício imediato do batch receive.

SQS Batch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// ...

	for count < totalMessages {

		resp, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
			QueueUrl:            aws.String(queueURL),
			MaxNumberOfMessages: 10, // Batch Size de 10 mensagens
			WaitTimeSeconds:     10,
		})
		if err != nil {
			log.Printf("erro no ReceiveMessage: %v", err)
			continue
		}
		if len(resp.Messages) == 0 {
			continue
		}

		for _, message := range resp.Messages {

			_, err = client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
				QueueUrl:      aws.String(queueURL),
				ReceiptHandle: message.ReceiptHandle,
			})
			if err != nil {
				log.Printf("falha ao deletar msg %s: %v", aws.ToString(message.MessageId), err)
			}

			// Incrementa o contador de controle
			count++

			if count%progressPeriod == 0 {
				log.Printf("progresso: %d/%d msgs", count, totalMessages)
			}

		}

	}

// ...

Outputs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
❯ go run consumer_batch.go
2025/05/16 19:55:07 progresso: 100/1000 msgs
2025/05/16 19:55:25 progresso: 200/1000 msgs
2025/05/16 19:55:43 progresso: 300/1000 msgs
2025/05/16 19:56:01 progresso: 400/1000 msgs
2025/05/16 19:56:21 progresso: 500/1000 msgs
2025/05/16 19:56:41 progresso: 600/1000 msgs
2025/05/16 19:57:00 progresso: 700/1000 msgs
2025/05/16 19:57:21 progresso: 800/1000 msgs
2025/05/16 19:57:40 progresso: 900/1000 msgs
2025/05/16 19:57:59 progresso: 1000/1000 msgs
2025/05/16 19:57:59 Total de mensagens: 1000
2025/05/16 19:57:59 Tempo total: 3m10.4659665s
2025/05/16 19:57:59 TPS Médio: 5
  • 1_000 mensagens
  • Tempo total: 180s
  • TPS Médio: 5rps


Worker Pool + Batch Consumer

No terceiro passo, demos um salto de desempenho ao paralelizar todo o processo em múltiplas goroutines. Cada worker roda de forma independente, repetindo o ciclo de ReceiveMessage em batch e deleção unitária até que o contador atômico alcance a meta. Essa abordagem explora a concorrência no cliente Go e distribui as chamadas HTTP simultaneamente, aumentando ainda mais o throughput, mas ainda engargalando na deleção unitária de cada um dos registros processados. Essa estratégia também permite fine tunning, fazendo com que seja possível ajustar o numero de workers concorrentes que cada instância da aplicação irá criar. Foram testados 3 cenários ajustando o critério de workerCount, com 3, 5 e 10 workers, onde conseguimos em cada um deles aumentar ainda mais a vazão de consumo em cada réplica da aplicação, chegando até 47 mensagens por segundo processadas em cada pod.

Worker Pool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package main

import (
	"context"
	"log"
	"sync"
	"sync/atomic"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

const (
	queueURL       = "https://sqs.us-east-1.amazonaws.com/181560427716/nutrition-mock"
	totalMessages  = 1_000
	batchSize      = 10
	workerCount    = 10
	progressPeriod = 100 // log a cada 100 mensagens
)

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatalf("falha ao carregar config AWS: %v", err)
	}
	client := sqs.NewFromConfig(cfg)

	var processed int64
	var wg sync.WaitGroup
	start := time.Now()

	// lança os workers
	for i := 1; i <= workerCount; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				// Checa se o total de mensagens processadas já atingiu o limite - POC
				cur := atomic.LoadInt64(&processed)
				if cur >= totalMessages {
					return
				}

				resp, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
					QueueUrl:            aws.String(queueURL),
					MaxNumberOfMessages: batchSize,
					WaitTimeSeconds:     1,
					VisibilityTimeout:   30,
				})
				if err != nil {
					log.Printf("[worker %d] ReceiveMessage erro: %v", id, err)
					continue
				}
				if len(resp.Messages) == 0 {
					continue
				}

				for _, msg := range resp.Messages {
					_, err := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
						QueueUrl:      aws.String(queueURL),
						ReceiptHandle: msg.ReceiptHandle,
					})
					if err != nil {
						log.Printf("[worker %d] falha ao deletar msg %s: %v", id, aws.ToString(msg.MessageId), err)
						continue
					}

					// incrementa contador
					newCount := atomic.AddInt64(&processed, 1)
					if newCount%progressPeriod == 0 {
						log.Printf("progresso: %d/%d mensagens", newCount, totalMessages)
					}
					if newCount >= totalMessages {
						return
					}
				}
			}
		}(i)
	}

	// espera os workers terminarem
	wg.Wait()
	elapsed := time.Since(start)
	log.Printf("Total de mensagens: %d", processed)
	log.Printf("Tempo total: %s", elapsed)
	log.Printf("TPS Médio: %d", int(processed)/int(elapsed.Seconds()))
}

Outputs - Worker Pool: 3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
❯ go run consumer_worker_pool.go
2025/05/16 20:08:45 progresso: 100/1000 mensagens
2025/05/16 20:08:51 progresso: 200/1000 mensagens
2025/05/16 20:08:57 progresso: 300/1000 mensagens
2025/05/16 20:09:03 progresso: 400/1000 mensagens
2025/05/16 20:09:09 progresso: 500/1000 mensagens
2025/05/16 20:09:15 progresso: 600/1000 mensagens
2025/05/16 20:09:21 progresso: 700/1000 mensagens
2025/05/16 20:09:27 progresso: 800/1000 mensagens
2025/05/16 20:09:33 progresso: 900/1000 mensagens
2025/05/16 20:09:39 progresso: 1000/1000 mensagens
2025/05/16 20:09:40 Total de mensagens: 1000
2025/05/16 20:09:40 Tempo total: 1m2.079631791s
2025/05/16 20:09:40 TPS Médio: 16

Outputs - Worker Pool: 5

1
2
3
4
5
6
7
8
9
10
11
12
13
14
❯ go run consumer_worker_pool.go
2025/05/16 20:10:15 progresso: 100/1000 mensagens
2025/05/16 20:10:18 progresso: 200/1000 mensagens
2025/05/16 20:10:21 progresso: 300/1000 mensagens
2025/05/16 20:10:25 progresso: 400/1000 mensagens
2025/05/16 20:10:29 progresso: 500/1000 mensagens
2025/05/16 20:10:32 progresso: 600/1000 mensagens
2025/05/16 20:10:36 progresso: 700/1000 mensagens
2025/05/16 20:10:40 progresso: 800/1000 mensagens
2025/05/16 20:10:43 progresso: 900/1000 mensagens
2025/05/16 20:10:47 progresso: 1000/1000 mensagens
2025/05/16 20:10:48 Total de mensagens: 1000
2025/05/16 20:10:48 Tempo total: 38.013904167s
2025/05/16 20:10:48 TPS Médio: 26

Outputs - Worker Pool: 10

1
2
3
4
5
6
7
8
9
10
11
12
13
14
❯ go run consumer_worker_pool.go
2025/05/16 20:11:14 progresso: 100/1000 mensagens
2025/05/16 20:11:16 progresso: 200/1000 mensagens
2025/05/16 20:11:18 progresso: 300/1000 mensagens
2025/05/16 20:11:19 progresso: 400/1000 mensagens
2025/05/16 20:11:22 progresso: 500/1000 mensagens
2025/05/16 20:11:23 progresso: 600/1000 mensagens
2025/05/16 20:11:25 progresso: 700/1000 mensagens
2025/05/16 20:11:28 progresso: 800/1000 mensagens
2025/05/16 20:11:30 progresso: 900/1000 mensagens
2025/05/16 20:11:32 progresso: 1000/1000 mensagens
2025/05/16 20:11:34 Total de mensagens: 1000
2025/05/16 20:11:34 Tempo total: 21.779013417s
2025/05/16 20:11:34 TPS Médio: 47


Worker Pool + Batch Consumer + Batch Delete

A otimização seguinte combina paralelismo e deleção em lote. Cada worker continua fazendo ReceiveMessage em batches de até 10 mensagens, mas passa a apagar todo o lote de uma só vez com DeleteMessageBatch. Dessa forma, reduzimos drasticamente tanto o número de chamadas de leitura quanto o de confirmações de processamento. Essa versão alcançou o maior throughput comparado a paralelização apenas do consumo, comprovando que o padrão “batch receive + batch delete” é o mais eficiente para cenários de alto volume no SQS.

Batch Delete

Outputs - Worker Pool: 5

1
2
3
4
5
6
7
8
9
10
11
12
13
❯ go run consumer_batch_delete.go
2025/05/16 20:13:25 progresso: 100/1000 msgs
2025/05/16 20:13:25 progresso: 200/1000 msgs
2025/05/16 20:13:26 progresso: 300/1000 msgs
2025/05/16 20:13:27 progresso: 400/1000 msgs
2025/05/16 20:13:27 progresso: 500/1000 msgs
2025/05/16 20:13:28 progresso: 600/1000 msgs
2025/05/16 20:13:29 progresso: 700/1000 msgs
2025/05/16 20:13:29 progresso: 800/1000 msgs
2025/05/16 20:13:31 progresso: 1000/1000 msgs
2025/05/16 20:13:32 Total de mensagens: 1000
2025/05/16 20:13:32 Tempo total: 8.328690625s
2025/05/16 20:13:32 TPS Médio: 125


Channels e GoRoutines + Batch Consumer + Batch Delete

A estratégia final para essa PoC se baseia em implementar um channel que receba as mensagens em batch e as processem em paralelo ao consumo de mensagens. Dessa forma conseguimos aumentar o throughput de interações de solicitação de mensagens para o SQS e realizar a tratativa dos batchs com paralelismo e concorrência. Ainda aplicamos o split de workpools para otimizar ainda mais o uso de recursos para dar vazão as mensagens na Queue, e utilizando o delete em batch para diminuir as solicitações de confirmação de processamento também.

Async

Por fim, a estratégia mais sofisticada desacopla totalmente a etapa de fetching da de processamento, usando dois pipelines de goroutines interligados por um canal de batches. No primeiro estágio, um conjunto de fetchers realiza ReceiveMessage em batch* de 10 mensagens e empurra cada slice para o canal para ser consumido de forma assincrona, aumentando ainda mais a vazão do consumo. No segundo estágio, outro conjunto de deleters consome esses batches, apaga em lote e incrementa de forma atômica um contador compartilhado. Um canal de sinalização fecha o pipeline assim que a meta de mensagens é atingida, garantindo parada imediata sem perda de dados, processando 200 TPS por replica, consumindo 1000 mensagens em 5s Esse modelo apresentou o maior TPS nos testes de longa duração, onde foram inseridas 100.000 mensagens que foram processadas em 2m57s, atingindo 564 tps de processamento por unidade computacional, validando que a combinação de paralelismo, batching e pipelines desacoplados é a abordagem ideal para tuning de consumidores SQS em Go.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package main

import (
	"context"
	"log"
	"sync"
	"sync/atomic"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

const (
	queueURL       = "https://sqs.us-east-1.amazonaws.com/181560427716/nutrition-mock"
	totalMessages  = 100_000 // meta de consumo
	batchSize      = 10      // até 10 mensagens por ReceiveMessage
	workerCount    = 10      // paralelismo
	progressPeriod = 1000    // log a cada 100 mensagens
)

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatalf("falha ao carregar AWS config: %v", err)
	}
	client := sqs.NewFromConfig(cfg)

	// canal de batches e sinal de término
	batchCh := make(chan []types.Message)
	doneCh := make(chan struct{})
	var once sync.Once
	signalDone := func() { once.Do(func() { close(doneCh) }) }

	var processed int64
	start := time.Now()

	var fetchWg, deleteWg sync.WaitGroup

	for i := 1; i <= workerCount; i++ {
		fetchWg.Add(1)
		go func(id int) {
			defer fetchWg.Done()
			for {
				// Trava do teste
				if atomic.LoadInt64(&processed) >= totalMessages {
					return
				}
				resp, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
					QueueUrl:            aws.String(queueURL),
					MaxNumberOfMessages: batchSize,
					WaitTimeSeconds:     1,
					VisibilityTimeout:   60,
				})
				if err != nil {
					log.Printf("[%d] Receive erro: %v", id, err)
					time.Sleep(time.Second)
					continue
				}
				if len(resp.Messages) == 0 {
					continue
				}
				// envia batch para o channel de processamento
				select {
				case batchCh <- resp.Messages:
				case <-doneCh:
					return
				}
			}
		}(i)
	}

	// quando todos os fetchers terminarem, fechamos batchCh
	go func() {
		fetchWg.Wait()
		close(batchCh)
	}()

	// Inicia os Workers de Processamento
	for i := 1; i <= workerCount; i++ {
		deleteWg.Add(1)
		go func(id int) {
			defer deleteWg.Done()
            // Consome as mensagens do SQS através do Channel de forma assincrona
			for batch := range batchCh {

				entries := make([]types.DeleteMessageBatchRequestEntry, 0, len(batch))
				for _, msg := range batch {
					entries = append(entries, types.DeleteMessageBatchRequestEntry{
						Id:            msg.MessageId,
						ReceiptHandle: msg.ReceiptHandle,
					})
				}

				if _, err := client.DeleteMessageBatch(ctx, &sqs.DeleteMessageBatchInput{
					QueueUrl: aws.String(queueURL),
					Entries:  entries,
				}); err != nil {
					log.Printf("[%d] DeleteBatch erro: %v", id, err)
				}

				newCount := atomic.AddInt64(&processed, int64(len(entries)))
				if newCount >= totalMessages {
					signalDone()
				}
				if newCount%progressPeriod == 0 || newCount >= totalMessages {
					log.Printf("progresso: %d/%d msgs", newCount, totalMessages)
				}
			}
		}(i)
	}

	deleteWg.Wait()

	elapsed := time.Since(start)
	final := atomic.LoadInt64(&processed)
	log.Printf("Total de mensagens: %d", final)
	log.Printf("Tempo total: %s", elapsed)
	log.Printf("TPS Médio: %d", int(final)/int(elapsed.Seconds()))

}


Outputs - Worker Pool: 5

1
2
3
4
5
6
7
8
9
10
11
12
❯ go run consumer_channels.go
2025/05/16 20:15:08 progresso: 100/1000 msgs
2025/05/16 20:15:08 progresso: 200/1000 msgs
2025/05/16 20:15:08 progresso: 300/1000 msgs
2025/05/16 20:15:09 progresso: 400/1000 msgs
2025/05/16 20:15:09 progresso: 500/1000 msgs
2025/05/16 20:15:09 progresso: 600/1000 msgs
2025/05/16 20:15:10 progresso: 900/1000 msgs
2025/05/16 20:15:11 progresso: 1000/1000 msgs
2025/05/16 20:15:12 Total de mensagens: 1000
2025/05/16 20:15:12 Tempo total: 5.310673458s
2025/05/16 20:15:12 TPS Médio: 200

Outputs - Worker Pool: 10

1
2
3
4
5
6
7
8
9
10
11
12
13
❯ go run consumer_channels.go
2025/05/16 20:15:47 progresso: 100/1000 msgs
2025/05/16 20:15:47 progresso: 200/1000 msgs
2025/05/16 20:15:47 progresso: 300/1000 msgs
2025/05/16 20:15:47 progresso: 400/1000 msgs
2025/05/16 20:15:48 progresso: 500/1000 msgs
2025/05/16 20:15:48 progresso: 600/1000 msgs
2025/05/16 20:15:48 progresso: 700/1000 msgs
2025/05/16 20:15:49 progresso: 800/1000 msgs
2025/05/16 20:15:49 progresso: 1000/1000 msgs
2025/05/16 20:15:50 Total de mensagens: 1000
2025/05/16 20:15:50 Tempo total: 4.201653667s
2025/05/16 20:15:50 TPS Médio: 250

Para fazer um teste mais longo, realizei um teste com 100.000 mensagens na fila para validar uma execução mais prolongada.

Outputs - Worker Pool: 10 e 100.000 mensagens

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
❯ go run consumer_channels.go
2025/05/16 20:19:42 progresso: 1000/100000 msgs
2025/05/16 20:19:43 progresso: 2000/100000 msgs
2025/05/16 20:19:45 progresso: 3000/100000 msgs
2025/05/16 20:19:47 progresso: 4000/100000 msgs
2025/05/16 20:19:49 progresso: 5000/100000 msgs
2025/05/16 20:19:51 progresso: 6000/100000 msgs
2025/05/16 20:19:52 progresso: 7000/100000 msgs
2025/05/16 20:19:54 progresso: 8000/100000 msgs
// ...
2025/05/16 20:22:24 progresso: 94000/100000 msgs
2025/05/16 20:22:26 progresso: 95000/100000 msgs
2025/05/16 20:22:28 progresso: 96000/100000 msgs
2025/05/16 20:22:30 progresso: 97000/100000 msgs
2025/05/16 20:22:31 progresso: 98000/100000 msgs
2025/05/16 20:22:35 progresso: 100000/100000 msgs
2025/05/16 20:22:36 Total de mensagens: 100000
2025/05/16 20:22:36 Tempo total: 2m57.05202975s
2025/05/16 20:22:36 TPS Médio: 564


Resultados e Comparações

Aqui está a tabela resumindo os resultados de todas as variações testadas:

Implementação Workers Mensagens Tempo Total TPS Médio
Consumidor Simples 1 1 000 5m40.612s 2
Batch Consumer 1 1 000 3m10.466s 5
Worker Pool + Batch Consumer 3 1 000 1m2.080s 16
Worker Pool + Batch Consumer 5 1 000 0m38.014s 26
Worker Pool + Batch Consumer 10 1 000 0m21.779s 47
Worker Pool + Batch Consumer + Batch Delete 5 1 000 0m8.329s 125
Canais & Goroutines + Batch Consumer + Batch Delete 5 1 000 0m5.311s 200
Canais & Goroutines + Batch Consumer + Batch Delete 10 1 000 0m4.202s 250
Canais & Goroutines + Batch Consumer + Batch Delete (long run) 10 100 000 2m57.052s 564



Referências

Amazon SQS message quotas

AWS SQS Package

Simplifying Message Queueing with Golang and Amazon SQS

Boas Práticas de Uso de Channels e Go Routines

DeleteMessageBatch

HTSQS

SQS Consumer Design: Achieving High Scalability while managing concurrency in Go

comments powered by Disqus