This document discusses techniques for making event processing safer and faster with the LMAX Disruptor library. It describes how to define event and message classes, create event translators and handlers, and implement a custom ring buffer that gives handlers direct access to events without copying, to improve performance. Safe-handling patterns are introduced that use an isValid flag, so that handlers check a message is valid before processing it.
8. class PlayerHandler
        implements EventHandler<PlayerMove> {
    /**
     * Applies one published move to the player it addresses.
     *
     * @param event      the move to apply; its id selects the player
     * @param sequence   sequence of the event (not used here)
     * @param onBatchEnd batch-end flag supplied by the caller (not used here)
     */
    @Override
    public void onEvent(PlayerMove event,
                        long sequence,
                        boolean onBatchEnd) {
        // NOTE(review): findPlayer is not shown here; confirm it never returns null,
        // otherwise the move() call below fails with a NullPointerException.
        Player player = findPlayer(event.id);
        player.move(event.direction, event.distance);
    }
}
Friday, 20 September 13
9. class PlayerHandler
        implements EventHandler<PlayerMove> {
    /**
     * Applies one published move to the player it addresses.
     *
     * @param event      the move to apply; its id selects the player
     * @param sequence   sequence of the event (not used here)
     * @param onBatchEnd batch-end flag supplied by the caller (not used here)
     */
    @Override
    public void onEvent(PlayerMove event,
                        long sequence,
                        boolean onBatchEnd) {
        // NOTE(review): findPlayer is not shown here; confirm it never returns null,
        // otherwise the move() call below fails with a NullPointerException.
        Player player = findPlayer(event.id);
        player.move(event.direction, event.distance);
    }
}
Friday, 20 September 13
10. class NetworkHandler {
    RingBuffer<PlayerMove> buffer;

    /**
     * Decodes one packet into the next ring-buffer slot and publishes it.
     *
     * @param packet source of three consecutive longs: id, direction, distance
     */
    void handle(ByteBuffer packet) {
        long next = buffer.next();
        try {
            PlayerMove playerMove = buffer.get(next);
            playerMove.id = packet.getLong();
            playerMove.direction = packet.getLong();
            playerMove.distance = packet.getLong();
        } finally {
            // Publish even when a getLong() throws (e.g. a short packet): a claimed
            // sequence that is never published is never released to consumers.
            // The event may then be only partially filled.
            buffer.publish(next);
        }
    }
}
Friday, 20 September 13
11. class NetworkHandler {
    RingBuffer<PlayerMove> buffer;

    /**
     * Decodes one packet into the next ring-buffer slot and publishes it.
     *
     * @param packet source of three consecutive longs: id, direction, distance
     */
    void handle(ByteBuffer packet) {
        long next = buffer.next();
        try {
            PlayerMove playerMove = buffer.get(next);
            playerMove.id = packet.getLong();
            playerMove.direction = packet.getLong();
            playerMove.distance = packet.getLong();
        } finally {
            // Publish even when a getLong() throws (e.g. a short packet): a claimed
            // sequence that is never published is never released to consumers.
            // The event may then be only partially filled.
            buffer.publish(next);
        }
    }
}
Friday, 20 September 13
12. class NetworkHandler {
    RingBuffer<PlayerMove> buffer;

    /**
     * Decodes one packet into the next ring-buffer slot and publishes it.
     *
     * @param packet source of three consecutive longs: id, direction, distance
     */
    void handle(ByteBuffer packet) {
        long next = buffer.next();
        try {
            PlayerMove playerMove = buffer.get(next);
            playerMove.id = packet.getLong();
            playerMove.direction = packet.getLong();
            playerMove.distance = packet.getLong();
        } finally {
            // Publish even when a getLong() throws (e.g. a short packet): a claimed
            // sequence that is never published is never released to consumers.
            // The event may then be only partially filled.
            buffer.publish(next);
        }
    }
}
Friday, 20 September 13
13. class NetworkHandler {
    RingBuffer<PlayerMove> buffer;

    /**
     * Decodes one packet into the next ring-buffer slot and publishes it.
     *
     * @param packet source of three consecutive longs: id, direction, distance
     */
    void handle(ByteBuffer packet) {
        long next = buffer.next();
        try {
            PlayerMove playerMove = buffer.get(next);
            playerMove.id = packet.getLong();
            playerMove.direction = packet.getLong();
            playerMove.distance = packet.getLong();
        } finally {
            // Publish even when a getLong() throws (e.g. a short packet): a claimed
            // sequence that is never published is never released to consumers.
            // The event may then be only partially filled.
            buffer.publish(next);
        }
    }
}
Friday, 20 September 13
14. class NetworkHandler {
    RingBuffer<PlayerMove> buffer;

    /**
     * Decodes one packet into the next ring-buffer slot and publishes it.
     *
     * @param packet source of three consecutive longs: id, direction, distance
     */
    void handle(ByteBuffer packet) {
        long next = buffer.next();
        try {
            PlayerMove playerMove = buffer.get(next);
            playerMove.id = packet.getLong();
            playerMove.direction = packet.getLong();
            playerMove.distance = packet.getLong();
        } finally {
            // Publish even when a getLong() throws (e.g. a short packet): a claimed
            // sequence that is never published is never released to consumers.
            // The event may then be only partially filled.
            buffer.publish(next);
        }
    }
}
Friday, 20 September 13
15. class NetworkHandler {
    RingBuffer<PlayerMove> buffer;

    /**
     * Claims the next sequence, fills its event from the packet (id, direction,
     * distance, read in that order) and publishes the sequence. The publish is in
     * a finally block, so it also runs when reading the packet throws.
     */
    void handle(ByteBuffer packet) {
        final long claimed = buffer.next();
        try {
            final PlayerMove slot = buffer.get(claimed);
            slot.id = packet.getLong();
            slot.direction = packet.getLong();
            slot.distance = packet.getLong();
        } finally {
            buffer.publish(claimed);
        }
    }
}
Friday, 20 September 13
16. class PlayerMoveTranslator
        implements EventTranslatorOneArg<PlayerMove, ByteBuffer> {
    /** Shared instance; the class declares no instance fields. */
    public static final PlayerMoveTranslator INSTANCE =
        new PlayerMoveTranslator();

    /**
     * Fills {@code playerMove} from three consecutive longs read from
     * {@code packet}: id, direction, distance (in that order).
     *
     * @param playerMove event to populate
     * @param sequence   sequence of the event (not used here)
     * @param packet     buffer positioned at the first of the three longs
     */
    @Override
    public void translateTo(PlayerMove playerMove,
                            long sequence,
                            ByteBuffer packet) {
        playerMove.id = packet.getLong();
        playerMove.direction = packet.getLong();
        playerMove.distance = packet.getLong();
    }
}
class NetworkHandler2 {
    RingBuffer<PlayerMove> buffer;

