一般的なファイルソースオプション
これらの一般的なオプション/構成は、ファイルベースのソース (parquet、orc、avro、json、csv、text) を使う場合のみ有効です。
以下の例で使われているディレクトリ構成は以下の通りです:
破損したファイルを無視する
Spark により、ファイルからデータを読み取る時に破損したファイルを無視するために spark.sql.files.ignoreCorruptFiles
を使うことができます。true に設定された場合、Spark ジョブは破損したファイルが検出された時に引き続き実行され、読み取られた内容は引き続き返されます。
データファイルを読み取り中に破損したファイルを無視するには、以下を使うことができます:
// enable ignore corrupt files
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
// dir1/file3.json is corrupt from parquet's view
val testCorruptDF = spark.read.parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
testCorruptDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
// enable ignore corrupt files
spark.sql("set spark.sql.files.ignoreCorruptFiles=true");
// dir1/file3.json is corrupt from parquet's view
Dataset<Row> testCorruptDF = spark.read().parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/");
testCorruptDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
# enable ignore corrupt files
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
# dir1/file3.json is corrupt from parquet's view
test_corrupt_df = spark.read.parquet("examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
test_corrupt_df.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# |file2.parquet|
# +-------------+
# enable ignore corrupt files
sql("set spark.sql.files.ignoreCorruptFiles=true")
# dir1/file3.json is corrupt from parquet's view
testCorruptDF <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/"))
head(testCorruptDF)
# file
# 1 file1.parquet
# 2 file2.parquet
不足しているファイルを無視する
Spark により、ファイルからデータを読み取りながら破損したファイルを無視するために spark.sql.files.ignoreMissingFiles
を使うことができます。ここで、欠落しているファイルとは、DataFrame
を作成した後で、ディレクトリの下で削除されたファイルを意味します。true に設定された場合、Spark ジョブは欠落したファイルが検出された時に引き続き実行され、読み取られた内容は引き続き返されます。
パスグローバルフィルタ
pathGlobFilter
は、ファイル名がパターンに一致するファイルのみを含めるために使われます。構文は、org.apache.hadoop.fs.GlobFilter
に従います。パーティション検出の動作は変更されません。
パーティション検出の動作を維持しながら、特定の glob パターンに一致するパスを持つファイルをロードするために、以下を使うことができます:
val testGlobFilterDF = spark.read.format("parquet")
.option("pathGlobFilter", "*.parquet") // json file should be filtered out
.load("examples/src/main/resources/dir1")
testGlobFilterDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
Dataset<Row> testGlobFilterDF = spark.read().format("parquet")
.option("pathGlobFilter", "*.parquet") // json file should be filtered out
.load("examples/src/main/resources/dir1");
testGlobFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", pathGlobFilter="*.parquet")
df.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# +-------------+
df <- read.df("examples/src/main/resources/dir1", "parquet", pathGlobFilter = "*.parquet")
# file
# 1 file1.parquet
再帰的なファイル検索
recursiveFileLookup
は再帰的にファイルをロードし、パーティションの推測を無効にします。デフォルト値は false
です。 recursiveFileLookup
が true の時にデータソースが明示的に partitionSpec
を指定する場合、例外が投げられます。
全てのファイルを再帰的にロードするには、以下のようにします:
val recursiveLoadedDF = spark.read.format("parquet")
.option("recursiveFileLookup", "true")
.load("examples/src/main/resources/dir1")
recursiveLoadedDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
Dataset<Row> recursiveLoadedDF = spark.read().format("parquet")
.option("recursiveFileLookup", "true")
.load("examples/src/main/resources/dir1");
recursiveLoadedDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
recursive_loaded_df = spark.read.format("parquet")\
.option("recursiveFileLookup", "true")\
.load("examples/src/main/resources/dir1")
recursive_loaded_df.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# |file2.parquet|
# +-------------+
recursiveLoadedDF <- read.df("examples/src/main/resources/dir1", "parquet", recursiveFileLookup = "true")
head(recursiveLoadedDF)
# file
# 1 file1.parquet
# 2 file2.parquet
変換時間パスフィルター
modifiedBefore
とmodifiedAfter
は、Sparkバッチクエリ中にファイルをロードする際の粒度を高めるために、同時にあるいは個別に適用できるオプションです。(構造化ストリーミングファイルソースはこれらのオプションをサポートしません。)
modifiedBefore
: 指定された時刻より前に変更時刻が発生したファイルのみを含めるオプションのタイムスタンプ。提供されるタイムスタンプは次の形式である必要があります: YYYY-MM-DDTHH:mm:ss (例えば 2020-06-01T13:00:00)modifiedAfter
: 指定された時刻より後に変更時刻が発生したファイルのみを含めるオプションのタイムスタンプ。提供されるタイムスタンプは次の形式である必要があります: YYYY-MM-DDTHH:mm:ss (例えば 2020-06-01T13:00:00)
タイムゾーンオプションが指定されていない場合、タイムスタンプはSparkセッションタイムゾーン(spark.sql.session.timeZone
)に従って解釈されます。
指定された変更時間範囲に一致するパスを持つファイルをロードするには、以下のようにできます:
val beforeFilterDF = spark.read.format("parquet")
// Files modified before 07/01/2020 at 05:30 are allowed
.option("modifiedBefore", "2020-07-01T05:30:00")
.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
val afterFilterDF = spark.read.format("parquet")
// Files modified after 06/01/2020 at 05:30 are allowed
.option("modifiedAfter", "2020-06-01T05:30:00")
.load("examples/src/main/resources/dir1");
afterFilterDF.show();
// +-------------+
// | file|
// +-------------+
// +-------------+
Dataset<Row> beforeFilterDF = spark.read().format("parquet")
// Only load files modified before 7/1/2020 at 05:30
.option("modifiedBefore", "2020-07-01T05:30:00")
// Only load files modified after 6/1/2020 at 05:30
.option("modifiedAfter", "2020-06-01T05:30:00")
// Interpret both times above relative to CST timezone
.option("timeZone", "CST")
.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
# Only load files modified before 07/1/2050 @ 08:30:00
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", modifiedBefore="2050-07-01T08:30:00")
df.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# +-------------+
# Only load files modified after 06/01/2050 @ 08:30:00
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", modifiedAfter="2050-06-01T08:30:00")
df.show()
# +-------------+
# | file|
# +-------------+
# +-------------+
beforeDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedBefore= "2020-07-01T05:30:00")
# file
# 1 file1.parquet
afterDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedAfter = "2020-06-01T05:30:00")
# file