Lightweight Event Emitter

Provides both thread-safe and non-thread-safe implementations.
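To illustrate what the thread-safe versus non-thread-safe distinction can look like in practice, here is a minimal sketch. It is not the event-emitter library's API: the class `EmitterSketch` and its methods `on`, `emit`, `threadSafe`, and `singleThreaded` are hypothetical names chosen for this example, and the only assumption is that the two variants differ in the collections that hold the listeners.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch; none of these names come from the event-emitter library.
final class EmitterSketch {
    private final boolean threadSafe;
    private final Map<String, List<Consumer<Object>>> listeners;

    private EmitterSketch(boolean threadSafe) {
        this.threadSafe = threadSafe;
        if (threadSafe) {
            this.listeners = new ConcurrentHashMap<>(); // safe for concurrent on/emit
        } else {
            this.listeners = new HashMap<>();           // cheaper, single thread only
        }
    }

    static EmitterSketch threadSafe() { return new EmitterSketch(true); }

    static EmitterSketch singleThreaded() { return new EmitterSketch(false); }

    private List<Consumer<Object>> newListenerList() {
        if (threadSafe) {
            return new CopyOnWriteArrayList<>(); // iteration never sees a half-updated list
        }
        return new ArrayList<>();
    }

    // Register a listener for the named event.
    void on(String event, Consumer<Object> listener) {
        listeners.computeIfAbsent(event, key -> newListenerList()).add(listener);
    }

    // Deliver the payload to every listener of the event; returns how many were called.
    int emit(String event, Object payload) {
        List<Consumer<Object>> targets = listeners.get(event);
        if (targets == null) {
            return 0;
        }
        int delivered = 0;
        for (Consumer<Object> listener : targets) {
            listener.accept(payload);
            delivered++;
        }
        return delivered;
    }
}
```

The thread-safe variant pays for copy-on-write registration so that `emit` can iterate without locking; the single-threaded variant avoids that cost when all calls come from one thread.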

GroupId: com.zmannotes.event
ArtifactId: event-emitter
Last Version: 0.0.5
Type: jar
Project URL: https://github.com/zman2013/event-emitter
Source Code Management: https://github.com/zman2013/pull-stream

How to add to project

  • Maven
<!-- https://jarcasting.com/artifacts/com.zmannotes.event/event-emitter/ -->
<dependency>
    <groupId>com.zmannotes.event</groupId>
    <artifactId>event-emitter</artifactId>
    <version>0.0.5</version>
</dependency>
  • Gradle (Groovy DSL)
// https://jarcasting.com/artifacts/com.zmannotes.event/event-emitter/
implementation 'com.zmannotes.event:event-emitter:0.0.5'
  • Gradle (Kotlin DSL)
// https://jarcasting.com/artifacts/com.zmannotes.event/event-emitter/
implementation ("com.zmannotes.event:event-emitter:0.0.5")
  • Buildr
'com.zmannotes.event:event-emitter:jar:0.0.5'
  • Ivy
<dependency org="com.zmannotes.event" name="event-emitter" rev="0.0.5">
  <artifact name="event-emitter" type="jar" />
</dependency>
  • Grape
@Grapes(
@Grab(group='com.zmannotes.event', module='event-emitter', version='0.0.5')
)
  • SBT
libraryDependencies += "com.zmannotes.event" % "event-emitter" % "0.0.5"
  • Leiningen
[com.zmannotes.event/event-emitter "0.0.5"]

Dependencies

Test scope (3)

Group / Artifact                     Type    Version
ch.qos.logback : logback-classic     jar     1.2.3
junit : junit                        jar     4.12
org.mockito : mockito-core           jar     3.1.0

Project Modules

There are no modules declared in this project.

pull-stream

A Java implementation of pull-stream (https://pull-stream.github.io/) that also addresses error propagation and non-blocking back-pressure.

Getting started

Setting up the dependency

  • Gradle
implementation "com.zmannotes.stream:pull-stream:2.1.5"
  • Maven
<dependency>
    <groupId>com.zmannotes.stream</groupId>
    <artifactId>pull-stream</artifactId>
    <version>2.1.5</version>
</dependency>

Hello World

The sink pulls random numbers from the source by way of the through, which reduces each value modulo 1000 before the sink prints it.

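// requires java.util.Random; pull(...) is assumed here to be a static import from the library
public static void main(String[] args){
    // source: supplies a new random integer each time it is asked for one
    Random random = new Random();
    ISource<Integer> source = new DefaultSource<>(
            () -> random.nextInt()
    );

    // through: reduce each value modulo 1000 (the result can be negative in Java)
    IThrough<Integer> through = new DefaultThrough<>(d->d%1000);

    // sink: print every value it receives
    ISink<Integer> sink = new DefaultSink<>(System.out::println);

    // pull: source -> through -> sink
    pull(source, through, sink);
}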
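The pull model behind this example can be sketched without the library. The code below is an illustration only, not pull-stream's implementation: `PullSketch` and its `pull` method are hypothetical, and the conventions that `null` marks the end of the stream and that the sink returns `true` to request more data are assumptions made for this sketch. It shows the core idea of back-pressure in a pull design: the source produces a value only when the sink asks for one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch of a sink-driven pipeline; not the pull-stream API.
final class PullSketch {

    // The loop is driven from the sink side: one value is requested at a time,
    // and nothing more is read once the sink declines further data.
    static <A, B> int pull(Supplier<A> source, Function<A, B> through, Predicate<B> sink) {
        int pulled = 0;
        boolean wantMore = true;
        while (wantMore) {
            A raw = source.get();
            if (raw == null) {
                break; // assumption of this sketch: null marks the end of the stream
            }
            pulled++;
            wantMore = sink.test(through.apply(raw));
        }
        return pulled;
    }

    public static void main(String[] args) {
        Iterator<Integer> numbers = Arrays.asList(1234, 5678, 9012).iterator();
        List<Integer> out = new ArrayList<>();
        int pulled = PullSketch.<Integer, Integer>pull(
                () -> numbers.hasNext() ? numbers.next() : null, // source
                d -> d % 1000,                                   // through
                v -> { out.add(v); return out.size() < 2; });    // sink stops after two values
        System.out.println(pulled + " values pulled: " + out);
    }
}
```

Because the sink in `main` stops after two values, the third number is never read from the source, which is the behaviour a push-based pipeline would need explicit flow control to achieve.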

Base classes

Pull-stream provides several base interfaces on which operators are defined. The examples in this document use ISource, IThrough, ISink, and IDuplex.

Example

Lambda support

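public class DuplexExample {

    private IDuplex<Object> duplex;

    public DuplexExample(){
        duplex = new DefaultDuplex<>(this::onData, this::onClosed);
    }

    // consume data
    private boolean onData(Object data){
        return false; // placeholder; the duplex example below returns false as well
    }
    // called back when the stream is closed
    private void onClosed(Throwable throwable){}

}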

duplex

Holder<Integer> holderA = new Holder<>(0);
Holder<Integer> holderB = new Holder<>(0);

IStreamBuffer<Integer> bufferA = new DefaultStreamBuffer<>();
IDuplex<Integer> a = new DefaultDuplex<>(bufferA,
        data -> {
            holderA.value = data;
            return false;
        });


IStreamBuffer<Integer> bufferB = new DefaultStreamBuffer<>();
IDuplex<Integer> b = new DefaultDuplex<>(bufferB,
        data -> {
            holderB.value = data;
            return false;
        });

Pull.link(a, b);
bufferA.offer(100);
bufferB.offer(-100);

a.close();
b.close();
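As a rough mental model of linking two duplex endpoints, the following sketch wires two objects so that whatever is offered on one side is handed to the other side's data handler. It is a hypothetical illustration, not the library's code: `DuplexSketch`, `Endpoint`, `offer`, and `link` are invented names, and the delivery direction (values offered on one endpoint arrive at its peer) is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch of two linked duplex endpoints; not the pull-stream API.
final class DuplexSketch {

    static final class Endpoint<T> {
        private final Queue<T> outgoing = new ArrayDeque<>();
        private final Consumer<T> onData;
        private Endpoint<T> peer;

        Endpoint(Consumer<T> onData) {
            this.onData = onData;
        }

        // Queue a value for the peer; it is delivered at once if already linked.
        void offer(T value) {
            outgoing.add(value);
            flush();
        }

        // Hand every queued value to the peer's data handler.
        private void flush() {
            if (peer == null) {
                return; // not linked yet, keep the values buffered
            }
            T next;
            while ((next = outgoing.poll()) != null) {
                peer.onData.accept(next);
            }
        }
    }

    // Connect both directions, then deliver anything buffered before the link existed.
    static <T> void link(Endpoint<T> a, Endpoint<T> b) {
        a.peer = b;
        b.peer = a;
        a.flush();
        b.flush();
    }
}
```

Buffering until the link exists means the order of `offer` and `link` does not matter in this sketch; values offered early are simply delivered once the peer is known.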

callback

You can create an ISinkCallback, ISourceCallback, or IDuplexCallback to be invoked when something happens on a stream.

// onNext, onWait and onClosed are Mockito mocks created elsewhere in the test
IStreamBuffer<Integer> buffer = new DefaultStreamBuffer<>();
ISource<Integer> source = new DefaultSource<>(buffer, t->{});
ISink<Integer> sink = new DefaultSink<>(onNext, onWait, onClosed);

buffer.offer(1);

when(onNext.apply(any())).thenReturn(false);

sink.read(source);

buffer.offer(2);

sink.close(null);

// verify the callback invocations
verify(onNext, times(1)).apply(1);
verify(onNext, times(1)).apply(2);
verify(onWait, times(2)).run();
verify(onClosed, times(1)).accept(null);
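One way to read the verification above is as a sequence of callback events. The sketch below is a hypothetical model that reproduces the same counts (each value seen once, two waits, one close); it is not how the library is implemented. `CallbackSketch` and its methods are invented for illustration, the order of events is an assumption, and `onNext` is simplified to a `Consumer` even though the original callback returns a value.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical model of sink callbacks; not the pull-stream implementation.
final class CallbackSketch {
    private final Queue<Integer> buffer = new ArrayDeque<>();
    private final Consumer<Integer> onNext;
    private final Runnable onWait;
    private final Consumer<Throwable> onClosed;
    private boolean reading;

    CallbackSketch(Consumer<Integer> onNext, Runnable onWait, Consumer<Throwable> onClosed) {
        this.onNext = onNext;
        this.onWait = onWait;
        this.onClosed = onClosed;
    }

    // Buffer a value; if a reader is attached, deliver it straight away.
    void offer(int value) {
        buffer.add(value);
        if (reading) {
            drain();
        }
    }

    // Attach the reader and consume whatever is already buffered.
    void read() {
        reading = true;
        drain();
    }

    // Stop reading and report the cause (null means a normal close in this sketch).
    void close(Throwable cause) {
        reading = false;
        onClosed.accept(cause);
    }

    // Deliver all buffered values, then signal that the reader is waiting for more.
    private void drain() {
        Integer next;
        while ((next = buffer.poll()) != null) {
            onNext.accept(next);
        }
        onWait.run();
    }

    // Replays the same steps as the test above and records each callback.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        CallbackSketch sink = new CallbackSketch(
                v -> log.add("onNext(" + v + ")"),
                () -> log.add("onWait"),
                t -> log.add("onClosed(" + t + ")"));
        sink.offer(1);
        sink.read();
        sink.offer(2);
        sink.close(null);
        return log;
    }
}
```

In this model `onWait` fires every time the buffer runs dry, which is why two offers separated by a read produce two waits.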

Versions

Version
0.0.5
0.0.4