Integration Testing with Containers
Preflight provides built-in support for integration testing using TestContainers, allowing you to test against real databases, message brokers, and other infrastructure components.
Overview
Container-based integration testing in Preflight allows you to:
- Test Taxi schemas against real infrastructure (Kafka, databases, APIs)
- Verify data flows end-to-end with actual message brokers
- Ensure connector configurations work with real systems
- Run isolated, reproducible tests that don’t depend on external services
Getting Started
1. Configure Connector Support
First, declare which connectors your tests need in your build.gradle.kts
:
import com.orbitalhq.preflight.gradle.ConnectorSupport
plugins {
id("com.orbitalhq.preflight")
}
preflight {
connectors = listOf(
ConnectorSupport.Kafka,
ConnectorSupport.MongoDB
)
}
This automatically adds the necessary TestContainers dependencies and Orbital connector libraries for each connector type.
2. Define Your Taxi Schema
Create Taxi services that connect to your infrastructure:
// src/stock-quotes.taxi
import com.orbitalhq.kafka.KafkaService
import com.orbitalhq.kafka.KafkaOperation
model StockQuote {
ticker : Ticker inherits String
price : Price inherits Decimal
}
@KafkaService( connectionName = "quotes-kafka" )
service StockKafka {
@KafkaOperation( topic = "stockPrices", offset = "earliest" )
stream quotes : Stream<StockQuote>
}
3. Write Container Tests
Create test specs that use containers:
// test/WithContainersSpec.kt
import app.cash.turbine.test
import com.orbitalhq.expectRawMap
import com.orbitalhq.preflight.dsl.OrbitalSpec
import com.orbitalhq.preflight.dsl.containers.kafka.KafkaContainerSupport
import com.orbitalhq.preflight.dsl.containers.kafka.kafkaContainer
import io.kotest.matchers.shouldBe
class WithContainersSpec : OrbitalSpec({
// Declare containers needed for tests
withContainers(
kafkaContainer("quotes-kafka")
)
describe("integration tests") {
it("should process messages from Kafka") {
val kafkaContainer = containerForConnection<KafkaContainerSupport>("quotes-kafka")
queryForStreamOfObjects("""
stream { StockQuote }
""".trimIndent()).test {
// Send test data to Kafka
kafkaContainer.sendMessage(
"""{ "ticker": "AAPL", "price": 150.25 }""",
"stockPrices"
)
// Verify the data flows through
val result = expectRawMap()
result.shouldBe(mapOf(
"ticker" to "AAPL",
"price" to 150.25
))
}
}
}
})
Kafka Container Support
Configuration
The Kafka connector provides a pre-configured Kafka container with the necessary setup:
kafkaContainer(
connectionName = "my-kafka", // Must match connectionName in Taxi schema
groupId = "test-consumer" // Optional, auto-generated if not provided
)
Container Interaction API
Once containers are running, access them through the containerForConnection
method:
val kafkaContainer = containerForConnection<KafkaContainerSupport>("quotes-kafka")
// Send messages to topics
kafkaContainer.sendMessage(
message = """{"field": "value"}""",
topic = "my-topic",
key = "optional-key",
headers = listOf() // Optional headers
)
// Send raw bytes
kafkaContainer.sendMessage(
message = byteArrayOf(1, 2, 3),
topic = "binary-topic"
)
Testing Streaming Queries
Use Turbine’s test
function to verify streaming behavior:
queryForStreamOfObjects("stream { MyModel }").test {
// Send test data
kafkaContainer.sendMessage("""{"field": "value1"}""", "topic")
val first = expectRawMap()
first.shouldBe(mapOf("field" to "value1"))
// Send more data
kafkaContainer.sendMessage("""{"field": "value2"}""", "topic")
val second = expectRawMap()
second.shouldBe(mapOf("field" to "value2"))
// Verify stream completes or continues as expected
}
MongoDB Container Support
Configuration
The MongoDB connector provides a pre-configured MongoDB container with automatic database initialization:
mongoConnector(
connectionName = "user-db" // Must match connectionName in Taxi schema
)
Database Schema Setup
Define Taxi services that connect to MongoDB:
// src/user-management.taxi
import com.orbitalhq.mongodb.MongoService
import com.orbitalhq.mongodb.MongoOperation
model User {
id : UserId inherits String
name : UserName inherits String
email : EmailAddress inherits String
age : Age inherits Int
}
@MongoService(connectionName = "user-db")
service UserDatabase {
@MongoOperation(
database = "user_management",
collection = "users"
)
operation findUser(UserId) : User
@MongoOperation(
database = "user_management",
collection = "users"
)
operation findAllUsers() : User[]
}
Container Tests with MongoDB
Create test specs that use MongoDB containers:
// test/MongoIntegrationSpec.kt
import com.orbitalhq.preflight.dsl.OrbitalSpec
import com.orbitalhq.preflight.dsl.containers.mongo.MongoContainerSupport
import com.orbitalhq.preflight.dsl.containers.mongo.mongoConnector
import io.kotest.matchers.shouldBe
import org.bson.Document
class MongoIntegrationSpec : OrbitalSpec({
// Declare MongoDB container needed for tests
withContainers(
mongoConnector("user-db")
)
describe("MongoDB integration tests") {
it("should find users from MongoDB") {
val mongoContainer = containerForConnection<MongoContainerSupport>("user-db")
// Insert test data directly using MongoDB client
val client = mongoContainer.mongoClient()
val database = client.getDatabase("user_management")
val collection = database.getCollection("users")
val testUser = Document("id", "user123")
.append("name", "John Doe")
.append("email", "john@example.com")
.append("age", 30)
collection.insertOne(testUser).toFuture().join()
// Now test your Taxi query
val result = """
find { User(UserId == "user123") }
""".queryForObject()
result.shouldBe(mapOf(
"id" to "user123",
"name" to "John Doe",
"email" to "john@example.com",
"age" to 30
))
}
it("should find all users from collection") {
val mongoContainer = containerForConnection<MongoContainerSupport>("user-db")
val client = mongoContainer.mongoClient()
val collection = client.getDatabase("user_management")
.getCollection("users")
// Insert multiple test users
val users = listOf(
Document("id", "user1").append("name", "Alice").append("age", 25),
Document("id", "user2").append("name", "Bob").append("age", 35)
)
collection.insertMany(users).toFuture().join()
val results = """
find { User }
""".queryForCollection()
results.size shouldBe 2
results[0]["name"] shouldBe "Alice"
results[1]["name"] shouldBe "Bob"
}
}
})
Container Interaction API
Once MongoDB containers are running, access them through the containerForConnection
method:
val mongoContainer = containerForConnection<MongoContainerSupport>("user-db")
// Get direct access to MongoDB client for test data setup
val client = mongoContainer.mongoClient()
val database = client.getDatabase("your_database")
val collection = database.getCollection("your_collection")
// Insert test documents
val document = Document("field", "value")
collection.insertOne(document).toFuture().join()
// Query documents for verification
val results = collection.find().toFuture().join()
Database Initialization
MongoDB containers are automatically initialized with:
- Default database:
user_management
- Test credentials:
test_container:test_container
- Custom initialization script support via
init-schema.js
The container uses the connection string format:
mongodb://test_container:test_container@host:port/user_management
Container Lifecycle
Containers are managed automatically by Preflight:
- Startup: Containers start before your tests run
- Connection: Orbital connectors are configured with container connection details
- Execution: Your tests interact with real running infrastructure
- Cleanup: Containers are automatically stopped after tests complete
Lazy Initialization
Container connections are created lazily when you first execute a query. This ensures:
- Containers are fully started and ready
- Connection details (ports, URLs) are available
- Tests don’t fail due to timing issues
Best Practices
Connection Naming
Use descriptive connection names that match your schema:
// In build.gradle.kts
kafkaContainer("user-events-kafka")
// In schema
@KafkaService( connectionName = "user-events-kafka" )
service UserEventStream { ... }
Test Data Management
Send test data programmatically for reliable tests:
describe("user event processing") {
it("should handle user registration") {
val kafka = containerForConnection<KafkaContainerSupport>("user-events-kafka")
kafka.sendMessage("""
{
"eventType": "USER_REGISTERED",
"userId": "123",
"timestamp": "2024-01-15T10:30:00Z"
}
""".trimIndent(), "user-events")
// Test your query logic...
}
}
Resource Isolation
Each test spec gets fresh containers, ensuring isolation:
class UserEventsSpec : OrbitalSpec({
withContainers(kafkaContainer("events"))
// Tests in this spec share the same container instance
})
class OrderEventsSpec : OrbitalSpec({
withContainers(kafkaContainer("events"))
// This gets a separate container instance
})
Supported Connectors
Currently supported container types:
Kafka
- Usage:
ConnectorSupport.Kafka
- Container: Confluent Platform Kafka
- Features: Topic creation, message production, consumer groups
- API:
KafkaContainerSupport
withsendMessage()
methods
MongoDB
- Usage:
ConnectorSupport.MongoDB
- Container: MongoDB 6.0.7
- Features: Document operations, collections, database initialization
- API:
MongoContainerSupport
with direct MongoDB client access
Future Connectors
Additional connectors are planned:
- PostgreSQL databases
- REST APIs with WireMock
- Message queues (RabbitMQ, ActiveMQ)
- Redis caches
Troubleshooting
Container Startup Issues
If containers fail to start:
// Increase startup timeout
kafkaContainer("my-kafka").apply {
withStartupTimeout(Duration.ofMinutes(5))
}
Connection Timing
If you get “container not found” errors, ensure:
- Connection names match between
kafkaContainer()
and@KafkaService(connectionName=...)
- You call
containerForConnection()
after the query starts (inside test methods)
Port Conflicts
TestContainers automatically assigns available ports. If you need specific ports:
// Let TestContainers assign ports (recommended)
val kafka = containerForConnection<KafkaContainerSupport>("my-kafka")
// Use kafka.kafkaContainer.bootstrapServers for the actual address
Performance Tips
Container Reuse
Containers are reused within a spec but not between specs. For faster tests:
// Group related tests in the same spec
class KafkaIntegrationSpec : OrbitalSpec({
withContainers(kafkaContainer("kafka"))
describe("user events") {
it("handles registration") { /* ... */ }
it("handles updates") { /* ... */ }
it("handles deletion") { /* ... */ }
}
})
Parallel Execution
Different specs run in parallel with isolated containers:
// These can run simultaneously
class UserServiceSpec : OrbitalSpec({ /* ... */ })
class OrderServiceSpec : OrbitalSpec({ /* ... */ })
class PaymentServiceSpec : OrbitalSpec({ /* ... */ })