From cab0f2fd293de896f45f99138b9cc1e104af9b64 Mon Sep 17 00:00:00 2001 From: Chas Emerick Date: Fri, 18 Feb 2011 13:32:40 -0500 Subject: [PATCH] Initial commit. --- .gitignore | 14 + README.asciidoc | 270 +++++++++++++++++++ epl-v10.html | 261 ++++++++++++++++++ pom.xml | 69 +++++ src/main/clojure/cemerick/bandalore.clj | 198 ++++++++++++++ src/test/clojure/cemerick/bandalore_test.clj | 41 +++ 6 files changed, 853 insertions(+) create mode 100644 .gitignore create mode 100644 README.asciidoc create mode 100644 epl-v10.html create mode 100644 pom.xml create mode 100644 src/main/clojure/cemerick/bandalore.clj create mode 100644 src/test/clojure/cemerick/bandalore_test.clj diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0c0cfa1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,14 @@ +# emacs + vi backup files +*~ +.*.sw* + +# various IDE junk +*.ipr +*.iml +*.iws +.project +.classpath +.settings + +target +classes diff --git a/README.asciidoc b/README.asciidoc new file mode 100644 index 0000000..db94f6f --- /dev/null +++ b/README.asciidoc @@ -0,0 +1,270 @@ += Bandalore + +http://github.com/cemerick/bandalore[Bandalore] is a Clojure client +library for Amazon's http://aws.amazon.com/sqs/[Simple Queue Service (SQS)]. It depends upon +the standard http://aws.amazon.com/sdkforjava/[AWS SDK for Java], +and provides a Clojure-idiomatic API for the SQS-related functionality +therein. + +== "Installation" + ++++Bandalore is available in Maven central.+++ + +*I'll get Bandalore releases to central in the next couple of days, promise. :-)* + +Add it to your Maven project's `pom.xml`: + +---- + + cemerick + bandalore + 0.0.1-SNAPSHOT + +---- + +or your leiningen project.clj: + +---- +[cemerick/bandalore "0.0.1"] +---- + +== Usage + +You should be familiar with http://aws.amazon.com/sqs/[SQS itself] +before sensibly using this library. That said, Bandalore's API +is well-documented. + +You'll first need to load the library and create a SQS client object +to do anything: + +---- +(require '[cemerick.bandalore :as sqs]) +(def client (sqs/create-client "your aws id" "your aws secret-key")) +---- + +You can create, delete, and list queues: + +---- +=> (sqs/create-queue client "foo") +"https://queue.amazonaws.com/499312652346/foo" +=> (sqs/list-queues client) +("https://queue.amazonaws.com/499312652346/foo") +=> (sqs/delete-queue client (first *1)) +nil +=> (list-queues client) +nil +---- + +*Note that SQS is _eventually consistent_. This means that a created +queue won't necessarily show up in an immediate listing of queues, +messages aren't necessarily immediately available to be received, etc.* + +You can send, receive, and delete messages: + +---- +=> (def q (sqs/create-queue client "foo")) +#'cemerick.bandalore-test/q +=> (sqs/send client q "my message body") +{:id "75d5d7a1-2274-4163-97b2-aa4c75f209ee", :body-md5 "05d358de00fc63dd2fa2026b77e112f6"} +=> (sqs/receive client q) +({:attrs #, :body "my message body", :body-md5 "05d358de00fc63dd2fa2026b77e112f6", + :id "75d5d7a1-2274-4163-97b2-aa4c75f209ee", + :receipt-handle "…very long string…"}) +;; +;; …presumably do something with the received message(s)… +;; +=> (sqs/delete client q (first *1)) +nil +=> (sqs/receive client q) +() +---- + +That's cleaner than having to interop directly with the Java SDK, but it's all +pretty pedestrian stuff. You can do more interesting things with some +simple higher-order functions and other nifty Clojure facilities. + +=== Sending and receiving Clojure values + +SQS' message bodies are strings, so you can stuff anything in them that you can +serialize to a string. That said, `pr-str` and `read-string` are too handy +to not use, assuming your consumers are using Clojure as well: + +---- +=> (sqs/send client q (pr-str {:a 5 :b "blah" :c 6.022e23})) +{:id "3756c302-866a-4fcc-a7a3-746e6f531f47", :body-md5 "60052fc2ffb835257c26b9957c6e9ffd"} +=> (-?> (sqs/receive client q) first :body read-string) +{:a 5, :b "blah", :c 6.022E23} +---- + +=== Sending seqs of messages + +…with more gratuitous use of `pr-str` and `read-string` to send and receive +Clojure values: + +---- +=> (->> [:foo 'bar ["some vector" 42] #{#"silly place for a regex"}] + (map (comp (partial sqs/send client q) pr-str)) + dorun) +nil +=> (map (comp read-string :body) + (sqs/receive client q :limit 10)) +(bar ["some vector" 42]) +=> (map (comp read-string :body) + (sqs/receive client q :limit 10)) +(#{#"silly place for a regex"}) +=> (map (comp read-string :body) + (sqs/receive client q :limit 10)) +(:foo) +---- + +=== (Mostly) automatic deletion of consumed messages + +When you're done processing a received message, you need to delete it from its +originaing queue: + +---- + ; ensure our queue is empty to start +=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages") +"0" +=> (dorun (map (partial sqs/send client q) (map str (range 100)))) +nil +=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages") +"100" + + ; received messages must be removed from the queue or they will + ; be delivered again after their visibility timeout expires +=> (sqs/receive client q) +(…message seq…) +=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages") +"100" +=> (->> (sqs/receive client q) first (sqs/delete client)) +nil +=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages") +"99" +---- + +Rather than trying to remember to do this, just use the +`deleting-consumer` "middleware" to produce a function that calls +the message-processing function you provide to it, and then +automatically deletes the processed message from the origining queue: + +---- +=> (doall (map + (sqs/deleting-consumer client (comp println :body)) + (sqs/receive client q :limit 10))) +0 +4 +9 +12 +26 +36 +40 +44 +52 +55 +(nil nil nil nil nil nil nil nil nil nil) +=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages") +"90" +---- + +=== Consuming queues as seqs + +seqs being the _lingua franca_ of Clojure collections, it would be helpful if we +could treat an SQS queue as a seq of messages. While `receive` does return +a seq of messages, each `receive` call is limited to receiving a maximum of +10 messages, and there is no streaming or push counterpart in the SQS API. + +The solution to this is `polling-receive`, which returns a lazy seq that +reaches out to SQS as necessary: + +---- +=> (map (sqs/deleting-consumer client :body) + (sqs/polling-receive client q :limit 10)) +("3" "5" "7" "8" ... "81" "90" "91") +---- + +`polling-receive` accepts all of the same optional kwargs as `receive` does, +but adds two more to control its usage of `receive`: + + :period - time in ms to wait after an unsuccessful `receive` request (default: 500) + :max-wait - maximum time in ms to wait to successfully receive messages before terminating + the lazy seq (default 5000ms) + +Often queues are used to direct compute resources, so you'd like to be able to saturate +those boxen with as much work as your queue can offer up. The obvious solution +is to `pmap` across a seq of incoming messages, which you can do trivially with the seq +provided by `polling-receive`. Just make sure you tweak the `:max-wait` time so that, +assuming you want to continuously process incoming messages, the seq of messages doesn't +terminate because none have been available for a while. + +Here's an example where one thread sends a message once a second for a minute, +and another consumes those messages using a lazy seq provided by `polling-receive`: + +---- +=> (defn send-dummy-messages + [client q count] + (future (doseq [n (range count)] + (Thread/sleep 100) + (sqs/send client q (str n))))) +#'cemerick.bandalore-test/send-dummy-messages +=> (defn consume-dummy-messages + [client q] + (future (dorun (map (sqs/deleting-consumer client (comp println :body)) + (sqs/polling-receive client q :max-wait Integer/MAX_VALUE :limit 10))))) +#'cemerick.bandalore-test/consume-dummy-messages +=> (consume-dummy-messages client q) ;; start the consumer +# +=> (send-dummy-messages client q 1000) ;; start the sender +# +3 +4 +1 +0 +2 +8 +5 +7 +... +---- + +You'd presumably want to set up some ways to control your consumer, but hopefully +you see that it would be trivial to parallelize the processing function being +wrapped by `deleting-consumer` using `pmap`, distribute processing among agents +if that's more appropriate, etc. + +== Building Bandalore + +Have maven. From the command line: + +---- +$ mvn clean install +---- + +The tests are all live, so you either need to add your AWS credentials to your +`~/.m2/settings.xml` file as properties, or specify them on the command line +using `-D` switches: + +---- +$ mvn -Daws.id=XXXXXXX -Daws.secret-key=YYYYYYY clean install +---- + +Or, you can skip the tests entirely if you're brave: + +---- +$ mvn -Dmaven.test.skip=true clean install +---- + +In any case, you'll find a built `.jar` file in the `target` directory, and in +its designated spot in `~/.m2/repository` (assuming you ran `install` rather than +e.g. `package`). + +== Need Help? + +Ping `cemerick` on freenode irc or twitter if you have questions +or would like to contribute patches. + +== License + +Copyright © 2011 Chas Emerick + +Licensed under the EPL. (See the file epl-v10.html.) diff --git a/epl-v10.html b/epl-v10.html new file mode 100644 index 0000000..3998fce --- /dev/null +++ b/epl-v10.html @@ -0,0 +1,261 @@ + + + + + + +Eclipse Public License - Version 1.0 + + + + + + +

