Compare commits
35 commits
bandalore-
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a0fb4273b2 | ||
|
|
478210ccb2 | ||
|
|
e4a15227f6 | ||
|
|
09e7772fd7 | ||
|
|
42751b891a | ||
|
|
daffed7bcc | ||
|
|
ce8c02bdc4 | ||
|
|
98cb434787 | ||
|
|
bfb0fca2e7 | ||
|
|
e9f1afbb50 | ||
|
|
3d1c859c62 | ||
|
|
382fd20418 | ||
|
|
960c5e4966 | ||
|
|
b7d186125c | ||
|
|
e31c010c08 | ||
|
|
14b7b05b70 | ||
|
|
e4fe404f72 | ||
|
|
5a84c6d71b | ||
|
|
a2fc3400bc | ||
|
|
21ed5eb278 | ||
|
|
48c5f17ad5 | ||
|
|
b76312303a | ||
|
|
47b44e7555 | ||
|
|
49e54854da | ||
|
|
eb2b3524ee | ||
|
|
9b782e4817 | ||
|
|
60ed575738 | ||
|
|
0ecacd49dd | ||
|
|
cdc1c59924 | ||
|
|
112d6121d5 | ||
|
|
6273806597 | ||
|
|
d55dbb0590 | ||
|
|
7ae72ff7d9 | ||
|
|
3dfd311a9c | ||
|
|
978cfe5945 |
6 changed files with 288 additions and 124 deletions
|
|
@ -1,69 +1,84 @@
|
||||||
= Bandalore
|
# Bandalore
|
||||||
|
|
||||||
http://github.com/cemerick/bandalore[Bandalore] is a Clojure client
|
[Bandalore](http://github.com/cemerick/bandalore) is a Clojure client
|
||||||
library for Amazon's http://aws.amazon.com/sqs/[Simple Queue Service (SQS)]. It depends upon
|
library for Amazon's [Simple Queue Service](http://aws.amazon.com/sqs/). It depends upon
|
||||||
the standard http://aws.amazon.com/sdkforjava/[AWS SDK for Java],
|
the standard [AWS SDK for Java](http://aws.amazon.com/sdkforjava/),
|
||||||
and provides a Clojure-idiomatic API for the SQS-related functionality
|
and provides a Clojure-idiomatic API for the SQS-related functionality
|
||||||
therein.
|
therein.
|
||||||
|
|
||||||
== "Installation"
|
## "Installation"
|
||||||
|
|
||||||
Bandalore is available in Maven central. Add it to your Maven project's `pom.xml`:
|
Bandalore is available in Maven central. Add it to your Maven project's `pom.xml`:
|
||||||
|
|
||||||
----
|
```xml
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>cemerick</groupId>
|
<groupId>com.cemerick</groupId>
|
||||||
<artifactId>bandalore</artifactId>
|
<artifactId>bandalore</artifactId>
|
||||||
<version>0.0.1</version>
|
<version>0.0.6</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
----
|
```
|
||||||
|
|
||||||
or your leiningen project.clj:
|
or your leiningen project.clj:
|
||||||
|
|
||||||
----
|
```clojure
|
||||||
[cemerick/bandalore "0.0.1"]
|
[com.cemerick/bandalore "0.0.6"]
|
||||||
----
|
```
|
||||||
|
|
||||||
== Logging
|
Bandalore is compatible with Clojure 1.2.0+.
|
||||||
|
|
||||||
|
## Logging
|
||||||
|
|
||||||
I strongly recommend squelching the AWS SDK's very verbose logging
|
I strongly recommend squelching the AWS SDK's very verbose logging
|
||||||
before using Bandalore (the former spews a variety of stuff out on
|
before using Bandalore (the former spews a variety of stuff out on
|
||||||
INFO that I personally think should be in DEBUG or TRACE). You can
|
INFO that I personally think should be in DEBUG or TRACE). You can
|
||||||
do this with this snippet:
|
do this with this snippet:
|
||||||
|
|
||||||
----
|
```clojure
|
||||||
(.setLevel (java.util.logging.Logger/getLogger "com.amazonaws")
|
(.setLevel (java.util.logging.Logger/getLogger "com.amazonaws")
|
||||||
java.util.logging.Level/WARNING)
|
java.util.logging.Level/WARNING)
|
||||||
----
|
```
|
||||||
|
|
||||||
Translate as necessary if you're using log4j, etc.
|
Translate as necessary if you're using log4j, etc.
|
||||||
|
|
||||||
== Usage
|
## Usage
|
||||||
|
|
||||||
You should be familiar with http://aws.amazon.com/sqs/[SQS itself]
|
You should be familiar with [SQS itself](http://aws.amazon.com/sqs/)
|
||||||
before sensibly using this library. That said, Bandalore's API
|
before sensibly using this library. That said, Bandalore's API
|
||||||
is well-documented.
|
is well-documented.
|
||||||
|
|
||||||
You'll first need to load the library and create a SQS client object
|
You'll first need to load the library and create a SQS client object
|
||||||
to do anything:
|
to do anything:
|
||||||
|
|
||||||
----
|
```clojure
|
||||||
(require '[cemerick.bandalore :as sqs])
|
(require '[cemerick.bandalore :as sqs])
|
||||||
(def client (sqs/create-client "your aws id" "your aws secret-key"))
|
(def client (sqs/create-client "your aws id" "your aws secret-key"))
|
||||||
----
|
```
|
||||||
|
|
||||||
|
**Security Note** If your application using Bandalore is deployed to EC2, _you
|
||||||
|
should not put your AWS credentials on those EC2 nodes_. Rather,
|
||||||
|
[give your EC2 instances IAM roles](http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/java-dg-roles.html),
|
||||||
|
and use the nullary arity of `create-client`:
|
||||||
|
|
||||||
|
```clojure
|
||||||
|
(require '[cemerick.bandalore :as sqs])
|
||||||
|
(def client (sqs/create-client))
|
||||||
|
```
|
||||||
|
|
||||||
|
This will use credentials assigned to your EC2 node based on its
|
||||||
|
role that are automatically rotated.
|
||||||
|
|
||||||
You can create, delete, and list queues:
|
You can create, delete, and list queues:
|
||||||
|
|
||||||
----
|
```clojure
|
||||||
=> (sqs/create-queue client "foo")
|
#> (sqs/create-queue client "foo")
|
||||||
"https://queue.amazonaws.com/499312652346/foo"
|
"https://queue.amazonaws.com/499312652346/foo"
|
||||||
=> (sqs/list-queues client)
|
#> (sqs/list-queues client)
|
||||||
("https://queue.amazonaws.com/499312652346/foo")
|
("https://queue.amazonaws.com/499312652346/foo")
|
||||||
=> (sqs/delete-queue client (first *1))
|
#> (sqs/delete-queue client (first *1))
|
||||||
nil
|
nil
|
||||||
=> (list-queues client)
|
#> (list-queues client)
|
||||||
nil
|
nil
|
||||||
----
|
```
|
||||||
|
|
||||||
*Note that SQS is _eventually consistent_. This means that a created
|
*Note that SQS is _eventually consistent_. This means that a created
|
||||||
queue won't necessarily show up in an immediate listing of queues,
|
queue won't necessarily show up in an immediate listing of queues,
|
||||||
|
|
@ -71,95 +86,117 @@ messages aren't necessarily immediately available to be received, etc.*
|
||||||
|
|
||||||
You can send, receive, and delete messages:
|
You can send, receive, and delete messages:
|
||||||
|
|
||||||
----
|
```clojure
|
||||||
=> (def q (sqs/create-queue client "foo"))
|
#> (def q (sqs/create-queue client "foo"))
|
||||||
#'cemerick.bandalore-test/q
|
#'cemerick.bandalore-test/q
|
||||||
=> (sqs/send client q "my message body")
|
#> (sqs/send client q "my message body")
|
||||||
{:id "75d5d7a1-2274-4163-97b2-aa4c75f209ee", :body-md5 "05d358de00fc63dd2fa2026b77e112f6"}
|
{:id "75d5d7a1-2274-4163-97b2-aa4c75f209ee", :body-md5 "05d358de00fc63dd2fa2026b77e112f6"}
|
||||||
=> (sqs/receive client q)
|
#> (sqs/receive client q)
|
||||||
({:attrs #<HashMap {}>, :body "my message body", :body-md5 "05d358de00fc63dd2fa2026b77e112f6",
|
({:attrs #<HashMap {}>, :body "my message body", :body-md5 "05d358de00fc63dd2fa2026b77e112f6",
|
||||||
:id "75d5d7a1-2274-4163-97b2-aa4c75f209ee",
|
:id "75d5d7a1-2274-4163-97b2-aa4c75f209ee",
|
||||||
:receipt-handle "…very long string…"})
|
:receipt-handle "…very long string…"})
|
||||||
;;
|
;;
|
||||||
;; …presumably do something with the received message(s)…
|
;; …presumably do something with the received message(s)…
|
||||||
;;
|
;;
|
||||||
=> (sqs/delete client q (first *1))
|
#> (sqs/delete client q (first *1))
|
||||||
nil
|
nil
|
||||||
=> (sqs/receive client q)
|
#> (sqs/receive client q)
|
||||||
()
|
()
|
||||||
----
|
```
|
||||||
|
|
||||||
That's cleaner than having to interop directly with the Java SDK, but it's all
|
That's cleaner than having to interop directly with the Java SDK, but it's all
|
||||||
pretty pedestrian stuff. You can do more interesting things with some
|
pretty pedestrian stuff. You can do more interesting things with some
|
||||||
simple higher-order functions and other nifty Clojure facilities.
|
simple higher-order functions and other nifty Clojure facilities.
|
||||||
|
|
||||||
=== Sending and receiving Clojure values
|
### Enabling SQS Long Polling
|
||||||
|
|
||||||
|
[Long polling](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html) reduces the number of empty responses by allowing Amazon SQS service to wait until a message is available in the queue before sending a response. You can enable long polling on an individual receive request by supplying the optional kwarg `:wait-time-seconds`:
|
||||||
|
|
||||||
|
:wait-time-seconds - time in seconds (bewteen 0 and 20) for SQS to wait if there are no messages in the queue. A value of 0 indicates no long polling.
|
||||||
|
|
||||||
|
```clojure
|
||||||
|
; ensure our queue is empty to start
|
||||||
|
#> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
|
||||||
|
"0"
|
||||||
|
#> (let [no-polling (future (sqs/receive client q))
|
||||||
|
long-polling (future (sqs/receive client q :wait-time-seconds 20))]
|
||||||
|
(Thread/sleep 10000) ;; Sleep 10s before sending message
|
||||||
|
(sqs/send client q "my message body")
|
||||||
|
(println (count @no-polling))
|
||||||
|
(println (count @long-polling)))
|
||||||
|
0
|
||||||
|
1
|
||||||
|
nil
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
### Sending and receiving Clojure values
|
||||||
|
|
||||||
SQS' message bodies are strings, so you can stuff anything in them that you can
|
SQS' message bodies are strings, so you can stuff anything in them that you can
|
||||||
serialize to a string. That said, `pr-str` and `read-string` are too handy
|
serialize to a string. That said, `pr-str` and `read-string` are too handy
|
||||||
to not use, assuming your consumers are using Clojure as well:
|
to not use, assuming your consumers are using Clojure as well:
|
||||||
|
|
||||||
----
|
```clojure
|
||||||
=> (sqs/send client q (pr-str {:a 5 :b "blah" :c 6.022e23}))
|
#> (sqs/send client q (pr-str {:a 5 :b "blah" :c 6.022e23}))
|
||||||
{:id "3756c302-866a-4fcc-a7a3-746e6f531f47", :body-md5 "60052fc2ffb835257c26b9957c6e9ffd"}
|
{:id "3756c302-866a-4fcc-a7a3-746e6f531f47", :body-md5 "60052fc2ffb835257c26b9957c6e9ffd"}
|
||||||
=> (-?> (sqs/receive client q) first :body read-string)
|
#> (-?> (sqs/receive client q) first :body read-string)
|
||||||
{:a 5, :b "blah", :c 6.022E23}
|
{:a 5, :b "blah", :c 6.022E23}
|
||||||
----
|
```
|
||||||
|
|
||||||
=== Sending seqs of messages
|
### Sending seqs of messages
|
||||||
|
|
||||||
…with more gratuitous use of `pr-str` and `read-string` to send and receive
|
…with more gratuitous use of `pr-str` and `read-string` to send and receive
|
||||||
Clojure values:
|
Clojure values:
|
||||||
|
|
||||||
----
|
```clojure
|
||||||
=> (->> [:foo 'bar ["some vector" 42] #{#"silly place for a regex"}]
|
#> (->> [:foo 'bar ["some vector" 42] #{#"silly place for a regex"}]
|
||||||
(map (comp (partial sqs/send client q) pr-str))
|
(map (comp (partial sqs/send client q) pr-str))
|
||||||
dorun)
|
dorun)
|
||||||
nil
|
nil
|
||||||
=> (map (comp read-string :body)
|
#> (map (comp read-string :body)
|
||||||
(sqs/receive client q :limit 10))
|
(sqs/receive client q :limit 10))
|
||||||
(bar ["some vector" 42])
|
(bar ["some vector" 42])
|
||||||
=> (map (comp read-string :body)
|
#> (map (comp read-string :body)
|
||||||
(sqs/receive client q :limit 10))
|
(sqs/receive client q :limit 10))
|
||||||
(#{#"silly place for a regex"})
|
(#{#"silly place for a regex"})
|
||||||
=> (map (comp read-string :body)
|
#> (map (comp read-string :body)
|
||||||
(sqs/receive client q :limit 10))
|
(sqs/receive client q :limit 10))
|
||||||
(:foo)
|
(:foo)
|
||||||
----
|
```
|
||||||
|
|
||||||
=== (Mostly) automatic deletion of consumed messages
|
### (Mostly) automatic deletion of consumed messages
|
||||||
|
|
||||||
When you're done processing a received message, you need to delete it from its
|
When you're done processing a received message, you need to delete it from its
|
||||||
originaing queue:
|
originaing queue:
|
||||||
|
|
||||||
----
|
```clojure
|
||||||
; ensure our queue is empty to start
|
; ensure our queue is empty to start
|
||||||
=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
|
#> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
|
||||||
"0"
|
"0"
|
||||||
=> (dorun (map (partial sqs/send client q) (map str (range 100))))
|
#> (dorun (map (partial sqs/send client q) (map str (range 100))))
|
||||||
nil
|
nil
|
||||||
=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
|
#> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
|
||||||
"100"
|
"100"
|
||||||
|
|
||||||
; received messages must be removed from the queue or they will
|
; received messages must be removed from the queue or they will
|
||||||
; be delivered again after their visibility timeout expires
|
; be delivered again after their visibility timeout expires
|
||||||
=> (sqs/receive client q)
|
#> (sqs/receive client q)
|
||||||
(…message seq…)
|
(…message seq…)
|
||||||
=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
|
#> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
|
||||||
"100"
|
"100"
|
||||||
=> (->> (sqs/receive client q) first (sqs/delete client))
|
#> (->> (sqs/receive client q) first (sqs/delete client))
|
||||||
nil
|
nil
|
||||||
=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
|
#> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
|
||||||
"99"
|
"99"
|
||||||
----
|
```
|
||||||
|
|
||||||
Rather than trying to remember to do this, just use the
|
Rather than trying to remember to do this, just use the
|
||||||
`deleting-consumer` "middleware" to produce a function that calls
|
`deleting-consumer` "middleware" to produce a function that calls
|
||||||
the message-processing function you provide to it, and then
|
the message-processing function you provide to it, and then
|
||||||
automatically deletes the processed message from the origining queue:
|
automatically deletes the processed message from the origining queue:
|
||||||
|
|
||||||
----
|
```clojure
|
||||||
=> (doall (map
|
#> (doall (map
|
||||||
(sqs/deleting-consumer client (comp println :body))
|
(sqs/deleting-consumer client (comp println :body))
|
||||||
(sqs/receive client q :limit 10)))
|
(sqs/receive client q :limit 10)))
|
||||||
0
|
0
|
||||||
|
|
@ -173,11 +210,11 @@ automatically deletes the processed message from the origining queue:
|
||||||
52
|
52
|
||||||
55
|
55
|
||||||
(nil nil nil nil nil nil nil nil nil nil)
|
(nil nil nil nil nil nil nil nil nil nil)
|
||||||
=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
|
#> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
|
||||||
"90"
|
"90"
|
||||||
----
|
```
|
||||||
|
|
||||||
=== Consuming queues as seqs
|
### Consuming queues as seqs
|
||||||
|
|
||||||
seqs being the _lingua franca_ of Clojure collections, it would be helpful if we
|
seqs being the _lingua franca_ of Clojure collections, it would be helpful if we
|
||||||
could treat an SQS queue as a seq of messages. While `receive` does return
|
could treat an SQS queue as a seq of messages. While `receive` does return
|
||||||
|
|
@ -187,11 +224,11 @@ a seq of messages, each `receive` call is limited to receiving a maximum of
|
||||||
The solution to this is `polling-receive`, which returns a lazy seq that
|
The solution to this is `polling-receive`, which returns a lazy seq that
|
||||||
reaches out to SQS as necessary:
|
reaches out to SQS as necessary:
|
||||||
|
|
||||||
----
|
```clojure
|
||||||
=> (map (sqs/deleting-consumer client :body)
|
#> (map (sqs/deleting-consumer client :body)
|
||||||
(sqs/polling-receive client q :limit 10))
|
(sqs/polling-receive client q :limit 10))
|
||||||
("3" "5" "7" "8" ... "81" "90" "91")
|
("3" "5" "7" "8" ... "81" "90" "91")
|
||||||
----
|
```
|
||||||
|
|
||||||
`polling-receive` accepts all of the same optional kwargs as `receive` does,
|
`polling-receive` accepts all of the same optional kwargs as `receive` does,
|
||||||
but adds two more to control its usage of `receive`:
|
but adds two more to control its usage of `receive`:
|
||||||
|
|
@ -210,21 +247,21 @@ terminate because none have been available for a while.
|
||||||
Here's an example where one thread sends a message once a second for a minute,
|
Here's an example where one thread sends a message once a second for a minute,
|
||||||
and another consumes those messages using a lazy seq provided by `polling-receive`:
|
and another consumes those messages using a lazy seq provided by `polling-receive`:
|
||||||
|
|
||||||
----
|
```clojure
|
||||||
=> (defn send-dummy-messages
|
#> (defn send-dummy-messages
|
||||||
[client q count]
|
[client q count]
|
||||||
(future (doseq [n (range count)]
|
(future (doseq [n (range count)]
|
||||||
(Thread/sleep 100)
|
(Thread/sleep 100)
|
||||||
(sqs/send client q (str n)))))
|
(sqs/send client q (str n)))))
|
||||||
#'cemerick.bandalore-test/send-dummy-messages
|
#'cemerick.bandalore-test/send-dummy-messages
|
||||||
=> (defn consume-dummy-messages
|
#> (defn consume-dummy-messages
|
||||||
[client q]
|
[client q]
|
||||||
(future (dorun (map (sqs/deleting-consumer client (comp println :body))
|
(future (dorun (map (sqs/deleting-consumer client (comp println :body))
|
||||||
(sqs/polling-receive client q :max-wait Integer/MAX_VALUE :limit 10)))))
|
(sqs/polling-receive client q :max-wait Long/MAX_VALUE :limit 10)))))
|
||||||
#'cemerick.bandalore-test/consume-dummy-messages
|
#'cemerick.bandalore-test/consume-dummy-messages
|
||||||
=> (consume-dummy-messages client q) ;; start the consumer
|
#> (consume-dummy-messages client q) ;; start the consumer
|
||||||
#<core$future_call$reify__5500@a6f00bc: :pending>
|
#<core$future_call$reify__5500@a6f00bc: :pending>
|
||||||
=> (send-dummy-messages client q 1000) ;; start the sender
|
#> (send-dummy-messages client q 1000) ;; start the sender
|
||||||
#<core$future_call$reify__5500@18986032: :pending>
|
#<core$future_call$reify__5500@18986032: :pending>
|
||||||
3
|
3
|
||||||
4
|
4
|
||||||
|
|
@ -235,20 +272,20 @@ and another consumes those messages using a lazy seq provided by `polling-receiv
|
||||||
5
|
5
|
||||||
7
|
7
|
||||||
...
|
...
|
||||||
----
|
```
|
||||||
|
|
||||||
You'd presumably want to set up some ways to control your consumer, but hopefully
|
You'd presumably want to set up some ways to control your consumer, but hopefully
|
||||||
you see that it would be trivial to parallelize the processing function being
|
you see that it would be trivial to parallelize the processing function being
|
||||||
wrapped by `deleting-consumer` using `pmap`, distribute processing among agents
|
wrapped by `deleting-consumer` using `pmap`, distribute processing among agents
|
||||||
if that's more appropriate, etc.
|
if that's more appropriate, etc.
|
||||||
|
|
||||||
== Building Bandalore
|
## Building Bandalore
|
||||||
|
|
||||||
Have maven. From the command line:
|
Have maven. From the command line:
|
||||||
|
|
||||||
----
|
```
|
||||||
$ mvn clean install
|
$ mvn clean verify
|
||||||
----
|
```
|
||||||
|
|
||||||
*The tests are all live*, so:
|
*The tests are all live*, so:
|
||||||
|
|
||||||
|
|
@ -261,27 +298,27 @@ Since the tests are live, you either need to add your AWS credentials to your
|
||||||
`~/.m2/settings.xml` file as properties, or specify them on the command line
|
`~/.m2/settings.xml` file as properties, or specify them on the command line
|
||||||
using `-D` switches:
|
using `-D` switches:
|
||||||
|
|
||||||
----
|
```
|
||||||
$ mvn -Daws.id=XXXXXXX -Daws.secret-key=YYYYYYY clean install
|
$ mvn -Daws.id=XXXXXXX -Daws.secret-key=YYYYYYY clean install
|
||||||
----
|
```
|
||||||
|
|
||||||
Or, you can skip the tests entirely:
|
Or, you can skip the tests entirely:
|
||||||
|
|
||||||
----
|
```
|
||||||
$ mvn -Dmaven.test.skip=true clean install
|
$ mvn -Dmaven.test.skip=true clean install
|
||||||
----
|
```
|
||||||
|
|
||||||
In any case, you'll find a built `.jar` file in the `target` directory, and in
|
In any case, you'll find a built `.jar` file in the `target` directory, and in
|
||||||
its designated spot in `~/.m2/repository` (assuming you ran `install` rather than
|
its designated spot in `~/.m2/repository` (assuming you ran `install` rather than
|
||||||
e.g. `package`).
|
e.g. `package`).
|
||||||
|
|
||||||
== Need Help?
|
## Need Help?
|
||||||
|
|
||||||
Ping `cemerick` on freenode irc or twitter if you have questions
|
Ping `cemerick` on freenode irc or twitter if you have questions
|
||||||
or would like to contribute patches.
|
or would like to contribute patches.
|
||||||
|
|
||||||
== License
|
## License
|
||||||
|
|
||||||
Copyright © 2011 Chas Emerick
|
Copyright © 2011-2013 Chas Emerick and contributors.
|
||||||
|
|
||||||
Licensed under the EPL. (See the file epl-v10.html.)
|
Licensed under the EPL. (See the file epl-v10.html.)
|
||||||
34
pom.xml
34
pom.xml
|
|
@ -2,7 +2,7 @@
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
<groupId>com.cemerick</groupId>
|
<groupId>com.cemerick</groupId>
|
||||||
<artifactId>bandalore</artifactId>
|
<artifactId>bandalore</artifactId>
|
||||||
<version>0.0.1</version>
|
<version>0.0.7-SNAPSHOT</version>
|
||||||
<name>bandalore</name>
|
<name>bandalore</name>
|
||||||
<description>A Clojure library for Amazon's Simple Queue Service (SQS).</description>
|
<description>A Clojure library for Amazon's Simple Queue Service (SQS).</description>
|
||||||
<url>http://github.com/cemerick/bandalore</url>
|
<url>http://github.com/cemerick/bandalore</url>
|
||||||
|
|
@ -29,20 +29,14 @@
|
||||||
</scm>
|
</scm>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<clojure.version>1.2.0</clojure.version>
|
<clojure.version>1.4.0</clojure.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.amazonaws</groupId>
|
<groupId>com.amazonaws</groupId>
|
||||||
<artifactId>aws-java-sdk</artifactId>
|
<artifactId>aws-java-sdk</artifactId>
|
||||||
<version>1.1.5</version>
|
<version>1.8.0</version>
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.clojure</groupId>
|
|
||||||
<artifactId>clojure-contrib</artifactId>
|
|
||||||
<version>1.2.0</version>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
|
@ -63,6 +57,28 @@
|
||||||
<warnOnReflection>true</warnOnReflection>
|
<warnOnReflection>true</warnOnReflection>
|
||||||
</configuration>
|
</configuration>
|
||||||
</plugin>
|
</plugin>
|
||||||
|
<plugin>
|
||||||
|
<artifactId>maven-invoker-plugin</artifactId>
|
||||||
|
<version>1.5</version>
|
||||||
|
<configuration>
|
||||||
|
<projectsDirectory>src/integration</projectsDirectory>
|
||||||
|
<cloneProjectsTo>${project.build.directory}/integration</cloneProjectsTo>
|
||||||
|
<pomIncludes>
|
||||||
|
<pomInclude>*/pom.xml</pomInclude>
|
||||||
|
</pomIncludes>
|
||||||
|
<streamLogs>true</streamLogs>
|
||||||
|
<goals><goal>clojure:test</goal></goals>
|
||||||
|
</configuration>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<id>integration-test</id>
|
||||||
|
<goals>
|
||||||
|
<goal>install</goal>
|
||||||
|
<goal>run</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
</plugins>
|
</plugins>
|
||||||
</build>
|
</build>
|
||||||
|
|
||||||
|
|
|
||||||
44
src/integration/clojure-1.2.0/pom.xml
Normal file
44
src/integration/clojure-1.2.0/pom.xml
Normal file
|
|
@ -0,0 +1,44 @@
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<groupId>clojure.tools</groupId>
|
||||||
|
<version>0.0.0</version>
|
||||||
|
<artifactId>test-clojure-1.2.0</artifactId>
|
||||||
|
<name>(Clojure 1.2.0 tests)</name>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.clojure</groupId>
|
||||||
|
<artifactId>clojure</artifactId>
|
||||||
|
<version>1.2.0</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>@project.groupId@</groupId>
|
||||||
|
<artifactId>@project.artifactId@</artifactId>
|
||||||
|
<version>@project.version@</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>com.theoryinpractise</groupId>
|
||||||
|
<artifactId>clojure-maven-plugin</artifactId>
|
||||||
|
<version>1.3.8</version>
|
||||||
|
<configuration>
|
||||||
|
<baseTestSourceDirectory>@basedir@/src/test/clojure</baseTestSourceDirectory>
|
||||||
|
<clojureOptions>-Daws.id=@aws.id@ -Daws.secret-key=@aws.secret-key@</clojureOptions>
|
||||||
|
</configuration>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<id>test-clojure</id>
|
||||||
|
<phase>test</phase>
|
||||||
|
<goals>
|
||||||
|
<goal>test</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
</project>
|
||||||
44
src/integration/clojure-1.3.0/pom.xml
Normal file
44
src/integration/clojure-1.3.0/pom.xml
Normal file
|
|
@ -0,0 +1,44 @@
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<groupId>clojure.tools</groupId>
|
||||||
|
<version>0.0.0</version>
|
||||||
|
<artifactId>test-clojure-1.3.0</artifactId>
|
||||||
|
<name>(Clojure 1.3.0 tests)</name>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.clojure</groupId>
|
||||||
|
<artifactId>clojure</artifactId>
|
||||||
|
<version>1.3.0</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>@project.groupId@</groupId>
|
||||||
|
<artifactId>@project.artifactId@</artifactId>
|
||||||
|
<version>@project.version@</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>com.theoryinpractise</groupId>
|
||||||
|
<artifactId>clojure-maven-plugin</artifactId>
|
||||||
|
<version>1.3.8</version>
|
||||||
|
<configuration>
|
||||||
|
<baseTestSourceDirectory>@basedir@/src/test/clojure</baseTestSourceDirectory>
|
||||||
|
<clojureOptions>-Daws.id=@aws.id@ -Daws.secret-key=@aws.secret-key@</clojureOptions>
|
||||||
|
</configuration>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<id>test-clojure</id>
|
||||||
|
<phase>test</phase>
|
||||||
|
<goals>
|
||||||
|
<goal>test</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
</project>
|
||||||
|
|
@ -19,22 +19,28 @@
|
||||||
(defn create-client
|
(defn create-client
|
||||||
"Creates a synchronous AmazonSQSClient using the provided account id, secret key,
|
"Creates a synchronous AmazonSQSClient using the provided account id, secret key,
|
||||||
and optional com.amazonaws.ClientConfiguration."
|
and optional com.amazonaws.ClientConfiguration."
|
||||||
|
([]
|
||||||
|
(create-client (com.amazonaws.ClientConfiguration.)))
|
||||||
|
([client-config]
|
||||||
|
(AmazonSQSClient.
|
||||||
|
(.withUserAgent client-config "Bandalore - SQS for Clojure")))
|
||||||
([id secret-key]
|
([id secret-key]
|
||||||
(create-client id secret-key (com.amazonaws.ClientConfiguration.)))
|
(create-client id secret-key (com.amazonaws.ClientConfiguration.)))
|
||||||
([id secret-key client-config]
|
([id secret-key client-config]
|
||||||
(AmazonSQSClient. (com.amazonaws.auth.BasicAWSCredentials. id secret-key)
|
(AmazonSQSClient. (com.amazonaws.auth.BasicAWSCredentials. id secret-key)
|
||||||
(.withUserAgent client-config "Bandalore - SQS for Clojure"))))
|
(.withUserAgent client-config "Bandalore - SQS for Clojure"))))
|
||||||
|
|
||||||
|
(def ^{:private true} visibility-warned? (atom false))
|
||||||
|
|
||||||
(defn create-queue
|
(defn create-queue
|
||||||
"Creates a queue with the given name, returning the corresponding URL string.
|
"Creates a queue with the given name, returning the corresponding URL string.
|
||||||
Returns successfully if the queue already exists.
|
Returns successfully if the queue already exists."
|
||||||
|
[^AmazonSQSClient client queue-name & {:as options}]
|
||||||
Specify an optional :visibility keyword arg to set the new queue's default
|
(when (and (:visibility options) (not @visibility-warned?))
|
||||||
visibility timeout in seconds."
|
(println "[WARNING] :visibility option to cemerick.bandalore/create-queue no longer supported;")
|
||||||
[^AmazonSQSClient client queue-name & {:keys [visibility]}]
|
(println "[WARNING] See https://github.com/cemerick/bandalore/issues/3")
|
||||||
(->> (if visibility
|
(reset! visibility-warned? true))
|
||||||
(CreateQueueRequest. queue-name visibility)
|
(->> (CreateQueueRequest. queue-name)
|
||||||
(CreateQueueRequest. queue-name))
|
|
||||||
(.createQueue client)
|
(.createQueue client)
|
||||||
.getQueueUrl))
|
.getQueueUrl))
|
||||||
|
|
||||||
|
|
@ -44,9 +50,10 @@
|
||||||
(.deleteQueue client (DeleteQueueRequest. queue-url)))
|
(.deleteQueue client (DeleteQueueRequest. queue-url)))
|
||||||
|
|
||||||
(defn list-queues
|
(defn list-queues
|
||||||
"Returns a seq of all queues' URL strings."
|
"Returns a seq of all queues' URL strings. Takes an optional string prefix
|
||||||
[^AmazonSQSClient client]
|
argument to only list queues with names that start with the prefix."
|
||||||
(->> (ListQueuesRequest.)
|
[^AmazonSQSClient client & {:keys [prefix]}]
|
||||||
|
(->> (ListQueuesRequest. prefix)
|
||||||
(.listQueues client)
|
(.listQueues client)
|
||||||
.getQueueUrls
|
.getQueueUrls
|
||||||
seq))
|
seq))
|
||||||
|
|
@ -99,6 +106,11 @@
|
||||||
Defaults to the empty set (i.e. no attributes will be included in
|
Defaults to the empty set (i.e. no attributes will be included in
|
||||||
received messages).
|
received messages).
|
||||||
See the SQS documentation for all support message attributes.
|
See the SQS documentation for all support message attributes.
|
||||||
|
:wait-time-seconds - enables long poll support. time is in seconds, bewteen
|
||||||
|
0 (default - no long polling) and 20.
|
||||||
|
Allows Amazon SQS service to wait until a message is available
|
||||||
|
in the queue before sending a response.
|
||||||
|
See the SQS documentation at (http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html)
|
||||||
|
|
||||||
Returns a seq of maps with these slots:
|
Returns a seq of maps with these slots:
|
||||||
|
|
||||||
|
|
@ -109,13 +121,17 @@
|
||||||
:receipt-handle - the ID used to delete the message from the queue after
|
:receipt-handle - the ID used to delete the message from the queue after
|
||||||
it has been fully processed.
|
it has been fully processed.
|
||||||
:source-queue - the URL of the queue from which the message was received"
|
:source-queue - the URL of the queue from which the message was received"
|
||||||
[^AmazonSQSClient client queue-url & {:keys [limit visibility ^java.util.Collection attributes]
|
[^AmazonSQSClient client queue-url & {:keys [limit
|
||||||
|
visibility
|
||||||
|
wait-time-seconds
|
||||||
|
^java.util.Collection attributes]
|
||||||
:or {limit 1
|
:or {limit 1
|
||||||
attributes #{}}}]
|
attributes #{}}}]
|
||||||
(let [req (-> (ReceiveMessageRequest. queue-url)
|
(let [req (-> (ReceiveMessageRequest. queue-url)
|
||||||
(.withMaxNumberOfMessages (-> limit (min 10) (max 1)))
|
(.withMaxNumberOfMessages (-> limit (min 10) (max 1) int Integer/valueOf))
|
||||||
(.withAttributeNames attributes))
|
(.withAttributeNames attributes))
|
||||||
req (if visibility (.withVisibilityTimeout req visibility) req)]
|
req (if wait-time-seconds (.withWaitTimeSeconds req (Integer/valueOf (int wait-time-seconds))) req)
|
||||||
|
req (if visibility (.withVisibilityTimeout req (Integer/valueOf (int visibility))) req)]
|
||||||
(->> (.receiveMessage client req)
|
(->> (.receiveMessage client req)
|
||||||
.getMessages
|
.getMessages
|
||||||
(map (partial message-map queue-url)))))
|
(map (partial message-map queue-url)))))
|
||||||
|
|
@ -199,4 +215,4 @@
|
||||||
;; void removePermission(RemovePermissionRequest removePermissionRequest)
|
;; void removePermission(RemovePermissionRequest removePermissionRequest)
|
||||||
; The RemovePermission action revokes any permissions in the queue policy that matches the specified Label parameter.
|
; The RemovePermission action revokes any permissions in the queue policy that matches the specified Label parameter.
|
||||||
;; void addPermission(AddPermissionRequest addPermissionRequest)
|
;; void addPermission(AddPermissionRequest addPermissionRequest)
|
||||||
;;The AddPermission action adds a permission to a queue for a specific principal.
|
;;The AddPermission action adds a permission to a queue for a specific principal.
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,8 @@
|
||||||
(ns cemerick.bandalore-test
|
(ns cemerick.bandalore-test
|
||||||
(:use cemerick.bandalore
|
(:use cemerick.bandalore
|
||||||
clojure.test
|
clojure.test)
|
||||||
clojure.contrib.core)
|
|
||||||
(:refer-clojure :exclude (send)))
|
(:refer-clojure :exclude (send)))
|
||||||
|
|
||||||
#_(do
|
|
||||||
(System/setProperty "aws.id" "")
|
|
||||||
(System/setProperty "aws.secret-key" ""))
|
|
||||||
|
|
||||||
; kill the verbose aws logging
|
; kill the verbose aws logging
|
||||||
(.setLevel (java.util.logging.Logger/getLogger "com.amazonaws")
|
(.setLevel (java.util.logging.Logger/getLogger "com.amazonaws")
|
||||||
java.util.logging.Level/WARNING)
|
java.util.logging.Level/WARNING)
|
||||||
|
|
@ -18,7 +13,7 @@
|
||||||
(assert (and id secret-key))
|
(assert (and id secret-key))
|
||||||
(create-client id secret-key)))
|
(create-client id secret-key)))
|
||||||
|
|
||||||
(def *test-queue-url* nil)
|
(def ^{:dynamic true} *test-queue-url* nil)
|
||||||
|
|
||||||
(defn- uuid
|
(defn- uuid
|
||||||
[]
|
[]
|
||||||
|
|
@ -56,30 +51,34 @@
|
||||||
`(deftest ~name
|
`(deftest ~name
|
||||||
(println '~name) ; lots of sleeping in these tests, give some indication of life
|
(println '~name) ; lots of sleeping in these tests, give some indication of life
|
||||||
(binding [*test-queue-url* (create-queue client (test-queue-name))]
|
(binding [*test-queue-url* (create-queue client (test-queue-name))]
|
||||||
(try
|
(is *test-queue-url*)
|
||||||
(is *test-queue-url*)
|
~@body)))
|
||||||
~@body
|
|
||||||
(finally
|
|
||||||
(delete-queue client *test-queue-url*))))))
|
|
||||||
|
|
||||||
(defsqstest test-list-queues
|
(defsqstest test-list-queues
|
||||||
(let [msg (uuid)]
|
(let [msg (uuid)]
|
||||||
; sending a msg seems to "force" the queue's existence in listings
|
; sending a msg seems to "force" the queue's existence in listings
|
||||||
(send client *test-queue-url* msg)
|
(send client *test-queue-url* msg)
|
||||||
(wait-for-condition #((set (list-queues client)) *test-queue-url*)
|
(wait-for-condition #((set (list-queues client)) *test-queue-url*)
|
||||||
"Created queue not visible in result of list-queues")))
|
"Created queue not visible in result of list-queues")
|
||||||
|
(wait-for-condition #((set (list-queues client :prefix test-queue-name-prefix)) *test-queue-url*)
|
||||||
|
"Created queue not visible in result of list-queues with prefix")))
|
||||||
|
|
||||||
(defsqstest test-queue-attrs
|
(defsqstest test-queue-attrs
|
||||||
(let [{:strs [VisibilityTimeout MaximumMessageSize] :as base-attrs} (queue-attrs client *test-queue-url*)
|
(let [{:strs [MaximumMessageSize] :as base-attrs} (queue-attrs client *test-queue-url*)
|
||||||
expected {"VisibilityTimeout" "117" "MaximumMessageSize" "1535"}]
|
expected {"MaximumMessageSize" "1535"}]
|
||||||
(is (and VisibilityTimeout MaximumMessageSize))
|
(is MaximumMessageSize)
|
||||||
(queue-attrs client *test-queue-url* expected)
|
(queue-attrs client *test-queue-url* expected)
|
||||||
(wait-for-condition #(= expected
|
(wait-for-condition #(= expected (select-keys (queue-attrs client *test-queue-url*) (keys expected)))
|
||||||
(->> (queue-attrs client *test-queue-url*)
|
|
||||||
(filter (comp (set (keys expected)) first))
|
|
||||||
(into {})))
|
|
||||||
"Queue attribute test failed after waiting for test condition")))
|
"Queue attribute test failed after waiting for test condition")))
|
||||||
|
|
||||||
|
(defsqstest receive-delete
|
||||||
|
(let [msg (uuid)]
|
||||||
|
(send client *test-queue-url* msg)
|
||||||
|
(let [[{:keys [body] :as rmsg}] (receive client *test-queue-url*)]
|
||||||
|
(is (= msg body))
|
||||||
|
(delete client *test-queue-url* rmsg)
|
||||||
|
(is (empty? (receive client *test-queue-url*))))))
|
||||||
|
|
||||||
(defn- wait-for-full-queue
|
(defn- wait-for-full-queue
|
||||||
[q min-cnt queue-name]
|
[q min-cnt queue-name]
|
||||||
(wait-for-condition #(-> (queue-attrs client q)
|
(wait-for-condition #(-> (queue-attrs client q)
|
||||||
|
|
@ -122,3 +121,11 @@
|
||||||
(let [v (-> (receive client *test-queue-url* :visibility 5) first :body read-string)]
|
(let [v (-> (receive client *test-queue-url* :visibility 5) first :body read-string)]
|
||||||
(is (some #(= v (-> % :body read-string)) (polling-receive client *test-queue-url* :max-wait 10000)))))
|
(is (some #(= v (-> % :body read-string)) (polling-receive client *test-queue-url* :max-wait 10000)))))
|
||||||
|
|
||||||
|
(defsqstest test-receive-long-polling
|
||||||
|
(let [q *test-queue-url*
|
||||||
|
no-poll (future (receive client q))
|
||||||
|
long-poll (future (receive client q :wait-time-seconds 20))]
|
||||||
|
(Thread/sleep 10000)
|
||||||
|
(send client q "1")
|
||||||
|
(is (== 0 (count @no-poll)) "Should not return messages")
|
||||||
|
(is (== 1 (count @long-poll)) "Should return 1 message")))
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue