CSV ファイル

Spark SQLは、CSV形式のファイルまたはファイルのディレクトリをSpark DataFrameに読み込むためのspark.read().csv("file_name")と、CSVファイルに書き込むためのdataframe.write().csv("path")を提供します。関数option()を使って、ヘッダ、区切り文字、文字セットなどの動作の制御と、読み取りまたは書き込みの動作をカスタマイズできます。

// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
val path = "examples/src/main/resources/people.csv"

val df = spark.read.csv(path)
df.show()
// +------------------+
// |               _c0|
// +------------------+
// |      name;age;job|
// |Jorge;30;Developer|
// |  Bob;32;Developer|
// +------------------+

// Read a csv with delimiter, the default delimiter is ","
val df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
// +-----+---+---------+
// |  _c0|_c1|      _c2|
// +-----+---+---------+
// | name|age|      job|
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// Read a csv with delimiter and a header
val df3 = spark.read.option("delimiter", ";").option("header", "true").csv(path)
df3.show()
// +-----+---+---------+
// | name|age|      job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// You can also use options() to use multiple options
val df4 = spark.read.options(Map("delimiter"->";", "header"->"true")).csv(path)

// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")

// Read all files in a folder, please make sure only CSV files should present in the folder.
val folderPath = "examples/src/main/resources";
val df5 = spark.read.csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// |        _c0|
// +-----------+
// |238val_238|
// |  86val_86|
// |311val_311|
// |  27val_27|
// |165val_165|
// +-----------+
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" で見つかります。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
String path = "examples/src/main/resources/people.csv";

Dataset<Row> df = spark.read().csv(path);
df.show();
// +------------------+
// |               _c0|
// +------------------+
// |      name;age;job|
// |Jorge;30;Developer|
// |  Bob;32;Developer|
// +------------------+

// Read a csv with delimiter, the default delimiter is ","
Dataset<Row> df2 = spark.read().option("delimiter", ";").csv(path);
df2.show();
// +-----+---+---------+
// |  _c0|_c1|      _c2|
// +-----+---+---------+
// | name|age|      job|
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// Read a csv with delimiter and a header
Dataset<Row> df3 = spark.read().option("delimiter", ";").option("header", "true").csv(path);
df3.show();
// +-----+---+---------+
// | name|age|      job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// You can also use options() to use multiple options
java.util.Map<String, String> optionsMap = new java.util.HashMap<String, String>();
optionsMap.put("delimiter",";");
optionsMap.put("header","true");
Dataset<Row> df4 = spark.read().options(optionsMap).csv(path);

// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write().csv("output");

// Read all files in a folder, please make sure only CSV files should present in the folder.
String folderPath = "examples/src/main/resources";
Dataset<Row> df5 = spark.read().csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// |        _c0|
// +-----------+
// |238val_238|
// |  86val_86|
// |311val_311|
// |  27val_27|
// |165val_165|
// +-----------+
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" で見つかります。
# spark is from the previous example
sc = spark.sparkContext

# A CSV dataset is pointed to by path.
# The path can be either a single CSV file or a directory of CSV files
path = "examples/src/main/resources/people.csv"

df = spark.read.csv(path)
df.show()
# +------------------+
# |               _c0|
# +------------------+
# |      name;age;job|
# |Jorge;30;Developer|
# |  Bob;32;Developer|
# +------------------+

# Read a csv with delimiter, the default delimiter is ","
df2 = spark.read.option(delimiter=';').csv(path)
df2.show()
# +-----+---+---------+
# |  _c0|_c1|      _c2|
# +-----+---+---------+
# | name|age|      job|
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# Read a csv with delimiter and a header
df3 = spark.read.option("delimiter", ";").option("header", True).csv(path)
df3.show()
# +-----+---+---------+
# | name|age|      job|
# +-----+---+---------+
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# You can also use options() to use multiple options
df4 = spark.read.options(delimiter=";", header=True).csv(path)

# "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")

