Preface
This article takes a look at Flink's TableFactory.
Example
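import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

class MySystemTableSourceFactory implements StreamTableSourceFactory<Row> {

    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put("update-mode", "append");
        context.put("connector.type", "my-system");
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        List<String> list = new ArrayList<>();
        list.add("connector.debug");
        return list;
    }

    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        boolean isDebug = Boolean.valueOf(properties.get("connector.debug"));

        // additional validation of the passed properties can also happen here

        // MySystemAppendTableSource is the example's own table source; it is not shown here
        return new MySystemAppendTableSource(isDebug);
    }
}

public class MySystemConnector extends ConnectorDescriptor {

    public final boolean isDebug;

    public MySystemConnector(boolean isDebug) {
        // connector.type = "my-system", connector.property-version = 1, formatNeeded = false
        super("my-system", 1, false);
        this.isDebug = isDebug;
    }

    @Override
    protected Map<String, String> toConnectorProperties() {
        Map<String, String> properties = new HashMap<>();
        properties.put("connector.debug", Boolean.toString(isDebug));
        return properties;
    }
}
- This example defines MySystemTableSourceFactory. Its requiredContext is update-mode=append and connector.type=my-system, its supportedProperties contains connector.debug, and its createStreamTableSource method creates a MySystemAppendTableSource. MySystemConnector extends ConnectorDescriptor; it sets connector.type to my-system, connector.property-version to 1, and formatNeeded to false, and its toConnectorProperties supplies the value of connector.debug.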
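TableFactoryService, quoted further down, discovers factories through java.util.ServiceLoader, so a factory like the one above also has to be listed in a service provider file on the classpath before it can be found. A minimal sketch of such a file follows; the package name com.example and the src/main/resources location (a Maven/Gradle convention) are assumptions for illustration only. ServiceLoader also needs to be able to instantiate the class, which in practice means a public class with a no-argument constructor.

```
# src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
com.example.MySystemTableSourceFactory
```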
TableFactory
flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactory.java
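@PublicEvolving
public interface TableFactory {

    Map<String, String> requiredContext();

    List<String> supportedProperties();
}
- TableFactory defines two methods, requiredContext and supportedProperties. requiredContext is used to match a factory against the given properties, and supportedProperties lists the properties the factory supports; if a property that the factory does not support is passed in, the lookup in TableFactoryService fails with an exception (NoMatchingTableFactoryException).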
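The supported-properties check can be illustrated without Flink on the classpath. The sketch below is loosely modeled on filterBySupportedProperties in TableFactoryService (quoted later in this article): array indices in keys are replaced with "#", and entries ending in "*" act as prefix wildcards. The class and method names are hypothetical, the step that ignores requiredContext keys is left out, and the regex escapes the dot, whereas the quoted Flink source uses ".\\d+".

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class SupportedPropertiesSketch {

    // Replaces array indices such as ".0" with ".#" so that indexed keys share one pattern.
    // Assumption: the dot is escaped here; the quoted Flink source uses ".\\d+".
    static String normalizeKey(String key) {
        return key.replaceAll("\\.\\d+", ".#");
    }

    // Returns true if every given key is listed explicitly or covered by a "prefix.*" wildcard.
    static boolean supportsAll(List<String> supported, List<String> givenKeys) {
        List<String> wildcardPrefixes = supported.stream()
            .filter(s -> s.endsWith("*"))
            .map(s -> s.substring(0, s.length() - 1))
            .collect(Collectors.toList());
        return givenKeys.stream()
            .map(SupportedPropertiesSketch::normalizeKey)
            .allMatch(k -> supported.contains(k)
                || wildcardPrefixes.stream().anyMatch(k::startsWith));
    }

    public static void main(String[] args) {
        List<String> supported = Arrays.asList("connector.debug", "schema.#.name", "format.*");
        // All three keys are covered: exact match, indexed key, wildcard prefix.
        System.out.println(supportsAll(supported,
            Arrays.asList("connector.debug", "schema.0.name", "format.type"))); // true
        // connector.path is neither listed nor covered by a wildcard.
        System.out.println(supportsAll(supported, Arrays.asList("connector.path"))); // false
    }
}
```

In Flink itself a failed check does not return false; when no factory passes, the lookup throws instead.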
BatchTableSourceFactory
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/BatchTableSourceFactory.scala
trait BatchTableSourceFactory[T] extends TableFactory {

  def createBatchTableSource(properties: util.Map[String, String]): BatchTableSource[T]
}
- BatchTableSourceFactory extends TableFactory and defines the createBatchTableSource method.
BatchTableSinkFactory
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/BatchTableSinkFactory.scala
trait BatchTableSinkFactory[T] extends TableFactory {

  def createBatchTableSink(properties: util.Map[String, String]): BatchTableSink[T]
}
- BatchTableSinkFactory extends TableFactory and defines the createBatchTableSink method.
StreamTableSourceFactory
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/StreamTableSourceFactory.scala
trait StreamTableSourceFactory[T] extends TableFactory {

  def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[T]
}
- StreamTableSourceFactory extends TableFactory and defines the createStreamTableSource method.
StreamTableSinkFactory
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/StreamTableSinkFactory.scala
trait StreamTableSinkFactory[T] extends TableFactory {

  def createStreamTableSink(properties: util.Map[String, String]): StreamTableSink[T]
}
- StreamTableSinkFactory extends TableFactory and defines the createStreamTableSink method.
ConnectorDescriptor
flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/descriptors/ConnectorDescriptor.java
@PublicEvolving
public abstract class ConnectorDescriptor extends DescriptorBase implements Descriptor {

    private String type;

    private int version;

    private boolean formatNeeded;

    /**
     * Constructs a {@link ConnectorDescriptor}.
     *
     * @param type string that identifies this connector
     * @param version property version for backwards compatibility
     * @param formatNeeded flag for basic validation of a needed format descriptor
     */
    public ConnectorDescriptor(String type, int version, boolean formatNeeded) {
        this.type = type;
        this.version = version;
        this.formatNeeded = formatNeeded;
    }

    @Override
    public final Map<String, String> toProperties() {
        final DescriptorProperties properties = new DescriptorProperties();
        properties.putString(CONNECTOR_TYPE, type);
        properties.putLong(CONNECTOR_PROPERTY_VERSION, version);
        properties.putProperties(toConnectorProperties());
        return properties.asMap();
    }

    /**
     * Returns if this connector requires a format descriptor.
     */
    protected final boolean isFormatNeeded() {
        return formatNeeded;
    }

    /**
     * Converts this descriptor into a set of connector properties. Usually prefixed with
     * {@link FormatDescriptorValidator#FORMAT}.
     */
    protected abstract Map<String, String> toConnectorProperties();
}
- ConnectorDescriptor extends DescriptorBase and implements the Descriptor interface. It overrides Descriptor's toProperties method, which first sets the built-in properties CONNECTOR_TYPE and CONNECTOR_PROPERTY_VERSION and then merges in the properties defined by the subclass through the abstract method toConnectorProperties.
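To make the merge in toProperties concrete, here is a minimal, Flink-free sketch of the same pattern. SketchDescriptor and MySystemSketchConnector are hypothetical stand-ins, not Flink classes; the key names connector.type and connector.property-version are the values described for the example at the top of this article, and the formatNeeded flag is omitted.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ConnectorDescriptorSketch {

    // Simplified stand-in for ConnectorDescriptor: built-in keys plus subclass keys.
    abstract static class SketchDescriptor {
        private final String type;
        private final int version;

        SketchDescriptor(String type, int version) {
            this.type = type;
            this.version = version;
        }

        // Writes the built-in keys first, then merges whatever the subclass contributes.
        final Map<String, String> toProperties() {
            Map<String, String> properties = new LinkedHashMap<>();
            properties.put("connector.type", type);
            properties.put("connector.property-version", Integer.toString(version));
            properties.putAll(toConnectorProperties());
            return properties;
        }

        abstract Map<String, String> toConnectorProperties();
    }

    // Mirrors MySystemConnector from the example, minus the formatNeeded flag.
    static class MySystemSketchConnector extends SketchDescriptor {
        private final boolean isDebug;

        MySystemSketchConnector(boolean isDebug) {
            super("my-system", 1);
            this.isDebug = isDebug;
        }

        @Override
        Map<String, String> toConnectorProperties() {
            Map<String, String> properties = new LinkedHashMap<>();
            properties.put("connector.debug", Boolean.toString(isDebug));
            return properties;
        }
    }

    public static void main(String[] args) {
        // prints {connector.type=my-system, connector.property-version=1, connector.debug=true}
        System.out.println(new MySystemSketchConnector(true).toProperties());
    }
}
```

The resulting flat string map is what the factory lookup later compares against each factory's requiredContext and supportedProperties.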
TableFactoryUtil
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactoryUtil.scala
object TableFactoryUtil {

  /**
    * Returns a table source for a table environment.
    */
  def findAndCreateTableSource[T](
      tableEnvironment: TableEnvironment,
      descriptor: Descriptor)
    : TableSource[T] = {

    val javaMap = descriptor.toProperties

    tableEnvironment match {
      case _: BatchTableEnvironment =>
        TableFactoryService
          .find(classOf[BatchTableSourceFactory[T]], javaMap)
          .createBatchTableSource(javaMap)

      case _: StreamTableEnvironment =>
        TableFactoryService
          .find(classOf[StreamTableSourceFactory[T]], javaMap)
          .createStreamTableSource(javaMap)

      case e@_ =>
        throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
    }
  }

  /**
    * Returns a table sink for a table environment.
    */
  def findAndCreateTableSink[T](
      tableEnvironment: TableEnvironment,
      descriptor: Descriptor)
    : TableSink[T] = {

    val javaMap = descriptor.toProperties

    tableEnvironment match {
      case _: BatchTableEnvironment =>
        TableFactoryService
          .find(classOf[BatchTableSinkFactory[T]], javaMap)
          .createBatchTableSink(javaMap)

      case _: StreamTableEnvironment =>
        TableFactoryService
          .find(classOf[StreamTableSinkFactory[T]], javaMap)
          .createStreamTableSink(javaMap)

      case e@_ =>
        throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
    }
  }
}
- TableFactoryUtil is a utility object that creates a TableSource or TableSink for a given TableEnvironment and Descriptor. Internally it passes descriptor.toProperties to TableFactoryService to find the matching TableFactory, choosing the batch or stream factory interface according to the type of the TableEnvironment.
TableFactoryService
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactoryService.scala
object TableFactoryService extends Logging {

  private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])

  /**
    * Finds a table factory of the given class and descriptor.
    *
    * @param factoryClass desired factory class
    * @param descriptor descriptor describing the factory configuration
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    Preconditions.checkNotNull(descriptor)

    findInternal(factoryClass, descriptor.toProperties, None)
  }

  /**
    * Finds a table factory of the given class, descriptor, and classloader.
    *
    * @param factoryClass desired factory class
    * @param descriptor descriptor describing the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    Preconditions.checkNotNull(descriptor)
    Preconditions.checkNotNull(classLoader)

    findInternal(factoryClass, descriptor.toProperties, Some(classLoader))
  }

  /**
    * Finds a table factory of the given class and property map.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    findInternal(factoryClass, propertyMap, None)
  }

  /**
    * Finds a table factory of the given class, property map, and classloader.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](
      factoryClass: Class[T],
      propertyMap: JMap[String, String],
      classLoader: ClassLoader)
    : T = {
    Preconditions.checkNotNull(classLoader)

    findInternal(factoryClass, propertyMap, Some(classLoader))
  }

  /**
    * Finds a table factory of the given class, property map, and classloader.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  private def findInternal[T](
      factoryClass: Class[T],
      propertyMap: JMap[String, String],
      classLoader: Option[ClassLoader])
    : T = {

    Preconditions.checkNotNull(factoryClass)
    Preconditions.checkNotNull(propertyMap)

    val properties = propertyMap.asScala.toMap

    val foundFactories = discoverFactories(classLoader)

    val classFactories = filterByFactoryClass(
      factoryClass,
      properties,
      foundFactories)

    val contextFactories = filterByContext(
      factoryClass,
      properties,
      foundFactories,
      classFactories)

    filterBySupportedProperties(
      factoryClass,
      properties,
      foundFactories,
      contextFactories)
  }

  /**
    * Searches for factories using Java service providers.
    *
    * @return all factories in the classpath
    */
  private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
    try {
      val iterator = classLoader match {
        case Some(customClassLoader) =>
          val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
          customLoader.iterator()
        case None =>
          defaultLoader.iterator()
      }
      iterator.asScala.toSeq
    } catch {
      case e: ServiceConfigurationError =>
        LOG.error("Could not load service provider for table factories.", e)
        throw new TableException("Could not load service provider for table factories.", e)
    }
  }

  /**
    * Filters factories with matching context by factory class.
    */
  private def filterByFactoryClass[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory])
    : Seq[TableFactory] = {

    val classFactories = foundFactories.filter(f => factoryClass.isAssignableFrom(f.getClass))
    if (classFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        s"No factory implements '${factoryClass.getCanonicalName}'.",
        factoryClass,
        foundFactories,
        properties)
    }
    classFactories
  }

  /**
    * Filters for factories with matching context.
    *
    * @return all matching factories
    */
  private def filterByContext[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory],
      classFactories: Seq[TableFactory])
    : Seq[TableFactory] = {

    val matchingFactories = classFactories.filter { factory =>
      val requestedContext = normalizeContext(factory)

      val plainContext = mutable.Map[String, String]()
      plainContext ++= requestedContext
      // we remove the version for now until we have the first backwards compatibility case
      // with the version we can provide mappings in case the format changes
      plainContext.remove(CONNECTOR_PROPERTY_VERSION)
      plainContext.remove(FORMAT_PROPERTY_VERSION)
      plainContext.remove(METADATA_PROPERTY_VERSION)
      plainContext.remove(STATISTICS_PROPERTY_VERSION)

      // check if required context is met
      plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)
    }

    if (matchingFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        "No context matches.",
        factoryClass,
        foundFactories,
        properties)
    }

    matchingFactories
  }

  /**
    * Prepares the properties of a context to be used for match operations.
    */
  private def normalizeContext(factory: TableFactory): Map[String, String] = {
    val requiredContextJava = factory.requiredContext()
    if (requiredContextJava == null) {
      throw new TableException(
        s"Required context of factory '${factory.getClass.getName}' must not be null.")
    }
    requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2)).toMap
  }

  /**
    * Filters the matching class factories by supported properties.
    */
  private def filterBySupportedProperties[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory],
      classFactories: Seq[TableFactory])
    : T = {

    val plainGivenKeys = mutable.ArrayBuffer[String]()
    properties.keys.foreach { k =>
      // replace arrays with wildcard
      val key = k.replaceAll(".\\d+", ".#")
      // ignore duplicates
      if (!plainGivenKeys.contains(key)) {
        plainGivenKeys += key
      }
    }
    var lastKey: Option[String] = None
    val supportedFactories = classFactories.filter { factory =>
      val requiredContextKeys = normalizeContext(factory).keySet
      val (supportedKeys, wildcards) = normalizeSupportedProperties(factory)
      // ignore context keys
      val givenContextFreeKeys = plainGivenKeys.filter(!requiredContextKeys.contains(_))
      // perform factory specific filtering of keys
      val givenFilteredKeys = filterSupportedPropertiesFactorySpecific(
        factory,
        givenContextFreeKeys)

      givenFilteredKeys.forall { k =>
        lastKey = Option(k)
        supportedKeys.contains(k) || wildcards.exists(k.startsWith)
      }
    }

    if (supportedFactories.isEmpty && classFactories.length == 1 && lastKey.isDefined) {
      // special case: when there is only one matching factory but the last property key
      // was incorrect
      val factory = classFactories.head
      val (supportedKeys, _) = normalizeSupportedProperties(factory)
      throw new NoMatchingTableFactoryException(
        s"""
          |The matching factory '${factory.getClass.getName}' doesn't support '${lastKey.get}'.
          |
          |Supported properties of this factory are:
          |${supportedKeys.sorted.mkString("\n")}""".stripMargin,
        factoryClass,
        foundFactories,
        properties)
    } else if (supportedFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        s"No factory supports all properties.",
        factoryClass,
        foundFactories,
        properties)
    } else if (supportedFactories.length > 1) {
      throw new AmbiguousTableFactoryException(
        supportedFactories,
        factoryClass,
        foundFactories,
        properties)
    }

    supportedFactories.head.asInstanceOf[T]
  }

  /**
    * Prepares the supported properties of a factory to be used for match operations.
    */
  private def normalizeSupportedProperties(factory: TableFactory): (Seq[String], Seq[String]) = {
    val supportedPropertiesJava = factory.supportedProperties()
    if (supportedPropertiesJava == null) {
      throw new TableException(
        s"Supported properties of factory '${factory.getClass.getName}' must not be null.")
    }
    val supportedKeys = supportedPropertiesJava.asScala.map(_.toLowerCase)

    // extract wildcard prefixes
    val wildcards = extractWildcardPrefixes(supportedKeys)

    (supportedKeys, wildcards)
  }

  /**
    * Converts the prefix of properties with wildcards (e.g., "format.*").
    */
  private def extractWildcardPrefixes(propertyKeys: Seq[String]): Seq[String] = {
    propertyKeys
      .filter(_.endsWith("*"))
      .map(s => s.substring(0, s.length - 1))
  }

  /**
    * Performs filtering for special cases (i.e. table format factories with schema derivation).
    */
  private def filterSupportedPropertiesFactorySpecific(
      factory: TableFactory,
      keys: Seq[String])
    : Seq[String] = factory match {

    case formatFactory: TableFormatFactory[_] =>
      val includeSchema = formatFactory.supportsSchemaDerivation()
      // ignore non-format (or schema) keys
      keys.filter { k =>
        if (includeSchema) {
          k.startsWith(SchemaValidator.SCHEMA + ".") ||
            k.startsWith(FormatDescriptorValidator.FORMAT + ".")
        } else {
          k.startsWith(FormatDescriptorValidator.FORMAT + ".")
        }
      }

    case _ =>
      keys
  }
}
- TableFactoryService finds the matching TableFactory for a given factoryClass and Descriptor (or descriptor.toProperties). The main matching logic lives in filterByFactoryClass, filterByContext, and filterBySupportedProperties: filterByFactoryClass selects classFactories by the requested factoryClass; filterByContext compares each factory's requiredContext against descriptor.toProperties to narrow classFactories down to contextFactories; filterBySupportedProperties then filters contextFactories further based on supportedProperties and expects exactly one factory to remain.
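The three filtering stages can be sketched in plain Java without Flink. Everything here is a simplified, hypothetical model: Factory, SourceFactory, SinkFactory, and DemoSource are stand-ins for the Flink interfaces, IllegalStateException stands in for NoMatchingTableFactoryException and AmbiguousTableFactoryException, and the version-key removal, key normalization, and wildcard handling of the real code are left out.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class FactoryLookupSketch {

    // Hypothetical stand-ins for TableFactory and two of its sub-interfaces.
    interface Factory {
        Map<String, String> requiredContext();
        List<String> supportedProperties();
    }
    interface SourceFactory extends Factory {}
    interface SinkFactory extends Factory {}

    // A source factory identified only by its connector.type.
    static class DemoSource implements SourceFactory {
        private final String connectorType;
        DemoSource(String connectorType) { this.connectorType = connectorType; }

        @Override public Map<String, String> requiredContext() {
            Map<String, String> context = new HashMap<>();
            context.put("update-mode", "append");
            context.put("connector.type", connectorType);
            return context;
        }
        @Override public List<String> supportedProperties() {
            return Collections.singletonList("connector.debug");
        }
    }

    static <T> T find(Class<T> factoryClass, Map<String, String> props, List<Factory> found) {
        // Stage 1: keep only factories that implement the requested interface.
        List<Factory> classFactories = found.stream()
            .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
            .collect(Collectors.toList());
        if (classFactories.isEmpty()) {
            throw new IllegalStateException("No factory implements " + factoryClass.getName());
        }
        // Stage 2: every required context entry must be present with the same value.
        List<Factory> contextFactories = classFactories.stream()
            .filter(f -> f.requiredContext().entrySet().stream()
                .allMatch(e -> e.getValue().equals(props.get(e.getKey()))))
            .collect(Collectors.toList());
        if (contextFactories.isEmpty()) {
            throw new IllegalStateException("No context matches.");
        }
        // Stage 3: every remaining (non-context) key must be a supported property.
        List<Factory> supported = contextFactories.stream()
            .filter(f -> props.keySet().stream()
                .filter(k -> !f.requiredContext().containsKey(k))
                .allMatch(k -> f.supportedProperties().contains(k)))
            .collect(Collectors.toList());
        if (supported.isEmpty()) {
            throw new IllegalStateException("No factory supports all properties.");
        }
        if (supported.size() > 1) {
            throw new IllegalStateException("More than one factory matches.");
        }
        return factoryClass.cast(supported.get(0));
    }

    public static void main(String[] args) {
        Factory mySystem = new DemoSource("my-system");
        List<Factory> found = Arrays.asList(mySystem, new DemoSource("other-system"));
        Map<String, String> props = new HashMap<>();
        props.put("update-mode", "append");
        props.put("connector.type", "my-system");
        props.put("connector.debug", "true");
        // Only the my-system factory survives all three stages.
        System.out.println(find(SourceFactory.class, props, found) == mySystem); // true
    }
}
```

Adding a key such as connector.path to props would make stage 3 reject the only remaining factory, which corresponds to the "doesn't support" error path in the quoted source.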
Summary
- TableFactory defines two methods, requiredContext and supportedProperties. requiredContext is used to match a factory against the given properties, and supportedProperties lists the properties the factory supports; if a property that the factory does not support is passed in, an exception is thrown.
- BatchTableSourceFactory extends TableFactory and defines createBatchTableSource; BatchTableSinkFactory extends TableFactory and defines createBatchTableSink; StreamTableSourceFactory extends TableFactory and defines createStreamTableSource; StreamTableSinkFactory extends TableFactory and defines createStreamTableSink.
- ConnectorDescriptor extends DescriptorBase and implements the Descriptor interface. It overrides Descriptor's toProperties method, which sets the built-in properties CONNECTOR_TYPE and CONNECTOR_PROPERTY_VERSION and then merges in the properties defined by the subclass through the abstract method toConnectorProperties.
- TableFactoryUtil is a utility object that creates a TableSource or TableSink for a given TableEnvironment and Descriptor. Internally it passes descriptor.toProperties to TableFactoryService to find the matching TableFactory.
- TableFactoryService finds the matching TableFactory for a given factoryClass and Descriptor (or descriptor.toProperties). The main matching logic lives in filterByFactoryClass, filterByContext, and filterBySupportedProperties: filterByFactoryClass selects classFactories by the requested factoryClass; filterByContext narrows classFactories down to contextFactories based on descriptor.toProperties; filterBySupportedProperties then filters contextFactories further based on supportedProperties.