source: SRUClient/trunk/src/main/java/eu/clarin/sru/client/SRUThreadedClient.java @ 2168

Last change on this file since 2168 was 2168, checked in by oschonef, 12 years ago
  • update copyright
  • add missing copyright statements
  • Property svn:eol-style set to native
File size: 9.4 KB
Line 
/**
 * This software is copyright (c) 2011-2012 by
 *  - Institut fuer Deutsche Sprache (http://www.ids-mannheim.de)
 * This is free software. You can redistribute it
 * and/or modify it under the terms described in
 * the GNU General Public License v3 of which you
 * should have received a copy. Otherwise you can download
 * it from
 *
 *   http://www.gnu.org/licenses/gpl-3.0.txt
 *
 * @copyright Institut fuer Deutsche Sprache (http://www.ids-mannheim.de)
 *
 * @license http://www.gnu.org/licenses/gpl-3.0.txt
 *  GNU General Public License v3
 */
17package eu.clarin.sru.client;
18
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
29
30
31/**
32 * A client to perform SRU operations in parallel. The response of a SRU request
33 * is wrapped in a SRU response.
34 * <p>
35 * This client is reusable and thread-safe: the application may reuse a
36 * client object and may shared it between multiple threads. <br />
37 * NB: The registered {@link SRURecordDataParser} need to be thread-safe
38 * </p>
39 */
40public class SRUThreadedClient {
41    private static final Logger logger =
42            LoggerFactory.getLogger(SRUThreadedClient.class);
43    private ConcurrentMap<String, SRURecordDataParser> parsers =
44            new ConcurrentHashMap<String, SRURecordDataParser>();
45    private final ThreadLocal<SRUClient> client;
46    private final ExecutorService executor;
47
48
49    /**
50     * Constructor. This constructor will create a <em>strict</em> client and
51     * use the default SRU version.
52     *
53     * @see #SRUThreadedClient(SRUVersion, boolean)
54     * @see SRUSimpleClient#DEFAULT_SRU_VERSION
55     */
56    public SRUThreadedClient() {
57        this(SRUSimpleClient.DEFAULT_SRU_VERSION, true);
58    }
59
60
61    /**
62     * Constructor. This constructor will create a <em>strict</em> client.
63     *
64     * @param defaultVersion
65     *            the default version to use for SRU requests; may be overridden
66     *            by individual requests
67     * @see #SRUThreadedClient(SRUVersion, boolean)
68     */
69    public SRUThreadedClient(SRUVersion defaultVersion) {
70        this(defaultVersion, true);
71    }
72
73
74    /**
75     * Constructor.
76     *
77     * @param defaultVersion
78     *            the default version to use for SRU requests; may be overridden
79     *            by individual requests
80     * @param strictMode
81     *            if <code>true</code> the client will strictly adhere to the
82     *            SRU standard and raise fatal errors on violations, if
83     *            <code>false</code> it will act more forgiving and ignore
84     *            certain violations
85     */
86    public SRUThreadedClient(final SRUVersion defaultVersion,
87            final boolean strictMode) {
88        client = new ThreadLocal<SRUClient>() {
89            @Override
90            protected SRUClient initialValue() {
91                logger.debug("instantiated new sru client");
92                return new SRUClient(defaultVersion, strictMode, parsers);
93            }
94        };
95        // TODO: make worker count configurable
96        int workerCount = Runtime.getRuntime().availableProcessors() * 2;
97        logger.debug("using {} workers", workerCount);
98        executor = Executors.newFixedThreadPool(workerCount, new Factory());
99    }
100
101
102    /**
103     * Register a record data parser.
104     *
105     * @param parser
106     *            a parser instance
107     * @throws SRUClientException
108     *             if a parser handing the same record schema is already
109     *             registered
110     * @throws NullPointerException
111     *             if any required argument is <code>null</code>
112     * @throws IllegalArgumentException
113     *             if the supplied parser is invalid
114     */
115    public void registerRecordParser(SRURecordDataParser parser)
116            throws SRUClientException {
117        if (parser == null) {
118            throw new NullPointerException("parser == null");
119        }
120        final String recordSchema = parser.getRecordSchema();
121        if (recordSchema == null) {
122            throw new NullPointerException("parser.getRecordSchema() == null");
123        }
124        if (recordSchema.isEmpty()) {
125            throw new IllegalArgumentException(
126                    "parser.getRecordSchema() returns empty string");
127        }
128
129        if (parsers.putIfAbsent(recordSchema, parser) != null) {
130            throw new SRUClientException(
131                    "record data parser already registered: " + recordSchema);
132
133        }
134    }
135
136
137    /**
138     * Perform a <em>explain</em> operation.
139     *
140     * @param request
141     *            an instance of a {@link SRUExplainRequest} object
142     * @return a {@link SRUExplainResponse} object
143     * @throws SRUClientException
144     *             if an unrecoverable error occurred
145     * @throws NullPointerException
146     *             if any required argument is <code>null</code>
147     */
148    public Future<SRUExplainResponse> explain(SRUExplainRequest request)
149            throws SRUClientException {
150        if (request == null) {
151            throw new NullPointerException("request == null");
152        }
153        if (executor.isShutdown()) {
154            throw new SRUClientException("client is shutting down");
155        }
156        return executor.submit(new Request<SRUExplainRequest, SRUExplainResponse>(request) {
157            @Override
158            public SRUExplainResponse doRequest(SRUClient client)
159                    throws SRUClientException {
160                return client.explain(request);
161            }
162        });
163    }
164
165
166    /**
167     * Perform a <em>scan</em> operation.
168     *
169     * @param request
170     *            an instance of a {@link SRUScanRequest} object
171     * @return a {@link SRUScanResponse} object
172     * @throws SRUClientException
173     *             if an unrecoverable error occurred
174     * @throws NullPointerException
175     *             if any required argument is <code>null</code>
176     */
177    public Future<SRUScanResponse> scan(SRUScanRequest request)
178            throws SRUClientException {
179        if (request == null) {
180            throw new NullPointerException("request == null");
181        }
182        if (executor.isShutdown()) {
183            throw new SRUClientException("client is shutting down");
184        }
185        return executor.submit(new Request<SRUScanRequest, SRUScanResponse>(
186                request) {
187            @Override
188            public SRUScanResponse doRequest(SRUClient client)
189                    throws SRUClientException {
190                return client.scan(request);
191            }
192        });
193    }
194
195
196    /**
197     * Perform a <em>searchRetrieve</em> operation.
198     *
199     * @param request
200     *            an instance of a {@link SRUSearchRetrieveRequest} object
201     * @return a {@link SRUSearchRetrieveRequest} object
202     * @throws SRUClientException
203     *             if an unrecoverable error occurred
204     * @throws NullPointerException
205     *             if any required argument is <code>null</code>
206     */
207    public Future<SRUSearchRetrieveResponse> searchRetrieve(
208            SRUSearchRetrieveRequest request) throws SRUClientException {
209        if (request == null) {
210            throw new NullPointerException("request == null");
211        }
212        if (executor.isShutdown()) {
213            throw new SRUClientException("client is shutting down");
214        }
215        return executor.submit(new Request<SRUSearchRetrieveRequest, SRUSearchRetrieveResponse>(request) {
216            @Override
217            protected SRUSearchRetrieveResponse doRequest(SRUClient client)
218                    throws SRUClientException {
219                return client.searchRetrieve(request);
220            }
221        });
222    }
223
224
225    /**
226     * Invokes <code>shutdown</code> when this no longer referenced
227     */
228    @Override
229    protected void finalize() throws Throwable {
230        shutdown();
231    }
232
233
234    /**
235     * Initiates an orderly shutdown in which previously submitted requests are
236     * executed, but no new requests will be accepted.
237     */
238    public void shutdown() {
239        executor.shutdown();
240    }
241
242
243    /**
244     * Terminate the client but drain queued requests.
245     */
246    public void shutdownNow() {
247        executor.shutdownNow();
248    }
249
250
251    private class Factory implements ThreadFactory {
252        @Override
253        public Thread newThread(Runnable r) {
254            return new Thread(r) {
255                @Override
256                public void run() {
257                    try {
258                        logger.debug("launched new worker");
259
260                        // pre-initialize client
261                        client.get();
262
263                        // do work
264                        super.run();
265                    } finally {
266                        // do not leak resources and clean ThreadLocal ...
267                        client.remove();
268                        logger.debug("cleared sru client");
269                    }
270                }
271            };
272        }
273    }
274
275
276    private abstract class Request<V, S> implements Callable<S> {
277        protected final V request;
278
279
280        Request(V request) {
281            this.request = request;
282        }
283
284
285        @Override
286        public final S call() throws Exception {
287            return doRequest(client.get());
288        }
289
290
291        protected abstract S doRequest(SRUClient client)
292                throws SRUClientException;
293    }
294
295} // class SRUThreadedClient
Note: See TracBrowser for help on using the repository browser.