Want to run a non-thread-safe library in parallel - can it be done using multiple classloaders?

Tho*_*sen 28 java multithreading classloader

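I am working on a project where we use a library that is not guaranteed to be thread-safe (and isn't). Used single-threaded in a Java 8 streams scenario, it works as expected.

We would like to use parallel streams to pick the low-hanging fruit of scalability.

Unfortunately, this causes the library to fail - most likely because one instance interferes with variables shared with another instance - so we need isolation.

I was considering using a separate classloader for each instance (possibly thread-local), which to my knowledge should mean that, for all practical purposes, I get the isolation I need. However, I am unfamiliar with deliberately constructing classloaders for this purpose.

Is this the right approach? How should I do this in order to get proper production quality?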


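Edit: I was asked for additional information about the situation that triggered the question, so that it can be understood better. The question is still about the general case, not about fixing the library.

I have full control over the objects created by the library (https://github.com/veraPDF/), which is pulled in by: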

<dependency>
    <groupId>org.verapdf</groupId>
    <artifactId>validation-model</artifactId>
    <version>1.1.6</version>
</dependency>

using the project's Maven repository for the artifacts:

<repositories>
    <repository>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
        <id>vera-dev</id>
        <name>Vera development</name>
        <url>http://artifactory.openpreservation.org/artifactory/vera-dev</url>
    </repository>
</repositories>

For now, hardening the library is not feasible.


Edit: I was asked to show code. Our core adapter is roughly:

public class VeraPDFValidator implements Function<InputStream, byte[]> {
    private String flavorId;
    private Boolean prettyXml;

    public VeraPDFValidator(String flavorId, Boolean prettyXml) {
        this.flavorId = flavorId;
        this.prettyXml = prettyXml;
        VeraGreenfieldFoundryProvider.initialise();
    }

    @Override
    public byte[] apply(InputStream inputStream) {
        try {
            return apply0(inputStream);
        } catch (RuntimeException e) {
            throw e;
        } catch (ModelParsingException | ValidationException | JAXBException | EncryptedPdfException e) {
            throw new RuntimeException("invoking VeraPDF validation", e);
        }
    }

    private byte[] apply0(InputStream inputStream) throws ModelParsingException, ValidationException, JAXBException, EncryptedPdfException {
        PDFAFlavour flavour = PDFAFlavour.byFlavourId(flavorId);
        PDFAValidator validator = Foundries.defaultInstance().createValidator(flavour, false);
        PDFAParser loader = Foundries.defaultInstance().createParser(inputStream, flavour);
        ValidationResult result = validator.validate(loader);

        // do in-memory generation of XML byte array - as we need to pass it to Fedora we need it to fit in memory anyway.

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        XmlSerialiser.toXml(result, baos, prettyXml, false);
        final byte[] byteArray = baos.toByteArray();
        return byteArray;
    }
}

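This is a function that maps from an InputStream (providing a PDF file) to a byte array (representing the XML report output).

(Seeing the code, I noticed that there is a call to the initialiser in the constructor, which may be the culprit in my particular case. I would still like a solution to the generic problem.)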

Bru*_*der 13

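We have faced similar challenges. The problems usually came from static properties that ended up unwillingly "shared" between the various threads.

Using different classloaders worked for us as long as we could guarantee that the static properties were actually set on classes loaded by our own class loader. Java may have a few classes that provide properties or methods which are not isolated among threads or are not thread-safe (System.setProperties() and Security.addProvider() are OK - any canonical documentation on this matter is welcome, by the way).

A potentially workable and fast solution - one that at least gives you a chance to test this theory for your library - is to use a servlet engine such as Jetty or Tomcat.

Build a few WARs that contain your library and start the processes in parallel (one per WAR).

When code runs inside a servlet thread, the WebappClassLoaders of these engines first attempt to load a class from the parent class loader (the engine's own) and, if the class is not found there, attempt to load it from the jars/classes packaged with the WAR.

With Jetty you can programmatically hot-deploy WARs to the context of your choice and then, in theory, scale the number of processors (WARs) as required.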

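We implemented our own class loader by extending URLClassLoader, taking inspiration from the Jetty WebappClassLoader. It is not as hard a job as it seems.

Our classloader does the exact opposite: it first attempts to load a class from the jars local to the "package", and only then tries to get it from the parent class loader. This guarantees that a library accidentally loaded by the parent classloader is never considered (first). Our "package" is actually a jar that contains other jars/libraries, with a customized manifest file.

Posting this class loader code "as is" would not make a lot of sense (and would create a few copyright issues). If you want to explore that route further, I can try to come up with a skeleton.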
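For illustration only, a child-first loader of the kind described above might be sketched as follows. This is not the loader from this answer: the class name ChildFirstClassLoader is made up, and the sketch assumes plain jar/directory URLs rather than a jar-of-jars "package".

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical child-first ("parent-last") loader: looks in its own URLs
// before delegating to the parent, the reverse of the default order.
class ChildFirstClassLoader extends URLClassLoader {

    ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            // 1. Reuse a class this loader has already defined
            Class<?> c = findLoadedClass(name);
            // 2. Try our own URLs first; core java.* classes always come from the parent
            if (c == null && !name.startsWith("java.")) {
                try {
                    c = findClass(name);
                } catch (ClassNotFoundException e) {
                    // Not in our URLs: fall through to the parent
                }
            }
            // 3. Fall back to the normal parent-first delegation
            if (c == null) {
                c = super.loadClass(name, false);
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
```

Any type the calling code needs to share with the isolated library (for example an interface it casts to) has to be absent from the loader's URLs, otherwise the child-first lookup defines a second, incompatible copy of it.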

Source of the Jetty WebappClassLoader


Nic*_*tto 8

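The answer actually depends on what your library relies on:

  1. If your library relies on at least one native library, using ClassLoaders to isolate your library's code won't help, because according to the JNI specification the same JNI native library may not be loaded into more than one class loader, so you would end up with an UnsatisfiedLinkError.
  2. If your library relies on at least one external resource that is not meant to be shared (a file, for example) and that is modified by your library, you could end up with complex bugs and/or corruption of the resource.

Assuming that you are not in one of the cases listed above: generally speaking, if a class is known to be non-thread-safe and does not modify any static fields, using a dedicated instance of this class per call or per thread is good enough, as the class instance is then no longer shared.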
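As a small illustration of that simpler case, here is a sketch using java.text.SimpleDateFormat, a JDK class that is documented as not synchronized and keeps its mutable state in instance fields. The class name PerThreadInstanceDemo and the method formatDays are made up for this example.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.TimeZone;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PerThreadInstanceDemo {
    // One formatter per thread: instances are never shared between threads
    private static final ThreadLocal<SimpleDateFormat> FORMAT = ThreadLocal.withInitial(() -> {
        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd", Locale.ROOT);
        f.setTimeZone(TimeZone.getTimeZone("UTC"));
        return f;
    });

    // Formats the first 'count' days since the epoch using a parallel stream
    static List<String> formatDays(int count) {
        return IntStream.range(0, count).parallel()
            .mapToObj(day -> FORMAT.get().format(new Date(day * 86_400_000L)))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(formatDays(3)); // [1970-01-01, 1970-01-02, 1970-01-03]
    }
}
```

No classloader tricks are needed here because nothing static is mutated; the isolation only has to cover the instance.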

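Here, as your library obviously relies on and modifies some static fields that are not meant to be shared, you do need to isolate the classes of your library in a dedicated ClassLoader and, of course, make sure that your threads don't share the same ClassLoader.

For this, you could simply create a URLClassLoader to which you provide the location of your library as a URL (using URLClassLoader.newInstance(URL[] urls, ClassLoader parent)). Then, by reflection, you retrieve the class of your library that corresponds to the entry point and invoke your target method. To avoid building a new URLClassLoader at each call, you could rely on a ThreadLocal to store the URLClassLoader, the Class or the Method instance to be used for a given thread.


So here is how you could proceed:

Let's say that the entry point of my library is the class Foo, which looks like this: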

package com.company;

public class Foo {

    // A static field in which we store the name of the current thread
    public static String threadName;

    public void execute() {
        // We print the value of the field before setting a value
        System.out.printf(
            "%s: The value before %s%n", Thread.currentThread().getName(), threadName
        );
        // We set a new value
        threadName = Thread.currentThread().getName();
        // We print the value of the field after setting a value
        System.out.printf(
            "%s: The value after %s%n", Thread.currentThread().getName(), threadName
        );
    }
}

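This class is clearly not thread-safe, and the method execute modifies the value of a static field that is not meant to be modified by concurrent threads, just like in your use case.

Assume that to launch my library I simply need to create an instance of Foo and invoke the method execute. I could store the corresponding Method in a ThreadLocal, so that it is retrieved by reflection only once per thread, using ThreadLocal.withInitial(Supplier<? extends S> supplier) as follows: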

private static final ThreadLocal<Method> TL = ThreadLocal.withInitial(
    () -> {
        try {
            // Create the instance of URLClassLoader using the context 
            // CL as parent CL to be able to retrieve the potential 
            // dependencies of your library assuming that they are
            // thread safe otherwise you will need to provide their 
            // URL to isolate them too
            URLClassLoader cl = URLClassLoader.newInstance(
                new URL[]{/* Here the URL of my library*/},
                Thread.currentThread().getContextClassLoader()
            );
            // Get by reflection the class Foo
            Class<?> myClass = cl.loadClass("com.company.Foo");
            // Get by reflection the method execute
            return myClass.getMethod("execute");
        } catch (Exception e) {
            // Here deal with the exceptions
            throw new IllegalStateException(e);
        }
    }
);

Finally, let's simulate a concurrent execution of my library:

// Launch 50 times concurrently my library
IntStream.rangeClosed(1, 50).parallel().forEach(
    i -> {
        try {
            // Get the method instance from the ThreadLocal
            Method myMethod = TL.get();
            // Create an instance of my class using the default constructor
            Object myInstance = myMethod.getDeclaringClass().newInstance();
            // Invoke the method
            myMethod.invoke(myInstance);
        } catch (Exception e) {
            // Here deal with the exceptions
            throw new IllegalStateException(e);
        }
    }
);

You will get output of the following kind, which shows that there are no conflicts between threads and that each thread properly reuses its own class/field value from one call of execute to the next:

ForkJoinPool.commonPool-worker-7: The value before null
ForkJoinPool.commonPool-worker-7: The value after ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-7: The value before ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-7: The value after ForkJoinPool.commonPool-worker-7
main: The value before null
main: The value after main
main: The value before main
main: The value after main
...

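As this approach creates one ClassLoader per thread, make sure to apply it with a thread pool that has a fixed number of threads, and choose the number of threads wisely to avoid running out of memory: a ClassLoader is not free in terms of memory footprint, so you need to limit the total number of instances according to your heap size.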
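A minimal sketch of that bound, assuming a plain fixed-size ExecutorService instead of the common pool. A bare Object stands in for the expensive per-thread URLClassLoader, and the names BoundedIsolationDemo and run are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedIsolationDemo {

    // Runs 'tasks' jobs on 'poolSize' threads and returns how many
    // per-thread resources (stand-ins for classloaders) were created.
    static int run(int poolSize, int tasks) {
        AtomicInteger created = new AtomicInteger();
        ThreadLocal<Object> perThread = ThreadLocal.withInitial(() -> {
            created.incrementAndGet();   // one increment per worker thread
            return new Object();         // placeholder for a URLClassLoader
        });
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(pool.submit(() -> { perThread.get(); }));
            }
            for (Future<?> f : futures) {
                f.get();                 // wait for completion, surface failures
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return created.get();
    }

    public static void main(String[] args) {
        // Never more than 4, however many tasks are submitted
        System.out.println(run(4, 100));
    }
}
```

However many tasks are submitted, the number of per-thread resources never exceeds the pool size, which is what keeps the classloader count predictable.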

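Once you are done with your library, you should clean up the ThreadLocal for each thread of your thread pool to prevent memory leaks. Here is how you could proceed: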

// The size of the thread pool
// Here as I used for my example the common pool, its size by default is
// Runtime.getRuntime().availableProcessors()
int poolSize = Runtime.getRuntime().availableProcessors();
// The cyclic barrier used to make sure that all the threads of the pool
// will execute the code that will cleanup the ThreadLocal
CyclicBarrier barrier = new CyclicBarrier(poolSize);
// Launch one cleanup task per thread in the pool
IntStream.rangeClosed(1, poolSize).parallel().forEach(
    i -> {
        try {
            // Wait for all other threads of the pool
            // This is needed to fill up the thread pool in order to make sure 
            // that all threads will execute the cleanup code
            barrier.await();
            // Close the URLClassLoader to prevent memory leaks
            ((URLClassLoader) TL.get().getDeclaringClass().getClassLoader()).close();
        } catch (Exception e) {
            // Here deal with the exceptions
            throw new IllegalStateException(e);
        } finally {
            // Remove the URLClassLoader instance for this thread
            TL.remove();
        }
    }
);
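
The barrier-based cleanup above depends on every pool thread picking up exactly one cleanup task. One possible alternative, sketched here under the assumption that you control how the loaders are created, is to record each loader in a shared set so that any thread can close them all later. LoaderRegistry and its methods are made-up names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: hands out one URLClassLoader per thread and
// remembers all of them so they can be closed from a single place.
class LoaderRegistry {
    private final Set<URLClassLoader> loaders = ConcurrentHashMap.newKeySet();
    private final URL[] urls;
    private final ThreadLocal<URLClassLoader> perThread;

    LoaderRegistry(URL... urls) {
        this.urls = urls.clone();
        this.perThread = ThreadLocal.withInitial(this::create);
    }

    private URLClassLoader create() {
        URLClassLoader cl = URLClassLoader.newInstance(
            urls, Thread.currentThread().getContextClassLoader());
        loaders.add(cl);
        return cl;
    }

    // The loader dedicated to the calling thread
    URLClassLoader current() {
        return perThread.get();
    }

    // Number of loaders created and not yet closed
    int size() {
        return loaders.size();
    }

    // Closes every loader created so far, from whichever thread calls this
    void closeAll() {
        for (URLClassLoader cl : loaders) {
            try {
                cl.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        loaders.clear();
    }
}
```

Closing releases the jar files the loaders hold open. The worker threads still reference their closed loader through the ThreadLocal until they exit or call remove(), so shutting down the pool afterwards is still advisable.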


kri*_*aex 7

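I looked into this question and created a small tool for you:

https://github.com/kriegaex/ThreadSafeClassLoader

It is not yet available as an official release on Maven Central, but you can get a snapshot like this: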

<dependency>
  <groupId>de.scrum-master</groupId>
  <artifactId>threadsafe-classloader</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

<!-- (...) -->

<repositories>
  <repository>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
    <id>ossrh</id>
    <name>Sonatype OSS Snapshots</name>
    <url>https://oss.sonatype.org/content/repositories/snapshots</url>
  </repository>
</repositories>
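
Class ThreadSafeClassLoader:

It uses JCL (Jar Class Loader) under the hood, because JCL already offers the class-loading, object instantiation and proxy generation features discussed in other parts of this thread. (Why reinvent the wheel?) What I added on top is a nice interface for exactly what we need here: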

package de.scrum_master.thread_safe;

import org.xeustechnologies.jcl.JarClassLoader;
import org.xeustechnologies.jcl.JclObjectFactory;
import org.xeustechnologies.jcl.JclUtils;
import org.xeustechnologies.jcl.proxy.CglibProxyProvider;
import org.xeustechnologies.jcl.proxy.ProxyProviderFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ThreadSafeClassLoader extends JarClassLoader {
  private static final JclObjectFactory OBJECT_FACTORY = JclObjectFactory.getInstance();

  static {
    ProxyProviderFactory.setDefaultProxyProvider(new CglibProxyProvider());
  }

  private final List<Class> classes = new ArrayList<>();

  public static ThreadLocal<ThreadSafeClassLoader> create(Class... classes) {
    return ThreadLocal.withInitial(
      () -> new ThreadSafeClassLoader(classes)
    );
  }

  private ThreadSafeClassLoader(Class... classes) {
    super();
    this.classes.addAll(Arrays.asList(classes));
    for (Class clazz : classes)
      add(clazz.getProtectionDomain().getCodeSource().getLocation());
  }

  public <T> T newObject(ObjectConstructionRules rules) {
    rules.validate(classes);
    Class<T> castTo = rules.targetType;
    return JclUtils.cast(createObject(rules), castTo, castTo.getClassLoader());
  }

  private Object createObject(ObjectConstructionRules rules) {
    String className = rules.implementingType.getName();
    String factoryMethod = rules.factoryMethod;
    Object[] arguments = rules.arguments;
    Class[] argumentTypes = rules.argumentTypes;
    if (factoryMethod == null) {
      if (argumentTypes == null)
        return OBJECT_FACTORY.create(this, className, arguments);
      else
        return OBJECT_FACTORY.create(this, className, arguments, argumentTypes);
    } else {
      if (argumentTypes == null)
        return OBJECT_FACTORY.create(this, className, factoryMethod, arguments);
      else
        return OBJECT_FACTORY.create(this, className, factoryMethod, arguments, argumentTypes);
    }
  }

  public static class ObjectConstructionRules {
    private Class targetType;
    private Class implementingType;
    private String factoryMethod;
    private Object[] arguments;
    private Class[] argumentTypes;

    private ObjectConstructionRules(Class targetType) {
      this.targetType = targetType;
    }

    public static ObjectConstructionRules forTargetType(Class targetType) {
      return new ObjectConstructionRules(targetType);
    }

    public ObjectConstructionRules implementingType(Class implementingType) {
      this.implementingType = implementingType;
      return this;
    }

    public ObjectConstructionRules factoryMethod(String factoryMethod) {
      this.factoryMethod = factoryMethod;
      return this;
    }

    public ObjectConstructionRules arguments(Object... arguments) {
      this.arguments = arguments;
      return this;
    }

    public ObjectConstructionRules argumentTypes(Class... argumentTypes) {
      this.argumentTypes = argumentTypes;
      return this;
    }

    private void validate(List<Class> classes) {
      if (implementingType == null)
        implementingType = targetType;
      if (!classes.contains(implementingType))
        throw new IllegalArgumentException(
          "Class " + implementingType.getName() + " is not protected by this thread-safe classloader"
        );
    }
  }
}
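
I tested my concept with several unit and integration tests, among them one showing how to reproduce and solve the veraPDF problem.

Now, this is what your code looks like when using my special classloader:

Class VeraPDFValidator:

We just add a static ThreadLocal<ThreadSafeClassLoader> member to our class, telling it which classes/libraries to put into the new classloader (mentioning one class per library is enough; my tool then identifies the library automatically).

Then, via threadSafeClassLoader.get().newObject(forTargetType(VeraPDFValidatorHelper.class)), we instantiate our helper class inside the thread-safe classloader and create a proxy object for it so that we can call it from outside.

BTW, static boolean threadSafeMode only exists to switch between the old (unsafe) and new (thread-safe) usage of veraPDF, so that the original problem stays reproducible for the negative integration test case.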

package de.scrum_master.app;

import de.scrum_master.thread_safe.ThreadSafeClassLoader;
import org.verapdf.core.*;
import org.verapdf.pdfa.*;

import javax.xml.bind.JAXBException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.function.Function;

import static de.scrum_master.thread_safe.ThreadSafeClassLoader.ObjectConstructionRules.forTargetType;

public class VeraPDFValidator implements Function<InputStream, byte[]> {
  public static boolean threadSafeMode = true;

  private static ThreadLocal<ThreadSafeClassLoader> threadSafeClassLoader =
    ThreadSafeClassLoader.create(           // Add one class per artifact for thread-safe classloader:
      VeraPDFValidatorHelper.class,         //   - our own helper class
      PDFAParser.class,                     //   - veraPDF core
      VeraGreenfieldFoundryProvider.class   //   - veraPDF validation-model
    );

  private String flavorId;
  private Boolean prettyXml;

  public VeraPDFValidator(String flavorId, Boolean prettyXml)
    throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
    this.flavorId = flavorId;
    this.prettyXml = prettyXml;
  }

  @Override
  public byte[] apply(InputStream inputStream) {
    try {
      VeraPDFValidatorHelper validatorHelper = threadSafeMode
        ? threadSafeClassLoader.get().newObject(forTargetType(VeraPDFValidatorHelper.class))
        : new VeraPDFValidatorHelper();
      return validatorHelper.validatePDF(inputStream, flavorId, prettyXml);
    } catch (ModelParsingException | ValidationException | JAXBException | EncryptedPdfException e) {
      throw new RuntimeException("invoking veraPDF validation", e);
    }
  }
}

Class VeraPDFValidatorHelper:

In this class we isolate all access to the broken library. Nothing special here, just code copied from the OP's question. Everything done here happens inside the thread-safe classloader.

package de.scrum_master.app;

import org.verapdf.core.*;
import org.verapdf.pdfa.*;
import org.verapdf.pdfa.flavours.PDFAFlavour;
import org.verapdf.pdfa.results.ValidationResult;

import javax.xml.bind.JAXBException;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

public class VeraPDFValidatorHelper {
  public byte[] validatePDF(InputStream inputStream, String flavorId, Boolean prettyXml)
    throws ModelParsingException, ValidationException, JAXBException, EncryptedPdfException
  {
    VeraGreenfieldFoundryProvider.initialise();
    PDFAFlavour flavour = PDFAFlavour.byFlavourId(flavorId);
    PDFAValidator validator = Foundries.defaultInstance().createValidator(flavour, false);
    PDFAParser loader = Foundries.defaultInstance().createParser(inputStream, flavour);
    ValidationResult result = validator.validate(loader);

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    XmlSerialiser.toXml(result, baos, prettyXml, false);
    return baos.toByteArray();
  }
}