Eclipse Public License - v 1.0

+ +

THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE +PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR +DISTRIBUTION OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS +AGREEMENT.

+ +

1. DEFINITIONS

+ +

"Contribution" means:

+ +

a) in the case of the initial Contributor, the initial +code and documentation distributed under this Agreement, and

+

b) in the case of each subsequent Contributor:

+

i) changes to the Program, and

+

ii) additions to the Program;

+

where such changes and/or additions to the Program +originate from and are distributed by that particular Contributor. A +Contribution 'originates' from a Contributor if it was added to the +Program by such Contributor itself or anyone acting on such +Contributor's behalf. Contributions do not include additions to the +Program which: (i) are separate modules of software distributed in +conjunction with the Program under their own license agreement, and (ii) +are not derivative works of the Program.

+ +

"Contributor" means any person or entity that distributes +the Program.

+ +

"Licensed Patents" mean patent claims licensable by a +Contributor which are necessarily infringed by the use or sale of its +Contribution alone or when combined with the Program.

+ +

"Program" means the Contributions distributed in accordance +with this Agreement.

+ +

"Recipient" means anyone who receives the Program under +this Agreement, including all Contributors.

+ +

2. GRANT OF RIGHTS

+ +

a) Subject to the terms of this Agreement, each +Contributor hereby grants Recipient a non-exclusive, worldwide, +royalty-free copyright license to reproduce, prepare derivative works +of, publicly display, publicly perform, distribute and sublicense the +Contribution of such Contributor, if any, and such derivative works, in +source code and object code form.

