How to write a Kinto plugin?

Kinto plugins allow to add extra-features to Kinto. Most notably:

  • Respond to internal events (e.g. notify third-party)
  • Add endpoints for custom URLs (e.g. new hook URL)
  • Add custom endpoint renderers (e.g. XML instead of JSON)

Kinto plugins are Python modules loaded on startup.

In this tutorial, we will build a plugin for ElasticSearch, a full-text search engine. The plugin will:

  • Initialize an indexer on startup;
  • Index the records when they’re created, updated, or deleted.
  • Add a new /{collection}/search endpoint;

Plugins are built using the Pyramid ecosystem.

Run ElasticSearch

We will run a local install of ElasticSearch on localhost:9200.

Using Docker it is pretty straightforward:

sudo docker run -p 9200:9200 elasticsearch

It is also be installed manually using the official instructions.

Include me

First, create a Python package and install it locally. For example:

$ pip install cookiecutter
$ cookiecutter gh:kragniz/cookiecutter-pypackage-minimal

[...]

$ cd kinto_elasticsearch
$ python setup.py develop

In order to be included, a package must define an includeme(config) function.

For example, in kinto_elasticsearch/__init__.py:

def includeme(config):
    print("I am the ElasticSearch plugin!")

Add it the config.ini file:

kinto.includes = kinto_elasticsearch

Our message should now appear on kinto start.

Simple indexer

Let’s define a simple indexer class in kinto_elasticsearch/indexer.py. It can search and index records, using the official Python package:

$ pip install elasticsearch

It is a wrapper basically, and the code is kept simple for the simplicity of this tutorial:

import elasticsearch

class Indexer(object):
    def __init__(self, hosts):
        self.client = elasticsearch.Elasticsearch(hosts)

    def search(self, bucket_id, collection_id, query, **kwargs):
        indexname = '%s-%s' % (bucket_id, collection_id)
        return self.client.search(index=indexname,
                                  doc_type=indexname,
                                  body=query,
                                  **kwargs)

    def index_record(self, bucket_id, collection_id, record, id_field='id'):
        indexname = '%s-%s' % (bucket_id, collection_id)
        if not self.client.indices.exists(index=indexname):
            self.client.indices.create(index=indexname)

        record_id = record[id_field]
        index = self.client.index(index=indexname,
                                  doc_type=indexname,
                                  id=record_id,
                                  body=record,
                                  refresh=True)
        return index

    def unindex_record(self, bucket_id, collection_id, record, id_field='id'):
        indexname = '%s-%s' % (bucket_id, collection_id)
        record_id = record[id_field]
        result = self.client.delete(index=indexname,
                                    doc_type=indexname,
                                    id=record_id,
                                    refresh=True)
        return result

And a simple method to load from configuration:

from pyramid.settings import aslist

def load_from_config(config):
    settings = config.get_settings()
    hosts = aslist(settings.get('elasticsearch.hosts', 'localhost:9200'))
    indexer = Indexer(hosts=hosts)
    return indexer

Initialize on startup

We now need to initialize the indexer when Kinto starts. It happens in the includeme() function.

from . import indexer

def includeme(config):
    # Register a global indexer object
    config.registry.indexer = indexer.load_from_config(config)

Add a search view

Add an endpoint definition in kinto_elasticsearch/views.py:

from cliquet import Service, logger

search = Service(name="search",
                 path='/buckets/{bucket_id}/collections/{collection_id}/search',
                 description="Search")

@search.post()
def get_search(request):
    bucket_id = request.matchdict['bucket_id']
    collection_id = request.matchdict['collection_id']

    query = request.body

    # Access indexer from views using registry.
    indexer = request.registry.indexer
    try:
        results = indexer.search(bucket_id, collection_id, query)
    except Exception as e:
        logger.exception(e)
        results = {}
    return results

Enable the view:

from . import indexer

def includeme(config):
    # Register a global indexer object
    config.registry.indexer = indexer.load_from_config(config)

    # Activate end-points.
    config.scan('kinto_elasticsearch.views')

This new URL should now be accessible, but return no result:

$ http POST "http://localhost:8888/v1/buckets/example/collections/notes/search
HTTP/1.1 200 OK
Access-Control-Expose-Headers: Retry-After, Content-Length, Alert, Backoff
Content-Length: 2
Content-Type: application/json; charset=UTF-8
Date: Wed, 20 Jan 2016 12:01:50 GMT
Server: waitress

{}

Index records on change

When records change, we index them. When they are deleted, we unindex them.

Let’s define a function on_resource_changed() that will be called when an action is performed on records.

def on_resource_changed(event):
    indexer = event.request.registry.indexer

    resource_name = event.payload['resource_name']

    if resource_name != "record":
        return

    bucket_id = event.payload['bucket_id']
    collection_id = event.payload['collection_id']

    action = event.payload['action']
    for change in events.impacted_records:
        if action == 'delete':
            indexer.unindex_record(bucket_id,
                                   collection_id,
                                   record=change['old'])
        else:
            indexer.index_record(bucket_id,
                                 collection_id,
                                 record=change['new'])

And then we bind this function with the Cliquet events (the toolkit used by Kinto):

from cliquet.events import ResourceChanged

from . import indexer

def includeme(config):
    # Register a global indexer object
    config.registry.indexer = indexer.load_from_config(config)

    # Activate end-points.
    config.scan('kinto_elasticsearch.views')

    # Plug the callback with resource events.
    config.add_subscriber(on_resource_changed, ResourceChanged)

Test it altogether

We’re almost done! Now, let’s check if it works properly.

Create a bucket and collection:

$ http --auth token:alice-token --verbose PUT http://localhost:8888/v1/buckets/example
$ http --auth token:alice-token --verbose PUT http://localhost:8888/v1/buckets/example/collections/notes

Add a new record:

$ echo '{"data": {"note": "kinto"}}' | http --auth token:alice-token --verbose POST http://localhost:8888/v1/buckets/example/collections/notes/records

It should now be possible to search for it:

$ http --auth token:alice-token --verbose POST http://localhost:8888/v1/buckets/default/collections/assets/search
HTTP/1.1 200 OK
Access-Control-Expose-Headers: Retry-After, Content-Length, Alert, Backoff
Content-Length: 333
Content-Type: application/json; charset=UTF-8
Date: Wed, 20 Jan 2016 12:02:05 GMT
Server: waitress

{
    "_shards": {
        "failed": 0,
        "successful": 5,
        "total": 5
    },
    "hits": {
        "hits": [
            {
                "_id": "453ff779-e967-4b08-99b9-5c16af865a67",
                "_index": "example-assets",
                "_score": 1.0,
                "_source": {
                    "id": "453ff779-e967-4b08-99b9-5c16af865a67",
                    "last_modified": 1453291301729,
                    "note": "kinto"
                },
                "_type": "example-assets"
            }
        ],
        "max_score": 1.0,
        "total": 1
    },
    "timed_out": false,
    "took": 20
}

Going further

This plugins implements the basic functionnality. In order to make it a first-class plugin, it would require:

  • Check that user has read permission on the collection before searching
  • Create the index when the collection is created
  • Create a mapping if the collection has a JSON schema
  • Delete the index when the bucket or collection are deleted

If you feel like doing it, we would be very glad to help you!