Diffstat (limited to 'fs/btrfs')
-rw-r--r--  fs/btrfs/Makefile          2
-rw-r--r--  fs/btrfs/async-thread.c  288
-rw-r--r--  fs/btrfs/async-thread.h   78
-rw-r--r--  fs/btrfs/ctree.h          14
-rw-r--r--  fs/btrfs/disk-io.c       198
-rw-r--r--  fs/btrfs/inode.c           4
-rw-r--r--  fs/btrfs/volumes.c       162
-rw-r--r--  fs/btrfs/volumes.h        10
8 files changed, 625 insertions, 131 deletions
diff --git a/fs/btrfs/Makefile b/fs/btrfs/Makefile index e5fc3cfea0a..9dcfc2fe333 100644 --- a/fs/btrfs/Makefile +++ b/fs/btrfs/Makefile @@ -6,7 +6,7 @@ btrfs-y := super.o ctree.o extent-tree.o print-tree.o root-tree.o dir-item.o \  	   hash.o file-item.o inode-item.o inode-map.o disk-io.o \  	   transaction.o bit-radix.o inode.o file.o tree-defrag.o \  	   extent_map.o sysfs.o struct-funcs.o xattr.o ordered-data.o \ -	   extent_io.o volumes.o +	   extent_io.o volumes.o async-thread.o  btrfs-$(CONFIG_FS_POSIX_ACL)	+= acl.o  else diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c new file mode 100644 index 00000000000..2911b67bd6f --- /dev/null +++ b/fs/btrfs/async-thread.c @@ -0,0 +1,288 @@ +/* + * Copyright (C) 2007 Oracle.  All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License v2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 021110-1307, USA. + */ + +#include <linux/kthread.h> +#include <linux/list.h> +#include <linux/spinlock.h> +#include <linux/freezer.h> +#include "async-thread.h" + +/* + * container for the kthread task pointer and the list of pending work + * One of these is allocated per thread. + */ +struct btrfs_worker_thread { +	/* list of struct btrfs_work that are waiting for service */ +	struct list_head pending; + +	/* list of worker threads from struct btrfs_workers */ +	struct list_head worker_list; + +	/* kthread */ +	struct task_struct *task; + +	/* number of things on the pending list */ +	atomic_t num_pending; + +	/* protects the pending list. 
*/ +	spinlock_t lock; + +	/* set to non-zero when this thread is already awake and kicking */ +	int working; +}; + +/* + * main loop for servicing work items + */ +static int worker_loop(void *arg) +{ +	struct btrfs_worker_thread *worker = arg; +	struct list_head *cur; +	struct btrfs_work *work; +	do { +		spin_lock_irq(&worker->lock); +		while(!list_empty(&worker->pending)) { +			cur = worker->pending.next; +			work = list_entry(cur, struct btrfs_work, list); +			list_del(&work->list); +			clear_bit(0, &work->flags); + +			work->worker = worker; +			spin_unlock_irq(&worker->lock); + +			work->func(work); + +			atomic_dec(&worker->num_pending); +			spin_lock_irq(&worker->lock); +		} +		worker->working = 0; +		if (freezing(current)) { +			refrigerator(); +		} else { +			set_current_state(TASK_INTERRUPTIBLE); +			spin_unlock_irq(&worker->lock); +			schedule(); +			__set_current_state(TASK_RUNNING); +		} +	} while (!kthread_should_stop()); +	return 0; +} + +/* + * this will wait for all the worker threads to shutdown + */ +int btrfs_stop_workers(struct btrfs_workers *workers) +{ +	struct list_head *cur; +	struct btrfs_worker_thread *worker; + +	while(!list_empty(&workers->worker_list)) { +		cur = workers->worker_list.next; +		worker = list_entry(cur, struct btrfs_worker_thread, +				    worker_list); +		kthread_stop(worker->task); +		list_del(&worker->worker_list); +		kfree(worker); +	} +	return 0; +} + +/* + * simple init on struct btrfs_workers + */ +void btrfs_init_workers(struct btrfs_workers *workers, int max) +{ +	workers->num_workers = 0; +	INIT_LIST_HEAD(&workers->worker_list); +	workers->last = NULL; +	spin_lock_init(&workers->lock); +	workers->max_workers = max; +} + +/* + * starts new worker threads.  This does not enforce the max worker + * count in case you need to temporarily go past it. + */ +int btrfs_start_workers(struct btrfs_workers *workers, int num_workers) +{ +	struct btrfs_worker_thread *worker; +	int ret = 0; +	int i; + +	for (i = 0; i < num_workers; i++) { +		worker = kzalloc(sizeof(*worker), GFP_NOFS); +		if (!worker) { +			ret = -ENOMEM; +			goto fail; +		} + +		INIT_LIST_HEAD(&worker->pending); +		INIT_LIST_HEAD(&worker->worker_list); +		spin_lock_init(&worker->lock); +		atomic_set(&worker->num_pending, 0); +		worker->task = kthread_run(worker_loop, worker, "btrfs"); +		if (IS_ERR(worker->task)) { +			ret = PTR_ERR(worker->task); +			goto fail; +		} + +		spin_lock_irq(&workers->lock); +		list_add_tail(&worker->worker_list, &workers->worker_list); +		workers->last = worker; +		workers->num_workers++; +		spin_unlock_irq(&workers->lock); +	} +	return 0; +fail: +	btrfs_stop_workers(workers); +	return ret; +} + +/* + * run through the list and find a worker thread that doesn't have a lot + * to do right now.  This can return null if we aren't yet at the thread + * count limit and all of the threads are busy. + */ +static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers) +{ +	struct btrfs_worker_thread *worker; +	struct list_head *next; +	struct list_head *start; +	int enforce_min = workers->num_workers < workers->max_workers; + +	/* start with the last thread if it isn't busy */ +	worker = workers->last; +	if (atomic_read(&worker->num_pending) < 64) +		goto done; + +	next = worker->worker_list.next; +	start = &worker->worker_list; + +	/* +	 * check all the workers for someone that is bored.  
FIXME, do +	 * something smart here +	 */ +	while(next != start) { +		if (next == &workers->worker_list) { +			next = workers->worker_list.next; +			continue; +		} +		worker = list_entry(next, struct btrfs_worker_thread, +				    worker_list); +		if (atomic_read(&worker->num_pending) < 64 || !enforce_min) +			goto done; +		next = next->next; +	} +	/* +	 * nobody was bored, if we're already at the max thread count, +	 * use the last thread +	 */ +	if (!enforce_min || atomic_read(&workers->last->num_pending) < 64) { +		return workers->last; +	} +	return NULL; +done: +	workers->last = worker; +	return worker; +} + +static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers) +{ +	struct btrfs_worker_thread *worker; +	unsigned long flags; + +again: +	spin_lock_irqsave(&workers->lock, flags); +	worker = next_worker(workers); +	spin_unlock_irqrestore(&workers->lock, flags); + +	if (!worker) { +		spin_lock_irqsave(&workers->lock, flags); +		if (workers->num_workers >= workers->max_workers) { +			/* +			 * we have failed to find any workers, just +			 * return the force one +			 */ +			worker = list_entry(workers->worker_list.next, +				  struct btrfs_worker_thread, worker_list); +			spin_unlock_irqrestore(&workers->lock, flags); +		} else { +			spin_unlock_irqrestore(&workers->lock, flags); +			/* we're below the limit, start another worker */ +			btrfs_start_workers(workers, 1); +			goto again; +		} +	} +	return worker; +} + +/* + * btrfs_requeue_work just puts the work item back on the tail of the list + * it was taken from.  It is intended for use with long running work functions + * that make some progress and want to give the cpu up for others. + */ +int btrfs_requeue_work(struct btrfs_work *work) +{ +	struct btrfs_worker_thread *worker = work->worker; +	unsigned long flags; + +	if (test_and_set_bit(0, &work->flags)) +		goto out; + +	spin_lock_irqsave(&worker->lock, flags); +	atomic_inc(&worker->num_pending); +	list_add_tail(&work->list, &worker->pending); +	spin_unlock_irqrestore(&worker->lock, flags); +out: +	return 0; +} + +/* + * places a struct btrfs_work into the pending queue of one of the kthreads + */ +int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work) +{ +	struct btrfs_worker_thread *worker; +	unsigned long flags; +	int wake = 0; + +	/* don't requeue something already on a list */ +	if (test_and_set_bit(0, &work->flags)) +		goto out; + +	worker = find_worker(workers); + +	spin_lock_irqsave(&worker->lock, flags); +	atomic_inc(&worker->num_pending); +	list_add_tail(&work->list, &worker->pending); + +	/* +	 * avoid calling into wake_up_process if this thread has already +	 * been kicked +	 */ +	if (!worker->working) +		wake = 1; +	worker->working = 1; + +	spin_unlock_irqrestore(&worker->lock, flags); + +	if (wake) +		wake_up_process(worker->task); +out: +	return 0; +} diff --git a/fs/btrfs/async-thread.h b/fs/btrfs/async-thread.h new file mode 100644 index 00000000000..52fc9da0f9e --- /dev/null +++ b/fs/btrfs/async-thread.h @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2007 Oracle.  All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License v2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public + * License along with this program; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 021110-1307, USA. + */ + +#ifndef __BTRFS_ASYNC_THREAD_ +#define __BTRFS_ASYNC_THREAD_ + +struct btrfs_worker_thread; + +/* + * This is similar to a workqueue, but it is meant to spread the operations + * across all available cpus instead of just the CPU that was used to + * queue the work.  There is also some batching introduced to try and + * cut down on context switches. + * + * By default threads are added on demand up to 2 * the number of cpus. + * Changing struct btrfs_workers->max_workers is one way to prevent + * demand creation of kthreads. + * + * the basic model of these worker threads is to embed a btrfs_work + * structure in your own data struct, and use container_of in a + * work function to get back to your data struct. + */ +struct btrfs_work { +	/* +	 * only func should be set to the function you want called +	 * your work struct is passed as the only arg +	 */ +	void (*func)(struct btrfs_work *work); + +	/* +	 * flags should be set to zero.  It is used to make sure the +	 * struct is only inserted once into the list. +	 */ +	unsigned long flags; + +	/* don't touch these */ +	struct btrfs_worker_thread *worker; +	struct list_head list; +}; + +struct btrfs_workers { +	/* current number of running workers */ +	int num_workers; + +	/* max number of workers allowed.  changed by btrfs_start_workers */ +	int max_workers; + +	/* list with all the work threads */ +	struct list_head worker_list; + +	/* the last worker thread to have something queued */ +	struct btrfs_worker_thread *last; + +	/* lock for finding the next worker thread to queue on */ +	spinlock_t lock; +}; + +int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work); +int btrfs_start_workers(struct btrfs_workers *workers, int num_workers); +int btrfs_stop_workers(struct btrfs_workers *workers); +void btrfs_init_workers(struct btrfs_workers *workers, int max); +int btrfs_requeue_work(struct btrfs_work *work); +#endif diff --git a/fs/btrfs/ctree.h b/fs/btrfs/ctree.h index 49cbc62b42f..6c91a510c96 100644 --- a/fs/btrfs/ctree.h +++ b/fs/btrfs/ctree.h @@ -30,6 +30,7 @@  #include "bit-radix.h"  #include "extent_io.h"  #include "extent_map.h" +#include "async-thread.h"  struct btrfs_trans_handle;  struct btrfs_transaction; @@ -518,13 +519,20 @@ struct btrfs_fs_info {  	struct list_head hashers;  	struct list_head dead_roots;  	struct list_head end_io_work_list; -	struct list_head async_submit_work_list;  	struct work_struct end_io_work; -	struct work_struct async_submit_work;  	spinlock_t end_io_work_lock; -	spinlock_t async_submit_work_lock;  	atomic_t nr_async_submits; +	/* +	 * there is a pool of worker threads for checksumming during writes +	 * and a pool for checksumming after reads.  This is because readers +	 * can run with FS locks held, and the writers may be waiting for +	 * those locks.  We don't want ordering in the pending list to cause +	 * deadlocks, and so the two are serviced separately. 
+	 */ +	struct btrfs_workers workers; +	struct btrfs_workers endio_workers; +  #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)  	struct work_struct trans_work;  #else diff --git a/fs/btrfs/disk-io.c b/fs/btrfs/disk-io.c index b9a53646ceb..98ff4fbcb38 100644 --- a/fs/btrfs/disk-io.c +++ b/fs/btrfs/disk-io.c @@ -31,6 +31,7 @@  #include "btrfs_inode.h"  #include "volumes.h"  #include "print-tree.h" +#include "async-thread.h"  #if 0  static int check_tree_block(struct btrfs_root *root, struct extent_buffer *buf) @@ -46,8 +47,7 @@ static int check_tree_block(struct btrfs_root *root, struct extent_buffer *buf)  #endif  static struct extent_io_ops btree_extent_io_ops; -static struct workqueue_struct *end_io_workqueue; -static struct workqueue_struct *async_submit_workqueue; +static void end_workqueue_fn(struct btrfs_work *work);  struct end_io_wq {  	struct bio *bio; @@ -57,6 +57,7 @@ struct end_io_wq {  	int error;  	int metadata;  	struct list_head list; +	struct btrfs_work work;  };  struct async_submit_bio { @@ -66,6 +67,7 @@ struct async_submit_bio {  	extent_submit_bio_hook_t *submit_bio_hook;  	int rw;  	int mirror_num; +	struct btrfs_work work;  };  struct extent_map *btree_get_extent(struct inode *inode, struct page *page, @@ -389,7 +391,6 @@ static int end_workqueue_bio(struct bio *bio,  {  	struct end_io_wq *end_io_wq = bio->bi_private;  	struct btrfs_fs_info *fs_info; -	unsigned long flags;  #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23)  	if (bio->bi_size) @@ -397,11 +398,10 @@ static int end_workqueue_bio(struct bio *bio,  #endif  	fs_info = end_io_wq->info; -	spin_lock_irqsave(&fs_info->end_io_work_lock, flags);  	end_io_wq->error = err; -	list_add_tail(&end_io_wq->list, &fs_info->end_io_work_list); -	spin_unlock_irqrestore(&fs_info->end_io_work_lock, flags); -	queue_work(end_io_workqueue, &fs_info->end_io_work); +	end_io_wq->work.func = end_workqueue_fn; +	end_io_wq->work.flags = 0; +	btrfs_queue_worker(&fs_info->endio_workers, &end_io_wq->work);  #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23)  	return 0; @@ -428,6 +428,19 @@ int btrfs_bio_wq_end_io(struct btrfs_fs_info *info, struct bio *bio,  	return 0;  } +static void run_one_async_submit(struct btrfs_work *work) +{ +	struct btrfs_fs_info *fs_info; +	struct async_submit_bio *async; + +	async = container_of(work, struct  async_submit_bio, work); +	fs_info = BTRFS_I(async->inode)->root->fs_info; +	atomic_dec(&fs_info->nr_async_submits); +	async->submit_bio_hook(async->inode, async->rw, async->bio, +			       async->mirror_num); +	kfree(async); +} +  int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,  			int rw, struct bio *bio, int mirror_num,  			extent_submit_bio_hook_t *submit_bio_hook) @@ -443,13 +456,10 @@ int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,  	async->bio = bio;  	async->mirror_num = mirror_num;  	async->submit_bio_hook = submit_bio_hook; - -	spin_lock(&fs_info->async_submit_work_lock); -	list_add_tail(&async->list, &fs_info->async_submit_work_list); +	async->work.func = run_one_async_submit; +	async->work.flags = 0;  	atomic_inc(&fs_info->nr_async_submits); -	spin_unlock(&fs_info->async_submit_work_lock); - -	queue_work(async_submit_workqueue, &fs_info->async_submit_work); +	btrfs_queue_worker(&fs_info->workers, &async->work);  	return 0;  } @@ -462,19 +472,32 @@ static int __btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,  	offset = bio->bi_sector << 9; +	/* +	 * when we're called for a write, we're already in the async +	 * submission 
context.  Just jump ingo btrfs_map_bio +	 */  	if (rw & (1 << BIO_RW)) { -		return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num); +		return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, +				     mirror_num, 0);  	} +	/* +	 * called for a read, do the setup so that checksum validation +	 * can happen in the async kernel threads +	 */  	ret = btrfs_bio_wq_end_io(root->fs_info, bio, 1);  	BUG_ON(ret); -	return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num); +	return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num, 1);  }  static int btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,  				 int mirror_num)  { +	/* +	 * kthread helpers are used to submit writes so that checksumming +	 * can happen in parallel across all CPUs +	 */  	if (!(rw & (1 << BIO_RW))) {  		return __btree_submit_bio_hook(inode, rw, bio, mirror_num);  	} @@ -1036,95 +1059,40 @@ static int bio_ready_for_csum(struct bio *bio)  	return ret;  } -#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18) -static void btrfs_end_io_csum(void *p) -#else -static void btrfs_end_io_csum(struct work_struct *work) -#endif +/* + * called by the kthread helper functions to finally call the bio end_io + * functions.  This is where read checksum verification actually happens + */ +static void end_workqueue_fn(struct btrfs_work *work)  { -#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18) -	struct btrfs_fs_info *fs_info = p; -#else -	struct btrfs_fs_info *fs_info = container_of(work, -						     struct btrfs_fs_info, -						     end_io_work); -#endif -	unsigned long flags; -	struct end_io_wq *end_io_wq;  	struct bio *bio; -	struct list_head *next; +	struct end_io_wq *end_io_wq; +	struct btrfs_fs_info *fs_info;  	int error; -	int was_empty; - -	while(1) { -		spin_lock_irqsave(&fs_info->end_io_work_lock, flags); -		if (list_empty(&fs_info->end_io_work_list)) { -			spin_unlock_irqrestore(&fs_info->end_io_work_lock, -					       flags); -			return; -		} -		next = fs_info->end_io_work_list.next; -		list_del(next); -		spin_unlock_irqrestore(&fs_info->end_io_work_lock, flags); -		end_io_wq = list_entry(next, struct end_io_wq, list); +	end_io_wq = container_of(work, struct end_io_wq, work); +	bio = end_io_wq->bio; +	fs_info = end_io_wq->info; -		bio = end_io_wq->bio; -		if (end_io_wq->metadata && !bio_ready_for_csum(bio)) { -			spin_lock_irqsave(&fs_info->end_io_work_lock, flags); -			was_empty = list_empty(&fs_info->end_io_work_list); -			list_add_tail(&end_io_wq->list, -				      &fs_info->end_io_work_list); -			spin_unlock_irqrestore(&fs_info->end_io_work_lock, -					       flags); -			if (was_empty) -				return; -			continue; -		} -		error = end_io_wq->error; -		bio->bi_private = end_io_wq->private; -		bio->bi_end_io = end_io_wq->end_io; -		kfree(end_io_wq); -#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23) -		bio_endio(bio, bio->bi_size, error); -#else -		bio_endio(bio, error); -#endif +	/* metadata bios are special because the whole tree block must +	 * be checksummed at once.  This makes sure the entire block is in +	 * ram and up to date before trying to verify things.  
For +	 * blocksize <= pagesize, it is basically a noop +	 */ +	if (end_io_wq->metadata && !bio_ready_for_csum(bio)) { +		btrfs_queue_worker(&fs_info->endio_workers, +				   &end_io_wq->work); +		return;  	} -} - -#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18) -static void btrfs_async_submit_work(void *p) -#else -static void btrfs_async_submit_work(struct work_struct *work) -#endif -{ -#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18) -	struct btrfs_fs_info *fs_info = p; +	error = end_io_wq->error; +	bio->bi_private = end_io_wq->private; +	bio->bi_end_io = end_io_wq->end_io; +	kfree(end_io_wq); +#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23) +	bio_endio(bio, bio->bi_size, error);  #else -	struct btrfs_fs_info *fs_info = container_of(work, -						     struct btrfs_fs_info, -						     async_submit_work); +	bio_endio(bio, error);  #endif -	struct async_submit_bio *async; -	struct list_head *next; - -	while(1) { -		spin_lock(&fs_info->async_submit_work_lock); -		if (list_empty(&fs_info->async_submit_work_list)) { -			spin_unlock(&fs_info->async_submit_work_lock); -			return; -		} -		next = fs_info->async_submit_work_list.next; -		list_del(next); -		atomic_dec(&fs_info->nr_async_submits); -		spin_unlock(&fs_info->async_submit_work_lock); - -		async = list_entry(next, struct async_submit_bio, list); -		async->submit_bio_hook(async->inode, async->rw, async->bio, -				       async->mirror_num); -		kfree(async); -	}  }  struct btrfs_root *open_ctree(struct super_block *sb, @@ -1155,19 +1123,11 @@ struct btrfs_root *open_ctree(struct super_block *sb,  		err = -ENOMEM;  		goto fail;  	} -	end_io_workqueue = create_workqueue("btrfs-end-io"); -	BUG_ON(!end_io_workqueue); -	async_submit_workqueue = create_workqueue("btrfs-async-submit"); -  	INIT_RADIX_TREE(&fs_info->fs_roots_radix, GFP_NOFS);  	INIT_LIST_HEAD(&fs_info->trans_list);  	INIT_LIST_HEAD(&fs_info->dead_roots);  	INIT_LIST_HEAD(&fs_info->hashers); -	INIT_LIST_HEAD(&fs_info->end_io_work_list); -	INIT_LIST_HEAD(&fs_info->async_submit_work_list);  	spin_lock_init(&fs_info->hash_lock); -	spin_lock_init(&fs_info->end_io_work_lock); -	spin_lock_init(&fs_info->async_submit_work_lock);  	spin_lock_init(&fs_info->delalloc_lock);  	spin_lock_init(&fs_info->new_trans_lock); @@ -1222,13 +1182,8 @@ struct btrfs_root *open_ctree(struct super_block *sb,  	fs_info->do_barriers = 1;  #if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18) -	INIT_WORK(&fs_info->end_io_work, btrfs_end_io_csum, fs_info); -	INIT_WORK(&fs_info->async_submit_work, btrfs_async_submit_work, -		  fs_info);  	INIT_WORK(&fs_info->trans_work, btrfs_transaction_cleaner, fs_info);  #else -	INIT_WORK(&fs_info->end_io_work, btrfs_end_io_csum); -	INIT_WORK(&fs_info->async_submit_work, btrfs_async_submit_work);  	INIT_DELAYED_WORK(&fs_info->trans_work, btrfs_transaction_cleaner);  #endif  	BTRFS_I(fs_info->btree_inode)->root = tree_root; @@ -1240,6 +1195,19 @@ struct btrfs_root *open_ctree(struct super_block *sb,  	mutex_init(&fs_info->trans_mutex);  	mutex_init(&fs_info->fs_mutex); +	/* we need to start all the end_io workers up front because the +	 * queue work function gets called at interrupt time.  The endio +	 * workers don't normally start IO, so some number of them <= the +	 * number of cpus is fine.  They handle checksumming after a read. +	 * +	 * The other worker threads do start IO, so the max is larger than +	 * the number of CPUs.  
FIXME, tune this for huge machines +	 */ +	btrfs_init_workers(&fs_info->workers, num_online_cpus() * 2); +	btrfs_init_workers(&fs_info->endio_workers, num_online_cpus()); +	btrfs_start_workers(&fs_info->workers, 1); +	btrfs_start_workers(&fs_info->endio_workers, num_online_cpus()); +  #if 0  	ret = add_hasher(fs_info, "crc32c");  	if (ret) { @@ -1375,6 +1343,8 @@ fail_sb_buffer:  	extent_io_tree_empty_lru(&BTRFS_I(fs_info->btree_inode)->io_tree);  fail_iput:  	iput(fs_info->btree_inode); +	btrfs_stop_workers(&fs_info->workers); +	btrfs_stop_workers(&fs_info->endio_workers);  fail:  	btrfs_close_devices(fs_info->fs_devices);  	btrfs_mapping_tree_free(&fs_info->mapping_tree); @@ -1623,16 +1593,10 @@ int close_ctree(struct btrfs_root *root)  	extent_io_tree_empty_lru(&fs_info->extent_ins);  	extent_io_tree_empty_lru(&BTRFS_I(fs_info->btree_inode)->io_tree); -	flush_workqueue(async_submit_workqueue); -	flush_workqueue(end_io_workqueue); -  	truncate_inode_pages(fs_info->btree_inode->i_mapping, 0); -	flush_workqueue(async_submit_workqueue); -	destroy_workqueue(async_submit_workqueue); - -	flush_workqueue(end_io_workqueue); -	destroy_workqueue(end_io_workqueue); +	btrfs_stop_workers(&fs_info->workers); +	btrfs_stop_workers(&fs_info->endio_workers);  	iput(fs_info->btree_inode);  #if 0 diff --git a/fs/btrfs/inode.c b/fs/btrfs/inode.c index 0f14697bece..7daef8d3700 100644 --- a/fs/btrfs/inode.c +++ b/fs/btrfs/inode.c @@ -359,7 +359,7 @@ int __btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,  	kfree(sums); -	return btrfs_map_bio(root, rw, bio, mirror_num); +	return btrfs_map_bio(root, rw, bio, mirror_num, 1);  }  int btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio, @@ -383,7 +383,7 @@ int btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,  				   inode, rw, bio, mirror_num,  				   __btrfs_submit_bio_hook);  mapit: -	return btrfs_map_bio(root, rw, bio, mirror_num); +	return btrfs_map_bio(root, rw, bio, mirror_num, 0);  }  int btrfs_readpage_io_hook(struct page *page, u64 start, u64 end) diff --git a/fs/btrfs/volumes.c b/fs/btrfs/volumes.c index 722eb455015..c57458ce633 100644 --- a/fs/btrfs/volumes.c +++ b/fs/btrfs/volumes.c @@ -27,6 +27,7 @@  #include "transaction.h"  #include "print-tree.h"  #include "volumes.h" +#include "async-thread.h"  struct map_lookup {  	u64 type; @@ -110,6 +111,101 @@ static struct btrfs_fs_devices *find_fsid(u8 *fsid)  	return NULL;  } +/* + * we try to collect pending bios for a device so we don't get a large + * number of procs sending bios down to the same device.  This greatly + * improves the schedulers ability to collect and merge the bios. + * + * But, it also turns into a long list of bios to process and that is sure + * to eventually make the worker thread block.  The solution here is to + * make some progress and then put this work struct back at the end of + * the list if the block device is congested.  This way, multiple devices + * can make progress from a single worker thread. + */ +int run_scheduled_bios(struct btrfs_device *device) +{ +	struct bio *pending; +	struct backing_dev_info *bdi; +	struct bio *tail; +	struct bio *cur; +	int again = 0; +	unsigned long num_run = 0; + +	bdi = device->bdev->bd_inode->i_mapping->backing_dev_info; +loop: +	spin_lock(&device->io_lock); + +	/* take all the bios off the list at once and process them +	 * later on (without the lock held).  
But, remember the +	 * tail and other pointers so the bios can be properly reinserted +	 * into the list if we hit congestion +	 */ +	pending = device->pending_bios; +	tail = device->pending_bio_tail; +	WARN_ON(pending && !tail); +	device->pending_bios = NULL; +	device->pending_bio_tail = NULL; + +	/* +	 * if pending was null this time around, no bios need processing +	 * at all and we can stop.  Otherwise it'll loop back up again +	 * and do an additional check so no bios are missed. +	 * +	 * device->running_pending is used to synchronize with the +	 * schedule_bio code. +	 */ +	if (pending) { +		again = 1; +		device->running_pending = 1; +	} else { +		again = 0; +		device->running_pending = 0; +	} +	spin_unlock(&device->io_lock); + +	while(pending) { +		cur = pending; +		pending = pending->bi_next; +		cur->bi_next = NULL; +		atomic_dec(&device->dev_root->fs_info->nr_async_submits); +		submit_bio(cur->bi_rw, cur); +		num_run++; + +		/* +		 * we made progress, there is more work to do and the bdi +		 * is now congested.  Back off and let other work structs +		 * run instead +		 */ +		if (pending && num_run && bdi_write_congested(bdi)) { +			struct bio *old_head; + +			spin_lock(&device->io_lock); +			old_head = device->pending_bios; +			device->pending_bios = pending; +			if (device->pending_bio_tail) +				tail->bi_next = old_head; +			else +				device->pending_bio_tail = tail; + +			spin_unlock(&device->io_lock); +			btrfs_requeue_work(&device->work); +			goto done; +		} +	} +	if (again) +		goto loop; +done: +	return 0; +} + +void pending_bios_fn(struct btrfs_work *work) +{ +	struct btrfs_device *device; + +	device = container_of(work, struct btrfs_device, work); +	run_scheduled_bios(device); +} +  static int device_list_add(const char *path,  			   struct btrfs_super_block *disk_super,  			   u64 devid, struct btrfs_fs_devices **fs_devices_ret) @@ -141,6 +237,7 @@ static int device_list_add(const char *path,  			return -ENOMEM;  		}  		device->devid = devid; +		device->work.func = pending_bios_fn;  		memcpy(device->uuid, disk_super->dev_item.uuid,  		       BTRFS_UUID_SIZE);  		device->barriers = 1; @@ -925,6 +1022,7 @@ int btrfs_init_new_device(struct btrfs_root *root, char *device_path)  	}  	device->barriers = 1; +	device->work.func = pending_bios_fn;  	generate_random_uuid(device->uuid);  	spin_lock_init(&device->io_lock);  	device->name = kstrdup(device_path, GFP_NOFS); @@ -1965,8 +2063,61 @@ static int end_bio_multi_stripe(struct bio *bio,  #endif  } +struct async_sched { +	struct bio *bio; +	int rw; +	struct btrfs_fs_info *info; +	struct btrfs_work work; +}; + +/* + * see run_scheduled_bios for a description of why bios are collected for + * async submit. + * + * This will add one bio to the pending list for a device and make sure + * the work struct is scheduled. + */ +int schedule_bio(struct btrfs_root *root, struct btrfs_device *device, +		 int rw, struct bio *bio) +{ +	int should_queue = 1; + +	/* don't bother with additional async steps for reads, right now */ +	if (!(rw & (1 << BIO_RW))) { +		submit_bio(rw, bio); +		return 0; +	} + +	/* +	 * nr_async_sumbits allows us to reliably return congestion to the +	 * higher layers.  
Otherwise, the async bio makes it appear we have +	 * made progress against dirty pages when we've really just put it +	 * on a queue for later +	 */ +	atomic_inc(&root->fs_info->nr_async_submits); +	bio->bi_next = NULL; +	bio->bi_rw |= rw; + +	spin_lock(&device->io_lock); + +	if (device->pending_bio_tail) +		device->pending_bio_tail->bi_next = bio; + +	device->pending_bio_tail = bio; +	if (!device->pending_bios) +		device->pending_bios = bio; +	if (device->running_pending) +		should_queue = 0; + +	spin_unlock(&device->io_lock); + +	if (should_queue) +		btrfs_queue_worker(&root->fs_info->workers, &device->work); +	return 0; +} +  int btrfs_map_bio(struct btrfs_root *root, int rw, struct bio *bio, -		  int mirror_num) +		  int mirror_num, int async_submit)  {  	struct btrfs_mapping_tree *map_tree;  	struct btrfs_device *dev; @@ -2012,10 +2163,10 @@ int btrfs_map_bio(struct btrfs_root *root, int rw, struct bio *bio,  		dev = multi->stripes[dev_nr].dev;  		if (dev && dev->bdev) {  			bio->bi_bdev = dev->bdev; -			spin_lock(&dev->io_lock); -			dev->total_ios++; -			spin_unlock(&dev->io_lock); -			submit_bio(rw, bio); +			if (async_submit) +				schedule_bio(root, dev, rw, bio); +			else +				submit_bio(rw, bio);  		} else {  			bio->bi_bdev = root->fs_info->fs_devices->latest_bdev;  			bio->bi_sector = logical >> 9; @@ -2054,6 +2205,7 @@ static struct btrfs_device *add_missing_dev(struct btrfs_root *root,  	device->barriers = 1;  	device->dev_root = root->fs_info->dev_root;  	device->devid = devid; +	device->work.func = pending_bios_fn;  	fs_devices->num_devices++;  	spin_lock_init(&device->io_lock);  	memcpy(device->uuid, dev_uuid, BTRFS_UUID_SIZE); diff --git a/fs/btrfs/volumes.h b/fs/btrfs/volumes.h index 4df6b1608f9..48a44f7a938 100644 --- a/fs/btrfs/volumes.h +++ b/fs/btrfs/volumes.h @@ -20,6 +20,7 @@  #define __BTRFS_VOLUMES_  #include <linux/bio.h> +#include "async-thread.h"  struct buffer_head;  struct btrfs_device { @@ -27,6 +28,9 @@ struct btrfs_device {  	struct list_head dev_alloc_list;  	struct btrfs_root *dev_root;  	struct buffer_head *pending_io; +	struct bio *pending_bios; +	struct bio *pending_bio_tail; +	int running_pending;  	u64 generation;  	int barriers; @@ -36,8 +40,6 @@ struct btrfs_device {  	struct block_device *bdev; -	u64 total_ios; -  	char *name;  	/* the internal btrfs device id */ @@ -63,6 +65,8 @@ struct btrfs_device {  	/* physical drive uuid (or lvm uuid) */  	u8 uuid[BTRFS_UUID_SIZE]; + +	struct btrfs_work work;  };  struct btrfs_fs_devices { @@ -117,7 +121,7 @@ int btrfs_alloc_chunk(struct btrfs_trans_handle *trans,  void btrfs_mapping_init(struct btrfs_mapping_tree *tree);  void btrfs_mapping_tree_free(struct btrfs_mapping_tree *tree);  int btrfs_map_bio(struct btrfs_root *root, int rw, struct bio *bio, -		  int mirror_num); +		  int mirror_num, int async_submit);  int btrfs_read_super_device(struct btrfs_root *root, struct extent_buffer *buf);  int btrfs_open_devices(struct btrfs_fs_devices *fs_devices,  		       int flags, void *holder);  |
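
Usage note (editorial, not part of the patch): the model documented in the new async-thread.h is to embed a struct btrfs_work in your own structure, set only func and flags, queue it with btrfs_queue_worker(), and recover the containing structure with container_of() inside the work function. That is exactly how the patch wires up async_submit_bio, end_io_wq and btrfs_device. The sketch below is a hypothetical illustration under those assumptions; my_csum_job, my_csum_fn and queue_csum are invented names, while the btrfs_workers calls are the ones the new header declares.

#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/fs.h>
#include "async-thread.h"

/* hypothetical per-job context; only the embedded btrfs_work is required */
struct my_csum_job {
	struct inode *inode;		/* caller-owned data for this job */
	struct btrfs_work work;		/* embedded work item */
};

static void my_csum_fn(struct btrfs_work *work)
{
	/* recover the containing struct from the work pointer */
	struct my_csum_job *job = container_of(work, struct my_csum_job, work);

	/* ... do the actual work against job->inode here ... */
	kfree(job);
}

static int queue_csum(struct btrfs_workers *workers, struct inode *inode)
{
	struct my_csum_job *job = kzalloc(sizeof(*job), GFP_NOFS);

	if (!job)
		return -ENOMEM;
	job->inode = inode;
	job->work.func = my_csum_fn;	/* callers set only func ... */
	job->work.flags = 0;		/* ... and zero flags (single-insert guard) */
	return btrfs_queue_worker(workers, &job->work);
}

Pool lifecycle would mirror what open_ctree() and close_ctree() do in this patch: btrfs_init_workers(&pool, num_online_cpus() * 2) and btrfs_start_workers(&pool, 1) at mount, btrfs_stop_workers(&pool) at unmount.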