Введение в механизм Java SPI
Механизм Java SPI, интерфейс поставщика услуг Java, представляет собой механизм динамической загрузки, предоставляемый Java, основанный на комбинации «программирование интерфейса + режим стратегии + файл конфигурации». Вызывающий может включить, расширить или заменить существующие стратегии реализации платформы в соответствии с фактическими потребностями использования. В Java, основанном на идее SPI, предоставляется конкретная реализация ServiceLoader, которая может легко реализовать сервисно-ориентированную регистрацию и обнаружение, а также завершить разделение предоставления и использования сервиса.
Общие примеры механизмов Java SPI, такие как:
- Интерфейс драйвера базы данных реализует загрузку классов: JDBC может загружать различные типы драйверов базы данных в соответствии с фактическим использованием, например OracleDriver, SQLServerDriver, Driver (MySql).
- Загрузка классов реализации интерфейса лог-фасада slf4j: slf4j лог-фасад не является фреймворком лога.Необходимо использовать механизм Java SPI для загрузки квалифицированных классов реализации интерфейса лог-фреймворка, чтобы завершить привязку фреймворков журналов, таких как Log4j, Logback и т. д. .
Для конкретного использования см.Продвинутые разработчики должны понимать механизм SPI в Java..
Применение механизма Java SPI во Flink
В программе Flink SQL механизм Java SPI используется для динамической загрузки классов реализации различных Factory. Например, для интерфейса TableFactory программа Flink найдет META-INF/services/org.apache.flink.table.factories.TableFactory из зависимостей, используемых программой, создаст экземпляр реализации интерфейса TableFactory посредством отражения, а затем use TableFactoryService# Метод filter() отфильтровывает подходящие классы реализации TableFactory. Взяв в качестве примера программу Flink SQL, считывающую данные из Kafka (версия 0.11), программа Flink SQL сначала получит все доступные классы реализации TableFactory и получит квалифицированный экземпляр класса реализации TableFactory Kafka011TableSourceSinkFactory через TableFactoryService#filter(). В этой статье в основном описывается применение механизма Java SPI в программах Flink SQL.Для проверки классов реализации TableFactory читатели могут самостоятельно прочитать соответствующий код.
Особое примечание: версия исходного кода flink, используемая в этой статье, — 1.9.
tEnv
.connect(
new Kafka()
.version("0.11")
.topic(topic)
.startFromLatest()
.properties(props))
.withSchema(schema)
.withFormat(format)
.registerTableSource("record");
Вышеупомянутая программа используется для установления соединения с Kafka и определяет структуру и формат считываемых данных. Наконец, registerTableSource используется для завершения регистрации источника таблицы. Мы проверили код и обнаружили, что метод TableFactoryService#find() был вызван внутри для поиска квалифицированного экземпляра TableSourceFactory, а метод createTableSource() был вызван для создания экземпляра Kafka011TableSource.
# TableFactoryUtil.java
private static <T> TableSource<T> findAndCreateTableSource(Map<String, String> properties) {
try {
return TableFactoryService
.find(TableSourceFactory.class, properties)
.createTableSource(properties);
} catch (Throwable t) {
throw new TableException("findAndCreateTableSource failed.", t);
}
}
# TableFactoryService
public static <T extends TableFactory> T find(Class<T> factoryClass, Map<String, String> propertyMap) {
return findSingleInternal(factoryClass, propertyMap, Optional.empty());
}
# TableFactoryService.java
private static <T extends TableFactory> T findSingleInternal(
Class<T> factoryClass,Map<String, String> properties,Optional<ClassLoader> classLoader) {
List<TableFactory> tableFactories = discoverFactories(classLoader);
List<T> filtered = filter(tableFactories, factoryClass, properties);
...
}
В методе TableFactoryService#findSingleInternal() мы видим, что используются два основных метода: метод discoveryFactory() в основном используется для запроса класса реализации интерфейса TableFactory, предоставленного в текущей программе Flink SQL, и метод filter(). метод используется для фильтрации. Получите класс реализации TableFactory, который удовлетворяет условию. Очевидно, что использование механизма Java SPI находится внутри метода discoveryFactories().
#TableFactoryService.java
private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
try {
List<TableFactory> result = new LinkedList<>();
if (classLoader.isPresent()) {
ServiceLoader
.load(TableFactory.class, classLoader.get())
.iterator()
.forEachRemaining(result::add);
} else {
defaultLoader.iterator().forEachRemaining(result::add);
}
return result;
} catch (ServiceConfigurationError e) {
LOG.error("Could not load service provider for table factories.", e);
throw new TableException("Could not load service provider for table factories.", e);
}
}
В методе discoveryFactories(), поскольку входящий classLoader является Optional.empty(), то есть classLoader.isPresent() имеет значение false, выполняется блок кода else.
private static final ServiceLoader<TableFactory> defaultLoader = ServiceLoader.load(TableFactory.class);
Видно, что defaultLoader является статической переменной класса, и именно по этой причине в коде Flink SQL 1.9 может быть ошибка. Конечно, мы объясним эту ошибку в конце статьи.
public static <S> ServiceLoader<S> load(Class<S> service) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
return ServiceLoader.load(service, cl);
}
public static <S> ServiceLoader<S> load(Class<S> service,ClassLoader loader) {
return new ServiceLoader<>(service, loader);
}
# service => TableFactory, loader => AppClassLoader, acc => null
private ServiceLoader(Class<S> svc, ClassLoader cl) {
service = Objects.requireNonNull(svc, "Service interface cannot be null");
loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
reload();
}
В методе построения ServiceLoader мы видим, что назначение переменных service, loader и acc завершено.
// Cached providers, in instantiation order
private LinkedHashMap<String,S> providers = new LinkedHashMap<>();
// The current lazy-lookup iterator
private LazyIterator lookupIterator;
public void reload() {
providers.clear();
lookupIterator = new LazyIterator(service, loader);
}
В методе reload() данные, хранящиеся в переменной provider, сначала очищаются, и создается экземпляр LazyIterator. Переменная provider хранит экземпляр класса реализации TableFactory в папке services. LazyIterator, как следует из названия, представляет собой полностью ленивый поиск провайдера.
private class LazyIterator implements Iterator<S> {
Class<S> service;
ClassLoader loader;
Enumeration<URL> configs = null; # 用于保存项目中所有的依赖名
Iterator<String> pending = null; # 用于保存每个依赖中services文件夹的TableFactory实现类的全路径名
String nextName = null; # 用于保存当前TableFactory实现类的全路径名
# service -> TableFactory, loader -> AppClassLoader
private LazyIterator(Class<S> service, ClassLoader loader) {
this.service = service;
this.loader = loader;
}
private boolean hasNextService() {
...
}
private S nextService() {
...
}
public boolean hasNext() {
...
}
public S next() {
...
}
public void remove() {
...
}
}
Прочитав переменную defaultLoader, продолжаем работу.
defaultLoader.iterator().forEachRemaining(result::add);
# ServiceLoader.java
public Iterator<S> iterator() {
return new Iterator<S>() {
Iterator<Map.Entry<String,S>> knownProviders
= providers.entrySet().iterator();
public boolean hasNext() {
if (knownProviders.hasNext())
return true;
return lookupIterator.hasNext();
}
public S next() {
if (knownProviders.hasNext())
return knownProviders.next().getValue();
return lookupIterator.next();
}
public void remove() {
throw new UnsupportedOperationException();
}
};
}
Вы можете видеть, что в методе defaultLoader.iterator() создается внутренний класс интерфейса Iterator, создается экземпляр knownProviders и предоставляются такие методы, как hasNext(), next() и remove(). Ознакомившись с методом iterator(), давайте перейдем к forEachRemaining().
#Iterator.java
default void forEachRemaining(Consumer<? super E> action) {
Objects.requireNonNull(action);
while (hasNext())
action.accept(next());
}
Стоит отметить, что здесь методы hasNext() и next() фактически вызывают методы hasNext() и next() внутреннего класса вышеуказанного интерфейса Iterator. Во-первых, давайте взглянем на реализацию метода hasNext().
public boolean hasNext() {
# 由于程序第一次寻找TableFactory的实现类,因此providers在一开始是经过clear()处理的,
# 同时,knownProviders = providers.entrySet().iterator();
# 也就是说knownProviders.hasNext()在当前这一组TableFactory实现类的查询过程中都是为false。
# 进入lookupIterator.hasNext()中。
if (knownProviders.hasNext())
return true;
return lookupIterator.hasNext();
}
# lookupIterator
public boolean hasNext() {
# 在Flink SQL查询TableFactory接口实现类时,acc(AccessControlContext:创建ServiceLoader时采取的访问控制上下文)始终为null
if (acc == null) {
return hasNextService();
}
...
}
# lookupIterator
private boolean hasNextService() {
# nextName 表示查询到的下一个TableFactory实现类的全路径名
if (nextName != null) {
return true;
}
# 在程序第一次寻找TableFactory的实现类时,其为null(Enumeration<URL> configs = null)。
if (configs == null) {
try {
# PREFIX = META-INF/services/
# service.getName()为TableFactory的全路径名
# 这里也就说明了,Java SPI机制在哪里读取接口的实现类。
String fullName = PREFIX + service.getName();
# 使用classloader根据路径去加载资源信息,
# 并将加载到项目中所有包含META-INF/services/org.apache.flink.table.factories.TableFactory的依赖jar地址,
# classLoader等信息保存到变量configs(Enumeration<URL> configs)中。
# 当系统实例化一个jar中的TableFactory实现类后,会通过configs.next()方法读取下一个jar中services文件中的内容。
# configs数据结构如下图所示。
if (loader == null)
configs = ClassLoader.getSystemResources(fullName);
else
configs = loader.getResources(fullName);
} catch (IOException x) {
fail(service, "Error locating configuration files", x);
}
}
# pending变量用于存储一个依赖jar中读取到的TableFactory实现类的全路径名,
# 其是一个Iterator数据接口,需要使用的时候,每次每次调用pending.next()方法
# 并将得到的TableFactory实现类的全路径名赋值给nextName。(Iterator<String> pending = null)
# 其中pending为null表示第一次进行TableFactory接口实现类的读取时,
# !pending.hasNext() = true则表示当读取完一个依赖jar中services文件夹的内容时,
# 希望继续从接下来的依赖jar中读取信息。
while ((pending == null) || !pending.hasNext()) {
# 当所有的依赖都遍历完后,configs.hasMoreElements()将返回false,
# 这个时候也就意味着这一组TableFactory实现类查询结束。
if (!configs.hasMoreElements()) {
return false;
}
# parse()方法用于读取一个依赖jar中的services文件夹中的TableFactory接口实现类的全路径名并保存到pending变量中。
# 该方法中,有一点值得说明的事,如果providers中已经保存了TableFactory接口实现类的全路径名A,即使当前依赖jar中任然包含该全路径名A,
# 那么这个时候,这个全路径名A也就不会添加到pending变量中。这样就能够保证providers中的保存的TableFactory的实现类实例唯一,
# 即使多个依赖的services文件夹里面包含同一个实现类的全路径名
pending = parse(service, configs.nextElement());
}
# 读取pending中保存的TableFactory接口实现类的全路径名,并保存到nextName变量中。
nextName = pending.next();
return true;
}
В этом методе hasNext() в основном выполняются следующие действия:
- В соответствии с именем пути к ресурсу (META-INF/services/org.apache.flink.table.factories.TableFactory) используйте загрузчик классов для загрузки информации о ресурсе и назначения ее переменной configs.
- Получите jar-файл зависимостей из переменной configs и прочитайте полный путь к интерфейсу TableFactory из этого jar-файла зависимостей и сохраните эти полные пути в переменной pending.
- Возьмите полное имя интерфейса TableFactory из переменной pending и сохраните его в переменной nextName для использования в методе next().
Далее, давайте посмотрим на метод next().
public S next() {
if (knownProviders.hasNext())
return knownProviders.next().getValue();
return lookupIterator.next();
}
# lookupIterator
public S next() {
# 在Flink SQL查询TableFactory接口实现类时,acc(AccessControlContext:创建ServiceLoader时采取的访问控制上下文)始终为null
if (acc == null) {
return nextService();
}
...
}
# lookupIterator
private S nextService() {
# hasNextService()方法就是上述的方法,这个时候nextName != null,则其返回true。
if (!hasNextService())
throw new NoSuchElementException();
String cn = nextName;
# nextName赋值为null,用于下一次的TableFactory接口实现类全路径名的赋值工作。
nextName = null;
Class<?> c = null;
...
# 使用Class.forName()根据TableFactory接口实现类的全路径名进行反射,
# 并根据全路径名对该类进行实例化
c = Class.forName(cn, false, loader);
...
S p = service.cast(c.newInstance());
# 将实例化后的TableFactory接口实现类保存到providers变量中。
providers.put(cn, p);
return p;
...
}
На этом запрос класса реализации интерфейса TableFactory завершен. Затем выполняется запрос нескольких классов реализации интерфейса TableFactory в зависимом банке, а затем выполняется запрос во всех зависимых банках, содержащих META-INF/services/org.apache.flink.table.factories.TableFactory в текущем проекте. Общая работа та же самая, поэтому я не буду здесь вдаваться в подробности.
Проблемы с механизмом Java SPI в Flink SQL 1.9
В следующих сценариях есть два продукта MQ (очередь сообщений), обозначенные как A и B, и у Flink есть соответствующие задания коннектора flink-connector-A и flink-connector-B, и оба они содержат META-INF /services/ org.apache.flink.table.factories.TableFactory. В это время в том же кластере после запуска программы Flink SQL 1, которая использует данные из flink-connector-A, а затем запускает программу Flink SQL 2, которая использует данные из flink-connector-2, будет сообщено о следующем исключении:
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
...
50395:The following factories have been considered:
...
51110:org.apache.flink.streaming.connectors.kafka.A
...
Очевидно, что информация о классе реализации интерфейса TableFactory, запрашиваемая программой Flink SQL 2, на самом деле является информацией, запрашиваемой программой Flink SQL 1. Эта проблема возникает из-за переменной defaultLoader.
# TableFactoryService
private static final ServiceLoader<TableFactory> defaultLoader = ServiceLoader.load(TableFactory.class);
Все внутренние переменные и методы класса TableFactoryService изменяются с помощью static. После поиска класса реализации интерфейса TableFactory в программе Flink SQL 1 поставщики переменных, на которые ссылается переменная defaultLoader, сохраняют класс экземпляра интерфейса TableFactory, прочитанный на этот раз. Когда программа Flink SQL программа 2 собирается найти класс реализации интерфейса TableFactory, она будет напрямую использовать defaultLoader в программе Flink SQL 1 (поскольку это статическая переменная, она будет храниться в JVM, если кластер не остановлен).
defaultLoader.iterator().forEachRemaining(result::add);
default void forEachRemaining(Consumer<? super E> action) {
Objects.requireNonNull(action);
while (hasNext())
action.accept(next());
}
public Iterator<S> iterator() {
return new Iterator<S>() {
# 这个时候knownProviders保存的数据就是Flink SQL程序1读取到TableFactory接口实现类的实例
Iterator<Map.Entry<String,S>> knownProviders
= providers.entrySet().iterator();
public boolean hasNext() {
# 此时,kownProviders.hasNext() 为true
if (knownProviders.hasNext())
return true;
return lookupIterator.hasNext();
}
public S next() {
# 此时,knownProviders.hasNext() 为true
if (knownProviders.hasNext())
# 从knownProviders中读取TableFactory接口实现类的实例
return knownProviders.next().getValue();
return lookupIterator.next();
}
public void remove() {
throw new UnsupportedOperationException();
}
};
}
На данный момент у нас есть четкое понимание того, почему программа Flink SQL 2 считывает данные класса реализации интерфейса TableFactory, запрашиваемые программой Flink SQL 1. К счастью, эта ошибка была устранена в Flink 1.10.
private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
try {
List<TableFactory> result = new LinkedList<>();
ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader());
ServiceLoader
.load(TableFactory.class, cl)
.iterator()
.forEachRemaining(result::add);
return result;
} catch (ServiceConfigurationError e) {
LOG.error("Could not load service provider for table factories.", e);
throw new TableException("Could not load service provider for table factories.", e);
}
}
конец текста