XRootD
Loading...
Searching...
No Matches
XrdFfsWcache.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* XrdFfsWcache.cc simple write cache that captures consecutive small writes */
3/* */
4/* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
5/* All Rights Reserved */
6/* Author: Wei Yang (SLAC National Accelerator Laboratory, 2009) */
7/* Contract DE-AC02-76-SFO0515 with the Department of Energy */
8/* */
9/* This file is part of the XRootD software suite. */
10/* */
11/* XRootD is free software: you can redistribute it and/or modify it under */
12/* the terms of the GNU Lesser General Public License as published by the */
13/* Free Software Foundation, either version 3 of the License, or (at your */
14/* option) any later version. */
15/* */
16/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19/* License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24/* */
25/* The copyright holder's institutional names and contributor's names may not */
26/* be used to endorse or promote products derived from this software without */
27/* specific prior written permission of the institution or contributor. */
28/******************************************************************************/
29
30/*
31 When direct_io is not used, the kernel breaks large writes into 4 KByte
32 writes. This significantly reduces write performance. This
33 simple cache mechanism improves the performance of small writes.
34
35 Note that fuse 2.8.0 pre2 or above and kernel 2.6.27 or above provide
36 a big_writes option to allow > 4KByte writing. It will make this
37 simple write caching obsolete.
38*/
39
40#if defined(__linux__)
41/* For pread()/pwrite() */
42#ifndef _XOPEN_SOURCE
43#define _XOPEN_SOURCE 500
44#endif
45#endif
46
47#include <cstring>
48#include <cstdlib>
49#include <sys/types.h>
50#include <sys/resource.h>
51#include <unistd.h>
52#include <cerrno>
53
54#include <pthread.h>
55
57#ifndef NOXRD
58 #include "XrdFfs/XrdFfsPosix.hh"
59#endif
60
61#ifndef O_DIRECT
62#define O_DIRECT 0
63#endif
64
65#ifdef __cplusplus
66 extern "C" {
67#endif
68
/* Size in bytes (128 KiB) of the buffer allocated for a file's write cache. */
ssize_t XrdFfsWcacheBufsize = 128 * 1024;
71
/*
 * Cache state kept for one file descriptor slot.
 */
struct XrdFfsWcacheFilebuf {
    off_t offset;           /* file offset of the first byte held in buf */
    size_t len;             /* number of valid bytes currently held in buf */
    char *buf;              /* heap buffer; NULL while the slot has no cache */
    size_t bufsize;         /* block size used when the slot serves as a read cache */
    pthread_mutex_t *mlock; /* serialises cached reads/writes on this slot; NULL while unused */
};
79
81
82/* #include "xrdposix.h" */
83
85void XrdFfsWcache_init(int basefd, int maxfd)
86{
87 int fd;
88/* We are now using virtual file descriptors (from Xrootd Posix interface) in XrdFfsXrootdfs.cc so we need to set
89 * base (lowest) file descriptor, and max number of file descriptors..
90 *
91 struct rlimit rlp;
92
93 getrlimit(RLIMIT_NOFILE, &rlp);
94 XrdFfsWcacheNFILES = rlp.rlim_cur;
95 XrdFfsWcacheNFILES = (XrdFfsWcacheNFILES == (int)RLIM_INFINITY? 4096 : XrdFfsWcacheNFILES);
96 */
97
98 XrdFfsPosix_baseFD = basefd;
99 XrdFfsWcacheNFILES = maxfd;
100
101/* printf("%d %d\n", XrdFfsWcacheNFILES, sizeof(struct XrdFfsWcacheFilebuf)); */
103 for (fd = 0; fd < XrdFfsWcacheNFILES; fd++)
104 {
106 XrdFfsWcacheFbufs[fd].len = 0;
107 XrdFfsWcacheFbufs[fd].buf = NULL;
108 XrdFfsWcacheFbufs[fd].mlock = NULL;
109 }
110 if (!getenv("XRDCL_EC"))
111 {
112 XrdFfsRcacheBufsize = 1024 * 128;
113 }
114 else
115 {
116 char *savptr;
117 int nbdat = atoi(strtok_r(getenv("XRDCL_EC"), ",", &savptr));
118 strtok_r(NULL, ",", &savptr);
119 int chsz = atoi(strtok_r(NULL, ",", &savptr));
120 XrdFfsRcacheBufsize = nbdat * chsz;
121 }
122 if (getenv("XROOTDFS_WCACHESZ"))
123 XrdFfsRcacheBufsize = atoi(getenv("XROOTDFS_WCACHESZ"));
124}
125
126int XrdFfsWcache_create(int fd, int flags)
127/* Create a write cache buffer for a given file descriptor
128 *
129 * fd: file descriptor
130 *
131 * returns: 1 - ok
132 * 0 - error, error code in errno
133 */
134{
136 fd -= XrdFfsPosix_baseFD;
137
139 XrdFfsWcacheFbufs[fd].len = 0;
140 if ( ((flags & O_ACCMODE) == O_RDONLY) &&
141 (flags & O_DIRECT) ) // Limit the usage scenario of the read cache
142 {
143 XrdFfsWcacheFbufs[fd].buf = (char*)malloc(XrdFfsRcacheBufsize);
145 }
146 else
147 {
148 XrdFfsWcacheFbufs[fd].buf = (char*)malloc(XrdFfsWcacheBufsize);
150 }
151 if (XrdFfsWcacheFbufs[fd].buf == NULL)
152 {
153 errno = ENOMEM;
154 return 0;
155 }
156 XrdFfsWcacheFbufs[fd].mlock = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
157 if (XrdFfsWcacheFbufs[fd].mlock == NULL)
158 {
159 errno = ENOMEM;
160 return 0;
161 }
162 errno = pthread_mutex_init(XrdFfsWcacheFbufs[fd].mlock, NULL);
163 if (errno)
164 return 0;
165 return 1;
166}
167
169{
170/* XrdFfsWcache_flush(fd); */
171 fd -= XrdFfsPosix_baseFD;
172
174 XrdFfsWcacheFbufs[fd].len = 0;
175 if (XrdFfsWcacheFbufs[fd].buf != NULL)
176 free(XrdFfsWcacheFbufs[fd].buf);
177 XrdFfsWcacheFbufs[fd].buf = NULL;
178 if (XrdFfsWcacheFbufs[fd].mlock != NULL)
179 {
180 pthread_mutex_destroy(XrdFfsWcacheFbufs[fd].mlock);
181 free(XrdFfsWcacheFbufs[fd].mlock);
182 }
183 XrdFfsWcacheFbufs[fd].mlock = NULL;
184}
185
186ssize_t XrdFfsWcache_flush(int fd)
187{
188 ssize_t rc;
189 fd -= XrdFfsPosix_baseFD;
190
191 if (XrdFfsWcacheFbufs[fd].len == 0 || XrdFfsWcacheFbufs[fd].buf == NULL )
192 return 0;
193
196 if (rc > 0)
197 {
199 XrdFfsWcacheFbufs[fd].len = 0;
200 }
201 return rc;
202}
203
204/*
205struct fd_n_offset {
206 int fd;
207 off_t offset;
208 fd_n_offset(int myfd, off_t myoffset) : fd(myfd), offset(myoffset) {}
209};
210
211void *XrdFfsWcache_updateReadCache(void *x)
212{
213 struct fd_n_offset *a = (struct fd_n_offset*) x;
214 size_t bufsize = XrdFfsWcacheFbufs[a->fd].bufsize;
215
216 pthread_mutex_lock(XrdFfsWcacheFbufs[a->fd].mlock);
217 XrdFfsWcacheFbufs[a->fd].offset = (a->offset / bufsize) * bufsize;
218 XrdFfsWcacheFbufs[a->fd].len = XrdFfsPosix_pread(a->fd + XrdFfsPosix_baseFD,
219 XrdFfsWcacheFbufs[a->fd].buf,
220 bufsize,
221 XrdFfsWcacheFbufs[a->fd].offset);
222 pthread_mutex_unlock(XrdFfsWcacheFbufs[a->fd].mlock);
223 return NULL;
224}
225*/
226
227// this is a read cache
228ssize_t XrdFfsWcache_pread(int fd, char *buf, size_t len, off_t offset)
229{
230 ssize_t rc;
231 fd -= XrdFfsPosix_baseFD;
232 if (fd < 0)
233 {
234 errno = EBADF;
235 return -1;
236 }
237
238 char *bufptr;
239 size_t bufsize = XrdFfsWcacheFbufs[fd].bufsize;
240
241 pthread_mutex_lock(XrdFfsWcacheFbufs[fd].mlock);
242
243 // identity which block to cache
244 if (XrdFfsWcacheFbufs[fd].len == 0 ||
246 {
250 bufsize,
252 } // when XrdFfsWcacheFbufs[fd].len < bufsize, the block is partially cached.
253
254
255 // fetch data from the cache, up to the block's upper boundary.
256 if (XrdFfsWcacheFbufs[fd].offset <= offset &&
258 { // read from cache,
259//----------------------------------------------------------
260// FUSE doesn't like this block of the code, unless direct_io is enabled, or
261// O_DIRECT flags is used. Otherwise, FUSES will stop reading prematurely
262// when two processes read the same file at the same time.
266 memcpy(buf, bufptr, rc);
267//----------------------------------------------------------
268 }
269 else
270 { // offset fall into the uncached part of the partically cached block
272 }
273 pthread_mutex_unlock(XrdFfsWcacheFbufs[fd].mlock);
274/*
275 // prefetch the next block
276 if ( (offset + rc) ==
277 (XrdFfsWcacheFbufs[fd].offset + bufsize) )
278 {
279 pthread_t thread;
280 pthread_attr_t attr;
281 //size_t stacksize = 4*1024*1024;
282
283 pthread_attr_init(&attr);
284 //pthread_attr_setstacksize(&attr, stacksize);
285 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
286
287 struct fd_n_offset nextblock(fd, (offset + bufsize));
288 if (! pthread_create(&thread, &attr, XrdFfsWcache_updateReadCache, &nextblock))
289 pthread_detach(thread);
290 pthread_attr_destroy(&attr);
291 }
292*/
293 return rc;
294}
295
296ssize_t XrdFfsWcache_pwrite(int fd, char *buf, size_t len, off_t offset)
297{
298 ssize_t rc;
299 char *bufptr;
300 fd -= XrdFfsPosix_baseFD;
301 if (fd < 0)
302 {
303 errno = EBADF;
304 return -1;
305 }
306
307/* do not use caching under these cases */
308 if (len > (size_t)(XrdFfsWcacheBufsize/2) || fd >= XrdFfsWcacheNFILES)
309 {
311 return rc;
312 }
313
314 pthread_mutex_lock(XrdFfsWcacheFbufs[fd].mlock);
315 rc = XrdFfsWcacheFbufs[fd].len;
316/*
317 in the following two cases, a XrdFfsWcache_flush is required:
318 1. current offset isnn't pointing to the tail of data in buffer
319 2. adding new data will exceed the current buffer
320*/
321 if (offset != (off_t)(XrdFfsWcacheFbufs[fd].offset + XrdFfsWcacheFbufs[fd].len) ||
324
325 errno = 0;
326 if (rc < 0)
327 {
328 errno = ENOSPC;
329 pthread_mutex_unlock(XrdFfsWcacheFbufs[fd].mlock);
330 return -1;
331 }
332
333 bufptr = &XrdFfsWcacheFbufs[fd].buf[XrdFfsWcacheFbufs[fd].len];
334 memcpy(bufptr, buf, len);
335 if (XrdFfsWcacheFbufs[fd].len == 0)
338
339 pthread_mutex_unlock(XrdFfsWcacheFbufs[fd].mlock);
340 return (ssize_t)len;
341}
342
343#ifdef __cplusplus
344 }
345#endif
ssize_t XrdFfsPosix_pwrite(int fildes, const void *buf, size_t nbyte, off_t offset)
ssize_t XrdFfsPosix_pread(int fildes, void *buf, size_t nbyte, off_t offset)
void XrdFfsWcache_init(int basefd, int maxfd)
void XrdFfsWcache_destroy(int fd)
int XrdFfsWcacheNFILES
ssize_t XrdFfsWcache_pwrite(int fd, char *buf, size_t len, off_t offset)
ssize_t XrdFfsWcacheBufsize
#define O_DIRECT
pthread_mutex_t * mlock
ssize_t XrdFfsWcache_pread(int fd, char *buf, size_t len, off_t offset)
ssize_t XrdFfsRcacheBufsize
int XrdFfsPosix_baseFD
ssize_t XrdFfsWcache_flush(int fd)
struct XrdFfsWcacheFilebuf * XrdFfsWcacheFbufs
int XrdFfsWcache_create(int fd, int flags)