使用fork/join可以跨线程边界安全地移植非线程安全值吗?

oxb*_*kes 6 java parallel-processing fork-join java-memory-model java-stream

我有一些不是线程安全的类:

class ThreadUnsafeClass {
  long i;

  long incrementAndGet() { return ++i; }
}
Run Code Online (Sandbox Code Playgroud)

(我在long这里使用了a 作为字段,但我们应该将其字段视为一些线程不安全的类型).

我现在有一个看起来像这样的课程

class Foo {
  final ThreadUnsafeClass c;

  Foo(ThreadUnsafeClass c) {
    this.c = c;
  }
}
Run Code Online (Sandbox Code Playgroud)

也就是说,线程不安全类是它的最后一个字段.现在我要这样做:

public class JavaMM {
  public static void main(String[] args) {
    final ForkJoinTask<ThreadUnsafeClass> work = ForkJoinTask.adapt(() -> {
      ThreadUnsafeClass t = new ThreadUnsafeClass();
      t.incrementAndGet();
      return new FC(t);
    });

    assert (work.fork().join().c.i == 1); 
  }
}
Run Code Online (Sandbox Code Playgroud)

也就是说,从thread T(main),我调用了一些工作T'(fork-join-pool),它创建并改变了我的不安全类的实例,然后返回包含在a中的结果Foo.请注意,我的线程不安全类的所有变异都发生在一个线程上T'.

问题1:我是否保证thread-unsafe-class实例的结束状态安全地移植到?的T' ~> T线程边界上join

问题2:如果使用并行流完成此操作会怎么样?例如:

Map<Long, Foo> results = 
  Stream
    .of(new ThreadUnsafeClass())
    .parallel()
    .map(tuc -> {
      tuc.incrementAndGet();
      return new Foo(tuc);
    })
    .collect(
      Collectors.toConcurrentMap(
        foo -> foo.c.i,
        Function.identity();
      )
    );
assert(results.get(1) != null)
Run Code Online (Sandbox Code Playgroud)

Ale*_*lev 11

我认为ForkJoinTask.join()具有相同的记忆效应Future.get()(因为它在join()Javadoc中说的基本上get()是中断和异常差异).而Future.get()指定为:

由Future表示的异步计算所采取的操作发生在通过另一个线程中的Future.get()检索结果之后的操作之前.

换句话说,这基本上是通过Future/ 的"安全出版物" FJT.这意味着,执行程序线程通过FJT结果执行和发布的任何内容对FJT.join()用户都是可见的.由于该示例仅分配对象并在执行程序线程中填充其字段,并且在从执行程序返回后对象没有任何反应,因此我们只能看到执行程序线程生成的值.

请注意,通过整个过程final并不会带来任何额外的好处.即使你只是做了普通的实地商店,你仍然可以保证:

public static void main(String... args) throws Exception {
    ExecutorService s = Executors.newCachedThreadPool();
    Future<MyObject> f = s.submit(() -> new MyObject(42));
    assert (f.get().x == 42); // guaranteed!
    s.shutdown();
}

public class MyObject {
    int x;
    public MyObject(int x) { this.x = x; }
}
Run Code Online (Sandbox Code Playgroud)

但要注意的是,在Stream例如(如果我们假设之间的对称性Stream.of.parallelExecutor.submit和之间 Stream.collectFJT.join/ Future.get),已创建的对象调用程序线程,然后通过它来执行人做一些事情.这是一个微妙的区别,但它仍然没有多大关系,因为我们还提交HB,这样就无法看到对象的旧状态:

public static void main(String... args) throws Exception {
    ExecutorService s = Executors.newCachedThreadPool();
    MyObject o = new MyObject(42);
    Future<?> f = s.submit(() -> o.x++); // new --hb--> submit
    f.get(); // get -->hb--> read o.x
    assert (o.x == 43); // guaranteed
    s.shutdown();
}

public static class MyObject {
    int x;
    public MyObject(int x) { this.x = x; }
}
Run Code Online (Sandbox Code Playgroud)

(在正式发言中,那是因为所有HB路径都来自read(o.x)执行程序线程的动作store(o.x, 43))