source: SRUClient/trunk/src/main/java/eu/clarin/sru/client/SRUThreadedClient.java @ 6938

Last change on this file since 6938 was 6938, checked in by Oliver Schonefeld, 8 years ago
  • fix copyright
  • fix some code formatting
  • fix JavaDoc? warnings
  • Property svn:eol-style set to native
File size: 13.4 KB
Line 
1/**
2 * This software is copyright (c) 2012-2016 by
3 *  - Institut fuer Deutsche Sprache (http://www.ids-mannheim.de)
4 * This is free software. You can redistribute it
5 * and/or modify it under the terms described in
6 * the GNU General Public License v3 of which you
7 * should have received a copy. Otherwise you can download
8 * it from
9 *
10 *   http://www.gnu.org/licenses/gpl-3.0.txt
11 *
12 * @copyright Institut fuer Deutsche Sprache (http://www.ids-mannheim.de)
13 *
14 * @license http://www.gnu.org/licenses/gpl-3.0.txt
15 *  GNU General Public License v3
16 */
17package eu.clarin.sru.client;
18
19import java.util.concurrent.Callable;
20import java.util.concurrent.ExecutorService;
21import java.util.concurrent.Executors;
22import java.util.concurrent.Future;
23import java.util.concurrent.ThreadFactory;
24
25import javax.xml.parsers.DocumentBuilderFactory;
26
27import org.slf4j.Logger;
28import org.slf4j.LoggerFactory;
29
30
31/**
32 * A client to perform SRU operations in parallel. The response of a SRU request
33 * is wrapped in a SRU response.
34 * <p>
35 * This client is reusable and thread-safe: the application may reuse a
36 * client object and may shared it between multiple threads.
37 * </p>
38 * <p>
39 * NB: The registered {@link SRURecordDataParser} need to be thread-safe
40 * </p>
41 */
42public class SRUThreadedClient {
43    private static final Logger logger =
44            LoggerFactory.getLogger(SRUThreadedClient.class);
45    private final DocumentBuilderFactory documentBuilderFactory =
46            DocumentBuilderFactory.newInstance();
47    private final ThreadLocal<SRUClient> client;
48    private final ExecutorService executor;
49
50
51    /**
52     * Constructor.
53     *
54     * @param config
55     *            the configuration to be used for this client.
56     * @throws NullPointerException
57     *             if argument <code>config</code> is <code>null</code>
58     * @throws IllegalArgumentException
59     *             if an error occurred while registering record data parsers
60     * @see SRUClientConfig
61     */
62   public SRUThreadedClient(final SRUClientConfig config) {
63        client = new ThreadLocal<SRUClient>() {
64            @Override
65            protected SRUClient initialValue() {
66                logger.debug("instantiated new sru client");
67                return new SRUClient(config, documentBuilderFactory);
68            }
69        };
70
71        // launch workers ...
72        final int threadCount = config.getThreadCount();
73        logger.debug("using {} workers", threadCount);
74        executor = Executors.newFixedThreadPool(threadCount, new Factory());
75    }
76
77
78    /**
79     * Perform a <em>explain</em> operation.
80     *
81     * @param request
82     *            an instance of a {@link SRUExplainRequest} object
83     * @return a {@link Future} object that wraps a {@link SRUExplainResponse}
84     *         object
85     * @throws SRUClientException
86     *             if an unrecoverable error occurred
87     * @throws NullPointerException
88     *             if any required argument is <code>null</code>
89     * @deprecated Use asynchronous callback interface
90     *             {@link #explain(SRUExplainRequest, SRUCallback)}. This method
91     *             will be removed in the future.
92     */
93    @Deprecated
94    public Future<SRUExplainResponse> explain(SRUExplainRequest request)
95            throws SRUClientException {
96        if (request == null) {
97            throw new NullPointerException("request == null");
98        }
99        if (executor.isShutdown()) {
100            throw new SRUClientException("client is shutting down");
101        }
102        return executor.submit(new Request<SRUExplainRequest, SRUExplainResponse>(request) {
103            @Override
104            public SRUExplainResponse doRequest(SRUClient client)
105                    throws SRUClientException {
106                return client.explain(request);
107            }
108        });
109    }
110
111
112    /**
113     * Perform a <em>explain</em> operation and invoke a user supplied callback
114     * after the request has been completed.
115     *
116     * @param request
117     *            an instance of a {@link SRUExplainRequest} object
118     * @param callback
119     *            the callback to be invoked
120     * @throws SRUClientException
121     *             if an unrecoverable error occurred
122     * @throws NullPointerException
123     *             if any required argument is <code>null</code>
124     * @see SRUCallback
125     */
126    public void explain(final SRUExplainRequest request,
127            final SRUCallback<SRUExplainRequest, SRUExplainResponse> callback)
128            throws SRUClientException {
129        if (request == null) {
130            throw new NullPointerException("request == null");
131        }
132        if (callback == null) {
133            throw new NullPointerException("callback == null");
134        }
135        if (executor.isShutdown()) {
136            throw new SRUClientException("client is shutting down");
137        }
138        executor.submit(new AsyncRequest<SRUExplainRequest, SRUExplainResponse>(
139                request, callback) {
140            @Override
141            protected SRUExplainResponse doRequest(SRUClient client)
142                    throws SRUClientException {
143                return client.explain(request);
144            }
145        });
146    }
147
148
149    /**
150     * Perform a <em>scan</em> operation.
151     *
152     * @param request
153     *            an instance of a {@link SRUScanRequest} object
154     * @return a {@link Future} object that wraps a {@link SRUScanResponse}
155     *         object
156     * @throws SRUClientException
157     *             if an unrecoverable error occurred
158     * @throws NullPointerException
159     *             if any required argument is <code>null</code>
160     * @deprecated Use asynchronous callback interface
161     *             {@link #scan(SRUScanRequest, SRUCallback)}. This method
162     *             will be removed in the future.
163     */
164    @Deprecated
165    public Future<SRUScanResponse> scan(SRUScanRequest request)
166            throws SRUClientException {
167        if (request == null) {
168            throw new NullPointerException("request == null");
169        }
170        if (executor.isShutdown()) {
171            throw new SRUClientException("client is shutting down");
172        }
173        return executor.submit(new Request<SRUScanRequest, SRUScanResponse>(
174                request) {
175            @Override
176            public SRUScanResponse doRequest(SRUClient client)
177                    throws SRUClientException {
178                return client.scan(request);
179            }
180        });
181    }
182
183
184    /**
185     * Perform a <em>scan</em> operation and invoke a user supplied callback
186     * after the request has been completed.
187     *
188     * @param request
189     *            an instance of a {@link SRUScanRequest} object
190     * @param callback
191     *            the callback to be invoked
192     * @throws SRUClientException
193     *             if an unrecoverable error occurred
194     * @throws NullPointerException
195     *             if any required argument is <code>null</code>
196     * @see SRUCallback
197     */
198    public void scan(final SRUScanRequest request,
199            final SRUCallback<SRUScanRequest, SRUScanResponse> callback)
200            throws SRUClientException {
201        if (request == null) {
202            throw new NullPointerException("request == null");
203        }
204        if (callback == null) {
205            throw new NullPointerException("callback == null");
206        }
207        if (executor.isShutdown()) {
208            throw new SRUClientException("client is shutting down");
209        }
210        executor.submit(new AsyncRequest<SRUScanRequest, SRUScanResponse>(
211                request, callback) {
212            @Override
213            protected SRUScanResponse doRequest(SRUClient client)
214                    throws SRUClientException {
215                return client.scan(request);
216            }
217        });
218    }
219
220
221    /**
222     * Perform a <em>searchRetrieve</em> operation.
223     *
224     * @param request
225     *            an instance of a {@link SRUSearchRetrieveRequest} object
226     * @return a {@link Future} object that wraps a {@link SRUExplainResponse}
227     *         object
228     * @throws SRUClientException
229     *             if an unrecoverable error occurred
230     * @throws NullPointerException
231     *             if any required argument is <code>null</code>
232     * @deprecated Use asynchronous callback interface
233     *             {@link #searchRetrieve(SRUSearchRetrieveRequest, SRUCallback)}
234     *             . This method will be removed in the future.
235     */
236    @Deprecated
237    public Future<SRUSearchRetrieveResponse> searchRetrieve(
238            SRUSearchRetrieveRequest request) throws SRUClientException {
239        if (request == null) {
240            throw new NullPointerException("request == null");
241        }
242        if (executor.isShutdown()) {
243            throw new SRUClientException("client is shutting down");
244        }
245        return executor.submit(new Request<SRUSearchRetrieveRequest,
246                SRUSearchRetrieveResponse>(request) {
247            @Override
248            protected SRUSearchRetrieveResponse doRequest(SRUClient client)
249                    throws SRUClientException {
250                return client.searchRetrieve(request);
251            }
252        });
253    }
254
255
256    /**
257     * Perform a <em>searchRetrieve</em> operation and invoke a user supplied
258     * callback after the request has been completed.
259     *
260     * @param request
261     *            an instance of a {@link SRUSearchRetrieveRequest} object
262     * @param callback
263     *            the callback to be invoked
264     * @throws SRUClientException
265     *             if an unrecoverable error occurred
266     * @throws NullPointerException
267     *             if any required argument is <code>null</code>
268     * @see SRUCallback
269     */
270    public void searchRetrieve(final SRUSearchRetrieveRequest request,
271            final SRUCallback<SRUSearchRetrieveRequest, SRUSearchRetrieveResponse> callback)
272            throws SRUClientException {
273        if (request == null) {
274            throw new NullPointerException("request == null");
275        }
276        if (callback == null) {
277            throw new NullPointerException("callback == null");
278        }
279        if (executor.isShutdown()) {
280            throw new SRUClientException("client is shutting down");
281        }
282        executor.submit(new AsyncRequest<SRUSearchRetrieveRequest,
283                SRUSearchRetrieveResponse>(request, callback) {
284            @Override
285            protected SRUSearchRetrieveResponse doRequest(SRUClient client)
286                    throws SRUClientException {
287                return client.searchRetrieve(request);
288            }
289        });
290    }
291
292
293    /**
294     * Invokes <code>shutdown</code> when this no longer referenced
295     */
296    @Override
297    protected void finalize() throws Throwable {
298        shutdown();
299    }
300
301
302    /**
303     * Initiates an orderly shutdown in which previously submitted requests are
304     * executed, but no new requests will be accepted.
305     */
306    public void shutdown() {
307        executor.shutdown();
308    }
309
310
311    /**
312     * Terminate the client but drain queued requests.
313     */
314    public void shutdownNow() {
315        executor.shutdownNow();
316    }
317
318
319    private class Factory implements ThreadFactory {
320        @Override
321        public Thread newThread(Runnable r) {
322            return new Thread(r) {
323                @Override
324                public void run() {
325                    try {
326                        logger.debug("launched new worker");
327
328                        // pre-initialize client
329                        client.get();
330
331                        // do work
332                        super.run();
333                    } finally {
334                        // do not leak resources and clean ThreadLocal ...
335                        client.remove();
336                        logger.debug("cleared sru client");
337                    }
338                }
339            };
340        }
341    }
342
343
344    private abstract class Request<V extends SRUAbstractRequest, S extends SRUAbstractResponse<V>>
345            implements Callable<S> {
346        protected final V request;
347        private long now = System.nanoTime();
348
349
350        Request(V request) {
351            this.request = request;
352        }
353
354
355        @Override
356        public final S call() throws Exception {
357            final SRUClient c = client.get();
358            c.setTimeQueued(System.nanoTime() - now);
359            return doRequest(c);
360        }
361
362
363        protected abstract S doRequest(SRUClient client)
364                throws SRUClientException;
365    }
366
367
368    private abstract class AsyncRequest<V extends SRUAbstractRequest,
369                                        S extends SRUAbstractResponse<V>>
370            implements Runnable {
371        protected final V request;
372        private long now = System.nanoTime();
373        private final SRUCallback<V, S> callback;
374
375
376        public AsyncRequest(V request, SRUCallback<V, S> callback) {
377            this.callback = callback;
378            this.request = request;
379        }
380
381
382        @Override
383        public void run() {
384            try {
385                try {
386                    final SRUClient c = client.get();
387                    c.setTimeQueued(System.nanoTime() - now);
388                    final S response = doRequest(c);
389                    callback.onSuccess(response);
390                } catch (SRUClientException e) {
391                    callback.onError(request, e);
392                }
393            } catch (Throwable t) {
394                callback.onError(request, new SRUClientException(
395                        "unexpected error while processing the request", t));
396            }
397        }
398
399
400        protected abstract S doRequest(SRUClient client)
401                throws SRUClientException;
402    }
403
404} // class SRUThreadedClient
Note: See TracBrowser for help on using the repository browser.