From TB-Scale MongoDB to Doris: 5 Critical Challenges and Fixes with Apache SeaTunnel

Recently, in several data middle-platform projects, we have frequently used Apache SeaTunnel to synchronize data from MongoDB to Apache Doris. Honestly, this task looks simple, but once you actually get started, you realize there are quite a few pitfalls. Especially in production environments, where data volume is large and structures are complex, a small oversight can easily lead to problems.

This article does not intend to repeat those basic configuration steps—there are already plenty of them online. I want to focus on the areas in real production environments where people are most likely to stumble. Particularly when you are dealing with TB-level MongoDB collections that need to be stably synchronized to Doris for real-time analysis, the following five pitfalls are almost inevitable. I will combine specific error logs, troubleshooting approaches, and the solutions our team has explored to help you eliminate these issues one by one.

1. Data Type Mapping: BSON-to-SQL Conversion Issues

MongoDB’s BSON type system and Doris’s SQL type system appear to be automatically mappable on the surface, but in reality they hide quite a few “surprises.” The most typical examples are the handling of Decimal128 and ObjectId.

1.1 Precision Loss Issue with Decimal128

MongoDB uses Decimal128 to store high-precision numeric values, such as financial transaction amounts. SeaTunnel maps it to Doris's DECIMAL type by default, and on paper the types fit: Decimal128 carries up to 34 significant decimal digits, while Doris's DECIMAL supports a maximum precision of 38. The catch is that the fit only holds when precision and scale are declared explicitly. If you leave the SeaTunnel schema to automatic inference, you are likely to encounter the following error:

java.lang.ArithmeticException: Non-terminating decimal expansion; no exact representable decimal result  

The solution is to explicitly declare the precision in the schema. Do not rely on automatic inference—control it manually:

source {  
  MongoDB {  
    uri = "mongodb://user:password@host:27017"  
    database = "finance"  
    collection = "transactions"  
    schema = {  
      fields {  
        _id = string  
        amount = "decimal(38, 18)"  # Explicitly specify total precision 38, scale 18  
        currency = string  
        timestamp = timestamp  
      }  
    }  
  }  
}  

Note: If the Decimal128 values in your data contain more than 18 decimal places, you need to adjust according to your actual situation. In one of our e-commerce projects, coupon calculation required high precision, so we used decimal(38, 24).
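To make the rounding concrete, here is a small Python sketch (a hypothetical helper, not part of SeaTunnel) of what an explicit decimal(38, 18) declaration does: values are rounded to the declared scale, and anything that cannot fit the declared precision is rejected rather than silently truncated:

```python
from decimal import Decimal, ROUND_HALF_UP, localcontext

def fit_to_column(value: str, precision: int = 38, scale: int = 18) -> Decimal:
    # Round a Decimal128-style value to the declared (precision, scale).
    # Without a declared scale, a non-terminating expansion has no exact
    # representation, which is exactly the ArithmeticException above.
    with localcontext() as ctx:
        ctx.prec = precision + scale  # headroom to quantize before checking
        q = Decimal(value).quantize(Decimal(1).scaleb(-scale),
                                    rounding=ROUND_HALF_UP)
    digits = len(q.as_tuple().digits)
    if digits > precision:
        raise ValueError(f"{value!r} needs {digits} digits, column allows {precision}")
    return q
```

The helper mirrors the schema declaration: widen `scale` (as we did with decimal(38, 24)) when your values carry more fractional digits.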

1.2 ObjectId and Nested Document Serialization Pitfalls

The _id field in MongoDB is of type ObjectId by default, and SeaTunnel converts it to a string. This seems fine until you discover primary key conflicts in the Doris table: once ObjectId is serialized to a string, inconsistencies in the serialized form (letter case, stray whitespace) can confuse Doris's UNIQUE KEY deduplication.
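A cheap defense is to normalize the string form before it lands in a UNIQUE KEY column. A sketch (hypothetical helper, not a SeaTunnel API):

```python
import re

_OBJECT_ID = re.compile(r"^[0-9a-fA-F]{24}$")

def normalize_object_id(oid: str) -> str:
    # ObjectId serializes to a 24-character hex string; normalizing case and
    # whitespace up front keeps two spellings of the same id from slipping
    # past (or colliding under) Doris's UNIQUE KEY deduplication.
    oid = oid.strip()
    if not _OBJECT_ID.match(oid):
        raise ValueError(f"not an ObjectId string: {oid!r}")
    return oid.lower()
```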

Nested documents are even more troublesome. A common structure in MongoDB looks like this:

{  
  "_id": ObjectId("507f1f77bcf86cd799439011"),  
  "user": {  
    "name": "Zhang San",  
    "address": {  
      "city": "Beijing",  
      "district": "Chaoyang"  
    }  
  }  
}  

By default, SeaTunnel converts the entire user object into a JSON string and stores it in a VARCHAR field in Doris. If you want to directly query user.address.city in Doris, you have to use JSON functions to parse it, which results in poor performance.

Our approach is to flatten nested fields in advance using the transform plugin in SeaTunnel:

transform {  
  # Flatten nested fields  
  sql {  
    query = """  
      SELECT   
        _id,  
        user.name as user_name,  
        user.address.city as city,  
        user.address.district as district  
      FROM mongodb_source  
    """  
  }  
}  
  
sink {  
  Doris {  
    fenodes = "fe1:8030,fe2:8030"  
    username = "admin"  
    password = "***"  
    database = "analytics"  
    table = "user_flat"  
    # The table structure is now flat, and query performance is high  
  }  
}  

If the nesting level is too deep or uncertain, you may also consider using the MAP type in Doris, but note that it is only supported in version 2.0 and above.
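The transform above hand-picks each nested path; when the structure is regular, the same flattening can be expressed generically. A sketch of the idea (hypothetical helper, not a SeaTunnel plugin):

```python
def flatten(doc: dict, prefix: str = "", sep: str = "_") -> dict:
    # Turn nested documents into flat column-name -> value pairs,
    # joining path segments with `sep` (user.address.city -> user_address_city).
    out = {}
    for key, value in doc.items():
        column = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            out.update(flatten(value, column, sep))  # recurse into sub-documents
        else:
            out[column] = value
    return out
```

Note it emits full-path names like user_address_city, whereas the SQL transform above shortens some aliases by hand (city, district).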

2. Connection and Timeout Configuration: High-Concurrency Challenges in Production

In a test environment with dozens of records, everything works fine. Once you move to production, connection timeouts, cursor timeouts, and memory overflows all appear.

2.1 MongoDB Connection Pool and Cursor Timeout

Several key parameters in the MongoDB source plugin of SeaTunnel are easy to overlook:

| Parameter | Default | Production Recommendation | Description |
| --- | --- | --- | --- |
| cursor.no-timeout | true | false | Setting to true may cause excessive accumulation of server-side cursors |
| fetch.size | 2048 | 8192–16384 | Adjust based on document size to reduce network round trips |
| max.time-min | 600 | 30 | Query timeout (minutes) to prevent long-running queries from exhausting cluster resources |
| partition.split-key | _id | Depends on business logic | Partition key for parallel reads |
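A reasonable way to pick fetch.size is to work backwards from the average document size, targeting a few megabytes per round trip. This back-of-envelope helper is our rule of thumb, not a SeaTunnel formula:

```python
def suggest_fetch_size(avg_doc_bytes: int, target_batch_mb: int = 16,
                       floor: int = 2048) -> int:
    # Bigger batches mean fewer network round trips; capping the batch at
    # roughly target_batch_mb keeps a single fetch from ballooning memory.
    return max(floor, (target_batch_mb * 1024 * 1024) // avg_doc_bytes)
```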

We once encountered a major pitfall: setting cursor.no-timeout=true together with large data queries caused hundreds of cursors to accumulate on the MongoDB server side, each consuming memory and nearly bringing the cluster down. Later, we changed it to:

source {  
  MongoDB {  
    uri = "mongodb://user:password@host1:27017,host2:27017/?replicaSet=rs0&readPreference=secondaryPreferred"  
    database = "logs"  
    collection = "access_logs"  
    cursor.no-timeout = false  
    fetch.size = 16384  
    max.time-min = 30  
    partition.split-key = "_id"  
    partition.split-size = 1048576  # 1MB per partition  
  
    # Restrict the scan to a recent window instead of the full collection
    # (fixed cutoff shown for illustration; generate it per run in practice)
    match.query = "{timestamp: {$gte: ISODate('2024-01-01T00:00:00Z')}}"  
  }  
}  
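The ISODate cutoff above is hard-coded for illustration; in a scheduled job we render it when the config is generated. A sketch, assuming the job config is templated from Python (the field name is an example):

```python
from datetime import datetime, timedelta, timezone

def last_n_days_match_query(field: str = "timestamp", days: int = 7) -> str:
    # Render a MongoDB match.query string covering the trailing N days,
    # so each run scans a bounded window instead of the whole collection.
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    return '{%s: {$gte: ISODate("%s")}}' % (field, cutoff.strftime("%Y-%m-%dT%H:%M:%SZ"))
```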

2.2 Stream Load Tuning in Doris

On the Doris Sink side, the core lies in Stream Load batch parameters. The default configuration is friendly for small data volumes, but production environments require tuning:

sink {  
  Doris {  
    fenodes = "fe1:8030,fe2:8030,fe3:8030"  
    username = "sync_user"  
    password = "***"  
    database = "dw"  
    table = "fact_table"  
  
    sink.label-prefix = "seatunnel_sync"  
    sink.enable-2pc = true  # Enable two-phase commit to ensure Exactly-Once  
    sink.buffer-size = 524288  # 512KB, default 256KB is too small  
    sink.buffer-count = 5  # Number of buffers  
    doris.batch.size = 5000  # 5000 rows per batch, default 1024  
  
    # Key: advanced Stream Load parameters  
    doris.config = {  
      format = "json"  
      read_json_by_line = "true"  
      strip_outer_array = "true"  
      num_as_string = "true"  # Convert numbers to string to avoid type issues  
  
      # Connection and timeout control  
      connect_timeout = "10"  
      socket_timeout = "30"  
  
      # Partial update mode (if the table uses Unique model)  
      partial_columns = "true"  
      merge_type = "MERGE"  
    }  
  }  
}  

There is a detail here: sink.label-prefix must be unique for each task; otherwise, Doris will reject duplicate import labels. We use the pattern "seatunnel_${job_id}_${timestamp}".
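Rendered in Python, that label pattern looks like this (a sketch; job_id is whatever your scheduler assigns):

```python
import time

def make_label_prefix(job_id: str, attempt: int = 0) -> str:
    # A millisecond timestamp plus the attempt number keeps every
    # submission's label unique, so Doris never rejects a retry as a
    # duplicate import label.
    return f"seatunnel_{job_id}_{int(time.time() * 1000)}_{attempt}"
```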

3. Performance Bottleneck Identification and Optimization: From Hours to Minutes

When synchronization tasks run slowly, it is usually not caused by a single reason, but by multiple factors stacking together.

3.1 Diagnostic Toolchain

First, you need to know where the bottleneck is. Our commonly used monitoring combination:

  1. SeaTunnel’s own logs: enable DEBUG level to see the reading progress of each partition
  2. MongoDB Profiler: temporarily enable to confirm whether the query uses indexes
  3. Doris FE/BE monitoring: use show proc '/current_queries' to check import status
  4. System monitoring: CPU, memory, network IO

There was once a case where synchronization speed was stuck at 1000 records per second and could not improve. After investigation, we found:

  • On the MongoDB side: the query used the $or operator and did not use an index
  • Network: cross-availability-zone transmission with high latency
  • On the Doris side: BE node disk IO was saturated

3.2 Partition Strategy Optimization

SeaTunnel supports parallel reading based on partition.split-key. However, using _id as the default partition key is not always optimal.

If the data has a natural time dimension, such as a log table, partitioning by a time field works better:

source {  
  MongoDB {  
    # Assume each record has an event_time field  
    partition.split-key = "event_time"  
    partition.split-size = 3600000  # Partition by 1 hour  
  
    # Combine with query conditions to avoid full table scans  
    match.query = """  
      {  
        event_time: {  
          $gte: ISODate("2024-01-01T00:00:00Z"),  
          $lt: ISODate("2024-01-02T00:00:00Z")  
        }  
      }  
    """  
  }  
}  

If the data distribution is uneven, you can first analyze key distribution using an aggregation query:

// Execute in MongoShell  
db.collection.aggregate([  
  { $bucketAuto: { groupBy: "$shard_key", buckets: 10 } }  
])  
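What $bucketAuto does server-side can be approximated client-side once you have a sorted sample of keys: pick evenly spaced boundary values so each partition holds roughly equal row counts, even when the key values themselves are skewed. A sketch (hypothetical helper):

```python
def split_boundaries(sorted_keys: list, num_partitions: int) -> list:
    # Evenly spaced boundary keys from a sorted sample give each partition
    # ~equal row counts regardless of how the key values are distributed.
    n = len(sorted_keys)
    step = max(n // num_partitions, 1)
    return sorted_keys[step::step][:num_partitions - 1]
```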

3.3 Memory and GC Tuning

SeaTunnel is based on the JVM, and GC issues are common with large data volumes. Our production JVM parameters:

# seatunnel_env.sh or startup script  
export JAVA_OPTS="-Xmx8g -Xms8g \  
-XX:+UseG1GC \  
-XX:MaxGCPauseMillis=200 \  
-XX:InitiatingHeapOccupancyPercent=35 \  
-XX:ParallelGCThreads=4 \  
-XX:ConcGCThreads=2 \  
-XX:+AlwaysPreTouch \  
-XX:+UseStringDeduplication \  
-XX:+PrintGCDetails \  
-XX:+PrintGCDateStamps \  
-Xloggc:/var/log/seatunnel/gc.log"  

The key point is -XX:+AlwaysPreTouch, which pre-allocates memory at startup to avoid runtime jitter. Note that on JDK 9 and later, the PrintGC* flags and -Xloggc were replaced by unified logging; use -Xlog:gc*:file=/var/log/seatunnel/gc.log instead.

4. Data Consistency and Error Handling: Exactly-Once Implementation Details

Data synchronization must not lose data, nor duplicate data. SeaTunnel supports Exactly-Once semantics, but it must be configured correctly.

4.1 Pitfalls of Two-Phase Commit (2PC)

Setting sink.enable-2pc = true in the Doris Sink enables two-phase commit, theoretically ensuring Exactly-Once. However, we once encountered a strange issue: after a task failed and retried, data was duplicated.

The cause was label reuse. When SeaTunnel retries after a failure with the same label-prefix, Doris's label-based import deduplication misfires: depending on where the previous attempt stopped, the retry can be treated as the same import and partially skipped, or replayed batches can land twice.

Solution: include timestamp and attempt count in the label:

sink {  
  Doris {  
    sink.label-prefix = "sync_${table_name}_${now()}_${attempt_num}"  
    sink.enable-2pc = true  
    sink.max-retries = 3  
    sink.check-interval = 5000  # Check every 5 seconds  
  }  
}  

4.2 Dirty Data and Type Conversion Errors

MongoDB is schema-less; the same field may be a string in one row and a number in another. Doris has a strict schema and will report errors on type mismatch.

The needs_unsupported_type_casting parameter in SeaTunnel can help:

sink {  
  Doris {  
    # Attempt to automatically convert incompatible types, e.g., Decimal to Double  
    needs_unsupported_type_casting = true  
  
    # But it is recommended to handle this in the transform layer  
  }  
}  
  
transform {  
  # Normalize types before writing  
  sql {  
    query = """  
      SELECT   
        CAST(amount AS DOUBLE) as amount_double,  
        COALESCE(name, '') as name_safe,  -- handle NULLs (SQL comments use --, not #)  
        REGEXP_REPLACE(description, '[\\x00-\\x1F]', '') as description_clean  
      FROM source_table  
    """  
  }  
}  
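The same normalization, sketched outside the transform layer for anyone pre-processing rows in a custom pipeline (field names are the hypothetical ones from the SQL above):

```python
import re

_CONTROL_CHARS = re.compile(r"[\x00-\x1f]")

def normalize_row(row: dict) -> dict:
    # Coerce types and scrub values the strict Doris schema would reject:
    # numeric strings to float, NULLs to safe defaults, control chars stripped.
    return {
        "amount_double": float(row.get("amount") or 0),
        "name_safe": row.get("name") or "",
        "description_clean": _CONTROL_CHARS.sub("", row.get("description") or ""),
    }
```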

4.3 Checkpointing and Resuming from Failure

SeaTunnel supports Checkpoint, but the storage backend must be configured correctly. We use HDFS:

env {  
  execution.parallelism = 8  
  job.mode = "BATCH"  
  
  # Checkpoint configuration  
  checkpoint.interval = 60000  # Once per minute  
  checkpoint.timeout = 600000  # 10-minute timeout  
  checkpoint.max-concurrent-checkpoints = 1  
  
  state.backend = "hdfs"  
  state.checkpoints.dir = "hdfs://namenode:8020/seatunnel/checkpoints"  
  state.savepoints.dir = "hdfs://namenode:8020/seatunnel/savepoints"  
  
  # Restore from the latest checkpoint after task failure  
  execution.savepoint-restore.enabled = true  
}  

A detail here: if the Checkpoint frequency is too high, it affects performance; if too low, too much data may be reprocessed during recovery. We generally decide based on data volume—for example, one Checkpoint per one million rows processed.
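The "one checkpoint per million rows" rule converts into a time interval once you know the job's throughput. A sketch of the sizing arithmetic:

```python
def checkpoint_interval_ms(rows_per_second: float,
                           rows_per_checkpoint: int = 1_000_000) -> int:
    # e.g. at 50k rows/s, one checkpoint per million rows is one every 20s,
    # which is what you would set checkpoint.interval to.
    return int(rows_per_checkpoint / rows_per_second * 1000)
```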

5. Operations Monitoring and Alerting: From Passive Firefighting to Proactive Prevention

This last one is not a technical pitfall, but it is deadlier than any of them: the absence of monitoring. Often, you only discover that the sync task failed long ago when users report incorrect data.

5.1 Key Metric Monitoring

We monitor the following metrics in Prometheus (exposed via SeaTunnel JMX):

| Metric | Description | Alert Threshold |
| --- | --- | --- |
| seatunnel_source_records_total | Total records read from source | Zero for 5 consecutive minutes |
| seatunnel_sink_records_total | Total records written to sink | Difference from source exceeds 10% |
| seatunnel_sink_duration_seconds | Sink write duration | P95 > 10 seconds |
| seatunnel_sink_errors_total | Total sink write errors | Alert on any error |
| seatunnel_checkpoint_duration | Checkpoint duration | > 30 seconds |
| seatunnel_jvm_memory_used | JVM memory usage | > 80% |
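The 10% source/sink divergence rule from the table boils down to a small predicate; this sketch is the check behind our alert rule:

```python
def records_diverged(source_total: int, sink_total: int,
                     max_diff_ratio: float = 0.10) -> bool:
    # True when the sink trails (or exceeds) the source by more than the
    # allowed ratio; a zero source with a non-empty sink is also suspicious.
    if source_total == 0:
        return sink_total > 0
    return abs(source_total - sink_total) / source_total > max_diff_ratio
```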

Grafana dashboard configuration example:

-- Sync lag panel (illustrative: in practice Grafana issues the two
-- subqueries against separate datasources and subtracts the results;
-- they are shown here as one query for readability)
SELECT
  source_max_timestamp - sink_max_timestamp AS lag_seconds
FROM (
  -- Maximum timestamp on the source side
  SELECT MAX(event_time) AS source_max_timestamp
  FROM mongodb_source_table
  WHERE event_time > now() - interval '1 hour'
) source,
(
  -- Maximum timestamp on the sink side
  SELECT MAX(event_time) AS sink_max_timestamp
  FROM doris_target_table
  WHERE event_time > now() - interval '1 hour'
) sink
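Stripped of the SQL, the lag the panel plots is just the gap between the two max timestamps; a minimal sketch:

```python
from datetime import datetime

def sync_lag_seconds(source_max: datetime, sink_max: datetime) -> float:
    # Positive lag means the sink trails the source.
    return (source_max - sink_max).total_seconds()
```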

5.2 Automated Repair Scripts

Some common errors can be automatically fixed. For example, insufficient Doris table space:

#!/bin/bash  
# auto_extend_doris.sh — parse the error log, and if the affected table has  
# partitions over 90% full, add next week's partition and restart the worker  
  
ERROR_LOG=$1  
TABLE_NAME=$(grep -o "table [a-zA-Z0-9_]*" "$ERROR_LOG" | head -1 | cut -d' ' -f2)  
  
if [[ -n "$TABLE_NAME" ]]; then  
  # Check table partition usage; -N suppresses the column header  
  # so wc -l counts only matching partitions  
  USAGE=$(mysql -N -h doris-fe -P 9030 -u admin -p'***' -e \  
    "SHOW PARTITIONS FROM $TABLE_NAME WHERE UsedPercent > 90;" | wc -l)  
  
  if [[ $USAGE -gt 0 ]]; then  
    # Automatically add a one-week range partition starting today  
    mysql -h doris-fe -P 9030 -u admin -p'***' <<EOF  
    ALTER TABLE $TABLE_NAME ADD PARTITION p_$(date +%Y%m%d)  
    VALUES [("$(date +%Y-%m-%d)"), ("$(date -d '+7 days' +%Y-%m-%d)"));  
EOF  
    echo "Automatic partition extension completed, restarting SeaTunnel task"  
    systemctl restart seatunnel-worker  
  fi  
fi  

5.3 Data Quality Validation

Automatically validate after synchronization completes:

# validate_sync.py  
import pymongo  
import pymysql  
from datetime import datetime, timedelta  
  
def send_alert(message: str):  
    # Placeholder: wire this into your real alerting channel (IM webhook, email, etc.)  
    print(f"[ALERT] {message}")  
  
def validate_counts():  
    # MongoDB count  
    mongo_client = pymongo.MongoClient("mongodb://host:27017")  
    mongo_count = mongo_client.db.collection.count_documents({  
        "update_time": {"$gte": datetime.utcnow() - timedelta(hours=1)}  
    })  
  
    # Doris count    
    doris_conn = pymysql.connect(host="doris-fe", port=9030,  
                                 user="admin", password="***", database="dw")  
    with doris_conn.cursor() as cursor:  
        cursor.execute("""  
            SELECT COUNT(*)   
            FROM target_table   
            WHERE update_time >= DATE_SUB(NOW(), INTERVAL 1 HOUR)  
        """)  
        doris_count = cursor.fetchone()[0]  
  
    # Allow 1% deviation (considering deletes, updates, etc.)  
    diff_ratio = abs(mongo_count - doris_count) / max(mongo_count, 1)  
  
    if diff_ratio > 0.01:  
        send_alert(f"Data inconsistency: MongoDB={mongo_count}, Doris={doris_count}, Difference={diff_ratio:.2%}")  
        return False  
    return True  

After building this monitoring system, our team has never again been woken up by midnight alarms—not because there are no problems, but because issues are automatically handled before impacting the business.

These weren’t theoretical issues — they cost us real time in production. Data synchronization is not just about correct configuration; the real challenge lies in stable operation in production environments. Next time you encounter SeaTunnel synchronization issues, start troubleshooting from these five aspects—you will most likely find the direction. Every environment has its own particularities, but the core problem-solving ideas are universal.