XRootD
Loading...
Searching...
No Matches
XrdThrottleFile.cc
Go to the documentation of this file.
1
2#include "XrdSfs/XrdSfsAio.hh"
5
6#include "XrdThrottle.hh"
7
8using namespace XrdThrottle;
9
10#define DO_LOADSHED if (m_throttle.CheckLoadShed(m_loadshed)) \
11{ \
12 unsigned port; \
13 std::string host; \
14 m_throttle.PerformLoadShed(m_loadshed, host, port); \
15 m_eroute.Emsg("File", "Performing load-shed for client", m_connection_id.c_str()); \
16 error.setErrInfo(port, host.c_str()); \
17 return SFS_REDIRECT; \
18}
19
20#define DO_THROTTLE(amount) \
21DO_LOADSHED \
22m_throttle.Apply(amount, 1, m_uid); \
23bool ok; \
24auto xtimer = m_throttle.StartIOTimer(m_uid, ok); \
25if (!ok) { \
26 error.setErrInfo(EMFILE, "I/O limit exceeded and wait time hit"); \
27 return SFS_ERROR; \
28}
29
30
31File::File(const char *user,
33 XrdThrottleManager &throttle,
34 XrdSysError &eroute)
35 : XrdSfsFile(sfs->error), // Use underlying error object as ours
36#if __cplusplus >= 201103L
37 m_sfs(std::move(sfs)), // Guaranteed to be non-null by FileSystem::newFile
38#else
39 m_sfs(sfs),
40#endif
41 m_uid(0),
42 m_connection_id(user ? user : ""),
43 m_throttle(throttle),
44 m_eroute(eroute)
45{}
46
47File::~File()
48{
49 if (m_is_open) {
50 m_throttle.CloseFile(m_user);
51 }
52}
53
54int
55File::open(const char *fileName,
56 XrdSfsFileOpenMode openMode,
57 mode_t createMode,
58 const XrdSecEntity *client,
59 const char *opaque)
60{
61 std::tie(m_user, m_uid) = m_throttle.GetUserInfo(client);
62 m_throttle.PrepLoadShed(opaque, m_loadshed);
63 std::string open_error_message;
64 if (!m_throttle.OpenFile(m_user, open_error_message)) {
65 error.setErrInfo(EMFILE, open_error_message.c_str());
66 return SFS_ERROR;
67 }
68 auto retval = m_sfs->open(fileName, openMode, createMode, client, opaque);
69 if (retval != SFS_ERROR) {
70 m_is_open = true;
71 } else {
72 m_throttle.CloseFile(m_user);
73 }
74 return retval;
75}
76
77int
79{
80 m_is_open = false;
81 m_throttle.CloseFile(m_user);
82 return m_sfs->close();
83}
84
85int File::checkpoint(cpAct act, struct iov *range, int n)
86{ return m_sfs->checkpoint(act, range, n);}
87
88int
89File::fctl(const int cmd,
90 const char *args,
91 XrdOucErrInfo &out_error)
92{
93 // Disable sendfile
94 if (cmd == SFS_FCTL_GETFD)
95 {
96 error.setErrInfo(ENOTSUP, "Sendfile not supported by throttle plugin.");
97 return SFS_ERROR;
98 }
99 else return m_sfs->fctl(cmd, args, out_error);
100}
101
102const char *
104{
105 return m_sfs->FName();
106}
107
108int
109File::getMmap(void **Addr, off_t &Size)
110{ // We cannot monitor mmap-based reads, so we disable them.
111 error.setErrInfo(ENOTSUP, "Mmap not supported by throttle plugin.");
112 return SFS_ERROR;
113}
114
117 char *buffer,
118 XrdSfsXferSize rdlen,
119 uint32_t *csvec,
120 uint64_t opts)
121{
122 DO_THROTTLE(rdlen)
123 return m_sfs->pgRead(offset, buffer, rdlen, csvec, opts);
124}
125
127File::pgRead(XrdSfsAio *aioparm, uint64_t opts)
128{ // We disable all AIO-based reads.
129 aioparm->Result = this->pgRead((XrdSfsFileOffset)aioparm->sfsAio.aio_offset,
130 (char *)aioparm->sfsAio.aio_buf,
132 aioparm->cksVec, opts);
133 aioparm->doneRead();
134 return SFS_OK;
135}
136
139 char *buffer,
140 XrdSfsXferSize rdlen,
141 uint32_t *csvec,
142 uint64_t opts)
143{
144 DO_THROTTLE(rdlen)
145 return m_sfs->pgWrite(offset, buffer, rdlen, csvec, opts);
146}
147
149File::pgWrite(XrdSfsAio *aioparm, uint64_t opts)
150{ // We disable all AIO-based writes.
151 aioparm->Result = this->pgWrite((XrdSfsFileOffset)aioparm->sfsAio.aio_offset,
152 (char *)aioparm->sfsAio.aio_buf,
154 aioparm->cksVec, opts);
155 aioparm->doneWrite();
156 return SFS_OK;
157}
158
159int
161 XrdSfsXferSize amount)
162{
163 DO_THROTTLE(amount)
164 return m_sfs->read(fileOffset, amount);
165}
166
169 char *buffer,
170 XrdSfsXferSize buffer_size)
171{
172 DO_THROTTLE(buffer_size);
173 return m_sfs->read(fileOffset, buffer, buffer_size);
174}
175
176int
178{ // We disable all AIO-based reads.
179 aioparm->Result = this->read((XrdSfsFileOffset)aioparm->sfsAio.aio_offset,
180 (char *)aioparm->sfsAio.aio_buf,
182 aioparm->doneRead();
183 return SFS_OK;
184}
185
188 const char *buffer,
189 XrdSfsXferSize buffer_size)
190{
191 DO_THROTTLE(buffer_size);
192 return m_sfs->write(fileOffset, buffer, buffer_size);
193}
194
195int
197{
198 aioparm->Result = this->write((XrdSfsFileOffset)aioparm->sfsAio.aio_offset,
199 (char *)aioparm->sfsAio.aio_buf,
201 aioparm->doneWrite();
202 return SFS_OK;
203}
204
205int
207{
208 return m_sfs->sync();
209}
210
211int
213{
214 return m_sfs->sync(aiop);
215}
216
217int
218File::stat(struct stat *buf)
219{
220 return m_sfs->stat(buf);
221}
222
223int
225{
226 return m_sfs->truncate(fileOffset);
227}
228
229int
230File::getCXinfo(char cxtype[4], int &cxrsz)
231{
232 return m_sfs->getCXinfo(cxtype, cxrsz);
233}
234
235int
237 XrdSfsFileOffset offset,
238 XrdSfsXferSize size)
239{
240 DO_THROTTLE(size);
241 return m_sfs->SendData(sfDio, offset, size);
242}
243
#define write(a, b, c)
Definition XrdPosix.hh:115
#define read(a, b, c)
Definition XrdPosix.hh:82
struct myOpts opts
off_t aio_offset
Definition XrdSfsAio.hh:49
size_t aio_nbytes
Definition XrdSfsAio.hh:48
void * aio_buf
Definition XrdSfsAio.hh:47
#define SFS_ERROR
#define SFS_FCTL_GETFD
int XrdSfsFileOpenMode
#define SFS_OK
long long XrdSfsFileOffset
int XrdSfsXferSize
if(Avsz)
#define DO_THROTTLE(amount)
uint32_t * cksVec
Definition XrdSfsAio.hh:63
ssize_t Result
Definition XrdSfsAio.hh:65
virtual void doneRead()=0
struct aiocb sfsAio
Definition XrdSfsAio.hh:62
virtual void doneWrite()=0
XrdOucErrInfo & error
bool CloseFile(const std::string &entity)
virtual int getCXinfo(char cxtype[4], int &cxrsz) override
virtual const char * FName() override
virtual int sync() override
virtual int open(const char *fileName, XrdSfsFileOpenMode openMode, mode_t createMode, const XrdSecEntity *client, const char *opaque=0) override
virtual int close() override
virtual XrdSfsXferSize pgWrite(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize rdlen, uint32_t *csvec, uint64_t opts=0) override
virtual XrdSfsXferSize pgRead(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize rdlen, uint32_t *csvec, uint64_t opts=0) override
virtual int SendData(XrdSfsDio *sfDio, XrdSfsFileOffset offset, XrdSfsXferSize size) override
virtual int checkpoint(cpAct act, struct iov *range=0, int n=0) override
virtual int read(XrdSfsFileOffset fileOffset, XrdSfsXferSize amount) override
virtual int truncate(XrdSfsFileOffset fileOffset) override
virtual int getMmap(void **Addr, off_t &Size) override
virtual int fctl(const int cmd, const char *args, XrdOucErrInfo &out_error) override
virtual XrdSfsXferSize write(XrdSfsFileOffset fileOffset, const char *buffer, XrdSfsXferSize buffer_size) override
virtual int stat(struct stat *buf) override
std::auto_ptr< XrdSfsFile > unique_sfs_ptr