source: SRUClient/trunk/src/main/java/eu/clarin/sru/client/SRUThreadedClient.java @ 2166

Last change on this file since 2166 was 2166, checked in by oschonef, 12 years ago
  • add SRUClient (client that wraps responses in Java objects)
  • add SRUThreadedClient (client that executes requests in parallel and wraps responses in Java objects)
  • add additional classed for wrapping responses in Java objects

NOTE: The clients do neither handle extraResponseData, extraRecordData nor extraTermData, yet!

The API for the extra*Data is subject to change.

OT: Everybody leaves, so why, why wouldn't you?

  • Property svn:eol-style set to native
File size: 8.9 KB
Line 
1package eu.clarin.sru.client;
2
3import java.util.concurrent.Callable;
4import java.util.concurrent.ConcurrentHashMap;
5import java.util.concurrent.ConcurrentMap;
6import java.util.concurrent.ExecutorService;
7import java.util.concurrent.Executors;
8import java.util.concurrent.Future;
9import java.util.concurrent.ThreadFactory;
10
11import org.slf4j.Logger;
12import org.slf4j.LoggerFactory;
13
14
15/**
16 * A client to perform SRU operations in parallel. The response of a SRU request
17 * is wrapped in a SRU response.
18 * <p>
19 * This client is reusable and thread-safe: the application may reuse a
20 * client object and may shared it between multiple threads. <br />
21 * NB: The registered {@link SRURecordDataParser} need to be thread-safe
22 * </p>
23 */
24public class SRUThreadedClient {
25    private static final Logger logger =
26            LoggerFactory.getLogger(SRUThreadedClient.class);
27    private ConcurrentMap<String, SRURecordDataParser> parsers =
28            new ConcurrentHashMap<String, SRURecordDataParser>();
29    private final ThreadLocal<SRUClient> client;
30    private final ExecutorService executor;
31
32
33    /**
34     * Constructor. This constructor will create a <em>strict</em> client and
35     * use the default SRU version.
36     *
37     * @see #SRUThreadedClient(SRUVersion, boolean)
38     * @see SRUSimpleClient#DEFAULT_SRU_VERSION
39     */
40    public SRUThreadedClient() {
41        this(SRUSimpleClient.DEFAULT_SRU_VERSION, true);
42    }
43
44
45    /**
46     * Constructor. This constructor will create a <em>strict</em> client.
47     *
48     * @param defaultVersion
49     *            the default version to use for SRU requests; may be overridden
50     *            by individual requests
51     * @see #SRUThreadedClient(SRUVersion, boolean)
52     */
53    public SRUThreadedClient(SRUVersion defaultVersion) {
54        this(defaultVersion, true);
55    }
56
57
58    /**
59     * Constructor.
60     *
61     * @param defaultVersion
62     *            the default version to use for SRU requests; may be overridden
63     *            by individual requests
64     * @param strictMode
65     *            if <code>true</code> the client will strictly adhere to the
66     *            SRU standard and raise fatal errors on violations, if
67     *            <code>false</code> it will act more forgiving and ignore
68     *            certain violations
69     */
70    public SRUThreadedClient(final SRUVersion defaultVersion,
71            final boolean strictMode) {
72        client = new ThreadLocal<SRUClient>() {
73            @Override
74            protected SRUClient initialValue() {
75                logger.debug("instantiated new sru client");
76                return new SRUClient(defaultVersion, strictMode, parsers);
77            }
78        };
79        // TODO: make worker count configurable
80        int workerCount = Runtime.getRuntime().availableProcessors() * 2;
81        logger.debug("using {} workers", workerCount);
82        executor = Executors.newFixedThreadPool(workerCount, new Factory());
83    }
84
85
86    /**
87     * Register a record data parser.
88     *
89     * @param parser
90     *            a parser instance
91     * @throws SRUClientException
92     *             if a parser handing the same record schema is already
93     *             registered
94     * @throws NullPointerException
95     *             if any required argument is <code>null</code>
96     * @throws IllegalArgumentException
97     *             if the supplied parser is invalid
98     */
99    public void registerRecordParser(SRURecordDataParser parser)
100            throws SRUClientException {
101        if (parser == null) {
102            throw new NullPointerException("parser == null");
103        }
104        final String recordSchema = parser.getRecordSchema();
105        if (recordSchema == null) {
106            throw new NullPointerException("parser.getRecordSchema() == null");
107        }
108        if (recordSchema.isEmpty()) {
109            throw new IllegalArgumentException(
110                    "parser.getRecordSchema() returns empty string");
111        }
112
113        if (parsers.putIfAbsent(recordSchema, parser) != null) {
114            throw new SRUClientException(
115                    "record data parser already registered: " + recordSchema);
116
117        }
118    }
119
120
121    /**
122     * Perform a <em>explain</em> operation.
123     *
124     * @param request
125     *            an instance of a {@link SRUExplainRequest} object
126     * @return a {@link SRUExplainResponse} object
127     * @throws SRUClientException
128     *             if an unrecoverable error occurred
129     * @throws NullPointerException
130     *             if any required argument is <code>null</code>
131     */
132    public Future<SRUExplainResponse> explain(SRUExplainRequest request)
133            throws SRUClientException {
134        if (request == null) {
135            throw new NullPointerException("request == null");
136        }
137        if (executor.isShutdown()) {
138            throw new SRUClientException("client is shutting down");
139        }
140        return executor.submit(new Request<SRUExplainRequest, SRUExplainResponse>(request) {
141            @Override
142            public SRUExplainResponse doRequest(SRUClient client)
143                    throws SRUClientException {
144                return client.explain(request);
145            }
146        });
147    }
148
149
150    /**
151     * Perform a <em>scan</em> operation.
152     *
153     * @param request
154     *            an instance of a {@link SRUScanRequest} object
155     * @return a {@link SRUScanResponse} object
156     * @throws SRUClientException
157     *             if an unrecoverable error occurred
158     * @throws NullPointerException
159     *             if any required argument is <code>null</code>
160     */
161    public Future<SRUScanResponse> scan(SRUScanRequest request)
162            throws SRUClientException {
163        if (request == null) {
164            throw new NullPointerException("request == null");
165        }
166        if (executor.isShutdown()) {
167            throw new SRUClientException("client is shutting down");
168        }
169        return executor.submit(new Request<SRUScanRequest, SRUScanResponse>(
170                request) {
171            @Override
172            public SRUScanResponse doRequest(SRUClient client)
173                    throws SRUClientException {
174                return client.scan(request);
175            }
176        });
177    }
178
179
180    /**
181     * Perform a <em>searchRetrieve</em> operation.
182     *
183     * @param request
184     *            an instance of a {@link SRUSearchRetrieveRequest} object
185     * @return a {@link SRUSearchRetrieveRequest} object
186     * @throws SRUClientException
187     *             if an unrecoverable error occurred
188     * @throws NullPointerException
189     *             if any required argument is <code>null</code>
190     */
191    public Future<SRUSearchRetrieveResponse> searchRetrieve(
192            SRUSearchRetrieveRequest request) throws SRUClientException {
193        if (request == null) {
194            throw new NullPointerException("request == null");
195        }
196        if (executor.isShutdown()) {
197            throw new SRUClientException("client is shutting down");
198        }
199        return executor.submit(new Request<SRUSearchRetrieveRequest, SRUSearchRetrieveResponse>(request) {
200            @Override
201            protected SRUSearchRetrieveResponse doRequest(SRUClient client)
202                    throws SRUClientException {
203                return client.searchRetrieve(request);
204            }
205        });
206    }
207
208
209    /**
210     * Invokes <code>shutdown</code> when this no longer referenced
211     */
212    @Override
213    protected void finalize() throws Throwable {
214        shutdown();
215    }
216
217
218    /**
219     * Initiates an orderly shutdown in which previously submitted requests are
220     * executed, but no new requests will be accepted.
221     */
222    public void shutdown() {
223        executor.shutdown();
224    }
225
226
227    /**
228     * Terminate the client but drain queued requests.
229     */
230    public void shutdownNow() {
231        executor.shutdownNow();
232    }
233
234
235    private class Factory implements ThreadFactory {
236        @Override
237        public Thread newThread(Runnable r) {
238            return new Thread(r) {
239                @Override
240                public void run() {
241                    try {
242                        logger.debug("launched new worker");
243
244                        // pre-initialize client
245                        client.get();
246
247                        // do work
248                        super.run();
249                    } finally {
250                        // do not leak resources and clean ThreadLocal ...
251                        client.remove();
252                        logger.debug("cleared sru client");
253                    }
254                }
255            };
256        }
257    }
258
259
260    private abstract class Request<V, S> implements Callable<S> {
261        protected final V request;
262
263
264        Request(V request) {
265            this.request = request;
266        }
267
268
269        @Override
270        public final S call() throws Exception {
271            return doRequest(client.get());
272        }
273
274
275        protected abstract S doRequest(SRUClient client)
276                throws SRUClientException;
277    }
278
279} // class SRUThreadedClient
Note: See TracBrowser for help on using the repository browser.