In many real-world Talend implementations, teams need a lightweight yet powerful ETL solution that provides full control over SQL logic, schema evolution, and audit tracking—without relying on multiple Talend components or licensed ETL tools.
This blog post explains how to build a dynamic MySQL ETL pipeline in Talend using only three components: tPrejob, tLibraryLoad, and tJava.
All ETL logic—including schema discovery, table creation, schema synchronization, data loading, and cumulative audit logging—is implemented using pure Java (JDBC) inside a single tJava component.
| Component | Responsibility |
|---|---|
| tPrejob | Job initialization |
| tLibraryLoad | Loads MySQL JDBC driver |
| tJava | Executes complete ETL logic |
This architecture is commonly used in advanced Talend jobs where developers prefer Java-driven orchestration over component-heavy pipelines.
tPrejob ensures that initialization logic executes once before the ETL process starts. It controls execution order and prepares the runtime environment.
Always place driver loading and initialization logic inside tPrejob.
tLibraryLoad dynamically loads the MySQL JDBC driver at runtime.
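All ETL functionality is implemented inside a single tJava component, including schema discovery, target table creation, schema synchronization, data loading, and cumulative audit logging.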
This approach provides maximum flexibility, transparency, and control.
/* tJava Advanced settings: imports */
import java.sql.*;
import java.util.*;
/* =========================
tjava Basic Settings CONFIGURATION
========================= */
String mysqlUrl = "jdbc:mysql://localhost:3306/";
String mysqlUser = "*****";
String mysqlPassword = "*******";
/* SOURCE */
String sourceDb = "*****";
String sourceTable = "******";
/* TARGET */
String targetDb = "*****_audit_db";
String targetTable = "******_dt";
/* AUDIT TABLE */
String auditTable = "etl_audit_log";
/* LOAD USER */
String loadUser = mysqlUser;
/* Fully qualified names */
String sourceFullTable = sourceDb + "." + sourceTable;
String targetFullTable = targetDb + "." + targetTable;
/* =========================
CONNECT
========================= */
Class.forName("com.mysql.cj.jdbc.Driver");
Connection conn = DriverManager.getConnection(mysqlUrl, mysqlUser, mysqlPassword);
Statement readStmt = conn.createStatement();
Statement writeStmt = conn.createStatement();
/* =========================
CREATE TARGET DATABASE
========================= */
writeStmt.execute("CREATE DATABASE IF NOT EXISTS " + targetDb);
/* =========================
READ SOURCE COLUMNS
========================= */
List srcColumns = new ArrayList<>();
ResultSet srcColRs = readStmt.executeQuery(
"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS " +
"WHERE TABLE_SCHEMA = '" + sourceDb + "' " +
"AND TABLE_NAME = '" + sourceTable + "' " +
"ORDER BY ORDINAL_POSITION"
);
while (srcColRs.next()) {
srcColumns.add(srcColRs.getString("COLUMN_NAME"));
}
srcColRs.close();
/* =========================
CREATE TARGET DATA TABLE
========================= */
StringBuilder createTargetSQL = new StringBuilder(
"CREATE TABLE IF NOT EXISTS " + targetFullTable + " ("
);
for (int i = 0; i < srcColumns.size(); i++) {
createTargetSQL.append("`").append(srcColumns.get(i)).append("` VARCHAR(255)");
if (i < srcColumns.size() - 1) {
createTargetSQL.append(",");
}
}
createTargetSQL.append(")");
writeStmt.execute(createTargetSQL.toString());
/* =========================
SYNC NEW SOURCE COLUMNS
========================= */
Set tgtColumns = new HashSet<>();
ResultSet tgtColRs = readStmt.executeQuery(
"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS " +
"WHERE TABLE_SCHEMA = '" + targetDb + "' " +
"AND TABLE_NAME = '" + targetTable + "'"
);
while (tgtColRs.next()) {
tgtColumns.add(tgtColRs.getString("COLUMN_NAME"));
}
tgtColRs.close();
for (String col : srcColumns) {
if (!tgtColumns.contains(col)) {
writeStmt.execute(
"ALTER TABLE " + targetFullTable +
" ADD COLUMN `" + col + "` VARCHAR(255)"
);
}
}
/* =========================
SOURCE ROW COUNT
========================= */
int sourceCount = 0;
ResultSet cntRs = readStmt.executeQuery(
"SELECT COUNT(*) FROM " + sourceFullTable
);
if (cntRs.next()) {
sourceCount = cntRs.getInt(1);
}
cntRs.close();
/* =========================
READ SOURCE DATA
========================= */
ResultSet srcRs = readStmt.executeQuery(
"SELECT * FROM " + sourceFullTable
);
/* =========================
INSERT INTO TARGET DATA TABLE
========================= */
int insertedThisRun = 0;
StringBuilder colBuilder = new StringBuilder();
for (int i = 0; i < srcColumns.size(); i++) {
colBuilder.append("`").append(srcColumns.get(i)).append("`");
if (i < srcColumns.size() - 1) {
colBuilder.append(",");
}
}
while (srcRs.next()) {
StringBuilder valBuilder = new StringBuilder("(");
for (int i = 0; i < srcColumns.size(); i++) {
String val = srcRs.getString(srcColumns.get(i));
if (val == null) {
valBuilder.append("NULL");
} else {
valBuilder.append("'").append(val.replace("'", "''")).append("'");
}
if (i < srcColumns.size() - 1) {
valBuilder.append(",");
}
}
valBuilder.append(")");
writeStmt.execute(
"INSERT INTO " + targetFullTable +
" (" + colBuilder + ") VALUES " + valBuilder
);
insertedThisRun++;
}
srcRs.close();
/* =========================
CREATE AUDIT TABLE
========================= */
writeStmt.execute(
"CREATE TABLE IF NOT EXISTS " + targetDb + "." + auditTable + " (" +
"id INT AUTO_INCREMENT PRIMARY KEY," +
"source_table_name VARCHAR(200)," +
"target_table_name VARCHAR(200)," +
"source_count INT," +
"target_count INT," +
"load_user VARCHAR(100)," +
"load_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP" +
")"
);
/* =========================
READ PREVIOUS TARGET COUNT
========================= */
int previousTargetCount = 0;
ResultSet prevAuditRs = readStmt.executeQuery(
"SELECT target_count FROM " + targetDb + "." + auditTable +
" WHERE target_table_name = '" + targetFullTable + "'" +
" ORDER BY load_timestamp DESC LIMIT 1"
);
if (prevAuditRs.next()) {
previousTargetCount = prevAuditRs.getInt("target_count");
}
prevAuditRs.close();
/* =========================
INSERT AUDIT RECORD (CUMULATIVE)
========================= */
int cumulativeTargetCount = previousTargetCount + insertedThisRun;
writeStmt.execute(
"INSERT INTO " + targetDb + "." + auditTable +
" (source_table_name, target_table_name, source_count, target_count, load_user) VALUES (" +
"'" + sourceFullTable + "'," +
"'" + targetFullTable + "'," +
sourceCount + "," +
cumulativeTargetCount + "," +
"'" + loadUser + "'" +
")"
);
/* =========================
CLOSE
========================= */
readStmt.close();
writeStmt.close();
conn.close();
System.out.println("JOB COMPLETED SUCCESSFULLY");
System.out.println("SOURCE COUNT = " + sourceCount);
System.out.println("INSERTED THIS RUN = " + insertedThisRun);
System.out.println("TARGET TOTAL COUNT = " + cumulativeTargetCount);
System.out.println("AUDIT ROW INSERTED");
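This ETL uses a cumulative audit model: each run appends every source row to the target table, and each audit row records the source row count together with a running target total (the previous audit row's target_count plus the rows inserted in this run).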
1. tPrejob initializes job execution
2. tLibraryLoad loads the MySQL JDBC driver
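3. tJava performs schema discovery, target table creation and column synchronization, the data load, and audit logging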
4. Job completes with clear execution metrics
This Talend ETL pattern demonstrates that robust, auditable, and production-ready pipelines can be built using only tPrejob, tLibraryLoad, and tJava. By leveraging JDBC and metadata-driven logic, teams gain full control, transparency, and flexibility without overengineering their Talend jobs.