From 99849b29e002a0db1a087ea1f2fefedc7dd8dcf3 Mon Sep 17 00:00:00 2001 From: Alexander Wolden Date: Thu, 4 Jan 2018 15:43:27 -0800 Subject: [PATCH] Delta support (#126) * initial delta support * added tests for delta feature * updated documentation and default * fixed type in config * fixed eslint errors * switching travis to modern node * Additional test for test coverage to pass. * Additional validateConfig() test. * fixed mislabeled variable * fixed sync issue caused by validation of deleted instances --- .travis.yml | 3 +- README.md | 1 + package.json | 2 +- src/EurekaClient.js | 129 +++++++++++++++++++---- src/defaultConfig.js | 1 + src/deltaUtils.js | 17 ++++ test/EurekaClient.test.js | 209 +++++++++++++++++++++++++++++++++++--- test/deltaUtil.test.js | 39 +++++++ 8 files changed, 363 insertions(+), 38 deletions(-) create mode 100644 src/deltaUtils.js create mode 100644 test/deltaUtil.test.js diff --git a/.travis.yml b/.travis.yml index 3c84329..09e50ba 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,10 +6,9 @@ services: - docker node_js: - - "0.10" - - "0.12" - "4" - "6" + - "8" script: - npm run test && npm run integration diff --git a/README.md b/README.md index 165065d..6d1302a 100644 --- a/README.md +++ b/README.md @@ -193,6 +193,7 @@ option | default value | description ---- | --- | --- `requestMiddleware` | noop | Custom middleware function to modify the outgoing [request](https://www.npmjs.com/package/request) to eureka `logger` | console logging | logger implementation for the client to use +`shouldUseDelta` | false | Experimental mode to fetch deltas from eureka instead of full registry on update `eureka.maxRetries` | `3` | Number of times to retry all requests to eureka `eureka.requestRetryDelay` | `500` | milliseconds to wait between retries. This will be multiplied by the # of failed retries. `eureka.heartbeatInterval` | `30000` | milliseconds to wait between heartbeats diff --git a/package.json b/package.json index 6766663..e3fcb94 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "eureka-js-client", - "version": "4.3.0", + "version": "4.4.0", "description": "A JavaScript implementation the Netflix OSS service registry, Eureka.", "main": "lib/index.js", "scripts": { diff --git a/src/EurekaClient.js b/src/EurekaClient.js index feb29a2..2208fef 100644 --- a/src/EurekaClient.js +++ b/src/EurekaClient.js @@ -1,7 +1,8 @@ import request from 'request'; import fs from 'fs'; import yaml from 'js-yaml'; -import merge from 'lodash/merge'; +import { merge, findIndex } from 'lodash'; +import { normalizeDelta, findInstance } from './deltaUtils'; import path from 'path'; import { series, waterfall } from 'async'; import { EventEmitter } from 'events'; @@ -69,6 +70,8 @@ export default class Eureka extends EventEmitter { this.requestMiddleware = this.config.requestMiddleware; + this.hasFullRegistry = false; + if (this.amazonDataCenter) { this.metadataClient = new AwsMetadata({ logger: this.logger, @@ -328,9 +331,20 @@ export default class Eureka extends EventEmitter { } /* - Retrieves all applications registered with the Eureka server + Orchestrates fetching registry */ fetchRegistry(callback = noop) { + if (this.config.shouldUseDelta && this.hasFullRegistry) { + this.fetchDelta(callback); + } else { + this.fetchFullRegistry(callback); + } + } + + /* + Retrieves all applications registered with the Eureka server + */ + fetchFullRegistry(callback = noop) { this.eurekaRequest({ uri: '', headers: { @@ -338,22 +352,51 @@ export default class Eureka extends EventEmitter { }, }, (error, response, body) => { if (!error && response.statusCode === 200) { - this.logger.debug('retrieved registry successfully'); + this.logger.debug('retrieved full registry successfully'); try { this.transformRegistry(JSON.parse(body)); } catch (ex) { return callback(ex); } this.emit('registryUpdated'); + this.hasFullRegistry = true; return callback(null); } else if (error) { this.logger.warn('Error fetching registry', error); return callback(error); } - callback(new Error('Unable to retrieve registry from Eureka server')); + callback(new Error('Unable to retrieve full registry from Eureka server')); }); } + /* + Retrieves all applications registered with the Eureka server + */ + fetchDelta(callback = noop) { + this.eurekaRequest({ + uri: 'delta', + headers: { + Accept: 'application/json', + }, + }, (error, response, body) => { + if (!error && response.statusCode === 200) { + this.logger.debug('retrieved delta successfully'); + let applications; + try { + const jsonBody = JSON.parse(body); + applications = jsonBody.applications.application; + this.handleDelta(this.cache, applications); + return callback(null); + } catch (ex) { + return callback(ex); + } + } else if (error) { + this.logger.warn('Error fetching delta registry', error); + return callback(error); + } + callback(new Error('Unable to retrieve delta registry from Eureka server')); + }); + } /* Transforms the given registry and caches the registry locally */ @@ -382,24 +425,11 @@ export default class Eureka extends EventEmitter { */ transformApp(app, cache) { if (app.instance.length) { - const instances = app.instance.filter((instance) => (this.validateInstance(instance))); - cache.app[app.name.toUpperCase()] = instances; - instances.forEach((inst) => { - const vipAddresses = this.splitVipAddress(inst.vipAddress); - vipAddresses.forEach((vipAddress) => { - if (!cache.vip[vipAddress]) { - cache.vip[vipAddress] = []; - } - cache.vip[vipAddress].push(inst); - }); - }); + app.instance + .filter(this.validateInstance.bind(this)) + .forEach((inst) => this.addInstance(cache, inst)); } else if (this.validateInstance(app.instance)) { - const instances = [app.instance]; - const vipAddresses = this.splitVipAddress(app.instance.vipAddress); - vipAddresses.forEach((vipAddress) => { - cache.vip[vipAddress] = instances; - }); - cache.app[app.name.toUpperCase()] = instances; + this.addInstance(cache, app.instance); } } @@ -421,6 +451,63 @@ export default class Eureka extends EventEmitter { return vipAddress.split(','); } + handleDelta(cache, appDelta) { + const delta = normalizeDelta(appDelta); + delta.forEach((app) => { + app.instance.forEach((instance) => { + switch (instance.actionType) { + case 'ADDED': this.addInstance(cache, instance); break; + case 'MODIFIED': this.modifyInstance(cache, instance); break; + case 'DELETED': this.deleteInstance(cache, instance); break; + default: this.logger.warn('Unknown delta actionType', instance.actionType); break; + } + }); + }); + } + + addInstance(cache, instance) { + if (!this.validateInstance(instance)) return; + const vipAddresses = this.splitVipAddress(instance.vipAddress); + const appName = instance.app.toUpperCase(); + vipAddresses.forEach((vipAddress) => { + const alreadyContains = findIndex(cache.vip[vipAddress], findInstance(instance)) > -1; + if (alreadyContains) return; + if (!cache.vip[vipAddress]) { + cache.vip[vipAddress] = []; + } + cache.vip[vipAddress].push(instance); + }); + if (!cache.app[appName]) cache.app[appName] = []; + const alreadyContains = findIndex(cache.app[appName], findInstance(instance)) > -1; + if (alreadyContains) return; + cache.app[appName].push(instance); + } + + modifyInstance(cache, instance) { + if (!this.validateInstance(instance)) return; + const vipAddresses = this.splitVipAddress(instance.vipAddress); + const appName = instance.app.toUpperCase(); + vipAddresses.forEach((vipAddress) => { + const index = findIndex(cache.vip[vipAddress], findInstance(instance)); + if (index > -1) cache.vip[vipAddress].splice(index, 1, instance); + else this.addInstance(cache, instance); + }); + const index = findIndex(cache.app[appName], findInstance(instance)); + if (index > -1) cache.app[appName].splice(cache.vip[instance.vipAddress], 1, instance); + else this.addInstance(cache, instance); + } + + deleteInstance(cache, instance) { + const vipAddresses = this.splitVipAddress(instance.vipAddress); + const appName = instance.app.toUpperCase(); + vipAddresses.forEach((vipAddress) => { + const index = findIndex(cache.vip[vipAddress], findInstance(instance)); + if (index > -1) cache.vip[vipAddress].splice(index, 1); + }); + const index = findIndex(cache.app[appName], findInstance(instance)); + if (index > -1) cache.app[appName].splice(cache.vip[instance.vipAddress], 1); + } + /* Fetches the metadata using the built-in client and updates the instance configuration with the hostname and IP address. If the value of the config diff --git a/src/defaultConfig.js b/src/defaultConfig.js index f89d7f7..3c97d91 100644 --- a/src/defaultConfig.js +++ b/src/defaultConfig.js @@ -1,6 +1,7 @@ // Default configuration values: export default { requestMiddleware: (request, done) => done(request), + shouldUseDelta: false, eureka: { heartbeatInterval: 30000, registryFetchInterval: 30000, diff --git a/src/deltaUtils.js b/src/deltaUtils.js new file mode 100644 index 0000000..a7bbe96 --- /dev/null +++ b/src/deltaUtils.js @@ -0,0 +1,17 @@ +/* + General utilities for handling processing of delta changes from eureka. +*/ +export function arrayOrObj(mysteryValue) { + return Array.isArray(mysteryValue) ? mysteryValue : [mysteryValue]; +} + +export function findInstance(a) { + return b => a.hostName === b.hostName && a.port.$ === b.port.$; +} + +export function normalizeDelta(appDelta) { + return arrayOrObj(appDelta).map((app) => { + app.instance = arrayOrObj(app.instance); + return app; + }); +} diff --git a/test/EurekaClient.test.js b/test/EurekaClient.test.js index 9e77801..f1e0d0d 100644 --- a/test/EurekaClient.test.js +++ b/test/EurekaClient.test.js @@ -1,4 +1,4 @@ -/* eslint-disable no-unused-expressions */ +/* eslint-disable no-unused-expressions, max-len */ import sinon from 'sinon'; import chai, { expect } from 'chai'; import sinonChai from 'sinon-chai'; @@ -615,6 +615,14 @@ describe('Eureka client', () => { } expect(badConfig).to.throw(TypeError); }); + + it('should throw an exception with an invalid request middleware', () => { + function badConfig() { + config.requestMiddleware = 'invalid middleware'; + return new Eureka(config); + } + expect(badConfig).to.throw(TypeError); + }); }); describe('getInstancesByAppId()', () => { @@ -680,11 +688,13 @@ describe('Eureka client', () => { config = makeConfig(); client = new Eureka(config); sinon.stub(client, 'transformRegistry'); + sinon.stub(client, 'handleDelta'); }); afterEach(() => { request.get.restore(); client.transformRegistry.restore(); + client.handleDelta.restore(); }); it('should should trigger registryUpdated event', () => { @@ -709,13 +719,41 @@ describe('Eureka client', () => { expect(registryCb).to.have.been.calledWithMatch(null); }); + it('should call registry URI for delta', () => { + sinon.stub(request, 'get').yields(null, { statusCode: 200 }, '{ "applications": {} }'); + const registryCb = sinon.spy(); + client.config.shouldUseDelta = true; + client.hasFullRegistry = true; + client.fetchRegistry(registryCb); + + expect(request.get).to.have.been.calledWithMatch({ + baseUrl: 'http://127.0.0.1:9999/eureka/v2/apps/', + uri: 'delta', + headers: { Accept: 'application/json' }, + }); + + expect(registryCb).to.have.been.calledWithMatch(null); + }); + it('should throw error for non-200 response', () => { sinon.stub(request, 'get').yields(null, { statusCode: 500 }, null); const registryCb = sinon.spy(); client.fetchRegistry(registryCb); expect(registryCb).to.have.been.calledWithMatch({ - message: 'Unable to retrieve registry from Eureka server', + message: 'Unable to retrieve full registry from Eureka server', + }); + }); + + it('should throw error for non-200 response for delta', () => { + sinon.stub(request, 'get').yields(null, { statusCode: 500 }, null); + const registryCb = sinon.spy(); + client.config.shouldUseDelta = true; + client.hasFullRegistry = true; + client.fetchRegistry(registryCb); + + expect(registryCb).to.have.been.calledWithMatch({ + message: 'Unable to retrieve delta registry from Eureka server', }); }); @@ -727,6 +765,16 @@ describe('Eureka client', () => { expect(registryCb).to.have.been.calledWithMatch({ message: 'request error' }); }); + it('should throw error for request error for delta request', () => { + sinon.stub(request, 'get').yields(new Error('request error'), null, null); + const registryCb = sinon.spy(); + client.config.shouldUseDelta = true; + client.hasFullRegistry = true; + client.fetchRegistry(registryCb); + + expect(registryCb).to.have.been.calledWithMatch({ message: 'request error' }); + }); + it('should throw error on invalid JSON', () => { sinon.stub(request, 'get').yields(null, { statusCode: 200 }, '{ blah'); const registryCb = sinon.spy(); @@ -734,6 +782,16 @@ describe('Eureka client', () => { expect(registryCb).to.have.been.calledWith(new SyntaxError()); }); + + it('should throw error on invalid JSON for delta request', () => { + sinon.stub(request, 'get').yields(null, { statusCode: 200 }, '{ blah'); + const registryCb = sinon.spy(); + client.config.shouldUseDelta = true; + client.hasFullRegistry = true; + client.fetchRegistry(registryCb); + + expect(registryCb).to.have.been.calledWith(new SyntaxError()); + }); }); describe('transformRegistry()', () => { @@ -743,6 +801,8 @@ describe('Eureka client', () => { let instance1; let instance2; let instance3; + let instance4; + let instance5; let app1; let app2; let app3; @@ -751,15 +811,24 @@ describe('Eureka client', () => { registry = { applications: { application: {} }, }; - instance1 = { host: '127.0.0.1', port: 1000, vipAddress: 'vip1', status: 'UP' }; - instance2 = { host: '127.0.0.2', port: 2000, vipAddress: 'vip2', status: 'UP' }; - instance3 = { host: '127.0.0.2', port: 2000, vipAddress: 'vip2', status: 'UP' }; + instance1 = { hostName: '127.0.0.1', port: { $: 1000 }, app: 'theapp', vipAddress: 'vip1', status: 'UP' }; + instance2 = { hostName: '127.0.0.2', port: { $: 2000 }, app: 'theapptwo', vipAddress: 'vip2', status: 'UP' }; + instance3 = { hostName: '127.0.0.3', port: { $: 2000 }, app: 'theapp', vipAddress: 'vip2', status: 'UP' }; + instance4 = { hostName: '127.0.0.4', port: { $: 2000 }, app: 'theappthree', vipAddress: 'vip3', status: 'UP' }; + instance5 = { hostName: '127.0.0.5', port: { $: 2000 }, app: 'theappthree', vipAddress: 'vip2', status: 'UP' }; + app1 = { name: 'theapp', instance: instance1 }; app2 = { name: 'theapptwo', instance: [instance2, instance3] }; - app3 = { name: 'theappthree', instance: [instance1, instance3] }; + app3 = { name: 'theappthree', instance: [instance5, instance4] }; client = new Eureka(config); }); + it('should noop if empty registry', () => { + client.transformRegistry(undefined); + expect(client.cache.vip).to.be.empty; + expect(client.cache.app).to.be.empty; + }); + it('should return clear the cache if no applications exist', () => { registry.applications.application = null; client.transformRegistry(registry); @@ -777,7 +846,7 @@ describe('Eureka client', () => { it('should transform a registry with two or more apps', () => { registry.applications.application = [app1, app2]; client.transformRegistry(registry); - expect(client.cache.app[app2.name.toUpperCase()].length).to.equal(2); + expect(client.cache.app[app1.name.toUpperCase()].length).to.equal(2); expect(client.cache.vip[instance2.vipAddress].length).to.equal(2); }); @@ -785,8 +854,8 @@ describe('Eureka client', () => { registry.applications.application = [app3]; client.transformRegistry(registry); expect(client.cache.app[app3.name.toUpperCase()].length).to.equal(2); - expect(client.cache.vip[instance1.vipAddress].length).to.equal(1); - expect(client.cache.vip[instance3.vipAddress].length).to.equal(1); + expect(client.cache.vip[instance5.vipAddress].length).to.equal(1); + expect(client.cache.vip[instance4.vipAddress].length).to.equal(1); }); }); @@ -809,11 +878,11 @@ describe('Eureka client', () => { client = new Eureka(config); theVip = 'theVip'; multiVip = 'fooVip,barVip'; - instance1 = { host: '127.0.0.1', port: 1000, vipAddress: theVip, status: 'UP' }; - instance2 = { host: '127.0.0.2', port: 2000, vipAddress: theVip, status: 'UP' }; - instance3 = { host: '127.0.0.2', port: 2000, vipAddress: multiVip, status: 'UP' }; - instance4 = { host: '127.0.0.2', port: 2000, vipAddress: void 0, status: 'UP' }; - downInstance = { host: '127.0.0.2', port: 2000, vipAddress: theVip, status: 'DOWN' }; + instance1 = { hostName: '127.0.0.1', port: 1000, vipAddress: theVip, app: 'theapp', status: 'UP' }; + instance2 = { hostName: '127.0.0.2', port: 2000, vipAddress: theVip, app: 'theapp', status: 'UP' }; + instance3 = { hostName: '127.0.0.5', port: 2000, vipAddress: multiVip, app: 'theapp', status: 'UP' }; + instance4 = { hostName: '127.0.0.6', port: 2000, vipAddress: void 0, app: 'theapp', status: 'UP' }; + downInstance = { hostName: '127.0.0.7', port: 2000, app: 'theapp', vipAddress: theVip, status: 'DOWN' }; app = { name: 'theapp' }; cache = { app: {}, vip: {} }; }); @@ -1050,4 +1119,116 @@ describe('Eureka client', () => { }); }); }); + + + describe('handleDelta()', () => { + let client; + beforeEach(() => { + const config = makeConfig({ shouldUseDelta: true }); + client = new Eureka(config); + }); + + it('should add instances', () => { + const appDelta = [ + { + instance: [ + { hostName: '127.0.0.1', port: { $: 1000 }, app: 'THEAPP', vipAddress: 'thevip', status: 'UP', actionType: 'ADDED' }, + ], + }, + ]; + + client.handleDelta(client.cache, appDelta); + expect(client.cache.vip.thevip).to.have.length(1); + expect(client.cache.app.THEAPP).to.have.length(1); + }); + + it('should handle duplicate instances on add', () => { + const appDelta = [ + { + instance: [ + { hostName: '127.0.0.1', port: { $: 1000 }, app: 'THEAPP', vipAddress: 'thevip', status: 'UP', actionType: 'ADDED' }, + { hostName: '127.0.0.1', port: { $: 1000 }, app: 'THEAPP', vipAddress: 'thevip', status: 'UP', actionType: 'ADDED' }, + ], + }, + ]; + + client.handleDelta(client.cache, appDelta); + expect(client.cache.vip.thevip).to.have.length(1); + expect(client.cache.app.THEAPP).to.have.length(1); + }); + + it('should modify instances', () => { + const appDelta = [ + { + instance: [ + { hostName: '127.0.0.1', port: { $: 1000 }, app: 'THEAPP', vipAddress: 'thevip', status: 'UP', actionType: 'MODIFIED', newProp: 'foo' }, + ], + }, + ]; + const original = { hostName: '127.0.0.1', port: { $: 1000 }, app: 'THEAPP', vipAddress: 'thevip', status: 'UP', actionType: 'MODIFIED' }; + client.cache = { + app: { THEAPP: [original] }, + vip: { thevip: [original] }, + }; + + client.handleDelta(client.cache, appDelta); + expect(client.cache.vip.thevip).to.have.length(1); + expect(client.cache.app.THEAPP).to.have.length(1); + expect(client.cache.vip.thevip[0]).to.have.property('newProp'); + expect(client.cache.app.THEAPP[0]).to.have.property('newProp'); + }); + + it('should add if instance doesnt exist when modifying', () => { + const appDelta = [ + { + instance: [ + { hostName: '127.0.0.1', port: { $: 1000 }, app: 'THEAPP', vipAddress: 'thevip', status: 'UP', actionType: 'MODIFIED', newProp: 'foo' }, + ], + }, + ]; + + client.handleDelta(client.cache, appDelta); + expect(client.cache.vip.thevip).to.have.length(1); + expect(client.cache.app.THEAPP).to.have.length(1); + expect(client.cache.vip.thevip[0]).to.have.property('newProp'); + expect(client.cache.app.THEAPP[0]).to.have.property('newProp'); + }); + + it('should delete instances', () => { + const appDelta = [ + { + instance: [ + { hostName: '127.0.0.1', port: { $: 1000 }, app: 'THEAPP', vipAddress: 'thevip', status: 'UP', actionType: 'DELETED', newProp: 'foo' }, + ], + }, + ]; + const original = { hostName: '127.0.0.1', port: { $: 1000 }, app: 'THEAPP', vipAddress: 'thevip', status: 'UP', actionType: 'ADDED' }; + client.cache = { + app: { THEAPP: [original] }, + vip: { thevip: [original] }, + }; + + client.handleDelta(client.cache, appDelta); + expect(client.cache.vip.thevip).to.have.length(0); + expect(client.cache.app.THEAPP).to.have.length(0); + }); + + it('should not delete instances if they do not exist', () => { + const appDelta = [ + { + instance: [ + { hostName: '127.0.0.1', port: { $: 1000 }, app: 'THEAPP', vipAddress: 'thevip', status: 'UP', actionType: 'DELETED', newProp: 'foo' }, + ], + }, + ]; + client.cache = { + app: { THEAPP: [] }, + vip: { thevip: [] }, + }; + + client.handleDelta(client.cache, appDelta); + expect(client.cache.vip.thevip).to.have.length(0); + expect(client.cache.app.THEAPP).to.have.length(0); + }); + }); }); diff --git a/test/deltaUtil.test.js b/test/deltaUtil.test.js new file mode 100644 index 0000000..cb80a6a --- /dev/null +++ b/test/deltaUtil.test.js @@ -0,0 +1,39 @@ +import { expect } from 'chai'; +import { arrayOrObj, findInstance, normalizeDelta } from '../src/deltaUtils'; + +describe('deltaUtils', () => { + describe('arrayOrObj', () => { + it('should return same array if passed an array', () => { + const arr = ['foo']; + expect(arrayOrObj(arr)).to.equal(arr); + }); + it('should return an array containing obj', () => { + const obj = {}; + expect(arrayOrObj(obj)[0]).to.equal(obj); + }); + }); + describe('findInstance', () => { + it('should return true if objects match', () => { + const obj1 = { hostName: 'foo', port: { $: '6969' } }; + const obj2 = { hostName: 'foo', port: { $: '6969' } }; + expect(findInstance(obj1)(obj2)).to.equal(true); + }); + it('should return false if objects do not match', () => { + const obj1 = { hostName: 'foo', port: { $: '6969' } }; + const obj2 = { hostName: 'bar', port: { $: '1111' } }; + expect(findInstance(obj1)(obj2)).to.equal(false); + }); + }); + describe('normalizeDelta', () => { + it('should normalize nested objs to arrays', () => { + const delta = { + instance: { + hostName: 'foo', port: { $: '6969' }, + }, + }; + const normalized = normalizeDelta(delta); + expect(normalized).to.be.an('array'); + expect(normalized[0].instance).to.be.an('array'); + }); + }); +});