Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,34 +126,9 @@ public static void validateTabletConfigs(Configuration conf) {
validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0);
}

/** Validate common server configs. */
protected static void validateServerConfigs(Configuration conf) {
// Validate remote.data.dir and remote.data.dirs
String remoteDataDir = conf.get(ConfigOptions.REMOTE_DATA_DIR);
List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null
&& conf.get(ConfigOptions.REMOTE_DATA_DIRS).isEmpty()) {
throw new IllegalConfigurationException(
String.format(
"Either %s or %s must be configured.",
ConfigOptions.REMOTE_DATA_DIR.key(),
ConfigOptions.REMOTE_DATA_DIRS.key()));
}

if (remoteDataDir != null) {
// Must validate that remote.data.dir is a valid FsPath
try {
new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR));
} catch (Exception e) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s.",
ConfigOptions.REMOTE_DATA_DIR.key()),
e);
}
}

public static void validateRemoteDataDirs(Configuration conf) {
// Validate remote.data.dirs
List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
for (int i = 0; i < remoteDataDirs.size(); i++) {
String dir = remoteDataDirs.get(i);
try {
Expand Down Expand Up @@ -183,19 +158,56 @@ protected static void validateServerConfigs(Configuration conf) {
weights.size()));
}

// Validate all weights are no less than 0
// Verify that each weight is non-negative and that the total weight is greater than
// 0.
int totalWeight = 0;
for (int i = 0; i < weights.size(); i++) {
if (weights.get(i) < 0) {
int weight = weights.get(i);
if (weight < 0) {
throw new IllegalConfigurationException(
String.format(
"All weights in '%s' must be no less than 0, but found %d at index %d.",
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
weights.get(i),
i));
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(), weight, i));
}
totalWeight += weight;
}
if (totalWeight <= 0) {
throw new IllegalConfigurationException(
String.format(
"The sum of all weights in '%s' must be greater than 0, but the current sum is %d.",
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(), totalWeight));
}
}
}
}

/** Validate common server configs. */
protected static void validateServerConfigs(Configuration conf) {
// Validate remote.data.dir and remote.data.dirs
String remoteDataDir = conf.get(ConfigOptions.REMOTE_DATA_DIR);
List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
if (remoteDataDir == null && remoteDataDirs.isEmpty()) {
throw new IllegalConfigurationException(
String.format(
"Either %s or %s must be configured.",
ConfigOptions.REMOTE_DATA_DIR.key(),
ConfigOptions.REMOTE_DATA_DIRS.key()));
}

if (remoteDataDir != null) {
// Must validate that remote.data.dir is a valid FsPath
try {
new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR));
} catch (Exception e) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s.",
ConfigOptions.REMOTE_DATA_DIR.key()),
e);
}
}

validateRemoteDataDirs(conf);

validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1);
validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,20 @@ void testValidateCoordinatorConfigs() {
.hasMessageContaining(
"All weights in 'remote.data.dirs.weights' must be no less than 0");

// Test all zero weights
Configuration zeroWeightsConf = new Configuration();
zeroWeightsConf.set(
ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
zeroWeightsConf.set(
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
zeroWeightsConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(0, 0));
assertThatThrownBy(() -> validateCoordinatorConfigs(zeroWeightsConf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining("The sum of all weights")
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key())
.hasMessageContaining("must be greater than 0");

// Test invalid DEFAULT_REPLICATION_FACTOR
Configuration invalidReplicationConf = new Configuration();
invalidReplicationConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
import static org.apache.fluss.config.ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC;
import static org.apache.fluss.config.ConfigOptions.KV_SNAPSHOT_INTERVAL;
import static org.apache.fluss.config.ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER;
import static org.apache.fluss.config.ConfigOptions.REMOTE_DATA_DIRS;
import static org.apache.fluss.config.ConfigOptions.REMOTE_DATA_DIRS_STRATEGY;
import static org.apache.fluss.config.ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS;
import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;

Expand All @@ -64,7 +67,11 @@ class DynamicServerConfig {
DATALAKE_FORMAT.key(),
LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER.key(),
KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
KV_SNAPSHOT_INTERVAL.key()));
KV_SNAPSHOT_INTERVAL.key(),
// Config options for remote.data.dirs
REMOTE_DATA_DIRS.key(),
REMOTE_DATA_DIRS_STRATEGY.key(),
REMOTE_DATA_DIRS_WEIGHTS.key()));
private static final Set<String> ALLOWED_CONFIG_PREFIXES = Collections.singleton("datalake.");