+ +

b) Subject to the terms of this Agreement, each +Contributor hereby grants Recipient a non-exclusive, worldwide, +royalty-free patent license under Licensed Patents to make, use, sell, +offer to sell, import and otherwise transfer the Contribution of such +Contributor, if any, in source code and object code form. This patent +license shall apply to the combination of the Contribution and the +Program if, at the time the Contribution is added by the Contributor, +such addition of the Contribution causes such combination to be covered +by the Licensed Patents. The patent license shall not apply to any other +combinations which include the Contribution. No hardware per se is +licensed hereunder.

+ +

c) Recipient understands that although each Contributor +grants the licenses to its Contributions set forth herein, no assurances +are provided by any Contributor that the Program does not infringe the +patent or other intellectual property rights of any other entity. Each +Contributor disclaims any liability to Recipient for claims brought by +any other entity based on infringement of intellectual property rights +or otherwise. As a condition to exercising the rights and licenses +granted hereunder, each Recipient hereby assumes sole responsibility to +secure any other intellectual property rights needed, if any. For +example, if a third party patent license is required to allow Recipient +to distribute the Program, it is Recipient's responsibility to acquire +that license before distributing the Program.

+ +

d) Each Contributor represents that to its knowledge it +has sufficient copyright rights in its Contribution, if any, to grant +the copyright license set forth in this Agreement.

