Hello,
With cmwq, workqueue now can handle all use cases of slow-work. This
patchset converts all slow-work users to use workqueue instead and
remove slow-work from the kernel. The following users are converted.
* fscache-object: a dedicated unbound workqueue is used.
* fscache-operation: a dedicated unbound workqueue is used.
* cifs: system_nrt_wq is used.
* gfs2: a dedicated nrt workqueue is used.
* drm: system_nrt_wq is used.
cifs and gfs2 conversions largely remained unchanged from the last
posting. fscache conversions are updated to use unbound workqueues
and debugfs support is dropped for now (workqueue debugfs support is
not included in wq#for-next). Tracing API based debug implementation
is planned.
The drm conversion is new but the conversion is mostly trivial and my
simplistic testing involving plugging and unplugging monitor works
fine.
David, fscache conversion now uses unbound workqueues and it should
behave virtually the same with slow-work.
As whether cmwq will get merged isn't completely clear yet, I think it
would be better to keep these changes in the wq tree instead of
scattering them around multiple trees. Would the maintainers be okay
with that?
This patchset contains the following patches.
0001-fscache-convert-object-to-use-workqueue-instead-of-s.patch
0002-fscache-convert-operation-to-use-workqueue-instead-o.patch
0003-fscache-drop-references-to-slow-work.patch
0004-cifs-use-workqueue-instead-of-slow-work.patch
0005-gfs2-use-workqueue-instead-of-slow-work.patch
0006-drm-use-workqueue-instead-of-slow-work.patch
0007-slow-work-kill-it.patch
It's on top of the following branch.
git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git for-next
and is available in the following git branch
git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git review-slow-work-conversion
and contains the following changes.
Documentation/filesystems/caching/fscache.txt | 10
Documentation/slow-work.txt | 322 -------
drivers/gpu/drm/drm_crtc_helper.c | 29
fs/cachefiles/namei.c | 13
fs/cachefiles/rdwr.c | 4
fs/cifs/Kconfig | 1
fs/cifs/cifsfs.c | 5
fs/cifs/cifsglob.h | 8
fs/cifs/dir.c | 2
fs/cifs/file.c | 30
fs/cifs/misc.c | 20
fs/fscache/Kconfig | 1
fs/fscache/internal.h | 8
fs/fscache/main.c | 104 ++
fs/fscache/object-list.c | 11
fs/fscache/object.c | 106 +-
fs/fscache/operation.c | 67 -
fs/fscache/page.c | 36
fs/gfs2/Kconfig | 1
fs/gfs2/incore.h | 3
fs/gfs2/main.c | 14
fs/gfs2/ops_fstype.c | 8
fs/gfs2/recovery.c | 54 -
fs/gfs2/recovery.h | 6
fs/gfs2/sys.c | 3
include/drm/drm_crtc.h | 3
include/linux/fscache-cache.h | 47 -
include/linux/slow-work.h | 163 ---
init/Kconfig | 24
kernel/Makefile | 2
kernel/slow-work-debugfs.c | 227 -----
kernel/slow-work.c | 1068 --------------------------
kernel/slow-work.h | 72 -
kernel/sysctl.c | 8
34 files changed, 292 insertions(+), 2188 deletions(-)
Thanks.
--
tejun
Workqueue can now handle high concurrency. Convert drm_crtc_helper to
use system_nrt_wq instead of slow-work. The conversion is mostly
straight forward. One difference is that drm_helper_hpd_irq_event()
no longer blocks and can be called from any context.
Signed-off-by: Tejun Heo <[email protected]>
Cc: David Airlie <[email protected]>
Cc: [email protected]
---
drivers/gpu/drm/drm_crtc_helper.c | 29 ++++++++++-------------------
include/drm/drm_crtc.h | 3 +--
2 files changed, 11 insertions(+), 21 deletions(-)
diff --git a/drivers/gpu/drm/drm_crtc_helper.c b/drivers/gpu/drm/drm_crtc_helper.c
index 9b2a541..7fa3380 100644
--- a/drivers/gpu/drm/drm_crtc_helper.c
+++ b/drivers/gpu/drm/drm_crtc_helper.c
@@ -808,13 +808,11 @@ int drm_helper_resume_force_mode(struct drm_device *dev)
}
EXPORT_SYMBOL(drm_helper_resume_force_mode);
-static struct slow_work_ops output_poll_ops;
-
#define DRM_OUTPUT_POLL_PERIOD (10*HZ)
-static void output_poll_execute(struct slow_work *work)
+static void output_poll_execute(struct work_struct *work)
{
- struct delayed_slow_work *delayed_work = container_of(work, struct delayed_slow_work, work);
- struct drm_device *dev = container_of(delayed_work, struct drm_device, mode_config.output_poll_slow_work);
+ struct delayed_work *delayed_work = to_delayed_work(work);
+ struct drm_device *dev = container_of(delayed_work, struct drm_device, mode_config.output_poll_work);
struct drm_connector *connector;
enum drm_connector_status old_status, status;
bool repoll = false, changed = false;
@@ -854,7 +852,7 @@ static void output_poll_execute(struct slow_work *work)
}
if (repoll) {
- ret = delayed_slow_work_enqueue(delayed_work, DRM_OUTPUT_POLL_PERIOD);
+ ret = queue_delayed_work(system_nrt_wq, delayed_work, DRM_OUTPUT_POLL_PERIOD);
if (ret)
DRM_ERROR("delayed enqueue failed %d\n", ret);
}
@@ -864,7 +862,7 @@ void drm_kms_helper_poll_disable(struct drm_device *dev)
{
if (!dev->mode_config.poll_enabled)
return;
- delayed_slow_work_cancel(&dev->mode_config.output_poll_slow_work);
+ cancel_delayed_work_sync(&dev->mode_config.output_poll_work);
}
EXPORT_SYMBOL(drm_kms_helper_poll_disable);
@@ -880,7 +878,7 @@ void drm_kms_helper_poll_enable(struct drm_device *dev)
}
if (poll) {
- ret = delayed_slow_work_enqueue(&dev->mode_config.output_poll_slow_work, DRM_OUTPUT_POLL_PERIOD);
+ ret = queue_delayed_work(system_nrt_wq, &dev->mode_config.output_poll_work, DRM_OUTPUT_POLL_PERIOD);
if (ret)
DRM_ERROR("delayed enqueue failed %d\n", ret);
}
@@ -889,9 +887,7 @@ EXPORT_SYMBOL(drm_kms_helper_poll_enable);
void drm_kms_helper_poll_init(struct drm_device *dev)
{
- slow_work_register_user(THIS_MODULE);
- delayed_slow_work_init(&dev->mode_config.output_poll_slow_work,
- &output_poll_ops);
+ INIT_DELAYED_WORK(&dev->mode_config.output_poll_work, output_poll_execute);
dev->mode_config.poll_enabled = true;
drm_kms_helper_poll_enable(dev);
@@ -901,7 +897,6 @@ EXPORT_SYMBOL(drm_kms_helper_poll_init);
void drm_kms_helper_poll_fini(struct drm_device *dev)
{
drm_kms_helper_poll_disable(dev);
- slow_work_unregister_user(THIS_MODULE);
}
EXPORT_SYMBOL(drm_kms_helper_poll_fini);
@@ -909,12 +904,8 @@ void drm_helper_hpd_irq_event(struct drm_device *dev)
{
if (!dev->mode_config.poll_enabled)
return;
- delayed_slow_work_cancel(&dev->mode_config.output_poll_slow_work);
- /* schedule a slow work asap */
- delayed_slow_work_enqueue(&dev->mode_config.output_poll_slow_work, 0);
+ /* kill timer and schedule immediate execution, this doesn't block */
+ cancel_delayed_work(&dev->mode_config.output_poll_work);
+ queue_delayed_work(system_nrt_wq, &dev->mode_config.output_poll_work, 0);
}
EXPORT_SYMBOL(drm_helper_hpd_irq_event);
-
-static struct slow_work_ops output_poll_ops = {
- .execute = output_poll_execute,
-};
diff --git a/include/drm/drm_crtc.h b/include/drm/drm_crtc.h
index 93a1a31..c707270 100644
--- a/include/drm/drm_crtc.h
+++ b/include/drm/drm_crtc.h
@@ -31,7 +31,6 @@
#include <linux/idr.h>
#include <linux/fb.h>
-#include <linux/slow-work.h>
struct drm_device;
struct drm_mode_set;
@@ -595,7 +594,7 @@ struct drm_mode_config {
/* output poll support */
bool poll_enabled;
- struct delayed_slow_work output_poll_slow_work;
+ struct delayed_work output_poll_work;
/* pointers to standard properties */
struct list_head property_blob_list;
--
1.6.4.2
Workqueue can now handle high concurrency. Use system_nrt_wq
instead of slow-work.
* Updated is_valid_oplock_break() to not call cifs_oplock_break_put()
as advised by Steve French. It might cause deadlock. Instead,
reference is increased after queueing succeeded and
cifs_oplock_break() briefly grabs GlobalSMBSeslock before putting
the cfile to make sure it doesn't put before the matching get is
finished.
* Anton Blanchard reported that cifs conversion was using now gone
system_single_wq. Use system_nrt_wq which provides non-reentrance
guarantee which is enough and much better.
Signed-off-by: Tejun Heo <[email protected]>
Cc: Steve French <[email protected]>
Cc: Anton Blanchard <[email protected]>
---
fs/cifs/Kconfig | 1 -
fs/cifs/cifsfs.c | 5 -----
fs/cifs/cifsglob.h | 8 +++++---
fs/cifs/dir.c | 2 +-
fs/cifs/file.c | 30 +++++++++++++-----------------
fs/cifs/misc.c | 20 ++++++++++++--------
6 files changed, 31 insertions(+), 35 deletions(-)
diff --git a/fs/cifs/Kconfig b/fs/cifs/Kconfig
index 80f3525..6994a0f 100644
--- a/fs/cifs/Kconfig
+++ b/fs/cifs/Kconfig
@@ -2,7 +2,6 @@ config CIFS
tristate "CIFS support (advanced network filesystem, SMBFS successor)"
depends on INET
select NLS
- select SLOW_WORK
help
This is the client VFS module for the Common Internet File System
(CIFS) protocol which is the successor to the Server Message Block
diff --git a/fs/cifs/cifsfs.c b/fs/cifs/cifsfs.c
index 78c02eb..4c07517 100644
--- a/fs/cifs/cifsfs.c
+++ b/fs/cifs/cifsfs.c
@@ -917,15 +917,10 @@ init_cifs(void)
if (rc)
goto out_unregister_key_type;
#endif
- rc = slow_work_register_user(THIS_MODULE);
- if (rc)
- goto out_unregister_resolver_key;
return 0;
- out_unregister_resolver_key:
#ifdef CONFIG_CIFS_DFS_UPCALL
- unregister_key_type(&key_type_dns_resolver);
out_unregister_key_type:
#endif
#ifdef CONFIG_CIFS_UPCALL
diff --git a/fs/cifs/cifsglob.h b/fs/cifs/cifsglob.h
index a88479c..f5a1f9b 100644
--- a/fs/cifs/cifsglob.h
+++ b/fs/cifs/cifsglob.h
@@ -19,7 +19,7 @@
#include <linux/in.h>
#include <linux/in6.h>
#include <linux/slab.h>
-#include <linux/slow-work.h>
+#include <linux/workqueue.h>
#include "cifs_fs_sb.h"
#include "cifsacl.h"
/*
@@ -363,7 +363,7 @@ struct cifsFileInfo {
atomic_t count; /* reference count */
struct mutex fh_mutex; /* prevents reopen race after dead ses*/
struct cifs_search_info srch_inf;
- struct slow_work oplock_break; /* slow_work job for oplock breaks */
+ struct work_struct oplock_break; /* work for oplock breaks */
};
/* Take a reference on the file private data */
@@ -732,4 +732,6 @@ GLOBAL_EXTERN unsigned int cifs_min_rcv; /* min size of big ntwrk buf pool */
GLOBAL_EXTERN unsigned int cifs_min_small; /* min size of small buf pool */
GLOBAL_EXTERN unsigned int cifs_max_pending; /* MAX requests at once to server*/
-extern const struct slow_work_ops cifs_oplock_break_ops;
+void cifs_oplock_break(struct work_struct *work);
+void cifs_oplock_break_get(struct cifsFileInfo *cfile);
+void cifs_oplock_break_put(struct cifsFileInfo *cfile);
diff --git a/fs/cifs/dir.c b/fs/cifs/dir.c
index 391816b..b066e73 100644
--- a/fs/cifs/dir.c
+++ b/fs/cifs/dir.c
@@ -162,7 +162,7 @@ cifs_new_fileinfo(struct inode *newinode, __u16 fileHandle,
mutex_init(&pCifsFile->lock_mutex);
INIT_LIST_HEAD(&pCifsFile->llist);
atomic_set(&pCifsFile->count, 1);
- slow_work_init(&pCifsFile->oplock_break, &cifs_oplock_break_ops);
+ INIT_WORK(&pCifsFile->oplock_break, cifs_oplock_break);
write_lock(&GlobalSMBSeslock);
list_add(&pCifsFile->tlist, &cifs_sb->tcon->openFileList);
diff --git a/fs/cifs/file.c b/fs/cifs/file.c
index 75541af..e767bfa 100644
--- a/fs/cifs/file.c
+++ b/fs/cifs/file.c
@@ -2295,8 +2295,7 @@ out:
return rc;
}
-static void
-cifs_oplock_break(struct slow_work *work)
+void cifs_oplock_break(struct work_struct *work)
{
struct cifsFileInfo *cfile = container_of(work, struct cifsFileInfo,
oplock_break);
@@ -2333,33 +2332,30 @@ cifs_oplock_break(struct slow_work *work)
LOCKING_ANDX_OPLOCK_RELEASE, false);
cFYI(1, "Oplock release rc = %d", rc);
}
+
+ /*
+ * We might have kicked in before is_valid_oplock_break()
+ * finished grabbing reference for us. Make sure it's done by
+ * waiting for GlobalSMSSeslock.
+ */
+ write_lock(&GlobalSMBSeslock);
+ write_unlock(&GlobalSMBSeslock);
+
+ cifs_oplock_break_put(cfile);
}
-static int
-cifs_oplock_break_get(struct slow_work *work)
+void cifs_oplock_break_get(struct cifsFileInfo *cfile)
{
- struct cifsFileInfo *cfile = container_of(work, struct cifsFileInfo,
- oplock_break);
mntget(cfile->mnt);
cifsFileInfo_get(cfile);
- return 0;
}
-static void
-cifs_oplock_break_put(struct slow_work *work)
+void cifs_oplock_break_put(struct cifsFileInfo *cfile)
{
- struct cifsFileInfo *cfile = container_of(work, struct cifsFileInfo,
- oplock_break);
mntput(cfile->mnt);
cifsFileInfo_put(cfile);
}
-const struct slow_work_ops cifs_oplock_break_ops = {
- .get_ref = cifs_oplock_break_get,
- .put_ref = cifs_oplock_break_put,
- .execute = cifs_oplock_break,
-};
-
const struct address_space_operations cifs_addr_ops = {
.readpage = cifs_readpage,
.readpages = cifs_readpages,
diff --git a/fs/cifs/misc.c b/fs/cifs/misc.c
index 1394aa3..3ccadc1 100644
--- a/fs/cifs/misc.c
+++ b/fs/cifs/misc.c
@@ -498,7 +498,6 @@ is_valid_oplock_break(struct smb_hdr *buf, struct TCP_Server_Info *srv)
struct cifsTconInfo *tcon;
struct cifsInodeInfo *pCifsInode;
struct cifsFileInfo *netfile;
- int rc;
cFYI(1, "Checking for oplock break or dnotify response");
if ((pSMB->hdr.Command == SMB_COM_NT_TRANSACT) &&
@@ -583,13 +582,18 @@ is_valid_oplock_break(struct smb_hdr *buf, struct TCP_Server_Info *srv)
pCifsInode->clientCanCacheAll = false;
if (pSMB->OplockLevel == 0)
pCifsInode->clientCanCacheRead = false;
- rc = slow_work_enqueue(&netfile->oplock_break);
- if (rc) {
- cERROR(1, "failed to enqueue oplock "
- "break: %d\n", rc);
- } else {
- netfile->oplock_break_cancelled = false;
- }
+
+ /*
+ * cifs_oplock_break_put() can't be called
+ * from here. Get reference after queueing
+ * succeeded. cifs_oplock_break() will
+ * synchronize using GlobalSMSSeslock.
+ */
+ if (queue_work(system_nrt_wq,
+ &netfile->oplock_break))
+ cifs_oplock_break_get(netfile);
+ netfile->oplock_break_cancelled = false;
+
read_unlock(&GlobalSMBSeslock);
read_unlock(&cifs_tcp_ses_lock);
return true;
--
1.6.4.2
slow-work doesn't have any user left. Kill it.
Signed-off-by: Tejun Heo <[email protected]>
Cc: David Howells <[email protected]>
---
Documentation/slow-work.txt | 322 -------------
include/linux/slow-work.h | 163 -------
init/Kconfig | 24 -
kernel/Makefile | 2 -
kernel/slow-work-debugfs.c | 227 ---------
kernel/slow-work.c | 1068 -------------------------------------------
kernel/slow-work.h | 72 ---
kernel/sysctl.c | 8 -
8 files changed, 0 insertions(+), 1886 deletions(-)
delete mode 100644 Documentation/slow-work.txt
delete mode 100644 include/linux/slow-work.h
delete mode 100644 kernel/slow-work-debugfs.c
delete mode 100644 kernel/slow-work.c
delete mode 100644 kernel/slow-work.h
diff --git a/Documentation/slow-work.txt b/Documentation/slow-work.txt
deleted file mode 100644
index 9dbf447..0000000
--- a/Documentation/slow-work.txt
+++ /dev/null
@@ -1,322 +0,0 @@
- ====================================
- SLOW WORK ITEM EXECUTION THREAD POOL
- ====================================
-
-By: David Howells <[email protected]>
-
-The slow work item execution thread pool is a pool of threads for performing
-things that take a relatively long time, such as making mkdir calls.
-Typically, when processing something, these items will spend a lot of time
-blocking a thread on I/O, thus making that thread unavailable for doing other
-work.
-
-The standard workqueue model is unsuitable for this class of work item as that
-limits the owner to a single thread or a single thread per CPU. For some
-tasks, however, more threads - or fewer - are required.
-
-There is just one pool per system. It contains no threads unless something
-wants to use it - and that something must register its interest first. When
-the pool is active, the number of threads it contains is dynamic, varying
-between a maximum and minimum setting, depending on the load.
-
-
-====================
-CLASSES OF WORK ITEM
-====================
-
-This pool support two classes of work items:
-
- (*) Slow work items.
-
- (*) Very slow work items.
-
-The former are expected to finish much quicker than the latter.
-
-An operation of the very slow class may do a batch combination of several
-lookups, mkdirs, and a create for instance.
-
-An operation of the ordinarily slow class may, for example, write stuff or
-expand files, provided the time taken to do so isn't too long.
-
-Operations of both types may sleep during execution, thus tying up the thread
-loaned to it.
-
-A further class of work item is available, based on the slow work item class:
-
- (*) Delayed slow work items.
-
-These are slow work items that have a timer to defer queueing of the item for
-a while.
-
-
-THREAD-TO-CLASS ALLOCATION
---------------------------
-
-Not all the threads in the pool are available to work on very slow work items.
-The number will be between one and one fewer than the number of active threads.
-This is configurable (see the "Pool Configuration" section).
-
-All the threads are available to work on ordinarily slow work items, but a
-percentage of the threads will prefer to work on very slow work items.
-
-The configuration ensures that at least one thread will be available to work on
-very slow work items, and at least one thread will be available that won't work
-on very slow work items at all.
-
-
-=====================
-USING SLOW WORK ITEMS
-=====================
-
-Firstly, a module or subsystem wanting to make use of slow work items must
-register its interest:
-
- int ret = slow_work_register_user(struct module *module);
-
-This will return 0 if successful, or a -ve error upon failure. The module
-pointer should be the module interested in using this facility (almost
-certainly THIS_MODULE).
-
-
-Slow work items may then be set up by:
-
- (1) Declaring a slow_work struct type variable:
-
- #include <linux/slow-work.h>
-
- struct slow_work myitem;
-
- (2) Declaring the operations to be used for this item:
-
- struct slow_work_ops myitem_ops = {
- .get_ref = myitem_get_ref,
- .put_ref = myitem_put_ref,
- .execute = myitem_execute,
- };
-
- [*] For a description of the ops, see section "Item Operations".
-
- (3) Initialising the item:
-
- slow_work_init(&myitem, &myitem_ops);
-
- or:
-
- delayed_slow_work_init(&myitem, &myitem_ops);
-
- or:
-
- vslow_work_init(&myitem, &myitem_ops);
-
- depending on its class.
-
-A suitably set up work item can then be enqueued for processing:
-
- int ret = slow_work_enqueue(&myitem);
-
-This will return a -ve error if the thread pool is unable to gain a reference
-on the item, 0 otherwise, or (for delayed work):
-
- int ret = delayed_slow_work_enqueue(&myitem, my_jiffy_delay);
-
-
-The items are reference counted, so there ought to be no need for a flush
-operation. But as the reference counting is optional, means to cancel
-existing work items are also included:
-
- cancel_slow_work(&myitem);
- cancel_delayed_slow_work(&myitem);
-
-can be used to cancel pending work. The above cancel function waits for
-existing work to have been executed (or prevent execution of them, depending
-on timing).
-
-
-When all a module's slow work items have been processed, and the
-module has no further interest in the facility, it should unregister its
-interest:
-
- slow_work_unregister_user(struct module *module);
-
-The module pointer is used to wait for all outstanding work items for that
-module before completing the unregistration. This prevents the put_ref() code
-from being taken away before it completes. module should almost certainly be
-THIS_MODULE.
-
-
-================
-HELPER FUNCTIONS
-================
-
-The slow-work facility provides a function by which it can be determined
-whether or not an item is queued for later execution:
-
- bool queued = slow_work_is_queued(struct slow_work *work);
-
-If it returns false, then the item is not on the queue (it may be executing
-with a requeue pending). This can be used to work out whether an item on which
-another depends is on the queue, thus allowing a dependent item to be queued
-after it.
-
-If the above shows an item on which another depends not to be queued, then the
-owner of the dependent item might need to wait. However, to avoid locking up
-the threads unnecessarily be sleeping in them, it can make sense under some
-circumstances to return the work item to the queue, thus deferring it until
-some other items have had a chance to make use of the yielded thread.
-
-To yield a thread and defer an item, the work function should simply enqueue
-the work item again and return. However, this doesn't work if there's nothing
-actually on the queue, as the thread just vacated will jump straight back into
-the item's work function, thus busy waiting on a CPU.
-
-Instead, the item should use the thread to wait for the dependency to go away,
-but rather than using schedule() or schedule_timeout() to sleep, it should use
-the following function:
-
- bool requeue = slow_work_sleep_till_thread_needed(
- struct slow_work *work,
- signed long *_timeout);
-
-This will add a second wait and then sleep, such that it will be woken up if
-either something appears on the queue that could usefully make use of the
-thread - and behind which this item can be queued, or if the event the caller
-set up to wait for happens. True will be returned if something else appeared
-on the queue and this work function should perhaps return, of false if
-something else woke it up. The timeout is as for schedule_timeout().
-
-For example:
-
- wq = bit_waitqueue(&my_flags, MY_BIT);
- init_wait(&wait);
- requeue = false;
- do {
- prepare_to_wait(wq, &wait, TASK_UNINTERRUPTIBLE);
- if (!test_bit(MY_BIT, &my_flags))
- break;
- requeue = slow_work_sleep_till_thread_needed(&my_work,
- &timeout);
- } while (timeout > 0 && !requeue);
- finish_wait(wq, &wait);
- if (!test_bit(MY_BIT, &my_flags)
- goto do_my_thing;
- if (requeue)
- return; // to slow_work
-
-
-===============
-ITEM OPERATIONS
-===============
-
-Each work item requires a table of operations of type struct slow_work_ops.
-Only ->execute() is required; the getting and putting of a reference and the
-describing of an item are all optional.
-
- (*) Get a reference on an item:
-
- int (*get_ref)(struct slow_work *work);
-
- This allows the thread pool to attempt to pin an item by getting a
- reference on it. This function should return 0 if the reference was
- granted, or a -ve error otherwise. If an error is returned,
- slow_work_enqueue() will fail.
-
- The reference is held whilst the item is queued and whilst it is being
- executed. The item may then be requeued with the same reference held, or
- the reference will be released.
-
- (*) Release a reference on an item:
-
- void (*put_ref)(struct slow_work *work);
-
- This allows the thread pool to unpin an item by releasing the reference on
- it. The thread pool will not touch the item again once this has been
- called.
-
- (*) Execute an item:
-
- void (*execute)(struct slow_work *work);
-
- This should perform the work required of the item. It may sleep, it may
- perform disk I/O and it may wait for locks.
-
- (*) View an item through /proc:
-
- void (*desc)(struct slow_work *work, struct seq_file *m);
-
- If supplied, this should print to 'm' a small string describing the work
- the item is to do. This should be no more than about 40 characters, and
- shouldn't include a newline character.
-
- See the 'Viewing executing and queued items' section below.
-
-
-==================
-POOL CONFIGURATION
-==================
-
-The slow-work thread pool has a number of configurables:
-
- (*) /proc/sys/kernel/slow-work/min-threads
-
- The minimum number of threads that should be in the pool whilst it is in
- use. This may be anywhere between 2 and max-threads.
-
- (*) /proc/sys/kernel/slow-work/max-threads
-
- The maximum number of threads that should in the pool. This may be
- anywhere between min-threads and 255 or NR_CPUS * 2, whichever is greater.
-
- (*) /proc/sys/kernel/slow-work/vslow-percentage
-
- The percentage of active threads in the pool that may be used to execute
- very slow work items. This may be between 1 and 99. The resultant number
- is bounded to between 1 and one fewer than the number of active threads.
- This ensures there is always at least one thread that can process very
- slow work items, and always at least one thread that won't.
-
-
-==================================
-VIEWING EXECUTING AND QUEUED ITEMS
-==================================
-
-If CONFIG_SLOW_WORK_DEBUG is enabled, a debugfs file is made available:
-
- /sys/kernel/debug/slow_work/runqueue
-
-through which the list of work items being executed and the queues of items to
-be executed may be viewed. The owner of a work item is given the chance to
-add some information of its own.
-
-The contents look something like the following:
-
- THR PID ITEM ADDR FL MARK DESC
- === ===== ================ == ===== ==========
- 0 3005 ffff880023f52348 a 952ms FSC: OBJ17d3: LOOK
- 1 3006 ffff880024e33668 2 160ms FSC: OBJ17e5 OP60d3b: Write1/Store fl=2
- 2 3165 ffff8800296dd180 a 424ms FSC: OBJ17e4: LOOK
- 3 4089 ffff8800262c8d78 a 212ms FSC: OBJ17ea: CRTN
- 4 4090 ffff88002792bed8 2 388ms FSC: OBJ17e8 OP60d36: Write1/Store fl=2
- 5 4092 ffff88002a0ef308 2 388ms FSC: OBJ17e7 OP60d2e: Write1/Store fl=2
- 6 4094 ffff88002abaf4b8 2 132ms FSC: OBJ17e2 OP60d4e: Write1/Store fl=2
- 7 4095 ffff88002bb188e0 a 388ms FSC: OBJ17e9: CRTN
- vsq - ffff880023d99668 1 308ms FSC: OBJ17e0 OP60f91: Write1/EnQ fl=2
- vsq - ffff8800295d1740 1 212ms FSC: OBJ16be OP4d4b6: Write1/EnQ fl=2
- vsq - ffff880025ba3308 1 160ms FSC: OBJ179a OP58dec: Write1/EnQ fl=2
- vsq - ffff880024ec83e0 1 160ms FSC: OBJ17ae OP599f2: Write1/EnQ fl=2
- vsq - ffff880026618e00 1 160ms FSC: OBJ17e6 OP60d33: Write1/EnQ fl=2
- vsq - ffff880025a2a4b8 1 132ms FSC: OBJ16a2 OP4d583: Write1/EnQ fl=2
- vsq - ffff880023cbe6d8 9 212ms FSC: OBJ17eb: LOOK
- vsq - ffff880024d37590 9 212ms FSC: OBJ17ec: LOOK
- vsq - ffff880027746cb0 9 212ms FSC: OBJ17ed: LOOK
- vsq - ffff880024d37ae8 9 212ms FSC: OBJ17ee: LOOK
- vsq - ffff880024d37cb0 9 212ms FSC: OBJ17ef: LOOK
- vsq - ffff880025036550 9 212ms FSC: OBJ17f0: LOOK
- vsq - ffff8800250368e0 9 212ms FSC: OBJ17f1: LOOK
- vsq - ffff880025036aa8 9 212ms FSC: OBJ17f2: LOOK
-
-In the 'THR' column, executing items show the thread they're occupying and
-queued threads indicate which queue they're on. 'PID' shows the process ID of
-a slow-work thread that's executing something. 'FL' shows the work item flags.
-'MARK' indicates how long since an item was queued or began executing. Lastly,
-the 'DESC' column permits the owner of an item to give some information.
-
diff --git a/include/linux/slow-work.h b/include/linux/slow-work.h
deleted file mode 100644
index 13337bf..0000000
--- a/include/linux/slow-work.h
+++ /dev/null
@@ -1,163 +0,0 @@
-/* Worker thread pool for slow items, such as filesystem lookups or mkdirs
- *
- * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
- * Written by David Howells ([email protected])
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public Licence
- * as published by the Free Software Foundation; either version
- * 2 of the Licence, or (at your option) any later version.
- *
- * See Documentation/slow-work.txt
- */
-
-#ifndef _LINUX_SLOW_WORK_H
-#define _LINUX_SLOW_WORK_H
-
-#ifdef CONFIG_SLOW_WORK
-
-#include <linux/sysctl.h>
-#include <linux/timer.h>
-
-struct slow_work;
-#ifdef CONFIG_SLOW_WORK_DEBUG
-struct seq_file;
-#endif
-
-/*
- * The operations used to support slow work items
- */
-struct slow_work_ops {
- /* owner */
- struct module *owner;
-
- /* get a ref on a work item
- * - return 0 if successful, -ve if not
- */
- int (*get_ref)(struct slow_work *work);
-
- /* discard a ref to a work item */
- void (*put_ref)(struct slow_work *work);
-
- /* execute a work item */
- void (*execute)(struct slow_work *work);
-
-#ifdef CONFIG_SLOW_WORK_DEBUG
- /* describe a work item for debugfs */
- void (*desc)(struct slow_work *work, struct seq_file *m);
-#endif
-};
-
-/*
- * A slow work item
- * - A reference is held on the parent object by the thread pool when it is
- * queued
- */
-struct slow_work {
- struct module *owner; /* the owning module */
- unsigned long flags;
-#define SLOW_WORK_PENDING 0 /* item pending (further) execution */
-#define SLOW_WORK_EXECUTING 1 /* item currently executing */
-#define SLOW_WORK_ENQ_DEFERRED 2 /* item enqueue deferred */
-#define SLOW_WORK_VERY_SLOW 3 /* item is very slow */
-#define SLOW_WORK_CANCELLING 4 /* item is being cancelled, don't enqueue */
-#define SLOW_WORK_DELAYED 5 /* item is struct delayed_slow_work with active timer */
- const struct slow_work_ops *ops; /* operations table for this item */
- struct list_head link; /* link in queue */
-#ifdef CONFIG_SLOW_WORK_DEBUG
- struct timespec mark; /* jiffies at which queued or exec begun */
-#endif
-};
-
-struct delayed_slow_work {
- struct slow_work work;
- struct timer_list timer;
-};
-
-/**
- * slow_work_init - Initialise a slow work item
- * @work: The work item to initialise
- * @ops: The operations to use to handle the slow work item
- *
- * Initialise a slow work item.
- */
-static inline void slow_work_init(struct slow_work *work,
- const struct slow_work_ops *ops)
-{
- work->flags = 0;
- work->ops = ops;
- INIT_LIST_HEAD(&work->link);
-}
-
-/**
- * slow_work_init - Initialise a delayed slow work item
- * @work: The work item to initialise
- * @ops: The operations to use to handle the slow work item
- *
- * Initialise a delayed slow work item.
- */
-static inline void delayed_slow_work_init(struct delayed_slow_work *dwork,
- const struct slow_work_ops *ops)
-{
- init_timer(&dwork->timer);
- slow_work_init(&dwork->work, ops);
-}
-
-/**
- * vslow_work_init - Initialise a very slow work item
- * @work: The work item to initialise
- * @ops: The operations to use to handle the slow work item
- *
- * Initialise a very slow work item. This item will be restricted such that
- * only a certain number of the pool threads will be able to execute items of
- * this type.
- */
-static inline void vslow_work_init(struct slow_work *work,
- const struct slow_work_ops *ops)
-{
- work->flags = 1 << SLOW_WORK_VERY_SLOW;
- work->ops = ops;
- INIT_LIST_HEAD(&work->link);
-}
-
-/**
- * slow_work_is_queued - Determine if a slow work item is on the work queue
- * work: The work item to test
- *
- * Determine if the specified slow-work item is on the work queue. This
- * returns true if it is actually on the queue.
- *
- * If the item is executing and has been marked for requeue when execution
- * finishes, then false will be returned.
- *
- * Anyone wishing to wait for completion of execution can wait on the
- * SLOW_WORK_EXECUTING bit.
- */
-static inline bool slow_work_is_queued(struct slow_work *work)
-{
- unsigned long flags = work->flags;
- return flags & SLOW_WORK_PENDING && !(flags & SLOW_WORK_EXECUTING);
-}
-
-extern int slow_work_enqueue(struct slow_work *work);
-extern void slow_work_cancel(struct slow_work *work);
-extern int slow_work_register_user(struct module *owner);
-extern void slow_work_unregister_user(struct module *owner);
-
-extern int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
- unsigned long delay);
-
-static inline void delayed_slow_work_cancel(struct delayed_slow_work *dwork)
-{
- slow_work_cancel(&dwork->work);
-}
-
-extern bool slow_work_sleep_till_thread_needed(struct slow_work *work,
- signed long *_timeout);
-
-#ifdef CONFIG_SYSCTL
-extern ctl_table slow_work_sysctls[];
-#endif
-
-#endif /* CONFIG_SLOW_WORK */
-#endif /* _LINUX_SLOW_WORK_H */
diff --git a/init/Kconfig b/init/Kconfig
index 5cff9a9..cb64c58 100644
--- a/init/Kconfig
+++ b/init/Kconfig
@@ -1143,30 +1143,6 @@ config TRACEPOINTS
source "arch/Kconfig"
-config SLOW_WORK
- default n
- bool
- help
- The slow work thread pool provides a number of dynamically allocated
- threads that can be used by the kernel to perform operations that
- take a relatively long time.
-
- An example of this would be CacheFiles doing a path lookup followed
- by a series of mkdirs and a create call, all of which have to touch
- disk.
-
- See Documentation/slow-work.txt.
-
-config SLOW_WORK_DEBUG
- bool "Slow work debugging through debugfs"
- default n
- depends on SLOW_WORK && DEBUG_FS
- help
- Display the contents of the slow work run queue through debugfs,
- including items currently executing.
-
- See Documentation/slow-work.txt.
-
endmenu # General setup
config HAVE_GENERIC_DMA_COHERENT
diff --git a/kernel/Makefile b/kernel/Makefile
index 057472f..2484ac3 100644
--- a/kernel/Makefile
+++ b/kernel/Makefile
@@ -99,8 +99,6 @@ obj-$(CONFIG_TRACING) += trace/
obj-$(CONFIG_X86_DS) += trace/
obj-$(CONFIG_RING_BUFFER) += trace/
obj-$(CONFIG_SMP) += sched_cpupri.o
-obj-$(CONFIG_SLOW_WORK) += slow-work.o
-obj-$(CONFIG_SLOW_WORK_DEBUG) += slow-work-debugfs.o
obj-$(CONFIG_PERF_EVENTS) += perf_event.o
obj-$(CONFIG_HAVE_HW_BREAKPOINT) += hw_breakpoint.o
obj-$(CONFIG_USER_RETURN_NOTIFIER) += user-return-notifier.o
diff --git a/kernel/slow-work-debugfs.c b/kernel/slow-work-debugfs.c
deleted file mode 100644
index e45c436..0000000
--- a/kernel/slow-work-debugfs.c
+++ /dev/null
@@ -1,227 +0,0 @@
-/* Slow work debugging
- *
- * Copyright (C) 2009 Red Hat, Inc. All Rights Reserved.
- * Written by David Howells ([email protected])
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public Licence
- * as published by the Free Software Foundation; either version
- * 2 of the Licence, or (at your option) any later version.
- */
-
-#include <linux/module.h>
-#include <linux/slow-work.h>
-#include <linux/fs.h>
-#include <linux/time.h>
-#include <linux/seq_file.h>
-#include "slow-work.h"
-
-#define ITERATOR_SHIFT (BITS_PER_LONG - 4)
-#define ITERATOR_SELECTOR (0xfUL << ITERATOR_SHIFT)
-#define ITERATOR_COUNTER (~ITERATOR_SELECTOR)
-
-void slow_work_new_thread_desc(struct slow_work *work, struct seq_file *m)
-{
- seq_puts(m, "Slow-work: New thread");
-}
-
-/*
- * Render the time mark field on a work item into a 5-char time with units plus
- * a space
- */
-static void slow_work_print_mark(struct seq_file *m, struct slow_work *work)
-{
- struct timespec now, diff;
-
- now = CURRENT_TIME;
- diff = timespec_sub(now, work->mark);
-
- if (diff.tv_sec < 0)
- seq_puts(m, " -ve ");
- else if (diff.tv_sec == 0 && diff.tv_nsec < 1000)
- seq_printf(m, "%3luns ", diff.tv_nsec);
- else if (diff.tv_sec == 0 && diff.tv_nsec < 1000000)
- seq_printf(m, "%3luus ", diff.tv_nsec / 1000);
- else if (diff.tv_sec == 0 && diff.tv_nsec < 1000000000)
- seq_printf(m, "%3lums ", diff.tv_nsec / 1000000);
- else if (diff.tv_sec <= 1)
- seq_puts(m, " 1s ");
- else if (diff.tv_sec < 60)
- seq_printf(m, "%4lus ", diff.tv_sec);
- else if (diff.tv_sec < 60 * 60)
- seq_printf(m, "%4lum ", diff.tv_sec / 60);
- else if (diff.tv_sec < 60 * 60 * 24)
- seq_printf(m, "%4luh ", diff.tv_sec / 3600);
- else
- seq_puts(m, "exces ");
-}
-
-/*
- * Describe a slow work item for debugfs
- */
-static int slow_work_runqueue_show(struct seq_file *m, void *v)
-{
- struct slow_work *work;
- struct list_head *p = v;
- unsigned long id;
-
- switch ((unsigned long) v) {
- case 1:
- seq_puts(m, "THR PID ITEM ADDR FL MARK DESC\n");
- return 0;
- case 2:
- seq_puts(m, "=== ===== ================ == ===== ==========\n");
- return 0;
-
- case 3 ... 3 + SLOW_WORK_THREAD_LIMIT - 1:
- id = (unsigned long) v - 3;
-
- read_lock(&slow_work_execs_lock);
- work = slow_work_execs[id];
- if (work) {
- smp_read_barrier_depends();
-
- seq_printf(m, "%3lu %5d %16p %2lx ",
- id, slow_work_pids[id], work, work->flags);
- slow_work_print_mark(m, work);
-
- if (work->ops->desc)
- work->ops->desc(work, m);
- seq_putc(m, '\n');
- }
- read_unlock(&slow_work_execs_lock);
- return 0;
-
- default:
- work = list_entry(p, struct slow_work, link);
- seq_printf(m, "%3s - %16p %2lx ",
- work->flags & SLOW_WORK_VERY_SLOW ? "vsq" : "sq",
- work, work->flags);
- slow_work_print_mark(m, work);
-
- if (work->ops->desc)
- work->ops->desc(work, m);
- seq_putc(m, '\n');
- return 0;
- }
-}
-
-/*
- * map the iterator to a work item
- */
-static void *slow_work_runqueue_index(struct seq_file *m, loff_t *_pos)
-{
- struct list_head *p;
- unsigned long count, id;
-
- switch (*_pos >> ITERATOR_SHIFT) {
- case 0x0:
- if (*_pos == 0)
- *_pos = 1;
- if (*_pos < 3)
- return (void *)(unsigned long) *_pos;
- if (*_pos < 3 + SLOW_WORK_THREAD_LIMIT)
- for (id = *_pos - 3;
- id < SLOW_WORK_THREAD_LIMIT;
- id++, (*_pos)++)
- if (slow_work_execs[id])
- return (void *)(unsigned long) *_pos;
- *_pos = 0x1UL << ITERATOR_SHIFT;
-
- case 0x1:
- count = *_pos & ITERATOR_COUNTER;
- list_for_each(p, &slow_work_queue) {
- if (count == 0)
- return p;
- count--;
- }
- *_pos = 0x2UL << ITERATOR_SHIFT;
-
- case 0x2:
- count = *_pos & ITERATOR_COUNTER;
- list_for_each(p, &vslow_work_queue) {
- if (count == 0)
- return p;
- count--;
- }
- *_pos = 0x3UL << ITERATOR_SHIFT;
-
- default:
- return NULL;
- }
-}
-
-/*
- * set up the iterator to start reading from the first line
- */
-static void *slow_work_runqueue_start(struct seq_file *m, loff_t *_pos)
-{
- spin_lock_irq(&slow_work_queue_lock);
- return slow_work_runqueue_index(m, _pos);
-}
-
-/*
- * move to the next line
- */
-static void *slow_work_runqueue_next(struct seq_file *m, void *v, loff_t *_pos)
-{
- struct list_head *p = v;
- unsigned long selector = *_pos >> ITERATOR_SHIFT;
-
- (*_pos)++;
- switch (selector) {
- case 0x0:
- return slow_work_runqueue_index(m, _pos);
-
- case 0x1:
- if (*_pos >> ITERATOR_SHIFT == 0x1) {
- p = p->next;
- if (p != &slow_work_queue)
- return p;
- }
- *_pos = 0x2UL << ITERATOR_SHIFT;
- p = &vslow_work_queue;
-
- case 0x2:
- if (*_pos >> ITERATOR_SHIFT == 0x2) {
- p = p->next;
- if (p != &vslow_work_queue)
- return p;
- }
- *_pos = 0x3UL << ITERATOR_SHIFT;
-
- default:
- return NULL;
- }
-}
-
-/*
- * clean up after reading
- */
-static void slow_work_runqueue_stop(struct seq_file *m, void *v)
-{
- spin_unlock_irq(&slow_work_queue_lock);
-}
-
-static const struct seq_operations slow_work_runqueue_ops = {
- .start = slow_work_runqueue_start,
- .stop = slow_work_runqueue_stop,
- .next = slow_work_runqueue_next,
- .show = slow_work_runqueue_show,
-};
-
-/*
- * open "/sys/kernel/debug/slow_work/runqueue" to list queue contents
- */
-static int slow_work_runqueue_open(struct inode *inode, struct file *file)
-{
- return seq_open(file, &slow_work_runqueue_ops);
-}
-
-const struct file_operations slow_work_runqueue_fops = {
- .owner = THIS_MODULE,
- .open = slow_work_runqueue_open,
- .read = seq_read,
- .llseek = seq_lseek,
- .release = seq_release,
-};
diff --git a/kernel/slow-work.c b/kernel/slow-work.c
deleted file mode 100644
index 7d3f4fa..0000000
--- a/kernel/slow-work.c
+++ /dev/null
@@ -1,1068 +0,0 @@
-/* Worker thread pool for slow items, such as filesystem lookups or mkdirs
- *
- * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
- * Written by David Howells ([email protected])
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public Licence
- * as published by the Free Software Foundation; either version
- * 2 of the Licence, or (at your option) any later version.
- *
- * See Documentation/slow-work.txt
- */
-
-#include <linux/module.h>
-#include <linux/slow-work.h>
-#include <linux/kthread.h>
-#include <linux/freezer.h>
-#include <linux/wait.h>
-#include <linux/debugfs.h>
-#include "slow-work.h"
-
-static void slow_work_cull_timeout(unsigned long);
-static void slow_work_oom_timeout(unsigned long);
-
-#ifdef CONFIG_SYSCTL
-static int slow_work_min_threads_sysctl(struct ctl_table *, int,
- void __user *, size_t *, loff_t *);
-
-static int slow_work_max_threads_sysctl(struct ctl_table *, int ,
- void __user *, size_t *, loff_t *);
-#endif
-
-/*
- * The pool of threads has at least min threads in it as long as someone is
- * using the facility, and may have as many as max.
- *
- * A portion of the pool may be processing very slow operations.
- */
-static unsigned slow_work_min_threads = 2;
-static unsigned slow_work_max_threads = 4;
-static unsigned vslow_work_proportion = 50; /* % of threads that may process
- * very slow work */
-
-#ifdef CONFIG_SYSCTL
-static const int slow_work_min_min_threads = 2;
-static int slow_work_max_max_threads = SLOW_WORK_THREAD_LIMIT;
-static const int slow_work_min_vslow = 1;
-static const int slow_work_max_vslow = 99;
-
-ctl_table slow_work_sysctls[] = {
- {
- .procname = "min-threads",
- .data = &slow_work_min_threads,
- .maxlen = sizeof(unsigned),
- .mode = 0644,
- .proc_handler = slow_work_min_threads_sysctl,
- .extra1 = (void *) &slow_work_min_min_threads,
- .extra2 = &slow_work_max_threads,
- },
- {
- .procname = "max-threads",
- .data = &slow_work_max_threads,
- .maxlen = sizeof(unsigned),
- .mode = 0644,
- .proc_handler = slow_work_max_threads_sysctl,
- .extra1 = &slow_work_min_threads,
- .extra2 = (void *) &slow_work_max_max_threads,
- },
- {
- .procname = "vslow-percentage",
- .data = &vslow_work_proportion,
- .maxlen = sizeof(unsigned),
- .mode = 0644,
- .proc_handler = proc_dointvec_minmax,
- .extra1 = (void *) &slow_work_min_vslow,
- .extra2 = (void *) &slow_work_max_vslow,
- },
- {}
-};
-#endif
-
-/*
- * The active state of the thread pool
- */
-static atomic_t slow_work_thread_count;
-static atomic_t vslow_work_executing_count;
-
-static bool slow_work_may_not_start_new_thread;
-static bool slow_work_cull; /* cull a thread due to lack of activity */
-static DEFINE_TIMER(slow_work_cull_timer, slow_work_cull_timeout, 0, 0);
-static DEFINE_TIMER(slow_work_oom_timer, slow_work_oom_timeout, 0, 0);
-static struct slow_work slow_work_new_thread; /* new thread starter */
-
-/*
- * slow work ID allocation (use slow_work_queue_lock)
- */
-static DECLARE_BITMAP(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
-
-/*
- * Unregistration tracking to prevent put_ref() from disappearing during module
- * unload
- */
-#ifdef CONFIG_MODULES
-static struct module *slow_work_thread_processing[SLOW_WORK_THREAD_LIMIT];
-static struct module *slow_work_unreg_module;
-static struct slow_work *slow_work_unreg_work_item;
-static DECLARE_WAIT_QUEUE_HEAD(slow_work_unreg_wq);
-static DEFINE_MUTEX(slow_work_unreg_sync_lock);
-
-static void slow_work_set_thread_processing(int id, struct slow_work *work)
-{
- if (work)
- slow_work_thread_processing[id] = work->owner;
-}
-static void slow_work_done_thread_processing(int id, struct slow_work *work)
-{
- struct module *module = slow_work_thread_processing[id];
-
- slow_work_thread_processing[id] = NULL;
- smp_mb();
- if (slow_work_unreg_work_item == work ||
- slow_work_unreg_module == module)
- wake_up_all(&slow_work_unreg_wq);
-}
-static void slow_work_clear_thread_processing(int id)
-{
- slow_work_thread_processing[id] = NULL;
-}
-#else
-static void slow_work_set_thread_processing(int id, struct slow_work *work) {}
-static void slow_work_done_thread_processing(int id, struct slow_work *work) {}
-static void slow_work_clear_thread_processing(int id) {}
-#endif
-
-/*
- * Data for tracking currently executing items for indication through /proc
- */
-#ifdef CONFIG_SLOW_WORK_DEBUG
-struct slow_work *slow_work_execs[SLOW_WORK_THREAD_LIMIT];
-pid_t slow_work_pids[SLOW_WORK_THREAD_LIMIT];
-DEFINE_RWLOCK(slow_work_execs_lock);
-#endif
-
-/*
- * The queues of work items and the lock governing access to them. These are
- * shared between all the CPUs. It doesn't make sense to have per-CPU queues
- * as the number of threads bears no relation to the number of CPUs.
- *
- * There are two queues of work items: one for slow work items, and one for
- * very slow work items.
- */
-LIST_HEAD(slow_work_queue);
-LIST_HEAD(vslow_work_queue);
-DEFINE_SPINLOCK(slow_work_queue_lock);
-
-/*
- * The following are two wait queues that get pinged when a work item is placed
- * on an empty queue. These allow work items that are hogging a thread by
- * sleeping in a way that could be deferred to yield their thread and enqueue
- * themselves.
- */
-static DECLARE_WAIT_QUEUE_HEAD(slow_work_queue_waits_for_occupation);
-static DECLARE_WAIT_QUEUE_HEAD(vslow_work_queue_waits_for_occupation);
-
-/*
- * The thread controls. A variable used to signal to the threads that they
- * should exit when the queue is empty, a waitqueue used by the threads to wait
- * for signals, and a completion set by the last thread to exit.
- */
-static bool slow_work_threads_should_exit;
-static DECLARE_WAIT_QUEUE_HEAD(slow_work_thread_wq);
-static DECLARE_COMPLETION(slow_work_last_thread_exited);
-
-/*
- * The number of users of the thread pool and its lock. Whilst this is zero we
- * have no threads hanging around, and when this reaches zero, we wait for all
- * active or queued work items to complete and kill all the threads we do have.
- */
-static int slow_work_user_count;
-static DEFINE_MUTEX(slow_work_user_lock);
-
-static inline int slow_work_get_ref(struct slow_work *work)
-{
- if (work->ops->get_ref)
- return work->ops->get_ref(work);
-
- return 0;
-}
-
-static inline void slow_work_put_ref(struct slow_work *work)
-{
- if (work->ops->put_ref)
- work->ops->put_ref(work);
-}
-
-/*
- * Calculate the maximum number of active threads in the pool that are
- * permitted to process very slow work items.
- *
- * The answer is rounded up to at least 1, but may not equal or exceed the
- * maximum number of the threads in the pool. This means we always have at
- * least one thread that can process slow work items, and we always have at
- * least one thread that won't get tied up doing so.
- */
-static unsigned slow_work_calc_vsmax(void)
-{
- unsigned vsmax;
-
- vsmax = atomic_read(&slow_work_thread_count) * vslow_work_proportion;
- vsmax /= 100;
- vsmax = max(vsmax, 1U);
- return min(vsmax, slow_work_max_threads - 1);
-}
-
-/*
- * Attempt to execute stuff queued on a slow thread. Return true if we managed
- * it, false if there was nothing to do.
- */
-static noinline bool slow_work_execute(int id)
-{
- struct slow_work *work = NULL;
- unsigned vsmax;
- bool very_slow;
-
- vsmax = slow_work_calc_vsmax();
-
- /* see if we can schedule a new thread to be started if we're not
- * keeping up with the work */
- if (!waitqueue_active(&slow_work_thread_wq) &&
- (!list_empty(&slow_work_queue) || !list_empty(&vslow_work_queue)) &&
- atomic_read(&slow_work_thread_count) < slow_work_max_threads &&
- !slow_work_may_not_start_new_thread)
- slow_work_enqueue(&slow_work_new_thread);
-
- /* find something to execute */
- spin_lock_irq(&slow_work_queue_lock);
- if (!list_empty(&vslow_work_queue) &&
- atomic_read(&vslow_work_executing_count) < vsmax) {
- work = list_entry(vslow_work_queue.next,
- struct slow_work, link);
- if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
- BUG();
- list_del_init(&work->link);
- atomic_inc(&vslow_work_executing_count);
- very_slow = true;
- } else if (!list_empty(&slow_work_queue)) {
- work = list_entry(slow_work_queue.next,
- struct slow_work, link);
- if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
- BUG();
- list_del_init(&work->link);
- very_slow = false;
- } else {
- very_slow = false; /* avoid the compiler warning */
- }
-
- slow_work_set_thread_processing(id, work);
- if (work) {
- slow_work_mark_time(work);
- slow_work_begin_exec(id, work);
- }
-
- spin_unlock_irq(&slow_work_queue_lock);
-
- if (!work)
- return false;
-
- if (!test_and_clear_bit(SLOW_WORK_PENDING, &work->flags))
- BUG();
-
- /* don't execute if the work is in the process of being cancelled */
- if (!test_bit(SLOW_WORK_CANCELLING, &work->flags))
- work->ops->execute(work);
-
- if (very_slow)
- atomic_dec(&vslow_work_executing_count);
- clear_bit_unlock(SLOW_WORK_EXECUTING, &work->flags);
-
- /* wake up anyone waiting for this work to be complete */
- wake_up_bit(&work->flags, SLOW_WORK_EXECUTING);
-
- slow_work_end_exec(id, work);
-
- /* if someone tried to enqueue the item whilst we were executing it,
- * then it'll be left unenqueued to avoid multiple threads trying to
- * execute it simultaneously
- *
- * there is, however, a race between us testing the pending flag and
- * getting the spinlock, and between the enqueuer setting the pending
- * flag and getting the spinlock, so we use a deferral bit to tell us
- * if the enqueuer got there first
- */
- if (test_bit(SLOW_WORK_PENDING, &work->flags)) {
- spin_lock_irq(&slow_work_queue_lock);
-
- if (!test_bit(SLOW_WORK_EXECUTING, &work->flags) &&
- test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags))
- goto auto_requeue;
-
- spin_unlock_irq(&slow_work_queue_lock);
- }
-
- /* sort out the race between module unloading and put_ref() */
- slow_work_put_ref(work);
- slow_work_done_thread_processing(id, work);
-
- return true;
-
-auto_requeue:
- /* we must complete the enqueue operation
- * - we transfer our ref on the item back to the appropriate queue
- * - don't wake another thread up as we're awake already
- */
- slow_work_mark_time(work);
- if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags))
- list_add_tail(&work->link, &vslow_work_queue);
- else
- list_add_tail(&work->link, &slow_work_queue);
- spin_unlock_irq(&slow_work_queue_lock);
- slow_work_clear_thread_processing(id);
- return true;
-}
-
-/**
- * slow_work_sleep_till_thread_needed - Sleep till thread needed by other work
- * work: The work item under execution that wants to sleep
- * _timeout: Scheduler sleep timeout
- *
- * Allow a requeueable work item to sleep on a slow-work processor thread until
- * that thread is needed to do some other work or the sleep is interrupted by
- * some other event.
- *
- * The caller must set up a wake up event before calling this and must have set
- * the appropriate sleep mode (such as TASK_UNINTERRUPTIBLE) and tested its own
- * condition before calling this function as no test is made here.
- *
- * False is returned if there is nothing on the queue; true is returned if the
- * work item should be requeued
- */
-bool slow_work_sleep_till_thread_needed(struct slow_work *work,
- signed long *_timeout)
-{
- wait_queue_head_t *wfo_wq;
- struct list_head *queue;
-
- DEFINE_WAIT(wait);
-
- if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
- wfo_wq = &vslow_work_queue_waits_for_occupation;
- queue = &vslow_work_queue;
- } else {
- wfo_wq = &slow_work_queue_waits_for_occupation;
- queue = &slow_work_queue;
- }
-
- if (!list_empty(queue))
- return true;
-
- add_wait_queue_exclusive(wfo_wq, &wait);
- if (list_empty(queue))
- *_timeout = schedule_timeout(*_timeout);
- finish_wait(wfo_wq, &wait);
-
- return !list_empty(queue);
-}
-EXPORT_SYMBOL(slow_work_sleep_till_thread_needed);
-
-/**
- * slow_work_enqueue - Schedule a slow work item for processing
- * @work: The work item to queue
- *
- * Schedule a slow work item for processing. If the item is already undergoing
- * execution, this guarantees not to re-enter the execution routine until the
- * first execution finishes.
- *
- * The item is pinned by this function as it retains a reference to it, managed
- * through the item operations. The item is unpinned once it has been
- * executed.
- *
- * An item may hog the thread that is running it for a relatively large amount
- * of time, sufficient, for example, to perform several lookup, mkdir, create
- * and setxattr operations. It may sleep on I/O and may sleep to obtain locks.
- *
- * Conversely, if a number of items are awaiting processing, it may take some
- * time before any given item is given attention. The number of threads in the
- * pool may be increased to deal with demand, but only up to a limit.
- *
- * If SLOW_WORK_VERY_SLOW is set on the work item, then it will be placed in
- * the very slow queue, from which only a portion of the threads will be
- * allowed to pick items to execute. This ensures that very slow items won't
- * overly block ones that are just ordinarily slow.
- *
- * Returns 0 if successful, -EAGAIN if not (or -ECANCELED if cancelled work is
- * attempted queued)
- */
-int slow_work_enqueue(struct slow_work *work)
-{
- wait_queue_head_t *wfo_wq;
- struct list_head *queue;
- unsigned long flags;
- int ret;
-
- if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
- return -ECANCELED;
-
- BUG_ON(slow_work_user_count <= 0);
- BUG_ON(!work);
- BUG_ON(!work->ops);
-
- /* when honouring an enqueue request, we only promise that we will run
- * the work function in the future; we do not promise to run it once
- * per enqueue request
- *
- * we use the PENDING bit to merge together repeat requests without
- * having to disable IRQs and take the spinlock, whilst still
- * maintaining our promise
- */
- if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
- if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
- wfo_wq = &vslow_work_queue_waits_for_occupation;
- queue = &vslow_work_queue;
- } else {
- wfo_wq = &slow_work_queue_waits_for_occupation;
- queue = &slow_work_queue;
- }
-
- spin_lock_irqsave(&slow_work_queue_lock, flags);
-
- if (unlikely(test_bit(SLOW_WORK_CANCELLING, &work->flags)))
- goto cancelled;
-
- /* we promise that we will not attempt to execute the work
- * function in more than one thread simultaneously
- *
- * this, however, leaves us with a problem if we're asked to
- * enqueue the work whilst someone is executing the work
- * function as simply queueing the work immediately means that
- * another thread may try executing it whilst it is already
- * under execution
- *
- * to deal with this, we set the ENQ_DEFERRED bit instead of
- * enqueueing, and the thread currently executing the work
- * function will enqueue the work item when the work function
- * returns and it has cleared the EXECUTING bit
- */
- if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
- set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
- } else {
- ret = slow_work_get_ref(work);
- if (ret < 0)
- goto failed;
- slow_work_mark_time(work);
- list_add_tail(&work->link, queue);
- wake_up(&slow_work_thread_wq);
-
- /* if someone who could be requeued is sleeping on a
- * thread, then ask them to yield their thread */
- if (work->link.prev == queue)
- wake_up(wfo_wq);
- }
-
- spin_unlock_irqrestore(&slow_work_queue_lock, flags);
- }
- return 0;
-
-cancelled:
- ret = -ECANCELED;
-failed:
- spin_unlock_irqrestore(&slow_work_queue_lock, flags);
- return ret;
-}
-EXPORT_SYMBOL(slow_work_enqueue);
-
-static int slow_work_wait(void *word)
-{
- schedule();
- return 0;
-}
-
-/**
- * slow_work_cancel - Cancel a slow work item
- * @work: The work item to cancel
- *
- * This function will cancel a previously enqueued work item. If we cannot
- * cancel the work item, it is guarenteed to have run when this function
- * returns.
- */
-void slow_work_cancel(struct slow_work *work)
-{
- bool wait = true, put = false;
-
- set_bit(SLOW_WORK_CANCELLING, &work->flags);
- smp_mb();
-
- /* if the work item is a delayed work item with an active timer, we
- * need to wait for the timer to finish _before_ getting the spinlock,
- * lest we deadlock against the timer routine
- *
- * the timer routine will leave DELAYED set if it notices the
- * CANCELLING flag in time
- */
- if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
- struct delayed_slow_work *dwork =
- container_of(work, struct delayed_slow_work, work);
- del_timer_sync(&dwork->timer);
- }
-
- spin_lock_irq(&slow_work_queue_lock);
-
- if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
- /* the timer routine aborted or never happened, so we are left
- * holding the timer's reference on the item and should just
- * drop the pending flag and wait for any ongoing execution to
- * finish */
- struct delayed_slow_work *dwork =
- container_of(work, struct delayed_slow_work, work);
-
- BUG_ON(timer_pending(&dwork->timer));
- BUG_ON(!list_empty(&work->link));
-
- clear_bit(SLOW_WORK_DELAYED, &work->flags);
- put = true;
- clear_bit(SLOW_WORK_PENDING, &work->flags);
-
- } else if (test_bit(SLOW_WORK_PENDING, &work->flags) &&
- !list_empty(&work->link)) {
- /* the link in the pending queue holds a reference on the item
- * that we will need to release */
- list_del_init(&work->link);
- wait = false;
- put = true;
- clear_bit(SLOW_WORK_PENDING, &work->flags);
-
- } else if (test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags)) {
- /* the executor is holding our only reference on the item, so
- * we merely need to wait for it to finish executing */
- clear_bit(SLOW_WORK_PENDING, &work->flags);
- }
-
- spin_unlock_irq(&slow_work_queue_lock);
-
- /* the EXECUTING flag is set by the executor whilst the spinlock is set
- * and before the item is dequeued - so assuming the above doesn't
- * actually dequeue it, simply waiting for the EXECUTING flag to be
- * released here should be sufficient */
- if (wait)
- wait_on_bit(&work->flags, SLOW_WORK_EXECUTING, slow_work_wait,
- TASK_UNINTERRUPTIBLE);
-
- clear_bit(SLOW_WORK_CANCELLING, &work->flags);
- if (put)
- slow_work_put_ref(work);
-}
-EXPORT_SYMBOL(slow_work_cancel);
-
-/*
- * Handle expiry of the delay timer, indicating that a delayed slow work item
- * should now be queued if not cancelled
- */
-static void delayed_slow_work_timer(unsigned long data)
-{
- wait_queue_head_t *wfo_wq;
- struct list_head *queue;
- struct slow_work *work = (struct slow_work *) data;
- unsigned long flags;
- bool queued = false, put = false, first = false;
-
- if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
- wfo_wq = &vslow_work_queue_waits_for_occupation;
- queue = &vslow_work_queue;
- } else {
- wfo_wq = &slow_work_queue_waits_for_occupation;
- queue = &slow_work_queue;
- }
-
- spin_lock_irqsave(&slow_work_queue_lock, flags);
- if (likely(!test_bit(SLOW_WORK_CANCELLING, &work->flags))) {
- clear_bit(SLOW_WORK_DELAYED, &work->flags);
-
- if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
- /* we discard the reference the timer was holding in
- * favour of the one the executor holds */
- set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
- put = true;
- } else {
- slow_work_mark_time(work);
- list_add_tail(&work->link, queue);
- queued = true;
- if (work->link.prev == queue)
- first = true;
- }
- }
-
- spin_unlock_irqrestore(&slow_work_queue_lock, flags);
- if (put)
- slow_work_put_ref(work);
- if (first)
- wake_up(wfo_wq);
- if (queued)
- wake_up(&slow_work_thread_wq);
-}
-
-/**
- * delayed_slow_work_enqueue - Schedule a delayed slow work item for processing
- * @dwork: The delayed work item to queue
- * @delay: When to start executing the work, in jiffies from now
- *
- * This is similar to slow_work_enqueue(), but it adds a delay before the work
- * is actually queued for processing.
- *
- * The item can have delayed processing requested on it whilst it is being
- * executed. The delay will begin immediately, and if it expires before the
- * item finishes executing, the item will be placed back on the queue when it
- * has done executing.
- */
-int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
- unsigned long delay)
-{
- struct slow_work *work = &dwork->work;
- unsigned long flags;
- int ret;
-
- if (delay == 0)
- return slow_work_enqueue(&dwork->work);
-
- BUG_ON(slow_work_user_count <= 0);
- BUG_ON(!work);
- BUG_ON(!work->ops);
-
- if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
- return -ECANCELED;
-
- if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
- spin_lock_irqsave(&slow_work_queue_lock, flags);
-
- if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
- goto cancelled;
-
- /* the timer holds a reference whilst it is pending */
- ret = slow_work_get_ref(work);
- if (ret < 0)
- goto cant_get_ref;
-
- if (test_and_set_bit(SLOW_WORK_DELAYED, &work->flags))
- BUG();
- dwork->timer.expires = jiffies + delay;
- dwork->timer.data = (unsigned long) work;
- dwork->timer.function = delayed_slow_work_timer;
- add_timer(&dwork->timer);
-
- spin_unlock_irqrestore(&slow_work_queue_lock, flags);
- }
-
- return 0;
-
-cancelled:
- ret = -ECANCELED;
-cant_get_ref:
- spin_unlock_irqrestore(&slow_work_queue_lock, flags);
- return ret;
-}
-EXPORT_SYMBOL(delayed_slow_work_enqueue);
-
-/*
- * Schedule a cull of the thread pool at some time in the near future
- */
-static void slow_work_schedule_cull(void)
-{
- mod_timer(&slow_work_cull_timer,
- round_jiffies(jiffies + SLOW_WORK_CULL_TIMEOUT));
-}
-
-/*
- * Worker thread culling algorithm
- */
-static bool slow_work_cull_thread(void)
-{
- unsigned long flags;
- bool do_cull = false;
-
- spin_lock_irqsave(&slow_work_queue_lock, flags);
-
- if (slow_work_cull) {
- slow_work_cull = false;
-
- if (list_empty(&slow_work_queue) &&
- list_empty(&vslow_work_queue) &&
- atomic_read(&slow_work_thread_count) >
- slow_work_min_threads) {
- slow_work_schedule_cull();
- do_cull = true;
- }
- }
-
- spin_unlock_irqrestore(&slow_work_queue_lock, flags);
- return do_cull;
-}
-
-/*
- * Determine if there is slow work available for dispatch
- */
-static inline bool slow_work_available(int vsmax)
-{
- return !list_empty(&slow_work_queue) ||
- (!list_empty(&vslow_work_queue) &&
- atomic_read(&vslow_work_executing_count) < vsmax);
-}
-
-/*
- * Worker thread dispatcher
- */
-static int slow_work_thread(void *_data)
-{
- int vsmax, id;
-
- DEFINE_WAIT(wait);
-
- set_freezable();
- set_user_nice(current, -5);
-
- /* allocate ourselves an ID */
- spin_lock_irq(&slow_work_queue_lock);
- id = find_first_zero_bit(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
- BUG_ON(id < 0 || id >= SLOW_WORK_THREAD_LIMIT);
- __set_bit(id, slow_work_ids);
- slow_work_set_thread_pid(id, current->pid);
- spin_unlock_irq(&slow_work_queue_lock);
-
- sprintf(current->comm, "kslowd%03u", id);
-
- for (;;) {
- vsmax = vslow_work_proportion;
- vsmax *= atomic_read(&slow_work_thread_count);
- vsmax /= 100;
-
- prepare_to_wait_exclusive(&slow_work_thread_wq, &wait,
- TASK_INTERRUPTIBLE);
- if (!freezing(current) &&
- !slow_work_threads_should_exit &&
- !slow_work_available(vsmax) &&
- !slow_work_cull)
- schedule();
- finish_wait(&slow_work_thread_wq, &wait);
-
- try_to_freeze();
-
- vsmax = vslow_work_proportion;
- vsmax *= atomic_read(&slow_work_thread_count);
- vsmax /= 100;
-
- if (slow_work_available(vsmax) && slow_work_execute(id)) {
- cond_resched();
- if (list_empty(&slow_work_queue) &&
- list_empty(&vslow_work_queue) &&
- atomic_read(&slow_work_thread_count) >
- slow_work_min_threads)
- slow_work_schedule_cull();
- continue;
- }
-
- if (slow_work_threads_should_exit)
- break;
-
- if (slow_work_cull && slow_work_cull_thread())
- break;
- }
-
- spin_lock_irq(&slow_work_queue_lock);
- slow_work_set_thread_pid(id, 0);
- __clear_bit(id, slow_work_ids);
- spin_unlock_irq(&slow_work_queue_lock);
-
- if (atomic_dec_and_test(&slow_work_thread_count))
- complete_and_exit(&slow_work_last_thread_exited, 0);
- return 0;
-}
-
-/*
- * Handle thread cull timer expiration
- */
-static void slow_work_cull_timeout(unsigned long data)
-{
- slow_work_cull = true;
- wake_up(&slow_work_thread_wq);
-}
-
-/*
- * Start a new slow work thread
- */
-static void slow_work_new_thread_execute(struct slow_work *work)
-{
- struct task_struct *p;
-
- if (slow_work_threads_should_exit)
- return;
-
- if (atomic_read(&slow_work_thread_count) >= slow_work_max_threads)
- return;
-
- if (!mutex_trylock(&slow_work_user_lock))
- return;
-
- slow_work_may_not_start_new_thread = true;
- atomic_inc(&slow_work_thread_count);
- p = kthread_run(slow_work_thread, NULL, "kslowd");
- if (IS_ERR(p)) {
- printk(KERN_DEBUG "Slow work thread pool: OOM\n");
- if (atomic_dec_and_test(&slow_work_thread_count))
- BUG(); /* we're running on a slow work thread... */
- mod_timer(&slow_work_oom_timer,
- round_jiffies(jiffies + SLOW_WORK_OOM_TIMEOUT));
- } else {
- /* ratelimit the starting of new threads */
- mod_timer(&slow_work_oom_timer, jiffies + 1);
- }
-
- mutex_unlock(&slow_work_user_lock);
-}
-
-static const struct slow_work_ops slow_work_new_thread_ops = {
- .owner = THIS_MODULE,
- .execute = slow_work_new_thread_execute,
-#ifdef CONFIG_SLOW_WORK_DEBUG
- .desc = slow_work_new_thread_desc,
-#endif
-};
-
-/*
- * post-OOM new thread start suppression expiration
- */
-static void slow_work_oom_timeout(unsigned long data)
-{
- slow_work_may_not_start_new_thread = false;
-}
-
-#ifdef CONFIG_SYSCTL
-/*
- * Handle adjustment of the minimum number of threads
- */
-static int slow_work_min_threads_sysctl(struct ctl_table *table, int write,
- void __user *buffer,
- size_t *lenp, loff_t *ppos)
-{
- int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
- int n;
-
- if (ret == 0) {
- mutex_lock(&slow_work_user_lock);
- if (slow_work_user_count > 0) {
- /* see if we need to start or stop threads */
- n = atomic_read(&slow_work_thread_count) -
- slow_work_min_threads;
-
- if (n < 0 && !slow_work_may_not_start_new_thread)
- slow_work_enqueue(&slow_work_new_thread);
- else if (n > 0)
- slow_work_schedule_cull();
- }
- mutex_unlock(&slow_work_user_lock);
- }
-
- return ret;
-}
-
-/*
- * Handle adjustment of the maximum number of threads
- */
-static int slow_work_max_threads_sysctl(struct ctl_table *table, int write,
- void __user *buffer,
- size_t *lenp, loff_t *ppos)
-{
- int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
- int n;
-
- if (ret == 0) {
- mutex_lock(&slow_work_user_lock);
- if (slow_work_user_count > 0) {
- /* see if we need to stop threads */
- n = slow_work_max_threads -
- atomic_read(&slow_work_thread_count);
-
- if (n < 0)
- slow_work_schedule_cull();
- }
- mutex_unlock(&slow_work_user_lock);
- }
-
- return ret;
-}
-#endif /* CONFIG_SYSCTL */
-
-/**
- * slow_work_register_user - Register a user of the facility
- * @module: The module about to make use of the facility
- *
- * Register a user of the facility, starting up the initial threads if there
- * aren't any other users at this point. This will return 0 if successful, or
- * an error if not.
- */
-int slow_work_register_user(struct module *module)
-{
- struct task_struct *p;
- int loop;
-
- mutex_lock(&slow_work_user_lock);
-
- if (slow_work_user_count == 0) {
- printk(KERN_NOTICE "Slow work thread pool: Starting up\n");
- init_completion(&slow_work_last_thread_exited);
-
- slow_work_threads_should_exit = false;
- slow_work_init(&slow_work_new_thread,
- &slow_work_new_thread_ops);
- slow_work_may_not_start_new_thread = false;
- slow_work_cull = false;
-
- /* start the minimum number of threads */
- for (loop = 0; loop < slow_work_min_threads; loop++) {
- atomic_inc(&slow_work_thread_count);
- p = kthread_run(slow_work_thread, NULL, "kslowd");
- if (IS_ERR(p))
- goto error;
- }
- printk(KERN_NOTICE "Slow work thread pool: Ready\n");
- }
-
- slow_work_user_count++;
- mutex_unlock(&slow_work_user_lock);
- return 0;
-
-error:
- if (atomic_dec_and_test(&slow_work_thread_count))
- complete(&slow_work_last_thread_exited);
- if (loop > 0) {
- printk(KERN_ERR "Slow work thread pool:"
- " Aborting startup on ENOMEM\n");
- slow_work_threads_should_exit = true;
- wake_up_all(&slow_work_thread_wq);
- wait_for_completion(&slow_work_last_thread_exited);
- printk(KERN_ERR "Slow work thread pool: Aborted\n");
- }
- mutex_unlock(&slow_work_user_lock);
- return PTR_ERR(p);
-}
-EXPORT_SYMBOL(slow_work_register_user);
-
-/*
- * wait for all outstanding items from the calling module to complete
- * - note that more items may be queued whilst we're waiting
- */
-static void slow_work_wait_for_items(struct module *module)
-{
-#ifdef CONFIG_MODULES
- DECLARE_WAITQUEUE(myself, current);
- struct slow_work *work;
- int loop;
-
- mutex_lock(&slow_work_unreg_sync_lock);
- add_wait_queue(&slow_work_unreg_wq, &myself);
-
- for (;;) {
- spin_lock_irq(&slow_work_queue_lock);
-
- /* first of all, we wait for the last queued item in each list
- * to be processed */
- list_for_each_entry_reverse(work, &vslow_work_queue, link) {
- if (work->owner == module) {
- set_current_state(TASK_UNINTERRUPTIBLE);
- slow_work_unreg_work_item = work;
- goto do_wait;
- }
- }
- list_for_each_entry_reverse(work, &slow_work_queue, link) {
- if (work->owner == module) {
- set_current_state(TASK_UNINTERRUPTIBLE);
- slow_work_unreg_work_item = work;
- goto do_wait;
- }
- }
-
- /* then we wait for the items being processed to finish */
- slow_work_unreg_module = module;
- smp_mb();
- for (loop = 0; loop < SLOW_WORK_THREAD_LIMIT; loop++) {
- if (slow_work_thread_processing[loop] == module)
- goto do_wait;
- }
- spin_unlock_irq(&slow_work_queue_lock);
- break; /* okay, we're done */
-
- do_wait:
- spin_unlock_irq(&slow_work_queue_lock);
- schedule();
- slow_work_unreg_work_item = NULL;
- slow_work_unreg_module = NULL;
- }
-
- remove_wait_queue(&slow_work_unreg_wq, &myself);
- mutex_unlock(&slow_work_unreg_sync_lock);
-#endif /* CONFIG_MODULES */
-}
-
-/**
- * slow_work_unregister_user - Unregister a user of the facility
- * @module: The module whose items should be cleared
- *
- * Unregister a user of the facility, killing all the threads if this was the
- * last one.
- *
- * This waits for all the work items belonging to the nominated module to go
- * away before proceeding.
- */
-void slow_work_unregister_user(struct module *module)
-{
- /* first of all, wait for all outstanding items from the calling module
- * to complete */
- if (module)
- slow_work_wait_for_items(module);
-
- /* then we can actually go about shutting down the facility if need
- * be */
- mutex_lock(&slow_work_user_lock);
-
- BUG_ON(slow_work_user_count <= 0);
-
- slow_work_user_count--;
- if (slow_work_user_count == 0) {
- printk(KERN_NOTICE "Slow work thread pool: Shutting down\n");
- slow_work_threads_should_exit = true;
- del_timer_sync(&slow_work_cull_timer);
- del_timer_sync(&slow_work_oom_timer);
- wake_up_all(&slow_work_thread_wq);
- wait_for_completion(&slow_work_last_thread_exited);
- printk(KERN_NOTICE "Slow work thread pool:"
- " Shut down complete\n");
- }
-
- mutex_unlock(&slow_work_user_lock);
-}
-EXPORT_SYMBOL(slow_work_unregister_user);
-
-/*
- * Initialise the slow work facility
- */
-static int __init init_slow_work(void)
-{
- unsigned nr_cpus = num_possible_cpus();
-
- if (slow_work_max_threads < nr_cpus)
- slow_work_max_threads = nr_cpus;
-#ifdef CONFIG_SYSCTL
- if (slow_work_max_max_threads < nr_cpus * 2)
- slow_work_max_max_threads = nr_cpus * 2;
-#endif
-#ifdef CONFIG_SLOW_WORK_DEBUG
- {
- struct dentry *dbdir;
-
- dbdir = debugfs_create_dir("slow_work", NULL);
- if (dbdir && !IS_ERR(dbdir))
- debugfs_create_file("runqueue", S_IFREG | 0400, dbdir,
- NULL, &slow_work_runqueue_fops);
- }
-#endif
- return 0;
-}
-
-subsys_initcall(init_slow_work);
diff --git a/kernel/slow-work.h b/kernel/slow-work.h
deleted file mode 100644
index a29ebd1..0000000
--- a/kernel/slow-work.h
+++ /dev/null
@@ -1,72 +0,0 @@
-/* Slow work private definitions
- *
- * Copyright (C) 2009 Red Hat, Inc. All Rights Reserved.
- * Written by David Howells ([email protected])
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public Licence
- * as published by the Free Software Foundation; either version
- * 2 of the Licence, or (at your option) any later version.
- */
-
-#define SLOW_WORK_CULL_TIMEOUT (5 * HZ) /* cull threads 5s after running out of
- * things to do */
-#define SLOW_WORK_OOM_TIMEOUT (5 * HZ) /* can't start new threads for 5s after
- * OOM */
-
-#define SLOW_WORK_THREAD_LIMIT 255 /* abs maximum number of slow-work threads */
-
-/*
- * slow-work.c
- */
-#ifdef CONFIG_SLOW_WORK_DEBUG
-extern struct slow_work *slow_work_execs[];
-extern pid_t slow_work_pids[];
-extern rwlock_t slow_work_execs_lock;
-#endif
-
-extern struct list_head slow_work_queue;
-extern struct list_head vslow_work_queue;
-extern spinlock_t slow_work_queue_lock;
-
-/*
- * slow-work-debugfs.c
- */
-#ifdef CONFIG_SLOW_WORK_DEBUG
-extern const struct file_operations slow_work_runqueue_fops;
-
-extern void slow_work_new_thread_desc(struct slow_work *, struct seq_file *);
-#endif
-
-/*
- * Helper functions
- */
-static inline void slow_work_set_thread_pid(int id, pid_t pid)
-{
-#ifdef CONFIG_SLOW_WORK_DEBUG
- slow_work_pids[id] = pid;
-#endif
-}
-
-static inline void slow_work_mark_time(struct slow_work *work)
-{
-#ifdef CONFIG_SLOW_WORK_DEBUG
- work->mark = CURRENT_TIME;
-#endif
-}
-
-static inline void slow_work_begin_exec(int id, struct slow_work *work)
-{
-#ifdef CONFIG_SLOW_WORK_DEBUG
- slow_work_execs[id] = work;
-#endif
-}
-
-static inline void slow_work_end_exec(int id, struct slow_work *work)
-{
-#ifdef CONFIG_SLOW_WORK_DEBUG
- write_lock(&slow_work_execs_lock);
- slow_work_execs[id] = NULL;
- write_unlock(&slow_work_execs_lock);
-#endif
-}
diff --git a/kernel/sysctl.c b/kernel/sysctl.c
index d24f761..5821365 100644
--- a/kernel/sysctl.c
+++ b/kernel/sysctl.c
@@ -50,7 +50,6 @@
#include <linux/acpi.h>
#include <linux/reboot.h>
#include <linux/ftrace.h>
-#include <linux/slow-work.h>
#include <linux/perf_event.h>
#include <linux/kprobes.h>
#include <linux/pipe_fs_i.h>
@@ -906,13 +905,6 @@ static struct ctl_table kern_table[] = {
.proc_handler = proc_dointvec,
},
#endif
-#ifdef CONFIG_SLOW_WORK
- {
- .procname = "slow-work",
- .mode = 0555,
- .child = slow_work_sysctls,
- },
-#endif
#ifdef CONFIG_PERF_EVENTS
{
.procname = "perf_event_paranoid",
--
1.6.4.2
Make fscache object state transition callbacks use workqueue instead
of slow-work. New dedicated unbound CPU workqueue fscache_object_wq
is created. get/put callbacks are renamed and modified to take
@object and called directly from the enqueue wrapper and the work
function. While at it, make all open coded instances of get/put to
use fscache_get/put_object().
* Unbound workqueue is used.
* work_busy() output is printed instead of slow-work flags in object
debugging outputs. They mean basically the same thing bit-for-bit.
* sysctl fscache.object_max_active added to control concurrency. The
default value is nr_cpus clamped between 4 and
WQ_UNBOUND_MAX_ACTIVE.
* slow_work_sleep_till_thread_needed() is replaced with fscache
private implementation fscache_object_sleep_till_congested() which
waits on fscache_object_wq congestion.
* debugfs support is dropped for now. Tracing API based debug
facility is planned to be added.
Signed-off-by: Tejun Heo <[email protected]>
Cc: David Howells <[email protected]>
---
Documentation/filesystems/caching/fscache.txt | 10 +-
fs/cachefiles/namei.c | 13 ++--
fs/fscache/internal.h | 7 ++
fs/fscache/main.c | 76 ++++++++++++++++++
fs/fscache/object-list.c | 11 +--
fs/fscache/object.c | 106 ++++++++++++------------
include/linux/fscache-cache.h | 9 ++-
7 files changed, 158 insertions(+), 74 deletions(-)
diff --git a/Documentation/filesystems/caching/fscache.txt b/Documentation/filesystems/caching/fscache.txt
index a91e2e2..770267a 100644
--- a/Documentation/filesystems/caching/fscache.txt
+++ b/Documentation/filesystems/caching/fscache.txt
@@ -343,8 +343,8 @@ This will look something like:
[root@andromeda ~]# head /proc/fs/fscache/objects
OBJECT PARENT STAT CHLDN OPS OOP IPR EX READS EM EV F S | NETFS_COOKIE_DEF TY FL NETFS_DATA OBJECT_KEY, AUX_DATA
======== ======== ==== ===== === === === == ===== == == = = | ================ == == ================ ================
- 17e4b 2 ACTV 0 0 0 0 0 0 7b 4 0 8 | NFS.fh DT 0 ffff88001dd82820 010006017edcf8bbc93b43298fdfbe71e50b57b13a172c0117f38472, e567634700000000000000000000000063f2404a000000000000000000000000c9030000000000000000000063f2404a
- 1693a 2 ACTV 0 0 0 0 0 0 7b 4 0 8 | NFS.fh DT 0 ffff88002db23380 010006017edcf8bbc93b43298fdfbe71e50b57b1e0162c01a2df0ea6, 420ebc4a000000000000000000000000420ebc4a0000000000000000000000000e1801000000000000000000420ebc4a
+ 17e4b 2 ACTV 0 0 0 0 0 0 7b 4 0 0 | NFS.fh DT 0 ffff88001dd82820 010006017edcf8bbc93b43298fdfbe71e50b57b13a172c0117f38472, e567634700000000000000000000000063f2404a000000000000000000000000c9030000000000000000000063f2404a
+ 1693a 2 ACTV 0 0 0 0 0 0 7b 4 0 0 | NFS.fh DT 0 ffff88002db23380 010006017edcf8bbc93b43298fdfbe71e50b57b1e0162c01a2df0ea6, 420ebc4a000000000000000000000000420ebc4a0000000000000000000000000e1801000000000000000000420ebc4a
where the first set of columns before the '|' describe the object:
@@ -362,7 +362,7 @@ where the first set of columns before the '|' describe the object:
EM Object's event mask
EV Events raised on this object
F Object flags
- S Object slow-work work item flags
+ S Object work item busy state mask (1:pending 2:running)
and the second set of columns describe the object's cookie, if present:
@@ -395,8 +395,8 @@ and the following paired letters:
w Show objects that don't have pending writes
R Show objects that have outstanding reads
r Show objects that don't have outstanding reads
- S Show objects that have slow work queued
- s Show objects that don't have slow work queued
+ S Show objects that have work queued
+ s Show objects that don't have work queued
If neither side of a letter pair is given, then both are implied. For example:
diff --git a/fs/cachefiles/namei.c b/fs/cachefiles/namei.c
index f4a7840..42c7faf 100644
--- a/fs/cachefiles/namei.c
+++ b/fs/cachefiles/namei.c
@@ -37,9 +37,9 @@ void __cachefiles_printk_object(struct cachefiles_object *object,
printk(KERN_ERR "%sobject: OBJ%x\n",
prefix, object->fscache.debug_id);
- printk(KERN_ERR "%sobjstate=%s fl=%lx swfl=%lx ev=%lx[%lx]\n",
+ printk(KERN_ERR "%sobjstate=%s fl=%lx wbusy=%x ev=%lx[%lx]\n",
prefix, fscache_object_states[object->fscache.state],
- object->fscache.flags, object->fscache.work.flags,
+ object->fscache.flags, work_busy(&object->fscache.work),
object->fscache.events,
object->fscache.event_mask & FSCACHE_OBJECT_EVENTS_MASK);
printk(KERN_ERR "%sops=%u inp=%u exc=%u\n",
@@ -212,7 +212,7 @@ wait_for_old_object:
/* if the object we're waiting for is queued for processing,
* then just put ourselves on the queue behind it */
- if (slow_work_is_queued(&xobject->fscache.work)) {
+ if (work_pending(&xobject->fscache.work)) {
_debug("queue OBJ%x behind OBJ%x immediately",
object->fscache.debug_id,
xobject->fscache.debug_id);
@@ -220,8 +220,7 @@ wait_for_old_object:
}
/* otherwise we sleep until either the object we're waiting for
- * is done, or the slow-work facility wants the thread back to
- * do other work */
+ * is done, or the fscache_object is congested */
wq = bit_waitqueue(&xobject->flags, CACHEFILES_OBJECT_ACTIVE);
init_wait(&wait);
requeue = false;
@@ -229,8 +228,8 @@ wait_for_old_object:
prepare_to_wait(wq, &wait, TASK_UNINTERRUPTIBLE);
if (!test_bit(CACHEFILES_OBJECT_ACTIVE, &xobject->flags))
break;
- requeue = slow_work_sleep_till_thread_needed(
- &object->fscache.work, &timeout);
+
+ requeue = fscache_object_sleep_till_congested(&timeout);
} while (timeout > 0 && !requeue);
finish_wait(wq, &wait);
diff --git a/fs/fscache/internal.h b/fs/fscache/internal.h
index edd7434..6e0b5fb 100644
--- a/fs/fscache/internal.h
+++ b/fs/fscache/internal.h
@@ -82,6 +82,13 @@ extern unsigned fscache_defer_lookup;
extern unsigned fscache_defer_create;
extern unsigned fscache_debug;
extern struct kobject *fscache_root;
+extern struct workqueue_struct *fscache_object_wq;
+DECLARE_PER_CPU(wait_queue_head_t, fscache_object_cong_wait);
+
+static inline bool fscache_object_congested(void)
+{
+ return workqueue_congested(WORK_CPU_UNBOUND, fscache_object_wq);
+}
extern int fscache_wait_bit(void *);
extern int fscache_wait_bit_interruptible(void *);
diff --git a/fs/fscache/main.c b/fs/fscache/main.c
index add6bdb..bb8d4c3 100644
--- a/fs/fscache/main.c
+++ b/fs/fscache/main.c
@@ -15,6 +15,7 @@
#include <linux/sched.h>
#include <linux/completion.h>
#include <linux/slab.h>
+#include <linux/seq_file.h>
#include "internal.h"
MODULE_DESCRIPTION("FS Cache Manager");
@@ -40,22 +41,89 @@ MODULE_PARM_DESC(fscache_debug,
"FS-Cache debugging mask");
struct kobject *fscache_root;
+struct workqueue_struct *fscache_object_wq;
+
+DEFINE_PER_CPU(wait_queue_head_t, fscache_object_cong_wait);
+
+/* these values serve as lower bounds, will be adjusted in fscache_init() */
+static unsigned fscache_object_max_active = 4;
+
+#ifdef CONFIG_SYSCTL
+static struct ctl_table_header *fscache_sysctl_header;
+
+static int fscache_max_active_sysctl(struct ctl_table *table, int write,
+ void __user *buffer,
+ size_t *lenp, loff_t *ppos)
+{
+ struct workqueue_struct **wqp = table->extra1;
+ unsigned int *datap = table->data;
+ int ret;
+
+ ret = proc_dointvec(table, write, buffer, lenp, ppos);
+ if (ret == 0)
+ workqueue_set_max_active(*wqp, *datap);
+ return ret;
+}
+
+ctl_table fscache_sysctls[] = {
+ {
+ .procname = "object_max_active",
+ .data = &fscache_object_max_active,
+ .maxlen = sizeof(unsigned),
+ .mode = 0644,
+ .proc_handler = fscache_max_active_sysctl,
+ .extra1 = &fscache_object_wq,
+ },
+ {}
+};
+
+ctl_table fscache_sysctls_root[] = {
+ {
+ .procname = "fscache",
+ .mode = 0555,
+ .child = fscache_sysctls,
+ },
+ {}
+};
+#endif
/*
* initialise the fs caching module
*/
static int __init fscache_init(void)
{
+ unsigned int nr_cpus = num_possible_cpus();
+ unsigned int cpu;
int ret;
ret = slow_work_register_user(THIS_MODULE);
if (ret < 0)
goto error_slow_work;
+ fscache_object_max_active =
+ clamp_val(nr_cpus,
+ fscache_object_max_active, WQ_UNBOUND_MAX_ACTIVE);
+
+ ret = -ENOMEM;
+ fscache_object_wq = alloc_workqueue("fscache_object", WQ_UNBOUND,
+ fscache_object_max_active);
+ if (!fscache_object_wq)
+ goto error_object_wq;
+
+ for_each_possible_cpu(cpu)
+ init_waitqueue_head(&per_cpu(fscache_object_cong_wait, cpu));
+
ret = fscache_proc_init();
if (ret < 0)
goto error_proc;
+#ifdef CONFIG_SYSCTL
+ ret = -ENOMEM;
+ fscache_sysctl_header = register_sysctl_table(fscache_sysctls_root);
+ if (!fscache_sysctl_header)
+ goto error_sysctl;
+#endif
+
fscache_cookie_jar = kmem_cache_create("fscache_cookie_jar",
sizeof(struct fscache_cookie),
0,
@@ -78,8 +146,14 @@ static int __init fscache_init(void)
error_kobj:
kmem_cache_destroy(fscache_cookie_jar);
error_cookie_jar:
+#ifdef CONFIG_SYSCTL
+ unregister_sysctl_table(fscache_sysctl_header);
+error_sysctl:
+#endif
fscache_proc_cleanup();
error_proc:
+ destroy_workqueue(fscache_object_wq);
+error_object_wq:
slow_work_unregister_user(THIS_MODULE);
error_slow_work:
return ret;
@@ -96,7 +170,9 @@ static void __exit fscache_exit(void)
kobject_put(fscache_root);
kmem_cache_destroy(fscache_cookie_jar);
+ unregister_sysctl_table(fscache_sysctl_header);
fscache_proc_cleanup();
+ destroy_workqueue(fscache_object_wq);
slow_work_unregister_user(THIS_MODULE);
printk(KERN_NOTICE "FS-Cache: Unloaded\n");
}
diff --git a/fs/fscache/object-list.c b/fs/fscache/object-list.c
index 4a8eb31..ebe29c5 100644
--- a/fs/fscache/object-list.c
+++ b/fs/fscache/object-list.c
@@ -34,8 +34,8 @@ struct fscache_objlist_data {
#define FSCACHE_OBJLIST_CONFIG_NOREADS 0x00000200 /* show objects without active reads */
#define FSCACHE_OBJLIST_CONFIG_EVENTS 0x00000400 /* show objects with events */
#define FSCACHE_OBJLIST_CONFIG_NOEVENTS 0x00000800 /* show objects without no events */
-#define FSCACHE_OBJLIST_CONFIG_WORK 0x00001000 /* show objects with slow work */
-#define FSCACHE_OBJLIST_CONFIG_NOWORK 0x00002000 /* show objects without slow work */
+#define FSCACHE_OBJLIST_CONFIG_WORK 0x00001000 /* show objects with work */
+#define FSCACHE_OBJLIST_CONFIG_NOWORK 0x00002000 /* show objects without work */
u8 buf[512]; /* key and aux data buffer */
};
@@ -231,12 +231,11 @@ static int fscache_objlist_show(struct seq_file *m, void *v)
READS, NOREADS);
FILTER(obj->events & obj->event_mask,
EVENTS, NOEVENTS);
- FILTER(obj->work.flags & ~(1UL << SLOW_WORK_VERY_SLOW),
- WORK, NOWORK);
+ FILTER(work_busy(&obj->work), WORK, NOWORK);
}
seq_printf(m,
- "%8x %8x %s %5u %3u %3u %3u %2u %5u %2lx %2lx %1lx %1lx | ",
+ "%8x %8x %s %5u %3u %3u %3u %2u %5u %2lx %2lx %1lx %1x | ",
obj->debug_id,
obj->parent ? obj->parent->debug_id : -1,
fscache_object_states_short[obj->state],
@@ -249,7 +248,7 @@ static int fscache_objlist_show(struct seq_file *m, void *v)
obj->event_mask & FSCACHE_OBJECT_EVENTS_MASK,
obj->events,
obj->flags,
- obj->work.flags);
+ work_busy(&obj->work));
no_cookie = true;
keylen = auxlen = 0;
diff --git a/fs/fscache/object.c b/fs/fscache/object.c
index 0b589a9..b6b897c 100644
--- a/fs/fscache/object.c
+++ b/fs/fscache/object.c
@@ -14,7 +14,6 @@
#define FSCACHE_DEBUG_LEVEL COOKIE
#include <linux/module.h>
-#include <linux/seq_file.h>
#include "internal.h"
const char *fscache_object_states[FSCACHE_OBJECT__NSTATES] = {
@@ -50,12 +49,8 @@ const char fscache_object_states_short[FSCACHE_OBJECT__NSTATES][5] = {
[FSCACHE_OBJECT_DEAD] = "DEAD",
};
-static void fscache_object_slow_work_put_ref(struct slow_work *);
-static int fscache_object_slow_work_get_ref(struct slow_work *);
-static void fscache_object_slow_work_execute(struct slow_work *);
-#ifdef CONFIG_SLOW_WORK_DEBUG
-static void fscache_object_slow_work_desc(struct slow_work *, struct seq_file *);
-#endif
+static int fscache_get_object(struct fscache_object *);
+static void fscache_put_object(struct fscache_object *);
static void fscache_initialise_object(struct fscache_object *);
static void fscache_lookup_object(struct fscache_object *);
static void fscache_object_available(struct fscache_object *);
@@ -64,17 +59,6 @@ static void fscache_withdraw_object(struct fscache_object *);
static void fscache_enqueue_dependents(struct fscache_object *);
static void fscache_dequeue_object(struct fscache_object *);
-const struct slow_work_ops fscache_object_slow_work_ops = {
- .owner = THIS_MODULE,
- .get_ref = fscache_object_slow_work_get_ref,
- .put_ref = fscache_object_slow_work_put_ref,
- .execute = fscache_object_slow_work_execute,
-#ifdef CONFIG_SLOW_WORK_DEBUG
- .desc = fscache_object_slow_work_desc,
-#endif
-};
-EXPORT_SYMBOL(fscache_object_slow_work_ops);
-
/*
* we need to notify the parent when an op completes that we had outstanding
* upon it
@@ -345,7 +329,7 @@ unsupported_event:
/*
* execute an object
*/
-static void fscache_object_slow_work_execute(struct slow_work *work)
+void fscache_object_work_func(struct work_struct *work)
{
struct fscache_object *object =
container_of(work, struct fscache_object, work);
@@ -359,23 +343,9 @@ static void fscache_object_slow_work_execute(struct slow_work *work)
if (object->events & object->event_mask)
fscache_enqueue_object(object);
clear_bit(FSCACHE_OBJECT_EV_REQUEUE, &object->events);
+ fscache_put_object(object);
}
-
-/*
- * describe an object for slow-work debugging
- */
-#ifdef CONFIG_SLOW_WORK_DEBUG
-static void fscache_object_slow_work_desc(struct slow_work *work,
- struct seq_file *m)
-{
- struct fscache_object *object =
- container_of(work, struct fscache_object, work);
-
- seq_printf(m, "FSC: OBJ%x: %s",
- object->debug_id,
- fscache_object_states_short[object->state]);
-}
-#endif
+EXPORT_SYMBOL(fscache_object_work_func);
/*
* initialise an object
@@ -393,7 +363,6 @@ static void fscache_initialise_object(struct fscache_object *object)
_enter("");
ASSERT(object->cookie != NULL);
ASSERT(object->cookie->parent != NULL);
- ASSERT(list_empty(&object->work.link));
if (object->events & ((1 << FSCACHE_OBJECT_EV_ERROR) |
(1 << FSCACHE_OBJECT_EV_RELEASE) |
@@ -671,10 +640,8 @@ static void fscache_drop_object(struct fscache_object *object)
object->parent = NULL;
}
- /* this just shifts the object release to the slow work processor */
- fscache_stat(&fscache_n_cop_put_object);
- object->cache->ops->put_object(object);
- fscache_stat_d(&fscache_n_cop_put_object);
+ /* this just shifts the object release to the work processor */
+ fscache_put_object(object);
_leave("");
}
@@ -758,12 +725,10 @@ void fscache_withdrawing_object(struct fscache_cache *cache,
}
/*
- * allow the slow work item processor to get a ref on an object
+ * get a ref on an object
*/
-static int fscache_object_slow_work_get_ref(struct slow_work *work)
+static int fscache_get_object(struct fscache_object *object)
{
- struct fscache_object *object =
- container_of(work, struct fscache_object, work);
int ret;
fscache_stat(&fscache_n_cop_grab_object);
@@ -773,13 +738,10 @@ static int fscache_object_slow_work_get_ref(struct slow_work *work)
}
/*
- * allow the slow work item processor to discard a ref on a work item
+ * discard a ref on a work item
*/
-static void fscache_object_slow_work_put_ref(struct slow_work *work)
+static void fscache_put_object(struct fscache_object *object)
{
- struct fscache_object *object =
- container_of(work, struct fscache_object, work);
-
fscache_stat(&fscache_n_cop_put_object);
object->cache->ops->put_object(object);
fscache_stat_d(&fscache_n_cop_put_object);
@@ -792,8 +754,48 @@ void fscache_enqueue_object(struct fscache_object *object)
{
_enter("{OBJ%x}", object->debug_id);
- slow_work_enqueue(&object->work);
+ if (fscache_get_object(object) >= 0) {
+ wait_queue_head_t *cong_wq =
+ &get_cpu_var(fscache_object_cong_wait);
+
+ if (queue_work(fscache_object_wq, &object->work)) {
+ if (fscache_object_congested())
+ wake_up(cong_wq);
+ } else
+ fscache_put_object(object);
+
+ put_cpu_var(fscache_object_cong_wait);
+ }
+}
+
+/**
+ * fscache_object_sleep_till_congested - Sleep until object wq is congested
+ * @timoutp: Scheduler sleep timeout
+ *
+ * Allow an object handler to sleep until the object workqueue is congested.
+ *
+ * The caller must set up a wake up event before calling this and must have set
+ * the appropriate sleep mode (such as TASK_UNINTERRUPTIBLE) and tested its own
+ * condition before calling this function as no test is made here.
+ *
+ * %true is returned if the object wq is congested, %false otherwise.
+ */
+bool fscache_object_sleep_till_congested(signed long *timeoutp)
+{
+ wait_queue_head_t *cong_wq = &__get_cpu_var(fscache_object_cong_wait);
+ DEFINE_WAIT(wait);
+
+ if (fscache_object_congested())
+ return true;
+
+ add_wait_queue_exclusive(cong_wq, &wait);
+ if (!fscache_object_congested())
+ *timeoutp = schedule_timeout(*timeoutp);
+ finish_wait(cong_wq, &wait);
+
+ return fscache_object_congested();
}
+EXPORT_SYMBOL_GPL(fscache_object_sleep_till_congested);
/*
* enqueue the dependents of an object for metadata-type processing
@@ -819,9 +821,7 @@ static void fscache_enqueue_dependents(struct fscache_object *object)
/* sort onto appropriate lists */
fscache_enqueue_object(dep);
- fscache_stat(&fscache_n_cop_put_object);
- dep->cache->ops->put_object(dep);
- fscache_stat_d(&fscache_n_cop_put_object);
+ fscache_put_object(dep);
if (!list_empty(&object->dependents))
cond_resched_lock(&object->lock);
diff --git a/include/linux/fscache-cache.h b/include/linux/fscache-cache.h
index c57db27..27c8df5 100644
--- a/include/linux/fscache-cache.h
+++ b/include/linux/fscache-cache.h
@@ -21,6 +21,7 @@
#include <linux/fscache.h>
#include <linux/sched.h>
#include <linux/slow-work.h>
+#include <linux/workqueue.h>
#define NR_MAXCACHES BITS_PER_LONG
@@ -389,7 +390,7 @@ struct fscache_object {
struct fscache_cache *cache; /* cache that supplied this object */
struct fscache_cookie *cookie; /* netfs's file/index object */
struct fscache_object *parent; /* parent object */
- struct slow_work work; /* attention scheduling record */
+ struct work_struct work; /* attention scheduling record */
struct list_head dependents; /* FIFO of dependent objects */
struct list_head dep_link; /* link in parent's dependents list */
struct list_head pending_ops; /* unstarted operations on this object */
@@ -411,7 +412,7 @@ extern const char *fscache_object_states[];
(test_bit(FSCACHE_IOERROR, &(obj)->cache->flags) && \
(obj)->state >= FSCACHE_OBJECT_DYING)
-extern const struct slow_work_ops fscache_object_slow_work_ops;
+extern void fscache_object_work_func(struct work_struct *work);
/**
* fscache_object_init - Initialise a cache object description
@@ -433,7 +434,7 @@ void fscache_object_init(struct fscache_object *object,
spin_lock_init(&object->lock);
INIT_LIST_HEAD(&object->cache_link);
INIT_HLIST_NODE(&object->cookie_link);
- vslow_work_init(&object->work, &fscache_object_slow_work_ops);
+ INIT_WORK(&object->work, fscache_object_work_func);
INIT_LIST_HEAD(&object->dependents);
INIT_LIST_HEAD(&object->dep_link);
INIT_LIST_HEAD(&object->pending_ops);
@@ -534,6 +535,8 @@ extern void fscache_io_error(struct fscache_cache *cache);
extern void fscache_mark_pages_cached(struct fscache_retrieval *op,
struct pagevec *pagevec);
+extern bool fscache_object_sleep_till_congested(signed long *timeoutp);
+
extern enum fscache_checkaux fscache_check_aux(struct fscache_object *object,
const void *data,
uint16_t datalen);
--
1.6.4.2
Make fscache operation to use only workqueue instead of combination of
workqueue and slow-work. FSCACHE_OP_SLOW is dropped and
FSCACHE_OP_FAST is renamed to FSCACHE_OP_ASYNC and uses newly added
fscache_op_wq workqueue to execute op->processor().
fscache_operation_init_slow() is dropped and fscache_operation_init()
now takes @processor argument directly.
* Unbound workqueue is used.
* fscache_retrieval_work() is no longer necessary as OP_ASYNC now does
the equivalent thing.
* sysctl fscache.operation_max_active added to control concurrency.
The default value is nr_cpus clamped between 2 and
WQ_UNBOUND_MAX_ACTIVE.
* debugfs support is dropped for now. Tracing API based debug
facility is planned to be added.
Signed-off-by: Tejun Heo <[email protected]>
Cc: David Howells <[email protected]>
---
fs/cachefiles/rdwr.c | 4 +-
fs/fscache/internal.h | 1 +
fs/fscache/main.c | 23 ++++++++++++++
fs/fscache/operation.c | 67 +++++------------------------------------
fs/fscache/page.c | 36 +++++-----------------
include/linux/fscache-cache.h | 37 +++++++----------------
6 files changed, 53 insertions(+), 115 deletions(-)
diff --git a/fs/cachefiles/rdwr.c b/fs/cachefiles/rdwr.c
index 0f0d41f..0e3c092 100644
--- a/fs/cachefiles/rdwr.c
+++ b/fs/cachefiles/rdwr.c
@@ -422,7 +422,7 @@ int cachefiles_read_or_alloc_page(struct fscache_retrieval *op,
shift = PAGE_SHIFT - inode->i_sb->s_blocksize_bits;
op->op.flags &= FSCACHE_OP_KEEP_FLAGS;
- op->op.flags |= FSCACHE_OP_FAST;
+ op->op.flags |= FSCACHE_OP_ASYNC;
op->op.processor = cachefiles_read_copier;
pagevec_init(&pagevec, 0);
@@ -729,7 +729,7 @@ int cachefiles_read_or_alloc_pages(struct fscache_retrieval *op,
pagevec_init(&pagevec, 0);
op->op.flags &= FSCACHE_OP_KEEP_FLAGS;
- op->op.flags |= FSCACHE_OP_FAST;
+ op->op.flags |= FSCACHE_OP_ASYNC;
op->op.processor = cachefiles_read_copier;
INIT_LIST_HEAD(&backpages);
diff --git a/fs/fscache/internal.h b/fs/fscache/internal.h
index 6e0b5fb..6a02644 100644
--- a/fs/fscache/internal.h
+++ b/fs/fscache/internal.h
@@ -83,6 +83,7 @@ extern unsigned fscache_defer_create;
extern unsigned fscache_debug;
extern struct kobject *fscache_root;
extern struct workqueue_struct *fscache_object_wq;
+extern struct workqueue_struct *fscache_op_wq;
DECLARE_PER_CPU(wait_queue_head_t, fscache_object_cong_wait);
static inline bool fscache_object_congested(void)
diff --git a/fs/fscache/main.c b/fs/fscache/main.c
index bb8d4c3..44d13dd 100644
--- a/fs/fscache/main.c
+++ b/fs/fscache/main.c
@@ -42,11 +42,13 @@ MODULE_PARM_DESC(fscache_debug,
struct kobject *fscache_root;
struct workqueue_struct *fscache_object_wq;
+struct workqueue_struct *fscache_op_wq;
DEFINE_PER_CPU(wait_queue_head_t, fscache_object_cong_wait);
/* these values serve as lower bounds, will be adjusted in fscache_init() */
static unsigned fscache_object_max_active = 4;
+static unsigned fscache_op_max_active = 2;
#ifdef CONFIG_SYSCTL
static struct ctl_table_header *fscache_sysctl_header;
@@ -74,6 +76,14 @@ ctl_table fscache_sysctls[] = {
.proc_handler = fscache_max_active_sysctl,
.extra1 = &fscache_object_wq,
},
+ {
+ .procname = "operation_max_active",
+ .data = &fscache_op_max_active,
+ .maxlen = sizeof(unsigned),
+ .mode = 0644,
+ .proc_handler = fscache_max_active_sysctl,
+ .extra1 = &fscache_op_wq,
+ },
{}
};
@@ -110,6 +120,16 @@ static int __init fscache_init(void)
if (!fscache_object_wq)
goto error_object_wq;
+ fscache_op_max_active =
+ clamp_val(fscache_object_max_active / 2,
+ fscache_op_max_active, WQ_UNBOUND_MAX_ACTIVE);
+
+ ret = -ENOMEM;
+ fscache_op_wq = alloc_workqueue("fscache_operation", WQ_UNBOUND,
+ fscache_op_max_active);
+ if (!fscache_op_wq)
+ goto error_op_wq;
+
for_each_possible_cpu(cpu)
init_waitqueue_head(&per_cpu(fscache_object_cong_wait, cpu));
@@ -152,6 +172,8 @@ error_sysctl:
#endif
fscache_proc_cleanup();
error_proc:
+ destroy_workqueue(fscache_op_wq);
+error_op_wq:
destroy_workqueue(fscache_object_wq);
error_object_wq:
slow_work_unregister_user(THIS_MODULE);
@@ -172,6 +194,7 @@ static void __exit fscache_exit(void)
kmem_cache_destroy(fscache_cookie_jar);
unregister_sysctl_table(fscache_sysctl_header);
fscache_proc_cleanup();
+ destroy_workqueue(fscache_op_wq);
destroy_workqueue(fscache_object_wq);
slow_work_unregister_user(THIS_MODULE);
printk(KERN_NOTICE "FS-Cache: Unloaded\n");
diff --git a/fs/fscache/operation.c b/fs/fscache/operation.c
index f17ceca..b9f34ea 100644
--- a/fs/fscache/operation.c
+++ b/fs/fscache/operation.c
@@ -42,16 +42,12 @@ void fscache_enqueue_operation(struct fscache_operation *op)
fscache_stat(&fscache_n_op_enqueue);
switch (op->flags & FSCACHE_OP_TYPE) {
- case FSCACHE_OP_FAST:
- _debug("queue fast");
+ case FSCACHE_OP_ASYNC:
+ _debug("queue async");
atomic_inc(&op->usage);
- if (!schedule_work(&op->fast_work))
+ if (!queue_work(fscache_op_wq, &op->work))
fscache_put_operation(op);
break;
- case FSCACHE_OP_SLOW:
- _debug("queue slow");
- slow_work_enqueue(&op->slow_work);
- break;
case FSCACHE_OP_MYTHREAD:
_debug("queue for caller's attention");
break;
@@ -455,36 +451,13 @@ void fscache_operation_gc(struct work_struct *work)
}
/*
- * allow the slow work item processor to get a ref on an operation
- */
-static int fscache_op_get_ref(struct slow_work *work)
-{
- struct fscache_operation *op =
- container_of(work, struct fscache_operation, slow_work);
-
- atomic_inc(&op->usage);
- return 0;
-}
-
-/*
- * allow the slow work item processor to discard a ref on an operation
- */
-static void fscache_op_put_ref(struct slow_work *work)
-{
- struct fscache_operation *op =
- container_of(work, struct fscache_operation, slow_work);
-
- fscache_put_operation(op);
-}
-
-/*
- * execute an operation using the slow thread pool to provide processing context
- * - the caller holds a ref to this object, so we don't need to hold one
+ * execute an operation using fs_op_wq to provide processing context -
+ * the caller holds a ref to this object, so we don't need to hold one
*/
-static void fscache_op_execute(struct slow_work *work)
+void fscache_op_work_func(struct work_struct *work)
{
struct fscache_operation *op =
- container_of(work, struct fscache_operation, slow_work);
+ container_of(work, struct fscache_operation, work);
unsigned long start;
_enter("{OBJ%x OP%x,%d}",
@@ -494,31 +467,7 @@ static void fscache_op_execute(struct slow_work *work)
start = jiffies;
op->processor(op);
fscache_hist(fscache_ops_histogram, start);
+ fscache_put_operation(op);
_leave("");
}
-
-/*
- * describe an operation for slow-work debugging
- */
-#ifdef CONFIG_SLOW_WORK_DEBUG
-static void fscache_op_desc(struct slow_work *work, struct seq_file *m)
-{
- struct fscache_operation *op =
- container_of(work, struct fscache_operation, slow_work);
-
- seq_printf(m, "FSC: OBJ%x OP%x: %s/%s fl=%lx",
- op->object->debug_id, op->debug_id,
- op->name, op->state, op->flags);
-}
-#endif
-
-const struct slow_work_ops fscache_op_slow_work_ops = {
- .owner = THIS_MODULE,
- .get_ref = fscache_op_get_ref,
- .put_ref = fscache_op_put_ref,
- .execute = fscache_op_execute,
-#ifdef CONFIG_SLOW_WORK_DEBUG
- .desc = fscache_op_desc,
-#endif
-};
diff --git a/fs/fscache/page.c b/fs/fscache/page.c
index 723b889..41c441c 100644
--- a/fs/fscache/page.c
+++ b/fs/fscache/page.c
@@ -105,7 +105,7 @@ bool __fscache_maybe_release_page(struct fscache_cookie *cookie,
page_busy:
/* we might want to wait here, but that could deadlock the allocator as
- * the slow-work threads writing to the cache may all end up sleeping
+ * the work threads writing to the cache may all end up sleeping
* on memory allocation */
fscache_stat(&fscache_n_store_vmscan_busy);
return false;
@@ -188,9 +188,8 @@ int __fscache_attr_changed(struct fscache_cookie *cookie)
return -ENOMEM;
}
- fscache_operation_init(op, NULL);
- fscache_operation_init_slow(op, fscache_attr_changed_op);
- op->flags = FSCACHE_OP_SLOW | (1 << FSCACHE_OP_EXCLUSIVE);
+ fscache_operation_init(op, fscache_attr_changed_op, NULL);
+ op->flags = FSCACHE_OP_ASYNC | (1 << FSCACHE_OP_EXCLUSIVE);
fscache_set_op_name(op, "Attr");
spin_lock(&cookie->lock);
@@ -218,24 +217,6 @@ nobufs:
EXPORT_SYMBOL(__fscache_attr_changed);
/*
- * handle secondary execution given to a retrieval op on behalf of the
- * cache
- */
-static void fscache_retrieval_work(struct work_struct *work)
-{
- struct fscache_retrieval *op =
- container_of(work, struct fscache_retrieval, op.fast_work);
- unsigned long start;
-
- _enter("{OP%x}", op->op.debug_id);
-
- start = jiffies;
- op->op.processor(&op->op);
- fscache_hist(fscache_ops_histogram, start);
- fscache_put_operation(&op->op);
-}
-
-/*
* release a retrieval op reference
*/
static void fscache_release_retrieval_op(struct fscache_operation *_op)
@@ -269,13 +250,12 @@ static struct fscache_retrieval *fscache_alloc_retrieval(
return NULL;
}
- fscache_operation_init(&op->op, fscache_release_retrieval_op);
+ fscache_operation_init(&op->op, NULL, fscache_release_retrieval_op);
op->op.flags = FSCACHE_OP_MYTHREAD | (1 << FSCACHE_OP_WAITING);
op->mapping = mapping;
op->end_io_func = end_io_func;
op->context = context;
op->start_time = jiffies;
- INIT_WORK(&op->op.fast_work, fscache_retrieval_work);
INIT_LIST_HEAD(&op->to_do);
fscache_set_op_name(&op->op, "Retr");
return op;
@@ -795,9 +775,9 @@ int __fscache_write_page(struct fscache_cookie *cookie,
if (!op)
goto nomem;
- fscache_operation_init(&op->op, fscache_release_write_op);
- fscache_operation_init_slow(&op->op, fscache_write_op);
- op->op.flags = FSCACHE_OP_SLOW | (1 << FSCACHE_OP_WAITING);
+ fscache_operation_init(&op->op, fscache_write_op,
+ fscache_release_write_op);
+ op->op.flags = FSCACHE_OP_ASYNC | (1 << FSCACHE_OP_WAITING);
fscache_set_op_name(&op->op, "Write1");
ret = radix_tree_preload(gfp & ~__GFP_HIGHMEM);
@@ -852,7 +832,7 @@ int __fscache_write_page(struct fscache_cookie *cookie,
fscache_stat(&fscache_n_store_ops);
fscache_stat(&fscache_n_stores_ok);
- /* the slow work queue now carries its own ref on the object */
+ /* the work queue now carries its own ref on the object */
fscache_put_operation(&op->op);
_leave(" = 0");
return 0;
diff --git a/include/linux/fscache-cache.h b/include/linux/fscache-cache.h
index 27c8df5..17ed9c1 100644
--- a/include/linux/fscache-cache.h
+++ b/include/linux/fscache-cache.h
@@ -77,18 +77,14 @@ typedef void (*fscache_operation_release_t)(struct fscache_operation *op);
typedef void (*fscache_operation_processor_t)(struct fscache_operation *op);
struct fscache_operation {
- union {
- struct work_struct fast_work; /* record for fast ops */
- struct slow_work slow_work; /* record for (very) slow ops */
- };
+ struct work_struct work; /* record for async ops */
struct list_head pend_link; /* link in object->pending_ops */
struct fscache_object *object; /* object to be operated upon */
unsigned long flags;
#define FSCACHE_OP_TYPE 0x000f /* operation type */
-#define FSCACHE_OP_FAST 0x0001 /* - fast op, processor may not sleep for disk */
-#define FSCACHE_OP_SLOW 0x0002 /* - (very) slow op, processor may sleep for disk */
-#define FSCACHE_OP_MYTHREAD 0x0003 /* - processing is done be issuing thread, not pool */
+#define FSCACHE_OP_ASYNC 0x0001 /* - async op, processor may sleep for disk */
+#define FSCACHE_OP_MYTHREAD 0x0002 /* - processing is done be issuing thread, not pool */
#define FSCACHE_OP_WAITING 4 /* cleared when op is woken */
#define FSCACHE_OP_EXCLUSIVE 5 /* exclusive op, other ops must wait */
#define FSCACHE_OP_DEAD 6 /* op is now dead */
@@ -106,7 +102,8 @@ struct fscache_operation {
/* operation releaser */
fscache_operation_release_t release;
-#ifdef CONFIG_SLOW_WORK_DEBUG
+#ifdef CONFIG_WORKQUEUE_DEBUGFS
+ struct work_struct put_work; /* work to delay operation put */
const char *name; /* operation name */
const char *state; /* operation state */
#define fscache_set_op_name(OP, N) do { (OP)->name = (N); } while(0)
@@ -118,7 +115,7 @@ struct fscache_operation {
};
extern atomic_t fscache_op_debug_id;
-extern const struct slow_work_ops fscache_op_slow_work_ops;
+extern void fscache_op_work_func(struct work_struct *work);
extern void fscache_enqueue_operation(struct fscache_operation *);
extern void fscache_put_operation(struct fscache_operation *);
@@ -129,33 +126,21 @@ extern void fscache_put_operation(struct fscache_operation *);
* @release: The release function to assign
*
* Do basic initialisation of an operation. The caller must still set flags,
- * object, either fast_work or slow_work if necessary, and processor if needed.
+ * object and processor if needed.
*/
static inline void fscache_operation_init(struct fscache_operation *op,
- fscache_operation_release_t release)
+ fscache_operation_processor_t processor,
+ fscache_operation_release_t release)
{
+ INIT_WORK(&op->work, fscache_op_work_func);
atomic_set(&op->usage, 1);
op->debug_id = atomic_inc_return(&fscache_op_debug_id);
+ op->processor = processor;
op->release = release;
INIT_LIST_HEAD(&op->pend_link);
fscache_set_op_state(op, "Init");
}
-/**
- * fscache_operation_init_slow - Do additional initialisation of a slow op
- * @op: The operation to initialise
- * @processor: The processor function to assign
- *
- * Do additional initialisation of an operation as required for slow work.
- */
-static inline
-void fscache_operation_init_slow(struct fscache_operation *op,
- fscache_operation_processor_t processor)
-{
- op->processor = processor;
- slow_work_init(&op->slow_work, &fscache_op_slow_work_ops);
-}
-
/*
* data read operation
*/
--
1.6.4.2
Workqueue can now handle high concurrency. Convert gfs to use
workqueue instead of slow-work.
* Steven pointed out that recovery path might be run from allocation
path and thus requires forward progress guarantee without memory
allocation. Create and use gfs_recovery_wq with rescuer. Please
note that forward progress wasn't guaranteed with slow-work.
* Updated to use non-reentrant workqueue.
Signed-off-by: Tejun Heo <[email protected]>
Cc: Steven Whitehouse <[email protected]>
---
fs/gfs2/Kconfig | 1 -
fs/gfs2/incore.h | 3 +-
fs/gfs2/main.c | 14 +++++++-----
fs/gfs2/ops_fstype.c | 8 +++---
fs/gfs2/recovery.c | 54 +++++++++++++++++++------------------------------
fs/gfs2/recovery.h | 6 +++-
fs/gfs2/sys.c | 3 +-
7 files changed, 40 insertions(+), 49 deletions(-)
diff --git a/fs/gfs2/Kconfig b/fs/gfs2/Kconfig
index a47b431..cc96655 100644
--- a/fs/gfs2/Kconfig
+++ b/fs/gfs2/Kconfig
@@ -7,7 +7,6 @@ config GFS2_FS
select IP_SCTP if DLM_SCTP
select FS_POSIX_ACL
select CRC32
- select SLOW_WORK
select QUOTACTL
help
A cluster filesystem.
diff --git a/fs/gfs2/incore.h b/fs/gfs2/incore.h
index b5d7363..dd8f2e6 100644
--- a/fs/gfs2/incore.h
+++ b/fs/gfs2/incore.h
@@ -12,7 +12,6 @@
#include <linux/fs.h>
#include <linux/workqueue.h>
-#include <linux/slow-work.h>
#include <linux/dlm.h>
#include <linux/buffer_head.h>
@@ -383,7 +382,7 @@ struct gfs2_journal_extent {
struct gfs2_jdesc {
struct list_head jd_list;
struct list_head extent_list;
- struct slow_work jd_work;
+ struct work_struct jd_work;
struct inode *jd_inode;
unsigned long jd_flags;
#define JDF_RECOVERY 1
diff --git a/fs/gfs2/main.c b/fs/gfs2/main.c
index fb2a5f9..b1e9630 100644
--- a/fs/gfs2/main.c
+++ b/fs/gfs2/main.c
@@ -15,7 +15,6 @@
#include <linux/init.h>
#include <linux/gfs2_ondisk.h>
#include <asm/atomic.h>
-#include <linux/slow-work.h>
#include "gfs2.h"
#include "incore.h"
@@ -24,6 +23,7 @@
#include "util.h"
#include "glock.h"
#include "quota.h"
+#include "recovery.h"
static struct shrinker qd_shrinker = {
.shrink = gfs2_shrink_qd_memory,
@@ -138,9 +138,11 @@ static int __init init_gfs2_fs(void)
if (error)
goto fail_unregister;
- error = slow_work_register_user(THIS_MODULE);
- if (error)
- goto fail_slow;
+ error = -ENOMEM;
+ gfs_recovery_wq = alloc_workqueue("gfs_recovery",
+ WQ_NON_REENTRANT | WQ_RESCUER, 0);
+ if (!gfs_recovery_wq)
+ goto fail_wq;
gfs2_register_debugfs();
@@ -148,7 +150,7 @@ static int __init init_gfs2_fs(void)
return 0;
-fail_slow:
+fail_wq:
unregister_filesystem(&gfs2meta_fs_type);
fail_unregister:
unregister_filesystem(&gfs2_fs_type);
@@ -190,7 +192,7 @@ static void __exit exit_gfs2_fs(void)
gfs2_unregister_debugfs();
unregister_filesystem(&gfs2_fs_type);
unregister_filesystem(&gfs2meta_fs_type);
- slow_work_unregister_user(THIS_MODULE);
+ destroy_workqueue(gfs_recovery_wq);
kmem_cache_destroy(gfs2_quotad_cachep);
kmem_cache_destroy(gfs2_rgrpd_cachep);
diff --git a/fs/gfs2/ops_fstype.c b/fs/gfs2/ops_fstype.c
index 3593b3a..9a08e1b 100644
--- a/fs/gfs2/ops_fstype.c
+++ b/fs/gfs2/ops_fstype.c
@@ -17,7 +17,6 @@
#include <linux/namei.h>
#include <linux/mount.h>
#include <linux/gfs2_ondisk.h>
-#include <linux/slow-work.h>
#include <linux/quotaops.h>
#include "gfs2.h"
@@ -673,7 +672,7 @@ static int gfs2_jindex_hold(struct gfs2_sbd *sdp, struct gfs2_holder *ji_gh)
break;
INIT_LIST_HEAD(&jd->extent_list);
- slow_work_init(&jd->jd_work, &gfs2_recover_ops);
+ INIT_WORK(&jd->jd_work, gfs2_recover_func);
jd->jd_inode = gfs2_lookupi(sdp->sd_jindex, &name, 1);
if (!jd->jd_inode || IS_ERR(jd->jd_inode)) {
if (!jd->jd_inode)
@@ -782,7 +781,8 @@ static int init_journal(struct gfs2_sbd *sdp, int undo)
if (sdp->sd_lockstruct.ls_first) {
unsigned int x;
for (x = 0; x < sdp->sd_journals; x++) {
- error = gfs2_recover_journal(gfs2_jdesc_find(sdp, x));
+ error = gfs2_recover_journal(gfs2_jdesc_find(sdp, x),
+ true);
if (error) {
fs_err(sdp, "error recovering journal %u: %d\n",
x, error);
@@ -792,7 +792,7 @@ static int init_journal(struct gfs2_sbd *sdp, int undo)
gfs2_others_may_mount(sdp);
} else if (!sdp->sd_args.ar_spectator) {
- error = gfs2_recover_journal(sdp->sd_jdesc);
+ error = gfs2_recover_journal(sdp->sd_jdesc, true);
if (error) {
fs_err(sdp, "error recovering my journal: %d\n", error);
goto fail_jinode_gh;
diff --git a/fs/gfs2/recovery.c b/fs/gfs2/recovery.c
index 4b9bece..f7f89a9 100644
--- a/fs/gfs2/recovery.c
+++ b/fs/gfs2/recovery.c
@@ -14,7 +14,6 @@
#include <linux/buffer_head.h>
#include <linux/gfs2_ondisk.h>
#include <linux/crc32.h>
-#include <linux/slow-work.h>
#include "gfs2.h"
#include "incore.h"
@@ -28,6 +27,8 @@
#include "util.h"
#include "dir.h"
+struct workqueue_struct *gfs_recovery_wq;
+
int gfs2_replay_read_block(struct gfs2_jdesc *jd, unsigned int blk,
struct buffer_head **bh)
{
@@ -443,23 +444,7 @@ static void gfs2_recovery_done(struct gfs2_sbd *sdp, unsigned int jid,
kobject_uevent_env(&sdp->sd_kobj, KOBJ_CHANGE, envp);
}
-static int gfs2_recover_get_ref(struct slow_work *work)
-{
- struct gfs2_jdesc *jd = container_of(work, struct gfs2_jdesc, jd_work);
- if (test_and_set_bit(JDF_RECOVERY, &jd->jd_flags))
- return -EBUSY;
- return 0;
-}
-
-static void gfs2_recover_put_ref(struct slow_work *work)
-{
- struct gfs2_jdesc *jd = container_of(work, struct gfs2_jdesc, jd_work);
- clear_bit(JDF_RECOVERY, &jd->jd_flags);
- smp_mb__after_clear_bit();
- wake_up_bit(&jd->jd_flags, JDF_RECOVERY);
-}
-
-static void gfs2_recover_work(struct slow_work *work)
+void gfs2_recover_func(struct work_struct *work)
{
struct gfs2_jdesc *jd = container_of(work, struct gfs2_jdesc, jd_work);
struct gfs2_inode *ip = GFS2_I(jd->jd_inode);
@@ -578,7 +563,7 @@ static void gfs2_recover_work(struct slow_work *work)
gfs2_glock_dq_uninit(&j_gh);
fs_info(sdp, "jid=%u: Done\n", jd->jd_jid);
- return;
+ goto done;
fail_gunlock_tr:
gfs2_glock_dq_uninit(&t_gh);
@@ -590,32 +575,35 @@ fail_gunlock_j:
}
fs_info(sdp, "jid=%u: %s\n", jd->jd_jid, (error) ? "Failed" : "Done");
-
fail:
gfs2_recovery_done(sdp, jd->jd_jid, LM_RD_GAVEUP);
+done:
+ clear_bit(JDF_RECOVERY, &jd->jd_flags);
+ smp_mb__after_clear_bit();
+ wake_up_bit(&jd->jd_flags, JDF_RECOVERY);
}
-struct slow_work_ops gfs2_recover_ops = {
- .owner = THIS_MODULE,
- .get_ref = gfs2_recover_get_ref,
- .put_ref = gfs2_recover_put_ref,
- .execute = gfs2_recover_work,
-};
-
-
static int gfs2_recovery_wait(void *word)
{
schedule();
return 0;
}
-int gfs2_recover_journal(struct gfs2_jdesc *jd)
+int gfs2_recover_journal(struct gfs2_jdesc *jd, bool wait)
{
int rv;
- rv = slow_work_enqueue(&jd->jd_work);
- if (rv)
- return rv;
- wait_on_bit(&jd->jd_flags, JDF_RECOVERY, gfs2_recovery_wait, TASK_UNINTERRUPTIBLE);
+
+ if (test_and_set_bit(JDF_RECOVERY, &jd->jd_flags))
+ return -EBUSY;
+
+ /* we have JDF_RECOVERY, queue should always succeed */
+ rv = queue_work(gfs_recovery_wq, &jd->jd_work);
+ BUG_ON(!rv);
+
+ if (wait)
+ wait_on_bit(&jd->jd_flags, JDF_RECOVERY, gfs2_recovery_wait,
+ TASK_UNINTERRUPTIBLE);
+
return 0;
}
diff --git a/fs/gfs2/recovery.h b/fs/gfs2/recovery.h
index 1616ac2..2226136 100644
--- a/fs/gfs2/recovery.h
+++ b/fs/gfs2/recovery.h
@@ -12,6 +12,8 @@
#include "incore.h"
+extern struct workqueue_struct *gfs_recovery_wq;
+
static inline void gfs2_replay_incr_blk(struct gfs2_sbd *sdp, unsigned int *blk)
{
if (++*blk == sdp->sd_jdesc->jd_blocks)
@@ -27,8 +29,8 @@ extern void gfs2_revoke_clean(struct gfs2_sbd *sdp);
extern int gfs2_find_jhead(struct gfs2_jdesc *jd,
struct gfs2_log_header_host *head);
-extern int gfs2_recover_journal(struct gfs2_jdesc *gfs2_jd);
-extern struct slow_work_ops gfs2_recover_ops;
+extern int gfs2_recover_journal(struct gfs2_jdesc *gfs2_jd, bool wait);
+extern void gfs2_recover_func(struct work_struct *work);
#endif /* __RECOVERY_DOT_H__ */
diff --git a/fs/gfs2/sys.c b/fs/gfs2/sys.c
index 37f5393..6b60316 100644
--- a/fs/gfs2/sys.c
+++ b/fs/gfs2/sys.c
@@ -25,6 +25,7 @@
#include "quota.h"
#include "util.h"
#include "glops.h"
+#include "recovery.h"
struct gfs2_attr {
struct attribute attr;
@@ -352,7 +353,7 @@ static ssize_t recover_store(struct gfs2_sbd *sdp, const char *buf, size_t len)
list_for_each_entry(jd, &sdp->sd_jindex_list, jd_list) {
if (jd->jd_jid != jid)
continue;
- rv = slow_work_enqueue(&jd->jd_work);
+ rv = gfs2_recover_journal(jd, false);
break;
}
out:
--
1.6.4.2
fscache no longer uses slow-work. Drop references to it.
Signed-off-by: Tejun Heo <[email protected]>
Cc: David Howells <[email protected]>
---
fs/fscache/Kconfig | 1 -
fs/fscache/main.c | 7 -------
include/linux/fscache-cache.h | 1 -
3 files changed, 0 insertions(+), 9 deletions(-)
diff --git a/fs/fscache/Kconfig b/fs/fscache/Kconfig
index cc94bb9..3f6dfa9 100644
--- a/fs/fscache/Kconfig
+++ b/fs/fscache/Kconfig
@@ -1,7 +1,6 @@
config FSCACHE
tristate "General filesystem local caching manager"
- select SLOW_WORK
help
This option enables a generic filesystem caching manager that can be
used by various network and other filesystems to cache data locally.
diff --git a/fs/fscache/main.c b/fs/fscache/main.c
index 44d13dd..500936d 100644
--- a/fs/fscache/main.c
+++ b/fs/fscache/main.c
@@ -106,10 +106,6 @@ static int __init fscache_init(void)
unsigned int cpu;
int ret;
- ret = slow_work_register_user(THIS_MODULE);
- if (ret < 0)
- goto error_slow_work;
-
fscache_object_max_active =
clamp_val(nr_cpus,
fscache_object_max_active, WQ_UNBOUND_MAX_ACTIVE);
@@ -176,8 +172,6 @@ error_proc:
error_op_wq:
destroy_workqueue(fscache_object_wq);
error_object_wq:
- slow_work_unregister_user(THIS_MODULE);
-error_slow_work:
return ret;
}
@@ -196,7 +190,6 @@ static void __exit fscache_exit(void)
fscache_proc_cleanup();
destroy_workqueue(fscache_op_wq);
destroy_workqueue(fscache_object_wq);
- slow_work_unregister_user(THIS_MODULE);
printk(KERN_NOTICE "FS-Cache: Unloaded\n");
}
diff --git a/include/linux/fscache-cache.h b/include/linux/fscache-cache.h
index 17ed9c1..b8581c0 100644
--- a/include/linux/fscache-cache.h
+++ b/include/linux/fscache-cache.h
@@ -20,7 +20,6 @@
#include <linux/fscache.h>
#include <linux/sched.h>
-#include <linux/slow-work.h>
#include <linux/workqueue.h>
#define NR_MAXCACHES BITS_PER_LONG
--
1.6.4.2
On Tue, Jul 20, 2010 at 3:34 PM, Tejun Heo <[email protected]> wrote:
> Workqueue can now handle high concurrency. ?Use system_nrt_wq
> instead of slow-work.
>
> * Updated is_valid_oplock_break() to not call cifs_oplock_break_put()
> ?as advised by Steve French. ?It might cause deadlock. ?Instead,
> ?reference is increased after queueing succeeded and
> ?cifs_oplock_break() briefly grabs GlobalSMBSeslock before putting
> ?the cfile to make sure it doesn't put before the matching get is
> ?finished.
>
> * Anton Blanchard reported that cifs conversion was using now gone
> ?system_single_wq. ?Use system_nrt_wq which provides non-reentrance
> ?guarantee which is enough and much better.
>
> Signed-off-by: Tejun Heo <[email protected]>
> Cc: Steve French <[email protected]>
> Cc: Anton Blanchard <[email protected]>
> ---
> ?fs/cifs/Kconfig ? ?| ? ?1 -
> ?fs/cifs/cifsfs.c ? | ? ?5 -----
> ?fs/cifs/cifsglob.h | ? ?8 +++++---
> ?fs/cifs/dir.c ? ? ?| ? ?2 +-
> ?fs/cifs/file.c ? ? | ? 30 +++++++++++++-----------------
> ?fs/cifs/misc.c ? ? | ? 20 ++++++++++++--------
> ?6 files changed, 31 insertions(+), 35 deletions(-)
Acked-by: Steve French <[email protected]>
--
Thanks,
Steve
On 07/20/2010 10:35 PM, Tejun Heo wrote:
> Workqueue can now handle high concurrency. Convert drm_crtc_helper to
> use system_nrt_wq instead of slow-work. The conversion is mostly
> straight forward. One difference is that drm_helper_hpd_irq_event()
> no longer blocks and can be called from any context.
>
> Signed-off-by: Tejun Heo <[email protected]>
> Cc: David Airlie <[email protected]>
David, can you please review the change?
Thanks.
--
tejun
On 07/20/2010 10:35 PM, Tejun Heo wrote:
> Workqueue can now handle high concurrency. Convert gfs to use
> workqueue instead of slow-work.
>
> * Steven pointed out that recovery path might be run from allocation
> path and thus requires forward progress guarantee without memory
> allocation. Create and use gfs_recovery_wq with rescuer. Please
> note that forward progress wasn't guaranteed with slow-work.
>
> * Updated to use non-reentrant workqueue.
>
> Signed-off-by: Tejun Heo <[email protected]>
> Cc: Steven Whitehouse <[email protected]>
Steven, can you please review the change?
Thanks.
--
tejun
On 07/20/2010 10:34 PM, Tejun Heo wrote:
> 0001-fscache-convert-object-to-use-workqueue-instead-of-s.patch
> 0002-fscache-convert-operation-to-use-workqueue-instead-o.patch
> 0003-fscache-drop-references-to-slow-work.patch
> 0004-cifs-use-workqueue-instead-of-slow-work.patch
0001-0004 now pushed out to wq#for-next w/ acks added.
Thank you.
--
tejun
On Thu, 2010-07-22 at 22:45 +0200, Tejun Heo wrote:
> On 07/20/2010 10:35 PM, Tejun Heo wrote:
> > Workqueue can now handle high concurrency. Convert drm_crtc_helper to
> > use system_nrt_wq instead of slow-work. The conversion is mostly
> > straight forward. One difference is that drm_helper_hpd_irq_event()
> > no longer blocks and can be called from any context.
> >
> > Signed-off-by: Tejun Heo <[email protected]>
> > Cc: David Airlie <[email protected]>
>
> David, can you please review the change?
Yes seems fine to me, in as much as I understand the difference between
slow-work and workqueue, but if you say one replaces the other correctly
now I'm happy for you to carry this patch.
Acked-by: Dave Airlie <[email protected]>
Dave.
>
> Thanks.
>
On 07/22/2010 11:11 PM, Dave Airlie wrote:
> On Thu, 2010-07-22 at 22:45 +0200, Tejun Heo wrote:
>> On 07/20/2010 10:35 PM, Tejun Heo wrote:
>>> Workqueue can now handle high concurrency. Convert drm_crtc_helper to
>>> use system_nrt_wq instead of slow-work. The conversion is mostly
>>> straight forward. One difference is that drm_helper_hpd_irq_event()
>>> no longer blocks and can be called from any context.
>>>
>>> Signed-off-by: Tejun Heo <[email protected]>
>>> Cc: David Airlie <[email protected]>
>>
>> David, can you please review the change?
>
> Yes seems fine to me, in as much as I understand the difference between
> slow-work and workqueue, but if you say one replaces the other correctly
> now I'm happy for you to carry this patch.
>
> Acked-by: Dave Airlie <[email protected]>
Thank you. Will push out to #for-next.
--
tejun
Hi,
On Tue, 2010-07-20 at 22:35 +0200, Tejun Heo wrote:
> Workqueue can now handle high concurrency. Convert gfs to use
> workqueue instead of slow-work.
>
> * Steven pointed out that recovery path might be run from allocation
> path and thus requires forward progress guarantee without memory
> allocation. Create and use gfs_recovery_wq with rescuer. Please
> note that forward progress wasn't guaranteed with slow-work.
>
> * Updated to use non-reentrant workqueue.
>
> Signed-off-by: Tejun Heo <[email protected]>
> Cc: Steven Whitehouse <[email protected]>
Acked-by: Steven Whitehouse <[email protected]>
I'm assuming that you'll push this along with the workqueue changes?
Probably easier than pushing it through my tree,
Steve.
On 07/23/2010 12:20 PM, Steven Whitehouse wrote:
> Hi,
>
> On Tue, 2010-07-20 at 22:35 +0200, Tejun Heo wrote:
>> Workqueue can now handle high concurrency. Convert gfs to use
>> workqueue instead of slow-work.
>>
>> * Steven pointed out that recovery path might be run from allocation
>> path and thus requires forward progress guarantee without memory
>> allocation. Create and use gfs_recovery_wq with rescuer. Please
>> note that forward progress wasn't guaranteed with slow-work.
>>
>> * Updated to use non-reentrant workqueue.
>>
>> Signed-off-by: Tejun Heo <[email protected]>
>> Cc: Steven Whitehouse <[email protected]>
> Acked-by: Steven Whitehouse <[email protected]>
>
> I'm assuming that you'll push this along with the workqueue changes?
> Probably easier than pushing it through my tree,
Yeah, I'll route this through the wq tree with all other workqueue
related changes.
Thank you.
--
tejun
On 07/22/2010 11:00 PM, Tejun Heo wrote:
> On 07/20/2010 10:34 PM, Tejun Heo wrote:
>> 0001-fscache-convert-object-to-use-workqueue-instead-of-s.patch
>> 0002-fscache-convert-operation-to-use-workqueue-instead-o.patch
>> 0003-fscache-drop-references-to-slow-work.patch
>> 0004-cifs-use-workqueue-instead-of-slow-work.patch
>
> 0001-0004 now pushed out to wq#for-next w/ acks added.
The rest of the series pushed out to wq#for-next.
Thanks.
--
tejun