diff --git a/.github/workflows/stage.sh b/.github/workflows/stage.sh
index b35b74e47f..ac866eb452 100755
--- a/.github/workflows/stage.sh
+++ b/.github/workflows/stage.sh
@@ -33,12 +33,16 @@ MODULES_COMMON_SPARK="\
fluss-spark,\
fluss-spark/fluss-spark-common,\
fluss-spark/fluss-spark-ut,\
+fluss-spark/fluss-spark-paimon,\
+fluss-spark/fluss-spark-iceberg,\
"
MODULES_SPARK3="\
fluss-spark,\
fluss-spark/fluss-spark-common,\
fluss-spark/fluss-spark-ut,\
+fluss-spark/fluss-spark-paimon,\
+fluss-spark/fluss-spark-iceberg,\
fluss-spark/fluss-spark-3.5,\
fluss-spark/fluss-spark-3.4,\
"
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
index 144db03aea..c2ad05081c 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
@@ -21,7 +21,7 @@ import org.apache.fluss.client.admin.Admin
import org.apache.fluss.config.{Configuration => FlussConfiguration}
import org.apache.fluss.metadata.{TableInfo, TablePath}
import org.apache.fluss.spark.catalog.{AbstractSparkTable, SupportsFlussPartitionManagement}
-import org.apache.fluss.spark.read.{FlussAppendScanBuilder, FlussUpsertScanBuilder}
+import org.apache.fluss.spark.read.{FlussAppendScanBuilder, FlussLakeAppendScanBuilder, FlussUpsertScanBuilder}
import org.apache.fluss.spark.write.{FlussAppendWriteBuilder, FlussUpsertWriteBuilder}
import org.apache.spark.sql.catalyst.SQLConfHelper
@@ -61,8 +61,19 @@ class SparkTable(
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
populateSparkConf(flussConfig)
+ val isDataLakeEnabled = tableInfo.getTableConfig.isDataLakeEnabled
+ val startupMode = options
+ .getOrDefault(
+ SparkFlussConf.SCAN_START_UP_MODE.key(),
+ flussConfig.get(SparkFlussConf.SCAN_START_UP_MODE))
+ .toUpperCase
+ val isFullMode = startupMode == SparkFlussConf.StartUpMode.FULL.toString
if (tableInfo.getPrimaryKeys.isEmpty) {
- new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig)
+ if (isDataLakeEnabled && isFullMode) {
+ new FlussLakeAppendScanBuilder(tablePath, tableInfo, options, flussConfig)
+ } else {
+ new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig)
+ }
} else {
new FlussUpsertScanBuilder(tablePath, tableInfo, options, flussConfig)
}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
index 397b624306..ccf89cc87c 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
@@ -45,6 +45,24 @@ case class FlussAppendInputPartition(tableBucket: TableBucket, startOffset: Long
}
}
+/**
+ * Represents an input partition for reading data from a single lake split. Each lake split maps to
+ * one Spark task, enabling parallel lake reads across splits.
+ *
+ * @param tableBucket
+ * the table bucket this split belongs to
+ * @param lakeSplitBytes
+ * serialized lake split data
+ */
+case class FlussLakeInputPartition(tableBucket: TableBucket, lakeSplitBytes: Array[Byte])
+ extends FlussInputPartition {
+ override def toString: String = {
+ s"FlussLakeInputPartition{tableId=${tableBucket.getTableId}, bucketId=${tableBucket.getBucket}," +
+ s" partitionId=${tableBucket.getPartitionId}," +
+ s" splitSize=${lakeSplitBytes.length}}"
+ }
+}
+
/**
* Represents an input partition for reading data from a primary key table bucket. This partition
* includes snapshot information for hybrid snapshot-log reading.
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala
new file mode 100644
index 0000000000..6a33820315
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer}
+import org.apache.fluss.client.table.scanner.log.LogScanner
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer
+import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
+import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket, TableInfo, TablePath}
+import org.apache.fluss.utils.ExceptionUtils
+
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/** Batch for reading a lake-enabled log table (an append-only table with datalake storage enabled). */
+class FlussLakeAppendBatch(
+ tablePath: TablePath,
+ tableInfo: TableInfo,
+ readSchema: StructType,
+ options: CaseInsensitiveStringMap,
+ flussConfig: Configuration)
+ extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
+
+  // Required by FlussBatch but unused here — the lake snapshot determines the effective start offsets.
+ override val startOffsetsInitializer: OffsetsInitializer = OffsetsInitializer.earliest()
+
+ override val stoppingOffsetsInitializer: OffsetsInitializer = {
+ FlussOffsetInitializers.stoppingOffsetsInitializer(true, options, flussConfig)
+ }
+
+ private lazy val planned: (Array[InputPartition], Boolean) = doPlan()
+
+ override def planInputPartitions(): Array[InputPartition] = planned._1
+
+ override def createReaderFactory(): PartitionReaderFactory = {
+ if (planned._2) {
+ new FlussAppendPartitionReaderFactory(tablePath, projection, options, flussConfig)
+ } else {
+ new FlussLakeAppendPartitionReaderFactory(
+ tableInfo.getProperties.toMap,
+ tablePath,
+ tableInfo.getRowType,
+ projection,
+ flussConfig)
+ }
+ }
+
+  /**
+   * Plans input partitions for reading. The boolean in the returned tuple is true when no lake
+   * snapshot exists yet and the plan falls back to pure log reading.
+   */
+ private def doPlan(): (Array[InputPartition], Boolean) = {
+ val lakeSnapshot =
+ try {
+ admin.getReadableLakeSnapshot(tablePath).get()
+ } catch {
+ case e: Exception =>
+ if (
+ ExceptionUtils
+ .stripExecutionException(e)
+ .isInstanceOf[LakeTableSnapshotNotExistException]
+ ) {
+ return (planFallbackPartitions(), true)
+ }
+ throw e
+ }
+
+ val lakeSource = FlussLakeSourceUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath)
+ lakeSource.withProject(FlussLakeSourceUtils.lakeProjection(projection))
+
+ val lakeSplits = lakeSource
+ .createPlanner(new LakeSource.PlannerContext {
+ override def snapshotId(): Long = lakeSnapshot.getSnapshotId
+ })
+ .plan()
+
+ val splitSerializer = lakeSource.getSplitSerializer
+ val tableBucketsOffset = lakeSnapshot.getTableBucketsOffset
+ val buckets = (0 until tableInfo.getNumBuckets).toSeq
+ val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath)
+
+ val partitions = if (tableInfo.isPartitioned) {
+ planPartitionedTable(
+ lakeSplits.asScala,
+ splitSerializer,
+ tableBucketsOffset,
+ buckets,
+ bucketOffsetsRetriever)
+ } else {
+ planNonPartitionedTable(
+ lakeSplits.asScala,
+ splitSerializer,
+ tableBucketsOffset,
+ buckets,
+ bucketOffsetsRetriever)
+ }
+
+ (partitions, false)
+ }
+
+ private def planNonPartitionedTable(
+ lakeSplits: Seq[LakeSplit],
+ splitSerializer: SimpleVersionedSerializer[LakeSplit],
+ tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long],
+ buckets: Seq[Int],
+ bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Array[InputPartition] = {
+ val tableId = tableInfo.getTableId
+
+ val lakePartitions =
+ createLakePartitions(lakeSplits, splitSerializer, tableId, partitionId = None)
+
+ val stoppingOffsets =
+ getBucketOffsets(stoppingOffsetsInitializer, null, buckets, bucketOffsetsRetriever)
+ val logPartitions = buckets.flatMap {
+ bucketId =>
+ val tableBucket = new TableBucket(tableId, bucketId)
+ createLogTailPartition(tableBucket, tableBucketsOffset, stoppingOffsets(bucketId))
+ }
+
+ (lakePartitions ++ logPartitions).toArray
+ }
+
+ private def planPartitionedTable(
+ lakeSplits: Seq[LakeSplit],
+ splitSerializer: SimpleVersionedSerializer[LakeSplit],
+ tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long],
+ buckets: Seq[Int],
+ bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Array[InputPartition] = {
+ val tableId = tableInfo.getTableId
+
+ val flussPartitionIdByName = mutable.LinkedHashMap.empty[String, Long]
+ partitionInfos.asScala.foreach {
+ pi => flussPartitionIdByName(pi.getPartitionName) = pi.getPartitionId
+ }
+
+ val lakeSplitsByPartition = groupLakeSplitsByPartition(lakeSplits)
+ var lakeSplitPartitionId = -1L
+
+ val lakeAndLogPartitions = lakeSplitsByPartition.flatMap {
+ case (partitionName, splits) =>
+ flussPartitionIdByName.remove(partitionName) match {
+ case Some(partitionId) =>
+ // Partition in both lake and Fluss — lake splits + log tail
+ val lakePartitions =
+ createLakePartitions(splits, splitSerializer, tableId, Some(partitionId))
+
+ val stoppingOffsets = getBucketOffsets(
+ stoppingOffsetsInitializer,
+ partitionName,
+ buckets,
+ bucketOffsetsRetriever)
+ val logPartitions = buckets.flatMap {
+ bucketId =>
+ val tableBucket = new TableBucket(tableId, partitionId, bucketId)
+ createLogTailPartition(tableBucket, tableBucketsOffset, stoppingOffsets(bucketId))
+ }
+
+ lakePartitions ++ logPartitions
+
+ case None =>
+ // Partition only in lake (expired in Fluss) — lake splits only
+ val pid = lakeSplitPartitionId
+ lakeSplitPartitionId -= 1
+ createLakePartitions(splits, splitSerializer, tableId, Some(pid))
+ }
+ }.toSeq
+
+ // Partitions only in Fluss (not yet tiered) — log from earliest
+ val flussOnlyPartitions = flussPartitionIdByName.flatMap {
+ case (partitionName, partitionId) =>
+ val stoppingOffsets = getBucketOffsets(
+ stoppingOffsetsInitializer,
+ partitionName,
+ buckets,
+ bucketOffsetsRetriever)
+ buckets.flatMap {
+ bucketId =>
+ val stoppingOffset = stoppingOffsets(bucketId)
+ if (stoppingOffset > 0) {
+ val tableBucket = new TableBucket(tableId, partitionId, bucketId)
+ Some(
+ FlussAppendInputPartition(
+ tableBucket,
+ LogScanner.EARLIEST_OFFSET,
+ stoppingOffset): InputPartition)
+ } else {
+ None
+ }
+ }
+ }.toSeq
+
+ (lakeAndLogPartitions ++ flussOnlyPartitions).toArray
+ }
+
+ private def groupLakeSplitsByPartition(
+ lakeSplits: Seq[LakeSplit]): mutable.LinkedHashMap[String, mutable.ArrayBuffer[LakeSplit]] = {
+ val grouped = mutable.LinkedHashMap.empty[String, mutable.ArrayBuffer[LakeSplit]]
+ lakeSplits.foreach {
+ split =>
+ val partitionName = if (split.partition() == null || split.partition().isEmpty) {
+ ""
+ } else {
+ split.partition().asScala.mkString(ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR)
+ }
+ grouped.getOrElseUpdate(partitionName, mutable.ArrayBuffer.empty) += split
+ }
+ grouped
+ }
+
+ private def createLakePartitions(
+ splits: Seq[LakeSplit],
+ splitSerializer: SimpleVersionedSerializer[LakeSplit],
+ tableId: Long,
+ partitionId: Option[Long]): Seq[InputPartition] = {
+ splits.map {
+ split =>
+ val tableBucket = partitionId match {
+ case Some(pid) => new TableBucket(tableId, pid, split.bucket())
+ case None => new TableBucket(tableId, split.bucket())
+ }
+ FlussLakeInputPartition(tableBucket, splitSerializer.serialize(split))
+ }
+ }
+
+ private def createLogTailPartition(
+ tableBucket: TableBucket,
+ tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long],
+ stoppingOffset: Long): Option[InputPartition] = {
+ val snapshotLogOffset = tableBucketsOffset.get(tableBucket)
+ if (snapshotLogOffset != null) {
+ if (snapshotLogOffset.longValue() < stoppingOffset) {
+ Some(FlussAppendInputPartition(tableBucket, snapshotLogOffset.longValue(), stoppingOffset))
+ } else {
+ None
+ }
+ } else if (stoppingOffset > 0) {
+ Some(FlussAppendInputPartition(tableBucket, LogScanner.EARLIEST_OFFSET, stoppingOffset))
+ } else {
+ None
+ }
+ }
+
+ private def getBucketOffsets(
+ initializer: OffsetsInitializer,
+ partitionName: String,
+ buckets: Seq[Int],
+ bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Map[Int, Long] = {
+ initializer
+ .getBucketOffsets(partitionName, buckets.map(Integer.valueOf).asJava, bucketOffsetsRetriever)
+ .asScala
+ .map(e => (e._1.intValue(), Long2long(e._2)))
+ .toMap
+ }
+
+ private def planFallbackPartitions(): Array[InputPartition] = {
+ val fallbackStartInit = FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig)
+ val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath)
+ val buckets = (0 until tableInfo.getNumBuckets).toSeq
+ val tableId = tableInfo.getTableId
+
+ def createPartitions(
+ partitionId: Option[Long],
+ partitionName: String): Array[InputPartition] = {
+ val startOffsets =
+ getBucketOffsets(fallbackStartInit, partitionName, buckets, bucketOffsetsRetriever)
+ val stoppingOffsets =
+ getBucketOffsets(stoppingOffsetsInitializer, partitionName, buckets, bucketOffsetsRetriever)
+
+ buckets.map {
+ bucketId =>
+ val tableBucket = partitionId match {
+ case Some(pid) => new TableBucket(tableId, pid, bucketId)
+ case None => new TableBucket(tableId, bucketId)
+ }
+ FlussAppendInputPartition(
+ tableBucket,
+ startOffsets(bucketId),
+ stoppingOffsets(bucketId)
+ ): InputPartition
+ }.toArray
+ }
+
+ if (tableInfo.isPartitioned) {
+ partitionInfos.asScala.flatMap {
+ pi => createPartitions(Some(pi.getPartitionId), pi.getPartitionName)
+ }.toArray
+ } else {
+ createPartitions(None, null)
+ }
+ }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala
new file mode 100644
index 0000000000..9c0031409b
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
+import org.apache.fluss.metadata.TablePath
+import org.apache.fluss.record.LogRecord
+import org.apache.fluss.spark.row.DataConverter
+import org.apache.fluss.types.RowType
+import org.apache.fluss.utils.CloseableIterator
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.PartitionReader
+
+/** Partition reader that reads data from a single lake split via lake storage (no Fluss connection). */
+class FlussLakePartitionReader(
+ tablePath: TablePath,
+ rowType: RowType,
+ partition: FlussLakeInputPartition,
+ lakeSource: LakeSource[LakeSplit])
+ extends PartitionReader[InternalRow]
+ with Logging {
+
+ private var currentRow: InternalRow = _
+ private var closed = false
+ private var recordIterator: CloseableIterator[LogRecord] = _
+
+ initialize()
+
+ private def initialize(): Unit = {
+ logInfo(s"Reading lake split for table $tablePath bucket=${partition.tableBucket.getBucket}")
+
+ val splitSerializer = lakeSource.getSplitSerializer
+ val split = splitSerializer.deserialize(splitSerializer.getVersion, partition.lakeSplitBytes)
+
+ recordIterator = lakeSource
+ .createRecordReader(new LakeSource.ReaderContext[LakeSplit] {
+ override def lakeSplit(): LakeSplit = split
+ })
+ .read()
+ }
+
+ override def next(): Boolean = {
+ if (closed || recordIterator == null) {
+ return false
+ }
+
+ if (recordIterator.hasNext) {
+ val logRecord = recordIterator.next()
+ currentRow = DataConverter.toSparkInternalRow(logRecord.getRow, rowType)
+ true
+ } else {
+ false
+ }
+ }
+
+ override def get(): InternalRow = currentRow
+
+ override def close(): Unit = {
+ if (!closed) {
+ closed = true
+ if (recordIterator != null) {
+ recordIterator.close()
+ }
+ }
+ }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala
new file mode 100644
index 0000000000..31c4255bd3
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
+import org.apache.fluss.metadata.TablePath
+import org.apache.fluss.types.RowType
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
+
+import java.util
+
+/** Factory for lake-enabled log table reads. Dispatches to lake or log reader per partition type. */
+class FlussLakeAppendPartitionReaderFactory(
+ tableProperties: util.Map[String, String],
+ tablePath: TablePath,
+ rowType: RowType,
+ projection: Array[Int],
+ flussConfig: Configuration)
+ extends PartitionReaderFactory {
+
+ @transient private lazy val lakeSource: LakeSource[LakeSplit] = {
+ val source = FlussLakeSourceUtils.createLakeSource(tableProperties, tablePath)
+ source.withProject(FlussLakeSourceUtils.lakeProjection(projection))
+ source
+ }
+
+ override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
+ partition match {
+ case lake: FlussLakeInputPartition =>
+ new FlussLakePartitionReader(tablePath, rowType, lake, lakeSource)
+ case log: FlussAppendInputPartition =>
+ new FlussAppendPartitionReader(tablePath, projection, log, flussConfig)
+ case _ =>
+ throw new IllegalArgumentException(s"Unexpected partition type: ${partition.getClass}")
+ }
+ }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala
new file mode 100644
index 0000000000..41958c3442
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.config.{ConfigOptions, Configuration}
+import org.apache.fluss.lake.lakestorage.LakeStoragePluginSetUp
+import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
+import org.apache.fluss.metadata.TablePath
+import org.apache.fluss.utils.PropertiesUtils
+
+import java.util
+
+/** Shared utilities for creating lake sources and projections. */
+object FlussLakeSourceUtils {
+
+ def createLakeSource(
+ tableProperties: util.Map[String, String],
+ tablePath: TablePath): LakeSource[LakeSplit] = {
+ val tableConfig = Configuration.fromMap(tableProperties)
+ val datalakeFormat = tableConfig.get(ConfigOptions.TABLE_DATALAKE_FORMAT)
+ val dataLakePrefix = "table.datalake." + datalakeFormat + "."
+
+ val catalogProperties = PropertiesUtils.extractAndRemovePrefix(tableProperties, dataLakePrefix)
+ val lakeConfig = Configuration.fromMap(catalogProperties)
+ val lakeStoragePlugin =
+ LakeStoragePluginSetUp.fromDataLakeFormat(datalakeFormat.toString, null)
+ val lakeStorage = lakeStoragePlugin.createLakeStorage(lakeConfig)
+ lakeStorage.createLakeSource(tablePath).asInstanceOf[LakeSource[LakeSplit]]
+ }
+
+ def lakeProjection(projection: Array[Int]): Array[Array[Int]] = {
+ projection.map(i => Array(i))
+ }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
index a543961272..d4e14bd479 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
@@ -61,6 +61,30 @@ case class FlussAppendScan(
}
}
+/** Fluss Lake Append Scan. */
+case class FlussLakeAppendScan(
+ tablePath: TablePath,
+ tableInfo: TableInfo,
+ requiredSchema: Option[StructType],
+ options: CaseInsensitiveStringMap,
+ flussConfig: Configuration)
+ extends FlussScan {
+
+ override def toBatch: Batch = {
+ new FlussLakeAppendBatch(tablePath, tableInfo, readSchema, options, flussConfig)
+ }
+
+ override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
+ new FlussAppendMicroBatchStream(
+ tablePath,
+ tableInfo,
+ readSchema,
+ options,
+ flussConfig,
+ checkpointLocation)
+ }
+}
+
/** Fluss Upsert Scan. */
case class FlussUpsertScan(
tablePath: TablePath,
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
index cd3e6768f4..9dd49f4df6 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
@@ -47,6 +47,19 @@ class FlussAppendScanBuilder(
}
}
+/** Fluss Lake Append Scan Builder. */
+class FlussLakeAppendScanBuilder(
+ tablePath: TablePath,
+ tableInfo: TableInfo,
+ options: CaseInsensitiveStringMap,
+ flussConfig: FlussConfiguration)
+ extends FlussScanBuilder {
+
+ override def build(): Scan = {
+ FlussLakeAppendScan(tablePath, tableInfo, requiredSchema, options, flussConfig)
+ }
+}
+
/** Fluss Upsert Scan Builder. */
class FlussUpsertScanBuilder(
tablePath: TablePath,
diff --git a/fluss-spark/fluss-spark-iceberg/pom.xml b/fluss-spark/fluss-spark-iceberg/pom.xml
new file mode 100644
index 0000000000..d408d33352
--- /dev/null
+++ b/fluss-spark/fluss-spark-iceberg/pom.xml
@@ -0,0 +1,179 @@
+
+
+
+
+ 4.0.0
+
+ org.apache.fluss
+ fluss-spark
+ 0.10-SNAPSHOT
+
+
+ fluss-spark-iceberg_${scala.binary.version}
+ Fluss : Engine Spark : Iceberg
+
+
+
+ org.apache.fluss
+ fluss-spark-ut_${scala.binary.version}
+ ${project.version}
+ tests
+ test
+
+
+
+ org.apache.fluss
+ fluss-spark-common_${scala.binary.version}
+ ${project.version}
+ test
+
+
+
+ org.apache.fluss
+ fluss-server
+ ${project.version}
+ test
+
+
+
+ org.apache.fluss
+ fluss-server
+ ${project.version}
+ tests
+ test
+
+
+
+ org.apache.fluss
+ fluss-client
+ ${project.version}
+ test
+
+
+
+ org.apache.fluss
+ fluss-common
+ ${project.version}
+ tests
+ test
+
+
+
+
+ org.eclipse.collections
+ eclipse-collections
+ 11.1.0
+ test
+
+
+
+ org.apache.fluss
+ fluss-test-utils
+
+
+
+ org.apache.fluss
+ fluss-lake-iceberg
+ ${project.version}
+ test
+
+
+
+
+ org.apache.fluss
+ fluss-flink-common
+ ${project.version}
+ test
+
+
+
+ org.apache.fluss
+ fluss-flink-1.20
+ ${project.version}
+ test
+
+
+
+ org.apache.flink
+ flink-connector-base
+ ${flink.version}
+ test
+
+
+
+ org.apache.flink
+ flink-table-test-utils
+ ${flink.version}
+ test
+
+
+
+
+ org.apache.curator
+ curator-test
+ ${curator.version}
+ test
+
+
+
+ org.apache.spark
+ spark-core_${scala.binary.version}
+ ${spark.version}
+ tests
+ test
+
+
+
+ org.apache.spark
+ spark-sql_${scala.binary.version}
+ ${spark.version}
+ tests
+ test
+
+
+
+ org.apache.spark
+ spark-catalyst_${scala.binary.version}
+ ${spark.version}
+ tests
+ test
+
+
+
+ junit
+ junit
+ 3.8.1
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+ true
+
+
+
+
+
diff --git a/fluss-spark/fluss-spark-iceberg/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin b/fluss-spark/fluss-spark-iceberg/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
new file mode 100644
index 0000000000..d26a86107d
--- /dev/null
+++ b/fluss-spark/fluss-spark-iceberg/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.fluss.lake.iceberg.IcebergLakeStoragePlugin
diff --git a/fluss-spark/fluss-spark-iceberg/src/test/resources/log4j2-test.properties b/fluss-spark/fluss-spark-iceberg/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000000..edfee0abf9
--- /dev/null
+++ b/fluss-spark/fluss-spark-iceberg/src/test/resources/log4j2-test.properties
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set the root logger level to OFF to avoid flooding build logs;
+# set it manually to INFO for debugging purposes.
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref=TestLogger
+appender.testlogger.name=TestLogger
+appender.testlogger.type=CONSOLE
+appender.testlogger.target=SYSTEM_ERR
+appender.testlogger.layout.type=PatternLayout
+appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/fluss-spark/fluss-spark-iceberg/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala b/fluss-spark/fluss-spark-iceberg/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala
new file mode 100644
index 0000000000..2eccb1245b
--- /dev/null
+++ b/fluss-spark/fluss-spark-iceberg/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.flink.tiering.LakeTieringJobBuilder
+import org.apache.fluss.flink.tiering.source.TieringSourceOptions
+import org.apache.fluss.metadata.{DataLakeFormat, TableBucket}
+
+import org.apache.flink.api.common.RuntimeExecutionMode
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+
+import java.nio.file.Files
+import java.time.Duration
+
+class SparkLakeIcebergLogTableReadTest extends SparkLakeLogTableReadTestBase {
+
+ override protected def flussConf: Configuration = {
+ val conf = super.flussConf
+ conf.setString("datalake.format", DataLakeFormat.ICEBERG.toString)
+ conf.setString("datalake.iceberg.type", "hadoop")
+ warehousePath =
+ Files.createTempDirectory("fluss-testing-iceberg-lake-read").resolve("warehouse").toString
+ conf.setString("datalake.iceberg.warehouse", warehousePath)
+ conf
+ }
+
+ override protected def tierToLake(tableName: String): Unit = {
+ val tableId = loadFlussTable(createTablePath(tableName)).getTableInfo.getTableId
+
+ val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+ execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
+ execEnv.setParallelism(2)
+ execEnv.enableCheckpointing(1000)
+
+ val flussConfig = new Configuration(flussServer.getClientConfig)
+ flussConfig.set(TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L))
+
+ val lakeCatalogConf = new Configuration()
+ lakeCatalogConf.setString("type", "hadoop")
+ lakeCatalogConf.setString("warehouse", warehousePath)
+
+ val jobClient = LakeTieringJobBuilder
+ .newBuilder(
+ execEnv,
+ flussConfig,
+ lakeCatalogConf,
+ new Configuration(),
+ DataLakeFormat.ICEBERG.toString)
+ .build()
+
+ try {
+ val tableBucket = new TableBucket(tableId, 0)
+ val deadline = System.currentTimeMillis() + 120000
+ var synced = false
+ while (!synced && System.currentTimeMillis() < deadline) {
+ try {
+ val replica = flussServer.waitAndGetLeaderReplica(tableBucket)
+ synced = replica.getLogTablet.getLakeTableSnapshotId >= 0
+ } catch {
+ case _: Exception =>
+ }
+ if (!synced) Thread.sleep(500)
+ }
+ assert(synced, s"Bucket $tableBucket not synced to lake within 2 minutes")
+ } finally {
+ jobClient.cancel().get()
+ }
+ }
+}
diff --git a/fluss-spark/fluss-spark-paimon/pom.xml b/fluss-spark/fluss-spark-paimon/pom.xml
new file mode 100644
index 0000000000..09e5ac1f61
--- /dev/null
+++ b/fluss-spark/fluss-spark-paimon/pom.xml
@@ -0,0 +1,186 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.fluss</groupId>
+        <artifactId>fluss-spark</artifactId>
+        <version>0.10-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>fluss-spark-paimon_${scala.binary.version}</artifactId>
+    <name>Fluss : Engine Spark : Paimon</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-spark-ut_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-spark-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-server</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-server</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-client</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-common</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.eclipse.collections</groupId>
+            <artifactId>eclipse-collections</artifactId>
+            <version>11.1.0</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-test-utils</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-lake-paimon</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-bundle</artifactId>
+            <version>${paimon.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-flink-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-flink-1.20</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <version>${curator.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13.2</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <!-- NOTE(review): assumes the Scala suites are executed by the scalatest -->
+                    <!-- runner inherited from the parent build; confirm before relying on this. -->
+                    <skipTests>true</skipTests>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin b/fluss-spark/fluss-spark-paimon/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
similarity index 93%
rename from fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
rename to fluss-spark/fluss-spark-paimon/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
index 69bf0f8a4b..8758f682cf 100644
--- a/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
+++ b/fluss-spark/fluss-spark-paimon/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
@@ -16,4 +16,4 @@
# limitations under the License.
#
-org.apache.fluss.lake.paimon.PaimonLakeStoragePlugin
\ No newline at end of file
+org.apache.fluss.lake.paimon.PaimonLakeStoragePlugin
diff --git a/fluss-spark/fluss-spark-paimon/src/test/resources/log4j2-test.properties b/fluss-spark/fluss-spark-paimon/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000000..edfee0abf9
--- /dev/null
+++ b/fluss-spark/fluss-spark-paimon/src/test/resources/log4j2-test.properties
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref=TestLogger
+appender.testlogger.name=TestLogger
+appender.testlogger.type=CONSOLE
+appender.testlogger.target=SYSTEM_ERR
+appender.testlogger.layout.type=PatternLayout
+appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/fluss-spark/fluss-spark-paimon/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala b/fluss-spark/fluss-spark-paimon/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala
new file mode 100644
index 0000000000..eb9a9e9599
--- /dev/null
+++ b/fluss-spark/fluss-spark-paimon/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.flink.tiering.LakeTieringJobBuilder
+import org.apache.fluss.flink.tiering.source.TieringSourceOptions
+import org.apache.fluss.metadata.{DataLakeFormat, TableBucket}
+
+import org.apache.flink.api.common.RuntimeExecutionMode
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+
+import java.nio.file.Files
+import java.time.Duration
+
+class SparkLakePaimonLogTableReadTest extends SparkLakeLogTableReadTestBase {
+
+ override protected def flussConf: Configuration = {
+ val conf = super.flussConf
+ conf.setString("datalake.format", DataLakeFormat.PAIMON.toString)
+ conf.setString("datalake.paimon.metastore", "filesystem")
+ conf.setString("datalake.paimon.cache-enabled", "false")
+ warehousePath =
+ Files.createTempDirectory("fluss-testing-lake-read").resolve("warehouse").toString
+ conf.setString("datalake.paimon.warehouse", warehousePath)
+ conf
+ }
+
+ override protected def tierToLake(tableName: String): Unit = {
+ val tableId = loadFlussTable(createTablePath(tableName)).getTableInfo.getTableId
+
+ val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+ execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
+ execEnv.setParallelism(2)
+ execEnv.enableCheckpointing(1000)
+
+ val flussConfig = new Configuration(flussServer.getClientConfig)
+ flussConfig.set(TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L))
+
+ val lakeCatalogConf = new Configuration()
+ lakeCatalogConf.setString("metastore", "filesystem")
+ lakeCatalogConf.setString("warehouse", warehousePath)
+
+ val jobClient = LakeTieringJobBuilder
+ .newBuilder(
+ execEnv,
+ flussConfig,
+ lakeCatalogConf,
+ new Configuration(),
+ DataLakeFormat.PAIMON.toString)
+ .build()
+
+ try {
+ val tableBucket = new TableBucket(tableId, 0)
+ val deadline = System.currentTimeMillis() + 120000
+ var synced = false
+ while (!synced && System.currentTimeMillis() < deadline) {
+ try {
+ val replica = flussServer.waitAndGetLeaderReplica(tableBucket)
+ synced = replica.getLogTablet.getLakeTableSnapshotId >= 0
+ } catch {
+ case _: Exception =>
+ }
+ if (!synced) Thread.sleep(500)
+ }
+ assert(synced, s"Bucket $tableBucket not synced to lake within 2 minutes")
+ } finally {
+ jobClient.cancel().get()
+ }
+ }
+}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala b/fluss-spark/fluss-spark-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala
similarity index 100%
rename from fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala
rename to fluss-spark/fluss-spark-paimon/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala
diff --git a/fluss-spark/fluss-spark-ut/pom.xml b/fluss-spark/fluss-spark-ut/pom.xml
index a6d663a0ed..63286df449 100644
--- a/fluss-spark/fluss-spark-ut/pom.xml
+++ b/fluss-spark/fluss-spark-ut/pom.xml
@@ -81,20 +81,6 @@
             <groupId>org.apache.fluss</groupId>
             <artifactId>fluss-test-utils</artifactId>
         </dependency>
-
-        <dependency>
-            <groupId>org.apache.fluss</groupId>
-            <artifactId>fluss-lake-paimon</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.paimon</groupId>
-            <artifactId>paimon-bundle</artifactId>
-            <version>${paimon.version}</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.curator</groupId>
             <artifactId>curator-test</artifactId>
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala
new file mode 100644
index 0000000000..20c463e7c2
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake
+
+import org.apache.fluss.config.ConfigOptions
+import org.apache.fluss.spark.FlussSparkTestBase
+import org.apache.fluss.spark.SparkConnectorOptions.BUCKET_NUMBER
+
+import org.apache.spark.sql.Row
+
+/**
+ * Base class for lake-enabled log table read tests. Subclasses provide the lake format config and
+ * implement tiering via [[tierToLake]].
+ */
+abstract class SparkLakeLogTableReadTestBase extends FlussSparkTestBase {
+
+ protected var warehousePath: String = _
+
+ /** Tier all pending data for the given table to the lake. */
+ protected def tierToLake(tableName: String): Unit
+
+ override protected def withTable(tableNames: String*)(f: => Unit): Unit = {
+ try {
+ f
+ } finally {
+ tableNames.foreach(t => sql(s"DROP TABLE IF EXISTS $DEFAULT_DATABASE.$t"))
+ }
+ }
+
+ test("Spark Lake Read: log table falls back when no lake snapshot") {
+ withTable("t") {
+ sql(s"""
+ |CREATE TABLE $DEFAULT_DATABASE.t (id INT, name STRING)
+ | TBLPROPERTIES (
+ | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+ | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+ | '${BUCKET_NUMBER.key()}' = 1)
+ |""".stripMargin)
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t VALUES
+ |(1, "hello"), (2, "world"), (3, "fluss")
+ |""".stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY id"),
+ Row(1, "hello") :: Row(2, "world") :: Row(3, "fluss") :: Nil
+ )
+
+ checkAnswer(
+ sql(s"SELECT name FROM $DEFAULT_DATABASE.t ORDER BY name"),
+ Row("fluss") :: Row("hello") :: Row("world") :: Nil
+ )
+ }
+ }
+
+ test("Spark Lake Read: log table lake-only (all data in lake, no log tail)") {
+ withTable("t_lake_only") {
+ sql(s"""
+ |CREATE TABLE $DEFAULT_DATABASE.t_lake_only (id INT, name STRING)
+ | TBLPROPERTIES (
+ | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+ | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+ | '${BUCKET_NUMBER.key()}' = 1)
+ |""".stripMargin)
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t_lake_only VALUES
+ |(1, "alpha"), (2, "beta"), (3, "gamma")
+ |""".stripMargin)
+
+ tierToLake("t_lake_only")
+
+ checkAnswer(
+ sql(s"SELECT * FROM $DEFAULT_DATABASE.t_lake_only ORDER BY id"),
+ Row(1, "alpha") :: Row(2, "beta") :: Row(3, "gamma") :: Nil
+ )
+
+ checkAnswer(
+ sql(s"SELECT name FROM $DEFAULT_DATABASE.t_lake_only ORDER BY name"),
+ Row("alpha") :: Row("beta") :: Row("gamma") :: Nil
+ )
+ }
+ }
+
+ test("Spark Lake Read: log table union read (lake + log tail)") {
+ withTable("t_union") {
+ sql(s"""
+ |CREATE TABLE $DEFAULT_DATABASE.t_union (id INT, name STRING)
+ | TBLPROPERTIES (
+ | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+ | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+ | '${BUCKET_NUMBER.key()}' = 1)
+ |""".stripMargin)
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t_union VALUES
+ |(1, "alpha"), (2, "beta"), (3, "gamma")
+ |""".stripMargin)
+
+ tierToLake("t_union")
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t_union VALUES
+ |(4, "delta"), (5, "epsilon")
+ |""".stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT * FROM $DEFAULT_DATABASE.t_union ORDER BY id"),
+ Row(1, "alpha") :: Row(2, "beta") :: Row(3, "gamma") ::
+ Row(4, "delta") :: Row(5, "epsilon") :: Nil
+ )
+
+ checkAnswer(
+ sql(s"SELECT name FROM $DEFAULT_DATABASE.t_union ORDER BY name"),
+ Row("alpha") :: Row("beta") :: Row("delta") ::
+ Row("epsilon") :: Row("gamma") :: Nil
+ )
+ }
+ }
+
+ test("Spark Lake Read: non-FULL startup mode skips lake path") {
+ withTable("t_earliest") {
+ sql(s"""
+ |CREATE TABLE $DEFAULT_DATABASE.t_earliest (id INT, name STRING)
+ | TBLPROPERTIES (
+ | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+ | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+ | '${BUCKET_NUMBER.key()}' = 1)
+ |""".stripMargin)
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t_earliest VALUES
+ |(1, "alpha"), (2, "beta"), (3, "gamma")
+ |""".stripMargin)
+
+ tierToLake("t_earliest")
+
+ try {
+ spark.conf.set("spark.sql.fluss.scan.startup.mode", "earliest")
+
+ checkAnswer(
+ sql(s"SELECT * FROM $DEFAULT_DATABASE.t_earliest ORDER BY id"),
+ Row(1, "alpha") :: Row(2, "beta") :: Row(3, "gamma") :: Nil
+ )
+ } finally {
+        spark.conf.unset("spark.sql.fluss.scan.startup.mode")
+ }
+ }
+ }
+}
diff --git a/fluss-spark/pom.xml b/fluss-spark/pom.xml
index 5a7ed67849..ae6e19a3d9 100644
--- a/fluss-spark/pom.xml
+++ b/fluss-spark/pom.xml
@@ -31,9 +31,15 @@
     <name>Fluss : Engine Spark :</name>
     <packaging>pom</packaging>
 
+    <properties>
+        <flink.version>1.20.3</flink.version>
+    </properties>
+
     <modules>
         <module>fluss-spark-common</module>
         <module>fluss-spark-ut</module>
+        <module>fluss-spark-paimon</module>
+        <module>fluss-spark-iceberg</module>
         <module>fluss-spark-3.5</module>
        <module>fluss-spark-3.4</module>
    </modules>