scramjet's Introduction

Scramjet - Transform as a Service

Welcome to Scramjet's GitHub page!

We are a deep-tech company on a mission to improve developers' lives with technology that lets them write fewer lines of code and handle data more effectively. We want to let developers connect programs, not environments.

Scramjet Cloud Platform

Scramjet Cloud Platform brings clouds and local infrastructures together with simplicity at its core. It is a distributed data processing platform that lets you deploy and run your programs in a serverless manner. Customers can run programs written in multiple languages simultaneously and interconnect multiple environments in ubiquitous data spaces, where programs can share data and expose it through a single, globally available API.

Features:

  • write and deploy simple long-running data processing programs called Sequences,
  • use the platform console panel to interact with your Sequences,
  • invoke Sequences programmatically via CLI or API or a set of client libraries,
  • monitor and control running Sequences via one central, publicly available, secure API,
  • send and receive the data produced and required by running Sequences,
  • share data between multiple Sequences by enclosing them in virtual Data Spaces,
  • connect self-hosted servers to existing Data Spaces with minimum configuration,
  • create execution environments at multiple cloud providers with the click of a button,
  • run with a low memory footprint: over 20 times lower memory usage than the Kafka/Flink stack.
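Here is a minimal TypeScript sketch of a long-running data processing program, to make the idea of a Sequence more concrete. It simplifies by assuming that a Sequence is a function that receives its input as an async iterable of text lines and yields output chunks. The names `sequence`, `fromArray` and `runLocally` are hypothetical. Check the Scramjet documentation for the entry point the platform actually expects.

```typescript
// Hypothetical shape of a Sequence: consume an input stream, yield an output stream.
// This signature is an assumption for illustration, not the platform's documented API.
async function* sequence(input: AsyncIterable<string>): AsyncGenerator<string> {
  for await (const line of input) {
    const trimmed = line.trim();
    if (trimmed === "") continue; // skip empty lines
    yield `${trimmed.toUpperCase()}\n`; // one output chunk per non-empty input line
  }
}

// Local stand-in for the platform: turns an array into an async stream of lines.
async function* fromArray(items: string[]): AsyncGenerator<string> {
  for (const item of items) yield item;
}

// Runs the Sequence over a few lines and collects everything it produces.
async function runLocally(lines: string[]): Promise<string[]> {
  const output: string[] = [];
  for await (const chunk of sequence(fromArray(lines))) output.push(chunk);
  return output;
}

runLocally(["hello", "", " world "]).then((output) => console.log(output.join("")));
```

The generator only finishes when its input ends. As a result, the same function keeps running for as long as data keeps arriving, which is what suits this shape to long-running processing.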

Use cases:

Scramjet Cloud Platform can be handy in any use case that involves streaming data from one source to another. It can handle many types of input: basically anything a Node.js application can handle, such as text, JSON, XML, SOAP, audio, video and more. Inside a Sequence, your code can perform any operation on the data. Scramjet Cloud Platform exposes a REST API for providing and receiving data and for managing Sequences, Instances and the platform itself. Finally, engine output can be handled in several ways. You can write it to a local or remote file, send it to STDOUT and STDERR, consume it through the REST API, send it to a URL or webhook, stream it to a destination of your choosing, or use any mix of these.
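A small fan-out helper can illustrate sending one program's results to several destinations at once (a mix of outputs). This is a dependency-free TypeScript sketch, not the platform's output mechanism. `Reading`, `Sink`, `fanOut` and `parseReadings` are hypothetical names. The two sinks, an in-memory array and the console, stand in for real destinations such as a file, STDOUT or a webhook.

```typescript
type Reading = { id: string; value: number };

// A sink is any consumer of output chunks (hypothetical type for this sketch).
type Sink<T> = (chunk: T) => Promise<void> | void;

// Sends every chunk from the source to all sinks. The next chunk is only pulled after
// every sink has finished with the current one, so a slow sink also slows the source.
async function fanOut<T>(source: AsyncIterable<T>, sinks: Sink<T>[]): Promise<number> {
  let delivered = 0;
  for await (const chunk of source) {
    await Promise.all(sinks.map((sink) => sink(chunk)));
    delivered++;
  }
  return delivered;
}

// Example input: JSON lines, keeping only readings above a threshold.
async function* parseReadings(lines: string[]): AsyncGenerator<Reading> {
  for (const line of lines) {
    const reading = JSON.parse(line) as Reading;
    if (reading.value > 10) yield reading;
  }
}

const inMemory: Reading[] = [];
const toMemory: Sink<Reading> = (r) => {
  inMemory.push(r);
};
const toStdout: Sink<Reading> = (r) => console.log(JSON.stringify(r));

fanOut(parseReadings(['{"id":"a","value":5}', '{"id":"b","value":42}']), [toMemory, toStdout])
  .then((n) => console.log(`delivered ${n} reading(s) to 2 sinks`));
```

Waiting on all sinks before pulling the next chunk is a deliberate choice in this sketch. It keeps memory bounded, at the cost of letting the slowest destination set the pace.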

Samples:

We have prepared several ready-to-use samples in Python, JavaScript and TypeScript. You can find them all in our documentation: https://docs.scramjet.org/platform/samples.

Register to get a 30-day trial.

Scramjet Transform Hub

Scramjet Transform Hub is the open-source core of the Scramjet Cloud Platform. It lets customers run the execution part of the platform in any environment and on any system, whether on-premises or with any cloud or infrastructure provider. Customers can install STH (Scramjet Transform Hub) on any POSIX-compatible system and access their data from any location through private APIs. Alternatively, they can deploy a whole solution deep behind firewalls, making use of the computing resources they already have.

The main repository for Scramjet Transform Hub is scramjetorg/transform-hub.

You can also find Scramjet Transform Hub packages on NPM: npmjs.com/package/@scramjet/sth and npmjs.com/package/@scramjet/cli.

Scramjet Framework

Scramjet Framework is a fast, simple, functional reactive stream programming framework written on top of Node.js object streams. Code is written by chaining functions that transform the streamed data, including the well-known map, filter and reduce. The framework is fully compatible with ES2017 async/await. Thanks to some built-in optimizations, Scramjet is much faster and simpler than similar frameworks when using asynchronous operations.
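The chaining style described above can be sketched without any dependencies. The `Chain` class below is a hypothetical, simplified stand-in written for illustration. It is not Scramjet's implementation and has none of its optimizations. It only shows how `map` and `filter` can accept both plain and `async` callbacks over an async iterable, with `reduce` as the terminal step.

```typescript
type MaybePromise<T> = T | Promise<T>;

// Minimal chainable wrapper over an async iterable (illustration only, not Scramjet code).
class Chain<T> {
  private readonly source: AsyncIterable<T>;

  constructor(source: AsyncIterable<T>) {
    this.source = source;
  }

  static from<T>(items: Iterable<T>) {
    return new Chain(
      (async function* () {
        for (const item of items) yield item;
      })(),
    );
  }

  // Each transform wraps the previous stream in a new Chain, so calls can be chained.
  map<U>(fn: (chunk: T) => MaybePromise<U>) {
    const source = this.source;
    return new Chain(
      (async function* () {
        for await (const chunk of source) yield await fn(chunk); // plain or async callback
      })(),
    );
  }

  filter(fn: (chunk: T) => MaybePromise<boolean>) {
    const source = this.source;
    return new Chain(
      (async function* () {
        for await (const chunk of source) if (await fn(chunk)) yield chunk;
      })(),
    );
  }

  // Terminal operation: consumes the stream and resolves with the accumulated value.
  async reduce<A>(fn: (acc: A, chunk: T) => A, initial: A): Promise<A> {
    let acc = initial;
    for await (const chunk of this.source) acc = fn(acc, chunk);
    return acc;
  }
}

Chain.from([1, 2, 3, 4, 5])
  .map(async (n) => n * 10) // async callback
  .filter((n) => n > 20) // plain callback
  .reduce((sum, n) => sum + n, 0)
  .then((sum) => console.log(sum)); // prints 120
```

In Scramjet itself, an equivalent pipeline would start from `DataStream.from(...)`, which appears in the typings quoted in the issue below, and chain the framework's own `map`, `filter` and `reduce`. The framework documentation gives their exact signatures.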

The main repository for the Scramjet framework is scramjetorg/framework-v4.

You can also find the Scramjet Framework package on NPM: npmjs.com/package/scramjet.

More about us

scramjet's People

Contributors

luccam-scr, michalcz, s4adam, stokowski

scramjet's Issues

TypeScript 3.8 definition file error
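The compiler output for this issue follows. Most of its entries fall into a few recurring groups, and each group has a standard fix in TypeScript syntax. The sketch below illustrates those fixes on small hypothetical stand-in declarations (`AsyncFunction`, `MapCallback`, `Pullable`, `StreamsByKey`, `whenWrote`, `exec`, `toArray`). It is not the patch applied to scramjet's `scramjet.d.ts`. Two further groups are left out to keep the example self-contained: the `Readable`, `Writable` and `Transform` names (TS2304) and the `stream` namespace (TS2503). Both would have to be imported from Node's `stream` module, which requires `@types/node`.

```typescript
// Hypothetical stand-in declarations; names echo the log, but this is not scramjet's code.

// TS2304 "Cannot find name 'AsyncFunction'": TypeScript has no global type with that
// name, so a declaration file has to define its own alias.
type AsyncFunction = (...args: any[]) => Promise<any>;

// TS2314 "Generic type 'Promise<T>' requires 1 type argument(s)" (likewise Iterable<T>):
// always supply the type argument.
type MapCallback = (chunk: any) => Promise<any> | any;
type Pullable<T> = T[] | Iterable<T> | AsyncFunction;

// TS2315 "Type 'Object' is not generic": a keyed collection is written Record<K, V>.
type StreamsByKey = Record<string, number[]>;

// TS1047 "A rest parameter cannot be optional": drop the "?"; a rest array may be empty.
function whenWrote(_chunk: unknown, ...more: unknown[]): number {
  return 1 + more.length; // number of chunks passed in
}

// TS1016 "A required parameter cannot follow an optional parameter":
// required parameters come first, optional ones last.
function exec(command: string, args: string[], options?: { dryRun?: boolean }): string {
  return (options?.dryRun ? "# " : "") + [command, ...args].join(" ");
}

// TS8020 "JSDoc types can only be used inside documentation comments":
// JSDoc's Array.<any> is spelled any[] (or Array<any>) in TypeScript.
async function toArray<T>(source: Pullable<T>, map: MapCallback): Promise<any[]> {
  // Assumption for this sketch: a function source resolves to an iterable of chunks.
  const items: Iterable<any> = typeof source === "function" ? await source() : source;
  return Promise.all(Array.from(items, (chunk) => map(chunk)));
}

const byParity: StreamsByKey = { odd: [1, 3], even: [2] };
toArray([1, 2, 3], async (n: number) => n * 10).then((values) => {
  console.log(values, Object.keys(byParity), whenWrote("x", 1, 2), exec("ls", ["-l"]));
});
```

Until the typings are corrected, TypeScript's `skipLibCheck` compiler option works as a stopgap. It skips type checking of all declaration files, which hides these errors but also hides genuine problems in other `.d.ts` files.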


node_modules/scramjet/.d.ts/scramjet.d.ts:38:96 - error TS2304: Cannot find name 'AsyncFunction'.

38     function from<T>(input: any[] | Iterable<T> | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, options?: DataStreamOptions | Writable, ...args: any): DataStream;
                                                                                                  ~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:38:132 - error TS2304: Cannot find name 'Readable'.

38     function from<T>(input: any[] | Iterable<T> | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, options?: DataStreamOptions | Writable, ...args: any): DataStream;
                                                                                                                                      ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:38:172 - error TS2304: Cannot find name 'Writable'.

38     function from<T>(input: any[] | Iterable<T> | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, options?: DataStreamOptions | Writable, ...args: any): DataStream;
                                                                                                                                                                              ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:45:48 - error TS2304: Cannot find name 'ScramjetOptions'.

45     function fromArray(array: any[], options?: ScramjetOptions): DataStream;
                                                  ~~~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:61:49 - error TS2314: Generic type 'Iterable<T>' requires 1 type argument(s).

61     function createReadModule(anything: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, options?: CreateModuleOptions, ...initialArgs: any): Function;     
                                                   ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:61:105 - error TS2304: Cannot find name 'AsyncFunction'.

61     function createReadModule(anything: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, options?: CreateModuleOptions, ...initialArgs: any): Function;     
                                                                                                           ~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:61:141 - error TS2304: Cannot find name 'Readable'.

61     function createReadModule(anything: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, options?: CreateModuleOptions, ...initialArgs: any): Function;     
                                                                                                                                               ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:190:36 - error TS2314: Generic type 'Iterable<T>' requires 1 type argument(s).

190         static from(input: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, options?: DataStreamOptions | Writable, ...args: any): DataStream;
                                       ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:190:92 - error TS2304: Cannot find name 'AsyncFunction'.

190         static from(input: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, options?: DataStreamOptions | Writable, ...args: any): DataStream;
                                                                                               ~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:190:128 - error TS2304: Cannot find name 'Readable'.

190         static from(input: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, options?: DataStreamOptions | Writable, ...args: any): DataStream;
                                                                                                                                   ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:190:168 - error TS2304: Cannot find name 'Writable'.

190         static from(input: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, options?: DataStreamOptions | Writable, ...args: any): DataStream;
                                                                                                                                                                           ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:318:64 - error TS2315: Type 'UseCallback' is not generic.

318         use(func: AsyncGeneratorFunction | GeneratorFunction | UseCallback<this> | String | Readable, ...parameters: any): this;
                                                                   ~~~~~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:318:93 - error TS2304: Cannot find name 'Readable'.

318         use(func: AsyncGeneratorFunction | GeneratorFunction | UseCallback<this> | String | Readable, ...parameters: any): this;
                                                                                                ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:348:43 - error TS2314: Generic type 'Iterable<T>' requires 1 type argument(s).

348         static pipeline(readable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, ...transforms: AsyncFunction | Function | Transform): DataStream;       
                                              ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:348:99 - error TS2304: Cannot find name 'AsyncFunction'.

348         static pipeline(readable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, ...transforms: AsyncFunction | Function | Transform): DataStream;       
                                                                                                      ~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:348:135 - error TS2304: Cannot find name 'Readable'.

348         static pipeline(readable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, ...transforms: AsyncFunction | Function | Transform): DataStream;       
                                                                                                                                          ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:348:160 - error TS2304: Cannot find name 'AsyncFunction'.

348         static pipeline(readable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, ...transforms: AsyncFunction | Function | Transform): DataStream;       
                                                                                                                                                                   ~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:348:187 - error TS2304: Cannot find name 'Transform'.

348         static pipeline(readable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, ...transforms: AsyncFunction | Function | Transform): DataStream;       
                                                                                                                                                                                              ~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:365:38 - error TS1047: A rest parameter cannot be optional.

365         whenWrote(chunk: any, ...more?: any): Promise<any>;
                                         ~
node_modules/scramjet/.d.ts/scramjet.d.ts:400:33 - error TS2304: Cannot find name 'Writable'.

400         tee(func: TeeCallback | Writable): this;
                                    ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:458:18 - error TS2304: Cannot find name 'Writable'.

458         pipe(to: Writable, options?: WritableOptions): Writable;
                     ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:458:38 - error TS2304: Cannot find name 'WritableOptions'.

458         pipe(to: Writable, options?: WritableOptions): Writable;
                                         ~~~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:458:56 - error TS2304: Cannot find name 'Writable'.

458         pipe(to: Writable, options?: WritableOptions): Writable;
                                                           ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:491:39 - error TS2707: Generic type 'Iterator<T, TReturn, TNext>' requires between 1 and 3 type arguments.

491         static fromIterator(iterator: Iterator, options?: DataStreamOptions): DataStream;
                                          ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:500:48 - error TS8020: JSDoc types can only be used inside documentation comments.

500         toArray(initial?: any[]): Promise<Array.<any>>;
                                                   ~
node_modules/scramjet/.d.ts/scramjet.d.ts:505:33 - error TS8020: JSDoc types can only be used inside documentation comments.

505         toGenerator(): Generator.<Promise.<any>>;
                                    ~
node_modules/scramjet/.d.ts/scramjet.d.ts:505:42 - error TS8020: JSDoc types can only be used inside documentation comments.

505         toGenerator(): Generator.<Promise.<any>>;
                                             ~
node_modules/scramjet/.d.ts/scramjet.d.ts:517:32 - error TS2314: Generic type 'Iterable<T>' requires 1 type argument(s).

517         pull(pullable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, ...args: any): Promise<Promise>;
                                   ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:517:88 - error TS2304: Cannot find name 'AsyncFunction'.

517         pull(pullable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, ...args: any): Promise<Promise>;
                                                                                           ~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:517:124 - error TS2304: Cannot find name 'Readable'.

517         pull(pullable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, ...args: any): Promise<Promise>;
                                                                                                                               ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:517:157 - error TS2314: Generic type 'Promise<T>' requires 1 type argument(s).

517         pull(pullable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, ...args: any): Promise<Promise>;
                                                                                                                                                                ~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:588:66 - error TS2314: Generic type 'Promise<T>' requires 1 type argument(s).

588         accumulate(func: AccumulateCallback, into: any): Promise<Promise>;
                                                                     ~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:613:53 - error TS2304: Cannot find name 'EventEmitter'.

613         reduceNow(func: ReduceCallback, into: any | EventEmitter): any;
                                                        ~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:646:28 - error TS2503: Cannot find namespace 'stream'.

646         concat(...streams: stream.Readable): this;
                               ~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:690:58 - error TS1016: A required parameter cannot follow an optional parameter.

690         distribute(affinity?: AffinityCallback | Number, clusterFunc: ClusterCallback, options?: DataStreamOptions): this;
                                                             ~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:690:71 - error TS2304: Cannot find name 'ClusterCallback'.

690         distribute(affinity?: AffinityCallback | Number, clusterFunc: ClusterCallback, options?: DataStreamOptions): this;
                                                                          ~~~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:697:31 - error TS2315: Type 'Object' is not generic.

697         separateInto(streams: Object<DataStream>, affinity: AffinityCallback): this;
                                  ~~~~~~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:716:32 - error TS2304: Cannot find name 'DelegateCallback'.

716         delegate(delegateFunc: DelegateCallback, worker: WorkerStream, plugins?: any[]): this;
                                   ~~~~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:716:58 - error TS2304: Cannot find name 'WorkerStream'.

716         delegate(delegateFunc: DelegateCallback, worker: WorkerStream, plugins?: any[]): this;
                                                             ~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:760:33 - error TS2314: Generic type 'Iterable<T>' requires 1 type argument(s).

760         toJSONArray(enclosure?: Iterable): StringStream;
                                    ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:767:63 - error TS2314: Generic type 'Iterable<T>' requires 1 type argument(s).

767         toJSONObject(entryCallback?: MapCallback, enclosure?: Iterable): StringStream;
                                                                  ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:793:58 - error TS1016: A required parameter cannot follow an optional parameter.

793         exec(command: String, options?: ExecDataOptions, args: String): void;
                                                             ~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:842:38 - error TS2314: Generic type 'Promise<T>' requires 1 type argument(s).

842     type MapCallback = (chunk: any)=>Promise | any;
                                         ~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:857:62 - error TS2314: Generic type 'Promise<T>' requires 1 type argument(s).

857     type ReduceCallback = (accumulator: any, chunk: Object)=>Promise | any;
                                                                 ~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:982:9 - error TS2416: Property 'shift' in type 'StringStream' is not assignable to the same property in base type 'DataStream'.
  Type '(bytes: Number, func: ShiftStringCallback) => this' is not assignable to type '(count: Number, func: ShiftCallback) => this'.
    Types of parameters 'func' and 'func' are incompatible.
      Types of parameters 'shifted' and 'shifted' are incompatible.
        Type 'String' is missing the following properties from type 'Object[]': pop, push, join, reverse, and 19 more.

982         shift(bytes: Number, func: ShiftStringCallback): this;
            ~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1040:43 - error TS2314: Generic type 'Iterable<T>' requires 1 type argument(s).

1040         static pipeline(readable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, transforms: AsyncFunction | Function | Transform): StringStream;       
                                               ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1040:99 - error TS2304: Cannot find name 'AsyncFunction'.

1040         static pipeline(readable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, transforms: AsyncFunction | Function | Transform): StringStream;       
                                                                                                       ~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1040:135 - error TS2304: Cannot find name 'Readable'.

1040         static pipeline(readable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, transforms: AsyncFunction | Function | Transform): StringStream;       
                                                                                                                                           ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1040:157 - error TS2304: Cannot find name 'AsyncFunction'.

1040         static pipeline(readable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, transforms: AsyncFunction | Function | Transform): StringStream;       
                                                                                                                                                                 ~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1040:184 - error TS2304: Cannot find name 'Transform'.

1040         static pipeline(readable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, transforms: AsyncFunction | Function | Transform): StringStream;       
                                                                                                                                                                                            ~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1049:46 - error TS2314: Generic type 'Iterable<T>' requires 1 type argument(s).

1049         static from(source: String | any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | Readable, options?: DataStreamOptions | Writable): StringStream;
                                                  ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1049:102 - error TS2304: Cannot find name 'AsyncFunction'.

1049         static from(source: String | any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | Readable, options?: DataStreamOptions | Writable): StringStream;
                                                                                                          ~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1049:129 - error TS2304: Cannot find name 'Readable'.

1049         static from(source: String | any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | Readable, options?: DataStreamOptions | Writable): StringStream;
                                                                                                                                     ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1049:169 - error TS2304: Cannot find name 'Writable'.

1049         static from(source: String | any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | Readable, options?: DataStreamOptions | Writable): StringStream;
                                                                                                                                                                             ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1095:9 - error TS2416: Property 'exec' in type 'StringStream' is not assignable to the same property in base type 'DataStream'.
  Type '(command: String, options?: ExecOptions, ...args: String) => void' is not assignable to type '(command: String, options?: ExecDataOptions, args: String) => void'.
    Types of parameters 'options' and 'options' are incompatible.
      Type 'ExecDataOptions' has no properties in common with type 'ExecOptions'.

1095         exec(command: String, options?: ExecOptions, ...args: String): void;
             ~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1095:54 - error TS2370: A rest parameter must be of an array type.

1095         exec(command: String, options?: ExecOptions, ...args: String): void;
                                                          ~~~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1109:10 - error TS2300: Duplicate identifier 'ParseCallback'.

1109     type ParseCallback = (chunk: String)=>Promise;
              ~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1109:43 - error TS2314: Generic type 'Promise<T>' requires 1 type argument(s).

1109     type ParseCallback = (chunk: String)=>Promise;
                                               ~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1115:10 - error TS2300: Duplicate identifier 'ParseCallback'.

1115     type ParseCallback = (chunk: Buffer)=>Promise;
              ~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1115:43 - error TS2314: Generic type 'Promise<T>' requires 1 type argument(s).

1115     type ParseCallback = (chunk: Buffer)=>Promise;
                                               ~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1151:9 - error TS2416: Property 'shift' in type 'BufferStream' is not assignable to the same property in base type 'DataStream'.
  Type '(chars: Number, func: ShiftBufferCallback) => BufferStream' is not assignable to type '(count: Number, func: ShiftCallback) => this'.
    Types of parameters 'func' and 'func' are incompatible.
      Types of parameters 'shifted' and 'shifted' are incompatible.
        Type 'Buffer' is missing the following properties from type 'Object[]': pop, push, concat, shift, and 3 more.

1151         shift(chars: Number, func: ShiftBufferCallback): BufferStream;
             ~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1174:9 - error TS2416: Property 'stringify' in type 'BufferStream' is not assignable to the same property in base type 'DataStream'.
  Type '(encoding?: String) => StringStream' is not assignable to type '(serializer?: MapCallback) => StringStream'.
    Types of parameters 'encoding' and 'serializer' are incompatible.
      Type 'MapCallback' is missing the following properties from type 'String': charAt, charCodeAt, concat, indexOf, and 38 more.

1174         stringify(encoding?: String): StringStream;
             ~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1192:43 - error TS2314: Generic type 'Iterable<T>' requires 1 type argument(s).

1192         static pipeline(readable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, ...transforms: AsyncFunction | Function | Transform): BufferStream;    
                                               ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1192:99 - error TS2304: Cannot find name 'AsyncFunction'.

1192         static pipeline(readable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, ...transforms: AsyncFunction | Function | Transform): BufferStream;    
                                                                                                       ~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1192:135 - error TS2304: Cannot find name 'Readable'.

1192         static pipeline(readable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, ...transforms: AsyncFunction | Function | Transform): BufferStream;    
                                                                                                                                           ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1192:160 - error TS2304: Cannot find name 'AsyncFunction'.

1192         static pipeline(readable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, ...transforms: AsyncFunction | Function | Transform): BufferStream;    
                                                                                                                                                                    ~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1192:187 - error TS2304: Cannot find name 'Transform'.

1192         static pipeline(readable: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, ...transforms: AsyncFunction | Function | Transform): BufferStream;    
                                                                                                                                                                                               ~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1200:37 - error TS2314: Generic type 'Iterable<T>' requires 1 type argument(s).

1200         static from(stream: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | Readable, options?: DataStreamOptions | Writable): BufferStream;
                                         ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1200:93 - error TS2304: Cannot find name 'AsyncFunction'.

1200         static from(stream: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | Readable, options?: DataStreamOptions | Writable): BufferStream;
                                                                                                 ~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1200:120 - error TS2304: Cannot find name 'Readable'.

1200         static from(stream: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | Readable, options?: DataStreamOptions | Writable): BufferStream;
                                                                                                                            ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1200:160 - error TS2304: Cannot find name 'Writable'.

1200         static from(stream: any[] | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | Readable, options?: DataStreamOptions | Writable): BufferStream;
                                                                                                                                                                    ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1232:53 - error TS2304: Cannot find name 'Readable'.

1232         constructor(streams: any[] | AsyncGenerator<Readable> | Generator<Readable>, options?: Object);
                                                         ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1232:75 - error TS2304: Cannot find name 'Readable'.

1232         constructor(streams: any[] | AsyncGenerator<Readable> | Generator<Readable>, options?: Object);
                                                                               ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1274:23 - error TS2304: Cannot find name 'Arguments'.

1274         find(...args: Arguments): DataStream;
                           ~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1282:22 - error TS2304: Cannot find name 'TransformFunction'.

1282         filter(func: TransformFunction): MultiStream;
                          ~~~~~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1295:26 - error TS2304: Cannot find name 'ComparatorFunction'.

1295         mux(comparator?: ComparatorFunction, ClassType?: Function): DataStream;
                              ~~~~~~~~~~~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1304:21 - error TS2503: Cannot find namespace 'stream'.

1304         add(stream: stream.Readable): void;
                         ~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1313:24 - error TS2503: Cannot find namespace 'stream'.

1313         remove(stream: stream.Readable): void;
                            ~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1357:63 - error TS2314: Generic type 'Promise<T>' requires 1 type argument(s).

1357     type AccumulateCallback = (accumulator: any, chunk: any)=>Promise | any;
                                                                   ~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1363:42 - error TS2314: Generic type 'Promise<T>' requires 1 type argument(s).

1363     type ConsumeCallback = (chunk: any)=>Promise | any;
                                              ~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1371:56 - error TS2314: Generic type 'Promise<T>' requires 1 type argument(s).

1371     type RemapCallback = (emit: Function, chunk: any)=>Promise | any;
                                                            ~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1378:50 - error TS2314: Generic type 'Iterable<T>' requires 1 type argument(s).

1378     type FlatMapCallback = (chunk: any)=>Promise<Iterable> | Iterable;
                                                      ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1378:62 - error TS2314: Generic type 'Iterable<T>' requires 1 type argument(s).

1378     type FlatMapCallback = (chunk: any)=>Promise<Iterable> | Iterable;
                                                                  ~~~~~~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1486:9 - error TS2416: Property 'sum' in type 'WindowStream' is not assignable to the same property in base type 'NumberStream'.
  Type '(valueOf?: ValueOfCallback) => NumberStream' is not assignable to type '() => Promise<Number>'.
    Type 'NumberStream' is missing the following properties from type 'Promise<Number>': then, [Symbol.toStringTag], finally

1486         sum(valueOf?: ValueOfCallback): NumberStream;
             ~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1492:9 - error TS2416: Property 'avg' in type 'WindowStream' is not assignable to the same property in base type 'NumberStream'.
  Type '(valueOf?: ValueOfCallback) => NumberStream' is not assignable to type '() => Promise<Number>'.
    Type 'NumberStream' is not assignable to type 'Promise<Number>'.

1492         avg(valueOf?: ValueOfCallback): NumberStream;
             ~~~
node_modules/scramjet/.d.ts/scramjet.d.ts:1519:24 - error TS2304: Cannot find name 'StreamWorker'.

1519         StreamWorker?: StreamWorker;

Add a `pipeline` method to pass errors on non-scramjet streams.

As raised in openaq-fetch#561, we're missing out on a part of the streams implementation in Node v10: pipeline.

The idea here is to allow a number of operations before the scramjet transforms kick in, especially passing the data through a number of Transform streams.

The common use case is fetch |> JSONStream or request |> JSONStream.

Off the top of my head, it should be something like:

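/**
 * Creates a pipeline of streams and returns a scramjet stream.
 *
 * @param {Array|Iterable|AsyncGeneratorFunction|GeneratorFunction|AsyncFunction|Function|String|Readable} readable
 * @param {...(AsyncGeneratorFunction|GeneratorFunction|AsyncFunction|Function|Transform)} transforms
 */
static pipeline(readable, ...transforms) {
    const out = new this();

    (async () => {
        // for now this assumes `readable` and every transform are Node streams
        let current = readable;
        current.on("error", e => out.raise(e));

        for (const trans of transforms) {
            // TODO: handle non-transforms, generators etc.
            current = current.pipe(trans);
            // forward errors from every stage to the resulting scramjet stream
            current.on("error", e => out.raise(e));
        }

        // the last stream in the chain feeds the scramjet stream
        current.pipe(out);
    })().catch(e => out.raise(e));

    return out;
}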
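For comparison, Node's built-in stream.pipeline (available since Node 10) already forwards errors between plain Node streams. The sketch below uses a hypothetical helper, pipelineTo, which is not part of scramjet: it chains a source through Transform streams with stream.pipeline and returns a PassThrough that fails with the first error from any stage. The upperCase stage is likewise made up for illustration. A scramjet version would presumably wrap the returned stream instead; that part is not shown.

```javascript
const { pipeline, PassThrough, Transform, Readable } = require("stream");

// Hypothetical helper (not scramjet's API): pipes `source` through every
// transform with Node's stream.pipeline and returns the final PassThrough.
function pipelineTo(source, ...transforms) {
  const out = new PassThrough({ objectMode: true });
  pipeline(source, ...transforms, out, (err) => {
    // pipeline destroys every stream on failure; destroying `out` with the
    // error as well makes sure its consumer sees it (a no-op on recent Node).
    if (err) out.destroy(err);
  });
  return out;
}

// Hypothetical example stage: upper-cases chunks, fails on the chunk "boom".
function upperCase() {
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      if (chunk === "boom") return callback(new Error("stage failed"));
      callback(null, String(chunk).toUpperCase());
    },
  });
}

// Usage: consume the result like any readable stream.
pipelineTo(Readable.from(["a", "b"]), upperCase())
  .on("data", (chunk) => console.log(chunk))
  .on("error", (err) => console.error(err.message));
```

Because the error surfaces on the returned stream, a consumer only needs one error handler instead of one per stage.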

Until and while may cause an error in the unpiped stream.

Consider this more a use case than a problem (as it is outside of scramjet).

Let's consider this code:

const { StringStream } = require("scramjet");
const request = require("request");

StringStream.from(request(someUrl))
    .lines()
    .parse(JSON.parse)
    .until(x => x.date < Date.now())

What we do is unpipe the response, but request does not close it, so how do we handle this?

Mention: openaq-fetch#547
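One way around a dangling source, shown here only as a sketch outside scramjet, is to consume the stream with for await instead of unpiping it: Node documents that leaving a for await loop over a Readable early (via break, return or throw) destroys the stream. The helper name takeUntil is hypothetical and only mirrors the "read until the predicate is truthy" idea of until. Whether destroying the source also aborts the underlying request connection depends on that source and is not verified here.

```javascript
const { Readable } = require("stream");

// Hypothetical helper (not scramjet's API): collects items until `predicate`
// returns a truthy value. Breaking out of `for await` calls the iterator's
// return(), which destroys the Readable instead of leaving it open.
async function takeUntil(source, predicate) {
  const taken = [];
  for await (const item of source) {
    if (predicate(item)) break; // stop condition reached; source is destroyed
    taken.push(item);
  }
  return taken;
}

// Usage: stop at the first value >= 3, then check the source state.
const numbers = Readable.from([1, 2, 3, 4, 5]);
takeUntil(numbers, (n) => n >= 3).then((result) => {
  console.log(result, numbers.destroyed);
});
```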

CSVParse() and batch() seem not to work anymore

Hi,
first of all, thanks for this great library. It has worked absolutely great up to now!

We use the following code:

const { StringStream } = require("scramjet");

fileStream.pipe(new StringStream('utf-8'))
      .CSVParse({
        skipEmptyLines: true,
        header: true,
      })
      .batch(500)
      .map((items) => {
        console.log(items);
      });

It seems batch and CSVParse no longer work together. If we remove batch, the console.log is called. If we keep it, no error occurs, but the console.log statement within the map is never executed. Any ideas? I checked the latest commits but couldn't find anything suspicious.

Thanks in advance
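To narrow down whether batching or parsing is at fault, it can help to compare against a plain batching step. The async generator below is a minimal sketch of what a batch(count) stage is expected to do: group items into arrays of count items and emit a final, shorter array when the source ends. It is a hypothetical stand-in written for illustration, not scramjet's implementation.

```javascript
// Hypothetical stand-in for a batch(count) stage (not scramjet's code):
// groups items from any (async) iterable into arrays of `count` items.
async function* batch(source, count) {
  let current = [];
  for await (const item of source) {
    current.push(item);
    if (current.length === count) {
      yield current;
      current = [];
    }
  }
  // Emit the last, possibly shorter batch once the source has ended.
  if (current.length > 0) yield current;
}

// Usage: 7 parsed rows in batches of 3.
(async () => {
  const rows = [1, 2, 3, 4, 5, 6, 7].map((id) => ({ id }));
  for await (const items of batch(rows, 3)) {
    console.log(items.length);
  }
})();
```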

Functions have inappropriate type for optional parameters in TypeScript declaration

Basics
TypeScript declarations do not allow leaving out optional parameters.

Describe the bug
In the TypeScript declarations, optional parameters are not marked with ?, which makes them required. For example, toArray(initial: any[]): any[]; declares initial as required even though it is documented as an optional argument; it should be toArray(initial?: any[]): any[];. The same applies to the DataStream.map functions, which take an optional subclass constructor.

To Reproduce
Attempt to use the functions without supplying optional parameters in TypeScript. This produces errors during transpilation.

Expected behavior
Functions can be used without supplying optional parameters, and compilation does not produce errors.

Test case
If possible, please provide an automated test case to include, better yet in a forked scramjet repo in test/cases.

Screenshots
Not applicable

System:

  • Any OS
  • TypeScript 3.4.1

Additional context
Not applicable

Batch issues yet again.

Hi, thanks for the great library.

But this issue has happened again, in a strange scenario.

I have two functions like the one below.

One of them works as expected, but in this one the process stalls at the first .batch() with no error and no return value; the request just times out.


            res.setHeader('Content-Type', 'application/json; charset=utf-8')
            StringStream
              .from(async function* () {
                const response = await axios.get(csv_url, {
                  responseType: 'stream'
                })
                yield* response.data.pipe(await stripBomStream())
              }, { maxParallel: 4 })
              .CSVParse({ skipEmptyLines: true, header: true })
              .filter((item: any) => (item.Class_Id && parseInt(item.Class_Id) !== 0 && item.Class_Discount > 0))
              .map(async (item: any) => {
                
                const master_name = master_subgroup.find(
                  (master_item: any) => (parseInt(master_item.id) === parseInt(item.Class_Id)))
              
                if (!master_name.name || master_name.name === "") return {}
                all_data_count++
                batch_data_count++
                console.log("pass 3")
                const class_obj = {
                  id: item.Class_Id,
                  name: master_name.name,
                  discount_rate: parseFloat(item.Class_Discount),
                  parent_id: master_name.parent_id,
                  status: "active"
                }
            
             
                return class_obj

              })
              .batch(500) // <- the process stalls here
              .map(async (items: any) => {
                console.log("All Items is : " + items.length)
                const start_at = (batch_count * round_count) + 1
                round_count += 1
                const end_at = (start_at + items.length) - 1
                const range = start_at + "-" + end_at

                const unique_items = discount.objectToArray(discount.arrayToObject(items, "id"))
                const complete_subgroup = unique_items.filter((item: any) => {
                  return Object.keys(item).length > 0
                })
                console.log("Master group is : " + JSON.stringify(complete_subgroup))

            
                if (complete_subgroup.length > 0) {
                  const discount_obj = { subgroup: complete_subgroup }
                  console.log(discount_obj)
                  console.log("before send")
                  const update_result = await discount.update(req.params.uid, discount_obj)

             
                  console.log("before result")
                  console.log(update_result.success)
                  console.log("before return")
                  if (update_result.success) {
                    return JSON.stringify({
                      rows: range,
                      received: items.length,
                      accepted: unique_items.length - 1,
                      status: "import_successful",
                      message: "Data has been saved"
                    }
                    )
                  } else {
                    return JSON.stringify({
                      rows: range,
                      received: items.length,
                      accepted: 0,
                      status: "problem_with_database",
                      message: "Unable to save data, data range: " + range,
                    })
                  }
                } else {
                  return JSON.stringify({
                    rows: range,
                    received: items.length,
                    accepted: 0,
                    status: "data_id_not_exits",
                    message: "Data range " + range + ": ID does not exist in the system",
                  })
                }
              })
              .catch((error: any) => {
                console.log("catch block 1" + error.stack)
                if (error.code === "ERR_SCRAMJET_EXTERNAL") {
                  res.statusCode = error.cause.response.status
                  const error_obj = makeErr("can_not_reach_product_api",
                    "Unable to contact the Product API, please check",
                    error.cause.response.status,
                    process.env.PRODUCT_API_URL)
                  res.send(JSON.stringify(error_obj))
                  return
                } else {
                  res.statusCode = 422
                  const error_obj = makeErr(error.code, error.message, 422)
                  res.send(JSON.stringify(error_obj))
                  return
                  //  return JSON.stringify(error_obj)
                }
              })
              .batch(10000)
              .stringify((resp: any) => {
                // console.log(resp)
                res.statusCode = 200
                return "[" + resp + "]"
              })
              .catch((error: any) => {
                console.log("catch block 3")
                if (error.code === "ERR_SCRAMJET_EXTERNAL") {
                  res.statusCode = error.cause.response.status
                  const error_obj = makeErr(error.cause.response.data.code,
                    error.cause.response.data.message,
                    error.cause.response.status)
                  //res.send(JSON.stringify(error_obj))
                  return JSON.stringify(error_obj)
                } else {
                  res.statusCode = 500
                  const error_obj = makeErr(error.code, error.message, 500)
                  // res.send(JSON.stringify(error_obj))
                  return JSON.stringify(error_obj)
                }
              })
              .toStringStream()
              .pipe(res)


Originally posted by @KwangGan in #2 (comment)

More specific parallelism

Is your feature request related to a problem? Please describe.
Different parts of a pipeline might need different levels of parallelism; for example, you might not want to send more than 20 concurrent HTTP requests but be fine with 100 concurrent file reads.
Describe the solution you'd like
It would be nice to have another parameter in the .map and .flatMap functions (and probably others) which would allow you to set a level of parallelism for a specific mapping.

I know Highland implements this with a .parallel function: you .map into a stream of streams, and .parallel() streams them into a single stream in parallel. A stream of streams is somewhat clunky, so accepting the same kinds of elements that .flatMap accepts would likely be a nicer implementation.

Describe alternatives you've considered
I think it's possible to implement this as a module, but that's obviously not ideal for something that I think is a pretty common use case.

I've also considered mapping a batch into a Promise.all, but this is not ideal either, because it forces minimum batch sizes, which adds overhead in buffering or backpressure; moreover, it seemed to block the stream and didn't work when I tried it.
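The per-stage limit requested above can be sketched outside scramjet as an order-preserving async map with its own concurrency cap. The helper mapWithLimit below is hypothetical and is not an existing scramjet option: it keeps at most limit calls of fn in flight and yields results in input order, so two stages of one pipeline could use different limits (say 20 and 100).

```javascript
// Hypothetical helper (not scramjet's API): maps items from an (async)
// iterable with at most `limit` concurrent calls to `fn`, yielding results
// in input order.
async function* mapWithLimit(source, fn, limit) {
  const pending = []; // result promises, oldest first
  for await (const item of source) {
    const result = Promise.resolve(item).then(fn);
    // Mark as handled so a rejection that occurs before we await it does
    // not trigger an unhandled-rejection warning; awaiting still rethrows.
    result.catch(() => {});
    pending.push(result);
    if (pending.length >= limit) {
      // Wait for the oldest call before starting another one.
      yield await pending.shift();
    }
  }
  // Drain the calls that are still running.
  while (pending.length > 0) {
    yield await pending.shift();
  }
}
```

A trade-off of this sketch: results are yielded strictly in order, so one slow call holds back later ones even when they have already finished (head-of-line blocking).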

Projects description and up-for-grabs

A clear and concise roadmap needs to be created for the next couple of releases until v5.

There's a need to integrate with up-for-grabs.org to attract contributors and investigate other possibilities.

Typescript compilation error: Top-level declarations in .d.ts files must start with either a 'declare' or 'export' modifier.

$ tsc && cp -r src/api dist/api
node_modules/scramjet/.d.ts/index.d.ts:8:1 - error TS1046: Top-level declarations in .d.ts files must start with either a 'declare' or 'export' modifier.

8 function pipelineOverride(...a: never[]): never;
  ~~~~~~~~

node_modules/scramjet/.d.ts/index.d.ts:13:7 - error TS2417: Class static side 'typeof PromiseTransform' incorrectly extends base class static side 'typeof Transform'.
  The types of 'pipeline.__promisify__' are incompatible between these types.
    Type '(...a: never[]) => never' is not assignable to type '{ (stream1: ReadableStream, stream2: WritableStream): Promise<void>; (stream1: ReadableStream, stream2: ReadWriteStream, stream3: WritableStream): Promise<...>; (stream1: ReadableStream, stream2: ReadWriteStream, stream3: ReadWriteStream, stream4: WritableStream): Promise<...>; (stream1: ReadableStream, stream2: Rea...'.
      Types of parameters 'a' and 'stream1' are incompatible.
        Type 'ReadableStream' is not assignable to type 'never'.

TS version: 3.8.3

Scramjet version: 4.33.4

Solution: downgraded to 4.30.1

Typescript types are massively broken

Trying to use scramjet in TypeScript and getting a bucket of errors such as:

  • Cannot find name 'ScramjetOptions'.
  • Generic type 'Promise' requires 1 type argument(s).
  • Generic type 'Iterable' requires 1 type argument(s).

Just looking at them, I'm not sure whether they were ever meant to work, or in which version of TypeScript they did, but there are clearly references to types that are never provided, such as ScramjetOptions:

https://github.com/signicode/scramjet/blob/2fcaefcb6c29064457f789f8303a9ec24cc7ec5a/.d.ts/scramjet.d.ts#L40-L45

Screenshots
Screenshot 2020-01-23 at 21 14 40

System:

  • OS: OSX
  • Node version: 12.13.1
  • TypeScript version: 3.7.2
  • Scramjet Version: 4.27.0

DataStream.distribute causes "StreamWorker child timeout"

First of all, thanks for this awesome library! I am very eager to test which performance gains I can achieve using the distribute method, but somehow I cannot get it to run. I'm posting this as a regular issue because I am not sure whether it is a bug or something wrong on my side.

I derived a small script from the test case of the distribute method to measure any performance gains, but it keeps exiting with:

Error: StreamWorker child timeout!

const { DataStream } = require('scramjet');

const cpus = require("os").cpus().length * 2;

function* gen() {
    for (let z = 0; z < 1e3; z++)
        yield z;
}

DataStream.fromIterator(gen())
    .distribute(
        i => i % cpus,
        (stream) =>
            stream
            //.each((x) => console.log(process.pid, x))
                .filter(num => {
                    if (num < 2) return false;
                    if (num == 2) return true;
                    // trial division up to sqrt(num); a bound of i < num/2 lets 4 through as prime
                    for (let i = 2; i * i <= num; i++) {
                        if (num % i === 0) return false;
                    }
                    return true;
                })
    )
    .toArray()
    .then((arr) => {
        console.log("arr", arr)
    });

produces:

(node:18749) UnhandledPromiseRejectionWarning: Error: StreamWorker child timeout!
    at Timeout.setTimeout [as _onTimeout] (/Users/peavey/webprojects/airtb/node-jobs/node_modules/scramjet/lib/stream-worker.js:66:79)
    at ontimeout (timers.js:436:11)
    at tryOnTimeout (timers.js:300:5)
    at listOnTimeout (timers.js:263:5)
    at Timer.processTimers (timers.js:223:10)
(node:18749) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:18749) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
(node:18749) ... the same "StreamWorker child timeout!" warning and stack trace repeat for rejection ids 2 through 17 ...

System:

  • OS: MacOSX 10.14.6
  • Node version: v10.16.0
  • Scramjet Version: 4.27.0

Thanks for any pointer on how to solve this.
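When measuring what distribute gains, a single-process baseline over the same input gives a reference point. The sketch below is not part of scramjet; sequentialBaseline is a hypothetical name. It runs the same kind of prime filter sequentially, testing divisors only up to the square root, and reports the elapsed time with process.hrtime.bigint().

```javascript
// Primality test by trial division up to sqrt(num).
function isPrime(num) {
  if (num < 2) return false;
  for (let i = 2; i * i <= num; i++) {
    if (num % i === 0) return false;
  }
  return true;
}

// Hypothetical baseline (not scramjet's API): filters 0..count-1 in the
// current process and measures wall-clock time in milliseconds.
function sequentialBaseline(count) {
  const start = process.hrtime.bigint();
  const primes = [];
  for (let z = 0; z < count; z++) {
    if (isPrime(z)) primes.push(z);
  }
  const elapsedMs = Number(process.hrtime.bigint() - start) / 1e6;
  return { primes, elapsedMs };
}

const { primes, elapsedMs } = sequentialBaseline(1e3);
console.log(`${primes.length} primes below 1000 in ${elapsedMs.toFixed(2)} ms`);
```

For an input this small, the cost of starting worker processes will likely dominate, so a larger count is needed before any parallel speedup can show.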

An in-range update of scramjet-core is breaking the build 🚨

The dependency scramjet-core was updated from 4.16.13 to 4.16.14.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

scramjet-core is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • ❌ continuous-integration/travis-ci/push: The Travis CI build failed (Details).

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Last line of CSV makes node crash?

It's me again.

After playing with multiple CSV files, it seems that the last line doesn't get inserted.

I'm outputting the line in the .each loop.
On a CSV file containing the list of countries, all values of the invisible last line are undefined or NaN.

arg: [ 'BVT', 'BV', '74', 'Bouvet Island', '55', '', null, null, '', 'AN' ]
arg: [ undefined,
  undefined,
  undefined,
  '',
  undefined,
  undefined,
  NaN,
  NaN,
  undefined,
  undefined ]

I opened the file and there is no extra last line (the code was published in another issue here). I receive 'Read entire file' at the end, since it didn't crash.

On a CSV file containing a provider's rate list, the last line is properly interpreted in the .each loop, but node crashes.

arg: [ 'standard',
  'Singapore - Singapore - National - Voice (with CLI)',
  '2018-10-04',
  60,
  0.506,
  2,
  0,
  60,
  0,
  2,
  50,
  30,
  0.209,
  'SGP' ]
ℹ info Inserted   standard,Singapore - Singapore - National - Voice (with CLI),2018-10-04,60,0.506,2,0,60,0,2,50,30,0.209,SGP
Country found for Portugal PRT
arg: [ 'standard',
  'Portugal - Portugal - Local - Voice',
  '2018-10-25',
  60,
  0.0285,
  2,
  0,
  60,
  0,
  2,
  4.75,
  0,
  0.03,
  'PRT' ]
[nodemon] app crashed - waiting for file changes before starting...

I'm using the same code for both CSV files, except for the .each loop, where I treat each file differently because the columns are in different locations. I'll try with a third CSV file.
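One possible cause of the all-undefined row is a trailing newline that the parser turns into an extra empty record; this is only an assumption, since the parsing code is not shown here. The CSVParse call earlier in this thread passes skipEmptyLines: true; the sketch below shows the same kind of guard in plain JavaScript. parseCsvRows is a hypothetical helper with a naive comma split (no quoted fields) that drops blank lines and rows whose column count does not match the header before they reach an insert step.

```javascript
// Naive sketch (no quoted-field support): splits CSV text into row objects,
// dropping blank lines and rows whose column count differs from the header.
function parseCsvRows(text) {
  const lines = text.split(/\r?\n/).filter((line) => line.trim() !== "");
  const [header, ...body] = lines.map((line) => line.split(","));
  const rows = [];
  for (const cells of body) {
    if (cells.length !== header.length) continue; // malformed or partial row
    rows.push(Object.fromEntries(header.map((name, i) => [name, cells[i]])));
  }
  return rows;
}
```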

An in-range update of scramjet-core is breaking the build 🚨

The dependency scramjet-core was updated from 4.18.6 to 4.19.0.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

scramjet-core is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • ✅ continuous-integration/appveyor/branch: AppVeyor build succeeded (Details).
  • ❌ continuous-integration/travis-ci/push: The Travis CI build failed (Details).


An in-range update of eslint is breaking the build 🚨

The devDependency eslint was updated from 5.15.3 to 5.16.0.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

eslint is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • ✅ continuous-integration/appveyor/branch: AppVeyor build succeeded (Details).
  • ❌ continuous-integration/travis-ci/push: The Travis CI build could not complete due to an error (Details).

Release Notes for v5.16.0
  • dfef227 Build: gensite passes rulesMeta to formatter rendering (#11567) (Kevin Partington)
  • c06d38c Fix: Allow HTML formatter to handle no meta data (#11566) (Ilya Volodin)
  • 87a5c03 Docs: func-style: clarify when allowArrowFunctions is used (#11548) (Oliver Joseph Ash)
  • bc3e427 Update: pass rule meta to formatters RFC 10 (#11551) (Chris Meyer)
  • b452f27 Chore: Update README to pull in reviewer data (#11506) (Nicholas C. Zakas)
  • afe3d25 Upgrade: Bump js-yaml dependency to fix Denial of Service vulnerability (#11550) (Vernon de Goede)
  • 4fe7eb7 Chore: use nyc instead of istanbul (#11532) (Toru Nagashima)
  • f16af43 Chore: fix formatters/table test (#11534) (Toru Nagashima)
  • 78358a8 Docs: fix duplicate punctuation in CLI docs (#11528) (Teddy Katz)
Commits

The new version differs by 11 commits.

  • ded2f94 5.16.0
  • ea36e13 Build: changelog update for 5.16.0
  • dfef227 Build: gensite passes rulesMeta to formatter rendering (#11567)
  • c06d38c Fix: Allow HTML formatter to handle no meta data (#11566)
  • 87a5c03 Docs: func-style: clarify when allowArrowFunctions is used (#11548)
  • bc3e427 Update: pass rule meta to formatters RFC 10 (#11551)
  • b452f27 Chore: Update README to pull in reviewer data (#11506)
  • afe3d25 Upgrade: Bump js-yaml dependency to fix Denial of Service vulnerability (#11550)
  • 4fe7eb7 Chore: use nyc instead of istanbul (#11532)
  • f16af43 Chore: fix formatters/table test (#11534)
  • 78358a8 Docs: fix duplicate punctuation in CLI docs (#11528)

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

XMLParse()

Any chance you'll be including XMLParse() functionality?

Move all the stream classes out of scramjet-core

The stream classes are currently extended in scramjet, while their base classes reside in scramjet-core. This is awkward for new releases, which rarely change the actual core class, PromiseTransformStream.

This issue is to move all the stream base classes to scramjet repo and leave only PromiseTransformStream and utility classes in scramjet-core.

Investigate possible vuln in papaparse

Basics:

According to the report in mholt/PapaParse#793, there's a chance that someone could export a formula in the produced CSV and use it as an attack on spreadsheets...

This sounds a bit crazy as:

  • what if someone actually wanted to output a formula in csv?
  • it isn't us who's under attack

Anyway, Snyk seems to have rated this as high severity, so let's leave it open and revisit it.

Let's watch this for now and see how the situation develops.
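For reference, the mitigation commonly recommended for this class of issue (CSV or formula injection, as described in OWASP's guidance) is to neutralise cells that start with a character a spreadsheet would treat as the beginning of a formula, usually by prefixing a single quote. The plain-JavaScript sketch below only illustrates what an opt-in escape step could look like; `escapeFormulaCell` and `escapeRow` are hypothetical names, not part of scramjet or papaparse.

```javascript
// Hypothetical helper: neutralise cells a spreadsheet could evaluate as formulas.
// Trigger characters follow the OWASP CSV injection guidance: = + - @ TAB CR.
const FORMULA_TRIGGERS = new Set(["=", "+", "-", "@", "\t", "\r"]);

function escapeFormulaCell(value) {
  const text = String(value);
  // A leading single quote makes spreadsheet applications treat the cell as text.
  return text.length > 0 && FORMULA_TRIGGERS.has(text[0]) ? `'${text}` : text;
}

// Apply the escape to every cell of a row before it is serialised to CSV.
function escapeRow(row) {
  return row.map(escapeFormulaCell);
}
```

Note that this also rewrites legitimate values such as negative numbers (`-5` becomes `'-5`), which is exactly the trade-off raised above, so it would only make sense as an opt-in.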

It would be nice to have a way to stop processing records on first error.

Consider a scenario using the following code:

await DataStream
	.from(productsStream)
	.setOptions({ maxParallel : 1 })
	.map(doSomething)
	.batch(25)
	.map(doSomethingElse)
	.catch(e => {
		e.stream.end();
	})
	.run();

When any error occurs, the stream is closed, but for some reason scramjet keeps processing data. Is there a way to stop processing the data and return/throw/raise the error?
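For comparison, plain async iteration in Node.js already has the "stop on first error" semantics asked about here: an exception thrown inside a `for await` loop exits the loop, calls `return()` on the source iterator (so a generator source runs its `finally` block and stops producing), and rejects the enclosing promise. The sketch below illustrates that behaviour outside scramjet; `source` and `processAll` are hypothetical helpers, and it makes no claim about how scramjet's `catch` is meant to be used.

```javascript
// Plain async-iteration sketch (not scramjet): the first error stops everything.
async function* source(items, log) {
  try {
    for (const item of items) {
      log.push(`read ${item}`);
      yield item;
    }
  } finally {
    // Runs when the consumer stops early, e.g. because a step threw.
    log.push("source closed");
  }
}

async function processAll(items, step, log) {
  const results = [];
  // A throw inside the loop exits it, closes the generator and rejects this promise.
  for await (const item of source(items, log)) {
    results.push(await step(item));
  }
  return results;
}
```

If `step` throws on the second item, the third item is never read and the caller receives the original error.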

maxParallel stalls when items are processed out of order

Say I would like to process some SQL tables two at a time. Each table is mapped into() a stream, and each of those streams will take variable time to complete. If table B ends before table A, then table C won't start processing until A also completes. Here's some code!

// A and B immediately start streaming.
// If B finishes streaming first, I would expect C to start streaming.
// Instead C doesn't start streaming until A also completes.

await DataStream.from(['A', 'B', 'C'])
    .setOptions({ maxParallel: 2 })
    .into(
        async (out, tableName) => await out.pull(streamTable(tableName)),
        new DataStream()
    )
    .run();
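The behaviour expected here is a "work pool" rather than in-order windows: keep at most N tasks running, and start the next one as soon as any running task finishes. A minimal plain-JavaScript sketch of that scheduling follows; it is not how scramjet implements `maxParallel`, and `runPool` is a hypothetical helper. Results are returned in input order even though tasks may complete out of order.

```javascript
// Run `worker` over `items` with at most `limit` tasks in flight.
// A free slot is refilled as soon as any task finishes, regardless of order.
async function runPool(items, limit, worker) {
  const results = new Array(items.length);
  let next = 0;

  // Each lane keeps claiming the next unprocessed index until the input runs out.
  async function lane() {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index], index);
    }
  }

  const laneCount = Math.min(Math.max(1, limit), items.length);
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return results;
}
```

Applied to the example, `worker` would stream one table (for instance by consuming `streamTable(tableName)`), with `limit` set to 2.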

[Help Wanted] Typescript testing

Is your feature request related to a problem? Please describe.

Every now and then a change in scramjet breaks TypeScript compatibility. How can we check that the .d.ts files are OK?

Describe the solution you'd like

There should be an npm run test:typescript-definitions script that at least installs scramjet as a dependency from a file:// location and tries to compile a sample .ts file with tsc.

Any ideas are appreciated. PRs too.

Question: Approach to streamed grouping (when data is sorted)

Hey,

I am slowly getting comfortable with scramjet and trying some more advanced scenarios. I want to do a groupBy. I understand that in order to really group, all keys need to be known, so I took care of sorting beforehand. I have already found an almost-working solution with remap:

const exampleData = [
  { group: 1, id: 1 },
  { group: 1, id: 2 },
  { group: 2, id: 3 },
  { group: 2, id: 4 },
  { group: 3, id: 5 },
  { group: 4, id: 6 },
  { group: 5, id: 7 },
  { group: 5, id: 8 },
  { group: 6, id: 9 },
  { group: 6, id: 10 },
  { group: 7, id: 11 },
];

const groupBy = () => {
  let currentGroup;
  let currentGroupEvents = [];
  return (emit, event) => {
    if (!currentGroup) {
      currentGroup = event.group;
    }
    // If group changes, emit all collected events for old group
    if (currentGroup !== event.group) {
      emit(currentGroupEvents);
      currentGroupEvents = [];
      currentGroup = event.group;
    }
    currentGroupEvents.push(event);
  };
};

DataStream.from(exampleData)
  .remap(groupBy())
  .toArray()
  .then((grouped) => {
    // Should have 7 groups, but it only has 6 since the LAST one is never emitted
    expect(grouped).toHaveLength(7);
  });

The approach is to collect events for the same group and as soon as the group changes emit all collected events. The problem is that this change never occurs for the last group. Is there a better way to approach this or to somehow emit the last group?
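One way to handle the last group is to keep the grouping state in something that also sees the end of the input, and flush whatever is still buffered there. The plain async-generator sketch below shows that idea outside scramjet; `groupConsecutive` and its `keyOf` parameter are hypothetical names, and it assumes, as in the question, that the input is already sorted by the grouping key.

```javascript
// Group consecutive items that share a key; input must already be sorted by that key.
async function* groupConsecutive(source, keyOf) {
  let currentKey;
  let group = [];
  for await (const item of source) {
    const key = keyOf(item);
    // A key change closes the previous group.
    if (group.length > 0 && key !== currentKey) {
      yield group;
      group = [];
    }
    currentKey = key;
    group.push(item);
  }
  // End of input: emit the final group, which a "flush on key change" rule never reaches.
  if (group.length > 0) yield group;
}
```

Checking `group.length` instead of `!currentGroup` also avoids misbehaving when a group key is falsy, such as `0`.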

Introduce some test automation around distribute.

Related: #61

Is your feature request related to a problem? Please describe.
Currently the stream worker tests are not run, since Travis doesn't allow them to be executed; this lets issues like #61 reappear.

Describe the solution you'd like
Perhaps Drone would be a good solution.

Update example on home page now that request is deprecated?

Hello,

I came across this project and tried the first example shown using StringStream which seems to work great.

However, it seems the request package is deprecated. Maybe update the example to use axios, fetch, etc. instead?

request("https://api.example.org/v1/shows/list")

Thanks!

DataStream..use(function* {}) times out during test.

Basics
The issue affects DataStream..use(function* {})

Describe the bug
The current test times out.

To Reproduce
Steps to reproduce the behavior:

  1. Checkout this repo.
  2. npm install
  3. npm test
  4. See error:
✗ data-stream/test_use/generator
   ✓ ok(Must return a stream synchonously)
   ✓ ok(Must not be called and executed synchronously)
   ✓ equal(Must be called with self as first argument)
   ✓ equal(Additional arguments must be passed)
   ✗ fail(test timed out after 10000ms)
   ! test timed out after 10000ms

Expected behavior
Should not time out.

Test case
nodeunit-tape -t node_modules/scramjet_core/test/v1/data-stream.js

System:

  • OS: Ubuntu Server 18.04
  • Node version: v8, works on v10/latest.
  • Scramjet Version: 4.20

Additional context
It is possible that PromiseTransformStream.whenRead times out if reading beyond stream end.

`StringStream.exec` command

StringStream should allow executing any executable on the system, piping its own data to the process's stdin on one end and outputting the stdout/stderr data on the other.
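A rough plain-Node.js sketch of the requested behaviour, using `child_process.spawn`: the source is piped into the child's stdin and the child's stdout is returned as a readable stream. `execStream` is a hypothetical helper rather than an existing scramjet method; this version inherits stderr instead of exposing it, and reports the exit code through a separate promise.

```javascript
const { spawn } = require("child_process");
const { Readable } = require("stream");

// Hypothetical helper: pipe `source` (an iterable, async iterable or Node stream
// of strings/Buffers) into `command`'s stdin and return the child's stdout stream.
function execStream(source, command, args = []) {
  const child = spawn(command, args, { stdio: ["pipe", "pipe", "inherit"] });

  // Swallow stdin errors (e.g. EPIPE when the command exits before reading all input).
  child.stdin.on("error", () => {});
  Readable.from(source).pipe(child.stdin);

  // Resolves with the exit code once the process and its stdio have closed.
  const exitCode = new Promise((resolve, reject) => {
    child.on("error", reject);
    child.on("close", (code) => resolve(code));
  });

  return { stdout: child.stdout, exitCode };
}
```

The returned `stdout` can be consumed with `for await` or piped onward like any other Node readable stream.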

Distribute throws EADDRNOTAVAIL

Hi,

I stumbled upon your library and have to admit I really like it. A few more examples and more documentation would help a lot (I'm willing to help here), but conceptually it's really great.

While playing around with it, I wanted to reduce the time my processing takes, in order to test more variations of my logic. When using the distribute function, I get the following error:

Error: listen EADDRNOTAVAIL 127.190.121.182
    at Object._errnoException (util.js:992:11)
    at _exceptionWithHostPort (util.js:1014:20)
    at Server.setupListenHandle [as _listen2] (net.js:1338:19)
    at listenInCluster (net.js:1396:12)
    at doListen (net.js:1505:7)
    at _combinedTickCallback (internal/process/next_tick.js:141:11)
    at process._tickCallback (internal/process/next_tick.js:180:9)
    at Function.Module.runMain (module.js:695:11)
    at startup (bootstrap_node.js:191:16)
    at bootstrap_node.js:612:3

My Pipeline looks like this:

const nodeStream = readStream.pipe(JSONStream.parse());
DataStream.from(nodeStream)
  .each(validate)
  .catch(console.error)
  .remap(makeRemap())
  .distribute(cpus, subprocess)
  .reduce(reduceSum, result)
  .then(writeToFile)
  .then(() => console.log(result))
  .catch(console.error)
  .then(() => console.timeEnd('process'));

Some of the functions are not in this code, so here is the gist with everything:
https://gist.github.com/KeKs0r/f44834378254b7a6a1a953759467648d

Can't read remote file. UnhandledPromiseRejectionWarning: Error: Trailing quote on quoted field is malformed

Describe the bug
Reading the remote CSV fails with the error shown below.

To Reproduce

request
  .get("https://storage.cloud.google.com/***/some.csv") 
  .pipe(new StringStream()) 
  .CSVParse() // parse into objects
  .consume((object) => console.log("Row:", object)) 
  .then(() => console.log("all done"));

Error
(node:5521) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 336)
(node:5521) UnhandledPromiseRejectionWarning: Error: Trailing quote on quoted field is malformed
at /node_modules/scramjet-core/lib/util/promise-transform-stream.js:230:31
at processTicksAndRejections (internal/process/task_queues.js:89:5)
caused by:
at Object.chunk (/node_modules/scramjet/lib/string-stream.js:57:52)

System:

  • OS: Mac
  • Node version: v12.1.0
  • Scramjet Version: 4.30.1

DataStream..pull tests fail in Travis

Describe the bug
Travis build fails on DataStream.pull.

The order of resolved elements is not correct - the streams are mixed, not resolved in parallel.

Expected behavior
The streams should resolve one by one.

Additional context
Local tests are passing...

Node v6 compatibility

As mentioned in #39, scramjet is not compatible with Node v6. Amazon, Google and the like use old versions of Node (even though Node 6 end of support is coming in April this year).

We should be able to provide a transpiled version for use with runtimes provided in cloud.

In a branch here: scramjet-core#feature/node6-support there's some work already done - a little more is required (especially support for node version >=8.0.0 <=8.6.0 where we have an issue appearing).

What needs to be done:

  • remove circular dependency on scramjet (otherwise won't pass tests on travis)
  • fix issues appearing in node v8.
  • make tests on linked scramjet
  • remove nodeunit, rewrite tests to tape.

Question: StringStream parsing large CSV files and memory

I am trying to use StringStream to parse very large CSV files into JSON objects (over 100k to 500k rows) on AWS Lambda and then upload them to a third party service. The following is the code I am using:

    var counter = 0
    var header: string
    await StringStream.from(readableStream, { maxParallel: 2 })
      .lines('\n')
      .batch(5000)
      .map(async function(chunks: Array<string>) {
        var input = ''
        console.debug(`Chunks length: ${chunks.length}`)
        if (!header) {
          header = chunks[0]
          input = chunks.join('\n')
        } else {
          input = header + '\n' + chunks.join('\n')
        }
        const results = parse(input, {
          delimiter: ',',
          header: true,
          skipEmptyLines: true,
          worker: true,
          step: function(results) {
            console.log("Row:", results.data);
          }
        })
        console.debug(`results loaded: ${results.data.length}`)
        counter = counter + results.data.length
        // upload to API
        await uploadEvents(events)
      })
      .run()
    console.log(`header: ${header}`)
    console.log(`counter: ${counter}`)

When I run this code on a file with 100k lines, it maxes out my Lambda memory of 1024 MB. If I remove the uploadEvents call, it maxes out at around 300 MB.

My question is: is there a way to make this more memory efficient? Basically, I am thinking that there must be a way to pause the stream, upload the batch, then release the memory and upload the next batch.

I've tried various permutations but have had no luck, so any help here would be greatly appreciated.

Thanks in advance...
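One pattern that keeps memory bounded is to read lines lazily, collect a fixed-size batch, and `await` the upload before reading further, so that at most one batch of parsed rows is held at a time. Iterating a Node readable with `for await` pulls data on demand, so the source is not read while the loop is waiting on an upload. The sketch below uses plain Node.js rather than scramjet; `uploadBatch` stands in for the question's `uploadEvents`, and `parseLine` is a hypothetical per-line parser (quoted CSV fields need a real CSV parser there).

```javascript
const { StringDecoder } = require("string_decoder");

// Split a byte or string stream into lines, reading it only as lines are requested.
async function* readLines(input) {
  const decoder = new StringDecoder("utf8");
  let rest = "";
  for await (const chunk of input) {
    rest += typeof chunk === "string" ? chunk : decoder.write(chunk);
    const parts = rest.split(/\r?\n/);
    rest = parts.pop(); // keep the trailing partial line for the next chunk
    yield* parts;
  }
  rest += decoder.end();
  if (rest !== "") yield rest;
}

// Upload rows in batches of `batchSize`, awaiting each upload before reading on.
async function uploadInBatches(input, batchSize, parseLine, uploadBatch) {
  let header;
  let batch = [];
  let total = 0;
  const flush = async () => {
    await uploadBatch(batch);
    total += batch.length;
    batch = []; // drop references so uploaded rows can be garbage-collected
  };
  for await (const line of readLines(input)) {
    if (line.trim() === "") continue; // skip empty lines
    const values = parseLine(line);
    if (header === undefined) {
      header = values; // first non-empty line is the header
      continue;
    }
    batch.push(Object.fromEntries(header.map((name, i) => [name, values[i]])));
    if (batch.length >= batchSize) await flush();
  }
  if (batch.length > 0) await flush();
  return total;
}
```

With `batchSize` set to 5000, each upload sees at most 5000 row objects, and the next lines are only read after that upload has resolved.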

An in-range update of nodeunit-tape-compat is breaking the build 🚨

The devDependency nodeunit-tape-compat was updated from 1.1.3 to 1.1.4.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

nodeunit-tape-compat is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • ❌ continuous-integration/travis-ci/push: The Travis CI build failed (Details).

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Accessing non-existent property 'WindowStream' of module exports inside circular dependency

I get this warning message every time Scramjet is imported, though everything seems to run OK.

To reproduce:

test.js:

const Scramjet = require('scramjet')
$ node --trace-warnings test
(node:21638) Warning: Accessing non-existent property 'WindowStream' of module exports inside circular dependency
    at emitCircularRequireWarning (internal/modules/cjs/loader.js:823:11)
    at Object.get (internal/modules/cjs/loader.js:837:5)
    at Object.<anonymous> (/Users/max/test/node_modules/scramjet/lib/data-stream.js:1:72)
    at Module._compile (internal/modules/cjs/loader.js:1200:30)
    at Object.Module._extensions..js (internal/modules/cjs/loader.js:1220:10)
    at Module.load (internal/modules/cjs/loader.js:1049:32)
    at Function.Module._load (internal/modules/cjs/loader.js:937:14)
    at Module.require (internal/modules/cjs/loader.js:1089:19)
    at require (internal/modules/cjs/helpers.js:73:18)
    at Object.<anonymous> (/Users/max/test/node_modules/scramjet/lib/index.js:15:17)

I'm using Scramjet v4.30.2 with Node v14.3.0 on MacOS v10.14.6.

Question: JSON Array File - Process records in parallel batches.

Hi,

Looking for a bit of advice. I have a file that contains a JSON array of data. This is held on AWS S3.

Format similar to [{"name": "paul"}, {"name": "bob"}, {"name": "stuart"}] but much more data/properties.

I want to stream that data and process a number of records in parallel. e.g. 100 at a time.

I've tried the following:

 return DataStream
      .pipeline(
        stream,
        JSONStream.parse('*')
      )
      .setOptions({
        'maxParallel': 100
      })
      .each(async (record) => {
        return importer(record)
      })
      .run()
      .then(() => {
        console.log('Processed entire file')
      })
      .catch((err) => {
        console.error(err)
      })

I want the importer routine to run which will run through a lot of validation/processing before finally updating the database.

I need to do this in parallel for performance/efficiency, e.g. 100 at a time: as soon as one record is finished, start processing the next available one until all are finished, but never exceed that limit of 100.

When all records have finished processing, the "Processed entire file" message should be displayed.

Tried a few things but can't quite get it to work. Any help is much appreciated.

Thanks

Paul

Question: Skip first CSV line

My current Scramjet code looks something like this:

  StringStream.from(
    fs.createReadStream(
      ies.csv'
    )
  )
.lines()
.parse()
.setOptions()
.each()
.run()
.then()

How can I skip the first line in the CSV?
I've been reading for 30 minutes but haven't found anything.

Thanks!
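Conceptually, skipping the header is a "drop the first n items" step placed right after the text has been split into lines. A plain async-generator sketch of that step follows; `dropFirst` is a hypothetical helper, not a scramjet method, and it works on any iterable or async iterable of lines.

```javascript
// Hypothetical helper: skip the first `count` items of an (async) iterable,
// e.g. the header line of a CSV file after it has been split into lines.
async function* dropFirst(source, count = 1) {
  let seen = 0;
  for await (const item of source) {
    if (seen++ < count) continue; // still inside the skipped prefix
    yield item;
  }
}
```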

DataStream.slice(0) does not conform to documented or tested behavior

Basics
The documentation for DataStream.slice reads

Returns a stream consisting of an array of items with 0 to start omitted and length items after start included. Works similarily to Array.prototype.slice.

However, the test cases and code do not account for the fact that calling slice() or slice(0, x) does not behave as expected.

To Reproduce
Both of the following tests demonstrate what I'm talking about:

DataStream.fromArray([1,2,3,4,5,6,7,8,9,0])
            .slice()
            .toArray()
            .then(arr => {
                test.deepEqual(arr, [1,2,3,4,5,6,7,8,9,0], "Shifted items should be same as starting items");
            });

DataStream.fromArray([1,2,3,4,5,6,7,8,9,0])
            .slice(0,3)
            .toArray()
            .then(arr => {
                test.deepEqual(arr, [1, 2, 3], "Shifted items should be starting with first element");
            })

I'll open a PR with additional tests demonstrating these.

System:

  • OS: Ubuntu Desktop 16.04
  • Node version: 10.11.0
  • Scramjet Version: 4.20.3, 4fb50f5
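For comparison, here is a plain-JavaScript sketch of the documented semantics: omit the first `start` items, then include at most `length` items, with both arguments optional. The documented second argument is a length, whereas Array.prototype.slice takes an end index; the two agree when `start` is 0, as in both tests above. `sliceItems` is a hypothetical helper, not scramjet's implementation.

```javascript
// Reference behaviour for slice(start, length) as documented: skip `start` items,
// then pass through at most `length` items. Defaults keep the whole stream.
async function* sliceItems(source, start = 0, length = Infinity) {
  if (length <= 0) return;
  let index = 0;
  let taken = 0;
  for await (const item of source) {
    if (index++ < start) continue; // still before `start`
    yield item;
    if (++taken >= length) return; // stop pulling once enough items were emitted
  }
}
```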

How can I pipe a scramjet workflow into a regular Node transform stream?

Hi, I have some code that looks like this:

async function server() {
  return await (StringStream.from(getCredential()) // create a DataStream
  .JSONParse()
  .map((res) => res.accessToken)
  .map(getWS) //getWS returns a `websocket-stream` 
  .do(onmessageSubscribeQuote))
  .pipe(myNodeTransFormStream)
}
server()

The call to pipe always breaks with an UnhandledPromiseRejectionWarning: Error: The "chunk" argument must be one of type string or Buffer. What am I missing?
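That error message is what Node.js raises when a value that is neither a string nor a Buffer (for example an object) is written to a stream that is not in object mode. Whether that is the cause in this pipeline is an assumption, but a common fix is to create the receiving transform with `writableObjectMode: true` (or `objectMode: true`), or to serialise chunks to strings before piping. The sketch below shows the first option with plain Node streams; `toJsonLines` is a hypothetical stand-in for `myNodeTransFormStream`.

```javascript
const { Transform } = require("stream");

// Accepts objects on the writable side and emits newline-delimited JSON text.
function toJsonLines() {
  return new Transform({
    writableObjectMode: true, // allow plain objects to be written into the transform
    transform(chunk, _encoding, callback) {
      callback(null, JSON.stringify(chunk) + "\n");
    },
  });
}
```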

PostgreSQL doesn't like scramjet

I have multiple CSV files that I'd like to import in PostgreSQL. Here is my code:

async function insert_countries() {
  return new Promise(async function(resolve, reject) {
    const { StringStream } = require('scramjet')
    const fs = require('fs')
    const path = require('path')

    const INSERT_ENTRY =
      'INSERT INTO countries (iso3, iso2, name, dialing_code, dialing_exit, population, area_km2) VALUES ($1, $2, $3, $4, $5, $6, $7);'
    StringStream.from(
      fs.createReadStream(
        'countries.csv'
      )
    )
      .lines()
      .parse(line => {
        // Each line need to be properly formated
        const entry = line.split(',') //comma split

        let reconstruction = []

        // country,country_code,external,iso2,iso3,population,area_km2

        /*       iso3	character varying(3) PRIMARY KEY,
        iso2	character varying(2),
        name	TEXT NOT NULL,
        dialing_code character varying(5),
        dialing_exit character varying(8)
        */
        // The following fields need formating
        reconstruction.push(entry[4])
        reconstruction.push(entry[3])
        reconstruction.push(entry[0])
        reconstruction.push(entry[1])
        reconstruction.push(entry[2])
        reconstruction.push(parseInt(entry[5]))
        reconstruction.push(parseInt(entry[6]))
        console.log(reconstruction)
        return reconstruction
      })
      .setOptions({ maxParallel: 1 }) // FIXME: Testing only
      .each(async reconstruction => {
        const client = await this.pg.connect()
        try {
          await client.query(INSERT_ENTRY, reconstruction)
        } catch (err) {
          console.log('Error while adding line...', err)
        } finally {
          client.release()
        }
      })
      .run()
      .then(
        () => {
          console.log('Read entire file.')
          return resolve(true)
        },
        e => console.log('Error while handling file.', err)
      )
  })
}

If I comment out the PostgreSQL part where the insert happens, the CSV is read to the end. But with PostgreSQL, I only see the vague [nodemon] app crashed - waiting for file changes before starting...

Is there some option I should be aware of, or some limitation I need to work around, to make this compatible?

Reading from the subject or the result of DataStream.tee() appears to fail silently beyond the 32nd item.

Basics
Affects master and teed streams when tee() is called on the master.

Describe the bug
The process silently halts, ostensibly when attempting to read the 33rd item from either stream.

To Reproduce (self-contained except node/scramjet)

const scramjet = require("scramjet");

async function test()  {
    try {

        const minimaltestArr = [{x: 1}, {x: 2}, {x: 3}, {x: 4}, {x: 5}]; // Succeeds
        let thirtyTwoElements = Array.from(Array(32).keys()); //Succeeds
        let thirtyThreeElements = Array.from(Array(33).keys()); //Silently fails/exits, ostensibly after 32nd element (index 31)
        let fiftyElements = Array.from(Array(50).keys()); //Also fails after 32nd element


        const masterStream = scramjet.DataStream.from(fiftyElements);
        const teedStream = masterStream.tee((stream) => stream);

        let masterCount = 0;
        let teedCount = 0;

        ////Uncomment one of the following three blocks to test a scenario

        // Exits after hitting 32 elements from each
        const results = await Promise.all([
            masterStream.each((chunk => {console.log(`p ${masterCount}`);masterCount++;})).toArray(),
            teedStream.each((chunk => {console.log(`c ${teedCount}`);teedCount++;})).toArray()
        ])

        // // Exits after hitting 32 elements of masterStream
        // const masterResults = await masterStream.each((chunk => {console.log(`M ${masterCount}`);masterCount++;})).toArray();
        // const teedResults = await teedStream.each((chunk => {console.log(`T ${teedCount}`);teedCount++;})).toArray();

        // // Exits after hitting 32 elements of teedStream
        // const teedResults = await teedStream.each((chunk => {console.log(`T ${teedCount}`);teedCount++;})).toArray();
        // const masterResults = await masterStream.each((chunk => {console.log(`M ${masterCount}`);masterCount++;})).toArray();

        console.log("Done!");
        const breakpoint = ''; 
    } catch (e) {
    console.log(e);
    const breakpoint = ''
    }   
}

test();

Expected behavior
results should be an array with two elements, each consisting of an array of fifty integers (when using fiftyElements)

Test case
See code, above

System:

  • OS: Ubuntu 19.10
  • Node version: v10.15.2
  • Scramjet Version: 4.28.5

BufferStream.from not running AsyncGenerator

Basics

Bluebird.resolve()
	.then(v => BufferStream.from(async function* ()
	{
		console.log(222)
		for await (const chunk of ipfs.cat(cid, { timeout }))
		{
			console.log(chunk.length)
			yield chunk
		}
	}, createWriteStream('./test/temp/temp3.png')))
	.tapCatch(e => console.error(e))
	.finally(() =>
	{
		console.log(777)
		return stop()
	})

Describe the bug

The function passed to BufferStream.from didn't run.

maxParallel stalls when items are processed out of order

Copied from scramjet, raised there by @devinivy:

Say I would like to process some SQL tables two at a time. Each table is mapped into() a stream, and each of those streams will take variable time to complete. If table B ends before table A, then table C won't start processing until A also completes. Here's some code!

// A and B immediately start streaming.
// If B finishes streaming first, I would expect C to start streaming.
// Instead C doesn't start streaming until A also completes.

await DataStream.from(['A', 'B', 'C'])
    .setOptions({ maxParallel: 2 })
    .into(
        async (out, tableName) => await out.pull(streamTable(tableName)),
        new DataStream()
    )
    .run();

Firebase Compatibility

Thanks for the chat, Michael! This issue relates to using ScramJet on Firebase Cloud Functions, which uses Node.js 6.

Here's my index.ts file which I'm deploying to Firebase Cloud Functions:

import * as functions from 'firebase-functions';
import * as admin from "firebase-admin";

// Initialize Firebase App
admin.initializeApp()
const gcs = admin.storage();

// This is the function I want to send to Firebase Cloud Functions
export const generateCSV = functions.storage
    .object()
    .onFinalize(async object => { // <--- This is triggered whenever a file is uploaded to the storage bucket
        const bucket = gcs.bucket(object.bucket);
        const filePath = object.name as string;
        
        const { StringStream } = require("scramjet");

        StringStream.pipeline(
            await bucket.file(filePath).createReadStream(),      // fetch csv
            (stream : any) => stream.CSVParse({}) // parse into objects
        )
            .consume(async (row: Object) => await console.log("Row:", row))  // do whatever you like with the objects
            .then(() => console.log("all done"))

            // Note to M: Once I can start logging these rows, I plan on doing a call that will insert this data into a Firestore Database

        return 
    });

When executing the above function, the logs show the following error:

/user_code/node_modules/scramjet/node_modules/scramjet-core/lib/util/promise-transform-stream.js:153
    async whenRead(count) {
          ^^^^^^^^

SyntaxError: Unexpected identifier
    at createScript (vm.js:56:10)
    at Object.runInThisContext (vm.js:97:10)
    at Module._compile (module.js:549:28)
    at Object.Module._extensions..js (module.js:586:10)
    at Module.load (module.js:494:32)
    at tryModuleLoad (module.js:453:12)
    at Function.Module._load (module.js:445:3)
    at Module.require (module.js:504:17)
    at require (internal/module.js:20:19)
    at Object.<anonymous> (/user_code/node_modules/scramjet/node_modules/scramjet-core/lib/index.js:2:19)

I would LOVE to be able to use ScramJet on Firebase Cloud Functions :)

DataStream.batch() streaming data from BigQuery

If I remove the .batch() or .timeBatch() line then it works fine.

With it I get this error:

Cannot read property 'value' of undefined

      at node_modules/.pnpm/[email protected]/node_modules/scramjet-core/lib/util/mk-transform.js:59:44
      at processTicksAndRejections (internal/process/task_queues.js:97:5)
        caused by:
      at DataStream.<anonymous> (src/bq-to-mssql.ts:99:55)
        --- raised in DataStream(15) constructed ---
      at new PromiseTransformStream (node_modules/.pnpm/[email protected]/node_modules/scramjet-core/lib/util/promise-transform-stream.js:65:27)
      at new DataStream (node_modules/.pnpm/[email protected]/node_modules/scramjet-core/lib/data-stream.js:43:9)    
      at DataStream.map (node_modules/.pnpm/[email protected]/node_modules/scramjet-core/lib/data-stream.js:186:26) 

To Reproduce

import { BigQuery } from '@google-cloud/bigquery';

const bq = new BigQuery();

async function bqStreamToMSSQL(
  trx: Knex.Transaction<any, any>,
  table: string,
  query: string
) {

// BigQuery.createQueryStream: (options?: Query) => ResourceStream<any>
  return await bq
    .createQueryStream(query)
    .pipe(new DataStream({ maxParallel: 1 }))
    .timeBatch(7000, 10000)
    .map((row) => ({ ...row, at_ingest: row.at_ingest.value }))
    .do(async (row) => {
      await trx
        .delete()
        .from(table)
        .where('pk', '=', row.pk)
        .andWhere('at_ingest', '<=', row.at_ingest);

      await trx.insert(row).into(table);
    })
    .run();
}

Maybe it is a limitation of the BigQuery method, or maybe I'm doing this wrong.
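A likely explanation, not confirmed in this thread: after `.batch()` or `.timeBatch()`, each chunk that reaches the following `.map()` is an array of rows rather than a single row, so `row.at_ingest` is `undefined` and reading `.value` throws. The per-row mapping then has to run over the rows inside each batch. The plain-JavaScript sketch below shows that shape change; `batchItems` is a hypothetical count-based batcher standing in for `.batch()`/`.timeBatch()`, and `normaliseBatches` is likewise illustrative.

```javascript
// Hypothetical count-based batcher: groups items into arrays of up to `size`.
async function* batchItems(source, size) {
  let batch = [];
  for await (const item of source) {
    batch.push(item);
    if (batch.length === size) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch;
}

// After batching, each chunk is an array, so per-row work must map over it.
async function normaliseBatches(rows, size) {
  const out = [];
  for await (const batch of batchItems(rows, size)) {
    // batch.at_ingest would be undefined here; unwrap { value } on each row instead.
    out.push(batch.map((row) => ({ ...row, at_ingest: row.at_ingest.value })));
  }
  return out;
}
```

The same applies to the `.do()` step, which would need to loop over the rows of each batch before deleting and inserting them.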

An in-range update of scramjet-core is breaking the build 🚨

The dependency scramjet-core was updated from 4.21.0 to 4.22.0.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

scramjet-core is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • ✅ continuous-integration/appveyor/branch: AppVeyor build succeeded (Details).
  • ❌ continuous-integration/travis-ci/push: The Travis CI build failed (Details).

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Async iterators should work in flatMap

Is your feature request related to a problem? Please describe.
flatMap is a powerful function, but it should support async iteration.

There may be additional methods needing this.

Describe the solution you'd like

This simple example should not error:

DataStream.from(generator1)
    .flatMap(async function*(chunk) { yield* await chunk.items(); })
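For reference, the requested semantics can be sketched in plain JavaScript: the mapper may return an array, a sync iterable, or an async iterable (such as an async generator object), and every produced item is emitted in order. `flatMapAsync` is a hypothetical helper, not scramjet's implementation.

```javascript
// Flat-map over an (async) iterable; the mapper may return (a promise of) an
// array or sync iterable, or an async iterable such as an async generator object.
async function* flatMapAsync(source, mapper) {
  for await (const chunk of source) {
    const result = await mapper(chunk);
    // for await accepts both sync and async iterables.
    for await (const item of result) {
      yield item;
    }
  }
}
```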