+ +

3. REQUIREMENTS

+ +

A Contributor may choose to distribute the Program in object code +form under its own license agreement, provided that:

+ +

a) it complies with the terms and conditions of this +Agreement; and

+ +

b) its license agreement:

+ +

i) effectively disclaims on behalf of all Contributors +all warranties and conditions, express and implied, including warranties +or conditions of title and non-infringement, and implied warranties or +conditions of merchantability and fitness for a particular purpose;

+ +

ii) effectively excludes on behalf of all Contributors +all liability for damages, including direct, indirect, special, +incidental and consequential damages, such as lost profits;

+ +

iii) states that any provisions which differ from this +Agreement are offered by that Contributor alone and not by any other +party; and

+ +

iv) states that source code for the Program is available +from such Contributor, and informs licensees how to obtain it in a +reasonable manner on or through a medium customarily used for software +exchange.

+ +

When the Program is made available in source code form:

+ +

a) it must be made available under this Agreement; and

+ +

b) a copy of this Agreement must be included with each +copy of the Program.

+ +

Contributors may not remove or alter any copyright notices contained +within the Program.

+ +

Each Contributor must identify itself as the originator of its +Contribution, if any, in a manner that reasonably allows subsequent +Recipients to identify the originator of the Contribution.

+ +

4. COMMERCIAL DISTRIBUTION

+ +

Commercial distributors of software may accept certain +responsibilities with respect to end users, business partners and the +like. While this license is intended to facilitate the commercial use of +the Program, the Contributor who includes the Program in a commercial +product offering should do so in a manner which does not create +potential liability for other Contributors. Therefore, if a Contributor +includes the Program in a commercial product offering, such Contributor +("Commercial Contributor") hereby agrees to defend and +indemnify every other Contributor ("Indemnified Contributor") +against any losses, damages and costs (collectively "Losses") +arising from claims, lawsuits and other legal actions brought by a third +party against the Indemnified Contributor to the extent caused by the +acts or omissions of such Commercial Contributor in connection with its +distribution of the Program in a commercial product offering. The +obligations in this section do not apply to any claims or Losses +relating to any actual or alleged intellectual property infringement. In +order to qualify, an Indemnified Contributor must: a) promptly notify +the Commercial Contributor in writing of such claim, and b) allow the +Commercial Contributor to control, and cooperate with the Commercial +Contributor in, the defense and any related settlement negotiations. The +Indemnified Contributor may participate in any such claim at its own +expense.

+ +

For example, a Contributor might include the Program in a commercial +product offering, Product X. That Contributor is then a Commercial +Contributor. If that Commercial Contributor then makes performance +claims, or offers warranties related to Product X, those performance +claims and warranties are such Commercial Contributor's responsibility +alone. Under this section, the Commercial Contributor would have to +defend claims against the other Contributors related to those +performance claims and warranties, and if a court requires any other +Contributor to pay any damages as a result, the Commercial Contributor +must pay those damages.

+ +

5. NO WARRANTY

+ +

EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, THE PROGRAM IS +PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS +OF ANY KIND, EITHER EXPRESS OR IMPLIED INCLUDING, WITHOUT LIMITATION, +ANY WARRANTIES OR CONDITIONS OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY +OR FITNESS FOR A PARTICULAR PURPOSE. Each Recipient is solely +responsible for determining the appropriateness of using and +distributing the Program and assumes all risks associated with its +exercise of rights under this Agreement , including but not limited to +the risks and costs of program errors, compliance with applicable laws, +damage to or loss of data, programs or equipment, and unavailability or +interruption of operations.

+ +

6. DISCLAIMER OF LIABILITY

+ +

EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, NEITHER RECIPIENT +NOR ANY CONTRIBUTORS SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, +INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING +WITHOUT LIMITATION LOST PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OR +DISTRIBUTION OF THE PROGRAM OR THE EXERCISE OF ANY RIGHTS GRANTED +HEREUNDER, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.

+ +

7. GENERAL

+ +

