Mocking an SQS queue

The latest challenge the team took on came with interesting problems to solve and new tech to explore. One of them was integrating data from an event queue populated by an external party: we had to connect our service with a third-party component that was producing a stream of events representing user transactions.

The final goal was to consume each message that got published, update our service’s internal state and ultimately produce another event to a Kafka topic. Basically, our service can be seen as the door through which all user transaction events, in different formats, enter the company’s global system, where they are consumed by the multiple other services that are part of it.

This third-party component publishes these events to an Amazon SQS queue, and since we were the first service consuming from this kind of queue we faced a new challenge: how to build a test harness that would allow us to verify the correct integration of our service with this queue.

To solve this we could have used real AWS instances; however, that was a bit overkill and actually presented some disadvantages. There are costs associated with running these services: Amazon charges not only for messages sent/received but also for data transfer. Besides, the AWS queues we were consuming from were not part of our infrastructure, we had no AWS-based services already in place, and we had no intention of starting to use them only because of tests.

After some investigation and a couple of quick experiments we decided to go with an ElasticMQ-based solution: a small component that replicates the real SQS server and is deployed on a mock environment alongside our service instances where the tests run. Each instance of the service on this environment connects to its “companion” SQS server running inside the same VM, in order to limit interference between tests.
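In practice this just means each service instance is configured with a local SQS endpoint instead of a real AWS one, something like this (property names are hypothetical, not from our actual service):

sqs.endpoint=http://localhost:8888
sqs.queue=sample-queue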

We wanted a simple base test that could do just this:

  1. produce a new transaction message to an SQS queue
  2. verify that the correct output message has been published to the correct Kafka topic

The test should be generic enough so that we could later test multiple scenarios with different input data.

This is what we started with, kind of a blueprint of what we wanted to have in the end, with some commented-out code related to the Kafka consuming and verification steps that we are not going to cover in this post.

public class BaseTransactionStreamTest {
    @Test
    public void publishNewTransactionToTheQueue() {
        // Given
        SQSClient sqsClient = new SQSClient("localhost", 8888);
        // KafkaConsumer kafkaConsumer = new KafkaConsumer("test", "localhost:9092");

        String file = "mocks/transactions/test_transaction.json";
        String transactionMockData = getJsonFileAsString(file);

        // When
        sqsClient.send("sample-queue", transactionMockData);

        // Then
        // List<Message> publishedOnKafka = kafkaConsumer.consume();
        // assertTrue(publishedOnKafka.size() > 0);
    }
}

To accomplish this we created a Maven project with two separate modules, sqs-mock-client and sqs-mock-standalone. The parent POM declares the modules and manages the shared dependency versions.

[...]

<modules>
    <module>sqs-mock-client</module>
    <module>sqs-mock-standalone</module>
</modules>

[...]

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-sqs</artifactId>
            <version>${aws-java-sdk-sqs.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticmq</groupId>
            <artifactId>elasticmq-rest-sqs_2.12</artifactId>
            <version>${elasticmq-rest-sqs_2.12.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>

[...]
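With the versions managed in the parent, each module then declares only the artifacts it needs, with no version tag. In sqs-mock-standalone, for instance, this looks roughly like:

<dependencies>
    <dependency>
        <groupId>org.elasticmq</groupId>
        <artifactId>elasticmq-rest-sqs_2.12</artifactId>
    </dependency>
</dependencies>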

The SQS client is simply a facade that allows creating queues and sending messages to the standalone mock server. It was separated into its own module so it could be imported independently by the application’s test module.

public class SQSClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(SQSClient.class);

    private final NodeAddress address;
    private final AmazonSQSClient client;

    public SQSClient(String host, int port) {
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("host must be defined");
        }

        this.address = new NodeAddress("http", host, port, "");
        // ElasticMQ does not validate credentials, but the AWS SDK requires some
        this.client = new AmazonSQSClient(new BasicAWSCredentials("x", "x"));
        this.client.setEndpoint(address.fullAddress());
    }

    public void send(String queueName, String message) {
        String queueUrl = getQueueUrl(queueName);
        LOGGER.debug("operation=send, queue={}", queueUrl);
        client.sendMessage(queueUrl, message);
    }

    public void createQueue(String queueName) {
        String queueUrl = getQueueUrl(queueName);
        LOGGER.debug("operation=create, queue={}", queueUrl);
        client.createQueue(queueName);
    }

    private String getQueueUrl(String queueName) {
        // Queue URL in ElasticMQ is http://{host}:{port}/queue/{queue_name}
        return address.fullAddress() + "/queue/" + queueName;
    }
}
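Using it only requires pointing at the mock server’s host and port; a quick illustration (queue name and payload are made up):

SQSClient client = new SQSClient("localhost", 8888);
client.createQueue("sample-queue");
client.send("sample-queue", "{\"transactionId\": \"123\"}");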

The other module contains the small standalone deployable component that is responsible for starting the server.

public class Launcher {
    private static final Logger LOG = LoggerFactory.getLogger(Launcher.class);

    public static void launch(Config config) {
        // Start the embedded ElasticMQ server
        SQSServer sqs = new SQSServer(hostname(), config.port());
        sqs.init();

        // Pre-create the queues the tests expect to exist
        SQSClient client = new SQSClient(hostname(), config.port());
        config.queues().forEach(client::createQueue);
    }

    private static String hostname() {
        try {
            return InetAddress.getLocalHost().getCanonicalHostName();
        } catch (UnknownHostException e) {
            LOG.warn("Failed to get hostname, using hostname=localhost. {}", e.getMessage());
            return "localhost";
        }
    }
}

The SQSServer class itself is a thin wrapper around ElasticMQ’s SQSRestServerBuilder:

public final class SQSServer {
    private final NodeAddress address;
    private SQSRestServer server;

    public SQSServer(String host, int port) {
        this.address = new NodeAddress("http", host, port, "");
    }

    public void init() {
        // Listen on all interfaces, but advertise the configured host in queue URLs
        server = SQSRestServerBuilder
                .withPort(address.port())
                .withInterface("0.0.0.0")
                .withServerAddress(address)
                .start();
    }
}
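The Config consumed by the Launcher is not shown above; here is a minimal sketch of its shape, assuming hard-coded values (the real one could just as well read a properties file or environment variables):

public class Config {
    private final int port;
    private final List<String> queues;

    public Config(int port, List<String> queues) {
        this.port = port;
        this.queues = queues;
    }

    public int port() {
        return port;
    }

    public List<String> queues() {
        return queues;
    }
}

Launching the standalone server then boils down to something like Launcher.launch(new Config(8888, Arrays.asList("sample-queue"))).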

It ended up being relatively simple, although it took some time to make everything work right on the mock environment. But with just that initial test blueprint and a command-line Kafka consumer we could manually verify that everything was working as expected:

bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic test

Now, the next step was to replace the commented-out Kafka consumer and verification steps with real working code, so the tests could be completely automated.
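As a preview, that verification step would look something like this sketch, using the standard Kafka Java client (kafka-clients 2.x; topic, group id and timeout are illustrative, not our actual values):

// Sketch of the automated "Then" step of the base test
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "transaction-stream-test");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("test"));

    // Poll for the message our service should have produced after
    // consuming the transaction from the SQS queue
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
    assertTrue(records.count() > 0);
}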