How to Set Up Kafka Integration Test – Grape Up

Do you consider unit testing insufficient for keeping an application reliable and stable? Are you afraid that a bug may be hiding somewhere behind the assumption that unit tests cover all cases? And is mocking Kafka not enough for your project requirements? If even one answer is 'yes', then welcome to a simple and straightforward guide on how to set up integration tests for Kafka using TestContainers and Embedded Kafka for Spring!

What is TestContainers?

TestContainers is an open-source Java library specialized in providing everything needed for integration testing against external resources. It means that we are able to mimic an actual database, web server, or even an event bus environment and treat it as a reliable place to test application functionality. All these fancy features are hooked into Docker images, defined as containers. Do we need to test the database layer with an actual MongoDB? No worries, we have a test container for that. We cannot forget about UI tests either: a Selenium container will do anything that we actually need.
In our case, we will focus on the Kafka Testcontainer.

What is Embedded Kafka?

As the name suggests, we are going to deal with an in-memory Kafka instance, ready to be used as a normal broker with full functionality. It allows us to work with producers and consumers as usual, making our integration tests lightweight.

Before we start

The concept for our test is simple: I would like to test a Kafka consumer and producer using two different approaches and check how we can apply them in actual cases.

Kafka messages are serialized using Avro schemas.

Embedded Kafka – Producer Test

The concept is easy: let's create a simple project with a controller, which invokes a service method to push a Kafka Avro serialized message. It relies on the following dependencies:


dependencies {
    implementation "org.apache.avro:avro:1.10.1"
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.springframework.kafka:spring-kafka'

    implementation('org.springframework.boot:spring-boot-starter-web:2.4.3')
    implementation 'org.projectlombok:lombok:1.18.16'

    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation('org.springframework.cloud:spring-cloud-stream-test-support:3.1.1')
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

Also worth mentioning is a great plugin for Avro. Here is the plugins section:

plugins {
    id 'org.springframework.boot' version '2.6.8'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
    id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"
}

The Avro plugin generates Java classes from schemas automatically. This is a must-have.

Link to plugin: https://github.com/davidmc24/gradle-avro-plugin

Now let's define the Avro schema:

{
  "namespace": "com.grapeup.myawesome.myawesomeproducer",
  "type": "record",
  "name": "RegisterRequest",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "address", "type": "string", "avro.java.string": "String"}
  ]
}


Our ProducerService will be focused only on sending messages to Kafka using a template, nothing exciting about that part. The main functionality can be done with just this line:

ListenableFuture<SendResult<String, RegisterRequest>> future = this.kafkaTemplate.send("register-request", kafkaMessage);

We can't forget about the test properties:

spring:
  main:
    allow-bean-definition-overriding: true
  kafka:
    consumer:
      group-id: team_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroDeserializer
    producer:
      auto.register.schemas: true
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroSerializer
    properties:
      specific.avro.reader: true

As we see in the test properties above, we declare a custom deserializer/serializer for Kafka messages. It is highly recommended to use Kafka with Avro: don't let JSON carry the object structure, let's use a civilized mapper and object definition like Avro.


// Serializer backed by an in-memory mock schema registry, so no real registry is needed in tests.
public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
    public CustomKafkaAvroSerializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}

// Deserializer counterpart referenced as value-deserializer in the test properties.
public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
    public CustomKafkaAvroDeserializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroDeserializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}

And we have everything we need to start writing our test.

@SpringBootTest
@AutoConfigureMockMvc
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ActiveProfiles("test")
@EmbeddedKafka(partitions = 1, topics = "register-request")
class ProducerControllerTest {

All we need to do is add the @EmbeddedKafka annotation with the listed topics and partitions. The application context will boot a Kafka broker with the provided configuration just like that. Keep in mind that @TestInstance should be used with special consideration. Lifecycle.PER_CLASS avoids creating the same objects/context for each test method. It is worth checking if the tests are too time-consuming.

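private Consumer<String, RegisterRequest> consumerServiceTest;

@BeforeAll
void setUp() {
    DefaultKafkaConsumerFactory<String, RegisterRequest> consumerFactory =
            new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());

    consumerServiceTest = consumerFactory.createConsumer();
    // KafkaTestUtils.getSingleRecord needs a consumer that is already subscribed.
    consumerServiceTest.subscribe(Collections.singletonList(TOPIC_NAME));
}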

Here we can declare the test consumer, based on the Avro schema return type. All Kafka properties are already provided in the .yml file. That consumer will be used to check whether the producer actually pushed a message.

Here is the actual test method:

@Test
void whenValidInput_thenReturns200() throws Exception {
        // Field values match the assertions at the end of the test.
        RegisterRequestDto request = RegisterRequestDto.builder()
                .id(12)
                .address("tempAddress")
                .build();

        mockMvc.perform(
                post("/register-request")
                      .contentType("application/json")
                      .content(objectMapper.writeValueAsBytes(request)))
                .andExpect(status().isOk());

      ConsumerRecord<String, RegisterRequest> consumedRegisterRequest = KafkaTestUtils.getSingleRecord(consumerServiceTest, TOPIC_NAME);

        RegisterRequest valueReceived = consumedRegisterRequest.value();

        assertEquals(12, valueReceived.getId());
        assertEquals("tempAddress", valueReceived.getAddress());
}

First of all, we use MockMvc to perform an action on our endpoint. That endpoint uses ProducerService to push messages to Kafka. The KafkaConsumer is used to verify that the producer worked as expected. And that's it: we have a fully working test with embedded Kafka.

Test Containers – Consumer Test

TestContainers are nothing more than independent Docker images ready to be run as containers. The following test scenario will be extended with a MongoDB image. Why not keep our data in the database right after something happens in the Kafka flow?

Dependencies are not much different than in the previous example. The following entries are required for test containers:

testImplementation 'org.testcontainers:junit-jupiter'
testImplementation 'org.testcontainers:kafka'
testImplementation 'org.testcontainers:mongodb'

ext {
    set('testcontainersVersion', "1.17.1")
}

dependencyManagement {
    imports {
        mavenBom "org.testcontainers:testcontainers-bom:${testcontainersVersion}"
    }
}

Let's focus now on the consumer part. The test case will be simple: one consumer service will be responsible for getting the Kafka message and storing the parsed payload in a MongoDB collection. All that we need to know about KafkaListeners, for now, is this annotation:

@KafkaListener(topics = "register-request")

Thanks to the annotation processor, KafkaListenerContainerFactory will be responsible for creating a listener on our method. From this moment our method will react to any incoming Kafka message on the mentioned topic.

Avro serializer and deserializer configs are the same as in the previous test.

Regarding TestContainers, we should start with the following annotations:

@SpringBootTest
@Testcontainers
public class AbstractIntegrationTest {

During startup, all configured TestContainers modules will be activated. It means that we will get access to the full operating environment of the selected resource. For example:

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Container
public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

@Container
static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.4.2").withExposedPorts(27017);

As a result of booting the test, we can expect two Docker containers to start with the provided configuration.

What is really important about the mongo container is that it gives us full access to the database using just a simple connection URI. With such a feature, we are able to look at the current state of our collections, even in debug mode with prepared breakpoints.
Take a look also at the Ryuk container: it works like a watchdog and cleans up our containers once the tests have finished.

And here is the last part of the configuration:

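@DynamicPropertySource
static void dataSourceProperties(DynamicPropertyRegistry registry) {
   registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl);
}

static {
   // Start both containers before the Spring context reads the properties above.
   kafkaContainer.start();
   // The wait strategy has to be set before the container is started.
   mongoDBContainer.waitingFor(Wait.forListeningPort());
   mongoDBContainer.start();
}

// @BeforeEach is used here so that JUnit invokes the method on the test instance.
@BeforeEach
public void beforeTest() {
   // Block until every listener container has been assigned its partition.
   kafkaListenerEndpointRegistry.getListenerContainers().forEach(
           messageListenerContainer ->
                   ContainerTestUtils
                       .waitForAssignment(messageListenerContainer, 1));
}

@AfterAll
static void tearDown() {
   kafkaContainer.stop();
   mongoDBContainer.stop();
}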

@DynamicPropertySource gives us the option to set all needed environment variables during the test lifecycle. It is strongly needed for any TestContainers configuration. Also, in beforeTest, kafkaListenerEndpointRegistry waits for each listener to get its expected partitions during container startup.

And the last part of the Kafka test containers journey, the main body of the test:

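@Test
public void containerStartsAndPublicPortIsAvailable() throws Exception {
   writeToTopic("register-request", RegisterRequest.newBuilder().setId(123).setAddress("dummyAddress").build());

   //Wait for KafkaListener
   TimeUnit.SECONDS.sleep(5); // pause length is an assumption; tune it to your environment
   Assertions.assertEquals(1, taxiRepository.findAll().size());
}

private KafkaProducer<String, RegisterRequest> createProducer() {
   return new KafkaProducer<>(kafkaProperties.buildProducerProperties());
}

private void writeToTopic(String topicName, RegisterRequest... registerRequests) {

   // try-with-resources closes the producer, which also flushes pending records
   try (KafkaProducer<String, RegisterRequest> producer = createProducer()) {
       Arrays.stream(registerRequests)
               .forEach(registerRequest -> {
                           ProducerRecord<String, RegisterRequest> record = new ProducerRecord<>(topicName, registerRequest);
                           producer.send(record);
                       }
               );
   }
}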

The custom producer is responsible for writing our message to the Kafka broker. Also, it is recommended to give consumers some time to handle messages properly. As we see, the message was not just consumed by the listener, but also stored in the MongoDB collection.
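
Giving consumers some time does not have to mean a fixed pause: the test can instead poll for the expected state until a deadline passes. The sketch below is a minimal, library-free illustration of that idea. `PollingWait` and `waitUntil` are hypothetical names (not part of Spring Kafka or TestContainers), the timeouts are arbitrary, and the background thread only stands in for a listener that stores a document asynchronously.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class PollingWait {

    // Hypothetical helper: re-checks the condition every pollInterval
    // until it holds (returns true) or the timeout elapses (returns false).
    public static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for the number of documents a listener has stored.
        AtomicInteger storedDocuments = new AtomicInteger();

        // Simulates a consumer that needs a moment before it stores the payload.
        Thread fakeListener = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            storedDocuments.incrementAndGet();
        });
        fakeListener.start();

        boolean stored = waitUntil(() -> storedDocuments.get() == 1,
                Duration.ofSeconds(5), Duration.ofMillis(50));
        System.out.println("document stored in time: " + stored);
    }
}
```

In the test above, the condition could be something like `() -> taxiRepository.findAll().size() == 1`, so the test finishes as soon as the listener has done its work and fails only when the deadline is exceeded.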


As we can see, current solutions for integration tests are quite easy to implement and maintain in projects. There is no point in keeping just unit tests and counting on all lines covered as a sign of code/logic quality. Now the question is, should we use an Embedded solution or TestContainers? I suggest first of all focusing on the word “Embedded”. In a good integration test, we want an almost ideal copy of the production environment with all properties/features included. In-memory solutions are good, but mostly not enough for large business projects. Definitely, the advantage of Embedded services is the easy way to implement such tests and maintain configuration, since everything happens in memory.
TestContainers at first sight might look like overkill, but they give us the most important feature, which is a separate environment. We don't even have to rely on existing Docker images: if we want, we can use custom ones. This is a huge improvement for potential test scenarios.
What about Jenkins? There is no reason to be afraid of using TestContainers in Jenkins either. I firmly recommend checking the TestContainers documentation on how easily we can set up the configuration for Jenkins agents.
To sum up: if there is no blocker or any unwanted condition for using TestContainers, then don't hesitate. It is always good to keep all services managed and secured with integration test contracts.