A Look at Flink's TableFactory
Published: 2019-06-23

This article takes a look at Flink's TableFactory.

Example

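import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

class MySystemTableSourceFactory implements StreamTableSourceFactory<Row> {

    @Override
    public Map<String, String> requiredContext() {
        // properties that must match exactly for this factory to be selected
        Map<String, String> context = new HashMap<>();
        context.put("update-mode", "append");
        context.put("connector.type", "my-system");
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        // properties this factory accepts in addition to the context
        List<String> list = new ArrayList<>();
        list.add("connector.debug");
        return list;
    }

    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        boolean isDebug = Boolean.valueOf(properties.get("connector.debug"));

        // additional validation of the passed properties can also happen here

        // MySystemAppendTableSource is the user's own table source (not shown here)
        return new MySystemAppendTableSource(isDebug);
    }
}

public class MySystemConnector extends ConnectorDescriptor {

    public final boolean isDebug;

    public MySystemConnector(boolean isDebug) {
        // type = "my-system", property version = 1, formatNeeded = false
        super("my-system", 1, false);
        this.isDebug = isDebug;
    }

    @Override
    protected Map<String, String> toConnectorProperties() {
        Map<String, String> properties = new HashMap<>();
        properties.put("connector.debug", Boolean.toString(isDebug));
        return properties;
    }
}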
  • This example defines MySystemTableSourceFactory. Its requiredContext is update-mode=append and connector.type=my-system, its supportedProperties is connector.debug, and its createStreamTableSource method creates a MySystemAppendTableSource. MySystemConnector extends ConnectorDescriptor; it sets connector.type to my-system, connector.property-version to 1, and formatNeeded to false, and its toConnectorProperties method supplies the value of connector.debug.
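The example's comment notes that additional validation of the passed properties can happen inside createStreamTableSource. One thing worth knowing is that Boolean.valueOf maps every string other than "true" (ignoring case) to false, so a typo in connector.debug goes unnoticed. The following is a minimal plain-Java sketch of a stricter check. DebugFlagParser and parseDebugFlag are hypothetical names introduced for illustration and are not part of Flink, and treating a missing flag as false is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: stricter parsing of "connector.debug".
class DebugFlagParser {

    static boolean parseDebugFlag(Map<String, String> properties) {
        String raw = properties.get("connector.debug");
        if (raw == null) {
            // Assumption of this sketch: a missing flag means debug is off.
            return false;
        }
        if (raw.equalsIgnoreCase("true")) {
            return true;
        }
        if (raw.equalsIgnoreCase("false")) {
            return false;
        }
        throw new IllegalArgumentException(
            "connector.debug must be 'true' or 'false' but was '" + raw + "'");
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("connector.debug", "yes");

        // Boolean.valueOf silently turns the unexpected value into false.
        System.out.println(Boolean.valueOf(properties.get("connector.debug")));

        // The stricter helper rejects it instead.
        try {
            parseDebugFlag(properties);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Inside a real factory, such a helper would be called in place of the Boolean.valueOf line before the table source is constructed.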

TableFactory

flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactory.java

@PublicEvolving
public interface TableFactory {

    Map<String, String> requiredContext();

    List<String> supportedProperties();
}
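  • TableFactory defines two methods, requiredContext and supportedProperties. requiredContext is used to match a factory against the given properties; supportedProperties lists the properties the factory accepts. If a property the factory does not support is passed in, the lookup fails with an exception (NoMatchingTableFactoryException).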
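The two methods can be read as two checks against the property map. The following plain-Java sketch illustrates that contract under simplifying assumptions: FactoryContractSketch, contextMatches and unsupportedKeys are hypothetical names, not Flink API, and the sketch ignores the version keys, array indices and wildcards that Flink also handles.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of the TableFactory contract; these helpers are hypothetical, not Flink API.
class FactoryContractSketch {

    /** True if every required context entry is present in the given properties with the same value. */
    static boolean contextMatches(Map<String, String> required, Map<String, String> given) {
        for (Map.Entry<String, String> entry : required.entrySet()) {
            if (!entry.getValue().equals(given.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    /** Given keys that are neither context keys nor declared as supported, sorted by name. */
    static List<String> unsupportedKeys(
            Map<String, String> required, List<String> supported, Map<String, String> given) {
        List<String> unsupported = new ArrayList<>();
        for (String key : given.keySet()) {
            if (!required.containsKey(key) && !supported.contains(key)) {
                unsupported.add(key);
            }
        }
        Collections.sort(unsupported);
        return unsupported;
    }

    public static void main(String[] args) {
        Map<String, String> required = new HashMap<>();
        required.put("update-mode", "append");
        required.put("connector.type", "my-system");
        List<String> supported = Arrays.asList("connector.debug");

        Map<String, String> given = new HashMap<>(required);
        given.put("connector.debug", "true");
        given.put("connector.path", "/tmp/data"); // not declared by the factory

        System.out.println(contextMatches(required, given));             // true
        System.out.println(unsupportedKeys(required, supported, given)); // [connector.path]
    }
}
```

A non-empty result from unsupportedKeys corresponds to the situation in which the real lookup would reject the factory.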

BatchTableSourceFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/BatchTableSourceFactory.scala

trait BatchTableSourceFactory[T] extends TableFactory {

  def createBatchTableSource(properties: util.Map[String, String]): BatchTableSource[T]
}
  • BatchTableSourceFactory extends TableFactory and defines the createBatchTableSource method.

BatchTableSinkFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/BatchTableSinkFactory.scala

trait BatchTableSinkFactory[T] extends TableFactory {

  def createBatchTableSink(properties: util.Map[String, String]): BatchTableSink[T]
}
  • BatchTableSinkFactory extends TableFactory and defines the createBatchTableSink method.

StreamTableSourceFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/StreamTableSourceFactory.scala

trait StreamTableSourceFactory[T] extends TableFactory {

  def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[T]
}
  • StreamTableSourceFactory extends TableFactory and defines the createStreamTableSource method.

StreamTableSinkFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/StreamTableSinkFactory.scala

trait StreamTableSinkFactory[T] extends TableFactory {

  def createStreamTableSink(properties: util.Map[String, String]): StreamTableSink[T]
}
  • StreamTableSinkFactory extends TableFactory and defines the createStreamTableSink method.

ConnectorDescriptor

flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/descriptors/ConnectorDescriptor.java

@PublicEvolving
public abstract class ConnectorDescriptor extends DescriptorBase implements Descriptor {

    private String type;

    private int version;

    private boolean formatNeeded;

    /**
     * Constructs a {@link ConnectorDescriptor}.
     *
     * @param type string that identifies this connector
     * @param version property version for backwards compatibility
     * @param formatNeeded flag for basic validation of a needed format descriptor
     */
    public ConnectorDescriptor(String type, int version, boolean formatNeeded) {
        this.type = type;
        this.version = version;
        this.formatNeeded = formatNeeded;
    }

    @Override
    public final Map<String, String> toProperties() {
        final DescriptorProperties properties = new DescriptorProperties();
        properties.putString(CONNECTOR_TYPE, type);
        properties.putLong(CONNECTOR_PROPERTY_VERSION, version);
        properties.putProperties(toConnectorProperties());
        return properties.asMap();
    }

    /**
     * Returns if this connector requires a format descriptor.
     */
    protected final boolean isFormatNeeded() {
        return formatNeeded;
    }

    /**
     * Converts this descriptor into a set of connector properties. Usually prefixed with
     * {@link FormatDescriptorValidator#FORMAT}.
     */
    protected abstract Map<String, String> toConnectorProperties();
}
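  • ConnectorDescriptor extends DescriptorBase and implements the Descriptor interface. It overrides Descriptor's toProperties method, which first sets the built-in properties CONNECTOR_TYPE and CONNECTOR_PROPERTY_VERSION and then merges in the properties defined by the subclass through the abstract method toConnectorProperties.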
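To make the merge concrete, here is a plain-Java sketch of the same template-method pattern without any Flink dependency. SketchConnectorDescriptor, SketchMySystemConnector and DescriptorDemo are hypothetical stand-ins rather than the Flink classes, a LinkedHashMap replaces DescriptorProperties, and the keys connector.type and connector.property-version are the ones named in the example section above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for ConnectorDescriptor; not the Flink class.
abstract class SketchConnectorDescriptor {

    private final String type;
    private final int version;

    SketchConnectorDescriptor(String type, int version) {
        this.type = type;
        this.version = version;
    }

    // Built-in properties first, then whatever the subclass contributes.
    final Map<String, String> toProperties() {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("connector.type", type);
        properties.put("connector.property-version", Integer.toString(version));
        properties.putAll(toConnectorProperties());
        return properties;
    }

    protected abstract Map<String, String> toConnectorProperties();
}

// Mirrors MySystemConnector from the example, minus the formatNeeded flag.
class SketchMySystemConnector extends SketchConnectorDescriptor {

    private final boolean isDebug;

    SketchMySystemConnector(boolean isDebug) {
        super("my-system", 1);
        this.isDebug = isDebug;
    }

    @Override
    protected Map<String, String> toConnectorProperties() {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("connector.debug", Boolean.toString(isDebug));
        return properties;
    }
}

class DescriptorDemo {
    public static void main(String[] args) {
        // prints {connector.type=my-system, connector.property-version=1, connector.debug=true}
        System.out.println(new SketchMySystemConnector(true).toProperties());
    }
}
```

Because toProperties is final, a subclass can only add connector-specific keys and cannot drop the two built-in ones, which is what the factory lookup relies on.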

TableFactoryUtil

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactoryUtil.scala

object TableFactoryUtil {

  /**
    * Returns a table source for a table environment.
    */
  def findAndCreateTableSource[T](
      tableEnvironment: TableEnvironment,
      descriptor: Descriptor)
    : TableSource[T] = {

    val javaMap = descriptor.toProperties

    tableEnvironment match {
      case _: BatchTableEnvironment =>
        TableFactoryService
          .find(classOf[BatchTableSourceFactory[T]], javaMap)
          .createBatchTableSource(javaMap)

      case _: StreamTableEnvironment =>
        TableFactoryService
          .find(classOf[StreamTableSourceFactory[T]], javaMap)
          .createStreamTableSource(javaMap)

      case e@_ =>
        throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
    }
  }

  /**
    * Returns a table sink for a table environment.
    */
  def findAndCreateTableSink[T](
      tableEnvironment: TableEnvironment,
      descriptor: Descriptor)
    : TableSink[T] = {

    val javaMap = descriptor.toProperties

    tableEnvironment match {
      case _: BatchTableEnvironment =>
        TableFactoryService
          .find(classOf[BatchTableSinkFactory[T]], javaMap)
          .createBatchTableSink(javaMap)

      case _: StreamTableEnvironment =>
        TableFactoryService
          .find(classOf[StreamTableSinkFactory[T]], javaMap)
          .createStreamTableSink(javaMap)

      case e@_ =>
        throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
    }
  }
}
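  • TableFactoryUtil is a utility class that creates a TableSource or TableSink for a given TableEnvironment and Descriptor. Depending on whether the environment is a BatchTableEnvironment or a StreamTableEnvironment, it asks TableFactoryService for the corresponding batch or stream factory, using descriptor.toProperties as the lookup properties; any other environment type results in a TableException.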

TableFactoryService

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactoryService.scala

object TableFactoryService extends Logging {

  private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])

  /**
    * Finds a table factory of the given class and descriptor.
    *
    * @param factoryClass desired factory class
    * @param descriptor descriptor describing the factory configuration
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    Preconditions.checkNotNull(descriptor)

    findInternal(factoryClass, descriptor.toProperties, None)
  }

  /**
    * Finds a table factory of the given class, descriptor, and classloader.
    *
    * @param factoryClass desired factory class
    * @param descriptor descriptor describing the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    Preconditions.checkNotNull(descriptor)
    Preconditions.checkNotNull(classLoader)

    findInternal(factoryClass, descriptor.toProperties, Some(classLoader))
  }

  /**
    * Finds a table factory of the given class and property map.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    findInternal(factoryClass, propertyMap, None)
  }

  /**
    * Finds a table factory of the given class, property map, and classloader.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](
      factoryClass: Class[T],
      propertyMap: JMap[String, String],
      classLoader: ClassLoader)
    : T = {
    Preconditions.checkNotNull(classLoader)

    findInternal(factoryClass, propertyMap, Some(classLoader))
  }

  /**
    * Finds a table factory of the given class, property map, and classloader.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  private def findInternal[T](
      factoryClass: Class[T],
      propertyMap: JMap[String, String],
      classLoader: Option[ClassLoader])
    : T = {

    Preconditions.checkNotNull(factoryClass)
    Preconditions.checkNotNull(propertyMap)

    val properties = propertyMap.asScala.toMap

    val foundFactories = discoverFactories(classLoader)

    val classFactories = filterByFactoryClass(
      factoryClass,
      properties,
      foundFactories)

    val contextFactories = filterByContext(
      factoryClass,
      properties,
      foundFactories,
      classFactories)

    filterBySupportedProperties(
      factoryClass,
      properties,
      foundFactories,
      contextFactories)
  }

  /**
    * Searches for factories using Java service providers.
    *
    * @return all factories in the classpath
    */
  private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
    try {
      val iterator = classLoader match {
        case Some(customClassLoader) =>
          val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
          customLoader.iterator()
        case None =>
          defaultLoader.iterator()
      }
      iterator.asScala.toSeq
    } catch {
      case e: ServiceConfigurationError =>
        LOG.error("Could not load service provider for table factories.", e)
        throw new TableException("Could not load service provider for table factories.", e)
    }
  }

  /**
    * Filters factories with matching context by factory class.
    */
  private def filterByFactoryClass[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory])
    : Seq[TableFactory] = {

    val classFactories = foundFactories.filter(f => factoryClass.isAssignableFrom(f.getClass))
    if (classFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        s"No factory implements '${factoryClass.getCanonicalName}'.",
        factoryClass,
        foundFactories,
        properties)
    }
    classFactories
  }

  /**
    * Filters for factories with matching context.
    *
    * @return all matching factories
    */
  private def filterByContext[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory],
      classFactories: Seq[TableFactory])
    : Seq[TableFactory] = {

    val matchingFactories = classFactories.filter { factory =>
      val requestedContext = normalizeContext(factory)

      val plainContext = mutable.Map[String, String]()
      plainContext ++= requestedContext
      // we remove the version for now until we have the first backwards compatibility case
      // with the version we can provide mappings in case the format changes
      plainContext.remove(CONNECTOR_PROPERTY_VERSION)
      plainContext.remove(FORMAT_PROPERTY_VERSION)
      plainContext.remove(METADATA_PROPERTY_VERSION)
      plainContext.remove(STATISTICS_PROPERTY_VERSION)

      // check if required context is met
      plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)
    }

    if (matchingFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        "No context matches.",
        factoryClass,
        foundFactories,
        properties)
    }

    matchingFactories
  }

  /**
    * Prepares the properties of a context to be used for match operations.
    */
  private def normalizeContext(factory: TableFactory): Map[String, String] = {
    val requiredContextJava = factory.requiredContext()
    if (requiredContextJava == null) {
      throw new TableException(
        s"Required context of factory '${factory.getClass.getName}' must not be null.")
    }
    requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2)).toMap
  }

  /**
    * Filters the matching class factories by supported properties.
    */
  private def filterBySupportedProperties[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory],
      classFactories: Seq[TableFactory])
    : T = {

    val plainGivenKeys = mutable.ArrayBuffer[String]()
    properties.keys.foreach { k =>
      // replace arrays with wildcard
      val key = k.replaceAll(".\\d+", ".#")
      // ignore duplicates
      if (!plainGivenKeys.contains(key)) {
        plainGivenKeys += key
      }
    }
    var lastKey: Option[String] = None
    val supportedFactories = classFactories.filter { factory =>
      val requiredContextKeys = normalizeContext(factory).keySet
      val (supportedKeys, wildcards) = normalizeSupportedProperties(factory)
      // ignore context keys
      val givenContextFreeKeys = plainGivenKeys.filter(!requiredContextKeys.contains(_))
      // perform factory specific filtering of keys
      val givenFilteredKeys = filterSupportedPropertiesFactorySpecific(
        factory,
        givenContextFreeKeys)

      givenFilteredKeys.forall { k =>
        lastKey = Option(k)
        supportedKeys.contains(k) || wildcards.exists(k.startsWith)
      }
    }

    if (supportedFactories.isEmpty && classFactories.length == 1 && lastKey.isDefined) {
      // special case: when there is only one matching factory but the last property key
      // was incorrect
      val factory = classFactories.head
      val (supportedKeys, _) = normalizeSupportedProperties(factory)
      throw new NoMatchingTableFactoryException(
        s"""
          |The matching factory '${factory.getClass.getName}' doesn't support '${lastKey.get}'.
          |
          |Supported properties of this factory are:
          |${supportedKeys.sorted.mkString("\n")}""".stripMargin,
        factoryClass,
        foundFactories,
        properties)
    } else if (supportedFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        s"No factory supports all properties.",
        factoryClass,
        foundFactories,
        properties)
    } else if (supportedFactories.length > 1) {
      throw new AmbiguousTableFactoryException(
        supportedFactories,
        factoryClass,
        foundFactories,
        properties)
    }

    supportedFactories.head.asInstanceOf[T]
  }

  /**
    * Prepares the supported properties of a factory to be used for match operations.
    */
  private def normalizeSupportedProperties(factory: TableFactory): (Seq[String], Seq[String]) = {
    val supportedPropertiesJava = factory.supportedProperties()
    if (supportedPropertiesJava == null) {
      throw new TableException(
        s"Supported properties of factory '${factory.getClass.getName}' must not be null.")
    }
    val supportedKeys = supportedPropertiesJava.asScala.map(_.toLowerCase)

    // extract wildcard prefixes
    val wildcards = extractWildcardPrefixes(supportedKeys)

    (supportedKeys, wildcards)
  }

  /**
    * Converts the prefix of properties with wildcards (e.g., "format.*").
    */
  private def extractWildcardPrefixes(propertyKeys: Seq[String]): Seq[String] = {
    propertyKeys
      .filter(_.endsWith("*"))
      .map(s => s.substring(0, s.length - 1))
  }

  /**
    * Performs filtering for special cases (i.e. table format factories with schema derivation).
    */
  private def filterSupportedPropertiesFactorySpecific(
      factory: TableFactory,
      keys: Seq[String])
    : Seq[String] = factory match {

    case formatFactory: TableFormatFactory[_] =>
      val includeSchema = formatFactory.supportsSchemaDerivation()
      // ignore non-format (or schema) keys
      keys.filter { k =>
        if (includeSchema) {
          k.startsWith(SchemaValidator.SCHEMA + ".") ||
            k.startsWith(FormatDescriptorValidator.FORMAT + ".")
        } else {
          k.startsWith(FormatDescriptorValidator.FORMAT + ".")
        }
      }

    case _ =>
      keys
  }
}
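  • TableFactoryService looks up the matching TableFactory for a factoryClass and a Descriptor (or its descriptor.toProperties map). Factories are discovered through Java's ServiceLoader, and the matching logic lives in three methods: filterByFactoryClass, filterByContext and filterBySupportedProperties. filterByFactoryClass keeps the discovered factories that implement the requested factoryClass (classFactories); filterByContext narrows classFactories to those whose requiredContext entries all appear in the properties with the same values (contextFactories); filterBySupportedProperties then narrows contextFactories by supportedProperties and must end up with exactly one factory, otherwise it throws NoMatchingTableFactoryException or AmbiguousTableFactoryException.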
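The key handling inside filterBySupportedProperties is easy to miss in the listing: array indices in the given keys are replaced by "#", and supported keys ending in "*" act as prefixes. The following plain-Java sketch shows both steps. PropertyKeySketch, normalizeGivenKeys and isSupported are hypothetical names, not Flink API. One deliberate difference is that the sketch escapes the dot in the regular expression ("\\.\\d+"), whereas the quoted code uses ".\\d+", which matches any character followed by digits.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;

// Hypothetical sketch of the key handling in filterBySupportedProperties; not Flink API.
class PropertyKeySketch {

    /** Replaces array indices with '#' and drops duplicates, keeping first-seen order. */
    static List<String> normalizeGivenKeys(Collection<String> keys) {
        List<String> result = new ArrayList<>();
        for (String k : keys) {
            // Assumption of this sketch: the dot is escaped, unlike the pattern in the quoted code.
            String key = k.replaceAll("\\.\\d+", ".#");
            if (!result.contains(key)) {
                result.add(key);
            }
        }
        return result;
    }

    /** True if the key is listed exactly or starts with the prefix of a wildcard entry such as "format.*". */
    static boolean isSupported(String key, List<String> supportedProperties) {
        for (String entry : supportedProperties) {
            String supported = entry.toLowerCase(Locale.ROOT);
            if (supported.endsWith("*")) {
                String prefix = supported.substring(0, supported.length() - 1);
                if (key.startsWith(prefix)) {
                    return true;
                }
            } else if (supported.equals(key)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> given = Arrays.asList(
            "schema.0.name", "schema.1.name", "format.type", "connector.debug");
        List<String> supported = Arrays.asList("schema.#.name", "format.*");

        for (String key : normalizeGivenKeys(given)) {
            System.out.println(key + " -> " + isSupported(key, supported));
        }
    }
}
```

With these inputs the two schema keys collapse into schema.#.name, format.type is accepted through the format.* wildcard, and connector.debug is reported as unsupported, which in the real lookup would exclude the factory.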

Summary

  • TableFactory defines two methods, requiredContext and supportedProperties. requiredContext is used to match a factory against the given properties; supportedProperties lists the properties the factory accepts, and passing a property the factory does not support makes the lookup fail with an exception.
  • BatchTableSourceFactory extends TableFactory and defines createBatchTableSource; BatchTableSinkFactory extends TableFactory and defines createBatchTableSink; StreamTableSourceFactory extends TableFactory and defines createStreamTableSource; StreamTableSinkFactory extends TableFactory and defines createStreamTableSink.
  • ConnectorDescriptor extends DescriptorBase and implements the Descriptor interface. It overrides Descriptor's toProperties method, which sets the built-in properties CONNECTOR_TYPE and CONNECTOR_PROPERTY_VERSION and then merges in the properties defined by the subclass through the abstract method toConnectorProperties.
  • TableFactoryUtil is a utility class that creates a TableSource or TableSink for a given TableEnvironment and Descriptor; internally it uses TableFactoryService with descriptor.toProperties to find the corresponding TableFactory.
  • TableFactoryService looks up the matching TableFactory for a factoryClass and a Descriptor (or its descriptor.toProperties map). The matching logic lives in filterByFactoryClass, filterByContext and filterBySupportedProperties: filterByFactoryClass selects the factories that implement the requested factoryClass (classFactories); filterByContext narrows classFactories by the required context to obtain contextFactories; filterBySupportedProperties then narrows contextFactories by supportedProperties.

Reposted from: http://mgqqa.baihongyu.com/
