Comparison between different NoSQL technologies

 

Relational databases were not designed to cope with the scale and agility challenges that face modern applications, nor were they built to take advantage of the commodity storage and processing power available today.

NoSQL encompasses a wide variety of database technologies that were developed in response to the demands of building modern applications.

The Benefits of NoSQL

When compared to relational databases, NoSQL databases are more scalable and provide superior performance, and their data model addresses several issues that the relational model is not designed to address:

  • Dynamic Schema – NoSQL databases support large volumes of structured, semi-structured, and unstructured data without a fixed schema. This is especially useful in agile development environments where the schema changes frequently; in a relational database such changes would require migrating the entire database, involving significant downtime.
  • Automatic Sharding – NoSQL databases usually support auto-sharding, meaning that they natively and automatically spread data across an arbitrary number of servers, without requiring the application to even be aware of the composition of the server pool. Data and query load are automatically balanced across servers, and when a server goes down, it can be quickly and transparently replaced with no application disruption.
  • Replication

    Most NoSQL databases also support automatic database replication to maintain availability in the event of outages or planned maintenance events. More sophisticated NoSQL databases are fully self-healing, offering automated failover and recovery, as well as the ability to distribute the database across multiple geographic regions to withstand regional failures and enable data localization. Unlike relational databases, NoSQL databases generally have no requirement for separate applications or expensive add-ons to implement replication.

  • Integrated Caching

    Many NoSQL database technologies have excellent integrated caching capabilities, keeping frequently used data in system memory as much as possible and removing the need for a separate caching layer. Some NoSQL databases also offer a fully managed, integrated in-memory database management layer for workloads demanding the highest throughput and lowest latency.

NoSQL Database Types

  • Key-value stores are the simplest NoSQL databases. Every single item in the database is stored as an attribute name (or ‘key’), together with its value. Examples of key-value stores are Riak and Berkeley DB. Some key-value stores, such as Redis, allow each value to have a type, such as ‘integer’, which adds functionality.
  • Wide-column stores such as Cassandra and HBase are optimized for queries over large datasets, and store columns of data together, instead of rows.
  • Document databases pair each key with a complex data structure known as a document. Documents can contain many different key-value pairs, or key-array pairs, or even nested documents.
  • Graph stores are used to store information about networks of data, such as social connections. Graph stores include Titan, Neo4J and Giraph.

Criteria for evaluating NoSQL technologies

  1. Data/Storage model: How is the data stored in the NoSQL database
  2. Use cases (suitable and not suitable): What are the use cases that can be supported due to the underlying storage model
  3. Limitations: What are the limitations due to the underlying storage model
  4. Supported data types: What are the supported data types for storing data
  5. Storage/Access costs: What are the storage and access costs
  6. Scalability: How does the database scale for large volumes of data
  7. Availability: How is the data replicated and partitioned to handle fault tolerance and provide high availability
  8. CAP model: Where each NoSQL technology fits in the CAP model
  9. Querying: What are the various querying capabilities provided (batch)
  10. Pagination of results: What is the capability provided by the NoSQL solution to return paginated results
  11. Indexes: What is the support for secondary indexes for faster non-key based lookups
  12. Transactions: What is the support provided for transactions
  13. Concurrency: How does the NoSQL technology deal with concurrent updates and what support does it have for dealing with concurrency
  14. Search: What are the search capabilities provided
  15. Multi-tenancy: What is the support for handling multi-tenancy if used in a SaaS software
  16. Client Libraries: What are the client libraries available for interacting with the NoSQL database
  17. Backup and Recovery: Whether the solution provides any mechanisms for backing up and recovering data

1. REDIS (Remote Dictionary Server)

Like memcached, Redis can also store a mapping of keys to values and can even achieve similar performance levels as memcached. Redis also supports the writing of its data to disk automatically in two different ways, and can store data in four structures in addition to plain string keys as memcached does. These additional capabilities allow Redis to be used either as a primary database or as an auxiliary database with other storage systems. Persistence can be optionally disabled, if you just need a feature-rich, networked, in-memory cache.

  1. Data/Storage Model

In traditional key-value stores we associate string keys with string values. In Redis, the value is not limited to a simple string but can also hold more complex data structures such as lists, sets, hashes (similar to maps) and sorted sets.

Redis primarily supports key based retrieval. Clients can use the capabilities provided by Redis in order to create secondary indexes of different kinds, including composite (multi-column) indexes.

https://stackoverflow.com/questions/4908197/data-modeling-practices-in-redis

https://redis.io/topics/twitter-clone

2. UseCases where Redis is suitable

  • Storing last login time for each user
  • Storing last uptime for various services (especially useful in a microservices architecture for implementing retries and circuit breakers)
  • LRU cache with automatic eviction, Session Cache, Full Page Cache
  • User Profile Information
  • Product details/reviews for e-commerce applications
  • Leaderboard/Counting etc
  • Storing temporary state that requires fast access. For example, storing the most recent views details for answers to quora questions. This can be used to batch the update calls to the primary storage (say BigTable or HBase) allowing us to throttle the writes to the underlying storage. Storing the views data for all answers in Redis does not add much value as all the data would not be accessed frequently. For more details please refer to Most Viewed Writers

Redis can also be used as a message queue and as a Pub/Sub messaging system.

UseCases where Redis is not suitable

  • Storing large amounts of data in a single string value (e.g. the most recent feed contents for each user). This means queries on these keys will be slow, which will block other queries and thereby generally slow down all clients.
  • Storing data across two or more dimensions (e.g. a score for each (user, topic) pair). The data set size for these keys will likely grow superlinearly with the rest of the dataset, and we’ll have to reshard too soon.
  • Storing data that requires queries with high time complexity. Using a Redis list as a queue is fine (queries at the ends of the list take constant time), but if the list is long, queries that operate far from the ends of the list or on the entire list will be very slow. Some commands have unconditionally high time complexity (e.g. SMEMBERS, HGETALL, KEYS) and we’ll want to avoid those entirely.
  • Storing data that requires secondary access paths. Redis doesn’t have secondary indexes since it’s a key-value store, so we would have to implement them in the client, and using them would require multiple queries to the server.

3. Limitations

  • Each Redis instance is single-threaded. This means individual queries that take a long time are inadvisable, since they will block other queries to the same Redis instance until they complete. Starting with Redis 4.0, each instance can use more than one thread for a few operations, such as deleting objects in the background and blocking commands implemented via Redis modules.
  • The data set size is limited by the available memory on the machine on which the server is running. Redis will either be killed by the Linux kernel OOM killer, crash with an error, or will start to slow down so you’ll probably notice there is something wrong. Redis has built-in protections allowing the user to set a max limit to memory usage, using the maxmemory option in the configuration file to put a limit to the memory Redis can use. If this limit is reached Redis will start to reply with an error to write commands (but will continue to accept read-only commands), or you can configure it to evict keys when the max memory limit is reached in the case you are using Redis for caching.
  • Redis can handle up to 2^32 keys, and was tested in practice to handle at least 250 million keys per instance. Every hash, list, set, and sorted set can hold 2^32 elements. In other words, your limit is likely the available memory in your system.

4.  Supported data types

Strings, Lists, Hashes, Sets, Sorted Sets

5.  Storage/Access costs:

Since it is not a managed database, there are no specific read/write costs. The only cost incurred is to set up and maintain the Redis servers (master/slave) and, if using master/slave instances, Redis Sentinel for automatic failover. If using Redis Cluster, the cost incurred is to set up and maintain the Redis Cluster servers.

6. Scalability

Redis supports scaling with a master/slave architecture. Reads can be served by slave nodes, but writes have to go through the master. However, the size of the data set is still limited by the memory available on the underlying hardware.

7. Availability

Master/Slave Replication

At the base of Redis replication there is a very simple to use and configure master-slave replication that allows slave Redis servers to be exact copies of master servers. The slave will automatically reconnect to the master every time the link breaks, and will attempt to be an exact copy of it regardless of what happens to the master.

This system works using three main mechanisms:

  1. When a master and a slave instance are well-connected, the master keeps the slave updated by sending a stream of commands in order to replicate the effects on the dataset happening in the master dataset: client writes, keys expiring or evicted, and so forth.
  2. When the link between the master and the slave breaks, for network issues or because a timeout is sensed in the master or the slave, the slave reconnects and attempts to proceed with a partial resynchronization: it means that it will try to just obtain the part of the stream of commands it missed during the disconnection.
  3. When a partial resynchronization is not possible, the slave will ask for a full resynchronization. This will involve a more complex process in which the master needs to create a snapshot of all its data, send it to the slave, and then continue sending the stream of commands as the dataset changes.

Redis uses asynchronous replication by default, which, being low latency and high performance, is the natural replication mode for the vast majority of Redis use cases.

Zookeeper vs Redis

This is similar to how Zookeeper operates. The significant difference is that Zookeeper writes have to wait for a quorum of nodes to replicate the data before the write is considered successful.

Unlike Zookeeper, Redis is not a typical distributed system. Zookeeper nodes communicate with each other using the Zab (ZooKeeper Atomic Broadcast) protocol, which is similar to Paxos, for leader election and to sync writes with the leader. Since Redis replicates data asynchronously, its writes are significantly faster than Zookeeper's. Zookeeper is most suitable for coordination: tracking which nodes are active, leader election amongst a group, etc. Use Redis for datasets that need faster writes but where the occasional outage isn’t a disaster.

For more details, refer to Master Slave Replication

Sharding/Partitioning Strategies

Partitioning in Redis serves two main goals:

  • It allows for much larger databases, using the sum of the memory of many computers. Without partitioning you are limited to the amount of memory a single computer can support.
  • It allows scaling the computational power to multiple cores and multiple computers, and the network bandwidth to multiple computers and network adapters.

There are different partitioning criteria. Imagine we have four Redis instances R0, R1, R2, R3, and many keys representing users like user:1, user:2, … and so forth. We can find different ways to select the instance in which we store a given key. In other words, there are different systems to map a given key to a given Redis server.

One of the simplest ways to perform partitioning is range partitioning, which is accomplished by mapping ranges of objects to specific Redis instances. For example, I could say users from ID 0 to ID 10000 will go into instance R0, while users from ID 10001 to ID 20000 will go into instance R1, and so forth.

This system works and is actually used in practice. However, it has the disadvantage of requiring a table that maps ranges to instances. This table needs to be managed, and a table is needed for every kind of object, so range partitioning in Redis is often undesirable because it is much less efficient than alternative partitioning approaches.
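The range table described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the table contents, the names RANGE_TABLE and instance_for_user, and the ranges for R2 and R3 are hypothetical and extend the example in the text.

```python
from bisect import bisect_left

# Hypothetical range table: (inclusive upper bound of user ID, instance name).
# R0 and R1 mirror the example in the text; R2 and R3 are assumed extensions.
RANGE_TABLE = [
    (10000, "R0"),
    (20000, "R1"),
    (30000, "R2"),
    (40000, "R3"),
]
UPPER_BOUNDS = [upper for upper, _ in RANGE_TABLE]


def instance_for_user(user_id: int) -> str:
    """Return the instance whose ID range contains user_id."""
    if user_id < 0:
        raise ValueError("user IDs are assumed to be non-negative")
    # bisect_left finds the first range whose upper bound is >= user_id.
    index = bisect_left(UPPER_BOUNDS, user_id)
    if index == len(RANGE_TABLE):
        raise KeyError(f"no range configured for user ID {user_id}")
    return RANGE_TABLE[index][1]
```

Note that the table itself is the maintenance burden the text refers to: every kind of object would need its own table, and each table must be kept in sync across all clients.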

An alternative to range partitioning is hash partitioning. This scheme works with any key, without requiring a key in the form object_name:<id>, and is as simple as:

  • Take the key name and use a hash function (e.g., the crc32 hash function) to turn it into a number. For example, if the key is foobar, crc32(foobar) will output something like 93024922.
  • Use a modulo operation with this number in order to turn it into a number between 0 and 3, so that this number can be mapped to one of my four Redis instances. 93024922 modulo 4 equals 2, so I know my key foobar should be stored in the R2 instance. Note: the modulo operation returns the remainder from a division operation, and is implemented with the % operator in many programming languages.
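The two steps above can be sketched with Python's standard zlib.crc32. The instance names come from the text; the function name instance_for_key is an assumption made for illustration, and real Redis clients may use a different hash function.

```python
import zlib

INSTANCES = ["R0", "R1", "R2", "R3"]


def instance_for_key(key: str, instances=INSTANCES) -> str:
    """Map a key to an instance with crc32(key) modulo the instance count."""
    # zlib.crc32 works on bytes and returns an unsigned 32-bit integer.
    key_hash = zlib.crc32(key.encode("utf-8"))
    return instances[key_hash % len(instances)]
```

The number 93024922 in the text is only illustrative, so the real CRC32 of foobar may select a different instance. What matters is that the mapping is deterministic: every client that uses the same hash function and the same instance list picks the same instance for a given key.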

There are many other ways to perform partitioning, but with these two examples you should get the idea. One advanced form of hash partitioning is called consistent hashing and is implemented by a few Redis clients and proxies.

Different implementations of partitioning

Partitioning can be the responsibility of different parts of a software stack.

  • Client side partitioning means that the clients directly select the right node where to write or read a given key. Many Redis clients implement client side partitioning.
  • Proxy assisted partitioning means that our clients send requests to a proxy that is able to speak the Redis protocol, instead of sending requests directly to the right Redis instance. The proxy will make sure to forward our request to the right Redis instance according to the configured partitioning schema, and will send the replies back to the client. The Redis and Memcached proxy Twemproxy implements proxy assisted partitioning.
  • Query routing means that you can send your query to a random instance, and the instance will make sure to forward your query to the right node. Redis Cluster implements a hybrid form of query routing, with the help of the client (the request is not directly forwarded from one Redis instance to another, but the client gets redirected to the right node).

Disadvantages of partitioning

Some features of Redis don’t play very well with partitioning:

  • Operations involving multiple keys are usually not supported. For instance you can’t perform the intersection between two sets if they are stored in keys that are mapped to different Redis instances (actually there are ways to do this, but not directly).
  • Redis transactions involving multiple keys can not be used.
  • The partitioning granularity is the key, so it is not possible to shard a dataset with a single huge key like a very big sorted set.
  • When partitioning is used, data handling is more complex, for instance you have to handle multiple RDB / AOF files, and to make a backup of your data you need to aggregate the persistence files from multiple instances and hosts.
  • Adding and removing capacity can be complex. For instance, Redis Cluster supports mostly transparent rebalancing of data with the ability to add and remove nodes at runtime, but other systems like client side partitioning and proxies don’t support this feature. However, a technique called pre-sharding helps in this regard.

Using Redis as Primary Data store or cache?

Although partitioning in Redis is conceptually the same whether using Redis as a data store or as a cache, there is a significant limitation when using it as a data store. When Redis is used as a data store, a given key must always map to the same Redis instance. When Redis is used as a cache, if a given node is unavailable it is not a big problem if a different node is used, altering the key-instance map as we wish to improve the availability of the system (that is, the ability of the system to reply to our queries).

Consistent hashing implementations are often able to switch to other nodes if the preferred node for a given key is not available. Similarly if you add a new node, part of the new keys will start to be stored on the new node.
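To make the behaviour above concrete, here is a minimal consistent-hash ring in Python. It is a toy sketch, not the algorithm of any particular Redis client or proxy: the class name HashRing, the use of SHA-256, and the count of 100 virtual points per node are all assumptions chosen for illustration.

```python
import bisect
import hashlib


class HashRing:
    """Toy consistent-hash ring; names and replica count are illustrative."""

    def __init__(self, nodes, replicas=100):
        self.replicas = replicas  # virtual points placed on the ring per node
        self._ring = []           # sorted list of (point, node) tuples
        for node in nodes:
            self.add_node(node)

    @staticmethod
    def _hash(value: str) -> int:
        return int(hashlib.sha256(value.encode("utf-8")).hexdigest(), 16)

    def add_node(self, node: str) -> None:
        for i in range(self.replicas):
            bisect.insort(self._ring, (self._hash(f"{node}#{i}"), node))

    def remove_node(self, node: str) -> None:
        self._ring = [entry for entry in self._ring if entry[1] != node]

    def node_for(self, key: str) -> str:
        if not self._ring:
            raise LookupError("ring is empty")
        # Pick the first virtual point clockwise from the key, wrapping around.
        index = bisect.bisect_right(self._ring, (self._hash(key), ""))
        return self._ring[index % len(self._ring)][1]
```

When a node is removed, only the keys that lived on that node move to other nodes; every other key keeps its assignment. That property is acceptable for a cache, but for a primary store the moved keys would point at nodes that do not hold their data.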

For more details, please check Redis partitioning

The main concept here is the following:

  • If Redis is used as a cache, scaling up and down using consistent hashing is easy.
  • If Redis is used as a store, a fixed keys-to-nodes map is used, so the number of nodes must be fixed and cannot vary. Otherwise, a system is needed that is able to rebalance keys between nodes when nodes are added or removed, and currently only Redis Cluster is able to do this.

Redis-Cluster

Redis Cluster provides a way to run a Redis installation where data is automatically sharded across multiple Redis nodes.

Redis Cluster also provides some degree of availability during partitions, which in practical terms means the ability to continue operating when some nodes fail or are not able to communicate. However, the cluster stops operating in the event of larger failures (for example when the majority of masters are unavailable).

Redis Cluster is not able to guarantee strong consistency. In practical terms this means that under certain conditions it is possible that Redis Cluster will lose writes that were acknowledged by the system to the client.

The first reason why Redis Cluster can lose writes is because it uses asynchronous replication. This means that during writes the following happens:

  • Your client writes to the master B.
  • The master B replies OK to your client.
  • The master B propagates the write to its slaves B1, B2 and B3.

As you can see, B does not wait for an acknowledgement from B1, B2 and B3 before replying to the client, since this would be a prohibitive latency penalty for Redis. So if your client writes something and B acknowledges the write but crashes before being able to send it to its slaves, one of the slaves (which did not receive the write) can be promoted to master, losing the write forever.
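The write-loss scenario above can be illustrated with a small in-memory simulation. This is a toy model and not Redis code: the classes Master and Replica, the pending queue, and the demo function are hypothetical names used only to show the ordering of acknowledgement and propagation.

```python
class Replica:
    def __init__(self, name):
        self.name = name
        self.data = {}


class Master:
    """Toy model: acknowledge first, replicate later (asynchronously)."""

    def __init__(self, replicas):
        self.data = {}
        self.replicas = replicas
        self.pending = []  # writes acknowledged but not yet propagated

    def write(self, key, value) -> str:
        self.data[key] = value
        self.pending.append((key, value))
        return "OK"  # the client is acknowledged before any replica has the write

    def propagate(self) -> None:
        for key, value in self.pending:
            for replica in self.replicas:
                replica.data[key] = value
        self.pending.clear()


def demo():
    b1 = Replica("B1")
    master = Master([b1])
    master.write("a", 1)
    master.propagate()          # "a" reaches B1
    ack = master.write("b", 2)  # "b" is acknowledged to the client ...
    # ... but the master crashes here, before propagate() runs again.
    promoted = b1               # failover promotes B1 to master
    return ack, promoted.data
```

In this model the client holds an OK for key b, yet the promoted replica only contains key a, which is exactly the acknowledged-but-lost write described in the text.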

So in practical terms, what do you get with Redis Cluster?

  • The ability to automatically split your dataset among multiple nodes.
  • The ability to continue operations when a subset of the nodes are experiencing failures or are unable to communicate with the rest of the cluster.

Redis Sentinel

Redis Sentinel is used to provide high availability for Redis by taking care of automatic failover. In practical terms this means that using Sentinel you can create a Redis deployment that withstands certain kinds of failures without human intervention.

Redis Sentinel also performs other collateral tasks such as monitoring and notifications, and acts as a configuration provider for clients.

This is the full list of Sentinel capabilities at a macroscopical level (i.e. the big picture):

  • Monitoring. Sentinel constantly checks if your master and slave instances are working as expected.
  • Notification. Sentinel can notify the system administrator, or other computer programs, via an API, that something is wrong with one of the monitored Redis instances.
  • Automatic failover. If a master is not working as expected, Sentinel can start a failover process where a slave is promoted to master, the other additional slaves are reconfigured to use the new master, and the applications using the Redis server are informed about the new address to use when connecting.
  • Configuration provider. Sentinel acts as a source of authority for clients service discovery: clients connect to Sentinels in order to ask for the address of the current Redis master responsible for a given service. If a failover occurs, Sentinels will report the new address.

For more details, check out Redis Failover and Redis Sentinel

8.  CAP Model

Redis was designed primarily as a single-server system. As a consequence it aims to be CP. It can be distributed either by master-slave or truly distributed (multi-master) with Redis Sentinel protocol.

Both ways of distributing Redis are flawed: in the presence of network partitions they can lose data consistency and can also lose persisted data. So not only is Redis not CP, it can also lose data permanently (something that AP systems, e.g. strongly eventually consistent ones, do not do).

Yet another way to use Redis is to shard by consistent hashing from the client, at which point you are simply working with several independent single-node systems. However, data can be lost in this scenario, so it is only useful for caching or recomputable data.

One more thought: any single-node service could, in theory, be turned into a CP distributed system by starting an odd number of nodes and running a distributed consensus protocol between them. Raft, the Paxos successor, scales really well and is implemented in great projects including Consul. If a majority of nodes vote for changes and apply them to their local databases, things should be smooth.

In conclusion, with Redis Sentinel, Redis can be AP if used as a cache and not as a primary store. If it is used as a primary store, it cannot even be AP since there can be data loss.

9.  Querying

Redis provides commands for operating on the various data structures (strings, hashes, lists, sets, sorted sets) that it supports.

Operations on Strings:

Operations on Hashes:

Operations on Lists:

Operations on Sets:

Operations on SortedSets:

For a complete list of commands, refer to Redis Commands

Batch operations are also supported on some of the data structures. However, when using Redis-cluster, some of the batch operations may not be supported since the data might reside on several nodes.

10.  Query Pagination

There are different ways in which pagination can be implemented in Redis.

a) Using the SSCAN Command

The SSCAN command is part of a group of commands similar to the regular SCAN command. These include:

  • SCAN – Used to iterate over the set of keys in the current database.
  • SSCAN – Used to iterate over elements of sets.
  • HSCAN – Used to iterate over the fields of hashes and their associated values.
  • ZSCAN – Used to iterate elements of sorted sets and their scores.

While the regular SCAN command iterates over the database keys, the SSCAN command can iterate over elements of sets. By using the returned SSCAN cursor, you could paginate over a Redis set.

The downside is that you need some way to persist the value of the cursor, and if there are concurrent users this could lead to some odd behavior, since the cursor may not be where it is expected. However, this can be useful for applications where traffic to these paginated areas may be lighter.

b) Using Sorted Sets

To paginate with a sorted set, we can use the ZRANGE command to select a range of elements by their rank, that is, their position in the score ordering (ZRANGEBYSCORE selects by score value instead). ZRANGE ranks are zero-based and inclusive, so you could, for example, select ranks 0-19, 20-39, and so on. By programmatically adjusting the range as the user moves through the data, you can achieve the pagination you need for your application. Since sorted sets and ZRANGE do this task more intuitively than a scan, this is often the preferred method of pagination. It is also easier to implement with multiple users, since you can programmatically keep track of which range each user is selecting at any given time.
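As a rough sketch of the bookkeeping involved, the helper below converts a page number into the start and stop arguments that ZRANGE expects (zero-based, inclusive rank positions). The function names page_bounds and zrange are hypothetical; zrange is only a local stand-in that imitates ZRANGE on a Python list so the example runs without a server.

```python
def page_bounds(page: int, page_size: int) -> tuple:
    """Return (start, stop) ranks for ZRANGE; both are zero-based and inclusive."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be positive")
    start = (page - 1) * page_size
    stop = start + page_size - 1
    return start, stop


def zrange(sorted_members: list, start: int, stop: int) -> list:
    """Local stand-in for ZRANGE with non-negative ranks (not a Redis call)."""
    # Like ZRANGE, a stop beyond the last element simply returns what exists.
    return sorted_members[start:stop + 1]
```

With a page size of 20, page 1 maps to ranks 0 to 19 and page 2 to ranks 20 to 39. Because each request carries its own page number, no server-side cursor has to be stored per user.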

11.  Indexes

Redis does not have built-in support for indexes; it only offers primary key access. However, since Redis is a data structures server, its capabilities can be used to create secondary indexes of different kinds, including composite (multi-column) indexes. Here are a few ways in which we can create indexes with Redis:

  • Sorted sets to create secondary indexes by ID or other numerical fields.
  • Sorted sets with lexicographical ranges for creating more advanced secondary indexes, composite indexes and graph traversal indexes.
  • Sets for creating random indexes.
  • Lists for creating simple iterable indexes and last N items indexes.

For details on how to create indexes with Redis, refer to Redis Indexes

12.  Transactions

MULTI, EXEC, DISCARD and WATCH are the foundation of transactions in Redis. They allow the execution of a group of commands in a single step, with two important guarantees:

  • All the commands in a transaction are serialized and executed sequentially. It can never happen that a request issued by another client is served in the middle of the execution of a Redis transaction. This guarantees that the commands are executed as a single isolated operation.
  • Either all of the commands or none are processed, so a Redis transaction is also atomic. The EXEC command triggers the execution of all the commands in the transaction, so if a client loses the connection to the server in the context of a transaction before calling the EXEC command, none of the operations are performed; if instead the EXEC command is called, all the operations are performed.
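The queue-then-execute behaviour behind these two guarantees can be modelled in a few lines. This is a toy in-memory model, not a Redis client and not how the server is implemented: the class ToyTransaction and its methods are hypothetical and only mirror the roles of MULTI, DISCARD and EXEC.

```python
class ToyTransaction:
    """In-memory model of MULTI/EXEC/DISCARD queueing; not a Redis client."""

    def __init__(self, store: dict):
        self.store = store
        self.queue = None  # None means no transaction is open

    def multi(self) -> None:
        self.queue = []  # start queueing commands

    def set(self, key, value) -> None:
        if self.queue is None:
            self.store[key] = value          # outside a transaction: applied at once
        else:
            self.queue.append((key, value))  # inside a transaction: only queued

    def discard(self) -> None:
        self.queue = None  # drop every queued command

    def execute(self) -> int:
        if self.queue is None:
            raise RuntimeError("EXEC without MULTI")
        queued, self.queue = self.queue, None
        for key, value in queued:  # applied back to back, nothing interleaved
            self.store[key] = value
        return len(queued)
```

Commands issued after multi() change nothing until execute() is called, so a client that disappears before that point (modelled here by discard()) leaves the store untouched.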

For more details on how to use transactions in Redis, refer to Redis Transactions

13.  Concurrency

Redis is single-threaded and transactions block until they finish. However, multiple clients can connect to the instance and issue their commands concurrently; Redis executes the commands sequentially using an event loop.

In the redis-benchmark tool, every client by default (the benchmark simulates 50 clients unless otherwise specified with -c) sends the next command only when the reply to the previous command has been received. This means that the server will likely need a read call in order to read each command from every client.

Redis supports pipelining, so it is possible to send multiple commands at once, a feature often exploited by real-world applications. Pipelining can dramatically improve the number of operations per second a server is able to deliver. For more details, see Redis Benchmarks

14.  Search

Redis provides data structures (sorted sets) that can be used for implementing lexicographic search as well as typical document search. For details on how Redis can be used as a search engine, check Searching with Redis

15.  Multi-Tenancy

To separate various tenants’ data in Redis, we have, as with SQL, two options. The first is to maintain independent Redis server instances and switch out the connection per request. Of course, this has the disadvantage of reducing or eliminating reuse of the connection pool.

Instead, we could use a simple “namespace” concept to identify each key with its tenant.

Redis namespaces

When working with Redis, it’s customary to namespace keys by simply prepending their name with the namespace. For example, <tenant-id>:key_name
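A minimal sketch of this key-prefix convention follows. The helper names tenant_key and tenant_pattern are assumptions made for illustration; the only convention taken from the text is the <tenant-id>:key_name layout.

```python
def tenant_key(tenant_id: str, key_name: str) -> str:
    """Build a namespaced key of the form <tenant-id>:key_name."""
    if not tenant_id or ":" in tenant_id:
        # A colon inside the tenant ID would make the prefix ambiguous.
        raise ValueError("tenant_id must be non-empty and must not contain ':'")
    return f"{tenant_id}:{key_name}"


def tenant_pattern(tenant_id: str) -> str:
    """Glob-style pattern matching every key of one tenant."""
    return f"{tenant_id}:*"
```

Routing every read and write through a helper like tenant_key keeps the prefix consistent across the code base. A pattern such as the one from tenant_pattern could be passed to the MATCH option of SCAN when one tenant's keys need to be enumerated.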

16. Client Libraries

There are numerous client libraries available for Redis in different languages. For a complete list, refer to client libraries

17.  Backup and Recovery

Redis provides a range of different persistence options:

  • The RDB persistence performs point-in-time snapshots of your dataset at specified intervals.
  • The AOF persistence logs every write operation received by the server; these operations are replayed at server startup, reconstructing the original dataset. Commands are logged using the same format as the Redis protocol itself, in an append-only fashion. Redis is able to rewrite the log in the background when it gets too big.
  • If you wish, you can disable persistence completely, if you want your data to exist only as long as the server is running.
  • It is possible to combine both AOF and RDB in the same instance. Notice that, in this case, when Redis restarts the AOF file will be used to reconstruct the original dataset since it is guaranteed to be the most complete.

The most important thing to understand is the different trade-offs between the RDB and AOF persistence.

For more details on the advantages and disadvantages of RDB and AOF, check out Redis persistence

In summary, Redis is appropriate for small, simple data that requires fast access, but in most environments needs to be backed up by another slower, less expensive datastore. On AWS, data stored in memory is approximately 25 times more expensive than data stored on disk, so at Quora, we use Redis as a standalone datastore for only performance-sensitive data, and prefer MySQL or HBase for data that can tolerate more access latency. For some of the guidelines when using in production, check Production Usage guidelines

2. MongoDB

MongoDB is a document database with the scalability and flexibility that you want with the querying and indexing that you need. MongoDB is a distributed database at its core, so high availability, horizontal scaling, and geographic distribution are built in and easy to use. MongoDB’s design philosophy is focused on combining the critical capabilities of relational databases with the innovations of NoSQL technologies

  1.  Data/Storage Model

MongoDB stores data as JSON-like documents in a binary representation called BSON (Binary JSON), meaning fields can vary from document to document and the data structure can be changed over time. It allows data to be represented as simple key-value pairs and flat, table-like structures, through to rich documents and objects with deeply nested arrays and sub-documents. The document model maps to the objects in your application code, making data easy to work with.

Documents that tend to share a similar structure are organized as collections. It may be helpful to think of collections as being analogous to a table in a relational database: documents are similar to rows, and fields are similar to columns. Fields can vary from document to document; there is no need to declare the structure of documents to the system – documents are self describing. If a new field needs to be added to a document then the field can be created without affecting all other documents in the system, without updating a central system catalog, and without taking the system offline. Developers can start writing code and persist the objects as they are created. And when developers add more features, MongoDB continues to store the updated objects without the need to perform costly ALTER TABLE operations, or worse – having to re-design the schema from scratch.

Storage Engines

The storage engine is the component of the database that is responsible for managing how data is stored, both in memory and on disk. MongoDB supports multiple storage engines, as different engines perform better for specific workloads. Choosing the appropriate storage engine for your use case can significantly impact the performance of your applications. For more details, check out Storage Engines

Data Model considerations

  1.  Using Embedded documents vs normalized data model (Data Model Design)
  2.  Atomicity of write operations involving multiple collections (Atomicity)
  3.  Document Growth (Document Growth)
  4.  Data Use and Performance

For complete details, check Data Model Considerations

Document Validation

MongoDB provides the capability to validate documents during updates and insertions. Validation rules can be specified on a per-collection basis. For complete details, check Document Validation

Operational Factors to consider when designing Data Model

a) Sharding

b) Indexes

c) Large Number of Collections

d) Large number of small documents

e)  Storage Optimization for small documents

f)  Data Lifecycle Management

Data Model Patterns and Examples

Modeling relationships between documents

Modeling Tree structures

Modeling data for Atomic Operations

Modeling data for keyword search

Modeling Monetary data

Modeling Time Data

2.  Use Cases

Storing Log Data

Pre Aggregated Reports

Hierarchical Aggregation

Product Catalog

Inventory Management

Category Hierarchy

Metadata and Asset Management

Storing Comments

3.  Limitations

There are several restrictions that MongoDB places on Documents, Indexes, Namespaces, Data, Operations etc. For complete details, check MongoDB limitations.

Also check MongoDB limitations

4.  Supported DataTypes

MongoDB data types

5.   Storage/Access costs

MongoDB comes in several flavours.

a) MongoDB Professional

b) MongoDB Enterprise Advanced

c) MongoDB community server

d) MongoDB Atlas, a Database as a Service offering that lets you deploy, operate and scale a MongoDB database in the cloud (supports AWS, GCP and Azure)

Each offering has a licensing cost. For differences between the various offerings and the pricing, check out MongoDB Professional vs Enterprise

For details on how MongoDB Atlas stacks up against other MongoDB as a service offerings, check out MongoDB Atlas comparison

6. Scalability

MongoDB provides horizontal scale-out for databases on low cost, commodity hardware or cloud infrastructure through sharding, which distributes data across multiple physical partitions in a way that is transparent to applications. Sharding allows MongoDB deployments to address the hardware limitations of a single server, such as bottlenecks in RAM or disk I/O. MongoDB automatically balances the data in the sharded cluster as the data grows or the size of the cluster increases or decreases. The diagram below depicts how MongoDB allows an application to transparently put/fetch data from different shards underneath.

[Figure: MongoDB sharding]

MongoDB supports two strategies for sharding data through shard keys, namely Hashed Sharding and Range Sharding.
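The difference between the two strategies can be sketched in a few lines of Python. This is a simplified model, not MongoDB's implementation: the shard names, the range boundaries, and the use of MD5 as the hash function here are all assumptions chosen for illustration.

```python
import bisect
import hashlib

SHARDS = ["shard0", "shard1", "shard2"]  # hypothetical shard names

# Range sharding: each shard owns a contiguous range of shard-key values.
# These are the exclusive upper bounds of the first two ranges;
# the last shard takes everything above them.
RANGE_BOUNDS = [1000, 2000]


def range_shard(key: int) -> str:
    """Pick the shard whose key range contains `key`."""
    return SHARDS[bisect.bisect_right(RANGE_BOUNDS, key)]


def hashed_shard(key: int) -> str:
    """Pick a shard from a hash of the key, scattering nearby keys."""
    digest = hashlib.md5(str(key).encode()).digest()
    return SHARDS[int.from_bytes(digest[:8], "big") % len(SHARDS)]


for k in (10, 11, 12, 1500, 5000):
    print(k, range_shard(k), hashed_shard(k))
```

Range sharding keeps neighbouring keys together, which helps range queries but can concentrate load when keys grow monotonically. Hashed sharding spreads neighbouring keys across shards at the cost of efficient range scans.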

MongoDB partitions data sets into chunks with a chunk storing all data within a specific range, and assigns chunks to shards. By default, it creates two 64 megabyte chunks per shard. When a chunk is full, MongoDB splits it into two smaller 32 megabyte chunks. To ensure an even distribution of data, MongoDB will automatically migrate chunks between shards so that each shard has the same number of chunks. This is why sharded clusters require routers and config servers. The config servers store cluster metadata including the mapping between chunks and shards. The routers are responsible for migrating chunks and updating the config servers. All read and write requests are sent to routers because only they can determine where the data resides.
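The split-and-migrate behaviour described above can be modelled roughly as follows. This is a toy sketch under stated assumptions: a chunk is a `(low, high, keys)` tuple, splitting happens at the median key, and the balancer simply evens out chunk counts one move at a time. The real balancer's thresholds and policies are not modelled here.

```python
def split_chunk(chunk):
    """Split a (low, high, keys) chunk at its median key into two chunks."""
    low, high, keys = chunk
    keys = sorted(keys)
    mid = keys[len(keys) // 2]
    left = (low, mid, [k for k in keys if k < mid])
    right = (mid, high, [k for k in keys if k >= mid])
    return left, right


def balance(shards):
    """Move one chunk at a time from the fullest shard to the emptiest.

    `shards` maps a shard name to its list of chunks. Returns the number
    of migrations performed.
    """
    moves = 0
    while True:
        fullest = max(shards, key=lambda s: len(shards[s]))
        emptiest = min(shards, key=lambda s: len(shards[s]))
        if len(shards[fullest]) - len(shards[emptiest]) <= 1:
            return moves
        shards[emptiest].append(shards[fullest].pop())
        moves += 1
```

For example, calling `balance` on three shards holding 5, 0 and 1 chunks ends with 2 chunks on each after three single-chunk migrations.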

MongoDB recommends running routers next to application servers. However, as the number of application servers increases, the number of routers increases and that can be a problem since the routers maintain open connections to MongoDB instances. Eventually, the routers will open too many connections and performance will degrade. The solution is to separate routers from application servers and reduce the number of them. While this alleviates the problem of too many open connections, it limits users’ ability to scale routers by adding more. When rebalancing the data due to addition/removal of shards, MongoDB will migrate only one chunk at a time in order to minimize the performance impact on applications. However, migration happens automatically and will have some impact on performance.

For complete details, check out MongoDB Sharding and Planning a MongoDB sharded cluster and Horizontal Scalability with Sharding

7.  Availability

MongoDB supports High Availability through the use of sharding and replication. A sharded cluster can continue to perform partial read / write operations even if one or more shards are unavailable. While the subset of data on the unavailable shards cannot be accessed during the downtime, reads or writes directed at the available shards can still succeed.

MongoDB also supports High Availability through automatic failover in replica sets. For details, check MongoDB Replication

With multiple copies of data on different database servers, replication protects a database from the loss of a single server. Replication thus provides redundancy and increases data availability. It also allows recovery from hardware failures and service interruptions. In some cases, replication is used to increase read capacity. Clients have the ability to send read and write operations to different servers. Maintaining copies in different data centers increases the locality and availability of data for distributed applications.

Increased data availability is achieved in MongoDB through Replica Set, which provides data redundancy across multiple servers. Replica Set is a group of mongod instances that host the same data set. One mongod, the primary, receives all data modification operations (insert / update / delete).


All other instances, secondaries, apply operations from the primary so that they have the same data set. Because only one member can accept write operations, replica sets provide strict consistency for all reads from the primary. To support replication, the primary logs all data set changes in its oplog. The secondaries asynchronously replicate the primary’s oplog and apply the operations to their data sets. Secondaries’ data sets reflect the primary’s data set. It is possible for the client to set up Read Preference to read data from secondaries. This way the client can balance loads from master to replicas, improving throughput and decreasing latency. However, as a result secondaries may not return the most recent data to the clients. Due to the asynchronous nature of the replication process, replica read preference guarantees only eventual consistency for its data.
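The oplog mechanism above, and the stale reads it can produce on secondaries, can be sketched with a small in-memory model. The class and method names (`ReplicaSet`, `replicate`, and so on) are hypothetical and exist only for this illustration.

```python
class Node:
    def __init__(self):
        self.data = {}
        self.applied = 0  # number of oplog entries applied so far


class ReplicaSet:
    def __init__(self, n_secondaries=2):
        self.primary = Node()
        self.secondaries = [Node() for _ in range(n_secondaries)]
        self.oplog = []  # ordered list of (key, value) writes

    def write(self, key, value):
        """All writes go to the primary and are recorded in its oplog."""
        self.primary.data[key] = value
        self.oplog.append((key, value))
        self.primary.applied = len(self.oplog)

    def replicate(self, secondary, max_entries=None):
        """Apply pending oplog entries to one secondary, in order."""
        pending = self.oplog[secondary.applied:]
        if max_entries is not None:
            pending = pending[:max_entries]
        for key, value in pending:
            secondary.data[key] = value
        secondary.applied += len(pending)


rs = ReplicaSet()
rs.write("x", 1)
rs.write("x", 2)
lagging = rs.secondaries[0]
print(lagging.data.get("x"))   # secondary has not replicated yet
rs.replicate(lagging)
print(lagging.data.get("x"))   # secondary has caught up
```

Until `replicate` runs, a read from the secondary returns older data than the primary holds, which is the eventual consistency the paragraph describes.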

Failover

Within the replica set, members are interconnected with each other to exchange heartbeat messages. A crashed server with a missing heartbeat will be detected by other members and removed from the replica set membership. After the dead secondary recovers, it can rejoin the cluster by connecting to the primary, then catch up to the latest update. If a crash occurs over a lengthy period of time, where the change log from the primary doesn’t cover the whole crash period, then the recovered secondary needs to reload the whole data from the primary as if it was a brand new server.
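The two decisions described above, spotting a member with a missing heartbeat and deciding whether a recovered secondary can catch up from the oplog, can be sketched as two small functions. The timeout value and the use of plain numbers as timestamps are assumptions made for this example, not MongoDB defaults.

```python
HEARTBEAT_TIMEOUT = 10.0  # seconds; an assumed value for illustration


def live_members(last_heartbeat, now, timeout=HEARTBEAT_TIMEOUT):
    """Return members whose most recent heartbeat is within the timeout."""
    return sorted(m for m, t in last_heartbeat.items() if now - t <= timeout)


def needs_full_resync(secondary_last_applied, oldest_oplog_entry):
    """True when the primary's oplog no longer covers the missed period.

    If the oldest entry still held in the oplog is newer than the last
    operation the secondary applied, some changes are gone from the log
    and the secondary must reload all data instead of catching up.
    """
    return secondary_last_applied < oldest_oplog_entry


print(live_members({"a": 100.0, "b": 95.0, "c": 80.0}, now=101.0))
print(needs_full_resync(secondary_last_applied=100, oldest_oplog_entry=150))
```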


Starting in MongoDB 3.2, you can deploy config servers as replica sets. A sharded cluster with a Config Server Replica Set (CSRS) can continue to process reads and writes as long as a majority of the replica set is available.

8.  CAP Model

Depending on the needs of the application, MongoDB provides options to maximize either Consistency or Availability. So, it's not possible to label MongoDB as either an AP or CP system.

Maximizing Consistency

Maximizing Availability

With regards to consistency, MongoDB is strongly consistent by default but can be configured to be eventually consistent. However, there are possibilities of reading data which may be rolled back later based on the readPreference/readConcern and writeConcern settings.

If readPreference is secondary and readConcern is local, and writeConcern is majority, this case might arise. Depending on the consistency needs of the application, these need to be configured appropriately.

readConcern – controls which data a read may return. With a replica set, readConcern majority returns only data that has been persisted to a majority of the members, so we can be assured that the document will not be rolled back if there is an issue with replication.

readPreference – controls which members serve reads, which can help with balancing load. For example, a report generator process can always read from a secondary, leaving the primary server to serve data for online users.

Dirty Reads are possible with MongoDB based on the readConcern and writeConcern settings
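To make the difference between the read concerns concrete, here is a simplified model. It assumes each member tracks how many oplog entries it has applied; the function names and the list-based oplog are illustrative and not part of any MongoDB API.

```python
def majority_commit_point(applied_counts):
    """Highest oplog position that a majority of members have applied."""
    majority = len(applied_counts) // 2 + 1
    return sorted(applied_counts, reverse=True)[majority - 1]


def read(oplog, node_applied, commit_point, read_concern):
    """Return the view of the data visible on one node.

    "local" sees everything the node has applied, even writes that a
    majority has not acknowledged and that could later be rolled back.
    "majority" sees only writes up to the majority commit point.
    """
    if read_concern == "local":
        limit = node_applied
    else:
        limit = min(node_applied, commit_point)
    view = {}
    for key, value in oplog[:limit]:
        view[key] = value
    return view


oplog = [("x", 1), ("x", 2), ("x", 3)]
applied = [3, 1, 1]  # primary has 3 entries, both secondaries only 1
point = majority_commit_point(applied)
print(read(oplog, 3, point, "local"))
print(read(oplog, 3, point, "majority"))
```

In this scenario a "local" read on the primary returns a value that only one of three members holds. If the primary fails before replicating it, that value can be rolled back, which is the dirty read mentioned above.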

9.  Querying

MongoDB allows querying on documents, embedded documents, array and array of embedded documents as well. For details, check MongoDB querying

MongoDB also supports projections to select only a few fields in the query result when querying a collection for documents. For details, check MongoDB Projections

Understand any existing document schema – MongoDB Compass. If there is an existing MongoDB database that needs to be understood and optimized then MongoDB Compass is an invaluable tool. The MongoDB Compass GUI allows users to understand the structure of existing data in the database and perform ad hoc queries against it – all with zero knowledge of MongoDB’s query language. By understanding what kind of data is present, you’re better placed to determine what indexes might be appropriate. Without Compass, users wishing to understand the shape of their data would have to connect to the MongoDB shell and write queries to reverse engineer the document structure, field names and data types. MongoDB Compass is included with MongoDB Professional and MongoDB Enterprise Advanced.

Also check out MongoDB distributed queries and Query Optimization

10. Query Pagination

MongoDB supports cursors to iterate over a query result. By default, the server will automatically close the cursor after 10 minutes of inactivity, or once the client has exhausted the cursor. As a cursor returns documents, other operations may interleave with the query. For the MMAPv1 storage engine, intervening write operations on a document may result in a cursor that returns a document more than once if that document has changed. To handle this situation, see the information on snapshot mode.

11. Indexes

MongoDB supports single-field, compound, multi-key index, geospatial, text and hashed indexes. For complete details, check MongoDB indexes

12.  Transactions

In MongoDB, a write operation is atomic on the level of a single document, even if the operation modifies multiple embedded documents within a single document.

When a single write operation modifies multiple documents, the modification of each document is atomic, but the operation as a whole is not atomic and other operations may interleave. However, you can isolate a single write operation that affects multiple documents using the $isolated operator. Using the $isolated operator, a write operation that affects multiple documents can prevent other processes from interleaving once the write operation modifies the first document. This ensures that no client sees the changes until the write operation completes or errors out. An isolated write operation does not provide “all-or-nothing” atomicity. That is, an error during the write operation does not roll back all its changes that preceded the error.

Since a single document can contain multiple embedded documents, single-document atomicity is sufficient for many practical use cases. For cases where a sequence of write operations must operate as if in a single transaction, you can implement a two-phase commit in your application.

However, two-phase commits can only offer transaction-like semantics. Using two-phase commit ensures data consistency, but it is possible for applications to return intermediate data during the two-phase commit or rollback.

For details, check how to apply Two Phase Commit
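The application-level two-phase commit pattern can be sketched with in-memory dictionaries standing in for an `accounts` and a `transactions` collection. The state names (`initial`, `pending`, `applied`, `done`) and the `pending` list per account are assumptions of this sketch, and recovery after a crash between steps is deliberately left out.

```python
accounts = {
    "A": {"balance": 1000, "pending": []},
    "B": {"balance": 1000, "pending": []},
}
transactions = {}


def transfer(txn_id, source, dest, amount):
    """Move `amount` from source to dest using a two-phase commit pattern."""
    txn = {"source": source, "dest": dest, "value": amount, "state": "initial"}
    transactions[txn_id] = txn

    # Phase 1: mark the transaction pending, then apply it to both accounts.
    # Recording txn_id on each account makes each update apply at most once.
    txn["state"] = "pending"
    for name, delta in ((source, -amount), (dest, amount)):
        acct = accounts[name]
        if txn_id not in acct["pending"]:
            acct["balance"] += delta
            acct["pending"].append(txn_id)
    txn["state"] = "applied"

    # Phase 2: clear the pending markers, then mark the transaction done.
    for name in (source, dest):
        accounts[name]["pending"].remove(txn_id)
    txn["state"] = "done"


transfer("t1", "A", "B", 100)
print(accounts, transactions["t1"]["state"])
```

Between the two account updates in phase 1, a reader can observe the debit without the credit. That is the intermediate data the text warns about.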

13.  Concurrency

Concurrency control allows multiple applications to run concurrently without causing data inconsistency or conflicts.

One approach is to create a unique index on a field that can only have unique values. This prevents insertions or updates from creating duplicate data. Create a unique index on multiple fields to force uniqueness on that combination of field values. For examples of use cases, see update() and Unique Index and findAndModify() and Unique Index.

Another approach is to specify the expected current value of a field in the query predicate for the write operations. The two-phase commit pattern provides a variation where the query predicate includes the application identifier as well as the expected state of the data in the write operation.
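The second approach, putting the expected current value in the query predicate, amounts to a compare-and-set. A minimal sketch follows, with a dictionary standing in for a collection and a hypothetical `version` field as the expected value.

```python
def update_if_match(collection, doc_id, expected, changes):
    """Apply `changes` only if every field in `expected` still matches.

    Returns True if the update was applied, or False if another writer
    changed the document first and the caller should re-read and retry.
    """
    doc = collection.get(doc_id)
    if doc is None or any(doc.get(k) != v for k, v in expected.items()):
        return False
    doc.update(changes)
    return True


products = {1: {"qty": 5, "version": 1}}

# Two clients both read version 1, then both try to write.
first = update_if_match(products, 1, {"version": 1}, {"qty": 4, "version": 2})
second = update_if_match(products, 1, {"version": 1}, {"qty": 3, "version": 2})
print(first, second, products[1])
```

Only the first writer succeeds. The second sees no matching document for its predicate and can re-read before retrying, so no update is silently lost.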

Read Isolation, Consistency, and Recency

14.  Search

MongoDB provides text indexes, which support text search queries on string content. To perform text search queries, you must have a text index on your collection. A collection can have only one text search index, but that index can cover multiple fields. You can check how MongoDB search works.
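A single text index covering several fields can be pictured as one inverted index built over all of them. The sketch below is a toy model: it lower-cases and splits on whitespace only, with no stemming, stop words or relevance scoring, and the document fields are invented for the example.

```python
from collections import defaultdict


def build_text_index(docs, fields):
    """Build one inverted index (word -> document ids) over several fields."""
    index = defaultdict(set)
    for doc_id, doc in docs.items():
        for field in fields:
            for word in str(doc.get(field, "")).lower().split():
                index[word].add(doc_id)
    return index


def search(index, query):
    """Return ids of documents containing any of the query terms."""
    hits = set()
    for word in query.lower().split():
        hits |= index.get(word, set())
    return sorted(hits)


docs = {
    1: {"title": "Coffee shop", "body": "best espresso"},
    2: {"title": "Tea house", "body": "green tea and coffee"},
    3: {"title": "Bakery", "body": "fresh bread"},
}
index = build_text_index(docs, fields=["title", "body"])
print(search(index, "coffee"))
```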

15.  Multi-Tenancy

The common strategies employed for multi-tenancy in any database include

  1. All tenants in the same collection, using tenant-specific fields for security
  2. 1 Collection per tenant in a single shared DB
  3. 1 Database per tenant (kind of a Namespace)

All 3 strategies can be employed to support multi-tenancy with MongoDB. However, the most recommended approach is to have 1 database per tenant.

16.  Client Libraries

An application communicates with MongoDB by way of a client library, called a driver, that handles all interaction with the database in a language appropriate to the application.

Check Client Libraries for a complete list of supported drivers

17.  Backup and Recovery

MongoDB provides various options for backup

  1.  Back Up with Atlas
  2.  Back Up with CloudManager or OpsManager
  3.  Back Up by copying underlying data files
  4.  Back Up with mongodump

For more details on the various options, check Backup and Restore

MongoDB should be tuned according to the needs of the application to achieve high performance. Check MongoDB-Performance-Best-Practices for the guidelines. For more details on MongoDB architecture, check MongoDB_Architecture_Guide

3.  CouchBase

  1.  Data/Storage Model

Couchbase can be used either as a key-value store or as a document database. Data in Couchbase Server can be either JSON documents or binary format. Almost all the data models supported by MongoDB can be supported with Couchbase as well. The major difference lies only in how the data can be queried. Since we have already seen the different data models that can be supported in MongoDB, I will add some details about CouchBase architecture in this section.

The core architecture is designed to simplify building modern applications with a flexible data model, powerful SQL-based query language, and a secure core database platform that provides high availability, scalability, and performance. Couchbase Server consists of a single package that is installed on all nodes within a cluster. Through the SDKs (also known as client libraries), developers can write applications in the language of their choice (Java, Node.js, .NET or others) and connect to a Couchbase Server cluster to perform read and write operations and queries across key-value records and/or JSON documents with low latencies (sub-millisecond) at high throughput rates (millions of operations per second).

The basic database architecture consists of one or more Couchbase Servers (also called nodes) organized as a cluster. Each node contains a set of configurable services including a Managed Cache, Storage, Cluster Management as well as Data, Index, and Query Services.


The Cluster Manager handles the cluster level operations and the coordination between nodes in a Couchbase Server cluster. These management functions include configuring nodes, monitoring nodes, gathering statistics, logging, and controlling data rebalancing among cluster nodes. The cluster manager determines the node’s membership in a cluster, responds to heartbeat requests, monitors its own services and repairs itself if possible.

Although each node runs its own local Cluster Manager, there is only one node chosen from among them, called the orchestrator, that supervises the cluster at a given point in time. The orchestrator maintains the authoritative copy of the cluster configuration, and performs the necessary node management functions to avoid any conflicts from multiple nodes interacting. If a node becomes unresponsive for any reason, the orchestrator notifies the other nodes in the cluster and promotes the relevant replicas to active status. This process is called failover, and it can be done automatically or manually. If the orchestrator fails or loses communication with the cluster for any reason, the remaining nodes detect the failure when they stop receiving its heartbeat and immediately elect a new orchestrator. This election is transparent to the operations of the cluster.

The Data Manager serves client requests and stores data. The Data Manager has several components including

Data Service: handles core data management operations, such as Key Value GET/SET. The data service is also responsible for building and maintaining MapReduce views. Note that even though MapReduce views can act as indexes, they are always managed by the data service, not the indexing service.

Index Service: efficiently maintains indexes for fast query execution. This involves the creation and deletion of indexes, keeping those indexes up to date based on change notifications from the data service and responding to index-scan requests from the query service. You can access and manage the indexing service through the query service.

Couchbase Server provides an indexing service to manage Global Secondary Indexes (GSIs). Global Secondary Indexes are aggregated and stored on a given Index Service node and can be independently partitioned by using a WHERE clause in the index definition. GSIs provide fast lookups and range scans which are especially useful for interactive applications that use secondary keys. Global Secondary Indexes are defined via the N1QL query language and are extremely flexible, enabling index creation on a specific subset of fields, on a subset of records, and on JSON arrays using built-in SQL functions. Additionally, Global Secondary Indexes can be declared to use in-memory storage optimization, enabling enhanced index performance. Couchbase indexes are automatically maintained asynchronously from the core Data Service write operations, using high speed in-memory queues to forward mutations (writes) to the appropriate indexer using Couchbase’s advanced Database Change Protocol (DCP). This asynchronous indexing architecture provides two fundamental benefits: a) write operations within the Data Service can be completed extremely quickly, and b) developers can create as many indexes as are needed by the application, no longer having to pay the typical performance and latency penalty that is common in traditional RDBMS systems. If needed, read operations can optionally request to wait for any pending index updates to be completed prior to performing the index scan if this is an application requirement for a given query.

Cache Service: Unlike other database platforms, Couchbase Server does not require a caching tier in front of it. Couchbase uses a memory-first architecture, ensuring that all data operations are performed through a configurable high-speed in-memory cache. Internally, Couchbase moves data to and from disk as needed, thereby automatically acting as both a read-through and a write-through cache, which facilitates an extremely high read/write rate. With Couchbase Server, application configuration and deployment is simplified as applications don’t have to deal with complex cache coherency issues or varying performance capabilities across technologies.
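The read-through and write-through behaviour mentioned above can be sketched with a small class. This is a deliberately simplified model: a dictionary stands in for disk, writes reach "disk" synchronously, and there is no eviction. The class name `ManagedCache` is hypothetical and not a Couchbase API.

```python
class ManagedCache:
    def __init__(self, disk):
        self.disk = disk       # dict standing in for persistent storage
        self.memory = {}       # in-memory cache
        self.disk_reads = 0    # counts how often storage had to be read

    def set(self, key, value):
        """Write-through: update memory first, then storage."""
        self.memory[key] = value
        self.disk[key] = value

    def get(self, key):
        """Read-through: on a miss, load from storage and keep it in memory."""
        if key not in self.memory:
            self.disk_reads += 1
            self.memory[key] = self.disk[key]
        return self.memory[key]


disk = {"a": 1}
cache = ManagedCache(disk)
cache.get("a")   # miss: goes to storage once
cache.get("a")   # hit: served from memory
cache.set("b", 2)
print(cache.disk_reads, disk)
```

The application talks only to the cache object, which is the point the paragraph makes: no separate caching tier has to be kept coherent with the database.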

Each data node relies on a built-in multithreaded Managed Cache that is used both for the data service and separately for the indexing service. These caches are actively managed as opposed to relying upon the operating system’s file buffer cache and/or memory-mapped files. The data service’s integral managed cache is an object-managed cache based on memcached. All key-value operations are performed via the cache. Applications can access Couchbase using memcached compatible APIs such as get, set, delete, append, and prepend. Because this API is complete and transparent to the application, Couchbase Server can be used as a drop in replacement for memcached.

Because Couchbase Server automatically replicates data, cache availability is greatly improved and end users rarely hit a cold cache even if system components fail. The managed caching layer within the indexing service is built into the underlying storage engine (ForestDB in 4.0). It manages the caching of individual portions of indexes as RAM is available to speed performance both of inserting or updating information as well as servicing index scans.

Query Service:  handles N1QL query parsing, optimization, and execution. This service interacts with both the indexing service and the data service to process and return those queries back to the requesting application.

The Query Service takes a N1QL query and performs the necessary functions to retrieve, filter, and/or project the data in order to resolve the application request. Actual data and index lookups and scans may occur in the Data Service or in the Index Service, depending on the data and the indexes that are required to resolve the query. Applications and developers can use the N1QL REST API, the cbq shell, the cbc command line tool, or the Query Workbench to query records (JSON documents) using SQL-like syntax. With N1QL, developers can run ad hoc or prepared queries using standard SQL syntax (including WHERE clauses and aggregate functions) with JSON-specific extensions to filter, process, and format query output. N1QL queries will automatically leverage existing MapReduce View and GSI indexes to resolve GROUP BY, ORDER BY, and WHERE clauses. The Query Service is where the N1QL query parser, planner, optimizer, and executor reside and it is the recipient of incoming N1QL queries. It provides the basis for how N1QL queries are executed across the Couchbase Server, leveraging the facilities that are available in the Data and Indexing Services.

Storage Layer: Couchbase uses a highly efficient append-only storage system to persist data to files that are placed in the local file system. Append-only storage systems are a technique used by many modern database architectures to provide optimized I/O throughput, especially for write-intensive applications. A background compaction process (automated or manually controlled) cleans up orphaned and fragmented space within the files caused by ongoing data mutations to the append-only file system. The compaction process is designed to minimize the impact to front end I/O operations. Although high performance storage systems (such as SSDs) can be used on Couchbase Server nodes, they are not required. Couchbase Server uses two optimized storage engines, Couchstore and ForestDB. Couchstore is used to store the data items and local indexes, while ForestDB is used to store the Secondary Indexes.

The Storage Layer persists data from RAM to disk and reads data back into memory when needed. Couchbase Server’s storage engines are responsible for persisting data to disk. They write every change operation that a node receives to an append-only file (AOF), meaning that mutations go to the end of the file and in-place updates are forbidden. This file is replayed if necessary at node startup to repopulate the locally managed cache.

Couchbase Server periodically compacts its data files and index files to save space (auto-compaction). During compaction, each node remains online and continues to process read and write requests. Couchbase Server performs compaction in parallel, both across the many nodes of a cluster as well as across the many files in persistent storage. This highly parallel processing allows very large datasets to be compacted incrementally.
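Append-only persistence, replay at startup, and compaction can be illustrated with a list used as the log. The record layout `(op, key, value)` is an assumption made for this sketch and does not reflect the on-disk format of Couchstore or ForestDB.

```python
def replay(log):
    """Rebuild the in-memory state by replaying the append-only log."""
    state = {}
    for op, key, value in log:
        if op == "set":
            state[key] = value
        elif op == "delete":
            state.pop(key, None)
    return state


def compact(log):
    """Produce a new log that keeps only the latest value of each live key."""
    return [("set", key, value) for key, value in replay(log).items()]


log = []
# Every mutation is appended; nothing is ever updated in place.
log.append(("set", "a", 1))
log.append(("set", "b", 2))
log.append(("set", "a", 3))
log.append(("delete", "b", None))

print(replay(log))
print(compact(log))
```

The compacted log replays to the same state as the original but no longer carries the overwritten and deleted entries, which is the orphaned space that compaction reclaims.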

Multidimensional Scaling

Couchbase Server 4.0 and greater supports multidimensional scaling (MDS). MDS enables users to turn on or off specific services on each Couchbase Server node so that the node in effect becomes specialized to handle a specific workload: document storage (data service), global indexes (index service) or N1QL query processing (query service).

Multidimensional scaling has four main advantages:

  • Each service can be independently scaled to suit an application’s evolution, whether that entails a growing data set, expanding indexing requirements, or increased query processing needs.
  • The index and query services work most quickly and efficiently when a single or small number of machines contain the entire index.
  • You can choose to customize machines to their workloads. For example, by adding more CPUs to a node running queries.
  • Provides workload isolation so that query workloads do not interfere with indexing or data on the same node.

Multidimensional scaling allows specific services to both scale up and scale out without sacrificing ease of administration because they are all managed from within the same cluster and configured at run time, and the software installed on each machine is identical.

Multidimensional scaling optionally disables services to dedicate nodes to certain workloads

Buckets and vBuckets

A bucket is a logical collection of related documents in Couchbase, just like a database in RDBMS. It is a unique key space.

Couchbase administrators and developers work with buckets when performing tasks such as accessing and managing data, checking statistics, and issuing N1QL queries. Unlike a table in an RDBMS, a bucket can and typically does contain documents of varying schemas. Buckets are used to segregate the configuration and operational handling of data in terms of cache allocation, indexing and replication. While buckets can play a role in the concept of multitenancy, they are not necessarily the only component

Internally, Couchbase Server uses a mechanism called vBuckets (synonymous to shard or partition) to automatically distribute data across nodes, a process sometimes known as “auto-sharding”. vBuckets help enable data replication, failover, and dynamic cluster reconfiguration. Unlike data buckets, users and applications do not manipulate vBuckets directly. Couchbase Server automatically divides each bucket into 1024 active vBuckets and 1024 replica vBuckets per replica, and then distributes them evenly across the nodes running the Data Service within a cluster. vBuckets do not have a fixed physical location on nodes; therefore there is a mapping of vBuckets to nodes known as the cluster map. Through the Couchbase SDKs the application automatically and transparently distributes the data and workload across these vBuckets.
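The two-step lookup described above (key to vBucket, then vBucket to node via the cluster map) can be sketched as follows. The exact hash computation used by the SDKs is not reproduced here: taking a CRC32 of the key modulo 1024 and assigning vBuckets to nodes round-robin are simplifying assumptions for illustration.

```python
import zlib

NUM_VBUCKETS = 1024


def vbucket_for(key: str) -> int:
    """Map a document key to a vBucket id (simplified hashing)."""
    return zlib.crc32(key.encode("utf-8")) % NUM_VBUCKETS


def build_cluster_map(nodes):
    """Assign every vBucket to a node round-robin (an illustrative policy)."""
    return {vb: nodes[vb % len(nodes)] for vb in range(NUM_VBUCKETS)}


def node_for(key, cluster_map):
    """Find the node that currently owns the key's vBucket."""
    return cluster_map[vbucket_for(key)]


three_nodes = build_cluster_map(["n1", "n2", "n3"])
four_nodes = build_cluster_map(["n1", "n2", "n3", "n4"])
key = "user::42"
print(vbucket_for(key), node_for(key, three_nodes), node_for(key, four_nodes))
```

A key's vBucket never changes; only the cluster map does when nodes are added or removed. That indirection is what lets the cluster rebalance without the application rehashing its keys.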

For more information on vBuckets, see the technical white paper available here.

Database Change Protocol

Database Change Protocol (DCP) is a high-performance streaming protocol that communicates the state of the data using an ordered change log with sequence numbers. It is a generic protocol designed for keeping a consumer of the Data Service up to date.

DCP is robust and resilient in the face of transitory errors. For example, if a stream is interrupted, DCP resumes from exactly the point of its last successful update once connectivity resumes. DCP provides version histories and rollbacks that provide a snapshot of the data. Snapshots can be used to create a brand new replica or catch up an interrupted or partially built node.
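The resume-from-last-sequence-number behaviour can be sketched with an ordered change log and a consumer that remembers how far it got. The class names and the `limit` parameter used to simulate an interrupted stream are hypothetical; this models the idea, not the DCP wire protocol.

```python
class ChangeLog:
    def __init__(self):
        self.entries = []  # (seqno, key, value) with increasing seqno

    def append(self, key, value):
        self.entries.append((len(self.entries) + 1, key, value))

    def stream_from(self, last_seqno):
        """Yield every change with a sequence number above last_seqno."""
        for entry in self.entries:
            if entry[0] > last_seqno:
                yield entry


class Consumer:
    def __init__(self):
        self.last_seqno = 0  # last change successfully applied
        self.data = {}

    def consume(self, log, limit=None):
        """Apply changes in order; `limit` simulates a dropped connection."""
        for n, (seqno, key, value) in enumerate(log.stream_from(self.last_seqno)):
            if limit is not None and n >= limit:
                break
            self.data[key] = value
            self.last_seqno = seqno


log = ChangeLog()
log.append("a", 1)
log.append("b", 2)
log.append("a", 3)

replica = Consumer()
replica.consume(log, limit=1)   # stream interrupted after one change
log.append("c", 4)
replica.consume(log)            # resumes from seqno 1, not from the start
print(replica.last_seqno, replica.data)
```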

Couchbase Server leverages DCP internally and externally for several different purposes. Within Couchbase Server, DCP is used to create replicas of data within and between clusters, maintain indexes (both local and global), backups, etc. DCP also feeds most connectors, which are integrations with external systems such as Hadoop, Kafka or Elasticsearch. Some of these connectors leverage DCP directly or connect to Couchbase Server’s XDCR functionality to keep the external systems in sync by acting like Couchbase Server nodes.

2.  Use cases

Couchbase can be used for almost all use cases that MongoDB can be used for. Since it supports better throughput and latency with concurrent users, it can also be used for high traffic applications like

Personalization, Profile Management, Real-Time BigData, Content Management, Catalogs, Mobile Apps, IOT etc

Check Use Cases for more details. Also check Couchbase Server In Use

3. Limitations

Couchbase Limitations and Query Limitations

4.  Data Types

The data types supported by Couchbase include Boolean, Numbers, Strings, Arrays, Objects, NULL, MISSING, Date, Collation, Binary

For more details, check Couchbase DataTypes

5.  Storage/Access costs

Couchbase Server comes in 3 main editions – Open Source Edition, Community Edition and Enterprise Edition.

Open Source Edition (OSE) is free and gives you access to the open source code to build your custom Couchbase Server. The open source project continues to serve as the foundation for both the community edition and the enterprise edition. OSE is available under the Apache License 2.0. OSE allows endless customizations. However, OSE comes with no support or guarantees.

Community Edition (CE) is free as well and gives you a set of binaries to run with your application. If you are building a simple application that does not need more than the basic availability, performance, scale, tooling and security capabilities, with community support through the Couchbase forums, this is the edition for you. Documentation, mailing lists, and forums provide support for the Couchbase user community to help troubleshoot issues and answer questions. CE is built from the open source and its versions are aligned with the versions of Couchbase Server Enterprise Edition. However, CE lacks advanced features available in EE.

Enterprise Edition (EE) is the latest, most stable, production-ready release of Couchbase Server and is recommended for production environments. EE provides the full range of availability, performance, scalability, tooling and security capabilities and includes the latest quality improvements. EE subscribers get hot-fixes for issues. Use of EE in development and test environments is free. However, it requires a paid subscription for production use. EE contains some unique features that make it the best fit for large production deployments running in data centers or a public cloud infrastructure.

For full details, check Couchbase Editions

 

6.  Scalability

 

https://developer.couchbase.com/documentation/server/4.6/architecture/storage-architecture.html

https://blog.couchbase.com/cap-theorem-and-couchbase-server-time-xdcr/

https://blog.couchbase.com/multi-tenancy-couchbase-server/

https://blog.couchbase.com/10-things-developers-should-know-about-couchbase/

Some real world use cases for NoSQL

Google Cloud Datastore vs MongoDB vs Amazon DynamoDB vs Couchbase vs HBase vs Neo4j

https://cloud.google.com/datastore/docs/concepts/overview

https://cloud.google.com/storage-options/

https://www.mongodb.com/compare/mongodb-dynamodb

http://vschart.com/compare/dynamo-db/vs/couchbase

https://db-engines.com/en/system/Cassandra%3BCouchbase%3BGoogle+Cloud+Datastore

https://www.trustradius.com/compare-products/amazon-dynamodb-vs-couchbase-server

http://calculator.s3.amazonaws.com/index.html#s=DYNAMODB

https://aws.amazon.com/dynamodb/pricing/

http://optimalbi.com/blog/2017/03/15/dynamodb-vs-mongodb-battle-of-the-nosql-databases/

CAP theorem for distributed systems explained

Horizontal scaling of software systems has become necessary in recent years, due to the global nature of computing and the ever-increasing performance demands on applications. It is no longer sufficient to run a single server with a single database in a single data center to handle the scalability needs of today. Distributed systems that span multiple data centers and regions have been in place at companies like Google since the late 1990s.

Unfortunately, the performance benefits that horizontal scaling provides come at a cost – complexity. Distributed systems introduce many more factors into the performance equation than existed before. Data records vary across clients/nodes in different locations. Single points of failure destroy system up-time, and intermittent network issues creep up at the worst possible time.

In the late 1990s, Eric Brewer introduced the CAP theorem to justify exploring a wider design space than traditional RDBMS systems, which focused only on ACID properties. The original definition of the CAP theorem, when it was first formalized in 1999, was:

Any networked distributed system can have at most two of three desirable properties:

  • Consistency (C) : equivalent to having a single up-to-date copy of the data
  • High Availability (A) : of that data (for updates) and
  • Tolerance to network partitions (P).

Simply put, the CAP theorem demonstrates that no distributed system can guarantee C, A, and P simultaneously; rather, trade-offs must be made at a point in time to achieve the level of performance and availability required for a specific task. For example, if consistency and availability (CA) are important, you can’t partition the data. If consistency is not that important, then you can partition the data for high availability (AP). If you have to partition the data and consistency is important (CP), then availability suffers. In the past decade, a vast range of new systems has emerged, along with much debate on the relative merits of consistency and availability. The new breed of NoSQL solutions plays around with these concepts so you can choose your tolerance for your desired properties.

scalability-cap-theorem1.png

Note: There’s no distributed system that wants to live with “Partitioning” – if it does, it’s not distributed. That is why putting SQL in this triangle may lead to confusion.

The 2 of 3 formulation was always misleading because it tended to oversimplify the tensions among properties.  CAP prohibits only a tiny part of the design space: perfect availability and consistency in the presence of partitions, which are rare.

Why the 2 of 3 formulation is misleading

The three attributes, Consistency, Availability and Partition Tolerance, are not binary. In practical large scale distributed systems spanning data centers, all of these are in fact continuous variables. First, partitions are rare in a distributed system, so there is little reason to forfeit C or A when the system is not partitioned. Second, the choice between C and A can occur many times within the same system at very fine granularity; not only can subsystems make different choices, but the choice can change according to the operation or even the specific data or user involved.

Because partitions are rare, CAP should allow perfect C and A most of the time, but when partitions are present or perceived, a strategy that detects partitions and explicitly accounts for them should be in order. This strategy should have three steps: detect partitions, enter an explicit partition mode that can limit some operations, and initiate a recovery process to restore consistency and compensate for mistakes made during a partition.

It’s really just A vs C

  1. Availability is achieved by replicating the data across different machines
  2. Consistency is achieved by updating several nodes before allowing further reads
  3. Total partitioning, meaning failure of part of the system, is rare. However, we can treat a delay (latency) in propagating an update between nodes as a temporary partition. It then forces a temporary choice between A and C:
    1. On systems that allow reads before updating all the nodes, we will get high Availability
    2. On systems that lock all the nodes before allowing reads, we will get Consistency
Since this choice is temporary and exists only for the duration of the delay, what we are really contrasting is Availability against Consistency.
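
The A versus C choice described above can be made concrete with a toy model. The sketch below is only an illustration under simplifying assumptions: the `Replica` and `ReplicatedRegister` classes are hypothetical names, a partition is simulated by a `reachable` flag, and recovery simply copies the newest version to every replica. It is not how any particular database implements replication.

```python
class Replica:
    def __init__(self, name):
        self.name = name
        self.version = 0
        self.value = None
        self.reachable = True  # False simulates a network partition


class ReplicatedRegister:
    """Toy register: 'CP' refuses writes it cannot replicate everywhere,
    'AP' accepts them and lets replicas diverge until heal() runs."""

    def __init__(self, replicas, mode):
        if mode not in ("CP", "AP"):
            raise ValueError("mode must be 'CP' or 'AP'")
        self.replicas = replicas
        self.mode = mode
        self.version = 0

    def write(self, value):
        reachable = [r for r in self.replicas if r.reachable]
        if not reachable:
            return False
        if self.mode == "CP" and len(reachable) < len(self.replicas):
            # Consistency first: reject rather than leave a stale copy behind.
            return False
        self.version += 1
        for r in reachable:
            r.version, r.value = self.version, value
        return True

    def read(self, replica_index):
        # A client on either side of the partition reads its local replica.
        return self.replicas[replica_index].value

    def heal(self):
        # Recovery step: reconnect and copy the newest version everywhere.
        for r in self.replicas:
            r.reachable = True
        newest = max(self.replicas, key=lambda r: r.version)
        for r in self.replicas:
            r.version, r.value = newest.version, newest.value


def demo(mode):
    replicas = [Replica("a"), Replica("b")]
    register = ReplicatedRegister(replicas, mode)
    register.write("v1")
    replicas[1].reachable = False          # the partition starts
    accepted = register.write("v2")        # does the system stay available?
    during = (register.read(0), register.read(1))
    register.heal()                        # the partition ends
    after = (register.read(0), register.read(1))
    return accepted, during, after
```

In "CP" mode the second write is refused and both replicas keep agreeing on "v1"; in "AP" mode the write is accepted, the replicas disagree while the partition lasts, and they converge only after `heal()`.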
References:

http://www.infoq.com/articles/cap-twelve-years-later-how-the-rules-have-changed

https://dzone.com/articles/better-explaining-cap-theorem

 

Understanding OAuth

What is OAuth?

OAuth is an open standard for authorization that offers an alternative to the password anti-pattern. OAuth provides client applications ‘secure delegated access’ to server resources on behalf of a resource owner. It specifies a process for resource owners to authorize third-party client access to their server resources without sharing their credentials. The third-party client can be a Web Application, Mobile App, Browser App, Desktop Client or a trusted Application. To explain this in simple terms, let’s say you are a Facebook user and you try to play a game (a third-party app) that requires access to some of your information and permission to post to your wall. Before the advent of OAuth, you would have been required to give your Facebook username and password to the third-party app. OAuth lets you grant this access without providing your Facebook username and password to the third-party app.

In the traditional client-server authentication model, the client requests an access restricted resource (protected resource) on the server by authenticating with the server using the resource owner’s credentials. In order to provide third-party applications access to restricted resources, the resource owner shares its credentials with the third-party. This creates several problems and limitations.

1. Third-party applications are required to store the resource owner’s credentials for future use, typically a password in clear-text.

2. Servers are required to support password authentication, despite the security weaknesses created by passwords.

3. Third-party applications gain overly broad access to the resource owner’s protected resources, leaving resource owners without any ability to restrict duration or access to a limited subset of resources.

4. Resource owners cannot revoke access to an individual third-party without revoking access to all third-parties, and must do so by changing their password.

5. Compromise of any third-party application results in compromise of the end-user’s password and all of the data protected by that password.

OAuth addresses these issues by introducing an authorization layer and separating the role of the client from that of the resource owner.

OAuth History

Large social providers like Facebook, Yahoo!, AOL, and Google each started developing their own alternative to the password anti-pattern. The community decided to standardize the specification, and OAuth 1.0 was later published as an RFC (RFC 5849). In 2009, an attack on OAuth 1.0 was identified which relied on an attacker initiating the OAuth authorization sequence and then convincing a victim to finish the sequence (a session fixation attack). As a result, the attacker’s account at an (honest) client would be assigned permissions to the victim’s resources at the Service Provider. OAuth 1.0 was officially deprecated on April 20, 2012 due to security flaws. There were other issues, like the complexity of the signing process and the support for different client types, which made OAuth 1.0 a little difficult to use.

Then came a revised version, OAuth 1.0a, which addresses the security issue in OAuth 1.0, but the complexity of signing requests and the support for different client types remained as is. Some of the major Service Providers on OAuth 1.0a include Twitter, Flickr, MySpace, and LinkedIn (also on OAuth 2.0).

In 2010, Microsoft, Yahoo!, and Google created the Web Resource Authorization Profiles (WRAP), which were soon submitted to the IETF WG as input for OAuth 2.0. WRAP proposed a significant reworking of the OAuth 1.0a model.

OAuth 2.0 attempted to replace 1.0a with a simpler protocol, but it is more of a framework. The onus of security falls on the providers and the client developers who implement the specification. There have been a lot of concerns about OAuth 2.0 security. Some of the major Service Providers on OAuth 2.0 include Facebook, Google, Amazon, AOL, Box, bitly, GitHub, and Instagram.

Development of OAuth 2.0 in the IETF consequently reflects the input of OAuth 1.0, OAuth 1.0a, and the WRAP proposal. It is fair to say that the very different assumptions about what are appropriate security protections between OAuth 1.0a and WRAP have created tensions within the IETF OAuth WG.

While OAuth 2.0 initially reflected more of the WRAP input, lately (i.e. fall 2010) there has been a swing in group consensus that the signatures of OAuth 1.0a that were deprecated by WRAP are appropriate and desirable in some situations. Consequently, signatures are to be added back as an optional security mechanism.

While many deployments of OAuth 1.0a survive, more and more OAuth 2.0 deployments are appearing – necessarily against a non-final version of the spec. For instance, Facebook, Salesforce, and Microsoft Azure ACS all use draft 10 of OAuth 2.0.

OAuth Terminology

Below is the list of terms often encountered in OAuth 1.0a specification.

Term – Explanation
Service Provider – A web application that allows access via OAuth.
User – An individual who has an account with the Service Provider.
Consumer – A website or application that uses OAuth to access the Service Provider on behalf of the User.
Protected Resource(s) – Data controlled by the Service Provider, which the Consumer can access through authentication.
Consumer Developer – An individual or organization that implements a Consumer.
Consumer Key – A value used by the Consumer to identify itself to the Service Provider.
Consumer Secret – A secret used by the Consumer to establish ownership of the Consumer Key.
Request Token – A value used by the Consumer to obtain authorization from the User, and exchanged for an Access Token.
Access Token – A value used by the Consumer to gain access to the Protected Resources on behalf of the User, instead of using the User’s Service Provider credentials.
Token Secret – A secret used by the Consumer to establish ownership of a given Token.
OAuth Protocol Parameters – Parameters with names beginning with oauth_ which are passed in the Authorization request header.
Request Token URL – The Service Provider endpoint that the Consumer uses to get an unauthorized Request Token.
User Authorization URL – The URL used by the Service Provider to obtain User authorization for Consumer access.
Access Token URL – The Service Provider endpoint that the Consumer uses to exchange the authorized Request Token for an Access Token.

Below is the list of terms often encountered in OAuth 2.0

Term – Explanation
Server – A web application that allows access via OAuth.
Resource Owner – An individual who has an account with the Server.
Client – A website or application that uses OAuth to access the Server on behalf of the Resource Owner.
Protected Resource(s) – Data controlled by the Server, which the Client can access through authentication.
Client Developer – An individual or organization that implements a Client.
Client Id – A value used by the Client to identify itself to the Server.
Client Secret – A secret used by the Client to establish ownership of the Client Id.
Access Token – A value used by the Client to gain access to the Protected Resources on behalf of the Resource Owner, instead of using the Owner’s Server credentials.
Refresh Token – A value used by the Client to exchange an expired Access Token for a new one.
Token Type – The type of access token issued by the Server.
Grant Type – A value used by the Client to authorize access to protected resources in various ways with different security credentials.

OAuth – High Level Overview

OAuth defines three roles: client, server, and resource owner. These three roles are present in any 3-legged OAuth transaction; in some cases (2-legged) the client might be the resource owner or might act on behalf of the resource owner. We will look at these scenarios in the later sections. The 1.0 specification used a different set of terms for these roles: consumer (client), service provider (server), and user (resource owner).

OAuth essentially allows access tokens to be issued to third-party clients by an authorization server, with the approval of the resource owner, or end-user. The client then uses the access token to access the protected resources hosted by the resource server. OAuth is commonly used as a way for web surfers to log into third-party clients using their Microsoft, Google, Facebook or Twitter accounts, without worrying about their access credentials being compromised. For example, a Twitter app called TweetGif was hacked, releasing user information (not including passwords) for 10,000 Twitter accounts to the public. However, no Twitter credentials were compromised, because TweetGif was using OAuth access tokens for authorization and had access to only basic user information.

2-legged and 3-legged OAuth

How Does OAuth Work?

Let’s say you are a user with a Facebook account and you play Facebook games (say, Farmville). Farmville posts updates to your Wall so that your friends know your scores. In order to do so, you need to give the third-party Farmville app access to some of the resources of your account (basic information, access to post to your wall), so it can get your info and post to your wall on your behalf. In the old days, you would have had to give an app like Farmville your Facebook username and password, so it could log in and access those services. You not only had to trust them to use those credentials wisely, but also to keep them safe from hackers. That’s a pretty big leap of faith. It’s like giving your house keys to a stranger and trusting them not to make copies for all their friends and steal all of your stuff.

OAuth gets around this problem by only giving them access to the stuff you want them to access. Instead of asking you for your password, this happens:

  1. To become a Facebook app, Farmville must acquire two values from the Facebook service: a “Consumer Key” and a “Consumer Secret” that Facebook can use to uniquely identify the Farmville app. These are what create a connection between the consumer (in this case, Farmville) and the service provider (in this case, Facebook).
  2. When you visit the Farmville app (on zynga.com), you get an option to play on Facebook. When you click on it, you are redirected to Facebook. If you aren’t logged in to Facebook, you are prompted to log in (remember, you are giving your username and password to Facebook itself, not to the third-party Farmville app).
  3. Facebook then asks you whether you want to authorize this app, and tells you what permissions it is giving to the app. Maybe it can view your timeline, or maybe it can view your timeline and post on your behalf. In some cases, you may only be giving it access to your username and avatar. When you click the “Authorize” button, it creates an “Access Token” and an “Access Token Secret”. These are like passwords, but they only allow Farmville to access your account and do the things you’ve allowed it to do.

Step 1: User goes to Zynga to play Farmville

farmville1

Step 2: User clicks on the Play on Facebook button and is redirected to the Facebook login

farmville2

farmville3

Step 3: Facebook asks the user for authorization to allow the Farmville app to access some info

farmville4

farmville5

Step 4: After the user authorizes, he is redirected to the Farmville app

farmville6

Authentication Flow in OAuth1.0a

The authentication in OAuth1.0a involves four steps:

  1. The Consumer obtains an unauthorized Request Token.
  2. The User authorizes the Request Token.
  3. The Consumer exchanges the Request Token for an Access Token.
  4. The Consumer uses the Access Token to access protected resources.

The detailed authentication flow can be seen in the figure below.

oauth1_0a

The request and response in each step are as follows:

OAuth 1.0a

Note that these request parameters are sent in the Authorization request header. For example, the request might have an Authorization header like:

Authorization: OAuth realm="http://sp.example.com/",
    oauth_consumer_key="0685bd9184jfhq22",
    oauth_token="ad180jjd733klru7",
    oauth_signature_method="HMAC-SHA1",
    oauth_signature="wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    oauth_timestamp="137131200",
    oauth_nonce="4572616e48616d6d65724c61686176",
    oauth_version="1.0"

The process for generating the signature can be found in detail here.
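
To give a feel for what goes into the oauth_signature value, here is a minimal sketch of the HMAC-SHA1 signing step using only the Python standard library. It assumes the URL is already in its normalized base form (no query string) and that no parameter name repeats. The function names, the /photos path and both secrets are hypothetical, chosen only for illustration, so the result will not match the signature in the header above.

```python
import base64
import hashlib
import hmac
from urllib.parse import quote


def pct(value):
    # OAuth 1.0a percent-encoding: only unreserved characters stay as-is.
    return quote(str(value), safe="-._~")


def signature_base_string(method, url, params):
    # Encode names and values, sort the encoded pairs, then join them as k=v.
    encoded = sorted((pct(k), pct(v)) for k, v in params.items())
    normalized = "&".join(f"{k}={v}" for k, v in encoded)
    return "&".join([method.upper(), pct(url), pct(normalized)])


def sign_hmac_sha1(method, url, params, consumer_secret, token_secret=""):
    # The signing key is the two encoded secrets joined by "&".
    key = f"{pct(consumer_secret)}&{pct(token_secret)}".encode("ascii")
    base = signature_base_string(method, url, params).encode("ascii")
    digest = hmac.new(key, base, hashlib.sha1).digest()
    return base64.b64encode(digest).decode("ascii")


# Every oauth_* parameter except oauth_signature itself (and realm) is signed.
params = {
    "oauth_consumer_key": "0685bd9184jfhq22",
    "oauth_token": "ad180jjd733klru7",
    "oauth_signature_method": "HMAC-SHA1",
    "oauth_timestamp": "137131200",
    "oauth_nonce": "4572616e48616d6d65724c61686176",
    "oauth_version": "1.0",
}
signature = sign_hmac_sha1(
    "GET",
    "http://sp.example.com/photos",      # hypothetical resource URL
    params,
    "hypothetical-consumer-secret",
    "hypothetical-token-secret",
)
```

The Consumer would then percent-encode `signature` and send it as oauth_signature; the Service Provider repeats the same computation with its copy of the secrets and compares the two values.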

Differences between OAuth1.0a and OAuth2.0

OAuth for various scenarios

Using Apigee Edge for REST API Management

Benefits of Using Apigee Edge

  • Effectively eliminates unnecessary traffic to your Web Services
  • All OAuth2 support, including token validation, is handled through Apigee. All requests that reach your Services are guaranteed to be cleansed and legitimate.
  • Subscription control and enforcement. Only legitimate clients that have subscribed (and been approved) can call your Services.
  • Quota management
  • Caching and edge computing. You can also incorporate an Apigee cache (Redis) and key value store (Cassandra) to enrich the programmable API proxies by offloading all cross-cutting concerns from backend Services. As a result, the Services remain pure and concentrate on optimizing business semantics.
  • If you have your own Identity Provider, the required data can be injected as a header. Your service gets all essential authorization information right away. No additional calls to the Identity Provider are needed.
  • Rich analytics, available in the form of API and web console custom reports
  • Throttling protection against DDoS attacks
  • Customizable behavior: request routing logic, header manipulation/instrumentation, additional validation logic, circuit breaking, etc.
  • Control over misbehaving clients. A rogue API key can be stopped/terminated at any time.
  • Streaming support for large payload handling. No intermediate buffering is incurred.
  • Dynamic deployment and revision control. Zero downtime is incurred when deploying new API versions/revisions.
  • IO-intensive thread handling. Separate thread pools deal with network IO heavy calls to backend services.
  • Autoscaling, managed by Apigee Ops.

Asynchronous Communication in SOA

In this article, we will look at asynchronous communication across services in a Service Oriented Architecture through Message Oriented Middleware (brokered and brokerless) using the Producer-Consumer pattern.

Advantages of Message Oriented Middleware

  1. Asynchronicity/Loose coupling between components: MOM technologies use asynchronous message passing as opposed to request-response architectures. In asynchronous systems, message queues provide temporary storage when the destination program is busy or not connected. In addition, most asynchronous MOM systems provide persistent storage to back up the message queue. This means that the sender and receiver do not need to connect to the network at the same time (asynchronous delivery), and problems with intermittent connectivity are solved. It also means that should the receiver application fail for any reason, the senders can continue unaffected, as the messages they send will simply accumulate in the message queue for later processing when the receiver restarts.
  2. Routing: Many message-oriented middleware implementations depend on a message queue system. Some implementations permit routing logic to be provided by the messaging layer itself, while others depend on client applications to provide routing information or allow for a mix of both paradigms.
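
The decoupling described in the first point can be sketched with Python’s standard library. This is only a rough illustration: the in-process `queue.Queue` stands in for the broker’s queue, it is not persistent, and the `run_demo` function and message names are made up for the example.

```python
import queue
import threading


def run_demo(message_count=5):
    mailbox = queue.Queue()   # stands in for the broker's message queue
    processed = []

    # The producer sends while no consumer is running; put() returns at once
    # and the messages simply accumulate in the queue.
    for i in range(message_count):
        mailbox.put(f"message-{i}")

    def consumer():
        while True:
            message = mailbox.get()
            if message is None:          # sentinel: no more work
                mailbox.task_done()
                break
            processed.append(message.upper())
            mailbox.task_done()

    # The consumer starts later and drains everything that was queued.
    worker = threading.Thread(target=consumer)
    worker.start()
    mailbox.put(None)
    worker.join()
    return processed
```

The producer never waits for the consumer and never learns whether it is up. A real MOM system adds persistence and network transport on top of the same idea.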

Disadvantages of Message Oriented Middleware

The primary disadvantage of many broker based message oriented middleware systems is that they require an extra component in the architecture, the message transfer agent (message broker). As with any system, adding another component can lead to reductions in performance and reliability, and can also make the system as a whole more difficult and expensive to maintain.

Criteria for evaluating Message Queues

  1. Durability: Whether messages can be written to disk, or even committed to a DBMS for reliability in case of server crashes.
  2. Point-Point vs Publish-Subscribe: Whether the message queue provider supports point-point or publish-subscribe mode of delivery.
  3. Push vs Pull Model: Whether the consumers have to pull the messages or the broker pushes the messages to the consumers.
  4. Message Acknowledgements: Whether message acknowledgement modes are supported between consumers, brokers and producers.
  5. Delivery Policies: Whether a message should be delivered at least once, or no more than once.
  6. Purging Policies: Whether Queues or Messages support a “time to live”.
  7. Message Size/Format: What message sizes and types are supported.
  8. Message filtering/routing policies: Whether messages can be filtered so that a consumer can see only messages matching some criteria of interest and messages can be routed to consumers based on defined policies. An example of why routing is necessary can be understood from the scenario described below.

Let’s consider an enterprise social application architected using SOA as different services like

ItemService – Manages items like documents/events/pages etc.

UploadService – Handles item uploads.

FeedService – Manages a user’s feed.

SearchService – Handles search in the entire application.

ConversionService – Generates intermediary artifacts like thumbnails for image document items.

A user uploads a new file using the UploadService, which has to trigger the ItemService to create a new item once the upload is successful. ItemService in turn triggers

FeedService to update the user’s feed,

SearchService to index the item so that it’s searchable,

ConversionService so that any intermediary artifacts for the item can be generated.

ConversionService in turn might use other services within the company or outside. ConversionService should also notify FeedService about status of items it submitted for conversion as shown in Figure0.

Figure0: Service Dependencies in an Application

The dependencies can go to many levels depending on the nature of the application and the dependency between services.

  9. Message Ordering: Whether messages are delivered to consumers in a FIFO ordering or not.
  10. Batching Policies: Should messages be delivered immediately, or should the system wait a bit and try to deliver many messages at once.
  11. Scalability: What order of throughput, latency and volume of queues, messages is supported.
  12. Availability: Does the queuing system support high availability in case of failures of servers.
  13. Operational and financial cost: Whether the solution incurs any financial or operational burden.
  14. Cross platform interoperability: Whether the solution works across disparate services/systems either of different enterprises/companies or within the enterprise to easily exchange messages between each other regardless of the message broker vendor and platform.

  1. Using Database as a Message Queue

The current architecture used in A360 for communication across services is outlined below in Figure1.

When ServiceA has to communicate with ServiceB, it uses ServiceB’s HttpClient, which in turn goes through the Custom Load Balancer (which knows about all the instances of ServiceB through data structures populated from the config DB) to distribute requests to the available instances of ServiceB in a round-robin fashion.

ServiceBHttpClient constructs a Message with all the required inputs for the call to ServiceB and makes an HTTP call through the Custom LB. ServiceB has a MessageHandler component which receives these HTTP calls with the message in the body of the request and delegates the call to the appropriate business component, which pushes the message to a queue (which is a table in the database) and returns a response immediately.

Every instance of ServiceB is bundled with a MessageProcessingFramework which has a pool of threads continuously polling (in a certain interval) for messages in the queue by locking the table to ensure a message is picked up only by one instance for processing. Each instance picks up a batch of messages to process and invokes the appropriate business component.
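
The polling step can be sketched as below. This is a rough illustration, not the A360 implementation: it assumes SQLite (via Python’s built-in sqlite3 module) in place of whatever database is actually used, and the table layout, column names and function names are hypothetical. A write transaction stands in for the table lock, and a status column marks which messages have been claimed.

```python
import sqlite3


def open_queue():
    # isolation_level=None lets us issue BEGIN/COMMIT explicitly.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute(
        "CREATE TABLE messages ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " body TEXT NOT NULL,"
        " status TEXT NOT NULL DEFAULT 'NEW',"
        " owner TEXT)"
    )
    return conn


def enqueue(conn, body):
    conn.execute("INSERT INTO messages (body) VALUES (?)", (body,))


def claim_batch(conn, owner, batch_size):
    # Take the write lock before reading, so that two pollers cannot
    # select and claim the same rows.
    conn.execute("BEGIN IMMEDIATE")
    try:
        rows = conn.execute(
            "SELECT id, body FROM messages"
            " WHERE status = 'NEW' ORDER BY id LIMIT ?",
            (batch_size,),
        ).fetchall()
        conn.executemany(
            "UPDATE messages SET status = 'PROCESSING', owner = ? WHERE id = ?",
            [(owner, row[0]) for row in rows],
        )
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return rows


def mark_done(conn, message_id):
    conn.execute("UPDATE messages SET status = 'DONE' WHERE id = ?", (message_id,))
```

Each polling thread would call `claim_batch` at its polling interval, process the returned rows, and call `mark_done` for each one. Every poll is a locking query against the table even when it returns nothing.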

Figure1:  Using DB as MessageQueue in A360

Observations:

  1. If new instances are added or an existing instance comes down, the config has to be reloaded.
  2. The load balancing strategy has to be intelligent to distribute requests to instances that are less loaded. A simple round-robin strategy may not be optimal.
  3. A short interval for polling may be hammering the database with constant queries. Alternatively, a long interval may result in processing delays.
  4. The solution does not scale well. Polling adds a significant load to the database even at a medium volume. A database table is typically configured to be fast at adding data, updating data or querying data, but almost never all three on the same table. As the volume scales up, load increases on the database and performance decreases.
  5. Inserting, updating and querying on the database table by a lot of concurrent producers and consumers may result in race conditions.
  6. Does not support a callback mechanism to notify a Producer of any intermediary events when a consumer is either processing or has finished processing a message. For example, ServiceA submits a document to ServiceB, which generates multiple intermediate derivatives (thumbnails for images, etc.), each taking a certain amount of time to be generated. Whenever an artifact is available, ServiceA has to be notified. In such cases a callback mechanism (for example, invoking a callback URL on the producer end) needs to be supported.

Evaluation of Solution

Durability: Messages are saved to the database always.

Point-Point vs Publish-Subscribe:  Supports only Point-Point model.

Delivery Policies: Messages are guaranteed to be delivered as the Message producer gets an acknowledgement. Messages are consumed by only one consumer by using a status field in the table to co-ordinate between multiple consumers.

Message Acknowledgements: Messages are acknowledged as soon as they are stored in the persistent store

Purging Policies: Messages can be retained and can be purged manually or through a database trigger based on some purging policies.

Message Size/Format: Can store binary/text messages of any size, limited only by the underlying disk space and the datatype used for the column. Even though there is no limitation on message size, it is not ideal to have very large messages.

Message filtering/routing Policies: Message filtering has to be implemented in the MessageHandler component of the consumer service. Does not support routing of message to multiple services implicitly. Each message will have to be sent to every other Service that has to consume the message as shown in Figure2 below.

Figure2: Routing message to multiple consumers

Message Ordering: Consumers pick up messages from the table based on the timestamp of message. This does not result in strict FIFO due to multiple consumers.

Batching Policies: Messages can be processed in batches based on batching policies defined.

Scalability: Not very scalable. Would hit performance issues due to lock contention on database read/write operations and too much disk I/O.

Availability:  Using a dynamic service discovery solution to detect service instances being added or going down, would achieve high availability.

Operational and Financial Cost: Does not incur any financial cost except the initial effort to design the solution; would incur minimal operational burden.

Cross Platform interoperability:

 

  2. Using Amazon SQS as the broker

Amazon Simple Queue Service (SQS) is a fast, reliable, scalable, fully managed message queuing service. Figure3 illustrates how to use SQS to design an asynchronous messaging solution in SOA.

ServiceA pushes messages to ServiceB’s SQS queue. The MessageHandler component in each instance of ServiceB polls for messages from the queue (in batches) and processes the messages. SQS takes care of locking on concurrent read requests on the queue to ensure the same messages are not returned to multiple consumers.

Figure3: Message Queue Architecture using SQS

 

Observations

  1. Consumers have to poll the queue in order to receive messages. If messages are available, they are delivered; otherwise consumers must idle for a while and then poll again. This increases the latency of processing messages and also the cost of using SQS: each request to SQS incurs a minor cost, which is multiplied by the number of consumers making requests and the number of queues. When there is a chain of services using queues for asynchronous communication, the latency effect is multiplied. SQS supports long polling (with a configurable wait time) to overcome this, but threads polling for messages would remain idle until messages are available.
  2. Consumers are responsible for deleting messages in the queue after processing as the messages are not deleted by SQS once delivered to the consumer. SQS generates a receipt handle for each successful delivery of messages to the consumer. The consumer needs to pass in the receipt handle to delete all the consumed messages from the queue once it finishes processing all the received messages.
  3. There is still a locking overhead when multiple consumers request for receiving/deleting messages but that overhead is handled by SQS.
  4. If a Producer has to be notified of any intermediary events when a consumer is processing the Message, either a callback URL has to be made available in the message or the consumer can send a message to the Producer’s SQS queue as shown in Figure4.

Figure4: Notifying Producer using SQS

Evaluation of Solution

Durability: Messages are durable, though only for a limited period of time (4 days by default, 14 days at most).

Point-Point vs Publish-Subscribe:  Supports only Point-Point model.

Delivery Policies: Messages are guaranteed to be delivered at least once, since they are retained in the queue until a consumer instructs SQS to delete them. The possible scenarios for more than once delivery are

  1. Messages not getting deleted from SQS before visibility timeout due to consumer crashing or taking longer than visibility timeout to process the message. The visibility timeout should be set to a value longer than the consumer workers normally take to finish processing, plus a bit extra for padding.
  2. Concurrent consumers polling visible messages simultaneously.
  3. If an SQS server crashes and comes back up, messages it held that were already processed would have been deleted from the other servers but not from the server that went down, resulting in re-delivery once that server comes up again.

As a result of at-least-once delivery, consuming services need to make sure that re-processing of messages does not result in an inconsistent state of the system by handling the messages in an idempotent manner.
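A minimal sketch of idempotent handling follows, assuming each message carries a stable identifier that stays the same across re-deliveries. `IdempotentHandler` is a hypothetical helper, and the in-memory set stands in for what would need to be a durable store in a real service.

```python
class IdempotentHandler:
    """Wraps a handler so that a re-delivered message id is processed only once."""

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()  # a real service would keep this in a durable store

    def handle(self, message_id, body):
        if message_id in self._seen:
            return False             # duplicate delivery: skip the side effects
        self._handler(body)
        self._seen.add(message_id)   # record only after successful processing
        return True


balance = {"total": 0}


def credit(amount):
    balance["total"] += amount


handler = IdempotentHandler(credit)
handler.handle("msg-1", 100)
handler.handle("msg-1", 100)  # re-delivery of the same message is ignored
handler.handle("msg-2", 50)
```

After the three calls the balance reflects two credits, not three, so the duplicate delivery leaves the system state consistent.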

Message Acknowledgements: Does not support message acknowledgement; a trade-off that SQS makes for providing high throughput.

Purging Policies: Messages are retained in SQS for a default period of 4 days and a max period of 14 days after which they are automatically deleted from the queue.

Message Size/Format: Can store text messages up to 256 KB.

Message filtering/routing Policies: Does not support filtering/routing of messages. Message filtering has to be implemented in the MessageHandler component of the consumer service. Does not support routing of a message to multiple consuming services either: the producer has to send each message to the SQS queue of every service that needs to consume it, as shown in Figure5.

Figure5: SQS routing to multiple consumers


Message Ordering: Messages are not guaranteed to be delivered in FIFO order. When a client polls for messages, SQS picks up messages from a random sample subset of servers due to which message ordering cannot be guaranteed.

Batching Policies: Messages can be requested in batches with a max batch size of 10 messages.
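Because of the limit of 10, any larger set of messages has to be handled in several requests. The sketch below shows only the chunking step; `chunk` is a hypothetical helper and no SQS call is made.

```python
MAX_BATCH_SIZE = 10  # per-request batch limit stated above


def chunk(messages, size=MAX_BATCH_SIZE):
    """Yield successive batches of at most `size` messages."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(messages), size):
        yield messages[start:start + size]


# 23 messages need three requests: two full batches and one partial batch.
batches = list(chunk([f"msg-{i}" for i in range(23)]))
batch_sizes = [len(batch) for batch in batches]
```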

Scalability: Highly Scalable in terms of throughput. Latency is in the order of a few to several seconds as the message will have to be replicated before it is available for consumption and message consumption by the consumers is not real time due to polling interval. Supports unlimited volume of queues/messages.

Availability:  Supports High availability in case of server failures through replication of the queue across a cluster of servers. Chooses Availability over Consistency in the event of a network partition.

Operational and Financial Cost: Zero operational cost, but every request to SQS incurs a cost ($1 per 1 million requests).
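To see how polling multiplies this cost, here is a rough back-of-the-envelope calculation. It is a sketch under stated assumptions: the price is the figure quoted above, the queues are assumed to be empty so that every poll is a billed request that returns nothing, and the consumer counts and poll intervals are made-up illustrative values.

```python
PRICE_PER_MILLION_REQUESTS = 1.00  # USD, the figure quoted above
SECONDS_PER_DAY = 24 * 60 * 60


def daily_polling_cost(consumers_per_queue, queues, seconds_between_polls):
    """Cost of one day of polling empty queues at a fixed interval."""
    polls_per_consumer = SECONDS_PER_DAY / seconds_between_polls
    requests = consumers_per_queue * queues * polls_per_consumer
    return requests / 1_000_000 * PRICE_PER_MILLION_REQUESTS


# 10 consumers on each of 5 queues, polling once per second:
short_poll = daily_polling_cost(consumers_per_queue=10, queues=5, seconds_between_polls=1)
# The same consumers using a 20-second long-poll wait:
long_poll = daily_polling_cost(consumers_per_queue=10, queues=5, seconds_between_polls=20)
```

With these assumed numbers, short polling costs about $4.32 per day and long polling about $0.22 per day, which is why long polling is usually preferred for idle queues.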

 

  1. Using JMS Message Brokers

Specific JMS broker implementations are not evaluated in detail in this section. JMS has evolved over 10 years into a robust and mature specification. We will specifically look at cross platform interoperability with JMS and the underlying issues. There are many JMS brokers like ActiveMQ, HornetQ etc which are quite popular and have been used extensively.

With JMS, it becomes easy to replace any JMS-compliant message broker with another one with minor configuration changes provided the protocol used for communication is supported by both brokers. JMS provides a High level API for Producers and Consumers which abstracts out the underlying wire protocols used for communication between the clients and JMS brokers. Messaging Interoperability between Java Platform and other platforms/languages and between non-Java platforms is proprietary and dependent on broker implementations.

Communication between Producers->Broker and Broker->Consumers can happen through various wire-level protocols. Before the advent of AMQP, there were other protocols like OpenWire, STOMP, XMPP etc. REST over HTTP (to provide a technology-agnostic and language-neutral web-based API to messaging) is another communication option. Different vendors supported different protocols. For example,

ActiveMQ has support for OpenWire, STOMP, XMPP, MQTT, WSIF protocols and REST (over HTTP). After AMQP was standardized, ActiveMQ also added support for AMQP.

HornetQ has support for STOMP, REST and also AMQP after it was standardized.

Apache Apollo has support for STOMP, MQTT, OpenWire, AMQP, WebSockets.

There are many client libraries that support one or more protocols for each language platform. Some of the libraries and their supported protocols are listed below. This is not an exhaustive list and includes only a few libraries and protocols.

  • ActionScript3 (as3-stomp – STOMP, as3-amqp – AMQP)
  • Ajax, JavaScript (XMLHttpRequest – REST connector over HTTP)
  • WebSockets (Stomple, stomp-websocket – STOMP; Eclipse Paho – MQTT)
  • C (openwire-c – OpenWire; libstomp – STOMP; rabbitmq-c – AMQP)
  • C++ (Apache CMS – OpenWire, STOMP; AMQP-CPP – AMQP)
  • C# and .Net (Apache NMS – OpenWire, STOMP, AMQP, MSMQ)
  • Java (ActiveMQ Java Client – OpenWire; Gozirra, Stampy – STOMP; rabbitmq-java – AMQP)
  • Perl (AnyEvent::Stomp, Net::Stomp – STOMP; AnyEvent::RabbitMQ, Net::AMQP – AMQP)
  • PHP (simplisticstompclient, stomp – STOMP; php-amqplib – AMQP)
  • Python (pyactivemq, stompest – STOMP; pika – AMQP)
  • Ruby (activemessaging, onstomp – STOMP; bunny, March Hare – AMQP)
  • Erlang (stomp.erl – STOMP; bunny_farm – AMQP)

With these semantics of communication in JMS, we will look into messaging models before the advent of AMQP.

  1. In the simplest case where the Producer and Consumer are both written in Java, the resulting messaging model would be as shown in the Figure below
Figure5a: JMS Messaging Model with Java Producer and Consumer


In this particular example, the clients are using the activemq client which uses OpenWire protocol to communicate with broker; however we can replace the client with any other client which uses OpenWire protocol for communicating with broker. Also, any JMS broker which supports OpenWire protocol like ActiveMQ or Apollo can be used as the middleware. OpenWire is the default native protocol used by ActiveMQ. In this example, the broker does not have to use a message bridge to transform the message structure and protocol into another format since both clients are talking to the broker through the same protocol.

  1. Now let’s consider a case where the Producer is written in Java and the Consumer is written in Python, or vice-versa. In this case the messaging model would be as shown in the Figure below.
Figure5b: JMS Cross Platform Messaging Model


In Model 1, the Java Producer uses the ActiveMQ client to communicate with the broker using OpenWire, and the Python Consumer uses the pyactivemq client to talk to the broker in STOMP. The most popular protocol choice for Python is STOMP and there are no clients available that use other protocols for communication. So, in this model, the broker has to provide a message bridge which can transform the protocol and message structure from OpenWire to STOMP. It will not be possible to replace one broker with any other, as the new broker would also have to support the transformations that the existing broker supports. Similarly, in Model 2, the message bridge should be able to transform from STOMP to OpenWire.

In Model 3, there is no transformation required as both the Producer and Consumer clients are using the same protocol to communicate. Although this scenario seems to solve the cross-platform interoperability problem, there are some inherent problems.

  1. Both protocols may not support the same message body types.
  2. Applications would be locked into one specific vendor solution (or in some cases only a few vendor choices) due to the built-in message bridge.
  3. Both protocols may not support same datatypes, custom properties and header properties between the messaging clients.
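The datatype problem can be made concrete with a simplified STOMP-style frame. STOMP is a text protocol in which a frame is a command line, `key:value` header lines, a blank line, the body and a terminating NUL byte, so header values travel as strings. The sketch below is a hypothetical illustration (it ignores header escaping and content-length handling, and the header names are made up); it shows typed properties losing their type after one round trip.

```python
def encode_frame(command, headers, body=""):
    """Serialize a frame in STOMP's text layout: command, headers, blank line, body, NUL."""
    header_lines = "".join(f"{key}:{value}\n" for key, value in headers.items())
    return f"{command}\n{header_lines}\n{body}\x00"


def decode_frame(frame):
    """Parse a frame back; every header value comes back as a string."""
    head, _, body = frame.rstrip("\x00").partition("\n\n")
    command, *header_lines = head.split("\n")
    headers = dict(line.split(":", 1) for line in header_lines)
    return command, headers, body


# Properties set by a producer as an int and a float (hypothetical names).
sent_headers = {"destination": "/queue/orders", "retry-count": 3, "priority": 4.5}
frame = encode_frame("SEND", sent_headers, "order-17")
command, received_headers, body = decode_frame(frame)
# received_headers["retry-count"] is now the string "3", not the integer 3.
```

A consumer on the other side of a bridge therefore has to know, out of band, which headers were numeric, which is exactly the kind of agreement a common wire-level standard is meant to remove.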

These scenarios illustrate that JMS isn’t the right solution for cross-platform interoperability. While vendors such as ActiveMQ, HornetQ, Apollo, SonicMQ etc provide some level of cross platform interoperability, it is usually through proprietary protocols, APIs and client libraries. Without some sort of messaging standard, it is difficult to know what messaging system another company or system is using and how to interface with it.

This is exactly why AMQP was created: to standardize cross-platform messaging interoperability. Whereas JMS provides a standard messaging API for the Java Platform, AMQP provides a standard messaging protocol across all platforms. AMQP does not provide a specification for an industry standard API. Rather, it provides a specification for an industry standard wire-level binary protocol to describe how the message should be structured and sent across the network.

With AMQP, we can use whatever AMQP-compliant client library we want and any AMQP-compliant broker. As a result, messaging clients using AMQP are completely agnostic to which AMQP client API or AMQP message broker we are using. With AMQP, the messaging model for cross platform messaging can be seen in Figure 5c below.

Figure5c: AMQP Cross Platform Messaging Model


The advantages of having an industry-wide messaging standard become very clear when looking at the messaging model with AMQP in Figure5c. In the JMS cross platform messaging model, we have seen why we had to be concerned about protocols and what message broker vendors supported both protocols and transformations between protocols. A messaging system using AMQP as its messaging standard (regardless of the client API or message broker) can send messages back and forth to other messaging systems that use AMQP.

NOTE: References to AMQP in this article are to the 0.9.1 specification. With AMQP 1.0, some of the statements might not hold true as there are some drastic changes in the specification.

  1. Using AMQP Message Brokers

AMQP (Advanced Message Queuing Protocol) is a standard binary wire-level protocol that enables conforming client applications to communicate with conforming messaging middleware brokers. AMQP allows cross-platform services/systems, whether in different enterprises or within one enterprise, to easily exchange messages with each other regardless of the message broker vendor and platform. The AMQP model has the notion of 3 entities: Exchanges, Bindings and Queues.

The AMQP Model at a high level works like this: messages are published to exchanges, which are often compared to post offices or mailboxes. Exchanges then distribute message copies to queues using rules called bindings. Then AMQP brokers either deliver messages to consumers subscribed to queues, or consumers fetch/pull messages from queues on demand. There are various brokers that have implemented the AMQP protocol, like RabbitMQ, StormMQ, Apache Qpid, Apache ActiveMQ, Apache Apollo etc. We will specifically evaluate RabbitMQ in this section.
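The exchange, binding and queue relationship can be sketched with a toy in-memory model. This is an illustration only: `Exchange` and `topic_matches` are hypothetical names, queues are plain Python lists, and no broker or AMQP client library is involved. The topic matching follows the AMQP 0.9.1 convention in which `*` stands for exactly one word and `#` for zero or more words.

```python
def topic_matches(pattern, routing_key):
    """Match a topic pattern: '*' is exactly one word, '#' is zero or more words."""
    def match(p, k):
        if not p:
            return not k
        if p[0] == "#":
            # '#' absorbs nothing, or absorbs one word and is tried again.
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return (p[0] == "*" or p[0] == k[0]) and match(p[1:], k[1:])
    return match(pattern.split("."), routing_key.split("."))


class Exchange:
    def __init__(self, kind):
        self.kind = kind     # "direct", "fanout" or "topic"
        self.bindings = []   # (binding_key, queue) pairs

    def bind(self, queue, binding_key=""):
        self.bindings.append((binding_key, queue))

    def publish(self, routing_key, message):
        delivered = []  # a queue gets one copy even if several bindings match
        for binding_key, queue in self.bindings:
            matches = (
                self.kind == "fanout"
                or (self.kind == "direct" and binding_key == routing_key)
                or (self.kind == "topic" and topic_matches(binding_key, routing_key))
            )
            if matches and not any(queue is q for q in delivered):
                queue.append(message)
                delivered.append(queue)


orders, audit = [], []
exchange = Exchange("topic")
exchange.bind(orders, "order.*")
exchange.bind(audit, "#")
exchange.publish("order.created", "o-1")   # goes to both queues
exchange.publish("user.signup.eu", "u-1")  # goes to the audit queue only
```

The producer only names an exchange and a routing key; which queues receive a copy is decided entirely by the bindings.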

ServiceA publishes messages to the exchange which routes the message to ServiceB’s AMQP queue. In the push model, the broker delivers the messages in the queue to the subscribing consumers (consumer subscribes using basic.consume amqp method) in a round-robin fashion as shown in Figure6.

Figure6: AMQP Push Messaging Architecture


In the pull model, each consumer fetches messages using amqp basic.get method (requests to basic.get are synchronized by the broker) as shown in Figure7.

Figure7: AMQP Pull Messaging Architecture


The broker maintains information about the state of the queue and messages in Erlang’s distributed database Mnesia.

Observations:

  1. Consumers have to establish a connection to the broker and all communication happens on a channel over AMQP. Each connection can be multiplexed between multiple channels.
  2. A consumer can send a message to the same exchange to notify the producer of status if required, as shown in Figure8. In the case of an External Service, the external service can notify the producer through a callback URL.
Figure8: AMQP Producer Notification Model


Evaluation of Solution

Durability: Durability can be configured at exchange/queue/message level. Messages will be persisted until they are acknowledged (either auto or explicit acknowledgement).

Point-Point vs Publish-Subscribe:  Supports Point-Point model as well as publish-subscribe through different exchange types (direct, fanout, topic etc).

Delivery Policies: Supports different message delivery policies

  1. At-most once Delivery: When using auto-acknowledgement, messages are delivered at most once, as a message is deleted as soon as it is delivered to the consumer.
  2. At-least once Delivery: When using explicit acknowledgement, messages may be delivered more than once if the broker does not receive an acknowledgement from the consumer, either due to network failure or a consumer/broker crash. In such scenarios, messages will be re-queued, resulting in delivery again with the redelivered flag set to true. Consumers can also reject or nack messages, which will result in re-queuing of messages for re-delivery.
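The difference between the two acknowledgement modes can be sketched with a toy queue. `ToyBrokerQueue` is a hypothetical in-memory model, not a RabbitMQ client; it only tracks which deliveries are still unacknowledged and re-queues them when the consumer is lost.

```python
from collections import deque


class ToyBrokerQueue:
    def __init__(self, messages):
        self.ready = deque((m, False) for m in messages)  # (body, redelivered)
        self.unacked = {}                                 # delivery tag -> body
        self._next_tag = 1

    def deliver(self, auto_ack):
        body, redelivered = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        if not auto_ack:
            self.unacked[tag] = body  # keep it until the consumer acknowledges
        return tag, body, redelivered

    def ack(self, tag):
        del self.unacked[tag]

    def consumer_lost(self):
        """Re-queue everything that was delivered but never acknowledged."""
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft((self.unacked.pop(tag), True))


# Auto-acknowledgement: the consumer crashes before processing, the message is gone.
auto = ToyBrokerQueue(["m1"])
auto.deliver(auto_ack=True)
auto.consumer_lost()
message_lost = len(auto.ready) == 0

# Explicit acknowledgement: the same crash leads to a re-delivery.
manual = ToyBrokerQueue(["m1"])
manual.deliver(auto_ack=False)
manual.consumer_lost()
tag, body, redelivered = manual.deliver(auto_ack=False)
manual.ack(tag)
```

In the second run the message comes back flagged as redelivered, so the consumer can tell that it may already have seen it.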

Message Acknowledgements: Supports message acknowledgements between broker->producer as well as consumer->broker.

Purging Policies: Messages are removed from the queue once they are acknowledged. Messages are stored in memory (and also on disk for persistent messages), so retaining them for long periods of time would exhaust memory, resulting in performance issues. Rejected messages and negatively acknowledged messages can be requeued or forwarded to a dead letter exchange, which routes them to an appropriate queue for handling such messages.

Message Size/Format: Supports binary messages, and message size is limited by the amount of memory/disk space on the broker. When a queue gets too large and exhausts memory, messages published to the queue will be written to disk. Even though there is no limitation on message size, it’s not ideal to have messages of very large size.

Message filtering/routing Policies: Supports a variety of filtering/routing techniques through the different types of exchanges (direct, fanout, topic, headers etc). None of the other solutions evaluated in this article offers such a wide range of routing capabilities. Figure9 demonstrates a sample model where a message is routed to two different queues from the exchange. This is the simplest of bindings; AMQP supports more complex bindings depending on the requirements of applications.

Figure9: AMQP Multiple Consumer Routing Model


Message Ordering: Messages are delivered in FIFO order but message consumption may not be in FIFO order due to multiple consumers; unless each queue is associated with an exclusive consumer.

Batching Policies: This applies to both the push and pull models. Messages can be pushed/fetched in batches by setting a QoS prefetch count at either the channel level or the consumer level. This value should be optimized based on the average round trip time (time taken for the broker to send a message and receive an ack) and the processing time of the message on the consumer, so that the consumer can consume messages at the maximum possible rate.
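One common starting heuristic, which is an assumption here and not something this article prescribes, is to buffer enough messages to cover one round trip: roughly the round trip time divided by the per-message processing time. `suggested_prefetch` is a hypothetical helper, and the result is only a first guess to be tuned by measurement.

```python
import math


def suggested_prefetch(round_trip_ms, processing_ms):
    """Messages to keep buffered so the consumer stays busy during one round trip."""
    if round_trip_ms < 0 or processing_ms <= 0:
        raise ValueError("round_trip_ms must be >= 0 and processing_ms must be > 0")
    return max(1, math.ceil(round_trip_ms / processing_ms))


fast_consumer = suggested_prefetch(round_trip_ms=100, processing_ms=4)  # 25
slow_consumer = suggested_prefetch(round_trip_ms=1, processing_ms=50)   # 1
```

A fast consumer behind a slow network benefits from a large prefetch, while a slow consumer gains nothing from buffering many messages and would only hold them back from other consumers.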

Scalability: Highly scalable in terms of throughput (a million messages/sec in some cases; varies with message size), with latency in the order of tens or hundreds of milliseconds. The number of queues that can be supported is also in the order of millions in a clustered deployment, limited only by the number of concurrent socket connections that the broker can support.

Availability: Supports high availability by replicating entities (Exchanges and Bindings) to all brokers in the cluster to recover from unexpected failures. Queues have to be mirrored to a subset of brokers to make them highly available. Chooses Availability over Consistency in the event of a network partition (AP model).

Operational and Financial Cost: Involves operational and financial cost to manage servers and monitor deployments and health of the system.

  1. Using Kafka as Message Broker

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, with a unique design. In comparison to most messaging systems Kafka has better throughput, built-in partitioning, replication, and fault-tolerance which makes it a good solution for large scale message processing applications.

ServiceA publishes the message to a topic using the Kafka producer client library, which balances the messages across the available partitions using a Partitioner. The broker to which the producer connects takes care of sending the message to the broker that is the leader of that partition, using the partition owner information in ZooKeeper. Instances of ServiceB use Kafka’s High-level consumer library (which implicitly handles broker leader changes, manages offset information in ZooKeeper and figures out partition owner information) to consume messages from partitions in streams, as shown in Figure10; each stream may be mapped to a few partitions depending on how the consumer chooses to create the message streams.


Figure10: Messaging Model with Kafka
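The Partitioner step can be sketched as a hash of the message key modulo the number of partitions. This is a simplified stand-in: `choose_partition` is a hypothetical function and CRC32 is used purely for illustration, so it will not reproduce the partition numbers that Kafka's own partitioner would pick.

```python
import zlib


def choose_partition(key, num_partitions):
    """Map a message key to a partition; equal keys always map to the same one."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    return zlib.crc32(key.encode("utf-8")) % num_partitions


events = [("user-1", "login"), ("user-2", "login"), ("user-1", "logout")]
placement = [(key, choose_partition(key, 10)) for key, _event in events]
```

Both `user-1` events land in the same partition, which is what allows per-key ordering to be preserved even though the topic as a whole is spread across partitions.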

For example, if there are 10 partitions for a topic and 3 consumer instances (C1,C2,C3 started in that order) all belonging to the same Consumer Group, we can have different consumption models that allow read parallelism as below

  1. Each consumer uses a single stream. In this model, when C1 starts, all 10 partitions of the topic are mapped to the same stream and C1 starts consuming from that stream. When C2 starts, Kafka rebalances the partitions between the two streams. So, each stream will be assigned 5 partitions (depending on the rebalance algorithm, it might also be 4 vs 6) and each consumer consumes from its stream. Similarly, when C3 starts, the partitions are again rebalanced between the 3 streams. Note that in this model, when consuming from a stream assigned to more than one partition, messages from different partitions are interleaved, so ordering holds only within a partition.
  2. Each consumer uses more than one stream (say C1 uses 3, C2 uses 3 and C3 uses 4). In this model, when C1 starts, all the 10 partitions are assigned to the 3 streams and C1 can consume from the 3 streams concurrently using multiple threads. When C2 starts, the partitions are rebalanced between the 6 streams and similarly, when C3 starts, the partitions are rebalanced between the 10 streams. Each consumer can consume concurrently from multiple streams. Note that the number of streams and the number of partitions here are equal. If the number of streams exceeds the number of partitions, some streams will not get any messages as they will not be assigned any partitions.

Consumers can send a message to another topic to notify the producer of status if required, as shown in Figure11 below.
Figure11: Kafka Producer Notification Model
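The partition-to-stream rebalancing in the two consumption models above can be sketched as a contiguous split of the partitions across the sorted stream names. `assign_partitions` is a hypothetical function and the stream names are made up; the exact split produced by Kafka's rebalance algorithm may differ.

```python
def assign_partitions(num_partitions, stream_ids):
    """Split partitions 0..n-1 into contiguous ranges, one range per stream."""
    streams = sorted(stream_ids)
    base, extra = divmod(num_partitions, len(streams))
    assignment, start = {}, 0
    for index, stream in enumerate(streams):
        count = base + (1 if index < extra else 0)  # the first `extra` streams get one more
        assignment[stream] = list(range(start, start + count))
        start += count
    return assignment


only_c1 = assign_partitions(10, ["C1-0"])                    # C1 owns all 10
c1_and_c2 = assign_partitions(10, ["C1-0", "C2-0"])          # 5 and 5
all_three = assign_partitions(10, ["C1-0", "C2-0", "C3-0"])  # 4, 3 and 3
too_many = assign_partitions(10, [f"S{i:02d}" for i in range(12)])
idle_streams = [s for s, parts in too_many.items() if not parts]  # 2 streams get nothing
```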


Evaluation of Solution:

Durability: Messages are always persisted to disk and the retention time can be configured. Note that in order to improve performance, Kafka buffers messages up to a certain limit or for a certain time before flushing the page cache to disk. In the event of a broker crash during this period, messages may not be persisted to disk.

Point-Point vs Publish-Subscribe:  Supports Point-Point model (many consumers per group) as well as publish-subscribe (one consumer per group) through a single abstraction Consumer Group.

Delivery Policies: Supports different message delivery policies

  1. At-most once Delivery: This can happen when a consumer receives a message and updates its offset (in ZooKeeper) before processing the message, and the consumer crashes before processing the message.
  2. At-least once Delivery: This can happen when a consumer receives a message, processes the message but crashes before updating its offset. As a result the same messages are consumed by a different consumer belonging to the same consumer group.
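Both cases come down to the order in which the consumer commits its offset and processes the message. The sketch below simulates a crash at one offset under each ordering; `consume` and `run` are hypothetical functions working on an in-memory list, with no Kafka client involved.

```python
def consume(log, committed_offset, commit_first, crash_at=None):
    """Process log[committed_offset:], optionally crashing while handling one offset.

    Returns (processed_messages, new_committed_offset).
    """
    processed = []
    offset = committed_offset
    while offset < len(log):
        if commit_first:
            committed_offset = offset + 1   # commit, then process
            if offset == crash_at:
                return processed, committed_offset
            processed.append(log[offset])
        else:
            processed.append(log[offset])   # process, then commit
            if offset == crash_at:
                return processed, committed_offset
            committed_offset = offset + 1
        offset += 1
    return processed, committed_offset


def run(commit_first):
    log = ["a", "b", "c"]
    first, committed = consume(log, 0, commit_first, crash_at=1)
    # After the crash, another consumer in the group resumes from the committed offset.
    second, _ = consume(log, committed, commit_first)
    return first + second


at_most_once = run(commit_first=True)    # "b" is never processed
at_least_once = run(commit_first=False)  # "b" is processed twice
```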

Message Acknowledgements: Message acknowledgements between broker->producer are supported by setting a configuration property. There are no acknowledgements between broker->consumer. The guarantee that Kafka offers is that all messages before a given offset for a partition will have already been consumed.

Purging Policies: Messages are written to log files on disk and the retention period for the log files can be configured using a property on the broker.

Message Size/Format: Supports binary/text messages. Message size can be limited by configuring a property on the broker, which caps the size of messages that a producer can send. On the consumer side, another property controls the number of bytes of messages to attempt to fetch in one request to the broker. These two properties have to be kept in sync so that a producer cannot send a message too large for the consumer to fetch. Even though it’s possible to set the message size to a very high value, it’s not ideal to send and receive large messages. Most message broker solutions are not designed specifically to handle large messages.

Message filtering/routing Policies: Does not support filtering/routing policies. Filtering has to be done in the MessageHandler component in the consuming Service. If a message has to be routed to multiple consuming services, the producer has to explicitly send it to the corresponding topics for those services as shown below.

Figure12: Kafka Multiple Consumer Routing Model


Message Ordering: Messages are delivered in FIFO order within the partition; however the overall order across the partitions is not FIFO. If there is only one partition per topic, then Kafka guarantees FIFO order.

Batching Policies: Messages can be batched by the producer (until a certain threshold or until a certain interval) while sending messages to the broker through configuration properties. Kafka has an AsyncProducer that offers the capability not only to do the sends on a separate thread, but to batch multiple messages together into a single send. Both characteristics are generally desirable: isolating network I/O from the threads doing computation, and reducing the number of network messages into a smaller number of larger sends. Consumers can also fetch messages in batches, configured by a property.

Scalability: Highly scalable in terms of throughput (a million writes/sec, in the order of hundreds of MB/sec depending on message size), with latency in the order of tens or hundreds of milliseconds. The number of topics that can be supported is in the order of a few thousand. This limit arises because the configuration information of brokers, topic-partitions, consumer offsets and partition owners is stored in ZooKeeper znodes; since ZooKeeper is an in-memory, non-sharded data store, its memory will eventually be exhausted.

Another possible limitation on Kafka is the number of open file handles that the OS on the broker supports. On each broker, there is an open file descriptor per partition for the segment file (since each Kafka partition is maintained as a directory with several segment files) and one for each socket connection (multiple producers and one consumer for each partition on the broker).

Availability: Supports high availability by replicating partitions to other brokers in the cluster based on a replication factor, rebalancing the partitions to other brokers in case of broker crashes, and maintaining ISRs (in-sync replicas) for each partition. Chooses Availability over Consistency in the event of a network partition (AP model) in the latest stable release, with future releases supporting a configuration for choosing either consistency over availability or availability over consistency.

Operational and Financial Cost: Involves operational and financial cost to manage servers and monitor deployments and health of the system.

6.  Using ZeroMQ Library

To be updated.

I have covered only a few message queue solutions in this article. There are a lot of other solutions like

1.  IronMQ

2. Kestrel

3. EagleMQ

4. Beanstalkd

5. RestMQ

6. Resque

7. Sidekiq

8. NSQ

9. Celery etc


Service Discovery in Service Oriented Architectures

In Service Oriented Architectures/Micro Service Architectures, an application is built as a number of RESTful services, each of which does one thing well. Since that one thing could be compute-intensive or intensive in some other resource, it makes sense to scale each service independently. The end user is interested only in the end result, which comes from all of these services working together. So each service ends up calling one or many other services.

As a result, in SOA/distributed systems, services need to find each other. For example, a web service might need to find a caching service or another mid-tier component service. A Service Discovery system should provide a mechanism for
• Service Registration
• Service Discovery
• Handling Fail over of service instances
• Load balancing across multiple instances of a Service
• Handling issues arising due to unreliable network.

Other aspects to consider when choosing or implementing a service registration/discovery solution
• Integrating service registration and discovery into the application or using a sidekick process
• Application Software stack compatibility with service discovery solution
• Handling Failure/outage of service discovery solution itself

1. Using a DNS

The simplest solution to registration and discovery is to just put all of your backends behind a single DNS name. To address a service, you contact it by DNS name and the request should get to a random backend.
Details of how to use DNS for service registration and discovery can be found in the links below.

On our own infrastructure, we can use dynamic DNS backends like BIND-DLZ for registration. In the cloud, with hosted DNS like Route53, simple API calls suffice. In AWS, if we use round-robin CNAME records, the same records would work from both inside and outside AWS.
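From the client's side, DNS-based discovery amounts to resolving the service name and picking one of the returned addresses. The sketch below uses the standard library's `socket.getaddrinfo`; the function names are hypothetical, and the resolver is injectable so the logic can be exercised without a real DNS record.

```python
import random
import socket


def resolve_backends(service_name, port, resolver=socket.getaddrinfo):
    """Return the distinct addresses currently published behind a DNS name."""
    records = resolver(service_name, port, type=socket.SOCK_STREAM)
    return sorted({sockaddr[0] for _family, _type, _proto, _canon, sockaddr in records})


def pick_backend(service_name, port, resolver=socket.getaddrinfo, rng=random):
    """Choose one backend at random, which is all that plain DNS discovery offers."""
    return rng.choice(resolve_backends(service_name, port, resolver))
```

Calling `pick_backend("cache.internal", 6379)` (a made-up name) would return one of the addresses registered for that name. Nothing here knows whether the chosen backend is healthy or overloaded, which is the root of the disadvantages listed below.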

Advantages

1.   Easy to implement and has been there for a long time.

Disadvantages

1.   Service Instances have to poll for all changes; there’s no way to push state. A monitoring component has to be implemented which detects server failures or additions and propagates the state changes to the consumers.

2.   DNS suffers from propagation delays; even after a server failure is detected and a de-registration command is issued to DNS, there will be at least a few seconds before this information gets to the consumers. Also, due to the various layers of caching in the DNS infrastructure, the exact propagation delay is often non-deterministic.

3.  If a service is identified just by name, there’s no way to determine which boxes get traffic. We will get the equivalent of random routing, with loads chaotically piling up behind some backends while others are left idle.

4.  Most services use a front-end reverse proxy like nginx (to handle more connections, load balancing, SSL offloading, static file caching etc). These proxies cache the DNS configuration unless you use some configuration file hack, which causes problems with detecting state changes. The same is true of HAProxy.

Note: By using a tool like Serf to handle the dynamic node additions/failures, the DNS solution can be made more robust. Serf is a tool for cluster membership, failure detection, and orchestration that is decentralized, fault-tolerant and highly available. Some of the uses of Serf are below

1. Maintain the list of web servers for a load balancer and notify that load balancer whenever a node comes online or goes offline.

2. Detect failed nodes within seconds, notify the rest of the cluster, and execute handler scripts allowing you to handle these events.

3. Broadcast custom events and queries to a cluster. These can be used to trigger deploys, propagate configuration, etc.

2. Using In-App Service Discovery through a persistent store (database or filesystem) to store the configuration

The In-App solution for service discovery can be seen in Figure1. The configuration of all the services is stored in a config DB which is loaded into custom data structures in memory by all services at start-up time.
This configuration information is used by custom software load balancers (developed in-house) which use round-robin load balancing to distribute load across dependent mid-tier services.
With this approach, servers that are added, removed or replaced dynamically have to be detected, and the config has to be reloaded on all servers to avoid service interruption. A pinger mechanism needs to be embedded into each application to check the health of the services and trigger a reload whenever a change is detected.
Currently the pinger logic is separate and not part of the application, so the config reload has to be triggered manually.
Figure1 : In-App service discovery and load balancing
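The in-app approach can be sketched as a round-robin balancer that is rebuilt from the config store whenever the pinger reports a change. `RoundRobinBalancer` is a hypothetical class; the config loader and the health check are passed in as plain callables, and the addresses are made up.

```python
import itertools


class RoundRobinBalancer:
    def __init__(self, load_config, is_healthy):
        self._load_config = load_config  # returns "host:port" strings from the config store
        self._is_healthy = is_healthy    # the pinger: True if the instance responds
        self.reload()

    def reload(self):
        """Re-read the config and keep only instances that pass the health check."""
        self.instances = [i for i in self._load_config() if self._is_healthy(i)]
        self._cycle = itertools.cycle(self.instances)

    def next_instance(self):
        if not self.instances:
            raise RuntimeError("no healthy instances")
        return next(self._cycle)


config = ["10.0.0.1:8080", "10.0.0.2:8080", "10.0.0.3:8080"]
down = set()
balancer = RoundRobinBalancer(lambda: list(config), lambda inst: inst not in down)
first_round = [balancer.next_instance() for _ in range(4)]
down.add("10.0.0.2:8080")  # an instance fails
balancer.reload()          # without this call, traffic would still go to the failed instance
second_round = [balancer.next_instance() for _ in range(3)]
```

The explicit `reload()` call is the weak point: every service instance has to run it on every topology change.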

Advantages
1. Easy to implement.

Disadvantages
1. Things get more complicated as we start deploying more services.
2. With a live system, service locations can change quite frequently due to auto or manual scaling which needs a reload of the config in all servers.
3. Config has to be reloaded on new deployments of services, as well as hosts failing or being added/replaced.
4. The health check of services is separate from the application, so the config reload has to be triggered manually.

3. Using a distributed consistent datastore (CP model in CAP theorem) – ZOOKEEPER

Apache ZooKeeper is an open source distributed coordination service that’s popular for use cases like dynamic configuration management and distributed locking. It can also be used for service discovery.

Service Registration is implemented with ephemeral nodes under a namespace. Ephemeral nodes only exist while the client is connected, so typically a backend service registers itself, after startup, with its location information. If it fails or disconnects, the node disappears from the tree.

Service Discovery is implemented by listing and watching the namespace for the service. Clients receive all the currently registered services as well as notifications when a service becomes unavailable or new ones register. Clients also need to handle any load balancing or failover themselves.
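The registration and discovery flow can be imitated with a small in-memory registry. `ToyRegistry` is a hypothetical class and not the ZooKeeper API: sessions are plain ids, and the watch is persistent for brevity, whereas a real ZooKeeper watch fires once and has to be set again.

```python
class ToyRegistry:
    """In-memory imitation of ephemeral nodes and watches (not the ZooKeeper API)."""

    def __init__(self):
        self._nodes = {}     # namespace -> {session id: location}
        self._watchers = {}  # namespace -> list of callbacks

    def register(self, namespace, session_id, location):
        self._nodes.setdefault(namespace, {})[session_id] = location
        self._notify(namespace)

    def session_closed(self, session_id):
        """Drop every node owned by the session, as happens with ephemeral nodes."""
        for namespace, nodes in self._nodes.items():
            if nodes.pop(session_id, None) is not None:
                self._notify(namespace)

    def watch(self, namespace, callback):
        self._watchers.setdefault(namespace, []).append(callback)
        callback(self.instances(namespace))  # deliver the current view immediately

    def instances(self, namespace):
        return sorted(self._nodes.get(namespace, {}).values())

    def _notify(self, namespace):
        for callback in self._watchers.get(namespace, []):
            callback(self.instances(namespace))


registry = ToyRegistry()
views = []  # every view of the namespace that the client has been shown
registry.watch("/services/cache", views.append)
registry.register("/services/cache", "session-1", "10.0.0.1:6379")
registry.register("/services/cache", "session-2", "10.0.0.2:6379")
registry.session_closed("session-1")  # the backend dies, its node disappears
```

The client ends up with an up-to-date list of instances, but choosing among them and failing over is still its own job.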

The solution to dynamically handle configuration changes can be seen in Figure2. Even though it helps solve the problem, there are a few problems which make the solution inefficient, as shown in Figure3.

Figure 2: Dynamic service discovery


The diagram on the left represents how ZooKeeper is being used for dynamic service discovery. Applications consume service information and dynamic configuration by connecting to ZooKeeper directly. They cache data in memory but otherwise rely on ZooKeeper as the source of truth for the data. ZooKeeper outages directly impact the applications.
The diagram on the right side shows how we can tweak the design to take advantage of the conveniences that ZooKeeper provides while tolerating its unavailability for various reasons. The application is completely isolated from ZooKeeper. Instead, a daemon process running on each machine connects to ZooKeeper, establishes watches on the data it is configured to monitor, and whenever the data changes, downloads it into a file on local disk at a well-known location. The application itself only consumes the data from the local file and reloads when it changes. It doesn’t need to care that ZooKeeper is involved in propagating updates. With the new design, applications are decoupled from ZooKeeper and the solution becomes more robust and fault tolerant.

Figure 3 : Dynamic service discovery improvement
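The local-file handoff between the daemon and the application can be sketched as follows. Both names are hypothetical and JSON is an assumed file format; the ZooKeeper side is left out entirely, so `publish_snapshot` stands for whatever the daemon does after a watch fires.

```python
import json
import os
import tempfile


def publish_snapshot(path, data):
    """Daemon side: write atomically so that readers never see a partial file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as tmp:
        json.dump(data, tmp)
    os.replace(tmp_path, path)  # atomic rename over the old snapshot


class FileBackedConfig:
    """Application side: re-read the file only when it has been replaced."""

    def __init__(self, path):
        self._path = path
        self._signature = None
        self.data = None

    def refresh(self):
        """Reload if the file changed; return True when new data was loaded."""
        stat = os.stat(self._path)
        signature = (stat.st_mtime_ns, stat.st_size, stat.st_ino)
        if signature == self._signature:
            return False
        with open(self._path) as handle:
            self.data = json.load(handle)
        self._signature = signature
        return True
```

An application would call `refresh()` periodically or before each lookup. If ZooKeeper or the daemon is down, the last snapshot stays on disk and the application keeps working with slightly stale data.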

Advantages
1. Can handle dynamic configuration changes.

Disadvantages
1. Can be tricky and complicated to implement the solution correctly.
2. The zookeeper cluster becomes a SPoF. The solution has to handle outages/failures of the zookeeper cluster for various reasons like protocol bugs, network partitions, cluster unavailability due to too many connections from clients etc
3. Recovery might sometimes take a few hours due to a combination of the thundering herd problem and having to restart from a clean slate and recover from backup.
4. Works well if a unified software stack is used across services. Otherwise we might have to implement the solution in different languages
5. Not possible to use this solution if you would like to run apps which you did not write yourself but still have them consume your infrastructure: Nginx, a CI server, rsyslog or other systems software.

4. Using curator zookeeper recipes
The ZooKeeper API can be difficult to use properly. We can use existing recipes like the Curator Service Discovery Extension to simplify the solution. We would still have to deal with ZooKeeper outages under certain circumstances; however, by falling back to a persistent datastore we will be able to handle outages of the ZooKeeper cluster efficiently.

5. Using open source software specifically tailored for service registration and discovery
Airbnb’s SmartStack:
SmartStack is a combination of two custom tools, Nerve and Synapse, that leverage HAProxy and ZooKeeper to handle service registration and discovery. Since SmartStack relies on ZooKeeper, the ZooKeeper problems mentioned earlier also affect this setup.

Netflix’s Eureka:
Eureka is Netflix’s middle-tier load balancing and discovery service. The Eureka server is the registry for services. The servers replicate their state to each other through an asynchronous model, which means each instance may have a slightly different picture of all the services at any given time.
Service registration is handled by the client component. Services embed the client in their application code. At runtime, the client registers the service and periodically sends heartbeats to renew its leases.
Service discovery is handled by the smart client as well. It retrieves the current registrations from the server and caches them locally. The client periodically refreshes its state and also handles load balancing and failovers.
Eureka was designed to be very resilient during failures. It favors availability over strong consistency (AP model) and can operate under a number of different failure modes.
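
The register-and-heartbeat idea can be illustrated with a toy in-memory registry. This is only a sketch of lease renewal in general; it is not Eureka’s API, and the class name, method names and the 30 second lease are assumptions made for the example.

```python
import time


class LeaseRegistry:
    """Toy registry (hypothetical): instances must heartbeat to stay listed."""

    def __init__(self, lease_seconds=30.0, clock=time.monotonic):
        self.lease_seconds = lease_seconds
        self.clock = clock
        self._expiry = {}  # (service, instance) -> lease expiry time

    def register(self, service, instance):
        self._expiry[(service, instance)] = self.clock() + self.lease_seconds

    def heartbeat(self, service, instance):
        key = (service, instance)
        if key not in self._expiry or self._expiry[key] <= self.clock():
            return False  # unknown or lapsed lease; the client should re-register
        self._expiry[key] = self.clock() + self.lease_seconds
        return True

    def instances(self, service):
        # Only instances whose lease has not expired are returned to callers.
        now = self.clock()
        return sorted(i for (s, i), exp in self._expiry.items()
                      if s == service and exp > now)
```

An instance that stops sending heartbeats drops out of instances() once its lease runs out, without anyone having to deregister it explicitly.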
Figure 4 : Using Eureka to handle service discovery and load balancing

Advantages
1. Much simpler and less complicated to implement than zookeeper.
2. Load balancing, fault tolerance and service discovery are handled by the eureka client implicitly.

Consul.io

Consul has multiple components, but as a whole, it is a tool for discovering and configuring services in your infrastructure. It provides several key features:

  • Service Discovery: Clients of Consul can provide a service, such as api or mysql, and other clients can use Consul to discover providers of a given service. Using either DNS or HTTP, applications can easily find the services they depend upon.
  • Health Checking: Consul clients can provide any number of health checks, either associated with a given service (“is the webserver returning 200 OK”), or with the local node (“is memory utilization below 90%”). This information can be used by an operator to monitor cluster health, and it is used by the service discovery components to route traffic away from unhealthy hosts.
  • Key/Value Store: Applications can make use of Consul’s hierarchical key/value store for any number of purposes including: dynamic configuration, feature flagging, coordination, leader election, etc. The simple HTTP API makes it easy to use.
  • Multi Datacenter: Consul supports multiple datacenters out of the box. This means users of Consul do not have to worry about building additional layers of abstraction to grow to multiple regions.

Consul is designed to be friendly to both the DevOps community and application developers, making it perfect for modern, elastic infrastructures.

Etcd

Etcd is a highly available key-value store for shared configuration and service discovery. Etcd was inspired by ZooKeeper and Doozer. It’s written in Go, uses Raft for consensus and has an HTTP+JSON based API.

SkyDNS

SkyDNS is a relatively new project that is written in Go, uses RAFT for consensus and also provides a client API over HTTP and DNS. It has some similarities to Etcd and Spotify’s DNS model and actually uses the same RAFT implementation as Etcd, go-raft.

6. Using a hardware load balancer (Works if we are maintaining our own infrastructure)
A hardware/network load balancer can be used to split network load across multiple servers. The basic principle is that network traffic is sent to a shared IP, often called a virtual IP (VIP) or listening IP. This VIP is an address that is attached to the load balancer. Once the load balancer receives a request on this VIP, it needs to decide where to send it.
This decision is normally controlled by a “load balancing method/strategy”, a “server health check” or, in the case of a next-generation device, an additional rule set.
The request is then sent to the appropriate server.
There are a lot of feature-rich hardware load balancers, such as jetNEXUS, F5 and NetScaler.
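
The decision step can be sketched in software as a round-robin strategy combined with a health check. This is a simplified illustration, not how any particular hardware device works; the class name and the is_healthy callback are assumptions made for the example.

```python
import itertools


class RoundRobinBalancer:
    """Pick backends in turn, skipping those that fail the health check."""

    def __init__(self, servers, is_healthy):
        self.servers = list(servers)
        self.is_healthy = is_healthy  # callable: server -> bool
        self._cycle = itertools.cycle(self.servers)

    def pick(self):
        # Try each server at most once per call, skipping unhealthy ones.
        for _ in range(len(self.servers)):
            server = next(self._cycle)
            if self.is_healthy(server):
                return server
        raise RuntimeError("no healthy backend available")
```

With three backends of which the second is unhealthy, successive calls to pick() alternate between the first and the third.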
Advantages
1. SSL acceleration and offload can be delegated to the hardware LB, increasing web server performance.
2. No effort is required to develop and maintain LB software.
Disadvantages
1. Cost of hardware and support might become a factor.
2. Cannot use a hardware LB if we are deploying our services in the cloud.


Common HTTP Status Codes

HTTP Status Codes
HTTP status codes can be used to indicate a successful or an error response from APIs.

2xx class (Success)
This class of status codes indicates the action requested by the client was received, understood, accepted and processed successfully.

3xx class (Redirection)
This class of status code indicates the client must take additional action to complete the request. Many of these status codes are used in URL redirection.

4xx class (Client error)
The 4xx class of status code is intended for cases in which the client seems to have errored. Except when responding to a HEAD request, the server should include an entity containing an explanation of the error situation, and whether it is a temporary or permanent condition. These status codes are applicable to any request method.

5xx class (Server error)
The server failed to fulfil an apparently valid request.
Response status codes beginning with the digit “5” indicate cases in which the server is aware that it has encountered an error or is otherwise incapable of performing the request. Except when responding to a HEAD request, the server should include an entity containing an explanation of the error situation, and indicate whether it is a temporary or permanent condition.
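
Since the class is determined by the first digit, a client can branch on it without knowing every individual code. A small sketch, where the function names are made up for the example and the reason phrases come from Python’s standard http module:

```python
from http import HTTPStatus

# Only the four classes described above are named; anything else is "other".
STATUS_CLASSES = {2: "success", 3: "redirection", 4: "client error", 5: "server error"}


def status_class(code):
    """Map a status code to its class using the leading digit."""
    return STATUS_CLASSES.get(code // 100, "other")


def describe(code):
    """Return the code, its standard reason phrase and its class."""
    return f"{code} {HTTPStatus(code).phrase} ({status_class(code)})"
```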

Code | Message | Description
200 | OK | Response to a successful GET, PUT, PATCH or DELETE. Can also be used for a POST that doesn’t result in a creation.
201 | Created | Response to a POST that results in a creation. Should be combined with a Location header pointing to the location of the new resource.
204 | No Content | Response to a successful request that won’t be returning a body (like a DELETE request).
304 | Not Modified | Used when HTTP caching headers are in play.
400 | Bad Request | The request is malformed, such as if the body does not parse. Any application related errors can be reported here with proper error code and description.
401 | Unauthorized | When no or invalid authentication details are provided. Also useful to trigger an auth popup if the API is used from a browser.
403 | Forbidden | When authentication succeeded but the authenticated user doesn’t have access to the resource.
404 | Not Found | When a non-existent resource is requested.
405 | Method Not Allowed | When an HTTP method is requested that the target resource does not support.
409 | Conflict | Indicates that the request could not be processed because of a conflict in the request, such as an edit conflict in the case of multiple updates.
410 | Gone | Indicates that the resource at this end point is no longer available. Useful as a blanket response for old API versions.
413 | Request Entity Too Large | The request is larger than the server is willing or able to process.
414 | Request-URI Too Long | The URI provided was too long for the server to process. Often the result of too much data being encoded as a query string of a GET request, in which case it should be converted to a POST request.
415 | Unsupported Media Type | If an incorrect content type was provided as part of the request.
422 | Unprocessable Entity | Used for validation errors.
429 | Too Many Requests | When a request is rejected due to rate limiting.
500 | Internal Server Error | A generic error message, given when an unexpected condition was encountered and no more specific message is suitable.
501 | Not Implemented | The server either does not recognize the request method, or it lacks the ability to fulfil the request. Usually this implies future availability (e.g., a new feature of a web-service API).
502 | Bad Gateway | The server was acting as a gateway or proxy and received an invalid response from the upstream server.
503 | Service Unavailable | The server is currently unavailable (because it is overloaded or down for maintenance). Generally, this is a temporary state.
504 | Gateway Timeout | The server was acting as a gateway or proxy and did not receive a timely response from the upstream server.

REST API Best practices

Understanding best practices for designing RESTful APIs

The concept of REST is to separate the API structure into logical resources. These resources are manipulated using HTTP requests where the method (GET, POST, PUT, PATCH, DELETE) has specific meaning.

Why should one conform to REST standards?

First of all, REST is an architectural style, and one of its principles is to leverage the standardized behavior of the underlying protocol. If you want to implement a RESTful API over HTTP, you have to follow HTTP strictly for it to be RESTful. You’re free not to do so if you think it’s not adequate for your needs, and nobody will curse you for that, but then you’re not doing REST. You’ll have to document where and how you deviate from the standard, which creates a strong coupling between client and server implementations; the whole point of using REST is precisely to avoid that and focus on your media types. When you conform to the REST standard, everyone using your API already knows how the different HTTP methods work, and you don’t have to document or explain what the methods do for a given resource.

Keep in mind that REST is an architectural style focused on long term evolution of your API. To do it right will add more work now, but will make changes easier and less traumatic later. That doesn’t mean REST is adequate for everything and everyone. If your focus is the ease of implementation and short term usage, just use the methods as you want. You can do everything through POST if you don’t want to bother about clients choosing the right methods.

HTTP Method Properties

Safe Methods
Request methods are considered “safe” if their defined semantics are essentially read-only; i.e., the client does not request, and does not expect, any state change on the origin server as a result of applying a safe method to a target resource.

This definition of safe methods does not prevent an implementation from including behavior that is potentially harmful, that is not entirely read-only, or that causes side effects while invoking a safe method. For example, a safe request initiated by selecting an advertisement on the Web will often have the side effect of charging an advertising account.

Idempotent Methods
A request method is considered “idempotent” if the intended effect on the server of multiple identical requests with that method is the same as the effect for a single such request. Of the request methods defined by the HTTP/1.1 specification, PUT, DELETE, and safe request methods are idempotent.

Like the definition of safe, the idempotent property only applies to what has been requested by the user; a server is free to log each request separately, retain a revision control history, or implement other non-idempotent side effects for each idempotent request.

Idempotent methods are distinguished because the request can be repeated automatically if a communication failure occurs before the client is able to read the server’s response. For example, if a client sends a PUT request and the underlying connection is closed before any response is received, then the client can establish a new connection and retry the idempotent request. It knows that repeating the request will have the same intended effect, even if the original request succeeded, though the response might differ.

Cacheable Methods
Request methods can be defined as “cacheable” to indicate that responses to them are allowed to be stored for future reuse. In general, safe methods that do not depend on a current or authoritative response are defined as cacheable.

HTTP METHODS

Method | Description | Safe | Idempotent | Cacheable
GET | Transfer a current representation of the target resource | Yes | Yes | Yes
HEAD | Same as GET, but only transfer the status line and header section | Yes | Yes | Yes
POST | Perform resource-specific processing on the request payload | No | No | No (Can use cache-control headers)
PUT | Replace all current representations of the target resource with the request payload | No | Yes | No (Can use cache-control headers)
DELETE | Delete all current representations of the target resource | No | Yes | No
OPTIONS | Describe the communication options for the target resource | Yes | Yes | No
PATCH | Apply a set of changes described in the request payload to the resource identified by the Request-URI | No | No | No (Can use cache-control headers)
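
One practical use of these properties is deciding when a client may retry automatically after a communication failure. The sketch below encodes the safe and idempotent columns of the table and retries only idempotent methods; the function name send_with_retry and the default of three attempts are assumptions made for the example.

```python
METHOD_PROPERTIES = {
    # method: (safe, idempotent), as listed in the table above
    "GET": (True, True), "HEAD": (True, True), "OPTIONS": (True, True),
    "PUT": (False, True), "DELETE": (False, True),
    "POST": (False, False), "PATCH": (False, False),
}


def send_with_retry(method, send, max_attempts=3):
    """Call send() and retry on ConnectionError only for idempotent methods."""
    _, idempotent = METHOD_PROPERTIES[method.upper()]
    attempts = max_attempts if idempotent else 1
    for attempt in range(1, attempts + 1):
        try:
            return send()
        except ConnectionError:
            if attempt == attempts:
                raise  # out of attempts, or the method must not be repeated
```

A PUT whose connection drops twice is sent a third time, while a POST fails on the first dropped connection because repeating it could create a second resource.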

1. Use nouns and not verbs for resources

What can I make a resource? Resources should be nouns (not verbs) that make sense from the perspective of the API consumer. Although your internal models may map neatly to resources, it isn’t necessarily a one-to-one mapping. The key here is to not leak irrelevant implementation details out to your API consumer.

Once you have your resources defined, you need to identify what actions apply to them and how those would map to your API. RESTful principles provide strategies to handle CRUD actions using HTTP methods mapped as follows:

Resource | POST | GET | PUT | DELETE
/groups | Create a new group | Returns list of groups | Bulk update of groups if allowed (405 Method Not Allowed otherwise) | Delete all groups if allowed (405 Method Not Allowed otherwise)
/groups/{group id} | Method not allowed (405) | Returns the group with the given group id | Updates the group with the given group id | Deletes the group with the given group id
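
The mapping above can be expressed as a small routing table. The sketch below is framework-free and purely illustrative: the names ROUTES, match_template and dispatch are made up, and it assumes that bulk update and bulk delete on /groups are not allowed.

```python
ROUTES = {
    # path template: {method: action}; bulk PUT/DELETE on /groups assumed disallowed
    "/groups": {"GET": "list groups", "POST": "create group"},
    "/groups/{id}": {"GET": "get group", "PUT": "update group", "DELETE": "delete group"},
}


def match_template(path):
    """Map a concrete path to one of the known templates, or None."""
    parts = path.strip("/").split("/")
    if parts == ["groups"]:
        return "/groups"
    if len(parts) == 2 and parts[0] == "groups":
        return "/groups/{id}"
    return None


def dispatch(method, path):
    """Return (status, action) for a request."""
    template = match_template(path)
    if template is None:
        return 404, None
    action = ROUTES[template].get(method.upper())
    if action is None:
        return 405, None  # the resource exists but does not support this method
    return 200, action
```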

2. GET and Query Parameters should not alter state

Use the PUT, POST and DELETE methods instead of the GET method to alter state.
Do not use GET for state changes:

GET /users/711?activate or
GET /users/711/activate

3. Use PLURAL nouns

Do not mix up singular and plural nouns. Keep it simple and use only plural nouns for all resources.

/users instead of /user
/groups instead of /group

4. Use sub resources for relations

If a relation can only exist within another resource, use sub resources to represent the relation.

GET /groups/{group id}/members Returns a list of members in the group with the given group id
GET /groups/{group id}/members/{member id} Returns the member with the given member id in the group with the given group id

5. Handling actions that don’t fall under CRUD category

  • Use PATCH to apply the set of changes described in the patch document (request payload). For example, an activate/deactivate group action could be handled via a PATCH to the resource as follows
    PATCH  /groups/{group id}
    {
    "action": "activate|deactivate"
    }
  • Treat it like a sub-resource with RESTful principles. For example, the activate/deactivate group action can be handled via a PUT to the sub resource as follows
    PUT  /groups/{group id}/status
    {
    "status": "active|inactive"
    }
  • Sometimes you really have no way to map the action to a sensible RESTful structure. For example, a multi-resource search doesn’t really make sense to be applied to a specific resource’s endpoint. In this case, /search would make the most sense even though it isn’t a resource. This is OK – just do what’s right from the perspective of the API consumer and make sure it’s documented clearly to avoid confusion.

6.  Use HTTP headers for specifying serialization formats

Both client and server need to know which format is used for the communication. The format has to be specified in the HTTP headers.

Content-Type defines the request format.
Accept defines a list of acceptable response formats.
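
On the server side, choosing a response format from the Accept header could look roughly like this. It is a simplified sketch: it honors q values and the */* wildcard but ignores partial wildcards such as application/*, and the function names and the supported formats are assumptions made for the example.

```python
def parse_accept(header):
    """Return media types from an Accept header, most preferred first."""
    ranked = []
    for position, item in enumerate(header.split(",")):
        parts = [p.strip() for p in item.split(";")]
        if not parts[0]:
            continue
        q = 1.0  # default quality when no q parameter is given
        for param in parts[1:]:
            name, _, value = param.partition("=")
            if name.strip() == "q":
                try:
                    q = float(value)
                except ValueError:
                    q = 0.0
        if q > 0:  # q=0 means "not acceptable"
            ranked.append((-q, position, parts[0].lower()))
    return [media for _, _, media in sorted(ranked)]


def choose_format(accept_header, supported=("application/json", "application/xml")):
    """Pick the first supported media type, or None if nothing matches."""
    for media in parse_accept(accept_header):
        if media in supported:
            return media
        if media == "*/*":
            return supported[0]
    return None  # the caller would answer 406 Not Acceptable
```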

7.  Result filtering, sorting and searching

Complex result filters, sorting requirements and advanced searching (when restricted to a single type of resource) can all be easily implemented as query parameters on top of the base URL.

Filtering
Use a unique query parameter for each field that implements filtering.
GET /groups?status=active – Returns a list of active groups

Sorting:
Similar to filtering, a generic parameter sort can be used to describe sorting rules. Accommodate complex sorting requirements by letting the sort parameter take a list of comma separated fields, each with a possible unary negative to imply descending sort order. For example,
GET /groups?sort=status,-name – Returns a list of groups in ascending order of status; within the same status, groups are sorted by name in descending order

Searching:
Sometimes basic filters aren’t enough and you need the power of full text search. When full text search is used as a mechanism for retrieving resource instances of a specific type of resource, it can be exposed on the API as a query parameter on the resource’s endpoint, say q. Search queries should be passed straight to the search engine (for example, Elasticsearch) and API output should be in the same format as a normal list result.
GET /groups/?status=active&sort=-name&q=test –  Return list of all active groups sorted by name (desc) which contain ‘test’ in their names
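
The three parameters can be combined on the server roughly as shown below. This is an in-memory sketch with made-up sample data; a real API would translate the parameters into database or search engine queries, and the substring match on name only stands in for full text search.

```python
from urllib.parse import parse_qs

# Made-up sample data for the example.
GROUPS = [
    {"id": 1, "name": "test alpha", "status": "active"},
    {"id": 2, "name": "beta", "status": "active"},
    {"id": 3, "name": "test gamma", "status": "inactive"},
    {"id": 4, "name": "test zeta", "status": "active"},
]
FILTERABLE = {"status"}  # fields that may be used as filters


def query_groups(query_string, groups=GROUPS):
    params = {k: v[0] for k, v in parse_qs(query_string).items()}
    # Filtering: one query parameter per filterable field.
    result = [g for g in groups
              if all(str(g[f]) == params[f] for f in params.keys() & FILTERABLE)]
    # Searching: a substring match stands in for a real search engine.
    if "q" in params:
        result = [g for g in result if params["q"].lower() in g["name"].lower()]
    # Sorting: apply keys right to left so the first key wins (sorts are stable).
    for key in reversed([k for k in params.get("sort", "").split(",") if k]):
        result.sort(key=lambda g: g[key.lstrip("-")], reverse=key.startswith("-"))
    return result
```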

8.  Aliases for common queries

To make the API experience more pleasant for the average consumer, consider packaging up sets of conditions into easily accessible RESTful paths. For example, when querying for the most active groups, recommended groups and so on, we can have endpoints like

GET /groups/mostactive         –    Returns list of mostactive groups
Default values can be used for the sort parameters.

  • Field selection

The API consumer doesn’t always need the full representation of a resource (mobile clients for example). The ability to select and choose returned fields goes a long way in letting the API consumer minimize network traffic and speed up their own usage of the API. Use a fields query parameter that takes a comma separated list of fields to include.
GET /groups/?fields=id,name,owner,status&status=active&sort=-name
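
Applying the fields parameter to a resource is a simple projection. A minimal sketch, where the function name is made up and unknown field names are silently ignored (an API could equally reject them with a 400):

```python
def select_fields(resource, fields_param):
    """Keep only the fields named in a comma separated 'fields' parameter."""
    if not fields_param:
        return dict(resource)  # no parameter: return the full representation
    wanted = [f.strip() for f in fields_param.split(",") if f.strip()]
    return {f: resource[f] for f in wanted if f in resource}
```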

9.  Pagination

The right way to include pagination details today is using the Link header introduced by RFC 5988. An API that uses the Link header can return a set of ready-made links so the API consumer doesn’t have to construct links themselves.

GET /groups?offset=20&limit=20

The response should include pagination information through links as below

{
"start": 1,
"count": 20,
"totalCount": 100,
"totalPages": 5,
"links": [
{
"href": "https://<url>/offset=40&limit=20",
"rel": "next"
},
{
"href": "https://<url>/offset=0&limit=20",
"rel": "previous"
}
]
}
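
Generating the next and previous links from offset, limit and the total count might look like the sketch below. The function names and the example base URL are assumptions; link_header shows how the same links could be rendered as a Link header value instead of a body field.

```python
from urllib.parse import urlencode


def page_links(base_url, offset, limit, total):
    """Return {rel: url} for the neighbouring pages of an offset/limit listing."""
    links = {}
    if offset + limit < total:  # there is at least one more item after this page
        query = urlencode({"offset": offset + limit, "limit": limit})
        links["next"] = f"{base_url}?{query}"
    if offset > 0:
        query = urlencode({"offset": max(offset - limit, 0), "limit": limit})
        links["previous"] = f"{base_url}?{query}"
    return links


def link_header(links):
    """Format the links as a Link header value."""
    return ", ".join(f'<{url}>; rel="{rel}"' for rel, url in links.items())
```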

10.  Authentication

Using HTTP basic authentication
The simplest way to deal with authentication is to use HTTP basic authentication. We use a special HTTP header (Authorization) to which we add ‘username:password’ encoded in base64. Because base64 is an encoding and not encryption, it is very easy to retrieve the username and password from a basic authentication header. This authentication scheme should therefore not be used on plain HTTP, but only over SSL/TLS.
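
To make the point about base64 concrete, the sketch below builds a Basic Authorization header value and then decodes it again without needing any secret. The function names are made up for the example.

```python
import base64


def basic_auth_header(username, password):
    """Build the value of the Authorization header for HTTP basic authentication."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"


def decode_basic_auth(header_value):
    """Recover (username, password); anyone who sees the header can do this."""
    scheme, _, token = header_value.partition(" ")
    if scheme.lower() != "basic":
        raise ValueError("not a Basic Authorization header")
    username, _, password = base64.b64decode(token).decode("utf-8").partition(":")
    return username, password
```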

Using HMAC
One of the downsides of basic authentication is that we need to send the password on every request. It also does not safeguard against tampering with the headers or body by an attacker in the middle. With HMAC, the client instead builds a message from the request: in the simplest case we just concatenate the HTTP verb and the actual URL. We could add other information as well, like the current timestamp, a random number, or the MD5 of the message body, in order to prevent tampering with the body or replay attacks. Next, the client generates an HMAC of that message using its secret and passes it in a request header. The server then recomputes the digest from the request, headers and payload using the same client secret and compares the two to ensure that the client making the request is genuine.
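
A minimal sketch of this signing scheme using Python’s standard hmac module follows. The message layout (verb, URL, timestamp and body digest joined by newlines), the five minute clock skew and the function names are assumptions made for the example, and SHA-256 is used here in place of the MD5 mentioned above.

```python
import hashlib
import hmac


def sign_request(secret, method, url, timestamp, body=b""):
    """Client side: compute the HMAC the client sends in a request header."""
    body_digest = hashlib.sha256(body).hexdigest()
    message = "\n".join([method.upper(), url, str(timestamp), body_digest])
    return hmac.new(secret, message.encode("utf-8"), hashlib.sha256).hexdigest()


def verify_request(secret, method, url, timestamp, body, signature, now, max_skew=300):
    """Server side: recompute the HMAC and compare it in constant time."""
    if abs(now - timestamp) > max_skew:
        return False  # too old or too far in the future: treat as a replay
    expected = sign_request(secret, method, url, timestamp, body)
    return hmac.compare_digest(expected, signature)
```

Because the body digest and timestamp are part of the signed message, changing the payload or replaying an old request makes verification fail even though the secret itself never travels over the wire.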

OAuth
One of the downsides of HMAC is that there are no more passwords, but just plain secrets. When a user’s secret gets out, anyone can use it to access that user’s account. A common way to prevent this is to use temporary tokens. The client “asks” the server for a “token” and “secret”, and with this token and secret it is allowed to access protected resources of a user.

OAuth is a mechanism that allows you to create temporary tokens. It is a commonly used scheme for authentication and authorization.

11.  Caching

HTTP provides a built-in caching framework! All you have to do is include some additional outbound response headers and do a little validation when you receive some inbound request headers. However, it’s important to note that caching behavior may vary across browsers when using the cache control headers. There are two ways of handling caching using conditional GET requests:

Time-based Approach (Last-Modified, If-Modified-Since)
A time-based conditional request ensures that the contents are transferred only if the requested resource has changed since the browser’s copy was cached. If the cached copy is still the most up-to-date, the server returns the 304 response code.

Content-based Approach (ETag, If-None-Match)
A content-based conditional request is similar to a time-based request, except that the value is a digest of the resource’s contents (for instance, an MD5 hash). This allows the server to identify whether the cached contents of the resource differ from the most recent version.
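
The content-based approach can be sketched as a handler that derives an ETag from the response body and answers 304 when the client already holds that version. The function names are made up, headers are modelled as plain dictionaries, and SHA-256 is used as the digest instead of MD5.

```python
import hashlib


def make_etag(body):
    """Derive a quoted ETag value from the response bytes."""
    return '"' + hashlib.sha256(body).hexdigest() + '"'


def conditional_get(body, request_headers):
    """Return (status, headers, body) for a GET of a resource with these bytes."""
    etag = make_etag(body)
    headers = {"ETag": etag}
    candidates = [t.strip() for t in request_headers.get("If-None-Match", "").split(",")]
    if etag in candidates or "*" in candidates:
        return 304, headers, b""  # the client's cached copy is still current
    return 200, headers, body
```

The first request returns 200 with an ETag; when the client repeats the request with that value in If-None-Match and the resource is unchanged, the server skips the body and returns 304.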

12.  Response Handling (Success and errors)

Successful Response
The response codes returned by APIs should be standard HTTP status codes, with additional information in the response headers or payload wherever necessary.

Error Response
An API should provide a useful error message in a known consumable format when a request fails. The representation of an error should be no different than the representation of any resource, just with its own set of fields. A JSON error body should provide a few things for the developer: a useful error message, a unique error code (that can be looked up for more details in the docs) and possibly a detailed description. JSON output representation for something like this would look like:

{
"code" : 1234,
"message" : "Something bad happened :(",
"description" : "More details about the error here"
}

Both successful and error response can be handled using the HTTP Status codes.

Here’s a list of Common Http Status Codes

How to use HTTP status codes for successful/error response in APIs?

Let us consider a resource ‘group’ which has the properties id, name, description and status; time keeping fields like created_time and updated_time are also available. A group is created in active state by default. The name and description cannot be empty, are limited in length, and are restricted to a set of allowed characters.

1. Creating a new group
POST /api/v1/groups
{
"name": "Test Group",
"description": "Test Description"
}
Response Code

Http Status Code | Error Code | Description
201 | - | Successfully created group. Include a Location header in the response to locate the newly created resource, for example Location: /api/v1/groups/1
422 | ID-GP-001 | The name is empty.
422 | ID-GP-002 | The description is empty.
422 | ID-GP-003 | Invalid characters in name.
422 | ID-GP-004 | Invalid characters in description.
422 | ID-GP-005 | Name too long.
422 | ID-GP-006 | Description too long.
415 | ID-GE-001 | Unsupported Media Type.
500 | ID-GE-002 | Application encountered unexpected error

Successful Response Payload

{
"id": "1",
"name": "Test Group",
"description": "Test Description",
"status": "active",
"created_time": "2014-04-18 14:12:47",
"updated_time": "2014-04-30 14:12:47"
}

Note: The response to a POST request is not cached by default. Though it is possible to control the cache behavior with cache control response headers, browsers handle these headers differently for POST and PUT methods, so it’s a better practice to let the client cache the resource with a GET call using the URI in the Location header.
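
The validation rules for creating a group and their error codes could be wired up as in the sketch below. The error codes and messages come from the table above, but the length limits and the allowed character set are not specified in this post, so the values used here are assumptions, as is the function name.

```python
import re

MAX_NAME, MAX_DESCRIPTION = 50, 200        # assumed limits, not from the post
ALLOWED = re.compile(r"[A-Za-z0-9 _.-]*")  # assumed set of allowed characters


def validate_group(payload):
    """Return (status, error_body): 201 and None if valid, else 422 and the first error."""
    name = payload.get("name") or ""
    description = payload.get("description") or ""
    checks = [
        ("ID-GP-001", "The name is empty.", not name.strip()),
        ("ID-GP-002", "The description is empty.", not description.strip()),
        ("ID-GP-003", "Invalid characters in name.", not ALLOWED.fullmatch(name)),
        ("ID-GP-004", "Invalid characters in description.", not ALLOWED.fullmatch(description)),
        ("ID-GP-005", "Name too long.", len(name) > MAX_NAME),
        ("ID-GP-006", "Description too long.", len(description) > MAX_DESCRIPTION),
    ]
    for code, message, failed in checks:
        if failed:
            return 422, {"code": code, "message": message}
    return 201, None
```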

2. Updating an existing group
PUT /api/v1/groups/1
{
"name": "Test Updated Group",
"description": "Test Updated Description"
}

Response Code

Http Status Code | Error Code | Description
200 or 204 | - | Successfully updated group. Both 200 and 204 are valid status codes; ensure they are used uniformly across APIs
422 | ID-GP-001 | The name is empty.
422 | ID-GP-002 | The description is empty.
422 | ID-GP-003 | Invalid characters in name.
422 | ID-GP-004 | Invalid characters in description.
422 | ID-GP-005 | Name too long.
422 | ID-GP-006 | Description too long.
404 | ID-GP-007 | Group with specified id not found in system.
415 | ID-GE-001 | Unsupported Media Type.
500 | ID-GE-002 | Application encountered unexpected error

Note: To get the updated resource, do a GET on the URI

3. Deleting an existing group
DELETE /api/v1/groups/1

Response Code

Http Status Code | Error Code | Description
200 or 204 | - | Successfully deleted group. Both 200 and 204 are valid status codes; ensure they are used uniformly across APIs
404 | ID-GP-007 | Group with specified id not found in system.
415 | ID-GE-001 | Unsupported Media Type.
500 | ID-GE-002 | Application encountered unexpected error

13.  Pretty print response (don’t minify) and support compression

An API that provides white-space compressed output isn’t very fun to look at from a browser. Although some sort of query parameter (like ?pretty=true) could be provided to enable pretty printing, an API that pretty prints by default is much more approachable. The cost of the extra data transfer is negligible, especially when you compare to the cost of not implementing gzip.

Consider some use cases: What if an API consumer is debugging and has their code print out data it received from the API – It will be readable by default. Or if the consumer grabbed the URL their code was generating and hit it directly from the browser – it will be readable by default. These are small things. Small things that make an API pleasant to use!
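
The claim about the relative cost of pretty printing and gzip is easy to check for a given payload. The sketch below uses made-up sample data and prints the four sizes so they can be compared; the exact numbers depend on the payload.

```python
import gzip
import json

# Made-up sample payload: a list of 200 small group objects.
groups = [{"id": i, "name": f"Test Group {i}", "status": "active"} for i in range(200)]

minified = json.dumps(groups, separators=(",", ":")).encode("utf-8")
pretty = json.dumps(groups, indent=2).encode("utf-8")

sizes = {
    "minified": len(minified),
    "pretty": len(pretty),
    "minified+gzip": len(gzip.compress(minified)),
    "pretty+gzip": len(gzip.compress(pretty)),
}
for label, size in sizes.items():
    print(f"{label:>14}: {size} bytes")
```

For repetitive JSON like this, the compressed pretty printed output is typically far smaller than the uncompressed minified output, which is the point made above.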

14.  Auto loading related resource representations

There are many cases where an API consumer needs to load data related to (or referenced from) the resource being requested. Rather than requiring the consumer to hit the API repeatedly for this information, there would be a significant efficiency gain from allowing related data to be returned and loaded alongside the original resource on demand. We can use an embed (or expand) parameter to include the related resource details in the response.

GET /groups/{group id}/?embed=members

Why do we need ‘embed’ when we have the ‘fields’ query parameter?

Let’s say we are fetching all the groups in the system with an option to fetch members selectively (we need only a few selected fields in groups and members).
On the API side, a naive implementation fetches all the groups first using one query (1) and then fires another query per group to fetch its member data (N), resulting in a total of N+1 queries. To optimize this we can use a ‘Get and Stitch’ approach, where we fire one query to get all the group information and a second query, using an IN clause, to get the member information for all the groups fetched. Once the results are fetched, we associate the members with their groups in memory. In order to support this optimization, we need to differentiate between fetching properties of a resource and fetching related resources of a resource; that’s where embed comes in.

Response
{
"name": "Test Group",
"owner": "201206",
"members": [
{
"memberid": "2017709",
"displayName": "Roger",
"firstName": "Roger",
"lastName": "Federer",
"links": {
"link": [
{
"rel": "self",
"href": "https://<url>/profile?accountId=2017709",
"title": "Profile details"
},
{
"rel": "permalink",
"href": "https://<url>/profile/2017709",
"title": "Profile page"
}
]
}
}
]
}
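
The ‘Get and Stitch’ approach described above can be sketched with an in-memory SQLite database: one query for the groups, one query with an IN clause for all their members, and then stitching in memory. The table names, column names and sample rows are made up for the example.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE api_groups (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE api_members (id INTEGER PRIMARY KEY, group_id INTEGER, display_name TEXT);
    INSERT INTO api_groups VALUES (1, 'Test Group'), (2, 'Empty Group');
    INSERT INTO api_members VALUES (2017709, 1, 'Roger'), (2017710, 1, 'Sample Member');
""")


def groups_with_members(conn):
    # Query 1: all groups.
    groups = {gid: {"id": gid, "name": name, "members": []}
              for gid, name in conn.execute("SELECT id, name FROM api_groups ORDER BY id")}
    if not groups:
        return []
    # Query 2: members of all fetched groups in one go, using an IN clause.
    placeholders = ",".join("?" * len(groups))
    rows = conn.execute(
        "SELECT group_id, id, display_name FROM api_members "
        f"WHERE group_id IN ({placeholders}) ORDER BY id", list(groups))
    # Stitch: attach each member to its group in memory.
    for group_id, member_id, display_name in rows:
        groups[group_id]["members"].append(
            {"memberid": member_id, "displayName": display_name})
    return list(groups.values())
```

However many groups are returned, the database is hit exactly twice instead of N+1 times.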

15.  Rate limiting

To prevent abuse, it is standard practice to add some sort of rate limiting to an API. RFC 6585 introduced the HTTP status code 429 Too Many Requests to accommodate this.

However, it can be very useful to notify the consumer of their limits before they actually hit it. This is an area that currently lacks standards but has a number of popular conventions using HTTP response headers.

At a minimum, include the following headers

  • X-Rate-Limit-Limit – The number of allowed requests in the current period
  • X-Rate-Limit-Remaining – The number of remaining requests in the current period
  • X-Rate-Limit-Reset – The number of seconds left in the current period
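
A simple way to produce these headers is a fixed window counter per client, sketched below. The class name and the fixed window strategy are assumptions made for the example; production systems often use sliding windows or token buckets and keep the counters in a shared store.

```python
import time


class FixedWindowLimiter:
    """Allow at most `limit` requests per client in each period."""

    def __init__(self, limit, period_seconds, clock=time.time):
        self.limit, self.period, self.clock = limit, period_seconds, clock
        self._windows = {}  # client id -> (window start, request count)

    def check(self, client_id):
        """Return (status, headers) for one request from client_id."""
        now = self.clock()
        start, count = self._windows.get(client_id, (now, 0))
        if now - start >= self.period:
            start, count = now, 0  # a new period begins
        allowed = count < self.limit
        if allowed:
            count += 1
        self._windows[client_id] = (start, count)
        headers = {
            "X-Rate-Limit-Limit": str(self.limit),
            "X-Rate-Limit-Remaining": str(self.limit - count),
            "X-Rate-Limit-Reset": str(int(start + self.period - now)),
        }
        return (200 if allowed else 429), headers
```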

16.  API Versioning

The most common ways of versioning APIs are

1. Having the version information in the URL
This is very easy to implement. When an API is changed, the URL to access the API should include the new version.
When implementing versioning using URL, we need to ensure all HATEOAS links return the right version information

2. Using custom headers
We can use the same URL as before but add a header such as “api-version: 2”

3. Versioning media types using accept header
The HTTP spec gives us a means of requesting the resource we’d like by way of the Accept header.
Modify the Accept header to specify the version, for example “Accept: application/vnd.groups.v2+json”
Clients accessing the API have to carefully construct the request and configure the Accept header appropriately

URL versioning is used by a lot of popular APIs, such as Twitter and GitHub, though it is not without its issues.
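
On the server, the three approaches differ only in where the version is read from. The sketch below resolves the version from the URL first, then the custom header, then the Accept media type; that precedence, the default version and the function name are assumptions made for the example, and headers are modelled as a plain dictionary with exact key names.

```python
import re

DEFAULT_VERSION = 1  # assumed fallback when the client specifies no version
_URL_VERSION = re.compile(r"^/api/v(\d+)/")
_MEDIA_VERSION = re.compile(r"application/vnd\.groups\.v(\d+)\+json")


def requested_version(path, headers):
    """Resolve the API version from the URL, a custom header, or the Accept header."""
    match = _URL_VERSION.match(path)
    if match:
        return int(match.group(1))
    if headers.get("api-version", "").isdigit():
        return int(headers["api-version"])
    match = _MEDIA_VERSION.search(headers.get("Accept", ""))
    if match:
        return int(match.group(1))
    return DEFAULT_VERSION
```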

17.  API Documentation

When exposing APIs to consumers, it is very important to document all of them. Documentation should at a minimum include a description of the API, response and error codes, and sample success and error responses. There are a lot of open source tools, such as Swagger, Enunciate and MireDot, which enable client and documentation systems to update at the same pace as the server. The documentation of methods, parameters and models is tightly integrated into the server code, allowing the documentation to always stay in sync with the APIs and making the APIs easier to deploy and manage.
