java pull-stream

Pull-stream implementation in java.

License

License

GroupId

GroupId

com.zmannotes.stream
ArtifactId

ArtifactId

pull-stream
Last Version

Last Version

2.1.5
Release Date

Release Date

Type

Type

jar
Description

Description

java pull-stream
Pull-stream implementation in java.
Project URL

Project URL

https://github.com/zman2013/pull-stream
Source Code Management

Source Code Management

https://github.com/zman2013/pull-stream

Download pull-stream

How to add to project

<!-- https://jarcasting.com/artifacts/com.zmannotes.stream/pull-stream/ -->
<dependency>
    <groupId>com.zmannotes.stream</groupId>
    <artifactId>pull-stream</artifactId>
    <version>2.1.5</version>
</dependency>
// https://jarcasting.com/artifacts/com.zmannotes.stream/pull-stream/
implementation 'com.zmannotes.stream:pull-stream:2.1.5'
// https://jarcasting.com/artifacts/com.zmannotes.stream/pull-stream/
implementation ("com.zmannotes.stream:pull-stream:2.1.5")
'com.zmannotes.stream:pull-stream:jar:2.1.5'
<dependency org="com.zmannotes.stream" name="pull-stream" rev="2.1.5">
  <artifact name="pull-stream" type="jar" />
</dependency>
@Grapes(
@Grab(group='com.zmannotes.stream', module='pull-stream', version='2.1.5')
)
libraryDependencies += "com.zmannotes.stream" % "pull-stream" % "2.1.5"
[com.zmannotes.stream/pull-stream "2.1.5"]

Dependencies

compile (1)

Group / Artifact Type Version
com.zmannotes.event : event-emitter jar 0.0.4

test (3)

Group / Artifact Type Version
ch.qos.logback : logback-classic jar 1.2.3
junit : junit jar 4.12
org.mockito : mockito-core jar 3.1.0

Project Modules

There are no modules declared in this project.

Travis Build Coverage Status

pull-stream

A java implementation of the pull-stream while resolved the error propagation and the non-blocking back-pressure. https://pull-stream.github.io/

Getting started

Setting up the dependency

  • Gradle
implementation "com.zmannotes.stream:pull-stream:2.1.5"
  • Maven
<dependency>
    <groupId>com.zmannotes.stream</groupId>
    <artifactId>pull-stream</artifactId>
    <version>2.1.5</version>
</dependency>

Hello World

The sink will pull random numbers through the through from the source.

public static void main(String[] args){
    // prepare source
    ISource<Integer> source = new DefaultSource<>(
            () -> new Random().nextInt()
    );

    // mod 1000
    IThrough<Integer> through = new DefaultThrough<>(d->d%1000);

    // output data
    ISink<Integer> sink = new DefaultSink<>(System.out::println);

    // pull: source -> through -> sink
    pull(source, through, sink);
}

Base classes

Pull-stream features several base interfaces you can discover operators on:

Example

support lambda

public class DuplexExample {
    
    private IDuplex duplex;
    
    public DuplexExample(){
        duplex = new DefaultDuplex(this::onData, this::onClosed);
    }
    
    // 消费数据
    private boolean onData(Object data){}
    // 流关闭时回调
    private void onClosed(Throwable throwable){}
    
}

duplex

Holder<Integer> holderA = new Holder<>(0);
Holder<Integer> holderB = new Holder<>(0);

IStreamBuffer<Integer> bufferA = new DefaultStreamBuffer<>();
IDuplex<Integer> a = new DefaultDuplex<>(bufferA, 
        data -> {
            holderA.value = data;
            return false;
        }
    }
});


IStreamBuffer<Integer> bufferB = new DefaultStreamBuffer<>();
IDuplex<Integer> b = new DefaultDuplex<>(bufferB, 
        data ->{
            holderB.value = data;
            return false;
        }
});

Pull.link(a, b);
bufferA.offer(100);
bufferB.offer(-100);

a.close();
b.close();

callback

You can create an ISinkCallback\ISourceCallback\IDuplexCallback to be invoked when sth. occurs on the streams.

IStreamBuffer<Integer> buffer = new DefaultStreamBuffer<>();
ISource<Integer> source = new DefaultSource<>(buffer, t->{});
ISink<Integer> sink = new DefaultSink<>(onNext, onWait, onClosed);

buffer.offer(1);

when(onNext.apply(any())).thenReturn(false);

sink.read(source);

buffer.offer(2);

sink.close(null);

// 验证
verify(onNext, times(1)).apply(1);
verify(onNext, times(1)).apply(2);
verify(onWait, times(2)).run();
verify(onClosed, times(1)).accept(null);

Versions

Version
2.1.5
2.1.3
2.1.2
2.1.1
2.1.0
2.0.0
1.0.0
0.0.13
0.0.12
0.0.11
0.0.9
0.0.8
0.0.7
0.0.6
0.0.5
0.0.4