private final ReadWriteLock lock = new ReentrantReadWriteLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.coordinator.remote.RemoteDirDynamicLoader;
import org.apache.fluss.server.metadata.ServerMetadataCache;
import org.apache.fluss.server.zk.data.BucketAssignment;
import org.apache.fluss.server.zk.data.PartitionAssignment;
Expand Down Expand Up @@ -85,6 +86,7 @@ public class AutoPartitionManager implements AutoCloseable {

private final ServerMetadataCache metadataCache;
private final MetadataManager metadataManager;
private final RemoteDirDynamicLoader remoteDirDynamicLoader;
private final Clock clock;

private final long periodicInterval;
Expand All @@ -108,10 +110,12 @@ public class AutoPartitionManager implements AutoCloseable {
public AutoPartitionManager(
ServerMetadataCache metadataCache,
MetadataManager metadataManager,
RemoteDirDynamicLoader remoteDirDynamicLoader,
Configuration conf) {
this(
metadataCache,
metadataManager,
remoteDirDynamicLoader,
conf,
SystemClock.getInstance(),
Executors.newScheduledThreadPool(
Expand All @@ -122,11 +126,13 @@ public AutoPartitionManager(
AutoPartitionManager(
ServerMetadataCache metadataCache,
MetadataManager metadataManager,
RemoteDirDynamicLoader remoteDirDynamicLoader,
Configuration conf,
Clock clock,
ScheduledExecutorService periodicExecutor) {
this.metadataCache = metadataCache;
this.metadataManager = metadataManager;
this.remoteDirDynamicLoader = remoteDirDynamicLoader;
this.clock = clock;
this.periodicExecutor = periodicExecutor;
this.periodicInterval = conf.get(ConfigOptions.AUTO_PARTITION_CHECK_INTERVAL).toMillis();
Expand Down Expand Up @@ -350,8 +356,10 @@ private void createPartitions(
PartitionAssignment partitionAssignment =
new PartitionAssignment(tableInfo.getTableId(), bucketAssignments);

// select a remote data dir for the partition
String remoteDataDir = remoteDirDynamicLoader.getRemoteDirSelector().nextDataDir();
metadataManager.createPartition(
tablePath, tableId, partitionAssignment, partition, false);
tablePath, tableId, remoteDataDir, partitionAssignment, partition, false);
// only single partition key table supports automatic creation of partitions
currentPartitions.put(partition.getPartitionName(), null);
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.fluss.server.authorizer.AuthorizerLoader;
import org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager;
import org.apache.fluss.server.coordinator.rebalance.RebalanceManager;
import org.apache.fluss.server.coordinator.remote.RemoteDirDynamicLoader;
import org.apache.fluss.server.metadata.CoordinatorMetadataCache;
import org.apache.fluss.server.metadata.ServerMetadataCache;
import org.apache.fluss.server.metrics.ServerMetricUtils;
Expand Down Expand Up @@ -149,6 +150,9 @@ public class CoordinatorServer extends ServerBase {
@GuardedBy("lock")
private LakeCatalogDynamicLoader lakeCatalogDynamicLoader;

@GuardedBy("lock")
private RemoteDirDynamicLoader remoteDirDynamicLoader;

@GuardedBy("lock")
private CoordinatorLeaderElection coordinatorLeaderElection;

Expand Down Expand Up @@ -225,10 +229,13 @@ protected void initCoordinatorStandby() throws Exception {
this.coordinatorLeaderElection = new CoordinatorLeaderElection(zkClient, serverId);

this.lakeCatalogDynamicLoader = new LakeCatalogDynamicLoader(conf, pluginManager, true);
this.remoteDirDynamicLoader = new RemoteDirDynamicLoader(conf);

this.dynamicConfigManager = new DynamicConfigManager(zkClient, conf, true);

// Register server reconfigurable components
dynamicConfigManager.register(lakeCatalogDynamicLoader);
dynamicConfigManager.register(remoteDirDynamicLoader);

dynamicConfigManager.startup();

Expand Down Expand Up @@ -272,6 +279,7 @@ protected void initCoordinatorStandby() throws Exception {
authorizer,
lakeCatalogDynamicLoader,
lakeTableTieringManager,
remoteDirDynamicLoader,
dynamicConfigManager,
ioExecutor,
kvSnapshotLeaseManager,
Expand Down Expand Up @@ -302,7 +310,8 @@ protected void initCoordinatorLeader() throws Exception {
this.coordinatorChannelManager = new CoordinatorChannelManager(rpcClient);

this.autoPartitionManager =
new AutoPartitionManager(metadataCache, metadataManager, conf);
new AutoPartitionManager(
metadataCache, metadataManager, remoteDirDynamicLoader, conf);
autoPartitionManager.start();

registerCoordinatorLeader();
Expand Down Expand Up @@ -615,6 +624,10 @@ CompletableFuture<Void> stopServices() {
lakeCatalogDynamicLoader.close();
}

if (remoteDirDynamicLoader != null) {
remoteDirDynamicLoader.close();
}

if (kvSnapshotLeaseManager != null) {
kvSnapshotLeaseManager.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@
import org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager;
import org.apache.fluss.server.coordinator.producer.ProducerOffsetsManager;
import org.apache.fluss.server.coordinator.rebalance.goal.Goal;
import org.apache.fluss.server.coordinator.remote.RemoteDirDynamicLoader;
import org.apache.fluss.server.entity.CommitKvSnapshotData;
import org.apache.fluss.server.entity.DatabasePropertyChanges;
import org.apache.fluss.server.entity.LakeTieringTableInfo;
Expand Down Expand Up @@ -242,6 +243,7 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
private final ProducerOffsetsManager producerOffsetsManager;
private final KvSnapshotLeaseManager kvSnapshotLeaseManager;
private final CoordinatorLeaderElection coordinatorLeaderElection;
private final RemoteDirDynamicLoader remoteDirDynamicLoader;

public CoordinatorService(
Configuration conf,
Expand All @@ -253,6 +255,7 @@ public CoordinatorService(
@Nullable Authorizer authorizer,
LakeCatalogDynamicLoader lakeCatalogDynamicLoader,
LakeTableTieringManager lakeTableTieringManager,
RemoteDirDynamicLoader remoteDirDynamicLoader,
DynamicConfigManager dynamicConfigManager,
ExecutorService ioExecutor,
KvSnapshotLeaseManager kvSnapshotLeaseManager,
Expand All @@ -279,6 +282,7 @@ public CoordinatorService(
this.ioExecutor = ioExecutor;
this.lakeTableHelper =
new LakeTableHelper(zkClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR));
this.remoteDirDynamicLoader = remoteDirDynamicLoader;

// Initialize and start the producer snapshot manager
this.producerOffsetsManager = new ProducerOffsetsManager(conf, zkClient);
Expand Down Expand Up @@ -494,9 +498,18 @@ public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest req
}
}

// select remote data dir for table.
// remote data dir will be used to store table data for non-partitioned table and metadata
// (such as lake snapshot offset file) for partitioned table
String remoteDataDir = remoteDirDynamicLoader.getRemoteDirSelector().nextDataDir();

// then create table;
metadataManager.createTable(
tablePath, tableDescriptor, tableAssignment, request.isIgnoreIfExists());
tablePath,
remoteDataDir,
tableDescriptor,
tableAssignment,
request.isIgnoreIfExists());

return CompletableFuture.completedFuture(new CreateTableResponse());
}
Expand Down Expand Up @@ -706,9 +719,13 @@ public CompletableFuture<CreatePartitionResponse> createPartition(
PartitionAssignment partitionAssignment =
new PartitionAssignment(table.tableId, bucketAssignments);

// select remote data dir for partition
String remoteDataDir = remoteDirDynamicLoader.getRemoteDirSelector().nextDataDir();

metadataManager.createPartition(
tablePath,
table.tableId,
remoteDataDir,
partitionAssignment,
partitionToCreate,
request.isIgnoreIfNotExists());
Expand Down Expand Up @@ -757,6 +774,7 @@ public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
return metadataResponseAccessContextEvent.getResultFuture();
}

@Override
public CompletableFuture<AdjustIsrResponse> adjustIsr(AdjustIsrRequest request) {
CompletableFuture<AdjustIsrResponse> response = new CompletableFuture<>();
eventManagerSupplier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,15 @@ public void completeDeletePartition(long partitionId) {
* Returns -1 if the table already exists and ignoreIfExists is true.
*
* @param tablePath the table path
* @param remoteDataDir the remote data directory
* @param tableToCreate the table descriptor describing the table to create
* @param tableAssignment the table assignment, will be null when the table is partitioned table
* @param ignoreIfExists whether to ignore if the table already exists
* @return the table id
*/
public long createTable(
TablePath tablePath,
String remoteDataDir,
TableDescriptor tableToCreate,
@Nullable TableAssignment tableAssignment,
boolean ignoreIfExists)
Expand Down Expand Up @@ -405,10 +407,7 @@ public long createTable(
// register the table
zookeeperClient.registerTable(
tablePath,
TableRegistration.newTable(
tableId,
zookeeperClient.getDefaultRemoteDataDir(),
tableToCreate),
TableRegistration.newTable(tableId, remoteDataDir, tableToCreate),
false);
return tableId;
},
Expand Down Expand Up @@ -795,6 +794,7 @@ public Set<String> getPartitions(TablePath tablePath) {
public void createPartition(
TablePath tablePath,
long tableId,
String remoteDataDir,
Copy link
Copy Markdown
Contributor

@gyang94 gyang94 Mar 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking about possible improvements here:
1.
Would it be better to put the remote dir selection logic inside MetadataManager.createPartition() and MetadataManager.createTable()?

I notice that there is now a code pattern of (a) selecting a remoteDataDir value and (b) passing that value into the createTable/createPartition functions, wherever we need to create a table or partition.
Putting step (a) inside those functions would make the code more focused and avoid adding a remoteDataDir parameter to these functions.

Or
2.
Is it a good idea to add a 'remoteDataDir' field to the TablePath class? The TablePath object would carry the remote dir info and pass it through everywhere. Then we wouldn't need to add a separate remoteDataDir parameter to functions that already have a TablePath parameter.

The current implementation works fine; I'm just putting my thoughts here. What do you think?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Putting the selection into MetadataManager would also require a lot of changes. By comparison, I think the current approach actually introduces smaller changes.

  2. TablePath represents the logical path of a Fluss table, while remoteDataDir is the physical storage path in the implementation. I don’t think it’s appropriate to put remoteDataDir into TablePath.

PartitionAssignment partitionAssignment,
ResolvedPartitionSpec partition,
boolean ignoreIfExists) {
Expand Down Expand Up @@ -857,7 +857,7 @@ public void createPartition(
partitionId,
partitionName,
partitionAssignment,
zookeeperClient.getDefaultRemoteDataDir(),
remoteDataDir,
tablePath,
tableId);
LOG.info(
Expand Down
Loading
Loading