diff options
Diffstat (limited to 'fs/ocfs2/dlm/dlmrecovery.c')
 fs/ocfs2/dlm/dlmrecovery.c | 147 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------
 1 file changed, 104 insertions(+), 43 deletions(-)
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 2f9e4e19a4f..344bcf90cbf 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -1050,7 +1050,7 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,  				if (lock->ml.node == dead_node) {  					mlog(0, "AHA! there was "  					     "a $RECOVERY lock for dead " -					     "node %u (%s)!\n",  +					     "node %u (%s)!\n",  					     dead_node, dlm->name);  					list_del_init(&lock->list);  					dlm_lock_put(lock); @@ -1164,6 +1164,39 @@ static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres,  	mres->master = master;  } +static void dlm_prepare_lvb_for_migration(struct dlm_lock *lock, +					  struct dlm_migratable_lockres *mres, +					  int queue) +{ +	if (!lock->lksb) +	       return; + +	/* Ignore lvb in all locks in the blocked list */ +	if (queue == DLM_BLOCKED_LIST) +		return; + +	/* Only consider lvbs in locks with granted EX or PR lock levels */ +	if (lock->ml.type != LKM_EXMODE && lock->ml.type != LKM_PRMODE) +		return; + +	if (dlm_lvb_is_empty(mres->lvb)) { +		memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN); +		return; +	} + +	/* Ensure the lvb copied for migration matches in other valid locks */ +	if (!memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN)) +		return; + +	mlog(ML_ERROR, "Mismatched lvb in lock cookie=%u:%llu, name=%.*s, " +	     "node=%u\n", +	     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), +	     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), +	     lock->lockres->lockname.len, lock->lockres->lockname.name, +	     lock->ml.node); +	dlm_print_one_lock_resource(lock->lockres); +	BUG(); +}  /* returns 1 if this lock fills the network structure,   * 0 otherwise */ @@ -1181,20 +1214,7 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,  	ml->list = queue;  	if (lock->lksb) {  		ml->flags = lock->lksb->flags; -		/* send our current lvb */ -		if (ml->type == LKM_EXMODE || -		    ml->type == 
LKM_PRMODE) { -			/* if it is already set, this had better be a PR -			 * and it has to match */ -			if (!dlm_lvb_is_empty(mres->lvb) && -			    (ml->type == LKM_EXMODE || -			     memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) { -				mlog(ML_ERROR, "mismatched lvbs!\n"); -				dlm_print_one_lock_resource(lock->lockres); -				BUG(); -			} -			memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN); -		} +		dlm_prepare_lvb_for_migration(lock, mres, queue);  	}  	ml->node = lock->ml.node;  	mres->num_locks++; @@ -1730,6 +1750,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,  	struct dlm_lock *lock = NULL;  	u8 from = O2NM_MAX_NODES;  	unsigned int added = 0; +	__be64 c;  	mlog(0, "running %d locks for this lockres\n", mres->num_locks);  	for (i=0; i<mres->num_locks; i++) { @@ -1777,19 +1798,48 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,  			/* lock is always created locally first, and  			 * destroyed locally last.  it must be on the list */  			if (!lock) { -				__be64 c = ml->cookie; -				mlog(ML_ERROR, "could not find local lock " -					       "with cookie %u:%llu!\n", +				c = ml->cookie; +				mlog(ML_ERROR, "Could not find local lock " +					       "with cookie %u:%llu, node %u, " +					       "list %u, flags 0x%x, type %d, " +					       "conv %d, highest blocked %d\n",  				     dlm_get_lock_cookie_node(be64_to_cpu(c)), -				     dlm_get_lock_cookie_seq(be64_to_cpu(c))); +				     dlm_get_lock_cookie_seq(be64_to_cpu(c)), +				     ml->node, ml->list, ml->flags, ml->type, +				     ml->convert_type, ml->highest_blocked); +				__dlm_print_one_lock_resource(res); +				BUG(); +			} + +			if (lock->ml.node != ml->node) { +				c = lock->ml.cookie; +				mlog(ML_ERROR, "Mismatched node# in lock " +				     "cookie %u:%llu, name %.*s, node %u\n", +				     dlm_get_lock_cookie_node(be64_to_cpu(c)), +				     dlm_get_lock_cookie_seq(be64_to_cpu(c)), +				     res->lockname.len, res->lockname.name, +				     lock->ml.node); +				c = ml->cookie; 
+				mlog(ML_ERROR, "Migrate lock cookie %u:%llu, " +				     "node %u, list %u, flags 0x%x, type %d, " +				     "conv %d, highest blocked %d\n", +				     dlm_get_lock_cookie_node(be64_to_cpu(c)), +				     dlm_get_lock_cookie_seq(be64_to_cpu(c)), +				     ml->node, ml->list, ml->flags, ml->type, +				     ml->convert_type, ml->highest_blocked);  				__dlm_print_one_lock_resource(res);  				BUG();  			} -			BUG_ON(lock->ml.node != ml->node);  			if (tmpq != queue) { -				mlog(0, "lock was on %u instead of %u for %.*s\n", -				     j, ml->list, res->lockname.len, res->lockname.name); +				c = ml->cookie; +				mlog(0, "Lock cookie %u:%llu was on list %u " +				     "instead of list %u for %.*s\n", +				     dlm_get_lock_cookie_node(be64_to_cpu(c)), +				     dlm_get_lock_cookie_seq(be64_to_cpu(c)), +				     j, ml->list, res->lockname.len, +				     res->lockname.name); +				__dlm_print_one_lock_resource(res);  				spin_unlock(&res->spinlock);  				continue;  			} @@ -1839,7 +1889,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,  				 * the lvb. 
*/  				memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);  			} else { -				/* otherwise, the node is sending its  +				/* otherwise, the node is sending its  				 * most recent valid lvb info */  				BUG_ON(ml->type != LKM_EXMODE &&  				       ml->type != LKM_PRMODE); @@ -1886,7 +1936,7 @@ skip_lvb:  		spin_lock(&res->spinlock);  		list_for_each_entry(lock, queue, list) {  			if (lock->ml.cookie == ml->cookie) { -				__be64 c = lock->ml.cookie; +				c = lock->ml.cookie;  				mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "  				     "exists on this lockres!\n", dlm->name,  				     res->lockname.len, res->lockname.name, @@ -2114,7 +2164,7 @@ static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,  	assert_spin_locked(&res->spinlock);  	if (res->owner == dlm->node_num) -		/* if this node owned the lockres, and if the dead node  +		/* if this node owned the lockres, and if the dead node  		 * had an EX when he died, blank out the lvb */  		search_node = dead_node;  	else { @@ -2152,7 +2202,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,  	/* this node is the lockres master:  	 * 1) remove any stale locks for the dead node -	 * 2) if the dead node had an EX when he died, blank out the lvb  +	 * 2) if the dead node had an EX when he died, blank out the lvb  	 */  	assert_spin_locked(&dlm->spinlock);  	assert_spin_locked(&res->spinlock); @@ -2193,7 +2243,12 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,  		mlog(0, "%s:%.*s: freed %u locks for dead node %u, "  		     "dropping ref from lockres\n", dlm->name,  		     res->lockname.len, res->lockname.name, freed, dead_node); -		BUG_ON(!test_bit(dead_node, res->refmap)); +		if(!test_bit(dead_node, res->refmap)) { +			mlog(ML_ERROR, "%s:%.*s: freed %u locks for dead node %u, " +			     "but ref was not set\n", dlm->name, +			     res->lockname.len, res->lockname.name, freed, dead_node); +			__dlm_print_one_lock_resource(res); +		}  		dlm_lockres_clear_refmap_bit(dead_node, res);  	} else if (test_bit(dead_node, 
res->refmap)) {  		mlog(0, "%s:%.*s: dead node %u had a ref, but had " @@ -2260,7 +2315,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)  				}  				spin_unlock(&res->spinlock);  				continue; -			}			 +			}  			spin_lock(&res->spinlock);  			/* zero the lvb if necessary */  			dlm_revalidate_lvb(dlm, res, dead_node); @@ -2411,7 +2466,7 @@ static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st)   * this function on each node racing to become the recovery   * master will not stop attempting this until either:   * a) this node gets the EX (and becomes the recovery master), - * or b) dlm->reco.new_master gets set to some nodenum  + * or b) dlm->reco.new_master gets set to some nodenum   * != O2NM_INVALID_NODE_NUM (another node will do the reco).   * so each time a recovery master is needed, the entire cluster   * will sync at this point.  if the new master dies, that will @@ -2424,7 +2479,7 @@ static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)  	mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n",  	     dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num); -again:	 +again:  	memset(&lksb, 0, sizeof(lksb));  	ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY, @@ -2437,8 +2492,8 @@ again:  	if (ret == DLM_NORMAL) {  		mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n",  		     dlm->name, dlm->node_num); -		 -		/* got the EX lock.  check to see if another node  + +		/* got the EX lock.  
check to see if another node  		 * just became the reco master */  		if (dlm_reco_master_ready(dlm)) {  			mlog(0, "%s: got reco EX lock, but %u will " @@ -2451,12 +2506,12 @@ again:  			/* see if recovery was already finished elsewhere */  			spin_lock(&dlm->spinlock);  			if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { -				status = -EINVAL;	 +				status = -EINVAL;  				mlog(0, "%s: got reco EX lock, but "  				     "node got recovered already\n", dlm->name);  				if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {  					mlog(ML_ERROR, "%s: new master is %u " -					     "but no dead node!\n",  +					     "but no dead node!\n",  					     dlm->name, dlm->reco.new_master);  					BUG();  				} @@ -2468,7 +2523,7 @@ again:  		 * set the master and send the messages to begin recovery */  		if (!status) {  			mlog(0, "%s: dead=%u, this=%u, sending " -			     "begin_reco now\n", dlm->name,  +			     "begin_reco now\n", dlm->name,  			     dlm->reco.dead_node, dlm->node_num);  			status = dlm_send_begin_reco_message(dlm,  				      dlm->reco.dead_node); @@ -2501,7 +2556,7 @@ again:  		mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n",  		     dlm->name, dlm->node_num);  		/* another node is master. wait on -		 * reco.new_master != O2NM_INVALID_NODE_NUM  +		 * reco.new_master != O2NM_INVALID_NODE_NUM  		 * for at most one second */  		wait_event_timeout(dlm->dlm_reco_thread_wq,  					 dlm_reco_master_ready(dlm), @@ -2589,7 +2644,13 @@ retry:  			     "begin reco msg (%d)\n", dlm->name, nodenum, ret);  			ret = 0;  		} -		if (ret == -EAGAIN) { + +		/* +		 * Prior to commit aad1b15310b9bcd59fa81ab8f2b1513b59553ea8, +		 * dlm_begin_reco_handler() returned EAGAIN and not -EAGAIN. +		 * We are handling both for compatibility reasons. 
+		 */ +		if (ret == -EAGAIN || ret == EAGAIN) {  			mlog(0, "%s: trying to start recovery of node "  			     "%u, but node %u is waiting for last recovery "  			     "to complete, backoff for a bit\n", dlm->name, @@ -2599,7 +2660,7 @@ retry:  		}  		if (ret < 0) {  			struct dlm_lock_resource *res; -			/* this is now a serious problem, possibly ENOMEM  +			/* this is now a serious problem, possibly ENOMEM  			 * in the network stack.  must retry */  			mlog_errno(ret);  			mlog(ML_ERROR, "begin reco of dlm %s to node %u " @@ -2612,7 +2673,7 @@ retry:  			} else {  				mlog(ML_ERROR, "recovery lock not found\n");  			} -			/* sleep for a bit in hopes that we can avoid  +			/* sleep for a bit in hopes that we can avoid  			 * another ENOMEM */  			msleep(100);  			goto retry; @@ -2664,7 +2725,7 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,  	}  	if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) {  		mlog(ML_NOTICE, "%s: dead_node previously set to %u, " -		     "node %u changing it to %u\n", dlm->name,  +		     "node %u changing it to %u\n", dlm->name,  		     dlm->reco.dead_node, br->node_idx, br->dead_node);  	}  	dlm_set_reco_master(dlm, br->node_idx); @@ -2730,8 +2791,8 @@ stage2:  		if (ret < 0) {  			mlog_errno(ret);  			if (dlm_is_host_down(ret)) { -				/* this has no effect on this recovery  -				 * session, so set the status to zero to  +				/* this has no effect on this recovery +				 * session, so set the status to zero to  				 * finish out the last recovery */  				mlog(ML_ERROR, "node %u went down after this "  				     "node finished recovery.\n", nodenum); @@ -2768,7 +2829,7 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,  	mlog(0, "%s: node %u finalizing recovery stage%d of "  	     "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage,  	     fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master); -  +  	spin_lock(&dlm->spinlock);  	if (dlm->reco.new_master != fr->node_idx) {  |