08 Sep 2022
Build started at the end of 2021, as part of a client engagement.
Goal: use Twitter data on promoted events to generate new leads.
High-level process (a rough pipeline sketch follows the list):
- a daily Python script searches Twitter for events promoted with a link
- the script also identifies the provider behind each event from its registration link (direct or via a landing page) by scraping it
- the data is added to a Grist database
- the data is used to generate a daily email campaign with Mailingee
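As a rough illustration of how these steps fit together, here is a minimal sketch of the daily pipeline. The helper names (identify_provider, add_to_grist) and the keyword values are assumptions for illustration, not the actual production code.

# Hypothetical daily pipeline; helper bodies are stubs, names and keywords are illustrative.
def identify_provider(urls):
    """Stub: scrape the registration link (direct or landing page) to find the provider."""
    return None

def add_to_grist(table, record):
    """Stub: push one record to the Grist database via its API."""
    pass

def daily_run():
    # 1. Search recent tweets promoting events that include a link
    tweets = generic_search(daylimit=1, keywords=["webinar", "virtual event"], limit=300) or []
    for tweet in tweets:
        # 2. Identify the provider behind the registration link
        provider = identify_provider(tweet["entities"]["urls"])
        # 3. Store the lead in Grist; the Grist table then feeds the daily Mailingee campaign
        add_to_grist("Leads", {
            "tweet_url": tweet["tweet_url"],
            "author": tweet["name"],
            "provider": provider,
        })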
Results
No tracking is used in my email system (tracked emails get caught in spam more often), so there are no "open" or "click" figures to share.
Answer rate
The answer rate was 16%: many "not interested", but also many "contract ongoing for the next year or two", with intel in most cases on when to reconnect.
The widely accepted average across all cold emails is approximately 1% to 5%.
Conversion to meetings
The email campaign built from Twittee's data converted to meetings at a rate of 3.4%.
TODO: document.
02 Mar 2023: re-ignite this project and check whether it is still operational after the Twitter API changes.
Context
Stack
- Twitter API (bearer token setup sketched below)
- Grist
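The script relies on a module-level bearer_token for its Twitter API calls. A minimal sketch of how it might be loaded, assuming the token is stored in an environment variable (the variable name is an assumption):

import os

# Assumption: the Twitter API bearer token is stored in an environment variable.
bearer_token = os.environ["TWITTER_BEARER_TOKEN"]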
Script
Functions:
- search_specific_handle
- search_handle_recent
- generic_search
- check_loc
- check_handle
Generic search
Performs a search based on a list of keywords.
def generic_search(daylimit=7,keywords=[],blacklist=[],whitelist=[],limit=100,continue_from_cache=False):
#Just in case anything bad happens :D
DEBUG_MODE = False
#Final return of generic_search
final_data = []
# If you already have the response and want to continue from where you've left off
#Loads already processed tweets that might have been lost because of some error
if os.path.exists('twittee_processed.json'):
if continue_from_cache:
final_data.extend(json.loads('['+open('twittee_processed.json','r').read().replace('}{','},{')+']'))
print('Successfully restored {} tweets from cache!'.format(len(final_data)))
time.sleep(1)
else:
answer = input("Noticed the file 'twittee_processed.json'. Would you like to continue_from_cache instead? (Y/N)\nOld cache file(s) will be deleted if you type 'N': ")
if answer.startswith('y') or answer.startswith('Y'):
return generic_search(daylimit,keywords,blacklist,whitelist,limit,continue_from_cache=True)
elif answer.startswith('n') or answer.startswith('N'):
if os.path.exists('twittee_cached.json'): os.remove('twittee_cached.json')
if os.path.exists('twittee_processed.json'): os.remove('twittee_processed.json')
if os.path.exists('processed_cache_file.txt'): os.remove('processed_cache_file.txt')
else:
if continue_from_cache:
print("Parameter continue_from_cache = True but couldn't find file 'twittee_processed.json' please check ")
return
# If continue_from_cache = False, run generic_search normally
if not continue_from_cache:
#Keyword list error
if not isinstance(keywords, list):
return "The keywords argument must be a list."
if limit != 0:
limit -= 100
returned = 0
# Endpoint to use
search_url = "https://api.twitter.com/2/tweets/search/recent"
# Header to send bearer token to twitter API
headers = {"Authorization": "Bearer {}".format(bearer_token)}
# Create a string keyword query with the provided *args to function.
query = ""
# Build case variants for single-word keywords; multi-word keywords are quoted and appended afterwards
add_after = []
for keyword in keywords:
if ' ' in keyword:
add_after.append(keyword)
else:
last_keywords = []
last_keywords.append(keyword.capitalize())
last_keywords.append(keyword)
last_keywords.append(keyword.lower())
temp_keys = []
for kw in last_keywords:
if kw not in temp_keys:
temp_keys.append(kw)
keywords = temp_keys
for keyword in keywords:
if keyword == keywords[0]: query += '('
if keyword != keywords[-1]:
query += keyword+" OR "
else:
query += keyword+') '
# (query is stripped below, after any quoted multi-word keywords are appended)
if len(add_after) > 0:
for kw in add_after:
query += '"'+kw+'" '
query = query.strip()
# --------------------------------------------------------------------
# Create a date/time string in the format accepted by the Twitter API's start_time parameter
now = datetime.datetime.now()
d = datetime.timedelta(days = daylimit)
a = now - d
a = str(a).replace(" ","T")
match = re.findall(pattern=r"\.\d+", string=a)[0]
a = a.replace(match,"") + "Z"
# --------------------------------------------------------------------------- #
# Query for the twitter API request
query_params = {'query': '{} has:links -is:retweet'.format(query),
'max_results': '{}'.format(100),
'expansions': 'author_id',
'tweet.fields': 'text,entities,created_at,id,lang',
'user.fields': 'id,name,username,url,entities',
'start_time': f'{a}'
}
# ---------------------------------
if DEBUG_MODE:
print(query_params)
input()
# Send the request using search_url, parameters and headers set above.
response = requests.request("GET", search_url,params=query_params, headers = headers)
endpoint_response = response.json()
# --------------------------------------------------------------------
if DEBUG_MODE:
pprint(endpoint_response)
with open('yaman.txt','w') as fh:
json.dump(endpoint_response,fh)
input()
#Error if connection could not be established with the endpoint.
if response.status_code != 200:
print("\n\nConnection to endpoint was NOT sucessfull..\n")
raise Exception(response.status_code, response.text)
# --------------------------------------------------------------
# This will be the final list of tweet data
tweet_data = []
# *-*-*-*-*-*-*-*-*-*-*-*-*-*-*-**-*-**-*-*-
# Check the length of the response tweets
if len(endpoint_response) == 0:
print("No data found with given keywords...")
return None
# Proceed if not 0 :)
else:
if 'meta' in endpoint_response:
if DEBUG_MODE:
print('\nFound META!\n')
if 'next_token' in endpoint_response['meta']:
if DEBUG_MODE:
print('\nFound next_token!\n')
next_token = endpoint_response['meta']['next_token']
token_found = True
else:
token_found = False
else:
token_found = False
if DEBUG_MODE:
print('Endpoint data length: ',len(endpoint_response['data']))
for i in range(0,len(endpoint_response['data'])):
# Gather all info in this dictionary
data_of_tweet = {}
# Get text, entities, created_at, urls and author_id for data_of_tweet from endpoint_response
data_of_tweet["created_at"] = (endpoint_response["data"][i]["created_at"])
data_of_tweet["text"] = endpoint_response["data"][i]["text"]
data_of_tweet["entities"] = {}
data_of_tweet["entities"]["urls"] = []
data_of_tweet["author_id"] = endpoint_response["data"][i]["author_id"]
# ---------------------------------------------------------------------------
# Filter URLs based on list_blacklist and drop some hardcoded domains
for url in endpoint_response["data"][i]["entities"]["urls"]:
if not url["expanded_url"].startswith("https://twitter") and 'facebook.com' not in url["expanded_url"] and 'linkedin.com' not in url["expanded_url"]:
domain = url["expanded_url"].split("//")[1].split("/")[0]
if domain not in list_blacklist:
data_of_tweet["entities"]["urls"].append(url["expanded_url"])
# ------------------------------------------------------------------
users = endpoint_response["includes"]["users"]
#Match user in users with the author of the tweet to gather author info
for user in users:
if data_of_tweet["author_id"] == user["id"]:
#Try to gather the author's website URL; if not available, set it to None
try:
data_of_tweet['author_website_url'] = user['entities']['url']['urls'][0]['expanded_url']
except:
data_of_tweet['author_website_url'] = None
# ---------------------------------------------------------------------------
# Set name, username, user_url from endpoint_response to data_of_tweet
data_of_tweet["name"] = user["name"]
data_of_tweet["username"] = user["username"]
data_of_tweet["user_url"] = "https://twitter/" + user["username"]
# --------------------------------------------------------------------------
# Set tweet_id and tweet_url
data_of_tweet["tweet_id"] = endpoint_response["data"][i]["id"]
data_of_tweet["tweet_url"] = "https://twitter/"+data_of_tweet["username"]+"/status/"+data_of_tweet["tweet_id"]
# ---------------------------------------------------------------------------------------------------------------
# Getting the tweet language from endpoint_response
data_of_tweet["lang"] = endpoint_response["data"][i]["lang"]
# ---------------------------------------------------------
#Only append this tweet to tweet_data if it has any url.
if len(data_of_tweet["entities"]["urls"]) > 0:
tweet_data.append(data_of_tweet)
returned += 1
# ------------------------------------------------------
print('Got {} tweets from the first request'.format(returned))
#Check if more pages are available
while token_found:
time.sleep(2.2)
if limit != -100:
if returned >= limit:
token_found = False
break
found_this_step = 0
# Query for the twitter API request
query_params = {
'query': '{} has:links -is:retweet'.format(query),
'max_results': '{}'.format(100),
'expansions': 'author_id',
'tweet.fields': 'text,entities,created_at,id,lang',
'user.fields': 'id,name,username,url,entities',
'start_time': f'{a}',
'next_token': next_token
}
# ---------------------------------
# Send the request using search_url, parameters and headers set above.
response = requests.request("GET", search_url,params=query_params, headers = headers)
endpoint_response = response.json()
# --------------------------------------------------------------------
#Error if connection could not be established with the endpoint.
if response.status_code != 200:
print("\n\nConnection to endpoint was NOT sucessfull..\n")
raise Exception(response.status_code, response.text)
# --------------------------------------------------------------
# Parse this page of results
i = 0
for i in range(0,len(endpoint_response['data'])):
data_of_tweet = {}
# Get text, entities, created_at, urls and author_id for data_of_tweet from endpoint_response
data_of_tweet["created_at"] = (endpoint_response["data"][i]["created_at"])
data_of_tweet["text"] = endpoint_response["data"][i]["text"]
data_of_tweet["entities"] = {}
data_of_tweet["entities"]["urls"] = []
data_of_tweet["author_id"] = endpoint_response["data"][i]["author_id"]
# ---------------------------------------------------------------------------
# Filter URLs based on list_blacklist and drop some hardcoded domains
for url in endpoint_response["data"][i]["entities"]["urls"]:
if not url["expanded_url"].startswith("https://twitter") and 'facebook.com' not in url["expanded_url"] and 'linkedin.com' not in url["expanded_url"]:
domain = url["expanded_url"].split("//")[1].split("/")[0]
if domain not in list_blacklist:
data_of_tweet["entities"]["urls"].append(url["expanded_url"])
# ------------------------------------------------------------------
users = endpoint_response["includes"]["users"]
#Match user in users with the author of the tweet to gather author info
for user in users:
if data_of_tweet["author_id"] == user["id"]:
#Try to gather the author's website URL; if not available, set it to None
try:
data_of_tweet['author_website_url'] = user['entities']['url']['urls'][0]['expanded_url']
except:
data_of_tweet['author_website_url'] = None
# ---------------------------------------------------------------------------
# Set name, username, user_url from endpoint_response to data_of_tweet
data_of_tweet["name"] = user["name"]
data_of_tweet["username"] = user["username"]
data_of_tweet["user_url"] = "https://twitter/" + user["username"]
# --------------------------------------------------------------------------
# Set tweet_id and tweet_url
data_of_tweet["tweet_id"] = endpoint_response["data"][i]["id"]
data_of_tweet["tweet_url"] = "https://twitter/"+data_of_tweet["username"]+"/status/"+data_of_tweet["tweet_id"]
# ---------------------------------------------------------------------------------------------------------------
# Getting the tweet language from endpoint_response
data_of_tweet["lang"] = endpoint_response["data"][i]["lang"]
# ---------------------------------------------------------
#Only append this tweet to tweet_data if it has any url.
if len(data_of_tweet["entities"]["urls"]) > 0:
tweet_data.append(data_of_tweet)
found_this_step += 1
# ------------------------------------------------------
if 'meta' in endpoint_response:
if DEBUG_MODE:
print('\nFound META!\n')
if 'next_token' in endpoint_response['meta']:
if DEBUG_MODE:
print('\nFound next_token!\n')
next_token = endpoint_response['meta']['next_token']
token_found = True
else:
token_found= False
else:
token_found = False
returned += found_this_step
if DEBUG_MODE:
print('Processing ->', end=' ')
print(next_token,' New tweet data length :: ', len(tweet_data))
else:
print('Current data length : ', len(tweet_data))
#Save the response just in case you want to continue from where you've left off
cached_file = open('twittee_cached.json','w')
json.dump(tweet_data,cached_file)
cached_file.close()
# Shortened_url to actual_url transformer:
#First, restore the already processed tweets
if continue_from_cache:
try:
cached_file = open('twittee_cached.json','r')
tweet_data = json.load(cached_file)
print('Successfully returned response of length: {} from cache.'.format(len(tweet_data)))
cached_file.close()
except:
print("Couldn't load twittee_cached.json, make sure file exists or continue_from_cache == False ")
return
processed_cache_file = open('processed_cache_file.txt','r')
already_processed_temp = processed_cache_file.readlines()
already_processed = [id.strip() for id in already_processed_temp]
processed_cache_file.close()
#Continue processing tweet data list normally
index = 1
num_of_tw = len(tweet_data)
converted_urls = {}
for tweet in tweet_data[:]:
# But skip if tweet is already processed
if continue_from_cache:
if tweet['tweet_id'] in already_processed:
num_of_tw -= 1
continue
print(f"Tweet: {index}/{num_of_tw}")
index2 = 0
for url in tweet["entities"]["urls"][:]:
index2 += 1
print(f"Url #{index2}",end=" ")
# final_urls list of the specific tweet
final_urls = []
# Try to convert shortened_url to actual_url
try:
shortened_url = url
print("Gathering actual url from shortened url...")
req = http.request('GET', url, fields=payload)
actual_url = req.geturl()
#r = requests.get(shortened_url, timeout=15, headers={"User-Agent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36"})
#actual_url = r.url
print(shortened_url[0:25]+"..."," >>> " ,actual_url[0:25]+"...","\n")
final_urls.append(actual_url)
if DEBUG_MODE:
if 'zoom' in actual_url:
with open('yaman.txt','a') as df: df.write(actual_url+'\n')
#-------------------------------------------
# If it can't be converted, keep the original URL
except:
print("Couldn't convert:", url, " skipping..\n")
final_urls.append(url)
# ---------------------------------------
# Blacklist and whitelist implementation
for link in final_urls[:]:
skip_url_w = False
skip_url_b = False
for item in blacklist:
if item in link:
skip_url_b = True
break
for item in whitelist:
if item not in link:
skip_url_w = True
break
if skip_url_b or skip_url_w:
final_urls.remove(link)
# -------------------------------------- #
tweet["entities"]["urls"] = final_urls
index += 1
for url in tweet["entities"]["urls"][:]:
if "youtube.com" in url:
tweet["entities"]["urls"].remove(url)
if len(tweet["entities"]["urls"]) != 0:
print('Writing tweet to cache...')
write_data_file = open('twittee_processed.json','a')
json.dump(tweet,write_data_file)
write_data_file.close()
final_data.append(tweet)
processed_cache_file = open('processed_cache_file.txt','a')
processed_cache_file.write(tweet['tweet_id']+'\n')
processed_cache_file.close()
# Return the final list of processed tweets
return final_data
# End of generic_search()
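A minimal usage sketch; the keyword, blacklist, and limit values are illustrative only, and it assumes bearer_token is configured as above.

# Illustrative call; keywords, blacklist and limit are placeholder values.
events = generic_search(
    daylimit=3,
    keywords=["webinar", "virtual event"],
    blacklist=["youtube.com"],
    limit=200,
)
if events:
    print("Collected {} tweets with usable registration links".format(len(events)))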
Daily use
12 Apr 2022
Need to expand on the write-up here.
I have run the scripts/process for a few days now and it's working pretty well.
Generates hundreds of new potential leads every day.
Twitter API
Clipee Tweet
Some of the logic from this initial project is being reused for Clipee Tweet (Clipee - tweet from clipboard).