diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java index 4055398ddb7ec..790fd637d616a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java @@ -37,6 +37,11 @@ public enum CnToDnSyncRequestType { DELETE_OLD_REGION_PEER, RESET_PEER_LIST, + // Data Partition Table Maintenance + COLLECT_EARLIEST_TIMESLOTS, + GENERATE_DATA_PARTITION_TABLE, + GENERATE_DATA_PARTITION_TABLE_HEART_BEAT, + // PartitionCache INVALIDATE_PARTITION_CACHE, INVALIDATE_PERMISSION_CACHE, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java index d63d5a74f6095..9f5729ef06dfd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java @@ -32,6 +32,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq; import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq; import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq; import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq; import org.apache.iotdb.mpp.rpc.thrift.TKillQueryInstanceReq; @@ -139,6 +140,15 @@ private void buildActionMap() { actionMapBuilder.put( CnToDnSyncRequestType.SHOW_APPLIED_CONFIGURATIONS, (req, client) -> client.showAppliedConfigurations()); + actionMapBuilder.put( + 
CnToDnSyncRequestType.COLLECT_EARLIEST_TIMESLOTS, + (req, client) -> client.getEarliestTimeslots()); + actionMapBuilder.put( + CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE, + (req, client) -> client.generateDataPartitionTable((TGenerateDataPartitionTableReq) req)); + actionMapBuilder.put( + CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT, + (req, client) -> client.generateDataPartitionTableHeartbeat()); actionMap = actionMapBuilder.build(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 3abb322d08472..59b318a4b11e9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -319,6 +319,8 @@ public class ConfigNodeConfig { private long forceWalPeriodForConfigNodeSimpleInMs = 100; + private long partitionTableRecoverWaitAllDnUpTimeoutInMs = 60000; + public ConfigNodeConfig() { // empty constructor } @@ -1286,4 +1288,13 @@ public long getFailureDetectorPhiAcceptablePauseInMs() { public void setFailureDetectorPhiAcceptablePauseInMs(long failureDetectorPhiAcceptablePauseInMs) { this.failureDetectorPhiAcceptablePauseInMs = failureDetectorPhiAcceptablePauseInMs; } + + public long getPartitionTableRecoverWaitAllDnUpTimeoutInMs() { + return partitionTableRecoverWaitAllDnUpTimeoutInMs; + } + + public void setPartitionTableRecoverWaitAllDnUpTimeoutInMs( + long partitionTableRecoverWaitAllDnUpTimeoutInMs) { + this.partitionTableRecoverWaitAllDnUpTimeoutInMs = partitionTableRecoverWaitAllDnUpTimeoutInMs; + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 77790dae1a903..6843aced0e511 100644 --- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -322,6 +322,12 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio "failure_detector_phi_acceptable_pause_in_ms", String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs())))); + conf.setPartitionTableRecoverWaitAllDnUpTimeoutInMs( + Long.parseLong( + properties.getProperty( + "partition_table_recover_wait_all_dn_up_timeout", + String.valueOf(conf.getPartitionTableRecoverWaitAllDnUpTimeoutInMs())))); + String leaderDistributionPolicy = properties.getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy()); if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 646aaf66daf4f..1a69044d37d3d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -67,6 +67,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure; import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure; import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodesProcedure; +import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure; @@ -1374,6 +1375,16 @@ public TSStatus createRegionGroups( } } + /** Used to 
repair the lost data partition table */ + public TSStatus dataPartitionTableIntegrityCheck() { + DataPartitionTableIntegrityCheckProcedure procedure; + synchronized (this) { + procedure = new DataPartitionTableIntegrityCheckProcedure(); + executor.submitProcedure(procedure); + } + return waitingProcedureFinished(procedure); + } + /** * Generate {@link CreateTriggerProcedure} and wait until it finished. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/ConfigNodeProcedureEnv.java new file mode 100644 index 0000000000000..c1ebd7ffccde1 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/ConfigNodeProcedureEnv.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.partition; + +import org.apache.iotdb.confignode.manager.ConfigManager; + +/** + * Environment object for ConfigNode procedures. Provides access to ConfigManager and other + * necessary components. 
+ */ +public class ConfigNodeProcedureEnv { + + private final ConfigManager configManager; + + public ConfigNodeProcedureEnv(ConfigManager configManager) { + this.configManager = configManager; + } + + public ConfigManager getConfigManager() { + return configManager; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java new file mode 100644 index 0000000000000..ef7da5bfb3c85 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java @@ -0,0 +1,923 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.procedure.impl.partition; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState; +import org.apache.iotdb.commons.partition.DataPartitionTable; +import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable; +import org.apache.iotdb.commons.partition.SeriesPartitionTable; +import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType; +import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool; +import org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan; +import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; +import org.apache.iotdb.confignode.manager.node.NodeManager; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; +import org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState; +import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp; +import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TIOStreamTransport; +import 
org.apache.thrift.transport.TTransport; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Procedure for checking and restoring data partition table integrity. This procedure scans all + * DataNodes to detect missing data partitions and restores the DataPartitionTable on the ConfigNode + * Leader. + */ +public class DataPartitionTableIntegrityCheckProcedure + extends StateMachineProcedure< + ConfigNodeProcedureEnv, DataPartitionTableIntegrityCheckProcedureState> { + + private static final Logger LOG = + LoggerFactory.getLogger(DataPartitionTableIntegrityCheckProcedure.class); + + private static final int MAX_RETRY_COUNT = 3; + private static final long HEART_BEAT_REQUEST_RATE = 10000; + + NodeManager dataNodeManager; + private List allDataNodes = new ArrayList<>(); + + // ============Need serialize BEGIN=============/ + /** Collected earliest timeslots from DataNodes: database -> earliest timeslot */ + private Map earliestTimeslots = new ConcurrentHashMap<>(); + + /** DataPartitionTables collected from DataNodes: dataNodeId -> DataPartitionTable */ + private Map> dataPartitionTables = + new ConcurrentHashMap<>(); + + private Set lostDataPartitionsOfDatabases = new HashSet<>(); + + /** Final merged DataPartitionTable */ + private Map finalDataPartitionTables; + + private static Set skipDataNodes = + Collections.newSetFromMap(new ConcurrentHashMap<>()); + private static Set failedDataNodes = + Collections.newSetFromMap(new 
ConcurrentHashMap<>()); + + // ============Need serialize END=============/ + + public DataPartitionTableIntegrityCheckProcedure() { + super(); + } + + @Override + protected Flow executeFromState( + final ConfigNodeProcedureEnv env, final DataPartitionTableIntegrityCheckProcedureState state) + throws InterruptedException { + try { + // Ensure to get the real-time DataNodes in the current cluster at every step + dataNodeManager = env.getConfigManager().getNodeManager(); + allDataNodes = dataNodeManager.getRegisteredDataNodes(); + + switch (state) { + case COLLECT_EARLIEST_TIMESLOTS: + failedDataNodes = new HashSet<>(); + return collectEarliestTimeslots(); + case ANALYZE_MISSING_PARTITIONS: + lostDataPartitionsOfDatabases = new HashSet<>(); + return analyzeMissingPartitions(env); + case REQUEST_PARTITION_TABLES: + return requestPartitionTables(); + case REQUEST_PARTITION_TABLES_HEART_BEAT: + return requestPartitionTablesHeartBeat(); + case MERGE_PARTITION_TABLES: + finalDataPartitionTables = new HashMap<>(); + return mergePartitionTables(env); + case WRITE_PARTITION_TABLE_TO_RAFT: + return writePartitionTableToRaft(env); + default: + throw new ProcedureException("Unknown state: " + state); + } + } catch (Exception e) { + LOG.error("[DataPartitionIntegrity] Error executing state {}: {}", state, e.getMessage(), e); + setFailure("DataPartitionTableIntegrityCheckProcedure", e); + return Flow.NO_MORE_STATE; + } + } + + @Override + protected void rollbackState( + final ConfigNodeProcedureEnv env, final DataPartitionTableIntegrityCheckProcedureState state) + throws IOException, InterruptedException, ProcedureException { + // Cleanup resources + switch (state) { + case COLLECT_EARLIEST_TIMESLOTS: + earliestTimeslots.clear(); + break; + case ANALYZE_MISSING_PARTITIONS: + lostDataPartitionsOfDatabases.clear(); + break; + case REQUEST_PARTITION_TABLES: + case REQUEST_PARTITION_TABLES_HEART_BEAT: + dataPartitionTables.clear(); + break; + case MERGE_PARTITION_TABLES: + 
finalDataPartitionTables.clear(); + break; + default: + allDataNodes.clear(); + earliestTimeslots.clear(); + dataPartitionTables.clear(); + finalDataPartitionTables.clear(); + throw new ProcedureException("Unknown state for rollback: " + state); + } + } + + @Override + protected DataPartitionTableIntegrityCheckProcedureState getState(final int stateId) { + return DataPartitionTableIntegrityCheckProcedureState.values()[stateId]; + } + + @Override + protected int getStateId(final DataPartitionTableIntegrityCheckProcedureState state) { + return state.ordinal(); + } + + @Override + protected DataPartitionTableIntegrityCheckProcedureState getInitialState() { + skipDataNodes = new HashSet<>(); + failedDataNodes = new HashSet<>(); + return DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS; + } + + /** + * Collect earliest timeslot information from all DataNodes. Each DataNode returns a Map where key is database name and value is the earliest timeslot id. + */ + private Flow collectEarliestTimeslots() { + if (LOG.isDebugEnabled()) { + LOG.debug("Collecting earliest timeslots from all DataNodes..."); + } + + if (allDataNodes.isEmpty()) { + LOG.error( + "[DataPartitionIntegrity] No DataNodes registered, no way to collect earliest timeslots, waiting for them to go up"); + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + return Flow.HAS_MORE_STATE; + } + + // Collect earliest timeslots from all DataNodes + allDataNodes.removeAll(skipDataNodes); + for (TDataNodeConfiguration dataNode : allDataNodes) { + try { + TGetEarliestTimeslotsResp resp = + (TGetEarliestTimeslotsResp) + SyncDataNodeClientPool.getInstance() + .sendSyncRequestToDataNodeWithGivenRetry( + dataNode.getLocation().getInternalEndPoint(), + null, + CnToDnSyncRequestType.COLLECT_EARLIEST_TIMESLOTS, + MAX_RETRY_COUNT); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failedDataNodes.add(dataNode); + LOG.error( + 
"[DataPartitionIntegrity] Failed to collected earliest timeslots from the DataNode[id={}], response status is {}", + dataNode.getLocation().getDataNodeId(), + resp.getStatus()); + continue; + } + + Map nodeTimeslots = resp.getDatabaseToEarliestTimeslot(); + + // Merge with existing timeslots (take minimum) + for (Map.Entry entry : nodeTimeslots.entrySet()) { + earliestTimeslots.merge(entry.getKey(), entry.getValue(), Math::min); + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Collected earliest timeslots from the DataNode[id={}]: {}", + dataNode.getLocation().getDataNodeId(), + nodeTimeslots); + } + } catch (Exception e) { + LOG.error( + "[DataPartitionIntegrity] Failed to collect earliest timeslots from the DataNode[id={}]: {}", + dataNode.getLocation().getDataNodeId(), + e.getMessage(), + e); + failedDataNodes.add(dataNode); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Collected earliest timeslots from {} DataNodes: {}, the number of successful DataNodes is {}", + allDataNodes.size(), + earliestTimeslots, + allDataNodes.size() - failedDataNodes.size()); + } + + if (failedDataNodes.size() == allDataNodes.size()) { + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + } else { + setNextState(DataPartitionTableIntegrityCheckProcedureState.ANALYZE_MISSING_PARTITIONS); + } + return Flow.HAS_MORE_STATE; + } + + /** + * Analyze which data partitions are missing based on earliest timeslots. Identify data partitions + * of databases need to be repaired. 
+ */ + private Flow analyzeMissingPartitions(final ConfigNodeProcedureEnv env) { + if (LOG.isDebugEnabled()) { + LOG.debug("Analyzing missing data partitions..."); + } + + if (earliestTimeslots.isEmpty()) { + LOG.warn( + "[DataPartitionIntegrity] No missing data partitions detected, nothing needs to be repaired, terminating procedure"); + return Flow.NO_MORE_STATE; + } + + // Find all databases that have lost data partition tables + for (Map.Entry entry : earliestTimeslots.entrySet()) { + String database = entry.getKey(); + long earliestTimeslot = entry.getValue(); + + // Get current DataPartitionTable from ConfigManager + Map>>> + localDataPartitionTable = getLocalDataPartitionTable(env, database); + + // Check if ConfigNode has a data partition that is associated with the earliestTimeslot + if (localDataPartitionTable == null + || localDataPartitionTable.isEmpty() + || localDataPartitionTable.get(database) == null + || localDataPartitionTable.get(database).isEmpty()) { + lostDataPartitionsOfDatabases.add(database); + LOG.warn( + "[DataPartitionIntegrity] No data partition table related to database {} was found from the ConfigNode, and this issue needs to be repaired", + database); + continue; + } + + Map>> + seriesPartitionMap = localDataPartitionTable.get(database); + for (Map.Entry>> + seriesPartitionEntry : seriesPartitionMap.entrySet()) { + Map> tTimePartitionSlotListMap = + seriesPartitionEntry.getValue(); + + if (tTimePartitionSlotListMap.isEmpty()) { + continue; + } + + TTimePartitionSlot localEarliestSlot = + tTimePartitionSlotListMap.keySet().stream() + .min(Comparator.comparingLong(TTimePartitionSlot::getStartTime)) + .orElse(null); + + if (localEarliestSlot.getStartTime() + > TimePartitionUtils.getStartTimeByPartitionId(earliestTimeslot)) { + lostDataPartitionsOfDatabases.add(database); + LOG.warn( + "[DataPartitionIntegrity] Database {} has lost timeslot {} in its data table partition, and this issue needs to be repaired", + database, + 
earliestTimeslot); + } + } + } + + if (lostDataPartitionsOfDatabases.isEmpty()) { + LOG.info( + "[DataPartitionIntegrity] No databases have lost data partitions, terminating procedure"); + return Flow.NO_MORE_STATE; + } + + LOG.info( + "[DataPartitionIntegrity] Identified {} databases have lost data partitions, will request DataPartitionTable generation from {} DataNodes", + lostDataPartitionsOfDatabases.size(), + allDataNodes.size() - failedDataNodes.size()); + setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES); + return Flow.HAS_MORE_STATE; + } + + private Map>>> + getLocalDataPartitionTable(final ConfigNodeProcedureEnv env, final String database) { + Map> schemaPartitionTable = + env.getConfigManager() + .getSchemaPartition(Collections.singletonMap(database, Collections.emptyList())) + .getSchemaPartitionTable(); + + // Construct request for getting data partition + final Map> partitionSlotsMap = new HashMap<>(); + schemaPartitionTable.forEach( + (key, value) -> { + Map slotListMap = new HashMap<>(); + value + .keySet() + .forEach( + slot -> + slotListMap.put( + slot, new TTimeSlotList(Collections.emptyList(), true, true))); + partitionSlotsMap.put(key, slotListMap); + }); + final GetDataPartitionPlan getDataPartitionPlan = new GetDataPartitionPlan(partitionSlotsMap); + return env.getConfigManager().getDataPartition(getDataPartitionPlan).getDataPartitionTable(); + } + + /** + * Request DataPartitionTable generation from target DataNodes. Each DataNode scans its tsfile + * resources and generates a DataPartitionTable. 
+ */ + private Flow requestPartitionTables() { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Requesting DataPartitionTable generation from {} DataNodes...", allDataNodes.size()); + } + + if (allDataNodes.isEmpty()) { + LOG.error( + "[DataPartitionIntegrity] No DataNodes registered, no way to requested DataPartitionTable generation, terminating procedure"); + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + return Flow.HAS_MORE_STATE; + } + + allDataNodes.removeAll(skipDataNodes); + allDataNodes.removeAll(failedDataNodes); + for (TDataNodeConfiguration dataNode : allDataNodes) { + int dataNodeId = dataNode.getLocation().getDataNodeId(); + if (!dataPartitionTables.containsKey(dataNodeId)) { + try { + TGenerateDataPartitionTableReq req = new TGenerateDataPartitionTableReq(); + req.setDatabases(lostDataPartitionsOfDatabases); + TGenerateDataPartitionTableResp resp = + (TGenerateDataPartitionTableResp) + SyncDataNodeClientPool.getInstance() + .sendSyncRequestToDataNodeWithGivenRetry( + dataNode.getLocation().getInternalEndPoint(), + req, + CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE, + MAX_RETRY_COUNT); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failedDataNodes.add(dataNode); + LOG.error( + "[DataPartitionIntegrity] Failed to request DataPartitionTable generation from the DataNode[id={}], response status is {}", + dataNode.getLocation().getDataNodeId(), + resp.getStatus()); + } + } catch (Exception e) { + failedDataNodes.add(dataNode); + LOG.error( + "[DataPartitionIntegrity] Failed to request DataPartitionTable generation from DataNode[id={}]: {}", + dataNodeId, + e.getMessage(), + e); + } + } + } + + if (failedDataNodes.size() == allDataNodes.size() + && new HashSet<>(allDataNodes).containsAll(failedDataNodes)) { + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + return Flow.HAS_MORE_STATE; + } + + setNextState( + 
DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES_HEART_BEAT); + return Flow.HAS_MORE_STATE; + } + + private Flow requestPartitionTablesHeartBeat() { + if (LOG.isDebugEnabled()) { + LOG.debug("Checking DataPartitionTable generation completion status..."); + } + + int completeCount = 0; + for (TDataNodeConfiguration dataNode : allDataNodes) { + int dataNodeId = dataNode.getLocation().getDataNodeId(); + + if (!dataPartitionTables.containsKey(dataNodeId)) { + try { + TGenerateDataPartitionTableHeartbeatResp resp = + (TGenerateDataPartitionTableHeartbeatResp) + SyncDataNodeClientPool.getInstance() + .sendSyncRequestToDataNodeWithGivenRetry( + dataNode.getLocation().getInternalEndPoint(), + null, + CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT, + MAX_RETRY_COUNT); + DataPartitionTableGeneratorState state = + DataPartitionTableGeneratorState.getStateByCode(resp.getErrorCode()); + + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOG.error( + "[DataPartitionIntegrity] Failed to request DataPartitionTable generation heart beat from the DataNode[id={}], state is {}, response status is {}", + dataNode.getLocation().getDataNodeId(), + state, + resp.getStatus()); + continue; + } + + switch (state) { + case SUCCESS: + List byteBufferList = resp.getDatabaseScopedDataPartitionTables(); + List databaseScopedDataPartitionTableList = + deserializeDatabaseScopedTableList(byteBufferList); + dataPartitionTables.put(dataNodeId, databaseScopedDataPartitionTableList); + LOG.info( + "[DataPartitionIntegrity] DataNode {} completed DataPartitionTable generation, terminating heart beat", + dataNodeId); + completeCount++; + break; + case IN_PROGRESS: + LOG.info( + "[DataPartitionIntegrity] DataNode {} still generating DataPartitionTable", + dataNodeId); + break; + default: + LOG.error( + "[DataPartitionIntegrity] DataNode {} returned unknown error code: {}", + dataNodeId, + resp.getErrorCode()); + break; + } + } catch 
(Exception e) { + LOG.error( + "[DataPartitionIntegrity] Error checking DataPartitionTable status from DataNode {}: {}, terminating heart beat", + dataNodeId, + e.getMessage(), + e); + completeCount++; + } + } else { + completeCount++; + } + } + + if (completeCount >= allDataNodes.size()) { + setNextState(DataPartitionTableIntegrityCheckProcedureState.MERGE_PARTITION_TABLES); + return Flow.HAS_MORE_STATE; + } + + try { + Thread.sleep(HEART_BEAT_REQUEST_RATE); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error( + "[DataPartitionIntegrity] Error checking DataPartitionTable status due to thread interruption."); + } + setNextState( + DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES_HEART_BEAT); + return Flow.HAS_MORE_STATE; + } + + /** Merge DataPartitionTables from all DataNodes into a final table. */ + private Flow mergePartitionTables(final ConfigNodeProcedureEnv env) { + if (LOG.isDebugEnabled()) { + LOG.debug("Merging DataPartitionTables from {} DataNodes...", dataPartitionTables.size()); + } + + if (dataPartitionTables.isEmpty()) { + LOG.error( + "[DataPartitionIntegrity] No DataPartitionTables to merge, dataPartitionTables is empty"); + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + return Flow.HAS_MORE_STATE; + } + + Map finalDataPartitionMap = new HashMap<>(); + + for (String database : lostDataPartitionsOfDatabases) { + // Get current DataPartitionTable from ConfigManager + Map>>> + localDataPartitionTableMap = getLocalDataPartitionTable(env, database); + + // Check if ConfigNode has a data partition that is associated with the earliestTimeslot + if (localDataPartitionTableMap == null + || localDataPartitionTableMap.isEmpty() + || localDataPartitionTableMap.get(database) == null + || localDataPartitionTableMap.get(database).isEmpty()) { + LOG.warn( + "[DataPartitionIntegrity] No data partition table related to database {} was found from the ConfigNode, use 
data partition table of DataNode directly", + database); + continue; + } + + localDataPartitionTableMap + .values() + .forEach( + map -> + map.forEach( + (tSeriesPartitionSlot, seriesPartitionTableMap) -> { + if (tSeriesPartitionSlot == null + || seriesPartitionTableMap == null + || seriesPartitionTableMap.isEmpty()) { + return; + } + finalDataPartitionMap.computeIfAbsent( + tSeriesPartitionSlot, + k -> new SeriesPartitionTable(seriesPartitionTableMap)); + })); + + dataPartitionTables.forEach( + (k, v) -> + v.forEach( + databaseScopedDataPartitionTable -> { + if (!databaseScopedDataPartitionTable.getDatabase().equals(database)) { + return; + } + finalDataPartitionTables.put( + database, + new DataPartitionTable(finalDataPartitionMap) + .merge(databaseScopedDataPartitionTable.getDataPartitionTable())); + })); + } + + LOG.info("[DataPartitionIntegrity] DataPartitionTables merge completed successfully"); + setNextState(DataPartitionTableIntegrityCheckProcedureState.WRITE_PARTITION_TABLE_TO_RAFT); + return Flow.HAS_MORE_STATE; + } + + /** Write the final DataPartitionTable to raft log. 
*/ + private Flow writePartitionTableToRaft(final ConfigNodeProcedureEnv env) { + if (LOG.isDebugEnabled()) { + LOG.debug("Writing DataPartitionTable to raft log..."); + } + + if (lostDataPartitionsOfDatabases.isEmpty()) { + LOG.error("[DataPartitionIntegrity] No database lost data partition table"); + setFailure( + "DataPartitionTableIntegrityCheckProcedure", + new ProcedureException("No database lost data partition table for raft write")); + return getFlow(); + } + + if (finalDataPartitionTables.isEmpty()) { + LOG.error("[DataPartitionIntegrity] DataPartitionTable to write to raft"); + setFailure( + "DataPartitionTableIntegrityCheckProcedure", + new ProcedureException("No DataPartitionTable available for raft write")); + return getFlow(); + } + + int failedCnt = 0; + while (failedCnt < MAX_RETRY_COUNT) { + try { + CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan(); + Map assignedDataPartition = new HashMap<>(); + for (String database : lostDataPartitionsOfDatabases) { + assignedDataPartition.put(database, finalDataPartitionTables.get(database)); + } + createPlan.setAssignedDataPartition(assignedDataPartition); + TSStatus tsStatus = env.getConfigManager().getConsensusManager().write(createPlan); + + if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOG.info("[DataPartitionIntegrity] DataPartitionTable successfully written to raft log"); + break; + } else { + LOG.error("[DataPartitionIntegrity] Failed to write DataPartitionTable to raft log"); + setFailure( + "DataPartitionTableIntegrityCheckProcedure", + new ProcedureException("Failed to write DataPartitionTable to raft log")); + } + } catch (Exception e) { + LOG.error("[DataPartitionIntegrity] Error writing DataPartitionTable to raft log", e); + setFailure("DataPartitionTableIntegrityCheckProcedure", e); + } + failedCnt++; + } + + return getFlow(); + } + + /** + * Determine whether there are still DataNode nodes with failed execution of a certain step in + * this round. 
If such nodes exist, calculate the skipDataNodes and exclude these nodes when + * requesting the list of DataNode nodes in the cluster for the next round; if no such nodes + * exist, it means the procedure has been completed + */ + private Flow getFlow() { + if (!failedDataNodes.isEmpty()) { + allDataNodes.removeAll(failedDataNodes); + skipDataNodes = new HashSet<>(allDataNodes); + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + return Flow.HAS_MORE_STATE; + } else { + skipDataNodes.clear(); + return Flow.NO_MORE_STATE; + } + } + + @Override + public void serialize(final DataOutputStream stream) throws IOException { + super.serialize(stream); + + // Serialize earliestTimeslots + stream.writeInt(earliestTimeslots.size()); + for (Map.Entry entry : earliestTimeslots.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), stream); + stream.writeLong(entry.getValue()); + } + + // Serialize dataPartitionTables count + stream.writeInt(dataPartitionTables.size()); + for (Map.Entry> entry : + dataPartitionTables.entrySet()) { + stream.writeInt(entry.getKey()); + + List tableList = entry.getValue(); + stream.writeInt(tableList.size()); + + for (DatabaseScopedDataPartitionTable table : tableList) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + + TTransport transport = new TIOStreamTransport(dos); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + + table.serialize(dos, protocol); + + byte[] data = baos.toByteArray(); + // Length of data written for a single object + stream.writeInt(data.length); + // data written for a single object + stream.write(data); + } catch (IOException | TException e) { + LOG.error( + "[DataPartitionIntegrity] {} serialize failed for dataNodeId: {}", + this.getClass().getSimpleName(), + entry.getKey(), + e); + throw new IOException("Failed to serialize dataPartitionTables", e); + } + } + } + + 
stream.writeInt(lostDataPartitionsOfDatabases.size()); + for (String database : lostDataPartitionsOfDatabases) { + ReadWriteIOUtils.write(database, stream); + } + + if (finalDataPartitionTables != null && !finalDataPartitionTables.isEmpty()) { + stream.writeInt(finalDataPartitionTables.size()); + + for (Map.Entry entry : finalDataPartitionTables.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), stream); + + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + + TTransport transport = new TIOStreamTransport(dos); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + + entry.getValue().serialize(dos, protocol); + + byte[] data = baos.toByteArray(); + stream.writeInt(data.length); + stream.write(data); + } catch (IOException | TException e) { + LOG.error( + "[DataPartitionIntegrity] {} serialize finalDataPartitionTables failed", + this.getClass().getSimpleName(), + e); + throw new IOException("Failed to serialize finalDataPartitionTables", e); + } + } + } else { + stream.writeInt(0); + } + + stream.writeInt(skipDataNodes.size()); + for (TDataNodeConfiguration skipDataNode : skipDataNodes) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + TTransport transport = new TIOStreamTransport(baos); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + skipDataNode.write(protocol); + + byte[] data = baos.toByteArray(); + stream.writeInt(data.length); + stream.write(data); + } catch (TException e) { + LOG.error("[DataPartitionIntegrity] Failed to serialize skipDataNode", e); + throw new IOException("Failed to serialize skipDataNode", e); + } + } + + stream.writeInt(failedDataNodes.size()); + for (TDataNodeConfiguration failedDataNode : failedDataNodes) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + TTransport transport = new TIOStreamTransport(baos); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + failedDataNode.write(protocol); + 
+ byte[] data = baos.toByteArray(); + stream.writeInt(data.length); + stream.write(data); + } catch (TException e) { + LOG.error("[DataPartitionIntegrity] Failed to serialize failedDataNode", e); + throw new IOException("Failed to serialize failedDataNode", e); + } + } + } + + @Override + public void deserialize(final ByteBuffer byteBuffer) { + super.deserialize(byteBuffer); + + // Deserialize earliestTimeslots + int earliestTimeslotsSize = byteBuffer.getInt(); + earliestTimeslots = new ConcurrentHashMap<>(); + for (int i = 0; i < earliestTimeslotsSize; i++) { + String database = ReadWriteIOUtils.readString(byteBuffer); + long timeslot = byteBuffer.getLong(); + earliestTimeslots.put(database, timeslot); + } + + // Deserialize dataPartitionTables count + int dataPartitionTablesSize = byteBuffer.getInt(); + dataPartitionTables = new ConcurrentHashMap<>(); + for (int i = 0; i < dataPartitionTablesSize; i++) { + int dataNodeId = byteBuffer.getInt(); + int listSize = byteBuffer.getInt(); + + List tableList = new ArrayList<>(listSize); + + for (int j = 0; j < listSize; j++) { + int dataSize = byteBuffer.getInt(); + byte[] bytes = new byte[dataSize]; + byteBuffer.get(bytes); + + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + DataInputStream dis = new DataInputStream(bais)) { + + TTransport transport = new TIOStreamTransport(dis); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + + DatabaseScopedDataPartitionTable table = + DatabaseScopedDataPartitionTable.deserialize(dis, protocol); + tableList.add(table); + + } catch (IOException | TException e) { + LOG.error( + "[DataPartitionIntegrity] {} deserialize failed for dataNodeId: {}", + this.getClass().getSimpleName(), + dataNodeId, + e); + throw new RuntimeException("Failed to deserialize dataPartitionTables", e); + } + } + + dataPartitionTables.put(dataNodeId, tableList); + } + + int lostDataPartitionsOfDatabasesSize = byteBuffer.getInt(); + for (int i = 0; i < 
lostDataPartitionsOfDatabasesSize; i++) { + String database = ReadWriteIOUtils.readString(byteBuffer); + lostDataPartitionsOfDatabases.add(database); + } + + // Deserialize finalDataPartitionTable size + int finalDataPartitionTablesSize = byteBuffer.getInt(); + finalDataPartitionTables = new ConcurrentHashMap<>(); + + for (int i = 0; i < finalDataPartitionTablesSize; i++) { + String database = ReadWriteIOUtils.readString(byteBuffer); + + int dataSize = byteBuffer.getInt(); + byte[] bytes = new byte[dataSize]; + byteBuffer.get(bytes); + + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + DataInputStream dis = new DataInputStream(bais)) { + + TTransport transport = new TIOStreamTransport(dis); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + + DataPartitionTable dataPartitionTable = new DataPartitionTable(); + dataPartitionTable.deserialize(dis, protocol); + + finalDataPartitionTables.put(database, dataPartitionTable); + + } catch (IOException | TException e) { + LOG.error( + "[DataPartitionIntegrity] {} deserialize finalDataPartitionTables failed", + this.getClass().getSimpleName(), + e); + throw new RuntimeException("Failed to deserialize finalDataPartitionTables", e); + } + } + + skipDataNodes = new HashSet<>(); + int skipDataNodesSize = byteBuffer.getInt(); + for (int i = 0; i < skipDataNodesSize; i++) { + int size = byteBuffer.getInt(); + byte[] bytes = new byte[size]; + byteBuffer.get(bytes); + + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) { + TTransport transport = new TIOStreamTransport(bais); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + + TDataNodeConfiguration dataNode = new TDataNodeConfiguration(); + dataNode.read(protocol); + skipDataNodes.add(dataNode); + } catch (TException | IOException e) { + LOG.error("[DataPartitionIntegrity] Failed to deserialize skipDataNode", e); + throw new RuntimeException(e); + } + } + + failedDataNodes = new HashSet<>(); + int failedDataNodesSize = 
byteBuffer.getInt(); + for (int i = 0; i < failedDataNodesSize; i++) { + int size = byteBuffer.getInt(); + byte[] bytes = new byte[size]; + byteBuffer.get(bytes); + + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) { + TTransport transport = new TIOStreamTransport(bais); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + + TDataNodeConfiguration dataNode = new TDataNodeConfiguration(); + dataNode.read(protocol); + failedDataNodes.add(dataNode); + } catch (TException | IOException e) { + LOG.error("[DataPartitionIntegrity] Failed to deserialize failedDataNode", e); + throw new RuntimeException(e); + } + } + } + + private List deserializeDatabaseScopedTableList( + List dataList) { + if (dataList == null || dataList.isEmpty()) { + return Collections.emptyList(); + } + + List result = new ArrayList<>(dataList.size()); + + for (ByteBuffer data : dataList) { + if (data == null || data.remaining() == 0) { + LOG.warn("[DataPartitionIntegrity] Skipping empty ByteBuffer during deserialization"); + continue; + } + + try { + ByteBuffer dataBuffer = data.duplicate(); + + DatabaseScopedDataPartitionTable table = + DatabaseScopedDataPartitionTable.deserialize(dataBuffer); + + result.add(table); + + } catch (Exception e) { + LOG.error( + "[DataPartitionIntegrity] Failed to deserialize DatabaseScopedDataPartitionTable", e); + } + } + + return result; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DataPartitionTableIntegrityCheckProcedureState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DataPartitionTableIntegrityCheckProcedureState.java new file mode 100644 index 0000000000000..899ed502b2a88 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DataPartitionTableIntegrityCheckProcedureState.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.state; + +public enum DataPartitionTableIntegrityCheckProcedureState { + /** Collect earliest timeslot information from all DataNodes */ + COLLECT_EARLIEST_TIMESLOTS, + /** Analyze missing data partitions */ + ANALYZE_MISSING_PARTITIONS, + /** Request DataPartitionTable generation from DataNodes */ + REQUEST_PARTITION_TABLES, + /** Round robin get DataPartitionTable generation result from DataNodes */ + REQUEST_PARTITION_TABLES_HEART_BEAT, + /** Merge DataPartitionTables from all DataNodes */ + MERGE_PARTITION_TABLES, + /** Write final DataPartitionTable to raft log */ + WRITE_PARTITION_TABLE_TO_RAFT +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java index dd15558608718..140fffa852ccc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java @@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure; import 
org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure; import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodesProcedure; +import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure; @@ -404,6 +405,9 @@ public Procedure create(ByteBuffer buffer) throws IOException { case ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE: procedure = new AddNeverFinishSubProcedureProcedure(); break; + case DATA_PARTITION_TABLE_INTEGRITY_CHECK_PROCEDURE: + procedure = new DataPartitionTableIntegrityCheckProcedure(); + break; default: LOGGER.error("Unknown Procedure type: {}", typeCode); throw new IOException("Unknown Procedure type: " + typeCode); @@ -554,6 +558,8 @@ public static ProcedureType getProcedureType(final Procedure procedure) { return ProcedureType.NEVER_FINISH_PROCEDURE; } else if (procedure instanceof AddNeverFinishSubProcedureProcedure) { return ProcedureType.ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE; + } else if (procedure instanceof DataPartitionTableIntegrityCheckProcedure) { + return ProcedureType.DATA_PARTITION_TABLE_INTEGRITY_CHECK_PROCEDURE; } throw new UnsupportedOperationException( "Procedure type " + procedure.getClass() + " is not supported"); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java index 820a90f7ebfb9..839c8ace0984d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java @@ -172,7 +172,10 @@ 
public enum ProcedureType { @TestOnly NEVER_FINISH_PROCEDURE((short) 30000), @TestOnly - ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE((short) 30001); + ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE((short) 30001), + + /** Data Partition Table Integrity Check */ + DATA_PARTITION_TABLE_INTEGRITY_CHECK_PROCEDURE((short) 1600); private final short typeCode; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java index f20f77095d97a..33e9df4c662fc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java @@ -24,6 +24,8 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.ServerCommandLine; import org.apache.iotdb.commons.client.ClientManagerMetrics; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadModule; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.ThreadPoolMetrics; @@ -79,6 +81,9 @@ import java.util.Arrays; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean { @@ -110,6 +115,11 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean { private int exitStatusCode = 0; + private Future dataPartitionTableCheckFuture; + + private ExecutorService dataPartitionTableCheckExecutor = + IoTDBThreadPoolFactory.newSingleThreadExecutor("DATA_PARTITION_TABLE_CHECK"); + public ConfigNode() { super("ConfigNode"); // We do not init anything here, so that we can 
re-initialize the instance in IT. @@ -147,6 +157,15 @@ protected void start() throws IoTDBException { } active(); LOGGER.info("IoTDB started"); + if (dataPartitionTableCheckFuture != null) { + try { + dataPartitionTableCheckFuture.get(); + } catch (ExecutionException | InterruptedException e) { + LOGGER.error("Data partition table check task execute failed", e); + } finally { + dataPartitionTableCheckExecutor.shutdownNow(); + } + } } @Override @@ -203,6 +222,34 @@ public void active() { } loadSecretKey(); loadHardwareCode(); + + dataPartitionTableCheckFuture = + dataPartitionTableCheckExecutor.submit( + () -> { + LOGGER.info( + "[DataPartitionIntegrity] Prepare to start dataPartitionTableIntegrityCheck after all datanodes are started up"); + Thread.sleep(CONF.getPartitionTableRecoverWaitAllDnUpTimeoutInMs()); + + while (true) { + List dnList = + configManager + .getLoadManager() + .filterDataNodeThroughStatus(NodeStatus.Running); + if (dnList != null && !dnList.isEmpty()) { + LOGGER.info("Starting dataPartitionTableIntegrityCheck..."); + TSStatus status = + configManager.getProcedureManager().dataPartitionTableIntegrityCheck(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.error("Data partition table integrity check failed!"); + } + break; + } else { + LOGGER.info("No running datanodes found, waiting..."); + Thread.sleep(5000); + } + } + return null; + }); return; } else { saveSecretKey(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 98c15a2d9bf06..9b4d7ada28432 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1219,6 +1219,11 @@ public class IoTDBConfig { private long maxObjectSizeInByte = 4 * 1024 * 1024 * 1024L; + /* Need use these parameters when repair data partition table */ 
+ private int partitionTableRecoverWorkerNum = 10; + // Rate limit set to 10 MB/s + private int partitionTableRecoverMaxReadMBsPerSecond = 10; + IoTDBConfig() {} public int getMaxLogEntriesNumPerBatch() { @@ -4367,4 +4372,21 @@ public long getMaxObjectSizeInByte() { public void setMaxObjectSizeInByte(long maxObjectSizeInByte) { this.maxObjectSizeInByte = maxObjectSizeInByte; } + + public int getPartitionTableRecoverWorkerNum() { + return partitionTableRecoverWorkerNum; + } + + public void setPartitionTableRecoverWorkerNum(int partitionTableRecoverWorkerNum) { + this.partitionTableRecoverWorkerNum = partitionTableRecoverWorkerNum; + } + + public int getPartitionTableRecoverMaxReadMBsPerSecond() { + return partitionTableRecoverMaxReadMBsPerSecond; + } + + public void setPartitionTableRecoverMaxReadMBsPerSecond( + int partitionTableRecoverMaxReadMBsPerSecond) { + this.partitionTableRecoverMaxReadMBsPerSecond = partitionTableRecoverMaxReadMBsPerSecond; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 6730138b2af5c..5b49ce83ee2ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -1139,6 +1139,17 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException // update trusted_uri_pattern loadTrustedUriPattern(properties); + conf.setPartitionTableRecoverWorkerNum( + Integer.parseInt( + properties.getProperty( + "partition_table_recover_worker_num", + String.valueOf(conf.getPartitionTableRecoverWorkerNum())))); + conf.setPartitionTableRecoverMaxReadMBsPerSecond( + Integer.parseInt( + properties.getProperty( + "partition_table_recover_max_read_bytes_per_second", + String.valueOf(conf.getPartitionTableRecoverMaxReadMBsPerSecond())))); + conf.setIncludeNullValueInWriteThroughputMetric( 
Boolean.parseBoolean( properties.getProperty( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/partition/DataPartitionTableGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/partition/DataPartitionTableGenerator.java new file mode 100644 index 0000000000000..bdb5eff49bef1 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/partition/DataPartitionTableGenerator.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.partition; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.partition.DataPartitionTable; +import org.apache.iotdb.commons.partition.SeriesPartitionTable; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; +import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.commons.utils.rateLimiter.LeakyBucketRateLimiter; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Generator for DataPartitionTable by scanning tsfile resources. This class scans the data + * directory structure and builds a complete DataPartitionTable based on existing tsfiles. 
+ */ +public class DataPartitionTableGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(DataPartitionTableGenerator.class); + + // Task status + private volatile TaskStatus status = TaskStatus.NOT_STARTED; + private volatile String errorMessage; + private Map databasePartitionTableMap = new ConcurrentHashMap<>(); + + // Progress tracking + private final AtomicInteger processedFiles = new AtomicInteger(0); + private final AtomicInteger failedFiles = new AtomicInteger(0); + private final AtomicLong totalFiles = new AtomicLong(0); + + // Configuration + private final ExecutorService executor; + private final Set databases; + private final int seriesSlotNum; + private final String seriesPartitionExecutorClass; + + private final LeakyBucketRateLimiter limiter = + new LeakyBucketRateLimiter( + (long) + IoTDBDescriptor.getInstance() + .getConfig() + .getPartitionTableRecoverMaxReadMBsPerSecond() + * 1024 + * 1024); + + public static final Set IGNORE_DATABASE = + new HashSet() { + { + add("root.__audit"); + add("root.__system"); + } + }; + + public static final String SCAN_FILE_SUFFIX_NAME = ".tsfile"; + + public DataPartitionTableGenerator( + ExecutorService executor, + Set databases, + int seriesSlotNum, + String seriesPartitionExecutorClass) { + this.executor = executor; + this.databases = databases; + this.seriesSlotNum = seriesSlotNum; + this.seriesPartitionExecutorClass = seriesPartitionExecutorClass; + } + + public Map getDatabasePartitionTableMap() { + return databasePartitionTableMap; + } + + public enum TaskStatus { + NOT_STARTED, + IN_PROGRESS, + COMPLETED, + FAILED + } + + /** Start generating DataPartitionTable asynchronously. 
*/ + public CompletableFuture startGeneration() { + if (status != TaskStatus.NOT_STARTED) { + throw new IllegalStateException("Task is already started or completed"); + } + + status = TaskStatus.IN_PROGRESS; + return CompletableFuture.runAsync(this::generateDataPartitionTableByMemory); + } + + private void generateDataPartitionTableByMemory() { + Map dataPartitionMap = new ConcurrentHashMap<>(); + List> futures = new ArrayList<>(); + + SeriesPartitionExecutor seriesPartitionExecutor = + SeriesPartitionExecutor.getSeriesPartitionExecutor( + seriesPartitionExecutorClass, seriesSlotNum); + + try { + for (DataRegion dataRegion : StorageEngine.getInstance().getAllDataRegions()) { + CompletableFuture regionFuture = + CompletableFuture.runAsync( + () -> { + try { + TsFileManager tsFileManager = dataRegion.getTsFileManager(); + String databaseName = dataRegion.getDatabaseName(); + if (!databases.contains(databaseName) + || IGNORE_DATABASE.contains(databaseName)) { + return; + } + + tsFileManager.readLock(); + List seqTsFileList = tsFileManager.getTsFileList(true); + List unseqTsFileList = tsFileManager.getTsFileList(false); + tsFileManager.readUnlock(); + + constructDataPartitionMap( + seqTsFileList, seriesPartitionExecutor, dataPartitionMap); + constructDataPartitionMap( + unseqTsFileList, seriesPartitionExecutor, dataPartitionMap); + + if (dataPartitionMap.isEmpty()) { + LOG.error("Failed to generate DataPartitionTable, dataPartitionMap is empty"); + status = TaskStatus.FAILED; + errorMessage = "DataPartitionMap is empty after processing resource file"; + return; + } + + DataPartitionTable dataPartitionTable = + new DataPartitionTable(dataPartitionMap); + + databasePartitionTableMap.compute( + databaseName, + (k, v) -> { + if (v == null) { + return new DataPartitionTable(dataPartitionMap); + } + v.merge(dataPartitionTable); + return v; + }); + } catch (Exception e) { + LOG.error("Error processing data region: {}", dataRegion.getDatabaseName(), e); + 
failedFiles.incrementAndGet(); + errorMessage = "Failed to process data region: " + e.getMessage(); + } + }, + executor); + futures.add(regionFuture); + } + + // Wait for all tasks to complete + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + + status = TaskStatus.COMPLETED; + LOG.info( + "DataPartitionTable generation completed successfully. Processed: {}, Failed: {}", + processedFiles.get(), + failedFiles.get()); + } catch (Exception e) { + LOG.error("Failed to generate DataPartitionTable", e); + status = TaskStatus.FAILED; + errorMessage = "Generation failed: " + e.getMessage(); + } + } + + private void constructDataPartitionMap( + List seqTsFileList, + SeriesPartitionExecutor seriesPartitionExecutor, + Map dataPartitionMap) { + for (TsFileResource tsFileResource : seqTsFileList) { + try { + Set devices = tsFileResource.getDevices(limiter); + long timeSlotId = tsFileResource.getTsFileID().timePartitionId; + int regionId = tsFileResource.getTsFileID().regionId; + + TConsensusGroupId consensusGroupId = new TConsensusGroupId(); + consensusGroupId.setId(regionId); + consensusGroupId.setType(TConsensusGroupType.DataRegion); + + for (IDeviceID deviceId : devices) { + TSeriesPartitionSlot seriesSlotId = + seriesPartitionExecutor.getSeriesPartitionSlot(deviceId); + TTimePartitionSlot timePartitionSlot = + new TTimePartitionSlot(TimePartitionUtils.getStartTimeByPartitionId(timeSlotId)); + dataPartitionMap + .computeIfAbsent( + seriesSlotId, empty -> newSeriesPartitionTable(consensusGroupId, timeSlotId)) + .putDataPartition(timePartitionSlot, consensusGroupId); + } + processedFiles.incrementAndGet(); + } catch (Exception e) { + failedFiles.incrementAndGet(); + LOG.error("Failed to process tsfile {}, {}", tsFileResource.getTsFileID(), e.getMessage()); + } + } + } + + private static SeriesPartitionTable newSeriesPartitionTable( + TConsensusGroupId consensusGroupId, long timeSlotId) { + SeriesPartitionTable seriesPartitionTable = new 
SeriesPartitionTable(); + TTimePartitionSlot timePartitionSlot = + new TTimePartitionSlot(TimePartitionUtils.getStartTimeByPartitionId(timeSlotId)); + seriesPartitionTable.putDataPartition(timePartitionSlot, consensusGroupId); + return seriesPartitionTable; + } + + // Getters + public TaskStatus getStatus() { + return status; + } + + public String getErrorMessage() { + return errorMessage; + } + + public double getProgress() { + if (totalFiles.get() == 0) { + return 0.0; + } + return (double) (processedFiles.get() + failedFiles.get()) / totalFiles.get(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java index 9c44de9f5fdca..881e823ef2d67 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java @@ -55,7 +55,10 @@ public enum OperationType { WRITE_AUDIT_LOG("writeAuditLog"), PREPARE_STATEMENT("prepareStatement"), EXECUTE_PREPARED_STATEMENT("executePreparedStatement"), - DEALLOCATE_PREPARED_STATEMENT("deallocatePreparedStatement"); + DEALLOCATE_PREPARED_STATEMENT("deallocatePreparedStatement"), + GET_EARLIEST_TIMESLOTS("getEarliestTimeslots"), + GENERATE_DATA_PARTITION_TABLE("generateDataPartitionTable"), + CHECK_DATA_PARTITION_TABLE_STATUS("checkDataPartitionTableStatus"); private final String name; OperationType(String name) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 42929be741819..e80a85fccd0b7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -61,8 +61,11 @@ import org.apache.iotdb.commons.consensus.SchemaRegionId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; +import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.partition.DataPartitionTable; +import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable; import org.apache.iotdb.commons.path.ExtendedPartialPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; @@ -102,6 +105,7 @@ import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.partition.DataPartitionTableGenerator; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.protocol.client.cn.DnToCnInternalServiceAsyncRequestManager; @@ -260,6 +264,10 @@ import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq; import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceInfoResp; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp; +import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp; import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq; import org.apache.iotdb.mpp.rpc.thrift.TInvalidateColumnCacheReq; @@ 
-317,11 +325,16 @@ import com.google.common.collect.ImmutableList; import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TIOStreamTransport; +import org.apache.thrift.transport.TTransport; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.exception.NotImplementedException; +import org.apache.tsfile.external.commons.lang3.StringUtils; import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.write.record.Tablet; @@ -331,9 +344,13 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; +import java.io.File; import java.io.IOException; +import java.lang.reflect.Method; import java.net.URL; import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; @@ -347,6 +364,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -370,7 +388,6 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface { - private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeInternalRPCServiceImpl.class); @@ -3117,4 +3134,385 @@ public TSStatus writeAuditLog(TAuditLogReq req) { public void handleClientExit() { // Do nothing } + + // ==================================================== + // Data Partition Table Integrity Check Implementation + // ==================================================== + + 
private volatile DataPartitionTableGenerator currentGenerator; + private volatile CompletableFuture currentGeneratorFuture; + private volatile long currentTaskId = 0; + + @Override + public TGetEarliestTimeslotsResp getEarliestTimeslots() { + TGetEarliestTimeslotsResp resp = new TGetEarliestTimeslotsResp(); + + try { + Map earliestTimeslots = new HashMap<>(); + + // Get data directories from configuration + String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs(); + + for (String dataDir : dataDirs) { + File dir = new File(dataDir); + if (dir.exists() && dir.isDirectory()) { + processDataDirectoryForEarliestTimeslots(dir, earliestTimeslots); + } + } + + resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + resp.setDatabaseToEarliestTimeslot(earliestTimeslots); + + LOGGER.info("Retrieved earliest timeslots for {} databases", earliestTimeslots.size()); + + } catch (Exception e) { + LOGGER.error("Failed to get earliest timeslots", e); + resp.setStatus( + onIoTDBException( + e, + OperationType.GET_EARLIEST_TIMESLOTS, + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())); + } + + return resp; + } + + @Override + public TGenerateDataPartitionTableResp generateDataPartitionTable( + TGenerateDataPartitionTableReq req) { + TGenerateDataPartitionTableResp resp = new TGenerateDataPartitionTableResp(); + + try { + // Check if there's already a task in the progress + if (currentGenerator != null + && currentGenerator.getStatus() == DataPartitionTableGenerator.TaskStatus.IN_PROGRESS) { + resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode()); + resp.setMessage("DataPartitionTable generation is already in the progress"); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + return resp; + } + + // Create generator for all data directories + int seriesSlotNum = IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum(); + String seriesPartitionExecutorClass = + 
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(); + + final ExecutorService partitionTableRecoverExecutor = + new WrappedThreadPoolExecutor( + 0, + IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum(), + 0L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>( + IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum()), + new IoTThreadFactory(ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName()), + ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName(), + new ThreadPoolExecutor.CallerRunsPolicy()); + + currentGenerator = + new DataPartitionTableGenerator( + partitionTableRecoverExecutor, + req.getDatabases(), + seriesSlotNum, + seriesPartitionExecutorClass); + currentTaskId = System.currentTimeMillis(); + + // Start generation synchronously for now to return the data partition table immediately + currentGeneratorFuture = currentGenerator.startGeneration(); + parseGenerationStatus(resp); + } catch (Exception e) { + LOGGER.error("Failed to generate DataPartitionTable", e); + resp.setStatus( + onIoTDBException( + e, + OperationType.GENERATE_DATA_PARTITION_TABLE, + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())); + } + + return resp; + } + + @Override + public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat() { + TGenerateDataPartitionTableHeartbeatResp resp = new TGenerateDataPartitionTableHeartbeatResp(); + // Must be lower than the RPC request timeout, in milliseconds + final long timeoutMs = 50000; + // Set default value + resp.setDatabaseScopedDataPartitionTables(Collections.emptyList()); + try { + currentGeneratorFuture.get(timeoutMs, TimeUnit.MILLISECONDS); + if (currentGenerator == null) { + resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode()); + resp.setMessage("No DataPartitionTable generation task found"); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + return resp; + } + + parseGenerationStatus(resp); + 
if (currentGenerator.getStatus().equals(DataPartitionTableGenerator.TaskStatus.COMPLETED)) { + boolean success = false; + List databaseScopedDataPartitionTableList = + new ArrayList<>(); + Map dataPartitionTableMap = + currentGenerator.getDatabasePartitionTableMap(); + if (!dataPartitionTableMap.isEmpty()) { + for (Map.Entry entry : dataPartitionTableMap.entrySet()) { + String database = entry.getKey(); + DataPartitionTable dataPartitionTable = entry.getValue(); + if (!StringUtils.isEmpty(database) && dataPartitionTable != null) { + DatabaseScopedDataPartitionTable databaseScopedDataPartitionTable = + new DatabaseScopedDataPartitionTable(database, dataPartitionTable); + databaseScopedDataPartitionTableList.add(databaseScopedDataPartitionTable); + success = true; + } + } + } + + if (success) { + List result = + serializeDatabaseScopedTableList(databaseScopedDataPartitionTableList); + resp.setDatabaseScopedDataPartitionTables(result); + + // Clear current generator + currentGenerator = null; + } + } + } catch (Exception e) { + LOGGER.error("Failed to check DataPartitionTable generation status", e); + resp.setStatus( + onIoTDBException( + e, + OperationType.CHECK_DATA_PARTITION_TABLE_STATUS, + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())); + } + return resp; + } + + private void parseGenerationStatus(T resp) { + if (resp instanceof TGenerateDataPartitionTableResp) { + handleResponse((TGenerateDataPartitionTableResp) resp); + } else { + handleResponse((TGenerateDataPartitionTableHeartbeatResp) resp); + } + } + + private void handleResponse(TGenerateDataPartitionTableResp resp) { + updateResponse(resp); + } + + private void handleResponse(TGenerateDataPartitionTableHeartbeatResp resp) { + updateResponse(resp); + } + + private void updateResponse(T resp) { + if (currentGenerator == null) return; + + switch (currentGenerator.getStatus()) { + case IN_PROGRESS: + setResponseFields( + resp, + DataPartitionTableGeneratorState.IN_PROGRESS.getCode(), + String.format( + 
"DataPartitionTable generation in progress: %.1f%%", + currentGenerator.getProgress() * 100), + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + LOGGER.info( + String.format( + "DataPartitionTable generation with task ID: %s in progress: %.1f%%", + currentTaskId, currentGenerator.getProgress() * 100)); + break; + case COMPLETED: + setResponseFields( + resp, + DataPartitionTableGeneratorState.SUCCESS.getCode(), + "DataPartitionTable generation completed successfully", + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + LOGGER.info("DataPartitionTable generation completed with task ID: {}", currentTaskId); + break; + case FAILED: + setResponseFields( + resp, + DataPartitionTableGeneratorState.FAILED.getCode(), + "DataPartitionTable generation failed: " + currentGenerator.getErrorMessage(), + RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + LOGGER.info("DataPartitionTable generation failed with task ID: {}", currentTaskId); + break; + default: + setResponseFields( + resp, + DataPartitionTableGeneratorState.UNKNOWN.getCode(), + "Unknown task status: " + currentGenerator.getStatus(), + RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + LOGGER.info("DataPartitionTable generation failed with task ID: {}", currentTaskId); + break; + } + } + + private void setResponseFields(T resp, int errorCode, String message, TSStatus status) { + try { + Method setErrorCode = resp.getClass().getMethod("setErrorCode", int.class); + Method setMessage = resp.getClass().getMethod("setMessage", String.class); + Method setStatus = resp.getClass().getMethod("setStatus", TSStatus.class); + + setErrorCode.invoke(resp, errorCode); + setMessage.invoke(resp, message); + setStatus.invoke(resp, status); + } catch (Exception e) { + LOGGER.error("Failed to set response fields", e); + } + } + + /** + * Process data directory to find the earliest timeslots for each database. 
Map + * earliestTimeslots key(String): database name value(Long): the earliest time slot id of the + * database + */ + private void processDataDirectoryForEarliestTimeslots( + File dataDir, Map earliestTimeslots) { + Map databaseEarliestRegionMap = new ConcurrentHashMap<>(); + try (Stream sequenceTypePaths = Files.list(dataDir.toPath())) { + sequenceTypePaths + .filter(Files::isDirectory) + .forEach( + sequenceTypePath -> { + try (Stream dbPaths = Files.list(sequenceTypePath)) { + dbPaths + .filter(Files::isDirectory) + .forEach( + dbPath -> { + String databaseName = dbPath.getFileName().toString(); + if (DataPartitionTableGenerator.IGNORE_DATABASE.contains( + databaseName)) { + return; + } + databaseEarliestRegionMap.computeIfAbsent( + databaseName, key -> Long.MAX_VALUE); + long earliestTimeslot = + findEarliestTimeslotInDatabase( + dbPath.toFile(), databaseEarliestRegionMap); + + if (earliestTimeslot != Long.MAX_VALUE) { + earliestTimeslots.merge(databaseName, earliestTimeslot, Math::min); + } + }); + } catch (IOException e) { + LOGGER.error( + "Failed to process data directory: {}", sequenceTypePath.toFile(), e); + } + }); + } catch (IOException e) { + LOGGER.error("Failed to process data directory: {}", dataDir, e); + } + } + + /** Find the earliest timeslot in a database directory. 
*/ + private long findEarliestTimeslotInDatabase( + File databaseDir, Map databaseEarliestRegionMap) { + String databaseName = databaseDir.getName(); + List> futureList = new ArrayList<>(); + + final ExecutorService findEarliestTimeSlotExecutor = + new WrappedThreadPoolExecutor( + 0, + IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum(), + 0L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>( + IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum()), + new IoTThreadFactory(ThreadName.FIND_EARLIEST_TIME_SLOT_PARALLEL_POOL.getName()), + ThreadName.FIND_EARLIEST_TIME_SLOT_PARALLEL_POOL.getName(), + new ThreadPoolExecutor.CallerRunsPolicy()); + + try (Stream databasePaths = Files.list(databaseDir.toPath())) { + databasePaths + .filter(Files::isDirectory) + .forEach( + regionPath -> { + Future future = + findEarliestTimeSlotExecutor.submit( + () -> { + try (Stream regionPaths = Files.list(regionPath)) { + regionPaths + .filter(Files::isDirectory) + .forEach( + timeSlotPath -> { + try { + Optional matchedFile = + Files.find( + timeSlotPath, + 1, + (path, attrs) -> + attrs.isRegularFile() + && path.toString() + .endsWith( + DataPartitionTableGenerator + .SCAN_FILE_SUFFIX_NAME)) + .findFirst(); + if (!matchedFile.isPresent()) { + return; + } + String timeSlotName = timeSlotPath.getFileName().toString(); + long timeslot = Long.parseLong(timeSlotName); + databaseEarliestRegionMap.compute( + databaseName, + (k, v) -> v == null ? 
timeslot : Math.min(v, timeslot)); + } catch (IOException e) { + LOGGER.error( + "Failed to find any {} files in the {} directory", + DataPartitionTableGenerator.SCAN_FILE_SUFFIX_NAME, + timeSlotPath, + e); + } + }); + } catch (IOException e) { + LOGGER.error("Failed to scan {}", regionPath, e); + } + }); + futureList.add(future); + }); + } catch (IOException e) { + LOGGER.error("Failed to walk database directory: {}", databaseDir, e); + } + + for (Future future : futureList) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("Failed to wait for task completion", e); + Thread.currentThread().interrupt(); + } + } + findEarliestTimeSlotExecutor.shutdownNow(); + return databaseEarliestRegionMap.get(databaseName); + } + + private List serializeDatabaseScopedTableList( + List list) { + if (list == null || list.isEmpty()) { + return Collections.emptyList(); + } + + List result = new ArrayList<>(list.size()); + + for (DatabaseScopedDataPartitionTable table : list) { + try (PublicBAOS baos = new PublicBAOS(); + DataOutputStream oos = new DataOutputStream(baos)) { + + TTransport transport = new TIOStreamTransport(oos); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + + table.serialize(oos, protocol); + + result.add(ByteBuffer.wrap(baos.toByteArray())); + + } catch (IOException | TException e) { + LOGGER.error( + "Failed to serialize DatabaseScopedDataPartitionTable for database: {}", + table.getDatabase(), + e); + } + } + + return result; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index b84cce9e8d21b..f4a950a72afd6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.datastructure.resource.PersistentResource; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.commons.utils.rateLimiter.LeakyBucketRateLimiter; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.load.PartitionViolationException; @@ -677,6 +678,10 @@ public Set getDevices() { return timeIndex.getDevices(file.getPath(), this); } + public Set getDevices(LeakyBucketRateLimiter limiter) { + return timeIndex.getDevices(file.getPath(), this, limiter); + } + public ArrayDeviceTimeIndex buildDeviceTimeIndex(IDeviceID.Deserializer deserializer) throws IOException { readLock(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java index 8499b6d6b3d3e..a3262ddd37a1a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.commons.utils.rateLimiter.LeakyBucketRateLimiter; import org.apache.iotdb.db.exception.load.PartitionViolationException; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -171,6 +172,12 @@ public Set getDevices(String tsFilePath, TsFileResource tsFileResourc return deviceToIndex.keySet(); } + @Override + 
public Set getDevices( + String tsFilePath, TsFileResource tsFileResource, LeakyBucketRateLimiter limiter) { + return deviceToIndex.keySet(); + } + public Map getDeviceToIndex() { return deviceToIndex; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java index e4a812012a8e3..b79ffc578e83f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.commons.utils.rateLimiter.LeakyBucketRateLimiter; import org.apache.iotdb.db.exception.load.PartitionViolationException; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -120,6 +121,45 @@ public Set getDevices(String tsFilePath, TsFileResource tsFileResourc } } + @Override + public Set getDevices( + String tsFilePath, TsFileResource tsFileResource, LeakyBucketRateLimiter limiter) { + tsFileResource.readLock(); + try { + limiter.acquire(tsFileResource.getTsFileSize()); + + try (InputStream inputStream = + FSFactoryProducer.getFSFactory() + .getBufferedInputStream(tsFilePath + TsFileResource.RESOURCE_SUFFIX)) { + // The first byte is VERSION_NUMBER, second byte is timeIndexType. 
+ byte[] bytes = ReadWriteIOUtils.readBytes(inputStream, 2); + + if (bytes[1] == ARRAY_DEVICE_TIME_INDEX_TYPE) { + return ArrayDeviceTimeIndex.getDevices(inputStream); + } else { + return PlainDeviceTimeIndex.getDevices(inputStream); + } + } + } catch (NoSuchFileException e) { + // deleted by ttl + if (tsFileResource.isDeleted()) { + return Collections.emptySet(); + } else { + logger.error( + "Can't read file {} from disk ", tsFilePath + TsFileResource.RESOURCE_SUFFIX, e); + throw new RuntimeException( + "Can't read file " + tsFilePath + TsFileResource.RESOURCE_SUFFIX + " from disk"); + } + } catch (Exception e) { + logger.error( + "Failed to get devices from tsfile: {}", tsFilePath + TsFileResource.RESOURCE_SUFFIX, e); + throw new RuntimeException( + "Failed to get devices from tsfile: " + tsFilePath + TsFileResource.RESOURCE_SUFFIX); + } finally { + tsFileResource.readUnlock(); + } + } + @Override public boolean endTimeEmpty() { return endTime == Long.MIN_VALUE; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java index d705a2417d7c6..400c478df5054 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.utils.rateLimiter.LeakyBucketRateLimiter; import org.apache.iotdb.db.exception.load.PartitionViolationException; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -74,6 +75,14 @@ ITimeIndex deserialize(InputStream inputStream, IDeviceID.Deserializer deseriali */ Set getDevices(String tsFilePath, 
TsFileResource tsFileResource); + /** + * get devices in TimeIndex and limit files reading rate + * + * @return device names + */ + Set getDevices( + String tsFilePath, TsFileResource tsFileResource, LeakyBucketRateLimiter limiter); + /** * @return whether end time is empty (Long.MIN_VALUE) */ diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index f4ebae2fb807e..c36f35cd5778c 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -742,6 +742,21 @@ failure_detector_phi_acceptable_pause_in_ms=10000 # Datatype: double(percentage) disk_space_warning_threshold=0.05 +# The number of threads used for parallel scanning in the partition table recovery +# effectiveMode: restart +# Datatype: Integer +partition_table_recover_worker_num=10 + +# Limit the number of bytes read per second from a file, the unit is MB +# effectiveMode: restart +# Datatype: Integer +partition_table_recover_max_read_bytes_per_second=10 + +# Set a timeout to wait for all datanodes complete startup, the unit is ms +# effectiveMode: restart +# Datatype: Integer +partition_table_recover_wait_all_dn_up_timeout=60000 + #################### ### Memory Control Configuration #################### diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index 6f9f95ca8fe88..39bc7eebfa92b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -202,6 +202,8 @@ public enum ThreadName { FILE_TIME_INDEX_RECORD("FileTimeIndexRecord"), 
BINARY_ALLOCATOR_SAMPLE_EVICTOR("BinaryAllocator-SampleEvictor"), BINARY_ALLOCATOR_AUTO_RELEASER("BinaryAllocator-Auto-Releaser"), + FIND_EARLIEST_TIME_SLOT_PARALLEL_POOL("FindEarliestTimeSlot-Parallel-Pool"), + DATA_PARTITION_RECOVER_PARALLEL_POOL("DataPartitionRecover-Parallel-Pool"), // the unknown thread name is used for metrics UNKNOWN("UNKNOWN"); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/DataPartitionTableGeneratorState.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/DataPartitionTableGeneratorState.java new file mode 100644 index 0000000000000..93cca687799fc --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/DataPartitionTableGeneratorState.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.commons.enums; + +public enum DataPartitionTableGeneratorState { + SUCCESS(0), + FAILED(1), + IN_PROGRESS(2), + UNKNOWN(-1); + + private final int code; + + DataPartitionTableGeneratorState(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + + /** + * get DataPartitionTableGeneratorState by code + * + * @param code code + * @return DataPartitionTableGeneratorState + */ + public static DataPartitionTableGeneratorState getStateByCode(int code) { + for (DataPartitionTableGeneratorState state : DataPartitionTableGeneratorState.values()) { + if (state.code == code) { + return state; + } + } + return UNKNOWN; + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java index 91346f0c69c85..d154f1813e1b7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java @@ -282,6 +282,48 @@ public Set autoCleanPartitionTable( return removedTimePartitionSlots; } + /** + * Merge a complete DataPartitionTable from the partition tables received from multiple DataNodes + * (supports cross-database merging, which is exactly the logic implemented in the current PR) + * + * @param sourceMap Map + * @return The complete merged partition table + */ + public DataPartitionTable merge(Map sourceMap) { + DataPartitionTable merged = new DataPartitionTable(this.dataPartitionMap); + for (DataPartitionTable table : sourceMap.values()) { + for (Map.Entry entry : + table.dataPartitionMap.entrySet()) { + TSeriesPartitionSlot slot = entry.getKey(); + SeriesPartitionTable seriesTable = entry.getValue(); + merged + .dataPartitionMap + .computeIfAbsent(slot, k -> new SeriesPartitionTable()) + .merge(seriesTable); + } + } + return merged; 
+ } + + /** + * Support single table merging Merge another DataPartitionTable into the current object (used for + * incremental merging) + */ + public DataPartitionTable merge(DataPartitionTable sourcePartitionTable) { + DataPartitionTable merged = new DataPartitionTable(this.dataPartitionMap); + if (sourcePartitionTable == null) { + return merged; + } + for (Map.Entry entry : + sourcePartitionTable.dataPartitionMap.entrySet()) { + merged + .dataPartitionMap + .computeIfAbsent(entry.getKey(), k -> new SeriesPartitionTable()) + .merge(entry.getValue()); + } + return merged; + } + public void serialize(OutputStream outputStream, TProtocol protocol) throws IOException, TException { ReadWriteIOUtils.write(dataPartitionMap.size(), outputStream); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DatabaseScopedDataPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DatabaseScopedDataPartitionTable.java new file mode 100644 index 0000000000000..a47f4024eac88 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DatabaseScopedDataPartitionTable.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.partition; + +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TProtocol; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Objects; + +public class DatabaseScopedDataPartitionTable { + private final String database; + private DataPartitionTable dataPartitionTable; + + public DatabaseScopedDataPartitionTable(String database, DataPartitionTable dataPartitionTable) { + this.database = database; + this.dataPartitionTable = dataPartitionTable; + } + + public String getDatabase() { + return database; + } + + public DataPartitionTable getDataPartitionTable() { + return dataPartitionTable; + } + + public void serialize(OutputStream outputStream, TProtocol protocol) + throws IOException, TException { + ReadWriteIOUtils.write(database, outputStream); + + ReadWriteIOUtils.write(dataPartitionTable != null, outputStream); + + if (dataPartitionTable != null) { + dataPartitionTable.serialize(outputStream, protocol); + } + } + + public static DatabaseScopedDataPartitionTable deserialize(ByteBuffer buffer) { + String database = ReadWriteIOUtils.readString(buffer); + + boolean hasDataPartitionTable = ReadWriteIOUtils.readBool(buffer); + + DataPartitionTable dataPartitionTable = null; + if (hasDataPartitionTable) { + dataPartitionTable = new DataPartitionTable(); + dataPartitionTable.deserialize(buffer); + } + + return new DatabaseScopedDataPartitionTable(database, dataPartitionTable); + } + + public static DatabaseScopedDataPartitionTable deserialize( + InputStream inputStream, TProtocol protocol) throws IOException, TException { + String database = ReadWriteIOUtils.readString(inputStream); + + boolean hasDataPartitionTable = ReadWriteIOUtils.readBool(inputStream); + + 
DataPartitionTable dataPartitionTable = null; + if (hasDataPartitionTable) { + dataPartitionTable = new DataPartitionTable(); + dataPartitionTable.deserialize(inputStream, protocol); + } + + return new DatabaseScopedDataPartitionTable(database, dataPartitionTable); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DatabaseScopedDataPartitionTable that = (DatabaseScopedDataPartitionTable) o; + return Objects.equals(database, that.database) + && Objects.equals(dataPartitionTable, that.dataPartitionTable); + } + + @Override + public int hashCode() { + return Objects.hash(database, dataPartitionTable); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java index f46344566dc32..915e3df4e329a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java @@ -73,7 +73,13 @@ public Map> getSeriesPartitionMap() } public void putDataPartition(TTimePartitionSlot timePartitionSlot, TConsensusGroupId groupId) { - seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new Vector<>()).add(groupId); + List groupList = + seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new Vector<>()); + synchronized (groupList) { + if (!groupList.contains(groupId)) { + groupList.add(groupId); + } + } } /** @@ -270,6 +276,14 @@ public List autoCleanPartitionTable( return removedTimePartitions; } + public void merge(SeriesPartitionTable sourceMap) { + if (sourceMap == null) return; + sourceMap.seriesPartitionMap.forEach( + (timeSlot, groups) -> { + this.seriesPartitionMap.computeIfAbsent(timeSlot, k -> new ArrayList<>()).addAll(groups); + }); + } + public void 
serialize(OutputStream outputStream, TProtocol protocol) throws IOException, TException { ReadWriteIOUtils.write(seriesPartitionMap.size(), outputStream); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java index eb53cdb2798dd..250a347d1496b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java @@ -122,6 +122,10 @@ public static long getTimePartitionIdWithoutOverflow(long time) { return partitionId.longValue(); } + public static long getStartTimeByPartitionId(long partitionId) { + return (partitionId * timePartitionInterval) + timePartitionOrigin; + } + public static boolean satisfyPartitionId(long startTime, long endTime, long partitionId) { long startPartition = originMayCauseOverflow diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/rateLimiter/LeakyBucketRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/rateLimiter/LeakyBucketRateLimiter.java new file mode 100644 index 0000000000000..287030753b28a --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/rateLimiter/LeakyBucketRateLimiter.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils.rateLimiter; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; + +/** + * A global leaky-bucket rate limiter for bytes throughput. Features: - Strict throughput limiting + * (no burst) - Smooth bandwidth shaping - Thread-safe - Fair for multi-thread - Low contention + */ +public class LeakyBucketRateLimiter { + /** bytes per second */ + private volatile long bytesPerSecond; + + /** start time */ + private final long startTimeNs; + + /** total consumed bytes */ + private final AtomicLong totalBytes = new AtomicLong(0); + + public LeakyBucketRateLimiter(long bytesPerSecond) { + if (bytesPerSecond <= 0) { + throw new IllegalArgumentException("bytesPerSecond must be > 0"); + } + this.bytesPerSecond = bytesPerSecond; + this.startTimeNs = System.nanoTime(); + } + + /** + * Acquire permission for reading bytes. + * + *

This method will block if reading too fast. + */ + public void acquire(long bytes) { + if (bytes <= 0) { + return; + } + + long currentTotal = totalBytes.addAndGet(bytes); + + long expectedTimeNs = expectedTimeNs(currentTotal); + long now = System.nanoTime(); + + long sleepNs = expectedTimeNs - now; + + if (sleepNs > 0) { + LockSupport.parkNanos(sleepNs); /* NOTE(review): parkNanos may return early (spurious wakeup or unpark) and there is no re-check loop, so a caller can proceed slightly before expectedTimeNs — confirm this tolerance is acceptable for bandwidth shaping */ + } + } + + /** + * Try acquire without blocking. + * + * @return true if allowed immediately + */ + public boolean tryAcquire(long bytes) { + if (bytes <= 0) { + return true; + } + + long currentTotal = totalBytes.addAndGet(bytes); + + long expectedTimeNs = expectedTimeNs(currentTotal); + long now = System.nanoTime(); + + if (expectedTimeNs <= now) { + return true; + } + + // rollback + totalBytes.addAndGet(-bytes); /* NOTE(review): in the add-then-rollback window, concurrent acquire() callers observe the temporarily inflated total and may over-sleep briefly — appears to be accepted best-effort behavior, worth documenting explicitly */ + return false; + } + + /** Update rate dynamically. */ + public void setRate(long newBytesPerSecond) { + if (newBytesPerSecond <= 0) { + throw new IllegalArgumentException("bytesPerSecond must be > 0"); + } + this.bytesPerSecond = newBytesPerSecond; /* NOTE(review): expectedTimeNs() divides the LIFETIME byte total by the CURRENT rate, so lowering the rate re-prices all historical bytes and can retroactively stall future acquirers for a long period — TODO confirm intended, otherwise checkpoint (totalBytes, now) on rate change */ + } + + /** Current rate. */ + public long getRate() { + return bytesPerSecond; + } + + /** Total bytes processed. */ + public long getTotalBytes() { + return totalBytes.get(); + } + + /** + * Calculate the expected time using double (double can easily hold nanoseconds on the order of + * 10^18), then perform clamping and convert to long. Advantages: Extremely simple, zero + * exceptions thrown, and double precision is sufficient (nanosecond-level errors are negligible). + * Disadvantages: In extreme cases (when totalBytes is close to 2^63), double loses precision in + * the trailing digits. However, in IoTDB's actual scenarios, bytesPerSecond is typically between + * 10MB/s and 1GB/s, so this situation will not occur.
+ */ + private long expectedTimeNs(long totalBytes) { + if (totalBytes <= 0) { + return startTimeNs; + } + + // Use double for calculations to avoid overflow in long multiplication + double seconds = (double) totalBytes / bytesPerSecond; + double elapsedNsDouble = seconds * 1_000_000_000.0; + + if (elapsedNsDouble > Long.MAX_VALUE - startTimeNs) { + // clamp + return Long.MAX_VALUE; + } + + return startTimeNs + (long) elapsedNsDouble; + } +} diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index cca7110f28d40..84d4c2a1c0b11 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -678,6 +678,36 @@ struct TAuditLogReq { 11: required i32 cnId } +/** +* BEGIN: Data Partition Table Integrity Check Structures +**/ + +struct TGetEarliestTimeslotsResp { + 1: required common.TSStatus status + 2: optional map<string, i64> databaseToEarliestTimeslot +} + +struct TGenerateDataPartitionTableReq { + 1: required set<string> databases +} + +struct TGenerateDataPartitionTableResp { + 1: required common.TSStatus status + 2: required i32 errorCode + 3: optional string message +} + +struct TGenerateDataPartitionTableHeartbeatResp { + 1: required common.TSStatus status + 2: required i32 errorCode + 3: optional string message + 4: optional list<binary> databaseScopedDataPartitionTables /* NOTE(review): the element type was garbled in extraction; <binary> is an unverified reconstruction — confirm against the original thrift */ +} + +/** +* END: Data Partition Table Integrity Check Structures +**/ + /** * BEGIN: Used for EXPLAIN ANALYZE **/ @@ -1276,6 +1306,30 @@ service IDataNodeRPCService { * Write an audit log entry to the DataNode's AuditEventLogger */ common.TSStatus writeAuditLog(TAuditLogReq req); + + /** + * BEGIN: Data Partition Table Integrity Check + **/ + + /** + * Get earliest timeslot information from DataNode + * Returns map of database name to earliest timeslot id + */ + TGetEarliestTimeslotsResp getEarliestTimeslots() + + /** + * Request DataNode to generate
DataPartitionTable by scanning tsfile resources + */ + TGenerateDataPartitionTableResp generateDataPartitionTable(TGenerateDataPartitionTableReq req) + + /** + * Check the status of DataPartitionTable generation task + */ + TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat() + + /** + * END: Data Partition Table Integrity Check + **/ } service MPPDataExchangeService {