节点模式调节的基础是VPP主循环执行的次数,即main_loop_count的值。
always_inline void
vlib_increment_main_loop_counter (vlib_main_t * vm)
{
vm->main_loop_count++;
vm->internal_node_last_vectors_per_main_loop = 0;
if (PREDICT_FALSE (vm->main_loop_exit_now))
clib_longjmp (&vm->main_loop_exit, VLIB_MAIN_LOOP_EXIT_CLI);
}
每次主循环执行,都调用以上函数,递增main_loop_count的计数。
static_always_inline void
vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
{
while (1)
{
vlib_increment_main_loop_counter (vm);
向量数量计算
以下函数更新node节点的向量处理信息,本次处理了(n_vectors)数量的向量。节点main_loop_vector_stats为具有两个成员的数组,VLIB_LOG2_MAIN_LOOPS_PER_STATS_UPDATE值为7。如下i变量的值有两种可能0和1,初始值为0,每当main_loop_count的值增加128之后(主循环执行128次),i的值发生改变。
i = ((vm->main_loop_count >> 7) & (2 - 1));
or:
i = ((vm->main_loop_count >> 7) & 1);
i0表示当前使用的统计值索引;i1表示上一次使用的索引。
always_inline u32
vlib_node_runtime_update_main_loop_vector_stats (vlib_main_t * vm,
vlib_node_runtime_t * node, uword n_vectors)
{
u32 i, d, vi0, vi1, i0, i1;
ASSERT (is_pow2 (ARRAY_LEN (node->main_loop_vector_stats)));
i = ((vm->main_loop_count >> VLIB_LOG2_MAIN_LOOPS_PER_STATS_UPDATE)
& (ARRAY_LEN (node->main_loop_vector_stats) - 1));
i0 = i ^ 0;
i1 = i ^ 1;
初始情况下,没有统计值,i值为0,i0和i1的值分别为0和1。d为0,向量统计值vi0和vi1也都是0,此时的向量计数(n_vectors)增加到当前索引vi0表示的数组成员中,更新main_loop_count_last_dispatch。返回值为上一次的向量统计值(索引为i1)。
d = ((vm->main_loop_count >> VLIB_LOG2_MAIN_LOOPS_PER_STATS_UPDATE)
-
(node->main_loop_count_last_dispatch >> VLIB_LOG2_MAIN_LOOPS_PER_STATS_UPDATE));
vi0 = node->main_loop_vector_stats[i0];
vi1 = node->main_loop_vector_stats[i1];
vi0 = d == 0 ? vi0 : 0;
vi1 = d <= 1 ? vi1 : 0;
vi0 += n_vectors;
node->main_loop_vector_stats[i0] = vi0;
node->main_loop_vector_stats[i1] = vi1;
node->main_loop_count_last_dispatch = vm->main_loop_count;
/* Return previous counter. */
return node->main_loop_vector_stats[i1];
}
在索引变换之前,即main_loop_count的值小于128之前,向量值n_vectors都是增加到索引i0(等于0)表示的数组成员中。等到main_loop_count大于等于128,小于256之后,i的值为1,i0和i1的值分别为1和0,索引变换,索引i0仍然表示的为当前使用的索引,但是其值为1。
由于node节点并不是每次主循环都被调用,这导致main_loop_count的值可能大于节点成员main_loop_count_last_dispatch的值,二者的差值为d,d的值按照128为单位增加1,即节点node超过128次没有被调用时,d值增加1。在node节点被调用的时候,对d值进行检查:
1)当d值等于1时,即发生了128次node节点未被调度的情况,将当前统计值(i0)设置为n_vectors,重新开始统计。返回上一次的统计值(i1索引)。
2)当d值等于2时,即发生了256次node节点未被调用的情况,将当前统计值(i0)设置为n_vectors,重新开始统计。将上一次的统计值(i1索引)清空,返回空值。
3)当d值大于2时,即发生了256次以上node节点未被调用的情况,处理与2)相同。
以上可见,函数统计128次主循环中,节点处理的向量的数量;并且返回上一个128次主循环处理的向量数量,后者用于判断节点node是否在中断或者轮询模式之间切换。
自适应节点调节
首先,更新节点的向量处理信息,返回值为上一节函数中返回的上一个128次主循环处理的向量数量。
static_always_inline u64
dispatch_node (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_node_type_t type, vlib_node_state_t dispatch_state,
vlib_frame_t * frame, u64 last_time_stamp)
{
v = vlib_node_runtime_update_stats (vm, node,
/* n_calls */ 1, /* n_vectors */ n,
/* n_clocks */ t - last_time_stamp);
对于自适应调度模式的节点,根据向量数量(v)和阈值来决定当前的调度模式。阈值polling_threshold_vector_length值固定为10.
/* When in adaptive mode and vector rate crosses threshold switch to
polling mode and vice versa. */
if (PREDICT_FALSE (node->flags & VLIB_NODE_FLAG_ADAPTIVE_MODE))
{
第一种情况,当前的调度状态dispatch_state为中断,向量数量(v)大于等于轮询模式的阈值,并且节点没有设置由中断到轮询的标志;以上三个条件都满足,将节点切换到轮询模式。
节点状态设置为轮询(VLIB_NODE_STATE_POLLING)。为节点flags设置上由中断到轮询模式的标志位(VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE)。
if ((dispatch_state == VLIB_NODE_STATE_INTERRUPT
&& v >= nm->polling_threshold_vector_length) &&
!(node->flags & VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE))
{
vlib_node_t *n = vlib_get_node (vm, node->node_index);
n->state = VLIB_NODE_STATE_POLLING;
node->state = VLIB_NODE_STATE_POLLING;
node->flags &= ~VLIB_NODE_FLAG_SWITCH_FROM_POLLING_TO_INTERRUPT_MODE;
node->flags |= VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE;
nm->input_node_counts_by_state[VLIB_NODE_STATE_INTERRUPT] -= 1;
nm->input_node_counts_by_state[VLIB_NODE_STATE_POLLING] += 1;
}
否则,以上条件不成立。对于当前调度状态dispatch_state为轮询的情况,如果向量数量小于等于中断模式的阈值,准备切换到中断调度模式。分成以下两种情况:第一,如果节点没有设置由轮询切换到中断模式的标志,为节点设置此标志。
第二种情况,当轮询再次到来时,向量数量还是低于中断模式阈值,并且节点带有从轮询到中断的标志位,真正的将节点切换到中断模式。
else if (dispatch_state == VLIB_NODE_STATE_POLLING
&& v <= nm->interrupt_threshold_vector_length)
{
vlib_node_t *n = vlib_get_node (vm, node->node_index);
if (node->flags & VLIB_NODE_FLAG_SWITCH_FROM_POLLING_TO_INTERRUPT_MODE)
{
/* Switch to interrupt mode after dispatch in polling one more time.
This allows driver to re-enable interrupts. */
n->state = VLIB_NODE_STATE_INTERRUPT;
node->state = VLIB_NODE_STATE_INTERRUPT;
node->flags &= ~VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE;
nm->input_node_counts_by_state[VLIB_NODE_STATE_POLLING] -= 1;
nm->input_node_counts_by_state[VLIB_NODE_STATE_INTERRUPT] += 1;
} else {
vlib_worker_thread_t *w = vlib_worker_threads + vm->thread_index;
node->flags |= VLIB_NODE_FLAG_SWITCH_FROM_POLLING_TO_INTERRUPT_MODE;
文章评论