# Read all files in a folder, please make sure only CSV files should present in the folder.
folderPath = "examples/src/main/resources"
df5 = spark.read.csv(folderPath)
df5.show()
# Wrong schema because non-CSV files are read
# +-----------+
# |        _c0|
# +-----------+
# |238val_238|
# |  86val_86|
# |311val_311|
# |  27val_27|
# |165val_165|
# +-----------+
例の完全なコードは Spark のリポジトリの "examples/src/main/python/sql/datasource.py" で見つかります。

データソース オプション

Data source options of CSV can be set via:

プロパティ名デフォルト意味スコープ
sep , 各フィールドと値のための区切り文字を設定します。この区切り文字は1つ以上の文字にすることができます。 read/write
encoding UTF-8 読み込みの場合、指定されたエンコード型でCSVファイルをデコードします。書き込みの場合、保存されたCSVファイルのエンコード(文字セット)を指定します。 read/write
quote " 区切り文字を値の一部にすることができる、引用符で囲まれた値をエスケープするために使われる単一の文字を設定します。読み込みの場合、引用符をオフにしたい場合は、nullではなく空の文字列を設定する必要があります。書き込みの場合、空の文字列が設定されている場合、u0000 (null character)を使います。 read/write
quoteAll false 全ての値を常に引用符で囲む必要があるかどうかを示すフラグ。デフォルトでは、引用符を含む値のみをエスケープします。 write
escape \ すでに引用符で囲まれた値内で引用符をエスケープするために使われる単一の文字を設定します。 read/write
escapeQuotes true 引用符を含む値を常に引用符で囲む必要があるかどうかを示すフラグ。デフォルトでは、引用符を含む全ての値をエスケープします。 write
comment この文字で始まる行をスキップするために使われる単一の文字を設定します。デフォルトでは無効になっています。 read
header false 読み込みの場合、最初の行を列の名前として使います。書き込みの場合、最初の行として列の名前を書き込みます。指定されたパスが文字列のRDDである場合、このヘッダオプションはヘッダと同じ行が存在する場合はヘッダと同じ全ての行を削除することに注意してください。 read/write
inferSchema false データから入力スキーマを自動的に推測します。It requires one extra pass over the data. read
enforceSchema true trueに設定すると、指定または推測されたスキーマが強制的にデータソースファイルに適用されます。CSVファイルのヘッダは無視されます。,オプションがfalseに設定されている場合、headerオプションがtrueに設定されている場合はスキーマはCSVファイルの全てのヘッダに対して検証されます。スキーマのフィールド名とCSVヘッダのカラム名は、spark.sql.caseSensitiveを考慮してそれらの位置によってチェックされます。デフォルト値はtrueですが、誤った結果を回避するために、enforceSchemaオプションを無効にすることをお勧めします。 read
ignoreLeadingWhiteSpace false (読み取り用)、true (書き込み用) 読み込み/書き込みされている値の先頭の空白をスキップする必要があるかどうかを示すフラグ。 read/write
ignoreTrailingWhiteSpace false (読み取り用)、true (書き込み用) 読み込み/書き込みされている値の末尾の空白をスキップする必要があるかどうかを示すフラグ。 read/write
nullValue null値の文字列表現を設定します。2.0.1以降、このnullValueパラメータは文字列型を含むサポートされる全ての型に適用されます。 read/write
nanValue NaN 非数値の文字列表現を設定します。 read
positiveInf Inf 静の無限大値の文字列表現を設定します。 read
negativeInf -Inf 負の無限大値の文字列表現を設定します。 read
dateFormat yyyy-MM-dd 日付フォーマットを示す文字列を設定します。独自の日付フォーマットはDatetime パターンの形式に従います。これは日付型に適用されます。 read/write
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] タイムスタンプフォーマットを示す文字列を設定します。独自の日付フォーマットはDatetime パターンの形式に従います。これはタイムスタンプ型に適用されます。 read/write
maxColumns 20480 レコードのカラム数のハードリミットを定義します。 read
maxCharsPerColumn -1 読み込まれる特定の値に許可される最大文字数を定義します。デフォルトは-1で、長さが無制限であることを意味します read
モード PERMISSIVE 解析中に破損したレコードを処理するためのモードを許可します。次の大文字と小文字を区別しないモードをサポートします。Note that Spark tries to parse only required columns in CSV under column pruning. したがって破損したレコードは必要なフィールドのセットに基づいて異なる可能性があります。この動作は、spark.sql.csv.parser.columnPruning.enabled (デフォルトで有効)で制御できます。
  • PERMISSIVE: 破損したレコードに遭遇すると、不正な形式の文字列をcolumnNameOfCorruptRecordで設定されたフィールドに入れ、不正な形式のフィールドをnullに設定します。破損したレコードを保持するために、ユーザはユーザ定義のスキーマにcolumnNameOfCorruptRecordという名前の文字列型フィールドを設定できます。スキーマにフィールドが無い場合、解析中に破損したレコードは削除されます。スキーマより少ない/多いトークンを持つレコードは、CSVへの破損レコードではありません。スキーマの長さよりトークンが少ないレコードに遭遇した場合、追加のフィールドにnullを設定します。レコードにスキーマの長さよりも多くのトークンがある場合、余分なトークンが削除されます。
  • DROPMALFORMED: 破損したレコード全体を無視します。
  • FAILFAST: 破損したレコードに遭遇すると、例外を投げます。
