feat(StreamRecordProcessingError): add new error to stop worker
This commit is contained in:
parent
f0230c9040
commit
8642f0b40b
2 changed files with 18 additions and 11 deletions
|
|
@ -0,0 +1,10 @@
|
||||||
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
|
public class StreamRecordProcessingError extends Error {
|
||||||
|
|
||||||
|
public static final String PROCESS_RECORDS_ERROR_MESSAGE = "Failure to process records for shard id: ";
|
||||||
|
|
||||||
|
public StreamRecordProcessingError(String shardId) {
|
||||||
|
super(PROCESS_RECORDS_ERROR_MESSAGE + shardId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -14,8 +14,6 @@
|
||||||
*/
|
*/
|
||||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.time.Instant;
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
@ -31,12 +29,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
|
|
||||||
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
|
||||||
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
|
|
||||||
import com.amazonaws.services.kinesis.leases.impl.Lease;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
@ -86,9 +79,6 @@ public class Worker implements Runnable {
|
||||||
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
|
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
|
||||||
private static final boolean DEFAULT_EXITS_ON_FAILURE = false;
|
private static final boolean DEFAULT_EXITS_ON_FAILURE = false;
|
||||||
|
|
||||||
// WARNING: dynamo connector depends upon this exact string
|
|
||||||
private static final String PROCESS_RECORDS_ERROR_MESSAGE = "Failure to process records";
|
|
||||||
|
|
||||||
private WorkerLog wlog = new WorkerLog();
|
private WorkerLog wlog = new WorkerLog();
|
||||||
|
|
||||||
private final String applicationName;
|
private final String applicationName;
|
||||||
|
|
@ -531,7 +521,7 @@ public class Worker implements Runnable {
|
||||||
wlog.info("Sleeping ...");
|
wlog.info("Sleeping ...");
|
||||||
Thread.sleep(idleTimeInMilliseconds);
|
Thread.sleep(idleTimeInMilliseconds);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (e.getCause().getMessage().contains(PROCESS_RECORDS_ERROR_MESSAGE))
|
if (causedByStreamRecordProcessingError(e))
|
||||||
throw new RuntimeException("Failing worker after irrecoverable failure in processing records");
|
throw new RuntimeException("Failing worker after irrecoverable failure in processing records");
|
||||||
if (exitOnFailure && retries.getAndIncrement() > MAX_RETRIES)
|
if (exitOnFailure && retries.getAndIncrement() > MAX_RETRIES)
|
||||||
throw new RuntimeException("Failing after " + MAX_RETRIES + " attempts", e);
|
throw new RuntimeException("Failing after " + MAX_RETRIES + " attempts", e);
|
||||||
|
|
@ -550,6 +540,13 @@ public class Worker implements Runnable {
|
||||||
wlog.resetInfoLogging();
|
wlog.resetInfoLogging();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean causedByStreamRecordProcessingError(Throwable t) {
|
||||||
|
if (t.getCause() == null) return false;
|
||||||
|
if (t.getCause().getClass().equals(StreamRecordProcessingError.class)) return true;
|
||||||
|
|
||||||
|
return causedByStreamRecordProcessingError(t.getCause());
|
||||||
|
}
|
||||||
|
|
||||||
private void initialize() {
|
private void initialize() {
|
||||||
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
|
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
|
||||||
boolean isDone = false;
|
boolean isDone = false;
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue