# JVM Unsafe putOrdered 优化原理

我们通常在代码中使用 acquire 和 release 语义来解决多线程之间的类似乱序现象。本文将详细描述Hotspot JIT 对于该操作的优化过程。

我们先来看该方法的调用方式,以下代码摘自 doug lea 老爷子的 ForkJoinPool代码,因为Java除了他,还有谁的并发代码更值得阅读和深究呢?我们看到这里使用 putOrdered 来保证 数组项 和 top 变量的写入顺序,但并不具备volatile 语义,同时 top变量并未使用 volatile 修饰,那么语义何在?我们知道:只要 top 变量 和 数组项 按顺序写入,那么 top 变量 只要递进到下一个 数组下标成功,那么必然数组项一定已经放入成功,所以,当线程后续如果对其他 volatile 变量进行写入操作时,那么由于 volatile 语义保证了 top变量 一定会在 volatile 变量写入之前写入成功,同时 由于 putOrdered 保证了 数组项 与 top 变量的写入顺序,那么 此时 数组项也一定会写入成功,所以此时我们根本不需要 volatile 语义来强制使用全屏障来放大这种影响:别忘了 volatile 的写入 将会 写入操作后面增加一个 storeload 屏障。

static final class WorkQueue {
    int top;
    // 将任务放到array数组
    final void push(ForkJoinTask<?> task) {
        ... 
        int s = top;
        if ((a = array) != null) {  
            int m = a.length - 1;    
            U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); // 先放置任务 array 的 top 项
            U.putOrderedInt(this, QTOP, s + 1); // 然后对 top + 1 移动到 数组的下一个放置下标中
            ...
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 未经 JIT 优化的 putOrdered

通过源码我们看到,当 JIT 没有参与优化时, putOrdered 的写入操作等价于 volatile 的写入操作。

UNSAFE_ENTRY(void, Unsafe_SetOrderedInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint x))
  SET_FIELD_VOLATILE(obj, offset, jint, x);
UNSAFE_END
    
#define SET_FIELD_VOLATILE(obj, offset, type_name, x) 
  oop p = JNIHandles::resolve(obj);  // 操作对象的首地址
  OrderAccess::release_store_fence((volatile type_name*)index_oop_from_field_offset_long(p, offset), x); // 使用volatile 的语义完成写入// 我们知道 x86 的 xchg 指令自身会自动声明 lock 信号,所以该操作为全屏障操作,此时等价于同时加上了(storestore 和 storeload 屏障)
inline void     OrderAccess::release_store_fence(volatile jshort* p, jshort v) {
  __asm__ volatile (  "xchgw (%2),%0"
                    : "=r" (v)
                    : "0" (v), "r" (p)
                    : "memory");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 编译器优化的 putOrdered

我们看 C1 编译器的代码,可以看到 HIR 和 LIR ,这两个是关键流程。LIR 将根据 HIR 生成的节点来完成寄存器分配,而主要优化阶段在 HIR。

void Compiler::compile_method(ciEnv* env, ciMethod* method, int entry_bci) {
    ...
    Compilation c(this, env, method, entry_bci, buffer_blob); // 开始编译
    ...
}
​
Compilation::Compilation(...){
    ...
    compile_method();
    ...
}void Compilation::compile_method() {
    ...
    int frame_size = compile_java_method();
    ...
}int Compilation::compile_java_method() {
    ...
    build_hir(); // 构建高等级中间语言
    ...
    emit_lir();  // 构建低等级中间语言
    ...
    emit_code_body(); // 根据 LIR 生成机器语言
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# build_hir

在构建 HIR 时,我们看到当invokeStaticMethod 字节码调用的为 putOrderedXX时,将会在HIR 中添加一个 UnsafePutObject 操作对象,该对象将会在后面调用 LIR 时使用。源码如下。

void Compilation::build_hir() {
    ...
    _hir = new IR(this, method(), osr_bci()); // 创建对象后直接开始构建 HIR 
    ...
}// IR 构造函数
IR::IR(Compilation* compilation, ciMethod* method, int osr_bci) :
    _locals_size(in_WordSize(-1))
  , _num_loops(0) {
  _compilation = compilation;
  _top_scope   = new IRScope(compilation, NULL, -1, method, osr_bci, true); // 创建 IRScope 由此开始转换,注意这里指定了 末位参数 create_graph 为 true
  _code        = NULL;
}// IRScope 构造函数
IRScope::IRScope(Compilation* compilation, IRScope* caller, int caller_bci, ciMethod* method, int osr_bci, bool create_graph)
: _callees(2)
, _compilation(compilation)
, _requires_phi_function(method->max_locals())
{
  ...
  if (create_graph && monitor_pairing_ok()) _start = build_graph(compilation, osr_bci); // 构建 HIR 转换图
  ...
}
​
BlockBegin* IRScope::build_graph(Compilation* compilation, int osr_bci) {
  GraphBuilder gm(compilation, this); // 创建GraphBuilder对象,在创建时开始转换,构造器执行完毕,那么转换完毕
  if (compilation->bailed_out()) return NULL;
  return gm.start(); // 获取起始的 HIR 块信息
}// GraphBuilder 构造器
GraphBuilder::GraphBuilder(Compilation* compilation, IRScope* scope)...{
    ...
    scope_data()->add_to_work_list(start_block); // 将起始块放入工作列表
    iterate_all_blocks(); // 迭代所有图中的块信息,看这里即可,其他别看
    ...
}void GraphBuilder::iterate_all_blocks(bool start_in_current_block_for_inlining) {
  do {
    if (start_in_current_block_for_inlining && !bailed_out()) {
      iterate_bytecodes_for_block(0); // 迭代处理起始块的字节码
      start_in_current_block_for_inlining = false;
    } else { // 处理非起始块的字节码
      BlockBegin* b;
      while ((b = scope_data()->remove_from_work_list()) != NULL) {
        if (!b->is_set(BlockBegin::was_visited_flag)) {
          ...
          connect_to_end(b); // 处理当前块的字节码
        }
      }
    }
  } while (!bailed_out() && !scope_data()->is_work_list_empty());
}// 我们看这里即可 bci 为 当前字节码的指针
void GraphBuilder::connect_to_end(BlockBegin* beg) {
  ...
  iterate_bytecodes_for_block(beg->bci());
}
​
BlockEnd* GraphBuilder::iterate_bytecodes_for_block(int bci) {
  ...
  while (!bailed_out() && last()->as_BlockEnd() == NULL &&
         (code = stream()->next()) != ciBytecodeStream::EOBC() &&
         (block_at(s.cur_bci()) == NULL || block_at(s.cur_bci()) == block())) { // 循环处理所有字节码
      ...
      switch (code) {
          // 这里省略其他字节码,我们调用的 unsafe.putOrderedInt 将通过以下字节码调用
          case Bytecodes::_invokevirtual  : // fall through
          case Bytecodes::_invokespecial  : // fall through
          case Bytecodes::_invokestatic   : // fall through
          case Bytecodes::_invokedynamic  : // fall through
          case Bytecodes::_invokeinterface: invoke(code); break;  // 此时调用invode方法处理该字节码
      }
      ...
  }
  ...
}void GraphBuilder::invoke(Bytecodes::Code code) {
    ...
    if (!PatchALot && Inline && klass->is_loaded() &&
      (klass->is_initialized() || klass->is_interface() && target->holder()->is_initialized())
      && target->is_loaded()
      && !patch_for_appendix) { // 对调用方法尝试进行内联
    if (code == Bytecodes::_invokestatic  ||
        code == Bytecodes::_invokespecial ||
        code == Bytecodes::_invokevirtual && target->is_final_method() ||
        code == Bytecodes::_invokedynamic) {
      ciMethod* inline_target = (cha_monomorphic_target != NULL) ? cha_monomorphic_target : target;
      bool success = try_inline(inline_target, (cha_monomorphic_target != NULL) || (exact_target != NULL), code, better_receiver); // 开始内联
     ...
  } else {
    print_inlining(target, "not inlineable", /*success*/ false);
  }
    ...
}
    
bool GraphBuilder::try_inline(ciMethod* callee, bool holder_known, Bytecodes::Code bc, Value receiver) {
   ...
   if (callee->intrinsic_id() != vmIntrinsics::_none) { // 执行内联
    if (try_inline_intrinsics(callee)) { 
      print_inlining(callee, "intrinsic");
      return true;
    }
  } 
  ...
}
    
// 这里等等,方法居然有个 intrinsic_id ?怎么来的?详情看混沌专题的描述
bool GraphBuilder::try_inline_intrinsics(ciMethod* callee) {
    vmIntrinsics::ID id = callee->intrinsic_id();
    switch (id) {
        // 这里我们看 putOrderedInt 即可
        case vmIntrinsics::_putOrderedInt    : return append_unsafe_put_obj(callee, T_INT, true);   
    }
}// 内联 putOrderedInt 
bool GraphBuilder::append_unsafe_put_obj(ciMethod* callee, BasicType t, bool is_volatile) {
  if (InlineUnsafeOps) { // 该标识符默认为 true
    Values* args = state()->pop_arguments(callee->arg_size());
    null_check(args->at(0));
    Instruction* offset = args->at(2);
    Instruction* op = append(new UnsafePutObject(t, args->at(1), offset, args->at(3), is_volatile)); // 向生成图中添加了一个 UnsafePutObject 对象
    compilation()->set_has_unsafe_access(true);
    kill_all();
  }
  return InlineUnsafeOps;
}
    
// 最后我们来看看该对象的定义
LEAF(UnsafePutObject, UnsafeObjectOp) // 使用宏定义,来完成类定义(UnsafePutObject为子类,也即当前类的定义,UnsafeObjectOp为父类,也即 UnsafePutObject的父类)
 private:
  Value _value;                                  // 要被存储的值
 public:
  UnsafePutObject(BasicType basic_type, Value object, Value offset, Value value, bool is_volatile)
  : UnsafeObjectOp(basic_type, object, offset, true, is_volatile)
    , _value(value)
  {
    ASSERT_VALUES
  }
  // accessors
  Value value()                                  { return _value; }
  // generic
  virtual void input_values_do(ValueVisitor* f)   { UnsafeObjectOp::input_values_do(f);                                           f->visit(&_value); }
};// 看如下宏定义
#define LEAF(class_name, super_class_name)       
  BASE(class_name, super_class_name)             
   public:                                       
    virtual const char* name() const             { return #class_name; }      
    virtual void visit(InstructionVisitor* v)    { v->do_##class_name(this); }  // 注意该虚函数的实现:当后面我们调用该方法时,将会直接调用传入的 InstructionVisitor的 do_UnsafePutObject 方法回调!!!!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157

# emit_lir

可以看到生成 LIR 时,将会根据 UnsafePutObject 对象的属性设置,在调用前后加上 membar_release 与 membar 屏障。代码如下。

void Compilation::emit_lir() {
    // 根据 HIR 生成 LIR 表示
    LIRGenerator gen(this, method());
    {
        hir()->iterate_linear_scan_order(&gen);
    }
    ...
}// LIR 生成器构造函数
LIRGenerator::LIRGenerator(Compilation* compilation, ciMethod* method)...{
    init();
}void IR::iterate_linear_scan_order(BlockClosure* closure) {
  linear_scan_order()->iterate_forward(closure);
}// 获取 HIR 的块信息 并迭代其中的块,调用 BlockClosure 也即 LIRGenerator 的 block_do 函数
BlockList* linear_scan_order() {  assert(_code != NULL, "not computed"); return _code; }
void BlockList::iterate_forward (BlockClosure* closure) {
  const int l = length();
  for (int i = 0; i < l; i++) closure->block_do(at(i));
}void LIRGenerator::block_do(BlockBegin* block) {
  block_do_prolog(block); // 创建 LIR_List 列表保存生成的 LIR
  set_block(block);
  for (Instruction* instr = block; instr != NULL; instr = instr->next()) { // 迭代处理所有 HIR 块
    if (instr->is_pinned()) do_root(instr);
  }
  set_block(NULL);
  block_do_epilog(block); // 处理完成后清理使用的资源
}// 处理块中的当前指令
void LIRGenerator::do_root(Value instr) {
    ...
    instr->visit(this);
    ...
}// 于是乎,在前面我们看到当前指令为 UnsafePutObject 时,将会回调该 LIRGenerator 的 do_UnsafePutObject方法
void LIRGenerator::do_UnsafePutObject(UnsafePutObject* x) { 
  BasicType type = x->basic_type();
  ...
  if (type == T_BOOLEAN || type == T_BYTE) { // boolean 和 byte
    data.load_byte_item();
  } else {
    data.load_item();
  }
  ...
  // 若当前设置的 x 变量为 volatile 且当前为多处理器架构,那么设置 storestore 屏障,并且在赋值后再执行 全屏障
  if (x->is_volatile() && os::is_MP()) __ membar_release();
  put_Object_unsafe(src.result(), off.result(), data.result(), type, x->is_volatile());
  if (x->is_volatile() && os::is_MP()) __ membar();
}// 我们来看 X86 下放置对象的实现:直接赋值
void LIRGenerator::put_Object_unsafe(LIR_Opr src, LIR_Opr offset, LIR_Opr data,
                                     BasicType type, bool is_volatile) {
  if (is_volatile && type == T_LONG) { // long 类型
    LIR_Address* addr = new LIR_Address(src, offset, T_DOUBLE);
    LIR_Opr tmp = new_register(T_DOUBLE);
    LIR_Opr spill = new_register(T_DOUBLE);
    set_vreg_flag(spill, must_start_in_memory);
    __ move(data, spill);
    __ move(spill, tmp);
    __ move(tmp, addr);
  } else { // 其他类型
    LIR_Address* addr = new LIR_Address(src, offset, type);
    bool is_obj = (type == T_ARRAY || type == T_OBJECT);
    if (is_obj) { // 对象类型,那么需要依赖读写屏障(注意:这里的barrier 不是 内存屏障,是一个回调函数,这里暂时忽略吧,后面说 GC 时会讲解)
      pre_barrier(LIR_OprFact::address(addr), LIR_OprFact::illegalOpr /* pre_val */,
                  true /* do_load */, false /* patch */, NULL);
      __ move(data, addr);
      post_barrier(LIR_OprFact::address(addr), data);
    } else { // 否则,直接赋值
      __ move(data, addr);
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82

# 总结

在JDK 1.8 中对于 putOrderedXXX 的语义采用的是一刀切的策略,都会加上同 volatile 变量写操作的 storestore 和 storeload 屏障,读者可以自行打开 jdk 1.9 + 的源码,Unsafe 的 putOrderedXXX 语义使用 putXXXRelease 操作,在 x86 上优化为空操作。