read
columnNameOfCorruptRecord (spark.sql.columnNameOfCorruptRecord 設定の値) PERMISSIVEモードで作成された不正な形式の文字列を持つ新しいフィールドの名前を変更できます。これは、spark.sql.columnNameOfCorruptRecordを上書きします。 read
multiLine false ファイルごとに複数行にまたがる可能性のあるレコードを解析します。 read
charToEscapeQuoteEscaping escapeまたは\0 引用文字のエスケープをエスケープするために使われる単一の文字を設定します。デフォルト値は、エスケープ文字と引用文字が異なる場合はエスケープ文字。それ以外の場合は\0 read/write
samplingRatio 1.0 スキーマの推測に使われる行の割合を定義します。 read
emptyValue (読み込み用)、"" (書き込み用) 空の値の文字列表現を設定します。 read/write
locale en-US ロケールをIETF BCP 47形式の言語タグとして設定します。例えば、これは日付とタイムスタンプを解析する時に使われます。 read
lineSep \r\r\n\n (読み込み用)、\n (書き込み用) 解析/書き込みに使われる行区切り文字を定義します。最大長は1文字です。 read/write
unescapedQuoteHandling STOP_AT_DELIMITER CsvParserがエスケープされていない引用符を含む値を処理する方法を定義します。
  • STOP_AT_CLOSING_QUOTE: 入力にエスケープされていない引用符が見つかった場合、引用符文字を累積し、終了の引用符が見つかるまで値を引用符で囲まれた値として解析します。
  • BACK_TO_DELIMITER: 入力にエスケープされていない引用符が見つかった場合は、その値を引用符で囲まれていない値と見なします。これにより、区切り文字が見つかるまでパーサーは現在の解析値の全ての文字を累積します。値に区切り文字が見つからない場合、パーサーは区切り文字または行末が見つかるまで入力から文字を累積し続けます。
  • STOP_AT_DELIMITER: 入力にエスケープされていない引用符が見つかった場合は、その値を引用符で囲まれていない値と見なします。これにより、入力に区切り文字または行末が見つかるまで、パーサーは全ての文字を累積します。
  • SKIP_VALUE: エスケープされていない引用符が入力で見つかった場合、指定された値に対して解析されたコンテンツはスキップされ、代わりにnullValueに設定された値が生成されます。
  • RAISE_ERROR: エスケープされていない引用符が入力で見つかった場合、TextParsingExceptionが投げられます。
read
圧縮 (none) ファイルに保存する時に使う圧縮コーディック。大文字と小文字を区別しない既知の短縮名の1つです(nonebzip2gziplz4snappydeflate)。 write

その他の汎用オプションは、汎用ファイルソースオプションにあります。

TOP
inserted by FC2 system