Twittee

[operational] Lead generation using Twitter

08 Sep 2022

Build started at the end of 2021, as part of a client engagement.

Goal: use Twitter data about promoted events to generate new leads.

High-level process:

  • a daily Python script searches Twitter for promoted events that include a link
  • the script also identifies the provider behind the event from the registration link (direct or via a landing page) by scraping the linked page
  • the data is added to a Grist database (a rough sketch of the search-and-store steps follows this list)
  • the data is used to generate a daily email campaign with Mailingee !projects/mailingee
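
A minimal sketch of how the search and Grist steps could be wired together, assuming Grist's hosted REST records endpoint (docs.getgrist.com). The document ID, table name, field names, keyword and GRIST_API_KEY are placeholders; the provider-identification and Mailingee steps are omitted; generic_search is the function documented further down.

import os
import requests

GRIST_API_KEY = os.environ.get('GRIST_API_KEY')   # placeholder
GRIST_DOC_ID = 'YOUR_DOC_ID'                      # placeholder
GRIST_TABLE = 'Leads'                             # placeholder table name

def push_leads_to_grist(tweets):
    # One Grist record per collected tweet; the field names are illustrative
    records = [{'fields': {
        'TweetURL': t['tweet_url'],
        'Author': t['name'],
        'EventLink': t['entities']['urls'][0],
        'FoundOn': t['created_at'],
    }} for t in tweets]
    resp = requests.post(
        'https://docs.getgrist.com/api/docs/{}/tables/{}/records'.format(GRIST_DOC_ID, GRIST_TABLE),
        headers={'Authorization': 'Bearer {}'.format(GRIST_API_KEY)},
        json={'records': records},
    )
    resp.raise_for_status()

def daily_run():
    # Search the last day's tweets and store anything with an event link
    tweets = generic_search(daylimit=1, keywords=['webinar'], limit=200)
    if isinstance(tweets, list) and tweets:
        push_leads_to_grist(tweets)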

Results

No tracking is used in my email system (tracked emails are more likely to get caught in spam), so there are no "open" or "click" details to share.

answer rate

The answer rate was 16%: many replies were "not interested", but many others were "contract ongoing for the next year or two", with intel in most cases on when to reconnect.

The widely accepted average reply rate across all cold emails is approximately 1% to 5%.

conversion to meetings

The email campaign built from Twittee's data converted to meetings at a rate of 3.4%.

TODO: document.
02 Mar 2023: re-ignite this project and check whether it is still operational after the Twitter API changes.

👷🏼‍♂️ WORK IN PROGRESS 👷🏼‍♂️

Context

Stack

  • Twitter API
  • Grist

Script

Functions:

  • search_specific_handle
  • search_handle_recent
  • generic_search
  • check_loc
  • check_handle

generic_search performs a search based on the provided keywords.
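
generic_search relies on a few module-level names that are not shown in the snippet (bearer_token, list_blacklist, and the http/payload pair used to resolve shortened URLs). A minimal setup sketch with placeholder values, not the actual configuration:

import datetime
import json
import os
import time
from pprint import pprint

import requests
import urllib3

# Module-level values generic_search() expects (placeholders):
bearer_token = os.environ.get('TWITTER_BEARER_TOKEN')   # Twitter API v2 bearer token
list_blacklist = ['example-spam-domain.com']            # domains to drop from results (illustrative)
payload = {}                                            # extra query fields passed to urllib3
http = urllib3.PoolManager()                            # follows redirects to resolve shortened URLs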

def generic_search(daylimit=7,keywords=[],blacklist=[],whitelist=[],limit=100,continue_from_cache=False):
    # Set to True to print intermediate query parameters and responses while debugging
    DEBUG_MODE = False

    #Final return of generic_search
    final_data = []


    # If you already have the response and want to continue from where you've left off
    #Loads already processed tweets that might have been lost because of some error
    if os.path.exists('twittee_processed.json'):
        if continue_from_cache:
            final_data.extend(json.loads('['+open('twittee_processed.json','r').read().replace('}{','},{')+']'))
            print('Successfully restored {} tweets from cache!'.format(len(final_data)))
            time.sleep(1)
        else:
            answer = input("Noticed the file 'twittee_processed.json'. Would you like to continue_from_cache instead? (Y/N)\nOld cache file(s) will be deleted if you type 'N': ")
            if answer.startswith('y') or answer.startswith('Y'):
                return generic_search(daylimit,keywords,blacklist,whitelist,limit,continue_from_cache=True)
            elif answer.startswith('n') or answer.startswith('N'):
                if os.path.exists('twittee_cached.json'): os.remove('twittee_cached.json')
                if os.path.exists('twittee_processed.json'): os.remove('twittee_processed.json')
                if os.path.exists('processed_cache_file.txt'): os.remove('processed_cache_file.txt')
    else:

        if continue_from_cache:
            print("Parameter continue_from_cache = True but couldn't find file 'twittee_processed.json'. Please check.")
            return


    # If continue_from_cache = False, run generic_search normally
    if not continue_from_cache:

        # keywords must be provided as a list
        if type(keywords) != list:
            return "Function's keywords argument must be a list."

        # Reserve the first page (100 tweets) against the limit up front; a
        # caller limit of 0 means "no limit" and becomes -100, which disables
        # the limit check in the pagination loop below.
        limit -= 100

        returned = 0
        # Twitter API v2 recent-search endpoint
        search_url = "https://api.twitter.com/2/tweets/search/recent"



        # Header to send bearer token to twitter API
        headers = {"Authorization": "Bearer {}".format(bearer_token)}


        # Build the keyword part of the query: single-word keywords are
        # expanded into case variants joined by OR; multi-word keywords are
        # appended afterwards as exact-phrase terms.
        query = ""

        add_after = []
        for keyword in keywords:
            if ' ' in keyword:
                add_after.append(keyword)
            else:
                # Deduplicated case variants: Capitalized, as given, lowercase
                variants = list(dict.fromkeys([keyword.capitalize(), keyword, keyword.lower()]))
                query += '(' + ' OR '.join(variants) + ') '

        for kw in add_after:
            query += '"' + kw + '" '

        query = query.strip()
        # --------------------------------------------------------------------


        # Build an RFC 3339 start_time in UTC (no fractional seconds), going
        # back `daylimit` days from now, as required by the Twitter API.
        a = (datetime.datetime.utcnow() - datetime.timedelta(days=daylimit)).strftime('%Y-%m-%dT%H:%M:%SZ')
        # --------------------------------------------------------------------------- #


        # Query for the twitter API request
        query_params = {'query': '{} has:links -is:retweet'.format(query),
                        'max_results': '{}'.format(100),
                        'expansions': 'author_id',
                        'tweet.fields': 'text,entities,created_at,id,lang',
                        'user.fields': 'id,name,username,url,entities',
                        'start_time': f'{a}'
                        }
        # ---------------------------------

        if DEBUG_MODE:
            print(query_params)
            input()


        # Send the request using search_url, parameters and headers set above.
        response = requests.request("GET", search_url,params=query_params, headers = headers)

        endpoint_response = response.json()
        # --------------------------------------------------------------------
        if DEBUG_MODE:
            pprint(endpoint_response)
            with open('yaman.txt','w') as fh:
                json.dump(endpoint_response,fh)
            input()

        # Raise if the request to the endpoint was not successful
        if response.status_code != 200:
            print("\n\nConnection to endpoint was NOT successful.\n")
            raise Exception(response.status_code, response.text)
        # --------------------------------------------------------------




        # This will be the final list of tweet records
        tweet_data = []
        # --------------------------------------------------------------------



        # Stop if the response contains no tweets
        if 'data' not in endpoint_response or len(endpoint_response['data']) == 0:

            print("No data found with given keywords...")
            return None

        # Otherwise process the first page of results
        else:


            if 'meta' in endpoint_response:
                if DEBUG_MODE:
                    print('\nFound META!\n')
                if 'next_token' in endpoint_response['meta']:
                    if DEBUG_MODE:
                        print('\nFound next_token!\n')
                    next_token = endpoint_response['meta']['next_token']
                    token_found = True
                else:
                    token_found = False
            else:
                token_found = False

            if DEBUG_MODE:
                print('Endpoint data length: ',len(endpoint_response['data']))

            for i in range(0,len(endpoint_response['data'])):

                # Gather all info in this dictionary
                data_of_tweet = {}



                # Get text, entities, created_at, urls and author_id for data_of_tweet from endpoint_response
                data_of_tweet["created_at"] = (endpoint_response["data"][i]["created_at"])
                data_of_tweet["text"] = endpoint_response["data"][i]["text"]
                data_of_tweet["entities"] = {}
                data_of_tweet["entities"]["urls"] = []
                data_of_tweet["author_id"] = endpoint_response["data"][i]["author_id"]
                # ---------------------------------------------------------------------------


                # Filter url's based on list_blacklist and hardcode-remove some urls
                for url in endpoint_response["data"][i]["entities"]["urls"]:

                    if not url["expanded_url"].startswith("https://twitter") and 'facebook.com' not in url and 'linkedin.com' not in url:
                        domain = url["expanded_url"].split("//")[1].split("/")[0]
                        if domain not in list_blacklist:
                            data_of_tweet["entities"]["urls"].append(url["expanded_url"])
                # ------------------------------------------------------------------


                users = endpoint_response["includes"]["users"]

                #Match user in users with the author of the tweet to gather author info

                for user in users:


                    if data_of_tweet["author_id"] == user["id"]:

                        # Try to gather the author's website URL; set None if not available
                        try:
                            data_of_tweet['author_website_url'] = user['entities']['url']['urls'][0]['expanded_url']

                        except (KeyError, IndexError):
                            data_of_tweet['author_website_url'] = None


                        # ---------------------------------------------------------------------------

                        # Set name, username, user_url from endpoint_response to data_of_tweet
                        data_of_tweet["name"] = user["name"]
                        data_of_tweet["username"] = user["username"]
                        data_of_tweet["user_url"] = "https://twitter/" + user["username"]

                # --------------------------------------------------------------------------

                # Set tweet_id and tweet_url
                data_of_tweet["tweet_id"] = endpoint_response["data"][i]["id"]
                data_of_tweet["tweet_url"] = "https://twitter/"+data_of_tweet["username"]+"/status/"+data_of_tweet["tweet_id"]
                # ---------------------------------------------------------------------------------------------------------------

                # Getting the tweet language from endpoint_response
                data_of_tweet["lang"] = endpoint_response["data"][i]["lang"]
                # ---------------------------------------------------------

                #Only append this tweet to tweet_data if it has any url.
                if len(data_of_tweet["entities"]["urls"]) > 0:
                    tweet_data.append(data_of_tweet)
                    returned += 1
                # ------------------------------------------------------




        print('Got {} tweets from the first request'.format(returned))

        #Check if more pages are available
        while token_found:
            time.sleep(2.2)
            if limit != -100:
                if returned >= limit:
                        token_found = False
                        break

            found_this_step = 0

            # Query for the twitter API request
            query_params = {
                        'query': '{} has:links -is:retweet'.format(query),
                        'max_results': '{}'.format(100),
                        'expansions': 'author_id',
                        'tweet.fields': 'text,entities,created_at,id,lang',
                        'user.fields': 'id,name,username,url,entities',
                        'start_time': f'{a}',
                        'next_token': next_token
                        }
            # ---------------------------------


            # Send the request using search_url, parameters and headers set above.
            response = requests.request("GET", search_url,params=query_params, headers = headers)

            endpoint_response = response.json()
            # --------------------------------------------------------------------


            # Raise if the request to the endpoint was not successful
            if response.status_code != 200:
                print("\n\nConnection to endpoint was NOT successful.\n")
                raise Exception(response.status_code, response.text)
            # --------------------------------------------------------------





            # Process this page of results (skip gracefully if it has no data)
            for i in range(0, len(endpoint_response.get('data', []))):

                    data_of_tweet = {}

                    # Get text, entities, created_at, urls and author_id for data_of_tweet from endpoint_response
                    data_of_tweet["created_at"] = (endpoint_response["data"][i]["created_at"])
                    data_of_tweet["text"] = endpoint_response["data"][i]["text"]
                    data_of_tweet["entities"] = {}
                    data_of_tweet["entities"]["urls"] = []
                    data_of_tweet["author_id"] = endpoint_response["data"][i]["author_id"]
                    # ---------------------------------------------------------------------------



                    # Filter url's based on list_blacklist and hardcode-remove some urls
                    for url in endpoint_response["data"][i]["entities"]["urls"]:

                        if not url["expanded_url"].startswith("https://twitter") and url != 'facebook.com' and url != 'linkedin.com':
                            domain = url["expanded_url"].split("//")[1].split("/")[0]
                            if domain not in list_blacklist:
                                data_of_tweet["entities"]["urls"].append(url["expanded_url"])
                    # ------------------------------------------------------------------




                    users = endpoint_response["includes"]["users"]

                    #Match user in users with the author of the tweet to gather author info

                    for user in users:


                        if data_of_tweet["author_id"] == user["id"]:

                            # Try to gather the author's website URL; set None if not available
                            try:
                                data_of_tweet['author_website_url'] = user['entities']['url']['urls'][0]['expanded_url']

                            except (KeyError, IndexError):
                                data_of_tweet['author_website_url'] = None


                            # ---------------------------------------------------------------------------

                            # Set name, username, user_url from endpoint_response to data_of_tweet
                            data_of_tweet["name"] = user["name"]
                            data_of_tweet["username"] = user["username"]
                            data_of_tweet["user_url"] = "https://twitter/" + user["username"]

                    # --------------------------------------------------------------------------

                    # Set tweet_id and tweet_url
                    data_of_tweet["tweet_id"] = endpoint_response["data"][i]["id"]
                    data_of_tweet["tweet_url"] = "https://twitter/"+data_of_tweet["username"]+"/status/"+data_of_tweet["tweet_id"]
                    # ---------------------------------------------------------------------------------------------------------------

                    # Getting the tweet language from endpoint_response
                    data_of_tweet["lang"] = endpoint_response["data"][i]["lang"]
                    # ---------------------------------------------------------

                    #Only append this tweet to tweet_data if it has any url.
                    if len(data_of_tweet["entities"]["urls"]) > 0:
                        tweet_data.append(data_of_tweet)
                        found_this_step += 1
                    # ------------------------------------------------------

            # Check whether another page of results is available
            if 'meta' in endpoint_response:
                if DEBUG_MODE:
                    print('\nFound META!\n')
                if 'next_token' in endpoint_response['meta']:
                    if DEBUG_MODE:
                        print('\nFound next_token!\n')
                    next_token = endpoint_response['meta']['next_token']
                    token_found = True
                else:
                    token_found = False
            else:
                token_found = False

            returned += found_this_step
            if DEBUG_MODE:
                print('Processing ->', end=' ')
                print(next_token,' New tweet data length :: ', len(tweet_data))
            else:
                print('Current data length : ', len(tweet_data))

        #Save the response just in case you want to continue from where you've left off
        cached_file = open('twittee_cached.json','w')
        json.dump(tweet_data,cached_file)
        cached_file.close()

    # Shortened_url to actual_url transformer:

    #First, restore the already processed tweets
    if continue_from_cache:
        try:
            cached_file = open('twittee_cached.json','r')
            tweet_data = json.load(cached_file)
            print('Successfully returned response of length: {} from cache.'.format(len(tweet_data)))
            cached_file.close()
        except (OSError, json.JSONDecodeError):
            print("Couldn't load twittee_cached.json; make sure the file exists or set continue_from_cache=False.")
            return
        processed_cache_file = open('processed_cache_file.txt','r')
        already_processed_temp = processed_cache_file.readlines()
        already_processed = [id.strip() for id in already_processed_temp]
        processed_cache_file.close()


    #Continue processing tweet data list normally
    index = 1
    num_of_tw = len(tweet_data)

    converted_urls = {}

    for tweet in tweet_data[:]:
                # But skip if tweet is already processed
                if continue_from_cache:
                    if tweet['tweet_id'] in already_processed:
                        num_of_tw -= 1
                        continue

                print(f"Tweet: {index}/{num_of_tw}")
                index2 = 0
                for url in tweet["entities"]["urls"][:]:
                        index2 += 1
                        print(f"Url #{index2}",end=" ")
                        # final_urls list of the specific tweet
                        final_urls = []


                        # Try to convert shortened_url to actual_url
                        try:

                            shortened_url = url
                            print("Gathering actual url from shortened url...")
                            # `http` is assumed to be a urllib3 PoolManager and `payload`
                            # a dict of extra query fields, both defined at module level
                            req = http.request('GET', url, fields=payload)

                            actual_url = req.geturl()
                            #r = requests.get(shortened_url, timeout=15, headers={"User-Agent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36"})
                            #actual_url = r.url

                            print(shortened_url[0:25]+"..."," >>> " ,actual_url[0:25]+"...","\n")

                            final_urls.append(actual_url)

                            if DEBUG_MODE:
                                if 'zoom' in actual_url:
                                    with open('yaman.txt','a') as df: df.write(actual_url+'\n')
                        #-------------------------------------------

                        # If it can't be resolved, keep the original url
                        except Exception:

                            print("Couldn't resolve:", url, " keeping the original url\n")
                            final_urls.append(url)

                        # ---------------------------------------

                # Blacklist and whitelist implementation
                for link in final_urls[:]:

                    skip_url_w = False
                    skip_url_b = False

                    for item in blacklist:
                        if item in link:
                            skip_url_b = True
                            break

                    for item in whitelist:
                        if item not in link:
                            skip_url_w = True
                            break

                    if skip_url_b or skip_url_w:
                        final_urls.remove(link)
                # -------------------------------------- #


                tweet["entities"]["urls"] = final_urls
                index += 1

                for url in tweet["entities"]["urls"][:]:
                        if "youtube.com" in url:
                            tweet["entities"]["urls"].remove(url)

                if len(tweet["entities"]["urls"]) != 0:
                    print('Writing tweet to cache...')
                    write_data_file = open('twittee_processed.json','a')
                    json.dump(tweet,write_data_file)
                    final_data.append(tweet)
                processed_cache_file = open('processed_cache_file.txt','a')
                processed_cache_file.write(tweet['tweet_id']+'\n')
                processed_cache_file.close()


    # Return the final list of processed tweets
    return final_data
    # End of generic_search()
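
A hypothetical invocation; the keywords, blacklist entry and limit below are illustrative rather than the campaign's actual configuration:

tweets = generic_search(
    daylimit=3,
    keywords=['webinar', 'online conference'],
    blacklist=['medium.com'],
    limit=300,
)
if isinstance(tweets, list):
    print('{} tweets with event links collected'.format(len(tweets)))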

Daily use

12 Apr 2022

Need to expand on the write-up here.
I have run the scripts/process for a few days now and it's working pretty well.
It generates hundreds of new potential leads every day.

twittee/230412-1245-twittee-run.jpg

Twitter API

Clipee Tweet

Some of the logic from this initial project is being reused for Clipee Tweet !projects/clipee - tweet from clipboard.
