Integration Testing with Containers

Preflight provides built-in support for integration testing using TestContainers, allowing you to test against real databases, message brokers, and other infrastructure components.

Overview

Container-based integration testing in Preflight allows you to:

  • Test Taxi schemas against real infrastructure (Kafka, databases, APIs)
  • Verify data flows end-to-end with actual message brokers
  • Ensure connector configurations work with real systems
  • Run isolated, reproducible tests that don’t depend on external services

Getting Started

1. Configure Connector Support

First, declare which connectors your tests need in your build.gradle.kts:

import com.orbitalhq.preflight.gradle.ConnectorSupport
 
plugins {
    id("com.orbitalhq.preflight")
}
 
preflight {
    connectors = listOf(
        ConnectorSupport.Kafka,
        ConnectorSupport.MongoDB
    )
}

This automatically adds the necessary TestContainers dependencies and Orbital connector libraries for each connector type.

2. Define Your Taxi Schema

Create Taxi services that connect to your infrastructure:

// src/stock-quotes.taxi
import com.orbitalhq.kafka.KafkaService
import com.orbitalhq.kafka.KafkaOperation

model StockQuote {
    ticker : Ticker inherits String
    price : Price inherits Decimal
}

@KafkaService( connectionName = "quotes-kafka" )
service StockKafka {
    @KafkaOperation( topic = "stockPrices", offset = "earliest" )
    stream quotes : Stream<StockQuote>
}

3. Write Container Tests

Create test specs that use containers:

// test/WithContainersSpec.kt
import app.cash.turbine.test
import com.orbitalhq.expectRawMap
import com.orbitalhq.preflight.dsl.OrbitalSpec
import com.orbitalhq.preflight.dsl.containers.kafka.KafkaContainerSupport
import com.orbitalhq.preflight.dsl.containers.kafka.kafkaContainer
import io.kotest.matchers.shouldBe
 
class WithContainersSpec : OrbitalSpec({
    // Declare containers needed for tests
    withContainers(
        kafkaContainer("quotes-kafka")
    )
    
    describe("integration tests") {
        it("should process messages from Kafka") {
            val kafkaContainer = containerForConnection<KafkaContainerSupport>("quotes-kafka")
            
            queryForStreamOfObjects("""
                stream { StockQuote }
            """.trimIndent()).test {
                // Send test data to Kafka
                kafkaContainer.sendMessage(
                    """{ "ticker": "AAPL", "price": 150.25 }""", 
                    "stockPrices"
                )
                
                // Verify the data flows through
                val result = expectRawMap()
                result.shouldBe(mapOf(
                    "ticker" to "AAPL",
                    "price" to 150.25
                ))
            }
        }
    }
})

Kafka Container Support

Configuration

The Kafka connector provides a pre-configured Kafka container with the necessary setup:

kafkaContainer(
    connectionName = "my-kafka",  // Must match connectionName in Taxi schema
    groupId = "test-consumer"     // Optional, auto-generated if not provided
)

Container Interaction API

Once containers are running, access them through the containerForConnection method:

val kafkaContainer = containerForConnection<KafkaContainerSupport>("quotes-kafka")
 
// Send messages to topics
kafkaContainer.sendMessage(
    message = """{"field": "value"}""",
    topic = "my-topic",
    key = "optional-key",
    headers = listOf() // Optional headers
)
 
// Send raw bytes
kafkaContainer.sendMessage(
    message = byteArrayOf(1, 2, 3),
    topic = "binary-topic"
)

Testing Streaming Queries

Use Turbine’s test function to verify streaming behavior:

queryForStreamOfObjects("stream { MyModel }").test {
    // Send test data
    kafkaContainer.sendMessage("""{"field": "value1"}""", "topic")
    val first = expectRawMap()
    first.shouldBe(mapOf("field" to "value1"))
    
    // Send more data
    kafkaContainer.sendMessage("""{"field": "value2"}""", "topic")
    val second = expectRawMap()
    second.shouldBe(mapOf("field" to "value2"))
    
    // Verify stream completes or continues as expected
}

MongoDB Container Support

Configuration

The MongoDB connector provides a pre-configured MongoDB container with automatic database initialization:

mongoConnector(
    connectionName = "user-db"  // Must match connectionName in Taxi schema
)

Database Schema Setup

Define Taxi services that connect to MongoDB:

// src/user-management.taxi
import com.orbitalhq.mongodb.MongoService
import com.orbitalhq.mongodb.MongoOperation
 
model User {
    id : UserId inherits String
    name : UserName inherits String
    email : EmailAddress inherits String
    age : Age inherits Int
}
 
@MongoService(connectionName = "user-db")
service UserDatabase {
    @MongoOperation(
        database = "user_management",
        collection = "users"
    )
    operation findUser(UserId) : User
 
    @MongoOperation(
        database = "user_management", 
        collection = "users"
    )
    operation findAllUsers() : User[]
}

Container Tests with MongoDB

Create test specs that use MongoDB containers:

// test/MongoIntegrationSpec.kt
import com.orbitalhq.preflight.dsl.OrbitalSpec
import com.orbitalhq.preflight.dsl.containers.mongo.MongoContainerSupport
import com.orbitalhq.preflight.dsl.containers.mongo.mongoConnector
import io.kotest.matchers.shouldBe
import org.bson.Document
 
class MongoIntegrationSpec : OrbitalSpec({
    // Declare MongoDB container needed for tests
    withContainers(
        mongoConnector("user-db")
    )
    
    describe("MongoDB integration tests") {
        it("should find users from MongoDB") {
            val mongoContainer = containerForConnection<MongoContainerSupport>("user-db")
            
            // Insert test data directly using MongoDB client
            val client = mongoContainer.mongoClient()
            val database = client.getDatabase("user_management")
            val collection = database.getCollection("users")
            
            val testUser = Document("id", "user123")
                .append("name", "John Doe")
                .append("email", "john@example.com")
                .append("age", 30)
            
            collection.insertOne(testUser).toFuture().join()
            
            // Now test your Taxi query
            val result = """
                find { User(UserId == "user123") }
            """.queryForObject()
            
            result.shouldBe(mapOf(
                "id" to "user123",
                "name" to "John Doe",
                "email" to "john@example.com",
                "age" to 30
            ))
        }
        
        it("should find all users from collection") {
            val mongoContainer = containerForConnection<MongoContainerSupport>("user-db")
            val client = mongoContainer.mongoClient()
            val collection = client.getDatabase("user_management")
                .getCollection("users")
            
            // Insert multiple test users
            val users = listOf(
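                // Include every non-nullable field declared on the User model
                Document("id", "user1").append("name", "Alice")
                    .append("email", "alice@example.com").append("age", 25),
                Document("id", "user2").append("name", "Bob")
                    .append("email", "bob@example.com").append("age", 35)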
            )
            collection.insertMany(users).toFuture().join()
            
            val results = """
                find { User }
            """.queryForCollection()
            
            results.size shouldBe 2
            results[0]["name"] shouldBe "Alice"
            results[1]["name"] shouldBe "Bob"
        }
    }
})
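The second test above asserts on results[0] and results[1], which assumes rows come back in insertion order. If that ordering is not guaranteed in your setup, an order-insensitive comparison is more robust. The sketch below is a minimal illustration; namesOf is a hypothetical helper (not part of Preflight), and it assumes each result row is a map, as the results[0]["name"] access suggests.

```kotlin
// Hypothetical helper, not part of Preflight: collects the "name" field of each
// result row into a set, so the comparison ignores the order rows were returned in.
fun namesOf(results: List<Map<String, Any?>>): Set<Any?> =
    results.map { it["name"] }.toSet()
```

In a spec this could be used as namesOf(results) shouldBe setOf("Alice", "Bob"), alongside the existing size check.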

Container Interaction API

Once MongoDB containers are running, access them through the containerForConnection method:

val mongoContainer = containerForConnection<MongoContainerSupport>("user-db")
 
// Get direct access to MongoDB client for test data setup
val client = mongoContainer.mongoClient()
val database = client.getDatabase("your_database")
val collection = database.getCollection("your_collection")
 
// Insert test documents
val document = Document("field", "value")
collection.insertOne(document).toFuture().join()
 
// Query documents for verification
val results = collection.find().toFuture().join()

Database Initialization

MongoDB containers are automatically initialized with:

  • Default database: user_management
  • Test credentials: test_container:test_container
  • Custom initialization script support via init-schema.js

The container uses the connection string format:

mongodb://test_container:test_container@host:port/user_management
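To make the format concrete, here is a minimal sketch that assembles such a connection string from a host and port. mongoConnectionString is a hypothetical helper written for illustration, not a Preflight function; Preflight configures the connection for you. The default arguments are the credentials and database name listed above.

```kotlin
// Hypothetical helper, not part of Preflight: builds a connection string in the
// format shown above from the host and port the container is reachable on.
fun mongoConnectionString(
    host: String,
    port: Int,
    username: String = "test_container",
    password: String = "test_container",
    database: String = "user_management"
): String = "mongodb://$username:$password@$host:$port/$database"
```

This can be handy when pointing an external tool, such as a database shell, at the running container while debugging a test.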

Container Lifecycle

Containers are managed automatically by Preflight:

  1. Startup: Containers start before your tests run
  2. Connection: Orbital connectors are configured with container connection details
  3. Execution: Your tests interact with real running infrastructure
  4. Cleanup: Containers are automatically stopped after tests complete
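The ordering of these four phases can be sketched in plain Kotlin. This is a simplified model for illustration only, not Preflight's implementation: FakeContainer and runSpec are hypothetical names, and the fake container only records what happened instead of starting Docker.

```kotlin
// Hypothetical stand-in for a container; not a Preflight or TestContainers type.
class FakeContainer(val connectionName: String, private val log: MutableList<String>) {
    fun start() { log += "start $connectionName" }
    fun stop() { log += "stop $connectionName" }
}

// Runs the four phases in order: startup, connection, execution, cleanup.
fun runSpec(containers: List<FakeContainer>, log: MutableList<String>, tests: () -> Unit) {
    containers.forEach { it.start() }                                   // 1. Startup
    try {
        containers.forEach { log += "configure ${it.connectionName}" } // 2. Connection
        tests()                                                         // 3. Execution
    } finally {
        // 4. Cleanup: runs even if a test throws
        containers.asReversed().forEach { it.stop() }
    }
}
```

The try/finally block is the important design point: cleanup should happen whether the tests pass or fail, so that no containers are left running.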

Lazy Initialization

Container connections are created lazily when you first execute a query. This ensures:

  • Containers are fully started and ready
  • Connection details (ports, URLs) are available
  • Tests don’t fail due to timing issues
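The idea behind lazy initialization can be illustrated with Kotlin's by lazy delegate. The sketch below is an assumption about the general pattern, not Preflight's code: StartedContainer and LazyConnection are hypothetical types, and the address is only resolved the first time url is read.

```kotlin
// Hypothetical stand-in for a started container; not a Preflight type.
class StartedContainer(val host: String, val mappedPort: Int)

// The connection address is computed on first access, not at construction time,
// so the mapped port only needs to be known once the first query runs.
class LazyConnection(private val container: () -> StartedContainer) {
    var resolved = false
        private set

    val url: String by lazy {
        resolved = true
        val c = container()
        "${c.host}:${c.mappedPort}"
    }
}
```

Because the lambda passed to LazyConnection is not invoked until url is read, the connection can be declared before the container has been assigned a port.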

Best Practices

Connection Naming

Use descriptive connection names that match your schema:

// In your test spec
kafkaContainer("user-events-kafka")
 
// In schema
@KafkaService( connectionName = "user-events-kafka" )
service UserEventStream { ... }

Test Data Management

Send test data programmatically for reliable tests:

describe("user event processing") {
    it("should handle user registration") {
        val kafka = containerForConnection<KafkaContainerSupport>("user-events-kafka")
        
        kafka.sendMessage("""
            {
                "eventType": "USER_REGISTERED", 
                "userId": "123",
                "timestamp": "2024-01-15T10:30:00Z"
            }
        """.trimIndent(), "user-events")
        
        // Test your query logic...
    }
}
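When several tests send similar payloads, a small builder function keeps the JSON in one place. The sketch below is one possible approach; userRegisteredEvent is a hypothetical helper, and the field names are taken from the example above.

```kotlin
// Hypothetical helper: renders a USER_REGISTERED event as a JSON string.
// Values are interpolated as-is, so they must not contain quotes or backslashes.
fun userRegisteredEvent(userId: String, timestamp: String): String = """
    {
        "eventType": "USER_REGISTERED",
        "userId": "$userId",
        "timestamp": "$timestamp"
    }
""".trimIndent()
```

A test could then call kafka.sendMessage(userRegisteredEvent("123", "2024-01-15T10:30:00Z"), "user-events"), using the sendMessage signature shown earlier.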

Resource Isolation

Each test spec gets fresh containers, ensuring isolation:

class UserEventsSpec : OrbitalSpec({
    withContainers(kafkaContainer("events"))
    // Tests in this spec share the same container instance
})
 
class OrderEventsSpec : OrbitalSpec({
    withContainers(kafkaContainer("events"))
    // This gets a separate container instance
})

Supported Connectors

Currently supported container types:

Kafka

  • Usage: ConnectorSupport.Kafka
  • Container: Confluent Platform Kafka
  • Features: Topic creation, message production, consumer groups
  • API: KafkaContainerSupport with sendMessage() methods

MongoDB

  • Usage: ConnectorSupport.MongoDB
  • Container: MongoDB 6.0.7
  • Features: Document operations, collections, database initialization
  • API: MongoContainerSupport with direct MongoDB client access

Future Connectors

Additional connectors are planned:

  • PostgreSQL databases
  • REST APIs with WireMock
  • Message queues (RabbitMQ, ActiveMQ)
  • Redis caches

Troubleshooting

Container Startup Issues

If containers fail to start:

import java.time.Duration

// Increase startup timeout
kafkaContainer("my-kafka").apply {
    withStartupTimeout(Duration.ofMinutes(5))
}

Connection Timing

If you get “container not found” errors, ensure:

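  • Connection names match exactly between the container declaration (kafkaContainer() or mongoConnector()) and the connectionName in the corresponding @KafkaService or @MongoService annotation
  • You call containerForConnection() inside a test (within an it block), as in the examples above, and not while the spec is still being declared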
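To see why a mismatched name produces this kind of error, consider a lookup keyed by connection name. The sketch below is a simplified model, not Preflight's actual lookup: lookupContainer is a hypothetical function, and containers are represented as plain strings.

```kotlin
// Hypothetical lookup keyed by connection name; Preflight's own lookup may differ.
// Fails with a message that lists the registered names, which makes typos easy to spot.
fun lookupContainer(registered: Map<String, String>, connectionName: String): String =
    registered[connectionName]
        ?: error("container not found for '$connectionName'; registered: ${registered.keys}")
```

With only "quotes-kafka" registered, looking up "quotes_kafka" fails because the names differ by a single character.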

Port Conflicts

TestContainers automatically assigns available ports, so tests should not rely on fixed port numbers. Read the actual address from the container instead:

// Let TestContainers assign ports (recommended)
val kafka = containerForConnection<KafkaContainerSupport>("my-kafka")
// Use kafka.kafkaContainer.bootstrapServers for the actual address

Performance Tips

Container Reuse

Containers are reused within a spec but not between specs, so grouping related tests in one spec avoids repeated container startup:

// Group related tests in the same spec
class KafkaIntegrationSpec : OrbitalSpec({
    withContainers(kafkaContainer("kafka"))
    
    describe("user events") {
        it("handles registration") { /* ... */ }
        it("handles updates") { /* ... */ }
        it("handles deletion") { /* ... */ }
    }
})

Parallel Execution

Different specs run in parallel with isolated containers:

// These can run simultaneously
class UserServiceSpec : OrbitalSpec({ /* ... */ })
class OrderServiceSpec : OrbitalSpec({ /* ... */ })
class PaymentServiceSpec : OrbitalSpec({ /* ... */ })