1 | /** |
---|
2 | * This software is copyright (c) 2012-2016 by |
---|
3 | * - Institut fuer Deutsche Sprache (http://www.ids-mannheim.de) |
---|
4 | * This is free software. You can redistribute it |
---|
5 | * and/or modify it under the terms described in |
---|
6 | * the GNU General Public License v3 of which you |
---|
7 | * should have received a copy. Otherwise you can download |
---|
8 | * it from |
---|
9 | * |
---|
10 | * http://www.gnu.org/licenses/gpl-3.0.txt |
---|
11 | * |
---|
12 | * @copyright Institut fuer Deutsche Sprache (http://www.ids-mannheim.de) |
---|
13 | * |
---|
14 | * @license http://www.gnu.org/licenses/gpl-3.0.txt |
---|
15 | * GNU General Public License v3 |
---|
16 | */ |
---|
17 | package eu.clarin.sru.client; |
---|
18 | |
---|
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;

import javax.xml.parsers.DocumentBuilderFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
---|
29 | |
---|
30 | |
---|
31 | /** |
---|
32 | * A client to perform SRU operations in parallel. The response of a SRU request |
---|
33 | * is wrapped in a SRU response. |
---|
34 | * <p> |
---|
35 | * This client is reusable and thread-safe: the application may reuse a |
---|
36 | * client object and may shared it between multiple threads. |
---|
37 | * </p> |
---|
38 | * <p> |
---|
39 | * NB: The registered {@link SRURecordDataParser} need to be thread-safe |
---|
40 | * </p> |
---|
41 | */ |
---|
42 | public class SRUThreadedClient { |
---|
43 | private static final Logger logger = |
---|
44 | LoggerFactory.getLogger(SRUThreadedClient.class); |
---|
45 | private final DocumentBuilderFactory documentBuilderFactory = |
---|
46 | DocumentBuilderFactory.newInstance(); |
---|
47 | private final ThreadLocal<SRUClient> client; |
---|
48 | private final ExecutorService executor; |
---|
49 | |
---|
50 | |
---|
51 | /** |
---|
52 | * Constructor. |
---|
53 | * |
---|
54 | * @param config |
---|
55 | * the configuration to be used for this client. |
---|
56 | * @throws NullPointerException |
---|
57 | * if argument <code>config</code> is <code>null</code> |
---|
58 | * @throws IllegalArgumentException |
---|
59 | * if an error occurred while registering record data parsers |
---|
60 | * @see SRUClientConfig |
---|
61 | */ |
---|
62 | public SRUThreadedClient(final SRUClientConfig config) { |
---|
63 | client = new ThreadLocal<SRUClient>() { |
---|
64 | @Override |
---|
65 | protected SRUClient initialValue() { |
---|
66 | logger.debug("instantiated new sru client"); |
---|
67 | return new SRUClient(config, documentBuilderFactory); |
---|
68 | } |
---|
69 | }; |
---|
70 | |
---|
71 | // launch workers ... |
---|
72 | final int threadCount = config.getThreadCount(); |
---|
73 | logger.debug("using {} workers", threadCount); |
---|
74 | executor = Executors.newFixedThreadPool(threadCount, new Factory()); |
---|
75 | } |
---|
76 | |
---|
77 | |
---|
78 | /** |
---|
79 | * Perform a <em>explain</em> operation. |
---|
80 | * |
---|
81 | * @param request |
---|
82 | * an instance of a {@link SRUExplainRequest} object |
---|
83 | * @return a {@link Future} object that wraps a {@link SRUExplainResponse} |
---|
84 | * object |
---|
85 | * @throws SRUClientException |
---|
86 | * if an unrecoverable error occurred |
---|
87 | * @throws NullPointerException |
---|
88 | * if any required argument is <code>null</code> |
---|
89 | * @deprecated Use asynchronous callback interface |
---|
90 | * {@link #explain(SRUExplainRequest, SRUCallback)}. This method |
---|
91 | * will be removed in the future. |
---|
92 | */ |
---|
93 | @Deprecated |
---|
94 | public Future<SRUExplainResponse> explain(SRUExplainRequest request) |
---|
95 | throws SRUClientException { |
---|
96 | if (request == null) { |
---|
97 | throw new NullPointerException("request == null"); |
---|
98 | } |
---|
99 | if (executor.isShutdown()) { |
---|
100 | throw new SRUClientException("client is shutting down"); |
---|
101 | } |
---|
102 | return executor.submit(new Request<SRUExplainRequest, SRUExplainResponse>(request) { |
---|
103 | @Override |
---|
104 | public SRUExplainResponse doRequest(SRUClient client) |
---|
105 | throws SRUClientException { |
---|
106 | return client.explain(request); |
---|
107 | } |
---|
108 | }); |
---|
109 | } |
---|
110 | |
---|
111 | |
---|
112 | /** |
---|
113 | * Perform a <em>explain</em> operation and invoke a user supplied callback |
---|
114 | * after the request has been completed. |
---|
115 | * |
---|
116 | * @param request |
---|
117 | * an instance of a {@link SRUExplainRequest} object |
---|
118 | * @param callback |
---|
119 | * the callback to be invoked |
---|
120 | * @throws SRUClientException |
---|
121 | * if an unrecoverable error occurred |
---|
122 | * @throws NullPointerException |
---|
123 | * if any required argument is <code>null</code> |
---|
124 | * @see SRUCallback |
---|
125 | */ |
---|
126 | public void explain(final SRUExplainRequest request, |
---|
127 | final SRUCallback<SRUExplainRequest, SRUExplainResponse> callback) |
---|
128 | throws SRUClientException { |
---|
129 | if (request == null) { |
---|
130 | throw new NullPointerException("request == null"); |
---|
131 | } |
---|
132 | if (callback == null) { |
---|
133 | throw new NullPointerException("callback == null"); |
---|
134 | } |
---|
135 | if (executor.isShutdown()) { |
---|
136 | throw new SRUClientException("client is shutting down"); |
---|
137 | } |
---|
138 | executor.submit(new AsyncRequest<SRUExplainRequest, SRUExplainResponse>( |
---|
139 | request, callback) { |
---|
140 | @Override |
---|
141 | protected SRUExplainResponse doRequest(SRUClient client) |
---|
142 | throws SRUClientException { |
---|
143 | return client.explain(request); |
---|
144 | } |
---|
145 | }); |
---|
146 | } |
---|
147 | |
---|
148 | |
---|
149 | /** |
---|
150 | * Perform a <em>scan</em> operation. |
---|
151 | * |
---|
152 | * @param request |
---|
153 | * an instance of a {@link SRUScanRequest} object |
---|
154 | * @return a {@link Future} object that wraps a {@link SRUScanResponse} |
---|
155 | * object |
---|
156 | * @throws SRUClientException |
---|
157 | * if an unrecoverable error occurred |
---|
158 | * @throws NullPointerException |
---|
159 | * if any required argument is <code>null</code> |
---|
160 | * @deprecated Use asynchronous callback interface |
---|
161 | * {@link #scan(SRUScanRequest, SRUCallback)}. This method |
---|
162 | * will be removed in the future. |
---|
163 | */ |
---|
164 | @Deprecated |
---|
165 | public Future<SRUScanResponse> scan(SRUScanRequest request) |
---|
166 | throws SRUClientException { |
---|
167 | if (request == null) { |
---|
168 | throw new NullPointerException("request == null"); |
---|
169 | } |
---|
170 | if (executor.isShutdown()) { |
---|
171 | throw new SRUClientException("client is shutting down"); |
---|
172 | } |
---|
173 | return executor.submit(new Request<SRUScanRequest, SRUScanResponse>( |
---|
174 | request) { |
---|
175 | @Override |
---|
176 | public SRUScanResponse doRequest(SRUClient client) |
---|
177 | throws SRUClientException { |
---|
178 | return client.scan(request); |
---|
179 | } |
---|
180 | }); |
---|
181 | } |
---|
182 | |
---|
183 | |
---|
184 | /** |
---|
185 | * Perform a <em>scan</em> operation and invoke a user supplied callback |
---|
186 | * after the request has been completed. |
---|
187 | * |
---|
188 | * @param request |
---|
189 | * an instance of a {@link SRUScanRequest} object |
---|
190 | * @param callback |
---|
191 | * the callback to be invoked |
---|
192 | * @throws SRUClientException |
---|
193 | * if an unrecoverable error occurred |
---|
194 | * @throws NullPointerException |
---|
195 | * if any required argument is <code>null</code> |
---|
196 | * @see SRUCallback |
---|
197 | */ |
---|
198 | public void scan(final SRUScanRequest request, |
---|
199 | final SRUCallback<SRUScanRequest, SRUScanResponse> callback) |
---|
200 | throws SRUClientException { |
---|
201 | if (request == null) { |
---|
202 | throw new NullPointerException("request == null"); |
---|
203 | } |
---|
204 | if (callback == null) { |
---|
205 | throw new NullPointerException("callback == null"); |
---|
206 | } |
---|
207 | if (executor.isShutdown()) { |
---|
208 | throw new SRUClientException("client is shutting down"); |
---|
209 | } |
---|
210 | executor.submit(new AsyncRequest<SRUScanRequest, SRUScanResponse>( |
---|
211 | request, callback) { |
---|
212 | @Override |
---|
213 | protected SRUScanResponse doRequest(SRUClient client) |
---|
214 | throws SRUClientException { |
---|
215 | return client.scan(request); |
---|
216 | } |
---|
217 | }); |
---|
218 | } |
---|
219 | |
---|
220 | |
---|
221 | /** |
---|
222 | * Perform a <em>searchRetrieve</em> operation. |
---|
223 | * |
---|
224 | * @param request |
---|
225 | * an instance of a {@link SRUSearchRetrieveRequest} object |
---|
226 | * @return a {@link Future} object that wraps a {@link SRUExplainResponse} |
---|
227 | * object |
---|
228 | * @throws SRUClientException |
---|
229 | * if an unrecoverable error occurred |
---|
230 | * @throws NullPointerException |
---|
231 | * if any required argument is <code>null</code> |
---|
232 | * @deprecated Use asynchronous callback interface |
---|
233 | * {@link #searchRetrieve(SRUSearchRetrieveRequest, SRUCallback)} |
---|
234 | * . This method will be removed in the future. |
---|
235 | */ |
---|
236 | @Deprecated |
---|
237 | public Future<SRUSearchRetrieveResponse> searchRetrieve( |
---|
238 | SRUSearchRetrieveRequest request) throws SRUClientException { |
---|
239 | if (request == null) { |
---|
240 | throw new NullPointerException("request == null"); |
---|
241 | } |
---|
242 | if (executor.isShutdown()) { |
---|
243 | throw new SRUClientException("client is shutting down"); |
---|
244 | } |
---|
245 | return executor.submit(new Request<SRUSearchRetrieveRequest, |
---|
246 | SRUSearchRetrieveResponse>(request) { |
---|
247 | @Override |
---|
248 | protected SRUSearchRetrieveResponse doRequest(SRUClient client) |
---|
249 | throws SRUClientException { |
---|
250 | return client.searchRetrieve(request); |
---|
251 | } |
---|
252 | }); |
---|
253 | } |
---|
254 | |
---|
255 | |
---|
256 | /** |
---|
257 | * Perform a <em>searchRetrieve</em> operation and invoke a user supplied |
---|
258 | * callback after the request has been completed. |
---|
259 | * |
---|
260 | * @param request |
---|
261 | * an instance of a {@link SRUSearchRetrieveRequest} object |
---|
262 | * @param callback |
---|
263 | * the callback to be invoked |
---|
264 | * @throws SRUClientException |
---|
265 | * if an unrecoverable error occurred |
---|
266 | * @throws NullPointerException |
---|
267 | * if any required argument is <code>null</code> |
---|
268 | * @see SRUCallback |
---|
269 | */ |
---|
270 | public void searchRetrieve(final SRUSearchRetrieveRequest request, |
---|
271 | final SRUCallback<SRUSearchRetrieveRequest, SRUSearchRetrieveResponse> callback) |
---|
272 | throws SRUClientException { |
---|
273 | if (request == null) { |
---|
274 | throw new NullPointerException("request == null"); |
---|
275 | } |
---|
276 | if (callback == null) { |
---|
277 | throw new NullPointerException("callback == null"); |
---|
278 | } |
---|
279 | if (executor.isShutdown()) { |
---|
280 | throw new SRUClientException("client is shutting down"); |
---|
281 | } |
---|
282 | executor.submit(new AsyncRequest<SRUSearchRetrieveRequest, |
---|
283 | SRUSearchRetrieveResponse>(request, callback) { |
---|
284 | @Override |
---|
285 | protected SRUSearchRetrieveResponse doRequest(SRUClient client) |
---|
286 | throws SRUClientException { |
---|
287 | return client.searchRetrieve(request); |
---|
288 | } |
---|
289 | }); |
---|
290 | } |
---|
291 | |
---|
292 | |
---|
293 | /** |
---|
294 | * Invokes <code>shutdown</code> when this no longer referenced |
---|
295 | */ |
---|
296 | @Override |
---|
297 | protected void finalize() throws Throwable { |
---|
298 | shutdown(); |
---|
299 | } |
---|
300 | |
---|
301 | |
---|
302 | /** |
---|
303 | * Initiates an orderly shutdown in which previously submitted requests are |
---|
304 | * executed, but no new requests will be accepted. |
---|
305 | */ |
---|
306 | public void shutdown() { |
---|
307 | executor.shutdown(); |
---|
308 | } |
---|
309 | |
---|
310 | |
---|
311 | /** |
---|
312 | * Terminate the client but drain queued requests. |
---|
313 | */ |
---|
314 | public void shutdownNow() { |
---|
315 | executor.shutdownNow(); |
---|
316 | } |
---|
317 | |
---|
318 | |
---|
319 | private class Factory implements ThreadFactory { |
---|
320 | @Override |
---|
321 | public Thread newThread(Runnable r) { |
---|
322 | return new Thread(r) { |
---|
323 | @Override |
---|
324 | public void run() { |
---|
325 | try { |
---|
326 | logger.debug("launched new worker"); |
---|
327 | |
---|
328 | // pre-initialize client |
---|
329 | client.get(); |
---|
330 | |
---|
331 | // do work |
---|
332 | super.run(); |
---|
333 | } finally { |
---|
334 | // do not leak resources and clean ThreadLocal ... |
---|
335 | client.remove(); |
---|
336 | logger.debug("cleared sru client"); |
---|
337 | } |
---|
338 | } |
---|
339 | }; |
---|
340 | } |
---|
341 | } |
---|
342 | |
---|
343 | |
---|
344 | private abstract class Request<V extends SRUAbstractRequest, S extends SRUAbstractResponse<V>> |
---|
345 | implements Callable<S> { |
---|
346 | protected final V request; |
---|
347 | private long now = System.nanoTime(); |
---|
348 | |
---|
349 | |
---|
350 | Request(V request) { |
---|
351 | this.request = request; |
---|
352 | } |
---|
353 | |
---|
354 | |
---|
355 | @Override |
---|
356 | public final S call() throws Exception { |
---|
357 | final SRUClient c = client.get(); |
---|
358 | c.setTimeQueued(System.nanoTime() - now); |
---|
359 | return doRequest(c); |
---|
360 | } |
---|
361 | |
---|
362 | |
---|
363 | protected abstract S doRequest(SRUClient client) |
---|
364 | throws SRUClientException; |
---|
365 | } |
---|
366 | |
---|
367 | |
---|
368 | private abstract class AsyncRequest<V extends SRUAbstractRequest, |
---|
369 | S extends SRUAbstractResponse<V>> |
---|
370 | implements Runnable { |
---|
371 | protected final V request; |
---|
372 | private long now = System.nanoTime(); |
---|
373 | private final SRUCallback<V, S> callback; |
---|
374 | |
---|
375 | |
---|
376 | public AsyncRequest(V request, SRUCallback<V, S> callback) { |
---|
377 | this.callback = callback; |
---|
378 | this.request = request; |
---|
379 | } |
---|
380 | |
---|
381 | |
---|
382 | @Override |
---|
383 | public void run() { |
---|
384 | try { |
---|
385 | try { |
---|
386 | final SRUClient c = client.get(); |
---|
387 | c.setTimeQueued(System.nanoTime() - now); |
---|
388 | final S response = doRequest(c); |
---|
389 | callback.onSuccess(response); |
---|
390 | } catch (SRUClientException e) { |
---|
391 | callback.onError(request, e); |
---|
392 | } |
---|
393 | } catch (Throwable t) { |
---|
394 | callback.onError(request, new SRUClientException( |
---|
395 | "unexpected error while processing the request", t)); |
---|
396 | } |
---|
397 | } |
---|
398 | |
---|
399 | |
---|
400 | protected abstract S doRequest(SRUClient client) |
---|
401 | throws SRUClientException; |
---|
402 | } |
---|
403 | |
---|
404 | } // class SRUThreadedClient |
---|