Problema encontrado
En nuestro proyecto, SeatUnnel se utiliza para extraer datos de la base de datos empresarial en el almacén de datos (StoRrocks), y ya hemos utilizado con éxito MySQL-CDC para la sincronización en tiempo actual a gran escala. Sin embargo, encontramos un problema anormal al sincronizar una tabla MySQL en explicit: después de que comenzó el trabajo, los registros mostraron cero recuentos de lectura y escritura, y el trabajo no se detuvo durante mucho tiempo. Después de 6 horas de ejecución, terminó con un error de tiempo de espera del punto de management.
La estructura del trabajo es la siguiente (información confidencial eliminada):
Registros de clave durante la ejecución:
Fondo
- Escenario: extracción de datos en tiempo actual de MySQL a Starrocks usando MySQL-CDC
- Versión de SeatUnnel: 2.3.9
- Versión de MySQL: 8.x
- Versión de Starrocks: 3.2
- Volumen de datos de la tabla de origen: 60–70 millones de filas
Preguntas clave
- ¿Por qué los recuentos de lectura y escritura permanecen en 0?
- ¿Por qué lleva tanto tiempo lanzar un error de tiempo de espera?
Proceso de análisis
Hemos utilizado MySQL-CDC para muchos trabajos de sincronización antes, y las configuraciones fueron en su mayoría el mismo, por lo que el problema probablemente no se puede con SEATUnnel.
Comparamos esta tabla de origen con las que anteriormente exitosos para ver si había diferencias.
Efectivamente, encontramos algo sospechoso:
Las tablas anteriores tenían claves primarias de incremento automático; Este no lo hizo. Solo tenía múltiples índices únicos.
Entonces surge la pregunta: ¿cómo exactamente los datos de sincronización de SeatUnnel?
Hasta donde sabemos, SeatUnnel utiliza un enfoque de dos pasos al sincronizar los datos de los CDC: primera sincronización de instantánea, luego sincronización incremental basada en Binlog.
Dado que Learn Depend sigue siendo cero, el trabajo debe estar atascado en la fase de instantánea. Entonces, ¿cómo funciona la sincronización de instantáneas?
Revisamos los documentos oficiales de Seatunnel:
MySQL CDC | Apache Seatunnel:
https://seatunnel.apache.org/zh-cn/docs/2.3.9/connector-v2/supply/mysql-cdc
No hay ninguna explicación arquitectónica, pero encontramos algunos parámetros configurables.
Explicación de parámetros
chunk-key.even-distribution.issue.upper-bound
Valor predeterminado: 100
Descripción:
El límite superior del issue de distribución de la tecla de fragmento. Este issue se utiliza para determinar si los datos de la tabla se distribuyen uniformemente. Si el issue de distribución (p. Ej. (MAX (ID) – MIN (ID) + 1) / recuento de filas) es ≤ este valor, la tabla se considera uniformemente distribuida y utilizará fragmentos uniformes. Si excede este valor, y el número estimado de fragmentos supera sample-sharding.threshold
se utilizará la estrategia de fragmentación basada en muestreo. Valor predeterminado: 100.0
chunk-key.even-distribution.issue.lower-bound
Valor predeterminado: 0.5
Descripción:
El límite inferior del issue de distribución. Si el issue de distribución es ≥ este valor, la tabla se considera uniformemente distribuida. De lo contrario, se considera desigual y podría desencadenar fragmentos basados en el muestreo.
sample-sharding.threshold
Valor predeterminado: 1000
Descripción:
Si el issue de distribución está fuera del [lower, upper] El rango y el número estimado de fragmentos (aproximadamente el tamaño de la fila / tamaño del trozo) excede este umbral, se utilizará la estrategia de fragmentación basada en muestreo. Esto mejora la eficiencia de grandes conjuntos de datos.
inverse-sampling.price
Valor predeterminado: 1000
Descripción:
Utilizado en fragmentos basados en muestreo. Un valor de 1000 significa una tasa de muestreo de 1/1000. Controla la granularidad del muestreo y afecta el número de fragmentos finales.
snapshot.cut up.measurement
Valor predeterminado: 8096
Descripción:
El número de filas por fragmento en sincronización de instantáneas. Las tablas se dividirán en trozos basados en esto.
snapshot.fetch.measurement
Valor predeterminado: 1024
Descripción:
Número máximo de filas obtenidas por encuesta durante la lectura de instantáneas.
De estos parámetros, aprendimos:
Durante la sincronización de la instantánea, SeatUnnel fragará los datos en múltiples divisiones. La estrategia de fragmentación depende de si los datos se distribuyen uniformemente.
Nuestra tabla tiene ~ 60 millones de filas (estimadas por el private comercial ya que no pudimos contarlas directamente).
Dado que la tabla no tiene una clave principal, no estábamos seguros de qué utiliza SeaTunnel de campo para fragmentos.
Asumimos que usó la columna ID, que tiene un índice único y probado:
SELECT MAX(ID), MIN(ID) FROM desk;
- Valor de clave máxima: 804306477418
- Valor clave mínimo: 607312608210
- Issue de distribución = (804306477418 – 607312608210 + 1) / 60,000,000 ≈ 3283.23
Esto está claramente fuera del [0.5, 100] Rango “incluso” → Entonces SeatUnnel considera esta distribución desigual.
- Tamaño de fragmento predeterminado: 8096
- Recuento de fragmentos = 60,000,000 / 8096 ≈ 7411 → Mayor que
sample-sharding.threshold
(1000)
Por lo tanto, SeatUnnel probablemente cambió a fragmentos basados en muestreo.
- Tasa de muestreo (inversa): 1000 → Necesidad de probar 60,000 filas
En este punto, estábamos convencidos de que Seatunnel estaba atascado, y nos damos curiosidad: ¿cómo se muestra exactamente? ¿Por qué se ejecuta durante 6 horas?
Incluso con 60 m filas, el muestreo de 60k no debería ser eso lento. ¿Seguramente está escaneando la columna ID (que tiene un índice único)?
Decidimos sumergirnos en el código fuente.
Github: https://github.com/apache/seatunnel/
La arquitectura de Seatunnel es bastante compleja, y la configuración del medio ambiente nos llevó un día completo (principalmente configuración de dependencia).
Encontrar la lógica crítica tomó otro día: nos trazamos desde mensajes de registro y búsquedas de palabras clave.
Análisis de código fuente parcial
non-public Record splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws Exception {
last String splitColumnName = splitColumn.title();
// Get min/max values
last Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
last Object min = minMax[0];
last Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
// Empty desk or just one row — full desk scan as a piece
return Collections.singletonList(ChunkRange.all());
}
// Get chunk measurement, distribution issue bounds, and sampling threshold from config
last int chunkSize = sourceConfig.getSplitSize();
last double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
last double distributionFactorLower = sourceConfig.getDistributionFactorLower();
last int sampleShardingThreshold = sourceConfig.getSampleShardingThreshold();
log.data("Splitting desk {} into chunks, cut up column: {}, min: {}, max: {}, chunk measurement: {}, "
+ "distribution issue higher: {}, distribution issue decrease: {}, pattern sharding threshold: {}",
tableId, splitColumnName, min, max, chunkSize,
distributionFactorUpper, distributionFactorLower, sampleShardingThreshold);
if (isEvenlySplitColumn(splitColumn)) {
lengthy approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
double distributionFactor = calculateDistributionFactor(tableId, min, max, approximateRowCnt);
boolean dataIsEvenlyDistributed =
doubleCompare(distributionFactor, distributionFactorLower) >= 0 &&
doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
if (dataIsEvenlyDistributed) {
last int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
return splitEvenlySizedChunks(tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
} else {
int shardCount = (int) (approximateRowCnt / chunkSize);
int inverseSamplingRate = sourceConfig.getInverseSamplingRate();
if (sampleShardingThreshold < shardCount) {
if (inverseSamplingRate > chunkSize) {
log.warn("inverseSamplingRate {} > chunkSize {}, adjusting...", inverseSamplingRate, chunkSize);
inverseSamplingRate = chunkSize;
}
log.data("Utilizing sampling sharding for desk {}, price = {}", tableId, inverseSamplingRate);
Object[] pattern = sampleDataFromColumn(jdbc, tableId, splitColumn, inverseSamplingRate);
log.data("Sampled {} information from desk {}", pattern.size, tableId);
return efficientShardingThroughSampling(tableId, pattern, approximateRowCnt, shardCount);
}
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}
Centrémonos en la lógica de muestreo:
public static Object[] skipReadAndSortSampleData(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate
) throws Exception {
last String sampleQuery = String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));
Assertion stmt = null;
ResultSet rs = null;
Record
Esta es la lógica de muestreo de núcleo:
Escanea toda la tabla fila por fila, muestreando 1 de cada 1000 registros.
Eso explica por qué estaba funcionando tan lentamente: vimos Processing row index
mensajes en los registros y se preguntó qué estaban haciendo.
Se tomaron muestras de aproximadamente 60,000 ID.
Ahora para la estrategia de fragmentación basada en muestreo:
protected Record efficientShardingThroughSampling(
TableId tableId, Object[] sampleData, lengthy approximateRowCnt, int shardCount
) {
log.data("Utilizing sampling-based sharding on desk {}, approx rows: {}, shards: {}",
tableId, approximateRowCnt, shardCount);
Record splits = new ArrayList<>();
if (shardCount == 0) {
splits.add(ChunkRange.of(null, null));
return splits;
}
double approxSamplePerShard = (double) sampleData.size / shardCount;
Object lastEnd = null;
if (approxSamplePerShard <= 1) {
splits.add(ChunkRange.of(null, sampleData[0]));
lastEnd = sampleData[0];
for (int i = 1; i < sampleData.size; i++) {
if (!sampleData[i].equals(lastEnd)) {
splits.add(ChunkRange.of(lastEnd, sampleData[i]));
lastEnd = sampleData[i];
}
}
splits.add(ChunkRange.of(lastEnd, null));
} else {
for (int i = 0; i < shardCount; i++) {
Object chunkStart = lastEnd;
Object chunkEnd = (i < shardCount - 1)
? sampleData[(int) ((i + 1) * approxSamplePerShard)]
: null;
if (i == 0 || i == shardCount - 1 || !Objects.equals(chunkEnd, chunkStart)) {
splits.add(ChunkRange.of(chunkStart, chunkEnd));
lastEnd = chunkEnd;
}
}
}
return splits;
}
Cada fragmento obtiene un comienzo y un last distintos en función de las identificaciones muestreadas ordenadas, sin superposición.
Veamos el ChunkRange
clase que representa el resultado:
El fragmento de instantánea permite lecturas de datos paralelos, acelerando la sincronización histórica.
Solución last
A través del análisis anterior, confirmamos que el trabajo estaba atascado en la fase de instantánea que realizaba muestreo, activado porque SeatUnnel determinó que la tabla de origen se distribuyó de manera desigual.
Dado que el trabajo de sincronización había sido bloqueado durante días, se nos ocurrió una solución easy: ajustar los umbrales del issue de distribución para que SeatUnnel tratara la tabla como un muestreo distribuido y saltado uniformemente.
El rango de factores predeterminado es 0.5 ~ 100
pero el issue de nuestra tabla fue ~ 3283, por lo que aumentamos el límite superior a 4000. La configuración last fue:
snapshot.cut up.measurement
: Nuestra tabla period muy escasa, por lo que aumentamos drásticamente este valor (multiplicado al azar por 1000, ciertamente no muy científico).
table-names-config
: Especificó manualmente la clave primaria y la tecla dividida, ya que la tabla no tenía una clave primaria y no estábamos seguros de qué columna usó SeatUnnel. Mejor ser explícito.
Resultado last
¡Finalmente comenzó a sincronizar! 🎉