際際滷

際際滷Share a Scribd company logo
LMAX Disruptor 3.x
Details & Advanced Patterns
@mikeb2701
Friday, 20 September 13
<Intro>
Friday, 20 September 13
Agenda
 Safer
 Safer
 Faster
 Faster
Friday, 20 September 13
<safer>
Friday, 20 September 13
class PlayerMove {
long id;
long direction;
long distance;
}
Friday, 20 September 13
class PlayerMoveFactory
implements EventFactory<PlayerMove> {
public PlayerMove newInstance() {
return new PlayerMove();
}
}
Friday, 20 September 13
class PlayerMoveFactory
implements EventFactory<PlayerMove> {
public PlayerMove newInstance() {
return new PlayerMove();
}
}
Friday, 20 September 13
class PlayerHandler
implements EventHandler<PlayerMove> {
public void onEvent(PlayerMove event,
long sequence,
boolean onBatchEnd) {
Player player = findPlayer(event.id);
player.move(event.direction, event.distance);
}
}
Friday, 20 September 13
class PlayerHandler
implements EventHandler<PlayerMove> {
public void onEvent(PlayerMove event,
long sequence,
boolean onBatchEnd) {
Player player = findPlayer(event.id);
player.move(event.direction, event.distance);
}
}
Friday, 20 September 13
class NetworkHandler {
RingBuffer<PlayerMove> buffer;
void handle(ByteBuffer packet) {
long next = buffer.next();
PlayerMove playerMove = buffer.get(next);
playerMove.id = packet.getLong();
playerMove.direction = packet.getLong();
playerMove.distance = packet.getLong();
buffer.publish(next);
}
}
Friday, 20 September 13
class NetworkHandler {
RingBuffer<PlayerMove> buffer;
void handle(ByteBuffer packet) {
long next = buffer.next();
PlayerMove playerMove = buffer.get(next);
playerMove.id = packet.getLong();
playerMove.direction = packet.getLong();
playerMove.distance = packet.getLong();
buffer.publish(next);
}
}
Friday, 20 September 13
class NetworkHandler {
RingBuffer<PlayerMove> buffer;
void handle(ByteBuffer packet) {
long next = buffer.next();
PlayerMove playerMove = buffer.get(next);
playerMove.id = packet.getLong();
playerMove.direction = packet.getLong();
playerMove.distance = packet.getLong();
buffer.publish(next);
}
}
Friday, 20 September 13
class NetworkHandler {
RingBuffer<PlayerMove> buffer;
void handle(ByteBuffer packet) {
long next = buffer.next();
PlayerMove playerMove = buffer.get(next);
playerMove.id = packet.getLong();
playerMove.direction = packet.getLong();
playerMove.distance = packet.getLong();
buffer.publish(next);
}
}
Friday, 20 September 13
class NetworkHandler {
RingBuffer<PlayerMove> buffer;
void handle(ByteBuffer packet) {
long next = buffer.next();
PlayerMove playerMove = buffer.get(next);
playerMove.id = packet.getLong();
playerMove.direction = packet.getLong();
playerMove.distance = packet.getLong();
buffer.publish(next);
}
}
Friday, 20 September 13
class NetworkHandler {
RingBuffer<PlayerMove> buffer;
void handle(ByteBuffer packet) {
long next = buffer.next();
try {
PlayerMove playerMove = buffer.get(next);
playerMove.id = packet.getLong();
playerMove.direction = packet.getLong();
playerMove.distance = packet.getLong();
} finally {
buffer.publish(next);
}
}
}
Friday, 20 September 13
class PlayerMoveTranslator
implements EventTranslatorOneArg<PlayerMove, ByteBuffer> {
public static final PlayerMoveTranslator INSTANCE =
new PlayerMoveTranslator();
public void translateTo(PlayerMove playerMove,
long sequence,
ByteBuffer packet) {
playerMove.id = packet.getLong();
playerMove.direction = packet.getLong();
playerMove.distance = packet.getLong();
}
}
class NetworkHandler2 {
RingBuffer<PlayerMove> buffer;
void handle(final ByteBuffer packet) {
buffer.publishEvent(PlayerMoveTranslator.INSTANCE,
packet);
}
}
Friday, 20 September 13
class PlayerMoveTranslator
implements EventTranslatorOneArg<PlayerMove, ByteBuffer> {
public static final PlayerMoveTranslator INSTANCE =
new PlayerMoveTranslator();
public void translateTo(PlayerMove playerMove,
long sequence,
ByteBuffer packet) {
playerMove.id = packet.getLong();
playerMove.direction = packet.getLong();
playerMove.distance = packet.getLong();
}
}
class NetworkHandler2 {
RingBuffer<PlayerMove> buffer;
void handle(final ByteBuffer packet) {
buffer.publishEvent(PlayerMoveTranslator.INSTANCE,
packet);
}
}
Friday, 20 September 13
class PlayerMoveTranslator
implements EventTranslatorOneArg<PlayerMove, ByteBuffer> {
public static final PlayerMoveTranslator INSTANCE =
new PlayerMoveTranslator();
public void translateTo(PlayerMove playerMove,
long sequence,
ByteBuffer packet) {
playerMove.id = packet.getLong();
playerMove.direction = packet.getLong();
playerMove.distance = packet.getLong();
}
}
class NetworkHandler2 {
RingBuffer<PlayerMove> buffer;
void handle(final ByteBuffer packet) {
buffer.publishEvent(PlayerMoveTranslator.INSTANCE,
packet);
}
}
Friday, 20 September 13
class PlayerMoveTranslator
implements EventTranslatorOneArg<PlayerMove, ByteBuffer> {
public static final PlayerMoveTranslator INSTANCE =
new PlayerMoveTranslator();
public void translateTo(PlayerMove playerMove,
long sequence,
ByteBuffer packet) {
playerMove.id = packet.getLong();
playerMove.direction = packet.getLong();
playerMove.distance = packet.getLong();
}
}
class NetworkHandler2 {
RingBuffer<PlayerMove> buffer;
void handle(final ByteBuffer packet) {
buffer.publishEvent(PlayerMoveTranslator.INSTANCE,
packet);
}
}
Friday, 20 September 13
<safer>
Friday, 20 September 13
public abstract class BaseMessage {
public boolean isValid;
}
Friday, 20 September 13
class SafeTranslator<T extends BaseMessage, A>
implements EventTranslatorOneArg<T, A> {
private EventTranslatorOneArg<T, A> delegate;
public SafeTranslator(EventTranslatorOneArg<T, A> delegate) {
this.delegate = delegate;
}
public void translateTo(T t, long sequence, A arg0) {
t.isValid = false;
delegate.translateTo(t, sequence, arg0);
t.isValid = true;
}
}
Friday, 20 September 13
class SafeTranslator<T extends BaseMessage, A>
implements EventTranslatorOneArg<T, A> {
private EventTranslatorOneArg<T, A> delegate;
public SafeTranslator(EventTranslatorOneArg<T, A> delegate) {
this.delegate = delegate;
}
public void translateTo(T t, long sequence, A arg0) {
t.isValid = false;
delegate.translateTo(t, sequence, arg0);
t.isValid = true;
}
}
Friday, 20 September 13
class SafeTranslator<T extends BaseMessage, A>
implements EventTranslatorOneArg<T, A> {
private EventTranslatorOneArg<T, A> delegate;
public SafeTranslator(EventTranslatorOneArg<T, A> delegate) {
this.delegate = delegate;
}
public void translateTo(T t, long sequence, A arg0) {
t.isValid = false;
delegate.translateTo(t, sequence, arg0);
t.isValid = true;
}
}
Friday, 20 September 13
class SafeTranslator<T extends BaseMessage, A>
implements EventTranslatorOneArg<T, A> {
private EventTranslatorOneArg<T, A> delegate;
public SafeTranslator(EventTranslatorOneArg<T, A> delegate) {
this.delegate = delegate;
}
public void translateTo(T t, long sequence, A arg0) {
t.isValid = false;
delegate.translateTo(t, sequence, arg0);
t.isValid = true;
}
}
Friday, 20 September 13
class SafeTranslator<T extends BaseMessage, A>
implements EventTranslatorOneArg<T, A> {
private EventTranslatorOneArg<T, A> delegate;
public SafeTranslator(EventTranslatorOneArg<T, A> delegate) {
this.delegate = delegate;
}
public void translateTo(T t, long sequence, A arg0) {
t.isValid = false;
delegate.translateTo(t, sequence, arg0);
t.isValid = true;
}
}
Friday, 20 September 13
class SafeTranslator<T extends BaseMessage, A>
implements EventTranslatorOneArg<T, A> {
private EventTranslatorOneArg<T, A> delegate;
public SafeTranslator(EventTranslatorOneArg<T, A> delegate) {
this.delegate = delegate;
}
public void translateTo(T t, long sequence, A arg0) {
t.isValid = false;
delegate.translateTo(t, sequence, arg0);
t.isValid = true;
}
}
Friday, 20 September 13
class SafeHandler<T extends BaseMessage>
implements EventHandler<T> {
private EventHandler<T> delegate;
public SafeHandler(EventHandler<T> handler) {
this.delegate = handler;
}
public void onEvent(T t, long sequence, boolean endOfBatch) {
if (!t.isValid) {
return;
}
delegate.onEvent(t, sequence, endOfBatch);
}
}
Friday, 20 September 13
class SafeHandler<T extends BaseMessage>
implements EventHandler<T> {
private EventHandler<T> delegate;
public SafeHandler(EventHandler<T> handler) {
this.delegate = handler;
}
public void onEvent(T t, long sequence, boolean endOfBatch) {
if (!t.isValid) {
return;
}
delegate.onEvent(t, sequence, endOfBatch);
}
}
Friday, 20 September 13
class SafeHandler<T extends BaseMessage>
implements EventHandler<T> {
private EventHandler<T> delegate;
public SafeHandler(EventHandler<T> handler) {
this.delegate = handler;
}
public void onEvent(T t, long sequence, boolean endOfBatch) {
if (!t.isValid) {
return;
}
delegate.onEvent(t, sequence, endOfBatch);
}
}
Friday, 20 September 13
class SafeHandler<T extends BaseMessage>
implements EventHandler<T> {
private EventHandler<T> delegate;
public SafeHandler(EventHandler<T> handler) {
this.delegate = handler;
}
public void onEvent(T t, long sequence, boolean endOfBatch) {
if (!t.isValid) {
return;
}
delegate.onEvent(t, sequence, endOfBatch);
}
}
Friday, 20 September 13
class SafeHandler<T extends BaseMessage>
implements EventHandler<T> {
private EventHandler<T> delegate;
public SafeHandler(EventHandler<T> handler) {
this.delegate = handler;
}
public void onEvent(T t, long sequence, boolean endOfBatch) {
if (!t.isValid) {
return;
}
delegate.onEvent(t, sequence, endOfBatch);
}
}
Friday, 20 September 13
<faster>
Friday, 20 September 13
public class SimpleEvent {
private final long id;
private final long v1;
private final long v2;
private final long v3;
public SimpleEvent(long id, long v1,
long v2, long v3) {
this.id = id;
this.v1 = v1;
this.v2 = v2;
this.v3 = v3;
}
}
public class EventHolder {
public SimpleEvent event;
}
Friday, 20 September 13
public class SimpleEvent {
private final long id;
private final long v1;
private final long v2;
private final long v3;
public SimpleEvent(long id, long v1,
long v2, long v3) {
this.id = id;
this.v1 = v1;
this.v2 = v2;
this.v3 = v3;
}
}
public class EventHolder {
public SimpleEvent event;
}
Friday, 20 September 13
EventTranslatorOneArg<EventHolder, SimpleEvent> TRANSLATOR
= new EventTranslatorOneArg<>() {
public void translateTo(EventHolder holder,
long sequence,
SimpleEvent event) {
holder.event = event;
}
};
Friday, 20 September 13
<GC>
Friday, 20 September 13
next() : long
get(long) : T
publish(long) : void
RingBuffer<T>
next() : long
publish(long) : void
Sequencer
WaitStrategy
SingleProducer
Sequencer
MultiProducer
Sequencer
get(long) : T
DataProvider
<T>
run() : void
BatchEvent
Processor<T>
onEvent(T, long,
boolean) : void
Event
Handler<T>
Friday, 20 September 13
next() : long
get(long) : T
publish(long) : void
RingBuffer<T>
next() : long
publish(long) : void
Sequencer
WaitStrategy
SingleProducer
Sequencer
MultiProducer
Sequencer
get(long) : T
DataProvider
<T>
run() : void
BatchEvent
Processor<T>
onEvent(T, long,
boolean) : void
Event
Handler<T>
Friday, 20 September 13
public interface EventAccessor<T> {
T take(long sequence);
}
public class CustomRingBuffer<T>
implements DataProvider<EventAccessor<T>>,
EventAccessor<T> {
private final Sequencer sequencer;
private final Object[] buffer;
private final int mask;
public CustomRingBuffer(Sequencer sequencer) {
this.sequencer = sequencer;
buffer = new Object[sequencer.getBufferSize()];
mask = sequencer.getBufferSize() - 1;
}
Friday, 20 September 13
public interface EventAccessor<T> {
T take(long sequence);
}
public class CustomRingBuffer<T>
implements DataProvider<EventAccessor<T>>,
EventAccessor<T> {
private final Sequencer sequencer;
private final Object[] buffer;
private final int mask;
public CustomRingBuffer(Sequencer sequencer) {
this.sequencer = sequencer;
buffer = new Object[sequencer.getBufferSize()];
mask = sequencer.getBufferSize() - 1;
}
Friday, 20 September 13
public interface EventAccessor<T> {
T take(long sequence);
}
public class CustomRingBuffer<T>
implements DataProvider<EventAccessor<T>>,
EventAccessor<T> {
private final Sequencer sequencer;
private final Object[] buffer;
private final int mask;
public CustomRingBuffer(Sequencer sequencer) {
this.sequencer = sequencer;
buffer = new Object[sequencer.getBufferSize()];
mask = sequencer.getBufferSize() - 1;
}
Friday, 20 September 13
public interface EventAccessor<T> {
T take(long sequence);
}
public class CustomRingBuffer<T>
implements DataProvider<EventAccessor<T>>,
EventAccessor<T> {
private final Sequencer sequencer;
private final Object[] buffer;
private final int mask;
public CustomRingBuffer(Sequencer sequencer) {
this.sequencer = sequencer;
buffer = new Object[sequencer.getBufferSize()];
mask = sequencer.getBufferSize() - 1;
}
Friday, 20 September 13
public void put(T t) {
long next = sequencer.next();
buffer[index(next)] = t;
sequencer.publish(next);
}
public T take(long sequence) {
T t = (T) buffer[index(sequence)];
buffer[index(sequence)] = null;
return t;
}
public EventAccessor<T> get(long sequence) {
return this;
}
Friday, 20 September 13
public void put(T t) {
long next = sequencer.next();
buffer[index(next)] = t;
sequencer.publish(next);
}
public T take(long sequence) {
T t = (T) buffer[index(sequence)];
buffer[index(sequence)] = null;
return t;
}
public EventAccessor<T> get(long sequence) {
return this;
}
Friday, 20 September 13
public void put(T t) {
long next = sequencer.next();
buffer[index(next)] = t;
sequencer.publish(next);
}
public T take(long sequence) {
T t = (T) buffer[index(sequence)];
buffer[index(sequence)] = null;
return t;
}
public EventAccessor<T> get(long sequence) {
return this;
}
Friday, 20 September 13
public void put(T t) {
long next = sequencer.next();
buffer[index(next)] = t;
sequencer.publish(next);
}
public T take(long sequence) {
T t = (T) buffer[index(sequence)];
buffer[index(sequence)] = null;
return t;
}
public EventAccessor<T> get(long sequence) {
return this;
}
Friday, 20 September 13
public void put(T t) {
long next = sequencer.next();
buffer[index(next)] = t;
sequencer.publish(next);
}
public T take(long sequence) {
T t = (T) buffer[index(sequence)];
buffer[index(sequence)] = null;
return t;
}
public EventAccessor<T> get(long sequence) {
return this;
}
Friday, 20 September 13
public void put(T t) {
long next = sequencer.next();
buffer[index(next)] = t;
sequencer.publish(next);
}
public T take(long sequence) {
T t = (T) buffer[index(sequence)];
buffer[index(sequence)] = null;
return t;
}
public EventAccessor<T> get(long sequence) {
return this;
}
Friday, 20 September 13
public void put(T t) {
long next = sequencer.next();
buffer[index(next)] = t;
sequencer.publish(next);
}
public T take(long sequence) {
T t = (T) buffer[index(sequence)];
buffer[index(sequence)] = null;
return t;
}
public EventAccessor<T> get(long sequence) {
return this;
}
Friday, 20 September 13
public void put(T t) {
long next = sequencer.next();
buffer[index(next)] = t;
sequencer.publish(next);
}
public T take(long sequence) {
T t = (T) buffer[index(sequence)];
buffer[index(sequence)] = null;
return t;
}
public EventAccessor<T> get(long sequence) {
return this;
}
Friday, 20 September 13
public void put(T t) {
long next = sequencer.next();
buffer[index(next)] = t;
sequencer.publish(next);
}
public T take(long sequence) {
T t = (T) buffer[index(sequence)];
buffer[index(sequence)] = null;
return t;
}
public EventAccessor<T> get(long sequence) {
return this;
}
Friday, 20 September 13
public BatchEventProcessor<EventAccessor<T>>
createHandler(final EventHandler<T> handler) {
BatchEventProcessor<EventAccessor<T>> processor =
new BatchEventProcessor<>(this, sequencer.newBarrier(),
new EventHandler<EventAccessor<T>>() {
public void onEvent(EventAccessor<T> accessor,
long sequence,
boolean endOfBatch) {
handler.onEvent(accessor.take(sequence),
sequence,
endOfBatch);
}
});
sequencer.addGatingSequences(processor.getSequence());
return processor;
}
Friday, 20 September 13
public BatchEventProcessor<EventAccessor<T>>
createHandler(final EventHandler<T> handler) {
BatchEventProcessor<EventAccessor<T>> processor =
new BatchEventProcessor<>(this, sequencer.newBarrier(),
new EventHandler<EventAccessor<T>>() {
public void onEvent(EventAccessor<T> accessor,
long sequence,
boolean endOfBatch) {
handler.onEvent(accessor.take(sequence),
sequence,
endOfBatch);
}
});
sequencer.addGatingSequences(processor.getSequence());
return processor;
}
Friday, 20 September 13
public BatchEventProcessor<EventAccessor<T>>
createHandler(final EventHandler<T> handler) {
BatchEventProcessor<EventAccessor<T>> processor =
new BatchEventProcessor<>(this, sequencer.newBarrier(),
new EventHandler<EventAccessor<T>>() {
public void onEvent(EventAccessor<T> accessor,
long sequence,
boolean endOfBatch) {
handler.onEvent(accessor.take(sequence),
sequence,
endOfBatch);
}
});
sequencer.addGatingSequences(processor.getSequence());
return processor;
}
Friday, 20 September 13
public BatchEventProcessor<EventAccessor<T>>
createHandler(final EventHandler<T> handler) {
BatchEventProcessor<EventAccessor<T>> processor =
new BatchEventProcessor<>(this, sequencer.newBarrier(),
new EventHandler<EventAccessor<T>>() {
public void onEvent(EventAccessor<T> accessor,
long sequence,
boolean endOfBatch) {
handler.onEvent(accessor.take(sequence),
sequence,
endOfBatch);
}
});
sequencer.addGatingSequences(processor.getSequence());
return processor;
}
Friday, 20 September 13
public BatchEventProcessor<EventAccessor<T>>
createHandler(final EventHandler<T> handler) {
BatchEventProcessor<EventAccessor<T>> processor =
new BatchEventProcessor<>(this, sequencer.newBarrier(),
new EventHandler<EventAccessor<T>>() {
public void onEvent(EventAccessor<T> accessor,
long sequence,
boolean endOfBatch) {
handler.onEvent(accessor.take(sequence),
sequence,
endOfBatch);
}
});
sequencer.addGatingSequences(processor.getSequence());
return processor;
}
Friday, 20 September 13
Simple Custom
Mean 0.032 0.002
Median 0.021 0.002
Mode 0.020 0.002
Performance - Its my laptop,YMMV!!!
Friday, 20 September 13
<faster>
Friday, 20 September 13
public class OffHeapRingBuffer
implements DataProvider<ByteBuffer> {
private final Sequencer sequencer;
private final int entrySize;
private final ByteBuffer buffer;
private final int mask;
ThreadLocal<ByteBuffer> perThreadBuffer =
new ThreadLocal<ByteBuffer>() {
protected ByteBuffer initialValue() {
return buffer.duplicate();
}
};
public OffHeapRingBuffer(Sequencer sequencer,
int entrySize) {
this.sequencer = sequencer;
this.entrySize = entrySize;
this.mask = sequencer.getBufferSize() - 1;
buffer = ByteBuffer.allocateDirect(
sequencer.getBufferSize() * entrySize);
}
Friday, 20 September 13
public class OffHeapRingBuffer
implements DataProvider<ByteBuffer> {
private final Sequencer sequencer;
private final int entrySize;
private final ByteBuffer buffer;
private final int mask;
ThreadLocal<ByteBuffer> perThreadBuffer =
new ThreadLocal<ByteBuffer>() {
protected ByteBuffer initialValue() {
return buffer.duplicate();
}
};
public OffHeapRingBuffer(Sequencer sequencer,
int entrySize) {
this.sequencer = sequencer;
this.entrySize = entrySize;
this.mask = sequencer.getBufferSize() - 1;
buffer = ByteBuffer.allocateDirect(
sequencer.getBufferSize() * entrySize);
}
Friday, 20 September 13
public class OffHeapRingBuffer
implements DataProvider<ByteBuffer> {
private final Sequencer sequencer;
private final int entrySize;
private final ByteBuffer buffer;
private final int mask;
ThreadLocal<ByteBuffer> perThreadBuffer =
new ThreadLocal<ByteBuffer>() {
protected ByteBuffer initialValue() {
return buffer.duplicate();
}
};
public OffHeapRingBuffer(Sequencer sequencer,
int entrySize) {
this.sequencer = sequencer;
this.entrySize = entrySize;
this.mask = sequencer.getBufferSize() - 1;
buffer = ByteBuffer.allocateDirect(
sequencer.getBufferSize() * entrySize);
}
Friday, 20 September 13
public class OffHeapRingBuffer
implements DataProvider<ByteBuffer> {
private final Sequencer sequencer;
private final int entrySize;
private final ByteBuffer buffer;
private final int mask;
ThreadLocal<ByteBuffer> perThreadBuffer =
new ThreadLocal<ByteBuffer>() {
protected ByteBuffer initialValue() {
return buffer.duplicate();
}
};
public OffHeapRingBuffer(Sequencer sequencer,
int entrySize) {
this.sequencer = sequencer;
this.entrySize = entrySize;
this.mask = sequencer.getBufferSize() - 1;
buffer = ByteBuffer.allocateDirect(
sequencer.getBufferSize() * entrySize);
}
Friday, 20 September 13
public class OffHeapRingBuffer
implements DataProvider<ByteBuffer> {
private final Sequencer sequencer;
private final int entrySize;
private final ByteBuffer buffer;
private final int mask;
ThreadLocal<ByteBuffer> perThreadBuffer =
new ThreadLocal<ByteBuffer>() {
protected ByteBuffer initialValue() {
return buffer.duplicate();
}
};
public OffHeapRingBuffer(Sequencer sequencer,
int entrySize) {
this.sequencer = sequencer;
this.entrySize = entrySize;
this.mask = sequencer.getBufferSize() - 1;
buffer = ByteBuffer.allocateDirect(
sequencer.getBufferSize() * entrySize);
}
Friday, 20 September 13
public ByteBuffer get(long sequence) {
int index = index(sequence);
int position = index * entrySize;
int limit = position + entrySize;
ByteBuffer byteBuffer = perThreadBuffer.get();
byteBuffer.position(position).limit(limit);
return byteBuffer;
}
Friday, 20 September 13
public ByteBuffer get(long sequence) {
int index = index(sequence);
int position = index * entrySize;
int limit = position + entrySize;
ByteBuffer byteBuffer = perThreadBuffer.get();
byteBuffer.position(position).limit(limit);
return byteBuffer;
}
Friday, 20 September 13
public ByteBuffer get(long sequence) {
int index = index(sequence);
int position = index * entrySize;
int limit = position + entrySize;
ByteBuffer byteBuffer = perThreadBuffer.get();
byteBuffer.position(position).limit(limit);
return byteBuffer;
}
Friday, 20 September 13
public ByteBuffer get(long sequence) {
int index = index(sequence);
int position = index * entrySize;
int limit = position + entrySize;
ByteBuffer byteBuffer = perThreadBuffer.get();
byteBuffer.position(position).limit(limit);
return byteBuffer;
}
Friday, 20 September 13
public ByteBuffer get(long sequence) {
int index = index(sequence);
int position = index * entrySize;
int limit = position + entrySize;
ByteBuffer byteBuffer = perThreadBuffer.get();
byteBuffer.position(position).limit(limit);
return byteBuffer;
}
Friday, 20 September 13
public void put(byte[] data) {
long next = sequencer.next();
try {
get(next).put(data);
} finally {
sequencer.publish(next);
}
}
Friday, 20 September 13
public void put(byte[] data) {
long next = sequencer.next();
try {
get(next).put(data);
} finally {
sequencer.publish(next);
}
}
Friday, 20 September 13
public void put(byte[] data) {
long next = sequencer.next();
try {
get(next).put(data);
} finally {
sequencer.publish(next);
}
}
Friday, 20 September 13
public void put(byte[] data) {
long next = sequencer.next();
try {
get(next).put(data);
} finally {
sequencer.publish(next);
}
}
Friday, 20 September 13
public class BufferEventHandler
implements EventHandler<ByteBuffer> {
public void onEvent(ByteBuffer buffer,
long sequence,
boolean endOfBatch) {
// Do stuff...
}
}
Friday, 20 September 13
<Q&A>
 Disruptor: http://lmax-exchange.github.io/
disruptor/
 Sample code: https://github.com/mikeb01/
yow2013
Friday, 20 September 13

More Related Content

Disruptor yow2013 v2

  • 1. LMAX Disruptor 3.x Details & Advanced Patterns @mikeb2701 Friday, 20 September 13
  • 3. Agenda Safer Safer Faster Faster Friday, 20 September 13
  • 5. class PlayerMove { long id; long direction; long distance; } Friday, 20 September 13
  • 6. class PlayerMoveFactory implements EventFactory<PlayerMove> { public PlayerMove newInstance() { return new PlayerMove(); } } Friday, 20 September 13
  • 7. class PlayerMoveFactory implements EventFactory<PlayerMove> { public PlayerMove newInstance() { return new PlayerMove(); } } Friday, 20 September 13
  • 8. class PlayerHandler implements EventHandler<PlayerMove> { public void onEvent(PlayerMove event, long sequence, boolean onBatchEnd) { Player player = findPlayer(event.id); player.move(event.direction, event.distance); } } Friday, 20 September 13
  • 9. class PlayerHandler implements EventHandler<PlayerMove> { public void onEvent(PlayerMove event, long sequence, boolean onBatchEnd) { Player player = findPlayer(event.id); player.move(event.direction, event.distance); } } Friday, 20 September 13
  • 10. class NetworkHandler { RingBuffer<PlayerMove> buffer; void handle(ByteBuffer packet) { long next = buffer.next(); PlayerMove playerMove = buffer.get(next); playerMove.id = packet.getLong(); playerMove.direction = packet.getLong(); playerMove.distance = packet.getLong(); buffer.publish(next); } } Friday, 20 September 13
  • 11. class NetworkHandler { RingBuffer<PlayerMove> buffer; void handle(ByteBuffer packet) { long next = buffer.next(); PlayerMove playerMove = buffer.get(next); playerMove.id = packet.getLong(); playerMove.direction = packet.getLong(); playerMove.distance = packet.getLong(); buffer.publish(next); } } Friday, 20 September 13
  • 12. class NetworkHandler { RingBuffer<PlayerMove> buffer; void handle(ByteBuffer packet) { long next = buffer.next(); PlayerMove playerMove = buffer.get(next); playerMove.id = packet.getLong(); playerMove.direction = packet.getLong(); playerMove.distance = packet.getLong(); buffer.publish(next); } } Friday, 20 September 13
  • 13. class NetworkHandler { RingBuffer<PlayerMove> buffer; void handle(ByteBuffer packet) { long next = buffer.next(); PlayerMove playerMove = buffer.get(next); playerMove.id = packet.getLong(); playerMove.direction = packet.getLong(); playerMove.distance = packet.getLong(); buffer.publish(next); } } Friday, 20 September 13
  • 14. class NetworkHandler { RingBuffer<PlayerMove> buffer; void handle(ByteBuffer packet) { long next = buffer.next(); PlayerMove playerMove = buffer.get(next); playerMove.id = packet.getLong(); playerMove.direction = packet.getLong(); playerMove.distance = packet.getLong(); buffer.publish(next); } } Friday, 20 September 13
  • 15. class NetworkHandler { RingBuffer<PlayerMove> buffer; void handle(ByteBuffer packet) { long next = buffer.next(); try { PlayerMove playerMove = buffer.get(next); playerMove.id = packet.getLong(); playerMove.direction = packet.getLong(); playerMove.distance = packet.getLong(); } finally { buffer.publish(next); } } } Friday, 20 September 13
  • 16. class PlayerMoveTranslator implements EventTranslatorOneArg<PlayerMove, ByteBuffer> { public static final PlayerMoveTranslator INSTANCE = new PlayerMoveTranslator(); public void translateTo(PlayerMove playerMove, long sequence, ByteBuffer packet) { playerMove.id = packet.getLong(); playerMove.direction = packet.getLong(); playerMove.distance = packet.getLong(); } } class NetworkHandler2 { RingBuffer<PlayerMove> buffer; void handle(final ByteBuffer packet) { buffer.publishEvent(PlayerMoveTranslator.INSTANCE, packet); } } Friday, 20 September 13
  • 17. class PlayerMoveTranslator implements EventTranslatorOneArg<PlayerMove, ByteBuffer> { public static final PlayerMoveTranslator INSTANCE = new PlayerMoveTranslator(); public void translateTo(PlayerMove playerMove, long sequence, ByteBuffer packet) { playerMove.id = packet.getLong(); playerMove.direction = packet.getLong(); playerMove.distance = packet.getLong(); } } class NetworkHandler2 { RingBuffer<PlayerMove> buffer; void handle(final ByteBuffer packet) { buffer.publishEvent(PlayerMoveTranslator.INSTANCE, packet); } } Friday, 20 September 13
  • 18. class PlayerMoveTranslator implements EventTranslatorOneArg<PlayerMove, ByteBuffer> { public static final PlayerMoveTranslator INSTANCE = new PlayerMoveTranslator(); public void translateTo(PlayerMove playerMove, long sequence, ByteBuffer packet) { playerMove.id = packet.getLong(); playerMove.direction = packet.getLong(); playerMove.distance = packet.getLong(); } } class NetworkHandler2 { RingBuffer<PlayerMove> buffer; void handle(final ByteBuffer packet) { buffer.publishEvent(PlayerMoveTranslator.INSTANCE, packet); } } Friday, 20 September 13
  • 19. class PlayerMoveTranslator implements EventTranslatorOneArg<PlayerMove, ByteBuffer> { public static final PlayerMoveTranslator INSTANCE = new PlayerMoveTranslator(); public void translateTo(PlayerMove playerMove, long sequence, ByteBuffer packet) { playerMove.id = packet.getLong(); playerMove.direction = packet.getLong(); playerMove.distance = packet.getLong(); } } class NetworkHandler2 { RingBuffer<PlayerMove> buffer; void handle(final ByteBuffer packet) { buffer.publishEvent(PlayerMoveTranslator.INSTANCE, packet); } } Friday, 20 September 13
  • 21. public abstract class BaseMessage { public boolean isValid; } Friday, 20 September 13
  • 22. class SafeTranslator<T extends BaseMessage, A> implements EventTranslatorOneArg<T, A> { private EventTranslatorOneArg<T, A> delegate; public SafeTranslator(EventTranslatorOneArg<T, A> delegate) { this.delegate = delegate; } public void translateTo(T t, long sequence, A arg0) { t.isValid = false; delegate.translateTo(t, sequence, arg0); t.isValid = true; } } Friday, 20 September 13
  • 23. class SafeTranslator<T extends BaseMessage, A> implements EventTranslatorOneArg<T, A> { private EventTranslatorOneArg<T, A> delegate; public SafeTranslator(EventTranslatorOneArg<T, A> delegate) { this.delegate = delegate; } public void translateTo(T t, long sequence, A arg0) { t.isValid = false; delegate.translateTo(t, sequence, arg0); t.isValid = true; } } Friday, 20 September 13
  • 24. class SafeTranslator<T extends BaseMessage, A> implements EventTranslatorOneArg<T, A> { private EventTranslatorOneArg<T, A> delegate; public SafeTranslator(EventTranslatorOneArg<T, A> delegate) { this.delegate = delegate; } public void translateTo(T t, long sequence, A arg0) { t.isValid = false; delegate.translateTo(t, sequence, arg0); t.isValid = true; } } Friday, 20 September 13
  • 25. class SafeTranslator<T extends BaseMessage, A> implements EventTranslatorOneArg<T, A> { private EventTranslatorOneArg<T, A> delegate; public SafeTranslator(EventTranslatorOneArg<T, A> delegate) { this.delegate = delegate; } public void translateTo(T t, long sequence, A arg0) { t.isValid = false; delegate.translateTo(t, sequence, arg0); t.isValid = true; } } Friday, 20 September 13
  • 26. class SafeTranslator<T extends BaseMessage, A> implements EventTranslatorOneArg<T, A> { private EventTranslatorOneArg<T, A> delegate; public SafeTranslator(EventTranslatorOneArg<T, A> delegate) { this.delegate = delegate; } public void translateTo(T t, long sequence, A arg0) { t.isValid = false; delegate.translateTo(t, sequence, arg0); t.isValid = true; } } Friday, 20 September 13
  • 27. class SafeTranslator<T extends BaseMessage, A> implements EventTranslatorOneArg<T, A> { private EventTranslatorOneArg<T, A> delegate; public SafeTranslator(EventTranslatorOneArg<T, A> delegate) { this.delegate = delegate; } public void translateTo(T t, long sequence, A arg0) { t.isValid = false; delegate.translateTo(t, sequence, arg0); t.isValid = true; } } Friday, 20 September 13
  • 28. class SafeHandler<T extends BaseMessage> implements EventHandler<T> { private EventHandler<T> delegate; public SafeHandler(EventHandler<T> handler) { this.delegate = handler; } public void onEvent(T t, long sequence, boolean endOfBatch) { if (!t.isValid) { return; } delegate.onEvent(t, sequence, endOfBatch); } } Friday, 20 September 13
  • 29. class SafeHandler<T extends BaseMessage> implements EventHandler<T> { private EventHandler<T> delegate; public SafeHandler(EventHandler<T> handler) { this.delegate = handler; } public void onEvent(T t, long sequence, boolean endOfBatch) { if (!t.isValid) { return; } delegate.onEvent(t, sequence, endOfBatch); } } Friday, 20 September 13
  • 30. class SafeHandler<T extends BaseMessage> implements EventHandler<T> { private EventHandler<T> delegate; public SafeHandler(EventHandler<T> handler) { this.delegate = handler; } public void onEvent(T t, long sequence, boolean endOfBatch) { if (!t.isValid) { return; } delegate.onEvent(t, sequence, endOfBatch); } } Friday, 20 September 13
  • 31. class SafeHandler<T extends BaseMessage> implements EventHandler<T> { private EventHandler<T> delegate; public SafeHandler(EventHandler<T> handler) { this.delegate = handler; } public void onEvent(T t, long sequence, boolean endOfBatch) { if (!t.isValid) { return; } delegate.onEvent(t, sequence, endOfBatch); } } Friday, 20 September 13
  • 32. class SafeHandler<T extends BaseMessage> implements EventHandler<T> { private EventHandler<T> delegate; public SafeHandler(EventHandler<T> handler) { this.delegate = handler; } public void onEvent(T t, long sequence, boolean endOfBatch) { if (!t.isValid) { return; } delegate.onEvent(t, sequence, endOfBatch); } } Friday, 20 September 13
  • 34. public class SimpleEvent { private final long id; private final long v1; private final long v2; private final long v3; public SimpleEvent(long id, long v1, long v2, long v3) { this.id = id; this.v1 = v1; this.v2 = v2; this.v3 = v3; } } public class EventHolder { public SimpleEvent event; } Friday, 20 September 13
  • 35. public class SimpleEvent { private final long id; private final long v1; private final long v2; private final long v3; public SimpleEvent(long id, long v1, long v2, long v3) { this.id = id; this.v1 = v1; this.v2 = v2; this.v3 = v3; } } public class EventHolder { public SimpleEvent event; } Friday, 20 September 13
  • 36. EventTranslatorOneArg<EventHolder, SimpleEvent> TRANSLATOR = new EventTranslatorOneArg<>() { public void translateTo(EventHolder holder, long sequence, SimpleEvent event) { holder.event = event; } }; Friday, 20 September 13
  • 38. next() : long get(long) : T publish(long) : void RingBuffer<T> next() : long publish(long) : void Sequencer WaitStrategy SingleProducer Sequencer MultiProducer Sequencer get(long) : T DataProvider <T> run() : void BatchEvent Processor<T> onEvent(T, long, boolean) : void Event Handler<T> Friday, 20 September 13
  • 39. next() : long get(long) : T publish(long) : void RingBuffer<T> next() : long publish(long) : void Sequencer WaitStrategy SingleProducer Sequencer MultiProducer Sequencer get(long) : T DataProvider <T> run() : void BatchEvent Processor<T> onEvent(T, long, boolean) : void Event Handler<T> Friday, 20 September 13
  • 40. public interface EventAccessor<T> { T take(long sequence); } public class CustomRingBuffer<T> implements DataProvider<EventAccessor<T>>, EventAccessor<T> { private final Sequencer sequencer; private final Object[] buffer; private final int mask; public CustomRingBuffer(Sequencer sequencer) { this.sequencer = sequencer; buffer = new Object[sequencer.getBufferSize()]; mask = sequencer.getBufferSize() - 1; } Friday, 20 September 13
  • 41. public interface EventAccessor<T> { T take(long sequence); } public class CustomRingBuffer<T> implements DataProvider<EventAccessor<T>>, EventAccessor<T> { private final Sequencer sequencer; private final Object[] buffer; private final int mask; public CustomRingBuffer(Sequencer sequencer) { this.sequencer = sequencer; buffer = new Object[sequencer.getBufferSize()]; mask = sequencer.getBufferSize() - 1; } Friday, 20 September 13
  • 42. public interface EventAccessor<T> { T take(long sequence); } public class CustomRingBuffer<T> implements DataProvider<EventAccessor<T>>, EventAccessor<T> { private final Sequencer sequencer; private final Object[] buffer; private final int mask; public CustomRingBuffer(Sequencer sequencer) { this.sequencer = sequencer; buffer = new Object[sequencer.getBufferSize()]; mask = sequencer.getBufferSize() - 1; } Friday, 20 September 13
  • 43. public interface EventAccessor<T> { T take(long sequence); } public class CustomRingBuffer<T> implements DataProvider<EventAccessor<T>>, EventAccessor<T> { private final Sequencer sequencer; private final Object[] buffer; private final int mask; public CustomRingBuffer(Sequencer sequencer) { this.sequencer = sequencer; buffer = new Object[sequencer.getBufferSize()]; mask = sequencer.getBufferSize() - 1; } Friday, 20 September 13
  • 44. public void put(T t) { long next = sequencer.next(); buffer[index(next)] = t; sequencer.publish(next); } public T take(long sequence) { T t = (T) buffer[index(sequence)]; buffer[index(sequence)] = null; return t; } public EventAccessor<T> get(long sequence) { return this; } Friday, 20 September 13
  • 45. public void put(T t) { long next = sequencer.next(); buffer[index(next)] = t; sequencer.publish(next); } public T take(long sequence) { T t = (T) buffer[index(sequence)]; buffer[index(sequence)] = null; return t; } public EventAccessor<T> get(long sequence) { return this; } Friday, 20 September 13
  • 46. public void put(T t) { long next = sequencer.next(); buffer[index(next)] = t; sequencer.publish(next); } public T take(long sequence) { T t = (T) buffer[index(sequence)]; buffer[index(sequence)] = null; return t; } public EventAccessor<T> get(long sequence) { return this; } Friday, 20 September 13
  • 47. public void put(T t) { long next = sequencer.next(); buffer[index(next)] = t; sequencer.publish(next); } public T take(long sequence) { T t = (T) buffer[index(sequence)]; buffer[index(sequence)] = null; return t; } public EventAccessor<T> get(long sequence) { return this; } Friday, 20 September 13
  • 48. public void put(T t) { long next = sequencer.next(); buffer[index(next)] = t; sequencer.publish(next); } public T take(long sequence) { T t = (T) buffer[index(sequence)]; buffer[index(sequence)] = null; return t; } public EventAccessor<T> get(long sequence) { return this; } Friday, 20 September 13
  • 49. public void put(T t) { long next = sequencer.next(); buffer[index(next)] = t; sequencer.publish(next); } public T take(long sequence) { T t = (T) buffer[index(sequence)]; buffer[index(sequence)] = null; return t; } public EventAccessor<T> get(long sequence) { return this; } Friday, 20 September 13
  • 50. public void put(T t) { long next = sequencer.next(); buffer[index(next)] = t; sequencer.publish(next); } public T take(long sequence) { T t = (T) buffer[index(sequence)]; buffer[index(sequence)] = null; return t; } public EventAccessor<T> get(long sequence) { return this; } Friday, 20 September 13
  • 51. public void put(T t) { long next = sequencer.next(); buffer[index(next)] = t; sequencer.publish(next); } public T take(long sequence) { T t = (T) buffer[index(sequence)]; buffer[index(sequence)] = null; return t; } public EventAccessor<T> get(long sequence) { return this; } Friday, 20 September 13
  • 52. public void put(T t) { long next = sequencer.next(); buffer[index(next)] = t; sequencer.publish(next); } public T take(long sequence) { T t = (T) buffer[index(sequence)]; buffer[index(sequence)] = null; return t; } public EventAccessor<T> get(long sequence) { return this; } Friday, 20 September 13
  • 53. public BatchEventProcessor<EventAccessor<T>> createHandler(final EventHandler<T> handler) { BatchEventProcessor<EventAccessor<T>> processor = new BatchEventProcessor<>(this, sequencer.newBarrier(), new EventHandler<EventAccessor<T>>() { public void onEvent(EventAccessor<T> accessor, long sequence, boolean endOfBatch) { handler.onEvent(accessor.take(sequence), sequence, endOfBatch); } }); sequencer.addGatingSequences(processor.getSequence()); return processor; } Friday, 20 September 13
  • 54. public BatchEventProcessor<EventAccessor<T>> createHandler(final EventHandler<T> handler) { BatchEventProcessor<EventAccessor<T>> processor = new BatchEventProcessor<>(this, sequencer.newBarrier(), new EventHandler<EventAccessor<T>>() { public void onEvent(EventAccessor<T> accessor, long sequence, boolean endOfBatch) { handler.onEvent(accessor.take(sequence), sequence, endOfBatch); } }); sequencer.addGatingSequences(processor.getSequence()); return processor; } Friday, 20 September 13
  • 55. public BatchEventProcessor<EventAccessor<T>> createHandler(final EventHandler<T> handler) { BatchEventProcessor<EventAccessor<T>> processor = new BatchEventProcessor<>(this, sequencer.newBarrier(), new EventHandler<EventAccessor<T>>() { public void onEvent(EventAccessor<T> accessor, long sequence, boolean endOfBatch) { handler.onEvent(accessor.take(sequence), sequence, endOfBatch); } }); sequencer.addGatingSequences(processor.getSequence()); return processor; } Friday, 20 September 13
  • 56. public BatchEventProcessor<EventAccessor<T>> createHandler(final EventHandler<T> handler) { BatchEventProcessor<EventAccessor<T>> processor = new BatchEventProcessor<>(this, sequencer.newBarrier(), new EventHandler<EventAccessor<T>>() { public void onEvent(EventAccessor<T> accessor, long sequence, boolean endOfBatch) { handler.onEvent(accessor.take(sequence), sequence, endOfBatch); } }); sequencer.addGatingSequences(processor.getSequence()); return processor; } Friday, 20 September 13
  • 57. public BatchEventProcessor<EventAccessor<T>> createHandler(final EventHandler<T> handler) { BatchEventProcessor<EventAccessor<T>> processor = new BatchEventProcessor<>(this, sequencer.newBarrier(), new EventHandler<EventAccessor<T>>() { public void onEvent(EventAccessor<T> accessor, long sequence, boolean endOfBatch) { handler.onEvent(accessor.take(sequence), sequence, endOfBatch); } }); sequencer.addGatingSequences(processor.getSequence()); return processor; } Friday, 20 September 13
  • 58. Simple Custom Mean 0.032 0.002 Median 0.021 0.002 Mode 0.020 0.002 Performance - Its my laptop,YMMV!!! Friday, 20 September 13
  • 60. public class OffHeapRingBuffer implements DataProvider<ByteBuffer> { private final Sequencer sequencer; private final int entrySize; private final ByteBuffer buffer; private final int mask; ThreadLocal<ByteBuffer> perThreadBuffer = new ThreadLocal<ByteBuffer>() { protected ByteBuffer initialValue() { return buffer.duplicate(); } }; public OffHeapRingBuffer(Sequencer sequencer, int entrySize) { this.sequencer = sequencer; this.entrySize = entrySize; this.mask = sequencer.getBufferSize() - 1; buffer = ByteBuffer.allocateDirect( sequencer.getBufferSize() * entrySize); } Friday, 20 September 13
  • 61. public class OffHeapRingBuffer implements DataProvider<ByteBuffer> { private final Sequencer sequencer; private final int entrySize; private final ByteBuffer buffer; private final int mask; ThreadLocal<ByteBuffer> perThreadBuffer = new ThreadLocal<ByteBuffer>() { protected ByteBuffer initialValue() { return buffer.duplicate(); } }; public OffHeapRingBuffer(Sequencer sequencer, int entrySize) { this.sequencer = sequencer; this.entrySize = entrySize; this.mask = sequencer.getBufferSize() - 1; buffer = ByteBuffer.allocateDirect( sequencer.getBufferSize() * entrySize); } Friday, 20 September 13
  • 62. public class OffHeapRingBuffer implements DataProvider<ByteBuffer> { private final Sequencer sequencer; private final int entrySize; private final ByteBuffer buffer; private final int mask; ThreadLocal<ByteBuffer> perThreadBuffer = new ThreadLocal<ByteBuffer>() { protected ByteBuffer initialValue() { return buffer.duplicate(); } }; public OffHeapRingBuffer(Sequencer sequencer, int entrySize) { this.sequencer = sequencer; this.entrySize = entrySize; this.mask = sequencer.getBufferSize() - 1; buffer = ByteBuffer.allocateDirect( sequencer.getBufferSize() * entrySize); } Friday, 20 September 13
  • 63. public class OffHeapRingBuffer implements DataProvider<ByteBuffer> { private final Sequencer sequencer; private final int entrySize; private final ByteBuffer buffer; private final int mask; ThreadLocal<ByteBuffer> perThreadBuffer = new ThreadLocal<ByteBuffer>() { protected ByteBuffer initialValue() { return buffer.duplicate(); } }; public OffHeapRingBuffer(Sequencer sequencer, int entrySize) { this.sequencer = sequencer; this.entrySize = entrySize; this.mask = sequencer.getBufferSize() - 1; buffer = ByteBuffer.allocateDirect( sequencer.getBufferSize() * entrySize); } Friday, 20 September 13
  • 64. public class OffHeapRingBuffer implements DataProvider<ByteBuffer> { private final Sequencer sequencer; private final int entrySize; private final ByteBuffer buffer; private final int mask; ThreadLocal<ByteBuffer> perThreadBuffer = new ThreadLocal<ByteBuffer>() { protected ByteBuffer initialValue() { return buffer.duplicate(); } }; public OffHeapRingBuffer(Sequencer sequencer, int entrySize) { this.sequencer = sequencer; this.entrySize = entrySize; this.mask = sequencer.getBufferSize() - 1; buffer = ByteBuffer.allocateDirect( sequencer.getBufferSize() * entrySize); } Friday, 20 September 13
  • 65. public ByteBuffer get(long sequence) { int index = index(sequence); int position = index * entrySize; int limit = position + entrySize; ByteBuffer byteBuffer = perThreadBuffer.get(); byteBuffer.position(position).limit(limit); return byteBuffer; } Friday, 20 September 13
  • 66. public ByteBuffer get(long sequence) { int index = index(sequence); int position = index * entrySize; int limit = position + entrySize; ByteBuffer byteBuffer = perThreadBuffer.get(); byteBuffer.position(position).limit(limit); return byteBuffer; } Friday, 20 September 13
  • 67. public ByteBuffer get(long sequence) { int index = index(sequence); int position = index * entrySize; int limit = position + entrySize; ByteBuffer byteBuffer = perThreadBuffer.get(); byteBuffer.position(position).limit(limit); return byteBuffer; } Friday, 20 September 13
  • 68. public ByteBuffer get(long sequence) { int index = index(sequence); int position = index * entrySize; int limit = position + entrySize; ByteBuffer byteBuffer = perThreadBuffer.get(); byteBuffer.position(position).limit(limit); return byteBuffer; } Friday, 20 September 13
  • 69. public ByteBuffer get(long sequence) { int index = index(sequence); int position = index * entrySize; int limit = position + entrySize; ByteBuffer byteBuffer = perThreadBuffer.get(); byteBuffer.position(position).limit(limit); return byteBuffer; } Friday, 20 September 13
  • 70. public void put(byte[] data) { long next = sequencer.next(); try { get(next).put(data); } finally { sequencer.publish(next); } } Friday, 20 September 13
  • 71. public void put(byte[] data) { long next = sequencer.next(); try { get(next).put(data); } finally { sequencer.publish(next); } } Friday, 20 September 13
  • 72. public void put(byte[] data) { long next = sequencer.next(); try { get(next).put(data); } finally { sequencer.publish(next); } } Friday, 20 September 13
  • 73. public void put(byte[] data) { long next = sequencer.next(); try { get(next).put(data); } finally { sequencer.publish(next); } } Friday, 20 September 13
  • 74. public class BufferEventHandler implements EventHandler<ByteBuffer> { public void onEvent(ByteBuffer buffer, long sequence, boolean endOfBatch) { // Do stuff... } } Friday, 20 September 13
  • 75. <Q&A> Disruptor: http://lmax-exchange.github.io/ disruptor/ Sample code: https://github.com/mikeb01/ yow2013 Friday, 20 September 13