1 | /** |
---|
2 | * This software is copyright (c) 2011-2012 by |
---|
3 | * - Institut fuer Deutsche Sprache (http://www.ids-mannheim.de) |
---|
4 | * This is free software. You can redistribute it |
---|
5 | * and/or modify it under the terms described in |
---|
6 | * the GNU General Public License v3 of which you |
---|
7 | * should have received a copy. Otherwise you can download |
---|
8 | * it from |
---|
9 | * |
---|
10 | * http://www.gnu.org/licenses/gpl-3.0.txt |
---|
11 | * |
---|
12 | * @copyright Institut fuer Deutsche Sprache (http://www.ids-mannheim.de) |
---|
13 | * |
---|
14 | * @license http://www.gnu.org/licenses/gpl-3.0.txt |
---|
15 | * GNU General Public License v3 |
---|
16 | */ |
---|
17 | package eu.clarin.sru.client; |
---|
18 | |
---|
19 | import java.util.concurrent.Callable; |
---|
20 | import java.util.concurrent.ConcurrentHashMap; |
---|
21 | import java.util.concurrent.ConcurrentMap; |
---|
22 | import java.util.concurrent.ExecutorService; |
---|
23 | import java.util.concurrent.Executors; |
---|
24 | import java.util.concurrent.Future; |
---|
25 | import java.util.concurrent.ThreadFactory; |
---|
26 | |
---|
27 | import org.slf4j.Logger; |
---|
28 | import org.slf4j.LoggerFactory; |
---|
29 | |
---|
30 | |
---|
31 | /** |
---|
32 | * A client to perform SRU operations in parallel. The response of a SRU request |
---|
33 | * is wrapped in a SRU response. |
---|
34 | * <p> |
---|
35 | * This client is reusable and thread-safe: the application may reuse a |
---|
36 | * client object and may shared it between multiple threads. <br /> |
---|
37 | * NB: The registered {@link SRURecordDataParser} need to be thread-safe |
---|
38 | * </p> |
---|
39 | */ |
---|
40 | public class SRUThreadedClient { |
---|
41 | private static final Logger logger = |
---|
42 | LoggerFactory.getLogger(SRUThreadedClient.class); |
---|
43 | private ConcurrentMap<String, SRURecordDataParser> parsers = |
---|
44 | new ConcurrentHashMap<String, SRURecordDataParser>(); |
---|
45 | private final ThreadLocal<SRUClient> client; |
---|
46 | private final ExecutorService executor; |
---|
47 | |
---|
48 | |
---|
49 | /** |
---|
50 | * Constructor. This constructor will create a <em>strict</em> client and |
---|
51 | * use the default SRU version. |
---|
52 | * |
---|
53 | * @see #SRUThreadedClient(SRUVersion, boolean) |
---|
54 | * @see SRUSimpleClient#DEFAULT_SRU_VERSION |
---|
55 | */ |
---|
56 | public SRUThreadedClient() { |
---|
57 | this(SRUSimpleClient.DEFAULT_SRU_VERSION, true); |
---|
58 | } |
---|
59 | |
---|
60 | |
---|
61 | /** |
---|
62 | * Constructor. This constructor will create a <em>strict</em> client. |
---|
63 | * |
---|
64 | * @param defaultVersion |
---|
65 | * the default version to use for SRU requests; may be overridden |
---|
66 | * by individual requests |
---|
67 | * @see #SRUThreadedClient(SRUVersion, boolean) |
---|
68 | */ |
---|
69 | public SRUThreadedClient(SRUVersion defaultVersion) { |
---|
70 | this(defaultVersion, true); |
---|
71 | } |
---|
72 | |
---|
73 | |
---|
74 | /** |
---|
75 | * Constructor. |
---|
76 | * |
---|
77 | * @param defaultVersion |
---|
78 | * the default version to use for SRU requests; may be overridden |
---|
79 | * by individual requests |
---|
80 | * @param strictMode |
---|
81 | * if <code>true</code> the client will strictly adhere to the |
---|
82 | * SRU standard and raise fatal errors on violations, if |
---|
83 | * <code>false</code> it will act more forgiving and ignore |
---|
84 | * certain violations |
---|
85 | */ |
---|
86 | public SRUThreadedClient(final SRUVersion defaultVersion, |
---|
87 | final boolean strictMode) { |
---|
88 | client = new ThreadLocal<SRUClient>() { |
---|
89 | @Override |
---|
90 | protected SRUClient initialValue() { |
---|
91 | logger.debug("instantiated new sru client"); |
---|
92 | return new SRUClient(defaultVersion, strictMode, parsers); |
---|
93 | } |
---|
94 | }; |
---|
95 | // TODO: make worker count configurable |
---|
96 | int workerCount = Runtime.getRuntime().availableProcessors() * 2; |
---|
97 | logger.debug("using {} workers", workerCount); |
---|
98 | executor = Executors.newFixedThreadPool(workerCount, new Factory()); |
---|
99 | } |
---|
100 | |
---|
101 | |
---|
102 | /** |
---|
103 | * Register a record data parser. |
---|
104 | * |
---|
105 | * @param parser |
---|
106 | * a parser instance |
---|
107 | * @throws SRUClientException |
---|
108 | * if a parser handing the same record schema is already |
---|
109 | * registered |
---|
110 | * @throws NullPointerException |
---|
111 | * if any required argument is <code>null</code> |
---|
112 | * @throws IllegalArgumentException |
---|
113 | * if the supplied parser is invalid |
---|
114 | */ |
---|
115 | public void registerRecordParser(SRURecordDataParser parser) |
---|
116 | throws SRUClientException { |
---|
117 | if (parser == null) { |
---|
118 | throw new NullPointerException("parser == null"); |
---|
119 | } |
---|
120 | final String recordSchema = parser.getRecordSchema(); |
---|
121 | if (recordSchema == null) { |
---|
122 | throw new NullPointerException("parser.getRecordSchema() == null"); |
---|
123 | } |
---|
124 | if (recordSchema.isEmpty()) { |
---|
125 | throw new IllegalArgumentException( |
---|
126 | "parser.getRecordSchema() returns empty string"); |
---|
127 | } |
---|
128 | |
---|
129 | if (parsers.putIfAbsent(recordSchema, parser) != null) { |
---|
130 | throw new SRUClientException( |
---|
131 | "record data parser already registered: " + recordSchema); |
---|
132 | |
---|
133 | } |
---|
134 | } |
---|
135 | |
---|
136 | |
---|
137 | /** |
---|
138 | * Perform a <em>explain</em> operation. |
---|
139 | * |
---|
140 | * @param request |
---|
141 | * an instance of a {@link SRUExplainRequest} object |
---|
142 | * @return a {@link SRUExplainResponse} object |
---|
143 | * @throws SRUClientException |
---|
144 | * if an unrecoverable error occurred |
---|
145 | * @throws NullPointerException |
---|
146 | * if any required argument is <code>null</code> |
---|
147 | */ |
---|
148 | public Future<SRUExplainResponse> explain(SRUExplainRequest request) |
---|
149 | throws SRUClientException { |
---|
150 | if (request == null) { |
---|
151 | throw new NullPointerException("request == null"); |
---|
152 | } |
---|
153 | if (executor.isShutdown()) { |
---|
154 | throw new SRUClientException("client is shutting down"); |
---|
155 | } |
---|
156 | return executor.submit(new Request<SRUExplainRequest, SRUExplainResponse>(request) { |
---|
157 | @Override |
---|
158 | public SRUExplainResponse doRequest(SRUClient client) |
---|
159 | throws SRUClientException { |
---|
160 | return client.explain(request); |
---|
161 | } |
---|
162 | }); |
---|
163 | } |
---|
164 | |
---|
165 | |
---|
166 | /** |
---|
167 | * Perform a <em>scan</em> operation. |
---|
168 | * |
---|
169 | * @param request |
---|
170 | * an instance of a {@link SRUScanRequest} object |
---|
171 | * @return a {@link SRUScanResponse} object |
---|
172 | * @throws SRUClientException |
---|
173 | * if an unrecoverable error occurred |
---|
174 | * @throws NullPointerException |
---|
175 | * if any required argument is <code>null</code> |
---|
176 | */ |
---|
177 | public Future<SRUScanResponse> scan(SRUScanRequest request) |
---|
178 | throws SRUClientException { |
---|
179 | if (request == null) { |
---|
180 | throw new NullPointerException("request == null"); |
---|
181 | } |
---|
182 | if (executor.isShutdown()) { |
---|
183 | throw new SRUClientException("client is shutting down"); |
---|
184 | } |
---|
185 | return executor.submit(new Request<SRUScanRequest, SRUScanResponse>( |
---|
186 | request) { |
---|
187 | @Override |
---|
188 | public SRUScanResponse doRequest(SRUClient client) |
---|
189 | throws SRUClientException { |
---|
190 | return client.scan(request); |
---|
191 | } |
---|
192 | }); |
---|
193 | } |
---|
194 | |
---|
195 | |
---|
196 | /** |
---|
197 | * Perform a <em>searchRetrieve</em> operation. |
---|
198 | * |
---|
199 | * @param request |
---|
200 | * an instance of a {@link SRUSearchRetrieveRequest} object |
---|
201 | * @return a {@link SRUSearchRetrieveRequest} object |
---|
202 | * @throws SRUClientException |
---|
203 | * if an unrecoverable error occurred |
---|
204 | * @throws NullPointerException |
---|
205 | * if any required argument is <code>null</code> |
---|
206 | */ |
---|
207 | public Future<SRUSearchRetrieveResponse> searchRetrieve( |
---|
208 | SRUSearchRetrieveRequest request) throws SRUClientException { |
---|
209 | if (request == null) { |
---|
210 | throw new NullPointerException("request == null"); |
---|
211 | } |
---|
212 | if (executor.isShutdown()) { |
---|
213 | throw new SRUClientException("client is shutting down"); |
---|
214 | } |
---|
215 | return executor.submit(new Request<SRUSearchRetrieveRequest, SRUSearchRetrieveResponse>(request) { |
---|
216 | @Override |
---|
217 | protected SRUSearchRetrieveResponse doRequest(SRUClient client) |
---|
218 | throws SRUClientException { |
---|
219 | return client.searchRetrieve(request); |
---|
220 | } |
---|
221 | }); |
---|
222 | } |
---|
223 | |
---|
224 | |
---|
225 | /** |
---|
226 | * Invokes <code>shutdown</code> when this no longer referenced |
---|
227 | */ |
---|
228 | @Override |
---|
229 | protected void finalize() throws Throwable { |
---|
230 | shutdown(); |
---|
231 | } |
---|
232 | |
---|
233 | |
---|
234 | /** |
---|
235 | * Initiates an orderly shutdown in which previously submitted requests are |
---|
236 | * executed, but no new requests will be accepted. |
---|
237 | */ |
---|
238 | public void shutdown() { |
---|
239 | executor.shutdown(); |
---|
240 | } |
---|
241 | |
---|
242 | |
---|
243 | /** |
---|
244 | * Terminate the client but drain queued requests. |
---|
245 | */ |
---|
246 | public void shutdownNow() { |
---|
247 | executor.shutdownNow(); |
---|
248 | } |
---|
249 | |
---|
250 | |
---|
251 | private class Factory implements ThreadFactory { |
---|
252 | @Override |
---|
253 | public Thread newThread(Runnable r) { |
---|
254 | return new Thread(r) { |
---|
255 | @Override |
---|
256 | public void run() { |
---|
257 | try { |
---|
258 | logger.debug("launched new worker"); |
---|
259 | |
---|
260 | // pre-initialize client |
---|
261 | client.get(); |
---|
262 | |
---|
263 | // do work |
---|
264 | super.run(); |
---|
265 | } finally { |
---|
266 | // do not leak resources and clean ThreadLocal ... |
---|
267 | client.remove(); |
---|
268 | logger.debug("cleared sru client"); |
---|
269 | } |
---|
270 | } |
---|
271 | }; |
---|
272 | } |
---|
273 | } |
---|
274 | |
---|
275 | |
---|
276 | private abstract class Request<V, S> implements Callable<S> { |
---|
277 | protected final V request; |
---|
278 | |
---|
279 | |
---|
280 | Request(V request) { |
---|
281 | this.request = request; |
---|
282 | } |
---|
283 | |
---|
284 | |
---|
285 | @Override |
---|
286 | public final S call() throws Exception { |
---|
287 | return doRequest(client.get()); |
---|
288 | } |
---|
289 | |
---|
290 | |
---|
291 | protected abstract S doRequest(SRUClient client) |
---|
292 | throws SRUClientException; |
---|
293 | } |
---|
294 | |
---|
295 | } // class SRUThreadedClient |
---|