
Apache Pulsar

Testcontainers can be used to automatically create Apache Pulsar containers without external services.

The module is based on the official Apache Pulsar Docker image, so reading the official guide is recommended.

Adding this module to your project dependencies

Please run the following command to add the Apache Pulsar module to your Go dependencies:

go get github.com/testcontainers/testcontainers-go/modules/pulsar

Usage example

Create a Pulsar container to use it in your tests:

c, err := testcontainerspulsar.StartContainer(
    ctx,
    tt.opts...,
)

Here, tt.opts holds the options that configure the container. See the Container Options section for more details.

Then you can retrieve the broker URL and the admin (HTTP service) URL:

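// URL that Pulsar clients use to reach the broker
brokerURL, err := c.BrokerURL(ctx)
require.NoError(t, err)

// URL of the HTTP service that exposes the admin REST API
serviceURL, err := c.HTTPServiceURL(ctx)
require.NoError(t, err)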
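The HTTP service URL is typically combined with a path of the Pulsar admin REST API. The following is a minimal sketch, assuming you want the statistics endpoint of a persistent topic. The helper topicStatsURL and the tenant, namespace, and topic values are hypothetical and not part of the module. The hard-coded service URL is a placeholder for the value returned by c.HTTPServiceURL(ctx).

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// topicStatsURL is a hypothetical helper (not part of the Pulsar module).
// It appends the admin REST path for persistent-topic statistics to the
// HTTP service URL, escaping each path segment.
func topicStatsURL(serviceURL, tenant, namespace, topic string) string {
	// Drop trailing slashes so the result never contains "//admin".
	base := strings.TrimRight(serviceURL, "/")
	return fmt.Sprintf("%s/admin/v2/persistent/%s/%s/%s/stats",
		base,
		url.PathEscape(tenant),
		url.PathEscape(namespace),
		url.PathEscape(topic),
	)
}

func main() {
	// Placeholder URL; in a test, pass the value from c.HTTPServiceURL(ctx).
	fmt.Println(topicStatsURL("http://localhost:8080", "public", "default", "my-topic"))
}
```

The resulting string can be passed to any HTTP client, for example http.Get, once the container is running.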

Container Options

When starting the Pulsar container, you can pass options in a variadic way to configure it.

Pulsar Image

If you need to use a different Pulsar image, you can use the WithPulsarImage option.

testcontainerspulsar.WithPulsarImage("docker.io/apachepulsar/pulsar:2.10.2"),

Pulsar Configuration

If you need to set Pulsar configuration variables, you can use the WithPulsarEnv option. It sets Pulsar environment variables and automatically adds the PULSAR_PREFIX_ prefix for you.

For example, if you want to enable brokerDeduplicationEnabled:

testcontainerspulsar.WithPulsarEnv("brokerDeduplicationEnabled", "true"),

It will result in the PULSAR_PREFIX_brokerDeduplicationEnabled=true environment variable being set in the container request.
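The prefixing rule described above can be illustrated in isolation. This is only a sketch of the naming convention, not the module's actual implementation. The helper prefixedEnv is a hypothetical name introduced for illustration.

```go
package main

import "fmt"

// pulsarEnvPrefix is the prefix the documentation says is added automatically.
const pulsarEnvPrefix = "PULSAR_PREFIX_"

// prefixedEnv is a hypothetical helper (not the module's code). It shows
// how a configuration key and value map to a KEY=VALUE environment entry.
func prefixedEnv(key, value string) string {
	return pulsarEnvPrefix + key + "=" + value
}

func main() {
	fmt.Println(prefixedEnv("brokerDeduplicationEnabled", "true"))
}
```

Seeing the final variable name is useful when you inspect the container's environment to confirm that a setting was applied.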

Pulsar IO

If you need to test the Pulsar IO framework, you can enable the Pulsar Functions Worker with the WithFunctionsWorker option:

testcontainerspulsar.WithFunctionsWorker(),

Pulsar Transactions

If you need to test Pulsar Transactions, you can enable the transactions feature with the WithTransactions option:

testcontainerspulsar.WithTransactions(),

Log consumers

If you need to collect the logs from the Pulsar container, you can add your own LogConsumer with the WithLogConsumers function, which accepts a variadic argument of LogConsumers.

testcontainerspulsar.WithLogConsumers(&testLogConsumer{}),

An example of a LogConsumer could be the following:

// testLogConsumer is a testcontainers.LogConsumer that prints the log to stdout
type testLogConsumer struct{}

// Accept prints the log to stdout
func (lc *testLogConsumer) Accept(l testcontainers.Log) {
    fmt.Print(string(l.Content))
}

Warning

You will need to explicitly stop the producer in your tests.

If you want to know more about LogConsumers, please check the Following Container Logs documentation.
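Printing to stdout is enough for debugging, but tests often need to assert on the log content. The following is a minimal sketch of a consumer that buffers lines in memory. To keep it runnable without the testcontainers-go dependency, it declares a local Log type as a stand-in for testcontainers.Log. In a real test you would use testcontainers.Log in the Accept signature instead. The bufferingLogConsumer type and its Contains method are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Log is a local stand-in for testcontainers.Log (assumption: only the
// LogType and Content fields are needed here).
type Log struct {
	LogType string
	Content []byte
}

// bufferingLogConsumer is a hypothetical consumer that keeps log lines in memory.
type bufferingLogConsumer struct {
	mu    sync.Mutex
	lines []string
}

// Accept stores the log content. The mutex protects the slice in case
// Accept and Contains are called from different goroutines.
func (lc *bufferingLogConsumer) Accept(l Log) {
	lc.mu.Lock()
	defer lc.mu.Unlock()
	lc.lines = append(lc.lines, string(l.Content))
}

// Contains reports whether any buffered line contains substr.
func (lc *bufferingLogConsumer) Contains(substr string) bool {
	lc.mu.Lock()
	defer lc.mu.Unlock()
	for _, line := range lc.lines {
		if strings.Contains(line, substr) {
			return true
		}
	}
	return false
}

func main() {
	lc := &bufferingLogConsumer{}
	// The log line below is made up for the demonstration.
	lc.Accept(Log{LogType: "STDOUT", Content: []byte("service started\n")})
	fmt.Println(lc.Contains("started"))
}
```

Buffering trades memory for the ability to search the logs after the fact, so it suits short-lived test containers better than long-running ones.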

Advanced configuration

If you need more advanced control over the Docker config, host config, and endpoint settings types, you can use the modifier functions available in the ContainerRequest. The Pulsar module exposes these modifiers as options that you pass to the StartContainer function:

testcontainerspulsar.WithConfigModifier(func(config *container.Config) {
    config.Env = append(config.Env, "PULSAR_MEM= -Xms512m -Xmx512m -XX:MaxDirectMemorySize=512m")
}),
testcontainerspulsar.WithHostConfigModifier(func(hostConfig *container.HostConfig) {
    hostConfig.Resources = container.Resources{
        Memory: 1024 * 1024 * 1024,
    }
}),
testcontainerspulsar.WithEndpointSettingsModifier(func(settings map[string]*network.EndpointSettings) {
    settings[nwName] = &network.EndpointSettings{
        Aliases: []string{"pulsar"},
    }
}),

For more details, please check out the Advanced Settings for creating containers documentation.