    /** Publishes one packet by handing it, with the shared translator, to the ring buffer. */
    void handle(final ByteBuffer packet) {
        final PlayerMoveTranslator translator = PlayerMoveTranslator.INSTANCE;
        buffer.publishEvent(translator, packet);
    }
}
Friday, 20 September 13
17. class PlayerMoveTranslator
        implements EventTranslatorOneArg<PlayerMove, ByteBuffer> {
    /** Shared instance; the class declares no instance fields. */
    public static final PlayerMoveTranslator INSTANCE =
        new PlayerMoveTranslator();

    /**
     * Fills {@code playerMove} from three consecutive longs read from
     * {@code packet}: id, direction, distance (in that order).
     *
     * @param playerMove event to populate
     * @param sequence   sequence of the event (not used here)
     * @param packet     buffer positioned at the first of the three longs
     */
    @Override
    public void translateTo(PlayerMove playerMove,
                            long sequence,
                            ByteBuffer packet) {
        playerMove.id = packet.getLong();
        playerMove.direction = packet.getLong();
        playerMove.distance = packet.getLong();
    }
}
class NetworkHandler2 {
    RingBuffer<PlayerMove> buffer;

    /** Publishes one packet by handing it, with the shared translator, to the ring buffer. */
    void handle(final ByteBuffer packet) {
        final PlayerMoveTranslator translator = PlayerMoveTranslator.INSTANCE;
        buffer.publishEvent(translator, packet);
    }
}
Friday, 20 September 13
18. class PlayerMoveTranslator
        implements EventTranslatorOneArg<PlayerMove, ByteBuffer> {
    /** Shared instance; the class declares no instance fields. */
    public static final PlayerMoveTranslator INSTANCE =
        new PlayerMoveTranslator();

    /**
     * Fills {@code playerMove} from three consecutive longs read from
     * {@code packet}: id, direction, distance (in that order).
     *
     * @param playerMove event to populate
     * @param sequence   sequence of the event (not used here)
     * @param packet     buffer positioned at the first of the three longs
     */
    @Override
    public void translateTo(PlayerMove playerMove,
                            long sequence,
                            ByteBuffer packet) {
        playerMove.id = packet.getLong();
        playerMove.direction = packet.getLong();
        playerMove.distance = packet.getLong();
    }
}
class NetworkHandler2 {
    RingBuffer<PlayerMove> buffer;

    /** Publishes one packet by handing it, with the shared translator, to the ring buffer. */
    void handle(final ByteBuffer packet) {
        final PlayerMoveTranslator translator = PlayerMoveTranslator.INSTANCE;
        buffer.publishEvent(translator, packet);
    }
}
Friday, 20 September 13
19. class PlayerMoveTranslator
        implements EventTranslatorOneArg<PlayerMove, ByteBuffer> {
    /** Shared instance; the class declares no instance fields. */
    public static final PlayerMoveTranslator INSTANCE =
        new PlayerMoveTranslator();

    /**
     * Fills {@code playerMove} from three consecutive longs read from
     * {@code packet}: id, direction, distance (in that order).
     *
     * @param playerMove event to populate
     * @param sequence   sequence of the event (not used here)
     * @param packet     buffer positioned at the first of the three longs
     */
    @Override
    public void translateTo(PlayerMove playerMove,
                            long sequence,
                            ByteBuffer packet) {
        playerMove.id = packet.getLong();
        playerMove.direction = packet.getLong();
        playerMove.distance = packet.getLong();
    }
}
class NetworkHandler2 {
    RingBuffer<PlayerMove> buffer;

