/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* @author Jorge Bay Gondra
*/
'use strict';
const EventEmitter = require('events');
const WebSocket = require('ws');
const util = require('util');
const utils = require('../utils');
const serializer = require('../structure/io/graph-serializer');
const ResultSet = require('./result-set');
const responseStatusCode = {
success: 200,
noContent: 204,
partialContent: 206,
authenticationChallenge: 407,
};
const defaultMimeType = 'application/vnd.gremlin-v3.0+json';
const pingIntervalDelay = 60 * 1000;
const pongTimeoutDelay = 30 * 1000;
/**
* Represents a single connection to a Gremlin Server.
*/
class Connection extends EventEmitter {
/**
* Creates a new instance of {@link Connection}.
* @param {String} url The resource uri.
* @param {Object} [options] The connection options.
* @param {Array} [options.ca] Trusted certificates.
* @param {String|Array|Buffer} [options.cert] The certificate key.
* @param {String} [options.mimeType] The mime type to use.
* @param {String|Buffer} [options.pfx] The private key, certificate, and CA certs.
* @param {GraphSONReader} [options.reader] The reader to use.
* @param {Boolean} [options.rejectUnauthorized] Determines whether to verify or not the server certificate.
* @param {String} [options.traversalSource] The traversal source. Defaults to: 'g'.
* @param {GraphSONWriter} [options.writer] The writer to use.
* @param {Authenticator} [options.authenticator] The authentication handler to use.
* @param {Object} [options.headers] An associative array containing the additional header key/values for the initial request.
* @param {Boolean} [options.pingEnabled] Setup ping interval. Defaults to: true.
* @param {Number} [options.pingInterval] Ping request interval in ms if ping enabled. Defaults to: 60000.
* @param {Number} [options.pongTimeout] Timeout of pong response in ms after sending a ping. Defaults to: 30000.
* @param {Boolean} [options.connectOnStartup] Open websocket on startup. Defaults to: true.
* @constructor
*/
constructor(url, options) {
super();
this.url = url;
this.options = options = options || {};
// A map containing the request id and the handler
this._responseHandlers = {};
this._reader = options.reader || new serializer.GraphSONReader();
this._writer = options.writer || new serializer.GraphSONWriter();
this._openPromise = null;
this._openCallback = null;
this._closePromise = null;
this._closeCallback = null;
this._pingInterval = null;
this._pongTimeout = null;
/**
* Gets the MIME type.
* @type {String}
*/
this.mimeType = options.mimeType || defaultMimeType;
this._header = String.fromCharCode(this.mimeType.length) + this.mimeType;
this.isOpen = false;
this.traversalSource = options.traversalSource || 'g';
this._authenticator = options.authenticator;
this._pingEnabled = this.options.pingEnabled === false ? false : true;
this._pingIntervalDelay = this.options.pingInterval || pingIntervalDelay;
this._pongTimeoutDelay = this.options.pongTimeout || pongTimeoutDelay;
if (this.options.connectOnStartup !== false) {
this.open();
}
}
/**
* Opens the connection, if its not already opened.
* @returns {Promise}
*/
open() {
if (this.isOpen) {
return Promise.resolve();
}
if (this._openPromise) {
return this._openPromise;
}
this.emit('log', `ws open`);
this._ws = new WebSocket(this.url, {
headers: this.options.headers,
ca: this.options.ca,
cert: this.options.cert,
pfx: this.options.pfx,
rejectUnauthorized: this.options.rejectUnauthorized
});
this._ws.on('message', (data) => this._handleMessage(data));
this._ws.on('error', (err) => this._handleError(err));
this._ws.on('close', (code, message) => this._handleClose(code, message));
this._ws.on('pong', () => {
this.emit('log', 'ws pong received');
if (this._pongTimeout) {
clearTimeout(this._pongTimeout);
this._pongTimeout = null;
}
});
this._ws.on('ping', () => {
this.emit('log', 'ws ping received');
this._ws.pong();
});
return this._openPromise = new Promise((resolve, reject) => {
this._ws.on('open', () => {
this.isOpen = true;
if (this._pingEnabled) {
this._pingHeartbeat();
}
resolve();
});
});
}
/** @override */
submit(bytecode, op, args, requestId, processor) {
return this.open().then(() => new Promise((resolve, reject) => {
if (requestId === null || requestId === undefined) {
requestId = utils.getUuid();
this._responseHandlers[requestId] = {
callback: (err, result) => err ? reject(err) : resolve(result),
result: null
};
}
const message = Buffer.from(this._header + JSON.stringify(this._getRequest(requestId, bytecode, op, args, processor)));
this._ws.send(message);
}));
}
_getRequest(id, bytecode, op, args, processor) {
if (args) {
args = this._adaptArgs(args, true);
}
return ({
'requestId': { '@type': 'g:UUID', '@value': id },
'op': op || 'bytecode',
// if using op eval need to ensure processor stays unset if caller didn't set it.
'processor': (!processor && op !== 'eval') ? 'traversal' : processor,
'args': args || {
'gremlin': this._writer.adaptObject(bytecode),
'aliases': { 'g': this.traversalSource }
}
});
}
_pingHeartbeat() {
if (this._pingInterval) {
clearInterval(this._pingInterval);
this._pingInterval = null;
}
this._pingInterval = setInterval(() => {
if (this.isOpen === false) {
// in case of if not open..
if (this._pingInterval) {
clearInterval(this._pingInterval);
this._pingInterval = null;
}
}
this._pongTimeout = setTimeout(() => {
this._ws.terminate();
}, this._pongTimeoutDelay);
this._ws.ping();
}, this._pingIntervalDelay);
}
_handleError(err) {
this.emit('log', `ws error ${err}`);
this._cleanupWebsocket();
this.emit('error', err);
}
_handleClose(code, message) {
this.emit('log', `ws close code=${code} message=${message}`);
this._cleanupWebsocket();
if (this._closeCallback) {
this._closeCallback();
}
this.emit('close', code, message);
}
_handleMessage(data) {
const response = this._reader.read(JSON.parse(data.toString()));
if (response.requestId === null || response.requestId === undefined) {
// There was a serialization issue on the server that prevented the parsing of the request id
// We invoke any of the pending handlers with an error
Object.keys(this._responseHandlers).forEach(requestId => {
const handler = this._responseHandlers[requestId];
this._clearHandler(requestId);
if (response.status !== undefined && response.status.message) {
return handler.callback(
new Error(util.format(
'Server error (no request information): %s (%d)', response.status.message, response.status.code)));
} else {
return handler.callback(new Error(util.format('Server error (no request information): %j', response)));
}
});
return;
}
const handler = this._responseHandlers[response.requestId];
if (!handler) {
// The handler for a given request id was not found
// It was probably invoked earlier due to a serialization issue.
return;
}
if (response.status.code === responseStatusCode.authenticationChallenge && this._authenticator) {
this._authenticator.evaluateChallenge(response.result.data).then(res => {
return this.submit(null, 'authentication', res, response.requestId);
}).catch(handler.callback);
return;
}
else if (response.status.code >= 400) {
// callback in error
return handler.callback(
new Error(util.format('Server error: %s (%d)', response.status.message, response.status.code)));
}
switch (response.status.code) {
case responseStatusCode.noContent:
this._clearHandler(response.requestId);
return handler.callback(null, new ResultSet(utils.emptyArray));
case responseStatusCode.partialContent:
handler.result = handler.result || [];
handler.result.push.apply(handler.result, response.result.data);
break;
default:
if (handler.result) {
handler.result.push.apply(handler.result, response.result.data);
}
else {
handler.result = response.result.data;
}
this._clearHandler(response.requestId);
return handler.callback(null, new ResultSet(handler.result));
}
}
/**
* clean websocket context
*/
_cleanupWebsocket() {
if (this._pingInterval) {
clearInterval(this._pingInterval);
}
this._pingInterval = null;
if (this._pongTimeout) {
clearTimeout(this._pongTimeout);
}
this._pongTimeout = null;
this._ws.removeAllListeners();
this._openPromise = null;
this._closePromise = null;
this.isOpen = false;
}
/**
* Clears the internal state containing the callback and result buffer of a given request.
* @param requestId
* @private
*/
_clearHandler(requestId) {
delete this._responseHandlers[requestId];
}
/**
* Takes the given args map and ensures all arguments are passed through to _write.adaptObject
* @param {Object} args Map of arguments to process.
* @param {Boolean} protocolLevel Determines whether it's a protocol level binding.
* @returns {Object}
* @private
*/
_adaptArgs(args, protocolLevel) {
if (args instanceof Object) {
let newObj = {};
Object.keys(args).forEach((key) => {
// bindings key (at the protocol-level needs special handling. without this, it wraps the generated Map
// in another map for types like EnumValue. Could be a nicer way to do this but for now it's solving the
// problem with script submission of non JSON native types
if (protocolLevel && key === 'bindings')
newObj[key] = this._adaptArgs(args[key], false);
else
newObj[key] = this._writer.adaptObject(args[key]);
});
return newObj;
}
return args;
}
/**
* Closes the Connection.
* @return {Promise}
*/
close() {
if (this.isOpen === false) {
return Promise.resolve();
}
if (!this._closePromise) {
this._closePromise = new Promise(resolve => {
this._closeCallback = resolve;
this._ws.close();
});
}
return this._closePromise;
}
}
module.exports = Connection;