iterator() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * IMPLEMENTATION NOTES:
- * This is an O(n) operation as we run through all the nodes and count them.
- *
- * @see java.util.Queue#size()
- */
- @Override
- public final int size() {
- // Read consumer first, this is important because if the producer is node is 'older' than the consumer the
- // consumer may overtake it (consume past it). This will lead to an infinite loop below.
- LinkedQueueNode chaserNode = lvConsumerNode();
- final LinkedQueueNode producerNode = lvProducerNode();
- int size = 0;
- // must chase the nodes all the way to the producer node, but there's no need to chase a moving target.
- while (chaserNode != producerNode && size < Integer.MAX_VALUE) {
- LinkedQueueNode next;
- while((next = chaserNode.lvNext()) == null);
- chaserNode = next;
- size++;
- }
- return size;
- }
-
- /**
- * {@inheritDoc}
- *
- * IMPLEMENTATION NOTES:
- * Queue is empty when producerNode is the same as consumerNode. An alternative implementation would be to observe
- * the producerNode.value is null, which also means an empty queue because only the consumerNode.value is allowed to
- * be null.
- *
- * @see MessagePassingQueue#isEmpty()
- */
- @Override
- public final boolean isEmpty() {
- return lvConsumerNode() == lvProducerNode();
- }
-}
\ Manca newline alla fine del file
diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/ConcurrentCircularArrayQueue.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/ConcurrentCircularArrayQueue.java
--- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/ConcurrentCircularArrayQueue.java 2015-07-20 19:58:51.000000000 +0200
+++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/ConcurrentCircularArrayQueue.java 1970-01-01 01:00:00.000000000 +0100
@@ -1,186 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
- * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/ConcurrentCircularArrayQueue.java
- */
-package rx.internal.util.unsafe;
-
-import static rx.internal.util.unsafe.UnsafeAccess.UNSAFE;
-
-import java.util.AbstractQueue;
-import java.util.Iterator;
-
-abstract class ConcurrentCircularArrayQueueL0Pad extends AbstractQueue implements MessagePassingQueue {
- long p00, p01, p02, p03, p04, p05, p06, p07;
- long p30, p31, p32, p33, p34, p35, p36, p37;
-}
-
-/**
- * A concurrent access enabling class used by circular array based queues this class exposes an offset computation
- * method along with differently memory fenced load/store methods into the underlying array. The class is pre-padded and
- * the array is padded on either side to help with False sharing prvention. It is expected theat subclasses handle post
- * padding.
- *
- * Offset calculation is separate from access to enable the reuse of a give compute offset.
- *
- * Load/Store methods using a buffer parameter are provided to allow the prevention of final field reload after a
- * LoadLoad barrier.
- *
- *
- * @author nitsanw
- *
- * @param
- */
-public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircularArrayQueueL0Pad {
- protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0);
- protected static final int BUFFER_PAD = 32;
- private static final long REF_ARRAY_BASE;
- private static final int REF_ELEMENT_SHIFT;
- static {
- final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
- if (4 == scale) {
- REF_ELEMENT_SHIFT = 2 + SPARSE_SHIFT;
- } else if (8 == scale) {
- REF_ELEMENT_SHIFT = 3 + SPARSE_SHIFT;
- } else {
- throw new IllegalStateException("Unknown pointer size");
- }
- // Including the buffer pad in the array base offset
- REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class)
- + (BUFFER_PAD << (REF_ELEMENT_SHIFT - SPARSE_SHIFT));
- }
- protected final long mask;
- // @Stable :(
- protected final E[] buffer;
-
- @SuppressWarnings("unchecked")
- public ConcurrentCircularArrayQueue(int capacity) {
- int actualCapacity = Pow2.roundToPowerOfTwo(capacity);
- mask = actualCapacity - 1;
- // pad data on either end with some empty slots.
- buffer = (E[]) new Object[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
- }
-
- /**
- * @param index desirable element index
- * @return the offset in bytes within the array for a given index.
- */
- protected final long calcElementOffset(long index) {
- return calcElementOffset(index, mask);
- }
- /**
- * @param index desirable element index
- * @param mask
- * @return the offset in bytes within the array for a given index.
- */
- protected final long calcElementOffset(long index, long mask) {
- return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
- }
- /**
- * A plain store (no ordering/fences) of an element to a given offset
- *
- * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
- * @param e a kitty
- */
- protected final void spElement(long offset, E e) {
- spElement(buffer, offset, e);
- }
-
- /**
- * A plain store (no ordering/fences) of an element to a given offset
- *
- * @param buffer this.buffer
- * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
- * @param e an orderly kitty
- */
- protected final void spElement(E[] buffer, long offset, E e) {
- UNSAFE.putObject(buffer, offset, e);
- }
-
- /**
- * An ordered store(store + StoreStore barrier) of an element to a given offset
- *
- * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
- * @param e an orderly kitty
- */
- protected final void soElement(long offset, E e) {
- soElement(buffer, offset, e);
- }
-
- /**
- * An ordered store(store + StoreStore barrier) of an element to a given offset
- *
- * @param buffer this.buffer
- * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
- * @param e an orderly kitty
- */
- protected final void soElement(E[] buffer, long offset, E e) {
- UNSAFE.putOrderedObject(buffer, offset, e);
- }
-
- /**
- * A plain load (no ordering/fences) of an element from a given offset.
- *
- * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
- * @return the element at the offset
- */
- protected final E lpElement(long offset) {
- return lpElement(buffer, offset);
- }
-
- /**
- * A plain load (no ordering/fences) of an element from a given offset.
- *
- * @param buffer this.buffer
- * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
- * @return the element at the offset
- */
- @SuppressWarnings("unchecked")
- protected final E lpElement(E[] buffer, long offset) {
- return (E) UNSAFE.getObject(buffer, offset);
- }
-
- /**
- * A volatile load (load + LoadLoad barrier) of an element from a given offset.
- *
- * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
- * @return the element at the offset
- */
- protected final E lvElement(long offset) {
- return lvElement(buffer, offset);
- }
-
- /**
- * A volatile load (load + LoadLoad barrier) of an element from a given offset.
- *
- * @param buffer this.buffer
- * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
- * @return the element at the offset
- */
- @SuppressWarnings("unchecked")
- protected final E lvElement(E[] buffer, long offset) {
- return (E) UNSAFE.getObjectVolatile(buffer, offset);
- }
-
- @Override
- public Iterator iterator() {
- throw new UnsupportedOperationException();
- }
- @Override
- public void clear() {
- // we have to test isEmpty because of the weaker poll() guarantee
- while (poll() != null || !isEmpty())
- ;
- }
-}
diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/ConcurrentSequencedCircularArrayQueue.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/ConcurrentSequencedCircularArrayQueue.java
--- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/ConcurrentSequencedCircularArrayQueue.java 2015-07-20 19:58:51.000000000 +0200
+++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/ConcurrentSequencedCircularArrayQueue.java 1970-01-01 01:00:00.000000000 +0100
@@ -1,58 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
- * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/ConcurrentSequencedCircularArrayQueue.java
- */
-package rx.internal.util.unsafe;
-
-import static rx.internal.util.unsafe.UnsafeAccess.UNSAFE;
-
-public abstract class ConcurrentSequencedCircularArrayQueue extends ConcurrentCircularArrayQueue {
- private static final long ARRAY_BASE;
- private static final int ELEMENT_SHIFT;
- static {
- final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(long[].class);
- if (8 == scale) {
- ELEMENT_SHIFT = 3 + SPARSE_SHIFT;
- } else {
- throw new IllegalStateException("Unexpected long[] element size");
- }
- // Including the buffer pad in the array base offset
- ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(long[].class) + (BUFFER_PAD << (ELEMENT_SHIFT - SPARSE_SHIFT));
- }
- protected final long[] sequenceBuffer;
-
- public ConcurrentSequencedCircularArrayQueue(int capacity) {
- super(capacity);
- int actualCapacity = (int) (this.mask + 1);
- // pad data on either end with some empty slots.
- sequenceBuffer = new long[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
- for (long i = 0; i < actualCapacity; i++) {
- soSequence(sequenceBuffer, calcSequenceOffset(i), i);
- }
- }
-
- protected final long calcSequenceOffset(long index) {
- return ARRAY_BASE + ((index & mask) << ELEMENT_SHIFT);
- }
-
- protected final void soSequence(long[] buffer, long offset, long e) {
- UNSAFE.putOrderedLong(buffer, offset, e);
- }
-
- protected final long lvSequence(long[] buffer, long offset) {
- return UNSAFE.getLongVolatile(buffer, offset);
- }
-
-}
diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/MessagePassingQueue.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/MessagePassingQueue.java
--- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/MessagePassingQueue.java 2015-07-20 19:58:51.000000000 +0200
+++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/MessagePassingQueue.java 1970-01-01 01:00:00.000000000 +0100
@@ -1,74 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
- * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MessagePassingQueue.java
- */
-package rx.internal.util.unsafe;
-
-import java.util.Queue;
-
-/**
- * This is a tagging interface for the queues in this library which implement a subset of the {@link Queue} interface
- * sufficient for concurrent message passing.
- * Message passing queues offer happens before semantics to messages passed through, namely that writes made by the
- * producer before offering the message are visible to the consuming thread after the message has been polled out of the
- * queue.
- *
- * @author nitsanw
- *
- * @param the event/message type
- */
-interface MessagePassingQueue {
-
- /**
- * Called from a producer thread subject to the restrictions appropriate to the implementation and according to the
- * {@link Queue#offer(Object)} interface.
- *
- * @param message
- * @return true if element was inserted into the queue, false iff full
- */
- boolean offer(M message);
-
- /**
- * Called from the consumer thread subject to the restrictions appropriate to the implementation and according to
- * the {@link Queue#poll()} interface.
- *
- * @return a message from the queue if one is available, null iff empty
- */
- M poll();
-
- /**
- * Called from the consumer thread subject to the restrictions appropriate to the implementation and according to
- * the {@link Queue#peek()} interface.
- *
- * @return a message from the queue if one is available, null iff empty
- */
- M peek();
-
- /**
- * This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a
- * best effort rather than absolute value. For some implementations this method may be O(n) rather than O(1).
- *
- * @return number of messages in the queue, between 0 and queue capacity or {@link Integer#MAX_VALUE} if not bounded
- */
- int size();
-
- /**
- * This method's accuracy is subject to concurrent modifications happening as the observation is carried out.
- *
- * @return true if empty, false otherwise
- */
- boolean isEmpty();
-
-}
diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/MpmcArrayQueue.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/MpmcArrayQueue.java
--- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/MpmcArrayQueue.java 2015-07-20 19:58:51.000000000 +0200
+++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/MpmcArrayQueue.java 1970-01-01 01:00:00.000000000 +0100
@@ -1,254 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
- * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java
- */
-package rx.internal.util.unsafe;
-
-import static rx.internal.util.unsafe.UnsafeAccess.UNSAFE;
-
-abstract class MpmcArrayQueueL1Pad extends ConcurrentSequencedCircularArrayQueue {
- long p10, p11, p12, p13, p14, p15, p16;
- long p30, p31, p32, p33, p34, p35, p36, p37;
-
- public MpmcArrayQueueL1Pad(int capacity) {
- super(capacity);
- }
-}
-
-abstract class MpmcArrayQueueProducerField extends MpmcArrayQueueL1Pad {
- private final static long P_INDEX_OFFSET;
- static {
- try {
- P_INDEX_OFFSET = UNSAFE.objectFieldOffset(MpmcArrayQueueProducerField.class
- .getDeclaredField("producerIndex"));
- } catch (NoSuchFieldException e) {
- throw new RuntimeException(e);
- }
- }
- private volatile long producerIndex;
-
- public MpmcArrayQueueProducerField(int capacity) {
- super(capacity);
- }
-
- protected final long lvProducerIndex() {
- return producerIndex;
- }
-
- protected final boolean casProducerIndex(long expect, long newValue) {
- return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
- }
-}
-
-abstract class MpmcArrayQueueL2Pad extends MpmcArrayQueueProducerField {
- long p20, p21, p22, p23, p24, p25, p26;
- long p30, p31, p32, p33, p34, p35, p36, p37;
-
- public MpmcArrayQueueL2Pad(int capacity) {
- super(capacity);
- }
-}
-
-abstract class MpmcArrayQueueConsumerField extends MpmcArrayQueueL2Pad {
- private final static long C_INDEX_OFFSET;
- static {
- try {
- C_INDEX_OFFSET = UNSAFE.objectFieldOffset(MpmcArrayQueueConsumerField.class
- .getDeclaredField("consumerIndex"));
- } catch (NoSuchFieldException e) {
- throw new RuntimeException(e);
- }
- }
- private volatile long consumerIndex;
-
- public MpmcArrayQueueConsumerField(int capacity) {
- super(capacity);
- }
-
- protected final long lvConsumerIndex() {
- return consumerIndex;
- }
-
- protected final boolean casConsumerIndex(long expect, long newValue) {
- return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue);
- }
-}
-
-/**
- * A Multi-Producer-Multi-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that
- * any and all threads may call the offer/poll/peek methods and correctness is maintained.
- * This implementation follows patterns documented on the package level for False Sharing protection.
- * The algorithm for offer/poll is an adaptation of the one put forward by D. Vyukov (See here). The original
- * algorithm uses an array of structs which should offer nice locality properties but is sadly not possible in
- * Java (waiting on Value Types or similar). The alternative explored here utilizes 2 arrays, one for each
- * field of the struct. There is a further alternative in the experimental project which uses iteration phase
- * markers to achieve the same algo and is closer structurally to the original, but sadly does not perform as
- * well as this implementation.
- * Tradeoffs to keep in mind:
- *
- * - Padding for false sharing: counter fields and queue fields are all padded as well as either side of
- * both arrays. We are trading memory to avoid false sharing(active and passive).
- *
- 2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the
- * elements array. This is doubling/tripling the memory allocated for the buffer.
- *
- Power of 2 capacity: Actual elements buffer (and sequence buffer) is the closest power of 2 larger or
- * equal to the requested capacity.
- *
- *
- * @param
- * type of the element stored in the {@link java.util.Queue}
- */
-public class MpmcArrayQueue extends MpmcArrayQueueConsumerField {
- long p40, p41, p42, p43, p44, p45, p46;
- long p30, p31, p32, p33, p34, p35, p36, p37;
-
- public MpmcArrayQueue(final int capacity) {
- super(Math.max(2, capacity));
- }
-
- @Override
- public boolean offer(final E e) {
- if (null == e) {
- throw new NullPointerException("Null is not a valid element");
- }
-
- // local load of field to avoid repeated loads after volatile reads
- final long capacity = mask + 1;
- final long[] lSequenceBuffer = sequenceBuffer;
- long currentProducerIndex;
- long seqOffset;
- long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it
- while (true) {
- currentProducerIndex = lvProducerIndex(); // LoadLoad
- seqOffset = calcSequenceOffset(currentProducerIndex);
- final long seq = lvSequence(lSequenceBuffer, seqOffset); // LoadLoad
- final long delta = seq - currentProducerIndex;
-
- if (delta == 0) {
- // this is expected if we see this first time around
- if (casProducerIndex(currentProducerIndex, currentProducerIndex + 1)) {
- // Successful CAS: full barrier
- break;
- }
- // failed cas, retry 1
- } else if (delta < 0 && // poll has not moved this value forward
- currentProducerIndex - capacity <= cIndex && // test against cached cIndex
- currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex
- // Extra check required to ensure [Queue.offer == false iff queue is full]
- return false;
- }
-
- // another producer has moved the sequence by one, retry 2
- }
-
- // on 64bit(no compressed oops) JVM this is the same as seqOffset
- final long elementOffset = calcElementOffset(currentProducerIndex);
- spElement(elementOffset, e);
-
- // increment sequence by 1, the value expected by consumer
- // (seeing this value from a producer will lead to retry 2)
- soSequence(lSequenceBuffer, seqOffset, currentProducerIndex + 1); // StoreStore
-
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * Because return null indicates queue is empty we cannot simply rely on next element visibility for poll
- * and must test producer index when next element is not visible.
- */
- @Override
- public E poll() {
- // local load of field to avoid repeated loads after volatile reads
- final long[] lSequenceBuffer = sequenceBuffer;
- long currentConsumerIndex;
- long seqOffset;
- long pIndex = -1; // start with bogus value, hope we don't need it
- while (true) {
- currentConsumerIndex = lvConsumerIndex();// LoadLoad
- seqOffset = calcSequenceOffset(currentConsumerIndex);
- final long seq = lvSequence(lSequenceBuffer, seqOffset);// LoadLoad
- final long delta = seq - (currentConsumerIndex + 1);
-
- if (delta == 0) {
- if (casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1)) {
- // Successful CAS: full barrier
- break;
- }
- // failed cas, retry 1
- } else if (delta < 0 && // slot has not been moved by producer
- currentConsumerIndex >= pIndex && // test against cached pIndex
- currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must
- // strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
- return null;
- }
-
- // another consumer beat us and moved sequence ahead, retry 2
- }
-
- // on 64bit(no compressed oops) JVM this is the same as seqOffset
- final long offset = calcElementOffset(currentConsumerIndex);
- final E e = lpElement(offset);
- spElement(offset, null);
-
- // Move sequence ahead by capacity, preparing it for next offer
- // (seeing this value from a consumer will lead to retry 2)
- soSequence(lSequenceBuffer, seqOffset, currentConsumerIndex + mask + 1);// StoreStore
-
- return e;
- }
-
- @Override
- public E peek() {
- long currConsumerIndex;
- E e;
- do {
- currConsumerIndex = lvConsumerIndex();
- // other consumers may have grabbed the element, or queue might be empty
- e = lpElement(calcElementOffset(currConsumerIndex));
- // only return null if queue is empty
- } while (e == null && currConsumerIndex != lvProducerIndex());
- return e;
- }
-
- @Override
- public int size() {
- /*
- * It is possible for a thread to be interrupted or reschedule between the read of the producer and
- * consumer indices, therefore protection is required to ensure size is within valid range. In the
- * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
- * index BEFORE the producer index.
- */
- long after = lvConsumerIndex();
- while (true) {
- final long before = after;
- final long currentProducerIndex = lvProducerIndex();
- after = lvConsumerIndex();
- if (before == after) {
- return (int) (currentProducerIndex - after);
- }
- }
- }
-
- @Override
- public boolean isEmpty() {
- // Order matters!
- // Loading consumer before producer allows for producer increments after consumer index is read.
- // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
- // nothing we can do to make this an exact method.
- return (lvConsumerIndex() == lvProducerIndex());
- }
-}
diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/MpscLinkedQueue.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/MpscLinkedQueue.java
--- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/MpscLinkedQueue.java 2015-07-20 19:58:51.000000000 +0200
+++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/MpscLinkedQueue.java 1970-01-01 01:00:00.000000000 +0100
@@ -1,136 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
- * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java
- */
-package rx.internal.util.unsafe;
-
-import static rx.internal.util.unsafe.UnsafeAccess.UNSAFE;
-import rx.internal.util.atomic.LinkedQueueNode;
-/**
- * This is a direct Java port of the MPSC algorithm as presented on 1024
- * Cores by D. Vyukov. The original has been adapted to Java and it's quirks with regards to memory model and
- * layout:
- *
- * - Use inheritance to ensure no false sharing occurs between producer/consumer node reference fields.
- *
- Use XCHG functionality to the best of the JDK ability (see differences in JDK7/8 impls).
- *
- * The queue is initialized with a stub node which is set to both the producer and consumer node references. From this
- * point follow the notes on offer/poll.
- *
- * @author nitsanw
- *
- * @param
- */
-public final class MpscLinkedQueue extends BaseLinkedQueue {
-
- public MpscLinkedQueue() {
- consumerNode = new LinkedQueueNode();
- xchgProducerNode(consumerNode);// this ensures correct construction: StoreLoad
- }
-
- @SuppressWarnings("unchecked")
- protected final LinkedQueueNode xchgProducerNode(LinkedQueueNode newVal) {
- Object oldVal;
- do {
- oldVal = producerNode;
- } while(!UNSAFE.compareAndSwapObject(this, P_NODE_OFFSET, oldVal, newVal));
- return (LinkedQueueNode) oldVal;
- }
-
- /**
- * {@inheritDoc}
- *
- * IMPLEMENTATION NOTES:
- * Offer is allowed from multiple threads.
- * Offer allocates a new node and:
- *
- * - Swaps it atomically with current producer node (only one producer 'wins')
- *
- Sets the new node as the node following from the swapped producer node
- *
- * This works because each producer is guaranteed to 'plant' a new node and link the old node. No 2 producers can
- * get the same producer node as part of XCHG guarantee.
- *
- * @see MessagePassingQueue#offer(Object)
- * @see java.util.Queue#offer(java.lang.Object)
- */
- @Override
- public final boolean offer(final E nextValue) {
- if (nextValue == null) {
- throw new IllegalArgumentException("null elements not allowed");
- }
- final LinkedQueueNode nextNode = new LinkedQueueNode(nextValue);
- final LinkedQueueNode prevProducerNode = xchgProducerNode(nextNode);
- // Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed
- // and completes the store in prev.next.
- prevProducerNode.soNext(nextNode); // StoreStore
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * IMPLEMENTATION NOTES:
- * Poll is allowed from a SINGLE thread.
- * Poll reads the next node from the consumerNode and:
- *
- * - If it is null, the queue is assumed empty (though it might not be).
- *
- If it is not null set it as the consumer node and return it's now evacuated value.
- *
- * This means the consumerNode.value is always null, which is also the starting point for the queue. Because null
- * values are not allowed to be offered this is the only node with it's value set to null at any one time.
- *
- * @see MessagePassingQueue#poll()
- * @see java.util.Queue#poll()
- */
- @Override
- public final E poll() {
- LinkedQueueNode currConsumerNode = lpConsumerNode(); // don't load twice, it's alright
- LinkedQueueNode nextNode = currConsumerNode.lvNext();
- if (nextNode != null) {
- // we have to null out the value because we are going to hang on to the node
- final E nextValue = nextNode.getAndNullValue();
- spConsumerNode(nextNode);
- return nextValue;
- }
- else if (currConsumerNode != lvProducerNode()) {
- // spin, we are no longer wait free
- while((nextNode = currConsumerNode.lvNext()) == null);
- // got the next node...
-
- // we have to null out the value because we are going to hang on to the node
- final E nextValue = nextNode.getAndNullValue();
- consumerNode = nextNode;
- return nextValue;
- }
- return null;
- }
-
- @Override
- public final E peek() {
- LinkedQueueNode currConsumerNode = consumerNode; // don't load twice, it's alright
- LinkedQueueNode nextNode = currConsumerNode.lvNext();
- if (nextNode != null) {
- return nextNode.lpValue();
- }
- else if (currConsumerNode != lvProducerNode()) {
- // spin, we are no longer wait free
- while((nextNode = currConsumerNode.lvNext()) == null);
- // got the next node...
- return nextNode.lpValue();
- }
- return null;
- }
-}
\ Manca newline alla fine del file
diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/Pow2.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/Pow2.java
--- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/Pow2.java 2015-07-20 19:58:51.000000000 +0200
+++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/Pow2.java 1970-01-01 01:00:00.000000000 +0100
@@ -1,44 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
- * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/util/Pow2.java
- */
-package rx.internal.util.unsafe;
-
-public final class Pow2 {
- private Pow2() {
- throw new IllegalStateException("No instances!");
- }
-
- /**
- * Find the next larger positive power of two value up from the given value. If value is a power of two then
- * this value will be returned.
- *
- * @param value from which next positive power of two will be found.
- * @return the next positive power of 2 or this value if it is a power of 2.
- */
- public static int roundToPowerOfTwo(final int value) {
- return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
- }
-
- /**
- * Is this value a power of two.
- *
- * @param value to be tested to see if it is a power of two.
- * @return true if the value is a power of 2 otherwise false.
- */
- public static boolean isPowerOfTwo(final int value) {
- return (value & (value - 1)) == 0;
- }
-}
\ Manca newline alla fine del file
diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/README.md RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/README.md
--- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/README.md 2015-07-20 19:58:51.000000000 +0200
+++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/README.md 1970-01-01 01:00:00.000000000 +0100
@@ -1,212 +0,0 @@
-This package contains code that relies on sun.misc.Unsafe. Before using it you MUST assert UnsafeAccess.isUnsafeAvailable() == true
-
-Much of the code in this package comes from or is inspired by the JCTools project: https://github.com/JCTools/JCTools
-
-JCTools has now published artifacts (https://github.com/JCTools/JCTools/issues/17) so RxJava could add JCTools as a "shadow" dependency (https://github.com/ReactiveX/RxJava/issues/1735).
-RxJava has a "zero dependency" policy for the core library, so if we do add it as a dependency, it won't be an externally visible dependency that results in a separate jar.
-
-The license for the JCTools code is https://github.com/JCTools/JCTools/blob/master/LICENSE
-
-As of June 10 2014 when this code was copied the LICENSE read as:
-
-Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "{}"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright {yyyy} {name of copyright owner}
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/SpmcArrayQueue.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/SpmcArrayQueue.java
--- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/SpmcArrayQueue.java 2015-07-20 19:58:51.000000000 +0200
+++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/SpmcArrayQueue.java 1970-01-01 01:00:00.000000000 +0100
@@ -1,229 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
- * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/SpmcArrayQueue.java
- */
-package rx.internal.util.unsafe;
-
-import static rx.internal.util.unsafe.UnsafeAccess.UNSAFE;
-
-abstract class SpmcArrayQueueL1Pad extends ConcurrentCircularArrayQueue {
- long p10, p11, p12, p13, p14, p15, p16;
- long p30, p31, p32, p33, p34, p35, p36, p37;
-
- public SpmcArrayQueueL1Pad(int capacity) {
- super(capacity);
- }
-}
-
-abstract class SpmcArrayQueueProducerField extends SpmcArrayQueueL1Pad {
- protected final static long P_INDEX_OFFSET;
- static {
- try {
- P_INDEX_OFFSET =
- UNSAFE.objectFieldOffset(SpmcArrayQueueProducerField.class.getDeclaredField("producerIndex"));
- } catch (NoSuchFieldException e) {
- throw new RuntimeException(e);
- }
- }
- private volatile long producerIndex;
-
- protected final long lvProducerIndex() {
- return producerIndex;
- }
-
- protected final void soTail(long v) {
- UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, v);
- }
-
- public SpmcArrayQueueProducerField(int capacity) {
- super(capacity);
- }
-}
-
-abstract class SpmcArrayQueueL2Pad extends SpmcArrayQueueProducerField {
- long p20, p21, p22, p23, p24, p25, p26;
- long p30, p31, p32, p33, p34, p35, p36, p37;
-
- public SpmcArrayQueueL2Pad(int capacity) {
- super(capacity);
- }
-}
-
-abstract class SpmcArrayQueueConsumerField extends SpmcArrayQueueL2Pad {
- protected final static long C_INDEX_OFFSET;
- static {
- try {
- C_INDEX_OFFSET =
- UNSAFE.objectFieldOffset(SpmcArrayQueueConsumerField.class.getDeclaredField("consumerIndex"));
- } catch (NoSuchFieldException e) {
- throw new RuntimeException(e);
- }
- }
- private volatile long consumerIndex;
-
- public SpmcArrayQueueConsumerField(int capacity) {
- super(capacity);
- }
-
- protected final long lvConsumerIndex() {
- return consumerIndex;
- }
-
- protected final boolean casHead(long expect, long newValue) {
- return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue);
- }
-}
-
-abstract class SpmcArrayQueueMidPad extends SpmcArrayQueueConsumerField {
- long p20, p21, p22, p23, p24, p25, p26;
- long p30, p31, p32, p33, p34, p35, p36, p37;
-
- public SpmcArrayQueueMidPad(int capacity) {
- super(capacity);
- }
-}
-
-abstract class SpmcArrayQueueProducerIndexCacheField extends SpmcArrayQueueMidPad {
- // This is separated from the consumerIndex which will be highly contended in the hope that this value spends most
- // of it's time in a cache line that is Shared(and rarely invalidated)
- private volatile long producerIndexCache;
-
- public SpmcArrayQueueProducerIndexCacheField(int capacity) {
- super(capacity);
- }
-
- protected final long lvProducerIndexCache() {
- return producerIndexCache;
- }
-
- protected final void svProducerIndexCache(long v) {
- producerIndexCache = v;
- }
-}
-
-abstract class SpmcArrayQueueL3Pad extends SpmcArrayQueueProducerIndexCacheField {
- long p40, p41, p42, p43, p44, p45, p46;
- long p30, p31, p32, p33, p34, p35, p36, p37;
-
- public SpmcArrayQueueL3Pad(int capacity) {
- super(capacity);
- }
-}
-
-public final class SpmcArrayQueue extends SpmcArrayQueueL3Pad {
-
- public SpmcArrayQueue(final int capacity) {
- super(capacity);
- }
-
- @Override
- public boolean offer(final E e) {
- if (null == e) {
- throw new NullPointerException("Null is not a valid element");
- }
- final E[] lb = buffer;
- final long lMask = mask;
- final long currProducerIndex = lvProducerIndex();
- final long offset = calcElementOffset(currProducerIndex);
- if (null != lvElement(lb, offset)) {
- long size = currProducerIndex - lvConsumerIndex();
-
- if(size > lMask) {
- return false;
- }
- else {
- // spin wait for slot to clear, buggers wait freedom
- while(null != lvElement(lb, offset));
- }
- }
- spElement(lb, offset, e);
- // single producer, so store ordered is valid. It is also required to correctly publish the element
- // and for the consumers to pick up the tail value.
- soTail(currProducerIndex + 1);
- return true;
- }
-
- @Override
- public E poll() {
- long currentConsumerIndex;
- final long currProducerIndexCache = lvProducerIndexCache();
- do {
- currentConsumerIndex = lvConsumerIndex();
- if (currentConsumerIndex >= currProducerIndexCache) {
- long currProducerIndex = lvProducerIndex();
- if (currentConsumerIndex >= currProducerIndex) {
- return null;
- } else {
- svProducerIndexCache(currProducerIndex);
- }
- }
- } while (!casHead(currentConsumerIndex, currentConsumerIndex + 1));
- // consumers are gated on latest visible tail, and so can't see a null value in the queue or overtake
- // and wrap to hit same location.
- final long offset = calcElementOffset(currentConsumerIndex);
- final E[] lb = buffer;
- // load plain, element happens before it's index becomes visible
- final E e = lpElement(lb, offset);
- // store ordered, make sure nulling out is visible. Producer is waiting for this value.
- soElement(lb, offset, null);
- return e;
- }
-
- @Override
- public E peek() {
- long currentConsumerIndex;
- final long currProducerIndexCache = lvProducerIndexCache();
- E e;
- do {
- currentConsumerIndex = lvConsumerIndex();
- if (currentConsumerIndex >= currProducerIndexCache) {
- long currProducerIndex = lvProducerIndex();
- if (currentConsumerIndex >= currProducerIndex) {
- return null;
- } else {
- svProducerIndexCache(currProducerIndex);
- }
- }
- } while (null == (e = lvElement(calcElementOffset(currentConsumerIndex))));
- return e;
- }
-
- @Override
- public int size() {
- /*
- * It is possible for a thread to be interrupted or reschedule between the read of the producer and consumer
- * indices, therefore protection is required to ensure size is within valid range. In the event of concurrent
- * polls/offers to this method the size is OVER estimated as we read consumer index BEFORE the producer index.
- */
- long after = lvConsumerIndex();
- while (true) {
- final long before = after;
- final long currentProducerIndex = lvProducerIndex();
- after = lvConsumerIndex();
- if (before == after) {
- return (int) (currentProducerIndex - after);
- }
- }
- }
-
- @Override
- public boolean isEmpty() {
- // Order matters!
- // Loading consumer before producer allows for producer increments after consumer index is read.
- // This ensures the correctness of this method at least for the consumer thread. Other threads POV is not really
- // something we can fix here.
- return (lvConsumerIndex() == lvProducerIndex());
- }
-}
diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java
--- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java 2015-07-20 19:58:51.000000000 +0200
+++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java 1970-01-01 01:00:00.000000000 +0100
@@ -1,195 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
- * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/SpscArrayQueue.java
- */
-package rx.internal.util.unsafe;
-
-import static rx.internal.util.unsafe.UnsafeAccess.UNSAFE;
-
-abstract class SpscArrayQueueColdField extends ConcurrentCircularArrayQueue {
- private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096);
- protected final int lookAheadStep;
- public SpscArrayQueueColdField(int capacity) {
- super(capacity);
- lookAheadStep = Math.min(capacity/4, MAX_LOOK_AHEAD_STEP);
- }
-}
-abstract class SpscArrayQueueL1Pad extends SpscArrayQueueColdField {
- long p10, p11, p12, p13, p14, p15, p16;
- long p30, p31, p32, p33, p34, p35, p36, p37;
-
- public SpscArrayQueueL1Pad(int capacity) {
- super(capacity);
- }
-}
-
-abstract class SpscArrayQueueProducerFields extends SpscArrayQueueL1Pad {
- protected final static long P_INDEX_OFFSET;
- static {
- try {
- P_INDEX_OFFSET =
- UNSAFE.objectFieldOffset(SpscArrayQueueProducerFields.class.getDeclaredField("producerIndex"));
- } catch (NoSuchFieldException e) {
- throw new RuntimeException(e);
- }
- }
- protected long producerIndex;
- protected long producerLookAhead;
-
- public SpscArrayQueueProducerFields(int capacity) {
- super(capacity);
- }
-}
-
-abstract class SpscArrayQueueL2Pad extends SpscArrayQueueProducerFields {
- long p20, p21, p22, p23, p24, p25, p26;
- long p30, p31, p32, p33, p34, p35, p36, p37;
-
- public SpscArrayQueueL2Pad(int capacity) {
- super(capacity);
- }
-}
-
-abstract class SpscArrayQueueConsumerField extends SpscArrayQueueL2Pad {
- protected long consumerIndex;
- protected final static long C_INDEX_OFFSET;
- static {
- try {
- C_INDEX_OFFSET =
- UNSAFE.objectFieldOffset(SpscArrayQueueConsumerField.class.getDeclaredField("consumerIndex"));
- } catch (NoSuchFieldException e) {
- throw new RuntimeException(e);
- }
- }
- public SpscArrayQueueConsumerField(int capacity) {
- super(capacity);
- }
-}
-
-abstract class SpscArrayQueueL3Pad extends SpscArrayQueueConsumerField {
- long p40, p41, p42, p43, p44, p45, p46;
- long p30, p31, p32, p33, p34, p35, p36, p37;
-
- public SpscArrayQueueL3Pad(int capacity) {
- super(capacity);
- }
-}
-
-/**
- * A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer.
- *
- * This implementation is a mashup of the Fast Flow
- * algorithm with an optimization of the offer method taken from the BQueue algorithm (a variation on Fast
- * Flow), and adjusted to comply with Queue.offer semantics with regards to capacity.
- * For convenience the relevant papers are available in the resources folder:
- * 2010 - Pisa - SPSC Queues on Shared Cache Multi-Core Systems.pdf
- * 2012 - Junchang- BQueue- Efficient and Practical Queuing.pdf
- * This implementation is wait free.
- *
- * @author nitsanw
- *
- * @param
- */
-public final class SpscArrayQueue extends SpscArrayQueueL3Pad {
-
- public SpscArrayQueue(final int capacity) {
- super(capacity);
- }
-
- /**
- * {@inheritDoc}
- *
- * This implementation is correct for single producer thread use only.
- */
- @Override
- public boolean offer(final E e) {
- // local load of field to avoid repeated loads after volatile reads
- final E[] lElementBuffer = buffer;
- final long index = producerIndex;
- final long offset = calcElementOffset(index);
- if (null != lvElement(lElementBuffer, offset)){
- return false;
- }
- soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
- soElement(lElementBuffer, offset, e); // StoreStore
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * This implementation is correct for single consumer thread use only.
- */
- @Override
- public E poll() {
- final long index = consumerIndex;
- final long offset = calcElementOffset(index);
- // local load of field to avoid repeated loads after volatile reads
- final E[] lElementBuffer = buffer;
- final E e = lvElement(lElementBuffer, offset);// LoadLoad
- if (null == e) {
- return null;
- }
- soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size()
- soElement(lElementBuffer, offset, null);// StoreStore
- return e;
- }
-
- /**
- * {@inheritDoc}
- *
- * This implementation is correct for single consumer thread use only.
- */
- @Override
- public E peek() {
- return lvElement(calcElementOffset(consumerIndex));
- }
-
- @Override
- public int size() {
- /*
- * It is possible for a thread to be interrupted or reschedule between the read of the producer and consumer
- * indices, therefore protection is required to ensure size is within valid range. In the event of concurrent
- * polls/offers to this method the size is OVER estimated as we read consumer index BEFORE the producer index.
- */
- long after = lvConsumerIndex();
- while (true) {
- final long before = after;
- final long currentProducerIndex = lvProducerIndex();
- after = lvConsumerIndex();
- if (before == after) {
- return (int) (currentProducerIndex - after);
- }
- }
- }
-
- private void soProducerIndex(long v) {
- UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, v);
- }
-
- private void soConsumerIndex(long v) {
- UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, v);
- }
-
- private long lvProducerIndex() {
- return UNSAFE.getLongVolatile(this, P_INDEX_OFFSET);
- }
-
- private long lvConsumerIndex() {
- return UNSAFE.getLongVolatile(this, C_INDEX_OFFSET);
- }
-}
-
diff -Nru RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/SpscLinkedQueue.java RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/SpscLinkedQueue.java
--- RxJava-1.0.13/src/main/java/rx/internal/util/unsafe/SpscLinkedQueue.java 2015-07-20 19:58:51.000000000 +0200
+++ RxJava-1.0.13.jctools/src/main/java/rx/internal/util/unsafe/SpscLinkedQueue.java 1970-01-01 01:00:00.000000000 +0100
@@ -1,108 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
- * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/SpscLinkedQueue.java
- */
-package rx.internal.util.unsafe;
-
-import rx.internal.util.atomic.LinkedQueueNode;
-
-
-
-/**
- * This is a weakened version of the MPSC algorithm as presented on 1024
- * Cores by D. Vyukov. The original has been adapted to Java and it's quirks with regards to memory model and
- * layout:
- *
- * - Use inheritance to ensure no false sharing occurs between producer/consumer node reference fields.
- *
- As this is an SPSC we have no need for XCHG, an ordered store is enough.
- *
- * The queue is initialized with a stub node which is set to both the producer and consumer node references. From this
- * point follow the notes on offer/poll.
- *
- * @author nitsanw
- *
- * @param
- */
-public final class SpscLinkedQueue extends BaseLinkedQueue {
-
- public SpscLinkedQueue() {
- spProducerNode(new LinkedQueueNode());
- spConsumerNode(producerNode);
- consumerNode.soNext(null); // this ensures correct construction: StoreStore
- }
-
- /**
- * {@inheritDoc}
- *
- * IMPLEMENTATION NOTES:
- * Offer is allowed from a SINGLE thread.
- * Offer allocates a new node (holding the offered value) and:
- *
- * - Sets that node as the producerNode.next
- *
- Sets the new node as the producerNode
- *
- * From this follows that producerNode.next is always null and for all other nodes node.next is not null.
- *
- * @see MessagePassingQueue#offer(Object)
- * @see java.util.Queue#offer(java.lang.Object)
- */
- @Override
- public boolean offer(final E nextValue) {
- if (nextValue == null) {
- throw new IllegalArgumentException("null elements not allowed");
- }
- final LinkedQueueNode nextNode = new LinkedQueueNode(nextValue);
- producerNode.soNext(nextNode);
- producerNode = nextNode;
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * IMPLEMENTATION NOTES:
- * Poll is allowed from a SINGLE thread.
- * Poll reads the next node from the consumerNode and:
- *
- * - If it is null, the queue is empty.
- *
- If it is not null set it as the consumer node and return it's now evacuated value.
- *
- * This means the consumerNode.value is always null, which is also the starting point for the queue. Because null
- * values are not allowed to be offered this is the only node with it's value set to null at any one time.
- *
- */
- @Override
- public E poll() {
- final LinkedQueueNode nextNode = consumerNode.lvNext();
- if (nextNode != null) {
- // we have to null out the value because we are going to hang on to the node
- final E nextValue = nextNode.getAndNullValue();
- consumerNode = nextNode;
- return nextValue;
- }
- return null;
- }
-
- @Override
- public E peek() {
- final LinkedQueueNode nextNode = consumerNode.lvNext();
- if (nextNode != null) {
- return nextNode.lpValue();
- } else {
- return null;
- }
- }
-}
\ Manca newline alla fine del file
diff -Nru RxJava-1.0.13/src/test/java/rx/internal/util/JCToolsQueueTests.java RxJava-1.0.13.jctools/src/test/java/rx/internal/util/JCToolsQueueTests.java
--- RxJava-1.0.13/src/test/java/rx/internal/util/JCToolsQueueTests.java 2015-07-20 19:58:51.000000000 +0200
+++ RxJava-1.0.13.jctools/src/test/java/rx/internal/util/JCToolsQueueTests.java 2015-08-07 16:11:25.265234890 +0200
@@ -17,6 +17,10 @@
import static org.junit.Assert.*;
+import org.jctools.queues.MpmcArrayQueue;
+import org.jctools.queues.SpmcArrayQueue;
+import org.jctools.queues.SpscArrayQueue;
+
import org.junit.Test;
import rx.internal.util.unsafe.*;