diff -Nru RxJava-1.0.13/pom.xml RxJava-1.0.13.jctools/pom.xml --- RxJava-1.0.13/pom.xml 2015-07-20 20:08:27.000000000 +0200 +++ RxJava-1.0.13.jctools/pom.xml 2015-08-07 16:11:25.260235129 +0200 @@ -48,4 +48,13 @@ repo + + + + org.jctools + jctools-core + 1.1-SNAPSHOT + + + diff -Nru RxJava-1.0.13/src/main/java/rx/internal/operators/OperatorMapNotification.java RxJava-1.0.13.jctools/src/main/java/rx/internal/operators/OperatorMapNotification.java --- RxJava-1.0.13/src/main/java/rx/internal/operators/OperatorMapNotification.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/operators/OperatorMapNotification.java 2015-08-07 16:12:12.945949901 +0200 @@ -19,6 +19,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; +import org.jctools.queues.SpscArrayQueue; + import rx.Observable.Operator; import rx.Producer; import rx.Subscriber; @@ -27,7 +29,6 @@ import rx.exceptions.OnErrorThrowable; import rx.functions.Func0; import rx.functions.Func1; -import rx.internal.util.unsafe.SpscArrayQueue; import rx.internal.util.unsafe.UnsafeAccess; /** diff -Nru RxJava-1.0.13/src/main/java/rx/internal/operators/OperatorObserveOn.java RxJava-1.0.13.jctools/src/main/java/rx/internal/operators/OperatorObserveOn.java --- RxJava-1.0.13/src/main/java/rx/internal/operators/OperatorObserveOn.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/operators/OperatorObserveOn.java 2015-08-07 16:11:25.261235081 +0200 @@ -19,6 +19,8 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import org.jctools.queues.SpscArrayQueue; + import rx.Observable.Operator; import rx.Producer; import rx.Scheduler; @@ -28,7 +30,6 @@ import rx.functions.Action0; import rx.internal.util.RxRingBuffer; import rx.internal.util.SynchronizedQueue; -import rx.internal.util.unsafe.SpscArrayQueue; import rx.internal.util.unsafe.UnsafeAccess; import rx.schedulers.ImmediateScheduler; import rx.schedulers.TrampolineScheduler; diff -Nru RxJava-1.0.13/src/main/java/rx/internal/operators/OperatorPublish.java RxJava-1.0.13.jctools/src/main/java/rx/internal/operators/OperatorPublish.java --- RxJava-1.0.13/src/main/java/rx/internal/operators/OperatorPublish.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/operators/OperatorPublish.java 2015-08-07 16:11:25.262235033 +0200 @@ -18,6 +18,8 @@ import java.util.Queue; import java.util.concurrent.atomic.*; +import org.jctools.queues.SpscArrayQueue; + import rx.*; import rx.exceptions.MissingBackpressureException; import rx.functions.*; diff -Nru RxJava-1.0.13/src/main/java/rx/internal/producers/QueuedProducer.java RxJava-1.0.13.jctools/src/main/java/rx/internal/producers/QueuedProducer.java --- RxJava-1.0.13/src/main/java/rx/internal/producers/QueuedProducer.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/producers/QueuedProducer.java 2015-08-07 16:13:12.218109419 +0200 @@ -18,6 +18,8 @@ import java.util.Queue; import java.util.concurrent.atomic.*; +import org.jctools.queues.SpscLinkedQueue; + import rx.*; import rx.exceptions.*; import rx.internal.operators.BackpressureUtils; diff -Nru RxJava-1.0.13/src/main/java/rx/internal/producers/QueuedValueProducer.java RxJava-1.0.13.jctools/src/main/java/rx/internal/producers/QueuedValueProducer.java --- RxJava-1.0.13/src/main/java/rx/internal/producers/QueuedValueProducer.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/producers/QueuedValueProducer.java 2015-08-07 16:13:49.938301765 +0200 @@ -18,6 +18,8 @@ import java.util.Queue; import java.util.concurrent.atomic.*; +import org.jctools.queues.SpscLinkedQueue; + import rx.*; import rx.exceptions.*; import rx.internal.operators.BackpressureUtils; diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/ObjectPool.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/ObjectPool.java --- RxJava-1.0.13/src/main/java/rx/internal/util/ObjectPool.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/ObjectPool.java 2015-08-07 16:11:25.262235033 +0200 @@ -21,9 +21,10 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import org.jctools.queues.MpmcArrayQueue; + import rx.Scheduler; import rx.functions.Action0; -import rx.internal.util.unsafe.MpmcArrayQueue; import rx.internal.util.unsafe.UnsafeAccess; import rx.schedulers.Schedulers; diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/RxRingBuffer.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/RxRingBuffer.java --- RxJava-1.0.13/src/main/java/rx/internal/util/RxRingBuffer.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/RxRingBuffer.java 2015-08-07 16:11:25.262235033 +0200 @@ -17,12 +17,13 @@ import java.util.Queue; +import org.jctools.queues.SpmcArrayQueue; +import org.jctools.queues.SpscArrayQueue; + import rx.Observer; import rx.Subscription; import rx.exceptions.MissingBackpressureException; import rx.internal.operators.NotificationLite; -import rx.internal.util.unsafe.SpmcArrayQueue; -import rx.internal.util.unsafe.SpscArrayQueue; import rx.internal.util.unsafe.UnsafeAccess; /** diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/BaseLinkedQueue.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/BaseLinkedQueue.java --- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/BaseLinkedQueue.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/BaseLinkedQueue.java 1970-01-01 01:00:00.000000000 +0100 @@ -1,126 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE - * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/BaseLinkedQueue.java - */ -package rx.internal.util.unsafe; - -import static rx.internal.util.unsafe.UnsafeAccess.UNSAFE; - -import java.util.*; - -import rx.internal.util.atomic.LinkedQueueNode; - -abstract class BaseLinkedQueuePad0 extends AbstractQueue { - long p00, p01, p02, p03, p04, p05, p06, p07; - long p30, p31, p32, p33, p34, p35, p36, p37; -} - -abstract class BaseLinkedQueueProducerNodeRef extends BaseLinkedQueuePad0 { - protected final static long P_NODE_OFFSET = UnsafeAccess.addressOf(BaseLinkedQueueProducerNodeRef.class, "producerNode"); - - protected LinkedQueueNode producerNode; - protected final void spProducerNode(LinkedQueueNode node) { - producerNode = node; - } - - @SuppressWarnings("unchecked") - protected final LinkedQueueNode lvProducerNode() { - return (LinkedQueueNode) UNSAFE.getObjectVolatile(this, P_NODE_OFFSET); - } - - protected final LinkedQueueNode lpProducerNode() { - return producerNode; - } -} - -abstract class BaseLinkedQueuePad1 extends BaseLinkedQueueProducerNodeRef { - long p00, p01, p02, p03, p04, p05, p06, p07; - long p30, p31, p32, p33, p34, p35, p36, p37; -} - -abstract class BaseLinkedQueueConsumerNodeRef extends BaseLinkedQueuePad1 { - protected final static long C_NODE_OFFSET = UnsafeAccess.addressOf(BaseLinkedQueueConsumerNodeRef.class, "consumerNode"); - protected LinkedQueueNode consumerNode; - protected final void spConsumerNode(LinkedQueueNode node) { - consumerNode = node; - } - - @SuppressWarnings("unchecked") - protected final LinkedQueueNode lvConsumerNode() { - return (LinkedQueueNode) UNSAFE.getObjectVolatile(this, C_NODE_OFFSET); - } - - protected final LinkedQueueNode lpConsumerNode() { - return consumerNode; - } -} - -/** - * A base data structure for concurrent linked queues. - * - * @author nitsanw - * - * @param - */ -abstract class BaseLinkedQueue extends BaseLinkedQueueConsumerNodeRef { - long p00, p01, p02, p03, p04, p05, p06, p07; - long p30, p31, p32, p33, p34, p35, p36, p37; - - - @Override - public final Iterator iterator() { - throw new UnsupportedOperationException(); - } - - /** - * {@inheritDoc}
- *

- * IMPLEMENTATION NOTES:
- * This is an O(n) operation as we run through all the nodes and count them.
- * - * @see java.util.Queue#size() - */ - @Override - public final int size() { - // Read consumer first, this is important because if the producer is node is 'older' than the consumer the - // consumer may overtake it (consume past it). This will lead to an infinite loop below. - LinkedQueueNode chaserNode = lvConsumerNode(); - final LinkedQueueNode producerNode = lvProducerNode(); - int size = 0; - // must chase the nodes all the way to the producer node, but there's no need to chase a moving target. - while (chaserNode != producerNode && size < Integer.MAX_VALUE) { - LinkedQueueNode next; - while((next = chaserNode.lvNext()) == null); - chaserNode = next; - size++; - } - return size; - } - - /** - * {@inheritDoc}
- *

- * IMPLEMENTATION NOTES:
- * Queue is empty when producerNode is the same as consumerNode. An alternative implementation would be to observe - * the producerNode.value is null, which also means an empty queue because only the consumerNode.value is allowed to - * be null. - * - * @see MessagePassingQueue#isEmpty() - */ - @Override - public final boolean isEmpty() { - return lvConsumerNode() == lvProducerNode(); - } -} \ Manca newline alla fine del file diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/ConcurrentCircularArrayQueue.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/ConcurrentCircularArrayQueue.java --- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/ConcurrentCircularArrayQueue.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/ConcurrentCircularArrayQueue.java 1970-01-01 01:00:00.000000000 +0100 @@ -1,186 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE - * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/ConcurrentCircularArrayQueue.java - */ -package rx.internal.util.unsafe; - -import static rx.internal.util.unsafe.UnsafeAccess.UNSAFE; - -import java.util.AbstractQueue; -import java.util.Iterator; - -abstract class ConcurrentCircularArrayQueueL0Pad extends AbstractQueue implements MessagePassingQueue { - long p00, p01, p02, p03, p04, p05, p06, p07; - long p30, p31, p32, p33, p34, p35, p36, p37; -} - -/** - * A concurrent access enabling class used by circular array based queues this class exposes an offset computation - * method along with differently memory fenced load/store methods into the underlying array. The class is pre-padded and - * the array is padded on either side to help with False sharing prvention. It is expected theat subclasses handle post - * padding. - *

- * Offset calculation is separate from access to enable the reuse of a give compute offset. - *

- * Load/Store methods using a buffer parameter are provided to allow the prevention of final field reload after a - * LoadLoad barrier. - *

- * - * @author nitsanw - * - * @param - */ -public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircularArrayQueueL0Pad { - protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0); - protected static final int BUFFER_PAD = 32; - private static final long REF_ARRAY_BASE; - private static final int REF_ELEMENT_SHIFT; - static { - final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class); - if (4 == scale) { - REF_ELEMENT_SHIFT = 2 + SPARSE_SHIFT; - } else if (8 == scale) { - REF_ELEMENT_SHIFT = 3 + SPARSE_SHIFT; - } else { - throw new IllegalStateException("Unknown pointer size"); - } - // Including the buffer pad in the array base offset - REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class) - + (BUFFER_PAD << (REF_ELEMENT_SHIFT - SPARSE_SHIFT)); - } - protected final long mask; - // @Stable :( - protected final E[] buffer; - - @SuppressWarnings("unchecked") - public ConcurrentCircularArrayQueue(int capacity) { - int actualCapacity = Pow2.roundToPowerOfTwo(capacity); - mask = actualCapacity - 1; - // pad data on either end with some empty slots. - buffer = (E[]) new Object[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2]; - } - - /** - * @param index desirable element index - * @return the offset in bytes within the array for a given index. - */ - protected final long calcElementOffset(long index) { - return calcElementOffset(index, mask); - } - /** - * @param index desirable element index - * @param mask - * @return the offset in bytes within the array for a given index. - */ - protected final long calcElementOffset(long index, long mask) { - return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT); - } - /** - * A plain store (no ordering/fences) of an element to a given offset - * - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @param e a kitty - */ - protected final void spElement(long offset, E e) { - spElement(buffer, offset, e); - } - - /** - * A plain store (no ordering/fences) of an element to a given offset - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @param e an orderly kitty - */ - protected final void spElement(E[] buffer, long offset, E e) { - UNSAFE.putObject(buffer, offset, e); - } - - /** - * An ordered store(store + StoreStore barrier) of an element to a given offset - * - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @param e an orderly kitty - */ - protected final void soElement(long offset, E e) { - soElement(buffer, offset, e); - } - - /** - * An ordered store(store + StoreStore barrier) of an element to a given offset - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @param e an orderly kitty - */ - protected final void soElement(E[] buffer, long offset, E e) { - UNSAFE.putOrderedObject(buffer, offset, e); - } - - /** - * A plain load (no ordering/fences) of an element from a given offset. - * - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ - protected final E lpElement(long offset) { - return lpElement(buffer, offset); - } - - /** - * A plain load (no ordering/fences) of an element from a given offset. - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ - @SuppressWarnings("unchecked") - protected final E lpElement(E[] buffer, long offset) { - return (E) UNSAFE.getObject(buffer, offset); - } - - /** - * A volatile load (load + LoadLoad barrier) of an element from a given offset. - * - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ - protected final E lvElement(long offset) { - return lvElement(buffer, offset); - } - - /** - * A volatile load (load + LoadLoad barrier) of an element from a given offset. - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ - @SuppressWarnings("unchecked") - protected final E lvElement(E[] buffer, long offset) { - return (E) UNSAFE.getObjectVolatile(buffer, offset); - } - - @Override - public Iterator iterator() { - throw new UnsupportedOperationException(); - } - @Override - public void clear() { - // we have to test isEmpty because of the weaker poll() guarantee - while (poll() != null || !isEmpty()) - ; - } -} diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/ConcurrentSequencedCircularArrayQueue.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/ConcurrentSequencedCircularArrayQueue.java --- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/ConcurrentSequencedCircularArrayQueue.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/ConcurrentSequencedCircularArrayQueue.java 1970-01-01 01:00:00.000000000 +0100 @@ -1,58 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE - * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/ConcurrentSequencedCircularArrayQueue.java - */ -package rx.internal.util.unsafe; - -import static rx.internal.util.unsafe.UnsafeAccess.UNSAFE; - -public abstract class ConcurrentSequencedCircularArrayQueue extends ConcurrentCircularArrayQueue { - private static final long ARRAY_BASE; - private static final int ELEMENT_SHIFT; - static { - final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(long[].class); - if (8 == scale) { - ELEMENT_SHIFT = 3 + SPARSE_SHIFT; - } else { - throw new IllegalStateException("Unexpected long[] element size"); - } - // Including the buffer pad in the array base offset - ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(long[].class) + (BUFFER_PAD << (ELEMENT_SHIFT - SPARSE_SHIFT)); - } - protected final long[] sequenceBuffer; - - public ConcurrentSequencedCircularArrayQueue(int capacity) { - super(capacity); - int actualCapacity = (int) (this.mask + 1); - // pad data on either end with some empty slots. - sequenceBuffer = new long[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2]; - for (long i = 0; i < actualCapacity; i++) { - soSequence(sequenceBuffer, calcSequenceOffset(i), i); - } - } - - protected final long calcSequenceOffset(long index) { - return ARRAY_BASE + ((index & mask) << ELEMENT_SHIFT); - } - - protected final void soSequence(long[] buffer, long offset, long e) { - UNSAFE.putOrderedLong(buffer, offset, e); - } - - protected final long lvSequence(long[] buffer, long offset) { - return UNSAFE.getLongVolatile(buffer, offset); - } - -} diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/MessagePassingQueue.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/MessagePassingQueue.java --- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/MessagePassingQueue.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/MessagePassingQueue.java 1970-01-01 01:00:00.000000000 +0100 @@ -1,74 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE - * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MessagePassingQueue.java - */ -package rx.internal.util.unsafe; - -import java.util.Queue; - -/** - * This is a tagging interface for the queues in this library which implement a subset of the {@link Queue} interface - * sufficient for concurrent message passing.
- * Message passing queues offer happens before semantics to messages passed through, namely that writes made by the - * producer before offering the message are visible to the consuming thread after the message has been polled out of the - * queue. - * - * @author nitsanw - * - * @param the event/message type - */ -interface MessagePassingQueue { - - /** - * Called from a producer thread subject to the restrictions appropriate to the implementation and according to the - * {@link Queue#offer(Object)} interface. - * - * @param message - * @return true if element was inserted into the queue, false iff full - */ - boolean offer(M message); - - /** - * Called from the consumer thread subject to the restrictions appropriate to the implementation and according to - * the {@link Queue#poll()} interface. - * - * @return a message from the queue if one is available, null iff empty - */ - M poll(); - - /** - * Called from the consumer thread subject to the restrictions appropriate to the implementation and according to - * the {@link Queue#peek()} interface. - * - * @return a message from the queue if one is available, null iff empty - */ - M peek(); - - /** - * This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a - * best effort rather than absolute value. For some implementations this method may be O(n) rather than O(1). - * - * @return number of messages in the queue, between 0 and queue capacity or {@link Integer#MAX_VALUE} if not bounded - */ - int size(); - - /** - * This method's accuracy is subject to concurrent modifications happening as the observation is carried out. - * - * @return true if empty, false otherwise - */ - boolean isEmpty(); - -} diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/MpmcArrayQueue.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/MpmcArrayQueue.java --- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/MpmcArrayQueue.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/MpmcArrayQueue.java 1970-01-01 01:00:00.000000000 +0100 @@ -1,254 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE - * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java - */ -package rx.internal.util.unsafe; - -import static rx.internal.util.unsafe.UnsafeAccess.UNSAFE; - -abstract class MpmcArrayQueueL1Pad extends ConcurrentSequencedCircularArrayQueue { - long p10, p11, p12, p13, p14, p15, p16; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public MpmcArrayQueueL1Pad(int capacity) { - super(capacity); - } -} - -abstract class MpmcArrayQueueProducerField extends MpmcArrayQueueL1Pad { - private final static long P_INDEX_OFFSET; - static { - try { - P_INDEX_OFFSET = UNSAFE.objectFieldOffset(MpmcArrayQueueProducerField.class - .getDeclaredField("producerIndex")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - private volatile long producerIndex; - - public MpmcArrayQueueProducerField(int capacity) { - super(capacity); - } - - protected final long lvProducerIndex() { - return producerIndex; - } - - protected final boolean casProducerIndex(long expect, long newValue) { - return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue); - } -} - -abstract class MpmcArrayQueueL2Pad extends MpmcArrayQueueProducerField { - long p20, p21, p22, p23, p24, p25, p26; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public MpmcArrayQueueL2Pad(int capacity) { - super(capacity); - } -} - -abstract class MpmcArrayQueueConsumerField extends MpmcArrayQueueL2Pad { - private final static long C_INDEX_OFFSET; - static { - try { - C_INDEX_OFFSET = UNSAFE.objectFieldOffset(MpmcArrayQueueConsumerField.class - .getDeclaredField("consumerIndex")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - private volatile long consumerIndex; - - public MpmcArrayQueueConsumerField(int capacity) { - super(capacity); - } - - protected final long lvConsumerIndex() { - return consumerIndex; - } - - protected final boolean casConsumerIndex(long expect, long newValue) { - return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue); - } -} - -/** - * A Multi-Producer-Multi-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that - * any and all threads may call the offer/poll/peek methods and correctness is maintained.
- * This implementation follows patterns documented on the package level for False Sharing protection.
- * The algorithm for offer/poll is an adaptation of the one put forward by D. Vyukov (See here). The original - * algorithm uses an array of structs which should offer nice locality properties but is sadly not possible in - * Java (waiting on Value Types or similar). The alternative explored here utilizes 2 arrays, one for each - * field of the struct. There is a further alternative in the experimental project which uses iteration phase - * markers to achieve the same algo and is closer structurally to the original, but sadly does not perform as - * well as this implementation.
- * Tradeoffs to keep in mind: - *

    - *
  1. Padding for false sharing: counter fields and queue fields are all padded as well as either side of - * both arrays. We are trading memory to avoid false sharing(active and passive). - *
  2. 2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the - * elements array. This is doubling/tripling the memory allocated for the buffer. - *
  3. Power of 2 capacity: Actual elements buffer (and sequence buffer) is the closest power of 2 larger or - * equal to the requested capacity. - *
- * - * @param - * type of the element stored in the {@link java.util.Queue} - */ -public class MpmcArrayQueue extends MpmcArrayQueueConsumerField { - long p40, p41, p42, p43, p44, p45, p46; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public MpmcArrayQueue(final int capacity) { - super(Math.max(2, capacity)); - } - - @Override - public boolean offer(final E e) { - if (null == e) { - throw new NullPointerException("Null is not a valid element"); - } - - // local load of field to avoid repeated loads after volatile reads - final long capacity = mask + 1; - final long[] lSequenceBuffer = sequenceBuffer; - long currentProducerIndex; - long seqOffset; - long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it - while (true) { - currentProducerIndex = lvProducerIndex(); // LoadLoad - seqOffset = calcSequenceOffset(currentProducerIndex); - final long seq = lvSequence(lSequenceBuffer, seqOffset); // LoadLoad - final long delta = seq - currentProducerIndex; - - if (delta == 0) { - // this is expected if we see this first time around - if (casProducerIndex(currentProducerIndex, currentProducerIndex + 1)) { - // Successful CAS: full barrier - break; - } - // failed cas, retry 1 - } else if (delta < 0 && // poll has not moved this value forward - currentProducerIndex - capacity <= cIndex && // test against cached cIndex - currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex - // Extra check required to ensure [Queue.offer == false iff queue is full] - return false; - } - - // another producer has moved the sequence by one, retry 2 - } - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long elementOffset = calcElementOffset(currentProducerIndex); - spElement(elementOffset, e); - - // increment sequence by 1, the value expected by consumer - // (seeing this value from a producer will lead to retry 2) - soSequence(lSequenceBuffer, seqOffset, currentProducerIndex + 1); // StoreStore - - return true; - } - - /** - * {@inheritDoc} - *

- * Because return null indicates queue is empty we cannot simply rely on next element visibility for poll - * and must test producer index when next element is not visible. - */ - @Override - public E poll() { - // local load of field to avoid repeated loads after volatile reads - final long[] lSequenceBuffer = sequenceBuffer; - long currentConsumerIndex; - long seqOffset; - long pIndex = -1; // start with bogus value, hope we don't need it - while (true) { - currentConsumerIndex = lvConsumerIndex();// LoadLoad - seqOffset = calcSequenceOffset(currentConsumerIndex); - final long seq = lvSequence(lSequenceBuffer, seqOffset);// LoadLoad - final long delta = seq - (currentConsumerIndex + 1); - - if (delta == 0) { - if (casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1)) { - // Successful CAS: full barrier - break; - } - // failed cas, retry 1 - } else if (delta < 0 && // slot has not been moved by producer - currentConsumerIndex >= pIndex && // test against cached pIndex - currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must - // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] - return null; - } - - // another consumer beat us and moved sequence ahead, retry 2 - } - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(currentConsumerIndex); - final E e = lpElement(offset); - spElement(offset, null); - - // Move sequence ahead by capacity, preparing it for next offer - // (seeing this value from a consumer will lead to retry 2) - soSequence(lSequenceBuffer, seqOffset, currentConsumerIndex + mask + 1);// StoreStore - - return e; - } - - @Override - public E peek() { - long currConsumerIndex; - E e; - do { - currConsumerIndex = lvConsumerIndex(); - // other consumers may have grabbed the element, or queue might be empty - e = lpElement(calcElementOffset(currConsumerIndex)); - // only return null if queue is empty - } while (e == null && currConsumerIndex != lvProducerIndex()); - return e; - } - - @Override - public int size() { - /* - * It is possible for a thread to be interrupted or reschedule between the read of the producer and - * consumer indices, therefore protection is required to ensure size is within valid range. In the - * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer - * index BEFORE the producer index. - */ - long after = lvConsumerIndex(); - while (true) { - final long before = after; - final long currentProducerIndex = lvProducerIndex(); - after = lvConsumerIndex(); - if (before == after) { - return (int) (currentProducerIndex - after); - } - } - } - - @Override - public boolean isEmpty() { - // Order matters! - // Loading consumer before producer allows for producer increments after consumer index is read. - // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is - // nothing we can do to make this an exact method. - return (lvConsumerIndex() == lvProducerIndex()); - } -} diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/MpscLinkedQueue.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/MpscLinkedQueue.java --- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/MpscLinkedQueue.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/MpscLinkedQueue.java 1970-01-01 01:00:00.000000000 +0100 @@ -1,136 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE - * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java - */ -package rx.internal.util.unsafe; - -import static rx.internal.util.unsafe.UnsafeAccess.UNSAFE; -import rx.internal.util.atomic.LinkedQueueNode; -/** - * This is a direct Java port of the MPSC algorithm as presented on 1024 - * Cores by D. Vyukov. The original has been adapted to Java and it's quirks with regards to memory model and - * layout: - *

    - *
  1. Use inheritance to ensure no false sharing occurs between producer/consumer node reference fields. - *
  2. Use XCHG functionality to the best of the JDK ability (see differences in JDK7/8 impls). - *
- * The queue is initialized with a stub node which is set to both the producer and consumer node references. From this - * point follow the notes on offer/poll. - * - * @author nitsanw - * - * @param - */ -public final class MpscLinkedQueue extends BaseLinkedQueue { - - public MpscLinkedQueue() { - consumerNode = new LinkedQueueNode(); - xchgProducerNode(consumerNode);// this ensures correct construction: StoreLoad - } - - @SuppressWarnings("unchecked") - protected final LinkedQueueNode xchgProducerNode(LinkedQueueNode newVal) { - Object oldVal; - do { - oldVal = producerNode; - } while(!UNSAFE.compareAndSwapObject(this, P_NODE_OFFSET, oldVal, newVal)); - return (LinkedQueueNode) oldVal; - } - - /** - * {@inheritDoc}
- *

- * IMPLEMENTATION NOTES:
- * Offer is allowed from multiple threads.
- * Offer allocates a new node and: - *

    - *
  1. Swaps it atomically with current producer node (only one producer 'wins') - *
  2. Sets the new node as the node following from the swapped producer node - *
- * This works because each producer is guaranteed to 'plant' a new node and link the old node. No 2 producers can - * get the same producer node as part of XCHG guarantee. - * - * @see MessagePassingQueue#offer(Object) - * @see java.util.Queue#offer(java.lang.Object) - */ - @Override - public final boolean offer(final E nextValue) { - if (nextValue == null) { - throw new IllegalArgumentException("null elements not allowed"); - } - final LinkedQueueNode nextNode = new LinkedQueueNode(nextValue); - final LinkedQueueNode prevProducerNode = xchgProducerNode(nextNode); - // Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed - // and completes the store in prev.next. - prevProducerNode.soNext(nextNode); // StoreStore - return true; - } - - /** - * {@inheritDoc}
- *

- * IMPLEMENTATION NOTES:
- * Poll is allowed from a SINGLE thread.
- * Poll reads the next node from the consumerNode and: - *

    - *
  1. If it is null, the queue is assumed empty (though it might not be). - *
  2. If it is not null set it as the consumer node and return it's now evacuated value. - *
- * This means the consumerNode.value is always null, which is also the starting point for the queue. Because null - * values are not allowed to be offered this is the only node with it's value set to null at any one time. - * - * @see MessagePassingQueue#poll() - * @see java.util.Queue#poll() - */ - @Override - public final E poll() { - LinkedQueueNode currConsumerNode = lpConsumerNode(); // don't load twice, it's alright - LinkedQueueNode nextNode = currConsumerNode.lvNext(); - if (nextNode != null) { - // we have to null out the value because we are going to hang on to the node - final E nextValue = nextNode.getAndNullValue(); - spConsumerNode(nextNode); - return nextValue; - } - else if (currConsumerNode != lvProducerNode()) { - // spin, we are no longer wait free - while((nextNode = currConsumerNode.lvNext()) == null); - // got the next node... - - // we have to null out the value because we are going to hang on to the node - final E nextValue = nextNode.getAndNullValue(); - consumerNode = nextNode; - return nextValue; - } - return null; - } - - @Override - public final E peek() { - LinkedQueueNode currConsumerNode = consumerNode; // don't load twice, it's alright - LinkedQueueNode nextNode = currConsumerNode.lvNext(); - if (nextNode != null) { - return nextNode.lpValue(); - } - else if (currConsumerNode != lvProducerNode()) { - // spin, we are no longer wait free - while((nextNode = currConsumerNode.lvNext()) == null); - // got the next node... - return nextNode.lpValue(); - } - return null; - } -} \ Manca newline alla fine del file diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/Pow2.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/Pow2.java --- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/Pow2.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/Pow2.java 1970-01-01 01:00:00.000000000 +0100 @@ -1,44 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE - * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/util/Pow2.java - */ -package rx.internal.util.unsafe; - -public final class Pow2 { - private Pow2() { - throw new IllegalStateException("No instances!"); - } - - /** - * Find the next larger positive power of two value up from the given value. If value is a power of two then - * this value will be returned. - * - * @param value from which next positive power of two will be found. - * @return the next positive power of 2 or this value if it is a power of 2. - */ - public static int roundToPowerOfTwo(final int value) { - return 1 << (32 - Integer.numberOfLeadingZeros(value - 1)); - } - - /** - * Is this value a power of two. - * - * @param value to be tested to see if it is a power of two. - * @return true if the value is a power of 2 otherwise false. - */ - public static boolean isPowerOfTwo(final int value) { - return (value & (value - 1)) == 0; - } -} \ Manca newline alla fine del file diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/README.md RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/README.md --- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/README.md 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/README.md 1970-01-01 01:00:00.000000000 +0100 @@ -1,212 +0,0 @@ -This package contains code that relies on sun.misc.Unsafe. Before using it you MUST assert UnsafeAccess.isUnsafeAvailable() == true - -Much of the code in this package comes from or is inspired by the JCTools project: https://github.com/JCTools/JCTools - -JCTools has now published artifacts (https://github.com/JCTools/JCTools/issues/17) so RxJava could add JCTools as a "shadow" dependency (https://github.com/ReactiveX/RxJava/issues/1735). -RxJava has a "zero dependency" policy for the core library, so if we do add it as a dependency, it won't be an externally visible dependency that results in a separate jar. - -The license for the JCTools code is https://github.com/JCTools/JCTools/blob/master/LICENSE - -As of June 10 2014 when this code was copied the LICENSE read as: - -Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "{}" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright {yyyy} {name of copyright owner} - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/SpmcArrayQueue.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/SpmcArrayQueue.java --- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/SpmcArrayQueue.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/SpmcArrayQueue.java 1970-01-01 01:00:00.000000000 +0100 @@ -1,229 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE - * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/SpmcArrayQueue.java - */ -package rx.internal.util.unsafe; - -import static rx.internal.util.unsafe.UnsafeAccess.UNSAFE; - -abstract class SpmcArrayQueueL1Pad extends ConcurrentCircularArrayQueue { - long p10, p11, p12, p13, p14, p15, p16; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public SpmcArrayQueueL1Pad(int capacity) { - super(capacity); - } -} - -abstract class SpmcArrayQueueProducerField extends SpmcArrayQueueL1Pad { - protected final static long P_INDEX_OFFSET; - static { - try { - P_INDEX_OFFSET = - UNSAFE.objectFieldOffset(SpmcArrayQueueProducerField.class.getDeclaredField("producerIndex")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - private volatile long producerIndex; - - protected final long lvProducerIndex() { - return producerIndex; - } - - protected final void soTail(long v) { - UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, v); - } - - public SpmcArrayQueueProducerField(int capacity) { - super(capacity); - } -} - -abstract class SpmcArrayQueueL2Pad extends SpmcArrayQueueProducerField { - long p20, p21, p22, p23, p24, p25, p26; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public SpmcArrayQueueL2Pad(int capacity) { - super(capacity); - } -} - -abstract class SpmcArrayQueueConsumerField extends SpmcArrayQueueL2Pad { - protected final static long C_INDEX_OFFSET; - static { - try { - C_INDEX_OFFSET = - UNSAFE.objectFieldOffset(SpmcArrayQueueConsumerField.class.getDeclaredField("consumerIndex")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - private volatile long consumerIndex; - - public SpmcArrayQueueConsumerField(int capacity) { - super(capacity); - } - - protected final long lvConsumerIndex() { - return consumerIndex; - } - - protected final boolean casHead(long expect, long newValue) { - return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue); - } -} - -abstract class SpmcArrayQueueMidPad extends SpmcArrayQueueConsumerField { - long p20, p21, p22, p23, p24, p25, p26; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public SpmcArrayQueueMidPad(int capacity) { - super(capacity); - } -} - -abstract class SpmcArrayQueueProducerIndexCacheField extends SpmcArrayQueueMidPad { - // This is separated from the consumerIndex which will be highly contended in the hope that this value spends most - // of it's time in a cache line that is Shared(and rarely invalidated) - private volatile long producerIndexCache; - - public SpmcArrayQueueProducerIndexCacheField(int capacity) { - super(capacity); - } - - protected final long lvProducerIndexCache() { - return producerIndexCache; - } - - protected final void svProducerIndexCache(long v) { - producerIndexCache = v; - } -} - -abstract class SpmcArrayQueueL3Pad extends SpmcArrayQueueProducerIndexCacheField { - long p40, p41, p42, p43, p44, p45, p46; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public SpmcArrayQueueL3Pad(int capacity) { - super(capacity); - } -} - -public final class SpmcArrayQueue extends SpmcArrayQueueL3Pad { - - public SpmcArrayQueue(final int capacity) { - super(capacity); - } - - @Override - public boolean offer(final E e) { - if (null == e) { - throw new NullPointerException("Null is not a valid element"); - } - final E[] lb = buffer; - final long lMask = mask; - final long currProducerIndex = lvProducerIndex(); - final long offset = calcElementOffset(currProducerIndex); - if (null != lvElement(lb, offset)) { - long size = currProducerIndex - lvConsumerIndex(); - - if(size > lMask) { - return false; - } - else { - // spin wait for slot to clear, buggers wait freedom - while(null != lvElement(lb, offset)); - } - } - spElement(lb, offset, e); - // single producer, so store ordered is valid. It is also required to correctly publish the element - // and for the consumers to pick up the tail value. - soTail(currProducerIndex + 1); - return true; - } - - @Override - public E poll() { - long currentConsumerIndex; - final long currProducerIndexCache = lvProducerIndexCache(); - do { - currentConsumerIndex = lvConsumerIndex(); - if (currentConsumerIndex >= currProducerIndexCache) { - long currProducerIndex = lvProducerIndex(); - if (currentConsumerIndex >= currProducerIndex) { - return null; - } else { - svProducerIndexCache(currProducerIndex); - } - } - } while (!casHead(currentConsumerIndex, currentConsumerIndex + 1)); - // consumers are gated on latest visible tail, and so can't see a null value in the queue or overtake - // and wrap to hit same location. - final long offset = calcElementOffset(currentConsumerIndex); - final E[] lb = buffer; - // load plain, element happens before it's index becomes visible - final E e = lpElement(lb, offset); - // store ordered, make sure nulling out is visible. Producer is waiting for this value. - soElement(lb, offset, null); - return e; - } - - @Override - public E peek() { - long currentConsumerIndex; - final long currProducerIndexCache = lvProducerIndexCache(); - E e; - do { - currentConsumerIndex = lvConsumerIndex(); - if (currentConsumerIndex >= currProducerIndexCache) { - long currProducerIndex = lvProducerIndex(); - if (currentConsumerIndex >= currProducerIndex) { - return null; - } else { - svProducerIndexCache(currProducerIndex); - } - } - } while (null == (e = lvElement(calcElementOffset(currentConsumerIndex)))); - return e; - } - - @Override - public int size() { - /* - * It is possible for a thread to be interrupted or reschedule between the read of the producer and consumer - * indices, therefore protection is required to ensure size is within valid range. In the event of concurrent - * polls/offers to this method the size is OVER estimated as we read consumer index BEFORE the producer index. - */ - long after = lvConsumerIndex(); - while (true) { - final long before = after; - final long currentProducerIndex = lvProducerIndex(); - after = lvConsumerIndex(); - if (before == after) { - return (int) (currentProducerIndex - after); - } - } - } - - @Override - public boolean isEmpty() { - // Order matters! - // Loading consumer before producer allows for producer increments after consumer index is read. - // This ensures the correctness of this method at least for the consumer thread. Other threads POV is not really - // something we can fix here. - return (lvConsumerIndex() == lvProducerIndex()); - } -} diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java --- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java 1970-01-01 01:00:00.000000000 +0100 @@ -1,195 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE - * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/SpscArrayQueue.java - */ -package rx.internal.util.unsafe; - -import static rx.internal.util.unsafe.UnsafeAccess.UNSAFE; - -abstract class SpscArrayQueueColdField extends ConcurrentCircularArrayQueue { - private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096); - protected final int lookAheadStep; - public SpscArrayQueueColdField(int capacity) { - super(capacity); - lookAheadStep = Math.min(capacity/4, MAX_LOOK_AHEAD_STEP); - } -} -abstract class SpscArrayQueueL1Pad extends SpscArrayQueueColdField { - long p10, p11, p12, p13, p14, p15, p16; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public SpscArrayQueueL1Pad(int capacity) { - super(capacity); - } -} - -abstract class SpscArrayQueueProducerFields extends SpscArrayQueueL1Pad { - protected final static long P_INDEX_OFFSET; - static { - try { - P_INDEX_OFFSET = - UNSAFE.objectFieldOffset(SpscArrayQueueProducerFields.class.getDeclaredField("producerIndex")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - protected long producerIndex; - protected long producerLookAhead; - - public SpscArrayQueueProducerFields(int capacity) { - super(capacity); - } -} - -abstract class SpscArrayQueueL2Pad extends SpscArrayQueueProducerFields { - long p20, p21, p22, p23, p24, p25, p26; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public SpscArrayQueueL2Pad(int capacity) { - super(capacity); - } -} - -abstract class SpscArrayQueueConsumerField extends SpscArrayQueueL2Pad { - protected long consumerIndex; - protected final static long C_INDEX_OFFSET; - static { - try { - C_INDEX_OFFSET = - UNSAFE.objectFieldOffset(SpscArrayQueueConsumerField.class.getDeclaredField("consumerIndex")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - public SpscArrayQueueConsumerField(int capacity) { - super(capacity); - } -} - -abstract class SpscArrayQueueL3Pad extends SpscArrayQueueConsumerField { - long p40, p41, p42, p43, p44, p45, p46; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public SpscArrayQueueL3Pad(int capacity) { - super(capacity); - } -} - -/** - * A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer. - *

- * This implementation is a mashup of the Fast Flow - * algorithm with an optimization of the offer method taken from the BQueue algorithm (a variation on Fast - * Flow), and adjusted to comply with Queue.offer semantics with regards to capacity.
- * For convenience the relevant papers are available in the resources folder:
- * 2010 - Pisa - SPSC Queues on Shared Cache Multi-Core Systems.pdf
- * 2012 - Junchang- BQueue- Efficient and Practical Queuing.pdf
- *
This implementation is wait free. - * - * @author nitsanw - * - * @param - */ -public final class SpscArrayQueue extends SpscArrayQueueL3Pad { - - public SpscArrayQueue(final int capacity) { - super(capacity); - } - - /** - * {@inheritDoc} - *

- * This implementation is correct for single producer thread use only. - */ - @Override - public boolean offer(final E e) { - // local load of field to avoid repeated loads after volatile reads - final E[] lElementBuffer = buffer; - final long index = producerIndex; - final long offset = calcElementOffset(index); - if (null != lvElement(lElementBuffer, offset)){ - return false; - } - soProducerIndex(index + 1); // ordered store -> atomic and ordered for size() - soElement(lElementBuffer, offset, e); // StoreStore - return true; - } - - /** - * {@inheritDoc} - *

- * This implementation is correct for single consumer thread use only. - */ - @Override - public E poll() { - final long index = consumerIndex; - final long offset = calcElementOffset(index); - // local load of field to avoid repeated loads after volatile reads - final E[] lElementBuffer = buffer; - final E e = lvElement(lElementBuffer, offset);// LoadLoad - if (null == e) { - return null; - } - soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size() - soElement(lElementBuffer, offset, null);// StoreStore - return e; - } - - /** - * {@inheritDoc} - *

- * This implementation is correct for single consumer thread use only. - */ - @Override - public E peek() { - return lvElement(calcElementOffset(consumerIndex)); - } - - @Override - public int size() { - /* - * It is possible for a thread to be interrupted or reschedule between the read of the producer and consumer - * indices, therefore protection is required to ensure size is within valid range. In the event of concurrent - * polls/offers to this method the size is OVER estimated as we read consumer index BEFORE the producer index. - */ - long after = lvConsumerIndex(); - while (true) { - final long before = after; - final long currentProducerIndex = lvProducerIndex(); - after = lvConsumerIndex(); - if (before == after) { - return (int) (currentProducerIndex - after); - } - } - } - - private void soProducerIndex(long v) { - UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, v); - } - - private void soConsumerIndex(long v) { - UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, v); - } - - private long lvProducerIndex() { - return UNSAFE.getLongVolatile(this, P_INDEX_OFFSET); - } - - private long lvConsumerIndex() { - return UNSAFE.getLongVolatile(this, C_INDEX_OFFSET); - } -} - diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/SpscLinkedQueue.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/SpscLinkedQueue.java --- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/SpscLinkedQueue.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/SpscLinkedQueue.java 1970-01-01 01:00:00.000000000 +0100 @@ -1,108 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE - * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/SpscLinkedQueue.java - */ -package rx.internal.util.unsafe; - -import rx.internal.util.atomic.LinkedQueueNode; - - - -/** - * This is a weakened version of the MPSC algorithm as presented on 1024 - * Cores by D. Vyukov. The original has been adapted to Java and it's quirks with regards to memory model and - * layout: - *

    - *
  1. Use inheritance to ensure no false sharing occurs between producer/consumer node reference fields. - *
  2. As this is an SPSC we have no need for XCHG, an ordered store is enough. - *
- * The queue is initialized with a stub node which is set to both the producer and consumer node references. From this - * point follow the notes on offer/poll. - * - * @author nitsanw - * - * @param - */ -public final class SpscLinkedQueue extends BaseLinkedQueue { - - public SpscLinkedQueue() { - spProducerNode(new LinkedQueueNode()); - spConsumerNode(producerNode); - consumerNode.soNext(null); // this ensures correct construction: StoreStore - } - - /** - * {@inheritDoc}
- * - * IMPLEMENTATION NOTES:
- * Offer is allowed from a SINGLE thread.
- * Offer allocates a new node (holding the offered value) and: - *
    - *
  1. Sets that node as the producerNode.next - *
  2. Sets the new node as the producerNode - *
- * From this follows that producerNode.next is always null and for all other nodes node.next is not null. - * - * @see MessagePassingQueue#offer(Object) - * @see java.util.Queue#offer(java.lang.Object) - */ - @Override - public boolean offer(final E nextValue) { - if (nextValue == null) { - throw new IllegalArgumentException("null elements not allowed"); - } - final LinkedQueueNode nextNode = new LinkedQueueNode(nextValue); - producerNode.soNext(nextNode); - producerNode = nextNode; - return true; - } - - /** - * {@inheritDoc}
- * - * IMPLEMENTATION NOTES:
- * Poll is allowed from a SINGLE thread.
- * Poll reads the next node from the consumerNode and: - *
    - *
  1. If it is null, the queue is empty. - *
  2. If it is not null set it as the consumer node and return it's now evacuated value. - *
- * This means the consumerNode.value is always null, which is also the starting point for the queue. Because null - * values are not allowed to be offered this is the only node with it's value set to null at any one time. - * - */ - @Override - public E poll() { - final LinkedQueueNode nextNode = consumerNode.lvNext(); - if (nextNode != null) { - // we have to null out the value because we are going to hang on to the node - final E nextValue = nextNode.getAndNullValue(); - consumerNode = nextNode; - return nextValue; - } - return null; - } - - @Override - public E peek() { - final LinkedQueueNode nextNode = consumerNode.lvNext(); - if (nextNode != null) { - return nextNode.lpValue(); - } else { - return null; - } - } -} \ Manca newline alla fine del file diff -Nru RxJava-1.0.13/src/test/java/rx/internal/util/JCToolsQueueTests.java RxJava-1.0.13.jctools/src/test/java/rx/internal/util/JCToolsQueueTests.java --- RxJava-1.0.13/src/test/java/rx/internal/util/JCToolsQueueTests.java 2015-07-20 19:58:51.000000000 +0200 +++ RxJava-1.0.13.jctools/src/test/java/rx/internal/util/JCToolsQueueTests.java 2015-08-07 16:11:25.265234890 +0200 @@ -17,6 +17,10 @@ import static org.junit.Assert.*; +import org.jctools.queues.MpmcArrayQueue; +import org.jctools.queues.SpmcArrayQueue; +import org.jctools.queues.SpscArrayQueue; + import org.junit.Test; import rx.internal.util.unsafe.*;