    /** Publishes one packet by handing it, with the shared translator, to the ring buffer. */
    void handle(final ByteBuffer packet) {
        final PlayerMoveTranslator translator = PlayerMoveTranslator.INSTANCE;
        buffer.publishEvent(translator, packet);
    }
}
Friday, 20 September 13
21. public abstract class BaseMessage {
// Validity marker for a message. SafeTranslator in this file clears it before
// the event is filled in and sets it only after filling returns normally;
// SafeHandler skips any event whose flag is still false.
public boolean isValid;
}
Friday, 20 September 13
22. class SafeTranslator<T extends BaseMessage, A>
        implements EventTranslatorOneArg<T, A> {
    private final EventTranslatorOneArg<T, A> delegate;

    /** @param delegate translator that actually populates the event */
    public SafeTranslator(EventTranslatorOneArg<T, A> delegate) {
        this.delegate = delegate;
    }

    /**
     * Runs the delegate with the event marked invalid, then marks it valid.
     * The flag is set to {@code true} only after the delegate returns normally,
     * so an event whose translation threw keeps {@code isValid == false}.
     */
    @Override
    public void translateTo(T t, long sequence, A arg0) {
        t.isValid = false;
        delegate.translateTo(t, sequence, arg0);
        t.isValid = true;
    }
}
Friday, 20 September 13
23. class SafeTranslator<T extends BaseMessage, A>
        implements EventTranslatorOneArg<T, A> {
    private final EventTranslatorOneArg<T, A> delegate;

    /** @param delegate translator that actually populates the event */
    public SafeTranslator(EventTranslatorOneArg<T, A> delegate) {
        this.delegate = delegate;
    }

    /**
     * Runs the delegate with the event marked invalid, then marks it valid.
     * The flag is set to {@code true} only after the delegate returns normally,
     * so an event whose translation threw keeps {@code isValid == false}.
     */
    @Override
    public void translateTo(T t, long sequence, A arg0) {
        t.isValid = false;
        delegate.translateTo(t, sequence, arg0);
        t.isValid = true;
    }
}
Friday, 20 September 13
24. class SafeTranslator<T extends BaseMessage, A>
        implements EventTranslatorOneArg<T, A> {
    private final EventTranslatorOneArg<T, A> delegate;

    /** @param delegate translator that actually populates the event */
    public SafeTranslator(EventTranslatorOneArg<T, A> delegate) {
        this.delegate = delegate;
    }

    /**
     * Runs the delegate with the event marked invalid, then marks it valid.
     * The flag is set to {@code true} only after the delegate returns normally,
     * so an event whose translation threw keeps {@code isValid == false}.
     */
    @Override
    public void translateTo(T t, long sequence, A arg0) {
        t.isValid = false;
        delegate.translateTo(t, sequence, arg0);
        t.isValid = true;
    }
}
Friday, 20 September 13
25. class SafeTranslator<T extends BaseMessage, A>
        implements EventTranslatorOneArg<T, A> {
    private final EventTranslatorOneArg<T, A> delegate;

    /** @param delegate translator that actually populates the event */
    public SafeTranslator(EventTranslatorOneArg<T, A> delegate) {
        this.delegate = delegate;
    }

    /**
     * Runs the delegate with the event marked invalid, then marks it valid.
     * The flag is set to {@code true} only after the delegate returns normally,
     * so an event whose translation threw keeps {@code isValid == false}.
     */
    @Override
    public void translateTo(T t, long sequence, A arg0) {
        t.isValid = false;
        delegate.translateTo(t, sequence, arg0);
        t.isValid = true;
    }
}
Friday, 20 September 13
26. class SafeTranslator<T extends BaseMessage, A>
        implements EventTranslatorOneArg<T, A> {
    private final EventTranslatorOneArg<T, A> delegate;

    /** @param delegate translator that actually populates the event */
    public SafeTranslator(EventTranslatorOneArg<T, A> delegate) {
        this.delegate = delegate;
    }

    /**
     * Runs the delegate with the event marked invalid, then marks it valid.
     * The flag is set to {@code true} only after the delegate returns normally,
     * so an event whose translation threw keeps {@code isValid == false}.
     */
    @Override
    public void translateTo(T t, long sequence, A arg0) {
        t.isValid = false;
        delegate.translateTo(t, sequence, arg0);
        t.isValid = true;
    }
}
Friday, 20 September 13
27. class SafeTranslator<T extends BaseMessage, A>
        implements EventTranslatorOneArg<T, A> {
    private final EventTranslatorOneArg<T, A> delegate;

    /** @param delegate translator that actually populates the event */
    public SafeTranslator(EventTranslatorOneArg<T, A> delegate) {
        this.delegate = delegate;
    }

    /**
     * Runs the delegate with the event marked invalid, then marks it valid.
     * The flag is set to {@code true} only after the delegate returns normally,
     * so an event whose translation threw keeps {@code isValid == false}.
     */
    @Override
    public void translateTo(T t, long sequence, A arg0) {
        t.isValid = false;
        delegate.translateTo(t, sequence, arg0);
        t.isValid = true;
    }
}
Friday, 20 September 13
28. class SafeHandler<T extends BaseMessage>
        implements EventHandler<T> {
    private final EventHandler<T> delegate;

    /** @param handler handler that receives every valid event */
    public SafeHandler(EventHandler<T> handler) {
        this.delegate = handler;
    }

    /**
     * Forwards the event to the delegate only when its isValid flag is set;
     * events with the flag cleared are silently skipped.
     *
     * @throws Exception whatever the delegate throws; Disruptor 3.x declares
     *                   {@code EventHandler.onEvent} as {@code throws Exception},
     *                   so the wrapper must declare it to call the delegate
     */
    @Override
    public void onEvent(T t, long sequence, boolean endOfBatch) throws Exception {
        if (t.isValid) {
            delegate.onEvent(t, sequence, endOfBatch);
        }
    }
}
Friday, 20 September 13
29. class SafeHandler<T extends BaseMessage>
        implements EventHandler<T> {
    private final EventHandler<T> delegate;

    /** @param handler handler that receives every valid event */
    public SafeHandler(EventHandler<T> handler) {
        this.delegate = handler;
    }

    /**
     * Forwards the event to the delegate only when its isValid flag is set;
     * events with the flag cleared are silently skipped.
     *
     * @throws Exception whatever the delegate throws; Disruptor 3.x declares
     *                   {@code EventHandler.onEvent} as {@code throws Exception},
     *                   so the wrapper must declare it to call the delegate
     */
    @Override
    public void onEvent(T t, long sequence, boolean endOfBatch) throws Exception {
        if (t.isValid) {
            delegate.onEvent(t, sequence, endOfBatch);
        }
    }
}
Friday, 20 September 13
30. class SafeHandler<T extends BaseMessage>
        implements EventHandler<T> {
    private final EventHandler<T> delegate;

    /** @param handler handler that receives every valid event */
    public SafeHandler(EventHandler<T> handler) {
        this.delegate = handler;
    }

    /**
     * Forwards the event to the delegate only when its isValid flag is set;
     * events with the flag cleared are silently skipped.
     *
     * @throws Exception whatever the delegate throws; Disruptor 3.x declares
     *                   {@code EventHandler.onEvent} as {@code throws Exception},
     *                   so the wrapper must declare it to call the delegate
     */
    @Override
    public void onEvent(T t, long sequence, boolean endOfBatch) throws Exception {
        if (t.isValid) {
            delegate.onEvent(t, sequence, endOfBatch);
        }
    }
}
Friday, 20 September 13
31. class SafeHandler<T extends BaseMessage>
        implements EventHandler<T> {
    private final EventHandler<T> delegate;

    /** @param handler handler that receives every valid event */
    public SafeHandler(EventHandler<T> handler) {
        this.delegate = handler;
    }

    /**
     * Forwards the event to the delegate only when its isValid flag is set;
     * events with the flag cleared are silently skipped.
     *
     * @throws Exception whatever the delegate throws; Disruptor 3.x declares
     *                   {@code EventHandler.onEvent} as {@code throws Exception},
     *                   so the wrapper must declare it to call the delegate
     */
    @Override
    public void onEvent(T t, long sequence, boolean endOfBatch) throws Exception {
        if (t.isValid) {
            delegate.onEvent(t, sequence, endOfBatch);
        }
    }
}
Friday, 20 September 13
32. class SafeHandler<T extends BaseMessage>
        implements EventHandler<T> {
    private final EventHandler<T> delegate;

    /** @param handler handler that receives every valid event */
    public SafeHandler(EventHandler<T> handler) {
        this.delegate = handler;
    }

    /**
     * Forwards the event to the delegate only when its isValid flag is set;
     * events with the flag cleared are silently skipped.
     *
     * @throws Exception whatever the delegate throws; Disruptor 3.x declares
     *                   {@code EventHandler.onEvent} as {@code throws Exception},
     *                   so the wrapper must declare it to call the delegate
     */
    @Override
    public void onEvent(T t, long sequence, boolean endOfBatch) throws Exception {
        if (t.isValid) {
            delegate.onEvent(t, sequence, endOfBatch);
        }
    }
}
Friday, 20 September 13
34. public class SimpleEvent {
    // Immutable value: all four fields are final and set once by the constructor.
    private final long id, v1, v2, v3;

    /** Stores the four values unchanged. */
    public SimpleEvent(long id, long v1, long v2, long v3) {
        this.v3 = v3;
        this.v2 = v2;
        this.v1 = v1;
        this.id = id;
    }
}
/** Mutable holder for a reference to a {@link SimpleEvent}. */
public class EventHolder {
/** The held event; {@code null} until something assigns it. */
public SimpleEvent event;
}
Friday, 20 September 13
35. public class SimpleEvent {
    // Immutable value: all four fields are final and set once by the constructor.
    private final long id, v1, v2, v3;

    /** Stores the four values unchanged. */
    public SimpleEvent(long id, long v1, long v2, long v3) {
        this.v3 = v3;
        this.v2 = v2;
        this.v1 = v1;
        this.id = id;
    }
}
/** Mutable holder for a reference to a {@link SimpleEvent}. */
public class EventHolder {
/** The held event; {@code null} until something assigns it. */
public SimpleEvent event;
}
Friday, 20 September 13
38. RingBuffer<T>: next() : long; get(long) : T; publish(long) : void
Sequencer: next() : long; publish(long) : void
WaitStrategy
SingleProducerSequencer
MultiProducerSequencer
DataProvider<T>: get(long) : T
BatchEventProcessor<T>: run() : void
EventHandler<T>: onEvent(T, long, boolean) : void
Friday, 20 September 13
39. RingBuffer<T>: next() : long; get(long) : T; publish(long) : void
Sequencer: next() : long; publish(long) : void
WaitStrategy
SingleProducerSequencer
MultiProducerSequencer
DataProvider<T>: get(long) : T
BatchEventProcessor<T>: run() : void
EventHandler<T>: onEvent(T, long, boolean) : void
Friday, 20 September 13
40. public interface EventAccessor<T> {
/**
 * Returns the event stored for the given sequence.
 * The CustomRingBuffer implementation in this file also clears the slot,
 * so a second call for the same sequence returns {@code null} there.
 *
 * @param sequence sequence number of the event to fetch
 * @return the event stored for that sequence
 */
T take(long sequence);
}
public class CustomRingBuffer<T>
implements DataProvider<EventAccessor<T>>,
EventAccessor<T> {
private final Sequencer sequencer;
private final Object[] buffer;
private final int mask;
public CustomRingBuffer(Sequencer sequencer) {
this.sequencer = sequencer;
buffer = new Object[sequencer.getBufferSize()];
mask = sequencer.getBufferSize() - 1;
}
Friday, 20 September 13
41. public interface EventAccessor<T> {
/**
 * Returns the event stored for the given sequence.
 * The CustomRingBuffer implementation in this file also clears the slot,
 * so a second call for the same sequence returns {@code null} there.
 *
 * @param sequence sequence number of the event to fetch
 * @return the event stored for that sequence
 */
T take(long sequence);
}
public class CustomRingBuffer<T>
implements DataProvider<EventAccessor<T>>,
EventAccessor<T> {
private final Sequencer sequencer;
private final Object[] buffer;
private final int mask;
public CustomRingBuffer(Sequencer sequencer) {
this.sequencer = sequencer;
buffer = new Object[sequencer.getBufferSize()];
mask = sequencer.getBufferSize() - 1;
}
Friday, 20 September 13
42. public interface EventAccessor<T> {
/**
 * Returns the event stored for the given sequence.
 * The CustomRingBuffer implementation in this file also clears the slot,
 * so a second call for the same sequence returns {@code null} there.
 *
 * @param sequence sequence number of the event to fetch
 * @return the event stored for that sequence
 */
T take(long sequence);
}
public class CustomRingBuffer<T>
implements DataProvider<EventAccessor<T>>,
EventAccessor<T> {
private final Sequencer sequencer;
private final Object[] buffer;
private final int mask;
public CustomRingBuffer(Sequencer sequencer) {
this.sequencer = sequencer;
buffer = new Object[sequencer.getBufferSize()];
mask = sequencer.getBufferSize() - 1;
}
Friday, 20 September 13
43. public interface EventAccessor<T> {
/**
 * Returns the event stored for the given sequence.
 * The CustomRingBuffer implementation in this file also clears the slot,
 * so a second call for the same sequence returns {@code null} there.
 *
 * @param sequence sequence number of the event to fetch
 * @return the event stored for that sequence
 */
T take(long sequence);
}
public class CustomRingBuffer<T>
implements DataProvider<EventAccessor<T>>,
EventAccessor<T> {
private final Sequencer sequencer;
private final Object[] buffer;
private final int mask;
public CustomRingBuffer(Sequencer sequencer) {
this.sequencer = sequencer;
buffer = new Object[sequencer.getBufferSize()];
mask = sequencer.getBufferSize() - 1;
}
Friday, 20 September 13
44. public void put(T t) {
    // Claim the next sequence, store the reference in its slot, then publish it.
    final long claimed = sequencer.next();
    final int slot = index(claimed);
    buffer[slot] = t;
    sequencer.publish(claimed);
}
/**
 * Removes and returns the event stored for {@code sequence}.
 * The slot is set to {@code null} afterwards, so the array no longer references
 * the event and a second call for the same sequence returns {@code null}.
 *
 * @param sequence sequence whose slot is read and cleared
 * @return the stored event, or {@code null} if the slot is empty
 */
@Override
public T take(long sequence) {
    final int slot = index(sequence);
    // In the code shown, the only non-null values stored are the T passed to put(T).
    @SuppressWarnings("unchecked")
    final T t = (T) buffer[slot];
    buffer[slot] = null;
    return t;
}
/**
 * Returns this buffer as the accessor for every sequence; the sequence argument
 * is ignored here and is instead passed to {@link #take(long)} by the caller.
 */
@Override
public EventAccessor<T> get(long sequence) {
    return this;
}
Friday, 20 September 13
45. public void put(T t) {
    // Claim the next sequence, store the reference in its slot, then publish it.
    final long claimed = sequencer.next();
    final int slot = index(claimed);
    buffer[slot] = t;
    sequencer.publish(claimed);
}
/**
 * Removes and returns the event stored for {@code sequence}.
 * The slot is set to {@code null} afterwards, so the array no longer references
 * the event and a second call for the same sequence returns {@code null}.
 *
 * @param sequence sequence whose slot is read and cleared
 * @return the stored event, or {@code null} if the slot is empty
 */
@Override
public T take(long sequence) {
    final int slot = index(sequence);
    // In the code shown, the only non-null values stored are the T passed to put(T).
    @SuppressWarnings("unchecked")
    final T t = (T) buffer[slot];
    buffer[slot] = null;
    return t;
}
/**
 * Returns this buffer as the accessor for every sequence; the sequence argument
 * is ignored here and is instead passed to {@link #take(long)} by the caller.
 */
@Override
public EventAccessor<T> get(long sequence) {
    return this;
}
Friday, 20 September 13
46. public void put(T t) {
    // Claim the next sequence, store the reference in its slot, then publish it.
    final long claimed = sequencer.next();
    final int slot = index(claimed);
    buffer[slot] = t;
    sequencer.publish(claimed);
}
/**
 * Removes and returns the event stored for {@code sequence}.
 * The slot is set to {@code null} afterwards, so the array no longer references
 * the event and a second call for the same sequence returns {@code null}.
 *
 * @param sequence sequence whose slot is read and cleared
 * @return the stored event, or {@code null} if the slot is empty
 */
@Override
public T take(long sequence) {
    final int slot = index(sequence);
    // In the code shown, the only non-null values stored are the T passed to put(T).
    @SuppressWarnings("unchecked")
    final T t = (T) buffer[slot];
    buffer[slot] = null;
    return t;
}
/**
 * Returns this buffer as the accessor for every sequence; the sequence argument
 * is ignored here and is instead passed to {@link #take(long)} by the caller.
 */
@Override
public EventAccessor<T> get(long sequence) {
    return this;
}
Friday, 20 September 13
47. public void put(T t) {
    // Claim the next sequence, store the reference in its slot, then publish it.
    final long claimed = sequencer.next();
    final int slot = index(claimed);
    buffer[slot] = t;
    sequencer.publish(claimed);
}
/**
 * Removes and returns the event stored for {@code sequence}.
 * The slot is set to {@code null} afterwards, so the array no longer references
 * the event and a second call for the same sequence returns {@code null}.
 *
 * @param sequence sequence whose slot is read and cleared
 * @return the stored event, or {@code null} if the slot is empty
 */
@Override
public T take(long sequence) {
    final int slot = index(sequence);
    // In the code shown, the only non-null values stored are the T passed to put(T).
    @SuppressWarnings("unchecked")
    final T t = (T) buffer[slot];
    buffer[slot] = null;
    return t;
}
/**
 * Returns this buffer as the accessor for every sequence; the sequence argument
 * is ignored here and is instead passed to {@link #take(long)} by the caller.
 */
@Override
public EventAccessor<T> get(long sequence) {
    return this;
}
Friday, 20 September 13
48. public void put(T t) {
    // Claim the next sequence, store the reference in its slot, then publish it.
    final long claimed = sequencer.next();
    final int slot = index(claimed);
    buffer[slot] = t;
    sequencer.publish(claimed);
}
/**
 * Removes and returns the event stored for {@code sequence}.
 * The slot is set to {@code null} afterwards, so the array no longer references
 * the event and a second call for the same sequence returns {@code null}.
 *
 * @param sequence sequence whose slot is read and cleared
 * @return the stored event, or {@code null} if the slot is empty
 */
@Override
public T take(long sequence) {
    final int slot = index(sequence);
    // In the code shown, the only non-null values stored are the T passed to put(T).
    @SuppressWarnings("unchecked")
    final T t = (T) buffer[slot];
    buffer[slot] = null;
    return t;
}
/**
 * Returns this buffer as the accessor for every sequence; the sequence argument
 * is ignored here and is instead passed to {@link #take(long)} by the caller.
 */
@Override
public EventAccessor<T> get(long sequence) {
    return this;
}
Friday, 20 September 13
49. public void put(T t) {
    // Claim the next sequence, store the reference in its slot, then publish it.
    final long claimed = sequencer.next();
    final int slot = index(claimed);
    buffer[slot] = t;
    sequencer.publish(claimed);
}
/**
 * Removes and returns the event stored for {@code sequence}.
 * The slot is set to {@code null} afterwards, so the array no longer references
 * the event and a second call for the same sequence returns {@code null}.
 *
 * @param sequence sequence whose slot is read and cleared
 * @return the stored event, or {@code null} if the slot is empty
 */
@Override
public T take(long sequence) {
    final int slot = index(sequence);
    // In the code shown, the only non-null values stored are the T passed to put(T).
    @SuppressWarnings("unchecked")
    final T t = (T) buffer[slot];
    buffer[slot] = null;
    return t;
}
/**
 * Returns this buffer as the accessor for every sequence; the sequence argument
 * is ignored here and is instead passed to {@link #take(long)} by the caller.
 */
@Override
public EventAccessor<T> get(long sequence) {
    return this;
}
Friday, 20 September 13
50. public void put(T t) {
    // Claim the next sequence, store the reference in its slot, then publish it.
    final long claimed = sequencer.next();
    final int slot = index(claimed);
    buffer[slot] = t;
    sequencer.publish(claimed);
}
/**
 * Removes and returns the event stored for {@code sequence}.
 * The slot is set to {@code null} afterwards, so the array no longer references
 * the event and a second call for the same sequence returns {@code null}.
 *
 * @param sequence sequence whose slot is read and cleared
 * @return the stored event, or {@code null} if the slot is empty
 */
@Override
public T take(long sequence) {
    final int slot = index(sequence);
    // In the code shown, the only non-null values stored are the T passed to put(T).
    @SuppressWarnings("unchecked")
    final T t = (T) buffer[slot];
    buffer[slot] = null;
    return t;
}
/**
 * Returns this buffer as the accessor for every sequence; the sequence argument
 * is ignored here and is instead passed to {@link #take(long)} by the caller.
 */
@Override
public EventAccessor<T> get(long sequence) {
    return this;
}
Friday, 20 September 13
51. public void put(T t) {
    // Claim the next sequence, store the reference in its slot, then publish it.
    final long claimed = sequencer.next();
    final int slot = index(claimed);
    buffer[slot] = t;
    sequencer.publish(claimed);
}
/**
 * Removes and returns the event stored for {@code sequence}.
 * The slot is set to {@code null} afterwards, so the array no longer references
 * the event and a second call for the same sequence returns {@code null}.
 *
 * @param sequence sequence whose slot is read and cleared
 * @return the stored event, or {@code null} if the slot is empty
 */
@Override
public T take(long sequence) {
    final int slot = index(sequence);
    // In the code shown, the only non-null values stored are the T passed to put(T).
    @SuppressWarnings("unchecked")
    final T t = (T) buffer[slot];
    buffer[slot] = null;
    return t;
}
/**
 * Returns this buffer as the accessor for every sequence; the sequence argument
 * is ignored here and is instead passed to {@link #take(long)} by the caller.
 */
@Override
public EventAccessor<T> get(long sequence) {
    return this;
}
Friday, 20 September 13
52. public void put(T t) {
    // Claim the next sequence, store the reference in its slot, then publish it.
    final long claimed = sequencer.next();
    final int slot = index(claimed);
    buffer[slot] = t;
    sequencer.publish(claimed);
}
/**
 * Removes and returns the event stored for {@code sequence}.
 * The slot is set to {@code null} afterwards, so the array no longer references
 * the event and a second call for the same sequence returns {@code null}.
 *
 * @param sequence sequence whose slot is read and cleared
 * @return the stored event, or {@code null} if the slot is empty
 */
@Override
public T take(long sequence) {
    final int slot = index(sequence);
    // In the code shown, the only non-null values stored are the T passed to put(T).
    @SuppressWarnings("unchecked")
    final T t = (T) buffer[slot];
    buffer[slot] = null;
    return t;
}
/**
 * Returns this buffer as the accessor for every sequence; the sequence argument
 * is ignored here and is instead passed to {@link #take(long)} by the caller.
 */
@Override
public EventAccessor<T> get(long sequence) {
    return this;
}
Friday, 20 September 13
53. public BatchEventProcessor<EventAccessor<T>>
createHandler(final EventHandler<T> handler) {
    // Wraps the caller's handler in an adapter that takes the event out of this
    // buffer for the given sequence and forwards it with the same arguments.
    BatchEventProcessor<EventAccessor<T>> processor =
        new BatchEventProcessor<>(this, sequencer.newBarrier(),
            new EventHandler<EventAccessor<T>>() {
                // Declares throws Exception because handler.onEvent does
                // (Disruptor 3.x EventHandler signature).
                @Override
                public void onEvent(EventAccessor<T> accessor,
                                    long sequence,
                                    boolean endOfBatch) throws Exception {
                    handler.onEvent(accessor.take(sequence),
                                    sequence,
                                    endOfBatch);
                }
            });
    // Register the processor's sequence with the sequencer as a gating sequence.
    sequencer.addGatingSequences(processor.getSequence());
    return processor;
}
Friday, 20 September 13
54. public BatchEventProcessor<EventAccessor<T>>
createHandler(final EventHandler<T> handler) {
    // Wraps the caller's handler in an adapter that takes the event out of this
    // buffer for the given sequence and forwards it with the same arguments.
    BatchEventProcessor<EventAccessor<T>> processor =
        new BatchEventProcessor<>(this, sequencer.newBarrier(),
            new EventHandler<EventAccessor<T>>() {
                // Declares throws Exception because handler.onEvent does
                // (Disruptor 3.x EventHandler signature).
                @Override
                public void onEvent(EventAccessor<T> accessor,
                                    long sequence,
                                    boolean endOfBatch) throws Exception {
                    handler.onEvent(accessor.take(sequence),
                                    sequence,
                                    endOfBatch);
                }
            });
    // Register the processor's sequence with the sequencer as a gating sequence.
    sequencer.addGatingSequences(processor.getSequence());
    return processor;
}
Friday, 20 September 13
55. public BatchEventProcessor<EventAccessor<T>>
createHandler(final EventHandler<T> handler) {
    // Wraps the caller's handler in an adapter that takes the event out of this
    // buffer for the given sequence and forwards it with the same arguments.
    BatchEventProcessor<EventAccessor<T>> processor =
        new BatchEventProcessor<>(this, sequencer.newBarrier(),
            new EventHandler<EventAccessor<T>>() {
                // Declares throws Exception because handler.onEvent does
                // (Disruptor 3.x EventHandler signature).
                @Override
                public void onEvent(EventAccessor<T> accessor,
                                    long sequence,
                                    boolean endOfBatch) throws Exception {
                    handler.onEvent(accessor.take(sequence),
                                    sequence,
                                    endOfBatch);
                }
            });
    // Register the processor's sequence with the sequencer as a gating sequence.
    sequencer.addGatingSequences(processor.getSequence());
    return processor;
}
Friday, 20 September 13
56. public BatchEventProcessor<EventAccessor<T>>
createHandler(final EventHandler<T> handler) {
    // Wraps the caller's handler in an adapter that takes the event out of this
    // buffer for the given sequence and forwards it with the same arguments.
    BatchEventProcessor<EventAccessor<T>> processor =
        new BatchEventProcessor<>(this, sequencer.newBarrier(),
            new EventHandler<EventAccessor<T>>() {
                // Declares throws Exception because handler.onEvent does
                // (Disruptor 3.x EventHandler signature).
                @Override
                public void onEvent(EventAccessor<T> accessor,
                                    long sequence,
                                    boolean endOfBatch) throws Exception {
                    handler.onEvent(accessor.take(sequence),
                                    sequence,
                                    endOfBatch);
                }
            });
    // Register the processor's sequence with the sequencer as a gating sequence.
    sequencer.addGatingSequences(processor.getSequence());
    return processor;
}
Friday, 20 September 13
57. public BatchEventProcessor<EventAccessor<T>>
createHandler(final EventHandler<T> handler) {
    // Wraps the caller's handler in an adapter that takes the event out of this
    // buffer for the given sequence and forwards it with the same arguments.
    BatchEventProcessor<EventAccessor<T>> processor =
        new BatchEventProcessor<>(this, sequencer.newBarrier(),
            new EventHandler<EventAccessor<T>>() {
                // Declares throws Exception because handler.onEvent does
                // (Disruptor 3.x EventHandler signature).
                @Override
                public void onEvent(EventAccessor<T> accessor,
                                    long sequence,
                                    boolean endOfBatch) throws Exception {
                    handler.onEvent(accessor.take(sequence),
                                    sequence,
                                    endOfBatch);
                }
            });
    // Register the processor's sequence with the sequencer as a gating sequence.
    sequencer.addGatingSequences(processor.getSequence());
    return processor;
}
Friday, 20 September 13
58. Simple Custom
Mean 0.032 0.002
Median 0.021 0.002
Mode 0.020 0.002
Performance - It's my laptop, YMMV!!!
Friday, 20 September 13
60. public class OffHeapRingBuffer
implements DataProvider<ByteBuffer> {
private final Sequencer sequencer;
private final int entrySize;
private final ByteBuffer buffer;
private final int mask;
ThreadLocal<ByteBuffer> perThreadBuffer =
new ThreadLocal<ByteBuffer>() {
protected ByteBuffer initialValue() {
return buffer.duplicate();
}
};
public OffHeapRingBuffer(Sequencer sequencer,
int entrySize) {
this.sequencer = sequencer;
this.entrySize = entrySize;
this.mask = sequencer.getBufferSize() - 1;
buffer = ByteBuffer.allocateDirect(
sequencer.getBufferSize() * entrySize);
}
Friday, 20 September 13
61. public class OffHeapRingBuffer
implements DataProvider<ByteBuffer> {
private final Sequencer sequencer;
private final int entrySize;
private final ByteBuffer buffer;
private final int mask;
ThreadLocal<ByteBuffer> perThreadBuffer =
new ThreadLocal<ByteBuffer>() {
protected ByteBuffer initialValue() {
return buffer.duplicate();
}
};
public OffHeapRingBuffer(Sequencer sequencer,
int entrySize) {
this.sequencer = sequencer;
this.entrySize = entrySize;
this.mask = sequencer.getBufferSize() - 1;
buffer = ByteBuffer.allocateDirect(
sequencer.getBufferSize() * entrySize);
}
Friday, 20 September 13
62. public class OffHeapRingBuffer
implements DataProvider<ByteBuffer> {
private final Sequencer sequencer;
private final int entrySize;
private final ByteBuffer buffer;
private final int mask;
ThreadLocal<ByteBuffer> perThreadBuffer =
new ThreadLocal<ByteBuffer>() {
protected ByteBuffer initialValue() {
return buffer.duplicate();
}
};
public OffHeapRingBuffer(Sequencer sequencer,
int entrySize) {
this.sequencer = sequencer;
this.entrySize = entrySize;
this.mask = sequencer.getBufferSize() - 1;
buffer = ByteBuffer.allocateDirect(
sequencer.getBufferSize() * entrySize);
}
Friday, 20 September 13
63. public class OffHeapRingBuffer
implements DataProvider<ByteBuffer> {
private final Sequencer sequencer;
private final int entrySize;
private final ByteBuffer buffer;
private final int mask;
ThreadLocal<ByteBuffer> perThreadBuffer =
new ThreadLocal<ByteBuffer>() {
protected ByteBuffer initialValue() {
return buffer.duplicate();
}
};
public OffHeapRingBuffer(Sequencer sequencer,
int entrySize) {
this.sequencer = sequencer;
this.entrySize = entrySize;
this.mask = sequencer.getBufferSize() - 1;
buffer = ByteBuffer.allocateDirect(
sequencer.getBufferSize() * entrySize);
}
Friday, 20 September 13
64. public class OffHeapRingBuffer
implements DataProvider<ByteBuffer> {
private final Sequencer sequencer;
private final int entrySize;
private final ByteBuffer buffer;
private final int mask;
ThreadLocal<ByteBuffer> perThreadBuffer =
new ThreadLocal<ByteBuffer>() {
protected ByteBuffer initialValue() {
return buffer.duplicate();
}
};
public OffHeapRingBuffer(Sequencer sequencer,
int entrySize) {
this.sequencer = sequencer;
this.entrySize = entrySize;
this.mask = sequencer.getBufferSize() - 1;
buffer = ByteBuffer.allocateDirect(
sequencer.getBufferSize() * entrySize);
}
Friday, 20 September 13
65. public ByteBuffer get(long sequence) {
    // Returns the calling thread's duplicate of the shared buffer, windowed to
    // the entry for this sequence: position at its first byte, limit just past
    // its last. The same per-thread object is returned on every call, so a
    // later get() on the same thread re-windows a buffer handed out earlier.
    int index = index(sequence);
    int position = index * entrySize;
    int limit = position + entrySize;
    ByteBuffer byteBuffer = perThreadBuffer.get();
    // Reset to the full capacity first. position(p) throws
    // IllegalArgumentException when p is beyond the current limit, and the limit
    // left by the previous call is the end of the previous entry.
    byteBuffer.clear();
    byteBuffer.position(position).limit(limit);
    return byteBuffer;
}
Friday, 20 September 13
66. public ByteBuffer get(long sequence) {
    // Returns the calling thread's duplicate of the shared buffer, windowed to
    // the entry for this sequence: position at its first byte, limit just past
    // its last. The same per-thread object is returned on every call, so a
    // later get() on the same thread re-windows a buffer handed out earlier.
    int index = index(sequence);
    int position = index * entrySize;
    int limit = position + entrySize;
    ByteBuffer byteBuffer = perThreadBuffer.get();
    // Reset to the full capacity first. position(p) throws
    // IllegalArgumentException when p is beyond the current limit, and the limit
    // left by the previous call is the end of the previous entry.
    byteBuffer.clear();
    byteBuffer.position(position).limit(limit);
    return byteBuffer;
}
Friday, 20 September 13
67. public ByteBuffer get(long sequence) {
    // Returns the calling thread's duplicate of the shared buffer, windowed to
    // the entry for this sequence: position at its first byte, limit just past
    // its last. The same per-thread object is returned on every call, so a
    // later get() on the same thread re-windows a buffer handed out earlier.
    int index = index(sequence);
    int position = index * entrySize;
    int limit = position + entrySize;
    ByteBuffer byteBuffer = perThreadBuffer.get();
    // Reset to the full capacity first. position(p) throws
    // IllegalArgumentException when p is beyond the current limit, and the limit
    // left by the previous call is the end of the previous entry.
    byteBuffer.clear();
    byteBuffer.position(position).limit(limit);
    return byteBuffer;
}
Friday, 20 September 13
68. public ByteBuffer get(long sequence) {
    // Returns the calling thread's duplicate of the shared buffer, windowed to
    // the entry for this sequence: position at its first byte, limit just past
    // its last. The same per-thread object is returned on every call, so a
    // later get() on the same thread re-windows a buffer handed out earlier.
    int index = index(sequence);
    int position = index * entrySize;
    int limit = position + entrySize;
    ByteBuffer byteBuffer = perThreadBuffer.get();
    // Reset to the full capacity first. position(p) throws
    // IllegalArgumentException when p is beyond the current limit, and the limit
    // left by the previous call is the end of the previous entry.
    byteBuffer.clear();
    byteBuffer.position(position).limit(limit);
    return byteBuffer;
}
Friday, 20 September 13
69. public ByteBuffer get(long sequence) {
    // Returns the calling thread's duplicate of the shared buffer, windowed to
    // the entry for this sequence: position at its first byte, limit just past
    // its last. The same per-thread object is returned on every call, so a
    // later get() on the same thread re-windows a buffer handed out earlier.
    int index = index(sequence);
    int position = index * entrySize;
    int limit = position + entrySize;
    ByteBuffer byteBuffer = perThreadBuffer.get();
    // Reset to the full capacity first. position(p) throws
    // IllegalArgumentException when p is beyond the current limit, and the limit
    // left by the previous call is the end of the previous entry.
    byteBuffer.clear();
    byteBuffer.position(position).limit(limit);
    return byteBuffer;
}
Friday, 20 September 13
70. public void put(byte[] data) {
    // Copies data into the entry for the next sequence. The sequence is
    // published even when the copy throws, because publish is in the finally block.
    final long claimed = sequencer.next();
    try {
        final ByteBuffer entry = get(claimed);
        entry.put(data);
    } finally {
        sequencer.publish(claimed);
    }
}
Friday, 20 September 13
71. public void put(byte[] data) {
    // Copies data into the entry for the next sequence. The sequence is
    // published even when the copy throws, because publish is in the finally block.
    final long claimed = sequencer.next();
    try {
        final ByteBuffer entry = get(claimed);
        entry.put(data);
    } finally {
        sequencer.publish(claimed);
    }
}
Friday, 20 September 13
72. public void put(byte[] data) {
    // Copies data into the entry for the next sequence. The sequence is
    // published even when the copy throws, because publish is in the finally block.
    final long claimed = sequencer.next();
    try {
        final ByteBuffer entry = get(claimed);
        entry.put(data);
    } finally {
        sequencer.publish(claimed);
    }
}
Friday, 20 September 13
73. public void put(byte[] data) {
    // Copies data into the entry for the next sequence. The sequence is
    // published even when the copy throws, because publish is in the finally block.
    final long claimed = sequencer.next();
    try {
        final ByteBuffer entry = get(claimed);
        entry.put(data);
    } finally {
        sequencer.publish(claimed);
    }
}
Friday, 20 September 13
74. public class BufferEventHandler
        implements EventHandler<ByteBuffer> {
    /**
     * Placeholder handler for events delivered as a {@link ByteBuffer}; the body
     * is intentionally left empty.
     *
     * @param buffer     the event data
     * @param sequence   sequence of the event
     * @param endOfBatch batch-end flag supplied by the caller
     */
    @Override
    public void onEvent(ByteBuffer buffer,
                        long sequence,
                        boolean endOfBatch) {
        // Do stuff...
    }
}
Friday, 20 September 13