This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
カタログ #
カタログは、データベース、テーブル、パーティション、ビュー、データベースやその他の外部システムに保存されているデータにアクセスするために必要な関数や情報などのメタデータを提供します。
データ処理の最も重要な側面の1つは、メタデータの管理です。 これは、一時テーブルなどの一時的なメタデータ、またはtable環境に対して登録されたUDFである可能性があります。 あるいはHiveメタストアのような永続的なメタデータ。カタログはメタデータを管理しTable APIやSQLクエリからメタデータにアクセスできるようにする統一されたAPIを提供します。
カタログを使うとデータシステム内の既存のメタデータを参照し、自動的にFlinkの対応するメタデータに自動的にマッピングします。 例えば、FlinkはJDBCテーブルをFlinkのtableに自動的にマップできるため、ユーザはFlinkのDDLを手動で書き直す必要はありません。 カタログは、ユーザの既存のシステムでFlinkを開始するために必要な手順を大幅に簡素化し、ユーザのエクスペリエンスを大幅に強化します。
カタログの型 #
GenericInMemoryCatalog #
GenericInMemoryCatalog
はカタログのメモリ内実装です。全てのオブジェクトはセッションの存続期間中のみ利用できます。
JdbcCatalog #
JdbcCatalog
により、ユーザはJDBCプロトコルを介してFlinkをリレーショナルデータベースに接続できます。現在のところ、JDBCカタログを実装できるのは、PostgresカタログとMySQLカタログの2つだけです。
カタログのセットアップの詳細については、JdbcCatalogドキュメントを参照してください。
HiveCatalog #
HiveCatalog
には2つの目的があります; pure Flinkメタデータの永続ストレージ、既存のHiveメタデータを読み書きするためのインタフェース。
FlinkのHiveドキュメントには、カタログのセットアップと既存のHiveインストールとのインタフェースに関する詳細が記載されています。
Hiveメタデータは全てのメタオブジェクト名を小文字で保存します。これは大文字と小文字を区別するGenericInMemoryCatalog
とは異なります
ユーザ定義のカタログ #
カタログはプラグ可能であり、ユーザはCatalog
インタフェースを実装することで独自のカタログを開発できます。
Flink SQLで独自のカタログを使うには、ユーザはCatalogFactory
インタフェースを実装して対応するカタログファクトリを実装する必要があります。
ファクトリはJavaのサービスプロバイダインタフェース(SPI)を使って検出されます。
このインタフェースを実装するクラスは、JARファイルのMETA_INF/services/org.apache.flink.table.factories.Factory
に追加できます。
指定されてファクトリ識別子は、SQL CREATE CATALOG
DDLステートメントで必要なtype
プロパティとの照合に使われます。
Flink v1.16から、TableEnvironmentには、tableプログラム、SQLクライアント、SQLゲートウェイで一貫したクラス読み込み動作を実現するユーザクラスローダーが導入されました。ユーザクラスローダーは、ADD JAR
やCREATE FUNCTION ..USING JAR ..
ステートメントによって追加されたjarなど、全てのユーザのjarを管理します。USING JAR ..statements. ユーザ定義カタログでは、クラスをロードするために
Thread.currentThread().getContextClassLoader()をユーザクラスローダーに置き換える必要があります。それ以外の場合は、
ClassNotFoundExceptionが投げられる可能性があります。ユーザクラスローダーは、
CatalogFactory.Context#getClassLoader`を介してアクセスできます。
タイムトラベルをサポートするためのカタログのインタフェース #
バージョン1.18以降、Flinkのフレームワークは、テーブルの履歴データをクエリするためのtime travelをサポートします。テーブルの履歴データをクエリするには、テーブルが所属するカタログに対してgetTable(ObjectPath tablePath, long timestamp)
メソッドをじ実装する必要があります。
public class MyCatalogSupportTimeTravel implements Catalog {
@Override
public CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)
throws TableNotExistException {
// Build a schema corresponding to the specific time point.
Schema schema = buildSchema(timestamp);
// Set parameters to read data at the corresponding time point.
Map<String, String> options = buildOptions(timestamp);
// Build CatalogTable
CatalogTable catalogTable =
CatalogTable.of(schema, "", Collections.emptyList(), options, timestamp);
return catalogTable;
}
}
public class MyDynamicTableFactory implements DynamicTableSourceFactory {
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final ReadableConfig configuration =
Configuration.fromMap(context.getCatalogTable().getOptions());
// Get snapshot from CatalogTable
final Optional<Long> snapshot = context.getCatalogTable().getSnapshot();
// Build DynamicTableSource using snapshot options.
final DynamicTableSource dynamicTableSource = buildDynamicSource(configuration, snapshot);
return dynamicTableSource;
}
}
Flinkテーブルを作成してカタログに登録する方法 #
SQL DDLの使用 #
ユーザはSQL DDLを使って、Table APIとSQLの両方のカタログにテーブルを作成できます。
TableEnvironment tableEnv = ...;
// Create a HiveCatalog
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");
// Register the catalog
tableEnv.registerCatalog("myhive", catalog);
// Create a catalog database
tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");
// Create a catalog table
tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
tableEnv.listTables(); // should return the tables in current catalog and database.
val tableEnv = ...
// Create a HiveCatalog
val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")
// Register the catalog
tableEnv.registerCatalog("myhive", catalog)
// Create a catalog database
tableEnv.executeSql("CREATE DATABASE mydb WITH (...)")
// Create a catalog table
tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")
tableEnv.listTables() // should return the tables in current catalog and database.
from pyflink.table.catalog import HiveCatalog
# Create a HiveCatalog
catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")
# Register the catalog
t_env.register_catalog("myhive", catalog)
# Create a catalog database
t_env.execute_sql("CREATE DATABASE mydb WITH (...)")
# Create a catalog table
t_env.execute_sql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")
# should return the tables in current catalog and database.
t_env.list_tables()
// the catalog should have been registered via yaml file
Flink SQL> CREATE DATABASE mydb WITH (...);
Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);
Flink SQL> SHOW TABLES;
mytable
詳細な情報については、Flink SQL CREATE DDLを確認してください。
Java、Scala、Pythonの使用 #
ユーザはJava、Scala、Pythonを使って、プログラム的にカタログテーブルを作成できます。
import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.hive.HiveCatalog;
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
// Create a HiveCatalog
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");
// Register the catalog
tableEnv.registerCatalog("myhive", catalog);
// Create a catalog database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...));
// Create a catalog table
final Schema schema = Schema.newBuilder()
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build();
tableEnv.createTable("myhive.mydb.mytable", TableDescriptor.forConnector("kafka")
.schema(schema)
// …
.build());
List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"
import org.apache.flink.table.api._
import org.apache.flink.table.catalog._
import org.apache.flink.table.catalog.hive.HiveCatalog
val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
// Create a HiveCatalog
val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")
// Register the catalog
tableEnv.registerCatalog("myhive", catalog)
// Create a catalog database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))
// Create a catalog table
val schema = Schema.newBuilder()
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build()
tableEnv.createTable("myhive.mydb.mytable", TableDescriptor.forConnector("kafka")
.schema(schema)
// …
.build())
val tables = catalog.listTables("mydb") // tables should contain "mytable"
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog, CatalogDatabase, ObjectPath, CatalogBaseTable
settings = EnvironmentSettings.in_batch_mode()
t_env = TableEnvironment.create(settings)
# Create a HiveCatalog
catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")
# Register the catalog
t_env.register_catalog("myhive", catalog)
# Create a catalog database
database = CatalogDatabase.create_instance({"k1": "v1"}, None)
catalog.create_database("mydb", database)
# Create a catalog table
schema = Schema.new_builder() \
.column("name", DataTypes.STRING()) \
.column("age", DataTypes.INT()) \
.build()
catalog_table = t_env.create_table("myhive.mydb.mytable", TableDescriptor.for_connector("kafka")
.schema(schema)
# …
.build())
# tables should contain "mytable"
tables = catalog.list_tables("mydb")
カタログAPI #
注意: ここにはカタログプログラムAPIだけが一覧表示されています。ユーザはSQL DDLを使って、これらの多くの機能と同じことができます。 DDL情報の詳細については、SQL CREATE DDLを参照してください。
データベースオペレーション #
// create database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
// drop database
catalog.dropDatabase("mydb", false);
// alter database
catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);
// get database
catalog.getDatabase("mydb");
// check if a database exist
catalog.databaseExists("mydb");
// list databases in a catalog
catalog.listDatabases();
from pyflink.table.catalog import CatalogDatabase
# create database
catalog_database = CatalogDatabase.create_instance({"k1": "v1"}, None)
catalog.create_database("mydb", catalog_database, False)
# drop database
catalog.drop_database("mydb", False)
# alter database
catalog.alter_database("mydb", catalog_database, False)
# get database
catalog.get_database("mydb")
# check if a database exist
catalog.database_exists("mydb")
# list databases in a catalog
catalog.list_databases()
テーブルオペレーション #
// create table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
// alter table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// rename table
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");
// get table
catalog.getTable("mytable");
// check if a table exist or not
catalog.tableExists("mytable");
// list tables in a database
catalog.listTables("mydb");
from pyflink.table import *
from pyflink.table.catalog import CatalogBaseTable, ObjectPath
from pyflink.table.descriptors import Kafka
table_schema = TableSchema.builder() \
.field("name", DataTypes.STRING()) \
.field("age", DataTypes.INT()) \
.build()
table_properties = Kafka() \
.version("0.11") \
.start_from_earlist() \
.to_properties()
catalog_table = CatalogBaseTable.create_table(schema=table_schema, properties=table_properties, comment="my comment")
# create table
catalog.create_table(ObjectPath("mydb", "mytable"), catalog_table, False)
# drop table
catalog.drop_table(ObjectPath("mydb", "mytable"), False)
# alter table
catalog.alter_table(ObjectPath("mydb", "mytable"), catalog_table, False)
# rename table
catalog.rename_table(ObjectPath("mydb", "mytable"), "my_new_table")
# get table
catalog.get_table("mytable")
# check if a table exist or not
catalog.table_exists("mytable")
# list tables in a database
catalog.list_tables("mydb")
ビューオペレーション #
// create view
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
// drop view
catalog.dropTable(new ObjectPath("mydb", "myview"), false);
// alter view
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false);
// rename view
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);
// get view
catalog.getTable("myview");
// check if a view exist or not
catalog.tableExists("mytable");
// list views in a database
catalog.listViews("mydb");
from pyflink.table import *
from pyflink.table.catalog import CatalogBaseTable, ObjectPath
table_schema = TableSchema.builder() \
.field("name", DataTypes.STRING()) \
.field("age", DataTypes.INT()) \
.build()
catalog_table = CatalogBaseTable.create_view(
original_query="select * from t1",
expanded_query="select * from test-catalog.db1.t1",
schema=table_schema,
properties={},
comment="This is a view"
)
catalog.create_table(ObjectPath("mydb", "myview"), catalog_table, False)
# drop view
catalog.drop_table(ObjectPath("mydb", "myview"), False)
# alter view
catalog.alter_table(ObjectPath("mydb", "mytable"), catalog_table, False)
# rename view
catalog.rename_table(ObjectPath("mydb", "myview"), "my_new_view", False)
# get view
catalog.get_table("myview")
# check if a view exist or not
catalog.table_exists("mytable")
# list views in a database
catalog.list_views("mydb")
パーティションオペレーション #
// create view
catalog.createPartition(
new ObjectPath("mydb", "mytable"),
new CatalogPartitionSpec(...),
new CatalogPartitionImpl(...),
false);
// drop partition
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);
// alter partition
catalog.alterPartition(
new ObjectPath("mydb", "mytable"),
new CatalogPartitionSpec(...),
new CatalogPartitionImpl(...),
false);
// get partition
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// check if a partition exist or not
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// list partitions of a table
catalog.listPartitions(new ObjectPath("mydb", "mytable"));
// list partitions of a table under a give partition spec
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// list partitions of a table by expression filter
catalog.listPartitionsByFilter(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));
from pyflink.table.catalog import ObjectPath, CatalogPartitionSpec, CatalogPartition
catalog_partition = CatalogPartition.create_instance({}, "my partition")
catalog_partition_spec = CatalogPartitionSpec({"third": "2010", "second": "bob"})
catalog.create_partition(
ObjectPath("mydb", "mytable"),
catalog_partition_spec,
catalog_partition,
False)
# drop partition
catalog.drop_partition(ObjectPath("mydb", "mytable"), catalog_partition_spec, False)
# alter partition
catalog.alter_partition(
ObjectPath("mydb", "mytable"),
CatalogPartitionSpec(...),
catalog_partition,
False)
# get partition
catalog.get_partition(ObjectPath("mydb", "mytable"), catalog_partition_spec)
# check if a partition exist or not
catalog.partition_exists(ObjectPath("mydb", "mytable"), catalog_partition_spec)
# list partitions of a table
catalog.list_partitions(ObjectPath("mydb", "mytable"))
# list partitions of a table under a give partition spec
catalog.list_partitions(ObjectPath("mydb", "mytable"), catalog_partition_spec)
関数オペレーション #
// create function
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
// drop function
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);
// alter function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
// get function
catalog.getFunction("myfunc");
// check if a function exist or not
catalog.functionExists("myfunc");
// list functions in a database
catalog.listFunctions("mydb");
from pyflink.table.catalog import ObjectPath, CatalogFunction
catalog_function = CatalogFunction.create_instance(class_name="my.python.udf")
# create function
catalog.create_function(ObjectPath("mydb", "myfunc"), catalog_function, False)
# drop function
catalog.drop_function(ObjectPath("mydb", "myfunc"), False)
# alter function
catalog.alter_function(ObjectPath("mydb", "myfunc"), catalog_function, False)
# get function
catalog.get_function("myfunc")
# check if a function exist or not
catalog.function_exists("myfunc")
# list functions in a database
catalog.list_functions("mydb")
Table API and SQL for Catalog #
Registering a Catalog #
Users have access to a default in-memory catalog named default_catalog
, that is always created by default. This catalog by default has a single database called default_database
.
Users can also register additional catalogs into an existing Flink session.
tableEnv.registerCatalog(new CustomCatalog("myCatalog"));
t_env.register_catalog(catalog)
YAMLを使って定義される全てのカタログは、カタログの型を指定するtype
プロパティを提供する必要があります。
以下の型はそのままでサポートされます。
Catalog | Type Value |
---|---|
GenericInMemory | generic_in_memory |
Hive | hive |
catalogs:
- name: myCatalog
type: custom_catalog
hive-conf-dir: ...
現在のカタログとデータベースの変更 #
Flinkは現在のカタログとデータベースの中で常にテーブル、ビュー、UDFを検索します。
tableEnv.useCatalog("myCatalog");
tableEnv.useDatabase("myDb");
t_env.use_catalog("myCatalog")
t_env.use_database("myDb")
Flink SQL> USE CATALOG myCatalog;
Flink SQL> USE myDB;
現在のカタログではないカタログのメタデータには、catalog.database.object
の形式で完全装飾名を指定してアクセスできます。
tableEnv.from("not_the_current_catalog.not_the_current_db.my_table");
t_env.from_path("not_the_current_catalog.not_the_current_db.my_table")
Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;
利用可能なカタログの一覧 #
tableEnv.listCatalogs();
t_env.list_catalogs()
Flink SQL> show catalogs;
利用可能なデータベースの一覧 #
tableEnv.listDatabases();
t_env.list_databases()
Flink SQL> show databases;
利用可能なテーブルの一覧 #
tableEnv.listTables();
t_env.list_tables()
Flink SQL> show tables;
カタログ変更リスナー #
Flinkは、データベースやテーブルddlのようなカタログ変更用のカスタマイズされたリスナーの登録をサポートします。FlinkはddlのCatalogModificationEvent
イベントを作成し、CatalogModificationListener
に通知します。リスナーを実装し、イベントの受信時になんらかの外部メタデータシステムに情報をレポートするなど、カスタマイズされたオペレーションを行います。
カタログリスナーの実装 #
カタログ変更リスナーには2つのインタフェースがあります: リスナーを作成するためのCatalogModificationListenerFactory
と、イベントを受信して処理するためのCatalogModificationListener
。これらのインタフェースを実装する必要があり、以下はその例です。
/** Factory used to create a {@link CatalogModificationListener} instance. */
public class YourCatalogListenerFactory implements CatalogModificationListenerFactory {
/** The identifier for the customized listener factory, you can named it yourself. */
private static final String IDENTIFIER = "your_factory";
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public CatalogModificationListener createListener(Context context) {
return new YourCatalogListener(Create http client from context);
}
}
/** Customized catalog modification listener. */
public class YourCatalogListener implements CatalogModificationListener {
private final HttpClient client;
YourCatalogListener(HttpClient client) {
this.client = client;
}
@Override
public void onEvent(CatalogModificationEvent event) {
// Report the database and table information via http client.
}
}
META-INF/services
に、カスタマイズされたカタログリスナーファクトリ用のYourCatalogListenerFactoryの完全名
の内容を持つファイルorg.apache.flink.table.factories.Factory
を作成する必要があります。その後コードをjarファイルにパッケージ化し、それをFlinkクラスタのlib
に追加します。
カタログリスナーの登録 #
上記のカタログ変更ファクトリとリスナーを実装した後で、それをtable環境に登録できます。
Configuration configuration = new Configuration();
// Add the factory identifier, you can set multiple listeners in the configuraiton.
configuration.set(TableConfigOptions.TABLE_CATALOG_MODIFICATION_LISTENERS, Arrays.asList("your_factory"));
TableEnvironment env = TableEnvironment.create(
EnvironmentSettings.newInstance()
.withConfiguration(configuration)
.build());
// Create/Alter/Drop database and table.
env.executeSql("CREATE TABLE ...").wait();
sqlゲートウェイの場合、flink-conf.yaml
にオプションtable.catalog-modification.listeners
を追加してゲートウェイを開始するか、動的パラメータを指定してsqlゲートウェイを開始してから、sqlクライアントを使って直接ddlを実行できます。
カタログストア #
カタログストアはカタログの設定を保存するために使われます。カタログストアを使う場合、セッション内で作成されたカタログの設定は、カタログストアの対応する外部システムに保存されます。 セッションが再構築された場合でも、以前に作成したカタログをカタログストアから取得できます。
カタログストアの設定 #
ユーザは様々な方法でカタログストアを設定できます。1つはテーブルAPIを使う方法、もう1つはYAML設定を使う方法です。
カタログストアインスタンスを使ってカタログストアを登録します:
// Initialize a catalog Store instance
CatalogStore catalogStore = new FileCatalogStore("file:///path/to/catalog/store/");
// set up the catalog store
final EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode()
.withCatalogStore(catalogStore)
.build();
設定を使ってカタログストアを登録します:
// Set up configuration
Configuration configuration = new Configuration();
configuration.set("table.catalog-store.kind", "file");
configuration.set("table.catalog-store.file.path", "file:///path/to/catalog/store/");
// set up the configuration.
final EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode()
.withConfiguration(configuration)
.build();
final TableEnvironment tableEnv = TableEnvironment.create(settings);
SQLゲートウェイでは、全てのセッションが事前に作成されたカタログを自動的に使えるように、yamlファイルに設定することをお勧めします。通常、カタログストアの種類と、カタログストアに必要なその他のパラメータを設定する必要があります。
table.catalog-store.kind: file
table.catalog-store.file.path: file:///path/to/catalog/store/
カタログストアの種類 #
Flinkには、GenericInMemoryCatalogStore
とFileCatalogStore
という名前の2つの組み込みのカタログストアがあります。
GenericInMemoryCatalogStore #
GenericInMemoryCatalogStore
は、メモリ内に設定情報を保存するCatalogStore
の実装です。
全てのカタログ設定はセッションのライフサイクル内でのみ利用可能で、保存されたカタログ設定はセッションが閉じられた後で自動的に削除されます。
デフォルトでは、設定に関するカタログストアが指定されていない場合、システムはこの実装を使います。
FileCatalogStore #
FileCatalogStore
はファイルにカタログ設定を保存できます。FileCatalogStore
を使うには、カタログ設定を保存する必要があるディレクトリを指定する必要があります。各カタログは、カタログ名と同じ名前の独自のファイルがあります。
FileCatalogStore
実装は、Flink FileSystem
abstractionを介して利用可能な、ローカルファイルシステムとリモートファイルシステムの両方をサポートします。
指定されたカタログストアパスが、完全にまたは部分的に存在しない場合、FileCatalogStore
は不足しているディレクトリを作成しようとします。
指定されたカタログストアパスが存在せず、FileCatalogStore
がディレクトリの作成に失敗した場合は、カタログストアは初期化できないため例外が投げられます。FileCatalogstore
初期化が成功しなかった場合、SQLクライアントとSQLゲートウェイの両方が破損します。
FileCatalogStore
を使ったカタログ設定のストレージを表すディレクトリ構成の例です:
- /path/to/save/the/catalog/
- catalog1.yaml
- catalog2.yaml
- catalog3.yaml
カタログストア設定 #
以下のオプションを使ってカタログストアの挙動を調整できます。
キー | デフォルト | 種類 | 説明 |
---|---|---|---|
table.catalog-store.kind |
"generic_in_memory" | 文字列 | 使用するカタログストアの種類そのままで、'generic_in_memory'と'file'オプションがサポートされます。 |
table.catalog-store.file.path |
(none) | 文字列 | ファイルカタログストアのルートディレクトリへのパスを指定するための設定オプション。 |
独自のカタログストア #
カタログストアは拡張可能で、ユーザはそのインスタンスを実装することでカタログストアをカスタマイズできます。 SQL CLIやSQLゲートウェイがカタログストアを使う必要がある場合、対応するCatalogStoreFactoryインタフェースもこのカタログストア用に実装する必要があります。
public class CustomCatalogStoreFactory implements CatalogStoreFactory {
public static final String IDENTIFIER = "custom-kind";
// Used to connect external storage systems
private CustomClient client;
@Override
public CatalogStore createCatalogStore() {
return new CustomCatalogStore();
}
@Override
public void open(Context context) throws CatalogException {
// initialize the resources, such as http client
client = initClient(context);
}
@Override
public void close() throws CatalogException {
// release the resources
}
@Override
public String factoryIdentifier() {
// table store kind identifier
return IDENTIFIER;
}
public Set<ConfigOption<?>> requiredOptions() {
// define the required options
Set<ConfigOption> options = new HashSet();
options.add(OPTION_1);
options.add(OPTION_2);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
// define the optional options
}
}
public class CustomCatalogStore extends AbstractCatalogStore {
private Client client;
public CustomCatalogStore(Client client) {
this.client = client;
}
@Override
public void storeCatalog(String catalogName, CatalogDescriptor catalog)
throws CatalogException {
// store the catalog
}
@Override
public void removeCatalog(String catalogName, boolean ignoreIfNotExists)
throws CatalogException {
// remove the catalog descriptor
}
@Override
public Optional<CatalogDescriptor> getCatalog(String catalogName) {
// retrieve the catalog configuration and build the catalog descriptor
}
@Override
public Set<String> listCatalogs() {
// list all catalogs
}
@Override
public boolean contains(String catalogName) {
}
}