diff options
Diffstat (limited to 'fs/dlm/recover.c')
| -rw-r--r-- | fs/dlm/recover.c | 295 | 
1 files changed, 190 insertions, 105 deletions
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c index 7554e4dac6b..4a7a76e42fc 100644 --- a/fs/dlm/recover.c +++ b/fs/dlm/recover.c @@ -36,30 +36,23 @@   * (LS_RECOVERY_STOP set due to failure of a node in ls_nodes).  When another   * function thinks it could have completed the waited-on task, they should wake   * up ls_wait_general to get an immediate response rather than waiting for the - * timer to detect the result.  A timer wakes us up periodically while waiting - * to see if we should abort due to a node failure.  This should only be called - * by the dlm_recoverd thread. + * timeout.  This uses a timeout so it can check periodically if the wait + * should abort due to node failure (which doesn't cause a wake_up). + * This should only be called by the dlm_recoverd thread.   */ -static void dlm_wait_timer_fn(unsigned long data) -{ -	struct dlm_ls *ls = (struct dlm_ls *) data; -	mod_timer(&ls->ls_timer, jiffies + (dlm_config.ci_recover_timer * HZ)); -	wake_up(&ls->ls_wait_general); -} -  int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))  {  	int error = 0; +	int rv; -	init_timer(&ls->ls_timer); -	ls->ls_timer.function = dlm_wait_timer_fn; -	ls->ls_timer.data = (long) ls; -	ls->ls_timer.expires = jiffies + (dlm_config.ci_recover_timer * HZ); -	add_timer(&ls->ls_timer); - -	wait_event(ls->ls_wait_general, testfn(ls) || dlm_recovery_stopped(ls)); -	del_timer_sync(&ls->ls_timer); +	while (1) { +		rv = wait_event_timeout(ls->ls_wait_general, +					testfn(ls) || dlm_recovery_stopped(ls), +					dlm_config.ci_recover_timer * HZ); +		if (rv) +			break; +	}  	if (dlm_recovery_stopped(ls)) {  		log_debug(ls, "dlm_wait_function aborted"); @@ -277,22 +270,6 @@ static void recover_list_del(struct dlm_rsb *r)  	dlm_put_rsb(r);  } -static struct dlm_rsb *recover_list_find(struct dlm_ls *ls, uint64_t id) -{ -	struct dlm_rsb *r = NULL; - -	spin_lock(&ls->ls_recover_list_lock); - -	list_for_each_entry(r, &ls->ls_recover_list, res_recover_list) { -		if (id == (unsigned long) r) -			goto out; -	} -	r = NULL; - out: -	spin_unlock(&ls->ls_recover_list_lock); -	return r; -} -  static void recover_list_clear(struct dlm_ls *ls)  {  	struct dlm_rsb *r, *s; @@ -313,6 +290,94 @@ static void recover_list_clear(struct dlm_ls *ls)  	spin_unlock(&ls->ls_recover_list_lock);  } +static int recover_idr_empty(struct dlm_ls *ls) +{ +	int empty = 1; + +	spin_lock(&ls->ls_recover_idr_lock); +	if (ls->ls_recover_list_count) +		empty = 0; +	spin_unlock(&ls->ls_recover_idr_lock); + +	return empty; +} + +static int recover_idr_add(struct dlm_rsb *r) +{ +	struct dlm_ls *ls = r->res_ls; +	int rv, id; + +	rv = idr_pre_get(&ls->ls_recover_idr, GFP_NOFS); +	if (!rv) +		return -ENOMEM; + +	spin_lock(&ls->ls_recover_idr_lock); +	if (r->res_id) { +		spin_unlock(&ls->ls_recover_idr_lock); +		return -1; +	} +	rv = idr_get_new_above(&ls->ls_recover_idr, r, 1, &id); +	if (rv) { +		spin_unlock(&ls->ls_recover_idr_lock); +		return rv; +	} +	r->res_id = id; +	ls->ls_recover_list_count++; +	dlm_hold_rsb(r); +	spin_unlock(&ls->ls_recover_idr_lock); +	return 0; +} + +static void recover_idr_del(struct dlm_rsb *r) +{ +	struct dlm_ls *ls = r->res_ls; + +	spin_lock(&ls->ls_recover_idr_lock); +	idr_remove(&ls->ls_recover_idr, r->res_id); +	r->res_id = 0; +	ls->ls_recover_list_count--; +	spin_unlock(&ls->ls_recover_idr_lock); + +	dlm_put_rsb(r); +} + +static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id) +{ +	struct dlm_rsb *r; + +	spin_lock(&ls->ls_recover_idr_lock); +	r = idr_find(&ls->ls_recover_idr, (int)id); +	spin_unlock(&ls->ls_recover_idr_lock); +	return r; +} + +static int recover_idr_clear_rsb(int id, void *p, void *data) +{ +	struct dlm_ls *ls = data; +	struct dlm_rsb *r = p; + +	r->res_id = 0; +	r->res_recover_locks_count = 0; +	ls->ls_recover_list_count--; + +	dlm_put_rsb(r); +	return 0; +} + +static void recover_idr_clear(struct dlm_ls *ls) +{ +	spin_lock(&ls->ls_recover_idr_lock); +	idr_for_each(&ls->ls_recover_idr, recover_idr_clear_rsb, ls); +	idr_remove_all(&ls->ls_recover_idr); + +	if (ls->ls_recover_list_count != 0) { +		log_error(ls, "warning: recover_list_count %d", +			  ls->ls_recover_list_count); +		ls->ls_recover_list_count = 0; +	} +	spin_unlock(&ls->ls_recover_idr_lock); +} +  /* Master recovery: find new master node for rsb's that were     mastered on nodes that have been removed. @@ -361,9 +426,8 @@ static void set_master_lkbs(struct dlm_rsb *r)   * rsb's to consider.   */ -static void set_new_master(struct dlm_rsb *r, int nodeid) +static void set_new_master(struct dlm_rsb *r)  { -	r->res_nodeid = nodeid;  	set_master_lkbs(r);  	rsb_set_flag(r, RSB_NEW_MASTER);  	rsb_set_flag(r, RSB_NEW_MASTER2); @@ -372,31 +436,48 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)  /*   * We do async lookups on rsb's that need new masters.  The rsb's   * waiting for a lookup reply are kept on the recover_list. + * + * Another node recovering the master may have sent us a rcom lookup, + * and our dlm_master_lookup() set it as the new master, along with + * NEW_MASTER so that we'll recover it here (this implies dir_nodeid + * equals our_nodeid below).   */ -static int recover_master(struct dlm_rsb *r) +static int recover_master(struct dlm_rsb *r, unsigned int *count)  {  	struct dlm_ls *ls = r->res_ls; -	int error, ret_nodeid; -	int our_nodeid = dlm_our_nodeid(); -	int dir_nodeid = dlm_dir_nodeid(r); +	int our_nodeid, dir_nodeid; +	int is_removed = 0; +	int error; + +	if (is_master(r)) +		return 0; + +	is_removed = dlm_is_removed(ls, r->res_nodeid); + +	if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER)) +		return 0; + +	our_nodeid = dlm_our_nodeid(); +	dir_nodeid = dlm_dir_nodeid(r);  	if (dir_nodeid == our_nodeid) { -		error = dlm_dir_lookup(ls, our_nodeid, r->res_name, -				       r->res_length, &ret_nodeid); -		if (error) -			log_error(ls, "recover dir lookup error %d", error); +		if (is_removed) { +			r->res_master_nodeid = our_nodeid; +			r->res_nodeid = 0; +		} -		if (ret_nodeid == our_nodeid) -			ret_nodeid = 0; -		lock_rsb(r); -		set_new_master(r, ret_nodeid); -		unlock_rsb(r); +		/* set master of lkbs to ourself when is_removed, or to +		   another new master which we set along with NEW_MASTER +		   in dlm_master_lookup */ +		set_new_master(r); +		error = 0;  	} else { -		recover_list_add(r); +		recover_idr_add(r);  		error = dlm_send_rcom_lookup(r, dir_nodeid);  	} +	(*count)++;  	return error;  } @@ -415,7 +496,7 @@ static int recover_master(struct dlm_rsb *r)   * resent.   */ -static int recover_master_static(struct dlm_rsb *r) +static int recover_master_static(struct dlm_rsb *r, unsigned int *count)  {  	int dir_nodeid = dlm_dir_nodeid(r);  	int new_master = dir_nodeid; @@ -423,11 +504,12 @@ static int recover_master_static(struct dlm_rsb *r)  	if (dir_nodeid == dlm_our_nodeid())  		new_master = 0; -	lock_rsb(r);  	dlm_purge_mstcpy_locks(r); -	set_new_master(r, new_master); -	unlock_rsb(r); -	return 1; +	r->res_master_nodeid = dir_nodeid; +	r->res_nodeid = new_master; +	set_new_master(r); +	(*count)++; +	return 0;  }  /* @@ -443,7 +525,10 @@ static int recover_master_static(struct dlm_rsb *r)  int dlm_recover_masters(struct dlm_ls *ls)  {  	struct dlm_rsb *r; -	int error = 0, count = 0; +	unsigned int total = 0; +	unsigned int count = 0; +	int nodir = dlm_no_directory(ls); +	int error;  	log_debug(ls, "dlm_recover_masters"); @@ -455,50 +540,58 @@ int dlm_recover_masters(struct dlm_ls *ls)  			goto out;  		} -		if (dlm_no_directory(ls)) -			count += recover_master_static(r); -		else if (!is_master(r) && -			 (dlm_is_removed(ls, r->res_nodeid) || -			  rsb_flag(r, RSB_NEW_MASTER))) { -			recover_master(r); -			count++; -		} +		lock_rsb(r); +		if (nodir) +			error = recover_master_static(r, &count); +		else +			error = recover_master(r, &count); +		unlock_rsb(r); +		cond_resched(); +		total++; -		schedule(); +		if (error) { +			up_read(&ls->ls_root_sem); +			goto out; +		}  	}  	up_read(&ls->ls_root_sem); -	log_debug(ls, "dlm_recover_masters %d resources", count); +	log_debug(ls, "dlm_recover_masters %u of %u", count, total); -	error = dlm_wait_function(ls, &recover_list_empty); +	error = dlm_wait_function(ls, &recover_idr_empty);   out:  	if (error) -		recover_list_clear(ls); +		recover_idr_clear(ls);  	return error;  }  int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)  {  	struct dlm_rsb *r; -	int nodeid; +	int ret_nodeid, new_master; -	r = recover_list_find(ls, rc->rc_id); +	r = recover_idr_find(ls, rc->rc_id);  	if (!r) {  		log_error(ls, "dlm_recover_master_reply no id %llx",  			  (unsigned long long)rc->rc_id);  		goto out;  	} -	nodeid = rc->rc_result; -	if (nodeid == dlm_our_nodeid()) -		nodeid = 0; +	ret_nodeid = rc->rc_result; + +	if (ret_nodeid == dlm_our_nodeid()) +		new_master = 0; +	else +		new_master = ret_nodeid;  	lock_rsb(r); -	set_new_master(r, nodeid); +	r->res_master_nodeid = ret_nodeid; +	r->res_nodeid = new_master; +	set_new_master(r);  	unlock_rsb(r); -	recover_list_del(r); +	recover_idr_del(r); -	if (recover_list_empty(ls)) +	if (recover_idr_empty(ls))  		wake_up(&ls->ls_wait_general);   out:  	return 0; @@ -711,6 +804,7 @@ static void recover_lvb(struct dlm_rsb *r)  static void recover_conversion(struct dlm_rsb *r)  { +	struct dlm_ls *ls = r->res_ls;  	struct dlm_lkb *lkb;  	int grmode = -1; @@ -725,10 +819,15 @@ static void recover_conversion(struct dlm_rsb *r)  	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {  		if (lkb->lkb_grmode != DLM_LOCK_IV)  			continue; -		if (grmode == -1) +		if (grmode == -1) { +			log_debug(ls, "recover_conversion %x set gr to rq %d", +				  lkb->lkb_id, lkb->lkb_rqmode);  			lkb->lkb_grmode = lkb->lkb_rqmode; -		else +		} else { +			log_debug(ls, "recover_conversion %x set gr %d", +				  lkb->lkb_id, grmode);  			lkb->lkb_grmode = grmode; +		}  	}  } @@ -791,20 +890,8 @@ int dlm_create_root_list(struct dlm_ls *ls)  			dlm_hold_rsb(r);  		} -		/* If we're using a directory, add tossed rsbs to the root -		   list; they'll have entries created in the new directory, -		   but no other recovery steps should do anything with them. */ - -		if (dlm_no_directory(ls)) { -			spin_unlock(&ls->ls_rsbtbl[i].lock); -			continue; -		} - -		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) { -			r = rb_entry(n, struct dlm_rsb, res_hashnode); -			list_add(&r->res_root_list, &ls->ls_root_list); -			dlm_hold_rsb(r); -		} +		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss)) +			log_error(ls, "dlm_create_root_list toss not empty");  		spin_unlock(&ls->ls_rsbtbl[i].lock);  	}   out: @@ -824,28 +911,26 @@ void dlm_release_root_list(struct dlm_ls *ls)  	up_write(&ls->ls_root_sem);  } -/* If not using a directory, clear the entire toss list, there's no benefit to -   caching the master value since it's fixed.  If we are using a dir, keep the -   rsb's we're the master of.  Recovery will add them to the root list and from -   there they'll be entered in the rebuilt directory. */ - -void dlm_clear_toss_list(struct dlm_ls *ls) +void dlm_clear_toss(struct dlm_ls *ls)  {  	struct rb_node *n, *next; -	struct dlm_rsb *rsb; +	struct dlm_rsb *r; +	unsigned int count = 0;  	int i;  	for (i = 0; i < ls->ls_rsbtbl_size; i++) {  		spin_lock(&ls->ls_rsbtbl[i].lock);  		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) { -			next = rb_next(n);; -			rsb = rb_entry(n, struct dlm_rsb, res_hashnode); -			if (dlm_no_directory(ls) || !is_master(rsb)) { -				rb_erase(n, &ls->ls_rsbtbl[i].toss); -				dlm_free_rsb(rsb); -			} +			next = rb_next(n); +			r = rb_entry(n, struct dlm_rsb, res_hashnode); +			rb_erase(n, &ls->ls_rsbtbl[i].toss); +			dlm_free_rsb(r); +			count++;  		}  		spin_unlock(&ls->ls_rsbtbl[i].lock);  	} + +	if (count) +		log_debug(ls, "dlm_clear_toss %u done", count);  }  |