If any provision of this Agreement is invalid or unenforceable under +applicable law, it shall not affect the validity or enforceability of +the remainder of the terms of this Agreement, and without further action +by the parties hereto, such provision shall be reformed to the minimum +extent necessary to make such provision valid and enforceable.

+ +

If Recipient institutes patent litigation against any entity +(including a cross-claim or counterclaim in a lawsuit) alleging that the +Program itself (excluding combinations of the Program with other +software or hardware) infringes such Recipient's patent(s), then such +Recipient's rights granted under Section 2(b) shall terminate as of the +date such litigation is filed.

+ +

All Recipient's rights under this Agreement shall terminate if it +fails to comply with any of the material terms or conditions of this +Agreement and does not cure such failure in a reasonable period of time +after becoming aware of such noncompliance. If all Recipient's rights +under this Agreement terminate, Recipient agrees to cease use and +distribution of the Program as soon as reasonably practicable. However, +Recipient's obligations under this Agreement and any licenses granted by +Recipient relating to the Program shall continue and survive.

+ +

Everyone is permitted to copy and distribute copies of this +Agreement, but in order to avoid inconsistency the Agreement is +copyrighted and may only be modified in the following manner. The +Agreement Steward reserves the right to publish new versions (including +revisions) of this Agreement from time to time. No one other than the +Agreement Steward has the right to modify this Agreement. The Eclipse +Foundation is the initial Agreement Steward. The Eclipse Foundation may +assign the responsibility to serve as the Agreement Steward to a +suitable separate entity. Each new version of the Agreement will be +given a distinguishing version number. The Program (including +Contributions) may always be distributed subject to the version of the +Agreement under which it was received. In addition, after a new version +of the Agreement is published, Contributor may elect to distribute the +Program (including its Contributions) under the new version. Except as +expressly stated in Sections 2(a) and 2(b) above, Recipient receives no +rights or licenses to the intellectual property of any Contributor under +this Agreement, whether expressly, by implication, estoppel or +otherwise. All rights in the Program not expressly granted under this +Agreement are reserved.

+ +

This Agreement is governed by the laws of the State of New York and +the intellectual property laws of the United States of America. No party +to this Agreement will bring a legal action under this Agreement more +than one year after the cause of action arose. Each party waives its +rights to a jury trial in any resulting litigation.

+ + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..2152186 --- /dev/null +++ b/pom.xml @@ -0,0 +1,69 @@ + + 4.0.0 + cemerick + bandalore + 0.0.1-SNAPSHOT + bandalore + A Clojure library for Amazon's Simple Queue Service (SQS). + + + org.clojure + pom.contrib + 0.0.20 + + + + + Chas Emerick + http://cemerick.com + cemerick@snowtide.com + -5 + + + + + scm:git:git@github.com:cemerick/bandalore.git + scm:git:git@github.com:cemerick/bandalore.git + git@github.com:cemerick/bandalore.git + + + + 1.2.0 + + + + + com.amazonaws + aws-java-sdk + 1.1.5 + + + org.clojure + clojure-contrib + 1.2.0 + test + + + + + + + com.theoryinpractise + clojure-maven-plugin + + + clojure-test + + -Daws.id=${aws.id} -Daws.secret-key=${aws.secret-key} + + + + + true + + + + + + diff --git a/src/main/clojure/cemerick/bandalore.clj b/src/main/clojure/cemerick/bandalore.clj new file mode 100644 index 0000000..73e5726 --- /dev/null +++ b/src/main/clojure/cemerick/bandalore.clj @@ -0,0 +1,198 @@ +; Copyright (c) Chas Emerick. All rights reserved. +; The use and distribution terms for this software are covered by the +; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) +; which can be found in the file epl-v10.html at the root of this distribution. +; By using this software in any fashion, you are agreeing to be bound by +; the terms of this license. +; You must not remove this notice, or any other, from this software. + +(ns cemerick.bandalore + (:import com.amazonaws.services.sqs.AmazonSQSClient + (com.amazonaws.services.sqs.model + AddPermissionRequest ChangeMessageVisibilityRequest CreateQueueRequest + DeleteMessageRequest DeleteQueueRequest GetQueueAttributesRequest + ListQueuesRequest Message ReceiveMessageRequest ReceiveMessageResult + RemovePermissionRequest SendMessageRequest SendMessageResult + SetQueueAttributesRequest)) + (:refer-clojure :exclude (send))) + +(defn create-client + "Creates a synchronous AmazonSQSClient using the provided account id, secret key, + and optional com.amazonaws.ClientConfiguration." + ([id secret-key] + (create-client id secret-key (com.amazonaws.ClientConfiguration.))) + ([id secret-key client-config] + (AmazonSQSClient. (com.amazonaws.auth.BasicAWSCredentials. id secret-key) + (.withUserAgent client-config "Bandalore - SQS for Clojure")))) + +(defn create-queue + "Creates a queue with the given name, returning the corresponding URL string. + Returns successfully if the queue already exists. + + Specify an optional :visibility keyword arg to set the new queue's default + visibility timeout in seconds." + [^AmazonSQSClient client queue-name & {:keys [visibility]}] + (->> (if visibility + (CreateQueueRequest. queue-name visibility) + (CreateQueueRequest. queue-name)) + (.createQueue client) + .getQueueUrl)) + +(defn delete-queue + "Deletes the queue specified by the given URL string." + [^AmazonSQSClient client queue-url] + (.deleteQueue client (DeleteQueueRequest. queue-url))) + +(defn list-queues + "Returns a seq of all queues' URL strings." + [^AmazonSQSClient client] + (->> (ListQueuesRequest.) + (.listQueues client) + .getQueueUrls + seq)) + +(defn queue-attrs + "Gets or sets the attributes of a queue specified by its URL string. + When setting attributes on a queue, the attribute map must have String keys + and values." + ([^AmazonSQSClient client queue-url] + (->> + (.withAttributeNames (GetQueueAttributesRequest. queue-url) #{"All"}) + (.getQueueAttributes client) + .getAttributes + (into {}))) + ([^AmazonSQSClient client queue-url attr-map] + (.setQueueAttributes client + (SetQueueAttributesRequest. queue-url attr-map)))) + +(defn send + "Sends a new message with the given string body to the queue specified + by the string URL. Returns a map with :id and :body-md5 slots." + [^AmazonSQSClient client queue-url message] + (let [resp (.sendMessage client (SendMessageRequest. queue-url message))] + {:id (.getMessageId resp) + :body-md5 (.getMD5OfMessageBody resp)})) + +(defn- message-map + [queue-url ^Message msg] + {:attributes (.getAttributes msg) + :body (.getBody msg) + :body-md5 (.getMD5OfBody msg) + :id (.getMessageId msg) + :receipt-handle (.getReceiptHandle msg) + :source-queue queue-url}) + +(defn receive + "Receives one or more messages from the queue specified by the given URL. + Optionally accepts keyword arguments: + + :limit - between 1 (default) and 10, the maximum number of messages to receive + :visibility - seconds the received messages should not be delivered to other + receivers; defaults to the queue's visibility attribute + :attributes - a collection of string names of :attributes to include in + received messages; e.g. #{\"All\"} will include all attributes, + #{\"SentTimestamp\"} will include only the SentTimestamp attribute, etc. + Defaults to the empty set (i.e. no attributes will be included in + received messages). + See the SQS documentation for all support message attributes. + + Returns a seq of maps with these slots: + + :attributes - message attributes + :body - the string body of the message + :body-md5 - the MD5 checksum of :body + :id - the message's ID + :receipt-handle - the ID used to delete the message from the queue after + it has been fully processed. + :source-queue - the URL of the queue from which the message was received" + [^AmazonSQSClient client queue-url & {:keys [limit visibility ^java.util.Collection attributes] + :or {limit 1 + attributes #{"All"}}}] + (let [req (-> (ReceiveMessageRequest. queue-url) + (.withMaxNumberOfMessages (-> limit (min 10) (max 1))) + (.withAttributeNames attributes)) + req (if visibility (.withVisibilityTimeout req visibility) req)] + (->> (.receiveMessage client req) + .getMessages + (map (partial message-map queue-url))))) + +(defn polling-receive + "Receives one or more messages from the queue specified by the given URL. + Returns a lazy seq of messages from multiple receive calls, performed + as the seq is consumed. + + Accepts the same keyword arguments as `receive`, in addition to: + + :period - time in ms to wait after an unsuccessful `receive` request (default: 500) + :max-wait - maximum time in ms to wait to successfully receive messages before + terminating the lazy seq (default 5000)" + [client queue-url & {:keys [period max-wait] + :or {period 500 + max-wait 5000} + :as receive-opts}] + (let [waiting (atom 0) + receive-opts (mapcat identity receive-opts) + message-seq (fn message-seq [] + (lazy-seq + (if-let [msgs (seq (apply receive client queue-url receive-opts))] + (do + (reset! waiting 0) + (concat msgs (message-seq))) + (do + (when (<= (swap! waiting + period) max-wait) + (Thread/sleep period) + (message-seq))))))] + (message-seq))) + +(defn- receipt-handle + [msg-ish] + (cond + (string? msg-ish) msg-ish + (map? msg-ish) (:receipt-handle msg-ish) + :else (.getReceiptHandle ^Message msg-ish))) + +(defn delete + "Deletes a message from the queue from which it was received. + + The message must be a message map provided by `receive` or `polling-receive` + if the `queue-url` is not provided explicitly. Otherwise, + `message` may be a message map, Message object, + or the :receipt-handle value from a received message map." + ([^AmazonSQSClient client message] + (delete client (:source-queue message) message)) + ([^AmazonSQSClient client queue-url message] + (->> message + receipt-handle + (DeleteMessageRequest. queue-url) + (.deleteMessage client)))) + +(defn deleting-consumer + "Some minor middleware for simplifying the deletion of processed messages + from their originating queue. Given a client and a function `f`, returns + a function that accepts a single message (as provided by `receive` and `polling-receive`); + it calls `f` with that message, and then deletes the message from its + originating queue. `f`'s return value is passed along." + [client f] + (fn [message] + (let [ret (f message)] + (delete client message) + ret))) + +(defn change-message-visibility + "Updates the visibility timeout for a single message in the queue identified by the + given URL. + + `message` may be a message map, Message object, + or the :receipt-handle value from a received message map." + [^AmazonSQSClient client queue-url message visibility-timeout] + (.changeMessageVisibility client + (ChangeMessageVisibilityRequest. + queue-url (receipt-handle message) visibility-timeout))) + + +;; ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) + ;; Returns additional metadata for a previously executed successful, request, typically used for debugging issues where a service isn't acting as expected. +;; void removePermission(RemovePermissionRequest removePermissionRequest) +; The RemovePermission action revokes any permissions in the queue policy that matches the specified Label parameter. +;; void addPermission(AddPermissionRequest addPermissionRequest) + ;;The AddPermission action adds a permission to a queue for a specific principal. \ No newline at end of file diff --git a/src/test/clojure/cemerick/bandalore_test.clj b/src/test/clojure/cemerick/bandalore_test.clj new file mode 100644 index 0000000..207ed2e --- /dev/null +++ b/src/test/clojure/cemerick/bandalore_test.clj @@ -0,0 +1,41 @@ +(ns cemerick.bandalore-test + (:use cemerick.bandalore + clojure.test + clojure.contrib.core) + (:refer-clojure :exclude (send))) + +; kill the verbose aws logging +(.setLevel (java.util.logging.Logger/getLogger "com.amazonaws") + java.util.logging.Level/WARNING) + +(def client + (let [id (System/getProperty "aws.id") + secret-key (System/getProperty "aws.secret-key")] + (assert (and id secret-key)) + (create-client id secret-key))) + +(def *test-queue-url* nil) + +(defn- uuid + [] + (str (java.util.UUID/randomUUID))) + +(defn- test-queue-name + [] + (str "bandalore-test-" (uuid))) + +(defmacro defqueuetest + [name & body] + `(deftest ~name + (binding [*test-queue-url* (create-queue client (test-queue-name))] + (try + (is *test-queue-url*) + ~@body + (finally + (delete-queue client *test-queue-url*)))))) + +(defqueuetest listing-queues + (let [msg (uuid)] + (send client *test-queue-url* msg) + (is ((set (list-queues client)) *test-queue-url*)))) +