From TB-Scale MongoDB to Doris: 5 Critical Challenges and Fixes with Apache SeaTunnel
Recently, in several data middle-platform projects, we have frequently used Apache SeaTunnel to synchronize data from MongoDB to Apache Doris. Honestly, this task looks simple, but once you actually get started, you realize there are quite a few pitfalls. Especially in production environments, where data volume is large and structures are complex, a small oversight can easily lead to problems.
This article does not intend to repeat those basic configuration steps—there are already plenty of them online. I want to focus on the areas in real production environments where people are most likely to stumble. Particularly when you are dealing with TB-level MongoDB collections that need to be stably synchronized to Doris for real-time analysis, the following five pitfalls are almost inevitable. I will combine specific error logs, troubleshooting approaches, and the solutions our team has explored to help you eliminate these issues one by one.
1. Data Type Mapping: BSON-to-SQL Conversion Issues
MongoDB’s BSON type system and Doris’s SQL type system appear to be automatically mappable on the surface, but in reality they hide quite a few “surprises.” The most typical examples are the handling of Decimal128 and ObjectId.
1.1 Precision Loss Issue with Decimal128
MongoDB uses Decimal128 to store high-precision numeric values, such as financial transaction amounts. SeaTunnel maps it to Doris's DECIMAL type by default, but there is a catch: Doris's DECIMAL supports a maximum precision of 38 digits, while Decimal128 stores up to 34 significant decimal digits across a much wider exponent range. If you do not explicitly specify precision and scale in the SeaTunnel schema, automatic inference can pick a scale that cannot represent your values exactly, and you are likely to encounter the following error:
```
java.lang.ArithmeticException: Non-terminating decimal expansion; no exact representable decimal result
```
The solution is to explicitly declare the precision in the schema. Do not rely on automatic inference—control it manually:
```hocon
source {
  MongoDB {
    uri        = "mongodb://user:password@host:27017"
    database   = "finance"
    collection = "transactions"
    schema = {
      fields {
        _id       = string
        amount    = "decimal(38, 18)"  # explicitly specify total precision 38, scale 18
        currency  = string
        timestamp = timestamp
      }
    }
  }
}
```
Note: If the Decimal128 values in your data contain more than 18 decimal places, you need to adjust according to your actual situation. In one of our e-commerce projects, coupon calculation required high precision, so we used decimal(38, 24).
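To see why pinning the scale matters, here is a minimal standalone Python sketch (not SeaTunnel code) that contrasts a Decimal128-style value routed through a binary double with one quantized to an explicitly declared scale:

```python
from decimal import Decimal

# A high-precision amount, as MongoDB's Decimal128 might store it
# (up to 34 significant decimal digits).
amount = Decimal("0.123456789012345678901234")

# Route 1: letting the value pass through a binary double
# (what implicit conversion can do) loses digits.
via_double = Decimal(float(amount))

# Route 2: quantizing to the declared DECIMAL scale of 24 keeps them.
via_decimal = amount.quantize(Decimal("1." + "0" * 24))

print(via_double == amount)   # False: a double holds only ~15-17 significant digits
print(via_decimal == amount)  # True: scale 24 covers every digit
```

The same logic explains the `decimal(38, 24)` choice above: pick a scale at least as large as the longest fractional part your data actually contains.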
1.2 ObjectId and Nested Document Serialization Pitfalls
The _id field in MongoDB is of type ObjectId by default, and SeaTunnel converts it to a string. This seems fine—until you discover primary key conflicts in the Doris table—because after ObjectId is converted to a string, Doris’s UNIQUE KEY check may cause issues.
Nested documents are even more troublesome. A common structure in MongoDB looks like this:
```js
{
  "_id": ObjectId("507f1f77bcf86cd799439011"),
  "user": {
    "name": "Zhang San",
    "address": {
      "city": "Beijing",
      "district": "Chaoyang"
    }
  }
}
```
By default, SeaTunnel converts the entire user object into a JSON string and stores it in a VARCHAR field in Doris. If you want to directly query user.address.city in Doris, you have to use JSON functions to parse it, which results in poor performance.
Our approach is to flatten nested fields in advance using the transform plugin in SeaTunnel:
```hocon
transform {
  # Flatten nested fields
  sql {
    query = """
      SELECT
        _id,
        user.name as user_name,
        user.address.city as city,
        user.address.district as district
      FROM mongodb_source
    """
  }
}

sink {
  Doris {
    fenodes  = "fe1:8030,fe2:8030"
    username = "admin"
    password = "***"
    database = "analytics"
    table    = "user_flat"
    # The target table is now flat, so queries hit plain columns instead of JSON parsing
  }
}
```
If the nesting level is too deep or uncertain, you may also consider using the MAP type in Doris, but note that it is only supported in version 2.0 and above.
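When the nesting depth is unknown in advance, the flattening itself is easy to express generically. The sketch below is plain Python (a hypothetical pre-processing helper, not part of SeaTunnel) showing the transformation the SQL transform performs for known fields:

```python
def flatten(doc, prefix="", sep="_"):
    """Recursively flatten nested dicts into underscore-joined column names,
    mirroring what the SQL transform does for explicitly listed fields."""
    flat = {}
    for key, value in doc.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat

doc = {
    "_id": "507f1f77bcf86cd799439011",
    "user": {"name": "Zhang San",
             "address": {"city": "Beijing", "district": "Chaoyang"}},
}
print(flatten(doc))
# {'_id': '507f1f77bcf86cd799439011', 'user_name': 'Zhang San',
#  'user_address_city': 'Beijing', 'user_address_district': 'Chaoyang'}
```

Note that generic flattening produces names like `user_address_city`; the SQL transform above shortens them by hand, which is usually friendlier for analysts.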
2. Connection and Timeout Configuration: High-Concurrency Challenges in Production
In a test environment with dozens of records, everything works fine. Once you move to production, connection timeouts, cursor timeouts, and memory overflows all appear.
2.1 MongoDB Connection Pool and Cursor Timeout
Several key parameters in the MongoDB source plugin of SeaTunnel are easy to overlook:
| Parameter | Default Value | Production Recommendation | Description |
|---|---|---|---|
| `cursor.no-timeout` | true | false | Setting it to true can let server-side cursors accumulate excessively |
| `fetch.size` | 2048 | 8192–16384 | Adjust based on document size to reduce network round trips |
| `max.time-min` | 600 | 30 | Query timeout (minutes) to prevent long-running queries from exhausting cluster resources |
| `partition.split-key` | `_id` | Depends on business logic | Partition key for parallel reads |
We once encountered a major pitfall: setting cursor.no-timeout=true together with large data queries caused hundreds of cursors to accumulate on the MongoDB server side, each consuming memory and nearly bringing the cluster down. Later, we changed it to:
```hocon
source {
  MongoDB {
    uri        = "mongodb://user:password@host1:27017,host2:27017/?replicaSet=rs0&readPreference=secondaryPreferred"
    database   = "logs"
    collection = "access_logs"
    cursor.no-timeout    = false
    fetch.size           = 16384
    max.time-min         = 30
    partition.split-key  = "_id"
    partition.split-size = 1048576  # 1MB per partition
    # Only synchronize data from the last 7 days to avoid full table scans
    match.query = "{timestamp: {$gte: ISODate('2024-01-01T00:00:00Z')}}"
  }
}
```
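A hardcoded `ISODate` in `match.query` quietly goes stale; in practice the filter for a rolling window is easier to generate at job-submission time. A small hypothetical Python helper (the field name and window are placeholders) that renders such a filter string:

```python
from datetime import datetime, timedelta, timezone

def last_n_days_filter(field: str, days: int) -> str:
    """Render a MongoDB match.query string covering the last N days,
    so the job config never pins a hardcoded date."""
    since = (datetime.now(timezone.utc) - timedelta(days=days)).strftime(
        "%Y-%m-%dT%H:%M:%SZ")
    return f"{{{field}: {{$gte: ISODate('{since}')}}}}"

print(last_n_days_filter("timestamp", 7))
# e.g. {timestamp: {$gte: ISODate('2024-01-01T00:00:00Z')}}
```

The rendered string can then be substituted into the config by whatever templating your scheduler (Airflow, cron plus `envsubst`, etc.) already uses.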
2.2 Stream Load Tuning in Doris
On the Doris Sink side, the core lies in Stream Load batch parameters. The default configuration is friendly for small data volumes, but production environments require tuning:
```hocon
sink {
  Doris {
    fenodes  = "fe1:8030,fe2:8030,fe3:8030"
    username = "sync_user"
    password = "***"
    database = "dw"
    table    = "fact_table"
    sink.label-prefix = "seatunnel_sync"
    sink.enable-2pc   = true    # enable two-phase commit to ensure Exactly-Once
    sink.buffer-size  = 524288  # 512KB; the 256KB default is too small
    sink.buffer-count = 5       # number of buffers
    doris.batch.size  = 5000    # 5000 rows per batch, default 1024
    # Key: advanced Stream Load parameters
    doris.config = {
      format            = "json"
      read_json_by_line = "true"
      strip_outer_array = "true"
      num_as_string     = "true"  # pass numbers as strings to avoid type issues
      # Connection and timeout control
      connect_timeout = "10"
      socket_timeout  = "30"
      # Partial update mode (if the table uses the Unique model)
      partial_columns = "true"
      merge_type      = "MERGE"
    }
  }
}
```
There is a detail here: sink.label-prefix must be unique for each task; otherwise, Doris will reject duplicate import labels. We use the pattern "seatunnel_${job_id}_${timestamp}".
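As a sketch of that pattern, here is a hypothetical Python helper (names are illustrative, not a SeaTunnel API) that builds a label prefix guaranteed not to repeat across retries:

```python
import time
import uuid

def make_label_prefix(job_id: str) -> str:
    """Combine the job id, a millisecond timestamp, and a short random
    suffix so a retry never reuses a label Doris has already recorded."""
    return f"seatunnel_{job_id}_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"

print(make_label_prefix("order_sync"))
# e.g. seatunnel_order_sync_1704067200000_a1b2c3d4
```

The random suffix matters: two retries within the same millisecond would otherwise still collide.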
3. Performance Bottleneck Identification and Optimization: From Hours to Minutes
When synchronization tasks run slowly, it is usually not caused by a single reason, but by multiple factors stacking together.
3.1 Diagnostic Toolchain
First, you need to know where the bottleneck is. Our commonly used monitoring combination:
- SeaTunnel's own logs: enable DEBUG level to see the reading progress of each partition
- MongoDB Profiler: enable it temporarily to confirm whether queries use indexes
- Doris FE/BE monitoring: run `show proc '/current_queries';` to check import status
- System monitoring: CPU, memory, network IO
There was once a case where synchronization speed was stuck at 1000 records per second and could not improve. After investigation, we found:
- On the MongoDB side: the query used the `$or` operator and did not hit an index
- Network: cross-availability-zone transmission with high latency
- On the Doris side: BE node disk IO was saturated
3.2 Partition Strategy Optimization
SeaTunnel supports parallel reading based on partition.split-key. However, using _id as the default partition key is not always optimal.
If the data has a natural time dimension, such as a log table, partitioning by a time field works better:
```hocon
source {
  MongoDB {
    # Assume each record has an event_time field
    partition.split-key  = "event_time"
    partition.split-size = 3600000  # partition by 1 hour
    # Combine with query conditions to avoid full table scans
    match.query = """
      {
        event_time: {
          $gte: ISODate("2024-01-01T00:00:00Z"),
          $lt:  ISODate("2024-01-02T00:00:00Z")
        }
      }
    """
  }
}
```
If the data distribution is uneven, you can first analyze key distribution using an aggregation query:
```js
// Execute in the Mongo shell
db.collection.aggregate([
  { $bucketAuto: { groupBy: "$shard_key", buckets: 10 } }
])
```
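The bucket boundaries that `$bucketAuto` returns can then be turned into non-overlapping range filters, one per parallel reader. A hypothetical Python sketch (the `shard_key` field and helper name are placeholders; it assumes `$bucketAuto`'s documented behavior that only the last bucket's upper bound is inclusive):

```python
def ranges_from_buckets(buckets):
    """Turn $bucketAuto output (each bucket carries _id.min / _id.max)
    into non-overlapping range filters on the split key."""
    filters = []
    for i, b in enumerate(buckets):
        lo, hi = b["_id"]["min"], b["_id"]["max"]
        cond = {"$gte": lo}
        # The last bucket's max is inclusive, so close it with $lte.
        cond["$lte" if i == len(buckets) - 1 else "$lt"] = hi
        filters.append({"shard_key": cond})
    return filters

buckets = [
    {"_id": {"min": 0, "max": 100}, "count": 10},
    {"_id": {"min": 100, "max": 205}, "count": 10},
]
print(ranges_from_buckets(buckets))
# [{'shard_key': {'$gte': 0, '$lt': 100}}, {'shard_key': {'$gte': 100, '$lte': 205}}]
```

Because `$bucketAuto` equalizes document counts rather than key spans, each resulting filter covers a roughly equal share of the data even when the key distribution is skewed.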
3.3 Memory and GC Tuning
SeaTunnel is based on the JVM, and GC issues are common with large data volumes. Our production JVM parameters:
```bash
# seatunnel_env.sh or startup script
export JAVA_OPTS="-Xmx8g -Xms8g \
  -XX:+UseG1GC \
  -XX:MaxGCPauseMillis=200 \
  -XX:InitiatingHeapOccupancyPercent=35 \
  -XX:ParallelGCThreads=4 \
  -XX:ConcGCThreads=2 \
  -XX:+AlwaysPreTouch \
  -XX:+UseStringDeduplication \
  -XX:+PrintGCDetails \
  -XX:+PrintGCDateStamps \
  -Xloggc:/var/log/seatunnel/gc.log"
```
The key point is -XX:+AlwaysPreTouch, which pre-allocates memory at startup to avoid runtime jitter.
4. Data Consistency and Error Handling: Exactly-Once Implementation Details
Data synchronization must not lose data, nor duplicate data. SeaTunnel supports Exactly-Once semantics, but it must be configured correctly.
4.1 Pitfalls of Two-Phase Commit (2PC)
Setting sink.enable-2pc = true in the Doris Sink enables two-phase commit, theoretically ensuring Exactly-Once. However, we once encountered a strange issue: after a task failed and retried, data was duplicated.
The reason was repeated use of labels. When SeaTunnel retries after failure using the same label-prefix, Doris may treat it as the same import task and skip certain data.
Solution: include timestamp and attempt count in the label:
```hocon
sink {
  Doris {
    sink.label-prefix   = "sync_${table_name}_${now()}_${attempt_num}"
    sink.enable-2pc     = true
    sink.max-retries    = 3
    sink.check-interval = 5000  # check every 5 seconds
  }
}
```
4.2 Dirty Data and Type Conversion Errors
MongoDB is schema-less; the same field may be a string in one row and a number in another. Doris has a strict schema and will report errors on type mismatch.
The needs_unsupported_type_casting parameter in SeaTunnel can help:
```hocon
sink {
  Doris {
    # Attempt to automatically convert incompatible types, e.g., Decimal to Double
    needs_unsupported_type_casting = true
    # But it is recommended to handle this in the transform layer instead
  }
}

transform {
  # Normalize types before writing
  sql {
    query = """
      SELECT
        CAST(amount AS DOUBLE) as amount_double,
        COALESCE(name, '') as name_safe,  -- handle null
        REGEXP_REPLACE(description, '[\\x00-\\x1F]', '') as description_clean
      FROM source_table
    """
  }
}
```
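When dirty data comes in too many shapes for SQL casts, we sometimes clean rows in a small pre-load step instead. A hypothetical Python sketch (helper and field names are illustrative) of the "coerce or null out" policy:

```python
def normalize_row(row, casts):
    """Coerce schema-less MongoDB values into the types Doris expects;
    unparseable values become None instead of failing the whole batch."""
    out = {}
    for field, value in row.items():
        cast = casts.get(field)
        if cast is None or value is None:
            out[field] = value
            continue
        try:
            out[field] = cast(value)
        except (TypeError, ValueError):
            out[field] = None  # route dirty values to NULL; log them separately

    return out

casts = {"amount": float, "age": int}
print(normalize_row({"amount": "19.90", "age": 30, "name": "a"}, casts))
# {'amount': 19.9, 'age': 30, 'name': 'a'}
print(normalize_row({"amount": "N/A", "age": "30"}, casts))
# {'amount': None, 'age': 30}
```

Nulling out a bad value keeps the batch flowing; whether that is acceptable depends on the downstream consumers, so pair it with the error counters described in section 5.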
4.3 Resume from Breakpoint and Checkpoint
SeaTunnel supports Checkpoint, but the storage backend must be configured correctly. We use HDFS:
```hocon
env {
  execution.parallelism = 8
  job.mode = "BATCH"
  # Checkpoint configuration
  checkpoint.interval = 60000   # once per minute
  checkpoint.timeout  = 600000  # 10-minute timeout
  checkpoint.max-concurrent-checkpoints = 1
  state.backend = "hdfs"
  state.checkpoints.dir = "hdfs://namenode:8020/seatunnel/checkpoints"
  state.savepoints.dir  = "hdfs://namenode:8020/seatunnel/savepoints"
  # Restore from the latest checkpoint after task failure
  execution.savepoint-restore.enabled = true
}
```
A detail here: if the Checkpoint frequency is too high, it affects performance; if too low, too much data may be reprocessed during recovery. We generally decide based on data volume—for example, one Checkpoint per one million rows processed.
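That rule of thumb is easy to turn into arithmetic. A hypothetical helper (the budget and clamp values are our conventions, not SeaTunnel defaults) that converts a rows-per-checkpoint budget into a `checkpoint.interval` value:

```python
def checkpoint_interval_ms(rows_per_second: float,
                           rows_per_checkpoint: int = 1_000_000,
                           floor_ms: int = 10_000,
                           ceil_ms: int = 600_000) -> int:
    """Convert a rows-per-checkpoint budget into a time interval,
    clamped so checkpoints are neither constant nor hours apart."""
    interval = int(rows_per_checkpoint / rows_per_second * 1000)
    return max(floor_ms, min(interval, ceil_ms))

# At ~16k rows/s, a 1M-row budget lands close to the 60s interval used above.
print(checkpoint_interval_ms(16_000))  # 62500
```

Measure the sustained throughput of a real run first; plugging in test-environment numbers gives intervals that are far too short for production.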
5. Operations Monitoring and Alerting: From Passive Firefighting to Proactive Prevention
This last one is not a technical pitfall, but it is more fatal than technical ones—lack of monitoring. Often, you only discover that the sync task has failed long ago when users report incorrect data.
5.1 Key Metric Monitoring
We monitor the following metrics in Prometheus (exposed via SeaTunnel JMX):
| Metric | Description | Alert Threshold |
|---|---|---|
| `seatunnel_source_records_total` | Total records read from source | Zero for 5 consecutive minutes |
| `seatunnel_sink_records_total` | Total records written to sink | Difference from source exceeds 10% |
| `seatunnel_sink_duration_seconds` | Sink write duration | P95 > 10 seconds |
| `seatunnel_sink_errors_total` | Total sink write errors | Alert on any error |
| `seatunnel_checkpoint_duration` | Checkpoint duration | > 30 seconds |
| `seatunnel_jvm_memory_used` | JVM memory usage | > 80% |
Grafana dashboard configuration example:
```sql
-- Sync lag monitoring
SELECT
  time_bucket('1m', timestamp) as time,
  source_max_timestamp - sink_max_timestamp as lag_seconds
FROM (
  -- Maximum timestamp from source
  SELECT MAX(event_time) as source_max_timestamp
  FROM mongodb_source_table
  WHERE event_time > now() - interval '1 hour'
) source,
(
  -- Maximum timestamp from sink
  SELECT MAX(event_time) as sink_max_timestamp
  FROM doris_target_table
  WHERE event_time > now() - interval '1 hour'
) sink
GROUP BY 1
ORDER BY 1 DESC
```
5.2 Automated Repair Scripts
Some common errors can be automatically fixed. For example, insufficient Doris table space:
```bash
#!/bin/bash
# auto_extend_doris.sh
ERROR_LOG=$1
TABLE_NAME=$(grep -o "table [a-zA-Z0-9_]*" "$ERROR_LOG" | head -1 | cut -d' ' -f2)

if [[ -n "$TABLE_NAME" ]]; then
  # Check table partition usage
  USAGE=$(mysql -h doris-fe -P 9030 -u admin -p'***' -e \
    "SHOW PARTITIONS FROM $TABLE_NAME WHERE UsedPercent > 90;" | wc -l)
  if [[ $USAGE -gt 0 ]]; then
    # Automatically add a partition
    mysql -h doris-fe -P 9030 -u admin -p'***' <<EOF
ALTER TABLE $TABLE_NAME ADD PARTITION p_$(date +%Y%m%d)
VALUES [("$(date +%Y-%m-%d)"), ("$(date -d '+7 days' +%Y-%m-%d)"));
EOF
    echo "Automatic partition extension completed, restarting SeaTunnel task"
    systemctl restart seatunnel-worker
  fi
fi
```
5.3 Data Quality Validation
Automatically validate after synchronization completes:
```python
# validate_sync.py
import pymongo
import pymysql
from datetime import datetime, timedelta

def validate_counts():
    # MongoDB count
    mongo_client = pymongo.MongoClient("mongodb://host:27017")
    mongo_count = mongo_client.db.collection.count_documents({
        "update_time": {"$gte": datetime.utcnow() - timedelta(hours=1)}
    })

    # Doris count
    doris_conn = pymysql.connect(host="doris-fe", port=9030,
                                 user="admin", password="***", database="dw")
    with doris_conn.cursor() as cursor:
        cursor.execute("""
            SELECT COUNT(*)
            FROM target_table
            WHERE update_time >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
        """)
        doris_count = cursor.fetchone()[0]

    # Allow 1% deviation (accounting for deletes, updates, etc.)
    diff_ratio = abs(mongo_count - doris_count) / max(mongo_count, 1)
    if diff_ratio > 0.01:
        # send_alert is your team's notification hook (webhook, IM bot, etc.)
        send_alert(f"Data inconsistency: MongoDB={mongo_count}, "
                   f"Doris={doris_count}, difference={diff_ratio:.2%}")
        return False
    return True
```
After building this monitoring system, our team has never again been woken up by midnight alarms—not because there are no problems, but because issues are automatically handled before impacting the business.
These weren’t theoretical issues — they cost us real time in production. Data synchronization is not just about correct configuration; the real challenge lies in stable operation in production environments. Next time you encounter SeaTunnel synchronization issues, start troubleshooting from these five aspects—you will most likely find the direction. Every environment has its own particularities, but the core problem-solving ideas are universal.