2011-02-18 18:32:40 +00:00

= Bandalore

http://github.com/cemerick/bandalore[Bandalore] is a Clojure client
library for Amazon's http://aws.amazon.com/sqs/[Simple Queue Service (SQS)]. It depends upon
the standard http://aws.amazon.com/sdkforjava/[AWS SDK for Java]
and provides a Clojure-idiomatic API for the SQS-related functionality
therein.

== "Installation"

+++<del>Bandalore is available in Maven central.</del>+++

*I'll get Bandalore releases to central in the next couple of days, promise. :-)*

Add it to your Maven project's `pom.xml`:

----
<dependency>
  <groupId>cemerick</groupId>
  <artifactId>bandalore</artifactId>
  <version>0.0.1-SNAPSHOT</version>
</dependency>
----

Or add it to your Leiningen `project.clj`:

----
[cemerick/bandalore "0.0.1"]
----

== Logging

I strongly recommend squelching the AWS SDK's very verbose logging
before using Bandalore (the SDK logs a variety of messages at
INFO that I personally think belong at DEBUG or TRACE). You can
do this with the following snippet:

----
(.setLevel (java.util.logging.Logger/getLogger "com.amazonaws")
           java.util.logging.Level/WARNING)
----

Translate as necessary if you're using log4j, etc.
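
One caveat: the `java.util.logging.Logger` Javadoc notes that a logger returned by `getLogger` may be garbage collected if no strong reference to it is kept, in which case a level set on it can be silently lost. A minimal sketch that keeps the logger reachable by holding it in a var (the name `aws-logger` is just an illustrative choice, not part of Bandalore):

[source,clojure]
----
(ns example.logging
  (:import (java.util.logging Level Logger)))

;; Holding the Logger in a var keeps a strong reference to it, so the
;; WARNING level set here is not lost if the logger would otherwise be
;; garbage collected. `aws-logger` is a hypothetical name.
(defonce aws-logger
  (doto (Logger/getLogger "com.amazonaws")
    (.setLevel Level/WARNING)))
----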

== Usage

You should be familiar with http://aws.amazon.com/sqs/[SQS itself]
before sensibly using this library. That said, Bandalore's API
is well-documented.

You'll first need to load the library and create an SQS client object
to do anything:

----
(require '[cemerick.bandalore :as sqs])
(def client (sqs/create-client "your aws id" "your aws secret-key"))
----
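
Hard-coding credentials in source is easy to regret. As a minimal sketch, you could read them from the environment instead. `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` are the conventional AWS variable names; the `aws-credentials` helper itself is hypothetical and not part of Bandalore:

[source,clojure]
----
(ns example.credentials)

(defn aws-credentials
  "Hypothetical helper: returns [access-key-id secret-key] read from an
  environment map (defaulting to the process environment), or throws
  ex-info listing whichever variables are missing."
  ([] (aws-credentials (System/getenv)))
  ([env]
   (let [id     (get env "AWS_ACCESS_KEY_ID")
         secret (get env "AWS_SECRET_ACCESS_KEY")]
     (when-not (and id secret)
       (throw (ex-info "AWS credentials not found in environment"
                       {:missing (cond-> []
                                   (nil? id)     (conj "AWS_ACCESS_KEY_ID")
                                   (nil? secret) (conj "AWS_SECRET_ACCESS_KEY"))})))
     [id secret])))
----

With Bandalore loaded, `(def client (apply sqs/create-client (aws-credentials)))` would then build the same kind of client as above.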

You can create, delete, and list queues:

----
=> (sqs/create-queue client "foo")
"https://queue.amazonaws.com/499312652346/foo"
=> (sqs/list-queues client)
("https://queue.amazonaws.com/499312652346/foo")
=> (sqs/delete-queue client (first *1))
nil
=> (sqs/list-queues client)
nil
----

*Note that SQS is _eventually consistent_. This means that a created
queue won't necessarily show up in an immediate listing of queues,
messages aren't necessarily immediately available to be received, etc.*
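
In tests or scripts that need to act right after such a change, one option is to poll until the change becomes visible. A minimal sketch, assuming a hypothetical `wait-until` helper that is not part of Bandalore:

[source,clojure]
----
(ns example.consistency)

(defn wait-until
  "Hypothetical helper: calls (pred) every period ms until it returns a
  truthy value, which is then returned. Returns nil if timeout ms elapse
  without pred succeeding."
  [pred & {:keys [period timeout] :or {period 500 timeout 10000}}]
  (let [deadline (+ (System/currentTimeMillis) timeout)]
    (loop []
      (if-let [result (pred)]
        result
        (when (< (System/currentTimeMillis) deadline)
          (Thread/sleep period)
          (recur))))))
----

For example, `(wait-until #(some #{q} (sqs/list-queues client)))` would block until the queue URL `q` appears in a listing, or give up after ten seconds.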

You can send, receive, and delete messages:

----
=> (def q (sqs/create-queue client "foo"))
#'cemerick.bandalore-test/q
=> (sqs/send client q "my message body")
{:id "75d5d7a1-2274-4163-97b2-aa4c75f209ee", :body-md5 "05d358de00fc63dd2fa2026b77e112f6"}
=> (sqs/receive client q)
({:attrs #<HashMap {}>, :body "my message body", :body-md5 "05d358de00fc63dd2fa2026b77e112f6",
  :id "75d5d7a1-2274-4163-97b2-aa4c75f209ee",
  :receipt-handle "…very long string…"})
;;
;; …presumably do something with the received message(s)…
;;
=> (sqs/delete client q (first *1))
nil
=> (sqs/receive client q)
()
----

That's cleaner than having to interop directly with the Java SDK, but it's all
pretty pedestrian stuff. You can do more interesting things with some
simple higher-order functions and other nifty Clojure facilities.

=== Sending and receiving Clojure values

SQS message bodies are strings, so you can stuff anything in them that you can
serialize to a string. That said, `pr-str` and `read-string` are too handy
not to use, assuming your consumers are using Clojure as well:

----
=> (sqs/send client q (pr-str {:a 5 :b "blah" :c 6.022e23}))
{:id "3756c302-866a-4fcc-a7a3-746e6f531f47", :body-md5 "60052fc2ffb835257c26b9957c6e9ffd"}
=> (some-> (sqs/receive client q) first :body read-string)
{:a 5, :b "blah", :c 6.022E23}
----
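
One caution: by default, `clojure.core/read-string` will evaluate code embedded in its input via the `#=` reader form, which makes it a risky choice for queue messages you didn't produce yourself. On Clojure 1.5+, `clojure.edn/read-string` is a safer choice for plain data. Note that EDN has no regex literal, so it would reject the regex-bearing message in the next section's example. A minimal sketch (the `encode` and `decode` names are illustrative, not Bandalore API):

[source,clojure]
----
(ns example.codec
  (:require [clojure.edn :as edn]))

(defn encode
  "Serializes a Clojure value to a string suitable for an SQS message body."
  [value]
  (pr-str value))

(defn decode
  "Reads a message body back into data using the EDN reader, which never
  evaluates code embedded in its input."
  [body]
  (edn/read-string body))
----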

=== Sending seqs of messages

…with more gratuitous use of `pr-str` and `read-string` to send and receive
Clojure values:

----
=> (->> [:foo 'bar ["some vector" 42] #{#"silly place for a regex"}]
     (map (comp (partial sqs/send client q) pr-str))
     dorun)
nil
=> (map (comp read-string :body)
     (sqs/receive client q :limit 10))
(bar ["some vector" 42])
=> (map (comp read-string :body)
     (sqs/receive client q :limit 10))
(#{#"silly place for a regex"})
=> (map (comp read-string :body)
     (sqs/receive client q :limit 10))
(:foo)
----
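
Since the sends happen purely for their side effects, `run!` (Clojure 1.7+) or `doseq` states that intent more directly than `(dorun (map ...))`. A minimal sketch; `send-all!` is a hypothetical helper, and in real use `send-fn` would be something like `(partial sqs/send client q)`:

[source,clojure]
----
(ns example.batch-send)

(defn send-all!
  "Hypothetical helper: serializes each value with pr-str and passes the
  resulting string to send-fn, purely for side effects. Returns nil."
  [send-fn values]
  (run! (comp send-fn pr-str) values))
----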

=== (Mostly) automatic deletion of consumed messages

When you're done processing a received message, you need to delete it from its
originating queue:

----
; ensure our queue is empty to start
=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
"0"
=> (dorun (map (partial sqs/send client q) (map str (range 100))))
nil
=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
"100"

; received messages must be removed from the queue or they will
; be delivered again after their visibility timeout expires
=> (sqs/receive client q)
(…message seq…)
=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
"100"
=> (->> (sqs/receive client q) first (sqs/delete client))
nil
=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
"99"
----

Rather than trying to remember to do this, just use the
`deleting-consumer` "middleware" to produce a function that calls
the message-processing function you provide to it and then
automatically deletes the processed message from its originating queue:

----
=> (doall (map
            (sqs/deleting-consumer client (comp println :body))
            (sqs/receive client q :limit 10)))
0
4
9
12
26
36
40
44
52
55
(nil nil nil nil nil nil nil nil nil nil)
=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
"90"
----
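
To make the pattern concrete, here is a minimal sketch of such a middleware written against a hypothetical `delete-fn` rather than a real client. It illustrates the idea and is not Bandalore's actual implementation. In this sketch the message is deleted only after the wrapped function returns normally, so a message whose processing throws stays in the queue and becomes visible again once its visibility timeout expires:

[source,clojure]
----
(ns example.middleware)

(defn deleting-consumer*
  "Sketch of the deleting-consumer pattern: returns a function that calls
  (f message), then (delete-fn message), and returns f's result.
  If f throws, delete-fn is never called."
  [delete-fn f]
  (fn [message]
    (let [result (f message)]
      (delete-fn message)
      result)))
----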

=== Consuming queues as seqs

Seqs being the _lingua franca_ of Clojure collections, it would be helpful if we
could treat an SQS queue as a seq of messages. While `receive` does return
a seq of messages, each `receive` call is limited to receiving a maximum of
10 messages, and there is no streaming or push counterpart in the SQS API.

The solution to this is `polling-receive`, which returns a lazy seq that
reaches out to SQS as necessary:

----
=> (map (sqs/deleting-consumer client :body)
     (sqs/polling-receive client q :limit 10))
("3" "5" "7" "8" ... "81" "90" "91")
----

`polling-receive` accepts all of the same optional kwargs as `receive` does,
but adds two more to control its usage of `receive`:

* `:period`: time in ms to wait after an unsuccessful `receive` request (default: 500)
* `:max-wait`: maximum time in ms to wait to successfully receive messages before terminating the lazy seq (default: 5000)
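
The polling behavior those two options describe can be sketched in plain Clojure. This is a hypothetical illustration, not Bandalore's implementation: `fetch` stands in for a single `receive` call, and elapsed waiting is approximated by summing the sleep periods (time spent inside `fetch` is ignored). In this sketch the wait counter resets whenever a batch arrives, so `:max-wait` bounds the idle time between messages:

[source,clojure]
----
(ns example.polling)

(defn polling-seq
  "Hypothetical sketch: a lazy seq concatenating the batches returned by
  (fetch). After an empty batch it sleeps period ms and tries again; it
  ends once roughly max-wait ms pass without any messages arriving."
  [fetch & {:keys [period max-wait] :or {period 500 max-wait 5000}}]
  (letfn [(step [waited]
            (lazy-seq
              (if-let [batch (seq (fetch))]
                (concat batch (step 0))          ; got messages: reset the wait
                (when (< waited max-wait)        ; otherwise sleep and retry
                  (Thread/sleep period)
                  (step (+ waited period))))))]
    (step 0)))
----

Against a real queue this might be called as `(polling-seq #(sqs/receive client q :limit 10) :max-wait 60000)`.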

Often queues are used to direct compute resources, so you'd like to be able to saturate
those boxen with as much work as your queue can offer up. The obvious solution
is to `pmap` across a seq of incoming messages, which you can do trivially with the seq
provided by `polling-receive`. Just make sure you tweak the `:max-wait` time so that,
assuming you want to continuously process incoming messages, the seq of messages doesn't
terminate because none have been available for a while.
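
A small, self-contained sketch of that shape, with a hypothetical `slow-process` function standing in for real message handling and a plain seq of strings standing in for message bodies from `polling-receive`. `pmap` returns results in input order and is semi-lazy: its parallel computation stays ahead of consumption without realizing the whole input, so it can sit on top of a long-running seq. It only pays off when the per-message work dominates the coordination overhead:

[source,clojure]
----
(ns example.parallel)

(defn slow-process
  "Hypothetical stand-in for slow, per-message work."
  [body]
  (Thread/sleep 10)
  (count body))

(defn process-all
  "Processes message bodies in parallel with pmap; results come back in
  the same order as the input."
  [bodies]
  (doall (pmap slow-process bodies)))
----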

Here's an example where one thread sends a message every 100ms (1,000 messages in all, given the call below),
and another consumes those messages using a lazy seq provided by `polling-receive`:

----
=> (defn send-dummy-messages
     [client q message-count]
     (future (doseq [n (range message-count)]
               (Thread/sleep 100)
               (sqs/send client q (str n)))))
#'cemerick.bandalore-test/send-dummy-messages
=> (defn consume-dummy-messages
     [client q]
     (future (dorun (map (sqs/deleting-consumer client (comp println :body))
                         (sqs/polling-receive client q :max-wait Integer/MAX_VALUE :limit 10)))))
#'cemerick.bandalore-test/consume-dummy-messages
=> (consume-dummy-messages client q) ;; start the consumer
#<core$future_call$reify__5500@a6f00bc: :pending>
=> (send-dummy-messages client q 1000) ;; start the sender
#<core$future_call$reify__5500@18986032: :pending>
3
4
1
0
2
8
5
7
...
----

You'd presumably want to set up some way to control your consumer, but hopefully
you can see that it would be trivial to parallelize the processing function being
wrapped by `deleting-consumer` using `pmap`, distribute processing among agents
if that's more appropriate, etc.
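
For instance, a minimal way to make such a consumer stoppable is to gate the message seq on an atom. This is a hypothetical sketch: `start-consumer` is not part of Bandalore, and `messages` would be a `polling-receive` seq in real use. Because `take-while` realizes the next message before checking the flag, a consumer blocked in a long poll won't notice a stop request until the next message arrives or the poll gives up, and that final message is left unprocessed (so, with a deleting consumer, it is not deleted):

[source,clojure]
----
(ns example.control)

(defn start-consumer
  "Hypothetical sketch: calls (handle m) for each m in messages on a
  future until the returned :stop! function is called. The :worker
  future yields :stopped once the loop exits."
  [messages handle]
  (let [running? (atom true)
        worker   (future
                   (doseq [m (take-while (fn [_] @running?) messages)]
                     (handle m))
                   :stopped)]
    {:stop!  #(reset! running? false)
     :worker worker}))
----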

== Building Bandalore

You'll need Maven. From the command line:

----
$ mvn clean install
----

The tests are all live, so you either need to add your AWS credentials to your
`~/.m2/settings.xml` file as properties, or specify them on the command line
using `-D` switches:

----
$ mvn -Daws.id=XXXXXXX -Daws.secret-key=YYYYYYY clean install
----

Or, you can skip the tests entirely if you're brave:

----
$ mvn -Dmaven.test.skip=true clean install
----

In any case, you'll find a built `.jar` file in the `target` directory, and in
its designated spot in `~/.m2/repository` (assuming you ran `install` rather than
e.g. `package`).

== Need Help?

Ping `cemerick` on freenode IRC or Twitter if you have questions
or would like to contribute patches.

== License

Copyright © 2011 Chas Emerick

Licensed under the EPL. (See the file epl-v10.html.)