Connecting AIOHTTP with Spotify

Continuing our post about building JSON REST APIs with Python and AOIHTTP, we will extend it’s capabilities to comunicate the system with Spotify servers to list some artists and tracks.


AIOHTTP is a Python Library to built HTTP Clients and Servers using a single-threaded architecture using Python asyncio.

  • Supports HTTP Client and Server
  • Supports Websocket Client and Servers
  • Single threaded
  • Middleware architecture

Python asyncio

Is a Python +3.5 package to handle concurrency with a single-threaded architecture.

“The library for writing single-threaded concurrent code using coroutines, multiplexing I/O access over sockets and other resources, running network clients and servers, and other related primitives.”



Spotify is a super popular service to listen to music on your mobile device or computer.

Spotify Technology S.A. is a Swedish media-services provider founded in 2006. The company’s primary business is its audio streaming platform that provides DRM-protected music and podcasts from record labels and media companies.


Show me the code


  • Python 3.7+
  • Git
  • pip3

1. Creating the Project Folder and Files

mkdir shopify_api
cd shopify_api

2. Create a requirements File with this content:



3. In your project Folder Create the virtualenv

 python3 -m venv .

4. Activate the virtual Env (Unix Bash)

source bin/activate

5. Install AIOHTTP Framework and dependencies:

bin/pip3 install -r requirements.txt

6. AOIHTTP Server App

from aiohttp import web
import json
import spotipy

spotify = spotipy.Spotify()

async def handle_spotify_search(request):
    results ='artist:' + request.query['name'], type='artist')
    return web.Response(text=json.dumps(results))

async def handle_spotify_albums_search(request):
    artist_uri = f"spotify:artist:{request.query['artist_id']}"
    results = spotify.artist_albums(artist_uri, album_type='album')
    albums = results['items']
    while results['next']:
        results =
    return web.Response(text=json.dumps(albums))

def handle_get(request):
    text = 'Hello, World!'
    return web.Response(text=text)

async def handle_post(request):
        # Success path where name is set
        response_obj = {'status': 'success'}
        # Process our new user
        response_obj['body'] = request.query['name']
        # return a success json response with status code 200 i.e. 'OK'
        return web.Response(text=json.dumps(response_obj), status=200)
    except Exception as e:
        # Failed path where name is not set
        response_obj = {'status': 'failed', 'reason': str(e)}
        # return failed with a status code of 500 i.e. 'Server Error'
        return web.Response(text=json.dumps(response_obj), status=500)

app = web.Application()

routes = [
    web.get('/', handle_get),'/add-user', handle_post),'/spotify-search', handle_spotify_search),'/spotify-album-search', handle_spotify_albums_search)



7. Run the app

chmod a+x
Share This:

Get The Best Of All Hands Delivered To Your Inbox

Subscribe to our newsletter and stay updated.